home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Chip 2000 October
/
Chip_2000-10_cd1.bin
/
zkuste
/
Delphi
/
navody
/
multithread
/
mchpipethreads.pas
< prev
next >
Wrap
Pascal/Delphi Source File
|
1999-05-10
|
7KB
|
240 lines
{ 10-05-1999 10:37:03 PM > [martin on MARTIN] checked out /Reformatting
according to Delphi guidelines. }
{ 06-04-1999 7:49:40 PM > [martin on MARTIN] checked out /Modifying Class
Names }
unit MCHPipeThreads;
{Martin Harvey 7/11/98}
{This unit gives us a base pipe thread type with some common support for
error tracking}
interface
uses Classes,MCHPipeTypes,Windows,MCHMemoryStream;
type
{ Base class for the pipe worker threads in this unit. It stores the status
  that made the thread stop (TermReason) and offers an OnTerminate
  notification that is fired from Execute. }
TMCHPipeThread = class(TThread)
private
{ Handler called by Execute when it is assigned. }
FOnTerminate:TNotifyEvent;
protected
{ Status most recently stored by a descendant's Execute; the descendants in
  this unit terminate as soon as it is not meOK. }
FTermReason:TMCHError;
public
{ Calls the OnTerminate handler if one is assigned. The descendants in this
  unit invoke it through "inherited Execute" as their final statement. }
procedure Execute;override;
published
property TermReason:TMCHError read FTermReason;
{ NOTE(review): this property presumably hides the OnTerminate declared by
  TThread. The handler is invoked directly from Execute, so it most likely
  runs on the worker thread rather than the main thread - confirm before
  touching UI objects from the handler. }
property OnTerminate:TNotifyEvent read FOnTerminate write FOnTerminate;
end;
{ Worker thread that queues the data handed to WriteData and sends it to the
  pipe identified by PipeWriteHandle from its Execute loop. }
TMCHPipeWriterThread = class(TMCHPipeThread)
private
{ FDataMutex is acquired around every access to FData. FIdleSemaphore is
  released by WriteData and by Destroy, and waited on by Execute once it has
  sent everything it found queued. }
FDataMutex,FIdleSemaphore:THandle;
FPipeWriteHandle:TMCHHandle;
{ Data queued by WriteData that Execute has not finished sending. }
FData:TMCHMemoryStream;
{ Offset within FData from which Execute reads the next chunk to send. }
FWriteIdx:integer;
protected
public
constructor Create(CreateSuspended:boolean);
{ Sends queued data in chunks of at most BufSize bytes until terminated or
  until a pipe write reports a status other than meOK. }
procedure Execute;override;
{ Terminates the thread, waits for it to finish and releases the buffer. }
destructor Destroy;override;
{ Appends the whole of InStream to the queue and signals the idle semaphore. }
function WriteData(InStream:TStream):integer; {returns bytes written = InStream.Size}
property PipeWriteHandle:TMCHHandle read FPipeWriteHandle write FPipeWriteHandle;
end;
{ Worker thread that reads from the pipe identified by PipeReadHandle and
  buffers what it receives until ReadData collects it. }
TMCHPipeReaderThread = class(TMCHPipeThread)
private
{ Private declarations }
{ Acquired around every access to FData. }
FDataMutex:THandle;
FPipeReadHandle:TMCHHandle;
{ Bytes received by Execute that have not yet been handed out by ReadData. }
FData:TMCHMemoryStream;
{ Called from Execute when PeekData reports no bytes waiting. }
FOnDataRecieved:TNotifyEvent;
{ Called from Execute once WaitForPeer has returned meOK. }
FOnConnect:TNotifyEvent;
protected
public
constructor Create(CreateSuspended:boolean);
{ Waits for the peer, then reads from the pipe in chunks of at most BufSize
  bytes until terminated or until a pipe call reports a status other than
  meOK. }
procedure Execute;override;
{ Terminates the thread, waits for it to finish and releases the buffer. }
destructor Destroy;override;
{ Appends everything buffered so far to the end of OutStream and empties the
  buffer. }
function ReadData(OutStream:TStream):integer; {returns bytes read}
property OnDataRecieved:TNotifyEvent read FOnDataRecieved write FOnDataRecieved;
property PipeReadHandle:TMCHHandle read FPipeReadHandle write FPipeReadHandle;
property OnConnect:TNotifyEvent read FOnConnect write FOnConnect;
end;
implementation
uses MCHPipeInterface2;
const
{ Largest count passed to a single pipe read or write by the Execute methods
  in this unit. }
