parquetOptions = FlinkOptions.getHoodiePropertiesWithPrefix(options.toMap(), prefix);
+ parquetOptions.forEach((k, v) -> copy.set(prefix + k, v));
+ return copy;
+ }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/AbstractColumnReader.java b/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/AbstractColumnReader.java
new file mode 100644
index 0000000000000..4f83be77f05ca
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/AbstractColumnReader.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.format.cow;
+
+import org.apache.flink.formats.parquet.vector.ParquetDictionary;
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.data.vector.writable.WritableIntVector;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+
+/**
+ * Abstract {@link ColumnReader}.
+ * See {@link org.apache.parquet.column.impl.ColumnReaderImpl};
+ * parts of the code are adapted from Apache Spark and Apache Parquet.
+ *
+ * Note: Referenced from Flink release 1.11.2
+ * {@link org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader}
+ * because some of its methods are package scoped.
+ */
+public abstract class AbstractColumnReader<V extends WritableColumnVector>
+ implements ColumnReader<V> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractColumnReader.class);
+
+ private final PageReader pageReader;
+
+ /**
+ * The dictionary, if this column has dictionary encoding.
+ */
+ protected final Dictionary dictionary;
+
+ /**
+ * Maximum definition level for this column.
+ */
+ protected final int maxDefLevel;
+
+ protected final ColumnDescriptor descriptor;
+
+ /**
+ * Total number of values read.
+ */
+ private long valuesRead;
+
+ /**
+ * Value that indicates the end of the current page. That is, if valuesRead ==
+ * endOfPageValueCount, we are at the end of the page.
+ */
+ private long endOfPageValueCount;
+
+ /**
+ * If true, the current page is dictionary encoded.
+ */
+ private boolean isCurrentPageDictionaryEncoded;
+
+ /**
+ * Total values in the current page.
+ */
+ private int pageValueCount;
+
+ /*
+ * Input streams:
+ * 1. Run length stream: run-length encoding is applied to every value, so the run
+ * length stream carries the run-length information.
+ * 2. Data stream: the data is either plain values or dictionary ids that need to be
+ * decoded to real values through the Dictionary.
+ *
+ * Run length stream ------> Data stream
+ *                    |
+ *                     ------> Dictionary ids stream
+ */
+
+ /**
+ * Run length decoder for data and dictionary.
+ */
+ protected RunLengthDecoder runLenDecoder;
+
+ /**
+ * Data input stream.
+ */
+ ByteBufferInputStream dataInputStream;
+
+ /**
+ * Dictionary decoder to wrap dictionary ids input stream.
+ */
+ private RunLengthDecoder dictionaryIdsDecoder;
+
+ public AbstractColumnReader(
+ ColumnDescriptor descriptor,
+ PageReader pageReader) throws IOException {
+ this.descriptor = descriptor;
+ this.pageReader = pageReader;
+ this.maxDefLevel = descriptor.getMaxDefinitionLevel();
+
+ DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+ if (dictionaryPage != null) {
+ try {
+ this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
+ this.isCurrentPageDictionaryEncoded = true;
+ } catch (IOException e) {
+ throw new IOException("could not decode the dictionary for " + descriptor, e);
+ }
+ } else {
+ this.dictionary = null;
+ this.isCurrentPageDictionaryEncoded = false;
+ }
+ /*
+ * Total number of values in this column (in this row group).
+ */
+ long totalValueCount = pageReader.getTotalValueCount();
+ if (totalValueCount == 0) {
+ throw new IOException("totalValueCount == 0");
+ }
+ }
+
+ protected void checkTypeName(PrimitiveType.PrimitiveTypeName expectedName) {
+ PrimitiveType.PrimitiveTypeName actualName = descriptor.getPrimitiveType().getPrimitiveTypeName();
+ Preconditions.checkArgument(
+ actualName == expectedName,
+ "Expected type name: %s, actual type name: %s",
+ expectedName,
+ actualName);
+ }
+
+ /**
+ * Reads {@code readNumber} values from this column reader into the given vector.
+ */
+ @Override
+ public final void readToVector(int readNumber, V vector) throws IOException {
+ int rowId = 0;
+ WritableIntVector dictionaryIds = null;
+ if (dictionary != null) {
+ dictionaryIds = vector.reserveDictionaryIds(readNumber);
+ }
+ while (readNumber > 0) {
+ // Compute the number of values we want to read in this page.
+ int leftInPage = (int) (endOfPageValueCount - valuesRead);
+ if (leftInPage == 0) {
+ DataPage page = pageReader.readPage();
+ if (page instanceof DataPageV1) {
+ readPageV1((DataPageV1) page);
+ } else if (page instanceof DataPageV2) {
+ readPageV2((DataPageV2) page);
+ } else {
+ throw new RuntimeException("Unsupported page type: " + page.getClass());
+ }
+ leftInPage = (int) (endOfPageValueCount - valuesRead);
+ }
+ int num = Math.min(readNumber, leftInPage);
+ if (isCurrentPageDictionaryEncoded) {
+ // Read and decode dictionary ids.
+ runLenDecoder.readDictionaryIds(
+ num, dictionaryIds, vector, rowId, maxDefLevel, this.dictionaryIdsDecoder);
+
+ if (vector.hasDictionary() || (rowId == 0 && supportLazyDecode())) {
+ // Column vector supports lazy decoding of dictionary values so just set the dictionary.
+ // We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
+ // non-dictionary encoded values have already been added).
+ vector.setDictionary(new ParquetDictionary(dictionary));
+ } else {
+ readBatchFromDictionaryIds(rowId, num, vector, dictionaryIds);
+ }
+ } else {
+ if (vector.hasDictionary() && rowId != 0) {
+ // This batch already has dictionary encoded values but this new page is not. The batch
+ // does not support a mix of dictionary and not so we will decode the dictionary.
+ readBatchFromDictionaryIds(0, rowId, vector, vector.getDictionaryIds());
+ }
+ vector.setDictionary(null);
+ readBatch(rowId, num, vector);
+ }
+
+ valuesRead += num;
+ rowId += num;
+ readNumber -= num;
+ }
+ }
+
+ private void readPageV1(DataPageV1 page) throws IOException {
+ this.pageValueCount = page.getValueCount();
+ ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
+
+ // Initialize the decoders.
+ if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) {
+ throw new UnsupportedOperationException("Unsupported encoding: " + page.getDlEncoding());
+ }
+ int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
+ this.runLenDecoder = new RunLengthDecoder(bitWidth);
+ try {
+ BytesInput bytes = page.getBytes();
+ ByteBufferInputStream in = bytes.toInputStream();
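+ // The repetition level reader is initialized only to advance the input stream past
+ // the repetition level section; its values are not used because only flat
+ // (non-nested) columns are supported by this reader.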
+ rlReader.initFromPage(pageValueCount, in);
+ this.runLenDecoder.initFromStream(pageValueCount, in);
+ prepareNewPage(page.getValueEncoding(), in);
+ } catch (IOException e) {
+ throw new IOException("could not read page " + page + " in col " + descriptor, e);
+ }
+ }
+
+ private void readPageV2(DataPageV2 page) throws IOException {
+ this.pageValueCount = page.getValueCount();
+
+ int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
+ // do not read the length from the stream. v2 pages handle dividing the page bytes.
+ this.runLenDecoder = new RunLengthDecoder(bitWidth, false);
+ this.runLenDecoder.initFromStream(
+ this.pageValueCount, page.getDefinitionLevels().toInputStream());
+ try {
+ prepareNewPage(page.getDataEncoding(), page.getData().toInputStream());
+ } catch (IOException e) {
+ throw new IOException("could not read page " + page + " in col " + descriptor, e);
+ }
+ }
+
+ private void prepareNewPage(
+ Encoding dataEncoding,
+ ByteBufferInputStream in) throws IOException {
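+ // Remember where the current page ends so that readToVector knows when to fetch the next page.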
+ this.endOfPageValueCount = valuesRead + pageValueCount;
+ if (dataEncoding.usesDictionary()) {
+ if (dictionary == null) {
+ throw new IOException("Could not read page in col "
+ + descriptor
+ + " as the dictionary was missing for encoding "
+ + dataEncoding);
+ }
+ @SuppressWarnings("deprecation")
+ Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression
+ if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) {
+ throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
+ }
+ this.dataInputStream = null;
+ this.dictionaryIdsDecoder = new RunLengthDecoder();
+ try {
+ this.dictionaryIdsDecoder.initFromStream(pageValueCount, in);
+ } catch (IOException e) {
+ throw new IOException("could not read dictionary in col " + descriptor, e);
+ }
+ this.isCurrentPageDictionaryEncoded = true;
+ } else {
+ if (dataEncoding != Encoding.PLAIN) {
+ throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
+ }
+ this.dictionaryIdsDecoder = null;
+ LOG.debug("init from page at offset {} for length {}", in.position(), in.available());
+ this.dataInputStream = in.remainingStream();
+ this.isCurrentPageDictionaryEncoded = false;
+ }
+
+ afterReadPage();
+ }
+
+ final ByteBuffer readDataBuffer(int length) {
+ try {
+ return dataInputStream.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+ }
+ }
+
+ /**
+ * After reading a page, subclasses may need to do some initialization.
+ */
+ protected void afterReadPage() {}
+
+ /**
+ * Whether lazy decoding of dictionary ids is supported. See more in {@link ParquetDictionary}.
+ * If this returns false, all data is decoded eagerly.
+ */
+ protected boolean supportLazyDecode() {
+ return true;
+ }
+
+ /**
+ * Read batch from {@link #runLenDecoder} and {@link #dataInputStream}.
+ */
+ protected abstract void readBatch(int rowId, int num, V column);
+
+ /**
+ * Decode dictionary ids to data.
+ * From {@link #runLenDecoder} and {@link #dictionaryIdsDecoder}.
+ */
+ protected abstract void readBatchFromDictionaryIds(
+ int rowId,
+ int num,
+ V column,
+ WritableIntVector dictionaryIds);
+}
+
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/CopyOnWriteInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/CopyOnWriteInputFormat.java
new file mode 100644
index 0000000000000..2a28d85c5c6a6
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/CopyOnWriteInputFormat.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.format.cow;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.PartitionPathUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
+import static org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType;
+
+/**
+ * An implementation of {@link FileInputFormat} to read {@link RowData} records
+ * from Parquet files.
+ *
+ * Note: Adapted from Flink release 1.11.2
+ * {@code org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory.ParquetInputFormat}
+ * in order to support TIMESTAMP_MILLIS.
+ *
+ * @see ParquetSplitReaderUtil
+ */
+public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
+ private static final long serialVersionUID = 1L;
+
+ private final String[] fullFieldNames;
+ private final DataType[] fullFieldTypes;
+ private final int[] selectedFields;
+ private final String partDefaultName;
+ private final boolean utcTimestamp;
+ private final SerializableConfiguration conf;
+ private final long limit;
+
+ private transient ParquetColumnarRowSplitReader reader;
+ private transient long currentReadCount;
+
+ public CopyOnWriteInputFormat(
+ Path[] paths,
+ String[] fullFieldNames,
+ DataType[] fullFieldTypes,
+ int[] selectedFields,
+ String partDefaultName,
+ long limit,
+ Configuration conf,
+ boolean utcTimestamp) {
+ super.setFilePaths(paths);
+ this.limit = limit;
+ this.partDefaultName = partDefaultName;
+ this.fullFieldNames = fullFieldNames;
+ this.fullFieldTypes = fullFieldTypes;
+ this.selectedFields = selectedFields;
+ this.conf = new SerializableConfiguration(conf);
+ this.utcTimestamp = utcTimestamp;
+ }
+
+ @Override
+ public void open(FileInputSplit fileSplit) throws IOException {
+ // generate partition specs.
+ List<String> fieldNameList = Arrays.asList(fullFieldNames);
+ LinkedHashMap<String, String> partSpec = PartitionPathUtils.extractPartitionSpecFromPath(
+ fileSplit.getPath());
+ LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
+ partSpec.forEach((k, v) -> partObjects.put(k, restorePartValueFromType(
+ partDefaultName.equals(v) ? null : v,
+ fullFieldTypes[fieldNameList.indexOf(k)])));
+
+ this.reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
+ utcTimestamp,
+ true,
+ conf.conf(),
+ fullFieldNames,
+ fullFieldTypes,
+ partObjects,
+ selectedFields,
+ DEFAULT_SIZE,
+ new Path(fileSplit.getPath().toString()),
+ fileSplit.getStart(),
+ fileSplit.getLength());
+ this.currentReadCount = 0L;
+ }
+
+ @Override
+ public boolean supportsMultiPaths() {
+ return true;
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
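+ // End either when the configured read limit is reached or when the underlying reader is exhausted.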
+ if (currentReadCount >= limit) {
+ return true;
+ } else {
+ return reader.reachedEnd();
+ }
+ }
+
+ @Override
+ public RowData nextRecord(RowData reuse) {
+ currentReadCount++;
+ return reader.nextRecord();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (reader != null) {
+ this.reader.close();
+ }
+ this.reader = null;
+ }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/Int64TimestampColumnReader.java b/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/Int64TimestampColumnReader.java
new file mode 100644
index 0000000000000..df779de41153a
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/Int64TimestampColumnReader.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.format.cow;
+
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.vector.writable.WritableIntVector;
+import org.apache.flink.table.data.vector.writable.WritableTimestampVector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.schema.PrimitiveType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+
+/**
+ * Timestamp {@link org.apache.flink.formats.parquet.vector.reader.ColumnReader} that supports INT64 (8 bytes).
+ * TIMESTAMP_MILLIS is the deprecated ConvertedType counterpart of a TIMESTAMP logical type
+ * that is UTC normalized and has MILLIS precision.
+ *
+ * See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp:
+ * TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedTypes.
+ */
+public class Int64TimestampColumnReader extends AbstractColumnReader<WritableTimestampVector> {
+
+ private final boolean utcTimestamp;
+
+ public Int64TimestampColumnReader(
+ boolean utcTimestamp,
+ ColumnDescriptor descriptor,
+ PageReader pageReader) throws IOException {
+ super(descriptor, pageReader);
+ this.utcTimestamp = utcTimestamp;
+ checkTypeName(PrimitiveType.PrimitiveTypeName.INT64);
+ }
+
+ @Override
+ protected boolean supportLazyDecode() {
+ return false;
+ }
+
+ @Override
+ protected void readBatch(int rowId, int num, WritableTimestampVector column) {
+ for (int i = 0; i < num; i++) {
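+ // A definition level equal to maxDefLevel means the value is present (not null).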
+ if (runLenDecoder.readInteger() == maxDefLevel) {
+ ByteBuffer buffer = readDataBuffer(8);
+ column.setTimestamp(rowId + i, int64ToTimestamp(utcTimestamp, buffer.getLong()));
+ } else {
+ column.setNullAt(rowId + i);
+ }
+ }
+ }
+
+ @Override
+ protected void readBatchFromDictionaryIds(
+ int rowId,
+ int num,
+ WritableTimestampVector column,
+ WritableIntVector dictionaryIds) {
+ for (int i = rowId; i < rowId + num; ++i) {
+ if (!column.isNullAt(i)) {
+ column.setTimestamp(i, decodeInt64ToTimestamp(
+ utcTimestamp, dictionary, dictionaryIds.getInt(i)));
+ }
+ }
+ }
+
+ public static TimestampData decodeInt64ToTimestamp(
+ boolean utcTimestamp,
+ org.apache.parquet.column.Dictionary dictionary,
+ int id) {
+ long value = dictionary.decodeToLong(id);
+ return int64ToTimestamp(utcTimestamp, value);
+ }
+
+ private static TimestampData int64ToTimestamp(boolean utcTimestamp, long millis) {
+ if (utcTimestamp) {
+ return TimestampData.fromEpochMillis(millis, 0);
+ } else {
+ Timestamp timestamp = new Timestamp(millis);
+ return TimestampData.fromTimestamp(timestamp);
+ }
+ }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/ParquetColumnarRowSplitReader.java b/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/ParquetColumnarRowSplitReader.java
new file mode 100644
index 0000000000000..d163f30dd6482
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/ParquetColumnarRowSplitReader.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.format.cow;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.ColumnarRowData;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.VectorizedColumnBatch;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.apache.hudi.source.format.cow.ParquetSplitReaderUtil.createColumnReader;
+import static org.apache.hudi.source.format.cow.ParquetSplitReaderUtil.createWritableColumnVector;
+import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
+import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+
+/**
+ * This reader is used to read a {@link VectorizedColumnBatch} from an input split.
+ *
+ * Note: Referenced from Flink release 1.11.2
+ * {@code org.apache.flink.formats.parquet.vector.ParquetColumnarRowSplitReader}
+ * because it is package scoped.
+ */
+public class ParquetColumnarRowSplitReader implements Closeable {
+
+ private final boolean utcTimestamp;
+
+ private final MessageType fileSchema;
+
+ private final MessageType requestedSchema;
+
+ /**
+ * The total number of rows this RecordReader will eventually read. The sum of the rows of all
+ * the row groups.
+ */
+ private final long totalRowCount;
+
+ private final WritableColumnVector[] writableVectors;
+
+ private final VectorizedColumnBatch columnarBatch;
+
+ private final ColumnarRowData row;
+
+ private final LogicalType[] selectedTypes;
+
+ private final int batchSize;
+
+ private ParquetFileReader reader;
+
+ /**
+ * For each requested column, the reader used to read that column. This is null if the
+ * column is missing from the file, in which case we populate the attribute with nulls.
+ */
+ private ColumnReader[] columnReaders;
+
+ /**
+ * The number of rows that have been returned.
+ */
+ private long rowsReturned;
+
+ /**
+ * The number of rows that have been loaded so far, including those in the current
+ * in-flight row group.
+ */
+ private long totalCountLoadedSoFar;
+
+ // the index of the next row to return
+ private int nextRow;
+
+ // the number of rows in the current batch
+ private int rowsInBatch;
+
+ public ParquetColumnarRowSplitReader(
+ boolean utcTimestamp,
+ boolean caseSensitive,
+ Configuration conf,
+ LogicalType[] selectedTypes,
+ String[] selectedFieldNames,
+ ColumnBatchGenerator generator,
+ int batchSize,
+ Path path,
+ long splitStart,
+ long splitLength) throws IOException {
+ this.utcTimestamp = utcTimestamp;
+ this.selectedTypes = selectedTypes;
+ this.batchSize = batchSize;
+ // read the footer and apply the predicate push-down filter to prune row groups
+ ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
+ MessageType fileSchema = footer.getFileMetaData().getSchema();
+ FilterCompat.Filter filter = getFilter(conf);
+ List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
+
+ this.fileSchema = footer.getFileMetaData().getSchema();
+ this.requestedSchema = clipParquetSchema(fileSchema, selectedFieldNames, caseSensitive);
+ this.reader = new ParquetFileReader(
+ conf, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());
+
+ long totalRowCount = 0;
+ for (BlockMetaData block : blocks) {
+ totalRowCount += block.getRowCount();
+ }
+ this.totalRowCount = totalRowCount;
+ this.nextRow = 0;
+ this.rowsInBatch = 0;
+ this.rowsReturned = 0;
+
+ checkSchema();
+
+ this.writableVectors = createWritableVectors();
+ this.columnarBatch = generator.generate(createReadableVectors());
+ this.row = new ColumnarRowData(columnarBatch);
+ }
+
+ /**
+ * Clips `parquetSchema` according to `fieldNames`.
+ */
+ private static MessageType clipParquetSchema(
+ GroupType parquetSchema, String[] fieldNames, boolean caseSensitive) {
+ Type[] types = new Type[fieldNames.length];
+ if (caseSensitive) {
+ for (int i = 0; i < fieldNames.length; ++i) {
+ String fieldName = fieldNames[i];
+ if (parquetSchema.getFieldIndex(fieldName) < 0) {
+ throw new IllegalArgumentException(fieldName + " does not exist");
+ }
+ types[i] = parquetSchema.getType(fieldName);
+ }
+ } else {
+ Map<String, Type> caseInsensitiveFieldMap = new HashMap<>();
+ for (Type type : parquetSchema.getFields()) {
+ caseInsensitiveFieldMap.compute(type.getName().toLowerCase(Locale.ROOT),
+ (key, previousType) -> {
+ if (previousType != null) {
+ throw new FlinkRuntimeException(
+ "Parquet with case insensitive mode should have no duplicate key: " + key);
+ }
+ return type;
+ });
+ }
+ for (int i = 0; i < fieldNames.length; ++i) {
+ Type type = caseInsensitiveFieldMap.get(fieldNames[i].toLowerCase(Locale.ROOT));
+ if (type == null) {
+ throw new IllegalArgumentException(fieldNames[i] + " does not exist");
+ }
+ // TODO clip for array,map,row types.
+ types[i] = type;
+ }
+ }
+
+ return Types.buildMessage().addFields(types).named("flink-parquet");
+ }
+
+ private WritableColumnVector[] createWritableVectors() {
+ WritableColumnVector[] columns = new WritableColumnVector[selectedTypes.length];
+ for (int i = 0; i < selectedTypes.length; i++) {
+ columns[i] = createWritableColumnVector(
+ batchSize,
+ selectedTypes[i],
+ requestedSchema.getColumns().get(i).getPrimitiveType());
+ }
+ return columns;
+ }
+
+ /**
+ * Create readable vectors from writable vectors.
+ * Especially for decimal, see {@link org.apache.flink.formats.parquet.vector.ParquetDecimalVector}.
+ */
+ private ColumnVector[] createReadableVectors() {
+ ColumnVector[] vectors = new ColumnVector[writableVectors.length];
+ for (int i = 0; i < writableVectors.length; i++) {
+ vectors[i] = selectedTypes[i].getTypeRoot() == LogicalTypeRoot.DECIMAL
+ ? new ParquetDecimalVector(writableVectors[i])
+ : writableVectors[i];
+ }
+ return vectors;
+ }
+
+ private void checkSchema() throws IOException, UnsupportedOperationException {
+ if (selectedTypes.length != requestedSchema.getFieldCount()) {
+ throw new RuntimeException("The number of selected field types is incompatible with the requested schema!");
+ }
+
+ /*
+ * Check that the requested schema is supported.
+ */
+ for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
+ Type t = requestedSchema.getFields().get(i);
+ if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
+ throw new UnsupportedOperationException("Complex types not supported.");
+ }
+
+ String[] colPath = requestedSchema.getPaths().get(i);
+ if (fileSchema.containsPath(colPath)) {
+ ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
+ if (!fd.equals(requestedSchema.getColumns().get(i))) {
+ throw new UnsupportedOperationException("Schema evolution not supported.");
+ }
+ } else {
+ if (requestedSchema.getColumns().get(i).getMaxDefinitionLevel() == 0) {
+ // Column is missing in data but the required data is non-nullable. This file is invalid.
+ throw new IOException("Required column is missing in data file. Col: " + Arrays.toString(colPath));
+ }
+ }
+ }
+ }
+
+ /**
+ * Method used to check if the end of the input is reached.
+ *
+ * @return True if the end is reached, otherwise false.
+ * @throws IOException Thrown, if an I/O error occurred.
+ */
+ public boolean reachedEnd() throws IOException {
+ return !ensureBatch();
+ }
+
+ public ColumnarRowData nextRecord() {
+ // return the next row
+ row.setRowId(this.nextRow++);
+ return row;
+ }
+
+ /**
+ * Checks if there is at least one row left in the batch to return. If no more rows are
+ * available, it reads another batch of rows.
+ *
+ * @return Returns true if there is one more row to return, false otherwise.
+ * @throws IOException Thrown if an exception happens while reading a batch.
+ */
+ private boolean ensureBatch() throws IOException {
+ if (nextRow >= rowsInBatch) {
+ // No more rows available in the Rows array.
+ nextRow = 0;
+ // Try to read the next batch of rows from the file.
+ return nextBatch();
+ }
+ // there is at least one Row left in the Rows array.
+ return true;
+ }
+
+ /**
+ * Advances to the next batch of rows. Returns false if there are no more.
+ */
+ private boolean nextBatch() throws IOException {
+ for (WritableColumnVector v : writableVectors) {
+ v.reset();
+ }
+ columnarBatch.setNumRows(0);
+ if (rowsReturned >= totalRowCount) {
+ return false;
+ }
+ if (rowsReturned == totalCountLoadedSoFar) {
+ readNextRowGroup();
+ }
+
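+ // Read at most batchSize rows, bounded by what is left in the current row group.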
+ int num = (int) Math.min(batchSize, totalCountLoadedSoFar - rowsReturned);
+ for (int i = 0; i < columnReaders.length; ++i) {
+ //noinspection unchecked
+ columnReaders[i].readToVector(num, writableVectors[i]);
+ }
+ rowsReturned += num;
+ columnarBatch.setNumRows(num);
+ rowsInBatch = num;
+ return true;
+ }
+
+ private void readNextRowGroup() throws IOException {
+ PageReadStore pages = reader.readNextRowGroup();
+ if (pages == null) {
+ throw new IOException("expecting more rows but reached last block. Read "
+ + rowsReturned + " out of " + totalRowCount);
+ }
+ List<ColumnDescriptor> columns = requestedSchema.getColumns();
+ columnReaders = new ColumnReader[columns.size()];
+ for (int i = 0; i < columns.size(); ++i) {
+ columnReaders[i] = createColumnReader(
+ utcTimestamp,
+ selectedTypes[i],
+ columns.get(i),
+ pages.getPageReader(columns.get(i)));
+ }
+ totalCountLoadedSoFar += pages.getRowCount();
+ }
+
+ /**
+ * Seek to a particular row number.
+ */
+ public void seekToRow(long rowCount) throws IOException {
+ if (totalCountLoadedSoFar != 0) {
+ throw new UnsupportedOperationException("Only support seek at first.");
+ }
+
+ List<BlockMetaData> blockMetaData = reader.getRowGroups();
+
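+ // First skip whole row groups, then read the remaining rows one by one below.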
+ for (BlockMetaData metaData : blockMetaData) {
+ if (metaData.getRowCount() > rowCount) {
+ break;
+ } else {
+ reader.skipNextRowGroup();
+ rowsReturned += metaData.getRowCount();
+ totalCountLoadedSoFar += metaData.getRowCount();
+ rowsInBatch = (int) metaData.getRowCount();
+ nextRow = (int) metaData.getRowCount();
+ rowCount -= metaData.getRowCount();
+ }
+ }
+ for (int i = 0; i < rowCount; i++) {
+ boolean end = reachedEnd();
+ if (end) {
+ throw new RuntimeException("Seek too many rows.");
+ }
+ nextRecord();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (reader != null) {
+ reader.close();
+ reader = null;
+ }
+ }
+
+ /**
+ * Interface to generate a {@link VectorizedColumnBatch}.
+ */
+ public interface ColumnBatchGenerator {
+ VectorizedColumnBatch generate(ColumnVector[] readVectors);
+ }
+}
+
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/ParquetDecimalVector.java b/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/ParquetDecimalVector.java
new file mode 100644
index 0000000000000..feaa657461fef
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/ParquetDecimalVector.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.format.cow;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.data.vector.BytesColumnVector;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.DecimalColumnVector;
+import org.apache.flink.table.data.vector.IntColumnVector;
+import org.apache.flink.table.data.vector.LongColumnVector;
+
+/**
+ * Parquet writes decimals as int32, int64, or binary; this class wraps the real vector to
+ * provide the {@link DecimalColumnVector} interface.
+ *
+ * Note: Referenced from Flink release 1.11.2 {@link org.apache.flink.formats.parquet.vector.ParquetDecimalVector}
+ * because it is not public.
+ */
+public class ParquetDecimalVector implements DecimalColumnVector {
+
+ private final ColumnVector vector;
+
+ ParquetDecimalVector(ColumnVector vector) {
+ this.vector = vector;
+ }
+
+ @Override
+ public DecimalData getDecimal(int i, int precision, int scale) {
+ if (DecimalDataUtils.is32BitDecimal(precision)) {
+ return DecimalData.fromUnscaledLong(
+ ((IntColumnVector) vector).getInt(i),
+ precision,
+ scale);
+ } else if (DecimalDataUtils.is64BitDecimal(precision)) {
+ return DecimalData.fromUnscaledLong(
+ ((LongColumnVector) vector).getLong(i),
+ precision,
+ scale);
+ } else {
+ return DecimalData.fromUnscaledBytes(
+ ((BytesColumnVector) vector).getBytes(i).getBytes(),
+ precision,
+ scale);
+ }
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNullAt(i);
+ }
+}
+
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/ParquetSplitReaderUtil.java b/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/ParquetSplitReaderUtil.java
new file mode 100644
index 0000000000000..a047c7e6c4d27
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/ParquetSplitReaderUtil.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.format.cow;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.vector.reader.BooleanColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.ByteColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.BytesColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.DoubleColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.FixedLenBytesColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.FloatColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.IntColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.LongColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.ShortColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.VectorizedColumnBatch;
+import org.apache.flink.table.data.vector.heap.HeapBooleanVector;
+import org.apache.flink.table.data.vector.heap.HeapByteVector;
+import org.apache.flink.table.data.vector.heap.HeapBytesVector;
+import org.apache.flink.table.data.vector.heap.HeapDoubleVector;
+import org.apache.flink.table.data.vector.heap.HeapFloatVector;
+import org.apache.flink.table.data.vector.heap.HeapIntVector;
+import org.apache.flink.table.data.vector.heap.HeapLongVector;
+import org.apache.flink.table.data.vector.heap.HeapShortVector;
+import org.apache.flink.table.data.vector.heap.HeapTimestampVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.runtime.functions.SqlDateTimeUtils.dateToInternal;
+import static org.apache.parquet.Preconditions.checkArgument;
+
+/**
+ * Util for generating {@link ParquetColumnarRowSplitReader}.
+ *
+ * NOTE: Referenced from Flink release 1.11.2 {@code ParquetSplitReaderUtil}, modified to support INT64
+ * based TIMESTAMP_MILLIS as ConvertedType; should be removed once Flink supports that.
+ */
+public class ParquetSplitReaderUtil {
+
+ /**
+ * Util for generating partitioned {@link ParquetColumnarRowSplitReader}.
+ */
+ public static ParquetColumnarRowSplitReader genPartColumnarRowReader(
+ boolean utcTimestamp,
+ boolean caseSensitive,
+ Configuration conf,
+ String[] fullFieldNames,
+ DataType[] fullFieldTypes,
+ Map<String, Object> partitionSpec,
+ int[] selectedFields,
+ int batchSize,
+ Path path,
+ long splitStart,
+ long splitLength) throws IOException {
+ List<String> nonPartNames = Arrays.stream(fullFieldNames)
+ .filter(n -> !partitionSpec.containsKey(n))
+ .collect(Collectors.toList());
+
+ List<String> selNonPartNames = Arrays.stream(selectedFields)
+ .mapToObj(i -> fullFieldNames[i])
+ .filter(nonPartNames::contains).collect(Collectors.toList());
+
+ int[] selParquetFields = selNonPartNames.stream()
+ .mapToInt(nonPartNames::indexOf)
+ .toArray();
+
+ ParquetColumnarRowSplitReader.ColumnBatchGenerator gen = readVectors -> {
+ // create and initialize the row batch
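+ // Partition columns are not stored in the parquet file; they are filled in with constant vectors.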
+ ColumnVector[] vectors = new ColumnVector[selectedFields.length];
+ for (int i = 0; i < vectors.length; i++) {
+ String name = fullFieldNames[selectedFields[i]];
+ LogicalType type = fullFieldTypes[selectedFields[i]].getLogicalType();
+ vectors[i] = partitionSpec.containsKey(name)
+ ? createVectorFromConstant(type, partitionSpec.get(name), batchSize)
+ : readVectors[selNonPartNames.indexOf(name)];
+ }
+ return new VectorizedColumnBatch(vectors);
+ };
+
+ return new ParquetColumnarRowSplitReader(
+ utcTimestamp,
+ caseSensitive,
+ conf,
+ Arrays.stream(selParquetFields)
+ .mapToObj(i -> fullFieldTypes[i].getLogicalType())
+ .toArray(LogicalType[]::new),
+ selNonPartNames.toArray(new String[0]),
+ gen,
+ batchSize,
+ new org.apache.hadoop.fs.Path(path.toUri()),
+ splitStart,
+ splitLength);
+ }
+
+ private static ColumnVector createVectorFromConstant(
+ LogicalType type,
+ Object value,
+ int batchSize) {
+ switch (type.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ case VARBINARY:
+ HeapBytesVector bsv = new HeapBytesVector(batchSize);
+ if (value == null) {
+ bsv.fillWithNulls();
+ } else {
+ bsv.fill(value instanceof byte[]
+ ? (byte[]) value
+ : value.toString().getBytes(StandardCharsets.UTF_8));
+ }
+ return bsv;
+ case BOOLEAN:
+ HeapBooleanVector bv = new HeapBooleanVector(batchSize);
+ if (value == null) {
+ bv.fillWithNulls();
+ } else {
+ bv.fill((boolean) value);
+ }
+ return bv;
+ case TINYINT:
+ HeapByteVector byteVector = new HeapByteVector(batchSize);
+ if (value == null) {
+ byteVector.fillWithNulls();
+ } else {
+ byteVector.fill(((Number) value).byteValue());
+ }
+ return byteVector;
+ case SMALLINT:
+ HeapShortVector sv = new HeapShortVector(batchSize);
+ if (value == null) {
+ sv.fillWithNulls();
+ } else {
+ sv.fill(((Number) value).shortValue());
+ }
+ return sv;
+ case INTEGER:
+ HeapIntVector iv = new HeapIntVector(batchSize);
+ if (value == null) {
+ iv.fillWithNulls();
+ } else {
+ iv.fill(((Number) value).intValue());
+ }
+ return iv;
+ case BIGINT:
+ HeapLongVector lv = new HeapLongVector(batchSize);
+ if (value == null) {
+ lv.fillWithNulls();
+ } else {
+ lv.fill(((Number) value).longValue());
+ }
+ return lv;
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) type;
+ int precision = decimalType.getPrecision();
+ int scale = decimalType.getScale();
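+ // 32-bit decimals are backed by int vectors, 64-bit decimals by long vectors,
+ // larger precisions by unscaled byte vectors.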
+ DecimalData decimal = value == null
+ ? null
+ : Preconditions.checkNotNull(DecimalData.fromBigDecimal((BigDecimal) value, precision, scale));
+ ColumnVector internalVector;
+ if (DecimalDataUtils.is32BitDecimal(precision)) {
+ internalVector = createVectorFromConstant(
+ new IntType(),
+ decimal == null ? null : (int) decimal.toUnscaledLong(),
+ batchSize);
+ } else if (DecimalDataUtils.is64BitDecimal(precision)) {
+ internalVector = createVectorFromConstant(
+ new BigIntType(),
+ decimal == null ? null : decimal.toUnscaledLong(),
+ batchSize);
+ } else {
+ internalVector = createVectorFromConstant(
+ new VarBinaryType(),
+ decimal == null ? null : decimal.toUnscaledBytes(),
+ batchSize);
+ }
+ return new ParquetDecimalVector(internalVector);
+ case FLOAT:
+ HeapFloatVector fv = new HeapFloatVector(batchSize);
+ if (value == null) {
+ fv.fillWithNulls();
+ } else {
+ fv.fill(((Number) value).floatValue());
+ }
+ return fv;
+ case DOUBLE:
+ HeapDoubleVector dv = new HeapDoubleVector(batchSize);
+ if (value == null) {
+ dv.fillWithNulls();
+ } else {
+ dv.fill(((Number) value).doubleValue());
+ }
+ return dv;
+ case DATE:
+ if (value instanceof LocalDate) {
+ value = Date.valueOf((LocalDate) value);
+ }
+ return createVectorFromConstant(
+ new IntType(),
+ value == null ? null : dateToInternal((Date) value),
+ batchSize);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ HeapTimestampVector tv = new HeapTimestampVector(batchSize);
+ if (value == null) {
+ tv.fillWithNulls();
+ } else {
+ tv.fill(TimestampData.fromLocalDateTime((LocalDateTime) value));
+ }
+ return tv;
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+ }
+
+ public static ColumnReader createColumnReader(
+ boolean utcTimestamp,
+ LogicalType fieldType,
+ ColumnDescriptor descriptor,
+ PageReader pageReader) throws IOException {
+ switch (fieldType.getTypeRoot()) {
+ case BOOLEAN:
+ return new BooleanColumnReader(descriptor, pageReader);
+ case TINYINT:
+ return new ByteColumnReader(descriptor, pageReader);
+ case DOUBLE:
+ return new DoubleColumnReader(descriptor, pageReader);
+ case FLOAT:
+ return new FloatColumnReader(descriptor, pageReader);
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ return new IntColumnReader(descriptor, pageReader);
+ case BIGINT:
+ return new LongColumnReader(descriptor, pageReader);
+ case SMALLINT:
+ return new ShortColumnReader(descriptor, pageReader);
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ case VARBINARY:
+ return new BytesColumnReader(descriptor, pageReader);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
+ case INT64:
+ return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader);
+ case INT96:
+ return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
+ default:
+ throw new AssertionError();
+ }
+ case DECIMAL:
+ switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
+ case INT32:
+ return new IntColumnReader(descriptor, pageReader);
+ case INT64:
+ return new LongColumnReader(descriptor, pageReader);
+ case BINARY:
+ return new BytesColumnReader(descriptor, pageReader);
+ case FIXED_LEN_BYTE_ARRAY:
+ return new FixedLenBytesColumnReader(
+ descriptor, pageReader, ((DecimalType) fieldType).getPrecision());
+ default:
+ throw new AssertionError();
+ }
+ default:
+ throw new UnsupportedOperationException(fieldType + " is not supported now.");
+ }
+ }
+
+ public static WritableColumnVector createWritableColumnVector(
+ int batchSize,
+ LogicalType fieldType,
+ PrimitiveType primitiveType) {
+ PrimitiveType.PrimitiveTypeName typeName = primitiveType.getPrimitiveTypeName();
+ switch (fieldType.getTypeRoot()) {
+ case BOOLEAN:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.BOOLEAN,
+ "Unexpected type: %s", typeName);
+ return new HeapBooleanVector(batchSize);
+ case TINYINT:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.INT32,
+ "Unexpected type: %s", typeName);
+ return new HeapByteVector(batchSize);
+ case DOUBLE:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.DOUBLE,
+ "Unexpected type: %s", typeName);
+ return new HeapDoubleVector(batchSize);
+ case FLOAT:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.FLOAT,
+ "Unexpected type: %s", typeName);
+ return new HeapFloatVector(batchSize);
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.INT32,
+ "Unexpected type: %s", typeName);
+ return new HeapIntVector(batchSize);
+ case BIGINT:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.INT64,
+ "Unexpected type: %s", typeName);
+ return new HeapLongVector(batchSize);
+ case SMALLINT:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.INT32,
+ "Unexpected type: %s", typeName);
+ return new HeapShortVector(batchSize);
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ case VARBINARY:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.BINARY,
+ "Unexpected type: %s", typeName);
+ return new HeapBytesVector(batchSize);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ checkArgument(primitiveType.getOriginalType() != OriginalType.TIME_MICROS,
+ "TIME_MICROS original type is not supported");
+ return new HeapTimestampVector(batchSize);
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) fieldType;
+ if (DecimalDataUtils.is32BitDecimal(decimalType.getPrecision())) {
+ checkArgument(
+ (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
+ || typeName == PrimitiveType.PrimitiveTypeName.INT32)
+ && primitiveType.getOriginalType() == OriginalType.DECIMAL,
+ "Unexpected type: %s", typeName);
+ return new HeapIntVector(batchSize);
+ } else if (DecimalDataUtils.is64BitDecimal(decimalType.getPrecision())) {
+ checkArgument(
+ (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
+ || typeName == PrimitiveType.PrimitiveTypeName.INT64)
+ && primitiveType.getOriginalType() == OriginalType.DECIMAL,
+ "Unexpected type: %s", typeName);
+ return new HeapLongVector(batchSize);
+ } else {
+ checkArgument(
+ (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
+ || typeName == PrimitiveType.PrimitiveTypeName.BINARY)
+ && primitiveType.getOriginalType() == OriginalType.DECIMAL,
+ "Unexpected type: %s", typeName);
+ return new HeapBytesVector(batchSize);
+ }
+ default:
+ throw new UnsupportedOperationException(fieldType + " is not supported now.");
+ }
+ }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/RunLengthDecoder.java b/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/RunLengthDecoder.java
new file mode 100644
index 0000000000000..d9fc85560a67e
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/RunLengthDecoder.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.format.cow;
+
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.data.vector.writable.WritableIntVector;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.io.ParquetDecodingException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Run length decoder for data and dictionary ids.
+ * See https://github.com/apache/parquet-format/blob/master/Encodings.md
+ * See {@link RunLengthBitPackingHybridDecoder}.
+ *
+ * Note: Referenced from Flink release 1.11.2
+ * {@code org.apache.flink.formats.parquet.vector.reader.RunLengthDecoder}
+ * because it is package scoped.
+ */
+final class RunLengthDecoder {
+
+ /**
+ * If true, the bit width is fixed. This decoder is used in different places and this also
+ * controls whether we need to read the bit width from the beginning of the data stream.
+ */
+ private final boolean fixedWidth;
+ private final boolean readLength;
+
+ // Encoded data.
+ private ByteBufferInputStream in;
+
+ // bit/byte width of decoded data and utility to batch unpack them.
+ private int bitWidth;
+ private int bytesWidth;
+ private BytePacker packer;
+
+ // Current decoding mode and values
+ RunLengthDecoder.MODE mode;
+ int currentCount;
+ int currentValue;
+
+ // Buffer of decoded values if the values are PACKED.
+ int[] currentBuffer = new int[16];
+ int currentBufferIdx = 0;
+
+ RunLengthDecoder() {
+ this.fixedWidth = false;
+ this.readLength = false;
+ }
+
+ RunLengthDecoder(int bitWidth) {
+ this.fixedWidth = true;
+ this.readLength = bitWidth != 0;
+ initWidthAndPacker(bitWidth);
+ }
+
+ RunLengthDecoder(int bitWidth, boolean readLength) {
+ this.fixedWidth = true;
+ this.readLength = readLength;
+ initWidthAndPacker(bitWidth);
+ }
+
+ /**
+ * Init from input stream.
+ */
+ void initFromStream(int valueCount, ByteBufferInputStream in) throws IOException {
+ this.in = in;
+ if (fixedWidth) {
+ // initialize for repetition and definition levels
+ if (readLength) {
+ int length = readIntLittleEndian();
+ this.in = in.sliceStream(length);
+ }
+ } else {
+ // initialize for values
+ if (in.available() > 0) {
+ initWidthAndPacker(in.read());
+ }
+ }
+ if (bitWidth == 0) {
+ // 0 bit width, treat this as an RLE run of valueCount number of 0's.
+ this.mode = RunLengthDecoder.MODE.RLE;
+ this.currentCount = valueCount;
+ this.currentValue = 0;
+ } else {
+ this.currentCount = 0;
+ }
+ }
+
+ /**
+ * Initializes the internal state for decoding ints of `bitWidth`.
+ */
+ private void initWidthAndPacker(int bitWidth) {
+ Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
+ this.bitWidth = bitWidth;
+ this.bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth);
+ this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
+ }
+
+ int readInteger() {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+
+ this.currentCount--;
+ switch (mode) {
+ case RLE:
+ return this.currentValue;
+ case PACKED:
+ return this.currentBuffer[currentBufferIdx++];
+ default:
+ throw new AssertionError();
+ }
+ }
+
+ /**
+ * Decoding for dictionary ids. The IDs are populated into `values` and the nullability is
+ * populated into `nulls`.
+ */
+ void readDictionaryIds(
+ int total,
+ WritableIntVector values,
+ WritableColumnVector nulls,
+ int rowId,
+ int level,
+ RunLengthDecoder data) {
+ int left = total;
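+ // 'this' decodes the definition levels while 'data' decodes the actual dictionary ids.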
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == level) {
+ data.readDictionaryIdData(n, values, rowId);
+ } else {
+ nulls.setNulls(rowId, n);
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (currentBuffer[currentBufferIdx++] == level) {
+ values.setInt(rowId + i, data.readInteger());
+ } else {
+ nulls.setNullAt(rowId + i);
+ }
+ }
+ break;
+ default:
+ throw new AssertionError();
+ }
+ rowId += n;
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ /**
+ * It is used to decode dictionary IDs.
+ */
+ private void readDictionaryIdData(int total, WritableIntVector c, int rowId) {
+ int left = total;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ c.setInts(rowId, n, currentValue);
+ break;
+ case PACKED:
+ c.setInts(rowId, n, currentBuffer, currentBufferIdx);
+ currentBufferIdx += n;
+ break;
+ default:
+ throw new AssertionError();
+ }
+ rowId += n;
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ /**
+ * Reads the next varint encoded int.
+ */
+ private int readUnsignedVarInt() throws IOException {
+ int value = 0;
+ int shift = 0;
+ int b;
+ do {
+ b = in.read();
+ value |= (b & 0x7F) << shift;
+ shift += 7;
+ } while ((b & 0x80) != 0);
+ return value;
+ }
+
+ /**
+ * Reads the next 4 byte little endian int.
+ */
+ private int readIntLittleEndian() throws IOException {
+ int ch4 = in.read();
+ int ch3 = in.read();
+ int ch2 = in.read();
+ int ch1 = in.read();
+ return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4);
+ }
+
+ /**
+ * Reads the next byteWidth little endian int.
+ */
+ private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
+ switch (bytesWidth) {
+ case 0:
+ return 0;
+ case 1:
+ return in.read();
+ case 2: {
+ int ch2 = in.read();
+ int ch1 = in.read();
+ return (ch1 << 8) + ch2;
+ }
+ case 3: {
+ int ch3 = in.read();
+ int ch2 = in.read();
+ int ch1 = in.read();
+ return (ch1 << 16) + (ch2 << 8) + ch3;
+ }
+ case 4: {
+ return readIntLittleEndian();
+ }
+ default:
+ throw new RuntimeException("Unreachable");
+ }
+ }
+
+ /**
+ * Reads the next group.
+ */
+ void readNextGroup() {
+ try {
+ int header = readUnsignedVarInt();
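+ // The lowest bit of the header selects the mode; the remaining bits hold the
+ // RLE run length or the number of bit-packed groups.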
+ this.mode = (header & 1) == 0 ? RunLengthDecoder.MODE.RLE : RunLengthDecoder.MODE.PACKED;
+ switch (mode) {
+ case RLE:
+ this.currentCount = header >>> 1;
+ this.currentValue = readIntLittleEndianPaddedOnBitWidth();
+ return;
+ case PACKED:
+ int numGroups = header >>> 1;
+ this.currentCount = numGroups * 8;
+
+ if (this.currentBuffer.length < this.currentCount) {
+ this.currentBuffer = new int[this.currentCount];
+ }
+ currentBufferIdx = 0;
+ int valueIndex = 0;
+ while (valueIndex < this.currentCount) {
+ // values are bit packed 8 at a time, so reading bitWidth will always work
+ ByteBuffer buffer = in.slice(bitWidth);
+ this.packer.unpack8Values(buffer, buffer.position(), this.currentBuffer, valueIndex);
+ valueIndex += 8;
+ }
+ return;
+ default:
+ throw new ParquetDecodingException("not a valid mode " + this.mode);
+ }
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read from input stream", e);
+ }
+ }
+
+ enum MODE {
+ RLE,
+ PACKED
+ }
+}
+
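For orientation, a minimal self-contained sketch of how the hybrid RLE/bit-packed header decoded by readNextGroup() above is interpreted; the helper name and header value are hypothetical and only illustrate the branch logic.

    // Hypothetical helper; real headers are varint-decoded from the Parquet page stream.
    static String describeHybridHeader(int header) {
      if ((header & 1) == 1) {
        // bit-packed run: values come in groups of 8, each group spanning bitWidth bytes
        return "PACKED run with " + ((header >>> 1) * 8) + " values";
      } else {
        // RLE run: a single value (stored on bytesWidth bytes) repeated (header >>> 1) times
        return "RLE run of length " + (header >>> 1);
      }
    }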
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java
new file mode 100644
index 0000000000000..ebd91afedc1fa
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.format.mor;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.source.format.FilePathUtils;
+import org.apache.hudi.source.format.FormatUtils;
+import org.apache.hudi.source.format.cow.ParquetColumnarRowSplitReader;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.RowDataToAvroConverters;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.hudi.source.format.cow.ParquetSplitReaderUtil;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
+import static org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
+import static org.apache.hudi.source.format.FormatUtils.buildAvroRecordBySchema;
+
+/**
+ * The base InputFormat class to read from Hoodie data + log files.
+ *
+ * <p>Use {@link org.apache.flink.formats.parquet.utils.ParquetRecordReader}
+ * to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * and overrides {@link #createInputSplits(int)} and {@link #close()} to change the default behaviors.
+ */
+public class MergeOnReadInputFormat
+ extends RichInputFormat<RowData, MergeOnReadInputSplit> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Configuration conf;
+
+ private transient org.apache.hadoop.conf.Configuration hadoopConf;
+
+ private Path[] paths;
+
+ private final MergeOnReadTableState tableState;
+
+ /**
+ * Uniform iterator view for the underlying records.
+ */
+ private transient RecordIterator iterator;
+
+ // for project push down
+ /**
+ * Full field names.
+ */
+ private final List<String> fieldNames;
+
+ /**
+ * Full field data types.
+ */
+ private final List<DataType> fieldTypes;
+
+ /**
+ * Default partition name when the field value is null.
+ */
+ private final String defaultPartName;
+
+ /**
+ * Required field positions.
+ */
+ private final int[] requiredPos;
+
+ // for limit push down
+ /**
+ * Limit for the reader, -1 when the reading is not limited.
+ */
+ private final long limit;
+
+ /**
+ * Recording the current read count for limit check.
+ */
+ private long currentReadCount = 0;
+
+ public MergeOnReadInputFormat(
+ Configuration conf,
+ Path[] paths,
+ MergeOnReadTableState tableState,
+ List<DataType> fieldTypes,
+ String defaultPartName,
+ long limit) {
+ this.conf = conf;
+ this.paths = paths;
+ this.tableState = tableState;
+ this.fieldNames = tableState.getRowType().getFieldNames();
+ this.fieldTypes = fieldTypes;
+ this.defaultPartName = defaultPartName;
+ // Needs improvement: this requiredPos is only suitable for parquet reader,
+ // because we need to
+ this.requiredPos = tableState.getRequiredPositions();
+ this.limit = limit;
+ }
+
+ @Override
+ public void open(MergeOnReadInputSplit split) throws IOException {
+ this.currentReadCount = 0L;
+ this.hadoopConf = StreamerUtil.getHadoopConf();
+ if (!split.getLogPaths().isPresent()) {
+ // base file only
+ this.iterator = new BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
+ } else if (!split.getBasePath().isPresent()) {
+ // log files only
+ this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
+ } else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
+ this.iterator = new SkipMergeIterator(
+ getRequiredSchemaReader(split.getBasePath().get()),
+ getLogFileIterator(split));
+ } else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
+ this.iterator = new MergeIterator(
+ hadoopConf,
+ split,
+ this.tableState.getRowType(),
+ this.tableState.getRequiredRowType(),
+ new Schema.Parser().parse(this.tableState.getAvroSchema()),
+ new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
+ this.requiredPos,
+ getFullSchemaReader(split.getTablePath()));
+ } else {
+ throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for "
+ + "file path: " + split.getBasePath()
+ + "log paths: " + split.getLogPaths()
+ + "hoodie table path: " + split.getTablePath()
+ + "spark partition Index: " + split.getSplitNumber()
+ + "merge type: " + split.getMergeType());
+ }
+ }
+
+ @Override
+ public void configure(Configuration configuration) {
+ if (this.paths.length == 0) {
+ // file path was not specified yet. Try to set it from the parameters.
+ String filePath = configuration.getString(FlinkOptions.PATH, null);
+ if (filePath == null) {
+ throw new IllegalArgumentException("File path was not specified in input format or configuration.");
+ } else {
+ this.paths = new Path[] { new Path(filePath) };
+ }
+ }
+ // may support nested files in the future.
+ }
+
+ @Override
+ public BaseStatistics getStatistics(BaseStatistics baseStatistics) {
+ // statistics not supported yet.
+ return null;
+ }
+
+ @Override
+ public MergeOnReadInputSplit[] createInputSplits(int minNumSplits) {
+ return this.tableState.getInputSplits().toArray(new MergeOnReadInputSplit[0]);
+ }
+
+ @Override
+ public InputSplitAssigner getInputSplitAssigner(MergeOnReadInputSplit[] mergeOnReadInputSplits) {
+ return new DefaultInputSplitAssigner(mergeOnReadInputSplits);
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ if (limit > 0 && currentReadCount >= limit) {
+ return true;
+ } else {
+ // log file reaches end ?
+ return this.iterator.reachedEnd();
+ }
+ }
+
+ @Override
+ public RowData nextRecord(RowData o) {
+ currentReadCount++;
+ return this.iterator.nextRecord();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.iterator != null) {
+ this.iterator.close();
+ }
+ this.iterator = null;
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ private ParquetColumnarRowSplitReader getFullSchemaReader(String path) throws IOException {
+ return getReader(path, IntStream.range(0, this.tableState.getRowType().getFieldCount()).toArray());
+ }
+
+ private ParquetColumnarRowSplitReader getRequiredSchemaReader(String path) throws IOException {
+ return getReader(path, this.requiredPos);
+ }
+
+ private ParquetColumnarRowSplitReader getReader(String path, int[] requiredPos) throws IOException {
+ // generate partition specs.
+ LinkedHashMap<String, String> partSpec = FilePathUtils.extractPartitionKeyValues(
+ new org.apache.flink.core.fs.Path(path).getParent(),
+ this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION),
+ this.conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(","));
+ LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
+ partSpec.forEach((k, v) -> partObjects.put(k, restorePartValueFromType(
+ defaultPartName.equals(v) ? null : v,
+ fieldTypes.get(fieldNames.indexOf(k)))));
+
+ return ParquetSplitReaderUtil.genPartColumnarRowReader(
+ this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
+ true,
+ FormatUtils.getParquetConf(this.conf, hadoopConf),
+ fieldNames.toArray(new String[0]),
+ fieldTypes.toArray(new DataType[0]),
+ partObjects,
+ requiredPos,
+ DEFAULT_SIZE,
+ new org.apache.flink.core.fs.Path(path),
+ 0,
+ Long.MAX_VALUE); // read the whole file
+ }
+
+ private Iterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
+ final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
+ final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
+ final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
+ final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
+ AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
+ final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords =
+ FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
+ final Iterator<String> logRecordsKeyIterator = logRecords.keySet().iterator();
+
+ return new Iterator<RowData>() {
+ private RowData currentRecord;
+
+ @Override
+ public boolean hasNext() {
+ if (logRecordsKeyIterator.hasNext()) {
+ String curAvrokey = logRecordsKeyIterator.next();
+ Option<IndexedRecord> curAvroRecord = null;
+ try {
+ curAvroRecord = logRecords.get(curAvrokey).getData().getInsertValue(tableSchema);
+ } catch (IOException e) {
+ throw new HoodieException("Get avro insert value error for key: " + curAvrokey, e);
+ }
+ if (!curAvroRecord.isPresent()) {
+ // delete record found, skipping
+ return hasNext();
+ } else {
+ GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
+ curAvroRecord.get(),
+ requiredSchema,
+ requiredPos,
+ recordBuilder);
+ currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
+ return true;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public RowData next() {
+ return currentRecord;
+ }
+ };
+ }
+
+ private interface RecordIterator {
+ boolean reachedEnd() throws IOException;
+
+ RowData nextRecord();
+
+ void close() throws IOException;
+ }
+
+ static class BaseFileOnlyIterator implements RecordIterator {
+ // base file reader
+ private final ParquetColumnarRowSplitReader reader;
+
+ BaseFileOnlyIterator(ParquetColumnarRowSplitReader reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ return this.reader.reachedEnd();
+ }
+
+ @Override
+ public RowData nextRecord() {
+ return this.reader.nextRecord();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.reader != null) {
+ this.reader.close();
+ }
+ }
+ }
+
+ static class LogFileOnlyIterator implements RecordIterator {
+ // iterator for log files
+ private final Iterator<RowData> iterator;
+
+ LogFileOnlyIterator(Iterator<RowData> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public boolean reachedEnd() {
+ return !this.iterator.hasNext();
+ }
+
+ @Override
+ public RowData nextRecord() {
+ return this.iterator.next();
+ }
+
+ @Override
+ public void close() {
+ // no operation
+ }
+ }
+
+ static class SkipMergeIterator implements RecordIterator {
+ // base file reader
+ private final ParquetColumnarRowSplitReader reader;
+ // iterator for log files
+ private final Iterator<RowData> iterator;
+
+ private RowData currentRecord;
+
+ SkipMergeIterator(ParquetColumnarRowSplitReader reader, Iterator<RowData> iterator) {
+ this.reader = reader;
+ this.iterator = iterator;
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ if (!this.reader.reachedEnd()) {
+ currentRecord = this.reader.nextRecord();
+ return false;
+ }
+ if (this.iterator.hasNext()) {
+ currentRecord = this.iterator.next();
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public RowData nextRecord() {
+ return currentRecord;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.reader != null) {
+ this.reader.close();
+ }
+ }
+ }
+
+ static class MergeIterator implements RecordIterator {
+ // base file reader
+ private final ParquetColumnarRowSplitReader reader;
+ // log keys used for merging
+ private final Iterator<String> logKeysIterator;
+ // log records
+ private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords;
+
+ private final Schema tableSchema;
+ private final Schema requiredSchema;
+ private final int[] requiredPos;
+ private final RowDataToAvroConverters.RowDataToAvroConverter rowDataToAvroConverter;
+ private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
+ private final GenericRecordBuilder recordBuilder;
+
+ private Set<String> keyToSkip = new HashSet<>();
+
+ private RowData currentRecord;
+
+ MergeIterator(
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ MergeOnReadInputSplit split,
+ RowType tableRowType,
+ RowType requiredRowType,
+ Schema tableSchema,
+ Schema requiredSchema,
+ int[] requiredPos,
+ ParquetColumnarRowSplitReader reader) { // the reader should be created with the full schema
+ this.tableSchema = tableSchema;
+ this.reader = reader;
+ this.logRecords = FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
+ this.logKeysIterator = this.logRecords.keySet().iterator();
+ this.requiredSchema = requiredSchema;
+ this.requiredPos = requiredPos;
+ this.recordBuilder = new GenericRecordBuilder(requiredSchema);
+ this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType);
+ this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ if (!this.reader.reachedEnd()) {
+ currentRecord = this.reader.nextRecord();
+ final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString();
+ if (logRecords.containsKey(curKey)) {
+ keyToSkip.add(curKey);
+ Option<IndexedRecord> mergedAvroRecord = mergeRowWithLog(currentRecord, curKey);
+ if (!mergedAvroRecord.isPresent()) {
+ // deleted
+ return reachedEnd();
+ } else {
+ GenericRecord record = buildAvroRecordBySchema(
+ mergedAvroRecord.get(),
+ requiredSchema,
+ requiredPos,
+ recordBuilder);
+ this.currentRecord = (RowData) avroToRowDataConverter.convert(record);
+ return false;
+ }
+ }
+ return false;
+ } else {
+ if (logKeysIterator.hasNext()) {
+ final String curKey = logKeysIterator.next();
+ if (keyToSkip.contains(curKey)) {
+ return reachedEnd();
+ } else {
+ Option<IndexedRecord> insertAvroRecord =
+ logRecords.get(curKey).getData().getInsertValue(tableSchema);
+ if (!insertAvroRecord.isPresent()) {
+ // standalone delete record, skipping
+ return reachedEnd();
+ } else {
+ GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
+ insertAvroRecord.get(),
+ requiredSchema,
+ requiredPos,
+ recordBuilder);
+ this.currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ }
+
+ @Override
+ public RowData nextRecord() {
+ return currentRecord;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.reader != null) {
+ this.reader.close();
+ }
+ }
+
+ private Option<IndexedRecord> mergeRowWithLog(
+ RowData curRow,
+ String curKey) throws IOException {
+ GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
+ return logRecords.get(curKey).getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
+ }
+ }
+}
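For context, a hedged sketch of the open/reachedEnd/nextRecord/close cycle a Flink task runs against this input format; the format and split instances are assumed to be created elsewhere (e.g. by the table source and the split assigner), and the processing step is a placeholder.

    static void consume(MergeOnReadInputFormat format, MergeOnReadInputSplit split) throws IOException {
      format.open(split);                         // selects the matching RecordIterator
      try {
        while (!format.reachedEnd()) {
          RowData row = format.nextRecord(null);  // the reuse object is ignored by this format
          // process the row here ...
        }
      } finally {
        format.close();
      }
    }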
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputSplit.java b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputSplit.java
new file mode 100644
index 0000000000000..5cf0affaa5e67
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputSplit.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.format.mor;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.flink.core.io.InputSplit;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * Represents an input split of the source, actually a data bucket.
+ */
+public class MergeOnReadInputSplit implements InputSplit {
+ private static final long serialVersionUID = 1L;
+
+ private final int splitNum;
+ private final Option<String> basePath;
+ private final Option<List<String>> logPaths;
+ private final String latestCommit;
+ private final String tablePath;
+ private final long maxCompactionMemoryInBytes;
+ private final String mergeType;
+
+ public MergeOnReadInputSplit(
+ int splitNum,
+ @Nullable String basePath,
+ Option<List<String>> logPaths,
+ String latestCommit,
+ String tablePath,
+ long maxCompactionMemoryInBytes,
+ String mergeType) {
+ this.splitNum = splitNum;
+ this.basePath = Option.ofNullable(basePath);
+ this.logPaths = logPaths;
+ this.latestCommit = latestCommit;
+ this.tablePath = tablePath;
+ this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+ this.mergeType = mergeType;
+ }
+
+ public Option<String> getBasePath() {
+ return basePath;
+ }
+
+ public Option<List<String>> getLogPaths() {
+ return logPaths;
+ }
+
+ public String getLatestCommit() {
+ return latestCommit;
+ }
+
+ public String getTablePath() {
+ return tablePath;
+ }
+
+ public long getMaxCompactionMemoryInBytes() {
+ return maxCompactionMemoryInBytes;
+ }
+
+ public String getMergeType() {
+ return mergeType;
+ }
+
+ @Override
+ public int getSplitNumber() {
+ return this.splitNum;
+ }
+}
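A sketch of constructing a split by hand, e.g. in a test; the paths, instant time and memory budget are hypothetical values, and the merge type reuses the FlinkOptions constant that the input format above checks.

    MergeOnReadInputSplit split = new MergeOnReadInputSplit(
        0,                                               // splitNum
        "/tmp/hudi/par1/some_base_file.parquet",         // base file path, null for log-only splits
        Option.of(Collections.singletonList("/tmp/hudi/par1/.some_file.log.1")),
        "20210101000000",                                // latest commit instant
        "/tmp/hudi",                                     // table path
        100L * 1024 * 1024,                              // max compaction memory in bytes
        FlinkOptions.REALTIME_PAYLOAD_COMBINE);          // merge type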
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadTableState.java b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadTableState.java
new file mode 100644
index 0000000000000..6b90352f52abd
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadTableState.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.format.mor;
+
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * State for the merge-on-read table source, including row types, Avro schemas and input splits.
+ */
+public class MergeOnReadTableState implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final RowType rowType;
+ private final RowType requiredRowType;
+ private final String avroSchema;
+ private final String requiredAvroSchema;
+ private final List<MergeOnReadInputSplit> inputSplits;
+
+ public MergeOnReadTableState(
+ RowType rowType,
+ RowType requiredRowType,
+ String avroSchema,
+ String requiredAvroSchema,
+ List<MergeOnReadInputSplit> inputSplits) {
+ this.rowType = rowType;
+ this.requiredRowType = requiredRowType;
+ this.avroSchema = avroSchema;
+ this.requiredAvroSchema = requiredAvroSchema;
+ this.inputSplits = inputSplits;
+ }
+
+ public RowType getRowType() {
+ return rowType;
+ }
+
+ public RowType getRequiredRowType() {
+ return requiredRowType;
+ }
+
+ public String getAvroSchema() {
+ return avroSchema;
+ }
+
+ public String getRequiredAvroSchema() {
+ return requiredAvroSchema;
+ }
+
+ public List<MergeOnReadInputSplit> getInputSplits() {
+ return inputSplits;
+ }
+
+ public int[] getRequiredPositions() {
+ final List<String> fieldNames = rowType.getFieldNames();
+ return requiredRowType.getFieldNames().stream()
+ .map(fieldNames::indexOf)
+ .mapToInt(i -> i)
+ .toArray();
+ }
+}
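A sketch of how the projection positions are derived: with a full row type of (uuid, name, age, ts, partition) and a projected row type of (name, ts), getRequiredPositions() yields {1, 3}. The Avro schema strings and the split list are assumed to exist; the logical types use the standard Flink classes.

    RowType fullRowType = RowType.of(
        new LogicalType[] {new VarCharType(20), new VarCharType(10), new IntType(),
            new TimestampType(3), new VarCharType(10)},
        new String[] {"uuid", "name", "age", "ts", "partition"});
    RowType requiredRowType = RowType.of(
        new LogicalType[] {new VarCharType(10), new TimestampType(3)},
        new String[] {"name", "ts"});
    MergeOnReadTableState state = new MergeOnReadTableState(
        fullRowType, requiredRowType, avroSchema, requiredAvroSchema, inputSplits);
    int[] requiredPos = state.getRequiredPositions();  // -> [1, 3]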
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java b/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
index 21664f3d7d8ee..db72cc18ead35 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
@@ -20,11 +20,23 @@
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.AtomicDataType;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TypeInformationRawType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import java.util.List;
@@ -36,7 +48,7 @@
* Note: Changes in this class need to be kept in sync with the corresponding runtime classes
* {@link org.apache.flink.formats.avro.AvroRowDeserializationSchema} and {@link org.apache.flink.formats.avro.AvroRowSerializationSchema}.
*
- * <p>NOTE: reference from Flink release 1.12.0, should remove when Flink version upgrade to that.
+ * <p>NOTE: reference from Flink release 1.12.0, should remove when Flink version upgrade to that.
*/
public class AvroSchemaConverter {
@@ -143,5 +155,171 @@ public static DataType convertToDataType(Schema schema) {
throw new IllegalArgumentException("Unsupported Avro type '" + schema.getType() + "'.");
}
}
+
+ /**
+ * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
+ *
+ * <p>Use "record" as the type name.
+ *
+ * @param schema the schema type, usually it should be the top level record type, e.g. not a
+ * nested type
+ * @return Avro's {@link Schema} matching this logical type.
+ */
+ public static Schema convertToSchema(LogicalType schema) {
+ return convertToSchema(schema, "record");
+ }
+
+ /**
+ * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
+ *
+ * <p>The "{rowName}_" is used as the nested row type name prefix in order to generate the right
+ * schema. A nested record type that differs only in its type name is still compatible.
+ *
+ * @param logicalType logical type
+ * @param rowName the record name
+ * @return Avro's {@link Schema} matching this logical type.
+ */
+ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
+ int precision;
+ boolean nullable = logicalType.isNullable();
+ switch (logicalType.getTypeRoot()) {
+ case NULL:
+ return SchemaBuilder.builder().nullType();
+ case BOOLEAN:
+ Schema bool = SchemaBuilder.builder().booleanType();
+ return nullable ? nullableSchema(bool) : bool;
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ Schema integer = SchemaBuilder.builder().intType();
+ return nullable ? nullableSchema(integer) : integer;
+ case BIGINT:
+ Schema bigint = SchemaBuilder.builder().longType();
+ return nullable ? nullableSchema(bigint) : bigint;
+ case FLOAT:
+ Schema f = SchemaBuilder.builder().floatType();
+ return nullable ? nullableSchema(f) : f;
+ case DOUBLE:
+ Schema d = SchemaBuilder.builder().doubleType();
+ return nullable ? nullableSchema(d) : d;
+ case CHAR:
+ case VARCHAR:
+ Schema str = SchemaBuilder.builder().stringType();
+ return nullable ? nullableSchema(str) : str;
+ case BINARY:
+ case VARBINARY:
+ Schema binary = SchemaBuilder.builder().bytesType();
+ return nullable ? nullableSchema(binary) : binary;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ // use long to represent Timestamp
+ final TimestampType timestampType = (TimestampType) logicalType;
+ precision = timestampType.getPrecision();
+ org.apache.avro.LogicalType avroLogicalType;
+ if (precision <= 3) {
+ avroLogicalType = LogicalTypes.timestampMillis();
+ } else {
+ throw new IllegalArgumentException(
+ "Avro does not support TIMESTAMP type "
+ + "with precision: "
+ + precision
+ + ", it only supports precision less than 3.");
+ }
+ Schema timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
+ return nullable ? nullableSchema(timestamp) : timestamp;
+ case DATE:
+ // use int to represent Date
+ Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+ return nullable ? nullableSchema(date) : date;
+ case TIME_WITHOUT_TIME_ZONE:
+ precision = ((TimeType) logicalType).getPrecision();
+ if (precision > 3) {
+ throw new IllegalArgumentException(
+ "Avro does not support TIME type with precision: "
+ + precision
+ + ", it only supports precision less than 3.");
+ }
+ // use int to represent Time; only millisecond precision is supported during deserialization
+ Schema time =
+ LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
+ return nullable ? nullableSchema(time) : time;
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) logicalType;
+ // store BigDecimal as byte[]
+ Schema decimal =
+ LogicalTypes.decimal(decimalType.getPrecision(), decimalType.getScale())
+ .addToSchema(SchemaBuilder.builder().bytesType());
+ return nullable ? nullableSchema(decimal) : decimal;
+ case ROW:
+ RowType rowType = (RowType) logicalType;
+ List<String> fieldNames = rowType.getFieldNames();
+ // we have to make sure the record name is different in a Schema
+ SchemaBuilder.FieldAssembler<Schema> builder =
+ SchemaBuilder.builder().record(rowName).fields();
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ String fieldName = fieldNames.get(i);
+ LogicalType fieldType = rowType.getTypeAt(i);
+ SchemaBuilder.GenericDefault<Schema> fieldBuilder =
+ builder.name(fieldName)
+ .type(convertToSchema(fieldType, rowName + "_" + fieldName));
+
+ if (fieldType.isNullable()) {
+ builder = fieldBuilder.withDefault(null);
+ } else {
+ builder = fieldBuilder.noDefault();
+ }
+ }
+ Schema record = builder.endRecord();
+ return nullable ? nullableSchema(record) : record;
+ case MULTISET:
+ case MAP:
+ Schema map =
+ SchemaBuilder.builder()
+ .map()
+ .values(
+ convertToSchema(
+ extractValueTypeToAvroMap(logicalType), rowName));
+ return nullable ? nullableSchema(map) : map;
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) logicalType;
+ Schema array =
+ SchemaBuilder.builder()
+ .array()
+ .items(convertToSchema(arrayType.getElementType(), rowName));
+ return nullable ? nullableSchema(array) : array;
+ case RAW:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported to derive Schema for type: " + logicalType);
+ }
+ }
+
+ private static LogicalType extractValueTypeToAvroMap(LogicalType type) {
+ LogicalType keyType;
+ LogicalType valueType;
+ if (type instanceof MapType) {
+ MapType mapType = (MapType) type;
+ keyType = mapType.getKeyType();
+ valueType = mapType.getValueType();
+ } else {
+ MultisetType multisetType = (MultisetType) type;
+ keyType = multisetType.getElementType();
+ valueType = new IntType();
+ }
+ if (!LogicalTypeChecks.hasFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) {
+ throw new UnsupportedOperationException(
+ "Avro format doesn't support non-string as key type of map. "
+ + "The key type is: "
+ + keyType.asSummaryString());
+ }
+ return valueType;
+ }
+
+ /** Returns schema with nullable true. */
+ private static Schema nullableSchema(Schema schema) {
+ return schema.isNullable()
+ ? schema
+ : Schema.createUnion(SchemaBuilder.builder().nullType(), schema);
+ }
}
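A short sketch of the added conversion: a non-nullable row type maps to a plain Avro record, while each nullable field becomes a union with null and gets a null default. The field names are illustrative.

    RowType rowType = new RowType(false, Arrays.asList(
        new RowType.RowField("uuid", new VarCharType(false, 20)),   // non-null string
        new RowType.RowField("age", new IntType())));               // nullable -> union(null, int)
    Schema schema = AvroSchemaConverter.convertToSchema(rowType);
    System.out.println(schema.toString(true));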
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java b/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
new file mode 100644
index 0000000000000..1ce467f5467e4
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoField;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.extractValueTypeToAvroMap;
+
+/**
+ * Tool class used to convert from Avro {@link GenericRecord} to {@link RowData}.
+ *
+ * NOTE: reference from Flink release 1.12.0, should remove when Flink version upgrade to that.
+ */
+@Internal
+public class AvroToRowDataConverters {
+
+ /**
+ * Runtime converter that converts Avro data structures into objects of Flink Table & SQL
+ * internal data structures.
+ */
+ @FunctionalInterface
+ public interface AvroToRowDataConverter extends Serializable {
+ Object convert(Object object);
+ }
+
+ // -------------------------------------------------------------------------------------
+ // Runtime Converters
+ // -------------------------------------------------------------------------------------
+
+ public static AvroToRowDataConverter createRowConverter(RowType rowType) {
+ final AvroToRowDataConverter[] fieldConverters =
+ rowType.getFields().stream()
+ .map(RowType.RowField::getType)
+ .map(AvroToRowDataConverters::createNullableConverter)
+ .toArray(AvroToRowDataConverter[]::new);
+ final int arity = rowType.getFieldCount();
+
+ return avroObject -> {
+ IndexedRecord record = (IndexedRecord) avroObject;
+ GenericRowData row = new GenericRowData(arity);
+ for (int i = 0; i < arity; ++i) {
+ row.setField(i, fieldConverters[i].convert(record.get(i)));
+ }
+ return row;
+ };
+ }
+
+ /**
+ * Creates a runtime converter which is null safe.
+ */
+ private static AvroToRowDataConverter createNullableConverter(LogicalType type) {
+ final AvroToRowDataConverter converter = createConverter(type);
+ return avroObject -> {
+ if (avroObject == null) {
+ return null;
+ }
+ return converter.convert(avroObject);
+ };
+ }
+
+ /**
+ * Creates a runtime converter which assumes the input object is not null.
+ */
+ private static AvroToRowDataConverter createConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case NULL:
+ return avroObject -> null;
+ case TINYINT:
+ return avroObject -> ((Integer) avroObject).byteValue();
+ case SMALLINT:
+ return avroObject -> ((Integer) avroObject).shortValue();
+ case BOOLEAN: // boolean
+ case INTEGER: // int
+ case INTERVAL_YEAR_MONTH: // long
+ case BIGINT: // long
+ case INTERVAL_DAY_TIME: // long
+ case FLOAT: // float
+ case DOUBLE: // double
+ return avroObject -> avroObject;
+ case DATE:
+ return AvroToRowDataConverters::convertToDate;
+ case TIME_WITHOUT_TIME_ZONE:
+ return AvroToRowDataConverters::convertToTime;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return AvroToRowDataConverters::convertToTimestamp;
+ case CHAR:
+ case VARCHAR:
+ return avroObject -> StringData.fromString(avroObject.toString());
+ case BINARY:
+ case VARBINARY:
+ return AvroToRowDataConverters::convertToBytes;
+ case DECIMAL:
+ return createDecimalConverter((DecimalType) type);
+ case ARRAY:
+ return createArrayConverter((ArrayType) type);
+ case ROW:
+ return createRowConverter((RowType) type);
+ case MAP:
+ case MULTISET:
+ return createMapConverter(type);
+ case RAW:
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+ }
+
+ private static AvroToRowDataConverter createDecimalConverter(DecimalType decimalType) {
+ final int precision = decimalType.getPrecision();
+ final int scale = decimalType.getScale();
+ return avroObject -> {
+ final byte[] bytes;
+ if (avroObject instanceof GenericFixed) {
+ bytes = ((GenericFixed) avroObject).bytes();
+ } else if (avroObject instanceof ByteBuffer) {
+ ByteBuffer byteBuffer = (ByteBuffer) avroObject;
+ bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ } else {
+ bytes = (byte[]) avroObject;
+ }
+ return DecimalData.fromUnscaledBytes(bytes, precision, scale);
+ };
+ }
+
+ private static AvroToRowDataConverter createArrayConverter(ArrayType arrayType) {
+ final AvroToRowDataConverter elementConverter =
+ createNullableConverter(arrayType.getElementType());
+ final Class<?> elementClass =
+ LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
+
+ return avroObject -> {
+ final List<?> list = (List<?>) avroObject;
+ final int length = list.size();
+ final Object[] array = (Object[]) Array.newInstance(elementClass, length);
+ for (int i = 0; i < length; ++i) {
+ array[i] = elementConverter.convert(list.get(i));
+ }
+ return new GenericArrayData(array);
+ };
+ }
+
+ private static AvroToRowDataConverter createMapConverter(LogicalType type) {
+ final AvroToRowDataConverter keyConverter =
+ createConverter(DataTypes.STRING().getLogicalType());
+ final AvroToRowDataConverter valueConverter =
+ createNullableConverter(extractValueTypeToAvroMap(type));
+
+ return avroObject -> {
+ final Map<?, ?> map = (Map<?, ?>) avroObject;
+ Map<Object, Object> result = new HashMap<>();
+ for (Map.Entry, ?> entry : map.entrySet()) {
+ Object key = keyConverter.convert(entry.getKey());
+ Object value = valueConverter.convert(entry.getValue());
+ result.put(key, value);
+ }
+ return new GenericMapData(result);
+ };
+ }
+
+ private static TimestampData convertToTimestamp(Object object) {
+ final long millis;
+ if (object instanceof Long) {
+ millis = (Long) object;
+ } else if (object instanceof Instant) {
+ millis = ((Instant) object).toEpochMilli();
+ } else {
+ JodaConverter jodaConverter = JodaConverter.getConverter();
+ if (jodaConverter != null) {
+ millis = jodaConverter.convertTimestamp(object);
+ } else {
+ throw new IllegalArgumentException(
+ "Unexpected object type for TIMESTAMP logical type. Received: " + object);
+ }
+ }
+ return TimestampData.fromEpochMillis(millis);
+ }
+
+ private static int convertToDate(Object object) {
+ if (object instanceof Integer) {
+ return (Integer) object;
+ } else if (object instanceof LocalDate) {
+ return (int) ((LocalDate) object).toEpochDay();
+ } else {
+ JodaConverter jodaConverter = JodaConverter.getConverter();
+ if (jodaConverter != null) {
+ return (int) jodaConverter.convertDate(object);
+ } else {
+ throw new IllegalArgumentException(
+ "Unexpected object type for DATE logical type. Received: " + object);
+ }
+ }
+ }
+
+ private static int convertToTime(Object object) {
+ final int millis;
+ if (object instanceof Integer) {
+ millis = (Integer) object;
+ } else if (object instanceof LocalTime) {
+ millis = ((LocalTime) object).get(ChronoField.MILLI_OF_DAY);
+ } else {
+ JodaConverter jodaConverter = JodaConverter.getConverter();
+ if (jodaConverter != null) {
+ millis = jodaConverter.convertTime(object);
+ } else {
+ throw new IllegalArgumentException(
+ "Unexpected object type for TIME logical type. Received: " + object);
+ }
+ }
+ return millis;
+ }
+
+ private static byte[] convertToBytes(Object object) {
+ if (object instanceof GenericFixed) {
+ return ((GenericFixed) object).bytes();
+ } else if (object instanceof ByteBuffer) {
+ ByteBuffer byteBuffer = (ByteBuffer) object;
+ byte[] bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ return bytes;
+ } else {
+ return (byte[]) object;
+ }
+ }
+
+ /**
+ * Encapsulates joda optional dependency. Instantiates this class only if joda is available on the
+ * classpath.
+ */
+ static class JodaConverter {
+
+ private static JodaConverter instance;
+ private static boolean instantiated = false;
+
+ public static JodaConverter getConverter() {
+ if (instantiated) {
+ return instance;
+ }
+
+ try {
+ Class.forName(
+ "org.joda.time.DateTime",
+ false,
+ Thread.currentThread().getContextClassLoader());
+ instance = new JodaConverter();
+ } catch (ClassNotFoundException e) {
+ instance = null;
+ } finally {
+ instantiated = true;
+ }
+ return instance;
+ }
+
+ public long convertDate(Object object) {
+ final org.joda.time.LocalDate value = (org.joda.time.LocalDate) object;
+ return value.toDate().getTime();
+ }
+
+ public int convertTime(Object object) {
+ final org.joda.time.LocalTime value = (org.joda.time.LocalTime) object;
+ return value.get(DateTimeFieldType.millisOfDay());
+ }
+
+ public long convertTimestamp(Object object) {
+ final DateTime value = (DateTime) object;
+ return value.toDate().getTime();
+ }
+
+ private JodaConverter() {
+ }
+ }
+}
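A hedged sketch of the converter in action; the row type and Avro record are built inline for illustration and kept non-nullable so that the generated schema is a plain record rather than a union.

    RowType rowType = new RowType(false, Arrays.asList(
        new RowType.RowField("uuid", new VarCharType(false, 20)),
        new RowType.RowField("age", new IntType(false))));
    Schema schema = AvroSchemaConverter.convertToSchema(rowType);
    GenericRecord record = new GenericData.Record(schema);
    record.put("uuid", "id1");
    record.put("age", 23);
    AvroToRowDataConverters.AvroToRowDataConverter converter =
        AvroToRowDataConverters.createRowConverter(rowType);
    RowData row = (RowData) converter.convert(record);   // row.getInt(1) == 23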
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index e56ab22bc6b9c..81a234a1df61c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -21,6 +21,9 @@
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.TablePathUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
@@ -306,4 +309,31 @@ public static String generateBucketKey(String partitionPath, String fileId) {
public static boolean isInsert(HoodieRecordLocation loc) {
return Objects.equals(loc.getInstantTime(), "I");
}
+
+ public static String getTablePath(FileSystem fs, Path[] userProvidedPaths) throws IOException {
+ LOG.info("Getting table path..");
+ for (Path path : userProvidedPaths) {
+ try {
+ Option<Path> tablePath = TablePathUtils.getTablePath(fs, path);
+ if (tablePath.isPresent()) {
+ return tablePath.get().toString();
+ }
+ } catch (HoodieException he) {
+ LOG.warn("Error trying to get table path from " + path.toString(), he);
+ }
+ }
+
+ throw new TableNotFoundException("Unable to find a hudi table for the user provided paths.");
+ }
+
+ /**
+ * Returns whether an async compaction needs to be scheduled.
+ *
+ * @param conf The flink configuration.
+ */
+ public static boolean needsScheduleCompaction(Configuration conf) {
+ return conf.getString(FlinkOptions.TABLE_TYPE)
+ .toUpperCase(Locale.ROOT)
+ .equals(HoodieTableType.MERGE_ON_READ.name())
+ && conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED);
+ }
}
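A sketch of how the new helper is meant to be consulted when assembling the write pipeline; the configuration values are set inline here purely for illustration.

    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
    if (StreamerUtil.needsScheduleCompaction(conf)) {
      // wire the compaction plan/commit operators into the job graph here
    }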
diff --git a/hudi-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/hudi-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000000000..105f64cfb2067
--- /dev/null
+++ b/hudi-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hudi.factory.HoodieTableFactory
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
index 3c8b9f6529884..6e1fe475d4d94 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
@@ -34,6 +34,7 @@
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.source.ContinuousFileSource;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.io.FilePathFilter;
@@ -111,14 +112,11 @@ public void testWriteToHoodie() throws Exception {
String sourcePath = Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader().getResource("test_source.data")).toString();
- TextInputFormat format = new TextInputFormat(new Path(sourcePath));
- format.setFilesFilter(FilePathFilter.createDefaultFilter());
- TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
- format.setCharsetName("UTF-8");
-
DataStream dataStream = execEnv
- // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
- .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
+ // use continuous file source to trigger checkpoint
+ .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
+ .name("continuous_file_source")
+ .setParallelism(1)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(4)
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
@@ -136,14 +134,8 @@ public void testWriteToHoodie() throws Exception {
execEnv.addOperator(dataStream.getTransformation());
JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
- if (client.getJobStatus().get() != JobStatus.FAILED) {
- try {
- TimeUnit.SECONDS.sleep(8);
- client.cancel();
- } catch (Throwable var1) {
- // ignored
- }
- }
+ // wait for the streaming job to finish
+ client.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
TestData.checkWrittenFullData(tempFile, EXPECTED);
}
@@ -175,14 +167,11 @@ public void testWriteToHoodieLegacy() throws Exception {
String sourcePath = Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader().getResource("test_source.data")).toString();
- TextInputFormat format = new TextInputFormat(new Path(sourcePath));
- format.setFilesFilter(FilePathFilter.createDefaultFilter());
- TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
- format.setCharsetName("UTF-8");
-
execEnv
- // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
- .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
+ // use continuous file source to trigger checkpoint
+ .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
+ .name("continuous_file_source")
+ .setParallelism(1)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(4)
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
@@ -214,14 +203,8 @@ public void testWriteToHoodieLegacy() throws Exception {
.setParallelism(1);
JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
- if (client.getJobStatus().get() != JobStatus.FAILED) {
- try {
- TimeUnit.SECONDS.sleep(8);
- client.cancel();
- } catch (Throwable var1) {
- // ignored
- }
- }
+ // wait for the streaming job to finish
+ client.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
TestData.checkWrittenFullData(tempFile, EXPECTED);
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java
index 732cc65be0582..aa8be8e4cd2e7 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java
@@ -58,7 +58,7 @@ public class StreamWriteOperatorCoordinatorTest {
@BeforeEach
public void before() throws Exception {
coordinator = new StreamWriteOperatorCoordinator(
- TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), 2);
+ TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), 2, false);
coordinator.start();
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java
index 8b0eea7d7374c..8766844894318 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java
@@ -83,7 +83,7 @@ public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws E
this.gateway = new MockOperatorEventGateway();
this.conf = conf;
// one function
- this.coordinator = new StreamWriteOperatorCoordinator(conf, 1);
+ this.coordinator = new StreamWriteOperatorCoordinator(conf, 1, false);
this.functionInitializationContext = new MockFunctionInitializationContext();
this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java
index d9e603a1459f3..c28383e3a744d 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java
@@ -20,13 +20,17 @@
import org.apache.hudi.operator.FlinkOptions;
import org.apache.hudi.streamer.FlinkStreamerConfig;
+import org.apache.hudi.utils.factory.ContinuousFileSourceFactory;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
+import java.util.Map;
import java.util.Objects;
/**
@@ -36,14 +40,56 @@ public class TestConfigurations {
private TestConfigurations() {
}
- public static final RowType ROW_TYPE = (RowType) DataTypes.ROW(
+ public static final DataType ROW_DATA_TYPE = DataTypes.ROW(
DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key
DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
DataTypes.FIELD("age", DataTypes.INT()),
DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field
DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
- .notNull()
- .getLogicalType();
+ .notNull();
+
+ public static final RowType ROW_TYPE = (RowType) ROW_DATA_TYPE.getLogicalType();
+
+ public static final TableSchema TABLE_SCHEMA = TableSchema.builder()
+ .fields(
+ ROW_TYPE.getFieldNames().toArray(new String[0]),
+ ROW_DATA_TYPE.getChildren().toArray(new DataType[0]))
+ .build();
+
+ public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) {
+ String createTable = "create table " + tableName + "(\n"
+ + " uuid varchar(20),\n"
+ + " name varchar(10),\n"
+ + " age int,\n"
+ + " ts timestamp(3),\n"
+ + " `partition` varchar(20)\n"
+ + ")\n"
+ + "PARTITIONED BY (`partition`)\n"
+ + "with (\n"
+ + " 'connector' = 'hudi'";
+ StringBuilder builder = new StringBuilder(createTable);
+ if (options.size() != 0) {
+ options.forEach((k, v) -> builder.append(",\n")
+ .append(" '").append(k).append("' = '").append(v).append("'"));
+ }
+ builder.append("\n)");
+ return builder.toString();
+ }
+
+ public static String getFileSourceDDL(String tableName) {
+ String sourcePath = Objects.requireNonNull(Thread.currentThread()
+ .getContextClassLoader().getResource("test_source.data")).toString();
+ return "create table " + tableName + "(\n"
+ + " uuid varchar(20),\n"
+ + " name varchar(10),\n"
+ + " age int,\n"
+ + " ts timestamp(3),\n"
+ + " `partition` varchar(20)\n"
+ + ") with (\n"
+ + " 'connector' = '" + ContinuousFileSourceFactory.FACTORY_ID + "',\n"
+ + " 'path' = '" + sourcePath + "'\n"
+ + ")";
+ }
public static final RowDataSerializer SERIALIZER = new RowDataSerializer(new ExecutionConfig(), ROW_TYPE);
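A hedged sketch of how the new DDL helpers are used by the IT cases further below; the option values and the tableEnv instance are assumptions for illustration.

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), "/tmp/hudi_t1");
    options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
    String ddl = TestConfigurations.getCreateHoodieTableDDL("t1", options);
    tableEnv.executeSql(ddl);                     // register the Hudi table, then query it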
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
index c8c83e909f586..9e671e61533cd 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
@@ -25,14 +25,19 @@
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.operator.FlinkOptions;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.data.writer.BinaryWriter;
import org.apache.flink.table.runtime.types.InternalSerializers;
@@ -112,27 +117,46 @@ public class TestData {
TimestampData.fromEpochMillis(1), StringData.fromString("par1"))));
}
- public static List<RowData> DATA_SET_FOUR = Arrays.asList(
- // update: advance the age by 1
- binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,
- TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
- binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34,
- TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
- binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 54,
- TimestampData.fromEpochMillis(4), StringData.fromString("par2")),
- binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 32,
- TimestampData.fromEpochMillis(5), StringData.fromString("par2")),
- // same with before
- binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
- TimestampData.fromEpochMillis(6), StringData.fromString("par3")),
- // new data
- binaryRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19,
- TimestampData.fromEpochMillis(6), StringData.fromString("par3")),
- binaryRow(StringData.fromString("id10"), StringData.fromString("Ella"), 38,
- TimestampData.fromEpochMillis(7), StringData.fromString("par4")),
- binaryRow(StringData.fromString("id11"), StringData.fromString("Phoebe"), 52,
- TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
- );
+ /**
+ * Returns string format of a list of RowData.
+ */
+ public static String rowDataToString(List<RowData> rows) {
+ DataStructureConverter<Object, Object> converter =
+ DataStructureConverters.getConverter(TestConfigurations.ROW_DATA_TYPE);
+ return rows.stream()
+ .map(row -> converter.toExternal(row).toString())
+ .sorted(Comparator.naturalOrder())
+ .collect(Collectors.toList()).toString();
+ }
+
+ /**
+ * Writes a list of row data in Hoodie format based on the given configuration.
+ *
+ * @param dataBuffer The data buffer to write
+ * @param conf The flink configuration
+ * @throws Exception if error occurs
+ */
+ public static void writeData(
+ List<RowData> dataBuffer,
+ Configuration conf) throws Exception {
+ StreamWriteFunctionWrapper<RowData> funcWrapper = new StreamWriteFunctionWrapper<>(
+ conf.getString(FlinkOptions.PATH),
+ conf);
+ funcWrapper.openFunction();
+
+ for (RowData rowData : dataBuffer) {
+ funcWrapper.invoke(rowData);
+ }
+
+ // this triggers the data write and event send
+ funcWrapper.checkpointFunction(1);
+
+ final OperatorEvent nextEvent = funcWrapper.getNextEvent();
+ funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+ funcWrapper.checkpointComplete(1);
+
+ funcWrapper.close();
+ }
/**
* Checks the source data TestConfigurations.DATA_SET_ONE are written as expected.
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java
new file mode 100644
index 0000000000000..087954564ff24
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.operator.utils.TestConfigurations;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * IT cases for the Hoodie table source and sink.
+ *
+ * <p>Note: more SQL cases should be added when batch write is supported.
+ */
+public class HoodieDataSourceITCase extends AbstractTestBase {
+ private TableEnvironment streamTableEnv;
+ private TableEnvironment batchTableEnv;
+
+ @BeforeEach
+ void beforeEach() {
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+ streamTableEnv = TableEnvironmentImpl.create(settings);
+ streamTableEnv.getConfig().getConfiguration()
+ .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ streamTableEnv.getConfig().getConfiguration()
+ .setString("execution.checkpointing.interval", "2s");
+
+ settings = EnvironmentSettings.newInstance().inBatchMode().build();
+ batchTableEnv = TableEnvironmentImpl.create(settings);
+ batchTableEnv.getConfig().getConfiguration()
+ .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ }
+
+ @TempDir
+ File tempFile;
+
+ @Test
+ void testStreamWriteBatchRead() {
+ // create filesystem table named source
+ String createSource = TestConfigurations.getFileSourceDDL("source");
+ streamTableEnv.executeSql(createSource);
+
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.READ_SCHEMA_FILE_PATH.key(),
+ Objects.requireNonNull(Thread.currentThread()
+ .getContextClassLoader().getResource("test_read_schema.avsc")).toString());
+ String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+ streamTableEnv.executeSql(hoodieTableDDL);
+ String insertInto = "insert into t1 select * from source";
+ execInsertSql(streamTableEnv, insertInto);
+
+ List<Row> rows = CollectionUtil.iterableToList(
+ () -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
+ final String expected = "[id1,Danny,23,1970-01-01T00:00:01,par1, "
+ + "id2,Stephen,33,1970-01-01T00:00:02,par1, "
+ + "id3,Julian,53,1970-01-01T00:00:03,par2, "
+ + "id4,Fabian,31,1970-01-01T00:00:04,par2, "
+ + "id5,Sophia,18,1970-01-01T00:00:05,par3, "
+ + "id6,Emma,20,1970-01-01T00:00:06,par3, "
+ + "id7,Bob,44,1970-01-01T00:00:07,par4, "
+ + "id8,Han,56,1970-01-01T00:00:08,par4]";
+ assertRowsEquals(rows, expected);
+ }
+
+ @Test
+ void testBatchWriteAndRead() {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.READ_SCHEMA_FILE_PATH.key(),
+ Objects.requireNonNull(Thread.currentThread()
+ .getContextClassLoader().getResource("test_read_schema.avsc")).toString());
+ String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+ batchTableEnv.executeSql(hoodieTableDDL);
+ String insertInto = "insert into t1 values\n"
+ + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
+ + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+ + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
+ + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
+ + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
+ + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
+ + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
+ + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
+
+ execInsertSql(batchTableEnv, insertInto);
+
+ List<Row> rows = CollectionUtil.iterableToList(
+ () -> batchTableEnv.sqlQuery("select * from t1").execute().collect());
+ final String expected = "[id1,Danny,23,1970-01-01T00:00:01,par1, "
+ + "id2,Stephen,33,1970-01-01T00:00:02,par1, "
+ + "id3,Julian,53,1970-01-01T00:00:03,par2, "
+ + "id4,Fabian,31,1970-01-01T00:00:04,par2, "
+ + "id5,Sophia,18,1970-01-01T00:00:05,par3, "
+ + "id6,Emma,20,1970-01-01T00:00:06,par3, "
+ + "id7,Bob,44,1970-01-01T00:00:07,par4, "
+ + "id8,Han,56,1970-01-01T00:00:08,par4]";
+ assertRowsEquals(rows, expected);
+ }
+
+ /**
+ * Sorts the {@code rows} using the field at index 0 and asserts
+ * that the result equals the expected string {@code expected}.
+ *
+ * @param rows Actual result rows
+ * @param expected Expected string of the sorted rows
+ */
+ private static void assertRowsEquals(List<Row> rows, String expected) {
+ String rowsString = rows.stream()
+ .sorted(Comparator.comparing(o -> o.getField(0).toString()))
+ .collect(Collectors.toList()).toString();
+ assertThat(rowsString, is(expected));
+ }
+
+ private void execInsertSql(TableEnvironment tEnv, String insert) {
+ TableResult tableResult = tEnv.executeSql(insert);
+ // wait to finish
+ try {
+ tableResult.getJobClient().get()
+ .getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
+ } catch (InterruptedException | ExecutionException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/HoodieTableSourceTest.java b/hudi-flink/src/test/java/org/apache/hudi/source/HoodieTableSourceTest.java
new file mode 100644
index 0000000000000..f7994574371e5
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/HoodieTableSourceTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.operator.utils.TestData;
+import org.apache.hudi.source.format.mor.MergeOnReadInputFormat;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Test cases for HoodieTableSource.
+ */
+public class HoodieTableSourceTest {
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieTableSourceTest.class);
+
+ private Configuration conf;
+
+ @TempDir
+ File tempFile;
+
+ @BeforeEach
+ void beforeEach() throws IOException {
+ final String path = tempFile.getAbsolutePath();
+ conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ StreamerUtil.initTableIfNotExists(conf);
+ IntStream.range(1, 5)
+ .forEach(i -> new File(path + File.separator + "par" + i).mkdirs());
+ }
+
+ @Test
+ void testGetReadPaths() {
+ HoodieTableSource tableSource = new HoodieTableSource(
+ TestConfigurations.TABLE_SCHEMA,
+ new Path(tempFile.getPath()),
+ Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")),
+ "default-par",
+ conf);
+ Path[] paths = tableSource.getReadPaths();
+ assertNotNull(paths);
+ String[] names = Arrays.stream(paths).map(Path::getName)
+ .sorted(Comparator.naturalOrder()).toArray(String[]::new);
+ assertThat(Arrays.toString(names), is("[par1, par2, par3, par4]"));
+ // apply partition pruning
+ Map<String, String> partitions = new HashMap<>();
+ partitions.put("partition", "par1");
+
+ tableSource = (HoodieTableSource) tableSource
+ .applyPartitionPruning(Collections.singletonList(partitions));
+
+ Path[] paths2 = tableSource.getReadPaths();
+ assertNotNull(paths2);
+ String[] names2 = Arrays.stream(paths2).map(Path::getName)
+ .sorted(Comparator.naturalOrder()).toArray(String[]::new);
+ assertThat(Arrays.toString(names2), is("[par1]"));
+ }
+
+ @Test
+ void testGetInputFormat() throws Exception {
+ // write some data to let the TableSchemaResolver get the right instant
+ TestData.writeData(TestData.DATA_SET_ONE, conf);
+
+ HoodieTableSource tableSource = new HoodieTableSource(
+ TestConfigurations.TABLE_SCHEMA,
+ new Path(tempFile.getPath()),
+ Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")),
+ "default-par",
+ conf);
+ InputFormat<RowData, ?> inputFormat = tableSource.getInputFormat();
+ assertThat(inputFormat, is(instanceOf(FileInputFormat.class)));
+ conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+ inputFormat = tableSource.getInputFormat();
+ assertThat(inputFormat, is(instanceOf(MergeOnReadInputFormat.class)));
+ conf.setString(FlinkOptions.QUERY_TYPE.key(), FlinkOptions.QUERY_TYPE_INCREMENTAL);
+ assertThrows(HoodieException.class,
+ () -> tableSource.getInputFormat(),
+ "Invalid query type : 'incremental'. Only 'snapshot' is supported now");
+ }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/format/InputFormatTest.java b/hudi-flink/src/test/java/org/apache/hudi/source/format/InputFormatTest.java
new file mode 100644
index 0000000000000..8bb529935c478
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/format/InputFormatTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.format;
+
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.operator.utils.TestData;
+import org.apache.hudi.source.HoodieTableSource;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test cases for MergeOnReadInputFormat and ParquetInputFormat.
+ */
+public class InputFormatTest {
+
+ private HoodieTableSource tableSource;
+ private Configuration conf;
+
+ @TempDir
+ File tempFile;
+
+ void beforeEach(String tableType) throws IOException {
+ conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.setString(FlinkOptions.TABLE_TYPE, tableType);
+ conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // close the async compaction
+
+ StreamerUtil.initTableIfNotExists(conf);
+ this.tableSource = new HoodieTableSource(
+ TestConfigurations.TABLE_SCHEMA,
+ new Path(tempFile.getAbsolutePath()),
+ Collections.singletonList("partition"),
+ "default",
+ conf);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ FlinkOptions.TABLE_TYPE_COPY_ON_WRITE,
+ FlinkOptions.TABLE_TYPE_MERGE_ON_READ})
+ void testRead(String tableType) throws Exception {
+ beforeEach(tableType);
+
+ TestData.writeData(TestData.DATA_SET_ONE, conf);
+
+ InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+
+ List<RowData> result = readData(inputFormat);
+
+ String actual = TestData.rowDataToString(result);
+ String expected = TestData.rowDataToString(TestData.DATA_SET_ONE);
+ assertThat(actual, is(expected));
+
+ // write another commit to read again
+ TestData.writeData(TestData.DATA_SET_TWO, conf);
+
+ // refresh the input format
+ this.tableSource.reloadActiveTimeline();
+ inputFormat = this.tableSource.getInputFormat();
+
+ result = readData(inputFormat);
+
+ actual = TestData.rowDataToString(result);
+ expected = "[id1,Danny,24,1970-01-01T00:00:00.001,par1, "
+ + "id10,Ella,38,1970-01-01T00:00:00.007,par4, "
+ + "id11,Phoebe,52,1970-01-01T00:00:00.008,par4, "
+ + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
+ + "id3,Julian,54,1970-01-01T00:00:00.003,par2, "
+ + "id4,Fabian,32,1970-01-01T00:00:00.004,par2, "
+ + "id5,Sophia,18,1970-01-01T00:00:00.005,par3, "
+ + "id6,Emma,20,1970-01-01T00:00:00.006,par3, "
+ + "id7,Bob,44,1970-01-01T00:00:00.007,par4, "
+ + "id8,Han,56,1970-01-01T00:00:00.008,par4, "
+ + "id9,Jane,19,1970-01-01T00:00:00.006,par3]";
+ assertThat(actual, is(expected));
+ }
+
+ @Test
+ void testReadBaseAndLogFiles() throws Exception {
+ beforeEach(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+
+ // write parquet first with compaction
+ conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+ TestData.writeData(TestData.DATA_SET_ONE, conf);
+
+ InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+
+ List<RowData> result = readData(inputFormat);
+
+ String actual = TestData.rowDataToString(result);
+ String expected = TestData.rowDataToString(TestData.DATA_SET_ONE);
+ assertThat(actual, is(expected));
+
+ // write another commit using logs and read again
+ conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+ TestData.writeData(TestData.DATA_SET_TWO, conf);
+
+ // refresh the input format
+ this.tableSource.reloadActiveTimeline();
+ inputFormat = this.tableSource.getInputFormat();
+
+ result = readData(inputFormat);
+
+ actual = TestData.rowDataToString(result);
+ expected = "[id1,Danny,24,1970-01-01T00:00:00.001,par1, "
+ + "id10,Ella,38,1970-01-01T00:00:00.007,par4, "
+ + "id11,Phoebe,52,1970-01-01T00:00:00.008,par4, "
+ + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
+ + "id3,Julian,54,1970-01-01T00:00:00.003,par2, "
+ + "id4,Fabian,32,1970-01-01T00:00:00.004,par2, "
+ + "id5,Sophia,18,1970-01-01T00:00:00.005,par3, "
+ + "id6,Emma,20,1970-01-01T00:00:00.006,par3, "
+ + "id7,Bob,44,1970-01-01T00:00:00.007,par4, "
+ + "id8,Han,56,1970-01-01T00:00:00.008,par4, "
+ + "id9,Jane,19,1970-01-01T00:00:00.006,par3]";
+ assertThat(actual, is(expected));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ FlinkOptions.TABLE_TYPE_COPY_ON_WRITE,
+ FlinkOptions.TABLE_TYPE_MERGE_ON_READ})
+ void testReadWithPartitionPrune(String tableType) throws Exception {
+ beforeEach(tableType);
+
+ TestData.writeData(TestData.DATA_SET_ONE, conf);
+
+ Map<String, String> prunedPartitions = new HashMap<>();
+ prunedPartitions.put("partition", "par1");
+ // prune to only be with partition 'par1'
+ HoodieTableSource newSource = (HoodieTableSource) tableSource
+ .applyPartitionPruning(Collections.singletonList(prunedPartitions));
+ InputFormat<RowData, ?> inputFormat = newSource.getInputFormat();
+
+ List<RowData> result = readData(inputFormat);
+
+ String actual = TestData.rowDataToString(result);
+ String expected = "[id1,Danny,23,1970-01-01T00:00:00.001,par1, id2,Stephen,33,1970-01-01T00:00:00.002,par1]";
+ assertThat(actual, is(expected));
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private static List<RowData> readData(InputFormat inputFormat) throws IOException {
+ InputSplit[] inputSplits = inputFormat.createInputSplits(1);
+
+ List<RowData> result = new ArrayList<>();
+
+ for (InputSplit inputSplit : inputSplits) {
+ inputFormat.open(inputSplit);
+ while (!inputFormat.reachedEnd()) {
+ result.add(TestConfigurations.SERIALIZER.copy((RowData) inputFormat.nextRecord(null))); // no reuse
+ }
+ inputFormat.close();
+ }
+ return result;
+ }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/ContinuousFileSourceFactory.java b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/ContinuousFileSourceFactory.java
new file mode 100644
index 0000000000000..171f82dc09962
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/ContinuousFileSourceFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils.factory;
+
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.utils.source.ContinuousFileSource;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.sources.TableSource;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Factory for ContinuousFileSource.
+ */
+public class ContinuousFileSourceFactory implements TableSourceFactory<RowData> {
+ public static final String FACTORY_ID = "continuous-file-source";
+
+ @Override
+ public TableSource<RowData> createTableSource(Context context) {
+ Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions());
+ Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
+ new ValidationException("Option [path] should be not empty.")));
+ return new ContinuousFileSource(context.getTable().getSchema(), path, conf);
+ }
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> context = new HashMap<>();
+ context.put(FactoryUtil.CONNECTOR.key(), FACTORY_ID);
+ return context;
+ }
+
+ @Override
+ public List<String> supportedProperties() {
+ return Collections.singletonList("*");
+ }
+}
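The factory is discovered through the Java SPI registration added further below; a hand-written DDL equivalent to what TestConfigurations.getFileSourceDDL presumably generates might look like the following sketch, where the column list and data file path are assumptions for illustration:

    // Hedged sketch: declare a test source table backed by ContinuousFileSourceFactory.
    String sourceDDL = "create table source(\n"
        + "  uuid varchar(20),\n"
        + "  name varchar(10),\n"
        + "  age int,\n"
        + "  ts timestamp(3),\n"
        + "  `partition` varchar(20)\n"
        + ") with (\n"
        + "  'connector' = 'continuous-file-source',\n" // ContinuousFileSourceFactory.FACTORY_ID
        + "  'path' = '/tmp/test_source.data'\n"        // hypothetical data file
        + ")";
    streamTableEnv.executeSql(sourceDDL);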
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java b/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
new file mode 100644
index 0000000000000..56f535b7077b9
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils.source;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.formats.json.TimestampFormat;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A continuous file source that can trigger checkpoints continuously.
+ *
+ * <p>It loads the data in the specified file and splits the data into the given number of checkpoint batches.
+ * Say, if you want 4 checkpoints and there are 8 records in the file, the emit strategy is:
+ *
+ * <pre>
+ * | 2 records | 2 records | 2 records | 2 records |
+ * | cp1 | cp2 | cp3 | cp4 |
+ * </pre>
+ *
+ * <p>When all the data are flushed out, it waits for the next checkpoint to finish and tears down the source.
+ */
+public class ContinuousFileSource implements StreamTableSource<RowData> {
+
+ private final TableSchema tableSchema;
+ private final Path path;
+ private final Configuration conf;
+
+ public ContinuousFileSource(
+ TableSchema tableSchema,
+ Path path,
+ Configuration conf) {
+ this.tableSchema = tableSchema;
+ this.path = path;
+ this.conf = conf;
+ }
+
+ @Override
+ public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
+ final RowType rowType = (RowType) this.tableSchema.toRowDataType().getLogicalType();
+ JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+ rowType,
+ new RowDataTypeInfo(rowType),
+ false,
+ true,
+ TimestampFormat.ISO_8601);
+
+ return execEnv.addSource(new BoundedSourceFunction(this.path, 2))
+ .name("continuous_file_source")
+ .setParallelism(1)
+ .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)),
+ new RowDataTypeInfo(rowType));
+ }
+
+ @Override
+ public TableSchema getTableSchema() {
+ return this.tableSchema;
+ }
+
+ @Override
+ public DataType getProducedDataType() {
+ return this.tableSchema.toRowDataType().bridgedTo(RowData.class);
+ }
+
+ /**
+ * Source function that partitions the data into the given number of checkpoint batches.
+ */
+ public static class BoundedSourceFunction implements SourceFunction<String>, CheckpointListener {
+ private final Path path;
+ private List<String> dataBuffer;
+
+ private final int checkpoints;
+ private final AtomicInteger currentCP = new AtomicInteger(0);
+
+ private volatile boolean isRunning = true;
+
+ public BoundedSourceFunction(Path path, int checkpoints) {
+ this.path = path;
+ this.checkpoints = checkpoints;
+ }
+
+ @Override
+ public void run(SourceContext<String> context) throws Exception {
+ if (this.dataBuffer == null) {
+ loadDataBuffer();
+ }
+ int oldCP = this.currentCP.get();
+ boolean finish = false;
+ while (isRunning) {
+ int batchSize = this.dataBuffer.size() / this.checkpoints;
+ int start = batchSize * oldCP;
+ synchronized (context.getCheckpointLock()) {
+ for (int i = start; i < start + batchSize; i++) {
+ if (i >= this.dataBuffer.size()) {
+ // all records have been emitted: wait for the next checkpoint and exit
+ finish = true;
+ break;
+ }
+ context.collect(this.dataBuffer.get(i));
+ }
+ }
+ oldCP++;
+ while (this.currentCP.get() < oldCP) {
+ synchronized (context.getCheckpointLock()) {
+ context.getCheckpointLock().wait(10);
+ }
+ }
+ if (finish || !isRunning) {
+ return;
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ this.isRunning = false;
+ }
+
+ private void loadDataBuffer() {
+ this.dataBuffer = new ArrayList<>();
+ try (BufferedReader reader =
+ new BufferedReader(new FileReader(this.path.getPath()))) {
+ String line = reader.readLine();
+ while (line != null) {
+ this.dataBuffer.add(line);
+ // read next line
+ line = reader.readLine();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Read file " + this.path + " error", e);
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long l) throws Exception {
+ this.currentCP.incrementAndGet();
+ }
+ }
+}
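A minimal sketch of wiring BoundedSourceFunction directly into a streaming job, assuming a hypothetical data file path; progress depends on checkpoints completing, matching the emit strategy described in the class comment:

    // Hedged sketch: 4 checkpoint batches over the file's records (8 records -> 2 per batch).
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(2000); // the source only advances as checkpoints complete
    DataStream<String> lines = env
        .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path("/tmp/test_source.data"), 4))
        .setParallelism(1);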
diff --git a/hudi-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/hudi-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000000000..994175038b2c8
--- /dev/null
+++ b/hudi-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hudi.factory.HoodieTableFactory
+org.apache.hudi.utils.factory.ContinuousFileSourceFactory
diff --git a/style/checkstyle-suppressions.xml b/style/checkstyle-suppressions.xml
index a0a7680db4f10..401a3eadf7a7a 100644
--- a/style/checkstyle-suppressions.xml
+++ b/style/checkstyle-suppressions.xml
@@ -28,4 +28,5 @@
+