Index: client/logdog/butlerlib/streamclient/client.go |
diff --git a/client/logdog/butlerlib/streamclient/client.go b/client/logdog/butlerlib/streamclient/client.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..8a27c91f29dd7ea55c9d466995ba7ed971a60b80 |
--- /dev/null |
+++ b/client/logdog/butlerlib/streamclient/client.go |
@@ -0,0 +1,88 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+package streamclient |
+ |
+import ( |
+ "encoding/json" |
+ "fmt" |
+ "io" |
+ |
+ "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" |
+) |
+ |
+type clientFactory func(string) (Client, error) |
+ |
+// Client is a client to a LogDog Butler StreamServer. A Client will connect |
+// to a StreamServer, negotiate a stream configuration, and return an active |
+// stream object that can be written to. |
+type Client interface { |
+ // NewStream creates a new stream with the supplied stream properties. |
+ NewStream(f streamproto.Flags) (Stream, error) |
+} |
+ |
+// streamFactory is a factory method to generate an io.WriteCloser stream for |
+// the current clientImpl. |
+type streamFactory func() (io.WriteCloser, error) |
+ |
+// clientImpl is an implementation of the Client interface using a net.Conn |
+// factory to generate an individual stream. |
+type clientImpl struct { |
+ // network is the connection path to the stream server. |
+ factory streamFactory |
+} |
+ |
+// New instantiates a new Client instance. This type of instance will be parsed |
+// from the supplied path string, which takes the form: |
+// <protocol>:<protocol-specific-spec> |
+// |
+// Supported protocols and their respective specs are: |
+// - unix:/path/to/socket describes a stream server listening on UNIX domain |
+// socket at "/path/to/socket". |
+// |
+// Windows-only: |
+// - net.pipe:name describes a stream server listening on Windows named pipe |
+// "\\.\pipe\name". |
+func New(path string) (Client, error) { |
+ return defaultRegistry.newClient(path) |
+} |
+ |
+func (c *clientImpl) NewStream(f streamproto.Flags) (Stream, error) { |
+ p := f.Properties() |
+ if err := p.Validate(); err != nil { |
+ return nil, fmt.Errorf("streamclient: invalid stream properties: %s", err) |
+ } |
+ |
+ client, err := c.factory() |
+ if err != nil { |
+ return nil, err |
+ } |
+ ownsClient := true |
+ defer func() { |
+ // If we haven't written out the connection, close. |
+ if ownsClient { |
+ client.Close() |
+ } |
+ }() |
+ |
+ data, err := json.Marshal(f) |
+ if err != nil { |
+ return nil, fmt.Errorf("failed to marshal properties JSON: %s", err) |
+ } |
+ |
+ // Perform the handshake: magic + size(data) + data. |
+ s := &streamImpl{ |
+ Properties: p, |
+ WriteCloser: client, |
+ } |
+ if _, err := s.writeRaw(streamproto.ProtocolFrameHeaderMagic); err != nil { |
+ return nil, fmt.Errorf("failed to write magic number: %s", err) |
+ } |
+ if err := s.writeRecord(data); err != nil { |
+ return nil, fmt.Errorf("failed to write properties: %s", err) |
+ } |
+ |
+ ownsClient = false // Passing ownership to caller. |
+ return s, nil |
+} |