Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
9990f95
[HUDI-1064]Delete extra rows from the code
linshan-ma Jul 7, 2020
48e79da
[HUDI-1064]remove .get
linshan-ma Jul 7, 2020
8fd3b6c
[HUDI-1064]Trim hoodie table name
linshan-ma Jul 7, 2020
b2ef0fd
Merge remote-tracking branch 'origin/HUDI-1064' into HUDI-1064
linshan-ma Jul 7, 2020
c0302a1
[HUDI-1064]remove .get()
linshan-ma Jul 7, 2020
ed585fc
[HUDI-1064]To solve the java.util.NoSuchElementException: None.get ex…
linshan-ma Jul 7, 2020
8b3e6b5
[HUDI-1064]compiler success
linshan-ma Jul 7, 2020
7b0601d
Sync from upstream
linshan-ma Jul 14, 2020
d0bbff7
Merge remote-tracking branch 'upstream/master'
linshan-ma Jul 15, 2020
a10f9fd
Merge remote-tracking branch 'upstream/master'
linshan-ma Jul 22, 2020
9a82e96
Merge remote-tracking branch 'upstream/master'
linshan-ma Jul 28, 2020
a19ca75
Merge remote-tracking branch 'upstream/master'
linshan-ma Jul 29, 2020
d86ddab
Merge remote-tracking branch 'upstream/master'
linshan-ma Aug 6, 2020
d06d94b
Merge remote-tracking branch 'upstream/master'
linshan-ma Aug 7, 2020
d0b9ba2
Merge remote-tracking branch 'upstream/master'
linshan-ma Aug 10, 2020
2999cb8
Merge remote-tracking branch 'upstream/master'
linshan-ma Aug 18, 2020
e6fc3fa
Merge remote-tracking branch 'upstream/master'
linshan-ma Sep 1, 2020
9519226
Merge remote-tracking branch 'upstream/master'
linshan-ma Sep 10, 2020
4656129
Merge remote-tracking branch 'upstream/master'
linshan-ma Sep 27, 2020
7b28e17
Merge remote-tracking branch 'upstream/master'
linshan-ma Nov 11, 2020
48de20d
Merge remote-tracking branch 'upstream/master'
linshan-ma Nov 19, 2020
80b1a4c
Merge remote-tracking branch 'upstream/master'
linshan-ma Nov 19, 2020
c9917ff
[HUDI-1383]Incorrect partitions getting hive synced,Modify hive parti…
linshan-ma Nov 19, 2020
c1e36b2
Adding a test to ensure fix works
nsivabalan Jan 2, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<St
Map<String, String> paths = new HashMap<>();
for (Partition tablePartition : tablePartitions) {
List<String> hivePartitionValues = tablePartition.getValues();
Collections.sort(hivePartitionValues);
String fullTablePartitionPath =
Path.getPathWithoutSchemeAndAuthority(new Path(tablePartition.getSd().getLocation())).toUri().getPath();
paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath);
Expand All @@ -206,7 +205,6 @@ List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<St
String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
// Check if the partition values or if hdfs path is the same
List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
Collections.sort(storagePartitionValues);
if (!storagePartitionValues.isEmpty()) {
String storageValue = String.join(", ", storagePartitionValues);
if (!paths.containsKey(storageValue)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,15 +480,54 @@ public void testMultiPartitionKeySync(boolean useJdbc) throws Exception {
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool.syncHoodieTable();
assertEquals(5, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"Table partitions should match the number of partitions we wrote");
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
"The last commit that was sycned should be updated in the TBLPROPERTIES");

// Now lets create partition "2010/01/02" and followed by "2010/02/01". HoodieHiveClient had a bug where partition vals were sorted
// and stored as keys in a map. The following tests this particular case.
String commitTime2 = "101";
HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2);
//HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);

hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime));
assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit");
List<Partition> hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName);
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
assertEquals(1, partitionEvents.size(), "There should be only one paritition event");
assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD");

tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool.syncHoodieTable();

// Sync should add the one partition
assertEquals(6, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"The one partition we wrote should be added to hive");
assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
"The last commit that was sycned should be 101");

// create partition "2010/02/01" and ensure sync works
String commitTime3 = "102";
HiveTestUtil.addCOWPartition("2010/02/01", true, true, commitTime3);
HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);

hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);

tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool.syncHoodieTable();

assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName),
"Table " + hiveSyncConfig.tableName + " should exist after sync completes");
assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(),
hiveClient.getDataSchema().getColumns().size() + 3,
"Hive Schema should match the table schema + partition fields");
assertEquals(5, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
assertEquals(7, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"Table partitions should match the number of partitions we wrote");
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
assertEquals(commitTime3, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
"The last commit that was sycned should be updated in the TBLPROPERTIES");
assertEquals(1, hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).size());
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ public static void addCOWPartitions(int numberOfPartitions, boolean isParquetSch
createCommitFile(commitMetadata, instantTime);
}

/**
 * Adds a single copy-on-write partition at {@code partitionPath} and records a commit for it.
 *
 * <p>Creates the partition directory with test data, registers the target table in
 * {@code createdTablesSet} (so test teardown can drop it), and writes the commit file
 * for {@code instantTime}.
 *
 * @param partitionPath relative partition path under the table base path (e.g. "2010/01/02")
 * @param isParquetSchemaSimple whether to generate the simple parquet schema variant
 * @param useSchemaFromCommitMetadata whether to embed the schema in the commit metadata
 * @param instantTime commit instant timestamp to record
 * @throws IOException if writing test data or the commit file fails
 * @throws URISyntaxException if test-data path construction fails
 */
public static void addCOWPartition(String partitionPath, boolean isParquetSchemaSimple,
    boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
  // Track the table for cleanup before producing the commit.
  createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
  HoodieCommitMetadata metadata =
      createPartition(partitionPath, isParquetSchemaSimple, useSchemaFromCommitMetadata, instantTime);
  createCommitFile(metadata, instantTime);
}

public static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple,
boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime)
throws IOException, URISyntaxException, InterruptedException {
Expand Down Expand Up @@ -266,6 +274,18 @@ private static HoodieCommitMetadata createPartitions(int numberOfPartitions, boo
return commitMetadata;
}

/**
 * Creates one partition directory under the table base path, fills it with test data files,
 * and returns commit metadata describing the written files.
 *
 * @param partitionPath relative partition path under {@code hiveSyncConfig.basePath}
 * @param isParquetSchemaSimple whether to generate the simple parquet schema variant
 * @param useSchemaFromCommitMetadata whether to embed the schema in the returned metadata
 * @param instantTime commit instant timestamp used when generating the test files
 * @return commit metadata containing one write stat per generated file
 * @throws IOException if directory creation or file writing fails
 * @throws URISyntaxException if test-data path construction fails
 */
private static HoodieCommitMetadata createPartition(String partitionPath, boolean isParquetSchemaSimple,
    boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
  Path partitionDir = new Path(hiveSyncConfig.basePath + "/" + partitionPath);
  fileSystem.makeQualified(partitionDir);
  fileSystem.mkdirs(partitionDir);

  // Generate data files, then register each resulting write stat under this partition.
  HoodieCommitMetadata metadata = new HoodieCommitMetadata();
  for (HoodieWriteStat stat : createTestData(partitionDir, isParquetSchemaSimple, instantTime)) {
    metadata.addWriteStat(partitionPath, stat);
  }
  addSchemaToCommitMetadata(metadata, isParquetSchemaSimple, useSchemaFromCommitMetadata);
  return metadata;
}

private static List<HoodieWriteStat> createTestData(Path partPath, boolean isParquetSchemaSimple, String instantTime)
throws IOException, URISyntaxException {
List<HoodieWriteStat> writeStats = new ArrayList<>();
Expand Down