diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index a155fb52d8852..c71a91a82d106 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -67,7 +67,7 @@
 * <h2>The Semantics</h2>
 *
* The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator
- * starts a new instant on the time line when a checkpoint triggers, the coordinator checkpoints always
+ * starts a new instant on the timeline when a checkpoint triggers, the coordinator checkpoints always
* start before its operator, so when this function starts a checkpoint, a REQUESTED instant already exists.
*
 * <p>The function process thread blocks data buffering after the checkpoint thread finishes flushing the existing data buffer until
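To make the buffering semantics above concrete, here is a minimal sketch of the pattern, with all names invented for illustration (the real StreamWriteFunction wires this into Flink's checkpoint hooks and the operator coordinator):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the buffer-between-checkpoints pattern; illustrative only.
class BufferingWriterSketch<I> {
  private final List<I> buffer = new ArrayList<>();

  // Records are buffered between checkpoints.
  void processElement(I record) {
    buffer.add(record);
  }

  // When a checkpoint triggers, the coordinator has already created a
  // REQUESTED instant, so the flushed batch can be attached to it.
  void snapshotState() {
    List<I> batch = new ArrayList<>(buffer);
    buffer.clear();
    flush(batch);
  }

  private void flush(List<I> batch) {
    // write the batch against the current instant (elided)
  }
}
```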
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/EmptyWriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/EmptyWriteProfile.java
index 3cdd798e2e841..e0a6fc1f4a336 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/EmptyWriteProfile.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/EmptyWriteProfile.java
@@ -28,11 +28,8 @@
/**
* WriteProfile that always return empty small files.
*
- * <p>This write profile is used for cases:
- * i). INSERT OVERWRITE and INSERT OVERWRITE TABLE operations,
- * the existing small files are ignored because of the 'OVERWRITE' semantics;
- * ii). INSERT operation when data file merge is disabled.
- *
+ * <p>This write profile is used for INSERT OVERWRITE and INSERT OVERWRITE TABLE operations,
+ * the existing small files are ignored because of the 'OVERWRITE' semantics.
*
 * <p>Note: assumes the index can always index log files for Flink write.
*/
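The Javadoc change above narrows the stated use case; the behavior itself is simple enough to sketch. Assuming a smallFilesProfile-style hook (the method and return types here are assumptions for illustration), an empty profile just reports no small files, so every write goes to fresh file groups:

```java
import java.util.Collections;
import java.util.List;

// Illustrative sketch: a write profile that never reports small files,
// matching the 'OVERWRITE' semantics described above. Names are assumed.
class EmptyWriteProfileSketch {
  List<String> smallFilesProfile(String partitionPath) {
    // No small files to merge into: the overwrite discards existing data.
    return Collections.emptyList();
  }
}
```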
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 077633ee90e53..bb545ad896ac9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -41,8 +41,9 @@
import java.util.Properties;
/**
- * An Utility which can incrementally consume data from Kafka and apply it to the target table.
- * currently, it only supports COW table and insert, upsert operation.
+ * A utility which can incrementally consume data from Kafka and apply it to the target table.
+ * It has similar functionality to the SQL data source, except that the source is bound to Kafka
+ * and the format is bound to JSON.
*/
public class HoodieFlinkStreamer {
public static void main(String[] args) throws Exception {
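The revised Javadoc positions the streamer as a fixed Kafka-plus-JSON variant of the SQL data source. A rough sketch of that pipeline shape using the Flink APIs of that era; the topic name and properties are placeholders, and the Hudi sink wiring is elided:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

class StreamerPipelineSketch {
  static void run() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
    // Source is bound to Kafka; format is bound to JSON (parsed downstream).
    DataStream<String> json = env.addSource(
        new FlinkKafkaConsumer<>("hudi-source-topic", new SimpleStringSchema(), kafkaProps));
    // ... deserialize JSON to RowData and hand off to the Hudi write pipeline (elided) ...
    env.execute("hoodie-flink-streamer-sketch");
  }
}
```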
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index a2d0960770e9c..627bc2c29acf6 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -135,7 +135,7 @@ private void sanityCheck(Configuration conf, ResolvedSchema schema) {
}
/**
- * Setup the config options based on the table definition, for e.g the table name, primary key.
+ * Sets up the config options based on the table definition, e.g. the table name and primary key.
*
* @param conf The configuration to setup
* @param tableName The table name
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 0494143a1a01b..6ef608bc713b8 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -40,6 +40,7 @@
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.ChangelogModes;
+import org.apache.hudi.util.InputFormats;
import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
@@ -48,7 +49,6 @@
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -108,9 +108,6 @@ public class HoodieTableSource implements
private static final int NO_LIMIT_CONSTANT = -1;
- private static final InputFormat<RowData, ?> EMPTY_INPUT_FORMAT =
- new CollectionInputFormat<>(Collections.emptyList(), null);
-
private final transient org.apache.hadoop.conf.Configuration hadoopConf;
private final transient HoodieTableMetaClient metaClient;
private final long maxCompactionMemoryInBytes;
@@ -340,7 +337,7 @@ private List<MergeOnReadInputSplit> buildFileIndex() {
if (inputSplits.size() == 0) {
// When there is no input splits, just return an empty source.
LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead");
- return EMPTY_INPUT_FORMAT;
+ return InputFormats.EMPTY_INPUT_FORMAT;
}
return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
rowDataType, inputSplits, false);
@@ -360,7 +357,7 @@ private List<MergeOnReadInputSplit> buildFileIndex() {
if (result.isEmpty()) {
// When there is no input splits, just return an empty source.
LOG.warn("No input splits generate for incremental read, returns empty collection instead");
- return new CollectionInputFormat<>(Collections.emptyList(), null);
+ return InputFormats.EMPTY_INPUT_FORMAT;
}
return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
rowDataType, result.getInputSplits(), false);
@@ -419,7 +416,7 @@ private MergeOnReadInputFormat mergeOnReadInputFormat(
private InputFormat<RowData, ?> baseFileOnlyInputFormat() {
final Path[] paths = getReadPaths();
if (paths.length == 0) {
- return EMPTY_INPUT_FORMAT;
+ return InputFormats.EMPTY_INPUT_FORMAT;
}
FileInputFormat format = new CopyOnWriteInputFormat(
FilePathUtils.toFlinkPaths(paths),
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index e3a8eee9292db..4cd45a81d7980 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -125,7 +125,7 @@ public class MergeOnReadInputFormat
/**
* Flag saying whether to emit the deletes. In streaming read mode, downstream
- * operators need the delete messages to retract the legacy accumulator.
+ * operators need the DELETE messages to retract the legacy accumulator.
*/
private boolean emitDelete;
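For context on why the flag matters: in streaming read mode a delete is surfaced as a changelog row whose RowKind is DELETE, which is what lets downstream aggregations retract previously accumulated values. A hedged illustration using Flink's row types (the helper name is invented):

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

class EmitDeleteSketch {
  // Marks a row as a retraction; downstream operators see RowKind.DELETE
  // and subtract it from their accumulators. Illustrative only.
  static RowData asDelete(GenericRowData row) {
    row.setRowKind(RowKind.DELETE);
    return row;
  }
}
```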
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/InputFormats.java b/hudi-flink/src/main/java/org/apache/hudi/util/InputFormats.java
new file mode 100644
index 0000000000000..f193357e88809
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/InputFormats.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.java.io.CollectionInputFormat;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Collections;
+
+/**
+ * Utilities for all kinds of {@link org.apache.flink.api.common.io.InputFormat}s.
+ */
+public class InputFormats {
+ public static final InputFormat<RowData, ?> EMPTY_INPUT_FORMAT =
+ new CollectionInputFormat<>(Collections.emptyList(), null);
+}
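With the constant hoisted into a shared utility, each call site collapses to the same shape, as the HoodieTableSource hunks above show. A representative sketch of that call-site pattern (buildRealFormat is a placeholder name, not an actual method):

```java
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.table.data.RowData;

import org.apache.hudi.util.InputFormats;

import java.util.List;

class EmptyFormatUsageSketch {
  // Representative call-site shape after the refactor; buildRealFormat is
  // a placeholder for whichever concrete format the branch constructs.
  static InputFormat<RowData, ?> chooseFormat(List<?> inputSplits) {
    return inputSplits.isEmpty()
        ? InputFormats.EMPTY_INPUT_FORMAT
        : buildRealFormat(inputSplits);
  }

  static InputFormat<RowData, ?> buildRealFormat(List<?> splits) {
    throw new UnsupportedOperationException("placeholder");
  }
}
```

This also removes the duplicated CollectionInputFormat construction that previously lived in HoodieTableSource.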
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 04eeab8b377af..b717268800ac4 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -250,7 +250,7 @@ public static void initTableIfNotExists(Configuration conf) throws IOException {
basePath, conf.getString(FlinkOptions.TABLE_NAME));
}
// Do not close the filesystem in order to use the CACHE,
- // some of the filesystems release the handles in #close method.
+ // some filesystems release the handles in the #close method.
}
/**
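The cache comment above deserves a concrete illustration: Hadoop's FileSystem.get() hands out instances from a process-wide cache, so closing one would release the shared handle for every caller. A small sketch of the resulting pattern (class and helper names are assumptions):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

class FsCacheSketch {
  // Obtain the (cached) filesystem and intentionally skip close():
  // the instance is shared through Hadoop's FileSystem CACHE.
  static boolean basePathExists(Path basePath, Configuration hadoopConf) throws IOException {
    FileSystem fs = FileSystem.get(basePath.toUri(), hadoopConf);
    return fs.exists(basePath); // no fs.close() on purpose
  }
}
```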
@@ -359,7 +359,7 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf) {
}
/**
- * Return the median instant time between the given two instant time.
+ * Returns the median instant time between the two given instant times.
*/
public static String medianInstantTime(String highVal, String lowVal) {
try {
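For medianInstantTime, a worked sketch helps: Hudi instant times of this era are yyyyMMddHHmmss timestamps, so one way to take a median is to parse both and format the midpoint of their epoch millis (a sketch under that assumption, not necessarily the actual implementation):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

class MedianInstantSketch {
  // Midpoint of two yyyyMMddHHmmss instant times; illustrative only.
  static String median(String highVal, String lowVal) throws ParseException {
    SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddHHmmss");
    long high = fmt.parse(highVal).getTime();
    long low = fmt.parse(lowVal).getTime();
    return fmt.format(new Date(low + (high - low) / 2));
  }
}
```

For example, median("20210316120000", "20210316100000") yields "20210316110000".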
@@ -399,6 +399,10 @@ public static Option<Transformer> createTransformer(List<String> classNames) thr
}
}
+ /**
+ * Returns whether the given file is in a valid hoodie format.
+ * For example, empty or corrupt files are filtered out.
+ */
public static boolean isValidFile(FileStatus fileStatus) {
final String extension = FSUtils.getFileExtension(fileStatus.getPath().toString());
if (PARQUET.getFileExtension().equals(extension)) {
@@ -416,11 +420,19 @@ public static boolean isValidFile(FileStatus fileStatus) {
return fileStatus.getLen() > 0;
}
+ /**
+ * Returns whether insert deduplication is allowed with the given configuration {@code conf}.
+ */
public static boolean allowDuplicateInserts(Configuration conf) {
WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP);
}
+ /**
+ * Returns whether there are successful commits on the timeline.
+ * @param metaClient The meta client
+ * @return true if there is any successful commit
+ */
public static boolean haveSuccessfulCommits(HoodieTableMetaClient metaClient) {
return !metaClient.getCommitsTimeline().filterCompletedInstants().empty();
}
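The newly documented isValidFile check distinguishes parquet files, which must be longer than their magic bytes, from other files, which just need nonzero length. A hedged sketch of that shape (the magic-length constant and the isParquet parameter are assumptions; the real code derives the format from the file extension and consults the parquet writer's MAGIC):

```java
import org.apache.hadoop.fs.FileStatus;

class FileValiditySketch {
  private static final int PARQUET_MAGIC_LENGTH = 4; // "PAR1"

  // A parquet file no longer than its magic bytes is empty or corrupt;
  // for other formats, any nonzero length passes. Illustrative only.
  static boolean isValidFile(FileStatus fileStatus, boolean isParquet) {
    if (isParquet) {
      return fileStatus.getLen() > PARQUET_MAGIC_LENGTH;
    }
    return fileStatus.getLen() > 0;
  }
}
```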