home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 1.5)
-
- import sys
- import time
- import thread
- import traceback
- import StringIO
- _sys = sys
- del sys
- _time = time.time
- _sleep = time.sleep
- del time
- _start_new_thread = thread.start_new_thread
- _allocate_lock = thread.allocate_lock
- _get_ident = thread.get_ident
- del thread
- _print_exc = traceback.print_exc
- del traceback
- _StringIO = StringIO.StringIO
- del StringIO
_VERBOSE = 0  # module-wide default verbosity; 0 keeps _note() silent

if __debug__:

    class _Verbose:
        """Base class giving subclasses an optional debug trace via _note()."""

        def __init__(self, verbose=None):
            # No explicit flag: fall back to the module-wide default.
            if verbose is None:
                self.__verbose = _VERBOSE
            else:
                self.__verbose = verbose

        def _note(self, format, *args):
            """Write '<thread name>: <format % args>' to stderr when verbose."""
            if not self.__verbose:
                return
            message = format % args
            line = '%s: %s\n' % (currentThread().getName(), message)
            _sys.stderr.write(line)

else:

    class _Verbose:
        """Do-nothing stand-in used when __debug__ is false."""

        def __init__(self, verbose=None):
            pass

        def _note(self, *args):
            pass
-
# Lock is simply the primitive lock allocator.
Lock = _allocate_lock


def RLock(*args, **kwargs):
    """Return a new _RLock, forwarding all arguments to its constructor."""
    lock = apply(_RLock, args, kwargs)
    return lock
-
-
class _RLock(_Verbose):
    """Reentrant lock: the owning thread may acquire it repeatedly.

    The owner is recorded as the Thread object returned by currentThread().
    A counter tracks how many times the owner has acquired the lock; the
    underlying primitive lock is released only when the counter drops to 0.
    """

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__block = _allocate_lock()   # primitive lock that does the blocking
        self.__owner = None               # Thread holding the lock, or None
        self.__count = 0                  # acquisition depth of the owner

    def __repr__(self):
        # An unowned lock has no owner to name; show None rather than
        # calling getName() on None.
        owner = self.__owner
        return '<%s(%s, %d)>' % (self.__class__.__name__,
                                 owner and owner.getName(),
                                 self.__count)

    def acquire(self, blocking=1):
        """Acquire the lock; return 1 on success, or the failed result.

        A thread that already owns the lock just increments the depth
        counter and returns 1 without touching the primitive lock.
        Otherwise the return value is whatever the primitive lock's
        acquire(blocking) returned.
        """
        me = currentThread()
        if self.__owner is me:
            self.__count = self.__count + 1
            if __debug__:
                self._note('%s.acquire(%s): recursive success', self, blocking)
            return 1
        rc = self.__block.acquire(blocking)
        if rc:
            self.__owner = me
            self.__count = 1
            if __debug__:
                self._note('%s.acquire(%s): initial succes', self, blocking)
        else:
            if __debug__:
                self._note('%s.acquire(%s): failure', self, blocking)
        return rc

    def release(self):
        """Undo one acquire(); free the primitive lock at depth zero.

        Raises AssertionError (when assertions are enabled) if the calling
        thread does not own the lock.
        """
        me = currentThread()
        assert self.__owner is me, 'release() of un-acquire()d lock'
        self.__count = count = self.__count - 1
        if not count:
            self.__owner = None
            self.__block.release()
            if __debug__:
                self._note('%s.release(): final release', self)
        else:
            if __debug__:
                self._note('%s.release(): non-final release', self)

    # Internal methods used by condition variables

    def _acquire_restore(self, state):
        """Re-acquire the lock and restore a (count, owner) pair.

        `state` is the tuple previously returned by _release_save().
        """
        count, owner = state
        self.__block.acquire()
        self.__count = count
        self.__owner = owner
        if __debug__:
            self._note('%s._acquire_restore()', self)

    def _release_save(self):
        """Fully release the lock and return its (count, owner) state."""
        if __debug__:
            self._note('%s._release_save()', self)
        count = self.__count
        self.__count = 0
        owner = self.__owner
        self.__owner = None
        self.__block.release()
        return (count, owner)

    def _is_owned(self):
        """Return true if the calling thread currently owns the lock."""
        return self.__owner is currentThread()
-
-
-
def Condition(*args, **kwargs):
    """Return a new _Condition, forwarding all arguments to its constructor."""
    cond = apply(_Condition, args, kwargs)
    return cond
