Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iceberg.io;

import java.io.Serializable;
import java.util.Map;

/**
* Pluggable module for reading, writing, and deleting files.
Expand Down Expand Up @@ -58,4 +59,11 @@ default void deleteFile(InputFile file) {
/**
 * Delete the file at the given {@link OutputFile}'s location.
 *
 * @param file an OutputFile whose location points to the file to delete
 */
default void deleteFile(OutputFile file) {
  deleteFile(file.location());
}

/**
 * Initialize File IO from catalog properties.
 * <p>
 * The default implementation is a no-op; implementations that need
 * catalog-level configuration should override this.
 *
 * @param properties catalog properties
 */
default void initialize(Map<String, String> properties) {
}
}
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/CatalogProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,10 @@ private CatalogProperties() {
}

// Property key naming a custom Catalog implementation class to load.
public static final String CATALOG_IMPL = "catalog-impl";
// Property key naming a custom FileIO implementation class to load.
public static final String FILE_IO_IMPL = "io-impl";
// Property key for the warehouse root location.
public static final String WAREHOUSE_LOCATION = "warehouse";

// Hive catalog property keys — presumably the metastore URI and client pool size; confirm against HiveCatalog usage.
public static final String HIVE_URI = "uri";
public static final String HIVE_CLIENT_POOL_SIZE = "clients";
// Default used when the "clients" property is not set.
public static final int HIVE_CLIENT_POOL_SIZE_DEFAULT = 2;
}
44 changes: 44 additions & 0 deletions core/src/main/java/org/apache/iceberg/CatalogUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,48 @@ public static Catalog loadCatalog(
catalog.initialize(catalogName, properties);
return catalog;
}

/**
 * Load a custom {@link FileIO} implementation by class name.
 * <p>
 * The implementation must have a no-arg constructor. If the loaded class
 * implements {@link Configurable}, the given Hadoop configuration is passed to
 * it via {@link Configurable#setConf(Configuration)} before
 * {@link FileIO#initialize(Map)} is called with the catalog properties.
 *
 * @param impl full class name of a custom FileIO implementation
 * @param properties catalog properties used to initialize the FileIO
 * @param hadoopConf hadoop configuration, applied when the class is Configurable
 * @return an initialized FileIO instance
 * @throws IllegalArgumentException if the class path is not found, the
 *         no-arg constructor is missing, or the loaded class cannot be
 *         cast to FileIO
 */
public static FileIO loadFileIO(
    String impl,
    Map<String, String> properties,
    Configuration hadoopConf) {
  LOG.info("Loading custom FileIO implementation: {}", impl);

  // Resolve the no-arg constructor up front so a missing one is reported clearly.
  DynConstructors.Ctor<FileIO> constructor;
  try {
    constructor = DynConstructors.builder(FileIO.class).impl(impl).buildChecked();
  } catch (NoSuchMethodException e) {
    throw new IllegalArgumentException(String.format(
        "Cannot initialize FileIO, missing no-arg constructor: %s", impl), e);
  }

  // Instantiation casts to FileIO; a ClassCastException means the class does not implement it.
  FileIO io;
  try {
    io = constructor.newInstance();
  } catch (ClassCastException e) {
    throw new IllegalArgumentException(
        String.format("Cannot initialize FileIO, %s does not implement FileIO.", impl), e);
  }

  if (io instanceof Configurable) {
    ((Configurable) io).setConf(hadoopConf);
  }

  io.initialize(properties);
  return io;
}
}
21 changes: 20 additions & 1 deletion core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
Expand All @@ -45,11 +46,13 @@
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

/**
Expand All @@ -76,6 +79,7 @@ public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, Su
private final Configuration conf;
private final String warehouseLocation;
private final FileSystem fs;
private final FileIO fileIO;

/**
* The constructor of the HadoopCatalog. It uses the passed location as its warehouse directory.
Expand All @@ -85,13 +89,28 @@ public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, Su
* @param warehouseLocation The location used as warehouse directory
*/
public HadoopCatalog(String name, Configuration conf, String warehouseLocation) {
  // Delegate to the all-arg constructor with no extra catalog properties.
  this(name, conf, warehouseLocation, Maps.newHashMap());
}

