From dd419d10d769ea999a8cc393c6d71330ff162b68 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 18 Sep 2024 17:16:45 +0900 Subject: [PATCH 1/2] Add support for DML with properties in engine --- .../antlr4/io/trino/grammar/sql/SqlBase.g4 | 10 ++- .../main/java/io/trino/metadata/Metadata.java | 4 +- .../io/trino/metadata/MetadataManager.java | 15 ++-- .../trino/sql/analyzer/StatementAnalyzer.java | 74 +++++++++++++++++-- .../tracing/TracingConnectorMetadata.java | 9 +++ .../io/trino/tracing/TracingMetadata.java | 8 +- .../trino/execution/TestCreateViewTask.java | 3 +- .../trino/metadata/AbstractMockMetadata.java | 9 ++- .../main/java/io/trino/sql/SqlFormatter.java | 14 +++- .../java/io/trino/sql/parser/AstBuilder.java | 28 +++++-- .../main/java/io/trino/sql/tree/Table.java | 36 ++++++--- .../io/trino/sql/parser/TestSqlParser.java | 4 +- .../spi/connector/ConnectorMetadata.java | 22 +++++- .../ClassLoaderSafeConnectorMetadata.java | 8 ++ 14 files changed, 196 insertions(+), 48 deletions(-) diff --git a/core/trino-grammar/src/main/antlr4/io/trino/grammar/sql/SqlBase.g4 b/core/trino-grammar/src/main/antlr4/io/trino/grammar/sql/SqlBase.g4 index 3bc36d93c3ad..9e4950b421fc 100644 --- a/core/trino-grammar/src/main/antlr4/io/trino/grammar/sql/SqlBase.g4 +++ b/core/trino-grammar/src/main/antlr4/io/trino/grammar/sql/SqlBase.g4 @@ -71,8 +71,10 @@ statement (COMMENT string)? (WITH properties)? #createTable | DROP TABLE (IF EXISTS)? qualifiedName #dropTable - | INSERT INTO qualifiedName columnAliases? rootQuery #insertInto - | DELETE FROM qualifiedName (WHERE booleanExpression)? #delete + | INSERT INTO qualifiedName (WITH properties)? + columnAliases? rootQuery #insertInto + | DELETE FROM qualifiedName (WITH properties)? + (WHERE booleanExpression)? 
#delete | TRUNCATE TABLE qualifiedName #truncateTable | COMMENT ON TABLE qualifiedName IS (string | NULL) #commentTable | COMMENT ON VIEW qualifiedName IS (string | NULL) #commentView @@ -191,10 +193,10 @@ statement | DESCRIBE OUTPUT identifier #describeOutput | SET PATH pathSpecification #setPath | SET TIME ZONE (LOCAL | expression) #setTimeZone - | UPDATE qualifiedName + | UPDATE qualifiedName (WITH properties)? SET updateAssignment (',' updateAssignment)* (WHERE where=booleanExpression)? #update - | MERGE INTO qualifiedName (AS? identifier)? + | MERGE INTO qualifiedName (AS? identifier)? (WITH properties)? USING relation ON expression mergeCase+ #merge ; diff --git a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java index 2c71eb193ab4..6621084899fa 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java +++ b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java @@ -834,12 +834,12 @@ default boolean isMaterializedView(Session session, QualifiedObjectName viewName /** * Get the target table handle after performing redirection with a table version. */ - RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional startVersion, Optional endVersion); + RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional startVersion, Optional endVersion, Map properties); /** * Returns a table handle for the specified table name with a specified version */ - Optional getTableHandle(Session session, QualifiedObjectName tableName, Optional startVersion, Optional endVersion); + Optional getTableHandle(Session session, QualifiedObjectName tableName, Optional startVersion, Optional endVersion, Map properties); /** * Returns maximum number of tasks that can be created while writing data to specific connector. 
diff --git a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java index f437ae0bd5f9..b2fba9e5abd4 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java +++ b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java @@ -270,11 +270,11 @@ public List listSchemaNames(Session session, String catalogName) @Override public Optional getTableHandle(Session session, QualifiedObjectName table) { - return getTableHandle(session, table, Optional.empty(), Optional.empty()); + return getTableHandle(session, table, Optional.empty(), Optional.empty(), ImmutableMap.of()); } @Override - public Optional getTableHandle(Session session, QualifiedObjectName table, Optional startVersion, Optional endVersion) + public Optional getTableHandle(Session session, QualifiedObjectName table, Optional startVersion, Optional endVersion, Map properties) { requireNonNull(table, "table is null"); if (cannotExist(table)) { @@ -294,7 +294,8 @@ public Optional getTableHandle(Session session, QualifiedObjectName connectorSession, table.asSchemaTableName(), startTableVersion, - endTableVersion); + endTableVersion, + properties); return Optional.ofNullable(tableHandle) .map(connectorTableHandle -> new TableHandle( catalogHandle, @@ -1969,18 +1970,18 @@ private QualifiedObjectName getRedirectedTableName(Session session, QualifiedObj @Override public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName) { - return getRedirectionAwareTableHandle(session, tableName, Optional.empty(), Optional.empty()); + return getRedirectionAwareTableHandle(session, tableName, Optional.empty(), Optional.empty(), ImmutableMap.of()); } @Override - public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional startVersion, Optional endVersion) + public RedirectionAwareTableHandle 
getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional startVersion, Optional endVersion, Map properties) { QualifiedObjectName targetTableName = getRedirectedTableName(session, tableName, startVersion, endVersion); if (targetTableName.equals(tableName)) { - return noRedirection(getTableHandle(session, tableName, startVersion, endVersion)); + return noRedirection(getTableHandle(session, tableName, startVersion, endVersion, properties)); } - Optional tableHandle = getTableHandle(session, targetTableName, startVersion, endVersion); + Optional tableHandle = getTableHandle(session, targetTableName, startVersion, endVersion, properties); if (tableHandle.isPresent()) { return withRedirectionTo(targetTableName, tableHandle.get()); } diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java index 6f7cc0422026..087e6465f004 100644 --- a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java +++ b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java @@ -579,8 +579,21 @@ protected Scope visitInsert(Insert insert, Optional scope) // analyze the query that creates the data Scope queryScope = analyze(insert.getQuery(), Optional.empty(), false); + String catalogName = targetTable.catalogName(); + CatalogHandle catalogHandle = getRequiredCatalogHandle(plannerContext.getMetadata(), session, insert, catalogName); + + Map properties = tablePropertyManager.getProperties( + catalogName, + catalogHandle, + insert.getTable().getProperties(), + session, + plannerContext, + accessControl, + analysis.getParameters(), + true); + // verify the insert destination columns match the query - RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, targetTable); + RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, targetTable, Optional.empty(), 
Optional.empty(), properties); Optional targetTableHandle = redirection.tableHandle(); targetTable = redirection.redirectedTableName().orElse(targetTable); if (targetTableHandle.isEmpty()) { @@ -821,7 +834,19 @@ protected Scope visitDelete(Delete node, Optional scope) throw semanticException(NOT_SUPPORTED, node, "Deleting from views is not supported"); } - RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, originalName); + String catalogName = originalName.catalogName(); + CatalogHandle catalogHandle = getRequiredCatalogHandle(plannerContext.getMetadata(), session, node, catalogName); + Map properties = tablePropertyManager.getProperties( + catalogName, + catalogHandle, + node.getTable().getProperties(), + session, + plannerContext, + accessControl, + analysis.getParameters(), + true); + + RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, originalName, Optional.empty(), Optional.empty(), properties); QualifiedObjectName tableName = redirection.redirectedTableName().orElse(originalName); TableHandle handle = redirection.tableHandle() .orElseThrow(() -> semanticException(TABLE_NOT_FOUND, table, "Table '%s' does not exist", tableName)); @@ -3375,7 +3400,19 @@ protected Scope visitUpdate(Update update, Optional scope) analysis.setUpdateType("UPDATE"); - RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, originalName); + String catalogName = originalName.catalogName(); + CatalogHandle catalogHandle = getRequiredCatalogHandle(plannerContext.getMetadata(), session, update, catalogName); + Map properties = tablePropertyManager.getProperties( + catalogName, + catalogHandle, + update.getTable().getProperties(), + session, + plannerContext, + accessControl, + analysis.getParameters(), + true); + + RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, originalName, Optional.empty(), Optional.empty(), properties); 
QualifiedObjectName tableName = redirection.redirectedTableName().orElse(originalName); TableHandle handle = redirection.tableHandle() .orElseThrow(() -> semanticException(TABLE_NOT_FOUND, table, "Table '%s' does not exist", tableName)); @@ -3505,7 +3542,19 @@ protected Scope visitMerge(Merge merge, Optional scope) analysis.setUpdateType("MERGE"); - RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, originalTableName); + String catalogName = originalTableName.catalogName(); + CatalogHandle catalogHandle = getRequiredCatalogHandle(plannerContext.getMetadata(), session, merge, catalogName); + Map properties = tablePropertyManager.getProperties( + catalogName, + catalogHandle, + merge.getTargetTable().getProperties(), + session, + plannerContext, + accessControl, + analysis.getParameters(), + true); + + RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, originalTableName, Optional.empty(), Optional.empty(), properties); QualifiedObjectName tableName = redirection.redirectedTableName().orElse(originalTableName); TableHandle targetTableHandle = redirection.tableHandle() .orElseThrow(() -> semanticException(TABLE_NOT_FOUND, table, "Table '%s' does not exist", tableName)); @@ -5860,12 +5909,25 @@ private OutputColumn createOutputColumn(Field field) */ private RedirectionAwareTableHandle getTableHandle(Table table, QualifiedObjectName name, Optional scope) { + String catalogName = name.catalogName(); + CatalogHandle catalogHandle = getRequiredCatalogHandle(plannerContext.getMetadata(), session, table, catalogName); + + Map properties = tablePropertyManager.getProperties( + catalogName, + catalogHandle, + table.getProperties(), + session, + plannerContext, + accessControl, + analysis.getParameters(), + true); + if (table.getQueryPeriod().isPresent()) { Optional startVersion = extractTableVersion(table, table.getQueryPeriod().get().getStart(), scope); Optional endVersion = 
extractTableVersion(table, table.getQueryPeriod().get().getEnd(), scope); - return metadata.getRedirectionAwareTableHandle(session, name, startVersion, endVersion); + return metadata.getRedirectionAwareTableHandle(session, name, startVersion, endVersion, properties); } - return metadata.getRedirectionAwareTableHandle(session, name); + return metadata.getRedirectionAwareTableHandle(session, name, Optional.empty(), Optional.empty(), properties); } /** diff --git a/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java b/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java index c4e2ecc31266..bd2ab7face97 100644 --- a/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java +++ b/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java @@ -143,6 +143,15 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable } } + @Override + public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName, Optional startVersion, Optional endVersion, Map properties) + { + Span span = startSpan("getTableHandle", tableName); + try (var _ = scopedSpan(span)) { + return delegate.getTableHandle(session, tableName, startVersion, endVersion, properties); + } + } + @Override public Optional getTableHandleForExecute(ConnectorSession session, ConnectorTableHandle tableHandle, String procedureName, Map executeProperties, RetryMode retryMode) { diff --git a/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java b/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java index 04a3f5b443c6..feec77c9d0ec 100644 --- a/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java +++ b/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java @@ -1497,20 +1497,20 @@ public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session sessio } @Override - public RedirectionAwareTableHandle 
getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional startVersion, Optional endVersion) + public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional startVersion, Optional endVersion, Map properties) { Span span = startSpan("getRedirectionAwareTableHandle", tableName); try (var _ = scopedSpan(span)) { - return delegate.getRedirectionAwareTableHandle(session, tableName, startVersion, endVersion); + return delegate.getRedirectionAwareTableHandle(session, tableName, startVersion, endVersion, properties); } } @Override - public Optional getTableHandle(Session session, QualifiedObjectName tableName, Optional startVersion, Optional endVersion) + public Optional getTableHandle(Session session, QualifiedObjectName tableName, Optional startVersion, Optional endVersion, Map properties) { Span span = startSpan("getTableHandle", tableName); try (var _ = scopedSpan(span)) { - return delegate.getTableHandle(session, tableName, startVersion, endVersion); + return delegate.getTableHandle(session, tableName, startVersion, endVersion, properties); } } diff --git a/core/trino-main/src/test/java/io/trino/execution/TestCreateViewTask.java b/core/trino-main/src/test/java/io/trino/execution/TestCreateViewTask.java index a2f4578143d9..abc836031387 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestCreateViewTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestCreateViewTask.java @@ -51,6 +51,7 @@ import static io.trino.sql.QueryUtil.simpleQuery; import static io.trino.sql.QueryUtil.table; import static io.trino.sql.analyzer.StatementAnalyzerFactory.createTestingStatementAnalyzerFactory; +import static io.trino.testing.TestingHandles.TEST_CATALOG_HANDLE; import static io.trino.testing.assertions.TrinoExceptionAssert.assertTrinoExceptionThrownBy; import static org.assertj.core.api.Assertions.assertThat; import static 
org.junit.jupiter.api.TestInstance.Lifecycle.PER_METHOD; @@ -73,7 +74,7 @@ public void setUp() createTestingStatementAnalyzerFactory( plannerContext, new AllowAllAccessControl(), - new TablePropertyManager(CatalogServiceProvider.fail()), + new TablePropertyManager(CatalogServiceProvider.singleton(TEST_CATALOG_HANDLE, ImmutableMap.of())), new AnalyzePropertyManager(CatalogServiceProvider.fail())), new StatementRewrite(ImmutableSet.of()), plannerContext.getTracer()); diff --git a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java index 7edfd163ab0a..d52e2a5168f2 100644 --- a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java +++ b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java @@ -1003,13 +1003,16 @@ public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session sessio } @Override - public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional startVersion, Optional endVersion) + public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional startVersion, Optional endVersion, Map properties) { - throw new UnsupportedOperationException(); + if (startVersion.isPresent() || endVersion.isPresent() || !properties.isEmpty()) { + throw new UnsupportedOperationException(); + } + return noRedirection(getTableHandle(session, tableName)); } @Override - public Optional getTableHandle(Session session, QualifiedObjectName table, Optional startVersion, Optional endVersion) + public Optional getTableHandle(Session session, QualifiedObjectName table, Optional startVersion, Optional endVersion, Map properties) { throw new UnsupportedOperationException(); } diff --git a/core/trino-parser/src/main/java/io/trino/sql/SqlFormatter.java b/core/trino-parser/src/main/java/io/trino/sql/SqlFormatter.java index 
90f29216f630..cb5ee588d8a9 100644 --- a/core/trino-parser/src/main/java/io/trino/sql/SqlFormatter.java +++ b/core/trino-parser/src/main/java/io/trino/sql/SqlFormatter.java @@ -1067,6 +1067,8 @@ protected Void visitMerge(Merge node, Integer indent) builder.append("MERGE INTO ") .append(formatName(node.getTargetTable().getName())); + builder.append(formatPropertiesMultiLine(node.getTargetTable().getProperties())); + node.getTargetAlias().ifPresent(value -> builder .append(' ') .append(formatName(value))); @@ -1451,6 +1453,8 @@ protected Void visitDelete(Delete node, Integer indent) builder.append("DELETE FROM ") .append(formatName(node.getTable().getName())); + builder.append(formatPropertiesMultiLine(node.getTable().getProperties())); + node.getWhere().ifPresent(where -> builder .append(" WHERE ") .append(formatExpression(where))); @@ -1877,6 +1881,8 @@ protected Void visitInsert(Insert node, Integer indent) builder.append("INSERT INTO ") .append(formatName(node.getTarget())); + builder.append(formatPropertiesMultiLine(node.getTable().getProperties())); + node.getColumns().ifPresent(columns -> builder .append(" (") .append(Joiner.on(", ").join(columns)) @@ -1893,8 +1899,12 @@ protected Void visitInsert(Insert node, Integer indent) protected Void visitUpdate(Update node, Integer indent) { builder.append("UPDATE ") - .append(formatName(node.getTable().getName())) - .append(" SET"); + .append(formatName(node.getTable().getName())); + + builder.append(formatPropertiesMultiLine(node.getTable().getProperties())); + + builder.append(" SET"); + int setCounter = node.getAssignments().size() - 1; for (UpdateAssignment assignment : node.getAssignments()) { builder.append("\n") diff --git a/core/trino-parser/src/main/java/io/trino/sql/parser/AstBuilder.java b/core/trino-parser/src/main/java/io/trino/sql/parser/AstBuilder.java index d5afc46d87fe..26576ab83dab 100644 --- a/core/trino-parser/src/main/java/io/trino/sql/parser/AstBuilder.java +++ 
b/core/trino-parser/src/main/java/io/trino/sql/parser/AstBuilder.java @@ -650,6 +650,11 @@ public Node visitDropView(SqlBaseParser.DropViewContext context) @Override public Node visitInsertInto(SqlBaseParser.InsertIntoContext context) { + List properties = ImmutableList.of(); + if (context.properties() != null) { + properties = visit(context.properties().propertyAssignments().property(), Property.class); + } + Optional> columnAliases = Optional.empty(); if (context.columnAliases() != null) { columnAliases = Optional.of(visit(context.columnAliases().identifier(), Identifier.class)); @@ -657,7 +662,7 @@ public Node visitInsertInto(SqlBaseParser.InsertIntoContext context) return new Insert( getLocation(context), - new Table(getQualifiedName(context.qualifiedName())), + new Table(getLocation(context), getQualifiedName(context.qualifiedName()), properties), columnAliases, (Query) visit(context.rootQuery())); } @@ -665,18 +670,27 @@ public Node visitInsertInto(SqlBaseParser.InsertIntoContext context) @Override public Node visitDelete(SqlBaseParser.DeleteContext context) { + List properties = ImmutableList.of(); + if (context.properties() != null) { + properties = visit(context.properties().propertyAssignments().property(), Property.class); + } + return new Delete( getLocation(context), - new Table(getLocation(context), getQualifiedName(context.qualifiedName())), + new Table(getLocation(context), getQualifiedName(context.qualifiedName()), properties), visitIfPresent(context.booleanExpression(), Expression.class)); } @Override public Node visitUpdate(SqlBaseParser.UpdateContext context) { + List properties = ImmutableList.of(); + if (context.properties() != null) { + properties = visit(context.properties().propertyAssignments().property(), Property.class); + } return new Update( getLocation(context), - new Table(getLocation(context), getQualifiedName(context.qualifiedName())), + new Table(getLocation(context), getQualifiedName(context.qualifiedName()), properties), 
visit(context.updateAssignment(), UpdateAssignment.class), visitIfPresent(context.booleanExpression(), Expression.class)); } @@ -696,7 +710,11 @@ public Node visitTruncateTable(SqlBaseParser.TruncateTableContext context) @Override public Node visitMerge(SqlBaseParser.MergeContext context) { - Table table = new Table(getLocation(context), getQualifiedName(context.qualifiedName())); + List properties = ImmutableList.of(); + if (context.properties() != null) { + properties = visit(context.properties().propertyAssignments().property(), Property.class); + } + Table table = new Table(getLocation(context), getQualifiedName(context.qualifiedName()), properties); Relation targetRelation = table; if (context.identifier() != null) { targetRelation = new AliasedRelation(table, (Identifier) visit(context.identifier()), null); @@ -1984,7 +2002,7 @@ public Node visitAliasedRelation(SqlBaseParser.AliasedRelationContext context) public Node visitTableName(SqlBaseParser.TableNameContext context) { if (context.queryPeriod() != null) { - return new Table(getLocation(context), getQualifiedName(context.qualifiedName()), (QueryPeriod) visit(context.queryPeriod())); + return new Table(getLocation(context), getQualifiedName(context.qualifiedName()), (QueryPeriod) visit(context.queryPeriod()), ImmutableList.of()); } return new Table(getLocation(context), getQualifiedName(context.qualifiedName())); } diff --git a/core/trino-parser/src/main/java/io/trino/sql/tree/Table.java b/core/trino-parser/src/main/java/io/trino/sql/tree/Table.java index 5afb5a438da2..716b7feebeed 100644 --- a/core/trino-parser/src/main/java/io/trino/sql/tree/Table.java +++ b/core/trino-parser/src/main/java/io/trino/sql/tree/Table.java @@ -26,27 +26,34 @@ public class Table { private final QualifiedName name; private final Optional queryPeriod; + private final List properties; public Table(QualifiedName name) { - this(Optional.empty(), name, Optional.empty()); + this(Optional.empty(), name, Optional.empty(), 
ImmutableList.of()); } public Table(NodeLocation location, QualifiedName name) { - this(Optional.of(location), name, Optional.empty()); + this(Optional.of(location), name, Optional.empty(), ImmutableList.of()); } - public Table(NodeLocation location, QualifiedName name, QueryPeriod queryPeriod) + public Table(NodeLocation location, QualifiedName name, List properties) { - this(Optional.of(location), name, Optional.of(queryPeriod)); + this(Optional.of(location), name, Optional.empty(), properties); } - private Table(Optional location, QualifiedName name, Optional queryPeriod) + public Table(NodeLocation location, QualifiedName name, QueryPeriod queryPeriod, List properties) + { + this(Optional.of(location), name, Optional.of(queryPeriod), properties); + } + + private Table(Optional location, QualifiedName name, Optional queryPeriod, List properties) { super(location); this.name = name; this.queryPeriod = queryPeriod; + this.properties = ImmutableList.copyOf(properties); } public QualifiedName getName() @@ -63,10 +70,10 @@ public R accept(AstVisitor visitor, C context) @Override public List getChildren() { - if (queryPeriod.isPresent()) { - return ImmutableList.of(queryPeriod.get()); - } - return ImmutableList.of(); + ImmutableList.Builder nodes = ImmutableList.builder(); + queryPeriod.ifPresent(nodes::add); + nodes.addAll(properties); + return nodes.build(); } @Override @@ -75,6 +82,7 @@ public String toString() return toStringHelper(this) .addValue(name) .addValue(queryPeriod) + .addValue(properties) .toString(); } @@ -90,13 +98,14 @@ public boolean equals(Object o) Table table = (Table) o; return Objects.equals(name, table.name) && - Objects.equals(queryPeriod, table.getQueryPeriod()); + Objects.equals(queryPeriod, table.getQueryPeriod()) && + Objects.equals(properties, table.getProperties()); } @Override public int hashCode() { - return Objects.hash(name, queryPeriod); + return Objects.hash(name, queryPeriod, properties); } @Override @@ -114,4 +123,9 @@ public 
Optional getQueryPeriod() { return queryPeriod; } + + public List getProperties() + { + return properties; + } } diff --git a/core/trino-parser/src/test/java/io/trino/sql/parser/TestSqlParser.java b/core/trino-parser/src/test/java/io/trino/sql/parser/TestSqlParser.java index dcc7e3f4b927..482204d229b5 100644 --- a/core/trino-parser/src/test/java/io/trino/sql/parser/TestSqlParser.java +++ b/core/trino-parser/src/test/java/io/trino/sql/parser/TestSqlParser.java @@ -5874,7 +5874,7 @@ public void testQueryPeriod() { Expression rangeValue = new GenericLiteral(location(1, 37), "TIMESTAMP", "2021-03-01 00:00:01"); QueryPeriod queryPeriod = new QueryPeriod(location(1, 17), QueryPeriod.RangeType.TIMESTAMP, rangeValue); - Table table = new Table(location(1, 15), qualifiedName(location(1, 15), "t"), queryPeriod); + Table table = new Table(location(1, 15), qualifiedName(location(1, 15), "t"), queryPeriod, ImmutableList.of()); assertThat(statement("SELECT * FROM t FOR TIMESTAMP AS OF TIMESTAMP '2021-03-01 00:00:01'")) .isEqualTo( new Query( @@ -5905,7 +5905,7 @@ public void testQueryPeriod() rangeValue = new StringLiteral(location(1, 35), "version1"); queryPeriod = new QueryPeriod(new NodeLocation(1, 17), QueryPeriod.RangeType.VERSION, rangeValue); - table = new Table(location(1, 15), qualifiedName(location(1, 15), "t"), queryPeriod); + table = new Table(location(1, 15), qualifiedName(location(1, 15), "t"), queryPeriod, ImmutableList.of()); assertThat(statement("SELECT * FROM t FOR VERSION AS OF 'version1'")) .isEqualTo( new Query( diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java index 4696d8c03ea4..daee8822bb7b 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java @@ -116,6 +116,26 @@ default ConnectorTableHandle getTableHandle( throw new 
TrinoException(GENERIC_INTERNAL_ERROR, "ConnectorMetadata getTableHandle() is not implemented"); } + /** + * Returns a table handle for the specified table name and version, or {@code null} if {@code tableName} relation does not exist + * or is not a table (e.g. is a view, or a materialized view). + * + * @throws TrinoException implementation can throw this exception when {@code tableName} refers to a table that + * cannot be queried. + * @see #getView(ConnectorSession, SchemaTableName) + * @see #getMaterializedView(ConnectorSession, SchemaTableName) + */ + @Nullable + default ConnectorTableHandle getTableHandle( + ConnectorSession session, + SchemaTableName tableName, + Optional startVersion, + Optional endVersion, + Map properties) + { + return getTableHandle(session, tableName, startVersion, endVersion); + } + /** * Create initial handle for execution of table procedure. The handle will be used through planning process. It will be converted to final * handle used for execution via @{link {@link ConnectorMetadata#beginTableExecute} @@ -420,7 +440,7 @@ default Iterator streamRelationComments(ConnectorSessio return RelationCommentMetadata.forRedirectedTable(tableName); } try { - ConnectorTableHandle tableHandle = getTableHandle(session, tableName, Optional.empty(), Optional.empty()); + ConnectorTableHandle tableHandle = getTableHandle(session, tableName, Optional.empty(), Optional.empty(), Map.of()); if (tableHandle == null) { // disappeared during listing return null; diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java index 52f09777e537..1715e375e1e6 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java +++ 
b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java @@ -1246,6 +1246,14 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable } } + @Override + public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName, Optional startVersion, Optional endVersion, Map properties) + { + try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) { + return delegate.getTableHandle(session, tableName, startVersion, endVersion, properties); + } + } + @Override public RowChangeParadigm getRowChangeParadigm(ConnectorSession session, ConnectorTableHandle tableHandle) { From 8f470b9bf1bb55c3aa94d8b6b0ef28c29a56ed93 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 18 Sep 2024 16:48:36 +0900 Subject: [PATCH 2/2] Add support for branching in Iceberg --- .../trino/plugin/iceberg/IcebergMetadata.java | 222 ++++++++++-- .../trino/plugin/iceberg/IcebergModule.java | 6 + .../iceberg/IcebergPageSinkProvider.java | 3 + .../plugin/iceberg/IcebergTableHandle.java | 19 +- .../iceberg/IcebergTableProperties.java | 12 + .../iceberg/IcebergWritableTableHandle.java | 5 +- .../procedure/CreateBranchProcedure.java | 42 +++ .../procedure/DropBranchProcedure.java | 42 +++ .../procedure/FastForwardProcedure.java | 47 +++ .../procedure/IcebergCreateBranchHandle.java | 25 ++ .../procedure/IcebergDropBranchHandle.java | 25 ++ .../procedure/IcebergFastForwardHandle.java | 26 ++ .../procedure/IcebergProcedureHandle.java | 3 + .../procedure/IcebergTableProcedureId.java | 3 + .../plugin/iceberg/TestIcebergBranching.java | 332 ++++++++++++++++++ ...stIcebergNodeLocalDynamicSplitPruning.java | 4 + .../iceberg/TestIcebergSplitSource.java | 1 + ...TestConnectorPushdownRulesWithIceberg.java | 4 + 18 files changed, 790 insertions(+), 31 deletions(-) create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/CreateBranchProcedure.java create mode 
100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/DropBranchProcedure.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/FastForwardProcedure.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergCreateBranchHandle.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergDropBranchHandle.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergFastForwardHandle.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergBranching.java diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 6107360a40ad..0133110d07f4 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -49,8 +49,11 @@ import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.iceberg.procedure.IcebergAddFilesFromTableHandle; import io.trino.plugin.iceberg.procedure.IcebergAddFilesHandle; +import io.trino.plugin.iceberg.procedure.IcebergCreateBranchHandle; +import io.trino.plugin.iceberg.procedure.IcebergDropBranchHandle; import io.trino.plugin.iceberg.procedure.IcebergDropExtendedStatsHandle; import io.trino.plugin.iceberg.procedure.IcebergExpireSnapshotsHandle; +import io.trino.plugin.iceberg.procedure.IcebergFastForwardHandle; import io.trino.plugin.iceberg.procedure.IcebergOptimizeHandle; import io.trino.plugin.iceberg.procedure.IcebergRemoveOrphanFilesHandle; import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle; @@ -89,6 +92,7 @@ import io.trino.spi.connector.DiscretePredicates; import io.trino.spi.connector.LimitApplicationResult; import 
io.trino.spi.connector.MaterializedViewFreshness; +import io.trino.spi.connector.PointerType; import io.trino.spi.connector.ProjectionApplicationResult; import io.trino.spi.connector.RelationColumnsMetadata; import io.trino.spi.connector.RelationCommentMetadata; @@ -225,9 +229,11 @@ import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.collect.Maps.transformValues; import static com.google.common.collect.Sets.difference; +import static io.airlift.slice.Slices.utf8Slice; import static io.trino.plugin.base.filter.UtcConstraintExtractor.extractTupleDomain; import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns; import static io.trino.plugin.base.projection.ApplyProjectionUtil.replaceWithNewVariables; +import static io.trino.plugin.base.util.Functions.checkFunctionArgument; import static io.trino.plugin.base.util.Procedures.checkProcedureArgument; import static io.trino.plugin.hive.HiveMetadata.TRANSACTIONAL; import static io.trino.plugin.hive.HiveTimestampPrecision.DEFAULT_PRECISION; @@ -281,6 +287,7 @@ import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning; import static io.trino.plugin.iceberg.IcebergTableProperties.getTableLocation; +import static io.trino.plugin.iceberg.IcebergTableProperties.getTargetBranch; import static io.trino.plugin.iceberg.IcebergUtil.buildPath; import static io.trino.plugin.iceberg.IcebergUtil.canEnforceColumnConstraintInSpecs; import static io.trino.plugin.iceberg.IcebergUtil.commit; @@ -314,17 +321,22 @@ import static io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog.TRINO_QUERY_START_TIME; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ADD_FILES; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ADD_FILES_FROM_TABLE; +import static 
io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.CREATE_BRANCH; +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.DROP_BRANCH; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.DROP_EXTENDED_STATS; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS; +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.FAST_FORWARD; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.REMOVE_ORPHAN_FILES; import static io.trino.plugin.iceberg.procedure.MigrationUtils.addFiles; import static io.trino.plugin.iceberg.procedure.MigrationUtils.addFilesFromTable; +import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.COLUMN_ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.COLUMN_NOT_FOUND; import static io.trino.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY; import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS; import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY; +import static io.trino.spi.StandardErrorCode.NOT_FOUND; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.StandardErrorCode.PERMISSION_DENIED; import static io.trino.spi.StandardErrorCode.QUERY_REJECTED; @@ -356,6 +368,7 @@ import static java.util.stream.Collectors.joining; import static org.apache.iceberg.ReachableFileUtil.metadataFileLocations; import static org.apache.iceberg.ReachableFileUtil.statisticsFilesLocations; +import static org.apache.iceberg.SnapshotRef.MAIN_BRANCH; import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP; import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP; import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP; @@ -466,7 +479,8 @@ public ConnectorTableHandle getTableHandle( ConnectorSession session, 
SchemaTableName tableName, Optional startVersion, - Optional endVersion) + Optional endVersion, + Map properties) { if (startVersion.isPresent()) { throw new TrinoException(NOT_SUPPORTED, "Read table with start version is not supported"); @@ -487,7 +501,7 @@ public ConnectorTableHandle getTableHandle( BaseTable storageTable = catalog.getMaterializedViewStorageTable(session, materializedViewName) .orElseThrow(() -> new TrinoException(TABLE_NOT_FOUND, "Storage table metadata not found for materialized view " + tableName)); - return tableHandleForCurrentSnapshot(session, tableName, storageTable); + return tableHandleForCurrentSnapshot(session, tableName, storageTable, Optional.empty()); } if (!isDataTable(tableName.getTableName())) { @@ -511,20 +525,29 @@ public ConnectorTableHandle getTableHandle( throw e; } - if (endVersion.isPresent()) { - long snapshotId = getSnapshotIdFromVersion(session, table, endVersion.get()); + Optional branch = getTargetBranch(properties); + if (endVersion.isPresent() || branch.isPresent()) { + checkArgument(endVersion.isEmpty() || branch.isEmpty(), "Cannot specify both end version and branch"); + ConnectorTableVersion version = endVersion.orElseGet(() -> new ConnectorTableVersion(PointerType.TARGET_ID, VARCHAR, utf8Slice(branch.get()))); + long snapshotId = getSnapshotIdFromVersion(session, table, version); + Optional partitionSpec = Optional.empty(); + if (branch.isPresent()) { + int schemaId = table.snapshot(snapshotId).schemaId(); + partitionSpec = Optional.of(table.specs().get(schemaId)); + } return tableHandleForSnapshot( session, tableName, table, Optional.of(snapshotId), schemaFor(table, snapshotId), - Optional.empty()); + partitionSpec, + branch); } - return tableHandleForCurrentSnapshot(session, tableName, table); + return tableHandleForCurrentSnapshot(session, tableName, table, branch); } - private IcebergTableHandle tableHandleForCurrentSnapshot(ConnectorSession session, SchemaTableName tableName, BaseTable table) + private 
IcebergTableHandle tableHandleForCurrentSnapshot(ConnectorSession session, SchemaTableName tableName, BaseTable table, Optional branch) { return tableHandleForSnapshot( session, @@ -532,7 +555,8 @@ private IcebergTableHandle tableHandleForCurrentSnapshot(ConnectorSession sessio table, Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId), table.schema(), - Optional.of(table.spec())); + Optional.of(table.spec()), + branch); } private IcebergTableHandle tableHandleForSnapshot( @@ -541,7 +565,8 @@ private IcebergTableHandle tableHandleForSnapshot( BaseTable table, Optional tableSnapshotId, Schema tableSchema, - Optional partitionSpec) + Optional partitionSpec, + Optional branch) { Map tableProperties = table.properties(); return new IcebergTableHandle( @@ -561,6 +586,7 @@ private IcebergTableHandle tableHandleForSnapshot( table.location(), table.properties(), getTablePartitioning(session, table), + branch, false, Optional.empty(), ImmutableSet.of(), @@ -599,7 +625,7 @@ private Optional getTablePartitioning(ConnectorSession IntStream.range(0, partitioningHandle.partitionFunctions().size()).boxed().collect(toImmutableList()))); } - private static long getSnapshotIdFromVersion(ConnectorSession session, Table table, ConnectorTableVersion version) + public static long getSnapshotIdFromVersion(ConnectorSession session, Table table, ConnectorTableVersion version) { io.trino.spi.type.Type versionType = version.getVersionType(); return switch (version.getPointerType()) { @@ -1049,7 +1075,7 @@ public void dropSchema(ConnectorSession session, String schemaName, boolean casc dropView(session, viewName); } for (SchemaTableName tableName : listTables(session, Optional.of(schemaName))) { - dropTable(session, getTableHandle(session, tableName, Optional.empty(), Optional.empty())); + dropTable(session, getTableHandle(session, tableName, Optional.empty(), Optional.empty(), ImmutableMap.of())); } } catalog.dropNamespace(session, schemaName); @@ -1159,7 +1185,7 @@ public 
ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con String tableLocation = null; if (replace) { - ConnectorTableHandle tableHandle = getTableHandle(session, tableMetadata.getTableSchema().getTable(), Optional.empty(), Optional.empty()); + ConnectorTableHandle tableHandle = getTableHandle(session, tableMetadata.getTableSchema().getTable(), Optional.empty(), Optional.empty(), Map.of()); if (tableHandle != null) { checkValidTableHandle(tableHandle); IcebergTableHandle table = (IcebergTableHandle) tableHandle; @@ -1187,7 +1213,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con "Cannot create a table on a non-empty location: %s, set 'iceberg.unique-table-location=true' in your Iceberg catalog properties " + "to use unique table locations for every table.", location)); } - return newWritableTableHandle(tableMetadata.getTable(), transaction.table(), retryMode); + return newWritableTableHandle(tableMetadata.getTable(), transaction.table(), SchemaParser.toJson(transaction.table().schema()), Optional.empty(), retryMode); } catch (IOException e) { throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed checking new table's location: " + location, e); @@ -1289,12 +1315,15 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto { IcebergTableHandle table = (IcebergTableHandle) tableHandle; Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); + Optional branch = table.getBranch(); - validateNotModifyingOldSnapshot(table, icebergTable); + if (branch.isEmpty()) { + validateNotModifyingOldSnapshot(table, icebergTable); + } beginTransaction(icebergTable); - return newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode); + return newWritableTableHandle(table.getSchemaTableName(), icebergTable, table.getTableSchemaJson(), branch, retryMode); } private List getChildNamespaces(ConnectorSession session, String parentNamespace) @@ -1310,20 +1339,22 @@ private 
List getChildNamespaces(ConnectorSession session, String parentN .collect(toImmutableList()); } - private IcebergWritableTableHandle newWritableTableHandle(SchemaTableName name, Table table, RetryMode retryMode) + private IcebergWritableTableHandle newWritableTableHandle(SchemaTableName schemaTableName, Table table, String schemaAsJson, Optional branch, RetryMode retryMode) { + Schema schema = SchemaParser.fromJson(schemaAsJson); return new IcebergWritableTableHandle( - name, - SchemaParser.toJson(table.schema()), + schemaTableName, + schemaAsJson, transformValues(table.specs(), PartitionSpecParser::toJson), table.spec().specId(), - getSupportedSortFields(table.schema(), table.sortOrder()), - getProjectedColumns(table.schema(), typeManager), + getSupportedSortFields(schema, table.sortOrder()), + getProjectedColumns(schema, typeManager), table.location(), getFileFormat(table), table.properties(), retryMode, - table.io().properties()); + table.io().properties(), + branch); } private static List getSupportedSortFields(Schema schema, SortOrder sortOrder) @@ -1393,6 +1424,7 @@ public Optional finishInsert( } appendFiles.appendFile(builder.build()); + table.branch().ifPresent(appendFiles::toBranch); writtenFiles.add(task.path()); } @@ -1403,7 +1435,7 @@ public Optional finishInsert( commitUpdateAndTransaction(appendFiles, session, transaction, "insert"); // TODO (https://github.com/trinodb/trino/issues/15439) this may not exactly be the snapshot we committed, if there is another writer - long newSnapshotId = transaction.table().currentSnapshot().snapshotId(); + long newSnapshotId = table.branch().isEmpty() ? 
transaction.table().currentSnapshot().snapshotId() : transaction.table().refs().get(table.branch().get()).snapshotId(); transaction = null; // TODO (https://github.com/trinodb/trino/issues/15439): it would be good to publish data and stats atomically @@ -1553,6 +1585,9 @@ public Optional getTableHandleForExecute( case REMOVE_ORPHAN_FILES -> getTableHandleForRemoveOrphanFiles(session, tableHandle, executeProperties); case ADD_FILES -> getTableHandleForAddFiles(session, accessControl, tableHandle, executeProperties); case ADD_FILES_FROM_TABLE -> getTableHandleForAddFilesFromTable(session, accessControl, tableHandle, executeProperties); + case CREATE_BRANCH -> getTableHandleForCreateBranch(session, tableHandle, executeProperties); + case DROP_BRANCH -> getTableHandleForDropBranch(session, tableHandle, executeProperties); + case FAST_FORWARD -> getTableHandleForFastForward(session, tableHandle, executeProperties); }; } @@ -1621,6 +1656,49 @@ private Optional getTableHandleForRemoveOrphanFiles icebergTable.io().properties())); } + private Optional getTableHandleForCreateBranch(ConnectorSession session, IcebergTableHandle tableHandle, Map executeProperties) + { + String name = (String) executeProperties.get("name"); + Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); + return Optional.of(new IcebergTableExecuteHandle( + tableHandle.getSchemaTableName(), + CREATE_BRANCH, + new IcebergCreateBranchHandle(name), + icebergTable.location(), + icebergTable.io().properties())); + } + + private Optional getTableHandleForDropBranch(ConnectorSession session, IcebergTableHandle tableHandle, Map executeProperties) + { + String name = (String) executeProperties.get("name"); + Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); + return Optional.of(new IcebergTableExecuteHandle( + tableHandle.getSchemaTableName(), + DROP_BRANCH, + new IcebergDropBranchHandle(name), + icebergTable.location(), + 
icebergTable.io().properties())); + } + + private Optional getTableHandleForFastForward(ConnectorSession session, IcebergTableHandle tableHandle, Map executeProperties) + { + String from = (String) executeProperties.get("from"); + String to = (String) executeProperties.get("to"); + Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); + if (!icebergTable.refs().containsKey(from)) { + throw new TrinoException(NOT_FOUND, "Branch '%s' does not exist".formatted(from)); + } + if (!icebergTable.refs().containsKey(to)) { + throw new TrinoException(NOT_FOUND, "Branch '%s' does not exist".formatted(to)); + } + return Optional.of(new IcebergTableExecuteHandle( + tableHandle.getSchemaTableName(), + FAST_FORWARD, + new IcebergFastForwardHandle(from, to), + icebergTable.location(), + icebergTable.io().properties())); + } + private Optional getTableHandleForAddFiles(ConnectorSession session, ConnectorAccessControl accessControl, IcebergTableHandle tableHandle, Map executeProperties) + { + if (!addFilesProcedureEnabled) { @@ -1738,6 +1816,9 @@ public Optional getLayoutForTableExecute(ConnectorSession case REMOVE_ORPHAN_FILES: case ADD_FILES: case ADD_FILES_FROM_TABLE: + case CREATE_BRANCH: + case DROP_BRANCH: + case FAST_FORWARD: // handled via executeTableExecute } throw new IllegalArgumentException("Unknown procedure '" + executeHandle.procedureId() + "'"); @@ -1767,6 +1848,9 @@ public BeginTableExecuteResult> readerForManifest(Table table, ManifestFile manifest) { return switch (manifest.content()) { @@ -2628,7 +2782,7 @@ public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(Connector return TableStatisticsMetadata.empty(); } - ConnectorTableHandle tableHandle = getTableHandle(session, tableMetadata.getTable(), Optional.empty(), Optional.empty()); + ConnectorTableHandle tableHandle = getTableHandle(session, tableMetadata.getTable(), Optional.empty(), Optional.empty(), ImmutableMap.of()); + if (tableHandle == null) { // Assume new 
table (CTAS), collect NDV stats on all columns return getStatisticsCollectionMetadata(tableMetadata, Optional.empty(), availableColumnNames -> {}); @@ -2818,11 +2972,15 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT verifyTableVersionForUpdate(table); Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); - validateNotModifyingOldSnapshot(table, icebergTable); + Optional branch = table.getBranch(); + + if (branch.isEmpty()) { + validateNotModifyingOldSnapshot(table, icebergTable); + } beginTransaction(icebergTable); - IcebergWritableTableHandle insertHandle = newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode); + IcebergWritableTableHandle insertHandle = newWritableTableHandle(table.getSchemaTableName(), icebergTable, table.getTableSchemaJson(), branch, retryMode); return new IcebergMergeTableHandle(table, insertHandle); } @@ -2831,8 +2989,9 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg { IcebergMergeTableHandle mergeHandle = (IcebergMergeTableHandle) mergeTableHandle; IcebergTableHandle handle = mergeHandle.getTableHandle(); + Optional branch = mergeHandle.getInsertTableHandle().branch(); RetryMode retryMode = mergeHandle.getInsertTableHandle().retryMode(); - finishWrite(session, handle, fragments, retryMode); + finishWrite(session, handle, branch, fragments, retryMode); } private static void verifyTableVersionForUpdate(IcebergTableHandle table) @@ -2849,7 +3008,7 @@ private static void validateNotModifyingOldSnapshot(IcebergTableHandle table, Ta } } - private void finishWrite(ConnectorSession session, IcebergTableHandle table, Collection fragments, RetryMode retryMode) + private void finishWrite(ConnectorSession session, IcebergTableHandle table, Optional branch, Collection fragments, RetryMode retryMode) { Table icebergTable = transaction.table(); @@ -2866,6 +3025,7 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle 
table, Col Schema schema = SchemaParser.fromJson(table.getTableSchemaJson()); RowDelta rowDelta = transaction.newRowDelta(); + branch.ifPresent(rowDelta::toBranch); table.getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId())); TupleDomain dataColumnPredicate = table.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId())); TupleDomain convertibleUnenforcedPredicate = table.getUnenforcedPredicate().filter((_, domain) -> isConvertibleToIcebergExpression(domain)); @@ -3000,6 +3160,7 @@ public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle DeleteFiles deleteFiles = icebergTable.newDelete() .deleteFromRowFilter(toIcebergExpression(handle.getEnforcedPredicate())); + handle.getBranch().ifPresent(deleteFiles::toBranch); commit(deleteFiles, session); Map summary = icebergTable.currentSnapshot().summary(); @@ -3058,6 +3219,7 @@ public Optional> applyLimit(Connect table.getTableLocation(), table.getStorageProperties(), table.getTablePartitioning(), + table.getBranch(), table.isRecordScannedFiles(), table.getMaxScannedFileSize(), table.getConstraintColumns(), @@ -3167,6 +3329,7 @@ else if (isMetadataColumnId(columnHandle.getId())) { table.getTableLocation(), table.getStorageProperties(), table.getTablePartitioning(), + table.getBranch(), table.isRecordScannedFiles(), table.getMaxScannedFileSize(), newConstraintColumns, @@ -3318,6 +3481,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab originalHandle.getTableLocation(), originalHandle.getStorageProperties(), Optional.empty(), // requiredTablePartitioning does not affect stats + originalHandle.getBranch(), false, // recordScannedFiles does not affect stats originalHandle.getMaxScannedFileSize(), ImmutableSet.of(), // constraintColumns do not affect stats @@ -3413,7 +3577,7 @@ && getOnlyElement(sourceTableHandles) instanceof IcebergTableHandle handle fromSnapshotForRefresh = 
Optional.of(Long.parseLong(sourceTable.getValue())); } - return newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode); + return newWritableTableHandle(table.getSchemaTableName(), icebergTable, table.getTableSchemaJson(), Optional.empty(), retryMode); } @Override @@ -3613,7 +3777,7 @@ else if (strings.size() != 2) { String schema = strings.get(0); String name = strings.get(1); SchemaTableName schemaTableName = new SchemaTableName(schema, name); - ConnectorTableHandle tableHandle = getTableHandle(session, schemaTableName, Optional.empty(), Optional.empty()); + ConnectorTableHandle tableHandle = getTableHandle(session, schemaTableName, Optional.empty(), Optional.empty(), ImmutableMap.of()); if (tableHandle == null || tableHandle instanceof CorruptedIcebergTableHandle) { // Base table is gone or table is corrupted diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java index 0992e51d2375..f7ce2830d85e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java @@ -44,8 +44,11 @@ import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProvider; import io.trino.plugin.iceberg.procedure.AddFilesTableFromTableProcedure; import io.trino.plugin.iceberg.procedure.AddFilesTableProcedure; +import io.trino.plugin.iceberg.procedure.CreateBranchProcedure; +import io.trino.plugin.iceberg.procedure.DropBranchProcedure; import io.trino.plugin.iceberg.procedure.DropExtendedStatsTableProcedure; import io.trino.plugin.iceberg.procedure.ExpireSnapshotsTableProcedure; +import io.trino.plugin.iceberg.procedure.FastForwardProcedure; import io.trino.plugin.iceberg.procedure.OptimizeTableProcedure; import io.trino.plugin.iceberg.procedure.RegisterTableProcedure; import 
io.trino.plugin.iceberg.procedure.RemoveOrphanFilesTableProcedure; @@ -134,6 +137,9 @@ public void configure(Binder binder) tableProcedures.addBinding().toProvider(RemoveOrphanFilesTableProcedure.class).in(Scopes.SINGLETON); tableProcedures.addBinding().toProvider(AddFilesTableProcedure.class).in(Scopes.SINGLETON); tableProcedures.addBinding().toProvider(AddFilesTableFromTableProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(CreateBranchProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(DropBranchProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(FastForwardProcedure.class).in(Scopes.SINGLETON); newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON); binder.bind(FunctionProvider.class).to(IcebergFunctionProvider.class).in(Scopes.SINGLETON); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java index c1f6e4c6de7d..8c8d8ca99974 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java @@ -151,6 +151,9 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa case REMOVE_ORPHAN_FILES: case ADD_FILES: case ADD_FILES_FROM_TABLE: + case CREATE_BRANCH: + case DROP_BRANCH: + case FAST_FORWARD: // handled via ConnectorMetadata.executeTableExecute } throw new IllegalArgumentException("Unknown procedure: " + executeHandle.procedureId()); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java index 21e1e0535e1e..c4b41761fce7 100644 --- 
a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java @@ -49,6 +49,7 @@ public class IcebergTableHandle private final int formatVersion; private final String tableLocation; private final Map storageProperties; + private final Optional branch; // Filter used during split generation and table scan, but not required to be strictly enforced by Iceberg Connector private final TupleDomain unenforcedPredicate; @@ -92,7 +93,8 @@ public static IcebergTableHandle fromJsonForDeserializationOnly( @JsonProperty("projectedColumns") Set projectedColumns, @JsonProperty("nameMappingJson") Optional nameMappingJson, @JsonProperty("tableLocation") String tableLocation, - @JsonProperty("storageProperties") Map storageProperties) + @JsonProperty("storageProperties") Map storageProperties, + @JsonProperty("branch") Optional branch) { return new IcebergTableHandle( catalog, @@ -111,6 +113,7 @@ public static IcebergTableHandle fromJsonForDeserializationOnly( tableLocation, storageProperties, Optional.empty(), + branch, false, Optional.empty(), ImmutableSet.of(), @@ -134,6 +137,7 @@ public IcebergTableHandle( String tableLocation, Map storageProperties, Optional tablePartitioning, + Optional branch, boolean recordScannedFiles, Optional maxScannedFileSize, Set constraintColumns, @@ -155,6 +159,7 @@ public IcebergTableHandle( this.tableLocation = requireNonNull(tableLocation, "tableLocation is null"); this.storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null")); this.tablePartitioning = requireNonNull(tablePartitioning, "tablePartitioning is null"); + this.branch = requireNonNull(branch, "branch is null"); this.recordScannedFiles = recordScannedFiles; this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null"); this.constraintColumns = ImmutableSet.copyOf(requireNonNull(constraintColumns, 
"constraintColumns is null")); @@ -261,6 +266,12 @@ public Optional getTablePartitioning() return tablePartitioning; } + @JsonProperty + public Optional getBranch() + { + return branch; + } + @JsonIgnore public boolean isRecordScannedFiles() { @@ -314,6 +325,7 @@ public IcebergTableHandle withProjectedColumns(Set projecte tableLocation, storageProperties, tablePartitioning, + branch, recordScannedFiles, maxScannedFileSize, constraintColumns, @@ -339,6 +351,7 @@ public IcebergTableHandle forAnalyze() tableLocation, storageProperties, tablePartitioning, + branch, recordScannedFiles, maxScannedFileSize, constraintColumns, @@ -364,6 +377,7 @@ public IcebergTableHandle forOptimize(boolean recordScannedFiles, DataSize maxSc tableLocation, storageProperties, tablePartitioning, + branch, recordScannedFiles, Optional.of(maxScannedFileSize), constraintColumns, @@ -389,6 +403,7 @@ public IcebergTableHandle withTablePartitioning(Optional SUPPORTED_PROPERTIES = ImmutableSet.builder() @@ -73,6 +74,7 @@ public class IcebergTableProperties .add(ORC_BLOOM_FILTER_FPP_PROPERTY) .add(OBJECT_STORE_LAYOUT_ENABLED_PROPERTY) .add(DATA_LOCATION_PROPERTY) + .add(TARGET_BRANCH_PROPERTY) .add(EXTRA_PROPERTIES_PROPERTY) .add(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY) .build(); @@ -190,6 +192,11 @@ public IcebergTableProperties( "File system location URI for the table's data files", null, false)) + .add(stringProperty( + TARGET_BRANCH_PROPERTY, + "Target branch name", + null, + true)) .build(); checkState(SUPPORTED_PROPERTIES.containsAll(tableProperties.stream() @@ -274,6 +281,11 @@ public static Optional getDataLocation(Map tableProperti return Optional.ofNullable((String) tableProperties.get(DATA_LOCATION_PROPERTY)); } + public static Optional getTargetBranch(Map tableProperties) + { + return Optional.ofNullable((String) tableProperties.get(TARGET_BRANCH_PROPERTY)); + } + public static Optional> getExtraProperties(Map tableProperties) { return Optional.ofNullable((Map) 
tableProperties.get(EXTRA_PROPERTIES_PROPERTY)); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java index 7f347564c3b7..831deb830771 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; @@ -37,7 +38,8 @@ public record IcebergWritableTableHandle( IcebergFileFormat fileFormat, Map storageProperties, RetryMode retryMode, - Map fileIoProperties) + Map fileIoProperties, + Optional branch) implements ConnectorInsertTableHandle, ConnectorOutputTableHandle { public IcebergWritableTableHandle @@ -53,6 +55,7 @@ public record IcebergWritableTableHandle( requireNonNull(retryMode, "retryMode is null"); checkArgument(partitionsSpecsAsJson.containsKey(partitionSpecId), "partitionSpecId missing from partitionSpecs"); fileIoProperties = ImmutableMap.copyOf(requireNonNull(fileIoProperties, "fileIoProperties is null")); + requireNonNull(branch, "branch is null"); } @Override diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/CreateBranchProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/CreateBranchProcedure.java new file mode 100644 index 000000000000..81cdf9f830b6 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/CreateBranchProcedure.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Provider; +import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.session.PropertyMetadata; + +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.CREATE_BRANCH; +import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly; +import static io.trino.spi.session.PropertyMetadata.stringProperty; + +public class CreateBranchProcedure + implements Provider +{ + @Override + public TableProcedureMetadata get() + { + return new TableProcedureMetadata( + CREATE_BRANCH.name(), + coordinatorOnly(), + ImmutableList.>builder() + .add(stringProperty( + "name", + "Branch name", + null, + false)) + .build()); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/DropBranchProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/DropBranchProcedure.java new file mode 100644 index 000000000000..123dbe678b90 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/DropBranchProcedure.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Provider; +import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.session.PropertyMetadata; + +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.DROP_BRANCH; +import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly; +import static io.trino.spi.session.PropertyMetadata.stringProperty; + +public class DropBranchProcedure + implements Provider +{ + @Override + public TableProcedureMetadata get() + { + return new TableProcedureMetadata( + DROP_BRANCH.name(), + coordinatorOnly(), + ImmutableList.>builder() + .add(stringProperty( + "name", + "Branch name", + null, + false)) + .build()); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/FastForwardProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/FastForwardProcedure.java new file mode 100644 index 000000000000..e3ddd8af7f39 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/FastForwardProcedure.java @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Provider; +import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.session.PropertyMetadata; + +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.FAST_FORWARD; +import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly; +import static io.trino.spi.session.PropertyMetadata.stringProperty; + +public class FastForwardProcedure + implements Provider +{ + @Override + public TableProcedureMetadata get() + { + return new TableProcedureMetadata( + FAST_FORWARD.name(), + coordinatorOnly(), + ImmutableList.>builder() + .add(stringProperty( + "from", + "Branch to fast-forward", + null, + false)) + .add(stringProperty( + "to", + "Ref for the from branch to be fast forwarded to", + null, + false)) + .build()); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergCreateBranchHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergCreateBranchHandle.java new file mode 100644 index 000000000000..3cd9a0986bfd --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergCreateBranchHandle.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import static java.util.Objects.requireNonNull; + +public record IcebergCreateBranchHandle(String name) + implements IcebergProcedureHandle +{ + public IcebergCreateBranchHandle + { + requireNonNull(name, "name is null"); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergDropBranchHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergDropBranchHandle.java new file mode 100644 index 000000000000..27de2deb8385 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergDropBranchHandle.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.procedure; + +import static java.util.Objects.requireNonNull; + +public record IcebergDropBranchHandle(String name) + implements IcebergProcedureHandle +{ + public IcebergDropBranchHandle + { + requireNonNull(name, "name is null"); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergFastForwardHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergFastForwardHandle.java new file mode 100644 index 000000000000..23dc9daf0e45 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergFastForwardHandle.java @@ -0,0 +1,26 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.procedure; + +import static java.util.Objects.requireNonNull; + +public record IcebergFastForwardHandle(String from, String to) + implements IcebergProcedureHandle +{ + public IcebergFastForwardHandle + { + requireNonNull(from, "from is null"); + requireNonNull(to, "to is null"); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java index 1577b867363e..d0f242b6b2e3 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java @@ -26,5 +26,8 @@ @JsonSubTypes.Type(value = IcebergRemoveOrphanFilesHandle.class, name = "remove_orphan_files"), @JsonSubTypes.Type(value = IcebergAddFilesHandle.class, name = "add_files"), @JsonSubTypes.Type(value = IcebergAddFilesFromTableHandle.class, name = "add_files_from_table"), + @JsonSubTypes.Type(value = IcebergCreateBranchHandle.class, name = "create_branch"), + @JsonSubTypes.Type(value = IcebergDropBranchHandle.class, name = "drop_branch"), + @JsonSubTypes.Type(value = IcebergFastForwardHandle.class, name = "fast_forward"), }) public interface IcebergProcedureHandle {} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java index 31884f752469..bc555708fab9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java @@ -21,4 +21,7 @@ public enum IcebergTableProcedureId REMOVE_ORPHAN_FILES, ADD_FILES, ADD_FILES_FROM_TABLE, + CREATE_BRANCH, + DROP_BRANCH, + FAST_FORWARD, } diff --git 
package io.trino.plugin.iceberg;

import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import io.trino.testing.sql.TestTable;
import org.apache.iceberg.Table;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import java.util.Optional;

import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory;
import static io.trino.plugin.iceberg.IcebergTestUtils.loadTable;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;

/**
 * Tests for Iceberg branching support: the {@code create_branch}, {@code drop_branch} and
 * {@code fast_forward} table procedures, and DML (INSERT/DELETE/UPDATE/MERGE) targeting a
 * branch through the {@code target_branch} statement property.
 */
