OLD | NEW |
| (Empty) |
# -*- test-case-name: buildbot.test.test_persistent_queue -*-

try:
    # Python 2.4+
    from collections import deque
except ImportError:
    # Older interpreters have no deque; code below tests `if deque:` and falls
    # back to a plain list based queue.
    deque = None
import os
import pickle

from zope.interface import implements, Interface
12 | |
13 | |
def ReadFile(path):
    """Returns the whole content of the file at |path|, read in binary mode.

    The file handle is always closed, even if the read fails. Errors raised by
    open() or read() propagate to the caller.
    """
    f = open(path, 'rb')
    try:
        return f.read()
    finally:
        f.close()
20 | |
21 | |
def WriteFile(path, buf):
    """Writes |buf| to the file at |path| in binary mode.

    An existing file is truncated first. The file handle is always closed,
    even if the write fails. Errors raised by open() or write() propagate to
    the caller.
    """
    f = open(path, 'wb')
    try:
        f.write(buf)
    finally:
        f.close()
28 | |
29 | |
class IQueue(Interface):
    """Abstraction of a length bounded queue.

    Items are pushed at the end (newest) and popped from the front (oldest).
    """
    def pushItem(item):
        """Adds an individual item to the end of the queue.

        Returns an item if the queue overflowed, None otherwise."""

    def insertBackChunk(items):
        """Adds a list of items as the oldest entries.

        Normally called in case of failure to process the data, to queue the
        data back so it can be retrieved at a later time.

        Returns a list of the items that did not fit if the queue overflowed,
        None otherwise."""

    def popChunk(nbItems=None):
        """Pops up to nbItems of the oldest items at once and returns them as
        a list.

        When nbItems is None, the implementation picks its own default chunk
        size, based on a maxItems() value."""

    def save():
        """Saves the queue to storage if implemented."""

    def items():
        """Returns the items in the queue, oldest first.

        Warning: Can be extremely slow for a queue on disk."""

    def nbItems():
        """Returns the number of items in the queue."""

    def maxItems():
        """Returns the maximum number of items this queue can hold."""
61 | |
62 | |
# Available for python 2.3 and earlier.
class ListMemoryQueue(object):
    """Simple length bounded queue using list.

    The oldest item is at index 0, the newest at the end of the list.
    """
    implements(IQueue)

    def __init__(self, maxItems=None):
        """
        @maxItems: maximum number of items kept in memory. Defaults to 10000.
        """
        self._maxItems = maxItems
        if self._maxItems is None:
            self._maxItems = 10000
        self._items = []

    def pushItem(self, item):
        """Appends |item|; returns the evicted oldest item on overflow, else
        None."""
        self._items.append(item)
        if len(self._items) > self._maxItems:
            return self._items.pop(0)
        return None

    def insertBackChunk(self, chunk):
        """Puts |chunk| back in front of the queue as the oldest entries.

        When everything does not fit, the first (oldest) items of |chunk| are
        left out and returned; otherwise returns None.
        """
        ret = None
        excess = len(self._items) + len(chunk) - self._maxItems
        if excess > 0:
            ret = chunk[0:excess]
            chunk = chunk[excess:]
        # list() lets callers pass any sliceable sequence (e.g. a tuple), not
        # only a list.
        self._items = list(chunk) + self._items
        return ret

    def popChunk(self, nbItems=None):
        """Removes and returns up to |nbItems| of the oldest items as a list.

        Defaults to self._maxItems items.
        """
        if nbItems is None:
            nbItems = self._maxItems
        if nbItems > len(self._items):
            # Asking for more than what is stored: hand over the whole list.
            items = self._items
            self._items = []
        else:
            items = self._items[0:nbItems]
            del self._items[0:nbItems]
        return items

    def save(self):
        """No-op, a memory queue has no storage."""
        pass

    def items(self):
        """Returns the internal list itself, not a copy: mutating the returned
        list mutates the queue."""
        return self._items

    def nbItems(self):
        """Returns the number of items currently stored."""
        return len(self._items)

    def maxItems(self):
        """Returns the maximum number of items this queue holds."""
        return self._maxItems
