Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ public class HiveConfig

private boolean projectionPushdownEnabled = true;

private boolean redirectToIcebergEnabled;
private String redirectToIcebergCatalog = "iceberg";

public int getMaxInitialSplits()
{
return maxInitialSplits;
Expand Down Expand Up @@ -977,4 +980,31 @@ public HiveConfig setProjectionPushdownEnabled(boolean projectionPushdownEnabled
this.projectionPushdownEnabled = projectionPushdownEnabled;
return this;
}

public boolean isRedirectToIcebergEnabled()
{
return redirectToIcebergEnabled;
}

@Config("hive.redirect-to-iceberg-enabled")
@ConfigDescription("Redirect to a catalog configured with Iceberg Connector")
public HiveConfig setRedirectToIcebergEnabled(boolean redirectToIcebergEnabled)
{
this.redirectToIcebergEnabled = redirectToIcebergEnabled;
return this;
}

@NotNull
public String getRedirectToIcebergCatalog()
{
return redirectToIcebergCatalog;
}

@Config("hive.redirect-to-iceberg-catalog")
@ConfigDescription("The Iceberg catalog to redirect to")
public HiveConfig setRedirectToIcebergCatalog(String redirectToIcebergCatalog)
{
this.redirectToIcebergCatalog = redirectToIcebergCatalog;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand Down Expand Up @@ -159,11 +161,13 @@
import static io.prestosql.plugin.hive.HivePartitionManager.extractPartitionValues;
import static io.prestosql.plugin.hive.HiveSessionProperties.getCompressionCodec;
import static io.prestosql.plugin.hive.HiveSessionProperties.getHiveStorageFormat;
import static io.prestosql.plugin.hive.HiveSessionProperties.getRedirectToIcebergCatalog;
import static io.prestosql.plugin.hive.HiveSessionProperties.isBucketExecutionEnabled;
import static io.prestosql.plugin.hive.HiveSessionProperties.isCollectColumnStatisticsOnWrite;
import static io.prestosql.plugin.hive.HiveSessionProperties.isCreateEmptyBucketFiles;
import static io.prestosql.plugin.hive.HiveSessionProperties.isOptimizedMismatchedBucketCount;
import static io.prestosql.plugin.hive.HiveSessionProperties.isProjectionPushdownEnabled;
import static io.prestosql.plugin.hive.HiveSessionProperties.isRedirectToIcebergEnabled;
import static io.prestosql.plugin.hive.HiveSessionProperties.isRespectTableFormat;
import static io.prestosql.plugin.hive.HiveSessionProperties.isSortedWritingEnabled;
import static io.prestosql.plugin.hive.HiveSessionProperties.isStatisticsEnabled;
Expand Down Expand Up @@ -279,6 +283,8 @@ public class HiveMetadata
public static final String AVRO_SCHEMA_URL_KEY = "avro.schema.url";
public static final String SPARK_TABLE_PROVIDER_KEY = "spark.sql.sources.provider";
public static final String DELTA_LAKE_PROVIDER = "delta";
public static final String TABLE_TYPE_PROP = "table_type";
public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";

private static final String CSV_SEPARATOR_KEY = OpenCSVSerde.SEPARATORCHAR;
private static final String CSV_QUOTE_KEY = OpenCSVSerde.QUOTECHAR;
Expand All @@ -298,6 +304,12 @@ public class HiveMetadata
private final HiveStatisticsProvider hiveStatisticsProvider;
private final AccessControlMetadata accessControlMetadata;

// Copied from IcebergTableHandle
private static final Pattern ICEBERG_TABLE_NAME_PATTERN = Pattern.compile("" +
Comment thread
electrum marked this conversation as resolved.
Outdated
"(?<table>[^$@]+)" +
"(?:@(?<ver1>[0-9]+))?" +
"(?:\\$(?<type>[^@]+)(?:@(?<ver2>[0-9]+))?)?");

public HiveMetadata(
CatalogName catalogName,
SemiTransactionalHiveMetastore metastore,
Expand Down Expand Up @@ -2654,6 +2666,37 @@ public void cleanupQuery(ConnectorSession session)
metastore.cleanupQuery(session);
}

@Override
public Optional<String> redirectCatalog(ConnectorSession session, SchemaTableName tableName)
{
if (!isRedirectToIcebergEnabled(session)) {
return Optional.empty();
}
if (!filterSchema(tableName.getSchemaName())) {
return Optional.empty();
}
String table = tableName.getTableName();
Matcher icebergNameMatch = ICEBERG_TABLE_NAME_PATTERN.matcher(table);
if (icebergNameMatch.matches()) {
table = icebergNameMatch.group("table");
}
Optional<Table> hiveTable = metastore.getTable(new HiveIdentity(session), tableName.getSchemaName(), table);
if (hiveTable.isEmpty()) {
return Optional.empty();
}

if (isIcebergTable(hiveTable.get())) {
return Optional.of(getRedirectToIcebergCatalog(session));
}

return Optional.empty();
}

private static boolean isIcebergTable(Table table)
{
return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getParameters().get(TABLE_TYPE_PROP));
}

public static Optional<SchemaTableName> getSourceTableNameFromSystemTable(SchemaTableName tableName)
{
return Stream.of(SystemTableHandler.values())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public final class HiveSessionProperties
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
private static final String PARQUET_OPTIMIZED_WRITER_ENABLED = "parquet_optimized_writer_enabled";
private static final String REDIRECT_TO_ICEBERG_ENABLED = "redirect_to_iceberg_enabled";
private static final String REDIRECT_TO_ICEBERG_CATALOG = "redirect_to_iceberg_catalog";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -369,6 +371,16 @@ public HiveSessionProperties(
PARQUET_OPTIMIZED_WRITER_ENABLED,
"Experimental: Enable optimized writer",
parquetWriterConfig.isParquetOptimizedWriterEnabled(),
false),
booleanProperty(
REDIRECT_TO_ICEBERG_ENABLED,
"Enable redirecting to a catalog configured with Iceberg Connector",
hiveConfig.isRedirectToIcebergEnabled(),
false),
stringProperty(
REDIRECT_TO_ICEBERG_CATALOG,
"The target Iceberg catalog for redirection",
hiveConfig.getRedirectToIcebergCatalog(),
false));
}

