@@ -2,6 +2,9 @@ package com.jayway.saaloop.dsl
2
2
3
3
import org .apache .hadoop .mapreduce .{Reducer , Job , Mapper }
4
4
import org .apache .hadoop .conf .Configuration
5
+ import org .apache .hadoop .mapreduce .lib .input .FileInputFormat
6
+ import org .apache .hadoop .mapreduce .lib .output .FileOutputFormat
7
+ import org .apache .hadoop .fs .Path
5
8
6
9
/**
7
10
* Copyright 2012 Amir Moulavi ([email protected] )
@@ -27,8 +30,15 @@ trait SaaloopJob extends Types {
27
30
28
31
object job {
29
32
30
- def apply [K1 <: key, V1 <: value, K2 <: key, V2 <: value, K3 <: key, V3 <: value, K4 <: key, V4 <: value]
31
- (name: String )(mapWith: Mapper [K1 , V1 , K2 , V2 ], reduceWith: Reducer [K3 , V3 , K4 , V4 ], hadoopConfiguration: Configuration = conf): Job = {
33
+ def apply [K1 <: key, V1 <: value, K2 <: key, V2 <: value, K3 <: key, V3 <: value, K4 <: key, V4 <: value, FIN <: fin, FOUT <: fout]
34
+ (name: String )(mapWith: Mapper [K1 , V1 , K2 , V2 ],
35
+ reduceWith: Reducer [K3 , V3 , K4 , V4 ],
36
+ hadoopConfiguration: Configuration = conf,
37
+ inputPath: String = " ." ,
38
+ outputPath: String = " ." ,
39
+ inputFormatClass: Class [FIN ] = null ,
40
+ outputFormatClass: Class [FOUT ] = null ,
41
+ waitForCompletion: Boolean = false ): Job = {
32
42
33
43
val job = new Job (hadoopConfiguration, name)
34
44
job.setMapperClass(mapWith.getClass)
@@ -37,6 +47,11 @@ trait SaaloopJob extends Types {
37
47
val outputValueType = asInstanceOf [V4 ]
38
48
job.setOutputKeyClass(outputKeyType.getClass)
39
49
job.setOutputValueClass(outputValueType.getClass)
50
+ FileInputFormat .addInputPath(job, new Path (inputPath))
51
+ FileOutputFormat .setOutputPath(job, new Path (outputPath))
52
+ if (inputFormatClass != null ) job.setInputFormatClass(inputFormatClass)
53
+ if (outputFormatClass != null ) job.setOutputFormatClass(outputFormatClass)
54
+ job.waitForCompletion(waitForCompletion)
40
55
41
56
job
42
57
}
0 commit comments