Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(57)

Side by Side Diff: third_party/buildbot_7_12/buildbot/status/status_push.py

Issue 12207158: Bye bye buildbot 0.7.12. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # -*- test-case-name: buildbot.test.test_status_push -*-
2
3 """Push events to an abstract receiver.
4
5 Implements the HTTP receiver."""
6
7 import datetime
8 import logging
9 import os
10 import urllib
11 import urlparse
12
13 try:
14 import simplejson as json
15 except ImportError:
16 import json
17
18 from buildbot.master import BuildMaster
19 from buildbot.status.base import StatusReceiverMultiService
20 from buildbot.status.persistent_queue import DiskQueue, IndexedQueue, \
21 IQueue, MemoryQueue, PersistentQueue
22 from buildbot.status.web.status_json import FilterOut
23 from twisted.internet import defer, reactor
24 from twisted.python import log
25 from twisted.web import client
26
27
28
29 class StatusPush(StatusReceiverMultiService):
30 """Event streamer to a abstract channel.
31
32 It uses IQueue to batch push requests and queue the data when
33 the receiver is down.
34 When a PersistentQueue object is used, the items are saved to disk on master
35 shutdown so they can be pushed back when the master is restarted.
36 """
37
38 def __init__(self, serverPushCb, queue=None, path=None, filter=True,
39 bufferDelay=1, retryDelay=5, blackList=None):
40 """
41 @serverPushCb: callback to be used. It receives 'self' as parameter. It
42 should call self.queueNextServerPush() when it's done to queue the next
43 push. It is guaranteed that the queue is not empty when this function is
44 called.
45 @queue: a item queue that implements IQueue.
46 @path: path to save config.
47 @filter: when True (default), removes all "", None, False, [] or {}
48 entries.
49 @bufferDelay: amount of time events are queued before sending, to
50 reduce the number of push requests rate. This is the delay between the
51 end of a request to initializing a new one.
52 @retryDelay: amount of time between retries when no items were pushed on
53 last serverPushCb call.
54 @blackList: events that shouldn't be sent.
55 """
56 StatusReceiverMultiService.__init__(self)
57
58 # Parameters.
59 self.queue = queue
60 if self.queue is None:
61 self.queue = MemoryQueue()
62 self.queue = IndexedQueue(self.queue)
63 self.path = path
64 self.filter = filter
65 self.bufferDelay = bufferDelay
66 self.retryDelay = retryDelay
67 if not callable(serverPushCb):
68 raise NotImplementedError('Please pass serverPushCb parameter.')
69 def hookPushCb():
70 # Update the index so we know if the next push succeed or not, don't
71 # update the value when the queue is empty.
72 if not self.queue.nbItems():
73 return
74 self.lastIndex = self.queue.getIndex()
75 return serverPushCb(self)
76 self.serverPushCb = hookPushCb
77 self.blackList = blackList
78
79 # Other defaults.
80 # IDelayedCall object that represents the next queued push.
81 self.task = None
82 self.stopped = False
83 self.lastIndex = -1
84 self.state = {}
85 self.state['next_id'] = 1
86 self.state['last_id_pushed'] = 0
87 # Try to load back the state.
88 if self.path and os.path.isdir(self.path):
89 state_path = os.path.join(self.path, 'state')
90 if os.path.isfile(state_path):
91 self.state.update(json.load(open(state_path, 'r')))
92 # Started should refer to when the Buildbot was last restarted. We use
93 # this value to ensure which instance of a given Buildbot we're
94 # collecting data from.
95 self.state['started'] = str(datetime.datetime.utcnow())
96
97 if self.queue.nbItems():
98 # Last shutdown was not clean, don't wait to send events.
99 self.queueNextServerPush()
100
101 def setServiceParent(self, parent):
102 """Starting up."""
103 StatusReceiverMultiService.setServiceParent(self, parent)
104 self.status = self.parent.getStatus()
105 self.status.subscribe(self)
106 self.initialPush()
107
108 def wasLastPushSuccessful(self):
109 """Returns if the "virtual pointer" in the queue advanced."""
110 return self.lastIndex <= self.queue.getIndex()
111
112 def queueNextServerPush(self):
113 """Queue the next push or call it immediately.
114
115 Called to signal new items are available to be sent or on shutdown.
116 A timer should be queued to trigger a network request or the callback
117 should be called immediately. If a status push is already queued, ignore
118 the current call."""
119 # Determine the delay.
120 if self.wasLastPushSuccessful():
121 if self.stopped:
122 # Shutting down.
123 delay = 0
124 else:
125 # Normal case.
126 delay = self.bufferDelay
127 else:
128 if self.stopped:
129 # Too bad, we can't do anything now, we're shutting down and the
130 # receiver is also down. We'll just save the objects to disk.
131 return
132 else:
133 # The server is inaccessible, retry less often.
134 delay = self.retryDelay
135
136 # Cleanup a previously queued task if necessary.
137 if self.task:
138 # Warning: we could be running inside the task.
139 if self.task.active():
140 # There was already a task queue, don't requeue it, just let it
141 # go.
142 return
143 else:
144 if self.task.active():
145 # There was a task queued but it is requested to call it
146 # *right now* so cancel it.
147 self.task.cancel()
148 # Otherwise, it was just a stray object.
149 self.task = None
150
151 # Do the queue/direct call.
152 if delay:
153 # Call in delay seconds.
154 self.task = reactor.callLater(delay, self.serverPushCb)
155 elif self.stopped:
156 if not self.queue.nbItems():
157 return
158 # Call right now, we're shutting down.
159 @defer.deferredGenerator
160 def BlockForEverythingBeingSent():
161 d = self.serverPushCb()
162 if d:
163 x = defer.waitForDeferred(d)
164 yield x
165 x.getResult()
166 return BlockForEverythingBeingSent()
167 else:
168 # delay should never be 0. That can cause Buildbot to spin tightly
169 # trying to push events that may not be received well by a status
170 # listener.
171 logging.exception('Did not expect delay to be 0, but it is.')
172 return
173
174 def stopService(self):
175 """Shutting down."""
176 self.finalPush()
177 self.stopped = True
178 if (self.task and self.task.active()):
179 # We don't have time to wait, force an immediate call.
180 self.task.cancel()
181 self.task = None
182 d = self.queueNextServerPush()
183 elif self.wasLastPushSuccessful():
184 d = self.queueNextServerPush()
185 else:
186 d = defer.succeed(None)
187
188 # We're dying, make sure we save the results.
189 self.queue.save()
190 if self.path and os.path.isdir(self.path):
191 state_path = os.path.join(self.path, 'state')
192 json.dump(self.state, open(state_path, 'w'), sort_keys=True,
193 indent=2)
194 # Make sure all Deferreds are called on time and in a sane order.
195 defers = filter(None, [d, StatusReceiverMultiService.stopService(self)])
196 return defer.DeferredList(defers)
197
198 def push(self, event, **objs):
199 """Push a new event.
200
201 The new event will be either:
202 - Queued in memory to reduce network usage
203 - Queued to disk when the sink server is down
204 - Pushed (along the other queued items) to the server
205 """
206 if self.blackList and event in self.blackList:
207 return
208 # First, generate the packet.
209 packet = {}
210 packet['id'] = self.state['next_id']
211 self.state['next_id'] += 1
212 # Increment version when the packet format changes.
213 packet['version'] = 1
214 packet['timestamp'] = str(datetime.datetime.utcnow())
215 packet['project'] = self.status.getProjectName()
216 packet['started'] = self.state['started']
217 packet['event'] = event
218 packet['payload'] = {}
219 for obj_name, obj in objs.items():
220 if hasattr(obj, 'asDict'):
221 obj = obj.asDict()
222 if self.filter:
223 obj = FilterOut(obj)
224 packet['payload'][obj_name] = obj
225 packet['payload_json'] = json.dumps(packet['payload'])
226 del(packet['payload'])
227 self.queue.pushItem(packet)
228 if self.task is None or not self.task.active():
229 # No task queued since it was probably idle, let's queue a task.
230 return self.queueNextServerPush()
231
232 #### Events
233
234 def initialPush(self):
235 # Push everything we want to push from the initial configuration.
236 self.push('start', status=self.status)
237
238 def finalPush(self):
239 self.push('shutdown', status=self.status)
240
241 def requestSubmitted(self, request):
242 self.push('requestSubmitted', request=request)
243
244 def requestCancelled(self, builder, request):
245 self.push('requestCancelled', builder=builder, request=request)
246
247 def buildsetSubmitted(self, buildset):
248 self.push('buildsetSubmitted', buildset=buildset)
249
250 def builderAdded(self, builderName, builder):
251 self.push('builderAdded', builderName=builderName, builder=builder)
252 return self
253
254 def builderChangedState(self, builderName, state):
255 self.push('builderChangedState', builderName=builderName, state=state)
256
257 def buildStarted(self, builderName, build):
258 self.push('buildStarted', build=build)
259 return self
260
261 def buildETAUpdate(self, build, ETA):
262 self.push('buildETAUpdate', build=build, ETA=ETA)
263
264 def stepStarted(self, build, step):
265 self.push('stepStarted',
266 properties=build.getProperties().asList(),
267 step=step)
268
269 def stepTextChanged(self, build, step, text):
270 self.push('stepTextChanged',
271 properties=build.getProperties().asList(),
272 step=step,
273 text=text)
274
275 def stepText2Changed(self, build, step, text2):
276 self.push('stepText2Changed',
277 properties=build.getProperties().asList(),
278 step=step,
279 text2=text2)
280
281 def stepETAUpdate(self, build, step, ETA, expectations):
282 self.push('stepETAUpdate',
283 properties=build.getProperties().asList(),
284 step=step,
285 ETA=ETA,
286 expectations=expectations)
287
288 def logStarted(self, build, step, log):
289 self.push('logStarted',
290 properties=build.getProperties().asList(),
291 step=step)
292
293 def logFinished(self, build, step, log):
294 self.push('logFinished',
295 properties=build.getProperties().asList(),
296 step=step)
297
298 def stepFinished(self, build, step, results):
299 self.push('stepFinished',
300 properties=build.getProperties().asList(),
301 step=step)
302
303 def buildFinished(self, builderName, build, results):
304 self.push('buildFinished', build=build)
305
306 def builderRemoved(self, builderName):
307 self.push('buildedRemoved', builderName=builderName)
308
309 def changeAdded(self, change):
310 self.push('changeAdded', change=change)
311
312 def slaveConnected(self, slavename):
313 self.push('slaveConnected', slave=self.status.getSlave(slavename))
314
315 def slaveDisconnected(self, slavename):
316 self.push('slaveDisconnected', slavename=slavename)
317
318
319 class HttpStatusPush(StatusPush):
320 """Event streamer to a HTTP server."""
321
322 def __init__(self, serverUrl, debug=None, maxMemoryItems=None,
323 maxDiskItems=None, chunkSize=200, maxHttpRequestSize=2**20,
324 extra_post_params=None, **kwargs):
325 """
326 @serverUrl: Base URL to be used to push events notifications.
327 @maxMemoryItems: Maximum number of items to keep queued in memory.
328 @maxDiskItems: Maximum number of items to buffer to disk, if 0, doesn't
329 use disk at all.
330 @debug: Save the json with nice formatting.
331 @chunkSize: maximum number of items to send in each at each HTTP POST.
332 @maxHttpRequestSize: limits the size of encoded data for AE, the default
333 is 1MB.
334 """
335 # Parameters.
336 self.serverUrl = serverUrl
337 self.extra_post_params = extra_post_params or {}
338 self.debug = debug
339 self.chunkSize = chunkSize
340 self.lastPushWasSuccessful = True
341 self.maxHttpRequestSize = maxHttpRequestSize
342 if maxDiskItems != 0:
343 # The queue directory is determined by the server url.
344 path = ('events_' +
345 urlparse.urlparse(self.serverUrl)[1].split(':')[0])
346 queue = PersistentQueue(
347 primaryQueue=MemoryQueue(maxItems=maxMemoryItems),
348 secondaryQueue=DiskQueue(path, maxItems=maxDiskItems))
349 else:
350 path = None
351 queue = MemoryQueue(maxItems=maxMemoryItems)
352
353 # Use the unbounded method.
354 StatusPush.__init__(self, serverPushCb=HttpStatusPush.pushHttp,
355 queue=queue, path=path, **kwargs)
356
357 def wasLastPushSuccessful(self):
358 return self.lastPushWasSuccessful
359
360 def popChunk(self):
361 """Pops items from the pending list.
362
363 They must be queued back on failure."""
364 if self.wasLastPushSuccessful():
365 chunkSize = self.chunkSize
366 else:
367 chunkSize = 1
368
369 while True:
370 items = self.queue.popChunk(chunkSize)
371 if self.debug:
372 packets = json.dumps(items, indent=2, sort_keys=True)
373 else:
374 packets = json.dumps(items, separators=(',',':'))
375 params = {'packets': packets}
376 params.update(self.extra_post_params)
377 data = urllib.urlencode(params)
378 if (not self.maxHttpRequestSize or
379 len(data) < self.maxHttpRequestSize):
380 return (data, items)
381
382 if chunkSize == 1:
383 # This packet is just too large. Drop this packet.
384 log.msg("ERROR: packet %s was dropped, too large: %d > %d" %
385 (items[0]['id'], len(data), self.maxHttpRequestSize))
386 chunkSize = self.chunkSize
387 else:
388 # Try with half the packets.
389 chunkSize /= 2
390 self.queue.insertBackChunk(items)
391
392 def pushHttp(self):
393 """Do the HTTP POST to the server."""
394 (encoded_packets, items) = self.popChunk()
395 if not self.serverUrl:
396 return
397
398 def Success(result):
399 """Queue up next push."""
400 log.msg('Sent %d events to %s' % (len(items), self.serverUrl))
401 self.lastPushWasSuccessful = True
402 return self.queueNextServerPush()
403
404 def Failure(result):
405 """Insert back items not sent and queue up next push."""
406 # Server is now down.
407 log.msg('Failed to push %d events to %s: %s' %
408 (len(items), self.serverUrl, str(result)))
409 self.queue.insertBackChunk(items)
410 if self.stopped:
411 # Bad timing, was being called on shutdown and the server died
412 # on us. Make sure the queue is saved since we just queued back
413 # items.
414 self.queue.save()
415 self.lastPushWasSuccessful = False
416 return self.queueNextServerPush()
417
418 # Trigger the HTTP POST request.
419 headers = {'Content-Type': 'application/x-www-form-urlencoded'}
420 connection = client.getPage(self.serverUrl,
421 method='POST',
422 postdata=encoded_packets,
423 headers=headers,
424 agent='buildbot')
425 connection.addCallbacks(Success, Failure)
426 return connection
427
428 # vim: set ts=4 sts=4 sw=4 et:
OLDNEW
« no previous file with comments | « third_party/buildbot_7_12/buildbot/status/progress.py ('k') | third_party/buildbot_7_12/buildbot/status/tests.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698