From 4c51a347769516ca72ad9843937a178dcf4f2912 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Thu, 9 Mar 2023 15:41:19 -0800 Subject: [PATCH 1/5] Core: Allow customizing FileIO in REST catalog. --- .../iceberg/rest/RESTSessionCatalog.java | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index fb6660f3a2ec..27cfc29c0291 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -98,6 +98,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog OAuth2Properties.SAML1_TOKEN_TYPE); private final Function, RESTClient> clientBuilder; + private Function, FileIO> ioBuilder; private Cache sessions = null; private AuthSession catalogAuth = null; private boolean keepTokenRefreshed = true; @@ -187,10 +188,7 @@ public void initialize(String name, Map unresolved) { client, tokenRefreshExecutor(), token, expiresAtMillis(mergedProps), catalogAuth); } - String ioImpl = mergedProps.get(CatalogProperties.FILE_IO_IMPL); - this.io = - CatalogUtil.loadFileIO( - ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), mergedProps, conf); + this.io = newFileIO(mergedProps); this.snapshotMode = SnapshotMode.valueOf( @@ -205,6 +203,11 @@ public void initialize(String name, Map unresolved) { super.initialize(name, mergedProps); } + public void setFileIOBuilder(Function, FileIO> ioBuilder) { + Preconditions.checkState(null == io, "Cannot set IO builder after calling initialize"); + this.ioBuilder = ioBuilder; + } + private AuthSession session(SessionContext context) { AuthSession session = sessions.get( @@ -762,16 +765,25 @@ private String fullTableName(TableIdentifier ident) { return String.format("%s.%s", name(), ident); } + private FileIO newFileIO(Map properties) { + if (null != ioBuilder) { + return ioBuilder.apply(properties); + } else { + String ioImpl = properties.get(CatalogProperties.FILE_IO_IMPL); + return + CatalogUtil.loadFileIO( + ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), properties, conf); + } + } + private FileIO tableFileIO(Map config) { if (config.isEmpty()) { return io; // reuse client and io since config is the same } Map fullConf = RESTUtil.merge(properties(), config); - String ioImpl = fullConf.get(CatalogProperties.FILE_IO_IMPL); - return CatalogUtil.loadFileIO( - ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), fullConf, this.conf); + return newFileIO(fullConf); } private AuthSession tableSession(Map tableConf, AuthSession parent) { From dd65c2db8ef5b7d781e4378c1b34ff1ec87f592b Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 10 Mar 2023 09:56:12 -0800 Subject: [PATCH 2/5] Apply spotless. --- .../java/org/apache/iceberg/rest/RESTSessionCatalog.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 27cfc29c0291..624c4e4a05c6 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -770,9 +770,8 @@ private FileIO newFileIO(Map properties) { return ioBuilder.apply(properties); } else { String ioImpl = properties.get(CatalogProperties.FILE_IO_IMPL); - return - CatalogUtil.loadFileIO( - ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), properties, conf); + return CatalogUtil.loadFileIO( + ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), properties, conf); } } From 33ad0bea369dfbf42c354b977383d41550885ba9 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 10 Mar 2023 10:05:20 -0800 Subject: [PATCH 3/5] Core: Allow customizing FileIO in JDBC catalog. --- .../org/apache/iceberg/jdbc/JdbcCatalog.java | 21 ++++++++++++++----- .../iceberg/rest/RESTSessionCatalog.java | 6 +++--- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java index 9ddaf2c3119a..58f92a8b21ac 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; @@ -68,12 +69,13 @@ public class JdbcCatalog extends BaseMetastoreCatalog private static final Logger LOG = LoggerFactory.getLogger(JdbcCatalog.class); private static final Joiner SLASH = Joiner.on("/"); - private FileIO io; + private FileIO io = null; private String catalogName = "jdbc"; private String warehouseLocation; private Object conf; private JdbcClientPool connections; private Map catalogProperties; + private Function, FileIO> ioBuilder; public JdbcCatalog() {} @@ -95,10 +97,14 @@ public void initialize(String name, Map properties) { this.catalogName = name; } - String fileIOImpl = - properties.getOrDefault( - CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.hadoop.HadoopFileIO"); - this.io = CatalogUtil.loadFileIO(fileIOImpl, properties, conf); + if (null != ioBuilder) { + this.io = ioBuilder.apply(properties); + } else { + String ioImpl = + properties.getOrDefault( + CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.hadoop.HadoopFileIO"); + this.io = CatalogUtil.loadFileIO(ioImpl, properties, conf); + } try { LOG.debug("Connecting to JDBC database {}", properties.get(CatalogProperties.URI)); @@ -116,6 +122,11 @@ public void initialize(String name, Map properties) { } } + public void setFileIOBuilder(Function, FileIO> ioBuilder) { + Preconditions.checkState(null == io, "Cannot set IO builder after calling initialize"); + this.ioBuilder = ioBuilder; + } + private void initializeCatalogTables() throws InterruptedException, SQLException { LOG.trace("Creating database tables (if missing) to store iceberg catalog"); connections.run( diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 624c4e4a05c6..8fb2d7a81ad7 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -769,9 +769,9 @@ private FileIO newFileIO(Map properties) { if (null != ioBuilder) { return ioBuilder.apply(properties); } else { - String ioImpl = properties.get(CatalogProperties.FILE_IO_IMPL); - return CatalogUtil.loadFileIO( - ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), properties, conf); + String ioImpl = + properties.getOrDefault(CatalogProperties.FILE_IO_IMPL, ResolvingFileIO.class.getName()); + return CatalogUtil.loadFileIO(ioImpl, properties, conf); } } From 95b2790da7e5df990efea9fd9549bd18124ed42a Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 10 Mar 2023 10:35:42 -0800 Subject: [PATCH 4/5] Fix checkstyle. --- core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java | 4 ++-- .../main/java/org/apache/iceberg/rest/RESTSessionCatalog.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java index 58f92a8b21ac..19ea889cbd75 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java @@ -122,9 +122,9 @@ public void initialize(String name, Map properties) { } } - public void setFileIOBuilder(Function, FileIO> ioBuilder) { + public void setFileIOBuilder(Function, FileIO> newIOBuilder) { Preconditions.checkState(null == io, "Cannot set IO builder after calling initialize"); - this.ioBuilder = ioBuilder; + this.ioBuilder = newIOBuilder; } private void initializeCatalogTables() throws InterruptedException, SQLException { diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 8fb2d7a81ad7..49fd27b9b060 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -203,9 +203,9 @@ public void initialize(String name, Map unresolved) { super.initialize(name, mergedProps); } - public void setFileIOBuilder(Function, FileIO> ioBuilder) { + public void setFileIOBuilder(Function, FileIO> newIOBuilder) { Preconditions.checkState(null == io, "Cannot set IO builder after calling initialize"); - this.ioBuilder = ioBuilder; + this.ioBuilder = newIOBuilder; } private AuthSession session(SessionContext context) { From 36552cff0d2f10c6510494b155ea97dec61325b5 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 10 Mar 2023 10:36:53 -0800 Subject: [PATCH 5/5] Initialize with null. --- core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java | 2 +- .../main/java/org/apache/iceberg/rest/RESTSessionCatalog.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java index 19ea889cbd75..07018dd83ee4 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java @@ -75,7 +75,7 @@ public class JdbcCatalog extends BaseMetastoreCatalog private Object conf; private JdbcClientPool connections; private Map catalogProperties; - private Function, FileIO> ioBuilder; + private Function, FileIO> ioBuilder = null; public JdbcCatalog() {} diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 49fd27b9b060..25c0d2d9b911 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -98,7 +98,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog OAuth2Properties.SAML1_TOKEN_TYPE); private final Function, RESTClient> clientBuilder; - private Function, FileIO> ioBuilder; + private Function, FileIO> ioBuilder = null; private Cache sessions = null; private AuthSession catalogAuth = null; private boolean keepTokenRefreshed = true;