| Index: third_party/gsutil/boto/boto/services/service.py
|
| diff --git a/third_party/gsutil/boto/boto/services/service.py b/third_party/gsutil/boto/boto/services/service.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..e0e987ce86913ff7ca1bbc12185169d844391f98
|
| --- /dev/null
|
| +++ b/third_party/gsutil/boto/boto/services/service.py
|
| @@ -0,0 +1,161 @@
|
| +# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/
|
| +#
|
| +# Permission is hereby granted, free of charge, to any person obtaining a
|
| +# copy of this software and associated documentation files (the
|
| +# "Software"), to deal in the Software without restriction, including
|
| +# without limitation the rights to use, copy, modify, merge, publish, dis-
|
| +# tribute, sublicense, and/or sell copies of the Software, and to permit
|
| +# persons to whom the Software is furnished to do so, subject to the fol-
|
| +# lowing conditions:
|
| +#
|
| +# The above copyright notice and this permission notice shall be included
|
| +# in all copies or substantial portions of the Software.
|
| +#
|
| +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
| +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
|
| +# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
|
| +# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
| +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
| +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
| +# IN THE SOFTWARE.
|
| +
|
| +import boto
|
| +from boto.services.message import ServiceMessage
|
| +from boto.services.servicedef import ServiceDef
|
| +from boto.pyami.scriptbase import ScriptBase
|
| +from boto.utils import get_ts
|
| +import time
|
| +import os
|
| +import mimetypes
|
| +
|
| +
|
| +class Service(ScriptBase):
|
| +
|
| + # Time required to process a transaction
|
| + ProcessingTime = 60
|
| +
|
| + def __init__(self, config_file=None, mimetype_files=None):
|
| + ScriptBase.__init__(self, config_file)
|
| + self.name = self.__class__.__name__
|
| + self.working_dir = boto.config.get('Pyami', 'working_dir')
|
| + self.sd = ServiceDef(config_file)
|
| + self.retry_count = self.sd.getint('retry_count', 5)
|
| + self.loop_delay = self.sd.getint('loop_delay', 30)
|
| + self.processing_time = self.sd.getint('processing_time', 60)
|
| + self.input_queue = self.sd.get_obj('input_queue')
|
| + self.output_queue = self.sd.get_obj('output_queue')
|
| + self.output_domain = self.sd.get_obj('output_domain')
|
| + if mimetype_files:
|
| + mimetypes.init(mimetype_files)
|
| +
|
| + def split_key(key):
|
| + if key.find(';') < 0:
|
| + t = (key, '')
|
| + else:
|
| + key, type = key.split(';')
|
| + label, mtype = type.split('=')
|
| + t = (key, mtype)
|
| + return t
|
| +
|
| + def read_message(self):
|
| + boto.log.info('read_message')
|
| + message = self.input_queue.read(self.processing_time)
|
| + if message:
|
| + boto.log.info(message.get_body())
|
| + key = 'Service-Read'
|
| + message[key] = get_ts()
|
| + return message
|
| +
|
| + # retrieve the source file from S3
|
| + def get_file(self, message):
|
| + bucket_name = message['Bucket']
|
| + key_name = message['InputKey']
|
| + file_name = os.path.join(self.working_dir, message.get('OriginalFileName', 'in_file'))
|
| + boto.log.info('get_file: %s/%s to %s' % (bucket_name, key_name, file_name))
|
| + bucket = boto.lookup('s3', bucket_name)
|
| + key = bucket.new_key(key_name)
|
| + key.get_contents_to_filename(os.path.join(self.working_dir, file_name))
|
| + return file_name
|
| +
|
| + # process source file, return list of output files
|
| + def process_file(self, in_file_name, msg):
|
| + return []
|
| +
|
| + # store result file in S3
|
| + def put_file(self, bucket_name, file_path, key_name=None):
|
| + boto.log.info('putting file %s as %s.%s' % (file_path, bucket_name, key_name))
|
| + bucket = boto.lookup('s3', bucket_name)
|
| + key = bucket.new_key(key_name)
|
| + key.set_contents_from_filename(file_path)
|
| + return key
|
| +
|
| + def save_results(self, results, input_message, output_message):
|
| + output_keys = []
|
| + for file, type in results:
|
| + if 'OutputBucket' in input_message:
|
| + output_bucket = input_message['OutputBucket']
|
| + else:
|
| + output_bucket = input_message['Bucket']
|
| + key_name = os.path.split(file)[1]
|
| + key = self.put_file(output_bucket, file, key_name)
|
| + output_keys.append('%s;type=%s' % (key.name, type))
|
| + output_message['OutputKey'] = ','.join(output_keys)
|
| +
|
| + # write message to each output queue
|
| + def write_message(self, message):
|
| + message['Service-Write'] = get_ts()
|
| + message['Server'] = self.name
|
| + if 'HOSTNAME' in os.environ:
|
| + message['Host'] = os.environ['HOSTNAME']
|
| + else:
|
| + message['Host'] = 'unknown'
|
| + message['Instance-ID'] = self.instance_id
|
| + if self.output_queue:
|
| + boto.log.info('Writing message to SQS queue: %s' % self.output_queue.id)
|
| + self.output_queue.write(message)
|
| + if self.output_domain:
|
| + boto.log.info('Writing message to SDB domain: %s' % self.output_domain.name)
|
| + item_name = '/'.join([message['Service-Write'], message['Bucket'], message['InputKey']])
|
| + self.output_domain.put_attributes(item_name, message)
|
| +
|
| + # delete message from input queue
|
| + def delete_message(self, message):
|
| + boto.log.info('deleting message from %s' % self.input_queue.id)
|
| + self.input_queue.delete_message(message)
|
| +
|
| + # to clean up any files, etc. after each iteration
|
| + def cleanup(self):
|
| + pass
|
| +
|
| + def shutdown(self):
|
| + on_completion = self.sd.get('on_completion', 'shutdown')
|
| + if on_completion == 'shutdown':
|
| + if self.instance_id:
|
| + time.sleep(60)
|
| + c = boto.connect_ec2()
|
| + c.terminate_instances([self.instance_id])
|
| +
|
| + def main(self, notify=False):
|
| + self.notify('Service: %s Starting' % self.name)
|
| + empty_reads = 0
|
| + while self.retry_count < 0 or empty_reads < self.retry_count:
|
| + try:
|
| + input_message = self.read_message()
|
| + if input_message:
|
| + empty_reads = 0
|
| + output_message = ServiceMessage(None, input_message.get_body())
|
| + input_file = self.get_file(input_message)
|
| + results = self.process_file(input_file, output_message)
|
| + self.save_results(results, input_message, output_message)
|
| + self.write_message(output_message)
|
| + self.delete_message(input_message)
|
| + self.cleanup()
|
| + else:
|
| + empty_reads += 1
|
| + time.sleep(self.loop_delay)
|
| + except Exception:
|
| + boto.log.exception('Service Failed')
|
| + empty_reads += 1
|
| + self.notify('Service: %s Shutting Down' % self.name)
|
| + self.shutdown()
|
| +
|
|
|