Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(135)

Side by Side Diff: appengine/logdog/coordinator/logStream_test.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package coordinator 5 package coordinator
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "testing" 10 "testing"
11 "time" 11 "time"
12 12
13 "github.com/luci/gae/impl/memory" 13 "github.com/luci/gae/impl/memory"
14 ds "github.com/luci/gae/service/datastore" 14 ds "github.com/luci/gae/service/datastore"
15 "github.com/luci/luci-go/common/clock"
16 "github.com/luci/luci-go/common/clock/testclock" 15 "github.com/luci/luci-go/common/clock/testclock"
17 "github.com/luci/luci-go/common/logdog/types" 16 "github.com/luci/luci-go/common/logdog/types"
18 "github.com/luci/luci-go/common/proto/google" 17 "github.com/luci/luci-go/common/proto/google"
19 "github.com/luci/luci-go/common/proto/logdog/logpb" 18 "github.com/luci/luci-go/common/proto/logdog/logpb"
20 "golang.org/x/net/context" 19 "golang.org/x/net/context"
21 20
22 . "github.com/luci/luci-go/common/testing/assertions" 21 . "github.com/luci/luci-go/common/testing/assertions"
23 . "github.com/smartystreets/goconvey/convey" 22 . "github.com/smartystreets/goconvey/convey"
24 ) 23 )
25 24
(...skipping 28 matching lines...) Expand all
54 ps := make(ds.PropertySlice, len(values)) 53 ps := make(ds.PropertySlice, len(values))
55 for i, v := range values { 54 for i, v := range values {
56 ps[i] = ds.MkProperty(v) 55 ps[i] = ds.MkProperty(v)
57 } 56 }
58 return ps 57 return ps
59 } 58 }
60 59
61 func TestLogStream(t *testing.T) { 60 func TestLogStream(t *testing.T) {
62 t.Parallel() 61 t.Parallel()
63 62
64 » Convey(`A LogStream with invalid tags will fail to encode.`, t, func() { 63 » Convey(`When creating a LogStream`, t, func() {
65 » » ls := &LogStream{ 64 » » Convey(`Can create keyed on hash.`, func() {
66 » » » Prefix: "foo", 65 » » » ls, err := NewLogStream("0123456789abcdef0123456789ABCDE F0123456789abcdef0123456789ABCDEF")
67 » » » Name: "bar", 66 » » » So(err, ShouldBeNil)
68 » » » Tags: TagMap{ 67 » » » So(ls, ShouldNotBeNil)
69 » » » » "!!!invalid key!!!": "value", 68 » » })
70 » » » }, 69
71 » » } 70 » » Convey(`Will fail to create keyed on an invalid hash-length stri ng.`, func() {
72 » » _, err := ls.Save(true) 71 » » » _, err := NewLogStream("0123456789abcdef!@#$%^&*()ABCDEF 0123456789abcdef0123456789ABCDEF")
73 » » So(err, ShouldErrLike, "failed to encode tags") 72 » » » So(err, ShouldErrLike, "invalid path")
73 » » })
74
75 » » Convey(`Can create keyed on path.`, func() {
76 » » » ls, err := NewLogStream("a/b/+/c/d")
77 » » » So(err, ShouldBeNil)
78 » » » So(ls, ShouldNotBeNil)
79 » » })
80
81 » » Convey(`Will fail to create keyed on neither a path nor hash.`, func() {
82 » » » _, err := NewLogStream("")
83 » » » So(err, ShouldNotBeNil)
84 » » })
74 }) 85 })
75 86
76 » Convey(`A LogStream will skip invalid tags when loading.`, t, func() { 87 » Convey(`A testing log stream`, t, func() {
77 » » ls := &LogStream{
78 » » » Prefix: "foo",
79 » » » Name: "bar",
80 » » }
81 » » pmap := ds.PropertyMap{
82 » » » "_Tags": sps(encodeKey("!!!invalid key!!!")),
83 » » }
84 » » So(ls.Load(pmap), ShouldBeNil)
85 » » So(ls.Tags, ShouldResemble, TagMap(nil))
86 » })
87
88 » Convey(`With a testing configuration`, t, func() {
89 c, tc := testclock.UseTime(context.Background(), testclock.TestT imeLocal) 88 c, tc := testclock.UseTime(context.Background(), testclock.TestT imeLocal)
90 c = memory.Use(c) 89 c = memory.Use(c)
91 di := ds.Get(c) 90 di := ds.Get(c)
92 di.Testable().AutoIndex(true) 91 di.Testable().AutoIndex(true)
93 di.Testable().Consistent(true) 92 di.Testable().Consistent(true)
94 93
94 now := ds.RoundTime(tc.Now().UTC())
95
96 ls := LogStream{
97 Prefix: "testing",
98 Name: "log/stream",
99 State: LSStreaming,
100 TerminalIndex: -1,
101 Secret: bytes.Repeat([]byte{0x6F}, types.StreamSe cretLength),
102 Created: now.UTC(),
103 ContentType: string(types.ContentTypeText),
104 }
105 ls.recalculateHashID()
106
95 desc := logpb.LogStreamDescriptor{ 107 desc := logpb.LogStreamDescriptor{
96 Prefix: "testing", 108 Prefix: "testing",
97 Name: "log/stream", 109 Name: "log/stream",
98 StreamType: logpb.StreamType_TEXT, 110 StreamType: logpb.StreamType_TEXT,
99 ContentType: "application/text", 111 ContentType: "application/text",
100 » » » Timestamp: google.NewTimestamp(clock.Now(c)), 112 » » » Timestamp: google.NewTimestamp(now),
101 Tags: map[string]string{ 113 Tags: map[string]string{
102 "foo": "bar", 114 "foo": "bar",
103 "baz": "qux", 115 "baz": "qux",
104 "quux": "", 116 "quux": "",
105 }, 117 },
106 } 118 }
107 119
108 » » Convey(`Can create a LogStream keyed on hash.`, func() { 120 » » Convey(`Will skip invalid tags when loading.`, func() {
109 » » » ls, err := NewLogStream("0123456789abcdef0123456789ABCDE F0123456789abcdef0123456789ABCDEF") 121 » » » pmap, err := ls.Save(false)
110 So(err, ShouldBeNil) 122 So(err, ShouldBeNil)
111 » » » So(ls, ShouldNotBeNil) 123 » » » pmap["_Tags"] = sps(encodeKey("!!!invalid key!!!"))
124
125 » » » So(ls.Load(pmap), ShouldBeNil)
126 » » » So(ls.Tags, ShouldResemble, TagMap(nil))
112 }) 127 })
113 128
114 » » Convey(`Will fail to create a LogStream keyed on an invalid hash -length string.`, func() { 129 » » Convey(`With invalid tags will fail to encode.`, func() {
115 » » » _, err := NewLogStream("0123456789abcdef!@#$%^&*()ABCDEF 0123456789abcdef0123456789ABCDEF") 130 » » » ls.Tags = TagMap{
116 » » » So(err, ShouldErrLike, "invalid path") 131 » » » » "!!!invalid key!!!": "value",
132 » » » }
133
134 » » » ls.SetDSValidate(false)
135 » » » _, err := ls.Save(true)
136 » » » So(err, ShouldErrLike, "failed to encode tags")
117 }) 137 })
118 138
119 » » Convey(`Can create a LogStream keyed on path.`, func() { 139 » » Convey(`Can populate the LogStream with descriptor state.`, func () {
120 » » » ls, err := NewLogStream("a/b/+/c/d") 140 » » » So(ls.LoadDescriptor(&desc), ShouldBeNil)
121 » » » So(err, ShouldBeNil) 141 » » » So(ls.Validate(), ShouldBeNil)
122 » » » So(ls, ShouldNotBeNil)
123 » » })
124 142
125 » » Convey(`Will fail to create a LogStream keyed on neither a path nor hash.`, func() { 143 » » » Convey(`Will not validate`, func() {
126 » » » _, err := NewLogStream("") 144 » » » » Convey(`Without a valid Prefix`, func() {
127 » » » So(err, ShouldNotBeNil) 145 » » » » » ls.Prefix = ""
128 » » }) 146 » » » » » ls.recalculateHashID()
129 147
130 » » Convey(`Can create a new LogStream`, func() { 148 » » » » » So(ls.Validate(), ShouldErrLike, "invali d prefix")
131 » » » ls, err := NewLogStream(string(desc.Path())) 149 » » » » })
132 » » » So(err, ShouldBeNil) 150 » » » » Convey(`Without a valid Name`, func() {
151 » » » » » ls.Name = ""
152 » » » » » ls.recalculateHashID()
133 153
134 » » » Convey(`Can populate the LogStream with descriptor state .`, func() { 154 » » » » » So(ls.Validate(), ShouldErrLike, "invali d name")
135 » » » » ls.Created = ds.RoundTime(clock.Now(c).UTC()) 155 » » » » })
136 » » » » ls.Updated = ds.RoundTime(clock.Now(c).UTC()) 156 » » » » Convey(`Without a valid stream secret`, func() {
137 » » » » ls.Secret = bytes.Repeat([]byte{0x6F}, types.Str eamSecretLength) 157 » » » » » ls.Secret = nil
138 » » » » So(ls.LoadDescriptor(&desc), ShouldBeNil) 158 » » » » » So(ls.Validate(), ShouldErrLike, "invali d secret length")
139 » » » » So(ls.Validate(), ShouldBeNil) 159 » » » » })
160 » » » » Convey(`Without a valid content type`, func() {
161 » » » » » ls.ContentType = ""
162 » » » » » So(ls.Validate(), ShouldErrLike, "empty content type")
163 » » » » })
164 » » » » Convey(`Without a valid created time`, func() {
165 » » » » » ls.Created = time.Time{}
166 » » » » » So(ls.Validate(), ShouldErrLike, "create d time is not set")
167 » » » » })
168 » » » » Convey(`With a terminal index, will not validate without a TerminatedTime.`, func() {
169 » » » » » ls.State = LSArchiveTasked
170 » » » » » ls.TerminalIndex = 1337
171 » » » » » So(ls.Validate(), ShouldErrLike, "missin g terminated time")
140 172
141 » » » » Convey(`Will not validate`, func() { 173 » » » » » ls.TerminatedTime = now
142 » » » » » Convey(`Without a valid Prefix`, func() { 174 » » » » » So(ls.Validate(), ShouldBeNil)
143 » » » » » » ls.Prefix = ""
144 » » » » » » So(ls.Validate(), ShouldErrLike, "invalid prefix")
145 » » » » » })
146 » » » » » Convey(`Without a valid prefix`, func() {
147 » » » » » » ls.Name = ""
148 » » » » » » So(ls.Validate(), ShouldErrLike, "invalid name")
149 » » » » » })
150 » » » » » Convey(`Without a valid stream secret`, func() {
151 » » » » » » ls.Secret = nil
152 » » » » » » So(ls.Validate(), ShouldErrLike, "invalid secret length")
153 » » » » » })
154 » » » » » Convey(`Without a valid content type`, f unc() {
155 » » » » » » ls.ContentType = ""
156 » » » » » » So(ls.Validate(), ShouldErrLike, "empty content type")
157 » » » » » })
158 » » » » » Convey(`Without a valid created time`, f unc() {
159 » » » » » » ls.Created = time.Time{}
160 » » » » » » So(ls.Validate(), ShouldErrLike, "created time is not set")
161 » » » » » })
162 » » » » » Convey(`Without a valid updated time`, f unc() {
163 » » » » » » ls.Updated = time.Time{}
164 » » » » » » So(ls.Validate(), ShouldErrLike, "updated time is not set")
165 » » » » » })
166 » » » » » Convey(`With an updated time before the created time`, func() {
167 » » » » » » ls.Updated = ls.Created.Add(-tim e.Second)
168 » » » » » » So(ls.Validate(), ShouldErrLike, "updated time must be >= created time")
169 » » » » » })
170 » » » » » Convey(`Without a valid stream type`, fu nc() {
171 » » » » » » ls.StreamType = -1
172 » » » » » » So(ls.Validate(), ShouldErrLike, "unsupported stream type")
173 » » » » » })
174 » » » » » Convey(`Without an invalid tag: empty ke y`, func() {
175 » » » » » » ls.Tags[""] = "empty"
176 » » » » » » So(ls.Validate(), ShouldErrLike, "invalid tag")
177 » » » » » })
178 » » » » » Convey(`Without an invalid tag: bad key` , func() {
179 » » » » » » ls.Tags["!"] = "bad-value"
180 » » » » » » So(ls.Validate(), ShouldErrLike, "invalid tag")
181 » » » » » })
182 » » » » » Convey(`With an invalid descriptor proto buf`, func() {
183 » » » » » » ls.Descriptor = []byte{0x00} // Invalid tag, "0".
184 » » » » » » So(ls.Validate(), ShouldErrLike, "could not unmarshal descriptor")
185 » » » » » })
186 }) 175 })
176 Convey(`When archived, will not validate without an ArchivedTime.`, func() {
177 ls.State = LSArchived
178 So(ls.Validate(), ShouldErrLike, "missin g terminated time")
187 179
188 » » » » Convey(`Can write the LogStream to the Datastore .`, func() { 180 » » » » » ls.TerminatedTime = now
189 » » » » » So(di.Put(ls), ShouldBeNil) 181 » » » » » So(ls.Validate(), ShouldErrLike, "missin g archived time")
190 182
191 » » » » » Convey(`Can read the LogStream back from the Datastore.`, func() { 183 » » » » » ls.ArchivedTime = now
192 » » » » » » ls2 := LogStreamFromID(ls.HashID ()) 184 » » » » » So(ls.Validate(), ShouldBeNil)
193 » » » » » » So(di.Get(ls2), ShouldBeNil) 185 » » » » })
194 » » » » » » So(ls2, ShouldResemble, ls) 186 » » » » Convey(`Without a valid stream type`, func() {
195 » » » » » }) 187 » » » » » ls.StreamType = -1
188 » » » » » So(ls.Validate(), ShouldErrLike, "unsupp orted stream type")
189 » » » » })
190 » » » » Convey(`Without an invalid tag: empty key`, func () {
191 » » » » » ls.Tags[""] = "empty"
192 » » » » » So(ls.Validate(), ShouldErrLike, "invali d tag")
193 » » » » })
194 » » » » Convey(`Without an invalid tag: bad key`, func() {
195 » » » » » ls.Tags["!"] = "bad-value"
196 » » » » » So(ls.Validate(), ShouldErrLike, "invali d tag")
197 » » » » })
198 » » » » Convey(`With an invalid descriptor protobuf`, fu nc() {
199 » » » » » ls.Descriptor = []byte{0x00} // Invalid tag, "0".
200 » » » » » So(ls.Validate(), ShouldErrLike, "could not unmarshal descriptor")
196 }) 201 })
197 }) 202 })
198 203
199 » » » Convey(`Will refuse to populate from an invalid descript or.`, func() { 204 » » » Convey(`Can write the LogStream to the Datastore.`, func () {
200 » » » » desc.Name = "" 205 » » » » So(di.Put(&ls), ShouldBeNil)
201 » » » » So(ls.LoadDescriptor(&desc), ShouldErrLike, "inv alid descriptor") 206
207 » » » » Convey(`Can read the LogStream back from the Dat astore via prefix/name.`, func() {
208 » » » » » ls2 := LogStreamFromPath(ls.Path())
209 » » » » » So(di.Get(ls2), ShouldBeNil)
210 » » » » » So(ls2, ShouldResemble, &ls)
211 » » » » })
212
213 » » » » Convey(`Can read the LogStream back from the Dat astore via hash.`, func() {
214 » » » » » ls2 := LogStreamFromID(ls.HashID)
215 » » » » » So(di.Get(ls2), ShouldBeNil)
216 » » » » » So(ls2, ShouldResemble, &ls)
217 » » » » })
218
219 » » » » Convey(`Can read the LogStream back from the Dat astore.`, func() {
220 » » » » » var ls2 LogStream
221 » » » » » ds.PopulateKey(&ls2, ds.Get(c).KeyForObj (&ls))
222 » » » » » So(di.Get(&ls2), ShouldBeNil)
223 » » » » » So(ls2, ShouldResemble, ls)
224 » » » » })
202 }) 225 })
203 }) 226 })
204 227
228 Convey(`Will refuse to populate from an invalid descriptor.`, fu nc() {
229 desc.StreamType = -1
230 So(ls.LoadDescriptor(&desc), ShouldErrLike, "invalid des criptor")
231 })
232
205 Convey(`Writing multiple LogStream entries`, func() { 233 Convey(`Writing multiple LogStream entries`, func() {
206 times := map[string]time.Time{} 234 times := map[string]time.Time{}
207 streamNames := []string{ 235 streamNames := []string{
208 "foo/bar", 236 "foo/bar",
209 "foo/bar/baz", 237 "foo/bar/baz",
210 "baz/qux", 238 "baz/qux",
211 "cat/dog", 239 "cat/dog",
212 "cat/bird/dog", 240 "cat/bird/dog",
213 "bird/plane", 241 "bird/plane",
214 } 242 }
215 » » » for _, name := range streamNames { 243 » » » for i, name := range streamNames {
216 » » » » desc := desc 244 » » » » lsCopy := ls
217 » » » » desc.Name = name 245 » » » » lsCopy.Name = name
246 » » » » lsCopy.Created = ds.RoundTime(now.Add(time.Durat ion(i) * time.Second))
247 » » » » lsCopy.recalculateHashID()
218 248
219 » » » » ls, err := NewLogStream(string(desc.Path())) 249 » » » » descCopy := desc
220 » » » » So(err, ShouldBeNil) 250 » » » » descCopy.Name = name
221 251
222 » » » » ls.Secret = bytes.Repeat([]byte{0x55}, types.Str eamSecretLength) 252 » » » » if err := lsCopy.LoadDescriptor(&descCopy); err != nil {
223 » » » » So(ls.LoadDescriptor(&desc), ShouldBeNil) 253 » » » » » panic(err)
224 » » » » ls.Created = clock.Now(c).UTC() 254 » » » » }
225 » » » » ls.Updated = ls.Created 255 » » » » So(di.Put(&lsCopy), ShouldBeNil)
226 » » » » So(di.Put(ls), ShouldBeNil)
227 256
228 » » » » times[name] = clock.Now(c) 257 » » » » times[name] = lsCopy.Created
229 » » » » tc.Add(time.Second)
230 } 258 }
231 259
232 Convey(`When querying LogStream by -Created`, func() { 260 Convey(`When querying LogStream by -Created`, func() {
233 q := ds.NewQuery("LogStream").Order("-Created") 261 q := ds.NewQuery("LogStream").Order("-Created")
234 262
235 Convey(`LogStream path queries`, func() { 263 Convey(`LogStream path queries`, func() {
236 Convey(`A query for "foo/bar" should ret urn "foo/bar".`, func() { 264 Convey(`A query for "foo/bar" should ret urn "foo/bar".`, func() {
237 q, err := AddLogStreamPathFilter (q, "**/+/foo/bar") 265 q, err := AddLogStreamPathFilter (q, "**/+/foo/bar")
238 So(err, ShouldBeNil) 266 So(err, ShouldBeNil)
239 267
(...skipping 173 matching lines...) Expand 10 before | Expand all | Expand 10 after
413 "_C": sps("PF:1:foo", "PR:1:baz", "PR:3:bar"), 441 "_C": sps("PF:1:foo", "PR:1:baz", "PR:3:bar"),
414 }) 442 })
415 }) 443 })
416 444
417 Convey(`Will error if more than one greedy glob is present.`, fu nc() { 445 Convey(`Will error if more than one greedy glob is present.`, fu nc() {
418 _, err := AddLogStreamPathFilter(q, "*/foo/**/bar/**") 446 _, err := AddLogStreamPathFilter(q, "*/foo/**/bar/**")
419 So(err, ShouldErrLike, "cannot have more than one greedy glob") 447 So(err, ShouldErrLike, "cannot have more than one greedy glob")
420 }) 448 })
421 }) 449 })
422 } 450 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698