Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphID;
Expand Down Expand Up @@ -70,6 +71,8 @@
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.concurrent.Executors;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -119,6 +122,8 @@ public class SavepointEnvironment implements Environment {

private final ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory;

@Nullable private ChannelStateWriter channelStateWriter;

private SavepointEnvironment(
RuntimeContext ctx,
ExecutionConfig executionConfig,
Expand Down Expand Up @@ -440,4 +445,15 @@ public CompletableFuture<CoordinationResponse> sendRequestToCoordinator(
return CompletableFuture.completedFuture(null);
}
}

@Override
public void setChannelStateWriter(ChannelStateWriter channelStateWriter) {
this.channelStateWriter = channelStateWriter;
}

@Override
@Nullable
public ChannelStateWriter getChannelStateWriter() {
return this.channelStateWriter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
Expand Down Expand Up @@ -280,4 +281,8 @@ default CheckpointStorageAccess getCheckpointStorageAccess() {
}

ChannelStateWriteRequestExecutorFactory getChannelStateExecutorFactory();

void setChannelStateWriter(ChannelStateWriter channelStateWriter);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the pattern seems to be the define the set then the get method.


ChannelStateWriter getChannelStateWriter();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
Expand Down Expand Up @@ -118,6 +119,8 @@ public class RuntimeEnvironment implements Environment {

ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory;

@Nullable private ChannelStateWriter channelStateWriter;

// ------------------------------------------------------------------------

public RuntimeEnvironment(
Expand Down Expand Up @@ -408,4 +411,15 @@ public CheckpointStorageAccess getCheckpointStorageAccess() {
public ChannelStateWriteRequestExecutorFactory getChannelStateExecutorFactory() {
return channelStateExecutorFactory;
}

public void setChannelStateWriter(ChannelStateWriter channelStateWriter) {
checkState(this.channelStateWriter == null, "Cannot set channelStateWriter twice!");
this.channelStateWriter = channelStateWriter;
}

@Override
@Nullable
public ChannelStateWriter getChannelStateWriter() {
return this.channelStateWriter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointStoreUtil;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
Expand Down Expand Up @@ -311,6 +312,9 @@ public class Task
*/
private UserCodeClassLoader userCodeClassLoader;

/** The channelStateWriter of the env. We obtain it after the invokable is initialized. */
@Nullable private volatile ChannelStateWriter channelStateWriter;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest a comment to describe why this needs to be volatile


/**
* <b>IMPORTANT:</b> This constructor may not start any work that would need to be undone in the
* case of a failing task deployment.
Expand Down Expand Up @@ -508,6 +512,12 @@ TaskInvokable getInvokable() {
return invokable;
}

@Nullable
@VisibleForTesting
ChannelStateWriter getChannelStateWriter() {
return channelStateWriter;
}

public boolean isBackPressured() {
if (invokable == null
|| partitionWriters.length == 0
Expand Down Expand Up @@ -749,6 +759,10 @@ private void doRun() {
FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
}

// We register a reference to the channelStateWriter
// so we can close it after the inputGates close
this.channelStateWriter = env.getChannelStateWriter();

// ----------------------------------------------------------------
// actual task core work
// ----------------------------------------------------------------
Expand Down Expand Up @@ -1011,6 +1025,16 @@ private void releaseResources() {
}
closeAllResultPartitions();
closeAllInputGates();
if (this.channelStateWriter != null) {
LOG.debug("Closing channelStateWriter for task {}", taskNameWithSubtask);
try {
this.channelStateWriter.close();
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalError(t);
LOG.error(
"Failed to close channelStateWriter for task {}.", taskNameWithSubtask, t);
}
}

try {
taskStateManager.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,8 @@ protected StreamTask(
CheckpointingOptions
.UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE))
: ChannelStateWriter.NO_OP;
environment.setChannelStateWriter(channelStateWriter);

this.subtaskCheckpointCoordinator =
new SubtaskCheckpointCoordinatorImpl(
checkpointStorageAccess,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,6 @@ public void cancel() throws IOException {
}
}
IOUtils.closeAllQuietly(asyncCheckpointRunnables);
channelStateWriter.close();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
Expand Down Expand Up @@ -61,6 +62,8 @@
import org.apache.flink.runtime.util.TestingUserCodeClassLoader;
import org.apache.flink.util.UserCodeClassLoader;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -88,6 +91,8 @@ public class DummyEnvironment implements Environment {

private CheckpointStorageAccess checkpointStorageAccess;

@Nullable private ChannelStateWriter channelStateWriter;

public DummyEnvironment() {
this("Test Job", 1, 0, 1);
}
Expand Down Expand Up @@ -312,4 +317,15 @@ public void setCheckpointStorageAccess(CheckpointStorageAccess checkpointStorage
public CheckpointStorageAccess getCheckpointStorageAccess() {
return checkNotNull(checkpointStorageAccess);
}

@Override
public void setChannelStateWriter(ChannelStateWriter channelStateWriter) {
this.channelStateWriter = channelStateWriter;
}

@Override
@Nullable
public ChannelStateWriter getChannelStateWriter() {
return this.channelStateWriter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
Expand Down Expand Up @@ -65,6 +66,8 @@
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.concurrent.Executors;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -149,6 +152,8 @@ public class MockEnvironment implements Environment, AutoCloseable {

private final ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory;

@Nullable private ChannelStateWriter channelStateWriter;

public static MockEnvironmentBuilder builder() {
return new MockEnvironmentBuilder();
}
Expand Down Expand Up @@ -495,4 +500,15 @@ public Optional<? extends Throwable> getActualExternalFailureCause() {
public void setExternalFailureCauseConsumer(Consumer<Throwable> externalFailureCauseConsumer) {
this.externalFailureCauseConsumer = Optional.of(externalFailureCauseConsumer);
}

@Override
public void setChannelStateWriter(ChannelStateWriter channelStateWriter) {
this.channelStateWriter = channelStateWriter;
}

@Override
@Nullable
public ChannelStateWriter getChannelStateWriter() {
return this.channelStateWriter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex;
Expand Down Expand Up @@ -77,6 +78,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
Expand Down Expand Up @@ -1249,6 +1251,34 @@ public void testDeclineCheckpoint() throws Exception {
assertEquals(ExecutionState.FINISHED, task.getTerminationFuture().getNow(null));
}

private void testChannelStateWriterCloses(Class<? extends TriggerLatchInvokable> invokable)
throws Exception {
final Task task =
createTaskBuilder()
.setInvokable(invokable)
.setTaskManagerActions(new NoOpTaskManagerActions())
.build(Executors.directExecutor());

task.startTaskThread();
awaitInvokableLatch(task);
ChannelStateWriterWithCloseTracker channelStateWriter =
(ChannelStateWriterWithCloseTracker) task.getChannelStateWriter();
assertFalse(channelStateWriter.isClosed());
triggerInvokableLatch(task);
task.getExecutingThread().join();
assertTrue(channelStateWriter.isClosed());
}

@Test
public void testChannelStateWriterClosesOnSuccess() throws Exception {
testChannelStateWriterCloses(ChannelStateWriterSetterInvokable.class);
}

@Test
public void testChannelStateWriterClosesOnFailure() throws Exception {
testChannelStateWriterCloses(FailingChannelStateWriterSetterInvokable.class);
}

private void assertCheckpointDeclined(
Task task,
TestCheckpointResponder testCheckpointResponder,
Expand Down Expand Up @@ -1576,7 +1606,7 @@ public void cleanUp(Throwable throwable) throws Exception {
}

/** {@link AbstractInvokable} which throws {@link RuntimeException} on invoke. */
public static final class InvokableWithExceptionOnTrigger extends TriggerLatchInvokable {
public static class InvokableWithExceptionOnTrigger extends TriggerLatchInvokable {
public InvokableWithExceptionOnTrigger(Environment environment) {
super(environment);
}
Expand Down Expand Up @@ -1762,4 +1792,35 @@ void awaitTriggerLatch() {
}
}
}

private static class ChannelStateWriterWithCloseTracker
extends ChannelStateWriter.NoOpChannelStateWriter {
private final AtomicBoolean closeCalled = new AtomicBoolean(false);

@Override
public void close() {
closeCalled.set(true);
;
}

public boolean isClosed() {
return closeCalled.get();
}
}

private static class ChannelStateWriterSetterInvokable extends InvokableBlockingWithTrigger {

public ChannelStateWriterSetterInvokable(Environment environment) {
super(environment);
environment.setChannelStateWriter(new ChannelStateWriterWithCloseTracker());
}
}

private static class FailingChannelStateWriterSetterInvokable
extends InvokableWithExceptionOnTrigger {
public FailingChannelStateWriterSetterInvokable(Environment environment) {
super(environment);
environment.setChannelStateWriter(new ChannelStateWriterWithCloseTracker());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
Expand Down Expand Up @@ -143,6 +144,8 @@ public class StreamMockEnvironment implements Environment {

private CheckpointStorageAccess checkpointStorageAccess;

@Nullable private ChannelStateWriter channelStateWriter;

public StreamMockEnvironment(
Configuration jobConfig,
Configuration taskConfig,
Expand Down Expand Up @@ -455,4 +458,15 @@ public void setCheckpointStorageAccess(CheckpointStorageAccess checkpointStorage
public CheckpointStorageAccess getCheckpointStorageAccess() {
return checkpointStorageAccess;
}

@Override
public void setChannelStateWriter(ChannelStateWriter channelStateWriter) {
this.channelStateWriter = channelStateWriter;
}

@Override
@Nullable
public ChannelStateWriter getChannelStateWriter() {
return this.channelStateWriter;
}
}