home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.7)
-
- import sys
- import unittest
- import gc
- import winerror
-
def int2long(val):
    """Return *val* numerically unchanged, forced through wide-integer arithmetic.

    Adding and then subtracting 2**32 leaves the value the same.  On a
    Python 2 build whose native int is 32 bits wide the intermediate sum
    exceeds ``sys.maxint`` and the result is therefore a ``long``
    (presumably the intent, given the name - confirm against callers).
    On Python 3 there is only one integer type, so this is an identity.

    The literal is written without the Python 2-only ``L`` suffix so the
    module still parses on Python 3.
    """
    return val + 0x100000000 - 0x100000000
-
-
def str2bytes(sval):
    """Return *sval* as a byte string, mapping characters 1:1 via latin-1.

    A value that is already ``bytes`` (which is ``str`` on Python 2) is
    returned unchanged: a latin-1 decode/encode round trip is lossless, so
    this matches what the Python 2 path always produced, and it stops the
    Python 3 path from failing with AttributeError on ``bytes.encode``.

    Text values are encoded with latin-1; characters above U+00FF raise
    ``UnicodeEncodeError``.
    """
    if isinstance(sval, bytes):
        return sval
    return sval.encode('latin1')
-
-
def str2memory(sval):
    """Return a read-only memory view over the latin-1 bytes of *sval*.

    On Python 2 the string is wrapped directly in a ``buffer``; on
    Python 3 the text is first encoded with latin-1 and then exposed
    through a ``memoryview``.
    """
    if sys.version_info < (3, 0):
        return buffer(sval)  # noqa: F821 - builtin on Python 2 only
    # Python 3: the decompiled listing showed ``None(...)`` here, which
    # would raise TypeError; ``memoryview`` is the py3 counterpart of buffer.
    return memoryview(sval.encode('latin1'))
-
-
def ob2memory(ob):
    """Return a view onto the memory exposed by *ob*.

    *ob* must support the buffer protocol.  Python 2 gets a ``buffer``
    object, Python 3 a ``memoryview``.
    """
    if sys.version_info < (3, 0):
        return buffer(ob)  # noqa: F821 - builtin on Python 2 only
    # Python 3: the decompiled listing showed ``None(ob)`` here, which
    # would raise TypeError; ``memoryview`` is the py3 counterpart of buffer.
    return memoryview(ob)
-
-
- class LeakTestCase(unittest.TestCase):
-
- def __init__(self, real_test):
- unittest.TestCase.__init__(self)
- self.real_test = real_test
- self.num_test_cases = 1
- self.num_leak_iters = 2
- if hasattr(sys, 'gettotalrefcount'):
- self.num_test_cases = self.num_test_cases + self.num_leak_iters
-
-
- def countTestCases(self):
- return self.num_test_cases
-
-
- def __call__(self, result = None):
- _GetInterfaceCount = _GetInterfaceCount
- _GetGatewayCount = _GetGatewayCount
- import pythoncom
- gc.collect()
- ni = _GetInterfaceCount()
- ng = _GetGatewayCount()
- self.real_test(result)
- if result.shouldStop or not result.wasSuccessful():
- return None
- None._do_leak_tests(result)
- gc.collect()
- lost_i = _GetInterfaceCount() - ni
- lost_g = _GetGatewayCount() - ng
- if lost_i or lost_g:
- msg = '%d interface objects and %d gateway objects leaked' % (lost_i, lost_g)
- exc = AssertionError(msg)
- result.addFailure(self.real_test, (exc.__class__, exc, None))
-
-
- def runTest(self):
- pass
-
-
- def _do_leak_tests(self, result = None):
-
- try:
- gtrc = sys.gettotalrefcount
- except AttributeError:
- return None
-
- gc.collect()
- trc = gtrc()
- for i in range(self.num_leak_iters):
- self.real_test(result)
- if result.shouldStop:
- break
- continue
- del i
- gc.collect()
- lost = (gtrc() - trc) // self.num_leak_iters
- if lost < 0:
- msg = 'LeakTest: %s appeared to gain %d references!!' % (self.real_test, -lost)
- result.addFailure(self.real_test, (AssertionError, msg, None))
- if lost > 0:
- msg = 'LeakTest: %s lost %d references' % (self.real_test, lost)
- exc = AssertionError(msg)
- result.addFailure(self.real_test, (exc.__class__, exc, None))
-
-
-
class TestLoader(unittest.TestLoader):
    """Test loader that wraps every loaded test case in a ``LeakTestCase``.

    A test can opt out by exposing a true ``no_leak_tests`` attribute.
    """

    def loadTestsFromTestCase(self, testCaseClass):
        """Return a suite with one wrapped test per test method of the class."""
        leak_tests = [
            self._getTestWrapper(testCaseClass(name))
            for name in self.getTestCaseNames(testCaseClass)
        ]
        return self.suiteClass(leak_tests)

    def fixupTestsForLeakTests(self, test):
        """Wrap *test*, recursing into suites, and return the result.

        Suites are modified in place (their members are replaced by
        wrapped versions) and returned; anything else is passed to
        ``_getTestWrapper``.
        """
        if isinstance(test, unittest.TestSuite):
            test._tests = [self.fixupTestsForLeakTests(t) for t in test._tests]
            return test
        return self._getTestWrapper(test)

    def _getTestWrapper(self, test):
        """Return a ``LeakTestCase`` around *test* unless it opted out."""
        no_leak_tests = getattr(test, 'no_leak_tests', False)
        if no_leak_tests:
            # Single-argument call form prints the same text on Python 2 and 3.
            print("Test says it doesn't want leak tests!")
            return test
        return LeakTestCase(test)

    def loadTestsFromModule(self, mod):
        """Load tests from *mod*, preferring its ``suite()`` function if present."""
        if hasattr(mod, 'suite'):
            tests = mod.suite()
        else:
            tests = unittest.TestLoader.loadTestsFromModule(self, mod)
        return self.fixupTestsForLeakTests(tests)

    def loadTestsFromName(self, name, module=None):
        """Resolve *name* as the base loader does and wrap a single test case.

        Suites are returned as loaded, without further wrapping here.
        """
        test = unittest.TestLoader.loadTestsFromName(self, name, module)
        if isinstance(test, unittest.TestSuite):
            pass
        elif isinstance(test, unittest.TestCase):
            test = self._getTestWrapper(test)
        else:
            # Formatted into one string so the output is identical on
            # Python 2 and 3.
            print('XXX - what is %s' % (test,))
        return test
