diff --git a/pom.xml b/pom.xml index 239cb518bfef9..6116a180c2861 100644 --- a/pom.xml +++ b/pom.xml @@ -74,6 +74,7 @@ 3.3.0 2.6.0 2.2.0 + 2.11.0 org.openjdk.jmh @@ -576,6 +589,40 @@ presto-jmx test + + + org.apache.iceberg + iceberg-core + 1.5.0 + tests + test + + + org.apache.avro + avro + + + org.apache.parquet + parquet-column + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-jdk14 + + + io.airlift + aircompressor + + + org.roaringbitmap + RoaringBitmap + + + @@ -585,10 +632,15 @@ org.apache.maven.plugins maven-dependency-plugin - + org.glassfish.jersey.core:jersey-common:jar org.eclipse.jetty:jetty-server:jar + + com.facebook.airlift:http-server:jar + com.facebook.airlift:node:jar + javax.servlet:javax.servlet-api:jar + org.apache.httpcomponents.core5:httpcore5:jar diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CatalogType.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CatalogType.java index 9035d91ad906b..c7a54e829cd63 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CatalogType.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CatalogType.java @@ -16,13 +16,14 @@ import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.hive.HiveCatalog; import org.apache.iceberg.nessie.NessieCatalog; +import org.apache.iceberg.rest.RESTCatalog; public enum CatalogType { HADOOP(HadoopCatalog.class.getName()), HIVE(HiveCatalog.class.getName()), NESSIE(NessieCatalog.class.getName()), - + REST(RESTCatalog.class.getName()) /**/; private final String catalogImpl; diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCatalogModule.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCatalogModule.java new file mode 100644 index 0000000000000..7aecbb61273f6 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCatalogModule.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.airlift.configuration.AbstractConfigurationAwareModule; +import com.facebook.presto.hive.metastore.ExtendedHiveMetastore; +import com.facebook.presto.iceberg.hadoop.IcebergHadoopCatalogModule; +import com.facebook.presto.iceberg.nessie.IcebergNessieCatalogModule; +import com.facebook.presto.iceberg.rest.IcebergRestCatalogModule; +import com.google.inject.Binder; +import com.google.inject.Module; + +import java.util.Optional; + +import static com.facebook.airlift.configuration.ConditionalModule.installModuleIf; +import static com.facebook.presto.iceberg.CatalogType.HADOOP; +import static com.facebook.presto.iceberg.CatalogType.HIVE; +import static com.facebook.presto.iceberg.CatalogType.NESSIE; +import static com.facebook.presto.iceberg.CatalogType.REST; + +public class IcebergCatalogModule + extends AbstractConfigurationAwareModule +{ + private final String connectorId; + private final Optional metastore; + + public IcebergCatalogModule(String connectorId, Optional metastore) + { + this.connectorId = connectorId; + this.metastore = metastore; + } + + @Override + protected void setup(Binder binder) + { + bindCatalogModule(HIVE, new IcebergHiveModule(connectorId, metastore)); + bindCatalogModule(HADOOP, new IcebergHadoopCatalogModule()); + bindCatalogModule(NESSIE, new IcebergNessieCatalogModule()); + bindCatalogModule(REST, new IcebergRestCatalogModule()); + } + + private void bindCatalogModule(CatalogType catalogType, Module module) + { + install(installModuleIf( + IcebergConfig.class, + config -> config.getCatalogType().equals(catalogType), + module)); + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java index 6d1476433e5f0..ba6db6eec5dfa 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java @@ -37,7 +37,7 @@ import com.facebook.presto.hive.gcs.GcsConfigurationInitializer; import com.facebook.presto.hive.gcs.HiveGcsConfig; import com.facebook.presto.hive.gcs.HiveGcsConfigurationInitializer; -import com.facebook.presto.iceberg.nessie.NessieConfig; +import com.facebook.presto.iceberg.nessie.IcebergNessieConfig; import com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizerProvider; import com.facebook.presto.iceberg.procedure.ExpireSnapshotsProcedure; import com.facebook.presto.iceberg.procedure.RegisterTableProcedure; @@ -90,6 +90,7 @@ import static com.facebook.airlift.json.JsonCodecBinder.jsonCodecBinder; import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; import static com.google.inject.multibindings.Multibinder.newSetBinder; +import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; import static java.lang.Math.toIntExact; import static java.util.concurrent.Executors.newFixedThreadPool; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -124,12 +125,12 @@ public void setup(Binder binder) binder.bind(CacheFactory.class).in(Scopes.SINGLETON); binder.bind(IcebergTransactionManager.class).in(Scopes.SINGLETON); binder.bind(IcebergCatalogName.class).toInstance(new IcebergCatalogName(connectorId)); - binder.bind(IcebergResourceFactory.class).in(Scopes.SINGLETON); configBinder(binder).bindConfig(IcebergConfig.class); - configBinder(binder).bindConfig(NessieConfig.class); binder.bind(IcebergSessionProperties.class).in(Scopes.SINGLETON); + newOptionalBinder(binder, IcebergNessieConfig.class); // bind optional Nessie config to IcebergSessionProperties + binder.bind(IcebergTableProperties.class).in(Scopes.SINGLETON); binder.bind(ConnectorSplitManager.class).to(IcebergSplitManager.class).in(Scopes.SINGLETON); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergResourceFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeCatalogFactory.java similarity index 60% rename from presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergResourceFactory.java rename to presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeCatalogFactory.java index 8b1c8911d8ef8..3890649d36592 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergResourceFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeCatalogFactory.java @@ -15,11 +15,11 @@ import com.facebook.presto.hive.gcs.GcsConfigurationInitializer; import com.facebook.presto.hive.s3.S3ConfigurationUpdater; -import com.facebook.presto.iceberg.nessie.NessieConfig; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.PrestoException; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.UncheckedExecutionException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -32,14 +32,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutionException; -import static com.facebook.presto.iceberg.CatalogType.NESSIE; -import static com.facebook.presto.iceberg.IcebergSessionProperties.getNessieReferenceHash; -import static com.facebook.presto.iceberg.IcebergSessionProperties.getNessieReferenceName; import static com.facebook.presto.iceberg.IcebergUtil.loadCachingProperties; -import static com.facebook.presto.iceberg.nessie.AuthenticationType.BASIC; -import static com.facebook.presto.iceberg.nessie.AuthenticationType.BEARER; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.google.common.base.Throwables.throwIfInstanceOf; import static com.google.common.base.Throwables.throwIfUnchecked; @@ -48,31 +44,32 @@ import static org.apache.iceberg.CatalogProperties.WAREHOUSE_LOCATION; /** - * Factory for loading Iceberg resources such as Catalog. + * Factory for loading resources related to the Iceberg Catalog. */ -public class IcebergResourceFactory +public class IcebergNativeCatalogFactory { private final Cache catalogCache; - private final String catalogName; - private final CatalogType catalogType; + protected final CatalogType catalogType; private final String catalogWarehouse; + protected final IcebergConfig icebergConfig; + private final List hadoopConfigResources; - private final NessieConfig nessieConfig; private final S3ConfigurationUpdater s3ConfigurationUpdater; private final GcsConfigurationInitializer gcsConfigurationInitialize; - private final IcebergConfig icebergConfig; - @Inject - public IcebergResourceFactory(IcebergConfig config, IcebergCatalogName catalogName, NessieConfig nessieConfig, S3ConfigurationUpdater s3ConfigurationUpdater, GcsConfigurationInitializer gcsConfigurationInitialize) + public IcebergNativeCatalogFactory( + IcebergConfig config, + IcebergCatalogName catalogName, + S3ConfigurationUpdater s3ConfigurationUpdater, + GcsConfigurationInitializer gcsConfigurationInitialize) { this.catalogName = requireNonNull(catalogName, "catalogName is null").getCatalogName(); this.icebergConfig = requireNonNull(config, "config is null"); this.catalogType = config.getCatalogType(); this.catalogWarehouse = config.getCatalogWarehouse(); - this.hadoopConfigResources = config.getHadoopConfigResources(); - this.nessieConfig = requireNonNull(nessieConfig, "nessieConfig is null"); + this.hadoopConfigResources = icebergConfig.getHadoopConfigResources(); this.s3ConfigurationUpdater = requireNonNull(s3ConfigurationUpdater, "s3ConfigurationUpdater is null"); this.gcsConfigurationInitialize = requireNonNull(gcsConfigurationInitialize, "gcsConfigurationInitialize is null"); catalogCache = CacheBuilder.newBuilder() @@ -83,8 +80,8 @@ public IcebergResourceFactory(IcebergConfig config, IcebergCatalogName catalogNa public Catalog getCatalog(ConnectorSession session) { try { - return catalogCache.get(getCatalogCacheKey(session), () -> CatalogUtil.loadCatalog( - catalogType.getCatalogImpl(), catalogName, getCatalogProperties(session), getHadoopConfiguration())); + return catalogCache.get(getCacheKey(session), () -> CatalogUtil.loadCatalog( + catalogType.getCatalogImpl(), catalogName, getProperties(session), getHadoopConfiguration())); } catch (ExecutionException | UncheckedExecutionException e) { throwIfInstanceOf(e.getCause(), PrestoException.class); @@ -102,18 +99,39 @@ public SupportsNamespaces getNamespaces(ConnectorSession session) throw new PrestoException(NOT_SUPPORTED, "Iceberg catalog of type " + catalogType + " does not support namespace operations"); } - private String getCatalogCacheKey(ConnectorSession session) + private String getCacheKey(ConnectorSession session) { StringBuilder sb = new StringBuilder(); sb.append(catalogName); + getCatalogCacheKey(session).ifPresent(sb::append); + return sb.toString(); + } - if (catalogType == NESSIE) { - sb.append(getNessieReferenceName(session)); - sb.append("@"); - sb.append(getNessieReferenceHash(session)); + protected Optional getCatalogCacheKey(ConnectorSession session) + { + return Optional.empty(); + } + + private Map getProperties(ConnectorSession session) + { + Map properties = new HashMap<>(); + if (icebergConfig.getManifestCachingEnabled()) { + loadCachingProperties(properties, icebergConfig); + } + if (icebergConfig.getFileIOImpl() != null) { + properties.put(FILE_IO_IMPL, icebergConfig.getFileIOImpl()); + } + if (catalogWarehouse != null) { + properties.put(WAREHOUSE_LOCATION, catalogWarehouse); } - return sb.toString(); + properties.putAll(getCatalogProperties(session)); + return properties; + } + + protected Map getCatalogProperties(ConnectorSession session) + { + return ImmutableMap.of(); } private Configuration getHadoopConfiguration() @@ -137,45 +155,4 @@ private Configuration getHadoopConfiguration() } return configuration; } - - public Map getCatalogProperties(ConnectorSession session) - { - Map properties = new HashMap<>(); - if (icebergConfig.getManifestCachingEnabled()) { - loadCachingProperties(properties, icebergConfig); - } - if (icebergConfig.getFileIOImpl() != null) { - properties.put(FILE_IO_IMPL, icebergConfig.getFileIOImpl()); - } - if (catalogWarehouse != null) { - properties.put(WAREHOUSE_LOCATION, catalogWarehouse); - } - if (catalogType == NESSIE) { - properties.put("ref", getNessieReferenceName(session)); - properties.put("uri", nessieConfig.getServerUri().orElseThrow(() -> new IllegalStateException("iceberg.nessie.uri must be set for Nessie"))); - String hash = getNessieReferenceHash(session); - if (hash != null) { - properties.put("ref.hash", hash); - } - nessieConfig.getReadTimeoutMillis().ifPresent(val -> properties.put("transport.read-timeout", val.toString())); - nessieConfig.getConnectTimeoutMillis().ifPresent(val -> properties.put("transport.connect-timeout", val.toString())); - nessieConfig.getClientBuilderImpl().ifPresent(val -> properties.put("client-builder-impl", val)); - nessieConfig.getAuthenticationType().ifPresent(type -> { - if (type == BASIC) { - properties.put("authentication.username", nessieConfig.getUsername() - .orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.basic.username must be set with BASIC authentication"))); - properties.put("authentication.password", nessieConfig.getPassword() - .orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.basic.password must be set with BASIC authentication"))); - } - else if (type == BEARER) { - properties.put("authentication.token", nessieConfig.getBearerToken() - .orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.bearer.token must be set with BEARER authentication"))); - } - }); - if (!nessieConfig.isCompressionEnabled()) { - properties.put("transport.disable-compression", "true"); - } - } - return properties; - } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java index ed39adbe34fdc..5f418beb93beb 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java @@ -68,11 +68,11 @@ public class IcebergNativeMetadata private static final String INFORMATION_SCHEMA = "information_schema"; private static final String TABLE_COMMENT = "comment"; - private final IcebergResourceFactory resourceFactory; + private final IcebergNativeCatalogFactory catalogFactory; private final CatalogType catalogType; public IcebergNativeMetadata( - IcebergResourceFactory resourceFactory, + IcebergNativeCatalogFactory catalogFactory, TypeManager typeManager, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, @@ -81,14 +81,14 @@ public IcebergNativeMetadata( NodeVersion nodeVersion) { super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion); - this.resourceFactory = requireNonNull(resourceFactory, "resourceFactory is null"); + this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null"); this.catalogType = requireNonNull(catalogType, "catalogType is null"); } @Override protected Table getRawIcebergTable(ConnectorSession session, SchemaTableName schemaTableName) { - return getNativeIcebergTable(resourceFactory, session, schemaTableName); + return getNativeIcebergTable(catalogFactory, session, schemaTableName); } @Override @@ -108,7 +108,7 @@ protected boolean tableExists(ConnectorSession session, SchemaTableName schemaTa @Override public List listSchemaNames(ConnectorSession session) { - SupportsNamespaces supportsNamespaces = resourceFactory.getNamespaces(session); + SupportsNamespaces supportsNamespaces = catalogFactory.getNamespaces(session); return supportsNamespaces.listNamespaces() .stream() .map(IcebergPrestoModelConverters::toPrestoSchemaName) @@ -124,7 +124,7 @@ public List listTables(ConnectorSession session, Optional listTables(ConnectorSession session, Optional properties) { - resourceFactory.getNamespaces(session).createNamespace(toIcebergNamespace(Optional.of(schemaName)), + catalogFactory.getNamespaces(session).createNamespace(toIcebergNamespace(Optional.of(schemaName)), properties.entrySet().stream() .collect(toMap(Map.Entry::getKey, e -> e.getValue().toString()))); } @@ -142,7 +142,7 @@ public void createSchema(ConnectorSession session, String schemaName, Map commitTaskCodec; - final IcebergResourceFactory resourceFactory; + final IcebergNativeCatalogFactory catalogFactory; final CatalogType catalogType; final StandardFunctionResolution functionResolution; final RowExpressionService rowExpressionService; @@ -38,14 +38,14 @@ public class IcebergNativeMetadataFactory @Inject public IcebergNativeMetadataFactory( IcebergConfig config, - IcebergResourceFactory resourceFactory, + IcebergNativeCatalogFactory catalogFactory, TypeManager typeManager, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, NodeVersion nodeVersion) { - this.resourceFactory = requireNonNull(resourceFactory, "resourceFactory is null"); + this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null"); @@ -57,6 +57,6 @@ public IcebergNativeMetadataFactory( public ConnectorMetadata create() { - return new IcebergNativeMetadata(resourceFactory, typeManager, functionResolution, rowExpressionService, commitTaskCodec, catalogType, nodeVersion); + return new IcebergNativeMetadata(catalogFactory, typeManager, functionResolution, rowExpressionService, commitTaskCodec, catalogType, nodeVersion); } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java index d28ff1bfd9d40..cf390a1b748d2 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java @@ -17,7 +17,7 @@ import com.facebook.presto.hive.HiveCompressionCodec; import com.facebook.presto.hive.OrcFileWriterConfig; import com.facebook.presto.hive.ParquetFileWriterConfig; -import com.facebook.presto.iceberg.nessie.NessieConfig; +import com.facebook.presto.iceberg.nessie.IcebergNessieConfig; import com.facebook.presto.iceberg.util.StatisticsUtil; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.session.PropertyMetadata; @@ -31,6 +31,7 @@ import java.util.EnumSet; import java.util.List; +import java.util.Optional; import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType; @@ -72,10 +73,10 @@ public IcebergSessionProperties( ParquetFileWriterConfig parquetFileWriterConfig, OrcFileWriterConfig orcFileWriterConfig, CacheConfig cacheConfig, - NessieConfig nessieConfig) + Optional nessieConfig) { - sessionProperties = ImmutableList.of( - new PropertyMetadata<>( + ImmutableList.Builder> propertiesBuilder = ImmutableList.>builder() + .add(new PropertyMetadata<>( COMPRESSION_CODEC, "The compression codec to use when writing files", VARCHAR, @@ -83,18 +84,18 @@ public IcebergSessionProperties( icebergConfig.getCompressionCodec(), false, value -> HiveCompressionCodec.valueOf(((String) value).toUpperCase()), - HiveCompressionCodec::name), - dataSizeSessionProperty( + HiveCompressionCodec::name)) + .add(dataSizeSessionProperty( PARQUET_WRITER_BLOCK_SIZE, "Parquet: Writer block size", parquetFileWriterConfig.getBlockSize(), - false), - dataSizeSessionProperty( + false)) + .add(dataSizeSessionProperty( PARQUET_WRITER_PAGE_SIZE, "Parquet: Writer page size", parquetFileWriterConfig.getPageSize(), - false), - new PropertyMetadata<>( + false)) + .add(new PropertyMetadata<>( PARQUET_WRITER_VERSION, "Parquet: Writer version", VARCHAR, @@ -102,64 +103,54 @@ public IcebergSessionProperties( parquetFileWriterConfig.getWriterVersion(), false, value -> ParquetProperties.WriterVersion.valueOf(((String) value).toUpperCase()), - ParquetProperties.WriterVersion::name), - dataSizeSessionProperty( + ParquetProperties.WriterVersion::name)) + .add(dataSizeSessionProperty( ORC_OPTIMIZED_WRITER_MIN_STRIPE_SIZE, "Experimental: ORC: Min stripe size", orcFileWriterConfig.getStripeMinSize(), - false), - dataSizeSessionProperty( + false)) + .add(dataSizeSessionProperty( ORC_OPTIMIZED_WRITER_MAX_STRIPE_SIZE, "Experimental: ORC: Max stripe size", orcFileWriterConfig.getStripeMaxSize(), - false), - integerProperty( + false)) + .add(integerProperty( ORC_OPTIMIZED_WRITER_MAX_STRIPE_ROWS, "Experimental: ORC: Max stripe row count", orcFileWriterConfig.getStripeMaxRowCount(), - false), - dataSizeSessionProperty( + false)) + .add(dataSizeSessionProperty( ORC_OPTIMIZED_WRITER_MAX_DICTIONARY_MEMORY, "Experimental: ORC: Max dictionary memory", orcFileWriterConfig.getDictionaryMaxMemory(), - false), - booleanProperty( + false)) + .add(booleanProperty( // required by presto-hive module, might be removed in future CACHE_ENABLED, "Enable cache for Iceberg", cacheConfig.isCachingEnabled(), - false), - doubleProperty( + false)) + .add(doubleProperty( MINIMUM_ASSIGNED_SPLIT_WEIGHT, "Minimum assigned split weight", icebergConfig.getMinimumAssignedSplitWeight(), - false), - stringProperty( - NESSIE_REFERENCE_NAME, - "Nessie reference name to use", - nessieConfig.getDefaultReferenceName(), - false), - stringProperty( - NESSIE_REFERENCE_HASH, - "Nessie reference hash to use", - null, - false), - dataSizeSessionProperty( + false)) + .add(dataSizeSessionProperty( ORC_STRING_STATISTICS_LIMIT, "ORC: Maximum size of string statistics; drop if exceeding", orcFileWriterConfig.getStringStatisticsLimit(), - false), - booleanProperty( + false)) + .add(booleanProperty( PARQUET_DEREFERENCE_PUSHDOWN_ENABLED, "Is dereference pushdown expression pushdown into Parquet reader enabled?", icebergConfig.isParquetDereferencePushdownEnabled(), - false), - booleanProperty( + false)) + .add(booleanProperty( MERGE_ON_READ_MODE_ENABLED, "Reads enabled for merge-on-read Iceberg tables", icebergConfig.isMergeOnReadModeEnabled(), - false), - new PropertyMetadata<>( + false)) + .add(new PropertyMetadata<>( HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, "Flags to choose which statistics from the Hive Metastore are used when calculating table stats. Valid values are: " + Joiner.on(", ").join(SUPPORTED_MERGE_FLAGS), @@ -168,32 +159,46 @@ public IcebergSessionProperties( icebergConfig.getHiveStatisticsMergeFlags(), false, val -> decodeMergeFlags((String) val), - StatisticsUtil::encodeMergeFlags), - booleanProperty( + StatisticsUtil::encodeMergeFlags)) + .add(booleanProperty( PUSHDOWN_FILTER_ENABLED, "Experimental: Enable Filter Pushdown for Iceberg. This is only supported with Native Worker.", icebergConfig.isPushdownFilterEnabled(), - false), - doubleProperty( + false)) + .add(doubleProperty( STATISTIC_SNAPSHOT_RECORD_DIFFERENCE_WEIGHT, "the amount that the difference in total record count matters" + "when calculating the closest snapshot when picking statistics. A " + "value of 1 means a single record is equivalent to 1 millisecond of " + "time difference.", icebergConfig.getStatisticSnapshotRecordDifferenceWeight(), - false), - booleanProperty( + false)) + .add(booleanProperty( DELETE_AS_JOIN_REWRITE_ENABLED, "When enabled equality delete row filtering will be pushed down into a join.", icebergConfig.isDeleteAsJoinRewriteEnabled(), - false), - integerProperty( + false)) + .add(integerProperty( ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD, "The max partitions number to utilize metadata optimization. When partitions number " + "of an Iceberg table exceeds this threshold, metadata optimization would be skipped for " + "the table. A value of 0 means skip metadata optimization directly.", icebergConfig.getRowsForMetadataOptimizationThreshold(), false)); + + nessieConfig.ifPresent((config) -> propertiesBuilder + .add(stringProperty( + NESSIE_REFERENCE_NAME, + "Nessie reference name to use", + config.getDefaultReferenceName(), + false)) + .add(stringProperty( + NESSIE_REFERENCE_HASH, + "Nessie reference hash to use", + null, + false))); + + sessionProperties = propertiesBuilder.build(); } public List> getSessionProperties() @@ -259,16 +264,6 @@ public static DataSize getOrcOptimizedWriterMaxDictionaryMemory(ConnectorSession return session.getProperty(ORC_OPTIMIZED_WRITER_MAX_DICTIONARY_MEMORY, DataSize.class); } - public static String getNessieReferenceName(ConnectorSession session) - { - return session.getProperty(NESSIE_REFERENCE_NAME, String.class); - } - - public static String getNessieReferenceHash(ConnectorSession session) - { - return session.getProperty(NESSIE_REFERENCE_HASH, String.class); - } - public static double getMinimumAssignedSplitWeight(ConnectorSession session) { return session.getProperty(MINIMUM_ASSIGNED_SPLIT_WEIGHT, Double.class); @@ -308,4 +303,14 @@ public static int getRowsForMetadataOptimizationThreshold(ConnectorSession sessi { return session.getProperty(ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD, Integer.class); } + + public static String getNessieReferenceName(ConnectorSession session) + { + return session.getProperty(NESSIE_REFERENCE_NAME, String.class); + } + + public static String getNessieReferenceHash(ConnectorSession session) + { + return session.getProperty(NESSIE_REFERENCE_HASH, String.class); + } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java index 92ffe605db6fc..77d5624e5b9b1 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java @@ -224,9 +224,9 @@ public static Table getHiveIcebergTable(ExtendedHiveMetastore metastore, HdfsEnv return new BaseTable(operations, quotedTableName(table)); } - public static Table getNativeIcebergTable(IcebergResourceFactory resourceFactory, ConnectorSession session, SchemaTableName table) + public static Table getNativeIcebergTable(IcebergNativeCatalogFactory catalogFactory, ConnectorSession session, SchemaTableName table) { - return resourceFactory.getCatalog(session).loadTable(toIcebergTableIdentifier(table)); + return catalogFactory.getCatalog(session).loadTable(toIcebergTableIdentifier(table)); } public static List getPartitionKeyColumnHandles(IcebergTableHandle tableHandle, Table table, TypeManager typeManager) diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java index d2c2f4c82eed2..bab2cb74a63d3 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java @@ -63,13 +63,15 @@ import java.util.Optional; import java.util.Set; -import static com.facebook.airlift.configuration.ConditionalModule.installModuleIf; - public final class InternalIcebergConnectorFactory { private InternalIcebergConnectorFactory() {} - public static Connector createConnector(String catalogName, Map config, ConnectorContext context, Optional metastore) + public static Connector createConnector( + String catalogName, + Map config, + ConnectorContext context, + Optional metastore) { ClassLoader classLoader = InternalIcebergConnectorFactory.class.getClassLoader(); try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { @@ -78,11 +80,7 @@ public static Connector createConnector(String catalogName, Map new MBeanModule(), new JsonModule(), new IcebergCommonModule(catalogName), - installModuleIf( - IcebergConfig.class, - conf -> conf.getCatalogType().equals(CatalogType.HIVE), - new IcebergHiveModule(catalogName, metastore), - new IcebergNativeModule()), + new IcebergCatalogModule(catalogName, metastore), new HiveS3Module(catalogName), new HiveGcsModule(), new HiveAuthenticationModule(), diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeModule.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/hadoop/IcebergHadoopCatalogModule.java similarity index 72% rename from presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeModule.java rename to presto-iceberg/src/main/java/com/facebook/presto/iceberg/hadoop/IcebergHadoopCatalogModule.java index 2a4eb955680c4..9be401d6c0f66 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeModule.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/hadoop/IcebergHadoopCatalogModule.java @@ -11,18 +11,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.facebook.presto.iceberg; +package com.facebook.presto.iceberg.hadoop; import com.facebook.airlift.configuration.AbstractConfigurationAwareModule; +import com.facebook.presto.iceberg.IcebergMetadataFactory; +import com.facebook.presto.iceberg.IcebergNativeCatalogFactory; +import com.facebook.presto.iceberg.IcebergNativeMetadataFactory; import com.google.inject.Binder; import com.google.inject.Scopes; -public class IcebergNativeModule +public class IcebergHadoopCatalogModule extends AbstractConfigurationAwareModule { @Override public void setup(Binder binder) { + binder.bind(IcebergNativeCatalogFactory.class).in(Scopes.SINGLETON); binder.bind(IcebergMetadataFactory.class).to(IcebergNativeMetadataFactory.class).in(Scopes.SINGLETON); } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/nessie/IcebergNessieCatalogFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/nessie/IcebergNessieCatalogFactory.java new file mode 100644 index 0000000000000..90b332ce291e7 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/nessie/IcebergNessieCatalogFactory.java @@ -0,0 +1,93 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.nessie; + +import com.facebook.presto.hive.gcs.GcsConfigurationInitializer; +import com.facebook.presto.hive.s3.S3ConfigurationUpdater; +import com.facebook.presto.iceberg.IcebergCatalogName; +import com.facebook.presto.iceberg.IcebergConfig; +import com.facebook.presto.iceberg.IcebergNativeCatalogFactory; +import com.facebook.presto.spi.ConnectorSession; +import com.google.common.collect.ImmutableMap; + +import javax.inject.Inject; + +import java.util.Map; +import java.util.Optional; + +import static com.facebook.presto.iceberg.IcebergSessionProperties.getNessieReferenceHash; +import static com.facebook.presto.iceberg.IcebergSessionProperties.getNessieReferenceName; +import static com.facebook.presto.iceberg.nessie.AuthenticationType.BASIC; +import static com.facebook.presto.iceberg.nessie.AuthenticationType.BEARER; +import static java.util.Objects.requireNonNull; + +public class IcebergNessieCatalogFactory + extends IcebergNativeCatalogFactory +{ + private final IcebergNessieConfig catalogConfig; + + @Inject + public IcebergNessieCatalogFactory( + IcebergConfig config, + IcebergNessieConfig catalogConfig, + IcebergCatalogName catalogName, + S3ConfigurationUpdater s3ConfigurationUpdater, + GcsConfigurationInitializer gcsConfigurationInitialize) + { + super(config, catalogName, s3ConfigurationUpdater, gcsConfigurationInitialize); + this.catalogConfig = requireNonNull(catalogConfig, "catalogConfig is null"); + } + + @Override + protected Map getCatalogProperties(ConnectorSession session) + { + ImmutableMap.Builder properties = ImmutableMap.builder(); + + properties.put("ref", getNessieReferenceName(session)); + properties.put("uri", catalogConfig.getServerUri().orElseThrow(() -> new IllegalStateException("iceberg.nessie.uri must be set for Nessie"))); + String hash = getNessieReferenceHash(session); + if (hash != null) { + properties.put("ref.hash", hash); + } + catalogConfig.getReadTimeoutMillis().ifPresent(val -> properties.put("transport.read-timeout", val.toString())); + catalogConfig.getConnectTimeoutMillis().ifPresent(val -> properties.put("transport.connect-timeout", val.toString())); + catalogConfig.getClientBuilderImpl().ifPresent(val -> properties.put("client-builder-impl", val)); + catalogConfig.getAuthenticationType().ifPresent(type -> { + if (type == BASIC) { + properties.put("authentication.username", catalogConfig.getUsername() + .orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.basic.username must be set with BASIC authentication"))); + properties.put("authentication.password", catalogConfig.getPassword() + .orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.basic.password must be set with BASIC authentication"))); + } + else if (type == BEARER) { + properties.put("authentication.token", catalogConfig.getBearerToken() + .orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.bearer.token must be set with BEARER authentication"))); + } + }); + if (!catalogConfig.isCompressionEnabled()) { + properties.put("transport.disable-compression", "true"); + } + return properties.build(); + } + + @Override + protected Optional getCatalogCacheKey(ConnectorSession session) + { + StringBuilder sb = new StringBuilder(); + sb.append(getNessieReferenceName(session)); + sb.append("@"); + sb.append(getNessieReferenceHash(session)); + return Optional.of(sb.toString()); + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/nessie/IcebergNessieCatalogModule.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/nessie/IcebergNessieCatalogModule.java new file mode 100644 index 0000000000000..8bbb356c8ac10 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/nessie/IcebergNessieCatalogModule.java @@ -0,0 +1,36 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.nessie; + +import com.facebook.airlift.configuration.AbstractConfigurationAwareModule; +import com.facebook.presto.iceberg.IcebergMetadataFactory; +import com.facebook.presto.iceberg.IcebergNativeCatalogFactory; +import com.facebook.presto.iceberg.IcebergNativeMetadataFactory; +import com.google.inject.Binder; +import com.google.inject.Scopes; + +import static com.facebook.airlift.configuration.ConfigBinder.configBinder; + +public class IcebergNessieCatalogModule + extends AbstractConfigurationAwareModule +{ + @Override + public void setup(Binder binder) + { + configBinder(binder).bindConfig(IcebergNessieConfig.class); + + binder.bind(IcebergNativeCatalogFactory.class).to(IcebergNessieCatalogFactory.class).in(Scopes.SINGLETON); + binder.bind(IcebergMetadataFactory.class).to(IcebergNativeMetadataFactory.class).in(Scopes.SINGLETON); + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/nessie/NessieConfig.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/nessie/IcebergNessieConfig.java similarity index 85% rename from presto-iceberg/src/main/java/com/facebook/presto/iceberg/nessie/NessieConfig.java rename to presto-iceberg/src/main/java/com/facebook/presto/iceberg/nessie/IcebergNessieConfig.java index 2439befbe5ba7..8e68bc5f5953b 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/nessie/NessieConfig.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/nessie/IcebergNessieConfig.java @@ -20,7 +20,7 @@ import java.util.Optional; -public class NessieConfig +public class IcebergNessieConfig { private String defaultReferenceName = "main"; private String serverUri; @@ -41,7 +41,7 @@ public String getDefaultReferenceName() @Config("iceberg.nessie.ref") @ConfigDescription("The default Nessie reference to work on") - public NessieConfig setDefaultReferenceName(String defaultReferenceName) + public IcebergNessieConfig setDefaultReferenceName(String defaultReferenceName) { this.defaultReferenceName = defaultReferenceName; return this; @@ -54,7 +54,7 @@ public Optional getServerUri() @Config("iceberg.nessie.uri") @ConfigDescription("The URI to connect to the Nessie server") - public NessieConfig setServerUri(String serverUri) + public IcebergNessieConfig setServerUri(String serverUri) { this.serverUri = serverUri; return this; @@ -62,7 +62,7 @@ public NessieConfig setServerUri(String serverUri) @Config("iceberg.nessie.auth.type") @ConfigDescription("The authentication type to use. Available values are BASIC | BEARER") - public NessieConfig setAuthenticationType(AuthenticationType authenticationType) + public IcebergNessieConfig setAuthenticationType(AuthenticationType authenticationType) { this.authenticationType = authenticationType; return this; @@ -76,7 +76,7 @@ public Optional getAuthenticationType() @Config("iceberg.nessie.auth.basic.username") @ConfigDescription("The username to use with BASIC authentication") @Deprecated - public NessieConfig setUsername(String username) + public IcebergNessieConfig setUsername(String username) { this.username = username; return this; @@ -91,7 +91,7 @@ public Optional getUsername() @Config("iceberg.nessie.auth.basic.password") @ConfigDescription("The password to use with BASIC authentication") @Deprecated - public NessieConfig setPassword(String password) + public IcebergNessieConfig setPassword(String password) { this.password = password; return this; @@ -105,7 +105,7 @@ public Optional getPassword() @Config("iceberg.nessie.auth.bearer.token") @ConfigDescription("The token to use with BEARER authentication") - public NessieConfig setBearerToken(String bearerToken) + public IcebergNessieConfig setBearerToken(String bearerToken) { this.bearerToken = bearerToken; return this; @@ -118,7 +118,7 @@ public Optional getBearerToken() @Config("iceberg.nessie.read-timeout-ms") @ConfigDescription("The read timeout in milliseconds for the client") - public NessieConfig setReadTimeoutMillis(Integer readTimeoutMillis) + public IcebergNessieConfig setReadTimeoutMillis(Integer readTimeoutMillis) { this.readTimeoutMillis = readTimeoutMillis; return this; @@ -131,7 +131,7 @@ public Optional getReadTimeoutMillis() @Config("iceberg.nessie.connect-timeout-ms") @ConfigDescription("The connection timeout in milliseconds for the client") - public NessieConfig setConnectTimeoutMillis(Integer connectTimeoutMillis) + public IcebergNessieConfig setConnectTimeoutMillis(Integer connectTimeoutMillis) { this.connectTimeoutMillis = connectTimeoutMillis; return this; @@ -144,7 +144,7 @@ public Optional getConnectTimeoutMillis() @Config("iceberg.nessie.compression-enabled") @ConfigDescription("Configure whether compression should be enabled or not. Default: true") - public NessieConfig setCompressionEnabled(boolean compressionEnabled) + public IcebergNessieConfig setCompressionEnabled(boolean compressionEnabled) { this.compressionEnabled = compressionEnabled; return this; @@ -157,7 +157,7 @@ public boolean isCompressionEnabled() @Config("iceberg.nessie.client-builder-impl") @ConfigDescription("Configure the custom ClientBuilder implementation class to be used") - public NessieConfig setClientBuilderImpl(String clientBuilderImpl) + public IcebergNessieConfig setClientBuilderImpl(String clientBuilderImpl) { this.clientBuilderImpl = clientBuilderImpl; return this; diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/SetTablePropertyProcedure.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/SetTablePropertyProcedure.java index 4c4c5ca91d58d..b983d1ae68fea 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/SetTablePropertyProcedure.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/SetTablePropertyProcedure.java @@ -17,7 +17,6 @@ import com.facebook.presto.hive.HdfsEnvironment; import com.facebook.presto.iceberg.IcebergConfig; import com.facebook.presto.iceberg.IcebergMetadataFactory; -import com.facebook.presto.iceberg.IcebergResourceFactory; import com.facebook.presto.iceberg.IcebergTableName; import com.facebook.presto.iceberg.IcebergUtil; import com.facebook.presto.spi.ConnectorSession; @@ -52,19 +51,16 @@ public class SetTablePropertyProcedure private final IcebergConfig config; private final IcebergMetadataFactory metadataFactory; private final HdfsEnvironment hdfsEnvironment; - private final IcebergResourceFactory resourceFactory; @Inject public SetTablePropertyProcedure( IcebergConfig config, IcebergMetadataFactory metadataFactory, - HdfsEnvironment hdfsEnvironment, - IcebergResourceFactory resourceFactory) + HdfsEnvironment hdfsEnvironment) { this.config = requireNonNull(config); this.metadataFactory = requireNonNull(metadataFactory); this.hdfsEnvironment = requireNonNull(hdfsEnvironment); - this.resourceFactory = requireNonNull(resourceFactory); } @Override diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/AuthenticationType.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/AuthenticationType.java new file mode 100644 index 0000000000000..95bd3d4f99555 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/AuthenticationType.java @@ -0,0 +1,20 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.rest; + +public enum AuthenticationType +{ + NONE, + OAUTH2 +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/IcebergRestCatalogFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/IcebergRestCatalogFactory.java new file mode 100644 index 0000000000000..ababf3b2e2cbd --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/IcebergRestCatalogFactory.java @@ -0,0 +1,98 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.rest; + +import com.facebook.presto.hive.NodeVersion; +import com.facebook.presto.hive.gcs.GcsConfigurationInitializer; +import com.facebook.presto.hive.s3.S3ConfigurationUpdater; +import com.facebook.presto.iceberg.IcebergCatalogName; +import com.facebook.presto.iceberg.IcebergConfig; +import com.facebook.presto.iceberg.IcebergNativeCatalogFactory; +import com.facebook.presto.spi.ConnectorSession; +import com.google.common.collect.ImmutableMap; +import io.jsonwebtoken.Jwts; +import org.apache.iceberg.rest.auth.OAuth2Properties; + +import javax.inject.Inject; + +import java.util.Date; +import java.util.Map; + +import static com.facebook.presto.iceberg.rest.AuthenticationType.OAUTH2; +import static com.facebook.presto.iceberg.rest.SessionType.USER; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.CatalogProperties.URI; +import static org.apache.iceberg.rest.auth.OAuth2Properties.CREDENTIAL; +import static org.apache.iceberg.rest.auth.OAuth2Properties.TOKEN; + +public class IcebergRestCatalogFactory + extends IcebergNativeCatalogFactory +{ + private final IcebergRestConfig catalogConfig; + private final NodeVersion nodeVersion; + + @Inject + public IcebergRestCatalogFactory( + IcebergConfig config, + IcebergRestConfig catalogConfig, + IcebergCatalogName catalogName, + S3ConfigurationUpdater s3ConfigurationUpdater, + GcsConfigurationInitializer gcsConfigurationInitialize, + NodeVersion nodeVersion) + { + super(config, catalogName, s3ConfigurationUpdater, gcsConfigurationInitialize); + this.catalogConfig = requireNonNull(catalogConfig, "catalogConfig is null"); + this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null"); + } + + @Override + protected Map getCatalogProperties(ConnectorSession session) + { + ImmutableMap.Builder properties = ImmutableMap.builder(); + + properties.put(URI, catalogConfig.getServerUri().orElseThrow( + () -> new IllegalStateException("iceberg.rest.uri must be set for REST catalog"))); + + catalogConfig.getAuthenticationType().ifPresent(type -> { + if (type == OAUTH2) { + if (!catalogConfig.credentialOrTokenExists()) { + throw new IllegalStateException("iceberg.rest.auth.oauth2 requires either a credential or a token"); + } + catalogConfig.getCredential().ifPresent(credential -> properties.put(CREDENTIAL, credential)); + catalogConfig.getToken().ifPresent(token -> properties.put(TOKEN, token)); + } + }); + + catalogConfig.getSessionType().ifPresent(type -> { + if (type == USER) { + properties.putAll(session.getIdentity().getExtraCredentials()); + + String sessionId = format("%s-%s", session.getUser(), session.getSource().orElse("default")); + String jwt = Jwts.builder() + .setId(sessionId) + .setSubject(session.getUser()) + .setIssuedAt(new Date()) + .setIssuer(nodeVersion.toString()) + .claim("user", session.getUser()) + .claim("source", session.getSource().orElse("")) + .compact(); + + properties.put(OAuth2Properties.JWT_TOKEN_TYPE, jwt); + } + }); + + return properties.build(); + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/IcebergRestCatalogModule.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/IcebergRestCatalogModule.java new file mode 100644 index 0000000000000..ffd2050532580 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/IcebergRestCatalogModule.java @@ -0,0 +1,36 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.rest; + +import com.facebook.airlift.configuration.AbstractConfigurationAwareModule; +import com.facebook.presto.iceberg.IcebergMetadataFactory; +import com.facebook.presto.iceberg.IcebergNativeCatalogFactory; +import com.facebook.presto.iceberg.IcebergNativeMetadataFactory; +import com.google.inject.Binder; +import com.google.inject.Scopes; + +import static com.facebook.airlift.configuration.ConfigBinder.configBinder; + +public class IcebergRestCatalogModule + extends AbstractConfigurationAwareModule +{ + @Override + public void setup(Binder binder) + { + configBinder(binder).bindConfig(IcebergRestConfig.class); + + binder.bind(IcebergNativeCatalogFactory.class).to(IcebergRestCatalogFactory.class).in(Scopes.SINGLETON); + binder.bind(IcebergMetadataFactory.class).to(IcebergNativeMetadataFactory.class).in(Scopes.SINGLETON); + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/IcebergRestConfig.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/IcebergRestConfig.java new file mode 100644 index 0000000000000..b00c68f09aca5 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/IcebergRestConfig.java @@ -0,0 +1,101 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.rest; + +import com.facebook.airlift.configuration.Config; +import com.facebook.airlift.configuration.ConfigDescription; + +import javax.validation.constraints.NotNull; + +import java.util.Optional; + +public class IcebergRestConfig +{ + private String serverUri; + private SessionType sessionType; + private AuthenticationType authenticationType; + private String credential; + private String token; + + @NotNull + public Optional getServerUri() + { + return Optional.ofNullable(serverUri); + } + + @Config("iceberg.rest.uri") + @ConfigDescription("The URI to connect to the REST server") + public IcebergRestConfig setServerUri(String serverUri) + { + this.serverUri = serverUri; + return this; + } + + public Optional getSessionType() + { + return Optional.ofNullable(sessionType); + } + + @Config("iceberg.rest.session.type") + @ConfigDescription("The session type to use for communicating with REST catalog server (NONE | USER)") + public IcebergRestConfig setSessionType(SessionType sessionType) + { + this.sessionType = sessionType; + return this; + } + + public Optional getAuthenticationType() + { + return Optional.ofNullable(authenticationType); + } + + @Config("iceberg.rest.auth.type") + @ConfigDescription("The authentication type to use for communicating with REST catalog server (NONE | OAUTH2)") + public IcebergRestConfig setAuthenticationType(AuthenticationType authenticationType) + { + this.authenticationType = authenticationType; + return this; + } + + public Optional getCredential() + { + return Optional.ofNullable(credential); + } + + @Config("iceberg.rest.auth.oauth2.credential") + @ConfigDescription("The credential to use for OAUTH2 authentication") + public IcebergRestConfig setCredential(String credential) + { + this.credential = credential; + return this; + } + + public Optional getToken() + { + return Optional.ofNullable(token); + } + + @Config("iceberg.rest.auth.oauth2.token") + @ConfigDescription("The Bearer token to use for OAUTH2 authentication") + public IcebergRestConfig setToken(String token) + { + this.token = token; + return this; + } + + public boolean credentialOrTokenExists() + { + return credential != null || token != null; + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/SessionType.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/SessionType.java new file mode 100644 index 0000000000000..87e1fbee61cd3 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/SessionType.java @@ -0,0 +1,20 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.rest; + +public enum SessionType +{ + NONE, + USER +} diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java index 3dac9dc34e352..8a8972fd8b2d2 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java @@ -765,7 +765,7 @@ public void testCreateTableWithFormatVersion() testWithAllFormatVersions(this::testCreateTableWithFormatVersion); } - private void testCreateTableWithFormatVersion(String formatVersion, String defaultDeleteMode) + protected void testCreateTableWithFormatVersion(String formatVersion, String defaultDeleteMode) { @Language("SQL") String createTable = "" + "CREATE TABLE test_create_table_with_format_version_" + formatVersion + " " + diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index 824a519ef6866..f96c395a3534e 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -1385,7 +1385,7 @@ private void writeEqualityDeleteToNationTable(Table icebergTable, Map getConnectorProperties(CatalogType icebergCat Path testDataDirectory = icebergDataDirectory.resolve(TEST_DATA_DIRECTORY); switch (icebergCatalogType) { case HADOOP: + case REST: case NESSIE: return ImmutableMap.of("iceberg.catalog.warehouse", testDataDirectory.getParent().toFile().toURI().toString()); case HIVE: diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeHadoop.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeHadoop.java index ed10e14681896..537d70b5c966e 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeHadoop.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeHadoop.java @@ -20,9 +20,8 @@ import com.facebook.presto.iceberg.IcebergCatalogName; import com.facebook.presto.iceberg.IcebergConfig; import com.facebook.presto.iceberg.IcebergDistributedSmokeTestBase; -import com.facebook.presto.iceberg.IcebergResourceFactory; +import com.facebook.presto.iceberg.IcebergNativeCatalogFactory; import com.facebook.presto.iceberg.IcebergUtil; -import com.facebook.presto.iceberg.nessie.NessieConfig; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.tests.DistributedQueryRunner; @@ -65,13 +64,12 @@ protected Table getIcebergTable(ConnectorSession session, String schema, String icebergConfig.setCatalogType(HADOOP); icebergConfig.setCatalogWarehouse(getCatalogDirectory().toFile().getPath()); - IcebergResourceFactory resourceFactory = new IcebergResourceFactory(icebergConfig, + IcebergNativeCatalogFactory catalogFactory = new IcebergNativeCatalogFactory(icebergConfig, new IcebergCatalogName(ICEBERG_CATALOG), - new NessieConfig(), new PrestoS3ConfigurationUpdater(new HiveS3Config()), new HiveGcsConfigurationInitializer(new HiveGcsConfig())); - return IcebergUtil.getNativeIcebergTable(resourceFactory, + return IcebergUtil.getNativeIcebergTable(catalogFactory, session, SchemaTableName.valueOf(schema + "." + tableName)); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSmokeNessie.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSmokeNessie.java index 91e3c2214d43d..c0ce4f6c175ae 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSmokeNessie.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSmokeNessie.java @@ -20,8 +20,8 @@ import com.facebook.presto.iceberg.IcebergCatalogName; import com.facebook.presto.iceberg.IcebergConfig; import com.facebook.presto.iceberg.IcebergDistributedSmokeTestBase; +import com.facebook.presto.iceberg.IcebergNativeCatalogFactory; import com.facebook.presto.iceberg.IcebergQueryRunner; -import com.facebook.presto.iceberg.IcebergResourceFactory; import com.facebook.presto.iceberg.IcebergUtil; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.SchemaTableName; @@ -102,15 +102,15 @@ protected Table getIcebergTable(ConnectorSession session, String schema, String icebergConfig.setCatalogType(NESSIE); icebergConfig.setCatalogWarehouse(getCatalogDirectory().toFile().getPath()); - NessieConfig nessieConfig = new NessieConfig().setServerUri(nessieContainer.getRestApiUri()); + IcebergNessieConfig nessieConfig = new IcebergNessieConfig().setServerUri(nessieContainer.getRestApiUri()); - IcebergResourceFactory resourceFactory = new IcebergResourceFactory(icebergConfig, - new IcebergCatalogName(ICEBERG_CATALOG), + IcebergNativeCatalogFactory catalogFactory = new IcebergNessieCatalogFactory(icebergConfig, nessieConfig, + new IcebergCatalogName(ICEBERG_CATALOG), new PrestoS3ConfigurationUpdater(new HiveS3Config()), new HiveGcsConfigurationInitializer(new HiveGcsConfig())); - return IcebergUtil.getNativeIcebergTable(resourceFactory, + return IcebergUtil.getNativeIcebergTable(catalogFactory, session, SchemaTableName.valueOf(schema + "." + tableName)); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestNessieConfig.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestNessieConfig.java index f76bd1d8c1208..92c89807f52ef 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestNessieConfig.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestNessieConfig.java @@ -27,7 +27,7 @@ public class TestNessieConfig @Test public void testDefaults() { - assertRecordedDefaults(recordDefaults(NessieConfig.class) + assertRecordedDefaults(recordDefaults(IcebergNessieConfig.class) .setClientBuilderImpl(null) .setAuthenticationType(null) .setBearerToken(null) @@ -57,7 +57,7 @@ public void testExplicitPropertyMappings() .put("iceberg.nessie.client-builder-impl", "org.projectnessie.example.ClientBuilderImpl") .build(); - NessieConfig expected = new NessieConfig() + IcebergNessieConfig expected = new IcebergNessieConfig() .setServerUri("http://localhost:xxx/api/v1") .setDefaultReferenceName("someRef") .setAuthenticationType(AuthenticationType.BASIC) diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java new file mode 100644 index 0000000000000..d9edd97ad81d7 --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java @@ -0,0 +1,130 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.rest; + +import com.facebook.airlift.bootstrap.Bootstrap; +import com.facebook.airlift.http.server.TheServlet; +import com.facebook.airlift.http.server.testing.TestingHttpServer; +import com.facebook.airlift.http.server.testing.TestingHttpServerModule; +import com.facebook.airlift.node.NodeInfo; +import com.facebook.presto.hive.HdfsContext; +import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.testing.TestingConnectorSession; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.TypeLiteral; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.jdbc.JdbcCatalog; +import org.apache.iceberg.rest.IcebergRestCatalogServlet; +import org.apache.iceberg.rest.RESTCatalogAdapter; +import org.apache.iceberg.rest.RESTSessionCatalog; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +import static com.facebook.presto.iceberg.CatalogType.REST; +import static com.facebook.presto.iceberg.IcebergDistributedTestBase.getHdfsEnvironment; +import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.CatalogProperties.URI; +import static org.apache.iceberg.CatalogProperties.WAREHOUSE_LOCATION; + +public class IcebergRestTestUtil +{ + public static final ConnectorSession SESSION = new TestingConnectorSession(ImmutableList.of()); + + private IcebergRestTestUtil() + { + } + + public static Map restConnectorProperties(String serverUri) + { + return ImmutableMap.of("iceberg.catalog.type", REST.name(), "iceberg.rest.uri", serverUri); + } + + public static TestingHttpServer getRestServer(String location) + { + JdbcCatalog backingCatalog = new JdbcCatalog(); + HdfsEnvironment hdfsEnvironment = getHdfsEnvironment(); + backingCatalog.setConf(hdfsEnvironment.getConfiguration(new HdfsContext(SESSION), new Path(location))); + + Map properties = ImmutableMap.builder() + .put(URI, "jdbc:h2:mem:test_" + System.nanoTime() + "_" + ThreadLocalRandom.current().nextInt()) + .put(WAREHOUSE_LOCATION, location) + .put("jdbc.username", "user") + .put("jdbc.password", "password") + .build(); + backingCatalog.initialize("rest_jdbc_backend", properties); + + DelegateRestSessionCatalog delegate = new DelegateRestSessionCatalog(new RESTCatalogAdapter(backingCatalog), backingCatalog); + return delegate.getServerInstance(); + } + + public static class DelegateRestSessionCatalog + extends RESTSessionCatalog + { + public RESTCatalogAdapter adapter; + private final Catalog delegate; + + public DelegateRestSessionCatalog(RESTCatalogAdapter adapter, Catalog delegate) + { + super(properties -> adapter, null); + this.adapter = requireNonNull(adapter, "adapter is null"); + this.delegate = requireNonNull(delegate, "delegate catalog is null"); + } + + @Override + public void close() + throws IOException + { + super.close(); + adapter.close(); + + if (delegate instanceof Closeable) { + ((Closeable) delegate).close(); + } + } + + public TestingHttpServer getServerInstance() + { + Bootstrap app = new Bootstrap( + new TestingHttpServerModule(), + new RestHttpServerModule()); + + Injector injector = app + .doNotInitializeLogging() + .initialize(); + + return injector.getInstance(TestingHttpServer.class); + } + + private class RestHttpServerModule + implements Module + { + @Override + public void configure(Binder binder) + { + binder.bind(new TypeLiteral>() {}).annotatedWith(TheServlet.class).toInstance(ImmutableMap.of()); + binder.bind(javax.servlet.Servlet.class).annotatedWith(TheServlet.class).toInstance(new IcebergRestCatalogServlet(adapter)); + binder.bind(NodeInfo.class).toInstance(new NodeInfo("test")); + } + } + } +} diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergDistributedRest.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergDistributedRest.java new file mode 100644 index 0000000000000..ff8474bf7394e --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergDistributedRest.java @@ -0,0 +1,100 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.rest; + +import com.facebook.airlift.http.server.testing.TestingHttpServer; +import com.facebook.presto.iceberg.IcebergDistributedTestBase; +import com.facebook.presto.testing.QueryRunner; +import com.google.common.collect.ImmutableMap; +import org.assertj.core.util.Files; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.File; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; + +import static com.facebook.presto.iceberg.CatalogType.REST; +import static com.facebook.presto.iceberg.FileFormat.PARQUET; +import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner; +import static com.facebook.presto.iceberg.rest.IcebergRestTestUtil.getRestServer; +import static com.facebook.presto.iceberg.rest.IcebergRestTestUtil.restConnectorProperties; +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; + +@Test +public class TestIcebergDistributedRest + extends IcebergDistributedTestBase +{ + private TestingHttpServer restServer; + private String serverUri; + private File warehouseLocation; + + protected TestIcebergDistributedRest() + { + super(REST); + } + + @Override + protected boolean supportsViews() + { + return false; + } + + @Override + protected Map getProperties() + { + return ImmutableMap.of("uri", serverUri); + } + + @BeforeClass + @Override + public void init() + throws Exception + { + warehouseLocation = Files.newTemporaryFolder(); + + restServer = getRestServer(warehouseLocation.getAbsolutePath()); + restServer.start(); + + serverUri = restServer.getBaseUrl().toString(); + super.init(); + } + + @AfterClass + public void tearDown() + throws Exception + { + if (restServer != null) { + restServer.stop(); + } + deleteRecursively(warehouseLocation.toPath(), ALLOW_INSECURE); + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return createIcebergQueryRunner( + ImmutableMap.of(), + restConnectorProperties(serverUri), + PARQUET, + true, + false, + OptionalInt.empty(), + Optional.of(warehouseLocation.toPath())); + } +} diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergRestConfig.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergRestConfig.java new file mode 100644 index 0000000000000..730a58d862c5b --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergRestConfig.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.rest; + +import com.facebook.airlift.configuration.testing.ConfigAssertions; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.Map; + +import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static com.facebook.presto.iceberg.rest.AuthenticationType.OAUTH2; +import static com.facebook.presto.iceberg.rest.SessionType.USER; + +public class TestIcebergRestConfig +{ + @Test + public void testDefaults() + { + assertRecordedDefaults(ConfigAssertions.recordDefaults(IcebergRestConfig.class) + .setServerUri(null) + .setAuthenticationType(null) + .setCredential(null) + .setToken(null) + .setSessionType(null)); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = ImmutableMap.builder() + .put("iceberg.rest.uri", "http://localhost:xxx") + .put("iceberg.rest.auth.type", "OAUTH2") + .put("iceberg.rest.auth.oauth2.credential", "key:secret") + .put("iceberg.rest.auth.oauth2.token", "SXVLUXUhIExFQ0tFUiEK") + .put("iceberg.rest.session.type", "USER") + .build(); + + IcebergRestConfig expected = new IcebergRestConfig() + .setServerUri("http://localhost:xxx") + .setAuthenticationType(OAUTH2) + .setCredential("key:secret") + .setToken("SXVLUXUhIExFQ0tFUiEK") + .setSessionType(USER); + + assertFullMapping(properties, expected); + } +} diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java new file mode 100644 index 0000000000000..3298c57ce0756 --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java @@ -0,0 +1,167 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.rest; + +import com.facebook.airlift.http.server.testing.TestingHttpServer; +import com.facebook.presto.hive.NodeVersion; +import com.facebook.presto.hive.gcs.HiveGcsConfig; +import com.facebook.presto.hive.gcs.HiveGcsConfigurationInitializer; +import com.facebook.presto.hive.s3.HiveS3Config; +import com.facebook.presto.hive.s3.PrestoS3ConfigurationUpdater; +import com.facebook.presto.iceberg.IcebergCatalogName; +import com.facebook.presto.iceberg.IcebergConfig; +import com.facebook.presto.iceberg.IcebergDistributedSmokeTestBase; +import com.facebook.presto.iceberg.IcebergNativeCatalogFactory; +import com.facebook.presto.iceberg.IcebergQueryRunner; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.testing.QueryRunner; +import com.google.common.collect.ImmutableMap; +import org.apache.iceberg.Table; +import org.assertj.core.util.Files; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.File; +import java.util.Optional; +import java.util.OptionalInt; + +import static com.facebook.presto.iceberg.CatalogType.REST; +import static com.facebook.presto.iceberg.FileFormat.PARQUET; +import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; +import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable; +import static com.facebook.presto.iceberg.rest.IcebergRestTestUtil.getRestServer; +import static com.facebook.presto.iceberg.rest.IcebergRestTestUtil.restConnectorProperties; +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +@Test +public class TestIcebergSmokeRest + extends IcebergDistributedSmokeTestBase +{ + private File warehouseLocation; + private TestingHttpServer restServer; + private String serverUri; + + public TestIcebergSmokeRest() + { + super(REST); + } + + @BeforeClass + @Override + public void init() + throws Exception + { + warehouseLocation = Files.newTemporaryFolder(); + + restServer = getRestServer(warehouseLocation.getAbsolutePath()); + restServer.start(); + + serverUri = restServer.getBaseUrl().toString(); + super.init(); + } + + @AfterClass + public void tearDown() + throws Exception + { + if (restServer != null) { + restServer.stop(); + } + deleteRecursively(warehouseLocation.toPath(), ALLOW_INSECURE); + } + + @Override + protected String getLocation(String schema, String table) + { + return format("%s/%s/%s", warehouseLocation, schema, table); + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return IcebergQueryRunner.createIcebergQueryRunner( + ImmutableMap.of(), + restConnectorProperties(serverUri), + PARQUET, + true, + false, + OptionalInt.empty(), + Optional.of(warehouseLocation.toPath())); + } + + protected IcebergNativeCatalogFactory getCatalogFactory() + { + IcebergConfig icebergConfig = new IcebergConfig() + .setCatalogType(REST) + .setCatalogWarehouse(warehouseLocation.getAbsolutePath().toString()); + IcebergRestConfig restConfig = new IcebergRestConfig().setServerUri(serverUri); + + return new IcebergRestCatalogFactory( + icebergConfig, + restConfig, + new IcebergCatalogName(ICEBERG_CATALOG), + new PrestoS3ConfigurationUpdater(new HiveS3Config()), + new HiveGcsConfigurationInitializer(new HiveGcsConfig()), + new NodeVersion("test_version")); + } + + @Override + protected Table getIcebergTable(ConnectorSession session, String schema, String tableName) + { + return getNativeIcebergTable(getCatalogFactory(), + session, + SchemaTableName.valueOf(schema + "." + tableName)); + } + + @Test + public void testDeleteOnPartitionedV1Table() + { + // v1 table create fails due to Iceberg REST catalog bug (see: https://github.com/apache/iceberg/issues/8756) + assertThatThrownBy(super::testDeleteOnPartitionedV1Table) + .isInstanceOf(RuntimeException.class) + .hasMessageMatching("Cannot downgrade v2 table to v1"); + } + + @Test + public void testCreateTableWithFormatVersion() + { + // v1 table create fails due to Iceberg REST catalog bug (see: https://github.com/apache/iceberg/issues/8756) + assertThatThrownBy(() -> super.testMetadataDeleteOnNonIdentityPartitionColumn("1", "copy-on-write")) + .isInstanceOf(RuntimeException.class) + .hasMessageMatching("Cannot downgrade v2 table to v1"); + + // v2 succeeds + super.testCreateTableWithFormatVersion("2", "merge-on-read"); + } + + @Test(dataProvider = "version_and_mode") + public void testMetadataDeleteOnNonIdentityPartitionColumn(String version, String mode) + { + if (version.equals("1")) { + // v1 table create fails due to Iceberg REST catalog bug (see: https://github.com/apache/iceberg/issues/8756) + assertThatThrownBy(() -> super.testMetadataDeleteOnNonIdentityPartitionColumn(version, mode)) + .isInstanceOf(RuntimeException.class); + } + else { + // v2 succeeds + super.testMetadataDeleteOnNonIdentityPartitionColumn(version, mode); + } + } +} diff --git a/presto-iceberg/src/test/java/org/apache/iceberg/rest/IcebergRestCatalogServlet.java b/presto-iceberg/src/test/java/org/apache/iceberg/rest/IcebergRestCatalogServlet.java new file mode 100644 index 0000000000000..e9aa46a589683 --- /dev/null +++ b/presto-iceberg/src/test/java/org/apache/iceberg/rest/IcebergRestCatalogServlet.java @@ -0,0 +1,252 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iceberg.rest; + +import com.facebook.airlift.log.Logger; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.iceberg.exceptions.RESTException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.io.CharStreams; +import org.apache.iceberg.rest.RESTCatalogAdapter.HTTPMethod; +import org.apache.iceberg.rest.RESTCatalogAdapter.Route; +import org.apache.iceberg.rest.responses.ErrorResponse; +import org.apache.iceberg.util.Pair; + +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static java.lang.String.format; +import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; + +/** + * The IcebergRestCatalogServlet provides a servlet implementation used in combination with a + * RESTCatalogAdaptor to proxy the REST Spec to any Catalog implementation. + */ +public class IcebergRestCatalogServlet + extends HttpServlet +{ + private static final Logger LOG = Logger.get(IcebergRestCatalogServlet.class); + + private final RESTCatalogAdapter restCatalogAdapter; + private final Map responseHeaders = + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); + + public IcebergRestCatalogServlet(RESTCatalogAdapter restCatalogAdapter) + { + this.restCatalogAdapter = restCatalogAdapter; + } + + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) + throws IOException + { + execute(ServletRequestContext.from(request), response); + } + + @Override + protected void doHead(HttpServletRequest request, HttpServletResponse response) + throws IOException + { + execute(ServletRequestContext.from(request), response); + } + + @Override + protected void doPost(HttpServletRequest request, HttpServletResponse response) + throws IOException + { + execute(ServletRequestContext.from(request), response); + } + + @Override + protected void doDelete(HttpServletRequest request, HttpServletResponse response) + throws IOException + { + execute(ServletRequestContext.from(request), response); + } + + protected void execute(ServletRequestContext context, HttpServletResponse response) + throws IOException + { + response.setStatus(HttpServletResponse.SC_OK); + responseHeaders.forEach(response::setHeader); + + if (context.error().isPresent()) { + response.setStatus(HttpServletResponse.SC_BAD_REQUEST); + RESTObjectMapper.mapper().writeValue(response.getWriter(), context.error().get()); + return; + } + + try { + Object responseBody = + restCatalogAdapter.execute( + context.method(), + context.path(), + context.queryParams(), + context.body(), + context.route().responseClass(), + context.headers(), + handle(response)); + + if (responseBody != null) { + RESTObjectMapper.mapper().writeValue(response.getWriter(), responseBody); + } + } + catch (RESTException e) { + if (context.route() == Route.LOAD_TABLE && e.getLocalizedMessage().contains("NoSuchTableException")) { + // Suppress stack trace for load_table requests, most of which occur immediately + // preceding a create_table request + LOG.warn("Table at endpoint %s does not exist", context.path()); + } + else { + LOG.error(e, "Error processing REST request at endpoint %s", context.path()); + } + response.setStatus(SC_INTERNAL_SERVER_ERROR); + } + catch (Exception e) { + LOG.error(e, "Unexpected exception when processing REST request"); + response.setStatus(SC_INTERNAL_SERVER_ERROR); + } + } + + protected Consumer handle(HttpServletResponse response) + { + return (errorResponse) -> { + response.setStatus(errorResponse.code()); + try { + RESTObjectMapper.mapper().writeValue(response.getWriter(), errorResponse); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + }; + } + + public static class ServletRequestContext + { + private HTTPMethod method; + private Route route; + private String path; + private Map headers; + private Map queryParams; + private Object body; + private ErrorResponse errorResponse; + + private ServletRequestContext(ErrorResponse errorResponse) + { + this.errorResponse = errorResponse; + } + + private ServletRequestContext( + HTTPMethod method, + Route route, + String path, + Map headers, + Map queryParams, + Object body) + { + this.method = method; + this.route = route; + this.path = path; + this.headers = headers; + this.queryParams = queryParams; + this.body = body; + } + + static ServletRequestContext from(HttpServletRequest request) + throws IOException + { + HTTPMethod method = HTTPMethod.valueOf(request.getMethod()); + String path = request.getRequestURI().substring(1); + Pair> routeContext = Route.from(method, path); + + if (routeContext == null) { + return new ServletRequestContext( + ErrorResponse.builder() + .responseCode(400) + .withType("BadRequestException") + .withMessage(format("No route for request: %s %s", method, path)) + .build()); + } + + Route route = routeContext.first(); + Object requestBody = null; + if (route.requestClass() != null) { + requestBody = + RESTObjectMapper.mapper().readValue(request.getReader(), route.requestClass()); + } + else if (route == Route.TOKENS) { + try (Reader reader = new InputStreamReader(request.getInputStream())) { + requestBody = RESTUtil.decodeFormData(CharStreams.toString(reader)); + } + } + + Map queryParams = + request.getParameterMap().entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()[0])); + Map headers = + Collections.list(request.getHeaderNames()).stream() + .collect(Collectors.toMap(Function.identity(), request::getHeader)); + + return new ServletRequestContext(method, route, path, headers, queryParams, requestBody); + } + + public HTTPMethod method() + { + return method; + } + + public Route route() + { + return route; + } + + public String path() + { + return path; + } + + public Map headers() + { + return headers; + } + + public Map queryParams() + { + return queryParams; + } + + public Object body() + { + return body; + } + + public Optional error() + { + return Optional.ofNullable(errorResponse); + } + } +}