Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.client.RpcResponseCallback;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void writeTo(ByteBuffer buffer) {
*
* Unlike getBytes this will not create a copy the array if this is a slice.
*/
public @Nonnull ByteBuffer getByteBuffer() {
@Nonnull public ByteBuffer getByteBuffer() {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems actually a correct order but there is a bug in checkstyle. Please see https://github.com/checkstyle/checkstyle/issues/903. I am willing to disable ModifierOrder for this method if anyone thinks so.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The compiler accepts it but I agree that annotations conventionally come before modifiers. They usually come on the line above.

if (base instanceof byte[] && offset >= BYTE_ARRAY_OFFSET) {
final byte[] bytes = (byte[]) base;

Expand Down
47 changes: 24 additions & 23 deletions core/src/main/java/org/apache/spark/SparkFirehoseListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ public void onEvent(SparkListenerEvent event) { }

@Override
public final void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
onEvent(stageCompleted);
onEvent(stageCompleted);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the whole file is mis-indented. The first indent is still 4 spaces. I think you could fix that too if you like

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sure. Thank you. Let me address the comments and compare this to your work too.

}

@Override
public final void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
onEvent(stageSubmitted);
onEvent(stageSubmitted);
}

@Override
Expand All @@ -49,97 +49,98 @@ public final void onTaskStart(SparkListenerTaskStart taskStart) {

@Override
public final void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
onEvent(taskGettingResult);
onEvent(taskGettingResult);
}

@Override
public final void onTaskEnd(SparkListenerTaskEnd taskEnd) {
onEvent(taskEnd);
onEvent(taskEnd);
}

@Override
public final void onJobStart(SparkListenerJobStart jobStart) {
onEvent(jobStart);
onEvent(jobStart);
}

@Override
public final void onJobEnd(SparkListenerJobEnd jobEnd) {
onEvent(jobEnd);
onEvent(jobEnd);
}

@Override
public final void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
onEvent(environmentUpdate);
onEvent(environmentUpdate);
}

@Override
public final void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
onEvent(blockManagerAdded);
onEvent(blockManagerAdded);
}

@Override
public final void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
onEvent(blockManagerRemoved);
onEvent(blockManagerRemoved);
}

@Override
public final void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
onEvent(unpersistRDD);
onEvent(unpersistRDD);
}

@Override
public final void onApplicationStart(SparkListenerApplicationStart applicationStart) {
onEvent(applicationStart);
onEvent(applicationStart);
}

@Override
public final void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
onEvent(applicationEnd);
onEvent(applicationEnd);
}

@Override
public final void onExecutorMetricsUpdate(
SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
onEvent(executorMetricsUpdate);
SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
onEvent(executorMetricsUpdate);
}

@Override
public final void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
onEvent(executorAdded);
onEvent(executorAdded);
}

@Override
public final void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
onEvent(executorRemoved);
onEvent(executorRemoved);
}

@Override
public final void onExecutorBlacklisted(SparkListenerExecutorBlacklisted executorBlacklisted) {
onEvent(executorBlacklisted);
onEvent(executorBlacklisted);
}

@Override
public final void onExecutorUnblacklisted(SparkListenerExecutorUnblacklisted executorUnblacklisted) {
onEvent(executorUnblacklisted);
public final void onExecutorUnblacklisted(
SparkListenerExecutorUnblacklisted executorUnblacklisted) {
onEvent(executorUnblacklisted);
}

@Override
public final void onNodeBlacklisted(SparkListenerNodeBlacklisted nodeBlacklisted) {
onEvent(nodeBlacklisted);
onEvent(nodeBlacklisted);
}

@Override
public final void onNodeUnblacklisted(SparkListenerNodeUnblacklisted nodeUnblacklisted) {
onEvent(nodeUnblacklisted);
onEvent(nodeUnblacklisted);
}

@Override
public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
onEvent(blockUpdated);
onEvent(blockUpdated);
}

