home *** CD-ROM | disk | FTP | other *** search
- from __future__ import generators
-
- from test_support import TestFailed
-
- import pprint
- import sys
- import unittest
-
- import test_support
-
-
- class HookWatcher:
- def __init__(self):
- self.frames = []
- self.events = []
-
- def callback(self, frame, event, arg):
- self.add_event(event, frame)
-
- def add_event(self, event, frame=None):
- """Add an event to the log."""
- if frame is None:
- frame = sys._getframe(1)
-
- try:
- frameno = self.frames.index(frame)
- except ValueError:
- frameno = len(self.frames)
- self.frames.append(frame)
-
- self.events.append((frameno, event, ident(frame)))
-
- def get_events(self):
- """Remove calls to add_event()."""
- disallowed = [ident(self.add_event.im_func), ident(ident)]
- self.frames = None
-
- return [item for item in self.events if item[2] not in disallowed]
-
-
- class ProfileSimulator(HookWatcher):
- def __init__(self, testcase):
- self.testcase = testcase
- self.stack = []
- HookWatcher.__init__(self)
-
- def callback(self, frame, event, arg):
- # Callback registered with sys.setprofile()/sys.settrace()
- self.dispatch[event](self, frame)
-
- def trace_call(self, frame):
- self.add_event('call', frame)
- self.stack.append(frame)
-
- def trace_return(self, frame):
- self.add_event('return', frame)
- self.stack.pop()
-
- def trace_exception(self, frame):
- self.testcase.fail(
- "the profiler should never receive exception events")
-
- dispatch = {
- 'call': trace_call,
- 'exception': trace_exception,
- 'return': trace_return,
- }
-
-
- class TestCaseBase(unittest.TestCase):
- def check_events(self, callable, expected):
- events = capture_events(callable, self.new_watcher())
- if events != expected:
- self.fail("Expected events:\n%s\nReceived events:\n%s"
- % (pprint.pformat(expected), pprint.pformat(events)))
-
-
- class ProfileHookTestCase(TestCaseBase):
- def new_watcher(self):
- return HookWatcher()
-
- def test_simple(self):
- def f(p):
- pass
- f_ident = ident(f)
- self.check_events(f, [(1, 'call', f_ident),
- (1, 'return', f_ident),
- ])
-
- def test_exception(self):
- def f(p):
- 1/0
- f_ident = ident(f)
- self.check_events(f, [(1, 'call', f_ident),
- (1, 'return', f_ident),
- ])
-
- def test_caught_exception(self):
- def f(p):
- try: 1/0
- except: pass
- f_ident = ident(f)
- self.check_events(f, [(1, 'call', f_ident),
- (1, 'return', f_ident),
- ])
-
- def test_caught_nested_exception(self):
- def f(p):
- try: 1/0
- except: pass
- f_ident = ident(f)
- self.check_events(f, [(1, 'call', f_ident),
- (1, 'return', f_ident),
- ])
-
- def test_nested_exception(self):
- def f(p):
- 1/0
- f_ident = ident(f)
- self.check_events(f, [(1, 'call', f_ident),
- # This isn't what I expected:
- # (0, 'exception', protect_ident),
- # I expected this again:
- (1, 'return', f_ident),
- ])
-
- def test_exception_in_except_clause(self):
- def f(p):
- 1/0
- def g(p):
- try:
- f(p)
- except:
- try: f(p)
- except: pass
- f_ident = ident(f)
- g_ident = ident(g)
- self.check_events(g, [(1, 'call', g_ident),
- (2, 'call', f_ident),
- (2, 'return', f_ident),
- (3, 'call', f_ident),
- (3, 'return', f_ident),
- (1, 'return', g_ident),
- ])
-
- def test_exception_propogation(self):
- def f(p):
- 1/0
- def g(p):
- try: f(p)
- finally: p.add_event("falling through")
- f_ident = ident(f)
- g_ident = ident(g)
- self.check_events(g, [(1, 'call', g_ident),
- (2, 'call', f_ident),
- (2, 'return', f_ident),
- (1, 'falling through', g_ident),
- (1, 'return', g_ident),
- ])
-
- def test_raise_twice(self):
- def f(p):
- try: 1/0
- except: 1/0
- f_ident = ident(f)
- self.check_events(f, [(1, 'call', f_ident),
- (1, 'return', f_ident),
- ])
-
- def test_raise_reraise(self):
- def f(p):
- try: 1/0
- except: raise
- f_ident = ident(f)
- self.check_events(f, [(1, 'call', f_ident),
- (1, 'return', f_ident),
- ])
-
- def test_raise(self):
- def f(p):
- raise Exception()
- f_ident = ident(f)
- self.check_events(f, [(1, 'call', f_ident),
- (1, 'return', f_ident),
- ])
-
- def test_distant_exception(self):
- def f():
- 1/0
- def g():
- f()
- def h():
- g()
- def i():
- h()
- def j(p):
- i()
- f_ident = ident(f)
- g_ident = ident(g)
- h_ident = ident(h)
- i_ident = ident(i)
- j_ident = ident(j)
- self.check_events(j, [(1, 'call', j_ident),
- (2, 'call', i_ident),
- (3, 'call', h_ident),
- (4, 'call', g_ident),
- (5, 'call', f_ident),
- (5, 'return', f_ident),
- (4, 'return', g_ident),
- (3, 'return', h_ident),
- (2, 'return', i_ident),
- (1, 'return', j_ident),
- ])
-
- def test_generator(self):
- def f():
- for i in range(2):
- yield i
- def g(p):
- for i in f():
- pass
- f_ident = ident(f)
- g_ident = ident(g)
- self.check_events(g, [(1, 'call', g_ident),
- # call the iterator twice to generate values
- (2, 'call', f_ident),
- (2, 'return', f_ident),
- (2, 'call', f_ident),
- (2, 'return', f_ident),
- # once more; returns end-of-iteration with
- # actually raising an exception
- (2, 'call', f_ident),
- (2, 'return', f_ident),
- (1, 'return', g_ident),
- ])
-
- def test_stop_iteration(self):
- def f():
- for i in range(2):
- yield i
- raise StopIteration
- def g(p):
- for i in f():
- pass
- f_ident = ident(f)
- g_ident = ident(g)
- self.check_events(g, [(1, 'call', g_ident),
- # call the iterator twice to generate values
- (2, 'call', f_ident),
- (2, 'return', f_ident),
- (2, 'call', f_ident),
- (2, 'return', f_ident),
- # once more to hit the raise:
- (2, 'call', f_ident),
- (2, 'return', f_ident),
- (1, 'return', g_ident),
- ])
-
-
- class ProfileSimulatorTestCase(TestCaseBase):
- def new_watcher(self):
- return ProfileSimulator(self)
-
- def test_simple(self):
- def f(p):
- pass
- f_ident = ident(f)
- self.check_events(f, [(1, 'call', f_ident),
- (1, 'return', f_ident),
- ])
-
- def test_basic_exception(self):
- def f(p):
- 1/0
- f_ident = ident(f)
- self.check_events(f, [(1, 'call', f_ident),
- (1, 'return', f_ident),
- ])
-
- def test_caught_exception(self):
- def f(p):
- try: 1/0
- except: pass
- f_ident = ident(f)
- self.check_events(f, [(1, 'call', f_ident),
- (1, 'return', f_ident),
- ])
-
- def test_distant_exception(self):
- def f():
- 1/0
- def g():
- f()
- def h():
- g()
- def i():
- h()
- def j(p):
- i()
- f_ident = ident(f)
- g_ident = ident(g)
- h_ident = ident(h)
- i_ident = ident(i)
- j_ident = ident(j)
- self.check_events(j, [(1, 'call', j_ident),
- (2, 'call', i_ident),
- (3, 'call', h_ident),
- (4, 'call', g_ident),
- (5, 'call', f_ident),
- (5, 'return', f_ident),
- (4, 'return', g_ident),
- (3, 'return', h_ident),
- (2, 'return', i_ident),
- (1, 'return', j_ident),
- ])
-
-
- def ident(function):
- if hasattr(function, "f_code"):
- code = function.f_code
- else:
- code = function.func_code
- return code.co_firstlineno, code.co_name
-
-
- def protect(f, p):
- try: f(p)
- except: pass
-
- protect_ident = ident(protect)
-
-
- def capture_events(callable, p=None):
- try: sys.setprofile()
- except TypeError: pass
- else: raise TestFailed, 'sys.setprofile() did not raise TypeError'
-
- if p is None:
- p = HookWatcher()
- sys.setprofile(p.callback)
- protect(callable, p)
- sys.setprofile(None)
- return p.get_events()[1:-1]
-
-
- def show_events(callable):
- import pprint
- pprint.pprint(capture_events(callable))
-
-
- def test_main():
- loader = unittest.TestLoader()
- suite = unittest.TestSuite()
- suite.addTest(loader.loadTestsFromTestCase(ProfileHookTestCase))
- suite.addTest(loader.loadTestsFromTestCase(ProfileSimulatorTestCase))
- test_support.run_suite(suite)
-
-
- if __name__ == "__main__":
- test_main()
-