| Index: isolateserver.py
|
| diff --git a/isolateserver.py b/isolateserver.py
|
| index 2205a182ea81eedf2dd366438ddc8ae5a12d4e25..344872353237ae665e8c220b4a27a61e811d3dc9 100755
|
| --- a/isolateserver.py
|
| +++ b/isolateserver.py
|
| @@ -7,12 +7,10 @@
|
|
|
| __version__ = '0.2'
|
|
|
| -import binascii
|
| import hashlib
|
| import json
|
| import logging
|
| import os
|
| -import random
|
| import re
|
| import sys
|
| import threading
|
| @@ -29,14 +27,15 @@ from utils import threading_utils
|
| from utils import tools
|
|
|
|
|
| -# The minimum size of files to upload directly to the blobstore.
|
| -MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024
|
| +# Version of the isolate protocol passed to the server in /handshake request.
|
| +ISOLATE_PROTOCOL_VERSION = '1.0'
|
|
|
| -# The number of files to check the isolate server per /contains query.
|
| +
|
| +# The number of files to check on the isolate server per /pre-upload query.
|
| # All files are sorted by likelihood of a change in the file content
|
| # (currently file size is used to estimate this: larger the file -> larger the
|
| # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
|
| -# are taken and send to '/contains', then next ITEMS_PER_CONTAINS_QUERIES[1],
|
| +# are taken and sent to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
|
| # and so on. Numbers here is a trade-off; the more per request, the lower the
|
| # effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
|
| # larger values cause longer lookups, increasing the initial latency to start
|
| @@ -62,10 +61,12 @@ UNKNOWN_FILE_SIZE = None
|
| # The size of each chunk to read when downloading and unzipping files.
|
| ZIPPED_FILE_CHUNK = 16 * 1024
|
|
|
| -
|
| # Chunk size to use when doing disk I/O.
|
| DISK_FILE_CHUNK = 1024 * 1024
|
|
|
| +# Chunk size to use when reading from a network stream.
|
| +NET_IO_FILE_CHUNK = 16 * 1024
|
| +
|
|
|
| # Read timeout in seconds for downloads from isolate storage. If there's no
|
| # response from the server within this timeout whole download will be aborted.
|
| @@ -106,63 +107,6 @@ class MappingError(OSError):
|
| pass
|
|
|
|
|
| -def randomness():
|
| - """Generates low-entropy randomness for MIME encoding.
|
| -
|
| - Exists so it can be mocked out in unit tests.
|
| - """
|
| - return str(time.time())
|
| -
|
| -
|
| -def encode_multipart_formdata(fields, files,
|
| - mime_mapper=lambda _: 'application/octet-stream'):
|
| - """Encodes a Multipart form data object.
|
| -
|
| - Args:
|
| - fields: a sequence (name, value) elements for
|
| - regular form fields.
|
| - files: a sequence of (name, filename, value) elements for data to be
|
| - uploaded as files.
|
| - mime_mapper: function to return the mime type from the filename.
|
| - Returns:
|
| - content_type: for httplib.HTTP instance
|
| - body: for httplib.HTTP instance
|
| - """
|
| - boundary = hashlib.md5(randomness()).hexdigest()
|
| - body_list = []
|
| - for (key, value) in fields:
|
| - if isinstance(key, unicode):
|
| - value = key.encode('utf-8')
|
| - if isinstance(value, unicode):
|
| - value = value.encode('utf-8')
|
| - body_list.append('--' + boundary)
|
| - body_list.append('Content-Disposition: form-data; name="%s"' % key)
|
| - body_list.append('')
|
| - body_list.append(value)
|
| - body_list.append('--' + boundary)
|
| - body_list.append('')
|
| - for (key, filename, value) in files:
|
| - if isinstance(key, unicode):
|
| - value = key.encode('utf-8')
|
| - if isinstance(filename, unicode):
|
| - value = filename.encode('utf-8')
|
| - if isinstance(value, unicode):
|
| - value = value.encode('utf-8')
|
| - body_list.append('--' + boundary)
|
| - body_list.append('Content-Disposition: form-data; name="%s"; '
|
| - 'filename="%s"' % (key, filename))
|
| - body_list.append('Content-Type: %s' % mime_mapper(filename))
|
| - body_list.append('')
|
| - body_list.append(value)
|
| - body_list.append('--' + boundary)
|
| - body_list.append('')
|
| - if body_list:
|
| - body_list[-2] += '--'
|
| - body = '\r\n'.join(body_list)
|
| - content_type = 'multipart/form-data; boundary=%s' % boundary
|
| - return content_type, body
|
| -
|
| -
|
| def is_valid_hash(value, algo):
|
| """Returns if the value is a valid hash for the corresponding algorithm."""
|
| size = 2 * algo().digest_size
|
| @@ -184,6 +128,15 @@ def hash_file(filepath, algo):
|
| return digest.hexdigest()
|
|
|
|
|
| +def stream_read(stream, chunk_size):
|
| + """Reads chunks from |stream| and yields them."""
|
| + while True:
|
| + data = stream.read(chunk_size)
|
| + if not data:
|
| + break
|
| + yield data
|
| +
|
| +
|
| def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
|
| """Yields file content in chunks of given |chunk_size|."""
|
| with open(filepath, 'rb') as f:
|
| @@ -226,6 +179,37 @@ def zip_compress(content_generator, level=7):
|
| yield tail
|
|
|
|
|
| +def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
|
| + """Reads zipped data from |content_generator| and yields decompressed data.
|
| +
|
| + Decompresses data in small chunks (no larger than |chunk_size|) so that
|
| + a zip bomb doesn't cause zlib to preallocate a huge amount of memory.
|
| +
|
| + Raises IOError if data is corrupted or incomplete.
|
| + """
|
| + decompressor = zlib.decompressobj()
|
| + compressed_size = 0
|
| + try:
|
| + for chunk in content_generator:
|
| + compressed_size += len(chunk)
|
| + data = decompressor.decompress(chunk, chunk_size)
|
| + if data:
|
| + yield data
|
| + while decompressor.unconsumed_tail:
|
| + data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
|
| + if data:
|
| + yield data
|
| + tail = decompressor.flush()
|
| + if tail:
|
| + yield tail
|
| + except zlib.error as e:
|
| + raise IOError(
|
| + 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
|
| + # Ensure all data was read and decompressed.
|
| + if decompressor.unused_data or decompressor.unconsumed_tail:
|
| + raise IOError('Not all data was decompressed')
|
| +
|
| +
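|
| A minimal round-trip sketch of the two helpers above, assuming this module is
| importable as isolateserver (the 64 KiB chunk size is arbitrary):
|
|   import isolateserver
|
|   original = 'x' * (1024 * 1024)  # 1 MiB of highly compressible data.
|   # Both helpers consume and produce iterables of str chunks.
|   compressed = isolateserver.zip_compress([original], level=7)
|   restored = ''.join(
|       isolateserver.zip_decompress(compressed, chunk_size=64 * 1024))
|   assert restored == original
|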
|
| def get_zip_compression_level(filename):
|
| """Given a filename calculates the ideal zip compression level to use."""
|
| file_ext = os.path.splitext(filename)[1].lower()
|
| @@ -301,13 +285,37 @@ def try_remove(filepath):
|
| pass
|
|
|
|
|
| -def url_read(url, **kwargs):
|
| - result = net.url_read(url, **kwargs)
|
| - if result is None:
|
| - # If we get no response from the server, assume it is down and raise an
|
| - # exception.
|
| - raise MappingError('Unable to connect to server %s' % url)
|
| - return result
|
| +class Item(object):
|
| + """An item to push to Storage.
|
| +
|
| + It starts its life in the main thread, travels to the 'contains' thread,
|
| + then to the 'push' thread, and finally back to the main thread.
|
| +
|
| + It is never used concurrently from multiple threads.
|
| + """
|
| +
|
| + def __init__(self, digest, size, is_isolated=False):
|
| + self.digest = digest
|
| + self.size = size
|
| + self.is_isolated = is_isolated
|
| + self.compression_level = 6
|
| + self.push_state = None
|
| +
|
| + def content(self, chunk_size):
|
| + """Iterable with content of this item in chunks of given size."""
|
| + raise NotImplementedError()
|
| +
|
| +
|
| +class FileItem(Item):
|
| + """A file to push to Storage."""
|
| +
|
| + def __init__(self, path, digest, size, is_isolated):
|
| + super(FileItem, self).__init__(digest, size, is_isolated)
|
| + self.path = path
|
| + self.compression_level = get_zip_compression_level(path)
|
| +
|
| + def content(self, chunk_size):
|
| + return file_read(self.path, chunk_size)
|
|
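|
| The Item interface above is intentionally small: a digest, a size and a
| content() generator. As an illustration only (not part of this patch), an
| in-memory item would just override content():
|
|   class BufferItem(Item):
|     """Hypothetical in-memory item, shown only to illustrate the interface."""
|
|     def __init__(self, buf, digest):
|       super(BufferItem, self).__init__(digest, len(buf))
|       self.buf = buf
|
|     def content(self, chunk_size):
|       for i in xrange(0, len(self.buf), chunk_size):
|         yield self.buf[i:i + chunk_size]
|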
|
|
|
| class Storage(object):
|
| @@ -363,21 +371,30 @@ class Storage(object):
|
| """
|
| logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))
|
|
|
| + # TODO(vadimsh): Introduce Item as a part of the public interface?
|
| +
|
| + # Convert |indir| + |infiles| into a list of FileItem objects.
|
| + # Filter out symlinks, since they are not represented by items on the
|
| + # isolate server side.
|
| + items = [
|
| + FileItem(
|
| + path=os.path.join(indir, filepath),
|
| + digest=metadata['h'],
|
| + size=metadata['s'],
|
| + is_isolated=metadata.get('priority') == '0')
|
| + for filepath, metadata in infiles.iteritems()
|
| + if 'l' not in metadata
|
| + ]
|
| +
|
| # Enqueue all upload tasks.
|
| + missing = set()
|
| channel = threading_utils.TaskChannel()
|
| - missing = []
|
| - for filename, metadata, push_urls in self.get_missing_files(infiles):
|
| - missing.append((filename, metadata))
|
| - path = os.path.join(indir, filename)
|
| - if metadata.get('priority', '1') == '0':
|
| - priority = WorkerPool.HIGH
|
| - else:
|
| - priority = WorkerPool.MED
|
| - compression_level = get_zip_compression_level(path)
|
| - chunk_size = ZIPPED_FILE_CHUNK if self.use_zip else DISK_FILE_CHUNK
|
| - content = file_read(path, chunk_size)
|
| - self.async_push(channel, priority, metadata['h'], metadata['s'],
|
| - content, compression_level, push_urls)
|
| + for missing_item in self.get_missing_items(items):
|
| + missing.add(missing_item)
|
| + self.async_push(
|
| + channel,
|
| + WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
|
| + missing_item)
|
|
|
| # No need to spawn deadlock detector thread if there's nothing to upload.
|
| if missing:
|
| @@ -388,18 +405,19 @@ class Storage(object):
|
| detector.ping()
|
| item = channel.pull()
|
| uploaded += 1
|
| - logging.debug('Uploaded %d / %d: %s', uploaded, len(missing), item)
|
| + logging.debug(
|
| + 'Uploaded %d / %d: %s', uploaded, len(missing), item.digest)
|
| logging.info('All files are uploaded')
|
|
|
| # Print stats.
|
| - total = len(infiles)
|
| - total_size = sum(metadata.get('s', 0) for metadata in infiles.itervalues())
|
| + total = len(items)
|
| + total_size = sum(f.size for f in items)
|
| logging.info(
|
| 'Total: %6d, %9.1fkb',
|
| total,
|
| - sum(m.get('s', 0) for m in infiles.itervalues()) / 1024.)
|
| - cache_hit = set(infiles.iterkeys()) - set(x[0] for x in missing)
|
| - cache_hit_size = sum(infiles[i].get('s', 0) for i in cache_hit)
|
| + total_size / 1024.)
|
| + cache_hit = set(items) - missing
|
| + cache_hit_size = sum(f.size for f in cache_hit)
|
| logging.info(
|
| 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
|
| len(cache_hit),
|
| @@ -407,7 +425,7 @@ class Storage(object):
|
| len(cache_hit) * 100. / total,
|
| cache_hit_size * 100. / total_size if total_size else 0)
|
| cache_miss = missing
|
| - cache_miss_size = sum(infiles[i[0]].get('s', 0) for i in cache_miss)
|
| + cache_miss_size = sum(f.size for f in cache_miss)
|
| logging.info(
|
| 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
|
| len(cache_miss),
|
| @@ -415,18 +433,23 @@ class Storage(object):
|
| len(cache_miss) * 100. / total,
|
| cache_miss_size * 100. / total_size if total_size else 0)
|
|
|
| - def async_push(self, channel, priority, item, expected_size,
|
| - content_generator, compression_level, push_urls=None):
|
| - """Starts asynchronous push to the server in a parallel thread."""
|
| + def async_push(self, channel, priority, item):
|
| + """Starts asynchronous push to the server in a parallel thread.
|
| +
|
| + Arguments:
|
| + channel: TaskChannel object that receives back |item| when upload ends.
|
| + priority: thread pool task priority for the push.
|
| + item: item to upload, as an instance of the Item class.
|
| + """
|
| def push(content, size):
|
| """Pushes an item and returns its id, to pass as a result to |channel|."""
|
| - self._storage_api.push(item, size, content, push_urls)
|
| + self._storage_api.push(item, content, size)
|
| return item
|
|
|
| # If zipping is not required, just start a push task.
|
| if not self.use_zip:
|
| self.net_thread_pool.add_task_with_channel(channel, priority, push,
|
| - content_generator, expected_size)
|
| + item.content(DISK_FILE_CHUNK), item.size)
|
| return
|
|
|
| # If zipping is enabled, zip in a separate thread.
|
| @@ -434,7 +457,8 @@ class Storage(object):
|
| # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
|
| # content right here. It will block until all file is zipped.
|
| try:
|
| - stream = zip_compress(content_generator, compression_level)
|
| + stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
|
| + item.compression_level)
|
| data = ''.join(stream)
|
| except Exception as exc:
|
| logging.error('Failed to zip \'%s\': %s', item, exc)
|
| @@ -442,23 +466,23 @@ class Storage(object):
|
| return
|
| self.net_thread_pool.add_task_with_channel(channel, priority, push,
|
| [data], UNKNOWN_FILE_SIZE)
|
| - self.cpu_thread_pool.add_task(0, zip_and_push)
|
| + self.cpu_thread_pool.add_task(priority, zip_and_push)
|
|
|
| - def get_missing_files(self, files):
|
| - """Yields files that are missing from the server.
|
| + def get_missing_items(self, items):
|
| + """Yields items that are missing from the server.
|
|
|
| Issues multiple parallel queries via StorageApi's 'contains' method.
|
|
|
| Arguments:
|
| - files: a dictionary file name -> metadata dict.
|
| + items: a list of Item objects to check.
|
|
|
| Yields:
|
| - Triplets (file name, metadata dict, push_urls object to pass to push).
|
| + Item objects that are missing from the server.
|
| """
|
| channel = threading_utils.TaskChannel()
|
| pending = 0
|
| # Enqueue all requests.
|
| - for batch in self.batch_files_for_check(files):
|
| + for batch in self.batch_items_for_check(items):
|
| self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
|
| self._storage_api.contains, batch)
|
| pending += 1
|
| @@ -468,25 +492,24 @@ class Storage(object):
|
| yield missing
|
|
|
| @staticmethod
|
| - def batch_files_for_check(files):
|
| - """Splits list of files to check for existence on the server into batches.
|
| + def batch_items_for_check(items):
|
| + """Splits list of items to check for existence on the server into batches.
|
|
|
| Each batch corresponds to a single 'exists?' query to the server via a call
|
| to StorageApi's 'contains' method.
|
|
|
| Arguments:
|
| - files: a dictionary file name -> metadata dict.
|
| + items: a list of Item objects.
|
|
|
| Yields:
|
| - Batches of files to query for existence in a single operation,
|
| - each batch is a list of pairs: (file name, metadata dict).
|
| + Batches of items to query for existence in a single operation,
|
| + each batch is a list of Item objects.
|
| """
|
| batch_count = 0
|
| batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
|
| next_queries = []
|
| - items = ((k, v) for k, v in files.iteritems() if 's' in v)
|
| - for filename, metadata in sorted(items, key=lambda x: -x[1]['s']):
|
| - next_queries.append((filename, metadata))
|
| + for item in sorted(items, key=lambda x: x.size, reverse=True):
|
| + next_queries.append(item)
|
| if len(next_queries) == batch_size_limit:
|
| yield next_queries
|
| next_queries = []
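|
| A sketch of the batching behaviour described above; the
| ITEMS_PER_CONTAINS_QUERIES values mentioned in the comment are hypothetical,
| only the growth pattern matters:
|
|   items = [Item(digest='%040x' % i, size=i) for i in xrange(1, 21)]
|   batches = list(Storage.batch_items_for_check(items))
|   # With, say, ITEMS_PER_CONTAINS_QUERIES == (3, 5, 10), the 20 items would
|   # be queried largest-first in batches of 3, 5, 10 and the remaining 2.
|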
|
| @@ -500,188 +523,264 @@ class Storage(object):
|
| class StorageApi(object):
|
| """Interface for classes that implement low-level storage operations."""
|
|
|
| - def fetch(self, item, expected_size):
|
| + def fetch(self, digest, size):
|
| """Fetches an object and yields its content.
|
|
|
| Arguments:
|
| - item: hash digest of item to download.
|
| - expected_size: expected size of the item, to validate it.
|
| + digest: hash digest of item to download.
|
| + size: expected size of the item, to validate it.
|
|
|
| Yields:
|
| Chunks of downloaded item (as str objects).
|
| """
|
| raise NotImplementedError()
|
|
|
| - def push(self, item, expected_size, content_generator, push_urls=None):
|
| - """Uploads content generated by |content_generator| as |item|.
|
| + def push(self, item, content, size):
|
| + """Uploads an |item| with content generated by |content| generator.
|
|
|
| Arguments:
|
| - item: hash digest of item to upload.
|
| - expected_size: total length of the content yielded by |content_generator|.
|
| - content_generator: generator that produces chunks to push.
|
| - push_urls: optional URLs returned by 'contains' call for this item.
|
| + item: Item object that holds information about an item being pushed.
|
| + content: a generator that yields chunks to push.
|
| + size: expected size of stream produced by |content|.
|
|
|
| Returns:
|
| None.
|
| """
|
| raise NotImplementedError()
|
|
|
| - def contains(self, files):
|
| - """Checks for existence of given |files| on the server.
|
| + def contains(self, items):
|
| + """Checks for existence of given |items| on the server.
|
| +
|
| + Mutates |items| by assigning an opaque, implementation-specific object to
|
| + the push_state attribute of items missing from the datastore.
|
|
|
| Arguments:
|
| - files: list of pairs (file name, metadata dict).
|
| + items: list of Item objects.
|
|
|
| Returns:
|
| - A list of files missing on server as a list of triplets
|
| - (file name, metadata dict, push_urls object to pass to push).
|
| + A list of items missing on server as a list of Item objects.
|
| """
|
| raise NotImplementedError()
|
|
|
|
|
| class IsolateServer(StorageApi):
|
| - """StorageApi implementation that downloads and uploads to Isolate Server."""
|
| + """StorageApi implementation that downloads and uploads to Isolate Server.
|
| +
|
| + It uploads to and downloads from Google Storage directly whenever appropriate.
|
| + """
|
| +
|
| + class _PushState(object):
|
| + """State needed to call .push(), to be stored in Item.push_state."""
|
| + def __init__(self, upload_url, finalize_url):
|
| + self.upload_url = upload_url
|
| + self.finalize_url = finalize_url
|
| + self.uploaded = False
|
| + self.finalized = False
|
| +
|
| def __init__(self, base_url, namespace):
|
| super(IsolateServer, self).__init__()
|
| assert base_url.startswith('http'), base_url
|
| - self.content_url = base_url.rstrip('/') + '/content/'
|
| + self.base_url = base_url.rstrip('/')
|
| self.namespace = namespace
|
| self.algo = get_hash_algo(namespace)
|
| - self._token = None
|
| + self._use_zip = is_namespace_with_compression(namespace)
|
| self._lock = threading.Lock()
|
| + self._server_caps = None
|
| +
|
| + @staticmethod
|
| + def _generate_handshake_request():
|
| + """Returns a dict to be sent as handshake request body."""
|
| + # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
|
| + return {
|
| + 'client_app_version': __version__,
|
| + 'fetcher': True,
|
| + 'protocol_version': ISOLATE_PROTOCOL_VERSION,
|
| + 'pusher': True,
|
| + }
|
| +
|
| + @staticmethod
|
| + def _validate_handshake_response(caps):
|
| + """Validates and normalizes handshake response."""
|
| + logging.info('Protocol version: %s', caps['protocol_version'])
|
| + logging.info('Server version: %s', caps['server_app_version'])
|
| + if caps.get('error'):
|
| + raise MappingError(caps['error'])
|
| + if not caps['access_token']:
|
| + raise ValueError('access_token is missing')
|
| + return caps
|
|
|
| @property
|
| - def token(self):
|
| + def _server_capabilities(self):
|
| + """Performs handshake with the server if not yet done.
|
| +
|
| + Returns:
|
| + Server capabilities dictionary as returned by /handshake endpoint.
|
| +
|
| + Raises:
|
| + MappingError if server rejects the handshake.
|
| + """
|
| # TODO(maruel): Make this request much earlier asynchronously while the
|
| # files are being enumerated.
|
| with self._lock:
|
| - if not self._token:
|
| - self._token = urllib.quote(url_read(self.content_url + 'get_token'))
|
| - return self._token
|
| -
|
| - def fetch(self, item, expected_size):
|
| - assert isinstance(item, basestring)
|
| - assert (
|
| - isinstance(expected_size, (int, long)) or
|
| - expected_size == UNKNOWN_FILE_SIZE)
|
| - zipped_url = '%sretrieve/%s/%s' % (self.content_url, self.namespace, item)
|
| - logging.debug('download_file(%s)', zipped_url)
|
| + if self._server_caps is None:
|
| + request_body = json.dumps(
|
| + self._generate_handshake_request(), separators=(',', ':'))
|
| + response = net.url_read(
|
| + url=self.base_url + '/content-gs/handshake',
|
| + data=request_body,
|
| + content_type='application/json',
|
| + method='POST')
|
| + if response is None:
|
| + raise MappingError('Failed to perform handshake.')
|
| + try:
|
| + caps = json.loads(response)
|
| + if not isinstance(caps, dict):
|
| + raise ValueError('Expecting JSON dict')
|
| + self._server_caps = self._validate_handshake_response(caps)
|
| + except (ValueError, KeyError, TypeError) as exc:
|
| + # A KeyError has a very confusing str() conversion: it's just the
|
| + # missing key value and nothing else, so print the exception class name
|
| + # as well.
|
| + raise MappingError('Invalid handshake response (%s): %s' % (
|
| + exc.__class__.__name__, exc))
|
| + return self._server_caps
|
| +
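|
| A sketch of the /handshake exchange performed above; field values are
| illustrative, not real server output:
|
|   # POST <base_url>/content-gs/handshake
|   request_body = {
|     'client_app_version': '0.2',
|     'fetcher': True,
|     'protocol_version': '1.0',
|     'pusher': True,
|   }
|   # Expected response: a JSON dict carrying at least these keys.
|   response_body = {
|     'access_token': '<opaque token, later appended to /pre-upload queries>',
|     'protocol_version': '1.0',
|     'server_app_version': '<server version>',
|   }
|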
|
| + def fetch(self, digest, size):
|
| + assert isinstance(digest, basestring)
|
| + assert (isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE)
|
| +
|
| + source_url = '%s/content-gs/retrieve/%s/%s' % (
|
| + self.base_url, self.namespace, digest)
|
| + logging.debug('download_file(%s)', source_url)
|
|
|
| # Because the app engine DB is only eventually consistent, retry 404 errors
|
| # because the file might just not be visible yet (even though it has been
|
| # uploaded).
|
| connection = net.url_open(
|
| - zipped_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
|
| + source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
|
| if not connection:
|
| - raise IOError('Unable to open connection to %s' % zipped_url)
|
| + raise IOError('Unable to open connection to %s' % source_url)
|
|
|
| - # TODO(maruel): Must only decompress when needed.
|
| - decompressor = zlib.decompressobj()
|
| try:
|
| - compressed_size = 0
|
| - decompressed_size = 0
|
| - while True:
|
| - chunk = connection.read(ZIPPED_FILE_CHUNK)
|
| - if not chunk:
|
| - break
|
| - compressed_size += len(chunk)
|
| - decompressed = decompressor.decompress(chunk)
|
| - decompressed_size += len(decompressed)
|
| - yield decompressed
|
| -
|
| - # Ensure that all the data was properly decompressed.
|
| - uncompressed_data = decompressor.flush()
|
| - if uncompressed_data:
|
| - raise IOError('Decompression failed')
|
| - if (expected_size != UNKNOWN_FILE_SIZE and
|
| - decompressed_size != expected_size):
|
| - raise IOError('File incorrect size after download of %s. Got %s and '
|
| - 'expected %s' % (item, decompressed_size, expected_size))
|
| - except zlib.error as e:
|
| - msg = 'Corrupted zlib for item %s. Processed %d of %s bytes.\n%s' % (
|
| - item, compressed_size, connection.content_length, e)
|
| - logging.warning(msg)
|
| -
|
| - # Testing seems to show that if a few machines are trying to download
|
| - # the same blob, they can cause each other to fail. So if we hit a zip
|
| - # error, this is the most likely cause (it only downloads some of the
|
| - # data). Randomly sleep for between 5 and 25 seconds to try and spread
|
| - # out the downloads.
|
| - sleep_duration = (random.random() * 20) + 5
|
| - time.sleep(sleep_duration)
|
| - raise IOError(msg)
|
| -
|
| - def push(self, item, expected_size, content_generator, push_urls=None):
|
| - assert isinstance(item, basestring)
|
| - assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE
|
| - item = str(item)
|
| -
|
| - # TODO(maruel): Support large files. This would require streaming support.
|
| -
|
| - # A cheese way to avoid memcpy of (possibly huge) file, until streaming
|
| - # upload support is implemented.
|
| - if isinstance(content_generator, list) and len(content_generator) == 1:
|
| - content = content_generator[0]
|
| + # Prepare reading pipeline.
|
| + generator = stream_read(connection, NET_IO_FILE_CHUNK)
|
| + if self._use_zip:
|
| + generator = zip_decompress(generator, DISK_FILE_CHUNK)
|
| +
|
| + # Read and yield data, calculate total length of the decompressed stream.
|
| + total_size = 0
|
| + for chunk in generator:
|
| + total_size += len(chunk)
|
| + yield chunk
|
| +
|
| + # Verify data length matches expectation.
|
| + if size != UNKNOWN_FILE_SIZE and total_size != size:
|
| + raise IOError('Incorrect file size: expected %d, got %d' % (
|
| + size, total_size))
|
| +
|
| + except IOError as err:
|
| + logging.warning('Failed to fetch %s: %s', digest, err)
|
| + raise
|
| +
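|
| A caller-side sketch of the fetch pipeline above; storage_api, digest, size
| and target_path are assumed to already exist:
|
|   # file_write() consumes the generator and returns the number of bytes
|   # written; fetch() itself raises IOError on truncated or corrupted data.
|   file_write(target_path, storage_api.fetch(digest, size))
|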
|
| + def push(self, item, content, size):
|
| + assert isinstance(item, Item)
|
| + assert isinstance(item.push_state, IsolateServer._PushState)
|
| + assert not item.push_state.finalized
|
| +
|
| + # TODO(vadimsh): Do not read from |content| generator when retrying push.
|
| + # If |content| is indeed a generator, it cannot be rewound to the
|
| + # beginning of the stream, and a retry will find it exhausted. A possible
|
| + # solution is to wrap the |content| generator in some sort of caching,
|
| + # restartable generator. It should be done alongside streaming support
|
| + # implementation.
|
| +
|
| + # This push operation may be a retry after a failed finalization call below;
|
| + # there is no need to re-upload the contents in that case.
|
| + if not item.push_state.uploaded:
|
| + # A cheesy way to avoid a memcpy of a (possibly huge) file, until streaming
|
| + # upload support is implemented.
|
| + if isinstance(content, list) and len(content) == 1:
|
| + content = content[0]
|
| + else:
|
| + content = ''.join(content)
|
| + # PUT file to |upload_url|.
|
| + response = net.url_read(
|
| + url=item.push_state.upload_url,
|
| + data=content,
|
| + content_type='application/octet-stream',
|
| + method='PUT')
|
| + if response is None:
|
| + raise IOError('Failed to upload file %s to %s' % (
|
| + item.digest, item.push_state.upload_url))
|
| + item.push_state.uploaded = True
|
| else:
|
| - content = ''.join(content_generator)
|
| -
|
| - if len(content) > MIN_SIZE_FOR_DIRECT_BLOBSTORE:
|
| - return self._upload_hash_content_to_blobstore(item, content)
|
| -
|
| - url = '%sstore/%s/%s?token=%s' % (
|
| - self.content_url, self.namespace, item, self.token)
|
| - return url_read(url, data=content, content_type='application/octet-stream')
|
| -
|
| - def contains(self, files):
|
| - logging.info('Checking existence of %d files...', len(files))
|
| -
|
| - body = ''.join(
|
| - (binascii.unhexlify(metadata['h']) for (_, metadata) in files))
|
| - assert (len(body) % self.algo().digest_size) == 0, repr(body)
|
| + logging.info(
|
| + 'File %s is already uploaded, retrying finalization only', item.digest)
|
| +
|
| + # Optionally notify the server that it's done.
|
| + if item.push_state.finalize_url:
|
| + # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
|
| + # send it to the isolate server. That way the isolate server can verify
|
| + # that the data safely reached Google Storage (GS provides MD5 and CRC32C
|
| + # of stored files).
|
| + response = net.url_read(
|
| + url=item.push_state.finalize_url,
|
| + data='',
|
| + content_type='application/json',
|
| + method='POST')
|
| + if response is None:
|
| + raise IOError('Failed to finalize an upload of %s' % item.digest)
|
| + item.push_state.finalized = True
|
| +
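|
| Putting contains() and push() together from the caller's side; this is a
| minimal sketch with illustrative digest and size, ignoring compression
| (normally handled by Storage.async_push):
|
|   item = FileItem(path='/tmp/out.bin', digest='deadbeef' * 5, size=123,
|                   is_isolated=False)
|   # contains() attaches a _PushState to every item missing on the server.
|   for missing in storage_api.contains([item]):
|     storage_api.push(missing, missing.content(DISK_FILE_CHUNK), missing.size)
|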
|
| + def contains(self, items):
|
| + logging.info('Checking existence of %d files...', len(items))
|
| +
|
| + # Request body is a JSON-encoded list of dicts.
|
| + body = [
|
| + {
|
| + 'h': item.digest,
|
| + 's': item.size,
|
| + 'i': int(item.is_isolated),
|
| + } for item in items
|
| + ]
|
|
|
| - query_url = '%scontains/%s?token=%s' % (
|
| - self.content_url, self.namespace, self.token)
|
| - response = url_read(
|
| - query_url, data=body, content_type='application/octet-stream')
|
| - if len(files) != len(response):
|
| + query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
|
| + self.base_url,
|
| + self.namespace,
|
| + urllib.quote(self._server_capabilities['access_token']))
|
| + response_body = net.url_read(
|
| + url=query_url,
|
| + data=json.dumps(body, separators=(',', ':')),
|
| + content_type='application/json',
|
| + method='POST')
|
| + if response_body is None:
|
| + raise MappingError('Failed to execute /pre-upload query')
|
| +
|
| + # Response body is a list of push_urls (or null if file is already present).
|
| + try:
|
| + response = json.loads(response_body)
|
| + if not isinstance(response, list):
|
| + raise ValueError('Expecting response with json-encoded list')
|
| + if len(response) != len(items):
|
| + raise ValueError(
|
| + 'Incorrect number of items in the list, expected %d, '
|
| + 'but got %d' % (len(items), len(response)))
|
| + except ValueError as err:
|
| raise MappingError(
|
| - 'Got an incorrect number of responses from the server. Expected %d, '
|
| - 'but got %d' % (len(files), len(response)))
|
| -
|
| - # This implementation of IsolateServer doesn't use push_urls field,
|
| - # set it to None.
|
| - missing_files = [
|
| - files[i] + (None,) for i, flag in enumerate(response) if flag == '\x00'
|
| - ]
|
| + 'Invalid response from server: %s, body is %s' % (err, response_body))
|
| +
|
| + # Pick Items that are missing, attach _PushState to them.
|
| + missing_items = []
|
| + for i, push_urls in enumerate(response):
|
| + if push_urls:
|
| + assert len(push_urls) == 2, str(push_urls)
|
| + item = items[i]
|
| + assert item.push_state is None
|
| + item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1])
|
| + missing_items.append(item)
|
| logging.info('Queried %d files, %d cache hit',
|
| - len(files), len(files) - len(missing_files))
|
| - return missing_files
|
| -
|
| - def _upload_hash_content_to_blobstore(self, item, content):
|
| - """Uploads the content directly to the blobstore via a generated url."""
|
| - # TODO(maruel): Support large files. This would require streaming support.
|
| - gen_url = '%sgenerate_blobstore_url/%s/%s' % (
|
| - self.content_url, self.namespace, item)
|
| - # Token is guaranteed to be already quoted but it is unnecessary here, and
|
| - # only here.
|
| - data = [('token', urllib.unquote(self.token))]
|
| - content_type, body = encode_multipart_formdata(
|
| - data, [('content', item, content)])
|
| - last_url = gen_url
|
| - for _ in net.retry_loop(max_attempts=net.URL_OPEN_MAX_ATTEMPTS):
|
| - # Retry HTTP 50x here but not 404.
|
| - upload_url = net.url_read(gen_url, data=data)
|
| - if not upload_url:
|
| - raise MappingError('Unable to connect to server %s' % gen_url)
|
| - last_url = upload_url
|
| -
|
| - # Do not retry this request on HTTP 50x. Regenerate an upload url each
|
| - # time since uploading "consumes" the upload url.
|
| - result = net.url_read(
|
| - upload_url, data=body, content_type=content_type, retry_50x=False)
|
| - if result is not None:
|
| - return result
|
| - raise MappingError('Unable to connect to server %s' % last_url)
|
| + len(items), len(items) - len(missing_items))
|
| + return missing_items
|
|
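|
| The /pre-upload wire format used by contains() above, with illustrative
| values:
|
|   # Request body: a JSON list with one dict per queried item.
|   request_body = [
|     {'h': 'deadbeef' * 5, 's': 123, 'i': 0},
|     {'h': 'cafebabe' * 5, 's': 456, 'i': 1},
|   ]
|   # Response body: a JSON list of the same length; null for items already
|   # present, an [upload_url, finalize_url] pair for missing ones (the second
|   # element may be null when no finalization call is needed).
|   response_body = [
|     None,
|     ['<signed upload url>', '<finalize url, possibly null>'],
|   ]
|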
|
|
|
| class FileSystem(StorageApi):
|
| @@ -690,34 +789,32 @@ class FileSystem(StorageApi):
|
| The common use case is a NFS/CIFS file server that is mounted locally that is
|
| used to fetch the file on a local partition.
|
| """
|
| +
|
| def __init__(self, base_path):
|
| super(FileSystem, self).__init__()
|
| self.base_path = base_path
|
|
|
| - def fetch(self, item, expected_size):
|
| - assert isinstance(item, basestring)
|
| - assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE
|
| - source = os.path.join(self.base_path, item)
|
| - if (expected_size != UNKNOWN_FILE_SIZE and
|
| - not is_valid_file(source, expected_size)):
|
| - raise IOError('Invalid file %s' % item)
|
| + def fetch(self, digest, size):
|
| + assert isinstance(digest, basestring)
|
| + assert isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE
|
| + source = os.path.join(self.base_path, digest)
|
| + if size != UNKNOWN_FILE_SIZE and not is_valid_file(source, size):
|
| + raise IOError('Invalid file %s' % digest)
|
| return file_read(source)
|
|
|
| - def push(self, item, expected_size, content_generator, push_urls=None):
|
| - assert isinstance(item, basestring)
|
| - assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE
|
| - dest = os.path.join(self.base_path, item)
|
| - total = file_write(dest, content_generator)
|
| - if expected_size != UNKNOWN_FILE_SIZE and total != expected_size:
|
| + def push(self, item, content, size):
|
| + assert isinstance(item, Item)
|
| + assert isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE
|
| + dest = os.path.join(self.base_path, item.digest)
|
| + total = file_write(dest, content)
|
| + if size != UNKNOWN_FILE_SIZE and total != size:
|
| os.remove(dest)
|
| - raise IOError(
|
| - 'Invalid file %s, %d != %d' % (item, total, expected_size))
|
| + raise IOError('Invalid file %s, %d != %d' % (item.digest, total, size))
|
|
|
| - def contains(self, files):
|
| + def contains(self, items):
|
| return [
|
| - (filename, metadata, None)
|
| - for filename, metadata in files
|
| - if not os.path.exists(os.path.join(self.base_path, metadata['h']))
|
| + item for item in items
|
| + if not os.path.exists(os.path.join(self.base_path, item.digest))
|
| ]
|
|
|
|
|
| @@ -767,7 +864,7 @@ def upload_tree(base_url, indir, infiles, namespace):
|
| query if an element was already uploaded, and |base_url|/store/
|
| can be used to upload a new element.
|
| indir: Root directory the infiles are based in.
|
| - infiles: dict of files to upload files from |indir| to |base_url|.
|
| + infiles: dict of files to upload from |indir| to |base_url|.
|
| namespace: The namespace to use on the server.
|
| """
|
| remote = get_storage_api(base_url, namespace)
|
|
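|
| For reference, a minimal sketch of a call to the module-level upload_tree()
| with the metadata layout this patch assumes ('h' is the digest, 's' the size,
| 'l' marks a symlink, priority '0' marks .isolated files); all values are
| illustrative:
|
|   infiles = {
|     'data.bin': {'h': 'deadbeef' * 5, 's': 1048576},
|     'foo.isolated': {'h': 'cafebabe' * 5, 's': 512, 'priority': '0'},
|     'link_to_data': {'l': 'data.bin'},  # symlinks carry no digest
|   }
|   upload_tree('https://isolate.example.com', '/path/to/outdir', infiles,
|               'default-gzip')
|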
|