Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(10)

Unified Diff: client/internal/logdog/butler/streamserver/npipe.go

Issue 1429993002: LogDog: Add Butler stream server package. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-butlerproto
Patch Set: Cleanup, comments. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: client/internal/logdog/butler/streamserver/npipe.go
diff --git a/client/internal/logdog/butler/streamserver/npipe.go b/client/internal/logdog/butler/streamserver/npipe.go
new file mode 100644
index 0000000000000000000000000000000000000000..bf13e266410d194a969175d678bb7217d517679c
--- /dev/null
+++ b/client/internal/logdog/butler/streamserver/npipe.go
@@ -0,0 +1,259 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
iannucci 2015/11/12 22:44:24 can we call this filed named_pipe? I thought it wa
dnj 2015/11/13 20:17:15 Done.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package streamserver
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "sync"
+ "time"
+
+ "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
+ "github.com/luci/luci-go/common/iotools"
+ log "github.com/luci/luci-go/common/logging"
+ "golang.org/x/net/context"
+)
+
+// An client ID, used for logging only
+type clientID int
+
+// namedPipeStreamParams are parameters representing a negotiated stream ready
+// to deliver.
+type namedPipeStreamParams struct {
+ // The stream's ReadCloser connection.
+ rc io.ReadCloser
+ // Negotiated stream properties.
+ properties *streamproto.Properties
+}
+
+// Base class for OS-specific stream server implementations.
+type namedPipeServerBase struct {
+ ctx context.Context
+
+ // serving, if true, indicates that the server is currently serving.
+ serving bool
+
+ streamParamsC chan *namedPipeStreamParams
+ closedC chan struct{}
+ acceptFinishedC chan struct{}
+ l net.Listener // The Listener to use for client connections.
+}
+
+// Initializes the base structure members.
+func (s *namedPipeServerBase) initBase(ctx context.Context) {
+ s.ctx = log.SetFilter(ctx, "streamServer")
+}
+
+// Implements StreamServer.Connect
+func (s *namedPipeServer) Listen() error {
+ // Initialize our stream channel.
iannucci 2015/11/12 22:44:24 why not during initBase?
dnj 2015/11/13 20:17:14 Done.
+ s.streamParamsC = make(chan *namedPipeStreamParams)
+ s.closedC = make(chan struct{})
+ s.acceptFinishedC = make(chan struct{})
+
+ // Create a listener (OS-specific).
+ var err error
+ s.l, err = s.createListener()
+ if err != nil {
+ return err
+ }
+
+ // Poll the Listener for new connections in a separate goroutine. This will
+ // terminate when the server is Close()d.
+ s.serving = true
+ go s.serve()
+ return nil
+}
+
+func (s *namedPipeServer) Next() (io.ReadCloser, *streamproto.Properties) {
+ if streamParams, ok := <-s.streamParamsC; ok {
+ return streamParams.rc, streamParams.properties
+ }
+ return nil, nil
+}
+
+// Implements StreamServer.Close
+func (s *namedPipeServer) Close() {
+ if !s.serving {
+ panic("server is not currently serving")
+ }
+
+ // Close our Listener. This will cause our 'Accept' goroutine to terminate.
+ s.l.Close()
+ close(s.closedC)
+ <-s.acceptFinishedC
+
+ // Close our streamParamsC to signal that we're closed. Any blocking Next will
+ // drain the channel, then return with nil.
+ close(s.streamParamsC)
+ s.serving = false
+}
+
+// Continuously pulls connections from the supplied Listener and returns them as
+// connections to streamParamsC for consumption by Next().
+func (s *namedPipeServer) serve() {
+ defer close(s.acceptFinishedC)
+
+ nextID := clientID(0)
+ clientWG := sync.WaitGroup{}
+ for {
+ log.Debugf(s.ctx, "Beginning Accept() loop cycle.")
+ conn, err := s.l.Accept()
+ if err != nil {
+ log.Errorf(log.SetError(s.ctx, err), "Error during Accept().")
+ break
+ }
+
+ // Spawn a goroutine to handle this connection. This goroutine will take
+ // ownership of the connection, closing it as appropriate.
+ npConn := &namedPipeConn{
+ closedC: s.closedC,
+ id: nextID,
+ conn: conn,
+ name: conn.RemoteAddr().String(),
+ }
+ clientWG.Add(1)
+ go func() {
+ defer clientWG.Done()
+
+ c := log.SetFields(s.ctx, log.Fields{
+ "id": npConn.id,
+ "name": npConn.name,
+ })
+ defer func() {
+ if r := recover(); r != nil {
+ log.Fields{
+ log.ErrorKey: r,
+ }.Errorf(c, "Failed to negotitate stream client.")
+ }
+ }()
+
+ s.streamParamsC <- npConn.negotiate(s.ctx)
+ }()
+ nextID++
+ }
+
+ // Wait for client connections to finish.
+ clientWG.Wait()
+ log.Infof(log.SetFields(s.ctx, log.Fields{
+ "streamServer": s,
+ "totalConnections": nextID,
+ }), "Exiting serve loop.")
+}
+
+//
+// namedPipeClient
+//
+
+// Manages a single named pipe connection.
+type namedPipeConn struct {
+ closedC chan struct{} // Signal channel to indicate that the server has closed.
+ id clientID // Client ID, used for debugging correlation.
+ conn net.Conn // The underlying client connection.
+ name string // The name of this connection, for debugging purposes.
+
+ // decoupleMu is used to ensure that decoupleConn is called at most one time.
+ decoupleMu sync.Mutex
+}
+
+// String conversion (for logging).
+func (c *namedPipeConn) String() string {
+ return fmt.Sprintf("(%d %s)", c.id, c.name)
+}
+
+// negotiate will panic if an error occurs during stream negotiation.
+func (c *namedPipeConn) negotiate(ctx context.Context) *namedPipeStreamParams {
+ // Close the connection as a failsafe. If we have already decoupled it, this
+ // will end up being a no-op.
+ defer c.closeConn()
+
+ ctx = log.SetFields(ctx, log.Fields{
+ "id": c.id,
+ "local": c.conn.LocalAddr(),
+ "remote": c.conn.RemoteAddr(),
+ })
+ log.Infof(ctx, "Received new connection.")
+
+ // Monitor goroutine that will close our connection if our server has shut
+ // down before it's finished handshaking.
+ handshakeFinishedC := make(chan struct{})
+ defer close(handshakeFinishedC)
+
+ go func() {
+ select {
+ case <-c.closedC:
+ log.Warningf(ctx, "Received server close signal; closing client.")
+ c.closeConn()
+ break
+
+ case <-handshakeFinishedC:
+ break
+ }
+ }()
+
+ p, err := c.handshake(ctx)
+ if err != nil {
+ panic(fmt.Errorf("failed to negotiate stream config: %v", err))
+ }
+
+ // If we have a timeout configured, set it.
+ if p.Timeout > 0 {
+ c.setDeadline(p.Timeout)
+ }
+
+ // Break off our handshake stream and send it to the Butler for reading.
+ return &namedPipeStreamParams{c.decoupleConn(), p}
+}
+
+// handshake handles the handshaking and registration of a single connection. If
+// the connection successfully handshakes, it will be registered as a stream and
+// supplied to the local streamParamsC; otherwise, it will be closed.
+//
+// The client connection opens with a handshake protocol. Once complete, the
+// connection itself becomes the stream.
+func (c *namedPipeConn) handshake(ctx context.Context) (*streamproto.Properties, error) {
+ // Read the handshake header.
+ log.Infof(ctx, "Beginning handshake.")
+
+ // Determine which protocol we're using by reading the first (magic) bytes.
+ protocol, err := getProtocolFromMagic(ctx, c.conn)
+ if err != nil {
+ log.Errorf(log.SetError(ctx, err),
+ "Failed to determine protocol from header.")
+ return nil, errors.New("Failed to determine protocol.")
+ }
+
+ // Perform the handshake.
+ return protocol.Handshake(ctx, c.conn)
+}
+
+// Closes the underlying connection.
+func (c *namedPipeConn) closeConn() {
+ conn := c.decoupleConn()
+ if conn != nil {
+ if err := conn.Close(); err != nil {
+ }
iannucci 2015/11/12 22:44:24 ... something?
dnj 2015/11/13 20:17:14 Done.
+ }
+}
+
+func (c *namedPipeConn) setDeadline(d time.Duration) error {
+ c.conn = &iotools.DeadlineReader{
+ Conn: c.conn,
+ Deadline: d,
+ }
+ return nil
+}
+
+// Decouples the active connection, returning it and setting the connection to
+// nil.
+func (c *namedPipeConn) decoupleConn() (conn io.ReadCloser) {
+ c.decoupleMu.Lock()
+ defer c.decoupleMu.Unlock()
+
+ conn, c.conn = c.conn, nil
+ return
+}

Powered by Google App Engine
This is Rietveld 408576698