Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@
import java.util.Arrays;

/**
* A partitioner that does sort based on specified column values for each RDD partition.
* A partitioner that globally sorts a {@link JavaRDD<HoodieRecord>} based on partition path column and custom columns.
*
* @param <T> HoodieRecordPayload type
* @see GlobalSortPartitioner
* @see BulkInsertSortMode#GLOBAL_SORT
*/
public class RDDCustomColumnsSortPartitioner<T>
implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
import java.util.Arrays;

/**
* A partitioner that does sorting based on specified column values for each spark partitions.
* A partitioner that globally sorts a {@link Dataset<Row>} based on partition path column and custom columns.
*
* @see GlobalSortPartitionerWithRows
* @see BulkInsertSortMode#GLOBAL_SORT
*/
public class RowCustomColumnsSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
*/
public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHarness {

private static final Comparator<Row> KEY_COMPARATOR =
private static final Comparator<Row> DEFAULT_KEY_COMPARATOR =
Comparator.comparing(o -> (o.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" + o.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD)));

@BeforeEach
Expand Down Expand Up @@ -103,8 +103,7 @@ public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode,
boolean isGloballySorted,
boolean isLocallySorted,
boolean populateMetaFields) {
Dataset<Row> records1 = generateTestRecords();
Dataset<Row> records2 = generateTestRecords();
Dataset<Row> records = generateTestRecords();
Copy link
Member Author

@xushiyan xushiyan May 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure why the existing test case runs the same logic twice with records1 and records2. @boneanxs any thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

testCustomColumnSortPartitionerWithRows was copied from testBulkInsertInternalPartitioner. Looking at org.apache.hudi.execution.bulkinsert.TestBulkInsertInternalPartitioner#testBulkInsertInternalPartitioner:177, it actually generates two record sets with different union counts:

JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
    JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc);

So I think this should be a mistake, and unioning it twice should be enough. (Are the different union counts meant to produce different partitions here?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no change to any code in the write path, so why do the tests run successfully for Spark 3.1 or 2.4?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test only passes on Spark 2.4, which is a coincidence. The existing test logic asserts 2 RDD partitions after re-partitioning by the partitioner; with Spark 2.4's sort and coalesce, it gives 2 and passes the test as if this were a local partitioner. The correct expectation is that the partitioner does a global sort and the resulting number of partitions should be 2 or fewer, which is what Spark 3 gives us.


HoodieWriteConfig config = HoodieWriteConfig
.newBuilder()
Expand All @@ -116,36 +115,24 @@ public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode,

testBulkInsertInternalPartitioner(
BulkInsertInternalPartitionerWithRowsFactory.get(config, isTablePartitioned, enforceNumOutputPartitions),
records1,
records,
enforceNumOutputPartitions,
isGloballySorted,
isLocallySorted,
generateExpectedPartitionNumRecords(records1),
Option.empty(),
populateMetaFields);
testBulkInsertInternalPartitioner(
BulkInsertInternalPartitionerWithRowsFactory.get(config, isTablePartitioned, enforceNumOutputPartitions),
records2,
enforceNumOutputPartitions,
isGloballySorted,
isLocallySorted,
generateExpectedPartitionNumRecords(records2),
generateExpectedPartitionNumRecords(records),
Option.empty(),
populateMetaFields);
}

@Test
public void testCustomColumnSortPartitionerWithRows() {
Dataset<Row> records1 = generateTestRecords();
Dataset<Row> records2 = generateTestRecords();
String sortColumnString = records1.columns()[5];
Dataset<Row> records = generateTestRecords();
String sortColumnString = records.columns()[5];
String[] sortColumns = sortColumnString.split(",");
Comparator<Row> comparator = getCustomColumnComparator(sortColumns);

testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns),
records1, true, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator), true);
testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns),
records2, true, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator), true);
records, true, true, true, generateExpectedPartitionNumRecords(records), Option.of(comparator), true);
Comment on lines -148 to +135
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the existing test treated the partitioner as non-global and hence failed the test scenario under spark 3.2


HoodieWriteConfig config = HoodieWriteConfig
.newBuilder()
Expand All @@ -154,9 +141,7 @@ public void testCustomColumnSortPartitionerWithRows() {
.withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
.build();
testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config),
records1, true, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator), true);
testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config),
records2, true, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator), true);
records, true, true, true, generateExpectedPartitionNumRecords(records), Option.of(comparator), true);
}

private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner,
Expand Down Expand Up @@ -227,13 +212,13 @@ public Dataset<Row> generateTestRecords() {

private void verifyRowsAscendingOrder(List<Row> records, Option<Comparator<Row>> comparator) {
List<Row> expectedRecords = new ArrayList<>(records);
Collections.sort(expectedRecords, comparator.orElse(KEY_COMPARATOR));
Collections.sort(expectedRecords, comparator.orElse(DEFAULT_KEY_COMPARATOR));
assertEquals(expectedRecords, records);
}

private Comparator<Row> getCustomColumnComparator(String[] sortColumns) {
Comparator<Row> comparator = Comparator.comparing(row -> {
StringBuilder sb = new StringBuilder();
StringBuilder sb = new StringBuilder(row.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
for (String col : sortColumns) {
sb.append(row.getAs(col).toString());
}
Expand Down