Index: dashboard/dashboard/common/stored_object.py
diff --git a/dashboard/dashboard/common/stored_object.py b/dashboard/dashboard/common/stored_object.py
index 9f802e1f0ce929066b11605b96afda0eb19f89a3..8bd06f224c58246019185d02bd33f62203190e6b 100644
--- a/dashboard/dashboard/common/stored_object.py
+++ b/dashboard/dashboard/common/stored_object.py
@@ -4,17 +4,13 @@
 
 """A module for storing and getting objects from datastore.
 
-This module provides Get, Set and Delete functions for storing pickleable
-objects in datastore, with support for large objects greater than 1 MB.
+App Engine datastore limits entity size to less than 1 MB; this module
+supports storing larger objects by splitting the data and using multiple
+datastore entities.
 
 Although this module contains ndb.Model classes, these are not intended
 to be used directly by other modules.
 
-App Engine datastore limits entity size to less than 1 MB; this module
-supports storing larger objects by splitting the data and using multiple
-datastore entities and multiple memcache keys. Using ndb.get and pickle, a
-complex data structure can be retrieved more quickly than datastore fetch.
-
 Example:
   john = Account()
   john.username = 'John'
@@ -23,20 +19,10 @@ Example:
 """
 
 import cPickle as pickle
-import logging
 
-from google.appengine.api import memcache
 from google.appengine.ext import ndb
 
-_MULTIPART_ENTITY_MEMCACHE_KEY = 'multipart_entity_'
-
-# Maximum number of entities and memcache to save a value.
-# The limit for data stored in one datastore entity is 1 MB,
-# and the limit for memcache batch operations is 32 MB. See:
-# https://cloud.google.com/appengine/docs/python/memcache/#Python_Limits
-_MAX_NUM_PARTS = 16
-
-# Max bytes per entity or value cached with memcache.
+# Max bytes per entity.
 _CHUNK_SIZE = 1000 * 1000
 
 
@@ -56,22 +42,20 @@ def Get(key):
 
 @ndb.tasklet
 def GetAsync(key):
-  results = yield MultipartCache.GetAsync(key)
-  if not results:
-    set_future = MultipartCache.SetAsync(key, results)
-    get_future = _GetValueFromDatastore(key)
-    yield set_future, get_future
-    results = get_future.get_result()
-  raise ndb.Return(results)
+  entity = yield ndb.Key(MultipartEntity, key).get_async()
+  if not entity:
+    raise ndb.Return(None)
+  yield entity.GetPartsAsync()
+  raise ndb.Return(entity.GetData())
 
 
 @ndb.synctasklet
 def Set(key, value):
-  """Sets the value in datastore and memcache with limit of '_MAX_NUM_PARTS' MB.
+  """Sets the value in datastore.
 
   Args:
     key: String key value.
-    value: A pickleable value to be stored limited at '_MAX_NUM_PARTS' MB.
+    value: A pickleable value to be stored.
   """
   yield SetAsync(key, value)
 
@@ -82,13 +66,12 @@ def SetAsync(key, value):
   if not entity:
     entity = MultipartEntity(id=key)
   entity.SetData(value)
-  yield (entity.PutAsync(),
-         MultipartCache.SetAsync(key, value))
+  yield entity.PutAsync()
 
 
 @ndb.synctasklet
 def Delete(key):
-  """Deletes the value in datastore and memcache."""
+  """Deletes the value in datastore."""
   yield DeleteAsync(key)
 
 
@@ -96,8 +79,7 @@ def Delete(key):
 def DeleteAsync(key):
   multipart_entity_key = ndb.Key(MultipartEntity, key)
   yield (multipart_entity_key.delete_async(),
-         MultipartEntity.DeleteAsync(multipart_entity_key),
-         MultipartCache.DeleteAsync(key))
+         MultipartEntity.DeleteAsync(multipart_entity_key))
 
 
 class MultipartEntity(ndb.Model):
@@ -129,15 +111,11 @@ class MultipartEntity(ndb.Model):
   def PutAsync(self):
     """Stores serialized data over multiple PartEntity."""
     serialized_parts = _Serialize(self.GetData())
-    if len(serialized_parts) > _MAX_NUM_PARTS:
-      logging.error('Max number of parts reached.')
-      return
     part_list = []
     num_parts = len(serialized_parts)
     for i in xrange(num_parts):
-      if serialized_parts[i] is not None:
-        part = PartEntity(id=i + 1, parent=self.key, value=serialized_parts[i])
-        part_list.append(part)
+      part = PartEntity(id=i + 1, parent=self.key, value=serialized_parts[i])
+      part_list.append(part)
     self.size = num_parts
     yield ndb.put_multi_async(part_list + [self])
 
@@ -157,87 +135,6 @@ class PartEntity(ndb.Model):
   value = ndb.BlobProperty()
 
 
-class MultipartCache(object):
-  """Contains operations for storing values over multiple memcache keys.
-
-  Values are serialized, split, and stored over multiple memcache keys. The
-  head cache stores the expected size.
-  """
-
-  @classmethod
-  @ndb.tasklet
-  def GetAsync(cls, key):
-    """Gets value in memcache."""
-    keys = cls._GetCacheKeyList(key)
-    head_key = cls._GetCacheKey(key)
-    client = memcache.Client()
-    cache_values = yield client.get_multi_async(keys)
-    # Whether we have all the memcache values.
-    if len(keys) != len(cache_values) or head_key not in cache_values:
-      raise ndb.Return(None)
-
-    serialized = ''
-    cache_size = cache_values[head_key]
-    keys.remove(head_key)
-    for key in keys[:cache_size]:
-      if key not in cache_values:
-        raise ndb.Return(None)
-      if cache_values[key] is not None:
-        serialized += cache_values[key]
-    raise ndb.Return(pickle.loads(serialized))
-
-  @classmethod
-  @ndb.tasklet
-  def SetAsync(cls, key, value):
-    """Sets a value in memcache."""
-    serialized_parts = _Serialize(value)
-    if len(serialized_parts) > _MAX_NUM_PARTS:
-      logging.error('Max number of parts reached.')
-      raise ndb.Return(None)
-
-    cached_values = {}
-    cached_values[cls._GetCacheKey(key)] = len(serialized_parts)
-    for i in xrange(len(serialized_parts)):
-      cached_values[cls._GetCacheKey(key, i)] = serialized_parts[i]
-    client = memcache.Client()
-    yield client.set_multi_async(cached_values)
-
-  @classmethod
-  @ndb.synctasklet
-  def Delete(cls, key):
-    """Deletes all cached values for key."""
-    yield cls.DeleteAsync(key)
-
-  @classmethod
-  @ndb.tasklet
-  def DeleteAsync(cls, key):
-    client = memcache.Client()
-    yield client.delete_multi_async(cls._GetCacheKeyList(key))
-
-  @classmethod
-  def _GetCacheKeyList(cls, key):
-    """Gets a list of head cache key and cache key parts."""
-    keys = [cls._GetCacheKey(key, i) for i in xrange(_MAX_NUM_PARTS)]
-    keys.append(cls._GetCacheKey(key))
-    return keys
-
-  @classmethod
-  def _GetCacheKey(cls, key, index=None):
-    """Returns either head cache key or cache key part."""
-    if index is not None:
-      return _MULTIPART_ENTITY_MEMCACHE_KEY + '%s.%s' % (key, index)
-    return _MULTIPART_ENTITY_MEMCACHE_KEY + key
-
-
-@ndb.tasklet
-def _GetValueFromDatastore(key):
-  entity = yield ndb.Key(MultipartEntity, key).get_async()
-  if not entity:
-    raise ndb.Return(None)
-  yield entity.GetPartsAsync()
-  raise ndb.Return(entity.GetData())
-
-
 def _Serialize(value):
   """Serializes value and returns a list of its parts.
 
@@ -253,6 +150,4 @@ def _Serialize(value):
   values = []
   for i in xrange(0, length, _CHUNK_SIZE):
     values.append(serialized[i:i + _CHUNK_SIZE])
-  for i in xrange(len(values), _MAX_NUM_PARTS):
-    values.append(None)
   return values
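
The change to _Serialize above is the heart of the cleanup: with the memcache
batch limit gone, the function no longer pads its result out to _MAX_NUM_PARTS
with None entries, so PutAsync writes exactly one PartEntity per chunk and
records the part count on the parent MultipartEntity. The round trip can be
sketched without App Engine at all (Python 2, matching the module's cPickle and
xrange use; the _Deserialize helper and the pickle protocol are assumptions
here, since the pickle.dumps call sits outside the hunks shown):

import cPickle as pickle

_CHUNK_SIZE = 1000 * 1000  # Max bytes per entity, as in stored_object.py.


def _Serialize(value):
  # Pickle the value, then slice the byte string into _CHUNK_SIZE parts;
  # each part fits within the datastore's 1 MB entity limit.
  serialized = pickle.dumps(value, 2)
  return [serialized[i:i + _CHUNK_SIZE]
          for i in xrange(0, len(serialized), _CHUNK_SIZE)]


def _Deserialize(parts):
  # Hypothetical helper: the real read path concatenates PartEntity values
  # in index order inside MultipartEntity before unpickling.
  return pickle.loads(''.join(parts))


# A value a bit over 2 MB serializes into 3 parts, so Set() would write one
# MultipartEntity (size=3) plus three PartEntity children.
value = {'data': 'x' * (2 * _CHUNK_SIZE + _CHUNK_SIZE // 2)}
parts = _Serialize(value)
assert len(parts) == 3
assert _Deserialize(parts) == value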
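
At the API surface nothing changes: Get, Set and Delete keep their signatures
and simply lose the memcache layer, so every read now goes to datastore. A
minimal usage sketch, assuming the App Engine Python SDK testbed to run
outside a deployed app (the key and value here are illustrative; on App Engine
itself the stubs are unnecessary):

from google.appengine.ext import testbed

from dashboard.common import stored_object

# Stand up local service stubs so ndb calls work in a plain process.
tb = testbed.Testbed()
tb.activate()
tb.init_datastore_v3_stub()
tb.init_memcache_stub()  # ndb's own context cache still uses memcache.

stored_object.Set('bot_whitelist', ['bot1', 'bot2'])
print stored_object.Get('bot_whitelist')   # ['bot1', 'bot2']
stored_object.Delete('bot_whitelist')
assert stored_object.Get('bot_whitelist') is None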