
Conversation

@openinx (Member) commented Jul 2, 2020

The unit tests pass now, but it seems the build did not report back to this issue (https://travis-ci.org/github/apache/iceberg/builds/704218624). Pinging @rdblue for review. Thanks.

@rdblue requested a review from rdsr on July 3, 2020 at 16:27

@rdblue (Contributor) commented Jul 3, 2020

@shardulm94 and @rdsr, could you help review this?

@rdsr (Contributor) commented Jul 3, 2020

I'll have a look this week, thanks!

@rdblue added this to the Flink Sink milestone on Jul 7, 2020

@openinx (Member, Author) commented Jul 8, 2020

@rdsr, please take a look when you have time. Thanks.

import org.apache.iceberg.types.Types;
import org.apache.orc.TypeDescription;

public abstract class BaseOrcReader<T> implements OrcRowReader<T> {

@rdsr (Contributor), Jul 8, 2020:

It seems that we are extending GenericOrcReader so it can also be used in Flink. Won't that create problems? For example, GenericOrcReader is used to construct a readerFunc for IcebergGenerics, and it uses LocalTime for Iceberg's time type. Won't Flink have its own in-memory representation for primitive types, and maybe also for map and list types?

I think it would be better to have a completely separate FlinkOrcReader that does not rely on GenericOrcReader, similar to SparkOrcReader. That way, changes to GenericOrcReader won't break FlinkOrcReader, and there is no tight coupling between the two.

@rdblue, @openinx, thoughts?

@openinx (Member, Author):

In the current stable Flink version, Flink uses the Row type, which wraps an array of Java objects; that is the most common representation in Flink today. In the future it will use the RowData interface, whose implementations can be binary-oriented or Java-object-oriented; I think at that point we could separate out a FlinkOrcReader (issue: https://issues.apache.org/jira/browse/FLINK-16995).

Contributor:

What about Flink's primitive types? Do they align well with Iceberg generics?

Contributor:

I think I agree with @rdsr: the main concern is flexibility for changes to GenericOrcReader, and the trade-off is code reusability. If we are confident that the generic readers are fairly stable, it should not be a huge issue, but the concern about coupling these readers seems valid. I wonder whether a delegation approach, instead of inheritance, could avoid the tight coupling; see the sketch below.
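
A minimal sketch of the delegation idea, assuming per-column value readers in the style of Iceberg's OrcValueReader; the constructor, the read signature, and the Flink Row usage here are illustrative assumptions, not this PR's code:

```java
import org.apache.flink.types.Row;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
// OrcRowReader and OrcValueReader are assumed to be the Iceberg ORC
// interfaces referenced elsewhere in this PR.

// The Flink reader owns its per-column readers instead of extending
// GenericOrcReader, so generic-reader changes cannot leak into Flink.
public class FlinkOrcReader implements OrcRowReader<Row> {
  private final OrcValueReader<?>[] columnReaders;

  FlinkOrcReader(OrcValueReader<?>[] columnReaders) {
    this.columnReaders = columnReaders;
  }

  @Override
  public Row read(VectorizedRowBatch batch, int row) {
    Row result = new Row(columnReaders.length);
    for (int i = 0; i < columnReaders.length; i += 1) {
      // Each delegate decodes one column vector into a Java value.
      result.setField(i, columnReaders[i].read(batch.cols[i], row));
    }
    return result;
  }
}
```

The shared per-type readers could still be reused by both implementations; only the wiring differs.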

Contributor:

> What about Flink's primitive types? Do they align well with Iceberg generics?

My understanding is that Flink does (or can) use the same representations, except for structs. It would be good to have a response from @openinx or @JingsongLi, though. From looking at the Flink code, not all of the default conversions are these types: VarBinary uses byte[] instead of ByteBuffer, and LocalZonedTimestampType uses Instant (though the Javadoc says its behavior is like the OffsetDateTime that we use). That said, it looks like Flink might support multiple conversions.

Depending on what Flink uses internally, @rdsr might be right about building a set of readers specific to those types. But if we can make this more generic easily, then I like the idea of doing that. Ideally, I think new object models would be created by providing a few methods to create and read into an object, kind of like our methods to plug in struct types.
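
A rough sketch of that plug-in idea, under the assumption that an engine only needs to supply struct creation and field assignment while reusing the shared per-type readers; the interface name and methods below are hypothetical:

```java
// Hypothetical hooks an engine implements to adapt the shared readers
// to its own row type; everything else stays generic.
public interface StructModel<S> {
  // Allocate the engine's struct/row with the given number of fields.
  S create(int numFields);

  // Store one decoded column value at the given position.
  void set(S struct, int pos, Object value);
}
```

For example, a Flink implementation would return new Row(numFields) from create and call Row.setField in set, while the generics implementation would build an Iceberg Record.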

@rdsr (Contributor), Jul 11, 2020:

Yeah, I also think GenericOrcReader is a pretty small wrapper; the bulk of the functionality is provided by the readers/functions for specific types defined in GenericOrcReaders. In that regard, extending GenericOrcReader doesn't buy us much. We can easily share code by picking the right readers/functions from GenericOrcReaders and providing Flink-specific type readers where Flink types diverge from Iceberg generics. The good thing about doing this, IMO, is that we get rid of class inheritance, which makes code changes brittle and introduces tight coupling.

@openinx (Member, Author):

I agree that extending BaseOrcReader and BaseOrcWriter introduces tight coupling. I tried to decouple the Flink writer from the generic ORC writers and let them share the common writers, but it seems hard to share the code: we use a static buildConverter method to build the converter for each data type, and a few Converter implementations depend on that static buildConverter, which makes it hard to extract common converters. Just curious: why did we implement the ORC writer with converters instead of visiting the types with an OrcSchemaWithTypeVisitor and generating the corresponding OrcRowWriter (that way we could share most of the writers)? The current converter approach seems odd compared to the Parquet and Avro writers.
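
For reference, a rough sketch of the visitor-based construction being suggested. The callback shapes follow Iceberg's OrcSchemaWithTypeVisitor, but treat the exact signatures and the GenericOrcWriters helpers as assumptions rather than this PR's code:

```java
import java.util.List;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.orc.TypeDescription;
// OrcValueWriter is the interface referenced elsewhere in this PR.

// Builds a tree of writers by visiting the Iceberg and ORC schemas together,
// so engines only swap out the nodes whose representation differs.
class WriterBuilder extends OrcSchemaWithTypeVisitor<OrcValueWriter<?>> {
  @Override
  public OrcValueWriter<?> record(Types.StructType iStruct, TypeDescription record,
                                  List<String> names, List<OrcValueWriter<?>> fields) {
    return GenericOrcWriters.struct(fields); // hypothetical shared struct writer
  }

  @Override
  public OrcValueWriter<?> primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
    switch (primitive.getCategory()) {
      case INT:
        return GenericOrcWriters.ints(); // hypothetical shared int writer
      // ... remaining primitives resolved the same way
      default:
        throw new UnsupportedOperationException("Unsupported type: " + primitive);
    }
  }
}
```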

@openinx (Member, Author):

@edgarRd @rdsr @rdblue I refactored GenericOrcWriter and moved the common writers out; the pull request is here: https://github.com/apache/iceberg/pull/1197/files. Mind taking a look?

Contributor:

@chenjunjiedada opened an issue for the concern about data types, and @JingsongLi clarified there which types Flink uses. @rdsr was right: it isn't correct to copy the generics code for a different row type.

Sounds like #1197 is a good start. We should probably reverse how we have refactored the Avro and Parquet generics as well.

@openinx (Member, Author):

Yes, you are right. After #1197 gets merged, I will recreate this patch for review. Thanks.

}

/**
* The interface for the conversion from Spark's SpecializedGetters to

Contributor:

nit: Remove Spark from comments

Class<T> getJavaClass();

/**
* Take a value from the Spark data value and add it to the ORC output.

Contributor:

here as well

@rdsr (Contributor) left a review comment:

Overall this looks OK to me, but I'm concerned about the coupling between GenericOrc[Reader|Writer] and FlinkOrc[Reader|Writer].

}
}

protected abstract Converter<T> createStructConverter(TypeDescription schema);

Contributor:

It seems that, apart from structs, Flink will use the same in-memory objects for map, list, and primitive types?
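
If that holds, only the struct converter needs an engine-specific override. A minimal sketch, assuming BaseOrcWriter exposes the abstract hook shown above; the constructor and RowConverter are hypothetical:

```java
import org.apache.flink.types.Row;
import org.apache.orc.TypeDescription;

// Inherits all map/list/primitive converters; only struct conversion differs.
public class FlinkOrcWriter extends BaseOrcWriter<Row> {
  protected FlinkOrcWriter(TypeDescription schema) {
    super(schema); // assumed superclass constructor
  }

  @Override
  protected Converter<Row> createStructConverter(TypeDescription schema) {
    return new RowConverter(schema); // hypothetical: reads fields from a Flink Row
  }
}
```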

public abstract class BaseOrcWriter<T> implements OrcValueWriter<T> {
private final Converter[] converters;
private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

Contributor:

I think you can use DateTimeUtil.EPOCH and DateTimeUtil.EPOCH_DAY instead.
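
A minimal sketch of the substitution, assuming the shared constants live in org.apache.iceberg.util.DateTimeUtil; the surrounding usage is illustrative, not this PR's exact code:

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import org.apache.iceberg.util.DateTimeUtil;

class DateConversions {
  // Days since the Unix epoch, using the shared constant instead of a local copy.
  static long toEpochDays(LocalDate date) {
    return ChronoUnit.DAYS.between(DateTimeUtil.EPOCH_DAY, date);
  }
}
```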


@Override
public Record read(VectorizedRowBatch batch, int row) {
-    return (Record) reader.read(new StructColumnVector(batch.size, batch.cols), row);
+    return (Record) getReader().read(new StructColumnVector(batch.size, batch.cols), row);

Contributor:

Getter methods should not start with get. It doesn't add anything (every method "gets" its return value) and is not idiomatic in other JVM languages.
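
A tiny illustration of the convention; ReaderHolder and its field are hypothetical, not this PR's code:

```java
class ReaderHolder<T> {
  private OrcRowReader<T> reader;

  // Accessor named after the value it returns; the "get" prefix adds nothing.
  OrcRowReader<T> reader() {
    return reader;
  }
}
```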

@Override
@SuppressWarnings("unchecked")
public void write(Row value, VectorizedRowBatch output) {
int row = output.size++;

Contributor:

We avoid using return values from ++ expressions. That helps readability because statement order is clear.
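
The preferred form separates the read from the increment, so the statement order stays explicit:

```java
// Reserve the current row index, then grow the batch in its own statement.
int row = output.size;
output.size += 1;
```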

@rdblue (Contributor) commented Jul 11, 2020

Looks like there are some minor things to clean up, but overall the code changes are close to being ready.

Before merging this, I'd like to understand how Flink will work with the data that is produced so we can make a good decision about whether we should continue with ORC like we have for Parquet and Avro (sharing generics code) or whether we should think about building separate readers and writers for its object model. Thanks for bringing this up, @rdsr!

@openinx (Member, Author) commented Jul 27, 2020

Per issue #1215, we've upgraded Flink to version 1.11 and plan to support RowData-based Avro, Parquet, and ORC readers and writers, so I will create a new pull request with a RowData implementation. Closing this one now.

@openinx closed this on Jul 27, 2020
@openinx deleted the flink-orc branch on July 27, 2020
szehon-ho pushed a commit to szehon-ho/iceberg that referenced this pull request Sep 16, 2024
rodmeneses pushed a commit to rodmeneses/iceberg that referenced this pull request Jun 23, 2025