home *** CD-ROM | disk | FTP | other *** search
/ Personal Computer World 2008 February / PCWFEB08.iso / Software / Freeware / Miro 1.0 / Miro_Installer.exe / xulrunner / python / databasesanity.py < prev    next >
Encoding:
Python Source  |  2007-11-12  |  7.4 KB  |  201 lines

  1. # Miro - an RSS based video player application
  2. # Copyright (C) 2005-2007 Participatory Culture Foundation
  3. #
  4. # This program is free software; you can redistribute it and/or modify
  5. # it under the terms of the GNU General Public License as published by
  6. # the Free Software Foundation; either version 2 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  12. # GNU General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU General Public License
  15. # along with this program; if not, write to the Free Software
  16. # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
  17.  
  18. """Sanity checks for the databases.
  19.  
  20. This module is deprecated: database sanity checking is done by the
  21. checkConstraints method on DDBObjects.  This is a better way to do things
  22. because it will catch errors right when we save objects, instead of some
  23. unknown point in the future.  We still have this code around, because it's
  24. used to do sanity checks on old databases.
  25.  
  26. """
  27.  
  28. import item
  29. import feed
  30. import util
  31. import guide
  32.  
  33. class DatabaseInsaneError(Exception):
  34.     pass
  35.  
  36. class SanityTest(object):
  37.     """Base class for the sanity test objects."""
  38.  
  39.     def checkObject(self, object):
  40.         """checkObject will be called for each object in the object list.
  41.         If there is an error return a string describing it.  If not return
  42.         None (or just let the function hit the bottom).
  43.         """
  44.         raise NotImplementedError()
  45.  
  46.     def finished(self):
  47.         """Called when we reach the end of the object list, SanityTest
  48.         subclasses may implement additional checking here."""
  49.         return
  50.  
  51.     def fixIfPossible(self, objectList):
  52.         """Subclasses may implement this method if it's possible to fix broken
  53.         databases.  The default implementation just raises a
  54.         DatabaseInsaneError.
  55.         """
  56.         raise DatabaseInsaneError()
  57.  
  58. class PhontomFeedTest(SanityTest):
  59.     """Check that no items reference a Feed that isn't around anymore."""
  60.     def __init__(self):
  61.         self.feedsInItems = set()
  62.         self.topLevelFeeds = set()
  63.         self.parentsInItems = set()
  64.         self.topLevelParents = set()
  65.  
  66.     def checkObject(self, obj):
  67.         if isinstance(obj, item.Item):
  68.             if obj.feed_id is not None:
  69.                 self.feedsInItems.add(obj.feed_id)
  70.             if obj.parent_id is not None:
  71.                 self.parentsInItems.add(obj.parent_id)
  72.             if obj.isContainerItem in (None, True):
  73.                 self.topLevelParents.add(obj.id)
  74.         elif isinstance(obj, feed.Feed):
  75.             self.topLevelFeeds.add(obj.id)
  76.  
  77.     def finished(self):
  78.         if not self.feedsInItems.issubset(self.topLevelFeeds):
  79.             phantoms = self.feedsInItems.difference(self.topLevelFeeds)
  80.             phantomsString = ', '.join([str(p) for p in phantoms])
  81.             return "Phantom feed(s) referenced in items: %s" % phantomsString
  82.         if not self.parentsInItems.issubset(self.topLevelParents):
  83.             phantoms = self.parentsInItems.difference(self.topLevelParents)
  84.             phantomsString = ', '.join([str(p) for p in phantoms])
  85.             return "Phantom items(s) referenced in items: %s" % phantomsString
  86.  
  87.     def fixIfPossible(self, objectList):
  88.         for i in reversed(xrange(len(objectList))):
  89.             if (isinstance(objectList[i], item.Item) and 
  90.                     objectList[i].feed_id is not None and
  91.                 objectList[i].feed_id not in self.topLevelFeeds):
  92.                 del objectList[i]
  93.             elif (isinstance(objectList[i], item.Item) and 
  94.                     objectList[i].parent_id is not None and
  95.                 objectList[i].parent_id not in self.topLevelParents):
  96.                 del objectList[i]
  97.  
  98. class SingletonTest(SanityTest):
  99.     """Check that singleton DB objects are really singletons.
  100.  
  101.     This is a baseclass for the channle guide test, manual feed test, etc.
  102.     """
  103.  
  104.     def __init__(self):
  105.         self.count = 0
  106.  
  107.     def checkObject(self, obj):
  108.         if self.objectIsSingleton(obj):
  109.             self.count += 1
  110.             if self.count > 1:
  111.                 return "Extra %s in database" % self.singletonName
  112.     
  113.     def finished(self):
  114.         if self.count == 0:
  115.             # For all our singletons (currently at least), we don't need to
  116.             # create them here.  It'll happen when Miro is restarted.
  117.             # return "No %s in database" % self.singletonName
  118.             pass
  119.  
  120.     def fixIfPossible(self, objectList):
  121.         if self.count == 0:
  122.             # For all our singletons (currently at least), we don't need to
  123.             # create them here.  It'll happen when Miro is restarted.
  124.             return
  125.         else:
  126.             seenObject = False
  127.             for i in reversed(xrange(len(objectList))):
  128.                 if self.objectIsSingleton(objectList[i]):
  129.                     if seenObject:
  130.                         del objectList[i]
  131.                     else:
  132.                         seenObject = True
  133.  
  134. class ChannelGuideSingletonTest(SingletonTest):
  135.     singletonName = "Channel Guide"
  136.     def objectIsSingleton(self, obj):
  137.         return isinstance(obj, guide.ChannelGuide) and obj.url is None
  138.  
  139. class ManualFeedSingletonTest(SingletonTest):
  140.     singletonName = "Manual Feed"
  141.     def objectIsSingleton(self, obj):
  142.         return (isinstance(obj, feed.Feed) and 
  143.                 isinstance(obj.actualFeed, feed.ManualFeedImpl))
  144.  
  145. def checkSanity(objectList, fixIfPossible=True, quiet=False, reallyQuiet=False):
  146.     """Do all sanity checks on a list of objects.
  147.  
  148.     If fixIfPossible is True, the sanity checks will try to fix errors.  If
  149.     this happens objectList will be modified.
  150.  
  151.     If fixIfPossible is False, or if it's not possible to fix the errors
  152.     checkSanity will raise a DatabaseInsaneError.
  153.  
  154.     If quiet is True, we print to the log instead of poping up an error dialog
  155.     on fixable problems.  We set this when we are converting old databases,
  156.     since sanity errors are somewhat expected.
  157.  
  158.     If reallyQuiet is True, won't even print out a warning on fixable
  159.     problems.
  160.  
  161.     Returns True if the database passed all sanity tests, false otherwise.
  162.     """
  163.  
  164.     tests = set([
  165.         PhontomFeedTest(),
  166.         ChannelGuideSingletonTest(),
  167.         ManualFeedSingletonTest(),
  168.     ])
  169.  
  170.     errors = []
  171.     failedTests = set()
  172.     for obj in objectList:
  173.         for test in tests:
  174.             rv = test.checkObject(obj)
  175.             if rv is not None:
  176.                 errors.append(rv)
  177.                 failedTests.add(test)
  178.         tests = tests.difference(failedTests)
  179.     for test in tests:
  180.         rv = test.finished()
  181.         if rv is not None:
  182.             errors.append(rv)
  183.             failedTests.add(test)
  184.  
  185.     if errors:
  186.         errorMsg = "The database failed the following sanity tests:\n"
  187.         errorMsg += "\n".join(errors)
  188.         if fixIfPossible:
  189.             if not quiet:
  190.                 util.failed(when="While checking database", details=errorMsg)
  191.             elif not reallyQuiet:
  192.                 print "WARNING: Database sanity error"
  193.                 print errorMsg
  194.             for test in failedTests:
  195.                 test.fixIfPossible(objectList)
  196.                 # fixIfPossible will throw a DatabaseInsaneError if it fails,
  197.                 # which we let get raised to our caller
  198.         else:
  199.             raise DatabaseInsaneError(errorMsg)
  200.     return (errors == [])
  201.