@Override
public void onOtherEvent(SparkListenerEvent event) {
onEvent(event);
onEvent(event);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ private UnsafeExternalSorter(
// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
// the end of the task. This is necessary to avoid memory leaks in when the downstream operator
// does not fully consume the sorter's output (e.g. sort followed by limit).
taskContext.addTaskCompletionListener(context -> { cleanupResources(); });
taskContext.addTaskCompletionListener(context -> {
cleanupResources();
});
}

/**
Expand Down
111 changes: 57 additions & 54 deletions core/src/test/java/test/org/apache/spark/JavaAPISuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ public void groupByOnPairRDD() {
// Regression test for SPARK-4459
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Function<Tuple2<Integer, Integer>, Boolean> areOdd =
x -> (x._1() % 2 == 0) && (x._2() % 2 == 0);
x -> (x._1() % 2 == 0) && (x._2() % 2 == 0);
JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
JavaPairRDD<Boolean, Iterable<Tuple2<Integer, Integer>>> oddsAndEvens = pairRDD.groupBy(areOdd);
assertEquals(2, oddsAndEvens.count());
Expand Down Expand Up @@ -528,14 +528,14 @@ public void aggregateByKey() {
new Tuple2<>(5, 3)), 2);

Map<Integer, HashSet<Integer>> sets = pairs.aggregateByKey(new HashSet<Integer>(),
(a, b) -> {
a.add(b);
return a;
},
(a, b) -> {
a.addAll(b);
return a;
}).collectAsMap();
(a, b) -> {
a.add(b);
return a;
},
(a, b) -> {
a.addAll(b);
return a;
}).collectAsMap();
assertEquals(3, sets.size());
assertEquals(new HashSet<>(Arrays.asList(1)), sets.get(1));
assertEquals(new HashSet<>(Arrays.asList(2)), sets.get(3));
Expand Down Expand Up @@ -666,8 +666,8 @@ public void javaDoubleRDDHistoGram() {
assertArrayEquals(expected_counts, histogram);
// SPARK-5744
assertArrayEquals(
new long[] {0},
sc.parallelizeDoubles(new ArrayList<>(0), 1).histogram(new double[]{0.0, 1.0}));
new long[] {0},
sc.parallelizeDoubles(new ArrayList<>(0), 1).histogram(new double[]{0.0, 1.0}));
}

private static class DoubleComparator implements Comparator<Double>, Serializable {
Expand Down Expand Up @@ -807,7 +807,7 @@ public void mapsFromPairsToPairs() {

// Regression test for SPARK-668:
JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
item -> Collections.singletonList(item.swap()).iterator());
item -> Collections.singletonList(item.swap()).iterator());
swapped.collect();

// There was never a bug here, but it's worth testing:
Expand Down Expand Up @@ -845,11 +845,13 @@ public void mapPartitionsWithIndex() {
public void getNumPartitions(){
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
JavaDoubleRDD rdd2 = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0), 2);
JavaPairRDD<String, Integer> rdd3 = sc.parallelizePairs(Arrays.asList(
new Tuple2<>("a", 1),
new Tuple2<>("aa", 2),
new Tuple2<>("aaa", 3)
), 2);
JavaPairRDD<String, Integer> rdd3 = sc.parallelizePairs(
Arrays.asList(
new Tuple2<>("a", 1),
new Tuple2<>("aa", 2),
new Tuple2<>("aaa", 3)
),
2);
assertEquals(3, rdd1.getNumPartitions());
assertEquals(2, rdd2.getNumPartitions());
assertEquals(2, rdd3.getNumPartitions());
Expand Down Expand Up @@ -977,7 +979,7 @@ public void sequenceFile() {
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);

rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
.saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
.saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);

// Try reading the output back as an object file
JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class,
Expand Down Expand Up @@ -1068,11 +1070,11 @@ public void writeWithNewAPIHadoopFile() {
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);

rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
.saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class,
org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
.saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class,
org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);

JavaPairRDD<IntWritable, Text> output =
sc.sequenceFile(outputDir, IntWritable.class, Text.class);
sc.sequenceFile(outputDir, IntWritable.class, Text.class);
assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
}

Expand All @@ -1088,11 +1090,11 @@ public void readWithNewAPIHadoopFile() throws IOException {
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);

rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
.saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
.saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);

JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
IntWritable.class, Text.class, Job.getInstance().getConfiguration());
org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
IntWritable.class, Text.class, Job.getInstance().getConfiguration());
assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
}

Expand Down Expand Up @@ -1135,10 +1137,10 @@ public void hadoopFile() {
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);

rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
.saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
.saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);

JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
SequenceFileInputFormat.class, IntWritable.class, Text.class);
SequenceFileInputFormat.class, IntWritable.class, Text.class);
assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
}

Expand All @@ -1154,10 +1156,11 @@ public void hadoopFileCompressed() {
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);

rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
.saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class, DefaultCodec.class);
.saveAsHadoopFile(outputDir, IntWritable.class, Text.class,
SequenceFileOutputFormat.class, DefaultCodec.class);

JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
SequenceFileInputFormat.class, IntWritable.class, Text.class);
SequenceFileInputFormat.class, IntWritable.class, Text.class);

assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
}
Expand Down Expand Up @@ -1263,23 +1266,23 @@ public void combineByKey() {
Function2<Integer, Integer, Integer> mergeValueFunction = (v1, v2) -> v1 + v2;

JavaPairRDD<Integer, Integer> combinedRDD = originalRDD.keyBy(keyFunction)
.combineByKey(createCombinerFunction, mergeValueFunction, mergeValueFunction);
.combineByKey(createCombinerFunction, mergeValueFunction, mergeValueFunction);
Map<Integer, Integer> results = combinedRDD.collectAsMap();
ImmutableMap<Integer, Integer> expected = ImmutableMap.of(0, 9, 1, 5, 2, 7);
assertEquals(expected, results);

Partitioner defaultPartitioner = Partitioner.defaultPartitioner(
combinedRDD.rdd(),
JavaConverters.collectionAsScalaIterableConverter(
Collections.<RDD<?>>emptyList()).asScala().toSeq());
combinedRDD.rdd(),
JavaConverters.collectionAsScalaIterableConverter(
Collections.<RDD<?>>emptyList()).asScala().toSeq());
combinedRDD = originalRDD.keyBy(keyFunction)
.combineByKey(
createCombinerFunction,
mergeValueFunction,
mergeValueFunction,
defaultPartitioner,
false,
new KryoSerializer(new SparkConf()));
.combineByKey(
createCombinerFunction,
mergeValueFunction,
mergeValueFunction,
defaultPartitioner,
false,
new KryoSerializer(new SparkConf()));
results = combinedRDD.collectAsMap();
assertEquals(expected, results);
}
Expand All @@ -1291,11 +1294,10 @@ public void mapOnPairRDD() {
JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1()));
assertEquals(Arrays.asList(
new Tuple2<>(1, 1),
new Tuple2<>(0, 2),
new Tuple2<>(1, 3),
new Tuple2<>(0, 4)), rdd3.collect());

new Tuple2<>(1, 1),
new Tuple2<>(0, 2),
new Tuple2<>(1, 3),
new Tuple2<>(0, 4)), rdd3.collect());
}

@SuppressWarnings("unchecked")
Expand All @@ -1312,16 +1314,18 @@ public void collectPartitions() {
assertEquals(Arrays.asList(3, 4), parts[0]);
assertEquals(Arrays.asList(5, 6, 7), parts[1]);

assertEquals(Arrays.asList(new Tuple2<>(1, 1),
new Tuple2<>(2, 0)),
rdd2.collectPartitions(new int[] {0})[0]);
assertEquals(
Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)),
rdd2.collectPartitions(new int[] {0})[0]);

List<Tuple2<Integer,Integer>>[] parts2 = rdd2.collectPartitions(new int[] {1, 2});
assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]);
assertEquals(Arrays.asList(new Tuple2<>(5, 1),
new Tuple2<>(6, 0),
new Tuple2<>(7, 1)),
parts2[1]);
assertEquals(
Arrays.asList(
new Tuple2<>(5, 1),
new Tuple2<>(6, 0),
new Tuple2<>(7, 1)),
parts2[1]);
}

@Test
Expand Down Expand Up @@ -1352,7 +1356,6 @@ public void countApproxDistinctByKey() {
double error = Math.abs((resCount - count) / count);
assertTrue(error < 0.1);
}

}

@Test
Expand Down Expand Up @@ -1531,8 +1534,8 @@ public void testRegisterKryoClasses() {
SparkConf conf = new SparkConf();
conf.registerKryoClasses(new Class<?>[]{ Class1.class, Class2.class });
assertEquals(
Class1.class.getName() + "," + Class2.class.getName(),
conf.get("spark.kryo.classesToRegister"));
Class1.class.getName() + "," + Class2.class.getName(),
conf.get("spark.kryo.classesToRegister"));
}

@Test
Expand Down
Loading