-
-
class _Condition(_Verbose):
    """Condition variable bound to a lock (an RLock() unless one is given).

    Each waiting thread parks on its own freshly allocated primitive lock;
    notify() wakes waiters by releasing those per-waiter locks.
    """

    def __init__(self, lock=None, verbose=None):
        _Verbose.__init__(self, verbose)
        if lock is None:
            lock = RLock()
        self.__lock = lock
        # Export the lock's acquire() and release() methods.
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock provides _release_save(), _acquire_restore() or
        # _is_owned(), those override the default implementations below.
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        self.__waiters = []   # one primitive lock per waiting thread

    def __repr__(self):
        return '<Condition(%s, %d)>' % (self.__lock, len(self.__waiters))

    def _release_save(self):
        # Default for plain locks: nothing to save, just release.
        self.__lock.release()

    def _acquire_restore(self, x):
        # Default for plain locks: the saved state is ignored.
        self.__lock.acquire()

    def _is_owned(self):
        # Default for plain locks: the lock counts as owned if a
        # non-blocking acquire fails.
        if self.__lock.acquire(0):
            self.__lock.release()
            return 0
        else:
            return 1

    def wait(self, timeout=None):
        """Release the lock, wait to be notified, then re-acquire the lock.

        With timeout None the call blocks until notified.  Otherwise it
        polls the waiter lock without blocking, sleeping between polls
        (starting at 1 microsecond and doubling until the delay is at least
        one second), and gives up once `timeout` seconds have elapsed.

        Raises AssertionError (when assertions are enabled) if the lock is
        not held.
        """
        # The result is unused, but currentThread() registers a dummy
        # Thread for an unknown calling thread, so the call is kept.
        me = currentThread()
        assert self._is_owned(), 'wait() of un-acquire()d lock'
        waiter = _allocate_lock()
        waiter.acquire()
        self.__waiters.append(waiter)
        saved_state = self._release_save()
        if timeout is None:
            # Blocks until notify() releases this waiter lock.
            waiter.acquire()
            if __debug__:
                self._note('%s.wait(): got it', self)
        else:
            endtime = _time() + timeout
            delay = 1e-006
            while 1:
                gotit = waiter.acquire(0)
                if gotit or _time() >= endtime:
                    break
                _sleep(delay)
                if delay < 1.0:
                    delay = delay * 2.0
            if not gotit:
                if __debug__:
                    self._note('%s.wait(%s): timed out', self, timeout)
                # A concurrent notify() may already have removed the waiter.
                try:
                    self.__waiters.remove(waiter)
                except ValueError:
                    pass
            else:
                if __debug__:
                    self._note('%s.wait(%s): got it', self, timeout)
        self._acquire_restore(saved_state)

    def notify(self, n=1):
        """Wake up to `n` waiting threads (the earliest-queued ones first).

        Raises AssertionError (when assertions are enabled) if the lock is
        not held.
        """
        # Kept for its side effect; see the comment in wait().
        me = currentThread()
        assert self._is_owned(), 'notify() of un-acquire()d lock'
        pending = self.__waiters
        waiters = pending[:n]
        if not waiters:
            if __debug__:
                self._note('%s.notify(): no waiters', self)
            return None
        if n != 1:
            plural = 's'
        else:
            plural = ''
        self._note('%s.notify(): notifying %d waiter%s', self, n, plural)
        for waiter in waiters:
            waiter.release()
            # A timed-out wait() may already have removed its own waiter.
            try:
                pending.remove(waiter)
            except ValueError:
                pass

    def notifyAll(self):
        """Wake every thread currently waiting on this condition."""
        self.notify(len(self.__waiters))
-
-
-
def Semaphore(*args, **kwargs):
    """Return a new _Semaphore, forwarding all arguments to its constructor."""
    sem = apply(_Semaphore, args, kwargs)
    return sem
-
-
class _Semaphore(_Verbose):
    """Counting semaphore built on a Condition over a plain Lock.

    There is no upper bound on the counter: release() always increments.
    """

    def __init__(self, value=1, verbose=None):
        assert value >= 0, 'Semaphore initial value must be >= 0'
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__value = value

    def acquire(self, blocking=1):
        """Decrement the counter; return 1 on success, 0 on failure.

        While the counter is zero a blocking call waits on the condition,
        whereas a non-blocking call gives up immediately and returns 0
        without changing the counter.
        """
        rc = 0
        self.__cond.acquire()
        try:
            while self.__value == 0:
                if not blocking:
                    break
                self.__cond.wait()
            else:
                # Reached only when the loop ends without `break`, i.e. the
                # counter is non-zero.
                self.__value = self.__value - 1
                rc = 1
        finally:
            # Never leave the condition's lock held, even if wait() raises.
            self.__cond.release()
        return rc

    def release(self):
        """Increment the counter and wake one waiting thread, if any."""
        self.__cond.acquire()
        try:
            self.__value = self.__value + 1
            self.__cond.notify()
        finally:
            self.__cond.release()
