Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(79)

Side by Side Diff: third_party/gsutil/boto/tests/integration/sqs/test_connection.py

Issue 12317103: Added gsutil to depot tools (Closed) Base URL: https://chromium.googlesource.com/chromium/tools/depot_tools.git@master
Patch Set: added readme Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright (c) 2006-2010 Mitch Garnaat http://garnaat.org/
2 # Copyright (c) 2010, Eucalyptus Systems, Inc.
3 # All rights reserved.
4 #
5 # Permission is hereby granted, free of charge, to any person obtaining a
6 # copy of this software and associated documentation files (the
7 # "Software"), to deal in the Software without restriction, including
8 # without limitation the rights to use, copy, modify, merge, publish, dis-
9 # tribute, sublicense, and/or sell copies of the Software, and to permit
10 # persons to whom the Software is furnished to do so, subject to the fol-
11 # lowing conditions:
12 #
13 # The above copyright notice and this permission notice shall be included
14 # in all copies or substantial portions of the Software.
15 #
16 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
17 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
18 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
19 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
20 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 # IN THE SOFTWARE.
23
24 """
25 Some unit tests for the SQSConnection
26 """
27
28 import time
29 from threading import Timer
30 from tests.unit import unittest
31
32 from boto.sqs.connection import SQSConnection
33 from boto.sqs.message import MHMessage
34 from boto.exception import SQSError
35
36
37 class SQSConnectionTest(unittest.TestCase):
38
39 sqs = True
40
41 def test_1_basic(self):
42 print '--- running SQSConnection tests ---'
43 c = SQSConnection()
44 rs = c.get_all_queues()
45 num_queues = 0
46 for q in rs:
47 num_queues += 1
48
49 # try illegal name
50 try:
51 queue = c.create_queue('bad*queue*name')
52 self.fail('queue name should have been bad')
53 except SQSError:
54 pass
55
56 # now create one that should work and should be unique (i.e. a new one)
57 queue_name = 'test%d' % int(time.time())
58 timeout = 60
59 queue = c.create_queue(queue_name, timeout)
60 time.sleep(60)
61 rs = c.get_all_queues()
62 i = 0
63 for q in rs:
64 i += 1
65 assert i == num_queues + 1
66 assert queue.count_slow() == 0
67
68 # check the visibility timeout
69 t = queue.get_timeout()
70 assert t == timeout, '%d != %d' % (t, timeout)
71
72 # now try to get queue attributes
73 a = q.get_attributes()
74 assert 'ApproximateNumberOfMessages' in a
75 assert 'VisibilityTimeout' in a
76 a = q.get_attributes('ApproximateNumberOfMessages')
77 assert 'ApproximateNumberOfMessages' in a
78 assert 'VisibilityTimeout' not in a
79 a = q.get_attributes('VisibilityTimeout')
80 assert 'ApproximateNumberOfMessages' not in a
81 assert 'VisibilityTimeout' in a
82
83 # now change the visibility timeout
84 timeout = 45
85 queue.set_timeout(timeout)
86 time.sleep(60)
87 t = queue.get_timeout()
88 assert t == timeout, '%d != %d' % (t, timeout)
89
90 # now add a message
91 message_body = 'This is a test\n'
92 message = queue.new_message(message_body)
93 queue.write(message)
94 time.sleep(60)
95 assert queue.count_slow() == 1
96 time.sleep(90)
97
98 # now read the message from the queue with a 10 second timeout
99 message = queue.read(visibility_timeout=10)
100 assert message
101 assert message.get_body() == message_body
102
103 # now immediately try another read, shouldn't find anything
104 message = queue.read()
105 assert message == None
106
107 # now wait 30 seconds and try again
108 time.sleep(30)
109 message = queue.read()
110 assert message
111
112 # now delete the message
113 queue.delete_message(message)
114 time.sleep(30)
115 assert queue.count_slow() == 0
116
117 # try a batch write
118 num_msgs = 10
119 msgs = [(i, 'This is message %d' % i, 0) for i in range(num_msgs)]
120 queue.write_batch(msgs)
121
122 # try to delete all of the messages using batch delete
123 deleted = 0
124 while deleted < num_msgs:
125 time.sleep(5)
126 msgs = queue.get_messages(num_msgs)
127 if msgs:
128 br = queue.delete_message_batch(msgs)
129 deleted += len(br.results)
130
131 # create another queue so we can test force deletion
132 # we will also test MHMessage with this queue
133 queue_name = 'test%d' % int(time.time())
134 timeout = 60
135 queue = c.create_queue(queue_name, timeout)
136 queue.set_message_class(MHMessage)
137 time.sleep(30)
138
139 # now add a couple of messages
140 message = queue.new_message()
141 message['foo'] = 'bar'
142 queue.write(message)
143 message_body = {'fie': 'baz', 'foo': 'bar'}
144 message = queue.new_message(body=message_body)
145 queue.write(message)
146 time.sleep(30)
147
148 m = queue.read()
149 assert m['foo'] == 'bar'
150
151 # now delete that queue and messages
152 c.delete_queue(queue, True)
153
154 print '--- tests completed ---'
155
156 def test_sqs_timeout(self):
157 c = SQSConnection()
158 queue_name = 'test_sqs_timeout_%s' % int(time.time())
159 queue = c.create_queue(queue_name)
160 self.addCleanup(c.delete_queue, queue, True)
161 start = time.time()
162 poll_seconds = 2
163 response = queue.read(visibility_timeout=None,
164 wait_time_seconds=poll_seconds)
165 total_time = time.time() - start
166 self.assertTrue(total_time > poll_seconds,
167 "SQS queue did not block for at least %s seconds: %s" %
168 (poll_seconds, total_time))
169 self.assertIsNone(response)
170
171 # Now that there's an element in the queue, we should not block for 2
172 # seconds.
173 c.send_message(queue, 'test message')
174 start = time.time()
175 poll_seconds = 2
176 message = c.receive_message(
177 queue, number_messages=1,
178 visibility_timeout=None, attributes=None,
179 wait_time_seconds=poll_seconds)[0]
180 total_time = time.time() - start
181 self.assertTrue(total_time < poll_seconds,
182 "SQS queue blocked longer than %s seconds: %s" %
183 (poll_seconds, total_time))
184 self.assertEqual(message.get_body(), 'test message')
185
186 attrs = c.get_queue_attributes(queue, 'ReceiveMessageWaitTimeSeconds')
187 self.assertEqual(attrs['ReceiveMessageWaitTimeSeconds'], '0')
188
189 def test_sqs_longpoll(self):
190 c = SQSConnection()
191 queue_name = 'test_sqs_longpoll_%s' % int(time.time())
192 queue = c.create_queue(queue_name)
193 self.addCleanup(c.delete_queue, queue, True)
194 messages = []
195
196 # The basic idea is to spawn a timer thread that will put something
197 # on the queue in 5 seconds and verify that our long polling client
198 # sees the message after waiting for approximately that long.
199 def send_message():
200 messages.append(
201 queue.write(queue.new_message('this is a test message')))
202
203 t = Timer(5.0, send_message)
204 t.start()
205 self.addCleanup(t.join)
206
207 start = time.time()
208 response = queue.read(wait_time_seconds=10)
209 end = time.time()
210
211 t.join()
212 self.assertEqual(response.id, messages[0].id)
213 self.assertEqual(response.get_body(), messages[0].get_body())
214 # The timer thread should send the message in 5 seconds, so
215 # we're giving +- .5 seconds for the total time the queue
216 # was blocked on the read call.
217 self.assertTrue(4.5 <= (end - start) <= 5.5)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698