[HUDI-7163] Fix not parsable text DateTimeParseException when compact #10220
Conversation
Can you elaborate a little more about the operation procedure that can reproduce this exception? That would help a lot for understanding the fix.
@danny0405: Looks like this is a port of #6000 to the Flink integration.
Yeah, it's a follow-up to #6000 to fix the timestamp precision of legacy tables.
Hudi version: 0.14.0

Step 1: use bootstrapping to convert a Parquet table to a Hudi table, like this:

```shell
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /cluster/hudi/hudi-utilities-bundle_2.11-0.12.2.jar \
  --target-base-path /tmp/t1 --table-type MERGE_ON_READ \
  --target-table t1 \
  --run-bootstrap \
  --bootstrap-overwrite \
  --hoodie-conf hoodie.bootstrap.base.path=hdfs://hacluster/user/hive/warehouse/t1 \
  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
  --hoodie-conf hoodie.datasource.write.keygenerator.type=NON_PARTITION \
  --hoodie-conf hoodie.bootstrap.parallelism=2
```

Step 2: use a Flink DataStream writer on this Hudi table with compaction turned on, like this:

```scala
val hudiProps = DFSPropertiesConfiguration.getGlobalProps
val hudiConf = Configuration.fromMap(hudiProps.asInstanceOf[JMap[String, String]])
conf.addAll(hudiConf)
OptionsInference.setupSinkTasks(conf, env.getParallelism)
val rowType = RowType.of(false,
  Array(new TimestampType, new BigIntType, new VarCharType, new VarCharType),
  Array("_origin_op_ts", "id", "content", "date"))
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, AvroSchemaConverter.convertToSchema(rowType).toString)
conf.setString(FlinkOptions.PATH, "/tmp/t1")
conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
conf.setString(FlinkOptions.TABLE_NAME, "t1")
conf.setString(FlinkOptions.PRECOMBINE_FIELD, "_origin_op_ts")
conf.setString(FlinkOptions.RECORD_KEY_FIELD, "id")
conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true)
conf.setString(FlinkOptions.KEYGEN_TYPE, KeyGeneratorType.NON_PARTITION.name)
conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, false)
val hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, rowDataDataStream.javaStream)
val pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream)
conf.setString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, FlinkOptions.TIME_ELAPSED)
conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, TimeUnit.SECONDS.toSeconds(120).toInt)
Pipelines.compact(conf, pipeline)
env.execute()
```

The compaction then fails with the "not parsable text" DateTimeParseException.
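To make the failure mode concrete, here is a minimal, self-contained sketch of why a second-precision instant from a legacy/bootstrapped table breaks a millisecond-precision parser. The formatter below is an illustrative stand-in, not Hudi's actual one (the real parsing lives behind `HoodieActiveTimeline`):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoField;

public class InstantParseDemo {
    // Strict fixed-width millisecond instant format (yyyyMMddHHmmssSSS-style);
    // an illustrative stand-in for Hudi's real instant-time formatter.
    static final DateTimeFormatter MILLIS_FORMAT = new DateTimeFormatterBuilder()
        .appendValue(ChronoField.YEAR, 4)
        .appendValue(ChronoField.MONTH_OF_YEAR, 2)
        .appendValue(ChronoField.DAY_OF_MONTH, 2)
        .appendValue(ChronoField.HOUR_OF_DAY, 2)
        .appendValue(ChronoField.MINUTE_OF_HOUR, 2)
        .appendValue(ChronoField.SECOND_OF_MINUTE, 2)
        .appendValue(ChronoField.MILLI_OF_SECOND, 3)
        .toFormatter();

    static boolean isParsable(String instant) {
        try {
            LocalDateTime.parse(instant, MILLIS_FORMAT);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // A 17-digit instant written by a current release parses fine.
        System.out.println(isParsable("20231201120000123")); // true
        // A 14-digit instant from a legacy table has no millis component,
        // so strict parsing throws DateTimeParseException.
        System.out.println(isParsable("20231201120000"));    // false
    }
}
```

This is why the fix falls back to a "safe" parse instead of letting the raw DateTimeParseException escape from the compaction trigger.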
Review comment on this diff hunk:

```java
throw new HoodieException("Get median instant time with interval [" + lowVal + ", " + highVal + "] error", e);
long high = HoodieActiveTimeline.parseDateFromInstantTimeSafely(highVal)
    .orElseThrow(() -> new HoodieException("Get instant time diff with interval [" + highVal + "] error")).getTime();
long low = HoodieActiveTimeline.parseDateFromInstantTimeSafely(lowVal)
```
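For context, the pattern this diff relies on can be sketched as follows. This is an illustrative stand-in using `java.util.Optional` and `SimpleDateFormat`, not the real `HoodieActiveTimeline.parseDateFromInstantTimeSafely` helper:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Optional;

public class SafeInstantParse {
    // Sketch of a "safe parse": returns Optional.empty() instead of throwing,
    // so callers can attach their own error context via orElseThrow.
    static Optional<Date> parseDateFromInstantTimeSafely(String instant) {
        try {
            SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddHHmmssSSS");
            fmt.setLenient(false);
            return Optional.of(fmt.parse(instant));
        } catch (ParseException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String highVal = "20231201120000123"; // hypothetical instant time
        long high = parseDateFromInstantTimeSafely(highVal)
            .orElseThrow(() -> new RuntimeException(
                "Get instant time diff with interval [" + highVal + "] error"))
            .getTime();
        System.out.println(high > 0);
    }
}
```

The benefit over a bare parse is that a malformed legacy instant surfaces as a descriptive HoodieException naming the offending interval, rather than a raw DateTimeParseException.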
Does the exception only happen for table upgrades?
It is mainly for defensive programming.
Do you encounter exceptions in production?
I mean the `orElseThrow` method is for defensive programming. Sure, we can repeat the provided operation procedure to reproduce this exception.
Okay, if we are not encountering problems, I would de-prioritize this PR a little bit.
@danny0405 do you think we should land this, or are the changes no longer needed?
We can land it.

Change Logs
Fixes: https://issues.apache.org/jira/browse/HUDI-7163
Reference: https://issues.apache.org/jira/browse/HUDI-4340
Impact
Flink compaction.
Risk level (write none, low medium or high below)
Low to medium
Documentation Update
None
Contributor's checklist