| Index: appengine/logdog/coordinator/logStream.go
|
| diff --git a/appengine/logdog/coordinator/logStream.go b/appengine/logdog/coordinator/logStream.go
|
| index 4285bd3e4f25dd4708d0e45025d41964cb9ca775..6a0d63630b768c22671dae53867215ec231be884 100644
|
| --- a/appengine/logdog/coordinator/logStream.go
|
| +++ b/appengine/logdog/coordinator/logStream.go
|
| @@ -18,20 +18,43 @@ import (
|
| "github.com/luci/luci-go/common/proto/logdog/logpb"
|
| )
|
|
|
| +// currentLogStreamSchema is the current schema version of the LogStream.
|
| +// Changes that are not backward-compatible should update this field so
|
| +// migration logic and scripts can translate appropriately.
|
| +const currentLogStreamSchema = "1"
|
| +
|
| // LogStreamState is the archival state of the log stream.
|
| type LogStreamState int
|
|
|
| const (
|
| - // LSPending indicates that no archival has occurred yet.
|
| - LSPending LogStreamState = iota
|
| - // LSTerminated indicates that the log stream has received a terminal index
|
| - // and is awaiting archival.
|
| - LSTerminated
|
| - // LSArchived indicates that the log stream has been successfully archived but
|
| - // has not yet been cleaned up.
|
| + // LSStreaming indicates that the log stream is still streaming. This implies
|
| + // that no terminal index has been identified yet.
|
| + LSStreaming LogStreamState = iota
|
| + // LSArchiveTasked indicates that the log stream has had an archival task
|
| + // generated for it and is awaiting archival.
|
| + LSArchiveTasked
|
| + // LSArchived indicates that the log stream has been successfully archived.
|
| LSArchived
|
| )
|
|
|
| +func (s LogStreamState) String() string {
|
| + switch s {
|
| + case LSStreaming:
|
| + return "STREAMING"
|
| + case LSArchiveTasked:
|
| + return "ARCHIVE_TASKED"
|
| + case LSArchived:
|
| + return "ARCHIVED"
|
| + default:
|
| + return fmt.Sprintf("UNKNOWN(%d)", s)
|
| + }
|
| +}
|
| +
|
| +// Archived returns true if this LogStreamState represents a finished archival.
|
| +func (s LogStreamState) Archived() bool {
|
| + return s >= LSArchived
|
| +}
|
| +
|
| // LogStream is the primary datastore model containing information and state of
|
| // an individual log stream.
|
| //
|
| @@ -63,13 +86,27 @@ const (
|
| //
|
| // Most of the values in QueryBase are static. Those that change can only be
|
| // changed through service endpoint methods.
|
| -//
|
| -// LogStream's QueryBase is authortative.
|
| type LogStream struct {
|
| + // HashID is the LogStream ID. It is generated from the stream's Prefix/Name
|
| + // fields.
|
| + HashID string `gae:"$id"`
|
| +
|
| + // Schema is the datastore schema version for this object. This can be used
|
| + // to facilitate schema migrations.
|
| + //
|
| + // The current schema is currentLogStreamSchema.
|
| + Schema string
|
| +
|
| // Prefix is this log stream's prefix value. Log streams with the same prefix
|
| // are logically grouped.
|
| + //
|
| + // This value should not be changed once populated, as it will invalidate the
|
| + // HashID.
|
| Prefix string
|
| // Name is the unique name of this log stream within the Prefix scope.
|
| + //
|
| + // This value should not be changed once populated, as it will invalidate the
|
| + // HashID.
|
| Name string
|
|
|
| // State is the log stream's current state.
|
| @@ -87,9 +124,12 @@ type LogStream struct {
|
|
|
| // Created is the time when this stream was created.
|
| Created time.Time
|
| - // Updated is the Coordinator's record of when this log stream was last
|
| - // updated.
|
| - Updated time.Time
|
| + // TerminatedTime is the Coordinator's record of when this log stream was
|
| + // terminated.
|
| + TerminatedTime time.Time `gae:",noindex"`
|
| + // ArchivedTime is the Coordinator's record of when this log stream was
|
| + // archived.
|
| + ArchivedTime time.Time `gae:",noindex"`
|
|
|
| // ProtoVersion is the version string of the protobuf, as reported by the
|
| // Collector (and ultimately self-identified by the Butler).
|
| @@ -113,10 +153,22 @@ type LogStream struct {
|
| // Source is the set of source strings sent by the Butler.
|
| Source []string
|
|
|
| - // TerminalIndex is the log stream index of the last log entry in the stream.
|
| - // If the value is -1, the log is still streaming.
|
| + // TerminalIndex is the index of the last log entry in the stream.
|
| + //
|
| + // If this is <0, the log stream is either still streaming or has been
|
| + // archived with no log entries.
|
| TerminalIndex int64 `gae:",noindex"`
|
|
|
| + // ArchiveLogEntryCount is the number of LogEntry records that were archived
|
| + // for this log stream.
|
| + //
|
| + // This is valid only if the log stream is Archived.
|
| + ArchiveLogEntryCount int64 `gae:",noindex"`
|
| + // ArchivalKey is the archival key for this log stream. This is used to
|
| + // differentiate the real archival request from those that were dispatched,
|
| + // but that ultimately failed to update state.
|
| + ArchivalKey []byte `gae:",noindex"`
|
| +
|
| // ArchiveIndexURL is the Google Storage URL where the log stream's index is
|
| // archived.
|
| ArchiveIndexURL string `gae:",noindex"`
|
| @@ -137,23 +189,19 @@ type LogStream struct {
|
| // ArchiveDataSize is the size, in bytes, of the archived data. It will be
|
| // zero if the file is not archived.
|
| ArchiveDataSize int64 `gae:",noindex"`
|
| - // ArchiveWhole is true if archival is complete and the archived log stream
|
| - // was not missing any entries.
|
| - ArchiveWhole bool
|
| -
|
| - // _ causes datastore to ignore unrecognized fields and strip them in future
|
| - // writes.
|
| - _ ds.PropertyMap `gae:"-,extra"`
|
| -
|
| - // hashID is the cached generated ID from the stream's Prefix/Name fields. If
|
| - // this is populated, ID metadata will be retrieved from this field instead of
|
| - // generated.
|
| - hashID string
|
| +
|
| + // extra causes datastore to ignore unrecognized fields and strip them in
|
| + // future writes.
|
| + extra ds.PropertyMap `gae:"-,extra"`
|
| +
|
| + // noDSValidate is a testing parameter to instruct the LogStream not to
|
| + // validate before reading/writing to datastore. It can be controlled by
|
| + // calling SetDSValidate().
|
| + noDSValidate bool
|
| }
|
|
|
| var _ interface {
|
| ds.PropertyLoadSaver
|
| - ds.MetaGetterSetter
|
| } = (*LogStream)(nil)
|
|
|
| // NewLogStream returns a LogStream instance with its ID field initialized based
|
| @@ -181,7 +229,7 @@ func NewLogStream(value string) (*LogStream, error) {
|
| // LogStreamFromID returns an empty LogStream instance with a known hash ID.
|
| func LogStreamFromID(hashID string) *LogStream {
|
| return &LogStream{
|
| - hashID: hashID,
|
| + HashID: hashID,
|
| }
|
| }
|
|
|
| @@ -192,10 +240,12 @@ func LogStreamFromID(hashID string) *LogStream {
|
| func LogStreamFromPath(path types.StreamPath) *LogStream {
|
| // Load the prefix/name fields into the log stream.
|
| prefix, name := path.Split()
|
| - return &LogStream{
|
| + ls := LogStream{
|
| Prefix: string(prefix),
|
| Name: string(name),
|
| }
|
| + ls.recalculateHashID()
|
| + return &ls
|
| }
|
|
|
| // Path returns the LogDog path for this log stream.
|
| @@ -221,11 +271,35 @@ func (s *LogStream) Load(pmap ds.PropertyMap) error {
|
| delete(pmap, k)
|
| }
|
|
|
| - return ds.GetPLS(s).Load(pmap)
|
| + if err := ds.GetPLS(s).Load(pmap); err != nil {
|
| + return err
|
| + }
|
| +
|
| + // Migrate schema (if needed), then validate.
|
| + if err := s.migrateSchema(); err != nil {
|
| + return err
|
| + }
|
| +
|
| + // Validate the log stream. Don't enforce HashID correctness, since
|
| + // datastore hasn't populated that field yet.
|
| + if !s.noDSValidate {
|
| + if err := s.validateImpl(false); err != nil {
|
| + return err
|
| + }
|
| + }
|
| + return nil
|
| }
|
|
|
| // Save implements ds.PropertyLoadSaver.
|
| func (s *LogStream) Save(withMeta bool) (ds.PropertyMap, error) {
|
| + if !s.noDSValidate {
|
| + if err := s.validateImpl(true); err != nil {
|
| + return nil, err
|
| + }
|
| + }
|
| + s.Schema = currentLogStreamSchema
|
| +
|
| + // Save default struct fields.
|
| pmap, err := ds.GetPLS(s).Save(withMeta)
|
| if err != nil {
|
| return nil, err
|
| @@ -247,55 +321,35 @@ func (s *LogStream) Save(withMeta bool) (ds.PropertyMap, error) {
|
| return pmap, nil
|
| }
|
|
|
| -// GetMeta implements ds.MetaGetterSetter.
|
| -func (s *LogStream) GetMeta(key string) (interface{}, bool) {
|
| - switch key {
|
| - case "id":
|
| - return s.HashID(), true
|
| -
|
| - default:
|
| - return ds.GetPLS(s).GetMeta(key)
|
| - }
|
| +// recalculateHashID calculates the log stream's hash ID from its Prefix/Name
|
| +// fields, which must be populated else this function will panic.
|
| +//
|
| +// The value is loaded into its HashID field.
|
| +func (s *LogStream) recalculateHashID() {
|
| + s.HashID = s.getHashID()
|
| }
|
|
|
| -// GetAllMeta implements ds.MetaGetterSetter.
|
| -func (s *LogStream) GetAllMeta() ds.PropertyMap {
|
| - pmap := ds.GetPLS(s).GetAllMeta()
|
| - pmap.SetMeta("id", ds.MkProperty(s.HashID()))
|
| - return pmap
|
| +// getHashID computes the log stream's hash ID from its Prefix/Name fields,
|
| +// which must be populated else this function will panic.
|
| +func (s *LogStream) getHashID() string {
|
| + hash := sha256.Sum256([]byte(s.Path()))
|
| + return hex.EncodeToString(hash[:])
|
| }
|
|
|
| -// SetMeta implements ds.MetaGetterSetter.
|
| -func (s *LogStream) SetMeta(key string, val interface{}) bool {
|
| - return ds.GetPLS(s).SetMeta(key, val)
|
| +// Validate evaluates the state and data contents of the LogStream and returns
|
| +// an error if it is invalid.
|
| +func (s *LogStream) Validate() error {
|
| + return s.validateImpl(true)
|
| }
|
|
|
| -// HashID generates and populates the hashID field of a LogStream. This
|
| -// is the hash of the log stream's full path.
|
| -func (s *LogStream) HashID() string {
|
| - if s.hashID == "" {
|
| - if s.Prefix == "" || s.Name == "" {
|
| - panic("cannot generate ID hash: Prefix and Name are not populated")
|
| +func (s *LogStream) validateImpl(enforceHashID bool) error {
|
| + if enforceHashID {
|
| + // Make sure our Prefix and Name match the Hash ID.
|
| + if hid := s.getHashID(); hid != s.HashID {
|
| + return fmt.Errorf("hash IDs don't match (%q != %q)", hid, s.HashID)
|
| }
|
| -
|
| - hash := sha256.Sum256([]byte(s.Path()))
|
| - s.hashID = hex.EncodeToString(hash[:])
|
| - }
|
| - return s.hashID
|
| -}
|
| -
|
| -// Put writes this LogStream to the Datastore. Before writing, it validates that
|
| -// LogStream is complete.
|
| -func (s *LogStream) Put(di ds.Interface) error {
|
| - if err := s.Validate(); err != nil {
|
| - return err
|
| }
|
| - return di.Put(s)
|
| -}
|
|
|
| -// Validate evaluates the state and data contents of the LogStream and returns
|
| -// an error if it is invalid.
|
| -func (s *LogStream) Validate() error {
|
| if err := types.StreamName(s.Prefix).Validate(); err != nil {
|
| return fmt.Errorf("invalid prefix: %s", err)
|
| }
|
| @@ -311,11 +365,12 @@ func (s *LogStream) Validate() error {
|
| if s.Created.IsZero() {
|
| return errors.New("created time is not set")
|
| }
|
| - if s.Updated.IsZero() {
|
| - return errors.New("updated time is not set")
|
| +
|
| + if s.Terminated() && s.TerminatedTime.IsZero() {
|
| + return errors.New("log stream is terminated, but missing terminated time")
|
| }
|
| - if s.Updated.Before(s.Created) {
|
| - return fmt.Errorf("updated time must be >= created time (%s < %s)", s.Updated, s.Created)
|
| + if s.Archived() && s.ArchivedTime.IsZero() {
|
| + return errors.New("log stream is archived, but missing archived time")
|
| }
|
|
|
| switch s.StreamType {
|
| @@ -350,19 +405,21 @@ func (s *LogStream) DescriptorValue() (*logpb.LogStreamDescriptor, error) {
|
|
|
| // Terminated returns true if this stream has been terminated.
|
| func (s *LogStream) Terminated() bool {
|
| - return s.State >= LSTerminated
|
| + if s.Archived() {
|
| + return true
|
| + }
|
| + return s.TerminalIndex >= 0
|
| }
|
|
|
| -// Archived returns true if this stream has been archived. A stream is archived
|
| -// if it has any of its archival properties set.
|
| +// Archived returns true if this stream has been archived.
|
| func (s *LogStream) Archived() bool {
|
| - return s.State >= LSArchived
|
| + return s.State.Archived()
|
| }
|
|
|
| -// ArchiveMatches tests if the supplied Stream, Index, and Data archival URLs
|
| -// match the current values.
|
| -func (s *LogStream) ArchiveMatches(sURL, iURL, dURL string) bool {
|
| - return (s.ArchiveStreamURL == sURL && s.ArchiveIndexURL == iURL && s.ArchiveDataURL == dURL)
|
| +// ArchiveComplete returns true if this stream has been archived and all of its
|
| +// log entries were present.
|
| +func (s *LogStream) ArchiveComplete() bool {
|
| + return (s.Archived() && s.ArchiveLogEntryCount == (s.TerminalIndex+1))
|
| }
|
|
|
| // LoadDescriptor loads the fields in the log stream descriptor into this
|
| @@ -375,6 +432,14 @@ func (s *LogStream) ArchiveMatches(sURL, iURL, dURL string) bool {
|
| // - Timestamp
|
| // - Tags
|
| func (s *LogStream) LoadDescriptor(desc *logpb.LogStreamDescriptor) error {
|
| + // If the descriptor's Prefix/Name don't match ours, refuse to load it.
|
| + if desc.Prefix != s.Prefix {
|
| + return fmt.Errorf("prefixes don't match (%q != %q)", desc.Prefix, s.Prefix)
|
| + }
|
| + if desc.Name != s.Name {
|
| + return fmt.Errorf("names don't match (%q != %q)", desc.Name, s.Name)
|
| + }
|
| +
|
| if err := desc.Validate(true); err != nil {
|
| return fmt.Errorf("invalid descriptor: %v", err)
|
| }
|
| @@ -410,6 +475,14 @@ func (s *LogStream) DescriptorProto() (*logpb.LogStreamDescriptor, error) {
|
| return &desc, nil
|
| }
|
|
|
| +// SetDSValidate controls whether this LogStream is validated prior to being
|
| +// read from or written to datastore.
|
| +//
|
| +// This is a testing parameter, and should NOT be used in production code.
|
| +func (s *LogStream) SetDSValidate(v bool) {
|
| + s.noDSValidate = !v
|
| +}
|
| +
|
| // normalizeHash takes a SHA256 hexadecimal string as input. It validates that
|
| // it is a valid SHA256 hash and, if so, returns a normalized version that can
|
| // be used as a log stream key.
|
|
|