Conversation

@JingsongLi JingsongLi commented Jul 8, 2020

Like Spark 3, Flink has a Catalog interface, so we can integrate the Iceberg catalog with the Flink catalog. With Iceberg acting as a Flink catalog, users can use Flink DDL to manipulate Iceberg metadata and query Iceberg tables directly.

The mapping between Flink databases and Iceberg namespaces:

  • A base namespace is supplied for a given catalog, so if you have a catalog that supports a 2-level namespace, you supply the first level in the catalog configuration and the second level is exposed as Flink databases (see the sketch after this list).
  • An Iceberg table manages its partitions by itself; Iceberg partitioning is independent of Flink partitioning.
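A minimal configuration sketch of this mapping, assuming the catalog is created through a FlinkCatalogFactory-style factory and registered on a TableEnvironment (tEnv); the property keys shown (catalog-type, uri, warehouse, base-namespace) follow the description above and may differ from the final implementation:

Map<String, String> props = new HashMap<>();
props.put("catalog-type", "hive");
props.put("uri", "thrift://metastore:9083");
props.put("warehouse", "hdfs://nn:8020/warehouse");
props.put("base-namespace", "level1");                     // first level of a 2-level Iceberg namespace

// org.apache.flink.table.catalog.Catalog backed by Iceberg
Catalog catalog = new FlinkCatalogFactory().createCatalog("iceberg_catalog", props);
tEnv.registerCatalog("iceberg_catalog", catalog);
tEnv.useCatalog("iceberg_catalog");

// Flink databases now map to the second namespace level:
tEnv.sqlUpdate("CREATE DATABASE level2");                   // creates Iceberg namespace level1.level2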

This PR solves #1170

This PR depends on:

@rdblue rdblue added this to the Flink Sink milestone Jul 9, 2020
}

/**
* TODO Implement DDL-string parser for PartitionSpec.
Contributor

Can we add partitioning to the Flink DDL parser instead? That seems like a more appropriate place for it.

Otherwise, I'd recommend just using the PartitionSpecParser.fromJson method.
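For reference, a hedged sketch of the fromJson route; the JSON below is only illustrative of Iceberg's internal spec serialization, and how such a string would be surfaced to users (e.g. as a table property) is left open here:

// icebergSchema: the table's org.apache.iceberg.Schema; field id 3 is assumed to be a date/timestamp column
String specJson = "{\"spec-id\": 0, \"fields\": [" +
    "{\"name\": \"dt_day\", \"transform\": \"day\", \"source-id\": 3, \"field-id\": 1000}]}";
PartitionSpec spec = PartitionSpecParser.fromJson(icebergSchema, specJson);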

Contributor Author

I'd prefer adding partitioning to the Flink DDL parser; I'll update the comments.
PartitionSpecParser.fromJson looks very difficult for users to work with.

Contributor Author

After some discussion with Flink developers, we can map Iceberg partition transforms to Flink computed columns and partitioning. We can support this in the future.

Contributor

PartitionSpecParser.fromJson is how we serialize partition specs internally. It isn't great to expose it directly to users, but would at least make it possible to configure partitioning. If you have a different approach, that is much better!

How would the computed column and partition approach work?

Contributor Author

A rough idea: Flink supports computed columns: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table

The Flink DDL CREATE TABLE T (pk INT, ... dt STRING, year AS YEAR(dt), month AS MONTH(dt), d AS DAY(dt)) PARTITIONED BY (year, month, d) should be equivalent to the Spark DDL CREATE TABLE T (pk INT, ... dt STRING) PARTITIONED BY (YEAR(dt), MONTH(dt), DAY(dt)).

Computed columns are not stored in the actual data; they are just virtual columns, which means we can map them to the Iceberg partition transforms of an Iceberg table in the Iceberg Flink catalog.
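To make the intended mapping concrete, a rough sketch of the translation the Iceberg Flink catalog could perform (an assumption about future work, not part of this PR): a computed column d AS DAY(dt) listed in PARTITIONED BY (d) would become an Iceberg day transform on the source column.

// Hypothetical translation of: CREATE TABLE T (..., dt STRING, d AS DAY(dt)) PARTITIONED BY (d)
PartitionSpec spec = PartitionSpec.builderFor(icebergSchema)   // icebergSchema: the converted Iceberg Schema
    .day("dt")
    .build();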

Contributor

Good idea, but there are a couple of things to watch out for:

  • Where possible, we avoid exposing the actual partition values, in order to maintain a separation between logical queries and physical layout. That way, the physical layout can change, but the logical queries will continue to work. In this case, we would need to make sure that the computed columns are tracked separately so that we don't drop the day column when the table gets converted to partitioning by hour.
  • Year, month, and day are functions with concrete behavior for Flink SQL, and Iceberg's partitioning may not align with that behavior. So we probably would not want to supply the data for those columns using Iceberg partition values. Instead, I think we should derive them from the dt field.

Contributor Author

@JingsongLi JingsongLi Jul 21, 2020

Good points.

For Flink SQL, computed columns are virtual columns that sources and sinks can simply ignore: a source produces records without the computed columns and the Flink runtime generates them for each input record, while a sink receives records without the computed columns from the runtime.

  • I see, you mean https://iceberg.apache.org/evolution/#partition-evolution : the computed columns should be calculated by the Flink runtime, and Iceberg should just deal with its own physical layout.
  • There are three kinds of transform functions (see the sketch after this list): 1. hour, day, month, and year are the same as Flink's built-in functions. 2. For truncate, Flink also supports the function, but not with string or bytes input, so Iceberg can provide a catalog function (Catalog.getFunction) and users can use iceberg_catalog.truncate to create a computed column. 3. For bucket, Flink has no such function, so Iceberg can provide catalog functions that users call directly.
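A hedged DDL sketch of item 3 (nothing below is implemented by this PR; iceberg_catalog.bucket is a hypothetical catalog function, and tEnv is an assumed TableEnvironment):

tEnv.sqlUpdate(
    "CREATE TABLE t (" +
    "  id BIGINT," +
    "  data STRING," +
    "  bucket_id AS iceberg_catalog.bucket(16, id)" +  // hypothetical catalog function
    ") PARTITIONED BY (bucket_id)");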

@Override
public void alterPartition(
ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists
) throws CatalogException {
Contributor

We prefer two options for formatting argument lists. Either aligned with the first argument:

public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition,
                           boolean ignoreIfNotExists) throws CatalogException {
  ...
}

Or, indented by 2 indents (4 spaces) and aligned with that position:

public void alterPartition(
    ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
    throws CatalogException {
  ...
}

throws can be on the next line, indented to the same place.

Contributor Author

Got it.

/**
* Converter between Flink types and Iceberg types.
* The conversion is not a 1:1 mapping, so some information may be lost
* during a back-and-forth conversion.
Contributor

Can you be more specific about this? What is a case where information is lost?

Contributor

If I understand correctly, this is lossy because Iceberg doesn't represent some types that Flink supports, like CHAR(N). Is that right?

Contributor Author

Yes

Contributor Author

Iceberg to Flink: the UUID type will be lost.
Flink to Iceberg: precisions (such as CHAR/VARCHAR lengths) will be lost.
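A minimal sketch of those two directions, assuming a FlinkSchemaUtil-style converter (the class and method names here are illustrative, not necessarily this PR's exact API):

TableSchema flinkSchema = TableSchema.builder()
    .field("id", DataTypes.BIGINT())
    .field("code", DataTypes.CHAR(10))                // fixed-length char in Flink
    .build();

Schema icebergSchema = FlinkSchemaUtil.convert(flinkSchema);
// Flink -> Iceberg: `code` is stored as Iceberg `string`; the length 10 is lost.

TableSchema roundTripped = FlinkSchemaUtil.convert(icebergSchema);
// Iceberg -> Flink: `code` comes back as VARCHAR(2147483647), not CHAR(10);
// an Iceberg `uuid` column would similarly come back as a 16-byte binary rather than a UUID.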

return new VarCharType(VarCharType.MAX_LENGTH);
case UUID:
// UUID length is 16
return new CharType(16);
Contributor

Char? Wouldn't this be fixed-length binary?

Contributor Author

@JingsongLi JingsongLi Jul 13, 2020

I thought UUID should be CHAR(36) because:

  • In Spark, the UUID function returns StringType.
  • In Flink, the UUID function returns CHAR(36).

But you are right: in ORC and Parquet, UUID is just treated as a fixed-length binary.

Contributor

I think either CHAR(36) or VARBINARY(16) would work, but not CHAR(16).

Contributor Author

I'll go with fixed-length BINARY(16).
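So the case quoted above would change along these lines (a sketch of the agreed direction, not the exact final code):

case UUID:
  // a UUID is 16 bytes, so expose it to Flink as a fixed-length binary type
  return new BinaryType(16);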

this.icebergNamespace = Namespace.of(ArrayUtils.concat(baseNamespace, new String[] { DATABASE }));
}

@After
Contributor

Won't this close the catalog after every test method?

Contributor Author

Yes, and every test method will create a new catalog too.
But it seems we could reuse them by catalog name.


@Test
public void testDropNonEmptyNamespace() {
Assume.assumeFalse("Hadoop catalog throws IOException: Directory is not empty.", isHadoopCatalog);
Contributor

This sounds like a bug in the Hadoop catalog. Can we fix it instead of ignoring this test case?

Contributor Author

I'll fix it in this PR. Tell me if I need to create a new PR for the Hadoop catalog fix.

Contributor

I think it would be better to fix the Hadoop catalog in a separate PR and leave this one with the Assume until it is merged.

Contributor Author

I'll create it.


Assert.assertTrue("Namespace should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace));

Assert.assertEquals("Should not list any tables", 0, tEnv.listTables().length);
Contributor

Should this call SHOW TABLES?

Contributor Author

@JingsongLi JingsongLi Jul 13, 2020

Flink 1.10 does not support the SHOW TABLES DDL; it is supported in 1.11.
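For reference, a small sketch of the two options, assuming the test keeps a TableEnvironment named tEnv (executeSql itself only arrived in Flink 1.11):

String[] tables = tEnv.listTables();             // available in Flink 1.10, what this test uses
tEnv.executeSql("SHOW TABLES").print();          // possible once the integration moves to Flink 1.11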


@Test
public void testCreateNamespaceWithLocation() throws Exception {
Assume.assumeFalse("HadoopCatalog does not support namespace locations", isHadoopCatalog);
Contributor

Do we need a test to validate that the CREATE DATABASE statement fails for Hadoop?

rdblue commented Jul 10, 2020

Thanks @JingsongLi, this looks close. I just had a few questions.

@JingsongLi
Contributor Author

Thanks @rdblue for your review, I have addressed your comments.

@JingsongLi
Contributor Author

I updated the branch, but the changes were not synchronized to this PR. Something seems to be wrong with GitHub...

@rdblue rdblue mentioned this pull request Jul 13, 2020

rdblue commented Jul 13, 2020

@JingsongLi, I think this needs to be rebased now that #1180 is in.

Also, should we get #1174 updated for the comments here so we can merge them separately? If we can, I'd prefer to make the commits smaller.

JingsongLi commented Jul 14, 2020

> @JingsongLi, I think this needs to be rebased now that #1180 is in.
>
> Also, should we get #1174 updated for the comments here so we can merge them separately? If we can, I'd prefer to make the commits smaller.

Yes, I think we can. I'll update #1174, create a PR for the HadoopCatalog bug, and create a PR for Flink 1.11.

@JingsongLi JingsongLi force-pushed the catalog branch 3 times, most recently from aed7596 to 6a4a84a Compare July 16, 2020 02:37
@JingsongLi JingsongLi changed the title Integrate Iceberg catalog to Flink catalog Flink: Integrate Iceberg catalog to Flink catalog Jul 16, 2020
protected static ConcurrentMap<String, Catalog> flinkCatalogs;

@BeforeClass
public static void startMetastoreAndSpark() {
Contributor

Nit: the method names weren't updated.

@rdblue rdblue merged commit b8700c3 into apache:master Jul 20, 2020

rdblue commented Jul 20, 2020

Thanks for the updates, @JingsongLi! This looks good to me. I'll merge it.

@JingsongLi
Contributor Author

Thanks @rdblue for your patient review~

cmathiesen pushed a commit to ExpediaGroup/iceberg that referenced this pull request Aug 19, 2020
@JingsongLi JingsongLi deleted the catalog branch November 5, 2020 09:41