 Chromium Code Reviews
 Chromium Code Reviews Issue 1429993002:
  LogDog: Add Butler stream server package.  (Closed) 
  Base URL: https://github.com/luci/luci-go@logdog-review-butlerproto
    
  
    Issue 1429993002:
  LogDog: Add Butler stream server package.  (Closed) 
  Base URL: https://github.com/luci/luci-go@logdog-review-butlerproto| Index: client/logdog/butlerlib/streamclient/client.go | 
| diff --git a/client/logdog/butlerlib/streamclient/client.go b/client/logdog/butlerlib/streamclient/client.go | 
| new file mode 100644 | 
| index 0000000000000000000000000000000000000000..0c18018ceb67f4055b906f386b7b7e5e73ccf61d | 
| --- /dev/null | 
| +++ b/client/logdog/butlerlib/streamclient/client.go | 
| @@ -0,0 +1,100 @@ | 
| +// Copyright 2015 The Chromium Authors. All rights reserved. | 
| 
iannucci
2015/11/12 22:44:24
why not butlerlib/stream/client and butlerlib/stre
 
dnj
2015/11/13 20:17:15
Mainly b/c on import, "streamclient" package is mo
 | 
| +// Use of this source code is governed by a BSD-style license that can be | 
| +// found in the LICENSE file. | 
| + | 
| +package streamclient | 
| + | 
| +import ( | 
| + "encoding/json" | 
| + "fmt" | 
| + "io" | 
| + | 
| + "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" | 
| +) | 
| + | 
| +type clientFactory func(string) (Client, error) | 
| + | 
| +var ( | 
| + // ProtocolFrameHeaderMagic is the number at the beginning of streams that | 
| + // identifies the stream handshake version. | 
| + // | 
| + // This serves two purposes: | 
| + // - To disambiguate a Butler stream from some happenstance string of bytes | 
| + // (which probably won't start with these characters). | 
| + // - To allow an upgrade to the wire format, if one is ever needed. e.g., | 
| + // a switch to something other than recordio/JSON. | 
| + ProtocolFrameHeaderMagic = []byte("BTLR1\x1E") | 
| 
iannucci
2015/11/12 22:44:24
should be defined in butlerlib/stream and shared b
 
dnj
2015/11/13 20:17:15
Oops, it actually already is :/
 | 
| +) | 
| + | 
| +// Client is a client to a LogDog Butler StreamServer. A Client will connect | 
| +// to a StreamServer, negotiate a stream configuration, and return an active | 
| +// stream object that can be written to. | 
| +type Client interface { | 
| + // NewStream creates a new stream with the supplied stream properties. | 
| + NewStream(f streamproto.Flags) (Stream, error) | 
| +} | 
| + | 
| +// streamFactory is a factory method to generate an io.WriteCloser stream for | 
| +// the current clientImpl. | 
| +type streamFactory func() (io.WriteCloser, error) | 
| + | 
| +// clientImpl is an implementation of the Client interface using a net.Conn | 
| +// factory to generate an individual stream. | 
| +type clientImpl struct { | 
| + // network is the connection path to the stream server. | 
| + factory streamFactory | 
| +} | 
| + | 
| +// New instantiates a new Client instance. This type of instance will be parsed | 
| +// from the supplied path string, which takes the form: | 
| +// <protocol>:<protocol-specific-spec> | 
| +// | 
| +// Supported protocols and their respective specs are: | 
| +// - unix:path/to/socket describes a stream server listening on UNIX domain | 
| +// socket at "path/to/socket". | 
| 
iannucci
2015/11/12 22:44:25
I assume you actually want /path/to/socket, right?
 
dnj
2015/11/13 20:17:15
Sure - I wasn't putting too much stock in the exam
 | 
| +// | 
| +// Windows-only: | 
| +// - net.pipe:name describes a stream server listening on Windows named pipe | 
| 
iannucci
2015/11/12 22:44:24
net.pipe? not named.pipe?
or pipe:?
 
dnj
2015/11/13 20:17:15
I went with "net.pipe" because it is actually the
 | 
| +// "\\.\pipe\name". | 
| +func New(path string) (Client, error) { | 
| + return defaultRegistry.newClient(path) | 
| +} | 
| + | 
| +func (c *clientImpl) NewStream(f streamproto.Flags) (Stream, error) { | 
| + p := f.Properties() | 
| + if err := p.Validate(); err != nil { | 
| + return nil, fmt.Errorf("streamclient: invalid stream properties: %s", err) | 
| + } | 
| + | 
| + client, err := c.factory() | 
| + if err != nil { | 
| + return nil, err | 
| + } | 
| + passing := false | 
| 
iannucci
2015/11/12 22:44:24
passing? handedOff?
 
dnj
2015/11/13 20:17:15
Done.
 | 
| + defer func() { | 
| + // If we haven't written out the connection, close. | 
| + if !passing { | 
| + client.Close() | 
| + } | 
| + }() | 
| + | 
| + data, err := json.Marshal(f) | 
| + if err != nil { | 
| + return nil, fmt.Errorf("failed to marshal properties JSON: %s", err) | 
| + } | 
| + | 
| + // Perform the handshake: magic + size(data) + data. | 
| + s := &streamImpl{ | 
| + Properties: p, | 
| + WriteCloser: client, | 
| + } | 
| + if _, err := s.writeRaw(ProtocolFrameHeaderMagic); err != nil { | 
| + return nil, fmt.Errorf("failed to write magic number: %s", err) | 
| + } | 
| + if err := s.writeRecord(data); err != nil { | 
| + return nil, fmt.Errorf("failed to write properties: %s", err) | 
| + } | 
| + | 
| + passing = true | 
| + return s, nil | 
| +} |