| Index: client/logdog/butlerlib/streamclient/stream.go
|
| diff --git a/client/logdog/butlerlib/streamclient/stream.go b/client/logdog/butlerlib/streamclient/stream.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..d867a3e18feb84aa735aab10e68798040a6580a0
|
| --- /dev/null
|
| +++ b/client/logdog/butlerlib/streamclient/stream.go
|
| @@ -0,0 +1,69 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package streamclient
|
| +
|
| +import (
|
| + "errors"
|
| + "io"
|
| +
|
| + "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
|
| + "github.com/luci/luci-go/common/logdog/protocol"
|
| + "github.com/luci/luci-go/common/recordio"
|
| +)
|
| +
|
| +// Stream is an individual LogDog Butler stream.
|
| +type Stream interface {
|
| + io.WriteCloser
|
| +
|
| + // WriteDatagram writes a LogDog Butler streaming datagram to the underlying
|
| + // Writer.
|
| + WriteDatagram([]byte) error
|
| +}
|
| +
|
| +// streamImpl is the standard implementation of the Stream interface.
|
| +type streamImpl struct {
|
| + *streamproto.Properties
|
| + io.WriteCloser
|
| +
|
| + // rioW is a recordio.Writer bound to the WriteCloser. This will be
|
| + // initialized on the first writeRecord invocation.
|
| + rioW recordio.Writer
|
| +}
|
| +
|
| +var _ Stream = (*streamImpl)(nil)
|
| +
|
| +func (s *streamImpl) WriteDatagram(dg []byte) error {
|
| + if !s.isDatagramStream() {
|
| + return errors.New("not a datagram stream")
|
| + }
|
| +
|
| + return s.writeRecord(dg)
|
| +}
|
| +
|
| +func (s *streamImpl) Write(data []byte) (int, error) {
|
| + if s.isDatagramStream() {
|
| + return 0, errors.New("cannot use Write with datagram stream")
|
| + }
|
| +
|
| + return s.writeRaw(data)
|
| +}
|
| +
|
| +func (s *streamImpl) writeRaw(data []byte) (int, error) {
|
| + return s.WriteCloser.Write(data)
|
| +}
|
| +
|
| +func (s *streamImpl) writeRecord(r []byte) error {
|
| + if s.rioW == nil {
|
| + s.rioW = recordio.NewWriter(s.WriteCloser)
|
| + }
|
| + if _, err := s.rioW.Write(r); err != nil {
|
| + return err
|
| + }
|
| + return s.rioW.Flush()
|
| +}
|
| +
|
| +func (s *streamImpl) isDatagramStream() bool {
|
| + return s.StreamType == protocol.LogStreamDescriptor_DATAGRAM
|
| +}
|
|
|