Index: third_party/logilab/common/umessage.py |
diff --git a/third_party/logilab/common/umessage.py b/third_party/logilab/common/umessage.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..85d564c0abd54b6165bad55687adc75b04f51108 |
--- /dev/null |
+++ b/third_party/logilab/common/umessage.py |
@@ -0,0 +1,167 @@ |
+# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
+# |
+# This file is part of logilab-common. |
+# |
+# logilab-common is free software: you can redistribute it and/or modify it under |
+# the terms of the GNU Lesser General Public License as published by the Free |
+# Software Foundation, either version 2.1 of the License, or (at your option) any |
+# later version. |
+# |
+# logilab-common is distributed in the hope that it will be useful, but WITHOUT |
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
+# details. |
+# |
+# You should have received a copy of the GNU Lesser General Public License along |
+# with logilab-common. If not, see <http://www.gnu.org/licenses/>. |
+"""Unicode email support (extends email from stdlib). |
+ |
+ |
+ |
+ |
+""" |
+__docformat__ = "restructuredtext en" |
+ |
+import email |
+from encodings import search_function |
+import sys |
+if sys.version_info >= (2, 5): |
+ from email.utils import parseaddr, parsedate |
+ from email.header import decode_header |
+else: |
+ from email.Utils import parseaddr, parsedate |
+ from email.Header import decode_header |
+ |
+from datetime import datetime |
+ |
+try: |
+ from mx.DateTime import DateTime |
+except ImportError: |
+ DateTime = datetime |
+ |
+import logilab.common as lgc |
+ |
+ |
+def decode_QP(string): |
+ parts = [] |
+ for decoded, charset in decode_header(string): |
+ if not charset : |
+ charset = 'iso-8859-15' |
+ parts.append(unicode(decoded, charset, 'replace')) |
+ |
+ return u' '.join(parts) |
+ |
+def message_from_file(fd): |
+ try: |
+ return UMessage(email.message_from_file(fd)) |
+ except email.Errors.MessageParseError: |
+ return '' |
+ |
+def message_from_string(string): |
+ try: |
+ return UMessage(email.message_from_string(string)) |
+ except email.Errors.MessageParseError: |
+ return '' |
+ |
+class UMessage: |
+ """Encapsulates an email.Message instance and returns only unicode objects. |
+ """ |
+ |
+ def __init__(self, message): |
+ self.message = message |
+ |
+ # email.Message interface ################################################# |
+ |
+ def get(self, header, default=None): |
+ value = self.message.get(header, default) |
+ if value: |
+ return decode_QP(value) |
+ return value |
+ |
+ def get_all(self, header, default=()): |
+ return [decode_QP(val) for val in self.message.get_all(header, default) |
+ if val is not None] |
+ |
+ def get_payload(self, index=None, decode=False): |
+ message = self.message |
+ if index is None: |
+ payload = message.get_payload(index, decode) |
+ if isinstance(payload, list): |
+ return [UMessage(msg) for msg in payload] |
+ if message.get_content_maintype() != 'text': |
+ return payload |
+ |
+ charset = message.get_content_charset() or 'iso-8859-1' |
+ if search_function(charset) is None: |
+ charset = 'iso-8859-1' |
+ return unicode(payload or '', charset, "replace") |
+ else: |
+ payload = UMessage(message.get_payload(index, decode)) |
+ return payload |
+ |
+ def is_multipart(self): |
+ return self.message.is_multipart() |
+ |
+ def get_boundary(self): |
+ return self.message.get_boundary() |
+ |
+ def walk(self): |
+ for part in self.message.walk(): |
+ yield UMessage(part) |
+ |
+ def get_content_maintype(self): |
+ return unicode(self.message.get_content_maintype()) |
+ |
+ def get_content_type(self): |
+ return unicode(self.message.get_content_type()) |
+ |
+ def get_filename(self, failobj=None): |
+ value = self.message.get_filename(failobj) |
+ if value is failobj: |
+ return value |
+ try: |
+ return unicode(value) |
+ except UnicodeDecodeError: |
+ return u'error decoding filename' |
+ |
+ # other convenience methods ############################################### |
+ |
+ def headers(self): |
+ """return an unicode string containing all the message's headers""" |
+ values = [] |
+ for header in self.message.keys(): |
+ values.append(u'%s: %s' % (header, self.get(header))) |
+ return '\n'.join(values) |
+ |
+ def multi_addrs(self, header): |
+ """return a list of 2-uple (name, address) for the given address (which |
+ is expected to be an header containing address such as from, to, cc...) |
+ """ |
+ persons = [] |
+ for person in self.get_all(header, ()): |
+ name, mail = parseaddr(person) |
+ persons.append((name, mail)) |
+ return persons |
+ |
+ def date(self, alternative_source=False, return_str=False): |
+ """return a datetime object for the email's date or None if no date is |
+ set or if it can't be parsed |
+ """ |
+ value = self.get('date') |
+ if value is None and alternative_source: |
+ unix_from = self.message.get_unixfrom() |
+ if unix_from is not None: |
+ try: |
+ value = unix_from.split(" ", 2)[2] |
+ except IndexError: |
+ pass |
+ if value is not None: |
+ datetuple = parsedate(value) |
+ if datetuple: |
+ if lgc.USE_MX_DATETIME: |
+ return DateTime(*datetuple[:6]) |
+ return datetime(*datetuple[:6]) |
+ elif not return_str: |
+ return None |
+ return value |