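Summary of the change: this patch adds a --conditional-sync flag to Hive sync. When enabled, HiveSyncTool advances the last-synced commit time only when the sync run actually changed the Hive table, i.e. it created or evolved the schema, or added or updated partitions. To support this, syncSchema and syncPartitions now report whether they made a change, and a new test verifies that a no-op round leaves the last-synced commit time untouched.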
HiveSyncConfig.java
@@ -120,6 +120,9 @@ public class HiveSyncConfig implements Serializable {
   @Parameter(names = {"--with-operation-field"}, description = "Whether to include the '_hoodie_operation' field in the metadata fields")
   public Boolean withOperationField = false;
 
+  @Parameter(names = {"--conditional-sync"}, description = "If true, only sync on conditions like schema change or partition change.")
+  public Boolean isConditionalSync = false;
+
   // enhance the similar function in child class
   public static HiveSyncConfig copy(HiveSyncConfig cfg) {
     HiveSyncConfig newConfig = new HiveSyncConfig();
@@ -143,6 +146,7 @@ public static HiveSyncConfig copy(HiveSyncConfig cfg) {
     newConfig.syncAsSparkDataSourceTable = cfg.syncAsSparkDataSourceTable;
     newConfig.sparkSchemaLengthThreshold = cfg.sparkSchemaLengthThreshold;
     newConfig.withOperationField = cfg.withOperationField;
+    newConfig.isConditionalSync = cfg.isConditionalSync;
     return newConfig;
   }
 
@@ -174,6 +178,7 @@ public String toString() {
         + ", syncAsSparkDataSourceTable=" + syncAsSparkDataSourceTable
         + ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold
         + ", withOperationField=" + withOperationField
+        + ", isConditionalSync=" + isConditionalSync
         + '}';
   }
 }
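For reference, a minimal sketch of setting the new flag programmatically. The tableName value and the main() scaffolding are illustrative only; the fields and the copy() behavior come from the diff above, assuming HiveSyncConfig lives at org.apache.hudi.hive as in the Hudi codebase.

import org.apache.hudi.hive.HiveSyncConfig;

public class ConditionalSyncExample {
  public static void main(String[] args) {
    HiveSyncConfig cfg = new HiveSyncConfig();
    cfg.tableName = "my_hudi_table"; // illustrative value
    // Equivalent to passing --conditional-sync on the command line: only
    // advance the last-synced commit time when schema or partitions changed.
    cfg.isConditionalSync = true;

    // copy() carries the new field over, as added in this diff.
    HiveSyncConfig copied = HiveSyncConfig.copy(cfg);
    assert copied.isConditionalSync;
  }
}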
HiveSyncTool.java
@@ -179,7 +179,7 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
       cfg.syncAsSparkDataSourceTable = false;
     }
     // Sync schema if needed
-    syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
+    boolean schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
 
     LOG.info("Schema sync complete. Syncing partitions for " + tableName);
     // Get the last time we successfully synced partitions
@@ -192,8 +192,11 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
     LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());
 
     // Sync the partitions if needed
-    syncPartitions(tableName, writtenPartitionsSince);
-    hoodieHiveClient.updateLastCommitTimeSynced(tableName);
+    boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince);
+    boolean meetSyncConditions = schemaChanged || partitionsChanged;
+    if (!cfg.isConditionalSync || meetSyncConditions) {
+      hoodieHiveClient.updateLastCommitTimeSynced(tableName);
+    }
     LOG.info("Sync complete for " + tableName);
   }
 
@@ -204,7 +207,7 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
    * @param tableExists - does table exist
    * @param schema - extracted schema
    */
-  private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
+  private boolean syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
                           boolean readAsOptimized, MessageType schema) {
     // Append spark table properties & serde properties
     Map<String, String> tableProperties = ConfigUtils.toMap(cfg.tableProperties);
@@ -215,6 +218,7 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
       tableProperties.putAll(sparkTableProperties);
       serdeProperties.putAll(sparkSerdeProperties);
     }
+    boolean schemaChanged = false;
     // Check and sync schema
     if (!tableExists) {
       LOG.info("Hive table " + tableName + " is not found. Creating it");
@@ -236,6 +240,7 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
       // /ql/exec/DDLTask.java#L3488
       hoodieHiveClient.createTable(tableName, schema, inputFormatClassName,
           outputFormatClassName, serDeFormatClassName, serdeProperties, tableProperties);
+      schemaChanged = true;
     } else {
       // Check if the table schema has evolved
       Map<String, String> tableSchema = hoodieHiveClient.getTableSchema(tableName);
@@ -248,10 +253,12 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
           hoodieHiveClient.updateTableProperties(tableName, tableProperties);
           LOG.info("Sync table properties for " + tableName + ", table properties is: " + cfg.tableProperties);
         }
+        schemaChanged = true;
       } else {
         LOG.info("No Schema difference for " + tableName);
       }
     }
+    return schemaChanged;
   }
 
   /**
@@ -324,7 +331,8 @@ private Map<String, String> getSparkSerdeProperties(boolean readAsOptimized) {
    * Syncs the list of storage partitions passed in (checks if the partition is in hive, if not adds it or if the
    * partition path does not match, it updates the partition path).
    */
-  private void syncPartitions(String tableName, List<String> writtenPartitionsSince) {
+  private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince) {
+    boolean partitionsChanged;
     try {
       List<Partition> hivePartitions = hoodieHiveClient.scanTablePartitions(tableName);
       List<PartitionEvent> partitionEvents =
@@ -335,9 +343,11 @@ private void syncPartitions(String tableName, List<String> writtenPartitionsSince) {
       List<String> updatePartitions = filterPartitions(partitionEvents, PartitionEventType.UPDATE);
       LOG.info("Changed Partitions " + updatePartitions);
       hoodieHiveClient.updatePartitionsToTable(tableName, updatePartitions);
+      partitionsChanged = !updatePartitions.isEmpty() || !newPartitions.isEmpty();
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to sync partitions for table " + tableName, e);
     }
+    return partitionsChanged;
   }
 
   private List<String> filterPartitions(List<PartitionEvent> events, PartitionEventType eventType) {
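The core of the change is the gating condition in syncHoodieTable. Distilled into a standalone helper for illustration (a sketch, not code from the patch):

// Sketch of the decision syncHoodieTable now makes before recording progress.
static boolean shouldUpdateLastCommitTime(boolean conditionalSync,
                                          boolean schemaChanged,
                                          boolean partitionsChanged) {
  // Unconditional mode (the old behavior) always records progress; conditional
  // mode records it only when this run actually changed the Hive table.
  return !conditionalSync || schemaChanged || partitionsChanged;
}

One consequence worth noting: with isConditionalSync enabled, a run that finds nothing to do leaves the old marker in place, so the next run re-examines the same commit range instead of skipping it.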
HiveSyncTool test class
@@ -1017,4 +1017,35 @@ public void testTypeConverter(String syncMode) throws Exception {
         .containsValue("BIGINT"), errorMsg);
     ddlExecutor.runSQL(dropTableSql);
   }
+
+  @ParameterizedTest
+  @MethodSource("syncMode")
+  public void testSyncWithoutDiffs(String syncMode) throws Exception {
+    hiveSyncConfig.syncMode = syncMode;
+    hiveSyncConfig.isConditionalSync = true;
+    HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
+    String tableName = HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
+
+    String commitTime0 = "100";
+    String commitTime1 = "101";
+    String commitTime2 = "102";
+    HiveTestUtil.createMORTable(commitTime0, commitTime1, 2, true, true);
+
+    HoodieHiveClient hiveClient =
+        new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+
+    HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    assertTrue(hiveClient.doesTableExist(tableName));
+    assertEquals(commitTime1, hiveClient.getLastCommitTimeSynced(tableName).get());
+
+    HiveTestUtil.addMORPartitions(0, true, true, true, ZonedDateTime.now().plusDays(2), commitTime1, commitTime2);
+
+    tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+    hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    assertEquals(commitTime1, hiveClient.getLastCommitTimeSynced(tableName).get());
+  }
+
 }
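The closing assertion is the point of the test: the second round adds zero partitions and changes no schema, so with isConditionalSync enabled the marker stays at commitTime1 even though commitTime2 now exists on the timeline. A hypothetical continuation (not part of this patch, reusing only helpers already shown above, with the exact HiveTestUtil semantics assumed) would check that the marker advances once a real change appears:

// Hypothetical follow-up: add one real partition under new commit times and
// re-sync; syncPartitions then reports a change, so the marker should advance.
String commitTime3 = "103";
String commitTime4 = "104";
HiveTestUtil.addMORPartitions(1, true, true, true, ZonedDateTime.now().plusDays(3), commitTime3, commitTime4);
tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool.syncHoodieTable();
hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
assertEquals(commitTime4, hiveClient.getLastCommitTimeSynced(tableName).get());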