| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package streamserver |
| 6 |
| 7 import ( |
| 8 "bytes" |
| 9 "fmt" |
| 10 "io" |
| 11 "io/ioutil" |
| 12 "net" |
| 13 "testing" |
| 14 "time" |
| 15 |
| 16 "github.com/luci/luci-go/client/logdog/butlerlib/streamclient" |
| 17 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" |
| 18 . "github.com/smartystreets/goconvey/convey" |
| 19 "golang.org/x/net/context" |
| 20 ) |
| 21 |
| 22 // Testing interface for OS-level test routine abstraction. |
| 23 type namedPipeTester interface { |
| 24 // Decorator to create and destroy a NamedPipeServer instance. |
| 25 withServer(func(i *namedPipeTestInstance)) func() |
| 26 } |
| 27 |
| 28 // Base class for a named pipe test instance, bound to a single server. |
| 29 type namedPipeTestInstance struct { |
| 30 S *namedPipeServer |
| 31 connect func() io.WriteCloser |
| 32 } |
| 33 |
| 34 type testAddr string |
| 35 |
| 36 func (a testAddr) Network() string { return string(a) } |
| 37 func (a testAddr) String() string { return fmt.Sprintf("test(%s)", a.Network())
} |
| 38 |
| 39 type testConn struct { |
| 40 bytes.Buffer |
| 41 |
| 42 readDeadline time.Time |
| 43 writeDeadline time.Time |
| 44 } |
| 45 |
| 46 func (c *testConn) Close() error { return nil } |
| 47 func (c *testConn) LocalAddr() net.Addr { return testAddr("local") } |
| 48 func (c *testConn) RemoteAddr() net.Addr { return testAddr("remote") } |
| 49 |
| 50 func (c *testConn) SetReadDeadline(t time.Time) error { |
| 51 c.readDeadline = t |
| 52 return nil |
| 53 } |
| 54 |
| 55 func (c *testConn) SetWriteDeadline(t time.Time) error { |
| 56 c.writeDeadline = t |
| 57 return nil |
| 58 } |
| 59 |
| 60 func (c *testConn) SetDeadline(t time.Time) error { |
| 61 c.readDeadline = t |
| 62 c.writeDeadline = t |
| 63 return nil |
| 64 } |
| 65 |
| 66 func testNamedPipeServer(t *testing.T, npt namedPipeTester) { |
| 67 Convey(`In a testing environment`, t, func() { |
| 68 c := context.Background() |
| 69 hb := handshakeBuilder{ |
| 70 magic: streamclient.ProtocolFrameHeaderMagic, |
| 71 } |
| 72 |
| 73 Convey(`A named pipe server will panic if closed without listeni
ng.`, func() { |
| 74 s := &namedPipeServer{} |
| 75 So(func() { s.Close() }, ShouldPanic) |
| 76 }) |
| 77 |
| 78 Convey(`A listening named pipe server will panic if double-close
d.`, |
| 79 npt.withServer(func(i *namedPipeTestInstance) { |
| 80 i.S.Close() |
| 81 So(func() { i.S.Close() }, ShouldPanic) |
| 82 })) |
| 83 |
| 84 Convey(`A test client connection`, func() { |
| 85 tc := testConn{} |
| 86 npc := &namedPipeConn{ |
| 87 id: 1337, |
| 88 conn: &tc, |
| 89 name: "test", |
| 90 } |
| 91 |
| 92 Convey(`Will reject an invalid handshake magic.`, func()
{ |
| 93 hb.magic = []byte(`NOT A HANDSHAKE MAGIC`) |
| 94 hb.writeTo(&tc, "", nil) |
| 95 So(func() { |
| 96 npc.negotiate(c) |
| 97 }, ShouldPanic) |
| 98 }) |
| 99 |
| 100 Convey(`Will reject an invalid handshake.`, func() { |
| 101 hb.writeTo(&tc, "CLEARLY NOT JSON", nil) |
| 102 So(func() { |
| 103 npc.negotiate(c) |
| 104 }, ShouldPanic) |
| 105 }) |
| 106 }) |
| 107 |
| 108 Convey(`A listening named pipe server`, npt.withServer(func(i *n
amedPipeTestInstance) { |
| 109 // Setup a goroutine to pipe test data through a client
connection. |
| 110 readerC := make(chan io.Reader) |
| 111 defer close(readerC) |
| 112 |
| 113 doneC := make(chan struct{}) |
| 114 go func() { |
| 115 defer close(doneC) |
| 116 |
| 117 // Create a connection to our server instance. |
| 118 w := i.connect() |
| 119 if w == nil { |
| 120 return |
| 121 } |
| 122 defer w.Close() |
| 123 |
| 124 // Write supplied data to the client connection. |
| 125 r := <-readerC |
| 126 if r != nil { |
| 127 io.Copy(w, r) |
| 128 } |
| 129 }() |
| 130 |
| 131 Convey(`Can receive stream data.`, func() { |
| 132 // Write our handshake and data to the stream. |
| 133 handshake := `{"name": "test", "contentType": "a
pplication/octet-stream"}` |
| 134 content := bytes.Repeat([]byte("THIS IS A TEST S
TREAM "), 100) |
| 135 readerC <- hb.reader(handshake, content) |
| 136 |
| 137 // Retrieve the ensuing stream. |
| 138 stream, props := i.S.Next() |
| 139 So(stream, ShouldNotBeNil) |
| 140 defer stream.Close() |
| 141 So(props, ShouldNotBeNil) |
| 142 |
| 143 <-doneC |
| 144 |
| 145 // Consume all of the data in the stream. |
| 146 recvData, _ := ioutil.ReadAll(stream) |
| 147 So(recvData, ShouldResemble, content) |
| 148 }) |
| 149 |
| 150 Convey(`Will exit Next if closed.`, func() { |
| 151 type streamBundle struct { |
| 152 rc io.ReadCloser |
| 153 props *streamproto.Properties |
| 154 } |
| 155 streamC := make(chan *streamBundle) |
| 156 defer close(streamC) |
| 157 |
| 158 // Get the stream. |
| 159 go func() { |
| 160 rc, props := i.S.Next() |
| 161 streamC <- &streamBundle{rc, props} |
| 162 }() |
| 163 |
| 164 // Close the stream server. |
| 165 i.S.Close() |
| 166 |
| 167 // Next must exit with nil. |
| 168 bundle := <-streamC |
| 169 So(bundle.rc, ShouldBeNil) |
| 170 So(bundle.props, ShouldBeNil) |
| 171 }) |
| 172 })) |
| 173 }) |
| 174 } |
| OLD | NEW |