@@ -85,6 +85,10 @@ public void close() {
}
}

public LockManager getLockManager() {
return lockManager;
}

public Option<HoodieInstant> getLastCompletedTransactionOwner() {
return lastCompletedTxnOwnerInstant;
}
@@ -311,6 +311,16 @@ public HoodieLockConfig.Builder withConflictResolutionStrategy(ConflictResolutio
return this;
}

public HoodieLockConfig.Builder withFileSystemLockPath(String path) {
lockConfig.setValue(FILESYSTEM_LOCK_PATH, path);
return this;
}

public HoodieLockConfig.Builder withFileSystemLockExpire(Integer expireTime) {
lockConfig.setValue(FILESYSTEM_LOCK_EXPIRE, String.valueOf(expireTime));
return this;
}

public HoodieLockConfig build() {
lockConfig.setDefaults(HoodieLockConfig.class.getName());
return lockConfig;
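A usage sketch for the two new builder options (values are illustrative only; the wait time, expiry and path below mirror the Flink writer config further down in this change):

import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.config.HoodieLockConfig;

// Illustrative only: wait up to 2s for a held lock, let the lock file expire
// after 1 minute, and place lock files under a hypothetical auxiliary folder.
HoodieLockConfig lockConfig = HoodieLockConfig.newBuilder()
    .withLockProvider(FileSystemBasedLockProvider.class)
    .withLockWaitTimeInMillis(2000L)
    .withFileSystemLockExpire(1) // minutes
    .withFileSystemLockPath("/tmp/hudi/t1/.hoodie/.aux") // hypothetical path
    .build();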
@@ -267,14 +267,21 @@ protected void writeTableMetadata(HoodieTable table, String instantTime, String
if (this.metadataWriter == null) {
initMetadataWriter();
}
// refresh the timeline

// Note: the data meta client is not refreshed currently, some code path
// relies on the meta client for resolving the latest data schema,
// the schema expects to be immutable for SQL jobs but may be not for non-SQL
// jobs.
this.metadataWriter.initTableMetadata();
this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType));
try {
// guard the metadata writer with the transaction manager's lock
this.txnManager.getLockManager().lock();

// refresh the timeline

// Note: the data meta client is not refreshed currently; some code paths
// rely on the meta client to resolve the latest data schema. The schema is
// expected to be immutable for SQL jobs but may not be for non-SQL jobs.
this.metadataWriter.initTableMetadata();
this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType));
} finally {
this.txnManager.getLockManager().unlock();
}
}

/**
@@ -152,11 +152,6 @@ public class StreamWriteOperatorCoordinator
*/
private CkpMetadata ckpMetadata;

/**
* Current checkpoint.
*/
private long checkpointId = -1;

/**
* Constructs a StreamingSinkOperatorCoordinator.
*
@@ -219,7 +214,6 @@ public void close() throws Exception {

@Override
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
this.checkpointId = checkpointId;
executor.execute(
() -> {
try {
@@ -21,6 +21,7 @@
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
@@ -43,6 +44,7 @@
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.config.HoodieStorageConfig;
@@ -88,6 +90,7 @@
import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;

/**
* Utilities for Flink stream read and write.
@@ -170,7 +173,7 @@ public static HoodieWriteConfig getHoodieClientConfig(
.withMergeAllowDuplicateOnInserts(OptionsResolver.insertClustering(conf))
.withClusteringConfig(
HoodieClusteringConfig.newBuilder()
.withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED))
.withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED))
.withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS))
.withClusteringPlanPartitionFilterMode(
ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME)))
@@ -218,6 +221,12 @@ public static HoodieWriteConfig getHoodieClientConfig(
.enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED))
.withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
.build())
.withLockConfig(HoodieLockConfig.newBuilder()
.withLockProvider(FileSystemBasedLockProvider.class)
.withLockWaitTimeInMillis(2000L) // 2s
.withFileSystemLockExpire(1) // 1 minute
.withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf))
.build())
.withPayloadConfig(HoodiePayloadConfig.newBuilder()
.withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
.withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
@@ -231,6 +240,7 @@
.withProps(flinkConf2TypedProperties(conf))
.withSchema(getSourceSchema(conf).toString());

// do not configure the cleaning strategy as LAZY until multi-writer is supported.
HoodieWriteConfig writeConfig = builder.build();
if (loadFsViewStorageConfig) {
// do not use the builder, to give a chance to recover the original fs view storage config
@@ -548,4 +558,11 @@ public static boolean fileExists(FileSystem fs, Path path) {
throw new HoodieException("Exception while checking file " + path + " existence", e);
}
}

/**
* Returns the auxiliary path.
*/
public static String getAuxiliaryPath(Configuration conf) {
return conf.getString(FlinkOptions.PATH) + Path.SEPARATOR + AUXILIARYFOLDER_NAME;
}
}
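A quick illustration of what the helper resolves to, assuming AUXILIARYFOLDER_NAME is the standard ".hoodie/.aux" folder and using a hypothetical table path:

Configuration conf = new Configuration();
conf.setString(FlinkOptions.PATH, "/tmp/hudi/t1"); // hypothetical table path
String auxPath = StreamerUtil.getAuxiliaryPath(conf);
// expected to be "/tmp/hudi/t1/.hoodie/.aux" — the directory handed to
// withFileSystemLockPath(...) above, so lock files sit next to the table metadata.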
@@ -382,13 +382,9 @@ public void testHoodiePipelineBuilderSink() throws Exception {
execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

options.put(FlinkOptions.INDEX_TYPE.key(), "FLINK_STATE");
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "4");
options.put("table.type", HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.INDEX_KEY_FIELD.key(), "id");
options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString());
Configuration conf = Configuration.fromMap(options);
// Read from file source
@@ -42,7 +42,6 @@
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -166,7 +165,6 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {
TestData.checkWrittenDataCOW(tempFile, EXPECTED1);
}

@Disabled
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exception {
@@ -201,14 +199,13 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exce
asyncCompactionService.start(null);

// wait for the asynchronous commit to finish
TimeUnit.SECONDS.sleep(5);
TimeUnit.SECONDS.sleep(10);

asyncCompactionService.shutDown();

TestData.checkWrittenDataCOW(tempFile, EXPECTED2);
}

@Disabled
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangelog) throws Exception {
Expand All @@ -218,7 +215,6 @@ public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangel
tableEnv.getConfig().getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
@@ -227,9 +223,6 @@ public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangel
tableEnv.executeSql(hoodieTableDDL);
tableEnv.executeSql(TestSQL.INSERT_T1).await();

// wait for the asynchronous commit to finish
TimeUnit.SECONDS.sleep(3);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkCompactionConfig cfg = new FlinkCompactionConfig();
cfg.path = tempFile.getAbsolutePath();
@@ -253,9 +246,13 @@ public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangel
+ "('id13','Jenny',72,TIMESTAMP '1970-01-01 00:00:10','par5')";
tableEnv.executeSql(insertT1ForNewPartition).await();

// wait for the asynchronous commit to finish
TimeUnit.SECONDS.sleep(3);
writeClient.close();
// re-create the write client/fs view server; otherwise there is a small
// chance that a "connection refused" error occurs and the reader's
// metadata view ends up incomplete
writeClient = StreamerUtil.createWriteClient(conf);

metaClient.reloadActiveTimeline();
compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient));

HoodieFlinkTable<?> table = writeClient.getHoodieTable();
@@ -222,7 +222,7 @@ void testStreamWriteBatchRead() {
}

@Test
void testStreamWriteBatchReadOptimized() {
void testStreamWriteBatchReadOptimized() throws Exception {
// create filesystem table named source
String createSource = TestConfigurations.getFileSourceDDL("source");
streamTableEnv.executeSql(createSource);
@@ -236,11 +236,16 @@ void testStreamWriteBatchReadOptimized() {
.option(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
.option(FlinkOptions.COMPACTION_TASKS, 1)
// disable the metadata table because
// the lock conflict resolution takes time
.option(FlinkOptions.METADATA_ENABLED, false)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 select * from source";
execInsertSql(streamTableEnv, insertInto);

// give the async compaction tasks some buffer time to finish
TimeUnit.SECONDS.sleep(5);
Contributor: Can we avoid sleeping? By blocking on the writer to finish the writing instead?

Contributor Author: Want to, but Flink has no good way to wait for all the async tasks to finish.

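As a sketch of the polling alternative discussed above (assuming the async compaction completes as a regular commit instant on the table's timeline and that the meta client classes are available in this test; the 60s timeout is hypothetical):

// Poll the table's timeline for a completed commit instead of a fixed sleep.
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
    .setConf(new org.apache.hadoop.conf.Configuration())
    .setBasePath(tempFile.getAbsolutePath())
    .build();
long deadline = System.currentTimeMillis() + 60_000L; // hypothetical timeout
while (metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants().empty()
    && System.currentTimeMillis() < deadline) {
  TimeUnit.MILLISECONDS.sleep(200);
}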
List<Row> rows = CollectionUtil.iterableToList(
() -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
