37 | 37 | import org.apache.kylin.cube.model.CubeBuildTypeEnum;
38 | 38 | import org.apache.kylin.engine.EngineFactory;
39 | 39 | import org.apache.kylin.engine.mr.CubingJob;
40 |    | -import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
41 |    | -import org.apache.kylin.engine.mr.common.MapReduceExecutable;
42 | 40 | import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
43 | 41 | import org.apache.kylin.job.JobInstance;
44 | 42 | import org.apache.kylin.job.Scheduler;
45 | 43 | import org.apache.kylin.job.SchedulerFactory;
46 |    | -import org.apache.kylin.job.common.ShellExecutable;
47 | 44 | import org.apache.kylin.job.constant.JobStatusEnum;
48 |    | -import org.apache.kylin.job.constant.JobStepStatusEnum;
49 | 45 | import org.apache.kylin.job.constant.JobTimeFilterEnum;
50 | 46 | import org.apache.kylin.job.engine.JobEngineConfig;
51 | 47 | import org.apache.kylin.job.exception.JobException;

59 | 55 | import org.apache.kylin.metadata.realization.RealizationStatusEnum;
60 | 56 | import org.apache.kylin.rest.constant.Constant;
61 | 57 | import org.apache.kylin.rest.exception.BadRequestException;
   | 58 | +import org.apache.kylin.rest.util.JobInfoConverter;
62 | 59 | import org.apache.kylin.source.ISource;
63 | 60 | import org.apache.kylin.source.SourceFactory;
64 | 61 | import org.apache.kylin.source.SourcePartition;
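(The dropped imports, HadoopShellExecutable, MapReduceExecutable, ShellExecutable, and JobStepStatusEnum, were only referenced by the private parseTo* helpers removed further down; the newly imported org.apache.kylin.rest.util.JobInfoConverter takes over that conversion logic.)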
@@ -275,108 +272,16 @@ private JobInstance getSingleJobInstance(AbstractExecutable job) {
275 | 272 |     result.setSubmitter(cubeJob.getSubmitter());
276 | 273 |     result.setUuid(cubeJob.getId());
277 | 274 |     result.setType(CubeBuildTypeEnum.BUILD);
278 |     | -    result.setStatus(parseToJobStatus(job.getStatus()));
    | 275 | +    result.setStatus(JobInfoConverter.parseToJobStatus(job.getStatus()));
279 | 276 |     result.setMrWaiting(cubeJob.getMapReduceWaitTime() / 1000);
280 | 277 |     result.setDuration(cubeJob.getDuration() / 1000);
281 | 278 |     for (int i = 0; i < cubeJob.getTasks().size(); ++i) {
282 | 279 |         AbstractExecutable task = cubeJob.getTasks().get(i);
283 |     | -        result.addStep(parseToJobStep(task, i, getExecutableManager().getOutput(task.getId())));
    | 280 | +        result.addStep(JobInfoConverter.parseToJobStep(task, i, getExecutableManager().getOutput(task.getId())));
284 | 281 |     }
285 | 282 |     return result;
286 | 283 | }
287 | 284 |
288 |     | -private JobInstance parseToJobInstance(AbstractExecutable job, Map<String, Output> outputs) {
289 |     | -    if (job == null) {
290 |     | -        return null;
291 |     | -    }
292 |     | -    Preconditions.checkState(job instanceof CubingJob, "illegal job type, id:" + job.getId());
293 |     | -    CubingJob cubeJob = (CubingJob) job;
294 |     | -    Output output = outputs.get(job.getId());
295 |     | -    final JobInstance result = new JobInstance();
296 |     | -    result.setName(job.getName());
297 |     | -    result.setRelatedCube(CubingExecutableUtil.getCubeName(cubeJob.getParams()));
298 |     | -    result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams()));
299 |     | -    result.setLastModified(output.getLastModified());
300 |     | -    result.setSubmitter(cubeJob.getSubmitter());
301 |     | -    result.setUuid(cubeJob.getId());
302 |     | -    result.setType(CubeBuildTypeEnum.BUILD);
303 |     | -    result.setStatus(parseToJobStatus(output.getState()));
304 |     | -    result.setMrWaiting(AbstractExecutable.getExtraInfoAsLong(output, CubingJob.MAP_REDUCE_WAIT_TIME, 0L) / 1000);
305 |     | -    result.setExecStartTime(AbstractExecutable.getStartTime(output));
306 |     | -    result.setExecEndTime(AbstractExecutable.getEndTime(output));
307 |     | -    result.setDuration(AbstractExecutable.getDuration(result.getExecStartTime(), result.getExecEndTime()) / 1000);
308 |     | -    for (int i = 0; i < cubeJob.getTasks().size(); ++i) {
309 |     | -        AbstractExecutable task = cubeJob.getTasks().get(i);
310 |     | -        result.addStep(parseToJobStep(task, i, outputs.get(task.getId())));
311 |     | -    }
312 |     | -    return result;
313 |     | -}
314 |     | -
315 |     | -private JobInstance.JobStep parseToJobStep(AbstractExecutable task, int i, Output stepOutput) {
316 |     | -    Preconditions.checkNotNull(stepOutput);
317 |     | -    JobInstance.JobStep result = new JobInstance.JobStep();
318 |     | -    result.setId(task.getId());
319 |     | -    result.setName(task.getName());
320 |     | -    result.setSequenceID(i);
321 |     | -    result.setStatus(parseToJobStepStatus(stepOutput.getState()));
322 |     | -    for (Map.Entry<String, String> entry : stepOutput.getExtra().entrySet()) {
323 |     | -        if (entry.getKey() != null && entry.getValue() != null) {
324 |     | -            result.putInfo(entry.getKey(), entry.getValue());
325 |     | -        }
326 |     | -    }
327 |     | -    result.setExecStartTime(AbstractExecutable.getStartTime(stepOutput));
328 |     | -    result.setExecEndTime(AbstractExecutable.getEndTime(stepOutput));
329 |     | -    if (task instanceof ShellExecutable) {
330 |     | -        result.setExecCmd(((ShellExecutable) task).getCmd());
331 |     | -    }
332 |     | -    if (task instanceof MapReduceExecutable) {
333 |     | -        result.setExecCmd(((MapReduceExecutable) task).getMapReduceParams());
334 |     | -        result.setExecWaitTime(AbstractExecutable.getExtraInfoAsLong(stepOutput, MapReduceExecutable.MAP_REDUCE_WAIT_TIME, 0L) / 1000);
335 |     | -    }
336 |     | -    if (task instanceof HadoopShellExecutable) {
337 |     | -        result.setExecCmd(((HadoopShellExecutable) task).getJobParams());
338 |     | -    }
339 |     | -    return result;
340 |     | -}
341 |     | -
342 |     | -private JobStatusEnum parseToJobStatus(ExecutableState state) {
343 |     | -    switch (state) {
344 |     | -    case READY:
345 |     | -        return JobStatusEnum.PENDING;
346 |     | -    case RUNNING:
347 |     | -        return JobStatusEnum.RUNNING;
348 |     | -    case ERROR:
349 |     | -        return JobStatusEnum.ERROR;
350 |     | -    case DISCARDED:
351 |     | -        return JobStatusEnum.DISCARDED;
352 |     | -    case SUCCEED:
353 |     | -        return JobStatusEnum.FINISHED;
354 |     | -    case STOPPED:
355 |     | -        return JobStatusEnum.STOPPED;
356 |     | -    default:
357 |     | -        throw new RuntimeException("invalid state:" + state);
358 |     | -    }
359 |     | -}
360 |     | -
361 |     | -private JobStepStatusEnum parseToJobStepStatus(ExecutableState state) {
362 |     | -    switch (state) {
363 |     | -    case READY:
364 |     | -        return JobStepStatusEnum.PENDING;
365 |     | -    case RUNNING:
366 |     | -        return JobStepStatusEnum.RUNNING;
367 |     | -    case ERROR:
368 |     | -        return JobStepStatusEnum.ERROR;
369 |     | -    case DISCARDED:
370 |     | -        return JobStepStatusEnum.DISCARDED;
371 |     | -    case SUCCEED:
372 |     | -        return JobStepStatusEnum.FINISHED;
373 |     | -    case STOPPED:
374 |     | -        return JobStepStatusEnum.STOPPED;
375 |     | -    default:
376 |     | -        throw new RuntimeException("invalid state:" + state);
377 |     | -    }
378 |     | -}
379 |     | -
380 | 285 | @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')")
381 | 286 | public void resumeJob(JobInstance job) throws IOException, JobException {
382 | 287 |     getExecutableManager().resumeJob(job.getId());
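For context: the private helpers deleted above are not gone; they reappear as static methods on the newly imported org.apache.kylin.rest.util.JobInfoConverter (the edited call sites reference JobInfoConverter.parseToJobStatus, parseToJobStep, and parseToJobInstance). The new class itself is not part of this diff, so the following is only a sketch of its probable shape: it reuses the two status-mapping switches verbatim from the removed code, assumes the remaining helpers move over with the same bodies, and assumes the usual Kylin import path for ExecutableState.

```java
package org.apache.kylin.rest.util;

import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.constant.JobStepStatusEnum;
import org.apache.kylin.job.execution.ExecutableState;

// Sketch only: the real JobInfoConverter introduced by this change also hosts
// parseToJobInstance(...) and parseToJobStep(...), whose bodies mirror the
// private methods removed from JobService in the hunk above.
public class JobInfoConverter {

    // Maps an executable state to the job-level status, as the removed
    // JobService.parseToJobStatus did.
    public static JobStatusEnum parseToJobStatus(ExecutableState state) {
        switch (state) {
        case READY:
            return JobStatusEnum.PENDING;
        case RUNNING:
            return JobStatusEnum.RUNNING;
        case ERROR:
            return JobStatusEnum.ERROR;
        case DISCARDED:
            return JobStatusEnum.DISCARDED;
        case SUCCEED:
            return JobStatusEnum.FINISHED;
        case STOPPED:
            return JobStatusEnum.STOPPED;
        default:
            throw new RuntimeException("invalid state:" + state);
        }
    }

    // Same mapping at the step level, mirroring the removed parseToJobStepStatus.
    public static JobStepStatusEnum parseToJobStepStatus(ExecutableState state) {
        switch (state) {
        case READY:
            return JobStepStatusEnum.PENDING;
        case RUNNING:
            return JobStepStatusEnum.RUNNING;
        case ERROR:
            return JobStepStatusEnum.ERROR;
        case DISCARDED:
            return JobStepStatusEnum.DISCARDED;
        case SUCCEED:
            return JobStepStatusEnum.FINISHED;
        case STOPPED:
            return JobStepStatusEnum.STOPPED;
        default:
            throw new RuntimeException("invalid state:" + state);
        }
    }
}
```

Centralizing the conversion this way lets other REST-layer code turn executables into JobInstance objects without importing the MR-specific executable types into JobService.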
@@ -448,7 +353,7 @@ private List<JobInstance> searchJobs(final String cubeNameSubstring, final Strin
448 | 353 |     return Lists.newArrayList(FluentIterable.from(searchCubingJobs(cubeNameSubstring, projectName, states, timeStartInMillis, timeEndInMillis, allOutputs, false)).transform(new Function<CubingJob, JobInstance>() {
449 | 354 |         @Override
450 | 355 |         public JobInstance apply(CubingJob cubingJob) {
451 |     | -            return parseToJobInstance(cubingJob, allOutputs);
    | 356 | +            return JobInfoConverter.parseToJobInstance(cubingJob, allOutputs);
452 | 357 |         }
453 | 358 |     }));
454 | 359 | }