@@ -207,7 +207,6 @@ List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<St
     Map<String, String> paths = new HashMap<>();
     for (Partition tablePartition : tablePartitions) {
       List<String> hivePartitionValues = tablePartition.getValues();
-      Collections.sort(hivePartitionValues);
       String fullTablePartitionPath =
           Path.getPathWithoutSchemeAndAuthority(new Path(tablePartition.getSd().getLocation())).toUri().getPath();
       paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath);
@@ -219,7 +218,6 @@ List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<St
       String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
       // Check if the partition values or the hdfs path is the same
       List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
-      Collections.sort(storagePartitionValues);
       if (!storagePartitionValues.isEmpty()) {
         String storageValue = String.join(", ", storagePartitionValues);
         if (!paths.containsKey(storageValue)) {
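
The two Collections.sort calls removed above were the root cause: with a multi-part partition key, sorting the value lists can make two different partitions produce the same map key, so the second partition silently overwrites the first and never gets an ADD event. A minimal standalone sketch of the collision (plain Java, not Hudi code; the split-on-slash stands in for extractPartitionValuesInPath):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SortedKeyCollision {
  public static void main(String[] args) {
    Map<String, String> paths = new HashMap<>();
    for (String partition : Arrays.asList("2010/01/02", "2010/02/01")) {
      // Stand-in for extractPartitionValuesInPath: split the path into its values.
      List<String> values = new ArrayList<>(Arrays.asList(partition.split("/")));
      Collections.sort(values); // the removed bug: both lists become [01, 02, 2010]
      paths.put(String.join(", ", values), partition);
    }
    // Prints 1: the second partition overwrote the first entry. Without the sort,
    // the keys stay distinct ("2010, 01, 02" vs "2010, 02, 01") and this prints 2.
    System.out.println(paths.size());
  }
}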
@@ -21,10 +21,10 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
 import org.apache.hudi.hive.testutils.HiveTestUtil;
 import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;

 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.parquet.schema.MessageType;
@@ -56,7 +56,7 @@ private static Stream<Boolean> useJdbc() {
   }

   private static Iterable<Object[]> useJdbcAndSchemaFromCommitMetadata() {
-    return Arrays.asList(new Object[][] { { true, true }, { true, false }, { false, true }, { false, false } });
+    return Arrays.asList(new Object[][] {{true, true}, {true, false}, {false, true}, {false, false}});
   }

   @BeforeEach
@@ -347,7 +347,7 @@ public void testSyncMergeOnRead(boolean useJdbc, boolean useSchemaFromCommitMeta
       assertEquals(hiveClient.getTableSchema(roTableName).size(),
           SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
               + HoodieRecord.HOODIE_META_COLUMNS.size(),
-          "Hive Schema should match the table schema + partition field");
+          "Hive Schema should match the table schema + partition field");
     } else {
       // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
       assertEquals(hiveClient.getTableSchema(roTableName).size(),
@@ -377,7 +377,7 @@ public void testSyncMergeOnRead(boolean useJdbc, boolean useSchemaFromCommitMeta
       assertEquals(hiveClient.getTableSchema(roTableName).size(),
           SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
               + HoodieRecord.HOODIE_META_COLUMNS.size(),
-          "Hive Schema should match the evolved table schema + partition field");
+          "Hive Schema should match the evolved table schema + partition field");
     } else {
       // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
       assertEquals(hiveClient.getTableSchema(roTableName).size(),
@@ -418,7 +418,7 @@ public void testSyncMergeOnReadRT(boolean useJdbc, boolean useSchemaFromCommitMe
       assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
           SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
               + HoodieRecord.HOODIE_META_COLUMNS.size(),
-          "Hive Schema should match the table schema + partition field");
+          "Hive Schema should match the table schema + partition field");
     } else {
       // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
       assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
@@ -489,6 +489,50 @@ public void testMultiPartitionKeySync(boolean useJdbc) throws Exception {
"Table partitions should match the number of partitions we wrote");
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
"The last commit that was sycned should be updated in the TBLPROPERTIES");

// HoodieHiveClient had a bug where partition vals were sorted
// and stored as keys in a map. The following tests this particular case.
// Now lets create partition "2010/01/02" and followed by "2010/02/01".
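+    // (With a three-part key, "2010/01/02" extracts to values ["2010", "01", "02"] and
+    // "2010/02/01" to ["2010", "02", "01"]; sorting turned both into ["01", "02", "2010"],
+    // so the two partitions collided on a single map key and the second ADD event was lost.)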
+    String commitTime2 = "101";
+    HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2);
+
+    hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime));
+    assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after commit 100");
+    List<Partition> hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName);
+    List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
+    assertEquals(1, partitionEvents.size(), "There should be only one partition event");
+    assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must be of type ADD");
+
+    tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    // Sync should add the one partition
+    assertEquals(6, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+        "Table partitions should match the number of partitions we wrote");
+    assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+        "The last commit that was synced should be 101");
+
+    // Create partition "2010/02/01" and ensure sync works
+    String commitTime3 = "102";
+    HiveTestUtil.addCOWPartition("2010/02/01", true, true, commitTime3);
+    HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
+
+    hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+
+    tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName),
+        "Table " + hiveSyncConfig.tableName + " should exist after sync completes");
+    assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(),
+        hiveClient.getDataSchema().getColumns().size() + 3,
+        "Hive Schema should match the table schema + partition fields");
+    assertEquals(7, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+        "Table partitions should match the number of partitions we wrote");
+    assertEquals(commitTime3, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+        "The last commit that was synced should be updated in the TBLPROPERTIES");
+    assertEquals(1, hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).size());
   }

   @ParameterizedTest
@@ -507,17 +551,17 @@ public void testNonPartitionedSync(boolean useJdbc) throws Exception {

     HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
     assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
-        "Table " + hiveSyncConfig.tableName + " should not exist initially");
+        "Table " + hiveSyncConfig.tableName + " should not exist initially");
     // Let's do the sync
     HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
     tool.syncHoodieTable();
     assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName),
-        "Table " + hiveSyncConfig.tableName + " should exist after sync completes");
+        "Table " + hiveSyncConfig.tableName + " should exist after sync completes");
     assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(),
-        hiveClient.getDataSchema().getColumns().size(),
-        "Hive Schema should match the table schema, ignoring the partition fields");
+        hiveClient.getDataSchema().getColumns().size(),
+        "Hive Schema should match the table schema, ignoring the partition fields");
     assertEquals(0, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
-        "Table should not have partitions because of the NonPartitionedExtractor");
+        "Table should not have partitions because of the NonPartitionedExtractor");
   }

   @ParameterizedTest
@@ -210,6 +210,14 @@ public static void addCOWPartitions(int numberOfPartitions, boolean isParquetSch
     createCommitFile(commitMetadata, instantTime);
   }

+  public static void addCOWPartition(String partitionPath, boolean isParquetSchemaSimple,
+      boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
+    HoodieCommitMetadata commitMetadata =
+        createPartition(partitionPath, isParquetSchemaSimple, useSchemaFromCommitMetadata, instantTime);
+    createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
+    createCommitFile(commitMetadata, instantTime);
+  }
+
   public static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple,
       boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime)
       throws IOException, URISyntaxException, InterruptedException {
@@ -266,6 +274,18 @@ private static HoodieCommitMetadata createPartitions(int numberOfPartitions, boo
     return commitMetadata;
   }

+  private static HoodieCommitMetadata createPartition(String partitionPath, boolean isParquetSchemaSimple,
+      boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath);
+    fileSystem.makeQualified(partPath);
+    fileSystem.mkdirs(partPath);
+    List<HoodieWriteStat> writeStats = createTestData(partPath, isParquetSchemaSimple, instantTime);
+    writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s));
+    addSchemaToCommitMetadata(commitMetadata, isParquetSchemaSimple, useSchemaFromCommitMetadata);
+    return commitMetadata;
+  }
+
   private static List<HoodieWriteStat> createTestData(Path partPath, boolean isParquetSchemaSimple, String instantTime)
       throws IOException, URISyntaxException {
     List<HoodieWriteStat> writeStats = new ArrayList<>();
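
Taken together, the new HiveTestUtil helper and the existing client calls give a compact way to replay the regression. A condensed, hypothetical sketch of the flow the new test exercises (all names come from the diff above; it assumes the same HiveTestUtil fixture, with hiveSyncConfig and fileSystem already initialized, and a prior sync at instant "100"):

  // Stage a commit at instant "101" that writes the single partition "2010/01/02".
  HiveTestUtil.addCOWPartition("2010/01/02", true, true, "101");

  HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
  // Ask which partitions were written after instant "100" and diff them against Hive's view.
  List<String> written = hiveClient.getPartitionsWrittenToSince(Option.of("100"));
  List<PartitionEvent> events = hiveClient.getPartitionEvents(hiveClient.scanTablePartitions(hiveSyncConfig.tableName), written);
  // With the sorts removed, a later write to "2010/02/01" produces its own ADD event
  // instead of colliding with "2010/01/02" on the same sorted key.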