diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 828980506bd5c..e41a17368242a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -555,6 +555,10 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRecord> records,
       case INSERT_OVERWRITE_TABLE:
         writeStatusRDD = writeClient.insertOverwriteTable(records, instantTime).getWriteStatuses();
         break;
+      case DELETE_PARTITION:
+        List<String> partitions = records.map(record -> record.getPartitionPath()).distinct().collect();
+        writeStatusRDD = writeClient.deletePartitions(partitions, instantTime).getWriteStatuses();
+        break;
       default:
         throw new HoodieDeltaStreamerException("Unknown operation : " + cfg.operation);
     }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
index b7e6f1870df44..62078cf159b5e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
@@ -171,7 +171,7 @@ protected static void writeCommonPropsToFile(FileSystem dfs, String dfsBasePath)
     props.setProperty("include", "sql-transformer.properties");
     props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
-    props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
     props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
     props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 7fabcaeb6f720..3119d0b9cf5e3 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -258,6 +258,19 @@ static void assertRecordCount(long expected, String tablePath, SQLContext sqlCon
       assertEquals(expected, recordCount);
     }
 
+    static Map<String, Long> getPartitionRecordCount(String basePath, SQLContext sqlContext) {
+      sqlContext.clearCache();
+      List<Row> rows = sqlContext.read().format("org.apache.hudi").load(basePath).groupBy(HoodieRecord.PARTITION_PATH_METADATA_FIELD).count().collectAsList();
+      Map<String, Long> partitionRecordCount = new HashMap<>();
+      rows.stream().forEach(row -> partitionRecordCount.put(row.getString(0), row.getLong(1)));
+      return partitionRecordCount;
+    }
+
+    static void assertNoPartitionMatch(String basePath, SQLContext sqlContext, String partitionToValidate) {
+      sqlContext.clearCache();
+      assertEquals(0, sqlContext.read().format("org.apache.hudi").load(basePath).filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " = '" + partitionToValidate + "'").count());
+    }
+
     static void assertDistinctRecordCount(long expected, String tablePath, SQLContext sqlContext) {
       sqlContext.clearCache();
       long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).select("_hoodie_record_key").distinct().count();
@@ -1356,6 +1369,13 @@ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTrans
 
   private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
                                        String propsFileName, String parquetSourceRoot, boolean addCommonProps) throws IOException {
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps,
+        "not_there");
+  }
+
+  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
+                                       String propsFileName, String parquetSourceRoot, boolean addCommonProps,
+                                       String partitionPath) throws IOException {
     // Properties used for testing delta-streamer with Parquet source
     TypedProperties parquetProps = new TypedProperties();
 
@@ -1366,7 +1386,7 @@ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTrans
     parquetProps.setProperty("include", "base.properties");
     parquetProps.setProperty("hoodie.embed.timeline.server", "false");
     parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
-    parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", partitionPath);
     if (useSchemaProvider) {
       parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + sourceSchemaFile);
       if (hasTransformer) {
@@ -1798,6 +1818,31 @@ public void testInsertOverwriteTable() throws Exception {
     testDeltaStreamerWithSpecifiedOperation(dfsBasePath + "/insert_overwrite_table", WriteOperationType.INSERT_OVERWRITE_TABLE);
   }
 
+  @Test
+  public void testDeletePartitions() throws Exception {
+    prepareParquetDFSSource(false, false, "source.avsc", "target.avsc",
+        PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "partition_path");
+    String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+            null, PROPS_FILENAME_TEST_PARQUET, false,
+            false, 100000, false, null, null, "timestamp", null), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext);
+    testNum++;
+
+    prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
+    prepareParquetDFSSource(false, false);
+    // set write operation to DELETE_PARTITION and add a transformer that keeps only records in HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH
+    deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.DELETE_PARTITION, ParquetDFSSource.class.getName(),
+            Collections.singletonList(TestSpecificPartitionTransformer.class.getName()), PROPS_FILENAME_TEST_PARQUET, false,
+            false, 100000, false, null, null, "timestamp", null), jsc);
+    deltaStreamer.sync();
+    // No records should remain in HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH after the delete.
+    TestHelpers.assertNoPartitionMatch(tableBasePath, sqlContext, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+  }
+
   void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, WriteOperationType operationType) throws Exception {
     // Initial insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
@@ -1944,6 +1989,16 @@ public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
     }
   }
 
+  public static class TestSpecificPartitionTransformer implements Transformer {
+
+    @Override
+    public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
+                              TypedProperties properties) {
+      Dataset<Row> toReturn = rowDataset.filter("partition_path == '" + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "'");
+      return toReturn;
+    }
+  }
+
   /**
    * Add new field evoluted_optional_union_field with value of the field rider.
    */
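
For reference, a user-defined transformer paired with the new DELETE_PARTITION operation might look like the sketch below. It mirrors TestSpecificPartitionTransformer from the patch: the transformer narrows the incoming batch to the partition(s) to drop, and the new DeltaSync branch then collects the distinct partition paths of those records and passes them to writeClient.deletePartitions. The class name, the "partition_path" column, the example partition value, and the import paths are illustrative assumptions (they may differ by Hudi version and source schema), not part of this change.

// Minimal sketch of a partition-dropping transformer, modeled on TestSpecificPartitionTransformer above.
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.transform.Transformer;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DropSinglePartitionTransformer implements Transformer {

  // Hypothetical example value; point this at the partition that should be removed.
  private static final String PARTITION_TO_DROP = "2016/03/15";

  @Override
  public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
                            TypedProperties properties) {
    // DeltaSync derives the partitions to delete from the records that reach the sink,
    // so the transformer only needs to keep the rows belonging to the doomed partition(s).
    // "partition_path" is the partition field of the test schema; substitute your own.
    return rowDataset.filter("partition_path = '" + PARTITION_TO_DROP + "'");
  }
}

Such a transformer would be wired up the same way the test wires TestSpecificPartitionTransformer: registered as the delta streamer's transformer class (e.g. via the existing --transformer-class option) while running with --op DELETE_PARTITION, so that only records from the partitions to be dropped ever reach writeToSink.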