Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions core/trino-main/src/main/java/io/trino/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import io.trino.spi.connector.TableFunctionApplicationResult;
import io.trino.spi.connector.TableScanRedirectApplicationResult;
import io.trino.spi.connector.TopNApplicationResult;
import io.trino.spi.connector.UpdateKind;
import io.trino.spi.connector.WriterScalingOptions;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.expression.Constant;
Expand Down Expand Up @@ -107,6 +108,21 @@ public interface Metadata
*/
Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName tableName);

/**
* Returns a table handle for the specified table name with updateKind.
*/
Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName tableName, Optional<UpdateKind> updateKind);

/**
* Returns a table handle for the specified table name with a specified version.
*/
Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName tableName, Optional<TableVersion> startVersion, Optional<TableVersion> endVersion);

/**
* Returns a table handle for the specified table name with a specified version and updateKind.
*/
Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName tableName, Optional<TableVersion> startVersion, Optional<TableVersion> endVersion, Optional<UpdateKind> updateKind);

Optional<SystemTable> getSystemTable(Session session, QualifiedObjectName tableName);

Optional<TableExecuteHandle> getTableHandleForExecute(
Expand Down Expand Up @@ -851,14 +867,14 @@ default boolean isMaterializedView(Session session, QualifiedObjectName viewName
RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName);

/**
* Get the target table handle after performing redirection with a table version.
* Get the target table handle after performing redirection with updateKind.
*/
RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional<TableVersion> startVersion, Optional<TableVersion> endVersion);
RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional<UpdateKind> updateKind);

/**
* Returns a table handle for the specified table name with a specified version
* Get the target table handle after performing redirection with a table version and updateKind.
*/
Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName tableName, Optional<TableVersion> startVersion, Optional<TableVersion> endVersion);
RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional<TableVersion> startVersion, Optional<TableVersion> endVersion, Optional<UpdateKind> updateKind);

