Chromium Code Reviews| Index: filter/dsQueryBatch/filter.go |
| diff --git a/filter/dsQueryBatch/filter.go b/filter/dsQueryBatch/filter.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..5555218d77880ba8a627126a36944914f3a96579 |
| --- /dev/null |
| +++ b/filter/dsQueryBatch/filter.go |
| @@ -0,0 +1,82 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package dsQueryBatch |
| + |
| +import ( |
| + "fmt" |
| + |
| + ds "github.com/luci/gae/service/datastore" |
| +) |
| + |
| +type iterQueryFilter struct { |
| + ds.RawInterface |
| + |
| + batchSize int32 |
| +} |
| + |
| +func (f *iterQueryFilter) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error { |
| + limit, hasLimit := fq.Limit() |
| + |
| + var cursor ds.Cursor |
| + for { |
| + iterQuery := fq.Original() |
| + if cursor != nil { |
| + iterQuery = iterQuery.Start(cursor) |
| + cursor = nil |
| + } |
| + iterLimit := f.batchSize |
| + if hasLimit && limit < iterLimit { |
| + iterLimit = limit |
| + } |
| + iterQuery = iterQuery.Limit(iterLimit) |
| + |
| + iterFinalizedQuery, err := iterQuery.Finalize() |
| + if err != nil { |
| + return fmt.Errorf("failed to finalize internal query: %v", err) |
|
iannucci
2016/03/31 22:40:31
panic: this can't fail if we started with a Finali
dnj
2016/03/31 23:54:06
Done.
|
| + } |
| + |
| + count := int32(0) |
| + err = f.RawInterface.Run(iterFinalizedQuery, func(key *ds.Key, val ds.PropertyMap, getCursor ds.CursorCB) error { |
| + if cursor != nil { |
| + // We're iterating past our batch size, which should never happen, since |
| + // we set a limit. |
| + return fmt.Errorf("iterating past batch size") |
|
iannucci
2016/03/31 22:40:30
yeah let's panic here too. maybe more comment expl
dnj
2016/03/31 23:54:06
Done.
|
| + } |
| + |
| + // Call backing iterator. |
|
iannucci
2016/03/31 22:40:30
doesn't need a comment
dnj
2016/03/31 23:54:06
Done.
|
| + if err := cb(key, val, getCursor); err != nil { |
| + return err |
| + } |
| + |
| + // If this is the last entry in our batch, get the cursor and stop this |
| + // query round. |
| + count++ |
| + if count >= f.batchSize { |
| + cursor, err = getCursor() |
| + if err != nil { |
|
iannucci
2016/03/31 22:40:31
if cursor, err = .......; .... {
|
| + return fmt.Errorf("failed to get cursor: %v", err) |
| + } |
| + } |
| + return nil |
| + }) |
| + if err != nil && err != ds.Stop { |
| + return err |
| + } |
| + |
| + // If we have no cursor, we're done. |
| + if cursor == nil { |
| + break |
| + } |
| + |
| + // Reduce our limit for the next round. |
| + if hasLimit { |
| + limit -= count |
| + if limit <= 0 { |
| + break |
| + } |
| + } |
| + } |
| + return nil |
| +} |