OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package distributor | |
6 | |
7 import ( | |
8 "encoding/json" | |
9 "fmt" | |
10 "net/http" | |
11 "strconv" | |
12 "time" | |
13 | |
14 "github.com/julienschmidt/httprouter" | |
15 "github.com/luci/luci-go/appengine/tumble" | |
16 "github.com/luci/luci-go/common/api/dm/service/v1" | |
17 "github.com/luci/luci-go/common/logging" | |
18 "github.com/luci/luci-go/server/tokens" | |
19 "golang.org/x/net/context" | |
20 ) | |
21 | |
22 const notifyTopicSuffix = "dm-distributor-notify" | |
23 | |
24 func pubsubTopic(c context.Context) string { | |
iannucci
2016/06/08 02:54:24
note that all of this is roughly untested. This wi
| |
25 return fmt.Sprintf("projects/%s/topics/%s", getTrimmedAppID(c), notifyTo picSuffix) | |
dnj (Google)
2016/06/09 18:00:55
I would recommend using "common/gcloud/pubsub" her
iannucci
2016/06/15 00:46:00
Done.
| |
26 } | |
27 | |
28 // PubsubReciever is the http handler that processes incoming pubsub events | |
dnj (Google)
2016/06/09 18:00:55
nit: HTTP
iannucci
2016/06/15 00:46:00
Done.
| |
29 // delivered to topics prepared with TaskDescription.PrepareTopic, and routes | |
30 // them to the appropriate distributor implementation's HandleNotification | |
31 // method. | |
32 // | |
33 // It requires that a Registry be installed in c via WithRegistry. | |
34 func PubsubReciever(c context.Context, rw http.ResponseWriter, r *http.Request, p httprouter.Params) { | |
dnj (Google)
2016/06/09 18:00:55
nit: it might be cleaner to wrap a more specific m
iannucci
2016/06/15 00:46:00
meh.... all this handler stuff is already function
| |
35 defer r.Body.Close() | |
36 | |
37 type PubsubMessage struct { | |
38 Attributes map[string]string `json:"attributes"` | |
39 Data []byte `json:"data"` | |
40 MessageID string `json:"message_id"` | |
41 } | |
42 type PubsubPushMessage struct { | |
43 Message PubsubMessage `json:"message"` | |
44 Subscription string `json:"subscription"` | |
45 } | |
46 psm := &PubsubPushMessage{} | |
47 | |
48 err := json.NewDecoder(r.Body).Decode(psm) | |
dnj (Google)
2016/06/09 18:00:55
Why use JSON here and not protobuf?
iannucci
2016/06/15 00:46:00
this is just the pubsub message envelope thingy
| |
49 if err != nil { | |
dnj (Google)
2016/06/09 18:00:55
nit: if err := json.NewDecoder...
iannucci
2016/06/15 00:46:00
Done.
| |
50 logging.Fields{"error": err}.Errorf(c, "Failed to parse pubsub m essage") | |
51 http.Error(rw, "Failed to parse pubsub message", http.StatusInte rnalServerError) | |
52 return | |
53 } | |
54 | |
55 eid, cfgName, err := decodeAuthToken(c, psm.Message.Attributes["auth_tok en"]) | |
56 if err != nil { | |
57 logging.WithError(err).Errorf(c, "bad auth_token") | |
58 // Acknowledge this message, since it'll never be valid. | |
59 rw.WriteHeader(http.StatusNoContent) | |
60 return | |
61 } | |
62 | |
63 // remove "auth_token" from Attributes to avoid having it pass to the | |
64 // distributor. | |
65 delete(psm.Message.Attributes, "auth_token") | |
66 | |
67 err = tumble.RunMutation(c, &NotifyExecution{ | |
68 cfgName, &Notification{eid, psm.Message.Data, psm.Message.Attrib utes}, | |
69 }) | |
70 if err != nil { | |
71 // TODO(riannucci): distinguish between transient/non-transient failures. | |
72 logging.WithError(err).Errorf(c, "failed to NotifyExecution") | |
73 rw.WriteHeader(http.StatusInternalServerError) | |
74 return | |
75 } | |
76 | |
77 rw.WriteHeader(http.StatusNoContent) | |
78 } | |
79 | |
80 // pubsubAuthToken describes how to generate HMAC protected tokens used to | |
81 // authenticate PubSub messages. | |
82 var pubsubAuthToken = tokens.TokenKind{ | |
83 Algo: tokens.TokenAlgoHmacSHA256, | |
84 Expiration: 48 * time.Hour, | |
85 SecretKey: "pubsub_auth_token", | |
86 Version: 1, | |
87 } | |
88 | |
89 func encodeAuthToken(c context.Context, eid *dm.Execution_ID, cfgName string) (s tring, error) { | |
90 return pubsubAuthToken.Generate(c, nil, map[string]string{ | |
91 "quest": eid.Quest, | |
92 "attempt": strconv.FormatUint(uint64(eid.Attempt), 10), | |
93 "execution": strconv.FormatUint(uint64(eid.Id), 10), | |
94 "cfgName": cfgName, | |
95 }, 0) | |
96 } | |
97 | |
98 func decodeAuthToken(c context.Context, authToken string) (eid *dm.Execution_ID, cfgName string, err error) { | |
99 items, err := pubsubAuthToken.Validate(c, authToken, nil) | |
100 if err != nil { | |
101 return | |
102 } | |
103 quest, qok := items["quest"] | |
104 attempt, aok := items["attempt"] | |
105 execution, eok := items["execution"] | |
106 if !qok || !aok || !eok { | |
107 err = fmt.Errorf("missing keys: %v", items) | |
108 return | |
109 } | |
110 attemptNum, err := strconv.ParseUint(attempt, 10, 32) | |
111 if err != nil { | |
112 return | |
113 } | |
114 executionNum, err := strconv.ParseUint(execution, 10, 32) | |
115 if err != nil { | |
116 return | |
117 } | |
118 eid = dm.NewExecutionID(quest, uint32(attemptNum), uint32(executionNum)) | |
119 | |
120 cfgName, ok := items["cfgName"] | |
121 if !ok { | |
122 err = fmt.Errorf("missing config name") | |
123 } | |
124 | |
125 return | |
126 } | |
OLD | NEW |