Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
Expand All @@ -46,6 +47,7 @@
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
Expand Down Expand Up @@ -280,4 +282,14 @@ default CheckpointStorageAccess getCheckpointStorageAccess() {
}

ChannelStateWriteRequestExecutorFactory getChannelStateExecutorFactory();

/**
 * Returns the {@link ChannelStateWriter} associated with this environment.
 *
 * <p>This default implementation does not support the operation and always throws;
 * implementations that carry a writer are expected to override it.
 *
 * @return the channel state writer (never returned by this default implementation)
 * @throws UnsupportedOperationException always, unless overridden
 */
default ChannelStateWriter getChannelStateWriter() {
throw new UnsupportedOperationException();
}

/**
 * Sets the {@link ChannelStateWriter} for this environment.
 *
 * <p>This default implementation is a no-op: the given writer is silently discarded.
 * NOTE(review): the default getter throws while this default setter does nothing —
 * confirm that the asymmetry is intended.
 *
 * @param channelStateWriter the writer to associate with this environment
 */
default void setChannelStateWriter(ChannelStateWriter channelStateWriter) {}

/**
 * Returns the {@link StateBackend} associated with this environment.
 *
 * <p>This default implementation does not support the operation and always throws;
 * implementations that carry a state backend are expected to override it.
 *
 * @return the state backend (never returned by this default implementation)
 * @throws UnsupportedOperationException always, unless overridden
 */
