Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(74)

Side by Side Diff: client/internal/logdog/butler/streamserver/handshake.go

Issue 1429993002: LogDog: Add Butler stream server package. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-butlerproto
Patch Set: Cleanup, comments. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package streamserver
6
7 import (
8 "bytes"
9 "encoding/json"
10 "errors"
11 "fmt"
12 "io"
13
14 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
15 "github.com/luci/luci-go/common/clock"
16 "github.com/luci/luci-go/common/iotools"
17 "github.com/luci/luci-go/common/logdog/protocol"
18 log "github.com/luci/luci-go/common/logging"
19 "github.com/luci/luci-go/common/proto/google"
20 "github.com/luci/luci-go/common/recordio"
21 "golang.org/x/net/context"
22 )
23
24 type handshakeProtocol interface {
25 // Handshake executes the protocol handshake, advancing the underlying
26 // stream and returning the derived configuration.
27 //
28 // If the configured stream is invalid, an error will be returned.
29 Handshake(context.Context, io.Reader) (*streamproto.Properties, error)
30 }
31
32 // handshakeProtocolImpl is a Butler handshake protocol that uses a JSON blob to
33 // describe the stream.
34 type handshakeProtocolImpl struct {
35 forceVerbose bool // (Testing) force verbose code path.
36 }
37
38 const (
39 // The maximum size of the header (1MB).
40 maxHeaderSize = 1 * 1024 * 1024
41 )
42
43 // Reads header/magic bytes from the stream. Returns a protocol instance based
44 // on the content of the magic number. If the magic number is invalid, an error
45 // will be returned.
46 func getProtocolFromMagic(ctx context.Context, r io.Reader) (handshakeProtocol, error) {
iannucci 2015/11/12 22:44:24 why is this whole thing split into two functions?
dnj 2015/11/13 20:17:14 Essentially yes. Okay, SGTM Done.
47 // Read the frame header magic number (version)
48 magic := make([]byte, len(streamproto.ProtocolFrameHeaderMagic))
49 if n, err := io.ReadFull(r, magic); (n != len(magic)) || (err != nil) {
50 return nil, fmt.Errorf("Failed to read frame header magic number : %s", err)
51 }
52
53 // Check the magic number/version
54 if bytes.Equal(magic, streamproto.ProtocolFrameHeaderMagic) {
55 return &handshakeProtocolImpl{}, nil
56 }
57
58 log.Errorf(log.SetField(ctx, "magic", fmt.Sprintf("%#X", magic)),
59 "Unrecognized frame header magic number.")
60 return nil, errors.New("streamserver: Unknown protocol magic in frame he ader")
61 }
62
63 func (p *handshakeProtocolImpl) defaultFlags() *streamproto.Flags {
64 return &streamproto.Flags{
65 Type: streamproto.StreamType(protocol.LogStreamDescriptor_TEXT),
66 Tee: streamproto.TeeNone,
67 }
68 }
69
70 func (p *handshakeProtocolImpl) Handshake(ctx context.Context, r io.Reader) (*st reamproto.Properties, error) {
71 // Load the JSON into our descriptor field.
72 flags, err := p.loadFlags(ctx, r)
73 if err != nil {
74 return nil, err
75 }
76
77 props := flags.Properties()
78 if props.Timestamp == nil {
79 props.Timestamp = google.NewTimestamp(clock.Now(ctx))
80 }
81 if err := props.Validate(); err != nil {
82 return nil, err
83 }
84
85 return props, nil
86 }
87
88 func (p *handshakeProtocolImpl) loadFlags(ctx context.Context, r io.Reader) (*st reamproto.Flags, error) {
89 fr := recordio.NewReader(r, maxHeaderSize)
90
91 // Read the header frame.
92 count, hr, err := fr.ReadFrame()
iannucci 2015/11/12 22:44:24 s/count/frameSize ?
dnj 2015/11/13 20:17:14 Done.
93 if err != nil {
94 log.Errorf(log.SetError(ctx, err), "Failed to read header frame. ")
95 return nil, err
96 }
97
98 // When tracing, buffer the JSON data locally so we can emit it via log.
99 headerBuf := bytes.Buffer{}
100 captureHeader := log.IsLogging(ctx, log.Debug) || p.forceVerbose
101 if captureHeader {
102 hr = io.TeeReader(hr, &headerBuf)
103 }
104
105 // When we hand the header reader to the "json" library, we want to coun t how
106 // many bytes it reads from it. We will assert that it has read the full set
107 // of bytes.
108 chr := &iotools.CountingReader{Reader: hr}
109
110 // Decode into our protocol description structure. Note that extra field s
111 // are ignored (no error) and missing fields retain their zero value.
112 f := p.defaultFlags()
113 err = json.NewDecoder(chr).Decode(f)
114 if captureHeader {
115 log.Fields{
116 "size": count,
117 "decodeSize": headerBuf.Len(),
118 }.Debugf(ctx, "Read JSON header:\n%s", headerBuf.String())
119 }
120 if err != nil {
121 log.Errorf(log.SetError(ctx, err),
122 "Failed to decode stream description data JSON.")
123 return nil, err
124 }
125
126 // Make sure that this consumed the full JSON size that was specified.
127 //
128 // We use a countReader because the 'json' library doesn't give us a way to
129 // know how many bytes it consumed when it decoded.
130 if chr.Count() != count {
131 return nil, fmt.Errorf("Stream description block was not fully c onsumed (%d != %d)",
132 chr.Count(), count)
133 }
134
135 return f, nil
136 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698