110 | |
if deque:
    class DequeMemoryQueue(object):
        """Simple length bounded queue using deque.

        list.pop(0) operation is O(n) so for a 10000 items list, it can start
        to be real slow. On the contrary, deque.popleft() is O(1) most of the
        time. See http://docs.python.org/library/collections.html for more
        information.
        """
        implements(IQueue)

        def __init__(self, maxItems=None):
            """
            @maxItems: maximum number of items kept in memory. Defaults to
                       10000.
            """
            self._maxItems = maxItems
            if self._maxItems is None:
                self._maxItems = 10000
            self._items = deque()

        def pushItem(self, item):
            """Appends |item|; returns the evicted oldest item on overflow,
            else None."""
            # Append first and evict afterward, like ListMemoryQueue does, so
            # that a queue with maxItems == 0 hands the item straight back
            # instead of calling popleft() on an empty deque.
            self._items.append(item)
            if len(self._items) > self._maxItems:
                return self._items.popleft()
            return None

        def insertBackChunk(self, chunk):
            """Puts |chunk| back in front of the queue as the oldest entries.

            When everything does not fit, the first (oldest) items of |chunk|
            are left out and returned; otherwise returns None.
            """
            ret = None
            excess = len(self._items) + len(chunk) - self._maxItems
            if excess > 0:
                ret = chunk[0:excess]
                chunk = chunk[excess:]
            # extendleft() inserts one item at a time on the left, which
            # reverses the order; feeding it the reversed chunk keeps the
            # chunk's original order.
            self._items.extendleft(reversed(chunk))
            return ret

        def popChunk(self, nbItems=None):
            """Removes and returns up to |nbItems| of the oldest items as a
            list.

            Defaults to self._maxItems items.
            """
            if nbItems is None:
                nbItems = self._maxItems
            if nbItems > len(self._items):
                # Asking for more than what is stored: hand over everything.
                items = list(self._items)
                self._items = deque()
            else:
                popleft = self._items.popleft
                items = [popleft() for _ in range(nbItems)]
            return items

        def save(self):
            """No-op, a memory queue has no storage."""
            pass

        def items(self):
            """Returns a copy of the queued items as a list, oldest first."""
            return list(self._items)

        def nbItems(self):
            """Returns the number of items currently stored."""
            return len(self._items)

        def maxItems(self):
            """Returns the maximum number of items this queue holds."""
            return self._maxItems

    MemoryQueue = DequeMemoryQueue
else:
    MemoryQueue = ListMemoryQueue
