Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2327)

Unified Diff: appengine/logdog/coordinator/backend/archiveCron_test.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Fix proto comment. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/backend/archiveCron_test.go
diff --git a/appengine/logdog/coordinator/backend/archiveCron_test.go b/appengine/logdog/coordinator/backend/archiveCron_test.go
index 88e45352d3a2a30ec23ec3ebc58597edec2d8aa8..331824396e0f0d6e7552a8269f8a0e75791bcc3a 100644
--- a/appengine/logdog/coordinator/backend/archiveCron_test.go
+++ b/appengine/logdog/coordinator/backend/archiveCron_test.go
@@ -12,9 +12,7 @@ import (
"time"
"github.com/julienschmidt/httprouter"
- "github.com/luci/gae/filter/featureBreaker"
ds "github.com/luci/gae/service/datastore"
- tq "github.com/luci/gae/service/taskqueue"
"github.com/luci/luci-go/appengine/gaetesting"
"github.com/luci/luci-go/appengine/logdog/coordinator"
ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest"
@@ -22,7 +20,6 @@ import (
"github.com/luci/luci-go/common/clock/testclock"
"github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/proto/google"
- "github.com/luci/luci-go/common/proto/logdog/svcconfig"
. "github.com/smartystreets/goconvey/convey"
)
@@ -40,39 +37,49 @@ func TestHandleArchiveCron(t *testing.T) {
Kind: "LogStream",
SortBy: []ds.IndexColumn{
{Property: "State"},
- {Property: "Updated"},
+ {Property: "Created", Descending: true},
},
},
)
// Install a base set of streams.
+ //
+ // If the stream was created >= 10min ago, it will be candidate for
+ // archival.
+ now := clock.Now(c)
for _, v := range []struct {
- name string
- d time.Duration
- term bool
+ name string
+ d time.Duration // Time before "now" for this streams Created.
+ state coordinator.LogStreamState
}{
- {"foo", 0, true}, // Not candidate for archival (too soon).
- {"bar", 10 * time.Minute, true}, // Candidate for archival.
- {"baz", 10 * time.Minute, false}, // Not candidate for archival (not terminal).
- {"qux", 24 * time.Hour, false}, // Candidate for non-terminal archival.
+ {"foo", 0, coordinator.LSStreaming}, // Not candidate for archival (too soon).
+ {"bar", 10 * time.Minute, coordinator.LSStreaming}, // Candidate for archival.
+ {"baz", 10 * time.Minute, coordinator.LSArchiveTasked}, // Not candidate for archival (archive tasked).
+ {"qux", 10 * time.Hour, coordinator.LSArchived}, // Not candidate for archival (archived).
+ {"quux", 10 * time.Hour, coordinator.LSStreaming}, // Candidate for archival.
} {
ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, v.name))
// The entry was created a week ago.
- ls.Created = clock.Now(c).Add(-(7 * 24 * time.Hour))
- ls.Updated = clock.Now(c).Add(-v.d)
- if v.term {
- ls.State = coordinator.LSTerminated
- }
- if err := ls.Put(ds.Get(c)); err != nil {
+ ls.Created = now.Add(-v.d)
+ ls.State = v.state
+ ls.TerminatedTime = now // Doesn't matter, but required to Put the stream.
+ ls.ArchivedTime = now // Doesn't matter, but required to Put the stream.
+ if err := ds.Get(c).Put(ls); err != nil {
panic(err)
}
}
ds.Get(c).Testable().CatchupIndexes()
// This allows us to update our Context in test setup.
+ tap := ct.ArchivalPublisher{}
+ svcStub := ct.Services{
+ AP: func() (coordinator.ArchivalPublisher, error) {
+ return &tap, nil
+ },
+ }
b := Backend{
- multiTaskBatchSize: 5,
+ ServiceBase: coordinator.ServiceBase{&svcStub},
}
tb := testBase{Context: c}
@@ -82,156 +89,106 @@ func TestHandleArchiveCron(t *testing.T) {
s := httptest.NewServer(r)
defer s.Close()
- Convey(`With no configuration loaded`, func() {
- for _, ep := range []string{"terminal", "nonterminal"} {
- Convey(fmt.Sprintf(`A %q endpoint hit will fail.`, ep), func() {
- resp, err := http.Get(fmt.Sprintf("%s/archive/cron/%s", s.URL, ep))
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
- })
- }
- })
-
- Convey(`With a configuration loaded`, func() {
- qName := "archive-test-queue"
- c = ct.UseConfig(c, &svcconfig.Coordinator{
- ArchiveDelay: google.NewDuration(10 * time.Minute),
- ArchiveDelayMax: google.NewDuration(24 * time.Hour),
- ArchiveTaskQueue: qName,
- })
- tb.Context = c
+ endpoint := fmt.Sprintf("%s/archive/cron", s.URL)
- Convey(`With no task queue configured`, func() {
- for _, ep := range []string{"terminal", "nonterminal", "purge"} {
- Convey(fmt.Sprintf(`A %q endpoint hit will fail.`, ep), func() {
- resp, err := http.Get(fmt.Sprintf("%s/archive/cron/%s", s.URL, ep))
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
- })
- }
- })
+ Convey(`With no configuration loaded, the endpoint will fail.`, func() {
+ resp, err := http.Get(endpoint)
+ So(err, ShouldBeNil)
+ So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
+ })
- Convey(`With a task queue configured`, func() {
- tq.Get(c).Testable().CreateQueue(qName)
+ Convey(`With no task topic configured, the endpoint will fail.`, func() {
+ svcStub.InitConfig()
- Convey(`A terminal endpoint hit will be successful and idempotent.`, func() {
- resp, err := http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
-
- // Candidate tasks should be scheduled.
- tasks := tq.Get(c).Testable().GetScheduledTasks()[qName]
- So(tasks, shouldHaveTasks, archiveTaskName("testing/+/bar"))
+ resp, err := http.Get(endpoint)
+ So(err, ShouldBeNil)
+ So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
+ })
- // Hit the endpoint again, the same tasks should be scheduled.
- resp, err = http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
+ Convey(`With a configuration loaded`, func() {
+ svcStub.InitConfig()
+ svcStub.ServiceConfig.Coordinator.ArchiveTopic = "projects/test/topics/archive-test-topic"
+ svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay = google.NewDuration(10 * time.Second)
+ svcStub.ServiceConfig.Coordinator.ArchiveDelayMax = google.NewDuration(10 * time.Minute)
+
+ Convey(`A request to the endpoint will be successful.`, func() {
+ resp, err := http.Get(endpoint)
+ So(err, ShouldBeNil)
+ So(resp.StatusCode, ShouldEqual, http.StatusOK)
+
+ // Candidate tasks should be scheduled.
+ So(tap.StreamNames(), ShouldResemble, []string{"bar", "quux"})
+
+ // Hit the endpoint again, no additional tasks should be scheduled.
+ Convey(`A subsequent endpoint hit will not schedule any additional tasks.`, func() {
+ resp, err = http.Get(endpoint)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
-
- tasks2 := tq.Get(c).Testable().GetScheduledTasks()[qName]
- So(tasks2, ShouldResemble, tasks)
+ So(tap.StreamNames(), ShouldResemble, []string{"bar", "quux"})
})
+ })
- Convey(`A non-terminal endpoint hit will be successful and idempotent.`, func() {
- resp, err := http.Get(fmt.Sprintf("%s/archive/cron/nonterminal", s.URL))
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
-
- // Candidate tasks should be scheduled.
- tasks := tq.Get(c).Testable().GetScheduledTasks()[qName]
- So(tasks, shouldHaveTasks, archiveTaskName("testing/+/qux"))
-
- // Hit the endpoint again, the same tasks should be scheduled.
- resp, err = http.Get(fmt.Sprintf("%s/archive/cron/nonterminal", s.URL))
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
+ Convey(`When scheduling multiple tasks`, func() {
+ b.multiTaskBatchSize = 5
+
+ // Create a lot of archival candidate tasks to schedule.
+ //
+ // Note that since task queue names are returned sorted, we want to
+ // name these streams greater than our stock stream names.
+ names := []string{"bar", "quux"}
+ for i := 0; i < 11; i++ {
+ ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, fmt.Sprintf("stream-%02d", i)))
+
+ ls.Created = now.Add(-(10 * time.Minute))
+ ls.State = coordinator.LSStreaming
+ ls.TerminalIndex = 1337
+ ls.TerminatedTime = now
+ if err := ds.Get(c).Put(ls); err != nil {
+ panic(err)
+ }
- tasks2 := tq.Get(c).Testable().GetScheduledTasks()[qName]
- So(tasks2, ShouldResemble, tasks)
- })
+ names = append(names, ls.Name)
+ }
+ ds.Get(c).Testable().CatchupIndexes()
- Convey(`A terminal endpoint hit followed by a non-terminal endpoint hit will be successful.`, func() {
- resp, err := http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
+ Convey(`Will schedule all pages properly.`, func() {
+ // Ensure that all of these tasks get added to the task queue.
+ resp, err := http.Get(endpoint)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
- So(tq.Get(c).Testable().GetScheduledTasks()[qName], shouldHaveTasks, archiveTaskName("testing/+/bar"))
+ So(tap.StreamNames(), ShouldResemble, names)
- resp, err = http.Get(fmt.Sprintf("%s/archive/cron/nonterminal", s.URL))
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- So(tq.Get(c).Testable().GetScheduledTasks()[qName], shouldHaveTasks,
- archiveTaskName("testing/+/bar"), archiveTaskName("testing/+/qux"))
- })
-
- Convey(`When scheduling multiple tasks`, func() {
- // Create a lot of archival candidate tasks to schedule.
- var names []string
- for i := 0; i < 11; i++ {
- ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, fmt.Sprintf("stream-%d", i)))
-
- ls.Created = clock.Now(c).Add(-(10 * time.Minute))
- ls.Updated = ls.Created
- ls.State = coordinator.LSTerminated
- ls.TerminalIndex = 1337
- if err := ls.Put(ds.Get(c)); err != nil {
- panic(err)
- }
-
- names = append(names, ls.Name)
+ // Use this opportunity to assert that none of the scheduled streams
+ // have any settle or completion delay.
+ for _, at := range tap.Tasks() {
+ So(at.SettleDelay.Duration(), ShouldEqual, 0)
+ So(at.CompletePeriod.Duration(), ShouldEqual, 0)
}
- names = append(names, "bar")
- ds.Get(c).Testable().CatchupIndexes()
-
- Convey(`Will schedule all pages properly.`, func() {
- taskNames := make([]interface{}, len(names))
- for i, n := range names {
- taskNames[i] = archiveTaskName(fmt.Sprintf("testing/+/%s", n))
- }
- // Ensure that all of these tasks get added to the task queue.
- resp, err := http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
+ Convey(`Will not schedule additional tasks on the next run.`, func() {
+ resp, err := http.Get(endpoint)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
- So(tq.Get(c).Testable().GetScheduledTasks()[qName], shouldHaveTasks, taskNames...)
-
- Convey(`Will be successful when rescheduling the same tasks.`, func() {
- resp, err := http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- So(tq.Get(c).Testable().GetScheduledTasks()[qName], shouldHaveTasks, taskNames...)
- })
- })
-
- Convey(`Will return an error if task scheduling fails.`, func() {
- c, fb := featureBreaker.FilterTQ(c, nil)
- tb.Context = c
- fb.BreakFeatures(errors.New("test error"), "AddMulti")
-
- // Ensure that all of these tasks get added to the task queue.
- resp, err := http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
+ So(tap.StreamNames(), ShouldResemble, names)
})
+ })
- Convey(`Will return an error if a single task scheduling fails.`, func() {
- merr := make(errors.MultiError, len(names))
- merr[0] = errors.New("test error")
+ Convey(`Will not schedule tasks if task scheduling fails.`, func() {
+ tap.Err = errors.New("test error")
- c, fb := featureBreaker.FilterTQ(c, nil)
- tb.Context = c
- fb.BreakFeatures(merr, "AddMulti")
+ // Ensure that all of these tasks get added to the task queue.
+ resp, err := http.Get(endpoint)
+ So(err, ShouldBeNil)
+ So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
+ So(tap.StreamNames(), ShouldResemble, []string{})
- // Ensure that all of these tasks get added to the task queue.
- resp, err := http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
- })
+ Convey(`And will schedule streams next run.`, func() {
+ tap.Err = nil
- Convey(`And a purge endpoint hit will purge the tasks.`, func() {
- resp, err := http.Get(fmt.Sprintf("%s/archive/cron/purge", s.URL))
+ resp, err := http.Get(endpoint)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
- So(tq.Get(c).Testable().GetScheduledTasks()[qName], shouldHaveTasks)
+ So(tap.StreamNames(), ShouldResemble, names)
})
})
})
« no previous file with comments | « appengine/logdog/coordinator/backend/archiveCron.go ('k') | appengine/logdog/coordinator/backend/backend.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698