@@ -441,6 +441,17 @@ public static List<GenericRecord> rewriteRecords(List<GenericRecord> records, Sc
return records.stream().map(r -> rewriteRecord(r, newSchema)).collect(Collectors.toList());
}

/**
* Removes the given fields from an Avro record by rewriting it against a
* schema that excludes them.
* <p>
* See {@link #rewriteRecord(GenericRecord, Schema)} for details of the rewrite.
*/
public static GenericRecord removeFields(GenericRecord record, List<String> fieldsToRemove) {
Schema newSchema = removeFields(record.getSchema(), fieldsToRemove);
return rewriteRecord(record, newSchema);
}

private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field field) {
Schema oldSchema = oldRecord.getSchema();
Object fieldValue = oldSchema.getField(field.name()) == null ? null : oldRecord.get(field.name());
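
For reference, a minimal usage sketch of the removeFields(GenericRecord, List<String>) helper added above; the schema, class name, and values are invented for illustration, and the import assumes HoodieAvroUtils lives in org.apache.hudi.avro:

```java
import java.util.Collections;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

public class RemoveFieldsSketch {
  public static void main(String[] args) {
    // Invented two-field schema for the sketch.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"pii_col\",\"type\":\"string\"}]}");
    GenericRecord record = new GenericData.Record(schema);
    record.put("id", "key1");
    record.put("pii_col", "secret");

    // Drop pii_col: the record is rewritten against a schema without it.
    GenericRecord trimmed =
        HoodieAvroUtils.removeFields(record, Collections.singletonList("pii_col"));
    System.out.println(trimmed.getSchema().getField("pii_col")); // null
    System.out.println(trimmed.get("id"));                       // key1
  }
}
```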
@@ -32,6 +32,7 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

@@ -227,6 +228,35 @@ public void testAddingAndRemovingMetadataFields() {
assertEquals(NUM_FIELDS_IN_EXAMPLE_SCHEMA, schemaWithoutMetaCols.getFields().size());
}

@Test
public void testRemoveFields() {
// partitioned table case: removing an existing column drops it from both the record and its schema.
String schemaStr = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"}]}";
Schema expectedSchema = new Schema.Parser().parse(schemaStr);
GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
rec.put("_row_key", "key1");
rec.put("non_pii_col", "val1");
rec.put("pii_col", "val2");
rec.put("timestamp", 3.5);
GenericRecord rec1 = HoodieAvroUtils.removeFields(rec, Arrays.asList("pii_col"));
assertEquals("key1", rec1.get("_row_key"));
assertEquals("val1", rec1.get("non_pii_col"));
assertEquals(3.5, rec1.get("timestamp"));
assertNull(rec1.get("pii_col"));
assertEquals(expectedSchema, rec1.getSchema());

// non-partitioned table case: a list containing only an empty field name matches nothing, so the schema is unchanged.
schemaStr = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
+ "{\"name\": \"pii_col\", \"type\": \"string\"}]}";
expectedSchema = new Schema.Parser().parse(schemaStr);
rec1 = HoodieAvroUtils.removeFields(rec, Arrays.asList(""));
assertEquals(expectedSchema, rec1.getSchema());
}

@Test
public void testGetNestedFieldVal() {
GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
@@ -105,6 +105,7 @@
import scala.collection.JavaConversions;

import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static org.apache.hudi.common.table.HoodieTableConfig.DROP_PARTITION_COLUMNS;
import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING;
import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT;
@@ -280,6 +281,7 @@ public void refreshTimeline() throws IOException {
.setPreCombineField(cfg.sourceOrderingField)
.setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
.setDropPartitionColumnsWhenWrite(isDropPartitionColumns())
.initTable(new Configuration(jssc.hadoopConfiguration()),
cfg.targetBasePath);
}
@@ -375,6 +377,7 @@ public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(
SimpleKeyGenerator.class.getName()))
.setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
.setDropPartitionColumnsWhenWrite(isDropPartitionColumns())
.initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
}

@@ -478,13 +481,14 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc

boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT);
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
-JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
+JavaRDD<HoodieRecord> records = avroRDD.map(record -> {
+GenericRecord gr = isDropPartitionColumns() ? HoodieAvroUtils.removeFields(record, getPartitionColumns(keyGenerator, props)) : record;
[Review thread on the line above]

Contributor: getPartitionColumns(keyGenerator, props) could have been done once in the driver.

@codope (Member, Apr 12, 2022): makes sense.. please land #5302 if it looks good.

Author: I have created the same PR #5303 as well.

(See the sketch after this hunk for the suggested driver-side hoisting.)

HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
(Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, props.getBoolean(
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
: DataSourceUtils.createPayload(cfg.payloadClassName, gr);
-return new HoodieAvroRecord<>(keyGenerator.getKey(gr), payload);
+return new HoodieAvroRecord<>(keyGenerator.getKey(record), payload);
});

return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
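
Picking up the review suggestion above: a minimal sketch (not the committed fix, which landed via the follow-up PRs referenced in the thread) of how the partition column list could be resolved once on the driver and captured in the closure. It reuses the names from this hunk and assumes java.util.Collections is imported:

```java
// Resolve the flag and column list once, on the driver; Spark then
// serializes the list into the closure instead of the executors calling
// getPartitionColumns(keyGenerator, props) for every record.
final boolean shouldDrop = isDropPartitionColumns();
final List<String> partitionColumns = shouldDrop
    ? getPartitionColumns(keyGenerator, props)
    : Collections.emptyList();
JavaRDD<GenericRecord> trimmedRDD = avroRDD.map(record ->
    shouldDrop ? HoodieAvroUtils.removeFields(record, partitionColumns) : record);
```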
@@ -727,6 +731,9 @@ public void setupWriteClient() throws IOException {

private void reInitWriteClient(Schema sourceSchema, Schema targetSchema) throws IOException {
LOG.info("Setting up new Hoodie Write Client");
if (isDropPartitionColumns()) {
targetSchema = HoodieAvroUtils.removeFields(targetSchema, getPartitionColumns(keyGenerator, props));
}
registerAvroSchemas(sourceSchema, targetSchema);
HoodieWriteConfig hoodieCfg = getHoodieClientConfig(targetSchema);
if (hoodieCfg.isEmbeddedTimelineServerEnabled()) {
@@ -898,4 +905,24 @@ public Option<String> getClusteringInstantOpt() {
return Option.empty();
}
}

/**
* Whether to drop partition columns before writing, per the
* hoodie.datasource.write.drop.partition.columns config. When true, the
* partition columns are not written into the table.
*/
private boolean isDropPartitionColumns() {
return props.getBoolean(DROP_PARTITION_COLUMNS.key(), DROP_PARTITION_COLUMNS.defaultValue());
}

/**
* Get the partition column names for the target table.
*
* @param keyGenerator the configured key generator
* @param props the writer properties
* @return the partition column names as a list of strings
*/
private List<String> getPartitionColumns(KeyGenerator keyGenerator, TypedProperties props) {
String partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, props);
return Arrays.asList(partitionColumns.split(","));
}
}
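
To make the two new private helpers concrete, here is a small self-contained sketch of the flag lookup and the comma-separated split they perform. Plain java.util.Properties stands in for TypedProperties, and the default of false plus the example column list are assumptions for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class DropPartitionColumnsSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("hoodie.datasource.write.drop.partition.columns", "true");

    // Mirrors isDropPartitionColumns(): read the flag, assuming a default of
    // false when the key is absent.
    boolean drop = Boolean.parseBoolean(
        props.getProperty("hoodie.datasource.write.drop.partition.columns", "false"));

    // Mirrors getPartitionColumns(...): split a comma-separated column list.
    String partitionColumns = "region,day"; // invented example value
    List<String> toDrop = Arrays.asList(partitionColumns.split(","));

    System.out.println(drop + " -> " + toDrop); // prints: true -> [region, day]
  }
}
```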