Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(100)

Side by Side Diff: appengine/logdog/coordinator/backend/archiveCron.go

Issue 1844963002: Iterate archive query alongside task queue. (Closed) Base URL: https://github.com/luci/luci-go@collector-gae-classic
Patch Set: Use new GAE filter for query batching. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | appengine/logdog/coordinator/backend/backend.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package backend 5 package backend
6 6
7 import ( 7 import (
8 "errors"
9 "fmt" 8 "fmt"
10 "net/http" 9 "net/http"
11 "time" 10 "time"
12 11
13 "github.com/julienschmidt/httprouter" 12 "github.com/julienschmidt/httprouter"
13 "github.com/luci/gae/filter/dsQueryBatch"
14 ds "github.com/luci/gae/service/datastore" 14 ds "github.com/luci/gae/service/datastore"
15 tq "github.com/luci/gae/service/taskqueue" 15 tq "github.com/luci/gae/service/taskqueue"
16 "github.com/luci/luci-go/appengine/logdog/coordinator" 16 "github.com/luci/luci-go/appengine/logdog/coordinator"
17 "github.com/luci/luci-go/appengine/logdog/coordinator/config" 17 "github.com/luci/luci-go/appengine/logdog/coordinator/config"
18 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 18 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
19 "github.com/luci/luci-go/common/clock" 19 "github.com/luci/luci-go/common/clock"
20 "github.com/luci/luci-go/common/errors"
20 log "github.com/luci/luci-go/common/logging" 21 log "github.com/luci/luci-go/common/logging"
21 "github.com/luci/luci-go/common/proto/logdog/svcconfig" 22 "github.com/luci/luci-go/common/proto/logdog/svcconfig"
22 "golang.org/x/net/context" 23 "golang.org/x/net/context"
23 ) 24 )
24 25
25 // archiveTaskQueueName returns the task queue name for archival, or an error 26 // archiveTaskQueueName returns the task queue name for archival, or an error
26 // if it's not configured. 27 // if it's not configured.
27 func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) { 28 func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) {
28 q := cfg.GetCoordinator().ArchiveTaskQueue 29 q := cfg.GetCoordinator().ArchiveTaskQueue
29 if q == "" { 30 if q == "" {
30 return "", errors.New("missing archive task queue name") 31 return "", errors.New("missing archive task queue name")
31 } 32 }
32 return q, nil 33 return q, nil
33 } 34 }
34 35
36 func archiveTaskNameForHash(hashID string) string {
37 return fmt.Sprintf("archive--%s", hashID)
iannucci 2016/03/31 22:52:31 let's put a version number in here too or undo this
dnj 2016/04/01 00:21:52 Done.
38 }
39
35 // createArchiveTask creates a new archive Task. 40 // createArchiveTask creates a new archive Task.
36 func createArchiveTask(cfg *svcconfig.Coordinator, ls *coordinator.LogStream, complete bool) (*tq.Task, error) { 41 func createArchiveTask(cfg *svcconfig.Coordinator, ls *coordinator.LogStream, complete bool) (*tq.Task, error) {
37 desc := logdog.ArchiveTask{ 42 desc := logdog.ArchiveTask{
38 Path: string(ls.Path()), 43 Path: string(ls.Path()),
39 Complete: complete, 44 Complete: complete,
40 } 45 }
41 t, err := createPullTask(&desc) 46 t, err := createPullTask(&desc)
42 if err != nil { 47 if err != nil {
43 return nil, err 48 return nil, err
44 } 49 }
45 50
46 » t.Name = fmt.Sprintf("archive-%s", ls.HashID()) 51 » t.Name = archiveTaskNameForHash(ls.HashID())
47 return t, nil 52 return t, nil
48 } 53 }
49 54
50 // HandleArchiveCron is the handler for the archive cron endpoint. This scans 55 // HandleArchiveCron is the handler for the archive cron endpoint. This scans
51 // for terminal log streams that are ready for archival. 56 // for terminal log streams that are ready for archival.
52 // 57 //
53 // This will be called periodically by AppEngine cron. 58 // This will be called periodically by AppEngine cron.
54 func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) { 59 func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
55 errorWrapper(c, w, func() error { 60 errorWrapper(c, w, func() error {
56 return b.archiveCron(c, true) 61 return b.archiveCron(c, true)
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
114 var threshold time.Duration 119 var threshold time.Duration
115 if complete { 120 if complete {
116 threshold = cfg.GetCoordinator().ArchiveDelay.Duration() 121 threshold = cfg.GetCoordinator().ArchiveDelay.Duration()
117 q = q.Eq("State", coordinator.LSTerminated) 122 q = q.Eq("State", coordinator.LSTerminated)
118 } else { 123 } else {
119 threshold = cfg.GetCoordinator().ArchiveDelayMax.Duration() 124 threshold = cfg.GetCoordinator().ArchiveDelayMax.Duration()
120 q = q.Eq("State", coordinator.LSPending) 125 q = q.Eq("State", coordinator.LSPending)
121 } 126 }
122 q = q.Lte("Updated", now.Add(-threshold)) 127 q = q.Lte("Updated", now.Add(-threshold))
123 128
124 » // Query and dispatch our tasks. 129 » // If the log stream has a terminal index, and its Updated time is less than
125 » var ierr error 130 » // the maximum archive delay, require this archival to be complete (no
126 » count, err := b.multiTask(c, queueName, func(taskC chan<- *tq.Task) { 131 » // missing LogEntry).
127 » » ierr = ds.Get(c).Run(q, func(ls *coordinator.LogStream) error { 132 » //
133 » // If we're past maximum archive delay, settle for any (even empty) archival.
134 » // This is a failsafe to prevent logs from sitting in limbo forever.
135 » maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration()
136
137 » // Perform a query, dispatching tasks in batches.
138 » batch := b.getMultiTaskBatchSize()
139
140 » ti := tq.Get(c)
141 » tasks := make([]*tq.Task, 0, batch)
142 » totalScheduledTasks := 0
143 » dispatchTasks := func() error {
144 » » if len(tasks) == 0 {
145 » » » return nil
146 » » }
147
148 » » err := ti.AddMulti(tasks, queueName)
149 » » if merr, ok := err.(errors.MultiError); ok {
150 » » » for _, e := range merr {
151 » » » » log.Warningf(c, "Task add error: %v", e)
152 » » » }
153 » » }
154 » » if err := errors.Filter(err, tq.ErrTaskAlreadyAdded); err != nil {
155 » » » log.Fields{
156 » » » » log.ErrorKey: err,
157 » » » » "queue": queueName,
158 » » » » "numTasks": len(tasks),
159 » » » » "scheduledTaskCount": totalScheduledTasks,
160 » » » }.Errorf(c, "Failed to add tasks to task queue.")
161 » » » return errors.New("failed to add tasks to task queue")
162 » » }
163
164 » » totalScheduledTasks += len(tasks)
165 » » tasks = tasks[:0]
iannucci 2016/03/31 22:52:31 would it be cleaner to do this prune at the callsite?
dnj 2016/04/01 00:21:52 I don't think so. Since this manages the "tasks" a
166 » » return nil
167 » }
168
169 » c = dsQueryBatch.BatchQueries(c, int32(batch))
170 » err = ds.Get(c).Run(q, func(ls *coordinator.LogStream) error {
iannucci 2016/03/31 22:52:31 dsS := ds.Get(dsQueryBatch.......) or something
171 » » requireComplete := !now.After(ls.Updated.Add(maxDelay))
172 » » if !requireComplete {
173 » » » log.Fields{
174 » » » » "path": ls.Path(),
175 » » » » "id": ls.HashID(),
176 » » » » "updatedTimestamp": ls.Updated,
177 » » » » "maxDelay": maxDelay,
178 » » » }.Warningf(c, "Identified log stream past maximum archival delay.")
179 » » } else {
128 log.Fields{ 180 log.Fields{
129 "id": ls.HashID(), 181 "id": ls.HashID(),
130 "updated": ls.Updated.String(), 182 "updated": ls.Updated.String(),
131 }.Infof(c, "Identified log stream ready for archival.") 183 }.Infof(c, "Identified log stream ready for archival.")
184 }
132 185
133 » » » // If the log stream has a terminal index, and its Updated time is less than 186 » » task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete)
134 » » » // the maximum archive delay, require this archival to be complete (no 187 » » if err != nil {
135 » » » // missing LogEntry). 188 » » » log.Fields{
136 » » » // 189 » » » » log.ErrorKey: err,
137 » » » // If we're past maximum archive delay, settle for any (even empty) archival. 190 » » » » "path": ls.Path(),
138 » » » // This is a failsafe to prevent logs from sitting in limbo forever. 191 » » » }.Errorf(c, "Failed to create archive task.")
139 » » » maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration() 192 » » » return err
140 » » » requireComplete := !now.After(ls.Updated.Add(maxDelay)) 193 » » }
141 » » » if !requireComplete {
142 » » » » log.Fields{
143 » » » » » "path": ls.Path(),
144 » » » » » "updatedTimestamp": ls.Updated,
145 » » » » » "maxDelay": maxDelay,
146 » » » » }.Warningf(c, "Log stream is past maximum archival delay. Dropping completeness requirement.")
147 » » » }
148 194
149 » » » task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete) 195 » » tasks = append(tasks, task)
150 » » » if err != nil { 196 » » if len(tasks) >= batch {
151 » » » » log.Fields{ 197 » » » return dispatchTasks()
152 » » » » » log.ErrorKey: err, 198 » » }
153 » » » » » "path": ls.Path(), 199 » » return nil
154 » » » » }.Errorf(c, "Failed to create archive task.") 200 » })
155 » » » » return err 201 » if err != nil {
156 » » » } 202 » » log.Fields{
203 » » » log.ErrorKey: err,
204 » » » "scheduledTaskCount": totalScheduledTasks,
205 » » }.Errorf(c, "Outer archive query failed.")
206 » » return errors.New("outer archive query failed")
207 » }
157 208
158 » » » taskC <- task 209 » // Dispatch any remaining enqueued tasks.
159 » » » return nil 210 » if err := dispatchTasks(); err != nil {
160 » » }) 211 » » return err
161 » })
162 » if err != nil || ierr != nil {
163 » » log.Fields{
164 » » » log.ErrorKey: err,
165 » » » "queryErr": ierr,
166 » » » "taskCount": count,
167 » » }.Errorf(c, "Failed to dispatch archival tasks.")
168 » » return errors.New("failed to dispatch archival tasks")
169 } 212 }
170 213
171 log.Fields{ 214 log.Fields{
172 » » "taskCount": count, 215 » » "scheduledTaskCount": totalScheduledTasks,
173 }.Debugf(c, "Archive sweep completed successfully.") 216 }.Debugf(c, "Archive sweep completed successfully.")
174 return nil 217 return nil
175 } 218 }
OLDNEW
« no previous file with comments | « no previous file | appengine/logdog/coordinator/backend/backend.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698