Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(150)

Unified Diff: third_party/boto/boto/services/service.py

Issue 12755026: Added gsutil/boto to depot_tools/third_party (Closed) Base URL: https://chromium.googlesource.com/chromium/tools/depot_tools.git@master
Patch Set: Added readme Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « third_party/boto/boto/services/result.py ('k') | third_party/boto/boto/services/servicedef.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: third_party/boto/boto/services/service.py
diff --git a/third_party/boto/boto/services/service.py b/third_party/boto/boto/services/service.py
new file mode 100644
index 0000000000000000000000000000000000000000..e0e987ce86913ff7ca1bbc12185169d844391f98
--- /dev/null
+++ b/third_party/boto/boto/services/service.py
@@ -0,0 +1,161 @@
+# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish, dis-
+# tribute, sublicense, and/or sell copies of the Software, and to permit
+# persons to whom the Software is furnished to do so, subject to the fol-
+# lowing conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
+# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+
+import boto
+from boto.services.message import ServiceMessage
+from boto.services.servicedef import ServiceDef
+from boto.pyami.scriptbase import ScriptBase
+from boto.utils import get_ts
+import time
+import os
+import mimetypes
+
+
+class Service(ScriptBase):
+
+ # Time required to process a transaction
+ ProcessingTime = 60
+
+ def __init__(self, config_file=None, mimetype_files=None):
+ ScriptBase.__init__(self, config_file)
+ self.name = self.__class__.__name__
+ self.working_dir = boto.config.get('Pyami', 'working_dir')
+ self.sd = ServiceDef(config_file)
+ self.retry_count = self.sd.getint('retry_count', 5)
+ self.loop_delay = self.sd.getint('loop_delay', 30)
+ self.processing_time = self.sd.getint('processing_time', 60)
+ self.input_queue = self.sd.get_obj('input_queue')
+ self.output_queue = self.sd.get_obj('output_queue')
+ self.output_domain = self.sd.get_obj('output_domain')
+ if mimetype_files:
+ mimetypes.init(mimetype_files)
+
+ def split_key(key):
+ if key.find(';') < 0:
+ t = (key, '')
+ else:
+ key, type = key.split(';')
+ label, mtype = type.split('=')
+ t = (key, mtype)
+ return t
+
+ def read_message(self):
+ boto.log.info('read_message')
+ message = self.input_queue.read(self.processing_time)
+ if message:
+ boto.log.info(message.get_body())
+ key = 'Service-Read'
+ message[key] = get_ts()
+ return message
+
+ # retrieve the source file from S3
+ def get_file(self, message):
+ bucket_name = message['Bucket']
+ key_name = message['InputKey']
+ file_name = os.path.join(self.working_dir, message.get('OriginalFileName', 'in_file'))
+ boto.log.info('get_file: %s/%s to %s' % (bucket_name, key_name, file_name))
+ bucket = boto.lookup('s3', bucket_name)
+ key = bucket.new_key(key_name)
+ key.get_contents_to_filename(os.path.join(self.working_dir, file_name))
+ return file_name
+
+ # process source file, return list of output files
+ def process_file(self, in_file_name, msg):
+ return []
+
+ # store result file in S3
+ def put_file(self, bucket_name, file_path, key_name=None):
+ boto.log.info('putting file %s as %s.%s' % (file_path, bucket_name, key_name))
+ bucket = boto.lookup('s3', bucket_name)
+ key = bucket.new_key(key_name)
+ key.set_contents_from_filename(file_path)
+ return key
+
+ def save_results(self, results, input_message, output_message):
+ output_keys = []
+ for file, type in results:
+ if 'OutputBucket' in input_message:
+ output_bucket = input_message['OutputBucket']
+ else:
+ output_bucket = input_message['Bucket']
+ key_name = os.path.split(file)[1]
+ key = self.put_file(output_bucket, file, key_name)
+ output_keys.append('%s;type=%s' % (key.name, type))
+ output_message['OutputKey'] = ','.join(output_keys)
+
+ # write message to each output queue
+ def write_message(self, message):
+ message['Service-Write'] = get_ts()
+ message['Server'] = self.name
+ if 'HOSTNAME' in os.environ:
+ message['Host'] = os.environ['HOSTNAME']
+ else:
+ message['Host'] = 'unknown'
+ message['Instance-ID'] = self.instance_id
+ if self.output_queue:
+ boto.log.info('Writing message to SQS queue: %s' % self.output_queue.id)
+ self.output_queue.write(message)
+ if self.output_domain:
+ boto.log.info('Writing message to SDB domain: %s' % self.output_domain.name)
+ item_name = '/'.join([message['Service-Write'], message['Bucket'], message['InputKey']])
+ self.output_domain.put_attributes(item_name, message)
+
+ # delete message from input queue
+ def delete_message(self, message):
+ boto.log.info('deleting message from %s' % self.input_queue.id)
+ self.input_queue.delete_message(message)
+
+ # to clean up any files, etc. after each iteration
+ def cleanup(self):
+ pass
+
+ def shutdown(self):
+ on_completion = self.sd.get('on_completion', 'shutdown')
+ if on_completion == 'shutdown':
+ if self.instance_id:
+ time.sleep(60)
+ c = boto.connect_ec2()
+ c.terminate_instances([self.instance_id])
+
+ def main(self, notify=False):
+ self.notify('Service: %s Starting' % self.name)
+ empty_reads = 0
+ while self.retry_count < 0 or empty_reads < self.retry_count:
+ try:
+ input_message = self.read_message()
+ if input_message:
+ empty_reads = 0
+ output_message = ServiceMessage(None, input_message.get_body())
+ input_file = self.get_file(input_message)
+ results = self.process_file(input_file, output_message)
+ self.save_results(results, input_message, output_message)
+ self.write_message(output_message)
+ self.delete_message(input_message)
+ self.cleanup()
+ else:
+ empty_reads += 1
+ time.sleep(self.loop_delay)
+ except Exception:
+ boto.log.exception('Service Failed')
+ empty_reads += 1
+ self.notify('Service: %s Shutting Down' % self.name)
+ self.shutdown()
+
« no previous file with comments | « third_party/boto/boto/services/result.py ('k') | third_party/boto/boto/services/servicedef.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698