from TagStrip import strip, strip_readable
from Components.Scanner import ScanFile
# ElementTree reports tags of namespaced documents as "{namespace-uri}localname";
# these prefixes are prepended to tag names when looking up elements in the
# rdf / namespaced-rss dialects.
NS_RDF = "{http://www.w3.org/1999/02/22-rdf-syntax-ns#}"
NS_RSS_09 = "{http://my.netscape.com/rdf/simple/0.9/}"
NS_RSS_10 = "{http://purl.org/rss/1.0/}"
+
# based on http://effbot.org/zone/element-rss-wrapper.htm
class ElementWrapper:
	"""Attribute-style, read-only access to an ElementTree element.

	Looking up an attribute is translated into findtext() on the wrapped
	element, with an optional XML namespace prefix ("{uri}") prepended.
	Missing child elements therefore yield None.
	"""
	def __init__(self, element, ns = ""):
		self._element = element
		self._ns = ns

	def __getattr__(self, tag):
		# Dunder lookups (__deepcopy__, __getstate__, ...) must fail like
		# ordinary missing attributes instead of being treated as feed tags.
		if not tag.startswith("__"):
			return self._element.findtext(self._ns + tag)
		raise AttributeError(tag)
+
class RSSEntryWrapper(ElementWrapper):
	"""Wraps a single rss/rdf <item>, mapping the attribute names the parser
	expects onto the tags this dialect actually uses."""
	def __getattr__(self, tag):
		if tag == "enclosures":
			# Enclosures are normalized to a list of dicts so all feed
			# dialects look alike to the consumer.
			myl = []
			for elem in self._element.findall(self._ns + "enclosure"):
				length = elem.get("length", None)
				if length:
					# bytes -> whole MiB; floor division keeps this an
					# integer on Python 3 as well (true division would
					# silently change the type to float)
					length = int(length) // 1048576
				myl.append({
					"href": elem.get("url"),
					"type": elem.get("type"),
					"length": length
				})
			return myl
		if tag == "id":
			# Prefer the guid; fall back to title+link for RSS 1.0 or
			# invalid RSS 2.0 feeds. findtext() returns None for missing
			# title/link, so guard against it — ''.join would otherwise
			# raise TypeError on such feeds.
			possibleId = self._element.findtext(self._ns + "guid")
			if not possibleId:
				possibleId = ''.join((self.title or "", self.link or ""))
			return possibleId
		# Attribute aliases for tags rss names differently than atom
		if tag == "updated":
			tag = "lastBuildDate"
		elif tag == "summary":
			tag = "description"
		return ElementWrapper.__getattr__(self, tag)
+
class PEAEntryWrapper(ElementWrapper):
	"""Wraps a single atom <entry>; atom keeps both the item's link and its
	enclosures in <link> elements, told apart by their rel attribute."""
	def __getattr__(self, tag):
		if tag == "link":
			# The first <link> that is not an enclosure is the entry's link
			for elem in self._element.findall(self._ns + tag):
				if not elem.get("rel") == "enclosure":
					return elem.get("href")
			# no plain link found
			return ""
		if tag == "enclosures":
			myl = []
			for elem in self._element.findall(self._ns + "link"):
				if elem.get("rel") == "enclosure":
					length = elem.get("length", None)
					if length:
						# bytes -> whole MiB; floor division keeps the
						# value an integer on Python 3 too (matches the
						# rss wrapper)
						length = int(length) // 1048576
					myl.append({
						"href": elem.get("href"),
						"type": elem.get("type"),
						"length": length
					})
			return myl
		return ElementWrapper.__getattr__(self, tag)
+
class RSSWrapper(ElementWrapper):
	"""Channel-level wrapper: attribute access reads channel tags, while
	indexing/iteration yields the feed's items wrapped as entries."""
	def __init__(self, channel, items, ns = ""):
		self._items = items
		ElementWrapper.__init__(self, channel, ns)

	def __iter__(self):
		# materialize the wrapped entries, then hand out a list iterator
		wrapped = [self[position] for position in range(len(self))]
		return iter(wrapped)

	def __len__(self):
		return len(self._items)

	def __getitem__(self, index):
		return RSSEntryWrapper(self._items[index], self._ns)
+
class RSS1Wrapper(RSSWrapper):
	"""rss 1.0 / rdf: <channel> and <item> elements live side by side under
	the (namespaced) document root."""
	def __init__(self, feed, ns):
		channel = feed.find(ns + "channel")
		items = feed.findall(ns + "item")
		RSSWrapper.__init__(self, channel, items, ns)
+
class RSS2Wrapper(RSSWrapper):
	"""rss 2.0: everything of interest sits below <channel>, no namespace."""
	def __init__(self, feed):
		# NOTE(review): assumes a <channel> element exists — a malformed
		# feed would make find() return None and the findall crash; verify
		# upstream guarantees. TODO confirm
		channel = feed.find("channel")
		items = channel.findall("item")
		RSSWrapper.__init__(self, channel, items)
