Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(197)

Unified Diff: scheduler/appengine/engine/tq/tq.go

Issue 2981043002: Add a task queue task router to reduce amount of boilerplate. (Closed)
Patch Set: comment nit Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « scheduler/appengine/engine/internal/tq_tasks.pb.go ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: scheduler/appengine/engine/tq/tq.go
diff --git a/scheduler/appengine/engine/tq/tq.go b/scheduler/appengine/engine/tq/tq.go
new file mode 100644
index 0000000000000000000000000000000000000000..3c14543350e2e04c244b871b98729c7ce8fdb703
--- /dev/null
+++ b/scheduler/appengine/engine/tq/tq.go
@@ -0,0 +1,414 @@
+// Copyright 2017 The LUCI Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package tq implements a simple routing layer for task queue tasks.
tandrii(chromium) 2017/07/17 09:54:30 once this is known to work (ie with tests :P), per
Vadim Sh. 2017/07/23 19:58:02 Maybe, if it's usable to anyone else.
+package tq
+
+import (
+ "bytes"
+ "crypto/sha256"
+ "encoding/hex"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "reflect"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/golang/protobuf/jsonpb"
+ "github.com/golang/protobuf/proto"
+ "golang.org/x/net/context"
+
+ "github.com/luci/gae/service/taskqueue"
+
+ "github.com/luci/luci-go/appengine/gaemiddleware"
+ "github.com/luci/luci-go/common/data/stringset"
+ "github.com/luci/luci-go/common/errors"
+ "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/retry/transient"
+ "github.com/luci/luci-go/server/router"
+)
+
+// Dispatcher enqueues task queue tasks and routes incoming ones to the
+// handlers registered through RegisterTask.
+type Dispatcher struct {
+	// BaseURL is the URL prefix shared by all task handler routes.
+	//
+	// Defaults to "/internal/tasks/" when empty.
+	BaseURL string
+
+	mu       sync.RWMutex       // held when reading or writing 'handlers'
+	handlers map[string]handler // proto message type name => registered handler
+}
+
+// Task is a task body plus the extra parameters that affect how the task is
+// routed and scheduled.
+type Task struct {
+	// Payload is the body of the task. Its proto type also selects the handler.
+	//
+	// Routing is based solely on the type of the payload message, see
+	// RegisterTask.
+	Payload proto.Message
+
+	// DeduplicationKey, if set, is a unique key of the task.
+	//
+	// A task whose key matches a recently enqueued task is silently ignored.
+	//
+	// Tasks with a deduplication key can only be used outside of transactions.
+	DeduplicationKey string
+
+	// Title, if set, identifies the task in HTTP logs.
+	//
+	// It is appended to the task handler URL and exists only to make HTTP logs
+	// easier to read. It has no other meaning and, in particular, it is NOT a
+	// task name.
+	//
+	// Handlers never see it: anything a handler needs must be in the payload.
+	Title string
+
+	// Delay is how long the task queue service must wait before executing the
+	// task.
+	//
+	// Set either Delay or ETA, never both.
+	Delay time.Duration
+
+	// ETA is the earliest moment the task may be executed.
+	//
+	// Set either Delay or ETA, never both.
+	ETA time.Time
+
+	// RetryOptions, if not nil, replaces the default retry options that were
+	// given when the task type was registered.
+	RetryOptions *taskqueue.RetryOptions
+}
+
+// Handler is a callback that processes a single enqueued task.
+//
+// The context it receives comes out of the middleware chain installed with
+// InstallHandlers.
+//
+// execCount mirrors the X-AppEngine-TaskExecutionCount header: 1 for the first
+// execution attempt, 2 for the first retry, and so on.
+//
+// Returning a transient error lets the task queue redeliver the task (subject
+// to RetryOptions).
+//
+// Returning nil or a fatal error marks the task as "done": it is not retried.
+type Handler func(c context.Context, payload proto.Message, execCount int) error
+
+// RegisterTask binds a proto message type to a handler and to the task queue
+// its tasks are routed through.
+//
+// 'prototype' must be a pointer to a concrete proto message. Only its type is
+// used, never its contents. An empty 'queue' means the "default" queue. 'opts'
+// become the default retry options for tasks of this type.
+//
+// Meant to be called during process startup. Panics if the message type is not
+// known to the proto registry or if it already has a handler.
+func (d *Dispatcher) RegisterTask(prototype proto.Message, cb Handler, queue string, opts *taskqueue.RetryOptions) {
+	typeName := proto.MessageName(prototype)
+	if typeName == "" {
+		panic(fmt.Sprintf("unregistered proto message type %T", prototype))
+	}
+
+	if queue == "" {
+		// The default GAE task queue, it always exists.
+		queue = "default"
+	}
+
+	d.mu.Lock()
+	defer d.mu.Unlock()
+
+	if d.handlers == nil {
+		// First registration: there is nothing to collide with yet.
+		d.handlers = make(map[string]handler)
+	} else if _, dup := d.handlers[typeName]; dup {
+		panic(fmt.Sprintf("handler for %q has already been registered", typeName))
+	}
+
+	d.handlers[typeName] = handler{
+		cb:        cb,
+		queue:     queue,
+		title:     typeName,
+		retryOpts: opts,
+	}
+}
+
+// AddTask submits the given task to an appropriate task queue.
+//
+// It means, at some later time in some other GAE process, the callback
tandrii(chromium) 2017/07/17 09:54:30 s/add/at
Vadim Sh. 2017/07/23 19:58:02 Done.
+// registered as a handler for corresponding proto type will be called.
+//
+// If the given context is transactional, inherits the transaction.
+//
+// May return transient errors.
+func (d *Dispatcher) AddTask(c context.Context, task *Task) error {
+ // Note: we don't reuse AddTasks here to avoid overhead on additional
+ // structures and goroutines it is using.
+ t, queue, err := d.tqTask(task)
+ if err != nil {
+ return err // returned as is, without the transient tag
+ }
+ if err := taskqueue.Add(c, queue, t); err != nil {
+ if err == taskqueue.ErrTaskAlreadyAdded {
+ return nil // an already added task is not reported as an error
+ }
+ return transient.Tag.Apply(err) // any other enqueue error is tagged as transient
+ }
+ return nil
+}
+
+// AddTasks is a batch variant of AddTask.
+//
+// Note that it is not atomic outside of transactions. Returns an error if
+// at least one enqueue operation failed (there's no way to figure out which one
+// exactly).
+func (d *Dispatcher) AddTasks(c context.Context, tasks []*Task) error {
+ if len(tasks) == 0 {
+ return nil
+ }
+
+ perQueue := map[string][]*taskqueue.Task{} // queue name => tasks to add to it
+ for _, task := range tasks {
+ t, queue, err := d.tqTask(task)
+ if err != nil {
+ return err // nothing has been enqueued at this point
+ }
+ perQueue[queue] = append(perQueue[queue], t)
+ }
+
+ // Enqueue in parallel, per-queue, split into batches based on Task Queue
+ // RPC limits (100 tasks per batch).
+ errs := make(chan error) // unbuffered; each send is matched by a receive in the gather loop below
+ ops := 0 // number of launched goroutines, i.e. results to wait for
+ for q, tasks := range perQueue { // note: this 'tasks' shadows the function argument
+ for len(tasks) > 0 {
+ count := 100
+ if count > len(tasks) {
+ count = len(tasks)
+ }
+ go func(q string, batch []*taskqueue.Task) {
+ errs <- taskqueue.Add(c, q, batch...)
+ }(q, tasks[:count])
+ tasks = tasks[count:]
+ ops++
+ }
+ }
+
+ // Gather all errors throwing away ErrTaskAlreadyAdded.
+ var all errors.MultiError
+ for i := 0; i < ops; i++ {
+ err := <-errs
tandrii(chromium) 2017/07/17 09:54:30 for below lines: this really asks for MultiError.A
Vadim Sh. 2017/07/23 19:58:02 I don't like bloating interface of already quite c
+ if merr, yep := err.(errors.MultiError); yep {
tandrii(chromium) 2017/07/17 09:54:30 yep :)
+ for _, e := range merr {
+ if e != taskqueue.ErrTaskAlreadyAdded {
+ all = append(all, e)
+ }
+ }
+ } else if err != nil && err != taskqueue.ErrTaskAlreadyAdded {
+ all = append(all, err)
+ }
+ }
+
+ if len(all) == 0 {
+ return nil
+ }
+
+ return transient.Tag.Apply(all) // the combined error is tagged as transient
+}
+
+// InstallRoutes registers one POST route per task queue used by the
+// registered handlers.
+//
+// Each route is "<base URL><queue>/*title" and is guarded by the
+// gaemiddleware.RequireTaskQueue middleware for that queue, appended to 'mw'.
+//
+// Must be called only after all task handlers are registered!
+func (d *Dispatcher) InstallRoutes(r *router.Router, mw router.MiddlewareChain) {
+	// Collect the distinct queue names while holding the read lock.
+	names := stringset.New(0)
+	func() {
+		d.mu.RLock()
+		defer d.mu.RUnlock()
+		for _, h := range d.handlers {
+			names.Add(h.queue)
+		}
+	}()
+
+	prefix := d.baseURL()
+	for _, queue := range names.ToSlice() {
+		route := prefix + queue + "/*title"
+		r.POST(route, mw.Extend(gaemiddleware.RequireTaskQueue(queue)), d.processHTTPRequest)
+	}
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// handler is what RegisterTask stores for each registered proto message type.
+type handler struct {
+	cb        Handler                 // callback that processes the task
+	queue     string                  // task queue the tasks are routed through
+	title     string                  // title used when Task.Title is empty
+	retryOpts *taskqueue.RetryOptions // used when Task.RetryOptions is nil
+}
+
+// tqTask constructs task queue task struct.
+func (d *Dispatcher) tqTask(task *Task) (*taskqueue.Task, string, error) {
+ handler, err := d.handler(task.Payload)
+ if err != nil {
+ return nil, "", err
+ }
+
+ blob, err := serializePayload(task.Payload)
+ if err != nil {
+ return nil, "", err
+ }
+
+ title := handler.title
+ if task.Title != "" {
+ title = task.Title
+ }
+
+ retryOpts := handler.retryOpts
+ if task.RetryOptions != nil {
+ retryOpts = task.RetryOptions
+ }
+
+ // There's some weird restrictions on what characters are allowed inside task
+ // names. Lexicographically close names also cause hot spot issues in the
+ // Task Queues backend. To avoid these two issues, we always use SHA256 hashes
+ // for task names.
+ name := ""
+ if task.DeduplicationKey != "" {
+ h := sha256.New()
+ h.Write([]byte(task.DeduplicationKey))
+ name = hex.EncodeToString(h.Sum(nil))
+ }
+
+ return &taskqueue.Task{
+ Path: fmt.Sprintf("%s%s/%s", d.baseURL(), handler.queue, title),
+ Name: name,
+ Method: "POST",
+ Payload: blob,
+ ETA: task.ETA,
+ Delay: task.Delay,
+ RetryOptions: retryOpts,
+ }, handler.queue, nil
+}
+
+// baseURL returns the URL prefix shared by all HTTP routes of the Dispatcher.
+//
+// It is d.BaseURL, or "/internal/tasks/" if d.BaseURL is empty. The result
+// always ends with '/'.
+func (d *Dispatcher) baseURL() string {
+	if d.BaseURL == "" {
+		return "/internal/tasks/"
+	}
+	if strings.HasSuffix(d.BaseURL, "/") {
+		return d.BaseURL
+	}
+	return d.BaseURL + "/"
+}
+
+// handler looks up the handler struct registered with RegisterTask for the
+// proto type of the given payload.
+//
+// Returns a zero handler and an error if nothing is registered for that type.
+func (d *Dispatcher) handler(payload proto.Message) (handler, error) {
+	typeName := proto.MessageName(payload)
+
+	d.mu.RLock()
+	h, ok := d.handlers[typeName]
+	d.mu.RUnlock()
+
+	if !ok {
+		return h, fmt.Errorf("handler for %q is not registered", typeName)
+	}
+	return h, nil
+}
+
+// processHTTPRequest is invoked on each HTTP POST.
+//
+// It deserializes the task and invokes an appropriate callback.
+func (d *Dispatcher) processHTTPRequest(c *router.Context) {
+ body, err := ioutil.ReadAll(c.Request.Body)
+ if err != nil {
+ httpReply(c, false, 500, "Failed to read request body: %s", err)
+ return
+ }
+ logging.Debugf(c.Context, "Received task: %s", body)
+
+ payload, err := deserializePayload(body)
+ if err != nil {
+ httpReply(c, false, 400, "Bad payload, can't deserialize: %s", err)
+ return
+ }
+
+ h, err := d.handler(payload) // the handler is picked by the proto type of the payload
+ if err != nil {
+ httpReply(c, false, 404, "Bad task: %s", err)
+ return
+ }
+
+ execCount, _ := strconv.Atoi(c.Request.Header.Get("X-AppEngine-TaskExecutionCount")) // the parse error is ignored: execCount is 0 if the header is missing or not a number
+ switch err = h.cb(c.Context, payload, execCount); {
+ case err == nil:
+ httpReply(c, true, 200, "OK")
+ case transient.Tag.In(err): // only errors tagged as transient produce a non-200 reply
+ httpReply(c, false, 500, "Transient error: %s", err)
+ default:
+ httpReply(c, false, 200, "Fatal error: %s", err) // return 200 to stop retries
tandrii(chromium) 2017/07/17 09:54:30 so, are you saying that all our transient errors a
Vadim Sh. 2017/07/23 19:58:02 They should be when using this package. It's part
tandrii(chromium) 2017/07/26 09:51:34 Acknowledged.
+ }
+}
+
+// httpReply formats the message and writes it as the HTTP response with the
+// given status code.
+//
+// If 'ok' is false, the formatted message is also logged at error level.
+func httpReply(c *router.Context, ok bool, code int, msg string, args ...interface{}) {
+	text := fmt.Sprintf(msg, args...)
+	if !ok {
+		logging.Errorf(c.Context, "%s", text)
+	}
+	http.Error(c.Writer, text, code)
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// marshaller converts task payloads to JSON, using default jsonpb settings.
+var marshaller jsonpb.Marshaler
+
+// envelope is the JSON structure put into the task queue task body.
+type envelope struct {
+	// Type is the proto message name of the payload.
+	Type string `json:"type"`
+	// Body is the payload itself, as JSON produced by jsonpb.
+	Body *json.RawMessage `json:"body"`
+}
+
+// serializePayload encodes the proto message as a JSON envelope holding the
+// message type name and the jsonpb-encoded message body.
+func serializePayload(task proto.Message) ([]byte, error) {
+	var body bytes.Buffer
+	err := marshaller.Marshal(&body, task)
+	if err != nil {
+		return nil, err
+	}
+
+	raw := json.RawMessage(body.Bytes())
+	env := envelope{
+		Type: proto.MessageName(task),
+		Body: &raw,
+	}
+	return json.Marshal(env)
+}
+
+func deserializePayload(blob []byte) (proto.Message, error) {
+ env := envelope{}
+ if err := json.Unmarshal(blob, &env); err != nil {
+ return nil, err
+ }
+
+ tp := proto.MessageType(env.Type) // this is **ConcreteStruct{}
+ if tp == nil {
+ return nil, fmt.Errorf("unregistered proto message name %q", env.Type)
+ }
+ if env.Body == nil {
+ return nil, fmt.Errorf("no task body given")
+ }
+
+ task := reflect.New(tp.Elem()).Interface().(proto.Message)
+ if err := jsonpb.Unmarshal(bytes.NewReader(*env.Body), task); err != nil {
+ return nil, err
+ }
+
+ return task, nil
+}
« no previous file with comments | « scheduler/appengine/engine/internal/tq_tasks.pb.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698