home *** CD-ROM | disk | FTP | other *** search
/ Chip 2000 October / Chip_2000-10_cd1.bin / zkuste / Delphi / navody / multithread / mchtransactions.pas < prev    next >
Pascal/Delphi Source File  |  1999-05-10  |  20KB  |  632 lines

  1. { 10-05-1999 10:38:00 PM > [martin on MARTIN] checked out /Reformatting
  2.    according to Delphi guidelines. }
  3. { 10-05-1999 1:31:20 AM > [martin on MARTIN] update: Including winsock
  4.    message in fatal error (0.4) /  }
  5. { 08-05-1999 7:04:00 PM > [martin on MARTIN] checked out /Including winsock
  6.    message in fatal error }
  7. { 14-04-1999 11:59:23 PM > [martin on MARTIN] update: Changing dynamic
  8.    methods to virtual. (0.3) /  }
  9. { 14-04-1999 11:54:16 PM > [martin on MARTIN] checked out /Changing dynamic
  10.    methods to virtual. }
  11. { 14-04-1999 11:19:53 PM > [martin on MARTIN] update: Modifying code that
  12.    checks whether OK to send, so that structures can be sent to the transaction
  13.    manager whilst the associated socket is connecting. This means that the ONCP does not require a cache. (0.2) /  }
  14. { 06-04-1999 9:14:07 PM > [martin on MARTIN] checked out /Modifying code
  15.    that checks whether OK to send, so that structures can be sent to the transaction
  16.    manager whilst the associated socket is connecting. This means that the ONCP does not require a cache. }
  17. { 06-04-1999 7:44:46 PM > [martin on MARTIN] checked out /Modifying class
  18.    names }
  19. { 06-04-1999 1:46:40 AM > [martin on MARTIN] check in: (0.0) Initial Version
  20.    / None }
  21. unit MCHTransactions;
  22.  
  23. {Martin Harvey 20/10/1997
  24.  
  25. This unit implements a socket suitable for connection to
  26. a transaction manager, and it also defines the transaction
  27. manager.
  28. The transaction manager is an intermediary between the socket
  29. and the Data Object Protocol Manager
  30.  
  31. The basic structure is as follows:
  32.  
  33. TTransStreamSock<--->TMCHTransactionManager<---->TMCHDopManager
  34. The TMCHDopManager will be left to a separate unit.
  35. }
  36.  
  37. {
  38.  This unit has been slightly modified. 7/11/97
  39.  Whereas previously, the Transaction manager was created upon connection
  40.  establishment, it is now created and destroyed at the same time as the socket
  41.  object. However, the transaction manager now resets itself when a connection
  42.  on the socket is created or destroyed.
  43.  
  44.  In addition, the stream socket now allows for the creation of derived classes from
  45.  TMCHTransactionManager
  46.  }
  47.  
  48. { Yet more modifications, Martin Harvey 30/8/98
  49.  
  50.   Due to various requirements, like having transaction managers that save to disks
  51.   and to pipes, I'm creating a custom ancestor class that connects to a DOP,
  52.   and then subclasses that work with sockets, files, pipes etc etc}
  53.  
  54. interface
  55.  
  56. uses MCHMemoryStream,Classes,DWinsock,SysUtils,WSAPI;
  57.  
  58. type
  59.  
  60.   TFatalErrorEvent = procedure(Sender:TObject;Msg:string) of object;
  61.  
  62.   TMCHCustomTransactionManager = class
  63.   private
  64.     FOnReset:TNotifyEvent;
  65.     FOnFatalError:TFatalErrorEvent;
  66.     FOnDestroy:TNotifyEvent;
  67.     FOnTransactionRecieved:TNotifyEvent;
  68.     FDOPManager:TObject;
  69.   protected
  70.     procedure DoTransactionRecieved;virtual;
  71.     procedure DoFatalError(Msg:string);virtual;
  72.     procedure DoDestroy;virtual;
  73.     function GetActive:boolean;virtual;abstract;
  74.   public
  75.     destructor Destroy;override;
  76.     property DOPManager:TObject
  77.       read FDOPManager
  78.       write FDOPManager;
  79.     property OnTransactionRecieved:TNotifyEvent
  80.       read FOnTransactionRecieved
  81.       write FOnTransactionRecieved;
  82.  {informs user when it is about to be destroyed}
  83.     property OnDestroy:TNotifyEvent
  84.       read FOnDestroy
  85.       write FOnDestroy;
  86.  {informs user when a reset event is about to occur}
  87.     property OnReset:TNotifyEvent
  88.       read FOnReset
  89.       write FOnReset;
  90.     property OnFatalError:TFatalErrorEvent
  91.       read FOnFatalError
  92.       write FOnFatalError;
  93.     property Active:boolean
  94.       read GetActive;
  95.     constructor Create;
  96.     procedure WriteTransactionFromStream(ExternalIn:TStream);virtual;abstract;
  97.     procedure ReadTransactionToStream(ExternalOut:TStream);virtual;abstract;
  98.     procedure Reset;virtual;
  99.   published
  100.   end;
  101.  
  102.   TMCHTransactionManager = class; {forward declaration}
  103.  
  104.   TCustomManagerClass = class of TMCHCustomTransactionManager;
  105.  
  106.   TTransStreamSock = class(TStreamSocket)
  107.   private
  108.     FManager:TMCHCustomTransactionManager;
  109.   protected
  110.     {these methods normally call the parent component
  111.     and allow delegation through the parent component to the form
  112.     doing the delegation.
  113.     I am overriding them to provide a direct way of letting the transaction
  114.     manager get to the data first.}
  115.     procedure DoAccept;override;
  116.     procedure DoConnect;override;
  117.     procedure DoDisconnect;override;
  118.     procedure DoReject;override;
  119.     procedure DoTimeout;override;
  120.     procedure DoWrite;override;
  121.     procedure DoRead;override;
  122.   public
  123.     property Manager:TMCHCustomTransactionManager read FManager write FManager;
  124.     {connection to the manager object to allow direct access to it}
  125.     constructor Create(AParent:TSockCtrl);override;
  126.           {note that set manager class should only be called immediately
  127.           after the socket is created}
  128.     destructor Destroy;override;
  129.   published
  130.   end;
  131.  
  132.   TMCHTransactionManager = class(TMCHCustomTransactionManager)
  133.   private
  134.     FTransSock:TTransStreamSock;
  135.     FInputStream,
  136.       FOutputStream:
  137.       TMCHMemoryStream;
  138.     FReadingTransaction:boolean;
  139.     FSocketReady:boolean;
  140.     FReadTransactionStart,
  141.       FReadTransactionEnd:integer;
  142.     FWriteSocketOffset:integer;
  143.   protected
  144.     {offset of current transaction in read stream}
  145.     property ReadTransactionStart:integer
  146.       read FReadTransactionStart
  147.       write FReadTransactionStart;
  148.     {projected end of current transaction}
  149.     property ReadTransactionEnd:integer
  150.       read FReadTransactionEnd
  151.       write FReadTransactionEnd;
  152.     {position of writing data to socket}
  153.     property WriteSocketOffset:integer
  154.       read FWriteSocketOffset
  155.       write FWriteSocketOffset;
  156.     {stream properties}
  157.     property InputStream:TMCHMemoryStream
  158.       read FInputStream
  159.       write FInputStream;
  160.     property OutputStream:TMCHMemoryStream
  161.       read FOutputStream
  162.       write FOutputStream;
  163.     {internal procedures}
  164.     procedure TrimReadStream;
  165.     procedure TrimWriteStream;
  166.     {write to socket puts as many bytes as possible from ouput stream to socket,
  167.     and returns number put}
  168.     function WriteToSocket(Sock:TStreamSocket;Stream:TStream;pos,size:integer):integer;
  169.     { read from socket reads as many bytes as possible to the end of the input
  170.     stream, and returns how many bytes read}
  171.     function ReadFromSocket(Sock:TStreamSocket;Stream:TStream):integer;
  172.     function GetSocketReady:boolean;
  173.     procedure ExtractTransactionsFromStream;
  174.     function GetActive:boolean;override;
  175.   public
  176.     {user accessible properties}
  177.     {owning socket}
  178.     property TransSock:TTransStreamSock read FTransSock write FTransSock;
  179.     {Reading side of things}
  180.     {user event notifying that a transaction is ready}
  181.     {note: this is the only chance that external code
  182.     has of picking up the transaction, pointers are automatically incremented
  183.     afterwards! (This may be changed later)}
  184.     property OnTransactionRecieved;
  185.     {informs user when it is about to be destroyed}
  186.     property OnDestroy;
  187.     {informs user when a reset event is about to occur}
  188.     property OnReset;
  189.     { an unfinished transaction is in the read stream }
  190.     { This does not preclude a very short transaction
  191.     existing. (ie header is too short to read length}
  192.     { This property doesn't have a write specifier,
  193.       so external objects can't modify it}
  194.     property ReadingTransaction:boolean
  195.       read FReadingTransaction;
  196.     { The socket ready property is used to ensure that
  197.     only one send operation is called for every OnWrite event}
  198.     property SocketReady:boolean
  199.       read GetSocketReady;
  200.     property Active;
  201.     {constructor and destructor.
  202.     These do not need to be called manually, since the
  203.     owning socket will take care of this }
  204.     constructor Create;
  205.     destructor Destroy;override;
  206.     {user methods for writing transactions}
  207.     { The input stream should contain all the data for one transaction}
  208.     { Note that the outside agent owns the external stream}
  209.     procedure WriteTransactionFromStream(ExternalIn:TStream);override;
  210.     {user methods for reading transactions}
  211.     {The output stream contains a copy of the transaction data
  212.      This should be called externally when an OnTransactionRecieved event occurs}
  213.      {Note that the outside agent owns the external stream}
  214.     procedure ReadTransactionToStream(ExternalOut:TStream);override;
  215.     {methods for owning socket to call.
  216.     The user should *not* call these!!}
  217.     {writing side of things}
  218.     procedure WriteDataRemainder;
  219.     {reading side}
  220.     procedure ReadNewData;
  221.     { a reset procedure to be used when the connection underlying the socket has
  222.      been closed}
  223.     procedure Reset;override;
  224.   published
  225.     property OnFatalError;
  226.   end;
  227.  
  228. implementation
  229.  
  230. uses MCHDOPManager,Dialogs;
  231.  
  232. constructor TMCHCustomTransactionManager.Create;
  233. begin
  234.   inherited Create;
  235.   DOPManager := TMCHDopManager.Create;
  236.   (DOPManager as TMCHDopManager).DOPTransactionManager := Self;
  237. end;
  238.  
  239. destructor TMCHCustomTransactionManager.Destroy;
  240. begin
  241.   DoDestroy;
  242.   DOPManager.Free;
  243.   DOPManager := nil;
  244.   inherited Destroy;
  245. end;
  246.  
  247. procedure TMCHCustomTransactionManager.DoFatalError(Msg:string);
  248. begin
  249.   if Assigned(FOnFatalError) then FOnFatalError(Self,Msg);
  250. end;
  251.  
  252. procedure TMCHCustomTransactionManager.Reset;
  253. begin
  254.   if Assigned(DOPManager) then
  255.     (DOPManager as TMCHDopManager).Reset;
  256.   if Assigned(FOnReset) then FOnReset(Self);
  257. end;
  258.  
  259. procedure TMCHCustomTransactionManager.DoTransactionRecieved;
  260. begin
  261.   if (Assigned(FOnTransactionRecieved)) then
  262.     FOnTransactionRecieved(Self);
  263.   try
  264.     if Assigned(DOPManager) then
  265.       (DOPManager as TMCHDopManager).GetIncomingTransaction(Self);
  266.   except
  267.     on E:ENetworkError do begin
  268.       ShowMessage('Exception raised on remote machine, '
  269.         + E.RemoteExceptionName + ' ' +
  270.         E.Message);
  271.     end;
  272.   end;
  273. end;
  274.  
  275. procedure TMCHCustomTransactionManager.DoDestroy;
  276. begin
  277.   if (Assigned(FOnDestroy)) then
  278.     FOnDestroy(Self);
  279. end;
  280.  
  281.  
  282. procedure TTransStreamSock.DoAccept;
  283. begin
  284.   if LastError = 0 then
  285.   begin
  286.     if Assigned(Manager) then
  287.       Manager.Reset;
  288.   end;
  289.   inherited DoAccept;
  290. end;
  291.  
  292. procedure TTransStreamSock.DoConnect;
  293. begin
  294. {Note that this has been modified. We now only
  295.  reset the transaction manager if the connection *fails*}
  296.   if LastError <> 0 then
  297.   begin
  298.     if Assigned(Manager) then
  299.       Manager.Reset;
  300.   end;
  301.   inherited DoConnect;
  302. end;
  303.  
  304. procedure TTransStreamSock.DoDisconnect;
  305. begin
  306.   if Assigned(Manager) then
  307.     Manager.Reset;
  308.   inherited DoDisconnect;
  309. end;
  310.  
  311. procedure TTransStreamSock.DoReject;
  312. begin
  313.   if Assigned(Manager) then
  314.     Manager.Reset;
  315.   inherited DoReject;
  316. end;
  317.  
  318. procedure TTransStreamSock.DoTimeOut;
  319. begin
  320.   if Assigned(Manager) then
  321.     Manager.Reset;
  322.   inherited DoTimeOut;
  323. end;
  324.  
  325. procedure TTransStreamSock.DoWrite;
  326. begin
  327.   if Assigned(Manager) then
  328.     (Manager as TMCHTransactionManager).WriteDataRemainder;
  329.   inherited DoWrite;
  330. end;
  331.  
  332. procedure TTransStreamSock.DoRead;
  333. begin
  334.   if Assigned(Manager) then
  335.     (Manager as TMCHTransactionManager).ReadNewData;
  336.   inherited DoRead;
  337. end;
  338.  
  339. destructor TTransStreamSock.Destroy;
  340. begin
  341.   if Assigned(Manager) then
  342.   begin
  343.     Manager.Free;
  344.     Manager := nil;
  345.   end;
  346.   inherited Destroy;
  347. end;
  348.  
  349. constructor TTransStreamSock.Create(AParent:TSockCtrl);
  350. begin
  351.   inherited Create(AParent);
  352.   Manager := TMCHTransactionManager.Create;
  353.   (Manager as TMCHTransactionManager).TransSock := Self;
  354. end;
  355.  
  356.  
  357. function TMCHTransactionManager.GetSocketReady:boolean;
  358. begin
  359.   result := FTransSock.Connected and FSocketReady;
  360. end;
  361.  
  362. {$WARNINGS OFF}
  363.  
  364. function TMCHTransactionManager.ReadFromSocket(Sock:TStreamSocket;Stream:TStream):integer;
  365.  
  366. const
  367.   blocksize = 4096;
  368. var
  369.   databuf:array[0..blocksize - 1] of byte;
  370.   TotalBytesRead,BytesRead:integer;
  371.  
  372. begin
  373.   Stream.Seek(0,SoFromEnd); {seek to the end of the input stream}
  374.   TotalBytesRead := 0;
  375.   repeat
  376.     try
  377.       BytesRead := Sock.Recv(databuf,blocksize);
  378.     except
  379.       on E:ESockError do begin
  380.         DoFatalError(E.Message);
  381.         BytesRead := 0;
  382.       end;
  383.     end;
  384.     Stream.WriteBuffer(Databuf,BytesRead);
  385.     TotalBytesRead := TotalBytesRead + BytesRead;
  386.   until BytesRead < blocksize;
  387.   result := TotalBytesRead;
  388. end;
  389.  
  390. {$WARNINGS ON}
  391.  
  392. constructor TMCHTransactionManager.Create;
  393. begin
  394.   inherited Create;
  395.   FInputStream := TMCHMemoryStream.Create;
  396.   FOutputStream := TMCHMemoryStream.Create;
  397. end;
  398.  
  399. destructor TMCHTransactionManager.Destroy;
  400. begin
  401.   FInputStream.Free;
  402.   FOutputStream.Free;
  403.   inherited Destroy;
  404. end;
  405.  
  406. function TMCHTransactionManager.GetActive:boolean;
  407. begin
  408.   result := true;
  409.   if (WriteSocketOffset = OutputStream.Size)
  410.     and (ReadTransactionStart = InputStream.Size)
  411.     and (not ReadingTransaction)
  412.     then result := false;
  413. end;
  414.  
  415.  
  416. procedure TMCHTransactionManager.Reset;
  417. {This procedure resets all pointers and clears the stream}
  418. begin
  419.   inherited Reset;
  420.   ReadTransactionStart := 0;
  421.   ReadTransactionEnd := 0;
  422.   WriteSocketOffset := 0;
  423.   FReadingTransaction := false;
  424.   InputStream.SetSize(0);
  425.   OutputStream.SetSize(0);
  426. end;
  427.  
  428. procedure TMCHTransactionManager.ReadNewData;
  429.  
  430. var
  431.   TransLength:integer;
  432.  
  433. begin
  434.   ReadFromSocket(TransSock,InputStream);
  435.   if (not (ReadingTransaction)) then
  436. { No transactions pending, Read Start & Read End point to same place}
  437.   begin
  438.     if (InputStream.Size >= (ReadTransactionEnd + SizeOf(TransLength))) then
  439.     begin
  440.       InputStream.Seek(ReadTransactionEnd,soFromBeginning);
  441.       InputStream.ReadBuffer(TransLength,SizeOf(TransLength));
  442.       ReadTransactionStart := InputStream.Position;
  443.       ReadTransactionEnd := ReadTransactionStart + TransLength;
  444.       FReadingTransaction := true;
  445.     end;
  446.   end;
  447.   ExtractTransactionsFromStream;
  448. end;
  449.  
  450. procedure TMCHTransactionManager.ExtractTransactionsFromStream;
  451.  
  452. var
  453.   TransLength:integer;
  454.  
  455. begin
  456. {do we have a complete transaction?}
  457.   while ((ReadTransactionEnd <= InputStream.Size) and ReadingTransaction) do
  458.   begin
  459.     {we have a complete pending transaction}
  460.     DoTransactionRecieved; {this will pick the transaction up if the user assigns a handler}
  461.     {now advance pointers to the next transaction}
  462.     ReadTransactionStart := ReadTransactionEnd;
  463.     if (InputStream.Size >= ReadTransactionStart + SizeOf(TransLength)) then
  464.     begin
  465.       {we at least have the integer transaction length}
  466.       InputStream.Seek(ReadTransactionEnd,soFromBeginning);
  467.       InputStream.ReadBuffer(TransLength,SizeOf(TransLength));
  468.       ReadTransactionStart := InputStream.Position;
  469.       ReadTransactionEnd := ReadTransactionStart + TransLength;
  470.       {now we know where the transaction ends and the pointers are valid}
  471.     end
  472.     else
  473.       {we don't have anything to work with}
  474.       FReadingTransaction := false;
  475.   end;
  476.   TrimReadStream;
  477. end;
  478.  
  479.  
  480. procedure TMCHTransactionManager.TrimReadStream;
  481.  
  482. const
  483.   MinStreamSize = 1024;
  484.  
  485. var
  486.   NewStream:TMCHMemoryStream;
  487.  
  488. begin
  489. {General policy here:
  490. If more than 4/5 of the stream is not used, then clear it up.
  491. In all cases, all data before ReadTransactionStart can be discarded
  492. if stream less than 1k, then don't bother.
  493. }
  494.   if (((InputStream.Size - ReadTransactionStart) * 5) < InputStream.Size)
  495.     and (InputStream.Size > MinStreamSize) then
  496.   begin
  497.     NewStream := TMCHMemoryStream.Create;
  498.     { do the work}
  499.     {copy data from old to new stream}
  500.     InputStream.Seek(ReadTransactionStart,soFromBeginning);
  501.     if (InputStream.Size - ReadTransactionStart) > 0 then
  502.       NewStream.CopyFrom(InputStream,(InputStream.Size - ReadTransactionStart));
  503.     {Swap stream ownership and destroy old stream}
  504.     InputStream.Free;
  505.     InputStream := NewStream;
  506.     {reset pointers accordingly}
  507.     ReadTransactionEnd := ReadTransactionEnd - ReadTransactionStart;
  508.     ReadTransactionStart := 0;
  509.   end;
  510. end;
  511.  
  512.  
  513. procedure TMCHTransactionManager.ReadTransactionToStream(ExternalOut:TStream);
  514. {This will copy the current transaction, but will not modify any pointers}
  515. begin
  516.   InputStream.Seek(ReadTransactionStart,soFromBeginning);
  517.   ExternalOut.CopyFrom(InputStream,ReadTransactionEnd - ReadTransactionStart);
  518. end;
  519.  
  520. {$WARNINGS OFF}
  521.  
  522. function TMCHTransactionManager.WriteToSocket(Sock:TStreamSocket;Stream:TStream;pos,size:integer):integer;
  523.  
  524. const
  525.   blocksize = 4096;
  526.  
  527. var
  528.   databuf:array[0..blocksize - 1] of byte;
  529.   TotalBytesWritten,BytesWritten,BlockBytesToWrite,BytesToWrite:integer;
  530.  
  531. {write as much as we can to the socket until it's full}
  532.  
  533. begin
  534.   Stream.Seek(pos,soFromBeginning);
  535.   BytesToWrite := size;
  536.   TotalBytesWritten := 0;
  537.   repeat
  538.     if BytesToWrite > blocksize then
  539.       BlockBytesToWrite := blocksize
  540.     else
  541.       BlockBytesToWrite := BytesToWrite;
  542.     Stream.ReadBuffer(databuf,BlockBytesToWrite);
  543.     try
  544.       BytesWritten := Sock.Send(databuf,BlockBytesToWrite);
  545.     except
  546.       on E:ESockError do
  547.       begin
  548.         DoFatalError(E.Message);
  549.         BytesWritten := 0;
  550.       end;
  551.     end;
  552.     TotalBytesWritten := TotalBytesWritten + BytesWritten;
  553.     BytesToWrite := BytesToWrite - BytesWritten
  554.   until (BytesToWrite = 0) or (BytesWritten < BlockBytesToWrite);
  555.   result := TotalBytesWritten;
  556. end;
  557.  
  558. {$WARNINGS ON}
  559.  
  560. procedure TMCHTransactionManager.WriteDataRemainder;
  561.  
  562. var
  563.   BytesWritten:integer;
  564.   BytesToWrite:integer;
  565.  
  566. begin
  567.   FSocketReady := true;
  568. {socket is ready for more data to be written}
  569.   BytesToWrite := OutputStream.Size - WriteSocketOffset;
  570.   if BytesToWrite > 0 then
  571.   begin
  572.      {write as much as we can}
  573.     BytesWritten := WriteToSocket(TransSock,OutputStream,WriteSocketOffset,BytesToWrite);
  574.     WriteSocketOffset := WriteSocketOffset + BytesWritten;
  575.     if BytesWritten < BytesToWrite then
  576.       FSocketReady := false;
  577.       { we'll get a callback ready for more data}
  578.   end;
  579.   TrimWriteStream;
  580. end;
  581.  
  582.  
  583. procedure TMCHTransactionManager.TrimWriteStream;
  584.  
  585. const
  586.   MinStreamSize = 1024;
  587.  
  588. var
  589.   NewStream:TMCHMemoryStream;
  590.  
  591. begin
  592. {Same general policy as for trimming the read stream}
  593.   if (((OutputStream.Size - WriteSocketOffset) * 5) < OutputStream.Size) and (OutputStream.Size > MinStreamSize) then
  594.   begin
  595.     NewStream := TMCHMemoryStream.Create;
  596.     try
  597.       {copy}
  598.       OutputStream.Seek(WriteSocketOffset,soFromBeginning);
  599.       if (OutputStream.Size - WriteSocketOffset) > 0 then
  600.         NewStream.CopyFrom(OutputStream,(OutputStream.Size - WriteSocketOffset));
  601.       {Swap ownership & destroy}
  602.       OutputStream.Free;
  603.       OutputStream := TMCHMemoryStream.Create;
  604.       OutputStream.LoadFromStream(NewStream);
  605.       WriteSocketOffset := 0;
  606.     finally
  607.       NewStream.Free;
  608.     end;
  609.   end;
  610. end;
  611.  
  612.  
  613. procedure TMCHTransactionManager.WriteTransactionFromStream(ExternalIn:TStream);
  614.  
  615. var
  616.   Length:integer;
  617.  
  618. begin
  619. { write length data to output stream }
  620.   Length := ExternalIn.Size;
  621.   OutputStream.Seek(0,soFromEnd);
  622.   OutputStream.WriteBuffer(Length,SizeOf(Length));
  623.   ExternalIn.Seek(0,soFromBeginning);
  624.   OutputStream.CopyFrom(ExternalIn,ExternalIn.Size);
  625.   if SocketReady then
  626.     WriteDataRemainder;
  627.    {only force a send if the socket is empty and connected}
  628. end;
  629.  
  630. end.
  631.  
  632.