Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(527)

Side by Side Diff: appengine/logdog/coordinator/endpoints/services/terminateStream_test.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package services 5 package services
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "testing" 9 "testing"
10 "time" 10 "time"
11 11
12 "github.com/luci/gae/filter/featureBreaker" 12 "github.com/luci/gae/filter/featureBreaker"
13 "github.com/luci/gae/impl/memory" 13 "github.com/luci/gae/impl/memory"
14 ds "github.com/luci/gae/service/datastore" 14 ds "github.com/luci/gae/service/datastore"
15 "github.com/luci/luci-go/appengine/logdog/coordinator" 15 "github.com/luci/luci-go/appengine/logdog/coordinator"
16 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest " 16 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest "
17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
18 "github.com/luci/luci-go/common/clock/testclock" 18 "github.com/luci/luci-go/common/clock/testclock"
19 » "github.com/luci/luci-go/common/proto/logdog/svcconfig" 19 » "github.com/luci/luci-go/common/proto/google"
20 "github.com/luci/luci-go/server/auth" 20 "github.com/luci/luci-go/server/auth"
21 "github.com/luci/luci-go/server/auth/authtest" 21 "github.com/luci/luci-go/server/auth/authtest"
22 "golang.org/x/net/context" 22 "golang.org/x/net/context"
23 23
24 . "github.com/luci/luci-go/common/testing/assertions" 24 . "github.com/luci/luci-go/common/testing/assertions"
25 . "github.com/smartystreets/goconvey/convey" 25 . "github.com/smartystreets/goconvey/convey"
26 ) 26 )
27 27
28 func TestTerminateStream(t *testing.T) { 28 func TestTerminateStream(t *testing.T) {
29 t.Parallel() 29 t.Parallel()
30 30
31 Convey(`With a testing configuration`, t, func() { 31 Convey(`With a testing configuration`, t, func() {
32 » » c, tc := testclock.UseTime(context.Background(), testclock.TestT imeLocal) 32 » » c, _ := testclock.UseTime(context.Background(), testclock.TestTi meLocal)
33 c = memory.Use(c) 33 c = memory.Use(c)
34 » » be := Server{} 34
35 » » var tap ct.ArchivalPublisher
36 » » svcStub := ct.Services{
37 » » » AP: func() (coordinator.ArchivalPublisher, error) {
38 » » » » return &tap, nil
39 » » » },
40 » » }
41 » » svcStub.InitConfig()
42 » » svcStub.ServiceConfig.Coordinator.ServiceAuthGroup = "test-servi ces"
43 » » svcStub.ServiceConfig.Coordinator.ArchiveTopic = "projects/test/ topics/archive"
44 » » svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay = google.Ne wDuration(10 * time.Second)
45 » » svcStub.ServiceConfig.Coordinator.ArchiveDelayMax = google.NewDu ration(24 * time.Hour)
46
47 » » be := Server{
48 » » » ServiceBase: coordinator.ServiceBase{&svcStub},
49 » » }
35 50
36 desc := ct.TestLogStreamDescriptor(c, "foo/bar") 51 desc := ct.TestLogStreamDescriptor(c, "foo/bar")
37 ls := ct.TestLogStream(c, desc) 52 ls := ct.TestLogStream(c, desc)
38 53
39 req := logdog.TerminateStreamRequest{ 54 req := logdog.TerminateStreamRequest{
40 Path: "testing/+/foo/bar", 55 Path: "testing/+/foo/bar",
41 Secret: ls.Secret, 56 Secret: ls.Secret,
42 TerminalIndex: 1337, 57 TerminalIndex: 1337,
43 } 58 }
44 59
45 c = ct.UseConfig(c, &svcconfig.Coordinator{
46 ServiceAuthGroup: "test-services",
47 })
48 fs := authtest.FakeState{} 60 fs := authtest.FakeState{}
49 c = auth.WithState(c, &fs) 61 c = auth.WithState(c, &fs)
50 62
51 Convey(`Returns Forbidden error if not a service.`, func() { 63 Convey(`Returns Forbidden error if not a service.`, func() {
52 _, err := be.TerminateStream(c, &req) 64 _, err := be.TerminateStream(c, &req)
53 So(err, ShouldBeRPCPermissionDenied) 65 So(err, ShouldBeRPCPermissionDenied)
54 }) 66 })
55 67
56 Convey(`When logged in as a service`, func() { 68 Convey(`When logged in as a service`, func() {
57 fs.IdentityGroups = []string{"test-services"} 69 fs.IdentityGroups = []string{"test-services"}
58 70
59 Convey(`A non-terminal registered stream, "testing/+/foo /bar"`, func() { 71 Convey(`A non-terminal registered stream, "testing/+/foo /bar"`, func() {
60 » » » » So(ls.Put(ds.Get(c)), ShouldBeNil) 72 » » » » So(ds.Get(c).Put(ls), ShouldBeNil)
61 » » » » tc.Add(time.Second)
62 73
63 » » » » Convey(`Can be marked terminal.`, func() { 74 » » » » Convey(`Can be marked terminal and schedules an archival task.`, func() {
64 _, err := be.TerminateStream(c, &req) 75 _, err := be.TerminateStream(c, &req)
65 So(err, ShouldBeRPCOK) 76 So(err, ShouldBeRPCOK)
66 77
67 // Reload "ls" and confirm. 78 // Reload "ls" and confirm.
68 So(ds.Get(c).Get(ls), ShouldBeNil) 79 So(ds.Get(c).Get(ls), ShouldBeNil)
69 So(ls.TerminalIndex, ShouldEqual, 1337) 80 So(ls.TerminalIndex, ShouldEqual, 1337)
70 » » » » » So(ls.State, ShouldEqual, coordinator.LS Terminated) 81 » » » » » So(ls.State, ShouldEqual, coordinator.LS ArchiveTasked)
71 » » » » » So(ls.Updated, ShouldResemble, ls.Create d.Add(time.Second)) 82 » » » » » So(ls.Terminated(), ShouldBeTrue)
83 » » » » » So(tap.StreamNames(), ShouldResemble, [] string{ls.Name})
84
85 » » » » » // Assert that all archive tasks are sch eduled ArchiveSettleDelay in
86 » » » » » // the future.
87 » » » » » for _, t := range tap.Tasks() {
88 » » » » » » So(t.SettleDelay.Duration(), Sho uldEqual, svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay.Duration())
89 » » » » » » So(t.CompletePeriod.Duration(), ShouldEqual, svcStub.ServiceConfig.Coordinator.ArchiveDelayMax.Duration())
90 » » » » » }
72 91
73 Convey(`Can be marked terminal again (id empotent).`, func() { 92 Convey(`Can be marked terminal again (id empotent).`, func() {
74 _, err := be.TerminateStream(c, &req) 93 _, err := be.TerminateStream(c, &req)
75 So(err, ShouldBeRPCOK) 94 So(err, ShouldBeRPCOK)
76 95
77 // Reload "ls" and confirm. 96 // Reload "ls" and confirm.
78 So(ds.Get(c).Get(ls), ShouldBeNi l) 97 So(ds.Get(c).Get(ls), ShouldBeNi l)
98
99 So(ls.Terminated(), ShouldBeTrue )
79 So(ls.TerminalIndex, ShouldEqual , 1337) 100 So(ls.TerminalIndex, ShouldEqual , 1337)
80 » » » » » » So(ls.State, ShouldEqual, coordi nator.LSTerminated) 101 » » » » » » So(ls.State, ShouldEqual, coordi nator.LSArchiveTasked)
102 » » » » » » So(tap.StreamNames(), ShouldRese mble, []string{ls.Name})
81 }) 103 })
82 104
83 Convey(`Will reject attempts to change t he terminal index.`, func() { 105 Convey(`Will reject attempts to change t he terminal index.`, func() {
84 req.TerminalIndex = 1338 106 req.TerminalIndex = 1338
85 _, err := be.TerminateStream(c, &req) 107 _, err := be.TerminateStream(c, &req)
86 » » » » » » So(err, ShouldBeRPCAlreadyExists , "Terminal index is already set") 108 » » » » » » So(err, ShouldBeRPCFailedPrecond ition, "Log stream is not in streaming state.")
87 109
88 // Reload "ls" and confirm. 110 // Reload "ls" and confirm.
89 So(ds.Get(c).Get(ls), ShouldBeNi l) 111 So(ds.Get(c).Get(ls), ShouldBeNi l)
90 » » » » » » So(ls.State, ShouldEqual, coordi nator.LSTerminated) 112
113 » » » » » » So(ls.Terminated(), ShouldBeTrue )
114 » » » » » » So(ls.State, ShouldEqual, coordi nator.LSArchiveTasked)
91 So(ls.TerminalIndex, ShouldEqual , 1337) 115 So(ls.TerminalIndex, ShouldEqual , 1337)
116 So(tap.StreamNames(), ShouldRese mble, []string{ls.Name})
92 }) 117 })
93 118
94 Convey(`Will reject attempts to clear th e terminal index.`, func() { 119 Convey(`Will reject attempts to clear th e terminal index.`, func() {
95 req.TerminalIndex = -1 120 req.TerminalIndex = -1
96 _, err := be.TerminateStream(c, &req) 121 _, err := be.TerminateStream(c, &req)
97 So(err, ShouldBeRPCInvalidArgume nt, "Negative terminal index.") 122 So(err, ShouldBeRPCInvalidArgume nt, "Negative terminal index.")
98 123
99 // Reload "ls" and confirm. 124 // Reload "ls" and confirm.
100 So(ds.Get(c).Get(ls), ShouldBeNi l) 125 So(ds.Get(c).Get(ls), ShouldBeNi l)
101 » » » » » » So(ls.State, ShouldEqual, coordi nator.LSTerminated) 126
127 » » » » » » So(ls.Terminated(), ShouldBeTrue )
128 » » » » » » So(ls.State, ShouldEqual, coordi nator.LSArchiveTasked)
102 So(ls.TerminalIndex, ShouldEqual , 1337) 129 So(ls.TerminalIndex, ShouldEqual , 1337)
130 So(tap.StreamNames(), ShouldRese mble, []string{ls.Name})
103 }) 131 })
104 }) 132 })
105 133
106 Convey(`Will return an internal server error if Put() fails.`, func() { 134 Convey(`Will return an internal server error if Put() fails.`, func() {
107 c, fb := featureBreaker.FilterRDS(c, nil ) 135 c, fb := featureBreaker.FilterRDS(c, nil )
108 fb.BreakFeatures(errors.New("test error" ), "PutMulti") 136 fb.BreakFeatures(errors.New("test error" ), "PutMulti")
109 _, err := be.TerminateStream(c, &req) 137 _, err := be.TerminateStream(c, &req)
110 So(err, ShouldBeRPCInternal) 138 So(err, ShouldBeRPCInternal)
111 }) 139 })
112 140
(...skipping 17 matching lines...) Expand all
130 So(err, ShouldBeRPCInvalidArgument, "Invalid pat h") 158 So(err, ShouldBeRPCInvalidArgument, "Invalid pat h")
131 }) 159 })
132 160
133 Convey(`Will fail if the stream is not registered.`, fun c() { 161 Convey(`Will fail if the stream is not registered.`, fun c() {
134 _, err := be.TerminateStream(c, &req) 162 _, err := be.TerminateStream(c, &req)
135 So(err, ShouldBeRPCNotFound, "is not registered" ) 163 So(err, ShouldBeRPCNotFound, "is not registered" )
136 }) 164 })
137 }) 165 })
138 }) 166 })
139 } 167 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698