Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(582)

Side by Side Diff: third_party/boto/manage/task.py

Issue 12633019: Added boto/ to depot_tools/third_party (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/depot_tools
Patch Set: Moved boto down by one Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « third_party/boto/manage/server.py ('k') | third_party/boto/manage/test_manage.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright (c) 2006-2009 Mitch Garnaat http://garnaat.org/
2 #
3 # Permission is hereby granted, free of charge, to any person obtaining a
4 # copy of this software and associated documentation files (the
5 # "Software"), to deal in the Software without restriction, including
6 # without limitation the rights to use, copy, modify, merge, publish, dis-
7 # tribute, sublicense, and/or sell copies of the Software, and to permit
8 # persons to whom the Software is furnished to do so, subject to the fol-
9 # lowing conditions:
10 #
11 # The above copyright notice and this permission notice shall be included
12 # in all copies or substantial portions of the Software.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 # IN THE SOFTWARE.
21 #
22
23 import boto
24 from boto.sdb.db.property import StringProperty, DateTimeProperty, IntegerProper ty
25 from boto.sdb.db.model import Model
26 import datetime, subprocess, StringIO, time
27
28 def check_hour(val):
29 if val == '*':
30 return
31 if int(val) < 0 or int(val) > 23:
32 raise ValueError
33
34 class Task(Model):
35
36 """
37 A scheduled, repeating task that can be executed by any participating server s.
38 The scheduling is similar to cron jobs. Each task has an hour attribute.
39 The allowable values for hour are [0-23|*].
40
41 To keep the operation reasonably efficient and not cause excessive polling,
42 the minimum granularity of a Task is hourly. Some examples:
43
44 hour='*' - the task would be executed each hour
45 hour='3' - the task would be executed at 3AM GMT each day.
46
47 """
48 name = StringProperty()
49 hour = StringProperty(required=True, validator=check_hour, default='*')
50 command = StringProperty(required=True)
51 last_executed = DateTimeProperty()
52 last_status = IntegerProperty()
53 last_output = StringProperty()
54 message_id = StringProperty()
55
56 @classmethod
57 def start_all(cls, queue_name):
58 for task in cls.all():
59 task.start(queue_name)
60
61 def __init__(self, id=None, **kw):
62 Model.__init__(self, id, **kw)
63 self.hourly = self.hour == '*'
64 self.daily = self.hour != '*'
65 self.now = datetime.datetime.utcnow()
66
67 def check(self):
68 """
69 Determine how long until the next scheduled time for a Task.
70 Returns the number of seconds until the next scheduled time or zero
71 if the task needs to be run immediately.
72 If it's an hourly task and it's never been run, run it now.
73 If it's a daily task and it's never been run and the hour is right, run it now.
74 """
75 boto.log.info('checking Task[%s]-now=%s, last=%s' % (self.name, self.now , self.last_executed))
76
77 if self.hourly and not self.last_executed:
78 return 0
79
80 if self.daily and not self.last_executed:
81 if int(self.hour) == self.now.hour:
82 return 0
83 else:
84 return max( (int(self.hour)-self.now.hour), (self.now.hour-int(s elf.hour)) )*60*60
85
86 delta = self.now - self.last_executed
87 if self.hourly:
88 if delta.seconds >= 60*60:
89 return 0
90 else:
91 return 60*60 - delta.seconds
92 else:
93 if int(self.hour) == self.now.hour:
94 if delta.days >= 1:
95 return 0
96 else:
97 return 82800 # 23 hours, just to be safe
98 else:
99 return max( (int(self.hour)-self.now.hour), (self.now.hour-int(s elf.hour)) )*60*60
100
101 def _run(self, msg, vtimeout):
102 boto.log.info('Task[%s] - running:%s' % (self.name, self.command))
103 log_fp = StringIO.StringIO()
104 process = subprocess.Popen(self.command, shell=True, stdin=subprocess.PI PE,
105 stdout=subprocess.PIPE, stderr=subprocess.PIP E)
106 nsecs = 5
107 current_timeout = vtimeout
108 while process.poll() == None:
109 boto.log.info('nsecs=%s, timeout=%s' % (nsecs, current_timeout))
110 if nsecs >= current_timeout:
111 current_timeout += vtimeout
112 boto.log.info('Task[%s] - setting timeout to %d seconds' % (self .name, current_timeout))
113 if msg:
114 msg.change_visibility(current_timeout)
115 time.sleep(5)
116 nsecs += 5
117 t = process.communicate()
118 log_fp.write(t[0])
119 log_fp.write(t[1])
120 boto.log.info('Task[%s] - output: %s' % (self.name, log_fp.getvalue()))
121 self.last_executed = self.now
122 self.last_status = process.returncode
123 self.last_output = log_fp.getvalue()[0:1023]
124
125 def run(self, msg, vtimeout=60):
126 delay = self.check()
127 boto.log.info('Task[%s] - delay=%s seconds' % (self.name, delay))
128 if delay == 0:
129 self._run(msg, vtimeout)
130 queue = msg.queue
131 new_msg = queue.new_message(self.id)
132 new_msg = queue.write(new_msg)
133 self.message_id = new_msg.id
134 self.put()
135 boto.log.info('Task[%s] - new message id=%s' % (self.name, new_msg.i d))
136 msg.delete()
137 boto.log.info('Task[%s] - deleted message %s' % (self.name, msg.id))
138 else:
139 boto.log.info('new_vtimeout: %d' % delay)
140 msg.change_visibility(delay)
141
142 def start(self, queue_name):
143 boto.log.info('Task[%s] - starting with queue: %s' % (self.name, queue_n ame))
144 queue = boto.lookup('sqs', queue_name)
145 msg = queue.new_message(self.id)
146 msg = queue.write(msg)
147 self.message_id = msg.id
148 self.put()
149 boto.log.info('Task[%s] - start successful' % self.name)
150
151 class TaskPoller(object):
152
153 def __init__(self, queue_name):
154 self.sqs = boto.connect_sqs()
155 self.queue = self.sqs.lookup(queue_name)
156
157 def poll(self, wait=60, vtimeout=60):
158 while True:
159 m = self.queue.read(vtimeout)
160 if m:
161 task = Task.get_by_id(m.get_body())
162 if task:
163 if not task.message_id or m.id == task.message_id:
164 boto.log.info('Task[%s] - read message %s' % (task.name, m.id))
165 task.run(m, vtimeout)
166 else:
167 boto.log.info('Task[%s] - found extraneous message, igno ring' % task.name)
168 else:
169 time.sleep(wait)
170
171
172
173
174
175
OLDNEW
« no previous file with comments | « third_party/boto/manage/server.py ('k') | third_party/boto/manage/test_manage.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698