Skip to content
This repository was archived by the owner on Apr 12, 2018. It is now read-only.

Commit 6f4d175

Browse files
author
Théo Crevon
committed
Merge pull request #25 from botify-labs/feature/decider-poll-full-history
Feature/decider poll full history
2 parents 525754d + d99bf7f commit 6f4d175

File tree

2 files changed

+83
-9
lines changed

2 files changed

+83
-9
lines changed

swf/actors/decider.py

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ def complete(self, task_token,
4444
def poll(self, task_list=None,
4545
identity=None,
4646
**kwargs):
47-
"""Polls for decision tasks to process from current
48-
actor's instance defined ``task_list``
47+
"""
48+
Polls a decision task and returns the token and the full history of the
49+
workflow's events.
4950
5051
:param task_list: task list to poll for decision tasks from.
5152
:type task_list: string
@@ -55,22 +56,41 @@ def poll(self, task_list=None,
5556
workflow history.
5657
:type identity: string
5758
58-
:returns: polled decision tasks
59+
:returns: (token, history)
5960
:type: swf.models.History
61+
6062
"""
6163
task_list = task_list or self.task_list
6264

63-
events = self.connection.poll_for_decision_task(
65+
task = self.connection.poll_for_decision_task(
6466
self.domain.name,
6567
task_list=task_list,
6668
identity=identity,
6769
**kwargs
6870
)
69-
70-
if not 'taskToken' in events:
71+
token = task.get('taskToken')
72+
if token is None:
7173
raise PollTimeout("Decider poll timed out")
7274

73-
history = History.from_event_list(events['events'])
74-
task_token = events['taskToken']
75+
events = task['events']
76+
77+
next_page = task.get('nextPageToken')
78+
while next_page:
79+
task = self.connection.poll_for_decision_task(
80+
self.domain.name,
81+
task_list=task_list,
82+
identity=identity,
83+
next_page_token=next_page,
84+
**kwargs
85+
)
86+
87+
token = task.get('taskToken')
88+
if token is None:
89+
raise PollTimeout("Decider poll timed out")
90+
91+
events.extend(task['events'])
92+
next_page = task.get('nextPageToken')
93+
94+
history = History.from_event_list(events)
7595

76-
return task_token, history
96+
return token, history

tests/actors/test_decider.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import unittest
2+
3+
import swf.models
4+
import swf.models.decision
5+
import swf.actors
6+
7+
8+
class TestDecider(unittest.TestCase):
9+
def setUp(self):
10+
self.domain = swf.models.Domain("TestDomain")
11+
self.task_list = 'test'
12+
self.execution = None
13+
14+
def tearDown(self):
15+
if self.execution is not None:
16+
self.execution.terminate()
17+
18+
def test_poll(self):
19+
"""
20+
Checks :meth:`Decider.poll` retrieve all the history's pages.
21+
22+
"""
23+
domain = self.domain
24+
task_list = self.task_list
25+
workflow_name = 'TestDeciderPoll'
26+
27+
decider = swf.actors.Decider(domain, task_list)
28+
worker = swf.actors.ActivityWorker(domain, task_list)
29+
30+
activity = swf.models.ActivityType(domain=domain,
31+
name='task',
32+
version='test')
33+
34+
workflow = swf.models.WorkflowType(name=workflow_name,
35+
domain=domain,
36+
version='test')
37+
self.execution = workflow.start_execution(workflow_name, task_list)
38+
39+
for i in xrange(30):
40+
token, history = decider.poll()
41+
self.assertEqual(len(history), 3 + i * 6)
42+
decision = swf.models.decision.task.ActivityTaskDecision(
43+
'schedule',
44+
'task',
45+
activity,
46+
task_list=task_list,
47+
task_timeout='600',
48+
duration_timeout='600',
49+
schedule_timeout='600',
50+
heartbeat_timeout='600')
51+
decider.complete(token, [decision])
52+
53+
token, task = worker.poll()
54+
worker.complete(token)

0 commit comments

Comments
 (0)