Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(150)

Side by Side Diff: appengine/logdog/coordinator/logStream.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Fix proto comment. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package coordinator 5 package coordinator
6 6
7 import ( 7 import (
8 "crypto/sha256" 8 "crypto/sha256"
9 "encoding/hex" 9 "encoding/hex"
10 "errors" 10 "errors"
11 "fmt" 11 "fmt"
12 "strings" 12 "strings"
13 "time" 13 "time"
14 14
15 "github.com/golang/protobuf/proto" 15 "github.com/golang/protobuf/proto"
16 ds "github.com/luci/gae/service/datastore" 16 ds "github.com/luci/gae/service/datastore"
17 "github.com/luci/luci-go/common/logdog/types" 17 "github.com/luci/luci-go/common/logdog/types"
18 "github.com/luci/luci-go/common/proto/logdog/logpb" 18 "github.com/luci/luci-go/common/proto/logdog/logpb"
19 ) 19 )
20 20
21 // currentLogStreamSchema is the current schema version of the LogStream.
22 // Changes that are not backward-compatible should update this field so
23 // migration logic and scripts can translate appropriately.
24 const currentLogStreamSchema = "1"
25
21 // LogStreamState is the archival state of the log stream. 26 // LogStreamState is the archival state of the log stream.
22 type LogStreamState int 27 type LogStreamState int
23 28
24 const ( 29 const (
25 » // LSPending indicates that no archival has occurred yet. 30 » // LSStreaming indicates that the log stream is still streaming. This im plies
26 » LSPending LogStreamState = iota 31 » // that no terminal index has been identified yet.
27 » // LSTerminated indicates that the log stream has received a terminal in dex 32 » LSStreaming LogStreamState = iota
28 » // and is awaiting archival. 33 » // LSArchiveTasked indicates that the log stream has had an archival tas k
29 » LSTerminated 34 » // generated for it and is awaiting archival.
30 » // LSArchived indicates that the log stream has been successfully archiv ed but 35 » LSArchiveTasked
31 » // has not yet been cleaned up. 36 » // LSArchived indicates that the log stream has been successfully archiv ed.
32 LSArchived 37 LSArchived
33 ) 38 )
34 39
40 func (s LogStreamState) String() string {
41 switch s {
42 case LSStreaming:
43 return "STREAMING"
44 case LSArchiveTasked:
45 return "ARCHIVE_TASKED"
46 case LSArchived:
47 return "ARCHIVED"
48 default:
49 return fmt.Sprintf("UNKNOWN(%d)", s)
50 }
51 }
52
53 // Archived returns true if this LogStreamState represents a finished archival.
54 func (s LogStreamState) Archived() bool {
55 return s >= LSArchived
56 }
57
35 // LogStream is the primary datastore model containing information and state of 58 // LogStream is the primary datastore model containing information and state of
36 // an individual log stream. 59 // an individual log stream.
37 // 60 //
38 // This structure contains the standard queryable fields, and is the source of 61 // This structure contains the standard queryable fields, and is the source of
39 // truth for log stream state. Writes to LogStream should be done via Put, which 62 // truth for log stream state. Writes to LogStream should be done via Put, which
40 // will ensure that the LogStream's related query objects are kept in sync. 63 // will ensure that the LogStream's related query objects are kept in sync.
41 // 64 //
42 // This structure has additional datastore fields imposed by the 65 // This structure has additional datastore fields imposed by the
43 // PropertyLoadSaver. These fields enable querying against some of the complex 66 // PropertyLoadSaver. These fields enable querying against some of the complex
44 // data types: 67 // data types:
(...skipping 11 matching lines...) Expand all
56 // 79 //
57 // - _Tags is a string slice containing: 80 // - _Tags is a string slice containing:
58 // - KEY=[VALUE] key/value tags. 81 // - KEY=[VALUE] key/value tags.
59 // - KEY key presence tags. 82 // - KEY key presence tags.
60 // 83 //
61 // - _Terminated is true if the LogStream has been terminated. 84 // - _Terminated is true if the LogStream has been terminated.
62 // - _Archived is true if the LogStream has been archived. 85 // - _Archived is true if the LogStream has been archived.
63 // 86 //
64 // Most of the values in QueryBase are static. Those that change can only be 87 // Most of the values in QueryBase are static. Those that change can only be
65 // changed through service endpoint methods. 88 // changed through service endpoint methods.
66 //
 67 // LogStream's QueryBase is authoritative.
