Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package streamserver | |
| 6 | |
| 7 import ( | |
| 8 "bytes" | |
| 9 "encoding/json" | |
| 10 "errors" | |
| 11 "fmt" | |
| 12 "io" | |
| 13 | |
| 14 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" | |
| 15 "github.com/luci/luci-go/common/clock" | |
| 16 "github.com/luci/luci-go/common/iotools" | |
| 17 "github.com/luci/luci-go/common/logdog/protocol" | |
| 18 log "github.com/luci/luci-go/common/logging" | |
| 19 "github.com/luci/luci-go/common/proto/google" | |
| 20 "github.com/luci/luci-go/common/recordio" | |
| 21 "golang.org/x/net/context" | |
| 22 ) | |
| 23 | |
| 24 type handshakeProtocol interface { | |
| 25 // Handshake executes the protocol handshake, advancing the underlying | |
| 26 // stream and returning the derived configuration. | |
| 27 // | |
| 28 // If the configured stream is invalid, an error will be returned. | |
| 29 Handshake(context.Context, io.Reader) (*streamproto.Properties, error) | |
| 30 } | |
| 31 | |
| 32 // handshakeProtocolImpl is a Butler handshake protocol that uses a JSON blob to | |
| 33 // describe the stream. | |
| 34 type handshakeProtocolImpl struct { | |
| 35 forceVerbose bool // (Testing) force verbose code path. | |
| 36 } | |
| 37 | |
| 38 const ( | |
| 39 // The maximum size of the header (1MB). | |
| 40 maxHeaderSize = 1 * 1024 * 1024 | |
| 41 ) | |
| 42 | |
| 43 // Reads header/magic bytes from the stream. Returns a protocol instance based | |
| 44 // on the content of the magic number. If the magic number is invalid, an error | |
| 45 // will be returned. | |
| 46 func getProtocolFromMagic(ctx context.Context, r io.Reader) (handshakeProtocol, error) { | |
|
iannucci
2015/11/12 22:44:24
why is this whole thing split into two functions?
dnj
2015/11/13 20:17:14
Essentially yes. Okay, SGTM Done.
| |
| 47 // Read the frame header magic number (version) | |
| 48 magic := make([]byte, len(streamproto.ProtocolFrameHeaderMagic)) | |
| 49 if n, err := io.ReadFull(r, magic); (n != len(magic)) || (err != nil) { | |
| 50 return nil, fmt.Errorf("Failed to read frame header magic number : %s", err) | |
| 51 } | |
| 52 | |
| 53 // Check the magic number/version | |
| 54 if bytes.Equal(magic, streamproto.ProtocolFrameHeaderMagic) { | |
| 55 return &handshakeProtocolImpl{}, nil | |
| 56 } | |
| 57 | |
| 58 log.Errorf(log.SetField(ctx, "magic", fmt.Sprintf("%#X", magic)), | |
| 59 "Unrecognized frame header magic number.") | |
| 60 return nil, errors.New("streamserver: Unknown protocol magic in frame he ader") | |
| 61 } | |
| 62 | |
| 63 func (p *handshakeProtocolImpl) defaultFlags() *streamproto.Flags { | |
| 64 return &streamproto.Flags{ | |
| 65 Type: streamproto.StreamType(protocol.LogStreamDescriptor_TEXT), | |
| 66 Tee: streamproto.TeeNone, | |
| 67 } | |
| 68 } | |
| 69 | |
| 70 func (p *handshakeProtocolImpl) Handshake(ctx context.Context, r io.Reader) (*st reamproto.Properties, error) { | |
| 71 // Load the JSON into our descriptor field. | |
| 72 flags, err := p.loadFlags(ctx, r) | |
| 73 if err != nil { | |
| 74 return nil, err | |
| 75 } | |
| 76 | |
| 77 props := flags.Properties() | |
| 78 if props.Timestamp == nil { | |
| 79 props.Timestamp = google.NewTimestamp(clock.Now(ctx)) | |
| 80 } | |
| 81 if err := props.Validate(); err != nil { | |
| 82 return nil, err | |
| 83 } | |
| 84 | |
| 85 return props, nil | |
| 86 } | |
| 87 | |
| 88 func (p *handshakeProtocolImpl) loadFlags(ctx context.Context, r io.Reader) (*st reamproto.Flags, error) { | |
| 89 fr := recordio.NewReader(r, maxHeaderSize) | |
| 90 | |
| 91 // Read the header frame. | |
| 92 count, hr, err := fr.ReadFrame() | |
|
iannucci
2015/11/12 22:44:24
s/count/frameSize ?
dnj
2015/11/13 20:17:14
Done.
| |
| 93 if err != nil { | |
| 94 log.Errorf(log.SetError(ctx, err), "Failed to read header frame. ") | |
| 95 return nil, err | |
| 96 } | |
| 97 | |
| 98 // When tracing, buffer the JSON data locally so we can emit it via log. | |
| 99 headerBuf := bytes.Buffer{} | |
| 100 captureHeader := log.IsLogging(ctx, log.Debug) || p.forceVerbose | |
| 101 if captureHeader { | |
| 102 hr = io.TeeReader(hr, &headerBuf) | |
| 103 } | |
| 104 | |
| 105 // When we hand the header reader to the "json" library, we want to coun t how | |
| 106 // many bytes it reads from it. We will assert that it has read the full set | |
| 107 // of bytes. | |
| 108 chr := &iotools.CountingReader{Reader: hr} | |
| 109 | |
| 110 // Decode into our protocol description structure. Note that extra field s | |
| 111 // are ignored (no error) and missing fields retain their zero value. | |
| 112 f := p.defaultFlags() | |
| 113 err = json.NewDecoder(chr).Decode(f) | |
| 114 if captureHeader { | |
| 115 log.Fields{ | |
| 116 "size": count, | |
| 117 "decodeSize": headerBuf.Len(), | |
| 118 }.Debugf(ctx, "Read JSON header:\n%s", headerBuf.String()) | |
| 119 } | |
| 120 if err != nil { | |
| 121 log.Errorf(log.SetError(ctx, err), | |
| 122 "Failed to decode stream description data JSON.") | |
| 123 return nil, err | |
| 124 } | |
| 125 | |
| 126 // Make sure that this consumed the full JSON size that was specified. | |
| 127 // | |
| 128 // We use a countReader because the 'json' library doesn't give us a way to | |
| 129 // know how many bytes it consumed when it decoded. | |
| 130 if chr.Count() != count { | |
| 131 return nil, fmt.Errorf("Stream description block was not fully c onsumed (%d != %d)", | |
| 132 chr.Count(), count) | |
| 133 } | |
| 134 | |
| 135 return f, nil | |
| 136 } | |
| OLD | NEW |