Chromium Code Reviews
Unified Diff: isolateserver.py

Issue 25093003: Client side implementation of new /content-gs isolate protocol. (Closed)
Base URL: svn://svn.chromium.org/chrome/trunk/tools/swarm_client
Patch Set: Created 7 years, 2 months ago
 #!/usr/bin/env python
 # Copyright 2013 The Chromium Authors. All rights reserved.
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.

 """Archives a set of files to a server."""

 __version__ = '0.2'

-import binascii
 import hashlib
 import json
 import logging
 import os
-import random
 import re
 import sys
 import threading
 import time
 import urllib
 import zlib

 from third_party import colorama
 from third_party.depot_tools import fix_encoding
 from third_party.depot_tools import subcommand

 from utils import net
 from utils import threading_utils
 from utils import tools


-# The minimum size of files to upload directly to the blobstore.
-MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024
+# Version of isolate protocol passed to the server in /handshake request.
+ISOLATE_PROTOCOL_VERSION = '1.0'

-# The number of files to check the isolate server per /contains query.
+
+# The number of files to check the isolate server per /pre-upload query.
 # All files are sorted by likelihood of a change in the file content
 # (currently file size is used to estimate this: larger the file -> larger the
 # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
-# are taken and send to '/contains', then next ITEMS_PER_CONTAINS_QUERIES[1],
+# are taken and sent to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
 # and so on. The numbers here are a trade-off; the more per request, the lower
 # the effect of HTTP round trip latency and TCP-level chattiness. On the other
 # hand, larger values cause longer lookups, increasing the initial latency to
 # start uploading, which is especially an issue for large files. This value is
 # optimized for the "few thousand files to look up with minimal number of large
 # files missing" case.
 ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]


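To make the escalating schedule concrete, here is a small standalone sketch (illustration only; the real batching logic lives in Storage.batch_items_for_check further down) of how these limits slice a sorted item list into successive queries:

    ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]

    def batch_sizes(total_items):
      """Yields the size of each successive existence-check query."""
      batch_index = 0
      remaining = total_items
      while remaining:
        limit = ITEMS_PER_CONTAINS_QUERIES[
            min(batch_index, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
        size = min(limit, remaining)
        yield size
        remaining -= size
        batch_index += 1

    # Checking 300 items issues 7 queries: 20, 20, 50, 50, 50, 100, then 10.
    assert list(batch_sizes(300)) == [20, 20, 50, 50, 50, 100, 10]
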
 # A list of already compressed extension types that should not receive any
 # compression before being uploaded.
 ALREADY_COMPRESSED_TYPES = [
     '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
     'wav', 'zip'
 ]


 # The file size to be used when we don't know the correct file size,
 # generally used for .isolated files.
 UNKNOWN_FILE_SIZE = None


 # The size of each chunk to read when downloading and unzipping files.
 ZIPPED_FILE_CHUNK = 16 * 1024

-
 # Chunk size to use when doing disk I/O.
 DISK_FILE_CHUNK = 1024 * 1024

+# Chunk size to use when reading from network stream.
+NET_IO_FILE_CHUNK = 16 * 1024
+

 # Read timeout in seconds for downloads from isolate storage. If there's no
 # response from the server within this timeout whole download will be aborted.
 DOWNLOAD_READ_TIMEOUT = 60

 # Maximum expected delay (in seconds) between successive file fetches
 # in run_tha_test. If it takes longer than that, a deadlock might be happening
 # and all stack frames for all threads are dumped to log.
 DEADLOCK_TIMEOUT = 5 * 60

(...skipping 20 matching lines...)
 class ConfigError(ValueError):
   """Generic failure to load a .isolated file."""
   pass


 class MappingError(OSError):
   """Failed to recreate the tree."""
   pass


-def randomness():
-  """Generates low-entropy randomness for MIME encoding.
-
-  Exists so it can be mocked out in unit tests.
-  """
-  return str(time.time())
-
-
-def encode_multipart_formdata(fields, files,
-                              mime_mapper=lambda _: 'application/octet-stream'):
-  """Encodes a Multipart form data object.
-
-  Args:
-    fields: a sequence (name, value) elements for
-      regular form fields.
-    files: a sequence of (name, filename, value) elements for data to be
-      uploaded as files.
-    mime_mapper: function to return the mime type from the filename.
-  Returns:
-    content_type: for httplib.HTTP instance
-    body: for httplib.HTTP instance
-  """
-  boundary = hashlib.md5(randomness()).hexdigest()
-  body_list = []
-  for (key, value) in fields:
-    if isinstance(key, unicode):
-      value = key.encode('utf-8')
-    if isinstance(value, unicode):
-      value = value.encode('utf-8')
-    body_list.append('--' + boundary)
-    body_list.append('Content-Disposition: form-data; name="%s"' % key)
-    body_list.append('')
-    body_list.append(value)
-    body_list.append('--' + boundary)
-    body_list.append('')
-  for (key, filename, value) in files:
-    if isinstance(key, unicode):
-      value = key.encode('utf-8')
-    if isinstance(filename, unicode):
-      value = filename.encode('utf-8')
-    if isinstance(value, unicode):
-      value = value.encode('utf-8')
-    body_list.append('--' + boundary)
-    body_list.append('Content-Disposition: form-data; name="%s"; '
-                     'filename="%s"' % (key, filename))
-    body_list.append('Content-Type: %s' % mime_mapper(filename))
-    body_list.append('')
-    body_list.append(value)
-    body_list.append('--' + boundary)
-    body_list.append('')
-  if body_list:
-    body_list[-2] += '--'
-  body = '\r\n'.join(body_list)
-  content_type = 'multipart/form-data; boundary=%s' % boundary
-  return content_type, body
-
-
 def is_valid_hash(value, algo):
   """Returns if the value is a valid hash for the corresponding algorithm."""
   size = 2 * algo().digest_size
   return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value))


 def hash_file(filepath, algo):
   """Calculates the hash of a file without reading it all in memory at once.

   |algo| should be one of hashlib hashing algorithm.
   """
   digest = algo()
   with open(filepath, 'rb') as f:
     while True:
       chunk = f.read(DISK_FILE_CHUNK)
       if not chunk:
         break
       digest.update(chunk)
   return digest.hexdigest()


+def stream_read(stream, chunk_size):
+  """Reads chunks from |stream| and yields them."""
+  while True:
+    data = stream.read(chunk_size)
+    if not data:
+      break
+    yield data
+
+
 def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
   """Yields file content in chunks of given |chunk_size|."""
   with open(filepath, 'rb') as f:
     while True:
       data = f.read(chunk_size)
       if not data:
         break
       yield data


(...skipping 22 matching lines...)
   compressor = zlib.compressobj(level)
   for chunk in content_generator:
     compressed = compressor.compress(chunk)
     if compressed:
       yield compressed
   tail = compressor.flush(zlib.Z_FINISH)
   if tail:
     yield tail


+def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
+  """Reads zipped data from |content_generator| and yields decompressed data.
+
+  Decompresses data in small chunks (no larger than |chunk_size|) so that
+  a zip bomb file doesn't cause zlib to preallocate a huge amount of memory.
+
+  Raises IOError if data is corrupted or incomplete.
+  """
+  decompressor = zlib.decompressobj()
+  compressed_size = 0
+  try:
+    for chunk in content_generator:
+      compressed_size += len(chunk)
+      data = decompressor.decompress(chunk, chunk_size)
+      if data:
+        yield data
+      while decompressor.unconsumed_tail:
+        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
+        if data:
+          yield data
+    tail = decompressor.flush()
+    if tail:
+      yield tail
+  except zlib.error as e:
+    raise IOError(
+        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
+  # Ensure all data was read and decompressed.
+  if decompressor.unused_data or decompressor.unconsumed_tail:
+    raise IOError('Not all data was decompressed')
+
+
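A quick round-trip sketch of the two streaming helpers above (illustrative only; it assumes zip_compress takes the compression level as its second positional argument, as its call sites in this file suggest):

    def demo_roundtrip():
      payload = 'x' * 100000  # highly compressible test data
      chunks = (payload[i:i + 4096] for i in xrange(0, len(payload), 4096))
      compressed = list(zip_compress(chunks, 7))
      # zip_decompress streams the data back out in bounded chunks.
      assert ''.join(zip_decompress(iter(compressed))) == payload

    demo_roundtrip()
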
 def get_zip_compression_level(filename):
   """Given a filename calculates the ideal zip compression level to use."""
   file_ext = os.path.splitext(filename)[1].lower()
   # TODO(csharp): Profile to find what compression level works best.
   return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


 def create_directories(base_directory, files):
   """Creates the directory structure needed by the given list of files."""
   logging.debug('create_directories(%s, %d)', base_directory, len(files))
(...skipping 55 matching lines...)


 def try_remove(filepath):
   """Removes a file without crashing even if it doesn't exist."""
   try:
     os.remove(filepath)
   except OSError:
     pass


-def url_read(url, **kwargs):
-  result = net.url_read(url, **kwargs)
-  if result is None:
-    # If we get no response from the server, assume it is down and raise an
-    # exception.
-    raise MappingError('Unable to connect to server %s' % url)
-  return result
+class Item(object):
+  """An item to push to Storage.
+
+  It starts its life in a main thread, travels to 'contains' thread, then to
+  'push' thread and then finally back to the main thread.
+
+  It is never used concurrently from multiple threads.
+  """
+
+  def __init__(self, digest, size, is_isolated=False):
+    self.digest = digest
+    self.size = size
+    self.is_isolated = is_isolated
+    self.compression_level = 6
+    self.push_state = None
+
+  def content(self, chunk_size):
+    """Iterable with content of this item in chunks of given size."""
+    raise NotImplementedError()
+
+
+class FileItem(Item):
+  """A file to push to Storage."""
+
+  def __init__(self, path, digest, size, is_isolated):
+    super(FileItem, self).__init__(digest, size, is_isolated)
+    self.path = path
+    self.compression_level = get_zip_compression_level(path)
+
+  def content(self, chunk_size):
+    return file_read(self.path, chunk_size)


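To make the Item contract concrete: an implementation only needs to provide a digest, a size and a content() generator. A hypothetical in-memory variant (not part of this change) could look like:

    class BufferItem(Item):
      """Hypothetical Item subclass serving content from a string buffer."""

      def __init__(self, buf, algo=hashlib.sha1, is_isolated=False):
        super(BufferItem, self).__init__(
            algo(buf).hexdigest(), len(buf), is_isolated)
        self._buf = buf

      def content(self, chunk_size):
        for i in xrange(0, len(self._buf), chunk_size):
          yield self._buf[i:i + chunk_size]
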
 class Storage(object):
   """Efficiently downloads or uploads large set of files via StorageApi."""

   def __init__(self, storage_api, use_zip):
     self.use_zip = use_zip
     self._storage_api = storage_api
     self._cpu_thread_pool = None
     self._net_thread_pool = None
(...skipping 35 matching lines...)

   def upload_tree(self, indir, infiles):
     """Uploads the given tree to the isolate server.

     Arguments:
       indir: root directory the infiles are based in.
       infiles: dict of files to upload from |indir|.
     """
     logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))

+    # TODO(vadimsh): Introduce Item as a part of the public interface?
+
+    # Convert |indir| + |infiles| into a list of FileItem objects.
+    # Filter out symlinks, since they are not represented by items on isolate
+    # server side.
+    items = [
+        FileItem(
+            path=os.path.join(indir, filepath),
+            digest=metadata['h'],
+            size=metadata['s'],
+            is_isolated=metadata.get('priority') == '0')
+        for filepath, metadata in infiles.iteritems()
+        if 'l' not in metadata
+    ]
+
     # Enqueue all upload tasks.
+    missing = set()
     channel = threading_utils.TaskChannel()
-    missing = []
-    for filename, metadata, push_urls in self.get_missing_files(infiles):
-      missing.append((filename, metadata))
-      path = os.path.join(indir, filename)
-      if metadata.get('priority', '1') == '0':
-        priority = WorkerPool.HIGH
-      else:
-        priority = WorkerPool.MED
-      compression_level = get_zip_compression_level(path)
-      chunk_size = ZIPPED_FILE_CHUNK if self.use_zip else DISK_FILE_CHUNK
-      content = file_read(path, chunk_size)
-      self.async_push(channel, priority, metadata['h'], metadata['s'],
-                      content, compression_level, push_urls)
+    for missing_item in self.get_missing_items(items):
+      missing.add(missing_item)
+      self.async_push(
+          channel,
+          WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
+          missing_item)

     # No need to spawn deadlock detector thread if there's nothing to upload.
     if missing:
       with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
         # Wait for all started uploads to finish.
         uploaded = 0
         while uploaded != len(missing):
           detector.ping()
           item = channel.pull()
           uploaded += 1
-          logging.debug('Uploaded %d / %d: %s', uploaded, len(missing), item)
+          logging.debug(
+              'Uploaded %d / %d: %s', uploaded, len(missing), item.digest)
       logging.info('All files are uploaded')

     # Print stats.
-    total = len(infiles)
-    total_size = sum(metadata.get('s', 0) for metadata in infiles.itervalues())
+    total = len(items)
+    total_size = sum(f.size for f in items)
     logging.info(
         'Total: %6d, %9.1fkb',
         total,
-        sum(m.get('s', 0) for m in infiles.itervalues()) / 1024.)
-    cache_hit = set(infiles.iterkeys()) - set(x[0] for x in missing)
-    cache_hit_size = sum(infiles[i].get('s', 0) for i in cache_hit)
+        total_size / 1024.)
+    cache_hit = set(items) - missing
+    cache_hit_size = sum(f.size for f in cache_hit)
     logging.info(
         'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
         len(cache_hit),
         cache_hit_size / 1024.,
         len(cache_hit) * 100. / total,
         cache_hit_size * 100. / total_size if total_size else 0)
     cache_miss = missing
-    cache_miss_size = sum(infiles[i[0]].get('s', 0) for i in cache_miss)
+    cache_miss_size = sum(f.size for f in cache_miss)
     logging.info(
         'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
         len(cache_miss),
         cache_miss_size / 1024.,
         len(cache_miss) * 100. / total,
         cache_miss_size * 100. / total_size if total_size else 0)

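For reference, |infiles| maps a path relative to |indir| to a metadata dict: 'h' is the content digest, 's' the size, 'l' marks a symlink (filtered out above) and priority '0' marks the .isolated file. A made-up example of the input that upload_tree above converts to FileItem objects:

    infiles = {
      'data/payload.bin': {'h': 'a' * 40, 's': 1024},  # fake sha-1 digest
      'out.isolated': {'h': 'b' * 40, 's': 120, 'priority': '0'},
      'link': {'l': 'data/payload.bin'},  # symlink: no FileItem is created
    }
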
-  def async_push(self, channel, priority, item, expected_size,
-                 content_generator, compression_level, push_urls=None):
-    """Starts asynchronous push to the server in a parallel thread."""
+  def async_push(self, channel, priority, item):
+    """Starts asynchronous push to the server in a parallel thread.
+
+    Arguments:
+      channel: TaskChannel object that receives back |item| when upload ends.
+      priority: thread pool task priority for the push.
+      item: item to upload as instance of Item class.
+    """
     def push(content, size):
       """Pushes an item and returns its id, to pass as a result to |channel|."""
-      self._storage_api.push(item, size, content, push_urls)
+      self._storage_api.push(item, content, size)
       return item

     # If zipping is not required, just start a push task.
     if not self.use_zip:
       self.net_thread_pool.add_task_with_channel(channel, priority, push,
-          content_generator, expected_size)
+          item.content(DISK_FILE_CHUNK), item.size)
       return

     # If zipping is enabled, zip in a separate thread.
     def zip_and_push():
       # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
       # content right here. It will block until all file is zipped.
       try:
-        stream = zip_compress(content_generator, compression_level)
+        stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
+                              item.compression_level)
         data = ''.join(stream)
       except Exception as exc:
         logging.error('Failed to zip \'%s\': %s', item, exc)
         channel.send_exception(exc)
         return
       self.net_thread_pool.add_task_with_channel(channel, priority, push,
           [data], UNKNOWN_FILE_SIZE)
-    self.cpu_thread_pool.add_task(0, zip_and_push)
+    self.cpu_thread_pool.add_task(priority, zip_and_push)

-  def get_missing_files(self, files):
-    """Yields files that are missing from the server.
+  def get_missing_items(self, items):
+    """Yields items that are missing from the server.

     Issues multiple parallel queries via StorageApi's 'contains' method.

     Arguments:
-      files: a dictionary file name -> metadata dict.
+      items: a list of Item objects to check.

     Yields:
-      Triplets (file name, metadata dict, push_urls object to pass to push).
+      Item objects that are missing from the server.
     """
     channel = threading_utils.TaskChannel()
     pending = 0
     # Enqueue all requests.
-    for batch in self.batch_files_for_check(files):
+    for batch in self.batch_items_for_check(items):
       self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
           self._storage_api.contains, batch)
       pending += 1
     # Yield results as they come in.
     for _ in xrange(pending):
       for missing in channel.pull():
         yield missing

   @staticmethod
-  def batch_files_for_check(files):
-    """Splits list of files to check for existence on the server into batches.
+  def batch_items_for_check(items):
+    """Splits list of items to check for existence on the server into batches.

     Each batch corresponds to a single 'exists?' query to the server via a call
     to StorageApi's 'contains' method.

     Arguments:
-      files: a dictionary file name -> metadata dict.
+      items: a list of Item objects.

     Yields:
-      Batches of files to query for existence in a single operation,
-      each batch is a list of pairs: (file name, metadata dict).
+      Batches of items to query for existence in a single operation,
+      each batch is a list of Item objects.
     """
     batch_count = 0
     batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
     next_queries = []
-    items = ((k, v) for k, v in files.iteritems() if 's' in v)
-    for filename, metadata in sorted(items, key=lambda x: -x[1]['s']):
-      next_queries.append((filename, metadata))
+    for item in sorted(items, key=lambda x: x.size, reverse=True):
+      next_queries.append(item)
       if len(next_queries) == batch_size_limit:
         yield next_queries
         next_queries = []
         batch_count += 1
         batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
             min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
     if next_queries:
       yield next_queries


 class StorageApi(object):
   """Interface for classes that implement low-level storage operations."""

-  def fetch(self, item, expected_size):
+  def fetch(self, digest, size):
     """Fetches an object and yields its content.

     Arguments:
-      item: hash digest of item to download.
-      expected_size: expected size of the item, to validate it.
+      digest: hash digest of item to download.
+      size: expected size of the item, to validate it.

     Yields:
       Chunks of downloaded item (as str objects).
     """
     raise NotImplementedError()

-  def push(self, item, expected_size, content_generator, push_urls=None):
-    """Uploads content generated by |content_generator| as |item|.
+  def push(self, item, content, size):
+    """Uploads an |item| with content generated by |content| generator.

     Arguments:
-      item: hash digest of item to upload.
-      expected_size: total length of the content yielded by |content_generator|.
-      content_generator: generator that produces chunks to push.
-      push_urls: optional URLs returned by 'contains' call for this item.
+      item: Item object that holds information about an item being pushed.
+      content: a generator that yields chunks to push.
+      size: expected size of stream produced by |content|.

     Returns:
       None.
     """
     raise NotImplementedError()

-  def contains(self, files):
-    """Checks for existence of given |files| on the server.
+  def contains(self, items):
+    """Checks for existence of given |items| on the server.
+
+    Mutates |items| by assigning an opaque implementation-specific object to
+    the Item's push_state attribute on entries missing from the datastore.

     Arguments:
-      files: list of pairs (file name, metadata dict).
+      items: list of Item objects.

     Returns:
-      A list of files missing on server as a list of triplets
-      (file name, metadata dict, push_urls object to pass to push).
+      A list of items missing on server as a list of Item objects.
     """
     raise NotImplementedError()


 class IsolateServer(StorageApi):
-  """StorageApi implementation that downloads and uploads to Isolate Server."""
+  """StorageApi implementation that downloads and uploads to Isolate Server.
+
+  It uploads and downloads directly from Google Storage whenever appropriate.
+  """
+
+  class _PushState(object):
+    """State needed to call .push(), to be stored in Item.push_state."""
+    def __init__(self, upload_url, finalize_url):
+      self.upload_url = upload_url
+      self.finalize_url = finalize_url
+      self.uploaded = False
+      self.finalized = False

   def __init__(self, base_url, namespace):
     super(IsolateServer, self).__init__()
     assert base_url.startswith('http'), base_url
-    self.content_url = base_url.rstrip('/') + '/content/'
+    self.base_url = base_url.rstrip('/')
     self.namespace = namespace
     self.algo = get_hash_algo(namespace)
-    self._token = None
+    self._use_zip = is_namespace_with_compression(namespace)
     self._lock = threading.Lock()
+    self._server_caps = None
+
+  @staticmethod
+  def _generate_handshake_request():
+    """Returns a dict to be sent as handshake request body."""
+    # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
+    return {
+        'client_app_version': __version__,
+        'fetcher': True,
+        'protocol_version': ISOLATE_PROTOCOL_VERSION,
+        'pusher': True,
+    }
+
+  @staticmethod
+  def _validate_handshake_response(caps):
+    """Validates and normalizes handshake response."""
+    logging.info('Protocol version: %s', caps['protocol_version'])
+    logging.info('Server version: %s', caps['server_app_version'])
+    if caps.get('error'):
+      raise MappingError(caps['error'])
+    if not caps['access_token']:
+      raise ValueError('access_token is missing')
+    return caps
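Concretely, the handshake is a single JSON POST: the request mirrors _generate_handshake_request() above, and the reply must carry the fields _validate_handshake_response() reads. All values below are invented for illustration:

    request = {
      'client_app_version': '0.2',
      'fetcher': True,
      'protocol_version': '1.0',
      'pusher': True,
    }
    response = {
      'access_token': 'AXyZ...',            # used later in /pre-upload URLs
      'protocol_version': '1.0',
      'server_app_version': '148-abcdef0',  # invented version string
    }
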

   @property
-  def token(self):
+  def _server_capabilities(self):
+    """Performs handshake with the server if not yet done.
+
+    Returns:
+      Server capabilities dictionary as returned by /handshake endpoint.
+
+    Raises:
+      MappingError if server rejects the handshake.
+    """
     # TODO(maruel): Make this request much earlier asynchronously while the
     # files are being enumerated.
     with self._lock:
-      if not self._token:
-        self._token = urllib.quote(url_read(self.content_url + 'get_token'))
-    return self._token
+      if self._server_caps is None:
+        request_body = json.dumps(
+            self._generate_handshake_request(), separators=(',', ':'))
+        response = net.url_read(
+            url=self.base_url + '/content-gs/handshake',
+            data=request_body,
+            content_type='application/json',
+            method='POST')
+        if response is None:
+          raise MappingError('Failed to perform handshake.')
+        try:
+          caps = json.loads(response)
+          if not isinstance(caps, dict):
+            raise ValueError('Expecting JSON dict')
+          self._server_caps = self._validate_handshake_response(caps)
+        except (ValueError, KeyError, TypeError) as exc:
+          # KeyError exception has very confusing str conversion: it's just a
+          # missing key value and nothing else. So print exception class name
+          # as well.
+          raise MappingError('Invalid handshake response (%s): %s' % (
+              exc.__class__.__name__, exc))
+    return self._server_caps

-  def fetch(self, item, expected_size):
-    assert isinstance(item, basestring)
-    assert (
-        isinstance(expected_size, (int, long)) or
-        expected_size == UNKNOWN_FILE_SIZE)
-    zipped_url = '%sretrieve/%s/%s' % (self.content_url, self.namespace, item)
-    logging.debug('download_file(%s)', zipped_url)
+  def fetch(self, digest, size):
+    assert isinstance(digest, basestring)
+    assert (isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE)
+
+    source_url = '%s/content-gs/retrieve/%s/%s' % (
+        self.base_url, self.namespace, digest)
+    logging.debug('download_file(%s)', source_url)

     # Because the app engine DB is only eventually consistent, retry 404 errors
     # because the file might just not be visible yet (even though it has been
     # uploaded).
     connection = net.url_open(
-        zipped_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
+        source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
     if not connection:
-      raise IOError('Unable to open connection to %s' % zipped_url)
+      raise IOError('Unable to open connection to %s' % source_url)

-    # TODO(maruel): Must only decompress when needed.
-    decompressor = zlib.decompressobj()
     try:
-      compressed_size = 0
-      decompressed_size = 0
-      while True:
-        chunk = connection.read(ZIPPED_FILE_CHUNK)
-        if not chunk:
-          break
-        compressed_size += len(chunk)
-        decompressed = decompressor.decompress(chunk)
-        decompressed_size += len(decompressed)
-        yield decompressed
-
-      # Ensure that all the data was properly decompressed.
-      uncompressed_data = decompressor.flush()
-      if uncompressed_data:
-        raise IOError('Decompression failed')
-      if (expected_size != UNKNOWN_FILE_SIZE and
-          decompressed_size != expected_size):
-        raise IOError('File incorrect size after download of %s. Got %s and '
-                      'expected %s' % (item, decompressed_size, expected_size))
-    except zlib.error as e:
-      msg = 'Corrupted zlib for item %s. Processed %d of %s bytes.\n%s' % (
-          item, compressed_size, connection.content_length, e)
-      logging.warning(msg)
-
-      # Testing seems to show that if a few machines are trying to download
-      # the same blob, they can cause each other to fail. So if we hit a zip
-      # error, this is the most likely cause (it only downloads some of the
-      # data). Randomly sleep for between 5 and 25 seconds to try and spread
-      # out the downloads.
-      sleep_duration = (random.random() * 20) + 5
-      time.sleep(sleep_duration)
-      raise IOError(msg)
+      # Prepare reading pipeline.
+      generator = stream_read(connection, NET_IO_FILE_CHUNK)
+      if self._use_zip:
+        generator = zip_decompress(generator, DISK_FILE_CHUNK)
+
+      # Read and yield data, calculate total length of the decompressed stream.
+      total_size = 0
+      for chunk in generator:
+        total_size += len(chunk)
+        yield chunk
+
+      # Verify data length matches expectation.
+      if size != UNKNOWN_FILE_SIZE and total_size != size:
+        raise IOError('Incorrect file size: expected %d, got %d' % (
+            size, total_size))
+    except IOError as err:
+      logging.warning('Failed to fetch %s: %s', digest, err)
+      raise

-  def push(self, item, expected_size, content_generator, push_urls=None):
-    assert isinstance(item, basestring)
-    assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE
-    item = str(item)
-
-    # TODO(maruel): Support large files. This would require streaming support.
-
-    # A cheese way to avoid memcpy of (possibly huge) file, until streaming
-    # upload support is implemented.
-    if isinstance(content_generator, list) and len(content_generator) == 1:
-      content = content_generator[0]
-    else:
-      content = ''.join(content_generator)
-
-    if len(content) > MIN_SIZE_FOR_DIRECT_BLOBSTORE:
-      return self._upload_hash_content_to_blobstore(item, content)
-
-    url = '%sstore/%s/%s?token=%s' % (
-        self.content_url, self.namespace, item, self.token)
-    return url_read(url, data=content, content_type='application/octet-stream')
+  def push(self, item, content, size):
+    assert isinstance(item, Item)
+    assert isinstance(item.push_state, IsolateServer._PushState)
+    assert not item.push_state.finalized
+
+    # TODO(vadimsh): Do not read from |content| generator when retrying push.
+    # If |content| is indeed a generator, it can not be re-winded back
+    # to the beginning of the stream. A retry will find it exhausted. A possible
+    # solution is to wrap |content| generator with some sort of caching
+    # restartable generator. It should be done alongside streaming support
+    # implementation.
+
+    # This push operation may be a retry after failed finalization call below,
+    # no need to reupload contents in that case.
+    if not item.push_state.uploaded:
+      # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
+      # upload support is implemented.
+      if isinstance(content, list) and len(content) == 1:
+        content = content[0]
+      else:
+        content = ''.join(content)
+      # PUT file to |upload_url|.
+      response = net.url_read(
+          url=item.push_state.upload_url,
+          data=content,
+          content_type='application/octet-stream',
+          method='PUT')
+      if response is None:
+        raise IOError('Failed to upload a file %s to %s' % (
+            item.digest, item.push_state.upload_url))
+      item.push_state.uploaded = True
+    else:
+      logging.info(
+          'A file %s already uploaded, retrying finalization only', item.digest)
+
+    # Optionally notify the server that it's done.
+    if item.push_state.finalize_url:
+      # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
+      # send it to isolated server. That way isolate server can verify that
+      # the data safely reached Google Storage (GS provides MD5 and CRC32C of
+      # stored files).
+      response = net.url_read(
+          url=item.push_state.finalize_url,
+          data='',
+          content_type='application/json',
+          method='POST')
+      if response is None:
+        raise IOError('Failed to finalize an upload of %s' % item.digest)
+      item.push_state.finalized = True

-  def contains(self, files):
-    logging.info('Checking existence of %d files...', len(files))
-
-    body = ''.join(
-        (binascii.unhexlify(metadata['h']) for (_, metadata) in files))
-    assert (len(body) % self.algo().digest_size) == 0, repr(body)
-
-    query_url = '%scontains/%s?token=%s' % (
-        self.content_url, self.namespace, self.token)
-    response = url_read(
-        query_url, data=body, content_type='application/octet-stream')
-    if len(files) != len(response):
-      raise MappingError(
-          'Got an incorrect number of responses from the server. Expected %d, '
-          'but got %d' % (len(files), len(response)))
-
-    # This implementation of IsolateServer doesn't use push_urls field,
-    # set it to None.
-    missing_files = [
-        files[i] + (None,) for i, flag in enumerate(response) if flag == '\x00'
-    ]
-    logging.info('Queried %d files, %d cache hit',
-        len(files), len(files) - len(missing_files))
-    return missing_files
-
-  def _upload_hash_content_to_blobstore(self, item, content):
-    """Uploads the content directly to the blobstore via a generated url."""
-    # TODO(maruel): Support large files. This would require streaming support.
-    gen_url = '%sgenerate_blobstore_url/%s/%s' % (
-        self.content_url, self.namespace, item)
-    # Token is guaranteed to be already quoted but it is unnecessary here, and
-    # only here.
-    data = [('token', urllib.unquote(self.token))]
-    content_type, body = encode_multipart_formdata(
-        data, [('content', item, content)])
-    last_url = gen_url
-    for _ in net.retry_loop(max_attempts=net.URL_OPEN_MAX_ATTEMPTS):
-      # Retry HTTP 50x here but not 404.
-      upload_url = net.url_read(gen_url, data=data)
-      if not upload_url:
-        raise MappingError('Unable to connect to server %s' % gen_url)
-      last_url = upload_url
-
-      # Do not retry this request on HTTP 50x. Regenerate an upload url each
-      # time since uploading "consumes" the upload url.
-      result = net.url_read(
-          upload_url, data=body, content_type=content_type, retry_50x=False)
-      if result is not None:
-        return result
-    raise MappingError('Unable to connect to server %s' % last_url)
+  def contains(self, items):
+    logging.info('Checking existence of %d files...', len(items))
+
+    # Request body is a json encoded list of dicts.
+    body = [
+        {
+          'h': item.digest,
+          's': item.size,
+          'i': int(item.is_isolated),
+        } for item in items
+    ]
+
+    query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
+        self.base_url,
+        self.namespace,
+        urllib.quote(self._server_capabilities['access_token']))
+    response_body = net.url_read(
+        url=query_url,
+        data=json.dumps(body, separators=(',', ':')),
+        content_type='application/json',
+        method='POST')
+    if response_body is None:
+      raise MappingError('Failed to execute /pre-upload query')
+
+    # Response body is a list of push_urls (or null if file is already present).
+    try:
+      response = json.loads(response_body)
+      if not isinstance(response, list):
+        raise ValueError('Expecting response with json-encoded list')
+      if len(response) != len(items):
+        raise ValueError(
+            'Incorrect number of items in the list, expected %d, '
+            'but got %d' % (len(items), len(response)))
+    except ValueError as err:
+      raise MappingError(
+          'Invalid response from server: %s, body is %s' % (err, response_body))
+
+    # Pick Items that are missing, attach _PushState to them.
+    missing_items = []
+    for i, push_urls in enumerate(response):
+      if push_urls:
+        assert len(push_urls) == 2, str(push_urls)
+        item = items[i]
+        assert item.push_state is None
+        item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1])
+        missing_items.append(item)
+    logging.info('Queried %d files, %d cache hit',
+        len(items), len(items) - len(missing_items))
+    return missing_items


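Putting the new protocol together: /pre-upload answers each queried item with either null (already stored) or an [upload_url, finalize_url] pair, which contains() stashes in a _PushState and push() later consumes. A sketch of one exchange, with invented URLs and digests:

    query = [
      {'h': 'a' * 40, 's': 1024, 'i': 0},
      {'h': 'b' * 40, 's': 120, 'i': 1},
    ]
    reply = [
      None,  # first item is already present, nothing to push
      ['https://storage.example.com/upload?sig=1234',  # PUT raw bytes here
       'https://isolate.example.com/finalize/bbbb'],   # then POST '' here
    ]
    # push() PUTs the (possibly deflated) content to upload_url, then POSTs an
    # empty body to finalize_url so the server can commit the stored object.
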
 class FileSystem(StorageApi):
   """StorageApi implementation that fetches data from the file system.

   The common use case is a NFS/CIFS file server that is mounted locally that is
   used to fetch the file on a local partition.
   """
+
   def __init__(self, base_path):
     super(FileSystem, self).__init__()
     self.base_path = base_path

-  def fetch(self, item, expected_size):
-    assert isinstance(item, basestring)
-    assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE
-    source = os.path.join(self.base_path, item)
-    if (expected_size != UNKNOWN_FILE_SIZE and
-        not is_valid_file(source, expected_size)):
-      raise IOError('Invalid file %s' % item)
+  def fetch(self, digest, size):
+    assert isinstance(digest, basestring)
+    assert isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE
+    source = os.path.join(self.base_path, digest)
+    if size != UNKNOWN_FILE_SIZE and not is_valid_file(source, size):
+      raise IOError('Invalid file %s' % digest)
     return file_read(source)

-  def push(self, item, expected_size, content_generator, push_urls=None):
-    assert isinstance(item, basestring)
-    assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE
-    dest = os.path.join(self.base_path, item)
-    total = file_write(dest, content_generator)
-    if expected_size != UNKNOWN_FILE_SIZE and total != expected_size:
+  def push(self, item, content, size):
+    assert isinstance(item, Item)
+    assert isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE
+    dest = os.path.join(self.base_path, item.digest)
+    total = file_write(dest, content)
+    if size != UNKNOWN_FILE_SIZE and total != size:
       os.remove(dest)
-      raise IOError(
-          'Invalid file %s, %d != %d' % (item, total, expected_size))
+      raise IOError('Invalid file %s, %d != %d' % (item.digest, total, size))

-  def contains(self, files):
+  def contains(self, items):
     return [
-        (filename, metadata, None)
-        for filename, metadata in files
-        if not os.path.exists(os.path.join(self.base_path, metadata['h']))
+        item for item in items
+        if not os.path.exists(os.path.join(self.base_path, item.digest))
     ]


 def get_hash_algo(_namespace):
   """Return hash algorithm class to use when uploading to given |namespace|."""
   # TODO(vadimsh): Implement this at some point.
   return hashlib.sha1


 def is_namespace_with_compression(namespace):
(...skipping 29 matching lines...)


 def upload_tree(base_url, indir, infiles, namespace):
   """Uploads the given tree to the given url.

   Arguments:
     base_url: The base url, it is assumed that |base_url|/has/ can be used to
         query if an element was already uploaded, and |base_url|/store/
         can be used to upload a new element.
     indir: Root directory the infiles are based in.
-    infiles: dict of files to upload files from |indir| to |base_url|.
+    infiles: dict of files to upload from |indir| to |base_url|.
     namespace: The namespace to use on the server.
   """
   remote = get_storage_api(base_url, namespace)
   with Storage(remote, is_namespace_with_compression(namespace)) as storage:
     storage.upload_tree(indir, infiles)
   return 0


 class MemoryCache(object):
   """This class is intended to be usable everywhere the Cache class is.
(...skipping 521 matching lines...)
     sys.stderr.write(str(e))
     sys.stderr.write('\n')
     return 1


 if __name__ == '__main__':
   fix_encoding.fix_encoding()
   tools.disable_buffering()
   colorama.init()
   sys.exit(main(sys.argv[1:]))