-
-
-
def Event(*args, **kwargs):
    """Return a new _Event, forwarding all arguments to its constructor."""
    event = apply(_Event, args, kwargs)
    return event
-
-
class _Event(_Verbose):
    """A flag that threads can set, clear, and wait on.

    The flag starts out false.  It is guarded by a Condition over a plain
    Lock; every method that takes that lock releases it in a `finally`
    clause so an exception cannot leave the event permanently locked.
    """

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__flag = 0

    def isSet(self):
        """Return the current value of the flag (0 or 1)."""
        return self.__flag

    def set(self):
        """Set the flag to 1 and wake every thread waiting in wait()."""
        self.__cond.acquire()
        try:
            self.__flag = 1
            self.__cond.notifyAll()
        finally:
            self.__cond.release()

    def clear(self):
        """Reset the flag to 0."""
        self.__cond.acquire()
        try:
            self.__flag = 0
        finally:
            self.__cond.release()

    def wait(self, timeout=None):
        """Return at once if the flag is set; otherwise wait on the condition.

        `timeout` is passed through to the condition's wait().  Nothing is
        returned, so the caller must use isSet() to tell a set() from a
        timeout.
        """
        self.__cond.acquire()
        try:
            if not self.__flag:
                self.__cond.wait(timeout)
        finally:
            self.__cond.release()
-
-
_counter = 0  # serial number of the most recently generated name


def _newname(template='Thread-%d'):
    """Return `template` filled in with the next module-wide serial number."""
    global _counter
    serial = _counter + 1
    _counter = serial
    return template % serial
