From 2632a1e0c747d3734150c4ee3a6bc1c7537983e8 Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Tue, 7 Oct 2014 17:39:55 -0700
Subject: [PATCH 1/5] Draft of adding filestream support to Java API with tests

---
 .../api/java/JavaStreamingContext.scala       | 20 +++----
 .../apache/spark/streaming/JavaAPISuite.java  | 54 +++++++++++++++++++
 2 files changed, 64 insertions(+), 10 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 9dc26dc6b32a1..bc6420f973a89 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -250,18 +250,18 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
    * Files must be written to the monitored directory by "moving" them from another
    * location within the same file system. File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file
-   * @tparam K Key type for reading HDFS file
-   * @tparam V Value type for reading HDFS file
-   * @tparam F Input format for reading HDFS file
+   * @param inputFormatClass Input format for reading HDFS file
+   * @param keyClass Key type for reading HDFS file
+   * @param valueClass Value type for reading HDFS file
    */
   def fileStream[K, V, F <: NewInputFormat[K, V]](
-      directory: String): JavaPairInputDStream[K, V] = {
-    implicit val cmk: ClassTag[K] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
-    implicit val cmv: ClassTag[V] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
-    implicit val cmf: ClassTag[F] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]]
+      directory: String,
+      inputFormatClass: Class[F],
+      keyClass: Class[K],
+      valueClass: Class[V]): JavaPairInputDStream[K, V] = {
+    implicit val cmk: ClassTag[K] = ClassTag(keyClass)
+    implicit val cmv: ClassTag[V] = ClassTag(valueClass)
+    implicit val cmf: ClassTag[F] = ClassTag(inputFormatClass)
     ssc.fileStream[K, V, F](directory)
   }
 
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 4efeb8dfbe1ad..02833961a1080 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -30,6 +30,13 @@
 import com.google.common.io.Files;
 import com.google.common.collect.Sets;
 
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+import org.apache.spark.Accumulator;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
@@ -1703,6 +1710,53 @@ public void testTextFileStream() {
     JavaDStream<String> test = ssc.textFileStream("/tmp/foo");
   }
 
+  @Test
+  public void testFileStream() throws Exception {
+    Tuple2<Long, Integer> data = new Tuple2<Long, Integer>(1L, 123456);
+    List<Tuple2<Long, Integer>> test_input = new ArrayList<Tuple2<Long, Integer>>();
+    test_input.add(data);
+    JavaPairRDD<Long, Integer> rdd = ssc.sc().parallelizePairs(test_input);
+    File tempDir = Files.createTempDir();
+    JavaPairDStream<LongWritable, IntWritable> testRaw = ssc.fileStream(
+      tempDir.getAbsolutePath(), SequenceFileInputFormat.class, LongWritable.class, IntWritable.class);
+    JavaPairDStream<Long, Integer> test = testRaw.mapToPair(
+      new PairFunction<Tuple2<LongWritable, IntWritable>, Long, Integer>() {
+        public Tuple2<Long, Integer> call(Tuple2<LongWritable, IntWritable> input) {
+          return new Tuple2<Long, Integer>(input._1().get(), input._2().get());
+        }
+      });
+    JavaPairRDD<LongWritable, IntWritable> saveable = rdd.mapToPair(
+      new PairFunction<Tuple2<Long, Integer>, LongWritable, IntWritable>() {
+        public Tuple2<LongWritable, IntWritable> call(Tuple2<Long, Integer> record) {
+          return new Tuple2<LongWritable, IntWritable>(new LongWritable(record._1), new IntWritable(record._2));
+        }});
+    final Accumulator<Integer> elem = ssc.sc().intAccumulator(0);
+    final Accumulator<Integer> total = ssc.sc().intAccumulator(0);
+    test.foreachRDD(new Function<JavaPairRDD<Long, Integer>, Void>() {
+      public Void call(JavaPairRDD<Long, Integer> rdd) {
+        rdd.foreach(new VoidFunction<Tuple2<Long, Integer>>() {
+          public void call(Tuple2<Long, Integer> e) {
+            if (e._1() == 1L) {
+              elem.add(1);
+            }
+            total.add(1);
+          }
+        });
+        return null;
+      }
+    });
+    Thread.sleep(1000);
+    System.out.println("Saving old data to " + tempDir.getAbsolutePath());
+    saveable.saveAsNewAPIHadoopFile(tempDir.getAbsolutePath()+"/1412725941839/",
+      LongWritable.class, IntWritable.class,
+      SequenceFileOutputFormat.class);
+    test.print();
+    Thread.sleep(5000);
+    Assert.assertEquals(new Long(1L), new Long(total.value()));
+    Assert.assertEquals(new Long(1L), new Long(elem.value()));
+  }
+
   @Test
   public void testRawSocketStream() {
     JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345);

From 95f2bfee299792810379d28da1aa8f6eccb5c03e Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Tue, 7 Oct 2014 17:55:42 -0700
Subject: [PATCH 2/5] hmm

---
 .../apache/spark/streaming/JavaAPISuite.java | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 02833961a1080..e89cbcf4965de 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1746,15 +1746,20 @@ public void call(Tuple2<Long, Integer> e) {
         return null;
       }
     });
+    test.print();
     Thread.sleep(1000);
-    System.out.println("Saving old data to " + tempDir.getAbsolutePath());
-    saveable.saveAsNewAPIHadoopFile(tempDir.getAbsolutePath()+"/1412725941839/",
+    System.out.println("Saving old data to " + tempDir.getAbsolutePath() +"/1/");
+    saveable.saveAsNewAPIHadoopFile(tempDir.getAbsolutePath()+"/2/",
       LongWritable.class, IntWritable.class,
       SequenceFileOutputFormat.class);
-    test.print();
-    Thread.sleep(5000);
-    Assert.assertEquals(new Long(1L), new Long(total.value()));
-    Assert.assertEquals(new Long(1L), new Long(elem.value()));
+    Thread.sleep(1000);
+    System.out.println("Saving old data to " + tempDir.getAbsolutePath() +"/2/");
+    saveable.saveAsNewAPIHadoopFile(tempDir.getAbsolutePath()+"/2/",
+      LongWritable.class, IntWritable.class,
+      SequenceFileOutputFormat.class);
+
+    Assert.assertEquals(new Long(2L), new Long(total.value()));
+    Assert.assertEquals(new Long(2L), new Long(elem.value()));
   }
 
   @Test
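For orientation between the patches: the overload added in PATCH 1/5 is meant to be called from Java roughly as in the sketch below. This is a minimal, hypothetical example rather than code from the series; the directory, application name, and batch interval are placeholders, and it assumes SequenceFiles of (LongWritable, IntWritable) pairs, as in the test above.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class FileStreamSketch {
      public static void main(String[] args) {
        JavaStreamingContext jssc =
            new JavaStreamingContext("local[2]", "FileStreamSketch", new Duration(1000));
        // The Class arguments stand in for the ClassTags that Scala callers get
        // implicitly, so K, V and F are bound to real classes rather than AnyRef.
        JavaPairInputDStream<LongWritable, IntWritable> stream = jssc.fileStream(
            "/tmp/sequence-files", SequenceFileInputFormat.class,
            LongWritable.class, IntWritable.class);
        stream.print();
        jssc.start();
        jssc.awaitTermination();
      }
    }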
From 47cfdd58075445822579c9271ef12521a7565ab0 Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Wed, 8 Oct 2014 16:58:23 -0700
Subject: [PATCH 3/5] Ok, looks like it was mostly just a test issue with the clock.

---
 .../spark/streaming/StreamingContext.scala    |  4 +-
 .../api/java/JavaStreamingContext.scala       |  4 +-
 .../apache/spark/streaming/JavaAPISuite.java  | 47 +++++++++++--------
 3 files changed, 31 insertions(+), 24 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 5a8eef1372e23..c3591a43a44e3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -331,8 +331,8 @@ class StreamingContext private[streaming] (
     K: ClassTag,
     V: ClassTag,
     F <: NewInputFormat[K, V]: ClassTag
-  ] (directory: String): InputDStream[(K, V)] = {
-    new FileInputDStream[K, V, F](this, directory)
+  ] (directory: String, newFilesOnly: Boolean = true): InputDStream[(K, V)] = {
+    new FileInputDStream[K, V, F](this, directory, newFilesOnly = newFilesOnly)
   }
 
   /**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index bc6420f973a89..21ac22921e54e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -258,11 +258,11 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
       directory: String,
       inputFormatClass: Class[F],
       keyClass: Class[K],
-      valueClass: Class[V]): JavaPairInputDStream[K, V] = {
+      valueClass: Class[V], newFilesOnly: Boolean = true): JavaPairInputDStream[K, V] = {
     implicit val cmk: ClassTag[K] = ClassTag(keyClass)
     implicit val cmv: ClassTag[V] = ClassTag(valueClass)
     implicit val cmf: ClassTag[F] = ClassTag(inputFormatClass)
-    ssc.fileStream[K, V, F](directory)
+    ssc.fileStream[K, V, F](directory, newFilesOnly)
   }
 
   /**
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index e89cbcf4965de..47d34f60cd350 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1713,26 +1713,41 @@ public void testTextFileStream() {
 
   @Test
   public void testFileStream() throws Exception {
+    // Disable manual clock as FileInputDStream does not work with manual clock
+    System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock");
+    ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+    ssc.checkpoint("checkpoint");
+    // Set up some sequence files for streaming to read in
     Tuple2<Long, Integer> data = new Tuple2<Long, Integer>(1L, 123456);
     List<Tuple2<Long, Integer>> test_input = new ArrayList<Tuple2<Long, Integer>>();
     test_input.add(data);
     JavaPairRDD<Long, Integer> rdd = ssc.sc().parallelizePairs(test_input);
     File tempDir = Files.createTempDir();
+    JavaPairRDD<LongWritable, IntWritable> saveable = rdd.mapToPair(
+      new PairFunction<Tuple2<Long, Integer>, LongWritable, IntWritable>() {
+        public Tuple2<LongWritable, IntWritable> call(Tuple2<Long, Integer> record) {
+          return new Tuple2<LongWritable, IntWritable>(new LongWritable(record._1), new IntWritable(record._2));
+        }});
+    saveable.saveAsNewAPIHadoopFile(tempDir.getAbsolutePath()+"/1/",
+      LongWritable.class, IntWritable.class,
+      SequenceFileOutputFormat.class);
+    saveable.saveAsNewAPIHadoopFile(tempDir.getAbsolutePath()+"/2/",
+      LongWritable.class, IntWritable.class,
+      SequenceFileOutputFormat.class);
+
+    // Construct a file stream from the above saved data
     JavaPairDStream<LongWritable, IntWritable> testRaw = ssc.fileStream(
-      tempDir.getAbsolutePath(), SequenceFileInputFormat.class, LongWritable.class, IntWritable.class);
+      tempDir.getAbsolutePath() + "/" , SequenceFileInputFormat.class, LongWritable.class,
+      IntWritable.class, false);
     JavaPairDStream<Long, Integer> test = testRaw.mapToPair(
       new PairFunction<Tuple2<LongWritable, IntWritable>, Long, Integer>() {
         public Tuple2<Long, Integer> call(Tuple2<LongWritable, IntWritable> input) {
           return new Tuple2<Long, Integer>(input._1().get(), input._2().get());
         }
       });
-    JavaPairRDD<LongWritable, IntWritable> saveable = rdd.mapToPair(
-      new PairFunction<Tuple2<Long, Integer>, LongWritable, IntWritable>() {
-        public Tuple2<LongWritable, IntWritable> call(Tuple2<Long, Integer> record) {
-          return new Tuple2<LongWritable, IntWritable>(new LongWritable(record._1), new IntWritable(record._2));
-        }});
     final Accumulator<Integer> elem = ssc.sc().intAccumulator(0);
     final Accumulator<Integer> total = ssc.sc().intAccumulator(0);
+    final Accumulator<Integer> calls = ssc.sc().intAccumulator(0);
     test.foreachRDD(new Function<JavaPairRDD<Long, Integer>, Void>() {
       public Void call(JavaPairRDD<Long, Integer> rdd) {
         rdd.foreach(new VoidFunction<Tuple2<Long, Integer>>() {
@@ -1743,23 +1758,15 @@ public void call(Tuple2<Long, Integer> e) {
             total.add(1);
           }
         });
+        calls.add(1);
         return null;
       }
     });
-    test.print();
-    Thread.sleep(1000);
-    System.out.println("Saving old data to " + tempDir.getAbsolutePath() +"/1/");
-    saveable.saveAsNewAPIHadoopFile(tempDir.getAbsolutePath()+"/2/",
-      LongWritable.class, IntWritable.class,
-      SequenceFileOutputFormat.class);
-    Thread.sleep(1000);
-    System.out.println("Saving old data to " + tempDir.getAbsolutePath() +"/2/");
-    saveable.saveAsNewAPIHadoopFile(tempDir.getAbsolutePath()+"/2/",
-      LongWritable.class, IntWritable.class,
-      SequenceFileOutputFormat.class);
-
-    Assert.assertEquals(new Long(2L), new Long(total.value()));
-    Assert.assertEquals(new Long(2L), new Long(elem.value()));
+    ssc.start();
+    Thread.sleep(5000);
+    Assert.assertTrue(calls.value() > 0);
+    Assert.assertEquals(new Long(4L), new Long(total.value()));
+    Assert.assertEquals(new Long(4L), new Long(elem.value()));
   }
 
   @Test

From 80704c5dcc35252a9db1fdd39031b7376f92a7fa Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Wed, 8 Oct 2014 18:39:16 -0700
Subject: [PATCH 4/5] Fix the test

---
 .../test/java/org/apache/spark/streaming/JavaAPISuite.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 47d34f60cd350..ae7b445027215 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1718,9 +1718,9 @@ public void testFileStream() throws Exception {
     ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
     ssc.checkpoint("checkpoint");
     // Set up some sequence files for streaming to read in
-    Tuple2<Long, Integer> data = new Tuple2<Long, Integer>(1L, 123456);
     List<Tuple2<Long, Integer>> test_input = new ArrayList<Tuple2<Long, Integer>>();
-    test_input.add(data);
+    test_input.add(new Tuple2<Long, Integer>(1L, 123456));
+    test_input.add(new Tuple2<Long, Integer>(2L, 123456));
     JavaPairRDD<Long, Integer> rdd = ssc.sc().parallelizePairs(test_input);
     File tempDir = Files.createTempDir();
     JavaPairRDD<LongWritable, IntWritable> saveable = rdd.mapToPair(
@@ -1766,7 +1766,7 @@ public void call(Tuple2<Long, Integer> e) {
     Thread.sleep(5000);
     Assert.assertTrue(calls.value() > 0);
     Assert.assertEquals(new Long(4L), new Long(total.value()));
-    Assert.assertEquals(new Long(4L), new Long(elem.value()));
+    Assert.assertEquals(new Long(2L), new Long(elem.value()));
   }
 
   @Test
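A note on the newFilesOnly flag that PATCH 3/5 threads through to FileInputDStream: passing false makes the stream also process files already present in the monitored directory when the context starts, which is what lets the test write its SequenceFiles before calling ssc.start(). Since Scala default arguments are not visible to Java, Java callers pass the flag explicitly. A sketch, reusing the imports and jssc from the example after PATCH 2/5 (the directory is again a placeholder):

    // Pick up files that already exist in the monitored directory at start-up
    // by passing newFilesOnly = false, as the updated test does.
    JavaPairInputDStream<LongWritable, IntWritable> preexisting = jssc.fileStream(
        "/tmp/prewritten", SequenceFileInputFormat.class,
        LongWritable.class, IntWritable.class, false);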
From 3a2cd21ec6672ecfb780d3a98fc5c1b5907c12e2 Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Wed, 5 Nov 2014 15:33:38 -0800
Subject: [PATCH 5/5] Add the excludes because of API changes.

The old API wasn't callable anyway
---
 project/MimaExcludes.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index c58666af84f24..20d5c8f4945ae 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -66,6 +66,12 @@ object MimaExcludes {
             "org.apache.spark.api.java.JavaRDDLike.foreachAsync"),
           ProblemFilters.exclude[MissingMethodProblem](
             "org.apache.spark.api.java.JavaRDDLike.collectAsync")
+        ) ++ Seq(
+          // Making Spark Streaming's fileStream callable from Java
+          ProblemFilters.exclude[MissingMethodProblem](
+            "org.apache.spark.streaming.StreamingContext.fileStream"),
+          ProblemFilters.exclude[MissingMethodProblem](
+            "org.apache.spark.streaming.api.java.JavaStreamingContext.fileStream")
         )
 
       case v if v.startsWith("1.1") =>
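To expand on the commit message of PATCH 5/5: against the pre-series JavaStreamingContext, the only way to bind K, V, and F from Java was with explicit type witnesses, and even then the implementation backed all three with ClassTag[AnyRef] (see the lines removed in PATCH 1/5), so the resulting stream presumably could not instantiate the right InputFormat at runtime. A hypothetical sketch of the old call site, which satisfies the compiler but relies on those erased tags:

    // Hypothetical: invoking the pre-patch generic method from Java. The type
    // witnesses compile, but the Scala side substituted ClassTag[AnyRef] for
    // K, V and F, so the stream was unusable in practice.
    JavaPairInputDStream<LongWritable, IntWritable> oldStream =
        jssc.<LongWritable, IntWritable,
            SequenceFileInputFormat<LongWritable, IntWritable>>fileStream("/tmp/in");

Replacing that signature removes the old method from the compiled binary interface, which is why the MiMa MissingMethodProblem excludes above are needed.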