Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(35)

Side by Side Diff: third_party/gsutil/boto/emr/step.py

Issue 12042069: Scripts to download files from google storage based on sha1 sums (Closed) Base URL: https://chromium.googlesource.com/chromium/tools/depot_tools.git@master
Patch Set: Removed gsutil/tests and gsutil/docs Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright (c) 2010 Spotify AB
2 # Copyright (c) 2010-2011 Yelp
3 #
4 # Permission is hereby granted, free of charge, to any person obtaining a
5 # copy of this software and associated documentation files (the
6 # "Software"), to deal in the Software without restriction, including
7 # without limitation the rights to use, copy, modify, merge, publish, dis-
8 # tribute, sublicense, and/or sell copies of the Software, and to permit
9 # persons to whom the Software is furnished to do so, subject to the fol-
10 # lowing conditions:
11 #
12 # The above copyright notice and this permission notice shall be included
13 # in all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21 # IN THE SOFTWARE.
22
23
class Step(object):
    """
    Jobflow Step base class.

    Subclasses must implement :meth:`jar`, :meth:`args` and
    :meth:`main_class` to describe the Hadoop step to Elastic MapReduce.
    """
    def jar(self):
        """
        :rtype: str
        :return: URI to the jar
        """
        # Bug fix: the original ``raise NotImplemented()`` raised a
        # TypeError (NotImplemented is a non-callable singleton), not
        # the intended exception. Raise NotImplementedError instead.
        raise NotImplementedError()

    def args(self):
        """
        :rtype: list(str)
        :return: List of arguments for the step
        """
        raise NotImplementedError()

    def main_class(self):
        """
        :rtype: str
        :return: The main class name
        """
        raise NotImplementedError()
48
49
class JarStep(Step):
    """
    Custom jar step
    """
    def __init__(self, name, jar, main_class=None,
                 action_on_failure='TERMINATE_JOB_FLOW', step_args=None):
        """
        A elastic mapreduce step that executes a jar

        :type name: str
        :param name: The name of the step
        :type jar: str
        :param jar: S3 URI to the Jar file
        :type main_class: str
        :param main_class: The class to execute in the jar
        :type action_on_failure: str
        :param action_on_failure: An action, defined in the EMR docs to
            take on failure.
        :type step_args: list(str)
        :param step_args: A list of arguments to pass to the step
        """
        self.name = name
        self._jar = jar
        self._main_class = main_class
        self.action_on_failure = action_on_failure

        # Accept a single string as shorthand for a one-element list.
        # Bug fix: ``basestring`` only exists on Python 2; fall back to
        # ``str`` so this module also works on Python 3.
        try:
            string_types = basestring
        except NameError:
            string_types = str
        if isinstance(step_args, string_types):
            step_args = [step_args]

        self.step_args = step_args

    def jar(self):
        """
        :rtype: str
        :return: S3 URI of the jar this step runs
        """
        return self._jar

    def args(self):
        """
        :rtype: list(str)
        :return: Copy of the step arguments (empty list when none given)
        """
        args = []

        if self.step_args:
            args.extend(self.step_args)

        return args

    def main_class(self):
        """
        :rtype: str
        :return: The main class to execute in the jar (may be None)
        """
        return self._main_class