/**
* Returns maximum number of tasks that can be created while writing data to specific connector.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import io.trino.spi.connector.TableFunctionApplicationResult;
import io.trino.spi.connector.TableScanRedirectApplicationResult;
import io.trino.spi.connector.TopNApplicationResult;
import io.trino.spi.connector.UpdateKind;
import io.trino.spi.connector.WriterScalingOptions;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.expression.Constant;
Expand Down Expand Up @@ -270,11 +271,23 @@ public List<String> listSchemaNames(Session session, String catalogName)
@Override
public Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName table)
{
return getTableHandle(session, table, Optional.empty(), Optional.empty());
return getTableHandle(session, table, Optional.empty(), Optional.empty(), Optional.empty());
}

@Override
public Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName table, Optional<UpdateKind> updateKind)
{
return getTableHandle(session, table, Optional.empty(), Optional.empty(), updateKind);
}

@Override
public Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName table, Optional<TableVersion> startVersion, Optional<TableVersion> endVersion)
{
return getTableHandle(session, table, startVersion, endVersion, Optional.empty());
}

@Override
public Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName table, Optional<TableVersion> startVersion, Optional<TableVersion> endVersion, Optional<UpdateKind> updateKind)
{
requireNonNull(table, "table is null");
if (cannotExist(table)) {
Expand All @@ -294,7 +307,8 @@ public Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName
connectorSession,
table.asSchemaTableName(),
startTableVersion,
endTableVersion);
endTableVersion,
updateKind);
return Optional.ofNullable(tableHandle)
.map(connectorTableHandle -> new TableHandle(
catalogHandle,
Expand Down Expand Up @@ -1925,18 +1939,24 @@ private QualifiedObjectName getRedirectedTableName(Session session, QualifiedObj
@Override
public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName)
{
return getRedirectionAwareTableHandle(session, tableName, Optional.empty(), Optional.empty());
return getRedirectionAwareTableHandle(session, tableName, Optional.empty(), Optional.empty(), Optional.empty());
}

@Override
public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional<UpdateKind> updateKind)
{
return getRedirectionAwareTableHandle(session, tableName, Optional.empty(), Optional.empty(), updateKind);
}

@Override
public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional<TableVersion> startVersion, Optional<TableVersion> endVersion)
public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional<TableVersion> startVersion, Optional<TableVersion> endVersion, Optional<UpdateKind> updateKind)
{
QualifiedObjectName targetTableName = getRedirectedTableName(session, tableName, startVersion, endVersion);
if (targetTableName.equals(tableName)) {
return noRedirection(getTableHandle(session, tableName, startVersion, endVersion));
return noRedirection(getTableHandle(session, tableName, startVersion, endVersion, updateKind));
}

Optional<TableHandle> tableHandle = getTableHandle(session, targetTableName, startVersion, endVersion);
Optional<TableHandle> tableHandle = getTableHandle(session, targetTableName, startVersion, endVersion, updateKind);
if (tableHandle.isPresent()) {
return withRedirectionTo(targetTableName, tableHandle.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import io.trino.spi.connector.PointerType;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.connector.UpdateKind;
import io.trino.spi.function.CatalogSchemaFunctionName;
import io.trino.spi.function.FunctionKind;
import io.trino.spi.function.OperatorType;
Expand Down Expand Up @@ -516,13 +517,6 @@ private Scope analyzeForUpdate(Relation relation, Optional<Scope> outerQueryScop
.process(relation, Optional.empty());
}

private enum UpdateKind
{
DELETE,
UPDATE,
MERGE,
}

/**
* Visitor context represents local query scope (if exists). The invariant is
* that the local query scopes hierarchy should always have outer query scope
Expand Down Expand Up @@ -597,7 +591,7 @@ protected Scope visitInsert(Insert insert, Optional<Scope> scope)
endVersion = Optional.of(toTableVersion(branch));
}
// verify the insert destination columns match the query
RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, targetTable, Optional.empty(), endVersion);
RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, targetTable, Optional.empty(), endVersion, Optional.of(UpdateKind.DELETE));
Optional<TableHandle> targetTableHandle = redirection.tableHandle();
targetTable = redirection.redirectedTableName().orElse(targetTable);
if (targetTableHandle.isEmpty()) {
Expand Down Expand Up @@ -853,7 +847,7 @@ protected Scope visitDelete(Delete node, Optional<Scope> scope)
}
endVersion = Optional.of(toTableVersion(branch));
}
RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, originalName, Optional.empty(), endVersion);
RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, originalName, Optional.empty(), endVersion, Optional.of(UpdateKind.DELETE));
QualifiedObjectName tableName = redirection.redirectedTableName().orElse(originalName);
TableHandle handle = redirection.tableHandle()
.orElseThrow(() -> semanticException(TABLE_NOT_FOUND, table, "Table '%s' does not exist", tableName));
Expand Down Expand Up @@ -2303,7 +2297,7 @@ protected Scope visitTable(Table table, Optional<Scope> scope)
}

// This can only be a table
RedirectionAwareTableHandle redirection = getTableHandle(table, name, scope);
RedirectionAwareTableHandle redirection = getTableHandle(table, name, scope, updateKind);
Optional<TableHandle> tableHandle = redirection.tableHandle();
QualifiedObjectName targetTableName = redirection.redirectedTableName().orElse(name);
analysis.addEmptyColumnReferencesForTable(accessControl, session.getIdentity(), targetTableName);
Expand Down Expand Up @@ -2383,7 +2377,7 @@ private boolean isMaterializedViewSufficientlyFresh(Session session, QualifiedOb

private void checkStorageTableNotRedirected(QualifiedObjectName source)
{
metadata.getRedirectionAwareTableHandle(session, source).redirectedTableName().ifPresent(name -> {
metadata.getRedirectionAwareTableHandle(session, source, updateKind).redirectedTableName().ifPresent(name -> {
throw new TrinoException(NOT_SUPPORTED, format("Redirection of materialized view storage table '%s' to '%s' is not supported", source, name));
});
}
Expand Down Expand Up @@ -3489,7 +3483,7 @@ protected Scope visitUpdate(Update update, Optional<Scope> scope)
}
endVersion = Optional.of(toTableVersion(branch));
}
RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, originalName, Optional.empty(), endVersion);
RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, originalName, Optional.empty(), endVersion, Optional.of(UpdateKind.UPDATE));
QualifiedObjectName tableName = redirection.redirectedTableName().orElse(originalName);
TableHandle handle = redirection.tableHandle()
.orElseThrow(() -> semanticException(TABLE_NOT_FOUND, table, "Table '%s' does not exist", tableName));
Expand Down Expand Up @@ -3628,7 +3622,7 @@ protected Scope visitMerge(Merge merge, Optional<Scope> scope)

analysis.setUpdateType("MERGE");

RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, originalTableName, Optional.empty(), endVersion);
RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, originalTableName, Optional.empty(), endVersion, Optional.of(UpdateKind.MERGE));
QualifiedObjectName tableName = redirection.redirectedTableName().orElse(originalTableName);
TableHandle targetTableHandle = redirection.tableHandle()
.orElseThrow(() -> semanticException(TABLE_NOT_FOUND, table, "Table '%s' does not exist", tableName));
Expand Down Expand Up @@ -6022,21 +6016,21 @@ private OutputColumn createOutputColumn(Field field)
* Helper function that analyzes any versioning and returns the appropriate table handle.
* If no for clause exists, this is just a wrapper around getRedirectionAwareTableHandle in MetadataManager.
*/
private RedirectionAwareTableHandle getTableHandle(Table table, QualifiedObjectName name, Optional<Scope> scope)
private RedirectionAwareTableHandle getTableHandle(Table table, QualifiedObjectName name, Optional<Scope> scope, Optional<UpdateKind> updateKind)
{
if (table.getQueryPeriod().isPresent()) {
verify(table.getBranch().isEmpty(), "branch must be empty");
Optional<TableVersion> startVersion = extractTableVersion(table, table.getQueryPeriod().get().getStart(), scope);
Optional<TableVersion> endVersion = extractTableVersion(table, table.getQueryPeriod().get().getEnd(), scope);
return metadata.getRedirectionAwareTableHandle(session, name, startVersion, endVersion);
return metadata.getRedirectionAwareTableHandle(session, name, startVersion, endVersion, updateKind);
}
if (table.getBranch().isPresent()) {
verify(table.getQueryPeriod().isEmpty(), "query period must be empty");
String branch = table.getBranch().get().getValue();
Optional<TableVersion> endVersion = Optional.of(toTableVersion(branch));
return metadata.getRedirectionAwareTableHandle(session, name, Optional.empty(), endVersion);
return metadata.getRedirectionAwareTableHandle(session, name, Optional.empty(), endVersion, updateKind);
}
return metadata.getRedirectionAwareTableHandle(session, name, Optional.empty(), Optional.empty());
return metadata.getRedirectionAwareTableHandle(session, name, Optional.empty(), Optional.empty(), updateKind);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import io.trino.spi.connector.TableFunctionApplicationResult;
import io.trino.spi.connector.TableScanRedirectApplicationResult;
import io.trino.spi.connector.TopNApplicationResult;
import io.trino.spi.connector.UpdateKind;
import io.trino.spi.connector.WriterScalingOptions;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.expression.Constant;
Expand Down Expand Up @@ -144,6 +145,15 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
}
}

