Skip to content
Open
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive;

import io.trino.plugin.hive.metastore.Table;
import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;

import java.util.Optional;

import static io.trino.plugin.hive.HiveSessionProperties.getDeltaLakeCatalogName;
import static io.trino.plugin.hive.HiveSessionProperties.getHudiCatalogName;
import static io.trino.plugin.hive.HiveSessionProperties.getIcebergCatalogName;
import static io.trino.plugin.hive.ViewReaderUtil.isSomeKindOfAView;
import static io.trino.plugin.hive.util.HiveUtil.isDeltaLakeTable;
import static io.trino.plugin.hive.util.HiveUtil.isHudiTable;
import static io.trino.plugin.hive.util.HiveUtil.isIcebergTable;

public class DefaultHiveTableRedirectionsProvider
        implements HiveTableRedirectionsProvider
{
    /**
     * Redirects a metastore table to the Iceberg, Delta Lake or Hudi catalog configured
     * through the corresponding session property, based on the table's storage metadata.
     *
     * @param table the metastore table, if it exists; views and materialized views are never redirected
     * @return the redirected table name, or {@link Optional#empty()} when no redirection applies
     */
    @Override
    public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session, SchemaTableName name, Optional<Table> table)
    {
        return table.filter(t -> !isSomeKindOfAView(t)).flatMap(t -> Optional.<String>empty()
                // Each catalog name session property is read at most once per call (the original
                // read each one twice: once in the guard and once for the assignment), and the
                // table-format check only runs when the corresponding target catalog is configured
                .or(() -> getIcebergCatalogName(session).filter(ignored -> isIcebergTable(t)))
                .or(() -> getDeltaLakeCatalogName(session).filter(ignored -> isDeltaLakeTable(t)))
                .or(() -> getHudiCatalogName(session).filter(ignored -> isHudiTable(t)))
                .map(catalog -> new CatalogSchemaTableName(catalog, t.getSchemaTableName())));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,7 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_VIEW_TRANSLATION_ERROR;
import static io.trino.plugin.hive.HivePartitionManager.extractPartitionValues;
import static io.trino.plugin.hive.HiveSessionProperties.NON_TRANSACTIONAL_OPTIMIZE_ENABLED;
import static io.trino.plugin.hive.HiveSessionProperties.getDeltaLakeCatalogName;
import static io.trino.plugin.hive.HiveSessionProperties.getHiveStorageFormat;
import static io.trino.plugin.hive.HiveSessionProperties.getHudiCatalogName;
import static io.trino.plugin.hive.HiveSessionProperties.getIcebergCatalogName;
import static io.trino.plugin.hive.HiveSessionProperties.getInsertExistingPartitionsBehavior;
import static io.trino.plugin.hive.HiveSessionProperties.getQueryPartitionFilterRequiredSchemas;
import static io.trino.plugin.hive.HiveSessionProperties.getTimestampPrecision;
Expand Down Expand Up @@ -255,7 +252,6 @@
import static io.trino.plugin.hive.ViewReaderUtil.createViewReader;
import static io.trino.plugin.hive.ViewReaderUtil.encodeViewData;
import static io.trino.plugin.hive.ViewReaderUtil.isHiveView;
import static io.trino.plugin.hive.ViewReaderUtil.isSomeKindOfAView;
import static io.trino.plugin.hive.ViewReaderUtil.isTrinoMaterializedView;
import static io.trino.plugin.hive.ViewReaderUtil.isTrinoView;
import static io.trino.plugin.hive.acid.AcidTransaction.NO_ACID_TRANSACTION;
Expand Down Expand Up @@ -385,6 +381,7 @@ public class HiveMetadata
private final Set<SystemTableProvider> systemTableProviders;
private final HiveMaterializedViewMetadata hiveMaterializedViewMetadata;
private final AccessControlMetadata accessControlMetadata;
private final HiveTableRedirectionsProvider tableRedirectionsProvider;
private final DirectoryLister directoryLister;
private final PartitionProjectionService partitionProjectionService;
private final boolean allowTableRename;
Expand Down Expand Up @@ -414,6 +411,7 @@ public HiveMetadata(
Set<SystemTableProvider> systemTableProviders,
HiveMaterializedViewMetadata hiveMaterializedViewMetadata,
AccessControlMetadata accessControlMetadata,
HiveTableRedirectionsProvider tableRedirectionsProvider,
DirectoryLister directoryLister,
PartitionProjectionService partitionProjectionService,
boolean allowTableRename,
Expand Down Expand Up @@ -442,6 +440,7 @@ public HiveMetadata(
this.systemTableProviders = requireNonNull(systemTableProviders, "systemTableProviders is null");
this.hiveMaterializedViewMetadata = requireNonNull(hiveMaterializedViewMetadata, "hiveMaterializedViewMetadata is null");
this.accessControlMetadata = requireNonNull(accessControlMetadata, "accessControlMetadata is null");
this.tableRedirectionsProvider = requireNonNull(tableRedirectionsProvider, "tableRedirectionsProvider is null");
this.directoryLister = requireNonNull(directoryLister, "directoryLister is null");
this.partitionProjectionService = requireNonNull(partitionProjectionService, "partitionProjectionService is null");
this.allowTableRename = allowTableRename;
Expand Down Expand Up @@ -3844,28 +3843,14 @@ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
requireNonNull(session, "session is null");
requireNonNull(tableName, "tableName is null");

Optional<String> icebergCatalogName = getIcebergCatalogName(session);
Optional<String> deltaLakeCatalogName = getDeltaLakeCatalogName(session);
Optional<String> hudiCatalogName = getHudiCatalogName(session);

if (icebergCatalogName.isEmpty() && deltaLakeCatalogName.isEmpty() && hudiCatalogName.isEmpty()) {
return Optional.empty();
}

Comment on lines -3847 to -3854
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this we lost the ability to short-circuit the redirection lookup when no redirect catalogs are configured, when using the default behavior. That was added recently in 4b4bb6f (#18341)

@findepi was that behavior important? I see it was added as part of a broader PR

I think we could preserve this behavior by adding a method to the HiveTableRedirectionsProvider interface like boolean mayRedirect() which says if the provider will possibly redirect -- then we could leverage it here for short-circuiting. But not sure if this is overkill.

if (isHiveSystemSchema(tableName.getSchemaName())) {
return Optional.empty();
}
// we need to chop off any "$partitions" and similar suffixes from table name while querying the metastore for the Table object
TableNameSplitResult tableNameSplit = splitTableName(tableName.getTableName());
Optional<Table> table = metastore.getTable(tableName.getSchemaName(), tableNameSplit.getBaseTableName());
if (table.isEmpty() || isSomeKindOfAView(table.get())) {
return Optional.empty();
}

Optional<CatalogSchemaTableName> catalogSchemaTableName = Optional.<CatalogSchemaTableName>empty()
.or(() -> redirectTableToIceberg(icebergCatalogName, table.get()))
.or(() -> redirectTableToDeltaLake(deltaLakeCatalogName, table.get()))
.or(() -> redirectTableToHudi(hudiCatalogName, table.get()));
SchemaTableName tableNameWithoutSuffix = new SchemaTableName(tableName.getSchemaName(), tableNameSplit.getBaseTableName());
Optional<CatalogSchemaTableName> catalogSchemaTableName = tableRedirectionsProvider.redirectTable(session, tableNameWithoutSuffix, table);

// stitch back the suffix we cut off.
return catalogSchemaTableName.map(name -> new CatalogSchemaTableName(
Expand All @@ -3875,39 +3860,6 @@ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
name.getSchemaTableName().getTableName() + tableNameSplit.getSuffix().orElse(""))));
}

// Redirect only when an Iceberg catalog is configured and the table is an Iceberg table.
private Optional<CatalogSchemaTableName> redirectTableToIceberg(Optional<String> targetCatalogName, Table table)
{
    return targetCatalogName
            .filter(catalog -> isIcebergTable(table))
            .map(catalog -> new CatalogSchemaTableName(catalog, table.getSchemaTableName()));
}

// Redirect only when a Delta Lake catalog is configured and the table is a Delta Lake table.
private Optional<CatalogSchemaTableName> redirectTableToDeltaLake(Optional<String> targetCatalogName, Table table)
{
    return targetCatalogName
            .filter(catalog -> isDeltaLakeTable(table))
            .map(catalog -> new CatalogSchemaTableName(catalog, table.getSchemaTableName()));
}

// Redirect only when a Hudi catalog is configured and the table is a Hudi table.
private Optional<CatalogSchemaTableName> redirectTableToHudi(Optional<String> targetCatalogName, Table table)
{
    return targetCatalogName
            .filter(catalog -> isHudiTable(table))
            .map(catalog -> new CatalogSchemaTableName(catalog, table.getSchemaTableName()));
}

@Override
public WriterScalingOptions getNewTableWriterScalingOptions(ConnectorSession session, SchemaTableName tableName, Map<String, Object> tableProperties)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class HiveMetadataFactory
private final HiveMaterializedViewMetadataFactory hiveMaterializedViewMetadataFactory;
private final AccessControlMetadataFactory accessControlMetadataFactory;
private final Optional<Duration> hiveTransactionHeartbeatInterval;
private final HiveTableRedirectionsProvider tableRedirectionsProvider;
private final ScheduledExecutorService heartbeatService;
private final DirectoryLister directoryLister;
private final TransactionScopeCachingDirectoryListerFactory transactionScopeCachingDirectoryListerFactory;
Expand Down Expand Up @@ -103,6 +104,7 @@ public HiveMetadataFactory(
Set<SystemTableProvider> systemTableProviders,
HiveMaterializedViewMetadataFactory hiveMaterializedViewMetadataFactory,
AccessControlMetadataFactory accessControlMetadataFactory,
HiveTableRedirectionsProvider tableRedirectionsProvider,
DirectoryLister directoryLister,
TransactionScopeCachingDirectoryListerFactory transactionScopeCachingDirectoryListerFactory,
PartitionProjectionService partitionProjectionService,
Expand Down Expand Up @@ -140,6 +142,7 @@ public HiveMetadataFactory(
systemTableProviders,
hiveMaterializedViewMetadataFactory,
accessControlMetadataFactory,
tableRedirectionsProvider,
directoryLister,
transactionScopeCachingDirectoryListerFactory,
partitionProjectionService,
Expand Down Expand Up @@ -179,6 +182,7 @@ public HiveMetadataFactory(
Set<SystemTableProvider> systemTableProviders,
HiveMaterializedViewMetadataFactory hiveMaterializedViewMetadataFactory,
AccessControlMetadataFactory accessControlMetadataFactory,
HiveTableRedirectionsProvider tableRedirectionsProvider,
DirectoryLister directoryLister,
TransactionScopeCachingDirectoryListerFactory transactionScopeCachingDirectoryListerFactory,
PartitionProjectionService partitionProjectionService,
Expand Down Expand Up @@ -210,6 +214,7 @@ public HiveMetadataFactory(
this.systemTableProviders = requireNonNull(systemTableProviders, "systemTableProviders is null");
this.hiveMaterializedViewMetadataFactory = requireNonNull(hiveMaterializedViewMetadataFactory, "hiveMaterializedViewMetadataFactory is null");
this.accessControlMetadataFactory = requireNonNull(accessControlMetadataFactory, "accessControlMetadataFactory is null");
this.tableRedirectionsProvider = requireNonNull(tableRedirectionsProvider, "tableRedirectionsProvider is null");
this.hiveTransactionHeartbeatInterval = requireNonNull(hiveTransactionHeartbeatInterval, "hiveTransactionHeartbeatInterval is null");

fileSystemExecutor = new BoundedExecutor(executorService, maxConcurrentFileSystemOperations);
Expand Down Expand Up @@ -273,6 +278,7 @@ public TransactionalMetadata create(ConnectorIdentity identity, boolean autoComm
systemTableProviders,
hiveMaterializedViewMetadataFactory.create(hiveMetastoreClosure),
accessControlMetadataFactory.create(metastore),
tableRedirectionsProvider,
directoryLister,
partitionProjectionService,
allowTableRename,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public void configure(Binder binder)
.setDefault().to(DefaultHiveMaterializedViewMetadataFactory.class).in(Scopes.SINGLETON);
newOptionalBinder(binder, TransactionalMetadataFactory.class)
.setDefault().to(HiveMetadataFactory.class).in(Scopes.SINGLETON);
newOptionalBinder(binder, HiveTableRedirectionsProvider.class)
.setDefault().to(DefaultHiveTableRedirectionsProvider.class);
binder.bind(TransactionScopeCachingDirectoryListerFactory.class).in(Scopes.SINGLETON);
binder.bind(HiveTransactionManager.class).in(Scopes.SINGLETON);
binder.bind(ConnectorSplitManager.class).to(HiveSplitManager.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive;

import io.trino.plugin.hive.metastore.Table;
import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;

import java.util.Optional;

/**
 * Decides whether a Hive metastore table should be redirected to a table in another catalog.
 */
public interface HiveTableRedirectionsProvider
{
    // Provider that never redirects any table; used where redirection behavior is not wanted
    // (e.g. as a fixed collaborator in tests)
    HiveTableRedirectionsProvider NO_REDIRECTIONS = (session, tableName, table) -> Optional.empty();

    /**
     * @param tableName the name the table was looked up under
     * @param table the metastore table, if it exists; implementations may redirect
     *         based on the name alone, without requiring the table to be present
     * @return the fully qualified name of the redirection target, or
     *         {@link Optional#empty()} when no redirection applies
     */
    Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session, SchemaTableName tableName, Optional<Table> table);
}
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,14 @@ public Optional<ConnectorMaterializedViewDefinition> getMaterializedView(Connect
}
},
SqlStandardAccessControlMetadata::new,
(session, schemaTableName, tableHandle) -> {
if (schemaTableName.getTableName().contains("hive_table_redirection_tester")) {
return Optional.of(new CatalogSchemaTableName("hive", databaseName, "redirection_target"));
}
else {
return Optional.empty();
}
},
countingDirectoryLister,
new TransactionScopeCachingDirectoryListerFactory(hiveConfig),
new PartitionProjectionService(hiveConfig, ImmutableMap.of(), new TestingTypeManager()),
Expand Down Expand Up @@ -4003,6 +4011,72 @@ public void testApplyRedirection()
}
}

@Test
public void testHiveTableExistsNoRedirection()
        throws Exception
{
    // Table name lacks the "hive_table_redirection_tester" trigger matched by the test
    // redirection provider, so no redirection is expected even though the table exists
    SchemaTableName tableName = temporaryTable("hive_table_no_redirection_tester");
    doCreateTable(tableName, ORC);
    try (Transaction transaction = newTransaction()) {
        ConnectorSession session = newSession();
        HiveMetadata metadata = (HiveMetadata) transaction.getMetadata();
        // Use the schema the table was actually created in. The original hard-coded
        // "testSchemaName", which queries a nonexistent schema and therefore exercises
        // the table-not-found path instead of the "table exists" path this test is for.
        SchemaTableName schemaTableName = new SchemaTableName(tableName.getSchemaName(), tableName.getTableName());
        Optional<CatalogSchemaTableName> result = metadata.redirectTable(session, schemaTableName);
        assertThat(result).isEmpty();
    }
    finally {
        dropTable(tableName);
    }
}

@Test
public void testHiveTableExistsWithRedirection()
        throws Exception
{
    // A table whose name contains "hive_table_redirection_tester" triggers the
    // test redirection provider, which points at hive.<schema>.redirection_target
    SchemaTableName temporaryName = temporaryTable("hive_table_redirection_tester");
    doCreateTable(temporaryName, ORC);
    try (Transaction transaction = newTransaction()) {
        HiveMetadata hiveMetadata = (HiveMetadata) transaction.getMetadata();
        ConnectorSession connectorSession = newSession();
        SchemaTableName lookupName = new SchemaTableName(temporaryName.getSchemaName(), temporaryName.getTableName());
        Optional<CatalogSchemaTableName> redirection = hiveMetadata.redirectTable(connectorSession, lookupName);
        assertThat(redirection).isPresent();
        assertThat(redirection.get().getCatalogName()).isEqualTo("hive");
        assertThat(redirection.get().getSchemaTableName().toString()).contains("redirection_target");
    }
    finally {
        dropTable(temporaryName);
    }
}

@Test
public void testHiveTableDoesNotExistNoRedirection()
{
    // No table is created, and the name also lacks the redirection trigger substring
    SchemaTableName missingTable = temporaryTable("no_hive_table_no_redirection_tester");
    try (Transaction transaction = newTransaction()) {
        HiveMetadata hiveMetadata = (HiveMetadata) transaction.getMetadata();
        ConnectorSession connectorSession = newSession();
        SchemaTableName lookupName = new SchemaTableName(missingTable.getSchemaName(), missingTable.getTableName());
        Optional<CatalogSchemaTableName> redirection = hiveMetadata.redirectTable(connectorSession, lookupName);
        assertThat(redirection).isEmpty();
    }
}

@Test
public void testHiveTableDoesNotExistWithRedirection()
{
    // "no_hive_table_redirection_tester" contains the "hive_table_redirection_tester"
    // substring matched by the test redirection provider, so redirection fires even
    // though the table is never created — the provider redirects on the name alone
    SchemaTableName tableName = temporaryTable("no_hive_table_redirection_tester");
    try (Transaction transaction = newTransaction()) {
        ConnectorSession session = newSession();
        HiveMetadata metadata = (HiveMetadata) transaction.getMetadata();
        SchemaTableName schemaTableName = new SchemaTableName(tableName.getSchemaName(), tableName.getTableName());
        Optional<CatalogSchemaTableName> result = metadata.redirectTable(session, schemaTableName);
        assertThat(result).isPresent();
        assertThat(result.get().getCatalogName()).isEqualTo("hive");
        assertThat(result.get().getSchemaTableName().toString()).contains("redirection_target");
    }
}

@Test
public void testMaterializedViewMetadata()
throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import static io.trino.plugin.hive.AbstractTestHive.getAllSplits;
import static io.trino.plugin.hive.AbstractTestHive.getSplits;
import static io.trino.plugin.hive.HiveTableProperties.EXTERNAL_LOCATION_PROPERTY;
import static io.trino.plugin.hive.HiveTableRedirectionsProvider.NO_REDIRECTIONS;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS;
import static io.trino.plugin.hive.HiveTestUtils.PAGE_SORTER;
import static io.trino.plugin.hive.HiveTestUtils.SESSION;
Expand Down Expand Up @@ -235,6 +236,7 @@ protected void setup(String host, int port, String databaseName, HdfsConfigurati
new PropertiesSystemTableProvider()),
new DefaultHiveMaterializedViewMetadataFactory(),
SqlStandardAccessControlMetadata::new,
NO_REDIRECTIONS,
new FileSystemDirectoryLister(),
new TransactionScopeCachingDirectoryListerFactory(config),
new PartitionProjectionService(config, ImmutableMap.of(), new TestingTypeManager()),
Expand Down