Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(18)

Side by Side Diff: client/logdog/butlerlib/streamclient/stream.go

Issue 1429993002: LogDog: Add Butler stream server package. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-butlerproto
Patch Set: Cleanup, comments. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package streamclient
6
7 import (
8 "errors"
9 "io"
10
11 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
12 "github.com/luci/luci-go/common/logdog/protocol"
13 "github.com/luci/luci-go/common/recordio"
14 )
15
16 // Stream is an individual LogDog Butler stream.
17 type Stream interface {
18 io.WriteCloser
19
20 // WriteDatagram writes a LogDog Butler streaming datagram to the underl ying
21 // Writer.
22 WriteDatagram([]byte) error
23 }
24
25 // streamImpl is the standard implementation of the Stream interface.
26 type streamImpl struct {
27 *streamproto.Properties
28 io.WriteCloser
29
30 // rioW is a recordio.Writer bound to the WriteCloser. This will be
31 // initialized on the first writeRecord invocation.
32 rioW recordio.Writer
33 }
34
35 var _ Stream = (*streamImpl)(nil)
36
37 func (s *streamImpl) WriteDatagram(dg []byte) error {
38 if !s.isDatagramStream() {
39 return errors.New("not a datagram stream")
40 }
41
42 return s.writeRecord(dg)
43 }
44
45 func (s *streamImpl) Write(data []byte) (int, error) {
46 if s.isDatagramStream() {
47 return 0, errors.New("cannot use Write with datagram stream")
48 }
49
50 return s.writeRaw(data)
51 }
52
53 func (s *streamImpl) writeRaw(data []byte) (int, error) {
54 return s.WriteCloser.Write(data)
55 }
56
57 func (s *streamImpl) writeRecord(r []byte) error {
58 if s.rioW == nil {
59 s.rioW = recordio.NewWriter(s.WriteCloser)
60 }
61 if _, err := s.rioW.Write(r); err != nil {
62 return err
63 }
64 return s.rioW.Flush()
65 }
66
67 func (s *streamImpl) isDatagramStream() bool {
68 return s.StreamType == protocol.LogStreamDescriptor_DATAGRAM
69 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698