Chromium Code Reviews

Unified Diff: appengine/cmd/dm/distributor/distributor.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: fix imports and make dummy.go a real file Created 4 years, 6 months ago
Index: appengine/cmd/dm/distributor/distributor.go
diff --git a/appengine/cmd/dm/distributor/distributor.go b/appengine/cmd/dm/distributor/distributor.go
new file mode 100644
index 0000000000000000000000000000000000000000..d801d05640da0bd10ed1a033d07241dcccc7f3e9
--- /dev/null
+++ b/appengine/cmd/dm/distributor/distributor.go
@@ -0,0 +1,161 @@
+// Copyright 2016 The LUCI Authors. All rights reserved.
+// Use of this source code is governed under the Apache License, Version 2.0
+// that can be found in the LICENSE file.
+
+// Package distributor contains all the adaptors for the various supported
+// distributor protocols. At a high level, it works like this:
+// * Quests specify a distributor configuration by name as part of their
+// identity.
+// * When an Execution for that Quest NeedsExecution, DM reads configuration
+// (distributor.proto) from luci-config. This configuration is stored
+// as part of the Execution so that for the duration of a given Execution,
+// DM always interacts with the same distributor in the same way (barring
+// code changes in DM's adapter logic itself).
+// * DM uses the selected distributor implementation to start a task and
+// record its Token. Additionally, the distributor MUST publish updates to
+// DM's pubsub topic. When publishing updates, the distributor MUST include
+// 2 attributes (execution_id, pubsub_key), which are provided as part of
+// TaskDescription.
+// * When DM gets a hit on pubsub, it will load the Execution, load its cached
+// distributor configuration, and then call HandleNotification for the
+// adapter to parse the notification body and return the state of the task.
+//
+// Adding a new distributor requires:
+// * Add a new subdir of protos with the configuration proto for the new
+// distributor. Each distributor implementation must have its own unique
+// Config message.
+// * Add a matching subdir of this package for the implementation of the
+// distributor.
+// * In the implementation, add a Register method that registers the
+// implementation with this package appropriately.
+// * In the DM frontend, import your new package implementation and run its
+// Register method.
+package distributor
+
+import (
+ "net/http"
+ "time"
+
+ dm "github.com/luci/luci-go/common/api/dm/service/v1"
+
+ "golang.org/x/net/context"
+)
+
+// Token is an opaque token that a distributor should use to
+// uniquely identify a single DM execution.
+type Token string
+
+// PersistentState is the persistent state token for the job. For a given
+// Attempt, it is retrieved from Finished executions and passed to new Executions.
+type PersistentState []byte
+
+// TaskResult is the conclusion of the task. Only one of the two fields may be
+// populated.
+type TaskResult struct {
+ // PersistentState is used if AbnormalFinish is nil. In that case the
+ // Execution is counted as FINISHED, with this value as its PersistentState.
+ PersistentState PersistentState
+ // If this is !nil, the Execution is counted as ABNORMAL_FINISHED, with this
+ // value as the result.
+ AbnormalFinish *dm.AbnormalFinish
+}
+
+// Notification represents a notification from the distributor to DM that
+// a particular execution has a status update. Data and Attrs are interpreted
+// purely by the distributor implementation.
+type Notification struct {
+ ID *dm.Execution_ID
+ Data []byte
+ Attrs map[string]string
+}
+
+// D is the interface for all distributor implementations.
+//
+// Retries
+//
+// Unless otherwise noted, DM will retry methods here if they return an error
+// marked as Transient, up to some internal limit. If they return
+// a non-Transient error (or nil) DM will make a best effort not to duplicate
+// calls, but it can't guarantee that.
+type D interface {
+ // Run prepares and runs a new Task from the given TaskDescription.
+ //
+ // Scheduling the same TaskDescription multiple times SHOULD return the same
+ // Token. It's OK if this doesn't happen, but only one of the scheduled tasks
+ // will be able to invoke ActivateExecution; the other one(s) will
+ // early-abort and/or timeout.
+ //
+ // If this returns a non-Transient error, the Execution will be marked as
+ // Rejected with the returned error message as the 'Reason'.
+ //
+ // The various time durations, if non-zero, will be used verbatim for DM to
+ // timeout that phase of the task's execution. If the task's execution times
+ // out in the 'STOPPING' phase, DM will poll the distributor's GetStatus
+ // method up to 3 times with a 30-second gap to attempt to retrieve the final
+ // information. After more than 3 times, DM will give up and mark the task as
+ // expired.
+ //
+ // If the distributor doesn't intend to use Pubsub for notifying DM about the
+ // final status of the job, set timeToStop to the amount of time you want DM
+ // to wait before polling GetStatus. e.g. if after calling FinishAttempt or
+ // EnsureGraphData your distributor needs 10 seconds before it can correctly
+ // respond to a GetStatus request, you should set timeToStop to >= 10s.
+ // Otherwise timeToStop should be set fairly high (e.g. 12 hours) as a hedge
+ // against a broken pubsub notification pipeline.
+ //
+ // If you have the choice between pubsub or not, prefer to use pubsub as it
+ // allows DM to more proactively update the graph state (and unblock waiting
+ // Attempts, etc.)
+ Run(*TaskDescription) (tok Token, timeToStart, timeToRun, timeToStop time.Duration, err error)
+
+ // Cancel attempts to cancel a running task. If a task is canceled more than
+ // once, this should return nil.
+ Cancel(Token) error
+
+ // GetStatus retrieves the current state of the task from the distributor.
+ //
+ // If this returns a non-Transient error more than 30 seconds after the task
+ // was Run(), the execution will be marked Missing with the returned error
+ // message as the 'Reason'. If it returns a non-Transient error within 30
+ // seconds of being run, DM will automatically treat that as Transient.
+ GetStatus(Token) (*TaskResult, error)
+
+ // InfoURL calculates a user-presentable information url for the task
+ // identified by Token. This should be a local operation, so it is not the
+ // implementation's responsibility to validate the token in this method (e.g.
+ // it could point to a nonexistent job, etc.)
+ InfoURL(Token) string
+
+ // HandleNotification is called whenever DM receives a PubSub message sent to
+ // a topic created with TaskDescription.PrepareTopic. The Attrs map will omit
+ // the 'auth_token' field.
+ //
+ // Returning (nil, nil) will indicate that DM should ignore this notification.
+ //
+ // DM will convert pubsub Messages to a delayed GetStatus if a pubsub message
+ // is delivered which refers to an Attempt whose status is NeedsExecution,
+ // which could happen in the event of a not-fully-settled transaction.
+ //
+ // DM will ignore any notifications for executions which it doesn't know
+ // about.
+ HandleNotification(notification *Notification) (*TaskResult, error)
+
+ // HandleTaskQueueTask is called if the distributor used Config.EnqueueTask.
+ //
+ // It may return zero or more Notifications for DM about arbitrary Executions.
+ // These notifications will be handled 'later' by the HandleNotification
+ // implementation.
+ HandleTaskQueueTask(r *http.Request) ([]*Notification, error)
+
+ // Validate should return a non-nil error if the given payload is not
+ // appropriate for this Distributor. Payload is guaranteed to be a valid
+ // JSON object. This should validate that the content of that JSON object is
+ // what the distributor expects.
+ Validate(payload string) error
+}
+
+// Factory is a function which produces a new distributor instance with the
+// provided configuration proto.
+//
+// c is guaranteed to be non-transactional.
+type Factory func(c context.Context, dist *Config) (D, error)
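
Illustrative sketch (not part of this patch set): the contract documented on D can be exercised with a small in-memory implementation. Everything below is an assumption made for illustration: memDistributor, its token scheme, the example.invalid URL, and the simplified TaskDescription, AbnormalFinish and Notification types are hypothetical stand-ins so the snippet compiles without luci-go. Returning (nil, nil) from GetStatus while a task is still running is also an assumption; the doc comments above only define (nil, nil) for HandleNotification. Only part of the interface is covered.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

type Token string
type PersistentState []byte

// Hypothetical stand-ins for types defined elsewhere in the real change.
type AbnormalFinish struct{ Reason string }
type TaskDescription struct{ ExecutionID string }
type Notification struct {
	ID    string
	Data  []byte
	Attrs map[string]string
}

type TaskResult struct {
	PersistentState PersistentState
	AbnormalFinish  *AbnormalFinish
}

// memDistributor keeps tasks in memory; a nil result means "still running".
type memDistributor struct {
	mu    sync.Mutex
	tasks map[Token]*TaskResult
}

func newMemDistributor() *memDistributor {
	return &memDistributor{tasks: map[Token]*TaskResult{}}
}

// Run derives the Token from the description, so scheduling the same
// TaskDescription twice yields the same Token.
func (d *memDistributor) Run(desc *TaskDescription) (Token, time.Duration, time.Duration, time.Duration, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	tok := Token("mem/" + desc.ExecutionID)
	if _, ok := d.tasks[tok]; !ok {
		d.tasks[tok] = nil
	}
	// This sketch reports results via notifications, so timeToStop is set
	// high as a hedge; the other durations are left at zero here.
	return tok, 0, 0, 12 * time.Hour, nil
}

// Cancel returns nil even when the task was already canceled.
func (d *memDistributor) Cancel(tok Token) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	res, ok := d.tasks[tok]
	if !ok {
		return errors.New("unknown token")
	}
	if res == nil {
		d.tasks[tok] = &TaskResult{AbnormalFinish: &AbnormalFinish{Reason: "cancelled"}}
	}
	return nil
}

// GetStatus returns (nil, nil) while the task is running (an assumption).
func (d *memDistributor) GetStatus(tok Token) (*TaskResult, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	res, ok := d.tasks[tok]
	if !ok {
		return nil, errors.New("unknown token")
	}
	return res, nil
}

// InfoURL is a purely local computation and does not validate the token.
func (d *memDistributor) InfoURL(tok Token) string {
	return "https://example.invalid/task/" + string(tok)
}

// HandleNotification treats the message body as the final PersistentState
// (a made-up encoding) and returns (nil, nil) to have DM ignore empty bodies.
func (d *memDistributor) HandleNotification(n *Notification) (*TaskResult, error) {
	if len(n.Data) == 0 {
		return nil, nil
	}
	return &TaskResult{PersistentState: PersistentState(n.Data)}, nil
}

func main() {
	d := newMemDistributor()
	desc := &TaskDescription{ExecutionID: "quest|1|1"}
	tok1, _, _, _, _ := d.Run(desc)
	tok2, _, _, _, _ := d.Run(desc)
	fmt.Println("same token:", tok1 == tok2)
	fmt.Println("cancel twice:", d.Cancel(tok1), d.Cancel(tok1))
	res, _ := d.GetStatus(tok1)
	fmt.Println("abnormal reason:", res.AbnormalFinish.Reason)
	fmt.Println("info url:", d.InfoURL(tok1))
}
```

Deriving the Token from the TaskDescription is one way to satisfy the "same TaskDescription SHOULD return the same Token" guidance. A real adapter would more likely rely on a deduplication key understood by the backing service.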