Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(175)

Unified Diff: server/internal/logdog/archivist/archivist_test.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Fix proto comment. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « server/internal/logdog/archivist/archivist.go ('k') | server/internal/logdog/archivist/storageSource.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: server/internal/logdog/archivist/archivist_test.go
diff --git a/server/internal/logdog/archivist/archivist_test.go b/server/internal/logdog/archivist/archivist_test.go
index 12daac169e7ca7d70f8181b69d077782b5ac5e8d..a501fd225cf8ffdc0e1cf99c44f1f584d7476b56 100644
--- a/server/internal/logdog/archivist/archivist_test.go
+++ b/server/internal/logdog/archivist/archivist_test.go
@@ -5,7 +5,6 @@
package archivist
import (
- "errors"
"fmt"
"strings"
"sync"
@@ -15,6 +14,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
"github.com/luci/luci-go/common/clock/testclock"
+ "github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/gcloud/gs"
"github.com/luci/luci-go/common/logdog/types"
"github.com/luci/luci-go/common/proto/google"
@@ -28,6 +28,29 @@ import (
. "github.com/smartystreets/goconvey/convey"
)
+// testTask is an instrumentable Task implementation.
+type testTask struct {
+ task *logdog.ArchiveTask
+ assertLeaseErr error
+ assertCount int
+}
+
+func (t *testTask) UniqueID() string {
+ return "totally unique ID"
+}
+
+func (t *testTask) Task() *logdog.ArchiveTask {
+ return t.task
+}
+
+func (t *testTask) AssertLease(context.Context) error {
+ if err := t.assertLeaseErr; err != nil {
+ return err
+ }
+ t.assertCount++
+ return nil
+}
+
// testServicesClient implements logdog.ServicesClient sufficient for testing
// and instrumentation.
type testServicesClient struct {
@@ -65,18 +88,19 @@ type testGSClient struct {
// objs is a map of filename to "write amount". The write amount is the
// cumulative amount of data written to the Writer for a given GS path.
- objs map[string]int64
+ objs map[gs.Path]int64
closed bool
closeErr error
newWriterErr func(w *testGSWriter) error
- deleteErr func(string, string) error
+ deleteErr func(gs.Path) error
+ renameErr func(gs.Path, gs.Path) error
}
-func (c *testGSClient) NewWriter(bucket, relpath string) (gs.Writer, error) {
+func (c *testGSClient) NewWriter(p gs.Path) (gs.Writer, error) {
w := testGSWriter{
client: c,
- path: c.url(bucket, relpath),
+ path: p,
}
if c.newWriterErr != nil {
if err := c.newWriterErr(&w); err != nil {
@@ -97,9 +121,9 @@ func (c *testGSClient) Close() error {
return nil
}
-func (c *testGSClient) Delete(bucket, relpath string) error {
+func (c *testGSClient) Delete(p gs.Path) error {
if c.deleteErr != nil {
- if err := c.deleteErr(bucket, relpath); err != nil {
+ if err := c.deleteErr(p); err != nil {
return err
}
}
@@ -107,18 +131,29 @@ func (c *testGSClient) Delete(bucket, relpath string) error {
c.Lock()
defer c.Unlock()
- delete(c.objs, c.url(bucket, relpath))
+ delete(c.objs, p)
return nil
}
-func (c *testGSClient) url(bucket, relpath string) string {
- return fmt.Sprintf("gs://%s/%s", bucket, relpath)
+func (c *testGSClient) Rename(src, dst gs.Path) error {
+ if c.renameErr != nil {
+ if err := c.renameErr(src, dst); err != nil {
+ return err
+ }
+ }
+
+ c.Lock()
+ defer c.Unlock()
+
+ c.objs[dst] = c.objs[src]
+ delete(c.objs, src)
+ return nil
}
type testGSWriter struct {
client *testGSClient
- path string
+ path gs.Path
closed bool
writeCount int64
@@ -135,7 +170,7 @@ func (w *testGSWriter) Write(d []byte) (int, error) {
defer w.client.Unlock()
if w.client.objs == nil {
- w.client.objs = make(map[string]int64)
+ w.client.objs = make(map[gs.Path]int64)
}
w.client.objs[w.path] += int64(len(d))
w.writeCount += int64(len(d))
@@ -212,39 +247,54 @@ func TestHandleArchive(t *testing.T) {
}
}
+ // Set up our testing archival task.
+ expired := 10 * time.Minute
+ archiveTask := logdog.ArchiveTask{
+ Path: string(desc.Path()),
+ SettleDelay: google.NewDuration(10 * time.Second),
+ CompletePeriod: google.NewDuration(expired),
+ Key: []byte("random archival key"),
+ }
+ expired++ // This represents a time PAST CompletePeriod.
+
+ task := &testTask{
+ task: &archiveTask,
+ }
+
// Set up our test Coordinator client stubs.
stream := logdog.LoadStreamResponse{
State: &logdog.LogStreamState{
- Path: string(desc.Path()),
+ Path: archiveTask.Path,
ProtoVersion: logpb.Version,
TerminalIndex: -1,
Archived: false,
Purged: false,
},
Desc: descBytes,
+
+ // Age is ON the expiration threshold, so not expired.
+ Age: archiveTask.CompletePeriod,
+ ArchivalKey: archiveTask.Key,
}
var archiveRequest *logdog.ArchiveStreamRequest
+ var archiveStreamErr error
sc := testServicesClient{
lsCallback: func(req *logdog.LoadStreamRequest) (*logdog.LoadStreamResponse, error) {
return &stream, nil
},
asCallback: func(req *logdog.ArchiveStreamRequest) error {
archiveRequest = req
- return nil
+ return archiveStreamErr
},
}
ar := Archivist{
- Service: &sc,
- Storage: &st,
- GSClient: &gsc,
- GSBase: gs.Path("gs://archive-test/path/to/archive/"), // Extra slashes to test concatenation.
- }
-
- task := logdog.ArchiveTask{
- Path: stream.State.Path,
- Complete: true,
+ Service: &sc,
+ Storage: &st,
+ GSClient: &gsc,
+ GSBase: gs.Path("gs://archive-test/path/to/archive/"), // Extra slashes to test concatenation.
+ GSStagingBase: gs.Path("gs://archive-test-staging/path/to/archive/"), // Extra slashes to test concatenation.
}
gsURL := func(p string) string {
@@ -270,64 +320,102 @@ func TestHandleArchive(t *testing.T) {
return true
}
- Convey(`Will fail to archive if the specified stream state could not be loaded.`, func() {
+ Convey(`Will return task and fail to archive if the specified stream state could not be loaded.`, func() {
sc.lsCallback = func(*logdog.LoadStreamRequest) (*logdog.LoadStreamResponse, error) {
return nil, errors.New("does not exist")
}
- So(ar.Archive(c, &task), ShouldErrLike, "does not exist")
+
+ ack, err := ar.archiveTaskImpl(c, task)
+ So(err, ShouldErrLike, "does not exist")
+ So(ack, ShouldBeFalse)
})
- Convey(`Will refrain from archiving if the stream is already archived.`, func() {
+ Convey(`Will complete task and refrain from archiving if the stream is already archived.`, func() {
stream.State.Archived = true
- So(ar.Archive(c, &task), ShouldBeNil)
+
+ ack, err := ar.archiveTaskImpl(c, task)
+ So(err, ShouldErrLike, "log stream is archived")
+ So(ack, ShouldBeTrue)
So(archiveRequest, ShouldBeNil)
})
- Convey(`Will refrain from archiving if the stream is purged.`, func() {
+ Convey(`Will complete task and refrain from archiving if the stream is purged.`, func() {
stream.State.Purged = true
- So(ar.Archive(c, &task), ShouldBeNil)
+
+ ack, err := ar.archiveTaskImpl(c, task)
+ So(err, ShouldErrLike, "log stream is purged")
+ So(ack, ShouldBeTrue)
So(archiveRequest, ShouldBeNil)
})
- // Weird case: the log has been marked for archival, has not been
- // terminated, and is within its completeness delay. This task will not
- // have been dispatched by our archive cron, but let's assert that it
- // behaves correctly regardless.
- Convey(`Will succeed if the log stream had no entries and no terminal index.`, func() {
- So(ar.Archive(c, &task), ShouldBeNil)
-
- So(hasStreams(true, true, false), ShouldBeTrue)
- So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
- Path: task.Path,
- Complete: true,
- TerminalIndex: -1,
+ Convey(`Will return task if the stream is younger than its settle delay.`, func() {
+ stream.Age = google.NewDuration(time.Second)
- StreamUrl: gsURL("logstream.entries"),
- IndexUrl: gsURL("logstream.index"),
- DataUrl: gsURL("data.bin"),
- })
+ ack, err := ar.archiveTaskImpl(c, task)
+ So(err, ShouldErrLike, "log stream is within settle delay")
+ So(ack, ShouldBeFalse)
+ So(archiveRequest, ShouldBeNil)
+ })
+
+ Convey(`Will return task if the log stream doesn't have an archival key yet.`, func() {
+ stream.Age = google.NewDuration(expired)
+ stream.ArchivalKey = nil
+
+ ack, err := ar.archiveTaskImpl(c, task)
+ So(err, ShouldErrLike, "premature archival request")
+ So(ack, ShouldBeFalse)
+ So(archiveRequest, ShouldBeNil)
+ })
+
+ Convey(`Will complete task and refrain from archiving if archival keys dont' match.`, func() {
+ stream.Age = google.NewDuration(expired)
+ stream.ArchivalKey = []byte("non-matching archival key")
+
+ ack, err := ar.archiveTaskImpl(c, task)
+ So(err, ShouldErrLike, "superfluous archival request")
+ So(ack, ShouldBeTrue)
+ So(archiveRequest, ShouldBeNil)
+ })
+
+ // Weird case: the log has been marked for archival, has not been
+ //
+ // terminated, and is within its completeness delay. This task should not
+ // have been dispatched by our expired archive cron, but let's assert that
+ // it behaves correctly regardless.
+ Convey(`Will refuse to archive a complete stream with no terminal index.`, func() {
+ ack, err := ar.archiveTaskImpl(c, task)
+ So(err, ShouldErrLike, "completeness required, but stream has no terminal index")
+ So(ack, ShouldBeFalse)
})
Convey(`With terminal index "3"`, func() {
stream.State.TerminalIndex = 3
- Convey(`Will fail if the log stream had a terminal index and no entries.`, func() {
- So(ar.Archive(c, &task), ShouldErrLike, "stream finished short of terminal index")
+ Convey(`Will fail not ACK a log stream with no entries.`, func() {
+ ack, err := ar.archiveTaskImpl(c, task)
+ So(err, ShouldErrLike, "stream has missing entries")
+ So(ack, ShouldBeFalse)
})
Convey(`Will fail to archive {0, 1, 2, 4} (incomplete).`, func() {
addTestEntry(0, 1, 2, 4)
- So(ar.Archive(c, &task), ShouldErrLike, "non-contiguous log stream")
+
+ ack, err := ar.archiveTaskImpl(c, task)
+ So(err, ShouldErrLike, "stream has missing entries")
+ So(ack, ShouldBeFalse)
})
Convey(`Will successfully archive {0, 1, 2, 3, 4}, stopping at the terminal index.`, func() {
addTestEntry(0, 1, 2, 3, 4)
- So(ar.Archive(c, &task), ShouldBeNil)
+
+ ack, err := ar.archiveTaskImpl(c, task)
+ So(err, ShouldBeNil)
+ So(ack, ShouldBeTrue)
So(hasStreams(true, true, true), ShouldBeTrue)
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
- Path: task.Path,
- Complete: true,
+ Path: archiveTask.Path,
+ LogEntryCount: 4,
TerminalIndex: 3,
StreamUrl: gsURL("logstream.entries"),
@@ -335,19 +423,69 @@ func TestHandleArchive(t *testing.T) {
DataUrl: gsURL("data.bin"),
})
})
+
+ Convey(`When atransient archival error occurs, will not ACK it.`, func() {
+ gsc.newWriterErr = func(*testGSWriter) error { return errors.WrapTransient(errors.New("test error")) }
+
+ ack, err := ar.archiveTaskImpl(c, task)
+ So(err, ShouldErrLike, "test error")
+ So(ack, ShouldBeFalse)
+ })
+
+ Convey(`When a non-transient archival error occurs`, func() {
+ archiveErr := errors.New("archive failure error")
+ gsc.newWriterErr = func(*testGSWriter) error { return archiveErr }
+
+ Convey(`If remote report returns an error, do not ACK.`, func() {
+ archiveStreamErr = errors.New("test error")
+
+ ack, err := ar.archiveTaskImpl(c, task)
+ So(err, ShouldErrLike, "test error")
+ So(ack, ShouldBeFalse)
+
+ So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
+ Path: archiveTask.Path,
+ Error: "archive failure error",
+ })
+ })
+
+ Convey(`If remote report returns success, ACK.`, func() {
+ ack, err := ar.archiveTaskImpl(c, task)
+ So(err, ShouldBeNil)
+ So(ack, ShouldBeTrue)
+ So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
+ Path: archiveTask.Path,
+ Error: "archive failure error",
+ })
+ })
+
+ Convey(`If an empty error string is supplied, the generic error will be filled in.`, func() {
+ archiveErr = errors.New("")
+
+ ack, err := ar.archiveTaskImpl(c, task)
+ So(err, ShouldBeNil)
+ So(ack, ShouldBeTrue)
+ So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
+ Path: archiveTask.Path,
+ Error: "archival error",
+ })
+ })
+ })
})
Convey(`When not enforcing stream completeness`, func() {
- task.Complete = false
+ stream.Age = google.NewDuration(expired)
Convey(`With no terminal index`, func() {
Convey(`Will successfully archive if there are no entries.`, func() {
- So(ar.Archive(c, &task), ShouldBeNil)
+ ack, err := ar.archiveTaskImpl(c, task)
+ So(err, ShouldBeNil)
+ So(ack, ShouldBeTrue)
So(hasStreams(true, true, false), ShouldBeTrue)
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
- Path: task.Path,
- Complete: true,
+ Path: archiveTask.Path,
+ LogEntryCount: 0,
TerminalIndex: -1,
StreamUrl: gsURL("logstream.entries"),
@@ -358,12 +496,15 @@ func TestHandleArchive(t *testing.T) {
Convey(`With {0, 1, 2, 4} (incomplete) will archive the stream and update its terminal index.`, func() {
addTestEntry(0, 1, 2, 4)
- So(ar.Archive(c, &task), ShouldBeNil)
+
+ ack, err := ar.archiveTaskImpl(c, task)
+ So(err, ShouldBeNil)
+ So(ack, ShouldBeTrue)
So(hasStreams(true, true, true), ShouldBeTrue)
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
- Path: task.Path,
- Complete: false,
+ Path: archiveTask.Path,
+ LogEntryCount: 4,
TerminalIndex: 4,
StreamUrl: gsURL("logstream.entries"),
@@ -377,12 +518,14 @@ func TestHandleArchive(t *testing.T) {
stream.State.TerminalIndex = 3
Convey(`Will successfully archive if there are no entries.`, func() {
- So(ar.Archive(c, &task), ShouldBeNil)
+ ack, err := ar.archiveTaskImpl(c, task)
+ So(err, ShouldBeNil)
+ So(ack, ShouldBeTrue)
So(hasStreams(true, true, false), ShouldBeTrue)
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
- Path: task.Path,
- Complete: true,
+ Path: archiveTask.Path,
+ LogEntryCount: 0,
TerminalIndex: -1,
StreamUrl: gsURL("logstream.entries"),
@@ -393,12 +536,15 @@ func TestHandleArchive(t *testing.T) {
Convey(`With {0, 1, 2, 4} (incomplete) will archive the stream and update its terminal index to 2.`, func() {
addTestEntry(0, 1, 2, 4)
- So(ar.Archive(c, &task), ShouldBeNil)
+
+ ack, err := ar.archiveTaskImpl(c, task)
+ So(err, ShouldBeNil)
+ So(ack, ShouldBeTrue)
So(hasStreams(true, true, true), ShouldBeTrue)
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
- Path: task.Path,
- Complete: false,
+ Path: archiveTask.Path,
+ LogEntryCount: 3,
TerminalIndex: 2,
StreamUrl: gsURL("logstream.entries"),
@@ -419,28 +565,28 @@ func TestHandleArchive(t *testing.T) {
name string
setup func()
}{
- {"delete failure", func() {
- gsc.deleteErr = func(b, p string) error {
- if strings.HasSuffix(p, failName) {
- return errors.New("test error")
+ {"writer create failure", func() {
+ gsc.newWriterErr = func(w *testGSWriter) error {
+ if strings.HasSuffix(string(w.path), failName) {
+ return errors.WrapTransient(errors.New("test error"))
}
return nil
}
}},
- {"writer create failure", func() {
+ {"write failure", func() {
gsc.newWriterErr = func(w *testGSWriter) error {
- if strings.HasSuffix(w.path, failName) {
- return errors.New("test error")
+ if strings.HasSuffix(string(w.path), failName) {
+ w.writeErr = errors.WrapTransient(errors.New("test error"))
}
return nil
}
}},
- {"write failure", func() {
- gsc.newWriterErr = func(w *testGSWriter) error {
- if strings.HasSuffix(w.path, failName) {
- w.writeErr = errors.New("test error")
+ {"rename failure", func() {
+ gsc.renameErr = func(src, dst gs.Path) error {
+ if strings.HasSuffix(string(src), failName) {
+ return errors.WrapTransient(errors.New("test error"))
}
return nil
}
@@ -448,44 +594,40 @@ func TestHandleArchive(t *testing.T) {
{"close failure", func() {
gsc.newWriterErr = func(w *testGSWriter) error {
- if strings.HasSuffix(w.path, failName) {
- w.closeErr = errors.New("test error")
+ if strings.HasSuffix(string(w.path), failName) {
+ w.closeErr = errors.WrapTransient(errors.New("test error"))
}
return nil
}
}},
- {"delete on fail failure (double-failure)", func() {
- failed := false
-
+ {"delete failure after other failure", func() {
// Simulate a write failure. This is the error that will actually
// be returned.
gsc.newWriterErr = func(w *testGSWriter) error {
- if strings.HasSuffix(w.path, failName) {
- w.writeErr = errors.New("test error")
+ if strings.HasSuffix(string(w.path), failName) {
+ w.writeErr = errors.WrapTransient(errors.New("test error"))
}
return nil
}
- // This will trigger twice per stream, once on create and once on
- // cleanup after the write fails. We want to return an error in
- // the latter case.
- gsc.deleteErr = func(b, p string) error {
- if strings.HasSuffix(p, failName) {
- if failed {
- return errors.New("other error")
- }
-
- // First delete (on create).
- failed = true
+ // This will trigger whe NewWriter fails from the above
+ // instrumentation.
+ gsc.deleteErr = func(p gs.Path) error {
+ if strings.HasSuffix(string(p), failName) {
+ return errors.New("other error")
}
return nil
}
}},
} {
- Convey(fmt.Sprintf(`Can handle %s for %s`, testCase.name, failName), func() {
+ Convey(fmt.Sprintf(`Can handle %s for %s, and will not archive.`, testCase.name, failName), func() {
testCase.setup()
- So(ar.Archive(c, &task), ShouldErrLike, "test error")
+
+ ack, err := ar.archiveTaskImpl(c, task)
+ So(err, ShouldErrLike, "test error")
+ So(ack, ShouldBeFalse)
+ So(archiveRequest, ShouldBeNil)
})
}
}
« no previous file with comments | « server/internal/logdog/archivist/archivist.go ('k') | server/internal/logdog/archivist/storageSource.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698