From 1c7572124e8fd79af0ca8e79d3b5b4e110251624 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Thu, 13 Aug 2020 16:55:48 +0800 Subject: [PATCH 1/3] Flink: Introduce CatalogLoader and TableLoader --- .../apache/iceberg/flink/CatalogLoader.java | 42 +++++++ .../apache/iceberg/flink/FlinkCatalog.java | 16 ++- .../iceberg/flink/FlinkCatalogFactory.java | 37 +++--- .../org/apache/iceberg/flink/TableLoader.java | 107 ++++++++++++++++++ .../iceberg/flink/FlinkCatalogTestBase.java | 8 +- 5 files changed, 177 insertions(+), 33 deletions(-) create mode 100644 flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java create mode 100644 flink/src/main/java/org/apache/iceberg/flink/TableLoader.java diff --git a/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java b/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java new file mode 100644 index 000000000000..15df71da1500 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink; + +import java.io.Serializable; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.hive.HiveCatalog; + +/** + * Serializable loader to load an Iceberg {@link Catalog}. + */ +public interface CatalogLoader extends Serializable { + + Catalog loadCatalog(Configuration hadoopConf); + + static CatalogLoader hadoop(String name, String warehouseLocation) { + return conf -> new HadoopCatalog(name, conf, warehouseLocation); + } + + static CatalogLoader hive(String name, String uri, int clientPoolSize) { + return conf -> new HiveCatalog(name, uri, clientPoolSize, conf); + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index eea4c07d1da0..0029f89500f4 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -47,6 +47,7 @@ import org.apache.flink.table.catalog.stats.CatalogTableStatistics; import org.apache.flink.table.expressions.Expression; import org.apache.flink.util.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CachingCatalog; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; @@ -71,6 +72,8 @@ */ public class FlinkCatalog extends AbstractCatalog { + private final CatalogLoader catalogLoader; + private final Configuration hadoopConf; private final Catalog originalCatalog; private final Catalog icebergCatalog; private final String[] baseNamespace; @@ -80,14 +83,17 @@ public FlinkCatalog( String catalogName, String defaultDatabase, String[] baseNamespace, - Catalog icebergCatalog, + CatalogLoader catalogLoader, + Configuration hadoopConf, boolean cacheEnabled) { super(catalogName, defaultDatabase); - this.originalCatalog = icebergCatalog; - this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(icebergCatalog) : icebergCatalog; + this.hadoopConf = hadoopConf; + this.originalCatalog = catalogLoader.loadCatalog(hadoopConf); + this.catalogLoader = catalogLoader; + this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog; this.baseNamespace = baseNamespace; - if (icebergCatalog instanceof SupportsNamespaces) { - asNamespaceCatalog = (SupportsNamespaces) icebergCatalog; + if (originalCatalog instanceof SupportsNamespaces) { + asNamespaceCatalog = (SupportsNamespaces) originalCatalog; } else { asNamespaceCatalog = null; } diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java index 59f4e8201d8b..ba0cf76cb61b 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java @@ -27,8 +27,6 @@ import org.apache.flink.table.descriptors.CatalogDescriptorValidator; import org.apache.flink.table.factories.CatalogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.hadoop.HadoopCatalog; -import org.apache.iceberg.hive.HiveCatalog; import org.apache.iceberg.relocated.com.google.common.base.Splitter; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -48,7 +46,7 @@ * *

* To use a custom catalog that is not a Hive or Hadoop catalog, extend this class and override - * {@link #buildIcebergCatalog(String, Map)}. + * {@link #createCatalogLoader(String, Map)}. */ public class FlinkCatalogFactory implements CatalogFactory { @@ -62,37 +60,23 @@ public class FlinkCatalogFactory implements CatalogFactory { public static final String BASE_NAMESPACE = "base-namespace"; /** - * Build an Iceberg {@link org.apache.iceberg.catalog.Catalog} to be used by this Flink catalog adapter. + * Create an Iceberg {@link org.apache.iceberg.catalog.Catalog} loader to be used by this Flink catalog adapter. * * @param name Flink's catalog name * @param options Flink's catalog options - * @return an Iceberg catalog + * @return an Iceberg catalog loader */ - protected org.apache.iceberg.catalog.Catalog buildIcebergCatalog(String name, Map options) { - Configuration conf = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration()); - return buildIcebergCatalog(name, options, conf); - } - - /** - * Build an Iceberg {@link org.apache.iceberg.catalog.Catalog} to be used by this Flink catalog adapter. - * - * @param name Flink's catalog name - * @param options Flink's catalog options - * @param conf Flink's hadoop configuration - * @return an Iceberg catalog - */ - protected org.apache.iceberg.catalog.Catalog buildIcebergCatalog( - String name, Map options, Configuration conf) { + protected CatalogLoader createCatalogLoader(String name, Map options) { String catalogType = options.getOrDefault(ICEBERG_CATALOG_TYPE, "hive"); switch (catalogType) { case "hive": int clientPoolSize = Integer.parseInt(options.getOrDefault(HIVE_CLIENT_POOL_SIZE, "2")); String uri = options.get(HIVE_URI); - return new HiveCatalog(name, uri, clientPoolSize, conf); + return CatalogLoader.hive(name, uri, clientPoolSize); case "hadoop": String warehouseLocation = options.get(HADOOP_WAREHOUSE_LOCATION); - return new HadoopCatalog(name, conf, warehouseLocation); + return CatalogLoader.hadoop(name, warehouseLocation); default: throw new UnsupportedOperationException("Unknown catalog type: " + catalogType); @@ -121,12 +105,17 @@ public List supportedProperties() { @Override public Catalog createCatalog(String name, Map properties) { - org.apache.iceberg.catalog.Catalog catalog = buildIcebergCatalog(name, properties); + Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration()); + return createCatalog(name, properties, hadoopConf); + } + + protected Catalog createCatalog(String name, Map properties, Configuration hadoopConf) { + CatalogLoader catalogLoader = createCatalogLoader(name, properties); String defaultDatabase = properties.getOrDefault(DEFAULT_DATABASE, "default"); String[] baseNamespace = properties.containsKey(BASE_NAMESPACE) ? Splitter.on('.').splitToList(properties.get(BASE_NAMESPACE)).toArray(new String[0]) : new String[0]; boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault("cache-enabled", "true")); - return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalog, cacheEnabled); + return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, hadoopConf, cacheEnabled); } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java b/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java new file mode 100644 index 000000000000..540cb5b77990 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink; + +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopTables; + +/** + * Serializable loader to load an Iceberg {@link Table}. + * Flink needs to get {@link Table} objects in the cluster (for example, to get splits), not just on the client side. + * So we need an Iceberg table loader to get the {@link Table} object. + */ +public interface TableLoader extends Closeable, Serializable { + + void open(Configuration configuration); + + Table loadTable(); + + static TableLoader fromCatalog(CatalogLoader catalogLoader, TableIdentifier identifier) { + return new CatalogTableLoader(catalogLoader, identifier); + } + + static TableLoader fromHadoopTable(String location) { + return new HadoopTableLoader(location); + } + + class HadoopTableLoader implements TableLoader { + + private static final long serialVersionUID = 1L; + + private final String location; + private transient HadoopTables tables; + + private HadoopTableLoader(String location) { + this.location = location; + } + + @Override + public void open(Configuration configuration) { + tables = new HadoopTables(configuration); + } + + @Override + public Table loadTable() { + return tables.load(location); + } + + @Override + public void close() { + } + } + + class CatalogTableLoader implements TableLoader { + + private static final long serialVersionUID = 1L; + + private final CatalogLoader catalogLoader; + private final String identifier; + + private transient Catalog catalog; + + private CatalogTableLoader(CatalogLoader catalogLoader, TableIdentifier tableIdentifier) { + this.catalogLoader = catalogLoader; + this.identifier = tableIdentifier.toString(); + } + + @Override + public void open(Configuration configuration) { + catalog = catalogLoader.loadCatalog(configuration); + } + + @Override + public Table loadTable() { + return catalog.loadTable(TableIdentifier.parse(identifier)); + } + + @Override + public void close() throws IOException { + if (catalog instanceof Closeable) { + ((Closeable) catalog).close(); + } + } + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java index 387b927ac875..ab2230e364d7 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java +++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java @@ -31,6 +31,7 @@ import org.apache.flink.types.Row; import org.apache.flink.util.ArrayUtils; import org.apache.flink.util.CloseableIterator; +import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -103,10 +104,9 @@ public FlinkCatalogTestBase(String catalogName, String[] baseNamespace) { FlinkCatalogFactory factory = new FlinkCatalogFactory() { @Override - protected Catalog buildIcebergCatalog(String name, Map options) { - // Flink hadoop configuration depends on system env, it is quiet hard to set from testing. So directly pass - // correct hadoop configuration. - return super.buildIcebergCatalog(name, options, hiveConf); + protected org.apache.flink.table.catalog.Catalog createCatalog( + String name, Map properties, Configuration hadoopConf) { + return super.createCatalog(name, properties, hiveConf); } }; tEnv.registerCatalog( From de62dac2cae6c391f4efa44023cc3f4120105dc4 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Mon, 17 Aug 2020 16:18:35 +0800 Subject: [PATCH 2/3] Add util clusterHadoopConf --- .../java/org/apache/iceberg/flink/FlinkCatalogFactory.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java index ba0cf76cb61b..ecf8923ac9bb 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java @@ -105,8 +105,7 @@ public List supportedProperties() { @Override public Catalog createCatalog(String name, Map properties) { - Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration()); - return createCatalog(name, properties, hadoopConf); + return createCatalog(name, properties, clusterHadoopConf()); } protected Catalog createCatalog(String name, Map properties, Configuration hadoopConf) { @@ -118,4 +117,8 @@ protected Catalog createCatalog(String name, Map properties, Con boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault("cache-enabled", "true")); return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, hadoopConf, cacheEnabled); } + + public static Configuration clusterHadoopConf() { + return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration()); + } } From 103997e68910e767173f86edaf5ebb6c9edafc45 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 18 Aug 2020 13:27:57 +0800 Subject: [PATCH 3/3] Address comments --- .../org/apache/iceberg/flink/FlinkCatalog.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 0029f89500f4..4182964a8679 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -74,10 +74,10 @@ public class FlinkCatalog extends AbstractCatalog { private final CatalogLoader catalogLoader; private final Configuration hadoopConf; - private final Catalog originalCatalog; private final Catalog icebergCatalog; private final String[] baseNamespace; private final SupportsNamespaces asNamespaceCatalog; + private final Closeable closeable; public FlinkCatalog( String catalogName, @@ -88,15 +88,13 @@ public FlinkCatalog( boolean cacheEnabled) { super(catalogName, defaultDatabase); this.hadoopConf = hadoopConf; - this.originalCatalog = catalogLoader.loadCatalog(hadoopConf); this.catalogLoader = catalogLoader; - this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog; this.baseNamespace = baseNamespace; - if (originalCatalog instanceof SupportsNamespaces) { - asNamespaceCatalog = (SupportsNamespaces) originalCatalog; - } else { - asNamespaceCatalog = null; - } + + Catalog originalCatalog = catalogLoader.loadCatalog(hadoopConf); + icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog; + asNamespaceCatalog = originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null; + closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null; } @Override @@ -105,9 +103,9 @@ public void open() throws CatalogException { @Override public void close() throws CatalogException { - if (originalCatalog instanceof Closeable) { + if (closeable != null) { try { - ((Closeable) originalCatalog).close(); + closeable.close(); } catch (IOException e) { throw new CatalogException(e); }