Index: appengine/cmd/dm/distributor/tq_handler.go |
diff --git a/appengine/cmd/dm/distributor/tq_handler.go b/appengine/cmd/dm/distributor/tq_handler.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..059a57369f6a43947dc56d3cd530acae1daa0956 |
--- /dev/null |
+++ b/appengine/cmd/dm/distributor/tq_handler.go |
@@ -0,0 +1,60 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+package distributor |
+ |
+import ( |
+ "net/http" |
+ "net/url" |
+ "strings" |
+ |
+ "github.com/julienschmidt/httprouter" |
+ "github.com/luci/luci-go/appengine/tumble" |
+ "github.com/luci/luci-go/common/logging" |
+ "golang.org/x/net/context" |
+) |
+ |
+const handlerPattern = "/tq/distributor/:cfgName" |
+ |
+func handlerPath(cfgName string) string { |
+ return strings.Replace(handlerPattern, ":cfgName", url.QueryEscape(cfgName), 1) |
+} |
+ |
+// TaskqueueHandler is the http handler that routes taskqueue tasks made with |
+// Config.EnqueueTask to a distributor's HandleTaskQueueTask method. |
+// |
+// This requires that c already have a Registry installed via the WithRegistry |
+// method. |
+func TaskqueueHandler(c context.Context, rw http.ResponseWriter, r *http.Request, p httprouter.Params) { |
+ defer r.Body.Close() |
+ |
+ cfgName := p.ByName("cfgName") |
+ dist, _, err := GetRegistry(c).MakeDistributor(c, cfgName) |
+ if err != nil { |
+ logging.Fields{"error": err, "cfg": cfgName}.Errorf(c, "Failed to make distributor") |
+ http.Error(rw, "bad distributor", http.StatusBadRequest) |
+ return |
+ } |
+ notifications, err := dist.HandleTaskQueueTask(r) |
+ if err != nil { |
+ logging.Fields{"error": err, "cfg": cfgName}.Errorf(c, "Failed to handle taskqueue task") |
+ http.Error(rw, "failure to execute handler", http.StatusInternalServerError) |
+ return |
+ } |
+ if len(notifications) > 0 { |
+ muts := make([]tumble.Mutation, 0, len(notifications)) |
+ for _, notify := range notifications { |
+ if notify != nil { |
+ muts = append(muts, &NotifyExecution{cfgName, notify}) |
+ } |
+ } |
+ err = tumble.AddToJournal(c, muts...) |
+ if err != nil { |
+ logging.Fields{"error": err, "cfg": cfgName}.Errorf(c, "Failed to handle notifications") |
+ http.Error(rw, "failure to handle notifications", http.StatusInternalServerError) |
+ return |
+ } |
+ } |
+ rw.WriteHeader(http.StatusOK) |
+} |