Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(64)

Side by Side Diff: appengine/logdog/coordinator/backend/archiveCron.go

Issue 1844963002: Iterate archive query alongside task queue. (Closed) Base URL: https://github.com/luci/luci-go@collector-gae-classic
Patch Set: Respond to code review comments. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | appengine/logdog/coordinator/backend/backend.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package backend 5 package backend
6 6
7 import ( 7 import (
8 "errors"
9 "fmt" 8 "fmt"
10 "net/http" 9 "net/http"
11 "time" 10 "time"
12 11
13 "github.com/julienschmidt/httprouter" 12 "github.com/julienschmidt/httprouter"
13 "github.com/luci/gae/filter/dsQueryBatch"
14 ds "github.com/luci/gae/service/datastore" 14 ds "github.com/luci/gae/service/datastore"
15 tq "github.com/luci/gae/service/taskqueue" 15 tq "github.com/luci/gae/service/taskqueue"
16 "github.com/luci/luci-go/appengine/logdog/coordinator" 16 "github.com/luci/luci-go/appengine/logdog/coordinator"
17 "github.com/luci/luci-go/appengine/logdog/coordinator/config" 17 "github.com/luci/luci-go/appengine/logdog/coordinator/config"
18 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 18 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
19 "github.com/luci/luci-go/common/clock" 19 "github.com/luci/luci-go/common/clock"
20 "github.com/luci/luci-go/common/errors"
20 log "github.com/luci/luci-go/common/logging" 21 log "github.com/luci/luci-go/common/logging"
21 "github.com/luci/luci-go/common/proto/logdog/svcconfig" 22 "github.com/luci/luci-go/common/proto/logdog/svcconfig"
22 "golang.org/x/net/context" 23 "golang.org/x/net/context"
23 ) 24 )
24 25
26 const archiveTaskVersion = "v1"
27
25 // archiveTaskQueueName returns the task queue name for archival, or an error 28 // archiveTaskQueueName returns the task queue name for archival, or an error
26 // if it's not configured. 29 // if it's not configured.
27 func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) { 30 func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) {
28 q := cfg.GetCoordinator().ArchiveTaskQueue 31 q := cfg.GetCoordinator().ArchiveTaskQueue
29 if q == "" { 32 if q == "" {
30 return "", errors.New("missing archive task queue name") 33 return "", errors.New("missing archive task queue name")
31 } 34 }
32 return q, nil 35 return q, nil
33 } 36 }
34 37
38 func archiveTaskNameForHash(hashID string) string {
39 return fmt.Sprintf("archive-%s-%s", hashID, archiveTaskVersion)
40 }
41
35 // createArchiveTask creates a new archive Task. 42 // createArchiveTask creates a new archive Task.
36 func createArchiveTask(cfg *svcconfig.Coordinator, ls *coordinator.LogStream, co mplete bool) (*tq.Task, error) { 43 func createArchiveTask(cfg *svcconfig.Coordinator, ls *coordinator.LogStream, co mplete bool) (*tq.Task, error) {
37 desc := logdog.ArchiveTask{ 44 desc := logdog.ArchiveTask{
38 Path: string(ls.Path()), 45 Path: string(ls.Path()),
39 Complete: complete, 46 Complete: complete,
40 } 47 }
41 t, err := createPullTask(&desc) 48 t, err := createPullTask(&desc)
42 if err != nil { 49 if err != nil {
43 return nil, err 50 return nil, err
44 } 51 }
45 52
46 » t.Name = fmt.Sprintf("archive-%s", ls.HashID()) 53 » t.Name = archiveTaskNameForHash(ls.HashID())
47 return t, nil 54 return t, nil
48 } 55 }
49 56
50 // HandleArchiveCron is the handler for the archive cron endpoint. This scans 57 // HandleArchiveCron is the handler for the archive cron endpoint. This scans
51 // for terminal log streams that are ready for archival. 58 // for terminal log streams that are ready for archival.
52 // 59 //
53 // This will be called periodically by AppEngine cron. 60 // This will be called periodically by AppEngine cron.
54 func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) { 61 func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
55 errorWrapper(c, w, func() error { 62 errorWrapper(c, w, func() error {
56 return b.archiveCron(c, true) 63 return b.archiveCron(c, true)
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
114 var threshold time.Duration 121 var threshold time.Duration
115 if complete { 122 if complete {
116 threshold = cfg.GetCoordinator().ArchiveDelay.Duration() 123 threshold = cfg.GetCoordinator().ArchiveDelay.Duration()
117 q = q.Eq("State", coordinator.LSTerminated) 124 q = q.Eq("State", coordinator.LSTerminated)
118 } else { 125 } else {
119 threshold = cfg.GetCoordinator().ArchiveDelayMax.Duration() 126 threshold = cfg.GetCoordinator().ArchiveDelayMax.Duration()
120 q = q.Eq("State", coordinator.LSPending) 127 q = q.Eq("State", coordinator.LSPending)
121 } 128 }
122 q = q.Lte("Updated", now.Add(-threshold)) 129 q = q.Lte("Updated", now.Add(-threshold))
123 130
124 » // Query and dispatch our tasks. 131 » // If the log stream has a terminal index, and its Updated time is less than
125 » var ierr error 132 » // the maximum archive delay, require this archival to be complete (no
126 » count, err := b.multiTask(c, queueName, func(taskC chan<- *tq.Task) { 133 » // missing LogEntry).
127 » » ierr = ds.Get(c).Run(q, func(ls *coordinator.LogStream) error { 134 » //
135 » // If we're past maximum archive delay, settle for any (even empty) arch ival.
136 » // This is a failsafe to prevent logs from sitting in limbo forever.
137 » maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration()
138
139 » // Perform a query, dispatching tasks in batches.
140 » batch := b.getMultiTaskBatchSize()
141
142 » ti := tq.Get(c)
143 » tasks := make([]*tq.Task, 0, batch)
144 » totalScheduledTasks := 0
145 » addAndMaybeDispatchTasks := func(task *tq.Task) error {
146 » » switch task {
147 » » case nil:
148 » » » if len(tasks) == 0 {
149 » » » » return nil
150 » » » }
151
152 » » default:
153 » » » tasks = append(tasks, task)
154 » » » if len(tasks) < batch {
155 » » » » return nil
156 » » » }
157 » » }
158
159 » » err := ti.AddMulti(tasks, queueName)
160 » » if merr, ok := err.(errors.MultiError); ok {
161 » » » for _, e := range merr {
162 » » » » log.Warningf(c, "Task add error: %v", e)
163 » » » }
164 » » }
165 » » if err := errors.Filter(err, tq.ErrTaskAlreadyAdded); err != nil {
166 » » » log.Fields{
167 » » » » log.ErrorKey: err,
168 » » » » "queue": queueName,
169 » » » » "numTasks": len(tasks),
170 » » » » "scheduledTaskCount": totalScheduledTasks,
171 » » » }.Errorf(c, "Failed to add tasks to task queue.")
172 » » » return errors.New("failed to add tasks to task queue")
173 » » }
174
175 » » totalScheduledTasks += len(tasks)
176 » » tasks = tasks[:0]
177 » » return nil
178 » }
179
180 » err = ds.Get(dsQueryBatch.BatchQueries(c, int32(batch))).Run(q, func(ls *coordinator.LogStream) error {
181 » » requireComplete := !now.After(ls.Updated.Add(maxDelay))
182 » » if !requireComplete {
183 » » » log.Fields{
184 » » » » "path": ls.Path(),
185 » » » » "id": ls.HashID(),
186 » » » » "updatedTimestamp": ls.Updated,
187 » » » » "maxDelay": maxDelay,
188 » » » }.Warningf(c, "Identified log stream past maximum archiv al delay.")
189 » » } else {
128 log.Fields{ 190 log.Fields{
129 "id": ls.HashID(), 191 "id": ls.HashID(),
130 "updated": ls.Updated.String(), 192 "updated": ls.Updated.String(),
131 }.Infof(c, "Identified log stream ready for archival.") 193 }.Infof(c, "Identified log stream ready for archival.")
194 }
132 195
133 » » » // If the log stream has a terminal index, and its Updat ed time is less than 196 » » task, err := createArchiveTask(cfg.GetCoordinator(), ls, require Complete)
134 » » » // the maximum archive delay, require this archival to b e complete (no 197 » » if err != nil {
135 » » » // missing LogEntry). 198 » » » log.Fields{
136 » » » // 199 » » » » log.ErrorKey: err,
137 » » » // If we're past maximum archive delay, settle for any ( even empty) archival. 200 » » » » "path": ls.Path(),
138 » » » // This is a failsafe to prevent logs from sitting in li mbo forever. 201 » » » }.Errorf(c, "Failed to create archive task.")
139 » » » maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duratio n() 202 » » » return err
140 » » » requireComplete := !now.After(ls.Updated.Add(maxDelay)) 203 » » }
141 » » » if !requireComplete {
142 » » » » log.Fields{
143 » » » » » "path": ls.Path(),
144 » » » » » "updatedTimestamp": ls.Updated,
145 » » » » » "maxDelay": maxDelay,
146 » » » » }.Warningf(c, "Log stream is past maximum archiv al delay. Dropping completeness requirement.")
147 » » » }
148 204
149 » » » task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete) 205 » » return addAndMaybeDispatchTasks(task)
150 » » » if err != nil { 206 » })
151 » » » » log.Fields{ 207 » if err != nil {
152 » » » » » log.ErrorKey: err, 208 » » log.Fields{
153 » » » » » "path": ls.Path(), 209 » » » log.ErrorKey: err,
154 » » » » }.Errorf(c, "Failed to create archive task.") 210 » » » "scheduledTaskCount": totalScheduledTasks,
155 » » » » return err 211 » » }.Errorf(c, "Outer archive query failed.")
156 » » » } 212 » » return errors.New("outer archive query failed")
213 » }
157 214
158 » » » taskC <- task 215 » // Dispatch any remaining enqueued tasks.
159 » » » return nil 216 » if err := addAndMaybeDispatchTasks(nil); err != nil {
160 » » }) 217 » » return err
161 » })
162 » if err != nil || ierr != nil {
163 » » log.Fields{
164 » » » log.ErrorKey: err,
165 » » » "queryErr": ierr,
166 » » » "taskCount": count,
167 » » }.Errorf(c, "Failed to dispatch archival tasks.")
168 » » return errors.New("failed to dispatch archival tasks")
169 } 218 }
170 219
171 log.Fields{ 220 log.Fields{
172 » » "taskCount": count, 221 » » "scheduledTaskCount": totalScheduledTasks,
173 }.Debugf(c, "Archive sweep completed successfully.") 222 }.Debugf(c, "Archive sweep completed successfully.")
174 return nil 223 return nil
175 } 224 }
OLDNEW
« no previous file with comments | « no previous file | appengine/logdog/coordinator/backend/backend.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698