| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package services | 5 package services |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | 8 "errors" |
| 9 "testing" | 9 "testing" |
| 10 "time" | 10 "time" |
| 11 | 11 |
| 12 "github.com/luci/gae/filter/featureBreaker" | 12 "github.com/luci/gae/filter/featureBreaker" |
| 13 "github.com/luci/gae/impl/memory" | 13 "github.com/luci/gae/impl/memory" |
| 14 ds "github.com/luci/gae/service/datastore" | 14 ds "github.com/luci/gae/service/datastore" |
| 15 "github.com/luci/luci-go/appengine/logdog/coordinator" | 15 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 16 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" | 16 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" |
| 17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 18 "github.com/luci/luci-go/common/clock/testclock" | 18 "github.com/luci/luci-go/common/clock/testclock" |
| 19 » "github.com/luci/luci-go/common/proto/logdog/svcconfig" | 19 » "github.com/luci/luci-go/common/proto/google" |
| 20 "github.com/luci/luci-go/server/auth" | 20 "github.com/luci/luci-go/server/auth" |
| 21 "github.com/luci/luci-go/server/auth/authtest" | 21 "github.com/luci/luci-go/server/auth/authtest" |
| 22 "golang.org/x/net/context" | 22 "golang.org/x/net/context" |
| 23 | 23 |
| 24 . "github.com/luci/luci-go/common/testing/assertions" | 24 . "github.com/luci/luci-go/common/testing/assertions" |
| 25 . "github.com/smartystreets/goconvey/convey" | 25 . "github.com/smartystreets/goconvey/convey" |
| 26 ) | 26 ) |
| 27 | 27 |
| 28 func TestTerminateStream(t *testing.T) { | 28 func TestTerminateStream(t *testing.T) { |
| 29 t.Parallel() | 29 t.Parallel() |
| 30 | 30 |
| 31 Convey(`With a testing configuration`, t, func() { | 31 Convey(`With a testing configuration`, t, func() { |
| 32 » » c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeLocal) | 32 » » c, _ := testclock.UseTime(context.Background(), testclock.TestTi
meLocal) |
| 33 c = memory.Use(c) | 33 c = memory.Use(c) |
| 34 » » be := Server{} | 34 |
| 35 » » var tap ct.ArchivalPublisher |
| 36 » » svcStub := ct.Services{ |
| 37 » » » AP: func() (coordinator.ArchivalPublisher, error) { |
| 38 » » » » return &tap, nil |
| 39 » » » }, |
| 40 » » } |
| 41 » » svcStub.InitConfig() |
| 42 » » svcStub.ServiceConfig.Coordinator.ServiceAuthGroup = "test-servi
ces" |
| 43 » » svcStub.ServiceConfig.Coordinator.ArchiveTopic = "projects/test/
topics/archive" |
| 44 » » svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay = google.Ne
wDuration(10 * time.Second) |
| 45 » » svcStub.ServiceConfig.Coordinator.ArchiveDelayMax = google.NewDu
ration(24 * time.Hour) |
| 46 |
| 47 » » be := Server{ |
| 48 » » » ServiceBase: coordinator.ServiceBase{&svcStub}, |
| 49 » » } |
| 35 | 50 |
| 36 desc := ct.TestLogStreamDescriptor(c, "foo/bar") | 51 desc := ct.TestLogStreamDescriptor(c, "foo/bar") |
| 37 ls := ct.TestLogStream(c, desc) | 52 ls := ct.TestLogStream(c, desc) |
| 38 | 53 |
| 39 req := logdog.TerminateStreamRequest{ | 54 req := logdog.TerminateStreamRequest{ |
| 40 Path: "testing/+/foo/bar", | 55 Path: "testing/+/foo/bar", |
| 41 Secret: ls.Secret, | 56 Secret: ls.Secret, |
| 42 TerminalIndex: 1337, | 57 TerminalIndex: 1337, |
| 43 } | 58 } |
| 44 | 59 |
| 45 c = ct.UseConfig(c, &svcconfig.Coordinator{ | |
| 46 ServiceAuthGroup: "test-services", | |
| 47 }) | |
| 48 fs := authtest.FakeState{} | 60 fs := authtest.FakeState{} |
| 49 c = auth.WithState(c, &fs) | 61 c = auth.WithState(c, &fs) |
| 50 | 62 |
| 51 Convey(`Returns Forbidden error if not a service.`, func() { | 63 Convey(`Returns Forbidden error if not a service.`, func() { |
| 52 _, err := be.TerminateStream(c, &req) | 64 _, err := be.TerminateStream(c, &req) |
| 53 So(err, ShouldBeRPCPermissionDenied) | 65 So(err, ShouldBeRPCPermissionDenied) |
| 54 }) | 66 }) |
| 55 | 67 |
| 56 Convey(`When logged in as a service`, func() { | 68 Convey(`When logged in as a service`, func() { |
| 57 fs.IdentityGroups = []string{"test-services"} | 69 fs.IdentityGroups = []string{"test-services"} |
| 58 | 70 |
| 59 Convey(`A non-terminal registered stream, "testing/+/foo
/bar"`, func() { | 71 Convey(`A non-terminal registered stream, "testing/+/foo
/bar"`, func() { |
| 60 » » » » So(ls.Put(ds.Get(c)), ShouldBeNil) | 72 » » » » So(ds.Get(c).Put(ls), ShouldBeNil) |
| 61 » » » » tc.Add(time.Second) | |
| 62 | 73 |
| 63 » » » » Convey(`Can be marked terminal.`, func() { | 74 » » » » Convey(`Can be marked terminal and schedules an
archival task.`, func() { |
| 64 _, err := be.TerminateStream(c, &req) | 75 _, err := be.TerminateStream(c, &req) |
| 65 So(err, ShouldBeRPCOK) | 76 So(err, ShouldBeRPCOK) |
| 66 | 77 |
| 67 // Reload "ls" and confirm. | 78 // Reload "ls" and confirm. |
| 68 So(ds.Get(c).Get(ls), ShouldBeNil) | 79 So(ds.Get(c).Get(ls), ShouldBeNil) |
| 69 So(ls.TerminalIndex, ShouldEqual, 1337) | 80 So(ls.TerminalIndex, ShouldEqual, 1337) |
| 70 » » » » » So(ls.State, ShouldEqual, coordinator.LS
Terminated) | 81 » » » » » So(ls.State, ShouldEqual, coordinator.LS
ArchiveTasked) |
| 71 » » » » » So(ls.Updated, ShouldResemble, ls.Create
d.Add(time.Second)) | 82 » » » » » So(ls.Terminated(), ShouldBeTrue) |
| 83 » » » » » So(tap.StreamNames(), ShouldResemble, []
string{ls.Name}) |
| 84 |
| 85 » » » » » // Assert that all archive tasks are sch
eduled ArchiveSettleDelay in |
| 86 » » » » » // the future. |
| 87 » » » » » for _, t := range tap.Tasks() { |
| 88 » » » » » » So(t.SettleDelay.Duration(), Sho
uldEqual, svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay.Duration()) |
| 89 » » » » » » So(t.CompletePeriod.Duration(),
ShouldEqual, svcStub.ServiceConfig.Coordinator.ArchiveDelayMax.Duration()) |
| 90 » » » » » } |
| 72 | 91 |
| 73 Convey(`Can be marked terminal again (id
empotent).`, func() { | 92 Convey(`Can be marked terminal again (id
empotent).`, func() { |
| 74 _, err := be.TerminateStream(c,
&req) | 93 _, err := be.TerminateStream(c,
&req) |
| 75 So(err, ShouldBeRPCOK) | 94 So(err, ShouldBeRPCOK) |
| 76 | 95 |
| 77 // Reload "ls" and confirm. | 96 // Reload "ls" and confirm. |
| 78 So(ds.Get(c).Get(ls), ShouldBeNi
l) | 97 So(ds.Get(c).Get(ls), ShouldBeNi
l) |
| 98 |
| 99 So(ls.Terminated(), ShouldBeTrue
) |
| 79 So(ls.TerminalIndex, ShouldEqual
, 1337) | 100 So(ls.TerminalIndex, ShouldEqual
, 1337) |
| 80 » » » » » » So(ls.State, ShouldEqual, coordi
nator.LSTerminated) | 101 » » » » » » So(ls.State, ShouldEqual, coordi
nator.LSArchiveTasked) |
| 102 » » » » » » So(tap.StreamNames(), ShouldRese
mble, []string{ls.Name}) |
| 81 }) | 103 }) |
| 82 | 104 |
| 83 Convey(`Will reject attempts to change t
he terminal index.`, func() { | 105 Convey(`Will reject attempts to change t
he terminal index.`, func() { |
| 84 req.TerminalIndex = 1338 | 106 req.TerminalIndex = 1338 |
| 85 _, err := be.TerminateStream(c,
&req) | 107 _, err := be.TerminateStream(c,
&req) |
| 86 » » » » » » So(err, ShouldBeRPCAlreadyExists
, "Terminal index is already set") | 108 » » » » » » So(err, ShouldBeRPCFailedPrecond
ition, "Log stream is not in streaming state.") |
| 87 | 109 |
| 88 // Reload "ls" and confirm. | 110 // Reload "ls" and confirm. |
| 89 So(ds.Get(c).Get(ls), ShouldBeNi
l) | 111 So(ds.Get(c).Get(ls), ShouldBeNi
l) |
| 90 » » » » » » So(ls.State, ShouldEqual, coordi
nator.LSTerminated) | 112 |
| 113 » » » » » » So(ls.Terminated(), ShouldBeTrue
) |
| 114 » » » » » » So(ls.State, ShouldEqual, coordi
nator.LSArchiveTasked) |
| 91 So(ls.TerminalIndex, ShouldEqual
, 1337) | 115 So(ls.TerminalIndex, ShouldEqual
, 1337) |
| 116 So(tap.StreamNames(), ShouldRese
mble, []string{ls.Name}) |
| 92 }) | 117 }) |
| 93 | 118 |
| 94 Convey(`Will reject attempts to clear th
e terminal index.`, func() { | 119 Convey(`Will reject attempts to clear th
e terminal index.`, func() { |
| 95 req.TerminalIndex = -1 | 120 req.TerminalIndex = -1 |
| 96 _, err := be.TerminateStream(c,
&req) | 121 _, err := be.TerminateStream(c,
&req) |
| 97 So(err, ShouldBeRPCInvalidArgume
nt, "Negative terminal index.") | 122 So(err, ShouldBeRPCInvalidArgume
nt, "Negative terminal index.") |
| 98 | 123 |
| 99 // Reload "ls" and confirm. | 124 // Reload "ls" and confirm. |
| 100 So(ds.Get(c).Get(ls), ShouldBeNi
l) | 125 So(ds.Get(c).Get(ls), ShouldBeNi
l) |
| 101 » » » » » » So(ls.State, ShouldEqual, coordi
nator.LSTerminated) | 126 |
| 127 » » » » » » So(ls.Terminated(), ShouldBeTrue
) |
| 128 » » » » » » So(ls.State, ShouldEqual, coordi
nator.LSArchiveTasked) |
| 102 So(ls.TerminalIndex, ShouldEqual
, 1337) | 129 So(ls.TerminalIndex, ShouldEqual
, 1337) |
| 130 So(tap.StreamNames(), ShouldRese
mble, []string{ls.Name}) |
| 103 }) | 131 }) |
| 104 }) | 132 }) |
| 105 | 133 |
| 106 Convey(`Will return an internal server error if
Put() fails.`, func() { | 134 Convey(`Will return an internal server error if
Put() fails.`, func() { |
| 107 c, fb := featureBreaker.FilterRDS(c, nil
) | 135 c, fb := featureBreaker.FilterRDS(c, nil
) |
| 108 fb.BreakFeatures(errors.New("test error"
), "PutMulti") | 136 fb.BreakFeatures(errors.New("test error"
), "PutMulti") |
| 109 _, err := be.TerminateStream(c, &req) | 137 _, err := be.TerminateStream(c, &req) |
| 110 So(err, ShouldBeRPCInternal) | 138 So(err, ShouldBeRPCInternal) |
| 111 }) | 139 }) |
| 112 | 140 |
| (...skipping 17 matching lines...) Expand all Loading... |
| 130 So(err, ShouldBeRPCInvalidArgument, "Invalid pat
h") | 158 So(err, ShouldBeRPCInvalidArgument, "Invalid pat
h") |
| 131 }) | 159 }) |
| 132 | 160 |
| 133 Convey(`Will fail if the stream is not registered.`, fun
c() { | 161 Convey(`Will fail if the stream is not registered.`, fun
c() { |
| 134 _, err := be.TerminateStream(c, &req) | 162 _, err := be.TerminateStream(c, &req) |
| 135 So(err, ShouldBeRPCNotFound, "is not registered"
) | 163 So(err, ShouldBeRPCNotFound, "is not registered"
) |
| 136 }) | 164 }) |
| 137 }) | 165 }) |
| 138 }) | 166 }) |
| 139 } | 167 } |
| OLD | NEW |