| OLD | NEW | 
|---|
| (Empty) |  | 
|  | 1 # Copyright (c) 2006-2009 Mitch Garnaat http://garnaat.org/ | 
|  | 2 # | 
|  | 3 # Permission is hereby granted, free of charge, to any person obtaining a | 
|  | 4 # copy of this software and associated documentation files (the | 
|  | 5 # "Software"), to deal in the Software without restriction, including | 
|  | 6 # without limitation the rights to use, copy, modify, merge, publish, dis- | 
|  | 7 # tribute, sublicense, and/or sell copies of the Software, and to permit | 
|  | 8 # persons to whom the Software is furnished to do so, subject to the fol- | 
|  | 9 # lowing conditions: | 
|  | 10 # | 
|  | 11 # The above copyright notice and this permission notice shall be included | 
|  | 12 # in all copies or substantial portions of the Software. | 
|  | 13 # | 
|  | 14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | 
|  | 15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- | 
|  | 16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT | 
|  | 17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, | 
|  | 18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | 
|  | 19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS | 
|  | 20 # IN THE SOFTWARE. | 
|  | 21 # | 
|  | 22 | 
|  | 23 import boto | 
|  | 24 from boto.sdb.db.property import StringProperty, DateTimeProperty, IntegerProper
     ty | 
|  | 25 from boto.sdb.db.model import Model | 
|  | 26 import datetime, subprocess, StringIO, time | 
|  | 27 | 
|  | 28 def check_hour(val): | 
|  | 29     if val == '*': | 
|  | 30         return | 
|  | 31     if int(val) < 0 or int(val) > 23: | 
|  | 32         raise ValueError | 
|  | 33 | 
|  | 34 class Task(Model): | 
|  | 35 | 
|  | 36     """ | 
|  | 37     A scheduled, repeating task that can be executed by any participating server
     s. | 
|  | 38     The scheduling is similar to cron jobs.  Each task has an hour attribute. | 
|  | 39     The allowable values for hour are [0-23|*]. | 
|  | 40 | 
|  | 41     To keep the operation reasonably efficient and not cause excessive polling, | 
|  | 42     the minimum granularity of a Task is hourly.  Some examples: | 
|  | 43 | 
|  | 44          hour='*' - the task would be executed each hour | 
|  | 45          hour='3' - the task would be executed at 3AM GMT each day. | 
|  | 46 | 
|  | 47     """ | 
|  | 48     name = StringProperty() | 
|  | 49     hour = StringProperty(required=True, validator=check_hour, default='*') | 
|  | 50     command = StringProperty(required=True) | 
|  | 51     last_executed = DateTimeProperty() | 
|  | 52     last_status = IntegerProperty() | 
|  | 53     last_output = StringProperty() | 
|  | 54     message_id = StringProperty() | 
|  | 55 | 
|  | 56     @classmethod | 
|  | 57     def start_all(cls, queue_name): | 
|  | 58         for task in cls.all(): | 
|  | 59             task.start(queue_name) | 
|  | 60 | 
|  | 61     def __init__(self, id=None, **kw): | 
|  | 62         Model.__init__(self, id, **kw) | 
|  | 63         self.hourly = self.hour == '*' | 
|  | 64         self.daily = self.hour != '*' | 
|  | 65         self.now = datetime.datetime.utcnow() | 
|  | 66 | 
|  | 67     def check(self): | 
|  | 68         """ | 
|  | 69         Determine how long until the next scheduled time for a Task. | 
|  | 70         Returns the number of seconds until the next scheduled time or zero | 
|  | 71         if the task needs to be run immediately. | 
|  | 72         If it's an hourly task and it's never been run, run it now. | 
|  | 73         If it's a daily task and it's never been run and the hour is right, run 
     it now. | 
|  | 74         """ | 
|  | 75         boto.log.info('checking Task[%s]-now=%s, last=%s' % (self.name, self.now
     , self.last_executed)) | 
|  | 76 | 
|  | 77         if self.hourly and not self.last_executed: | 
|  | 78             return 0 | 
|  | 79 | 
|  | 80         if self.daily and not self.last_executed: | 
|  | 81             if int(self.hour) == self.now.hour: | 
|  | 82                 return 0 | 
|  | 83             else: | 
|  | 84                 return max( (int(self.hour)-self.now.hour), (self.now.hour-int(s
     elf.hour)) )*60*60 | 
