Index: dashboard/dashboard/common/stored_object.py
diff --git a/dashboard/dashboard/common/stored_object.py b/dashboard/dashboard/common/stored_object.py
index 9f802e1f0ce929066b11605b96afda0eb19f89a3..8bd06f224c58246019185d02bd33f62203190e6b 100644
--- a/dashboard/dashboard/common/stored_object.py
+++ b/dashboard/dashboard/common/stored_object.py
@@ -4,17 +4,13 @@
 
 """A module for storing and getting objects from datastore.
 
-This module provides Get, Set and Delete functions for storing pickleable
-objects in datastore, with support for large objects greater than 1 MB.
+App Engine datastore limits entity size to less than 1 MB; this module
+supports storing larger objects by splitting the data and using multiple
+datastore entities.
 
 Although this module contains ndb.Model classes, these are not intended
 to be used directly by other modules.
 
-App Engine datastore limits entity size to less than 1 MB; this module
-supports storing larger objects by splitting the data and using multiple
-datastore entities and multiple memcache keys. Using ndb.get and pickle, a
-complex data structure can be retrieved more quickly than datastore fetch.
-
 Example:
   john = Account()
   john.username = 'John'
@@ -23,20 +19,10 @@ Example:
 """
 
 import cPickle as pickle
-import logging
 
-from google.appengine.api import memcache
 from google.appengine.ext import ndb
 
-_MULTIPART_ENTITY_MEMCACHE_KEY = 'multipart_entity_'
-
-# Maximum number of entities and memcache to save a value.
-# The limit for data stored in one datastore entity is 1 MB,
-# and the limit for memcache batch operations is 32 MB. See:
-# https://cloud.google.com/appengine/docs/python/memcache/#Python_Limits
-_MAX_NUM_PARTS = 16
-
-# Max bytes per entity or value cached with memcache.
+# Max bytes per entity.
 _CHUNK_SIZE = 1000 * 1000
 
 
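
Note: with the memcache constants gone, _CHUNK_SIZE is the only sizing knob
left. Each PartEntity holds at most 1,000,000 bytes of pickled data, and
removing _MAX_NUM_PARTS also removes the old 16-part (roughly 16 MB) ceiling.
The part count is plain ceiling division; a standalone sketch, where NumParts
is a hypothetical helper and not part of the module:

    _CHUNK_SIZE = 1000 * 1000  # max bytes per PartEntity, as above

    def NumParts(num_pickled_bytes):
      # Ceiling division: e.g. 2,500,000 bytes -> 3 parts.
      return (num_pickled_bytes + _CHUNK_SIZE - 1) // _CHUNK_SIZE

    assert NumParts(2500000) == 3
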
@@ -56,22 +42,20 @@ def Get(key):
 
 @ndb.tasklet
 def GetAsync(key):
-  results = yield MultipartCache.GetAsync(key)
-  if not results:
-    set_future = MultipartCache.SetAsync(key, results)
-    get_future = _GetValueFromDatastore(key)
-    yield set_future, get_future
-    results = get_future.get_result()
-  raise ndb.Return(results)
+  entity = yield ndb.Key(MultipartEntity, key).get_async()
+  if not entity:
+    raise ndb.Return(None)
+  yield entity.GetPartsAsync()
+  raise ndb.Return(entity.GetData())
 
 
 @ndb.synctasklet
 def Set(key, value):
-  """Sets the value in datastore and memcache with limit of '_MAX_NUM_PARTS' MB.
+  """Sets the value in datastore.
 
   Args:
     key: String key value.
-    value: A pickleable value to be stored limited at '_MAX_NUM_PARTS' MB.
+    value: A pickleable value to be stored.
   """
   yield SetAsync(key, value)
 
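
Note: the new GetAsync body is exactly the datastore read path that previously
lived in _GetValueFromDatastore (deleted further down), so callers of the
public API are unaffected. A minimal usage sketch of the synchronous wrappers,
assuming an App Engine context (runtime or ndb testbed), the dashboard.common
import path, and an illustrative key name:

    from dashboard.common import stored_object

    # Any pickleable value works; values over ~1 MB are split across
    # multiple PartEntity rows transparently.
    stored_object.Set('sample_key', {'rows': range(10)})

    value = stored_object.Get('sample_key')  # None if the key is absent
    assert value == {'rows': range(10)}

    stored_object.Delete('sample_key')       # removes head entity and parts
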
@@ -82,13 +66,12 @@ def SetAsync(key, value):
   if not entity:
     entity = MultipartEntity(id=key)
   entity.SetData(value)
-  yield (entity.PutAsync(),
-         MultipartCache.SetAsync(key, value))
+  yield entity.PutAsync()
 
 
 @ndb.synctasklet
 def Delete(key):
-  """Deletes the value in datastore and memcache."""
+  """Deletes the value in datastore."""
   yield DeleteAsync(key)
 
 
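
Note: in ndb tasklets, yielding a tuple of futures waits on all of them
concurrently, which is why the old SetAsync bundled the datastore put and the
memcache write into one yield; with the cache write gone, a bare yield of the
single future is equivalent. The idiom as a generic sketch ('Kind' and
'OtherKind' are placeholders, not kinds from this module):

    from google.appengine.ext import ndb

    @ndb.tasklet
    def _GetBothAsync(key_id):
      # Yielding a tuple runs both RPCs in parallel and returns both results.
      first, second = yield (ndb.Key('Kind', key_id).get_async(),
                             ndb.Key('OtherKind', key_id).get_async())
      raise ndb.Return((first, second))
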
@@ -96,8 +79,7 @@ def Delete(key):
 def DeleteAsync(key):
   multipart_entity_key = ndb.Key(MultipartEntity, key)
   yield (multipart_entity_key.delete_async(),
-         MultipartEntity.DeleteAsync(multipart_entity_key),
-         MultipartCache.DeleteAsync(key))
+         MultipartEntity.DeleteAsync(multipart_entity_key))
 
 
 class MultipartEntity(ndb.Model):
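
Note: DeleteAsync still waits on two futures, the head key's delete plus
MultipartEntity.DeleteAsync, which cleans up the PartEntity children. That
method's body is outside this diff; since PutAsync creates parts with
parent=self.key, the cleanup is presumably an ancestor query along these
lines (a hypothetical reconstruction, not the module's actual code):

    @classmethod
    @ndb.tasklet
    def DeleteAsync(cls, key):
      # Assumed sketch: fetch the keys of all PartEntity children of the
      # head key and delete them in one batch.
      part_keys = yield PartEntity.query(ancestor=key).fetch_async(
          keys_only=True)
      yield ndb.delete_multi_async(part_keys)
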
@@ -129,15 +111,11 @@ class MultipartEntity(ndb.Model):
   def PutAsync(self):
     """Stores serialized data over multiple PartEntity."""
     serialized_parts = _Serialize(self.GetData())
-    if len(serialized_parts) > _MAX_NUM_PARTS:
-      logging.error('Max number of parts reached.')
-      return
     part_list = []
     num_parts = len(serialized_parts)
     for i in xrange(num_parts):
-      if serialized_parts[i] is not None:
-        part = PartEntity(id=i + 1, parent=self.key, value=serialized_parts[i])
-        part_list.append(part)
+      part = PartEntity(id=i + 1, parent=self.key, value=serialized_parts[i])
+      part_list.append(part)
     self.size = num_parts
     yield ndb.put_multi_async(part_list + [self])
 
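
Note: the dropped `is not None` guard was only needed because _Serialize used
to pad its result with None entries up to _MAX_NUM_PARTS (see the last hunk);
without padding, every element is a real chunk and the loop can append
unconditionally. After this change, a value that pickles to about 2.5 MB
produces a write batch keyed like this (the 'sample_key' id is illustrative):

    from google.appengine.ext import ndb

    head_key = ndb.Key('MultipartEntity', 'sample_key')
    # PutAsync writes the head entity (with size=3) plus three children:
    #   Key('MultipartEntity', 'sample_key', 'PartEntity', 1)  first 1 MB
    #   Key('MultipartEntity', 'sample_key', 'PartEntity', 2)  second 1 MB
    #   Key('MultipartEntity', 'sample_key', 'PartEntity', 3)  final ~0.5 MB
    part_keys = [ndb.Key('PartEntity', i + 1, parent=head_key)
                 for i in xrange(3)]
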
@@ -157,87 +135,6 @@ class PartEntity(ndb.Model):
   value = ndb.BlobProperty()
 
 
-class MultipartCache(object):
-  """Contains operations for storing values over multiple memcache keys.
-
-  Values are serialized, split, and stored over multiple memcache keys. The
-  head cache stores the expected size.
-  """
-
-  @classmethod
-  @ndb.tasklet
-  def GetAsync(cls, key):
-    """Gets value in memcache."""
-    keys = cls._GetCacheKeyList(key)
-    head_key = cls._GetCacheKey(key)
-    client = memcache.Client()
-    cache_values = yield client.get_multi_async(keys)
-    # Whether we have all the memcache values.
-    if len(keys) != len(cache_values) or head_key not in cache_values:
-      raise ndb.Return(None)
-
-    serialized = ''
-    cache_size = cache_values[head_key]
-    keys.remove(head_key)
-    for key in keys[:cache_size]:
-      if key not in cache_values:
-        raise ndb.Return(None)
-      if cache_values[key] is not None:
-        serialized += cache_values[key]
-    raise ndb.Return(pickle.loads(serialized))
-
-  @classmethod
-  @ndb.tasklet
-  def SetAsync(cls, key, value):
-    """Sets a value in memcache."""
-    serialized_parts = _Serialize(value)
-    if len(serialized_parts) > _MAX_NUM_PARTS:
-      logging.error('Max number of parts reached.')
-      raise ndb.Return(None)
-
-    cached_values = {}
-    cached_values[cls._GetCacheKey(key)] = len(serialized_parts)
-    for i in xrange(len(serialized_parts)):
-      cached_values[cls._GetCacheKey(key, i)] = serialized_parts[i]
-    client = memcache.Client()
-    yield client.set_multi_async(cached_values)
-
-  @classmethod
-  @ndb.synctasklet
-  def Delete(cls, key):
-    """Deletes all cached values for key."""
-    yield cls.DeleteAsync(key)
-
-  @classmethod
-  @ndb.tasklet
-  def DeleteAsync(cls, key):
-    client = memcache.Client()
-    yield client.delete_multi_async(cls._GetCacheKeyList(key))
-
-  @classmethod
-  def _GetCacheKeyList(cls, key):
-    """Gets a list of head cache key and cache key parts."""
-    keys = [cls._GetCacheKey(key, i) for i in xrange(_MAX_NUM_PARTS)]
-    keys.append(cls._GetCacheKey(key))
-    return keys
-
-  @classmethod
-  def _GetCacheKey(cls, key, index=None):
-    """Returns either head cache key or cache key part."""
-    if index is not None:
-      return _MULTIPART_ENTITY_MEMCACHE_KEY + '%s.%s' % (key, index)
-    return _MULTIPART_ENTITY_MEMCACHE_KEY + key
-
-
-@ndb.tasklet
-def _GetValueFromDatastore(key):
-  entity = yield ndb.Key(MultipartEntity, key).get_async()
-  if not entity:
-    raise ndb.Return(None)
-  yield entity.GetPartsAsync()
-  raise ndb.Return(entity.GetData())
-
-
 def _Serialize(value):
   """Serializes value and returns a list of its parts.
 
@@ -253,6 +150,4 @@ def _Serialize(value):
   values = []
   for i in xrange(0, length, _CHUNK_SIZE):
     values.append(serialized[i:i + _CHUNK_SIZE])
-  for i in xrange(len(values), _MAX_NUM_PARTS):
-    values.append(None)
   return values
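
Note: with the None padding removed, _Serialize returns exactly the chunks of
the pickled value, so concatenating them in order and unpickling recovers the
original. The read path (GetData after GetPartsAsync) is not shown in this
diff, but the round trip behaves roughly like the sketch below, which assumes
pickling with protocol 2; _SerializeSketch is illustrative, not the module's
exact code:

    import cPickle as pickle

    _CHUNK_SIZE = 1000 * 1000

    def _SerializeSketch(value):
      # Mirrors _Serialize above: pickle, then slice into <= 1 MB chunks.
      serialized = pickle.dumps(value, 2)
      return [serialized[i:i + _CHUNK_SIZE]
              for i in xrange(0, len(serialized), _CHUNK_SIZE)]

    value = {'data': 'x' * 2500000}
    parts = _SerializeSketch(value)
    assert len(parts) == 3                        # ~2.5 MB -> 3 chunks
    assert pickle.loads(''.join(parts)) == value  # concatenation round-trips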