Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(257)

Side by Side Diff: client/internal/logdog/butler/streamserver/namedPipe_test.go

Issue 1429993002: LogDog: Add Butler stream server package. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-butlerproto
Patch Set: Bind POSIX test to POSIX domains. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package streamserver
6
7 import (
8 "bytes"
9 "fmt"
10 "io"
11 "io/ioutil"
12 "net"
13 "testing"
14 "time"
15
16 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
17 . "github.com/smartystreets/goconvey/convey"
18 "golang.org/x/net/context"
19 )
20
21 // Testing interface for OS-level test routine abstraction.
22 type namedPipeTester interface {
23 // Decorator to create and destroy a NamedPipeServer instance.
24 withServer(func(i *namedPipeTestInstance)) func()
25 }
26
27 // Base class for a named pipe test instance, bound to a single server.
28 type namedPipeTestInstance struct {
29 S *namedPipeServer
30 connect func() io.WriteCloser
31 }
32
33 type testAddr string
34
35 func (a testAddr) Network() string { return string(a) }
36 func (a testAddr) String() string { return fmt.Sprintf("test(%s)", a.Network()) }
37
38 type testConn struct {
39 bytes.Buffer
40
41 readDeadline time.Time
42 writeDeadline time.Time
43 }
44
45 func (c *testConn) Close() error { return nil }
46 func (c *testConn) LocalAddr() net.Addr { return testAddr("local") }
47 func (c *testConn) RemoteAddr() net.Addr { return testAddr("remote") }
48
49 func (c *testConn) SetReadDeadline(t time.Time) error {
50 c.readDeadline = t
51 return nil
52 }
53
54 func (c *testConn) SetWriteDeadline(t time.Time) error {
55 c.writeDeadline = t
56 return nil
57 }
58
59 func (c *testConn) SetDeadline(t time.Time) error {
60 c.readDeadline = t
61 c.writeDeadline = t
62 return nil
63 }
64
65 func testNamedPipeServer(t *testing.T, npt namedPipeTester) {
66 Convey(`In a testing environment`, t, func() {
67 c := context.Background()
68 hb := handshakeBuilder{
69 magic: streamproto.ProtocolFrameHeaderMagic,
70 }
71
72 Convey(`A named pipe server will panic if closed without listeni ng.`, func() {
73 s := &namedPipeServer{}
74 So(func() { s.Close() }, ShouldPanic)
75 })
76
77 Convey(`A listening named pipe server will panic if double-close d.`,
78 npt.withServer(func(i *namedPipeTestInstance) {
79 i.S.Close()
80 So(func() { i.S.Close() }, ShouldPanic)
81 }))
82
83 Convey(`A test client connection`, func() {
84 tc := testConn{}
85 npc := &namedPipeConn{
86 id: 1337,
87 conn: &tc,
88 name: "test",
89 }
90
91 Convey(`Will reject an invalid handshake magic.`, func() {
92 hb.magic = []byte(`NOT A HANDSHAKE MAGIC`)
93 hb.writeTo(&tc, "", nil)
94 So(func() {
95 npc.negotiate(c)
96 }, ShouldPanic)
97 })
98
99 Convey(`Will reject an invalid handshake.`, func() {
100 hb.writeTo(&tc, "CLEARLY NOT JSON", nil)
101 So(func() {
102 npc.negotiate(c)
103 }, ShouldPanic)
104 })
105 })
106
107 Convey(`A listening named pipe server`, npt.withServer(func(i *n amedPipeTestInstance) {
108 // Setup a goroutine to pipe test data through a client connection.
109 readerC := make(chan io.Reader)
110 defer close(readerC)
111
112 doneC := make(chan struct{})
113 go func() {
114 defer close(doneC)
115
116 // Create a connection to our server instance.
117 w := i.connect()
118 if w == nil {
119 return
120 }
121 defer w.Close()
122
123 // Write supplied data to the client connection.
124 r := <-readerC
125 if r != nil {
126 io.Copy(w, r)
127 }
128 }()
129
130 Convey(`Can receive stream data.`, func() {
131 // Write our handshake and data to the stream.
132 handshake := `{"name": "test", "contentType": "a pplication/octet-stream"}`
133 content := bytes.Repeat([]byte("THIS IS A TEST S TREAM "), 100)
134 readerC <- hb.reader(handshake, content)
135
136 // Retrieve the ensuing stream.
137 stream, props := i.S.Next()
138 So(stream, ShouldNotBeNil)
139 defer stream.Close()
140 So(props, ShouldNotBeNil)
141
142 <-doneC
143
144 // Consume all of the data in the stream.
145 recvData, _ := ioutil.ReadAll(stream)
146 So(recvData, ShouldResemble, content)
147 })
148
149 Convey(`Will exit Next if closed.`, func() {
150 type streamBundle struct {
151 rc io.ReadCloser
152 props *streamproto.Properties
153 }
154 streamC := make(chan *streamBundle)
155 defer close(streamC)
156
157 // Get the stream.
158 go func() {
159 rc, props := i.S.Next()
160 streamC <- &streamBundle{rc, props}
161 }()
162
163 // Close the stream server.
164 i.S.Close()
165
166 // Next must exit with nil.
167 bundle := <-streamC
168 So(bundle.rc, ShouldBeNil)
169 So(bundle.props, ShouldBeNil)
170 })
171 }))
172 })
173 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698