From 52aa77231a1e3aa4c52e49cda4766a5b47b42fe3 Mon Sep 17 00:00:00 2001 From: Chunxu Tang Date: Wed, 26 Jan 2022 17:07:00 -0800 Subject: [PATCH] Consolidate iceberg native-mode into the catalog type --- .../facebook/presto/iceberg/IcebergConfig.java | 18 ++---------------- .../presto/iceberg/IcebergMetadataFactory.java | 16 +++++++++++----- .../presto/iceberg/IcebergSplitManager.java | 11 ++++++----- .../facebook/presto/iceberg/IcebergUtil.java | 6 ++++++ .../iceberg/RollbackToSnapshotProcedure.java | 7 ++++--- .../presto/iceberg/IcebergQueryRunner.java | 10 +++++----- .../presto/iceberg/TestIcebergConfig.java | 13 +++++-------- 7 files changed, 39 insertions(+), 42 deletions(-) diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java index 5916e216b4d2a..8f9d35343a075 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java @@ -26,15 +26,14 @@ import java.util.List; import static com.facebook.presto.hive.HiveCompressionCodec.GZIP; -import static com.facebook.presto.iceberg.CatalogType.HADOOP; +import static com.facebook.presto.iceberg.CatalogType.HIVE; import static com.facebook.presto.iceberg.IcebergFileFormat.PARQUET; public class IcebergConfig { private IcebergFileFormat fileFormat = PARQUET; private HiveCompressionCodec compressionCodec = GZIP; - private boolean nativeMode; - private CatalogType catalogType = HADOOP; + private CatalogType catalogType = HIVE; private String catalogWarehouse; private String catalogUri; private int catalogCacheSize = 10; @@ -66,19 +65,6 @@ public IcebergConfig setCompressionCodec(HiveCompressionCodec compressionCodec) return this; } - public boolean isNativeMode() - { - return nativeMode; - } - - @Config("iceberg.native-mode") - @ConfigDescription("if use Iceberg connector native catalog mode") - public IcebergConfig setNativeMode(boolean nativeMode) - { - this.nativeMode = nativeMode; - return this; - } - @NotNull public CatalogType getCatalogType() { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergMetadataFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergMetadataFactory.java index 470d07c60db83..6298b25af1153 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergMetadataFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergMetadataFactory.java @@ -17,10 +17,12 @@ import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.hive.HdfsEnvironment; import com.facebook.presto.hive.metastore.ExtendedHiveMetastore; +import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.connector.ConnectorMetadata; import javax.inject.Inject; +import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static java.util.Objects.requireNonNull; public class IcebergMetadataFactory @@ -30,7 +32,7 @@ public class IcebergMetadataFactory private final TypeManager typeManager; private final JsonCodec commitTaskCodec; private final IcebergResourceFactory resourceFactory; - private final boolean nativeCatalogMode; + private final CatalogType catalogType; @Inject public IcebergMetadataFactory( @@ -46,14 +48,18 @@ public IcebergMetadataFactory( this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); - this.nativeCatalogMode = requireNonNull(config, "config is null").isNativeMode(); + requireNonNull(config, "config is null"); + this.catalogType = config.getCatalogType(); } public ConnectorMetadata create() { - if (nativeCatalogMode) { - return new IcebergNativeMetadata(resourceFactory, typeManager, commitTaskCodec); + switch (catalogType) { + case HADOOP: + return new IcebergNativeMetadata(resourceFactory, typeManager, commitTaskCodec); + case HIVE: + return new IcebergMetadata(metastore, hdfsEnvironment, typeManager, commitTaskCodec); } - return new IcebergMetadata(metastore, hdfsEnvironment, typeManager, commitTaskCodec); + throw new PrestoException(NOT_SUPPORTED, "Unsupported Presto Iceberg catalog type " + catalogType); } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java index 1f4c5b2c2ce89..081bd19f26324 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java @@ -27,9 +27,10 @@ import javax.inject.Inject; +import static com.facebook.presto.iceberg.CatalogType.HADOOP; import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression; import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable; -import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier; +import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable; import static java.util.Objects.requireNonNull; public class IcebergSplitManager @@ -38,7 +39,7 @@ public class IcebergSplitManager private final IcebergTransactionManager transactionManager; private final HdfsEnvironment hdfsEnvironment; private final IcebergResourceFactory resourceFactory; - private final boolean nativeCatalogMode; + private final CatalogType catalogType; @Inject public IcebergSplitManager( @@ -51,7 +52,7 @@ public IcebergSplitManager( this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.resourceFactory = requireNonNull(resourceFactory, "resourceFactory is null"); requireNonNull(config, "config is null"); - this.nativeCatalogMode = config.isNativeMode(); + this.catalogType = config.getCatalogType(); } @Override @@ -69,8 +70,8 @@ public ConnectorSplitSource getSplits( } Table icebergTable; - if (nativeCatalogMode) { - icebergTable = resourceFactory.getCatalog(session).loadTable(toIcebergTableIdentifier(table.getSchemaTableName())); + if (catalogType == HADOOP) { + icebergTable = getNativeIcebergTable(resourceFactory, session, table.getSchemaTableName()); } else { ExtendedHiveMetastore metastore = ((IcebergMetadata) transactionManager.get(transaction)).getMetastore(); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java index 033f1858ae0ef..e76188d380be0 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java @@ -45,6 +45,7 @@ import static com.facebook.presto.hive.HiveMetadata.TABLE_COMMENT; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID; import static com.facebook.presto.iceberg.TypeConverter.toPrestoType; +import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Lists.reverse; @@ -81,6 +82,11 @@ public static Table getIcebergTable(ExtendedHiveMetastore metastore, HdfsEnviron return new BaseTable(operations, quotedTableName(table)); } + public static Table getNativeIcebergTable(IcebergResourceFactory resourceFactory, ConnectorSession session, SchemaTableName table) + { + return resourceFactory.getCatalog(session).loadTable(toIcebergTableIdentifier(table)); + } + public static long resolveSnapshotId(Table table, long snapshotId) { if (table.snapshot(snapshotId) != null) { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/RollbackToSnapshotProcedure.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/RollbackToSnapshotProcedure.java index e03d54f5294bb..8d562a172b43b 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/RollbackToSnapshotProcedure.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/RollbackToSnapshotProcedure.java @@ -30,6 +30,7 @@ import static com.facebook.presto.common.block.MethodHandleUtil.methodHandle; import static com.facebook.presto.common.type.StandardTypes.BIGINT; import static com.facebook.presto.common.type.StandardTypes.VARCHAR; +import static com.facebook.presto.iceberg.CatalogType.HADOOP; import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable; import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier; import static java.util.Objects.requireNonNull; @@ -48,7 +49,7 @@ public class RollbackToSnapshotProcedure private final IcebergMetadataFactory metadataFactory; private final HdfsEnvironment hdfsEnvironment; private final IcebergResourceFactory resourceFactory; - private final boolean nativeCatalogMode; + private final CatalogType catalogType; @Inject public RollbackToSnapshotProcedure( @@ -61,7 +62,7 @@ public RollbackToSnapshotProcedure( this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.resourceFactory = requireNonNull(resourceFactory, "resourceFactory is null"); requireNonNull(config, "config is null"); - this.nativeCatalogMode = config.isNativeMode(); + this.catalogType = config.getCatalogType(); } @Override @@ -82,7 +83,7 @@ public void rollbackToSnapshot(ConnectorSession clientSession, String schema, St SchemaTableName schemaTableName = new SchemaTableName(schema, table); ConnectorMetadata metadata = metadataFactory.create(); Table icebergTable; - if (nativeCatalogMode) { + if (catalogType == HADOOP) { icebergTable = resourceFactory.getCatalog(clientSession).loadTable(toIcebergTableIdentifier(schema, table)); } else { diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java index c3c34c67a2255..35faea5fecacd 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java @@ -37,16 +37,16 @@ public final class IcebergQueryRunner private IcebergQueryRunner() {} - public static DistributedQueryRunner createIcebergQueryRunner(Map extraProperties) throws Exception + public static DistributedQueryRunner createIcebergQueryRunner(Map extraProperties) + throws Exception { return createIcebergQueryRunner(extraProperties, ImmutableMap.of()); } - public static DistributedQueryRunner createNativeIcebergQueryRunner(Map extraProperties, CatalogType catalogType) throws Exception + public static DistributedQueryRunner createNativeIcebergQueryRunner(Map extraProperties, CatalogType catalogType) + throws Exception { - return createIcebergQueryRunner(extraProperties, ImmutableMap.of( - "iceberg.native-mode", "true", - "iceberg.catalog.type", catalogType.name())); + return createIcebergQueryRunner(extraProperties, ImmutableMap.of("iceberg.catalog.type", catalogType.name())); } public static DistributedQueryRunner createIcebergQueryRunner(Map extraProperties, Map extraConnectorProperties) diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java index cb72b66bea3f9..525467b5194a4 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java @@ -13,7 +13,6 @@ */ package com.facebook.presto.iceberg; -import com.facebook.presto.hive.HiveCompressionCodec; import com.google.common.collect.ImmutableMap; import org.testng.annotations.Test; @@ -23,6 +22,7 @@ import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static com.facebook.airlift.configuration.testing.ConfigAssertions.recordDefaults; import static com.facebook.presto.hive.HiveCompressionCodec.GZIP; +import static com.facebook.presto.hive.HiveCompressionCodec.NONE; import static com.facebook.presto.iceberg.CatalogType.HADOOP; import static com.facebook.presto.iceberg.CatalogType.HIVE; import static com.facebook.presto.iceberg.IcebergFileFormat.ORC; @@ -36,8 +36,7 @@ public void testDefaults() assertRecordedDefaults(recordDefaults(IcebergConfig.class) .setFileFormat(PARQUET) .setCompressionCodec(GZIP) - .setNativeMode(false) - .setCatalogType(HADOOP) + .setCatalogType(HIVE) .setCatalogWarehouse(null) .setCatalogUri(null) .setCatalogCacheSize(10) @@ -50,8 +49,7 @@ public void testExplicitPropertyMappings() Map properties = new ImmutableMap.Builder() .put("iceberg.file-format", "ORC") .put("iceberg.compression-codec", "NONE") - .put("iceberg.native-mode", "true") - .put("iceberg.catalog.type", "HIVE") + .put("iceberg.catalog.type", "HADOOP") .put("iceberg.catalog.warehouse", "path") .put("iceberg.catalog.uri", "uri") .put("iceberg.catalog.cached-catalog-num", "6") @@ -60,9 +58,8 @@ public void testExplicitPropertyMappings() IcebergConfig expected = new IcebergConfig() .setFileFormat(ORC) - .setCompressionCodec(HiveCompressionCodec.NONE) - .setNativeMode(true) - .setCatalogType(HIVE) + .setCompressionCodec(NONE) + .setCatalogType(HADOOP) .setCatalogWarehouse("path") .setCatalogUri("uri") .setCatalogCacheSize(6)