Expand Down Expand Up @@ -635,4 +647,14 @@ public static boolean isParquetOptimizedWriterEnabled(ConnectorSession session)
{
return session.getProperty(PARQUET_OPTIMIZED_WRITER_ENABLED, Boolean.class);
}

public static boolean isRedirectToIcebergEnabled(ConnectorSession session)
{
return session.getProperty(REDIRECT_TO_ICEBERG_ENABLED, Boolean.class);
}

public static String getRedirectToIcebergCatalog(ConnectorSession session)
{
return session.getProperty(REDIRECT_TO_ICEBERG_CATALOG, String.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ public void testDefaults()
.setAllowRegisterPartition(false)
.setQueryPartitionFilterRequired(false)
.setPartitionUseColumnNames(false)
.setProjectionPushdownEnabled(true));
.setProjectionPushdownEnabled(true)
.setRedirectToIcebergEnabled(false)
.setRedirectToIcebergCatalog("iceberg"));
}

@Test
Expand Down Expand Up @@ -166,6 +168,8 @@ public void testExplicitPropertyMappings()
.put("hive.query-partition-filter-required", "true")
.put("hive.partition-use-column-names", "true")
.put("hive.projection-pushdown-enabled", "false")
.put("hive.redirect-to-iceberg-enabled", "true")
.put("hive.redirect-to-iceberg-catalog", "myiceberg")
.build();

