OLD | NEW |
| (Empty) |
1 # -*- test-case-name: buildbot.test.test_status_push -*- | |
2 | |
3 """Push events to an abstract receiver. | |
4 | |
5 Implements the HTTP receiver.""" | |
6 | |
7 import datetime | |
8 import logging | |
9 import os | |
10 import urllib | |
11 import urlparse | |
12 | |
13 try: | |
14 import simplejson as json | |
15 except ImportError: | |
16 import json | |
17 | |
18 from buildbot.master import BuildMaster | |
19 from buildbot.status.base import StatusReceiverMultiService | |
20 from buildbot.status.persistent_queue import DiskQueue, IndexedQueue, \ | |
21 IQueue, MemoryQueue, PersistentQueue | |
22 from buildbot.status.web.status_json import FilterOut | |
23 from twisted.internet import defer, reactor | |
24 from twisted.python import log | |
25 from twisted.web import client | |
26 | |
27 | |
28 | |
29 class StatusPush(StatusReceiverMultiService): | |
30 """Event streamer to a abstract channel. | |
31 | |
32 It uses IQueue to batch push requests and queue the data when | |
33 the receiver is down. | |
34 When a PersistentQueue object is used, the items are saved to disk on master | |
35 shutdown so they can be pushed back when the master is restarted. | |
36 """ | |
37 | |
38 def __init__(self, serverPushCb, queue=None, path=None, filter=True, | |
39 bufferDelay=1, retryDelay=5, blackList=None): | |
40 """ | |
41 @serverPushCb: callback to be used. It receives 'self' as parameter. It | |
42 should call self.queueNextServerPush() when it's done to queue the next | |
43 push. It is guaranteed that the queue is not empty when this function is | |
44 called. | |
45 @queue: a item queue that implements IQueue. | |
46 @path: path to save config. | |
47 @filter: when True (default), removes all "", None, False, [] or {} | |
48 entries. | |
49 @bufferDelay: amount of time events are queued before sending, to | |
50 reduce the number of push requests rate. This is the delay between the | |
51 end of a request to initializing a new one. | |
52 @retryDelay: amount of time between retries when no items were pushed on | |
53 last serverPushCb call. | |
54 @blackList: events that shouldn't be sent. | |
55 """ | |
56 StatusReceiverMultiService.__init__(self) | |
57 | |
58 # Parameters. | |
59 self.queue = queue | |
60 if self.queue is None: | |
61 self.queue = MemoryQueue() | |
62 self.queue = IndexedQueue(self.queue) | |
63 self.path = path | |
64 self.filter = filter | |
65 self.bufferDelay = bufferDelay | |
66 self.retryDelay = retryDelay | |
67 if not callable(serverPushCb): | |
68 raise NotImplementedError('Please pass serverPushCb parameter.') | |
69 def hookPushCb(): | |
70 # Update the index so we know if the next push succeed or not, don't | |
71 # update the value when the queue is empty. | |
72 if not self.queue.nbItems(): | |
73 return | |
74 self.lastIndex = self.queue.getIndex() | |
75 return serverPushCb(self) | |
76 self.serverPushCb = hookPushCb | |
77 self.blackList = blackList | |
78 | |
79 # Other defaults. | |
80 # IDelayedCall object that represents the next queued push. | |
81 self.task = None | |
82 self.stopped = False | |
83 self.lastIndex = -1 | |
84 self.state = {} | |
85 self.state['next_id'] = 1 | |
86 self.state['last_id_pushed'] = 0 | |
87 # Try to load back the state. | |
88 if self.path and os.path.isdir(self.path): | |
89 state_path = os.path.join(self.path, 'state') | |
90 if os.path.isfile(state_path): | |
91 self.state.update(json.load(open(state_path, 'r'))) | |
92 # Started should refer to when the Buildbot was last restarted. We use | |
93 # this value to ensure which instance of a given Buildbot we're | |
94 # collecting data from. | |
95 self.state['started'] = str(datetime.datetime.utcnow()) | |
96 | |
97 if self.queue.nbItems(): | |
98 # Last shutdown was not clean, don't wait to send events. | |
99 self.queueNextServerPush() | |
100 | |
101 def setServiceParent(self, parent): | |
102 """Starting up.""" | |
103 StatusReceiverMultiService.setServiceParent(self, parent) | |
104 self.status = self.parent.getStatus() | |
105 self.status.subscribe(self) | |
106 self.initialPush() | |
107 | |
108 def wasLastPushSuccessful(self): | |
109 """Returns if the "virtual pointer" in the queue advanced.""" | |
110 return self.lastIndex <= self.queue.getIndex() | |
111 | |
112 def queueNextServerPush(self): | |
113 """Queue the next push or call it immediately. | |
114 | |
115 Called to signal new items are available to be sent or on shutdown. | |
116 A timer should be queued to trigger a network request or the callback | |
117 should be called immediately. If a status push is already queued, ignore | |
118 the current call.""" | |
119 # Determine the delay. | |
120 if self.wasLastPushSuccessful(): | |
121 if self.stopped: | |
122 # Shutting down. | |
123 delay = 0 | |
124 else: | |
125 # Normal case. | |
126 delay = self.bufferDelay | |
127 else: | |
128 if self.stopped: | |
129 # Too bad, we can't do anything now, we're shutting down and the | |
130 # receiver is also down. We'll just save the objects to disk. | |
131 return | |
132 else: | |
133 # The server is inaccessible, retry less often. | |
134 delay = self.retryDelay | |
135 | |
136 # Cleanup a previously queued task if necessary. | |
137 if self.task: | |
138 # Warning: we could be running inside the task. | |
139 if self.task.active(): | |
140 # There was already a task queue, don't requeue it, just let it | |
141 # go. | |
142 return | |
143 else: | |
144 if self.task.active(): | |
145 # There was a task queued but it is requested to call it | |
146 # *right now* so cancel it. | |
147 self.task.cancel() | |
148 # Otherwise, it was just a stray object. | |
149 self.task = None | |
150 | |
151 # Do the queue/direct call. | |
152 if delay: | |
153 # Call in delay seconds. | |
154 self.task = reactor.callLater(delay, self.serverPushCb) | |
155 elif self.stopped: | |
156 if not self.queue.nbItems(): | |
157 return | |
158 # Call right now, we're shutting down. | |
159 @defer.deferredGenerator | |
160 def BlockForEverythingBeingSent(): | |
161 d = self.serverPushCb() | |
162 if d: | |
163 x = defer.waitForDeferred(d) | |
164 yield x | |
165 x.getResult() | |
166 return BlockForEverythingBeingSent() | |
167 else: | |
168 # delay should never be 0. That can cause Buildbot to spin tightly | |
169 # trying to push events that may not be received well by a status | |
170 # listener. | |
171 logging.exception('Did not expect delay to be 0, but it is.') | |
172 return | |
173 | |
174 def stopService(self): | |
175 """Shutting down.""" | |
176 self.finalPush() | |
177 self.stopped = True | |
178 if (self.task and self.task.active()): | |
179 # We don't have time to wait, force an immediate call. | |
180 self.task.cancel() | |
181 self.task = None | |
182 d = self.queueNextServerPush() | |
183 elif self.wasLastPushSuccessful(): | |
184 d = self.queueNextServerPush() | |
185 else: | |
186 d = defer.succeed(None) | |
187 | |
188 # We're dying, make sure we save the results. | |
189 self.queue.save() | |
190 if self.path and os.path.isdir(self.path): | |
191 state_path = os.path.join(self.path, 'state') | |
192 json.dump(self.state, open(state_path, 'w'), sort_keys=True, | |
193 indent=2) | |
194 # Make sure all Deferreds are called on time and in a sane order. | |
195 defers = filter(None, [d, StatusReceiverMultiService.stopService(self)]) | |
196 return defer.DeferredList(defers) | |
197 | |
198 def push(self, event, **objs): | |
199 """Push a new event. | |
200 | |
201 The new event will be either: | |
202 - Queued in memory to reduce network usage | |
203 - Queued to disk when the sink server is down | |
204 - Pushed (along the other queued items) to the server | |
205 """ | |
206 if self.blackList and event in self.blackList: | |
207 return | |
208 # First, generate the packet. | |
209 packet = {} | |
210 packet['id'] = self.state['next_id'] | |
211 self.state['next_id'] += 1 | |
212 # Increment version when the packet format changes. | |
213 packet['version'] = 1 | |
214 packet['timestamp'] = str(datetime.datetime.utcnow()) | |
215 packet['project'] = self.status.getProjectName() | |
216 packet['started'] = self.state['started'] | |
217 packet['event'] = event | |
218 packet['payload'] = {} | |
219 for obj_name, obj in objs.items(): | |
220 if hasattr(obj, 'asDict'): | |
221 obj = obj.asDict() | |
222 if self.filter: | |
223 obj = FilterOut(obj) | |
224 packet['payload'][obj_name] = obj | |
225 packet['payload_json'] = json.dumps(packet['payload']) | |
226 del(packet['payload']) | |
227 self.queue.pushItem(packet) | |
228 if self.task is None or not self.task.active(): | |
229 # No task queued since it was probably idle, let's queue a task. | |
230 return self.queueNextServerPush() | |
231 | |
232 #### Events | |
233 | |
234 def initialPush(self): | |
235 # Push everything we want to push from the initial configuration. | |
236 self.push('start', status=self.status) | |
237 | |
238 def finalPush(self): | |
239 self.push('shutdown', status=self.status) | |
240 | |
241 def requestSubmitted(self, request): | |
242 self.push('requestSubmitted', request=request) | |
243 | |
244 def requestCancelled(self, builder, request): | |
245 self.push('requestCancelled', builder=builder, request=request) | |
246 | |
247 def buildsetSubmitted(self, buildset): | |
248 self.push('buildsetSubmitted', buildset=buildset) | |
249 | |
250 def builderAdded(self, builderName, builder): | |
251 self.push('builderAdded', builderName=builderName, builder=builder) | |
252 return self | |
253 | |
254 def builderChangedState(self, builderName, state): | |
255 self.push('builderChangedState', builderName=builderName, state=state) | |
256 | |
257 def buildStarted(self, builderName, build): | |
258 self.push('buildStarted', build=build) | |
259 return self | |
260 | |
261 def buildETAUpdate(self, build, ETA): | |
262 self.push('buildETAUpdate', build=build, ETA=ETA) | |
263 | |
264 def stepStarted(self, build, step): | |
265 self.push('stepStarted', | |
266 properties=build.getProperties().asList(), | |
267 step=step) | |
268 | |
269 def stepTextChanged(self, build, step, text): | |
270 self.push('stepTextChanged', | |
271 properties=build.getProperties().asList(), | |
272 step=step, | |
273 text=text) | |
274 | |
275 def stepText2Changed(self, build, step, text2): | |
276 self.push('stepText2Changed', | |
277 properties=build.getProperties().asList(), | |
278 step=step, | |
279 text2=text2) | |
280 | |
281 def stepETAUpdate(self, build, step, ETA, expectations): | |
282 self.push('stepETAUpdate', | |
283 properties=build.getProperties().asList(), | |
284 step=step, | |
285 ETA=ETA, | |
286 expectations=expectations) | |
287 | |
288 def logStarted(self, build, step, log): | |
289 self.push('logStarted', | |
290 properties=build.getProperties().asList(), | |
291 step=step) | |
292 | |
293 def logFinished(self, build, step, log): | |
294 self.push('logFinished', | |
295 properties=build.getProperties().asList(), | |
296 step=step) | |
297 | |
298 def stepFinished(self, build, step, results): | |
299 self.push('stepFinished', | |
300 properties=build.getProperties().asList(), | |
301 step=step) | |
302 | |
303 def buildFinished(self, builderName, build, results): | |
304 self.push('buildFinished', build=build) | |
305 | |
306 def builderRemoved(self, builderName): | |
307 self.push('buildedRemoved', builderName=builderName) | |
308 | |
309 def changeAdded(self, change): | |
310 self.push('changeAdded', change=change) | |
311 | |
312 def slaveConnected(self, slavename): | |
313 self.push('slaveConnected', slave=self.status.getSlave(slavename)) | |
314 | |
315 def slaveDisconnected(self, slavename): | |
316 self.push('slaveDisconnected', slavename=slavename) | |
317 | |
318 | |
319 class HttpStatusPush(StatusPush): | |
320 """Event streamer to a HTTP server.""" | |
321 | |
322 def __init__(self, serverUrl, debug=None, maxMemoryItems=None, | |
323 maxDiskItems=None, chunkSize=200, maxHttpRequestSize=2**20, | |
324 extra_post_params=None, **kwargs): | |
325 """ | |
326 @serverUrl: Base URL to be used to push events notifications. | |
327 @maxMemoryItems: Maximum number of items to keep queued in memory. | |
328 @maxDiskItems: Maximum number of items to buffer to disk, if 0, doesn't | |
329 use disk at all. | |
330 @debug: Save the json with nice formatting. | |
331 @chunkSize: maximum number of items to send in each at each HTTP POST. | |
332 @maxHttpRequestSize: limits the size of encoded data for AE, the default | |
333 is 1MB. | |
334 """ | |
335 # Parameters. | |
336 self.serverUrl = serverUrl | |
337 self.extra_post_params = extra_post_params or {} | |
338 self.debug = debug | |
339 self.chunkSize = chunkSize | |
340 self.lastPushWasSuccessful = True | |
341 self.maxHttpRequestSize = maxHttpRequestSize | |
342 if maxDiskItems != 0: | |
343 # The queue directory is determined by the server url. | |
344 path = ('events_' + | |
345 urlparse.urlparse(self.serverUrl)[1].split(':')[0]) | |
346 queue = PersistentQueue( | |
347 primaryQueue=MemoryQueue(maxItems=maxMemoryItems), | |
348 secondaryQueue=DiskQueue(path, maxItems=maxDiskItems)) | |
349 else: | |
350 path = None | |
351 queue = MemoryQueue(maxItems=maxMemoryItems) | |
352 | |
353 # Use the unbounded method. | |
354 StatusPush.__init__(self, serverPushCb=HttpStatusPush.pushHttp, | |
355 queue=queue, path=path, **kwargs) | |
356 | |
357 def wasLastPushSuccessful(self): | |
358 return self.lastPushWasSuccessful | |
359 | |
360 def popChunk(self): | |
361 """Pops items from the pending list. | |
362 | |
363 They must be queued back on failure.""" | |
364 if self.wasLastPushSuccessful(): | |
365 chunkSize = self.chunkSize | |
366 else: | |
367 chunkSize = 1 | |
368 | |
369 while True: | |
370 items = self.queue.popChunk(chunkSize) | |
371 if self.debug: | |
372 packets = json.dumps(items, indent=2, sort_keys=True) | |
373 else: | |
374 packets = json.dumps(items, separators=(',',':')) | |
375 params = {'packets': packets} | |
376 params.update(self.extra_post_params) | |
377 data = urllib.urlencode(params) | |
378 if (not self.maxHttpRequestSize or | |
379 len(data) < self.maxHttpRequestSize): | |
380 return (data, items) | |
381 | |
382 if chunkSize == 1: | |
383 # This packet is just too large. Drop this packet. | |
384 log.msg("ERROR: packet %s was dropped, too large: %d > %d" % | |
385 (items[0]['id'], len(data), self.maxHttpRequestSize)) | |
386 chunkSize = self.chunkSize | |
387 else: | |
388 # Try with half the packets. | |
389 chunkSize /= 2 | |
390 self.queue.insertBackChunk(items) | |
391 | |
392 def pushHttp(self): | |
393 """Do the HTTP POST to the server.""" | |
394 (encoded_packets, items) = self.popChunk() | |
395 if not self.serverUrl: | |
396 return | |
397 | |
398 def Success(result): | |
399 """Queue up next push.""" | |
400 log.msg('Sent %d events to %s' % (len(items), self.serverUrl)) | |
401 self.lastPushWasSuccessful = True | |
402 return self.queueNextServerPush() | |
403 | |
404 def Failure(result): | |
405 """Insert back items not sent and queue up next push.""" | |
406 # Server is now down. | |
407 log.msg('Failed to push %d events to %s: %s' % | |
408 (len(items), self.serverUrl, str(result))) | |
409 self.queue.insertBackChunk(items) | |
410 if self.stopped: | |
411 # Bad timing, was being called on shutdown and the server died | |
412 # on us. Make sure the queue is saved since we just queued back | |
413 # items. | |
414 self.queue.save() | |
415 self.lastPushWasSuccessful = False | |
416 return self.queueNextServerPush() | |
417 | |
418 # Trigger the HTTP POST request. | |
419 headers = {'Content-Type': 'application/x-www-form-urlencoded'} | |
420 connection = client.getPage(self.serverUrl, | |
421 method='POST', | |
422 postdata=encoded_packets, | |
423 headers=headers, | |
424 agent='buildbot') | |
425 connection.addCallbacks(Success, Failure) | |
426 return connection | |
427 | |
428 # vim: set ts=4 sts=4 sw=4 et: | |
OLD | NEW |