Chromium Code Reviews| Index: client/internal/logdog/butler/streamserver/handshake.go |
| diff --git a/client/internal/logdog/butler/streamserver/handshake.go b/client/internal/logdog/butler/streamserver/handshake.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..9c0a073273196698a65b44db34499582c3c0c804 |
| --- /dev/null |
| +++ b/client/internal/logdog/butler/streamserver/handshake.go |
| @@ -0,0 +1,136 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package streamserver |
| + |
| +import ( |
| + "bytes" |
| + "encoding/json" |
| + "errors" |
| + "fmt" |
| + "io" |
| + |
| + "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" |
| + "github.com/luci/luci-go/common/clock" |
| + "github.com/luci/luci-go/common/iotools" |
| + "github.com/luci/luci-go/common/logdog/protocol" |
| + log "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/proto/google" |
| + "github.com/luci/luci-go/common/recordio" |
| + "golang.org/x/net/context" |
| +) |
| + |
| +type handshakeProtocol interface { |
| + // Handshake executes the protocol handshake, advancing the underlying |
| + // stream and returning the derived configuration. |
| + // |
| + // If the configured stream is invalid, an error will be returned. |
| + Handshake(context.Context, io.Reader) (*streamproto.Properties, error) |
| +} |
| + |
| +// handshakeProtocolImpl is a Butler handshake protocol that uses a JSON blob to |
| +// describe the stream. |
| +type handshakeProtocolImpl struct { |
| + forceVerbose bool // (Testing) force verbose code path. |
| +} |
| + |
| +const ( |
| + // The maximum size of the header (1MB). |
| + maxHeaderSize = 1 * 1024 * 1024 |
| +) |
| + |
| +// Reads header/magic bytes from the stream. Returns a protocol instance based |
| +// on the content of the magic number. If the magic number is invalid, an error |
| +// will be returned. |
| +func getProtocolFromMagic(ctx context.Context, r io.Reader) (handshakeProtocol, error) { |
|
iannucci
2015/11/12 22:44:24
why is this whole thing split into two functions?
dnj
2015/11/13 20:17:14
Essentially yes. Okay, SGTM Done.
|
| + // Read the frame header magic number (version) |
| + magic := make([]byte, len(streamproto.ProtocolFrameHeaderMagic)) |
| + if n, err := io.ReadFull(r, magic); (n != len(magic)) || (err != nil) { |
| + return nil, fmt.Errorf("Failed to read frame header magic number: %s", err) |
| + } |
| + |
| + // Check the magic number/version |
| + if bytes.Equal(magic, streamproto.ProtocolFrameHeaderMagic) { |
| + return &handshakeProtocolImpl{}, nil |
| + } |
| + |
| + log.Errorf(log.SetField(ctx, "magic", fmt.Sprintf("%#X", magic)), |
| + "Unrecognized frame header magic number.") |
| + return nil, errors.New("streamserver: Unknown protocol magic in frame header") |
| +} |
| + |
| +func (p *handshakeProtocolImpl) defaultFlags() *streamproto.Flags { |
| + return &streamproto.Flags{ |
| + Type: streamproto.StreamType(protocol.LogStreamDescriptor_TEXT), |
| + Tee: streamproto.TeeNone, |
| + } |
| +} |
| + |
| +func (p *handshakeProtocolImpl) Handshake(ctx context.Context, r io.Reader) (*streamproto.Properties, error) { |
| + // Load the JSON into our descriptor field. |
| + flags, err := p.loadFlags(ctx, r) |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + props := flags.Properties() |
| + if props.Timestamp == nil { |
| + props.Timestamp = google.NewTimestamp(clock.Now(ctx)) |
| + } |
| + if err := props.Validate(); err != nil { |
| + return nil, err |
| + } |
| + |
| + return props, nil |
| +} |
| + |
| +func (p *handshakeProtocolImpl) loadFlags(ctx context.Context, r io.Reader) (*streamproto.Flags, error) { |
| + fr := recordio.NewReader(r, maxHeaderSize) |
| + |
| + // Read the header frame. |
| + count, hr, err := fr.ReadFrame() |
|
iannucci
2015/11/12 22:44:24
s/count/frameSize ?
dnj
2015/11/13 20:17:14
Done.
|
| + if err != nil { |
| + log.Errorf(log.SetError(ctx, err), "Failed to read header frame.") |
| + return nil, err |
| + } |
| + |
| + // When tracing, buffer the JSON data locally so we can emit it via log. |
| + headerBuf := bytes.Buffer{} |
| + captureHeader := log.IsLogging(ctx, log.Debug) || p.forceVerbose |
| + if captureHeader { |
| + hr = io.TeeReader(hr, &headerBuf) |
| + } |
| + |
| + // When we hand the header reader to the "json" library, we want to count how |
| + // many bytes it reads from it. We will assert that it has read the full set |
| + // of bytes. |
| + chr := &iotools.CountingReader{Reader: hr} |
| + |
| + // Decode into our protocol description structure. Note that extra fields |
| + // are ignored (no error) and missing fields retain their zero value. |
| + f := p.defaultFlags() |
| + err = json.NewDecoder(chr).Decode(f) |
| + if captureHeader { |
| + log.Fields{ |
| + "size": count, |
| + "decodeSize": headerBuf.Len(), |
| + }.Debugf(ctx, "Read JSON header:\n%s", headerBuf.String()) |
| + } |
| + if err != nil { |
| + log.Errorf(log.SetError(ctx, err), |
| + "Failed to decode stream description data JSON.") |
| + return nil, err |
| + } |
| + |
| + // Make sure that this consumed the full JSON size that was specified. |
| + // |
| + // We use a countReader because the 'json' library doesn't give us a way to |
| + // know how many bytes it consumed when it decoded. |
| + if chr.Count() != count { |
| + return nil, fmt.Errorf("Stream description block was not fully consumed (%d != %d)", |
| + chr.Count(), count) |
| + } |
| + |
| + return f, nil |
| +} |