Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package archivist | 5 package archivist |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | |
| 9 "encoding/hex" | |
| 8 "fmt" | 10 "fmt" |
| 11 "io" | |
| 9 | 12 |
| 10 "github.com/golang/protobuf/proto" | 13 "github.com/golang/protobuf/proto" |
| 11 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 14 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 12 "github.com/luci/luci-go/common/errors" | 15 "github.com/luci/luci-go/common/errors" |
| 13 "github.com/luci/luci-go/common/gcloud/gs" | 16 "github.com/luci/luci-go/common/gcloud/gs" |
| 14 "github.com/luci/luci-go/common/logdog/types" | 17 "github.com/luci/luci-go/common/logdog/types" |
| 15 log "github.com/luci/luci-go/common/logging" | 18 log "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/common/parallel" | |
| 16 "github.com/luci/luci-go/common/proto/logdog/logpb" | 20 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 17 "github.com/luci/luci-go/server/logdog/archive" | 21 "github.com/luci/luci-go/server/logdog/archive" |
| 18 "github.com/luci/luci-go/server/logdog/storage" | 22 "github.com/luci/luci-go/server/logdog/storage" |
| 19 "golang.org/x/net/context" | 23 "golang.org/x/net/context" |
| 20 ) | 24 ) |
| 21 | 25 |
| 26 // Task is a single archive task. | |
| 27 type Task interface { | |
| 28 // UniqueID returns a task-unique value. Other tasks, and other retries of | |
| 29 // this task, should (try to) not reuse this ID. | |
| 30 UniqueID() string | |
| 31 | |
| 32 // Data is the archive task's data content. | |
| 33 Data() []byte | |
| 34 | |
| 35 // AssertLease asserts that the lease for this Task is still held. | |
| 36 // | |
| 37 // On failure, it will return an error. If successful, the Archivist may | |
| 38 // assume that it holds the lease longer. | |
| 39 AssertLease(context.Context) error | |
| 40 } | |
| 41 | |
| 22 // Archivist is a stateless configuration capable of archiving individual log | 42 // Archivist is a stateless configuration capable of archiving individual log |
| 23 // streams. | 43 // streams. |
| 24 type Archivist struct { | 44 type Archivist struct { |
| 25 // Service is the client to use to communicate with Coordinator's Services | 45 // Service is the client to use to communicate with Coordinator's Services |
| 26 // endpoint. | 46 // endpoint. |
| 27 Service logdog.ServicesClient | 47 Service logdog.ServicesClient |
| 28 | 48 |
| 29 // Storage is the intermediate storage instance to use to pull log entries for | 49 // Storage is the intermediate storage instance to use to pull log entries for |
| 30 // archival. | 50 // archival. |
| 31 Storage storage.Storage | 51 Storage storage.Storage |
| 32 | 52 |
| 33 // GSClient is the Google Storage client to for archive generation. | 53 // GSClient is the Google Storage client to for archive generation. |
| 34 GSClient gs.Client | 54 GSClient gs.Client |
| 35 | 55 |
| 36 // GSBase is the base Google Storage path. This includes the bucket name | 56 // GSBase is the base Google Storage path. This includes the bucket name |
| 37 // and any associated path. | 57 // and any associated path. |
| 38 GSBase gs.Path | 58 GSBase gs.Path |
| 59 // GSStagingBase is the base Google Storage path for archive staging. This | |
| 60 // includes the bucket name and any associated path. | |
| 61 GSStagingBase gs.Path | |
| 62 | |
| 39 // PrefixIndexRange is the maximum number of stream indexes in between index | 63 // PrefixIndexRange is the maximum number of stream indexes in between index |
| 40 // entries. See archive.Manifest for more information. | 64 // entries. See archive.Manifest for more information. |
| 41 StreamIndexRange int | 65 StreamIndexRange int |
| 42 // PrefixIndexRange is the maximum number of prefix indexes in between index | 66 // PrefixIndexRange is the maximum number of prefix indexes in between index |
| 43 // entries. See archive.Manifest for more information. | 67 // entries. See archive.Manifest for more information. |
| 44 PrefixIndexRange int | 68 PrefixIndexRange int |
| 45 // ByteRange is the maximum number of stream data bytes in between index | 69 // ByteRange is the maximum number of stream data bytes in between index |
| 46 // entries. See archive.Manifest for more information. | 70 // entries. See archive.Manifest for more information. |
| 47 ByteRange int | 71 ByteRange int |
| 48 } | 72 } |
| 49 | 73 |
| 50 // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used | 74 // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used |
| 51 // to during archival. This should be greater than the maximum LogEntry size. | 75 // to during archival. This should be greater than the maximum LogEntry size. |
| 52 const storageBufferSize = types.MaxLogEntryDataSize * 64 | 76 const storageBufferSize = types.MaxLogEntryDataSize * 64 |
| 53 | 77 |
| 54 // ArchiveTask processes and executes a single log stream archive task. | 78 // ArchiveTask processes and executes a single log stream archive task. |
| 55 func (a *Archivist) ArchiveTask(c context.Context, desc []byte) error { | |
| 56 var task logdog.ArchiveTask | |
| 57 if err := proto.Unmarshal(desc, &task); err != nil { | |
| 58 log.WithError(err).Errorf(c, "Failed to decode archive task.") | |
| 59 return err | |
| 60 } | |
| 61 return a.Archive(c, &task) | |
| 62 } | |
| 63 | |
| 64 // Archive archives a single log stream. If unsuccessful, an error is returned. | |
| 65 // | 79 // |
| 66 // This error may be wrapped in errors.Transient if it is believed to have been | 80 // It returns true on success (delete the task) and false on failure (don't |
| 67 // caused by a transient failure. | 81 // delete the task). The return value of true should only be used if the task |
| 82 // is truly complete and acknowledged by the Coordinator. | |
| 68 // | 83 // |
| 69 // If the supplied Context is Done, operation may terminate before completion, | 84 // If the supplied Context is Done, operation may terminate before completion, |
| 70 // returning the Context's error. | 85 // returning the Context's error. |
| 71 func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask) error { | 86 func (a *Archivist) ArchiveTask(c context.Context, task Task) bool { |
|
dnj
2016/04/11 17:20:04
I re-wrote most of this to use a staging space and
| |
| 87 » delete, _ := a.archiveTaskImpl(c, task) | |
| 88 » return delete | |
| 89 } | |
| 90 | |
| 91 // archiveTaskImpl returns the same boolean value as ArchiveTask, but includes | |
| 92 // an error. The error is useful for testing to assert that certain conditions | |
| 93 // were hit. | |
| 94 func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) { | |
| 95 » // If we can't decode the archival task, we can't decide whether or not to | |
| 96 » // delete it, so we will leave it in the queue. | |
| 97 » var at logdog.ArchiveTask | |
| 98 » if err := proto.Unmarshal(task.Data(), &at); err != nil { | |
| 99 » » log.WithError(err).Errorf(c, "Failed to decode archive task.") | |
| 100 » » return false, err | |
| 101 » } | |
| 102 | |
| 103 » log.Fields{ | |
| 104 » » "path": at.Path, | |
| 105 » }.Debugf(c, "Received archival task.") | |
| 106 | |
| 72 // Load the log stream's current state. If it is already archived, we will | 107 // Load the log stream's current state. If it is already archived, we will |
| 73 // return an immediate success. | 108 // return an immediate success. |
| 74 ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{ | 109 ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{ |
| 75 » » Path: t.Path, | 110 » » Path: at.Path, |
| 76 Desc: true, | 111 Desc: true, |
| 77 }) | 112 }) |
| 78 switch { | 113 switch { |
| 79 case err != nil: | 114 case err != nil: |
| 80 log.WithError(err).Errorf(c, "Failed to load log stream.") | 115 log.WithError(err).Errorf(c, "Failed to load log stream.") |
| 81 » » return err | 116 » » return false, err |
| 117 | |
| 82 case ls.State == nil: | 118 case ls.State == nil: |
| 83 » » return errors.New("missing state") | 119 » » log.Errorf(c, "Log stream did not include state.") |
| 120 » » return false, errors.New("log stream did not include state") | |
| 121 | |
| 122 » case ls.State.Purged: | |
| 123 » » log.Warningf(c, "Log stream is purged. Discarding archival reque st.") | |
| 124 » » return true, errors.New("log stream is purged") | |
| 125 | |
| 126 » case ls.State.Archived: | |
| 127 » » log.Infof(c, "Log stream is already archived. Discarding archiva l request.") | |
| 128 » » return true, errors.New("log stream is archived") | |
| 129 | |
| 130 » case !bytes.Equal(ls.ArchivalKey, at.Key): | |
| 131 » » if len(ls.ArchivalKey) == 0 { | |
| 132 » » » // The log stream is not registering as "archive pending " state. | |
| 133 » » » // | |
| 134 » » » // This can happen if the eventually-consistent datastor e hasn't updated | |
| 135 » » » // its log stream state by the time this Pub/Sub task is received. In | |
| 136 » » » // this case, we will continue retrying the task until d atastore registers | |
| 137 » » » // that some key is associated with it. | |
| 138 » » » log.Fields{ | |
| 139 » » » » "logStreamArchivalKey": hex.EncodeToString(ls.Ar chivalKey), | |
| 140 » » » » "requestArchivalKey": hex.EncodeToString(at.Ke y), | |
| 141 » » » }.Infof(c, "Archival request received before log stream has its key.") | |
| 142 » » » return false, errors.New("premature archival request") | |
| 143 » » } | |
| 144 | |
| 145 » » // This can happen if a Pub/Sub message is dispatched during a t ransaction, | |
| 146 » » // but that specific transaction failed. In this case, the Pub/S ub message | |
| 147 » » // will have a key that doesn't match the key that was transacti onally | |
| 148 » » // encoded, and can be discarded. | |
| 149 » » log.Fields{ | |
| 150 » » » "logStreamArchivalKey": hex.EncodeToString(ls.ArchivalKe y), | |
| 151 » » » "requestArchivalKey": hex.EncodeToString(at.Key), | |
| 152 » » }.Infof(c, "Superfluous archival request (keys do not match). Di scarding.") | |
| 153 » » return true, errors.New("superfluous archival request") | |
| 154 | |
| 84 case ls.State.ProtoVersion != logpb.Version: | 155 case ls.State.ProtoVersion != logpb.Version: |
| 85 log.Fields{ | 156 log.Fields{ |
| 86 "protoVersion": ls.State.ProtoVersion, | 157 "protoVersion": ls.State.ProtoVersion, |
| 87 "expectedVersion": logpb.Version, | 158 "expectedVersion": logpb.Version, |
| 88 }.Errorf(c, "Unsupported log stream protobuf version.") | 159 }.Errorf(c, "Unsupported log stream protobuf version.") |
| 89 » » return errors.New("unsupported protobuf version") | 160 » » return false, errors.New("unsupported log stream protobuf versio n") |
| 161 | |
| 90 case ls.Desc == nil: | 162 case ls.Desc == nil: |
| 91 » » return errors.New("missing descriptor") | 163 » » log.Errorf(c, "Log stream did not include a descriptor.") |
| 92 | 164 » » return false, errors.New("log stream did not include a descripto r") |
| 93 » case ls.State.Purged: | 165 » } |
| 94 » » log.Warningf(c, "Log stream is purged.") | 166 |
| 95 » » return nil | 167 » // If the archival request is younger than the settle delay, kick it bac k to |
| 96 » case ls.State.Archived: | 168 » // retry later. |
| 97 » » log.Infof(c, "Log stream is already archived.") | 169 » age := ls.Age.Duration() |
| 98 » » return nil | 170 » if age < at.SettleDelay.Duration() { |
| 99 » } | 171 » » log.Fields{ |
| 100 | 172 » » » "age": age, |
| 101 » // Deserialize and validate the descriptor protobuf. | 173 » » » "settleDelay": at.SettleDelay.Duration(), |
| 102 » var desc logpb.LogStreamDescriptor | 174 » » }.Infof(c, "Log stream is younger than the settle delay. Returni ng task to queue.") |
| 103 » if err := proto.Unmarshal(ls.Desc, &desc); err != nil { | 175 » » return false, errors.New("log stream is within settle delay") |
| 176 » } | |
| 177 | |
| 178 » // Are we required to archive a complete log stream? | |
| 179 » complete := (age <= at.CompletePeriod.Duration()) | |
| 180 » if complete && ls.State.TerminalIndex < 0 { | |
| 181 » » log.Warningf(c, "Cannot archive complete stream with no terminal index.") | |
| 182 » » return false, errors.New("completeness required, but stream has no terminal index") | |
| 183 » } | |
| 184 | |
| 185 » ar := logdog.ArchiveStreamRequest{ | |
| 186 » » Path: at.Path, | |
| 187 » } | |
| 188 | |
| 189 » // Archive to staging. | |
| 190 » // | |
| 191 » // If a non-transient failure occurs here, we will report it to the Arch ivist | |
| 192 » // under the assumption that it will continue occurring. | |
| 193 » // | |
| 194 » // We will handle error creating the plan and executing the plan in the same | |
| 195 » // switch statement below. | |
| 196 » staged, err := a.makeStagedArchival(c, types.StreamPath(at.Path), ls, ta sk.UniqueID()) | |
| 197 » if err != nil { | |
| 198 » » log.WithError(err).Errorf(c, "Failed to create staged archival p lan.") | |
| 199 » } else { | |
| 200 » » err = staged.stage(c, complete) | |
| 201 » } | |
| 202 | |
| 203 » switch { | |
| 204 » case errors.IsTransient(err): | |
| 205 » » // If this is a transient error, exit immediately and do not del ete the | |
| 206 » » // archival task. | |
| 207 » » log.WithError(err).Warningf(c, "TRANSIENT error during archival operation.") | |
| 208 » » return false, err | |
| 209 | |
| 210 » case err != nil: | |
| 211 » » // This is a non-transient error, so we are confident that any f uture | |
| 212 » » // Archival will also encounter this error. We will mark this ar chival | |
| 213 » » // as an error and report it to the Coordinator. | |
| 214 » » log.WithError(err).Errorf(c, "Archival failed with non-transient error.") | |
| 215 » » ar.Error = err.Error() | |
| 216 » » if ar.Error == "" { | |
| 217 » » » // This needs to be non-nil, so if our acutal error has an empty string, | |
| 218 » » » // fill in a generic message. | |
| 219 » » » ar.Error = "archival error" | |
| 220 » » } | |
| 221 | |
| 222 » default: | |
| 223 » » // In case something fails, clean up our staged archival (best e ffort). | |
| 224 » » defer staged.cleanup(c) | |
| 225 | |
| 226 » » // Finalize the archival. First, extend our lease to confirm tha t we still | |
| 227 » » // hold it. | |
| 228 » » if err := task.AssertLease(c); err != nil { | |
| 229 » » » log.WithError(err).Errorf(c, "Failed to extend task leas e before finalizing.") | |
| 230 » » » return false, err | |
| 231 » » } | |
| 232 | |
| 233 » » // Finalize the archival. | |
| 234 » » if err := staged.finalize(c, a.GSClient, &ar); err != nil { | |
| 235 » » » log.WithError(err).Errorf(c, "Failed to finalize archiva l.") | |
| 236 » » » return false, err | |
| 237 » » } | |
| 238 » } | |
| 239 | |
| 240 » log.Fields{ | |
| 241 » » "streamURL": ar.StreamUrl, | |
| 242 » » "indexURL": ar.IndexUrl, | |
| 243 » » "dataURL": ar.DataUrl, | |
| 244 » » "terminalIndex": ar.TerminalIndex, | |
| 245 » » "logEntryCount": ar.LogEntryCount, | |
| 246 » » "hadError": ar.Error, | |
| 247 » » "complete": ar.Complete(), | |
| 248 » }.Debugf(c, "Finished archival round. Reporting archive state.") | |
| 249 | |
| 250 » // Extend the lease again to confirm that we still hold it. | |
| 251 » if err := task.AssertLease(c); err != nil { | |
| 252 » » log.WithError(err).Errorf(c, "Failed to extend task lease before reporting.") | |
| 253 » » return false, err | |
| 254 » } | |
| 255 | |
| 256 » if _, err := a.Service.ArchiveStream(c, &ar); err != nil { | |
| 257 » » log.WithError(err).Errorf(c, "Failed to report archive state.") | |
| 258 » » return false, err | |
| 259 » } | |
| 260 | |
| 261 » // Archival is complete and acknowledged by Coordinator. Consume the arc hival | |
| 262 » // task. | |
| 263 » return true, nil | |
| 264 } | |
| 265 | |
| 266 func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath, ls *logdog.LoadStreamResponse, uid string) ( | |
| 267 » *stagedArchival, error) { | |
| 268 » sa := stagedArchival{ | |
| 269 » » Archivist: a, | |
| 270 » » path: path, | |
| 271 | |
| 272 » » terminalIndex: ls.State.TerminalIndex, | |
| 273 » } | |
| 274 | |
| 275 » // Deserialize and validate the descriptor protobuf. If this fails, it i s a | |
| 276 » // non-transient error. | |
| 277 » if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil { | |
| 104 log.Fields{ | 278 log.Fields{ |
| 105 log.ErrorKey: err, | 279 log.ErrorKey: err, |
| 106 "protoVersion": ls.State.ProtoVersion, | 280 "protoVersion": ls.State.ProtoVersion, |
| 107 }.Errorf(c, "Failed to unmarshal descriptor protobuf.") | 281 }.Errorf(c, "Failed to unmarshal descriptor protobuf.") |
| 108 » » return err | 282 » » return nil, err |
| 109 » } | 283 » } |
| 110 | 284 |
| 111 » task := &archiveTask{ | 285 » bext := sa.desc.BinaryFileExt |
| 112 » » Archivist: a, | |
| 113 » » ArchiveTask: t, | |
| 114 » » ls: ls, | |
| 115 » » desc: &desc, | |
| 116 » } | |
| 117 » if err := task.archive(c); err != nil { | |
| 118 » » log.WithError(err).Errorf(c, "Failed to perform archival operati on.") | |
| 119 » » return err | |
| 120 » } | |
| 121 » log.Fields{ | |
| 122 » » "streamURL": task.ar.StreamUrl, | |
| 123 » » "indexURL": task.ar.IndexUrl, | |
| 124 » » "dataURL": task.ar.DataUrl, | |
| 125 » » "terminalIndex": task.ar.TerminalIndex, | |
| 126 » » "complete": task.ar.Complete, | |
| 127 » }.Debugf(c, "Finished archive construction.") | |
| 128 | |
| 129 » if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil { | |
| 130 » » log.WithError(err).Errorf(c, "Failed to mark log stream as archi ved.") | |
| 131 » » return err | |
| 132 » } | |
| 133 » return nil | |
| 134 } | |
| 135 | |
| 136 // archiveTask is the set of parameters for a single archival. | |
| 137 type archiveTask struct { | |
| 138 » *Archivist | |
| 139 » *logdog.ArchiveTask | |
| 140 | |
| 141 » // ls is the log stream state. | |
| 142 » ls *logdog.LoadStreamResponse | |
| 143 » // desc is the unmarshaled log stream descriptor. | |
| 144 » desc *logpb.LogStreamDescriptor | |
| 145 | |
| 146 » // ar will be populated during archive construction. | |
| 147 » ar logdog.ArchiveStreamRequest | |
| 148 } | |
| 149 | |
| 150 // archiveState performs the archival operation on a stream described by a | |
| 151 // Coordinator State. Upon success, the State will be updated with the result | |
| 152 // of the archival operation. | |
| 153 func (t *archiveTask) archive(c context.Context) (err error) { | |
| 154 » // Generate our archival object managers. | |
| 155 » bext := t.desc.BinaryFileExt | |
| 156 if bext == "" { | 286 if bext == "" { |
| 157 bext = "bin" | 287 bext = "bin" |
| 158 } | 288 } |
| 159 | 289 |
| 160 » path := t.Path | 290 » // Construct our staged archival paths. |
| 161 » var streamO, indexO, dataO *gsObject | 291 » sa.stream = a.makeStagingPaths(path, "logstream.entries", uid) |
| 162 » streamO, err = t.newGSObject(c, path, "logstream.entries") | 292 » sa.index = a.makeStagingPaths(path, "logstream.index", uid) |
| 163 » if err != nil { | 293 » sa.data = a.makeStagingPaths(path, fmt.Sprintf("data.%s", bext), uid) |
| 164 » » log.WithError(err).Errorf(c, "Failed to create log object.") | 294 » return &sa, nil |
| 165 » » return | 295 } |
| 166 » } | 296 |
| 167 | 297 // makeStagingPaths returns a stagingPaths instance for the given path and |
| 168 » indexO, err = t.newGSObject(c, path, "logstream.index") | 298 // file name. It incorporates a unique ID into the staging name to differentiate |
| 169 » if err != nil { | 299 // it from other staging paths for the same path/name. |
| 170 » » log.WithError(err).Errorf(c, "Failed to create index object.") | 300 func (a *Archivist) makeStagingPaths(path types.StreamPath, name, uid string) st agingPaths { |
| 171 » » return | 301 » return stagingPaths{ |
| 172 » } | 302 » » staged: a.GSStagingBase.Concat(string(path), uid, name), |
| 173 | 303 » » final: a.GSBase.Concat(string(path), name), |
| 174 » dataO, err = t.newGSObject(c, path, fmt.Sprintf("data.%s", bext)) | 304 » } |
| 175 » if err != nil { | 305 } |
| 176 » » log.WithError(err).Errorf(c, "Failed to create data object.") | 306 |
| 177 » » return | 307 type stagedArchival struct { |
| 178 » } | 308 » *Archivist |
| 179 | 309 |
| 180 » // Load the URLs into our state. | 310 » path types.StreamPath |
| 181 » t.ar.StreamUrl = streamO.url | 311 » desc logpb.LogStreamDescriptor |
| 182 » t.ar.IndexUrl = indexO.url | 312 |
| 183 » t.ar.DataUrl = dataO.url | 313 » stream stagingPaths |
| 184 | 314 » streamSize int64 |
| 315 | |
| 316 » index stagingPaths | |
| 317 » indexSize int64 | |
| 318 | |
| 319 » data stagingPaths | |
| 320 » dataSize int64 | |
| 321 | |
| 322 » finalized bool | |
| 323 » terminalIndex int64 | |
| 324 » logEntryCount int64 | |
| 325 } | |
| 326 | |
| 327 // stage executes the archival process, archiving to the staged storage paths. | |
| 328 // | |
| 329 // If stage fails, it may return a transient error. | |
| 330 func (sa *stagedArchival) stage(c context.Context, complete bool) (err error) { | |
| 185 log.Fields{ | 331 log.Fields{ |
| 186 » » "streamURL": t.ar.StreamUrl, | 332 » » "streamURL": sa.stream.staged, |
| 187 » » "indexURL": t.ar.IndexUrl, | 333 » » "indexURL": sa.index.staged, |
| 188 » » "dataURL": t.ar.DataUrl, | 334 » » "dataURL": sa.data.staged, |
| 189 » }.Infof(c, "Archiving log stream...") | 335 » }.Debugf(c, "Staging log stream...") |
| 190 | 336 |
| 191 » // We want to try and delete any GS objects that were created during a f ailed | 337 » // Group any transient errors that occur during cleanup. If we aren't |
| 192 » // archival attempt. | 338 » // returning a non-transient error, return a transient "terr". |
| 193 » deleteOnFail := func(o *gsObject) { | 339 » var terr errors.MultiError |
| 194 » » if o == nil || err == nil { | 340 » defer func() { |
| 195 » » » return | 341 » » if err == nil && len(terr) > 0 { |
| 196 » » } | 342 » » » err = errors.WrapTransient(terr) |
| 197 » » if ierr := o.delete(); ierr != nil { | 343 » » } |
| 344 » }() | |
| 345 | |
| 346 » // Close our writers on exit. If any of them fail to close, mark the arc hival | |
| 347 » // as a transient failure. | |
| 348 » closeWriter := func(closer io.Closer, path gs.Path) { | |
| 349 » » // Close the Writer. If this results in an error, append it to o ur transient | |
| 350 » » // error MultiError. | |
| 351 » » if ierr := closer.Close(); ierr != nil { | |
| 352 » » » terr = append(terr, ierr) | |
| 353 » » } | |
| 354 | |
| 355 » » // If we have an archival error, also delete the path associated with this | |
| 356 » » // stream. This is a non-fatal failure, since we've already hit a fatal | |
| 357 » » // one. | |
| 358 » » if err != nil || len(terr) > 0 { | |
| 359 » » » if ierr := sa.GSClient.Delete(path); ierr != nil { | |
| 360 » » » » log.Fields{ | |
| 361 » » » » » log.ErrorKey: ierr, | |
| 362 » » » » » "path": path, | |
| 363 » » » » }.Warningf(c, "Failed to delete stream on error. ") | |
| 364 » » » } | |
| 365 » » } | |
| 366 » } | |
| 367 | |
| 368 » // createWriter is a shorthand function for creating a writer to a path and | |
| 369 » // reporting an error if it failed. | |
| 370 » createWriter := func(p gs.Path) (gs.Writer, error) { | |
| 371 » » w, ierr := sa.GSClient.NewWriter(p) | |
| 372 » » if ierr != nil { | |
| 198 log.Fields{ | 373 log.Fields{ |
| 199 log.ErrorKey: ierr, | 374 log.ErrorKey: ierr, |
| 200 » » » » "url": o.url, | 375 » » » » "path": p, |
| 201 » » » }.Warningf(c, "Failed to clean-up GS object on failure." ) | 376 » » » }.Errorf(c, "Failed to create writer.") |
| 202 » » } | 377 » » » return nil, ierr |
| 203 » } | 378 » » } |
| 204 » defer deleteOnFail(streamO) | 379 » » return w, nil |
| 205 » defer deleteOnFail(indexO) | 380 » } |
| 206 » defer deleteOnFail(dataO) | 381 |
| 207 | 382 » var streamWriter, indexWriter, dataWriter gs.Writer |
| 208 » // Close our GS object managers on exit. If any of them fail to close, m arh | 383 » if streamWriter, err = createWriter(sa.stream.staged); err != nil { |
| 209 » // the archival as a failure. | 384 » » return |
| 210 » closeOM := func(o *gsObject) { | 385 » } |
| 211 » » if o == nil { | 386 » defer closeWriter(streamWriter, sa.stream.staged) |
| 212 » » » return | 387 |
| 213 » » } | 388 » if indexWriter, err = createWriter(sa.index.staged); err != nil { |
| 214 » » if ierr := o.Close(); ierr != nil { | 389 » » return err |
| 215 » » » err = ierr | 390 » } |
| 216 » » } | 391 » defer closeWriter(indexWriter, sa.index.staged) |
| 217 » } | 392 |
| 218 » defer closeOM(streamO) | 393 » if dataWriter, err = createWriter(sa.data.staged); err != nil { |
| 219 » defer closeOM(indexO) | 394 » » return err |
| 220 » defer closeOM(dataO) | 395 » } |
| 396 » defer closeWriter(dataWriter, sa.data.staged) | |
| 221 | 397 |
| 222 // Read our log entries from intermediate storage. | 398 // Read our log entries from intermediate storage. |
| 223 ss := storageSource{ | 399 ss := storageSource{ |
| 224 Context: c, | 400 Context: c, |
| 225 » » st: t.Storage, | 401 » » st: sa.Storage, |
| 226 » » path: types.StreamPath(t.Path), | 402 » » path: sa.path, |
| 227 » » contiguous: t.Complete, | 403 » » contiguous: complete, |
| 228 » » terminalIndex: types.MessageIndex(t.ls.State.TerminalIndex), | 404 » » terminalIndex: types.MessageIndex(sa.terminalIndex), |
| 229 lastIndex: -1, | 405 lastIndex: -1, |
| 230 } | 406 } |
| 231 | 407 |
| 232 m := archive.Manifest{ | 408 m := archive.Manifest{ |
| 233 » » Desc: t.desc, | 409 » » Desc: &sa.desc, |
| 234 Source: &ss, | 410 Source: &ss, |
| 235 » » LogWriter: streamO, | 411 » » LogWriter: streamWriter, |
| 236 » » IndexWriter: indexO, | 412 » » IndexWriter: indexWriter, |
| 237 » » DataWriter: dataO, | 413 » » DataWriter: dataWriter, |
| 238 » » StreamIndexRange: t.StreamIndexRange, | 414 » » StreamIndexRange: sa.StreamIndexRange, |
| 239 » » PrefixIndexRange: t.PrefixIndexRange, | 415 » » PrefixIndexRange: sa.PrefixIndexRange, |
| 240 » » ByteRange: t.ByteRange, | 416 » » ByteRange: sa.ByteRange, |
| 241 | 417 |
| 242 Logger: log.Get(c), | 418 Logger: log.Get(c), |
| 243 } | 419 } |
| 244 » err = archive.Archive(m) | 420 » if err = archive.Archive(m); err != nil { |
| 245 » if err != nil { | |
| 246 log.WithError(err).Errorf(c, "Failed to archive log stream.") | 421 log.WithError(err).Errorf(c, "Failed to archive log stream.") |
| 247 return | 422 return |
| 248 } | 423 } |
| 249 | 424 |
| 250 » t.ar.TerminalIndex = int64(ss.lastIndex) | 425 » if tidx := sa.terminalIndex; tidx != int64(ss.lastIndex) { |
| 251 » if tidx := t.ls.State.TerminalIndex; tidx != t.ar.TerminalIndex { | 426 » » // Fail if we were requested to archive only the complete log. W e consider |
| 252 » » // Fail, if we were requested to archive only the complete log. | 427 » » // this a transient error with the expectation that the missing entries will |
| 253 » » if t.Complete { | 428 » » // show up in future retries. |
| 254 » » » log.Fields{ | 429 » » switch { |
| 255 » » » » "terminalIndex": tidx, | 430 » » case complete && ss.hasMissingEntries: |
| 256 » » » » "lastIndex": t.ar.TerminalIndex, | 431 » » » log.Errorf(c, "Log stream has missing entries, but compl eteness is required.") |
| 257 » » » }.Errorf(c, "Log stream archival stopped prior to termin al index.") | 432 » » » err = errors.WrapTransient(errors.New("stream has missin g entries")) |
| 258 » » » return errors.New("stream finished short of terminal ind ex") | 433 » » » return |
| 259 » » } | 434 |
| 260 | 435 » » case ss.logEntryCount == 0: |
| 261 » » if t.ar.TerminalIndex < 0 { | |
| 262 // If our last log index was <0, then no logs were archi ved. | 436 // If our last log index was <0, then no logs were archi ved. |
| 263 log.Warningf(c, "No log entries were archived.") | 437 log.Warningf(c, "No log entries were archived.") |
| 264 » » } else { | 438 |
| 439 » » default: | |
| 265 // Update our terminal index. | 440 // Update our terminal index. |
| 266 log.Fields{ | 441 log.Fields{ |
| 267 » » » » "from": tidx, | 442 » » » » "terminalIndex": ss.lastIndex, |
| 268 » » » » "to": t.ar.TerminalIndex, | 443 » » » » "logEntryCount": ss.logEntryCount, |
| 269 » » » }.Infof(c, "Updated log stream terminal index.") | 444 » » » » "hasMissingEntries": ss.hasMissingEntries, |
| 445 » » » }.Debugf(c, "Finished archiving log stream.") | |
| 270 } | 446 } |
| 271 } | 447 } |
| 272 | 448 |
| 273 // Update our state with archival results. | 449 // Update our state with archival results. |
| 274 » t.ar.Path = t.Path | 450 » sa.terminalIndex = int64(ss.lastIndex) |
| 275 » t.ar.StreamSize = streamO.Count() | 451 » sa.logEntryCount = ss.logEntryCount |
| 276 » t.ar.IndexSize = indexO.Count() | 452 » sa.stream.count = streamWriter.Count() |
| 277 » t.ar.DataSize = dataO.Count() | 453 » sa.index.count = indexWriter.Count() |
| 278 » t.ar.Complete = !ss.hasMissingEntries | 454 » sa.data.count = dataWriter.Count() |
| 279 return | 455 return |
| 280 } | 456 } |
| 281 | 457 |
| 282 func (t *archiveTask) newGSObject(c context.Context, path string, name string) ( *gsObject, error) { | 458 type stagingPaths struct { |
| 283 » p := t.GSBase.Concat(path, name) | 459 » staged gs.Path |
| 284 » o := gsObject{ | 460 » final gs.Path |
| 285 » » gs: t.GSClient, | 461 » count int64 |
| 286 » » bucket: p.Bucket(), | 462 } |
| 287 » » path: p.Filename(), | 463 |
| 288 » } | 464 func (d *stagingPaths) clearStaged() { |
| 289 | 465 » d.staged = "" |
| 290 » // Build our GS URL. Note that since buildGSPath joins with "/", the ini tial | 466 } |
| 291 » // token, "gs:/", will become "gs://". | 467 |
| 292 » o.url = string(p) | 468 func (sa *stagedArchival) finalize(c context.Context, client gs.Client, ar *logd og.ArchiveStreamRequest) error { |
| 293 | 469 » err := parallel.FanOutIn(func(taskC chan<- func() error) { |
| 294 » var err error | 470 » » for _, d := range sa.getStagingPaths() { |
| 295 » o.Writer, err = t.GSClient.NewWriter(o.bucket, o.path) | 471 » » » d := d |
| 472 | |
| 473 » » » // Don't copy zero-sized streams. | |
| 474 » » » if d.count == 0 { | |
| 475 » » » » continue | |
| 476 » » » } | |
| 477 | |
| 478 » » » taskC <- func() error { | |
| 479 » » » » if err := client.Rename(d.staged, d.final); err != nil { | |
| 480 » » » » » log.Fields{ | |
| 481 » » » » » » log.ErrorKey: err, | |
| 482 » » » » » » "stagedPath": d.staged, | |
| 483 » » » » » » "finalPath": d.final, | |
| 484 » » » » » }.Errorf(c, "Failed to rename GS object. ") | |
| 485 » » » » » return err | |
| 486 » » » » } | |
| 487 | |
| 488 » » » » // Clear the staged value to indicate that it no longer exists. | |
| 489 » » » » d.clearStaged() | |
| 490 » » » » return nil | |
| 491 » » » } | |
| 492 » » } | |
| 493 » }) | |
| 296 if err != nil { | 494 if err != nil { |
| 297 » » log.Fields{ | 495 » » return err |
| 298 » » » log.ErrorKey: err, | 496 » } |
| 299 » » » "url": o.url, | 497 |
| 300 » » }.Errorf(c, "Failed to create Writer.") | 498 » ar.TerminalIndex = sa.terminalIndex |
| 301 » » return nil, err | 499 » ar.LogEntryCount = sa.logEntryCount |
| 302 » } | 500 » ar.StreamUrl = string(sa.stream.final) |
| 303 | 501 » ar.StreamSize = sa.stream.count |
| 304 » // Delete any existing object at this path. | 502 » ar.IndexUrl = string(sa.index.final) |
| 305 » if err := o.delete(); err != nil { | 503 » ar.IndexSize = sa.index.count |
| 306 » » closeErr := o.Close() | 504 » ar.DataUrl = string(sa.data.final) |
| 307 | 505 » ar.DataSize = sa.data.count |
| 308 » » log.Fields{ | 506 » return nil |
| 309 » » » log.ErrorKey: err, | 507 } |
| 310 » » » "closeErr": closeErr, | 508 |
| 311 » » » "url": o.url, | 509 func (sa *stagedArchival) cleanup(c context.Context) { |
| 312 » » }.Errorf(c, "Could not delete object during creation.") | 510 » for _, d := range sa.getStagingPaths() { |
| 313 » » return nil, err | 511 » » if d.staged == "" { |
| 314 » } | 512 » » » continue |
| 315 » return &o, nil | 513 » » } |
| 316 } | 514 |
| 317 | 515 » » if err := sa.GSClient.Delete(d.staged); err != nil { |
| 318 // gsObjectManger wraps a gsObject instance with metadata. | |
| 319 type gsObject struct { | |
| 320 » gs.Writer | |
| 321 | |
| 322 » // gs is the Client instance. | |
| 323 » gs gs.Client | |
| 324 » // bucket is the name of the object's bucket. | |
| 325 » bucket string | |
| 326 » // path is the bucket-relative path of the object. | |
| 327 » path string | |
| 328 » // url is the Google Storage URL (gs://) of this object. | |
| 329 » url string | |
| 330 } | |
| 331 | |
| 332 func (o *gsObject) delete() error { | |
| 333 » return o.gs.Delete(o.bucket, o.path) | |
| 334 } | |
| 335 | |
| 336 // storageSource is an archive.LogEntrySource that pulls log entries from | |
| 337 // intermediate storage via its storage.Storage instance. | |
| 338 type storageSource struct { | |
| 339 » context.Context | |
| 340 | |
| 341 » st storage.Storage // the storage instance to read from | |
| 342 » path types.StreamPath // the path of the log stream | |
| 343 » contiguous bool // if true, enforce contiguous entries | |
| 344 » terminalIndex types.MessageIndex // if >= 0, discard logs beyond this | |
| 345 | |
| 346 » buf []*logpb.LogEntry | |
| 347 » lastIndex types.MessageIndex | |
| 348 » hasMissingEntries bool // true if some log entries were missing. | |
| 349 } | |
| 350 | |
| 351 func (s *storageSource) bufferEntries(start types.MessageIndex) error { | |
| 352 » bytes := 0 | |
| 353 | |
| 354 » req := storage.GetRequest{ | |
| 355 » » Path: s.path, | |
| 356 » » Index: start, | |
| 357 » } | |
| 358 » return s.st.Get(req, func(idx types.MessageIndex, d []byte) bool { | |
| 359 » » le := logpb.LogEntry{} | |
| 360 » » if err := proto.Unmarshal(d, &le); err != nil { | |
| 361 log.Fields{ | 516 log.Fields{ |
| 362 » » » » log.ErrorKey: err, | 517 » » » » log.ErrorKey: err, |
| 363 » » » » "streamIndex": idx, | 518 » » » » "path": d.staged, |
| 364 » » » }.Errorf(s, "Failed to unmarshal LogEntry.") | 519 » » » }.Warningf(c, "Failed to clean up staged path.") |
| 365 » » » return false | 520 » » } |
| 366 » » } | 521 |
| 367 » » s.buf = append(s.buf, &le) | 522 » » d.clearStaged() |
| 368 | 523 » } |
| 369 » » // Stop loading if we've reached or exceeded our buffer size. | 524 } |
| 370 » » bytes += len(d) | 525 |
| 371 » » return bytes < storageBufferSize | 526 func (sa *stagedArchival) getStagingPaths() []*stagingPaths { |
| 372 » }) | 527 » return []*stagingPaths{ |
| 373 } | 528 » » &sa.stream, |
| 374 | 529 » » &sa.index, |
| 375 func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) { | 530 » » &sa.data, |
| 376 » if len(s.buf) == 0 { | 531 » } |
| 377 » » s.buf = s.buf[:0] | 532 } |
| 378 » » if err := s.bufferEntries(s.lastIndex + 1); err != nil { | |
| 379 » » » if err == storage.ErrDoesNotExist { | |
| 380 » » » » log.Warningf(s, "Archive target stream does not exist in intermediate storage.") | |
| 381 » » » » return nil, archive.ErrEndOfStream | |
| 382 » » » } | |
| 383 | |
| 384 » » » log.WithError(err).Errorf(s, "Failed to retrieve log stream from storage.") | |
| 385 » » » return nil, err | |
| 386 » » } | |
| 387 » } | |
| 388 | |
| 389 » if len(s.buf) == 0 { | |
| 390 » » log.Fields{ | |
| 391 » » » "lastIndex": s.lastIndex, | |
| 392 » » }.Debugf(s, "Encountered end of stream.") | |
| 393 » » return nil, archive.ErrEndOfStream | |
| 394 » } | |
| 395 | |
| 396 » var le *logpb.LogEntry | |
| 397 » le, s.buf = s.buf[0], s.buf[1:] | |
| 398 | |
| 399 » // If we're enforcing a contiguous log stream, error if this LogEntry is not | |
| 400 » // contiguous. | |
| 401 » sidx := types.MessageIndex(le.StreamIndex) | |
| 402 » nidx := (s.lastIndex + 1) | |
| 403 » if sidx != nidx { | |
| 404 » » s.hasMissingEntries = true | |
| 405 | |
| 406 » » if s.contiguous { | |
| 407 » » » log.Fields{ | |
| 408 » » » » "index": sidx, | |
| 409 » » » » "nextIndex": nidx, | |
| 410 » » » }.Errorf(s, "Non-contiguous log stream while enforcing.") | |
| 411 » » » return nil, errors.New("non-contiguous log stream") | |
| 412 » » } | |
| 413 » } | |
| 414 | |
| 415 » // If we're enforcing a maximum terminal index, return end of stream if this | |
| 416 » // LogEntry exceeds that index. | |
| 417 » if s.terminalIndex >= 0 && sidx > s.terminalIndex { | |
| 418 » » log.Fields{ | |
| 419 » » » "index": sidx, | |
| 420 » » » "terminalIndex": s.terminalIndex, | |
| 421 » » }.Warningf(s, "Discarding log entries beyond expected terminal index.") | |
| 422 » » return nil, archive.ErrEndOfStream | |
| 423 » } | |
| 424 | |
| 425 » s.lastIndex = sidx | |
| 426 » return le, nil | |
| 427 } | |
| OLD | NEW |