home *** CD-ROM | disk | FTP | other *** search
/ PC World 2005 June / PCWorld_2005-06_cd.bin / software / vyzkuste / firewally / firewally.exe / framework-2.3.exe / test_socketserver.py < prev    next >
Text File  |  2003-12-30  |  5KB  |  166 lines

  1. # Test suite for SocketServer.py
  2.  
  3. from test import test_support
  4. from test.test_support import verbose, verify, TESTFN, TestSkipped
  5. test_support.requires('network')
  6.  
  7. from SocketServer import *
  8. import socket
  9. import select
  10. import time
  11. import threading
  12. import os
  13.  
  14. NREQ = 3
  15. DELAY = 0.5
  16.  
  17. class MyMixinHandler:
  18.     def handle(self):
  19.         time.sleep(DELAY)
  20.         line = self.rfile.readline()
  21.         time.sleep(DELAY)
  22.         self.wfile.write(line)
  23.  
  24. class MyStreamHandler(MyMixinHandler, StreamRequestHandler):
  25.     pass
  26.  
  27. class MyDatagramHandler(MyMixinHandler, DatagramRequestHandler):
  28.     pass
  29.  
  30. class MyMixinServer:
  31.     def serve_a_few(self):
  32.         for i in range(NREQ):
  33.             self.handle_request()
  34.     def handle_error(self, request, client_address):
  35.         self.close_request(request)
  36.         self.server_close()
  37.         raise
  38.  
  39. teststring = "hello world\n"
  40.  
  41. def receive(sock, n, timeout=20):
  42.     r, w, x = select.select([sock], [], [], timeout)
  43.     if sock in r:
  44.         return sock.recv(n)
  45.     else:
  46.         raise RuntimeError, "timed out on %s" % `sock`
  47.  
  48. def testdgram(proto, addr):
  49.     s = socket.socket(proto, socket.SOCK_DGRAM)
  50.     s.sendto(teststring, addr)
  51.     buf = data = receive(s, 100)
  52.     while data and '\n' not in buf:
  53.         data = receive(s, 100)
  54.         buf += data
  55.     verify(buf == teststring)
  56.     s.close()
  57.  
  58. def teststream(proto, addr):
  59.     s = socket.socket(proto, socket.SOCK_STREAM)
  60.     s.connect(addr)
  61.     s.sendall(teststring)
  62.     buf = data = receive(s, 100)
  63.     while data and '\n' not in buf:
  64.         data = receive(s, 100)
  65.         buf += data
  66.     verify(buf == teststring)
  67.     s.close()
  68.  
  69. class ServerThread(threading.Thread):
  70.     def __init__(self, addr, svrcls, hdlrcls):
  71.         threading.Thread.__init__(self)
  72.         self.__addr = addr
  73.         self.__svrcls = svrcls
  74.         self.__hdlrcls = hdlrcls
  75.     def run(self):
  76.         class svrcls(MyMixinServer, self.__svrcls):
  77.             pass
  78.         if verbose: print "thread: creating server"
  79.         svr = svrcls(self.__addr, self.__hdlrcls)
  80.         if verbose: print "thread: serving three times"
  81.         svr.serve_a_few()
  82.         if verbose: print "thread: done"
  83.  
  84. seed = 0
  85. def pickport():
  86.     global seed
  87.     seed += 1
  88.     return 10000 + (os.getpid() % 1000)*10 + seed
  89.  
  90. host = "localhost"
  91. testfiles = []
  92. def pickaddr(proto):
  93.     if proto == socket.AF_INET:
  94.         return (host, pickport())
  95.     else:
  96.         fn = TESTFN + str(pickport())
  97.         testfiles.append(fn)
  98.         return fn
  99.  
  100. def cleanup():
  101.     for fn in testfiles:
  102.         try:
  103.             os.remove(fn)
  104.         except os.error:
  105.             pass
  106.     testfiles[:] = []
  107.  
  108. def testloop(proto, servers, hdlrcls, testfunc):
  109.     for svrcls in servers:
  110.         addr = pickaddr(proto)
  111.         if verbose:
  112.             print "ADDR =", addr
  113.             print "CLASS =", svrcls
  114.         t = ServerThread(addr, svrcls, hdlrcls)
  115.         if verbose: print "server created"
  116.         t.start()
  117.         if verbose: print "server running"
  118.         for i in range(NREQ):
  119.             time.sleep(DELAY)
  120.             if verbose: print "test client", i
  121.             testfunc(proto, addr)
  122.         if verbose: print "waiting for server"
  123.         t.join()
  124.         if verbose: print "done"
  125.  
  126. tcpservers = [TCPServer, ThreadingTCPServer]
  127. if hasattr(os, 'fork') and os.name not in ('os2',):
  128.     tcpservers.append(ForkingTCPServer)
  129. udpservers = [UDPServer, ThreadingUDPServer]
  130. if hasattr(os, 'fork') and os.name not in ('os2',):
  131.     udpservers.append(ForkingUDPServer)
  132.  
  133. if not hasattr(socket, 'AF_UNIX'):
  134.     streamservers = []
  135.     dgramservers = []
  136. else:
  137.     class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass
  138.     streamservers = [UnixStreamServer, ThreadingUnixStreamServer,
  139.                      ForkingUnixStreamServer]
  140.     class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass
  141.     dgramservers = [UnixDatagramServer, ThreadingUnixDatagramServer,
  142.                     ForkingUnixDatagramServer]
  143.  
  144. def testall():
  145.     testloop(socket.AF_INET, tcpservers, MyStreamHandler, teststream)
  146.     testloop(socket.AF_INET, udpservers, MyDatagramHandler, testdgram)
  147.     if hasattr(socket, 'AF_UNIX'):
  148.         testloop(socket.AF_UNIX, streamservers, MyStreamHandler, teststream)
  149.         # Alas, on Linux (at least) recvfrom() doesn't return a meaningful
  150.         # client address so this cannot work:
  151.         ##testloop(socket.AF_UNIX, dgramservers, MyDatagramHandler, testdgram)
  152.  
  153. def test_main():
  154.     import imp
  155.     if imp.lock_held():
  156.         # If the import lock is held, the threads will hang.
  157.         raise TestSkipped("can't run when import lock is held")
  158.  
  159.     try:
  160.         testall()
  161.     finally:
  162.         cleanup()
  163.  
  164. if __name__ == "__main__":
  165.     test_main()
  166.