OLD | NEW |
(Empty) | |
| 1 # Copyright (c) 2006-2010 Mitch Garnaat http://garnaat.org/ |
| 2 # Copyright (c) 2009, Eucalyptus Systems, Inc. |
| 3 # All rights reserved. |
| 4 # |
| 5 # Permission is hereby granted, free of charge, to any person obtaining a |
| 6 # copy of this software and associated documentation files (the |
| 7 # "Software"), to deal in the Software without restriction, including |
| 8 # without limitation the rights to use, copy, modify, merge, publish, dis- |
| 9 # tribute, sublicense, and/or sell copies of the Software, and to permit |
| 10 # persons to whom the Software is furnished to do so, subject to the fol- |
| 11 # lowing conditions: |
| 12 # |
| 13 # The above copyright notice and this permission notice shall be included |
| 14 # in all copies or substantial portions of the Software. |
| 15 # |
| 16 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
| 17 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- |
| 18 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
| 19 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
| 20 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 21 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
| 22 # IN THE SOFTWARE. |
| 23 |
| 24 """ |
| 25 Some unit tests for the EC2Connection |
| 26 """ |
| 27 |
| 28 import unittest |
| 29 import time |
| 30 import telnetlib |
| 31 import socket |
| 32 |
| 33 from nose.plugins.attrib import attr |
| 34 from boto.ec2.connection import EC2Connection |
| 35 |
| 36 |
| 37 class EC2ConnectionTest (unittest.TestCase): |
| 38 ec2 = True |
| 39 |
| 40 @attr('notdefault') |
| 41 def test_launch_permissions(self): |
| 42 # this is my user_id, if you want to run these tests you should |
| 43 # replace this with yours or they won't work |
| 44 user_id = '963068290131' |
| 45 print '--- running EC2Connection tests ---' |
| 46 c = EC2Connection() |
| 47 # get list of private AMI's |
| 48 rs = c.get_all_images(owners=[user_id]) |
| 49 assert len(rs) > 0 |
| 50 # now pick the first one |
| 51 image = rs[0] |
| 52 # temporarily make this image runnable by everyone |
| 53 status = image.set_launch_permissions(group_names=['all']) |
| 54 assert status |
| 55 d = image.get_launch_permissions() |
| 56 assert 'groups' in d |
| 57 assert len(d['groups']) > 0 |
| 58 # now remove that permission |
| 59 status = image.remove_launch_permissions(group_names=['all']) |
| 60 assert status |
| 61 time.sleep(10) |
| 62 d = image.get_launch_permissions() |
| 63 assert 'groups' not in d |
| 64 |
| 65 def test_1_basic(self): |
| 66 # create 2 new security groups |
| 67 c = EC2Connection() |
| 68 group1_name = 'test-%d' % int(time.time()) |
| 69 group_desc = 'This is a security group created during unit testing' |
| 70 group1 = c.create_security_group(group1_name, group_desc) |
| 71 time.sleep(2) |
| 72 group2_name = 'test-%d' % int(time.time()) |
| 73 group_desc = 'This is a security group created during unit testing' |
| 74 group2 = c.create_security_group(group2_name, group_desc) |
| 75 # now get a listing of all security groups and look for our new one |
| 76 rs = c.get_all_security_groups() |
| 77 found = False |
| 78 for g in rs: |
| 79 if g.name == group1_name: |
| 80 found = True |
| 81 assert found |
| 82 # now pass arg to filter results to only our new group |
| 83 rs = c.get_all_security_groups([group1_name]) |
| 84 assert len(rs) == 1 |
| 85 # try some group to group authorizations/revocations |
| 86 # first try the old style |
| 87 status = c.authorize_security_group(group1.name, |
| 88 group2.name, |
| 89 group2.owner_id) |
| 90 assert status |
| 91 status = c.revoke_security_group(group1.name, |
| 92 group2.name, |
| 93 group2.owner_id) |
| 94 assert status |
| 95 # now try specifying a specific port |
| 96 status = c.authorize_security_group(group1.name, |
| 97 group2.name, |
| 98 group2.owner_id, |
| 99 'tcp', 22, 22) |
| 100 assert status |
| 101 status = c.revoke_security_group(group1.name, |
| 102 group2.name, |
| 103 group2.owner_id, |
| 104 'tcp', 22, 22) |
| 105 assert status |
| 106 |
| 107 # now delete the second security group |
| 108 status = c.delete_security_group(group2_name) |
| 109 # now make sure it's really gone |
| 110 rs = c.get_all_security_groups() |
| 111 found = False |
| 112 for g in rs: |
| 113 if g.name == group2_name: |
| 114 found = True |
| 115 assert not found |
| 116 |
| 117 group = group1 |
| 118 |
| 119 # now try to launch apache image with our new security group |
| 120 rs = c.get_all_images() |
| 121 img_loc = 'ec2-public-images/fedora-core4-apache.manifest.xml' |
| 122 for image in rs: |
| 123 if image.location == img_loc: |
| 124 break |
| 125 reservation = image.run(security_groups=[group.name]) |
| 126 instance = reservation.instances[0] |
| 127 while instance.state != 'running': |
| 128 print '\tinstance is %s' % instance.state |
| 129 time.sleep(30) |
| 130 instance.update() |
| 131 # instance in now running, try to telnet to port 80 |
| 132 t = telnetlib.Telnet() |
| 133 try: |
| 134 t.open(instance.dns_name, 80) |
| 135 except socket.error: |
| 136 pass |
| 137 # now open up port 80 and try again, it should work |
| 138 group.authorize('tcp', 80, 80, '0.0.0.0/0') |
| 139 t.open(instance.dns_name, 80) |
| 140 t.close() |
| 141 # now revoke authorization and try again |
| 142 group.revoke('tcp', 80, 80, '0.0.0.0/0') |
| 143 try: |
| 144 t.open(instance.dns_name, 80) |
| 145 except socket.error: |
| 146 pass |
| 147 # now kill the instance and delete the security group |
| 148 instance.terminate() |
| 149 |
| 150 # check that state and previous_state have updated |
| 151 assert instance.state == 'shutting-down' |
| 152 assert instance.state_code == 32 |
| 153 assert instance.previous_state == 'running' |
| 154 assert instance.previous_state_code == 16 |
| 155 |
| 156 # unfortunately, I can't delete the sg within this script |
| 157 #sg.delete() |
| 158 |
| 159 # create a new key pair |
| 160 key_name = 'test-%d' % int(time.time()) |
| 161 status = c.create_key_pair(key_name) |
| 162 assert status |
| 163 # now get a listing of all key pairs and look for our new one |
| 164 rs = c.get_all_key_pairs() |
| 165 found = False |
| 166 for k in rs: |
| 167 if k.name == key_name: |
| 168 found = True |
| 169 assert found |
| 170 # now pass arg to filter results to only our new key pair |
| 171 rs = c.get_all_key_pairs([key_name]) |
| 172 assert len(rs) == 1 |
| 173 key_pair = rs[0] |
| 174 # now delete the key pair |
| 175 status = c.delete_key_pair(key_name) |
| 176 # now make sure it's really gone |
| 177 rs = c.get_all_key_pairs() |
| 178 found = False |
| 179 for k in rs: |
| 180 if k.name == key_name: |
| 181 found = True |
| 182 assert not found |
| 183 |
| 184 # short test around Paid AMI capability |
| 185 demo_paid_ami_id = 'ami-bd9d78d4' |
| 186 demo_paid_ami_product_code = 'A79EC0DB' |
| 187 l = c.get_all_images([demo_paid_ami_id]) |
| 188 assert len(l) == 1 |
| 189 assert len(l[0].product_codes) == 1 |
| 190 assert l[0].product_codes[0] == demo_paid_ami_product_code |
| 191 |
| 192 print '--- tests completed ---' |
OLD | NEW |