| Index: impl/memory/memcache.go
 | 
| diff --git a/impl/memory/memcache.go b/impl/memory/memcache.go
 | 
| index 99617c0ccf03fb9aa2758e1ce052ae88bbfa2abf..1be03861ce851fdd81cb44a290cb522ead0b91d9 100644
 | 
| --- a/impl/memory/memcache.go
 | 
| +++ b/impl/memory/memcache.go
 | 
| @@ -5,20 +5,20 @@
 | 
|  package memory
 | 
|  
 | 
|  import (
 | 
| +	"encoding/binary"
 | 
|  	"sync"
 | 
|  	"time"
 | 
|  
 | 
|  	"golang.org/x/net/context"
 | 
|  
 | 
| -	"github.com/luci/gae/impl/dummy"
 | 
|  	mc "github.com/luci/gae/service/memcache"
 | 
|  	"github.com/luci/luci-go/common/clock"
 | 
| +	"github.com/luci/luci-go/common/errors"
 | 
|  )
 | 
|  
 | 
|  type mcItem struct {
 | 
|  	key        string
 | 
|  	value      []byte
 | 
| -	object     interface{}
 | 
|  	flags      uint32
 | 
|  	expiration time.Duration
 | 
|  
 | 
| @@ -29,7 +29,6 @@ var _ mc.Item = (*mcItem)(nil)
 | 
|  
 | 
|  func (m *mcItem) Key() string               { return m.key }
 | 
|  func (m *mcItem) Value() []byte             { return m.value }
 | 
| -func (m *mcItem) Object() interface{}       { return m.object }
 | 
|  func (m *mcItem) Flags() uint32             { return m.flags }
 | 
|  func (m *mcItem) Expiration() time.Duration { return m.expiration }
 | 
|  
 | 
| @@ -41,10 +40,6 @@ func (m *mcItem) SetValue(val []byte) mc.Item {
 | 
|  	m.value = val
 | 
|  	return m
 | 
|  }
 | 
| -func (m *mcItem) SetObject(obj interface{}) mc.Item {
 | 
| -	m.object = obj
 | 
| -	return m
 | 
| -}
 | 
|  func (m *mcItem) SetFlags(flg uint32) mc.Item {
 | 
|  	m.flags = flg
 | 
|  	return m
 | 
| @@ -54,30 +49,98 @@ func (m *mcItem) SetExpiration(exp time.Duration) mc.Item {
 | 
|  	return m
 | 
|  }
 | 
|  
 | 
| -func (m *mcItem) duplicate() *mcItem {
 | 
| +func (m *mcItem) SetAll(other mc.Item) {
 | 
| +	*m = *other.(*mcItem).duplicate(false)
 | 
| +}
 | 
| +
 | 
| +func (m *mcItem) duplicate(deep bool) *mcItem {
 | 
|  	ret := mcItem{}
 | 
|  	ret = *m
 | 
| -	ret.value = make([]byte, len(m.value))
 | 
| -	copy(ret.value, m.value)
 | 
| +	if deep {
 | 
| +		ret.value = make([]byte, len(m.value))
 | 
| +		copy(ret.value, m.value)
 | 
| +	}
 | 
|  	return &ret
 | 
|  }
 | 
|  
 | 
|  type memcacheData struct {
 | 
| -	lock  sync.Mutex
 | 
| +	lock  sync.RWMutex
 | 
|  	items map[string]*mcItem
 | 
|  	casID uint64
 | 
| +
 | 
| +	stats mc.Statistics
 | 
| +}
 | 
| +
 | 
| +func (m *memcacheData) mkItemLocked(now time.Time, i mc.Item) (ret *mcItem) {
 | 
| +	m.casID++
 | 
| +
 | 
| +	var exp time.Duration
 | 
| +	if i.Expiration() != 0 {
 | 
| +		exp = time.Duration(now.Add(i.Expiration()).UnixNano())
 | 
| +	}
 | 
| +	value := make([]byte, len(i.Value()))
 | 
| +	copy(value, i.Value())
 | 
| +	return &mcItem{
 | 
| +		key:        i.Key(),
 | 
| +		flags:      i.Flags(),
 | 
| +		expiration: exp,
 | 
| +		value:      value,
 | 
| +		CasID:      m.casID,
 | 
| +	}
 | 
| +}
 | 
| +
 | 
| +func (m *memcacheData) setItemLocked(now time.Time, i mc.Item) {
 | 
| +	if cur, ok := m.items[i.Key()]; ok {
 | 
| +		m.stats.Items--
 | 
| +		m.stats.Bytes -= uint64(len(cur.value))
 | 
| +	}
 | 
| +	m.stats.Items++
 | 
| +	m.stats.Bytes += uint64(len(i.Value()))
 | 
| +	m.items[i.Key()] = m.mkItemLocked(now, i)
 | 
| +}
 | 
| +
 | 
| +func (m *memcacheData) delItemLocked(k string) {
 | 
| +	if itm, ok := m.items[k]; ok {
 | 
| +		m.stats.Items--
 | 
| +		m.stats.Bytes -= uint64(len(itm.value))
 | 
| +		delete(m.items, k)
 | 
| +	}
 | 
| +}
 | 
| +
 | 
| +func (m *memcacheData) reset() {
 | 
| +	m.stats = mc.Statistics{}
 | 
| +	m.items = map[string]*mcItem{}
 | 
| +}
 | 
| +
 | 
| +func (m *memcacheData) hasItemLocked(now time.Time, key string) bool {
 | 
| +	ret, ok := m.items[key]
 | 
| +	if ok && ret.Expiration() != 0 && ret.Expiration() < time.Duration(now.UnixNano()) {
 | 
| +		m.delItemLocked(key)
 | 
| +		return false
 | 
| +	}
 | 
| +	return ok
 | 
| +}
 | 
| +
 | 
| +func (m *memcacheData) retrieveLocked(now time.Time, key string) (*mcItem, error) {
 | 
| +	if !m.hasItemLocked(now, key) {
 | 
| +		m.stats.Misses++
 | 
| +		return nil, mc.ErrCacheMiss
 | 
| +	}
 | 
| +
 | 
| +	ret := m.items[key]
 | 
| +	m.stats.Hits++
 | 
| +	m.stats.ByteHits += uint64(len(ret.value))
 | 
| +	return ret, nil
 | 
|  }
 | 
|  
 | 
|  // memcacheImpl binds the current connection's memcache data to an
 | 
|  // implementation of {gae.Memcache, gae.Testable}.
 | 
|  type memcacheImpl struct {
 | 
| -	mc.Interface
 | 
| -
 | 
|  	data *memcacheData
 | 
|  	ctx  context.Context
 | 
|  }
 | 
|  
 | 
| -var _ mc.Interface = (*memcacheImpl)(nil)
 | 
| +var _ mc.RawInterface = (*memcacheImpl)(nil)
 | 
|  
 | 
|  // useMC adds a gae.Memcache implementation to context, accessible
 | 
|  // by gae.GetMC(c)
 | 
| @@ -85,7 +148,7 @@ func useMC(c context.Context) context.Context {
 | 
|  	lck := sync.Mutex{}
 | 
|  	mcdMap := map[string]*memcacheData{}
 | 
|  
 | 
| -	return mc.SetFactory(c, func(ic context.Context) mc.Interface {
 | 
| +	return mc.SetRawFactory(c, func(ic context.Context) mc.RawInterface {
 | 
|  		lck.Lock()
 | 
|  		defer lck.Unlock()
 | 
|  
 | 
| @@ -97,106 +160,180 @@ func useMC(c context.Context) context.Context {
 | 
|  		}
 | 
|  
 | 
|  		return &memcacheImpl{
 | 
| -			dummy.Memcache(),
 | 
|  			mcd,
 | 
|  			ic,
 | 
|  		}
 | 
|  	})
 | 
|  }
 | 
|  
 | 
| -func (m *memcacheImpl) mkItemLocked(i mc.Item) (ret *mcItem) {
 | 
| -	m.data.casID++
 | 
| +func (m *memcacheImpl) NewItem(key string) mc.Item {
 | 
| +	return &mcItem{key: key}
 | 
| +}
 | 
|  
 | 
| -	var exp time.Duration
 | 
| -	if i.Expiration() != 0 {
 | 
| -		exp = time.Duration(clock.Now(m.ctx).Add(i.Expiration()).UnixNano())
 | 
| +func doCBs(items []mc.Item, cb mc.RawCB, inner func(mc.Item) error) {
 | 
| +	// This weird construction is so that we:
 | 
| +	//   - don't take the lock for the entire multi operation, since it could imply
 | 
| +	//     false atomicity.
 | 
| +	//   - don't allow cb to block the actual batch operation, since that would
 | 
| +	//     allow binding in ways that aren't possible under the real
 | 
| +	//     implementation (like a recursive deadlock)
 | 
| +	errs := make([]error, len(items))
 | 
| +	for i, itm := range items {
 | 
| +		errs[i] = inner(itm)
 | 
|  	}
 | 
| -	newItem := mcItem{
 | 
| -		key:        i.Key(),
 | 
| -		flags:      i.Flags(),
 | 
| -		expiration: exp,
 | 
| -		value:      i.Value(),
 | 
| -		CasID:      m.data.casID,
 | 
| +	for _, e := range errs {
 | 
| +		cb(e)
 | 
|  	}
 | 
| -	return newItem.duplicate()
 | 
|  }
 | 
|  
 | 
| -func (m *memcacheImpl) NewItem(key string) mc.Item {
 | 
| -	return &mcItem{key: key}
 | 
| +func (m *memcacheImpl) AddMulti(items []mc.Item, cb mc.RawCB) error {
 | 
| +	now := clock.Now(m.ctx)
 | 
| +	doCBs(items, cb, func(itm mc.Item) error {
 | 
| +		m.data.lock.Lock()
 | 
| +		defer m.data.lock.Unlock()
 | 
| +		if !m.data.hasItemLocked(now, itm.Key()) {
 | 
| +			m.data.setItemLocked(now, itm)
 | 
| +			return nil
 | 
| +		} else {
 | 
| +			return (mc.ErrNotStored)
 | 
| +		}
 | 
| +	})
 | 
| +	return nil
 | 
|  }
 | 
|  
 | 
| -// Add implements context.MCSingleReadWriter.Add.
 | 
| -func (m *memcacheImpl) Add(i mc.Item) error {
 | 
| -	m.data.lock.Lock()
 | 
| -	defer m.data.lock.Unlock()
 | 
| +func (m *memcacheImpl) CompareAndSwapMulti(items []mc.Item, cb mc.RawCB) error {
 | 
| +	now := clock.Now(m.ctx)
 | 
| +	doCBs(items, cb, func(itm mc.Item) error {
 | 
| +		m.data.lock.Lock()
 | 
| +		defer m.data.lock.Unlock()
 | 
| +
 | 
| +		if cur, err := m.data.retrieveLocked(now, itm.Key()); err == nil {
 | 
| +			casid := uint64(0)
 | 
| +			if mi, ok := itm.(*mcItem); ok && mi != nil {
 | 
| +				casid = mi.CasID
 | 
| +			}
 | 
| +
 | 
| +			if cur.CasID == casid {
 | 
| +				m.data.setItemLocked(now, itm)
 | 
| +			} else {
 | 
| +				return mc.ErrCASConflict
 | 
| +			}
 | 
| +			return nil
 | 
| +		}
 | 
| +		return mc.ErrNotStored
 | 
| +	})
 | 
| +	return nil
 | 
| +}
 | 
|  
 | 
| -	if _, ok := m.retrieveLocked(i.Key()); !ok {
 | 
| -		m.data.items[i.Key()] = m.mkItemLocked(i)
 | 
| +func (m *memcacheImpl) SetMulti(items []mc.Item, cb mc.RawCB) error {
 | 
| +	now := clock.Now(m.ctx)
 | 
| +	doCBs(items, cb, func(itm mc.Item) error {
 | 
| +		m.data.lock.Lock()
 | 
| +		defer m.data.lock.Unlock()
 | 
| +		m.data.setItemLocked(now, itm)
 | 
|  		return nil
 | 
| +	})
 | 
| +	return nil
 | 
| +}
 | 
| +
 | 
| +func (m *memcacheImpl) GetMulti(keys []string, cb mc.RawItemCB) error {
 | 
| +	now := clock.Now(m.ctx)
 | 
| +
 | 
| +	itms := make([]mc.Item, len(keys))
 | 
| +	errs := make([]error, len(keys))
 | 
| +
 | 
| +	for i, k := range keys {
 | 
| +		itms[i], errs[i] = func() (mc.Item, error) {
 | 
| +			m.data.lock.RLock()
 | 
| +			defer m.data.lock.RUnlock()
 | 
| +			val, err := m.data.retrieveLocked(now, k)
 | 
| +			if err != nil {
 | 
| +				return nil, err
 | 
| +			}
 | 
| +			return val.duplicate(true).SetExpiration(0), nil
 | 
| +		}()
 | 
|  	}
 | 
| -	return mc.ErrNotStored
 | 
| +
 | 
| +	for i, itm := range itms {
 | 
| +		cb(itm, errs[i])
 | 
| +	}
 | 
| +
 | 
| +	return nil
 | 
|  }
 | 
|  
 | 
| -// CompareAndSwap implements context.MCSingleReadWriter.CompareAndSwap.
 | 
| -func (m *memcacheImpl) CompareAndSwap(item mc.Item) error {
 | 
| -	m.data.lock.Lock()
 | 
| -	defer m.data.lock.Unlock()
 | 
| +func (m *memcacheImpl) DeleteMulti(keys []string, cb mc.RawCB) error {
 | 
| +	now := clock.Now(m.ctx)
 | 
|  
 | 
| -	if cur, ok := m.retrieveLocked(item.Key()); ok {
 | 
| -		casid := uint64(0)
 | 
| -		if mi, ok := item.(*mcItem); ok && mi != nil {
 | 
| -			casid = mi.CasID
 | 
| -		}
 | 
| +	errs := make([]error, len(keys))
 | 
|  
 | 
| -		if cur.CasID == casid {
 | 
| -			m.data.items[item.Key()] = m.mkItemLocked(item)
 | 
| -		} else {
 | 
| -			return mc.ErrCASConflict
 | 
| -		}
 | 
| -	} else {
 | 
| -		return mc.ErrNotStored
 | 
| +	for i, k := range keys {
 | 
| +		errs[i] = func() error {
 | 
| +			m.data.lock.Lock()
 | 
| +			defer m.data.lock.Unlock()
 | 
| +			_, err := m.data.retrieveLocked(now, k)
 | 
| +			if err != nil {
 | 
| +				return err
 | 
| +			}
 | 
| +			m.data.delItemLocked(k)
 | 
| +			return nil
 | 
| +		}()
 | 
|  	}
 | 
| +
 | 
| +	for _, e := range errs {
 | 
| +		cb(e)
 | 
| +	}
 | 
| +
 | 
|  	return nil
 | 
|  }
 | 
|  
 | 
| -// Set implements context.MCSingleReadWriter.Set.
 | 
| -func (m *memcacheImpl) Set(i mc.Item) error {
 | 
| +func (m *memcacheImpl) Flush() error {
 | 
|  	m.data.lock.Lock()
 | 
|  	defer m.data.lock.Unlock()
 | 
| -	m.data.items[i.Key()] = m.mkItemLocked(i)
 | 
| +
 | 
| +	m.data.reset()
 | 
|  	return nil
 | 
|  }
 | 
|  
 | 
| -// Get implements context.MCSingleReadWriter.Get.
 | 
| -func (m *memcacheImpl) Get(key string) (itm mc.Item, err error) {
 | 
| +func (m *memcacheImpl) Increment(key string, delta int64, initialValue *uint64) (uint64, error) {
 | 
| +	now := clock.Now(m.ctx)
 | 
| +
 | 
|  	m.data.lock.Lock()
 | 
|  	defer m.data.lock.Unlock()
 | 
| -	if val, ok := m.retrieveLocked(key); ok {
 | 
| -		itm = val.duplicate().SetExpiration(0)
 | 
| +
 | 
| +	cur := uint64(0)
 | 
| +	if initialValue == nil {
 | 
| +		curItm, err := m.data.retrieveLocked(now, key)
 | 
| +		if err != nil {
 | 
| +			return 0, err
 | 
| +		}
 | 
| +		if len(curItm.value) != 8 {
 | 
| +			return 0, errors.New("memcache Increment: got invalid current value")
 | 
| +		}
 | 
| +		cur = binary.LittleEndian.Uint64(curItm.value)
 | 
|  	} else {
 | 
| -		err = mc.ErrCacheMiss
 | 
| +		cur = *initialValue
 | 
| +	}
 | 
| +	if delta < 0 {
 | 
| +		if uint64(-delta) > cur {
 | 
| +			cur = 0
 | 
| +		} else {
 | 
| +			cur -= uint64(-delta)
 | 
| +		}
 | 
| +	} else {
 | 
| +		cur += uint64(delta)
 | 
|  	}
 | 
| -	return
 | 
| -}
 | 
|  
 | 
| -// Delete implements context.MCSingleReadWriter.Delete.
 | 
| -func (m *memcacheImpl) Delete(key string) error {
 | 
| -	m.data.lock.Lock()
 | 
| -	defer m.data.lock.Unlock()
 | 
| +	newval := make([]byte, 8)
 | 
| +	binary.LittleEndian.PutUint64(newval, cur)
 | 
| +	m.data.setItemLocked(now, m.NewItem(key).SetValue(newval))
 | 
|  
 | 
| -	if _, ok := m.retrieveLocked(key); ok {
 | 
| -		delete(m.data.items, key)
 | 
| -		return nil
 | 
| -	}
 | 
| -	return mc.ErrCacheMiss
 | 
| +	return cur, nil
 | 
|  }
 | 
|  
 | 
| -func (m *memcacheImpl) retrieveLocked(key string) (*mcItem, bool) {
 | 
| -	ret, ok := m.data.items[key]
 | 
| -	if ok && ret.Expiration() != 0 && ret.Expiration() < time.Duration(clock.Now(m.ctx).UnixNano()) {
 | 
| -		ret = nil
 | 
| -		ok = false
 | 
| -		delete(m.data.items, key)
 | 
| -	}
 | 
| -	return ret, ok
 | 
| +func (m *memcacheImpl) Stats() (*mc.Statistics, error) {
 | 
| +	m.data.lock.RLock()
 | 
| +	defer m.data.lock.RUnlock()
 | 
| +
 | 
| +	ret := m.data.stats
 | 
| +	return &ret, nil
 | 
|  }
 | 
| 
 |