Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
|
iannucci
2015/11/12 22:44:24
why not butlerlib/stream/client and butlerlib/stre
dnj
2015/11/13 20:17:15
Mainly b/c on import, "streamclient" package is mo
| |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package streamclient | |
| 6 | |
| 7 import ( | |
| 8 "encoding/json" | |
| 9 "fmt" | |
| 10 "io" | |
| 11 | |
| 12 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" | |
| 13 ) | |
| 14 | |
| 15 type clientFactory func(string) (Client, error) | |
| 16 | |
| 17 var ( | |
| 18 // ProtocolFrameHeaderMagic is the number at the beginning of streams th at | |
| 19 // identifies the stream handshake version. | |
| 20 // | |
| 21 // This serves two purposes: | |
| 22 // - To disambiguate a Butler stream from some happenstance string of bytes | |
| 23 // (which probably won't start with these characters). | |
| 24 // - To allow an upgrade to the wire format, if one is ever needed. e. g., | |
| 25 // a switch to something other than recordio/JSON. | |
| 26 ProtocolFrameHeaderMagic = []byte("BTLR1\x1E") | |
|
iannucci
2015/11/12 22:44:24
should be defined in butlerlib/stream and shared b
dnj
2015/11/13 20:17:15
Oops, it actually already is :/
| |
| 27 ) | |
| 28 | |
| 29 // Client is a client to a LogDog Butler StreamServer. A Client will connect | |
| 30 // to a StreamServer, negotiate a stream configuration, and return an active | |
| 31 // stream object that can be written to. | |
| 32 type Client interface { | |
| 33 // NewStream creates a new stream with the supplied stream properties. | |
| 34 NewStream(f streamproto.Flags) (Stream, error) | |
| 35 } | |
| 36 | |
| 37 // streamFactory is a factory method to generate an io.WriteCloser stream for | |
| 38 // the current clientImpl. | |
| 39 type streamFactory func() (io.WriteCloser, error) | |
| 40 | |
| 41 // clientImpl is an implementation of the Client interface using a net.Conn | |
| 42 // factory to generate an individual stream. | |
| 43 type clientImpl struct { | |
| 44 // network is the connection path to the stream server. | |
| 45 factory streamFactory | |
| 46 } | |
| 47 | |
| 48 // New instantiates a new Client instance. This type of instance will be parsed | |
| 49 // from the supplied path string, which takes the form: | |
| 50 // <protocol>:<protocol-specific-spec> | |
| 51 // | |
| 52 // Supported protocols and their respective specs are: | |
| 53 // - unix:path/to/socket describes a stream server listening on UNIX domain | |
| 54 // socket at "path/to/socket". | |
|
iannucci
2015/11/12 22:44:25
I assume you actually want /path/to/socket, right?
dnj
2015/11/13 20:17:15
Sure - I wasn't putting too much stock in the exam
| |
| 55 // | |
| 56 // Windows-only: | |
| 57 // - net.pipe:name describes a stream server listening on Windows named pipe | |
|
iannucci
2015/11/12 22:44:24
net.pipe? not named.pipe?
or pipe:?
dnj
2015/11/13 20:17:15
I went with "net.pipe" because it is actually the
| |
| 58 // "\\.\pipe\name". | |
| 59 func New(path string) (Client, error) { | |
| 60 return defaultRegistry.newClient(path) | |
| 61 } | |
| 62 | |
| 63 func (c *clientImpl) NewStream(f streamproto.Flags) (Stream, error) { | |
| 64 p := f.Properties() | |
| 65 if err := p.Validate(); err != nil { | |
| 66 return nil, fmt.Errorf("streamclient: invalid stream properties: %s", err) | |
| 67 } | |
| 68 | |
| 69 client, err := c.factory() | |
| 70 if err != nil { | |
| 71 return nil, err | |
| 72 } | |
| 73 passing := false | |
|
iannucci
2015/11/12 22:44:24
passing? handedOff?
dnj
2015/11/13 20:17:15
Done.
| |
| 74 defer func() { | |
| 75 // If we haven't written out the connection, close. | |
| 76 if !passing { | |
| 77 client.Close() | |
| 78 } | |
| 79 }() | |
| 80 | |
| 81 data, err := json.Marshal(f) | |
| 82 if err != nil { | |
| 83 return nil, fmt.Errorf("failed to marshal properties JSON: %s", err) | |
| 84 } | |
| 85 | |
| 86 // Perform the handshake: magic + size(data) + data. | |
| 87 s := &streamImpl{ | |
| 88 Properties: p, | |
| 89 WriteCloser: client, | |
| 90 } | |
| 91 if _, err := s.writeRaw(ProtocolFrameHeaderMagic); err != nil { | |
| 92 return nil, fmt.Errorf("failed to write magic number: %s", err) | |
| 93 } | |
| 94 if err := s.writeRecord(data); err != nil { | |
| 95 return nil, fmt.Errorf("failed to write properties: %s", err) | |
| 96 } | |
| 97 | |
| 98 passing = true | |
| 99 return s, nil | |
| 100 } | |
| OLD | NEW |