home *** CD-ROM | disk | FTP | other *** search
/ PC World 2005 June / PCWorld_2005-06_cd.bin / software / vyzkuste / firewally / firewally.exe / framework-2.3.exe / test_poll.py < prev    next >
Text File  |  2003-12-30  |  4KB  |  173 lines

  1. # Test case for the os.poll() function
  2.  
  3. import sys, os, select, random
  4. from test.test_support import verify, verbose, TestSkipped, TESTFN
  5.  
  6. try:
  7.     select.poll
  8. except AttributeError:
  9.     raise TestSkipped, "select.poll not defined -- skipping test_poll"
  10.  
  11.  
  12. def find_ready_matching(ready, flag):
  13.     match = []
  14.     for fd, mode in ready:
  15.         if mode & flag:
  16.             match.append(fd)
  17.     return match
  18.  
  19. def test_poll1():
  20.     """Basic functional test of poll object
  21.  
  22.     Create a bunch of pipe and test that poll works with them.
  23.     """
  24.     print 'Running poll test 1'
  25.     p = select.poll()
  26.  
  27.     NUM_PIPES = 12
  28.     MSG = " This is a test."
  29.     MSG_LEN = len(MSG)
  30.     readers = []
  31.     writers = []
  32.     r2w = {}
  33.     w2r = {}
  34.  
  35.     for i in range(NUM_PIPES):
  36.         rd, wr = os.pipe()
  37.         p.register(rd, select.POLLIN)
  38.         p.register(wr, select.POLLOUT)
  39.         readers.append(rd)
  40.         writers.append(wr)
  41.         r2w[rd] = wr
  42.         w2r[wr] = rd
  43.  
  44.     while writers:
  45.         ready = p.poll()
  46.         ready_writers = find_ready_matching(ready, select.POLLOUT)
  47.         if not ready_writers:
  48.             raise RuntimeError, "no pipes ready for writing"
  49.         wr = random.choice(ready_writers)
  50.         os.write(wr, MSG)
  51.  
  52.         ready = p.poll()
  53.         ready_readers = find_ready_matching(ready, select.POLLIN)
  54.         if not ready_readers:
  55.             raise RuntimeError, "no pipes ready for reading"
  56.         rd = random.choice(ready_readers)
  57.         buf = os.read(rd, MSG_LEN)
  58.         verify(len(buf) == MSG_LEN)
  59.         print buf
  60.         os.close(r2w[rd]) ; os.close( rd )
  61.         p.unregister( r2w[rd] )
  62.         p.unregister( rd )
  63.         writers.remove(r2w[rd])
  64.  
  65.     poll_unit_tests()
  66.     print 'Poll test 1 complete'
  67.  
  68. def poll_unit_tests():
  69.     # returns NVAL for invalid file descriptor
  70.     FD = 42
  71.     try:
  72.         os.close(FD)
  73.     except OSError:
  74.         pass
  75.     p = select.poll()
  76.     p.register(FD)
  77.     r = p.poll()
  78.     verify(r[0] == (FD, select.POLLNVAL))
  79.  
  80.     f = open(TESTFN, 'w')
  81.     fd = f.fileno()
  82.     p = select.poll()
  83.     p.register(f)
  84.     r = p.poll()
  85.     verify(r[0][0] == fd)
  86.     f.close()
  87.     r = p.poll()
  88.     verify(r[0] == (fd, select.POLLNVAL))
  89.     os.unlink(TESTFN)
  90.  
  91.     # type error for invalid arguments
  92.     p = select.poll()
  93.     try:
  94.         p.register(p)
  95.     except TypeError:
  96.         pass
  97.     else:
  98.         print "Bogus register call did not raise TypeError"
  99.     try:
  100.         p.unregister(p)
  101.     except TypeError:
  102.         pass
  103.     else:
  104.         print "Bogus unregister call did not raise TypeError"
  105.  
  106.     # can't unregister non-existent object
  107.     p = select.poll()
  108.     try:
  109.         p.unregister(3)
  110.     except KeyError:
  111.         pass
  112.     else:
  113.         print "Bogus unregister call did not raise KeyError"
  114.  
  115.     # Test error cases
  116.     pollster = select.poll()
  117.     class Nope:
  118.         pass
  119.  
  120.     class Almost:
  121.         def fileno(self):
  122.             return 'fileno'
  123.  
  124.     try:
  125.         pollster.register( Nope(), 0 )
  126.     except TypeError: pass
  127.     else: print 'expected TypeError exception, not raised'
  128.  
  129.     try:
  130.         pollster.register( Almost(), 0 )
  131.     except TypeError: pass
  132.     else: print 'expected TypeError exception, not raised'
  133.  
  134.  
  135. # Another test case for poll().  This is copied from the test case for
  136. # select(), modified to use poll() instead.
  137.  
  138. def test_poll2():
  139.     print 'Running poll test 2'
  140.     cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
  141.     p = os.popen(cmd, 'r')
  142.     pollster = select.poll()
  143.     pollster.register( p, select.POLLIN )
  144.     for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
  145.         if verbose:
  146.             print 'timeout =', tout
  147.         fdlist = pollster.poll(tout)
  148.         if (fdlist == []):
  149.             continue
  150.         fd, flags = fdlist[0]
  151.         if flags & select.POLLHUP:
  152.             line = p.readline()
  153.             if line != "":
  154.                 print 'error: pipe seems to be closed, but still returns data'
  155.             continue
  156.  
  157.         elif flags & select.POLLIN:
  158.             line = p.readline()
  159.             if verbose:
  160.                 print `line`
  161.             if not line:
  162.                 if verbose:
  163.                     print 'EOF'
  164.                 break
  165.             continue
  166.         else:
  167.             print 'Unexpected return value from select.poll:', fdlist
  168.     p.close()
  169.     print 'Poll test 2 complete'
  170.  
  171. test_poll1()
  172. test_poll2()
  173.