-
# Acquired around updates to, and bulk reads of, the two registries below.
_active_limbo_lock = _allocate_lock()
# Running threads, keyed by thread identifier (_get_ident()).
_active = {}
# Threads registered by start() that have not begun running yet; each
# Thread object is both key and value.
_limbo = {}
-
class Thread(_Verbose):
    """A thread of control: start() launches it and run() is its body.

    run() calls the `target` callable given to the constructor, if any;
    subclasses may override run() instead.
    """

    # Class-level default so the "was __init__ called?" assertions fail
    # cleanly on a subclass that never called Thread.__init__().
    __initialized = 0

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        """Set up the thread without starting it.

        group   -- must be None.
        target  -- callable invoked by run(), or None.
        name    -- thread name; a 'Thread-N' name is generated when false.
        args    -- positional arguments for target.
        kwargs  -- keyword arguments for target (None means no keywords).
        verbose -- passed to _Verbose to control debug tracing.
        """
        assert group is None, 'group argument must be None for now'
        _Verbose.__init__(self, verbose)
        if kwargs is None:
            # Fresh dict per thread instead of a shared mutable default.
            kwargs = {}
        self.__target = target
        # Only consume a serial number when the caller gave no name.
        self.__name = str(name or _newname())
        self.__args = args
        self.__kwargs = kwargs
        self.__daemonic = self._set_daemon()
        self.__started = 0
        self.__stopped = 0
        self.__block = Condition(Lock())   # signalled when the thread stops
        self.__initialized = 1

    def _set_daemon(self):
        # Default: inherit the daemon flag of the creating thread.
        # Overridden in _MainThread and _DummyThread.
        return currentThread().isDaemon()

    def __repr__(self):
        assert self.__initialized, 'Thread.__init__() was not called'
        status = 'initial'
        if self.__started:
            status = 'started'
        if self.__stopped:
            status = 'stopped'
        if self.__daemonic:
            status = status + ' daemon'
        return '<%s(%s, %s)>' % (self.__class__.__name__, self.__name, status)

    def start(self):
        """Launch the thread; may be called at most once per Thread."""
        assert self.__initialized, 'Thread.__init__() not called'
        assert not self.__started, 'thread already started'
        if __debug__:
            self._note('%s.start(): starting thread', self)
        # Park the thread in _limbo until __bootstrap moves it to _active.
        _active_limbo_lock.acquire()
        _limbo[self] = self
        _active_limbo_lock.release()
        _start_new_thread(self.__bootstrap, ())
        self.__started = 1
        # Sleep very briefly so the new thread gets a chance to run.
        _sleep(1e-006)

    def run(self):
        """Thread body: call target(*args, **kwargs) if a target was given."""
        if self.__target:
            apply(self.__target, self.__args, self.__kwargs)

    def __bootstrap(self):
        # Entry point executed in the new thread: register it, call run(),
        # report any uncaught exception, and always mark the thread stopped.
        try:
            self.__started = 1
            _active_limbo_lock.acquire()
            _active[_get_ident()] = self
            del _limbo[self]
            _active_limbo_lock.release()
            if __debug__:
                self._note('%s.__bootstrap(): thread started', self)

            try:
                self.run()
            except SystemExit:
                if __debug__:
                    self._note('%s.__bootstrap(): raised SystemExit', self)
            except:
                # Deliberately broad: any other exception from run() is
                # reported on stderr rather than lost with the thread.
                if __debug__:
                    self._note('%s.__bootstrap(): unhandled exception', self)
                s = _StringIO()
                _print_exc(file=s)
                _sys.stderr.write('Exception in thread %s:\n%s\n' %
                                  (self.getName(), s.getvalue()))
            else:
                if __debug__:
                    self._note('%s.__bootstrap(): normal return', self)
        finally:
            self.__stop()
            self.__delete()

    def __stop(self):
        # Mark the thread stopped and wake every thread blocked in join().
        self.__block.acquire()
        self.__stopped = 1
        self.__block.notifyAll()
        self.__block.release()

    def __delete(self):
        # Remove the calling thread's entry from the active registry.
        _active_limbo_lock.acquire()
        del _active[_get_ident()]
        _active_limbo_lock.release()

    def join(self, timeout=None):
        """Wait until the thread stops, or until `timeout` seconds pass.

        With timeout None the call blocks until the thread has stopped.
        Nothing is returned, so use isAlive() afterwards to tell a stop
        from a timeout.
        """
        assert self.__initialized, 'Thread.__init__() not called'
        assert self.__started, 'cannot join thread before it is started'
        assert self is not currentThread(), 'cannot join current thread'
        if __debug__:
            if not self.__stopped:
                self._note('%s.join(): waiting until thread stops', self)
        self.__block.acquire()
        if timeout is None:
            while not self.__stopped:
                self.__block.wait()
            if __debug__:
                self._note('%s.join(): thread stopped', self)
        else:
            deadline = _time() + timeout
            while not self.__stopped:
                delay = deadline - _time()
                if delay <= 0:
                    if __debug__:
                        self._note('%s.join(): timed out', self)
                    break
                self.__block.wait(delay)
            else:
                # Loop ended without `break`: the thread really stopped.
                if __debug__:
                    self._note('%s.join(): thread stopped', self)
        self.__block.release()

    def getName(self):
        """Return the thread's name."""
        assert self.__initialized, 'Thread.__init__() not called'
        return self.__name

    def setName(self, name):
        """Set the thread's name to str(name)."""
        assert self.__initialized, 'Thread.__init__() not called'
        self.__name = str(name)

    def isAlive(self):
        """Return true if the thread has been started and has not stopped."""
        assert self.__initialized, 'Thread.__init__() not called'
        return self.__started and not self.__stopped

    def isDaemon(self):
        """Return the thread's daemon flag."""
        assert self.__initialized, 'Thread.__init__() not called'
        return self.__daemonic

    def setDaemon(self, daemonic):
        """Set the daemon flag; only allowed before the thread is started."""
        assert self.__initialized, 'Thread.__init__() not called'
        assert not self.__started, 'cannot set daemon status of active thread'
        self.__daemonic = daemonic
-
-
-
class _MainThread(Thread):
    """Thread object standing for the thread that creates it.

    Creating it registers the object in _active under the calling thread's
    identifier and installs an exit function in sys.exitfunc, keeping any
    previously installed exit function so it can be called afterwards.
    """

    def __init__(self):
        Thread.__init__(self, name='MainThread')
        # This thread is already running, so mark it started directly.
        self._Thread__started = 1
        _active_limbo_lock.acquire()
        _active[_get_ident()] = self
        _active_limbo_lock.release()
        # Remember a previously installed exit function, if there is one.
        try:
            previous = _sys.exitfunc
        except AttributeError:
            previous = None
        self.__oldexitfunc = previous
        _sys.exitfunc = self.__exitfunc

    def _set_daemon(self):
        return 0

    def __exitfunc(self):
        # Exit hook: stop this thread, join every remaining live non-daemon
        # thread, then chain to the earlier exit function.
        self._Thread__stop()
        straggler = _pickSomeNonDaemonThread()
        if straggler and __debug__:
            self._note('%s: waiting for other threads', self)
        while straggler:
            straggler.join()
            straggler = _pickSomeNonDaemonThread()
        if self.__oldexitfunc:
            if __debug__:
                self._note('%s: calling exit handler', self)
            self.__oldexitfunc()
        if __debug__:
            self._note('%s: exiting', self)
        self._Thread__delete()
