Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1269)

Side by Side Diff: appengine/logdog/coordinator/endpoints/services/archiveStream_test.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package services 5 package services
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "testing" 9 "testing"
10 10
11 "github.com/luci/gae/filter/featureBreaker" 11 "github.com/luci/gae/filter/featureBreaker"
12 "github.com/luci/gae/impl/memory" 12 "github.com/luci/gae/impl/memory"
13 ds "github.com/luci/gae/service/datastore" 13 ds "github.com/luci/gae/service/datastore"
14 "github.com/luci/luci-go/appengine/logdog/coordinator" 14 "github.com/luci/luci-go/appengine/logdog/coordinator"
15 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest " 15 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest "
16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
17 » "github.com/luci/luci-go/common/proto/logdog/svcconfig" 17 » "github.com/luci/luci-go/common/clock/testclock"
18 "github.com/luci/luci-go/server/auth" 18 "github.com/luci/luci-go/server/auth"
19 "github.com/luci/luci-go/server/auth/authtest" 19 "github.com/luci/luci-go/server/auth/authtest"
20 "golang.org/x/net/context" 20 "golang.org/x/net/context"
21 21
22 . "github.com/luci/luci-go/common/testing/assertions" 22 . "github.com/luci/luci-go/common/testing/assertions"
23 . "github.com/smartystreets/goconvey/convey" 23 . "github.com/smartystreets/goconvey/convey"
24 ) 24 )
25 25
26 func TestArchiveStream(t *testing.T) { 26 func TestArchiveStream(t *testing.T) {
27 t.Parallel() 27 t.Parallel()
28 28
29 Convey(`With a testing configuration`, t, func() { 29 Convey(`With a testing configuration`, t, func() {
30 » » c := memory.Use(context.Background()) 30 » » c, tc := testclock.UseTime(context.Background(), testclock.TestT imeUTC)
31 » » be := Server{} 31 » » c = memory.Use(c)
32 32
33 » » c = ct.UseConfig(c, &svcconfig.Coordinator{ 33 » » svcStub := ct.Services{}
34 » » » ServiceAuthGroup: "test-services", 34 » » svcStub.InitConfig()
35 » » }) 35 » » svcStub.ServiceConfig.Coordinator.ServiceAuthGroup = "test-servi ces"
36
37 » » be := Server{
38 » » » ServiceBase: coordinator.ServiceBase{&svcStub},
39 » » }
40
41 » » now := ds.RoundTime(tc.Now().UTC())
42
36 fs := authtest.FakeState{} 43 fs := authtest.FakeState{}
37 c = auth.WithState(c, &fs) 44 c = auth.WithState(c, &fs)
38 45
39 » » // Register a testing log stream (not archived). 46 » » // Register a testing log stream with an archive tasked.
40 ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, "foo")) 47 ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, "foo"))
41 » » if err := ls.Put(ds.Get(c)); err != nil { 48 » » ls.State = coordinator.LSArchiveTasked
49 » » ls.ArchivalKey = []byte("archival key")
50 » » if err := ds.Get(c).Put(ls); err != nil {
42 panic(err) 51 panic(err)
43 } 52 }
44 53
45 req := &logdog.ArchiveStreamRequest{ 54 req := &logdog.ArchiveStreamRequest{
46 Path: string(ls.Path()), 55 Path: string(ls.Path()),
47 Complete: true,
48 TerminalIndex: 13, 56 TerminalIndex: 13,
57 LogEntryCount: 14,
49 StreamUrl: "gs://fake.stream", 58 StreamUrl: "gs://fake.stream",
50 StreamSize: 10, 59 StreamSize: 10,
51 IndexUrl: "gs://fake.index", 60 IndexUrl: "gs://fake.index",
52 IndexSize: 20, 61 IndexSize: 20,
53 DataUrl: "gs://fake.data", 62 DataUrl: "gs://fake.data",
54 DataSize: 30, 63 DataSize: 30,
55 } 64 }
56 65
57 Convey(`Returns Forbidden error if not a service.`, func() { 66 Convey(`Returns Forbidden error if not a service.`, func() {
58 _, err := be.ArchiveStream(c, req) 67 _, err := be.ArchiveStream(c, req)
59 So(err, ShouldBeRPCPermissionDenied) 68 So(err, ShouldBeRPCPermissionDenied)
60 }) 69 })
61 70
62 Convey(`When logged in as a service`, func() { 71 Convey(`When logged in as a service`, func() {
63 fs.IdentityGroups = []string{"test-services"} 72 fs.IdentityGroups = []string{"test-services"}
64 73
65 Convey(`Will mark the stream as archived.`, func() { 74 Convey(`Will mark the stream as archived.`, func() {
66 _, err := be.ArchiveStream(c, req) 75 _, err := be.ArchiveStream(c, req)
67 So(err, ShouldBeNil) 76 So(err, ShouldBeNil)
68 77
69 So(ds.Get(c).Get(ls), ShouldBeNil) 78 So(ds.Get(c).Get(ls), ShouldBeNil)
79 So(ls.Terminated(), ShouldBeTrue)
70 So(ls.Archived(), ShouldBeTrue) 80 So(ls.Archived(), ShouldBeTrue)
71 » » » » So(ls.ArchiveWhole, ShouldBeTrue) 81 » » » » So(ls.ArchiveComplete(), ShouldBeTrue)
82
83 » » » » So(ls.State, ShouldEqual, coordinator.LSArchived )
84 » » » » So(ls.ArchivalKey, ShouldBeNil)
85 » » » » So(ls.TerminatedTime, ShouldResemble, now)
86 » » » » So(ls.ArchivedTime, ShouldResemble, now)
72 So(ls.TerminalIndex, ShouldEqual, 13) 87 So(ls.TerminalIndex, ShouldEqual, 13)
88 So(ls.ArchiveLogEntryCount, ShouldEqual, 14)
73 So(ls.ArchiveStreamURL, ShouldEqual, "gs://fake. stream") 89 So(ls.ArchiveStreamURL, ShouldEqual, "gs://fake. stream")
74 So(ls.ArchiveStreamSize, ShouldEqual, 10) 90 So(ls.ArchiveStreamSize, ShouldEqual, 10)
75 So(ls.ArchiveIndexURL, ShouldEqual, "gs://fake.i ndex") 91 So(ls.ArchiveIndexURL, ShouldEqual, "gs://fake.i ndex")
92 So(ls.ArchiveIndexSize, ShouldEqual, 20)
93 So(ls.ArchiveDataURL, ShouldEqual, "gs://fake.da ta")
94 So(ls.ArchiveDataSize, ShouldEqual, 30)
95 })
96
97 Convey(`Will mark the stream as partially archived if no t complete.`, func() {
98 req.LogEntryCount = 13
99
100 _, err := be.ArchiveStream(c, req)
101 So(err, ShouldBeNil)
102
103 So(ds.Get(c).Get(ls), ShouldBeNil)
104 So(ls.Terminated(), ShouldBeTrue)
105 So(ls.Archived(), ShouldBeTrue)
106 So(ls.ArchiveComplete(), ShouldBeFalse)
107
108 So(ls.State, ShouldEqual, coordinator.LSArchived )
109 So(ls.ArchivalKey, ShouldBeNil)
110 So(ls.TerminatedTime, ShouldResemble, now)
111 So(ls.ArchivedTime, ShouldResemble, now)
112 So(ls.TerminalIndex, ShouldEqual, 13)
113 So(ls.ArchiveLogEntryCount, ShouldEqual, 13)
114 So(ls.ArchiveStreamURL, ShouldEqual, "gs://fake. stream")
115 So(ls.ArchiveStreamSize, ShouldEqual, 10)
116 So(ls.ArchiveIndexURL, ShouldEqual, "gs://fake.i ndex")
76 So(ls.ArchiveIndexSize, ShouldEqual, 20) 117 So(ls.ArchiveIndexSize, ShouldEqual, 20)
77 So(ls.ArchiveDataURL, ShouldEqual, "gs://fake.da ta") 118 So(ls.ArchiveDataURL, ShouldEqual, "gs://fake.da ta")
78 So(ls.ArchiveDataSize, ShouldEqual, 30) 119 So(ls.ArchiveDataSize, ShouldEqual, 30)
79 }) 120 })
80 121
81 Convey(`Will refuse to process an invalid stream path.`, func() { 122 Convey(`Will refuse to process an invalid stream path.`, func() {
82 req.Path = "!!!invalid!!!" 123 req.Path = "!!!invalid!!!"
83 _, err := be.ArchiveStream(c, req) 124 _, err := be.ArchiveStream(c, req)
84 So(err, ShouldBeRPCInvalidArgument, "invalid log stream path") 125 So(err, ShouldBeRPCInvalidArgument, "invalid log stream path")
85 }) 126 })
86 127
87 Convey(`If index URL is missing, will refuse to mark the stream archived.`, func() { 128 Convey(`If index URL is missing, will refuse to mark the stream archived.`, func() {
88 req.IndexUrl = "" 129 req.IndexUrl = ""
89 130
90 _, err := be.ArchiveStream(c, req) 131 _, err := be.ArchiveStream(c, req)
91 So(err, ShouldBeRPCInvalidArgument) 132 So(err, ShouldBeRPCInvalidArgument)
92 }) 133 })
93 134
94 Convey(`If stream URL is missing, will refuse to mark th e stream archived.`, func() { 135 Convey(`If stream URL is missing, will refuse to mark th e stream archived.`, func() {
95 req.StreamUrl = "" 136 req.StreamUrl = ""
96 137
97 _, err := be.ArchiveStream(c, req) 138 _, err := be.ArchiveStream(c, req)
98 So(err, ShouldBeRPCInvalidArgument) 139 So(err, ShouldBeRPCInvalidArgument)
99 }) 140 })
100 141
101 Convey(`If stream is already archived, will not update a nd return success.`, func() { 142 Convey(`If stream is already archived, will not update a nd return success.`, func() {
102 ls.State = coordinator.LSArchived 143 ls.State = coordinator.LSArchived
103 ls.TerminalIndex = 1337 144 ls.TerminalIndex = 1337
145 ls.ArchiveLogEntryCount = 42
146 ls.ArchivedTime = now
147 ls.TerminatedTime = now
148 So(ds.Get(c).Put(ls), ShouldBeNil)
149 So(ls.Terminated(), ShouldBeTrue)
104 So(ls.Archived(), ShouldBeTrue) 150 So(ls.Archived(), ShouldBeTrue)
105 So(ls.Put(ds.Get(c)), ShouldBeNil)
106 151
107 _, err := be.ArchiveStream(c, req) 152 _, err := be.ArchiveStream(c, req)
108 So(err, ShouldBeNil) 153 So(err, ShouldBeNil)
109 154
110 ls.TerminalIndex = -1 // To make sure it reloade d. 155 ls.TerminalIndex = -1 // To make sure it reloade d.
111 So(ds.Get(c).Get(ls), ShouldBeNil) 156 So(ds.Get(c).Get(ls), ShouldBeNil)
157 So(ls.Terminated(), ShouldBeTrue)
112 So(ls.Archived(), ShouldBeTrue) 158 So(ls.Archived(), ShouldBeTrue)
159
160 So(ls.State, ShouldEqual, coordinator.LSArchived )
113 So(ls.TerminalIndex, ShouldEqual, 1337) 161 So(ls.TerminalIndex, ShouldEqual, 1337)
162 So(ls.ArchiveLogEntryCount, ShouldEqual, 42)
163 })
164
165 Convey(`If the archive has failed, it is archived as an empty stream.`, func() {
166 req.Error = "archive error"
167
168 _, err := be.ArchiveStream(c, req)
169 So(err, ShouldBeNil)
170 So(ds.Get(c).Get(ls), ShouldBeNil)
171 So(ls.Archived(), ShouldBeTrue)
172
173 So(ls.State, ShouldEqual, coordinator.LSArchived )
174 So(ls.ArchivalKey, ShouldBeNil)
175 So(ls.TerminalIndex, ShouldEqual, -1)
176 So(ls.ArchiveLogEntryCount, ShouldEqual, 0)
114 }) 177 })
115 178
116 Convey(`When datastore Get fails, returns internal error .`, func() { 179 Convey(`When datastore Get fails, returns internal error .`, func() {
117 c, fb := featureBreaker.FilterRDS(c, nil) 180 c, fb := featureBreaker.FilterRDS(c, nil)
118 fb.BreakFeatures(errors.New("test error"), "GetM ulti") 181 fb.BreakFeatures(errors.New("test error"), "GetM ulti")
119 182
120 _, err := be.ArchiveStream(c, req) 183 _, err := be.ArchiveStream(c, req)
121 So(err, ShouldBeRPCInternal) 184 So(err, ShouldBeRPCInternal)
122 }) 185 })
123 186
124 Convey(`When datastore Put fails, returns internal error .`, func() { 187 Convey(`When datastore Put fails, returns internal error .`, func() {
125 c, fb := featureBreaker.FilterRDS(c, nil) 188 c, fb := featureBreaker.FilterRDS(c, nil)
126 fb.BreakFeatures(errors.New("test error"), "PutM ulti") 189 fb.BreakFeatures(errors.New("test error"), "PutM ulti")
127 190
128 _, err := be.ArchiveStream(c, req) 191 _, err := be.ArchiveStream(c, req)
129 So(err, ShouldBeRPCInternal) 192 So(err, ShouldBeRPCInternal)
130 }) 193 })
131 }) 194 })
132 }) 195 })
133 } 196 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698