Index: appengine/swarming/cipd.py |
diff --git a/appengine/swarming/cipd.py b/appengine/swarming/cipd.py |
index d04a087800194dca5f81e7745c39a40cc395ec40..ec798bd4d5a46783c88a9cefc5af4beb1241455b 100644 |
--- a/appengine/swarming/cipd.py |
+++ b/appengine/swarming/cipd.py |
@@ -4,6 +4,7 @@ |
"""CIPD-specific code is concentrated here.""" |
+import contextlib |
import re |
# Regular expressions below are copied from |
@@ -26,13 +27,94 @@ TAG_MAX_LEN = 400 |
# os can be "linux", "mac" or "windows" and arch can be "386", "amd64" or |
# "armv6l". |
PARAM_PLATFORM = '${platform}' |
+PARAM_PLATFORM_ESC = re.escape(PARAM_PLATFORM) |
# OS version parameter defines major and minor version of the OS distribution. |
# It is useful if package depends on .dll/.so libraries provided by the OS. |
# Example values: "ubuntu14_04", "mac10_9", "win6_1". |
PARAM_OS_VER = '${os_ver}' |
+PARAM_OS_VER_ESC = re.escape(PARAM_OS_VER) |
ALL_PARAMS = (PARAM_PLATFORM, PARAM_OS_VER) |
+@contextlib.contextmanager |
+def pin_check_fn(platform=None, os_ver=None): |
M-A Ruel
2016/08/30 18:54:12
I'd prefer no default value.
|
+ """Yields a function that verifies that an input CipdPackage could have been |
+ plausibly expanded, via pinning, to another CipdPackage. Repeated invocations |
+ of the function will retain knowledge of any resolved name template paramters |
+ like ${platform} and ${os_ver}. |
+ |
+ Args: |
+ platform - a pre-defined expansion of ${platform}, or None to learn from the |
+ first valid checked CipdPackage containing ${platform}. |
+ os_ver - a pre-defined expansion of ${os_ver}, or None to learn from the |
+ first valid checked CipdPackage containing ${os_ver}. |
+ |
+ Args of yielded function: |
+ original - a CipdPackage which may contain template params like ${platform} |
+ expanded - a CipdPackage which is nominally an expansion of original. |
+ |
+ CipdPackage is a duck-typed object which has three string properties: |
+ 'package_name', 'path' and 'version'. |
+ |
+ Yielded function raises: |
+ ValueError if expanded is not a valid derivation of original. |
+ |
+ Example: |
+ with pin_check_fn() as check: |
+ check(CipdPackage('', '${platform}', 'ref'), |
+ CipdPackage('', 'windows-amd64', 'deadbeef'*5)) |
+ check(CipdPackage('', '${platform}', 'ref'), |
+ CipdPackage('', 'linux-amd64', 'deadbeef'*5)) ## will raise ValueError |
+ """ |
+ plat_ref = [platform] |
+ os_ver_ref = [os_ver] |
+ def _check_fn(original, expanded): |
+ if original.path != expanded.path: |
+ raise ValueError('Mismatched path') |
+ |
+ def sub_param(regex, param_esc, param_re, param_const): |
+ # The 1 allows each substitution to only show up in the pattern once. |
+ # If it shows up more than once, then this is a very strange package name, |
+ # and should be subject to further scrutiny. If it turns out that for some |
+ # reason the substitutions SHOULD be allowed more than once per name, we |
+ # will need to assert that the matched-values of them are also all the |
+ # same (e.g. ${platform} cannot resolve to more than one value) |
+ ret = False |
+ if param_const is None: |
+ ret = param_esc in regex |
M-A Ruel
2016/08/30 18:54:12
Then what about:
regex.count(param_esc)
and check
|
+ if ret: |
+ regex = regex.replace(param_esc, param_re, 1) |
+ else: |
+ regex = regex.replace(param_esc, param_const, 1) |
+ return regex, ret |
+ |
+ name_regex = re.escape(original.package_name) |
+ name_regex, scan_plat = sub_param( |
+ name_regex, PARAM_PLATFORM_ESC, r'(?P<platform>\w+-[a-z0-9]+)', |
+ plat_ref[0]) |
+ name_regex, scan_os_ver = sub_param( |
+ name_regex, PARAM_OS_VER_ESC, r'(?P<os_ver>[_a-z0-9]+)', |
+ os_ver_ref[0]) |
+ |
+ match = re.match(name_regex, expanded.package_name) |
+ if not match: |
+ raise ValueError('Mismatched package_name') |
+ |
+ if is_pinned_version(original.version): |
+ if original.version != expanded.version: |
+ raise ValueError('Mismatched pins') |
+ else: |
+ if not is_pinned_version(expanded.version): |
+ raise ValueError('Pin value is not a pin') |
+ |
+ if scan_plat: |
+ plat_ref[0] = re.escape(match.group('platform')) |
+ if scan_os_ver: |
+ os_ver_ref[0] = re.escape(match.group('os_ver')) |
+ |
+ yield _check_fn |
+ |
+ |
def is_valid_package_name(package_name): |
"""Returns True if |package_name| is a valid CIPD package name.""" |
return bool(PACKAGE_NAME_RE.match(package_name)) |