Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(135)

Side by Side Diff: client/internal/logdog/butler/streamserver/npipe_test.go

Issue 1429993002: LogDog: Add Butler stream server package. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-butlerproto
Patch Set: Cleanup, comments. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package streamserver
6
7 import (
8 "bytes"
9 "fmt"
10 "io"
11 "io/ioutil"
12 "net"
13 "testing"
14 "time"
15
16 "github.com/luci/luci-go/client/logdog/butlerlib/streamclient"
17 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
18 . "github.com/smartystreets/goconvey/convey"
19 "golang.org/x/net/context"
20 )
21
22 // Testing interface for OS-level test routine abstraction.
23 type namedPipeTester interface {
24 // Decorator to create and destroy a NamedPipeServer instance.
25 withServer(func(i *namedPipeTestInstance)) func()
26 }
27
28 // Base class for a named pipe test instance, bound to a single server.
29 type namedPipeTestInstance struct {
30 S *namedPipeServer
31 connect func() io.WriteCloser
32 }
33
34 type testAddr string
35
36 func (a testAddr) Network() string { return string(a) }
37 func (a testAddr) String() string { return fmt.Sprintf("test(%s)", a.Network()) }
38
39 type testConn struct {
40 bytes.Buffer
41
42 readDeadline time.Time
43 writeDeadline time.Time
44 }
45
46 func (c *testConn) Close() error { return nil }
47 func (c *testConn) LocalAddr() net.Addr { return testAddr("local") }
48 func (c *testConn) RemoteAddr() net.Addr { return testAddr("remote") }
49
50 func (c *testConn) SetReadDeadline(t time.Time) error {
51 c.readDeadline = t
52 return nil
53 }
54
55 func (c *testConn) SetWriteDeadline(t time.Time) error {
56 c.writeDeadline = t
57 return nil
58 }
59
60 func (c *testConn) SetDeadline(t time.Time) error {
61 c.readDeadline = t
62 c.writeDeadline = t
63 return nil
64 }
65
66 func testNamedPipeServer(t *testing.T, npt namedPipeTester) {
67 Convey(`In a testing environment`, t, func() {
68 c := context.Background()
69 hb := handshakeBuilder{
70 magic: streamclient.ProtocolFrameHeaderMagic,
71 }
72
73 Convey(`A named pipe server will panic if closed without listeni ng.`, func() {
74 s := &namedPipeServer{}
75 So(func() { s.Close() }, ShouldPanic)
76 })
77
78 Convey(`A listening named pipe server will panic if double-close d.`,
79 npt.withServer(func(i *namedPipeTestInstance) {
80 i.S.Close()
81 So(func() { i.S.Close() }, ShouldPanic)
82 }))
83
84 Convey(`A test client connection`, func() {
85 tc := testConn{}
86 npc := &namedPipeConn{
87 id: 1337,
88 conn: &tc,
89 name: "test",
90 }
91
92 Convey(`Will reject an invalid handshake magic.`, func() {
93 hb.magic = []byte(`NOT A HANDSHAKE MAGIC`)
94 hb.writeTo(&tc, "", nil)
95 So(func() {
96 npc.negotiate(c)
97 }, ShouldPanic)
98 })
99
100 Convey(`Will reject an invalid handshake.`, func() {
101 hb.writeTo(&tc, "CLEARLY NOT JSON", nil)
102 So(func() {
103 npc.negotiate(c)
104 }, ShouldPanic)
105 })
106 })
107
108 Convey(`A listening named pipe server`, npt.withServer(func(i *n amedPipeTestInstance) {
109 // Setup a goroutine to pipe test data through a client connection.
110 readerC := make(chan io.Reader)
111 defer close(readerC)
112
113 doneC := make(chan struct{})
114 go func() {
115 defer close(doneC)
116
117 // Create a connection to our server instance.
118 w := i.connect()
119 if w == nil {
120 return
121 }
122 defer w.Close()
123
124 // Write supplied data to the client connection.
125 r := <-readerC
126 if r != nil {
127 io.Copy(w, r)
128 }
129 }()
130
131 Convey(`Can receive stream data.`, func() {
132 // Write our handshake and data to the stream.
133 handshake := `{"name": "test", "contentType": "a pplication/octet-stream"}`
134 content := bytes.Repeat([]byte("THIS IS A TEST S TREAM "), 100)
135 readerC <- hb.reader(handshake, content)
136
137 // Retrieve the ensuing stream.
138 stream, props := i.S.Next()
139 So(stream, ShouldNotBeNil)
140 defer stream.Close()
141 So(props, ShouldNotBeNil)
142
143 <-doneC
144
145 // Consume all of the data in the stream.
146 recvData, _ := ioutil.ReadAll(stream)
147 So(recvData, ShouldResemble, content)
148 })
149
150 Convey(`Will exit Next if closed.`, func() {
151 type streamBundle struct {
152 rc io.ReadCloser
153 props *streamproto.Properties
154 }
155 streamC := make(chan *streamBundle)
156 defer close(streamC)
157
158 // Get the stream.
159 go func() {
160 rc, props := i.S.Next()
161 streamC <- &streamBundle{rc, props}
162 }()
163
164 // Close the stream server.
165 i.S.Close()
166
167 // Next must exit with nil.
168 bundle := <-streamC
169 So(bundle.rc, ShouldBeNil)
170 So(bundle.props, ShouldBeNil)
171 })
172 }))
173 })
174 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698