home *** CD-ROM | disk | FTP | other *** search
/ Komputer for Alle 2004 #2 / K-CD-2-2004.ISO / OpenOffice Sv / f_0397 / python-core-2.2.2 / lib / test / test_weakref.py < prev    next >
Encoding:
Python Source  |  2003-07-18  |  16.6 KB  |  516 lines

  1. import sys
  2. import unittest
  3. import UserList
  4. import weakref
  5.  
  6. import test_support
  7.  
  8.  
  9. class C:
  10.     def method(self):
  11.         pass
  12.  
  13.  
  14. class Callable:
  15.     bar = None
  16.  
  17.     def __call__(self, x):
  18.         self.bar = x
  19.  
  20.  
  21. def create_function():
  22.     def f(): pass
  23.     return f
  24.  
  25. def create_bound_method():
  26.     return C().method
  27.  
  28. def create_unbound_method():
  29.     return C.method
  30.  
  31.  
  32. class TestBase(unittest.TestCase):
  33.  
  34.     def setUp(self):
  35.         self.cbcalled = 0
  36.  
  37.     def callback(self, ref):
  38.         self.cbcalled += 1
  39.  
  40.  
  41. class ReferencesTestCase(TestBase):
  42.  
  43.     def test_basic_ref(self):
  44.         self.check_basic_ref(C)
  45.         self.check_basic_ref(create_function)
  46.         self.check_basic_ref(create_bound_method)
  47.         self.check_basic_ref(create_unbound_method)
  48.  
  49.     def test_basic_callback(self):
  50.         self.check_basic_callback(C)
  51.         self.check_basic_callback(create_function)
  52.         self.check_basic_callback(create_bound_method)
  53.         self.check_basic_callback(create_unbound_method)
  54.  
  55.     def test_multiple_callbacks(self):
  56.         o = C()
  57.         ref1 = weakref.ref(o, self.callback)
  58.         ref2 = weakref.ref(o, self.callback)
  59.         del o
  60.         self.assert_(ref1() is None,
  61.                      "expected reference to be invalidated")
  62.         self.assert_(ref2() is None,
  63.                      "expected reference to be invalidated")
  64.         self.assert_(self.cbcalled == 2,
  65.                      "callback not called the right number of times")
  66.  
  67.     def test_multiple_selfref_callbacks(self):
  68.         """Make sure all references are invalidated before callbacks
  69.         are called."""
  70.         #
  71.         # What's important here is that we're using the first
  72.         # reference in the callback invoked on the second reference
  73.         # (the most recently created ref is cleaned up first).  This
  74.         # tests that all references to the object are invalidated
  75.         # before any of the callbacks are invoked, so that we only
  76.         # have one invocation of _weakref.c:cleanup_helper() active
  77.         # for a particular object at a time.
  78.         #
  79.         def callback(object, self=self):
  80.             self.ref()
  81.         c = C()
  82.         self.ref = weakref.ref(c, callback)
  83.         ref1 = weakref.ref(c, callback)
  84.         del c
  85.  
  86.     def test_proxy_ref(self):
  87.         o = C()
  88.         o.bar = 1
  89.         ref1 = weakref.proxy(o, self.callback)
  90.         ref2 = weakref.proxy(o, self.callback)
  91.         del o
  92.  
  93.         def check(proxy):
  94.             proxy.bar
  95.  
  96.         self.assertRaises(weakref.ReferenceError, check, ref1)
  97.         self.assertRaises(weakref.ReferenceError, check, ref2)
  98.         self.assert_(self.cbcalled == 2)
  99.  
  100.     def check_basic_ref(self, factory):
  101.         o = factory()
  102.         ref = weakref.ref(o)
  103.         self.assert_(ref() is not None,
  104.                      "weak reference to live object should be live")
  105.         o2 = ref()
  106.         self.assert_(o is o2,
  107.                      "<ref>() should return original object if live")
  108.  
  109.     def check_basic_callback(self, factory):
  110.         self.cbcalled = 0
  111.         o = factory()
  112.         ref = weakref.ref(o, self.callback)
  113.         del o
  114.         self.assert_(self.cbcalled == 1,
  115.                      "callback did not properly set 'cbcalled'")
  116.         self.assert_(ref() is None,
  117.                      "ref2 should be dead after deleting object reference")
  118.  
  119.     def test_ref_reuse(self):
  120.         o = C()
  121.         ref1 = weakref.ref(o)
  122.         # create a proxy to make sure that there's an intervening creation
  123.         # between these two; it should make no difference
  124.         proxy = weakref.proxy(o)
  125.         ref2 = weakref.ref(o)
  126.         self.assert_(ref1 is ref2,
  127.                      "reference object w/out callback should be re-used")
  128.  
  129.         o = C()
  130.         proxy = weakref.proxy(o)
  131.         ref1 = weakref.ref(o)
  132.         ref2 = weakref.ref(o)
  133.         self.assert_(ref1 is ref2,
  134.                      "reference object w/out callback should be re-used")
  135.         self.assert_(weakref.getweakrefcount(o) == 2,
  136.                      "wrong weak ref count for object")
  137.         del proxy
  138.         self.assert_(weakref.getweakrefcount(o) == 1,
  139.                      "wrong weak ref count for object after deleting proxy")
  140.  
  141.     def test_proxy_reuse(self):
  142.         o = C()
  143.         proxy1 = weakref.proxy(o)
  144.         ref = weakref.ref(o)
  145.         proxy2 = weakref.proxy(o)
  146.         self.assert_(proxy1 is proxy2,
  147.                      "proxy object w/out callback should have been re-used")
  148.  
  149.     def test_basic_proxy(self):
  150.         o = C()
  151.         self.check_proxy(o, weakref.proxy(o))
  152.  
  153.         L = UserList.UserList()
  154.         p = weakref.proxy(L)
  155.         self.failIf(p, "proxy for empty UserList should be false")
  156.         p.append(12)
  157.         self.assertEqual(len(L), 1)
  158.         self.failUnless(p, "proxy for non-empty UserList should be true")
  159.         p[:] = [2, 3]
  160.         self.assertEqual(len(L), 2)
  161.         self.assertEqual(len(p), 2)
  162.         self.failUnless(3 in p, "proxy didn't support __contains__() properly")
  163.         p[1] = 5
  164.         self.assertEqual(L[1], 5)
  165.         self.assertEqual(p[1], 5)
  166.         L2 = UserList.UserList(L)
  167.         p2 = weakref.proxy(L2)
  168.         self.assertEqual(p, p2)
  169.  
  170.     def test_callable_proxy(self):
  171.         o = Callable()
  172.         ref1 = weakref.proxy(o)
  173.  
  174.         self.check_proxy(o, ref1)
  175.  
  176.         self.assert_(type(ref1) is weakref.CallableProxyType,
  177.                      "proxy is not of callable type")
  178.         ref1('twinkies!')
  179.         self.assert_(o.bar == 'twinkies!',
  180.                      "call through proxy not passed through to original")
  181.         ref1(x='Splat.')
  182.         self.assert_(o.bar == 'Splat.',
  183.                      "call through proxy not passed through to original")
  184.  
  185.         # expect due to too few args
  186.         self.assertRaises(TypeError, ref1)
  187.  
  188.         # expect due to too many args
  189.         self.assertRaises(TypeError, ref1, 1, 2, 3)
  190.  
  191.     def check_proxy(self, o, proxy):
  192.         o.foo = 1
  193.         self.assert_(proxy.foo == 1,
  194.                      "proxy does not reflect attribute addition")
  195.         o.foo = 2
  196.         self.assert_(proxy.foo == 2,
  197.                      "proxy does not reflect attribute modification")
  198.         del o.foo
  199.         self.assert_(not hasattr(proxy, 'foo'),
  200.                      "proxy does not reflect attribute removal")
  201.  
  202.         proxy.foo = 1
  203.         self.assert_(o.foo == 1,
  204.                      "object does not reflect attribute addition via proxy")
  205.         proxy.foo = 2
  206.         self.assert_(
  207.             o.foo == 2,
  208.             "object does not reflect attribute modification via proxy")
  209.         del proxy.foo
  210.         self.assert_(not hasattr(o, 'foo'),
  211.                      "object does not reflect attribute removal via proxy")
  212.  
  213.     def test_getweakrefcount(self):
  214.         o = C()
  215.         ref1 = weakref.ref(o)
  216.         ref2 = weakref.ref(o, self.callback)
  217.         self.assert_(weakref.getweakrefcount(o) == 2,
  218.                      "got wrong number of weak reference objects")
  219.  
  220.         proxy1 = weakref.proxy(o)
  221.         proxy2 = weakref.proxy(o, self.callback)
  222.         self.assert_(weakref.getweakrefcount(o) == 4,
  223.                      "got wrong number of weak reference objects")
  224.  
  225.     def test_getweakrefs(self):
  226.         o = C()
  227.         ref1 = weakref.ref(o, self.callback)
  228.         ref2 = weakref.ref(o, self.callback)
  229.         del ref1
  230.         self.assert_(weakref.getweakrefs(o) == [ref2],
  231.                      "list of refs does not match")
  232.  
  233.         o = C()
  234.         ref1 = weakref.ref(o, self.callback)
  235.         ref2 = weakref.ref(o, self.callback)
  236.         del ref2
  237.         self.assert_(weakref.getweakrefs(o) == [ref1],
  238.                      "list of refs does not match")
  239.  
  240.     def test_newstyle_number_ops(self):
  241.         class F(float):
  242.             pass
  243.         f = F(2.0)
  244.         p = weakref.proxy(f)
  245.         self.assert_(p + 1.0 == 3.0)
  246.         self.assert_(1.0 + p == 3.0)  # this used to SEGV
  247.  
  248.     def test_callbacks_protected(self):
  249.         """Callbacks protected from already-set exceptions?"""
  250.         # Regression test for SF bug #478534.
  251.         class BogusError(Exception):
  252.             pass
  253.         data = {}
  254.         def remove(k):
  255.             del data[k]
  256.         def encapsulate():
  257.             f = lambda : ()
  258.             data[weakref.ref(f, remove)] = None
  259.             raise BogusError
  260.         try:
  261.             encapsulate()
  262.         except BogusError:
  263.             pass
  264.         else:
  265.             self.fail("exception not properly restored")
  266.         try:
  267.             encapsulate()
  268.         except BogusError:
  269.             pass
  270.         else:
  271.             self.fail("exception not properly restored")
  272.  
  273.  
  274. class Object:
  275.     def __init__(self, arg):
  276.         self.arg = arg
  277.     def __repr__(self):
  278.         return "<Object %r>" % self.arg
  279.  
  280.  
  281. class MappingTestCase(TestBase):
  282.  
  283.     COUNT = 10
  284.  
  285.     def test_weak_values(self):
  286.         #
  287.         #  This exercises d.copy(), d.items(), d[], del d[], len(d).
  288.         #
  289.         dict, objects = self.make_weak_valued_dict()
  290.         for o in objects:
  291.             self.assert_(weakref.getweakrefcount(o) == 1,
  292.                          "wrong number of weak references to %r!" % o)
  293.             self.assert_(o is dict[o.arg],
  294.                          "wrong object returned by weak dict!")
  295.         items1 = dict.items()
  296.         items2 = dict.copy().items()
  297.         items1.sort()
  298.         items2.sort()
  299.         self.assert_(items1 == items2,
  300.                      "cloning of weak-valued dictionary did not work!")
  301.         del items1, items2
  302.         self.assert_(len(dict) == self.COUNT)
  303.         del objects[0]
  304.         self.assert_(len(dict) == (self.COUNT - 1),
  305.                      "deleting object did not cause dictionary update")
  306.         del objects, o
  307.         self.assert_(len(dict) == 0,
  308.                      "deleting the values did not clear the dictionary")
  309.         # regression on SF bug #447152:
  310.         dict = weakref.WeakValueDictionary()
  311.         self.assertRaises(KeyError, dict.__getitem__, 1)
  312.         dict[2] = C()
  313.         self.assertRaises(KeyError, dict.__getitem__, 2)
  314.  
  315.     def test_weak_keys(self):
  316.         #
  317.         #  This exercises d.copy(), d.items(), d[] = v, d[], del d[],
  318.         #  len(d), d.has_key().
  319.         #
  320.         dict, objects = self.make_weak_keyed_dict()
  321.         for o in objects:
  322.             self.assert_(weakref.getweakrefcount(o) == 1,
  323.                          "wrong number of weak references to %r!" % o)
  324.             self.assert_(o.arg is dict[o],
  325.                          "wrong object returned by weak dict!")
  326.         items1 = dict.items()
  327.         items2 = dict.copy().items()
  328.         items1.sort()
  329.         items2.sort()
  330.         self.assert_(items1 == items2,
  331.                      "cloning of weak-keyed dictionary did not work!")
  332.         del items1, items2
  333.         self.assert_(len(dict) == self.COUNT)
  334.         del objects[0]
  335.         self.assert_(len(dict) == (self.COUNT - 1),
  336.                      "deleting object did not cause dictionary update")
  337.         del objects, o
  338.         self.assert_(len(dict) == 0,
  339.                      "deleting the keys did not clear the dictionary")
  340.         o = Object(42)
  341.         dict[o] = "What is the meaning of the universe?"
  342.         self.assert_(dict.has_key(o))
  343.         self.assert_(not dict.has_key(34))
  344.  
  345.     def test_weak_keyed_iters(self):
  346.         dict, objects = self.make_weak_keyed_dict()
  347.         self.check_iters(dict)
  348.  
  349.     def test_weak_valued_iters(self):
  350.         dict, objects = self.make_weak_valued_dict()
  351.         self.check_iters(dict)
  352.  
  353.     def check_iters(self, dict):
  354.         # item iterator:
  355.         items = dict.items()
  356.         for item in dict.iteritems():
  357.             items.remove(item)
  358.         self.assert_(len(items) == 0, "iteritems() did not touch all items")
  359.  
  360.         # key iterator, via __iter__():
  361.         keys = dict.keys()
  362.         for k in dict:
  363.             keys.remove(k)
  364.         self.assert_(len(keys) == 0, "__iter__() did not touch all keys")
  365.  
  366.         # key iterator, via iterkeys():
  367.         keys = dict.keys()
  368.         for k in dict.iterkeys():
  369.             keys.remove(k)
  370.         self.assert_(len(keys) == 0, "iterkeys() did not touch all keys")
  371.  
  372.         # value iterator:
  373.         values = dict.values()
  374.         for v in dict.itervalues():
  375.             values.remove(v)
  376.         self.assert_(len(values) == 0, "itervalues() did not touch all values")
  377.  
  378.     def test_make_weak_keyed_dict_from_dict(self):
  379.         o = Object(3)
  380.         dict = weakref.WeakKeyDictionary({o:364})
  381.         self.assert_(dict[o] == 364)
  382.  
  383.     def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
  384.         o = Object(3)
  385.         dict = weakref.WeakKeyDictionary({o:364})
  386.         dict2 = weakref.WeakKeyDictionary(dict)
  387.         self.assert_(dict[o] == 364)
  388.  
  389.     def make_weak_keyed_dict(self):
  390.         dict = weakref.WeakKeyDictionary()
  391.         objects = map(Object, range(self.COUNT))
  392.         for o in objects:
  393.             dict[o] = o.arg
  394.         return dict, objects
  395.  
  396.     def make_weak_valued_dict(self):
  397.         dict = weakref.WeakValueDictionary()
  398.         objects = map(Object, range(self.COUNT))
  399.         for o in objects:
  400.             dict[o.arg] = o
  401.         return dict, objects
  402.  
  403.     def check_popitem(self, klass, key1, value1, key2, value2):
  404.         weakdict = klass()
  405.         weakdict[key1] = value1
  406.         weakdict[key2] = value2
  407.         self.assert_(len(weakdict) == 2)
  408.         k, v = weakdict.popitem()
  409.         self.assert_(len(weakdict) == 1)
  410.         if k is key1:
  411.             self.assert_(v is value1)
  412.         else:
  413.             self.assert_(v is value2)
  414.         k, v = weakdict.popitem()
  415.         self.assert_(len(weakdict) == 0)
  416.         if k is key1:
  417.             self.assert_(v is value1)
  418.         else:
  419.             self.assert_(v is value2)
  420.  
  421.     def test_weak_valued_dict_popitem(self):
  422.         self.check_popitem(weakref.WeakValueDictionary,
  423.                            "key1", C(), "key2", C())
  424.  
  425.     def test_weak_keyed_dict_popitem(self):
  426.         self.check_popitem(weakref.WeakKeyDictionary,
  427.                            C(), "value 1", C(), "value 2")
  428.  
  429.     def check_setdefault(self, klass, key, value1, value2):
  430.         self.assert_(value1 is not value2,
  431.                      "invalid test"
  432.                      " -- value parameters must be distinct objects")
  433.         weakdict = klass()
  434.         o = weakdict.setdefault(key, value1)
  435.         self.assert_(o is value1)
  436.         self.assert_(weakdict.has_key(key))
  437.         self.assert_(weakdict.get(key) is value1)
  438.         self.assert_(weakdict[key] is value1)
  439.  
  440.         o = weakdict.setdefault(key, value2)
  441.         self.assert_(o is value1)
  442.         self.assert_(weakdict.has_key(key))
  443.         self.assert_(weakdict.get(key) is value1)
  444.         self.assert_(weakdict[key] is value1)
  445.  
  446.     def test_weak_valued_dict_setdefault(self):
  447.         self.check_setdefault(weakref.WeakValueDictionary,
  448.                               "key", C(), C())
  449.  
  450.     def test_weak_keyed_dict_setdefault(self):
  451.         self.check_setdefault(weakref.WeakKeyDictionary,
  452.                               C(), "value 1", "value 2")
  453.  
  454.     def check_update(self, klass, dict):
  455.         #
  456.         #  This exercises d.update(), len(d), d.keys(), d.has_key(),
  457.         #  d.get(), d[].
  458.         #
  459.         weakdict = klass()
  460.         weakdict.update(dict)
  461.         self.assert_(len(weakdict) == len(dict))
  462.         for k in weakdict.keys():
  463.             self.assert_(dict.has_key(k),
  464.                          "mysterious new key appeared in weak dict")
  465.             v = dict.get(k)
  466.             self.assert_(v is weakdict[k])
  467.             self.assert_(v is weakdict.get(k))
  468.         for k in dict.keys():
  469.             self.assert_(weakdict.has_key(k),
  470.                          "original key disappeared in weak dict")
  471.             v = dict[k]
  472.             self.assert_(v is weakdict[k])
  473.             self.assert_(v is weakdict.get(k))
  474.  
  475.     def test_weak_valued_dict_update(self):
  476.         self.check_update(weakref.WeakValueDictionary,
  477.                           {1: C(), 'a': C(), C(): C()})
  478.  
  479.     def test_weak_keyed_dict_update(self):
  480.         self.check_update(weakref.WeakKeyDictionary,
  481.                           {C(): 1, C(): 2, C(): 3})
  482.  
  483.     def test_weak_keyed_delitem(self):
  484.         d = weakref.WeakKeyDictionary()
  485.         o1 = Object('1')
  486.         o2 = Object('2')
  487.         d[o1] = 'something'
  488.         d[o2] = 'something'
  489.         self.assert_(len(d) == 2)
  490.         del d[o1]
  491.         self.assert_(len(d) == 1)
  492.         self.assert_(d.keys() == [o2])
  493.  
  494.     def test_weak_valued_delitem(self):
  495.         d = weakref.WeakValueDictionary()
  496.         o1 = Object('1')
  497.         o2 = Object('2')
  498.         d['something'] = o1
  499.         d['something else'] = o2
  500.         self.assert_(len(d) == 2)
  501.         del d['something']
  502.         self.assert_(len(d) == 1)
  503.         self.assert_(d.items() == [('something else', o2)])
  504.  
  505.  
  506. def test_main():
  507.     loader = unittest.TestLoader()
  508.     suite = unittest.TestSuite()
  509.     suite.addTest(loader.loadTestsFromTestCase(ReferencesTestCase))
  510.     suite.addTest(loader.loadTestsFromTestCase(MappingTestCase))
  511.     test_support.run_suite(suite)
  512.  
  513.  
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
  516.