@TestInstance(PER_CLASS)
final class TestIcebergBranching
        extends AbstractTestQueryFramework
{
    // Captured from the query runner so assertBranch() can inspect refs directly via the Iceberg API
    private HiveMetastore metastore;
    private TrinoFileSystemFactory fileSystemFactory;

    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        QueryRunner queryRunner = IcebergQueryRunner.builder().build();

        metastore = ((IcebergConnector) queryRunner.getCoordinator().getConnector(ICEBERG_CATALOG)).getInjector()
                .getInstance(HiveMetastoreFactory.class)
                .createMetastore(Optional.empty());
        fileSystemFactory = getFileSystemFactory(queryRunner);

        return queryRunner;
    }

    @Test
    void testCreateBranch()
    {
        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_branch", "(x int)")) {
            assertBranch(table.getName(), "main");

            createBranch(table.getName(), "test-branch");
            assertBranch(table.getName(), "main", "test-branch");

            // Branch names are case-sensitive: "TEST-BRANCH" is distinct from "test-branch"
            createBranch(table.getName(), "TEST-BRANCH");
            assertBranch(table.getName(), "main", "test-branch", "TEST-BRANCH");

            assertQueryFails("ALTER TABLE " + table.getName() + " EXECUTE create_branch('test-branch')", "Branch 'test-branch' already exists");
        }
    }

    @Test
    void testDropBranch()
    {
        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_drop_branch", "(x int)")) {
            assertBranch(table.getName(), "main");

            createBranch(table.getName(), "test-branch");
            createBranch(table.getName(), "TEST-BRANCH");
            assertBranch(table.getName(), "main", "test-branch", "TEST-BRANCH");

            dropBranch(table.getName(), "test-branch");
            dropBranch(table.getName(), "TEST-BRANCH");
            assertBranch(table.getName(), "main");

            // NOTE(review): "does not exit" mirrors the connector's current error message verbatim
            // (typo included); fix the connector message and these expectations together
            assertQueryFails("ALTER TABLE " + table.getName() + " EXECUTE drop_branch('test-branch')", "Branch 'test-branch' does not exit");
            assertQueryFails("ALTER TABLE " + table.getName() + " EXECUTE drop_branch('main')", "Cannot drop 'main' branch");
        }
    }

    @Test
    void testFastForward()
    {
        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_fast_forward", "(x int)")) {
            assertBranch(table.getName(), "main");

            createBranch(table.getName(), "test-branch");
            assertUpdate("INSERT INTO " + table.getName() + " WITH (target_branch = 'test-branch') VALUES 1, 2, 3", 3);
            // The rows went to the branch only; main is still empty
            assertThat(computeScalar("SELECT count(*) FROM " + table.getName()))
                    .isEqualTo(0L);
            assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'"))
                    .isEqualTo(3L);

            // Fast-forwarding main to the branch makes the branch's snapshot visible on main
            fastForward(table.getName(), "main", "test-branch");
            assertThat(computeScalar("SELECT count(*) FROM " + table.getName()))
                    .isEqualTo(3L);
            assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'"))
                    .isEqualTo(3L);

            assertQueryFails(
                    "ALTER TABLE " + table.getName() + " EXECUTE fast_forward('non-existing-branch', 'main')",
                    "Branch 'non-existing-branch' does not exit");
            assertQueryFails(
                    "ALTER TABLE " + table.getName() + " EXECUTE fast_forward('main', 'non-existing-branch')",
                    "Branch 'non-existing-branch' does not exit");
        }
    }

    @Test
    void testFastForwardNotAncestor()
    {
        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_fast_forward", "(x int)")) {
            createBranch(table.getName(), "test-branch");

            // Write to both refs so their histories diverge; fast-forward must then be rejected
            assertUpdate("INSERT INTO " + table.getName() + " VALUES 1", 1);
            assertUpdate("INSERT INTO " + table.getName() + " WITH (target_branch = 'test-branch') VALUES 1, 2, 3", 3);

            assertQueryFails(
                    "ALTER TABLE " + table.getName() + " EXECUTE fast_forward('main', 'test-branch')",
                    "Branch 'main' is not an ancestor of 'test-branch'");
        }
    }

    @Test
    void testInsert()
    {
        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_insert_into_branch", "(x int, y int)")) {
            createBranch(table.getName(), "test-branch");

            // insert into main (default) branch
            assertUpdate("INSERT INTO " + table.getName() + " WITH (target_branch = 'main') VALUES (1, 2)", 1);
            assertThat(computeScalar("SELECT count(*) FROM " + table.getName()))
                    .isEqualTo(1L);
            assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'"))
                    .isEqualTo(0L);

            // insert into another branch
            assertUpdate("INSERT INTO " + table.getName() + " WITH (target_branch = 'test-branch') VALUES (10, 20), (30, 40)", 2);
            assertThat(computeScalar("SELECT count(*) FROM " + table.getName()))
                    .isEqualTo(1L);
            assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'"))
                    .isEqualTo(2L);

            // insert into another branch with a partial column
            assertUpdate("INSERT INTO " + table.getName() + " WITH (target_branch = 'test-branch') (x) VALUES 50", 1);
            assertThat(computeScalar("SELECT count(*) FROM " + table.getName()))
                    .isEqualTo(1L);
            assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'"))
                    .isEqualTo(3L);

            assertQueryFails(
                    "INSERT INTO " + table.getName() + " WITH (target_branch = 'non-existing') VALUES (1, 2, 3)",
                    "Cannot find snapshot with reference name: non-existing");
        }
    }

    @Test
    void testInsertAfterSchemaEvolution()
    {
        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_insert_into_branch", "(x int, y int)")) {
            createBranch(table.getName(), "test-branch");
            assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 2)", 1);

            // change table definition on main branch
            assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN z int");

            // Writes against the branch are validated against the branch's (old, two-column) schema
            assertQueryFails(
                    "INSERT INTO " + table.getName() + " WITH (target_branch = 'test-branch') VALUES (1, 2, 3)",
                    "\\QInsert query has mismatched column types: Table: [integer, integer], Query: [integer, integer, integer]");

            assertUpdate("INSERT INTO " + table.getName() + " WITH (target_branch = 'test-branch') SELECT x + 10, y + 10 FROM " + table.getName(), 1);
            assertThat(query("SELECT * FROM " + table.getName()))
                    .matches("VALUES (1, 2, CAST(NULL AS integer))");
            assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'"))
                    .matches("VALUES (11, 12, CAST(NULL AS integer))");
        }
    }

    @Test
    void testDelete()
    {
        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_delete_from_branch", "(x int, y int)")) {
            createBranch(table.getName(), "test-branch");

            assertUpdate("INSERT INTO " + table.getName() + " WITH (target_branch = 'test-branch') VALUES (1, 10), (2, 20), (3, 30)", 3);
            assertThat(computeScalar("SELECT count(*) FROM " + table.getName()))
                    .isEqualTo(0L);
            assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'"))
                    .isEqualTo(3L);

            // Unconditional DELETE on the branch removes all of the branch's rows; main stays empty
            assertUpdate("DELETE FROM " + table.getName() + " WITH (target_branch = 'test-branch')");
            assertThat(computeScalar("SELECT count(*) FROM " + table.getName()))
                    .isEqualTo(0L);
            assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'"))
                    .isEqualTo(0L);

            assertQueryFails(
                    "DELETE FROM " + table.getName() + " WITH (target_branch = 'non-existing')",
                    "Cannot find snapshot with reference name: non-existing");
        }
    }

    @Test
    void testDeleteAfterSchemaEvolution()
    {
        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_delete_from_branch", "(x int, y int)")) {
            createBranch(table.getName(), "test-branch");
            assertUpdate("INSERT INTO " + table.getName() + " WITH (target_branch = 'test-branch') VALUES (1, 10), (2, 20), (3, 30)", 3);

            // change table definition on main branch
            assertUpdate("ALTER TABLE " + table.getName() + " DROP COLUMN y");

            // TODO Fix this once https://github.com/trinodb/trino/issues/23601 is resolved
            assertThat(query("DELETE FROM " + table.getName() + " WITH (target_branch = 'test-branch') WHERE y = 30")).failure()
                    .hasMessageContaining("Invalid metadata file")
                    .hasStackTraceContaining("Cannot find field 'y'");

            // branch returns the latest schema once a new snapshot is created
            assertUpdate("DELETE FROM " + table.getName() + " WITH (target_branch = 'test-branch') WHERE x = 1", 1);
            assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'"))
                    .matches("VALUES 2, 3");
        }
    }

    @Test
    void testUpdate()
    {
        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_update_branch", "(x int)")) {
            createBranch(table.getName(), "test-branch");
            assertUpdate("INSERT INTO " + table.getName() + " WITH (target_branch = 'test-branch') VALUES 1, 2, 3", 3);

            assertUpdate("UPDATE " + table.getName() + " WITH (target_branch = 'test-branch') SET x = x * 2", 3);
            // main is untouched; only the branch sees the updated values
            assertQueryReturnsEmptyResult("SELECT * FROM " + table.getName());
            assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'"))
                    .matches("VALUES 2, 4, 6");

            assertQueryFails(
                    "UPDATE " + table.getName() + " WITH (target_branch = 'non-existing') SET x = x * 2",
                    "Cannot find snapshot with reference name: non-existing");
        }
    }

    @Test
    void testUpdateAfterSchemaEvolution()
    {
        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_update_branch", "(x int, y int)")) {
            createBranch(table.getName(), "test-branch");
            assertUpdate("INSERT INTO " + table.getName() + " WITH (target_branch = 'test-branch') VALUES (1, 10), (2, 20), (3, 30)", 3);

            // change table definition on main branch
            assertUpdate("ALTER TABLE " + table.getName() + " DROP COLUMN y");
            // The branch still carries column y at this point, so the UPDATE succeeds against the branch schema
            assertUpdate("UPDATE " + table.getName() + " WITH (target_branch = 'test-branch') SET y = 10", 3);

            // branch returns the latest schema once a new snapshot is created
            assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'"))
                    .matches("VALUES 1, 2, 3");
        }
    }

    @Test
    void testMerge()
    {
        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_merge_branch", "(x int)")) {
            createBranch(table.getName(), "test-branch");

            // NOT MATCHED insert lands on the branch only
            assertUpdate("MERGE INTO " + table.getName() + " WITH (target_branch = 'test-branch') USING (VALUES 42) t(dummy) ON false " +
                    " WHEN NOT MATCHED THEN INSERT VALUES (1)", 1);
            assertQueryReturnsEmptyResult("SELECT * FROM " + table.getName());
            assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'"))
                    .matches("VALUES 1");

            // MATCHED update rewrites the branch's row; main remains empty
            assertUpdate("MERGE INTO " + table.getName() + " WITH (target_branch = 'test-branch') USING (VALUES 42) t(dummy) ON true " +
                    " WHEN MATCHED THEN UPDATE SET x = 10", 1);
            assertQueryReturnsEmptyResult("SELECT * FROM " + table.getName());
            assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'"))
                    .matches("VALUES 10");

            assertQueryFails(
                    "MERGE INTO " + table.getName() + " WITH (target_branch = 'not-existing') USING (VALUES 42) t(dummy) ON false " +
                            " WHEN NOT MATCHED THEN INSERT VALUES (1)",
                    "Cannot find snapshot with reference name: not-existing");
        }
    }

    @Test
    void testMergeAfterSchemaEvolution()
    {
        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_merge_branch", "(x int, y int)")) {
            createBranch(table.getName(), "test-branch");

            // change table definition on main branch
            assertUpdate("ALTER TABLE " + table.getName() + " DROP COLUMN y");
            // MERGE against the branch still accepts the branch's two-column schema
            assertUpdate("MERGE INTO " + table.getName() + " WITH (target_branch = 'test-branch') USING (VALUES 42) t(dummy) ON false " +
                    " WHEN NOT MATCHED THEN INSERT VALUES (1, 2)", 1);

            // branch returns the latest schema once a new snapshot is created
            assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'"))
                    .matches("VALUES 1");
        }
    }

    // Creates a branch through the create_branch table procedure
    private void createBranch(String table, String branch)
    {
        assertUpdate("ALTER TABLE " + table + " EXECUTE create_branch('" + branch + "')");
    }

    // Drops a branch through the drop_branch table procedure
    private void dropBranch(String table, String branch)
    {
        assertUpdate("ALTER TABLE " + table + " EXECUTE drop_branch('" + branch + "')");
    }

    // Fast-forwards branch 'from' to ref 'to' through the fast_forward table procedure
    private void fastForward(String table, String from, String to)
    {
        assertUpdate("ALTER TABLE " + table + " EXECUTE fast_forward('" + from + "', '" + to + "')");
    }

    // Asserts that the table's refs are exactly the given branch names, verified through the Iceberg API
    // rather than SQL, so it also catches refs invisible to the engine
    private void assertBranch(String tableName, String... branchNames)
    {
        Table table = loadTable(tableName, metastore, fileSystemFactory, "iceberg", "tpch");
        table.refresh();
        assertThat(table.refs()).containsOnlyKeys(branchNames);
    }
}
nationTable.location(), nationTable.properties(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java index 676f4cf1f7d2..fd273b86868c 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java @@ -176,6 +176,7 @@ public void testProjectionPushdown() "", ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -261,6 +262,7 @@ public void testPredicatePushdown() "", ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -313,6 +315,7 @@ public void testColumnPruningProjectionPushdown() "", ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -375,6 +378,7 @@ public void testPushdownWithDuplicateExpressions() "", ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(),