Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(109)

Side by Side Diff: appengine/logdog/coordinator/endpoints/logs/query_test.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package logs 5 package logs
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "sort" 9 "sort"
10 "testing" 10 "testing"
11 "time" 11 "time"
12 12
13 "github.com/luci/gae/filter/featureBreaker" 13 "github.com/luci/gae/filter/featureBreaker"
14 "github.com/luci/gae/impl/memory" 14 "github.com/luci/gae/impl/memory"
15 ds "github.com/luci/gae/service/datastore" 15 ds "github.com/luci/gae/service/datastore"
16 "github.com/luci/luci-go/appengine/logdog/coordinator" 16 "github.com/luci/luci-go/appengine/logdog/coordinator"
17 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest " 17 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest "
18 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" 18 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1"
19 "github.com/luci/luci-go/common/clock/testclock" 19 "github.com/luci/luci-go/common/clock/testclock"
20 "github.com/luci/luci-go/common/errors" 20 "github.com/luci/luci-go/common/errors"
21 "github.com/luci/luci-go/common/logdog/types" 21 "github.com/luci/luci-go/common/logdog/types"
22 "github.com/luci/luci-go/common/proto/google" 22 "github.com/luci/luci-go/common/proto/google"
23 "github.com/luci/luci-go/common/proto/logdog/logpb" 23 "github.com/luci/luci-go/common/proto/logdog/logpb"
24 "github.com/luci/luci-go/common/proto/logdog/svcconfig"
25 "github.com/luci/luci-go/server/auth" 24 "github.com/luci/luci-go/server/auth"
26 "github.com/luci/luci-go/server/auth/authtest" 25 "github.com/luci/luci-go/server/auth/authtest"
27 "golang.org/x/net/context" 26 "golang.org/x/net/context"
28 27
28 "github.com/luci/luci-go/common/logging/gologger"
29
29 . "github.com/luci/luci-go/common/testing/assertions" 30 . "github.com/luci/luci-go/common/testing/assertions"
30 . "github.com/smartystreets/goconvey/convey" 31 . "github.com/smartystreets/goconvey/convey"
31 ) 32 )
32 33
33 func shouldHaveLogPaths(actual interface{}, expected ...interface{}) string { 34 func shouldHaveLogPaths(actual interface{}, expected ...interface{}) string {
34 resp := actual.(*logdog.QueryResponse) 35 resp := actual.(*logdog.QueryResponse)
35 var paths []string 36 var paths []string
36 if len(resp.Streams) > 0 { 37 if len(resp.Streams) > 0 {
37 paths = make([]string, len(resp.Streams)) 38 paths = make([]string, len(resp.Streams))
38 for i, s := range resp.Streams { 39 for i, s := range resp.Streams {
(...skipping 20 matching lines...) Expand all
59 60
60 return ShouldResemble(paths, exp) 61 return ShouldResemble(paths, exp)
61 } 62 }
62 63
63 func TestQuery(t *testing.T) { 64 func TestQuery(t *testing.T) {
64 t.Parallel() 65 t.Parallel()
65 66
66 Convey(`With a testing configuration, a Query request`, t, func() { 67 Convey(`With a testing configuration, a Query request`, t, func() {
67 c, tc := testclock.UseTime(context.Background(), testclock.TestT imeLocal) 68 c, tc := testclock.UseTime(context.Background(), testclock.TestT imeLocal)
68 c = memory.Use(c) 69 c = memory.Use(c)
70 c = gologger.Use(c)
69 c, fb := featureBreaker.FilterRDS(c, nil) 71 c, fb := featureBreaker.FilterRDS(c, nil)
70 72
71 di := ds.Get(c) 73 di := ds.Get(c)
72 74
73 // Add LogStream indexes. 75 // Add LogStream indexes.
74 // 76 //
75 // These should be kept in sync with the "query endpoint" indexe s in the 77 // These should be kept in sync with the "query endpoint" indexe s in the
76 // vmuser module's "index.yaml" file. 78 // vmuser module's "index.yaml" file.
77 indexDefs := [][]string{ 79 indexDefs := [][]string{
78 {"Prefix", "-Created"}, 80 {"Prefix", "-Created"},
(...skipping 20 matching lines...) Expand all
99 } 101 }
100 } 102 }
101 indexes[i] = &ds.IndexDefinition{Kind: "LogStream", Sort By: cols} 103 indexes[i] = &ds.IndexDefinition{Kind: "LogStream", Sort By: cols}
102 } 104 }
103 di.Testable().AddIndexes(indexes...) 105 di.Testable().AddIndexes(indexes...)
104 di.Testable().Consistent(true) 106 di.Testable().Consistent(true)
105 107
106 fs := authtest.FakeState{} 108 fs := authtest.FakeState{}
107 c = auth.WithState(c, &fs) 109 c = auth.WithState(c, &fs)
108 110
109 » » c = ct.UseConfig(c, &svcconfig.Coordinator{ 111 » » svcStub := ct.Services{}
110 » » » AdminAuthGroup: "test-administrators", 112 » » svcStub.InitConfig()
111 » » }) 113 » » svcStub.ServiceConfig.Coordinator.AdminAuthGroup = "test-adminis trators"
112 114
113 » » s := Server{} 115 » » s := Server{
116 » » » ServiceBase: coordinator.ServiceBase{&svcStub},
117 » » }
114 118
115 req := logdog.QueryRequest{ 119 req := logdog.QueryRequest{
116 Tags: map[string]string{}, 120 Tags: map[string]string{},
117 } 121 }
118 122
119 // Install a set of stock log streams to query against. 123 // Install a set of stock log streams to query against.
120 var streamPaths []string 124 var streamPaths []string
121 var purgedStreamPaths []string 125 var purgedStreamPaths []string
122 streams := map[string]*coordinator.LogStream{} 126 streams := map[string]*coordinator.LogStream{}
123 descs := map[string]*logpb.LogStreamDescriptor{} 127 descs := map[string]*logpb.LogStreamDescriptor{}
(...skipping 22 matching lines...) Expand all
146 "name": string(name), 150 "name": string(name),
147 } 151 }
148 152
149 // Set an empty tag for each name segment. 153 // Set an empty tag for each name segment.
150 for _, p := range name.Segments() { 154 for _, p := range name.Segments() {
151 desc.Tags[p] = "" 155 desc.Tags[p] = ""
152 } 156 }
153 157
154 ls := ct.TestLogStream(c, desc) 158 ls := ct.TestLogStream(c, desc)
155 159
160 now := tc.Now().UTC()
156 psegs := prefix.Segments() 161 psegs := prefix.Segments()
157 if psegs[0] == "meta" { 162 if psegs[0] == "meta" {
158 for _, p := range psegs[1:] { 163 for _, p := range psegs[1:] {
159 switch p { 164 switch p {
160 case "purged": 165 case "purged":
161 ls.Purged = true 166 ls.Purged = true
162 167
163 case "terminated": 168 case "terminated":
164 ls.State = coordinator.LSTermina ted
165 ls.TerminalIndex = 1337 169 ls.TerminalIndex = 1337
170 ls.TerminatedTime = now
166 171
167 case "archived": 172 case "archived":
173 ls.State = coordinator.LSArchive d
174
175 ls.TerminalIndex = 1337
176 ls.TerminatedTime = now
177
168 ls.ArchiveStreamURL = "http://ex ample.com" 178 ls.ArchiveStreamURL = "http://ex ample.com"
169 » » » » » » ls.State = coordinator.LSArchive d 179 » » » » » » ls.ArchivedTime = now
170 180
171 case "datagram": 181 case "datagram":
172 ls.StreamType = logpb.StreamType _DATAGRAM 182 ls.StreamType = logpb.StreamType _DATAGRAM
173 183
174 case "binary": 184 case "binary":
175 ls.StreamType = logpb.StreamType _BINARY 185 ls.StreamType = logpb.StreamType _BINARY
176 } 186 }
177 } 187 }
178 } 188 }
179 189
180 » » » if err := ls.Put(ds.Get(c)); err != nil { 190 » » » if err := ds.Get(c).Put(ls); err != nil {
181 panic(fmt.Errorf("failed to put log stream %d: % v", i, err)) 191 panic(fmt.Errorf("failed to put log stream %d: % v", i, err))
182 } 192 }
183 193
184 descs[string(v)] = desc 194 descs[string(v)] = desc
185 streams[string(v)] = ls 195 streams[string(v)] = ls
186 if !ls.Purged { 196 if !ls.Purged {
187 streamPaths = append(streamPaths, string(v)) 197 streamPaths = append(streamPaths, string(v))
188 } 198 }
189 purgedStreamPaths = append(purgedStreamPaths, string(v)) 199 purgedStreamPaths = append(purgedStreamPaths, string(v))
190 tc.Add(time.Second) 200 tc.Add(time.Second)
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
263 So(err, ShouldBeRPCOK) 273 So(err, ShouldBeRPCOK)
264 So(resp, shouldHaveLogPaths, "testing/+/ baz") 274 So(resp, shouldHaveLogPaths, "testing/+/ baz")
265 275
266 So(resp.Streams, ShouldHaveLength, 1) 276 So(resp.Streams, ShouldHaveLength, 1)
267 So(resp.Streams[0].State, ShouldResemble , loadLogStreamState(stream)) 277 So(resp.Streams[0].State, ShouldResemble , loadLogStreamState(stream))
268 So(resp.Streams[0].Desc, ShouldResemble, desc) 278 So(resp.Streams[0].Desc, ShouldResemble, desc)
269 So(resp.Streams[0].DescProto, ShouldBeNi l) 279 So(resp.Streams[0].DescProto, ShouldBeNi l)
270 }) 280 })
271 281
272 Convey(`When not requesting protobufs, and with a corrupt descriptor, returns InternalServer error.`, func() { 282 Convey(`When not requesting protobufs, and with a corrupt descriptor, returns InternalServer error.`, func() {
273 » » » » » // We can't use "stream.Put" here becaus e it validates the protobuf! 283 » » » » » stream.SetDSValidate(false)
274 stream.Descriptor = []byte{0x00} // Inva lid protobuf, zero tag. 284 stream.Descriptor = []byte{0x00} // Inva lid protobuf, zero tag.
275 So(di.Put(stream), ShouldBeNil) 285 So(di.Put(stream), ShouldBeNil)
276 di.Testable().CatchupIndexes() 286 di.Testable().CatchupIndexes()
277 287
278 _, err := s.Query(c, &req) 288 _, err := s.Query(c, &req)
279 So(err, ShouldBeRPCInternal) 289 So(err, ShouldBeRPCInternal)
280 }) 290 })
281 291
282 Convey(`When requesting protobufs, returns the r aw protobuf descriptor.`, func() { 292 Convey(`When requesting protobufs, returns the r aw protobuf descriptor.`, func() {
283 req.Proto = true 293 req.Proto = true
(...skipping 220 matching lines...) Expand 10 before | Expand all | Expand 10 after
504 514
505 Convey(`When an invalid tag is specified, returns BadReq uest error`, func() { 515 Convey(`When an invalid tag is specified, returns BadReq uest error`, func() {
506 req.Tags["+++not a valid tag+++"] = "" 516 req.Tags["+++not a valid tag+++"] = ""
507 517
508 _, err := s.Query(c, &req) 518 _, err := s.Query(c, &req)
509 So(err, ShouldBeRPCInvalidArgument, "invalid tag constraint") 519 So(err, ShouldBeRPCInvalidArgument, "invalid tag constraint")
510 }) 520 })
511 }) 521 })
512 }) 522 })
513 } 523 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698