home *** CD-ROM | disk | FTP | other *** search
/ PC World 2005 June / PCWorld_2005-06_cd.bin / software / vyzkuste / firewally / firewally.exe / framework-2.3.exe / test_queue.py < prev    next >
Text File  |  2003-12-30  |  6KB  |  197 lines

  1. # Some simple Queue module tests, plus some failure conditions
  2. # to ensure the Queue locks remain stable
  3. import Queue
  4. import sys
  5. import threading
  6. import time
  7.  
  8. from test.test_support import verify, TestFailed, verbose
  9.  
  10. queue_size = 5
  11.  
  12. # Execute a function that blocks, and in a seperate thread, a function that
  13. # triggers the release.  Returns the result of the blocking function.
  14. class _TriggerThread(threading.Thread):
  15.     def __init__(self, fn, args):
  16.         self.fn = fn
  17.         self.args = args
  18.         self.startedEvent = threading.Event()
  19.         threading.Thread.__init__(self)
  20.     def run(self):
  21.         time.sleep(.1)
  22.         self.startedEvent.set()
  23.         self.fn(*self.args)
  24.  
  25. def _doBlockingTest(block_func, block_args, trigger_func, trigger_args):
  26.     t = _TriggerThread(trigger_func, trigger_args)
  27.     t.start()
  28.     try:
  29.         return block_func(*block_args)
  30.     finally:
  31.         # If we unblocked before our thread made the call, we failed!
  32.         if not t.startedEvent.isSet():
  33.             raise TestFailed("blocking function '%r' appeared not to block" %
  34.                              block_func)
  35.         t.join(1) # make sure the thread terminates
  36.         if t.isAlive():
  37.             raise TestFailed("trigger function '%r' appeared to not return" %
  38.                              trigger_func)
  39.  
  40. # A Queue subclass that can provoke failure at a moment's notice :)
  41. class FailingQueueException(Exception):
  42.     pass
  43.  
  44. class FailingQueue(Queue.Queue):
  45.     def __init__(self, *args):
  46.         self.fail_next_put = False
  47.         self.fail_next_get = False
  48.         Queue.Queue.__init__(self, *args)
  49.     def _put(self, item):
  50.         if self.fail_next_put:
  51.             self.fail_next_put = False
  52.             raise FailingQueueException, "You Lose"
  53.         return Queue.Queue._put(self, item)
  54.     def _get(self):
  55.         if self.fail_next_get:
  56.             self.fail_next_get = False
  57.             raise FailingQueueException, "You Lose"
  58.         return Queue.Queue._get(self)
  59.  
  60. def FailingQueueTest(q):
  61.     if not q.empty():
  62.         raise RuntimeError, "Call this function with an empty queue"
  63.     for i in range(queue_size-1):
  64.         q.put(i)
  65.     # Test a failing non-blocking put.
  66.     q.fail_next_put = True
  67.     try:
  68.         q.put("oops", block=0)
  69.         raise TestFailed("The queue didn't fail when it should have")
  70.     except FailingQueueException:
  71.         pass
  72.     q.fail_next_put = True
  73.     try:
  74.         q.put("oops", timeout=0.1)
  75.         raise TestFailed("The queue didn't fail when it should have")
  76.     except FailingQueueException:
  77.         pass
  78.     q.put("last")
  79.     verify(q.full(), "Queue should be full")
  80.     # Test a failing blocking put
  81.     q.fail_next_put = True
  82.     try:
  83.         _doBlockingTest( q.put, ("full",), q.get, ())
  84.         raise TestFailed("The queue didn't fail when it should have")
  85.     except FailingQueueException:
  86.         pass
  87.     # Check the Queue isn't damaged.
  88.     # put failed, but get succeeded - re-add
  89.     q.put("last")
  90.     # Test a failing timeout put
  91.     q.fail_next_put = True
  92.     try:
  93.         _doBlockingTest( q.put, ("full", True, 0.2), q.get, ())
  94.         raise TestFailed("The queue didn't fail when it should have")
  95.     except FailingQueueException:
  96.         pass
  97.     # Check the Queue isn't damaged.
  98.     # put failed, but get succeeded - re-add
  99.     q.put("last")
  100.     verify(q.full(), "Queue should be full")
  101.     q.get()
  102.     verify(not q.full(), "Queue should not be full")
  103.     q.put("last")
  104.     verify(q.full(), "Queue should be full")
  105.     # Test a blocking put
  106.     _doBlockingTest( q.put, ("full",), q.get, ())
  107.     # Empty it
  108.     for i in range(queue_size):
  109.         q.get()
  110.     verify(q.empty(), "Queue should be empty")
  111.     q.put("first")
  112.     q.fail_next_get = True
  113.     try:
  114.         q.get()
  115.         raise TestFailed("The queue didn't fail when it should have")
  116.     except FailingQueueException:
  117.         pass
  118.     verify(not q.empty(), "Queue should not be empty")
  119.     q.fail_next_get = True
  120.     try:
  121.         q.get(timeout=0.1)
  122.         raise TestFailed("The queue didn't fail when it should have")
  123.     except FailingQueueException:
  124.         pass
  125.     verify(not q.empty(), "Queue should not be empty")
  126.     q.get()
  127.     verify(q.empty(), "Queue should be empty")
  128.     q.fail_next_get = True
  129.     try:
  130.         _doBlockingTest( q.get, (), q.put, ('empty',))
  131.         raise TestFailed("The queue didn't fail when it should have")
  132.     except FailingQueueException:
  133.         pass
  134.     # put succeeded, but get failed.
  135.     verify(not q.empty(), "Queue should not be empty")
  136.     q.get()
  137.     verify(q.empty(), "Queue should be empty")
  138.  
  139. def SimpleQueueTest(q):
  140.     if not q.empty():
  141.         raise RuntimeError, "Call this function with an empty queue"
  142.     # I guess we better check things actually queue correctly a little :)
  143.     q.put(111)
  144.     q.put(222)
  145.     verify(q.get() == 111 and q.get() == 222,
  146.            "Didn't seem to queue the correct data!")
  147.     for i in range(queue_size-1):
  148.         q.put(i)
  149.     verify(not q.full(), "Queue should not be full")
  150.     q.put("last")
  151.     verify(q.full(), "Queue should be full")
  152.     try:
  153.         q.put("full", block=0)
  154.         raise TestFailed("Didn't appear to block with a full queue")
  155.     except Queue.Full:
  156.         pass
  157.     try:
  158.         q.put("full", timeout=0.1)
  159.         raise TestFailed("Didn't appear to time-out with a full queue")
  160.     except Queue.Full:
  161.         pass
  162.     # Test a blocking put
  163.     _doBlockingTest( q.put, ("full",), q.get, ())
  164.     _doBlockingTest( q.put, ("full", True, 0.2), q.get, ())
  165.     # Empty it
  166.     for i in range(queue_size):
  167.         q.get()
  168.     verify(q.empty(), "Queue should be empty")
  169.     try:
  170.         q.get(block=0)
  171.         raise TestFailed("Didn't appear to block with an empty queue")
  172.     except Queue.Empty:
  173.         pass
  174.     try:
  175.         q.get(timeout=0.1)
  176.         raise TestFailed("Didn't appear to time-out with an empty queue")
  177.     except Queue.Empty:
  178.         pass
  179.     # Test a blocking get
  180.     _doBlockingTest(q.get, (), q.put, ('empty',))
  181.     _doBlockingTest(q.get, (True, 0.2), q.put, ('empty',))
  182.  
  183. def test():
  184.     q=Queue.Queue(queue_size)
  185.     # Do it a couple of times on the same queue
  186.     SimpleQueueTest(q)
  187.     SimpleQueueTest(q)
  188.     if verbose:
  189.         print "Simple Queue tests seemed to work"
  190.     q = FailingQueue(queue_size)
  191.     FailingQueueTest(q)
  192.     FailingQueueTest(q)
  193.     if verbose:
  194.         print "Failing Queue tests seemed to work"
  195.  
  196. test()
  197.