Chromium Code Reviews| Index: impl/cloud/datastore.go |
| diff --git a/impl/cloud/datastore.go b/impl/cloud/datastore.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..c6a64f42596af74b1bb88992913904af121ebb55 |
| --- /dev/null |
| +++ b/impl/cloud/datastore.go |
| @@ -0,0 +1,544 @@ |
| +// Copyright 2016 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package cloud |
| + |
| +import ( |
| + "fmt" |
| + "reflect" |
| + "strings" |
| + "time" |
| + |
| + "github.com/luci/luci-go/common/errors" |
| + |
| + ds "github.com/luci/gae/service/datastore" |
| + infoS "github.com/luci/gae/service/info" |
| + "google.golang.org/cloud/datastore" |
| + |
| + "golang.org/x/net/context" |
| +) |
| + |
| +type cloudDatastore struct { |
| + client *datastore.Client |
| +} |
| + |
| +func (cds *cloudDatastore) use(c context.Context) context.Context { |
| + return ds.SetRawFactory(c, func(ic context.Context, wantTxn bool) ds.RawInterface { |
| + inf := infoS.Get(ic) |
| + if ns, ok := inf.GetNamespace(); ok { |
| + ic = datastore.WithNamespace(ic, ns) |
| + } |
| + |
| + bds := boundDatastore{ |
| + Context: ic, |
| + cloudDatastore: cds, |
| + appID: inf.FullyQualifiedAppID(), |
| + } |
| + if wantTxn { |
| + bds.transaction = datastoreTransaction(ic) |
| + } |
| + return &bds |
| + }) |
| +} |
| + |
| +// boundDatastore is a bound instance of the cloudDatastore installed in the |
| +// Context. |
| +type boundDatastore struct { |
| + // Context is the bound user Context. It includes the datastore namespace, if |
| + // one is set. |
| + context.Context |
| + *cloudDatastore |
| + |
| + appID string |
|
iannucci
2016/05/23 21:53:06
namespace too?
dnj (Google)
2016/07/01 02:25:55
I'm tracking that in Info service and applying it
|
| + transaction *datastore.Transaction |
| +} |
| + |
| +func (bds *boundDatastore) AllocateIDs(incomplete *ds.Key, n int) (int64, error) { |
| + // AllocateIDs assumes that a contiguous ID space will be returned. The cloud |
| + // datastore library does not offer this guarantee. |
|
iannucci
2016/05/23 21:53:06
IMO, I would just change the high level AllocateID
dnj (Google)
2016/07/01 02:25:55
Done.
|
| + // |
| + // It is our *expectation* that the remote datastore API will return a |
| + // contiguous set of IDs for a given entity type. We will panic if this |
| + // expectation is violated. |
| + keys := make([]*ds.Key, n) |
| + for i := range keys { |
| + keys[i] = incomplete |
| + } |
| + |
| + nativeKeys, err := bds.client.AllocateIDs(bds, bds.gaeKeysToNative(keys...)) |
| + if err != nil { |
| + return -1, normalizeError(err) |
| + } |
| + |
| + keys = bds.nativeKeysToGAE(nativeKeys...) |
| + start := keys[0].IntID() |
| + |
| + // Assert that the allocated IDs are contiguous. |
| + expected := start + 1 |
| + for _, key := range keys[1:] { |
| + if id := key.IntID(); id != expected { |
| + panic(fmt.Errorf("non-contiugous key IDs returned (%d != %d)", id, expected)) |
| + } |
| + expected++ |
| + } |
| + |
| + return start, nil |
| +} |
| + |
| +func (bds *boundDatastore) RunInTransaction(fn func(context.Context) error, opts *ds.TransactionOptions) error { |
| + if bds.transaction != nil { |
| + return errors.New("nested transactions are not supported") |
| + } |
| + |
| + // The cloud datastore SDK does not expose any transaction options. |
|
iannucci
2016/05/23 21:53:06
well that's a bummer.
dnj (Google)
2016/07/01 02:25:55
Acknowledged.
|
| + if opts != nil { |
| + switch { |
| + case opts.XG: |
| + return errors.New("cross-group transactions are not supported") |
| + case opts.Attempts != 0 && opts.Attempts != 3: |
|
iannucci
2016/05/23 21:53:06
This is actually a client library responsibility.
dnj (Google)
2016/07/01 02:25:55
Oh good to know. Implemented in the same manner as
|
| + return errors.New("setting transaction attempts is not supported") |
| + } |
| + } |
| + |
| + _, err := bds.client.RunInTransaction(bds, func(tx *datastore.Transaction) error { |
| + return fn(withDatastoreTransaction(bds, tx)) |
| + }) |
| + return normalizeError(err) |
| +} |
| + |
| +func (bds *boundDatastore) DecodeCursor(s string) (ds.Cursor, error) { |
| + cursor, err := datastore.DecodeCursor(s) |
| + return cursor, normalizeError(err) |
| +} |
| + |
| +func (bds *boundDatastore) Run(q *ds.FinalizedQuery, cb ds.RawRunCB) error { |
| + it := bds.client.Run(bds, bds.prepareNativeQuery(q)) |
| + cursorFn := func() (ds.Cursor, error) { |
| + return it.Cursor() |
| + } |
| + |
| + for { |
| + var npls *nativePropertyLoadSaver |
| + if !q.KeysOnly() { |
| + npls = bds.mkNPLS(nil) |
| + } |
| + nativeKey, err := it.Next(npls) |
| + if err != nil { |
| + if err == datastore.Done { |
| + return nil |
| + } |
| + return normalizeError(err) |
| + } |
| + |
| + if err := cb(bds.nativeKeysToGAE(nativeKey)[0], npls.pmap, cursorFn); err != nil { |
| + if err == ds.Stop { |
| + return nil |
| + } |
| + return normalizeError(err) |
| + } |
| + } |
| +} |
| + |
| +func (bds *boundDatastore) Count(q *ds.FinalizedQuery) (int64, error) { |
| + v, err := bds.client.Count(bds, bds.prepareNativeQuery(q)) |
| + if err != nil { |
| + return -1, normalizeError(err) |
| + } |
| + return int64(v), nil |
| +} |
| + |
| +func idxCallbacker(err error, amt int, cb func(idx int, err error) error) error { |
| + if err == nil { |
| + for i := 0; i < amt; i++ { |
| + if err := cb(i, nil); err != nil { |
| + return err |
| + } |
| + } |
| + return nil |
| + } |
| + |
| + err = errors.Fix(err) |
| + if me, ok := err.(errors.MultiError); ok { |
| + for i, err := range me { |
| + if err := cb(i, normalizeError(err)); err != nil { |
| + return err |
| + } |
| + } |
| + return nil |
| + } |
| + return normalizeError(err) |
| +} |
| + |
| +func (bds *boundDatastore) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMultiCB) error { |
| + nativeKeys := bds.gaeKeysToNative(keys...) |
| + nativePLS := make([]*nativePropertyLoadSaver, len(nativeKeys)) |
| + for i := range nativePLS { |
| + nativePLS[i] = bds.mkNPLS(nil) |
| + } |
| + |
| + var err error |
| + if tx := bds.transaction; tx != nil { |
| + // Transactional GetMulti. |
| + err = tx.GetMulti(nativeKeys, nativePLS) |
| + } else { |
| + // Non-transactional GetMulti. |
| + err = bds.client.GetMulti(bds, nativeKeys, nativePLS) |
| + } |
| + |
| + return idxCallbacker(err, len(nativePLS), func(idx int, err error) error { |
| + return cb(nativePLS[idx].pmap, err) |
| + }) |
| +} |
| + |
| +func (bds *boundDatastore) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.PutMultiCB) error { |
| + nativeKeys := bds.gaeKeysToNative(keys...) |
| + nativePLS := make([]*nativePropertyLoadSaver, len(vals)) |
| + for i := range nativePLS { |
| + nativePLS[i] = bds.mkNPLS(vals[i]) |
| + } |
| + |
| + var err error |
| + if tx := bds.transaction; tx != nil { |
| + // Transactional PutMulti. |
| + // |
| + // In order to simulate the presence of mid-transaction key allocation, we |
| + // will identify any incomplete keys and allocate IDs for them. This is |
| + // potentially wasteful in the event o failed or retried transactions, but |
|
iannucci
2016/05/23 21:53:06
s/o/of
dnj (Google)
2016/07/01 02:25:55
Done.
|
| + // it is required to maintain API compatibility with the datastore |
| + // interface. |
| + var incompleteKeys []*datastore.Key |
| + var incompleteKeyMap map[int]int |
| + for i, k := range nativeKeys { |
| + if k.Incomplete() { |
| + if incompleteKeyMap == nil { |
| + // Optimization: if there are any incomplete keys, allocate room for |
| + // the full range. |
| + incompleteKeyMap = make(map[int]int, len(nativeKeys)-i) |
| + incompleteKeys = make([]*datastore.Key, 0, len(nativeKeys)-i) |
| + } |
| + incompleteKeyMap[len(incompleteKeys)] = i |
| + incompleteKeys = append(incompleteKeys, k) |
| + } |
| + } |
| + if len(incompleteKeys) > 0 { |
| + idKeys, err := bds.client.AllocateIDs(bds, incompleteKeys) |
| + if err != nil { |
| + return err |
| + } |
| + for i, idKey := range idKeys { |
| + nativeKeys[incompleteKeyMap[i]] = idKey |
| + } |
| + } |
| + |
| + _, err = tx.PutMulti(nativeKeys, nativePLS) |
| + } else { |
| + // Non-transactional PutMulti. |
| + nativeKeys, err = bds.client.PutMulti(bds, nativeKeys, nativePLS) |
| + } |
| + |
| + return idxCallbacker(err, len(nativeKeys), func(idx int, err error) error { |
| + if err == nil { |
| + return cb(bds.nativeKeysToGAE(nativeKeys[idx])[0], nil) |
| + } |
| + return cb(nil, err) |
| + }) |
| +} |
| + |
| +func (bds *boundDatastore) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error { |
| + nativeKeys := bds.gaeKeysToNative(keys...) |
| + |
| + var err error |
| + if tx := bds.transaction; tx != nil { |
| + // Transactional DeleteMulti. |
| + err = tx.DeleteMulti(nativeKeys) |
| + } else { |
| + // Non-transactional DeleteMulti. |
| + err = bds.client.DeleteMulti(bds, nativeKeys) |
| + } |
| + |
| + return idxCallbacker(err, len(nativeKeys), func(_ int, err error) error { |
| + return cb(err) |
| + }) |
| +} |
| + |
| +func (bds *boundDatastore) Testable() ds.Testable { |
| + return nil |
| +} |
| + |
| +func (bds *boundDatastore) prepareNativeQuery(fq *ds.FinalizedQuery) *datastore.Query { |
| + nq := datastore.NewQuery(fq.Kind()) |
| + if bds.transaction != nil { |
| + nq = nq.Transaction(bds.transaction) |
| + } |
| + |
| + // nativeFilter translates a filter field. If the translation fails, we'll |
| + // pass the result through to the underlying datastore and allow it to |
| + // reject it. |
| + nativeFilter := func(prop ds.Property) interface{} { |
| + if np, err := bds.gaePropertyToNative("", []ds.Property{prop}); err == nil { |
| + return np.Value |
| + } |
| + return prop.Value() |
| + } |
| + |
| + // Equality filters. |
| + for field, props := range fq.EqFilters() { |
| + for _, prop := range props { |
| + nq = nq.Filter(fmt.Sprintf("%s =", field), nativeFilter(prop)) |
| + } |
| + } |
| + |
| + // Inequality filters. |
| + if ineq := fq.IneqFilterProp(); ineq != "" { |
| + if field, op, prop := fq.IneqFilterLow(); field != "" { |
| + nq = nq.Filter(fmt.Sprintf("%s %s", field, op), nativeFilter(prop)) |
| + } |
| + |
| + if field, op, prop := fq.IneqFilterHigh(); field != "" { |
| + nq = nq.Filter(fmt.Sprintf("%s %s", field, op), nativeFilter(prop)) |
| + } |
| + } |
| + |
| + start, end := fq.Bounds() |
| + if start != nil { |
| + nq = nq.Start(start.(datastore.Cursor)) |
| + } |
| + if end != nil { |
| + nq = nq.End(end.(datastore.Cursor)) |
| + } |
| + |
| + if fq.Distinct() { |
| + nq = nq.Distinct() |
| + } |
| + if fq.KeysOnly() { |
| + nq = nq.KeysOnly() |
| + } |
| + if limit, ok := fq.Limit(); ok { |
| + nq = nq.Limit(int(limit)) |
| + } |
| + if offset, ok := fq.Offset(); ok { |
| + nq = nq.Offset(int(offset)) |
| + } |
| + if proj := fq.Project(); proj != nil { |
| + nq = nq.Project(proj...) |
| + } |
| + if ancestor := fq.Ancestor(); ancestor != nil { |
| + nq = nq.Ancestor(bds.gaeKeysToNative(ancestor)[0]) |
| + } |
| + if fq.EventuallyConsistent() { |
| + nq = nq.EventualConsistency() |
| + } |
| + |
| + for _, ic := range fq.Orders() { |
| + prop := ic.Property |
| + if ic.Descending { |
| + prop = "-" + prop |
| + } |
| + nq = nq.Order(prop) |
| + } |
| + |
| + return nq |
| +} |
| + |
| +func (bds *boundDatastore) mkNPLS(base ds.PropertyMap) *nativePropertyLoadSaver { |
| + return &nativePropertyLoadSaver{bds: bds, pmap: clonePropertyMap(base)} |
| +} |
| + |
| +func (bds *boundDatastore) gaePropertyToNative(name string, props []ds.Property) (nativeProp datastore.Property, err error) { |
| + nativeProp.Name = name |
| + |
| + nativeValues := make([]interface{}, len(props)) |
| + for i, prop := range props { |
| + switch pt := prop.Type(); pt { |
| + case ds.PTNull, ds.PTInt, ds.PTTime, ds.PTBool, ds.PTBytes, ds.PTString, ds.PTFloat: |
| + nativeValues[i] = prop.Value() |
| + break |
| + |
| + case ds.PTKey: |
| + nativeValues[i] = bds.gaeKeysToNative(prop.Value().(*ds.Key))[0] |
| + |
| + default: |
| + err = fmt.Errorf("unsupported property type at %d: %v", i, pt) |
| + return |
| + } |
| + } |
| + |
| + if len(nativeValues) == 1 { |
| + nativeProp.Value = nativeValues[0] |
| + nativeProp.NoIndex = (props[0].IndexSetting() != ds.ShouldIndex) |
| + } else { |
| + // We must always index list values. |
| + nativeProp.Value = nativeValues |
| + } |
| + return |
| +} |
| + |
| +func (bds *boundDatastore) nativePropertyToGAE(nativeProp datastore.Property) (name string, props []ds.Property, err error) { |
| + name = nativeProp.Name |
| + |
| + var nativeValues []interface{} |
| + // Slice of supported native type. Break this into a slice of datastore |
| + // properties. |
| + // |
| + // It must be an []interface{}. |
| + if rv := reflect.ValueOf(nativeProp.Value); rv.Kind() == reflect.Slice && rv.Type().Elem().Kind() == reflect.Interface { |
| + nativeValues = rv.Interface().([]interface{}) |
| + } else { |
| + nativeValues = []interface{}{nativeProp.Value} |
| + } |
| + |
| + if len(nativeValues) == 0 { |
| + return |
| + } |
| + |
| + props = make([]ds.Property, len(nativeValues)) |
| + for i, nv := range nativeValues { |
| + switch nvt := nv.(type) { |
| + case int64, bool, string, float64, []byte: |
| + break |
| + |
| + case time.Time: |
| + // Cloud datastore library returns local time. |
| + nv = nvt.UTC() |
| + |
| + case *datastore.Key: |
| + nv = bds.nativeKeysToGAE(nvt)[0] |
| + |
| + default: |
| + err = fmt.Errorf("element %d has unsupported datastore.Value type %T", i, nv) |
| + return |
| + } |
| + |
| + indexSetting := ds.ShouldIndex |
| + if nativeProp.NoIndex { |
| + indexSetting = ds.NoIndex |
| + } |
| + props[i].SetValue(nv, indexSetting) |
| + } |
| + return |
| +} |
| + |
| +func (bds *boundDatastore) gaeKeysToNative(keys ...*ds.Key) []*datastore.Key { |
| + nativeKeys := make([]*datastore.Key, len(keys)) |
| + for i, key := range keys { |
| + _, _, toks := key.Split() |
| + |
| + var nativeKey *datastore.Key |
| + for _, tok := range toks { |
| + nativeKey = datastore.NewKey(bds, tok.Kind, tok.StringID, tok.IntID, nativeKey) |
| + } |
| + nativeKeys[i] = nativeKey |
| + } |
| + return nativeKeys |
| +} |
| + |
| +func (bds *boundDatastore) nativeKeysToGAE(nativeKeys ...*datastore.Key) []*ds.Key { |
| + keys := make([]*ds.Key, len(nativeKeys)) |
| + toks := make([]ds.KeyTok, 1) |
| + for i, nativeKey := range nativeKeys { |
| + toks = toks[:0] |
| + cur := nativeKey |
| + for { |
| + toks = append(toks, ds.KeyTok{Kind: cur.Kind(), IntID: cur.ID(), StringID: cur.Name()}) |
| + cur = cur.Parent() |
| + if cur == nil { |
| + break |
| + } |
| + } |
| + |
| + // Reverse "toks" so we have ancestor-to-child lineage. |
| + for i := 0; i < len(toks)/2; i++ { |
| + ri := len(toks) - i - 1 |
| + toks[i], toks[ri] = toks[ri], toks[i] |
| + } |
| + keys[i] = ds.NewKeyToks(bds.appID, nativeKey.Namespace(), toks) |
| + } |
| + return keys |
| +} |
| + |
| +// nativePropertyLoadSaver is a ds.PropertyMap which implements |
| +// datastore.PropertyLoadSaver. |
| +// |
| +// It naturally converts between native and GAE properties and values. |
| +type nativePropertyLoadSaver struct { |
| + bds *boundDatastore |
| + pmap ds.PropertyMap |
| +} |
| + |
| +var _ datastore.PropertyLoadSaver = (*nativePropertyLoadSaver)(nil) |
| + |
| +func (npls *nativePropertyLoadSaver) Load(props []datastore.Property) error { |
| + if npls.pmap == nil { |
| + // Allocate for common case: one property per property name. |
| + npls.pmap = make(ds.PropertyMap, len(props)) |
| + } |
| + |
| + for _, nativeProp := range props { |
| + name, props, err := npls.bds.nativePropertyToGAE(nativeProp) |
| + if err != nil { |
| + return err |
| + } |
| + npls.pmap[name] = append(npls.pmap[name], props...) |
| + } |
| + return nil |
| +} |
| + |
| +func (npls *nativePropertyLoadSaver) Save() ([]datastore.Property, error) { |
| + if len(npls.pmap) == 0 { |
| + return nil, nil |
| + } |
| + |
| + props := make([]datastore.Property, 0, len(npls.pmap)) |
| + for name, plist := range npls.pmap { |
| + // Strip meta. |
| + if strings.HasPrefix(name, "$") { |
| + continue |
| + } |
| + |
| + nativeProp, err := npls.bds.gaePropertyToNative(name, plist) |
| + if err != nil { |
| + return nil, err |
| + } |
| + props = append(props, nativeProp) |
| + } |
| + return props, nil |
| +} |
| + |
| +var datastoreTransactionKey = "*datastore.Transaction" |
| + |
| +func withDatastoreTransaction(c context.Context, tx *datastore.Transaction) context.Context { |
| + return context.WithValue(c, &datastoreTransactionKey, tx) |
| +} |
| + |
| +func datastoreTransaction(c context.Context) *datastore.Transaction { |
| + if tx, ok := c.Value(&datastoreTransactionKey).(*datastore.Transaction); ok { |
| + return tx |
| + } |
| + return nil |
| +} |
| + |
| +func clonePropertyMap(pmap ds.PropertyMap) ds.PropertyMap { |
| + if pmap == nil { |
| + return nil |
| + } |
| + |
| + clone := make(ds.PropertyMap, len(pmap)) |
| + for k, props := range pmap { |
| + clone[k] = append([]ds.Property(nil), props...) |
| + } |
| + return clone |
| +} |
| + |
| +func normalizeError(err error) error { |
| + switch err { |
| + case datastore.ErrNoSuchEntity: |
| + return ds.ErrNoSuchEntity |
| + case datastore.ErrConcurrentTransaction: |
| + return ds.ErrConcurrentTransaction |
| + case datastore.ErrInvalidKey: |
| + return ds.ErrInvalidKey |
| + default: |
| + return err |
| + } |
| +} |