home *** CD-ROM | disk | FTP | other *** search
/ Chip 2011 November / CHIP_2011_11.iso / Programy / Narzedzia / Calibre / calibre-0.8.18.msi / file_262 / pywin32_testutil.pyo (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2011-09-09  |  8.5 KB  |  236 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyo (Python 2.7)
  3.  
  4. import sys
  5. import unittest
  6. import gc
  7. import winerror
  8.  
  9. def int2long(val):
  10.     return val + 0x100000000L - 0x100000000L
  11.  
  12.  
  13. def str2bytes(sval):
  14.     if sys.version_info < (3, 0) and isinstance(sval, str):
  15.         sval = sval.decode('latin1')
  16.     return sval.encode('latin1')
  17.  
  18.  
  19. def str2memory(sval):
  20.     if sys.version_info < (3, 0):
  21.         return buffer(sval)
  22.     return None(sval.encode('latin1'))
  23.  
  24.  
  25. def ob2memory(ob):
  26.     if sys.version_info < (3, 0):
  27.         return buffer(ob)
  28.     return None(ob)
  29.  
  30.  
  31. class LeakTestCase(unittest.TestCase):
  32.     
  33.     def __init__(self, real_test):
  34.         unittest.TestCase.__init__(self)
  35.         self.real_test = real_test
  36.         self.num_test_cases = 1
  37.         self.num_leak_iters = 2
  38.         if hasattr(sys, 'gettotalrefcount'):
  39.             self.num_test_cases = self.num_test_cases + self.num_leak_iters
  40.  
  41.     
  42.     def countTestCases(self):
  43.         return self.num_test_cases
  44.  
  45.     
  46.     def __call__(self, result = None):
  47.         _GetInterfaceCount = _GetInterfaceCount
  48.         _GetGatewayCount = _GetGatewayCount
  49.         import pythoncom
  50.         gc.collect()
  51.         ni = _GetInterfaceCount()
  52.         ng = _GetGatewayCount()
  53.         self.real_test(result)
  54.         if result.shouldStop or not result.wasSuccessful():
  55.             return None
  56.         None._do_leak_tests(result)
  57.         gc.collect()
  58.         lost_i = _GetInterfaceCount() - ni
  59.         lost_g = _GetGatewayCount() - ng
  60.         if lost_i or lost_g:
  61.             msg = '%d interface objects and %d gateway objects leaked' % (lost_i, lost_g)
  62.             exc = AssertionError(msg)
  63.             result.addFailure(self.real_test, (exc.__class__, exc, None))
  64.  
  65.     
  66.     def runTest(self):
  67.         pass
  68.  
  69.     
  70.     def _do_leak_tests(self, result = None):
  71.         
  72.         try:
  73.             gtrc = sys.gettotalrefcount
  74.         except AttributeError:
  75.             return None
  76.  
  77.         gc.collect()
  78.         trc = gtrc()
  79.         for i in range(self.num_leak_iters):
  80.             self.real_test(result)
  81.             if result.shouldStop:
  82.                 break
  83.                 continue
  84.         del i
  85.         gc.collect()
  86.         lost = (gtrc() - trc) // self.num_leak_iters
  87.         if lost < 0:
  88.             msg = 'LeakTest: %s appeared to gain %d references!!' % (self.real_test, -lost)
  89.             result.addFailure(self.real_test, (AssertionError, msg, None))
  90.         if lost > 0:
  91.             msg = 'LeakTest: %s lost %d references' % (self.real_test, lost)
  92.             exc = AssertionError(msg)
  93.             result.addFailure(self.real_test, (exc.__class__, exc, None))
  94.  
  95.  
  96.  
  97. class TestLoader(unittest.TestLoader):
  98.     
  99.     def loadTestsFromTestCase(self, testCaseClass):
  100.         leak_tests = []
  101.         for name in self.getTestCaseNames(testCaseClass):
  102.             real_test = testCaseClass(name)
  103.             leak_test = self._getTestWrapper(real_test)
  104.             leak_tests.append(leak_test)
  105.         
  106.         return self.suiteClass(leak_tests)
  107.  
  108.     
  109.     def fixupTestsForLeakTests(self, test):
  110.         if isinstance(test, unittest.TestSuite):
  111.             test._tests = [ self.fixupTestsForLeakTests(t) for t in test._tests ]
  112.             return test
  113.         return None._getTestWrapper(test)
  114.  
  115.     
  116.     def _getTestWrapper(self, test):
  117.         no_leak_tests = getattr(test, 'no_leak_tests', False)
  118.         if no_leak_tests:
  119.             print "Test says it doesn't want leak tests!"
  120.             return test
  121.         return None(test)
  122.  
  123.     
  124.     def loadTestsFromModule(self, mod):
  125.         if hasattr(mod, 'suite'):
  126.             tests = mod.suite()
  127.         else:
  128.             tests = unittest.TestLoader.loadTestsFromModule(self, mod)
  129.         return self.fixupTestsForLeakTests(tests)
  130.  
  131.     
  132.     def loadTestsFromName(self, name, module = None):
  133.         test = unittest.TestLoader.loadTestsFromName(self, name, module)
  134.         if isinstance(test, unittest.TestSuite):
  135.             pass
  136.         elif isinstance(test, unittest.TestCase):
  137.             test = self._getTestWrapper(test)
  138.         else:
  139.             print 'XXX - what is', test
  140.         return test
  141.  
  142.  
  143. non_admin_error_codes = [
  144.     winerror.ERROR_ACCESS_DENIED,
  145.     winerror.ERROR_PRIVILEGE_NOT_HELD]
  146. _is_admin = None
  147.  
  148. def check_is_admin():
  149.     global _is_admin, _is_admin
  150.     if _is_admin is None:
  151.         IsUserAnAdmin = IsUserAnAdmin
  152.         import win32com.shell.shell
  153.         import pythoncom
  154.         
  155.         try:
  156.             _is_admin = IsUserAnAdmin()
  157.         except pythoncom.com_error:
  158.             exc = None
  159.             if exc.hresult != winerror.E_NOTIMPL:
  160.                 raise 
  161.             _is_admin = True
  162.         
  163.  
  164.     return _is_admin
  165.  
  166.  
  167. class TestSkipped(Exception):
  168.     pass
  169.  
  170.  
  171. class TestResult(unittest._TextTestResult):
  172.     
  173.     def __init__(self, *args, **kw):
  174.         super(TestResult, self).__init__(*args, **kw)
  175.         self.skips = { }
  176.  
  177.     
  178.     def addError(self, test, err):
  179.         import pywintypes
  180.         exc_val = err[1]
  181.         if isinstance(exc_val, pywintypes.error) and exc_val.winerror in non_admin_error_codes and not check_is_admin():
  182.             exc_val = TestSkipped(exc_val)
  183.         elif isinstance(exc_val, pywintypes.com_error) and exc_val.hresult == winerror.CO_E_CLASSSTRING:
  184.             exc_val = TestSkipped(exc_val)
  185.         elif isinstance(exc_val, NotImplementedError):
  186.             exc_val = TestSkipped(NotImplementedError)
  187.         if isinstance(exc_val, TestSkipped):
  188.             reason = exc_val.args[0]
  189.             
  190.             try:
  191.                 reason = tuple(reason.args)
  192.             except (AttributeError, TypeError):
  193.                 pass
  194.  
  195.             self.skips.setdefault(reason, 0)
  196.             self.skips[reason] += 1
  197.             if self.showAll:
  198.                 self.stream.writeln('SKIP (%s)' % (reason,))
  199.             elif self.dots:
  200.                 self.stream.write('S')
  201.                 self.stream.flush()
  202.             return None
  203.         None(TestResult, self).addError(test, err)
  204.  
  205.     
  206.     def printErrors(self):
  207.         super(TestResult, self).printErrors()
  208.         for reason, num_skipped in self.skips.iteritems():
  209.             self.stream.writeln('SKIPPED: %d tests - %s' % (num_skipped, reason))
  210.         
  211.  
  212.  
  213.  
  214. class TestRunner(unittest.TextTestRunner):
  215.     
  216.     def _makeResult(self):
  217.         return TestResult(self.stream, self.descriptions, self.verbosity)
  218.  
  219.  
  220.  
  221. class TestProgram(unittest.TestProgram):
  222.     
  223.     def runTests(self):
  224.         self.testRunner = TestRunner(verbosity = self.verbosity)
  225.         unittest.TestProgram.runTests(self)
  226.  
  227.  
  228.  
  229. def testmain(*args, **kw):
  230.     new_kw = kw.copy()
  231.     if 'testLoader' not in new_kw:
  232.         new_kw['testLoader'] = TestLoader()
  233.     program_class = new_kw.get('testProgram', TestProgram)
  234.     program_class(*args, **new_kw)
  235.  
  236.