diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 7543382e19df4..5464ea3f203fc 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -20,6 +20,7 @@ import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.EventTimeAvroPayload; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieValidationException; @@ -53,6 +54,8 @@ import java.util.List; import java.util.Set; +import static org.apache.hudi.common.util.ValidationUtils.checkArgument; + /** * Hoodie data source/sink factory. */ @@ -81,6 +84,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { @Override public DynamicTableSink createDynamicTableSink(Context context) { Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions()); + checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)), + "Option [path] should not be empty."); ResolvedSchema schema = context.getCatalogTable().getResolvedSchema(); sanityCheck(conf, schema); setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);