Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(47)

Side by Side Diff: server/internal/logdog/archivist/archivist_test.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Fix proto comment. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package archivist 5 package archivist
6 6
7 import ( 7 import (
8 "errors"
9 "fmt" 8 "fmt"
10 "strings" 9 "strings"
11 "sync" 10 "sync"
12 "testing" 11 "testing"
13 "time" 12 "time"
14 13
15 "github.com/golang/protobuf/proto" 14 "github.com/golang/protobuf/proto"
16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 15 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
17 "github.com/luci/luci-go/common/clock/testclock" 16 "github.com/luci/luci-go/common/clock/testclock"
17 "github.com/luci/luci-go/common/errors"
18 "github.com/luci/luci-go/common/gcloud/gs" 18 "github.com/luci/luci-go/common/gcloud/gs"
19 "github.com/luci/luci-go/common/logdog/types" 19 "github.com/luci/luci-go/common/logdog/types"
20 "github.com/luci/luci-go/common/proto/google" 20 "github.com/luci/luci-go/common/proto/google"
21 "github.com/luci/luci-go/common/proto/logdog/logpb" 21 "github.com/luci/luci-go/common/proto/logdog/logpb"
22 "github.com/luci/luci-go/server/logdog/storage" 22 "github.com/luci/luci-go/server/logdog/storage"
23 "github.com/luci/luci-go/server/logdog/storage/memory" 23 "github.com/luci/luci-go/server/logdog/storage/memory"
24 "golang.org/x/net/context" 24 "golang.org/x/net/context"
25 "google.golang.org/grpc" 25 "google.golang.org/grpc"
26 26
27 . "github.com/luci/luci-go/common/testing/assertions" 27 . "github.com/luci/luci-go/common/testing/assertions"
28 . "github.com/smartystreets/goconvey/convey" 28 . "github.com/smartystreets/goconvey/convey"
29 ) 29 )
30 30
31 // testTask is an instrumentable Task implementation.
32 type testTask struct {
33 task *logdog.ArchiveTask
34 assertLeaseErr error
35 assertCount int
36 }
37
38 func (t *testTask) UniqueID() string {
39 return "totally unique ID"
40 }
41
42 func (t *testTask) Task() *logdog.ArchiveTask {
43 return t.task
44 }
45
46 func (t *testTask) AssertLease(context.Context) error {
47 if err := t.assertLeaseErr; err != nil {
48 return err
49 }
50 t.assertCount++
51 return nil
52 }
53
31 // testServicesClient implements logdog.ServicesClient sufficient for testing 54 // testServicesClient implements logdog.ServicesClient sufficient for testing
32 // and instrumentation. 55 // and instrumentation.
33 type testServicesClient struct { 56 type testServicesClient struct {
34 logdog.ServicesClient 57 logdog.ServicesClient
35 58
36 lsCallback func(*logdog.LoadStreamRequest) (*logdog.LoadStreamResponse, error) 59 lsCallback func(*logdog.LoadStreamRequest) (*logdog.LoadStreamResponse, error)
37 asCallback func(*logdog.ArchiveStreamRequest) error 60 asCallback func(*logdog.ArchiveStreamRequest) error
38 } 61 }
39 62
40 func (sc *testServicesClient) LoadStream(c context.Context, req *logdog.LoadStre amRequest, o ...grpc.CallOption) ( 63 func (sc *testServicesClient) LoadStream(c context.Context, req *logdog.LoadStre amRequest, o ...grpc.CallOption) (
(...skipping 17 matching lines...) Expand all
58 // testGSClient is a testing implementation of the gsClient interface. 81 // testGSClient is a testing implementation of the gsClient interface.
59 // 82 //
60 // It does not actually retain any of the written data, since that level of 83 // It does not actually retain any of the written data, since that level of
61 // testing is done in the archive package. 84 // testing is done in the archive package.
62 type testGSClient struct { 85 type testGSClient struct {
63 sync.Mutex 86 sync.Mutex
64 gs.Client 87 gs.Client
65 88
66 // objs is a map of filename to "write amount". The write amount is the 89 // objs is a map of filename to "write amount". The write amount is the
67 // cumulative amount of data written to the Writer for a given GS path. 90 // cumulative amount of data written to the Writer for a given GS path.
68 » objs map[string]int64 91 » objs map[gs.Path]int64
69 closed bool 92 closed bool
70 93
71 closeErr error 94 closeErr error
72 newWriterErr func(w *testGSWriter) error 95 newWriterErr func(w *testGSWriter) error
73 » deleteErr func(string, string) error 96 » deleteErr func(gs.Path) error
97 » renameErr func(gs.Path, gs.Path) error
74 } 98 }
75 99
76 func (c *testGSClient) NewWriter(bucket, relpath string) (gs.Writer, error) { 100 func (c *testGSClient) NewWriter(p gs.Path) (gs.Writer, error) {
77 w := testGSWriter{ 101 w := testGSWriter{
78 client: c, 102 client: c,
79 » » path: c.url(bucket, relpath), 103 » » path: p,
80 } 104 }
81 if c.newWriterErr != nil { 105 if c.newWriterErr != nil {
82 if err := c.newWriterErr(&w); err != nil { 106 if err := c.newWriterErr(&w); err != nil {
83 return nil, err 107 return nil, err
84 } 108 }
85 } 109 }
86 return &w, nil 110 return &w, nil
87 } 111 }
88 112
89 func (c *testGSClient) Close() error { 113 func (c *testGSClient) Close() error {
90 if c.closed { 114 if c.closed {
91 panic("double close") 115 panic("double close")
92 } 116 }
93 if err := c.closeErr; err != nil { 117 if err := c.closeErr; err != nil {
94 return err 118 return err
95 } 119 }
96 c.closed = true 120 c.closed = true
97 return nil 121 return nil
98 } 122 }
99 123
100 func (c *testGSClient) Delete(bucket, relpath string) error { 124 func (c *testGSClient) Delete(p gs.Path) error {
101 if c.deleteErr != nil { 125 if c.deleteErr != nil {
102 » » if err := c.deleteErr(bucket, relpath); err != nil { 126 » » if err := c.deleteErr(p); err != nil {
103 return err 127 return err
104 } 128 }
105 } 129 }
106 130
107 c.Lock() 131 c.Lock()
108 defer c.Unlock() 132 defer c.Unlock()
109 133
110 » delete(c.objs, c.url(bucket, relpath)) 134 » delete(c.objs, p)
111 return nil 135 return nil
112 } 136 }
113 137
114 func (c *testGSClient) url(bucket, relpath string) string { 138 func (c *testGSClient) Rename(src, dst gs.Path) error {
115 » return fmt.Sprintf("gs://%s/%s", bucket, relpath) 139 » if c.renameErr != nil {
140 » » if err := c.renameErr(src, dst); err != nil {
141 » » » return err
142 » » }
143 » }
144
145 » c.Lock()
146 » defer c.Unlock()
147
148 » c.objs[dst] = c.objs[src]
149 » delete(c.objs, src)
150 » return nil
116 } 151 }
117 152
118 type testGSWriter struct { 153 type testGSWriter struct {
119 client *testGSClient 154 client *testGSClient
120 155
121 » path string 156 » path gs.Path
122 closed bool 157 closed bool
123 writeCount int64 158 writeCount int64
124 159
125 writeErr error 160 writeErr error
126 closeErr error 161 closeErr error
127 } 162 }
128 163
129 func (w *testGSWriter) Write(d []byte) (int, error) { 164 func (w *testGSWriter) Write(d []byte) (int, error) {
130 if err := w.writeErr; err != nil { 165 if err := w.writeErr; err != nil {
131 return 0, err 166 return 0, err
132 } 167 }
133 168
134 w.client.Lock() 169 w.client.Lock()
135 defer w.client.Unlock() 170 defer w.client.Unlock()
136 171
137 if w.client.objs == nil { 172 if w.client.objs == nil {
138 » » w.client.objs = make(map[string]int64) 173 » » w.client.objs = make(map[gs.Path]int64)
139 } 174 }
140 w.client.objs[w.path] += int64(len(d)) 175 w.client.objs[w.path] += int64(len(d))
141 w.writeCount += int64(len(d)) 176 w.writeCount += int64(len(d))
142 return len(d), nil 177 return len(d), nil
143 } 178 }
144 179
145 func (w *testGSWriter) Close() error { 180 func (w *testGSWriter) Close() error {
146 if w.closed { 181 if w.closed {
147 panic("double close") 182 panic("double close")
148 } 183 }
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
205 }) 240 })
206 if err != nil { 241 if err != nil {
207 panic(err) 242 panic(err)
208 } 243 }
209 244
210 // Advance the time for each log entry. 245 // Advance the time for each log entry.
211 tc.Add(time.Second) 246 tc.Add(time.Second)
212 } 247 }
213 } 248 }
214 249
250 // Set up our testing archival task.
251 expired := 10 * time.Minute
252 archiveTask := logdog.ArchiveTask{
253 Path: string(desc.Path()),
254 SettleDelay: google.NewDuration(10 * time.Second),
255 CompletePeriod: google.NewDuration(expired),
256 Key: []byte("random archival key"),
257 }
258 expired++ // This represents a time PAST CompletePeriod.
259
260 task := &testTask{
261 task: &archiveTask,
262 }
263
215 // Set up our test Coordinator client stubs. 264 // Set up our test Coordinator client stubs.
216 stream := logdog.LoadStreamResponse{ 265 stream := logdog.LoadStreamResponse{
217 State: &logdog.LogStreamState{ 266 State: &logdog.LogStreamState{
218 » » » » Path: string(desc.Path()), 267 » » » » Path: archiveTask.Path,
219 ProtoVersion: logpb.Version, 268 ProtoVersion: logpb.Version,
220 TerminalIndex: -1, 269 TerminalIndex: -1,
221 Archived: false, 270 Archived: false,
222 Purged: false, 271 Purged: false,
223 }, 272 },
224 Desc: descBytes, 273 Desc: descBytes,
274
275 // Age is ON the expiration threshold, so not expired.
276 Age: archiveTask.CompletePeriod,
277 ArchivalKey: archiveTask.Key,
225 } 278 }
226 279
227 var archiveRequest *logdog.ArchiveStreamRequest 280 var archiveRequest *logdog.ArchiveStreamRequest
281 var archiveStreamErr error
228 sc := testServicesClient{ 282 sc := testServicesClient{
229 lsCallback: func(req *logdog.LoadStreamRequest) (*logdog .LoadStreamResponse, error) { 283 lsCallback: func(req *logdog.LoadStreamRequest) (*logdog .LoadStreamResponse, error) {
230 return &stream, nil 284 return &stream, nil
231 }, 285 },
232 asCallback: func(req *logdog.ArchiveStreamRequest) error { 286 asCallback: func(req *logdog.ArchiveStreamRequest) error {
233 archiveRequest = req 287 archiveRequest = req
234 » » » » return nil 288 » » » » return archiveStreamErr
235 }, 289 },
236 } 290 }
237 291
238 ar := Archivist{ 292 ar := Archivist{
239 » » » Service: &sc, 293 » » » Service: &sc,
240 » » » Storage: &st, 294 » » » Storage: &st,
241 » » » GSClient: &gsc, 295 » » » GSClient: &gsc,
242 » » » GSBase: gs.Path("gs://archive-test/path/to/archive/"), // Extra slashes to test concatenation. 296 » » » GSBase: gs.Path("gs://archive-test/path/to/archiv e/"), // Extra slashes to test concatenation.
243 » » } 297 » » » GSStagingBase: gs.Path("gs://archive-test-staging/path/t o/archive/"), // Extra slashes to test concatenation.
244
245 » » task := logdog.ArchiveTask{
246 » » » Path: stream.State.Path,
247 » » » Complete: true,
248 } 298 }
249 299
250 gsURL := func(p string) string { 300 gsURL := func(p string) string {
251 return fmt.Sprintf("gs://archive-test/path/to/archive/%s /%s", desc.Path(), p) 301 return fmt.Sprintf("gs://archive-test/path/to/archive/%s /%s", desc.Path(), p)
252 } 302 }
253 303
254 // hasStreams can be called to check that the retained archiveRe quest had 304 // hasStreams can be called to check that the retained archiveRe quest had
255 // data sizes for the named archive stream types. 305 // data sizes for the named archive stream types.
256 // 306 //
257 // After checking, the values are set to zero. This allows us to use 307 // After checking, the values are set to zero. This allows us to use
258 // ShouldEqual without hardcoding specific archival sizes into t he results. 308 // ShouldEqual without hardcoding specific archival sizes into t he results.
259 hasStreams := func(log, index, data bool) bool { 309 hasStreams := func(log, index, data bool) bool {
260 So(archiveRequest, ShouldNotBeNil) 310 So(archiveRequest, ShouldNotBeNil)
261 if (log && archiveRequest.StreamSize <= 0) || 311 if (log && archiveRequest.StreamSize <= 0) ||
262 (index && archiveRequest.IndexSize <= 0) || 312 (index && archiveRequest.IndexSize <= 0) ||
263 (data && archiveRequest.DataSize <= 0) { 313 (data && archiveRequest.DataSize <= 0) {
264 return false 314 return false
265 } 315 }
266 316
267 archiveRequest.StreamSize = 0 317 archiveRequest.StreamSize = 0
268 archiveRequest.IndexSize = 0 318 archiveRequest.IndexSize = 0
269 archiveRequest.DataSize = 0 319 archiveRequest.DataSize = 0
270 return true 320 return true
271 } 321 }
272 322
273 » » Convey(`Will fail to archive if the specified stream state could not be loaded.`, func() { 323 » » Convey(`Will return task and fail to archive if the specified st ream state could not be loaded.`, func() {
274 sc.lsCallback = func(*logdog.LoadStreamRequest) (*logdog .LoadStreamResponse, error) { 324 sc.lsCallback = func(*logdog.LoadStreamRequest) (*logdog .LoadStreamResponse, error) {
275 return nil, errors.New("does not exist") 325 return nil, errors.New("does not exist")
276 } 326 }
277 » » » So(ar.Archive(c, &task), ShouldErrLike, "does not exist" ) 327
328 » » » ack, err := ar.archiveTaskImpl(c, task)
329 » » » So(err, ShouldErrLike, "does not exist")
330 » » » So(ack, ShouldBeFalse)
278 }) 331 })
279 332
280 » » Convey(`Will refrain from archiving if the stream is already arc hived.`, func() { 333 » » Convey(`Will complete task and refrain from archiving if the str eam is already archived.`, func() {
281 stream.State.Archived = true 334 stream.State.Archived = true
282 » » » So(ar.Archive(c, &task), ShouldBeNil) 335
336 » » » ack, err := ar.archiveTaskImpl(c, task)
337 » » » So(err, ShouldErrLike, "log stream is archived")
338 » » » So(ack, ShouldBeTrue)
283 So(archiveRequest, ShouldBeNil) 339 So(archiveRequest, ShouldBeNil)
284 }) 340 })
285 341
286 » » Convey(`Will refrain from archiving if the stream is purged.`, f unc() { 342 » » Convey(`Will complete task and refrain from archiving if the str eam is purged.`, func() {
287 stream.State.Purged = true 343 stream.State.Purged = true
288 » » » So(ar.Archive(c, &task), ShouldBeNil) 344
345 » » » ack, err := ar.archiveTaskImpl(c, task)
346 » » » So(err, ShouldErrLike, "log stream is purged")
347 » » » So(ack, ShouldBeTrue)
348 » » » So(archiveRequest, ShouldBeNil)
349 » » })
350
351 » » Convey(`Will return task if the stream is younger than its settl e delay.`, func() {
352 » » » stream.Age = google.NewDuration(time.Second)
353
354 » » » ack, err := ar.archiveTaskImpl(c, task)
355 » » » So(err, ShouldErrLike, "log stream is within settle dela y")
356 » » » So(ack, ShouldBeFalse)
357 » » » So(archiveRequest, ShouldBeNil)
358 » » })
359
360 » » Convey(`Will return task if the log stream doesn't have an archi val key yet.`, func() {
361 » » » stream.Age = google.NewDuration(expired)
362 » » » stream.ArchivalKey = nil
363
364 » » » ack, err := ar.archiveTaskImpl(c, task)
365 » » » So(err, ShouldErrLike, "premature archival request")
366 » » » So(ack, ShouldBeFalse)
367 » » » So(archiveRequest, ShouldBeNil)
368 » » })
369
370 » » Convey(`Will complete task and refrain from archiving if archival keys don't match.`, func() {
371 » » » stream.Age = google.NewDuration(expired)
372 » » » stream.ArchivalKey = []byte("non-matching archival key")
373
374 » » » ack, err := ar.archiveTaskImpl(c, task)
375 » » » So(err, ShouldErrLike, "superfluous archival request")
376 » » » So(ack, ShouldBeTrue)
289 So(archiveRequest, ShouldBeNil) 377 So(archiveRequest, ShouldBeNil)
290 }) 378 })
291 379
292 // Weird case: the log has been marked for archival, has not bee n 380 // Weird case: the log has been marked for archival, has not bee n
293 » » // terminated, and is within its completeness delay. This task w ill not 381 » » //
294 » » // have been dispatched by our archive cron, but let's assert th at it 382 » » // terminated, and is within its completeness delay. This task s hould not
295 » » // behaves correctly regardless. 383 » » // have been dispatched by our expired archive cron, but let's a ssert that
296 » » Convey(`Will succeed if the log stream had no entries and no ter minal index.`, func() { 384 » » // it behaves correctly regardless.
297 » » » So(ar.Archive(c, &task), ShouldBeNil) 385 » » Convey(`Will refuse to archive a complete stream with no termina l index.`, func() {
298 386 » » » ack, err := ar.archiveTaskImpl(c, task)
299 » » » So(hasStreams(true, true, false), ShouldBeTrue) 387 » » » So(err, ShouldErrLike, "completeness required, but strea m has no terminal index")
300 » » » So(archiveRequest, ShouldResemble, &logdog.ArchiveStream Request{ 388 » » » So(ack, ShouldBeFalse)
301 » » » » Path: task.Path,
302 » » » » Complete: true,
303 » » » » TerminalIndex: -1,
304
305 » » » » StreamUrl: gsURL("logstream.entries"),
306 » » » » IndexUrl: gsURL("logstream.index"),
307 » » » » DataUrl: gsURL("data.bin"),
308 » » » })
309 }) 389 })
310 390
311 Convey(`With terminal index "3"`, func() { 391 Convey(`With terminal index "3"`, func() {
312 stream.State.TerminalIndex = 3 392 stream.State.TerminalIndex = 3
313 393
314 » » » Convey(`Will fail if the log stream had a terminal index and no entries.`, func() { 394 » » » Convey(`Will not ACK a log stream with no entries.`, func() {
315 » » » » So(ar.Archive(c, &task), ShouldErrLike, "stream finished short of terminal index") 395 » » » » ack, err := ar.archiveTaskImpl(c, task)
396 » » » » So(err, ShouldErrLike, "stream has missing entri es")
397 » » » » So(ack, ShouldBeFalse)
316 }) 398 })
317 399
318 Convey(`Will fail to archive {0, 1, 2, 4} (incomplete).` , func() { 400 Convey(`Will fail to archive {0, 1, 2, 4} (incomplete).` , func() {
319 addTestEntry(0, 1, 2, 4) 401 addTestEntry(0, 1, 2, 4)
320 » » » » So(ar.Archive(c, &task), ShouldErrLike, "non-con tiguous log stream") 402
403 » » » » ack, err := ar.archiveTaskImpl(c, task)
404 » » » » So(err, ShouldErrLike, "stream has missing entri es")
405 » » » » So(ack, ShouldBeFalse)
321 }) 406 })
322 407
323 Convey(`Will successfully archive {0, 1, 2, 3, 4}, stopp ing at the terminal index.`, func() { 408 Convey(`Will successfully archive {0, 1, 2, 3, 4}, stopp ing at the terminal index.`, func() {
324 addTestEntry(0, 1, 2, 3, 4) 409 addTestEntry(0, 1, 2, 3, 4)
325 » » » » So(ar.Archive(c, &task), ShouldBeNil) 410
411 » » » » ack, err := ar.archiveTaskImpl(c, task)
412 » » » » So(err, ShouldBeNil)
413 » » » » So(ack, ShouldBeTrue)
326 414
327 So(hasStreams(true, true, true), ShouldBeTrue) 415 So(hasStreams(true, true, true), ShouldBeTrue)
328 So(archiveRequest, ShouldResemble, &logdog.Archi veStreamRequest{ 416 So(archiveRequest, ShouldResemble, &logdog.Archi veStreamRequest{
329 » » » » » Path: task.Path, 417 » » » » » Path: archiveTask.Path,
330 » » » » » Complete: true, 418 » » » » » LogEntryCount: 4,
331 TerminalIndex: 3, 419 TerminalIndex: 3,
332 420
333 StreamUrl: gsURL("logstream.entries"), 421 StreamUrl: gsURL("logstream.entries"),
334 IndexUrl: gsURL("logstream.index"), 422 IndexUrl: gsURL("logstream.index"),
335 DataUrl: gsURL("data.bin"), 423 DataUrl: gsURL("data.bin"),
336 }) 424 })
337 }) 425 })
426
427 Convey(`When a transient archival error occurs, will not ACK it.`, func() {
428 gsc.newWriterErr = func(*testGSWriter) error { r eturn errors.WrapTransient(errors.New("test error")) }
429
430 ack, err := ar.archiveTaskImpl(c, task)
431 So(err, ShouldErrLike, "test error")
432 So(ack, ShouldBeFalse)
433 })
434
435 Convey(`When a non-transient archival error occurs`, fun c() {
436 archiveErr := errors.New("archive failure error" )
437 gsc.newWriterErr = func(*testGSWriter) error { r eturn archiveErr }
438
439 Convey(`If remote report returns an error, do no t ACK.`, func() {
440 archiveStreamErr = errors.New("test erro r")
441
442 ack, err := ar.archiveTaskImpl(c, task)
443 So(err, ShouldErrLike, "test error")
444 So(ack, ShouldBeFalse)
445
446 So(archiveRequest, ShouldResemble, &logd og.ArchiveStreamRequest{
447 Path: archiveTask.Path,
448 Error: "archive failure error",
449 })
450 })
451
452 Convey(`If remote report returns success, ACK.`, func() {
453 ack, err := ar.archiveTaskImpl(c, task)
454 So(err, ShouldBeNil)
455 So(ack, ShouldBeTrue)
456 So(archiveRequest, ShouldResemble, &logd og.ArchiveStreamRequest{
457 Path: archiveTask.Path,
458 Error: "archive failure error",
459 })
460 })
461
462 Convey(`If an empty error string is supplied, th e generic error will be filled in.`, func() {
463 archiveErr = errors.New("")
464
465 ack, err := ar.archiveTaskImpl(c, task)
466 So(err, ShouldBeNil)
467 So(ack, ShouldBeTrue)
468 So(archiveRequest, ShouldResemble, &logd og.ArchiveStreamRequest{
469 Path: archiveTask.Path,
470 Error: "archival error",
471 })
472 })
473 })
338 }) 474 })
339 475
340 Convey(`When not enforcing stream completeness`, func() { 476 Convey(`When not enforcing stream completeness`, func() {
341 » » » task.Complete = false 477 » » » stream.Age = google.NewDuration(expired)
342 478
343 Convey(`With no terminal index`, func() { 479 Convey(`With no terminal index`, func() {
344 Convey(`Will successfully archive if there are n o entries.`, func() { 480 Convey(`Will successfully archive if there are n o entries.`, func() {
345 » » » » » So(ar.Archive(c, &task), ShouldBeNil) 481 » » » » » ack, err := ar.archiveTaskImpl(c, task)
482 » » » » » So(err, ShouldBeNil)
483 » » » » » So(ack, ShouldBeTrue)
346 484
347 So(hasStreams(true, true, false), Should BeTrue) 485 So(hasStreams(true, true, false), Should BeTrue)
348 So(archiveRequest, ShouldResemble, &logd og.ArchiveStreamRequest{ 486 So(archiveRequest, ShouldResemble, &logd og.ArchiveStreamRequest{
349 » » » » » » Path: task.Path, 487 » » » » » » Path: archiveTask.Path,
350 » » » » » » Complete: true, 488 » » » » » » LogEntryCount: 0,
351 TerminalIndex: -1, 489 TerminalIndex: -1,
352 490
353 StreamUrl: gsURL("logstream.entr ies"), 491 StreamUrl: gsURL("logstream.entr ies"),
354 IndexUrl: gsURL("logstream.inde x"), 492 IndexUrl: gsURL("logstream.inde x"),
355 DataUrl: gsURL("data.bin"), 493 DataUrl: gsURL("data.bin"),
356 }) 494 })
357 }) 495 })
358 496
359 Convey(`With {0, 1, 2, 4} (incomplete) will arch ive the stream and update its terminal index.`, func() { 497 Convey(`With {0, 1, 2, 4} (incomplete) will arch ive the stream and update its terminal index.`, func() {
360 addTestEntry(0, 1, 2, 4) 498 addTestEntry(0, 1, 2, 4)
361 » » » » » So(ar.Archive(c, &task), ShouldBeNil) 499
500 » » » » » ack, err := ar.archiveTaskImpl(c, task)
501 » » » » » So(err, ShouldBeNil)
502 » » » » » So(ack, ShouldBeTrue)
362 503
363 So(hasStreams(true, true, true), ShouldB eTrue) 504 So(hasStreams(true, true, true), ShouldB eTrue)
364 So(archiveRequest, ShouldResemble, &logd og.ArchiveStreamRequest{ 505 So(archiveRequest, ShouldResemble, &logd og.ArchiveStreamRequest{
365 » » » » » » Path: task.Path, 506 » » » » » » Path: archiveTask.Path,
366 » » » » » » Complete: false, 507 » » » » » » LogEntryCount: 4,
367 TerminalIndex: 4, 508 TerminalIndex: 4,
368 509
369 StreamUrl: gsURL("logstream.entr ies"), 510 StreamUrl: gsURL("logstream.entr ies"),
370 IndexUrl: gsURL("logstream.inde x"), 511 IndexUrl: gsURL("logstream.inde x"),
371 DataUrl: gsURL("data.bin"), 512 DataUrl: gsURL("data.bin"),
372 }) 513 })
373 }) 514 })
374 }) 515 })
375 516
376 Convey(`With terminal index 3`, func() { 517 Convey(`With terminal index 3`, func() {
377 stream.State.TerminalIndex = 3 518 stream.State.TerminalIndex = 3
378 519
379 Convey(`Will successfully archive if there are n o entries.`, func() { 520 Convey(`Will successfully archive if there are n o entries.`, func() {
380 » » » » » So(ar.Archive(c, &task), ShouldBeNil) 521 » » » » » ack, err := ar.archiveTaskImpl(c, task)
522 » » » » » So(err, ShouldBeNil)
523 » » » » » So(ack, ShouldBeTrue)
381 524
382 So(hasStreams(true, true, false), Should BeTrue) 525 So(hasStreams(true, true, false), Should BeTrue)
383 So(archiveRequest, ShouldResemble, &logd og.ArchiveStreamRequest{ 526 So(archiveRequest, ShouldResemble, &logd og.ArchiveStreamRequest{
384 » » » » » » Path: task.Path, 527 » » » » » » Path: archiveTask.Path,
385 » » » » » » Complete: true, 528 » » » » » » LogEntryCount: 0,
386 TerminalIndex: -1, 529 TerminalIndex: -1,
387 530
388 StreamUrl: gsURL("logstream.entr ies"), 531 StreamUrl: gsURL("logstream.entr ies"),
389 IndexUrl: gsURL("logstream.inde x"), 532 IndexUrl: gsURL("logstream.inde x"),
390 DataUrl: gsURL("data.bin"), 533 DataUrl: gsURL("data.bin"),
391 }) 534 })
392 }) 535 })
393 536
394 Convey(`With {0, 1, 2, 4} (incomplete) will arch ive the stream and update its terminal index to 2.`, func() { 537 Convey(`With {0, 1, 2, 4} (incomplete) will arch ive the stream and update its terminal index to 2.`, func() {
395 addTestEntry(0, 1, 2, 4) 538 addTestEntry(0, 1, 2, 4)
396 » » » » » So(ar.Archive(c, &task), ShouldBeNil) 539
540 » » » » » ack, err := ar.archiveTaskImpl(c, task)
541 » » » » » So(err, ShouldBeNil)
542 » » » » » So(ack, ShouldBeTrue)
397 543
398 So(hasStreams(true, true, true), ShouldB eTrue) 544 So(hasStreams(true, true, true), ShouldB eTrue)
399 So(archiveRequest, ShouldResemble, &logd og.ArchiveStreamRequest{ 545 So(archiveRequest, ShouldResemble, &logd og.ArchiveStreamRequest{
400 » » » » » » Path: task.Path, 546 » » » » » » Path: archiveTask.Path,
401 » » » » » » Complete: false, 547 » » » » » » LogEntryCount: 3,
402 TerminalIndex: 2, 548 TerminalIndex: 2,
403 549
404 StreamUrl: gsURL("logstream.entr ies"), 550 StreamUrl: gsURL("logstream.entr ies"),
405 IndexUrl: gsURL("logstream.inde x"), 551 IndexUrl: gsURL("logstream.inde x"),
406 DataUrl: gsURL("data.bin"), 552 DataUrl: gsURL("data.bin"),
407 }) 553 })
408 }) 554 })
409 }) 555 })
410 }) 556 })
411 557
412 // Simulate failures during the various stream generation operat ions. 558 // Simulate failures during the various stream generation operat ions.
413 Convey(`Stream generation failures`, func() { 559 Convey(`Stream generation failures`, func() {
414 stream.State.TerminalIndex = 3 560 stream.State.TerminalIndex = 3
415 addTestEntry(0, 1, 2, 3) 561 addTestEntry(0, 1, 2, 3)
416 562
417 for _, failName := range []string{"/logstream.entries", "/logstream.index", "/data.bin"} { 563 for _, failName := range []string{"/logstream.entries", "/logstream.index", "/data.bin"} {
418 for _, testCase := range []struct { 564 for _, testCase := range []struct {
419 name string 565 name string
420 setup func() 566 setup func()
421 }{ 567 }{
422 {"delete failure", func() {
423 gsc.deleteErr = func(b, p string ) error {
424 if strings.HasSuffix(p, failName) {
425 return errors.Ne w("test error")
426 }
427 return nil
428 }
429 }},
430
431 {"writer create failure", func() { 568 {"writer create failure", func() {
432 gsc.newWriterErr = func(w *testG SWriter) error { 569 gsc.newWriterErr = func(w *testG SWriter) error {
433 » » » » » » » if strings.HasSuffix(w.p ath, failName) { 570 » » » » » » » if strings.HasSuffix(str ing(w.path), failName) {
434 » » » » » » » » return errors.Ne w("test error") 571 » » » » » » » » return errors.Wr apTransient(errors.New("test error"))
435 } 572 }
436 return nil 573 return nil
437 } 574 }
438 }}, 575 }},
439 576
440 {"write failure", func() { 577 {"write failure", func() {
441 gsc.newWriterErr = func(w *testG SWriter) error { 578 gsc.newWriterErr = func(w *testG SWriter) error {
442 » » » » » » » if strings.HasSuffix(w.p ath, failName) { 579 » » » » » » » if strings.HasSuffix(str ing(w.path), failName) {
443 » » » » » » » » w.writeErr = err ors.New("test error") 580 » » » » » » » » w.writeErr = err ors.WrapTransient(errors.New("test error"))
444 } 581 }
445 return nil 582 return nil
446 } 583 }
584 }},
585
586 {"rename failure", func() {
587 gsc.renameErr = func(src, dst gs .Path) error {
588 if strings.HasSuffix(str ing(src), failName) {
589 return errors.Wr apTransient(errors.New("test error"))
590 }
591 return nil
592 }
447 }}, 593 }},
448 594
449 {"close failure", func() { 595 {"close failure", func() {
450 gsc.newWriterErr = func(w *testG SWriter) error { 596 gsc.newWriterErr = func(w *testG SWriter) error {
451 » » » » » » » if strings.HasSuffix(w.p ath, failName) { 597 » » » » » » » if strings.HasSuffix(str ing(w.path), failName) {
452 » » » » » » » » w.closeErr = err ors.New("test error") 598 » » » » » » » » w.closeErr = err ors.WrapTransient(errors.New("test error"))
453 } 599 }
454 return nil 600 return nil
455 } 601 }
456 }}, 602 }},
457 603
458 » » » » » {"delete on fail failure (double-failure )", func() { 604 » » » » » {"delete failure after other failure", f unc() {
459 » » » » » » failed := false
460
461 // Simulate a write failure. Thi s is the error that will actually 605 // Simulate a write failure. Thi s is the error that will actually
462 // be returned. 606 // be returned.
463 gsc.newWriterErr = func(w *testG SWriter) error { 607 gsc.newWriterErr = func(w *testG SWriter) error {
464 » » » » » » » if strings.HasSuffix(w.p ath, failName) { 608 » » » » » » » if strings.HasSuffix(str ing(w.path), failName) {
465 » » » » » » » » w.writeErr = err ors.New("test error") 609 » » » » » » » » w.writeErr = err ors.WrapTransient(errors.New("test error"))
466 } 610 }
467 return nil 611 return nil
468 } 612 }
469 613
470 » » » » » » // This will trigger twice per stream, once on create and once on 614 » » » » » » // This will trigger when NewWriter fails from the above
471 » » » » » » // cleanup after the write fails . We want to return an error in 615 » » » » » » // instrumentation.
472 » » » » » » // the latter case. 616 » » » » » » gsc.deleteErr = func(p gs.Path) error {
473 » » » » » » gsc.deleteErr = func(b, p string ) error { 617 » » » » » » » if strings.HasSuffix(str ing(p), failName) {
474 » » » » » » » if strings.HasSuffix(p, failName) { 618 » » » » » » » » return errors.Ne w("other error")
475 » » » » » » » » if failed {
476 » » » » » » » » » return e rrors.New("other error")
477 » » » » » » » » }
478
479 » » » » » » » » // First delete (on create).
480 » » » » » » » » failed = true
481 } 619 }
482 return nil 620 return nil
483 } 621 }
484 }}, 622 }},
485 } { 623 } {
486 » » » » » Convey(fmt.Sprintf(`Can handle %s for %s `, testCase.name, failName), func() { 624 » » » » » Convey(fmt.Sprintf(`Can handle %s for %s , and will not archive.`, testCase.name, failName), func() {
487 testCase.setup() 625 testCase.setup()
488 » » » » » » So(ar.Archive(c, &task), ShouldE rrLike, "test error") 626
627 » » » » » » ack, err := ar.archiveTaskImpl(c , task)
628 » » » » » » So(err, ShouldErrLike, "test err or")
629 » » » » » » So(ack, ShouldBeFalse)
630 » » » » » » So(archiveRequest, ShouldBeNil)
489 }) 631 })
490 } 632 }
491 } 633 }
492 }) 634 })
493 }) 635 })
494 } 636 }
OLDNEW
« no previous file with comments | « server/internal/logdog/archivist/archivist.go ('k') | server/internal/logdog/archivist/storageSource.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698