Chromium Code Reviews

Unified Diff: tools/isolate/run_test_from_archive.py

Issue 10876044: run_test_from_archive: Rewritten to be more parallelized. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Rebase against HEAD and include fixes Created 8 years, 4 months ago
Index: tools/isolate/run_test_from_archive.py
diff --git a/tools/isolate/run_test_from_archive.py b/tools/isolate/run_test_from_archive.py
index f0c039321ce342d5098fcfa4ab794476d06aefea..9290072dddb8f6b3545519b6340de37b7636f4e9 100755
--- a/tools/isolate/run_test_from_archive.py
+++ b/tools/isolate/run_test_from_archive.py
@@ -9,6 +9,7 @@ Keeps a local cache.
"""
import ctypes
+import hashlib
import json
import logging
import optparse
@@ -144,13 +145,6 @@ def is_same_filesystem(path1, path2):
return os.stat(path1).st_dev == os.stat(path2).st_dev
-def open_remote(file_or_url):
- """Reads a file or url."""
- if re.match(r'^https?://.+$', file_or_url):
- return urllib.urlopen(file_or_url)
- return open(file_or_url, 'rb')
-
-
def get_free_space(path):
"""Returns the number of free bytes."""
if sys.platform == 'win32':
@@ -216,6 +210,13 @@ def load_manifest(content):
raise ConfigError(
'Did not expect both \'sha-1\' and \'link\', got: %r' % subvalue)
+ elif key == 'includes':
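+ # 'includes' lists the sha-1s of other manifests to process, in order.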
+ if not isinstance(value, list):
+ raise ConfigError('Expected list, got %r' % value)
+ for subvalue in value:
+ if not RE_IS_SHA1.match(subvalue):
+ raise ConfigError('Expected sha-1, got %r' % subvalue)
+
elif key == 'read_only':
if not isinstance(value, bool):
raise ConfigError('Expected bool, got %r' % value)
@@ -413,12 +414,14 @@ class Cache(object):
# oldest item.
self.state = []
+ # Items currently being fetched. Keep it local to reduce lock contention.
+ self._pending_queue = set()
+
# Profiling values.
# The files added and removed are stored as tuples of the filename and
# the file size.
self.files_added = []
self.files_removed = []
- self.time_retrieving_files = 0
if not os.path.isdir(self.cache_dir):
os.makedirs(self.cache_dir)
@@ -443,8 +446,6 @@ class Cache(object):
len(self.files_added))
logging.info('Size of files added to cache: %i',
sum(item[1] for item in self.files_added))
- logging.info('Time taken (in seconds) to add files to cache: %s',
- self.time_retrieving_files)
logging.debug('All files added:')
logging.debug(self.files_added)
@@ -510,30 +511,30 @@ class Cache(object):
self.save()
- def retrieve(self, item):
- """Retrieves a file from the remote and add it to the cache."""
+ def retrieve(self, priority, item):
+ """Retrieves a file from the remote, if not already cached, and adds it to
+ the cache.
+ """
assert not '/' in item
+ path = self.path(item)
try:
index = self.state.index(item)
# Was already in cache. Update its LRU value.
self.state.pop(index)
self.state.append(item)
- return False
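+ # Touch the file so its mtime reflects the cache hit.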
+ os.utime(path, None)
except ValueError:
- out = self.path(item)
- start_retrieve = time.time()
- self.remote.fetch_item(Remote.MED, item, out)
- # TODO(maruel): Temporarily fetch the files serially.
- self.remote.get_result()
- if os.path.exists(out):
- self.state.append(item)
- self.files_added.append((out, os.stat(out).st_size))
- else:
- logging.error('File, %s, not placed in cache' % item)
- self.time_retrieving_files += time.time() - start_retrieve
- return True
- finally:
- self.save()
+ if item in self._pending_queue:
+ # Already pending. The same object could be referenced multiple times.
+ return
+ self.remote.fetch_item(priority, item, path)
+ self._pending_queue.add(item)
+
+ def add(self, filepath, obj):
+ """Forcibly adds a file to the cache."""
+ if obj not in self.state:
+ link_file(self.path(obj), filepath, HARDLINK)
+ self.state.append(obj)
def path(self, item):
"""Returns the path to one item."""
@@ -543,51 +544,240 @@ class Cache(object):
"""Saves the LRU ordering."""
json.dump(self.state, open(self.state_file, 'wb'), separators=(',',':'))
+ def wait_for(self, items):
+ """Starts a loop that waits for at least one of |items| to be retrieved.
+
+ Returns the first item retrieved.
+ """
+ # Return right away if one of the items is already in the cache.
+ for item in items:
+ if item in self.state:
+ return item
+
+ assert all(i in self._pending_queue for i in items), (
+ items, self._pending_queue)
+ # Note that:
+ # len(self._pending_queue) ==
+ # ( len(self.remote._workers) - self.remote._ready +
+ # len(self.remote._queue) + len(self.remote.done))
+ # There is no lock-free way to verify that.
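+ # Drain completed fetches until one of the requested |items| shows up; each
+ # completed item is recorded in the LRU state either way.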
+ while self._pending_queue:
+ item = self.remote.get_result()
+ self._pending_queue.remove(item)
+ self.state.append(item)
+ if item in items:
+ return item
+
+
+class Manifest(object):
+ """Represents a single parsed manifest, e.g. a .results file."""
+ def __init__(self, obj_hash):
+ """|obj_hash| is really the sha-1 of the file."""
+ logging.debug('Manifest(%s)' % obj_hash)
+ self.obj_hash = obj_hash
+ # Set once the left side of the tree is parsed. 'Tree' here means the
+ # manifest and all the manifests recursively included by it via the
+ # 'includes' key. The order of the manifest sha-1s in 'includes' matters:
+ # later ones are not processed until the earlier ones are retrieved and read.
+ self.can_fetch = False
+
+ # Raw data.
+ self.data = {}
+ # A Manifest instance, one per object in self.includes.
+ self.children = []
+
+ # Set once the manifest is loaded.
+ self._manifest_parsed = False
+ # Set once the files are fetched.
+ self.files_fetched = False
+
+ def load(self, content):
+ """Verifies the manifest is valid and loads this object with the json data.
+ """
+ logging.debug('Manifest.load(%s)' % self.obj_hash)
+ assert not self._manifest_parsed
+ self.data = load_manifest(content)
+ self.children = [Manifest(i) for i in self.data.get('includes', [])]
+ self._manifest_parsed = True
+
+ def fetch_files(self, cache, files):
+ """Adds files in this manifest not present in files dictionary.
-def run_tha_test(manifest, cache_dir, remote, policies):
+ Preemptively request files.
+
+ Note that |files| is modified by this function.
+ """
+ assert self.can_fetch
+ if not self._manifest_parsed or self.files_fetched:
+ return
+ logging.info('fetch_files(%s)' % self.obj_hash)
+ for filepath, properties in self.data.get('files', {}).iteritems():
+ # The root manifest has priority for the files being mapped. In particular,
+ # overridden files must not be fetched.
+ if filepath not in files:
+ files[filepath] = properties
+ if 'sha-1' in properties:
+ # Preemptively request files.
+ logging.info('fetching %s' % filepath)
+ cache.retrieve(Remote.MED, properties['sha-1'])
+ self.files_fetched = True
+
+
+class Settings(object):
+ """Results of a completely parsed manifest."""
+ def __init__(self):
+ self.command = []
+ self.files = {}
+ self.read_only = None
+ self.relative_cwd = None
+ # The main manifest.
+ self.root = None
+ logging.debug('Settings')
+
+ def load(self, cache, root_manifest_hash):
+ """Loads the manifest and all the included manifests asynchronously.
+
+ It enables support for included manifests. They are processed in strict
+ order but fetched asynchronously from the cache. This is important so that a
+ file in an included manifest that is overridden by an embedding manifest is
+ not fetched needlessly. The includes are fetched in one pass and the files
+ are fetched as soon as all the manifests on the left side of the tree have
+ been fetched.
+
+ Prioritization is important here for nested manifests: 'includes' have the
+ highest priority and the algorithm is optimized for both deep and wide
+ manifests. A deep one is a long chain of manifests, each referenced by a
+ single item in the previous one's 'includes'. A wide one has a large number
+ of 'includes' in a single manifest. 'Left' means an included manifest that
+ appears earlier in the 'includes' list, so the order of the elements in
+ 'includes' matters.
+ """
+ self.root = Manifest(root_manifest_hash)
+ cache.retrieve(Remote.HIGH, root_manifest_hash)
+ pending = {root_manifest_hash: self.root}
+ # Keeps the list of retrieved items to refuse recursive includes.
+ retrieved = [root_manifest_hash]
+
+ def update_self(node):
+ node.fetch_files(cache, self.files)
+ # Grabs properties.
+ if not self.command and node.data.get('command'):
+ self.command = node.data['command']
+ if self.read_only is None and node.data.get('read_only') is not None:
+ self.read_only = node.data['read_only']
+ if (self.relative_cwd is None and
+ node.data.get('relative_cwd') is not None):
+ self.relative_cwd = node.data['relative_cwd']
+
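+ # Walks the tree depth-first. Each call unblocks at most one additional
+ # 'includes' entry per node, enforcing left-to-right fetch order.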
+ def traverse_tree(node):
+ if node.can_fetch:
+ if not node.files_fetched:
+ update_self(node)
+ will_break = False
+ for i in node.children:
+ if not i.can_fetch:
+ if will_break:
+ break
+ # Automatically mark the first one as fetchable.
+ i.can_fetch = True
+ will_break = True
+ traverse_tree(i)
+
+ while pending:
+ item_hash = cache.wait_for(pending)
+ item = pending.pop(item_hash)
+ item.load(open(cache.path(item_hash), 'r').read())
+ if item_hash == root_manifest_hash:
+ # It's the root item.
+ item.can_fetch = True
+
+ for new_child in item.children:
+ h = new_child.obj_hash
+ if h in retrieved:
+ raise ConfigError('Manifest %s is retrieved recursively' % h)
+ pending[h] = new_child
+ cache.retrieve(Remote.HIGH, h)
+
+ # Traverse the whole tree to see if files can now be fetched.
+ traverse_tree(self.root)
+
+ def check(n):
+   return all(check(x) for x in n.children) and n.files_fetched
+ assert check(self.root)
+ self.relative_cwd = self.relative_cwd or ''
+ self.read_only = self.read_only or False
+
+
+def run_tha_test(manifest_hash, cache_dir, remote, policies):
"""Downloads the dependencies in the cache, hardlinks them into a temporary
directory and runs the executable.
"""
+ settings = Settings()
with Cache(cache_dir, Remote(remote), policies) as cache:
outdir = make_temp_dir('run_tha_test', cache_dir)
-
- if not 'files' in manifest:
- print >> sys.stderr, 'No file to map'
- return 1
- if not 'command' in manifest:
- print >> sys.stderr, 'No command to map run'
- return 1
-
try:
- with Profiler('GetFiles') as _prof:
- for filepath, properties in manifest['files'].iteritems():
+ # Initiate all the files download.
+ with Profiler('GetManifests') as _prof:
+ # Optionally support local files.
+ if not RE_IS_SHA1.match(manifest_hash):
+ # Add it to the cache. While not strictly necessary, this simplifies
+ # the rest of the code.
+ h = hashlib.sha1(open(manifest_hash, 'r').read()).hexdigest()
+ cache.add(manifest_hash, h)
+ manifest_hash = h
+ settings.load(cache, manifest_hash)
+
+ if not settings.command:
+ print >> sys.stderr, 'No command to run'
+ return 1
+
+ with Profiler('GetRest') as _prof:
+ logging.debug('Creating directories')
+ # Creates the tree of directories to create. Walk each path upward so that
+ # every intermediate directory is included; iterate over a copy since the
+ # set is mutated during the walk.
+ directories = set(os.path.dirname(f) for f in settings.files)
+ for item in list(directories):
+   while item:
+     directories.add(item)
+     item = os.path.dirname(item)
+ for d in sorted(directories):
+   if d:
+     os.mkdir(os.path.join(outdir, d))
+
+ # Creates the links if necessary.
+ for filepath, properties in settings.files.iteritems():
+ if 'link' not in properties:
+ continue
outfile = os.path.join(outdir, filepath)
- outfiledir = os.path.dirname(outfile)
- if not os.path.isdir(outfiledir):
- os.makedirs(outfiledir)
- if 'sha-1' in properties:
- # A normal file.
- infile = properties['sha-1']
- cache.retrieve(infile)
- link_file(outfile, cache.path(infile), HARDLINK)
- elif 'link' in properties:
- # A symlink.
- os.symlink(properties['link'], outfile)
- else:
- raise ConfigError('Unexpected entry: %s' % properties)
+ os.symlink(properties['link'], outfile)
+ if 'mode' in properties:
+ # It's not set on Windows.
+ os.chmod(outfile, properties['mode'])
+
+ # Remaining files to be processed.
+ # Note that some files may not have been downloaded yet at this point.
+ remaining = dict(
+ (props['sha-1'], (filepath, props))
+ for filepath, props in settings.files.iteritems()
+ if 'sha-1' in props)
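+ # Note: |remaining| is keyed by sha-1, so two mapped files with identical
+ # content would collapse into a single entry here.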
+
+ # Do bookkeeping while files are being downloaded in the background.
+ cwd = os.path.join(outdir, settings.relative_cwd)
+ if not os.path.isdir(cwd):
+ os.makedirs(cwd)
+ cmd = settings.command[:]
+ # Ensure paths are correctly separated on windows.
+ cmd[0] = cmd[0].replace('/', os.path.sep)
+ cmd = fix_python_path(cmd)
+
+ # Now block on the remaining files to be downloaded and mapped.
+ while remaining:
+ obj = cache.wait_for(remaining)
+ filepath, properties = remaining.pop(obj)
+ outfile = os.path.join(outdir, filepath)
+ link_file(outfile, cache.path(obj), HARDLINK)
if 'mode' in properties:
# It's not set on Windows.
os.chmod(outfile, properties['mode'])
- cwd = os.path.join(outdir, manifest.get('relative_cwd', ''))
- if not os.path.isdir(cwd):
- os.makedirs(cwd)
- if manifest.get('read_only'):
+ if settings.read_only:
make_writable(outdir, True)
- cmd = manifest['command']
- # Ensure paths are correctly separated on windows.
- cmd[0] = cmd[0].replace('/', os.path.sep)
- cmd = fix_python_path(cmd)
logging.info('Running %s, cwd=%s' % (cmd, cwd))
try:
with Profiler('RunTest') as _prof:
@@ -659,21 +849,11 @@ def main():
if args:
parser.error('Unsupported args %s' % ' '.join(args))
- if options.hash:
- # First calculate the reference to it.
- options.manifest = '%s/%s' % (options.remote.rstrip('/'), options.hash)
- try:
- manifest = load_manifest(open_remote(options.manifest).read())
- except IOError as e:
- parser.error(
- 'Failed to read manifest %s; remote:%s; hash:%s; %s' %
- (options.manifest, options.remote, options.hash, str(e)))
-
policies = CachePolicies(
options.max_cache_size, options.min_free_space, options.max_items)
try:
return run_tha_test(
- manifest,
+ options.manifest or options.hash,
os.path.abspath(options.cache),
options.remote,
policies)
