OLD | NEW |
(Empty) | |
| 1 # Copyright (c) 2012 Mitch Garnaat http://garnaat.org/ |
| 2 # Copyright (c) 2012 Amazon.com, Inc. or its affiliates. |
| 3 # All Rights Reserved |
| 4 # |
| 5 # Permission is hereby granted, free of charge, to any person obtaining a |
| 6 # copy of this software and associated documentation files (the |
| 7 # "Software"), to deal in the Software without restriction, including |
| 8 # without limitation the rights to use, copy, modify, merge, publish, dis- |
| 9 # tribute, sublicense, and/or sell copies of the Software, and to permit |
| 10 # persons to whom the Software is furnished to do so, subject to the fol- |
| 11 # lowing conditions: |
| 12 # |
| 13 # The above copyright notice and this permission notice shall be included |
| 14 # in all copies or substantial portions of the Software. |
| 15 # |
| 16 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
| 17 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- |
| 18 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
| 19 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
| 20 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 21 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
| 22 # IN THE SOFTWARE. |
| 23 # |
| 24 |
| 25 import boto.exception |
| 26 from boto.compat import json |
| 27 import requests |
| 28 import boto |
| 29 |
| 30 class SearchServiceException(Exception): |
| 31 pass |
| 32 |
| 33 |
| 34 class CommitMismatchError(Exception): |
| 35 pass |
| 36 |
| 37 |
| 38 class DocumentServiceConnection(object): |
| 39 |
| 40 def __init__(self, domain=None, endpoint=None): |
| 41 self.domain = domain |
| 42 self.endpoint = endpoint |
| 43 if not self.endpoint: |
| 44 self.endpoint = domain.doc_service_endpoint |
| 45 self.documents_batch = [] |
| 46 self._sdf = None |
| 47 |
| 48 def add(self, _id, version, fields, lang='en'): |
| 49 d = {'type': 'add', 'id': _id, 'version': version, 'lang': lang, |
| 50 'fields': fields} |
| 51 self.documents_batch.append(d) |
| 52 |
| 53 def delete(self, _id, version): |
| 54 d = {'type': 'delete', 'id': _id, 'version': version} |
| 55 self.documents_batch.append(d) |
| 56 |
| 57 def get_sdf(self): |
| 58 return self._sdf if self._sdf else json.dumps(self.documents_batch) |
| 59 |
| 60 def clear_sdf(self): |
| 61 self._sdf = None |
| 62 self.documents_batch = [] |
| 63 |
| 64 def add_sdf_from_s3(self, key_obj): |
| 65 """@todo (lucas) would be nice if this could just take an s3://uri...""" |
| 66 self._sdf = key_obj.get_contents_as_string() |
| 67 |
| 68 def commit(self): |
| 69 sdf = self.get_sdf() |
| 70 |
| 71 if ': null' in sdf: |
| 72 boto.log.error('null value in sdf detected. This will probably rais
e ' |
| 73 '500 error.') |
| 74 index = sdf.index(': null') |
| 75 boto.log.error(sdf[index - 100:index + 100]) |
| 76 |
| 77 url = "http://%s/2011-02-01/documents/batch" % (self.endpoint) |
| 78 |
| 79 request_config = { |
| 80 'pool_connections': 20, |
| 81 'keep_alive': True, |
| 82 'max_retries': 5, |
| 83 'pool_maxsize': 50 |
| 84 } |
| 85 |
| 86 r = requests.post(url, data=sdf, config=request_config, |
| 87 headers={'Content-Type': 'application/json'}) |
| 88 |
| 89 return CommitResponse(r, self, sdf) |
| 90 |
| 91 |
| 92 class CommitResponse(object): |
| 93 """Wrapper for response to Cloudsearch document batch commit. |
| 94 |
| 95 :type response: :class:`requests.models.Response` |
| 96 :param response: Response from Cloudsearch /documents/batch API |
| 97 |
| 98 :type doc_service: :class:`exfm.cloudsearch.DocumentServiceConnection` |
| 99 :param doc_service: Object containing the documents posted and methods to |
| 100 retry |
| 101 |
| 102 :raises: :class:`boto.exception.BotoServerError` |
| 103 :raises: :class:`exfm.cloudsearch.SearchServiceException` |
| 104 """ |
| 105 def __init__(self, response, doc_service, sdf): |
| 106 self.response = response |
| 107 self.doc_service = doc_service |
| 108 self.sdf = sdf |
| 109 |
| 110 try: |
| 111 self.content = json.loads(response.content) |
| 112 except: |
| 113 boto.log.error('Error indexing documents.\nResponse Content:\n{}\n\n
' |
| 114 'SDF:\n{}'.format(response.content, self.sdf)) |
| 115 raise boto.exception.BotoServerError(self.response.status_code, '', |
| 116 body=response.content) |
| 117 |
| 118 self.status = self.content['status'] |
| 119 if self.status == 'error': |
| 120 self.errors = [e.get('message') for e in self.content.get('errors', |
| 121 [])] |
| 122 else: |
| 123 self.errors = [] |
| 124 |
| 125 self.adds = self.content['adds'] |
| 126 self.deletes = self.content['deletes'] |
| 127 self._check_num_ops('add', self.adds) |
| 128 self._check_num_ops('delete', self.deletes) |
| 129 |
| 130 def _check_num_ops(self, type_, response_num): |
| 131 """Raise exception if number of ops in response doesn't match commit |
| 132 |
| 133 :type type_: str |
| 134 :param type_: Type of commit operation: 'add' or 'delete' |
| 135 |
| 136 :type response_num: int |
| 137 :param response_num: Number of adds or deletes in the response. |
| 138 |
| 139 :raises: :class:`exfm.cloudsearch.SearchServiceException` |
| 140 """ |
| 141 commit_num = len([d for d in self.doc_service.documents_batch |
| 142 if d['type'] == type_]) |
| 143 |
| 144 if response_num != commit_num: |
| 145 raise CommitMismatchError( |
| 146 'Incorrect number of {}s returned. Commit: {} Respose: {}'\ |
| 147 .format(type_, commit_num, response_num)) |
OLD | NEW |