| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package archivist | 5 package archivist |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | |
| 9 "fmt" | 8 "fmt" |
| 10 "strings" | 9 "strings" |
| 11 "sync" | 10 "sync" |
| 12 "testing" | 11 "testing" |
| 13 "time" | 12 "time" |
| 14 | 13 |
| 15 "github.com/golang/protobuf/proto" | 14 "github.com/golang/protobuf/proto" |
| 16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 15 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 17 "github.com/luci/luci-go/common/clock/testclock" | 16 "github.com/luci/luci-go/common/clock/testclock" |
| 17 "github.com/luci/luci-go/common/errors" |
| 18 "github.com/luci/luci-go/common/gcloud/gs" | 18 "github.com/luci/luci-go/common/gcloud/gs" |
| 19 "github.com/luci/luci-go/common/logdog/types" | 19 "github.com/luci/luci-go/common/logdog/types" |
| 20 "github.com/luci/luci-go/common/proto/google" | 20 "github.com/luci/luci-go/common/proto/google" |
| 21 "github.com/luci/luci-go/common/proto/logdog/logpb" | 21 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 22 "github.com/luci/luci-go/server/logdog/storage" | 22 "github.com/luci/luci-go/server/logdog/storage" |
| 23 "github.com/luci/luci-go/server/logdog/storage/memory" | 23 "github.com/luci/luci-go/server/logdog/storage/memory" |
| 24 "golang.org/x/net/context" | 24 "golang.org/x/net/context" |
| 25 "google.golang.org/grpc" | 25 "google.golang.org/grpc" |
| 26 | 26 |
| 27 . "github.com/luci/luci-go/common/testing/assertions" | 27 . "github.com/luci/luci-go/common/testing/assertions" |
| 28 . "github.com/smartystreets/goconvey/convey" | 28 . "github.com/smartystreets/goconvey/convey" |
| 29 ) | 29 ) |
| 30 | 30 |
| 31 // testTask is an instrumentable Task implementation. |
| 32 type testTask struct { |
| 33 data []byte |
| 34 assertLeaseErr error |
| 35 assertCount int |
| 36 } |
| 37 |
| 38 func (t *testTask) UniqueID() string { |
| 39 return "totally unique ID" |
| 40 } |
| 41 |
| 42 func (t *testTask) Data() []byte { |
| 43 return t.data |
| 44 } |
| 45 |
| 46 func (t *testTask) AssertLease(context.Context) error { |
| 47 if err := t.assertLeaseErr; err != nil { |
| 48 return err |
| 49 } |
| 50 t.assertCount++ |
| 51 return nil |
| 52 } |
| 53 |
| 31 // testServicesClient implements logdog.ServicesClient sufficient for testing | 54 // testServicesClient implements logdog.ServicesClient sufficient for testing |
| 32 // and instrumentation. | 55 // and instrumentation. |
| 33 type testServicesClient struct { | 56 type testServicesClient struct { |
| 34 logdog.ServicesClient | 57 logdog.ServicesClient |
| 35 | 58 |
| 36 lsCallback func(*logdog.LoadStreamRequest) (*logdog.LoadStreamResponse,
error) | 59 lsCallback func(*logdog.LoadStreamRequest) (*logdog.LoadStreamResponse,
error) |
| 37 asCallback func(*logdog.ArchiveStreamRequest) error | 60 asCallback func(*logdog.ArchiveStreamRequest) error |
| 38 } | 61 } |
| 39 | 62 |
| 40 func (sc *testServicesClient) LoadStream(c context.Context, req *logdog.LoadStre
amRequest, o ...grpc.CallOption) ( | 63 func (sc *testServicesClient) LoadStream(c context.Context, req *logdog.LoadStre
amRequest, o ...grpc.CallOption) ( |
| (...skipping 17 matching lines...) Expand all Loading... |
| 58 // testGSClient is a testing implementation of the gsClient interface. | 81 // testGSClient is a testing implementation of the gsClient interface. |
| 59 // | 82 // |
| 60 // It does not actually retain any of the written data, since that level of | 83 // It does not actually retain any of the written data, since that level of |
| 61 // testing is done in the archive package. | 84 // testing is done in the archive package. |
| 62 type testGSClient struct { | 85 type testGSClient struct { |
| 63 sync.Mutex | 86 sync.Mutex |
| 64 gs.Client | 87 gs.Client |
| 65 | 88 |
| 66 // objs is a map of filename to "write amount". The write amount is the | 89 // objs is a map of filename to "write amount". The write amount is the |
| 67 // cumulative amount of data written to the Writer for a given GS path. | 90 // cumulative amount of data written to the Writer for a given GS path. |
| 68 » objs map[string]int64 | 91 » objs map[gs.Path]int64 |
| 69 closed bool | 92 closed bool |
| 70 | 93 |
| 71 closeErr error | 94 closeErr error |
| 72 newWriterErr func(w *testGSWriter) error | 95 newWriterErr func(w *testGSWriter) error |
| 73 » deleteErr func(string, string) error | 96 » deleteErr func(gs.Path) error |
| 97 » renameErr func(gs.Path, gs.Path) error |
| 74 } | 98 } |
| 75 | 99 |
| 76 func (c *testGSClient) NewWriter(bucket, relpath string) (gs.Writer, error) { | 100 func (c *testGSClient) NewWriter(p gs.Path) (gs.Writer, error) { |
| 77 w := testGSWriter{ | 101 w := testGSWriter{ |
| 78 client: c, | 102 client: c, |
| 79 » » path: c.url(bucket, relpath), | 103 » » path: p, |
| 80 } | 104 } |
| 81 if c.newWriterErr != nil { | 105 if c.newWriterErr != nil { |
| 82 if err := c.newWriterErr(&w); err != nil { | 106 if err := c.newWriterErr(&w); err != nil { |
| 83 return nil, err | 107 return nil, err |
| 84 } | 108 } |
| 85 } | 109 } |
| 86 return &w, nil | 110 return &w, nil |
| 87 } | 111 } |
| 88 | 112 |
| 89 func (c *testGSClient) Close() error { | 113 func (c *testGSClient) Close() error { |
| 90 if c.closed { | 114 if c.closed { |
| 91 panic("double close") | 115 panic("double close") |
| 92 } | 116 } |
| 93 if err := c.closeErr; err != nil { | 117 if err := c.closeErr; err != nil { |
| 94 return err | 118 return err |
| 95 } | 119 } |
| 96 c.closed = true | 120 c.closed = true |
| 97 return nil | 121 return nil |
| 98 } | 122 } |
| 99 | 123 |
| 100 func (c *testGSClient) Delete(bucket, relpath string) error { | 124 func (c *testGSClient) Delete(p gs.Path) error { |
| 101 if c.deleteErr != nil { | 125 if c.deleteErr != nil { |
| 102 » » if err := c.deleteErr(bucket, relpath); err != nil { | 126 » » if err := c.deleteErr(p); err != nil { |
| 103 return err | 127 return err |
| 104 } | 128 } |
| 105 } | 129 } |
| 106 | 130 |
| 107 c.Lock() | 131 c.Lock() |
| 108 defer c.Unlock() | 132 defer c.Unlock() |
| 109 | 133 |
| 110 » delete(c.objs, c.url(bucket, relpath)) | 134 » delete(c.objs, p) |
| 111 return nil | 135 return nil |
| 112 } | 136 } |
| 113 | 137 |
| 114 func (c *testGSClient) url(bucket, relpath string) string { | 138 func (c *testGSClient) Rename(src, dst gs.Path) error { |
| 115 » return fmt.Sprintf("gs://%s/%s", bucket, relpath) | 139 » if c.renameErr != nil { |
| 140 » » if err := c.renameErr(src, dst); err != nil { |
| 141 » » » return err |
| 142 » » } |
| 143 » } |
| 144 |
| 145 » c.Lock() |
| 146 » defer c.Unlock() |
| 147 |
| 148 » c.objs[dst] = c.objs[src] |
| 149 » delete(c.objs, src) |
| 150 » return nil |
| 116 } | 151 } |
| 117 | 152 |
| 118 type testGSWriter struct { | 153 type testGSWriter struct { |
| 119 client *testGSClient | 154 client *testGSClient |
| 120 | 155 |
| 121 » path string | 156 » path gs.Path |
| 122 closed bool | 157 closed bool |
| 123 writeCount int64 | 158 writeCount int64 |
| 124 | 159 |
| 125 writeErr error | 160 writeErr error |
| 126 closeErr error | 161 closeErr error |
| 127 } | 162 } |
| 128 | 163 |
| 129 func (w *testGSWriter) Write(d []byte) (int, error) { | 164 func (w *testGSWriter) Write(d []byte) (int, error) { |
| 130 if err := w.writeErr; err != nil { | 165 if err := w.writeErr; err != nil { |
| 131 return 0, err | 166 return 0, err |
| 132 } | 167 } |
| 133 | 168 |
| 134 w.client.Lock() | 169 w.client.Lock() |
| 135 defer w.client.Unlock() | 170 defer w.client.Unlock() |
| 136 | 171 |
| 137 if w.client.objs == nil { | 172 if w.client.objs == nil { |
| 138 » » w.client.objs = make(map[string]int64) | 173 » » w.client.objs = make(map[gs.Path]int64) |
| 139 } | 174 } |
| 140 w.client.objs[w.path] += int64(len(d)) | 175 w.client.objs[w.path] += int64(len(d)) |
| 141 w.writeCount += int64(len(d)) | 176 w.writeCount += int64(len(d)) |
| 142 return len(d), nil | 177 return len(d), nil |
| 143 } | 178 } |
| 144 | 179 |
| 145 func (w *testGSWriter) Close() error { | 180 func (w *testGSWriter) Close() error { |
| 146 if w.closed { | 181 if w.closed { |
| 147 panic("double close") | 182 panic("double close") |
| 148 } | 183 } |
| (...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 205 }) | 240 }) |
| 206 if err != nil { | 241 if err != nil { |
| 207 panic(err) | 242 panic(err) |
| 208 } | 243 } |
| 209 | 244 |
| 210 // Advance the time for each log entry. | 245 // Advance the time for each log entry. |
| 211 tc.Add(time.Second) | 246 tc.Add(time.Second) |
| 212 } | 247 } |
| 213 } | 248 } |
| 214 | 249 |
| 250 // Set up our testing archival task. |
| 251 expired := 10 * time.Minute |
| 252 archiveTask := logdog.ArchiveTask{ |
| 253 Path: string(desc.Path()), |
| 254 SettleDelay: google.NewDuration(10 * time.Second), |
| 255 CompletePeriod: google.NewDuration(expired), |
| 256 Key: []byte("random archival key"), |
| 257 } |
| 258 expired++ // This represents a time PAST CompletePeriod. |
| 259 |
| 260 archiveTaskData, err := proto.Marshal(&archiveTask) |
| 261 if err != nil { |
| 262 panic(err) |
| 263 } |
| 264 |
| 265 task := &testTask{ |
| 266 data: archiveTaskData, |
| 267 } |
| 268 |
| 215 // Set up our test Coordinator client stubs. | 269 // Set up our test Coordinator client stubs. |
| 216 stream := logdog.LoadStreamResponse{ | 270 stream := logdog.LoadStreamResponse{ |
| 217 State: &logdog.LogStreamState{ | 271 State: &logdog.LogStreamState{ |
| 218 » » » » Path: string(desc.Path()), | 272 » » » » Path: archiveTask.Path, |
| 219 ProtoVersion: logpb.Version, | 273 ProtoVersion: logpb.Version, |
| 220 TerminalIndex: -1, | 274 TerminalIndex: -1, |
| 221 Archived: false, | 275 Archived: false, |
| 222 Purged: false, | 276 Purged: false, |
| 223 }, | 277 }, |
| 224 Desc: descBytes, | 278 Desc: descBytes, |
| 279 |
| 280 // Age is ON the expiration threshold, so not expired. |
| 281 Age: archiveTask.CompletePeriod, |
| 282 ArchivalKey: archiveTask.Key, |
| 225 } | 283 } |
| 226 | 284 |
| 227 var archiveRequest *logdog.ArchiveStreamRequest | 285 var archiveRequest *logdog.ArchiveStreamRequest |
| 286 var archiveStreamErr error |
| 228 sc := testServicesClient{ | 287 sc := testServicesClient{ |
| 229 lsCallback: func(req *logdog.LoadStreamRequest) (*logdog
.LoadStreamResponse, error) { | 288 lsCallback: func(req *logdog.LoadStreamRequest) (*logdog
.LoadStreamResponse, error) { |
| 230 return &stream, nil | 289 return &stream, nil |
| 231 }, | 290 }, |
| 232 asCallback: func(req *logdog.ArchiveStreamRequest) error
{ | 291 asCallback: func(req *logdog.ArchiveStreamRequest) error
{ |
| 233 archiveRequest = req | 292 archiveRequest = req |
| 234 » » » » return nil | 293 » » » » return archiveStreamErr |
| 235 }, | 294 }, |
| 236 } | 295 } |
| 237 | 296 |
| 238 ar := Archivist{ | 297 ar := Archivist{ |
| 239 » » » Service: &sc, | 298 » » » Service: &sc, |
| 240 » » » Storage: &st, | 299 » » » Storage: &st, |
| 241 » » » GSClient: &gsc, | 300 » » » GSClient: &gsc, |
| 242 » » » GSBase: gs.Path("gs://archive-test/path/to/archive/"),
// Extra slashes to test concatenation. | 301 » » » GSBase: gs.Path("gs://archive-test/path/to/archiv
e/"), // Extra slashes to test concatenation. |
| 243 » » } | 302 » » » GSStagingBase: gs.Path("gs://archive-test-staging/path/t
o/archive/"), // Extra slashes to test concatenation. |
| 244 | |
| 245 » » task := logdog.ArchiveTask{ | |
| 246 » » » Path: stream.State.Path, | |
| 247 » » » Complete: true, | |
| 248 } | 303 } |
| 249 | 304 |
| 250 gsURL := func(p string) string { | 305 gsURL := func(p string) string { |
| 251 return fmt.Sprintf("gs://archive-test/path/to/archive/%s
/%s", desc.Path(), p) | 306 return fmt.Sprintf("gs://archive-test/path/to/archive/%s
/%s", desc.Path(), p) |
| 252 } | 307 } |
| 253 | 308 |
| 254 // hasStreams can be called to check that the retained archiveRe
quest had | 309 // hasStreams can be called to check that the retained archiveRe
quest had |
| 255 // data sizes for the named archive stream types. | 310 // data sizes for the named archive stream types. |
| 256 // | 311 // |
| 257 // After checking, the values are set to zero. This allows us to
use | 312 // After checking, the values are set to zero. This allows us to
use |
| 258 // ShouldEqual without hardcoding specific archival sizes into t
he results. | 313 // ShouldEqual without hardcoding specific archival sizes into t
he results. |
| 259 hasStreams := func(log, index, data bool) bool { | 314 hasStreams := func(log, index, data bool) bool { |
| 260 So(archiveRequest, ShouldNotBeNil) | 315 So(archiveRequest, ShouldNotBeNil) |
| 261 if (log && archiveRequest.StreamSize <= 0) || | 316 if (log && archiveRequest.StreamSize <= 0) || |
| 262 (index && archiveRequest.IndexSize <= 0) || | 317 (index && archiveRequest.IndexSize <= 0) || |
| 263 (data && archiveRequest.DataSize <= 0) { | 318 (data && archiveRequest.DataSize <= 0) { |
| 264 return false | 319 return false |
| 265 } | 320 } |
| 266 | 321 |
| 267 archiveRequest.StreamSize = 0 | 322 archiveRequest.StreamSize = 0 |
| 268 archiveRequest.IndexSize = 0 | 323 archiveRequest.IndexSize = 0 |
| 269 archiveRequest.DataSize = 0 | 324 archiveRequest.DataSize = 0 |
| 270 return true | 325 return true |
| 271 } | 326 } |
| 272 | 327 |
| 273 » » Convey(`Will fail to archive if the specified stream state could
not be loaded.`, func() { | 328 » » Convey(`Will return task and fail to archive if the specified st
ream state could not be loaded.`, func() { |
| 274 sc.lsCallback = func(*logdog.LoadStreamRequest) (*logdog
.LoadStreamResponse, error) { | 329 sc.lsCallback = func(*logdog.LoadStreamRequest) (*logdog
.LoadStreamResponse, error) { |
| 275 return nil, errors.New("does not exist") | 330 return nil, errors.New("does not exist") |
| 276 } | 331 } |
| 277 » » » So(ar.Archive(c, &task), ShouldErrLike, "does not exist"
) | 332 |
| 333 » » » ack, err := ar.archiveTaskImpl(c, task) |
| 334 » » » So(err, ShouldErrLike, "does not exist") |
| 335 » » » So(ack, ShouldBeFalse) |
| 278 }) | 336 }) |
| 279 | 337 |
| 280 » » Convey(`Will refrain from archiving if the stream is already arc
hived.`, func() { | 338 » » Convey(`Will complete task and refrain from archiving if the str
eam is already archived.`, func() { |
| 281 stream.State.Archived = true | 339 stream.State.Archived = true |
| 282 » » » So(ar.Archive(c, &task), ShouldBeNil) | 340 |
| 341 » » » ack, err := ar.archiveTaskImpl(c, task) |
| 342 » » » So(err, ShouldErrLike, "log stream is archived") |
| 343 » » » So(ack, ShouldBeTrue) |
| 283 So(archiveRequest, ShouldBeNil) | 344 So(archiveRequest, ShouldBeNil) |
| 284 }) | 345 }) |
| 285 | 346 |
| 286 » » Convey(`Will refrain from archiving if the stream is purged.`, f
unc() { | 347 » » Convey(`Will complete task and refrain from archiving if the str
eam is purged.`, func() { |
| 287 stream.State.Purged = true | 348 stream.State.Purged = true |
| 288 » » » So(ar.Archive(c, &task), ShouldBeNil) | 349 |
| 350 » » » ack, err := ar.archiveTaskImpl(c, task) |
| 351 » » » So(err, ShouldErrLike, "log stream is purged") |
| 352 » » » So(ack, ShouldBeTrue) |
| 353 » » » So(archiveRequest, ShouldBeNil) |
| 354 » » }) |
| 355 |
| 356 » » Convey(`Will return task if the stream is younger than its settl
e delay.`, func() { |
| 357 » » » stream.Age = google.NewDuration(time.Second) |
| 358 |
| 359 » » » ack, err := ar.archiveTaskImpl(c, task) |
| 360 » » » So(err, ShouldErrLike, "log stream is within settle dela
y") |
| 361 » » » So(ack, ShouldBeFalse) |
| 362 » » » So(archiveRequest, ShouldBeNil) |
| 363 » » }) |
| 364 |
| 365 » » Convey(`Will return task if the log stream doesn't have an archi
val key yet.`, func() { |
| 366 » » » stream.Age = google.NewDuration(expired) |
| 367 » » » stream.ArchivalKey = nil |
| 368 |
| 369 » » » ack, err := ar.archiveTaskImpl(c, task) |
| 370 » » » So(err, ShouldErrLike, "premature archival request") |
| 371 » » » So(ack, ShouldBeFalse) |
| 372 » » » So(archiveRequest, ShouldBeNil) |
| 373 » » }) |
| 374 |
| 375 » » Convey(`Will complete task and refrain from archiving if archiva
l keys don't match.`, func() { |
| 376 » » » stream.Age = google.NewDuration(expired) |
| 377 » » » stream.ArchivalKey = []byte("non-matching archival key") |
| 378 |
| 379 » » » ack, err := ar.archiveTaskImpl(c, task) |
| 380 » » » So(err, ShouldErrLike, "superfluous archival request") |
| 381 » » » So(ack, ShouldBeTrue) |
| 289 So(archiveRequest, ShouldBeNil) | 382 So(archiveRequest, ShouldBeNil) |
| 290 }) | 383 }) |
| 291 | 384 |
| 292 // Weird case: the log has been marked for archival, has not bee
n | 385 // Weird case: the log has been marked for archival, has not bee
n |
| 293 » » // terminated, and is within its completeness delay. This task w
ill not | 386 » » // |
| 294 » » // have been dispatched by our archive cron, but let's assert th
at it | 387 » » // terminated, and is within its completeness delay. This task s
hould not |
| 295 » » // behaves correctly regardless. | 388 » » // have been dispatched by our expired archive cron, but let's a
ssert that |
| 296 » » Convey(`Will succeed if the log stream had no entries and no ter
minal index.`, func() { | 389 » » // it behaves correctly regardless. |
| 297 » » » So(ar.Archive(c, &task), ShouldBeNil) | 390 » » Convey(`Will refuse to archive a complete stream with no termina
l index.`, func() { |
| 298 | 391 » » » ack, err := ar.archiveTaskImpl(c, task) |
| 299 » » » So(hasStreams(true, true, false), ShouldBeTrue) | 392 » » » So(err, ShouldErrLike, "completeness required, but strea
m has no terminal index") |
| 300 » » » So(archiveRequest, ShouldResemble, &logdog.ArchiveStream
Request{ | 393 » » » So(ack, ShouldBeFalse) |
| 301 » » » » Path: task.Path, | |
| 302 » » » » Complete: true, | |
| 303 » » » » TerminalIndex: -1, | |
| 304 | |
| 305 » » » » StreamUrl: gsURL("logstream.entries"), | |
| 306 » » » » IndexUrl: gsURL("logstream.index"), | |
| 307 » » » » DataUrl: gsURL("data.bin"), | |
| 308 » » » }) | |
| 309 }) | 394 }) |
| 310 | 395 |
| 311 Convey(`With terminal index "3"`, func() { | 396 Convey(`With terminal index "3"`, func() { |
| 312 stream.State.TerminalIndex = 3 | 397 stream.State.TerminalIndex = 3 |
| 313 | 398 |
| 314 » » » Convey(`Will fail if the log stream had a terminal index
 and no entries.`, func() { | 399 » » » Convey(`Will not ACK a log stream with no entries.`
, func() { |
| 315 » » » » So(ar.Archive(c, &task), ShouldErrLike, "stream
finished short of terminal index") | 400 » » » » ack, err := ar.archiveTaskImpl(c, task) |
| 401 » » » » So(err, ShouldErrLike, "stream has missing entri
es") |
| 402 » » » » So(ack, ShouldBeFalse) |
| 316 }) | 403 }) |
| 317 | 404 |
| 318 Convey(`Will fail to archive {0, 1, 2, 4} (incomplete).`
, func() { | 405 Convey(`Will fail to archive {0, 1, 2, 4} (incomplete).`
, func() { |
| 319 addTestEntry(0, 1, 2, 4) | 406 addTestEntry(0, 1, 2, 4) |
| 320 » » » » So(ar.Archive(c, &task), ShouldErrLike, "non-con
tiguous log stream") | 407 |
| 408 » » » » ack, err := ar.archiveTaskImpl(c, task) |
| 409 » » » » So(err, ShouldErrLike, "stream has missing entri
es") |
| 410 » » » » So(ack, ShouldBeFalse) |
| 321 }) | 411 }) |
| 322 | 412 |
| 323 Convey(`Will successfully archive {0, 1, 2, 3, 4}, stopp
ing at the terminal index.`, func() { | 413 Convey(`Will successfully archive {0, 1, 2, 3, 4}, stopp
ing at the terminal index.`, func() { |
| 324 addTestEntry(0, 1, 2, 3, 4) | 414 addTestEntry(0, 1, 2, 3, 4) |
| 325 » » » » So(ar.Archive(c, &task), ShouldBeNil) | 415 |
| 416 » » » » ack, err := ar.archiveTaskImpl(c, task) |
| 417 » » » » So(err, ShouldBeNil) |
| 418 » » » » So(ack, ShouldBeTrue) |
| 326 | 419 |
| 327 So(hasStreams(true, true, true), ShouldBeTrue) | 420 So(hasStreams(true, true, true), ShouldBeTrue) |
| 328 So(archiveRequest, ShouldResemble, &logdog.Archi
veStreamRequest{ | 421 So(archiveRequest, ShouldResemble, &logdog.Archi
veStreamRequest{ |
| 329 » » » » » Path: task.Path, | 422 » » » » » Path: archiveTask.Path, |
| 330 » » » » » Complete: true, | 423 » » » » » LogEntryCount: 4, |
| 331 TerminalIndex: 3, | 424 TerminalIndex: 3, |
| 332 | 425 |
| 333 StreamUrl: gsURL("logstream.entries"), | 426 StreamUrl: gsURL("logstream.entries"), |
| 334 IndexUrl: gsURL("logstream.index"), | 427 IndexUrl: gsURL("logstream.index"), |
| 335 DataUrl: gsURL("data.bin"), | 428 DataUrl: gsURL("data.bin"), |
| 336 }) | 429 }) |
| 337 }) | 430 }) |
| 431 |
| 432 Convey(`When a transient archival error occurs, will not
ACK it.`, func() { |
| 433 gsc.newWriterErr = func(*testGSWriter) error { r
eturn errors.WrapTransient(errors.New("test error")) } |
| 434 |
| 435 ack, err := ar.archiveTaskImpl(c, task) |
| 436 So(err, ShouldErrLike, "test error") |
| 437 So(ack, ShouldBeFalse) |
| 438 }) |
| 439 |
| 440 Convey(`When a non-transient archival error occurs`, fun
c() { |
| 441 archiveErr := errors.New("archive failure error"
) |
| 442 gsc.newWriterErr = func(*testGSWriter) error { r
eturn archiveErr } |
| 443 |
| 444 Convey(`If remote report returns an error, do no
t ACK.`, func() { |
| 445 archiveStreamErr = errors.New("test erro
r") |
| 446 |
| 447 ack, err := ar.archiveTaskImpl(c, task) |
| 448 So(err, ShouldErrLike, "test error") |
| 449 So(ack, ShouldBeFalse) |
| 450 |
| 451 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 452 Path: archiveTask.Path, |
| 453 Error: "archive failure error", |
| 454 }) |
| 455 }) |
| 456 |
| 457 Convey(`If remote report returns success, ACK.`,
func() { |
| 458 ack, err := ar.archiveTaskImpl(c, task) |
| 459 So(err, ShouldBeNil) |
| 460 So(ack, ShouldBeTrue) |
| 461 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 462 Path: archiveTask.Path, |
| 463 Error: "archive failure error", |
| 464 }) |
| 465 }) |
| 466 |
| 467 Convey(`If an empty error string is supplied, th
e generic error will be filled in.`, func() { |
| 468 archiveErr = errors.New("") |
| 469 |
| 470 ack, err := ar.archiveTaskImpl(c, task) |
| 471 So(err, ShouldBeNil) |
| 472 So(ack, ShouldBeTrue) |
| 473 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 474 Path: archiveTask.Path, |
| 475 Error: "archival error", |
| 476 }) |
| 477 }) |
| 478 }) |
| 338 }) | 479 }) |
| 339 | 480 |
| 340 Convey(`When not enforcing stream completeness`, func() { | 481 Convey(`When not enforcing stream completeness`, func() { |
| 341 » » » task.Complete = false | 482 » » » stream.Age = google.NewDuration(expired) |
| 342 | 483 |
| 343 Convey(`With no terminal index`, func() { | 484 Convey(`With no terminal index`, func() { |
| 344 Convey(`Will successfully archive if there are n
o entries.`, func() { | 485 Convey(`Will successfully archive if there are n
o entries.`, func() { |
| 345 » » » » » So(ar.Archive(c, &task), ShouldBeNil) | 486 » » » » » ack, err := ar.archiveTaskImpl(c, task) |
| 487 » » » » » So(err, ShouldBeNil) |
| 488 » » » » » So(ack, ShouldBeTrue) |
| 346 | 489 |
| 347 So(hasStreams(true, true, false), Should
BeTrue) | 490 So(hasStreams(true, true, false), Should
BeTrue) |
| 348 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ | 491 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 349 » » » » » » Path: task.Path, | 492 » » » » » » Path: archiveTask.Path, |
| 350 » » » » » » Complete: true, | 493 » » » » » » LogEntryCount: 0, |
| 351 TerminalIndex: -1, | 494 TerminalIndex: -1, |
| 352 | 495 |
| 353 StreamUrl: gsURL("logstream.entr
ies"), | 496 StreamUrl: gsURL("logstream.entr
ies"), |
| 354 IndexUrl: gsURL("logstream.inde
x"), | 497 IndexUrl: gsURL("logstream.inde
x"), |
| 355 DataUrl: gsURL("data.bin"), | 498 DataUrl: gsURL("data.bin"), |
| 356 }) | 499 }) |
| 357 }) | 500 }) |
| 358 | 501 |
| 359 Convey(`With {0, 1, 2, 4} (incomplete) will arch
ive the stream and update its terminal index.`, func() { | 502 Convey(`With {0, 1, 2, 4} (incomplete) will arch
ive the stream and update its terminal index.`, func() { |
| 360 addTestEntry(0, 1, 2, 4) | 503 addTestEntry(0, 1, 2, 4) |
| 361 » » » » » So(ar.Archive(c, &task), ShouldBeNil) | 504 |
| 505 » » » » » ack, err := ar.archiveTaskImpl(c, task) |
| 506 » » » » » So(err, ShouldBeNil) |
| 507 » » » » » So(ack, ShouldBeTrue) |
| 362 | 508 |
| 363 So(hasStreams(true, true, true), ShouldB
eTrue) | 509 So(hasStreams(true, true, true), ShouldB
eTrue) |
| 364 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ | 510 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 365 » » » » » » Path: task.Path, | 511 » » » » » » Path: archiveTask.Path, |
| 366 » » » » » » Complete: false, | 512 » » » » » » LogEntryCount: 4, |
| 367 TerminalIndex: 4, | 513 TerminalIndex: 4, |
| 368 | 514 |
| 369 StreamUrl: gsURL("logstream.entr
ies"), | 515 StreamUrl: gsURL("logstream.entr
ies"), |
| 370 IndexUrl: gsURL("logstream.inde
x"), | 516 IndexUrl: gsURL("logstream.inde
x"), |
| 371 DataUrl: gsURL("data.bin"), | 517 DataUrl: gsURL("data.bin"), |
| 372 }) | 518 }) |
| 373 }) | 519 }) |
| 374 }) | 520 }) |
| 375 | 521 |
| 376 Convey(`With terminal index 3`, func() { | 522 Convey(`With terminal index 3`, func() { |
| 377 stream.State.TerminalIndex = 3 | 523 stream.State.TerminalIndex = 3 |
| 378 | 524 |
| 379 Convey(`Will successfully archive if there are n
o entries.`, func() { | 525 Convey(`Will successfully archive if there are n
o entries.`, func() { |
| 380 » » » » » So(ar.Archive(c, &task), ShouldBeNil) | 526 » » » » » ack, err := ar.archiveTaskImpl(c, task) |
| 527 » » » » » So(err, ShouldBeNil) |
| 528 » » » » » So(ack, ShouldBeTrue) |
| 381 | 529 |
| 382 So(hasStreams(true, true, false), Should
BeTrue) | 530 So(hasStreams(true, true, false), Should
BeTrue) |
| 383 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ | 531 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 384 » » » » » » Path: task.Path, | 532 » » » » » » Path: archiveTask.Path, |
| 385 » » » » » » Complete: true, | 533 » » » » » » LogEntryCount: 0, |
| 386 TerminalIndex: -1, | 534 TerminalIndex: -1, |
| 387 | 535 |
| 388 StreamUrl: gsURL("logstream.entr
ies"), | 536 StreamUrl: gsURL("logstream.entr
ies"), |
| 389 IndexUrl: gsURL("logstream.inde
x"), | 537 IndexUrl: gsURL("logstream.inde
x"), |
| 390 DataUrl: gsURL("data.bin"), | 538 DataUrl: gsURL("data.bin"), |
| 391 }) | 539 }) |
| 392 }) | 540 }) |
| 393 | 541 |
| 394 Convey(`With {0, 1, 2, 4} (incomplete) will arch
ive the stream and update its terminal index to 2.`, func() { | 542 Convey(`With {0, 1, 2, 4} (incomplete) will arch
ive the stream and update its terminal index to 2.`, func() { |
| 395 addTestEntry(0, 1, 2, 4) | 543 addTestEntry(0, 1, 2, 4) |
| 396 » » » » » So(ar.Archive(c, &task), ShouldBeNil) | 544 |
| 545 » » » » » ack, err := ar.archiveTaskImpl(c, task) |
| 546 » » » » » So(err, ShouldBeNil) |
| 547 » » » » » So(ack, ShouldBeTrue) |
| 397 | 548 |
| 398 So(hasStreams(true, true, true), ShouldB
eTrue) | 549 So(hasStreams(true, true, true), ShouldB
eTrue) |
| 399 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ | 550 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 400 » » » » » » Path: task.Path, | 551 » » » » » » Path: archiveTask.Path, |
| 401 » » » » » » Complete: false, | 552 » » » » » » LogEntryCount: 3, |
| 402 TerminalIndex: 2, | 553 TerminalIndex: 2, |
| 403 | 554 |
| 404 StreamUrl: gsURL("logstream.entr
ies"), | 555 StreamUrl: gsURL("logstream.entr
ies"), |
| 405 IndexUrl: gsURL("logstream.inde
x"), | 556 IndexUrl: gsURL("logstream.inde
x"), |
| 406 DataUrl: gsURL("data.bin"), | 557 DataUrl: gsURL("data.bin"), |
| 407 }) | 558 }) |
| 408 }) | 559 }) |
| 409 }) | 560 }) |
| 410 }) | 561 }) |
| 411 | 562 |
| 412 // Simulate failures during the various stream generation operat
ions. | 563 // Simulate failures during the various stream generation operat
ions. |
| 413 Convey(`Stream generation failures`, func() { | 564 Convey(`Stream generation failures`, func() { |
| 414 stream.State.TerminalIndex = 3 | 565 stream.State.TerminalIndex = 3 |
| 415 addTestEntry(0, 1, 2, 3) | 566 addTestEntry(0, 1, 2, 3) |
| 416 | 567 |
| 417 for _, failName := range []string{"/logstream.entries",
"/logstream.index", "/data.bin"} { | 568 for _, failName := range []string{"/logstream.entries",
"/logstream.index", "/data.bin"} { |
| 418 for _, testCase := range []struct { | 569 for _, testCase := range []struct { |
| 419 name string | 570 name string |
| 420 setup func() | 571 setup func() |
| 421 }{ | 572 }{ |
| 422 {"delete failure", func() { | |
| 423 gsc.deleteErr = func(b, p string
) error { | |
| 424 if strings.HasSuffix(p,
failName) { | |
| 425 return errors.Ne
w("test error") | |
| 426 } | |
| 427 return nil | |
| 428 } | |
| 429 }}, | |
| 430 | |
| 431 {"writer create failure", func() { | 573 {"writer create failure", func() { |
| 432 gsc.newWriterErr = func(w *testG
SWriter) error { | 574 gsc.newWriterErr = func(w *testG
SWriter) error { |
| 433 » » » » » » » if strings.HasSuffix(w.p
ath, failName) { | 575 » » » » » » » if strings.HasSuffix(str
ing(w.path), failName) { |
| 434 » » » » » » » » return errors.Ne
w("test error") | 576 » » » » » » » » return errors.Wr
apTransient(errors.New("test error")) |
| 435 } | 577 } |
| 436 return nil | 578 return nil |
| 437 } | 579 } |
| 438 }}, | 580 }}, |
| 439 | 581 |
| 440 {"write failure", func() { | 582 {"write failure", func() { |
| 441 gsc.newWriterErr = func(w *testG
SWriter) error { | 583 gsc.newWriterErr = func(w *testG
SWriter) error { |
| 442 » » » » » » » if strings.HasSuffix(w.p
ath, failName) { | 584 » » » » » » » if strings.HasSuffix(str
ing(w.path), failName) { |
| 443 » » » » » » » » w.writeErr = err
ors.New("test error") | 585 » » » » » » » » w.writeErr = err
ors.WrapTransient(errors.New("test error")) |
| 444 } | 586 } |
| 445 return nil | 587 return nil |
| 446 } | 588 } |
| 589 }}, |
| 590 |
| 591 {"rename failure", func() { |
| 592 gsc.renameErr = func(src, dst gs
.Path) error { |
| 593 if strings.HasSuffix(str
ing(src), failName) { |
| 594 return errors.Wr
apTransient(errors.New("test error")) |
| 595 } |
| 596 return nil |
| 597 } |
| 447 }}, | 598 }}, |
| 448 | 599 |
| 449 {"close failure", func() { | 600 {"close failure", func() { |
| 450 gsc.newWriterErr = func(w *testG
SWriter) error { | 601 gsc.newWriterErr = func(w *testG
SWriter) error { |
| 451 » » » » » » » if strings.HasSuffix(w.p
ath, failName) { | 602 » » » » » » » if strings.HasSuffix(str
ing(w.path), failName) { |
| 452 » » » » » » » » w.closeErr = err
ors.New("test error") | 603 » » » » » » » » w.closeErr = err
ors.WrapTransient(errors.New("test error")) |
| 453 } | 604 } |
| 454 return nil | 605 return nil |
| 455 } | 606 } |
| 456 }}, | 607 }}, |
| 457 | 608 |
| 458 » » » » » {"delete on fail failure (double-failure
)", func() { | 609 » » » » » {"delete failure after other failure", f
unc() { |
| 459 » » » » » » failed := false | |
| 460 | |
| 461 // Simulate a write failure. Thi
s is the error that will actually | 610 // Simulate a write failure. Thi
s is the error that will actually |
| 462 // be returned. | 611 // be returned. |
| 463 gsc.newWriterErr = func(w *testG
SWriter) error { | 612 gsc.newWriterErr = func(w *testG
SWriter) error { |
| 464 » » » » » » » if strings.HasSuffix(w.p
ath, failName) { | 613 » » » » » » » if strings.HasSuffix(str
ing(w.path), failName) { |
| 465 » » » » » » » » w.writeErr = err
ors.New("test error") | 614 » » » » » » » » w.writeErr = err
ors.WrapTransient(errors.New("test error")) |
| 466 } | 615 } |
| 467 return nil | 616 return nil |
| 468 } | 617 } |
| 469 | 618 |
| 470 » » » » » » // This will trigger twice per s
tream, once on create and once on | 619 » » » » » » // This will trigger when NewWrit
er fails from the above |
| 471 » » » » » » // cleanup after the write fails
. We want to return an error in | 620 » » » » » » // instrumentation. |
| 472 » » » » » » // the latter case. | 621 » » » » » » gsc.deleteErr = func(p gs.Path)
error { |
| 473 » » » » » » gsc.deleteErr = func(b, p string
) error { | 622 » » » » » » » if strings.HasSuffix(str
ing(p), failName) { |
| 474 » » » » » » » if strings.HasSuffix(p,
failName) { | 623 » » » » » » » » return errors.Ne
w("other error") |
| 475 » » » » » » » » if failed { | |
| 476 » » » » » » » » » return e
rrors.New("other error") | |
| 477 » » » » » » » » } | |
| 478 | |
| 479 » » » » » » » » // First delete
(on create). | |
| 480 » » » » » » » » failed = true | |
| 481 } | 624 } |
| 482 return nil | 625 return nil |
| 483 } | 626 } |
| 484 }}, | 627 }}, |
| 485 } { | 628 } { |
| 486 » » » » » Convey(fmt.Sprintf(`Can handle %s for %s
`, testCase.name, failName), func() { | 629 » » » » » Convey(fmt.Sprintf(`Can handle %s for %s
, and will not archive.`, testCase.name, failName), func() { |
| 487 testCase.setup() | 630 testCase.setup() |
| 488 » » » » » » So(ar.Archive(c, &task), ShouldE
rrLike, "test error") | 631 |
| 632 » » » » » » ack, err := ar.archiveTaskImpl(c
, task) |
| 633 » » » » » » So(err, ShouldErrLike, "test err
or") |
| 634 » » » » » » So(ack, ShouldBeFalse) |
| 635 » » » » » » So(archiveRequest, ShouldBeNil) |
| 489 }) | 636 }) |
| 490 } | 637 } |
| 491 } | 638 } |
| 492 }) | 639 }) |
| 493 }) | 640 }) |
| 494 } | 641 } |
| OLD | NEW |