Chromium Code Reviews

Unified Diff: isolateserver.py

Issue 25093003: Client side implementation of new /content-gs isolate protocol. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/swarm_client
Patch Set: Created 7 years, 3 months ago
Index: isolateserver.py
diff --git a/isolateserver.py b/isolateserver.py
index 2205a182ea81eedf2dd366438ddc8ae5a12d4e25..344872353237ae665e8c220b4a27a61e811d3dc9 100755
--- a/isolateserver.py
+++ b/isolateserver.py
@@ -7,12 +7,10 @@
__version__ = '0.2'
-import binascii
import hashlib
import json
import logging
import os
-import random
import re
import sys
import threading
@@ -29,14 +27,15 @@ from utils import threading_utils
from utils import tools
-# The minimum size of files to upload directly to the blobstore.
-MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024
+# Version of isolate protocol passed to the server in /handshake request.
+ISOLATE_PROTOCOL_VERSION = '1.0'
-# The number of files to check the isolate server per /contains query.
+
+# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
-# are taken and send to '/contains', then next ITEMS_PER_CONTAINS_QUERIES[1],
+# are taken and sent to '/pre-upload', then the next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. The numbers here are a trade-off; the more per request, the lower the
# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
# larger values cause longer lookups, increasing the initial latency to start
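
For illustration, here is a minimal sketch of the ramp-up described above. The
real values of ITEMS_PER_CONTAINS_QUERIES are defined elsewhere in this file;
the (3, 5, 8) schedule below is made up for the example.

def batch_sizes(num_items, schedule=(3, 5, 8)):
  """Yields batch sizes: schedule[0], schedule[1], ..., then schedule[-1]."""
  index = 0
  while num_items > 0:
    size = schedule[min(index, len(schedule) - 1)]
    yield min(size, num_items)
    num_items -= size
    index += 1

# 20 items would be queried in batches of 3, 5, 8 and then the remaining 4.
assert list(batch_sizes(20)) == [3, 5, 8, 4]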
@@ -62,10 +61,12 @@ UNKNOWN_FILE_SIZE = None
# The size of each chunk to read when downloading and unzipping files.
ZIPPED_FILE_CHUNK = 16 * 1024
-
# Chunk size to use when doing disk I/O.
DISK_FILE_CHUNK = 1024 * 1024
+# Chunk size to use when reading from network stream.
+NET_IO_FILE_CHUNK = 16 * 1024
+
# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download is aborted.
@@ -106,63 +107,6 @@ class MappingError(OSError):
pass
-def randomness():
- """Generates low-entropy randomness for MIME encoding.
-
- Exists so it can be mocked out in unit tests.
- """
- return str(time.time())
-
-
-def encode_multipart_formdata(fields, files,
- mime_mapper=lambda _: 'application/octet-stream'):
- """Encodes a Multipart form data object.
-
- Args:
- fields: a sequence (name, value) elements for
- regular form fields.
- files: a sequence of (name, filename, value) elements for data to be
- uploaded as files.
- mime_mapper: function to return the mime type from the filename.
- Returns:
- content_type: for httplib.HTTP instance
- body: for httplib.HTTP instance
- """
- boundary = hashlib.md5(randomness()).hexdigest()
- body_list = []
- for (key, value) in fields:
- if isinstance(key, unicode):
- value = key.encode('utf-8')
- if isinstance(value, unicode):
- value = value.encode('utf-8')
- body_list.append('--' + boundary)
- body_list.append('Content-Disposition: form-data; name="%s"' % key)
- body_list.append('')
- body_list.append(value)
- body_list.append('--' + boundary)
- body_list.append('')
- for (key, filename, value) in files:
- if isinstance(key, unicode):
- value = key.encode('utf-8')
- if isinstance(filename, unicode):
- value = filename.encode('utf-8')
- if isinstance(value, unicode):
- value = value.encode('utf-8')
- body_list.append('--' + boundary)
- body_list.append('Content-Disposition: form-data; name="%s"; '
- 'filename="%s"' % (key, filename))
- body_list.append('Content-Type: %s' % mime_mapper(filename))
- body_list.append('')
- body_list.append(value)
- body_list.append('--' + boundary)
- body_list.append('')
- if body_list:
- body_list[-2] += '--'
- body = '\r\n'.join(body_list)
- content_type = 'multipart/form-data; boundary=%s' % boundary
- return content_type, body
-
-
def is_valid_hash(value, algo):
"""Returns if the value is a valid hash for the corresponding algorithm."""
size = 2 * algo().digest_size
@@ -184,6 +128,15 @@ def hash_file(filepath, algo):
return digest.hexdigest()
+def stream_read(stream, chunk_size):
+ """Reads chunks from |stream| and yields them."""
+ while True:
+ data = stream.read(chunk_size)
+ if not data:
+ break
+ yield data
+
+
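
A quick usage sketch for stream_read: any object with a read() method works,
so an in-memory buffer stands in for a network connection here.

import io

buf = io.BytesIO(b'abcdefg')
assert list(stream_read(buf, 3)) == [b'abc', b'def', b'g']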
def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
"""Yields file content in chunks of given |chunk_size|."""
with open(filepath, 'rb') as f:
@@ -226,6 +179,37 @@ def zip_compress(content_generator, level=7):
yield tail
+def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
+ """Reads zipped data from |content_generator| and yields decompressed data.
+
+  Decompresses data in small chunks (no larger than |chunk_size|) so that a
+  zip bomb doesn't cause zlib to preallocate a huge amount of memory.
+
+ Raises IOError if data is corrupted or incomplete.
+ """
+ decompressor = zlib.decompressobj()
+ compressed_size = 0
+ try:
+ for chunk in content_generator:
+ compressed_size += len(chunk)
+ data = decompressor.decompress(chunk, chunk_size)
+ if data:
+ yield data
+ while decompressor.unconsumed_tail:
+ data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
+ if data:
+ yield data
+ tail = decompressor.flush()
+ if tail:
+ yield tail
+ except zlib.error as e:
+ raise IOError(
+ 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
+ # Ensure all data was read and decompressed.
+ if decompressor.unused_data or decompressor.unconsumed_tail:
+ raise IOError('Not all data was decompressed')
+
+
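
A minimal round-trip sketch for zip_decompress. zlib.compress is used directly
so the example is self-contained (the helpers in this file wrap the same zlib
stream format); feeding the blob in small pieces mimics a network stream.

import zlib

original = 'x' * (1024 * 1024)
blob = zlib.compress(original)
pieces = [blob[i:i + 4096] for i in xrange(0, len(blob), 4096)]
assert ''.join(zip_decompress(iter(pieces), chunk_size=64 * 1024)) == original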
def get_zip_compression_level(filename):
"""Given a filename calculates the ideal zip compression level to use."""
file_ext = os.path.splitext(filename)[1].lower()
@@ -301,13 +285,37 @@ def try_remove(filepath):
pass
-def url_read(url, **kwargs):
- result = net.url_read(url, **kwargs)
- if result is None:
- # If we get no response from the server, assume it is down and raise an
- # exception.
- raise MappingError('Unable to connect to server %s' % url)
- return result
+class Item(object):
+ """An item to push to Storage.
+
+  It starts its life in the main thread, travels to the 'contains' thread,
+  then to the 'push' thread, and finally back to the main thread.
+
+ It is never used concurrently from multiple threads.
+ """
+
+ def __init__(self, digest, size, is_isolated=False):
+ self.digest = digest
+ self.size = size
+ self.is_isolated = is_isolated
+ self.compression_level = 6
+ self.push_state = None
+
+ def content(self, chunk_size):
+ """Iterable with content of this item in chunks of given size."""
+ raise NotImplementedError()
+
+
+class FileItem(Item):
+ """A file to push to Storage."""
+
+ def __init__(self, path, digest, size, is_isolated):
+ super(FileItem, self).__init__(digest, size, is_isolated)
+ self.path = path
+ self.compression_level = get_zip_compression_level(path)
+
+ def content(self, chunk_size):
+ return file_read(self.path, chunk_size)
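
The Item/FileItem split keeps the upload pipeline source-agnostic: anything
that can report a digest and size and produce chunked content can be pushed.
As a sketch, a hypothetical in-memory item (not part of this patch) could look
like this:

class BufferItem(Item):
  """Hypothetical in-memory counterpart of FileItem; illustration only."""

  def __init__(self, buf, algo=hashlib.sha1, is_isolated=False):
    super(BufferItem, self).__init__(
        algo(buf).hexdigest(), len(buf), is_isolated)
    self.buffer = buf

  def content(self, chunk_size):
    return (self.buffer[i:i + chunk_size]
            for i in xrange(0, len(self.buffer), chunk_size))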
class Storage(object):
@@ -363,21 +371,30 @@ class Storage(object):
"""
logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))
+ # TODO(vadimsh): Introduce Item as a part of the public interface?
+
+ # Convert |indir| + |infiles| into a list of FileItem objects.
+    # Filter out symlinks, since they are not represented by items on the
+    # isolate server side.
+ items = [
+ FileItem(
+ path=os.path.join(indir, filepath),
+ digest=metadata['h'],
+ size=metadata['s'],
+ is_isolated=metadata.get('priority') == '0')
+ for filepath, metadata in infiles.iteritems()
+ if 'l' not in metadata
+ ]
+
# Enqueue all upload tasks.
+ missing = set()
channel = threading_utils.TaskChannel()
- missing = []
- for filename, metadata, push_urls in self.get_missing_files(infiles):
- missing.append((filename, metadata))
- path = os.path.join(indir, filename)
- if metadata.get('priority', '1') == '0':
- priority = WorkerPool.HIGH
- else:
- priority = WorkerPool.MED
- compression_level = get_zip_compression_level(path)
- chunk_size = ZIPPED_FILE_CHUNK if self.use_zip else DISK_FILE_CHUNK
- content = file_read(path, chunk_size)
- self.async_push(channel, priority, metadata['h'], metadata['s'],
- content, compression_level, push_urls)
+ for missing_item in self.get_missing_items(items):
+ missing.add(missing_item)
+ self.async_push(
+ channel,
+ WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
+ missing_item)
# No need to spawn deadlock detector thread if there's nothing to upload.
if missing:
@@ -388,18 +405,19 @@ class Storage(object):
detector.ping()
item = channel.pull()
uploaded += 1
- logging.debug('Uploaded %d / %d: %s', uploaded, len(missing), item)
+ logging.debug(
+ 'Uploaded %d / %d: %s', uploaded, len(missing), item.digest)
logging.info('All files are uploaded')
# Print stats.
- total = len(infiles)
- total_size = sum(metadata.get('s', 0) for metadata in infiles.itervalues())
+ total = len(items)
+ total_size = sum(f.size for f in items)
logging.info(
'Total: %6d, %9.1fkb',
total,
- sum(m.get('s', 0) for m in infiles.itervalues()) / 1024.)
- cache_hit = set(infiles.iterkeys()) - set(x[0] for x in missing)
- cache_hit_size = sum(infiles[i].get('s', 0) for i in cache_hit)
+ total_size / 1024.)
+ cache_hit = set(items) - missing
+ cache_hit_size = sum(f.size for f in cache_hit)
logging.info(
'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
len(cache_hit),
@@ -407,7 +425,7 @@ class Storage(object):
len(cache_hit) * 100. / total,
cache_hit_size * 100. / total_size if total_size else 0)
cache_miss = missing
- cache_miss_size = sum(infiles[i[0]].get('s', 0) for i in cache_miss)
+ cache_miss_size = sum(f.size for f in cache_miss)
logging.info(
'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
len(cache_miss),
@@ -415,18 +433,23 @@ class Storage(object):
len(cache_miss) * 100. / total,
cache_miss_size * 100. / total_size if total_size else 0)
- def async_push(self, channel, priority, item, expected_size,
- content_generator, compression_level, push_urls=None):
- """Starts asynchronous push to the server in a parallel thread."""
+ def async_push(self, channel, priority, item):
+ """Starts asynchronous push to the server in a parallel thread.
+
+ Arguments:
+ channel: TaskChannel object that receives back |item| when upload ends.
+ priority: thread pool task priority for the push.
+ item: item to upload as instance of Item class.
+ """
def push(content, size):
"""Pushes an item and returns its id, to pass as a result to |channel|."""
- self._storage_api.push(item, size, content, push_urls)
+ self._storage_api.push(item, content, size)
return item
# If zipping is not required, just start a push task.
if not self.use_zip:
self.net_thread_pool.add_task_with_channel(channel, priority, push,
- content_generator, expected_size)
+ item.content(DISK_FILE_CHUNK), item.size)
return
# If zipping is enabled, zip in a separate thread.
@@ -434,7 +457,8 @@ class Storage(object):
# TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
# content right here. It will block until the whole file is zipped.
try:
- stream = zip_compress(content_generator, compression_level)
+ stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
+ item.compression_level)
data = ''.join(stream)
except Exception as exc:
logging.error('Failed to zip \'%s\': %s', item, exc)
@@ -442,23 +466,23 @@ class Storage(object):
return
self.net_thread_pool.add_task_with_channel(channel, priority, push,
[data], UNKNOWN_FILE_SIZE)
- self.cpu_thread_pool.add_task(0, zip_and_push)
+ self.cpu_thread_pool.add_task(priority, zip_and_push)
- def get_missing_files(self, files):
- """Yields files that are missing from the server.
+ def get_missing_items(self, items):
+ """Yields items that are missing from the server.
Issues multiple parallel queries via StorageApi's 'contains' method.
Arguments:
- files: a dictionary file name -> metadata dict.
+ items: a list of Item objects to check.
Yields:
- Triplets (file name, metadata dict, push_urls object to pass to push).
+ Item objects that are missing from the server.
"""
channel = threading_utils.TaskChannel()
pending = 0
# Enqueue all requests.
- for batch in self.batch_files_for_check(files):
+ for batch in self.batch_items_for_check(items):
self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
self._storage_api.contains, batch)
pending += 1
@@ -468,25 +492,24 @@ class Storage(object):
yield missing
@staticmethod
- def batch_files_for_check(files):
- """Splits list of files to check for existence on the server into batches.
+ def batch_items_for_check(items):
+ """Splits list of items to check for existence on the server into batches.
Each batch corresponds to a single 'exists?' query to the server via a call
to StorageApi's 'contains' method.
Arguments:
- files: a dictionary file name -> metadata dict.
+ items: a list of Item objects.
Yields:
- Batches of files to query for existence in a single operation,
- each batch is a list of pairs: (file name, metadata dict).
+ Batches of items to query for existence in a single operation,
+ each batch is a list of Item objects.
"""
batch_count = 0
batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
next_queries = []
- items = ((k, v) for k, v in files.iteritems() if 's' in v)
- for filename, metadata in sorted(items, key=lambda x: -x[1]['s']):
- next_queries.append((filename, metadata))
+ for item in sorted(items, key=lambda x: x.size, reverse=True):
+ next_queries.append(item)
if len(next_queries) == batch_size_limit:
yield next_queries
next_queries = []
@@ -500,188 +523,264 @@ class Storage(object):
class StorageApi(object):
"""Interface for classes that implement low-level storage operations."""
- def fetch(self, item, expected_size):
+ def fetch(self, digest, size):
"""Fetches an object and yields its content.
Arguments:
- item: hash digest of item to download.
- expected_size: expected size of the item, to validate it.
+ digest: hash digest of item to download.
+ size: expected size of the item, to validate it.
Yields:
Chunks of downloaded item (as str objects).
"""
raise NotImplementedError()
- def push(self, item, expected_size, content_generator, push_urls=None):
- """Uploads content generated by |content_generator| as |item|.
+ def push(self, item, content, size):
+ """Uploads an |item| with content generated by |content| generator.
Arguments:
- item: hash digest of item to upload.
- expected_size: total length of the content yielded by |content_generator|.
- content_generator: generator that produces chunks to push.
- push_urls: optional URLs returned by 'contains' call for this item.
+ item: Item object that holds information about an item being pushed.
+ content: a generator that yields chunks to push.
+ size: expected size of stream produced by |content|.
Returns:
None.
"""
raise NotImplementedError()
- def contains(self, files):
- """Checks for existence of given |files| on the server.
+ def contains(self, items):
+ """Checks for existence of given |items| on the server.
+
+    Mutates |items| by assigning an opaque implementation-specific object to
+    the Item's push_state attribute for entries missing from the datastore.
Arguments:
- files: list of pairs (file name, metadata dict).
+ items: list of Item objects.
Returns:
- A list of files missing on server as a list of triplets
- (file name, metadata dict, push_urls object to pass to push).
+      A list of items missing on the server, as a list of Item objects.
"""
raise NotImplementedError()
class IsolateServer(StorageApi):
- """StorageApi implementation that downloads and uploads to Isolate Server."""
+ """StorageApi implementation that downloads and uploads to Isolate Server.
+
+  It uploads to and downloads from Google Storage directly whenever appropriate.
+ """
+
+ class _PushState(object):
+ """State needed to call .push(), to be stored in Item.push_state."""
+ def __init__(self, upload_url, finalize_url):
+ self.upload_url = upload_url
+ self.finalize_url = finalize_url
+ self.uploaded = False
+ self.finalized = False
+
def __init__(self, base_url, namespace):
super(IsolateServer, self).__init__()
assert base_url.startswith('http'), base_url
- self.content_url = base_url.rstrip('/') + '/content/'
+ self.base_url = base_url.rstrip('/')
self.namespace = namespace
self.algo = get_hash_algo(namespace)
- self._token = None
+ self._use_zip = is_namespace_with_compression(namespace)
self._lock = threading.Lock()
+ self._server_caps = None
+
+ @staticmethod
+ def _generate_handshake_request():
+ """Returns a dict to be sent as handshake request body."""
+ # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
+ return {
+ 'client_app_version': __version__,
+ 'fetcher': True,
+ 'protocol_version': ISOLATE_PROTOCOL_VERSION,
+ 'pusher': True,
+ }
+
+ @staticmethod
+ def _validate_handshake_response(caps):
+ """Validates and normalizes handshake response."""
+ logging.info('Protocol version: %s', caps['protocol_version'])
+ logging.info('Server version: %s', caps['server_app_version'])
+ if caps.get('error'):
+ raise MappingError(caps['error'])
+ if not caps['access_token']:
+ raise ValueError('access_token is missing')
+ return caps
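
Putting these two helpers together, a successful handshake exchange could look
like this (all values below are made up):

POST /content-gs/handshake
{"client_app_version": "0.2", "fetcher": true,
 "protocol_version": "1.0", "pusher": true}

200 OK
{"protocol_version": "1.0", "server_app_version": "163-5dd1462",
 "access_token": "AXsiLCJ...opaque...", "error": null}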
@property
- def token(self):
+ def _server_capabilities(self):
+ """Performs handshake with the server if not yet done.
+
+ Returns:
+ Server capabilities dictionary as returned by /handshake endpoint.
+
+ Raises:
+ MappingError if server rejects the handshake.
+ """
# TODO(maruel): Make this request much earlier asynchronously while the
# files are being enumerated.
with self._lock:
- if not self._token:
- self._token = urllib.quote(url_read(self.content_url + 'get_token'))
- return self._token
-
- def fetch(self, item, expected_size):
- assert isinstance(item, basestring)
- assert (
- isinstance(expected_size, (int, long)) or
- expected_size == UNKNOWN_FILE_SIZE)
- zipped_url = '%sretrieve/%s/%s' % (self.content_url, self.namespace, item)
- logging.debug('download_file(%s)', zipped_url)
+ if self._server_caps is None:
+ request_body = json.dumps(
+ self._generate_handshake_request(), separators=(',', ':'))
+ response = net.url_read(
+ url=self.base_url + '/content-gs/handshake',
+ data=request_body,
+ content_type='application/json',
+ method='POST')
+ if response is None:
+ raise MappingError('Failed to perform handshake.')
+ try:
+ caps = json.loads(response)
+ if not isinstance(caps, dict):
+ raise ValueError('Expecting JSON dict')
+ self._server_caps = self._validate_handshake_response(caps)
+ except (ValueError, KeyError, TypeError) as exc:
+        # The KeyError exception has a very confusing str conversion: it's
+        # just the missing key and nothing else. So print the exception
+        # class name as well.
+ raise MappingError('Invalid handshake response (%s): %s' % (
+ exc.__class__.__name__, exc))
+ return self._server_caps
+
+ def fetch(self, digest, size):
+ assert isinstance(digest, basestring)
+ assert (isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE)
+
+ source_url = '%s/content-gs/retrieve/%s/%s' % (
+ self.base_url, self.namespace, digest)
+ logging.debug('download_file(%s)', source_url)
# Because the app engine DB is only eventually consistent, retry 404 errors
# because the file might just not be visible yet (even though it has been
# uploaded).
connection = net.url_open(
- zipped_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
+ source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
if not connection:
- raise IOError('Unable to open connection to %s' % zipped_url)
+ raise IOError('Unable to open connection to %s' % source_url)
- # TODO(maruel): Must only decompress when needed.
- decompressor = zlib.decompressobj()
try:
- compressed_size = 0
- decompressed_size = 0
- while True:
- chunk = connection.read(ZIPPED_FILE_CHUNK)
- if not chunk:
- break
- compressed_size += len(chunk)
- decompressed = decompressor.decompress(chunk)
- decompressed_size += len(decompressed)
- yield decompressed
-
- # Ensure that all the data was properly decompressed.
- uncompressed_data = decompressor.flush()
- if uncompressed_data:
- raise IOError('Decompression failed')
- if (expected_size != UNKNOWN_FILE_SIZE and
- decompressed_size != expected_size):
- raise IOError('File incorrect size after download of %s. Got %s and '
- 'expected %s' % (item, decompressed_size, expected_size))
- except zlib.error as e:
- msg = 'Corrupted zlib for item %s. Processed %d of %s bytes.\n%s' % (
- item, compressed_size, connection.content_length, e)
- logging.warning(msg)
-
- # Testing seems to show that if a few machines are trying to download
- # the same blob, they can cause each other to fail. So if we hit a zip
- # error, this is the most likely cause (it only downloads some of the
- # data). Randomly sleep for between 5 and 25 seconds to try and spread
- # out the downloads.
- sleep_duration = (random.random() * 20) + 5
- time.sleep(sleep_duration)
- raise IOError(msg)
-
- def push(self, item, expected_size, content_generator, push_urls=None):
- assert isinstance(item, basestring)
- assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE
- item = str(item)
-
- # TODO(maruel): Support large files. This would require streaming support.
-
- # A cheese way to avoid memcpy of (possibly huge) file, until streaming
- # upload support is implemented.
- if isinstance(content_generator, list) and len(content_generator) == 1:
- content = content_generator[0]
+ # Prepare reading pipeline.
+ generator = stream_read(connection, NET_IO_FILE_CHUNK)
+ if self._use_zip:
+ generator = zip_decompress(generator, DISK_FILE_CHUNK)
+
+ # Read and yield data, calculate total length of the decompressed stream.
+ total_size = 0
+ for chunk in generator:
+ total_size += len(chunk)
+ yield chunk
+
+ # Verify data length matches expectation.
+ if size != UNKNOWN_FILE_SIZE and total_size != size:
+ raise IOError('Incorrect file size: expected %d, got %d' % (
+ size, total_size))
+
+ except IOError as err:
+ logging.warning('Failed to fetch %s: %s', digest, err)
+ raise
+
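
A usage sketch for the fetch pipeline above, wired to the file_write helper
from earlier in this file (the server URL, namespace and digest are made up):

storage = IsolateServer('https://isolate.example.com', 'default-gz')
digest = '0123456789abcdef0123456789abcdef01234567'
file_write('/tmp/out.bin', storage.fetch(digest, UNKNOWN_FILE_SIZE))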
+ def push(self, item, content, size):
+ assert isinstance(item, Item)
+ assert isinstance(item.push_state, IsolateServer._PushState)
+ assert not item.push_state.finalized
+
+ # TODO(vadimsh): Do not read from |content| generator when retrying push.
+    # If |content| is indeed a generator, it cannot be rewound back to the
+    # beginning of the stream. A retry will find it exhausted. A possible
+    # solution is to wrap the |content| generator in some sort of caching,
+    # restartable generator (see the sketch after this method). It should be
+    # done alongside the streaming support implementation.
+
+    # This push operation may be a retry after a failed finalization call
+    # below; no need to re-upload the contents in that case.
+ if not item.push_state.uploaded:
+      # A cheesy way to avoid a memcpy of a (possibly huge) file, until streaming
+ # upload support is implemented.
+ if isinstance(content, list) and len(content) == 1:
+ content = content[0]
+ else:
+ content = ''.join(content)
+ # PUT file to |upload_url|.
+ response = net.url_read(
+ url=item.push_state.upload_url,
+ data=content,
+ content_type='application/octet-stream',
+ method='PUT')
+ if response is None:
+ raise IOError('Failed to upload a file %s to %s' % (
+ item.digest, item.push_state.upload_url))
+ item.push_state.uploaded = True
else:
- content = ''.join(content_generator)
-
- if len(content) > MIN_SIZE_FOR_DIRECT_BLOBSTORE:
- return self._upload_hash_content_to_blobstore(item, content)
-
- url = '%sstore/%s/%s?token=%s' % (
- self.content_url, self.namespace, item, self.token)
- return url_read(url, data=content, content_type='application/octet-stream')
-
- def contains(self, files):
- logging.info('Checking existence of %d files...', len(files))
-
- body = ''.join(
- (binascii.unhexlify(metadata['h']) for (_, metadata) in files))
- assert (len(body) % self.algo().digest_size) == 0, repr(body)
+ logging.info(
        'File %s is already uploaded, retrying finalization only', item.digest)
+
+ # Optionally notify the server that it's done.
+ if item.push_state.finalize_url:
+ # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
+      # send it to the isolate server. That way the isolate server can verify that
+ # the data safely reached Google Storage (GS provides MD5 and CRC32C of
+ # stored files).
+ response = net.url_read(
+ url=item.push_state.finalize_url,
+ data='',
+ content_type='application/json',
+ method='POST')
+ if response is None:
+ raise IOError('Failed to finalize an upload of %s' % item.digest)
+ item.push_state.finalized = True
+
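
The TODO above asks for a caching, restartable generator so a retry does not
find |content| exhausted. One possible shape for such a wrapper (a sketch, not
part of this patch):

class RestartableStream(object):
  """Hypothetical wrapper: caches chunks so iteration can restart on retry."""

  def __init__(self, gen):
    self._gen = gen
    self._cache = []

  def __iter__(self):
    # Replay whatever was already pulled, then continue draining the source.
    for chunk in self._cache:
      yield chunk
    for chunk in self._gen:
      self._cache.append(chunk)
      yield chunk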
+ def contains(self, items):
+ logging.info('Checking existence of %d files...', len(items))
+
+    # Request body is a JSON-encoded list of dicts.
+ body = [
+ {
+ 'h': item.digest,
+ 's': item.size,
+ 'i': int(item.is_isolated),
+ } for item in items
+ ]
- query_url = '%scontains/%s?token=%s' % (
- self.content_url, self.namespace, self.token)
- response = url_read(
- query_url, data=body, content_type='application/octet-stream')
- if len(files) != len(response):
+ query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
+ self.base_url,
+ self.namespace,
+ urllib.quote(self._server_capabilities['access_token']))
+ response_body = net.url_read(
+ url=query_url,
+ data=json.dumps(body, separators=(',', ':')),
+ content_type='application/json',
+ method='POST')
+ if response_body is None:
+ raise MappingError('Failed to execute /pre-upload query')
+
+ # Response body is a list of push_urls (or null if file is already present).
+ try:
+ response = json.loads(response_body)
+ if not isinstance(response, list):
+ raise ValueError('Expecting response with json-encoded list')
+ if len(response) != len(items):
+ raise ValueError(
+ 'Incorrect number of items in the list, expected %d, '
+ 'but got %d' % (len(items), len(response)))
+ except ValueError as err:
raise MappingError(
- 'Got an incorrect number of responses from the server. Expected %d, '
- 'but got %d' % (len(files), len(response)))
-
- # This implementation of IsolateServer doesn't use push_urls field,
- # set it to None.
- missing_files = [
- files[i] + (None,) for i, flag in enumerate(response) if flag == '\x00'
- ]
+ 'Invalid response from server: %s, body is %s' % (err, response_body))
+
+ # Pick Items that are missing, attach _PushState to them.
+ missing_items = []
+ for i, push_urls in enumerate(response):
+ if push_urls:
+ assert len(push_urls) == 2, str(push_urls)
+ item = items[i]
+ assert item.push_state is None
+ item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1])
+ missing_items.append(item)
logging.info('Queried %d files, %d cache hit',
- len(files), len(files) - len(missing_files))
- return missing_files
-
- def _upload_hash_content_to_blobstore(self, item, content):
- """Uploads the content directly to the blobstore via a generated url."""
- # TODO(maruel): Support large files. This would require streaming support.
- gen_url = '%sgenerate_blobstore_url/%s/%s' % (
- self.content_url, self.namespace, item)
- # Token is guaranteed to be already quoted but it is unnecessary here, and
- # only here.
- data = [('token', urllib.unquote(self.token))]
- content_type, body = encode_multipart_formdata(
- data, [('content', item, content)])
- last_url = gen_url
- for _ in net.retry_loop(max_attempts=net.URL_OPEN_MAX_ATTEMPTS):
- # Retry HTTP 50x here but not 404.
- upload_url = net.url_read(gen_url, data=data)
- if not upload_url:
- raise MappingError('Unable to connect to server %s' % gen_url)
- last_url = upload_url
-
- # Do not retry this request on HTTP 50x. Regenerate an upload url each
- # time since uploading "consumes" the upload url.
- result = net.url_read(
- upload_url, data=body, content_type=content_type, retry_50x=False)
- if result is not None:
- return result
- raise MappingError('Unable to connect to server %s' % last_url)
+ len(items), len(items) - len(missing_items))
+ return missing_items
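
For reference, the /pre-upload exchange built above could look like this on
the wire (digests, token and URLs are made up; null means "already present"):

POST /content-gs/pre-upload/default-gz?token=...
[{"h": "0123...4567", "s": 1024, "i": 0},
 {"h": "89ab...cdef", "s": 4096, "i": 1}]

200 OK
[null,
 ["https://storage.example.com/upload?sig=...",
  "https://isolate.example.com/content-gs/finalize/..."]]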
class FileSystem(StorageApi):
@@ -690,34 +789,32 @@ class FileSystem(StorageApi):
The common use case is an NFS/CIFS file server that is mounted locally and
used to fetch files on a local partition.
"""
+
def __init__(self, base_path):
super(FileSystem, self).__init__()
self.base_path = base_path
- def fetch(self, item, expected_size):
- assert isinstance(item, basestring)
- assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE
- source = os.path.join(self.base_path, item)
- if (expected_size != UNKNOWN_FILE_SIZE and
- not is_valid_file(source, expected_size)):
- raise IOError('Invalid file %s' % item)
+ def fetch(self, digest, size):
+ assert isinstance(digest, basestring)
+ assert isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE
+ source = os.path.join(self.base_path, digest)
+ if size != UNKNOWN_FILE_SIZE and not is_valid_file(source, size):
+ raise IOError('Invalid file %s' % digest)
return file_read(source)
- def push(self, item, expected_size, content_generator, push_urls=None):
- assert isinstance(item, basestring)
- assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE
- dest = os.path.join(self.base_path, item)
- total = file_write(dest, content_generator)
- if expected_size != UNKNOWN_FILE_SIZE and total != expected_size:
+ def push(self, item, content, size):
+ assert isinstance(item, Item)
+ assert isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE
+ dest = os.path.join(self.base_path, item.digest)
+ total = file_write(dest, content)
+ if size != UNKNOWN_FILE_SIZE and total != size:
os.remove(dest)
- raise IOError(
- 'Invalid file %s, %d != %d' % (item, total, expected_size))
+ raise IOError('Invalid file %s, %d != %d' % (item.digest, total, size))
- def contains(self, files):
+ def contains(self, items):
return [
- (filename, metadata, None)
- for filename, metadata in files
- if not os.path.exists(os.path.join(self.base_path, metadata['h']))
+ item for item in items
+ if not os.path.exists(os.path.join(self.base_path, item.digest))
]
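
A short end-to-end sketch of the new Item-based interface using the FileSystem
backend above (paths are made up):

store = FileSystem('/mnt/isolate-cache')
data = open('/tmp/payload.bin', 'rb').read()
item = FileItem(
    path='/tmp/payload.bin',
    digest=hashlib.sha1(data).hexdigest(),
    size=len(data),
    is_isolated=False)
for missing in store.contains([item]):   # empty list if already present
  store.push(missing, missing.content(DISK_FILE_CHUNK), missing.size)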
@@ -767,7 +864,7 @@ def upload_tree(base_url, indir, infiles, namespace):
query if an element was already uploaded, and |base_url|/store/
can be used to upload a new element.
indir: Root directory the infiles are based in.
- infiles: dict of files to upload files from |indir| to |base_url|.
+ infiles: dict of files to upload from |indir| to |base_url|.
namespace: The namespace to use on the server.
"""
remote = get_storage_api(base_url, namespace)