BufSize = 4096;
type
{ Transfer buffer declared as a local variable in both Execute methods.
  NOTE(review): the element type is integer, so the array occupies
  BufSize * SizeOf(integer) bytes, whereas the Execute methods never transfer
  more than BufSize per call (assuming those counts are byte counts, as they
  are for TStream.ReadBuffer/WriteBuffer). Presumably an array of byte was
  intended - confirm the parameter types in MCHPipeInterface2 before
  changing it. }
DataBuf = array[0..BufSize - 1] of integer;
{ Base thread body: notifies the OnTerminate handler, if there is one.
  Descendants run their own loop first and then call this through
  "inherited Execute". }
procedure TMCHPipeThread.Execute;
begin
  { Nothing to do when nobody subscribed. }
  if not Assigned(FOnTerminate) then
    Exit;
  FOnTerminate(Self);
end;
{ Creates the reader thread together with the mutex and the buffer that its
  Execute method uses.
  CreateSuspended is passed unchanged to TThread.Create. }
constructor TMCHPipeReaderThread.Create(CreateSuspended:boolean);
begin
  { Initialise everything Execute touches BEFORE calling the inherited
    constructor. Older RTL versions start the thread from inside
    TThread.Create when CreateSuspended is false, so the thread could
    otherwise reach Execute while FDataMutex is still 0 and FData is nil. }
  FDataMutex := CreateMutex(nil, false, nil);
  FData := TMCHMemoryStream.Create;
  inherited Create(CreateSuspended);
end;
{ Stops the reader thread and releases what the constructor allocated.
  The thread is asked to terminate, resumed if it was suspended so that it can
  actually run to completion, and joined before any resource is released. }
destructor TMCHPipeReaderThread.Destroy;
begin
  Terminate;
  if Suspended then
    Resume;
  WaitFor;

  { The thread has finished at this point, so nothing else uses these. }
  CloseHandle(FDataMutex);
  FData.Free;

  inherited Destroy;
end;
{ Moves everything received so far into OutStream.
  The buffered bytes are appended at the end of OutStream and the internal
  buffer is emptied afterwards. The whole operation runs while holding the
  data mutex.
  Returns the number of bytes that were buffered when the call was made. }
function TMCHPipeReaderThread.ReadData(OutStream:TStream):integer;
var
  Pending: integer;
begin
  WaitForSingleObject(FDataMutex, INFINITE);
  try
    { Rewind the buffer and position the target at its end so the copy
      appends. }
    FData.Seek(0, soFromBeginning);
    OutStream.Seek(0, soFromEnd);

    Pending := FData.Size;
    Result := Pending;
    OutStream.CopyFrom(FData, Pending);

    FData.Clear;
  finally
    ReleaseMutex(FDataMutex);
  end;
end;
{ Thread body of the reader.
  Waits for the peer on PipeReadHandle, fires OnConnect on success, then
  repeatedly peeks and reads the pipe in chunks of at most BufSize, appending
  each chunk to the internal buffer under the data mutex. Any pipe call that
  reports a status other than meOK stores that status in FTermReason and
  terminates the loop. Ends by calling the inherited Execute, which fires
  OnTerminate. }
procedure TMCHPipeReaderThread.Execute;
var
  Buffer: DataBuf;
  BytesToRead, BytesThisTime: integer;
begin
  FTermReason := WaitForPeer(FPipeReadHandle);
  if FTermReason <> meOK then
    Terminate
  else
    if Assigned(FOnConnect) then FOnConnect(Self);
  while not Terminated do
  begin
    FTermReason := PeekData(FPipeReadHandle, BytesToRead);
    if FTermReason <> meOK then
      Terminate;
    if not Terminated then
    begin
      if BytesToRead <= 0 then
      begin
        {Callback handler should implement lazy async notification}
        if Assigned(FOnDataRecieved) then FOnDataRecieved(Self);
        { Nothing is waiting: request a single unit so the read below
          (presumably blocking - confirm in MCHPipeInterface2) waits for
          the next data to arrive. }
        BytesToRead := 1;
      end;
      if BytesToRead > BufSize then
        BytesThisTime := BufSize
      else
        BytesThisTime := BytesToRead;
      FTermReason := MCHPipeInterface2.ReadData(FPipeReadHandle, Buffer, BytesThisTime);
      if FTermReason <> meOK then
        Terminate
      else
      begin
        WaitForSingleObject(FDataMutex, INFINITE);
        try
          FData.Seek(0, soFromEnd);
          FData.WriteBuffer(Buffer, BytesThisTime);
        finally
          { Always give the mutex back, even if the write raised, so the
            mutex is never left abandoned. }
          ReleaseMutex(FDataMutex);
        end;
      end;
    end;
  end;
  inherited Execute;
