home *** CD-ROM | disk | FTP | other *** search
/ Chip 2000 October / Chip_2000-10_cd1.bin / zkuste / Delphi / navody / multithread / mchpipethreads.pas < prev    next >
Pascal/Delphi Source File  |  1999-05-10  |  7KB  |  240 lines

  1. { 10-05-1999 10:37:03 PM > [martin on MARTIN] checked out /Reformatting
  2.    according to Delphi guidelines. }
  3. { 06-04-1999 7:49:40 PM > [martin on MARTIN] checked out /Modifying Class
  4.    Names }
  5. unit MCHPipeThreads;
  6.  
  7. {Martin Harvey 7/11/98}
  8.  
  9. {This unit gives us a base pipe thread type with some common support for
  10. error tracking}
  11.  
  12. interface
  13.  
  14. uses Classes,MCHPipeTypes,Windows,MCHMemoryStream;
  15.  
  16. type
  {Common ancestor of the pipe reader and writer threads. It keeps the
   reason the thread stopped (FTermReason, set by descendants) and an
   OnTerminate handler that Execute invokes when it is assigned.}
  TMCHPipeThread = class(TThread)
  private
    FOnTerminate: TNotifyEvent;  {backing field for the OnTerminate property}
  protected
    FTermReason: TMCHError;      {written by descendant Execute methods}
  public
    {Calls the OnTerminate handler if one is assigned; descendants call
     this through "inherited Execute" as their last step.}
    procedure Execute; override;
  published
    property TermReason: TMCHError read FTermReason;
    property OnTerminate: TNotifyEvent read FOnTerminate write FOnTerminate;
  end;
  28.  
  {Thread that buffers outgoing data in a memory stream and writes it to
   the pipe identified by PipeWriteHandle.}
  TMCHPipeWriterThread = class(TMCHPipeThread)
  private
    FDataMutex: THandle;           {serialises access to FData}
    FIdleSemaphore: THandle;       {released by WriteData and Destroy to wake Execute}
    FPipeWriteHandle: TMCHHandle;
    FData: TMCHMemoryStream;       {data queued for writing}
    FWriteIdx: integer;            {offset in FData of the next byte to send}
  public
    constructor Create(CreateSuspended: boolean);
    procedure Execute; override;
    destructor Destroy; override;
    {Appends the whole of InStream to the queue.
     Returns bytes written = InStream.Size}
    function WriteData(InStream: TStream): integer;
    property PipeWriteHandle: TMCHHandle read FPipeWriteHandle write FPipeWriteHandle;
  end;
  43.  
  {Thread that reads from the pipe identified by PipeReadHandle and
   accumulates the bytes in a memory stream until ReadData collects them.}
  TMCHPipeReaderThread = class(TMCHPipeThread)
  private
    FDataMutex: THandle;            {serialises access to FData}
    FPipeReadHandle: TMCHHandle;
    FData: TMCHMemoryStream;        {bytes received but not yet collected}
    FOnDataRecieved: TNotifyEvent;
    FOnConnect: TNotifyEvent;
  public
    constructor Create(CreateSuspended: boolean);
    procedure Execute; override;
    destructor Destroy; override;
    {Moves all buffered bytes to the end of OutStream; returns bytes read}
    function ReadData(OutStream: TStream): integer;
    property OnDataRecieved: TNotifyEvent read FOnDataRecieved write FOnDataRecieved;
    property PipeReadHandle: TMCHHandle read FPipeReadHandle write FPipeReadHandle;
    property OnConnect: TNotifyEvent read FOnConnect write FOnConnect;
  end;
  62.  
  63. implementation
  64.  
  65. uses MCHPipeInterface2;
  66.  
  const
    {Maximum number of bytes moved through the pipe in a single call}
    BufSize = 4096;

  type
    {Transfer buffer of exactly BufSize bytes. Every use in this unit passes
     a byte count of at most BufSize, so byte elements give the intended
     size (integer elements made the buffer four times larger than needed).
     NOTE(review): assumes the MCHPipeInterface2 ReadData/WriteData buffer
     parameters are untyped - confirm against that unit.}
    DataBuf = array[0..BufSize - 1] of byte;
  72.  
  {Base thread body: notifies the OnTerminate handler, if there is one.
   Descendants run their own loop first and then call this.}
  procedure TMCHPipeThread.Execute;
  begin
    if not Assigned(FOnTerminate) then
      Exit;
    FOnTerminate(Self);
  end;
  77.  
  {Creates the reader thread together with its data mutex and receive
   buffer. CreateSuspended is passed straight to TThread.Create.}
  constructor TMCHPipeReaderThread.Create(CreateSuspended:boolean);
  begin
    {Build everything Execute uses BEFORE the inherited constructor runs:
     with CreateSuspended = false the thread may start executing from
     inside TThread.Create, and must not see an unassigned mutex or stream.}
    FDataMutex := CreateMutex(nil,false,nil);
    FData := TMCHMemoryStream.Create;
    inherited Create(CreateSuspended);
  end;
  84.  
  {Asks the thread to terminate, lets it run if it was suspended, waits
   for it to finish and then releases the mutex and the receive buffer.}
  destructor TMCHPipeReaderThread.Destroy;
  begin
    Terminate;
    if Suspended then
      Resume;
    WaitFor;
    {The thread has finished, so nothing else touches these any more}
    CloseHandle(FDataMutex);
    FData.Free;
    inherited;
  end;
  94.  
  {Hands over everything received so far: the buffered bytes are appended
   to the end of OutStream and the internal buffer is emptied.
   Returns the number of bytes that were buffered.}
  function TMCHPipeReaderThread.ReadData(OutStream:TStream):integer;
  begin
    WaitForSingleObject(FDataMutex,INFINITE);
    try
      Result := FData.Size;
      {Rewind the source, then append at the end of the destination}
      FData.Seek(0,soFromBeginning);
      OutStream.Seek(0,soFromEnd);
      OutStream.CopyFrom(FData,FData.Size);
      FData.Clear;
    finally
      ReleaseMutex(FDataMutex);
    end;
  end;
  109.  
  {Reader thread body. Waits for the peer, fires OnConnect on success, then
   repeatedly peeks and reads the pipe, appending each chunk to FData under
   FDataMutex. Any result other than meOK from the pipe interface is kept
   in FTermReason and ends the loop. Finishes by calling the inherited
   Execute, which fires OnTerminate.}
  procedure TMCHPipeReaderThread.Execute;

  var
    Buffer:DataBuf;
    BytesToRead,BytesThisTime:integer;

  begin
    FTermReason := WaitForPeer(FPipeReadHandle);
    if FTermReason <> meOK then
      terminate
    else
      if Assigned(FOnConnect) then FOnConnect(Self);
    while not terminated do
    begin
      FTermReason := PeekData(FPipeReadHandle,BytesToRead);
      if FTermReason <> meOK then
        terminate;
      if (not terminated) then
      begin
        if BytesToRead <= 0 then
        begin
          {Callback handler should implement lazy async notification}
          if Assigned(FOnDataRecieved) then FOnDataRecieved(Self);
          {Nothing pending: ask for a single byte. Presumably the read
           below then waits until data arrives - confirm in
           MCHPipeInterface2.}
          BytesToRead := 1;
        end;
        {Never request more than the local buffer can hold}
        if BytesToRead > BufSize then
          BytesThisTime := BufSize
        else
          BytesThisTime := BytesToRead;
        FTermReason := MCHPipeInterface2.ReadData(FPipeReadHandle,Buffer,BytesThisTime);
        if FTermReason <> meOK then
          terminate
        else
        begin
          WaitForSingleObject(FDataMutex,INFINITE);
          try
            FData.Seek(0,soFromEnd);
            FData.WriteBuffer(Buffer,BytesThisTime);
          finally
            {Always release, even if the stream write raises, so the
             mutex is never left owned by a thread that is going away}
            ReleaseMutex(FDataMutex);
          end;
        end;
      end;
    end;
    inherited Execute;
  end;
  153.  
  {Creates the writer thread together with its data mutex, idle semaphore
   (initial count 0) and outgoing buffer. CreateSuspended is passed
   straight to TThread.Create.}
  constructor TMCHPipeWriterThread.Create(CreateSuspended:boolean);
  begin
    {Build everything Execute uses BEFORE the inherited constructor runs:
     with CreateSuspended = false the thread may start executing from
     inside TThread.Create, and must not see unassigned handles or stream.}
    FDataMutex := CreateMutex(nil,false,nil);
    FIdleSemaphore := CreateSemaphore(nil,0,High(Integer),nil);
    FData := TMCHMemoryStream.Create;
    inherited Create(CreateSuspended);
  end;
  161.  
  {Asks the thread to terminate, wakes it if it is waiting on the idle
   semaphore or suspended, waits for it to finish and then releases the
   buffer and both kernel handles.}
  destructor TMCHPipeWriterThread.Destroy;
  begin
    Terminate;
    {Execute may be blocked on the idle semaphore; release it so the
     thread can see the Terminated flag}
    ReleaseSemaphore(FIdleSemaphore,1,nil);
    if Suspended then Resume;
    WaitFor;
    FData.Free;
    CloseHandle(FDataMutex);
    {The semaphore was created in Create and must be closed as well,
     otherwise one handle leaks per writer thread}
    CloseHandle(FIdleSemaphore);
    inherited Destroy;
  end;
  172.  
  {Queues the complete contents of InStream for writing: the stream is
   rewound, appended to the outgoing buffer under the data mutex, and the
   idle semaphore is released once to wake the writer thread.
   Returns InStream.Size.}
  function TMCHPipeWriterThread.WriteData(InStream:TStream):integer;
  begin
    InStream.Position := 0;
    WaitForSingleObject(FDataMutex,INFINITE);
    try
      FData.Seek(0,soFromEnd);
      Result := InStream.Size;
      FData.CopyFrom(InStream,InStream.Size);
    finally
      ReleaseMutex(FDataMutex);
    end;
    ReleaseSemaphore(FIdleSemaphore,1,nil);
  end;
  187.  
  {Writer thread body. Each pass sends whatever lies between FWriteIdx and
   the end of FData to the pipe in chunks of at most BufSize bytes, then
   empties the buffer if nothing new was queued meanwhile and waits on the
   idle semaphore (released by WriteData and Destroy). Any result other
   than meOK from the pipe interface is kept in FTermReason and ends the
   loop. Finishes by calling the inherited Execute, which fires
   OnTerminate.}
  procedure TMCHPipeWriterThread.Execute;

  var
    Buf:DataBuf;
    BytesThisTime,BytesToWrite:integer;

  begin
    while not (terminated) do
    begin
      WaitForSingleObject(FDataMutex,INFINITE);
      try
        BytesToWrite := FData.Size - FWriteIdx;
      finally
        ReleaseMutex(FDataMutex);
      end;
      while (BytesToWrite > 0) and (not terminated) do
      begin
        if BytesToWrite > BufSize then
          BytesThisTime := BufSize
        else
          BytesThisTime := BytesToWrite;
        {Copy the next chunk out under the mutex; release it even if the
         stream read raises}
        WaitForSingleObject(FDataMutex,INFINITE);
        try
          FData.Seek(FWriteIdx,soFromBeginning);
          FData.ReadBuffer(Buf,BytesThisTime);
        finally
          ReleaseMutex(FDataMutex);
        end;
        {Note that we should not block when we have the mutex!}
        FTermReason := MCHPipeInterface2.WriteData(FPipeWriteHandle,Buf,BytesThisTime);
        if (FTermReason = meOK) then
        begin
          BytesToWrite := BytesToWrite - BytesThisTime;
          FWriteIdx := FWriteIdx + BytesThisTime;
        end
        else
          terminate;
      end;
      if (not (terminated)) then
      begin
        WaitForSingleObject(FDataMutex,INFINITE);
        try
          {Cannot be sure that the stream hasn't been written to in the meantime!}
          {When the expression below is false, the wait on the idle semaphore
           will not block, so the stream should not get unnecessarily large}
          if FWriteIdx = FData.Size then
          begin
            FData.Clear;
            FWriteIdx := 0;
          end;
        finally
          ReleaseMutex(FDataMutex);
        end;
        WaitForSingleObject(FIdleSemaphore,INFINITE);
      end;
    end;
    inherited Execute;
  end;
  237.  
  238. end.
  239.  
  240.