diff --git a/api/src/main/java/org/apache/iceberg/io/FileIO.java b/api/src/main/java/org/apache/iceberg/io/FileIO.java
index e466e2c43fdb..770d840e70df 100644
--- a/api/src/main/java/org/apache/iceberg/io/FileIO.java
+++ b/api/src/main/java/org/apache/iceberg/io/FileIO.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.io;
 
 import java.io.Serializable;
+import java.util.Map;
 
 /**
  * Pluggable module for reading, writing, and deleting files.
@@ -58,4 +59,11 @@ default void deleteFile(InputFile file) {
   default void deleteFile(OutputFile file) {
     deleteFile(file.location());
   }
+
+  /**
+   * Initialize FileIO from catalog properties.
+   * @param properties catalog properties
+   */
+  default void initialize(Map<String, String> properties) {
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java
index fad7cb402484..e27a507fef8b 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java
@@ -25,4 +25,10 @@ private CatalogProperties() {
   }
 
   public static final String CATALOG_IMPL = "catalog-impl";
+  public static final String FILE_IO_IMPL = "io-impl";
+  public static final String WAREHOUSE_LOCATION = "warehouse";
+
+  public static final String HIVE_URI = "uri";
+  public static final String HIVE_CLIENT_POOL_SIZE = "clients";
+  public static final int HIVE_CLIENT_POOL_SIZE_DEFAULT = 2;
 }
diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
index 2d96d49e4f49..ee9b374d5887 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
@@ -168,4 +168,49 @@ public static Catalog loadCatalog(
     catalog.initialize(catalogName, properties);
     return catalog;
   }
+
+  /**
+   * Load a custom {@link FileIO} implementation.
+   * <p>
+   * The implementation must have a no-arg constructor.
+   * If the class implements {@link Configurable},
+   * a Hadoop config will be passed using {@link Configurable#setConf(Configuration)}.
+   * {@link FileIO#initialize(Map)} is called to complete the initialization.
+   *
+   * @param impl full class name of a custom FileIO implementation
+   * @param properties catalog properties
+   * @param hadoopConf hadoop configuration
+   * @return initialized FileIO object
+   * @throws IllegalArgumentException if the class cannot be found,
+   *         the no-arg constructor is missing, or
+   *         the loaded class cannot be cast to FileIO
+   */
+  public static FileIO loadFileIO(
+      String impl,
+      Map<String, String> properties,
+      Configuration hadoopConf) {
+    LOG.info("Loading custom FileIO implementation: {}", impl);
+    DynConstructors.Ctor<FileIO> ctor;
+    try {
+      ctor = DynConstructors.builder(FileIO.class).impl(impl).buildChecked();
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot initialize FileIO, missing no-arg constructor: %s", impl), e);
+    }
+
+    FileIO fileIO;
+    try {
+      fileIO = ctor.newInstance();
+    } catch (ClassCastException e) {
+      throw new IllegalArgumentException(
+          String.format("Cannot initialize FileIO, %s does not implement FileIO.", impl), e);
+    }
+
+    if (fileIO instanceof Configurable) {
+      ((Configurable) fileIO).setConf(hadoopConf);
+    }
+
+    fileIO.initialize(properties);
+    return fileIO;
+  }
 }
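Taken together, the hunks above define the extension point: a `FileIO` implementation needs a no-arg constructor, optionally receives a Hadoop `Configuration` through `Configurable`, and then gets the full catalog property map through `initialize()`. A minimal sketch of such an implementation follows; the class name, the `custom.root-path` property key, and the stubbed file operations are all hypothetical, not part of this change.

```java
import java.util.Map;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;

// Hypothetical custom FileIO; only the lifecycle required by
// CatalogUtil.loadFileIO is shown, the I/O methods are stubbed.
public class CustomFileIO implements FileIO {
  private String rootPath;

  public CustomFileIO() {
    // no-arg constructor required by CatalogUtil.loadFileIO
  }

  @Override
  public void initialize(Map<String, String> properties) {
    // receives the full catalog property map, including "io-impl" itself;
    // "custom.root-path" is an illustrative key, not an Iceberg property
    this.rootPath = properties.get("custom.root-path");
  }

  @Override
  public InputFile newInputFile(String path) {
    throw new UnsupportedOperationException("stubbed for brevity: " + path);
  }

  @Override
  public OutputFile newOutputFile(String path) {
    throw new UnsupportedOperationException("stubbed for brevity: " + path);
  }

  @Override
  public void deleteFile(String path) {
    throw new UnsupportedOperationException("stubbed for brevity: " + path);
  }
}
```

Loading it then reduces to `CatalogUtil.loadFileIO("com.example.CustomFileIO", properties, hadoopConf)`, which is the path both catalogs below take when the `io-impl` property is set.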
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
index 73a28981216e..79cd3efee009 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableMetadata;
@@ -45,11 +46,13 @@
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 
 /**
@@ -76,6 +79,7 @@ public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, Su
   private final Configuration conf;
   private final String warehouseLocation;
   private final FileSystem fs;
+  private final FileIO fileIO;
 
   /**
    * The constructor of the HadoopCatalog. It uses the passed location as its warehouse directory.
@@ -85,6 +89,18 @@ public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, Su
    * @param warehouseLocation The location used as warehouse directory
    */
   public HadoopCatalog(String name, Configuration conf, String warehouseLocation) {
+    this(name, conf, warehouseLocation, Maps.newHashMap());
+  }
+
+  /**
+   * The all-arg constructor of the HadoopCatalog.
+   *
+   * @param name The catalog name
+   * @param conf The Hadoop configuration
+   * @param warehouseLocation The location used as warehouse directory
+   * @param properties catalog properties
+   */
+  public HadoopCatalog(String name, Configuration conf, String warehouseLocation, Map<String, String> properties) {
     Preconditions.checkArgument(warehouseLocation != null && !warehouseLocation.equals(""),
         "no location provided for warehouse");
 
@@ -92,6 +108,9 @@ public HadoopCatalog(String name, Configuration conf, String warehouseLocation)
     this.conf = conf;
     this.warehouseLocation = warehouseLocation.replaceAll("/*$", "");
     this.fs = Util.getFs(new Path(warehouseLocation), conf);
+
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    this.fileIO = fileIOImpl == null ? new HadoopFileIO(conf) : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);
   }
 
   /**
@@ -163,7 +182,7 @@ protected boolean isValidIdentifier(TableIdentifier identifier) {
 
   @Override
   protected TableOperations newTableOps(TableIdentifier identifier) {
-    return new HadoopTableOperations(new Path(defaultWarehouseLocation(identifier)), conf);
+    return new HadoopTableOperations(new Path(defaultWarehouseLocation(identifier)), fileIO, conf);
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
index bd5ebeb0baff..dcbd57f1f158 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
@@ -62,15 +62,16 @@ public class HadoopTableOperations implements TableOperations {
 
   private final Configuration conf;
   private final Path location;
-  private HadoopFileIO defaultFileIo = null;
+  private final FileIO fileIO;
 
   private volatile TableMetadata currentMetadata = null;
   private volatile Integer version = null;
   private volatile boolean shouldRefresh = true;
 
-  protected HadoopTableOperations(Path location, Configuration conf) {
+  protected HadoopTableOperations(Path location, FileIO fileIO, Configuration conf) {
     this.conf = conf;
     this.location = location;
+    this.fileIO = fileIO;
   }
 
   @Override
@@ -173,10 +174,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {
 
   @Override
   public FileIO io() {
-    if (defaultFileIo == null) {
-      defaultFileIo = new HadoopFileIO(conf);
-    }
-    return defaultFileIo;
+    return fileIO;
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
index d01939cada8d..645efc964778 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
@@ -196,7 +196,7 @@ TableOperations newTableOps(String location) {
     if (location.contains(METADATA_JSON)) {
       return new StaticTableOperations(location, new HadoopFileIO(conf));
     } else {
-      return new HadoopTableOperations(new Path(location), conf);
+      return new HadoopTableOperations(new Path(location), new HadoopFileIO(conf), conf);
     }
   }
 
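With that wiring, `HadoopTableOperations` no longer constructs its own `HadoopFileIO` lazily; the catalog resolves the `FileIO` once and injects it. A sketch of how a caller would pick up the custom implementation through the new constructor; the class name is the hypothetical one from the sketch above and the warehouse path is illustrative.

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class HadoopCatalogExample {
  public static void main(String[] args) {
    Map<String, String> properties = Maps.newHashMap();
    // hypothetical implementation class; when absent, HadoopFileIO is used
    properties.put(CatalogProperties.FILE_IO_IMPL, "com.example.CustomFileIO");

    HadoopCatalog catalog = new HadoopCatalog(
        "prod", new Configuration(), "hdfs://nn:8020/warehouse", properties);
    // every HadoopTableOperations created by this catalog now shares
    // the FileIO resolved once in the constructor
  }
}
```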
diff --git a/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java b/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java
index 67729d19a437..84ab3d4759ca 100644
--- a/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java
+++ b/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java
@@ -27,6 +27,10 @@
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -83,6 +87,41 @@ public void loadCustomCatalog_NotImplementCatalog() {
         () -> CatalogUtil.loadCatalog(TestCatalogNoInterface.class.getName(), name, options, hadoopConf));
   }
 
+  @Test
+  public void loadCustomFileIO_noArg() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("key", "val");
+    FileIO fileIO = CatalogUtil.loadFileIO(TestFileIONoArg.class.getName(), properties, null);
+    Assert.assertTrue(fileIO instanceof TestFileIONoArg);
+    Assert.assertEquals(properties, ((TestFileIONoArg) fileIO).map);
+  }
+
+  @Test
+  public void loadCustomFileIO_configurable() {
+    Configuration configuration = new Configuration();
+    configuration.set("key", "val");
+    FileIO fileIO = CatalogUtil.loadFileIO(TestFileIOConfigurable.class.getName(), Maps.newHashMap(), configuration);
+    Assert.assertTrue(fileIO instanceof TestFileIOConfigurable);
+    Assert.assertEquals(configuration, ((TestFileIOConfigurable) fileIO).configuration);
+  }
+
+  @Test
+  public void loadCustomFileIO_badArg() {
+    AssertHelpers.assertThrows("cannot find constructor",
+        IllegalArgumentException.class,
+        "missing no-arg constructor",
+        () -> CatalogUtil.loadFileIO(TestFileIOBadArg.class.getName(), Maps.newHashMap(), null));
+  }
+
+  @Test
+  public void loadCustomFileIO_badClass() {
+    AssertHelpers.assertThrows("cannot cast",
+        IllegalArgumentException.class,
+        "does not implement FileIO",
+        () -> CatalogUtil.loadFileIO(TestFileIONotImpl.class.getName(), Maps.newHashMap(), null));
+  }
+
   public static class TestCatalog extends BaseMetastoreCatalog {
 
     private String catalogName;
 
@@ -213,4 +252,106 @@ public static class TestCatalogNoInterface {
     public TestCatalogNoInterface() {
     }
   }
+
+  public static class TestFileIOConfigurable implements FileIO, Configurable {
+
+    private Configuration configuration;
+
+    public TestFileIOConfigurable() {
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+      this.configuration = conf;
+    }
+
+    @Override
+    public Configuration getConf() {
+      return configuration;
+    }
+
+    @Override
+    public InputFile newInputFile(String path) {
+      return null;
+    }
+
+    @Override
+    public OutputFile newOutputFile(String path) {
+      return null;
+    }
+
+    @Override
+    public void deleteFile(String path) {
+
+    }
+
+    public Configuration getConfiguration() {
+      return configuration;
+    }
+  }
+
+  public static class TestFileIONoArg implements FileIO {
+
+    private Map<String, String> map;
+
+    public TestFileIONoArg() {
+    }
+
+    @Override
+    public InputFile newInputFile(String path) {
+      return null;
+    }
+
+    @Override
+    public OutputFile newOutputFile(String path) {
+      return null;
+    }
+
+    @Override
+    public void deleteFile(String path) {
+
+    }
+
+    public Map<String, String> getMap() {
+      return map;
+    }
+
+    @Override
+    public void initialize(Map<String, String> properties) {
+      map = properties;
+    }
+  }
+
+  public static class TestFileIOBadArg implements FileIO {
+
+    private final String arg;
+
+    public TestFileIOBadArg(String arg) {
+      this.arg = arg;
+    }
+
+    @Override
+    public InputFile newInputFile(String path) {
+      return null;
+    }
+
+    @Override
+    public OutputFile newOutputFile(String path) {
+      return null;
+    }
+
+    @Override
+    public void deleteFile(String path) {
+
+    }
+
+    public String getArg() {
+      return arg;
+    }
+  }
+
+  public static class TestFileIONotImpl {
+    public TestFileIONotImpl() {
+    }
+  }
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java b/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
index 982b3da44c16..a793767155d5 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
@@ -22,6 +22,7 @@
 import java.io.Serializable;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.hadoop.HadoopCatalog;
@@ -45,12 +46,12 @@ public interface CatalogLoader extends Serializable {
    */
   Catalog loadCatalog();
 
-  static CatalogLoader hadoop(String name, Configuration hadoopConf, String warehouseLocation) {
-    return new HadoopCatalogLoader(name, hadoopConf, warehouseLocation);
+  static CatalogLoader hadoop(String name, Configuration hadoopConf, Map<String, String> properties) {
+    return new HadoopCatalogLoader(name, hadoopConf, properties);
   }
 
-  static CatalogLoader hive(String name, Configuration hadoopConf, String uri, String warehouse, int clientPoolSize) {
-    return new HiveCatalogLoader(name, hadoopConf, uri, warehouse, clientPoolSize);
+  static CatalogLoader hive(String name, Configuration hadoopConf, Map<String, String> properties) {
+    return new HiveCatalogLoader(name, hadoopConf, properties);
   }
 
   static CatalogLoader custom(String name, Map<String, String> properties, Configuration hadoopConf, String impl) {
@@ -61,16 +62,21 @@ class HadoopCatalogLoader implements CatalogLoader {
     private final String catalogName;
     private final SerializableConfiguration hadoopConf;
     private final String warehouseLocation;
+    private final Map<String, String> properties;
 
-    private HadoopCatalogLoader(String catalogName, Configuration conf, String warehouseLocation) {
+    private HadoopCatalogLoader(
+        String catalogName,
+        Configuration conf,
+        Map<String, String> properties) {
       this.catalogName = catalogName;
       this.hadoopConf = new SerializableConfiguration(conf);
-      this.warehouseLocation = warehouseLocation;
+      this.warehouseLocation = properties.get(CatalogProperties.WAREHOUSE_LOCATION);
+      this.properties = Maps.newHashMap(properties);
     }
 
     @Override
     public Catalog loadCatalog() {
-      return new HadoopCatalog(catalogName, hadoopConf.get(), warehouseLocation);
+      return new HadoopCatalog(catalogName, hadoopConf.get(), warehouseLocation, properties);
     }
 
     @Override
@@ -88,19 +94,22 @@ class HiveCatalogLoader implements CatalogLoader {
     private final String uri;
     private final String warehouse;
    private final int clientPoolSize;
+    private final Map<String, String> properties;
 
-    private HiveCatalogLoader(String catalogName, Configuration conf, String uri, String warehouse,
-                              int clientPoolSize) {
+    private HiveCatalogLoader(String catalogName, Configuration conf, Map<String, String> properties) {
      this.catalogName = catalogName;
       this.hadoopConf = new SerializableConfiguration(conf);
-      this.uri = uri;
-      this.warehouse = warehouse;
-      this.clientPoolSize = clientPoolSize;
+      this.uri = properties.get(CatalogProperties.HIVE_URI);
+      this.warehouse = properties.get(CatalogProperties.WAREHOUSE_LOCATION);
+      this.clientPoolSize = properties.containsKey(CatalogProperties.HIVE_CLIENT_POOL_SIZE) ?
+          Integer.parseInt(properties.get(CatalogProperties.HIVE_CLIENT_POOL_SIZE)) :
+          CatalogProperties.HIVE_CLIENT_POOL_SIZE_DEFAULT;
+      this.properties = Maps.newHashMap(properties);
     }
 
     @Override
     public Catalog loadCatalog() {
-      return new HiveCatalog(catalogName, uri, warehouse, clientPoolSize, hadoopConf.get());
+      return new HiveCatalog(catalogName, uri, warehouse, clientPoolSize, hadoopConf.get(), properties);
     }
 
     @Override
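Call sites now pass one map instead of positional arguments. A minimal sketch under the assumption that `hadoopConf` is an existing `Configuration`; the URI and pool-size values are illustrative.

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class CatalogLoaderExample {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();

    Map<String, String> props = Maps.newHashMap();
    props.put(CatalogProperties.WAREHOUSE_LOCATION, "hdfs://nn:8020/warehouse");
    props.put(CatalogProperties.HIVE_URI, "thrift://metastore:9083");
    props.put(CatalogProperties.HIVE_CLIENT_POOL_SIZE, "4");

    // each loader extracts the keys it understands and keeps the whole map,
    // so FILE_IO_IMPL travels with it to the catalog constructor
    CatalogLoader hiveLoader = CatalogLoader.hive("prod_hive", hadoopConf, props);
    CatalogLoader hadoopLoader = CatalogLoader.hadoop("prod_hadoop", hadoopConf, props);
  }
}
```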
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index acaece5f19fb..483fa9bdf53f 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -63,11 +63,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
   public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
   public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
 
-  public static final String HIVE_URI = "uri";
-  public static final String HIVE_CLIENT_POOL_SIZE = "clients";
   public static final String HIVE_CONF_DIR = "hive-conf-dir";
-  public static final String WAREHOUSE_LOCATION = "warehouse";
-
   public static final String DEFAULT_DATABASE = "default-database";
   public static final String BASE_NAMESPACE = "base-namespace";
 
@@ -90,16 +86,12 @@ protected CatalogLoader createCatalogLoader(String name, Map<String, String> pro
       case ICEBERG_CATALOG_TYPE_HIVE:
         // The values of properties 'uri', 'warehouse', 'hive-conf-dir' are allowed to be null, in that case it will
         // fallback to parse those values from hadoop configuration which is loaded from classpath.
-        String uri = properties.get(HIVE_URI);
-        String warehouse = properties.get(WAREHOUSE_LOCATION);
-        int clientPoolSize = Integer.parseInt(properties.getOrDefault(HIVE_CLIENT_POOL_SIZE, "2"));
         String hiveConfDir = properties.get(HIVE_CONF_DIR);
         Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir);
-        return CatalogLoader.hive(name, newHadoopConf, uri, warehouse, clientPoolSize);
+        return CatalogLoader.hive(name, newHadoopConf, properties);
 
       case ICEBERG_CATALOG_TYPE_HADOOP:
-        String warehouseLocation = properties.get(WAREHOUSE_LOCATION);
-        return CatalogLoader.hadoop(name, hadoopConf, warehouseLocation);
+        return CatalogLoader.hadoop(name, hadoopConf, properties);
 
       default:
         throw new UnsupportedOperationException("Unknown catalog type: " + catalogType);
@@ -118,12 +110,13 @@ public Map<String, String> requiredContext() {
 
   public List<String> supportedProperties() {
     List<String> properties = Lists.newArrayList();
     properties.add(ICEBERG_CATALOG_TYPE);
-    properties.add(HIVE_URI);
-    properties.add(HIVE_CLIENT_POOL_SIZE);
     properties.add(HIVE_CONF_DIR);
-    properties.add(WAREHOUSE_LOCATION);
     properties.add(DEFAULT_DATABASE);
     properties.add(BASE_NAMESPACE);
+    properties.add(CatalogProperties.FILE_IO_IMPL);
+    properties.add(CatalogProperties.WAREHOUSE_LOCATION);
+    properties.add(CatalogProperties.HIVE_URI);
+    properties.add(CatalogProperties.HIVE_CLIENT_POOL_SIZE);
     return properties;
   }
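From SQL, these supported properties surface directly as `CREATE CATALOG` options. A sketch, assuming the `ICEBERG_CATALOG_TYPE` key resolves to `'catalog-type'` (its value is not shown in this diff) and a Flink planner is on the classpath; endpoint values are illustrative.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkSqlCatalogExample {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

    // 'uri', 'warehouse', 'clients', and 'io-impl' are the constant values
    // moved into CatalogProperties by this change
    tEnv.executeSql(
        "CREATE CATALOG hive_catalog WITH (" +
            "'type'='iceberg'," +
            "'catalog-type'='hive'," +
            "'uri'='thrift://metastore:9083'," +
            "'warehouse'='hdfs://nn:8020/warehouse'," +
            "'clients'='4'," +
            "'io-impl'='com.example.CustomFileIO')"); // hypothetical FileIO class
  }
}
```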
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
index 3a5d8279db28..9ec4dcdf5af5 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
@@ -23,6 +23,7 @@
 import java.util.Map;
 import org.apache.flink.util.ArrayUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -104,11 +105,11 @@ public FlinkCatalogTestBase(String catalogName, String[] baseNamespace) {
     }
     if (isHadoopCatalog) {
       config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop");
-      config.put(FlinkCatalogFactory.WAREHOUSE_LOCATION, "file://" + hadoopWarehouse.getRoot());
+      config.put(CatalogProperties.WAREHOUSE_LOCATION, "file://" + hadoopWarehouse.getRoot());
     } else {
       config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
-      config.put(FlinkCatalogFactory.WAREHOUSE_LOCATION, "file://" + hiveWarehouse.getRoot());
-      config.put(FlinkCatalogFactory.HIVE_URI, getURI(hiveConf));
+      config.put(CatalogProperties.WAREHOUSE_LOCATION, "file://" + hiveWarehouse.getRoot());
+      config.put(CatalogProperties.HIVE_URI, getURI(hiveConf));
     }
 
     this.flinkDatabase = catalogName + "." + DATABASE;
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java b/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
index ce3d829a9142..2cdb34bc8658 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
@@ -25,12 +25,15 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.util.Map;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -62,13 +65,15 @@ public static void dropWarehouse() {
 
   @Test
   public void testHadoopCatalogLoader() throws IOException, ClassNotFoundException {
-    CatalogLoader loader = CatalogLoader.hadoop("my_catalog", hiveConf, "file:" + warehouse);
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(CatalogProperties.WAREHOUSE_LOCATION, "file:" + warehouse);
+    CatalogLoader loader = CatalogLoader.hadoop("my_catalog", hiveConf, properties);
     validateCatalogLoader(loader);
   }
 
   @Test
   public void testHiveCatalogLoader() throws IOException, ClassNotFoundException {
-    CatalogLoader loader = CatalogLoader.hive("my_catalog", hiveConf, null, null, 2);
+    CatalogLoader loader = CatalogLoader.hive("my_catalog", hiveConf, Maps.newHashMap());
     validateCatalogLoader(loader);
   }
 
@@ -81,7 +86,7 @@ public void testHadoopTableLoader() throws IOException, ClassNotFoundException {
 
   @Test
   public void testHiveCatalogTableLoader() throws IOException, ClassNotFoundException {
-    CatalogLoader catalogLoader = CatalogLoader.hive("my_catalog", hiveConf, null, null, 2);
+    CatalogLoader catalogLoader = CatalogLoader.hive("my_catalog", hiveConf, Maps.newHashMap());
     validateTableLoader(TableLoader.fromCatalog(catalogLoader, IDENTIFIER));
   }
 
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
index 5a0f2a6cd5b4..d47e8a7659ba 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -43,10 +44,10 @@ public void testCreateCatalogWithWarehouseLocation() throws IOException {
     Map<String, String> props = Maps.newHashMap();
     props.put("type", "iceberg");
     props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
-    props.put(FlinkCatalogFactory.HIVE_URI, FlinkCatalogTestBase.getURI(hiveConf));
+    props.put(CatalogProperties.HIVE_URI, FlinkCatalogTestBase.getURI(hiveConf));
 
     File warehouseDir = tempFolder.newFolder();
-    props.put(FlinkCatalogFactory.WAREHOUSE_LOCATION, "file://" + warehouseDir.getAbsolutePath());
+    props.put(CatalogProperties.WAREHOUSE_LOCATION, "file://" + warehouseDir.getAbsolutePath());
 
     checkSQLQuery(props, warehouseDir);
   }
@@ -69,7 +70,7 @@ public void testCreateCatalogWithHiveConfDir() throws IOException {
     Map<String, String> props = Maps.newHashMap();
     props.put("type", "iceberg");
     props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
-    props.put(FlinkCatalogFactory.HIVE_URI, FlinkCatalogTestBase.getURI(hiveConf));
+    props.put(CatalogProperties.HIVE_URI, FlinkCatalogTestBase.getURI(hiveConf));
     // Set the 'hive-conf-dir' instead of 'warehouse'
     props.put(FlinkCatalogFactory.HIVE_CONF_DIR, hiveConfDir.getAbsolutePath());
 
diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java
index c24a887cd59b..5f8680979777 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java
@@ -24,6 +24,7 @@
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -105,12 +106,12 @@ protected void dropTable(String name) {
   protected StructLikeSet rowSet(String name, Table testTable, String... columns) throws IOException {
     Schema projected = testTable.schema().select(columns);
     RowType rowType = FlinkSchemaUtil.convert(projected);
-    CatalogLoader hiveCatalogLoader = CatalogLoader.hive(catalog.name(),
-        hiveConf,
-        hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname),
-        hiveConf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
-        hiveConf.getInt("iceberg.hive.client-pool-size", 5)
-    );
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(CatalogProperties.WAREHOUSE_LOCATION, hiveConf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname));
+    properties.put(CatalogProperties.HIVE_URI, hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
+    properties.put(CatalogProperties.HIVE_CLIENT_POOL_SIZE,
+        Integer.toString(hiveConf.getInt("iceberg.hive.client-pool-size", 5)));
+    CatalogLoader hiveCatalogLoader = CatalogLoader.hive(catalog.name(), hiveConf, properties);
     FlinkInputFormat inputFormat = FlinkSource.forRowData()
         .tableLoader(TableLoader.fromCatalog(hiveCatalogLoader, TableIdentifier.of("default", name)))
         .project(FlinkSchemaUtil.toSchema(rowType)).buildFormat();
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index d188da5d3f18..b9467ac07103 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
@@ -45,6 +46,8 @@
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -61,6 +64,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, Supp
   private final HiveClientPool clients;
   private final Configuration conf;
   private final StackTraceElement[] createStack;
+  private final FileIO fileIO;
   private boolean closed;
 
   public HiveCatalog(Configuration conf) {
@@ -69,6 +73,7 @@ public HiveCatalog(Configuration conf) {
     this.conf = conf;
     this.createStack = Thread.currentThread().getStackTrace();
     this.closed = false;
+    this.fileIO = new HadoopFileIO(conf);
   }
 
   public HiveCatalog(String name, String uri, int clientPoolSize, Configuration conf) {
@@ -76,6 +81,16 @@ public HiveCatalog(String name, String uri, int clientPoolSize, Configuration co
   }
 
   public HiveCatalog(String name, String uri, String warehouse, int clientPoolSize, Configuration conf) {
+    this(name, uri, warehouse, clientPoolSize, conf, Maps.newHashMap());
+  }
+
+  public HiveCatalog(
+      String name,
+      String uri,
+      String warehouse,
+      int clientPoolSize,
+      Configuration conf,
+      Map<String, String> properties) {
     this.name = name;
     this.conf = new Configuration(conf);
     // before building the client pool, overwrite the configuration's URIs if the argument is non-null
@@ -90,6 +105,9 @@ public HiveCatalog(String name, String uri, String warehouse, int clientPoolSize
     this.clients = new HiveClientPool(clientPoolSize, this.conf);
     this.createStack = Thread.currentThread().getStackTrace();
     this.closed = false;
+
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    this.fileIO = fileIOImpl == null ? new HadoopFileIO(conf) : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);
   }
 
   @Override
@@ -403,7 +421,7 @@ private boolean isValidateNamespace(Namespace namespace) {
   public TableOperations newTableOps(TableIdentifier tableIdentifier) {
     String dbName = tableIdentifier.namespace().level(0);
     String tableName = tableIdentifier.name();
-    return new HiveTableOperations(conf, clients, name, dbName, tableName);
+    return new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName);
   }
 
   @Override
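The Hive side mirrors the Hadoop catalog: the new all-arg constructor resolves the `FileIO` once and hands it to every `HiveTableOperations`. A sketch, reusing the hypothetical class and illustrative endpoints from the earlier examples.

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class HiveCatalogExample {
  public static void main(String[] args) {
    Map<String, String> properties = Maps.newHashMap();
    properties.put(CatalogProperties.FILE_IO_IMPL, "com.example.CustomFileIO"); // hypothetical

    HiveCatalog catalog = new HiveCatalog(
        "prod",
        "thrift://metastore:9083",
        "hdfs://nn:8020/warehouse",
        CatalogProperties.HIVE_CLIENT_POOL_SIZE_DEFAULT,
        new Configuration(),
        properties);
    // newTableOps now passes the resolved FileIO straight into HiveTableOperations
  }
}
```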
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index badc911c1f7e..995419a767ed 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -56,7 +56,6 @@
 import org.apache.iceberg.exceptions.NoSuchIcebergTableException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hadoop.ConfigProperties;
-import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -86,13 +85,13 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
   private final String tableName;
   private final Configuration conf;
   private final long lockAcquireTimeout;
+  private final FileIO fileIO;
 
-  private FileIO fileIO;
-
-  protected HiveTableOperations(Configuration conf, HiveClientPool metaClients,
+  protected HiveTableOperations(Configuration conf, HiveClientPool metaClients, FileIO fileIO,
                                 String catalogName, String database, String table) {
     this.conf = conf;
     this.metaClients = metaClients;
+    this.fileIO = fileIO;
     this.fullName = catalogName + "." + database + "." + table;
     this.database = database;
     this.tableName = table;
@@ -107,10 +106,6 @@ protected String tableName() {
 
   @Override
   public FileIO io() {
-    if (fileIO == null) {
-      fileIO = new HadoopFileIO(conf);
-    }
-
     return fileIO;
   }
 
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 0228eae705d5..e1fb28c32cf7 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -109,13 +109,14 @@ protected Catalog buildIcebergCatalog(String name, CaseInsensitiveStringMap opti
     String catalogType = options.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
     switch (catalogType.toLowerCase(Locale.ENGLISH)) {
       case ICEBERG_CATALOG_TYPE_HIVE:
-        int clientPoolSize = options.getInt("clients", 2);
-        String uri = options.get("uri");
+        int clientPoolSize = options.getInt(CatalogProperties.HIVE_CLIENT_POOL_SIZE,
+            CatalogProperties.HIVE_CLIENT_POOL_SIZE_DEFAULT);
+        String uri = options.get(CatalogProperties.HIVE_URI);
         return new HiveCatalog(name, uri, clientPoolSize, conf);
 
       case ICEBERG_CATALOG_TYPE_HADOOP:
-        String warehouseLocation = options.get("warehouse");
-        return new HadoopCatalog(name, conf, warehouseLocation);
+        String warehouseLocation = options.get(CatalogProperties.WAREHOUSE_LOCATION);
+        return new HadoopCatalog(name, conf, warehouseLocation, options.asCaseSensitiveMap());
 
       default:
         throw new UnsupportedOperationException("Unknown catalog type: " + catalogType);
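In Spark, the options map comes from `spark.sql.catalog.<name>.*` session configs (standard Iceberg-on-Spark wiring, not shown in this diff). Note that in this change only the hadoop branch forwards the full options map to the catalog, so `io-impl` takes effect only there. A sketch, assuming `ICEBERG_CATALOG_TYPE` resolves to `'type'`; paths and class names are illustrative.

```java
import org.apache.spark.sql.SparkSession;

public class SparkCatalogExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.my_catalog.type", "hadoop")
        .config("spark.sql.catalog.my_catalog.warehouse", "hdfs://nn:8020/warehouse")
        // hypothetical FileIO class; forwarded via options.asCaseSensitiveMap()
        .config("spark.sql.catalog.my_catalog.io-impl", "com.example.CustomFileIO")
        .getOrCreate();

    spark.sql("SHOW NAMESPACES IN my_catalog").show();
  }
}
```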