HiveConfig expected = new HiveConfig()
Expand Down Expand Up @@ -231,7 +235,9 @@ public void testExplicitPropertyMappings()
.setAllowRegisterPartition(true)
.setQueryPartitionFilterRequired(true)
.setPartitionUseColumnNames(true)
.setProjectionPushdownEnabled(false);
.setProjectionPushdownEnabled(false)
.setRedirectToIcebergEnabled(true)
.setRedirectToIcebergCatalog("myiceberg");

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import static com.google.common.util.concurrent.Futures.immediateFuture;
import static io.prestosql.metadata.MetadataUtil.createQualifiedObjectName;
import static io.prestosql.metadata.MetadataUtil.redirectToNewCatalogIfNecessary;
import static io.prestosql.spi.StandardErrorCode.COLUMN_ALREADY_EXISTS;
import static io.prestosql.spi.StandardErrorCode.COLUMN_TYPE_UNKNOWN;
import static io.prestosql.spi.StandardErrorCode.NOT_FOUND;
Expand Down Expand Up @@ -64,6 +65,7 @@ public ListenableFuture<?> execute(AddColumn statement, TransactionManager trans
{
Session session = stateMachine.getSession();
QualifiedObjectName tableName = createQualifiedObjectName(session, statement, statement.getName());
tableName = redirectToNewCatalogIfNecessary(session, tableName, metadata);
Optional<TableHandle> tableHandle = metadata.getTableHandle(session, tableName);
if (tableHandle.isEmpty()) {
if (!statement.isTableExists()) {
Expand All @@ -72,8 +74,9 @@ public ListenableFuture<?> execute(AddColumn statement, TransactionManager trans
return immediateFuture(null);
}

CatalogName catalogName = metadata.getCatalogHandle(session, tableName.getCatalogName())
.orElseThrow(() -> new PrestoException(NOT_FOUND, "Catalog does not exist: " + tableName.getCatalogName()));
String catalog = tableName.getCatalogName();
CatalogName catalogName = metadata.getCatalogHandle(session, catalog)
.orElseThrow(() -> new PrestoException(NOT_FOUND, "Catalog does not exist: " + catalog));

accessControl.checkCanAddColumns(session.toSecurityContext(), tableName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import static com.google.common.util.concurrent.Futures.immediateFuture;
import static io.prestosql.metadata.MetadataUtil.createQualifiedObjectName;
import static io.prestosql.metadata.MetadataUtil.redirectToNewCatalogIfNecessary;
import static io.prestosql.spi.StandardErrorCode.COLUMN_NOT_FOUND;
import static io.prestosql.spi.StandardErrorCode.MISSING_TABLE;
import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
Expand All @@ -53,6 +54,7 @@ public ListenableFuture<?> execute(Comment statement, TransactionManager transac

if (statement.getType() == Comment.Type.TABLE) {
QualifiedObjectName tableName = createQualifiedObjectName(session, statement, statement.getName());
tableName = redirectToNewCatalogIfNecessary(session, tableName, metadata);
Optional<TableHandle> tableHandle = metadata.getTableHandle(session, tableName);
if (tableHandle.isEmpty()) {
throw semanticException(TABLE_NOT_FOUND, statement, "Table does not exist: %s", tableName);
Expand All @@ -69,6 +71,7 @@ else if (statement.getType() == Comment.Type.COLUMN) {
}

QualifiedObjectName tableName = createQualifiedObjectName(session, statement, prefix.get());
tableName = redirectToNewCatalogIfNecessary(session, tableName, metadata);
Optional<TableHandle> tableHandle = metadata.getTableHandle(session, tableName);
if (tableHandle.isEmpty()) {
throw semanticException(TABLE_NOT_FOUND, statement, "Table does not exist: " + tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static io.prestosql.metadata.MetadataUtil.createQualifiedObjectName;
import static io.prestosql.metadata.MetadataUtil.redirectToNewCatalogIfNecessary;
import static io.prestosql.spi.StandardErrorCode.ALREADY_EXISTS;
import static io.prestosql.spi.StandardErrorCode.CATALOG_NOT_FOUND;
import static io.prestosql.spi.StandardErrorCode.COLUMN_TYPE_UNKNOWN;
Expand Down Expand Up @@ -98,6 +99,7 @@ ListenableFuture<?> internalExecute(CreateTable statement, Metadata metadata, Ac

Map<NodeRef<Parameter>, Expression> parameterLookup = parameterExtractor(statement, parameters);
QualifiedObjectName tableName = createQualifiedObjectName(session, statement, statement.getName());
tableName = redirectToNewCatalogIfNecessary(session, tableName, metadata);
Optional<TableHandle> tableHandle = metadata.getTableHandle(session, tableName);
if (tableHandle.isPresent()) {
if (!statement.isNotExists()) {
Expand All @@ -106,8 +108,9 @@ ListenableFuture<?> internalExecute(CreateTable statement, Metadata metadata, Ac
return immediateFuture(null);
}

CatalogName catalogName = metadata.getCatalogHandle(session, tableName.getCatalogName())
.orElseThrow(() -> new PrestoException(NOT_FOUND, "Catalog does not exist: " + tableName.getCatalogName()));
String catalog = tableName.getCatalogName();
CatalogName catalogName = metadata.getCatalogHandle(session, catalog)
.orElseThrow(() -> new PrestoException(NOT_FOUND, "Catalog does not exist: " + catalog));

LinkedHashMap<String, ColumnMetadata> columns = new LinkedHashMap<>();
Map<String, Object> inheritedProperties = ImmutableMap.of();
Expand Down Expand Up @@ -154,14 +157,16 @@ ListenableFuture<?> internalExecute(CreateTable statement, Metadata metadata, Ac
else if (element instanceof LikeClause) {
LikeClause likeClause = (LikeClause) element;
QualifiedObjectName likeTableName = createQualifiedObjectName(session, statement, likeClause.getTableName());
likeTableName = redirectToNewCatalogIfNecessary(session, likeTableName, metadata);
if (metadata.getCatalogHandle(session, likeTableName.getCatalogName()).isEmpty()) {
throw semanticException(CATALOG_NOT_FOUND, statement, "LIKE table catalog '%s' does not exist", likeTableName.getCatalogName());
}
if (!tableName.getCatalogName().equals(likeTableName.getCatalogName())) {
throw semanticException(NOT_SUPPORTED, statement, "LIKE table across catalogs is not supported");
}
QualifiedObjectName finalLikeTableName = likeTableName;
TableHandle likeTable = metadata.getTableHandle(session, likeTableName)
.orElseThrow(() -> semanticException(TABLE_NOT_FOUND, statement, "LIKE table '%s' does not exist", likeTableName));
.orElseThrow(() -> semanticException(TABLE_NOT_FOUND, statement, "LIKE table '%s' does not exist", finalLikeTableName));

TableMetadata likeTableMetadata = metadata.getTableMetadata(session, likeTable);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static io.prestosql.metadata.MetadataUtil.createQualifiedObjectName;
import static io.prestosql.metadata.MetadataUtil.redirectToNewCatalogIfNecessary;
import static io.prestosql.spi.connector.ConnectorViewDefinition.ViewColumn;
import static io.prestosql.sql.ParameterUtils.parameterExtractor;
import static io.prestosql.sql.SqlFormatterUtil.getFormattedSql;
Expand Down Expand Up @@ -72,7 +73,7 @@ public ListenableFuture<?> execute(CreateView statement, TransactionManager tran
{
Session session = stateMachine.getSession();
QualifiedObjectName name = createQualifiedObjectName(session, statement, statement.getName());

name = redirectToNewCatalogIfNecessary(session, name, metadata);
accessControl.checkCanCreateView(session.toSecurityContext(), name);

String sql = getFormattedSql(statement.getQuery(), sqlParser);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import static com.google.common.util.concurrent.Futures.immediateFuture;
import static io.prestosql.metadata.MetadataUtil.createQualifiedObjectName;
import static io.prestosql.metadata.MetadataUtil.redirectToNewCatalogIfNecessary;
import static io.prestosql.spi.StandardErrorCode.COLUMN_NOT_FOUND;
import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.prestosql.spi.StandardErrorCode.TABLE_NOT_FOUND;
Expand All @@ -49,6 +50,7 @@ public ListenableFuture<?> execute(DropColumn statement, TransactionManager tran
{
Session session = stateMachine.getSession();
QualifiedObjectName tableName = createQualifiedObjectName(session, statement, statement.getTable());
tableName = redirectToNewCatalogIfNecessary(session, tableName, metadata);
Optional<TableHandle> tableHandleOptional = metadata.getTableHandle(session, tableName);

if (tableHandleOptional.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import static com.google.common.util.concurrent.Futures.immediateFuture;
import static io.prestosql.metadata.MetadataUtil.createQualifiedObjectName;
import static io.prestosql.metadata.MetadataUtil.redirectToNewCatalogIfNecessary;
import static io.prestosql.spi.StandardErrorCode.TABLE_NOT_FOUND;
import static io.prestosql.sql.analyzer.SemanticExceptions.semanticException;

Expand All @@ -45,6 +46,7 @@ public ListenableFuture<?> execute(DropTable statement, TransactionManager trans
{
Session session = stateMachine.getSession();
QualifiedObjectName tableName = createQualifiedObjectName(session, statement, statement.getTableName());
tableName = redirectToNewCatalogIfNecessary(session, tableName, metadata);

Optional<TableHandle> tableHandle = metadata.getTableHandle(session, tableName);
if (tableHandle.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import static com.google.common.util.concurrent.Futures.immediateFuture;
import static io.prestosql.metadata.MetadataUtil.createQualifiedObjectName;
import static io.prestosql.metadata.MetadataUtil.redirectToNewCatalogIfNecessary;
import static io.prestosql.spi.StandardErrorCode.TABLE_NOT_FOUND;
import static io.prestosql.sql.analyzer.SemanticExceptions.semanticException;

Expand All @@ -45,6 +46,7 @@ public ListenableFuture<?> execute(DropView statement, TransactionManager transa
{
Session session = stateMachine.getSession();
QualifiedObjectName name = createQualifiedObjectName(session, statement, statement.getName());
name = redirectToNewCatalogIfNecessary(session, name, metadata);

Optional<ConnectorViewDefinition> view = metadata.getView(session, name);
if (view.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static io.prestosql.metadata.MetadataUtil.createPrincipal;
import static io.prestosql.metadata.MetadataUtil.createQualifiedObjectName;
import static io.prestosql.metadata.MetadataUtil.redirectToNewCatalogIfNecessary;
import static io.prestosql.spi.StandardErrorCode.INVALID_PRIVILEGE;
import static io.prestosql.spi.StandardErrorCode.TABLE_NOT_FOUND;
import static io.prestosql.sql.analyzer.SemanticExceptions.semanticException;
Expand All @@ -51,6 +52,7 @@ public ListenableFuture<?> execute(Grant statement, TransactionManager transacti
{
Session session = stateMachine.getSession();
QualifiedObjectName tableName = createQualifiedObjectName(session, statement, statement.getTableName());
tableName = redirectToNewCatalogIfNecessary(session, tableName, metadata);
Optional<TableHandle> tableHandle = metadata.getTableHandle(session, tableName);
if (tableHandle.isEmpty()) {
throw semanticException(TABLE_NOT_FOUND, statement, "Table '%s' does not exist", tableName);
Expand Down
Loading