OLD | NEW |
| (Empty) |
1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 import os | |
5 import shutil | |
6 import tempfile | |
7 import unittest | |
8 | |
9 from telemetry import page_set_archive_info | |
10 | |
11 class MockPage(object): | |
12 def __init__(self, url): | |
13 self.url = url | |
14 | |
15 url1 = 'http://www.foo.com/' | |
16 url2 = 'http://www.bar.com/' | |
17 url3 = 'http://www.baz.com/' | |
18 recording1 = 'data_001.wpr' | |
19 recording2 = 'data_002.wpr' | |
20 archive_info_contents = (""" | |
21 { | |
22 "archives": { | |
23 "%s": ["%s", "%s"], | |
24 "%s": ["%s"] | |
25 } | |
26 } | |
27 """ % (recording1, url1, url2, recording2, url3)) | |
28 page1 = MockPage(url1) | |
29 page2 = MockPage(url2) | |
30 page3 = MockPage(url3) | |
31 | |
32 class TestPageSetArchiveInfo(unittest.TestCase): | |
33 def setUp(self): | |
34 self.tmp_dir = tempfile.mkdtemp() | |
35 # Write the metadata. | |
36 self.page_set_archive_info_file = os.path.join(self.tmp_dir, 'info.json') | |
37 with open(self.page_set_archive_info_file, 'w') as f: | |
38 f.write(archive_info_contents) | |
39 | |
40 # Write the existing .wpr files. | |
41 for i in [1, 2]: | |
42 with open(os.path.join(self.tmp_dir, ('data_00%d.wpr' % i)), 'w') as f: | |
43 f.write(archive_info_contents) | |
44 | |
45 # Create the PageSetArchiveInfo object to be tested. | |
46 self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile( | |
47 self.page_set_archive_info_file, '/tmp/pageset.json') | |
48 | |
49 def tearDown(self): | |
50 shutil.rmtree(self.tmp_dir) | |
51 | |
52 def testReadingArchiveInfo(self): | |
53 self.assertEquals(recording1, os.path.basename( | |
54 self.archive_info.WprFilePathForPage(page1))) | |
55 self.assertEquals(recording1, os.path.basename( | |
56 self.archive_info.WprFilePathForPage(page2))) | |
57 self.assertEquals(recording2, os.path.basename( | |
58 self.archive_info.WprFilePathForPage(page3))) | |
59 | |
60 def testModifications(self): | |
61 new_recording1 = 'data_003.wpr' | |
62 new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr') | |
63 with open(new_temp_recording, 'w') as f: | |
64 f.write('wpr data') | |
65 | |
66 self.archive_info.AddNewTemporaryRecording(new_temp_recording) | |
67 | |
68 self.assertEquals(new_temp_recording, | |
69 self.archive_info.WprFilePathForPage(page1)) | |
70 self.assertEquals(new_temp_recording, | |
71 self.archive_info.WprFilePathForPage(page2)) | |
72 self.assertEquals(new_temp_recording, | |
73 self.archive_info.WprFilePathForPage(page3)) | |
74 | |
75 self.archive_info.AddRecordedPages([page2]) | |
76 | |
77 self.assertTrue(os.path.exists(os.path.join(self.tmp_dir, new_recording1))) | |
78 self.assertFalse(os.path.exists( | |
79 os.path.join(self.tmp_dir, new_temp_recording))) | |
80 | |
81 self.assertTrue(os.path.exists(os.path.join(self.tmp_dir, recording1))) | |
82 self.assertTrue(os.path.exists(os.path.join(self.tmp_dir, recording2))) | |
83 | |
84 new_recording2 = 'data_004.wpr' | |
85 with open(new_temp_recording, 'w') as f: | |
86 f.write('wpr data') | |
87 | |
88 self.archive_info.AddNewTemporaryRecording(new_temp_recording) | |
89 self.archive_info.AddRecordedPages([page3]) | |
90 | |
91 self.assertTrue(os.path.exists(os.path.join(self.tmp_dir, new_recording2))) | |
92 self.assertFalse(os.path.exists( | |
93 os.path.join(self.tmp_dir, new_temp_recording))) | |
94 | |
95 self.assertTrue(os.path.exists(os.path.join(self.tmp_dir, recording1))) | |
96 # recording2 is no longer needed, so it was deleted. | |
97 self.assertFalse(os.path.exists(os.path.join(self.tmp_dir, recording2))) | |
98 | |
99 def testCreatingNewArchiveInfo(self): | |
100 # Write only the page set without the corresponding metadata file. | |
101 page_set_contents = (""" | |
102 { | |
103 archive_data_file": "new-metadata.json", | |
104 "pages": [ | |
105 { | |
106 "url": "%s", | |
107 } | |
108 ] | |
109 }""" % url1) | |
110 | |
111 page_set_file = os.path.join(self.tmp_dir, 'new.json') | |
112 with open(page_set_file, 'w') as f: | |
113 f.write(page_set_contents) | |
114 | |
115 self.page_set_archive_info_file = os.path.join(self.tmp_dir, | |
116 'new-metadata.json') | |
117 | |
118 # Create the PageSetArchiveInfo object to be tested. | |
119 self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile( | |
120 self.page_set_archive_info_file, page_set_file) | |
121 | |
122 # Add a recording for all the pages. | |
123 new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr') | |
124 with open(new_temp_recording, 'w') as f: | |
125 f.write('wpr data') | |
126 | |
127 self.archive_info.AddNewTemporaryRecording(new_temp_recording) | |
128 | |
129 self.assertEquals(new_temp_recording, | |
130 self.archive_info.WprFilePathForPage(page1)) | |
131 | |
132 self.archive_info.AddRecordedPages([page1]) | |
133 | |
134 # Expected name for the recording (decided by PageSetArchiveInfo). | |
135 new_recording = os.path.join(self.tmp_dir, 'new_000.wpr') | |
136 | |
137 self.assertTrue(os.path.exists(os.path.join(self.tmp_dir, new_recording))) | |
138 self.assertFalse(os.path.exists( | |
139 os.path.join(self.tmp_dir, new_temp_recording))) | |
140 | |
141 # Check that the archive info was written correctly. | |
142 self.assertTrue(os.path.exists(self.page_set_archive_info_file)) | |
143 read_archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile( | |
144 self.page_set_archive_info_file, '/tmp/pageset.json') | |
145 self.assertEquals(new_recording, | |
146 read_archive_info.WprFilePathForPage(page1)) | |
OLD | NEW |