Chromium Code Reviews

Unified Diff: dashboard/dashboard/common/stored_object.py

Issue 3005413002: [dashboard] Remove memcache and _MAX_NUM_PARTS from stored_object. (Closed)
Patch Set: Remove _GetValueFromDatastore. Created 3 years, 3 months ago
--- a/dashboard/dashboard/common/stored_object.py
+++ b/dashboard/dashboard/common/stored_object.py
 # Copyright 2015 The Chromium Authors. All rights reserved.
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
 """A module for storing and getting objects from datastore.
 
-This module provides Get, Set and Delete functions for storing pickleable
-objects in datastore, with support for large objects greater than 1 MB.
+App Engine datastore limits entity size to less than 1 MB; this module
+supports storing larger objects by splitting the data and using multiple
+datastore entities.
 
 Although this module contains ndb.Model classes, these are not intended
 to be used directly by other modules.
 
-App Engine datastore limits entity size to less than 1 MB; this module
-supports storing larger objects by splitting the data and using multiple
-datastore entities and multiple memcache keys. Using ndb.get and pickle, a
-complex data structure can be retrieved more quickly than datastore fetch.
-
 Example:
   john = Account()
   john.username = 'John'
   john.userid = 123
   stored_object.Set(john.userid, john)
 """
 
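Note: the docstring example shows the intended API. A fuller round-trip looks like the sketch below; the import path and the Account class are illustrative assumptions, not part of this change.

    from dashboard.common import stored_object  # assumed import path

    class Account(object):  # hypothetical pickleable class
      def __init__(self, username, userid):
        self.username = username
        self.userid = userid

    john = Account('John', 123)
    stored_object.Set('account_123', john)       # pickle, chunk, put entities
    restored = stored_object.Get('account_123')  # get parts, join, unpickle
    assert restored.userid == 123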
 import cPickle as pickle
-import logging
 
-from google.appengine.api import memcache
 from google.appengine.ext import ndb
 
-_MULTIPART_ENTITY_MEMCACHE_KEY = 'multipart_entity_'
-
-# Maximum number of entities and memcache to save a value.
-# The limit for data stored in one datastore entity is 1 MB,
-# and the limit for memcache batch operations is 32 MB. See:
-# https://cloud.google.com/appengine/docs/python/memcache/#Python_Limits
-_MAX_NUM_PARTS = 16
-
-# Max bytes per entity or value cached with memcache.
+# Max bytes per entity.
 _CHUNK_SIZE = 1000 * 1000
 
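Note: with _MAX_NUM_PARTS gone, the number of PartEntity rows is bounded only by the size of the pickled value: the part count is the ceiling of the serialized length over _CHUNK_SIZE. A quick sketch of that arithmetic (the helper is illustrative, not in the module):

    def _ExpectedNumParts(serialized_length, chunk_size=1000 * 1000):
      # Ceiling division: 2,400,000 bytes -> 3 parts (1 MB + 1 MB + 0.4 MB).
      return (serialized_length + chunk_size - 1) // chunk_size

    assert _ExpectedNumParts(2400000) == 3
    assert _ExpectedNumParts(999999) == 1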
 
 @ndb.synctasklet
 def Get(key):
   """Gets the value.
 
   Args:
     key: String key value.
 
   Returns:
     A value for key.
   """
   result = yield GetAsync(key)
   raise ndb.Return(result)
 
 
 @ndb.tasklet
 def GetAsync(key):
-  results = yield MultipartCache.GetAsync(key)
-  if not results:
-    set_future = MultipartCache.SetAsync(key, results)
-    get_future = _GetValueFromDatastore(key)
-    yield set_future, get_future
-    results = get_future.get_result()
-  raise ndb.Return(results)
+  entity = yield ndb.Key(MultipartEntity, key).get_async()
+  if not entity:
+    raise ndb.Return(None)
+  yield entity.GetPartsAsync()
+  raise ndb.Return(entity.GetData())
 
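Note: the new GetAsync body is the old _GetValueFromDatastore inlined: fetch the head MultipartEntity, then its parts. As a reminder of the tasklet idiom used throughout this file, yielding a future suspends the tasklet until the RPC completes, and ndb.Return delivers the result. A minimal illustrative sketch (the helper is hypothetical, not in the module):

    @ndb.tasklet
    def _CountPartsAsync(key):
      # Suspend until the datastore get completes.
      entity = yield ndb.Key(MultipartEntity, key).get_async()
      # Tasklets return values by raising ndb.Return.
      raise ndb.Return(entity.size if entity else 0)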
 
 @ndb.synctasklet
 def Set(key, value):
-  """Sets the value in datastore and memcache with limit of '_MAX_NUM_PARTS' MB.
+  """Sets the value in datastore.
 
   Args:
     key: String key value.
-    value: A pickleable value to be stored limited at '_MAX_NUM_PARTS' MB.
+    value: A pickleable value to be stored.
   """
   yield SetAsync(key, value)
 
 
 @ndb.tasklet
 def SetAsync(key, value):
   entity = yield ndb.Key(MultipartEntity, key).get_async()
   if not entity:
     entity = MultipartEntity(id=key)
   entity.SetData(value)
-  yield (entity.PutAsync(),
-         MultipartCache.SetAsync(key, value))
+  yield entity.PutAsync()
 
 
 @ndb.synctasklet
 def Delete(key):
-  """Deletes the value in datastore and memcache."""
+  """Deletes the value in datastore."""
   yield DeleteAsync(key)
 
 
 @ndb.tasklet
 def DeleteAsync(key):
   multipart_entity_key = ndb.Key(MultipartEntity, key)
   yield (multipart_entity_key.delete_async(),
-         MultipartEntity.DeleteAsync(multipart_entity_key),
-         MultipartCache.DeleteAsync(key))
+         MultipartEntity.DeleteAsync(multipart_entity_key))
 
 
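Note: yielding a tuple of futures, as DeleteAsync does, waits on both RPCs while they run concurrently. The sequential equivalent below is a sketch for comparison only (the helper is hypothetical):

    @ndb.tasklet
    def _DeleteSequentiallyAsync(key):
      multipart_entity_key = ndb.Key(MultipartEntity, key)
      # Each yield waits for one RPC before issuing the next.
      yield multipart_entity_key.delete_async()
      yield MultipartEntity.DeleteAsync(multipart_entity_key)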
 class MultipartEntity(ndb.Model):
   """Container for PartEntity."""
 
   # Number of entities used to store the serialized data.
   size = ndb.IntegerProperty(default=0, indexed=False)
 
   @ndb.tasklet
   def GetPartsAsync(self):
 (...skipping 11 matching lines...)
   @classmethod
   @ndb.tasklet
   def DeleteAsync(cls, key):
     part_keys = yield PartEntity.query(ancestor=key).fetch_async(keys_only=True)
     yield ndb.delete_multi_async(part_keys)
 
   @ndb.tasklet
   def PutAsync(self):
     """Stores serialized data over multiple PartEntity."""
     serialized_parts = _Serialize(self.GetData())
-    if len(serialized_parts) > _MAX_NUM_PARTS:
-      logging.error('Max number of parts reached.')
-      return
     part_list = []
     num_parts = len(serialized_parts)
     for i in xrange(num_parts):
-      if serialized_parts[i] is not None:
-        part = PartEntity(id=i + 1, parent=self.key, value=serialized_parts[i])
-        part_list.append(part)
+      part = PartEntity(id=i + 1, parent=self.key, value=serialized_parts[i])
+      part_list.append(part)
     self.size = num_parts
     yield ndb.put_multi_async(part_list + [self])
 
   def GetData(self):
     return getattr(self, '_data', None)
 
   def SetData(self, data):
     setattr(self, '_data', data)
 
 
 class PartEntity(ndb.Model):
   """Holds a part of serialized data for MultipartEntity.
 
   This entity key has the form:
     ndb.Key('MultipartEntity', multipart_entity_id, 'PartEntity', part_index)
   """
   value = ndb.BlobProperty()
 
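Note: each PartEntity is written with parent=self.key (see PutAsync above), so all parts of a value share their MultipartEntity as ancestor, and a single ancestor query finds them; that is what MultipartEntity.DeleteAsync relies on. A sketch of the key layout ('my_key' is illustrative):

    # Parts stored under 'my_key' have keys of the form:
    #   ndb.Key('MultipartEntity', 'my_key', 'PartEntity', 1)
    #   ndb.Key('MultipartEntity', 'my_key', 'PartEntity', 2)
    parent = ndb.Key(MultipartEntity, 'my_key')
    part_keys = PartEntity.query(ancestor=parent).fetch(keys_only=True)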
-class MultipartCache(object):
-  """Contains operations for storing values over multiple memcache keys.
-
-  Values are serialized, split, and stored over multiple memcache keys. The
-  head cache stores the expected size.
-  """
-
-  @classmethod
-  @ndb.tasklet
-  def GetAsync(cls, key):
-    """Gets value in memcache."""
-    keys = cls._GetCacheKeyList(key)
-    head_key = cls._GetCacheKey(key)
-    client = memcache.Client()
-    cache_values = yield client.get_multi_async(keys)
-    # Whether we have all the memcache values.
-    if len(keys) != len(cache_values) or head_key not in cache_values:
-      raise ndb.Return(None)
-
-    serialized = ''
-    cache_size = cache_values[head_key]
-    keys.remove(head_key)
-    for key in keys[:cache_size]:
-      if key not in cache_values:
-        raise ndb.Return(None)
-      if cache_values[key] is not None:
-        serialized += cache_values[key]
-    raise ndb.Return(pickle.loads(serialized))
-
-  @classmethod
-  @ndb.tasklet
-  def SetAsync(cls, key, value):
-    """Sets a value in memcache."""
-    serialized_parts = _Serialize(value)
-    if len(serialized_parts) > _MAX_NUM_PARTS:
-      logging.error('Max number of parts reached.')
-      raise ndb.Return(None)
-
-    cached_values = {}
-    cached_values[cls._GetCacheKey(key)] = len(serialized_parts)
-    for i in xrange(len(serialized_parts)):
-      cached_values[cls._GetCacheKey(key, i)] = serialized_parts[i]
-    client = memcache.Client()
-    yield client.set_multi_async(cached_values)
-
-  @classmethod
-  @ndb.synctasklet
-  def Delete(cls, key):
-    """Deletes all cached values for key."""
-    yield cls.DeleteAsync(key)
-
-  @classmethod
-  @ndb.tasklet
-  def DeleteAsync(cls, key):
-    client = memcache.Client()
-    yield client.delete_multi_async(cls._GetCacheKeyList(key))
-
-  @classmethod
-  def _GetCacheKeyList(cls, key):
-    """Gets a list of head cache key and cache key parts."""
-    keys = [cls._GetCacheKey(key, i) for i in xrange(_MAX_NUM_PARTS)]
-    keys.append(cls._GetCacheKey(key))
-    return keys
-
-  @classmethod
-  def _GetCacheKey(cls, key, index=None):
-    """Returns either head cache key or cache key part."""
-    if index is not None:
-      return _MULTIPART_ENTITY_MEMCACHE_KEY + '%s.%s' % (key, index)
-    return _MULTIPART_ENTITY_MEMCACHE_KEY + key
-
-
-@ndb.tasklet
-def _GetValueFromDatastore(key):
-  entity = yield ndb.Key(MultipartEntity, key).get_async()
-  if not entity:
-    raise ndb.Return(None)
-  yield entity.GetPartsAsync()
-  raise ndb.Return(entity.GetData())
-
-
 def _Serialize(value):
   """Serializes value and returns a list of its parts.
 
   Args:
     value: A pickleable value.
 
   Returns:
     A list of strings: the pickled representation of the value, split into
     chunks of at most _CHUNK_SIZE bytes.
   """
   serialized = pickle.dumps(value, 2)
   length = len(serialized)
   values = []
   for i in xrange(0, length, _CHUNK_SIZE):
     values.append(serialized[i:i + _CHUNK_SIZE])
-  for i in xrange(len(values), _MAX_NUM_PARTS):
-    values.append(None)
   return values
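Note: GetPartsAsync is elided in this diff, but reading back is the inverse of _Serialize: concatenate the part values in index order and unpickle. A sketch under that assumption (_Deserialize is a hypothetical name):

    def _Deserialize(parts):
      # Each part is a plain byte string; joining restores the pickle.
      return pickle.loads(''.join(parts))

    parts = _Serialize({'status': 'ok', 'count': 3})
    assert _Deserialize(parts) == {'status': 'ok', 'count': 3}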