Chromium Code Reviews| Index: appengine/logdog/coordinator/logStream.go |
| diff --git a/appengine/logdog/coordinator/logStream.go b/appengine/logdog/coordinator/logStream.go |
| index 4285bd3e4f25dd4708d0e45025d41964cb9ca775..6a0d63630b768c22671dae53867215ec231be884 100644 |
| --- a/appengine/logdog/coordinator/logStream.go |
| +++ b/appengine/logdog/coordinator/logStream.go |
| @@ -18,20 +18,43 @@ import ( |
| "github.com/luci/luci-go/common/proto/logdog/logpb" |
| ) |
| +// currentLogStreamSchema is the current schema version of the LogStream. |
| +// Changes that are not backward-compatible should update this field so |
| +// migration logic and scripts can translate appropriately. |
| +const currentLogStreamSchema = "1" |
| + |
| // LogStreamState is the archival state of the log stream. |
| type LogStreamState int |
| const ( |
| - // LSPending indicates that no archival has occurred yet. |
| - LSPending LogStreamState = iota |
| - // LSTerminated indicates that the log stream has received a terminal index |
| - // and is awaiting archival. |
| - LSTerminated |
| - // LSArchived indicates that the log stream has been successfully archived but |
| - // has not yet been cleaned up. |
| + // LSStreaming indicates that the log stream is still streaming. This implies |
| + // that no terminal index has been identified yet. |
| + LSStreaming LogStreamState = iota |
| + // LSArchiveTasked indicates that the log stream has had an archival task |
| + // generated for it and is awaiting archival. |
| + LSArchiveTasked |
| + // LSArchived indicates that the log stream has been successfully archived. |
| LSArchived |
| ) |
| +func (s LogStreamState) String() string { |
| + switch s { |
| + case LSStreaming: |
| + return "STREAMING" |
| + case LSArchiveTasked: |
| + return "ARCHIVE_TASKED" |
| + case LSArchived: |
| + return "ARCHIVED" |
| + default: |
| + return fmt.Sprintf("UNKNOWN(%d)", s) |
| + } |
| +} |
| + |
| +// Archived returns true if this LogStreamState represents a finished archival. |
| +func (s LogStreamState) Archived() bool { |
| + return s >= LSArchived |
| +} |
| + |
| // LogStream is the primary datastore model containing information and state of |
| // an individual log stream. |
| // |
| @@ -63,13 +86,27 @@ const ( |
| // |
| // Most of the values in QueryBase are static. Those that change can only be |
| // changed through service endpoint methods. |
| -// |
| -// LogStream's QueryBase is authortative. |
| type LogStream struct { |
| + // HashID is the LogStream ID. It is generated from the stream's Prefix/Name |
| + // fields. |
| + HashID string `gae:"$id"` |
|
dnj
2016/04/11 17:20:04
Note that this is now a mandatory field. I needed
|
| + |
| + // Schema is the datastore schema version for this object. This can be used |
| + // to facilitate schema migrations. |
| + // |
| + // The current schema is currentLogStreamSchema. |
| + Schema string |
| + |
| // Prefix is this log stream's prefix value. Log streams with the same prefix |
| // are logically grouped. |
| + // |
| + // This value should not be changed once populated, as it will invalidate the |
| + // HashID. |
| Prefix string |
| // Name is the unique name of this log stream within the Prefix scope. |
| + // |
| + // This value should not be changed once populated, as it will invalidate the |
| + // HashID. |
| Name string |
| // State is the log stream's current state. |
| @@ -87,9 +124,12 @@ type LogStream struct { |
| // Created is the time when this stream was created. |
| Created time.Time |
| - // Updated is the Coordinator's record of when this log stream was last |
| - // updated. |
| - Updated time.Time |
| + // TerminatedTime is the Coordinator's record of when this log stream was |
| + // terminated. |
| + TerminatedTime time.Time `gae:",noindex"` |
| + // ArchivedTime is the Coordinator's record of when this log stream was |
| + // archived. |
| + ArchivedTime time.Time `gae:",noindex"` |
| // ProtoVersion is the version string of the protobuf, as reported by the |
| // Collector (and ultimately self-identified by the Butler). |
| @@ -113,10 +153,22 @@ type LogStream struct { |
| // Source is the set of source strings sent by the Butler. |
| Source []string |
| - // TerminalIndex is the log stream index of the last log entry in the stream. |
| - // If the value is -1, the log is still streaming. |
| + // TerminalIndex is the index of the last log entry in the stream. |
| + // |
| + // If this is <0, the log stream is either still streaming or has been |
| + // archived with no log entries. |
| TerminalIndex int64 `gae:",noindex"` |
| + // ArchiveLogEntryCount is the number of LogEntry records that were archived |
| + // for this log stream. |
| + // |
| + // This is valid only if the log stream is Archived. |
| + ArchiveLogEntryCount int64 `gae:",noindex"` |
| + // ArchivalKey is the archival key for this log stream. This is used to |
| + // differentiate the real archival request from those that were dispatched, |
| + // but that ultimately failed to update state. |
| + ArchivalKey []byte `gae:",noindex"` |
| + |
| // ArchiveIndexURL is the Google Storage URL where the log stream's index is |
| // archived. |
| ArchiveIndexURL string `gae:",noindex"` |
| @@ -137,23 +189,19 @@ type LogStream struct { |
| // ArchiveDataSize is the size, in bytes, of the archived data. It will be |
| // zero if the file is not archived. |
| ArchiveDataSize int64 `gae:",noindex"` |
| - // ArchiveWhole is true if archival is complete and the archived log stream |
| - // was not missing any entries. |
| - ArchiveWhole bool |
| - |
| - // _ causes datastore to ignore unrecognized fields and strip them in future |
| - // writes. |
| - _ ds.PropertyMap `gae:"-,extra"` |
| - |
| - // hashID is the cached generated ID from the stream's Prefix/Name fields. If |
| - // this is populated, ID metadata will be retrieved from this field instead of |
| - // generated. |
| - hashID string |
| + |
| + // extra causes datastore to ignore unrecognized fields and strip them in |
| + // future writes. |
| + extra ds.PropertyMap `gae:"-,extra"` |
| + |
| + // noDSValidate is a testing parameter to instruct the LogStream not to |
| + // validate before reading/writing to datastore. It can be controlled by |
| + // calling SetDSValidate(). |
| + noDSValidate bool |
| } |
| var _ interface { |
| ds.PropertyLoadSaver |
| - ds.MetaGetterSetter |
| } = (*LogStream)(nil) |
| // NewLogStream returns a LogStream instance with its ID field initialized based |
| @@ -181,7 +229,7 @@ func NewLogStream(value string) (*LogStream, error) { |
| // LogStreamFromID returns an empty LogStream instance with a known hash ID. |
| func LogStreamFromID(hashID string) *LogStream { |
| return &LogStream{ |
| - hashID: hashID, |
| + HashID: hashID, |
| } |
| } |
| @@ -192,10 +240,12 @@ func LogStreamFromID(hashID string) *LogStream { |
| func LogStreamFromPath(path types.StreamPath) *LogStream { |
| // Load the prefix/name fields into the log stream. |
| prefix, name := path.Split() |
| - return &LogStream{ |
| + ls := LogStream{ |
| Prefix: string(prefix), |
| Name: string(name), |
| } |
| + ls.recalculateHashID() |
| + return &ls |
| } |
| // Path returns the LogDog path for this log stream. |
| @@ -221,11 +271,35 @@ func (s *LogStream) Load(pmap ds.PropertyMap) error { |
| delete(pmap, k) |
| } |
| - return ds.GetPLS(s).Load(pmap) |
| + if err := ds.GetPLS(s).Load(pmap); err != nil { |
| + return err |
| + } |
| + |
| + // Migrate schema (if needed), then validate. |
| + if err := s.migrateSchema(); err != nil { |
| + return err |
| + } |
| + |
| + // Validate the log stream. Don't enforce HashID correctness, since |
| + // datastore hasn't populated that field yet. |
| + if !s.noDSValidate { |
| + if err := s.validateImpl(false); err != nil { |
| + return err |
| + } |
| + } |
| + return nil |
| } |
| // Save implements ds.PropertyLoadSaver. |
| func (s *LogStream) Save(withMeta bool) (ds.PropertyMap, error) { |
| + if !s.noDSValidate { |
| + if err := s.validateImpl(true); err != nil { |
| + return nil, err |
| + } |
| + } |
| + s.Schema = currentLogStreamSchema |
| + |
| + // Save default struct fields. |
| pmap, err := ds.GetPLS(s).Save(withMeta) |
| if err != nil { |
| return nil, err |
| @@ -247,55 +321,35 @@ func (s *LogStream) Save(withMeta bool) (ds.PropertyMap, error) { |
| return pmap, nil |
| } |
| -// GetMeta implements ds.MetaGetterSetter. |
| -func (s *LogStream) GetMeta(key string) (interface{}, bool) { |
| - switch key { |
| - case "id": |
| - return s.HashID(), true |
| - |
| - default: |
| - return ds.GetPLS(s).GetMeta(key) |
| - } |
| +// recalculateHashID calculates the log stream's hash ID from its Prefix/Name |
| +// fields, which must be populated else this function will panic. |
| +// |
| +// The value is loaded into its HashID field. |
| +func (s *LogStream) recalculateHashID() { |
| + s.HashID = s.getHashID() |
| } |
| -// GetAllMeta implements ds.MetaGetterSetter. |
| -func (s *LogStream) GetAllMeta() ds.PropertyMap { |
| - pmap := ds.GetPLS(s).GetAllMeta() |
| - pmap.SetMeta("id", ds.MkProperty(s.HashID())) |
| - return pmap |
| +// getHashID calculates the log stream's hash ID from its Prefix/Name |
| +// fields, which must be populated else this function will panic. |
| +func (s *LogStream) getHashID() string { |
| + hash := sha256.Sum256([]byte(s.Path())) |
| + return hex.EncodeToString(hash[:]) |
| } |
| -// SetMeta implements ds.MetaGetterSetter. |
| -func (s *LogStream) SetMeta(key string, val interface{}) bool { |
| - return ds.GetPLS(s).SetMeta(key, val) |
| +// Validate evaluates the state and data contents of the LogStream and returns |
| +// an error if it is invalid. |
| +func (s *LogStream) Validate() error { |
| + return s.validateImpl(true) |
| } |
| -// HashID generates and populates the hashID field of a LogStream. This |
| -// is the hash of the log stream's full path. |
| -func (s *LogStream) HashID() string { |
| - if s.hashID == "" { |
| - if s.Prefix == "" || s.Name == "" { |
| - panic("cannot generate ID hash: Prefix and Name are not populated") |
| +func (s *LogStream) validateImpl(enforceHashID bool) error { |
| + if enforceHashID { |
| + // Make sure our Prefix and Name match the Hash ID. |
| + if hid := s.getHashID(); hid != s.HashID { |
| + return fmt.Errorf("hash IDs don't match (%q != %q)", hid, s.HashID) |
| } |
| - |
| - hash := sha256.Sum256([]byte(s.Path())) |
| - s.hashID = hex.EncodeToString(hash[:]) |
| - } |
| - return s.hashID |
| -} |
| - |
| -// Put writes this LogStream to the Datastore. Before writing, it validates that |
| -// LogStream is complete. |
| -func (s *LogStream) Put(di ds.Interface) error { |
| - if err := s.Validate(); err != nil { |
| - return err |
| } |
| - return di.Put(s) |
| -} |
| -// Validate evaluates the state and data contents of the LogStream and returns |
| -// an error if it is invalid. |
| -func (s *LogStream) Validate() error { |
| if err := types.StreamName(s.Prefix).Validate(); err != nil { |
| return fmt.Errorf("invalid prefix: %s", err) |
| } |
| @@ -311,11 +365,12 @@ func (s *LogStream) Validate() error { |
| if s.Created.IsZero() { |
| return errors.New("created time is not set") |
| } |
| - if s.Updated.IsZero() { |
| - return errors.New("updated time is not set") |
| + |
| + if s.Terminated() && s.TerminatedTime.IsZero() { |
| + return errors.New("log stream is terminated, but missing terminated time") |
| } |
| - if s.Updated.Before(s.Created) { |
| - return fmt.Errorf("updated time must be >= created time (%s < %s)", s.Updated, s.Created) |
| + if s.Archived() && s.ArchivedTime.IsZero() { |
| + return errors.New("log stream is archived, but missing archived time") |
| } |
| switch s.StreamType { |
| @@ -350,19 +405,21 @@ func (s *LogStream) DescriptorValue() (*logpb.LogStreamDescriptor, error) { |
| // Terminated returns true if this stream has been terminated. |
| func (s *LogStream) Terminated() bool { |
| - return s.State >= LSTerminated |
| + if s.Archived() { |
| + return true |
| + } |
| + return s.TerminalIndex >= 0 |
| } |
| -// Archived returns true if this stream has been archived. A stream is archived |
| -// if it has any of its archival properties set. |
| +// Archived returns true if this stream has been archived. |
| func (s *LogStream) Archived() bool { |
| - return s.State >= LSArchived |
| + return s.State.Archived() |
| } |
| -// ArchiveMatches tests if the supplied Stream, Index, and Data archival URLs |
| -// match the current values. |
| -func (s *LogStream) ArchiveMatches(sURL, iURL, dURL string) bool { |
| - return (s.ArchiveStreamURL == sURL && s.ArchiveIndexURL == iURL && s.ArchiveDataURL == dURL) |
| +// ArchiveComplete returns true if this stream has been archived and all of its |
| +// log entries were present. |
| +func (s *LogStream) ArchiveComplete() bool { |
| + return (s.Archived() && s.ArchiveLogEntryCount == (s.TerminalIndex+1)) |
| } |
| // LoadDescriptor loads the fields in the log stream descriptor into this |
| @@ -375,6 +432,14 @@ func (s *LogStream) ArchiveMatches(sURL, iURL, dURL string) bool { |
| // - Timestamp |
| // - Tags |
| func (s *LogStream) LoadDescriptor(desc *logpb.LogStreamDescriptor) error { |
| + // If the descriptor's Prefix/Name don't match ours, refuse to load it. |
| + if desc.Prefix != s.Prefix { |
| + return fmt.Errorf("prefixes don't match (%q != %q)", desc.Prefix, s.Prefix) |
| + } |
| + if desc.Name != s.Name { |
| + return fmt.Errorf("names don't match (%q != %q)", desc.Name, s.Name) |
| + } |
| + |
| if err := desc.Validate(true); err != nil { |
| return fmt.Errorf("invalid descriptor: %v", err) |
| } |
| @@ -410,6 +475,14 @@ func (s *LogStream) DescriptorProto() (*logpb.LogStreamDescriptor, error) { |
| return &desc, nil |
| } |
| +// SetDSValidate controls whether this LogStream is validated prior to being |
| +// read from or written to datastore. |
| +// |
| +// This is a testing parameter, and should NOT be used in production code. |
| +func (s *LogStream) SetDSValidate(v bool) { |
| + s.noDSValidate = !v |
| +} |
| + |
| // normalizeHash takes a SHA256 hexadecimal string as input. It validates that |
| // it is a valid SHA256 hash and, if so, returns a normalized version that can |
| // be used as a log stream key. |