diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 1055bd522022c..9367e23dc64a9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -441,6 +441,20 @@ public static List rewriteRecords(List records, Sc return records.stream().map(r -> rewriteRecord(r, newSchema)).collect(Collectors.toList()); } + /** + * Removes the given columns from an Avro record by rewriting the record against a schema that no longer contains them. + * <p>
+ * See {@link #rewriteRecord(GenericRecord, Schema)} for how the record is rewritten. + * + * @param record Avro record to remove the columns from + * @param fieldsToRemove names of the columns to remove + * @return the record produced by rewriting {@code record} with the reduced schema + */ + public static GenericRecord removeFields(GenericRecord record, List<String> fieldsToRemove) { + Schema newSchema = removeFields(record.getSchema(), fieldsToRemove); + return rewriteRecord(record, newSchema); + } + private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field field) { Schema oldSchema = oldRecord.getSchema(); Object fieldValue = oldSchema.getField(field.name()) == null ? null : oldRecord.get(field.name()); diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java index 8c57dc84dead4..246d74411d61d 100644 --- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java @@ -32,6 +32,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -227,6 +228,35 @@ public void testAddingAndRemovingMetadataFields() { assertEquals(NUM_FIELDS_IN_EXAMPLE_SCHEMA, schemaWithoutMetaCols.getFields().size()); } + @Test + public void testRemoveFields() { + // partitioned table test. 
+ String schemaStr = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ " + + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"}," + + "{\"name\": \"non_pii_col\", \"type\": \"string\"}]}"; + Schema expectedSchema = new Schema.Parser().parse(schemaStr); + GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA)); + rec.put("_row_key", "key1"); + rec.put("non_pii_col", "val1"); + rec.put("pii_col", "val2"); + rec.put("timestamp", 3.5); + GenericRecord rec1 = HoodieAvroUtils.removeFields(rec, Arrays.asList("pii_col")); + assertEquals("key1", rec1.get("_row_key")); + assertEquals("val1", rec1.get("non_pii_col")); + assertEquals(3.5, rec1.get("timestamp")); + assertNull(rec1.get("pii_col")); + assertEquals(expectedSchema, rec1.getSchema()); + + // non-partitioned table test: an empty partition-column string splits into a single empty name, which must not remove any field. + schemaStr = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ " + + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"}," + + "{\"name\": \"non_pii_col\", \"type\": \"string\"}," + + "{\"name\": \"pii_col\", \"type\": \"string\"}]}"; + expectedSchema = new Schema.Parser().parse(schemaStr); + rec1 = HoodieAvroUtils.removeFields(rec, Arrays.asList("")); + assertEquals(expectedSchema, rec1.getSchema()); + } + @Test public void testGetNestedFieldVal() { GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA)); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 0e57bd379acdb..7a08d1542f53a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -105,6 +105,7 @@ import scala.collection.JavaConversions; import static 
org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER; +import static org.apache.hudi.common.table.HoodieTableConfig.DROP_PARTITION_COLUMNS; import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE; import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING; import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT; @@ -280,6 +281,7 @@ public void refreshTimeline() throws IOException { .setPreCombineField(cfg.sourceOrderingField) .setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())) + .setDropPartitionColumnsWhenWrite(isDropPartitionColumns()) .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath); } @@ -375,6 +377,7 @@ public Pair>> readFromSource( SimpleKeyGenerator.class.getName())) .setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())) + .setDropPartitionColumnsWhenWrite(isDropPartitionColumns()) .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath); } @@ -478,13 +481,17 @@ private Pair>> fetchFromSourc boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT); JavaRDD avroRDD = avroRDDOptional.get(); - JavaRDD records = avroRDD.map(gr -> { + // Resolve the drop-partition-columns settings once here rather than once per record inside the map function. + boolean dropPartitionColumns = isDropPartitionColumns(); + List<String> partitionColumnsToDrop = dropPartitionColumns ? getPartitionColumns(keyGenerator, props) : Arrays.<String>asList(); + JavaRDD records = avroRDD.map(record -> { + GenericRecord gr = dropPartitionColumns ? HoodieAvroUtils.removeFields(record, partitionColumnsToDrop) : record; HoodieRecordPayload payload = shouldCombine ? 
DataSourceUtils.createPayload(cfg.payloadClassName, gr, (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, props.getBoolean( KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())))) : DataSourceUtils.createPayload(cfg.payloadClassName, gr); - return new HoodieAvroRecord<>(keyGenerator.getKey(gr), payload); + return new HoodieAvroRecord<>(keyGenerator.getKey(record), payload); }); return Pair.of(schemaProvider, Pair.of(checkpointStr, records)); @@ -727,6 +734,9 @@ public void setupWriteClient() throws IOException { private void reInitWriteClient(Schema sourceSchema, Schema targetSchema) throws IOException { LOG.info("Setting up new Hoodie Write Client"); + if (isDropPartitionColumns()) { + targetSchema = HoodieAvroUtils.removeFields(targetSchema, getPartitionColumns(keyGenerator, props)); + } registerAvroSchemas(sourceSchema, targetSchema); HoodieWriteConfig hoodieCfg = getHoodieClientConfig(targetSchema); if (hoodieCfg.isEmbeddedTimelineServerEnabled()) { @@ -898,4 +908,24 @@ public Option getClusteringInstantOpt() { return Option.empty(); } } + + /** + * Reads the hoodie.datasource.write.drop.partition.columns config, falling back to its default value. + * When this returns true, the partition columns are not written into the table. + */ + private boolean isDropPartitionColumns() { + return props.getBoolean(DROP_PARTITION_COLUMNS.key(), DROP_PARTITION_COLUMNS.defaultValue()); + } + + /** + * Get the partition columns as a list of strings by splitting the comma-separated value from HoodieSparkUtils. + * + * @param keyGenerator key generator handed to HoodieSparkUtils.getPartitionColumns + * @param props properties handed to HoodieSparkUtils.getPartitionColumns + * @return List of partition columns; a single empty string when the comma-separated value is empty. + */ + private List<String> getPartitionColumns(KeyGenerator keyGenerator, TypedProperties props) { + String partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, props); + return Arrays.asList(partitionColumns.split(",")); + } }