-
-
# Cached answer for check_is_admin(); None means "not determined yet".
_is_admin = None

# Win32 error codes that TestResult.addError converts into a skip when
# check_is_admin() reports that the current user is not an administrator.
non_admin_error_codes = [winerror.ERROR_ACCESS_DENIED, winerror.ERROR_PRIVILEGE_NOT_HELD]
-
def check_is_admin():
    """Return whether the current user is an administrator, caching the answer.

    The first call asks ``win32com.shell.shell.IsUserAnAdmin`` and stores
    the result in the module-level ``_is_admin``; later calls return the
    stored value.  If the shell call fails with ``E_NOTIMPL`` the user is
    treated as an administrator; any other COM error propagates.
    """
    global _is_admin
    if _is_admin is None:
        # Imported lazily so importing this module does not require the
        # shell extensions.
        from win32com.shell.shell import IsUserAnAdmin
        import pythoncom

        try:
            _is_admin = IsUserAnAdmin()
        except pythoncom.com_error as exc:
            if exc.hresult != winerror.E_NOTIMPL:
                raise
            # The check is not implemented here: assume admin rights.
            _is_admin = True

    return _is_admin
-
-
class TestSkipped(Exception):
    """Exception type marking a test as skipped; its first argument is the reason."""
-
-
class TestResult(unittest.TextTestResult):
    """Text test result that turns selected errors into counted skips.

    ``skips`` maps a skip reason to the number of tests skipped for it.
    """

    def __init__(self, *args, **kw):
        """Forward all arguments to the base result and start with no skips."""
        super(TestResult, self).__init__(*args, **kw)
        self.skips = {}

    def addError(self, test, err):
        """Record *err* for *test*, reclassifying known conditions as skips.

        *err* is an ``exc_info``-style tuple.  The error becomes a skip when
        its value is:

        * a ``pywintypes.error`` whose ``winerror`` is in
          ``non_admin_error_codes`` while the user is not an administrator;
        * a ``pywintypes.com_error`` with hresult ``CO_E_CLASSSTRING``;
        * a ``NotImplementedError``;
        * already a ``TestSkipped``.

        Everything else is passed to the base class unchanged.
        """
        import pywintypes
        exc_val = err[1]
        if (isinstance(exc_val, pywintypes.error)
                and exc_val.winerror in non_admin_error_codes
                and not check_is_admin()):
            exc_val = TestSkipped(exc_val)
        elif (isinstance(exc_val, pywintypes.com_error)
                and exc_val.hresult == winerror.CO_E_CLASSSTRING):
            exc_val = TestSkipped(exc_val)
        elif isinstance(exc_val, NotImplementedError):
            # The class itself (not the instance) is used as the reason.
            exc_val = TestSkipped(NotImplementedError)
        if isinstance(exc_val, TestSkipped):
            reason = exc_val.args[0]
            # When the reason is itself an exception instance, key on its
            # args tuple; otherwise keep the reason as given.
            try:
                reason = tuple(reason.args)
            except (AttributeError, TypeError):
                pass

            self.skips[reason] = self.skips.get(reason, 0) + 1
            if self.showAll:
                self.stream.writeln('SKIP (%s)' % (reason,))
            elif self.dots:
                self.stream.write('S')
                self.stream.flush()
            return None
        super(TestResult, self).addError(test, err)

    def printErrors(self):
        """Print the base class's error report followed by a line per skip reason."""
        super(TestResult, self).printErrors()
        # items() works on both Python 2 and 3 (iteritems() is 2-only).
        for reason, num_skipped in self.skips.items():
            self.stream.writeln('SKIPPED: %d tests - %s' % (num_skipped, reason))
-
-
-
-
class TestRunner(unittest.TextTestRunner):
    """Text test runner that collects results in a ``TestResult``."""

    def _makeResult(self):
        """Create the result object from this runner's stream, descriptions and verbosity."""
        result_args = (self.stream, self.descriptions, self.verbosity)
        return TestResult(*result_args)
-
-
-
class TestProgram(unittest.TestProgram):
    """Command-line test program that always runs with ``TestRunner``."""

    def runTests(self):
        """Install a ``TestRunner`` at the current verbosity, then run as usual."""
        runner = TestRunner(verbosity=self.verbosity)
        self.testRunner = runner
        unittest.TestProgram.runTests(self)
-
-
-
- def testmain(*args, **kw):
- new_kw = kw.copy()
- if 'testLoader' not in new_kw:
- new_kw['testLoader'] = TestLoader()
- program_class = new_kw.get('testProgram', TestProgram)
- program_class(*args, **new_kw)
-
-