| Index: dashboard/dashboard/common/stored_object.py
|
| diff --git a/dashboard/dashboard/common/stored_object.py b/dashboard/dashboard/common/stored_object.py
|
| index d0f8b96f31ce6c6906560068b6b37cd6605a8d36..d903134a2d5a47073427c44193e1f5ad49e5fbc2 100644
|
| --- a/dashboard/dashboard/common/stored_object.py
|
| +++ b/dashboard/dashboard/common/stored_object.py
|
| @@ -40,6 +40,7 @@ _MAX_NUM_PARTS = 16
|
| _CHUNK_SIZE = 1000 * 1000
|
|
|
|
|
| +@ndb.synctasklet
|
| def Get(key):
|
| """Gets the value.
|
|
|
| @@ -49,13 +50,22 @@ def Get(key):
|
| Returns:
|
| A value for key.
|
| """
|
| - results = MultipartCache.Get(key)
|
| + result = yield GetAsync(key)
|
| + raise ndb.Return(result)
|
| +
|
| +
|
| +@ndb.tasklet
|
| +def GetAsync(key):
|
| + results = yield MultipartCache.GetAsync(key)
|
| if not results:
|
| - results = _GetValueFromDatastore(key)
|
| - MultipartCache.Set(key, results)
|
| - return results
|
| + set_future = MultipartCache.SetAsync(key, results)
|
| + get_future = _GetValueFromDatastore(key)
|
| + yield set_future, get_future
|
| + results = get_future.get_result()
|
| + raise ndb.Return(results)
|
|
|
|
|
| +@ndb.synctasklet
|
| def Set(key, value):
|
| """Sets the value in datastore and memcache with limit of '_MAX_NUM_PARTS' MB.
|
|
|
| @@ -63,18 +73,31 @@ def Set(key, value):
|
| key: String key value.
|
| value: A pickleable value to be stored limited at '_MAX_NUM_PARTS' MB.
|
| """
|
| - entity = ndb.Key(MultipartEntity, key).get()
|
| + yield SetAsync(key, value)
|
| +
|
| +
|
| +@ndb.tasklet
|
| +def SetAsync(key, value):
|
| + entity = yield ndb.Key(MultipartEntity, key).get_async()
|
| if not entity:
|
| entity = MultipartEntity(id=key)
|
| entity.SetData(value)
|
| - entity.Save()
|
| - MultipartCache.Set(key, value)
|
| + yield (entity.PutAsync(),
|
| + MultipartCache.SetAsync(key, value))
|
|
|
|
|
| +@ndb.synctasklet
|
| def Delete(key):
|
| """Deletes the value in datastore and memcache."""
|
| - ndb.Key(MultipartEntity, key).delete()
|
| - MultipartCache.Delete(key)
|
| + yield DeleteAsync(key)
|
| +
|
| +
|
| +@ndb.tasklet
|
| +def DeleteAsync(key):
|
| + multipart_entity_key = ndb.Key(MultipartEntity, key)
|
| + yield (multipart_entity_key.delete_async(),
|
| + MultipartEntity.DeleteAsync(multipart_entity_key),
|
| + MultipartCache.DeleteAsync(key))
|
|
|
|
|
| class MultipartEntity(ndb.Model):
|
| @@ -83,27 +106,27 @@ class MultipartEntity(ndb.Model):
|
| # Number of entities use to store serialized.
|
| size = ndb.IntegerProperty(default=0, indexed=False)
|
|
|
| - @classmethod
|
| - def _post_get_hook(cls, key, future): # pylint: disable=unused-argument
|
| + @ndb.tasklet
|
| + def GetPartsAsync(self):
|
| """Deserializes data from multiple PartEntity."""
|
| - entity = future.get_result()
|
| - if entity is None or not entity.size:
|
| + if self.size:
|
| return
|
|
|
| - string_id = entity.key.string_id()
|
| + string_id = self.key.string_id()
|
| part_keys = [ndb.Key(MultipartEntity, string_id, PartEntity, i + 1)
|
| for i in xrange(entity.size)]
|
| - part_entities = ndb.get_multi(part_keys)
|
| + part_entities = yield ndb.get_multi_async(part_keys)
|
| serialized = ''.join(p.value for p in part_entities if p is not None)
|
| - entity.SetData(pickle.loads(serialized))
|
| + self.SetData(pickle.loads(serialized))
|
|
|
| @classmethod
|
| - def _pre_delete_hook(cls, key):
|
| - """Deletes PartEntity entities."""
|
| - part_keys = PartEntity.query(ancestor=key).fetch(keys_only=True)
|
| - ndb.delete_multi(part_keys)
|
| + @ndb.tasklet
|
| + def DeleteAsync(cls, key):
|
| + part_keys = yield PartEntity.query(ancestor=key).fetch_async(keys_only=True)
|
| + yield ndb.delete_multi_async(part_keys)
|
|
|
| - def Save(self):
|
| + @ndb.tasklet
|
| + def PutAsync(self):
|
| """Stores serialized data over multiple PartEntity."""
|
| serialized_parts = _Serialize(self.GetData())
|
| if len(serialized_parts) > _MAX_NUM_PARTS:
|
| @@ -116,7 +139,7 @@ class MultipartEntity(ndb.Model):
|
| part = PartEntity(id=i + 1, parent=self.key, value=serialized_parts[i])
|
| part_list.append(part)
|
| self.size = num_parts
|
| - ndb.put_multi(part_list + [self])
|
| + yield ndb.put_multi_async(part_list + [self])
|
|
|
| def GetData(self):
|
| return getattr(self, '_data', None)
|
| @@ -142,43 +165,54 @@ class MultipartCache(object):
|
| """
|
|
|
| @classmethod
|
| - def Get(cls, key):
|
| + @ndb.tasklet
|
| + def GetAsync(cls, key):
|
| """Gets value in memcache."""
|
| keys = cls._GetCacheKeyList(key)
|
| head_key = cls._GetCacheKey(key)
|
| - cache_values = memcache.get_multi(keys)
|
| + client = memcache.Client()
|
| + cache_values = yield client.get_multi_async(keys)
|
| # Whether we have all the memcache values.
|
| if len(keys) != len(cache_values) or head_key not in cache_values:
|
| - return None
|
| + raise ndb.Return(None)
|
|
|
| serialized = ''
|
| cache_size = cache_values[head_key]
|
| keys.remove(head_key)
|
| for key in keys[:cache_size]:
|
| if key not in cache_values:
|
| - return None
|
| + raise ndb.Return(None)
|
| if cache_values[key] is not None:
|
| serialized += cache_values[key]
|
| - return pickle.loads(serialized)
|
| + raise ndb.Return(pickle.loads(serialized))
|
|
|
| @classmethod
|
| - def Set(cls, key, value):
|
| + @ndb.tasklet
|
| + def SetAsync(cls, key, value):
|
| """Sets a value in memcache."""
|
| serialized_parts = _Serialize(value)
|
| if len(serialized_parts) > _MAX_NUM_PARTS:
|
| logging.error('Max number of parts reached.')
|
| - return
|
| + raise ndb.Return(None)
|
|
|
| cached_values = {}
|
| cached_values[cls._GetCacheKey(key)] = len(serialized_parts)
|
| for i in xrange(len(serialized_parts)):
|
| cached_values[cls._GetCacheKey(key, i)] = serialized_parts[i]
|
| - memcache.set_multi(cached_values)
|
| + client = memcache.Client()
|
| + yield client.set_multi_async(cached_values)
|
|
|
| @classmethod
|
| + @ndb.synctasklet
|
| def Delete(cls, key):
|
| """Deletes all cached values for key."""
|
| - memcache.delete_multi(cls._GetCacheKeyList(key))
|
| + yield cls.DeleteAsync(key)
|
| +
|
| + @classmethod
|
| + @ndb.tasklet
|
| + def DeleteAsync(cls, key):
|
| + client = memcache.Client()
|
| + yield client.delete_multi_async(cls._GetCacheKeyList(key))
|
|
|
| @classmethod
|
| def _GetCacheKeyList(cls, key):
|
| @@ -195,11 +229,13 @@ class MultipartCache(object):
|
| return _MULTIPART_ENTITY_MEMCACHE_KEY + key
|
|
|
|
|
| +@ndb.tasklet
|
| def _GetValueFromDatastore(key):
|
| - entity = ndb.Key(MultipartEntity, key).get()
|
| + entity = yield ndb.Key(MultipartEntity, key).get_async()
|
| if not entity:
|
| - return None
|
| - return entity.GetData()
|
| + raise ndb.Return(None)
|
| + yield entity.GetPartsAsync()
|
| + raise ndb.Return(entity.GetData())
|
|
|
|
|
| def _Serialize(value):
|
|
|