/**
 * The all-arg constructor of the HadoopCatalog.
 *
 * @param name The catalog name
 * @param conf The Hadoop configuration
 * @param warehouseLocation The location used as warehouse directory
 * @param properties catalog properties
 */
public HadoopCatalog(String name, Configuration conf, String warehouseLocation, Map<String, String> properties) {
  Preconditions.checkArgument(warehouseLocation != null && !warehouseLocation.isEmpty(),
      "no location provided for warehouse");

  this.catalogName = name;
  this.conf = conf;
  // Normalize the warehouse path by dropping any trailing slashes.
  this.warehouseLocation = warehouseLocation.replaceAll("/*$", "");
  this.fs = Util.getFs(new Path(warehouseLocation), conf);

  // Load a custom FileIO when "io-impl" is configured; otherwise use HadoopFileIO.
  String ioImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
  if (ioImpl == null) {
    this.fileIO = new HadoopFileIO(conf);
  } else {
    this.fileIO = CatalogUtil.loadFileIO(ioImpl, properties, conf);
  }
}

/**
Expand Down Expand Up @@ -163,7 +182,7 @@ protected boolean isValidIdentifier(TableIdentifier identifier) {

@Override
protected TableOperations newTableOps(TableIdentifier identifier) {
  // Pass the catalog's FileIO so custom implementations flow through to table operations.
  // (Removed the stale duplicate return left over from the diff, which called the
  // old 2-arg HadoopTableOperations constructor and made this method invalid.)
  return new HadoopTableOperations(new Path(defaultWarehouseLocation(identifier)), fileIO, conf);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,16 @@ public class HadoopTableOperations implements TableOperations {

private final Configuration conf;
private final Path location;
// FileIO injected by the owning catalog; replaces the old lazily-created HadoopFileIO field,
// whose stale declaration (diff leftover) is removed here.
private final FileIO fileIO;

private volatile TableMetadata currentMetadata = null;
private volatile Integer version = null;
private volatile boolean shouldRefresh = true;

/**
 * Creates table operations rooted at the given location.
 *
 * @param location table location path
 * @param fileIO FileIO used for reading and writing metadata files
 * @param conf Hadoop configuration
 */
protected HadoopTableOperations(Path location, FileIO fileIO, Configuration conf) {
  this.conf = conf;
  this.location = location;
  this.fileIO = fileIO;
}

