diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index a4b203b5b9d4c..c3e9c4e6a6ca7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -31,6 +31,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -46,6 +47,7 @@
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
@@ -280,4 +282,14 @@ default CheckpointStorageAccess getCheckpointStorageAccess() {
}
ChannelStateWriteRequestExecutorFactory getChannelStateExecutorFactory();
+
+ /**
+  * Returns the {@link ChannelStateWriter} associated with this environment.
+  *
+  * <p>This default implementation does not provide a writer and always throws; environments
+  * that actually hold a writer override it.
+  *
+  * @return the channel state writer of this environment
+  * @throws UnsupportedOperationException always, in this default implementation
+  */
+ default ChannelStateWriter getChannelStateWriter() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+  * Sets the {@link ChannelStateWriter} for this environment.
+  *
+  * <p>This default implementation is a no-op: the given writer is discarded rather than
+  * stored, and no exception is raised.
+  *
+  * @param channelStateWriter the writer to associate with this environment
+  */
+ default void setChannelStateWriter(ChannelStateWriter channelStateWriter) {}
+
+    /**
+     * Returns the {@link StateBackend} associated with this environment.
+     *
+     * <p>This default implementation does not provide a state backend and always throws;
+     * environments that actually hold one override it.
+     *
+     * @return the state backend of this environment
+     * @throws UnsupportedOperationException always, in this default implementation
+     */
+    default StateBackend getStateBackend() {
+        throw new UnsupportedOperationException();
+    }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index d5e297c54b0ee..50aa82045dcab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -31,6 +31,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
@@ -47,6 +48,7 @@
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.util.UserCodeClassLoader;
@@ -118,6 +120,10 @@ public class RuntimeEnvironment implements Environment {
ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory;
+ @Nullable private ChannelStateWriter channelStateWriter;
+
+ private final StateBackend stateBackend;
+
// ------------------------------------------------------------------------
public RuntimeEnvironment(
@@ -151,7 +157,8 @@ public RuntimeEnvironment(
Task containingTask,
ExternalResourceInfoProvider externalResourceInfoProvider,
ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory,
- TaskManagerActions taskManagerActions) {
+ TaskManagerActions taskManagerActions,
+ StateBackend stateBackend) {
this.jobId = checkNotNull(jobId);
this.jobType = checkNotNull(jobType);
@@ -184,6 +191,7 @@ public RuntimeEnvironment(
this.externalResourceInfoProvider = checkNotNull(externalResourceInfoProvider);
this.channelStateExecutorFactory = checkNotNull(channelStateExecutorFactory);
this.taskManagerActions = checkNotNull(taskManagerActions);
+ this.stateBackend = checkNotNull(stateBackend);
}
// ------------------------------------------------------------------------
@@ -408,4 +416,22 @@ public CheckpointStorageAccess getCheckpointStorageAccess() {
public ChannelStateWriteRequestExecutorFactory getChannelStateExecutorFactory() {
return channelStateExecutorFactory;
}
+
+    /**
+     * Installs the channel state writer of this environment.
+     *
+     * <p>The writer may only be installed while none is set; a second installation is rejected
+     * by the {@code checkState} precondition below.
+     *
+     * @param channelStateWriter the writer to install
+     */
+    @Override
+    public void setChannelStateWriter(ChannelStateWriter channelStateWriter) {
+        checkState(
+                this.channelStateWriter == null, "Can not set channelStateWriter twice!");
+        this.channelStateWriter = channelStateWriter;
+    }
+
+    /**
+     * Returns the channel state writer that was installed through {@code
+     * setChannelStateWriter}.
+     *
+     * <p>The field is nullable until a writer is installed, so the lookup is guarded by {@code
+     * checkNotNull}, which rejects the call with the message below when no writer is present.
+     *
+     * @return the installed channel state writer, never {@code null}
+     */
+    @Override
+    public ChannelStateWriter getChannelStateWriter() {
+        final String notInitializedMessage = "channelStateWriter has not been initialized yet!";
+        return checkNotNull(channelStateWriter, notInitializedMessage);
+    }
+
+    /**
+     * Returns the state backend this environment was constructed with.
+     *
+     * <p>No null check is needed here: the field is {@code final} and the constructor already
+     * validates it with {@code checkNotNull}, so it can never be unset once the environment
+     * exists.
+     *
+     * @return the state backend, never {@code null}
+     */
+    @Override
+    public StateBackend getStateBackend() {
+        return stateBackend;
+    }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 1be391cd9856b..12f336b105f9f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -27,6 +27,7 @@
import org.apache.flink.api.common.TaskInfoImpl;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.AutoCloseableRegistry;
@@ -42,6 +43,9 @@
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointStoreUtil;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
+import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -78,11 +82,19 @@
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.CheckpointStorageLoader;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.filesystem.FsMergingCheckpointStorageAccess;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.KvStateService;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotPayload;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FatalExitExceptionHandler;
@@ -97,6 +109,7 @@
import org.apache.flink.util.WrappingRuntimeException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.RunnableWithException;
+import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -311,6 +324,8 @@ public class Task
*/
private UserCodeClassLoader userCodeClassLoader;
+ private volatile ChannelStateWriter channelStateWriter;
+
/**
* IMPORTANT: This constructor may not start any work that would need to be undone in the
* case of a failing task deployment.
@@ -698,6 +713,7 @@ private void doRun() {
TaskKvStateRegistry kvStateRegistry =
kvStateService.createKvStateTaskRegistry(jobId, getJobVertexId());
+ final StateBackend stateBackend = createStateBackend();
Environment env =
new RuntimeEnvironment(
jobId,
@@ -730,7 +746,21 @@ private void doRun() {
this,
externalResourceInfoProvider,
channelStateExecutorFactory,
- taskManagerActions);
+ taskManagerActions,
+ stateBackend);
+
+ final CheckpointStorage checkpointStorage = createCheckpointStorage(stateBackend);
+ CheckpointStorageAccess checkpointStorageAccess =
+ checkpointStorage.createCheckpointStorage(this.jobId);
+ checkpointStorageAccess =
+ tryApplyFileMergingCheckpoint(
+ checkpointStorageAccess,
+ this.taskStateManager.getFileMergingSnapshotManager(),
+ postFailureCleanUpRegistry,
+ env);
+ this.channelStateWriter = createChannelStateWriter(checkpointStorage, checkpointStorageAccess);
+ env.setCheckpointStorageAccess(checkpointStorageAccess);
+ env.setChannelStateWriter(channelStateWriter);
// Make sure the user code classloader is accessible thread-locally.
// We are setting the correct context class loader before instantiating the invokable
@@ -1011,6 +1041,14 @@ private void releaseResources() {
}
closeAllResultPartitions();
closeAllInputGates();
+ if (this.channelStateWriter != null) {
+ try {
+ this.channelStateWriter.close();
+ } catch (IOException e) {
+ LOG.error(
+ "Failed to close channelStateWriter for task {}.", taskNameWithSubtask, e);
+ }
+ }
try {
taskStateManager.close();
@@ -1558,6 +1596,110 @@ public void deliverOperatorEvent(OperatorID operator, SerializedValue
+ ((FsMergingCheckpointStorageAccess) mergingCheckpointStorageAccess)
+ .close());
+ }
+ return mergingCheckpointStorageAccess;
+ } catch (IOException e) {
+ LOG.warn(
+ "Initiating FsMergingCheckpointStorageAccess failed "
+ + "with exception: {}, falling back to original checkpoint storage access {}.",
+ e.getMessage(),
+ checkpointStorageAccess.getClass(),
+ e);
+ return checkpointStorageAccess;
+ }
+ }
+
+    /**
+     * Creates the channel state writer for this task.
+     *
+     * <p>When unaligned checkpoints are disabled in the job configuration, the shared {@link
+     * ChannelStateWriter#NO_OP} instance is returned. Otherwise a real writer is opened with a
+     * supplier that decides lazily which storage view the writer uses.
+     *
+     * @param checkpointStorage storage used to create a dedicated storage access for the writer
+     * @param checkpointStorageAccess the task's own storage access; only handed to the writer
+     *     when it is a {@link FsMergingCheckpointStorageAccess}
+     * @return the writer to use for channel state of this task
+     */
+    private ChannelStateWriter createChannelStateWriter(
+            CheckpointStorage checkpointStorage,
+            CheckpointStorageAccess checkpointStorageAccess) {
+        if (!CheckpointingOptions.isUnalignedCheckpointEnabled(getJobConfiguration())) {
+            return ChannelStateWriter.NO_OP;
+        }
+
+        // Note: don't pass checkpointStorageAccess directly to the channel state writer.
+        // The fileSystem of checkpointStorageAccess may be an instance of
+        // SafetyNetWrapperFileSystem, which closes all held streams when the thread exits.
+        // Channel state writers are invoked in threads other than the task thread, therefore
+        // the channel state writer cannot share file streams directly, otherwise conflicts
+        // will occur on job exit.
+        return openChannelStateWriter(
+                () -> {
+                    if (checkpointStorageAccess instanceof FsMergingCheckpointStorageAccess) {
+                        // FsMergingCheckpointStorageAccess uses an unguarded fileSystem,
+                        // which can be shared.
+                        return checkpointStorageAccess;
+                    }
+                    // Any other checkpoint storage access is created lazily, on first use by
+                    // the writer, to avoid sharing.
+                    return checkpointStorage.createCheckpointStorage(this.jobId);
+                });
+    }
+
+    /**
+     * Opens a new {@link ChannelStateWriterImpl} bound to this task.
+     *
+     * <p>The writer is built from this task's job vertex id, its name with subtask, its subtask
+     * index, the channel state executor factory, and the {@link
+     * CheckpointingOptions#UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE} setting read from the
+     * job configuration.
+     *
+     * @param checkpointStorageWorkerView supplier of the storage view the writer writes through;
+     *     it is passed on to the writer as-is and not invoked here
+     * @return a newly created channel state writer
+     */
+    public ChannelStateWriter openChannelStateWriter(
+            SupplierWithException<CheckpointStorageWorkerView, ? extends IOException>
+                    checkpointStorageWorkerView) {
+        return new ChannelStateWriterImpl(
+                this.getJobVertexId(),
+                this.taskNameWithSubtask,
+                this.taskInfo.getIndexOfThisSubtask(),
+                checkpointStorageWorkerView,
+                this.channelStateExecutorFactory,
+                jobConfiguration.get(
+                        CheckpointingOptions.UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE));
+    }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a7e02d458464f..324f098b6ee56 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -149,7 +149,6 @@
import static org.apache.flink.runtime.metrics.MetricNames.INITIALIZE_STATE_DURATION;
import static org.apache.flink.runtime.metrics.MetricNames.MAILBOX_START_DURATION;
import static org.apache.flink.runtime.metrics.MetricNames.READ_OUTPUT_DATA_DURATION;
-import static org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.openChannelStateWriter;
import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
import static org.apache.flink.util.Preconditions.checkState;
import static org.apache.flink.util.concurrent.FutureUtils.assertNoException;
@@ -239,7 +238,6 @@ public abstract class StreamTask>
protected final StateBackend stateBackend;
/** Our checkpoint storage. We use this to create checkpoint streams. */
- protected final CheckpointStorage checkpointStorage;
private final SubtaskCheckpointCoordinator subtaskCheckpointCoordinator;
@@ -463,8 +461,7 @@ protected StreamTask(
environment.setMainMailboxExecutor(mainMailboxExecutor);
environment.setAsyncOperationsThreadPool(asyncOperationsThreadPool);
- this.stateBackend = createStateBackend();
- this.checkpointStorage = createCheckpointStorage(stateBackend);
+ this.stateBackend = environment.getStateBackend();
this.changelogWriterAvailabilityProvider =
environment.getTaskStateManager().getStateChangelogStorage() == null
? null
@@ -473,14 +470,6 @@ protected StreamTask(
.getStateChangelogStorage()
.getAvailabilityProvider();
- CheckpointStorageAccess checkpointStorageAccess =
- checkpointStorage.createCheckpointStorage(getEnvironment().getJobID());
- checkpointStorageAccess =
- tryApplyFileMergingCheckpoint(
- checkpointStorageAccess,
- environment.getTaskStateManager().getFileMergingSnapshotManager());
- environment.setCheckpointStorageAccess(checkpointStorageAccess);
-
// if the clock is not already set, then assign a default TimeServiceProvider
if (timerService == null) {
this.timerService = createTimerService("Time Trigger for " + getName());
@@ -489,42 +478,10 @@ protected StreamTask(
}
this.systemTimerService = createTimerService("System Time Trigger for " + getName());
- final CheckpointStorageAccess finalCheckpointStorageAccess = checkpointStorageAccess;
-
- ChannelStateWriter channelStateWriter =
- CheckpointingOptions.isUnalignedCheckpointEnabled(getJobConfiguration())
- ? openChannelStateWriter(
- getName(),
- // Note: don't pass checkpointStorageAccess directly to channel
- // state writer.
- // The fileSystem of checkpointStorageAccess may be an instance
- // of SafetyNetWrapperFileSystem, which close all held streams
- // when thread exits. Channel state writers are invoked in other
- // threads instead of task thread, therefore channel state
- // writer cannot share file streams directly, otherwise
- // conflicts will occur on job exit.
- () -> {
- if (finalCheckpointStorageAccess
- instanceof FsMergingCheckpointStorageAccess) {
- // FsMergingCheckpointStorageAccess using unguarded
- // fileSystem, which can be shared.
- return finalCheckpointStorageAccess;
- } else {
- // Other checkpoint storage access should be lazily
- // initialized to avoid sharing.
- return checkpointStorage.createCheckpointStorage(
- getEnvironment().getJobID());
- }
- },
- environment,
- getJobConfiguration()
- .get(
- CheckpointingOptions
- .UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE))
- : ChannelStateWriter.NO_OP;
+
this.subtaskCheckpointCoordinator =
new SubtaskCheckpointCoordinatorImpl(
- checkpointStorageAccess,
+ environment.getCheckpointStorageAccess(),
getName(),
actionExecutor,
getAsyncOperationsThreadPool(),
@@ -533,7 +490,7 @@ protected StreamTask(
this::prepareInputSnapshot,
getJobConfiguration()
.get(CheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS),
- channelStateWriter,
+ environment.getChannelStateWriter(),
configuration
.getConfiguration()
.get(
@@ -564,35 +521,6 @@ protected StreamTask(
}
}
- private CheckpointStorageAccess tryApplyFileMergingCheckpoint(
- CheckpointStorageAccess checkpointStorageAccess,
- @Nullable FileMergingSnapshotManager fileMergingSnapshotManager) {
- if (fileMergingSnapshotManager == null) {
- return checkpointStorageAccess;
- }
- try {
- CheckpointStorageAccess mergingCheckpointStorageAccess =
- (CheckpointStorageAccess)
- checkpointStorageAccess.toFileMergingStorage(
- fileMergingSnapshotManager, environment);
- mergingCheckpointStorageAccess.initializeBaseLocationsForCheckpoint();
- if (mergingCheckpointStorageAccess instanceof FsMergingCheckpointStorageAccess) {
- resourceCloser.registerCloseable(
- () ->
- ((FsMergingCheckpointStorageAccess) mergingCheckpointStorageAccess)
- .close());
- }
- return mergingCheckpointStorageAccess;
- } catch (IOException e) {
- LOG.warn(
- "Initiating FsMergingCheckpointStorageAccess failed "
- + "with exception: {}, falling back to original checkpoint storage access {}.",
- e.getMessage(),
- checkpointStorageAccess.getClass(),
- e);
- return checkpointStorageAccess;
- }
- }
private TimerService createTimerService(String timerThreadName) {
ThreadFactory timerThreadFactory =
@@ -1635,34 +1563,6 @@ public void dispatchOperatorEvent(OperatorID operator, SerializedValue
- checkpointStorageWorkerView,
- Environment env,
- int maxSubtasksPerChannelStateFile) {
- return new ChannelStateWriterImpl(
- env.getJobVertexId(),
- taskName,
- env.getTaskInfo().getIndexOfThisSubtask(),
- checkpointStorageWorkerView,
- env.getChannelStateExecutorFactory(),
- maxSubtasksPerChannelStateFile);
- }
-
@Override
public void abortCheckpointOnBarrier(
long checkpointId, CheckpointException cause, OperatorChain, ?> operatorChain)
@@ -579,7 +564,6 @@ public void cancel() throws IOException {
}
}
IOUtils.closeAllQuietly(asyncCheckpointRunnables);
- channelStateWriter.close();
}
@VisibleForTesting
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java
index f315c7efc09d3..0249ba37740fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java
@@ -34,7 +34,6 @@
import java.util.concurrent.ExecutorService;
import static org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor.IMMEDIATE;
-import static org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.openChannelStateWriter;
/** A mock builder to build {@link SubtaskCheckpointCoordinator}. */
public class MockSubtaskCheckpointCoordinatorBuilder {
@@ -100,16 +99,7 @@ SubtaskCheckpointCoordinator build() throws IOException {
if (asyncExceptionHandler == null) {
this.asyncExceptionHandler = new NonHandleAsyncException();
}
- ChannelStateWriter channelStateWriter =
- unalignedCheckpointEnabled
- ? openChannelStateWriter(
- taskName,
- () ->
- checkpointStorage.createCheckpointStorage(
- environment.getJobID()),
- environment,
- maxSubtasksPerChannelStateFile)
- : ChannelStateWriter.NO_OP;
+ ChannelStateWriter channelStateWriter = this.environment.getChannelStateWriter();
return new SubtaskCheckpointCoordinatorImpl(
checkpointStorage.createCheckpointStorage(environment.getJobID()),