Index: third_party/buildbot_7_12/buildbot/status/status_push.py |
diff --git a/third_party/buildbot_7_12/buildbot/status/status_push.py b/third_party/buildbot_7_12/buildbot/status/status_push.py |
deleted file mode 100644 |
index 011509a055fbdde8ec6afb20707a4d972b725d02..0000000000000000000000000000000000000000 |
--- a/third_party/buildbot_7_12/buildbot/status/status_push.py |
+++ /dev/null |
@@ -1,428 +0,0 @@ |
-# -*- test-case-name: buildbot.test.test_status_push -*- |
- |
-"""Push events to an abstract receiver. |
- |
-Implements the HTTP receiver.""" |
- |
-import datetime |
-import logging |
-import os |
-import urllib |
-import urlparse |
- |
-try: |
- import simplejson as json |
-except ImportError: |
- import json |
- |
-from buildbot.master import BuildMaster |
-from buildbot.status.base import StatusReceiverMultiService |
-from buildbot.status.persistent_queue import DiskQueue, IndexedQueue, \ |
- IQueue, MemoryQueue, PersistentQueue |
-from buildbot.status.web.status_json import FilterOut |
-from twisted.internet import defer, reactor |
-from twisted.python import log |
-from twisted.web import client |
- |
- |
- |
-class StatusPush(StatusReceiverMultiService): |
- """Event streamer to a abstract channel. |
- |
- It uses IQueue to batch push requests and queue the data when |
- the receiver is down. |
- When a PersistentQueue object is used, the items are saved to disk on master |
- shutdown so they can be pushed back when the master is restarted. |
- """ |
- |
- def __init__(self, serverPushCb, queue=None, path=None, filter=True, |
- bufferDelay=1, retryDelay=5, blackList=None): |
- """ |
- @serverPushCb: callback to be used. It receives 'self' as parameter. It |
- should call self.queueNextServerPush() when it's done to queue the next |
- push. It is guaranteed that the queue is not empty when this function is |
- called. |
- @queue: a item queue that implements IQueue. |
- @path: path to save config. |
- @filter: when True (default), removes all "", None, False, [] or {} |
- entries. |
- @bufferDelay: amount of time events are queued before sending, to |
- reduce the number of push requests rate. This is the delay between the |
- end of a request to initializing a new one. |
- @retryDelay: amount of time between retries when no items were pushed on |
- last serverPushCb call. |
- @blackList: events that shouldn't be sent. |
- """ |
- StatusReceiverMultiService.__init__(self) |
- |
- # Parameters. |
- self.queue = queue |
- if self.queue is None: |
- self.queue = MemoryQueue() |
- self.queue = IndexedQueue(self.queue) |
- self.path = path |
- self.filter = filter |
- self.bufferDelay = bufferDelay |
- self.retryDelay = retryDelay |
- if not callable(serverPushCb): |
- raise NotImplementedError('Please pass serverPushCb parameter.') |
- def hookPushCb(): |
- # Update the index so we know if the next push succeed or not, don't |
- # update the value when the queue is empty. |
- if not self.queue.nbItems(): |
- return |
- self.lastIndex = self.queue.getIndex() |
- return serverPushCb(self) |
- self.serverPushCb = hookPushCb |
- self.blackList = blackList |
- |
- # Other defaults. |
- # IDelayedCall object that represents the next queued push. |
- self.task = None |
- self.stopped = False |
- self.lastIndex = -1 |
- self.state = {} |
- self.state['next_id'] = 1 |
- self.state['last_id_pushed'] = 0 |
- # Try to load back the state. |
- if self.path and os.path.isdir(self.path): |
- state_path = os.path.join(self.path, 'state') |
- if os.path.isfile(state_path): |
- self.state.update(json.load(open(state_path, 'r'))) |
- # Started should refer to when the Buildbot was last restarted. We use |
- # this value to ensure which instance of a given Buildbot we're |
- # collecting data from. |
- self.state['started'] = str(datetime.datetime.utcnow()) |
- |
- if self.queue.nbItems(): |
- # Last shutdown was not clean, don't wait to send events. |
- self.queueNextServerPush() |
- |
- def setServiceParent(self, parent): |
- """Starting up.""" |
- StatusReceiverMultiService.setServiceParent(self, parent) |
- self.status = self.parent.getStatus() |
- self.status.subscribe(self) |
- self.initialPush() |
- |
- def wasLastPushSuccessful(self): |
- """Returns if the "virtual pointer" in the queue advanced.""" |
- return self.lastIndex <= self.queue.getIndex() |
- |
- def queueNextServerPush(self): |
- """Queue the next push or call it immediately. |
- |
- Called to signal new items are available to be sent or on shutdown. |
- A timer should be queued to trigger a network request or the callback |
- should be called immediately. If a status push is already queued, ignore |
- the current call.""" |
- # Determine the delay. |
- if self.wasLastPushSuccessful(): |
- if self.stopped: |
- # Shutting down. |
- delay = 0 |
- else: |
- # Normal case. |
- delay = self.bufferDelay |
- else: |
- if self.stopped: |
- # Too bad, we can't do anything now, we're shutting down and the |
- # receiver is also down. We'll just save the objects to disk. |
- return |
- else: |
- # The server is inaccessible, retry less often. |
- delay = self.retryDelay |
- |
- # Cleanup a previously queued task if necessary. |
- if self.task: |
- # Warning: we could be running inside the task. |
- if self.task.active(): |
- # There was already a task queue, don't requeue it, just let it |
- # go. |
- return |
- else: |
- if self.task.active(): |
- # There was a task queued but it is requested to call it |
- # *right now* so cancel it. |
- self.task.cancel() |
- # Otherwise, it was just a stray object. |
- self.task = None |
- |
- # Do the queue/direct call. |
- if delay: |
- # Call in delay seconds. |
- self.task = reactor.callLater(delay, self.serverPushCb) |
- elif self.stopped: |
- if not self.queue.nbItems(): |
- return |
- # Call right now, we're shutting down. |
- @defer.deferredGenerator |
- def BlockForEverythingBeingSent(): |
- d = self.serverPushCb() |
- if d: |
- x = defer.waitForDeferred(d) |
- yield x |
- x.getResult() |
- return BlockForEverythingBeingSent() |
- else: |
- # delay should never be 0. That can cause Buildbot to spin tightly |
- # trying to push events that may not be received well by a status |
- # listener. |
- logging.exception('Did not expect delay to be 0, but it is.') |
- return |
- |
- def stopService(self): |
- """Shutting down.""" |
- self.finalPush() |
- self.stopped = True |
- if (self.task and self.task.active()): |
- # We don't have time to wait, force an immediate call. |
- self.task.cancel() |
- self.task = None |
- d = self.queueNextServerPush() |
- elif self.wasLastPushSuccessful(): |
- d = self.queueNextServerPush() |
- else: |
- d = defer.succeed(None) |
- |
- # We're dying, make sure we save the results. |
- self.queue.save() |
- if self.path and os.path.isdir(self.path): |
- state_path = os.path.join(self.path, 'state') |
- json.dump(self.state, open(state_path, 'w'), sort_keys=True, |
- indent=2) |
- # Make sure all Deferreds are called on time and in a sane order. |
- defers = filter(None, [d, StatusReceiverMultiService.stopService(self)]) |
- return defer.DeferredList(defers) |
- |
- def push(self, event, **objs): |
- """Push a new event. |
- |
- The new event will be either: |
- - Queued in memory to reduce network usage |
- - Queued to disk when the sink server is down |
- - Pushed (along the other queued items) to the server |
- """ |
- if self.blackList and event in self.blackList: |
- return |
- # First, generate the packet. |
- packet = {} |
- packet['id'] = self.state['next_id'] |
- self.state['next_id'] += 1 |
- # Increment version when the packet format changes. |
- packet['version'] = 1 |
- packet['timestamp'] = str(datetime.datetime.utcnow()) |
- packet['project'] = self.status.getProjectName() |
- packet['started'] = self.state['started'] |
- packet['event'] = event |
- packet['payload'] = {} |
- for obj_name, obj in objs.items(): |
- if hasattr(obj, 'asDict'): |
- obj = obj.asDict() |
- if self.filter: |
- obj = FilterOut(obj) |
- packet['payload'][obj_name] = obj |
- packet['payload_json'] = json.dumps(packet['payload']) |
- del(packet['payload']) |
- self.queue.pushItem(packet) |
- if self.task is None or not self.task.active(): |
- # No task queued since it was probably idle, let's queue a task. |
- return self.queueNextServerPush() |
- |
- #### Events |
- |
- def initialPush(self): |
- # Push everything we want to push from the initial configuration. |
- self.push('start', status=self.status) |
- |
- def finalPush(self): |
- self.push('shutdown', status=self.status) |
- |
- def requestSubmitted(self, request): |
- self.push('requestSubmitted', request=request) |
- |
- def requestCancelled(self, builder, request): |
- self.push('requestCancelled', builder=builder, request=request) |
- |
- def buildsetSubmitted(self, buildset): |
- self.push('buildsetSubmitted', buildset=buildset) |
- |
- def builderAdded(self, builderName, builder): |
- self.push('builderAdded', builderName=builderName, builder=builder) |
- return self |
- |
- def builderChangedState(self, builderName, state): |
- self.push('builderChangedState', builderName=builderName, state=state) |
- |
- def buildStarted(self, builderName, build): |
- self.push('buildStarted', build=build) |
- return self |
- |
- def buildETAUpdate(self, build, ETA): |
- self.push('buildETAUpdate', build=build, ETA=ETA) |
- |
- def stepStarted(self, build, step): |
- self.push('stepStarted', |
- properties=build.getProperties().asList(), |
- step=step) |
- |
- def stepTextChanged(self, build, step, text): |
- self.push('stepTextChanged', |
- properties=build.getProperties().asList(), |
- step=step, |
- text=text) |
- |
- def stepText2Changed(self, build, step, text2): |
- self.push('stepText2Changed', |
- properties=build.getProperties().asList(), |
- step=step, |
- text2=text2) |
- |
- def stepETAUpdate(self, build, step, ETA, expectations): |
- self.push('stepETAUpdate', |
- properties=build.getProperties().asList(), |
- step=step, |
- ETA=ETA, |
- expectations=expectations) |
- |
- def logStarted(self, build, step, log): |
- self.push('logStarted', |
- properties=build.getProperties().asList(), |
- step=step) |
- |
- def logFinished(self, build, step, log): |
- self.push('logFinished', |
- properties=build.getProperties().asList(), |
- step=step) |
- |
- def stepFinished(self, build, step, results): |
- self.push('stepFinished', |
- properties=build.getProperties().asList(), |
- step=step) |
- |
- def buildFinished(self, builderName, build, results): |
- self.push('buildFinished', build=build) |
- |
- def builderRemoved(self, builderName): |
- self.push('buildedRemoved', builderName=builderName) |
- |
- def changeAdded(self, change): |
- self.push('changeAdded', change=change) |
- |
- def slaveConnected(self, slavename): |
- self.push('slaveConnected', slave=self.status.getSlave(slavename)) |
- |
- def slaveDisconnected(self, slavename): |
- self.push('slaveDisconnected', slavename=slavename) |
- |
- |
-class HttpStatusPush(StatusPush): |
- """Event streamer to a HTTP server.""" |
- |
- def __init__(self, serverUrl, debug=None, maxMemoryItems=None, |
- maxDiskItems=None, chunkSize=200, maxHttpRequestSize=2**20, |
- extra_post_params=None, **kwargs): |
- """ |
- @serverUrl: Base URL to be used to push events notifications. |
- @maxMemoryItems: Maximum number of items to keep queued in memory. |
- @maxDiskItems: Maximum number of items to buffer to disk, if 0, doesn't |
- use disk at all. |
- @debug: Save the json with nice formatting. |
- @chunkSize: maximum number of items to send in each at each HTTP POST. |
- @maxHttpRequestSize: limits the size of encoded data for AE, the default |
- is 1MB. |
- """ |
- # Parameters. |
- self.serverUrl = serverUrl |
- self.extra_post_params = extra_post_params or {} |
- self.debug = debug |
- self.chunkSize = chunkSize |
- self.lastPushWasSuccessful = True |
- self.maxHttpRequestSize = maxHttpRequestSize |
- if maxDiskItems != 0: |
- # The queue directory is determined by the server url. |
- path = ('events_' + |
- urlparse.urlparse(self.serverUrl)[1].split(':')[0]) |
- queue = PersistentQueue( |
- primaryQueue=MemoryQueue(maxItems=maxMemoryItems), |
- secondaryQueue=DiskQueue(path, maxItems=maxDiskItems)) |
- else: |
- path = None |
- queue = MemoryQueue(maxItems=maxMemoryItems) |
- |
- # Use the unbounded method. |
- StatusPush.__init__(self, serverPushCb=HttpStatusPush.pushHttp, |
- queue=queue, path=path, **kwargs) |
- |
- def wasLastPushSuccessful(self): |
- return self.lastPushWasSuccessful |
- |
- def popChunk(self): |
- """Pops items from the pending list. |
- |
- They must be queued back on failure.""" |
- if self.wasLastPushSuccessful(): |
- chunkSize = self.chunkSize |
- else: |
- chunkSize = 1 |
- |
- while True: |
- items = self.queue.popChunk(chunkSize) |
- if self.debug: |
- packets = json.dumps(items, indent=2, sort_keys=True) |
- else: |
- packets = json.dumps(items, separators=(',',':')) |
- params = {'packets': packets} |
- params.update(self.extra_post_params) |
- data = urllib.urlencode(params) |
- if (not self.maxHttpRequestSize or |
- len(data) < self.maxHttpRequestSize): |
- return (data, items) |
- |
- if chunkSize == 1: |
- # This packet is just too large. Drop this packet. |
- log.msg("ERROR: packet %s was dropped, too large: %d > %d" % |
- (items[0]['id'], len(data), self.maxHttpRequestSize)) |
- chunkSize = self.chunkSize |
- else: |
- # Try with half the packets. |
- chunkSize /= 2 |
- self.queue.insertBackChunk(items) |
- |
- def pushHttp(self): |
- """Do the HTTP POST to the server.""" |
- (encoded_packets, items) = self.popChunk() |
- if not self.serverUrl: |
- return |
- |
- def Success(result): |
- """Queue up next push.""" |
- log.msg('Sent %d events to %s' % (len(items), self.serverUrl)) |
- self.lastPushWasSuccessful = True |
- return self.queueNextServerPush() |
- |
- def Failure(result): |
- """Insert back items not sent and queue up next push.""" |
- # Server is now down. |
- log.msg('Failed to push %d events to %s: %s' % |
- (len(items), self.serverUrl, str(result))) |
- self.queue.insertBackChunk(items) |
- if self.stopped: |
- # Bad timing, was being called on shutdown and the server died |
- # on us. Make sure the queue is saved since we just queued back |
- # items. |
- self.queue.save() |
- self.lastPushWasSuccessful = False |
- return self.queueNextServerPush() |
- |
- # Trigger the HTTP POST request. |
- headers = {'Content-Type': 'application/x-www-form-urlencoded'} |
- connection = client.getPage(self.serverUrl, |
- method='POST', |
- postdata=encoded_packets, |
- headers=headers, |
- agent='buildbot') |
- connection.addCallbacks(Success, Failure) |
- return connection |
- |
-# vim: set ts=4 sts=4 sw=4 et: |