@@ -555,6 +555,10 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRec
case INSERT_OVERWRITE_TABLE:
writeStatusRDD = writeClient.insertOverwriteTable(records, instantTime).getWriteStatuses();
break;
case DELETE_PARTITION:
List<String> partitions = records.map(record -> record.getPartitionPath()).distinct().collect();
Contributor:
If my data source is Kafka, does that mean the list of partitions to delete should be received from Kafka? If the answer is yes, it may be cumbersome to combine the ingestion workflow with data and partition deletion.

Contributor Author:
Yes, that's what I have been trying to convey for a long time :) DeltaStreamer (as the name suggests) is meant for incremental, continual ingestion of data from some source. It runs in cycles of fetch from source -> ingest into Hudi -> repeat. So, in general, I don't see how one would fetch data from a source and then trigger partition deletes.

But we have a patch for an independent tool if you are interested; I guess that would help you. It is a spark-submit command as well, though.
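
For illustration, a minimal sketch of that ingestion-driven flow pointed at the new operation; the Config field names are assumptions based on HoodieDeltaStreamer.Config, the transformer class name is hypothetical, and imports are omitted:

// Sketch only: drive DELETE_PARTITION from a regular DeltaStreamer sync. The partitions that get
// dropped are derived from whatever records the source (plus transformer) delivers in this cycle.
public static void deletePartitionsViaDeltaStreamer(JavaSparkContext jsc) throws Exception {
  HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
  cfg.targetBasePath = "s3a://bucket/data/sometable";   // example path
  cfg.targetTableName = "sometable";
  cfg.sourceClassName = ParquetDFSSource.class.getName();
  // hypothetical transformer that keeps only records in the partitions to drop
  cfg.transformerClassNames = Collections.singletonList("com.example.PartitionFilterTransformer");
  cfg.operation = WriteOperationType.DELETE_PARTITION;
  new HoodieDeltaStreamer(cfg, jsc).sync();
}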

Contributor:
My data source is Avro encoded and the schema URL can't change, so it may be hard to bypass it in this case.

Contributor Author:
OK. So, does hudi-cli work for you? I can try to add support there.

Contributor:
It would be nice if it were a DeltaStreamer CLI parameter, something that gets executed at the end of the ingestion. hudi-cli may work if it can be done as a single CLI command. For example:

hudi-cli delete-partitions --schema-name someschema --table-name sometable --location s3a://bucket/data --hive-server xxxxx --metastore=xxx --do-not-delete-data

--do-not-delete-data may be helpful for deleting data faster. I use a Go tool that spins up hundreds of goroutines to delete hundreds of thousands of files within seconds.

Contributor Author:
This is against the general model of how DeltaStreamer works: it consumes records from a given source and ingests them into Hudi. So, I don't think we can support a CLI-based argument for what is more of a one-time standalone tool/job. I would suggest you use the standalone tool instead.

writeStatusRDD = writeClient.deletePartitions(partitions, instantTime).getWriteStatuses();
break;
default:
throw new HoodieDeltaStreamerException("Unknown operation : " + cfg.operation);
}
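
The new branch above collects the distinct partition paths present in the incoming batch and hands them to the write client. Below is a minimal sketch of that same write-client call on its own, assuming the deletePartitions(List<String>, String) signature used in this hunk; the partition values are examples, and writeClient and instantTime are assumed to be in scope as in writeToSink:

// Sketch, not part of this diff: the write-client call the DELETE_PARTITION branch relies on.
List<String> partitionsToDrop = Arrays.asList("2016/03/15", "2016/03/16"); // example partition paths
JavaRDD<WriteStatus> statuses =
    writeClient.deletePartitions(partitionsToDrop, instantTime).getWriteStatuses();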
@@ -171,7 +171,7 @@ protected static void writeCommonPropsToFile(FileSystem dfs, String dfsBasePath)
props.setProperty("include", "sql-transformer.properties");
props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");

@@ -258,6 +258,19 @@ static void assertRecordCount(long expected, String tablePath, SQLContext sqlCon
assertEquals(expected, recordCount);
}

static Map<String, Long> getPartitionRecordCount(String basePath, SQLContext sqlContext) {
sqlContext.clearCache();
List<Row> rows = sqlContext.read().format("org.apache.hudi").load(basePath).groupBy(HoodieRecord.PARTITION_PATH_METADATA_FIELD).count().collectAsList();
Map<String, Long> partitionRecordCount = new HashMap<>();
rows.stream().forEach(row -> partitionRecordCount.put(row.getString(0), row.getLong(1)));
return partitionRecordCount;
}

static void assertNoPartitionMatch(String basePath, SQLContext sqlContext, String partitionToValidate) {
sqlContext.clearCache();
assertEquals(0, sqlContext.read().format("org.apache.hudi").load(basePath).filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " = '" + partitionToValidate + "'").count());
}
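
// Sketch, not part of this diff: a hypothetical helper combining the two methods above.
// A fully deleted partition produces no groupBy row, so it must be absent from the count map.
static void assertPartitionFullyDeleted(String basePath, SQLContext sqlContext, String partition) {
assertEquals(false, getPartitionRecordCount(basePath, sqlContext).containsKey(partition));
assertNoPartitionMatch(basePath, sqlContext, partition);
}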

static void assertDistinctRecordCount(long expected, String tablePath, SQLContext sqlContext) {
sqlContext.clearCache();
long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).select("_hoodie_record_key").distinct().count();
@@ -1356,6 +1369,13 @@ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTrans

private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
String propsFileName, String parquetSourceRoot, boolean addCommonProps) throws IOException {
prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps,
"not_there");
}

private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
String propsFileName, String parquetSourceRoot, boolean addCommonProps,
String partitionPath) throws IOException {
// Properties used for testing delta-streamer with Parquet source
TypedProperties parquetProps = new TypedProperties();

@@ -1366,7 +1386,7 @@ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTrans
parquetProps.setProperty("include", "base.properties");
parquetProps.setProperty("hoodie.embed.timeline.server", "false");
parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", partitionPath);
if (useSchemaProvider) {
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + sourceSchemaFile);
if (hasTransformer) {
@@ -1798,6 +1818,31 @@ public void testInsertOverwriteTable() throws Exception {
testDeltaStreamerWithSpecifiedOperation(dfsBasePath + "/insert_overwrite_table", WriteOperationType.INSERT_OVERWRITE_TABLE);
}

@Test
public void testDeletePartitions() throws Exception {
prepareParquetDFSSource(false, false, "source.avsc", "target.avsc",
PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "partition_path");
String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
null, PROPS_FILENAME_TEST_PARQUET, false,
false, 100000, false, null, null, "timestamp", null), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext);
testNum++;

prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
prepareParquetDFSSource(false, false);
// set write operation to DELETE_PARTITION and add a transformer that keeps only records in HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH
deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.DELETE_PARTITION, ParquetDFSSource.class.getName(),
Collections.singletonList(TestSpecificPartitionTransformer.class.getName()), PROPS_FILENAME_TEST_PARQUET, false,
false, 100000, false, null, null, "timestamp", null), jsc);
deltaStreamer.sync();
// No records should remain in HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH.
TestHelpers.assertNoPartitionMatch(tableBasePath, sqlContext, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
}

void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, WriteOperationType operationType) throws Exception {
// Initial insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
@@ -1944,6 +1989,16 @@ public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Datas
}
}

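/**
 * Keeps only records in HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH so that a
 * DELETE_PARTITION sync drops exactly that partition.
 */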
public static class TestSpecificPartitionTransformer implements Transformer {

@Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
TypedProperties properties) {
Dataset<Row> toReturn = rowDataset.filter("partition_path == '" + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "'");
return toReturn;
}
}

/**
* Add new field evoluted_optional_union_field with value of the field rider.
*/