Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(429)

Side by Side Diff: appengine/cmd/dm/distributor/pubsub.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: self review Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package distributor
6
7 import (
8 "encoding/json"
9 "fmt"
10 "net/http"
11 "strconv"
12 "time"
13
14 "github.com/julienschmidt/httprouter"
15 "github.com/luci/luci-go/appengine/tumble"
16 "github.com/luci/luci-go/common/api/dm/service/v1"
17 "github.com/luci/luci-go/common/logging"
18 "github.com/luci/luci-go/server/tokens"
19 "golang.org/x/net/context"
20 )
21
22 const notifyTopicSuffix = "dm-distributor-notify"
23
24 func pubsubTopic(c context.Context) string {
iannucci 2016/06/08 02:54:24 note that all of this is roughly untested. This wi
25 return fmt.Sprintf("projects/%s/topics/%s", getTrimmedAppID(c), notifyTo picSuffix)
dnj (Google) 2016/06/09 18:00:55 I would recommend using "common/gcloud/pubsub" her
iannucci 2016/06/15 00:46:00 Done.
26 }
27
28 // PubsubReciever is the http handler that processes incoming pubsub events
dnj (Google) 2016/06/09 18:00:55 nit: HTTP
iannucci 2016/06/15 00:46:00 Done.
29 // delivered to topics prepared with TaskDescription.PrepareTopic, and routes
30 // them to the appropriate distributor implementation's HandleNotification
31 // method.
32 //
33 // It requires that a Registry be installed in c via WithRegistry.
34 func PubsubReciever(c context.Context, rw http.ResponseWriter, r *http.Request, p httprouter.Params) {
dnj (Google) 2016/06/09 18:00:55 nit: it might be cleaner to wrap a more specific m
iannucci 2016/06/15 00:46:00 meh.... all this handler stuff is already function
35 defer r.Body.Close()
36
37 type PubsubMessage struct {
38 Attributes map[string]string `json:"attributes"`
39 Data []byte `json:"data"`
40 MessageID string `json:"message_id"`
41 }
42 type PubsubPushMessage struct {
43 Message PubsubMessage `json:"message"`
44 Subscription string `json:"subscription"`
45 }
46 psm := &PubsubPushMessage{}
47
48 err := json.NewDecoder(r.Body).Decode(psm)
dnj (Google) 2016/06/09 18:00:55 Why use JSON here and not protobuf?
iannucci 2016/06/15 00:46:00 this is just the pubsub message envelope thingy
49 if err != nil {
dnj (Google) 2016/06/09 18:00:55 nit: if err := json.NewDecoder...
iannucci 2016/06/15 00:46:00 Done.
50 logging.Fields{"error": err}.Errorf(c, "Failed to parse pubsub m essage")
51 http.Error(rw, "Failed to parse pubsub message", http.StatusInte rnalServerError)
52 return
53 }
54
55 eid, cfgName, err := decodeAuthToken(c, psm.Message.Attributes["auth_tok en"])
56 if err != nil {
57 logging.WithError(err).Errorf(c, "bad auth_token")
58 // Acknowledge this message, since it'll never be valid.
59 rw.WriteHeader(http.StatusNoContent)
60 return
61 }
62
63 // remove "auth_token" from Attributes to avoid having it pass to the
64 // distributor.
65 delete(psm.Message.Attributes, "auth_token")
66
67 err = tumble.RunMutation(c, &NotifyExecution{
68 cfgName, &Notification{eid, psm.Message.Data, psm.Message.Attrib utes},
69 })
70 if err != nil {
71 // TODO(riannucci): distinguish between transient/non-transient failures.
72 logging.WithError(err).Errorf(c, "failed to NotifyExecution")
73 rw.WriteHeader(http.StatusInternalServerError)
74 return
75 }
76
77 rw.WriteHeader(http.StatusNoContent)
78 }
79
80 // pubsubAuthToken describes how to generate HMAC protected tokens used to
81 // authenticate PubSub messages.
82 var pubsubAuthToken = tokens.TokenKind{
83 Algo: tokens.TokenAlgoHmacSHA256,
84 Expiration: 48 * time.Hour,
85 SecretKey: "pubsub_auth_token",
86 Version: 1,
87 }
88
89 func encodeAuthToken(c context.Context, eid *dm.Execution_ID, cfgName string) (s tring, error) {
90 return pubsubAuthToken.Generate(c, nil, map[string]string{
91 "quest": eid.Quest,
92 "attempt": strconv.FormatUint(uint64(eid.Attempt), 10),
93 "execution": strconv.FormatUint(uint64(eid.Id), 10),
94 "cfgName": cfgName,
95 }, 0)
96 }
97
98 func decodeAuthToken(c context.Context, authToken string) (eid *dm.Execution_ID, cfgName string, err error) {
99 items, err := pubsubAuthToken.Validate(c, authToken, nil)
100 if err != nil {
101 return
102 }
103 quest, qok := items["quest"]
104 attempt, aok := items["attempt"]
105 execution, eok := items["execution"]
106 if !qok || !aok || !eok {
107 err = fmt.Errorf("missing keys: %v", items)
108 return
109 }
110 attemptNum, err := strconv.ParseUint(attempt, 10, 32)
111 if err != nil {
112 return
113 }
114 executionNum, err := strconv.ParseUint(execution, 10, 32)
115 if err != nil {
116 return
117 }
118 eid = dm.NewExecutionID(quest, uint32(attemptNum), uint32(executionNum))
119
120 cfgName, ok := items["cfgName"]
121 if !ok {
122 err = fmt.Errorf("missing config name")
123 }
124
125 return
126 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698