Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,15 @@ public MessageType getTableParquetSchema() throws Exception {
return convertAvroSchemaToParquet(getTableAvroSchema(true));
}

/**
 * Gets the user data schema for a hoodie table in Parquet format.
 *
 * @param includeMetadataField true to include the hoodie metadata fields in the returned schema
 * @return Parquet schema for the table
 * @throws Exception if the Avro schema cannot be resolved or converted to Parquet
 */
public MessageType getTableParquetSchema(boolean includeMetadataField) throws Exception {
return convertAvroSchemaToParquet(getTableAvroSchema(includeMetadataField));
}

/**
* Gets users data schema for a hoodie table in Avro format.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class HiveSyncConfig extends HoodieSyncConfig {
// These constants mirror the definitions in HiveSyncConfigHolder so callers can
// reference them directly from HiveSyncConfig.
public static final ConfigProperty<String> HIVE_SYNC_AS_DATA_SOURCE_TABLE = HiveSyncConfigHolder.HIVE_SYNC_AS_DATA_SOURCE_TABLE;
public static final ConfigProperty<Integer> HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD = HiveSyncConfigHolder.HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD;
public static final ConfigProperty<Boolean> HIVE_CREATE_MANAGED_TABLE = HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
public static final ConfigProperty<Boolean> HIVE_SYNC_OMIT_METADATA_FIELDS = HiveSyncConfigHolder.HIVE_SYNC_OMIT_METADATA_FIELDS;
public static final ConfigProperty<Integer> HIVE_BATCH_SYNC_PARTITION_NUM = HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
public static final ConfigProperty<String> HIVE_SYNC_MODE = HiveSyncConfigHolder.HIVE_SYNC_MODE;
public static final ConfigProperty<Boolean> HIVE_SYNC_BUCKET_SYNC = HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
Expand Down Expand Up @@ -130,6 +131,8 @@ public static class HiveSyncConfigParams {
public Boolean supportTimestamp;
@Parameter(names = {"--managed-table"}, description = "Create a managed table")
public Boolean createManagedTable;
// CLI flag mapped to HiveSyncConfigHolder.HIVE_SYNC_OMIT_METADATA_FIELDS (wired up in toProps()).
@Parameter(names = {"--omit-metafields"}, description = "Omit metafields in schema")
public Boolean omitMetaFields;
@Parameter(names = {"--batch-sync-num"}, description = "The number of partitions one batch when synchronous partitions to hive")
public Integer batchSyncNum;
@Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table.")
Expand Down Expand Up @@ -167,6 +170,7 @@ public TypedProperties toProps() {
props.setPropertyIfNonNull(HIVE_SYNC_AS_DATA_SOURCE_TABLE.key(), syncAsSparkDataSourceTable);
props.setPropertyIfNonNull(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key(), sparkSchemaLengthThreshold);
props.setPropertyIfNonNull(HIVE_CREATE_MANAGED_TABLE.key(), createManagedTable);
props.setPropertyIfNonNull(HIVE_SYNC_OMIT_METADATA_FIELDS.key(), omitMetaFields);
props.setPropertyIfNonNull(HIVE_BATCH_SYNC_PARTITION_NUM.key(), batchSyncNum);
props.setPropertyIfNonNull(HIVE_SYNC_BUCKET_SYNC.key(), bucketSync);
props.setPropertyIfNonNull(HIVE_SYNC_BUCKET_SYNC_SPEC.key(), bucketSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public class HiveSyncConfigHolder {
.key("hoodie.datasource.hive_sync.create_managed_table")
.defaultValue(false)
.withDocumentation("Whether to sync the table as managed table.");
// When true, the hoodie metadata fields are omitted from the schema synced to the
// target table; defaults to false (metadata fields included). Available since 0.13.0.
public static final ConfigProperty<Boolean> HIVE_SYNC_OMIT_METADATA_FIELDS = ConfigProperty
.key("hoodie.datasource.hive_sync.omit_metadata_fields")
.defaultValue(false)
.sinceVersion("0.13.0")
.withDocumentation("Whether to omit the hoodie metadata fields in the target table.");
public static final ConfigProperty<Integer> HIVE_BATCH_SYNC_PARTITION_NUM = ConfigProperty
.key("hoodie.datasource.hive_sync.batch_num")
.defaultValue(1000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_IGNORE_EXCEPTIONS;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_OMIT_METADATA_FIELDS;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_AS_DATA_SOURCE_TABLE;
Expand Down Expand Up @@ -201,7 +202,8 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
boolean tableExists = syncClient.tableExists(tableName);

// Get the parquet schema for this table looking at the latest commit
MessageType schema = syncClient.getStorageSchema();
MessageType schema = syncClient.getStorageSchema(!config.getBoolean(HIVE_SYNC_OMIT_METADATA_FIELDS));


// Currently HoodieBootstrapRelation does support reading bootstrap MOR rt table,
// so we disable the syncAsSparkDataSourceTable here to avoid read such kind table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@ default MessageType getStorageSchema() {
return null;
}

/**
 * Get the schema from the Hudi table on storage.
 *
 * @param includeMetadataField true if to include metadata fields in the schema
 * @return the table schema in Parquet format; this default implementation returns
 *         {@code null} — NOTE(review): implementations should override this, and
 *         callers should be prepared for a null result from the default.
 */
default MessageType getStorageSchema(boolean includeMetadataField) {
return null;
}

/**
* Update schema for the table in the metastore.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ public MessageType getStorageSchema() {
}
}

@Override
public MessageType getStorageSchema(boolean includeMetadataField) {
  try {
    // Resolve the table's Parquet schema, including or excluding the hoodie
    // metadata fields according to the flag.
    TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
    return schemaResolver.getTableParquetSchema(includeMetadataField);
  } catch (Exception e) {
    throw new HoodieSyncException("Failed to read schema from storage.", e);
  }
}

public List<String> getWrittenPartitionsSince(Option<String> lastCommitTimeSynced) {
if (!lastCommitTimeSynced.isPresent()) {
LOG.info("Last commit time synced is not known, listing all partitions in "
Expand Down