Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(244)

Side by Side Diff: server/cmd/logdog_archivist/main.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Fix proto comment. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « common/proto/logdog/svcconfig/config.pb.go ('k') | server/cmd/logdog_archivist/task.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package main 5 package main
6 6
7 import ( 7 import (
8 "io"
9 "time"
10
8 "github.com/luci/luci-go/common/auth" 11 "github.com/luci/luci-go/common/auth"
9 "github.com/luci/luci-go/common/clock" 12 "github.com/luci/luci-go/common/clock"
10 "github.com/luci/luci-go/common/errors" 13 "github.com/luci/luci-go/common/errors"
11 "github.com/luci/luci-go/common/gcloud/gs" 14 "github.com/luci/luci-go/common/gcloud/gs"
15 "github.com/luci/luci-go/common/gcloud/pubsub"
12 log "github.com/luci/luci-go/common/logging" 16 log "github.com/luci/luci-go/common/logging"
17 "github.com/luci/luci-go/common/parallel"
13 "github.com/luci/luci-go/server/internal/logdog/archivist" 18 "github.com/luci/luci-go/server/internal/logdog/archivist"
14 "github.com/luci/luci-go/server/internal/logdog/service" 19 "github.com/luci/luci-go/server/internal/logdog/service"
15 "github.com/luci/luci-go/server/taskqueueClient"
16 "golang.org/x/net/context" 20 "golang.org/x/net/context"
21 "google.golang.org/cloud"
22 gcps "google.golang.org/cloud/pubsub"
17 ) 23 )
18 24
19 var ( 25 var (
20 errInvalidConfig = errors.New("invalid configuration") 26 errInvalidConfig = errors.New("invalid configuration")
21 ) 27 )
22 28
29 const (
30 // subscriptionErrorDelay is the amount of time to sleep after a subscri ption
31 // iterator returns a non-terminal error.
32 subscriptionErrorDelay = 10 * time.Second
33 )
34
23 // application is the Archivist application state. 35 // application is the Archivist application state.
24 type application struct { 36 type application struct {
25 service.Service 37 service.Service
26 } 38 }
27 39
28 // run is the main execution function. 40 // run is the main execution function.
29 func (a *application) runArchivist(c context.Context) error { 41 func (a *application) runArchivist(c context.Context) error {
30 cfg := a.Config() 42 cfg := a.Config()
31 43
32 coordCfg, acfg := cfg.GetCoordinator(), cfg.GetArchivist() 44 coordCfg, acfg := cfg.GetCoordinator(), cfg.GetArchivist()
33 switch { 45 switch {
34 case coordCfg == nil: 46 case coordCfg == nil:
35 fallthrough 47 fallthrough
36 case coordCfg.Project == "":
37 return errors.New("missing coordinator project name")
38 case coordCfg.ArchiveTaskQueue == "":
39 return errors.New("missing archive task queue name")
40 48
41 case acfg == nil: 49 case acfg == nil:
42 return errors.New("missing Archivist configuration") 50 return errors.New("missing Archivist configuration")
43 case acfg.GsBase == "": 51 case acfg.GsBase == "":
44 return errors.New("missing archive GS bucket") 52 return errors.New("missing archive GS bucket")
53 case acfg.GsStagingBase == "":
54 return errors.New("missing archive staging GS bucket")
45 } 55 }
46 56
47 » // Construct and validate our GS base. 57 » // Construct and validate our GS bases.
48 gsBase := gs.Path(acfg.GsBase) 58 gsBase := gs.Path(acfg.GsBase)
49 if gsBase.Bucket() == "" { 59 if gsBase.Bucket() == "" {
50 log.Fields{ 60 log.Fields{
51 » » » "gsBase": acfg.GsBase, 61 » » » "value": gsBase,
52 }.Errorf(c, "Google Storage base does not include a bucket name. ") 62 }.Errorf(c, "Google Storage base does not include a bucket name. ")
53 return errors.New("invalid Google Storage base") 63 return errors.New("invalid Google Storage base")
54 } 64 }
55 65
56 » // Initialize task queue client. 66 » gsStagingBase := gs.Path(acfg.GsStagingBase)
57 » tqClient, err := a.AuthenticatedClient(func(o *auth.Options) { 67 » if gsStagingBase.Bucket() == "" {
58 » » o.Scopes = taskqueueClient.Scopes 68 » » log.Fields{
69 » » » "value": gsStagingBase,
70 » » }.Errorf(c, "Google Storage staging base does not include a buck et name.")
71 » » return errors.New("invalid Google Storage staging base")
72 » }
73
74 » // Initialize Pub/Sub client.
75 » //
76 » // We will initialize both an authenticated Client instance and an
77 » // authenticated Context, since we need the latter for raw ACK deadline
78 » // updates.
79 » taskSub := pubsub.Subscription(acfg.Subscription)
80 » if err := taskSub.Validate(); err != nil {
81 » » log.Fields{
82 » » » log.ErrorKey: err,
83 » » » "value": taskSub,
84 » » }.Errorf(c, "Task subscription did not validate.")
85 » » return errors.New("invalid task subscription name")
86 » }
87 » psProject, psSubscriptionName := taskSub.Split()
88
89 » psAuth, err := a.Authenticator(func(o *auth.Options) {
90 » » o.Scopes = pubsub.SubscriberScopes
59 }) 91 })
60 if err != nil { 92 if err != nil {
61 » » log.WithError(err).Errorf(c, "Failed to get task queue client.") 93 » » log.WithError(err).Errorf(c, "Failed to get Pub/Sub authenticato r.")
62 return err 94 return err
63 } 95 }
64 96
97 // Pub/Sub: HTTP Client => Context
98 psHTTPClient, err := psAuth.Client()
99 if err != nil {
100 log.WithError(err).Errorf(c, "Failed to create authenticated Pub /Sub transport.")
101 return err
102 }
103 psContext := cloud.WithContext(c, psProject, psHTTPClient)
104
105 // Pub/Sub: TokenSource => Client
106 psClient, err := gcps.NewClient(c, psProject, cloud.WithTokenSource(psAu th.TokenSource()))
107 if err != nil {
108 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.")
109 return err
110 }
111 sub := psClient.Subscription(psSubscriptionName)
112
65 // Initialize our Storage. 113 // Initialize our Storage.
66 » s, err := a.IntermediateStorage(c) 114 » st, err := a.IntermediateStorage(c)
67 if err != nil { 115 if err != nil {
68 log.WithError(err).Errorf(c, "Failed to get storage instance.") 116 log.WithError(err).Errorf(c, "Failed to get storage instance.")
69 return err 117 return err
70 } 118 }
71 » defer s.Close() 119 » defer st.Close()
72 120
73 // Initialize our Google Storage client. 121 // Initialize our Google Storage client.
74 gsClient, err := a.GSClient(c) 122 gsClient, err := a.GSClient(c)
75 if err != nil { 123 if err != nil {
76 log.WithError(err).Errorf(c, "Failed to get Google Storage clien t.") 124 log.WithError(err).Errorf(c, "Failed to get Google Storage clien t.")
77 return err 125 return err
78 } 126 }
79 defer gsClient.Close() 127 defer gsClient.Close()
80 128
81 // Application shutdown will now operate by cancelling the Collector's
82 // shutdown Context.
83 shutdownCtx, shutdownFunc := context.WithCancel(c)
84 a.SetShutdownFunc(shutdownFunc)
85
86 ar := archivist.Archivist{ 129 ar := archivist.Archivist{
87 Service: a.Coordinator(), 130 Service: a.Coordinator(),
88 » » Storage: s, 131 » » Storage: st,
89 GSClient: gsClient, 132 GSClient: gsClient,
90 133
91 GSBase: gsBase, 134 GSBase: gsBase,
135 GSStagingBase: gsStagingBase,
92 StreamIndexRange: int(acfg.StreamIndexRange), 136 StreamIndexRange: int(acfg.StreamIndexRange),
93 PrefixIndexRange: int(acfg.PrefixIndexRange), 137 PrefixIndexRange: int(acfg.PrefixIndexRange),
94 ByteRange: int(acfg.ByteRange), 138 ByteRange: int(acfg.ByteRange),
95 } 139 }
96 140
97 » tqOpts := taskqueueClient.Options{ 141 » tasks := int(acfg.Tasks)
98 » » Project: coordCfg.Project, 142 » if tasks <= 0 {
99 » » Queue: coordCfg.ArchiveTaskQueue, 143 » » tasks = 1
100 » » Client: tqClient,
101 » » UserAgent: "LogDog Archivist",
102 » » Tasks: int(acfg.Tasks),
103 } 144 }
104 145
105 log.Fields{ 146 log.Fields{
106 » » "project": tqOpts.Project, 147 » » "subscription": taskSub,
107 » » "queue": tqOpts.Queue, 148 » » "tasks": tasks,
108 » }.Infof(c, "Pulling tasks from task queue.") 149 » }.Infof(c, "Pulling tasks from Pub/Sub subscription.")
109 » taskqueueClient.RunTasks(shutdownCtx, tqOpts, func(c context.Context, t taskqueueClient.Task) bool { 150 » it, err := sub.Pull(c, gcps.MaxExtension(pubsub.MaxACKDeadline), gcps.Ma xPrefetch(tasks))
110 » » c = log.SetField(c, "taskID", t.ID) 151 » if err != nil {
152 » » log.Fields{
153 » » » log.ErrorKey: err,
154 » » » "subscription": taskSub,
155 » » }.Errorf(c, "Failed to create Pub/Sub subscription iterator.")
156 » }
157 » defer it.Stop()
111 158
112 » » startTime := clock.Now(c) 159 » // Application shutdown will now operate by stopping the Iterator.
113 » » err := ar.ArchiveTask(c, t.Payload) 160 » a.SetShutdownFunc(it.Stop)
114 » » duration := clock.Now(c).Sub(startTime)
115 161
116 » » switch { 162 » // Loop, pulling messages from our iterator and dispatching them.
117 » » case errors.IsTransient(err): 163 » parallel.Ignore(parallel.Run(tasks, func(taskC chan<- func() error) {
118 » » » // Do not consume 164 » » for {
119 » » » log.Fields{ 165 » » » msg, err := it.Next()
120 » » » » log.ErrorKey: err, 166 » » » switch err {
121 » » » » "duration": duration, 167 » » » case nil:
122 » » » }.Warningf(c, "TRANSIENT error processing task.") 168 » » » » c := log.SetFields(c, log.Fields{
123 » » » return false 169 » » » » » "messageID": msg.ID,
170 » » » » » "ackID": msg.AckID,
171 » » » » })
124 172
125 » » case err == nil: 173 » » » » // Dispatch an archive handler for this message.
126 » » » log.Fields{ 174 » » » » taskC <- func() error {
127 » » » » "duration": duration, 175 » » » » » deleteTask := false
128 » » » }.Infof(c, "Task successfully processed; deleting.") 176 » » » » » defer func() {
129 » » » return true 177 » » » » » » msg.Done(deleteTask)
178 » » » » » }()
130 179
131 » » default: 180 » » » » » task, err := makePubSubArchivistTask(psC ontext, psSubscriptionName, msg)
132 » » » log.Fields{ 181 » » » » » if err != nil {
133 » » » » log.ErrorKey: err, 182 » » » » » » log.WithError(err).Errorf(c, "Fa iled to unmarshal archive task from message.")
134 » » » » "duration": duration, 183 » » » » » » deleteTask = true
135 » » » }.Errorf(c, "Non-transient error processing task; deleti ng.") 184 » » » » » » return nil
136 » » » return true 185 » » » » » }
186
187 » » » » » startTime := clock.Now(c)
188 » » » » » deleteTask = ar.ArchiveTask(c, task)
189 » » » » » duration := clock.Now(c).Sub(startTime)
190
191 » » » » » if deleteTask {
192 » » » » » » log.Fields{
193 » » » » » » » "duration": duration,
194 » » » » » » }.Infof(c, "Task successfully pr ocessed; deleting.")
195 » » » » » } else {
196 » » » » » » log.Fields{
197 » » » » » » » "duration": duration,
198 » » » » » » }.Infof(c, "Task processing inco mplete. Not deleting.")
199 » » » » » }
200 » » » » » return nil
201 » » » » }
202
203 » » » case io.EOF, context.Canceled, context.DeadlineExceeded:
204 » » » » log.Infof(c, "Subscription iterator is finished. ")
205 » » » » return
206
207 » » » default:
208 » » » » log.WithError(err).Warningf(c, "Subscription ite rator returned error. Sleeping...")
209 » » » » clock.Sleep(c, subscriptionErrorDelay)
210 » » » » continue
211 » » » }
137 } 212 }
138 » }) 213 » }))
139 214
140 log.Debugf(c, "Archivist finished.") 215 log.Debugf(c, "Archivist finished.")
141 return nil 216 return nil
142 } 217 }
143 218
144 // Entry point. 219 // Entry point.
145 func main() { 220 func main() {
146 a := application{} 221 a := application{}
147 a.Run(context.Background(), a.runArchivist) 222 a.Run(context.Background(), a.runArchivist)
148 } 223 }
OLDNEW
« no previous file with comments | « common/proto/logdog/svcconfig/config.pb.go ('k') | server/cmd/logdog_archivist/task.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698