| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package logs | 5 package logs |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "sort" | 9 "sort" |
| 10 "testing" | 10 "testing" |
| 11 "time" | 11 "time" |
| 12 | 12 |
| 13 "github.com/luci/gae/filter/featureBreaker" | 13 "github.com/luci/gae/filter/featureBreaker" |
| 14 "github.com/luci/gae/impl/memory" | 14 "github.com/luci/gae/impl/memory" |
| 15 ds "github.com/luci/gae/service/datastore" | 15 ds "github.com/luci/gae/service/datastore" |
| 16 "github.com/luci/luci-go/appengine/logdog/coordinator" | 16 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 17 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" | 17 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" |
| 18 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" | 18 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" |
| 19 "github.com/luci/luci-go/common/clock/testclock" | 19 "github.com/luci/luci-go/common/clock/testclock" |
| 20 "github.com/luci/luci-go/common/errors" | 20 "github.com/luci/luci-go/common/errors" |
| 21 "github.com/luci/luci-go/common/logdog/types" | 21 "github.com/luci/luci-go/common/logdog/types" |
| 22 "github.com/luci/luci-go/common/proto/google" | 22 "github.com/luci/luci-go/common/proto/google" |
| 23 "github.com/luci/luci-go/common/proto/logdog/logpb" | 23 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 24 "github.com/luci/luci-go/common/proto/logdog/svcconfig" | |
| 25 "github.com/luci/luci-go/server/auth" | 24 "github.com/luci/luci-go/server/auth" |
| 26 "github.com/luci/luci-go/server/auth/authtest" | 25 "github.com/luci/luci-go/server/auth/authtest" |
| 27 "golang.org/x/net/context" | 26 "golang.org/x/net/context" |
| 28 | 27 |
| 28 "github.com/luci/luci-go/common/logging/gologger" |
| 29 |
| 29 . "github.com/luci/luci-go/common/testing/assertions" | 30 . "github.com/luci/luci-go/common/testing/assertions" |
| 30 . "github.com/smartystreets/goconvey/convey" | 31 . "github.com/smartystreets/goconvey/convey" |
| 31 ) | 32 ) |
| 32 | 33 |
| 33 func shouldHaveLogPaths(actual interface{}, expected ...interface{}) string { | 34 func shouldHaveLogPaths(actual interface{}, expected ...interface{}) string { |
| 34 resp := actual.(*logdog.QueryResponse) | 35 resp := actual.(*logdog.QueryResponse) |
| 35 var paths []string | 36 var paths []string |
| 36 if len(resp.Streams) > 0 { | 37 if len(resp.Streams) > 0 { |
| 37 paths = make([]string, len(resp.Streams)) | 38 paths = make([]string, len(resp.Streams)) |
| 38 for i, s := range resp.Streams { | 39 for i, s := range resp.Streams { |
| (...skipping 20 matching lines...) Expand all Loading... |
| 59 | 60 |
| 60 return ShouldResemble(paths, exp) | 61 return ShouldResemble(paths, exp) |
| 61 } | 62 } |
| 62 | 63 |
| 63 func TestQuery(t *testing.T) { | 64 func TestQuery(t *testing.T) { |
| 64 t.Parallel() | 65 t.Parallel() |
| 65 | 66 |
| 66 Convey(`With a testing configuration, a Query request`, t, func() { | 67 Convey(`With a testing configuration, a Query request`, t, func() { |
| 67 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeLocal) | 68 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeLocal) |
| 68 c = memory.Use(c) | 69 c = memory.Use(c) |
| 70 c = gologger.Use(c) |
| 69 c, fb := featureBreaker.FilterRDS(c, nil) | 71 c, fb := featureBreaker.FilterRDS(c, nil) |
| 70 | 72 |
| 71 di := ds.Get(c) | 73 di := ds.Get(c) |
| 72 | 74 |
| 73 // Add LogStream indexes. | 75 // Add LogStream indexes. |
| 74 // | 76 // |
| 75 // These should be kept in sync with the "query endpoint" indexe
s in the | 77 // These should be kept in sync with the "query endpoint" indexe
s in the |
| 76 // vmuser module's "index.yaml" file. | 78 // vmuser module's "index.yaml" file. |
| 77 indexDefs := [][]string{ | 79 indexDefs := [][]string{ |
| 78 {"Prefix", "-Created"}, | 80 {"Prefix", "-Created"}, |
| (...skipping 20 matching lines...) Expand all Loading... |
| 99 } | 101 } |
| 100 } | 102 } |
| 101 indexes[i] = &ds.IndexDefinition{Kind: "LogStream", Sort
By: cols} | 103 indexes[i] = &ds.IndexDefinition{Kind: "LogStream", Sort
By: cols} |
| 102 } | 104 } |
| 103 di.Testable().AddIndexes(indexes...) | 105 di.Testable().AddIndexes(indexes...) |
| 104 di.Testable().Consistent(true) | 106 di.Testable().Consistent(true) |
| 105 | 107 |
| 106 fs := authtest.FakeState{} | 108 fs := authtest.FakeState{} |
| 107 c = auth.WithState(c, &fs) | 109 c = auth.WithState(c, &fs) |
| 108 | 110 |
| 109 » » c = ct.UseConfig(c, &svcconfig.Coordinator{ | 111 » » svcStub := ct.Services{} |
| 110 » » » AdminAuthGroup: "test-administrators", | 112 » » svcStub.InitConfig() |
| 111 » » }) | 113 » » svcStub.ServiceConfig.Coordinator.AdminAuthGroup = "test-adminis
trators" |
| 112 | 114 |
| 113 » » s := Server{} | 115 » » s := Server{ |
| 116 » » » ServiceBase: coordinator.ServiceBase{&svcStub}, |
| 117 » » } |
| 114 | 118 |
| 115 req := logdog.QueryRequest{ | 119 req := logdog.QueryRequest{ |
| 116 Tags: map[string]string{}, | 120 Tags: map[string]string{}, |
| 117 } | 121 } |
| 118 | 122 |
| 119 // Install a set of stock log streams to query against. | 123 // Install a set of stock log streams to query against. |
| 120 var streamPaths []string | 124 var streamPaths []string |
| 121 var purgedStreamPaths []string | 125 var purgedStreamPaths []string |
| 122 streams := map[string]*coordinator.LogStream{} | 126 streams := map[string]*coordinator.LogStream{} |
| 123 descs := map[string]*logpb.LogStreamDescriptor{} | 127 descs := map[string]*logpb.LogStreamDescriptor{} |
| (...skipping 22 matching lines...) Expand all Loading... |
| 146 "name": string(name), | 150 "name": string(name), |
| 147 } | 151 } |
| 148 | 152 |
| 149 // Set an empty tag for each name segment. | 153 // Set an empty tag for each name segment. |
| 150 for _, p := range name.Segments() { | 154 for _, p := range name.Segments() { |
| 151 desc.Tags[p] = "" | 155 desc.Tags[p] = "" |
| 152 } | 156 } |
| 153 | 157 |
| 154 ls := ct.TestLogStream(c, desc) | 158 ls := ct.TestLogStream(c, desc) |
| 155 | 159 |
| 160 now := tc.Now().UTC() |
| 156 psegs := prefix.Segments() | 161 psegs := prefix.Segments() |
| 157 if psegs[0] == "meta" { | 162 if psegs[0] == "meta" { |
| 158 for _, p := range psegs[1:] { | 163 for _, p := range psegs[1:] { |
| 159 switch p { | 164 switch p { |
| 160 case "purged": | 165 case "purged": |
| 161 ls.Purged = true | 166 ls.Purged = true |
| 162 | 167 |
| 163 case "terminated": | 168 case "terminated": |
| 164 ls.State = coordinator.LSTermina
ted | |
| 165 ls.TerminalIndex = 1337 | 169 ls.TerminalIndex = 1337 |
| 170 ls.TerminatedTime = now |
| 166 | 171 |
| 167 case "archived": | 172 case "archived": |
| 173 ls.State = coordinator.LSArchive
d |
| 174 |
| 175 ls.TerminalIndex = 1337 |
| 176 ls.TerminatedTime = now |
| 177 |
| 168 ls.ArchiveStreamURL = "http://ex
ample.com" | 178 ls.ArchiveStreamURL = "http://ex
ample.com" |
| 169 » » » » » » ls.State = coordinator.LSArchive
d | 179 » » » » » » ls.ArchivedTime = now |
| 170 | 180 |
| 171 case "datagram": | 181 case "datagram": |
| 172 ls.StreamType = logpb.StreamType
_DATAGRAM | 182 ls.StreamType = logpb.StreamType
_DATAGRAM |
| 173 | 183 |
| 174 case "binary": | 184 case "binary": |
| 175 ls.StreamType = logpb.StreamType
_BINARY | 185 ls.StreamType = logpb.StreamType
_BINARY |
| 176 } | 186 } |
| 177 } | 187 } |
| 178 } | 188 } |
| 179 | 189 |
| 180 » » » if err := ls.Put(ds.Get(c)); err != nil { | 190 » » » if err := ds.Get(c).Put(ls); err != nil { |
| 181 panic(fmt.Errorf("failed to put log stream %d: %
v", i, err)) | 191 panic(fmt.Errorf("failed to put log stream %d: %
v", i, err)) |
| 182 } | 192 } |
| 183 | 193 |
| 184 descs[string(v)] = desc | 194 descs[string(v)] = desc |
| 185 streams[string(v)] = ls | 195 streams[string(v)] = ls |
| 186 if !ls.Purged { | 196 if !ls.Purged { |
| 187 streamPaths = append(streamPaths, string(v)) | 197 streamPaths = append(streamPaths, string(v)) |
| 188 } | 198 } |
| 189 purgedStreamPaths = append(purgedStreamPaths, string(v)) | 199 purgedStreamPaths = append(purgedStreamPaths, string(v)) |
| 190 tc.Add(time.Second) | 200 tc.Add(time.Second) |
| (...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 263 So(err, ShouldBeRPCOK) | 273 So(err, ShouldBeRPCOK) |
| 264 So(resp, shouldHaveLogPaths, "testing/+/
baz") | 274 So(resp, shouldHaveLogPaths, "testing/+/
baz") |
| 265 | 275 |
| 266 So(resp.Streams, ShouldHaveLength, 1) | 276 So(resp.Streams, ShouldHaveLength, 1) |
| 267 So(resp.Streams[0].State, ShouldResemble
, loadLogStreamState(stream)) | 277 So(resp.Streams[0].State, ShouldResemble
, loadLogStreamState(stream)) |
| 268 So(resp.Streams[0].Desc, ShouldResemble,
desc) | 278 So(resp.Streams[0].Desc, ShouldResemble,
desc) |
| 269 So(resp.Streams[0].DescProto, ShouldBeNi
l) | 279 So(resp.Streams[0].DescProto, ShouldBeNi
l) |
| 270 }) | 280 }) |
| 271 | 281 |
| 272 Convey(`When not requesting protobufs, and with
a corrupt descriptor, returns InternalServer error.`, func() { | 282 Convey(`When not requesting protobufs, and with
a corrupt descriptor, returns InternalServer error.`, func() { |
| 273 » » » » » // We can't use "stream.Put" here becaus
e it validates the protobuf! | 283 » » » » » stream.SetDSValidate(false) |
| 274 stream.Descriptor = []byte{0x00} // Inva
lid protobuf, zero tag. | 284 stream.Descriptor = []byte{0x00} // Inva
lid protobuf, zero tag. |
| 275 So(di.Put(stream), ShouldBeNil) | 285 So(di.Put(stream), ShouldBeNil) |
| 276 di.Testable().CatchupIndexes() | 286 di.Testable().CatchupIndexes() |
| 277 | 287 |
| 278 _, err := s.Query(c, &req) | 288 _, err := s.Query(c, &req) |
| 279 So(err, ShouldBeRPCInternal) | 289 So(err, ShouldBeRPCInternal) |
| 280 }) | 290 }) |
| 281 | 291 |
| 282 Convey(`When requesting protobufs, returns the r
aw protobuf descriptor.`, func() { | 292 Convey(`When requesting protobufs, returns the r
aw protobuf descriptor.`, func() { |
| 283 req.Proto = true | 293 req.Proto = true |
| (...skipping 220 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 504 | 514 |
| 505 Convey(`When an invalid tag is specified, returns BadReq
uest error`, func() { | 515 Convey(`When an invalid tag is specified, returns BadReq
uest error`, func() { |
| 506 req.Tags["+++not a valid tag+++"] = "" | 516 req.Tags["+++not a valid tag+++"] = "" |
| 507 | 517 |
| 508 _, err := s.Query(c, &req) | 518 _, err := s.Query(c, &req) |
| 509 So(err, ShouldBeRPCInvalidArgument, "invalid tag
constraint") | 519 So(err, ShouldBeRPCInvalidArgument, "invalid tag
constraint") |
| 510 }) | 520 }) |
| 511 }) | 521 }) |
| 512 }) | 522 }) |
| 513 } | 523 } |
| OLD | NEW |