| Index: impl/memory/datastore_query_execution.go | 
| diff --git a/impl/memory/datastore_query_execution.go b/impl/memory/datastore_query_execution.go | 
| new file mode 100644 | 
| index 0000000000000000000000000000000000000000..e219b524e2188ef81f0e47dee35b536af0c9433a | 
| --- /dev/null | 
| +++ b/impl/memory/datastore_query_execution.go | 
| @@ -0,0 +1,235 @@ | 
| +// Copyright 2015 The Chromium Authors. All rights reserved. | 
| +// Use of this source code is governed by a BSD-style license that can be | 
| +// found in the LICENSE file. | 
| + | 
| +package memory | 
| + | 
| +import ( | 
| +	"bytes" | 
| +	"fmt" | 
| + | 
| +	ds "github.com/luci/gae/service/datastore" | 
| +	"github.com/luci/gae/service/datastore/serialize" | 
| +	"github.com/luci/luci-go/common/cmpbin" | 
| +) | 
| + | 
| +type queryStrategy interface { | 
| +	// handle applies the strategy to the embedded user callback. | 
| +	//   - rawData is the slice of encoded Properties from the index row | 
| +	//     (correctly de-inverted). | 
| +	//   - data is the slice of decoded Properties from the index row | 
| +	//   - key is the decoded Key from the index row (the last item in rawData and | 
| +	//     data) | 
| +	//   - gc is the getCursor function to be passed to the user's callback | 
| +	handle(rawData [][]byte, data []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool | 
| +} | 
| + | 
| +type projectionLookup struct { | 
| +	suffixIndex  int | 
| +	propertyName string | 
| +} | 
| + | 
| +type projectionStrategy struct { | 
| +	cb ds.RawRunCB | 
| + | 
| +	project  []projectionLookup | 
| +	distinct map[string]struct{} | 
| +} | 
| + | 
| +func newProjectionStrategy(q *queryImpl, rq *reducedQuery, cb ds.RawRunCB) queryStrategy { | 
| +	projectionLookups := make([]projectionLookup, len(q.project)) | 
| +	for i, prop := range q.project { | 
| +		projectionLookups[i].propertyName = prop | 
| +		foundIt := false | 
| +		for j, col := range rq.suffixFormat { | 
| +			if col.Property == prop { | 
| +				projectionLookups[i].suffixIndex = j | 
| +				foundIt = true | 
| +				break | 
| +			} | 
| +		} | 
| +		if !foundIt { | 
| +			impossible(fmt.Errorf("planning a strategy for an unfulfillable query?")) | 
| +		} | 
| +	} | 
| +	ret := &projectionStrategy{cb: cb, project: projectionLookups} | 
| +	if q.distinct { | 
| +		ret.distinct = map[string]struct{}{} | 
| +	} | 
| +	return ret | 
| +} | 
| + | 
| +func (s *projectionStrategy) handle(rawData [][]byte, suffixData []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool { | 
| +	projectedRaw := [][]byte(nil) | 
| +	if s.distinct != nil { | 
| +		projectedRaw = make([][]byte, len(suffixData)) | 
| +	} | 
| +	pmap := make(ds.PropertyMap, len(s.project)) | 
| +	for i, p := range s.project { | 
| +		if s.distinct != nil { | 
| +			projectedRaw[i] = rawData[p.suffixIndex] | 
| +		} | 
| +		pmap[p.propertyName] = []ds.Property{suffixData[p.suffixIndex]} | 
| +	} | 
| +	if s.distinct != nil { | 
| +		rawString := string(bjoin(projectedRaw...)) | 
| +		if _, ok := s.distinct[rawString]; ok { | 
| +			return true | 
| +		} | 
| +		s.distinct[rawString] = struct{}{} | 
| +	} | 
| +	return s.cb(key, pmap, gc) | 
| +} | 
| + | 
| +type keysOnlyStrategy ds.RawRunCB | 
| + | 
| +func (s keysOnlyStrategy) handle(_ [][]byte, _ []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool { | 
| +	return ds.RawRunCB(s)(key, nil, gc) | 
| +} | 
| + | 
| +type normalStrategy struct { | 
| +	cb ds.RawRunCB | 
| + | 
| +	ns   string | 
| +	head *memCollection | 
| +} | 
| + | 
| +func newNormalStrategy(ns string, cb ds.RawRunCB, head *memStore) queryStrategy { | 
| +	coll := head.GetCollection("ents:" + ns) | 
| +	if coll == nil { | 
| +		return nil | 
| +	} | 
| +	return &normalStrategy{cb, ns, coll} | 
| +} | 
| + | 
| +func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool { | 
| +	rawEnt := s.head.Get(rawData[len(rawData)-1][1:]) | 
| +	if rawEnt == nil { | 
| +		// entity doesn't exist at head | 
| +		return true | 
| +	} | 
| +	pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.WithoutContext, globalAppID, s.ns) | 
| +	memoryCorruption(err) | 
| + | 
| +	return s.cb(key, pm, gc) | 
| +} | 
| + | 
| +func pickQueryStrategy(q *queryImpl, rq *reducedQuery, cb ds.RawRunCB, head *memStore) queryStrategy { | 
| +	if q.keysOnly { | 
| +		return keysOnlyStrategy(cb) | 
| +	} else if len(q.project) > 0 { | 
| +		return newProjectionStrategy(q, rq, cb) | 
| +	} | 
| +	return newNormalStrategy(rq.ns, cb, head) | 
| +} | 
| + | 
| +func parseSuffix(ns string, suffixFormat []ds.IndexColumn, suffix []byte) (raw [][]byte, decoded []ds.Property) { | 
| +	buf := serialize.Invertible(bytes.NewBuffer(suffix)) | 
| +	decoded = make([]ds.Property, len(suffixFormat)) | 
| +	raw = make([][]byte, len(suffixFormat)) | 
| + | 
| +	err := error(nil) | 
| +	prevOffset := 0 | 
| +	for i := range decoded { | 
| +		needInvert := suffixFormat[i].Direction == ds.DESCENDING | 
| + | 
| +		buf.SetInvert(needInvert) | 
| +		decoded[i], err = serialize.ReadProperty(buf, serialize.WithoutContext, globalAppID, ns) | 
| +		memoryCorruption(err) | 
| + | 
| +		offset := len(suffix) - buf.Len() | 
| +		if needInvert { | 
| +			raw[i] = invert(suffix[prevOffset:offset]) | 
| +		} else { | 
| +			raw[i] = suffix[prevOffset:offset] | 
| +		} | 
| +		prevOffset = offset | 
| +	} | 
| + | 
| +	return | 
| +} | 
| + | 
| +func executeQuery(origQ ds.Query, ns string, isTxn bool, idx, head *memStore, cb ds.RawRunCB) error { | 
| +	q, ok := origQ.(*queryImpl) | 
| +	if !ok { | 
| +		return fmt.Errorf("unsupported Query implementation: %#v", q) | 
| +	} | 
| + | 
| +	rq, err := q.reduce(ns, isTxn) | 
| +	if err != nil { | 
| +		if err == errQueryDone { | 
| +			return nil | 
| +		} | 
| +		return err | 
| +	} | 
| + | 
| +	strategy := pickQueryStrategy(q, rq, cb, head) | 
| +	if strategy == nil { | 
| +		// e.g. the normalStrategy found that there were NO entities in the current | 
| +		// namespace. | 
| +		return nil | 
| +	} | 
| + | 
| +	dedup := map[string]struct{}{} | 
| + | 
| +	offset := q.offset | 
| +	limit := q.limit | 
| +	hasLimit := limit < 0 | 
| + | 
| +	cursorPrefix := []byte(nil) | 
| +	getCursorFn := func(suffix []byte) func() (ds.Cursor, error) { | 
| +		return func() (ds.Cursor, error) { | 
| +			if cursorPrefix == nil { | 
| +				buf := &bytes.Buffer{} | 
| +				_, err := cmpbin.WriteUint(buf, uint64(len(rq.suffixFormat))) | 
| +				memoryCorruption(err) | 
| + | 
| +				for _, col := range rq.suffixFormat { | 
| +					err := serialize.WriteIndexColumn(buf, col) | 
| +					memoryCorruption(err) | 
| +				} | 
| +				cursorPrefix = buf.Bytes() | 
| +			} | 
| +			return queryCursor(bjoin(cursorPrefix, suffix)), nil | 
| +		} | 
| +	} | 
| + | 
| +	idxs, err := getIndicies(rq, idx) | 
| +	if err == errQueryDone { | 
| +		return nil | 
| +	} else if err != nil { | 
| +		return err | 
| +	} | 
| + | 
| +	multiIterate(idxs, func(suffix []byte) bool { | 
| +		if offset > 0 { | 
| +			offset-- | 
| +			return true | 
| +		} | 
| +		if hasLimit { | 
| +			if limit <= 0 { | 
| +				return false | 
| +			} | 
| +			limit-- | 
| +		} | 
| + | 
| +		rawData, data := parseSuffix(ns, rq.suffixFormat, suffix) | 
| + | 
| +		keyProp := data[len(data)-1] | 
| +		if keyProp.Type() != ds.PTKey { | 
| +			impossible(fmt.Errorf("decoded index row doesn't end with a Key: %#v", data)) | 
| +		} | 
| +		keyRaw := string(rawData[len(rawData)-1]) | 
| +		if _, ok := dedup[keyRaw]; ok { | 
| +			// dedup'd! | 
| +			return true | 
| +		} | 
| +		dedup[keyRaw] = struct{}{} | 
| + | 
| +		return strategy.handle( | 
| +			rawData, data, keyProp.Value().(ds.Key), | 
| +			getCursorFn(suffix)) | 
| +	}) | 
| + | 
| +	return nil | 
| +} | 
|  |