Chromium Code Reviews

Side by Side Diff: third_party/gsutil/boto/emr/connection.py

Issue 12042069: Scripts to download files from google storage based on sha1 sums (Closed) Base URL: https://chromium.googlesource.com/chromium/tools/depot_tools.git@master
Patch Set: Removed gsutil/tests and gsutil/docs Created 7 years, 10 months ago
1 # Copyright (c) 2010 Spotify AB
2 # Copyright (c) 2010-2011 Yelp
3 #
4 # Permission is hereby granted, free of charge, to any person obtaining a
5 # copy of this software and associated documentation files (the
6 # "Software"), to deal in the Software without restriction, including
7 # without limitation the rights to use, copy, modify, merge, publish, dis-
8 # tribute, sublicense, and/or sell copies of the Software, and to permit
9 # persons to whom the Software is furnished to do so, subject to the fol-
10 # lowing conditions:
11 #
12 # The above copyright notice and this permission notice shall be included
13 # in all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21 # IN THE SOFTWARE.
22
23 """
24 Represents a connection to the EMR service
25 """
26 import types
27
28 import boto
29 import boto.utils
30 from boto.ec2.regioninfo import RegionInfo
31 from boto.emr.emrobject import JobFlow, RunJobFlowResponse
32 from boto.emr.emrobject import AddInstanceGroupsResponse
33 from boto.emr.emrobject import ModifyInstanceGroupsResponse
34 from boto.emr.step import JarStep
35 from boto.connection import AWSQueryConnection
36 from boto.exception import EmrResponseError
37
38
39 class EmrConnection(AWSQueryConnection):
40
41 APIVersion = boto.config.get('Boto', 'emr_version', '2009-03-31')
42 DefaultRegionName = boto.config.get('Boto', 'emr_region_name', 'us-east-1')
43 DefaultRegionEndpoint = boto.config.get('Boto', 'emr_region_endpoint',
44                                             'elasticmapreduce.us-east-1.amazonaws.com')
45 ResponseError = EmrResponseError
46
47 # Constants for AWS Console debugging
48     DebuggingJar = 's3n://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'
49     DebuggingArgs = 's3n://us-east-1.elasticmapreduce/libs/state-pusher/0.1/fetch'
50
51 def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
52 is_secure=True, port=None, proxy=None, proxy_port=None,
53 proxy_user=None, proxy_pass=None, debug=0,
54 https_connection_factory=None, region=None, path='/',
55 security_token=None, validate_certs=True):
56 if not region:
57 region = RegionInfo(self, self.DefaultRegionName,
58 self.DefaultRegionEndpoint)
59 self.region = region
60 AWSQueryConnection.__init__(self, aws_access_key_id,
61 aws_secret_access_key,
62 is_secure, port, proxy, proxy_port,
63 proxy_user, proxy_pass,
64 self.region.endpoint, debug,
65 https_connection_factory, path,
66 security_token,
67 validate_certs=validate_certs)
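A minimal construction sketch for orientation (the credentials and the non-default region below are placeholders; with no arguments the connection falls back to boto config and the us-east-1 defaults above):

from boto.ec2.regioninfo import RegionInfo
from boto.emr.connection import EmrConnection

# Point the connection at an explicit region; the endpoint shown is illustrative.
region = RegionInfo(name='us-west-2',
                    endpoint='elasticmapreduce.us-west-2.amazonaws.com')
conn = EmrConnection(aws_access_key_id='AKIA...EXAMPLE',
                     aws_secret_access_key='...EXAMPLE...',
                     region=region)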
68
69 def _required_auth_capability(self):
70 return ['emr']
71
72 def describe_jobflow(self, jobflow_id):
73 """
74 Describes a single Elastic MapReduce job flow
75
76 :type jobflow_id: str
77 :param jobflow_id: The job flow id of interest
78 """
79 jobflows = self.describe_jobflows(jobflow_ids=[jobflow_id])
80 if jobflows:
81 return jobflows[0]
82
83 def describe_jobflows(self, states=None, jobflow_ids=None,
84 created_after=None, created_before=None):
85 """
86 Retrieve all the Elastic MapReduce job flows on your account
87
88 :type states: list
89 :param states: A list of strings with job flow states wanted
90
91 :type jobflow_ids: list
92 :param jobflow_ids: A list of job flow IDs
93 :type created_after: datetime
94 :param created_after: Bound on job flow creation time
95
96 :type created_before: datetime
97 :param created_before: Bound on job flow creation time
98 """
99 params = {}
100
101 if states:
102 self.build_list_params(params, states, 'JobFlowStates.member')
103 if jobflow_ids:
104 self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
105 if created_after:
106 params['CreatedAfter'] = created_after.strftime(
107 boto.utils.ISO8601)
108 if created_before:
109 params['CreatedBefore'] = created_before.strftime(
110 boto.utils.ISO8601)
111
112 return self.get_list('DescribeJobFlows', params, [('member', JobFlow)])
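A usage sketch for the filters above (the state names and date bound are illustrative; the attribute names on the returned JobFlow objects are assumed to follow the lower-cased response fields, as with response.jobflowid in run_jobflow below):

from datetime import datetime, timedelta
from boto.emr.connection import EmrConnection

conn = EmrConnection()  # credentials come from the environment or boto config
recent = conn.describe_jobflows(states=['RUNNING', 'WAITING'],
                                created_after=datetime.utcnow() - timedelta(days=7))
for flow in recent:
    print flow.jobflowid, flow.state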
113
114 def terminate_jobflow(self, jobflow_id):
115 """
116 Terminate an Elastic MapReduce job flow
117
118 :type jobflow_id: str
119 :param jobflow_id: A jobflow id
120 """
121 self.terminate_jobflows([jobflow_id])
122
123 def terminate_jobflows(self, jobflow_ids):
124 """
125         Terminate the specified Elastic MapReduce job flows
126
127 :type jobflow_ids: list
128 :param jobflow_ids: A list of job flow IDs
129 """
130 params = {}
131 self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
132 return self.get_status('TerminateJobFlows', params, verb='POST')
133
134 def add_jobflow_steps(self, jobflow_id, steps):
135 """
136 Adds steps to a jobflow
137
138 :type jobflow_id: str
139 :param jobflow_id: The job flow id
140 :type steps: list(boto.emr.Step)
141 :param steps: A list of steps to add to the job
142 """
143 if not isinstance(steps, types.ListType):
144 steps = [steps]
145 params = {}
146 params['JobFlowId'] = jobflow_id
147
148 # Step args
149 step_args = [self._build_step_args(step) for step in steps]
150 params.update(self._build_step_list(step_args))
151
152 return self.get_object(
153 'AddJobFlowSteps', params, RunJobFlowResponse, verb='POST')
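A usage sketch (the jar location, step name, arguments and job flow id are placeholders):

from boto.emr.connection import EmrConnection
from boto.emr.step import JarStep

conn = EmrConnection()  # credentials come from the environment or boto config
wordcount = JarStep(name='Example wordcount',
                    jar='s3n://example-bucket/jars/wordcount.jar',
                    step_args=['s3n://example-bucket/input',
                               's3n://example-bucket/output'])
conn.add_jobflow_steps('j-EXAMPLEID', [wordcount])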
154
155 def add_instance_groups(self, jobflow_id, instance_groups):
156 """
157 Adds instance groups to a running cluster.
158
159 :type jobflow_id: str
160 :param jobflow_id: The id of the jobflow which will take the
161 new instance groups
162
163 :type instance_groups: list(boto.emr.InstanceGroup)
164 :param instance_groups: A list of instance groups to add to the job
165 """
166 if not isinstance(instance_groups, types.ListType):
167 instance_groups = [instance_groups]
168 params = {}
169 params['JobFlowId'] = jobflow_id
170 params.update(self._build_instance_group_list_args(instance_groups))
171
172 return self.get_object('AddInstanceGroups', params,
173 AddInstanceGroupsResponse, verb='POST')
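A usage sketch (sizes, instance types, names and the bid price are placeholders; the InstanceGroup constructor arguments are assumed to mirror the attributes read by _build_instance_group_args() below):

from boto.emr.connection import EmrConnection
from boto.emr.instance_group import InstanceGroup

conn = EmrConnection()
task_nodes = InstanceGroup(num_instances=2, role='TASK', type='m1.small',
                           market='SPOT', name='Task nodes', bidprice='0.05')
conn.add_instance_groups('j-EXAMPLEID', [task_nodes])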
174
175 def modify_instance_groups(self, instance_group_ids, new_sizes):
176 """
177 Modify the number of nodes and configuration settings in an
178 instance group.
179
180 :type instance_group_ids: list(str)
181         :param instance_group_ids: A list of the IDs of the instance
182 groups to be modified
183
184 :type new_sizes: list(int)
185 :param new_sizes: A list of the new sizes for each instance group
186 """
187 if not isinstance(instance_group_ids, types.ListType):
188 instance_group_ids = [instance_group_ids]
189 if not isinstance(new_sizes, types.ListType):
190 new_sizes = [new_sizes]
191
192 instance_groups = zip(instance_group_ids, new_sizes)
193
194 params = {}
195 for k, ig in enumerate(instance_groups):
196 # could be wrong - the example amazon gives uses
197 # InstanceRequestCount, while the api documentation
198 # says InstanceCount
199 params['InstanceGroups.member.%d.InstanceGroupId' % (k+1) ] = ig[0]
200 params['InstanceGroups.member.%d.InstanceCount' % (k+1) ] = ig[1]
201
202 return self.get_object('ModifyInstanceGroups', params,
203 ModifyInstanceGroupsResponse, verb='POST')
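A usage sketch (the instance group id and the new size are placeholders):

from boto.emr.connection import EmrConnection

conn = EmrConnection()
# Resize one instance group to five nodes.
conn.modify_instance_groups(['ig-EXAMPLEID'], [5])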
204
205 def run_jobflow(self, name, log_uri=None, ec2_keyname=None,
206 availability_zone=None,
207 master_instance_type='m1.small',
208 slave_instance_type='m1.small', num_instances=1,
209 action_on_failure='TERMINATE_JOB_FLOW', keep_alive=False,
210 enable_debugging=False,
211 hadoop_version=None,
212 steps=[],
213 bootstrap_actions=[],
214 instance_groups=None,
215 additional_info=None,
216 ami_version=None,
217 api_params=None):
218 """
219 Runs a job flow
220 :type name: str
221 :param name: Name of the job flow
222
223 :type log_uri: str
224 :param log_uri: URI of the S3 bucket to place logs
225
226 :type ec2_keyname: str
227 :param ec2_keyname: EC2 key used for the instances
228
229 :type availability_zone: str
230 :param availability_zone: EC2 availability zone of the cluster
231
232 :type master_instance_type: str
233 :param master_instance_type: EC2 instance type of the master
234
235 :type slave_instance_type: str
236 :param slave_instance_type: EC2 instance type of the slave nodes
237
238 :type num_instances: int
239 :param num_instances: Number of instances in the Hadoop cluster
240
241 :type action_on_failure: str
242 :param action_on_failure: Action to take if a step terminates
243
244 :type keep_alive: bool
245 :param keep_alive: Denotes whether the cluster should stay
246 alive upon completion
247
248 :type enable_debugging: bool
249 :param enable_debugging: Denotes whether AWS console debugging
250 should be enabled.
251
252 :type hadoop_version: str
253 :param hadoop_version: Version of Hadoop to use. This no longer
254 defaults to '0.20' and now uses the AMI default.
255
256 :type steps: list(boto.emr.Step)
257 :param steps: List of steps to add with the job
258
259 :type bootstrap_actions: list(boto.emr.BootstrapAction)
260 :param bootstrap_actions: List of bootstrap actions that run
261 before Hadoop starts.
262
263 :type instance_groups: list(boto.emr.InstanceGroup)
264 :param instance_groups: Optional list of instance groups to
265 use when creating this job.
266 NB: When provided, this argument supersedes num_instances
267 and master/slave_instance_type.
268
269 :type ami_version: str
270 :param ami_version: Amazon Machine Image (AMI) version to use
271 for instances. Values accepted by EMR are '1.0', '2.0', and
272 'latest'; EMR currently defaults to '1.0' if you don't set
273 'ami_version'.
274
275 :type additional_info: JSON str
276 :param additional_info: A JSON string for selecting additional features
277
278 :type api_params: dict
279 :param api_params: a dictionary of additional parameters to pass
280 directly to the EMR API (so you don't have to upgrade boto to
281 use new EMR features). You can also delete an API parameter
282 by setting it to None.
283
284 :rtype: str
285 :return: The jobflow id
286 """
287 params = {}
288 if action_on_failure:
289 params['ActionOnFailure'] = action_on_failure
290 if log_uri:
291 params['LogUri'] = log_uri
292 params['Name'] = name
293
294 # Common instance args
295 common_params = self._build_instance_common_args(ec2_keyname,
296 availability_zone,
297 keep_alive,
298 hadoop_version)
299 params.update(common_params)
300
301 # NB: according to the AWS API's error message, we must
302 # "configure instances either using instance count, master and
303 # slave instance type or instance groups but not both."
304 #
305 # Thus we switch here on the truthiness of instance_groups.
306 if not instance_groups:
307 # Instance args (the common case)
308 instance_params = self._build_instance_count_and_type_args(
309 master_instance_type,
310 slave_instance_type,
311 num_instances)
312 params.update(instance_params)
313 else:
314             # Instance group args (for spot instances or a heterogeneous cluster)
315 list_args = self._build_instance_group_list_args(instance_groups)
316 instance_params = dict(
317 ('Instances.%s' % k, v) for k, v in list_args.iteritems()
318 )
319 params.update(instance_params)
320
321 # Debugging step from EMR API docs
322 if enable_debugging:
323 debugging_step = JarStep(name='Setup Hadoop Debugging',
324 action_on_failure='TERMINATE_JOB_FLOW',
325 main_class=None,
326 jar=self.DebuggingJar,
327 step_args=self.DebuggingArgs)
328 steps.insert(0, debugging_step)
329
330 # Step args
331 if steps:
332 step_args = [self._build_step_args(step) for step in steps]
333 params.update(self._build_step_list(step_args))
334
335 if bootstrap_actions:
336             bootstrap_action_args = [self._build_bootstrap_action_args(bootstrap_action) for bootstrap_action in bootstrap_actions]
337             params.update(self._build_bootstrap_action_list(bootstrap_action_args))
338
339 if ami_version:
340 params['AmiVersion'] = ami_version
341
342 if additional_info is not None:
343 params['AdditionalInfo'] = additional_info
344
345 if api_params:
346 for key, value in api_params.iteritems():
347 if value is None:
348 params.pop(key, None)
349 else:
350 params[key] = value
351
352 response = self.get_object(
353 'RunJobFlow', params, RunJobFlowResponse, verb='POST')
354 return response.jobflowid
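A usage sketch pulling the pieces together (bucket names, the key pair and the step are placeholders; note that when enable_debugging is set, the list passed as steps is mutated to insert the debugging step):

from boto.emr.connection import EmrConnection
from boto.emr.step import JarStep

conn = EmrConnection()  # credentials come from the environment or boto config
step = JarStep(name='Example step',
               jar='s3n://example-bucket/jars/example.jar',
               step_args=['arg1', 'arg2'])
jobflow_id = conn.run_jobflow(name='Example job flow',
                              log_uri='s3n://example-bucket/logs',
                              ec2_keyname='example-keypair',
                              num_instances=3,
                              enable_debugging=True,
                              steps=[step])
print jobflow_id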
355
356 def set_termination_protection(self, jobflow_id,
357 termination_protection_status):
358 """
359         Set termination protection on the specified Elastic MapReduce job flow
360 
361         :type jobflow_id: str
362         :param jobflow_id: The id of the job flow to modify
363
364 :type termination_protection_status: bool
365 :param termination_protection_status: Termination protection status
366 """
367 assert termination_protection_status in (True, False)
368
369 params = {}
370         params['TerminationProtected'] = (termination_protection_status and "true") or "false"
371 self.build_list_params(params, [jobflow_id], 'JobFlowIds.member')
372
373 return self.get_status('SetTerminationProtection', params, verb='POST')
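A usage sketch (the job flow id is a placeholder):

from boto.emr.connection import EmrConnection

conn = EmrConnection()
conn.set_termination_protection('j-EXAMPLEID', True)   # protect the cluster
conn.set_termination_protection('j-EXAMPLEID', False)  # allow termination again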
374
375 def _build_bootstrap_action_args(self, bootstrap_action):
376 bootstrap_action_params = {}
377         bootstrap_action_params['ScriptBootstrapAction.Path'] = bootstrap_action.path
378
379 try:
380 bootstrap_action_params['Name'] = bootstrap_action.name
381 except AttributeError:
382 pass
383
384 args = bootstrap_action.args()
385 if args:
386             self.build_list_params(bootstrap_action_params, args, 'ScriptBootstrapAction.Args.member')
387
388 return bootstrap_action_params
389
390 def _build_step_args(self, step):
391 step_params = {}
392 step_params['ActionOnFailure'] = step.action_on_failure
393 step_params['HadoopJarStep.Jar'] = step.jar()
394
395 main_class = step.main_class()
396 if main_class:
397 step_params['HadoopJarStep.MainClass'] = main_class
398
399 args = step.args()
400 if args:
401             self.build_list_params(step_params, args, 'HadoopJarStep.Args.member')
402
403 step_params['Name'] = step.name
404 return step_params
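For orientation, roughly the flattened parameters this helper produces for the JarStep sketched earlier, before _build_step_list() adds the 'Steps.member.N.' prefix (values illustrative):

# {'Name': 'Example step',
#  'ActionOnFailure': 'TERMINATE_JOB_FLOW',
#  'HadoopJarStep.Jar': 's3n://example-bucket/jars/example.jar',
#  'HadoopJarStep.Args.member.1': 'arg1',
#  'HadoopJarStep.Args.member.2': 'arg2'}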
405
406 def _build_bootstrap_action_list(self, bootstrap_actions):
407 if not isinstance(bootstrap_actions, types.ListType):
408 bootstrap_actions = [bootstrap_actions]
409
410 params = {}
411 for i, bootstrap_action in enumerate(bootstrap_actions):
412 for key, value in bootstrap_action.iteritems():
413 params['BootstrapActions.member.%s.%s' % (i + 1, key)] = value
414 return params
415
416 def _build_step_list(self, steps):
417 if not isinstance(steps, types.ListType):
418 steps = [steps]
419
420 params = {}
421 for i, step in enumerate(steps):
422 for key, value in step.iteritems():
423 params['Steps.member.%s.%s' % (i+1, key)] = value
424 return params
425
426 def _build_instance_common_args(self, ec2_keyname, availability_zone,
427 keep_alive, hadoop_version):
428 """
429 Takes a number of parameters used when starting a jobflow (as
430 specified in run_jobflow() above). Returns a comparable dict for
431 use in making a RunJobFlow request.
432 """
433 params = {
434 'Instances.KeepJobFlowAliveWhenNoSteps': str(keep_alive).lower(),
435 }
436
437 if hadoop_version:
438 params['Instances.HadoopVersion'] = hadoop_version
439 if ec2_keyname:
440 params['Instances.Ec2KeyName'] = ec2_keyname
441 if availability_zone:
442 params['Instances.Placement.AvailabilityZone'] = availability_zone
443
444 return params
445
446 def _build_instance_count_and_type_args(self, master_instance_type,
447 slave_instance_type, num_instances):
448 """
449 Takes a master instance type (string), a slave instance type
450 (string), and a number of instances. Returns a comparable dict
451 for use in making a RunJobFlow request.
452 """
453 params = {'Instances.MasterInstanceType': master_instance_type,
454 'Instances.SlaveInstanceType': slave_instance_type,
455 'Instances.InstanceCount': num_instances}
456 return params
457
458 def _build_instance_group_args(self, instance_group):
459 """
460 Takes an InstanceGroup; returns a dict that, when its keys are
461 properly prefixed, can be used for describing InstanceGroups in
462 RunJobFlow or AddInstanceGroups requests.
463 """
464 params = {'InstanceCount': instance_group.num_instances,
465 'InstanceRole': instance_group.role,
466 'InstanceType': instance_group.type,
467 'Name': instance_group.name,
468 'Market': instance_group.market}
469 if instance_group.market == 'SPOT':
470 params['BidPrice'] = instance_group.bidprice
471 return params
472
473 def _build_instance_group_list_args(self, instance_groups):
474 """
475 Takes a list of InstanceGroups, or a single InstanceGroup. Returns
476 a comparable dict for use in making a RunJobFlow or AddInstanceGroups
477 request.
478 """
479 if not isinstance(instance_groups, types.ListType):
480 instance_groups = [instance_groups]
481
482 params = {}
483 for i, instance_group in enumerate(instance_groups):
484 ig_dict = self._build_instance_group_args(instance_group)
485 for key, value in ig_dict.iteritems():
486 params['InstanceGroups.member.%d.%s' % (i+1, key)] = value
487 return params
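For orientation, roughly what this helper returns for a single on-demand core group (values illustrative):

# {'InstanceGroups.member.1.InstanceCount': 2,
#  'InstanceGroups.member.1.InstanceRole': 'CORE',
#  'InstanceGroups.member.1.InstanceType': 'm1.small',
#  'InstanceGroups.member.1.Market': 'ON_DEMAND',
#  'InstanceGroups.member.1.Name': 'Core nodes'}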
