OLD | NEW |
1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """A module for storing and getting objects from datastore. | 5 """A module for storing and getting objects from datastore. |
6 | 6 |
7 This module provides Get, Set and Delete functions for storing pickleable | 7 App Engine datastore limits entity size to less than 1 MB; this module |
8 objects in datastore, with support for large objects greater than 1 MB. | 8 supports storing larger objects by splitting the data and using multiple |
| 9 datastore entities. |
9 | 10 |
10 Although this module contains ndb.Model classes, these are not intended | 11 Although this module contains ndb.Model classes, these are not intended |
11 to be used directly by other modules. | 12 to be used directly by other modules. |
12 | 13 |
13 App Engine datastore limits entity size to less than 1 MB; this module | |
14 supports storing larger objects by splitting the data and using multiple | |
15 datastore entities and multiple memcache keys. Using ndb.get and pickle, a | |
16 complex data structure can be retrieved more quickly than with a datastore fetch. | |
17 | |
18 Example: | 14 Example: |
19 john = Account() | 15 john = Account() |
20 john.username = 'John' | 16 john.username = 'John' |
21 john.userid = 123 | 17 john.userid = 123 |
22 stored_object.Set(john.userid, john) | 18 stored_object.Set(john.userid, john) |
23 """ | 19 """ |
24 | 20 |
25 import cPickle as pickle | 21 import cPickle as pickle |
26 import logging | |
27 | 22 |
28 from google.appengine.api import memcache | |
29 from google.appengine.ext import ndb | 23 from google.appengine.ext import ndb |
30 | 24 |
31 _MULTIPART_ENTITY_MEMCACHE_KEY = 'multipart_entity_' | 25 # Max bytes per entity. |
32 | |
33 # Maximum number of entities and memcache keys used to save a value. | |
34 # The limit for data stored in one datastore entity is 1 MB, | |
35 # and the limit for memcache batch operations is 32 MB. See: | |
36 # https://cloud.google.com/appengine/docs/python/memcache/#Python_Limits | |
37 _MAX_NUM_PARTS = 16 | |
38 | |
39 # Max bytes per entity or value cached with memcache. | |
40 _CHUNK_SIZE = 1000 * 1000 | 26 _CHUNK_SIZE = 1000 * 1000 |
41 | 27 |
42 | 28 |
43 @ndb.synctasklet | 29 @ndb.synctasklet |
44 def Get(key): | 30 def Get(key): |
45 """Gets the value. | 31 """Gets the value. |
46 | 32 |
47 Args: | 33 Args: |
48 key: String key value. | 34 key: String key value. |
49 | 35 |
50 Returns: | 36 Returns: |
51 The value stored for the key, or None if not found. | 37 The value stored for the key, or None if not found. |
52 """ | 38 """ |
53 result = yield GetAsync(key) | 39 result = yield GetAsync(key) |
54 raise ndb.Return(result) | 40 raise ndb.Return(result) |
55 | 41 |
56 | 42 |
57 @ndb.tasklet | 43 @ndb.tasklet |
58 def GetAsync(key): | 44 def GetAsync(key): |
59 results = yield MultipartCache.GetAsync(key) | 45 entity = yield ndb.Key(MultipartEntity, key).get_async() |
60 if not results: | 46 if not entity: |
61 set_future = MultipartCache.SetAsync(key, results) | 47 raise ndb.Return(None) |
62 get_future = _GetValueFromDatastore(key) | 48 yield entity.GetPartsAsync() |
63 yield set_future, get_future | 49 raise ndb.Return(entity.GetData()) |
64 results = get_future.get_result() | |
65 raise ndb.Return(results) | |
66 | 50 |
67 | 51 |
68 @ndb.synctasklet | 52 @ndb.synctasklet |
69 def Set(key, value): | 53 def Set(key, value): |
70 """Sets the value in datastore and memcache with limit of '_MAX_NUM_PARTS' MB. | 54 """Sets the value in datastore. |
71 | 55 |
72 Args: | 56 Args: |
73 key: String key value. | 57 key: String key value. |
74 value: A pickleable value to be stored, limited to _MAX_NUM_PARTS MB. | 58 value: A pickleable value to be stored. |
75 """ | 59 """ |
76 yield SetAsync(key, value) | 60 yield SetAsync(key, value) |
77 | 61 |
78 | 62 |
79 @ndb.tasklet | 63 @ndb.tasklet |
80 def SetAsync(key, value): | 64 def SetAsync(key, value): |
81 entity = yield ndb.Key(MultipartEntity, key).get_async() | 65 entity = yield ndb.Key(MultipartEntity, key).get_async() |
82 if not entity: | 66 if not entity: |
83 entity = MultipartEntity(id=key) | 67 entity = MultipartEntity(id=key) |
84 entity.SetData(value) | 68 entity.SetData(value) |
85 yield (entity.PutAsync(), | 69 yield entity.PutAsync() |
86 MultipartCache.SetAsync(key, value)) | |
87 | 70 |
88 | 71 |
89 @ndb.synctasklet | 72 @ndb.synctasklet |
90 def Delete(key): | 73 def Delete(key): |
91 """Deletes the value in datastore and memcache.""" | 74 """Deletes the value in datastore.""" |
92 yield DeleteAsync(key) | 75 yield DeleteAsync(key) |
93 | 76 |
94 | 77 |
95 @ndb.tasklet | 78 @ndb.tasklet |
96 def DeleteAsync(key): | 79 def DeleteAsync(key): |
97 multipart_entity_key = ndb.Key(MultipartEntity, key) | 80 multipart_entity_key = ndb.Key(MultipartEntity, key) |
98 yield (multipart_entity_key.delete_async(), | 81 yield (multipart_entity_key.delete_async(), |
99 MultipartEntity.DeleteAsync(multipart_entity_key), | 82 MultipartEntity.DeleteAsync(multipart_entity_key)) |
100 MultipartCache.DeleteAsync(key)) | |
101 | 83 |
102 | 84 |
103 class MultipartEntity(ndb.Model): | 85 class MultipartEntity(ndb.Model): |
104 """Container for PartEntity.""" | 86 """Container for PartEntity.""" |
105 | 87 |
106 # Number of entities used to store the serialized data. | 88 # Number of entities used to store the serialized data. |
107 size = ndb.IntegerProperty(default=0, indexed=False) | 89 size = ndb.IntegerProperty(default=0, indexed=False) |
108 | 90 |
109 @ndb.tasklet | 91 @ndb.tasklet |
110 def GetPartsAsync(self): | 92 def GetPartsAsync(self): |
(...skipping 11 matching lines...) |
122 @classmethod | 104 @classmethod |
123 @ndb.tasklet | 105 @ndb.tasklet |
124 def DeleteAsync(cls, key): | 106 def DeleteAsync(cls, key): |
125 part_keys = yield PartEntity.query(ancestor=key).fetch_async(keys_only=True) | 107 part_keys = yield PartEntity.query(ancestor=key).fetch_async(keys_only=True) |
126 yield ndb.delete_multi_async(part_keys) | 108 yield ndb.delete_multi_async(part_keys) |
127 | 109 |
128 @ndb.tasklet | 110 @ndb.tasklet |
129 def PutAsync(self): | 111 def PutAsync(self): |
130 """Stores serialized data over multiple PartEntity.""" | 112 """Stores serialized data over multiple PartEntity.""" |
131 serialized_parts = _Serialize(self.GetData()) | 113 serialized_parts = _Serialize(self.GetData()) |
132 if len(serialized_parts) > _MAX_NUM_PARTS: | |
133 logging.error('Max number of parts reached.') | |
134 return | |
135 part_list = [] | 114 part_list = [] |
136 num_parts = len(serialized_parts) | 115 num_parts = len(serialized_parts) |
137 for i in xrange(num_parts): | 116 for i in xrange(num_parts): |
138 if serialized_parts[i] is not None: | 117 part = PartEntity(id=i + 1, parent=self.key, value=serialized_parts[i]) |
139 part = PartEntity(id=i + 1, parent=self.key, value=serialized_parts[i]) | 118 part_list.append(part) |
140 part_list.append(part) | |
141 self.size = num_parts | 119 self.size = num_parts |
142 yield ndb.put_multi_async(part_list + [self]) | 120 yield ndb.put_multi_async(part_list + [self]) |
143 | 121 |
144 def GetData(self): | 122 def GetData(self): |
145 return getattr(self, '_data', None) | 123 return getattr(self, '_data', None) |
146 | 124 |
147 def SetData(self, data): | 125 def SetData(self, data): |
148 setattr(self, '_data', data) | 126 setattr(self, '_data', data) |
149 | 127 |
150 | 128 |
151 class PartEntity(ndb.Model): | 129 class PartEntity(ndb.Model): |
152 """Holds a part of serialized data for MultipartEntity. | 130 """Holds a part of serialized data for MultipartEntity. |
153 | 131 |
154 This entity key has the form: | 132 This entity key has the form: |
155 ndb.Key('MultipartEntity', multipart_entity_id, 'PartEntity', part_index) | 133 ndb.Key('MultipartEntity', multipart_entity_id, 'PartEntity', part_index) |
156 """ | 134 """ |
157 value = ndb.BlobProperty() | 135 value = ndb.BlobProperty() |
158 | 136 |
159 | 137 |
160 class MultipartCache(object): | |
161 """Contains operations for storing values over multiple memcache keys. | |
162 | |
163 Values are serialized, split into parts, and stored under multiple memcache | |
164 keys; the head cache entry stores the expected number of parts. | |
165 """ | |
166 | |
167 @classmethod | |
168 @ndb.tasklet | |
169 def GetAsync(cls, key): | |
170 """Gets value in memcache.""" | |
171 keys = cls._GetCacheKeyList(key) | |
172 head_key = cls._GetCacheKey(key) | |
173 client = memcache.Client() | |
174 cache_values = yield client.get_multi_async(keys) | |
175 # Check whether we have all of the memcache values. | |
176 if len(keys) != len(cache_values) or head_key not in cache_values: | |
177 raise ndb.Return(None) | |
178 | |
179 serialized = '' | |
180 cache_size = cache_values[head_key] | |
181 keys.remove(head_key) | |
182 for key in keys[:cache_size]: | |
183 if key not in cache_values: | |
184 raise ndb.Return(None) | |
185 if cache_values[key] is not None: | |
186 serialized += cache_values[key] | |
187 raise ndb.Return(pickle.loads(serialized)) | |
188 | |
189 @classmethod | |
190 @ndb.tasklet | |
191 def SetAsync(cls, key, value): | |
192 """Sets a value in memcache.""" | |
193 serialized_parts = _Serialize(value) | |
194 if len(serialized_parts) > _MAX_NUM_PARTS: | |
195 logging.error('Max number of parts reached.') | |
196 raise ndb.Return(None) | |
197 | |
198 cached_values = {} | |
199 cached_values[cls._GetCacheKey(key)] = len(serialized_parts) | |
200 for i in xrange(len(serialized_parts)): | |
201 cached_values[cls._GetCacheKey(key, i)] = serialized_parts[i] | |
202 client = memcache.Client() | |
203 yield client.set_multi_async(cached_values) | |
204 | |
205 @classmethod | |
206 @ndb.synctasklet | |
207 def Delete(cls, key): | |
208 """Deletes all cached values for key.""" | |
209 yield cls.DeleteAsync(key) | |
210 | |
211 @classmethod | |
212 @ndb.tasklet | |
213 def DeleteAsync(cls, key): | |
214 client = memcache.Client() | |
215 yield client.delete_multi_async(cls._GetCacheKeyList(key)) | |
216 | |
217 @classmethod | |
218 def _GetCacheKeyList(cls, key): | |
219 """Gets a list of head cache key and cache key parts.""" | |
220 keys = [cls._GetCacheKey(key, i) for i in xrange(_MAX_NUM_PARTS)] | |
221 keys.append(cls._GetCacheKey(key)) | |
222 return keys | |
223 | |
224 @classmethod | |
225 def _GetCacheKey(cls, key, index=None): | |
226 """Returns either head cache key or cache key part.""" | |
227 if index is not None: | |
228 return _MULTIPART_ENTITY_MEMCACHE_KEY + '%s.%s' % (key, index) | |
229 return _MULTIPART_ENTITY_MEMCACHE_KEY + key | |
230 | |
231 | |
232 @ndb.tasklet | |
233 def _GetValueFromDatastore(key): | |
234 entity = yield ndb.Key(MultipartEntity, key).get_async() | |
235 if not entity: | |
236 raise ndb.Return(None) | |
237 yield entity.GetPartsAsync() | |
238 raise ndb.Return(entity.GetData()) | |
239 | |
240 | |
241 def _Serialize(value): | 138 def _Serialize(value): |
242 """Serializes value and returns a list of its parts. | 139 """Serializes value and returns a list of its parts. |
243 | 140 |
244 Args: | 141 Args: |
245 value: A pickleable value. | 142 value: A pickleable value. |
246 | 143 |
247 Returns: | 144 Returns: |
248 A list of strings: the pickled representation of the value, split into | 145 A list of strings: the pickled representation of the value, split into |
249 chunks of at most _CHUNK_SIZE bytes. | 146 chunks of at most _CHUNK_SIZE bytes. |
250 """ | 147 """ |
251 serialized = pickle.dumps(value, 2) | 148 serialized = pickle.dumps(value, 2) |
252 length = len(serialized) | 149 length = len(serialized) |
253 values = [] | 150 values = [] |
254 for i in xrange(0, length, _CHUNK_SIZE): | 151 for i in xrange(0, length, _CHUNK_SIZE): |
255 values.append(serialized[i:i + _CHUNK_SIZE]) | 152 values.append(serialized[i:i + _CHUNK_SIZE]) |
256 for i in xrange(len(values), _MAX_NUM_PARTS): | |
257 values.append(None) | |
258 return values | 153 return values |
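
A sanity check on the chunking scheme the new version keeps: the sketch below mirrors _Serialize (pickle the value, then slice it into _CHUNK_SIZE-byte parts) together with the inverse join-and-unpickle step that GetPartsAsync (elided above) presumably performs. This is a minimal, framework-free sketch using only the standard library; the function names and the ~3 MB payload are hypothetical.

import pickle

_CHUNK_SIZE = 1000 * 1000  # Max bytes per entity, as defined in the module.


def SerializeSketch(value):
  # Mirrors _Serialize: pickle the value, then slice the result into
  # _CHUNK_SIZE-byte parts, one per PartEntity.
  serialized = pickle.dumps(value, 2)
  return [serialized[i:i + _CHUNK_SIZE]
          for i in range(0, len(serialized), _CHUNK_SIZE)]


def DeserializeSketch(parts):
  # The inverse step: concatenate the parts in order and unpickle.
  return pickle.loads(b''.join(parts))


value = {'payload': b'x' * (3 * _CHUNK_SIZE)}  # Pickles to a bit over 3 MB.
parts = SerializeSketch(value)
assert len(parts) == 4  # Three full chunks plus a small remainder.
assert DeserializeSketch(parts) == value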
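
A note on the PartEntity key form documented above, since it is what makes the ancestor query in MultipartEntity.DeleteAsync work: each part is a child of its container entity, so all parts share one entity group and can be found without a separate index. A hedged illustration, assuming the App Engine SDK is available and using a hypothetical key string 'graph_data':

from google.appengine.ext import ndb

parent = ndb.Key('MultipartEntity', 'graph_data')
part_1 = ndb.Key('MultipartEntity', 'graph_data', 'PartEntity', 1)
part_2 = ndb.Key('MultipartEntity', 'graph_data', 'PartEntity', 2)

# Both parts report the container key as their parent, so
# PartEntity.query(ancestor=parent) matches exactly these entities.
assert part_1.parent() == parent
assert part_2.parent() == parent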