171 | |
172 | |
class DiskQueue(object):
    """Keeps a list of abstract items and serializes it to the disk.

    Each item is stored in its own file inside a directory. The file name is
    the item's integer id; ids grow for newer items and may be negative for
    items that were inserted back.

    Use pickle for serialization.

    NOTE(review): the default unpickleFn is pickle.loads, which must not be
    fed untrusted data; the queue directory is assumed to be writable only by
    trusted users -- confirm for your deployment.
    """
    implements(IQueue)

    def __init__(self, path, maxItems=None, pickleFn=pickle.dumps,
                 unpickleFn=pickle.loads):
        """
        @path: directory to save the items. Created if it doesn't exist.
        @maxItems: maximum number of items to keep on disk, flush the
                   older ones. Defaults to 100000.
        @pickleFn: function used to pack the items to disk.
        @unpickleFn: function used to unpack items from disk.
        """
        self.path = path
        self._maxItems = maxItems
        if self._maxItems is None:
            self._maxItems = 100000
        if not os.path.isdir(self.path):
            os.mkdir(self.path)
        self.pickleFn = pickleFn
        self.unpickleFn = unpickleFn

        # Total number of items.
        self._nbItems = 0
        # Lower bound of the id of the oldest item; ids below it are unused.
        self.firstItemId = 0
        # Id of the newest item; pushItem() stores at lastItemId + 1.
        self.lastItemId = 0
        self._loadFromDisk()

    def pushItem(self, item):
        """Stores |item| as the newest entry.

        Returns the evicted oldest item if the queue was full, None otherwise.
        Raises IOError if the destination file already exists, or if the queue
        is full but no stored item can be found to evict.
        """
        newId = self.lastItemId + 1
        path = os.path.join(self.path, str(newId))
        # Check the destination before evicting anything or touching the
        # counters so a failure leaves the queue state untouched.
        if os.path.exists(path):
            raise IOError('%s already exists.' % path)
        ret = None
        # >= rather than ==: the directory may already hold more files than
        # maxItems when it is loaded; don't let it grow further.
        if self._nbItems >= self._maxItems:
            ret = self._popOldest()
            self._nbItems -= 1
        WriteFile(path, self.pickleFn(item))
        self.lastItemId = newId
        self._nbItems += 1
        return ret

    def insertBackChunk(self, chunk):
        """Puts |chunk| back in front of the queue as the oldest entries.

        When everything does not fit, the first (oldest) items of |chunk| are
        left out and returned; otherwise returns None.
        Raises IOError if a destination file already exists.
        """
        ret = None
        excess = self._nbItems + len(chunk) - self._maxItems
        if excess > 0:
            ret = chunk[0:excess]
            chunk = chunk[excess:]
        # Walk the chunk from its newest item to its oldest, assigning
        # decreasing ids so the on-disk order matches the chunk order.
        for item in reversed(chunk):
            newId = self.firstItemId - 1
            path = os.path.join(self.path, str(newId))
            if os.path.exists(path):
                raise IOError('%s already exists.' % path)
            WriteFile(path, self.pickleFn(item))
            # Only account for the item once it is actually on disk.
            self.firstItemId = newId
            self._nbItems += 1
        return ret

    def popChunk(self, nbItems=None):
        """Removes and returns up to |nbItems| of the oldest items as a list.

        Defaults to self._maxItems items. Raises IOError if the item count
        says items remain but no item file can be found.
        """
        if nbItems is None:
            nbItems = self._maxItems
        ret = []
        for _ in range(min(nbItems, self._nbItems)):
            ret.append(self._popOldest())
            self._nbItems -= 1
        return ret

    def save(self):
        """No-op, every item is already on disk."""
        pass

    def items(self):
        """Warning, very slow: reads and unpickles every stored item.

        Returns the items as a list, oldest first.
        """
        ret = []
        for itemId in range(self.firstItemId, self.lastItemId + 1):
            path = os.path.join(self.path, str(itemId))
            if os.path.exists(path):
                ret.append(self.unpickleFn(ReadFile(path)))
        return ret

    def nbItems(self):
        """Returns the number of items currently stored."""
        return self._nbItems

    def maxItems(self):
        """Returns the maximum number of items this queue holds."""
        return self._maxItems

    #### Protected functions

    def _findNext(self, id):
        """Returns the first id >= |id| that has an item file, or None when
        there is none up to self.lastItemId."""
        # Ids are not necessarily contiguous, so probe upward. The search is
        # bounded by lastItemId so a missing file can't cause an endless loop.
        while id <= self.lastItemId:
            path = os.path.join(self.path, str(id))
            if os.path.isfile(path):
                return id
            id += 1
        return None

    def _popOldest(self):
        """Reads, deletes and returns the oldest stored item.

        Advances self.firstItemId but leaves self._nbItems to the caller.
        Raises IOError if no item file is found.
        """
        itemId = self._findNext(self.firstItemId)
        if itemId is None:
            raise IOError('No item found in %s.' % self.path)
        path = os.path.join(self.path, str(itemId))
        item = self.unpickleFn(ReadFile(path))
        os.remove(path)
        self.firstItemId = itemId + 1
        return item

    def _loadFromDisk(self):
        """Scans self.path and initializes the item count and the first and
        last item ids from the item files found there.

        No item is read; only the file names are looked at.
        """
        ids = []
        for name in os.listdir(self.path):
            try:
                value = int(name)
            except ValueError:
                continue
            # Only count names the queue itself would generate (str(id)) and
            # that are regular files, to stay consistent with _findNext().
            # Id 0 is a valid id and must not be filtered out.
            if str(value) != name:
                continue
            if not os.path.isfile(os.path.join(self.path, name)):
                continue
            ids.append(value)
        ids.sort()
        self._nbItems = len(ids)
        if ids:
            self.firstItemId = ids[0]
            self.lastItemId = ids[-1]
