diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java similarity index 100% rename from hudi-spark-datasource/hudi-spark3.5.x/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/test/java/org/apache/hudi/spark/internal/TestHoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieBulkInsertDataInternalWriter.java similarity index 100% rename from hudi-spark-datasource/hudi-spark3.5.x/src/test/java/org/apache/hudi/spark/internal/TestHoodieBulkInsertDataInternalWriter.java rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieBulkInsertDataInternalWriter.java diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java similarity index 100% rename from hudi-spark-datasource/hudi-spark3.5.x/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java b/hudi-spark-datasource/hudi-spark3.3.x/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java deleted file mode 100644 index ea7e6e65e7cbc..0000000000000 --- a/hudi-spark-datasource/hudi-spark3.3.x/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.internal; - -import org.apache.hudi.DataSourceWriteOptions; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField; -import org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.table.HoodieTableConfig; -import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.testutils.HoodieSparkClientTestHarness; -import org.apache.hudi.testutils.SparkDatasetTestUtils; - -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Random; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -/** - * Base class for TestHoodieBulkInsertDataInternalWriter. - */ -public class HoodieBulkInsertInternalWriterTestBase extends HoodieSparkClientTestHarness { - - protected static final Random RANDOM = new Random(); - - @BeforeEach - public void setUp() throws Exception { - initSparkContexts(); - initPath(); - initHoodieStorage(); - initTestDataGenerator(); - initMetaClient(); - initTimelineService(); - } - - @AfterEach - public void tearDown() throws Exception { - cleanupResources(); - } - - protected HoodieWriteConfig getWriteConfig(boolean populateMetaFields) { - return getWriteConfig(populateMetaFields, DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().defaultValue()); - } - - protected HoodieWriteConfig getWriteConfig(boolean populateMetaFields, String hiveStylePartitioningValue) { - Properties properties = new Properties(); - if (!populateMetaFields) { - properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key(), SparkDatasetTestUtils.RECORD_KEY_FIELD_NAME); - properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), SparkDatasetTestUtils.PARTITION_PATH_FIELD_NAME); - properties.setProperty(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false"); - } - properties.setProperty(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().key(), hiveStylePartitioningValue); - return SparkDatasetTestUtils.getConfigBuilder(basePath, timelineServicePort).withProperties(properties).build(); - } - - protected void assertWriteStatuses(List writeStatuses, int batches, int size, - Option> fileAbsPaths, Option> fileNames) { - assertWriteStatuses(writeStatuses, batches, size, false, fileAbsPaths, fileNames, false); - } - - protected void assertWriteStatuses(List writeStatuses, int batches, int size, boolean areRecordsSorted, - Option> fileAbsPaths, Option> fileNames, boolean isHiveStylePartitioning) { - if (areRecordsSorted) { - assertEquals(batches, writeStatuses.size()); - } else { - assertEquals(Math.min(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS.length, batches), writeStatuses.size()); - } - - Map sizeMap = new HashMap<>(); - if (!areRecordsSorted) { - // no of records are written per batch. Every 4th batch goes into same writeStatus. So, populating the size expected - // per write status - for (int i = 0; i < batches; i++) { - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[i % 3]; - if (!sizeMap.containsKey(partitionPath)) { - sizeMap.put(partitionPath, 0L); - } - sizeMap.put(partitionPath, sizeMap.get(partitionPath) + size); - } - } - - int counter = 0; - for (WriteStatus writeStatus : writeStatuses) { - // verify write status - String actualPartitionPathFormat = isHiveStylePartitioning ? SparkDatasetTestUtils.PARTITION_PATH_FIELD_NAME + "=%s" : "%s"; - assertEquals(String.format(actualPartitionPathFormat, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3]), writeStatus.getPartitionPath()); - if (areRecordsSorted) { - assertEquals(writeStatus.getTotalRecords(), size); - } else { - assertEquals(writeStatus.getTotalRecords(), sizeMap.get(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3])); - } - assertNull(writeStatus.getGlobalError()); - assertEquals(writeStatus.getTotalErrorRecords(), 0); - assertEquals(writeStatus.getTotalErrorRecords(), 0); - assertFalse(writeStatus.hasErrors()); - assertNotNull(writeStatus.getFileId()); - String fileId = writeStatus.getFileId(); - if (fileAbsPaths.isPresent()) { - fileAbsPaths.get().add(basePath + "/" + writeStatus.getStat().getPath()); - } - if (fileNames.isPresent()) { - fileNames.get().add(writeStatus.getStat().getPath() - .substring(writeStatus.getStat().getPath().lastIndexOf('/') + 1)); - } - HoodieWriteStat writeStat = writeStatus.getStat(); - if (areRecordsSorted) { - assertEquals(size, writeStat.getNumInserts()); - assertEquals(size, writeStat.getNumWrites()); - } else { - assertEquals(sizeMap.get(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3]), writeStat.getNumInserts()); - assertEquals(sizeMap.get(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3]), writeStat.getNumWrites()); - } - assertEquals(fileId, writeStat.getFileId()); - assertEquals(String.format(actualPartitionPathFormat, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter++ % 3]), writeStat.getPartitionPath()); - assertEquals(0, writeStat.getNumDeletes()); - assertEquals(0, writeStat.getNumUpdateWrites()); - assertEquals(0, writeStat.getTotalWriteErrors()); - } - } - - protected void assertOutput(Dataset expectedRows, Dataset actualRows, String instantTime, Option> fileNames, - boolean populateMetaColumns) { - if (populateMetaColumns) { - // verify 3 meta fields that are filled in within create handle - actualRows.collectAsList().forEach(entry -> { - assertEquals(entry.get(HoodieMetadataField.COMMIT_TIME_METADATA_FIELD.ordinal()).toString(), instantTime); - assertFalse(entry.isNullAt(HoodieMetadataField.FILENAME_METADATA_FIELD.ordinal())); - if (fileNames.isPresent()) { - assertTrue(fileNames.get().contains(entry.get(HoodieMetadataField.FILENAME_METADATA_FIELD.ordinal()))); - } - assertFalse(entry.isNullAt(HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD.ordinal())); - }); - - // after trimming 2 of the meta fields, rest of the fields should match - Dataset trimmedExpected = expectedRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD); - Dataset trimmedActual = actualRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD); - assertEquals(0, trimmedActual.except(trimmedExpected).count()); - } else { // operation = BULK_INSERT_APPEND_ONLY - // all meta columns are untouched - assertEquals(0, expectedRows.except(actualRows).count()); - } - } -} diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/test/java/org/apache/hudi/spark/internal/TestHoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark3.3.x/src/test/java/org/apache/hudi/spark/internal/TestHoodieBulkInsertDataInternalWriter.java deleted file mode 100644 index 25ac77e6c4ae2..0000000000000 --- a/hudi-spark-datasource/hudi-spark3.3.x/src/test/java/org/apache/hudi/spark/internal/TestHoodieBulkInsertDataInternalWriter.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.spark.internal; - -import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.internal.HoodieBulkInsertInternalWriterTestBase; -import org.apache.hudi.table.HoodieSparkTable; -import org.apache.hudi.table.HoodieTable; - -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.catalyst.InternalRow; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; - -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Stream; - -import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.getInternalRowWithError; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows; -import static org.junit.jupiter.api.Assertions.fail; - -/** - * Unit tests {@link HoodieBulkInsertDataInternalWriter}. - */ -public class TestHoodieBulkInsertDataInternalWriter extends - HoodieBulkInsertInternalWriterTestBase { - - private static Stream configParams() { - Object[][] data = new Object[][] { - {true, true}, - {true, false}, - {false, true}, - {false, false} - }; - return Stream.of(data).map(Arguments::of); - } - - private static Stream bulkInsertTypeParams() { - Object[][] data = new Object[][] { - {true}, - {false} - }; - return Stream.of(data).map(Arguments::of); - } - - @ParameterizedTest - @MethodSource("configParams") - public void testDataInternalWriter(boolean sorted, boolean populateMetaFields) throws Exception { - // init config and table - HoodieWriteConfig cfg = getWriteConfig(populateMetaFields); - HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - // execute N rounds - for (int i = 0; i < 2; i++) { - String instantTime = "00" + i; - // init writer - HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), - RANDOM.nextLong(), STRUCT_TYPE, populateMetaFields, sorted); - - int size = 10 + RANDOM.nextInt(1000); - // write N rows to partition1, N rows to partition2 and N rows to partition3 ... Each batch should create a new RowCreateHandle and a new file - int batches = 3; - Dataset totalInputRows = null; - - for (int j = 0; j < batches; j++) { - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; - Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); - writeRows(inputRows, writer); - if (totalInputRows == null) { - totalInputRows = inputRows; - } else { - totalInputRows = totalInputRows.union(inputRows); - } - } - - HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - Option> fileAbsPaths = Option.of(new ArrayList<>()); - Option> fileNames = Option.of(new ArrayList<>()); - - // verify write statuses - assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, sorted, fileAbsPaths, fileNames, false); - - // verify rows - Dataset result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0])); - assertOutput(totalInputRows, result, instantTime, fileNames, populateMetaFields); - } - } - - - /** - * Issue some corrupted or wrong schematized InternalRow after few valid InternalRows so that global error is thrown. write batch 1 of valid records write batch2 of invalid records which is expected - * to throw Global Error. Verify global error is set appropriately and only first batch of records are written to disk. - */ - @Test - public void testGlobalFailure() throws Exception { - // init config and table - HoodieWriteConfig cfg = getWriteConfig(true); - HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0]; - - String instantTime = "001"; - HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), - RANDOM.nextLong(), STRUCT_TYPE, true, false); - - int size = 10 + RANDOM.nextInt(100); - int totalFailures = 5; - // Generate first batch of valid rows - Dataset inputRows = getRandomRows(sqlContext, size / 2, partitionPath, false); - List internalRows = toInternalRows(inputRows, ENCODER); - - // generate some failures rows - for (int i = 0; i < totalFailures; i++) { - internalRows.add(getInternalRowWithError(partitionPath)); - } - - // generate 2nd batch of valid rows - Dataset inputRows2 = getRandomRows(sqlContext, size / 2, partitionPath, false); - internalRows.addAll(toInternalRows(inputRows2, ENCODER)); - - // issue writes - try { - for (InternalRow internalRow : internalRows) { - writer.write(internalRow); - } - fail("Should have failed"); - } catch (Throwable e) { - // expected - } - - HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - - Option> fileAbsPaths = Option.of(new ArrayList<>()); - Option> fileNames = Option.of(new ArrayList<>()); - // verify write statuses - assertWriteStatuses(commitMetadata.getWriteStatuses(), 1, size / 2, fileAbsPaths, fileNames); - - // verify rows - Dataset result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0])); - assertOutput(inputRows, result, instantTime, fileNames, true); - } - - private void writeRows(Dataset inputRows, HoodieBulkInsertDataInternalWriter writer) - throws Exception { - List internalRows = toInternalRows(inputRows, ENCODER); - // issue writes - for (InternalRow internalRow : internalRows) { - writer.write(internalRow); - } - } -} diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java b/hudi-spark-datasource/hudi-spark3.3.x/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java deleted file mode 100644 index 2364954adae1e..0000000000000 --- a/hudi-spark-datasource/hudi-spark3.3.x/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java +++ /dev/null @@ -1,344 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.spark.internal; - -import org.apache.hudi.DataSourceWriteOptions; -import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.internal.HoodieBulkInsertInternalWriterTestBase; -import org.apache.hudi.table.HoodieSparkTable; -import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.testutils.HoodieClientTestUtils; - -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.connector.write.DataWriter; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Stream; - -import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -/** - * Unit tests {@link HoodieDataSourceInternalBatchWrite}. - */ -public class TestHoodieDataSourceInternalBatchWrite extends - HoodieBulkInsertInternalWriterTestBase { - - private static Stream bulkInsertTypeParams() { - Object[][] data = new Object[][] { - {true}, - {false} - }; - return Stream.of(data).map(Arguments::of); - } - - @ParameterizedTest - @MethodSource("bulkInsertTypeParams") - public void testDataSourceWriter(boolean populateMetaFields) throws Exception { - testDataSourceWriterInternal(Collections.EMPTY_MAP, Collections.EMPTY_MAP, populateMetaFields); - } - - private void testDataSourceWriterInternal(Map extraMetadata, Map expectedExtraMetadata, boolean populateMetaFields) throws Exception { - // init config and table - HoodieWriteConfig cfg = getWriteConfig(populateMetaFields); - HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - String instantTime = "001"; - // init writer - HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite = - new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, extraMetadata, populateMetaFields, false); - DataWriter writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong()); - - String[] partitionPaths = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS; - List partitionPathsAbs = new ArrayList<>(); - for (String partitionPath : partitionPaths) { - partitionPathsAbs.add(basePath + "/" + partitionPath + "/*"); - } - - int size = 10 + RANDOM.nextInt(1000); - int batches = 5; - Dataset totalInputRows = null; - - for (int j = 0; j < batches; j++) { - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; - Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); - writeRows(inputRows, writer); - if (totalInputRows == null) { - totalInputRows = inputRows; - } else { - totalInputRows = totalInputRows.union(inputRows); - } - } - - HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - List commitMessages = new ArrayList<>(); - commitMessages.add(commitMetadata); - dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0])); - - metaClient.reloadActiveTimeline(); - Dataset result = - HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getStorage(), - partitionPathsAbs.toArray(new String[0])); - // verify output - assertOutput(totalInputRows, result, instantTime, Option.empty(), populateMetaFields); - assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), - Option.empty()); - - // verify extra metadata - Option commitMetadataOption = - HoodieClientTestUtils.getCommitMetadataForLatestInstant(metaClient); - assertTrue(commitMetadataOption.isPresent()); - Map actualExtraMetadata = new HashMap<>(); - commitMetadataOption.get().getExtraMetadata().entrySet().stream().filter(entry -> - !entry.getKey().equals(HoodieCommitMetadata.SCHEMA_KEY)) - .forEach(entry -> actualExtraMetadata.put(entry.getKey(), entry.getValue())); - assertEquals(actualExtraMetadata, expectedExtraMetadata); - } - - @Test - public void testDataSourceWriterExtraCommitMetadata() throws Exception { - String commitExtraMetaPrefix = "commit_extra_meta_"; - Map extraMeta = new HashMap<>(); - extraMeta.put(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX().key(), commitExtraMetaPrefix); - extraMeta.put(commitExtraMetaPrefix + "a", "valA"); - extraMeta.put(commitExtraMetaPrefix + "b", "valB"); - extraMeta.put("commit_extra_c", "valC"); // should not be part of commit extra metadata - - Map expectedMetadata = new HashMap<>(); - expectedMetadata.putAll(extraMeta); - expectedMetadata.remove(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX().key()); - expectedMetadata.remove("commit_extra_c"); - - testDataSourceWriterInternal(extraMeta, expectedMetadata, true); - } - - @Test - public void testDataSourceWriterEmptyExtraCommitMetadata() throws Exception { - String commitExtraMetaPrefix = "commit_extra_meta_"; - Map extraMeta = new HashMap<>(); - extraMeta.put(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX().key(), commitExtraMetaPrefix); - extraMeta.put("keyA", "valA"); - extraMeta.put("keyB", "valB"); - extraMeta.put("commit_extra_c", "valC"); - // none of the keys has commit metadata key prefix. - testDataSourceWriterInternal(extraMeta, Collections.EMPTY_MAP, true); - } - - @ParameterizedTest - @MethodSource("bulkInsertTypeParams") - public void testMultipleDataSourceWrites(boolean populateMetaFields) throws Exception { - // init config and table - HoodieWriteConfig cfg = getWriteConfig(populateMetaFields); - HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - int partitionCounter = 0; - - // execute N rounds - for (int i = 0; i < 2; i++) { - String instantTime = "00" + i; - // init writer - HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite = - new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.EMPTY_MAP, populateMetaFields, false); - List commitMessages = new ArrayList<>(); - Dataset totalInputRows = null; - DataWriter writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong()); - - int size = 10 + RANDOM.nextInt(1000); - int batches = 3; // one batch per partition - - for (int j = 0; j < batches; j++) { - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; - Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); - writeRows(inputRows, writer); - if (totalInputRows == null) { - totalInputRows = inputRows; - } else { - totalInputRows = totalInputRows.union(inputRows); - } - } - - HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - commitMessages.add(commitMetadata); - dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0])); - metaClient.reloadActiveTimeline(); - - Dataset result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime, - populateMetaFields, INSTANT_GENERATOR); - - // verify output - assertOutput(totalInputRows, result, instantTime, Option.empty(), populateMetaFields); - assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty()); - } - } - - // Large writes are not required to be executed w/ regular CI jobs. Takes lot of running time. - @Disabled - @ParameterizedTest - @MethodSource("bulkInsertTypeParams") - public void testLargeWrites(boolean populateMetaFields) throws Exception { - // init config and table - HoodieWriteConfig cfg = getWriteConfig(populateMetaFields); - HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - int partitionCounter = 0; - - // execute N rounds - for (int i = 0; i < 3; i++) { - String instantTime = "00" + i; - // init writer - HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite = - new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.EMPTY_MAP, populateMetaFields, false); - List commitMessages = new ArrayList<>(); - Dataset totalInputRows = null; - DataWriter writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong()); - - int size = 10000 + RANDOM.nextInt(10000); - int batches = 3; // one batch per partition - - for (int j = 0; j < batches; j++) { - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; - Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); - writeRows(inputRows, writer); - if (totalInputRows == null) { - totalInputRows = inputRows; - } else { - totalInputRows = totalInputRows.union(inputRows); - } - } - - HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - commitMessages.add(commitMetadata); - dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0])); - metaClient.reloadActiveTimeline(); - - Dataset result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime, - populateMetaFields, INSTANT_GENERATOR); - - // verify output - assertOutput(totalInputRows, result, instantTime, Option.empty(), populateMetaFields); - assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty()); - } - } - - /** - * Tests that DataSourceWriter.abort() will abort the written records of interest write and commit batch1 write and abort batch2 Read of entire dataset should show only records from batch1. - * commit batch1 - * abort batch2 - * verify only records from batch1 is available to read - */ - @ParameterizedTest - @MethodSource("bulkInsertTypeParams") - public void testAbort(boolean populateMetaFields) throws Exception { - // init config and table - HoodieWriteConfig cfg = getWriteConfig(populateMetaFields); - HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - String instantTime0 = "00" + 0; - // init writer - HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite = - new HoodieDataSourceInternalBatchWrite(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.EMPTY_MAP, populateMetaFields, false); - DataWriter writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong()); - - List partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS); - List partitionPathsAbs = new ArrayList<>(); - for (String partitionPath : partitionPaths) { - partitionPathsAbs.add(basePath + "/" + partitionPath + "/*"); - } - - int size = 10 + RANDOM.nextInt(100); - int batches = 1; - Dataset totalInputRows = null; - - for (int j = 0; j < batches; j++) { - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; - Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); - writeRows(inputRows, writer); - if (totalInputRows == null) { - totalInputRows = inputRows; - } else { - totalInputRows = totalInputRows.union(inputRows); - } - } - - HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - List commitMessages = new ArrayList<>(); - commitMessages.add(commitMetadata); - // commit 1st batch - dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0])); - metaClient.reloadActiveTimeline(); - Dataset result = - HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getStorage(), - partitionPathsAbs.toArray(new String[0])); - // verify rows - assertOutput(totalInputRows, result, instantTime0, Option.empty(), populateMetaFields); - assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), - Option.empty()); - - // 2nd batch. abort in the end - String instantTime1 = "00" + 1; - dataSourceInternalBatchWrite = - new HoodieDataSourceInternalBatchWrite(instantTime1, cfg, STRUCT_TYPE, - sqlContext.sparkSession(), storageConf, Collections.EMPTY_MAP, populateMetaFields, - false); - writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null) - .createWriter(1, RANDOM.nextLong()); - - for (int j = 0; j < batches; j++) { - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; - Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); - writeRows(inputRows, writer); - } - - commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - commitMessages = new ArrayList<>(); - commitMessages.add(commitMetadata); - // commit 1st batch - dataSourceInternalBatchWrite.abort(commitMessages.toArray(new HoodieWriterCommitMessage[0])); - metaClient.reloadActiveTimeline(); - result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getStorage(), - partitionPathsAbs.toArray(new String[0])); - // verify rows - // only rows from first batch should be present - assertOutput(totalInputRows, result, instantTime0, Option.empty(), populateMetaFields); - } - - private void writeRows(Dataset inputRows, DataWriter writer) throws Exception { - List internalRows = toInternalRows(inputRows, ENCODER); - // issue writes - for (InternalRow internalRow : internalRows) { - writer.write(internalRow); - } - } -} diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java b/hudi-spark-datasource/hudi-spark3.4.x/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java deleted file mode 100644 index ea7e6e65e7cbc..0000000000000 --- a/hudi-spark-datasource/hudi-spark3.4.x/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.internal; - -import org.apache.hudi.DataSourceWriteOptions; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField; -import org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.table.HoodieTableConfig; -import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.testutils.HoodieSparkClientTestHarness; -import org.apache.hudi.testutils.SparkDatasetTestUtils; - -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Random; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -/** - * Base class for TestHoodieBulkInsertDataInternalWriter. - */ -public class HoodieBulkInsertInternalWriterTestBase extends HoodieSparkClientTestHarness { - - protected static final Random RANDOM = new Random(); - - @BeforeEach - public void setUp() throws Exception { - initSparkContexts(); - initPath(); - initHoodieStorage(); - initTestDataGenerator(); - initMetaClient(); - initTimelineService(); - } - - @AfterEach - public void tearDown() throws Exception { - cleanupResources(); - } - - protected HoodieWriteConfig getWriteConfig(boolean populateMetaFields) { - return getWriteConfig(populateMetaFields, DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().defaultValue()); - } - - protected HoodieWriteConfig getWriteConfig(boolean populateMetaFields, String hiveStylePartitioningValue) { - Properties properties = new Properties(); - if (!populateMetaFields) { - properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key(), SparkDatasetTestUtils.RECORD_KEY_FIELD_NAME); - properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), SparkDatasetTestUtils.PARTITION_PATH_FIELD_NAME); - properties.setProperty(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false"); - } - properties.setProperty(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().key(), hiveStylePartitioningValue); - return SparkDatasetTestUtils.getConfigBuilder(basePath, timelineServicePort).withProperties(properties).build(); - } - - protected void assertWriteStatuses(List writeStatuses, int batches, int size, - Option> fileAbsPaths, Option> fileNames) { - assertWriteStatuses(writeStatuses, batches, size, false, fileAbsPaths, fileNames, false); - } - - protected void assertWriteStatuses(List writeStatuses, int batches, int size, boolean areRecordsSorted, - Option> fileAbsPaths, Option> fileNames, boolean isHiveStylePartitioning) { - if (areRecordsSorted) { - assertEquals(batches, writeStatuses.size()); - } else { - assertEquals(Math.min(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS.length, batches), writeStatuses.size()); - } - - Map sizeMap = new HashMap<>(); - if (!areRecordsSorted) { - // no of records are written per batch. Every 4th batch goes into same writeStatus. So, populating the size expected - // per write status - for (int i = 0; i < batches; i++) { - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[i % 3]; - if (!sizeMap.containsKey(partitionPath)) { - sizeMap.put(partitionPath, 0L); - } - sizeMap.put(partitionPath, sizeMap.get(partitionPath) + size); - } - } - - int counter = 0; - for (WriteStatus writeStatus : writeStatuses) { - // verify write status - String actualPartitionPathFormat = isHiveStylePartitioning ? SparkDatasetTestUtils.PARTITION_PATH_FIELD_NAME + "=%s" : "%s"; - assertEquals(String.format(actualPartitionPathFormat, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3]), writeStatus.getPartitionPath()); - if (areRecordsSorted) { - assertEquals(writeStatus.getTotalRecords(), size); - } else { - assertEquals(writeStatus.getTotalRecords(), sizeMap.get(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3])); - } - assertNull(writeStatus.getGlobalError()); - assertEquals(writeStatus.getTotalErrorRecords(), 0); - assertEquals(writeStatus.getTotalErrorRecords(), 0); - assertFalse(writeStatus.hasErrors()); - assertNotNull(writeStatus.getFileId()); - String fileId = writeStatus.getFileId(); - if (fileAbsPaths.isPresent()) { - fileAbsPaths.get().add(basePath + "/" + writeStatus.getStat().getPath()); - } - if (fileNames.isPresent()) { - fileNames.get().add(writeStatus.getStat().getPath() - .substring(writeStatus.getStat().getPath().lastIndexOf('/') + 1)); - } - HoodieWriteStat writeStat = writeStatus.getStat(); - if (areRecordsSorted) { - assertEquals(size, writeStat.getNumInserts()); - assertEquals(size, writeStat.getNumWrites()); - } else { - assertEquals(sizeMap.get(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3]), writeStat.getNumInserts()); - assertEquals(sizeMap.get(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3]), writeStat.getNumWrites()); - } - assertEquals(fileId, writeStat.getFileId()); - assertEquals(String.format(actualPartitionPathFormat, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter++ % 3]), writeStat.getPartitionPath()); - assertEquals(0, writeStat.getNumDeletes()); - assertEquals(0, writeStat.getNumUpdateWrites()); - assertEquals(0, writeStat.getTotalWriteErrors()); - } - } - - protected void assertOutput(Dataset expectedRows, Dataset actualRows, String instantTime, Option> fileNames, - boolean populateMetaColumns) { - if (populateMetaColumns) { - // verify 3 meta fields that are filled in within create handle - actualRows.collectAsList().forEach(entry -> { - assertEquals(entry.get(HoodieMetadataField.COMMIT_TIME_METADATA_FIELD.ordinal()).toString(), instantTime); - assertFalse(entry.isNullAt(HoodieMetadataField.FILENAME_METADATA_FIELD.ordinal())); - if (fileNames.isPresent()) { - assertTrue(fileNames.get().contains(entry.get(HoodieMetadataField.FILENAME_METADATA_FIELD.ordinal()))); - } - assertFalse(entry.isNullAt(HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD.ordinal())); - }); - - // after trimming 2 of the meta fields, rest of the fields should match - Dataset trimmedExpected = expectedRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD); - Dataset trimmedActual = actualRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD); - assertEquals(0, trimmedActual.except(trimmedExpected).count()); - } else { // operation = BULK_INSERT_APPEND_ONLY - // all meta columns are untouched - assertEquals(0, expectedRows.except(actualRows).count()); - } - } -} diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/test/java/org/apache/hudi/spark/internal/TestHoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark3.4.x/src/test/java/org/apache/hudi/spark/internal/TestHoodieBulkInsertDataInternalWriter.java deleted file mode 100644 index 25ac77e6c4ae2..0000000000000 --- a/hudi-spark-datasource/hudi-spark3.4.x/src/test/java/org/apache/hudi/spark/internal/TestHoodieBulkInsertDataInternalWriter.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.spark.internal; - -import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.internal.HoodieBulkInsertInternalWriterTestBase; -import org.apache.hudi.table.HoodieSparkTable; -import org.apache.hudi.table.HoodieTable; - -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.catalyst.InternalRow; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; - -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Stream; - -import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.getInternalRowWithError; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows; -import static org.junit.jupiter.api.Assertions.fail; - -/** - * Unit tests {@link HoodieBulkInsertDataInternalWriter}. - */ -public class TestHoodieBulkInsertDataInternalWriter extends - HoodieBulkInsertInternalWriterTestBase { - - private static Stream configParams() { - Object[][] data = new Object[][] { - {true, true}, - {true, false}, - {false, true}, - {false, false} - }; - return Stream.of(data).map(Arguments::of); - } - - private static Stream bulkInsertTypeParams() { - Object[][] data = new Object[][] { - {true}, - {false} - }; - return Stream.of(data).map(Arguments::of); - } - - @ParameterizedTest - @MethodSource("configParams") - public void testDataInternalWriter(boolean sorted, boolean populateMetaFields) throws Exception { - // init config and table - HoodieWriteConfig cfg = getWriteConfig(populateMetaFields); - HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - // execute N rounds - for (int i = 0; i < 2; i++) { - String instantTime = "00" + i; - // init writer - HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), - RANDOM.nextLong(), STRUCT_TYPE, populateMetaFields, sorted); - - int size = 10 + RANDOM.nextInt(1000); - // write N rows to partition1, N rows to partition2 and N rows to partition3 ... Each batch should create a new RowCreateHandle and a new file - int batches = 3; - Dataset totalInputRows = null; - - for (int j = 0; j < batches; j++) { - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; - Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); - writeRows(inputRows, writer); - if (totalInputRows == null) { - totalInputRows = inputRows; - } else { - totalInputRows = totalInputRows.union(inputRows); - } - } - - HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - Option> fileAbsPaths = Option.of(new ArrayList<>()); - Option> fileNames = Option.of(new ArrayList<>()); - - // verify write statuses - assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, sorted, fileAbsPaths, fileNames, false); - - // verify rows - Dataset result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0])); - assertOutput(totalInputRows, result, instantTime, fileNames, populateMetaFields); - } - } - - - /** - * Issue some corrupted or wrong schematized InternalRow after few valid InternalRows so that global error is thrown. write batch 1 of valid records write batch2 of invalid records which is expected - * to throw Global Error. Verify global error is set appropriately and only first batch of records are written to disk. - */ - @Test - public void testGlobalFailure() throws Exception { - // init config and table - HoodieWriteConfig cfg = getWriteConfig(true); - HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0]; - - String instantTime = "001"; - HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), - RANDOM.nextLong(), STRUCT_TYPE, true, false); - - int size = 10 + RANDOM.nextInt(100); - int totalFailures = 5; - // Generate first batch of valid rows - Dataset inputRows = getRandomRows(sqlContext, size / 2, partitionPath, false); - List internalRows = toInternalRows(inputRows, ENCODER); - - // generate some failures rows - for (int i = 0; i < totalFailures; i++) { - internalRows.add(getInternalRowWithError(partitionPath)); - } - - // generate 2nd batch of valid rows - Dataset inputRows2 = getRandomRows(sqlContext, size / 2, partitionPath, false); - internalRows.addAll(toInternalRows(inputRows2, ENCODER)); - - // issue writes - try { - for (InternalRow internalRow : internalRows) { - writer.write(internalRow); - } - fail("Should have failed"); - } catch (Throwable e) { - // expected - } - - HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - - Option> fileAbsPaths = Option.of(new ArrayList<>()); - Option> fileNames = Option.of(new ArrayList<>()); - // verify write statuses - assertWriteStatuses(commitMetadata.getWriteStatuses(), 1, size / 2, fileAbsPaths, fileNames); - - // verify rows - Dataset result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0])); - assertOutput(inputRows, result, instantTime, fileNames, true); - } - - private void writeRows(Dataset inputRows, HoodieBulkInsertDataInternalWriter writer) - throws Exception { - List internalRows = toInternalRows(inputRows, ENCODER); - // issue writes - for (InternalRow internalRow : internalRows) { - writer.write(internalRow); - } - } -} diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java b/hudi-spark-datasource/hudi-spark3.4.x/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java deleted file mode 100644 index 2afeec82ae79b..0000000000000 --- a/hudi-spark-datasource/hudi-spark3.4.x/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java +++ /dev/null @@ -1,338 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.spark.internal; - -import org.apache.hudi.DataSourceWriteOptions; -import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.internal.HoodieBulkInsertInternalWriterTestBase; -import org.apache.hudi.table.HoodieSparkTable; -import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.testutils.HoodieClientTestUtils; - -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.connector.write.DataWriter; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Stream; - -import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -/** - * Unit tests {@link HoodieDataSourceInternalBatchWrite}. - */ -public class TestHoodieDataSourceInternalBatchWrite extends - HoodieBulkInsertInternalWriterTestBase { - - private static Stream bulkInsertTypeParams() { - Object[][] data = new Object[][] { - {true}, - {false} - }; - return Stream.of(data).map(Arguments::of); - } - - @ParameterizedTest - @MethodSource("bulkInsertTypeParams") - public void testDataSourceWriter(boolean populateMetaFields) throws Exception { - testDataSourceWriterInternal(Collections.EMPTY_MAP, Collections.EMPTY_MAP, populateMetaFields); - } - - private void testDataSourceWriterInternal(Map extraMetadata, Map expectedExtraMetadata, boolean populateMetaFields) throws Exception { - // init config and table - HoodieWriteConfig cfg = getWriteConfig(populateMetaFields); - HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - String instantTime = "001"; - // init writer - HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite = - new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, extraMetadata, populateMetaFields, false); - DataWriter writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong()); - - String[] partitionPaths = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS; - List partitionPathsAbs = new ArrayList<>(); - for (String partitionPath : partitionPaths) { - partitionPathsAbs.add(basePath + "/" + partitionPath + "/*"); - } - - int size = 10 + RANDOM.nextInt(1000); - int batches = 5; - Dataset totalInputRows = null; - - for (int j = 0; j < batches; j++) { - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; - Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); - writeRows(inputRows, writer); - if (totalInputRows == null) { - totalInputRows = inputRows; - } else { - totalInputRows = totalInputRows.union(inputRows); - } - } - - HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - List commitMessages = new ArrayList<>(); - commitMessages.add(commitMetadata); - dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0])); - - metaClient.reloadActiveTimeline(); - Dataset result = HoodieClientTestUtils.read( - jsc, basePath, sqlContext, metaClient.getStorage(), partitionPathsAbs.toArray(new String[0])); - // verify output - assertOutput(totalInputRows, result, instantTime, Option.empty(), populateMetaFields); - assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty()); - - // verify extra metadata - Option commitMetadataOption = - HoodieClientTestUtils.getCommitMetadataForLatestInstant(metaClient); - assertTrue(commitMetadataOption.isPresent()); - Map actualExtraMetadata = new HashMap<>(); - commitMetadataOption.get().getExtraMetadata().entrySet().stream().filter(entry -> - !entry.getKey().equals(HoodieCommitMetadata.SCHEMA_KEY)) - .forEach(entry -> actualExtraMetadata.put(entry.getKey(), entry.getValue())); - assertEquals(actualExtraMetadata, expectedExtraMetadata); - } - - @Test - public void testDataSourceWriterExtraCommitMetadata() throws Exception { - String commitExtraMetaPrefix = "commit_extra_meta_"; - Map extraMeta = new HashMap<>(); - extraMeta.put(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX().key(), commitExtraMetaPrefix); - extraMeta.put(commitExtraMetaPrefix + "a", "valA"); - extraMeta.put(commitExtraMetaPrefix + "b", "valB"); - extraMeta.put("commit_extra_c", "valC"); // should not be part of commit extra metadata - - Map expectedMetadata = new HashMap<>(); - expectedMetadata.putAll(extraMeta); - expectedMetadata.remove(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX().key()); - expectedMetadata.remove("commit_extra_c"); - - testDataSourceWriterInternal(extraMeta, expectedMetadata, true); - } - - @Test - public void testDataSourceWriterEmptyExtraCommitMetadata() throws Exception { - String commitExtraMetaPrefix = "commit_extra_meta_"; - Map extraMeta = new HashMap<>(); - extraMeta.put(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX().key(), commitExtraMetaPrefix); - extraMeta.put("keyA", "valA"); - extraMeta.put("keyB", "valB"); - extraMeta.put("commit_extra_c", "valC"); - // none of the keys has commit metadata key prefix. - testDataSourceWriterInternal(extraMeta, Collections.EMPTY_MAP, true); - } - - @ParameterizedTest - @MethodSource("bulkInsertTypeParams") - public void testMultipleDataSourceWrites(boolean populateMetaFields) throws Exception { - // init config and table - HoodieWriteConfig cfg = getWriteConfig(populateMetaFields); - HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - int partitionCounter = 0; - - // execute N rounds - for (int i = 0; i < 2; i++) { - String instantTime = "00" + i; - // init writer - HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite = - new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.EMPTY_MAP, populateMetaFields, false); - List commitMessages = new ArrayList<>(); - Dataset totalInputRows = null; - DataWriter writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong()); - - int size = 10 + RANDOM.nextInt(1000); - int batches = 3; // one batch per partition - - for (int j = 0; j < batches; j++) { - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; - Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); - writeRows(inputRows, writer); - if (totalInputRows == null) { - totalInputRows = inputRows; - } else { - totalInputRows = totalInputRows.union(inputRows); - } - } - - HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - commitMessages.add(commitMetadata); - dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0])); - metaClient.reloadActiveTimeline(); - - Dataset result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), - instantTime, populateMetaFields, INSTANT_GENERATOR); - - // verify output - assertOutput(totalInputRows, result, instantTime, Option.empty(), populateMetaFields); - assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty()); - } - } - - // Large writes are not required to be executed w/ regular CI jobs. Takes lot of running time. - @Disabled - @ParameterizedTest - @MethodSource("bulkInsertTypeParams") - public void testLargeWrites(boolean populateMetaFields) throws Exception { - // init config and table - HoodieWriteConfig cfg = getWriteConfig(populateMetaFields); - HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - int partitionCounter = 0; - - // execute N rounds - for (int i = 0; i < 3; i++) { - String instantTime = "00" + i; - // init writer - HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite = - new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.EMPTY_MAP, populateMetaFields, false); - List commitMessages = new ArrayList<>(); - Dataset totalInputRows = null; - DataWriter writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong()); - - int size = 10000 + RANDOM.nextInt(10000); - int batches = 3; // one batch per partition - - for (int j = 0; j < batches; j++) { - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; - Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); - writeRows(inputRows, writer); - if (totalInputRows == null) { - totalInputRows = inputRows; - } else { - totalInputRows = totalInputRows.union(inputRows); - } - } - - HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - commitMessages.add(commitMetadata); - dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0])); - metaClient.reloadActiveTimeline(); - - Dataset result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime, - populateMetaFields, INSTANT_GENERATOR); - - // verify output - assertOutput(totalInputRows, result, instantTime, Option.empty(), populateMetaFields); - assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty()); - } - } - - /** - * Tests that DataSourceWriter.abort() will abort the written records of interest write and commit batch1 write and abort batch2 Read of entire dataset should show only records from batch1. - * commit batch1 - * abort batch2 - * verify only records from batch1 is available to read - */ - @ParameterizedTest - @MethodSource("bulkInsertTypeParams") - public void testAbort(boolean populateMetaFields) throws Exception { - // init config and table - HoodieWriteConfig cfg = getWriteConfig(populateMetaFields); - HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - String instantTime0 = "00" + 0; - // init writer - HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite = - new HoodieDataSourceInternalBatchWrite(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.EMPTY_MAP, populateMetaFields, false); - DataWriter writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong()); - - List partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS); - List partitionPathsAbs = new ArrayList<>(); - for (String partitionPath : partitionPaths) { - partitionPathsAbs.add(basePath + "/" + partitionPath + "/*"); - } - - int size = 10 + RANDOM.nextInt(100); - int batches = 1; - Dataset totalInputRows = null; - - for (int j = 0; j < batches; j++) { - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; - Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); - writeRows(inputRows, writer); - if (totalInputRows == null) { - totalInputRows = inputRows; - } else { - totalInputRows = totalInputRows.union(inputRows); - } - } - - HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - List commitMessages = new ArrayList<>(); - commitMessages.add(commitMetadata); - // commit 1st batch - dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0])); - metaClient.reloadActiveTimeline(); - Dataset result = HoodieClientTestUtils.read( - jsc, basePath, sqlContext, metaClient.getStorage(), partitionPathsAbs.toArray(new String[0])); - // verify rows - assertOutput(totalInputRows, result, instantTime0, Option.empty(), populateMetaFields); - assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty()); - - // 2nd batch. abort in the end - String instantTime1 = "00" + 1; - dataSourceInternalBatchWrite = - new HoodieDataSourceInternalBatchWrite(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, - Collections.EMPTY_MAP, populateMetaFields, false); - writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(1, RANDOM.nextLong()); - - for (int j = 0; j < batches; j++) { - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; - Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); - writeRows(inputRows, writer); - } - - commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - commitMessages = new ArrayList<>(); - commitMessages.add(commitMetadata); - // commit 1st batch - dataSourceInternalBatchWrite.abort(commitMessages.toArray(new HoodieWriterCommitMessage[0])); - metaClient.reloadActiveTimeline(); - result = HoodieClientTestUtils.read( - jsc, basePath, sqlContext, metaClient.getStorage(), partitionPathsAbs.toArray(new String[0])); - // verify rows - // only rows from first batch should be present - assertOutput(totalInputRows, result, instantTime0, Option.empty(), populateMetaFields); - } - - private void writeRows(Dataset inputRows, DataWriter writer) throws Exception { - List internalRows = toInternalRows(inputRows, ENCODER); - // issue writes - for (InternalRow internalRow : internalRows) { - writer.write(internalRow); - } - } -}