Chromium Code Reviews

Side by Side Diff: third_party/gsutil/gslib/command.py

Issue 12317103: Added gsutil to depot tools (Closed) Base URL: https://chromium.googlesource.com/chromium/tools/depot_tools.git@master
Patch Set: Created 7 years, 9 months ago
1 # Copyright 2010 Google Inc. All Rights Reserved.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Base class for gsutil commands.
16
17 In addition to base class code, this file contains helpers that depend on base
18 class state (such as GetAclCommandHelper, which depends on self.gsutil_bin_dir,
19 self.bucket_storage_uri_class, etc.) In general, functions that depend on class
20 state and that are used by multiple commands belong in this file. Functions that
21 don't depend on class state belong in util.py, and non-shared helpers belong in
22 individual subclasses.
23 """
24
25 import boto
26 import getopt
27 import gslib
28 import logging
29 import multiprocessing
30 import os
31 import platform
32 import re
33 import sys
34 import wildcard_iterator
35 import xml.dom.minidom
36
37 from boto import handler
38 from boto.storage_uri import StorageUri
39 from getopt import GetoptError
40 from gslib import util
41 from gslib.exception import CommandException
42 from gslib.help_provider import HelpProvider
43 from gslib.name_expansion import NameExpansionIterator
44 from gslib.name_expansion import NameExpansionIteratorQueue
45 from gslib.project_id import ProjectIdHandler
46 from gslib.storage_uri_builder import StorageUriBuilder
47 from gslib.thread_pool import ThreadPool
48 from gslib.util import HAVE_OAUTH2
49 from gslib.util import NO_MAX
50
51 from gslib.wildcard_iterator import ContainsWildcard
52
53
54 def _ThreadedLogger():
55 """Creates a logger that resembles 'print' output, but is thread safe.
56
57 The logger will display all messages logged with level INFO or above. Log
58 propagation is disabled.
59
60 Returns:
61 A logger object.
62 """
63 log = logging.getLogger('threaded-logging')
64 log.propagate = False
65 log.setLevel(logging.INFO)
66 log_handler = logging.StreamHandler()
67 log_handler.setFormatter(logging.Formatter('%(message)s'))
68 log.addHandler(log_handler)
69 return log
70
71 # command_spec key constants.
72 COMMAND_NAME = 'command_name'
73 COMMAND_NAME_ALIASES = 'command_name_aliases'
74 MIN_ARGS = 'min_args'
75 MAX_ARGS = 'max_args'
76 SUPPORTED_SUB_ARGS = 'supported_sub_args'
77 FILE_URIS_OK = 'file_uri_ok'
78 PROVIDER_URIS_OK = 'provider_uri_ok'
79 URIS_START_ARG = 'uris_start_arg'
80 CONFIG_REQUIRED = 'config_required'
81
82 _EOF_NAME_EXPANSION_RESULT = ("EOF")
83
84
85 class Command(object):
86 # Global instance of a threaded logger object.
87 THREADED_LOGGER = _ThreadedLogger()
88
89 REQUIRED_SPEC_KEYS = [COMMAND_NAME]
90
91 # Each subclass must define the following map, minimally including the
92 # keys in REQUIRED_SPEC_KEYS; other values below will be used as defaults,
93 # although for readability subclasses should specify the complete map.
94 command_spec = {
95 # Name of command.
96 COMMAND_NAME : None,
97 # List of command name aliases.
98 COMMAND_NAME_ALIASES : [],
99 # Min number of args required by this command.
100 MIN_ARGS : 0,
101 # Max number of args required by this command, or NO_MAX.
102 MAX_ARGS : NO_MAX,
103 # Getopt-style string specifying acceptable sub args.
104 SUPPORTED_SUB_ARGS : '',
105 # True if file URIs are acceptable for this command.
106 FILE_URIS_OK : False,
107 # True if provider-only URIs are acceptable for this command.
108 PROVIDER_URIS_OK : False,
109 # Index in args of first URI arg.
110 URIS_START_ARG : 0,
111 # True if must configure gsutil before running command.
112 CONFIG_REQUIRED : True,
113 }
114 _default_command_spec = command_spec
115 help_spec = HelpProvider.help_spec
116
117 """Define an empty test specification, which derived classes must populate.
118
119 This is a list of tuples containing the following values:
120
121 step_name - mnemonic name for test, displayed when test is run
122 cmd_line - shell command line to run test
123 expect_ret or None - expected return code from test (None means ignore)
124 (result_file, expect_file) or None - tuple of result file and expected
125 file to diff for additional test
126 verification beyond the return code
127 (None means no diff requested)
128 Notes:
129
130 - Setting expect_ret to None means there is no expectation and,
131 hence, any returned value will pass.
132
133 - Any occurrences of the string 'gsutil' in the cmd_line parameter
134 are expanded to the full path to the gsutil command under test.
135
136 - The cmd_line, result_file and expect_file parameters may
137 contain the following special substrings:
138
139 $Bn - converted to one of 10 unique-for-testing bucket names (n=0..9)
140 $On - converted to one of 10 unique-for-testing object names (n=0..9)
141 $Fn - converted to one of 10 unique-for-testing file names (n=0..9)
142 $G - converted to the directory where gsutil is installed. Useful for
143 referencing test data.
144
145 - The generated file names are full pathnames, whereas the generated
146 bucket and object names are simple relative names.
147
148 - Tests with a non-None result_file and expect_file automatically
149 trigger an implicit diff of the two files.
150
151 - These test specifications, in combination with the conversion strings
152 allow tests to be constructed parametrically. For example, here's an
153 annotated subset of a test_steps for the cp command:
154
155 # Copy local file to object, verify 0 return code.
156 ('simple cp', 'gsutil cp $F1 gs://$B1/$O1', 0, None, None),
157 # Copy uploaded object back to local file and diff vs. orig file.
158 ('verify cp', 'gsutil cp gs://$B1/$O1 $F2', 0, '$F2', '$F1'),
159
160 - After pattern substitution, the specs are run sequentially, in the
161 order in which they appear in the test_steps list.
162 """
163 test_steps = []
164
165 # Define a convenience property for command name, since it's used in many places.
166 def _GetDefaultCommandName(self):
167 return self.command_spec[COMMAND_NAME]
168 command_name = property(_GetDefaultCommandName)
169
170 def __init__(self, command_runner, args, headers, debug, parallel_operations,
171 gsutil_bin_dir, boto_lib_dir, config_file_list, gsutil_ver,
172 bucket_storage_uri_class, test_method=None):
173 """
174 Args:
175 command_runner: CommandRunner (for commands built atop other commands).
176 args: Command-line args (arg0 = actual arg, not command name ala bash).
177 headers: Dictionary containing optional HTTP headers to pass to boto.
178 debug: Debug level to pass in to boto connection (range 0..3).
179 parallel_operations: Should command operations be executed in parallel?
180 gsutil_bin_dir: Bin dir from which gsutil is running.
181 boto_lib_dir: Lib dir where boto runs.
182 config_file_list: Config file list returned by _GetBotoConfigFileList().
183 gsutil_ver: Version string of currently running gsutil command.
184 bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
185 Settable for testing/mocking.
186 test_method: Optional general purpose method for testing purposes.
187 Application and semantics of this method will vary by
188 command and test type.
189
190 Implementation note: subclasses shouldn't need to define an __init__
191 method, and instead depend on the shared initialization that happens
192 here. If you do define an __init__ method in a subclass you'll need to
193 explicitly call super().__init__(). But you're encouraged not to do this,
194 because it will make changing the __init__ interface more painful.
195 """
196 # Save class values from constructor params.
197 self.command_runner = command_runner
198 self.args = args
199 self.unparsed_args = args
200 self.headers = headers
201 self.debug = debug
202 self.parallel_operations = parallel_operations
203 self.gsutil_bin_dir = gsutil_bin_dir
204 self.boto_lib_dir = boto_lib_dir
205 self.config_file_list = config_file_list
206 self.gsutil_ver = gsutil_ver
207 self.bucket_storage_uri_class = bucket_storage_uri_class
208 self.test_method = test_method
209 self.exclude_symlinks = False
210 self.recursion_requested = False
211 self.all_versions = False
212
213 # Process sub-command instance specifications.
214 # First, ensure subclass implementation sets all required keys.
215 for k in self.REQUIRED_SPEC_KEYS:
216 if k not in self.command_spec or self.command_spec[k] is None:
217 raise CommandException('"%s" command implementation is missing %s '
218 'specification' % (self.command_name, k))
219 # Now override default command_spec with subclass-specified values.
220 tmp = self._default_command_spec
221 tmp.update(self.command_spec)
222 self.command_spec = tmp
223 del tmp
224
225 # Make sure command provides a test specification.
226 if not self.test_steps:
227 # TODO: Uncomment following lines when test feature is ready.
228 #raise CommandException('"%s" command implementation is missing test '
229 #'specification' % self.command_name)
230 pass
231
232 # Parse and validate args.
233 try:
234 (self.sub_opts, self.args) = getopt.getopt(
235 args, self.command_spec[SUPPORTED_SUB_ARGS])
236 except GetoptError, e:
237 raise CommandException('%s for "%s" command.' % (e.msg,
238 self.command_name))
239 if (len(self.args) < self.command_spec[MIN_ARGS]
240 or len(self.args) > self.command_spec[MAX_ARGS]):
241 raise CommandException('Wrong number of arguments for "%s" command.' %
242 self.command_name)
243 if (not self.command_spec[FILE_URIS_OK]
244 and self.HaveFileUris(self.args[self.command_spec[URIS_START_ARG]:])):
245 raise CommandException('"%s" command does not support "file://" URIs. '
246 'Did you mean to use a gs:// URI?' %
247 self.command_name)
248 if (not self.command_spec[PROVIDER_URIS_OK]
249 and self._HaveProviderUris(
250 self.args[self.command_spec[URIS_START_ARG]:])):
251 raise CommandException('"%s" command does not support provider-only '
252 'URIs.' % self.command_name)
253 if self.command_spec[CONFIG_REQUIRED]:
254 self._ConfigureNoOpAuthIfNeeded()
255
256 self.proj_id_handler = ProjectIdHandler()
257 self.suri_builder = StorageUriBuilder(debug, bucket_storage_uri_class)
258
259 # Cross-platform path to run gsutil binary.
260 self.gsutil_cmd = ''
261 # Cross-platform list containing gsutil path for use with subprocess.
262 self.gsutil_exec_list = []
263 # If running on Windows, invoke python interpreter explicitly.
264 if platform.system() == "Windows":
265 self.gsutil_cmd += 'python '
266 self.gsutil_exec_list += ['python']
267 # Add full path to gsutil to make sure we test the correct version.
268 self.gsutil_path = os.path.join(self.gsutil_bin_dir, 'gsutil')
269 self.gsutil_cmd += self.gsutil_path
270 self.gsutil_exec_list += [self.gsutil_path]
271
272 # We're treating recursion_requested like it's used by all commands, but
273 # only some of the commands accept the -R option.
274 if self.sub_opts:
275 for o, unused_a in self.sub_opts:
276 if o == '-r' or o == '-R':
277 self.recursion_requested = True
278 break
279
280 def WildcardIterator(self, uri_or_str, all_versions=False):
281 """
282 Helper to instantiate gslib.WildcardIterator. Args are same as
283 gslib.WildcardIterator interface, but this method fills in most of the
284 values from instance state.
285
286 Args:
287 uri_or_str: StorageUri or URI string naming wildcard objects to iterate.
288 """
289 return wildcard_iterator.wildcard_iterator(
290 uri_or_str, self.proj_id_handler,
291 bucket_storage_uri_class=self.bucket_storage_uri_class,
292 all_versions=all_versions,
293 headers=self.headers, debug=self.debug)
294
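As a usage sketch (the bucket and prefix here are invented for illustration), a subclass's RunCommand() typically expands wildcards through this helper and iterates the matching URIs:

  for uri in self.WildcardIterator('gs://example-bucket/logs/*').IterUris():
    self.THREADED_LOGGER.info('Matched %s', uri)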
295 def RunCommand(self):
296 """Abstract function in base class. Subclasses must implement this. The
297 return value of this function will be used as the exit status of the
298 process, so subclass commands should return an integer exit code (0 for
299 success, a value in [1,255] for failure).
300 """
301 raise CommandException('Command %s is missing its RunCommand() '
302 'implementation' % self.command_name)
303
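For reference, a minimal subclass might look like the sketch below. The command name and behavior are invented for illustration; only the command_spec keys and the RunCommand() contract (return an integer exit code) come from this file.

class HelloCommand(Command):
  """Illustrative sketch only; not part of gsutil."""

  command_spec = {
    COMMAND_NAME : 'hello',
    COMMAND_NAME_ALIASES : [],
    MIN_ARGS : 0,
    MAX_ARGS : NO_MAX,
    SUPPORTED_SUB_ARGS : '',
    FILE_URIS_OK : True,
    PROVIDER_URIS_OK : False,
    URIS_START_ARG : 0,
    CONFIG_REQUIRED : False,
  }

  def RunCommand(self):
    self.THREADED_LOGGER.info('Hello from the %s command.', self.command_name)
    return 0  # 0 for success; a value in [1,255] for failure.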
304 ############################################################
305 # Shared helper functions that depend on base class state. #
306 ############################################################
307
308 def UrisAreForSingleProvider(self, uri_args):
309 """Tests whether the uris are all for a single provider.
310
311 Returns: a StorageUri for one of the uris on success, None on failure.
312 """
313 provider = None
314 uri = None
315 for uri_str in uri_args:
316 # validate=False because we allow wildcard uris.
317 uri = boto.storage_uri(
318 uri_str, debug=self.debug, validate=False,
319 bucket_storage_uri_class=self.bucket_storage_uri_class)
320 if not provider:
321 provider = uri.scheme
322 elif uri.scheme != provider:
323 return None
324 return uri
325
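For example (URIs invented): UrisAreForSingleProvider(['gs://b1/o', 'gs://b2/*']) returns a gs:// StorageUri, while UrisAreForSingleProvider(['gs://b1/o', 's3://b2/o']) returns None because the arguments span two providers.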
326 def SetAclCommandHelper(self):
327 """
328 Common logic for setting ACLs. Sets the standard ACL or the default
329 object ACL depending on self.command_name.
330 """
331
332 acl_arg = self.args[0]
333 uri_args = self.args[1:]
334 # Disallow multi-provider setacl requests, because there are differences in
335 # the ACL models.
336 storage_uri = self.UrisAreForSingleProvider(uri_args)
337 if not storage_uri:
338 raise CommandException('"%s" command spanning providers not allowed.' %
339 self.command_name)
340
341 # Determine whether acl_arg names a file containing XML ACL text vs. the
342 # string name of a canned ACL.
343 if os.path.isfile(acl_arg):
344 acl_file = open(acl_arg, 'r')
345 acl_arg = acl_file.read()
346
347 # TODO: Remove this workaround when GCS allows
348 # whitespace in the Permission element on the server-side
349 acl_arg = re.sub(r'<Permission>\s*(\S+)\s*</Permission>',
350 r'<Permission>\1</Permission>', acl_arg)
351
352 acl_file.close()
353 self.canned = False
354 else:
355 # No file exists, so expect a canned ACL string.
356 canned_acls = storage_uri.canned_acls()
357 if acl_arg not in canned_acls:
358 raise CommandException('Invalid canned ACL "%s".' % acl_arg)
359 self.canned = True
360
361 # Used to track if any ACLs failed to be set.
362 self.everything_set_okay = True
363
364 def _SetAclExceptionHandler(e):
365 """Simple exception handler to allow post-completion status."""
366 self.THREADED_LOGGER.error(str(e))
367 self.everything_set_okay = False
368
369 def _SetAclFunc(name_expansion_result):
370 exp_src_uri = self.suri_builder.StorageUri(
371 name_expansion_result.GetExpandedUriStr())
372 # We don't do bucket operations multi-threaded (see comment below).
373 assert self.command_name != 'setdefacl'
374 self.THREADED_LOGGER.info('Setting ACL on %s...' %
375 name_expansion_result.expanded_uri_str)
376 if self.canned:
377 exp_src_uri.set_acl(acl_arg, exp_src_uri.object_name, False,
378 self.headers)
379 else:
380 exp_src_uri.set_xml_acl(acl_arg, exp_src_uri.object_name, False,
381 self.headers)
382
383 # If user specified -R option, convert any bucket args to bucket wildcards
384 # (e.g., gs://bucket/*), to prevent the operation from being applied to
385 # the buckets themselves.
386 if self.recursion_requested:
387 for i in range(len(uri_args)):
388 uri = self.suri_builder.StorageUri(uri_args[i])
389 if uri.names_bucket():
390 uri_args[i] = uri.clone_replace_name('*').uri
391 else:
392 # Handle bucket ACL setting operations single-threaded, because
393 # our threading machinery currently assumes it's working with objects
394 # (name_expansion_iterator), and normally we wouldn't expect users to need
395 # to set ACLs on huge numbers of buckets at once anyway.
396 for i in range(len(uri_args)):
397 uri_str = uri_args[i]
398 if self.suri_builder.StorageUri(uri_str).names_bucket():
399 self._RunSingleThreadedSetAcl(acl_arg, uri_args)
400 return
401
402 name_expansion_iterator = NameExpansionIterator(
403 self.command_name, self.proj_id_handler, self.headers, self.debug,
404 self.bucket_storage_uri_class, uri_args, self.recursion_requested,
405 self.recursion_requested, all_versions=self.all_versions)
406
407 # Perform requests in parallel (-m) mode, if requested, using
408 # configured number of parallel processes and threads. Otherwise,
409 # perform requests with sequential function calls in current process.
410 self.Apply(_SetAclFunc, name_expansion_iterator, _SetAclExceptionHandler)
411
412 if not self.everything_set_okay:
413 raise CommandException('ACLs for some objects could not be set.')
414
415 def _RunSingleThreadedSetAcl(self, acl_arg, uri_args):
416 some_matched = False
417 for uri_str in uri_args:
418 for blr in self.WildcardIterator(uri_str):
419 if blr.HasPrefix():
420 continue
421 some_matched = True
422 uri = blr.GetUri()
423 if self.command_name == 'setdefacl':
424 print 'Setting default object ACL on %s...' % uri
425 if self.canned:
426 uri.set_def_acl(acl_arg, uri.object_name, False, self.headers)
427 else:
428 uri.set_def_xml_acl(acl_arg, False, self.headers)
429 else:
430 print 'Setting ACL on %s...' % uri
431 if self.canned:
432 uri.set_acl(acl_arg, uri.object_name, False, self.headers)
433 else:
434 uri.set_xml_acl(acl_arg, uri.object_name, False, self.headers)
435 if not some_matched:
436 raise CommandException('No URIs matched')
437
438 def GetAclCommandHelper(self):
439 """Common logic for getting ACLs. Gets the standard ACL or the default
440 object ACL depending on self.command_name."""
441
442 # Resolve to just one object.
443 # Handle wildcard-less URI specially in case this is a version-specific
444 # URI, because WildcardIterator().IterUris() would lose the versioning info.
445 if not ContainsWildcard(self.args[0]):
446 uri = self.suri_builder.StorageUri(self.args[0])
447 else:
448 uris = list(self.WildcardIterator(self.args[0]).IterUris())
449 if len(uris) == 0:
450 raise CommandException('No URIs matched')
451 if len(uris) != 1:
452 raise CommandException('%s matched more than one URI, which is not '
453 'allowed by the %s command' % (self.args[0], self.command_name))
454 uri = uris[0]
455 if not uri.names_bucket() and not uri.names_object():
456 raise CommandException('"%s" command must specify a bucket or '
457 'object.' % self.command_name)
458 if self.command_name == 'getdefacl':
459 acl = uri.get_def_acl(False, self.headers)
460 else:
461 acl = uri.get_acl(False, self.headers)
462 # Pretty-print the XML to make it more easily human editable.
463 parsed_xml = xml.dom.minidom.parseString(acl.to_xml().encode('utf-8'))
464 print parsed_xml.toprettyxml(indent=' ')
465
466 def GetXmlSubresource(self, subresource, uri_arg):
467 """Print an xml subresource, e.g. logging, for a bucket/object.
468
469 Args:
470 subresource: The subresource name.
471 uri_arg: URI for the bucket/object. Wildcards will be expanded.
472
473 Raises:
474 CommandException: if errors encountered.
475 """
476 # Wildcarding is allowed but must resolve to just one bucket.
477 uris = list(self.WildcardIterator(uri_arg).IterUris())
478 if len(uris) != 1:
479 raise CommandException('Wildcards must resolve to exactly one item for '
480 'get %s' % subresource)
481 uri = uris[0]
482 xml_str = uri.get_subresource(subresource, False, self.headers)
483 # Pretty-print the XML to make it more easily human editable.
484 parsed_xml = xml.dom.minidom.parseString(xml_str.encode('utf-8'))
485 print parsed_xml.toprettyxml(indent=' ')
486
487 def Apply(self, func, name_expansion_iterator, thr_exc_handler,
488 shared_attrs=None):
489 """Dispatch input URI assignments across a pool of parallel OS
490 processes and/or Python threads, based on options (-m or not)
491 and settings in the user's config file. If non-parallel mode
492 or only one OS process requested, execute requests sequentially
493 in the current OS process.
494
495 Args:
496 func: Function to call to process each URI.
497 name_expansion_iterator: Iterator of NameExpansionResult.
498 thr_exc_handler: Exception handler for ThreadPool class.
499 shared_attrs: List of attributes to manage across sub-processes.
500
501 Raises:
502 CommandException if invalid config encountered.
503 """
504
505 # Set OS process and python thread count as a function of options
506 # and config.
507 if self.parallel_operations:
508 process_count = boto.config.getint(
509 'GSUtil', 'parallel_process_count',
510 gslib.commands.config.DEFAULT_PARALLEL_PROCESS_COUNT)
511 if process_count < 1:
512 raise CommandException('Invalid parallel_process_count "%d".' %
513 process_count)
514 thread_count = boto.config.getint(
515 'GSUtil', 'parallel_thread_count',
516 gslib.commands.config.DEFAULT_PARALLEL_THREAD_COUNT)
517 if thread_count < 1:
518 raise CommandException('Invalid parallel_thread_count "%d".' %
519 thread_count)
520 else:
521 # If -m not specified, then assume 1 OS process and 1 Python thread.
522 process_count = 1
523 thread_count = 1
524
525 if self.debug:
526 self.THREADED_LOGGER.info('process count: %d', process_count)
527 self.THREADED_LOGGER.info('thread count: %d', thread_count)
528
529 if self.parallel_operations and process_count > 1:
530 procs = []
531 # If any shared attributes passed by caller, create a dictionary of
532 # shared memory variables for every element in the list of shared
533 # attributes.
534 shared_vars = None
535 if shared_attrs:
536 for name in shared_attrs:
537 if not shared_vars:
538 shared_vars = {}
539 shared_vars[name] = multiprocessing.Value('i', 0)
540 # Construct work queue for parceling out work to multiprocessing workers,
541 # setting the max queue length of 50k so we will block if workers don't
542 # empty the queue as fast as we can continue iterating over the bucket
543 # listing. This number may need tuning; it should be large enough to
544 # keep workers busy (overlapping bucket list next-page retrieval with
545 # operations being fed from the queue) but small enough that we don't
546 # overfill memory when running across a slow network link.
547 work_queue = multiprocessing.Queue(50000)
548 for shard in range(process_count):
549 # Spawn a separate OS process for each shard.
550 if self.debug:
551 self.THREADED_LOGGER.info('spawning process for shard %d', shard)
552 p = multiprocessing.Process(target=self._ApplyThreads,
553 args=(func, work_queue, shard,
554 thread_count, thr_exc_handler,
555 shared_vars))
556 procs.append(p)
557 p.start()
558
559 last_name_expansion_result = None
560 try:
561 # Feed all work into the queue being emptied by the workers.
562 for name_expansion_result in name_expansion_iterator:
563 last_name_expansion_result = name_expansion_result
564 work_queue.put(name_expansion_result)
565 except:
566 sys.stderr.write('Failed URI iteration. Last result (prior to '
567 'exception) was: %s\n'
568 % repr(last_name_expansion_result))
569 finally:
570 # We do all of the process cleanup in a finally clause in case the name
571 # expansion iterator throws an exception. This will send EOF to all the
572 # child processes and join them back into the parent process.
573
574 # Send an EOF per worker.
575 for shard in range(process_count):
576 work_queue.put(_EOF_NAME_EXPANSION_RESULT)
577
578 # Wait for all spawned OS processes to finish.
579 failed_process_count = 0
580 for p in procs:
581 p.join()
582 # Count number of procs that returned non-zero exit code.
583 if p.exitcode != 0:
584 failed_process_count += 1
585
586 # Propagate shared variables back to caller's attributes.
587 if shared_vars:
588 for (name, var) in shared_vars.items():
589 setattr(self, name, var.value)
590
591 # Abort main process if one or more sub-processes failed. Note that this
592 # is outside the finally clause, because we only want to raise a new
593 # exception if an exception wasn't already raised in the try clause above.
594 if failed_process_count:
595 plural_str = ''
596 if failed_process_count > 1:
597 plural_str = 'es'
598 raise Exception('unexpected failure in %d sub-process%s, '
599 'aborting...' % (failed_process_count, plural_str))
600
601 else:
602 # Using just 1 process, so funnel results to _ApplyThreads using a facade
603 # that makes NameExpansionIterator look like a multiprocessing.Queue
604 # that sends one EOF once the iterator empties.
605 work_queue = NameExpansionIteratorQueue(name_expansion_iterator,
606 _EOF_NAME_EXPANSION_RESULT)
607 self._ApplyThreads(func, work_queue, 0, thread_count, thr_exc_handler,
608 None)
609
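As a hedged sketch of the calling pattern (the _CountFunc, _ExceptionHandler, and total_objs names are invented), a command that wants a counter aggregated across worker processes initializes an integer attribute, lists it in shared_attrs, and reads it back after Apply() returns, matching the shared_vars plumbing above:

  # Inside a subclass command method; _CountFunc increments self.total_objs.
  self.total_objs = 0
  name_expansion_iterator = NameExpansionIterator(
      self.command_name, self.proj_id_handler, self.headers, self.debug,
      self.bucket_storage_uri_class, self.args, self.recursion_requested,
      self.recursion_requested, all_versions=self.all_versions)
  self.Apply(_CountFunc, name_expansion_iterator, _ExceptionHandler,
             shared_attrs=['total_objs'])
  self.THREADED_LOGGER.info('Processed %d objects.', self.total_objs)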
610 def HaveFileUris(self, args_to_check):
611 """Checks whether args_to_check contain any file URIs.
612
613 Args:
614 args_to_check: Command-line argument subset to check.
615
616 Returns:
617 True if args_to_check contains any file URIs.
618 """
619 for uri_str in args_to_check:
620 if uri_str.lower().startswith('file://') or uri_str.find(':') == -1:
621 return True
622 return False
623
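Note that any argument without a ':' is also treated as a file URI, so (for example) HaveFileUris(['gs://bucket/obj']) is False, while HaveFileUris(['file://tmp/x']) and HaveFileUris(['some/local/path']) are both True.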
624 ######################
625 # Private functions. #
626 ######################
627
628 def _HaveProviderUris(self, args_to_check):
629 """Checks whether args_to_check contains any provider URIs (like 'gs://').
630
631 Args:
632 args_to_check: Command-line argument subset to check.
633
634 Returns:
635 True if args_to_check contains any provider URIs.
636 """
637 for uri_str in args_to_check:
638 if re.match('^[a-z]+://$', uri_str):
639 return True
640 return False
641
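The regex requires the string to end immediately after the scheme, so (for example) 'gs://' is a provider-only URI while 'gs://bucket' is not.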
642 def _ConfigureNoOpAuthIfNeeded(self):
643 """Sets up no-op auth handler if no boto credentials are configured."""
644 config = boto.config
645 if not util.HasConfiguredCredentials():
646 if self.config_file_list:
647 if (config.has_option('Credentials', 'gs_oauth2_refresh_token')
648 and not HAVE_OAUTH2):
649 raise CommandException(
650 'Your gsutil is configured with OAuth2 authentication '
651 'credentials.\nHowever, OAuth2 is only supported when running '
652 'under Python 2.6 or later\n(unless additional dependencies are '
653 'installed, see README for details); you are running Python %s.' %
654 sys.version)
655 raise CommandException('You have no storage service credentials in any '
656 'of the following boto config\nfiles. Please '
657 'add your credentials as described in the '
658 'gsutil README file, or else\nre-run '
659 '"gsutil config" to re-create a config '
660 'file:\n%s' % self.config_file_list)
661 else:
662 # With no boto config file the user can still access publicly readable
663 # buckets and objects.
664 from gslib import no_op_auth_plugin
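As far as I can tell, this import is for its side effect: loading gslib.no_op_auth_plugin makes a no-op auth handler visible to boto's plugin lookup, which is what lets unauthenticated requests to publicly readable buckets and objects succeed.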
665
666 def _ApplyThreads(self, func, work_queue, shard, num_threads,
667 thr_exc_handler=None, shared_vars=None):
668 """
669 Perform subset of required requests across a caller specified
670 number of parallel Python threads, which may be one, in which
671 case the requests are processed in the current thread.
672
673 Args:
674 func: Function to call for each request.
675 work_queue: shared queue of NameExpansionResult to process.
676 shard: Assigned subset (shard number) for this function.
677 num_threads: Number of Python threads to spawn to process this shard.
678 thr_exc_handler: Exception handler for ThreadPool class.
679 shared_vars: Dict of shared memory variables to be managed.
680 (only relevant, and non-None, if this function is
681 run in a separate OS process).
682 """
683 # Each OS process needs to establish its own set of connections to
684 # the server to avoid writes from different OS processes interleaving
685 # onto the same socket (and garbling the underlying SSL session).
686 # We ensure each process gets its own set of connections here by
687 # closing all connections in the storage provider connection pool.
688 connection_pool = StorageUri.provider_pool
689 if connection_pool:
690 for i in connection_pool:
691 connection_pool[i].connection.close()
692
693 if num_threads > 1:
694 thread_pool = ThreadPool(num_threads, thr_exc_handler)
695 try:
696 while True: # Loop until we hit EOF marker.
697 name_expansion_result = work_queue.get()
698 if name_expansion_result == _EOF_NAME_EXPANSION_RESULT:
699 break
700 exp_src_uri = self.suri_builder.StorageUri(
701 name_expansion_result.GetExpandedUriStr())
702 if self.debug:
703 self.THREADED_LOGGER.info('process %d shard %d is handling uri %s',
704 os.getpid(), shard, exp_src_uri)
705 if (self.exclude_symlinks and exp_src_uri.is_file_uri()
706 and os.path.islink(exp_src_uri.object_name)):
707 self.THREADED_LOGGER.info('Skipping symbolic link %s...', exp_src_uri)
708 elif num_threads > 1:
709 thread_pool.AddTask(func, name_expansion_result)
710 else:
711 func(name_expansion_result)
712 # If any Python threads created, wait here for them to finish.
713 if num_threads > 1:
714 thread_pool.WaitCompletion()
715 finally:
716 if num_threads > 1:
717 thread_pool.Shutdown()
718 # If any shared variables were passed (which means we are running in a
719 # separate OS process), add this process's value to each shared variable.
720 if shared_vars:
721 for (name, var) in shared_vars.items():
722 var.value += getattr(self, name)