home *** CD-ROM | disk | FTP | other *** search
/ PC World 2001 April / PCWorld_2001-04_cd.bin / Software / TemaCD / webclean / !!!python!!! / BeOpen-Python-2.0.exe / TEST_FORK1.PY < prev    next >
Encoding:
Python Source  |  2000-09-28  |  1.7 KB  |  79 lines

  1. """This test checks for correct fork() behavior.
  2.  
  3. We want fork1() semantics -- only the forking thread survives in the
  4. child after a fork().
  5.  
  6. On some systems (e.g. Solaris without posix threads) we find that all
  7. active threads survive in the child after a fork(); this is an error.
  8.  
  9. On BeOS, you CANNOT mix threads and fork(), the behaviour is undefined.
  10. That's OK, fork() is a grotesque hack anyway. ;-) [cjh]
  11.  
  12. """
  13.  
  14. import os, sys, time, thread
  15. from test_support import TestSkipped
  16.  
  17. try:
  18.     if os.uname()[0] == "BeOS":
  19.         raise TestSkipped, "can't mix os.fork with threads on BeOS"
  20. except AttributeError:
  21.     pass
  22.  
  23. try:
  24.     os.fork
  25. except AttributeError:
  26.     raise TestSkipped, "os.fork not defined -- skipping test_fork1"
  27.  
  28. LONGSLEEP = 2
  29.  
  30. SHORTSLEEP = 0.5
  31.  
  32. NUM_THREADS = 4
  33.  
  34. alive = {}
  35.  
  36. stop = 0
  37.  
  38. def f(id):
  39.     while not stop:
  40.         alive[id] = os.getpid()
  41.         try:
  42.             time.sleep(SHORTSLEEP)
  43.         except IOError:
  44.             pass
  45.  
  46. def main():
  47.     for i in range(NUM_THREADS):
  48.         thread.start_new(f, (i,))
  49.  
  50.     time.sleep(LONGSLEEP)
  51.  
  52.     a = alive.keys()
  53.     a.sort()
  54.     assert a == range(NUM_THREADS)
  55.  
  56.     prefork_lives = alive.copy()
  57.  
  58.     cpid = os.fork()
  59.  
  60.     if cpid == 0:
  61.         # Child
  62.         time.sleep(LONGSLEEP)
  63.         n = 0
  64.         for key in alive.keys():
  65.             if alive[key] != prefork_lives[key]:
  66.                 n = n+1
  67.         os._exit(n)
  68.     else:
  69.         # Parent
  70.         spid, status = os.waitpid(cpid, 0)
  71.         assert spid == cpid
  72.         assert status == 0, "cause = %d, exit = %d" % (status&0xff, status>>8)
  73.         global stop
  74.         # Tell threads to die
  75.         stop = 1
  76.         time.sleep(2*SHORTSLEEP) # Wait for threads to die
  77.  
  78. main()
  79.