8 changes: 4 additions & 4 deletions azure-pipelines.yml
@@ -27,8 +27,8 @@ parameters:
default:
- 'hudi-common'
- 'hudi-flink-datasource/hudi-flink'
- 'hudi-flink-datasource/hudi-flink1.13.x'
- 'hudi-flink-datasource/hudi-flink1.14.x'
- 'hudi-flink-datasource/hudi-flink1.13'
- 'hudi-flink-datasource/hudi-flink1.14'
- name: job2Modules
type: object
default:
@@ -62,8 +62,8 @@ parameters:
- '!hudi-examples/hudi-examples-spark'
- '!hudi-flink-datasource'
- '!hudi-flink-datasource/hudi-flink'
- '!hudi-flink-datasource/hudi-flink1.13.x'
- '!hudi-flink-datasource/hudi-flink1.14.x'
- '!hudi-flink-datasource/hudi-flink1.13'
- '!hudi-flink-datasource/hudi-flink1.14'
- '!hudi-sync'
- '!hudi-sync/hudi-adb-sync'
- '!hudi-sync/hudi-datahub-sync'
@@ -85,6 +85,10 @@ public void close() {
}
}

public LockManager getLockManager() {
return lockManager;
}

public Option<HoodieInstant> getLastCompletedTransactionOwner() {
return lastCompletedTxnOwnerInstant;
}
@@ -311,6 +311,16 @@ public HoodieLockConfig.Builder withConflictResolutionStrategy(ConflictResolutio
return this;
}

public HoodieLockConfig.Builder withFileSystemLockPath(String path) {
lockConfig.setValue(FILESYSTEM_LOCK_PATH, path);
return this;
}

public HoodieLockConfig.Builder withFileSystemLockExpire(Integer expireTime) {
lockConfig.setValue(FILESYSTEM_LOCK_EXPIRE, String.valueOf(expireTime));
return this;
}

public HoodieLockConfig build() {
lockConfig.setDefaults(HoodieLockConfig.class.getName());
return lockConfig;
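For context, the two builder methods added above are consumed by the Flink writer configuration later in this PR. A minimal sketch of how a client might wire them up — the class name `LockConfigExample`, the method name, and the paths passed in are illustrative, not part of the change:

```java
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class LockConfigExample {

  // Builds a write config whose metadata updates are guarded by a
  // file-system based lock, mirroring the Flink wiring in this PR.
  public static HoodieWriteConfig buildWriteConfig(String basePath, String auxiliaryPath) {
    HoodieLockConfig lockConfig = HoodieLockConfig.newBuilder()
        .withLockProvider(FileSystemBasedLockProvider.class)
        .withLockWaitTimeInMillis(2000L)        // wait up to 2s to acquire the lock
        .withFileSystemLockExpire(1)            // consider the lock file expired after 1 minute
        .withFileSystemLockPath(auxiliaryPath)  // e.g. the table's auxiliary folder, as wired up later in this PR
        .build();
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withLockConfig(lockConfig)
        .build();
  }
}
```
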
@@ -267,14 +267,21 @@ protected void writeTableMetadata(HoodieTable table, String instantTime, String
if (this.metadataWriter == null) {
initMetadataWriter();
}
// refresh the timeline

// Note: the data meta client is not refreshed currently, some code path
// relies on the meta client for resolving the latest data schema,
// the schema expects to be immutable for SQL jobs but may be not for non-SQL
// jobs.
this.metadataWriter.initTableMetadata();
this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType));
try {
// guard the metadata writer with concurrent lock
this.txnManager.getLockManager().lock();

// refresh the timeline

// Note: the data meta client is not refreshed currently, some code path
// relies on the meta client for resolving the latest data schema,
// the schema expects to be immutable for SQL jobs but may be not for non-SQL
// jobs.
this.metadataWriter.initTableMetadata();
this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType));
} finally {
this.txnManager.getLockManager().unlock();
}
}

/**
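The hunk above serializes metadata-table updates on the transaction lock exposed by the new `getLockManager()` accessor. A minimal sketch of the guard pattern — the helper class/method names and the `Runnable`-based signature are illustrative, and the `TransactionManager` import path is assumed from Hudi's client module:

```java
import org.apache.hudi.client.transaction.TransactionManager;

public final class LockGuard {

  // Runs the given action while holding the transaction lock, mirroring the
  // try/finally guard added around the metadata writer above.
  public static void runGuarded(TransactionManager txnManager, Runnable action) {
    try {
      // blocks until the configured lock provider grants the lock
      txnManager.getLockManager().lock();
      action.run();
    } finally {
      // always release the lock, even if the action throws
      txnManager.getLockManager().unlock();
    }
  }
}
```

Keeping the unlock in `finally` ensures the lock is released even when the metadata update fails, which matters for the file-system based provider configured elsewhere in this PR.
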
@@ -162,7 +162,7 @@ trait SparkAdapter extends Serializable {
* Extract condition in [[DeleteFromTable]]
* SPARK-38626 condition is no longer Option in Spark 3.3
*/
def extractCondition(deleteFromTable: Command): Expression
def extractDeleteCondition(deleteFromTable: Command): Expression

/**
* Get parseQuery from ExtendedSqlParser, only for Spark 3.3+
@@ -18,11 +18,10 @@

package org.apache.hudi.common.model;

import org.apache.hudi.common.util.Option;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.util.Option;

import java.io.IOException;
import java.util.Properties;
@@ -16,16 +16,13 @@
* limitations under the License.
*/

package org.apache.hudi.payload;

import org.apache.hudi.common.model.AWSDmsAvroPayload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;
package org.apache.hudi.common.model;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.util.Option;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -152,11 +152,6 @@ public class StreamWriteOperatorCoordinator
*/
private CkpMetadata ckpMetadata;

/**
* Current checkpoint.
*/
private long checkpointId = -1;

/**
* Constructs a StreamingSinkOperatorCoordinator.
*
@@ -219,7 +214,6 @@ public void close() throws Exception {

@Override
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
this.checkpointId = checkpointId;
executor.execute(
() -> {
try {
@@ -21,6 +21,7 @@
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
@@ -43,6 +44,7 @@
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.config.HoodieStorageConfig;
@@ -88,6 +90,7 @@
import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;

/**
* Utilities for Flink stream read and write.
@@ -170,7 +173,7 @@ public static HoodieWriteConfig getHoodieClientConfig(
.withMergeAllowDuplicateOnInserts(OptionsResolver.insertClustering(conf))
.withClusteringConfig(
HoodieClusteringConfig.newBuilder()
.withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED))
.withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED))
.withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS))
.withClusteringPlanPartitionFilterMode(
ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME)))
@@ -218,6 +221,12 @@ public static HoodieWriteConfig getHoodieClientConfig(
.enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED))
.withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
.build())
.withLockConfig(HoodieLockConfig.newBuilder()
.withLockProvider(FileSystemBasedLockProvider.class)
.withLockWaitTimeInMillis(2000L) // 2s
.withFileSystemLockExpire(1) // 1 minute
.withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf))
.build())
.withPayloadConfig(HoodiePayloadConfig.newBuilder()
.withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
.withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
@@ -231,6 +240,7 @@ public static HoodieWriteConfig getHoodieClientConfig(
.withProps(flinkConf2TypedProperties(conf))
.withSchema(getSourceSchema(conf).toString());

// do not configure cleaning strategy as LAZY until multi-writers is supported.
HoodieWriteConfig writeConfig = builder.build();
if (loadFsViewStorageConfig) {
// do not use the builder to give a change for recovering the original fs view storage config
@@ -548,4 +558,11 @@ public static boolean fileExists(FileSystem fs, Path path) {
throw new HoodieException("Exception while checking file " + path + " existence", e);
}
}

/**
* Returns the auxiliary path.
*/
public static String getAuxiliaryPath(Configuration conf) {
return conf.getString(FlinkOptions.PATH) + Path.SEPARATOR + AUXILIARYFOLDER_NAME;
}
}
@@ -382,13 +382,9 @@ public void testHoodiePipelineBuilderSink() throws Exception {
execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

options.put(FlinkOptions.INDEX_TYPE.key(), "FLINK_STATE");
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "4");
options.put("table.type", HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.INDEX_KEY_FIELD.key(), "id");
options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString());
Configuration conf = Configuration.fromMap(options);
// Read from file source
@@ -42,7 +42,6 @@
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -166,7 +165,6 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {
TestData.checkWrittenDataCOW(tempFile, EXPECTED1);
}

@Disabled
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exception {
@@ -201,14 +199,13 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exce
asyncCompactionService.start(null);

// wait for the asynchronous commit to finish
TimeUnit.SECONDS.sleep(5);
TimeUnit.SECONDS.sleep(10);

asyncCompactionService.shutDown();

TestData.checkWrittenDataCOW(tempFile, EXPECTED2);
}

@Disabled
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangelog) throws Exception {
@@ -218,7 +215,6 @@ public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangel
tableEnv.getConfig().getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
@@ -227,9 +223,6 @@ public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangel
tableEnv.executeSql(hoodieTableDDL);
tableEnv.executeSql(TestSQL.INSERT_T1).await();

// wait for the asynchronous commit to finish
TimeUnit.SECONDS.sleep(3);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkCompactionConfig cfg = new FlinkCompactionConfig();
cfg.path = tempFile.getAbsolutePath();
@@ -253,9 +246,13 @@ public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangel
+ "('id13','Jenny',72,TIMESTAMP '1970-01-01 00:00:10','par5')";
tableEnv.executeSql(insertT1ForNewPartition).await();

// wait for the asynchronous commit to finish
TimeUnit.SECONDS.sleep(3);
writeClient.close();
// re-create the write client/fs view server,
// otherwise there is a low probability that a connection-refused error occurs and
// the reader's metadata view is incomplete
writeClient = StreamerUtil.createWriteClient(conf);

metaClient.reloadActiveTimeline();
compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient));

HoodieFlinkTable<?> table = writeClient.getHoodieTable();
@@ -222,7 +222,7 @@ void testStreamWriteBatchRead() {
}

@Test
void testStreamWriteBatchReadOptimized() {
void testStreamWriteBatchReadOptimized() throws Exception {
// create filesystem table named source
String createSource = TestConfigurations.getFileSourceDDL("source");
streamTableEnv.executeSql(createSource);
@@ -236,11 +236,16 @@ void testStreamWriteBatchReadOptimized() {
.option(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
.option(FlinkOptions.COMPACTION_TASKS, 1)
// disable the metadata table because
// the lock conflicts resolution takes time
.option(FlinkOptions.METADATA_ENABLED, false)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 select * from source";
execInsertSql(streamTableEnv, insertInto);

// give some buffer time for finishing the async compaction tasks
TimeUnit.SECONDS.sleep(5);
List<Row> rows = CollectionUtil.iterableToList(
() -> streamTableEnv.sqlQuery("select * from t1").execute().collect());

@@ -24,7 +24,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>hudi-flink1.13.x</artifactId>
<artifactId>hudi-flink1.13</artifactId>
<version>0.12.0-SNAPSHOT</version>
<packaging>jar</packaging>

@@ -24,7 +24,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>hudi-flink1.14.x</artifactId>
<artifactId>hudi-flink1.14</artifactId>
<version>0.12.0-SNAPSHOT</version>
<packaging>jar</packaging>

@@ -24,7 +24,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>hudi-flink1.15.x</artifactId>
<artifactId>hudi-flink1.15</artifactId>
<version>0.12.0-SNAPSHOT</version>
<packaging>jar</packaging>

6 changes: 3 additions & 3 deletions hudi-flink-datasource/pom.xml
@@ -33,9 +33,9 @@
</properties>

<modules>
<module>hudi-flink1.13.x</module>
<module>hudi-flink1.14.x</module>
<module>hudi-flink1.15.x</module>
<module>hudi-flink1.13</module>
<module>hudi-flink1.14</module>
<module>hudi-flink1.15</module>
<module>hudi-flink</module>
</modules>

14 changes: 7 additions & 7 deletions hudi-spark-datasource/README.md
@@ -21,19 +21,19 @@ This repo contains the code that integrate Hudi with Spark. The repo is split in

`hudi-spark`
`hudi-spark2`
`hudi-spark3.1.x`
`hudi-spark3.2.x`
`hudi-spark3.3.x`
`hudi-spark3.1`
`hudi-spark3.2`
`hudi-spark3.3`
`hudi-spark2-common`
`hudi-spark3-common`
`hudi-spark-common`

* hudi-spark is the module that contains the code shared by both the spark2 & spark3 versions; it also contains the antlr4
file that supports spark sql on spark 2.x versions.
* hudi-spark2 is the module that contains the code compatible with spark 2.x versions.
* hudi-spark3.1.x is the module that contains the code compatible with spark 3.1.x and spark 3.0.x versions.
* hudi-spark3.2.x is the module that contains the code compatible with spark 3.2.x versions.
* hudi-spark3.3.x is the module that contains the code compatible with spark 3.3.x+ versions.
* hudi-spark3.1 is the module that contains the code compatible with spark 3.1.x and spark 3.0.x versions.
* hudi-spark3.2 is the module that contains the code compatible with spark 3.2.x versions.
* hudi-spark3.3 is the module that contains the code compatible with spark 3.3.x+ versions.
* hudi-spark2-common is the module that contains the code that would be reused between spark2.x versions; right now the module
has no class since hudi only supports the spark 2.4.4 version, and it acts as a placeholder when packaging the hudi-spark-bundle module.
* hudi-spark3-common is the module that contains the code that would be reused between spark3.x versions.
@@ -55,7 +55,7 @@ has no class since hudi only supports spark 2.4.4 version, and it acts as the pl
### To improve:
Spark 3.3 supports time travel syntax; see [SPARK-37219](https://issues.apache.org/jira/browse/SPARK-37219).
Once Spark 3.3 is released, the files in the following list will be removed:
* hudi-spark3.3.x's `HoodieSpark3_3ExtendedSqlAstBuilder.scala`, `HoodieSpark3_3ExtendedSqlParser.scala`, `TimeTravelRelation.scala`, `SqlBase.g4`, `HoodieSqlBase.g4`
* hudi-spark3.3's `HoodieSpark3_3ExtendedSqlAstBuilder.scala`, `HoodieSpark3_3ExtendedSqlParser.scala`, `TimeTravelRelation.scala`, `SqlBase.g4`, `HoodieSqlBase.g4`
Tracking Jira: [HUDI-4468](https://issues.apache.org/jira/browse/HUDI-4468)

Some other improvements undergoing: