-
Notifications
You must be signed in to change notification settings - Fork 348
Expand file tree
/
Copy pathtaskqueue.py
More file actions
203 lines (154 loc) · 5.82 KB
/
taskqueue.py
File metadata and controls
203 lines (154 loc) · 5.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
#!/usr/bin/env python
# -*-coding: utf8 -*-
"""Task Queue API.

TaskQueue is a distributed task queue service that SAE provides so that
developers have a simple way to run user tasks asynchronously.

Examples:

  1. Add a GET task::

       from sae.taskqueue import Task, TaskQueue
       queue = TaskQueue('queue_name')
       queue.add(Task("/tasks/cd"))

  2. Add a POST task (the second argument is the POST body)::

       queue.add(Task("/tasks/strip", "postdata"))

  3. Add a batch of tasks in a single call::

       tasks = [Task("/tasks/grep", d) for d in datas]
       queue.add(tasks)

  4. Add one task through the shortcut helper::

       from sae.taskqueue import add_task
       add_task('queue_name', '/tasks/fsck', 'postdata')
"""
__all__ = [
    # Exceptions.
    'Error',
    'InternalError',
    'InvalidTaskError',
    'PermissionDeniedError',
    'TaskQueueNotExistsError',
    'TooManyTasksError',
    # Public API.
    'add_task',
    'Task',
    'TaskQueue',
]
import os
import time
import json
import urllib
import urllib2
import urlparse
import base64
import util
import const
class Error(Exception):
    """Common base class of every exception defined in this module.

    Catch this to handle any taskqueue-specific failure in one place.
    """


class InvalidTaskError(Error):
    """Signals that a task's URL, payload, or options are invalid."""


class InternalError(Error):
    """Signals an internal error while accessing the queue.

    Such errors are expected to be temporary; if the problem persists,
    please contact SAE.
    """


class PermissionDeniedError(Error):
    """Signals that the requested operation is not allowed for this app."""


class TaskQueueNotExistsError(Error):
    """Signals that the named task queue does not exist."""


class TooManyTasksError(Error):
    """Signals that the queue is full or lacks room for the new tasks."""


# Backend error code -> exception class to raise for it.
# Not mapped: 999 (unknown error) and 403 (permission denied or out of quota).
_ERROR_MAPPING = {
    1: PermissionDeniedError,
    3: InvalidTaskError,
    10: TaskQueueNotExistsError,
    11: TooManyTasksError,
    500: InternalError,
}

# Endpoint of the remote taskqueue service.
_TASKQUEUE_BACKEND = 'http://taskqueue.sae.sina.com.cn/index.php'
class Task:
    """One unit of work for the taskqueue daemon: a URL to request, plus
    optional POST data and scheduling options."""

    # Base URL used to resolve relative task URLs. When $HTTP_HOST is set at
    # import time it is captured here, as before. Otherwise it stays None and
    # $HTTP_HOST is read when a Task is created, so importing this module
    # outside of a request no longer fails with KeyError.
    _default_netloc = ('http://' + os.environ['HTTP_HOST']
                       if 'HTTP_HOST' in os.environ else None)

    def __init__(self, url, payload = None, **kwargs):
        """Initializer.

        Args:
          url: URL where the taskqueue daemon should handle this task. A URL
            that does not start with 'http://' is joined onto the base URL
            http://<HTTP_HOST> with urlparse.urljoin.
          payload: Optional. If truthy, the taskqueue daemon will take this
            task as a POST task and |payload| as POST data (it is stored
            base64-encoded under info['postdata']).
          delay: Delay the execution of the task for certain second(s). Up to
            600 seconds (not validated here).
          prior: If set to True, the task will be added to the head of the
            queue.

        Raises:
          InvalidTaskError: if there is an unrecognized keyword argument, or
            if |url| is relative and HTTP_HOST is not available.
        """
        self.info = {}
        if url.startswith('http://'):
            self.info['url'] = url
        else:
            self.info['url'] = urlparse.urljoin(self._resolve_netloc(url), url)
        if payload:
            self.info['postdata'] = base64.b64encode(payload)
        for key, value in kwargs.items():
            if key not in ('delay', 'prior'):
                raise InvalidTaskError('unrecognized task option: %r' % (key,))
            self.info[key] = value

    def _resolve_netloc(self, url):
        """Return the base URL that relative task URLs are joined onto.

        Args:
          url: The relative task URL; used only in the error message.

        Raises:
          InvalidTaskError: if no base URL was captured at import time and
            HTTP_HOST is unset or empty now.
        """
        if self._default_netloc is not None:
            return self._default_netloc
        host = os.environ.get('HTTP_HOST')
        if not host:
            raise InvalidTaskError(
                'cannot resolve relative task url %r: HTTP_HOST is not set'
                % (url,))
        return 'http://' + host

    def extract_params(self):
        """Return the task description dict (url / postdata / delay / prior).

        The internal dict itself is returned, not a copy.
        """
        return self.info
class TaskQueue:
def __init__(self, name, auth_token=None):
"""Initializer.
Args:
name: The name of the taskqueue.
auth_token: Optional, a two-element tuple (access_key, secretkey_key),
useful when you want to access other application's taskqueue.
"""
self.name = name
if auth_token:
self.accesskey_key, self.secret_key = auth_token
else:
self.access_key = const.ACCESS_KEY
self.secret_key = const.SECRET_KEY
def add(self, task):
"""Add task to the task queue
Args:
task: The task to be added, it can be a single Task, or a list of
Tasks.
"""
try:
tasks = list(iter(task))
except TypeError:
tasks = [task]
task_args = {}
task_args['name'] = self.name
task_args['queue'] = []
for t in tasks:
task_args['queue'].append(t.extract_params())
#print task_args
args = [('taskqueue', json.dumps(task_args))]
return self._remote_call(args)
def size(self):
"""Query for how many task is left(not executed) in the queue. """
args = []
args.append(('act', 'curlen'))
args.append(('params', json.dumps({'name': self.name})))
return int(self._remote_call(args))
def _remote_call(self, args):
args_dict = dict(args)
command = args_dict.get('act')
if command == 'curlen':
return "0"
tasks = json.loads(args_dict['taskqueue'])['queue']
for t in tasks:
url = t['url']
payload = t.get('postdata')
if payload:
payload = base64.b64decode(payload)
print '[SAE:TASKQUEUE] Add task:', url, payload, self.name
#try:
# # Try to make a sync call.
# rep = urllib2.urlopen(url, payload, 5)
# print rep.read()
#except:
# import traceback
# print 'TASKQUEUE_ERROR:', t
# traceback.print_exc()
return True
def _get_headers(self):
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
msg = 'ACCESSKEY' + self.access_key + 'TIMESTAMP' + timestamp
headers = {'TimeStamp': timestamp,
'AccessKey': self.access_key,
'Signature': util.get_signature(self.secret_key, msg)}
return headers
def add_task(queue_name, url, payload=None, **kws):
    """A shortcut for adding a single task to a queue.

    Args:
      queue_name: Name of the queue the task should be added to.
      url: URL where the taskqueue daemon should handle this task.
      payload: Optional POST data; if given, the task is a POST task.
      **kws: Extra task options forwarded to Task (``delay``, ``prior``).

    Returns:
      Whatever TaskQueue.add() returns for the new task.

    Raises:
      InvalidTaskError: if ``kws`` contains an option Task does not accept.
    """
    return TaskQueue(queue_name).add(Task(url, payload, **kws))