Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(730)

Side by Side Diff: client/internal/logdog/butler/streamserver/npipe.go

Issue 1429993002: LogDog: Add Butler stream server package. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-butlerproto
Patch Set: Cleanup, comments. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
iannucci 2015/11/12 22:44:24 can we call this filed named_pipe? I thought it wa
dnj 2015/11/13 20:17:15 Done.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package streamserver
6
7 import (
8 "errors"
9 "fmt"
10 "io"
11 "net"
12 "sync"
13 "time"
14
15 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
16 "github.com/luci/luci-go/common/iotools"
17 log "github.com/luci/luci-go/common/logging"
18 "golang.org/x/net/context"
19 )
20
21 // An client ID, used for logging only
22 type clientID int
23
24 // namedPipeStreamParams are parameters representing a negotiated stream ready
25 // to deliver.
26 type namedPipeStreamParams struct {
27 // The stream's ReadCloser connection.
28 rc io.ReadCloser
29 // Negotiated stream properties.
30 properties *streamproto.Properties
31 }
32
33 // Base class for OS-specific stream server implementations.
34 type namedPipeServerBase struct {
35 ctx context.Context
36
37 // serving, if true, indicates that the server is currently serving.
38 serving bool
39
40 streamParamsC chan *namedPipeStreamParams
41 closedC chan struct{}
42 acceptFinishedC chan struct{}
43 l net.Listener // The Listener to use for client connectio ns.
44 }
45
46 // Initializes the base structure members.
47 func (s *namedPipeServerBase) initBase(ctx context.Context) {
48 s.ctx = log.SetFilter(ctx, "streamServer")
49 }
50
51 // Implements StreamServer.Connect
52 func (s *namedPipeServer) Listen() error {
53 // Initialize our stream channel.
iannucci 2015/11/12 22:44:24 why not during initBase?
dnj 2015/11/13 20:17:14 Done.
54 s.streamParamsC = make(chan *namedPipeStreamParams)
55 s.closedC = make(chan struct{})
56 s.acceptFinishedC = make(chan struct{})
57
58 // Create a listener (OS-specific).
59 var err error
60 s.l, err = s.createListener()
61 if err != nil {
62 return err
63 }
64
65 // Poll the Listener for new connections in a separate goroutine. This w ill
66 // terminate when the server is Close()d.
67 s.serving = true
68 go s.serve()
69 return nil
70 }
71
72 func (s *namedPipeServer) Next() (io.ReadCloser, *streamproto.Properties) {
73 if streamParams, ok := <-s.streamParamsC; ok {
74 return streamParams.rc, streamParams.properties
75 }
76 return nil, nil
77 }
78
79 // Implements StreamServer.Close
80 func (s *namedPipeServer) Close() {
81 if !s.serving {
82 panic("server is not currently serving")
83 }
84
85 // Close our Listener. This will cause our 'Accept' goroutine to termina te.
86 s.l.Close()
87 close(s.closedC)
88 <-s.acceptFinishedC
89
90 // Close our streamParamsC to signal that we're closed. Any blocking Nex t will
91 // drain the channel, then return with nil.
92 close(s.streamParamsC)
93 s.serving = false
94 }
95
96 // Continuously pulls connections from the supplied Listener and returns them as
97 // connections to streamParamsC for consumption by Next().
98 func (s *namedPipeServer) serve() {
99 defer close(s.acceptFinishedC)
100
101 nextID := clientID(0)
102 clientWG := sync.WaitGroup{}
103 for {
104 log.Debugf(s.ctx, "Beginning Accept() loop cycle.")
105 conn, err := s.l.Accept()
106 if err != nil {
107 log.Errorf(log.SetError(s.ctx, err), "Error during Accep t().")
108 break
109 }
110
111 // Spawn a goroutine to handle this connection. This goroutine w ill take
112 // ownership of the connection, closing it as appropriate.
113 npConn := &namedPipeConn{
114 closedC: s.closedC,
115 id: nextID,
116 conn: conn,
117 name: conn.RemoteAddr().String(),
118 }
119 clientWG.Add(1)
120 go func() {
121 defer clientWG.Done()
122
123 c := log.SetFields(s.ctx, log.Fields{
124 "id": npConn.id,
125 "name": npConn.name,
126 })
127 defer func() {
128 if r := recover(); r != nil {
129 log.Fields{
130 log.ErrorKey: r,
131 }.Errorf(c, "Failed to negotitate stream client.")
132 }
133 }()
134
135 s.streamParamsC <- npConn.negotiate(s.ctx)
136 }()
137 nextID++
138 }
139
140 // Wait for client connections to finish.
141 clientWG.Wait()
142 log.Infof(log.SetFields(s.ctx, log.Fields{
143 "streamServer": s,
144 "totalConnections": nextID,
145 }), "Exiting serve loop.")
146 }
147
148 //
149 // namedPipeClient
150 //
151
152 // Manages a single named pipe connection.
153 type namedPipeConn struct {
154 closedC chan struct{} // Signal channel to indicate that the server has closed.
155 id clientID // Client ID, used for debugging correlation.
156 conn net.Conn // The underlying client connection.
157 name string // The name of this connection, for debugging purp oses.
158
159 // decoupleMu is used to ensure that decoupleConn is called at most one time.
160 decoupleMu sync.Mutex
161 }
162
163 // String conversion (for logging).
164 func (c *namedPipeConn) String() string {
165 return fmt.Sprintf("(%d %s)", c.id, c.name)
166 }
167
168 // negotiate will panic if an error occurs during stream negotiation.
169 func (c *namedPipeConn) negotiate(ctx context.Context) *namedPipeStreamParams {
170 // Close the connection as a failsafe. If we have already decoupled it, this
171 // will end up being a no-op.
172 defer c.closeConn()
173
174 ctx = log.SetFields(ctx, log.Fields{
175 "id": c.id,
176 "local": c.conn.LocalAddr(),
177 "remote": c.conn.RemoteAddr(),
178 })
179 log.Infof(ctx, "Received new connection.")
180
181 // Monitor goroutine that will close our connection if our server has sh ut
182 // down before it's finished handshaking.
183 handshakeFinishedC := make(chan struct{})
184 defer close(handshakeFinishedC)
185
186 go func() {
187 select {
188 case <-c.closedC:
189 log.Warningf(ctx, "Received server close signal; closing client.")
190 c.closeConn()
191 break
192
193 case <-handshakeFinishedC:
194 break
195 }
196 }()
197
198 p, err := c.handshake(ctx)
199 if err != nil {
200 panic(fmt.Errorf("failed to negotiate stream config: %v", err))
201 }
202
203 // If we have a timeout configured, set it.
204 if p.Timeout > 0 {
205 c.setDeadline(p.Timeout)
206 }
207
208 // Break off our handshake stream and send it to the Butler for reading.
209 return &namedPipeStreamParams{c.decoupleConn(), p}
210 }
211
212 // handshake handles the handshaking and registration of a single connection. If
213 // the connection successfully handshakes, it will be registered as a stream and
214 // supplied to the local streamParamsC; otherwise, it will be closed.
215 //
216 // The client connection opens with a handshake protocol. Once complete, the
217 // connection itself becomes the stream.
218 func (c *namedPipeConn) handshake(ctx context.Context) (*streamproto.Properties, error) {
219 // Read the handshake header.
220 log.Infof(ctx, "Beginning handshake.")
221
222 // Determine which protocol we're using by reading the first (magic) byt es.
223 protocol, err := getProtocolFromMagic(ctx, c.conn)
224 if err != nil {
225 log.Errorf(log.SetError(ctx, err),
226 "Failed to determine protocol from header.")
227 return nil, errors.New("Failed to determine protocol.")
228 }
229
230 // Perform the handshake.
231 return protocol.Handshake(ctx, c.conn)
232 }
233
234 // Closes the underlying connection.
235 func (c *namedPipeConn) closeConn() {
236 conn := c.decoupleConn()
237 if conn != nil {
238 if err := conn.Close(); err != nil {
239 }
iannucci 2015/11/12 22:44:24 ... something?
dnj 2015/11/13 20:17:14 Done.
240 }
241 }
242
243 func (c *namedPipeConn) setDeadline(d time.Duration) error {
244 c.conn = &iotools.DeadlineReader{
245 Conn: c.conn,
246 Deadline: d,
247 }
248 return nil
249 }
250
251 // Decouples the active connection, returning it and setting the connection to
252 // nil.
253 func (c *namedPipeConn) decoupleConn() (conn io.ReadCloser) {
254 c.decoupleMu.Lock()
255 defer c.decoupleMu.Unlock()
256
257 conn, c.conn = c.conn, nil
258 return
259 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698