home *** CD-ROM | disk | FTP | other *** search
Text File | 1994-01-13 | 33.0 KB | 1,334 lines |
- Newsgroups: comp.sources.unix
- From: pcg@aber.ac.uk (Piercarlo Grandi)
- Subject: v27i195: team - portable multi-buffered tape streaming utility, Part01/01
- Message-id: <1.758496249.28141@gw.home.vix.com>
- Sender: unix-sources-moderator@gw.home.vix.com
- Approved: vixie@gw.home.vix.com
-
- Submitted-By: pcg@aber.ac.uk (Piercarlo Grandi)
- Posting-Number: Volume 27, Issue 195
- Archive-Name: team/part01
-
- There exist a few filters that help tapes streams by buffering IO and
- allowing reads to overlaps with writes under Unix. Most of these filters
- rely on relatively unportable features, for example SYSV like shared
- memory.
-
- team is a filter that runs essentially unchanged on any Unix version, as
- it relies only on features present in V7. A number of team processes
- (team members) share a common input fd and a common output fd, and they
- take turns at reading from the former and writing to the latter; they
- synchronize by using a ring of pipes between them, where a "read-enable"
- and a "write-enable" token circulate.
-
- team is not just very portable, but also portable and efficient. It also
- has some bells & whistles, like command line options to specify the
- number of processes in a team, the block size for IO, and the volume
- size of the input or output media. It also optionally reports its
- progress.
-
- Previous versions of team have been circulating (e.g. via alt.sources)
- for several years; I have not found a bug for a long time, even if
- surely they will exist.
-
- The team source is GPL'ed, and it comes with no warranty.
-
- Note: this program was developed entirely by the author in his own
- time, using his own resources, on his machine, in the context of
- his own research activities. In no way has the University of Wales,
- Aberystwyth contributed aided or abetted to this work, for which
- they bear no responsibility whatsoever. I am grateful to UWA for the
- ability to use their systems (as a paying customer) to post this work.
-
- pcg@aber.ac.uk (Piercarlo Grandi)
-
- #! /bin/sh
- # This is a shell archive. Remove anything before this line, then unpack
- # it by saving it into a file and typing "sh file". To overwrite existing
- # files, type "sh file -c". You can also feed this as standard input via
- # unshar, or by typing "sh <file", e.g.. If this archive is complete, you
- # will see the following message at the end:
- # "End of shell archive."
- # Contents: Makefile team.1 team.c
- # Wrapped by pcg@decb.aber.ac.uk on Thu Jan 13 19:00:25 1994
- PATH=/bin:/usr/bin:/usr/ucb ; export PATH
- if test -f 'Makefile' -a "${1}" != "-c" ; then
- echo shar: Will not clobber existing file \"'Makefile'\"
- else
- echo shar: Extracting \"'Makefile'\" \(314 characters\)
- sed "s/^X//" >'Makefile' <<'END_OF_FILE'
- XCFLAGS =-O
- XLDFLAGS =-s
- X
- XINSTX =install -m 0755 -s
- XINSTD =install -m 0644
- X
- XDEST=
- X
- XMI =$(DEST)/usr/
- XMD =$(DEST)/usr/
- X
- XM1X =1
- X
- XBIND =$(MD)bin/
- XMANI =$(MI)man
- X
- XMANI1 =$(MANI)$(M1X)/
- X
- Xall: team
- X
- Xclean::
- X rm -f team team.o
- X
- X$(BIND)team: team; $(INSTX) $? $@
- X$(MANI1)team.$(M1X): team.1; $(INSTD) $? $@
- END_OF_FILE
- if test 314 -ne `wc -c <'Makefile'`; then
- echo shar: \"'Makefile'\" unpacked with wrong size!
- fi
- # end of 'Makefile'
- fi
- if test -f 'team.1' -a "${1}" != "-c" ; then
- echo shar: Will not clobber existing file \"'team.1'\"
- else
- echo shar: Extracting \"'team.1'\" \(5406 characters\)
- sed "s/^X//" >'team.1' <<'END_OF_FILE'
- X'\" Copyright 1987,1989 Piercarlo Grandi. All rights reserved.
- X.TH TEAM 1 (pg)
- X.SH NAME
- Xteam \- parallel "pipe", allows asynchronous io
- X.SH SYNOPSIS
- X.B team
- X.RB [ -r ]
- X.RB [ -v ]
- X.RB [ -i
- X.I volsize
- X.RB [ b | k | m ]]
- X.RB [ -o
- X.I volsize
- X.RB [ b | k | m ]]
- X.RI [ blocksize
- X.R [
- X.RB [ b | k | m ]
- X.RB [ processes ]]
- X.SH DESCRIPTION
- X.I Team
- Xjust copies its standard input to its standard output. It does so
- Xhowever forking a team of independent
- X.I processes
- X(default is 4), arranged in a ring, with reads overlapped
- Xwith writes.
- X.LP
- XEach process will wait for the end of the read phase of
- Xprevious process, will then read
- X.I blocksize
- Xbytes (or 512 byte blocks if suffixed with
- X.B b
- Xor kilobytes if suffixed with
- X.BR k ,
- Xor megabytes if suffixed with
- X.BR m ;
- Xthe default is
- X.BR 20k )
- Xfrom its standard input, activate the next process read
- Xphase, wait for the previous process write phase end, then
- Xwrite to its standard output, and activate the next process
- Xwrite phase.
- X.LP
- XIf the input or output volume ends or an IO error is detected,
- X.I team
- Xwill ask wehtether the user wants to continue (but only if the
- Xconcerned volume is a block or character device and the program's
- Xstandard error channel is a tty device). Possible answers are
- X.TP
- X.B c
- XThis means that the user wishes to continue, but the file pointer shall be
- Xreset to the beginning of the volume, as a new volume has been started
- Xanew. This is the the answer to give on end of volume when writing to
- Xmultiple floppies, etc...
- X.TP
- X.B y
- XThe user simply wishes to continue, without any change.
- X.TP
- X.B n
- XThe user wishes to stop; the program will be terminated or aborted.
- X.LP
- XThere are just three options, as follows:
- X.TP
- X.B -i
- XThe value that follows the option is the assumed volume size of the input,
- Xand it is expressed in the same units as the buffer size.
- X.TP
- X.B -o
- XThe value that follows the option is the assumed volume size of the output,
- Xand it is expressed in the same units as the buffer size.
- X.TP
- X.B -v
- XIf specified as each buffer is read or written the total number of kilobytes
- Xread or written that far is printed.
- X.TP
- X.B -r
- XIf specified the number of kilobytes processed and the number of seconds
- Xtaken is
- X.B not
- Xprinted at the end of the run.
- X.LP
- X.I Team
- Xconsumes system time to synchronize and task switch among
- Xits processes; also, in order to avoid slowing it, it is
- Xbest run on a quiescent system.
- X.LP
- XThis program is most useful for output to a device, especially
- Xwhere a streaming tape is involved. It may be used to advantage with
- Xdisc to disc and disc to tape copies.
- X.SH EXAMPLES
- Xfind dir -print | cpio -oBc | team 20k 8 >/dev/rmt0
- X.br
- Xteam 20k 4 </dev/rmt0 | cpio -iBcdmu
- X.br
- Xpax -w -b 4k * | team -o 1200k 15k 2 >/dev/rdsk/f0t
- X.SH ADVICE
- XYou are advised to experiment with different combinations of block size and
- Xnumber of processes; each program used with
- X.I team
- Xworks best with certain parameters, and performance depends even more
- Xstrongly on the output device, so experiment with parameters also for
- Xthis (it seems that the blocking factor of the process that feeds
- X.I team
- Xought to be inferior to that given to it, and possibly inferior to the
- Xlimit on the size of a pipe for your version of the system).
- X.I Team
- Xought to be adaptive, and adjust dynamically both parameters, in order to
- Xreach a state where there is no pause between each stage of the ring. This is
- Xtoo difficult to achieve under UNIX.
- X.LP
- XNotice also that this program will read and write blocks all of the
- Xsame size as prescribed, except the last, even when reading from
- Xpipes; if a read from its input supplies less bytes than the prescribed
- Xblock size, this program will read again until its buffer is filled
- Xto norm or the input finishes.
- X.LP
- XA final note: it is usually advantageous to give to
- X.I team
- Xa block size that is a multiple of the block size produced by the
- Xprogram before it in a pipeline. Notice that in many cases, such
- Xas the tape archival programs, the output will not be directly
- Xrecognizable to the tape archiver in input, but will have to be reblocked
- Xback to the blocksize expected by the tape archiver either by way
- Xof
- X.I dd
- Xor reapplication of
- X.IR team ,
- Xthat is much faster of course.
- X.SH BUGS
- X.I Team
- Xwill emit a number of messages comprehensible only to the author in case
- Xof errors. Plase note them and report them to the author.
- X.LP
- XThis is not strictly a bug in this program, but rather a limitation; some
- Xdevice drivers will have problems when you change volume when this program
- Xasks you whether to continue operation. They require that the device be
- Xclosed and opened again whenever a volume is changed. Unfortunately this
- Xcannot be done, given the structure of
- X.IR team ;
- Xwith such device drivers you effectively cannot use team to write multiple
- Xvolumes.
- X.LP
- XSome device drivers, on physical end of file or volume while writing do
- Xnot do the decent thing, and write a legible truncated block and return
- Xits length; some drivers, e.g. some tape drivers, handle physical eof
- Xon write quite badly. With these drivers you had better use the
- X.B -o
- Xoption to set a logical EOF if you want to use multiple volumes. As an hint,
- Xgive the volume size as about five percent less than the nominal. Whatever
- Xvalue you use for output, take not of it, as you will have to use exactly the
- Xsame value for input!
- X.SH SEE ALSO
- X.IR volcopy (8)
- X.br
- X.IR cpio (1)
- X.br
- X.IR tar (1)
- X.br
- X.IR dump (8)
- X.SH AUTHOR
- XPiercarlo Grandi, Milano.
- END_OF_FILE
- if test 5406 -ne `wc -c <'team.1'`; then
- echo shar: \"'team.1'\" unpacked with wrong size!
- fi
- # end of 'team.1'
- fi
- if test -f 'team.c' -a "${1}" != "-c" ; then
- echo shar: Will not clobber existing file \"'team.c'\"
- else
- echo shar: Extracting \"'team.c'\" \(23313 characters\)
- sed "s/^X//" >'team.c' <<'END_OF_FILE'
- X/*
- X $Header: /fs/mumon/aware/piercarl/Cmd./Commands./team.c,v 3.1 1994/01/13 15:03:25 piercarl Exp piercarl $
- X*/
- X
- Xstatic char Notice[] =
- X "Copyright 1987,1989 Piercarlo Grandi. All rights reserved.";
- X
- X/*
- X This program is free software; you can redistribute it and/or
- X modify it under the terms of the GNU General Public License as
- X published by the Free Software Foundation; either version 2, or
- X (at your option) any later version.
- X
- X This program is distributed in the hope that it will be useful,
- X but WITHOUT ANY WARRANTY; without even the implied warranty of
- X MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- X GNU General Public License for more details.
- X
- X You may have received a copy of the GNU General Public License
- X along with this program; if not, write to the Free Software
- X Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- X*/
- X
- X#undef DEBUG
- X
- X/*
- X Unix programs normally do synchronous read and write, that is,
- X you read and then you write; no overlap is possible.
- X
- X This is especially catastrophic for device to device copies,
- X whereit is important to minimize elapsed time, by overlapping
- X activity on one with activity on another.
- X
- X To obtain this, a multiprocess structure is necessary under
- X Unix. This program is functionally equivalento to a pipe, in
- X that it copies its input (fd 0) to its output (fd 1) link.
- X
- X This programs is executed as a Team of N processes, called
- X Guys, all of which share the same input and output links; the
- X first reads a chunk of input, awakens the second, writes the
- X chunk to its output; the second does the same, and the last
- X awakens the first.
- X
- X Since this process is essentially cyclic, we use a ring of
- X pipes to synchronize the Guys. Each guy has un input pipe from
- X the upstream guy and an output pipe to the downstream guy.
- X Whenever a guy receives a READ command from the upstream, it
- X first reads a block and then passes on the READ command
- X downstream; it then waits for a WRITE command from upstream,
- X and then writes the block and after that passes the WRITE
- X command downstream. A count of how much has been processed is
- X also passwd along, for statistics and verification.
- X
- X Two other commands are used, one is STOP, and is sent
- X downstream from the guy that detects the end of file of the
- X input, after which the guy exits, and ABORT, which is sent
- X downstream from the guy which detects trouble in the guy
- X upstream to it, which has much the same effect.
- X*/
- X
- X#define TeamLVOLSZ (1L<<10)
- X#define TeamHVOLSZ ((long unsigned) 3 * ((long unsigned) 1 << 30))
- X
- X#define TeamLBUFSZ (64) /* Low buffer size */
- X#define TeamDBUFSZ (60*512) /* Default buffer size */
- X#define TeamHBUFSZ (1L<<20) /* High buffer size */
- X
- X#define TeamDTEAMSZ 4 /* Default # of processes */
- X#define TeamHTEAMSZ 16 /* High # of processes */
- X
- X/*
- X External components... Probably the only system dependent part
- X of this program, as some systems have something in
- X /usr/include/sys where others have it in /usr/include.
- X
- X Also, the mesg() procedure is highly system dependent... watch
- X out for locking and variable number of arguments.
- X*/
- X
- X#include <errno.h>
- X#include <signal.h>
- X#include <stdio.h>
- X#include <sys/types.h>
- X#include <sys/file.h>
- X#include <sys/stat.h>
- X#include <fcntl.h>
- X
- X#ifdef sun
- X# undef F_SETLKW
- X#endif
- X
- X#if (PCG)
- X# include "Extend.h"
- X# include "Here.h"
- X# include "Type.h"
- X#else
- X# define call (void)
- X# define were if
- X# define fast register
- X# define global /* extern */
- X# define local static
- X# define when break; case
- X# define otherwise break; default
- X# define mode(which,name) typedef which name name; which name
- X# define bool int
- X# define true 1
- X# define false 0
- X# define nil(type) ((type) 0)
- X# define scalar int
- X typedef char *pointer;
- X# if (defined(SMALL_M))
- X typedef unsigned address;
- X# else
- X typedef long address;
- X# endif
- X# if (__STDC__)
- X# define of(list) list
- X# define on(list)
- X# define is(list) (list)
- X# define _ ,
- X# define noparms void
- X# else
- X# define void int
- X# define const /* const */
- X# define of(list) ()
- X# define on(list) list
- X# define is(list) list;
- X# define _ ;
- X# define noparms
- X# endif
- X#endif
- X
- X#ifdef DEBUG
- X# define Mesg(list) mesg list
- X#else
- X# define Mesg(list)
- X#endif
- X
- X/*VARARGS1*/
- Xmesg(a,b,c,d,e,f,g,h,i)
- X char *a;
- X int b,c,d,e,f,g,h,i;
- X{
- X# if (defined F_SETLKW)
- X struct flock l;
- X l.l_whence = 0; l.l_start = 0L; l.l_len = 0L;
- X l.l_type = F_WRLCK; fcntl(fileno(stderr),F_SETLKW,&l);
- X# endif
- X# if (defined LOCK_EX)
- X flock(fileno(stderr),LOCK_EX);
- X# endif
- X fprintf(stderr,a,b,c,d,e,f,g,h,i);
- X# if (defined LOCK_EX)
- X flock(fileno(stderr),LOCK_UN);
- X# endif
- X# if (defined F_SETLKW)
- X l.l_type = F_UNLCK; fcntl(fileno(stderr),F_SETLKW,&l);
- X# endif
- X}
- X
- Xlocal bool verbose = false;
- Xlocal bool report = true;
- X
- Xextern int errno;
- Xlocal time_t origin;
- X
- Xextern time_t time of((time_t *));
- Xextern int atoi of((const char *));
- Xextern char *malloc of((unsigned));
- Xextern char *calloc of((address,unsigned));
- Xextern char *strchr of((const char *,int));
- X
- Xextern int getopt of((int,char *[],const char *));
- Xextern char *optarg;
- Xextern int optind;
- X
- X/*
- X The regular Unix read and write calls are not guaranteed to process
- X all the bytes requested. These procedures guarantee that if the
- X request is for N bytes, all of them are read or written unless there
- X is an error or eof.
- X*/
- X
- X#define FdCLOSED 0
- X#define FdOPEN 1
- X#define FdEOF 2
- X#define FdERROR 3
- X
- Xmode(struct,Fd)
- X{
- X int fd;
- X short status;
- X long unsigned size;
- X};
- X
- Xlocal Fd FdIn,FdOut;
- X
- Xlocal bool FdOpen on((fd,ffd,size)) is
- X(
- X fast Fd *fd
- X_ int ffd
- X_ long unsigned size
- X)
- X{
- X fd->status = (ffd >= 0) ? FdOPEN : FdCLOSED;
- X fd->fd = ffd;
- X fd->size = size;
- X
- X Mesg(("FdOpen fd %d\n",ffd));
- X
- X return ffd >= 0;
- X}
- X
- Xlocal bool FdClose on((fd)) is
- X(
- X fast Fd *fd
- X)
- X{
- X int ffd;
- X
- X ffd = fd->fd;
- X Mesg(("FdClose fd %d\n",fd->fd));
- X fd->status = FdCLOSED;
- X fd->fd = -1;
- X
- X return close(ffd) >= 0;
- X}
- X
- Xlocal bool FdCopy on((to,from)) is
- X(
- X fast Fd *to
- X_ fast Fd *from
- X)
- X{
- X to->size = from->size;
- X to->status = from->status;
- X to->fd = dup(from->fd);
- X Mesg(("FdCopy of %d is %d\n",from->fd,to->fd));
- X return to->fd >= 0;
- X}
- X
- Xlocal void FdSet on((to,from)) is
- X(
- X fast Fd *to
- X_ fast Fd *from
- X)
- X{
- X if (from->fd < 0)
- X mesg("team: set an invalid fd\n");
- X to->size = from->size;
- X to->status = from->status;
- X to->fd = from->fd;
- X}
- X
- Xlocal long unsigned FdRetry on((fd,which,done,space)) is
- X(
- X fast Fd *fd
- X_ char *which
- X_ long unsigned done
- X_ long unsigned space
- X)
- X{
- X int tty;
- X char reply[2];
- X struct stat st;
- X
- X if (fstat(fd->fd,&st) < 0)
- X {
- X perror(which);
- X return 0;
- X }
- X
- X st.st_mode &= S_IFMT;
- X if (st.st_mode != S_IFCHR && st.st_mode != S_IFBLK)
- X return 0;
- X
- X if (!isatty(fileno(stderr)))
- X return 0;
- X
- X if ((tty = open("/dev/tty",0)) < 0)
- X {
- X perror("/dev/tty");
- X return 0;
- X }
- X
- X do
- X {
- X#if (defined i386 || defined sun)
- X extern char *(sys_errlist[]);
- X char *errmsg = sys_errlist[errno];
- X#else
- X char errmsg[32];
- X (void) sprintf(errmsg,"Error %d",errno);
- X#endif
- X if (errno)
- X mesg("'%s' on %s after %luk. Continue [cyn] ? ",errmsg,which,done>>10);
- X else
- X mesg("EOF on %s after %luk. Continue [cyn] ? ",which,done>>10);
- X
- X read(tty,reply,sizeof reply);
- X }
- X while (strchr("cCyYnN",reply[0]) == 0);
- X
- X call close(tty);
- X
- X if (strchr("nN",reply[0]) != 0)
- X return 0L;
- X
- X errno = 0;
- X
- X if (strchr("cC",reply[0]) != 0)
- X {
- X call lseek(fd->fd,0L,0);
- X return fd->size;
- X }
- X
- X return space;
- X}
- X
- Xlocal unsigned FdCanDo on((remaining,available)) is
- X(
- X fast address remaining
- X_ fast long unsigned available
- X)
- X{
- X return (remaining < available)
- X ? (unsigned) remaining : (unsigned) available;
- X}
- X
- Xlocal address FdRead on((fd,buffer,todo,done)) is
- X(
- X fast Fd *fd
- X_ pointer buffer
- X_ fast address todo
- X_ long unsigned done
- X)
- X{
- X fast long unsigned space;
- X fast int bytesRead;
- X fast address justDone;
- X
- X switch (fd->status)
- X {
- X when FdEOF: return 0;
- X when FdERROR: return -1;
- X when FdCLOSED: return -1;
- X
- X when FdOPEN:
- X
- X space = fd->size - done%fd->size;
- X
- X for (justDone = 0; space != 0L && justDone < todo;)
- X {
- X bytesRead = read(fd->fd,buffer+justDone,
- X FdCanDo(todo-justDone,space-justDone));
- X
- X if (bytesRead <= 0 || (justDone += bytesRead) == space)
- X space = FdRetry(fd,"input",done+justDone,space-justDone);
- X }
- X
- X if (bytesRead == 0) fd->status = FdEOF;
- X if (bytesRead < 0) fd->status = FdERROR;
- X
- X Mesg(("FdRead %d reads %d last %d\n",fd->fd,justDone,bytesRead));
- X
- X return (justDone == 0) ? bytesRead : justDone;
- X }
- X /*NOTREACHED*/
- X}
- X
- Xlocal address FdWrite on((fd,buffer,todo,done)) is
- X(
- X fast Fd *fd
- X_ pointer buffer
- X_ fast address todo
- X_ long unsigned done
- X)
- X{
- X fast long unsigned space;
- X fast int bytesWritten;
- X fast address justDone;
- X
- X switch (fd->status)
- X {
- X when FdEOF: return 0;
- X when FdERROR: return -1;
- X when FdCLOSED: return -1;
- X
- X when FdOPEN:
- X
- X space = fd->size - done%fd->size;
- X
- X for (justDone = 0; space != 0L && justDone < todo;)
- X {
- X bytesWritten = write(fd->fd,buffer+justDone,
- X FdCanDo(todo-justDone,space-justDone));
- X
- X if (bytesWritten <= 0 || (justDone += bytesWritten) == space)
- X space = FdRetry(fd,"output",done+justDone,space-justDone);
- X }
- X
- X Mesg(("FdWrite %d writes %d last %d\n",fd->fd,justDone,bytesWritten));
- X
- X if (bytesWritten == 0) fd->status = FdEOF;
- X if (bytesWritten < 0) fd->status = FdERROR;
- X
- X return (justDone == 0) ? bytesWritten : justDone;
- X }
- X /*NOTREACHED*/
- X}
- X
- X/*
- X A Token is scalar value representing a command.
- X*/
- X
- Xtypedef short scalar Token;
- X
- X#define TokenREAD 0
- X#define TokenWRITE 1
- X#define TokenSTOP 2
- X#define TokenABORT -1
- X
- X/*
- X Here we represent Streams as Fds; this is is not entirely
- X appropriate, as Fds have also a volume size, and relatively
- X high overhead write and read functions. Well, we just take
- X some liberties with abstraction levels here. Actually we
- X should have an Fd abstraction for stream pipes and a Vol
- X abstraction for input and output...
- X*/
- X
- Xlocal bool StreamPipe on((downstream,upstream)) is
- X(
- X fast Fd *downstream
- X_ fast Fd *upstream
- X)
- X{
- X int links[2];
- X
- X if (pipe(links) < 0)
- X {
- X perror("team: opening links");
- X return false;
- X }
- X
- X Mesg(("StreamPipe fd downstream %d upstream %d\n",links[1],links[0]));
- X
- X return FdOpen(downstream,links[1],TeamHVOLSZ)
- X && FdOpen(upstream,links[0],TeamHVOLSZ);
- X}
- X
- Xmode(struct,StreamMsg)
- X{
- X Token token;
- X short status;
- X long unsigned done;
- X};
- X
- Xlocal bool StreamSend on((fd,token,status,done)) is
- X(
- X fast Fd *fd
- X_ Token token
- X_ short status
- X_ long unsigned done
- X)
- X{
- X fast int n;
- X StreamMsg message;
- X
- X message.token = token;
- X message.status = status;
- X message.done = done;
- X
- X n = write(fd->fd,(pointer) &message,(unsigned) sizeof message);
- X
- X Mesg(("StreamSend fd %u n %d token %d\n",fd->fd,n,token));
- X
- X return n == sizeof message;
- X}
- X
- Xlocal bool StreamReceive on((fd,tokenp,statusp,donep)) is
- X(
- X fast Fd *fd
- X_ Token *tokenp
- X_ short *statusp
- X_ long unsigned *donep
- X)
- X{
- X fast int n;
- X StreamMsg message;
- X
- X n = read(fd->fd,(pointer) &message,(unsigned) sizeof message);
- X *tokenp = message.token;
- X *statusp = message.status;
- X *donep = message.done;
- X
- X Mesg(("StreamReceive fd %u n %d token %d\n",fd->fd,n,*tokenp));
- X
- X return n == sizeof message;
- X}
- X/*
- X A guy is an instance of the input to output copier. It is attached
- X to a relay station, with an upstream link, from which commands
- X arrive, and a downward link, to which they are relayed once they are
- X executed.
- X*/
- X
- Xmode(struct,Guy)
- X{
- X int pid;
- X Fd upStream;
- X Fd downStream;
- X};
- X
- Xlocal bool GuyOpen on((guy,pid,upstream,downstream)) is
- X(
- X fast Guy *guy
- X_ int pid
- X_ Fd *upstream
- X_ Fd *downstream
- X)
- X{
- X Mesg(("GuyOpen pid %u upstream %u downstream %u\n",
- X pid,upstream->fd,downstream->fd));
- X
- X guy->pid = pid;
- X FdSet(&guy->upStream,upstream);
- X FdSet(&guy->downStream,downstream);
- X
- X return true;
- X}
- X
- X#define GuySEND(guy,token,status,done) \
- X StreamSend(&guy->downStream,token,status,done)
- X
- X#define GuyRECEIVE(guy,tokenp,statusp,donep) \
- X StreamReceive(&guy->upStream,tokenp,statusp,donep)
- X
- Xlocal bool GuyStop of((Guy *,char *,long unsigned));
- X
- Xlocal bool GuyStart on((guy,bufsize)) is
- X(
- X fast Guy *guy
- X_ address bufsize
- X)
- X{
- X fast char *buffer;
- X Token token;
- X short status;
- X long unsigned done;
- X bool received;
- X static int bytesRead,bytesWritten;
- X
- X Mesg(("GuyStart guy %#o bufsize %u\n",guy,bufsize));
- X
- X buffer = (pointer) malloc((unsigned) bufsize);
- X if (buffer == nil(pointer))
- X {
- X mesg("team: guy %d cannot allocate %u bytes\n",
- X guy->pid,bufsize);
- X return false;
- X }
- X
- X while ((received = GuyRECEIVE(guy,&token,&status,&done)) && token != TokenSTOP)
- X switch (token)
- X {
- X when TokenREAD:
- X FdIn.status = status;
- X
- X Mesg(("GuyStart reading %d chars\n",bufsize));
- X bytesRead = FdRead(&FdIn,(pointer) buffer,bufsize,done);
- X Mesg(("GuyStart reads %d chars\n",bytesRead));
- X
- X if (bytesRead == 0) GuyStop(guy,nil(char *),done);
- X if (bytesRead < 0) GuyStop(guy,"error on guy read",done);
- X
- X done += bytesRead;
- X
- X if (verbose)
- X mesg("%luk read \r",done>>10);
- X
- X if (!GuySEND(guy,TokenREAD,FdIn.status,done))
- X GuyStop(guy,"guy cannot send READ",done);
- X
- X when TokenWRITE:
- X FdOut.status = status;
- X
- X Mesg(("GuyStart writing %d chars\n",bytesRead));
- X bytesWritten = FdWrite(&FdOut,(pointer) buffer,(address) bytesRead,done);
- X Mesg(("GuyStart writes %d chars\n",bytesWritten));
- X
- X if (bytesWritten == 0) GuyStop(guy,"eof on guy write",done);
- X if (bytesWritten < 0) GuyStop(guy,"error on guy write",done);
- X
- X done += bytesWritten;
- X
- X if (verbose)
- X mesg("%luk written\r",done>>10);
- X
- X if (!GuySEND(guy,TokenWRITE,FdOut.status,done))
- X GuyStop(guy,"guy cannot send WRITE",done);
- X
- X when TokenABORT:
- X GuyStop(guy,"guy was aborted",0L);
- X
- X otherwise:
- X GuyStop(guy,"impossible token on ring",done);
- X }
- X
- X /* free((char *) buffer); */
- X
- X GuyStop(guy,(received) ? nil(char *) : "error on upstream receive",0L);
- X /*NOTREACHED*/
- X
- X /*return true;*/
- X}
- X
- Xlocal bool GuyStop on((guy,errormsg,done)) is
- X(
- X fast Guy *guy
- X_ char *errormsg
- X_ long unsigned done
- X)
- X{
- X Mesg(("GuyStop guy %#o\n",guy));
- X
- X if (done)
- X {
- X if (report)
- X mesg("%lu kilobytes, %lu seconds\r\n",
- X done>>10,(long unsigned) (time((time_t *) 0)-origin));
- X else if (verbose)
- X mesg("\n");
- X }
- X
- X if (errormsg != nil(char *))
- X {
- X mesg("team: guy pid %u: %s\n",guy->pid,errormsg);
- X call GuySEND(guy,TokenABORT,FdERROR,0L);
- X exit(1);
- X /*NOTREACHED*/
- X }
- X
- X if (!GuySEND(guy,TokenSTOP,FdEOF,0L))
- X {
- X exit(1);
- X /*NOTREACHED*/
- X }
- X
- X exit(0);
- X /*NOTREACHED*/
- X}
- X
- Xlocal bool GuyClose on((guy)) is
- X(
- X fast Guy *guy
- X)
- X{
- X return FdClose(&guy->upStream) && FdClose(&guy->downStream);
- X}
- X
- X/*
- X A team is made up of a ring of guys; each guy copies a blockfrom its
- X input to its ouput, and is driven by tokens sent to it by the
- X previous guy on a pipe.
- X*/
- X
- Xmode(struct,Team)
- X{
- X Guy *guys;
- X short unsigned size;
- X short unsigned active;
- X};
- X
- Xlocal bool TeamOpen on((team,nominalsize)) is
- X(
- X Team *team
- X_ short unsigned nominalsize
- X)
- X{
- X Mesg(("TeamOpen nominalsize %u\n",nominalsize));
- X
- X team->size = 0;
- X team->active = 0;
- X
- X team->guys = (Guy *) calloc(sizeof (Guy),nominalsize);
- X
- X for (team->size = 0; team->size < nominalsize; team->size++);
- X
- X were (team->guys == nil(Guy *))
- X return false;
- X
- X return true;
- X}
- X
- Xlocal bool TeamStart on((team,bufsize,isize,osize)) is
- X(
- X fast Team *team
- X_ address bufsize
- X_ long unsigned isize
- X_ long unsigned osize
- X)
- X{
- X /*
- X When generating each guy, we pass it an upstream link that
- X is the downstream of the previous guy, and create a new
- X downstream link that will be the next upstream.
- X
- X At each turn we obviously close the old downstream once it
- X has been passed to the forked guy.
- X
- X A special case are the first and last guys; the upstreamof
- X the first guy shall be the downstream of the last. This
- X goes against the grain of our main logic, where the
- X upstream is expected to already exist and the downstream
- X must be created.
- X
- X This means that the last and first guys are created in a
- X special way. When creating the first guy we shall create
- X its upstreamlink as well as its downstream, and we shall
- X save that in a special variable, last_downstream. This we
- X shall use as the downstreamof the last guy.
- X
- X We shall also keep it open in the team manager (parent
- X process) because we shall use it to do the initial send of
- X the read and write tokens that will circulate in the relay
- X ring, activating the guys.
- X
- X Of course because of this each guy will inherit this link
- X as wellas its upstream and downstream, but shall graciously
- X close it.
- X */
- X
- X Fd last_downstream;
- X Fd this_upstream;
- X Fd this_downstream;
- X Fd next_upstream;
- X
- X Mesg(("TeamStart team %#o size %u bufsize %u\n",
- X team,team->size,bufsize));
- X
- X call FdOpen(&FdIn,0,isize); call FdOpen(&FdOut,1,osize);
- X
- X for (team->active = 0; team->active < team->size; team->active++)
- X {
- X fast Guy *guy;
- X fast int pid;
- X
- X guy = team->guys+team->active;
- X
- X if (team->active == 0)
- X {
- X if (!StreamPipe(&last_downstream,&this_upstream))
- X {
- X perror("cannot open first link");
- X return false;
- X }
- X
- X if (!StreamPipe(&this_downstream,&next_upstream))
- X {
- X perror("cannot open link");
- X return false;
- X }
- X }
- X else if (team->active < (team->size-1))
- X {
- X if (!StreamPipe(&this_downstream,&next_upstream))
- X {
- X perror("cannot open link");
- X return false;
- X }
- X }
- X else /*if (team->active == team->size-1)*/
- X {
- X FdSet(&this_downstream,&last_downstream);
- X if (!FdCopy(&last_downstream,&this_downstream))
- X perror("team: cannot copy last downstream");
- X }
- X
- X Mesg(("TeamStart going to fork for guy %#o\n",guy));
- X
- X pid = fork();
- X
- X if (pid > 0)
- X {
- X Mesg(("TeamStart forked guy %#o as pid %u\n",guy,pid));
- X guy->pid = pid;
- X
- X if (!FdClose(&this_upstream))
- X perror("cannot close this upstream link");
- X if (!FdClose(&this_downstream))
- X perror("cannot close this downstream link");
- X
- X FdSet(&this_upstream,&next_upstream);
- X }
- X else if (pid == 0)
- X {
- X pid = getpid();
- X
- X if (!FdClose(&last_downstream))
- X perror("cannot close inherited first link");
- X
- X if (!GuyOpen(guy,pid,&this_upstream,&this_downstream))
- X GuyStop(guy,"cannot open guy",0L);
- X if (!GuyStart(guy,bufsize))
- X GuyStop(guy,"cannot start guy",0L);
- X if (!GuyClose(guy))
- X perror("cannot close guy");
- X
- X /*NOTREACHED*/
- X }
- X else if (pid < 0)
- X {
- X perror("team: forking a guy");
- X return false;
- X }
- X }
- X
- X if (!StreamSend(&last_downstream,TokenREAD,FdOPEN,0L))
- X {
- X perror("cannot send first READ token");
- X return false;
- X }
- X
- X if (!StreamSend(&last_downstream,TokenWRITE,FdOPEN,0L))
- X {
- X perror("cannot send first WRITE token");
- X return false;
- X }
- X
- X if (!FdClose(&last_downstream))
- X perror("cannot close first link");
- X
- X return true;
- X}
- X
- Xlocal bool TeamWait on((team)) is
- X(
- X fast Team *team
- X)
- X{
- X while (team->active != 0)
- X {
- X int guypid;
- X int status;
- X
- X guypid = wait(&status);
- X if (guypid >= 0)
- X {
- X fast short unsigned guyno;
- X
- X for (guyno = 0; guyno < team->size; guyno++)
- X if (guypid == team->guys[guyno].pid)
- X {
- X team->guys[guyno].pid = -1;
- X break;
- X }
- X }
- X else
- X {
- X mesg("team: no guys, believed %u left\n",team->active);
- X return true;
- X }
- X
- X --team->active;
- X
- X if (status != 0 && team->active != 0)
- X return false;
- X }
- X
- X return true;
- X}
- X
- Xlocal bool TeamStop on((team)) is
- X(
- X fast Team *team
- X)
- X{
- X fast short unsigned guyno;
- X
- X Mesg(("TeamStop team %#o\n",team));
- X
- X for (guyno = 0; guyno < team->size; guyno++)
- X {
- X fast Guy *guy;
- X
- X guy = team->guys+guyno;
- X if (guy->pid >= 0)
- X {
- X /*kill(guy->pid,SIGKILL);*/
- X --team->active;
- X }
- X }
- X
- X return team->active == 0;
- X}
- X
- Xlocal bool TeamClose on((team)) is
- X(
- X fast Team *team
- X)
- X{
- X for (team->size; team->size != 0; --team->size)
- X continue;
- X
- X free(team->guys);
- X
- X return true;
- X}
- X
- Xlocal void usage of((noparms))
- X{
- X fprintf(stderr,"\
- Xsyntax: team [-[vr]] [-iI[bkm] [-oO[bkm] [N[bkm] [P]]\n\
- X copies standard input to output\n\
- X -v gives ongoing report, -r final report\n\
- X I is input volume size (default %lum)\n\
- X O is output volume size (default %lum)\n\
- X N is buffer size (default %luk)\n\
- X P is number of processes (default %u)\n\
- X (postfix b means *512, k means *1KB, m means *1MB)\n\
- X",
- X TeamHVOLSZ>>20,TeamHVOLSZ>>20,
- X TeamDBUFSZ>>10,TeamDTEAMSZ);
- X
- X exit(1);
- X /*NOTREACHED*/
- X}
- X
- Xlocal long unsigned atos on((s)) is
- X(
- X fast char *s
- X)
- X{
- X fast unsigned long l;
- X
- X for (
- X s, l = 0L;
- X *s >= '0' && *s <= '9';
- X s++
- X )
- X l = l*10L + (long unsigned) (*s-'0');
- X
- X if (*s == 'b') l *= (1L<<9);
- X if (*s == 'k') l *= (1L<<10);
- X if (*s == 'm') l *= (1L<<20);
- X
- X return l;
- X}
- X
- Xglobal int main on((argc,argv)) is
- X(
- X int argc
- X_ char *(argv[])
- X)
- X{
- X Team team;
- X short unsigned teamsize;
- X
- X address bufsize;
- X long unsigned isize;
- X long unsigned osize;
- X int opt;
- X
- X teamsize = TeamDTEAMSZ;
- X bufsize = TeamDBUFSZ;
- X isize = TeamHVOLSZ;
- X osize = TeamHVOLSZ;
- X optind = 1;
- X
- X while ((opt = getopt(argc,argv,"vri:o:")) != -1)
- X switch (opt)
- X {
- X when 'i':
- X isize = atos(optarg);
- X if (isize < TeamLVOLSZ || isize > TeamHVOLSZ)
- X {
- X fprintf(stderr,"team: invalid input volume size %lu\n",isize);
- X usage();
- X }
- X
- X when 'o':
- X osize = atos(optarg);
- X if (osize < TeamLVOLSZ || osize > TeamHVOLSZ)
- X {
- X fprintf(stderr,"team: invalid output volume size %lu\n",osize);
- X usage();
- X }
- X
- X when 'v':
- X verbose ^= 1;
- X
- X when 'r':
- X report ^= 1;
- X
- X otherwise:
- X usage();
- X }
- X
- X argc -= optind, argv += optind;
- X
- X if (argc != 0)
- X {
- X bufsize = (address) atos(argv[0]);
- X if (bufsize < TeamLBUFSZ || bufsize > TeamHBUFSZ)
- X {
- X fprintf(stderr,"team: invalid block size %u\n",
- X bufsize);
- X usage();
- X }
- X --argc, argv++;
- X }
- X
- X if (argc != 0)
- X {
- X teamsize = atoi(argv[0]);
- X if (teamsize < 2 || teamsize > TeamHTEAMSZ)
- X {
- X fprintf(stderr,"team: invalid # of processes %d\n",teamsize);
- X usage();
- X }
- X --argc, argv++;
- X }
- X
- X if (argc != 0) usage();
- X
- X if (!TeamOpen(&team,teamsize))
- X {
- X mesg("team: cannot setup the team with %u guys\n",teamsize);
- X return 1;
- X }
- X
- X origin = time((time_t *) 0);
- X
- X if (!TeamStart(&team,bufsize,isize,osize))
- X {
- X mesg("team: cannot start the team\n");
- X return 1;
- X }
- X
- X if (!TeamWait(&team))
- X {
- X mesg("team: stop remaining %u guys\n",team.active);
- X
- X if (!TeamStop(&team))
- X {
- X mesg("team: cannot stop the team\n");
- X return 1;
- X }
- X }
- X
- X if (!TeamClose(&team))
- X {
- X mesg("team: cannot close the team\n");
- X return 1;
- X }
- X
- X return 0;
- X}
- END_OF_FILE
- if test 23313 -ne `wc -c <'team.c'`; then
- echo shar: \"'team.c'\" unpacked with wrong size!
- fi
- # end of 'team.c'
- fi
- echo shar: End of shell archive.
- exit 0
-