Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
|
iannucci
2015/11/12 22:44:24
can we call this filed named_pipe? I thought it wa
dnj
2015/11/13 20:17:15
Done.
| |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package streamserver | |
| 6 | |
| 7 import ( | |
| 8 "errors" | |
| 9 "fmt" | |
| 10 "io" | |
| 11 "net" | |
| 12 "sync" | |
| 13 "time" | |
| 14 | |
| 15 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" | |
| 16 "github.com/luci/luci-go/common/iotools" | |
| 17 log "github.com/luci/luci-go/common/logging" | |
| 18 "golang.org/x/net/context" | |
| 19 ) | |
| 20 | |
| 21 // An client ID, used for logging only | |
| 22 type clientID int | |
| 23 | |
| 24 // namedPipeStreamParams are parameters representing a negotiated stream ready | |
| 25 // to deliver. | |
| 26 type namedPipeStreamParams struct { | |
| 27 // The stream's ReadCloser connection. | |
| 28 rc io.ReadCloser | |
| 29 // Negotiated stream properties. | |
| 30 properties *streamproto.Properties | |
| 31 } | |
| 32 | |
| 33 // Base class for OS-specific stream server implementations. | |
| 34 type namedPipeServerBase struct { | |
| 35 ctx context.Context | |
| 36 | |
| 37 // serving, if true, indicates that the server is currently serving. | |
| 38 serving bool | |
| 39 | |
| 40 streamParamsC chan *namedPipeStreamParams | |
| 41 closedC chan struct{} | |
| 42 acceptFinishedC chan struct{} | |
| 43 l net.Listener // The Listener to use for client connectio ns. | |
| 44 } | |
| 45 | |
| 46 // Initializes the base structure members. | |
| 47 func (s *namedPipeServerBase) initBase(ctx context.Context) { | |
| 48 s.ctx = log.SetFilter(ctx, "streamServer") | |
| 49 } | |
| 50 | |
| 51 // Implements StreamServer.Connect | |
| 52 func (s *namedPipeServer) Listen() error { | |
| 53 // Initialize our stream channel. | |
|
iannucci
2015/11/12 22:44:24
why not during initBase?
dnj
2015/11/13 20:17:14
Done.
| |
| 54 s.streamParamsC = make(chan *namedPipeStreamParams) | |
| 55 s.closedC = make(chan struct{}) | |
| 56 s.acceptFinishedC = make(chan struct{}) | |
| 57 | |
| 58 // Create a listener (OS-specific). | |
| 59 var err error | |
| 60 s.l, err = s.createListener() | |
| 61 if err != nil { | |
| 62 return err | |
| 63 } | |
| 64 | |
| 65 // Poll the Listener for new connections in a separate goroutine. This w ill | |
| 66 // terminate when the server is Close()d. | |
| 67 s.serving = true | |
| 68 go s.serve() | |
| 69 return nil | |
| 70 } | |
| 71 | |
| 72 func (s *namedPipeServer) Next() (io.ReadCloser, *streamproto.Properties) { | |
| 73 if streamParams, ok := <-s.streamParamsC; ok { | |
| 74 return streamParams.rc, streamParams.properties | |
| 75 } | |
| 76 return nil, nil | |
| 77 } | |
| 78 | |
| 79 // Implements StreamServer.Close | |
| 80 func (s *namedPipeServer) Close() { | |
| 81 if !s.serving { | |
| 82 panic("server is not currently serving") | |
| 83 } | |
| 84 | |
| 85 // Close our Listener. This will cause our 'Accept' goroutine to termina te. | |
| 86 s.l.Close() | |
| 87 close(s.closedC) | |
| 88 <-s.acceptFinishedC | |
| 89 | |
| 90 // Close our streamParamsC to signal that we're closed. Any blocking Nex t will | |
| 91 // drain the channel, then return with nil. | |
| 92 close(s.streamParamsC) | |
| 93 s.serving = false | |
| 94 } | |
| 95 | |
| 96 // Continuously pulls connections from the supplied Listener and returns them as | |
| 97 // connections to streamParamsC for consumption by Next(). | |
| 98 func (s *namedPipeServer) serve() { | |
| 99 defer close(s.acceptFinishedC) | |
| 100 | |
| 101 nextID := clientID(0) | |
| 102 clientWG := sync.WaitGroup{} | |
| 103 for { | |
| 104 log.Debugf(s.ctx, "Beginning Accept() loop cycle.") | |
| 105 conn, err := s.l.Accept() | |
| 106 if err != nil { | |
| 107 log.Errorf(log.SetError(s.ctx, err), "Error during Accep t().") | |
| 108 break | |
| 109 } | |
| 110 | |
| 111 // Spawn a goroutine to handle this connection. This goroutine w ill take | |
| 112 // ownership of the connection, closing it as appropriate. | |
| 113 npConn := &namedPipeConn{ | |
| 114 closedC: s.closedC, | |
| 115 id: nextID, | |
| 116 conn: conn, | |
| 117 name: conn.RemoteAddr().String(), | |
| 118 } | |
| 119 clientWG.Add(1) | |
| 120 go func() { | |
| 121 defer clientWG.Done() | |
| 122 | |
| 123 c := log.SetFields(s.ctx, log.Fields{ | |
| 124 "id": npConn.id, | |
| 125 "name": npConn.name, | |
| 126 }) | |
| 127 defer func() { | |
| 128 if r := recover(); r != nil { | |
| 129 log.Fields{ | |
| 130 log.ErrorKey: r, | |
| 131 }.Errorf(c, "Failed to negotitate stream client.") | |
| 132 } | |
| 133 }() | |
| 134 | |
| 135 s.streamParamsC <- npConn.negotiate(s.ctx) | |
| 136 }() | |
| 137 nextID++ | |
| 138 } | |
| 139 | |
| 140 // Wait for client connections to finish. | |
| 141 clientWG.Wait() | |
| 142 log.Infof(log.SetFields(s.ctx, log.Fields{ | |
| 143 "streamServer": s, | |
| 144 "totalConnections": nextID, | |
| 145 }), "Exiting serve loop.") | |
| 146 } | |
| 147 | |
| 148 // | |
| 149 // namedPipeClient | |
| 150 // | |
| 151 | |
| 152 // Manages a single named pipe connection. | |
| 153 type namedPipeConn struct { | |
| 154 closedC chan struct{} // Signal channel to indicate that the server has closed. | |
| 155 id clientID // Client ID, used for debugging correlation. | |
| 156 conn net.Conn // The underlying client connection. | |
| 157 name string // The name of this connection, for debugging purp oses. | |
| 158 | |
| 159 // decoupleMu is used to ensure that decoupleConn is called at most one time. | |
| 160 decoupleMu sync.Mutex | |
| 161 } | |
| 162 | |
| 163 // String conversion (for logging). | |
| 164 func (c *namedPipeConn) String() string { | |
| 165 return fmt.Sprintf("(%d %s)", c.id, c.name) | |
| 166 } | |
| 167 | |
| 168 // negotiate will panic if an error occurs during stream negotiation. | |
| 169 func (c *namedPipeConn) negotiate(ctx context.Context) *namedPipeStreamParams { | |
| 170 // Close the connection as a failsafe. If we have already decoupled it, this | |
| 171 // will end up being a no-op. | |
| 172 defer c.closeConn() | |
| 173 | |
| 174 ctx = log.SetFields(ctx, log.Fields{ | |
| 175 "id": c.id, | |
| 176 "local": c.conn.LocalAddr(), | |
| 177 "remote": c.conn.RemoteAddr(), | |
| 178 }) | |
| 179 log.Infof(ctx, "Received new connection.") | |
| 180 | |
| 181 // Monitor goroutine that will close our connection if our server has sh ut | |
| 182 // down before it's finished handshaking. | |
| 183 handshakeFinishedC := make(chan struct{}) | |
| 184 defer close(handshakeFinishedC) | |
| 185 | |
| 186 go func() { | |
| 187 select { | |
| 188 case <-c.closedC: | |
| 189 log.Warningf(ctx, "Received server close signal; closing client.") | |
| 190 c.closeConn() | |
| 191 break | |
| 192 | |
| 193 case <-handshakeFinishedC: | |
| 194 break | |
| 195 } | |
| 196 }() | |
| 197 | |
| 198 p, err := c.handshake(ctx) | |
| 199 if err != nil { | |
| 200 panic(fmt.Errorf("failed to negotiate stream config: %v", err)) | |
| 201 } | |
| 202 | |
| 203 // If we have a timeout configured, set it. | |
| 204 if p.Timeout > 0 { | |
| 205 c.setDeadline(p.Timeout) | |
| 206 } | |
| 207 | |
| 208 // Break off our handshake stream and send it to the Butler for reading. | |
| 209 return &namedPipeStreamParams{c.decoupleConn(), p} | |
| 210 } | |
| 211 | |
| 212 // handshake handles the handshaking and registration of a single connection. If | |
| 213 // the connection successfully handshakes, it will be registered as a stream and | |
| 214 // supplied to the local streamParamsC; otherwise, it will be closed. | |
| 215 // | |
| 216 // The client connection opens with a handshake protocol. Once complete, the | |
| 217 // connection itself becomes the stream. | |
| 218 func (c *namedPipeConn) handshake(ctx context.Context) (*streamproto.Properties, error) { | |
| 219 // Read the handshake header. | |
| 220 log.Infof(ctx, "Beginning handshake.") | |
| 221 | |
| 222 // Determine which protocol we're using by reading the first (magic) byt es. | |
| 223 protocol, err := getProtocolFromMagic(ctx, c.conn) | |
| 224 if err != nil { | |
| 225 log.Errorf(log.SetError(ctx, err), | |
| 226 "Failed to determine protocol from header.") | |
| 227 return nil, errors.New("Failed to determine protocol.") | |
| 228 } | |
| 229 | |
| 230 // Perform the handshake. | |
| 231 return protocol.Handshake(ctx, c.conn) | |
| 232 } | |
| 233 | |
| 234 // Closes the underlying connection. | |
| 235 func (c *namedPipeConn) closeConn() { | |
| 236 conn := c.decoupleConn() | |
| 237 if conn != nil { | |
| 238 if err := conn.Close(); err != nil { | |
| 239 } | |
|
iannucci
2015/11/12 22:44:24
... something?
dnj
2015/11/13 20:17:14
Done.
| |
| 240 } | |
| 241 } | |
| 242 | |
| 243 func (c *namedPipeConn) setDeadline(d time.Duration) error { | |
| 244 c.conn = &iotools.DeadlineReader{ | |
| 245 Conn: c.conn, | |
| 246 Deadline: d, | |
| 247 } | |
| 248 return nil | |
| 249 } | |
| 250 | |
| 251 // Decouples the active connection, returning it and setting the connection to | |
| 252 // nil. | |
| 253 func (c *namedPipeConn) decoupleConn() (conn io.ReadCloser) { | |
| 254 c.decoupleMu.Lock() | |
| 255 defer c.decoupleMu.Unlock() | |
| 256 | |
| 257 conn, c.conn = c.conn, nil | |
| 258 return | |
| 259 } | |
| OLD | NEW |