Conversation

@openinx (Member) commented Jun 5, 2020

@waterlx did the Netflix PoC before. After discussing it with him, we're planning to divide the Flink connector module into several patches and push it into the official Apache repo.

This is the first pull request, which aligns the data types between Flink and Iceberg (it also introduces the iceberg-flink module with the necessary Flink dependencies). The following pull requests would be:

  1. Flink reader & writer to access the Parquet/Avro/ORC file formats etc. (Background: the Flink table depends on the Flink Row data type, which is similar to the Iceberg GenericRecord. Although the Flink reader/writer and the Iceberg generic reader/writer would share most of the code, we still need a Flink Row reader/writer, so this pull request will focus on that abstraction.)
  2. Sink to Iceberg via the Flink DataStream API. We have a mature solution for maintaining exactly-once semantics in the PoC code and the generic data lake repo, and will try to turn it into a small pull request.
  3. Sink to Iceberg via Flink SQL. We also did this work in the generic-datalake/iceberg-pro repo and will try to PR it.

For part 2, we'd better have a doc describing the design behind it, because it seems to require some Flink background to understand Flink operator state and its design.

build.gradle Outdated
compile project(':iceberg-orc')
compile project(':iceberg-parquet')
compile project(':iceberg-arrow')
compile "org.apache.flink:flink-streaming-java_2.11::tests"
Contributor

Does Flink have a Scala 2.12 version? Will it cause a problem to rely on 2.11?

Member Author

Yes, Flink has a Scala 2.12 version. I'm not sure what specific problem you mean with 2.11. Do you have more information? Thanks.

Contributor

This is compiling the Iceberg sink against the Flink Scala 2.11 API, right? But Scala isn't binary compatible between minor releases, so Jars compiled for 2.11 may not work with 2.12. How should we avoid problems with binary compatibility?

Member Author

IMO, we may need to load the corresponding Flink jars (the ones that depend on Scala) for the configured Scala version. For example, if we choose Scala 2.12, then the iceberg-flink module should use all the flink-xxx_2.12 jars (and should throw an error if a flink-xxx_2.12 jar does not exist).

This may need a few Gradle scripts or methods. I think it could be considered in a separate issue. There are also other modules, such as iceberg-spark, that should handle this issue too.

Contributor

My main concern is that if we compile against 2.11, it won't work in 2.12. Then we would need two separate builds.

Member Author

I think so: build two iceberg-flink jars and deploy them to the repo, so that downstream users can choose the correct iceberg-flink jar based on their Scala version.

Contributor

I didn't intend to ask you to add another module to the build. Sorry about not being clear.

In many cases, it's possible to work with both 2.11 and 2.12 with compiled binaries. We have two internal versions of Spark that we do this with, although Spark defines the catalog and table APIs in Java so we have less to worry about. Ideally, we would have one module that works with both and we would have some plan to test that. Otherwise, it's probably a good idea to move this code into Flink as soon as possible because it makes more sense to maintain just source compatibility there.

For now, I think we should consider options for testing this against both 2.11 and 2.12. Maybe we can add a build property to switch between 2.11 and 2.12 and just test both in CI. Until then, I think we should just build for 2.12. 2.11 hasn't been supported since 2017, so it makes sense for new development to target 2.12.
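
A minimal sketch of that build-property idea (the property name, default, and wiring below are assumptions for illustration, not the actual build change):

```groovy
// Hypothetical build.gradle fragment: choose the Scala suffix for the Flink
// artifacts from a Gradle property so CI can test both Scala versions,
// e.g. `./gradlew :iceberg-flink:test -PscalaVersion=2.11`.
def scalaVersion = project.hasProperty('scalaVersion') ? project.property('scalaVersion') : '2.12'

project(':iceberg-flink') {
  dependencies {
    compile "org.apache.flink:flink-streaming-java_${scalaVersion}"
    compile "org.apache.flink:flink-streaming-java_${scalaVersion}::tests"
  }
}
```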

}
}

public T fields(FieldsDataType dataType, Map<String, Tuple2<String, T>> fieldResults) {
Contributor

If fields is always a row, it seems to me that row might be a better name because it is more clear that it is a record/struct/tuple type. It wasn't clear to me that fields was the record type, since most other formats make a distinction between a record and its field list.

Member Author

The name fields matches FieldsDataType, just like collection matches CollectionDataType and map matches KeyValueDataType.
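
For reference, a minimal sketch of that naming convention, trimmed to the visit methods discussed in this PR (the abstract class shown here is illustrative, not the exact code):

```java
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.types.AtomicDataType;
import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.FieldsDataType;
import org.apache.flink.table.types.KeyValueDataType;

// Each visit method mirrors the Flink DataType subclass it handles.
public abstract class FlinkTypeVisitorSketch<T> {
  public abstract T fields(FieldsDataType dataType, Map<String, Tuple2<String, T>> fieldResults);

  public abstract T collection(CollectionDataType collection, T elementResult);

  public abstract T map(KeyValueDataType map, T keyResult, T valueResult);

  public abstract T atomic(AtomicDataType atomic);
}
```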

Types.NestedField.optional(17, "decimal", Types.DecimalType.of(2, 2))
);

Assert.assertEquals(expectedSchema.toString(), actualSchema.toString());
Contributor

Instead of using toString, use asStruct. Struct implements equals so you can use it in tests.
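
For example, the assertion in the hunk above would become:

```java
// Structs implement equals, so compare them directly instead of their string forms.
Assert.assertEquals(expectedSchema.asStruct(), actualSchema.asStruct());
```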

@rdblue (Contributor) commented Jun 5, 2020

Thanks for working on this, @openinx and @waterlx! This is a great start and I think we should be able to get it in soon.

The plan for getting the rest in sounds good to me, and any docs you can share about how Flink will work with Iceberg would be helpful. Thanks!

versions.props Outdated
@@ -1,5 +1,6 @@
org.slf4j:* = 1.7.25
org.apache.avro:avro = 1.9.2
org.apache.flink:* = 1.10.0
Member Author

The Flink version can be upgraded to 1.10.1 now.

@Override
public int hashCode() {
return Objects.hash(struct.fields(), aliasToId == null ? 0 : aliasToId.hashCode());
}
Contributor

Schema doesn't implement equals because it isn't clear what schema equality means. Because we track fields by ID, two schemas are equal only if their fields have the same IDs, but most people don't think about schema equality that way and think of a SQL schema, like id bigint, data string. To avoid confusion, we don't provide an equals method that may have confusing results. Instead, we implement equals for structs so you can use schema.asStruct() in test assertions.

Member Author

Got it, thanks.

FieldsDataType root = (FieldsDataType) schema.toRowDataType();
Type converted = FlinkTypeVisitor.visit(root, new FlinkTypeToType(root));

return new Schema(converted.asNestedType().asStructType().fields());
Contributor

asNestedType is no longer necessary. Now asStructType is defined on Type, not just NestedType.
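
That is, the hunk above could drop the extra call; a sketch of the suggested change:

```java
FieldsDataType root = (FieldsDataType) schema.toRowDataType();
Type converted = FlinkTypeVisitor.visit(root, new FlinkTypeToType(root));

// asStructType() is now defined on Type, so the asNestedType() hop is unnecessary.
return new Schema(converted.asStructType().fields());
```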


int index = 0;
for (int i = 0; i < rowFields.size(); i++) {
int id = isRoot ? index : getNextId();
Contributor

Why have variable index when its value is always equal to i?

Member Author

OK, the index can be removed now. (In the first version I used a for (String name : types.keySet()) loop, so I defined an index variable.)

build.gradle Outdated
}
}

project(':iceberg-flink_2.12') {
Contributor

There is no source code in flink_2.12, so this module is actually empty. Like I said in the other comment, let's have one 2.12 module for now to get this PR in. Then we can figure out how to test compatibility with 2.11 later.

…taTypes.ARRAY (it should depend on the nullability of elementType.)
@rdblue (Contributor) commented Jun 11, 2020

This looks ready to go once we deduplicate the Flink modules. Whether we depend on 2.11 or 2.12 doesn't matter to me, but I think we should just have one for now, like I said in the thread.

@openinx (Member Author) commented Jun 12, 2020

Hi @rdblue, I've removed the Flink module with Scala 2.11 as you suggested. I have one minor question: when I generate the dependencies.lock file, it seems to contain all the allProcessors entries, such as:

{
    "allProcessors": {
        "com.github.kevinstern:software-and-algorithms": {
            "locked": "1.0",
            "transitive": [
                "com.google.errorprone:error_prone_check_api"
            ]
        },
        "com.github.stephenc.jcip:jcip-annotations": {
            "locked": "1.0-1",
            "transitive": [
                "com.google.errorprone:error_prone_core"
            ]
        },
      ....

That makes the dependencies.lock file really large, while the file under other modules, such as the iceberg-api module, only has the compile component. Would this matter? I followed the guide in your pull request #1067.
Is there any other trick to keep only the compile parts?

@openinx (Member Author) commented Jun 15, 2020

Ping @rdblue, any other concerns about this pull request? If not, please help merge this patch so that I can PR the following patches. Thanks in advance.

@openinx (Member Author) commented Jun 17, 2020

Ping @rdblue.

@rdblue merged commit 20651b1 into apache:master on Jun 17, 2020
@rdblue (Contributor) commented Jun 17, 2020

Looks good now. Thanks, @openinx!

Sorry for the delay getting back to this for a review. I was reviewing a few other big patches in the meantime.

@openinx deleted the flink-connector-pr branch on June 18, 2020 at 02:00
@JingsongLi (Contributor) left a comment

Thanks @openinx and @rdblue for your great work. I am from the Apache Flink community, I was involved in the Apache Beam community for the unification of batch and streaming, and I am very interested in Iceberg.

Sorry for joining and reviewing late, but I think we can improve the code in the future.

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class FlinkTypeVisitor<T> {
Contributor

Flink has LogicalTypeVisitor and DataTypeVisitor; they are very useful for visiting types, and with them you don't need this static <T> T visit method, which is not so elegant.

Also, FieldsDataType does not have a good design in 1.9 and 1.10, so in Flink 1.11 it has been refactored and getFieldDataTypes has been removed.

And I think maybe a LogicalTypeVisitor is enough, since we never touch the physical information in the DataTypes.
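
A minimal sketch of what a LogicalType-based visitor could look like, assuming Flink's LogicalTypeDefaultVisitor helper is available (this example is illustrative, not the converter in this PR):

```java
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;

// Toy visitor: counts top-level row fields; a real converter would override
// more visit methods (ArrayType, MapType, the atomic types, ...).
public class RowFieldCounter extends LogicalTypeDefaultVisitor<Integer> {
  @Override
  public Integer visit(RowType rowType) {
    return rowType.getFieldCount();
  }

  @Override
  protected Integer defaultMethod(LogicalType logicalType) {
    return 0;
  }
}
```

With something like this, the conversion can dispatch via dataType.getLogicalType().accept(new RowFieldCounter()) instead of a hand-rolled static visit helper.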

this.nextId = root.getFieldDataTypes().size();
}

private int getNextId() {
Contributor

A better name would be generateNextId or just nextId. It looks like it does more than just "get".

}

@Override
public Type collection(CollectionDataType collection, Type elementType) {
Contributor

A CollectionDataType may be a MultisetType too. Maybe Iceberg doesn't support it now? Or we could map it to Map<T, Integer>.
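
A hedged sketch of that Map<T, Integer> idea (the field IDs and helper below are illustrative and not part of this PR):

```java
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

class MultisetAsMap {
  // Represent MULTISET<T> as a map from the element type to its occurrence count.
  static Type toCountMap(int keyId, int valueId, Type elementType) {
    return Types.MapType.ofRequired(keyId, valueId, elementType, Types.IntegerType.get());
  }
}
```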


@Override
public Type map(KeyValueDataType map, Type keyType, Type valueType) {
// keys in map are not allowed to be null.
Contributor

But Flink map keys can be nullable, so should we throw an unsupported exception here?
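
If the decision is to reject nullable keys, a minimal sketch of such a check might look like this (class and method names are assumptions):

```java
import org.apache.flink.table.types.KeyValueDataType;

class MapKeyChecks {
  // Iceberg map keys are always required, so reject Flink maps whose key type is nullable.
  static void validateKeyNotNullable(KeyValueDataType map) {
    if (map.getKeyDataType().getLogicalType().isNullable()) {
      throw new UnsupportedOperationException("Iceberg does not support nullable map keys: " + map);
    }
  }
}
```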
