Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(6)

Unified Diff: third_party/gsutil/boto/tests/integration/swf/test_layer1_workflow_execution.py

Issue 12317103: Added gsutil to depot tools (Closed) Base URL: https://chromium.googlesource.com/chromium/tools/depot_tools.git@master
Patch Set: added readme Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/gsutil/boto/tests/integration/swf/test_layer1_workflow_execution.py
diff --git a/third_party/gsutil/boto/tests/integration/swf/test_layer1_workflow_execution.py b/third_party/gsutil/boto/tests/integration/swf/test_layer1_workflow_execution.py
new file mode 100644
index 0000000000000000000000000000000000000000..6f59a7adc107d738cf7c3a83e46e7d38bccae124
--- /dev/null
+++ b/third_party/gsutil/boto/tests/integration/swf/test_layer1_workflow_execution.py
@@ -0,0 +1,173 @@
+"""
+Tests for Layer1 of Simple Workflow
+
+"""
+import time
+import uuid
+import json
+import traceback
+
+from boto.swf.layer1_decisions import Layer1Decisions
+
+from test_layer1 import SimpleWorkflowLayer1TestBase
+
+
+
+class SwfL1WorkflowExecutionTest(SimpleWorkflowLayer1TestBase):
+ """
+ test a simple workflow execution
+ """
+ swf = True
+
+ def run_decider(self):
+ """
+ run one iteration of a simple decision engine
+ """
+ # Poll for a decision task.
+ tries = 0
+ while True:
+ dtask = self.conn.poll_for_decision_task(self._domain,
+ self._task_list, reverse_order=True)
+ if dtask.get('taskToken') is not None:
+ # This means a real decision task has arrived.
+ break
+ time.sleep(2)
+ tries += 1
+ if tries > 10:
+ # Give up if it's taking too long. Probably
+ # means something is broken somewhere else.
+ assert False, 'no decision task occurred'
+
+ # Get the most recent interesting event.
+ ignorable = (
+ 'DecisionTaskScheduled',
+ 'DecisionTaskStarted',
+ 'DecisionTaskTimedOut',
+ )
+ event = None
+ for tevent in dtask['events']:
+ if tevent['eventType'] not in ignorable:
+ event = tevent
+ break
+
+ # Construct the decision response.
+ decisions = Layer1Decisions()
+ if event['eventType'] == 'WorkflowExecutionStarted':
+ activity_id = str(uuid.uuid1())
+ decisions.schedule_activity_task(activity_id,
+ self._activity_type_name, self._activity_type_version,
+ task_list=self._task_list,
+ input=event['workflowExecutionStartedEventAttributes']['input'])
+ elif event['eventType'] == 'ActivityTaskCompleted':
+ decisions.complete_workflow_execution(
+ result=event['activityTaskCompletedEventAttributes']['result'])
+ elif event['eventType'] == 'ActivityTaskFailed':
+ decisions.fail_workflow_execution(
+ reason=event['activityTaskFailedEventAttributes']['reason'],
+ details=event['activityTaskFailedEventAttributes']['details'])
+ else:
+ decisions.fail_workflow_execution(
+ reason='unhandled decision task type; %r' % (event['eventType'],))
+
+ # Send the decision response.
+ r = self.conn.respond_decision_task_completed(dtask['taskToken'],
+ decisions=decisions._data,
+ execution_context=None)
+ assert r is None
+
+
+ def run_worker(self):
+ """
+ run one iteration of a simple worker engine
+ """
+ # Poll for an activity task.
+ tries = 0
+ while True:
+ atask = self.conn.poll_for_activity_task(self._domain,
+ self._task_list, identity='test worker')
+ if atask.get('activityId') is not None:
+ # This means a real activity task has arrived.
+ break
+ time.sleep(2)
+ tries += 1
+ if tries > 10:
+ # Give up if it's taking too long. Probably
+ # means something is broken somewhere else.
+ assert False, 'no activity task occurred'
+ # Do the work or catch a "work exception."
+ reason = None
+ try:
+ result = json.dumps(sum(json.loads(atask['input'])))
+ except:
+ reason = 'an exception was raised'
+ details = traceback.format_exc()
+ if reason is None:
+ r = self.conn.respond_activity_task_completed(
+ atask['taskToken'], result)
+ else:
+ r = self.conn.respond_activity_task_failed(
+ atask['taskToken'], reason=reason, details=details)
+ assert r is None
+
+
+ def test_workflow_execution(self):
+ # Start a workflow execution whose activity task will succeed.
+ workflow_id = 'wfid-%.2f' % (time.time(),)
+ r = self.conn.start_workflow_execution(self._domain,
+ workflow_id,
+ self._workflow_type_name,
+ self._workflow_type_version,
+ execution_start_to_close_timeout='20',
+ input='[600, 15]')
+ # Need the run_id to lookup the execution history later.
+ run_id = r['runId']
+
+ # Move the workflow execution forward by having the
+ # decider schedule an activity task.
+ self.run_decider()
+
+ # Run the worker to handle the scheduled activity task.
+ self.run_worker()
+
+ # Complete the workflow execution by having the
+ # decider close it down.
+ self.run_decider()
+
+ # Check that the result was stored in the execution history.
+ r = self.conn.get_workflow_execution_history(self._domain,
+ run_id, workflow_id,
+ reverse_order=True)['events'][0]
+ result = r['workflowExecutionCompletedEventAttributes']['result']
+ assert json.loads(result) == 615
+
+
+ def test_failed_workflow_execution(self):
+ # Start a workflow execution whose activity task will fail.
+ workflow_id = 'wfid-%.2f' % (time.time(),)
+ r = self.conn.start_workflow_execution(self._domain,
+ workflow_id,
+ self._workflow_type_name,
+ self._workflow_type_version,
+ execution_start_to_close_timeout='20',
+ input='[600, "s"]')
+ # Need the run_id to lookup the execution history later.
+ run_id = r['runId']
+
+ # Move the workflow execution forward by having the
+ # decider schedule an activity task.
+ self.run_decider()
+
+ # Run the worker to handle the scheduled activity task.
+ self.run_worker()
+
+ # Complete the workflow execution by having the
+ # decider close it down.
+ self.run_decider()
+
+ # Check that the failure was stored in the execution history.
+ r = self.conn.get_workflow_execution_history(self._domain,
+ run_id, workflow_id,
+ reverse_order=True)['events'][0]
+ reason = r['workflowExecutionFailedEventAttributes']['reason']
+ assert reason == 'an exception was raised'
+

Powered by Google App Engine
This is Rietveld 408576698