diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java
index c615283c7c5ad..3cb491cfaf575 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java
@@ -18,11 +18,10 @@
package org.apache.hudi.table.format.cow;
-import org.apache.hudi.table.format.cow.data.ColumnarRowData;
-import org.apache.hudi.table.format.cow.vector.VectorizedColumnBatch;
-
import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.ColumnarRowData;
import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.VectorizedColumnBatch;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 12d63aa974a5d..ca1408dcb7a5c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -22,7 +22,6 @@
import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
-import org.apache.hudi.table.format.cow.vector.VectorizedColumnBatch;
import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.RowColumnReader;
@@ -41,6 +40,7 @@
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.VectorizedColumnBatch;
import org.apache.flink.table.data.vector.heap.HeapBooleanVector;
import org.apache.flink.table.data.vector.heap.HeapByteVector;
import org.apache.flink.table.data.vector.heap.HeapBytesVector;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarArrayData.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarArrayData.java
deleted file mode 100644
index a16a4dd8d0142..0000000000000
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarArrayData.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow.data;
-
-import org.apache.hudi.table.format.cow.vector.MapColumnVector;
-
-import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.data.RawValueData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.data.binary.TypedSetters;
-import org.apache.flink.table.data.vector.ArrayColumnVector;
-import org.apache.flink.table.data.vector.BooleanColumnVector;
-import org.apache.flink.table.data.vector.ByteColumnVector;
-import org.apache.flink.table.data.vector.BytesColumnVector;
-import org.apache.flink.table.data.vector.ColumnVector;
-import org.apache.flink.table.data.vector.DecimalColumnVector;
-import org.apache.flink.table.data.vector.DoubleColumnVector;
-import org.apache.flink.table.data.vector.FloatColumnVector;
-import org.apache.flink.table.data.vector.IntColumnVector;
-import org.apache.flink.table.data.vector.LongColumnVector;
-import org.apache.flink.table.data.vector.RowColumnVector;
-import org.apache.flink.table.data.vector.ShortColumnVector;
-import org.apache.flink.table.data.vector.TimestampColumnVector;
-
-import java.util.Arrays;
-
-/**
- * Columnar array to support access to vector column data.
- *
- *
References {@code org.apache.flink.table.data.ColumnarArrayData} to include FLINK-15390.
- */
-public final class ColumnarArrayData implements ArrayData, TypedSetters {
-
- private final ColumnVector data;
- private final int offset;
- private final int numElements;
-
- public ColumnarArrayData(ColumnVector data, int offset, int numElements) {
- this.data = data;
- this.offset = offset;
- this.numElements = numElements;
- }
-
- @Override
- public int size() {
- return numElements;
- }
-
- @Override
- public boolean isNullAt(int pos) {
- return data.isNullAt(offset + pos);
- }
-
- @Override
- public void setNullAt(int pos) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public boolean getBoolean(int pos) {
- return ((BooleanColumnVector) data).getBoolean(offset + pos);
- }
-
- @Override
- public byte getByte(int pos) {
- return ((ByteColumnVector) data).getByte(offset + pos);
- }
-
- @Override
- public short getShort(int pos) {
- return ((ShortColumnVector) data).getShort(offset + pos);
- }
-
- @Override
- public int getInt(int pos) {
- return ((IntColumnVector) data).getInt(offset + pos);
- }
-
- @Override
- public long getLong(int pos) {
- return ((LongColumnVector) data).getLong(offset + pos);
- }
-
- @Override
- public float getFloat(int pos) {
- return ((FloatColumnVector) data).getFloat(offset + pos);
- }
-
- @Override
- public double getDouble(int pos) {
- return ((DoubleColumnVector) data).getDouble(offset + pos);
- }
-
- @Override
- public StringData getString(int pos) {
- BytesColumnVector.Bytes byteArray = getByteArray(pos);
- return StringData.fromBytes(byteArray.data, byteArray.offset, byteArray.len);
- }
-
- @Override
- public DecimalData getDecimal(int pos, int precision, int scale) {
- return ((DecimalColumnVector) data).getDecimal(offset + pos, precision, scale);
- }
-
- @Override
- public TimestampData getTimestamp(int pos, int precision) {
- return ((TimestampColumnVector) data).getTimestamp(offset + pos, precision);
- }
-
- @Override
- public RawValueData getRawValue(int pos) {
- throw new UnsupportedOperationException("RawValueData is not supported.");
- }
-
- @Override
- public byte[] getBinary(int pos) {
- BytesColumnVector.Bytes byteArray = getByteArray(pos);
- if (byteArray.len == byteArray.data.length) {
- return byteArray.data;
- } else {
- return Arrays.copyOfRange(byteArray.data, byteArray.offset, byteArray.len);
- }
- }
-
- @Override
- public ArrayData getArray(int pos) {
- return ((ArrayColumnVector) data).getArray(offset + pos);
- }
-
- @Override
- public MapData getMap(int pos) {
- return ((MapColumnVector) data).getMap(offset + pos);
- }
-
- @Override
- public RowData getRow(int pos, int numFields) {
- return ((RowColumnVector) data).getRow(offset + pos);
- }
-
- @Override
- public void setBoolean(int pos, boolean value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setByte(int pos, byte value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setShort(int pos, short value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setInt(int pos, int value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setLong(int pos, long value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setFloat(int pos, float value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setDouble(int pos, double value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setDecimal(int pos, DecimalData value, int precision) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setTimestamp(int pos, TimestampData value, int precision) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public boolean[] toBooleanArray() {
- boolean[] res = new boolean[numElements];
- for (int i = 0; i < numElements; i++) {
- res[i] = getBoolean(i);
- }
- return res;
- }
-
- @Override
- public byte[] toByteArray() {
- byte[] res = new byte[numElements];
- for (int i = 0; i < numElements; i++) {
- res[i] = getByte(i);
- }
- return res;
- }
-
- @Override
- public short[] toShortArray() {
- short[] res = new short[numElements];
- for (int i = 0; i < numElements; i++) {
- res[i] = getShort(i);
- }
- return res;
- }
-
- @Override
- public int[] toIntArray() {
- int[] res = new int[numElements];
- for (int i = 0; i < numElements; i++) {
- res[i] = getInt(i);
- }
- return res;
- }
-
- @Override
- public long[] toLongArray() {
- long[] res = new long[numElements];
- for (int i = 0; i < numElements; i++) {
- res[i] = getLong(i);
- }
- return res;
- }
-
- @Override
- public float[] toFloatArray() {
- float[] res = new float[numElements];
- for (int i = 0; i < numElements; i++) {
- res[i] = getFloat(i);
- }
- return res;
- }
-
- @Override
- public double[] toDoubleArray() {
- double[] res = new double[numElements];
- for (int i = 0; i < numElements; i++) {
- res[i] = getDouble(i);
- }
- return res;
- }
-
- private BytesColumnVector.Bytes getByteArray(int pos) {
- return ((BytesColumnVector) data).getBytes(offset + pos);
- }
-}
-
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarMapData.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarMapData.java
deleted file mode 100644
index 9792e87ec9365..0000000000000
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarMapData.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow.data;
-
-import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.data.ColumnarArrayData;
-import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.data.vector.ColumnVector;
-
-/**
- * Columnar map to support access to vector column data.
- *
- * Referenced from flink 1.14.0 {@code org.apache.flink.table.data.ColumnarMapData}.
- */
-public final class ColumnarMapData implements MapData {
-
- private final ColumnVector keyColumnVector;
- private final ColumnVector valueColumnVector;
- private final int offset;
- private final int size;
-
- public ColumnarMapData(
- ColumnVector keyColumnVector,
- ColumnVector valueColumnVector,
- int offset,
- int size) {
- this.keyColumnVector = keyColumnVector;
- this.valueColumnVector = valueColumnVector;
- this.offset = offset;
- this.size = size;
- }
-
- @Override
- public int size() {
- return size;
- }
-
- @Override
- public ArrayData keyArray() {
- return new ColumnarArrayData(keyColumnVector, offset, size);
- }
-
- @Override
- public ArrayData valueArray() {
- return new ColumnarArrayData(valueColumnVector, offset, size);
- }
-
- @Override
- public boolean equals(Object o) {
- throw new UnsupportedOperationException(
- "ColumnarMapData do not support equals, please compare fields one by one!");
- }
-
- @Override
- public int hashCode() {
- throw new UnsupportedOperationException(
- "ColumnarMapData do not support hashCode, please hash fields one by one!");
- }
-}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarRowData.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarRowData.java
deleted file mode 100644
index ebb4ca26fa87d..0000000000000
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarRowData.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow.data;
-
-import org.apache.hudi.table.format.cow.vector.VectorizedColumnBatch;
-
-import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.data.RawValueData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.data.binary.TypedSetters;
-import org.apache.flink.table.data.vector.BytesColumnVector;
-import org.apache.flink.types.RowKind;
-
-/**
- * Columnar row to support access to vector column data.
- * It is a row view in {@link VectorizedColumnBatch}.
- *
- *
References {@code org.apache.flink.table.data.ColumnarRowData} to include FLINK-15390.
- */
-public final class ColumnarRowData implements RowData, TypedSetters {
-
- private RowKind rowKind = RowKind.INSERT;
- private VectorizedColumnBatch vectorizedColumnBatch;
- private int rowId;
-
- public ColumnarRowData() {
- }
-
- public ColumnarRowData(VectorizedColumnBatch vectorizedColumnBatch) {
- this(vectorizedColumnBatch, 0);
- }
-
- public ColumnarRowData(VectorizedColumnBatch vectorizedColumnBatch, int rowId) {
- this.vectorizedColumnBatch = vectorizedColumnBatch;
- this.rowId = rowId;
- }
-
- public void setVectorizedColumnBatch(VectorizedColumnBatch vectorizedColumnBatch) {
- this.vectorizedColumnBatch = vectorizedColumnBatch;
- this.rowId = 0;
- }
-
- public void setRowId(int rowId) {
- this.rowId = rowId;
- }
-
- @Override
- public RowKind getRowKind() {
- return rowKind;
- }
-
- @Override
- public void setRowKind(RowKind kind) {
- this.rowKind = kind;
- }
-
- @Override
- public int getArity() {
- return vectorizedColumnBatch.getArity();
- }
-
- @Override
- public boolean isNullAt(int pos) {
- return vectorizedColumnBatch.isNullAt(rowId, pos);
- }
-
- @Override
- public boolean getBoolean(int pos) {
- return vectorizedColumnBatch.getBoolean(rowId, pos);
- }
-
- @Override
- public byte getByte(int pos) {
- return vectorizedColumnBatch.getByte(rowId, pos);
- }
-
- @Override
- public short getShort(int pos) {
- return vectorizedColumnBatch.getShort(rowId, pos);
- }
-
- @Override
- public int getInt(int pos) {
- return vectorizedColumnBatch.getInt(rowId, pos);
- }
-
- @Override
- public long getLong(int pos) {
- return vectorizedColumnBatch.getLong(rowId, pos);
- }
-
- @Override
- public float getFloat(int pos) {
- return vectorizedColumnBatch.getFloat(rowId, pos);
- }
-
- @Override
- public double getDouble(int pos) {
- return vectorizedColumnBatch.getDouble(rowId, pos);
- }
-
- @Override
- public StringData getString(int pos) {
- BytesColumnVector.Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, pos);
- return StringData.fromBytes(byteArray.data, byteArray.offset, byteArray.len);
- }
-
- @Override
- public DecimalData getDecimal(int pos, int precision, int scale) {
- return vectorizedColumnBatch.getDecimal(rowId, pos, precision, scale);
- }
-
- @Override
- public TimestampData getTimestamp(int pos, int precision) {
- return vectorizedColumnBatch.getTimestamp(rowId, pos, precision);
- }
-
- @Override
- public RawValueData getRawValue(int pos) {
- throw new UnsupportedOperationException("RawValueData is not supported.");
- }
-
- @Override
- public byte[] getBinary(int pos) {
- BytesColumnVector.Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, pos);
- if (byteArray.len == byteArray.data.length) {
- return byteArray.data;
- } else {
- byte[] ret = new byte[byteArray.len];
- System.arraycopy(byteArray.data, byteArray.offset, ret, 0, byteArray.len);
- return ret;
- }
- }
-
- @Override
- public RowData getRow(int pos, int numFields) {
- return vectorizedColumnBatch.getRow(rowId, pos);
- }
-
- @Override
- public ArrayData getArray(int pos) {
- return vectorizedColumnBatch.getArray(rowId, pos);
- }
-
- @Override
- public MapData getMap(int pos) {
- return vectorizedColumnBatch.getMap(rowId, pos);
- }
-
- @Override
- public void setNullAt(int pos) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setBoolean(int pos, boolean value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setByte(int pos, byte value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setShort(int pos, short value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setInt(int pos, int value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setLong(int pos, long value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setFloat(int pos, float value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setDouble(int pos, double value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setDecimal(int pos, DecimalData value, int precision) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setTimestamp(int pos, TimestampData value, int precision) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public boolean equals(Object o) {
- throw new UnsupportedOperationException(
- "ColumnarRowData do not support equals, please compare fields one by one!");
- }
-
- @Override
- public int hashCode() {
- throw new UnsupportedOperationException(
- "ColumnarRowData do not support hashCode, please hash fields one by one!");
- }
-}
-
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
index f4c15b6a9b366..edd90714c87a7 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
@@ -18,9 +18,8 @@
package org.apache.hudi.table.format.cow.vector;
-import org.apache.hudi.table.format.cow.data.ColumnarArrayData;
-
import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.ColumnarArrayData;
import org.apache.flink.table.data.vector.ArrayColumnVector;
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
index f05a2e73431d0..2b34a02f116b3 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
@@ -18,10 +18,10 @@
package org.apache.hudi.table.format.cow.vector;
-import org.apache.hudi.table.format.cow.data.ColumnarMapData;
-
+import org.apache.flink.table.data.ColumnarMapData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.MapColumnVector;
import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
index ad05a612c7bde..0193e6cbb1d22 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
@@ -18,8 +18,9 @@
package org.apache.hudi.table.format.cow.vector;
-import org.apache.hudi.table.format.cow.data.ColumnarRowData;
-
+import org.apache.flink.table.data.ColumnarRowData;
+import org.apache.flink.table.data.vector.RowColumnVector;
+import org.apache.flink.table.data.vector.VectorizedColumnBatch;
import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/MapColumnVector.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/MapColumnVector.java
deleted file mode 100644
index 38424dad7d3a7..0000000000000
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/MapColumnVector.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow.vector;
-
-import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.data.vector.ColumnVector;
-
-/**
- * Map column vector.
- */
-public interface MapColumnVector extends ColumnVector {
- MapData getMap(int i);
-}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/RowColumnVector.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/RowColumnVector.java
deleted file mode 100644
index 293af7b9cf2eb..0000000000000
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/RowColumnVector.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow.vector;
-
-import org.apache.hudi.table.format.cow.data.ColumnarRowData;
-
-import org.apache.flink.table.data.vector.ColumnVector;
-
-/**
- * Row column vector.
- */
-public interface RowColumnVector extends ColumnVector {
- ColumnarRowData getRow(int i);
-}
\ No newline at end of file
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/VectorizedColumnBatch.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/VectorizedColumnBatch.java
deleted file mode 100644
index 9eee55d1eeae6..0000000000000
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/VectorizedColumnBatch.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow.vector;
-
-import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.data.vector.ArrayColumnVector;
-import org.apache.flink.table.data.vector.BooleanColumnVector;
-import org.apache.flink.table.data.vector.ByteColumnVector;
-import org.apache.flink.table.data.vector.BytesColumnVector;
-import org.apache.flink.table.data.vector.ColumnVector;
-import org.apache.flink.table.data.vector.DecimalColumnVector;
-import org.apache.flink.table.data.vector.DoubleColumnVector;
-import org.apache.flink.table.data.vector.FloatColumnVector;
-import org.apache.flink.table.data.vector.IntColumnVector;
-import org.apache.flink.table.data.vector.LongColumnVector;
-import org.apache.flink.table.data.vector.ShortColumnVector;
-import org.apache.flink.table.data.vector.TimestampColumnVector;
-
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-
-/**
- * A VectorizedColumnBatch is a set of rows, organized with each column as a vector. It is the unit
- * of query execution, organized to minimize the cost per row.
- *
- * {@code VectorizedColumnBatch}s are influenced by Apache Hive VectorizedRowBatch.
- *
- *
References {@code org.apache.flink.table.data.vector.VectorizedColumnBatch} to include FLINK-15390.
- */
-public class VectorizedColumnBatch implements Serializable {
- private static final long serialVersionUID = 1L;
-
- /**
- * This number is carefully chosen to minimize overhead and typically allows one
- * VectorizedColumnBatch to fit in cache.
- */
- public static final int DEFAULT_SIZE = 2048;
-
- private int numRows;
- public final ColumnVector[] columns;
-
- public VectorizedColumnBatch(ColumnVector[] vectors) {
- this.columns = vectors;
- }
-
- public void setNumRows(int numRows) {
- this.numRows = numRows;
- }
-
- public int getNumRows() {
- return numRows;
- }
-
- public int getArity() {
- return columns.length;
- }
-
- public boolean isNullAt(int rowId, int colId) {
- return columns[colId].isNullAt(rowId);
- }
-
- public boolean getBoolean(int rowId, int colId) {
- return ((BooleanColumnVector) columns[colId]).getBoolean(rowId);
- }
-
- public byte getByte(int rowId, int colId) {
- return ((ByteColumnVector) columns[colId]).getByte(rowId);
- }
-
- public short getShort(int rowId, int colId) {
- return ((ShortColumnVector) columns[colId]).getShort(rowId);
- }
-
- public int getInt(int rowId, int colId) {
- return ((IntColumnVector) columns[colId]).getInt(rowId);
- }
-
- public long getLong(int rowId, int colId) {
- return ((LongColumnVector) columns[colId]).getLong(rowId);
- }
-
- public float getFloat(int rowId, int colId) {
- return ((FloatColumnVector) columns[colId]).getFloat(rowId);
- }
-
- public double getDouble(int rowId, int colId) {
- return ((DoubleColumnVector) columns[colId]).getDouble(rowId);
- }
-
- public BytesColumnVector.Bytes getByteArray(int rowId, int colId) {
- return ((BytesColumnVector) columns[colId]).getBytes(rowId);
- }
-
- private byte[] getBytes(int rowId, int colId) {
- BytesColumnVector.Bytes byteArray = getByteArray(rowId, colId);
- if (byteArray.len == byteArray.data.length) {
- return byteArray.data;
- } else {
- return byteArray.getBytes();
- }
- }
-
- public String getString(int rowId, int colId) {
- BytesColumnVector.Bytes byteArray = getByteArray(rowId, colId);
- return new String(byteArray.data, byteArray.offset, byteArray.len, StandardCharsets.UTF_8);
- }
-
- public DecimalData getDecimal(int rowId, int colId, int precision, int scale) {
- return ((DecimalColumnVector) (columns[colId])).getDecimal(rowId, precision, scale);
- }
-
- public TimestampData getTimestamp(int rowId, int colId, int precision) {
- return ((TimestampColumnVector) (columns[colId])).getTimestamp(rowId, precision);
- }
-
- public ArrayData getArray(int rowId, int colId) {
- return ((ArrayColumnVector) columns[colId]).getArray(rowId);
- }
-
- public RowData getRow(int rowId, int colId) {
- return ((RowColumnVector) columns[colId]).getRow(rowId);
- }
-
- public MapData getMap(int rowId, int colId) {
- return ((MapColumnVector) columns[colId]).getMap(rowId);
- }
-}
-
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 5734e2257b447..43116c2ff4d4d 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -1171,6 +1171,7 @@ void testParquetComplexNestedRowTypes(String operation) {
String hoodieTableDDL = sql("t1")
.field("f_int int")
.field("f_array array")
+ .field("int_array array")
.field("f_map map")
.field("f_row row(f_nested_array array, f_nested_row row(f_row_f0 int, f_row_f1 varchar(10)))")
.pkField("f_int")
@@ -1185,9 +1186,9 @@ void testParquetComplexNestedRowTypes(String operation) {
List result = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
final String expected = "["
- + "+I[1, [abc1, def1], {abc1=1, def1=3}, +I[[abc1, def1], +I[1, abc1]]], "
- + "+I[2, [abc2, def2], {def2=3, abc2=1}, +I[[abc2, def2], +I[2, abc2]]], "
- + "+I[3, [abc3, def3], {def3=3, abc3=1}, +I[[abc3, def3], +I[3, abc3]]]]";
+ + "+I[1, [abc1, def1], [1, 1], {abc1=1, def1=3}, +I[[abc1, def1], +I[1, abc1]]], "
+ + "+I[2, [abc2, def2], [2, 2], {def2=3, abc2=1}, +I[[abc2, def2], +I[2, abc2]]], "
+ + "+I[3, [abc3, def3], [3, 3], {def3=3, abc3=1}, +I[[abc3, def3], +I[3, abc3]]]]";
assertRowsEquals(result, expected);
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
index 595d142b7cc0d..1695e4e7149a9 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
@@ -58,7 +58,7 @@ private TestSQL() {
+ "(3, array['abc3', 'def3'], map['abc3', 1, 'def3', 3], row(3, 'abc3'))";
public static final String COMPLEX_NESTED_ROW_TYPE_INSERT_T1 = "insert into t1 values\n"
- + "(1, array['abc1', 'def1'], map['abc1', 1, 'def1', 3], row(array['abc1', 'def1'], row(1, 'abc1'))),\n"
- + "(2, array['abc2', 'def2'], map['abc2', 1, 'def2', 3], row(array['abc2', 'def2'], row(2, 'abc2'))),\n"
- + "(3, array['abc3', 'def3'], map['abc3', 1, 'def3', 3], row(array['abc3', 'def3'], row(3, 'abc3')))";
+ + "(1, array['abc1', 'def1'], array[1, 1], map['abc1', 1, 'def1', 3], row(array['abc1', 'def1'], row(1, 'abc1'))),\n"
+ + "(2, array['abc2', 'def2'], array[2, 2], map['abc2', 1, 'def2', 3], row(array['abc2', 'def2'], row(2, 'abc2'))),\n"
+ + "(3, array['abc3', 'def3'], array[3, 3], map['abc3', 1, 'def3', 3], row(array['abc3', 'def3'], row(3, 'abc3')))";
}