
Conversation

@zhangjun0x01
Contributor

@zhangjun0x01 zhangjun0x01 commented May 8, 2021

This PR adds vectorized reads for Flink. I have completed the vectorized read for the ORC format; I will do the Parquet format later.

This is a sub-PR of #2534.

@stevenzwu
Contributor

@zhangjun0x01 I am very interested in this PR and in general support for a vectorized reader. While working on the FLIP-27 Flink source [1], one of the open questions was whether we should extend Flink's FileSourceSplit and BulkFormat for the vectorized reader [2]. If we are making the effort in Iceberg to support vectorized readers for ORC and Parquet, then I don't think we need to extend Flink's vectorized reader.

cc @openinx

[1] The uber PR for the FLIP-27 Iceberg source, which is broken down into smaller PRs: #2105
[2] The PR for the split reader: #2305

@zhangjun0x01 zhangjun0x01 changed the title from "Flink : Vectorize read of orc format in flink" to "Flink : vectorized read of orc format in flink" May 8, 2021

@Override
public Converter map(Types.MapType iMap, TypeDescription map, Converter key, Converter value) {
throw new UnsupportedOperationException();
Contributor Author

Flink has RowColumnVector and ArrayColumnVector, but it doesn't have MapColumnVector. We will add Map support when Flink implements MapColumnVector.

Contributor

Add a proper error message, e.g. Map vector type not supported yet; see the sketch below.
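For example, the override could look roughly like this (a sketch; the exact message wording is up to the author):

@Override
public Converter map(Types.MapType iMap, TypeDescription map, Converter key, Converter value) {
  // Flink does not provide a MapColumnVector yet, so MAP columns cannot be read vectorized.
  throw new UnsupportedOperationException("Map vector type not supported yet: " + iMap);
}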

Contributor Author

Yes, I added the message.

@zhangjun0x01
Contributor Author

@stevenzwu Thanks for your response. Could you help me review it?
@rdblue @openinx

@zhangjun0x01 zhangjun0x01 marked this pull request as draft May 8, 2021 08:57
@openinx
Member

openinx commented May 10, 2021

Thanks for the work, I will take a look today.

@zhangjun0x01 zhangjun0x01 force-pushed the flink-vectorized-read branch from cddb51d to 98a299e Compare May 10, 2021 01:09
@zhangjun0x01 zhangjun0x01 marked this pull request as ready for review May 10, 2021 01:11
@zhangjun0x01 zhangjun0x01 force-pushed the flink-vectorized-read branch 2 times, most recently from 7d57f01 to ac1756b Compare May 10, 2021 02:17
boolean enableVectorizedRead = readableConfig.get(FlinkTableOptions.ENABLE_VECTORIZED_READ);

if (enableVectorizedRead) {
if (useOrcVectorizedRead()) {
Member

There are other prerequisites before we can apply the vectorized read (a rough sketch of the check follows the list):

  1. All files from the CombinedScanTask must be data files; if there is a delete file, the current delete-apply process compares row by row, which effectively disables the vectorized read.
  2. All files from the CombinedScanTask must be ORC files.
  3. All the columns to read should be primitives, meaning every value has a fixed byte width.
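For example (a sketch; the helper name and exact checks are illustrative, not the PR's actual code):

// Sketch: vectorized ORC reads apply only when every file in the task is an ORC data file
// with no delete files attached, and all projected columns are primitive types.
private static boolean useOrcVectorizedRead(CombinedScanTask task, RowType projectedType) {
  boolean allOrcDataFilesWithoutDeletes = task.files().stream()
      .allMatch(fileTask -> fileTask.deletes().isEmpty() &&
          fileTask.file().format() == FileFormat.ORC);

  boolean allPrimitives = projectedType.getChildren().stream()
      .noneMatch(type -> type instanceof RowType ||
          type instanceof ArrayType ||
          type instanceof MapType);

  return allOrcDataFilesWithoutDeletes && allPrimitives;
}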

Contributor Author

3. All the columns to read should be primitives, meaning every value has a fixed byte width.

So we don't support the Map, Row, and Array data types?

Comment on lines 321 to 216
private static class FlinkRowColumnVector implements RowColumnVector {

  private ColumnVector[] fieldVectors;
  private StructColumnVector structVector;

  FlinkRowColumnVector(ColumnVector[] fieldVectors,
                       StructColumnVector structVector) {
    this.fieldVectors = fieldVectors;
    this.structVector = structVector;
  }

  @Override
  public ColumnarRowData getRow(int i) {
    VectorizedColumnBatch vectorizedColumnBatch = new VectorizedColumnBatch(fieldVectors);
    return new ColumnarRowData(vectorizedColumnBatch, i);
  }

  @Override
  public boolean isNullAt(int i) {
    return structVector.isNull[i];
  }

  public ColumnVector[] getFieldVectors() {
    return fieldVectors;
  }
}
Member

Should this class be an inner class of the outer VectorizedFlinkOrcReaders, rather than a static inner class of StructConverter?

}

static OrcValueReader<StringData> strings() {
public static OrcValueReader<StringData> strings() {
Contributor

Curious why only these 3 were made public. What about the other package-scope methods like times, timestamps, and array?

Contributor Author

That was my mistake; I rolled it back.

}
}

private static class LongOrcColumnVector implements org.apache.flink.table.data.vector.LongColumnVector {
Contributor

import?

Member

We cannot simply import org.apache.flink.table.data.vector.LongColumnVector because it conflicts with the already-imported org.apache.orc.storage.ql.exec.vector.LongColumnVector.

import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;

abstract class BaseDataIterator extends DataIterator<RowData> {
Contributor

What is the purpose of introducing this new class? DataIterator is already an abstract base class.

Contributor Author

At first I wanted to introduce a base iterator for RowData, but DataIterator<RowData> is indeed enough, so I rolled back the code.

return ConstantColumnVectors.bytes(constant);

default:
throw new UnsupportedOperationException("Unsupported data type for constant.");
Member

For the iceberg UUID, FIXED, and BINARY data types, we should return ConstantColumnVectors.bytes(constant). The correct mapping relationships (iceberg data types <=> flink data types <=> orc data types) are here; a sketch of the switch arm is below.
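That is, roughly (a sketch against the ConstantColumnVectors helper in this PR; the surrounding switch is abbreviated):

switch (type.typeId()) {
  // ... other primitive cases ...
  case UUID:
  case FIXED:
  case BINARY:
    // All three map to binary data, so the bytes constant vector fits.
    return ConstantColumnVectors.bytes(constant);
  default:
    throw new UnsupportedOperationException("Unsupported data type for constant: " + type);
}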

@Override
public VectorizedColumnBatch read(VectorizedRowBatch batch) {
FlinkRowColumnVector cv = (FlinkRowColumnVector) converter.convert(
new StructColumnVector(batch.size, batch.cols), batch.size, batchOffsetInFile);
Member

It looks like batchOffsetInFile is used for the metadata column to get the correct row offset for a given row, mainly for iceberg format v2.

Member

Should we add a comment to explain this? I think it will make the code easier to read; something like the sketch below.
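For example, something along these lines (just a sketch of the comment wording):

// batchOffsetInFile is the row position of the first row of this batch within the ORC file.
// It is used to fill the row-position metadata column, which iceberg format v2 needs in
// order to apply positional deletes correctly.
FlinkRowColumnVector cv = (FlinkRowColumnVector) converter.convert(
    new StructColumnVector(batch.size, batch.cols), batch.size, batchOffsetInFile);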

boolean enableVectorizedRead = readableConfig.get(FlinkTableOptions.ENABLE_VECTORIZED_READ);

if (enableVectorizedRead) {
if (useOrcVectorizedRead(split.getTask())) {
Contributor

I think this check should be moved inside BatchRowDataIterator. Here the logic should simply be:

if (enableVectorizedRead) {
  ... new BatchRowDataIterator
} else {
   ... new RowDataIterator
}

Contributor

We also don't need the separate setDefaultIterator method.

Contributor Author

I think this check should be moved inside BatchRowDataIterator. Here the logic should simply be:

if (enableVectorizedRead) {
  ... new BatchRowDataIterator
} else {
   ... new RowDataIterator
}

done

Contributor Author

We also don't need the separate setDefaultIterator method.

At first, I wanted to fall back to the non-vectorized read when the vectorized-read check failed, so I extracted this method, but later I decided it was inappropriate.

}
}

private static class ConstantLongColumnVongector implements LongColumnVector {
Member

Typo? ConstantLongColumnVongector -> ConstantLongColumnVector

Comment on lines +99 to +106
public long getLong(int i) {
return (long) constant;
}

@Override
public boolean isNullAt(int i) {
return constant == null;
}
Member

Nit: Could we keep the same method order across all the ConstantXXColumnVector classes? I see ConstantIntColumnVector puts isNullAt ahead of getInt, but ConstantLongColumnVongector uses the reverse order.


case FIXED:
case UUID:
case BINARY:
Member

I still don't think we can use the ConstantBytesColumnVector to read the BINARY & FIXED data types, because its constant object is a StringData; we will definitely hit a ClassCastException when casting the byte[] to BinaryStringData. Do we have a unit test to cover this?

Member

Be careful that the FIXED data type has three possible constant types:

  • byte[]
  • GenericData.Fixed
  • ByteBuffer

We will need to make ConstantBytesColumnVector handle all of those types! A sketch follows.
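A sketch of how the constant could be normalized once in the constructor (ByteBuffers.toByteArray is iceberg's existing util; the helper name is illustrative):

// Normalize every supported constant representation to a plain byte[] once,
// so getBytes(int) never needs a per-row cast or copy.
private static byte[] constantBytes(Object constant) {
  if (constant instanceof byte[]) {
    return (byte[]) constant;
  } else if (constant instanceof GenericData.Fixed) {
    return ((GenericData.Fixed) constant).bytes();
  } else if (constant instanceof ByteBuffer) {
    return ByteBuffers.toByteArray((ByteBuffer) constant);
  } else {
    throw new UnsupportedOperationException(
        "Unsupported constant type for FIXED/BINARY: " + constant.getClass().getName());
  }
}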

Contributor Author

@zhangjun0x01 zhangjun0x01 May 20, 2021

I still don't think we can use the ConstantBytesColumnVector to read the BINARY & FIXED data types, because its constant object is a StringData; we will definitely hit a ClassCastException when casting the byte[] to BinaryStringData. Do we have a unit test to cover this?

From the code, for the FIXED data type the constant is converted to byte[], so I added a check that the constant is a byte[] instead of throwing a cast exception, and I added a unit test for the byte[] data type.

@Override
public Bytes getBytes(int i) {
BinaryStringData str = (BinaryStringData) constant;
return new Bytes(str.toBytes(), 0, str.getSizeInBytes());
Member

str.toBytes() creates a new byte array every time; for a constant value it is not worth allocating a new byte[] on each call. I suggest allocating the data bytes in the constructor and always returning the same instance, as sketched below.
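Something like this, as a sketch (assuming the constant arrives here as a BinaryStringData):

private final Bytes bytes;

ConstantBytesColumnVector(BinaryStringData constant) {
  // Encode the constant once; every row returns the same Bytes instance.
  byte[] data = constant.toBytes();
  this.bytes = new Bytes(data, 0, data.length);
}

@Override
public Bytes getBytes(int i) {
  return bytes;
}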

Contributor Author

I read the code of str.toBytes() and found that it is fairly complicated and does a lot of work. I have not found a suitable way to map different BinaryStringData values to the same byte[].

return (vector, batchSize, batchOffsetInFile) -> {
if (vector instanceof LongColumnVector) {
return new OrcNoHiveLongVector((LongColumnVector) vector);
} else if (vector instanceof DoubleColumnVector) {
Member

Flink's OrcNoHiveDoubleVector has a strange design; it sounds like it can read both the float & double data types. The name confused me a lot.

Contributor Author

Yes, OrcNoHiveLongVector can handle long, int, short, etc.

return new OrcNoHiveTimestampVector((TimestampColumnVector) vector);
} else {
throw new UnsupportedOperationException(
"Unsupported vector: " + vector.getClass().getName());
Member

Please add the iceberg iPrimitive data type & the ORC primitive data type to this error message, e.g. as sketched below.
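For example (a sketch; iPrimitive and primitive are assumed to be the visitor parameters in scope at this point):

throw new UnsupportedOperationException(String.format(
    "Unsupported vector: %s, iceberg type: %s, ORC type: %s",
    vector.getClass().getName(), iPrimitive, primitive));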

StructColumnVector structVector) {
this.fieldVectors = fieldVectors;
this.structVector = structVector;
vectorizedColumnBatch = new VectorizedColumnBatch(fieldVectors);
Member

Nit: It would be good to align the code style with the lines above: this.vectorizedColumnBatch = new VectorizedColumnBatch(fieldVectors).

import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestVectorizedReads extends FlinkCatalogTestBase {
Member

Is it possible to add a parameterized variable named vectorized to the existing flink + ORC test cases, so that we can cover all the existing cases in the test suites? Let's follow Spark's test approach in this PR if possible; a rough sketch is below.
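A rough sketch of the parameter matrix (assuming FlinkCatalogTestBase exposes its catalog parameters; the shape of those parameters is illustrative):

@Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, vectorized={2}")
public static Iterable<Object[]> parameters() {
  List<Object[]> parameters = Lists.newArrayList();
  // Run every existing flink + ORC case both with and without vectorized reads.
  for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) {
    for (boolean vectorized : new boolean[] {false, true}) {
      parameters.add(new Object[] {catalogParams[0], catalogParams[1], vectorized});
    }
  }
  return parameters;
}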

Member

We may need to add our own Flink MapColumnVector to the apache iceberg repo now so that we can pass all test cases after introducing the vectorized parameterized variable. (We already have many test suites that use MAP data types; if the flink vectorized reader doesn't support the MAP data type, we would have to exclude those cases one by one, which would be very tedious.)

Contributor Author

I think we should add MapColumnVector in Flink instead of in iceberg, so I opened an issue in Flink: FLINK-22591. If we add MapColumnVector in iceberg and Flink also adds MapColumnVector in the future, and the implementations differ, how would we resolve that conflict?

@zhangjun0x01 zhangjun0x01 force-pushed the flink-vectorized-read branch from b6ecd3e to c3c52a1 Compare May 20, 2021 06:44
.withDescription("Sets max infer parallelism for source operator.");

public static final ConfigOption<Boolean> ENABLE_VECTORIZED_READ =
ConfigOptions.key("enable.vectorized.read")
Contributor

It seems that Iceberg uses a hyphen as the config separator. Maybe enable-vectorized-read? For example, as sketched below.
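For example (the default value and description here are just placeholders):

public static final ConfigOption<Boolean> ENABLE_VECTORIZED_READ =
    ConfigOptions.key("enable-vectorized-read")
        .booleanType()
        .defaultValue(false)
        .withDescription("Enables vectorized reads for ORC data files.");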

Contributor Author

I updated it, but reading the fields in TableProperties, both - and . are used.

this.projectedSchema = projectedSchema;
this.nameMapping = nameMapping;
this.caseSensitive = caseSensitive;
this.dataTypes = dataTypes;
Contributor

I have a few questions about passing in a DataType[] here to check whether it contains any unsupported types:

  1. I believe the array only covers top-level columns. What if a nested field has an unsupported type?
  2. Since the DataType[] is only extracted from the table or projected schema in FlinkSource, should such validation be done once in the constructor of FlinkInputFormat?
  3. Will vectorized read eventually support all valid types so that we don't need this check in the future?

Contributor Author

  1. I believe the array only covers top-level columns. What if a nested field has an unsupported type?

Some nested types are supported and some are not. I added a test case:
https://github.com/zhangjun0x01/iceberg/blob/flink-vectorized-read/flink/src/test/java/org/apache/iceberg/flink/data/vectorized/TestVectorizedReads.java#L224

Contributor Author

3. Will vectorized read eventually support all valid types so that we don't need this check in the future?

I copied the code from Flink; I don't think Flink supports all types for vectorized read yet.

Contributor Author

2. Since the DataType[] is only extracted from the table or projected schema in FlinkSource, should such validation be done once in the constructor of FlinkInputFormat?

I am a little confused; which validation are you referring to?

Contributor

I was referring to the useOrcVectorizedRead method on the DataType[], which eventually calls isVectorizationUnsupported for each field to validate whether it is supported. The validation seems to be applied only to top-level fields; a recursive sketch is below.

Also a nit on the naming of isVectorizationUnsupported: maybe rename it to isVectorizationSupported?
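A recursive sketch (isSupportedType stands in for whatever the existing per-type check does today):

private static boolean isVectorizationSupported(LogicalType type) {
  if (!isSupportedType(type)) {
    return false;
  }
  // Recurse into nested row / array / map element types instead of stopping at the top level.
  return type.getChildren().stream()
      .allMatch(child -> isVectorizationSupported(child));
}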

Contributor Author

I think verifying all nested types, including multi-level nesting, might be too complicated. I read the code of VectorizedSparkOrcReaders and it also does not do type verification, so I wonder whether we need to do type verification in flink at all.

@github-actions

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Jul 17, 2024
@github-actions

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Jul 24, 2024