home *** CD-ROM | disk | FTP | other *** search
/ Personal Computer World 2008 February / PCWFEB08.iso / Software / Freeware / Miro 1.0 / Miro_Installer.exe / xulrunner / python / eventloop.py < prev    next >
Encoding:
Python Source  |  2007-11-12  |  14.7 KB  |  471 lines

  1. # Miro - an RSS based video player application
  2. # Copyright (C) 2005-2007 Participatory Culture Foundation
  3. #
  4. # This program is free software; you can redistribute it and/or modify
  5. # it under the terms of the GNU General Public License as published by
  6. # the Free Software Foundation; either version 2 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  12. # GNU General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU General Public License
  15. # along with this program; if not, write to the Free Software
  16. # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
  17.  
  18. """Event loop handler.
  19.  
  20. This module handles the democracy event loop which is responsible for network
  21. requests and scheduling.
  22.  
  23. TODO:
  24.     handle user setting clock back
  25. """
  26.  
  27. import threading
  28. import socket
  29. import errno
  30. import select
  31. import heapq
  32. import Queue
  33. import util
  34. import database
  35. import logging
  36.  
  37. from clock import clock
  38.  
  39. import util
  40.  
  41. cumulative = {}
  42.  
  43. class DelayedCall(object):
  44.     def __init__(self, function, name, args, kwargs):
  45.         self.function = function
  46.         self.name = name
  47.         self.args = args
  48.         self.kwargs = kwargs
  49.         self.canceled = False
  50.  
  51.     def _unlink(self):
  52.         """Removes the references that this object has to the outside world,
  53.         this eases the GC's work in finding cycles and fixes some memory leaks
  54.         on windows.
  55.         """
  56.         self.function = self.args = self.kwargs = None
  57.  
  58.     def cancel(self):
  59.         self.canceled = True
  60.         self._unlink()
  61.  
  62.     def dispatch(self):
  63.         if not self.canceled:
  64.             when = "While handling %s" % self.name
  65.             start = clock()
  66.             util.trapCall(when, self.function, *self.args, **self.kwargs)
  67.             end = clock()
  68.             if end-start > 0.5:
  69.                 logging.timing ("%s too slow (%.3f secs)",
  70.                                 self.name, end-start)
  71.             try:
  72.                 total = cumulative[self.name]
  73.             except KeyboardInterrupt:
  74.                 raise
  75.             except:
  76.                 total = 0
  77.             total += end - start
  78.             cumulative[self.name] = total
  79.             if total > 5.0:
  80.                 logging.timing ("%s cumulative is too slow (%.3f secs)",
  81.                                 self.name, total)
  82.                 cumulative[self.name] = 0
  83.         self._unlink()
  84.  
  85. class Scheduler(object):
  86.     def __init__(self):
  87.         self.heap = []
  88.  
  89.     def addTimeout(self, delay, function, name, args=None, kwargs=None):
  90.         if args is None:
  91.             args = ()
  92.         if kwargs is None:
  93.             kwargs = {}
  94.         scheduledTime = clock() + delay
  95.         dc = DelayedCall(function,  "timeout (%s)" % (name,), args, kwargs)
  96.         heapq.heappush(self.heap, (scheduledTime, dc))
  97.         return dc
  98.  
  99.     def nextTimeout(self):
  100.         if len(self.heap) == 0:
  101.             return None
  102.         else:
  103.             return max(0, self.heap[0][0] - clock())
  104.  
  105.     def hasPendingTimeout(self):
  106.         return len(self.heap) > 0 and self.heap[0][0] < clock()
  107.  
  108.     def processNextTimeout(self):
  109.         time, dc = heapq.heappop(self.heap)
  110.         dc.dispatch()
  111.  
  112.     def processTimeouts(self):
  113.         while self.hasPendingTimeout():
  114.             self.processNextTimeout()
  115.  
  116. class CallQueue(object):
  117.     def __init__(self):
  118.         self.queue = Queue.Queue()
  119.  
  120.     def addIdle(self, function, name, args=None, kwargs=None):
  121.         if args is None:
  122.             args = ()
  123.         if kwargs is None:
  124.             kwargs = {}
  125.         dc = DelayedCall (function, "idle (%s)" % (name,), args, kwargs)
  126.         self.queue.put (dc)
  127.         #self.queue.put((dc, clock()))
  128.         return dc
  129.  
  130.     def processNextIdle(self):
  131.         dc = self.queue.get()
  132.         #dc, requested = self.queue.get()
  133.         #start = clock()
  134.         dc.dispatch()
  135.         #if start - requested > 1.0:
  136.         #    print "WARNING: %s took too long to fire (%.3f secs)" % (
  137.         #        dc.name, start - requested)
  138.  
  139.     def hasPendingIdle(self):
  140.         return not self.queue.empty()
  141.  
  142.     def processIdles(self):
  143.         while self.hasPendingIdle():
  144.             self.processNextIdle()
  145.  
  146. class ThreadPool(object):
  147.     """The thread pool is used to handle calls like gethostbyname() that block
  148.     and there's no asynchronous workaround.  What we do instead is call them
  149.     in a separate thread and return the result in a callback that executes in
  150.     the event loop.
  151.     """
  152.     THREADS = 3
  153.  
  154.     def __init__(self, eventLoop):
  155.         self.eventLoop = eventLoop
  156.         self.queue = Queue.Queue()
  157.         self.threads = []
  158.  
  159.     def initThreads(self):
  160.         while len(self.threads) < ThreadPool.THREADS:
  161.             t = threading.Thread(name='ThreadPool - %d' % len(self.threads),
  162.                     target=self.threadLoop)
  163.             t.setDaemon(True)
  164.             t.start()
  165.             self.threads.append(t)
  166.  
  167.     def threadLoop(self):
  168.         while True:
  169.             nextItem = self.queue.get()
  170.             if nextItem == "QUIT":
  171.                 break
  172.             else:
  173.                 callback, errback, func, name, args, kwargs, = nextItem
  174.             try:
  175.                 result = func(*args, **kwargs)
  176.             except KeyboardInterrupt:
  177.                 raise
  178.             except Exception, e:
  179.                 func = errback
  180.                 name = 'Thread Pool Errback (%s)' % name
  181.                 args = (e,)
  182.             else:
  183.                 func = callback
  184.                 name = 'Thread Pool Callback (%s)' % name
  185.                 args = (result,)
  186.             if not self.eventLoop.quitFlag:
  187.                 self.eventLoop.idleQueue.addIdle(func, name, args=args)
  188.                 self.eventLoop.wakeup()
  189.  
  190.     def queueCall(self, callback, errback, function, name, *args, **kwargs):
  191.         self.queue.put((callback, errback, function, name, args, kwargs))
  192.  
  193.     def closeThreads(self):
  194.         for x in xrange(len(self.threads)):
  195.             self.queue.put("QUIT")
  196.         while len(self.threads) > 0:
  197.             x = self.threads.pop()
  198.             try:
  199.                 x.join()
  200.             except:
  201.                 pass
  202.             
  203. class EventLoop(object):
  204.     def __init__(self):
  205.         self.scheduler = Scheduler()
  206.         self.idleQueue = CallQueue()
  207.         self.urgentQueue = CallQueue()
  208.         self.threadPool = ThreadPool(self)
  209.         self.readCallbacks = {}
  210.         self.writeCallbacks = {}
  211.         self.wakeSender, self.wakeReceiver = util.makeDummySocketPair()
  212.         self.addReadCallback(self.wakeReceiver, self._slurpWakerData)
  213.         self.quitFlag = False
  214.         self.delegate = None
  215.         self.clearRemovedCallbacks()
  216.  
  217.     def clearRemovedCallbacks(self):
  218.         self.removedReadCallbacks = set()
  219.         self.removedWriteCallbacks = set()
  220.  
  221.     def _slurpWakerData(self):
  222.         self.wakeReceiver.recv(1024)
  223.  
  224.     def setDelegate(self, delegate):
  225.         self.delegate = delegate
  226.  
  227.     def addReadCallback(self, socket, callback):
  228.         self.readCallbacks[socket.fileno()] = callback
  229.  
  230.     def removeReadCallback(self, socket):
  231.         del self.readCallbacks[socket.fileno()]
  232.         self.removedReadCallbacks.add(socket.fileno())
  233.  
  234.     def addWriteCallback(self, socket, callback):
  235.         self.writeCallbacks[socket.fileno()] = callback
  236.  
  237.     def removeWriteCallback(self, socket):
  238.         del self.writeCallbacks[socket.fileno()]
  239.         self.removedWriteCallbacks.add(socket.fileno())
  240.  
  241.     def wakeup(self):
  242.         self.wakeSender.send("b")
  243.  
  244.     def callInThread(self, callback, errback, function, name, *args, **kwargs):
  245.         self.threadPool.queueCall(callback, errback, function, name, *args, **kwargs)
  246.  
  247.     def _beginLoop(self):
  248.         if self.delegate is not None and hasattr(self.delegate, "beginLoop"):
  249.             self.delegate.beginLoop(self)
  250.  
  251.     def _endLoop(self):
  252.         if self.delegate is not None and hasattr(self.delegate, "endLoop"):
  253.             self.delegate.endLoop(self)
  254.  
  255.     def loop(self):
  256.         global loop_ready
  257.         database.set_thread()
  258.         loop_ready.set()
  259.         while not self.quitFlag:
  260.             self._beginLoop()
  261.             self.clearRemovedCallbacks()
  262.             timeout = self.scheduler.nextTimeout()
  263.             readfds = self.readCallbacks.keys()
  264.             writefds = self.writeCallbacks.keys()
  265.             try:
  266.                 readables, writeables, _ = select.select(readfds, writefds, [],
  267.                                                          timeout)
  268.             except select.error, (err, detail):
  269.                 if err == errno.EINTR:
  270.                     logging.warning ("eventloop: %s", detail)
  271.                 else:
  272.                     raise
  273.             if self.quitFlag:
  274.                 break
  275.             self.urgentQueue.processIdles()
  276.             for event in self.generateEvents(readables, writeables):
  277.                 event()
  278.                 if self.quitFlag:
  279.                     break
  280.                 self.urgentQueue.processIdles()
  281.                 if self.quitFlag:
  282.                     break
  283.             self._endLoop()
  284.  
  285.     def generateEvents(self, readables, writeables):
  286.         """Generator that creates the list of events that should be dealt with
  287.         on this iteration of the event loop.  This includes all socket
  288.         read/write callbacks, timeouts and idle calls.  "events" are
  289.         implemented as functions that should be called with no arguments.
  290.         """
  291.  
  292.         for callback in self.generateCallbacks(writeables,
  293.                 self.writeCallbacks, self.removedWriteCallbacks):
  294.             yield callback
  295.         for callback in self.generateCallbacks(readables,
  296.                 self.readCallbacks, self.removedReadCallbacks):
  297.             yield callback
  298.         while self.scheduler.hasPendingTimeout():
  299.             yield self.scheduler.processNextTimeout
  300.         while self.idleQueue.hasPendingIdle():
  301.             yield self.idleQueue.processNextIdle
  302.  
  303.     def generateCallbacks(self, readyList, map, removed):
  304.         for fd in readyList:
  305.             try:
  306.                 function = map[fd]
  307.             except KeyError:
  308.                 # this can happen the write callback removes the read callback
  309.                 # or vise versa
  310.                 pass
  311.             else:
  312.                 if fd in removed:
  313.                     continue
  314.                 when = "While talking to the network"
  315.                 def callbackEvent():
  316.                     if not util.trapCall(when, function):
  317.                         del map[fd] 
  318.                 yield callbackEvent
  319.  
  320. _eventLoop = EventLoop()
  321.  
  322. def setDelegate(delegate):
  323.     _eventLoop.setDelegate(delegate)
  324.  
  325. def addReadCallback(socket, callback):
  326.     """Add a read callback.  When socket is ready for reading, callback will
  327.     be called.  If there is already a read callback installed, it will be
  328.     replaced.
  329.     """
  330.     _eventLoop.addReadCallback(socket, callback)
  331.  
  332. def removeReadCallback(socket):
  333.     """Remove a read callback.  If there is not a read callback installed for
  334.     socket, a KeyError will be thrown.
  335.     """
  336.     _eventLoop.removeReadCallback(socket)
  337.  
  338. def addWriteCallback(socket, callback):
  339.     """Add a write callback.  When socket is ready for writing, callback will
  340.     be called.  If there is already a write callback installed, it will be
  341.     replaced.
  342.     """
  343.     _eventLoop.addWriteCallback(socket, callback)
  344.  
  345. def removeWriteCallback(socket):
  346.     """Remove a write callback.  If there is not a write callback installed for
  347.     socket, a KeyError will be thrown.
  348.     """
  349.     _eventLoop.removeWriteCallback(socket)
  350.  
  351. def stopHandlingSocket(socket):
  352.     """Convience function to that removes both the read and write callback for
  353.     a socket if they exist."""
  354.     try:
  355.         removeReadCallback(socket)
  356.     except KeyError:
  357.         pass
  358.     try:
  359.         removeWriteCallback(socket)
  360.     except KeyError:
  361.         pass
  362.  
  363. def addTimeout(delay, function, name, args=None, kwargs=None):
  364.     """Schedule a function to be called at some point in the future.
  365.     Returns a DelayedCall object that can be used to cancel the call.
  366.     """
  367.  
  368.     dc = _eventLoop.scheduler.addTimeout(delay, function, name, args, kwargs)
  369.     return dc
  370.  
  371. def addIdle(function, name, args=None, kwargs=None):
  372.     """Schedule a function to be called when we get some spare time.
  373.     Returns a DelayedCall object that can be used to cancel the call.
  374.     """
  375.  
  376.     dc = _eventLoop.idleQueue.addIdle(function, name, args, kwargs)
  377.     _eventLoop.wakeup()
  378.     return dc
  379.  
  380. def addUrgentCall(function, name, args=None, kwargs=None):
  381.     """Schedule a function to be called as soon as possible.  This method
  382.     should be used for things like GUI actions, where the user is waiting on
  383.     us.
  384.     """
  385.  
  386.     dc = _eventLoop.urgentQueue.addIdle(function, name, args, kwargs)
  387.     _eventLoop.wakeup()
  388.     return dc
  389.  
  390. def callInThread(callback, errback, function, name, *args, **kwargs):
  391.     """Schedule a function to be called in a separate thread. Do not
  392.     put code that accesses the database or the UI here!
  393.     """
  394.     _eventLoop.callInThread(callback, errback, function, name, *args, **kwargs)
  395.  
  396. lt = None
  397.  
  398. profile_file = None
  399.  
  400. loop_ready = threading.Event()
  401.  
  402. def startup():
  403.     threadPoolInit()
  404.  
  405.     def profile_startup():
  406.         import profile
  407.         def start_loop():
  408.             _eventLoop.loop()
  409.         profile.runctx ('_eventLoop.loop()', globals(), locals(), profile_file + ".event_loop")
  410.  
  411.     global lt
  412.     global loop_ready
  413.     if profile_file:
  414.         lt = threading.Thread(target=profile_startup, name="Event Loop")
  415.     else:
  416.         lt = threading.Thread(target=_eventLoop.loop, name="Event Loop")
  417.     lt.setDaemon(False)
  418.     lt.start()
  419.     loop_ready.wait()
  420.         
  421.  
  422. def join():
  423.     global lt
  424.     if lt is not None:
  425.         lt.join()
  426.  
  427. def quit():
  428.     threadPoolQuit()
  429.     _eventLoop.quitFlag = True
  430.     _eventLoop.wakeup()
  431.  
  432. def resetEventLoop():
  433.     _eventLoop = EventLoop()
  434.  
  435. def threadPoolQuit():
  436.     _eventLoop.threadPool.closeThreads()
  437.  
  438. def threadPoolInit():
  439.     _eventLoop.threadPool.initThreads()
  440.  
  441. def asIdle(func):
  442.     """Decorator to make a methods run as an idle function
  443.  
  444.     Suppose you have 2 methods, foo and bar
  445.  
  446.     @asIdle
  447.     def foo():
  448.         # database operations
  449.  
  450.     def bar():
  451.         # same database operations as foo
  452.  
  453.     Then calling foo() is exactly the same as calling addIdle(bar).
  454.     """
  455.  
  456.     def queuer(*args, **kwargs):
  457.         return addIdle(func, "%s() (using asIdle)" % func.__name__, args=args, kwargs=kwargs)
  458.     return queuer
  459.  
  460. def asUrgent(func):
  461.     """Like asIdle, but uses addUrgentCall() instead of addIdle()."""
  462.  
  463.     def queuer(*args, **kwargs):
  464.         return addUrgentCall(func, "%s() (using asUrgent)" % func.__name__, args=args, kwargs=kwargs)
  465.     return queuer
  466.  
  467. def checkHeapSize():
  468.     logging.info ("Heap size: %d.", len(_eventLoop.scheduler.heap))
  469.     addTimeout(5, checkHeapSize, "Check Heap Size")
  470. #addTimeout(5, checkHeapSize, "Check Heap Size")
  471.