Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: appengine/cmd/dm/distributor/pubsub.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: fix imports and make dummy.go a real file Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package distributor
6
7 import (
8 "encoding/json"
9 "fmt"
10 "net/http"
11 "strconv"
12 "time"
13
14 "github.com/julienschmidt/httprouter"
15 "github.com/luci/luci-go/appengine/tumble"
16 "github.com/luci/luci-go/common/api/dm/service/v1"
17 "github.com/luci/luci-go/common/logging"
18 "github.com/luci/luci-go/server/tokens"
19 "golang.org/x/net/context"
20 )
21
22 const notifyTopicSuffix = "dm-distributor-notify"
23
24 // PubsubReciever is the HTTP handler that processes incoming pubsub events
25 // delivered to topics prepared with TaskDescription.PrepareTopic, and routes
26 // them to the appropriate distributor implementation's HandleNotification
27 // method.
28 //
29 // It requires that a Registry be installed in c via WithRegistry.
30 func PubsubReciever(c context.Context, rw http.ResponseWriter, r *http.Request, p httprouter.Params) {
31 defer r.Body.Close()
32
33 type PubsubMessage struct {
34 Attributes map[string]string `json:"attributes"`
35 Data []byte `json:"data"`
36 MessageID string `json:"message_id"`
37 }
38 type PubsubPushMessage struct {
39 Message PubsubMessage `json:"message"`
40 Subscription string `json:"subscription"`
41 }
42 psm := &PubsubPushMessage{}
43
44 if err := json.NewDecoder(r.Body).Decode(psm); err != nil {
45 logging.WithError(err).Errorf(c, "Failed to parse pubsub message ")
46 http.Error(rw, "Failed to parse pubsub message", http.StatusInte rnalServerError)
47 return
48 }
49
50 eid, cfgName, err := decodeAuthToken(c, psm.Message.Attributes["auth_tok en"])
51 if err != nil {
52 logging.WithError(err).Errorf(c, "bad auth_token")
53 // Acknowledge this message, since it'll never be valid.
54 rw.WriteHeader(http.StatusNoContent)
55 return
56 }
57
58 // remove "auth_token" from Attributes to avoid having it pass to the
59 // distributor.
60 delete(psm.Message.Attributes, "auth_token")
61
62 err = tumble.RunMutation(c, &NotifyExecution{
63 cfgName, &Notification{eid, psm.Message.Data, psm.Message.Attrib utes},
64 })
65 if err != nil {
66 // TODO(riannucci): distinguish between transient/non-transient failures.
67 logging.WithError(err).Errorf(c, "failed to NotifyExecution")
68 rw.WriteHeader(http.StatusInternalServerError)
69 return
70 }
71
72 rw.WriteHeader(http.StatusNoContent)
73 }
74
75 // pubsubAuthToken describes how to generate HMAC protected tokens used to
76 // authenticate PubSub messages.
77 var pubsubAuthToken = tokens.TokenKind{
78 Algo: tokens.TokenAlgoHmacSHA256,
79 Expiration: 48 * time.Hour,
80 SecretKey: "pubsub_auth_token",
81 Version: 1,
82 }
83
84 func encodeAuthToken(c context.Context, eid *dm.Execution_ID, cfgName string) (s tring, error) {
85 return pubsubAuthToken.Generate(c, nil, map[string]string{
86 "quest": eid.Quest,
87 "attempt": strconv.FormatUint(uint64(eid.Attempt), 10),
88 "execution": strconv.FormatUint(uint64(eid.Id), 10),
89 "cfgName": cfgName,
90 }, 0)
91 }
92
93 func decodeAuthToken(c context.Context, authToken string) (eid *dm.Execution_ID, cfgName string, err error) {
94 items, err := pubsubAuthToken.Validate(c, authToken, nil)
95 if err != nil {
96 return
97 }
98 quest, qok := items["quest"]
99 attempt, aok := items["attempt"]
100 execution, eok := items["execution"]
101 if !qok || !aok || !eok {
102 err = fmt.Errorf("missing keys: %v", items)
103 return
104 }
105 attemptNum, err := strconv.ParseUint(attempt, 10, 32)
106 if err != nil {
107 return
108 }
109 executionNum, err := strconv.ParseUint(execution, 10, 32)
110 if err != nil {
111 return
112 }
113 eid = dm.NewExecutionID(quest, uint32(attemptNum), uint32(executionNum))
114
115 cfgName, ok := items["cfgName"]
116 if !ok {
117 err = fmt.Errorf("missing config name")
118 }
119
120 return
121 }
OLDNEW
« no previous file with comments | « appengine/cmd/dm/distributor/notify_execution.go ('k') | appengine/cmd/dm/distributor/registry.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698