| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package archivist | 5 package archivist |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | |
| 9 "fmt" | 8 "fmt" |
| 10 "strings" | 9 "strings" |
| 11 "sync" | 10 "sync" |
| 12 "testing" | 11 "testing" |
| 13 "time" | 12 "time" |
| 14 | 13 |
| 15 "github.com/golang/protobuf/proto" | 14 "github.com/golang/protobuf/proto" |
| 16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 15 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 17 "github.com/luci/luci-go/common/clock/testclock" | 16 "github.com/luci/luci-go/common/clock/testclock" |
| 17 "github.com/luci/luci-go/common/errors" |
| 18 "github.com/luci/luci-go/common/gcloud/gs" | 18 "github.com/luci/luci-go/common/gcloud/gs" |
| 19 "github.com/luci/luci-go/common/logdog/types" | 19 "github.com/luci/luci-go/common/logdog/types" |
| 20 "github.com/luci/luci-go/common/proto/google" | 20 "github.com/luci/luci-go/common/proto/google" |
| 21 "github.com/luci/luci-go/common/proto/logdog/logpb" | 21 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 22 "github.com/luci/luci-go/server/logdog/storage" | 22 "github.com/luci/luci-go/server/logdog/storage" |
| 23 "github.com/luci/luci-go/server/logdog/storage/memory" | 23 "github.com/luci/luci-go/server/logdog/storage/memory" |
| 24 "golang.org/x/net/context" | 24 "golang.org/x/net/context" |
| 25 "google.golang.org/grpc" | 25 "google.golang.org/grpc" |
| 26 | 26 |
| 27 . "github.com/luci/luci-go/common/testing/assertions" | 27 . "github.com/luci/luci-go/common/testing/assertions" |
| 28 . "github.com/smartystreets/goconvey/convey" | 28 . "github.com/smartystreets/goconvey/convey" |
| 29 ) | 29 ) |
| 30 | 30 |
| 31 // testTask is an instrumentable Task implementation. |
| 32 type testTask struct { |
| 33 task *logdog.ArchiveTask |
| 34 assertLeaseErr error |
| 35 assertCount int |
| 36 } |
| 37 |
| 38 func (t *testTask) UniqueID() string { |
| 39 return "totally unique ID" |
| 40 } |
| 41 |
| 42 func (t *testTask) Task() *logdog.ArchiveTask { |
| 43 return t.task |
| 44 } |
| 45 |
| 46 func (t *testTask) AssertLease(context.Context) error { |
| 47 if err := t.assertLeaseErr; err != nil { |
| 48 return err |
| 49 } |
| 50 t.assertCount++ |
| 51 return nil |
| 52 } |
| 53 |
| 31 // testServicesClient implements logdog.ServicesClient sufficient for testing | 54 // testServicesClient implements logdog.ServicesClient sufficient for testing |
| 32 // and instrumentation. | 55 // and instrumentation. |
| 33 type testServicesClient struct { | 56 type testServicesClient struct { |
| 34 logdog.ServicesClient | 57 logdog.ServicesClient |
| 35 | 58 |
| 36 lsCallback func(*logdog.LoadStreamRequest) (*logdog.LoadStreamResponse,
error) | 59 lsCallback func(*logdog.LoadStreamRequest) (*logdog.LoadStreamResponse,
error) |
| 37 asCallback func(*logdog.ArchiveStreamRequest) error | 60 asCallback func(*logdog.ArchiveStreamRequest) error |
| 38 } | 61 } |
| 39 | 62 |
| 40 func (sc *testServicesClient) LoadStream(c context.Context, req *logdog.LoadStre
amRequest, o ...grpc.CallOption) ( | 63 func (sc *testServicesClient) LoadStream(c context.Context, req *logdog.LoadStre
amRequest, o ...grpc.CallOption) ( |
| (...skipping 17 matching lines...) Expand all Loading... |
| 58 // testGSClient is a testing implementation of the gsClient interface. | 81 // testGSClient is a testing implementation of the gsClient interface. |
| 59 // | 82 // |
| 60 // It does not actually retain any of the written data, since that level of | 83 // It does not actually retain any of the written data, since that level of |
| 61 // testing is done in the archive package. | 84 // testing is done in the archive package. |
| 62 type testGSClient struct { | 85 type testGSClient struct { |
| 63 sync.Mutex | 86 sync.Mutex |
| 64 gs.Client | 87 gs.Client |
| 65 | 88 |
| 66 // objs is a map of filename to "write amount". The write amount is the | 89 // objs is a map of filename to "write amount". The write amount is the |
| 67 // cumulative amount of data written to the Writer for a given GS path. | 90 // cumulative amount of data written to the Writer for a given GS path. |
| 68 » objs map[string]int64 | 91 » objs map[gs.Path]int64 |
| 69 closed bool | 92 closed bool |
| 70 | 93 |
| 71 closeErr error | 94 closeErr error |
| 72 newWriterErr func(w *testGSWriter) error | 95 newWriterErr func(w *testGSWriter) error |
| 73 » deleteErr func(string, string) error | 96 » deleteErr func(gs.Path) error |
| 97 » renameErr func(gs.Path, gs.Path) error |
| 74 } | 98 } |
| 75 | 99 |
| 76 func (c *testGSClient) NewWriter(bucket, relpath string) (gs.Writer, error) { | 100 func (c *testGSClient) NewWriter(p gs.Path) (gs.Writer, error) { |
| 77 w := testGSWriter{ | 101 w := testGSWriter{ |
| 78 client: c, | 102 client: c, |
| 79 » » path: c.url(bucket, relpath), | 103 » » path: p, |
| 80 } | 104 } |
| 81 if c.newWriterErr != nil { | 105 if c.newWriterErr != nil { |
| 82 if err := c.newWriterErr(&w); err != nil { | 106 if err := c.newWriterErr(&w); err != nil { |
| 83 return nil, err | 107 return nil, err |
| 84 } | 108 } |
| 85 } | 109 } |
| 86 return &w, nil | 110 return &w, nil |
| 87 } | 111 } |
| 88 | 112 |
| 89 func (c *testGSClient) Close() error { | 113 func (c *testGSClient) Close() error { |
| 90 if c.closed { | 114 if c.closed { |
| 91 panic("double close") | 115 panic("double close") |
| 92 } | 116 } |
| 93 if err := c.closeErr; err != nil { | 117 if err := c.closeErr; err != nil { |
| 94 return err | 118 return err |
| 95 } | 119 } |
| 96 c.closed = true | 120 c.closed = true |
| 97 return nil | 121 return nil |
| 98 } | 122 } |
| 99 | 123 |
| 100 func (c *testGSClient) Delete(bucket, relpath string) error { | 124 func (c *testGSClient) Delete(p gs.Path) error { |
| 101 if c.deleteErr != nil { | 125 if c.deleteErr != nil { |
| 102 » » if err := c.deleteErr(bucket, relpath); err != nil { | 126 » » if err := c.deleteErr(p); err != nil { |
| 103 return err | 127 return err |
| 104 } | 128 } |
| 105 } | 129 } |
| 106 | 130 |
| 107 c.Lock() | 131 c.Lock() |
| 108 defer c.Unlock() | 132 defer c.Unlock() |
| 109 | 133 |
| 110 » delete(c.objs, c.url(bucket, relpath)) | 134 » delete(c.objs, p) |
| 111 return nil | 135 return nil |
| 112 } | 136 } |
| 113 | 137 |
| 114 func (c *testGSClient) url(bucket, relpath string) string { | 138 func (c *testGSClient) Rename(src, dst gs.Path) error { |
| 115 » return fmt.Sprintf("gs://%s/%s", bucket, relpath) | 139 » if c.renameErr != nil { |
| 140 » » if err := c.renameErr(src, dst); err != nil { |
| 141 » » » return err |
| 142 » » } |
| 143 » } |
| 144 |
| 145 » c.Lock() |
| 146 » defer c.Unlock() |
| 147 |
| 148 » c.objs[dst] = c.objs[src] |
| 149 » delete(c.objs, src) |
| 150 » return nil |
| 116 } | 151 } |
| 117 | 152 |
| 118 type testGSWriter struct { | 153 type testGSWriter struct { |
| 119 client *testGSClient | 154 client *testGSClient |
| 120 | 155 |
| 121 » path string | 156 » path gs.Path |
| 122 closed bool | 157 closed bool |
| 123 writeCount int64 | 158 writeCount int64 |
| 124 | 159 |
| 125 writeErr error | 160 writeErr error |
| 126 closeErr error | 161 closeErr error |
| 127 } | 162 } |
| 128 | 163 |
| 129 func (w *testGSWriter) Write(d []byte) (int, error) { | 164 func (w *testGSWriter) Write(d []byte) (int, error) { |
| 130 if err := w.writeErr; err != nil { | 165 if err := w.writeErr; err != nil { |
| 131 return 0, err | 166 return 0, err |
| 132 } | 167 } |
| 133 | 168 |
| 134 w.client.Lock() | 169 w.client.Lock() |
| 135 defer w.client.Unlock() | 170 defer w.client.Unlock() |
| 136 | 171 |
| 137 if w.client.objs == nil { | 172 if w.client.objs == nil { |
| 138 » » w.client.objs = make(map[string]int64) | 173 » » w.client.objs = make(map[gs.Path]int64) |
| 139 } | 174 } |
| 140 w.client.objs[w.path] += int64(len(d)) | 175 w.client.objs[w.path] += int64(len(d)) |
| 141 w.writeCount += int64(len(d)) | 176 w.writeCount += int64(len(d)) |
| 142 return len(d), nil | 177 return len(d), nil |
| 143 } | 178 } |
| 144 | 179 |
| 145 func (w *testGSWriter) Close() error { | 180 func (w *testGSWriter) Close() error { |
| 146 if w.closed { | 181 if w.closed { |
| 147 panic("double close") | 182 panic("double close") |
| 148 } | 183 } |
| (...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 205 }) | 240 }) |
| 206 if err != nil { | 241 if err != nil { |
| 207 panic(err) | 242 panic(err) |
| 208 } | 243 } |
| 209 | 244 |
| 210 // Advance the time for each log entry. | 245 // Advance the time for each log entry. |
| 211 tc.Add(time.Second) | 246 tc.Add(time.Second) |
| 212 } | 247 } |
| 213 } | 248 } |
| 214 | 249 |
| 250 // Set up our testing archival task. |
| 251 expired := 10 * time.Minute |
| 252 archiveTask := logdog.ArchiveTask{ |
| 253 Path: string(desc.Path()), |
| 254 SettleDelay: google.NewDuration(10 * time.Second), |
| 255 CompletePeriod: google.NewDuration(expired), |
| 256 Key: []byte("random archival key"), |
| 257 } |
| 258 expired++ // This represents a time PAST CompletePeriod. |
| 259 |
| 260 task := &testTask{ |
| 261 task: &archiveTask, |
| 262 } |
| 263 |
| 215 // Set up our test Coordinator client stubs. | 264 // Set up our test Coordinator client stubs. |
| 216 stream := logdog.LoadStreamResponse{ | 265 stream := logdog.LoadStreamResponse{ |
| 217 State: &logdog.LogStreamState{ | 266 State: &logdog.LogStreamState{ |
| 218 » » » » Path: string(desc.Path()), | 267 » » » » Path: archiveTask.Path, |
| 219 ProtoVersion: logpb.Version, | 268 ProtoVersion: logpb.Version, |
| 220 TerminalIndex: -1, | 269 TerminalIndex: -1, |
| 221 Archived: false, | 270 Archived: false, |
| 222 Purged: false, | 271 Purged: false, |
| 223 }, | 272 }, |
| 224 Desc: descBytes, | 273 Desc: descBytes, |
| 274 |
| 275 // Age is ON the expiration threshold, so not expired. |
| 276 Age: archiveTask.CompletePeriod, |
| 277 ArchivalKey: archiveTask.Key, |
| 225 } | 278 } |
| 226 | 279 |
| 227 var archiveRequest *logdog.ArchiveStreamRequest | 280 var archiveRequest *logdog.ArchiveStreamRequest |
| 281 var archiveStreamErr error |
| 228 sc := testServicesClient{ | 282 sc := testServicesClient{ |
| 229 lsCallback: func(req *logdog.LoadStreamRequest) (*logdog
.LoadStreamResponse, error) { | 283 lsCallback: func(req *logdog.LoadStreamRequest) (*logdog
.LoadStreamResponse, error) { |
| 230 return &stream, nil | 284 return &stream, nil |
| 231 }, | 285 }, |
| 232 asCallback: func(req *logdog.ArchiveStreamRequest) error
{ | 286 asCallback: func(req *logdog.ArchiveStreamRequest) error
{ |
| 233 archiveRequest = req | 287 archiveRequest = req |
| 234 » » » » return nil | 288 » » » » return archiveStreamErr |
| 235 }, | 289 }, |
| 236 } | 290 } |
| 237 | 291 |
| 238 ar := Archivist{ | 292 ar := Archivist{ |
| 239 » » » Service: &sc, | 293 » » » Service: &sc, |
| 240 » » » Storage: &st, | 294 » » » Storage: &st, |
| 241 » » » GSClient: &gsc, | 295 » » » GSClient: &gsc, |
| 242 » » » GSBase: gs.Path("gs://archive-test/path/to/archive/"),
// Extra slashes to test concatenation. | 296 » » » GSBase: gs.Path("gs://archive-test/path/to/archiv
e/"), // Extra slashes to test concatenation. |
| 243 » » } | 297 » » » GSStagingBase: gs.Path("gs://archive-test-staging/path/t
o/archive/"), // Extra slashes to test concatenation. |
| 244 | |
| 245 » » task := logdog.ArchiveTask{ | |
| 246 » » » Path: stream.State.Path, | |
| 247 » » » Complete: true, | |
| 248 } | 298 } |
| 249 | 299 |
| 250 gsURL := func(p string) string { | 300 gsURL := func(p string) string { |
| 251 return fmt.Sprintf("gs://archive-test/path/to/archive/%s
/%s", desc.Path(), p) | 301 return fmt.Sprintf("gs://archive-test/path/to/archive/%s
/%s", desc.Path(), p) |
| 252 } | 302 } |
| 253 | 303 |
| 254 // hasStreams can be called to check that the retained archiveRe
quest had | 304 // hasStreams can be called to check that the retained archiveRe
quest had |
| 255 // data sizes for the named archive stream types. | 305 // data sizes for the named archive stream types. |
| 256 // | 306 // |
| 257 // After checking, the values are set to zero. This allows us to
use | 307 // After checking, the values are set to zero. This allows us to
use |
| 258 // ShouldEqual without hardcoding specific archival sizes into t
he results. | 308 // ShouldEqual without hardcoding specific archival sizes into t
he results. |
| 259 hasStreams := func(log, index, data bool) bool { | 309 hasStreams := func(log, index, data bool) bool { |
| 260 So(archiveRequest, ShouldNotBeNil) | 310 So(archiveRequest, ShouldNotBeNil) |
| 261 if (log && archiveRequest.StreamSize <= 0) || | 311 if (log && archiveRequest.StreamSize <= 0) || |
| 262 (index && archiveRequest.IndexSize <= 0) || | 312 (index && archiveRequest.IndexSize <= 0) || |
| 263 (data && archiveRequest.DataSize <= 0) { | 313 (data && archiveRequest.DataSize <= 0) { |
| 264 return false | 314 return false |
| 265 } | 315 } |
| 266 | 316 |
| 267 archiveRequest.StreamSize = 0 | 317 archiveRequest.StreamSize = 0 |
| 268 archiveRequest.IndexSize = 0 | 318 archiveRequest.IndexSize = 0 |
| 269 archiveRequest.DataSize = 0 | 319 archiveRequest.DataSize = 0 |
| 270 return true | 320 return true |
| 271 } | 321 } |
| 272 | 322 |
| 273 » » Convey(`Will fail to archive if the specified stream state could
not be loaded.`, func() { | 323 » » Convey(`Will return task and fail to archive if the specified st
ream state could not be loaded.`, func() { |
| 274 sc.lsCallback = func(*logdog.LoadStreamRequest) (*logdog
.LoadStreamResponse, error) { | 324 sc.lsCallback = func(*logdog.LoadStreamRequest) (*logdog
.LoadStreamResponse, error) { |
| 275 return nil, errors.New("does not exist") | 325 return nil, errors.New("does not exist") |
| 276 } | 326 } |
| 277 » » » So(ar.Archive(c, &task), ShouldErrLike, "does not exist"
) | 327 |
| 328 » » » ack, err := ar.archiveTaskImpl(c, task) |
| 329 » » » So(err, ShouldErrLike, "does not exist") |
| 330 » » » So(ack, ShouldBeFalse) |
| 278 }) | 331 }) |
| 279 | 332 |
| 280 » » Convey(`Will refrain from archiving if the stream is already arc
hived.`, func() { | 333 » » Convey(`Will complete task and refrain from archiving if the str
eam is already archived.`, func() { |
| 281 stream.State.Archived = true | 334 stream.State.Archived = true |
| 282 » » » So(ar.Archive(c, &task), ShouldBeNil) | 335 |
| 336 » » » ack, err := ar.archiveTaskImpl(c, task) |
| 337 » » » So(err, ShouldErrLike, "log stream is archived") |
| 338 » » » So(ack, ShouldBeTrue) |
| 283 So(archiveRequest, ShouldBeNil) | 339 So(archiveRequest, ShouldBeNil) |
| 284 }) | 340 }) |
| 285 | 341 |
| 286 » » Convey(`Will refrain from archiving if the stream is purged.`, f
unc() { | 342 » » Convey(`Will complete task and refrain from archiving if the str
eam is purged.`, func() { |
| 287 stream.State.Purged = true | 343 stream.State.Purged = true |
| 288 » » » So(ar.Archive(c, &task), ShouldBeNil) | 344 |
| 345 » » » ack, err := ar.archiveTaskImpl(c, task) |
| 346 » » » So(err, ShouldErrLike, "log stream is purged") |
| 347 » » » So(ack, ShouldBeTrue) |
| 348 » » » So(archiveRequest, ShouldBeNil) |
| 349 » » }) |
| 350 |
| 351 » » Convey(`Will return task if the stream is younger than its settl
e delay.`, func() { |
| 352 » » » stream.Age = google.NewDuration(time.Second) |
| 353 |
| 354 » » » ack, err := ar.archiveTaskImpl(c, task) |
| 355 » » » So(err, ShouldErrLike, "log stream is within settle dela
y") |
| 356 » » » So(ack, ShouldBeFalse) |
| 357 » » » So(archiveRequest, ShouldBeNil) |
| 358 » » }) |
| 359 |
| 360 » » Convey(`Will return task if the log stream doesn't have an archi
val key yet.`, func() { |
| 361 » » » stream.Age = google.NewDuration(expired) |
| 362 » » » stream.ArchivalKey = nil |
| 363 |
| 364 » » » ack, err := ar.archiveTaskImpl(c, task) |
| 365 » » » So(err, ShouldErrLike, "premature archival request") |
| 366 » » » So(ack, ShouldBeFalse) |
| 367 » » » So(archiveRequest, ShouldBeNil) |
| 368 » » }) |
| 369 |
| 370 » » Convey(`Will complete task and refrain from archiving if archiva
l keys dont' match.`, func() { |
| 371 » » » stream.Age = google.NewDuration(expired) |
| 372 » » » stream.ArchivalKey = []byte("non-matching archival key") |
| 373 |
| 374 » » » ack, err := ar.archiveTaskImpl(c, task) |
| 375 » » » So(err, ShouldErrLike, "superfluous archival request") |
| 376 » » » So(ack, ShouldBeTrue) |
| 289 So(archiveRequest, ShouldBeNil) | 377 So(archiveRequest, ShouldBeNil) |
| 290 }) | 378 }) |
| 291 | 379 |
| 292 // Weird case: the log has been marked for archival, has not bee
n | 380 // Weird case: the log has been marked for archival, has not bee
n |
| 293 » » // terminated, and is within its completeness delay. This task w
ill not | 381 » » // |
| 294 » » // have been dispatched by our archive cron, but let's assert th
at it | 382 » » // terminated, and is within its completeness delay. This task s
hould not |
| 295 » » // behaves correctly regardless. | 383 » » // have been dispatched by our expired archive cron, but let's a
ssert that |
| 296 » » Convey(`Will succeed if the log stream had no entries and no ter
minal index.`, func() { | 384 » » // it behaves correctly regardless. |
| 297 » » » So(ar.Archive(c, &task), ShouldBeNil) | 385 » » Convey(`Will refuse to archive a complete stream with no termina
l index.`, func() { |
| 298 | 386 » » » ack, err := ar.archiveTaskImpl(c, task) |
| 299 » » » So(hasStreams(true, true, false), ShouldBeTrue) | 387 » » » So(err, ShouldErrLike, "completeness required, but strea
m has no terminal index") |
| 300 » » » So(archiveRequest, ShouldResemble, &logdog.ArchiveStream
Request{ | 388 » » » So(ack, ShouldBeFalse) |
| 301 » » » » Path: task.Path, | |
| 302 » » » » Complete: true, | |
| 303 » » » » TerminalIndex: -1, | |
| 304 | |
| 305 » » » » StreamUrl: gsURL("logstream.entries"), | |
| 306 » » » » IndexUrl: gsURL("logstream.index"), | |
| 307 » » » » DataUrl: gsURL("data.bin"), | |
| 308 » » » }) | |
| 309 }) | 389 }) |
| 310 | 390 |
| 311 Convey(`With terminal index "3"`, func() { | 391 Convey(`With terminal index "3"`, func() { |
| 312 stream.State.TerminalIndex = 3 | 392 stream.State.TerminalIndex = 3 |
| 313 | 393 |
| 314 » » » Convey(`Will fail if the log stream had a terminal index
and no entries.`, func() { | 394 » » » Convey(`Will fail not ACK a log stream with no entries.`
, func() { |
| 315 » » » » So(ar.Archive(c, &task), ShouldErrLike, "stream
finished short of terminal index") | 395 » » » » ack, err := ar.archiveTaskImpl(c, task) |
| 396 » » » » So(err, ShouldErrLike, "stream has missing entri
es") |
| 397 » » » » So(ack, ShouldBeFalse) |
| 316 }) | 398 }) |
| 317 | 399 |
| 318 Convey(`Will fail to archive {0, 1, 2, 4} (incomplete).`
, func() { | 400 Convey(`Will fail to archive {0, 1, 2, 4} (incomplete).`
, func() { |
| 319 addTestEntry(0, 1, 2, 4) | 401 addTestEntry(0, 1, 2, 4) |
| 320 » » » » So(ar.Archive(c, &task), ShouldErrLike, "non-con
tiguous log stream") | 402 |
| 403 » » » » ack, err := ar.archiveTaskImpl(c, task) |
| 404 » » » » So(err, ShouldErrLike, "stream has missing entri
es") |
| 405 » » » » So(ack, ShouldBeFalse) |
| 321 }) | 406 }) |
| 322 | 407 |
| 323 Convey(`Will successfully archive {0, 1, 2, 3, 4}, stopp
ing at the terminal index.`, func() { | 408 Convey(`Will successfully archive {0, 1, 2, 3, 4}, stopp
ing at the terminal index.`, func() { |
| 324 addTestEntry(0, 1, 2, 3, 4) | 409 addTestEntry(0, 1, 2, 3, 4) |
| 325 » » » » So(ar.Archive(c, &task), ShouldBeNil) | 410 |
| 411 » » » » ack, err := ar.archiveTaskImpl(c, task) |
| 412 » » » » So(err, ShouldBeNil) |
| 413 » » » » So(ack, ShouldBeTrue) |
| 326 | 414 |
| 327 So(hasStreams(true, true, true), ShouldBeTrue) | 415 So(hasStreams(true, true, true), ShouldBeTrue) |
| 328 So(archiveRequest, ShouldResemble, &logdog.Archi
veStreamRequest{ | 416 So(archiveRequest, ShouldResemble, &logdog.Archi
veStreamRequest{ |
| 329 » » » » » Path: task.Path, | 417 » » » » » Path: archiveTask.Path, |
| 330 » » » » » Complete: true, | 418 » » » » » LogEntryCount: 4, |
| 331 TerminalIndex: 3, | 419 TerminalIndex: 3, |
| 332 | 420 |
| 333 StreamUrl: gsURL("logstream.entries"), | 421 StreamUrl: gsURL("logstream.entries"), |
| 334 IndexUrl: gsURL("logstream.index"), | 422 IndexUrl: gsURL("logstream.index"), |
| 335 DataUrl: gsURL("data.bin"), | 423 DataUrl: gsURL("data.bin"), |
| 336 }) | 424 }) |
| 337 }) | 425 }) |
| 426 |
| 427 Convey(`When atransient archival error occurs, will not
ACK it.`, func() { |
| 428 gsc.newWriterErr = func(*testGSWriter) error { r
eturn errors.WrapTransient(errors.New("test error")) } |
| 429 |
| 430 ack, err := ar.archiveTaskImpl(c, task) |
| 431 So(err, ShouldErrLike, "test error") |
| 432 So(ack, ShouldBeFalse) |
| 433 }) |
| 434 |
| 435 Convey(`When a non-transient archival error occurs`, fun
c() { |
| 436 archiveErr := errors.New("archive failure error"
) |
| 437 gsc.newWriterErr = func(*testGSWriter) error { r
eturn archiveErr } |
| 438 |
| 439 Convey(`If remote report returns an error, do no
t ACK.`, func() { |
| 440 archiveStreamErr = errors.New("test erro
r") |
| 441 |
| 442 ack, err := ar.archiveTaskImpl(c, task) |
| 443 So(err, ShouldErrLike, "test error") |
| 444 So(ack, ShouldBeFalse) |
| 445 |
| 446 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 447 Path: archiveTask.Path, |
| 448 Error: "archive failure error", |
| 449 }) |
| 450 }) |
| 451 |
| 452 Convey(`If remote report returns success, ACK.`,
func() { |
| 453 ack, err := ar.archiveTaskImpl(c, task) |
| 454 So(err, ShouldBeNil) |
| 455 So(ack, ShouldBeTrue) |
| 456 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 457 Path: archiveTask.Path, |
| 458 Error: "archive failure error", |
| 459 }) |
| 460 }) |
| 461 |
| 462 Convey(`If an empty error string is supplied, th
e generic error will be filled in.`, func() { |
| 463 archiveErr = errors.New("") |
| 464 |
| 465 ack, err := ar.archiveTaskImpl(c, task) |
| 466 So(err, ShouldBeNil) |
| 467 So(ack, ShouldBeTrue) |
| 468 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 469 Path: archiveTask.Path, |
| 470 Error: "archival error", |
| 471 }) |
| 472 }) |
| 473 }) |
| 338 }) | 474 }) |
| 339 | 475 |
| 340 Convey(`When not enforcing stream completeness`, func() { | 476 Convey(`When not enforcing stream completeness`, func() { |
| 341 » » » task.Complete = false | 477 » » » stream.Age = google.NewDuration(expired) |
| 342 | 478 |
| 343 Convey(`With no terminal index`, func() { | 479 Convey(`With no terminal index`, func() { |
| 344 Convey(`Will successfully archive if there are n
o entries.`, func() { | 480 Convey(`Will successfully archive if there are n
o entries.`, func() { |
| 345 » » » » » So(ar.Archive(c, &task), ShouldBeNil) | 481 » » » » » ack, err := ar.archiveTaskImpl(c, task) |
| 482 » » » » » So(err, ShouldBeNil) |
| 483 » » » » » So(ack, ShouldBeTrue) |
| 346 | 484 |
| 347 So(hasStreams(true, true, false), Should
BeTrue) | 485 So(hasStreams(true, true, false), Should
BeTrue) |
| 348 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ | 486 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 349 » » » » » » Path: task.Path, | 487 » » » » » » Path: archiveTask.Path, |
| 350 » » » » » » Complete: true, | 488 » » » » » » LogEntryCount: 0, |
| 351 TerminalIndex: -1, | 489 TerminalIndex: -1, |
| 352 | 490 |
| 353 StreamUrl: gsURL("logstream.entr
ies"), | 491 StreamUrl: gsURL("logstream.entr
ies"), |
| 354 IndexUrl: gsURL("logstream.inde
x"), | 492 IndexUrl: gsURL("logstream.inde
x"), |
| 355 DataUrl: gsURL("data.bin"), | 493 DataUrl: gsURL("data.bin"), |
| 356 }) | 494 }) |
| 357 }) | 495 }) |
| 358 | 496 |
| 359 Convey(`With {0, 1, 2, 4} (incomplete) will arch
ive the stream and update its terminal index.`, func() { | 497 Convey(`With {0, 1, 2, 4} (incomplete) will arch
ive the stream and update its terminal index.`, func() { |
| 360 addTestEntry(0, 1, 2, 4) | 498 addTestEntry(0, 1, 2, 4) |
| 361 » » » » » So(ar.Archive(c, &task), ShouldBeNil) | 499 |
| 500 » » » » » ack, err := ar.archiveTaskImpl(c, task) |
| 501 » » » » » So(err, ShouldBeNil) |
| 502 » » » » » So(ack, ShouldBeTrue) |
| 362 | 503 |
| 363 So(hasStreams(true, true, true), ShouldB
eTrue) | 504 So(hasStreams(true, true, true), ShouldB
eTrue) |
| 364 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ | 505 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 365 » » » » » » Path: task.Path, | 506 » » » » » » Path: archiveTask.Path, |
| 366 » » » » » » Complete: false, | 507 » » » » » » LogEntryCount: 4, |
| 367 TerminalIndex: 4, | 508 TerminalIndex: 4, |
| 368 | 509 |
| 369 StreamUrl: gsURL("logstream.entr
ies"), | 510 StreamUrl: gsURL("logstream.entr
ies"), |
| 370 IndexUrl: gsURL("logstream.inde
x"), | 511 IndexUrl: gsURL("logstream.inde
x"), |
| 371 DataUrl: gsURL("data.bin"), | 512 DataUrl: gsURL("data.bin"), |
| 372 }) | 513 }) |
| 373 }) | 514 }) |
| 374 }) | 515 }) |
| 375 | 516 |
| 376 Convey(`With terminal index 3`, func() { | 517 Convey(`With terminal index 3`, func() { |
| 377 stream.State.TerminalIndex = 3 | 518 stream.State.TerminalIndex = 3 |
| 378 | 519 |
| 379 Convey(`Will successfully archive if there are n
o entries.`, func() { | 520 Convey(`Will successfully archive if there are n
o entries.`, func() { |
| 380 » » » » » So(ar.Archive(c, &task), ShouldBeNil) | 521 » » » » » ack, err := ar.archiveTaskImpl(c, task) |
| 522 » » » » » So(err, ShouldBeNil) |
| 523 » » » » » So(ack, ShouldBeTrue) |
| 381 | 524 |
| 382 So(hasStreams(true, true, false), Should
BeTrue) | 525 So(hasStreams(true, true, false), Should
BeTrue) |
| 383 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ | 526 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 384 » » » » » » Path: task.Path, | 527 » » » » » » Path: archiveTask.Path, |
| 385 » » » » » » Complete: true, | 528 » » » » » » LogEntryCount: 0, |
| 386 TerminalIndex: -1, | 529 TerminalIndex: -1, |
| 387 | 530 |
| 388 StreamUrl: gsURL("logstream.entr
ies"), | 531 StreamUrl: gsURL("logstream.entr
ies"), |
| 389 IndexUrl: gsURL("logstream.inde
x"), | 532 IndexUrl: gsURL("logstream.inde
x"), |
| 390 DataUrl: gsURL("data.bin"), | 533 DataUrl: gsURL("data.bin"), |
| 391 }) | 534 }) |
| 392 }) | 535 }) |
| 393 | 536 |
| 394 Convey(`With {0, 1, 2, 4} (incomplete) will arch
ive the stream and update its terminal index to 2.`, func() { | 537 Convey(`With {0, 1, 2, 4} (incomplete) will arch
ive the stream and update its terminal index to 2.`, func() { |
| 395 addTestEntry(0, 1, 2, 4) | 538 addTestEntry(0, 1, 2, 4) |
| 396 » » » » » So(ar.Archive(c, &task), ShouldBeNil) | 539 |
| 540 » » » » » ack, err := ar.archiveTaskImpl(c, task) |
| 541 » » » » » So(err, ShouldBeNil) |
| 542 » » » » » So(ack, ShouldBeTrue) |
| 397 | 543 |
| 398 So(hasStreams(true, true, true), ShouldB
eTrue) | 544 So(hasStreams(true, true, true), ShouldB
eTrue) |
| 399 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ | 545 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 400 » » » » » » Path: task.Path, | 546 » » » » » » Path: archiveTask.Path, |
| 401 » » » » » » Complete: false, | 547 » » » » » » LogEntryCount: 3, |
| 402 TerminalIndex: 2, | 548 TerminalIndex: 2, |
| 403 | 549 |
| 404 StreamUrl: gsURL("logstream.entr
ies"), | 550 StreamUrl: gsURL("logstream.entr
ies"), |
| 405 IndexUrl: gsURL("logstream.inde
x"), | 551 IndexUrl: gsURL("logstream.inde
x"), |
| 406 DataUrl: gsURL("data.bin"), | 552 DataUrl: gsURL("data.bin"), |
| 407 }) | 553 }) |
| 408 }) | 554 }) |
| 409 }) | 555 }) |
| 410 }) | 556 }) |
| 411 | 557 |
| 412 // Simulate failures during the various stream generation operat
ions. | 558 // Simulate failures during the various stream generation operat
ions. |
| 413 Convey(`Stream generation failures`, func() { | 559 Convey(`Stream generation failures`, func() { |
| 414 stream.State.TerminalIndex = 3 | 560 stream.State.TerminalIndex = 3 |
| 415 addTestEntry(0, 1, 2, 3) | 561 addTestEntry(0, 1, 2, 3) |
| 416 | 562 |
| 417 for _, failName := range []string{"/logstream.entries",
"/logstream.index", "/data.bin"} { | 563 for _, failName := range []string{"/logstream.entries",
"/logstream.index", "/data.bin"} { |
| 418 for _, testCase := range []struct { | 564 for _, testCase := range []struct { |
| 419 name string | 565 name string |
| 420 setup func() | 566 setup func() |
| 421 }{ | 567 }{ |
| 422 {"delete failure", func() { | |
| 423 gsc.deleteErr = func(b, p string
) error { | |
| 424 if strings.HasSuffix(p,
failName) { | |
| 425 return errors.Ne
w("test error") | |
| 426 } | |
| 427 return nil | |
| 428 } | |
| 429 }}, | |
| 430 | |
| 431 {"writer create failure", func() { | 568 {"writer create failure", func() { |
| 432 gsc.newWriterErr = func(w *testG
SWriter) error { | 569 gsc.newWriterErr = func(w *testG
SWriter) error { |
| 433 » » » » » » » if strings.HasSuffix(w.p
ath, failName) { | 570 » » » » » » » if strings.HasSuffix(str
ing(w.path), failName) { |
| 434 » » » » » » » » return errors.Ne
w("test error") | 571 » » » » » » » » return errors.Wr
apTransient(errors.New("test error")) |
| 435 } | 572 } |
| 436 return nil | 573 return nil |
| 437 } | 574 } |
| 438 }}, | 575 }}, |
| 439 | 576 |
| 440 {"write failure", func() { | 577 {"write failure", func() { |
| 441 gsc.newWriterErr = func(w *testG
SWriter) error { | 578 gsc.newWriterErr = func(w *testG
SWriter) error { |
| 442 » » » » » » » if strings.HasSuffix(w.p
ath, failName) { | 579 » » » » » » » if strings.HasSuffix(str
ing(w.path), failName) { |
| 443 » » » » » » » » w.writeErr = err
ors.New("test error") | 580 » » » » » » » » w.writeErr = err
ors.WrapTransient(errors.New("test error")) |
| 444 } | 581 } |
| 445 return nil | 582 return nil |
| 446 } | 583 } |
| 584 }}, |
| 585 |
| 586 {"rename failure", func() { |
| 587 gsc.renameErr = func(src, dst gs
.Path) error { |
| 588 if strings.HasSuffix(str
ing(src), failName) { |
| 589 return errors.Wr
apTransient(errors.New("test error")) |
| 590 } |
| 591 return nil |
| 592 } |
| 447 }}, | 593 }}, |
| 448 | 594 |
| 449 {"close failure", func() { | 595 {"close failure", func() { |
| 450 gsc.newWriterErr = func(w *testG
SWriter) error { | 596 gsc.newWriterErr = func(w *testG
SWriter) error { |
| 451 » » » » » » » if strings.HasSuffix(w.p
ath, failName) { | 597 » » » » » » » if strings.HasSuffix(str
ing(w.path), failName) { |
| 452 » » » » » » » » w.closeErr = err
ors.New("test error") | 598 » » » » » » » » w.closeErr = err
ors.WrapTransient(errors.New("test error")) |
| 453 } | 599 } |
| 454 return nil | 600 return nil |
| 455 } | 601 } |
| 456 }}, | 602 }}, |
| 457 | 603 |
| 458 » » » » » {"delete on fail failure (double-failure
)", func() { | 604 » » » » » {"delete failure after other failure", f
unc() { |
| 459 » » » » » » failed := false | |
| 460 | |
| 461 // Simulate a write failure. Thi
s is the error that will actually | 605 // Simulate a write failure. Thi
s is the error that will actually |
| 462 // be returned. | 606 // be returned. |
| 463 gsc.newWriterErr = func(w *testG
SWriter) error { | 607 gsc.newWriterErr = func(w *testG
SWriter) error { |
| 464 » » » » » » » if strings.HasSuffix(w.p
ath, failName) { | 608 » » » » » » » if strings.HasSuffix(str
ing(w.path), failName) { |
| 465 » » » » » » » » w.writeErr = err
ors.New("test error") | 609 » » » » » » » » w.writeErr = err
ors.WrapTransient(errors.New("test error")) |
| 466 } | 610 } |
| 467 return nil | 611 return nil |
| 468 } | 612 } |
| 469 | 613 |
| 470 » » » » » » // This will trigger twice per s
tream, once on create and once on | 614 » » » » » » // This will trigger whe NewWrit
er fails from the above |
| 471 » » » » » » // cleanup after the write fails
. We want to return an error in | 615 » » » » » » // instrumentation. |
| 472 » » » » » » // the latter case. | 616 » » » » » » gsc.deleteErr = func(p gs.Path)
error { |
| 473 » » » » » » gsc.deleteErr = func(b, p string
) error { | 617 » » » » » » » if strings.HasSuffix(str
ing(p), failName) { |
| 474 » » » » » » » if strings.HasSuffix(p,
failName) { | 618 » » » » » » » » return errors.Ne
w("other error") |
| 475 » » » » » » » » if failed { | |
| 476 » » » » » » » » » return e
rrors.New("other error") | |
| 477 » » » » » » » » } | |
| 478 | |
| 479 » » » » » » » » // First delete
(on create). | |
| 480 » » » » » » » » failed = true | |
| 481 } | 619 } |
| 482 return nil | 620 return nil |
| 483 } | 621 } |
| 484 }}, | 622 }}, |
| 485 } { | 623 } { |
| 486 » » » » » Convey(fmt.Sprintf(`Can handle %s for %s
`, testCase.name, failName), func() { | 624 » » » » » Convey(fmt.Sprintf(`Can handle %s for %s
, and will not archive.`, testCase.name, failName), func() { |
| 487 testCase.setup() | 625 testCase.setup() |
| 488 » » » » » » So(ar.Archive(c, &task), ShouldE
rrLike, "test error") | 626 |
| 627 » » » » » » ack, err := ar.archiveTaskImpl(c
, task) |
| 628 » » » » » » So(err, ShouldErrLike, "test err
or") |
| 629 » » » » » » So(ack, ShouldBeFalse) |
| 630 » » » » » » So(archiveRequest, ShouldBeNil) |
| 489 }) | 631 }) |
| 490 } | 632 } |
| 491 } | 633 } |
| 492 }) | 634 }) |
| 493 }) | 635 }) |
| 494 } | 636 } |
| OLD | NEW |