| Index: third_party/logilab/common/umessage.py
|
| diff --git a/third_party/logilab/common/umessage.py b/third_party/logilab/common/umessage.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..85d564c0abd54b6165bad55687adc75b04f51108
|
| --- /dev/null
|
| +++ b/third_party/logilab/common/umessage.py
|
| @@ -0,0 +1,167 @@
|
| +# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
|
| +# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
|
| +#
|
| +# This file is part of logilab-common.
|
| +#
|
| +# logilab-common is free software: you can redistribute it and/or modify it under
|
| +# the terms of the GNU Lesser General Public License as published by the Free
|
| +# Software Foundation, either version 2.1 of the License, or (at your option) any
|
| +# later version.
|
| +#
|
| +# logilab-common is distributed in the hope that it will be useful, but WITHOUT
|
| +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
| +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
|
| +# details.
|
| +#
|
| +# You should have received a copy of the GNU Lesser General Public License along
|
| +# with logilab-common. If not, see <http://www.gnu.org/licenses/>.
|
| +"""Unicode email support (extends email from stdlib).
|
| +
|
| +
|
| +
|
| +
|
| +"""
|
| +__docformat__ = "restructuredtext en"
|
| +
|
| +import email
|
| +from encodings import search_function
|
| +import sys
|
| +if sys.version_info >= (2, 5):
|
| + from email.utils import parseaddr, parsedate
|
| + from email.header import decode_header
|
| +else:
|
| + from email.Utils import parseaddr, parsedate
|
| + from email.Header import decode_header
|
| +
|
| +from datetime import datetime
|
| +
|
| +try:
|
| + from mx.DateTime import DateTime
|
| +except ImportError:
|
| + DateTime = datetime
|
| +
|
| +import logilab.common as lgc
|
| +
|
| +
|
def decode_QP(string):
    """Decode an RFC 2047 encoded header value into one unicode string.

    `string` is split with `decode_header`; every chunk is converted to
    unicode and the chunks are joined back together.

    * chunks without a charset are decoded as iso-8859-15;
    * chunks naming a charset unknown to the interpreter fall back to
      iso-8859-15 as well instead of raising LookupError;
    * chunks that `decode_header` already returned as unicode text are kept
      as they are;
    * undecodable bytes are replaced, never raised on.

    :param string: raw header value, possibly containing encoded words
    :return: the decoded header as a unicode string
    """
    fallback = 'iso-8859-15'
    text_type = type(u'')
    parts = []
    for decoded, charset in decode_header(string):
        if not isinstance(decoded, text_type):
            try:
                decoded = decoded.decode(charset or fallback, 'replace')
            except LookupError:
                # The 'replace' handler only covers undecodable bytes, not an
                # unknown codec name: retry with the default charset.
                decoded = decoded.decode(fallback, 'replace')
        parts.append(decoded)

    if sys.version_info < (3, 3):
        # Older decode_header implementations strip the whitespace that
        # separates chunks (http://bugs.python.org/issue1079), so it has to
        # be put back here.
        return u' '.join(parts)
    # Newer implementations keep the separating whitespace inside the chunks.
    return u''.join(parts)
|
| +
|
def message_from_file(fd):
    """Parse the email read from the file object `fd` into a `UMessage`.

    :param fd: open file-like object positioned at the start of a message
    :return: a `UMessage` wrapping the parsed message, or the empty string
      ``''`` when the parser raises `MessageParseError`
    """
    # Resolve the exception class under its current name first; only very
    # old interpreters still need the capitalized legacy module name.
    try:
        from email.errors import MessageParseError
    except ImportError:
        from email.Errors import MessageParseError
    try:
        return UMessage(email.message_from_file(fd))
    except MessageParseError:
        return ''
|
| +
|
def message_from_string(string):
    """Parse the email contained in `string` into a `UMessage`.

    :param string: the full text of a message (headers and body)
    :return: a `UMessage` wrapping the parsed message, or the empty string
      ``''`` when the parser raises `MessageParseError`
    """
    # Resolve the exception class under its current name first; only very
    # old interpreters still need the capitalized legacy module name.
    try:
        from email.errors import MessageParseError
    except ImportError:
        from email.Errors import MessageParseError
    try:
        return UMessage(email.message_from_string(string))
    except MessageParseError:
        return ''
|
| +
|
def _to_unicode(value, *decode_args):
    """Return `value` as unicode text.

    Byte strings are decoded with `decode_args` (charset and error policy,
    both optional). Values that are already unicode are returned untouched,
    since decoding them again would raise a TypeError.
    """
    if isinstance(value, type(u'')):
        return value
    return value.decode(*decode_args)


