diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 5c8a89380dc3..98ee48872be8 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -332,7 +332,8 @@ private FlinkOptions() { .booleanType() .defaultValue(false) .withDescription("Flag to indicate whether to ignore any non exception error (e.g. writestatus error). within a checkpoint batch.\n" - + "By default false"); + + "By default false. Turning this on, could hide the write status errors while the spark checkpoint moves ahead. \n" + + " So, would recommend users to use this with caution."); public static final ConfigOption RECORD_KEY_FIELD = ConfigOptions .key(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 16f52f33b13c..c694174b8c79 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -386,13 +386,14 @@ object DataSourceWriteOptions { .withDocumentation(" Config to indicate how long (by millisecond) before a retry should issued for failed microbatch") /** - * By default true (in favor of streaming progressing over data integrity) + * By default false. If users prefer streaming progress over data integrity, can set this to true. */ val STREAMING_IGNORE_FAILED_BATCH: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.write.streaming.ignore.failed.batch") - .defaultValue("true") + .defaultValue("false") .withDocumentation("Config to indicate whether to ignore any non exception error (e.g. writestatus error)" - + " within a streaming microbatch") + + " within a streaming microbatch. Turning this on, could hide the write status errors while the spark checkpoint moves ahead." + + "So, would recommend users to use this with caution.") val META_SYNC_CLIENT_TOOL_CLASS_NAME: ConfigProperty[String] = ConfigProperty .key("hoodie.meta.sync.client.tool.class") diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala index 2befb47e5e02..5f7cee1df012 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala @@ -121,7 +121,7 @@ class HoodieStreamingSink(sqlContext: SQLContext, } log.error(s"Micro batch id=$batchId threw following exception: ", e) if (ignoreFailedBatch) { - log.info(s"Ignore the exception and move on streaming as per " + + log.warn(s"Ignore the exception and move on streaming as per " + s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} configuration") Success((true, None, None)) } else { diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java index af2dd7613c4f..d16685243e5b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java @@ -372,6 +372,7 @@ public void stream(Dataset streamingInput, String operationType, String che .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "true") .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true") .option(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA.key(), "false") + .option(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH().key(),"true") .option(HoodieWriteConfig.TBL_NAME.key(), tableName).option("checkpointLocation", checkpointLocation) .outputMode(OutputMode.Append());