295 | |
296 | |
class PersistentQueue(object):
    """Keeps a list of abstract items and serializes it to the disk.

    It has 2 layers of queue, normally an in-memory queue and an on-disk queue.
    When the number of items in the primary queue gets too large, the new items
    are automatically saved to the secondary queue. The older items are kept in
    the primary queue.
    """
    implements(IQueue)

    def __init__(self, primaryQueue=None, secondaryQueue=None, path=None):
        """
        @primaryQueue: memory queue to use before buffering to disk. Defaults
                       to MemoryQueue().
        @secondaryQueue: disk queue to use as permanent buffer. Defaults to
                         DiskQueue(path).
        @path: path is a shortcut when using default DiskQueue settings.
        """
        self.primaryQueue = primaryQueue
        if self.primaryQueue is None:
            self.primaryQueue = MemoryQueue()
        self.secondaryQueue = secondaryQueue
        if self.secondaryQueue is None:
            self.secondaryQueue = DiskQueue(path)
        # Preload data from the secondary queue only if we know we won't start
        # using the secondary queue right away.
        if self.secondaryQueue.nbItems() < self.primaryQueue.maxItems():
            self.primaryQueue.insertBackChunk(
                self.secondaryQueue.popChunk(self.primaryQueue.maxItems()))

    def pushItem(self, item):
        """Adds |item| as the newest entry.

        Returns the item dropped because of an overflow, or None.
        """
        # If there is already items in secondaryQueue, we'd need to pop them
        # all to start inserting them into primaryQueue so don't bother and
        # just push it in secondaryQueue.
        if (self.secondaryQueue.nbItems() or
            self.primaryQueue.nbItems() == self.primaryQueue.maxItems()):
            item = self.secondaryQueue.pushItem(item)
            if item is None:
                return item
            # If item is not None, secondaryQueue overflowed. We need to push it
            # back to primaryQueue so the oldest item is dumped.
        # Or everything fit in the primaryQueue.
        return self.primaryQueue.pushItem(item)

    def insertBackChunk(self, chunk):
        """Puts |chunk| back in front of the queue as the oldest entries.

        The oldest items stay in the primary queue; whatever no longer fits
        there is moved to the front of the secondary queue. When the two
        queues together can't hold everything, the first (oldest) items of
        |chunk| are left out and returned; otherwise returns None.
        """
        ret = None
        # Overall excess
        excess = self.nbItems() + len(chunk) - self.maxItems()
        if excess > 0:
            ret = chunk[0:excess]
            chunk = chunk[excess:]
        # Memory excess
        primaryMax = self.primaryQueue.maxItems()
        excess = self.primaryQueue.nbItems() + len(chunk) - primaryMax
        if excess <= 0:
            x = self.primaryQueue.insertBackChunk(chunk)
            assert x is None, "primaryQueue.insertBackChunk did not return None"
            return ret
        # Rebuild the head of the queue using only the IQueue interface:
        # items() may return a copy (DequeMemoryQueue does), so popping from it
        # would not remove anything from the primary queue.
        merged = list(chunk)
        merged.extend(
            self.primaryQueue.popChunk(self.primaryQueue.nbItems()))
        x = self.primaryQueue.insertBackChunk(merged[:primaryMax])
        assert x is None, "primaryQueue.insertBackChunk did not return None"
        # The overall excess was trimmed above so the spilled items fit in
        # the secondary queue.
        x = self.secondaryQueue.insertBackChunk(merged[primaryMax:])
        assert x is None, ("secondaryQueue.insertBackChunk did not return "
                           " None")
        return ret

    def popChunk(self, nbItems=None):
        """Removes and returns up to |nbItems| of the oldest items as a list.

        Defaults to the primary queue's maxItems(). The primary queue is
        drained first, then the secondary queue.
        """
        if nbItems is None:
            nbItems = self.primaryQueue.maxItems()
        ret = self.primaryQueue.popChunk(nbItems)
        nbItems -= len(ret)
        if nbItems and self.secondaryQueue.nbItems():
            ret.extend(self.secondaryQueue.popChunk(nbItems))
        return ret

    def save(self):
        """Moves the primary queue's items to the front of the secondary
        queue.

        Pops the primary queue's default chunk. Items that the secondary
        queue reports as overflowed are not kept.
        """
        self.secondaryQueue.insertBackChunk(self.primaryQueue.popChunk())

    def items(self):
        """Returns the items of both queues, primary queue first."""
        return self.primaryQueue.items() + self.secondaryQueue.items()

    def nbItems(self):
        """Returns the number of items stored in both queues."""
        return self.primaryQueue.nbItems() + self.secondaryQueue.nbItems()

    def maxItems(self):
        """Returns the combined capacity of both queues."""
        return self.primaryQueue.maxItems() + self.secondaryQueue.maxItems()
