| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package main | 5 package main |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "io" |
| 9 "time" |
| 10 |
| 8 "github.com/luci/luci-go/common/auth" | 11 "github.com/luci/luci-go/common/auth" |
| 9 "github.com/luci/luci-go/common/clock" | 12 "github.com/luci/luci-go/common/clock" |
| 10 "github.com/luci/luci-go/common/errors" | 13 "github.com/luci/luci-go/common/errors" |
| 11 "github.com/luci/luci-go/common/gcloud/gs" | 14 "github.com/luci/luci-go/common/gcloud/gs" |
| 15 "github.com/luci/luci-go/common/gcloud/pubsub" |
| 12 log "github.com/luci/luci-go/common/logging" | 16 log "github.com/luci/luci-go/common/logging" |
| 17 "github.com/luci/luci-go/common/parallel" |
| 13 "github.com/luci/luci-go/server/internal/logdog/archivist" | 18 "github.com/luci/luci-go/server/internal/logdog/archivist" |
| 14 "github.com/luci/luci-go/server/internal/logdog/service" | 19 "github.com/luci/luci-go/server/internal/logdog/service" |
| 15 "github.com/luci/luci-go/server/taskqueueClient" | |
| 16 "golang.org/x/net/context" | 20 "golang.org/x/net/context" |
| 21 "google.golang.org/cloud" |
| 22 gcps "google.golang.org/cloud/pubsub" |
| 17 ) | 23 ) |
| 18 | 24 |
| 19 var ( | 25 var ( |
| 20 errInvalidConfig = errors.New("invalid configuration") | 26 errInvalidConfig = errors.New("invalid configuration") |
| 21 ) | 27 ) |
| 22 | 28 |
| 29 const ( |
| 30 // subscriptionErrorDelay is the amount of time to sleep after a subscri
ption |
| 31 // iterator returns a non-terminal error. |
| 32 subscriptionErrorDelay = 10 * time.Second |
| 33 ) |
| 34 |
| 23 // application is the Archivist application state. | 35 // application is the Archivist application state. |
| 24 type application struct { | 36 type application struct { |
| 25 service.Service | 37 service.Service |
| 26 } | 38 } |
| 27 | 39 |
| 28 // run is the main execution function. | 40 // run is the main execution function. |
| 29 func (a *application) runArchivist(c context.Context) error { | 41 func (a *application) runArchivist(c context.Context) error { |
| 30 cfg := a.Config() | 42 cfg := a.Config() |
| 31 | 43 |
| 32 coordCfg, acfg := cfg.GetCoordinator(), cfg.GetArchivist() | 44 coordCfg, acfg := cfg.GetCoordinator(), cfg.GetArchivist() |
| 33 switch { | 45 switch { |
| 34 case coordCfg == nil: | 46 case coordCfg == nil: |
| 35 fallthrough | 47 fallthrough |
| 36 case coordCfg.Project == "": | |
| 37 return errors.New("missing coordinator project name") | |
| 38 case coordCfg.ArchiveTaskQueue == "": | |
| 39 return errors.New("missing archive task queue name") | |
| 40 | 48 |
| 41 case acfg == nil: | 49 case acfg == nil: |
| 42 return errors.New("missing Archivist configuration") | 50 return errors.New("missing Archivist configuration") |
| 43 case acfg.GsBase == "": | 51 case acfg.GsBase == "": |
| 44 return errors.New("missing archive GS bucket") | 52 return errors.New("missing archive GS bucket") |
| 53 case acfg.GsStagingBase == "": |
| 54 return errors.New("missing archive staging GS bucket") |
| 45 } | 55 } |
| 46 | 56 |
| 47 » // Construct and validate our GS base. | 57 » // Construct and validate our GS bases. |
| 48 gsBase := gs.Path(acfg.GsBase) | 58 gsBase := gs.Path(acfg.GsBase) |
| 49 if gsBase.Bucket() == "" { | 59 if gsBase.Bucket() == "" { |
| 50 log.Fields{ | 60 log.Fields{ |
| 51 » » » "gsBase": acfg.GsBase, | 61 » » » "value": gsBase, |
| 52 }.Errorf(c, "Google Storage base does not include a bucket name.
") | 62 }.Errorf(c, "Google Storage base does not include a bucket name.
") |
| 53 return errors.New("invalid Google Storage base") | 63 return errors.New("invalid Google Storage base") |
| 54 } | 64 } |
| 55 | 65 |
| 56 » // Initialize task queue client. | 66 » gsStagingBase := gs.Path(acfg.GsStagingBase) |
| 57 » tqClient, err := a.AuthenticatedClient(func(o *auth.Options) { | 67 » if gsStagingBase.Bucket() == "" { |
| 58 » » o.Scopes = taskqueueClient.Scopes | 68 » » log.Fields{ |
| 69 » » » "value": gsStagingBase, |
| 70 » » }.Errorf(c, "Google Storage staging base does not include a buck
et name.") |
| 71 » » return errors.New("invalid Google Storage staging base") |
| 72 » } |
| 73 |
| 74 » // Initialize Pub/Sub client. |
| 75 » // |
| 76 » // We will initialize both an authenticated Client instance and an |
| 77 » // authenticated Context, since we need the latter for raw ACK deadline |
| 78 » // updates. |
| 79 » taskSub := pubsub.Subscription(acfg.Subscription) |
| 80 » if err := taskSub.Validate(); err != nil { |
| 81 » » log.Fields{ |
| 82 » » » log.ErrorKey: err, |
| 83 » » » "value": taskSub, |
| 84 » » }.Errorf(c, "Task subscription did not validate.") |
| 85 » » return errors.New("invalid task subscription name") |
| 86 » } |
| 87 » psProject, psSubscriptionName := taskSub.Split() |
| 88 |
| 89 » psAuth, err := a.Authenticator(func(o *auth.Options) { |
| 90 » » o.Scopes = pubsub.SubscriberScopes |
| 59 }) | 91 }) |
| 60 if err != nil { | 92 if err != nil { |
| 61 » » log.WithError(err).Errorf(c, "Failed to get task queue client.") | 93 » » log.WithError(err).Errorf(c, "Failed to get Pub/Sub authenticato
r.") |
| 62 return err | 94 return err |
| 63 } | 95 } |
| 64 | 96 |
| 97 // Pub/Sub: HTTP Client => Context |
| 98 psHTTPClient, err := psAuth.Client() |
| 99 if err != nil { |
| 100 log.WithError(err).Errorf(c, "Failed to create authenticated Pub
/Sub transport.") |
| 101 return err |
| 102 } |
| 103 psContext := cloud.WithContext(c, psProject, psHTTPClient) |
| 104 |
| 105 // Pub/Sub: TokenSource => Client |
| 106 psClient, err := gcps.NewClient(c, psProject, cloud.WithTokenSource(psAu
th.TokenSource())) |
| 107 if err != nil { |
| 108 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") |
| 109 return err |
| 110 } |
| 111 sub := psClient.Subscription(psSubscriptionName) |
| 112 |
| 65 // Initialize our Storage. | 113 // Initialize our Storage. |
| 66 » s, err := a.IntermediateStorage(c) | 114 » st, err := a.IntermediateStorage(c) |
| 67 if err != nil { | 115 if err != nil { |
| 68 log.WithError(err).Errorf(c, "Failed to get storage instance.") | 116 log.WithError(err).Errorf(c, "Failed to get storage instance.") |
| 69 return err | 117 return err |
| 70 } | 118 } |
| 71 » defer s.Close() | 119 » defer st.Close() |
| 72 | 120 |
| 73 // Initialize our Google Storage client. | 121 // Initialize our Google Storage client. |
| 74 gsClient, err := a.GSClient(c) | 122 gsClient, err := a.GSClient(c) |
| 75 if err != nil { | 123 if err != nil { |
| 76 log.WithError(err).Errorf(c, "Failed to get Google Storage clien
t.") | 124 log.WithError(err).Errorf(c, "Failed to get Google Storage clien
t.") |
| 77 return err | 125 return err |
| 78 } | 126 } |
| 79 defer gsClient.Close() | 127 defer gsClient.Close() |
| 80 | 128 |
| 81 // Application shutdown will now operate by cancelling the Collector's | |
| 82 // shutdown Context. | |
| 83 shutdownCtx, shutdownFunc := context.WithCancel(c) | |
| 84 a.SetShutdownFunc(shutdownFunc) | |
| 85 | |
| 86 ar := archivist.Archivist{ | 129 ar := archivist.Archivist{ |
| 87 Service: a.Coordinator(), | 130 Service: a.Coordinator(), |
| 88 » » Storage: s, | 131 » » Storage: st, |
| 89 GSClient: gsClient, | 132 GSClient: gsClient, |
| 90 | 133 |
| 91 GSBase: gsBase, | 134 GSBase: gsBase, |
| 135 GSStagingBase: gsStagingBase, |
| 92 StreamIndexRange: int(acfg.StreamIndexRange), | 136 StreamIndexRange: int(acfg.StreamIndexRange), |
| 93 PrefixIndexRange: int(acfg.PrefixIndexRange), | 137 PrefixIndexRange: int(acfg.PrefixIndexRange), |
| 94 ByteRange: int(acfg.ByteRange), | 138 ByteRange: int(acfg.ByteRange), |
| 95 } | 139 } |
| 96 | 140 |
| 97 » tqOpts := taskqueueClient.Options{ | 141 » tasks := int(acfg.Tasks) |
| 98 » » Project: coordCfg.Project, | 142 » if tasks <= 0 { |
| 99 » » Queue: coordCfg.ArchiveTaskQueue, | 143 » » tasks = 1 |
| 100 » » Client: tqClient, | |
| 101 » » UserAgent: "LogDog Archivist", | |
| 102 » » Tasks: int(acfg.Tasks), | |
| 103 } | 144 } |
| 104 | 145 |
| 105 log.Fields{ | 146 log.Fields{ |
| 106 » » "project": tqOpts.Project, | 147 » » "subscription": taskSub, |
| 107 » » "queue": tqOpts.Queue, | 148 » » "tasks": tasks, |
| 108 » }.Infof(c, "Pulling tasks from task queue.") | 149 » }.Infof(c, "Pulling tasks from Pub/Sub subscription.") |
| 109 » taskqueueClient.RunTasks(shutdownCtx, tqOpts, func(c context.Context, t
taskqueueClient.Task) bool { | 150 » it, err := sub.Pull(c, gcps.MaxExtension(pubsub.MaxACKDeadline), gcps.Ma
xPrefetch(tasks)) |
| 110 » » c = log.SetField(c, "taskID", t.ID) | 151 » if err != nil { |
| 152 » » log.Fields{ |
| 153 » » » log.ErrorKey: err, |
| 154 » » » "subscription": taskSub, |
| 155 » » }.Errorf(c, "Failed to create Pub/Sub subscription iterator.") |
| 156 » } |
| 157 » defer it.Stop() |
| 111 | 158 |
| 112 » » startTime := clock.Now(c) | 159 » // Application shutdown will now operate by stopping the Iterator. |
| 113 » » err := ar.ArchiveTask(c, t.Payload) | 160 » a.SetShutdownFunc(it.Stop) |
| 114 » » duration := clock.Now(c).Sub(startTime) | |
| 115 | 161 |
| 116 » » switch { | 162 » // Loop, pulling messages from our iterator and dispatching them. |
| 117 » » case errors.IsTransient(err): | 163 » parallel.Ignore(parallel.Run(tasks, func(taskC chan<- func() error) { |
| 118 » » » // Do not consume | 164 » » for { |
| 119 » » » log.Fields{ | 165 » » » msg, err := it.Next() |
| 120 » » » » log.ErrorKey: err, | 166 » » » switch err { |
| 121 » » » » "duration": duration, | 167 » » » case nil: |
| 122 » » » }.Warningf(c, "TRANSIENT error processing task.") | 168 » » » » c := log.SetFields(c, log.Fields{ |
| 123 » » » return false | 169 » » » » » "messageID": msg.ID, |
| 170 » » » » » "ackID": msg.AckID, |
| 171 » » » » }) |
| 124 | 172 |
| 125 » » case err == nil: | 173 » » » » // Dispatch an archive handler for this message. |
| 126 » » » log.Fields{ | 174 » » » » taskC <- func() error { |
| 127 » » » » "duration": duration, | 175 » » » » » deleteTask := false |
| 128 » » » }.Infof(c, "Task successfully processed; deleting.") | 176 » » » » » defer func() { |
| 129 » » » return true | 177 » » » » » » msg.Done(deleteTask) |
| 178 » » » » » }() |
| 130 | 179 |
| 131 » » default: | 180 » » » » » task, err := makePubSubArchivistTask(psC
ontext, psSubscriptionName, msg) |
| 132 » » » log.Fields{ | 181 » » » » » if err != nil { |
| 133 » » » » log.ErrorKey: err, | 182 » » » » » » log.WithError(err).Errorf(c, "Fa
iled to unmarshal archive task from message.") |
| 134 » » » » "duration": duration, | 183 » » » » » » deleteTask = true |
| 135 » » » }.Errorf(c, "Non-transient error processing task; deleti
ng.") | 184 » » » » » » return nil |
| 136 » » » return true | 185 » » » » » } |
| 186 |
| 187 » » » » » startTime := clock.Now(c) |
| 188 » » » » » deleteTask = ar.ArchiveTask(c, task) |
| 189 » » » » » duration := clock.Now(c).Sub(startTime) |
| 190 |
| 191 » » » » » if deleteTask { |
| 192 » » » » » » log.Fields{ |
| 193 » » » » » » » "duration": duration, |
| 194 » » » » » » }.Infof(c, "Task successfully pr
ocessed; deleting.") |
| 195 » » » » » } else { |
| 196 » » » » » » log.Fields{ |
| 197 » » » » » » » "duration": duration, |
| 198 » » » » » » }.Infof(c, "Task processing inco
mplete. Not deleting.") |
| 199 » » » » » } |
| 200 » » » » » return nil |
| 201 » » » » } |
| 202 |
| 203 » » » case io.EOF, context.Canceled, context.DeadlineExceeded: |
| 204 » » » » log.Infof(c, "Subscription iterator is finished.
") |
| 205 » » » » return |
| 206 |
| 207 » » » default: |
| 208 » » » » log.WithError(err).Warningf(c, "Subscription ite
rator returned error. Sleeping...") |
| 209 » » » » clock.Sleep(c, subscriptionErrorDelay) |
| 210 » » » » continue |
| 211 » » » } |
| 137 } | 212 } |
| 138 » }) | 213 » })) |
| 139 | 214 |
| 140 log.Debugf(c, "Archivist finished.") | 215 log.Debugf(c, "Archivist finished.") |
| 141 return nil | 216 return nil |
| 142 } | 217 } |
| 143 | 218 |
| 144 // Entry point. | 219 // Entry point. |
| 145 func main() { | 220 func main() { |
| 146 a := application{} | 221 a := application{} |
| 147 a.Run(context.Background(), a.runArchivist) | 222 a.Run(context.Background(), a.runArchivist) |
| 148 } | 223 } |
| OLD | NEW |