[HUDI-3838] Implemented drop partition column feature for delta streamer code path #5294
 * <p>
 * To better understand how it removes please check {@link #rewriteRecord(GenericRecord, Schema)}
 */
public static GenericRecord removeFields(GenericRecord record, List<String> columnsToRemove) {
I did not invoke the rewriteRecord method directly from the DeltaStreamer code because passing targetSchema to it from inside the map lambda expression raised a serialization error: org.apache.spark.SparkException: Task not serializable.
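The failure mode described in this comment can be reproduced without Spark. The following is a minimal sketch using plain Java serialization; `FakeSchema`, `SerializableFunction`, and the helper methods are hypothetical stand-ins and not Hudi or Spark APIs. It only illustrates the general pattern: a closure that captures a non-serializable object cannot be serialized, while one that captures a string and rebuilds the object inside the lambda can.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

class ClosureSerializationSketch {

    // Hypothetical stand-in for an object that does not implement Serializable.
    static final class FakeSchema {
        final String json;

        FakeSchema(String json) {
            this.json = json;
        }
    }

    // A function type that must be serializable, similar in spirit to functions shipped to executors.
    interface SerializableFunction<T, R> extends Function<T, R>, Serializable { }

    // Returns true if plain Java serialization of the object succeeds.
    static boolean canSerialize(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Captures the non-serializable object directly, so serializing the lambda fails.
    static SerializableFunction<String, String> capturingSchema(FakeSchema schema) {
        return value -> schema.json + ":" + value;
    }

    // Captures only a String and rebuilds the object inside the lambda.
    static SerializableFunction<String, String> capturingSchemaString(String schemaJson) {
        return value -> new FakeSchema(schemaJson).json + ":" + value;
    }

    public static void main(String[] args) {
        FakeSchema schema = new FakeSchema("{\"type\":\"record\"}");
        System.out.println("captures object: " + canSerialize(capturingSchema(schema)));
        System.out.println("captures string: " + canSerialize(capturingSchemaString(schema.json)));
    }
}
```

Whether this is exactly what happened with targetSchema in the PR is an assumption; the sketch only shows why deriving the needed value inside the lambda, as removeFields does from a list of column names, sidesteps the problem.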
String partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, props);
List<String> listOfPartitionColumns = Arrays.asList(partitionColumns.split(","));
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
HoodieKey key = keyGenerator.getKey(avroRDD.first());
I'm not sure this is right. The key generation has to move inside map(): for each GenericRecord, we first have to run it through the key generator and only then modify the record if needed (drop the partition columns).
Also, can we move lines 482 to 494 into a separate method to keep this method cleaner and leaner?

I picked the first record for generating the key to keep the code cleaner, but I see your point. Updated.
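The ordering requested in this thread can be sketched as follows. This is an illustration with plain Java maps, not the DeltaSync code: `KeyedRecord`, `toKeyedRecords`, and the map-based `removeFields` are hypothetical stand-ins for HoodieRecord, the map lambda, and HoodieAvroUtils.removeFields. The point is that the key is derived from each full record before the partition columns are dropped, so every record gets its own key and partition path.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class KeyThenDropSketch {

    // Hypothetical stand-in for a keyed record: the key parts plus the (possibly trimmed) payload.
    static final class KeyedRecord {
        final String recordKey;
        final String partitionPath;
        final Map<String, Object> payload;

        KeyedRecord(String recordKey, String partitionPath, Map<String, Object> payload) {
            this.recordKey = recordKey;
            this.partitionPath = partitionPath;
            this.payload = payload;
        }
    }

    // Returns a copy of the record without the given columns; the input record is left untouched.
    static Map<String, Object> removeFields(Map<String, Object> record, List<String> columnsToRemove) {
        Map<String, Object> copy = new LinkedHashMap<>(record);
        copy.keySet().removeAll(columnsToRemove);
        return copy;
    }

    static List<KeyedRecord> toKeyedRecords(List<Map<String, Object>> records,
                                            String recordKeyField,
                                            List<String> partitionColumns,
                                            boolean dropPartitionColumns) {
        return records.stream().map(record -> {
            // Step 1: derive the key from the full record, while the partition columns are still present.
            String recordKey = String.valueOf(record.get(recordKeyField));
            String partitionPath = partitionColumns.stream()
                .map(column -> String.valueOf(record.get(column)))
                .collect(Collectors.joining("/"));
            // Step 2: only afterwards drop the partition columns from the payload, if configured.
            Map<String, Object> payload = dropPartitionColumns ? removeFields(record, partitionColumns) : record;
            return new KeyedRecord(recordKey, partitionPath, payload);
        }).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Object> first = new LinkedHashMap<>();
        first.put("tpep_pickup_datetime", "2022-04-11 10:00:00");
        first.put("date_col", "2022-04-11");
        Map<String, Object> second = new LinkedHashMap<>();
        second.put("tpep_pickup_datetime", "2022-04-12 11:00:00");
        second.put("date_col", "2022-04-12");

        List<KeyedRecord> keyed = toKeyedRecords(
            Arrays.asList(first, second), "tpep_pickup_datetime", Arrays.asList("date_col"), true);
        for (KeyedRecord r : keyed) {
            System.out.println(r.recordKey + " -> " + r.partitionPath + " " + r.payload);
        }
    }
}
```

Generating the key once from the first record, as in the earlier revision, would assign the same key and partition path to every record in the batch.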
@hudi-bot run azure
codope left a comment:
LGTM. Verified the patch. Below are the table config and the schema from a commit file:
#Updated at 2022-04-12T10:02:39.874Z
#Tue Apr 12 15:32:39 IST 2022
hoodie.table.precombine.field=tpep_dropoff_datetime
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.partition.fields=date_col
hoodie.table.type=COPY_ON_WRITE
hoodie.archivelog.folder=archived
hoodie.populate.meta.fields=true
hoodie.partition.metafile.use.base.format=false
hoodie.timeline.layout.version=1
hoodie.table.version=4
hoodie.table.metadata.partitions=column_stats,files
hoodie.table.recordkey.fields=tpep_pickup_datetime
hoodie.table.base.file.format=PARQUET
hoodie.table.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
hoodie.table.name=ny_hudi_tbl
hoodie.table.metadata.partitions.inflight=
hoodie.table.checksum=1182839966
Schema (note that date_col, the partition column, is not written as part of the schema):
"extraMetadata" : {
"schema" : "{\"type\":\"record\",\"name\":\"test_struct_name\",\"namespace\":\"test_record_namespace\",\"fields\":[{\"name\":\"VendorID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"tpep_pickup_datetime\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tpep_dropoff_datetime\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"passenger_count\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"trip_distance\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"RatecodeID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"store_and_fwd_flag\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"PULocationID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"DOLocationID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"payment_type\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"fare_amount\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"extra\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"mta_tax\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"tip_amount\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"tolls_amount\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"improvement_surcharge\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"total_amount\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"congestion_surcharge\",\"type\":[\"null\",\"double\"],\"default\":null}]}",
"deltastreamer.checkpoint.key" : "1634021149000"
}
It would be good to add a unit test in TestHoodieDeltastreamer as a follow-up. Tracking in HUDI-3863.
  JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
- JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
+ JavaRDD<HoodieRecord> records = avroRDD.map(record -> {
+ GenericRecord gr = isDropPartitionColumns() ? HoodieAvroUtils.removeFields(record, getPartitionColumns(keyGenerator, props)) : record;
getPartitionColumns(keyGenerator, props) could have been computed once in the driver instead of once per record inside the map.
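The suggestion amounts to hoisting a loop-invariant lookup out of the per-record lambda. The sketch below illustrates the difference with plain Java collections; `getPartitionColumns` here is a hypothetical stand-in that parses a comma-separated config value and counts its invocations, and is not the Hudi method of the same name.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

class HoistPartitionColumnsSketch {

    // Counts how often the lookup runs, purely for illustration.
    static final AtomicInteger lookups = new AtomicInteger();

    // Hypothetical stand-in for resolving the partition columns from configuration.
    static List<String> getPartitionColumns(String configValue) {
        lookups.incrementAndGet();
        return Arrays.asList(configValue.split(","));
    }

    // Returns a copy of the record without the given columns.
    static Map<String, Object> removeFields(Map<String, Object> record, List<String> columnsToRemove) {
        Map<String, Object> copy = new LinkedHashMap<>(record);
        copy.keySet().removeAll(columnsToRemove);
        return copy;
    }

    // Resolves the columns inside the lambda: one lookup per record.
    static List<Map<String, Object>> dropPerRecord(List<Map<String, Object>> records, String configValue) {
        return records.stream()
            .map(record -> removeFields(record, getPartitionColumns(configValue)))
            .collect(Collectors.toList());
    }

    // Resolves the columns once up front and lets the lambda capture the result.
    static List<Map<String, Object>> dropHoisted(List<Map<String, Object>> records, String configValue) {
        List<String> partitionColumns = getPartitionColumns(configValue);
        return records.stream()
            .map(record -> removeFields(record, partitionColumns))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("VendorID", 1);
        record.put("date_col", "2022-04-12");
        List<Map<String, Object>> records = Arrays.asList(record, record, record);

        dropPerRecord(records, "date_col");
        System.out.println("lookups per record: " + lookups.getAndSet(0));
        dropHoisted(records, "date_col");
        System.out.println("lookups hoisted: " + lookups.getAndSet(0));
    }
}
```

In a Spark job the hoisted list would be captured by the closure, so it would need to be serializable; the list returned by Arrays.asList is.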
What is the purpose of the pull request
Add support for hoodie.datasource.write.drop.partition.columns config for DeltaStreamer code path.
Brief change log
hoodie.datasource.write.drop.partition.columns config for DeltaStreamer code path.
Verify this pull request
(Please pick either of the following options)
This change added tests and can be verified as follows:
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.