Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2008)

Unified Diff: dashboard/dashboard/common/stored_object.py

Issue 2748953003: . Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « dashboard/dashboard/common/namespaced_stored_object.py ('k') | dashboard/dashboard/graph_revisions.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: dashboard/dashboard/common/stored_object.py
diff --git a/dashboard/dashboard/common/stored_object.py b/dashboard/dashboard/common/stored_object.py
index d0f8b96f31ce6c6906560068b6b37cd6605a8d36..d903134a2d5a47073427c44193e1f5ad49e5fbc2 100644
--- a/dashboard/dashboard/common/stored_object.py
+++ b/dashboard/dashboard/common/stored_object.py
@@ -40,6 +40,7 @@ _MAX_NUM_PARTS = 16
_CHUNK_SIZE = 1000 * 1000
@ndb.synctasklet
def Get(key):
  """Synchronously gets the value stored under key.

  Thin wrapper that drives GetAsync to completion.

  Args:
    key: String key value.

  Returns:
    A value for key, or None if nothing is stored.
  """
  result = yield GetAsync(key)
  raise ndb.Return(result)
+
+
@ndb.tasklet
def GetAsync(key):
  """Asynchronously gets the value: memcache first, datastore on miss.

  Args:
    key: String key value.

  Returns:
    (via ndb.Return) The stored value for key, or None if not found.
  """
  results = yield MultipartCache.GetAsync(key)
  if not results:
    # Cache miss: read the authoritative value from datastore FIRST, then
    # repopulate memcache with that value. (The patched code started
    # SetAsync with the stale falsy cache result, poisoning the cache.)
    results = yield _GetValueFromDatastore(key)
    yield MultipartCache.SetAsync(key, results)
  raise ndb.Return(results)
@ndb.synctasklet
def Set(key, value):
  """Sets the value in datastore and memcache with limit of '_MAX_NUM_PARTS' MB.

  Thin wrapper that drives SetAsync to completion.

  Args:
    key: String key value.
    value: A pickleable value to be stored limited at '_MAX_NUM_PARTS' MB.
  """
  yield SetAsync(key, value)
+
+
@ndb.tasklet
def SetAsync(key, value):
  """Asynchronously stores value in both datastore and memcache.

  Args:
    key: String key value.
    value: A pickleable value to be stored.
  """
  entity = yield ndb.Key(MultipartEntity, key).get_async()
  if not entity:
    entity = MultipartEntity(id=key)
  entity.SetData(value)
  # The datastore write and the cache update are independent; run them
  # concurrently and wait for both.
  yield entity.PutAsync(), MultipartCache.SetAsync(key, value)
@ndb.synctasklet
def Delete(key):
  """Deletes the value in datastore and memcache."""
  yield DeleteAsync(key)
+
+
@ndb.tasklet
def DeleteAsync(key):
  """Asynchronously deletes the entity, its parts, and the cached copies.

  Args:
    key: String key value.
  """
  entity_key = ndb.Key(MultipartEntity, key)
  # The three deletions do not depend on each other; issue them in parallel.
  yield (entity_key.delete_async(),
         MultipartEntity.DeleteAsync(entity_key),
         MultipartCache.DeleteAsync(key))
