diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a94d09be3bec6..64ec35e7b63e9 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -77,6 +77,12 @@ object MimaExcludes {
             // SPARK-3822
             ProblemFilters.exclude[IncompatibleResultTypeProblem](
               "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
+          ) ++ Seq(
+            // Making Java Spark Streaming callable from Java
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.streaming.StreamingContext.fileStream"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.streaming.api.java.JavaStreamingContext.fileStream")
           ) ++ Seq(
             // SPARK-1209
             ProblemFilters.exclude[MissingClassProblem](
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 54b219711efb9..974d9d9d2a591 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -331,8 +331,8 @@ class StreamingContext private[streaming] (
     K: ClassTag,
     V: ClassTag,
     F <: NewInputFormat[K, V]: ClassTag
-  ] (directory: String): InputDStream[(K, V)] = {
-    new FileInputDStream[K, V, F](this, directory)
+  ] (directory: String, newFilesOnly: Boolean = true): InputDStream[(K, V)] = {
+    new FileInputDStream[K, V, F](this, directory, newFilesOnly = newFilesOnly)
   }
 
   /**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 7db66c69a6d73..5d548f0e66fdb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -250,19 +250,19 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
    * Files must be written to the monitored directory by "moving" them from another
    * location within the same file system. File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file
-   * @tparam K Key type for reading HDFS file
-   * @tparam V Value type for reading HDFS file
-   * @tparam F Input format for reading HDFS file
+   * @param inputFormatClass Input format for reading HDFS file
+   * @param keyClass Key type for reading HDFS file
+   * @param valueClass Value type for reading HDFS file
    */
   def fileStream[K, V, F <: NewInputFormat[K, V]](
-      directory: String): JavaPairInputDStream[K, V] = {
-    implicit val cmk: ClassTag[K] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
-    implicit val cmv: ClassTag[V] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
-    implicit val cmf: ClassTag[F] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]]
-    ssc.fileStream[K, V, F](directory)
+      directory: String,
+      inputFormatClass: Class[F],
+      keyClass: Class[K],
+      valueClass: Class[V], newFilesOnly: Boolean = true): JavaPairInputDStream[K, V] = {
+    implicit val cmk: ClassTag[K] = ClassTag(keyClass)
+    implicit val cmv: ClassTag[V] = ClassTag(valueClass)
+    implicit val cmf: ClassTag[F] = ClassTag(inputFormatClass)
+    ssc.fileStream[K, V, F](directory, newFilesOnly)
   }
 
   /**
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 4efeb8dfbe1ad..ae7b445027215 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -30,6 +30,13 @@
 import com.google.common.io.Files;
 import com.google.common.collect.Sets;
 
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+import org.apache.spark.Accumulator;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
@@ -1703,6 +1710,65 @@ public void testTextFileStream() {
     JavaDStream<String> test = ssc.textFileStream("/tmp/foo");
   }
 
+  @Test
+  public void testFileStream() throws Exception {
+    // Disable manual clock as FileInputDStream does not work with manual clock
+    System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock");
+    ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+    ssc.checkpoint("checkpoint");
+    // Set up some sequence files for streaming to read in
+    List<Tuple2<Long, Integer>> test_input = new ArrayList<Tuple2<Long, Integer>>();
+    test_input.add(new Tuple2<Long, Integer>(1L, 123456));
+    test_input.add(new Tuple2<Long, Integer>(2L, 123456));
+    JavaPairRDD<Long, Integer> rdd = ssc.sc().parallelizePairs(test_input);
+    File tempDir = Files.createTempDir();
+    JavaPairRDD<LongWritable, IntWritable> saveable = rdd.mapToPair(
+      new PairFunction<Tuple2<Long, Integer>, LongWritable, IntWritable>() {
+        public Tuple2<LongWritable, IntWritable> call(Tuple2<Long, Integer> record) {
+          return new Tuple2<LongWritable, IntWritable>(
+            new LongWritable(record._1()), new IntWritable(record._2()));
+        }});
+    saveable.saveAsNewAPIHadoopFile(tempDir.getAbsolutePath() + "/1/",
+      LongWritable.class, IntWritable.class,
+      SequenceFileOutputFormat.class);
+    saveable.saveAsNewAPIHadoopFile(tempDir.getAbsolutePath() + "/2/",
+      LongWritable.class, IntWritable.class,
+      SequenceFileOutputFormat.class);
+
+    // Construct a file stream from the above saved data
+    JavaPairDStream<LongWritable, IntWritable> testRaw = ssc.fileStream(
+      tempDir.getAbsolutePath() + "/", SequenceFileInputFormat.class, LongWritable.class,
+      IntWritable.class, false);
+    JavaPairDStream<Long, Integer> test = testRaw.mapToPair(
+      new PairFunction<Tuple2<LongWritable, IntWritable>,
+        Long, Integer>() {
+        public Tuple2<Long, Integer> call(Tuple2<LongWritable, IntWritable> input) {
+          return new Tuple2<Long, Integer>(input._1().get(), input._2().get());
+        }
+      });
+    final Accumulator<Integer> elem = ssc.sc().intAccumulator(0);
+    final Accumulator<Integer> total = ssc.sc().intAccumulator(0);
+    final Accumulator<Integer> calls = ssc.sc().intAccumulator(0);
+    test.foreachRDD(new Function<JavaPairRDD<Long, Integer>, Void>() {
+      public Void call(JavaPairRDD<Long, Integer> rdd) {
+        rdd.foreach(new VoidFunction<Tuple2<Long, Integer>>() {
+          public void call(Tuple2<Long, Integer> e) {
+            if (e._1() == 1L) {
+              elem.add(1);
+            }
+            total.add(1);
+          }
+        });
+        calls.add(1);
+        return null;
+      }
+    });
+    ssc.start();
+    Thread.sleep(5000);
+    Assert.assertTrue(calls.value() > 0);
+    Assert.assertEquals(new Long(4L), new Long(total.value()));
+    Assert.assertEquals(new Long(2L), new Long(elem.value()));
+  }
+
   @Test
   public void testRawSocketStream() {
     JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345);
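For reference, a minimal standalone sketch (not part of the patch; the class name, directory path, and batch interval below are illustrative) of how a Java application could call the new fileStream overload added in JavaStreamingContext above, passing the input format, key, and value classes explicitly instead of relying on erased type parameters:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SequenceFileStreamExample {
  public static void main(String[] args) throws Exception {
    // One-second batch interval; master and app name are placeholders.
    JavaStreamingContext jssc =
      new JavaStreamingContext("local[2]", "SequenceFileStreamExample", new Duration(1000));

    // With this patch the key, value, and input-format classes are supplied as arguments,
    // so the ClassTags are built from them on the Scala side. The final argument
    // (newFilesOnly = false) also picks up files already present in the directory.
    JavaPairDStream<LongWritable, IntWritable> stream = jssc.fileStream(
      "hdfs:///tmp/sequence-files/",
      SequenceFileInputFormat.class,
      LongWritable.class,
      IntWritable.class,
      false);

    stream.print();
    jssc.start();
    jssc.awaitTermination();
  }
}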