-
-
-
def _pickSomeNonDaemonThread():
    """Return some live non-daemon thread, or None if there is none.

    Used by the main thread's exit function to find threads it still has
    to join before the interpreter may exit.
    """
    for t in enumerate():
        if not t.isDaemon() and t.isAlive():
            return t
    return None
-
-
class _DummyThread(Thread):
    """Placeholder Thread for a thread this module did not start.

    currentThread() creates one when the calling thread has no entry in
    _active.  Dummy threads are daemonic and cannot be joined.
    """

    def __init__(self):
        Thread.__init__(self, name=_newname('Dummy-%d'))
        # The thread is already running.  The attribute must carry Thread's
        # own mangled name so that isAlive() and join() see it as started.
        self._Thread__started = 1
        _active_limbo_lock.acquire()
        _active[_get_ident()] = self
        _active_limbo_lock.release()

    def _set_daemon(self):
        return 1

    def join(self, timeout=None):
        """Always fail: a dummy thread cannot be joined.

        `timeout` is accepted only for signature compatibility with
        Thread.join().
        """
        assert 0, 'cannot join a dummy thread'
-
-
-
- def currentThread():
-
- try:
- return _active[_get_ident()]
- except KeyError:
- print 'currentThread(): no current thread for', _get_ident()
- return _DummyThread()
-
-
-
def activeCount():
    """Return how many threads are in _active and _limbo combined."""
    _active_limbo_lock.acquire()
    try:
        return len(_active) + len(_limbo)
    finally:
        _active_limbo_lock.release()
-
-
def enumerate():
    """Return a list of all Thread objects in _active and _limbo."""
    _active_limbo_lock.acquire()
    try:
        running = list(_active.values())
        pending = list(_limbo.values())
    finally:
        _active_limbo_lock.release()
    return running + pending
-
- _MainThread()
-
- def _test():
- import random
-
- class BoundedQueue(_Verbose):
-
- def __init__(self, limit):
- _Verbose.__init__(self)
- self.mon = RLock()
- self.rc = Condition(self.mon)
- self.wc = Condition(self.mon)
- self.limit = limit
- self.queue = []
-
-
- def put(self, item):
- self.mon.acquire()
- while len(self.queue) >= self.limit:
- self._note('put(%s): queue full', item)
- self.wc.wait()
- self.queue.append(item)
- self._note('put(%s): appended, length now %d', item, len(self.queue))
- self.rc.notify()
- self.mon.release()
-
-
- def get(self):
- self.mon.acquire()
- while not (self.queue):
- self._note('get(): queue empty')
- self.rc.wait()
- item = self.queue[0]
- del self.queue[0]
- self._note('get(): got %s, %d left', item, len(self.queue))
- self.wc.notify()
- self.mon.release()
- return item
-
-
-
- class ProducerThread(Thread):
-
- def __init__(self, queue, quota):
- Thread.__init__(self, name = 'Producer')
- self.queue = queue
- self.quota = quota
-
-
- def run(self):
- random
- counter = 0
- while counter < self.quota:
- counter = counter + 1
- self.queue.put('%s.%d' % (self.getName(), counter))
- _sleep(random() * 1e-005)
- continue
- import random
-
-
-
- class ConsumerThread(Thread):
-
- def __init__(self, queue, count):
- Thread.__init__(self, name = 'Consumer')
- self.queue = queue
- self.count = count
-
-
- def run(self):
- while self.count > 0:
- item = self.queue.get()
- print item
- self.count = self.count - 1
-
-
- import time
- NP = 3
- QL = 4
- NI = 5
- Q = BoundedQueue(QL)
- P = []
- for i in range(NP):
- t = ProducerThread(Q, NI)
- t.setName('Producer-%d' % (i + 1))
- P.append(t)
-
- C = ConsumerThread(Q, NI * NP)
- for t in P:
- t.start()
- _sleep(1e-006)
-
- C.start()
- for t in P:
- t.join()
-
- C.join()
-
- if __name__ == '__main__':
- _test()
-
-