@Override
public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName, Optional<ConnectorTableVersion> startVersion, Optional<ConnectorTableVersion> endVersion, Optional<UpdateKind> updateKind)
{
Span span = startSpan("getTableHandle", tableName);
try (var _ = scopedSpan(span)) {
return delegate.getTableHandle(session, tableName, startVersion, endVersion, updateKind);
}
}

@Override
public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(ConnectorSession session, ConnectorAccessControl accessControl, ConnectorTableHandle tableHandle, String procedureName, Map<String, Object> executeProperties, RetryMode retryMode)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import io.trino.spi.connector.TableFunctionApplicationResult;
import io.trino.spi.connector.TableScanRedirectApplicationResult;
import io.trino.spi.connector.TopNApplicationResult;
import io.trino.spi.connector.UpdateKind;
import io.trino.spi.connector.WriterScalingOptions;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.expression.Constant;
Expand Down Expand Up @@ -185,7 +186,34 @@ public Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName
{
Span span = startSpan("getTableHandle", tableName);
try (var _ = scopedSpan(span)) {
return delegate.getTableHandle(session, tableName);
return delegate.getTableHandle(session, tableName, Optional.empty());
}
}

@Override
public Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName tableName, Optional<TableVersion> startVersion, Optional<TableVersion> endVersion)
{
Span span = startSpan("getTableHandle", tableName);
try (var _ = scopedSpan(span)) {
return delegate.getTableHandle(session, tableName, startVersion, endVersion);
}
}

@Override
public Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName tableName, Optional<UpdateKind> updateKind)
{
Span span = startSpan("getTableHandle", tableName);
try (var _ = scopedSpan(span)) {
return delegate.getTableHandle(session, tableName, updateKind);
}
}

@Override
public Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName tableName, Optional<TableVersion> startVersion, Optional<TableVersion> endVersion, Optional<UpdateKind> updateKind)
{
Span span = startSpan("getTableHandle", tableName);
try (var _ = scopedSpan(span)) {
return delegate.getTableHandle(session, tableName, startVersion, endVersion, updateKind);
}
}

Expand Down Expand Up @@ -1548,29 +1576,29 @@ public Optional<TableScanRedirectApplicationResult> applyTableScanRedirect(Sessi
}

@Override
public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName)
public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional<UpdateKind> updateKind)
{
Span span = startSpan("getRedirectionAwareTableHandle", tableName);
try (var _ = scopedSpan(span)) {
return delegate.getRedirectionAwareTableHandle(session, tableName);
return delegate.getRedirectionAwareTableHandle(session, tableName, updateKind);
}
}

@Override
public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional<TableVersion> startVersion, Optional<TableVersion> endVersion)
public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName)
{
Span span = startSpan("getRedirectionAwareTableHandle", tableName);
try (var _ = scopedSpan(span)) {
return delegate.getRedirectionAwareTableHandle(session, tableName, startVersion, endVersion);
return delegate.getRedirectionAwareTableHandle(session, tableName);
}
}

@Override
public Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName tableName, Optional<TableVersion> startVersion, Optional<TableVersion> endVersion)
public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional<TableVersion> startVersion, Optional<TableVersion> endVersion, Optional<UpdateKind> updateKind)
{
Span span = startSpan("getTableHandle", tableName);
Span span = startSpan("getRedirectionAwareTableHandle", tableName);
try (var _ = scopedSpan(span)) {
return delegate.getTableHandle(session, tableName, startVersion, endVersion);
return delegate.getRedirectionAwareTableHandle(session, tableName, startVersion, endVersion, updateKind);
}
}

Expand Down
Loading