home *** CD-ROM | disk | FTP | other *** search
/ back2roots/padua / padua.7z / padua / uucp / duucp-1.17 / AU-117b4-src.lha / src / fifolib / fifo.c < prev    next >
Encoding:
C/C++ Source or Header  |  1993-12-25  |  10.2 KB  |  505 lines

  1.  
  2. /*
  3.  *  FIFO.C
  4.  */
  5.  
  6. #include "defs.h"
  7.  
  8. void ReCalcReaderIdx(Fifo *);
  9. void SignalEof(Fifo *);
  10. void FixFiFlags(Fifo *);
  11. LibCall void RequestFifo(FHan *, Message *, long);
  12. void SetEOF(Fifo *);
  13. long AvailWBufSpace(Fifo *);
  14.  
  15. __stkargs long BitTestSet(char *, short);
  16.  
  17. /*
  18.  *  Open up a new fifo by name.
  19.  */
  20.  
  21. LibCall FHan *
  22. OpenFifo(name, bytes, flags)
  23. char *name;
  24. long bytes;
  25. long flags;
  26. {
  27.     Fifo *fifo;
  28.     FHan *fhan;
  29.  
  30.     {                /*  bytes a power of 2? */
  31.     unsigned long i = 8;
  32.     while (i) {
  33.         if (bytes == i)
  34.         break;
  35.         i = i << 1;
  36.     }
  37.     if (i == 0)
  38.         return(NULL);
  39.     }
  40.     if (fhan = AllocMem(sizeof(FHan), MEMF_PUBLIC | MEMF_CLEAR)) {
  41.     Forbid();
  42.     if (fifo = (Fifo *)FindName((MaxList *)&FifoList, name)) {
  43.         ;
  44.     } else {
  45.         if (fifo = AllocMem(sizeof(Fifo) - sizeof(fifo->fi_Buf) + bytes + strlen(name) + 1, MEMF_PUBLIC | MEMF_CLEAR)) {
  46.         AddTail((MaxList *)&FifoList, &fifo->fi_Node);
  47.         NewList((MaxList *)&fifo->fi_HanList);
  48.         NewList((MaxList *)&fifo->fi_WrNotify);
  49.         NewList((MaxList *)&fifo->fi_RdNotify);
  50.         fifo->fi_BufSize = bytes;
  51.         fifo->fi_BufMask = bytes - 1;
  52.         fifo->fi_Node.ln_Name = (char *)fifo + (sizeof(Fifo) - sizeof(fifo->fi_Buf)) + bytes;
  53.         strcpy(fifo->fi_Node.ln_Name, name);
  54.         InitSemaphore(&fifo->fi_SigSem);
  55.         }
  56.     }
  57.     if (fifo) {
  58.         AddTail((MaxList *)&fifo->fi_HanList, (MaxNode *)&fhan->fh_Node);
  59.         fhan->fh_Fifo = fifo;
  60.         fhan->fh_Flags = flags;
  61.  
  62.         fhan->fh_Msg.mn_ReplyPort = &fhan->fh_Port;
  63.  
  64.         fhan->fh_Port.mp_Node.ln_Type = NT_MSGPORT;
  65.         fhan->fh_Port.mp_Flags = PA_SIGNAL;
  66.         fhan->fh_Port.mp_SigBit = SIGB_SINGLE;
  67.  
  68.         NewList(&fhan->fh_Port.mp_MsgList);
  69.  
  70.         if (flags & FIFOF_READ) {
  71.         ++fifo->fi_RRefs;
  72.         fhan->fh_RIdx = fifo->fi_RIdx;
  73.         }
  74.         if (flags & FIFOF_WRITE) {
  75.         ++fifo->fi_WRefs;
  76.         }
  77.         /*
  78.         if (flags & FIFOF_CLOSEOF)
  79.         fifo->fi_Flags |= FIFOF_CLOSEOF;
  80.         */
  81.         if (flags & FIFOF_KEEPIFD)
  82.         fifo->fi_Flags |= FIFOF_KEEPIFD;
  83.         if (flags & FIFOF_RREQUIRED)
  84.         fifo->fi_Flags |= FIFOF_RREQUIRED;
  85.         ++fifo->fi_ORefs;
  86.         FixFiFlags(fifo);
  87.     } else {
  88.         FreeMem(fhan, sizeof(FHan));
  89.         fhan = NULL;
  90.     }
  91.     Permit();
  92.     }
  93.     return(fhan);
  94. }
  95.  
  96. /*
  97.  *  Close a previously openned fifo
  98.  */
  99.  
  100. LibCall void
  101. CloseFifo(fhan, flags)
  102. FHan *fhan;
  103. long flags;
  104. {
  105.     Fifo *fifo;
  106.  
  107.     Forbid();
  108.     if (fifo = fhan->fh_Fifo) {
  109.     fhan->fh_Fifo = NULL;    /*  try to catch duplicate closes   */
  110.     Remove((MaxNode *)&fhan->fh_Node);
  111.  
  112.     --fifo->fi_ORefs;
  113.  
  114.     if (fhan->fh_Flags & FIFOF_WRITE) {
  115.         if (flags & FIFOF_EOF)
  116.         fifo->fi_Flags |= FIFOF_CLOSEOF;
  117.         if (--fifo->fi_WRefs == 0) {
  118.         if (fifo->fi_Flags & FIFOF_CLOSEOF) {
  119.             SetEOF(fifo);
  120.         }
  121.         }
  122.     }
  123.     if (fhan->fh_Flags & FIFOF_READ) {
  124.         --fifo->fi_RRefs;
  125.  
  126.         if (fifo->fi_RRefs == 0 && (fifo->fi_Flags & FIFOF_RREQUIRED)) {
  127.         struct Message
  128.             *msg;
  129.  
  130.         while (msg = (struct Message *) RemHead ((struct List *) &fifo->fi_WrNotify))
  131.             ReplyMsg (msg);
  132.         }
  133.     }
  134.  
  135.     if (fifo->fi_ORefs == 0) {
  136.         if ((fifo->fi_Flags & FIFOF_KEEPIFD) == 0 || fifo->fi_RIdx == fifo->fi_WIdx) {
  137.         Remove(&fifo->fi_Node);
  138.         FreeMem(fifo, sizeof(Fifo) - sizeof(fifo->fi_Buf) + fifo->fi_BufSize + strlen(fifo->fi_Node.ln_Name) + 1);
  139.         }
  140.     } else {
  141.         if (fhan->fh_Flags & FIFOF_WRITE)
  142.         SignalEof(fifo);
  143.         if (fhan->fh_Flags & FIFOF_READ)
  144.         ReCalcReaderIdx(fifo);
  145.         FixFiFlags(fifo);
  146.     }
  147.     FreeMem(fhan, sizeof(FHan));
  148.     }
  149.     Permit();
  150. }
  151.  
  152. /*
  153.  *  Read from a fifo.  Block if necessary (and not FIFOF_NBIO)
  154.  */
  155.  
  156. LibCall long
  157. ReadFifo(fhan, pbuf, skip)
  158. FHan *fhan;
  159. char **pbuf;
  160. long skip;
  161. {
  162.     long n;
  163.     Fifo *fifo = fhan->fh_Fifo;
  164.  
  165.     /*
  166.      *    attempt to clear <skip> bytes
  167.      */
  168.  
  169.     while (skip > 0) {
  170.     long widx = fifo->fi_WIdx;    /*  snapshot widx   */
  171.     long ridx = fhan->fh_RIdx;
  172.     long len;
  173.  
  174.     if (ridx <= widx) {
  175.         len = widx - ridx;
  176.     } else {
  177.         len = fifo->fi_BufSize - ridx;
  178.     }
  179.     if (len == 0)
  180.         break;
  181.     if (len > skip)
  182.         len = skip;
  183.     n += len;
  184.     skip -= len;
  185.  
  186.     Forbid();
  187.     fhan->fh_RIdx = (ridx + len) & fifo->fi_BufMask;
  188.     if (ridx == fifo->fi_RIdx)
  189.         ReCalcReaderIdx(fifo);    /*  update mast-idx/writer-waiters */
  190.     Permit();
  191.     }
  192.  
  193.     /*
  194.      *    return available data
  195.      */
  196.  
  197.     for (;;) {
  198.     long widx = fifo->fi_WIdx;    /*  snapshot widx   */
  199.     long ridx = fhan->fh_RIdx;
  200.  
  201.     if (ridx <= widx) {
  202.         n = widx - ridx;
  203.     } else {
  204.         n = fifo->fi_BufSize - ridx;
  205.     }
  206.     *pbuf = fifo->fi_Buf + ridx;
  207.     if (n == 0) {
  208.         /*
  209.          *    EOF on a per-handle basis since it gets cleared after EOF
  210.          *    is returned.
  211.          */
  212.  
  213.         if ((fhan->fh_Flags & FIFOF_EOF) || (fifo->fi_Flags & FIFOF_EOF)) {
  214.         /*fhan->fh_Flags &= ~FIFOF_EOF;*/
  215.         n = -1;
  216.         break;
  217.         }
  218.         if ((fhan->fh_Flags & FIFOF_NBIO) == 0) {
  219.         fhan->fh_Port.mp_SigTask = SysBase->ThisTask;
  220.         RequestFifo(fhan, &fhan->fh_Msg, FREQ_RPEND);
  221.         WaitPort(&fhan->fh_Port);
  222.         Remove((MaxNode *)&fhan->fh_Msg);
  223.         continue;
  224.         }
  225.     }
  226.     break;
  227.     }
  228.     return(n);
  229. }
  230.  
  231. /*
  232.  *  Write to a fifo
  233.  */
  234.  
  235. LibCall long
  236. WriteFifo(fhan, buf, bytes)
  237. FHan *fhan;
  238. char *buf;
  239. long bytes;
  240. {
  241.     long n = -1;
  242.     short wsigchk = 0;
  243.     short normal  = 0;
  244.     Fifo *fifo = fhan->fh_Fifo;
  245.  
  246.     if (fifo->fi_RRefs == 0 && (fifo->fi_Flags & FIFOF_RREQUIRED))
  247.     return(-1);
  248.  
  249.     if (bytes < 0 || bytes > (fifo->fi_BufSize >> 1))
  250.     return(-2);
  251.  
  252.     Forbid();
  253.  
  254.     /*
  255.      *    A normal FIFO uses fi_SigSem
  256.      *    A non-normal FIFO cannot afford to block and uses fi_Lock
  257.      */
  258.  
  259.     if (fifo->fi_Flags & FIFOF_WRNORM)
  260.     normal = 1;
  261.  
  262.     if (normal) {
  263.     ObtainSemaphore(&fifo->fi_SigSem);
  264.     } else if (BitTestSet(&fifo->fi_Lock, 0) != 0) {
  265.     Permit();
  266.     return(n);
  267.     }
  268.  
  269.     {
  270.     n = 0;
  271.  
  272.     for (;;) {
  273.         while (bytes) {
  274.         long ridx = fifo->fi_RIdx;    /*  snapshot ridx   */
  275.         long len;
  276.  
  277.         if (fifo->fi_WIdx < ridx) {
  278.             len = ridx - fifo->fi_WIdx;
  279.             if (len <= bytes)        /*  FIFO FULL        */
  280.             break;
  281.         } else {
  282.             len = fifo->fi_BufSize - fifo->fi_WIdx;
  283.             if (len + ridx <= bytes)    /*  FIFO FULL        */
  284.             break;
  285.         }
  286.         if (len > bytes)
  287.             len = bytes;
  288.         movmem(buf, fifo->fi_Buf + fifo->fi_WIdx, len);
  289.         buf += len;
  290.         n += len;
  291.         bytes -= len;
  292.         fifo->fi_WIdx = (fifo->fi_WIdx + len) & fifo->fi_BufMask;
  293.         if (fhan->fh_Flags & FIFOF_NORMAL)
  294.             wsigchk = 1;
  295.         }
  296.  
  297.         /*
  298.          *    if fifo is full and NBIO not set, then block and loop
  299.          */
  300.         if (bytes && !(fhan->fh_Flags & FIFOF_NBIO)) {
  301.         fhan->fh_Port.mp_SigTask = SysBase->ThisTask;
  302.         RequestFifo(fhan, &fhan->fh_Msg, FREQ_WAVAIL);
  303.         WaitPort(&fhan->fh_Port);
  304.         Remove((MaxNode *)&fhan->fh_Msg);
  305.         continue;
  306.         }
  307.         break;
  308.     }
  309.  
  310.     /*
  311.      *  check for any blocked readers, data is probably now available
  312.      *  so wake them up.
  313.      */
  314.  
  315.     if (wsigchk && fifo->fi_RdNotify.mlh_Head->mln_Succ) {
  316.         struct Message
  317.         *msg;
  318.  
  319.         while (msg = (struct Message *) RemHead ((struct List *) &fifo->fi_RdNotify))
  320.         ReplyMsg (msg);
  321.     }
  322.  
  323.     if (normal)
  324.         ReleaseSemaphore (&fifo->fi_SigSem);
  325.     else
  326.         fifo->fi_Lock = 0;
  327.     }
  328.     Permit();
  329.     return(n);
  330. }
  331.  
  332. LibCall long
  333. BufSizeFifo (FHan *fhan)
  334. {
  335.     return(fhan->fh_Fifo->fi_BufSize);
  336. }
  337.  
  338.  
  339. /*
  340.  *  Calculate distance between fh->fh_RIdx and fifo->fi_RIdx, shortest
  341.  *  wins.  If none found then nothing changes due to initial best set
  342.  *  to exactly the buffer size.
  343.  *
  344.  *  If the master index is modified and writers are waiting, signal them.
  345.  *
  346.  *  Called while Forbid() or otherwise reader-lockedout
  347.  */
  348.  
  349. void
  350. ReCalcReaderIdx (Fifo *fifo)
  351. {
  352.     FHan *fh;
  353.     long bestLen = fifo->fi_BufSize;
  354.     long ridx;
  355.  
  356.     for (fh = (FHan *) fifo->fi_HanList.mlh_Head; fh->fh_Node.mln_Succ; fh = (FHan *) fh->fh_Node.mln_Succ) {
  357.     if (fh->fh_Flags & FIFOF_READ) {
  358.         long len = (fh->fh_RIdx - fifo->fi_RIdx) & fifo->fi_BufMask;
  359.         if (len < bestLen)
  360.         bestLen = len;
  361.     }
  362.     }
  363.  
  364.     ridx = (fifo->fi_RIdx + bestLen) & fifo->fi_BufMask;
  365.  
  366.     /*
  367.      *    more buffer space available, wakeup writers?
  368.      */
  369.  
  370.     if (ridx != fifo->fi_RIdx) {
  371.     fifo->fi_RIdx = ridx;
  372.  
  373.     if (fifo->fi_WrNotify.mlh_Head->mln_Succ) {
  374.         struct Message
  375.             *msg;
  376.  
  377.         if (AvailWBufSpace (fifo) >= (fifo->fi_BufSize >> 1)) {
  378.             while (msg = (struct Message *) RemHead ((struct List *) &fifo->fi_WrNotify))
  379.                 ReplyMsg (msg);
  380.         }
  381.     }
  382.     }
  383. }
  384.  
  385. /*
  386.  *  signal EOF to any blocked readers
  387.  */
  388.  
  389. void
  390. SignalEof (Fifo *fifo)
  391. {
  392.     if (fifo->fi_Flags & FIFOF_EOF) {
  393.         struct Message
  394.             *msg;
  395.  
  396.         Forbid ();
  397.         while (msg = (struct Message *) RemHead ((struct List *) &fifo->fi_RdNotify))
  398.             ReplyMsg (msg);
  399.         Permit ();
  400.     }
  401. }
  402.  
  403. /*
  404.  *  SetEOF()
  405.  */
  406.  
  407. void
  408. SetEOF(fifo)
  409. Fifo *fifo;
  410. {
  411.     FHan *fh;
  412.  
  413.     for (fh = (FHan *) fifo->fi_HanList.mlh_Head; fh->fh_Node.mln_Succ; fh = (FHan *) fh->fh_Node.mln_Succ)
  414.     fh->fh_Flags |= FIFOF_EOF;
  415.     fifo->fi_Flags |= FIFOF_EOF;
  416. }
  417.  
  418. /*
  419.  *  FIXME, if state change occurs in master fifo, must unblock anybody
  420.  *  blocked due to previous information. XXX
  421.  */
  422.  
  423. void
  424. FixFiFlags(fifo)
  425. Fifo *fifo;
  426. {
  427.     FHan *fh;
  428.     long rflags = FIFOF_RDNORM | FIFOF_WRNORM;
  429.  
  430.     fifo->fi_Flags &= ~rflags;
  431.     for (fh = (FHan *) fifo->fi_HanList.mlh_Head; fh->fh_Node.mln_Succ; fh = (FHan *) fh->fh_Node.mln_Succ) {
  432.     if ((fh->fh_Flags & FIFOF_NORMAL) == 0) {
  433.         if (fh->fh_Flags & FIFOF_READ)
  434.         rflags &= ~FIFOF_RDNORM;
  435.         if (fh->fh_Flags & FIFOF_WRITE)
  436.         rflags &= ~FIFOF_WRNORM;
  437.     }
  438.     }
  439.     fifo->fi_Flags |= rflags;
  440. }
  441.  
  442. /*
  443.  *  request message on event.  Returns message immediately if event already
  444.  *  satisfied.
  445.  */
  446.  
  447. LibCall void
  448. RequestFifo(fhan, msg, req)
  449. FHan *fhan;
  450. Message *msg;
  451. long req;
  452. {
  453.     Fifo *fifo = fhan->fh_Fifo;
  454.  
  455.     Forbid();
  456.  
  457.     switch(req) {
  458.     case FREQ_RPEND:
  459.     if ((fhan->fh_Flags & FIFOF_EOF) || fhan->fh_RIdx != fifo->fi_WIdx) {
  460.         ReplyMsg(msg);
  461.     } else {
  462.         msg->mn_Node.ln_Type = NT_MESSAGE;
  463.         AddTail((MaxList *)&fifo->fi_RdNotify, &msg->mn_Node);
  464.     }
  465.     break;
  466.     case FREQ_WAVAIL:
  467.     /*
  468.      *  determine available buffer space, alert if more than 1/2 empty.
  469.      *
  470.      *  check for broken pipe
  471.      */
  472.  
  473.     if (fifo->fi_RRefs == 0 && (fifo->fi_Flags & FIFOF_RREQUIRED)) {
  474.         ReplyMsg(msg);
  475.     } else if (AvailWBufSpace(fifo) >= (fifo->fi_BufSize >> 1)) {
  476.         ReplyMsg(msg);
  477.     } else {
  478.         msg->mn_Node.ln_Type = NT_MESSAGE;
  479.         AddTail((MaxList *)&fifo->fi_WrNotify, &msg->mn_Node);
  480.     }
  481.     break;
  482.     case FREQ_ABORT:
  483.     if (msg->mn_Node.ln_Type == NT_MESSAGE) {   /*    if not returned */
  484.         Remove(&msg->mn_Node);            /*    return it    */
  485.         ReplyMsg(msg);
  486.     }
  487.     break;
  488.     }
  489.     Permit();
  490. }
  491.  
  492. long
  493. AvailWBufSpace(fifo)
  494. Fifo *fifo;
  495. {
  496.     long ridx = fifo->fi_RIdx;
  497.     long len;
  498.  
  499.     if (fifo->fi_WIdx < ridx)
  500.     len = ridx - fifo->fi_WIdx;
  501.     else
  502.     len = fifo->fi_BufSize - fifo->fi_WIdx + ridx - 1;
  503.     return(len);
  504. }
  505.