diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java new file mode 100644 index 0000000000000..28e2f7c95c22d --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.HoodieConfig; + +/** + * Configs/params used for internal purposes. + */ +public class HoodieInternalConfig extends HoodieConfig { + + private static final long serialVersionUID = 0L; + + public static final String BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED = "hoodie.bulkinsert.are.partitioner.records.sorted"; + public static final Boolean DEFAULT_BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED = false; + + /** + * Returns if partition records are sorted or not. + * @param propertyValue value for property BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED. + * @return the property value. + */ + public static Boolean getBulkInsertIsPartitionRecordsSorted(String propertyValue) { + return propertyValue != null ? Boolean.parseBoolean(propertyValue) : DEFAULT_BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED; + } + +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 4a2f2c2fb2e7a..1573c7fbfcd03 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1578,7 +1578,6 @@ protected void setDefaults() { HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps()).build()); writeConfig.setDefaultValue(TIMELINE_LAYOUT_VERSION, String.valueOf(TimelineLayoutVersion.CURR_VERSION)); - } private void validate() { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java new file mode 100644 index 0000000000000..47b3efbb77cb1 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.execution.bulkinsert; + +import org.apache.hudi.table.BulkInsertPartitioner; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +/** + * A factory to generate built-in partitioner to repartition input Rows into at least + * expected number of output spark partitions for bulk insert operation. + */ +public abstract class BulkInsertInternalPartitionerWithRowsFactory { + + public static BulkInsertPartitioner> get(BulkInsertSortMode sortMode) { + switch (sortMode) { + case NONE: + return new NonSortPartitionerWithRows(); + case GLOBAL_SORT: + return new GlobalSortPartitionerWithRows(); + case PARTITION_SORT: + return new PartitionSortPartitionerWithRows(); + default: + throw new UnsupportedOperationException("The bulk insert sort mode \"" + sortMode.name() + "\" is not supported."); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java new file mode 100644 index 0000000000000..24bcc0aff0df3 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.execution.bulkinsert; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.table.BulkInsertPartitioner; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.functions; + +/** + * A built-in partitioner that does global sorting for the input Rows across partitions after repartition for bulk insert operation, corresponding to the {@code BulkInsertSortMode.GLOBAL_SORT} mode. + */ +public class GlobalSortPartitionerWithRows implements BulkInsertPartitioner> { + + @Override + public Dataset repartitionRecords(Dataset rows, int outputSparkPartitions) { + // Now, sort the records and line them up nicely for loading. + // Let's use "partitionPath + key" as the sort key. 
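// (Illustrative aside.) In Spark SQL terms, the statement below is roughly
//   SELECT * FROM rows ORDER BY _hoodie_partition_path, _hoodie_record_key
// with the result then coalesced down to at most outputSparkPartitions output Spark partitions.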
+ return rows.sort(functions.col(HoodieRecord.PARTITION_PATH_METADATA_FIELD), functions.col(HoodieRecord.RECORD_KEY_METADATA_FIELD)) + .coalesce(outputSparkPartitions); + } + + @Override + public boolean arePartitionRecordsSorted() { + return true; + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java new file mode 100644 index 0000000000000..e1c34a8f84062 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.execution.bulkinsert; + +import org.apache.hudi.table.BulkInsertPartitioner; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +/** + * A built-in partitioner that only does coalesce for input Rows for bulk insert operation, + * corresponding to the {@code BulkInsertSortMode.NONE} mode. + * + */ +public class NonSortPartitionerWithRows implements BulkInsertPartitioner> { + + @Override + public Dataset repartitionRecords(Dataset rows, int outputSparkPartitions) { + return rows.coalesce(outputSparkPartitions); + } + + @Override + public boolean arePartitionRecordsSorted() { + return false; + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java new file mode 100644 index 0000000000000..b669c338f8668 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.execution.bulkinsert; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.table.BulkInsertPartitioner; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +/** + * A built-in partitioner that does local sorting for each spark partitions after coalesce for bulk insert operation, corresponding to the {@code BulkInsertSortMode.PARTITION_SORT} mode. + */ +public class PartitionSortPartitionerWithRows implements BulkInsertPartitioner> { + + @Override + public Dataset repartitionRecords(Dataset rows, int outputSparkPartitions) { + return rows.coalesce(outputSparkPartitions).sortWithinPartitions(HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD); + } + + @Override + public boolean arePartitionRecordsSorted() { + return true; + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java new file mode 100644 index 0000000000000..276ad5b43ab3a --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.execution.bulkinsert; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.hudi.testutils.HoodieClientTestHarness; +import org.apache.hudi.testutils.SparkDatasetTestUtils; + +import org.apache.spark.api.java.function.MapPartitionsFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Unit tests {@link BulkInsertPartitioner}s with Rows. 
+ */ +public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHarness { + + @BeforeEach + public void setUp() throws Exception { + initSparkContexts("TestBulkInsertInternalPartitionerForRows"); + initPath(); + initFileSystem(); + } + + @AfterEach + public void tearDown() throws Exception { + cleanupResources(); + } + + private static Stream configParams() { + Object[][] data = new Object[][] { + {BulkInsertSortMode.GLOBAL_SORT, true, true}, + {BulkInsertSortMode.PARTITION_SORT, false, true}, + {BulkInsertSortMode.NONE, false, false} + }; + return Stream.of(data).map(Arguments::of); + } + + @ParameterizedTest(name = "[{index}] {0}") + @MethodSource("configParams") + public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode, + boolean isGloballySorted, boolean isLocallySorted) + throws Exception { + Dataset records1 = generateTestRecords(); + Dataset records2 = generateTestRecords(); + testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode), + records1, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1)); + testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode), + records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2)); + } + + private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner, + Dataset rows, + boolean isGloballySorted, boolean isLocallySorted, + Map expectedPartitionNumRecords) { + int numPartitions = 2; + Dataset actualRecords = (Dataset) partitioner.repartitionRecords(rows, numPartitions); + List collectedActualRecords = actualRecords.collectAsList(); + if (isGloballySorted) { + // Verify global order + verifyRowsAscendingOrder(collectedActualRecords); + } else if (isLocallySorted) { + // Verify local order + actualRecords.mapPartitions((MapPartitionsFunction) input -> { + List partitionRows = new ArrayList<>(); + while (input.hasNext()) { + partitionRows.add(input.next()); + } + verifyRowsAscendingOrder(partitionRows); + return Collections.emptyList().iterator(); + }, SparkDatasetTestUtils.ENCODER); + } + + // Verify number of records per partition path + Map actualPartitionNumRecords = new HashMap<>(); + for (Row record : collectedActualRecords) { + String partitionPath = record.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD); + actualPartitionNumRecords.put(partitionPath, + actualPartitionNumRecords.getOrDefault(partitionPath, 0L) + 1); + } + assertEquals(expectedPartitionNumRecords, actualPartitionNumRecords); + } + + public static Map generateExpectedPartitionNumRecords(Dataset rows) { + Dataset toReturn = rows.groupBy(HoodieRecord.PARTITION_PATH_METADATA_FIELD).count(); + List result = toReturn.collectAsList(); + Map returnMap = new HashMap<>(); + for (Row row : result) { + returnMap.put(row.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD), (Long) row.getAs("count")); + } + return returnMap; + } + + public Dataset generateTestRecords() { + Dataset rowsPart1 = SparkDatasetTestUtils.getRandomRows(sqlContext, 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, false); + Dataset rowsPart2 = SparkDatasetTestUtils.getRandomRows(sqlContext, 150, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, false); + return rowsPart1.union(rowsPart2); + } + + private void verifyRowsAscendingOrder(List records) { + List expectedRecords = new ArrayList<>(records); + Collections.sort(expectedRecords, Comparator.comparing(o -> (o.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" 
+ o.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD)))); + assertEquals(expectedRecords, records); + } + +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java index 52f1be8d713ec..0cdeeaef0ad1b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -49,6 +49,8 @@ import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import java.io.IOException; import java.util.ArrayList; @@ -98,6 +100,25 @@ private static Option createUserDefinedBulkInsertPartitio } } + /** + * Create a UserDefinedBulkInsertPartitionerRows class via reflection, + *
+ * if the class name of UserDefinedBulkInsertPartitioner is configured through the HoodieWriteConfig. + * + * @see HoodieWriteConfig#getUserDefinedBulkInsertPartitionerClass() + */ + public static Option>> createUserDefinedBulkInsertPartitionerWithRows(HoodieWriteConfig config) + throws HoodieException { + String bulkInsertPartitionerClass = config.getUserDefinedBulkInsertPartitionerClass(); + try { + return StringUtils.isNullOrEmpty(bulkInsertPartitionerClass) + ? Option.empty() : + Option.of((BulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass)); + } catch (Throwable e) { + throw new HoodieException("Could not create UserDefinedBulkInsertPartitionerRows class " + bulkInsertPartitionerClass, e); + } + } + /** * Create a payload class via reflection, passing in an ordering/precombine value. */ diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java index eb26c4f3209c4..ac17d943e9a15 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java @@ -31,7 +31,9 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; /** @@ -48,15 +50,17 @@ public class BulkInsertDataInternalWriterHelper { private final HoodieTable hoodieTable; private final HoodieWriteConfig writeConfig; private final StructType structType; + private final Boolean arePartitionRecordsSorted; private final List writeStatusList = new ArrayList<>(); private HoodieRowCreateHandle handle; private String lastKnownPartitionPath = null; private String fileIdPrefix; private int numFilesWritten = 0; + private Map handles = new HashMap<>(); public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteConfig writeConfig, - String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType) { + String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType, boolean arePartitionRecordsSorted) { this.hoodieTable = hoodieTable; this.writeConfig = writeConfig; this.instantTime = instantTime; @@ -64,6 +68,7 @@ public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteCo this.taskId = taskId; this.taskEpochId = taskEpochId; this.structType = structType; + this.arePartitionRecordsSorted = arePartitionRecordsSorted; this.fileIdPrefix = UUID.randomUUID().toString(); } @@ -74,7 +79,7 @@ public void write(InternalRow record) throws IOException { if ((lastKnownPartitionPath == null) || !lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) { LOG.info("Creating new file for partition path " + partitionPath); - createNewHandle(partitionPath); + handle = getRowCreateHandle(partitionPath); lastKnownPartitionPath = partitionPath; } handle.write(record); @@ -92,19 +97,30 @@ public List getWriteStatuses() throws IOException { public void abort() { } - private void createNewHandle(String partitionPath) throws IOException { - if (null != handle) { - close(); + private HoodieRowCreateHandle getRowCreateHandle(String partitionPath) throws IOException { + if (!handles.containsKey(partitionPath)) { // if there is no handle corresponding to the 
partition path + // if records are sorted, we can close all existing handles + if (arePartitionRecordsSorted) { + close(); + } + handles.put(partitionPath, new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(), + instantTime, taskPartitionId, taskId, taskEpochId, structType)); + } else if (!handles.get(partitionPath).canWrite()) { + // even if there is a handle to the partition path, it could have reached its max size threshold. So, we close the handle here and + // create a new one. + writeStatusList.add(handles.remove(partitionPath).close()); + handles.put(partitionPath, new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(), + instantTime, taskPartitionId, taskId, taskEpochId, structType)); } - handle = new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(), - instantTime, taskPartitionId, taskId, taskEpochId, structType); + return handles.get(partitionPath); } public void close() throws IOException { - if (null != handle) { - writeStatusList.add(handle.close()); - handle = null; + for (HoodieRowCreateHandle rowCreateHandle: handles.values()) { + writeStatusList.add(rowCreateHandle.close()); } + handles.clear(); + handle = null; } private String getNextFileId() { diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java index d66a5ee51a0c6..bfc56718a433a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java +++ b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java @@ -30,7 +30,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -61,13 +63,40 @@ public void tearDown() throws Exception { } protected void assertWriteStatuses(List writeStatuses, int batches, int size, - Option> fileAbsPaths, Option> fileNames) { - assertEquals(batches, writeStatuses.size()); + Option> fileAbsPaths, Option> fileNames) { + assertWriteStatuses(writeStatuses, batches, size, false, fileAbsPaths, fileNames); + } + + protected void assertWriteStatuses(List writeStatuses, int batches, int size, boolean areRecordsSorted, + Option> fileAbsPaths, Option> fileNames) { + if (areRecordsSorted) { + assertEquals(batches, writeStatuses.size()); + } else { + assertEquals(Math.min(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS.length, batches), writeStatuses.size()); + } + + Map sizeMap = new HashMap<>(); + if (!areRecordsSorted) { + // no of records are written per batch. Every 4th batch goes into same writeStatus. 
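// (Illustrative aside: with the three default partition paths and, say, batches = 4, batches 0 and 3
// both map to DEFAULT_PARTITION_PATHS[0], so its expected total is 2 * size, while the other two
// partition paths expect size records each.)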
So, populating the size expected + // per write status + for (int i = 0; i < batches; i++) { + String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[i % 3]; + if (!sizeMap.containsKey(partitionPath)) { + sizeMap.put(partitionPath, 0L); + } + sizeMap.put(partitionPath, sizeMap.get(partitionPath) + size); + } + } + int counter = 0; for (HoodieInternalWriteStatus writeStatus : writeStatuses) { // verify write status assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3], writeStatus.getPartitionPath()); - assertEquals(writeStatus.getTotalRecords(), size); + if (areRecordsSorted) { + assertEquals(writeStatus.getTotalRecords(), size); + } else { + assertEquals(writeStatus.getTotalRecords(), sizeMap.get(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3])); + } assertNull(writeStatus.getGlobalError()); assertEquals(writeStatus.getFailedRowsSize(), 0); assertEquals(writeStatus.getTotalErrorRecords(), 0); @@ -82,8 +111,13 @@ protected void assertWriteStatuses(List writeStatuses .substring(writeStatus.getStat().getPath().lastIndexOf('/') + 1)); } HoodieWriteStat writeStat = writeStatus.getStat(); - assertEquals(size, writeStat.getNumInserts()); - assertEquals(size, writeStat.getNumWrites()); + if (areRecordsSorted) { + assertEquals(size, writeStat.getNumInserts()); + assertEquals(size, writeStat.getNumWrites()); + } else { + assertEquals(sizeMap.get(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3]), writeStat.getNumInserts()); + assertEquals(sizeMap.get(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3]), writeStat.getNumWrites()); + } assertEquals(fileId, writeStat.getFileId()); assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter++ % 3], writeStat.getPartitionPath()); assertEquals(0, writeStat.getNumDeletes()); diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java index 2802eb97744ba..c5ea8fe2651d2 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java @@ -18,17 +18,12 @@ package org.apache.hudi; -import static org.apache.spark.sql.functions.callUDF; - -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.keygen.BuiltinKeyGenerator; +import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -40,8 +35,16 @@ import org.apache.spark.sql.functions; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import scala.collection.JavaConverters; +import static org.apache.spark.sql.functions.callUDF; + /** * Helper class to assist in preparing {@link Dataset}s for bulk insert with datasource implementation. */ @@ -60,12 +63,13 @@ public class HoodieDatasetBulkInsertHelper { * 4. 
Sorts input dataset by hoodie partition path and record key * * @param sqlContext SQL Context - * @param config Hoodie Write Config - * @param rows Spark Input dataset + * @param config Hoodie Write Config + * @param rows Spark Input dataset * @return hoodie dataset which is ready for bulk insert. */ public static Dataset prepareHoodieDatasetForBulkInsert(SQLContext sqlContext, - HoodieWriteConfig config, Dataset rows, String structName, String recordNamespace) { + HoodieWriteConfig config, Dataset rows, String structName, String recordNamespace, + BulkInsertPartitioner> bulkInsertPartitionerRows) { List originalFields = Arrays.stream(rows.schema().fields()).map(f -> new Column(f.name())).collect(Collectors.toList()); @@ -101,8 +105,6 @@ public static Dataset prepareHoodieDatasetForBulkInsert(SQLContext sqlConte Dataset colOrderedDataset = rowDatasetWithHoodieColumns.select( JavaConverters.collectionAsScalaIterableConverter(orderedFields).asScala().toSeq()); - return colOrderedDataset - .sort(functions.col(HoodieRecord.PARTITION_PATH_METADATA_FIELD), functions.col(HoodieRecord.RECORD_KEY_METADATA_FIELD)) - .coalesce(config.getBulkInsertShuffleParallelism()); + return bulkInsertPartitionerRows.repartitionRecords(colOrderedDataset, config.getBulkInsertShuffleParallelism()); } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index aa835ff52b137..3da3ddd1bebfd 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -19,7 +19,6 @@ package org.apache.hudi import java.util import java.util.Properties - import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -34,13 +33,15 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.table.timeline.HoodieActiveTimeline import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils} import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP} -import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig} import org.apache.hudi.exception.HoodieException +import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerWithRowsFactory import org.apache.hudi.hive.util.ConfigUtils import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool} import org.apache.hudi.internal.DataSourceInternalWriterHelper import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.hudi.sync.common.AbstractSyncTool +import org.apache.hudi.table.BulkInsertPartitioner import org.apache.log4j.LogManager import org.apache.spark.SPARK_VERSION import org.apache.spark.SparkContext @@ -50,7 +51,7 @@ import org.apache.spark.sql.hudi.HoodieSqlUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession} +import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession} import scala.collection.JavaConversions._ import scala.collection.mutable.ListBuffer @@ -335,7 +336,17 @@ object HoodieSparkSqlWriter { } val 
params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA.key, schema.toString) val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path.get, tblName, mapAsJavaMap(params)) - val hoodieDF = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace) + val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig) + val bulkInsertPartitionerRows : BulkInsertPartitioner[Dataset[Row]] = if (userDefinedBulkInsertPartitionerOpt.isPresent) { + userDefinedBulkInsertPartitionerOpt.get + } + else { + BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig.getBulkInsertSortMode) + } + val arePartitionRecordsSorted = bulkInsertPartitionerRows.arePartitionRecordsSorted(); + parameters.updated(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED, arePartitionRecordsSorted.toString) + val hoodieDF = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace, + bulkInsertPartitionerRows) if (SPARK_VERSION.startsWith("2.")) { hoodieDF.write.format("org.apache.hudi.internal") .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime) diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java index 97948b9ee3176..94f1a69deade3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java @@ -35,6 +35,8 @@ import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -162,6 +164,25 @@ public void testDoWriteOperationWithUserDefinedBulkInsertPartitioner() throws Ho assertThat(optionCaptor.getValue().get(), is(instanceOf(NoOpBulkInsertPartitioner.class))); } + @Test + public void testCreateUserDefinedBulkInsertPartitionerRowsWithInValidPartitioner() throws HoodieException { + config = HoodieWriteConfig.newBuilder().withPath("/").withUserDefinedBulkInsertPartitionerClass("NonExistantUserDefinedClass").build(); + + Exception exception = assertThrows(HoodieException.class, () -> { + DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(config); + }); + + assertThat(exception.getMessage(), containsString("Could not create UserDefinedBulkInsertPartitionerRows")); + } + + @Test + public void testCreateUserDefinedBulkInsertPartitionerRowsWithValidPartitioner() throws HoodieException { + config = HoodieWriteConfig.newBuilder().withPath("/").withUserDefinedBulkInsertPartitionerClass(NoOpBulkInsertPartitionerRows.class.getName()).build(); + + Option>> partitioner = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(config); + assertThat(partitioner.isPresent(), is(true)); + } + private void setAndVerifyHoodieWriteClientWith(final String partitionerClassName) { config = HoodieWriteConfig.newBuilder().withPath(config.getBasePath()) .withUserDefinedBulkInsertPartitionerClass(partitionerClassName) @@ -184,4 +205,18 @@ public boolean arePartitionRecordsSorted() { return false; } } + + public static class NoOpBulkInsertPartitionerRows + implements 
BulkInsertPartitioner> { + + @Override + public Dataset repartitionRecords(Dataset records, int outputSparkPartitions) { + return records; + } + + @Override + public boolean arePartitionRecordsSorted() { + return false; + } + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieDatasetBulkInsertHelper.java index 01242c802055f..d3f0f984a4ff1 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieDatasetBulkInsertHelper.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieDatasetBulkInsertHelper.java @@ -20,6 +20,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.execution.bulkinsert.NonSortPartitionerWithRows; import org.apache.hudi.testutils.DataSourceTestUtils; import org.apache.hudi.testutils.HoodieClientTestBase; @@ -62,7 +63,8 @@ public void testBulkInsertHelper() throws IOException { HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(getPropsAllSet()).build(); List rows = DataSourceTestUtils.generateRandomRows(10); Dataset dataset = sqlContext.createDataFrame(rows, structType); - Dataset result = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName", "testNamespace"); + Dataset result = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName", "testNamespace", + new NonSortPartitionerWithRows()); StructType resultSchema = result.schema(); assertEquals(result.count(), 10); @@ -117,7 +119,8 @@ public void testNoPropsSet() { List rows = DataSourceTestUtils.generateRandomRows(10); Dataset dataset = sqlContext.createDataFrame(rows, structType); try { - HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName", "testNamespace"); + HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName", + "testNamespace", new NonSortPartitionerWithRows()); fail("Should have thrown exception"); } catch (Exception e) { // ignore @@ -127,7 +130,8 @@ public void testNoPropsSet() { rows = DataSourceTestUtils.generateRandomRows(10); dataset = sqlContext.createDataFrame(rows, structType); try { - HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName", "testNamespace"); + HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName", + "testNamespace", new NonSortPartitionerWithRows()); fail("Should have thrown exception"); } catch (Exception e) { // ignore @@ -137,7 +141,8 @@ public void testNoPropsSet() { rows = DataSourceTestUtils.generateRandomRows(10); dataset = sqlContext.createDataFrame(rows, structType); try { - HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName", "testNamespace"); + HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName", + "testNamespace", new NonSortPartitionerWithRows()); fail("Should have thrown exception"); } catch (Exception e) { // ignore @@ -147,7 +152,8 @@ public void testNoPropsSet() { rows = DataSourceTestUtils.generateRandomRows(10); dataset = sqlContext.createDataFrame(rows, structType); try { - 
HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName", "testNamespace"); + HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName", + "testNamespace", new NonSortPartitionerWithRows()); fail("Should have thrown exception"); } catch (Exception e) { // ignore diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala index a06eeb188d75a..268660375c7e4 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala @@ -29,6 +29,7 @@ import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload} import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig} import org.apache.hudi.exception.HoodieException +import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator} import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.testutils.DataSourceTestUtils @@ -119,9 +120,9 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { } } - List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) - .foreach(tableType => { - test("test bulk insert dataset with datasource impl for " + tableType) { + List(BulkInsertSortMode.GLOBAL_SORT.name(), BulkInsertSortMode.NONE.name(), BulkInsertSortMode.PARTITION_SORT.name()) + .foreach(sortMode => { + test("test_bulk_insert_for_" + sortMode) { initSparkContext("test_bulk_insert_datasource") val path = java.nio.file.Files.createTempDirectory("hoodie_test_path") try { @@ -131,7 +132,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { //create a new table val fooTableModifier = Map("path" -> path.toAbsolutePath.toString, HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName, - DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key -> tableType, + DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, "hoodie.bulkinsert.shuffle.parallelism" -> "4", DataSourceWriteOptions.OPERATION_OPT_KEY.key -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL, DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY.key -> "true", @@ -143,7 +144,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { // generate the inserts val schema = DataSourceTestUtils.getStructTypeExampleSchema val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) - val records = DataSourceTestUtils.generateRandomRows(100) + val records = DataSourceTestUtils.generateRandomRows(1000) val recordsSeq = convertRowListToSeq(records) val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType) // write to Hudi diff --git a/hudi-spark-datasource/hudi-spark2/pom.xml b/hudi-spark-datasource/hudi-spark2/pom.xml index 5a04f44321907..38fba35dfa4d4 100644 --- a/hudi-spark-datasource/hudi-spark2/pom.xml +++ b/hudi-spark-datasource/hudi-spark2/pom.xml @@ -253,6 +253,11 @@ junit-jupiter-api test + + org.junit.jupiter + junit-jupiter-params + test + diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java 
b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java index 05739beea6f9b..6ced37149473e 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java +++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java @@ -19,6 +19,7 @@ package org.apache.hudi.internal; import org.apache.hudi.DataSourceUtils; +import org.apache.hudi.config.HoodieInternalConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.spark.sql.SaveMode; @@ -62,7 +63,10 @@ public Optional createWriter(String writeUUID, StructType sche String tblName = options.get(HoodieWriteConfig.TABLE_NAME.key()).get(); // 1st arg to createHooodieConfig is not really reuqired to be set. but passing it anyways. HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(options.get(HoodieWriteConfig.AVRO_SCHEMA.key()).get(), path, tblName, options.asMap()); + boolean arePartitionRecordsSorted = HoodieInternalConfig.getBulkInsertIsPartitionRecordsSorted( + options.get(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED).isPresent() + ? options.get(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED).get() : null); return Optional.of(new HoodieDataSourceInternalWriter(instantTime, config, schema, getSparkSession(), - getConfiguration())); + getConfiguration(), arePartitionRecordsSorted)); } } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java index 3ce8d776a8e68..2555594997c53 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java +++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java @@ -37,9 +37,9 @@ public class HoodieBulkInsertDataInternalWriter implements DataWriter createDataWriter(int partitionId, long taskId, long epochId) { return new HoodieBulkInsertDataInternalWriter(hoodieTable, writeConfig, instantTime, partitionId, taskId, epochId, - structType); + structType, arePartitionRecordsSorted); } } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java index 4b3dafc6264f7..6aa5329f86e8d 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java +++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java @@ -45,12 +45,14 @@ public class HoodieDataSourceInternalWriter implements DataSourceWriter { private final HoodieWriteConfig writeConfig; private final StructType structType; private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper; + private final Boolean arePartitionRecordsSorted; public HoodieDataSourceInternalWriter(String instantTime, HoodieWriteConfig writeConfig, StructType structType, - SparkSession sparkSession, Configuration configuration) { + SparkSession sparkSession, Configuration configuration, boolean arePartitionRecordsSorted) { this.instantTime = instantTime; this.writeConfig = writeConfig; this.structType = structType; + this.arePartitionRecordsSorted = arePartitionRecordsSorted; this.dataSourceInternalWriterHelper = new 
DataSourceInternalWriterHelper(instantTime, writeConfig, structType, sparkSession, configuration); } @@ -60,7 +62,7 @@ public DataWriterFactory createWriterFactory() { dataSourceInternalWriterHelper.createInflightCommit(); if (WriteOperationType.BULK_INSERT == dataSourceInternalWriterHelper.getWriteOperationType()) { return new HoodieBulkInsertDataInternalWriterFactory(dataSourceInternalWriterHelper.getHoodieTable(), - writeConfig, instantTime, structType); + writeConfig, instantTime, structType, arePartitionRecordsSorted); } else { throw new IllegalArgumentException("Write Operation Type + " + dataSourceInternalWriterHelper.getWriteOperationType() + " not supported "); } diff --git a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java index 0b021abeb3b5f..f77763f85d941 100644 --- a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java +++ b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java @@ -28,9 +28,13 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.util.ArrayList; import java.util.List; +import java.util.stream.Stream; import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER; import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE; @@ -46,16 +50,26 @@ public class TestHoodieBulkInsertDataInternalWriter extends HoodieBulkInsertInternalWriterTestBase { - @Test - public void testDataInternalWriter() throws Exception { + private static Stream configParams() { + Object[][] data = new Object[][] { + {true}, + {false} + }; + return Stream.of(data).map(Arguments::of); + } + + @ParameterizedTest + @MethodSource("configParams") + public void testDataInternalWriter(boolean sorted) throws Exception { // init config and table HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); // execute N rounds - for (int i = 0; i < 5; i++) { + for (int i = 0; i < 3; i++) { String instantTime = "00" + i; // init writer - HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), STRUCT_TYPE); + HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), STRUCT_TYPE, + sorted); int size = 10 + RANDOM.nextInt(1000); // write N rows to partition1, N rows to partition2 and N rows to partition3 ... 
Each batch should create a new RowCreateHandle and a new file @@ -78,7 +92,7 @@ public void testDataInternalWriter() throws Exception { Option> fileNames = Option.of(new ArrayList<>()); // verify write statuses - assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, fileAbsPaths, fileNames); + assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, sorted, fileAbsPaths, fileNames); // verify rows Dataset result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0])); @@ -99,7 +113,8 @@ public void testGlobalFailure() throws Exception { String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0]; String instantTime = "001"; - HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), STRUCT_TYPE); + HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), STRUCT_TYPE, + false); int size = 10 + RANDOM.nextInt(100); int totalFailures = 5; @@ -131,7 +146,7 @@ public void testGlobalFailure() throws Exception { Option> fileAbsPaths = Option.of(new ArrayList<>()); Option> fileNames = Option.of(new ArrayList<>()); // verify write statuses - assertWriteStatuses(commitMetadata.getWriteStatuses(), 1, size / 2, fileAbsPaths, fileNames); + assertWriteStatuses(commitMetadata.getWriteStatuses(), 1, size / 2, false, fileAbsPaths, fileNames); // verify rows Dataset result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0])); diff --git a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java index 184ff771cef4a..ca8058ff0af1c 100644 --- a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java +++ b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java @@ -52,7 +52,7 @@ public void testDataSourceWriter() throws Exception { String instantTime = "001"; // init writer HoodieDataSourceInternalWriter dataSourceInternalWriter = - new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf); + new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false); DataWriter writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(0, RANDOM.nextLong(), RANDOM.nextLong()); String[] partitionPaths = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS; @@ -98,7 +98,7 @@ public void testMultipleDataSourceWrites() throws Exception { String instantTime = "00" + i; // init writer HoodieDataSourceInternalWriter dataSourceInternalWriter = - new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf); + new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false); List commitMessages = new ArrayList<>(); Dataset totalInputRows = null; @@ -142,7 +142,7 @@ public void testLargeWrites() throws Exception { String instantTime = "00" + i; // init writer HoodieDataSourceInternalWriter dataSourceInternalWriter = - new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf); + new 
HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false); List commitMessages = new ArrayList<>(); Dataset totalInputRows = null; @@ -189,7 +189,7 @@ public void testAbort() throws Exception { String instantTime0 = "00" + 0; // init writer HoodieDataSourceInternalWriter dataSourceInternalWriter = - new HoodieDataSourceInternalWriter(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf); + new HoodieDataSourceInternalWriter(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false); DataWriter writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(0, RANDOM.nextLong(), RANDOM.nextLong()); List partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS); @@ -227,7 +227,7 @@ public void testAbort() throws Exception { // 2nd batch. abort in the end String instantTime1 = "00" + 1; dataSourceInternalWriter = - new HoodieDataSourceInternalWriter(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf); + new HoodieDataSourceInternalWriter(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false); writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(1, RANDOM.nextLong(), RANDOM.nextLong()); for (int j = 0; j < batches; j++) { diff --git a/hudi-spark-datasource/hudi-spark3/pom.xml b/hudi-spark-datasource/hudi-spark3/pom.xml index 306a377eafffb..a1534b413537f 100644 --- a/hudi-spark-datasource/hudi-spark3/pom.xml +++ b/hudi-spark-datasource/hudi-spark3/pom.xml @@ -226,6 +226,11 @@ junit-jupiter-api test + + org.junit.jupiter + junit-jupiter-params + test + diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java index 029eebbd07f15..d6188f43a493e 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java +++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java @@ -19,6 +19,7 @@ package org.apache.hudi.spark3.internal; import org.apache.hudi.DataSourceUtils; +import org.apache.hudi.config.HoodieInternalConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.internal.BaseDefaultSource; import org.apache.hudi.internal.DataSourceInternalWriterHelper; @@ -47,9 +48,11 @@ public Table getTable(StructType schema, Transform[] partitioning, Map createWriter(int partitionId, long taskId) { return new HoodieBulkInsertDataInternalWriter(hoodieTable, writeConfig, instantTime, partitionId, taskId, - structType); + structType, arePartitionRecordsSorted); } } diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java index b0945156d703d..c89758a377a92 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java +++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java @@ -45,13 +45,15 @@ public class HoodieDataSourceInternalBatchWrite implements BatchWrite { private final String instantTime; private final HoodieWriteConfig writeConfig; private final StructType structType; + private final boolean arePartitionRecordsSorted; private final 
DataSourceInternalWriterHelper dataSourceInternalWriterHelper; public HoodieDataSourceInternalBatchWrite(String instantTime, HoodieWriteConfig writeConfig, StructType structType, - SparkSession jss, Configuration hadoopConfiguration) { + SparkSession jss, Configuration hadoopConfiguration, boolean arePartitionRecordsSorted) { this.instantTime = instantTime; this.writeConfig = writeConfig; this.structType = structType; + this.arePartitionRecordsSorted = arePartitionRecordsSorted; this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType, jss, hadoopConfiguration); } @@ -61,7 +63,7 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { dataSourceInternalWriterHelper.createInflightCommit(); if (WriteOperationType.BULK_INSERT == dataSourceInternalWriterHelper.getWriteOperationType()) { return new HoodieBulkInsertDataInternalWriterFactory(dataSourceInternalWriterHelper.getHoodieTable(), - writeConfig, instantTime, structType); + writeConfig, instantTime, structType, arePartitionRecordsSorted); } else { throw new IllegalArgumentException("Write Operation Type + " + dataSourceInternalWriterHelper.getWriteOperationType() + " not supported "); } diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWriteBuilder.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWriteBuilder.java index 10e2e64f11387..243b04d29409b 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWriteBuilder.java +++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWriteBuilder.java @@ -37,19 +37,21 @@ public class HoodieDataSourceInternalBatchWriteBuilder implements WriteBuilder { private final StructType structType; private final SparkSession jss; private final Configuration hadoopConfiguration; + private final boolean arePartitionRecordsSorted; public HoodieDataSourceInternalBatchWriteBuilder(String instantTime, HoodieWriteConfig writeConfig, StructType structType, - SparkSession jss, Configuration hadoopConfiguration) { + SparkSession jss, Configuration hadoopConfiguration, boolean arePartitionRecordsSorted) { this.instantTime = instantTime; this.writeConfig = writeConfig; this.structType = structType; this.jss = jss; this.hadoopConfiguration = hadoopConfiguration; + this.arePartitionRecordsSorted = arePartitionRecordsSorted; } @Override public BatchWrite buildForBatch() { return new HoodieDataSourceInternalBatchWrite(instantTime, writeConfig, structType, jss, - hadoopConfiguration); + hadoopConfiguration, arePartitionRecordsSorted); } } diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java index ce746e75d9720..436b9c3b221e8 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java +++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java @@ -41,14 +41,16 @@ class HoodieDataSourceInternalTable implements SupportsWrite { private final StructType structType; private final SparkSession jss; private final Configuration hadoopConfiguration; + private final boolean 
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java
index ce746e75d9720..436b9c3b221e8 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java
+++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java
@@ -41,14 +41,16 @@ class HoodieDataSourceInternalTable implements SupportsWrite {
   private final StructType structType;
   private final SparkSession jss;
   private final Configuration hadoopConfiguration;
+  private final boolean arePartitionRecordsSorted;

   public HoodieDataSourceInternalTable(String instantTime, HoodieWriteConfig config,
-      StructType schema, SparkSession jss, Configuration hadoopConfiguration) {
+      StructType schema, SparkSession jss, Configuration hadoopConfiguration, boolean arePartitionRecordsSorted) {
     this.instantTime = instantTime;
     this.writeConfig = config;
     this.structType = schema;
     this.jss = jss;
     this.hadoopConfiguration = hadoopConfiguration;
+    this.arePartitionRecordsSorted = arePartitionRecordsSorted;
   }

   @Override
@@ -73,6 +75,6 @@ public Set<TableCapability> capabilities() {
   @Override
   public WriteBuilder newWriteBuilder(LogicalWriteInfo logicalWriteInfo) {
     return new HoodieDataSourceInternalBatchWriteBuilder(instantTime, writeConfig, structType, jss,
-        hadoopConfiguration);
+        hadoopConfiguration, arePartitionRecordsSorted);
   }
 }
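With the internal table now carrying the flag, whichever component triggers the row-based bulk insert has to supply it as a writer option so that it reaches the properties seen by DefaultSource#getTable. A hypothetical caller-side sketch; the format name is an assumption based on DefaultSource's package, and the other options the internal source requires are omitted here:

import org.apache.hudi.config.HoodieInternalConfig;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

// Hypothetical usage: the rows are assumed to be already repartitioned and sorted by one of
// the row partitioners, so the writer is told it can rely on that ordering.
class RowWriterUsageSketch {

  static void bulkInsertPreSortedRows(Dataset<Row> preparedRows, String basePath) {
    preparedRows.write()
        .format("org.apache.hudi.spark3.internal")  // assumption: package of the internal DefaultSource
        .option(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED, "true")
        .mode(SaveMode.Append)
        .save(basePath);
  }
}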
diff --git a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
index ffb649bd3970a..26685fd8fc567 100644
--- a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
+++ b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
@@ -29,9 +29,13 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;

 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Stream;

 import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER;
 import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE;
@@ -47,8 +51,17 @@
 public class TestHoodieBulkInsertDataInternalWriter extends HoodieBulkInsertInternalWriterTestBase {

-  @Test
-  public void testDataInternalWriter() throws Exception {
+  private static Stream<Arguments> configParams() {
+    Object[][] data = new Object[][] {
+        {true},
+        {false}
+    };
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  @ParameterizedTest
+  @MethodSource("configParams")
+  public void testDataInternalWriter(boolean sorted) throws Exception {
     // init config and table
     HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
@@ -56,7 +69,8 @@ public void testDataInternalWriter() throws Exception {
     for (int i = 0; i < 5; i++) {
       String instantTime = "00" + i;
       // init writer
-      HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), STRUCT_TYPE);
+      HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), STRUCT_TYPE,
+          sorted);

       int size = 10 + RANDOM.nextInt(1000);
       // write N rows to partition1, N rows to partition2 and N rows to partition3 ... Each batch should create a new RowCreateHandle and a new file
@@ -79,7 +93,7 @@ public void testDataInternalWriter() throws Exception {
       Option<List<String>> fileNames = Option.of(new ArrayList<>());

       // verify write statuses
-      assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, fileAbsPaths, fileNames);
+      assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, sorted, fileAbsPaths, fileNames);

       // verify rows
       Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
@@ -100,7 +114,7 @@ public void testGlobalFailure() throws Exception {
     String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
     String instantTime = "001";

-    HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), STRUCT_TYPE);
+    HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), STRUCT_TYPE, false);
     int size = 10 + RANDOM.nextInt(100);
     int totalFailures = 5;
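The test now runs once per value of the sort flag via junit-jupiter-params, which is why the pom gains that dependency. For reference, a stripped-down, self-contained version of the same @ParameterizedTest/@MethodSource pattern (the class and method names here are illustrative only):

import java.util.stream.Stream;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import static org.junit.jupiter.api.Assertions.assertTrue;

// The factory method returns one Arguments instance per invocation, so the test body
// below executes twice: once with sorted = true and once with sorted = false.
class ParameterizedPatternExample {

  private static Stream<Arguments> configParams() {
    return Stream.of(Arguments.of(true), Arguments.of(false));
  }

  @ParameterizedTest
  @MethodSource("configParams")
  void runsOncePerFlagValue(boolean sorted) {
    assertTrue(sorted || !sorted); // trivially passes; stands in for the real assertions
  }
}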
diff --git a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
index 69829ec281a49..bd02663b5ee45 100644
--- a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
+++ b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
@@ -56,7 +56,7 @@ public void testDataSourceWriter() throws Exception {
     String instantTime = "001";
     // init writer
     HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
-        new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf);
+        new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false);
     DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong());

     String[] partitionPaths = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
@@ -103,7 +103,7 @@ public void testMultipleDataSourceWrites() throws Exception {
       String instantTime = "00" + i;
       // init writer
       HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
-          new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf);
+          new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false);
       List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
       Dataset<Row> totalInputRows = null;
@@ -148,7 +148,7 @@ public void testLargeWrites() throws Exception {
       String instantTime = "00" + i;
       // init writer
       HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
-          new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf);
+          new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false);
       List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
       Dataset<Row> totalInputRows = null;
@@ -195,7 +195,7 @@ public void testAbort() throws Exception {
     String instantTime0 = "00" + 0;
     // init writer
     HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
-        new HoodieDataSourceInternalBatchWrite(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf);
+        new HoodieDataSourceInternalBatchWrite(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false);
     DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong());
@@ -234,7 +234,7 @@ public void testAbort() throws Exception {
     // 2nd batch. abort in the end
     String instantTime1 = "00" + 1;
     dataSourceInternalBatchWrite =
-        new HoodieDataSourceInternalBatchWrite(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf);
+        new HoodieDataSourceInternalBatchWrite(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false);
     writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(1, RANDOM.nextLong());

     for (int j = 0; j < batches; j++) {