| Index: appengine/logdog/coordinator/endpoints/services/terminateStream_test.go
|
| diff --git a/appengine/logdog/coordinator/endpoints/services/terminateStream_test.go b/appengine/logdog/coordinator/endpoints/services/terminateStream_test.go
|
| index 8750e9396a8c6b0ac89660fb3490ab2f1727bee1..7dcae214899ad78fb2d470dacc0a8469f6b16551 100644
|
| --- a/appengine/logdog/coordinator/endpoints/services/terminateStream_test.go
|
| +++ b/appengine/logdog/coordinator/endpoints/services/terminateStream_test.go
|
| @@ -16,7 +16,7 @@ import (
|
| ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest"
|
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
|
| "github.com/luci/luci-go/common/clock/testclock"
|
| - "github.com/luci/luci-go/common/proto/logdog/svcconfig"
|
| + "github.com/luci/luci-go/common/proto/google"
|
| "github.com/luci/luci-go/server/auth"
|
| "github.com/luci/luci-go/server/auth/authtest"
|
| "golang.org/x/net/context"
|
| @@ -29,9 +29,24 @@ func TestTerminateStream(t *testing.T) {
|
| t.Parallel()
|
|
|
| Convey(`With a testing configuration`, t, func() {
|
| - c, tc := testclock.UseTime(context.Background(), testclock.TestTimeLocal)
|
| + c, _ := testclock.UseTime(context.Background(), testclock.TestTimeLocal)
|
| c = memory.Use(c)
|
| - be := Server{}
|
| +
|
| + var tap ct.ArchivalPublisher
|
| + svcStub := ct.Services{
|
| + AP: func() (coordinator.ArchivalPublisher, error) {
|
| + return &tap, nil
|
| + },
|
| + }
|
| + svcStub.InitConfig()
|
| + svcStub.ServiceConfig.Coordinator.ServiceAuthGroup = "test-services"
|
| + svcStub.ServiceConfig.Coordinator.ArchiveTopic = "projects/test/topics/archive"
|
| + svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay = google.NewDuration(10 * time.Second)
|
| + svcStub.ServiceConfig.Coordinator.ArchiveDelayMax = google.NewDuration(24 * time.Hour)
|
| +
|
| + be := Server{
|
| + ServiceBase: coordinator.ServiceBase{&svcStub},
|
| + }
|
|
|
| desc := ct.TestLogStreamDescriptor(c, "foo/bar")
|
| ls := ct.TestLogStream(c, desc)
|
| @@ -42,9 +57,6 @@ func TestTerminateStream(t *testing.T) {
|
| TerminalIndex: 1337,
|
| }
|
|
|
| - c = ct.UseConfig(c, &svcconfig.Coordinator{
|
| - ServiceAuthGroup: "test-services",
|
| - })
|
| fs := authtest.FakeState{}
|
| c = auth.WithState(c, &fs)
|
|
|
| @@ -57,18 +69,25 @@ func TestTerminateStream(t *testing.T) {
|
| fs.IdentityGroups = []string{"test-services"}
|
|
|
| Convey(`A non-terminal registered stream, "testing/+/foo/bar"`, func() {
|
| - So(ls.Put(ds.Get(c)), ShouldBeNil)
|
| - tc.Add(time.Second)
|
| + So(ds.Get(c).Put(ls), ShouldBeNil)
|
|
|
| - Convey(`Can be marked terminal.`, func() {
|
| + Convey(`Can be marked terminal and schedules an archival task.`, func() {
|
| _, err := be.TerminateStream(c, &req)
|
| So(err, ShouldBeRPCOK)
|
|
|
| // Reload "ls" and confirm.
|
| So(ds.Get(c).Get(ls), ShouldBeNil)
|
| So(ls.TerminalIndex, ShouldEqual, 1337)
|
| - So(ls.State, ShouldEqual, coordinator.LSTerminated)
|
| - So(ls.Updated, ShouldResemble, ls.Created.Add(time.Second))
|
| + So(ls.State, ShouldEqual, coordinator.LSArchiveTasked)
|
| + So(ls.Terminated(), ShouldBeTrue)
|
| + So(tap.StreamNames(), ShouldResemble, []string{ls.Name})
|
| +
|
| + // Assert that all archive tasks are scheduled ArchiveSettleDelay in
|
| + // the future.
|
| + for _, t := range tap.Tasks() {
|
| + So(t.SettleDelay.Duration(), ShouldEqual, svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay.Duration())
|
| + So(t.CompletePeriod.Duration(), ShouldEqual, svcStub.ServiceConfig.Coordinator.ArchiveDelayMax.Duration())
|
| + }
|
|
|
| Convey(`Can be marked terminal again (idempotent).`, func() {
|
| _, err := be.TerminateStream(c, &req)
|
| @@ -76,19 +95,25 @@ func TestTerminateStream(t *testing.T) {
|
|
|
| // Reload "ls" and confirm.
|
| So(ds.Get(c).Get(ls), ShouldBeNil)
|
| +
|
| + So(ls.Terminated(), ShouldBeTrue)
|
| So(ls.TerminalIndex, ShouldEqual, 1337)
|
| - So(ls.State, ShouldEqual, coordinator.LSTerminated)
|
| + So(ls.State, ShouldEqual, coordinator.LSArchiveTasked)
|
| + So(tap.StreamNames(), ShouldResemble, []string{ls.Name})
|
| })
|
|
|
| Convey(`Will reject attempts to change the terminal index.`, func() {
|
| req.TerminalIndex = 1338
|
| _, err := be.TerminateStream(c, &req)
|
| - So(err, ShouldBeRPCAlreadyExists, "Terminal index is already set")
|
| + So(err, ShouldBeRPCFailedPrecondition, "Log stream is not in streaming state.")
|
|
|
| // Reload "ls" and confirm.
|
| So(ds.Get(c).Get(ls), ShouldBeNil)
|
| - So(ls.State, ShouldEqual, coordinator.LSTerminated)
|
| +
|
| + So(ls.Terminated(), ShouldBeTrue)
|
| + So(ls.State, ShouldEqual, coordinator.LSArchiveTasked)
|
| So(ls.TerminalIndex, ShouldEqual, 1337)
|
| + So(tap.StreamNames(), ShouldResemble, []string{ls.Name})
|
| })
|
|
|
| Convey(`Will reject attempts to clear the terminal index.`, func() {
|
| @@ -98,8 +123,11 @@ func TestTerminateStream(t *testing.T) {
|
|
|
| // Reload "ls" and confirm.
|
| So(ds.Get(c).Get(ls), ShouldBeNil)
|
| - So(ls.State, ShouldEqual, coordinator.LSTerminated)
|
| +
|
| + So(ls.Terminated(), ShouldBeTrue)
|
| + So(ls.State, ShouldEqual, coordinator.LSArchiveTasked)
|
| So(ls.TerminalIndex, ShouldEqual, 1337)
|
| + So(tap.StreamNames(), ShouldResemble, []string{ls.Name})
|
| })
|
| })
|
|
|
|
|