@Override
Expand Down Expand Up @@ -173,10 +174,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {

@Override
public FileIO io() {
  // Return the FileIO provided at construction time. (Removed the stale old
  // lazy-init body left in the diff, which referenced the deleted defaultFileIo
  // field and left an unreachable second return.)
  return fileIO;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ TableOperations newTableOps(String location) {
if (location.contains(METADATA_JSON)) {
return new StaticTableOperations(location, new HadoopFileIO(conf));
} else {
return new HadoopTableOperations(new Path(location), conf);
return new HadoopTableOperations(new Path(location), new HadoopFileIO(conf), conf);
}
}

Expand Down
141 changes: 141 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestCatalogUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -83,6 +87,41 @@ public void loadCustomCatalog_NotImplementCatalog() {
() -> CatalogUtil.loadCatalog(TestCatalogNoInterface.class.getName(), name, options, hadoopConf));
}


@Test
public void loadCustomFileIO_noArg() {
  Map<String, String> properties = Maps.newHashMap();
  properties.put("key", "val");

  FileIO fileIO = CatalogUtil.loadFileIO(TestFileIONoArg.class.getName(), properties, null);

  // initialize(properties) should have stored the catalog properties on the instance.
  Assert.assertTrue(fileIO instanceof TestFileIONoArg);
  Assert.assertEquals(properties, ((TestFileIONoArg) fileIO).getMap());
}

@Test
public void loadCustomFileIO_configurable() {
  Configuration conf = new Configuration();
  conf.set("key", "val");

  FileIO fileIO = CatalogUtil.loadFileIO(TestFileIOConfigurable.class.getName(), Maps.newHashMap(), conf);

  // setConf(conf) should have been invoked because the class implements Configurable.
  Assert.assertTrue(fileIO instanceof TestFileIOConfigurable);
  Assert.assertEquals(conf, ((TestFileIOConfigurable) fileIO).getConfiguration());
}

@Test
public void loadCustomFileIO_badArg() {
  // TestFileIOBadArg only declares a String-arg constructor, so loading must fail.
  String impl = TestFileIOBadArg.class.getName();
  AssertHelpers.assertThrows("cannot find constructor",
      IllegalArgumentException.class,
      "missing no-arg constructor",
      () -> CatalogUtil.loadFileIO(impl, Maps.newHashMap(), null));
}

@Test
public void loadCustomFileIO_badClass() {
  // TestFileIONotImpl does not implement FileIO, so the cast must fail.
  String impl = TestFileIONotImpl.class.getName();
  AssertHelpers.assertThrows("cannot cast",
      IllegalArgumentException.class,
      "does not implement FileIO",
      () -> CatalogUtil.loadFileIO(impl, Maps.newHashMap(), null));
}

public static class TestCatalog extends BaseMetastoreCatalog {

private String catalogName;
Expand Down Expand Up @@ -213,4 +252,106 @@ public static class TestCatalogNoInterface {
public TestCatalogNoInterface() {
}
}

/** FileIO stub that implements Configurable and records the Hadoop conf it receives. */
public static class TestFileIOConfigurable implements FileIO, Configurable {

  private Configuration configuration;

  public TestFileIOConfigurable() {
  }

  @Override
  public void setConf(Configuration conf) {
    this.configuration = conf;
  }

  @Override
  public Configuration getConf() {
    return configuration;
  }

  /** Exposes the configuration captured by setConf, for assertions. */
  public Configuration getConfiguration() {
    return configuration;
  }

  // The FileIO methods are stubs; tests only check configuration wiring.
  @Override
  public InputFile newInputFile(String path) {
    return null;
  }

  @Override
  public OutputFile newOutputFile(String path) {
    return null;
  }

  @Override
  public void deleteFile(String path) {
  }
}

/** FileIO stub with a no-arg constructor that records the properties passed to initialize. */
public static class TestFileIONoArg implements FileIO {

  private Map<String, String> map;

  public TestFileIONoArg() {
  }

  @Override
  public void initialize(Map<String, String> properties) {
    this.map = properties;
  }

  /** Exposes the properties captured by initialize, for assertions. */
  public Map<String, String> getMap() {
    return map;
  }

  // The FileIO methods are stubs; tests only check initialization wiring.
  @Override
  public InputFile newInputFile(String path) {
    return null;
  }

  @Override
  public OutputFile newOutputFile(String path) {
    return null;
  }

  @Override
  public void deleteFile(String path) {
  }
}

/** FileIO stub without a no-arg constructor; used to trigger the missing-constructor error. */
public static class TestFileIOBadArg implements FileIO {

  private final String arg;

  public TestFileIOBadArg(String arg) {
    this.arg = arg;
  }

  public String getArg() {
    return arg;
  }

  // The FileIO methods are stubs; this class exists only to fail constructor lookup.
  @Override
  public InputFile newInputFile(String path) {
    return null;
  }

  @Override
  public OutputFile newOutputFile(String path) {
    return null;
  }

  @Override
  public void deleteFile(String path) {
  }
}

/** Class that does not implement FileIO; used to trigger the does-not-implement error. */
public static class TestFileIONotImpl {
  public TestFileIONotImpl() {
  }
}
}
Loading