| Index: third_party/gsutil/boto/tests/integration/swf/test_layer1_workflow_execution.py
|
| diff --git a/third_party/gsutil/boto/tests/integration/swf/test_layer1_workflow_execution.py b/third_party/gsutil/boto/tests/integration/swf/test_layer1_workflow_execution.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..6f59a7adc107d738cf7c3a83e46e7d38bccae124
|
| --- /dev/null
|
| +++ b/third_party/gsutil/boto/tests/integration/swf/test_layer1_workflow_execution.py
|
| @@ -0,0 +1,173 @@
|
| +"""
|
| +Tests for Layer1 of Simple Workflow
|
| +
|
| +"""
|
| +import time
|
| +import uuid
|
| +import json
|
| +import traceback
|
| +
|
| +from boto.swf.layer1_decisions import Layer1Decisions
|
| +
|
| +from test_layer1 import SimpleWorkflowLayer1TestBase
|
| +
|
| +
|
| +
|
| +class SwfL1WorkflowExecutionTest(SimpleWorkflowLayer1TestBase):
|
| + """
|
| + test a simple workflow execution
|
| + """
|
| + swf = True
|
| +
|
| + def run_decider(self):
|
| + """
|
| + run one iteration of a simple decision engine
|
| + """
|
| + # Poll for a decision task.
|
| + tries = 0
|
| + while True:
|
| + dtask = self.conn.poll_for_decision_task(self._domain,
|
| + self._task_list, reverse_order=True)
|
| + if dtask.get('taskToken') is not None:
|
| + # This means a real decision task has arrived.
|
| + break
|
| + time.sleep(2)
|
| + tries += 1
|
| + if tries > 10:
|
| + # Give up if it's taking too long. Probably
|
| + # means something is broken somewhere else.
|
| + assert False, 'no decision task occurred'
|
| +
|
| + # Get the most recent interesting event.
|
| + ignorable = (
|
| + 'DecisionTaskScheduled',
|
| + 'DecisionTaskStarted',
|
| + 'DecisionTaskTimedOut',
|
| + )
|
| + event = None
|
| + for tevent in dtask['events']:
|
| + if tevent['eventType'] not in ignorable:
|
| + event = tevent
|
| + break
|
| +
|
| + # Construct the decision response.
|
| + decisions = Layer1Decisions()
|
| + if event['eventType'] == 'WorkflowExecutionStarted':
|
| + activity_id = str(uuid.uuid1())
|
| + decisions.schedule_activity_task(activity_id,
|
| + self._activity_type_name, self._activity_type_version,
|
| + task_list=self._task_list,
|
| + input=event['workflowExecutionStartedEventAttributes']['input'])
|
| + elif event['eventType'] == 'ActivityTaskCompleted':
|
| + decisions.complete_workflow_execution(
|
| + result=event['activityTaskCompletedEventAttributes']['result'])
|
| + elif event['eventType'] == 'ActivityTaskFailed':
|
| + decisions.fail_workflow_execution(
|
| + reason=event['activityTaskFailedEventAttributes']['reason'],
|
| + details=event['activityTaskFailedEventAttributes']['details'])
|
| + else:
|
| + decisions.fail_workflow_execution(
|
| + reason='unhandled decision task type; %r' % (event['eventType'],))
|
| +
|
| + # Send the decision response.
|
| + r = self.conn.respond_decision_task_completed(dtask['taskToken'],
|
| + decisions=decisions._data,
|
| + execution_context=None)
|
| + assert r is None
|
| +
|
| +
|
| + def run_worker(self):
|
| + """
|
| + run one iteration of a simple worker engine
|
| + """
|
| + # Poll for an activity task.
|
| + tries = 0
|
| + while True:
|
| + atask = self.conn.poll_for_activity_task(self._domain,
|
| + self._task_list, identity='test worker')
|
| + if atask.get('activityId') is not None:
|
| + # This means a real activity task has arrived.
|
| + break
|
| + time.sleep(2)
|
| + tries += 1
|
| + if tries > 10:
|
| + # Give up if it's taking too long. Probably
|
| + # means something is broken somewhere else.
|
| + assert False, 'no activity task occurred'
|
| + # Do the work or catch a "work exception."
|
| + reason = None
|
| + try:
|
| + result = json.dumps(sum(json.loads(atask['input'])))
|
| + except:
|
| + reason = 'an exception was raised'
|
| + details = traceback.format_exc()
|
| + if reason is None:
|
| + r = self.conn.respond_activity_task_completed(
|
| + atask['taskToken'], result)
|
| + else:
|
| + r = self.conn.respond_activity_task_failed(
|
| + atask['taskToken'], reason=reason, details=details)
|
| + assert r is None
|
| +
|
| +
|
| + def test_workflow_execution(self):
|
| + # Start a workflow execution whose activity task will succeed.
|
| + workflow_id = 'wfid-%.2f' % (time.time(),)
|
| + r = self.conn.start_workflow_execution(self._domain,
|
| + workflow_id,
|
| + self._workflow_type_name,
|
| + self._workflow_type_version,
|
| + execution_start_to_close_timeout='20',
|
| + input='[600, 15]')
|
| + # Need the run_id to lookup the execution history later.
|
| + run_id = r['runId']
|
| +
|
| + # Move the workflow execution forward by having the
|
| + # decider schedule an activity task.
|
| + self.run_decider()
|
| +
|
| + # Run the worker to handle the scheduled activity task.
|
| + self.run_worker()
|
| +
|
| + # Complete the workflow execution by having the
|
| + # decider close it down.
|
| + self.run_decider()
|
| +
|
| + # Check that the result was stored in the execution history.
|
| + r = self.conn.get_workflow_execution_history(self._domain,
|
| + run_id, workflow_id,
|
| + reverse_order=True)['events'][0]
|
| + result = r['workflowExecutionCompletedEventAttributes']['result']
|
| + assert json.loads(result) == 615
|
| +
|
| +
|
| + def test_failed_workflow_execution(self):
|
| + # Start a workflow execution whose activity task will fail.
|
| + workflow_id = 'wfid-%.2f' % (time.time(),)
|
| + r = self.conn.start_workflow_execution(self._domain,
|
| + workflow_id,
|
| + self._workflow_type_name,
|
| + self._workflow_type_version,
|
| + execution_start_to_close_timeout='20',
|
| + input='[600, "s"]')
|
| + # Need the run_id to lookup the execution history later.
|
| + run_id = r['runId']
|
| +
|
| + # Move the workflow execution forward by having the
|
| + # decider schedule an activity task.
|
| + self.run_decider()
|
| +
|
| + # Run the worker to handle the scheduled activity task.
|
| + self.run_worker()
|
| +
|
| + # Complete the workflow execution by having the
|
| + # decider close it down.
|
| + self.run_decider()
|
| +
|
| + # Check that the failure was stored in the execution history.
|
| + r = self.conn.get_workflow_execution_history(self._domain,
|
| + run_id, workflow_id,
|
| + reverse_order=True)['events'][0]
|
| + reason = r['workflowExecutionFailedEventAttributes']['reason']
|
| + assert reason == 'an exception was raised'
|
| +
|
|
|