default StateBackend getStateBackend() {
    throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
Expand All @@ -47,6 +48,7 @@
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.util.UserCodeClassLoader;
Expand Down Expand Up @@ -118,6 +120,10 @@ public class RuntimeEnvironment implements Environment {

ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory;

/**
 * Channel state writer of this environment; {@code null} until it is assigned through
 * {@code setChannelStateWriter}, which accepts it only once.
 */
@Nullable private ChannelStateWriter channelStateWriter;

/** State backend handed to the constructor, where it is checked to be non-null. */
private final StateBackend stateBackend;

// ------------------------------------------------------------------------

public RuntimeEnvironment(
Expand Down Expand Up @@ -151,7 +157,8 @@ public RuntimeEnvironment(
Task containingTask,
ExternalResourceInfoProvider externalResourceInfoProvider,
ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory,
TaskManagerActions taskManagerActions) {
TaskManagerActions taskManagerActions,
StateBackend stateBackend) {

this.jobId = checkNotNull(jobId);
this.jobType = checkNotNull(jobType);
Expand Down Expand Up @@ -184,6 +191,7 @@ public RuntimeEnvironment(
this.externalResourceInfoProvider = checkNotNull(externalResourceInfoProvider);
this.channelStateExecutorFactory = checkNotNull(channelStateExecutorFactory);
this.taskManagerActions = checkNotNull(taskManagerActions);
this.stateBackend = checkNotNull(stateBackend);
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -408,4 +416,22 @@ public CheckpointStorageAccess getCheckpointStorageAccess() {
public ChannelStateWriteRequestExecutorFactory getChannelStateExecutorFactory() {
return channelStateExecutorFactory;
}

/**
 * Installs the channel state writer of this environment.
 *
 * <p>The writer may only be installed while none is set yet; a second call while a
 * writer is present is rejected by the state check below.
 *
 * @param channelStateWriter the writer to install
 * @throws IllegalStateException if a writer has already been set (raised by
 *     {@code checkState})
 */
@Override
public void setChannelStateWriter(ChannelStateWriter channelStateWriter) {
    checkState(this.channelStateWriter == null, "Can not set channelStateWriter twice!");
    this.channelStateWriter = channelStateWriter;
}

/**
 * Returns the channel state writer previously installed on this environment.
 *
 * @return the installed writer, never {@code null}
 * @throws NullPointerException if no writer has been installed yet (raised by
 *     {@code checkNotNull})
 */
@Override
public ChannelStateWriter getChannelStateWriter() {
    final ChannelStateWriter writer = this.channelStateWriter;
    checkNotNull(writer, "channelStateWriter has not been initialized yet!");
    return writer;
}

/**
 * Returns the state backend this environment was constructed with.
 *
 * <p>The field is {@code final} and null-checked in the constructor, so no further
 * null check is needed here.
 *
 * @return the state backend, never {@code null}
 */
@Override
public StateBackend getStateBackend() {
    return stateBackend;
}
Comment on lines +432 to +436
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the code, stateBackend can never be null: it is initialized in the constructor rather than through a setter.

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.api.common.TaskInfoImpl;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.AutoCloseableRegistry;
Expand All @@ -42,6 +43,9 @@
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointStoreUtil;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
Expand Down Expand Up @@ -78,11 +82,19 @@
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLoader;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.filesystem.FsMergingCheckpointStorageAccess;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.KvStateService;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotPayload;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FatalExitExceptionHandler;
Expand All @@ -97,6 +109,7 @@
import org.apache.flink.util.WrappingRuntimeException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.SupplierWithException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -311,6 +324,8 @@ public class Task
*/
private UserCodeClassLoader userCodeClassLoader;

/**
 * Channel state writer of this task. It is assigned while the task runs ({@code doRun})
 * and, if set, closed when the task releases its resources.
 */
private volatile ChannelStateWriter channelStateWriter;

/**
* <b>IMPORTANT:</b> This constructor may not start any work that would need to be undone in the
* case of a failing task deployment.
Expand Down Expand Up @@ -698,6 +713,7 @@ private void doRun() {
TaskKvStateRegistry kvStateRegistry =
kvStateService.createKvStateTaskRegistry(jobId, getJobVertexId());

final StateBackend stateBackend = createStateBackend();
Environment env =
new RuntimeEnvironment(
jobId,
Expand Down Expand Up @@ -730,7 +746,21 @@ private void doRun() {
this,
externalResourceInfoProvider,
channelStateExecutorFactory,
taskManagerActions);
taskManagerActions,
stateBackend);

final CheckpointStorage checkpointStorage = createCheckpointStorage(stateBackend);
CheckpointStorageAccess checkpointStorageAccess =
checkpointStorage.createCheckpointStorage(this.jobId);
checkpointStorageAccess =
tryApplyFileMergingCheckpoint(
checkpointStorageAccess,
this.taskStateManager.getFileMergingSnapshotManager(),
postFailureCleanUpRegistry,
env);
this.channelStateWriter = createChannelStateWriter(checkpointStorage, checkpointStorageAccess);
env.setCheckpointStorageAccess(checkpointStorageAccess);
env.setChannelStateWriter(channelStateWriter);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we discussed before, StreamTask sets the writer on the env, and Task gets and closes the writer, right?

Yes

In this PR, the writer is set in Task. IIUC, we would not need to change a lot of code if it were still set in StreamTask, right?

If so, getStateBackend() may not be needed in the Environment interface.


// Make sure the user code classloader is accessible thread-locally.
// We are setting the correct context class loader before instantiating the invokable
Expand Down Expand Up @@ -1011,6 +1041,14 @@ private void releaseResources() {
}
closeAllResultPartitions();
closeAllInputGates();
if (this.channelStateWriter != null) {
try {
this.channelStateWriter.close();
} catch (IOException e) {
LOG.error(
"Failed to close channelStateWriter for task {}.", taskNameWithSubtask, e);
}
}

try {
taskStateManager.close();
Expand Down Expand Up @@ -1558,6 +1596,110 @@ public void deliverOperatorEvent(OperatorID operator, SerializedValue<OperatorEv
}
}


// ------------------------------------------------------------------------
// State backend
// ------------------------------------------------------------------------

/**
 * Resolves the state backend for this task.
 *
 * <p>The backend stored in the task configuration (read through a {@link StreamConfig}
 * view) is handed, together with the job and task manager configurations, to
 * {@link StateBackendLoader#fromApplicationOrConfigOrDefault}, which makes the final
 * choice.
 *
 * @return the state backend selected by the loader
 * @throws Exception if reading or loading the state backend fails
 */
private StateBackend createStateBackend() throws Exception {
    final ClassLoader userClassLoader = userCodeClassLoader.asClassLoader();
    final StateBackend applicationBackend =
            new StreamConfig(this.taskConfiguration).getStateBackend(userClassLoader);

    return StateBackendLoader.fromApplicationOrConfigOrDefault(
            applicationBackend,
            getJobConfiguration(),
            taskManagerConfig.getConfiguration(),
            userClassLoader,
            LOG);
}

/**
 * Resolves the checkpoint storage for this task.
 *
 * <p>The storage stored in the task configuration (read through a {@link StreamConfig}
 * view) is handed, together with the given state backend and the job and task manager
 * configurations, to {@link CheckpointStorageLoader#load}.
 *
 * @param backend the state backend passed on to the loader
 * @return the checkpoint storage selected by the loader
 * @throws Exception if reading or loading the checkpoint storage fails
 */
private CheckpointStorage createCheckpointStorage(StateBackend backend) throws Exception {
    final ClassLoader userClassLoader = userCodeClassLoader.asClassLoader();
    final CheckpointStorage applicationStorage =
            new StreamConfig(this.taskConfiguration).getCheckpointStorage(userClassLoader);

    return CheckpointStorageLoader.load(
            applicationStorage,
            backend,
            getJobConfiguration(),
            taskManagerConfig.getConfiguration(),
            userClassLoader,
            LOG);
}


/**
 * Tries to switch the given checkpoint storage access to its file-merging variant.
 *
 * <p>Without a {@link FileMergingSnapshotManager} the given access is returned
 * unchanged. Otherwise the access is converted, its base locations are initialized
 * and, if the result is an {@link FsMergingCheckpointStorageAccess}, its {@code close()}
 * is registered with the clean-up registry. An {@link IOException} on this path is
 * logged and the original access is returned instead.
 *
 * @param checkpointStorageAccess the access to convert, also used as the fallback
 * @param fileMergingSnapshotManager the snapshot manager, or {@code null} if there is none
 * @param cleanUpRegistry registry that receives the close hook of the merging access
 * @param environment the environment passed on to the conversion
 * @return the file-merging access on success, otherwise the original access
 */
private CheckpointStorageAccess tryApplyFileMergingCheckpoint(
        CheckpointStorageAccess checkpointStorageAccess,
        @Nullable FileMergingSnapshotManager fileMergingSnapshotManager,
        AutoCloseableRegistry cleanUpRegistry,
        Environment environment) {
    if (fileMergingSnapshotManager != null) {
        try {
            final CheckpointStorageAccess merging =
                    (CheckpointStorageAccess)
                            checkpointStorageAccess.toFileMergingStorage(
                                    fileMergingSnapshotManager, environment);
            merging.initializeBaseLocationsForCheckpoint();
            if (merging instanceof FsMergingCheckpointStorageAccess) {
                final FsMergingCheckpointStorageAccess fsMerging =
                        (FsMergingCheckpointStorageAccess) merging;
                cleanUpRegistry.registerCloseable(() -> fsMerging.close());
            }
            return merging;
        } catch (IOException e) {
            // Best effort: keep running with the non-merging access.
            LOG.warn(
                    "Initiating FsMergingCheckpointStorageAccess failed "
                            + "with exception: {}, falling back to original checkpoint storage access {}.",
                    e.getMessage(),
                    checkpointStorageAccess.getClass(),
                    e);
        }
    }
    return checkpointStorageAccess;
}

/**
 * Creates the channel state writer for this task.
 *
 * <p>If unaligned checkpoints are not enabled in the job configuration,
 * {@link ChannelStateWriter#NO_OP} is returned. Otherwise a writer is opened with a
 * supplier that decides lazily which storage view the writer uses.
 *
 * @param checkpointStorage storage used to create a separate access for the writer
 * @param checkpointStorageAccess the task's access, shared with the writer only if it is
 *     an {@link FsMergingCheckpointStorageAccess}
 * @return the channel state writer, or {@link ChannelStateWriter#NO_OP}
 * @throws Exception declared by the signature
 */
private ChannelStateWriter createChannelStateWriter(
        CheckpointStorage checkpointStorage, CheckpointStorageAccess checkpointStorageAccess)
        throws Exception {
    if (!CheckpointingOptions.isUnalignedCheckpointEnabled(getJobConfiguration())) {
        return ChannelStateWriter.NO_OP;
    }

    // Note: don't pass checkpointStorageAccess directly to channel state writer.
    // The fileSystem of checkpointStorageAccess may be an instance of
    // SafetyNetWrapperFileSystem, which close all held streams when thread exits.
    // Channel state writers are invoked in other threads instead of task thread,
    // therefore channel state writer cannot share file streams directly, otherwise
    // conflicts will occur on job exit.
    final SupplierWithException<CheckpointStorageWorkerView, IOException> workerViewSupplier =
            () ->
                    // FsMergingCheckpointStorageAccess using unguarded fileSystem, which
                    // can be shared. Other checkpoint storage access should be lazily
                    // initialized to avoid sharing.
                    checkpointStorageAccess instanceof FsMergingCheckpointStorageAccess
                            ? checkpointStorageAccess
                            : checkpointStorage.createCheckpointStorage(this.jobId);

    return this.openChannelStateWriter(workerViewSupplier);
}

/**
 * Builds a {@link ChannelStateWriterImpl} for this task.
 *
 * <p>The writer is created from this task's job vertex id, name and subtask index, the
 * given storage view supplier, the channel state executor factory and the
 * {@code UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE} setting of the job configuration.
 *
 * @param checkpointStorageWorkerView supplier of the storage view handed to the writer
 * @return the newly created channel state writer
 */
public ChannelStateWriter openChannelStateWriter(
        SupplierWithException<CheckpointStorageWorkerView, ? extends IOException>
                checkpointStorageWorkerView) {
    final int maxSubtasksPerChannelStateFile =
            jobConfiguration.get(
                    CheckpointingOptions.UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE);

    return new ChannelStateWriterImpl(
            this.getJobVertexId(),
            this.taskNameWithSubtask,
            this.taskInfo.getIndexOfThisSubtask(),
            checkpointStorageWorkerView,
            this.channelStateExecutorFactory,
            maxSubtasksPerChannelStateFile);
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
Expand Down
Loading