|  | 85 | 
|  | 86         delta = self.now - self.last_executed | 
|  | 87         if self.hourly: | 
|  | 88             if delta.seconds >= 60*60: | 
|  | 89                 return 0 | 
|  | 90             else: | 
|  | 91                 return 60*60 - delta.seconds | 
|  | 92         else: | 
|  | 93             if int(self.hour) == self.now.hour: | 
|  | 94                 if delta.days >= 1: | 
|  | 95                     return 0 | 
|  | 96                 else: | 
|  | 97                     return 82800 # 23 hours, just to be safe | 
|  | 98             else: | 
|  | 99                 return max( (int(self.hour)-self.now.hour), (self.now.hour-int(s
     elf.hour)) )*60*60 | 
|  | 100 | 
|  | 101     def _run(self, msg, vtimeout): | 
|  | 102         boto.log.info('Task[%s] - running:%s' % (self.name, self.command)) | 
|  | 103         log_fp = StringIO.StringIO() | 
|  | 104         process = subprocess.Popen(self.command, shell=True, stdin=subprocess.PI
     PE, | 
|  | 105                                    stdout=subprocess.PIPE, stderr=subprocess.PIP
     E) | 
|  | 106         nsecs = 5 | 
|  | 107         current_timeout = vtimeout | 
|  | 108         while process.poll() == None: | 
|  | 109             boto.log.info('nsecs=%s, timeout=%s' % (nsecs, current_timeout)) | 
|  | 110             if nsecs >= current_timeout: | 
|  | 111                 current_timeout += vtimeout | 
|  | 112                 boto.log.info('Task[%s] - setting timeout to %d seconds' % (self
     .name, current_timeout)) | 
|  | 113                 if msg: | 
|  | 114                     msg.change_visibility(current_timeout) | 
|  | 115             time.sleep(5) | 
|  | 116             nsecs += 5 | 
|  | 117         t = process.communicate() | 
|  | 118         log_fp.write(t[0]) | 
|  | 119         log_fp.write(t[1]) | 
|  | 120         boto.log.info('Task[%s] - output: %s' % (self.name, log_fp.getvalue())) | 
|  | 121         self.last_executed = self.now | 
|  | 122         self.last_status = process.returncode | 
|  | 123         self.last_output = log_fp.getvalue()[0:1023] | 
|  | 124 | 
|  | 125     def run(self, msg, vtimeout=60): | 
|  | 126         delay = self.check() | 
|  | 127         boto.log.info('Task[%s] - delay=%s seconds' % (self.name, delay)) | 
|  | 128         if delay == 0: | 
|  | 129             self._run(msg, vtimeout) | 
|  | 130             queue = msg.queue | 
|  | 131             new_msg = queue.new_message(self.id) | 
|  | 132             new_msg = queue.write(new_msg) | 
|  | 133             self.message_id = new_msg.id | 
|  | 134             self.put() | 
|  | 135             boto.log.info('Task[%s] - new message id=%s' % (self.name, new_msg.i
     d)) | 
|  | 136             msg.delete() | 
|  | 137             boto.log.info('Task[%s] - deleted message %s' % (self.name, msg.id)) | 
|  | 138         else: | 
|  | 139             boto.log.info('new_vtimeout: %d' % delay) | 
|  | 140             msg.change_visibility(delay) | 
|  | 141 | 
|  | 142     def start(self, queue_name): | 
|  | 143         boto.log.info('Task[%s] - starting with queue: %s' % (self.name, queue_n
     ame)) | 
|  | 144         queue = boto.lookup('sqs', queue_name) | 
|  | 145         msg = queue.new_message(self.id) | 
|  | 146         msg = queue.write(msg) | 
|  | 147         self.message_id = msg.id | 
|  | 148         self.put() | 
|  | 149         boto.log.info('Task[%s] - start successful' % self.name) | 
|  | 150 | 
|  | 151 class TaskPoller(object): | 
|  | 152 | 
|  | 153     def __init__(self, queue_name): | 
|  | 154         self.sqs = boto.connect_sqs() | 
|  | 155         self.queue = self.sqs.lookup(queue_name) | 
|  | 156 | 
|  | 157     def poll(self, wait=60, vtimeout=60): | 
|  | 158         while True: | 
|  | 159             m = self.queue.read(vtimeout) | 
|  | 160             if m: | 
|  | 161                 task = Task.get_by_id(m.get_body()) | 
|  | 162                 if task: | 
|  | 163                     if not task.message_id or m.id == task.message_id: | 
|  | 164                         boto.log.info('Task[%s] - read message %s' % (task.name,
      m.id)) | 
|  | 165                         task.run(m, vtimeout) | 
|  | 166                     else: | 
|  | 167                         boto.log.info('Task[%s] - found extraneous message, igno
     ring' % task.name) | 
|  | 168             else: | 
|  | 169                 time.sleep(wait) | 
|  | 170 | 
|  | 171 | 
|  | 172 | 
|  | 173 | 
|  | 174 | 
|  | 175 | 
| OLD | NEW | 
|---|