class MultipartEntity(ndb.Model):
@@ -83,27 +106,27 @@ class MultipartEntity(ndb.Model):
# Number of entities use to store serialized.
size = ndb.IntegerProperty(default=0, indexed=False)
- @classmethod
- def _post_get_hook(cls, key, future): # pylint: disable=unused-argument
+ @ndb.tasklet
+ def GetPartsAsync(self):
"""Deserializes data from multiple PartEntity."""
- entity = future.get_result()
- if entity is None or not entity.size:
+ if self.size:
return
- string_id = entity.key.string_id()
+ string_id = self.key.string_id()
part_keys = [ndb.Key(MultipartEntity, string_id, PartEntity, i + 1)
for i in xrange(entity.size)]
- part_entities = ndb.get_multi(part_keys)
+ part_entities = yield ndb.get_multi_async(part_keys)
serialized = ''.join(p.value for p in part_entities if p is not None)
- entity.SetData(pickle.loads(serialized))
+ self.SetData(pickle.loads(serialized))
@classmethod
- def _pre_delete_hook(cls, key):
- """Deletes PartEntity entities."""
- part_keys = PartEntity.query(ancestor=key).fetch(keys_only=True)
- ndb.delete_multi(part_keys)
+ @ndb.tasklet
+ def DeleteAsync(cls, key):
+ part_keys = yield PartEntity.query(ancestor=key).fetch_async(keys_only=True)
+ yield ndb.delete_multi_async(part_keys)
- def Save(self):
+ @ndb.tasklet
+ def PutAsync(self):
"""Stores serialized data over multiple PartEntity."""
serialized_parts = _Serialize(self.GetData())
if len(serialized_parts) > _MAX_NUM_PARTS:
@@ -116,7 +139,7 @@ class MultipartEntity(ndb.Model):
part = PartEntity(id=i + 1, parent=self.key, value=serialized_parts[i])
part_list.append(part)
self.size = num_parts
- ndb.put_multi(part_list + [self])
+ yield ndb.put_multi_async(part_list + [self])
def GetData(self):
return getattr(self, '_data', None)
@@ -142,43 +165,54 @@ class MultipartCache(object):
"""
@classmethod
- def Get(cls, key):
+ @ndb.tasklet
+ def GetAsync(cls, key):
"""Gets value in memcache."""
keys = cls._GetCacheKeyList(key)
head_key = cls._GetCacheKey(key)
- cache_values = memcache.get_multi(keys)
+ client = memcache.Client()
+ cache_values = yield client.get_multi_async(keys)
# Whether we have all the memcache values.
if len(keys) != len(cache_values) or head_key not in cache_values:
- return None
+ raise ndb.Return(None)
serialized = ''
cache_size = cache_values[head_key]
keys.remove(head_key)
for key in keys[:cache_size]:
if key not in cache_values:
- return None
+ raise ndb.Return(None)
if cache_values[key] is not None:
serialized += cache_values[key]
- return pickle.loads(serialized)
+ raise ndb.Return(pickle.loads(serialized))
@classmethod
- def Set(cls, key, value):
+ @ndb.tasklet
+ def SetAsync(cls, key, value):
"""Sets a value in memcache."""
serialized_parts = _Serialize(value)
if len(serialized_parts) > _MAX_NUM_PARTS:
logging.error('Max number of parts reached.')
- return
+ raise ndb.Return(None)
cached_values = {}
cached_values[cls._GetCacheKey(key)] = len(serialized_parts)
for i in xrange(len(serialized_parts)):
cached_values[cls._GetCacheKey(key, i)] = serialized_parts[i]
- memcache.set_multi(cached_values)
+ client = memcache.Client()
+ yield client.set_multi_async(cached_values)
@classmethod
+ @ndb.synctasklet
def Delete(cls, key):
"""Deletes all cached values for key."""
- memcache.delete_multi(cls._GetCacheKeyList(key))
+ yield cls.DeleteAsync(key)
+
+ @classmethod
+ @ndb.tasklet
+ def DeleteAsync(cls, key):
+ client = memcache.Client()
+ yield client.delete_multi_async(cls._GetCacheKeyList(key))
@classmethod
def _GetCacheKeyList(cls, key):
@@ -195,11 +229,13 @@ class MultipartCache(object):
return _MULTIPART_ENTITY_MEMCACHE_KEY + key
@ndb.tasklet
def _GetValueFromDatastore(key):
  """Fetches the MultipartEntity for key and reassembles its data.

  Args:
    key: String key value.

  Returns:
    (via ndb.Return) The deserialized value, or None when no entity exists.
  """
  entity = yield ndb.Key(MultipartEntity, key).get_async()
  if not entity:
    raise ndb.Return(None)
  # Parts are no longer loaded by a post-get hook; fetch them explicitly.
  yield entity.GetPartsAsync()
  raise ndb.Return(entity.GetData())
def _Serialize(value):
« no previous file with comments | « dashboard/dashboard/common/namespaced_stored_object.py ('k') | dashboard/dashboard/graph_revisions.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698