[FLINK-37158][tests] Introduce ForSt to existing ITCases #26000

Open · wants to merge 2 commits into master
----------------------------------------------------------------------
@@ -27,6 +27,7 @@
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;

import org.apache.flink.shaded.guava32.com.google.common.collect.Lists;

@@ -185,16 +186,21 @@ public final void setChainingStrategy(ChainingStrategy strategy) {
operatorFactory.setChainingStrategy(strategy);
}

@Override
public void enableAsyncState() {
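        // If the wrapped operator already processes state asynchronously, enabling
        // async state is a no-op; otherwise defer to the default behaviour.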
OneInputStreamOperator<IN, OUT> operator =
(OneInputStreamOperator<IN, OUT>)
((SimpleOperatorFactory<OUT>) operatorFactory).getOperator();
if (!(operator instanceof AsyncStateProcessingOperator)) {
super.enableAsyncState();
}
}

public boolean isOutputOnlyAfterEndOfStream() {
return operatorFactory.getOperatorAttributes().isOutputOnlyAfterEndOfStream();
}

public boolean isInternalSorterSupported() {
return operatorFactory.getOperatorAttributes().isInternalSorterSupported();
}

@Override
public void enableAsyncState() {
// nothing to do.
}
}
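
The override above makes enableAsyncState() a no-op when the wrapped operator already implements AsyncStateProcessingOperator, and otherwise defers to the superclass behaviour. A minimal sketch of how a pipeline opts in from the DataStream API, mirroring the RescaleCheckpointManuallyITCase change later in this diff (the source data, mapper, and job name are illustrative placeholders, not part of this PR):

import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableAsyncStateSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Key the stream, then opt downstream stateful operators into async state.
        KeyedStream<Integer, Integer> keyed = env.fromData(1, 2, 3).keyBy(v -> v);
        keyed.enableAsyncState();
        keyed.map(v -> 2 * v).print();
        env.execute("enable-async-state-sketch");
    }
}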
----------------------------------------------------------------------
@@ -620,27 +620,7 @@ private void relocateDefaultDbLogDir(DBOptions dbOptions) {
String relocatedDbLogDir = logFile.getParent();
this.relocatedDbLogBaseDir = new File(relocatedDbLogDir).toPath();
dbOptions.setDbLogDir(relocatedDbLogDir);
} else {
setLocalForStPathAsLogDir(dbOptions);
}
} else {
setLocalForStPathAsLogDir(dbOptions);
}
}

private void setLocalForStPathAsLogDir(DBOptions dbOptions) {
        // Currently, ForStDB does not support mixing a local dir and a remote dir, and it
        // concatenates the DFS directory with the local directory as the working dir when
        // using the Flink env. We expect to directly use either the DFS directory or the
        // local directory as the working dir. We will implement this in ForStDB later; until
        // then, we achieve it by setting the dbPath to "/" when the DFS directory exists.
        // Another problem: when the system property "log.file" is not set, ForSt uses the
        // instance path as the log dir, which results in "/" being used as the log
        // directory. That often causes permission issues, so the db log dir is temporarily
        // set explicitly here.
        // TODO: remove this method once ForSt handles the log dir properly.
if (localForStPath != null) {
this.relocatedDbLogBaseDir = java.nio.file.Path.of(localForStPath.toUri().toString());
dbOptions.setDbLogDir(localForStPath.getPath());
}
}

----------------------------------------------------------------------
@@ -602,7 +602,8 @@ public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
@Nonnull
@Override
public SavepointResources<K> savepoint() throws Exception {
throw new UnsupportedOperationException("This method is not supported.");
throw new UnsupportedOperationException(
"Canonical savepoints are not supported by ForSt State Backend.");
}

@Override
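The sharpened exception message surfaces whenever a canonical savepoint is requested for a ForSt-backed job. A hedged sketch of observing it from a test, assuming MiniCluster#triggerSavepoint with an explicit SavepointFormatType (available in recent Flink releases); miniCluster, jobId, and savepointDir would come from the surrounding test harness:

// Sketch, not part of this PR: trigger a canonical savepoint against ForSt.
CompletableFuture<String> savepoint =
        miniCluster.triggerSavepoint(
                jobId, savepointDir, false, SavepointFormatType.CANONICAL);
// The future completes exceptionally with UnsupportedOperationException:
// "Canonical savepoints are not supported by ForSt State Backend."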
----------------------------------------------------------------------
@@ -116,7 +116,9 @@ public static Collection<Object[]> data() {
new Object[][] {
{"rocksdb", false},
{"rocksdb", true},
{"hashmap", false}
{"hashmap", false},
{"forst", false},
{"forst", true}
});
}

----------------------------------------------------------------------
@@ -35,6 +35,8 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.state.forst.ForStOptions;
import org.apache.flink.state.forst.ForStStateBackend;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.RocksDBOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -110,6 +112,7 @@ enum StateBackendEnum {
ROCKSDB_FULL,
ROCKSDB_INCREMENTAL,
ROCKSDB_INCREMENTAL_ZK,
FORST_INCREMENTAL
}

@Parameterized.Parameters(name = "statebackend type ={0}")
@@ -191,6 +194,14 @@ private Configuration getConfiguration() throws Exception {
setupRocksDB(config, 16, true);
break;
}
case FORST_INCREMENTAL:
{
config.set(
ForStOptions.TIMER_SERVICE_FACTORY,
ForStStateBackend.PriorityQueueStateType.ForStDB);
setupForSt(config, 16);
break;
}
default:
throw new IllegalStateException("No backend selected.");
}
@@ -229,6 +240,29 @@ private void setupRocksDB(
config.set(RocksDBOptions.LOCAL_DIRECTORIES, rocksDb);
}

private void setupForSt(Configuration config, int fileSizeThreshold) throws IOException {
// Configure the managed memory size as 64MB per slot for the ForSt state backend.
config.set(
TaskManagerOptions.MANAGED_MEMORY_SIZE,
MemorySize.ofMebiBytes(PARALLELISM / NUM_OF_TASK_MANAGERS * 64));

final String forstdb = tempFolder.newFolder().getAbsolutePath();
final File backups = tempFolder.newFolder().getAbsoluteFile();
// We use filesystem checkpoint storage with a small file threshold here to test the
// behaviour with file references, not self-contained byte handles.
config.set(StateBackendOptions.STATE_BACKEND, "forst");
config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
config.set(
CheckpointingOptions.CHECKPOINTS_DIRECTORY,
Path.fromLocalFile(backups).toUri().toString());
if (fileSizeThreshold != -1) {
config.set(
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD,
MemorySize.parse(fileSizeThreshold + "b"));
}
config.set(ForStOptions.LOCAL_DIRECTORIES, forstdb);
}

protected Configuration createClusterConfig() throws IOException {
TemporaryFolder temporaryFolder = new TemporaryFolder();
temporaryFolder.create();
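For reference, a job-level sketch equivalent to setupForSt above, using only option constants that already appear in this diff (the directory paths are placeholders; the ITCases use TemporaryFolder instead):

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.state.forst.ForStOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ForStBackendConfigSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.set(StateBackendOptions.STATE_BACKEND, "forst");
        config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
        config.set(ForStOptions.LOCAL_DIRECTORIES, "/tmp/forst-local");
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///tmp/flink-checkpoints");
        // The environment picks up the ForSt backend from this configuration.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        // ... define and execute the pipeline ...
    }
}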
----------------------------------------------------------------------
@@ -25,10 +25,13 @@
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.state.forst.ForStOptions;
import org.apache.flink.state.rocksdb.RocksDBOptions;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -143,6 +146,21 @@ public void testWithRocksDbBackendIncremental() throws Exception {
testProgramWithBackend(env);
}

@Test
public void testWithForStBackendIncremental() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.configure(
new Configuration()
.set(StateBackendOptions.STATE_BACKEND, "forst")
.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true)
.set(
ForStOptions.LOCAL_DIRECTORIES,
tmpFolder.newFolder().getAbsolutePath()));
CheckpointStorageUtils.configureFileSystemCheckpointStorage(
env, tmpFolder.newFolder().toURI().toString());
testProgramWithBackend(env);
}

// ------------------------------------------------------------------------

protected void testProgramWithBackend(StreamExecutionEnvironment env) throws Exception {
----------------------------------------------------------------------
@@ -19,9 +19,12 @@
package org.apache.flink.test.checkpointing;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
@@ -40,8 +43,10 @@
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
@@ -56,8 +61,12 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
@@ -74,6 +83,7 @@
* NotifyingDefiniteKeySource, SubtaskIndexFlatMapper and CollectionSink refer to RescalingITCase,
* because the static fields in these classes cannot be shared.
*/
@RunWith(Parameterized.class)
public class RescaleCheckpointManuallyITCase extends TestLogger {

private static final int NUM_TASK_MANAGERS = 2;
@@ -84,10 +94,24 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {

@ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();

@Parameterized.Parameter(0)
public String statebackendType;

@Parameterized.Parameter(1)
public boolean enableAsyncState;

@Parameterized.Parameters(name = "statebackend type ={0}, enableAsyncState={1}")
public static Collection<Object[]> parameter() {
return Arrays.asList(
new Object[][] {
{"forst", true}, {"forst", false}, {"rocksdb", true}, {"rocksdb", false}
});
}

@Before
public void setup() throws Exception {
Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
config.set(StateBackendOptions.STATE_BACKEND, statebackendType);
config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);

cluster =
@@ -263,7 +287,7 @@ private JobGraph createJobGraphWithKeyedState(

SharedReference<JobID> jobID = sharedObjects.add(new JobID());
SharedReference<MiniCluster> miniClusterRef = sharedObjects.add(miniCluster);
DataStream<Integer> input =
KeyedStream<Integer, Integer> input =
env.addSource(
new NotifyingDefiniteKeySource(
numberKeys, numberElements, failAfterEmission) {
@@ -300,10 +324,18 @@ public Integer getKey(Integer value) {
return value;
}
});
DataStream<Tuple2<Integer, Integer>> result =
input.flatMap(new SubtaskIndexFlatMapper(numberElementsExpect));
if (enableAsyncState) {
input.enableAsyncState();
DataStream<Tuple2<Integer, Integer>> result =
input.flatMap(new AsyncSubtaskIndexFlatMapper(numberElementsExpect));

result.sinkTo(new CollectionSink<>());
result.sinkTo(new CollectionSink<>());
} else {
DataStream<Tuple2<Integer, Integer>> result =
input.flatMap(new SubtaskIndexFlatMapper(numberElementsExpect));

result.sinkTo(new CollectionSink<>());
}

return env.getStreamGraph().getJobGraph(env.getClass().getClassLoader(), jobID.get());
}
@@ -349,8 +381,9 @@ public void run(SourceContext<Integer> ctx) throws Exception {
} else {
boolean newCheckpoint = false;
long waited = 0L;
running = false;
// maximum wait 5min
while (!newCheckpoint && waited < 30000L) {
while (!newCheckpoint && waited < 300000L) {
synchronized (ctx.getCheckpointLock()) {
newCheckpoint = waitCheckpointCompleted();
}
@@ -423,6 +456,79 @@ public void initializeState(FunctionInitializationContext context) throws Exception {
}
}

private static class AsyncSubtaskIndexFlatMapper
extends RichFlatMapFunction<Integer, Tuple2<Integer, Integer>>
implements CheckpointedFunction {

private static final long serialVersionUID = 1L;

private transient org.apache.flink.api.common.state.v2.ValueState<Integer> counter;
private transient org.apache.flink.api.common.state.v2.ValueState<Integer> sum;

private final int numberElements;

public AsyncSubtaskIndexFlatMapper(int numberElements) {
this.numberElements = numberElements;
}

@Override
public void flatMap(Integer value, Collector<Tuple2<Integer, Integer>> out)
throws Exception {
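            // Read-modify-write with the State API V2: asyncValue() reads the current
            // value, asyncUpdate() writes it back, and thenApply() forwards the updated
            // value so both futures can be combined below.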
StateFuture<Integer> counterFuture =
counter.asyncValue()
.thenCompose(
(Integer c) -> {
int updated = c == null ? 1 : c + 1;
return counter.asyncUpdate(updated)
.thenApply(nothing -> updated);
});
StateFuture<Integer> sumFuture =
sum.asyncValue()
.thenCompose(
(Integer s) -> {
int updated = s == null ? value : s + value;
return sum.asyncUpdate(updated)
.thenApply(nothing -> updated);
});

            // Emit (subtaskIndex, sum) once this subtask has processed all expected elements.
            counterFuture.thenCombine(
sumFuture,
(c, s) -> {
if (c == numberElements) {
out.collect(
Tuple2.of(
getRuntimeContext()
.getTaskInfo()
.getIndexOfThisSubtask(),
s));
}
return null;
});
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// all managed, nothing to do.
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {}

@Override
public void open(OpenContext openContext) throws Exception {
counter =
((StreamingRuntimeContext) getRuntimeContext())
.getValueState(
new org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
"counter", BasicTypeInfo.INT_TYPE_INFO));
sum =
((StreamingRuntimeContext) getRuntimeContext())
.getValueState(
new org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
"sum", BasicTypeInfo.INT_TYPE_INFO));
}
}

private static class CollectionSink<IN> implements Sink<IN> {

private static final ConcurrentHashMap<JobID, CollectionSinkWriter<?>> writers =