OLD | NEW |
(Empty) | |
| 1 # Copyright (c) 2012 Amazon.com, Inc. or its affiliates. All Rights Reserved |
| 2 # |
| 3 # Permission is hereby granted, free of charge, to any person obtaining a |
| 4 # copy of this software and associated documentation files (the |
| 5 # "Software"), to deal in the Software without restriction, including |
| 6 # without limitation the rights to use, copy, modify, merge, publish, dis- |
| 7 # tribute, sublicense, and/or sell copies of the Software, and to permit |
| 8 # persons to whom the Software is furnished to do so, subject to the fol- |
| 9 # lowing conditions: |
| 10 # |
| 11 # The above copyright notice and this permission notice shall be included |
| 12 # in all copies or substantial portions of the Software. |
| 13 # |
| 14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
| 15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- |
| 16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
| 17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
| 18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
| 20 # IN THE SOFTWARE. |
| 21 # |
| 22 |
| 23 import json |
| 24 import boto |
| 25 from boto.connection import AWSQueryConnection |
| 26 from boto.regioninfo import RegionInfo |
| 27 from boto.exception import JSONResponseError |
| 28 from boto.datapipeline import exceptions |
| 29 |
| 30 |
class DataPipelineConnection(AWSQueryConnection):
    """
    Low-level client for the AWS Data Pipeline API.

    Every public method maps to one service action: it collects its
    arguments into a dict using the service's camelCase keys, serializes
    that dict to JSON and hands it to :meth:`make_request`. Each method
    therefore returns whatever :meth:`make_request` returns (the decoded
    JSON response, or ``None`` when a successful response has no body)
    and raises whatever :meth:`make_request` raises.
    """
    APIVersion = "2012-10-29"
    DefaultRegionName = "us-east-1"
    DefaultRegionEndpoint = "datapipeline.us-east-1.amazonaws.com"
    # Used as the prefix of the ``X-Amz-Target`` header in make_request().
    ServiceName = "DataPipeline"
    # Exception raised for error responses whose fault is not in _faults.
    ResponseError = JSONResponseError

    # Maps the ``__type`` value of an error response to the exception
    # class raised for it.
    _faults = {
        "PipelineDeletedException": exceptions.PipelineDeletedException,
        "InvalidRequestException": exceptions.InvalidRequestException,
        "TaskNotFoundException": exceptions.TaskNotFoundException,
        "PipelineNotFoundException": exceptions.PipelineNotFoundException,
        "InternalServiceError": exceptions.InternalServiceError,
    }

    def __init__(self, **kwargs):
        """
        Create a connection to the Data Pipeline endpoint.

        :type region: :class:`boto.regioninfo.RegionInfo`
        :param region: Optional keyword argument selecting the region to
            talk to. When omitted (or falsy) a region built from
            ``DefaultRegionName`` / ``DefaultRegionEndpoint`` is used.

        All remaining keyword arguments are forwarded to
        ``AWSQueryConnection.__init__``. The ``host`` keyword is always
        overwritten with the endpoint of the selected region.
        """
        # ``region`` must be removed from kwargs before they are forwarded.
        # NOTE(review): AWSQueryConnection.__init__ is not expected to
        # accept a ``region`` keyword, so leaving it in raised TypeError
        # whenever a caller supplied one -- confirm against boto.connection.
        region = kwargs.pop('region', None)
        if not region:
            region = RegionInfo(self, self.DefaultRegionName,
                                self.DefaultRegionEndpoint)
        kwargs['host'] = region.endpoint
        AWSQueryConnection.__init__(self, **kwargs)
        self.region = region

    def _required_auth_capability(self):
        """Return the list of auth capabilities this connection needs."""
        return ['hmac-v4']

    def activate_pipeline(self, pipeline_id):
        """
        Validates a pipeline and initiates processing. If the pipeline
        does not pass validation, activation fails.

        :type pipeline_id: string
        :param pipeline_id: The identifier of the pipeline to activate.

        """
        params = {'pipelineId': pipeline_id, }
        return self.make_request(action='ActivatePipeline',
                                 body=json.dumps(params))

    def create_pipeline(self, name, unique_id, description=None):
        """
        Creates a new empty pipeline. When this action succeeds, you can
        then use the PutPipelineDefinition action to populate the
        pipeline.

        :type name: string
        :param name: The name of the new pipeline. You can use the same name
            for multiple pipelines associated with your AWS account, because
            AWS Data Pipeline assigns each new pipeline a unique pipeline
            identifier.

        :type unique_id: string
        :param unique_id: A unique identifier that you specify. This identifier
            is not the same as the pipeline identifier assigned by AWS Data
            Pipeline. You are responsible for defining the format and ensuring
            the uniqueness of this identifier. You use this parameter to ensure
            idempotency during repeated calls to CreatePipeline. For example,
            if the first call to CreatePipeline does not return a clear
            success, you can pass in the same unique identifier and pipeline
            name combination on a subsequent call to CreatePipeline.
            CreatePipeline ensures that if a pipeline already exists with the
            same name and unique identifier, a new pipeline will not be
            created. Instead, you'll receive the pipeline identifier from the
            previous attempt. The uniqueness of the name and unique identifier
            combination is scoped to the AWS account or IAM user credentials.

        :type description: string
        :param description: The description of the new pipeline. Only sent
            when it is not ``None``.

        """
        params = {'name': name, 'uniqueId': unique_id, }
        if description is not None:
            params['description'] = description
        return self.make_request(action='CreatePipeline',
                                 body=json.dumps(params))

    def delete_pipeline(self, pipeline_id):
        """
        Permanently deletes a pipeline, its pipeline definition and its
        run history. You cannot query or restore a deleted pipeline. AWS
        Data Pipeline will attempt to cancel instances associated with
        the pipeline that are currently being processed by task runners.
        Deleting a pipeline cannot be undone.

        :type pipeline_id: string
        :param pipeline_id: The identifier of the pipeline to be deleted.

        """
        params = {'pipelineId': pipeline_id, }
        return self.make_request(action='DeletePipeline',
                                 body=json.dumps(params))

    def describe_objects(self, object_ids, pipeline_id, marker=None,
                         evaluate_expressions=None):
        """
        Returns the object definitions for a set of objects associated
        with the pipeline. Object definitions are composed of a set of
        fields that define the properties of the object.

        :type object_ids: list
        :param object_ids: Identifiers of the pipeline objects that contain the
            definitions to be described. You can pass as many as 25 identifiers
            in a single call to DescribeObjects.

        :type pipeline_id: string
        :param pipeline_id: Identifier of the pipeline that contains the object
            definitions.

        :type marker: string
        :param marker: The starting point for the results to be returned. The
            first time you call DescribeObjects, this value should be empty. As
            long as the action returns HasMoreResults as True, you can call
            DescribeObjects again and pass the marker value from the response
            to retrieve the next set of results.

        :type evaluate_expressions: boolean
        :param evaluate_expressions: Sent to the service as
            ``evaluateExpressions`` when not ``None``. Presumably asks the
            service to evaluate expressions in the returned fields --
            confirm against the DescribeObjects API reference.

        """
        params = {
            'objectIds': object_ids,
            'pipelineId': pipeline_id,
        }
        if marker is not None:
            params['marker'] = marker
        if evaluate_expressions is not None:
            params['evaluateExpressions'] = evaluate_expressions
        return self.make_request(action='DescribeObjects',
                                 body=json.dumps(params))

    def describe_pipelines(self, pipeline_ids):
        """
        Retrieve metadata about one or more pipelines. The information
        retrieved includes the name of the pipeline, the pipeline
        identifier, its current state, and the user account that owns
        the pipeline. Using account credentials, you can retrieve
        metadata about pipelines that you or your IAM users have
        created. If you are using an IAM user account, you can retrieve
        metadata about only those pipelines you have read permission
        for.

        :type pipeline_ids: list
        :param pipeline_ids: Identifiers of the pipelines to describe. You can
            pass as many as 25 identifiers in a single call to
            DescribePipelines. You can obtain pipeline identifiers by calling
            ListPipelines.

        """
        params = {'pipelineIds': pipeline_ids, }
        return self.make_request(action='DescribePipelines',
                                 body=json.dumps(params))

    def evaluate_expression(self, pipeline_id, expression, object_id):
        """
        Evaluates a string in the context of a specified object. A task
        runner can use this action to evaluate SQL queries stored in
        Amazon S3.

        :type pipeline_id: string
        :param pipeline_id: The identifier of the pipeline.

        :type expression: string
        :param expression: The expression to evaluate.

        :type object_id: string
        :param object_id: The identifier of the object.

        """
        params = {
            'pipelineId': pipeline_id,
            'expression': expression,
            'objectId': object_id,
        }
        return self.make_request(action='EvaluateExpression',
                                 body=json.dumps(params))

    def get_pipeline_definition(self, pipeline_id, version=None):
        """
        Returns the definition of the specified pipeline. You can call
        GetPipelineDefinition to retrieve the pipeline definition you
        provided using PutPipelineDefinition.

        :type pipeline_id: string
        :param pipeline_id: The identifier of the pipeline.

        :type version: string
        :param version: The version of the pipeline definition to retrieve.
            Only sent when it is not ``None``.

        """
        params = {'pipelineId': pipeline_id, }
        if version is not None:
            params['version'] = version
        return self.make_request(action='GetPipelineDefinition',
                                 body=json.dumps(params))

    def list_pipelines(self, marker=None):
        """
        Returns a list of pipeline identifiers for all active pipelines.
        Identifiers are returned only for pipelines you have permission
        to access.

        :type marker: string
        :param marker: The starting point for the results to be returned. The
            first time you call ListPipelines, this value should be empty. As
            long as the action returns HasMoreResults as True, you can call
            ListPipelines again and pass the marker value from the response to
            retrieve the next set of results.

        """
        params = {}
        if marker is not None:
            params['marker'] = marker
        return self.make_request(action='ListPipelines',
                                 body=json.dumps(params))

    def poll_for_task(self, worker_group, hostname=None,
                      instance_identity=None):
        """
        Task runners call this action to receive a task to perform from
        AWS Data Pipeline. The task runner specifies which tasks it can
        perform by setting a value for the workerGroup parameter of the
        PollForTask call. The task returned by PollForTask may come from
        any of the pipelines that match the workerGroup value passed in
        by the task runner and that was launched using the IAM user
        credentials specified by the task runner.

        :type worker_group: string
        :param worker_group: Indicates the type of task the task runner is
            configured to accept and process. The worker group is set as a
            field on objects in the pipeline when they are created. You can
            only specify a single value for workerGroup in the call to
            PollForTask. There are no wildcard values permitted in
            workerGroup; the string must be an exact, case-sensitive match.

        :type hostname: string
        :param hostname: The public DNS name of the calling task runner.

        :type instance_identity: structure
        :param instance_identity: Identity information for the Amazon EC2
            instance that is hosting the task runner. You can get this value
            by calling the URI
            http://169.254.169.254/latest/meta-data/instance-id from the EC2
            instance. For more information, go to Instance Metadata in the
            Amazon Elastic Compute Cloud User Guide. Passing in this value
            proves that your task runner is running on an EC2 instance, and
            ensures the proper AWS Data Pipeline service charges are applied
            to your pipeline.

        """
        params = {'workerGroup': worker_group, }
        if hostname is not None:
            params['hostname'] = hostname
        if instance_identity is not None:
            params['instanceIdentity'] = instance_identity
        return self.make_request(action='PollForTask',
                                 body=json.dumps(params))

    def put_pipeline_definition(self, pipeline_objects, pipeline_id):
        """
        Adds tasks, schedules, and preconditions that control the
        behavior of the pipeline. You can use PutPipelineDefinition to
        populate a new pipeline or to update an existing pipeline that
        has not yet been activated.

        :type pipeline_objects: list
        :param pipeline_objects: The objects that define the pipeline. These
            will overwrite the existing pipeline definition.

        :type pipeline_id: string
        :param pipeline_id: The identifier of the pipeline to be configured.

        """
        params = {
            'pipelineObjects': pipeline_objects,
            'pipelineId': pipeline_id,
        }
        return self.make_request(action='PutPipelineDefinition',
                                 body=json.dumps(params))

    def query_objects(self, pipeline_id, sphere, marker=None, query=None,
                      limit=None):
        """
        Queries a pipeline for the names of objects that match a
        specified set of conditions.

        :type pipeline_id: string
        :param pipeline_id: Identifier of the pipeline to be queried for object
            names.

        :type sphere: string
        :param sphere: Specifies whether the query applies to components or
            instances. Allowable values: COMPONENT, INSTANCE, ATTEMPT.

        :type marker: string
        :param marker: The starting point for the results to be returned. The
            first time you call QueryObjects, this value should be empty. As
            long as the action returns HasMoreResults as True, you can call
            QueryObjects again and pass the marker value from the response to
            retrieve the next set of results.

        :type query: structure
        :param query: Query that defines the objects to be returned. The Query
            object can contain a maximum of ten selectors. The conditions in
            the query are limited to top-level String fields in the object.
            These filters can be applied to components, instances, and
            attempts.

        :type limit: integer
        :param limit: Specifies the maximum number of object names that
            QueryObjects will return in a single call. The default value is
            100.

        """
        params = {'pipelineId': pipeline_id, 'sphere': sphere, }
        if marker is not None:
            params['marker'] = marker
        if query is not None:
            params['query'] = query
        if limit is not None:
            params['limit'] = limit
        return self.make_request(action='QueryObjects',
                                 body=json.dumps(params))

    def report_task_progress(self, task_id):
        """
        Updates the AWS Data Pipeline service on the progress of the
        calling task runner. When the task runner is assigned a task, it
        should call ReportTaskProgress to acknowledge that it has the
        task within 2 minutes. If the web service does not receive this
        acknowledgement within the 2 minute window, it will assign the
        task in a subsequent PollForTask call. After this initial
        acknowledgement, the task runner only needs to report progress
        every 15 minutes to maintain its ownership of the task. You can
        change this reporting time from 15 minutes by specifying a
        reportProgressTimeout field in your pipeline. If a task runner
        does not report its status after 5 minutes, AWS Data Pipeline
        will assume that the task runner is unable to process the task
        and will reassign the task in a subsequent response to
        PollForTask. Task runners should call ReportTaskProgress every
        60 seconds.

        :type task_id: string
        :param task_id: Identifier of the task assigned to the task runner.
            This value is provided in the TaskObject that the service returns
            with the response for the PollForTask action.

        """
        params = {'taskId': task_id, }
        return self.make_request(action='ReportTaskProgress',
                                 body=json.dumps(params))

    def report_task_runner_heartbeat(self, taskrunner_id, worker_group=None,
                                     hostname=None):
        """
        Task runners call ReportTaskRunnerHeartbeat to indicate that
        they are operational. In the case of AWS Data Pipeline Task
        Runner launched on a resource managed by AWS Data Pipeline, the
        web service can use this call to detect when the task runner
        application has failed and restart a new instance.

        :type taskrunner_id: string
        :param taskrunner_id: The identifier of the task runner. This value
            should be unique across your AWS account. In the case of AWS Data
            Pipeline Task Runner launched on a resource managed by AWS Data
            Pipeline, the web service provides a unique identifier when it
            launches the application. If you have written a custom task runner,
            you should assign a unique identifier for the task runner.

        :type worker_group: string
        :param worker_group: Indicates the type of task the task runner is
            configured to accept and process. The worker group is set as a
            field on objects in the pipeline when they are created. You can
            only specify a single value for workerGroup in the call to
            ReportTaskRunnerHeartbeat. There are no wildcard values permitted
            in workerGroup; the string must be an exact, case-sensitive match.

        :type hostname: string
        :param hostname: The public DNS name of the calling task runner.

        """
        params = {'taskrunnerId': taskrunner_id, }
        if worker_group is not None:
            params['workerGroup'] = worker_group
        if hostname is not None:
            params['hostname'] = hostname
        return self.make_request(action='ReportTaskRunnerHeartbeat',
                                 body=json.dumps(params))

    def set_status(self, object_ids, status, pipeline_id):
        """
        Requests that the status of an array of physical or logical
        pipeline objects be updated in the pipeline. This update may not
        occur immediately, but is eventually consistent. The status that
        can be set depends on the type of object.

        :type object_ids: list
        :param object_ids: Identifies an array of objects. The corresponding
            objects can be either physical or components, but not a mix of both
            types.

        :type status: string
        :param status: Specifies the status to be set on all the objects in
            objectIds. For components, this can be either PAUSE or RESUME. For
            instances, this can be either CANCEL, RERUN, or MARK_FINISHED.

        :type pipeline_id: string
        :param pipeline_id: Identifies the pipeline that contains the objects.

        """
        params = {
            'objectIds': object_ids,
            'status': status,
            'pipelineId': pipeline_id,
        }
        return self.make_request(action='SetStatus',
                                 body=json.dumps(params))

    def set_task_status(self, task_id, task_status, error_code=None,
                        error_message=None, error_stack_trace=None):
        """
        Notifies AWS Data Pipeline that a task is completed and provides
        information about the final status. The task runner calls this
        action regardless of whether the task was successful. The task
        runner does not need to call SetTaskStatus for tasks that are
        canceled by the web service during a call to ReportTaskProgress.

        :type task_id: string
        :param task_id: Identifies the task assigned to the task runner. This
            value is set in the TaskObject that is returned by the PollForTask
            action.

        :type task_status: string
        :param task_status: If FINISHED, the task successfully completed. If
            FAILED the task ended unsuccessfully. The FALSE value is used by
            preconditions.

        :type error_code: integer
        :param error_code: If an error occurred during the task, specifies a
            numerical value that represents the error. This value is set on the
            physical attempt object. It is used to display error information to
            the user. The web service does not parse this value.

        :type error_message: string
        :param error_message: If an error occurred during the task, specifies a
            text description of the error. This value is set on the physical
            attempt object. It is used to display error information to the
            user. The web service does not parse this value.

        :type error_stack_trace: string
        :param error_stack_trace: If an error occurred during the task,
            specifies the stack trace associated with the error. This value is
            set on the physical attempt object. It is used to display error
            information to the user. The web service does not parse this value.

        """
        params = {'taskId': task_id, 'taskStatus': task_status, }
        if error_code is not None:
            params['errorCode'] = error_code
        if error_message is not None:
            params['errorMessage'] = error_message
        if error_stack_trace is not None:
            params['errorStackTrace'] = error_stack_trace
        return self.make_request(action='SetTaskStatus',
                                 body=json.dumps(params))

    def validate_pipeline_definition(self, pipeline_objects, pipeline_id):
        """
        Tests the pipeline definition with a set of validation checks to
        ensure that it is well formed and can run without error.

        :type pipeline_objects: list
        :param pipeline_objects: A list of objects that define the pipeline
            changes to validate against the pipeline.

        :type pipeline_id: string
        :param pipeline_id: Identifies the pipeline whose definition is to be
            validated.

        """
        params = {
            'pipelineObjects': pipeline_objects,
            'pipelineId': pipeline_id,
        }
        return self.make_request(action='ValidatePipelineDefinition',
                                 body=json.dumps(params))

    def make_request(self, action, body):
        """
        POST a JSON request for ``action`` and decode the JSON response.

        :type action: string
        :param action: Name of the service action; combined with
            ``ServiceName`` to form the ``X-Amz-Target`` header.

        :type body: string
        :param body: The JSON-serialized request parameters.

        :return: The decoded JSON response for an HTTP 200 reply, or
            ``None`` when that reply has an empty body.

        :raises: The exception class registered in ``_faults`` for the
            ``__type`` of the error response, or ``ResponseError`` when
            the fault is unknown or the error body is not a JSON object.
        """
        headers = {
            'X-Amz-Target': '%s.%s' % (self.ServiceName, action),
            'Host': self.region.endpoint,
            'Content-Type': 'application/x-amz-json-1.1',
            'Content-Length': str(len(body)),
        }
        http_request = self.build_base_http_request(
            method='POST', path='/', auth_path='/', params={},
            headers=headers, data=body)
        response = self._mexe(http_request, sender=None,
                              override_num_retries=10)
        response_body = response.read()
        # The HTTP layer may hand back bytes; decode them so that logging
        # and json.loads() always operate on text.
        if isinstance(response_body, bytes):
            response_body = response_body.decode('utf-8')
        boto.log.debug(response_body)
        if response.status == 200:
            if response_body:
                return json.loads(response_body)
            return None

        # Error path. The body is not guaranteed to be a JSON object (it
        # may be empty or, for instance, an HTML page from an intermediary),
        # so parse defensively rather than letting a ValueError hide the
        # HTTP status and reason.
        try:
            json_body = json.loads(response_body)
        except ValueError:
            json_body = None
        if not isinstance(json_body, dict):
            # Keep the raw text available to whoever handles the exception.
            json_body = {'message': response_body}
        fault_name = json_body.get('__type', None)
        exception_class = self._faults.get(fault_name, self.ResponseError)
        raise exception_class(response.status, response.reason,
                              body=json_body)
| 546 |
OLD | NEW |