Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@
import java.util.List;

import static com.facebook.presto.hive.HiveCompressionCodec.GZIP;
import static com.facebook.presto.iceberg.CatalogType.HADOOP;
import static com.facebook.presto.iceberg.CatalogType.HIVE;
import static com.facebook.presto.iceberg.IcebergFileFormat.PARQUET;

public class IcebergConfig
{
private IcebergFileFormat fileFormat = PARQUET;
private HiveCompressionCodec compressionCodec = GZIP;
private boolean nativeMode;
private CatalogType catalogType = HADOOP;
private CatalogType catalogType = HIVE;
private String catalogWarehouse;
private String catalogUri;
private int catalogCacheSize = 10;
Expand Down Expand Up @@ -66,19 +65,6 @@ public IcebergConfig setCompressionCodec(HiveCompressionCodec compressionCodec)
return this;
}

public boolean isNativeMode()
{
return nativeMode;
}

@Config("iceberg.native-mode")
@ConfigDescription("if use Iceberg connector native catalog mode")
public IcebergConfig setNativeMode(boolean nativeMode)
{
this.nativeMode = nativeMode;
return this;
}

@NotNull
public CatalogType getCatalogType()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.connector.ConnectorMetadata;

import javax.inject.Inject;

import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.Objects.requireNonNull;

public class IcebergMetadataFactory
Expand All @@ -30,7 +32,7 @@ public class IcebergMetadataFactory
private final TypeManager typeManager;
private final JsonCodec<CommitTaskData> commitTaskCodec;
private final IcebergResourceFactory resourceFactory;
private final boolean nativeCatalogMode;
private final CatalogType catalogType;

