Merged
22 commits
4ad08be
[HUDI-4098] Support HMS for flink HoodieCatalog
cuibo01 Jun 30, 2022
b3a5f7f
[HUDI-4098] Support HMS for flink HoodieCatalog
cuibo01 Jul 1, 2022
73a89be
Merge branch 'master' of github.com:cuibo01/hudi into hudiCatalog1
cuibo01 Jul 4, 2022
49b6c1c
Merge pull request #5 from cuibo01/hudiCatalog1
cuibo01 Jul 4, 2022
fbd973e
[HUDI-4098] Support HMS for flink HoodieCatalog
cuibo01 Jul 7, 2022
a998586
[minor] following 4152, refactor the clazz about plan selection strat…
danny0405 Jul 8, 2022
c5051b5
Merge branch 'apache:master' into hudiCatalog
cuibo01 Jul 8, 2022
f20acb8
[HUDI-4367] Support copyToTable on call (#6054)
scxwhite Jul 8, 2022
8032fca
Merge branch 'apache:master' into hudiCatalog
cuibo01 Jul 8, 2022
0c113ba
[HUDI-4098] Support HMS for flink HoodieCatalog
cuibo01 Jul 8, 2022
fc8d962
[HUDI-4335] Bug fixes in AWSGlueCatalogSyncClient post schema evoluti…
kumudkumartirupati Jul 8, 2022
b686c07
[HUDI-4276] Reconcile schema-inject null values for missing fields an…
xiarixiaoyao Jul 8, 2022
6566fc6
[HUDI-3500] Add call procedure for RepairsCommand (#6053)
hechao-ustc Jul 9, 2022
126b88b
[HUDI-2150] Rename/Restructure configs for better modularity (#6061)
liujinhui1994 Jul 9, 2022
10aec07
[MINOR] Bump xalan from 2.7.1 to 2.7.2 (#6062)
dependabot[bot] Jul 9, 2022
02ff755
[HUDI-4098] Support HMS for flink HoodieCatalog
cuibo01 Jul 10, 2022
046044c
[HUDI-4324] Remove use_jdbc config from hudi sync (#6072)
xushiyan Jul 10, 2022
63f95ab
[HUDI-3730][RFC-55] Improve hudi-sync classes design and simplify con…
fengjian428 Jul 10, 2022
9619231
[HUDI-4098] Support HMS for flink HoodieCatalog
cuibo01 Jul 10, 2022
51244eb
[HUDI-4323] Make database table names optional in sync tool (#6073)
xushiyan Jul 11, 2022
a270eee
[MINOR] Update RFCs status (#6078)
codope Jul 11, 2022
f09f982
Merge branch 'apache:master' into hudiCatalog
cuibo01 Jul 11, 2022
2 changes: 1 addition & 1 deletion conf/hudi-defaults.conf.template
@@ -20,7 +20,7 @@

# Example:
# hoodie.datasource.hive_sync.jdbcurl jdbc:hive2://localhost:10000
-# hoodie.datasource.hive_sync.use_jdbc true
+# hoodie.datasource.hive_sync.mode jdbc
# hoodie.datasource.hive_sync.support_timestamp false
# hoodie.index.type BLOOM
# hoodie.metadata.enable false
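Note on this rename: the boolean use_jdbc toggle is superseded by the multi-valued sync mode. Below is a minimal sketch of a Spark write using the new option; the table name, base path and JDBC URL are illustrative, and the other option keys are the standard Hudi Spark datasource ones rather than anything introduced by this PR.

// Hedged sketch: Hive sync via the new "mode" option instead of the removed "use_jdbc" flag.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class HiveSyncModeExample {
  public static void write(Dataset<Row> df) {
    df.write().format("hudi")
        .option("hoodie.table.name", "stock_ticks")
        .option("hoodie.datasource.hive_sync.enable", "true")
        // previously: hoodie.datasource.hive_sync.use_jdbc = true
        .option("hoodie.datasource.hive_sync.mode", "jdbc")
        .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://localhost:10000")
        .mode(SaveMode.Append)
        .save("/tmp/hudi/stock_ticks");
  }
}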
3 changes: 2 additions & 1 deletion docker/demo/config/hoodie-incr.properties
@@ -28,5 +28,6 @@ hoodie.deltastreamer.source.hoodieincr.path=/docker_hoodie_sync_valid_test
hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=true
# hive sync
hoodie.datasource.hive_sync.table=docker_hoodie_sync_valid_test_2
+hoodie.datasource.hive_sync.mode=jdbc
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000
-hoodie.datasource.hive_sync.partition_fields=partition
+hoodie.datasource.hive_sync.partition_fields=partition
2 changes: 2 additions & 0 deletions docker/demo/sparksql-incremental.commands
@@ -47,6 +47,7 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor").
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
+option(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), "jdbc").
option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
@@ -79,6 +80,7 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor_bs").
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor_bs").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
+option(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), "jdbc").
option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
@@ -64,9 +64,9 @@
import java.util.stream.Collectors;

import static org.apache.hudi.aws.utils.S3Utils.s3aToS3;
-import static org.apache.hudi.common.util.MapUtils.nonEmpty;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
+import static org.apache.hudi.common.util.MapUtils.isNullOrEmpty;
import static org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType;
import static org.apache.hudi.hive.util.HiveSchemaUtil.parquetSchemaToMapSchema;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
@@ -193,11 +193,11 @@ public void dropPartitions(String tableName, List<String> partitionsToDrop) {
*/
@Override
public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
-if (nonEmpty(tableProperties)) {
+if (isNullOrEmpty(tableProperties)) {
return;
}
try {
-updateTableParameters(awsGlue, databaseName, tableName, tableProperties, true);
+updateTableParameters(awsGlue, databaseName, tableName, tableProperties, false);
} catch (Exception e) {
throw new HoodieGlueSyncException("Fail to update properties for table " + tableId(databaseName, tableName), e);
}
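The inverted guard above is the crux of this fix: with the old check, updateTableProperties returned early exactly when properties were supplied, so nothing was ever pushed to Glue. The trailing boolean passed to updateTableParameters also changes, which is a separate adjustment. A standalone sketch of the assumed semantics of the two MapUtils helpers (illustrative only, not Hudi code):

import java.util.Map;

final class MapGuards {
  static boolean nonEmpty(Map<?, ?> m) {          // old guard: true when there IS work to do
    return m != null && !m.isEmpty();
  }

  static boolean isNullOrEmpty(Map<?, ?> m) {     // new guard: true only when there is nothing to do
    return m == null || m.isEmpty();
  }
}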
@@ -210,10 +210,7 @@ public void updateTableSchema(String tableName, MessageType newSchema) {
try {
Table table = getTable(awsGlue, databaseName, tableName);
Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);
-List<Column> newColumns = newSchemaMap.keySet().stream().map(key -> {
-String keyType = getPartitionKeyType(newSchemaMap, key);
-return new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
-}).collect(Collectors.toList());
+List<Column> newColumns = getColumnsFromSchema(newSchemaMap);
StorageDescriptor sd = table.getStorageDescriptor();
sd.setColumns(newColumns);

@@ -258,15 +255,7 @@ public void createTable(String tableName,
try {
Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);

-List<Column> schemaWithoutPartitionKeys = new ArrayList<>();
-for (String key : mapSchema.keySet()) {
-String keyType = getPartitionKeyType(mapSchema, key);
-Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
-// In Glue, the full schema should exclude the partition keys
-if (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).contains(key)) {
-schemaWithoutPartitionKeys.add(column);
-}
-}
+List<Column> schemaWithoutPartitionKeys = getColumnsFromSchema(mapSchema);

// now create the schema partition
List<Column> schemaPartitionKeys = config.getSplitStrings(META_SYNC_PARTITION_FIELDS).stream().map(partitionKey -> {
@@ -419,6 +408,19 @@ public void deleteLastReplicatedTimeStamp(String tableName) {
throw new UnsupportedOperationException("Not supported: `deleteLastReplicatedTimeStamp`");
}

+private List<Column> getColumnsFromSchema(Map<String, String> mapSchema) {
+List<Column> cols = new ArrayList<>();
+for (String key : mapSchema.keySet()) {
+// In Glue, the full schema should exclude the partition keys
+if (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).contains(key)) {
+String keyType = getPartitionKeyType(mapSchema, key);
+Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
+cols.add(column);
+}
+}
+return cols;
+}

private enum TableType {
MANAGED_TABLE,
EXTERNAL_TABLE,
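The two inlined loops above are consolidated into getColumnsFromSchema, and the partition-key exclusion now applies on the updateTableSchema path as well. A self-contained sketch of what the helper computes; the sample schema map and partition field are made up, and Glue's Column type is replaced with plain strings for illustration:

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GlueColumnsSketch {
  public static void main(String[] args) {
    Map<String, String> mapSchema = new LinkedHashMap<>();
    mapSchema.put("id", "BIGINT");
    mapSchema.put("price", "DOUBLE");
    mapSchema.put("datestr", "STRING");                        // partition column
    List<String> partitionFields = Collections.singletonList("datestr");

    List<String> cols = new ArrayList<>();
    for (String key : mapSchema.keySet()) {
      // In Glue, the full schema should exclude the partition keys
      if (!partitionFields.contains(key)) {
        cols.add(key + " : " + mapSchema.get(key).toLowerCase());
      }
    }
    System.out.println(cols);                                  // [id : bigint, price : double]
  }
}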
@@ -35,7 +35,7 @@
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieBootstrapConfig;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieSavepointException;
@@ -538,7 +538,7 @@ private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, Stri
private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbackUsingMarkers, boolean lazyCleanPolicy) {
return HoodieWriteConfig.newBuilder().withPath(basePath)
.withRollbackUsingMarkers(rollbackUsingMarkers)
-.withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(lazyCleanPolicy ? HoodieFailedWritesCleaningPolicy.LAZY :
+.withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(lazyCleanPolicy ? HoodieFailedWritesCleaningPolicy.LAZY :
HoodieFailedWritesCleaningPolicy.EAGER).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
}
@@ -30,7 +30,8 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.client.HoodieTimelineArchiver;
@@ -72,7 +73,8 @@ public void init() throws Exception {
// Generate archive
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
-.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
+.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.forTable("test-trip-table").build();
@@ -37,7 +37,8 @@
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.client.HoodieTimelineArchiver;
@@ -212,7 +213,8 @@ public void testShowArchivedCommits(boolean enableMetadataTable) throws Exceptio
// Generate archive
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
-.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
@@ -266,7 +268,8 @@ public void testShowArchivedCommitsWithMultiCommitsFile(boolean enableMetadataTa
// Generate archive
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
-.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
@@ -39,7 +39,8 @@
import org.apache.hudi.common.testutils.CompactionTestUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieSparkTable;
@@ -166,7 +167,8 @@ private void generateArchive() throws IOException {
// Generate archive
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
-.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.forTable("test-trip-table").build();
@@ -39,6 +39,7 @@
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
+import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -276,15 +277,21 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
TableSchemaResolver schemaUtil = new TableSchemaResolver(table.getMetaClient());
String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
-if (!historySchemaStr.isEmpty()) {
-InternalSchema internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
-SerDeHelper.parseSchemas(historySchemaStr));
+if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key()))) {
+InternalSchema internalSchema;
Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new Schema.Parser().parse(config.getSchema()));
-InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(avroSchema, internalSchema);
+if (historySchemaStr.isEmpty()) {
+internalSchema = AvroInternalSchemaConverter.convert(avroSchema);
+internalSchema.setSchemaId(Long.parseLong(instantTime));
+} else {
+internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
+SerDeHelper.parseSchemas(historySchemaStr));
+}
+InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(avroSchema, internalSchema);
if (evolvedSchema.equals(internalSchema)) {
metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolvedSchema));
//TODO save history schema by metaTable
-schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr);
+schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr.isEmpty() ? SerDeHelper.inheritSchemas(evolvedSchema, "") : historySchemaStr);
} else {
evolvedSchema.setSchemaId(Long.parseLong(instantTime));
String newSchemaStr = SerDeHelper.toJson(evolvedSchema);
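Behaviourally, the rewritten guard means the internal-schema bookkeeping now also runs on the first commit when schema reconciliation is enabled, bootstrapping the InternalSchema from the write schema instead of skipping the block. A tiny standalone illustration of just the guard; this is not the actual client code, and the reconcile flag is read via HoodieCommonConfig.RECONCILE_SCHEMA as shown in the hunk above:

public class ReconcileGuardSketch {
  // Mirrors the new condition: run the block if a history schema exists OR reconciliation is on.
  static boolean shouldSaveInternalSchema(String historySchemaStr, boolean reconcileSchema) {
    return !historySchemaStr.isEmpty() || reconcileSchema;
  }

  public static void main(String[] args) {
    System.out.println(shouldSaveInternalSchema("", false)); // false: old behaviour, block skipped
    System.out.println(shouldSaveInternalSchema("", true));  // true: new behaviour, schema bootstrapped from the write schema
  }
}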