Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.client.RpcResponseCallback;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public void writeTo(ByteBuffer buffer) {
*
* Unlike getBytes this will not create a copy the array if this is a slice.
*/
public @Nonnull ByteBuffer getByteBuffer() {
@Nonnull
public ByteBuffer getByteBuffer() {
if (base instanceof byte[] && offset >= BYTE_ARRAY_OFFSET) {
final byte[] bytes = (byte[]) base;

Expand Down
225 changes: 113 additions & 112 deletions core/src/main/java/org/apache/spark/SparkFirehoseListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,116 +30,117 @@
*/
public class SparkFirehoseListener implements SparkListenerInterface {

public void onEvent(SparkListenerEvent event) { }

@Override
public final void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
onEvent(stageCompleted);
}

@Override
public final void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
onEvent(stageSubmitted);
}

@Override
public final void onTaskStart(SparkListenerTaskStart taskStart) {
onEvent(taskStart);
}

@Override
public final void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
onEvent(taskGettingResult);
}

@Override
public final void onTaskEnd(SparkListenerTaskEnd taskEnd) {
onEvent(taskEnd);
}

@Override
public final void onJobStart(SparkListenerJobStart jobStart) {
onEvent(jobStart);
}

@Override
public final void onJobEnd(SparkListenerJobEnd jobEnd) {
onEvent(jobEnd);
}

@Override
public final void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
onEvent(environmentUpdate);
}

@Override
public final void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
onEvent(blockManagerAdded);
}

@Override
public final void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
onEvent(blockManagerRemoved);
}

@Override
public final void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
onEvent(unpersistRDD);
}

@Override
public final void onApplicationStart(SparkListenerApplicationStart applicationStart) {
onEvent(applicationStart);
}

@Override
public final void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
onEvent(applicationEnd);
}

@Override
public final void onExecutorMetricsUpdate(
SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
onEvent(executorMetricsUpdate);
}

@Override
public final void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
onEvent(executorAdded);
}

@Override
public final void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
onEvent(executorRemoved);
}

@Override
public final void onExecutorBlacklisted(SparkListenerExecutorBlacklisted executorBlacklisted) {
onEvent(executorBlacklisted);
}

@Override
public final void onExecutorUnblacklisted(SparkListenerExecutorUnblacklisted executorUnblacklisted) {
onEvent(executorUnblacklisted);
}

@Override
public final void onNodeBlacklisted(SparkListenerNodeBlacklisted nodeBlacklisted) {
onEvent(nodeBlacklisted);
}

@Override
public final void onNodeUnblacklisted(SparkListenerNodeUnblacklisted nodeUnblacklisted) {
onEvent(nodeUnblacklisted);
}

@Override
public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
onEvent(blockUpdated);
}

@Override
public void onOtherEvent(SparkListenerEvent event) {
onEvent(event);
}
public void onEvent(SparkListenerEvent event) { }

@Override
public final void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
onEvent(stageCompleted);
}

@Override
public final void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
onEvent(stageSubmitted);
}

@Override
public final void onTaskStart(SparkListenerTaskStart taskStart) {
onEvent(taskStart);
}

@Override
public final void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
onEvent(taskGettingResult);
}

@Override
public final void onTaskEnd(SparkListenerTaskEnd taskEnd) {
onEvent(taskEnd);
}

@Override
public final void onJobStart(SparkListenerJobStart jobStart) {
onEvent(jobStart);
}

@Override
public final void onJobEnd(SparkListenerJobEnd jobEnd) {
onEvent(jobEnd);
}

@Override
public final void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
onEvent(environmentUpdate);
}

@Override
public final void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
onEvent(blockManagerAdded);
}

@Override
public final void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
onEvent(blockManagerRemoved);
}

@Override
public final void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
onEvent(unpersistRDD);
}

@Override
public final void onApplicationStart(SparkListenerApplicationStart applicationStart) {
onEvent(applicationStart);
}

@Override
public final void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
onEvent(applicationEnd);
}

@Override
public final void onExecutorMetricsUpdate(
SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
onEvent(executorMetricsUpdate);
}

@Override
public final void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
onEvent(executorAdded);
}

@Override
public final void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
onEvent(executorRemoved);
}

@Override
public final void onExecutorBlacklisted(SparkListenerExecutorBlacklisted executorBlacklisted) {
onEvent(executorBlacklisted);
}

@Override
public final void onExecutorUnblacklisted(
SparkListenerExecutorUnblacklisted executorUnblacklisted) {
onEvent(executorUnblacklisted);
}

@Override
public final void onNodeBlacklisted(SparkListenerNodeBlacklisted nodeBlacklisted) {
onEvent(nodeBlacklisted);
}

@Override
public final void onNodeUnblacklisted(SparkListenerNodeUnblacklisted nodeUnblacklisted) {
onEvent(nodeUnblacklisted);
}

@Override
public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
onEvent(blockUpdated);
}

@Override
public void onOtherEvent(SparkListenerEvent event) {
onEvent(event);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ private UnsafeExternalSorter(
// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
// the end of the task. This is necessary to avoid memory leaks in when the downstream operator
// does not fully consume the sorter's output (e.g. sort followed by limit).
taskContext.addTaskCompletionListener(context -> { cleanupResources(); });
taskContext.addTaskCompletionListener(context -> {
cleanupResources();
});
}

/**
Expand Down
Loading