+
class PEAWrapper(RSSWrapper):
	"""atom ("pea") feeds: entries sit directly below the root <feed>
	element, in whatever namespace the document declares."""
	def __init__(self, feed):
		# Reuse the root's namespace prefix, e.g. "{http://www.w3.org/2005/Atom}".
		# The root tag is known to be namespaced here since gotFeed only
		# matched it via endswith("feed").
		closing = feed.tag.index("}") + 1
		ns = feed.tag[:closing]
		RSSWrapper.__init__(self, feed, feed.findall(ns + "entry"), ns)

	def __getitem__(self, index):
		# atom entries need their own wrapper (link/enclosure handling differs)
		return PEAEntryWrapper(self._items[index], self._ns)

	def __getattr__(self, tag):
		# atom calls the channel description "subtitle"
		if tag == "description":
			tag = "subtitle"
		return ElementWrapper.__getattr__(self, tag)
+
class BaseFeed:
	"""Base-class for all Feeds. Initializes needed Elements."""
	# cap on how many parsed items a feed keeps around
	MAX_HISTORY_ELEMENTS = 100

	def __init__(self, uri, title = "", description = ""):
		# Set URI (used as Identifier)
		self.uri = uri
		# Fall back to the uri when no title was supplied
		self.title = title or uri.encode("UTF-8")
		self.description = description
		# parsed items, newest first
		self.history = []

	def __str__(self):
		return '<%s, "%s", "%s", %d items>' % (self.__class__, self.title, self.description, len(self.history))
+
class UniversalFeed(BaseFeed):
	"""Feed which can handle rdf, rss and atom feeds utilizing abstraction wrappers."""
	def __init__(self, uri, autoupdate):
		BaseFeed.__init__(self, uri)

		# Set Autoupdate
		self.autoupdate = autoupdate
		# Initialize
		self.last_update = None   # channel update stamp seen on the last poll
		self.last_ids = set()     # ids of items already present in history

	def gotWrapper(self, wrapper):
		"""Parse the wrapped feed and return the list of new items.

		New items are prepended to self.history (newest first) and their ids
		remembered so a later poll does not report them again.
		"""
		# If the channel advertises an update stamp and it did not change,
		# nothing new can be in the feed. Remember the stamp afterwards —
		# without the assignment this shortcut could never fire.
		updated = wrapper.updated
		if updated and self.last_update == updated:
			return []
		self.last_update = updated

		idx = 0
		for item in wrapper:
			enclosures = []
			link = ""
			# Try to read title, continue if none found
			# (findtext may yield None, so guard before handing it to strip)
			title = strip(item.title or "")
			if not title:
				continue
			# Try to read id, continue if none found (invalid feed or internal error) or to be excluded
			id = item.id
			if not id or id in self.last_ids:
				continue
			# Link; may be absent in the feed, keep the empty default then
			link = item.link or ""

			# Read out enclosures
			for enclosure in item.enclosures:
				enclosures.append(ScanFile(enclosure["href"], mimetype = enclosure["type"], size = enclosure["length"], autodetect = False))
			# Try to read summary, empty if none
			summary = strip_readable(item.summary or "")
			# Update Lists
			self.history.insert(idx, (
				title.encode("UTF-8"),
				link.encode("UTF-8"),
				summary.encode("UTF-8"),
				enclosures
			))
			self.last_ids.add(id)
			# advance the insert position: keeps this batch in feed order and
			# makes history[:idx] actually contain the new items (without the
			# increment the method always returned an empty list)
			idx += 1

		# Eventually cut history
		del self.history[self.MAX_HISTORY_ELEMENTS:]

		return self.history[:idx]

	def gotFeed(self, feed):
		"""Pick the wrapper matching the parsed ElementTree root, read the
		channel title/description from it and parse the items.

		Raises NotImplementedError for unrecognized feed formats.
		"""
		if feed.tag == "rss":
			wrapper = RSS2Wrapper(feed)
		elif feed.tag.startswith(NS_RDF):
			wrapper = RSS1Wrapper(feed, ns = NS_RDF)
		elif feed.tag.startswith(NS_RSS_09):
			wrapper = RSS1Wrapper(feed, ns = NS_RSS_09)
		elif feed.tag.startswith(NS_RSS_10):
			wrapper = RSS1Wrapper(feed, ns = NS_RSS_10)
		elif feed.tag.endswith("feed"):
			wrapper = PEAWrapper(feed)
		else:
			# parenthesized form is valid on both Python 2 and 3
			raise NotImplementedError('Unsupported Feed: %s' % feed.tag)

		# Guard both against missing elements (findtext -> None), matching
		# the existing description handling
		self.title = strip(wrapper.title or "").encode("UTF-8")
		self.description = strip_readable(wrapper.description or "").encode("UTF-8")

		return self.gotWrapper(wrapper)