home *** CD-ROM | disk | FTP | other *** search
/ MPEG Toolkit / MPEG Toolkit.iso / os2 / mpegenc / src / parallel.c < prev    next >
Encoding:
C/C++ Source or Header  |  1997-01-01  |  46.7 KB  |  1,801 lines

  1. /*===========================================================================*
  2.  * parallel.c                                         *
  3.  *                                         *
  4.  *    Procedures to make encoder run in parallel                 *
  5.  *                                         *
  6.  * EXPORTED PROCEDURES:                                 *
  7.  *    StartIOServer                                 *
  8.  *    StartCombineServer                             *
  9.  *    StartDecodeServer                             *
  10.  *    SendRemoteFrame                                 *
  11.  *    GetRemoteFrame                                 *
  12.  *    StartMasterServer                             *
  13.  *    NotifyMasterDone                             *
  14.  *                                         *
  15.  *===========================================================================*/
  16.  
  17. /*
  18.  * Copyright (c) 1993 The Regents of the University of California.
  19.  * All rights reserved.
  20.  *
  21.  * Permission to use, copy, modify, and distribute this software and its
  22.  * documentation for any purpose, without fee, and without written agreement is
  23.  * hereby granted, provided that the above copyright notice and the following
  24.  * two paragraphs appear in all copies of this software.
  25.  *
  26.  * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
  27.  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
  28.  * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
  29.  * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  30.  *
  31.  * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
  32.  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
  33.  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
  34.  * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
  35.  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
  36.  */
  37.  
  38. /*  
  39.  *  $Header: /n/picasso/users/keving/encode/src/RCS/parallel.c,v 1.2 1993/07/22 22:23:43 keving Exp keving $
  40.  *  $Log: parallel.c,v $
  41.  * Revision 1.2  1993/07/22  22:23:43  keving
  42.  * nothing
  43.  *
  44.  * Revision 1.1  1993/06/30  20:06:09  keving
  45.  * nothing
  46.  *
  47.  */
  48.  
  49.  
  50. /*==============*
  51.  * HEADER FILES *
  52.  *==============*/
  53.  
  54. #include <sys/types.h>
  55. #include <sys/socket.h>
  56. #include <sys/times.h>
  57. #include <netinet/in.h>
  58. #include <netdb.h>
  59. #include <errno.h>
  60. #include "all.h"
  61. #include "param.h"
  62. #include "mpeg.h"
  63. #ifdef OS2
  64. /* @@@ FAT 8.3 convention */
  65. #include "prototyp.h"
  66. #else
  67. #include "prototypes.h"
  68. #endif
  69. #include "parallel.h"
  70. #ifdef OS2
  71. /* @@@ FAT 8.3 convention */
  72. #include "readfram.h"
  73. #else
  74. #include "readframe.h"
  75. #endif
  76. #include "fsize.h"
  77. #include "combine.h"
  78. #include "frames.h"
  79.  
  80.  
  81. #define MAX_IO_SERVERS    10
  82.  
  83.  
  84. /*==================*
  85.  * STATIC VARIABLES *
  86.  *==================*/
  87.  
  88. static int32   diffTime;
  89. static char    rsh[256];
  90. static struct hostent *hostEntry = NULL;
  91. static boolean    *frameDone;
  92. static int    outputServerSocket;
  93. static int    decodeServerSocket;
  94. static boolean    parallelPerfect = FALSE;
  95.  
  96.  
  97. /*==================*
  98.  * GLOBAL VARIABLES *
  99.  *==================*/
  100.  
  101. extern int yuvHeight, yuvWidth;
  102. extern    time_t  timeStart, timeEnd;
  103. extern char    statFileName[256];
  104. extern FILE *statFile;
  105. extern boolean  debugMachines;
  106. extern boolean debugSockets;
  107. int parallelTestFrames = 10;
  108. int parallelTimeChunks = 60;
  109. char *IOhostName;
  110. int ioPortNumber;
  111. int combinePortNumber;
  112. int decodePortNumber;
  113. boolean    niceProcesses = FALSE;
  114. boolean    forceIalign = FALSE;
  115. int        machineNumber = -1;
  116. boolean    remoteIO = FALSE;
  117. boolean    separateConversion;
  118. time_t    IOtime = 0;
  119.  
  120.  
  121. /*===============================*
  122.  * INTERNAL PROCEDURE prototypes *
  123.  *===============================*/
  124.  
  125. static void    TransmitPortNum _ANSI_ARGS_((char *hostName, int portNum,
  126.                            int ioPortNum));
  127. static void    EndIOServer _ANSI_ARGS_((void));
  128. static void SafeRead _ANSI_ARGS_((int fd, char *buf, int nbyte));
  129. static void SafeWrite _ANSI_ARGS_((int fd, char *buf, int nbyte));
  130. static int  CreateListeningSocket _ANSI_ARGS_((int *portNumber));
  131. static int  ConnectToSocket _ANSI_ARGS_((char *machineName, int portNum,
  132.                      struct hostent **hostEnt));
  133.  
  134.  
  135. /*=====================*
  136.  * EXPORTED PROCEDURES *
  137.  *=====================*/
  138.  
  139.             /*=================*
  140.              * IO SERVER STUFF *
  141.              *=================*/
  142.  
  143.  
  144. /*===========================================================================*
  145.  *
  146.  * SetIOConvert
  147.  *
  148.  *    sets the IO conversion to be separate or not.  If separate, then
  149.  *    some post-processing is done at slave end
  150.  *
  151.  * RETURNS:    nothing
  152.  *
  153.  * SIDE EFFECTS:    none
  154.  *
  155.  *===========================================================================*/
  156. void
  157. SetIOConvert(separate)
  158.     boolean separate;
  159. {
  160.     separateConversion = separate;
  161. }
  162.  
  163.  
  164. /*===========================================================================*
  165.  *
  166.  * SetParallelPerfect
  167.  *
  168.  *    If this is called, then frames will be divided up completely, and
  169.  *    evenly (modulo rounding) between all the processors
  170.  *
  171.  * RETURNS:    nothing
  172.  *
  173.  * SIDE EFFECTS:    none
  174.  *
  175.  *===========================================================================*/
  176. void
  177. SetParallelPerfect()
  178. {
  179.     parallelPerfect = TRUE;
  180. }
  181.  
  182.  
  183. /*===========================================================================*
  184.  *
  185.  * SetRemoteShell
  186.  *
  187.  *    sets the remote shell program (usually rsh, but different on some
  188.  *    machines)
  189.  *
  190.  * RETURNS:    nothing
  191.  *
  192.  * SIDE EFFECTS:    none
  193.  *
  194.  *===========================================================================*/
  195. void
  196. SetRemoteShell(shell)
  197.     char *shell;
  198. {
  199.     strcpy(rsh, shell);
  200. }
  201.  
  202.  
  203. /*===========================================================================*
  204.  *
  205.  * StartIOServer
  206.  *
  207.  *    start-up the IOServer with this process
  208.  *    handles slave requests for frames, and exits when master tells it to
  209.  *
  210.  * RETURNS:    nothing
  211.  *
  212.  * SIDE EFFECTS:    none
  213.  *
  214.  *===========================================================================*/
  215. void
  216. StartIOServer(numInputFiles, parallelHostName, portNum)
  217.     int numInputFiles;
  218.     char *parallelHostName;
  219.     int portNum;
  220. {
  221.     int        ioPortNum;
  222.     int        serverSocket;
  223.     int        otherSock, otherSize;
  224.     struct sockaddr otherSocket;
  225.     int32   buffer[8];
  226.     boolean    done = FALSE;
  227.     int        frameNumber;
  228.     MpegFrame *frame;
  229.     register int y;
  230.     int        numBytes;
  231.     unsigned char   *bigBuffer;
  232.     unsigned char   smallBuffer[1000];
  233.     int        bigBufferSize;
  234.     FILE    *filePtr;
  235.     uint32  data;
  236.     char    inputFileName[1024];
  237.     char    fileName[1024];
  238.  
  239.     bigBufferSize = 0;
  240.     bigBuffer = NULL;
  241.  
  242. /* once we get IO port num, should transmit it to parallel server */
  243.  
  244.     serverSocket = CreateListeningSocket(&ioPortNum);
  245.  
  246.     if ( debugSockets ) {
  247.     fprintf(stdout, "====I/O USING PORT %d\n", ioPortNum);
  248.     }
  249.  
  250.     TransmitPortNum(parallelHostName, portNum, ioPortNum);
  251.  
  252.     otherSize = sizeof(otherSocket);
  253.  
  254.     if ( separateConversion ) {
  255.     SetFileType(ioConversion);    /* for reading */
  256.     } else {
  257.     SetFileType(inputConversion);
  258.     }
  259.  
  260.     /* now, wait until get done signal */
  261.     while ( ! done ) {
  262.     otherSock = accept(serverSocket, &otherSocket, &otherSize);
  263.     if ( otherSock == -1 ) {
  264.         fprintf(stderr, "ERROR:  I/O SERVER accept returned error %d\n", errno);
  265.         exit(1);
  266.     }
  267.  
  268.     SafeRead(otherSock, (char *)buffer, 4);
  269.     frameNumber = ntohl(buffer[0]);
  270.  
  271.     if ( frameNumber == -1 ) {
  272.         done = TRUE;
  273.     } else if ( frameNumber == -2 ) {
  274.         /* decoded frame to be output to disk */
  275.         SafeRead(otherSock, (char *)buffer, 4);
  276.         frameNumber = ntohl(buffer[0]);        
  277.  
  278.         if ( debugSockets ) {
  279.         fprintf(stdout, "INPUT SERVER:  GETTING DECODED FRAME %d\n", frameNumber);
  280.         fflush(stdout);
  281.         }
  282.  
  283.         /* should read frame from socket, then write to disk */
  284.         frame = Frame_New(frameNumber, 'i');
  285.  
  286.         Frame_AllocDecoded(frame, TRUE);
  287.  
  288.         for ( y = 0; y < Fsize_y; y++ ) {
  289.         SafeRead(otherSock, (char *)frame->decoded_y[y], Fsize_x);
  290.         }
  291.  
  292.         for (y = 0; y < Fsize_y / 2; y++) {            /* U */
  293.         SafeRead(otherSock, (char *)frame->decoded_cb[y], Fsize_x / 2);
  294.         }
  295.  
  296.         for (y = 0; y < Fsize_y / 2; y++) {            /* V */
  297.         SafeRead(otherSock, (char *)frame->decoded_cr[y], Fsize_x / 2);
  298.         }
  299.  
  300.         /* now output to disk */
  301.         WriteDecodedFrame(frame);
  302.  
  303.         Frame_Free(frame);
  304.     } else if ( frameNumber == -3 ) {
  305.         /* request for decoded frame from disk */
  306.         SafeRead(otherSock, (char *)buffer, 4);
  307.         frameNumber = ntohl(buffer[0]);        
  308.  
  309.         if ( debugSockets ) {
  310.         fprintf(stdout, "INPUT SERVER:  READING DECODED FRAME %d from DISK\n", frameNumber);
  311.         fflush(stdout);
  312.         }
  313.  
  314.         /* should read frame from disk, then write to socket */
  315.         frame = Frame_New(frameNumber, 'i');
  316.  
  317.         Frame_AllocDecoded(frame, TRUE);
  318.  
  319.         ReadDecodedRefFrame(frame, frameNumber);
  320.  
  321.         /* now write to socket */
  322.         for ( y = 0; y < Fsize_y; y++ ) {
  323.         SafeWrite(otherSock, (char *)frame->decoded_y[y], Fsize_x);
  324.         }
  325.  
  326.         for (y = 0; y < Fsize_y / 2; y++) {            /* U */
  327.         SafeWrite(otherSock, (char *)frame->decoded_cb[y], Fsize_x / 2);
  328.         }
  329.  
  330.         for (y = 0; y < Fsize_y / 2; y++) {            /* V */
  331.         SafeWrite(otherSock, (char *)frame->decoded_cr[y], Fsize_x / 2);
  332.         }
  333.  
  334.         Frame_Free(frame);
  335.     } else if ( frameNumber == -4 ) {
  336.         /* routing output frame from socket to disk */
  337.         SafeRead(otherSock, (char *)buffer, 8);
  338.         frameNumber = buffer[0];
  339.         frameNumber = ntohl(frameNumber);
  340.  
  341.         /* read in number of bytes */
  342.         numBytes = buffer[1];
  343.         numBytes = ntohl(numBytes);
  344.  
  345.         /* make sure buffer is big enough for data */
  346.         if ( numBytes > bigBufferSize ) {
  347.         bigBufferSize = numBytes;
  348.         if ( bigBuffer != NULL ) {
  349.             free(bigBuffer);
  350.         }
  351.  
  352.         bigBuffer = (unsigned char *) malloc(bigBufferSize*
  353.                          sizeof(unsigned char));
  354.         }
  355.  
  356.         /* now read in the bytes */
  357.         SafeRead(otherSock, (char *) bigBuffer, numBytes);
  358.  
  359.         /* open file to output this stuff to */
  360.         sprintf(fileName, "%s.frame.%d", outputFileName, frameNumber);
  361. /* @@@ Open binary mode for OS/2, Andy Key */
  362. #ifdef OS2
  363.         if ( (filePtr = fopen(fileName, "wb")) == NULL ) {
  364. #else
  365.         if ( (filePtr = fopen(fileName, "w")) == NULL ) {
  366. #endif
  367.         fprintf(stderr, "ERROR:  Could not open output file:  %s\n",
  368.             fileName);
  369.         exit(1);
  370.         }
  371.  
  372.         /* now write the bytes here */
  373.         fwrite(bigBuffer, sizeof(char), numBytes, filePtr);
  374.  
  375.         fclose(filePtr);
  376.  
  377.         if ( debugSockets ) {
  378.         fprintf(stdout, "====I/O SERVER:  WROTE FRAME %d to disk\n",
  379.             frameNumber);
  380.         fflush(stdout);
  381.         }
  382.     } else {
  383.         if ( debugSockets ) {
  384.         fprintf(stdout, "I/O SERVER GETTING FRAME %d\n", frameNumber);
  385.         fflush(stdout);
  386.         }
  387.  
  388.         /* should read in frame, then write to socket */
  389.         frame = Frame_New(frameNumber, 'i');
  390.  
  391.         if ( separateConversion ) {
  392.         GetNthInputFileName(inputFileName, frameNumber);
  393.  
  394.         /* do conversion and send right to the socket */
  395.         filePtr = ReadIOConvert(inputFileName);
  396.             do {
  397.             numBytes = fread(smallBuffer, 1, 1000, filePtr);
  398.  
  399.             if ( numBytes > 0 ) {
  400.                 data = numBytes;
  401.                 data = htonl(data);
  402.                 SafeWrite(otherSock, (char *)&data, 4);
  403.                 SafeWrite(otherSock, (char *)smallBuffer, numBytes);
  404.             }
  405.             }
  406.             while ( numBytes == 1000 );
  407.  
  408.         if ( strcmp(ioConversion, "*") == 0 ) {
  409.             fclose(filePtr);
  410.         } else {
  411.             pclose(filePtr);
  412.         }
  413.         } else {
  414.         GetNthInputFileName(inputFileName, frameNumber);
  415.         ReadFrame(frame, inputFileName, inputConversion, TRUE);
  416.  
  417.         /* should now transmit yuv values */
  418.         for (y = 0; y < yuvHeight; y++) {            /* Y */
  419.             SafeWrite(otherSock, (char *)frame->orig_y[y], yuvWidth);
  420.         }
  421.  
  422.         for (y = 0; y < yuvHeight / 2; y++) {            /* U */
  423.             SafeWrite(otherSock, (char *)frame->orig_cb[y], yuvWidth / 2);
  424.         }
  425.  
  426.         for (y = 0; y < yuvHeight / 2; y++) {            /* V */
  427.             SafeWrite(otherSock, (char *)frame->orig_cr[y], yuvWidth / 2);
  428.         }
  429.  
  430. /* now, make sure we don't leave until other processor read everything */
  431.  
  432.         SafeRead(otherSock, (char *)buffer, 4);
  433.         /* should = 0 */
  434.         }
  435.  
  436.         if ( debugSockets ) {
  437.         fprintf(stdout, "====I/O SERVER:  READ FRAME %d\n",
  438.             frameNumber);
  439.         }
  440.  
  441.         Frame_Free(frame);
  442.     }
  443.  
  444.     close(otherSock);
  445.     }
  446.  
  447.     close(serverSocket);
  448.  
  449.     if ( debugSockets ) {
  450.     fprintf(stdout, "====I/O SERVER:  Shutting Down\n");
  451.     }
  452. }
  453.  
  454.  
  455. /*===========================================================================*
  456.  *
  457.  * SendRemoteFrame
  458.  *
  459.  *    called by a slave to the I/O server; sends an encoded frame
  460.  *    to the server to be sent to disk
  461.  *
  462.  * RETURNS:    nothing
  463.  *
  464.  * SIDE EFFECTS:    none
  465.  *
  466.  *===========================================================================*/
  467. void
  468. SendRemoteFrame(frameNumber, bb)
  469.     int frameNumber;
  470.     BitBucket *bb;
  471. {
  472.     int    clientSocket;
  473.     u_long  data;
  474.     int        negativeFour = -4;
  475.     time_t  tempTimeStart, tempTimeEnd;
  476.  
  477.     time(&tempTimeStart);
  478.  
  479.     clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);
  480.  
  481.     data = htonl(negativeFour);
  482.     SafeWrite(clientSocket, (char *)&data, 4);
  483.  
  484.     data = htonl(frameNumber);
  485.     SafeWrite(clientSocket, (char *)&data, 4);
  486.  
  487.     if ( frameNumber != -1 ) {
  488.     /* send number of bytes */
  489.     data = (bb->totalbits+7)/8;
  490.     data = htonl(data);
  491.     SafeWrite(clientSocket, (char *)&data, 4);
  492.  
  493.     /* now send the bytes themselves */
  494.     Bitio_WriteToSocket(bb, clientSocket);
  495.     }
  496.  
  497.     close(clientSocket);
  498.  
  499.     time(&tempTimeEnd);
  500.     IOtime += (tempTimeEnd-tempTimeStart);
  501. }
  502.  
  503.  
  504.  
  505.  
  506. /*===========================================================================*
  507.  *
  508.  * NoteFrameDone
  509.  *
  510.  *    called by a slave to the Combine server; tells it these frames are
  511.  *    done
  512.  *
  513.  * RETURNS:    nothing
  514.  *
  515.  * SIDE EFFECTS:    none
  516.  *
  517.  *===========================================================================*/
  518. void
  519. NoteFrameDone(frameStart, frameEnd)
  520.     int frameStart;
  521.     int frameEnd;
  522. {
  523.     int    clientSocket;
  524.     u_long  data;
  525.     int        negativeTwo = -2;
  526.     time_t  tempTimeStart, tempTimeEnd;
  527.  
  528.     time(&tempTimeStart);
  529.  
  530.     clientSocket = ConnectToSocket(IOhostName, combinePortNumber, &hostEntry);
  531.  
  532.     data = negativeTwo;
  533.     data = htonl(negativeTwo);
  534.     SafeWrite(clientSocket, (char *)&data, 4);
  535.  
  536.     data = htonl(frameStart);
  537.     SafeWrite(clientSocket, (char *)&data, 4);
  538.  
  539.     data = htonl(frameEnd);
  540.     SafeWrite(clientSocket, (char *)&data, 4);
  541.  
  542.     close(clientSocket);
  543.  
  544.     time(&tempTimeEnd);
  545.     IOtime += (tempTimeEnd-tempTimeStart);
  546. }
  547.  
  548.  
  549. /*===========================================================================*
  550.  *
  551.  * GetRemoteFrame
  552.  *
  553.  *    called by a slave; gets a remote frame from the I/O server
  554.  *
  555.  * RETURNS:    nothing
  556.  *
  557.  * SIDE EFFECTS:    none
  558.  *
  559.  *===========================================================================*/
  560. void
  561. GetRemoteFrame(frame, frameNumber)
  562.     MpegFrame *frame;
  563.     int frameNumber;
  564. {
  565.     FILE    *filePtr;
  566.     int    clientSocket;
  567.     unsigned char   smallBuffer[1000];
  568.     register int y;
  569.     int        numBytes;
  570.     u_long  data;
  571.     char    fileName[256];
  572.  
  573.     Fsize_Note(frameNumber, yuvWidth, yuvHeight);
  574.  
  575.     if ( debugSockets ) {
  576.     fprintf(stdout, "MACHINE %s REQUESTING connection for FRAME %d\n",
  577.         getenv("HOST"), frameNumber);
  578.     fflush(stdout);
  579.     }
  580.  
  581.     clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);
  582.  
  583.     data = frameNumber;
  584.     data = htonl(data);
  585.     SafeWrite(clientSocket, (char *)&data, 4);
  586.  
  587.     if ( frameNumber != -1 ) {
  588.     if ( separateConversion ) {
  589.         sprintf(fileName, "/tmp/foobar%d", machineNumber);
  590. /* @@@ Open binary mode for OS/2, Andy Key */
  591. #ifdef OS2
  592.         filePtr = fopen(fileName, "wb");
  593. #else
  594.         filePtr = fopen(fileName, "w");
  595. #endif
  596.  
  597.         /* read in stuff, SafeWrite to file, perform local conversion */
  598.         do {
  599.         SafeRead(clientSocket, (char *)&numBytes, 4);
  600.         numBytes = ntohl(numBytes);
  601.  
  602.         SafeRead(clientSocket, (char *)smallBuffer, numBytes);
  603.  
  604.         fwrite(smallBuffer, 1, numBytes, filePtr);
  605.         } while ( numBytes == 1000 );
  606.  
  607.         fclose(filePtr);
  608.  
  609.         /* now do slave conversion */
  610.         ReadFrame(frame, fileName, slaveConversion, FALSE);
  611.     } else {
  612.         Frame_AllocYCC(frame);
  613.  
  614.         if ( debugSockets ) {
  615.         fprintf(stdout, "MACHINE %s allocated YCC FRAME %d\n",
  616.             getenv("HOST"), frameNumber);
  617.         fflush(stdout);
  618.         }
  619.  
  620.         /* should now read yuv values */
  621.         for (y = 0; y < yuvHeight; y++) {            /* Y */
  622.         SafeRead(clientSocket, (char *)frame->orig_y[y], yuvWidth);
  623.         }
  624.  
  625.         for (y = 0; y < yuvHeight / 2; y++) {        /* U */
  626.         SafeRead(clientSocket, (char *)frame->orig_cb[y], yuvWidth/2);
  627.         }
  628.  
  629.         for (y = 0; y < yuvHeight / 2; y++) {        /* V */
  630.         SafeRead(clientSocket, (char *)frame->orig_cr[y], yuvWidth/2);
  631.         }
  632.     }
  633.     }
  634.  
  635.     data = 0;
  636.     data = htonl(data);
  637.     SafeWrite(clientSocket, (char *)&data, 4);
  638.  
  639.     close(clientSocket);
  640.  
  641.     if ( debugSockets ) {
  642.     fprintf(stdout, "MACHINE %s READ COMPLETELY FRAME %d\n",
  643.         getenv("HOST"), frameNumber);
  644.     fflush(stdout);
  645.     }
  646. }
  647.  
  648.  
  649. /*===========================================================================*
  650.  *
  651.  * StartCombineServer
  652.  *
  653.  *    start-up the CombineServer with this process
  654.  *    handles combination of frames, and exits
  655.  *    when master tells it to
  656.  *
  657.  * RETURNS:    nothing
  658.  *
  659.  * SIDE EFFECTS:    none
  660.  *
  661.  *===========================================================================*/
  662. void
  663. StartCombineServer(numInputFiles, outputFileName, parallelHostName, portNum)
  664.     int numInputFiles;
  665.     char *outputFileName;
  666.     char *parallelHostName;
  667.     int portNum;
  668. {
  669.     int        combinePortNum;
  670.     FILE    *ofp;
  671.  
  672.     /* once we get Combine port num, should transmit it to parallel server */
  673.  
  674.     outputServerSocket = CreateListeningSocket(&combinePortNum);
  675.  
  676.     if ( debugSockets ) {
  677.     fprintf(stdout, "====OUTPUT USING PORT %d\n", combinePortNum);
  678.     }
  679.  
  680.     TransmitPortNum(parallelHostName, portNum, combinePortNum);
  681.  
  682.     frameDone = (boolean *) malloc(numInputFiles*sizeof(boolean));
  683.     bzero((char *)frameDone, numInputFiles*sizeof(boolean));
  684.  
  685. /* @@@ Open binary mode for OS/2, Andy Key */
  686. #ifdef OS2
  687.     if ( (ofp = fopen(outputFileName, "wb")) == NULL ) {
  688. #else
  689.     if ( (ofp = fopen(outputFileName, "w")) == NULL ) {
  690. #endif
  691.     fprintf(stderr, "ERROR:  Could not open output file!\n");
  692.     fflush(stderr);
  693.     exit(1);
  694.     }
  695.     FramesToMPEG(numInputFiles, outputFileName, ofp, TRUE);
  696.  
  697.     if ( debugSockets ) {
  698.     fprintf(stdout, "====COMBINE SERVER:  Shutting Down\n");
  699.     fflush(stdout);
  700.     }
  701.  
  702.     /* tell Master server we are done */
  703.     TransmitPortNum(parallelHostName, portNum, combinePortNum);
  704.  
  705.     close(outputServerSocket);
  706. }
  707.  
  708.  
  709. /*===========================================================================*
  710.  *
  711.  * WaitForOutputFile
  712.  *
  713.  *    keep handling output events until we get the specified frame
  714.  *    number
  715.  *
  716.  * RETURNS:    nothing
  717.  *
  718.  * SIDE EFFECTS:    none
  719.  *
  720.  *===========================================================================*/
  721. void
  722. WaitForOutputFile(number)
  723.     int number;
  724. {
  725.     int        otherSock;
  726.     static int otherSize = sizeof(struct sockaddr);
  727.     struct sockaddr otherSocket;
  728.     int        frameNumber;
  729.     int32   buffer[8];
  730.     int frameStart, frameEnd;
  731.  
  732.     while ( ! frameDone[number] ) {
  733.     otherSock = accept(outputServerSocket, &otherSocket, &otherSize);
  734.     if ( otherSock == -1 ) {
  735.         fprintf(stderr, "ERROR:  Combine SERVER accept returned error %d\n", errno);
  736.         exit(1);
  737.     }
  738.  
  739.     SafeRead(otherSock, (char *)buffer, 4);
  740.     frameNumber = ntohl(buffer[0]);
  741.  
  742.     if ( frameNumber == -2 ) {
  743.         /* this is notification from non-remote process that a frame is done */
  744.  
  745.         SafeRead(otherSock, (char *)buffer, 8);
  746.         frameStart = buffer[0];
  747.         frameStart = ntohl(frameStart);
  748.         frameEnd = buffer[1];
  749.         frameEnd = ntohl(frameEnd);
  750.  
  751.         for ( frameNumber = frameStart; frameNumber <= frameEnd;
  752.           frameNumber++ ) {
  753.         frameDone[frameNumber] = TRUE;
  754.         }
  755.     }
  756.  
  757.     close(otherSock);
  758.     }
  759.  
  760.     if ( debugSockets ) {
  761.     fprintf(stdout, "WAIT FOR FRAME %d over\n", number);
  762.     fflush(stdout);
  763.     }
  764. }
  765.  
  766.  
  767.             /*=====================*
  768.              * MASTER SERVER STUFF *
  769.              *=====================*/
  770.  
  771.  
  772. /*===========================================================================*
  773.  *
  774.  * StartMasterServer
  775.  *
  776.  *    start the master server with this process
  777.  *
  778.  * RETURNS:    nothing
  779.  *
  780.  * SIDE EFFECTS:    none
  781.  *
  782.  *===========================================================================*/
  783. void
  784. StartMasterServer(numInputFiles, paramFile, outputFileName)
  785.     int numInputFiles;
  786.     char *paramFile;
  787.     char *outputFileName;
  788. {
  789.     FILE    *filePtr;
  790.     register int index, index2;
  791.     int        framesPerMachine;
  792.     char    command[1024];
  793.     char    *hostName;
  794.     int        portNum;
  795.     int        serverSocket;
  796.     boolean finished[MAX_MACHINES];
  797.     int        numFinished;
  798.     int        otherSock, otherSize;
  799.     struct sockaddr otherSocket;
  800.     int        seconds;
  801.     int32   buffer[8];
  802.     int ioPortNum[MAX_IO_SERVERS];
  803.     int        combinePortNum, decodePortNum;
  804.     int        nextFrame;
  805.     int        numFrames[MAX_MACHINES];
  806.     int        lastNumFrames[MAX_MACHINES];
  807.     int        numSeconds[MAX_MACHINES];
  808.     float   framesPerSecond;
  809.     float   totalFPS, localFPS;
  810.     int        framesDone;
  811.     float   avgFPS;
  812.     char    niceNess[256];
  813.     int32   startFrame, endFrame;
  814.     int numInputPorts = 0;
  815.     int    numRemote = SOMAXCONN;
  816.     int totalRemote = 0;
  817.     time_t  startUpBegin, startUpEnd;
  818.     time_t  shutDownBegin, shutDownEnd;
  819.  
  820.     time(&startUpBegin);
  821.  
  822.     if ( niceProcesses ) {
  823.     sprintf(niceNess, "nice");
  824.     } else {
  825.     niceNess[0] = '\0';
  826.     }
  827.  
  828.     time(&timeStart);
  829.  
  830.     PrintStartStats(-1, 0);
  831.  
  832.     /* create a server socket */
  833.     hostName = getenv("HOST");
  834.  
  835.     if ( hostName == NULL ) {
  836.     fprintf(stderr, "ERROR:  Set HOST environment variable\n");
  837.     exit(1);
  838.     }
  839.  
  840.     hostEntry = gethostbyname(hostName);
  841.     if ( hostEntry == NULL ) {
  842.     fprintf(stderr, "ERROR:  Could not find host %s in database\n",
  843.         hostName);
  844.     exit(1);
  845.     }
  846.  
  847.     hostName = hostEntry->h_name;
  848.  
  849.     serverSocket = CreateListeningSocket(&portNum);
  850.     if ( debugSockets ) {
  851.     fprintf(stdout, "---USING PORT %d\n", portNum);
  852.     }
  853.  
  854.     /* START COMBINE SERVER */
  855.     sprintf(command, "mpeg_encode -max_machines %d -output_server %s %d %d %s &",
  856.         numMachines, hostName, portNum, numInputFiles, paramFile);
  857.     system(command);    /* should do an exec? */
  858.  
  859.     /* should now listen for connection from Combine server */
  860.     otherSize = sizeof(otherSocket);
  861.     otherSock = accept(serverSocket, &otherSocket, &otherSize);
  862.     if ( otherSock == -1 ) {
  863.     fprintf(stderr, "ERROR:  MASTER SERVER accept returned error %d\n", errno);
  864.     exit(1);
  865.     }
  866.  
  867.     SafeRead(otherSock, (char *)(&combinePortNum), 4);
  868.     combinePortNum = ntohl(combinePortNum);
  869.     close(otherSock);
  870.  
  871.     if ( debugSockets ) {
  872.     fprintf(stdout, "---MASTER SERVER:  Combine port number = %d\n",
  873.         combinePortNum);
  874.     }
  875.  
  876.     /* START DECODE SERVER if necessary */
  877.     if ( referenceFrame == DECODED_FRAME ) {
  878.     sprintf(command, "mpeg_encode -max_machines %d -decode_server %s %d %d %s &",
  879.         numMachines, hostName, portNum, numInputFiles, paramFile);
  880.     system(command);    /* should do an exec? */
  881.  
  882.     /* should now listen for connection from Decode server */
  883.     otherSize = sizeof(otherSocket);
  884.     otherSock = accept(serverSocket, &otherSocket, &otherSize);
  885.     if ( otherSock == -1 ) {
  886.         fprintf(stderr, "ERROR:  MASTER SERVER accept returned error %d\n", errno);
  887.         exit(1);
  888.     }
  889.  
  890.     SafeRead(otherSock, (char *)(&decodePortNum), 4);
  891.     decodePortNum = ntohl(decodePortNum);
  892.     close(otherSock);
  893.  
  894.     if ( debugSockets ) {
  895.         fprintf(stdout, "---MASTER SERVER:  Decode port number = %d\n",
  896.             decodePortNum);
  897.     }
  898.     }
  899.  
  900.     /* we are doing whole thing (if not, see above) */
  901.  
  902.     framesPerMachine = numInputFiles/numMachines;
  903.  
  904.     numFinished = 0;
  905.  
  906.     /* count number of remote machines */
  907.     for ( index = 0; index < numMachines; index++ ) {
  908.     if ( remote[index] ) {
  909.         totalRemote++;
  910.     }
  911.     }
  912.  
  913.     /* DO INITIAL TIME TESTS */
  914.     nextFrame = 0;
  915.     for ( index = 0; index < numMachines; index++ ) {
  916.     if ( (totalRemote != 0) && (numRemote == SOMAXCONN) ) {
  917.         /* Create an I/O server */
  918.         sprintf(command, "mpeg_encode -max_machines %d -io_server %s %d %s &",
  919.             numMachines, hostName, portNum, paramFile);
  920.         system(command);    /* should do an exec? */
  921.  
  922.         /* should now listen for connection from I/O server */
  923.         otherSize = sizeof(otherSocket);
  924.         otherSock = accept(serverSocket, &otherSocket, &otherSize);
  925.         if ( otherSock == -1 ) {
  926.         fprintf(stderr, "ERROR:  MASTER SERVER accept returned error %d\n", errno);
  927.         exit(1);
  928.         }
  929.  
  930.         SafeRead(otherSock, (char *)(&ioPortNum[numInputPorts]), 4);
  931.         ioPortNum[numInputPorts] = ntohl(ioPortNum[numInputPorts]);
  932.         close(otherSock);
  933.  
  934.         if ( debugSockets ) {
  935.         fprintf(stdout, "---MASTER SERVER:  I/O port number = %d\n",
  936.             ioPortNum[numInputPorts]);
  937.         }
  938.  
  939.         numInputPorts++;
  940.         numRemote = 0;
  941.     }
  942.  
  943.     finished[index] = FALSE;
  944.     numSeconds[index] = 0;
  945.  
  946.     startFrame = nextFrame;
  947.     if ( parallelPerfect ) {
  948.         endFrame = startFrame+((numInputFiles-startFrame)/
  949.                    (numMachines-index))  -1;
  950.  
  951.         if ( forceIalign ) {
  952.         /* round to nearest pattern length */
  953.         endFrame = startFrame-1+framePatternLen*
  954.                 ((endFrame-startFrame+1+framePatternLen/2)/
  955.                  framePatternLen);
  956.         if ( endFrame == startFrame-1 ) {
  957.             endFrame = startFrame-1+framePatternLen;
  958.         }
  959.         }
  960.  
  961.         /* always give at least 1 frame */
  962.         if ( endFrame < startFrame ) {
  963.         endFrame = startFrame;
  964.         }
  965.  
  966.         /* make sure not out of bounds */
  967.         if ( endFrame >= numInputFiles ) {
  968.         endFrame = numInputFiles-1;
  969.         }
  970.     } else if ( forceIalign ) {
  971.         endFrame = startFrame+framePatternLen-1;
  972.     } else {
  973.         endFrame = startFrame+parallelTestFrames-1;
  974.     }
  975.         
  976.     if ( remote[index] ) {
  977.         sprintf(command, "%s %s -l %s %s %s -child %s %d %d %d %d %d %d -frames %d %d %s &",
  978.             rsh,
  979.             machineName[index], userName[index], niceNess,
  980.             executable[index],
  981.             hostName, portNum, ioPortNum[numInputPorts-1],
  982.             combinePortNum, decodePortNum, index,
  983.             remote[index],
  984.             startFrame, endFrame,
  985.             remoteParamFile[index]);
  986.         numRemote++;
  987.         totalRemote--;
  988.     } else {
  989.         sprintf(command, "%s %s -l %s %s %s -child %s %d %d %d %d %d %d -frames %d %d %s &",
  990.             rsh,
  991.             machineName[index], userName[index], niceNess,
  992.             executable[index],
  993.             hostName, portNum, ioPortNum[numInputPorts-1],
  994.             combinePortNum, decodePortNum, index,
  995.             remote[index],
  996.             startFrame, endFrame,
  997.             paramFile);
  998.     }
  999.  
  1000.     if ( debugMachines ) {
  1001.         fprintf(stdout, "---%s:  frames %d to %d\n",
  1002.             machineName[index],
  1003.             startFrame, endFrame);
  1004.     }
  1005.     system(command);    /* should do an exec? */
  1006.  
  1007.     nextFrame = endFrame+1;
  1008.     numFrames[index] = endFrame-startFrame+1;
  1009.     lastNumFrames[index] = endFrame-startFrame+1;
  1010.     }
  1011.  
  1012.     framesDone = 0;
  1013.  
  1014.     time(&startUpEnd);
  1015.  
  1016.     /* now, wait for other processes to finish and boss them around */
  1017.     while ( numFinished != numMachines ) {
  1018.     otherSize = sizeof(otherSocket);
  1019.     otherSock = accept(serverSocket, &otherSocket, &otherSize);
  1020.     if ( otherSock == -1 ) {
  1021.         fprintf(stderr, "ERROR:  MASTER SERVER 2 accept returned error %d\n", errno);
  1022.         exit(1);
  1023.     }
  1024.  
  1025.     SafeRead(otherSock, (char *)buffer, 8);
  1026.  
  1027.     index = ntohl(buffer[0]);
  1028.     seconds = ntohl(buffer[1]);
  1029.  
  1030.     numSeconds[index] += seconds;
  1031.  
  1032.     if ( seconds != 0 )
  1033.         framesPerSecond = (float)lastNumFrames[index]/(float)seconds;
  1034.     else
  1035.         framesPerSecond = (float)lastNumFrames[index]*2.0;
  1036.  
  1037.     framesDone += lastNumFrames[index];
  1038.  
  1039.     if ( nextFrame >= numInputFiles ) {
  1040.         buffer[0] = htonl(-1);
  1041.         buffer[1] = htonl(0);
  1042.         SafeWrite(otherSock, (char *)buffer, 8);
  1043.         numFinished++;
  1044.  
  1045.         if ( debugMachines ) {
  1046.         fprintf(stdout, "---%s FINISHED job (%f fps) (%d/%d done):  DONE\n",
  1047.             machineName[index], framesPerSecond, numFinished,
  1048.             numMachines);
  1049.         }
  1050.     } else {
  1051.         avgFPS = (float)numFrames[index]/(float)numSeconds[index];
  1052.  
  1053.         startFrame = nextFrame;
  1054.         endFrame = nextFrame +
  1055.                 (int)((float)parallelTimeChunks*avgFPS) - 1;
  1056.  
  1057.         if ( forceIalign ) {
  1058.         /* round to nearest pattern length */
  1059.         endFrame = startFrame-1+framePatternLen*
  1060.                 ((endFrame-startFrame+1+framePatternLen/2)/
  1061.                  framePatternLen);
  1062.         if ( endFrame == startFrame-1 ) {
  1063.             endFrame = startFrame-1+framePatternLen;
  1064.         }
  1065.         }
  1066.  
  1067.         if ( endFrame < startFrame ) {   /* always give at least 1 frame */
  1068.         endFrame = startFrame;
  1069.         }
  1070.         if ( endFrame >= numInputFiles ) {
  1071.         endFrame = numInputFiles-1;
  1072.         }
  1073.  
  1074.         nextFrame = endFrame+1;
  1075.  
  1076.         numFrames[index] += (endFrame-startFrame+1);
  1077.         lastNumFrames[index] = (endFrame-startFrame+1);
  1078.  
  1079.         if ( debugMachines ) {
  1080.         fprintf(stdout, "---%s FINISHED job (%f fps) (%d/%d done):  next:  %d to %d\n",
  1081.             machineName[index], framesPerSecond, numFinished,
  1082.             numMachines, startFrame, endFrame);
  1083.         }
  1084.  
  1085.         buffer[0] = htonl(startFrame);
  1086.         buffer[1] = htonl(endFrame);
  1087.  
  1088.         SafeWrite(otherSock, (char *)buffer, 8);
  1089.     }
  1090.  
  1091.     close(otherSock);
  1092.  
  1093.     if ( debugMachines ) {
  1094.         fprintf(stdout, "---FRAMES DONE:  %d\tFARMED OUT:  %d\tLEFT:  %d\n",
  1095.             framesDone, nextFrame-framesDone, numInputFiles-nextFrame);
  1096.     }
  1097.     }
  1098.  
  1099.     time(&shutDownBegin);
  1100.  
  1101.     /* end all input servers */
  1102.     IOhostName = hostName;
  1103.     for ( index = 0; index < numInputPorts; index++ ) {
  1104.     ioPortNumber = ioPortNum[index];
  1105.     EndIOServer();
  1106.     }
  1107.  
  1108.     /* now wait for CombineServer to tell us they're done */
  1109.     otherSize = sizeof(otherSocket);
  1110.     otherSock = accept(serverSocket, &otherSocket, &otherSize);
  1111.     if ( otherSock == -1 ) {
  1112.     fprintf(stderr, "ERROR:  MASTER SERVER accept returned error %d\n", errno);
  1113.     exit(1);
  1114.     }
  1115.  
  1116.     SafeRead(otherSock, (char *)buffer, 4);
  1117.     close(otherSock);
  1118.     
  1119.     close(serverSocket);
  1120.  
  1121.     time(&timeEnd);
  1122.     diffTime = (int32)(timeEnd-timeStart);
  1123.  
  1124.     time(&shutDownEnd);
  1125.  
  1126.     for ( index2 = 0; index2 < 2; index2++ ) {
  1127.     if ( index2 == 0 ) {
  1128.         filePtr = stdout;
  1129.     } else if ( statFile != NULL ) {
  1130.         filePtr = statFile;
  1131.     } else {
  1132.         continue;
  1133.     }
  1134.  
  1135.     fprintf(filePtr, "\n\n");
  1136.     fprintf(filePtr, "PARALLEL SUMMARY\n");
  1137.     fprintf(filePtr, "----------------\n");
  1138.     fprintf(filePtr, "\n");
  1139.     fprintf(filePtr, "START UP TIME:  %d seconds\n",
  1140.         startUpEnd-startUpBegin);
  1141.     fprintf(filePtr, "SHUT DOWN TIME:  %d seconds\n",
  1142.         shutDownEnd-shutDownBegin);
  1143.  
  1144.     fprintf(filePtr, "%14s\tFrames\tSeconds\tFrames Per Second\tSelf Time\n",
  1145.         "MACHINE");
  1146.     fprintf(filePtr, "--------------\t------\t-------\t-----------------\t---------\n");
  1147.     totalFPS = 0.0;
  1148.     for ( index = 0; index < numMachines; index++ ) {
  1149.         localFPS = (float)numFrames[index]/(float)numSeconds[index];
  1150.         fprintf(filePtr, "%14s\t%d\t%d\t%f\t\t%d\n",
  1151.             machineName[index], numFrames[index], numSeconds[index],
  1152.             localFPS, (int)((float)numInputFiles/localFPS));
  1153.         totalFPS += localFPS;
  1154.     }
  1155.  
  1156.     fprintf(filePtr, "--------------\t------\t-------\t-----------------\t---------\n");
  1157.  
  1158.     fprintf(filePtr, "%14s\t\t%d\t%f\n", "OPTIMAL", 
  1159.         (int)((float)numInputFiles/totalFPS),
  1160.         totalFPS);
  1161.     fprintf(filePtr, "%14s\t\t%d\t%f\n", "ACTUAL", diffTime, 
  1162.         (float)numInputFiles/(float)diffTime);
  1163.  
  1164.     fprintf(filePtr, "\n\n");
  1165.     }
  1166.  
  1167.     if ( statFile != NULL ) {
  1168.     fclose(statFile);
  1169.     }
  1170. }
  1171.  
  1172.  
  1173. /*===========================================================================*
  1174.  *
  1175.  * NotifyMasterDone
  1176.  *
  1177.  *    called by a slave process; tells the master process it is done
  1178.  *
  1179.  * RETURNS:    nothing
  1180.  *
  1181.  * SIDE EFFECTS:    none
  1182.  *
  1183.  *===========================================================================*/
  1184. boolean
  1185. NotifyMasterDone(hostName, portNum, machineNumber, seconds, frameStart,
  1186.          frameEnd)
  1187.     char *hostName;
  1188.     int portNum;
  1189.     int machineNumber;
  1190.     int seconds;
  1191.     int *frameStart;
  1192.     int *frameEnd;
  1193. {
  1194.     int    clientSocket;
  1195.     int32   buffer[8];
  1196.     time_t  tempTimeStart, tempTimeEnd;
  1197.  
  1198.     time(&tempTimeStart);
  1199.  
  1200.     clientSocket = ConnectToSocket(hostName, portNum, &hostEntry);
  1201.  
  1202.     buffer[0] = htonl(machineNumber);
  1203.     buffer[1] = htonl(seconds);
  1204.  
  1205.     SafeWrite(clientSocket, (char *)buffer, 8);
  1206.  
  1207.     SafeRead(clientSocket, (char *)buffer, 8);
  1208.     *frameStart = ntohl(buffer[0]);
  1209.     *frameEnd = ntohl(buffer[1]);
  1210.  
  1211.     close(clientSocket);
  1212.  
  1213.     time(&tempTimeEnd);
  1214.     IOtime += (tempTimeEnd-tempTimeStart);
  1215.  
  1216.     return ((*frameStart) >= 0);
  1217. }
  1218.  
  1219.  
  1220. /*===========================================================================*
  1221.  *
  1222.  * StartDecodeServer
  1223.  *
  1224.  *    start-up the DecodeServer with this process
  1225.  *    handles transfer of decoded frames to/from processes, and exits
  1226.  *    when master tells it to
  1227.  *    this is necessary only if referenceFrame == DECODED_FRAME
  1228.  *
  1229.  * RETURNS:    nothing
  1230.  *
  1231.  * SIDE EFFECTS:    none
  1232.  *
  1233.  *===========================================================================*/
  1234. void
  1235. StartDecodeServer(numInputFiles, decodeFileName, parallelHostName, portNum)
  1236.     int numInputFiles;
  1237.     char *decodeFileName;
  1238.     char *parallelHostName;
  1239.     int portNum;
  1240. {
  1241.     int        otherSock, otherSize;
  1242.     struct sockaddr otherSocket;
  1243.     int        decodePortNum;
  1244.     int32   buffer[8];
  1245.     int        frameReady;
  1246.     boolean *ready;
  1247.     int        *waitMachine;
  1248.     int        *waitPort;
  1249.     int        *waitList;
  1250.     int        slaveNumber;
  1251.     int        slavePort;
  1252.     int        waitPtr;
  1253.     struct hostent *nullHost = NULL;
  1254.     int        clientSocket;
  1255.  
  1256.     /* should keep list of port numbers to notify when frames become ready */
  1257.  
  1258.     ready = (boolean *) calloc(numInputFiles, sizeof(boolean));
  1259.     waitMachine = (int *) calloc(numInputFiles, sizeof(int));
  1260.     waitPort = (int *) malloc(numMachines*sizeof(int));
  1261.     waitList = (int *) calloc(numMachines, sizeof(int));
  1262.  
  1263.     /* once we get Decode port num, should transmit it to parallel server */
  1264.  
  1265.     decodeServerSocket = CreateListeningSocket(&decodePortNum);
  1266.  
  1267.     if ( debugSockets ) {
  1268.     fprintf(stdout, "====DECODE USING PORT %d\n", decodePortNum);
  1269.     }
  1270.  
  1271.     TransmitPortNum(parallelHostName, portNum, decodePortNum);
  1272.  
  1273.     frameDone = (boolean *) malloc(numInputFiles*sizeof(boolean));
  1274.     bzero((char *)frameDone, numInputFiles*sizeof(boolean));
  1275.  
  1276.     /* wait for ready signals and requests */
  1277.     while ( TRUE ) {
  1278.     otherSize = sizeof(otherSocket);
  1279.     otherSock = accept(decodeServerSocket, &otherSocket, &otherSize);
  1280.     if ( otherSock == -1 ) {
  1281.         fprintf(stderr, "ERROR:  DECODE SERVER accept returned error %d\n", errno);
  1282.         exit(1);
  1283.     }
  1284.  
  1285.     SafeRead(otherSock, (char *)buffer, 4);
  1286.     frameReady = buffer[0];
  1287.     frameReady = ntohl(frameReady);
  1288.  
  1289.     if ( frameReady == -2 ) {
  1290.         SafeRead(otherSock, (char *)buffer, 4);
  1291.         frameReady = buffer[0];
  1292.         frameReady = ntohl(frameReady);
  1293.  
  1294.         if ( debugSockets ) {
  1295.         fprintf(stdout, "====DECODE SERVER:  REQUEST FOR %d\n", frameReady);
  1296.         fflush(stdout);        
  1297.         }
  1298.  
  1299.         /* now respond if it's ready yet */
  1300.         buffer[0] = frameDone[frameReady];
  1301.         buffer[0] = htonl(buffer[0]);
  1302.         SafeWrite(otherSock, (char *)buffer, 4);
  1303.  
  1304.         if ( ! frameDone[frameReady] ) {
  1305.         /* read machine number, port number */
  1306.         SafeRead(otherSock, (char *)buffer, 8);
  1307.         slaveNumber = buffer[0];
  1308.         slaveNumber = ntohl(slaveNumber);
  1309.         slavePort = buffer[1];
  1310.         slavePort = ntohl(slavePort);
  1311.  
  1312.         if ( debugSockets ) {
  1313.             fprintf(stdout, "WAITING:  SLAVE %d, PORT %d\n",
  1314.                 slaveNumber, slavePort);
  1315.         }
  1316.  
  1317.         waitPort[slaveNumber] = slavePort;
  1318.         if ( waitMachine[frameReady] == 0 ) {
  1319.             waitMachine[frameReady] = slaveNumber+1;
  1320.         } else {
  1321.             /* someone already waiting for this frame */
  1322.             /* follow list of waiters to the end */
  1323.             waitPtr = waitMachine[frameReady]-1;
  1324.             while ( waitList[waitPtr] != 0 ) {
  1325.             waitPtr = waitList[waitPtr]-1;
  1326.             }
  1327.  
  1328.             waitList[waitPtr] = slaveNumber+1;
  1329.             waitList[slaveNumber] = 0;
  1330.         }
  1331.         }
  1332.     } else {
  1333.         frameDone[frameReady] = TRUE;
  1334.  
  1335.         if ( debugSockets ) {
  1336.         fprintf(stdout, "====DECODE SERVER:  FRAME %d READY\n", frameReady);
  1337.         fflush(stdout);
  1338.         }
  1339.  
  1340.         if ( waitMachine[frameReady] ) {
  1341.         /* need to notify one or more machines it's ready */
  1342.         waitPtr = waitMachine[frameReady]-1;
  1343.         while ( waitPtr >= 0 ) {
  1344.             clientSocket = ConnectToSocket(machineName[waitPtr],
  1345.                            waitPort[waitPtr],
  1346.                            &nullHost);
  1347.             close(clientSocket);
  1348.             waitPtr = waitList[waitPtr]-1;
  1349.         }
  1350.         }
  1351.     }
  1352.  
  1353.     close(otherSock);
  1354.     }
  1355.  
  1356.     if ( debugSockets ) {
  1357.     fprintf(stdout, "====DECODE SERVER:  Shutting Down\n");
  1358.     fflush(stdout);
  1359.     }
  1360.  
  1361.     /* tell Master server we are done */
  1362.     TransmitPortNum(parallelHostName, portNum, decodePortNum);
  1363.  
  1364.     close(decodeServerSocket);
  1365. }
  1366.  
  1367.  
  1368. /*=====================*
  1369.  * INTERNAL PROCEDURES *
  1370.  *=====================*/
  1371.  
  1372.  
  1373. /*===========================================================================*
  1374.  *
  1375.  * TransmitPortNum
  1376.  *
  1377.  *    called by the I/O or Combine server; transmits the appropriate
  1378.  *    port number to the master
  1379.  *
  1380.  * RETURNS:    nothing
  1381.  *
  1382.  * SIDE EFFECTS:    none
  1383.  *
  1384.  *===========================================================================*/
  1385. static void
  1386. TransmitPortNum(hostName, portNum, newPortNum)
  1387.     char *hostName;
  1388.     int portNum;
  1389.     int newPortNum;
  1390. {
  1391.     int    clientSocket;
  1392.     u_long  data;
  1393.  
  1394.     clientSocket = ConnectToSocket(hostName, portNum, &hostEntry);
  1395.  
  1396.     data = htonl(newPortNum);
  1397.     SafeWrite(clientSocket, (char *) &data, 4);
  1398.  
  1399.     close(clientSocket);
  1400. }
  1401.  
  1402.  
  1403. /*===========================================================================*
  1404.  *
  1405.  * SafeRead
  1406.  *
  1407.  *    safely read from the given socket; the procedure keeps reading until
  1408.  *    it gets the number of bytes specified
  1409.  *
  1410.  * RETURNS:    nothing
  1411.  *
  1412.  * SIDE EFFECTS:    none
  1413.  *
  1414.  *===========================================================================*/
  1415. static void
  1416. SafeRead(fd, buf, nbyte)
  1417.     int fd;
  1418.     char *buf;
  1419.     int nbyte;
  1420. {
  1421.     int numRead;
  1422.     int result;
  1423.  
  1424.     numRead = 0;
  1425.  
  1426.     while ( numRead != nbyte ) {
  1427.         result = read(fd, &buf[numRead], nbyte-numRead);
  1428.  
  1429.     if ( result == -1 ) {
  1430.         fprintf(stderr, "ERROR:  read (of %d bytes (total %d) ) returned error %d\n",
  1431.             nbyte-numRead, nbyte, errno);
  1432.         exit(1);
  1433.     }
  1434.     numRead += result;
  1435.     }
  1436. }
  1437.  
  1438.  
  1439. /*===========================================================================*
  1440.  *
  1441.  * SafeWrite
  1442.  *
  1443.  *    safely write to the given socket; the procedure keeps writing until
  1444.  *    it sends the number of bytes specified
  1445.  *
  1446.  * RETURNS:    nothing
  1447.  *
  1448.  * SIDE EFFECTS:    none
  1449.  *
  1450.  *===========================================================================*/
  1451. static void
  1452. SafeWrite(fd, buf, nbyte)
  1453.     int fd;
  1454.     char *buf;
  1455.     int nbyte;
  1456. {
  1457.     int numWritten;
  1458.     int result;
  1459.  
  1460.     numWritten = 0;
  1461.  
  1462.     while ( numWritten != nbyte ) {
  1463.         result = write(fd, &buf[numWritten], nbyte-numWritten);
  1464.  
  1465.     if ( result == -1 ) {
  1466.         fprintf(stderr, "ERROR:  read (of %d bytes (total %d) ) returned error %d\n",
  1467.             nbyte-numWritten, nbyte, errno);
  1468.         exit(1);
  1469.     }
  1470.     numWritten += result;
  1471.     }
  1472. }
  1473.  
  1474.  
  1475. /*===========================================================================*
  1476.  *
  1477.  * EndIOServer
  1478.  *
  1479.  *    called by the master process -- tells the I/O server to commit
  1480.  *    suicide
  1481.  *
  1482.  * RETURNS:    nothing
  1483.  *
  1484.  * SIDE EFFECTS:    none
  1485.  *
  1486.  *===========================================================================*/
  1487. static void
  1488. EndIOServer()
  1489. {
  1490.     /* send signal to IO server:  -1 as frame number */
  1491.     GetRemoteFrame(NULL, -1);
  1492. }
  1493.  
  1494.  
  1495. /*===========================================================================*
  1496.  *
  1497.  * NotifyDecodeServerReady
  1498.  *
  1499.  *    called by a slave to the Decode Server to tell it a decoded frame
  1500.  *    is ready and waiting
  1501.  *
  1502.  * RETURNS:    nothing
  1503.  *
  1504.  * SIDE EFFECTS:    none
  1505.  *
  1506.  *===========================================================================*/
  1507. void
  1508. NotifyDecodeServerReady(id)
  1509.     int id;
  1510. {
  1511.     int    clientSocket;
  1512.     u_long  data;
  1513.     time_t  tempTimeStart, tempTimeEnd;
  1514.  
  1515.     time(&tempTimeStart);
  1516.  
  1517.     clientSocket = ConnectToSocket(IOhostName, decodePortNumber, &hostEntry);
  1518.  
  1519.     data = htonl(id);
  1520.     SafeWrite(clientSocket, (char *)&data, 4);
  1521.  
  1522.     close(clientSocket);
  1523.  
  1524.     time(&tempTimeEnd);
  1525.     IOtime += (tempTimeEnd-tempTimeStart);
  1526. }
  1527.  
  1528.  
  1529. /*===========================================================================*
  1530.  *
  1531.  * WaitForDecodedFrame
  1532.  *
  1533.  *    blah blah blah
  1534.  *
  1535.  * RETURNS:    nothing
  1536.  *
  1537.  * SIDE EFFECTS:    none
  1538.  *
  1539.  *===========================================================================*/
  1540. void
  1541. WaitForDecodedFrame(id)
  1542.     int id;
  1543. {
  1544.     int    clientSocket;
  1545.     u_long  data;
  1546.     int        negativeTwo = -2;
  1547.     int     ready;
  1548.  
  1549.     /* wait for a decoded frame */
  1550.     if ( debugSockets ) {
  1551.     fprintf(stdout, "WAITING FOR DECODED FRAME %d\n", id);
  1552.     }
  1553.  
  1554.     clientSocket = ConnectToSocket(IOhostName, decodePortNumber, &hostEntry);
  1555.  
  1556.     /* first, tell DecodeServer we're waiting for this frame */
  1557.     data = negativeTwo;
  1558.     data = htonl(negativeTwo);
  1559.     SafeWrite(clientSocket, (char *)&data, 4);
  1560.  
  1561.     data = htonl(id);
  1562.     SafeWrite(clientSocket, (char *)&data, 4);
  1563.  
  1564.     SafeRead(clientSocket, (char *)&data, 4);
  1565.     ready = data;
  1566.     ready = ntohl(ready);
  1567.  
  1568.     if ( ! ready ) {
  1569.     int        waitSocket;
  1570.     int        waitPort;
  1571.     int        otherSock, otherSize;
  1572.     struct sockaddr otherSocket;
  1573.  
  1574.     /* it's not ready; set up a connection and wait for decode server */
  1575.     waitSocket = CreateListeningSocket(&waitPort);
  1576.  
  1577.         /* tell decode server where we are */
  1578.         data = machineNumber;
  1579.         data = ntohl(data);
  1580.         SafeWrite(clientSocket, (char *)&data, 4);
  1581.  
  1582.         data = waitPort;
  1583.         data = ntohl(data);
  1584.         SafeWrite(clientSocket, (char *)&data, 4);
  1585.  
  1586.         close(clientSocket);
  1587.  
  1588.         if ( debugSockets ) {
  1589.         fprintf(stdout, "SLAVE:  WAITING ON SOCKET %d\n", waitPort);
  1590.         fflush(stdout);
  1591.         }
  1592.  
  1593.         otherSize = sizeof(otherSocket);
  1594.         otherSock = accept(waitSocket, &otherSocket, &otherSize);
  1595.         if ( otherSock == -1 ) {
  1596.         fprintf(stderr, "ERROR:  I/O SERVER accept returned error %d\n", errno);
  1597.         exit(1);
  1598.         }
  1599.  
  1600.         /* should we verify this is decode server? */
  1601.         /* for now, we won't */
  1602.  
  1603.         close(otherSock);
  1604.  
  1605.     close(waitSocket);
  1606.     } else {
  1607.     close(clientSocket);
  1608.     }
  1609.  
  1610.     if ( debugSockets ) {
  1611.     fprintf(stdout, "YE-HA FRAME %d IS NOW READY\n", id);
  1612.     }
  1613. }
  1614.  
  1615.  
  1616. /*===========================================================================*
  1617.  *
  1618.  * CreateListeningSocket
  1619.  *
  1620.  *    create a socket, using the first unused port number we can find
  1621.  *
  1622.  * RETURNS:    the socket; portNumber is modified appropriately
  1623.  *
  1624.  * SIDE EFFECTS:    none
  1625.  *
  1626.  *===========================================================================*/
  1627. static int
  1628. CreateListeningSocket(portNumber)
  1629.     int *portNumber;
  1630. {
  1631.     int        resultSocket;
  1632.     u_short tempShort;
  1633.     int        result;
  1634.     struct sockaddr_in    nameEntry;
  1635.  
  1636.     resultSocket = socket(AF_INET, SOCK_STREAM, 0);
  1637.     if ( resultSocket == -1 ) {
  1638.     fprintf(stderr, "ERROR:  Call to socket() gave error %d\n", errno);
  1639.     exit(1);
  1640.     }
  1641.  
  1642.     bzero((char *) &nameEntry, sizeof(nameEntry));
  1643.     nameEntry.sin_family = AF_INET;
  1644.  
  1645.     /* find a port number that isn't used */
  1646.     (*portNumber) = 2048;
  1647.     do {
  1648.     (*portNumber)++;
  1649.     tempShort = (*portNumber);
  1650.     nameEntry.sin_port = htons(tempShort);
  1651.     result = bind(resultSocket, (struct sockaddr *) &nameEntry,
  1652.               sizeof(struct sockaddr));
  1653.     }
  1654.     while ( result == -1 );
  1655.  
  1656.     /* would really like to wait for 1+numMachines machines, but this is max
  1657.      * allowable, unfortunately
  1658.      */
  1659.     result = listen(resultSocket, SOMAXCONN);
  1660.     if ( result == -1 ) {
  1661.     fprintf(stderr, "ERROR:  call to listen() gave error %d\n", errno);
  1662.     exit(1);
  1663.     }
  1664.  
  1665.     return resultSocket;
  1666. }
  1667.  
  1668.  
  1669. /*===========================================================================*
  1670.  *
  1671.  * ConnectToSocket
  1672.  *
  1673.  *    creates a socket and connects it to the specified socket
  1674.  *    hostEnt either is the host entry, or is NULL and needs to be
  1675.  *    found by using machineName
  1676.  *
  1677.  * RETURNS:    the socket
  1678.  *
  1679.  * SIDE EFFECTS:    none
  1680.  *
  1681.  *===========================================================================*/
  1682. static int
  1683. ConnectToSocket(machineName, portNum, hostEnt)
  1684.     char *machineName;
  1685.     int    portNum;
  1686.     struct hostent **hostEnt;
  1687. {
  1688.     int    resultSocket;
  1689.     int        result;
  1690.     u_short        tempShort;
  1691.     struct sockaddr_in  nameEntry;
  1692.  
  1693.     if ( (*hostEnt) == NULL ) {
  1694.     (*hostEnt) = gethostbyname(machineName);
  1695.     if ( (*hostEnt) == NULL ) {
  1696.         fprintf(stderr, "ERROR:  Couldn't get host by name (%s)\n",
  1697.             machineName);
  1698.         exit(1);
  1699.     }
  1700.     }
  1701.  
  1702.     resultSocket = socket(AF_INET, SOCK_STREAM, 0);
  1703.     if ( resultSocket == -1 ) {
  1704.     fprintf(stderr, "ERROR:  socket returned error %d\n", errno);
  1705.     exit(1);
  1706.     }
  1707.  
  1708.     nameEntry.sin_family = AF_INET;
  1709.     bzero((char *)nameEntry.sin_zero, 8);
  1710.     bcopy((char *) (*hostEnt)->h_addr_list[0],
  1711.       (char *) &(nameEntry.sin_addr.s_addr),
  1712.       (size_t) (*hostEnt)->h_length);
  1713.     tempShort = portNum;
  1714.     nameEntry.sin_port = htons(tempShort);
  1715.  
  1716.     result = connect(resultSocket, (struct sockaddr *) &nameEntry,
  1717.              sizeof(struct sockaddr));
  1718.     if ( result == -1 ) {
  1719.     fprintf(stderr, "ERROR:  connect (ConnectToSocket, port %d) from machine %s returned error %d\n",
  1720.         portNum, getenv("HOST"), errno);
  1721.     exit(1);
  1722.     }
  1723.  
  1724.     return resultSocket;
  1725. }
  1726.     
  1727.  
  1728. void
  1729. SendDecodedFrame(frame)
  1730.     MpegFrame *frame;
  1731. {
  1732.     int    clientSocket;
  1733.     register int y;
  1734.     int        negativeTwo = -2;
  1735.     uint32  data;
  1736.  
  1737.     /* send to IOServer */
  1738.     clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);
  1739.  
  1740.     data = negativeTwo;
  1741.     data = htonl(data);
  1742.     SafeWrite(clientSocket, (char *)&data, 4);
  1743.  
  1744.     data = frame->id;
  1745.     data = htonl(data);
  1746.     SafeWrite(clientSocket, (char *)&data, 4);
  1747.  
  1748.     for ( y = 0; y < Fsize_y; y++ ) {
  1749.         SafeWrite(clientSocket, (char *)frame->decoded_y[y], Fsize_x);
  1750.     }
  1751.  
  1752.     for (y = 0; y < Fsize_y / 2; y++) {            /* U */
  1753.         SafeWrite(clientSocket, (char *)frame->decoded_cb[y], Fsize_x / 2);
  1754.     }
  1755.  
  1756.     for (y = 0; y < Fsize_y / 2; y++) {            /* V */
  1757.         SafeWrite(clientSocket, (char *)frame->decoded_cr[y], Fsize_x / 2);
  1758.     }
  1759.  
  1760.     close(clientSocket);
  1761. }
  1762.  
  1763.  
  1764. void
  1765. GetRemoteDecodedRefFrame(frame, frameNumber)
  1766.     MpegFrame *frame;
  1767.     int frameNumber;
  1768. {
  1769.     int    clientSocket;
  1770.     register int y;
  1771.     int        negativeThree = -3;
  1772.     uint32  data;
  1773.  
  1774.     /* send to IOServer */
  1775.     clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);
  1776.  
  1777.     /* ask IOServer for decoded frame */
  1778.     data = negativeThree;
  1779.     data = htonl(data);
  1780.     SafeWrite(clientSocket, (char *)&data, 4);
  1781.  
  1782.     data = frame->id;
  1783.     data = htonl(data);
  1784.     SafeWrite(clientSocket, (char *)&data, 4);
  1785.  
  1786.     for ( y = 0; y < Fsize_y; y++ ) {
  1787.         SafeRead(clientSocket, (char *)frame->decoded_y[y], Fsize_x);
  1788.     }
  1789.  
  1790.     for (y = 0; y < Fsize_y / 2; y++) {            /* U */
  1791.         SafeRead(clientSocket, (char *)frame->decoded_cb[y], Fsize_x / 2);
  1792.     }
  1793.  
  1794.     for (y = 0; y < Fsize_y / 2; y++) {            /* V */
  1795.         SafeRead(clientSocket, (char *)frame->decoded_cr[y], Fsize_x / 2);
  1796.     }
  1797.  
  1798.     close(clientSocket);
  1799.     
  1800. }
  1801.