Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package backend | 5 package backend |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | |
| 9 "fmt" | 8 "fmt" |
| 10 "net/http" | 9 "net/http" |
| 11 "time" | 10 "time" |
| 12 | 11 |
| 13 "github.com/julienschmidt/httprouter" | 12 "github.com/julienschmidt/httprouter" |
| 13 "github.com/luci/gae/filter/dsQueryBatch" | |
| 14 ds "github.com/luci/gae/service/datastore" | 14 ds "github.com/luci/gae/service/datastore" |
| 15 tq "github.com/luci/gae/service/taskqueue" | 15 tq "github.com/luci/gae/service/taskqueue" |
| 16 "github.com/luci/luci-go/appengine/logdog/coordinator" | 16 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 17 "github.com/luci/luci-go/appengine/logdog/coordinator/config" | 17 "github.com/luci/luci-go/appengine/logdog/coordinator/config" |
| 18 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 18 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 19 "github.com/luci/luci-go/common/clock" | 19 "github.com/luci/luci-go/common/clock" |
| 20 "github.com/luci/luci-go/common/errors" | |
| 20 log "github.com/luci/luci-go/common/logging" | 21 log "github.com/luci/luci-go/common/logging" |
| 21 "github.com/luci/luci-go/common/proto/logdog/svcconfig" | 22 "github.com/luci/luci-go/common/proto/logdog/svcconfig" |
| 22 "golang.org/x/net/context" | 23 "golang.org/x/net/context" |
| 23 ) | 24 ) |
| 24 | 25 |
| 25 // archiveTaskQueueName returns the task queue name for archival, or an error | 26 // archiveTaskQueueName returns the task queue name for archival, or an error |
| 26 // if it's not configured. | 27 // if it's not configured. |
| 27 func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) { | 28 func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) { |
| 28 q := cfg.GetCoordinator().ArchiveTaskQueue | 29 q := cfg.GetCoordinator().ArchiveTaskQueue |
| 29 if q == "" { | 30 if q == "" { |
| 30 return "", errors.New("missing archive task queue name") | 31 return "", errors.New("missing archive task queue name") |
| 31 } | 32 } |
| 32 return q, nil | 33 return q, nil |
| 33 } | 34 } |
| 34 | 35 |
| 36 func archiveTaskNameForHash(hashID string) string { | |
| 37 return fmt.Sprintf("archive--%s", hashID) | |
|
iannucci
2016/03/31 22:52:31
let's put a version number in here too
or undo th
dnj
2016/04/01 00:21:52
Done.
| |
| 38 } | |
| 39 | |
| 35 // createArchiveTask creates a new archive Task. | 40 // createArchiveTask creates a new archive Task. |
| 36 func createArchiveTask(cfg *svcconfig.Coordinator, ls *coordinator.LogStream, complete bool) (*tq.Task, error) { | 41 func createArchiveTask(cfg *svcconfig.Coordinator, ls *coordinator.LogStream, complete bool) (*tq.Task, error) { |
| 37 desc := logdog.ArchiveTask{ | 42 desc := logdog.ArchiveTask{ |
| 38 Path: string(ls.Path()), | 43 Path: string(ls.Path()), |
| 39 Complete: complete, | 44 Complete: complete, |
| 40 } | 45 } |
| 41 t, err := createPullTask(&desc) | 46 t, err := createPullTask(&desc) |
| 42 if err != nil { | 47 if err != nil { |
| 43 return nil, err | 48 return nil, err |
| 44 } | 49 } |
| 45 | 50 |
| 46 » t.Name = fmt.Sprintf("archive-%s", ls.HashID()) | 51 » t.Name = archiveTaskNameForHash(ls.HashID()) |
| 47 return t, nil | 52 return t, nil |
| 48 } | 53 } |
| 49 | 54 |
| 50 // HandleArchiveCron is the handler for the archive cron endpoint. This scans | 55 // HandleArchiveCron is the handler for the archive cron endpoint. This scans |
| 51 // for terminal log streams that are ready for archival. | 56 // for terminal log streams that are ready for archival. |
| 52 // | 57 // |
| 53 // This will be called periodically by AppEngine cron. | 58 // This will be called periodically by AppEngine cron. |
| 54 func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) { | 59 func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) { |
| 55 errorWrapper(c, w, func() error { | 60 errorWrapper(c, w, func() error { |
| 56 return b.archiveCron(c, true) | 61 return b.archiveCron(c, true) |
| (...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 114 var threshold time.Duration | 119 var threshold time.Duration |
| 115 if complete { | 120 if complete { |
| 116 threshold = cfg.GetCoordinator().ArchiveDelay.Duration() | 121 threshold = cfg.GetCoordinator().ArchiveDelay.Duration() |
| 117 q = q.Eq("State", coordinator.LSTerminated) | 122 q = q.Eq("State", coordinator.LSTerminated) |
| 118 } else { | 123 } else { |
| 119 threshold = cfg.GetCoordinator().ArchiveDelayMax.Duration() | 124 threshold = cfg.GetCoordinator().ArchiveDelayMax.Duration() |
| 120 q = q.Eq("State", coordinator.LSPending) | 125 q = q.Eq("State", coordinator.LSPending) |
| 121 } | 126 } |
| 122 q = q.Lte("Updated", now.Add(-threshold)) | 127 q = q.Lte("Updated", now.Add(-threshold)) |
| 123 | 128 |
| 124 » // Query and dispatch our tasks. | 129 » // If the log stream has a terminal index, and its Updated time is less than |
| 125 » var ierr error | 130 » // the maximum archive delay, require this archival to be complete (no |
| 126 » count, err := b.multiTask(c, queueName, func(taskC chan<- *tq.Task) { | 131 » // missing LogEntry). |
| 127 » » ierr = ds.Get(c).Run(q, func(ls *coordinator.LogStream) error { | 132 » // |
| 133 » // If we're past maximum archive delay, settle for any (even empty) archival. | |
| 134 » // This is a failsafe to prevent logs from sitting in limbo forever. | |
| 135 » maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration() | |
| 136 | |
| 137 » // Perform a query, dispatching tasks in batches. | |
| 138 » batch := b.getMultiTaskBatchSize() | |
| 139 | |
| 140 » ti := tq.Get(c) | |
| 141 » tasks := make([]*tq.Task, 0, batch) | |
| 142 » totalScheduledTasks := 0 | |
| 143 » dispatchTasks := func() error { | |
| 144 » » if len(tasks) == 0 { | |
| 145 » » » return nil | |
| 146 » » } | |
| 147 | |
| 148 » » err := ti.AddMulti(tasks, queueName) | |
| 149 » » if merr, ok := err.(errors.MultiError); ok { | |
| 150 » » » for _, e := range merr { | |
| 151 » » » » log.Warningf(c, "Task add error: %v", e) | |
| 152 » » » } | |
| 153 » » } | |
| 154 » » if err := errors.Filter(err, tq.ErrTaskAlreadyAdded); err != nil { | |
| 155 » » » log.Fields{ | |
| 156 » » » » log.ErrorKey: err, | |
| 157 » » » » "queue": queueName, | |
| 158 » » » » "numTasks": len(tasks), | |
| 159 » » » » "scheduledTaskCount": totalScheduledTasks, | |
| 160 » » » }.Errorf(c, "Failed to add tasks to task queue.") | |
| 161 » » » return errors.New("failed to add tasks to task queue") | |
| 162 » » } | |
| 163 | |
| 164 » » totalScheduledTasks += len(tasks) | |
| 165 » » tasks = tasks[:0] | |
|
iannucci
2016/03/31 22:52:31
would it be cleaner to do this prune at the callsite
dnj
2016/04/01 00:21:52
I don't think so. Since this manages the "tasks" a
| |
| 166 » » return nil | |
| 167 » } | |
| 168 | |
| 169 » c = dsQueryBatch.BatchQueries(c, int32(batch)) | |
| 170 » err = ds.Get(c).Run(q, func(ls *coordinator.LogStream) error { | |
|
iannucci
2016/03/31 22:52:31
dsS := ds.Get(dsQueryBatch.......)
or something
| |
| 171 » » requireComplete := !now.After(ls.Updated.Add(maxDelay)) | |
| 172 » » if !requireComplete { | |
| 173 » » » log.Fields{ | |
| 174 » » » » "path": ls.Path(), | |
| 175 » » » » "id": ls.HashID(), | |
| 176 » » » » "updatedTimestamp": ls.Updated, | |
| 177 » » » » "maxDelay": maxDelay, | |
| 178 » » » }.Warningf(c, "Identified log stream past maximum archival delay.") | |
| 179 » » } else { | |
| 128 log.Fields{ | 180 log.Fields{ |
| 129 "id": ls.HashID(), | 181 "id": ls.HashID(), |
| 130 "updated": ls.Updated.String(), | 182 "updated": ls.Updated.String(), |
| 131 }.Infof(c, "Identified log stream ready for archival.") | 183 }.Infof(c, "Identified log stream ready for archival.") |
| 184 } | |
| 132 | 185 |
| 133 » » » // If the log stream has a terminal index, and its Updated time is less than | 186 » » task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete) |
| 134 » » » // the maximum archive delay, require this archival to b e complete (no | 187 » » if err != nil { |
| 135 » » » // missing LogEntry). | 188 » » » log.Fields{ |
| 136 » » » // | 189 » » » » log.ErrorKey: err, |
| 137 » » » // If we're past maximum archive delay, settle for any (even empty) archival. | 190 » » » » "path": ls.Path(), |
| 138 » » » // This is a failsafe to prevent logs from sitting in limbo forever. | 191 » » » }.Errorf(c, "Failed to create archive task.") |
| 139 » » » maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration() | 192 » » » return err |
| 140 » » » requireComplete := !now.After(ls.Updated.Add(maxDelay)) | 193 » » } |
| 141 » » » if !requireComplete { | |
| 142 » » » » log.Fields{ | |
| 143 » » » » » "path": ls.Path(), | |
| 144 » » » » » "updatedTimestamp": ls.Updated, | |
| 145 » » » » » "maxDelay": maxDelay, | |
| 146 » » » » }.Warningf(c, "Log stream is past maximum archival delay. Dropping completeness requirement.") | |
| 147 » » » } | |
| 148 | 194 |
| 149 » » » task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete) | 195 » » tasks = append(tasks, task) |
| 150 » » » if err != nil { | 196 » » if len(tasks) >= batch { |
| 151 » » » » log.Fields{ | 197 » » » return dispatchTasks() |
| 152 » » » » » log.ErrorKey: err, | 198 » » } |
| 153 » » » » » "path": ls.Path(), | 199 » » return nil |
| 154 » » » » }.Errorf(c, "Failed to create archive task.") | 200 » }) |
| 155 » » » » return err | 201 » if err != nil { |
| 156 » » » } | 202 » » log.Fields{ |
| 203 » » » log.ErrorKey: err, | |
| 204 » » » "scheduledTaskCount": totalScheduledTasks, | |
| 205 » » }.Errorf(c, "Outer archive query failed.") | |
| 206 » » return errors.New("outer archive query failed") | |
| 207 » } | |
| 157 | 208 |
| 158 » » » taskC <- task | 209 » // Dispatch any remaining enqueued tasks. |
| 159 » » » return nil | 210 » if err := dispatchTasks(); err != nil { |
| 160 » » }) | 211 » » return err |
| 161 » }) | |
| 162 » if err != nil || ierr != nil { | |
| 163 » » log.Fields{ | |
| 164 » » » log.ErrorKey: err, | |
| 165 » » » "queryErr": ierr, | |
| 166 » » » "taskCount": count, | |
| 167 » » }.Errorf(c, "Failed to dispatch archival tasks.") | |
| 168 » » return errors.New("failed to dispatch archival tasks") | |
| 169 } | 212 } |
| 170 | 213 |
| 171 log.Fields{ | 214 log.Fields{ |
| 172 » » "taskCount": count, | 215 » » "scheduledTaskCount": totalScheduledTasks, |
| 173 }.Debugf(c, "Archive sweep completed successfully.") | 216 }.Debugf(c, "Archive sweep completed successfully.") |
| 174 return nil | 217 return nil |
| 175 } | 218 } |
| OLD | NEW |