Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(237)

Side by Side Diff: appengine/logdog/coordinator/backend/archiveCron.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package backend 5 package backend
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "net/http" 9 "net/http"
10 » "time" 10 » "sync/atomic"
11 11
12 "github.com/julienschmidt/httprouter" 12 "github.com/julienschmidt/httprouter"
13 "github.com/luci/gae/filter/dsQueryBatch" 13 "github.com/luci/gae/filter/dsQueryBatch"
14 ds "github.com/luci/gae/service/datastore" 14 ds "github.com/luci/gae/service/datastore"
15 » tq "github.com/luci/gae/service/taskqueue" 15 » "github.com/luci/gae/service/info"
16 "github.com/luci/luci-go/appengine/logdog/coordinator" 16 "github.com/luci/luci-go/appengine/logdog/coordinator"
17 "github.com/luci/luci-go/appengine/logdog/coordinator/config"
18 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
19 "github.com/luci/luci-go/common/clock" 17 "github.com/luci/luci-go/common/clock"
20 "github.com/luci/luci-go/common/errors" 18 "github.com/luci/luci-go/common/errors"
21 log "github.com/luci/luci-go/common/logging" 19 log "github.com/luci/luci-go/common/logging"
22 » "github.com/luci/luci-go/common/proto/logdog/svcconfig" 20 » "github.com/luci/luci-go/common/parallel"
23 "golang.org/x/net/context" 21 "golang.org/x/net/context"
24 ) 22 )
25 23
26 const archiveTaskVersion = "v1" 24 const archiveTaskVersion = "v4"
27
28 // archiveTaskQueueName returns the task queue name for archival, or an error
29 // if it's not configured.
30 func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) {
31 » q := cfg.GetCoordinator().ArchiveTaskQueue
32 » if q == "" {
33 » » return "", errors.New("missing archive task queue name")
34 » }
35 » return q, nil
36 }
37
38 func archiveTaskNameForHash(hashID string) string {
39 » return fmt.Sprintf("archive-%s-%s", hashID, archiveTaskVersion)
40 }
41
42 // createArchiveTask creates a new archive Task.
43 » func createArchiveTask(cfg *svcconfig.Coordinator, ls *coordinator.LogStream, complete bool) (*tq.Task, error) {
44 » desc := logdog.ArchiveTask{
45 » » Path: string(ls.Path()),
46 » » Complete: complete,
47 » }
48 » t, err := createPullTask(&desc)
49 » if err != nil {
50 » » return nil, err
51 » }
52
53 » t.Name = archiveTaskNameForHash(ls.HashID())
54 » return t, nil
55 }
56 25
57 // HandleArchiveCron is the handler for the archive cron endpoint. This scans 26 // HandleArchiveCron is the handler for the archive cron endpoint. This scans
58 // for terminal log streams that are ready for archival. 27 // for log streams that are ready for archival.
59 // 28 //
60 // This will be called periodically by AppEngine cron. 29 // This will be called periodically by AppEngine cron.
61 func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) { 30 func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
62 errorWrapper(c, w, func() error { 31 errorWrapper(c, w, func() error {
63 » » return b.archiveCron(c, true) 32 » » return b.archiveCron(c)
64 }) 33 })
65 } 34 }
66 35
67 // HandleArchiveCronNT is the handler for the archive non-terminal cron 36 func (b *Backend) archiveCron(c context.Context) error {
68 // endpoint. This scans for non-terminal log streams that have not been updated 37 » services := b.GetServices()
69 // in sufficiently long that we're willing to declare them complete and mark 38 » _, cfg, err := services.Config(c)
70 // them terminal.
71 //
72 // This will be called periodically by AppEngine cron.
73 func (b *Backend) HandleArchiveCronNT(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
74 » errorWrapper(c, w, func() error {
75 » » return b.archiveCron(c, false)
76 » })
77 }
78
79 // HandleArchiveCronPurge purges all archival tasks from the task queue.
80 func (b *Backend) HandleArchiveCronPurge(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
81 » errorWrapper(c, w, func() error {
82 » » cfg, err := config.Load(c)
83 » » if err != nil {
84 » » » log.WithError(err).Errorf(c, "Failed to load configuration.")
85 » » » return err
86 » » }
87
88 » » queueName, err := archiveTaskQueueName(cfg)
89 » » if err != nil {
90 » » » log.Errorf(c, "Failed to get task queue name.")
91 » » » return err
92 » » }
93
94 » » if err := tq.Get(c).Purge(queueName); err != nil {
95 » » » log.Fields{
96 » » » » log.ErrorKey: err,
97 » » » » "queue": queueName,
98 » » » }.Errorf(c, "Failed to purge task queue.")
99 » » » return err
100 » » }
101 » » return nil
102 » })
103 }
104
105 func (b *Backend) archiveCron(c context.Context, complete bool) error {
106 » cfg, err := config.Load(c)
107 if err != nil { 39 if err != nil {
108 » » log.WithError(err).Errorf(c, "Failed to load configuration.") 40 » » return fmt.Errorf("failed to load configuration: %v", err)
109 » » return err
110 } 41 }
111 42
112 » queueName, err := archiveTaskQueueName(cfg) 43 » archiveDelayMax := cfg.Coordinator.ArchiveDelayMax.Duration()
113 » if err != nil { 44 » if archiveDelayMax <= 0 {
114 » » log.Errorf(c, "Failed to get task queue name.") 45 » » return fmt.Errorf("must have positive maximum archive delay, not %q", archiveDelayMax.String())
115 » » return err
116 } 46 }
117 47
118 » now := clock.Now(c).UTC() 48 » ap, err := services.ArchivalPublisher(c)
119 » q := ds.NewQuery("LogStream") 49 » if err != nil {
50 » » return fmt.Errorf("failed to get archival publisher: %v", err)
51 » }
120 52
121 » var threshold time.Duration 53 » threshold := clock.Now(c).UTC().Add(-archiveDelayMax)
122 » if complete { 54 » log.Fields{
123 » » threshold = cfg.GetCoordinator().ArchiveDelay.Duration() 55 » » "threshold": threshold,
124 » » q = q.Eq("State", coordinator.LSTerminated) 56 » }.Infof(c, "Querying for all streaming logs created before max archival threshold.")
125 » } else { 57
126 » » threshold = cfg.GetCoordinator().ArchiveDelayMax.Duration() 58 » // Query for log streams that were created <= our threshold and that are
127 » » q = q.Eq("State", coordinator.LSPending) 59 » // still in LSStreaming state.
60 » //
61 » // We order descending because this is already an index that we use for our
62 » // "logdog.Logs.Query".
63 » q := ds.NewQuery("LogStream").
64 » » KeysOnly(true).
65 » » Eq("State", coordinator.LSStreaming).
66 » » Lte("Created", threshold).
67 » » Order("-Created", "State")
68
69 » // Since these logs are beyond maximum archival delay, we will dispatch
70 » // archival immediately.
71 » params := coordinator.ArchivalParams{
72 » » RequestID: info.Get(c).RequestID(),
128 } 73 }
129 q = q.Lte("Updated", now.Add(-threshold))
130 74
131 » // If the log stream has a terminal index, and its Updated time is less than 75 » // Create archive tasks for our expired log streams in parallel.
132 » // the maximum archive delay, require this archival to be complete (no 76 » batch := b.getMultiTaskBatchSize()
133 » // missing LogEntry). 77 » var tasked int32
134 » // 78 » var failed int32
135 » // If we're past maximum archive delay, settle for any (even empty) archival.
136 » // This is a failsafe to prevent logs from sitting in limbo forever.
137 » maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration()
138 79
139 » // Perform a query, dispatching tasks in batches. 80 » var ierr error
140 » batch := b.getMultiTaskBatchSize() 81 » parallel.Ignore(parallel.Run(batch, func(taskC chan<- func() error) {
82 » » // Run a batched query across the expired log stream space.
83 » » ierr = ds.Get(dsQueryBatch.BatchQueries(c, int32(batch))).Run(q, func(lsKey *ds.Key) error {
84 » » » var ls coordinator.LogStream
85 » » » ds.PopulateKey(&ls, lsKey)
141 86
142 » ti := tq.Get(c) 87 » » » // Archive this log stream in a transaction.
143 » tasks := make([]*tq.Task, 0, batch) 88 » » » taskC <- func() error {
144 » totalScheduledTasks := 0 89 » » » » err := ds.Get(c).RunInTransaction(func(c context.Context) error {
145 » addAndMaybeDispatchTasks := func(task *tq.Task) error { 90 » » » » » if err := ds.Get(c).Get(&ls); err != nil {
146 » » switch task { 91 » » » » » » log.WithError(err).Errorf(c, "Failed to load stream.")
147 » » case nil: 92 » » » » » » return err
148 » » » if len(tasks) == 0 { 93 » » » » » }
94
95 » » » » » log.Fields{
96 » » » » » » "path": ls.Path(),
97 » » » » » » "id": ls.HashID,
98 » » » » » }.Infof(c, "Identified expired log stream.")
99
100 » » » » » if err := params.PublishTask(c, ap, &ls); err != nil {
dnj 2016/04/11 17:20:04 PublishTask is transaction-friendly, as it interna
101 » » » » » » if err == coordinator.ErrArchiveTasked {
102 » » » » » » » log.Warningf(c, "Archival has already been tasked for this stream.")
103 » » » » » » » return nil
104 » » » » » » }
105 » » » » » » return err
106 » » » » » }
107 » » » » » return ds.Get(c).Put(&ls)
108 » » » » }, nil)
109
110 » » » » if err != nil {
111 » » » » » log.Fields{
112 » » » » » » log.ErrorKey: err,
113 » » » » » » "path": ls.Path(),
114 » » » » » }.Errorf(c, "Failed to archive log stream.")
115 » » » » » atomic.AddInt32(&failed, 1)
116 » » » » » return nil // Nothing will consume it anyway.
117 » » » » }
118
119 » » » » log.Fields{
120 » » » » » "path": ls.Path(),
121 » » » » » "id": ls.HashID,
122 » » » » » "archiveTopic": cfg.Coordinator.ArchiveTopic,
123 » » » » }.Infof(c, "Created archive task.")
124 » » » » atomic.AddInt32(&tasked, 1)
149 return nil 125 return nil
150 } 126 }
151 127
152 » » default: 128 » » » return nil
153 » » » tasks = append(tasks, task) 129 » » })
154 » » » if len(tasks) < batch { 130 » }))
155 » » » » return nil
156 » » » }
157 » » }
158 131
159 » » err := ti.AddMulti(tasks, queueName) 132 » // Return an error code if we experienced any failures. This doesn't really
160 » » if merr, ok := err.(errors.MultiError); ok { 133 » // have an impact, but it will show up as a "!" in the cron UI.
161 » » » for _, e := range merr { 134 » switch {
162 » » » » log.Warningf(c, "Task add error: %v", e) 135 » case ierr != nil:
163 » » » } 136 » » log.Fields{
164 » » } 137 » » » log.ErrorKey: err,
165 » » if err := errors.Filter(err, tq.ErrTaskAlreadyAdded); err != nil { 138 » » » "archiveCount": tasked,
166 » » » log.Fields{ 139 » » }.Errorf(c, "Failed to execute expired tasks query.")
167 » » » » log.ErrorKey: err, 140 » » return ierr
168 » » » » "queue": queueName,
169 » » » » "numTasks": len(tasks),
170 » » » » "scheduledTaskCount": totalScheduledTasks,
171 » » » }.Errorf(c, "Failed to add tasks to task queue.")
172 » » » return errors.New("failed to add tasks to task queue")
173 » » }
174 141
175 » » totalScheduledTasks += len(tasks) 142 » case failed > 0:
176 » » tasks = tasks[:0] 143 » » log.Fields{
144 » » » log.ErrorKey: err,
145 » » » "archiveCount": tasked,
146 » » » "failCount": failed,
147 » » }.Errorf(c, "Failed to archive candidate all streams.")
148 » » return errors.New("failed to archive all candidate streams")
dnj 2016/04/11 17:20:04 Since this is cron, we're basically just using the
149
150 » default:
151 » » log.Fields{
152 » » » "archiveCount": tasked,
153 » » }.Infof(c, "Archive sweep completed successfully.")
177 return nil 154 return nil
178 } 155 }
179
180 err = ds.Get(dsQueryBatch.BatchQueries(c, int32(batch))).Run(q, func(ls *coordinator.LogStream) error {
181 requireComplete := !now.After(ls.Updated.Add(maxDelay))
182 if !requireComplete {
183 log.Fields{
184 "path": ls.Path(),
185 "id": ls.HashID(),
186 "updatedTimestamp": ls.Updated,
187 "maxDelay": maxDelay,
188 }.Warningf(c, "Identified log stream past maximum archival delay.")
189 } else {
190 log.Fields{
191 "id": ls.HashID(),
192 "updated": ls.Updated.String(),
193 }.Infof(c, "Identified log stream ready for archival.")
194 }
195
196 task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete)
197 if err != nil {
198 log.Fields{
199 log.ErrorKey: err,
200 "path": ls.Path(),
201 }.Errorf(c, "Failed to create archive task.")
202 return err
203 }
204
205 return addAndMaybeDispatchTasks(task)
206 })
207 if err != nil {
208 log.Fields{
209 log.ErrorKey: err,
210 "scheduledTaskCount": totalScheduledTasks,
211 }.Errorf(c, "Outer archive query failed.")
212 return errors.New("outer archive query failed")
213 }
214
215 // Dispatch any remaining enqueued tasks.
216 if err := addAndMaybeDispatchTasks(nil); err != nil {
217 return err
218 }
219
220 log.Fields{
221 "scheduledTaskCount": totalScheduledTasks,
222 }.Debugf(c, "Archive sweep completed successfully.")
223 return nil
224 } 156 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698