@Inject
public IcebergMetadataFactory(
Expand All @@ -46,14 +48,18 @@ public IcebergMetadataFactory(
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.nativeCatalogMode = requireNonNull(config, "config is null").isNativeMode();
requireNonNull(config, "config is null");
this.catalogType = config.getCatalogType();
}

public ConnectorMetadata create()
{
if (nativeCatalogMode) {
return new IcebergNativeMetadata(resourceFactory, typeManager, commitTaskCodec);
switch (catalogType) {
case HADOOP:
return new IcebergNativeMetadata(resourceFactory, typeManager, commitTaskCodec);
case HIVE:
return new IcebergMetadata(metastore, hdfsEnvironment, typeManager, commitTaskCodec);
}
return new IcebergMetadata(metastore, hdfsEnvironment, typeManager, commitTaskCodec);
throw new PrestoException(NOT_SUPPORTED, "Unsupported Presto Iceberg catalog type " + catalogType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@

import javax.inject.Inject;

import static com.facebook.presto.iceberg.CatalogType.HADOOP;
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier;
import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable;
import static java.util.Objects.requireNonNull;

public class IcebergSplitManager
Expand All @@ -38,7 +39,7 @@ public class IcebergSplitManager
private final IcebergTransactionManager transactionManager;
private final HdfsEnvironment hdfsEnvironment;
private final IcebergResourceFactory resourceFactory;
private final boolean nativeCatalogMode;
private final CatalogType catalogType;

@Inject
public IcebergSplitManager(
Expand All @@ -51,7 +52,7 @@ public IcebergSplitManager(
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.resourceFactory = requireNonNull(resourceFactory, "resourceFactory is null");
requireNonNull(config, "config is null");
this.nativeCatalogMode = config.isNativeMode();
this.catalogType = config.getCatalogType();
}

@Override
Expand All @@ -69,8 +70,8 @@ public ConnectorSplitSource getSplits(
}

Table icebergTable;
if (nativeCatalogMode) {
icebergTable = resourceFactory.getCatalog(session).loadTable(toIcebergTableIdentifier(table.getSchemaTableName()));
if (catalogType == HADOOP) {
icebergTable = getNativeIcebergTable(resourceFactory, session, table.getSchemaTableName());
}
else {
ExtendedHiveMetastore metastore = ((IcebergMetadata) transactionManager.get(transaction)).getMetastore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static com.facebook.presto.hive.HiveMetadata.TABLE_COMMENT;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Lists.reverse;
Expand Down Expand Up @@ -81,6 +82,11 @@ public static Table getIcebergTable(ExtendedHiveMetastore metastore, HdfsEnviron
return new BaseTable(operations, quotedTableName(table));
}

public static Table getNativeIcebergTable(IcebergResourceFactory resourceFactory, ConnectorSession session, SchemaTableName table)
{
return resourceFactory.getCatalog(session).loadTable(toIcebergTableIdentifier(table));
}

public static long resolveSnapshotId(Table table, long snapshotId)
{
if (table.snapshot(snapshotId) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static com.facebook.presto.common.block.MethodHandleUtil.methodHandle;
import static com.facebook.presto.common.type.StandardTypes.BIGINT;
import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
import static com.facebook.presto.iceberg.CatalogType.HADOOP;
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier;
import static java.util.Objects.requireNonNull;
Expand All @@ -48,7 +49,7 @@ public class RollbackToSnapshotProcedure
private final IcebergMetadataFactory metadataFactory;
private final HdfsEnvironment hdfsEnvironment;
private final IcebergResourceFactory resourceFactory;
private final boolean nativeCatalogMode;
private final CatalogType catalogType;

@Inject
public RollbackToSnapshotProcedure(
Expand All @@ -61,7 +62,7 @@ public RollbackToSnapshotProcedure(
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.resourceFactory = requireNonNull(resourceFactory, "resourceFactory is null");
requireNonNull(config, "config is null");
this.nativeCatalogMode = config.isNativeMode();
this.catalogType = config.getCatalogType();
}

@Override
Expand All @@ -82,7 +83,7 @@ public void rollbackToSnapshot(ConnectorSession clientSession, String schema, St
SchemaTableName schemaTableName = new SchemaTableName(schema, table);
ConnectorMetadata metadata = metadataFactory.create();
Table icebergTable;
if (nativeCatalogMode) {
if (catalogType == HADOOP) {
icebergTable = resourceFactory.getCatalog(clientSession).loadTable(toIcebergTableIdentifier(schema, table));
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,16 @@ public final class IcebergQueryRunner

private IcebergQueryRunner() {}

public static DistributedQueryRunner createIcebergQueryRunner(Map<String, String> extraProperties) throws Exception
public static DistributedQueryRunner createIcebergQueryRunner(Map<String, String> extraProperties)
throws Exception
{
return createIcebergQueryRunner(extraProperties, ImmutableMap.of());
}

public static DistributedQueryRunner createNativeIcebergQueryRunner(Map<String, String> extraProperties, CatalogType catalogType) throws Exception
public static DistributedQueryRunner createNativeIcebergQueryRunner(Map<String, String> extraProperties, CatalogType catalogType)
throws Exception
{
return createIcebergQueryRunner(extraProperties, ImmutableMap.of(
"iceberg.native-mode", "true",
"iceberg.catalog.type", catalogType.name()));
return createIcebergQueryRunner(extraProperties, ImmutableMap.of("iceberg.catalog.type", catalogType.name()));
}

public static DistributedQueryRunner createIcebergQueryRunner(Map<String, String> extraProperties, Map<String, String> extraConnectorProperties)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.hive.HiveCompressionCodec;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;

Expand All @@ -23,6 +22,7 @@
import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static com.facebook.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static com.facebook.presto.hive.HiveCompressionCodec.GZIP;
import static com.facebook.presto.hive.HiveCompressionCodec.NONE;
import static com.facebook.presto.iceberg.CatalogType.HADOOP;
import static com.facebook.presto.iceberg.CatalogType.HIVE;
import static com.facebook.presto.iceberg.IcebergFileFormat.ORC;
Expand All @@ -36,8 +36,7 @@ public void testDefaults()
assertRecordedDefaults(recordDefaults(IcebergConfig.class)
.setFileFormat(PARQUET)
.setCompressionCodec(GZIP)
.setNativeMode(false)
.setCatalogType(HADOOP)
.setCatalogType(HIVE)
.setCatalogWarehouse(null)
.setCatalogUri(null)
.setCatalogCacheSize(10)
Expand All @@ -50,8 +49,7 @@ public void testExplicitPropertyMappings()
Map<String, String> properties = new ImmutableMap.Builder<String, String>()
.put("iceberg.file-format", "ORC")
.put("iceberg.compression-codec", "NONE")
.put("iceberg.native-mode", "true")
.put("iceberg.catalog.type", "HIVE")
.put("iceberg.catalog.type", "HADOOP")
.put("iceberg.catalog.warehouse", "path")
.put("iceberg.catalog.uri", "uri")
.put("iceberg.catalog.cached-catalog-num", "6")
Expand All @@ -60,9 +58,8 @@ public void testExplicitPropertyMappings()

IcebergConfig expected = new IcebergConfig()
.setFileFormat(ORC)
.setCompressionCodec(HiveCompressionCodec.NONE)
.setNativeMode(true)
.setCatalogType(HIVE)
.setCompressionCodec(NONE)
.setCatalogType(HADOOP)
.setCatalogWarehouse("path")
.setCatalogUri("uri")
.setCatalogCacheSize(6)
Expand Down