382 | |
383 | |
class IndexedQueue(object):
    """Adds functionality to a IQueue object to track its usage.

    Adds a new member function getIndex() and modify popChunk() and
    insertBackChunk() to keep a virtual pointer to the queue's first entry
    index."""
    implements(IQueue)

    def __init__(self, queue):
        """
        @queue: object providing IQueue to wrap.
        """
        # Copy all the member functions from the other object that this class
        # doesn't already define.
        assert IQueue.providedBy(queue)
        for member in dir(queue):
            if member[0] == '_':
                continue
            if not callable(getattr(queue, member)):
                continue
            # Keep this class' own getIndex(), popChunk() and
            # insertBackChunk() as well as anything copied earlier.
            if hasattr(self, member):
                continue
            setattr(self, member, getattr(queue, member))
        self.queue = queue
        self._index = 0

    def getIndex(self):
        """Returns the virtual index of the queue's first entry: the number
        of items popped so far minus the number of items inserted back."""
        return self._index

    def popChunk(self, *args, **kwargs):
        """Forwards to the wrapped queue's popChunk() and advances the index
        by the number of items returned."""
        items = self.queue.popChunk(*args, **kwargs)
        if items:
            self._index += len(items)
        return items

    def insertBackChunk(self, items):
        """Forwards to the wrapped queue's insertBackChunk() and moves the
        index back by the number of items actually re-inserted.

        Returns what the wrapped queue returned (the overflowed items, if
        any).
        """
        self._index -= len(items)
        ret = self.queue.insertBackChunk(items)
        if ret:
            # Overflowed items were not put back; don't count them.
            self._index += len(ret)
        return ret
417 self._index += len(ret) | |
418 return ret | |
419 | |
420 | |
def ToIndexedQueue(queue):
    """If the IQueue wasn't already a IndexedQueue, makes it an IndexedQueue.

    Returns |queue| itself when it is already an IndexedQueue, otherwise a new
    IndexedQueue wrapping it. Raises TypeError if |queue| doesn't provide
    IQueue.
    """
    if not IQueue.providedBy(queue):
        raise TypeError("queue doesn't implement IQueue", queue)
    if isinstance(queue, IndexedQueue):
        return queue
    return IndexedQueue(queue)
428 | |
429 # vim: set ts=4 sts=4 sw=4 et: | |
OLD | NEW |