Chromium Code Reviews

| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package backend | 5 package backend |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "net/http" | 9 "net/http" |
| 10 » "time" | 10 » "sync/atomic" |
| 11 | 11 |
| 12 "github.com/julienschmidt/httprouter" | 12 "github.com/julienschmidt/httprouter" |
| 13 "github.com/luci/gae/filter/dsQueryBatch" | 13 "github.com/luci/gae/filter/dsQueryBatch" |
| 14 ds "github.com/luci/gae/service/datastore" | 14 ds "github.com/luci/gae/service/datastore" |
| 15 » tq "github.com/luci/gae/service/taskqueue" | 15 » "github.com/luci/gae/service/info" |
| 16 "github.com/luci/luci-go/appengine/logdog/coordinator" | 16 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 17 "github.com/luci/luci-go/appengine/logdog/coordinator/config" | |
| 18 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | |
| 19 "github.com/luci/luci-go/common/clock" | 17 "github.com/luci/luci-go/common/clock" |
| 20 "github.com/luci/luci-go/common/errors" | 18 "github.com/luci/luci-go/common/errors" |
| 21 log "github.com/luci/luci-go/common/logging" | 19 log "github.com/luci/luci-go/common/logging" |
| 22 » "github.com/luci/luci-go/common/proto/logdog/svcconfig" | 20 » "github.com/luci/luci-go/common/parallel" |
| 23 "golang.org/x/net/context" | 21 "golang.org/x/net/context" |
| 24 ) | 22 ) |
| 25 | 23 |
| 26 const archiveTaskVersion = "v1" | 24 const archiveTaskVersion = "v4" |
| 27 | |
| 28 // archiveTaskQueueName returns the task queue name for archival, or an error | |
| 29 // if it's not configured. | |
| 30 func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) { | |
| 31 » q := cfg.GetCoordinator().ArchiveTaskQueue | |
| 32 » if q == "" { | |
| 33 » » return "", errors.New("missing archive task queue name") | |
| 34 » } | |
| 35 » return q, nil | |
| 36 } | |
| 37 | |
| 38 func archiveTaskNameForHash(hashID string) string { | |
| 39 » return fmt.Sprintf("archive-%s-%s", hashID, archiveTaskVersion) | |
| 40 } | |
| 41 | |
| 42 // createArchiveTask creates a new archive Task. | |
| 43 func createArchiveTask(cfg *svcconfig.Coordinator, ls *coordinator.LogStream, co mplete bool) (*tq.Task, error) { | |
| 44 » desc := logdog.ArchiveTask{ | |
| 45 » » Path: string(ls.Path()), | |
| 46 » » Complete: complete, | |
| 47 » } | |
| 48 » t, err := createPullTask(&desc) | |
| 49 » if err != nil { | |
| 50 » » return nil, err | |
| 51 » } | |
| 52 | |
| 53 » t.Name = archiveTaskNameForHash(ls.HashID()) | |
| 54 » return t, nil | |
| 55 } | |
| 56 | 25 |
| 57 // HandleArchiveCron is the handler for the archive cron endpoint. This scans | 26 // HandleArchiveCron is the handler for the archive cron endpoint. This scans |
| 58 // for terminal log streams that are ready for archival. | 27 // for log streams that are ready for archival. |
| 59 // | 28 // |
| 60 // This will be called periodically by AppEngine cron. | 29 // This will be called periodically by AppEngine cron. |
| 61 func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) { | 30 func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) { |
| 62 errorWrapper(c, w, func() error { | 31 errorWrapper(c, w, func() error { |
| 63 » » return b.archiveCron(c, true) | 32 » » return b.archiveCron(c) |
| 64 }) | 33 }) |
| 65 } | 34 } |
| 66 | 35 |
| 67 // HandleArchiveCronNT is the handler for the archive non-terminal cron | 36 func (b *Backend) archiveCron(c context.Context) error { |
| 68 // endpoint. This scans for non-terminal log streams that have not been updated | 37 » services := b.GetServices() |
| 69 // in sufficiently long that we're willing to declare them complete and mark | 38 » _, cfg, err := services.Config(c) |
| 70 // them terminal. | |
| 71 // | |
| 72 // This will be called periodically by AppEngine cron. | |
| 73 func (b *Backend) HandleArchiveCronNT(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) { | |
| 74 » errorWrapper(c, w, func() error { | |
| 75 » » return b.archiveCron(c, false) | |
| 76 » }) | |
| 77 } | |
| 78 | |
| 79 // HandleArchiveCronPurge purges all archival tasks from the task queue. | |
| 80 func (b *Backend) HandleArchiveCronPurge(c context.Context, w http.ResponseWrite r, r *http.Request, p httprouter.Params) { | |
| 81 » errorWrapper(c, w, func() error { | |
| 82 » » cfg, err := config.Load(c) | |
| 83 » » if err != nil { | |
| 84 » » » log.WithError(err).Errorf(c, "Failed to load configurati on.") | |
| 85 » » » return err | |
| 86 » » } | |
| 87 | |
| 88 » » queueName, err := archiveTaskQueueName(cfg) | |
| 89 » » if err != nil { | |
| 90 » » » log.Errorf(c, "Failed to get task queue name.") | |
| 91 » » » return err | |
| 92 » » } | |
| 93 | |
| 94 » » if err := tq.Get(c).Purge(queueName); err != nil { | |
| 95 » » » log.Fields{ | |
| 96 » » » » log.ErrorKey: err, | |
| 97 » » » » "queue": queueName, | |
| 98 » » » }.Errorf(c, "Failed to purge task queue.") | |
| 99 » » » return err | |
| 100 » » } | |
| 101 » » return nil | |
| 102 » }) | |
| 103 } | |
| 104 | |
| 105 func (b *Backend) archiveCron(c context.Context, complete bool) error { | |
| 106 » cfg, err := config.Load(c) | |
| 107 if err != nil { | 39 if err != nil { |
| 108 » » log.WithError(err).Errorf(c, "Failed to load configuration.") | 40 » » return fmt.Errorf("failed to load configuration: %v", err) |
| 109 » » return err | |
| 110 } | 41 } |
| 111 | 42 |
| 112 » queueName, err := archiveTaskQueueName(cfg) | 43 » archiveDelayMax := cfg.Coordinator.ArchiveDelayMax.Duration() |
| 113 » if err != nil { | 44 » if archiveDelayMax <= 0 { |
| 114 » » log.Errorf(c, "Failed to get task queue name.") | 45 » » return fmt.Errorf("must have positive maximum archive delay, not %q", archiveDelayMax.String()) |
| 115 » » return err | |
| 116 } | 46 } |
| 117 | 47 |
| 118 » now := clock.Now(c).UTC() | 48 » ap, err := services.ArchivalPublisher(c) |
| 119 » q := ds.NewQuery("LogStream") | 49 » if err != nil { |
| 50 » » return fmt.Errorf("failed to get archival publisher: %v", err) | |
| 51 » } | |
| 120 | 52 |
| 121 » var threshold time.Duration | 53 » threshold := clock.Now(c).UTC().Add(-archiveDelayMax) |
| 122 » if complete { | 54 » log.Fields{ |
| 123 » » threshold = cfg.GetCoordinator().ArchiveDelay.Duration() | 55 » » "threshold": threshold, |
| 124 » » q = q.Eq("State", coordinator.LSTerminated) | 56 » }.Infof(c, "Querying for all streaming logs created before max archival threshold.") |
| 125 » } else { | 57 |
| 126 » » threshold = cfg.GetCoordinator().ArchiveDelayMax.Duration() | 58 » // Query for log streams that were created <= our threshold and that are |
| 127 » » q = q.Eq("State", coordinator.LSPending) | 59 » // still in LSStreaming state. |
| 60 » // | |
| 61 » // We order descending because this is already an index that we use for our | |
| 62 » // "logdog.Logs.Query". | |
| 63 » q := ds.NewQuery("LogStream"). | |
| 64 » » KeysOnly(true). | |
| 65 » » Eq("State", coordinator.LSStreaming). | |
| 66 » » Lte("Created", threshold). | |
| 67 » » Order("-Created", "State") | |
| 68 | |
| 69 » // Since these logs are beyond maximum archival delay, we will dispatch | |
| 70 » // archival immediately. | |
| 71 » params := coordinator.ArchivalParams{ | |
| 72 » » RequestID: info.Get(c).RequestID(), | |
| 128 } | 73 } |
| 129 q = q.Lte("Updated", now.Add(-threshold)) | |
| 130 | 74 |
| 131 » // If the log stream has a terminal index, and its Updated time is less than | 75 » // Create archive tasks for our expired log streams in parallel. |
| 132 » // the maximum archive delay, require this archival to be complete (no | 76 » batch := b.getMultiTaskBatchSize() |
| 133 » // missing LogEntry). | 77 » var tasked int32 |
| 134 » // | 78 » var failed int32 |
| 135 » // If we're past maximum archive delay, settle for any (even empty) archival. | |
| 136 » // This is a failsafe to prevent logs from sitting in limbo forever. | |
| 137 » maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration() | |
| 138 | 79 |
| 139 » // Perform a query, dispatching tasks in batches. | 80 » var ierr error |
| 140 » batch := b.getMultiTaskBatchSize() | 81 » parallel.Ignore(parallel.Run(batch, func(taskC chan<- func() error) { |
| 82 » » // Run a batched query across the expired log stream space. | |
| 83 » » ierr = ds.Get(dsQueryBatch.BatchQueries(c, int32(batch))).Run(q, func(lsKey *ds.Key) error { | |
| 84 » » » var ls coordinator.LogStream | |
| 85 » » » ds.PopulateKey(&ls, lsKey) | |
| 141 | 86 |
| 142 » ti := tq.Get(c) | 87 » » » // Archive this log stream in a transaction. |
| 143 » tasks := make([]*tq.Task, 0, batch) | 88 » » » taskC <- func() error { |
| 144 » totalScheduledTasks := 0 | 89 » » » » err := ds.Get(c).RunInTransaction(func(c context .Context) error { |
| 145 » addAndMaybeDispatchTasks := func(task *tq.Task) error { | 90 » » » » » if err := ds.Get(c).Get(&ls); err != nil { |
| 146 » » switch task { | 91 » » » » » » log.WithError(err).Errorf(c, "Fa iled to load stream.") |
| 147 » » case nil: | 92 » » » » » » return err |
| 148 » » » if len(tasks) == 0 { | 93 » » » » » } |
| 94 | |
| 95 » » » » » log.Fields{ | |
| 96 » » » » » » "path": ls.Path(), | |
| 97 » » » » » » "id": ls.HashID, | |
| 98 » » » » » }.Infof(c, "Identified expired log strea m.") | |
| 99 | |
| 100 » » » » » if err := params.PublishTask(c, ap, &ls) ; err != nil { | |
|
dnj
2016/04/11 17:20:04
PublishTask is transaction-friendly, as it interna
| |
| 101 » » » » » » if err == coordinator.ErrArchive Tasked { | |
| 102 » » » » » » » log.Warningf(c, "Archiva l has already been tasked for this stream.") | |
| 103 » » » » » » » return nil | |
| 104 » » » » » » } | |
| 105 » » » » » » return err | |
| 106 » » » » » } | |
| 107 » » » » » return ds.Get(c).Put(&ls) | |
| 108 » » » » }, nil) | |
| 109 | |
| 110 » » » » if err != nil { | |
| 111 » » » » » log.Fields{ | |
| 112 » » » » » » log.ErrorKey: err, | |
| 113 » » » » » » "path": ls.Path(), | |
| 114 » » » » » }.Errorf(c, "Failed to archive log strea m.") | |
| 115 » » » » » atomic.AddInt32(&failed, 1) | |
| 116 » » » » » return nil // Nothing will consume it anyway. | |
| 117 » » » » } | |
| 118 | |
| 119 » » » » log.Fields{ | |
| 120 » » » » » "path": ls.Path(), | |
| 121 » » » » » "id": ls.HashID, | |
| 122 » » » » » "archiveTopic": cfg.Coordinator.ArchiveT opic, | |
| 123 » » » » }.Infof(c, "Created archive task.") | |
| 124 » » » » atomic.AddInt32(&tasked, 1) | |
| 149 return nil | 125 return nil |
| 150 } | 126 } |
| 151 | 127 |
| 152 » » default: | 128 » » » return nil |
| 153 » » » tasks = append(tasks, task) | 129 » » }) |
| 154 » » » if len(tasks) < batch { | 130 » })) |
| 155 » » » » return nil | |
| 156 » » » } | |
| 157 » » } | |
| 158 | 131 |
| 159 » » err := ti.AddMulti(tasks, queueName) | 132 » // Return an error code if we experienced any failures. This doesn't really |
| 160 » » if merr, ok := err.(errors.MultiError); ok { | 133 » // have an impact, but it will show up as a "!" in the cron UI. |
| 161 » » » for _, e := range merr { | 134 » switch { |
| 162 » » » » log.Warningf(c, "Task add error: %v", e) | 135 » case ierr != nil: |
| 163 » » » } | 136 » » log.Fields{ |
| 164 » » } | 137 » » » log.ErrorKey: err, |
| 165 » » if err := errors.Filter(err, tq.ErrTaskAlreadyAdded); err != nil { | 138 » » » "archiveCount": tasked, |
| 166 » » » log.Fields{ | 139 » » }.Errorf(c, "Failed to execute expired tasks query.") |
| 167 » » » » log.ErrorKey: err, | 140 » » return ierr |
| 168 » » » » "queue": queueName, | |
| 169 » » » » "numTasks": len(tasks), | |
| 170 » » » » "scheduledTaskCount": totalScheduledTasks, | |
| 171 » » » }.Errorf(c, "Failed to add tasks to task queue.") | |
| 172 » » » return errors.New("failed to add tasks to task queue") | |
| 173 » » } | |
| 174 | 141 |
| 175 » » totalScheduledTasks += len(tasks) | 142 » case failed > 0: |
| 176 » » tasks = tasks[:0] | 143 » » log.Fields{ |
| 144 » » » log.ErrorKey: err, | |
| 145 » » » "archiveCount": tasked, | |
| 146 » » » "failCount": failed, | |
| 147 » » }.Errorf(c, "Failed to archive candidate all streams.") | |
| 148 » » return errors.New("failed to archive all candidate streams") | |
|
dnj
2016/04/11 17:20:04
Since this is cron, we're basically just using the
| |
| 149 | |
| 150 » default: | |
| 151 » » log.Fields{ | |
| 152 » » » "archiveCount": tasked, | |
| 153 » » }.Infof(c, "Archive sweep completed successfully.") | |
| 177 return nil | 154 return nil |
| 178 } | 155 } |
| 179 | |
| 180 err = ds.Get(dsQueryBatch.BatchQueries(c, int32(batch))).Run(q, func(ls *coordinator.LogStream) error { | |
| 181 requireComplete := !now.After(ls.Updated.Add(maxDelay)) | |
| 182 if !requireComplete { | |
| 183 log.Fields{ | |
| 184 "path": ls.Path(), | |
| 185 "id": ls.HashID(), | |
| 186 "updatedTimestamp": ls.Updated, | |
| 187 "maxDelay": maxDelay, | |
| 188 }.Warningf(c, "Identified log stream past maximum archiv al delay.") | |
| 189 } else { | |
| 190 log.Fields{ | |
| 191 "id": ls.HashID(), | |
| 192 "updated": ls.Updated.String(), | |
| 193 }.Infof(c, "Identified log stream ready for archival.") | |
| 194 } | |
| 195 | |
| 196 task, err := createArchiveTask(cfg.GetCoordinator(), ls, require Complete) | |
| 197 if err != nil { | |
| 198 log.Fields{ | |
| 199 log.ErrorKey: err, | |
| 200 "path": ls.Path(), | |
| 201 }.Errorf(c, "Failed to create archive task.") | |
| 202 return err | |
| 203 } | |
| 204 | |
| 205 return addAndMaybeDispatchTasks(task) | |
| 206 }) | |
| 207 if err != nil { | |
| 208 log.Fields{ | |
| 209 log.ErrorKey: err, | |
| 210 "scheduledTaskCount": totalScheduledTasks, | |
| 211 }.Errorf(c, "Outer archive query failed.") | |
| 212 return errors.New("outer archive query failed") | |
| 213 } | |
| 214 | |
| 215 // Dispatch any remaining enqueued tasks. | |
| 216 if err := addAndMaybeDispatchTasks(nil); err != nil { | |
| 217 return err | |
| 218 } | |
| 219 | |
| 220 log.Fields{ | |
| 221 "scheduledTaskCount": totalScheduledTasks, | |
| 222 }.Debugf(c, "Archive sweep completed successfully.") | |
| 223 return nil | |
| 224 } | 156 } |
| OLD | NEW |