6 changes: 6 additions & 0 deletions project/MimaExcludes.scala
@@ -77,6 +77,12 @@ object MimaExcludes {
// SPARK-3822
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
) ++ Seq(
// Making Java Spark Streaming callable from Java
ProblemFilters.exclude[MissingMethodProblem](
Comment (Contributor): Please don't exclude StreamingContext-related stuff. That should not change. Only the JavaStreamingContext should be affected.

"org.apache.spark.streaming.StreamingContext.fileStream"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.streaming.api.java.JavaStreamingContext.fileStream")
) ++ Seq(
// SPARK-1209
ProblemFilters.exclude[MissingClassProblem](
@@ -331,8 +331,8 @@ class StreamingContext private[streaming] (
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory)
] (directory: String, newFilesOnly: Boolean = true): InputDStream[(K, V)] = {
Comment (Contributor): This does not seem to be related to SPARK-3754.

Comment (Contributor): I am not sure whether there is any use case for exposing this.

Comment (Contributor, author): The use case for exposing this was so that I could expose equivalent Java functionality to what was available in Scala.

Comment (Contributor): Hey @holdenk, although the underlying FileInputDStream supported it, it was not exposed in the API. I am not sure it's worth exposing this kind of functionality. So let's not change this.

new FileInputDStream[K, V, F](this, directory, newFilesOnly = newFilesOnly)
}

/**
@@ -250,19 +250,19 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
* @param inputFormatClass Input format for reading HDFS file
* @param keyClass Key type for reading HDFS file
* @param valueClass Value type for reading HDFS file
*/
def fileStream[K, V, F <: NewInputFormat[K, V]](
directory: String): JavaPairInputDStream[K, V] = {
implicit val cmk: ClassTag[K] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val cmv: ClassTag[V] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
implicit val cmf: ClassTag[F] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]]
ssc.fileStream[K, V, F](directory)
directory: String,
inputFormatClass: Class[F],
keyClass: Class[K],
valueClass: Class[V], newFilesOnly: Boolean = true): JavaPairInputDStream[K, V] = {
Comment (Contributor): Same reason as above, no need to expose this functionality of newFilesOnly.

Comment (Contributor): Correction on this comment: newFilesOnly should be exposed, as it is exposed in the Scala API.

implicit val cmk: ClassTag[K] = ClassTag(keyClass)
implicit val cmv: ClassTag[V] = ClassTag(valueClass)
implicit val cmf: ClassTag[F] = ClassTag(inputFormatClass)
ssc.fileStream[K, V, F](directory, newFilesOnly)
}
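
The Class-valued parameters above let the Java-facing method recover the ClassTags that the underlying Scala fileStream requires, so a Java caller can now build a typed file stream directly. A minimal sketch of such a call, mirroring the invocation made in the test suite further down; the application name and directory path are illustrative, not taken from the PR:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class FileStreamExample {
  public static void main(String[] args) throws Exception {
    JavaStreamingContext ssc =
        new JavaStreamingContext("local[2]", "example", new Duration(1000));
    // The Class arguments stand in for the ClassTags needed on the Scala
    // side; newFilesOnly = false also processes files already present in
    // the monitored directory when the stream starts.
    JavaPairDStream<LongWritable, IntWritable> stream = ssc.fileStream(
        "/tmp/sequence-files/",           // illustrative path
        SequenceFileInputFormat.class,
        LongWritable.class,
        IntWritable.class,
        false);
    stream.print();
    ssc.start();
    ssc.awaitTermination();
  }
}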

/**
@@ -30,6 +30,13 @@
import com.google.common.io.Files;
import com.google.common.collect.Sets;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import org.apache.spark.Accumulator;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -1703,6 +1710,65 @@ public void testTextFileStream() {
JavaDStream<String> test = ssc.textFileStream("/tmp/foo");
}


@Test
public void testFileStream() throws Exception {
// Disable manual clock as FileInputDStream does not work with manual clock
System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock");
ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
ssc.checkpoint("checkpoint");
// Set up some sequence files for streaming to read in
List<Tuple2<Long, Integer>> test_input = new ArrayList<Tuple2<Long, Integer>>();
test_input.add(new Tuple2(1L, 123456));
test_input.add(new Tuple2(2L, 123456));
JavaPairRDD<Long, Integer> rdd = ssc.sc().parallelizePairs(test_input);
File tempDir = Files.createTempDir();
JavaPairRDD<LongWritable, IntWritable> saveable = rdd.mapToPair(
new PairFunction<Tuple2<Long, Integer>, LongWritable, IntWritable>() {
public Tuple2<LongWritable, IntWritable> call(Tuple2<Long, Integer> record) {
return new Tuple2(new LongWritable(record._1), new IntWritable(record._2));
}});
saveable.saveAsNewAPIHadoopFile(tempDir.getAbsolutePath()+"/1/",
LongWritable.class, IntWritable.class,
SequenceFileOutputFormat.class);
saveable.saveAsNewAPIHadoopFile(tempDir.getAbsolutePath()+"/2/",
LongWritable.class, IntWritable.class,
SequenceFileOutputFormat.class);

// Construct a file stream from the above saved data
JavaPairDStream<LongWritable, IntWritable> testRaw = ssc.fileStream(
tempDir.getAbsolutePath() + "/" , SequenceFileInputFormat.class, LongWritable.class,
IntWritable.class, false);
JavaPairDStream<Long, Integer> test = testRaw.mapToPair(
new PairFunction<Tuple2<LongWritable, IntWritable>, Long, Integer>() {
public Tuple2<Long, Integer> call(Tuple2<LongWritable, IntWritable> input) {
return new Tuple2(input._1().get(), input._2().get());
}
});
final Accumulator<Integer> elem = ssc.sc().intAccumulator(0);
Comment (Contributor): Can't this be made simpler with DStream.count instead of tying in accumulators?

Comment (Contributor, author): How should I collect it back if I use DStream.count()? The provided utils for doing this in the Scala tests didn't seem to match this very well.

Comment (Contributor): Why is it not possible to just call rdd.count() and add up the counts in a global counter?

final Accumulator<Integer> total = ssc.sc().intAccumulator(0);
final Accumulator<Integer> calls = ssc.sc().intAccumulator(0);
test.foreachRDD(new Function<JavaPairRDD<Long, Integer>, Void>() {
public Void call(JavaPairRDD<Long, Integer> rdd) {
rdd.foreach(new VoidFunction<Tuple2<Long, Integer>>() {
public void call(Tuple2<Long, Integer> e) {
if (e._1() == 1L) {
elem.add(1);
}
total.add(1);
}
});
calls.add(1);
return null;
}
});
ssc.start();
Thread.sleep(5000);
Comment (Contributor): Could you make this something like an eventually block in ScalaTest?

Assert.assertTrue(calls.value() > 0);
Assert.assertEquals(new Long(4L), new Long(total.value()));
Assert.assertEquals(new Long(2L), new Long(elem.value()));
}
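
A possible simplification along the lines of the reviewers' two suggestions above, shown as a hedged sketch against the same "test" stream rather than a drop-in replacement: tally records with rdd.count() into a plain atomic counter (the suggested "global counter") instead of per-element accumulators, and replace the fixed Thread.sleep(5000) with a polling wait that approximates a ScalaTest eventually block, hand-rolled here since this suite is Java. The 10-second deadline and the 4-record target are assumptions taken from the asserted totals above.

// Sketch only: rdd.count() returns its result to the driver, so it can be
// added to an ordinary AtomicLong without involving accumulators.
final java.util.concurrent.atomic.AtomicLong total =
    new java.util.concurrent.atomic.AtomicLong(0);
test.foreachRDD(new Function<JavaPairRDD<Long, Integer>, Void>() {
  public Void call(JavaPairRDD<Long, Integer> rdd) {
    total.addAndGet(rdd.count());
    return null;
  }
});
ssc.start();
// Poll for the expected count instead of sleeping a fixed interval.
long deadline = System.currentTimeMillis() + 10000;
while (total.get() < 4 && System.currentTimeMillis() < deadline) {
  Thread.sleep(100);
}
Assert.assertEquals(4L, total.get());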

@Test
public void testRawSocketStream() {
JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345);