home *** CD-ROM | disk | FTP | other *** search
/ Liren Large Software Subsidy 7 / 07.iso / c / c100 / 3.ddi / TASK.ZIP / PIPE.CPP < prev    next >
Encoding:
C/C++ Source or Header  |  1989-08-28  |  4.5 KB  |  154 lines

  1. /*****************************************************
  2. File: PIPE.CPP      Copyright 1989 by Dlugosz Software
  3.    Pipes for the C++ multitasking system
  4. *****************************************************/
  5.  
  6. #include "usual.hpp"
  7. #include "task.hpp"
  8. #include "sem.hpp"
  9. #include "pipe.hpp"
  10. #include <string.h>
  11.    //need memcpy()
  12.  
  13. /* /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\ */
  14.  
  15. pipe::pipe (void* buf, unsigned size)
  16. : suspend(0)
  17. {
  18. buffer= buf;
  19. bufsize= room= size;
  20. dynamic= FALSE;
  21. writepos= readpos= 0;
  22. }
  23.  
  24. /* /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\ */
  25.  
  26. pipe::pipe (unsigned size)
  27. : suspend(0)
  28. {
  29. buffer= new byte[size];
  30. bufsize= room= size;
  31. dynamic= TRUE;
  32. writepos= readpos= 0;
  33. }
  34.  
  35. /* /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\ */
  36.  
  37. pipe::~pipe()
  38. {
  39. /* I've got to be careful destroying a pipe.  Another task may be using
  40.    the pipe at this moment.  So, I need an exclusine lock.  Any other
  41.    task using the pipe will be waiting at a semaphore, which will then
  42.    return errors. */
  43. if (! --in_use) return;  //already destroyed
  44. if (dynamic) delete buffer;
  45. // semaphores destroyed automatically
  46. }
  47.  
  48. /* /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\ */
  49.  
  50. bool pipe::send (void* data, unsigned size)
  51. {
  52. if (! --writer) return FALSE;  //get exclusive access to write
  53. /* I am now the only writer in the system, but someone may be
  54.    reading at this very moment.  */
  55. if (! --in_use) return FALSE;
  56.    // I am now the only one using this pipe.
  57. while (size) {  //stuff left to be written
  58.    if (size <= room) {  //it all fits
  59.       put (data, size);
  60.       if (int(suspend) < 0) suspend++;
  61.          //tell any suspended reader to continue
  62.       in_use++;  //signal any waiting reader to go ahead
  63.       writer++;  //let the next writer in
  64.       return TRUE;  //mission complete!
  65.       }
  66.    else {  //have to do it piece-wise, co-ordinating with a reader
  67.       if (room) {
  68.          unsigned temp= room;  //put() changes room.
  69.          put (data, room);  //as much as will fit
  70.          size -= temp;
  71.          data= temp + (byte*)data;  //where I left off
  72.          if (int(suspend) < 0) suspend++;
  73.          }
  74.       in_use++;   //signal any waiting reader to go ahead
  75.       if (!suspend--) return FALSE;  //wait for signal from reader to continue
  76.       // and loop back to the beginning
  77.       }
  78.    }
  79. }
  80.  
  81. /* /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\ */
  82.  
  83. bool pipe::receive (void* buffer, unsigned size)
  84. {
  85. if (! --reader) return FALSE;  //get exclusive access to read
  86. if (! --in_use) return FALSE;
  87.    // I am now the only one using this pipe
  88. while (size) {
  89.    unsigned stuff= bufsize - room;  //how much is in the buffer
  90.    if (stuff >= size) {  //enough available
  91.       get (buffer, size);
  92.       if (int(suspend) < 0) suspend++;
  93.          //tell any suspended writer to continue
  94.       in_use++;  //signal any waiting writer to go ahead
  95.       reader++;  //let the next reader in
  96.       return TRUE;
  97.       }
  98.    else {  //do it piece-wise
  99.       if (stuff) {
  100.          get (buffer, stuff);  //take everything
  101.          size -= stuff;
  102.          buffer= stuff + (byte*) buffer;
  103.          if (int(suspend) < 0) suspend++;
  104.          }
  105.       in_use++;  //signal waiting writer to go ahead with more
  106.       if (!suspend--) return FALSE; //wait for signal from writer
  107.       //loop back to read more
  108.       }
  109.    }
  110. }
  111.  
  112. /* /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\ */
  113. /*     primitive circular queue functions   */
  114. /* /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\ */
  115.  
  116. void pipe::get (void* buf, unsigned size)
  117. {
  118. /* get 'size' bytes from the pipe's buffer.  It does not need a
  119.    return value since the caller (receive()) never askes for more
  120.    then there is. */
  121. unsigned firsthalf= bufsize-readpos;    //how much before 'wrap around'
  122. if (firsthalf < size) {  //I'll need to wrap around
  123.    //get to the end first
  124.    memcpy (buf, buffer+readpos, firsthalf);
  125.    buf= firsthalf+(byte*)buf;
  126.    size -= firsthalf;
  127.    room += firsthalf;
  128.    readpos= 0;  //wrap to the beginning
  129.    }
  130. memcpy (buf, buffer+readpos, size);
  131. room += size;
  132. readpos += size;
  133. }
  134.  
  135. /* /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\ */
  136.  
  137. void pipe::put (void* buf, unsigned size)
  138. {
  139. unsigned firsthalf= bufsize-writepos;    //how much before 'wrap around'
  140. if (firsthalf < size) {  //I'll need to wrap around
  141.    memcpy (buffer+writepos, buf, firsthalf);
  142.    buf= firsthalf+(byte*)buf;
  143.    size -= firsthalf;
  144.    room -= firsthalf;
  145.    writepos= 0;
  146.    }
  147. memcpy (buffer+writepos, buf, size);
  148. room -= size;
  149. writepos += size;
  150. }
  151.  
  152. /* /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\ */
  153.  
  154.