end;
{ Creates the writer thread together with the mutex, the idle semaphore and
  the queue that its Execute method uses.
  CreateSuspended is passed unchanged to TThread.Create. }
constructor TMCHPipeWriterThread.Create(CreateSuspended:boolean);
begin
  { Initialise everything Execute touches BEFORE calling the inherited
    constructor. Older RTL versions start the thread from inside
    TThread.Create when CreateSuspended is false, and this class's Execute
    uses FDataMutex and FData in its very first statements. }
  FDataMutex := CreateMutex(nil, false, nil);
  { Starts at zero so Execute blocks until WriteData or Destroy releases it. }
  FIdleSemaphore := CreateSemaphore(nil, 0, High(Integer), nil);
  FData := TMCHMemoryStream.Create;
  inherited Create(CreateSuspended);
end;
{ Stops the writer thread and releases everything the constructor allocated.
  The thread is asked to terminate, woken if it is waiting on the idle
  semaphore, resumed if it was suspended, and joined before any resource is
  released. }
destructor TMCHPipeWriterThread.Destroy;
begin
  Terminate;
  { Wake Execute if it is parked on the idle semaphore so that it can notice
    the Terminated flag. }
  ReleaseSemaphore(FIdleSemaphore, 1, nil);
  if Suspended then Resume;
  WaitFor;
  FData.Free;
  CloseHandle(FDataMutex);
  { The semaphore handle was previously never closed, leaking one kernel
    handle per writer thread. }
  CloseHandle(FIdleSemaphore);
  inherited Destroy;
end;
{ Queues the complete contents of InStream for sending.
  InStream is rewound, its contents are appended to the internal queue while
  holding the data mutex, and the idle semaphore is then released so that
  Execute picks the new data up.
  Returns InStream.Size. }
function TMCHPipeWriterThread.WriteData(InStream:TStream):integer;
var
  Incoming: integer;
begin
  InStream.Seek(0, soFromBeginning);

  WaitForSingleObject(FDataMutex, INFINITE);
  try
    Incoming := InStream.Size;
    Result := Incoming;
    { Append after whatever is still waiting to be sent. }
    FData.Seek(0, soFromEnd);
    FData.CopyFrom(InStream, Incoming);
  finally
    ReleaseMutex(FDataMutex);
  end;

  ReleaseSemaphore(FIdleSemaphore, 1, nil);
end;
{ Thread body of the writer.
  Each pass measures how much queued data lies beyond FWriteIdx, sends it to
  PipeWriteHandle in chunks of at most BufSize, and then waits on the idle
  semaphore. The queue is only touched while holding the data mutex, and the
  mutex is never held across the pipe write itself. A pipe write that reports
  a status other than meOK stores that status in FTermReason and terminates
  the loop. Ends by calling the inherited Execute, which fires OnTerminate. }
procedure TMCHPipeWriterThread.Execute;
var
  Buf: DataBuf;
  BytesThisTime, BytesToWrite: integer;
begin
  while not Terminated do
  begin
    WaitForSingleObject(FDataMutex, INFINITE);
    try
      BytesToWrite := FData.Size - FWriteIdx;
    finally
      ReleaseMutex(FDataMutex);
    end;
    while (BytesToWrite > 0) and (not Terminated) do
    begin
      if BytesToWrite > BufSize then
        BytesThisTime := BufSize
      else
        BytesThisTime := BytesToWrite;
      WaitForSingleObject(FDataMutex, INFINITE);
      try
        FData.Seek(FWriteIdx, soFromBeginning);
        FData.ReadBuffer(Buf, BytesThisTime);
      finally
        { Release even if ReadBuffer raised, so the mutex is never left
          abandoned. }
        ReleaseMutex(FDataMutex);
      end;
      {Note that we should not block when we have the mutex!}
      FTermReason := MCHPipeInterface2.WriteData(FPipeWriteHandle, Buf, BytesThisTime);
      if FTermReason = meOK then
      begin
        BytesToWrite := BytesToWrite - BytesThisTime;
        FWriteIdx := FWriteIdx + BytesThisTime;
      end
      else
        Terminate;
    end;
    if not Terminated then
    begin
      WaitForSingleObject(FDataMutex, INFINITE);
      try
        {Cannot be sure that the stream hasn't been written to in the meantime!}
        {When the expression below is false, the wait on the idle semaphore
         will not block, so the stream should not get unnecessarily large}
        if FWriteIdx = FData.Size then
        begin
          FData.Clear;
          FWriteIdx := 0;
        end;
      finally
        ReleaseMutex(FDataMutex);
      end;
      WaitForSingleObject(FIdleSemaphore, INFINITE);
    end;
  end;
  inherited Execute;
end;
end.