Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public class FlinkCatalogFactory implements CatalogFactory {
static CatalogLoader createCatalogLoader(String name, Map<String, String> properties, Configuration hadoopConf) {
String catalogImpl = properties.get(CatalogProperties.CATALOG_IMPL);
if (catalogImpl != null) {
String catalogType = properties.get(ICEBERG_CATALOG_TYPE);
Preconditions.checkArgument(catalogType == null,
"Cannot create catalog %s, both catalog-type and catalog-impl are set: catalog-type=%s, catalog-impl=%s",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

catalog-type isn't a property right? Shouldn't this be type=%s instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind, looks like Flink sets this differently.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have recommendations on which values to set that we can provide?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have recommendations on which values to set that we can provide?

You mean only hive or hadoop? 🤔

name, catalogType, catalogImpl);
return CatalogLoader.custom(name, properties, hadoopConf, catalogImpl);
}

Expand All @@ -99,7 +103,8 @@ static CatalogLoader createCatalogLoader(String name, Map<String, String> proper
return CatalogLoader.hadoop(name, hadoopConf, properties);

default:
throw new UnsupportedOperationException("Unknown catalog type: " + catalogType);
throw new UnsupportedOperationException("Unknown catalog-type: " + catalogType +
" (Must be 'hive' or 'hadoop')");
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink;

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

public class TestFlinkCatalogFactory {

  private Map<String, String> props;

  @Before
  public void before() {
    // Base properties shared by every test; each test adds its own
    // catalog-type / catalog-impl entry on top of these.
    props = Maps.newHashMap();
    props.put("type", "iceberg");
    props.put(CatalogProperties.WAREHOUSE_LOCATION, "/tmp/location");
  }

  @Test
  public void testCreateCatalogHive() {
    String catalogName = "hiveCatalog";
    props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, FlinkCatalogFactory.ICEBERG_CATALOG_TYPE_HIVE);

    Catalog catalog = FlinkCatalogFactory
        .createCatalogLoader(catalogName, props, new Configuration())
        .loadCatalog();

    Assertions.assertThat(catalog).isNotNull().isInstanceOf(HiveCatalog.class);
  }

  @Test
  public void testCreateCatalogHadoop() {
    String catalogName = "hadoopCatalog";
    props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, FlinkCatalogFactory.ICEBERG_CATALOG_TYPE_HADOOP);

    Catalog catalog = FlinkCatalogFactory
        .createCatalogLoader(catalogName, props, new Configuration())
        .loadCatalog();

    Assertions.assertThat(catalog).isNotNull().isInstanceOf(HadoopCatalog.class);
  }

  @Test
  public void testCreateCatalogCustom() {
    String catalogName = "customCatalog";
    props.put(CatalogProperties.CATALOG_IMPL, CustomHadoopCatalog.class.getName());

    Catalog catalog = FlinkCatalogFactory
        .createCatalogLoader(catalogName, props, new Configuration())
        .loadCatalog();

    Assertions.assertThat(catalog).isNotNull().isInstanceOf(CustomHadoopCatalog.class);
  }

  @Test
  public void testCreateCatalogCustomWithHiveCatalogTypeSet() {
    // catalog-impl and catalog-type are mutually exclusive; setting both must fail fast.
    String catalogName = "customCatalog";
    props.put(CatalogProperties.CATALOG_IMPL, CustomHadoopCatalog.class.getName());
    props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, FlinkCatalogFactory.ICEBERG_CATALOG_TYPE_HIVE);

    AssertHelpers.assertThrows(
        "Should throw when both catalog-type and catalog-impl are set",
        IllegalArgumentException.class,
        "both catalog-type and catalog-impl are set", () ->
            FlinkCatalogFactory.createCatalogLoader(catalogName, props, new Configuration()));
  }

  @Test
  public void testLoadCatalogUnknown() {
    String catalogName = "unknownCatalog";
    props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "fooType");

    AssertHelpers.assertThrows(
        "Should throw when an unregistered / unknown catalog is set as the catalog factory's `type` setting",
        UnsupportedOperationException.class,
        "Unknown catalog-type", () ->
            FlinkCatalogFactory.createCatalogLoader(catalogName, props, new Configuration())
    );
  }

  /**
   * Minimal HadoopCatalog subclass used to exercise the catalog-impl (custom catalog) code path.
   * The no-arg constructor is required for reflective instantiation by CatalogLoader.custom.
   */
  public static class CustomHadoopCatalog extends HadoopCatalog {

    public CustomHadoopCatalog() {
    }

    public CustomHadoopCatalog(Configuration conf, String warehouseLocation) {
      setConf(conf);
      initialize("custom", ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation));
    }
  }
}
33 changes: 26 additions & 7 deletions site/docs/flink.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,29 @@ For more details, please refer to the [Python Table API](https://ci.apache.org/p

Flink 1.11 supports creating catalogs using Flink SQL.

### Catalog Configuration

A catalog is created and named by executing the following query (replace `<catalog_name>` with your catalog name and
`<config_key>`=`<config_value>` with catalog implementation config):

```sql
CREATE CATALOG <catalog_name> WITH (
  'type'='iceberg',
  '<config_key>'='<config_value>'
);
```

The following properties can be set globally and are not limited to a specific catalog implementation:

* `type`: Must be `iceberg`. (required)
* `catalog-type`: `hive` or `hadoop` for built-in catalogs, or left unset for custom catalog implementations using catalog-impl. (Optional)
* `catalog-impl`: The fully-qualified class name of a custom catalog implementation. Must be set if `catalog-type` is unset. (Optional)
* `property-version`: Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The current property version is `1`. (Optional)
* `cache-enabled`: Whether to enable catalog cache, default value is `true`

### Hive catalog

This creates an iceberg catalog named `hive_catalog` that loads tables from a hive metastore:
This creates an iceberg catalog named `hive_catalog` that can be configured using `'catalog-type'='hive'`, which loads tables from a hive metastore:

```sql
CREATE CATALOG hive_catalog WITH (
Expand All @@ -205,14 +225,12 @@ CREATE CATALOG hive_catalog WITH (
);
```

* `type`: Please just use `iceberg` for iceberg table format. (Required)
* `catalog-type`: Iceberg currently support `hive` or `hadoop` catalog type. (Required)
The following properties can be set if using the Hive catalog:

* `uri`: The Hive metastore's thrift URI. (Required)
* `clients`: The Hive metastore client pool size, default value is 2. (Optional)
* `property-version`: Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The current property version is `1`. (Optional)
* `warehouse`: The Hive warehouse location, users should specify this path if neither set the `hive-conf-dir` to specify a location containing a `hive-site.xml` configuration file nor add a correct `hive-site.xml` to classpath.
* `hive-conf-dir`: Path to a directory containing a `hive-site.xml` configuration file which will be used to provide custom Hive configuration values. The value of `hive.metastore.warehouse.dir` from `<hive-conf-dir>/hive-site.xml` (or the Hive configuration file on the classpath) will be overwritten by the `warehouse` value if both `hive-conf-dir` and `warehouse` are set when creating an iceberg catalog.
* `cache-enabled`: Whether to enable catalog cache, default value is `true`

### Hadoop catalog

Expand All @@ -227,14 +245,15 @@ CREATE CATALOG hadoop_catalog WITH (
);
```

The following properties can be set if using the Hadoop catalog:

* `warehouse`: The HDFS directory to store metadata files and data files. (Required)

We could execute the sql command `USE CATALOG hive_catalog` to set the current catalog.

### Custom catalog

Flink also supports loading a custom Iceberg `Catalog` implementation by specifying the `catalog-impl` property.
When `catalog-impl` is set, the value of `catalog-type` is ignored. Here is an example:
Flink also supports loading a custom Iceberg `Catalog` implementation by specifying the `catalog-impl` property. Here is an example:

```sql
CREATE CATALOG my_catalog WITH (
Expand Down