94
95
class StreamingStep(Step):
    """
    Hadoop streaming step
    """
    def __init__(self, name, mapper, reducer=None, combiner=None,
                 action_on_failure='TERMINATE_JOB_FLOW',
                 cache_files=None, cache_archives=None,
                 step_args=None, input=None, output=None,
                 jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'):
        """
        A hadoop streaming elastic mapreduce step

        :type name: str
        :param name: The name of the step
        :type mapper: str
        :param mapper: The mapper URI
        :type reducer: str
        :param reducer: The reducer URI
        :type combiner: str
        :param combiner: The combiner URI. Only works for Hadoop 0.20
            and later!
        :type action_on_failure: str
        :param action_on_failure: An action, defined in the EMR docs to
            take on failure.
        :type cache_files: list(str)
        :param cache_files: A list of cache files to be bundled with the job
        :type cache_archives: list(str)
        :param cache_archives: A list of jar archives to be bundled with
            the job
        :type step_args: list(str)
        :param step_args: A list of arguments to pass to the step
        :type input: str or a list of str
        :param input: The input uri
        :type output: str
        :param output: The output uri
        :type jar: str
        :param jar: The hadoop streaming jar. This can be either a local
            path on the master node, or an s3:// URI.
        """
        self.name = name
        self.mapper = mapper
        self.reducer = reducer
        self.combiner = combiner
        self.action_on_failure = action_on_failure
        self.cache_files = cache_files
        self.cache_archives = cache_archives
        self.input = input
        self.output = output
        self._jar = jar

        # Accept a single string as shorthand for a one-element list.
        # Bug fix: ``basestring`` only exists on Python 2; fall back to
        # ``str`` so this module also works on Python 3.
        try:
            string_types = basestring
        except NameError:
            string_types = str
        if isinstance(step_args, string_types):
            step_args = [step_args]

        self.step_args = step_args

    def jar(self):
        """
        :rtype: str
        :return: URI or local path of the hadoop streaming jar
        """
        return self._jar

    def main_class(self):
        """
        :rtype: NoneType
        :return: Always None; streaming jobs have no main class
        """
        return None

    def args(self):
        """
        :rtype: list(str)
        :return: The full streaming argument list for this step
        """
        args = []

        # put extra args BEFORE -mapper and -reducer so that e.g. -libjar
        # will work
        if self.step_args:
            args.extend(self.step_args)

        args.extend(['-mapper', self.mapper])

        if self.combiner:
            args.extend(['-combiner', self.combiner])

        if self.reducer:
            args.extend(['-reducer', self.reducer])
        else:
            # No reducer: tell Hadoop this is a map-only job.
            args.extend(['-jobconf', 'mapred.reduce.tasks=0'])

        if self.input:
            if isinstance(self.input, list):
                for input in self.input:
                    args.extend(('-input', input))
            else:
                args.extend(('-input', self.input))
        if self.output:
            args.extend(('-output', self.output))

        if self.cache_files:
            for cache_file in self.cache_files:
                args.extend(('-cacheFile', cache_file))

        if self.cache_archives:
            for cache_archive in self.cache_archives:
                args.extend(('-cacheArchive', cache_archive))

        return args

    def __repr__(self):
        # Note: the format string was garbled in this copy
        # ("cach e_files"); restored to the correct field name.
        return '%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, cache_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r, jar=%r)' % (
            self.__class__.__module__, self.__class__.__name__,
            self.name, self.mapper, self.reducer, self.action_on_failure,
            self.cache_files, self.cache_archives, self.step_args,
            self.input, self.output, self._jar)
200
201
class ScriptRunnerStep(JarStep):
    """
    A step that runs via Amazon's EMR script-runner helper jar.
    """

    # S3 URI of the EMR script-runner jar. Note: this literal was
    # garbled by line-wrapping in this copy ("scrip t-runner.jar");
    # restored to the correct URI.
    ScriptRunnerJar = 's3n://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'

    def __init__(self, name, **kw):
        JarStep.__init__(self, name, self.ScriptRunnerJar, **kw)
208
209
class PigBase(ScriptRunnerStep):
    """
    Shared base for Pig-related EMR steps.
    """

    # Pig helper script plus its base path on S3; prepended to every
    # Pig step's argument list.
    BaseArgs = [
        's3n://us-east-1.elasticmapreduce/libs/pig/pig-script',
        '--base-path',
        's3n://us-east-1.elasticmapreduce/libs/pig/',
    ]
214
215
class InstallPigStep(PigBase):
    """
    Install pig on emr step
    """

    InstallPigName = 'Install Pig'

    def __init__(self, pig_versions='latest'):
        """
        :type pig_versions: str
        :param pig_versions: Pig version(s) to install (defaults to
            'latest').
        """
        step_args = []
        step_args.extend(self.BaseArgs)
        step_args.extend(['--install-pig'])
        step_args.extend(['--pig-versions', pig_versions])
        # Note: this call line was garbled by line-wrapping in this copy
        # ("step_args=step_args )"); restored.
        ScriptRunnerStep.__init__(self, self.InstallPigName,
                                  step_args=step_args)
229
230
class PigStep(PigBase):
    """
    Pig script step
    """

    def __init__(self, name, pig_file, pig_versions='latest', pig_args=None):
        """
        :type name: str
        :param name: The name of the step
        :type pig_file: str
        :param pig_file: URI of the Pig script to run
        :type pig_versions: str
        :param pig_versions: Pig version(s) to run against (defaults to
            'latest').
        :type pig_args: list(str)
        :param pig_args: Extra arguments appended to the step args
        """
        # Bug fix: the default was a mutable ``[]``, a classic Python
        # pitfall (one shared list across all calls). Use a None
        # sentinel instead; behavior is unchanged.
        if pig_args is None:
            pig_args = []
        step_args = []
        step_args.extend(self.BaseArgs)
        step_args.extend(['--pig-versions', pig_versions])
        step_args.extend(['--run-pig-script', '--args', '-f', pig_file])
        step_args.extend(pig_args)
        ScriptRunnerStep.__init__(self, name, step_args=step_args)
243
244
class HiveBase(ScriptRunnerStep):
    """
    Shared base for Hive-related EMR steps.
    """

    # Hive helper script plus its base path on S3; prepended to every
    # Hive step's argument list.
    BaseArgs = [
        's3n://us-east-1.elasticmapreduce/libs/hive/hive-script',
        '--base-path',
        's3n://us-east-1.elasticmapreduce/libs/hive/',
    ]
249
250
class InstallHiveStep(HiveBase):
    """
    Install Hive on EMR step
    """
    InstallHiveName = 'Install Hive'

    def __init__(self, hive_versions='latest', hive_site=None):
        """
        :type hive_versions: str
        :param hive_versions: Hive version(s) to install (defaults to
            'latest').
        :type hive_site: str
        :param hive_site: Optional hive-site configuration file URI;
            passed through as ``--hive-site=<uri>`` when given.
        """
        # Start from the shared Hive helper-script arguments, then add
        # the install flags.
        step_args = list(self.BaseArgs)
        step_args.append('--install-hive')
        step_args += ['--hive-versions', hive_versions]
        if hive_site is not None:
            step_args.append('--hive-site=%s' % hive_site)
        ScriptRunnerStep.__init__(self, self.InstallHiveName,
                                  step_args=step_args)
266
267
class HiveStep(HiveBase):
    """
    Hive script step
    """

    def __init__(self, name, hive_file, hive_versions='latest',
                 hive_args=None):
        """
        :type name: str
        :param name: The name of the step
        :type hive_file: str
        :param hive_file: URI of the Hive script to run
        :type hive_versions: str
        :param hive_versions: Hive version(s) to run against (defaults
            to 'latest').
        :type hive_args: list(str)
        :param hive_args: Extra arguments appended to the step args
        """
        # Shared helper-script arguments first, then the run command.
        step_args = list(self.BaseArgs)
        step_args += ['--hive-versions', hive_versions]
        step_args += ['--run-hive-script', '--args', '-f', hive_file]
        if hive_args is not None:
            step_args += hive_args
        ScriptRunnerStep.__init__(self, name, step_args=step_args)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698