Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(33)

Side by Side Diff: client/logdog/butlerlib/streamclient/client.go

Issue 1429993002: LogDog: Add Butler stream server package. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-butlerproto
Patch Set: Cleanup, comments. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
iannucci 2015/11/12 22:44:24 why not butlerlib/stream/client and butlerlib/stre
dnj 2015/11/13 20:17:15 Mainly b/c on import, "streamclient" package is mo
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package streamclient
6
7 import (
8 "encoding/json"
9 "fmt"
10 "io"
11
12 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
13 )
14
15 type clientFactory func(string) (Client, error)
16
17 var (
18 // ProtocolFrameHeaderMagic is the number at the beginning of streams th at
19 // identifies the stream handshake version.
20 //
21 // This serves two purposes:
22 // - To disambiguate a Butler stream from some happenstance string of bytes
23 // (which probably won't start with these characters).
24 // - To allow an upgrade to the wire format, if one is ever needed. e. g.,
25 // a switch to something other than recordio/JSON.
26 ProtocolFrameHeaderMagic = []byte("BTLR1\x1E")
iannucci 2015/11/12 22:44:24 should be defined in butlerlib/stream and shared b
dnj 2015/11/13 20:17:15 Oops, it actually already is :/
27 )
28
29 // Client is a client to a LogDog Butler StreamServer. A Client will connect
30 // to a StreamServer, negotiate a stream configuration, and return an active
31 // stream object that can be written to.
32 type Client interface {
33 // NewStream creates a new stream with the supplied stream properties.
34 NewStream(f streamproto.Flags) (Stream, error)
35 }
36
37 // streamFactory is a factory method to generate an io.WriteCloser stream for
38 // the current clientImpl.
39 type streamFactory func() (io.WriteCloser, error)
40
41 // clientImpl is an implementation of the Client interface using a net.Conn
42 // factory to generate an individual stream.
43 type clientImpl struct {
44 // network is the connection path to the stream server.
45 factory streamFactory
46 }
47
48 // New instantiates a new Client instance. This type of instance will be parsed
49 // from the supplied path string, which takes the form:
50 // <protocol>:<protocol-specific-spec>
51 //
52 // Supported protocols and their respective specs are:
53 // - unix:path/to/socket describes a stream server listening on UNIX domain
54 // socket at "path/to/socket".
iannucci 2015/11/12 22:44:25 I assume you actually want /path/to/socket, right?
dnj 2015/11/13 20:17:15 Sure - I wasn't putting too much stock in the exam
55 //
56 // Windows-only:
57 // - net.pipe:name describes a stream server listening on Windows named pipe
iannucci 2015/11/12 22:44:24 net.pipe? not named.pipe? or pipe:?
dnj 2015/11/13 20:17:15 I went with "net.pipe" because it is actually the
58 // "\\.\pipe\name".
59 func New(path string) (Client, error) {
60 return defaultRegistry.newClient(path)
61 }
62
63 func (c *clientImpl) NewStream(f streamproto.Flags) (Stream, error) {
64 p := f.Properties()
65 if err := p.Validate(); err != nil {
66 return nil, fmt.Errorf("streamclient: invalid stream properties: %s", err)
67 }
68
69 client, err := c.factory()
70 if err != nil {
71 return nil, err
72 }
73 passing := false
iannucci 2015/11/12 22:44:24 passing? handedOff?
dnj 2015/11/13 20:17:15 Done.
74 defer func() {
75 // If we haven't written out the connection, close.
76 if !passing {
77 client.Close()
78 }
79 }()
80
81 data, err := json.Marshal(f)
82 if err != nil {
83 return nil, fmt.Errorf("failed to marshal properties JSON: %s", err)
84 }
85
86 // Perform the handshake: magic + size(data) + data.
87 s := &streamImpl{
88 Properties: p,
89 WriteCloser: client,
90 }
91 if _, err := s.writeRaw(ProtocolFrameHeaderMagic); err != nil {
92 return nil, fmt.Errorf("failed to write magic number: %s", err)
93 }
94 if err := s.writeRecord(data); err != nil {
95 return nil, fmt.Errorf("failed to write properties: %s", err)
96 }
97
98 passing = true
99 return s, nil
100 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698