Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(158)

Side by Side Diff: appengine/swarming/server/task_scheduler.py

Issue 2267363004: Add CIPD pin reporting to swarming. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: Fix bottest Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2014 The LUCI Authors. All rights reserved. 1 # Copyright 2014 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """High level tasks execution scheduling API. 5 """High level tasks execution scheduling API.
6 6
7 This is the interface closest to the HTTP handlers. 7 This is the interface closest to the HTTP handlers.
8 """ 8 """
9 9
10 import datetime 10 import datetime
(...skipping 590 matching lines...) Expand 10 before | Expand all | Expand 10 after
601 user=request.user) 601 user=request.user)
602 return request, run_result 602 return request, run_result
603 if failures: 603 if failures:
604 logging.info( 604 logging.info(
605 'Chose nothing (failed %d, skipped %d)', failures, total_skipped) 605 'Chose nothing (failed %d, skipped %d)', failures, total_skipped)
606 return None, None 606 return None, None
607 607
608 608
609 def bot_update_task( 609 def bot_update_task(
610 run_result_key, bot_id, output, output_chunk_start, exit_code, duration, 610 run_result_key, bot_id, output, output_chunk_start, exit_code, duration,
611 hard_timeout, io_timeout, cost_usd, outputs_ref, performance_stats): 611 hard_timeout, io_timeout, cost_usd, outputs_ref, cipd_pins,
612 performance_stats):
612 """Updates a TaskRunResult and TaskResultSummary, along TaskOutput. 613 """Updates a TaskRunResult and TaskResultSummary, along TaskOutput.
613 614
614 Arguments: 615 Arguments:
615 - run_result_key: ndb.Key to TaskRunResult. 616 - run_result_key: ndb.Key to TaskRunResult.
616 - bot_id: Self advertised bot id to ensure it's the one expected. 617 - bot_id: Self advertised bot id to ensure it's the one expected.
617 - output: Data to append to this command output. 618 - output: Data to append to this command output.
618 - output_chunk_start: Index of output in the stdout stream. 619 - output_chunk_start: Index of output in the stdout stream.
619 - exit_code: Mark that this task completed. 620 - exit_code: Mark that this task completed.
620 - duration: Time spent in seconds for this task, excluding overheads. 621 - duration: Time spent in seconds for this task, excluding overheads.
621 - hard_timeout: Bool set if an hard timeout occured. 622 - hard_timeout: Bool set if an hard timeout occured.
622 - io_timeout: Bool set if an I/O timeout occured. 623 - io_timeout: Bool set if an I/O timeout occured.
623 - cost_usd: Cost in $USD of this task up to now. 624 - cost_usd: Cost in $USD of this task up to now.
624 - outputs_ref: task_request.FilesRef instance or None. 625 - outputs_ref: task_request.FilesRef instance or None.
626 - cipd_pins: None or task_result.CipdPins
625 - performance_stats: task_result.PerformanceStats instance or None. Can only 627 - performance_stats: task_result.PerformanceStats instance or None. Can only
626 be set when the task is completing. 628 be set when the task is completing.
627 629
628 Invalid states, these are flat out refused: 630 Invalid states, these are flat out refused:
629 - A command is updated after it had an exit code assigned to. 631 - A command is updated after it had an exit code assigned to.
630 632
631 Returns: 633 Returns:
632 TaskRunResult.state or None in case of failure. 634 TaskRunResult.state or None in case of failure.
633 """ 635 """
634 assert output_chunk_start is None or isinstance(output_chunk_start, int) 636 assert output_chunk_start is None or isinstance(output_chunk_start, int)
635 assert output is None or isinstance(output, str) 637 assert output is None or isinstance(output, str)
636 if cost_usd is not None and cost_usd < 0.: 638 if cost_usd is not None and cost_usd < 0.:
637 raise ValueError('cost_usd must be None or greater or equal than 0') 639 raise ValueError('cost_usd must be None or greater or equal than 0')
638 if duration is not None and duration < 0.: 640 if duration is not None and duration < 0.:
639 raise ValueError('duration must be None or greater or equal than 0') 641 raise ValueError('duration must be None or greater or equal than 0')
640 if (duration is None) != (exit_code is None): 642 if (duration is None) != (exit_code is None):
641 raise ValueError( 643 raise ValueError(
642 'had unexpected duration; expected iff a command completes\n' 644 'had unexpected duration; expected iff a command completes\n'
643 'duration: %r; exit: %r' % (duration, exit_code)) 645 'duration: %r; exit: %r' % (duration, exit_code))
644 if performance_stats and duration is None: 646 if performance_stats and duration is None:
645 raise ValueError( 647 raise ValueError(
646 'duration must be set when performance_stats is set\n' 648 'duration must be set when performance_stats is set\n'
647 'duration: %s; performance_stats: %s' % 649 'duration: %s; performance_stats: %s' %
648 (duration, performance_stats)) 650 (duration, performance_stats))
649 651
650 packed = task_pack.pack_run_result_key(run_result_key) 652 packed = task_pack.pack_run_result_key(run_result_key)
651 logging.debug( 653 logging.debug(
652 'bot_update_task(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)', 654 'bot_update_task(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)',
653 packed, bot_id, len(output) if output else output, output_chunk_start, 655 packed, bot_id, len(output) if output else output, output_chunk_start,
654 exit_code, duration, hard_timeout, io_timeout, cost_usd, outputs_ref, 656 exit_code, duration, hard_timeout, io_timeout, cost_usd, outputs_ref,
655 performance_stats) 657 cipd_pins, performance_stats)
656 658
657 result_summary_key = task_pack.run_result_key_to_result_summary_key( 659 result_summary_key = task_pack.run_result_key_to_result_summary_key(
658 run_result_key) 660 run_result_key)
659 request_key = task_pack.result_summary_key_to_request_key(result_summary_key) 661 request_key = task_pack.result_summary_key_to_request_key(result_summary_key)
660 request_future = request_key.get_async() 662 request_future = request_key.get_async()
661 server_version = utils.get_app_version() 663 server_version = utils.get_app_version()
662 request = request_future.get_result() 664 request = request_future.get_result()
663 now = utils.utcnow() 665 now = utils.utcnow()
664 666
665 def run(): 667 def run():
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
701 result_summary_future.wait() 703 result_summary_future.wait()
702 return None, None, 'got 2 different durations; %s then %s' % ( 704 return None, None, 'got 2 different durations; %s then %s' % (
703 run_result.duration, duration) 705 run_result.duration, duration)
704 else: 706 else:
705 run_result.duration = duration 707 run_result.duration = duration
706 run_result.exit_code = exit_code 708 run_result.exit_code = exit_code
707 709
708 if outputs_ref: 710 if outputs_ref:
709 run_result.outputs_ref = outputs_ref 711 run_result.outputs_ref = outputs_ref
710 712
713 if cipd_pins:
714 run_result.cipd_pins = cipd_pins
715
711 if run_result.state in task_result.State.STATES_RUNNING: 716 if run_result.state in task_result.State.STATES_RUNNING:
712 if hard_timeout or io_timeout: 717 if hard_timeout or io_timeout:
713 run_result.state = task_result.State.TIMED_OUT 718 run_result.state = task_result.State.TIMED_OUT
714 run_result.completed_ts = now 719 run_result.completed_ts = now
715 elif run_result.exit_code is not None: 720 elif run_result.exit_code is not None:
716 run_result.state = task_result.State.COMPLETED 721 run_result.state = task_result.State.COMPLETED
717 run_result.completed_ts = now 722 run_result.completed_ts = now
718 723
719 run_result.signal_server_version(server_version) 724 run_result.signal_server_version(server_version)
725 run_result.validate(request)
720 to_put = [run_result] 726 to_put = [run_result]
721 if output: 727 if output:
722 # This does 1 multi GETs. This also modifies run_result in place. 728 # This does 1 multi GETs. This also modifies run_result in place.
723 to_put.extend(run_result.append_output(output, output_chunk_start or 0)) 729 to_put.extend(run_result.append_output(output, output_chunk_start or 0))
724 if performance_stats: 730 if performance_stats:
725 performance_stats.key = task_pack.run_result_key_to_performance_stats_key( 731 performance_stats.key = task_pack.run_result_key_to_performance_stats_key(
726 run_result.key) 732 run_result.key)
727 to_put.append(performance_stats) 733 to_put.append(performance_stats)
728 734
729 run_result.cost_usd = max(cost_usd, run_result.cost_usd or 0.) 735 run_result.cost_usd = max(cost_usd, run_result.cost_usd or 0.)
730 run_result.modified_ts = now 736 run_result.modified_ts = now
731 737
732 result_summary = result_summary_future.get_result() 738 result_summary = result_summary_future.get_result()
733 if (result_summary.try_number and 739 if (result_summary.try_number and
734 result_summary.try_number > run_result.try_number): 740 result_summary.try_number > run_result.try_number):
735 # The situation where a shard is retried but the bot running the previous 741 # The situation where a shard is retried but the bot running the previous
736 # try somehow reappears and reports success, the result must still show 742 # try somehow reappears and reports success, the result must still show
737 # the last try's result. We still need to update cost_usd manually. 743 # the last try's result. We still need to update cost_usd manually.
738 result_summary.costs_usd[run_result.try_number-1] = run_result.cost_usd 744 result_summary.costs_usd[run_result.try_number-1] = run_result.cost_usd
739 result_summary.modified_ts = now 745 result_summary.modified_ts = now
740 else: 746 else:
741 result_summary.set_from_run_result(run_result, request) 747 result_summary.set_from_run_result(run_result, request)
742 748
749 result_summary.validate(request)
743 to_put.append(result_summary) 750 to_put.append(result_summary)
744 ndb.put_multi(to_put) 751 ndb.put_multi(to_put)
745 752
746 return result_summary, run_result, None 753 return result_summary, run_result, None
747 754
748 try: 755 try:
749 smry, run_result, error = datastore_utils.transaction(run) 756 smry, run_result, error = datastore_utils.transaction(run)
750 except datastore_utils.CommitError as e: 757 except datastore_utils.CommitError as e:
751 logging.info('Got commit error: %s', e) 758 logging.info('Got commit error: %s', e)
752 # It is important that the caller correctly surface this error. 759 # It is important that the caller correctly surface this error.
(...skipping 193 matching lines...) Expand 10 before | Expand all | Expand 10 after
946 ## Task queue tasks. 953 ## Task queue tasks.
947 954
948 955
949 def task_handle_pubsub_task(payload): 956 def task_handle_pubsub_task(payload):
950 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" 957 """Handles task enqueued by _maybe_pubsub_notify_via_tq."""
951 # Do not catch errors to trigger task queue task retry. Errors should not 958 # Do not catch errors to trigger task queue task retry. Errors should not
952 # happen in normal case. 959 # happen in normal case.
953 _pubsub_notify( 960 _pubsub_notify(
954 payload['task_id'], payload['topic'], 961 payload['task_id'], payload['topic'],
955 payload['auth_token'], payload['userdata']) 962 payload['auth_token'], payload['userdata'])
OLDNEW
« no previous file with comments | « appengine/swarming/server/task_result_test.py ('k') | appengine/swarming/server/task_scheduler_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698