| Index: appengine/logdog/coordinator/backend/archiveCron_test.go
|
| diff --git a/appengine/logdog/coordinator/backend/archiveCron_test.go b/appengine/logdog/coordinator/backend/archiveCron_test.go
|
| index 88e45352d3a2a30ec23ec3ebc58597edec2d8aa8..331824396e0f0d6e7552a8269f8a0e75791bcc3a 100644
|
| --- a/appengine/logdog/coordinator/backend/archiveCron_test.go
|
| +++ b/appengine/logdog/coordinator/backend/archiveCron_test.go
|
| @@ -12,9 +12,7 @@ import (
|
| "time"
|
|
|
| "github.com/julienschmidt/httprouter"
|
| - "github.com/luci/gae/filter/featureBreaker"
|
| ds "github.com/luci/gae/service/datastore"
|
| - tq "github.com/luci/gae/service/taskqueue"
|
| "github.com/luci/luci-go/appengine/gaetesting"
|
| "github.com/luci/luci-go/appengine/logdog/coordinator"
|
| ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest"
|
| @@ -22,7 +20,6 @@ import (
|
| "github.com/luci/luci-go/common/clock/testclock"
|
| "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/proto/google"
|
| - "github.com/luci/luci-go/common/proto/logdog/svcconfig"
|
|
|
| . "github.com/smartystreets/goconvey/convey"
|
| )
|
| @@ -40,39 +37,49 @@ func TestHandleArchiveCron(t *testing.T) {
|
| Kind: "LogStream",
|
| SortBy: []ds.IndexColumn{
|
| {Property: "State"},
|
| - {Property: "Updated"},
|
| + {Property: "Created", Descending: true},
|
| },
|
| },
|
| )
|
|
|
| // Install a base set of streams.
|
| + //
|
| + // If the stream was created >= 10min ago, it will be a candidate for
|
| + // archival.
|
| + now := clock.Now(c)
|
| for _, v := range []struct {
|
| - name string
|
| - d time.Duration
|
| - term bool
|
| + name string
|
| + d time.Duration // Time before "now" for this stream's Created.
|
| + state coordinator.LogStreamState
|
| }{
|
| - {"foo", 0, true}, // Not candidate for archival (too soon).
|
| - {"bar", 10 * time.Minute, true}, // Candidate for archival.
|
| - {"baz", 10 * time.Minute, false}, // Not candidate for archival (not terminal).
|
| - {"qux", 24 * time.Hour, false}, // Candidate for non-terminal archival.
|
| + {"foo", 0, coordinator.LSStreaming}, // Not candidate for archival (too soon).
|
| + {"bar", 10 * time.Minute, coordinator.LSStreaming}, // Candidate for archival.
|
| + {"baz", 10 * time.Minute, coordinator.LSArchiveTasked}, // Not candidate for archival (archive tasked).
|
| + {"qux", 10 * time.Hour, coordinator.LSArchived}, // Not candidate for archival (archived).
|
| + {"quux", 10 * time.Hour, coordinator.LSStreaming}, // Candidate for archival.
|
| } {
|
| ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, v.name))
|
|
|
| // The entry was created a week ago.
|
| - ls.Created = clock.Now(c).Add(-(7 * 24 * time.Hour))
|
| - ls.Updated = clock.Now(c).Add(-v.d)
|
| - if v.term {
|
| - ls.State = coordinator.LSTerminated
|
| - }
|
| - if err := ls.Put(ds.Get(c)); err != nil {
|
| + ls.Created = now.Add(-v.d)
|
| + ls.State = v.state
|
| + ls.TerminatedTime = now // Doesn't matter, but required to Put the stream.
|
| + ls.ArchivedTime = now // Doesn't matter, but required to Put the stream.
|
| + if err := ds.Get(c).Put(ls); err != nil {
|
| panic(err)
|
| }
|
| }
|
| ds.Get(c).Testable().CatchupIndexes()
|
|
|
| // This allows us to update our Context in test setup.
|
| + tap := ct.ArchivalPublisher{}
|
| + svcStub := ct.Services{
|
| + AP: func() (coordinator.ArchivalPublisher, error) {
|
| + return &tap, nil
|
| + },
|
| + }
|
| b := Backend{
|
| - multiTaskBatchSize: 5,
|
| + ServiceBase: coordinator.ServiceBase{&svcStub},
|
| }
|
|
|
| tb := testBase{Context: c}
|
| @@ -82,156 +89,106 @@ func TestHandleArchiveCron(t *testing.T) {
|
| s := httptest.NewServer(r)
|
| defer s.Close()
|
|
|
| - Convey(`With no configuration loaded`, func() {
|
| - for _, ep := range []string{"terminal", "nonterminal"} {
|
| - Convey(fmt.Sprintf(`A %q endpoint hit will fail.`, ep), func() {
|
| - resp, err := http.Get(fmt.Sprintf("%s/archive/cron/%s", s.URL, ep))
|
| - So(err, ShouldBeNil)
|
| - So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
|
| - })
|
| - }
|
| - })
|
| -
|
| - Convey(`With a configuration loaded`, func() {
|
| - qName := "archive-test-queue"
|
| - c = ct.UseConfig(c, &svcconfig.Coordinator{
|
| - ArchiveDelay: google.NewDuration(10 * time.Minute),
|
| - ArchiveDelayMax: google.NewDuration(24 * time.Hour),
|
| - ArchiveTaskQueue: qName,
|
| - })
|
| - tb.Context = c
|
| + endpoint := fmt.Sprintf("%s/archive/cron", s.URL)
|
|
|
| - Convey(`With no task queue configured`, func() {
|
| - for _, ep := range []string{"terminal", "nonterminal", "purge"} {
|
| - Convey(fmt.Sprintf(`A %q endpoint hit will fail.`, ep), func() {
|
| - resp, err := http.Get(fmt.Sprintf("%s/archive/cron/%s", s.URL, ep))
|
| - So(err, ShouldBeNil)
|
| - So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
|
| - })
|
| - }
|
| - })
|
| + Convey(`With no configuration loaded, the endpoint will fail.`, func() {
|
| + resp, err := http.Get(endpoint)
|
| + So(err, ShouldBeNil)
|
| + So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
|
| + })
|
|
|
| - Convey(`With a task queue configured`, func() {
|
| - tq.Get(c).Testable().CreateQueue(qName)
|
| + Convey(`With no task topic configured, the endpoint will fail.`, func() {
|
| + svcStub.InitConfig()
|
|
|
| - Convey(`A terminal endpoint hit will be successful and idempotent.`, func() {
|
| - resp, err := http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
|
| - So(err, ShouldBeNil)
|
| - So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
| -
|
| - // Candidate tasks should be scheduled.
|
| - tasks := tq.Get(c).Testable().GetScheduledTasks()[qName]
|
| - So(tasks, shouldHaveTasks, archiveTaskName("testing/+/bar"))
|
| + resp, err := http.Get(endpoint)
|
| + So(err, ShouldBeNil)
|
| + So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
|
| + })
|
|
|
| - // Hit the endpoint again, the same tasks should be scheduled.
|
| - resp, err = http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
|
| + Convey(`With a configuration loaded`, func() {
|
| + svcStub.InitConfig()
|
| + svcStub.ServiceConfig.Coordinator.ArchiveTopic = "projects/test/topics/archive-test-topic"
|
| + svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay = google.NewDuration(10 * time.Second)
|
| + svcStub.ServiceConfig.Coordinator.ArchiveDelayMax = google.NewDuration(10 * time.Minute)
|
| +
|
| + Convey(`A request to the endpoint will be successful.`, func() {
|
| + resp, err := http.Get(endpoint)
|
| + So(err, ShouldBeNil)
|
| + So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
| +
|
| + // Candidate tasks should be scheduled.
|
| + So(tap.StreamNames(), ShouldResemble, []string{"bar", "quux"})
|
| +
|
| + // Hit the endpoint again, no additional tasks should be scheduled.
|
| + Convey(`A subsequent endpoint hit will not schedule any additional tasks.`, func() {
|
| + resp, err = http.Get(endpoint)
|
| So(err, ShouldBeNil)
|
| So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
| -
|
| - tasks2 := tq.Get(c).Testable().GetScheduledTasks()[qName]
|
| - So(tasks2, ShouldResemble, tasks)
|
| + So(tap.StreamNames(), ShouldResemble, []string{"bar", "quux"})
|
| })
|
| + })
|
|
|
| - Convey(`A non-terminal endpoint hit will be successful and idempotent.`, func() {
|
| - resp, err := http.Get(fmt.Sprintf("%s/archive/cron/nonterminal", s.URL))
|
| - So(err, ShouldBeNil)
|
| - So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
| -
|
| - // Candidate tasks should be scheduled.
|
| - tasks := tq.Get(c).Testable().GetScheduledTasks()[qName]
|
| - So(tasks, shouldHaveTasks, archiveTaskName("testing/+/qux"))
|
| -
|
| - // Hit the endpoint again, the same tasks should be scheduled.
|
| - resp, err = http.Get(fmt.Sprintf("%s/archive/cron/nonterminal", s.URL))
|
| - So(err, ShouldBeNil)
|
| - So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
| + Convey(`When scheduling multiple tasks`, func() {
|
| + b.multiTaskBatchSize = 5
|
| +
|
| + // Create a lot of archival candidate tasks to schedule.
|
| + //
|
| + // Note that since stream names are returned sorted, we want to
|
| + // name these streams greater than our stock stream names.
|
| + names := []string{"bar", "quux"}
|
| + for i := 0; i < 11; i++ {
|
| + ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, fmt.Sprintf("stream-%02d", i)))
|
| +
|
| + ls.Created = now.Add(-(10 * time.Minute))
|
| + ls.State = coordinator.LSStreaming
|
| + ls.TerminalIndex = 1337
|
| + ls.TerminatedTime = now
|
| + if err := ds.Get(c).Put(ls); err != nil {
|
| + panic(err)
|
| + }
|
|
|
| - tasks2 := tq.Get(c).Testable().GetScheduledTasks()[qName]
|
| - So(tasks2, ShouldResemble, tasks)
|
| - })
|
| + names = append(names, ls.Name)
|
| + }
|
| + ds.Get(c).Testable().CatchupIndexes()
|
|
|
| - Convey(`A terminal endpoint hit followed by a non-terminal endpoint hit will be successful.`, func() {
|
| - resp, err := http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
|
| + Convey(`Will schedule all pages properly.`, func() {
|
| + // Ensure that all of these tasks get added to the task queue.
|
| + resp, err := http.Get(endpoint)
|
| So(err, ShouldBeNil)
|
| So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
| - So(tq.Get(c).Testable().GetScheduledTasks()[qName], shouldHaveTasks, archiveTaskName("testing/+/bar"))
|
| + So(tap.StreamNames(), ShouldResemble, names)
|
|
|
| - resp, err = http.Get(fmt.Sprintf("%s/archive/cron/nonterminal", s.URL))
|
| - So(err, ShouldBeNil)
|
| - So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
| - So(tq.Get(c).Testable().GetScheduledTasks()[qName], shouldHaveTasks,
|
| - archiveTaskName("testing/+/bar"), archiveTaskName("testing/+/qux"))
|
| - })
|
| -
|
| - Convey(`When scheduling multiple tasks`, func() {
|
| - // Create a lot of archival candidate tasks to schedule.
|
| - var names []string
|
| - for i := 0; i < 11; i++ {
|
| - ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, fmt.Sprintf("stream-%d", i)))
|
| -
|
| - ls.Created = clock.Now(c).Add(-(10 * time.Minute))
|
| - ls.Updated = ls.Created
|
| - ls.State = coordinator.LSTerminated
|
| - ls.TerminalIndex = 1337
|
| - if err := ls.Put(ds.Get(c)); err != nil {
|
| - panic(err)
|
| - }
|
| -
|
| - names = append(names, ls.Name)
|
| + // Use this opportunity to assert that none of the scheduled streams
|
| + // have any settle or completion delay.
|
| + for _, at := range tap.Tasks() {
|
| + So(at.SettleDelay.Duration(), ShouldEqual, 0)
|
| + So(at.CompletePeriod.Duration(), ShouldEqual, 0)
|
| }
|
| - names = append(names, "bar")
|
| - ds.Get(c).Testable().CatchupIndexes()
|
| -
|
| - Convey(`Will schedule all pages properly.`, func() {
|
| - taskNames := make([]interface{}, len(names))
|
| - for i, n := range names {
|
| - taskNames[i] = archiveTaskName(fmt.Sprintf("testing/+/%s", n))
|
| - }
|
|
|
| - // Ensure that all of these tasks get added to the task queue.
|
| - resp, err := http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
|
| + Convey(`Will not schedule additional tasks on the next run.`, func() {
|
| + resp, err := http.Get(endpoint)
|
| So(err, ShouldBeNil)
|
| So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
| - So(tq.Get(c).Testable().GetScheduledTasks()[qName], shouldHaveTasks, taskNames...)
|
| -
|
| - Convey(`Will be successful when rescheduling the same tasks.`, func() {
|
| - resp, err := http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
|
| - So(err, ShouldBeNil)
|
| - So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
| - So(tq.Get(c).Testable().GetScheduledTasks()[qName], shouldHaveTasks, taskNames...)
|
| - })
|
| - })
|
| -
|
| - Convey(`Will return an error if task scheduling fails.`, func() {
|
| - c, fb := featureBreaker.FilterTQ(c, nil)
|
| - tb.Context = c
|
| - fb.BreakFeatures(errors.New("test error"), "AddMulti")
|
| -
|
| - // Ensure that all of these tasks get added to the task queue.
|
| - resp, err := http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
|
| - So(err, ShouldBeNil)
|
| - So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
|
| + So(tap.StreamNames(), ShouldResemble, names)
|
| })
|
| + })
|
|
|
| - Convey(`Will return an error if a single task scheduling fails.`, func() {
|
| - merr := make(errors.MultiError, len(names))
|
| - merr[0] = errors.New("test error")
|
| + Convey(`Will not schedule tasks if task scheduling fails.`, func() {
|
| + tap.Err = errors.New("test error")
|
|
|
| - c, fb := featureBreaker.FilterTQ(c, nil)
|
| - tb.Context = c
|
| - fb.BreakFeatures(merr, "AddMulti")
|
| + // Hit the endpoint; scheduling should fail, and no tasks should be added.
|
| + resp, err := http.Get(endpoint)
|
| + So(err, ShouldBeNil)
|
| + So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
|
| + So(tap.StreamNames(), ShouldResemble, []string{})
|
|
|
| - // Ensure that all of these tasks get added to the task queue.
|
| - resp, err := http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
|
| - So(err, ShouldBeNil)
|
| - So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
|
| - })
|
| + Convey(`And will schedule streams next run.`, func() {
|
| + tap.Err = nil
|
|
|
| - Convey(`And a purge endpoint hit will purge the tasks.`, func() {
|
| - resp, err := http.Get(fmt.Sprintf("%s/archive/cron/purge", s.URL))
|
| + resp, err := http.Get(endpoint)
|
| So(err, ShouldBeNil)
|
| So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
| - So(tq.Get(c).Testable().GetScheduledTasks()[qName], shouldHaveTasks)
|
| + So(tap.StreamNames(), ShouldResemble, names)
|
| })
|
| })
|
| })
|
|
|