Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(253)

Unified Diff: appengine/logdog/coordinator/logStream.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Fix proto comment. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/logStream.go
diff --git a/appengine/logdog/coordinator/logStream.go b/appengine/logdog/coordinator/logStream.go
index 4285bd3e4f25dd4708d0e45025d41964cb9ca775..6a0d63630b768c22671dae53867215ec231be884 100644
--- a/appengine/logdog/coordinator/logStream.go
+++ b/appengine/logdog/coordinator/logStream.go
@@ -18,20 +18,43 @@ import (
"github.com/luci/luci-go/common/proto/logdog/logpb"
)
+// currentLogStreamSchema is the current schema version of the LogStream.
+// Changes that are not backward-compatible should update this field so
+// migration logic and scripts can translate appropriately.
+const currentLogStreamSchema = "1"
+
// LogStreamState is the archival state of the log stream.
type LogStreamState int
const (
- // LSPending indicates that no archival has occurred yet.
- LSPending LogStreamState = iota
- // LSTerminated indicates that the log stream has received a terminal index
- // and is awaiting archival.
- LSTerminated
- // LSArchived indicates that the log stream has been successfully archived but
- // has not yet been cleaned up.
+ // LSStreaming indicates that the log stream is still streaming. This implies
+ // that no terminal index has been identified yet.
+ LSStreaming LogStreamState = iota
+ // LSArchiveTasked indicates that the log stream has had an archival task
+ // generated for it and is awaiting archival.
+ LSArchiveTasked
+ // LSArchived indicates that the log stream has been successfully archived.
LSArchived
)
+func (s LogStreamState) String() string {
+ switch s {
+ case LSStreaming:
+ return "STREAMING"
+ case LSArchiveTasked:
+ return "ARCHIVE_TASKED"
+ case LSArchived:
+ return "ARCHIVED"
+ default:
+ return fmt.Sprintf("UNKNOWN(%d)", s)
+ }
+}
+
+// Archived returns true if this LogStreamState represents a finished archival.
+func (s LogStreamState) Archived() bool {
+ return s >= LSArchived
+}
+
// LogStream is the primary datastore model containing information and state of
// an individual log stream.
//
@@ -63,13 +86,27 @@ const (
//
// Most of the values in QueryBase are static. Those that change can only be
// changed through service endpoint methods.
-//
-// LogStream's QueryBase is authortative.
type LogStream struct {
+ // HashID is the LogStream ID. It is generated from the stream's Prefix/Name
+ // fields.
+ HashID string `gae:"$id"`
+
+ // Schema is the datastore schema version for this object. This can be used
+ // to facilitate schema migrations.
+ //
+ // The current schema is currentLogStreamSchema.
+ Schema string
+
// Prefix is this log stream's prefix value. Log streams with the same prefix
// are logically grouped.
+ //
+ // This value should not be changed once populated, as it will invalidate the
+ // HashID.
Prefix string
// Name is the unique name of this log stream within the Prefix scope.
+ //
+ // This value should not be changed once populated, as it will invalidate the
+ // HashID.
Name string
// State is the log stream's current state.
@@ -87,9 +124,12 @@ type LogStream struct {
// Created is the time when this stream was created.
Created time.Time
- // Updated is the Coordinator's record of when this log stream was last
- // updated.
- Updated time.Time
+ // TerminatedTime is the Coordinator's record of when this log stream was
+ // terminated.
+ TerminatedTime time.Time `gae:",noindex"`
+ // ArchivedTime is the Coordinator's record of when this log stream was
+ // archived.
+ ArchivedTime time.Time `gae:",noindex"`
// ProtoVersion is the version string of the protobuf, as reported by the
// Collector (and ultimately self-identified by the Butler).
@@ -113,10 +153,22 @@ type LogStream struct {
// Source is the set of source strings sent by the Butler.
Source []string
- // TerminalIndex is the log stream index of the last log entry in the stream.
- // If the value is -1, the log is still streaming.
+ // TerminalIndex is the index of the last log entry in the stream.
+ //
+ // If this is <0, the log stream is either still streaming or has been
+ // archived with no log entries.
TerminalIndex int64 `gae:",noindex"`
+ // ArchiveLogEntryCount is the number of LogEntry records that were archived
+ // for this log stream.
+ //
+ // This is valid only if the log stream is Archived.
+ ArchiveLogEntryCount int64 `gae:",noindex"`
+ // ArchivalKey is the archival key for this log stream. This is used to
+ // differentiate the real archival request from those that were dispatched,
+ // but that ultimately failed to update state.
+ ArchivalKey []byte `gae:",noindex"`
+
// ArchiveIndexURL is the Google Storage URL where the log stream's index is
// archived.
ArchiveIndexURL string `gae:",noindex"`
@@ -137,23 +189,19 @@ type LogStream struct {
// ArchiveDataSize is the size, in bytes, of the archived data. It will be
// zero if the file is not archived.
ArchiveDataSize int64 `gae:",noindex"`
- // ArchiveWhole is true if archival is complete and the archived log stream
- // was not missing any entries.
- ArchiveWhole bool
-
- // _ causes datastore to ignore unrecognized fields and strip them in future
- // writes.
- _ ds.PropertyMap `gae:"-,extra"`
-
- // hashID is the cached generated ID from the stream's Prefix/Name fields. If
- // this is populated, ID metadata will be retrieved from this field instead of
- // generated.
- hashID string
+
+ // extra causes datastore to ignore unrecognized fields and strip them in
+ // future writes.
+ extra ds.PropertyMap `gae:"-,extra"`
+
+ // noDSValidate is a testing parameter to instruct the LogStream not to
+ // validate before reading/writing to datastore. It can be controlled by
+ // calling SetDSValidate().
+ noDSValidate bool
}
var _ interface {
ds.PropertyLoadSaver
- ds.MetaGetterSetter
} = (*LogStream)(nil)
// NewLogStream returns a LogStream instance with its ID field initialized based
@@ -181,7 +229,7 @@ func NewLogStream(value string) (*LogStream, error) {
// LogStreamFromID returns an empty LogStream instance with a known hash ID.
func LogStreamFromID(hashID string) *LogStream {
return &LogStream{
- hashID: hashID,
+ HashID: hashID,
}
}
@@ -192,10 +240,12 @@ func LogStreamFromID(hashID string) *LogStream {
func LogStreamFromPath(path types.StreamPath) *LogStream {
// Load the prefix/name fields into the log stream.
prefix, name := path.Split()
- return &LogStream{
+ ls := LogStream{
Prefix: string(prefix),
Name: string(name),
}
+ ls.recalculateHashID()
+ return &ls
}
// Path returns the LogDog path for this log stream.
@@ -221,11 +271,35 @@ func (s *LogStream) Load(pmap ds.PropertyMap) error {
delete(pmap, k)
}
- return ds.GetPLS(s).Load(pmap)
+ if err := ds.GetPLS(s).Load(pmap); err != nil {
+ return err
+ }
+
+ // Migrate schema (if needed), then validate.
+ if err := s.migrateSchema(); err != nil {
+ return err
+ }
+
+ // Validate the log stream. Don't enforce HashID correctness, since
+ // datastore hasn't populated that field yet.
+ if !s.noDSValidate {
+ if err := s.validateImpl(false); err != nil {
+ return err
+ }
+ }
+ return nil
}
// Save implements ds.PropertyLoadSaver.
func (s *LogStream) Save(withMeta bool) (ds.PropertyMap, error) {
+ if !s.noDSValidate {
+ if err := s.validateImpl(true); err != nil {
+ return nil, err
+ }
+ }
+ s.Schema = currentLogStreamSchema
+
+ // Save default struct fields.
pmap, err := ds.GetPLS(s).Save(withMeta)
if err != nil {
return nil, err
@@ -247,55 +321,35 @@ func (s *LogStream) Save(withMeta bool) (ds.PropertyMap, error) {
return pmap, nil
}
-// GetMeta implements ds.MetaGetterSetter.
-func (s *LogStream) GetMeta(key string) (interface{}, bool) {
- switch key {
- case "id":
- return s.HashID(), true
-
- default:
- return ds.GetPLS(s).GetMeta(key)
- }
+// recalculateHashID calculates the log stream's hash ID from its Prefix/Name
+// fields, which must be populated, else this function will panic.
+//
+// The value is loaded into its HashID field.
+func (s *LogStream) recalculateHashID() {
+ s.HashID = s.getHashID()
}
-// GetAllMeta implements ds.MetaGetterSetter.
-func (s *LogStream) GetAllMeta() ds.PropertyMap {
- pmap := ds.GetPLS(s).GetAllMeta()
- pmap.SetMeta("id", ds.MkProperty(s.HashID()))
- return pmap
+// getHashID calculates the log stream's hash ID from its Prefix/Name
+// fields, which must be populated, else this function will panic.
+func (s *LogStream) getHashID() string {
+ hash := sha256.Sum256([]byte(s.Path()))
+ return hex.EncodeToString(hash[:])
}
-// SetMeta implements ds.MetaGetterSetter.
-func (s *LogStream) SetMeta(key string, val interface{}) bool {
- return ds.GetPLS(s).SetMeta(key, val)
+// Validate evaluates the state and data contents of the LogStream and returns
+// an error if it is invalid.
+func (s *LogStream) Validate() error {
+ return s.validateImpl(true)
}
-// HashID generates and populates the hashID field of a LogStream. This
-// is the hash of the log stream's full path.
-func (s *LogStream) HashID() string {
- if s.hashID == "" {
- if s.Prefix == "" || s.Name == "" {
- panic("cannot generate ID hash: Prefix and Name are not populated")
+func (s *LogStream) validateImpl(enforceHashID bool) error {
+ if enforceHashID {
+ // Make sure our Prefix and Name match the Hash ID.
+ if hid := s.getHashID(); hid != s.HashID {
+ return fmt.Errorf("hash IDs don't match (%q != %q)", hid, s.HashID)
}
-
- hash := sha256.Sum256([]byte(s.Path()))
- s.hashID = hex.EncodeToString(hash[:])
- }
- return s.hashID
-}
-
-// Put writes this LogStream to the Datastore. Before writing, it validates that
-// LogStream is complete.
-func (s *LogStream) Put(di ds.Interface) error {
- if err := s.Validate(); err != nil {
- return err
}
- return di.Put(s)
-}
-// Validate evaluates the state and data contents of the LogStream and returns
-// an error if it is invalid.
-func (s *LogStream) Validate() error {
if err := types.StreamName(s.Prefix).Validate(); err != nil {
return fmt.Errorf("invalid prefix: %s", err)
}
@@ -311,11 +365,12 @@ func (s *LogStream) Validate() error {
if s.Created.IsZero() {
return errors.New("created time is not set")
}
- if s.Updated.IsZero() {
- return errors.New("updated time is not set")
+
+ if s.Terminated() && s.TerminatedTime.IsZero() {
+ return errors.New("log stream is terminated, but missing terminated time")
}
- if s.Updated.Before(s.Created) {
- return fmt.Errorf("updated time must be >= created time (%s < %s)", s.Updated, s.Created)
+ if s.Archived() && s.ArchivedTime.IsZero() {
+ return errors.New("log stream is archived, but missing archived time")
}
switch s.StreamType {
@@ -350,19 +405,21 @@ func (s *LogStream) DescriptorValue() (*logpb.LogStreamDescriptor, error) {
// Terminated returns true if this stream has been terminated.
func (s *LogStream) Terminated() bool {
- return s.State >= LSTerminated
+ if s.Archived() {
+ return true
+ }
+ return s.TerminalIndex >= 0
}
-// Archived returns true if this stream has been archived. A stream is archived
-// if it has any of its archival properties set.
+// Archived returns true if this stream has been archived.
func (s *LogStream) Archived() bool {
- return s.State >= LSArchived
+ return s.State.Archived()
}
-// ArchiveMatches tests if the supplied Stream, Index, and Data archival URLs
-// match the current values.
-func (s *LogStream) ArchiveMatches(sURL, iURL, dURL string) bool {
- return (s.ArchiveStreamURL == sURL && s.ArchiveIndexURL == iURL && s.ArchiveDataURL == dURL)
+// ArchiveComplete returns true if this stream has been archived and all of its
+// log entries were present.
+func (s *LogStream) ArchiveComplete() bool {
+ return (s.Archived() && s.ArchiveLogEntryCount == (s.TerminalIndex+1))
}
// LoadDescriptor loads the fields in the log stream descriptor into this
@@ -375,6 +432,14 @@ func (s *LogStream) ArchiveMatches(sURL, iURL, dURL string) bool {
// - Timestamp
// - Tags
func (s *LogStream) LoadDescriptor(desc *logpb.LogStreamDescriptor) error {
+ // If the descriptor's Prefix/Name don't match ours, refuse to load it.
+ if desc.Prefix != s.Prefix {
+ return fmt.Errorf("prefixes don't match (%q != %q)", desc.Prefix, s.Prefix)
+ }
+ if desc.Name != s.Name {
+ return fmt.Errorf("names don't match (%q != %q)", desc.Name, s.Name)
+ }
+
if err := desc.Validate(true); err != nil {
return fmt.Errorf("invalid descriptor: %v", err)
}
@@ -410,6 +475,14 @@ func (s *LogStream) DescriptorProto() (*logpb.LogStreamDescriptor, error) {
return &desc, nil
}
+// SetDSValidate controls whether this LogStream is validated prior to being
+// read from or written to datastore.
+//
+// This is a testing parameter, and should NOT be used in production code.
+func (s *LogStream) SetDSValidate(v bool) {
+ s.noDSValidate = !v
+}
+
// normalizeHash takes a SHA256 hexadecimal string as input. It validates that
// it is a valid SHA256 hash and, if so, returns a normalized version that can
// be used as a log stream key.

Powered by Google App Engine
This is Rietveld 408576698