68 type LogStream struct { 89 type LogStream struct {
90 // HashID is the LogStream ID. It is generated from the stream's Prefix/ Name
91 // fields.
92 HashID string `gae:"$id"`
93
94 // Schema is the datastore schema version for this object. This can be u sed
95 // to facilitate schema migrations.
96 //
97 // The current schema is currentLogStreamSchema.
98 Schema string
99
69 // Prefix is this log stream's prefix value. Log streams with the same p refix 100 // Prefix is this log stream's prefix value. Log streams with the same p refix
70 // are logically grouped. 101 // are logically grouped.
102 //
103 // This value should not be changed once populated, as it will invalidat e the
104 // HashID.
71 Prefix string 105 Prefix string
72 // Name is the unique name of this log stream within the Prefix scope. 106 // Name is the unique name of this log stream within the Prefix scope.
107 //
108 // This value should not be changed once populated, as it will invalidat e the
109 // HashID.
73 Name string 110 Name string
74 111
75 // State is the log stream's current state. 112 // State is the log stream's current state.
76 State LogStreamState 113 State LogStreamState
77 114
78 // Purged, if true, indicates that this log stream has been marked as pu rged. 115 // Purged, if true, indicates that this log stream has been marked as pu rged.
79 // Non-administrative queries and requests for this stream will operate as 116 // Non-administrative queries and requests for this stream will operate as
80 // if this entry doesn't exist. 117 // if this entry doesn't exist.
81 Purged bool 118 Purged bool
82 119
83 // Secret is the Butler secret value for this stream. 120 // Secret is the Butler secret value for this stream.
84 // 121 //
85 // This value may only be returned to LogDog services; it is not user-vi sible. 122 // This value may only be returned to LogDog services; it is not user-vi sible.
86 Secret []byte `gae:",noindex"` 123 Secret []byte `gae:",noindex"`
87 124
88 // Created is the time when this stream was created. 125 // Created is the time when this stream was created.
89 Created time.Time 126 Created time.Time
90 » // Updated is the Coordinator's record of when this log stream was last 127 » // TerminatedTime is the Coordinator's record of when this log stream wa s
91 » // updated. 128 » // terminated.
92 » Updated time.Time 129 » TerminatedTime time.Time `gae:",noindex"`
130 » // ArchivedTime is the Coordinator's record of when this log stream was
131 » // archived.
132 » ArchivedTime time.Time `gae:",noindex"`
93 133
94 // ProtoVersion is the version string of the protobuf, as reported by th e 134 // ProtoVersion is the version string of the protobuf, as reported by th e
95 // Collector (and ultimately self-identified by the Butler). 135 // Collector (and ultimately self-identified by the Butler).
96 ProtoVersion string 136 ProtoVersion string
97 // Descriptor is the binary protobuf data LogStreamDescriptor. 137 // Descriptor is the binary protobuf data LogStreamDescriptor.
98 Descriptor []byte `gae:",noindex"` 138 Descriptor []byte `gae:",noindex"`
99 // ContentType is the MIME-style content type string for this stream. 139 // ContentType is the MIME-style content type string for this stream.
100 ContentType string 140 ContentType string
101 // StreamType is the data type of the stream. 141 // StreamType is the data type of the stream.
102 StreamType logpb.StreamType 142 StreamType logpb.StreamType
103 // Timestamp is the Descriptor's recorded client-side timestamp. 143 // Timestamp is the Descriptor's recorded client-side timestamp.
104 Timestamp time.Time 144 Timestamp time.Time
105 145
106 // Tags is a set of arbitrary key/value tags associated with this stream . Tags 146 // Tags is a set of arbitrary key/value tags associated with this stream . Tags
107 // can be queried against. 147 // can be queried against.
108 // 148 //
109 // The serialization/deserialization is handled manually in order to ena ble 149 // The serialization/deserialization is handled manually in order to ena ble
110 // key/value queries. 150 // key/value queries.
111 Tags TagMap `gae:"-"` 151 Tags TagMap `gae:"-"`
112 152
113 // Source is the set of source strings sent by the Butler. 153 // Source is the set of source strings sent by the Butler.
114 Source []string 154 Source []string
115 155
116 » // TerminalIndex is the log stream index of the last log entry in the st ream. 156 » // TerminalIndex is the index of the last log entry in the stream.
117 » // If the value is -1, the log is still streaming. 157 » //
158 » // If this is <0, the log stream is either still streaming or has been
159 » // archived with no log entries.
118 TerminalIndex int64 `gae:",noindex"` 160 TerminalIndex int64 `gae:",noindex"`
119 161
162 // ArchiveLogEntryCount is the number of LogEntry records that were arch ived
163 // for this log stream.
164 //
165 // This is valid only if the log stream is Archived.
166 ArchiveLogEntryCount int64 `gae:",noindex"`
167 // ArchivalKey is the archival key for this log stream. This is used to
168 // differentiate the real archival request from those that were dispatch ed,
169 // but that ultimately failed to update state.
170 ArchivalKey []byte `gae:",noindex"`
171
120 // ArchiveIndexURL is the Google Storage URL where the log stream's inde x is 172 // ArchiveIndexURL is the Google Storage URL where the log stream's inde x is
121 // archived. 173 // archived.
122 ArchiveIndexURL string `gae:",noindex"` 174 ArchiveIndexURL string `gae:",noindex"`
123 // ArchiveIndexSize is the size, in bytes, of the archived Index. It wil l be 175 // ArchiveIndexSize is the size, in bytes, of the archived Index. It wil l be
124 // zero if the file is not archived. 176 // zero if the file is not archived.
125 ArchiveIndexSize int64 `gae:",noindex"` 177 ArchiveIndexSize int64 `gae:",noindex"`
126 // ArchiveStreamURL is the Google Storage URL where the log stream's raw 178 // ArchiveStreamURL is the Google Storage URL where the log stream's raw
127 // stream data is archived. If this is not empty, the log stream is cons idered 179 // stream data is archived. If this is not empty, the log stream is cons idered
128 // archived. 180 // archived.
129 ArchiveStreamURL string `gae:",noindex"` 181 ArchiveStreamURL string `gae:",noindex"`
130 // ArchiveStreamSize is the size, in bytes, of the archived stream. It w ill be 182 // ArchiveStreamSize is the size, in bytes, of the archived stream. It w ill be
131 // zero if the file is not archived. 183 // zero if the file is not archived.
132 ArchiveStreamSize int64 `gae:",noindex"` 184 ArchiveStreamSize int64 `gae:",noindex"`
133 // ArchiveDataURL is the Google Storage URL where the log stream's assem bled 185 // ArchiveDataURL is the Google Storage URL where the log stream's assem bled
134 // data is archived. If this is not empty, the log stream is considered 186 // data is archived. If this is not empty, the log stream is considered
135 // archived. 187 // archived.
136 ArchiveDataURL string `gae:",noindex"` 188 ArchiveDataURL string `gae:",noindex"`
137 // ArchiveDataSize is the size, in bytes, of the archived data. It will be 189 // ArchiveDataSize is the size, in bytes, of the archived data. It will be
138 // zero if the file is not archived. 190 // zero if the file is not archived.
139 ArchiveDataSize int64 `gae:",noindex"` 191 ArchiveDataSize int64 `gae:",noindex"`
140 // ArchiveWhole is true if archival is complete and the archived log str eam
141 // was not missing any entries.
142 ArchiveWhole bool
143 192
144 » // _ causes datastore to ignore unrecognized fields and strip them in fu ture 193 » // extra causes datastore to ignore unrecognized fields and strip them i n
145 » // writes. 194 » // future writes.
146 » _ ds.PropertyMap `gae:"-,extra"` 195 » extra ds.PropertyMap `gae:"-,extra"`
147 196
148 » // hashID is the cached generated ID from the stream's Prefix/Name field s. If 197 » // noDSValidate is a testing parameter to instruct the LogStream not to
149 » // this is populated, ID metadata will be retrieved from this field inst ead of 198 » // validate before reading/writing to datastore. It can be controlled by
150 » // generated. 199 » // calling SetDSValidate().
151 » hashID string 200 » noDSValidate bool
152 } 201 }
153 202
154 var _ interface { 203 var _ interface {
155 ds.PropertyLoadSaver 204 ds.PropertyLoadSaver
156 ds.MetaGetterSetter
157 } = (*LogStream)(nil) 205 } = (*LogStream)(nil)
158 206
159 // NewLogStream returns a LogStream instance with its ID field initialized based 207 // NewLogStream returns a LogStream instance with its ID field initialized based
160 // on the supplied path. 208 // on the supplied path.
161 // 209 //
162 // The supplied value is a LogDog stream path or a hash of the LogDog stream 210 // The supplied value is a LogDog stream path or a hash of the LogDog stream
163 // path. 211 // path.
164 func NewLogStream(value string) (*LogStream, error) { 212 func NewLogStream(value string) (*LogStream, error) {
165 path := types.StreamPath(value) 213 path := types.StreamPath(value)
166 if err := path.Validate(); err != nil { 214 if err := path.Validate(); err != nil {
167 // If it's not a path, see if it's a SHA256 sum. 215 // If it's not a path, see if it's a SHA256 sum.
168 hash, hashErr := normalizeHash(value) 216 hash, hashErr := normalizeHash(value)
169 if hashErr != nil { 217 if hashErr != nil {
170 return nil, fmt.Errorf("invalid path (%s) and hash (%s)" , err, hashErr) 218 return nil, fmt.Errorf("invalid path (%s) and hash (%s)" , err, hashErr)
171 } 219 }
172 220
173 // Load this LogStream with its SHA256 hash directly. This strea m will not 221 // Load this LogStream with its SHA256 hash directly. This strea m will not
174 // have its Prefix/Name fields populated until it's loaded from datastore. 222 // have its Prefix/Name fields populated until it's loaded from datastore.
175 return LogStreamFromID(hash), nil 223 return LogStreamFromID(hash), nil
176 } 224 }
177 225
178 return LogStreamFromPath(path), nil 226 return LogStreamFromPath(path), nil
179 } 227 }
180 228
181 // LogStreamFromID returns an empty LogStream instance with a known hash ID. 229 // LogStreamFromID returns an empty LogStream instance with a known hash ID.
182 func LogStreamFromID(hashID string) *LogStream { 230 func LogStreamFromID(hashID string) *LogStream {
183 return &LogStream{ 231 return &LogStream{
184 » » hashID: hashID, 232 » » HashID: hashID,
185 } 233 }
186 } 234 }
187 235
188 // LogStreamFromPath returns an empty LogStream instance initialized from a 236 // LogStreamFromPath returns an empty LogStream instance initialized from a
189 // known path value. 237 // known path value.
190 // 238 //
191 // The supplied path is assumed to be valid and is not checked. 239 // The supplied path is assumed to be valid and is not checked.
192 func LogStreamFromPath(path types.StreamPath) *LogStream { 240 func LogStreamFromPath(path types.StreamPath) *LogStream {
193 // Load the prefix/name fields into the log stream. 241 // Load the prefix/name fields into the log stream.
194 prefix, name := path.Split() 242 prefix, name := path.Split()
195 » return &LogStream{ 243 » ls := LogStream{
196 Prefix: string(prefix), 244 Prefix: string(prefix),
197 Name: string(name), 245 Name: string(name),
198 } 246 }
247 ls.recalculateHashID()
248 return &ls
199 } 249 }
200 250
201 // Path returns the LogDog path for this log stream. 251 // Path returns the LogDog path for this log stream.
202 func (s *LogStream) Path() types.StreamPath { 252 func (s *LogStream) Path() types.StreamPath {
203 return types.StreamName(s.Prefix).Join(types.StreamName(s.Name)) 253 return types.StreamName(s.Prefix).Join(types.StreamName(s.Name))
204 } 254 }
205 255
206 // Load implements ds.PropertyLoadSaver. 256 // Load implements ds.PropertyLoadSaver.
207 func (s *LogStream) Load(pmap ds.PropertyMap) error { 257 func (s *LogStream) Load(pmap ds.PropertyMap) error {
208 // Handle custom properties. Consume them before using the default 258 // Handle custom properties. Consume them before using the default
209 // PropertyLoadSaver. 259 // PropertyLoadSaver.
210 for k, v := range pmap { 260 for k, v := range pmap {
211 if !strings.HasPrefix(k, "_") { 261 if !strings.HasPrefix(k, "_") {
212 continue 262 continue
213 } 263 }
214 264
215 switch k { 265 switch k {
216 case "_Tags": 266 case "_Tags":
217 // Load the tag map. Ignore errors. 267 // Load the tag map. Ignore errors.
218 tm, _ := tagMapFromProperties(v) 268 tm, _ := tagMapFromProperties(v)
219 s.Tags = tm 269 s.Tags = tm
220 } 270 }
221 delete(pmap, k) 271 delete(pmap, k)
222 } 272 }
223 273
224 » return ds.GetPLS(s).Load(pmap) 274 » if err := ds.GetPLS(s).Load(pmap); err != nil {
275 » » return err
276 » }
277
278 » // Migrate schema (if needed), then validate.
279 » if err := s.migrateSchema(); err != nil {
280 » » return err
281 » }
282
283 » // Validate the log stream. Don't enforce HashID correctness, since
284 » // datastore hasn't populated that field yet.
285 » if !s.noDSValidate {
286 » » if err := s.validateImpl(false); err != nil {
287 » » » return err
288 » » }
289 » }
290 » return nil
225 } 291 }
226 292
227 // Save implements ds.PropertyLoadSaver. 293 // Save implements ds.PropertyLoadSaver.
228 func (s *LogStream) Save(withMeta bool) (ds.PropertyMap, error) { 294 func (s *LogStream) Save(withMeta bool) (ds.PropertyMap, error) {
295 if !s.noDSValidate {
296 if err := s.validateImpl(true); err != nil {
297 return nil, err
298 }
299 }
300 s.Schema = currentLogStreamSchema
301
302 // Save default struct fields.
229 pmap, err := ds.GetPLS(s).Save(withMeta) 303 pmap, err := ds.GetPLS(s).Save(withMeta)
230 if err != nil { 304 if err != nil {
231 return nil, err 305 return nil, err
232 } 306 }
233 307
234 // Encode _Tags. 308 // Encode _Tags.
235 pmap["_Tags"], err = s.Tags.toProperties() 309 pmap["_Tags"], err = s.Tags.toProperties()
236 if err != nil { 310 if err != nil {
237 return nil, fmt.Errorf("failed to encode tags: %v", err) 311 return nil, fmt.Errorf("failed to encode tags: %v", err)
238 } 312 }
239 313
240 // Generate our path components, "_C". 314 // Generate our path components, "_C".
241 pmap["_C"] = generatePathComponents(s.Prefix, s.Name) 315 pmap["_C"] = generatePathComponents(s.Prefix, s.Name)
242 316
243 // Add our derived statuses. 317 // Add our derived statuses.
244 pmap["_Terminated"] = []ds.Property{ds.MkProperty(s.Terminated())} 318 pmap["_Terminated"] = []ds.Property{ds.MkProperty(s.Terminated())}
245 pmap["_Archived"] = []ds.Property{ds.MkProperty(s.Archived())} 319 pmap["_Archived"] = []ds.Property{ds.MkProperty(s.Archived())}
246 320
247 return pmap, nil 321 return pmap, nil
248 } 322 }
249 323
250 // GetMeta implements ds.MetaGetterSetter. 324 // recalculateHashID calculates the log stream's hash ID from its Prefix/Name
251 func (s *LogStream) GetMeta(key string) (interface{}, bool) { 325 // fields, which must be populated else this function will panic.
252 » switch key { 326 //
253 » case "id": 327 // The value is loaded into its HashID field.
254 » » return s.HashID(), true 328 func (s *LogStream) recalculateHashID() {
255 329 » s.HashID = s.getHashID()
256 » default:
257 » » return ds.GetPLS(s).GetMeta(key)
258 » }
259 } 330 }
260 331
261 // GetAllMeta implements ds.MetaGetterSetter. 332 // recalculateHashID calculates the log stream's hash ID from its Prefix/Name
262 func (s *LogStream) GetAllMeta() ds.PropertyMap { 333 // fields, which must be populated else this function will panic.
263 » pmap := ds.GetPLS(s).GetAllMeta() 334 func (s *LogStream) getHashID() string {
264 » pmap.SetMeta("id", ds.MkProperty(s.HashID())) 335 » hash := sha256.Sum256([]byte(s.Path()))
265 » return pmap 336 » return hex.EncodeToString(hash[:])
266 }
267
268 // SetMeta implements ds.MetaGetterSetter.
269 func (s *LogStream) SetMeta(key string, val interface{}) bool {
270 » return ds.GetPLS(s).SetMeta(key, val)
271 }
272
273 // HashID generates and populates the hashID field of a LogStream. This
274 // is the hash of the log stream's full path.
275 func (s *LogStream) HashID() string {
276 » if s.hashID == "" {
277 » » if s.Prefix == "" || s.Name == "" {
278 » » » panic("cannot generate ID hash: Prefix and Name are not populated")
279 » » }
280
281 » » hash := sha256.Sum256([]byte(s.Path()))
282 » » s.hashID = hex.EncodeToString(hash[:])
283 » }
284 » return s.hashID
285 }
286
287 // Put writes this LogStream to the Datastore. Before writing, it validates that
288 // LogStream is complete.
289 func (s *LogStream) Put(di ds.Interface) error {
290 » if err := s.Validate(); err != nil {
291 » » return err
292 » }
293 » return di.Put(s)
294 } 337 }
295 338
296 // Validate evaluates the state and data contents of the LogStream and returns 339 // Validate evaluates the state and data contents of the LogStream and returns
297 // an error if it is invalid. 340 // an error if it is invalid.
298 func (s *LogStream) Validate() error { 341 func (s *LogStream) Validate() error {
342 return s.validateImpl(true)
343 }
344
345 func (s *LogStream) validateImpl(enforceHashID bool) error {
346 if enforceHashID {
347 // Make sure our Prefix and Name match the Hash ID.
348 if hid := s.getHashID(); hid != s.HashID {
349 return fmt.Errorf("hash IDs don't match (%q != %q)", hid , s.HashID)
350 }
351 }
352
299 if err := types.StreamName(s.Prefix).Validate(); err != nil { 353 if err := types.StreamName(s.Prefix).Validate(); err != nil {
300 return fmt.Errorf("invalid prefix: %s", err) 354 return fmt.Errorf("invalid prefix: %s", err)
301 } 355 }
302 if err := types.StreamName(s.Name).Validate(); err != nil { 356 if err := types.StreamName(s.Name).Validate(); err != nil {
303 return fmt.Errorf("invalid name: %s", err) 357 return fmt.Errorf("invalid name: %s", err)
304 } 358 }
305 if len(s.Secret) != types.StreamSecretLength { 359 if len(s.Secret) != types.StreamSecretLength {
306 return fmt.Errorf("invalid secret length (%d != %d)", len(s.Secr et), types.StreamSecretLength) 360 return fmt.Errorf("invalid secret length (%d != %d)", len(s.Secr et), types.StreamSecretLength)
307 } 361 }
308 if s.ContentType == "" { 362 if s.ContentType == "" {
309 return errors.New("empty content type") 363 return errors.New("empty content type")
310 } 364 }
311 if s.Created.IsZero() { 365 if s.Created.IsZero() {
312 return errors.New("created time is not set") 366 return errors.New("created time is not set")
313 } 367 }
314 » if s.Updated.IsZero() { 368
315 » » return errors.New("updated time is not set") 369 » if s.Terminated() && s.TerminatedTime.IsZero() {
370 » » return errors.New("log stream is terminated, but missing termina ted time")
316 } 371 }
317 » if s.Updated.Before(s.Created) { 372 » if s.Archived() && s.ArchivedTime.IsZero() {
318 » » return fmt.Errorf("updated time must be >= created time (%s < %s )", s.Updated, s.Created) 373 » » return errors.New("log stream is archived, but missing archived time")
319 } 374 }
320 375
321 switch s.StreamType { 376 switch s.StreamType {
322 case logpb.StreamType_TEXT, logpb.StreamType_BINARY, logpb.StreamType_DA TAGRAM: 377 case logpb.StreamType_TEXT, logpb.StreamType_BINARY, logpb.StreamType_DA TAGRAM:
323 break 378 break
324 379
325 default: 380 default:
326 return fmt.Errorf("unsupported stream type: %v", s.StreamType) 381 return fmt.Errorf("unsupported stream type: %v", s.StreamType)
327 } 382 }
328 383
(...skipping 14 matching lines...) Expand all
343 func (s *LogStream) DescriptorValue() (*logpb.LogStreamDescriptor, error) { 398 func (s *LogStream) DescriptorValue() (*logpb.LogStreamDescriptor, error) {
344 pb := logpb.LogStreamDescriptor{} 399 pb := logpb.LogStreamDescriptor{}
345 if err := proto.Unmarshal(s.Descriptor, &pb); err != nil { 400 if err := proto.Unmarshal(s.Descriptor, &pb); err != nil {
346 return nil, err 401 return nil, err
347 } 402 }
348 return &pb, nil 403 return &pb, nil
349 } 404 }
350 405
351 // Terminated returns true if this stream has been terminated. 406 // Terminated returns true if this stream has been terminated.
352 func (s *LogStream) Terminated() bool { 407 func (s *LogStream) Terminated() bool {
353 » return s.State >= LSTerminated 408 » if s.Archived() {
409 » » return true
410 » }
411 » return s.TerminalIndex >= 0
354 } 412 }
355 413
356 // Archived returns true if this stream has been archived. A stream is archived 414 // Archived returns true if this stream has been archived.
357 // if it has any of its archival properties set.
358 func (s *LogStream) Archived() bool { 415 func (s *LogStream) Archived() bool {
359 » return s.State >= LSArchived 416 » return s.State.Archived()
360 } 417 }
361 418
362 // ArchiveMatches tests if the supplied Stream, Index, and Data archival URLs 419 // ArchiveComplete returns true if this stream has been archived and all of its
363 // match the current values. 420 // log entries were present.
364 func (s *LogStream) ArchiveMatches(sURL, iURL, dURL string) bool { 421 func (s *LogStream) ArchiveComplete() bool {
365 » return (s.ArchiveStreamURL == sURL && s.ArchiveIndexURL == iURL && s.Arc hiveDataURL == dURL) 422 » return (s.Archived() && s.ArchiveLogEntryCount == (s.TerminalIndex+1))
366 } 423 }
367 424
368 // LoadDescriptor loads the fields in the log stream descriptor into this 425 // LoadDescriptor loads the fields in the log stream descriptor into this
369 // LogStream entry. These fields are: 426 // LogStream entry. These fields are:
370 // - Prefix 427 // - Prefix
371 // - Name 428 // - Name
372 // - ContentType 429 // - ContentType
373 // - StreamType 430 // - StreamType
374 // - Descriptor 431 // - Descriptor
375 // - Timestamp 432 // - Timestamp
376 // - Tags 433 // - Tags
377 func (s *LogStream) LoadDescriptor(desc *logpb.LogStreamDescriptor) error { 434 func (s *LogStream) LoadDescriptor(desc *logpb.LogStreamDescriptor) error {
435 // If the descriptor's Prefix/Name don't match ours, refuse to load it.
436 if desc.Prefix != s.Prefix {
437 return fmt.Errorf("prefixes don't match (%q != %q)", desc.Prefix , s.Prefix)
438 }
439 if desc.Name != s.Name {
440 return fmt.Errorf("names don't match (%q != %q)", desc.Name, s.N ame)
441 }
442
378 if err := desc.Validate(true); err != nil { 443 if err := desc.Validate(true); err != nil {
379 return fmt.Errorf("invalid descriptor: %v", err) 444 return fmt.Errorf("invalid descriptor: %v", err)
380 } 445 }
381 446
382 pb, err := proto.Marshal(desc) 447 pb, err := proto.Marshal(desc)
383 if err != nil { 448 if err != nil {
384 return fmt.Errorf("failed to marshal descriptor: %v", err) 449 return fmt.Errorf("failed to marshal descriptor: %v", err)
385 } 450 }
386 451
387 s.Prefix = desc.Prefix 452 s.Prefix = desc.Prefix
(...skipping 15 matching lines...) Expand all
403 // DescriptorProto unmarshals a LogStreamDescriptor from the stream's Descriptor 468 // DescriptorProto unmarshals a LogStreamDescriptor from the stream's Descriptor
404 // field. It will return an error if the unmarshalling fails. 469 // field. It will return an error if the unmarshalling fails.
405 func (s *LogStream) DescriptorProto() (*logpb.LogStreamDescriptor, error) { 470 func (s *LogStream) DescriptorProto() (*logpb.LogStreamDescriptor, error) {
406 desc := logpb.LogStreamDescriptor{} 471 desc := logpb.LogStreamDescriptor{}
407 if err := proto.Unmarshal(s.Descriptor, &desc); err != nil { 472 if err := proto.Unmarshal(s.Descriptor, &desc); err != nil {
408 return nil, err 473 return nil, err
409 } 474 }
410 return &desc, nil 475 return &desc, nil
411 } 476 }
412 477
478 // SetDSValidate controls whether this LogStream is validated prior to being
479 // read from or written to datastore.
480 //
481 // This is a testing parameter, and should NOT be used in production code.
482 func (s *LogStream) SetDSValidate(v bool) {
483 s.noDSValidate = !v
484 }
485
413 // normalizeHash takes a SHA256 hexadecimal string as input. It validates that 486 // normalizeHash takes a SHA256 hexadecimal string as input. It validates that
414 // it is a valid SHA256 hash and, if so, returns a normalized version that can 487 // it is a valid SHA256 hash and, if so, returns a normalized version that can
415 // be used as a log stream key. 488 // be used as a log stream key.
416 func normalizeHash(v string) (string, error) { 489 func normalizeHash(v string) (string, error) {
417 if decodeSize := hex.DecodedLen(len(v)); decodeSize != sha256.Size { 490 if decodeSize := hex.DecodedLen(len(v)); decodeSize != sha256.Size {
418 return "", fmt.Errorf("invalid SHA256 hash size (%d != %d)", dec odeSize, sha256.Size) 491 return "", fmt.Errorf("invalid SHA256 hash size (%d != %d)", dec odeSize, sha256.Size)
419 } 492 }
420 b, err := hex.DecodeString(v) 493 b, err := hex.DecodeString(v)
421 if err != nil { 494 if err != nil {
422 return "", err 495 return "", err
(...skipping 151 matching lines...) Expand 10 before | Expand all | Expand 10 after
574 // were created before the supplied time. 647 // were created before the supplied time.
575 func AddOlderFilter(q *ds.Query, t time.Time) *ds.Query { 648 func AddOlderFilter(q *ds.Query, t time.Time) *ds.Query {
576 return q.Lt("Created", t.UTC()).Order("-Created") 649 return q.Lt("Created", t.UTC()).Order("-Created")
577 } 650 }
578 651
579 // AddNewerFilter adds a filter to queries that restricts them to results that 652 // AddNewerFilter adds a filter to queries that restricts them to results that
580 // were created after the supplied time. 653 // were created after the supplied time.
581 func AddNewerFilter(q *ds.Query, t time.Time) *ds.Query { 654 func AddNewerFilter(q *ds.Query, t time.Time) *ds.Query {
582 return q.Gt("Created", t.UTC()).Order("-Created") 655 return q.Gt("Created", t.UTC()).Order("-Created")
583 } 656 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698