Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(7380)

Unified Diff: client/internal/logdog/butler/streamserver/handshake.go

Issue 1429993002: LogDog: Add Butler stream server package. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-butlerproto
Patch Set: Cleanup, comments. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: client/internal/logdog/butler/streamserver/handshake.go
diff --git a/client/internal/logdog/butler/streamserver/handshake.go b/client/internal/logdog/butler/streamserver/handshake.go
new file mode 100644
index 0000000000000000000000000000000000000000..9c0a073273196698a65b44db34499582c3c0c804
--- /dev/null
+++ b/client/internal/logdog/butler/streamserver/handshake.go
@@ -0,0 +1,136 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package streamserver
+
+import (
+ "bytes"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+
+ "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
+ "github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/iotools"
+ "github.com/luci/luci-go/common/logdog/protocol"
+ log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/proto/google"
+ "github.com/luci/luci-go/common/recordio"
+ "golang.org/x/net/context"
+)
+
+type handshakeProtocol interface {
+ // Handshake executes the protocol handshake, advancing the underlying
+ // stream and returning the derived configuration.
+ //
+ // If the configured stream is invalid, an error will be returned.
+ Handshake(context.Context, io.Reader) (*streamproto.Properties, error)
+}
+
+// handshakeProtocolImpl is a Butler handshake protocol that uses a JSON blob to
+// describe the stream.
+type handshakeProtocolImpl struct {
+ forceVerbose bool // (Testing) force verbose code path.
+}
+
+const (
+ // The maximum size of the header (1MB).
+ maxHeaderSize = 1 * 1024 * 1024
+)
+
+// Reads header/magic bytes from the stream. Returns a protocol instance based
+// on the content of the magic number. If the magic number is invalid, an error
+// will be returned.
+func getProtocolFromMagic(ctx context.Context, r io.Reader) (handshakeProtocol, error) {
iannucci 2015/11/12 22:44:24 why is this whole thing split into two functions?
dnj 2015/11/13 20:17:14 Essentially yes. Okay, SGTM Done.
+ // Read the frame header magic number (version)
+ magic := make([]byte, len(streamproto.ProtocolFrameHeaderMagic))
+ if n, err := io.ReadFull(r, magic); (n != len(magic)) || (err != nil) {
+ return nil, fmt.Errorf("Failed to read frame header magic number: %s", err)
+ }
+
+ // Check the magic number/version
+ if bytes.Equal(magic, streamproto.ProtocolFrameHeaderMagic) {
+ return &handshakeProtocolImpl{}, nil
+ }
+
+ log.Errorf(log.SetField(ctx, "magic", fmt.Sprintf("%#X", magic)),
+ "Unrecognized frame header magic number.")
+ return nil, errors.New("streamserver: Unknown protocol magic in frame header")
+}
+
+func (p *handshakeProtocolImpl) defaultFlags() *streamproto.Flags {
+ return &streamproto.Flags{
+ Type: streamproto.StreamType(protocol.LogStreamDescriptor_TEXT),
+ Tee: streamproto.TeeNone,
+ }
+}
+
+func (p *handshakeProtocolImpl) Handshake(ctx context.Context, r io.Reader) (*streamproto.Properties, error) {
+ // Load the JSON into our descriptor field.
+ flags, err := p.loadFlags(ctx, r)
+ if err != nil {
+ return nil, err
+ }
+
+ props := flags.Properties()
+ if props.Timestamp == nil {
+ props.Timestamp = google.NewTimestamp(clock.Now(ctx))
+ }
+ if err := props.Validate(); err != nil {
+ return nil, err
+ }
+
+ return props, nil
+}
+
+func (p *handshakeProtocolImpl) loadFlags(ctx context.Context, r io.Reader) (*streamproto.Flags, error) {
+ fr := recordio.NewReader(r, maxHeaderSize)
+
+ // Read the header frame.
+ count, hr, err := fr.ReadFrame()
iannucci 2015/11/12 22:44:24 s/count/frameSize ?
dnj 2015/11/13 20:17:14 Done.
+ if err != nil {
+ log.Errorf(log.SetError(ctx, err), "Failed to read header frame.")
+ return nil, err
+ }
+
+ // When tracing, buffer the JSON data locally so we can emit it via log.
+ headerBuf := bytes.Buffer{}
+ captureHeader := log.IsLogging(ctx, log.Debug) || p.forceVerbose
+ if captureHeader {
+ hr = io.TeeReader(hr, &headerBuf)
+ }
+
+ // When we hand the header reader to the "json" library, we want to count how
+ // many bytes it reads from it. We will assert that it has read the full set
+ // of bytes.
+ chr := &iotools.CountingReader{Reader: hr}
+
+ // Decode into our protocol description structure. Note that extra fields
+ // are ignored (no error) and missing fields retain their zero value.
+ f := p.defaultFlags()
+ err = json.NewDecoder(chr).Decode(f)
+ if captureHeader {
+ log.Fields{
+ "size": count,
+ "decodeSize": headerBuf.Len(),
+ }.Debugf(ctx, "Read JSON header:\n%s", headerBuf.String())
+ }
+ if err != nil {
+ log.Errorf(log.SetError(ctx, err),
+ "Failed to decode stream description data JSON.")
+ return nil, err
+ }
+
+ // Make sure that this consumed the full JSON size that was specified.
+ //
+ // We use a countReader because the 'json' library doesn't give us a way to
+ // know how many bytes it consumed when it decoded.
+ if chr.Count() != count {
+ return nil, fmt.Errorf("Stream description block was not fully consumed (%d != %d)",
+ chr.Count(), count)
+ }
+
+ return f, nil
+}

Powered by Google App Engine
This is Rietveld 408576698