| Index: server/internal/logdog/archivist/archivist_test.go
|
| diff --git a/server/internal/logdog/archivist/archivist_test.go b/server/internal/logdog/archivist/archivist_test.go
|
| index 12daac169e7ca7d70f8181b69d077782b5ac5e8d..4713c188547522947b5d95247580bb3d94bae4b2 100644
|
| --- a/server/internal/logdog/archivist/archivist_test.go
|
| +++ b/server/internal/logdog/archivist/archivist_test.go
|
| @@ -5,7 +5,6 @@
|
| package archivist
|
|
|
| import (
|
| - "errors"
|
| "fmt"
|
| "strings"
|
| "sync"
|
| @@ -15,6 +14,7 @@ import (
|
| "github.com/golang/protobuf/proto"
|
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
|
| "github.com/luci/luci-go/common/clock/testclock"
|
| + "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/gcloud/gs"
|
| "github.com/luci/luci-go/common/logdog/types"
|
| "github.com/luci/luci-go/common/proto/google"
|
| @@ -28,6 +28,29 @@ import (
|
| . "github.com/smartystreets/goconvey/convey"
|
| )
|
|
|
| +// testTask is an instrumentable Task implementation.
|
| +type testTask struct {
|
| + data []byte
|
| + assertLeaseErr error
|
| + assertCount int
|
| +}
|
| +
|
| +func (t *testTask) UniqueID() string {
|
| + return "totally unique ID"
|
| +}
|
| +
|
| +func (t *testTask) Data() []byte {
|
| + return t.data
|
| +}
|
| +
|
| +func (t *testTask) AssertLease(context.Context) error {
|
| + if err := t.assertLeaseErr; err != nil {
|
| + return err
|
| + }
|
| + t.assertCount++
|
| + return nil
|
| +}
|
| +
|
| // testServicesClient implements logdog.ServicesClient sufficient for testing
|
| // and instrumentation.
|
| type testServicesClient struct {
|
| @@ -65,18 +88,19 @@ type testGSClient struct {
|
|
|
| // objs is a map of filename to "write amount". The write amount is the
|
| // cumulative amount of data written to the Writer for a given GS path.
|
| - objs map[string]int64
|
| + objs map[gs.Path]int64
|
| closed bool
|
|
|
| closeErr error
|
| newWriterErr func(w *testGSWriter) error
|
| - deleteErr func(string, string) error
|
| + deleteErr func(gs.Path) error
|
| + renameErr func(gs.Path, gs.Path) error
|
| }
|
|
|
| -func (c *testGSClient) NewWriter(bucket, relpath string) (gs.Writer, error) {
|
| +func (c *testGSClient) NewWriter(p gs.Path) (gs.Writer, error) {
|
| w := testGSWriter{
|
| client: c,
|
| - path: c.url(bucket, relpath),
|
| + path: p,
|
| }
|
| if c.newWriterErr != nil {
|
| if err := c.newWriterErr(&w); err != nil {
|
| @@ -97,9 +121,9 @@ func (c *testGSClient) Close() error {
|
| return nil
|
| }
|
|
|
| -func (c *testGSClient) Delete(bucket, relpath string) error {
|
| +func (c *testGSClient) Delete(p gs.Path) error {
|
| if c.deleteErr != nil {
|
| - if err := c.deleteErr(bucket, relpath); err != nil {
|
| + if err := c.deleteErr(p); err != nil {
|
| return err
|
| }
|
| }
|
| @@ -107,18 +131,29 @@ func (c *testGSClient) Delete(bucket, relpath string) error {
|
| c.Lock()
|
| defer c.Unlock()
|
|
|
| - delete(c.objs, c.url(bucket, relpath))
|
| + delete(c.objs, p)
|
| return nil
|
| }
|
|
|
| -func (c *testGSClient) url(bucket, relpath string) string {
|
| - return fmt.Sprintf("gs://%s/%s", bucket, relpath)
|
| +func (c *testGSClient) Rename(src, dst gs.Path) error {
|
| + if c.renameErr != nil {
|
| + if err := c.renameErr(src, dst); err != nil {
|
| + return err
|
| + }
|
| + }
|
| +
|
| + c.Lock()
|
| + defer c.Unlock()
|
| +
|
| + c.objs[dst] = c.objs[src]
|
| + delete(c.objs, src)
|
| + return nil
|
| }
|
|
|
| type testGSWriter struct {
|
| client *testGSClient
|
|
|
| - path string
|
| + path gs.Path
|
| closed bool
|
| writeCount int64
|
|
|
| @@ -135,7 +170,7 @@ func (w *testGSWriter) Write(d []byte) (int, error) {
|
| defer w.client.Unlock()
|
|
|
| if w.client.objs == nil {
|
| - w.client.objs = make(map[string]int64)
|
| + w.client.objs = make(map[gs.Path]int64)
|
| }
|
| w.client.objs[w.path] += int64(len(d))
|
| w.writeCount += int64(len(d))
|
| @@ -212,39 +247,59 @@ func TestHandleArchive(t *testing.T) {
|
| }
|
| }
|
|
|
| + // Set up our testing archival task.
|
| + expired := 10 * time.Minute
|
| + archiveTask := logdog.ArchiveTask{
|
| + Path: string(desc.Path()),
|
| + SettleDelay: google.NewDuration(10 * time.Second),
|
| + CompletePeriod: google.NewDuration(expired),
|
| + Key: []byte("random archival key"),
|
| + }
|
| + expired++ // This represents a time PAST CompletePeriod.
|
| +
|
| + archiveTaskData, err := proto.Marshal(&archiveTask)
|
| + if err != nil {
|
| + panic(err)
|
| + }
|
| +
|
| + task := &testTask{
|
| + data: archiveTaskData,
|
| + }
|
| +
|
| // Set up our test Coordinator client stubs.
|
| stream := logdog.LoadStreamResponse{
|
| State: &logdog.LogStreamState{
|
| - Path: string(desc.Path()),
|
| + Path: archiveTask.Path,
|
| ProtoVersion: logpb.Version,
|
| TerminalIndex: -1,
|
| Archived: false,
|
| Purged: false,
|
| },
|
| Desc: descBytes,
|
| +
|
| + // Age is ON the expiration threshold, so not expired.
|
| + Age: archiveTask.CompletePeriod,
|
| + ArchivalKey: archiveTask.Key,
|
| }
|
|
|
| var archiveRequest *logdog.ArchiveStreamRequest
|
| + var archiveStreamErr error
|
| sc := testServicesClient{
|
| lsCallback: func(req *logdog.LoadStreamRequest) (*logdog.LoadStreamResponse, error) {
|
| return &stream, nil
|
| },
|
| asCallback: func(req *logdog.ArchiveStreamRequest) error {
|
| archiveRequest = req
|
| - return nil
|
| + return archiveStreamErr
|
| },
|
| }
|
|
|
| ar := Archivist{
|
| - Service: &sc,
|
| - Storage: &st,
|
| - GSClient: &gsc,
|
| - GSBase: gs.Path("gs://archive-test/path/to/archive/"), // Extra slashes to test concatenation.
|
| - }
|
| -
|
| - task := logdog.ArchiveTask{
|
| - Path: stream.State.Path,
|
| - Complete: true,
|
| + Service: &sc,
|
| + Storage: &st,
|
| + GSClient: &gsc,
|
| + GSBase: gs.Path("gs://archive-test/path/to/archive/"), // Extra slashes to test concatenation.
|
| + GSStagingBase: gs.Path("gs://archive-test-staging/path/to/archive/"), // Extra slashes to test concatenation.
|
| }
|
|
|
| gsURL := func(p string) string {
|
| @@ -270,64 +325,102 @@ func TestHandleArchive(t *testing.T) {
|
| return true
|
| }
|
|
|
| - Convey(`Will fail to archive if the specified stream state could not be loaded.`, func() {
|
| + Convey(`Will return task and fail to archive if the specified stream state could not be loaded.`, func() {
|
| sc.lsCallback = func(*logdog.LoadStreamRequest) (*logdog.LoadStreamResponse, error) {
|
| return nil, errors.New("does not exist")
|
| }
|
| - So(ar.Archive(c, &task), ShouldErrLike, "does not exist")
|
| +
|
| + ack, err := ar.archiveTaskImpl(c, task)
|
| + So(err, ShouldErrLike, "does not exist")
|
| + So(ack, ShouldBeFalse)
|
| })
|
|
|
| - Convey(`Will refrain from archiving if the stream is already archived.`, func() {
|
| + Convey(`Will complete task and refrain from archiving if the stream is already archived.`, func() {
|
| stream.State.Archived = true
|
| - So(ar.Archive(c, &task), ShouldBeNil)
|
| +
|
| + ack, err := ar.archiveTaskImpl(c, task)
|
| + So(err, ShouldErrLike, "log stream is archived")
|
| + So(ack, ShouldBeTrue)
|
| So(archiveRequest, ShouldBeNil)
|
| })
|
|
|
| - Convey(`Will refrain from archiving if the stream is purged.`, func() {
|
| + Convey(`Will complete task and refrain from archiving if the stream is purged.`, func() {
|
| stream.State.Purged = true
|
| - So(ar.Archive(c, &task), ShouldBeNil)
|
| +
|
| + ack, err := ar.archiveTaskImpl(c, task)
|
| + So(err, ShouldErrLike, "log stream is purged")
|
| + So(ack, ShouldBeTrue)
|
| So(archiveRequest, ShouldBeNil)
|
| })
|
|
|
| - // Weird case: the log has been marked for archival, has not been
|
| - // terminated, and is within its completeness delay. This task will not
|
| - // have been dispatched by our archive cron, but let's assert that it
|
| - // behaves correctly regardless.
|
| - Convey(`Will succeed if the log stream had no entries and no terminal index.`, func() {
|
| - So(ar.Archive(c, &task), ShouldBeNil)
|
| -
|
| - So(hasStreams(true, true, false), ShouldBeTrue)
|
| - So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| - Path: task.Path,
|
| - Complete: true,
|
| - TerminalIndex: -1,
|
| + Convey(`Will return task if the stream is younger than its settle delay.`, func() {
|
| + stream.Age = google.NewDuration(time.Second)
|
|
|
| - StreamUrl: gsURL("logstream.entries"),
|
| - IndexUrl: gsURL("logstream.index"),
|
| - DataUrl: gsURL("data.bin"),
|
| - })
|
| + ack, err := ar.archiveTaskImpl(c, task)
|
| + So(err, ShouldErrLike, "log stream is within settle delay")
|
| + So(ack, ShouldBeFalse)
|
| + So(archiveRequest, ShouldBeNil)
|
| + })
|
| +
|
| + Convey(`Will return task if the log stream doesn't have an archival key yet.`, func() {
|
| + stream.Age = google.NewDuration(expired)
|
| + stream.ArchivalKey = nil
|
| +
|
| + ack, err := ar.archiveTaskImpl(c, task)
|
| + So(err, ShouldErrLike, "premature archival request")
|
| + So(ack, ShouldBeFalse)
|
| + So(archiveRequest, ShouldBeNil)
|
| + })
|
| +
|
| + Convey(`Will complete task and refrain from archiving if archival keys dont' match.`, func() {
|
| + stream.Age = google.NewDuration(expired)
|
| + stream.ArchivalKey = []byte("non-matching archival key")
|
| +
|
| + ack, err := ar.archiveTaskImpl(c, task)
|
| + So(err, ShouldErrLike, "superfluous archival request")
|
| + So(ack, ShouldBeTrue)
|
| + So(archiveRequest, ShouldBeNil)
|
| + })
|
| +
|
| + // Weird case: the log has been marked for archival, has not been
|
| + //
|
| + // terminated, and is within its completeness delay. This task should not
|
| + // have been dispatched by our expired archive cron, but let's assert that
|
| + // it behaves correctly regardless.
|
| + Convey(`Will refuse to archive a complete stream with no terminal index.`, func() {
|
| + ack, err := ar.archiveTaskImpl(c, task)
|
| + So(err, ShouldErrLike, "completeness required, but stream has no terminal index")
|
| + So(ack, ShouldBeFalse)
|
| })
|
|
|
| Convey(`With terminal index "3"`, func() {
|
| stream.State.TerminalIndex = 3
|
|
|
| - Convey(`Will fail if the log stream had a terminal index and no entries.`, func() {
|
| - So(ar.Archive(c, &task), ShouldErrLike, "stream finished short of terminal index")
|
| + Convey(`Will fail not ACK a log stream with no entries.`, func() {
|
| + ack, err := ar.archiveTaskImpl(c, task)
|
| + So(err, ShouldErrLike, "stream has missing entries")
|
| + So(ack, ShouldBeFalse)
|
| })
|
|
|
| Convey(`Will fail to archive {0, 1, 2, 4} (incomplete).`, func() {
|
| addTestEntry(0, 1, 2, 4)
|
| - So(ar.Archive(c, &task), ShouldErrLike, "non-contiguous log stream")
|
| +
|
| + ack, err := ar.archiveTaskImpl(c, task)
|
| + So(err, ShouldErrLike, "stream has missing entries")
|
| + So(ack, ShouldBeFalse)
|
| })
|
|
|
| Convey(`Will successfully archive {0, 1, 2, 3, 4}, stopping at the terminal index.`, func() {
|
| addTestEntry(0, 1, 2, 3, 4)
|
| - So(ar.Archive(c, &task), ShouldBeNil)
|
| +
|
| + ack, err := ar.archiveTaskImpl(c, task)
|
| + So(err, ShouldBeNil)
|
| + So(ack, ShouldBeTrue)
|
|
|
| So(hasStreams(true, true, true), ShouldBeTrue)
|
| So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| - Path: task.Path,
|
| - Complete: true,
|
| + Path: archiveTask.Path,
|
| + LogEntryCount: 4,
|
| TerminalIndex: 3,
|
|
|
| StreamUrl: gsURL("logstream.entries"),
|
| @@ -335,19 +428,69 @@ func TestHandleArchive(t *testing.T) {
|
| DataUrl: gsURL("data.bin"),
|
| })
|
| })
|
| +
|
| + Convey(`When atransient archival error occurs, will not ACK it.`, func() {
|
| + gsc.newWriterErr = func(*testGSWriter) error { return errors.WrapTransient(errors.New("test error")) }
|
| +
|
| + ack, err := ar.archiveTaskImpl(c, task)
|
| + So(err, ShouldErrLike, "test error")
|
| + So(ack, ShouldBeFalse)
|
| + })
|
| +
|
| + Convey(`When a non-transient archival error occurs`, func() {
|
| + archiveErr := errors.New("archive failure error")
|
| + gsc.newWriterErr = func(*testGSWriter) error { return archiveErr }
|
| +
|
| + Convey(`If remote report returns an error, do not ACK.`, func() {
|
| + archiveStreamErr = errors.New("test error")
|
| +
|
| + ack, err := ar.archiveTaskImpl(c, task)
|
| + So(err, ShouldErrLike, "test error")
|
| + So(ack, ShouldBeFalse)
|
| +
|
| + So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| + Path: archiveTask.Path,
|
| + Error: "archive failure error",
|
| + })
|
| + })
|
| +
|
| + Convey(`If remote report returns success, ACK.`, func() {
|
| + ack, err := ar.archiveTaskImpl(c, task)
|
| + So(err, ShouldBeNil)
|
| + So(ack, ShouldBeTrue)
|
| + So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| + Path: archiveTask.Path,
|
| + Error: "archive failure error",
|
| + })
|
| + })
|
| +
|
| + Convey(`If an empty error string is supplied, the generic error will be filled in.`, func() {
|
| + archiveErr = errors.New("")
|
| +
|
| + ack, err := ar.archiveTaskImpl(c, task)
|
| + So(err, ShouldBeNil)
|
| + So(ack, ShouldBeTrue)
|
| + So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| + Path: archiveTask.Path,
|
| + Error: "archival error",
|
| + })
|
| + })
|
| + })
|
| })
|
|
|
| Convey(`When not enforcing stream completeness`, func() {
|
| - task.Complete = false
|
| + stream.Age = google.NewDuration(expired)
|
|
|
| Convey(`With no terminal index`, func() {
|
| Convey(`Will successfully archive if there are no entries.`, func() {
|
| - So(ar.Archive(c, &task), ShouldBeNil)
|
| + ack, err := ar.archiveTaskImpl(c, task)
|
| + So(err, ShouldBeNil)
|
| + So(ack, ShouldBeTrue)
|
|
|
| So(hasStreams(true, true, false), ShouldBeTrue)
|
| So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| - Path: task.Path,
|
| - Complete: true,
|
| + Path: archiveTask.Path,
|
| + LogEntryCount: 0,
|
| TerminalIndex: -1,
|
|
|
| StreamUrl: gsURL("logstream.entries"),
|
| @@ -358,12 +501,15 @@ func TestHandleArchive(t *testing.T) {
|
|
|
| Convey(`With {0, 1, 2, 4} (incomplete) will archive the stream and update its terminal index.`, func() {
|
| addTestEntry(0, 1, 2, 4)
|
| - So(ar.Archive(c, &task), ShouldBeNil)
|
| +
|
| + ack, err := ar.archiveTaskImpl(c, task)
|
| + So(err, ShouldBeNil)
|
| + So(ack, ShouldBeTrue)
|
|
|
| So(hasStreams(true, true, true), ShouldBeTrue)
|
| So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| - Path: task.Path,
|
| - Complete: false,
|
| + Path: archiveTask.Path,
|
| + LogEntryCount: 4,
|
| TerminalIndex: 4,
|
|
|
| StreamUrl: gsURL("logstream.entries"),
|
| @@ -377,12 +523,14 @@ func TestHandleArchive(t *testing.T) {
|
| stream.State.TerminalIndex = 3
|
|
|
| Convey(`Will successfully archive if there are no entries.`, func() {
|
| - So(ar.Archive(c, &task), ShouldBeNil)
|
| + ack, err := ar.archiveTaskImpl(c, task)
|
| + So(err, ShouldBeNil)
|
| + So(ack, ShouldBeTrue)
|
|
|
| So(hasStreams(true, true, false), ShouldBeTrue)
|
| So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| - Path: task.Path,
|
| - Complete: true,
|
| + Path: archiveTask.Path,
|
| + LogEntryCount: 0,
|
| TerminalIndex: -1,
|
|
|
| StreamUrl: gsURL("logstream.entries"),
|
| @@ -393,12 +541,15 @@ func TestHandleArchive(t *testing.T) {
|
|
|
| Convey(`With {0, 1, 2, 4} (incomplete) will archive the stream and update its terminal index to 2.`, func() {
|
| addTestEntry(0, 1, 2, 4)
|
| - So(ar.Archive(c, &task), ShouldBeNil)
|
| +
|
| + ack, err := ar.archiveTaskImpl(c, task)
|
| + So(err, ShouldBeNil)
|
| + So(ack, ShouldBeTrue)
|
|
|
| So(hasStreams(true, true, true), ShouldBeTrue)
|
| So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| - Path: task.Path,
|
| - Complete: false,
|
| + Path: archiveTask.Path,
|
| + LogEntryCount: 3,
|
| TerminalIndex: 2,
|
|
|
| StreamUrl: gsURL("logstream.entries"),
|
| @@ -419,28 +570,28 @@ func TestHandleArchive(t *testing.T) {
|
| name string
|
| setup func()
|
| }{
|
| - {"delete failure", func() {
|
| - gsc.deleteErr = func(b, p string) error {
|
| - if strings.HasSuffix(p, failName) {
|
| - return errors.New("test error")
|
| + {"writer create failure", func() {
|
| + gsc.newWriterErr = func(w *testGSWriter) error {
|
| + if strings.HasSuffix(string(w.path), failName) {
|
| + return errors.WrapTransient(errors.New("test error"))
|
| }
|
| return nil
|
| }
|
| }},
|
|
|
| - {"writer create failure", func() {
|
| + {"write failure", func() {
|
| gsc.newWriterErr = func(w *testGSWriter) error {
|
| - if strings.HasSuffix(w.path, failName) {
|
| - return errors.New("test error")
|
| + if strings.HasSuffix(string(w.path), failName) {
|
| + w.writeErr = errors.WrapTransient(errors.New("test error"))
|
| }
|
| return nil
|
| }
|
| }},
|
|
|
| - {"write failure", func() {
|
| - gsc.newWriterErr = func(w *testGSWriter) error {
|
| - if strings.HasSuffix(w.path, failName) {
|
| - w.writeErr = errors.New("test error")
|
| + {"rename failure", func() {
|
| + gsc.renameErr = func(src, dst gs.Path) error {
|
| + if strings.HasSuffix(string(src), failName) {
|
| + return errors.WrapTransient(errors.New("test error"))
|
| }
|
| return nil
|
| }
|
| @@ -448,44 +599,40 @@ func TestHandleArchive(t *testing.T) {
|
|
|
| {"close failure", func() {
|
| gsc.newWriterErr = func(w *testGSWriter) error {
|
| - if strings.HasSuffix(w.path, failName) {
|
| - w.closeErr = errors.New("test error")
|
| + if strings.HasSuffix(string(w.path), failName) {
|
| + w.closeErr = errors.WrapTransient(errors.New("test error"))
|
| }
|
| return nil
|
| }
|
| }},
|
|
|
| - {"delete on fail failure (double-failure)", func() {
|
| - failed := false
|
| -
|
| + {"delete failure after other failure", func() {
|
| // Simulate a write failure. This is the error that will actually
|
| // be returned.
|
| gsc.newWriterErr = func(w *testGSWriter) error {
|
| - if strings.HasSuffix(w.path, failName) {
|
| - w.writeErr = errors.New("test error")
|
| + if strings.HasSuffix(string(w.path), failName) {
|
| + w.writeErr = errors.WrapTransient(errors.New("test error"))
|
| }
|
| return nil
|
| }
|
|
|
| - // This will trigger twice per stream, once on create and once on
|
| - // cleanup after the write fails. We want to return an error in
|
| - // the latter case.
|
| - gsc.deleteErr = func(b, p string) error {
|
| - if strings.HasSuffix(p, failName) {
|
| - if failed {
|
| - return errors.New("other error")
|
| - }
|
| -
|
| - // First delete (on create).
|
| - failed = true
|
| + // This will trigger whe NewWriter fails from the above
|
| + // instrumentation.
|
| + gsc.deleteErr = func(p gs.Path) error {
|
| + if strings.HasSuffix(string(p), failName) {
|
| + return errors.New("other error")
|
| }
|
| return nil
|
| }
|
| }},
|
| } {
|
| - Convey(fmt.Sprintf(`Can handle %s for %s`, testCase.name, failName), func() {
|
| + Convey(fmt.Sprintf(`Can handle %s for %s, and will not archive.`, testCase.name, failName), func() {
|
| testCase.setup()
|
| - So(ar.Archive(c, &task), ShouldErrLike, "test error")
|
| +
|
| + ack, err := ar.archiveTaskImpl(c, task)
|
| + So(err, ShouldErrLike, "test error")
|
| + So(ack, ShouldBeFalse)
|
| + So(archiveRequest, ShouldBeNil)
|
| })
|
| }
|
| }
|
|
|