home *** CD-ROM | disk | FTP | other *** search
/ Chip 2003 January / Chip_2003-01_cd2.bin / convert / eJayMp3Pro / mp3pro_demo.exe / POPEN2.PY < prev    next >
Encoding:
Python Source  |  1998-06-14  |  2.8 KB  |  100 lines

  1. import os
  2. import sys
  3. import string
  4.  
  5. MAXFD = 256     # Max number of file descriptors (os.getdtablesize()???)
  6.  
  7. _active = []
  8.  
  9. def _cleanup():
  10.     for inst in _active[:]:
  11.         inst.poll()
  12.  
  13. class Popen3:
  14.     def __init__(self, cmd, capturestderr=0, bufsize=-1):
  15.         if type(cmd) == type(''):
  16.             cmd = ['/bin/sh', '-c', cmd]
  17.         p2cread, p2cwrite = os.pipe()
  18.         c2pread, c2pwrite = os.pipe()
  19.         if capturestderr:
  20.             errout, errin = os.pipe()
  21.         self.pid = os.fork()
  22.         if self.pid == 0:
  23.             # Child
  24.             os.close(0)
  25.             os.close(1)
  26.             if os.dup(p2cread) <> 0:
  27.                 sys.stderr.write('popen2: bad read dup\n')
  28.             if os.dup(c2pwrite) <> 1:
  29.                 sys.stderr.write('popen2: bad write dup\n')
  30.             if capturestderr:
  31.                 os.close(2)
  32.                 if os.dup(errin) <> 2: pass
  33.             for i in range(3, MAXFD):
  34.                 try:
  35.                     os.close(i)
  36.                 except: pass
  37.             try:
  38.                 os.execvp(cmd[0], cmd)
  39.             finally:
  40.                 os._exit(1)
  41.             # Shouldn't come here, I guess
  42.             os._exit(1)
  43.         os.close(p2cread)
  44.         self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
  45.         os.close(c2pwrite)
  46.         self.fromchild = os.fdopen(c2pread, 'r', bufsize)
  47.         if capturestderr:
  48.             os.close(errin)
  49.             self.childerr = os.fdopen(errout, 'r', bufsize)
  50.         else:
  51.             self.childerr = None
  52.         self.sts = -1 # Child not completed yet
  53.         _active.append(self)
  54.     def poll(self):
  55.         if self.sts < 0:
  56.             try:
  57.                 pid, sts = os.waitpid(self.pid, os.WNOHANG)
  58.                 if pid == self.pid:
  59.                     self.sts = sts
  60.                     _active.remove(self)
  61.             except os.error:
  62.                 pass
  63.         return self.sts
  64.     def wait(self):
  65.         pid, sts = os.waitpid(self.pid, 0)
  66.         if pid == self.pid:
  67.             self.sts = sts
  68.             _active.remove(self)
  69.         return self.sts
  70.  
  71. def popen2(cmd, bufsize=-1):
  72.     _cleanup()
  73.     inst = Popen3(cmd, 0, bufsize)
  74.     return inst.fromchild, inst.tochild
  75.  
  76. def popen3(cmd, bufsize=-1):
  77.     _cleanup()
  78.     inst = Popen3(cmd, 1, bufsize)
  79.     return inst.fromchild, inst.tochild, inst.childerr
  80.  
  81. def _test():
  82.     teststr = "abc\n"
  83.     print "testing popen2..."
  84.     r, w = popen2('cat')
  85.     w.write(teststr)
  86.     w.close()
  87.     assert r.read() == teststr
  88.     print "testing popen3..."
  89.     r, w, e = popen3(['cat'])
  90.     w.write(teststr)
  91.     w.close()
  92.     assert r.read() == teststr
  93.     assert e.read() == ""
  94.     _cleanup()
  95.     assert not _active
  96.     print "All OK"
  97.  
  98. if __name__ == '__main__':
  99.     _test()
  100.