Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(107)

Side by Side Diff: server/logdog/storage/bigtable/bigtable.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Comments, rebase. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package bigtable 5 package bigtable
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "strings"
10 9
11 "github.com/luci/luci-go/common/errors" 10 "github.com/luci/luci-go/common/errors"
11 "github.com/luci/luci-go/common/grpcutil"
12 log "github.com/luci/luci-go/common/logging" 12 log "github.com/luci/luci-go/common/logging"
13 "github.com/luci/luci-go/server/logdog/storage" 13 "github.com/luci/luci-go/server/logdog/storage"
14 "golang.org/x/net/context" 14 "golang.org/x/net/context"
15 "google.golang.org/cloud/bigtable" 15 "google.golang.org/cloud/bigtable"
16 ) 16 )
17 17
18 // errStop is an internal sentinel error used to communicate "stop iteration" 18 // errStop is an internal sentinel error used to communicate "stop iteration"
19 // to btTable.getLogData. 19 // to btTable.getLogData.
20 var errStop = errors.New("stop") 20 var errStop = errors.New("stop")
21 21
22 // btGetCallback is a callback that is invoked for each log data row returned 22 // btGetCallback is a callback that is invoked for each log data row returned
23 // by getLogData. 23 // by getLogData.
24 // 24 //
25 // If an error is encountered, no more log data will be fetched. The error will 25 // If an error is encountered, no more log data will be fetched. The error will
26 // be propagated to the getLogData call unless the returned error is errStop, in 26 // be propagated to the getLogData call unless the returned error is errStop, in
27 // which case iteration will stop and getLogData will return nil. 27 // which case iteration will stop and getLogData will return nil.
28 type btGetCallback func(*rowKey, []byte) error 28 type btGetCallback func(*rowKey, []byte) error
29 29
30 // btTable is a general interface for BigTable operations intended to enable 30 // btTable is a general interface for BigTable operations intended to enable
31 // unit tests to stub out BigTable without adding runtime inefficiency. 31 // unit tests to stub out BigTable without adding runtime inefficiency.
32 //
33 // If any of these methods fails with a transient error, it will be wrapped
34 // as an errors.Transient error.
35 type btTable interface { 32 type btTable interface {
36 // putLogData adds new log data to BigTable. 33 // putLogData adds new log data to BigTable.
37 // 34 //
38 // If data already exists for the named row, it will return storage.ErrE xists 35 // If data already exists for the named row, it will return storage.ErrE xists
39 // and not add the data. 36 // and not add the data.
40 putLogData(context.Context, *rowKey, []byte) error 37 putLogData(context.Context, *rowKey, []byte) error
41 38
42 // getLogData retrieves rows belonging to the supplied stream record, st arting 39 // getLogData retrieves rows belonging to the supplied stream record, st arting
43 // with the first index owned by that record. The supplied callback is i nvoked 40 // with the first index owned by that record. The supplied callback is i nvoked
44 // once per retrieved row. 41 // once per retrieved row.
45 // 42 //
46 // rk is the starting row key. 43 // rk is the starting row key.
47 // 44 //
48 // If the supplied limit is nonzero, no more than limit rows will be 45 // If the supplied limit is nonzero, no more than limit rows will be
49 // retrieved. 46 // retrieved.
50 // 47 //
51 // If keysOnly is true, then the callback will return nil row data. 48 // If keysOnly is true, then the callback will return nil row data.
52 getLogData(c context.Context, rk *rowKey, limit int, keysOnly bool, cb b tGetCallback) error 49 getLogData(c context.Context, rk *rowKey, limit int, keysOnly bool, cb b tGetCallback) error
53 } 50 }
54 51
55 // btTransientSubstrings is the set of known error substrings returned by
56 // BigTable that indicate failures that aren't related to the specific data
57 // content.
58 var btTransientSubstrings = []string{
59 "Internal error encountered",
60 "interactive login is required",
61 }
62
63 // btTableProd is an implementation of the btTable interface that uses a real 52 // btTableProd is an implementation of the btTable interface that uses a real
64 // production BigTable connection. 53 // production BigTable connection.
65 type btTableProd struct { 54 type btTableProd struct {
66 *bigtable.Table 55 *bigtable.Table
67 } 56 }
68 57
69 func (t *btTableProd) putLogData(c context.Context, rk *rowKey, data []byte) err or { 58 func (t *btTableProd) putLogData(c context.Context, rk *rowKey, data []byte) err or {
70 m := bigtable.NewMutation() 59 m := bigtable.NewMutation()
71 m.Set("log", "data", bigtable.ServerTime, data) 60 m.Set("log", "data", bigtable.ServerTime, data)
72 cm := bigtable.NewCondMutation(bigtable.RowKeyFilter(rk.encode()), nil, m) 61 cm := bigtable.NewCondMutation(bigtable.RowKeyFilter(rk.encode()), nil, m)
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
143 if isTransient(err) { 132 if isTransient(err) {
144 err = errors.WrapTransient(err) 133 err = errors.WrapTransient(err)
145 } 134 }
146 return err 135 return err
147 } 136 }
148 137
149 // isTransient tests if a BigTable SDK error is transient. 138 // isTransient tests if a BigTable SDK error is transient.
150 // 139 //
151 // Since the BigTable API doesn't give us this information, we will identify 140 // Since the BigTable API doesn't give us this information, we will identify
152 // transient errors by parsing their error string :( 141 // transient errors by parsing their error string :(
153 //
154 // TODO(dnj): File issue to add error qualifier functions to BigTable API.
155 func isTransient(err error) bool { 142 func isTransient(err error) bool {
156 » if err == nil { 143 » return grpcutil.IsTransient(err)
157 » » return false
158 » }
159
160 » msg := err.Error()
161 » for _, s := range btTransientSubstrings {
162 » » if strings.Contains(msg, s) {
163 » » » return true
164 » » }
165 » }
166 » return false
167 } 144 }
168 145
169 // getLogData loads the "data" column from the "log" column family and returns 146 // getLogData loads the "data" column from the "log" column family and returns
170 // its []byte contents. 147 // its []byte contents.
171 // 148 //
172 // If the row doesn't exist, storage.ErrDoesNotExist will be returned. 149 // If the row doesn't exist, storage.ErrDoesNotExist will be returned.
173 func getLogData(row bigtable.Row) ([]byte, error) { 150 func getLogData(row bigtable.Row) ([]byte, error) {
174 ri := getReadItem(row, "log", "data") 151 ri := getReadItem(row, "log", "data")
175 if ri == nil { 152 if ri == nil {
176 return nil, storage.ErrDoesNotExist 153 return nil, storage.ErrDoesNotExist
(...skipping 11 matching lines...) Expand all
188 165
189 // Get the specific ReadItem for our column 166 // Get the specific ReadItem for our column
190 colName := fmt.Sprintf("%s:%s", family, column) 167 colName := fmt.Sprintf("%s:%s", family, column)
191 for _, item := range items { 168 for _, item := range items {
192 if item.Column == colName { 169 if item.Column == colName {
193 return &item 170 return &item
194 } 171 }
195 } 172 }
196 return nil 173 return nil
197 } 174 }
OLDNEW
« no previous file with comments | « server/internal/logdog/service/service.go ('k') | server/logdog/storage/bigtable/bigtable_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698