Index: tools/isolate/run_test_from_archive.py |
diff --git a/tools/isolate/run_test_from_archive.py b/tools/isolate/run_test_from_archive.py |
index f0c039321ce342d5098fcfa4ab794476d06aefea..9290072dddb8f6b3545519b6340de37b7636f4e9 100755 |
--- a/tools/isolate/run_test_from_archive.py |
+++ b/tools/isolate/run_test_from_archive.py |
@@ -9,6 +9,7 @@ Keeps a local cache. |
""" |
import ctypes |
+import hashlib |
import json |
import logging |
import optparse |
@@ -144,13 +145,6 @@ def is_same_filesystem(path1, path2): |
return os.stat(path1).st_dev == os.stat(path2).st_dev |
-def open_remote(file_or_url): |
- """Reads a file or url.""" |
- if re.match(r'^https?://.+$', file_or_url): |
- return urllib.urlopen(file_or_url) |
- return open(file_or_url, 'rb') |
- |
- |
def get_free_space(path): |
"""Returns the number of free bytes.""" |
if sys.platform == 'win32': |
@@ -216,6 +210,13 @@ def load_manifest(content): |
raise ConfigError( |
'Did not expect both \'sha-1\' and \'link\', got: %r' % subvalue) |
+ elif key == 'includes': |
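+ # 'includes' is a list of sha-1 hex digests of other manifests to load |
+ # recursively, e.g. ['0123456789abcdef0123456789abcdef01234567']. |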
+ if not isinstance(value, list): |
+ raise ConfigError('Expected list, got %r' % value) |
+ for subvalue in value: |
+ if not RE_IS_SHA1.match(subvalue): |
+ raise ConfigError('Expected sha-1, got %r' % subvalue) |
+ |
elif key == 'read_only': |
if not isinstance(value, bool): |
raise ConfigError('Expected bool, got %r' % value) |
@@ -413,12 +414,14 @@ class Cache(object): |
# oldest item. |
self.state = [] |
+ # Items currently being fetched. Keep it local to reduce lock contention. |
+ self._pending_queue = set() |
+ |
# Profiling values. |
# The files added and removed are stored as tuples of the filename and |
# the file size. |
self.files_added = [] |
self.files_removed = [] |
- self.time_retrieving_files = 0 |
if not os.path.isdir(self.cache_dir): |
os.makedirs(self.cache_dir) |
@@ -443,8 +446,6 @@ class Cache(object): |
len(self.files_added)) |
logging.info('Size of files added to cache: %i', |
sum(item[1] for item in self.files_added)) |
- logging.info('Time taken (in seconds) to add files to cache: %s', |
- self.time_retrieving_files) |
logging.debug('All files added:') |
logging.debug(self.files_added) |
@@ -510,30 +511,30 @@ class Cache(object): |
self.save() |
- def retrieve(self, item): |
- """Retrieves a file from the remote and add it to the cache.""" |
+ def retrieve(self, priority, item): |
+ """Retrieves a file from the remote, if not already cached, and adds it to |
+ the cache. |
+ """ |
assert not '/' in item |
+ path = self.path(item) |
try: |
index = self.state.index(item) |
# Was already in cache. Update it's LRU value. |
self.state.pop(index) |
self.state.append(item) |
- return False |
+ os.utime(path, None) |
except ValueError: |
- out = self.path(item) |
- start_retrieve = time.time() |
- self.remote.fetch_item(Remote.MED, item, out) |
- # TODO(maruel): Temporarily fetch the files serially. |
- self.remote.get_result() |
- if os.path.exists(out): |
- self.state.append(item) |
- self.files_added.append((out, os.stat(out).st_size)) |
- else: |
- logging.error('File, %s, not placed in cache' % item) |
- self.time_retrieving_files += time.time() - start_retrieve |
- return True |
- finally: |
- self.save() |
+ if item in self._pending_queue: |
+ # Already pending. The same object could be referenced multiple times. |
+ return |
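+ # fetch_item() only queues the request; the actual download happens on |
+ # the Remote workers and is reaped later through wait_for(). |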
+ self.remote.fetch_item(priority, item, path) |
+ self._pending_queue.add(item) |
+ |
+ def add(self, filepath, obj): |
+ """Forcibly adds a file to the cache.""" |
+ if obj not in self.state: |
+ link_file(self.path(obj), filepath, HARDLINK) |
+ self.state.append(obj) |
def path(self, item): |
"""Returns the path to one item.""" |
@@ -543,51 +544,240 @@ class Cache(object): |
"""Saves the LRU ordering.""" |
json.dump(self.state, open(self.state_file, 'wb'), separators=(',',':')) |
+ def wait_for(self, items): |
+ """Starts a loop that waits for at least one of |items| to be retrieved. |
+ |
+ Returns the first item in |items| that is retrieved. |
+ """ |
+ # Flush items already present. |
+ for item in items: |
+ if item in self.state: |
+ return item |
+ |
+ assert all(i in self._pending_queue for i in items), ( |
+ items, self._pending_queue) |
+ # Note that: |
+ # len(self._pending_queue) == |
+ # ( len(self.remote._workers) - self.remote._ready + |
+ # len(self.remote._queue) + len(self.remote.done)) |
+ # There is no lock-free way to verify that. |
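+ # Block until any pending download completes; downloads not in |items| |
+ # are still recorded in the cache state. |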
+ while self._pending_queue: |
+ item = self.remote.get_result() |
+ self._pending_queue.remove(item) |
+ self.state.append(item) |
+ if item in items: |
+ return item |
+ |
+ |
+class Manifest(object): |
+ """Represents a single parsed manifest, e.g. a .results file.""" |
+ def __init__(self, obj_hash): |
+ """|obj_hash| is really the sha-1 of the file.""" |
+ logging.debug('Manifest(%s)' % obj_hash) |
+ self.obj_hash = obj_hash |
+ # Set once the whole left side of the tree has been parsed. 'Tree' here |
+ # means this manifest and all the manifests it recursively includes via |
+ # the 'includes' key. The order of the sha-1s in 'includes' matters: later |
+ # ones are not processed until the earlier ones have been retrieved and read. |
+ self.can_fetch = False |
+ |
+ # Raw data. |
+ self.data = {} |
+ # A Manifest instance, one per sha-1 in self.data['includes']. |
+ self.children = [] |
+ |
+ # Set once the manifest is loaded. |
+ self._manifest_parsed = False |
+ # Set once the files are fetched. |
+ self.files_fetched = False |
+ |
+ def load(self, content): |
+ """Verifies the manifest is valid and loads this object with the json data. |
+ """ |
+ logging.debug('Manifest.load(%s)' % self.obj_hash) |
+ assert not self._manifest_parsed |
+ self.data = load_manifest(content) |
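+ # One child Manifest per entry in 'includes'; their contents are |
+ # retrieved later by Settings.load(). |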
+ self.children = [Manifest(i) for i in self.data.get('includes', [])] |
+ self._manifest_parsed = True |
+ |
+ def fetch_files(self, cache, files): |
+ """Adds files in this manifest not present in files dictionary. |
-def run_tha_test(manifest, cache_dir, remote, policies): |
+ Preemptively request files. |
+ |
+ Note that |files| is modified by this function. |
+ """ |
+ assert self.can_fetch |
+ if not self._manifest_parsed or self.files_fetched: |
+ return |
+ logging.info('fetch_files(%s)' % self.obj_hash) |
+ for filepath, properties in self.data.get('files', {}).iteritems(): |
+ # The root manifest has priority over the files being mapped. In |
+ # particular, overridden files must not be fetched. |
+ if filepath not in files: |
+ files[filepath] = properties |
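+ # Entries without a 'sha-1', e.g. symlinks, do not need to be fetched. |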
+ if 'sha-1' in properties: |
+ # Preemptively request files. |
+ logging.info('fetching %s' % filepath) |
+ cache.retrieve(Remote.MED, properties['sha-1']) |
+ self.files_fetched = True |
+ |
+ |
+class Settings(object): |
+ """Results of a completely parsed manifest.""" |
+ def __init__(self): |
+ self.command = [] |
+ self.files = {} |
+ self.read_only = None |
+ self.relative_cwd = None |
+ # The main manifest. |
+ self.root = None |
+ logging.debug('Settings') |
+ |
+ def load(self, cache, root_manifest_hash): |
+ """Loads the manifest and all the included manifests asynchronously. |
+ |
+ It enables support for included manifests. They are processed in strict |
+ order but fetched asynchronously from the cache. This is important so that |
+ a file in an included manifest that is overridden by an embedding manifest |
+ is not fetched needlessly. The includes are fetched in one pass and the |
+ files are fetched as soon as all the manifests on the left side of the |
+ tree have been fetched. |
+ |
+ The prioritization is very important here for nested manifests. 'includes' |
+ have the highest priority and the algorithm is optimized for both deep and |
+ wide manifests. A deep one is a long chain of manifests, each referenced |
+ by a single item in 'includes'. A wide one has a large number of 'includes' |
+ in a single manifest. 'Left' is defined as an included manifest earlier in |
+ the 'includes' list. So the order of the elements in 'includes' is important. |
+ """ |
+ self.root = Manifest(root_manifest_hash) |
+ cache.retrieve(Remote.HIGH, root_manifest_hash) |
+ pending = {root_manifest_hash: self.root} |
+ # Keeps the list of retrieved items to refuse recursive includes. |
+ retrieved = [root_manifest_hash] |
+ |
+ def update_self(node): |
+ node.fetch_files(cache, self.files) |
+ # Grabs properties. |
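+ # The first manifest in traversal order to define a property wins. |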
+ if not self.command and node.data.get('command'): |
+ self.command = node.data['command'] |
+ if self.read_only is None and node.data.get('read_only') is not None: |
+ self.read_only = node.data['read_only'] |
+ if (self.relative_cwd is None and |
+ node.data.get('relative_cwd') is not None): |
+ self.relative_cwd = node.data['relative_cwd'] |
+ |
+ def traverse_tree(node): |
+ if node.can_fetch: |
+ if not node.files_fetched: |
+ update_self(node) |
+ will_break = False |
+ for i in node.children: |
+ if not i.can_fetch: |
+ if will_break: |
+ break |
+ # Automatically mark the first one as fetchable. |
+ i.can_fetch = True |
+ will_break = True |
+ traverse_tree(i) |
+ |
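+ # Loads each manifest as its download completes; newly discovered |
+ # includes are queued at HIGH priority. |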
+ while pending: |
+ item_hash = cache.wait_for(pending) |
+ item = pending.pop(item_hash) |
+ item.load(open(cache.path(item_hash), 'r').read()) |
+ if item_hash == root_manifest_hash: |
+ # It's the root item. |
+ item.can_fetch = True |
+ |
+ for new_child in item.children: |
+ h = new_child.obj_hash |
+ if h in retrieved: |
+ raise ConfigError('Manifest %s is retrieved recursively' % h) |
+ pending[h] = new_child |
+ cache.retrieve(Remote.HIGH, h) |
+ |
+ # Traverse the whole tree to see if files can now be fetched. |
+ traverse_tree(self.root) |
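+ # Sanity check: every manifest in the tree must have had its files |
+ # requested at this point. |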
+ def check(n): |
+ return all(check(x) for x in n.children) and n.files_fetched |
+ assert check(self.root) |
+ self.relative_cwd = self.relative_cwd or '' |
+ self.read_only = self.read_only or False |
+ |
+ |
+def run_tha_test(manifest_hash, cache_dir, remote, policies): |
"""Downloads the dependencies in the cache, hardlinks them into a temporary |
directory and runs the executable. |
""" |
+ settings = Settings() |
with Cache(cache_dir, Remote(remote), policies) as cache: |
outdir = make_temp_dir('run_tha_test', cache_dir) |
- |
- if not 'files' in manifest: |
- print >> sys.stderr, 'No file to map' |
- return 1 |
- if not 'command' in manifest: |
- print >> sys.stderr, 'No command to map run' |
- return 1 |
- |
try: |
- with Profiler('GetFiles') as _prof: |
- for filepath, properties in manifest['files'].iteritems(): |
+ # Initiate all the files download. |
+ with Profiler('GetManifests') as _prof: |
+ # Optionally support local files. |
+ if not RE_IS_SHA1.match(manifest_hash): |
+ # Add it to the cache. While not strictly necessary, this simplifies |
+ # the rest of the code. |
+ h = hashlib.sha1(open(manifest_hash, 'rb').read()).hexdigest() |
+ cache.add(manifest_hash, h) |
+ manifest_hash = h |
+ settings.load(cache, manifest_hash) |
+ |
+ if not settings.command: |
+ print >> sys.stderr, 'No command to run' |
+ return 1 |
+ |
+ with Profiler('GetRest') as _prof: |
+ logging.debug('Creating directories') |
+ # Creates the tree of directories to create, including all the parent |
+ # directories, so they can be made in sorted order. |
+ directories = set(os.path.dirname(f) for f in settings.files) |
+ for item in list(directories): |
+ while item: |
+ directories.add(item) |
+ item = os.path.dirname(item) |
+ for d in sorted(directories): |
+ if d: |
+ os.mkdir(os.path.join(outdir, d)) |
+ |
+ # Creates the symlinks if necessary. |
+ for filepath, properties in settings.files.iteritems(): |
+ if 'link' not in properties: |
+ continue |
outfile = os.path.join(outdir, filepath) |
- outfiledir = os.path.dirname(outfile) |
- if not os.path.isdir(outfiledir): |
- os.makedirs(outfiledir) |
- if 'sha-1' in properties: |
- # A normal file. |
- infile = properties['sha-1'] |
- cache.retrieve(infile) |
- link_file(outfile, cache.path(infile), HARDLINK) |
- elif 'link' in properties: |
- # A symlink. |
- os.symlink(properties['link'], outfile) |
- else: |
- raise ConfigError('Unexpected entry: %s' % properties) |
+ os.symlink(properties['link'], outfile) |
+ if 'mode' in properties: |
+ # It's not set on Windows. |
+ os.chmod(outfile, properties['mode']) |
+ |
+ # Remaining files to be processed. |
+ # Note that some files may not have been downloaded yet at this point. |
+ remaining = dict( |
+ (props['sha-1'], (filepath, props)) |
+ for filepath, props in settings.files.iteritems() |
+ if 'sha-1' in props) |
+ |
+ # Do bookkeeping while files are being downloaded in the background. |
+ cwd = os.path.join(outdir, settings.relative_cwd) |
+ if not os.path.isdir(cwd): |
+ os.makedirs(cwd) |
+ cmd = settings.command[:] |
+ # Ensure paths are correctly separated on Windows. |
+ cmd[0] = cmd[0].replace('/', os.path.sep) |
+ cmd = fix_python_path(cmd) |
+ |
+ # Now block on the remaining files to be downloaded and mapped. |
+ while remaining: |
+ obj = cache.wait_for(remaining) |
+ filepath, properties = remaining.pop(obj) |
+ outfile = os.path.join(outdir, filepath) |
+ link_file(outfile, cache.path(obj), HARDLINK) |
if 'mode' in properties: |
# It's not set on Windows. |
os.chmod(outfile, properties['mode']) |
- cwd = os.path.join(outdir, manifest.get('relative_cwd', '')) |
- if not os.path.isdir(cwd): |
- os.makedirs(cwd) |
- if manifest.get('read_only'): |
+ if settings.read_only: |
make_writable(outdir, True) |
- cmd = manifest['command'] |
- # Ensure paths are correctly separated on windows. |
- cmd[0] = cmd[0].replace('/', os.path.sep) |
- cmd = fix_python_path(cmd) |
logging.info('Running %s, cwd=%s' % (cmd, cwd)) |
try: |
with Profiler('RunTest') as _prof: |
@@ -659,21 +849,11 @@ def main(): |
if args: |
parser.error('Unsupported args %s' % ' '.join(args)) |
- if options.hash: |
- # First calculate the reference to it. |
- options.manifest = '%s/%s' % (options.remote.rstrip('/'), options.hash) |
- try: |
- manifest = load_manifest(open_remote(options.manifest).read()) |
- except IOError as e: |
- parser.error( |
- 'Failed to read manifest %s; remote:%s; hash:%s; %s' % |
- (options.manifest, options.remote, options.hash, str(e))) |
- |
policies = CachePolicies( |
options.max_cache_size, options.min_free_space, options.max_items) |
try: |
return run_tha_test( |
- manifest, |
+ options.manifest or options.hash, |
os.path.abspath(options.cache), |
options.remote, |
policies) |