class UMessage:
    """Encapsulates an email.Message instance and returns only unicode objects.

    The wrapped message stays reachable through the `message` attribute.
    """

    def __init__(self, message):
        self.message = message

    # email.Message interface #################################################

    def get(self, header, default=None):
        """Return the decoded value of the first `header` field.

        Empty or missing values (including `default`) are returned as is,
        without going through the header decoder.
        """
        value = self.message.get(header, default)
        if value:
            return decode_QP(value)
        return value

    def get_all(self, header, default=()):
        """Return the list of decoded values of every `header` field.

        None values are skipped; `default` is what the underlying message
        falls back to when the header is absent.
        """
        return [decode_QP(val) for val in self.message.get_all(header, default)
                if val is not None]

    def get_payload(self, index=None, decode=False):
        """Return the payload of the message.

        * with an `index`, the matching sub-part wrapped in a `UMessage`;
        * for a list payload, a list of `UMessage`;
        * for a non-text payload, the raw payload of the wrapped message;
        * for a text payload, a unicode string decoded with the declared
          charset, falling back to iso-8859-1 when the charset is missing or
          unknown (undecodable bytes are replaced).

        `decode` is forwarded to the wrapped message's `get_payload`.
        """
        message = self.message
        if index is not None:
            return UMessage(message.get_payload(index, decode))
        payload = message.get_payload(index, decode)
        if isinstance(payload, list):
            return [UMessage(msg) for msg in payload]
        if message.get_content_maintype() != 'text':
            return payload

        charset = message.get_content_charset() or 'iso-8859-1'
        if search_function(charset) is None:
            # Declared charset has no codec: use the default instead.
            charset = 'iso-8859-1'
        return _to_unicode(payload or '', charset, "replace")

    def is_multipart(self):
        """Tell whether the wrapped message is multipart."""
        return self.message.is_multipart()

    def get_boundary(self):
        """Return the boundary of the wrapped message, unchanged."""
        return self.message.get_boundary()

    def walk(self):
        """Iterate over every part of the message, each one as a `UMessage`."""
        for part in self.message.walk():
            yield UMessage(part)

    def get_content_maintype(self):
        """Return the main content type of the message as unicode."""
        return _to_unicode(self.message.get_content_maintype())

    def get_content_type(self):
        """Return the content type of the message as unicode."""
        return _to_unicode(self.message.get_content_type())

    def get_filename(self, failobj=None):
        """Return the attachment file name as unicode.

        `failobj` is returned untouched when the wrapped message hands it
        back; a name that can't be decoded yields the placeholder
        ``u'error decoding filename'``.
        """
        value = self.message.get_filename(failobj)
        if value is failobj:
            return value
        try:
            return _to_unicode(value)
        except UnicodeDecodeError:
            return u'error decoding filename'

    # other convenience methods ###############################################

    def headers(self):
        """Return a unicode string containing all the message's headers.

        There is one ``name: value`` line per header field, in message
        order. Fields that occur several times are each rendered with their
        own value.
        """
        lines = []
        # items() pairs every field with its own value; looking values up by
        # name would repeat the first occurrence for duplicated fields.
        for name, value in self.message.items():
            if value:
                value = decode_QP(value)
            lines.append(u'%s: %s' % (name, value))
        return u'\n'.join(lines)

    def multi_addrs(self, header):
        """Return a list of 2-uple (name, address) for the given address
        header (from, to, cc...).

        One tuple is produced per occurrence of the header.
        """
        persons = []
        for person in self.get_all(header, ()):
            name, mail = parseaddr(person)
            persons.append((name, mail))
        return persons

    def date(self, alternative_source=False, return_str=False):
        """Return a datetime object for the email's date.

        :param alternative_source: when the message has no date header, use
          the trailing part of the unix "From " line instead
        :param return_str: when the date can't be turned into a datetime,
          return the raw date string instead of None
        :return: a datetime (or mx DateTime when `lgc.USE_MX_DATETIME` is
          set), None if no date is set, or None / the raw string (see
          `return_str`) if it can't be parsed
        """
        value = self.get('date')
        if value is None and alternative_source:
            unix_from = self.message.get_unixfrom()
            if unix_from is not None:
                try:
                    # Drop the first two space-separated fields and keep the
                    # remainder as the date.
                    value = unix_from.split(" ", 2)[2]
                except IndexError:
                    pass
        if value is None:
            return None
        datetuple = parsedate(value)
        if datetuple:
            if lgc.USE_MX_DATETIME:
                factory = DateTime
            else:
                factory = datetime
            try:
                return factory(*datetuple[:6])
            except ValueError:
                # Out-of-range fields (e.g. day 32): handled like a date
                # that could not be parsed.
                pass
        if return_str:
            return value
        return None
|
|
|