From 4cd14cd2b9737823915e4b29b42cb8f9614503f2 Mon Sep 17 00:00:00 2001 From: Raymond Xu <2701446+xushiyan@users.noreply.github.com> Date: Wed, 17 May 2023 22:34:14 +0800 Subject: [PATCH] [HUDI-5394] Fix tests for RowCustomColumnsSortPartitioner --- .../RDDCustomColumnsSortPartitioner.java | 5 +-- .../RowCustomColumnsSortPartitioner.java | 5 ++- ...tBulkInsertInternalPartitionerForRows.java | 35 ++++++------------- 3 files changed, 17 insertions(+), 28 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java index 4b988200de4af..3d8c21a75d2c4 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java @@ -30,9 +30,10 @@ import java.util.Arrays; /** - * A partitioner that does sort based on specified column values for each RDD partition. + * A partitioner that globally sorts a {@link JavaRDD} based on partition path column and custom columns. * - * @param HoodieRecordPayload type + * @see GlobalSortPartitioner + * @see BulkInsertSortMode#GLOBAL_SORT */ public class RDDCustomColumnsSortPartitioner implements BulkInsertPartitioner>> { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java index d1d9e0bbb4c2c..a61e6fd4101af 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java @@ -28,7 +28,10 @@ import java.util.Arrays; /** - * A partitioner that does sorting based on specified column values for each spark partitions. + * A partitioner that globally sorts a {@link Dataset} based on partition path column and custom columns. + * + * @see GlobalSortPartitionerWithRows + * @see BulkInsertSortMode#GLOBAL_SORT */ public class RowCustomColumnsSortPartitioner implements BulkInsertPartitioner> { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java index f4e99f4670397..03fc7dbca8c96 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java @@ -55,7 +55,7 @@ */ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHarness { - private static final Comparator KEY_COMPARATOR = + private static final Comparator DEFAULT_KEY_COMPARATOR = Comparator.comparing(o -> (o.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" + o.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD))); @BeforeEach @@ -103,8 +103,7 @@ public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode, boolean isGloballySorted, boolean isLocallySorted, boolean populateMetaFields) { - Dataset records1 = generateTestRecords(); - Dataset records2 = generateTestRecords(); + Dataset records = generateTestRecords(); HoodieWriteConfig config = HoodieWriteConfig .newBuilder() @@ -116,36 +115,24 @@ public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode, testBulkInsertInternalPartitioner( BulkInsertInternalPartitionerWithRowsFactory.get(config, isTablePartitioned, enforceNumOutputPartitions), - records1, + records, enforceNumOutputPartitions, isGloballySorted, isLocallySorted, - generateExpectedPartitionNumRecords(records1), - Option.empty(), - populateMetaFields); - testBulkInsertInternalPartitioner( - BulkInsertInternalPartitionerWithRowsFactory.get(config, isTablePartitioned, enforceNumOutputPartitions), - records2, - enforceNumOutputPartitions, - isGloballySorted, - isLocallySorted, - generateExpectedPartitionNumRecords(records2), + generateExpectedPartitionNumRecords(records), Option.empty(), populateMetaFields); } @Test public void testCustomColumnSortPartitionerWithRows() { - Dataset records1 = generateTestRecords(); - Dataset records2 = generateTestRecords(); - String sortColumnString = records1.columns()[5]; + Dataset records = generateTestRecords(); + String sortColumnString = records.columns()[5]; String[] sortColumns = sortColumnString.split(","); Comparator comparator = getCustomColumnComparator(sortColumns); testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns), - records1, true, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator), true); - testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns), - records2, true, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator), true); + records, true, true, true, generateExpectedPartitionNumRecords(records), Option.of(comparator), true); HoodieWriteConfig config = HoodieWriteConfig .newBuilder() @@ -154,9 +141,7 @@ public void testCustomColumnSortPartitionerWithRows() { .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString) .build(); testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config), - records1, true, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator), true); - testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config), - records2, true, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator), true); + records, true, true, true, generateExpectedPartitionNumRecords(records), Option.of(comparator), true); } private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner, @@ -227,13 +212,13 @@ public Dataset generateTestRecords() { private void verifyRowsAscendingOrder(List records, Option> comparator) { List expectedRecords = new ArrayList<>(records); - Collections.sort(expectedRecords, comparator.orElse(KEY_COMPARATOR)); + Collections.sort(expectedRecords, comparator.orElse(DEFAULT_KEY_COMPARATOR)); assertEquals(expectedRecords, records); } private Comparator getCustomColumnComparator(String[] sortColumns) { Comparator comparator = Comparator.comparing(row -> { - StringBuilder sb = new StringBuilder(); + StringBuilder sb = new StringBuilder(row.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD)); for (String col : sortColumns) { sb.append(row.getAs(col).toString()); }