Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1)

Unified Diff: client/internal/logdog/butler/bundler/data.go

Issue 1412063008: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Updated from comments. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: client/internal/logdog/butler/bundler/data.go
diff --git a/client/internal/logdog/butler/bundler/data.go b/client/internal/logdog/butler/bundler/data.go
new file mode 100644
index 0000000000000000000000000000000000000000..c5b57655cf6fd81085a3844d09e9089ee23f5752
--- /dev/null
+++ b/client/internal/logdog/butler/bundler/data.go
@@ -0,0 +1,127 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package bundler
+
+import (
+ "sync"
+ "time"
+
+ "github.com/luci/luci-go/common/chunkstream"
+)
+
+type dataPoolRegistry struct {
+ sync.Mutex
+
+ // pools is a pool of Data instances. It is keyed on buffer size.
+ pools map[int]*dataPool
+}
+
+// globaldataPoolRegistry is the default data pool to use by the stream
+// package.
+var globalDataPoolRegistry = dataPoolRegistry{}
+
+func (r *dataPoolRegistry) getPool(s int) *dataPool {
+ r.Lock()
+ defer r.Unlock()
+
+ if r.pools == nil {
+ r.pools = map[int]*dataPool{}
+ }
+
+ pool := r.pools[s]
+ if pool == nil {
+ pool = newPool(s)
+ r.pools[s] = pool
+ }
+ return pool
+}
+
+type dataPool struct {
+ sync.Pool
+ size int
+}
+
+func newPool(size int) *dataPool {
+ p := dataPool{
+ size: size,
+ }
+ p.New = p.newData
+ return &p
+}
+
+func (p *dataPool) newData() interface{} {
+ return &streamData{
+ bufferBase: make([]byte, p.size),
+ releaseFunc: p.release,
+ }
+}
+
+func (p *dataPool) getData() Data {
+ d := p.Get().(*streamData)
+ d.reset()
+ return d
+}
+
+func (p *dataPool) release(d *streamData) {
+ p.Put(d)
+}
+
+// Data is a reusable data buffer that is used by Stream instances to ingest
+// data.
+//
+// Data is initially an empty buffer. Once data is loaded into it, the buffer is
+// resized to the bound data and a timestamp is attached via Bind.
+type Data interface {
+ chunkstream.Chunk
+
+ // Bind resizes the Chunk buffer and records a timestamp to associate with the
+ // data chunk.
+ Bind(int, time.Time) Data
+
+ // Timestamp returns the bound timestamp. This will be zero if no timestamp
+ // has been bound.
+ Timestamp() time.Time
+}
+
+// streamData is an implementation of the Chunk interface for Bundler chunks.
+//
+// It includes the ability to bind to a size/timestamp.
+type streamData struct {
+ bufferBase []byte
+ buffer []byte
+ ts time.Time
+
+ releaseFunc func(*streamData)
+}
+
+var _ Data = (*streamData)(nil)
+
+func (d *streamData) reset() {
+ d.buffer = d.bufferBase
+}
+
+func (d *streamData) Bytes() []byte {
+ return d.buffer
+}
+
+func (d *streamData) Len() int {
+ return len(d.buffer)
+}
+
+func (d *streamData) Bind(amount int, ts time.Time) Data {
+ d.buffer = d.buffer[:amount]
+ d.ts = ts
+ return d
+}
+
+func (d *streamData) Timestamp() time.Time {
+ return d.ts
+}
+
+func (d *streamData) Release() {
+ if d.releaseFunc != nil {
+ d.releaseFunc(d)
+ }
+}
« no previous file with comments | « client/internal/logdog/butler/bundler/counter.go ('k') | client/internal/logdog/butler/bundler/data_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698