OLD | NEW |
(Empty) | |
| 1 # Copyright (c) 2012 Mitch Garnaat http://garnaat.org/ |
| 2 # Copyright (c) 2012 Amazon.com, Inc. or its affiliates. |
| 3 # All Rights Reserved |
| 4 # |
| 5 # Permission is hereby granted, free of charge, to any person obtaining a |
| 6 # copy of this software and associated documentation files (the |
| 7 # "Software"), to deal in the Software without restriction, including |
| 8 # without limitation the rights to use, copy, modify, merge, publish, dis- |
| 9 # tribute, sublicense, and/or sell copies of the Software, and to permit |
| 10 # persons to whom the Software is furnished to do so, subject to the fol- |
| 11 # lowing conditions: |
| 12 # |
| 13 # The above copyright notice and this permission notice shall be included |
| 14 # in all copies or substantial portions of the Software. |
| 15 # |
| 16 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
| 17 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- |
| 18 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
| 19 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
| 20 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 21 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
| 22 # IN THE SOFTWARE. |
| 23 # |
| 24 |
| 25 import boto.exception |
| 26 from boto.compat import json |
| 27 import requests |
| 28 import boto |
| 29 |
| 30 class SearchServiceException(Exception): |
| 31 pass |
| 32 |
| 33 |
| 34 class CommitMismatchError(Exception): |
| 35 pass |
| 36 |
| 37 class EncodingError(Exception): |
| 38 """ |
| 39 Content sent for Cloud Search indexing was incorrectly encoded. |
| 40 |
| 41 This usually happens when a document is marked as unicode but non-unicode |
| 42 characters are present. |
| 43 """ |
| 44 pass |
| 45 |
| 46 class ContentTooLongError(Exception): |
| 47 """ |
| 48 Content sent for Cloud Search indexing was too long |
| 49 |
| 50 This will usually happen when documents queued for indexing add up to more |
| 51 than the limit allowed per upload batch (5MB) |
| 52 |
| 53 """ |
| 54 pass |
| 55 |
| 56 class DocumentServiceConnection(object): |
| 57 """ |
| 58 A CloudSearch document service. |
| 59 |
| 60 The DocumentServiceConection is used to add, remove and update documents in |
| 61 CloudSearch. Commands are uploaded to CloudSearch in SDF (Search Document Fo
rmat). |
| 62 |
| 63 To generate an appropriate SDF, use :func:`add` to add or update documents, |
| 64 as well as :func:`delete` to remove documents. |
| 65 |
| 66 Once the set of documents is ready to be index, use :func:`commit` to send t
he |
| 67 commands to CloudSearch. |
| 68 |
| 69 If there are a lot of documents to index, it may be preferable to split the |
| 70 generation of SDF data and the actual uploading into CloudSearch. Retrieve |
| 71 the current SDF with :func:`get_sdf`. If this file is the uploaded into S3, |
| 72 it can be retrieved back afterwards for upload into CloudSearch using |
| 73 :func:`add_sdf_from_s3`. |
| 74 |
| 75 The SDF is not cleared after a :func:`commit`. If you wish to continue |
| 76 using the DocumentServiceConnection for another batch upload of commands, |
| 77 you will need to :func:`clear_sdf` first to stop the previous batch of |
| 78 commands from being uploaded again. |
| 79 |
| 80 """ |
| 81 |
| 82 def __init__(self, domain=None, endpoint=None): |
| 83 self.domain = domain |
| 84 self.endpoint = endpoint |
| 85 if not self.endpoint: |
| 86 self.endpoint = domain.doc_service_endpoint |
| 87 self.documents_batch = [] |
| 88 self._sdf = None |
| 89 |
| 90 def add(self, _id, version, fields, lang='en'): |
| 91 """ |
| 92 Add a document to be processed by the DocumentService |
| 93 |
| 94 The document will not actually be added until :func:`commit` is called |
| 95 |
| 96 :type _id: string |
| 97 :param _id: A unique ID used to refer to this document. |
| 98 |
| 99 :type version: int |
| 100 :param version: Version of the document being indexed. If a file is |
| 101 being reindexed, the version should be higher than the existing one |
| 102 in CloudSearch. |
| 103 |
| 104 :type fields: dict |
| 105 :param fields: A dictionary of key-value pairs to be uploaded . |
| 106 |
| 107 :type lang: string |
| 108 :param lang: The language code the data is in. Only 'en' is currently |
| 109 supported |
| 110 """ |
| 111 |
| 112 d = {'type': 'add', 'id': _id, 'version': version, 'lang': lang, |
| 113 'fields': fields} |
| 114 self.documents_batch.append(d) |
| 115 |
| 116 def delete(self, _id, version): |
| 117 """ |
| 118 Schedule a document to be removed from the CloudSearch service |
| 119 |
| 120 The document will not actually be scheduled for removal until :func:`com
mit` is called |
| 121 |
| 122 :type _id: string |
| 123 :param _id: The unique ID of this document. |
| 124 |
| 125 :type version: int |
| 126 :param version: Version of the document to remove. The delete will only |
| 127 occur if this version number is higher than the version currently |
| 128 in the index. |
| 129 """ |
| 130 |
| 131 d = {'type': 'delete', 'id': _id, 'version': version} |
| 132 self.documents_batch.append(d) |
| 133 |
| 134 def get_sdf(self): |
| 135 """ |
| 136 Generate the working set of documents in Search Data Format (SDF) |
| 137 |
| 138 :rtype: string |
| 139 :returns: JSON-formatted string of the documents in SDF |
| 140 """ |
| 141 |
| 142 return self._sdf if self._sdf else json.dumps(self.documents_batch) |
| 143 |
| 144 def clear_sdf(self): |
| 145 """ |
| 146 Clear the working documents from this DocumentServiceConnection |
| 147 |
| 148 This should be used after :func:`commit` if the connection will be reuse
d |
| 149 for another set of documents. |
| 150 """ |
| 151 |
| 152 self._sdf = None |
| 153 self.documents_batch = [] |
| 154 |
| 155 def add_sdf_from_s3(self, key_obj): |
| 156 """ |
| 157 Load an SDF from S3 |
| 158 |
| 159 Using this method will result in documents added through |
| 160 :func:`add` and :func:`delete` being ignored. |
| 161 |
| 162 :type key_obj: :class:`boto.s3.key.Key` |
| 163 :param key_obj: An S3 key which contains an SDF |
| 164 """ |
| 165 #@todo:: (lucas) would be nice if this could just take an s3://uri..." |
| 166 |
| 167 self._sdf = key_obj.get_contents_as_string() |
| 168 |
| 169 def commit(self): |
| 170 """ |
| 171 Actually send an SDF to CloudSearch for processing |
| 172 |
| 173 If an SDF file has been explicitly loaded it will be used. Otherwise, |
| 174 documents added through :func:`add` and :func:`delete` will be used. |
| 175 |
| 176 :rtype: :class:`CommitResponse` |
| 177 :returns: A summary of documents added and deleted |
| 178 """ |
| 179 |
| 180 sdf = self.get_sdf() |
| 181 |
| 182 if ': null' in sdf: |
| 183 boto.log.error('null value in sdf detected. This will probably rais
e ' |
| 184 '500 error.') |
| 185 index = sdf.index(': null') |
| 186 boto.log.error(sdf[index - 100:index + 100]) |
| 187 |
| 188 url = "http://%s/2011-02-01/documents/batch" % (self.endpoint) |
| 189 |
| 190 request_config = { |
| 191 'pool_connections': 20, |
| 192 'keep_alive': True, |
| 193 'max_retries': 5, |
| 194 'pool_maxsize': 50 |
| 195 } |
| 196 |
| 197 r = requests.post(url, data=sdf, config=request_config, |
| 198 headers={'Content-Type': 'application/json'}) |
| 199 |
| 200 return CommitResponse(r, self, sdf) |
| 201 |
| 202 |
| 203 class CommitResponse(object): |
| 204 """Wrapper for response to Cloudsearch document batch commit. |
| 205 |
| 206 :type response: :class:`requests.models.Response` |
| 207 :param response: Response from Cloudsearch /documents/batch API |
| 208 |
| 209 :type doc_service: :class:`boto.cloudsearch.document.DocumentServiceConnecti
on` |
| 210 :param doc_service: Object containing the documents posted and methods to |
| 211 retry |
| 212 |
| 213 :raises: :class:`boto.exception.BotoServerError` |
| 214 :raises: :class:`boto.cloudsearch.document.SearchServiceException` |
| 215 :raises: :class:`boto.cloudsearch.document.EncodingError` |
| 216 :raises: :class:`boto.cloudsearch.document.ContentTooLongError` |
| 217 """ |
| 218 def __init__(self, response, doc_service, sdf): |
| 219 self.response = response |
| 220 self.doc_service = doc_service |
| 221 self.sdf = sdf |
| 222 |
| 223 try: |
| 224 self.content = json.loads(response.content) |
| 225 except: |
| 226 boto.log.error('Error indexing documents.\nResponse Content:\n{0}\n\
n' |
| 227 'SDF:\n{1}'.format(response.content, self.sdf)) |
| 228 raise boto.exception.BotoServerError(self.response.status_code, '', |
| 229 body=response.content) |
| 230 |
| 231 self.status = self.content['status'] |
| 232 if self.status == 'error': |
| 233 self.errors = [e.get('message') for e in self.content.get('errors', |
| 234 [])] |
| 235 for e in self.errors: |
| 236 if "Illegal Unicode character" in e: |
| 237 raise EncodingError("Illegal Unicode character in document") |
| 238 elif e == "The Content-Length is too long": |
| 239 raise ContentTooLongError("Content was too long") |
| 240 else: |
| 241 self.errors = [] |
| 242 |
| 243 self.adds = self.content['adds'] |
| 244 self.deletes = self.content['deletes'] |
| 245 self._check_num_ops('add', self.adds) |
| 246 self._check_num_ops('delete', self.deletes) |
| 247 |
| 248 def _check_num_ops(self, type_, response_num): |
| 249 """Raise exception if number of ops in response doesn't match commit |
| 250 |
| 251 :type type_: str |
| 252 :param type_: Type of commit operation: 'add' or 'delete' |
| 253 |
| 254 :type response_num: int |
| 255 :param response_num: Number of adds or deletes in the response. |
| 256 |
| 257 :raises: :class:`boto.cloudsearch.document.CommitMismatchError` |
| 258 """ |
| 259 commit_num = len([d for d in self.doc_service.documents_batch |
| 260 if d['type'] == type_]) |
| 261 |
| 262 if response_num != commit_num: |
| 263 raise CommitMismatchError( |
| 264 'Incorrect number of {0}s returned. Commit: {1} Response: {2}'\ |
| 265 .format(type_, commit_num, response_num)) |
OLD | NEW |