diff --git a/core/trino-main/src/main/java/io/trino/execution/CreateTableTask.java b/core/trino-main/src/main/java/io/trino/execution/CreateTableTask.java index b8804373a246..955e3a83ae95 100644 --- a/core/trino-main/src/main/java/io/trino/execution/CreateTableTask.java +++ b/core/trino-main/src/main/java/io/trino/execution/CreateTableTask.java @@ -32,6 +32,7 @@ import io.trino.spi.connector.CatalogHandle; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.SaveMode; import io.trino.spi.security.AccessDeniedException; import io.trino.spi.type.Type; import io.trino.spi.type.TypeNotFoundException; @@ -83,7 +84,6 @@ import static io.trino.sql.tree.LikeClause.PropertiesOption.EXCLUDING; import static io.trino.sql.tree.LikeClause.PropertiesOption.INCLUDING; import static io.trino.sql.tree.SaveMode.FAIL; -import static io.trino.sql.tree.SaveMode.IGNORE; import static io.trino.sql.tree.SaveMode.REPLACE; import static io.trino.type.UnknownType.UNKNOWN; import static java.util.Locale.ENGLISH; @@ -130,10 +130,6 @@ public ListenableFuture execute( ListenableFuture internalExecute(CreateTable statement, Session session, List parameters, Consumer outputConsumer) { checkArgument(!statement.getElements().isEmpty(), "no columns for table"); - // TODO: Remove when engine is supporting table replacement - if (statement.getSaveMode() == REPLACE) { - throw semanticException(NOT_SUPPORTED, statement, "Replace table is not supported"); - } Map, Expression> parameterLookup = bindParameters(statement, parameters); QualifiedObjectName tableName = createQualifiedObjectName(session, statement, statement.getName()); @@ -147,7 +143,7 @@ ListenableFuture internalExecute(CreateTable statement, Session session, L } throw e; } - if (tableHandle.isPresent()) { + if (tableHandle.isPresent() && statement.getSaveMode() != REPLACE) { if (statement.getSaveMode() == FAIL) { throw semanticException(TABLE_ALREADY_EXISTS, statement, "Table '%s' already exists", tableName); } @@ -298,7 +294,7 @@ else if (element instanceof LikeClause likeClause) { Map finalProperties = combineProperties(specifiedPropertyKeys, properties, inheritedProperties); ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(tableName.asSchemaTableName(), ImmutableList.copyOf(columns.values()), finalProperties, statement.getComment()); try { - plannerContext.getMetadata().createTable(session, catalogName, tableMetadata, statement.getSaveMode() == IGNORE); + plannerContext.getMetadata().createTable(session, catalogName, tableMetadata, toConnectorSaveMode(statement.getSaveMode())); } catch (TrinoException e) { // connectors are not required to handle the ignoreExisting flag @@ -334,4 +330,13 @@ private static Map combineProperties(Set specifiedProper } return finalProperties; } + + private static SaveMode toConnectorSaveMode(io.trino.sql.tree.SaveMode saveMode) + { + return switch (saveMode) { + case FAIL -> SaveMode.FAIL; + case IGNORE -> SaveMode.IGNORE; + case REPLACE -> SaveMode.REPLACE; + }; + } } diff --git a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java index 123371da2b35..8edc880c34d1 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java +++ b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java @@ -40,6 +40,7 @@ import io.trino.spi.connector.RowChangeParadigm; import io.trino.spi.connector.SampleApplicationResult; import io.trino.spi.connector.SampleType; +import io.trino.spi.connector.SaveMode; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SortItem; import io.trino.spi.connector.SystemTable; @@ -213,9 +214,9 @@ Optional getTableHandleForExecute( /** * Creates a table using the specified table metadata. * - * @throws TrinoException with {@code ALREADY_EXISTS} if the table already exists and {@param ignoreExisting} is not set + * @throws TrinoException with {@code ALREADY_EXISTS} if the table already exists and {@param saveMode} is set to FAIL. */ - void createTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, boolean ignoreExisting); + void createTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, SaveMode saveMode); /** * Rename the specified table. @@ -314,7 +315,7 @@ Optional getTableHandleForExecute( /** * Begin the atomic creation of a table with data. */ - OutputTableHandle beginCreateTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, Optional layout); + OutputTableHandle beginCreateTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, Optional layout, boolean replace); /** * Finish a table creation with data after the data is written. diff --git a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java index 0906f31f70f9..13ae279e4bbc 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java +++ b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java @@ -71,6 +71,7 @@ import io.trino.spi.connector.RowChangeParadigm; import io.trino.spi.connector.SampleApplicationResult; import io.trino.spi.connector.SampleType; +import io.trino.spi.connector.SaveMode; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SchemaTablePrefix; import io.trino.spi.connector.SortItem; @@ -762,12 +763,12 @@ public void setSchemaAuthorization(Session session, CatalogSchemaName source, Tr } @Override - public void createTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, boolean ignoreExisting) + public void createTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, SaveMode saveMode) { CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, catalogName); CatalogHandle catalogHandle = catalogMetadata.getCatalogHandle(); ConnectorMetadata metadata = catalogMetadata.getMetadata(session); - metadata.createTable(session.toConnectorSession(catalogHandle), tableMetadata, ignoreExisting); + metadata.createTable(session.toConnectorSession(catalogHandle), tableMetadata, saveMode); if (catalogMetadata.getSecurityManagement() == SYSTEM) { systemSecurityMetadata.tableCreated(session, new CatalogSchemaTableName(catalogName, tableMetadata.getTable())); } @@ -1034,7 +1035,7 @@ public void cleanupQuery(Session session) } @Override - public OutputTableHandle beginCreateTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, Optional layout) + public OutputTableHandle beginCreateTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, Optional layout, boolean replace) { CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, catalogName); CatalogHandle catalogHandle = catalogMetadata.getCatalogHandle(); @@ -1042,7 +1043,7 @@ public OutputTableHandle beginCreateTable(Session session, String catalogName, C ConnectorTransactionHandle transactionHandle = catalogMetadata.getTransactionHandleFor(catalogHandle); ConnectorSession connectorSession = session.toConnectorSession(catalogHandle); - ConnectorOutputTableHandle handle = metadata.beginCreateTable(connectorSession, tableMetadata, layout.map(TableLayout::getLayout), getRetryPolicy(session).getRetryMode()); + ConnectorOutputTableHandle handle = metadata.beginCreateTable(connectorSession, tableMetadata, layout.map(TableLayout::getLayout), getRetryPolicy(session).getRetryMode(), replace); return new OutputTableHandle(catalogHandle, tableMetadata.getTable(), transactionHandle, handle); } diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java index 85ecd972178b..b55bdf60d84b 100644 --- a/core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java +++ b/core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java @@ -1316,19 +1316,22 @@ public static final class Create private final Optional layout; private final boolean createTableAsSelectWithData; private final boolean createTableAsSelectNoOp; + private final boolean replace; public Create( Optional destination, Optional metadata, Optional layout, boolean createTableAsSelectWithData, - boolean createTableAsSelectNoOp) + boolean createTableAsSelectNoOp, + boolean replace) { this.destination = requireNonNull(destination, "destination is null"); this.metadata = requireNonNull(metadata, "metadata is null"); this.layout = requireNonNull(layout, "layout is null"); this.createTableAsSelectWithData = createTableAsSelectWithData; this.createTableAsSelectNoOp = createTableAsSelectNoOp; + this.replace = replace; } public Optional getDestination() @@ -1355,6 +1358,11 @@ public boolean isCreateTableAsSelectNoOp() { return createTableAsSelectNoOp; } + + public boolean isReplace() + { + return replace; + } } @Immutable diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java index afdcfa920950..4671d5ecabd7 100644 --- a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java +++ b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java @@ -880,23 +880,19 @@ protected Scope visitAnalyze(Analyze node, Optional scope) @Override protected Scope visitCreateTableAsSelect(CreateTableAsSelect node, Optional scope) { - // TODO: Remove when engine is supporting table replacement - if (node.getSaveMode() == REPLACE) { - throw semanticException(NOT_SUPPORTED, node, "Replace table is not supported"); - } - // turn this into a query that has a new table writer node on top. QualifiedObjectName targetTable = createQualifiedObjectName(session, node, node.getName()); Optional targetTableHandle = metadata.getTableHandle(session, targetTable); - if (targetTableHandle.isPresent()) { + if (targetTableHandle.isPresent() && node.getSaveMode() != REPLACE) { if (node.getSaveMode() == IGNORE) { analysis.setCreate(new Analysis.Create( Optional.of(targetTable), Optional.empty(), Optional.empty(), node.isWithData(), - true)); + true, + false)); analysis.setUpdateType("CREATE TABLE"); analysis.setUpdateTarget(targetTableHandle.get().getCatalogHandle().getVersion(), targetTable, Optional.empty(), Optional.of(ImmutableList.of())); return createAndAssignScope(node, scope, Field.newUnqualified("rows", BIGINT)); @@ -985,7 +981,8 @@ protected Scope visitCreateTableAsSelect(CreateTableAsSelect node, Optional scope) { - // TODO: Remove when engine is supporting table replacement - if (node.getSaveMode() == REPLACE) { - throw semanticException(NOT_SUPPORTED, node, "Replace table is not supported"); - } - validateProperties(node.getProperties(), scope); return createAndAssignScope(node, scope); } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java index cd8981cc43e0..75fc8e8fec33 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java @@ -493,7 +493,7 @@ private RelationPlan createTableCreationPlan(Analysis analysis, Query query) analysis, plan.getRoot(), visibleFields(plan), - new CreateReference(catalogName, tableMetadata, newTableLayout), + new CreateReference(catalogName, tableMetadata, newTableLayout, create.isReplace()), columnNames, newTableLayout, statisticsMetadata); diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java index 33d8fd70e2fa..c111a8284537 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java @@ -242,11 +242,12 @@ private WriterTarget createWriterTarget(WriterTarget target) // TODO: we shouldn't need to store the schemaTableName in the handles, but there isn't a good way to pass this around with the current architecture if (target instanceof CreateReference create) { return new CreateTarget( - metadata.beginCreateTable(session, create.getCatalog(), create.getTableMetadata(), create.getLayout()), + metadata.beginCreateTable(session, create.getCatalog(), create.getTableMetadata(), create.getLayout(), create.isReplace()), create.getTableMetadata().getTable(), target.supportsMultipleWritersPerPartition(metadata, session), target.getMaxWriterTasks(metadata, session), - target.getWriterScalingOptions(metadata, session)); + target.getWriterScalingOptions(metadata, session), + create.isReplace()); } if (target instanceof InsertReference insert) { return new InsertTarget( diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java b/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java index 099ae3465f96..9a359504cc8a 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java @@ -208,12 +208,14 @@ public static class CreateReference private final String catalog; private final ConnectorTableMetadata tableMetadata; private final Optional layout; + private final boolean replace; - public CreateReference(String catalog, ConnectorTableMetadata tableMetadata, Optional layout) + public CreateReference(String catalog, ConnectorTableMetadata tableMetadata, Optional layout, boolean replace) { this.catalog = requireNonNull(catalog, "catalog is null"); this.tableMetadata = requireNonNull(tableMetadata, "tableMetadata is null"); this.layout = requireNonNull(layout, "layout is null"); + this.replace = replace; } public String getCatalog() @@ -253,6 +255,11 @@ public ConnectorTableMetadata getTableMetadata() return tableMetadata; } + public boolean isReplace() + { + return replace; + } + @Override public String toString() { @@ -268,6 +275,7 @@ public static class CreateTarget private final boolean multipleWritersPerPartitionSupported; private final OptionalInt maxWriterTasks; private final WriterScalingOptions writerScalingOptions; + private final boolean replace; @JsonCreator public CreateTarget( @@ -275,13 +283,15 @@ public CreateTarget( @JsonProperty("schemaTableName") SchemaTableName schemaTableName, @JsonProperty("multipleWritersPerPartitionSupported") boolean multipleWritersPerPartitionSupported, @JsonProperty("maxWriterTasks") OptionalInt maxWriterTasks, - @JsonProperty("writerScalingOptions") WriterScalingOptions writerScalingOptions) + @JsonProperty("writerScalingOptions") WriterScalingOptions writerScalingOptions, + @JsonProperty("replace") boolean replace) { this.handle = requireNonNull(handle, "handle is null"); this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); this.multipleWritersPerPartitionSupported = multipleWritersPerPartitionSupported; this.maxWriterTasks = requireNonNull(maxWriterTasks, "maxWriterTasks is null"); this.writerScalingOptions = requireNonNull(writerScalingOptions, "writerScalingOptions is null"); + this.replace = replace; } @JsonProperty @@ -308,6 +318,12 @@ public WriterScalingOptions getWriterScalingOptions() return writerScalingOptions; } + @JsonProperty + public boolean isReplace() + { + return replace; + } + @Override public String toString() { diff --git a/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java b/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java index 0443f764cc56..67b3b8785ff3 100644 --- a/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java +++ b/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java @@ -55,6 +55,7 @@ import io.trino.spi.connector.RowChangeParadigm; import io.trino.spi.connector.SampleApplicationResult; import io.trino.spi.connector.SampleType; +import io.trino.spi.connector.SaveMode; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SchemaTablePrefix; import io.trino.spi.connector.SortItem; @@ -375,6 +376,15 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe } } + @Override + public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, SaveMode saveMode) + { + Span span = startSpan("createTable", tableMetadata.getTable()); + try (var ignored = scopedSpan(span)) { + delegate.createTable(session, tableMetadata, saveMode); + } + } + @Override public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) { @@ -609,6 +619,15 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con } } + @Override + public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout, RetryMode retryMode, boolean replace) + { + Span span = startSpan("beginCreateTable", tableMetadata.getTable()); + try (var ignored = scopedSpan(span)) { + return delegate.beginCreateTable(session, tableMetadata, layout, retryMode, replace); + } + } + @Override public Optional finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments, Collection computedStatistics) { diff --git a/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java b/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java index 2df32de219f9..2507690d0016 100644 --- a/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java +++ b/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java @@ -68,6 +68,7 @@ import io.trino.spi.connector.RowChangeParadigm; import io.trino.spi.connector.SampleApplicationResult; import io.trino.spi.connector.SampleType; +import io.trino.spi.connector.SaveMode; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SortItem; import io.trino.spi.connector.SystemTable; @@ -388,11 +389,11 @@ public void setSchemaAuthorization(Session session, CatalogSchemaName source, Tr } @Override - public void createTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, boolean ignoreExisting) + public void createTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, SaveMode saveMode) { Span span = startSpan("createTable", catalogName, tableMetadata); try (var ignored = scopedSpan(span)) { - delegate.createTable(session, catalogName, tableMetadata, ignoreExisting); + delegate.createTable(session, catalogName, tableMetadata, saveMode); } } @@ -568,11 +569,11 @@ public Optional getSupportedType(Session session, CatalogHandle catalogHan } @Override - public OutputTableHandle beginCreateTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, Optional layout) + public OutputTableHandle beginCreateTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, Optional layout, boolean replace) { Span span = startSpan("beginCreateTable", catalogName, tableMetadata); try (var ignored = scopedSpan(span)) { - return delegate.beginCreateTable(session, catalogName, tableMetadata, layout); + return delegate.beginCreateTable(session, catalogName, tableMetadata, layout, replace); } } diff --git a/core/trino-main/src/test/java/io/trino/execution/BaseDataDefinitionTaskTest.java b/core/trino-main/src/test/java/io/trino/execution/BaseDataDefinitionTaskTest.java index 0206ae0c32f9..8357a4d93a30 100644 --- a/core/trino-main/src/test/java/io/trino/execution/BaseDataDefinitionTaskTest.java +++ b/core/trino-main/src/test/java/io/trino/execution/BaseDataDefinitionTaskTest.java @@ -43,6 +43,7 @@ import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ConnectorTableMetadata; import io.trino.spi.connector.MaterializedViewNotFoundException; +import io.trino.spi.connector.SaveMode; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TestingColumnHandle; import io.trino.spi.function.OperatorType; @@ -82,6 +83,8 @@ import static io.trino.metadata.MetadataManager.createTestMetadataManager; import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.DIVISION_BY_ZERO; +import static io.trino.spi.connector.SaveMode.IGNORE; +import static io.trino.spi.connector.SaveMode.REPLACE; import static io.trino.spi.session.PropertyMetadata.longProperty; import static io.trino.spi.session.PropertyMetadata.stringProperty; import static io.trino.spi.type.BigintType.BIGINT; @@ -340,9 +343,9 @@ public TableMetadata getTableMetadata(Session session, TableHandle tableHandle) } @Override - public void createTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, boolean ignoreExisting) + public void createTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, SaveMode saveMode) { - checkArgument(ignoreExisting || !tables.containsKey(tableMetadata.getTable())); + checkArgument(saveMode == REPLACE || saveMode == IGNORE || !tables.containsKey(tableMetadata.getTable())); tables.put(tableMetadata.getTable(), tableMetadata); } diff --git a/core/trino-main/src/test/java/io/trino/execution/TestAddColumnTask.java b/core/trino-main/src/test/java/io/trino/execution/TestAddColumnTask.java index d89b8b328d1c..5b497a1cc2f6 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestAddColumnTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestAddColumnTask.java @@ -45,6 +45,7 @@ import static io.trino.spi.StandardErrorCode.COLUMN_NOT_FOUND; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; +import static io.trino.spi.connector.SaveMode.FAIL; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.RowType.rowType; @@ -60,7 +61,7 @@ public class TestAddColumnTask public void testAddColumn() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); TableHandle table = metadata.getTableHandle(testSession, tableName).get(); assertThat(metadata.getTableMetadata(testSession, table).getColumns()) .containsExactly(new ColumnMetadata("test", BIGINT)); @@ -74,7 +75,7 @@ public void testAddColumn() public void testAddColumnWithComment() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); TableHandle table = metadata.getTableHandle(testSession, tableName).get(); getFutureValue(executeAddColumn(asQualifiedName(tableName), QualifiedName.of("new_col"), INTEGER, Optional.of("test comment"), false, false)); @@ -92,7 +93,7 @@ public void testAddColumnWithComment() public void testAddColumnWithColumnProperty() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); TableHandle table = metadata.getTableHandle(testSession, tableName).get(); Property columnProperty = new Property(new Identifier("column_property"), new LongLiteral("111")); @@ -126,7 +127,7 @@ public void testAddColumnNotExistingTableIfExists() public void testAddColumnNotExists() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); TableHandle table = metadata.getTableHandle(testSession, tableName).get(); assertThat(metadata.getTableMetadata(testSession, table).getColumns()) .containsExactly(new ColumnMetadata("test", BIGINT)); @@ -140,7 +141,7 @@ public void testAddColumnNotExists() public void testAddColumnAlreadyExist() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeAddColumn(asQualifiedName(tableName), QualifiedName.of("test"), INTEGER, Optional.empty(), false, false))) .hasErrorCode(COLUMN_ALREADY_EXISTS) @@ -173,7 +174,7 @@ public void testAddColumnOnMaterializedView() public void testAddFieldWithNotExists() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, rowTable(tableName, new RowType.Field(Optional.of("a"), BIGINT)), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, rowTable(tableName, new RowType.Field(Optional.of("a"), BIGINT)), FAIL); TableHandle table = metadata.getTableHandle(testSession, tableName).get(); assertThat(metadata.getTableMetadata(testSession, table).getColumns()) .containsExactly(new ColumnMetadata("col", rowType(new RowType.Field(Optional.of("a"), BIGINT)))); @@ -191,7 +192,7 @@ public void testAddFieldToNotExistingField() testSession, TEST_CATALOG_NAME, rowTable(tableName, new RowType.Field(Optional.of("a"), rowType(new RowType.Field(Optional.of("b"), INTEGER)))), - false); + FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeAddColumn(asQualifiedName(tableName), QualifiedName.of("col", "x", "c"), INTEGER, false, false))) .hasErrorCode(COLUMN_NOT_FOUND) @@ -206,7 +207,7 @@ public void testUnsupportedArrayTypeInRowField() testSession, TEST_CATALOG_NAME, rowTable(tableName, new RowType.Field(Optional.of("a"), new ArrayType(rowType(new RowType.Field(Optional.of("element"), INTEGER))))), - false); + FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeAddColumn(asQualifiedName(tableName), QualifiedName.of("col", "a", "c"), INTEGER, false, false))) .hasErrorCode(NOT_SUPPORTED) @@ -224,7 +225,7 @@ public void testUnsupportedMapTypeInRowField() rowType(new RowType.Field(Optional.of("key"), INTEGER)), rowType(new RowType.Field(Optional.of("key"), INTEGER)), new TypeOperators()))), - false); + FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeAddColumn(asQualifiedName(tableName), QualifiedName.of("col", "a", "c"), INTEGER, false, false))) .hasErrorCode(NOT_SUPPORTED) @@ -235,7 +236,7 @@ public void testUnsupportedMapTypeInRowField() public void testUnsupportedAddDuplicatedField() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, rowTable(tableName, new RowType.Field(Optional.of("a"), BIGINT)), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, rowTable(tableName, new RowType.Field(Optional.of("a"), BIGINT)), FAIL); TableHandle table = metadata.getTableHandle(testSession, tableName).orElseThrow(); assertThat(metadata.getTableMetadata(testSession, table).getColumns()) .containsExactly(new ColumnMetadata("col", rowType(new RowType.Field(Optional.of("a"), BIGINT)))); @@ -260,7 +261,7 @@ public void testUnsupportedAddAmbiguousField() rowTable(tableName, new RowType.Field(Optional.of("a"), rowType(new RowType.Field(Optional.of("x"), INTEGER))), new RowType.Field(Optional.of("A"), rowType(new RowType.Field(Optional.of("y"), INTEGER)))), - false); + FAIL); TableHandle table = metadata.getTableHandle(testSession, tableName).get(); assertThat(metadata.getTableMetadata(testSession, table).getColumns()) .containsExactly(new ColumnMetadata("col", rowType( diff --git a/core/trino-main/src/test/java/io/trino/execution/TestCommentTask.java b/core/trino-main/src/test/java/io/trino/execution/TestCommentTask.java index 1c9863fbb702..bca446767f66 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestCommentTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestCommentTask.java @@ -30,6 +30,7 @@ import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.spi.StandardErrorCode.COLUMN_NOT_FOUND; import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; +import static io.trino.spi.connector.SaveMode.FAIL; import static io.trino.sql.tree.Comment.Type.COLUMN; import static io.trino.sql.tree.Comment.Type.TABLE; import static io.trino.sql.tree.Comment.Type.VIEW; @@ -44,7 +45,7 @@ public class TestCommentTask public void testCommentTable() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); assertThat(metadata.getTableMetadata(testSession, metadata.getTableHandle(testSession, tableName).get()).getMetadata().getComment()) .isEmpty(); @@ -90,7 +91,7 @@ public void testCommentView() public void testCommentViewOnTable() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(setComment(VIEW, asQualifiedName(tableName), Optional.of("new comment")))) .hasErrorCode(TABLE_NOT_FOUND) @@ -113,7 +114,7 @@ public void testCommentTableColumn() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); QualifiedName columnName = qualifiedColumnName("existing_table", "test"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); getFutureValue(setComment(COLUMN, columnName, Optional.of("new test column comment"))); TableHandle tableHandle = metadata.getTableHandle(testSession, tableName).get(); diff --git a/core/trino-main/src/test/java/io/trino/execution/TestCreateTableTask.java b/core/trino-main/src/test/java/io/trino/execution/TestCreateTableTask.java index 2f1e41e29929..ef413f26905e 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestCreateTableTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestCreateTableTask.java @@ -33,6 +33,7 @@ import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ConnectorCapabilities; import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.SaveMode; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.security.AccessDeniedException; import io.trino.spi.type.TimestampType; @@ -81,6 +82,7 @@ import static io.trino.sql.tree.LikeClause.PropertiesOption.INCLUDING; import static io.trino.sql.tree.SaveMode.FAIL; import static io.trino.sql.tree.SaveMode.IGNORE; +import static io.trino.sql.tree.SaveMode.REPLACE; import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.SELECT_COLUMN; import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.SHOW_CREATE_TABLE; import static io.trino.testing.TestingAccessControlManager.privilege; @@ -194,6 +196,22 @@ public void testCreateTableNotExistsFalse() assertEquals(metadata.getCreateTableCallCount(), 1); } + @Test + public void testReplaceTable() + { + CreateTable statement = new CreateTable(QualifiedName.of("test_table"), + ImmutableList.of(new ColumnDefinition(QualifiedName.of("a"), toSqlType(BIGINT), true, emptyList(), Optional.empty())), + REPLACE, + ImmutableList.of(), + Optional.empty()); + + CreateTableTask createTableTask = new CreateTableTask(plannerContext, new AllowAllAccessControl(), columnPropertyManager, tablePropertyManager); + getFutureValue(createTableTask.internalExecute(statement, testSession, emptyList(), output -> {})); + assertEquals(metadata.getCreateTableCallCount(), 1); + assertThat(metadata.getReceivedTableMetadata().get(0).getColumns()) + .isEqualTo(ImmutableList.of(new ColumnMetadata("a", BIGINT))); + } + @Test public void testCreateTableWithMaterializedViewPropertyFails() { @@ -422,10 +440,13 @@ private class MockMetadata private Set connectorCapabilities = ImmutableSet.of(); @Override - public void createTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, boolean ignoreExisting) + public void createTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, SaveMode saveMode) { + if (saveMode == SaveMode.REPLACE) { + tables.removeIf(table -> table.getTable().equals(tableMetadata.getTable())); + } tables.add(tableMetadata); - if (!ignoreExisting) { + if (saveMode == SaveMode.FAIL) { throw new TrinoException(ALREADY_EXISTS, "Table already exists"); } } diff --git a/core/trino-main/src/test/java/io/trino/execution/TestCreateViewTask.java b/core/trino-main/src/test/java/io/trino/execution/TestCreateViewTask.java index 20c5d9370e22..77c2c8240384 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestCreateViewTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestCreateViewTask.java @@ -37,6 +37,7 @@ import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.spi.StandardErrorCode.TABLE_ALREADY_EXISTS; +import static io.trino.spi.connector.SaveMode.FAIL; import static io.trino.sql.QueryUtil.selectList; import static io.trino.sql.QueryUtil.simpleQuery; import static io.trino.sql.QueryUtil.table; @@ -68,7 +69,7 @@ public void setUp() new StatementRewrite(ImmutableSet.of()), plannerContext.getTracer()); QualifiedObjectName tableName = qualifiedObjectName("mock_table"); - metadata.createTable(testSession, CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, CATALOG_NAME, someTable(tableName), FAIL); } @Test @@ -104,7 +105,7 @@ public void testReplaceViewOnViewIfExists() public void testCreateViewOnTableIfExists() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, CATALOG_NAME, someTable(tableName), FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeCreateView(asQualifiedName(tableName), false))) .hasErrorCode(TABLE_ALREADY_EXISTS) @@ -115,7 +116,7 @@ public void testCreateViewOnTableIfExists() public void testReplaceViewOnTableIfExists() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, CATALOG_NAME, someTable(tableName), FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeCreateView(asQualifiedName(tableName), true))) .hasErrorCode(TABLE_ALREADY_EXISTS) diff --git a/core/trino-main/src/test/java/io/trino/execution/TestDropColumnTask.java b/core/trino-main/src/test/java/io/trino/execution/TestDropColumnTask.java index 43fb56195f94..a443dc529e80 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestDropColumnTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestDropColumnTask.java @@ -33,6 +33,7 @@ import static io.trino.spi.StandardErrorCode.COLUMN_NOT_FOUND; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; +import static io.trino.spi.connector.SaveMode.FAIL; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.testing.TestingHandles.TEST_CATALOG_NAME; import static io.trino.testing.assertions.TrinoExceptionAssert.assertTrinoExceptionThrownBy; @@ -45,7 +46,7 @@ public class TestDropColumnTask public void testDropColumn() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, simpleTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, simpleTable(tableName), FAIL); TableHandle table = metadata.getTableHandle(testSession, tableName).get(); assertThat(metadata.getTableMetadata(testSession, table).getColumns()) .containsExactly(new ColumnMetadata("a", BIGINT), new ColumnMetadata("b", BIGINT)); @@ -59,7 +60,7 @@ public void testDropColumn() public void testDropOnlyColumn() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); TableHandle table = metadata.getTableHandle(testSession, tableName).get(); assertThat(metadata.getTableMetadata(testSession, table).getColumns()) .containsExactly(new ColumnMetadata("test", BIGINT)); @@ -92,7 +93,7 @@ public void testDropColumnNotExistingTableIfExists() public void testDropMissingColumn() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, simpleTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, simpleTable(tableName), FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeDropColumn(asQualifiedName(tableName), QualifiedName.of("missing_column"), false, false))) .hasErrorCode(COLUMN_NOT_FOUND) @@ -103,7 +104,7 @@ public void testDropMissingColumn() public void testDropColumnIfExists() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, simpleTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, simpleTable(tableName), FAIL); TableHandle table = metadata.getTableHandle(testSession, tableName).get(); getFutureValue(executeDropColumn(asQualifiedName(tableName), QualifiedName.of("c"), false, true)); @@ -115,7 +116,7 @@ public void testDropColumnIfExists() public void testUnsupportedDropDuplicatedField() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, rowTable(tableName, new Field(Optional.of("a"), BIGINT), new Field(Optional.of("a"), BIGINT)), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, rowTable(tableName, new Field(Optional.of("a"), BIGINT), new Field(Optional.of("a"), BIGINT)), FAIL); TableHandle table = metadata.getTableHandle(testSession, tableName).get(); assertThat(metadata.getTableMetadata(testSession, table).getColumns()) .isEqualTo(ImmutableList.of(new ColumnMetadata("col", RowType.rowType( @@ -130,7 +131,7 @@ public void testUnsupportedDropDuplicatedField() public void testUnsupportedDropOnlyField() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, rowTable(tableName, new Field(Optional.of("a"), BIGINT)), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, rowTable(tableName, new Field(Optional.of("a"), BIGINT)), FAIL); TableHandle table = metadata.getTableHandle(testSession, tableName).get(); assertThat(metadata.getTableMetadata(testSession, table).getColumns()) .containsExactly(new ColumnMetadata("col", RowType.rowType(new Field(Optional.of("a"), BIGINT)))); diff --git a/core/trino-main/src/test/java/io/trino/execution/TestDropMaterializedViewTask.java b/core/trino-main/src/test/java/io/trino/execution/TestDropMaterializedViewTask.java index 1f2ee9981cc1..bfbd422399b9 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestDropMaterializedViewTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestDropMaterializedViewTask.java @@ -24,6 +24,7 @@ import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR; +import static io.trino.spi.connector.SaveMode.FAIL; import static io.trino.testing.TestingHandles.TEST_CATALOG_NAME; import static io.trino.testing.assertions.TrinoExceptionAssert.assertTrinoExceptionThrownBy; import static org.assertj.core.api.Assertions.assertThat; @@ -65,7 +66,7 @@ public void testDropNotExistingMaterializedViewIfExists() public void testDropMaterializedViewOnTable() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeDropMaterializedView(asQualifiedName(tableName), true))) .hasErrorCode(GENERIC_USER_ERROR) @@ -76,7 +77,7 @@ public void testDropMaterializedViewOnTable() public void testDropMaterializedViewOnTableIfExists() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeDropMaterializedView(asQualifiedName(tableName), true))) .hasErrorCode(GENERIC_USER_ERROR) diff --git a/core/trino-main/src/test/java/io/trino/execution/TestDropSchemaTask.java b/core/trino-main/src/test/java/io/trino/execution/TestDropSchemaTask.java index 76cd27404e49..57118ccb4631 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestDropSchemaTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestDropSchemaTask.java @@ -28,6 +28,7 @@ import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.execution.warnings.WarningCollector.NOOP; +import static io.trino.spi.connector.SaveMode.FAIL; import static io.trino.testing.TestingHandles.TEST_CATALOG_HANDLE; import static io.trino.testing.TestingHandles.TEST_CATALOG_NAME; import static java.util.Collections.emptyList; @@ -69,7 +70,7 @@ public void testDropNonEmptySchemaRestrict() DropSchema dropSchema = new DropSchema(QualifiedName.of(CATALOG_SCHEMA_NAME.getSchemaName()), false, false); QualifiedObjectName tableName = new QualifiedObjectName(CATALOG_SCHEMA_NAME.getCatalogName(), CATALOG_SCHEMA_NAME.getSchemaName(), "test_table"); - metadata.createTable(testSession, CATALOG_SCHEMA_NAME.getCatalogName(), someTable(tableName), false); + metadata.createTable(testSession, CATALOG_SCHEMA_NAME.getCatalogName(), someTable(tableName), FAIL); assertThatExceptionOfType(TrinoException.class) .isThrownBy(() -> getFutureValue(dropSchemaTask.execute(dropSchema, queryStateMachine, emptyList(), NOOP))) @@ -115,7 +116,7 @@ public void testDropNonEmptySchemaCascade() DropSchema dropSchema = new DropSchema(QualifiedName.of(CATALOG_SCHEMA_NAME.getSchemaName()), false, true); QualifiedObjectName tableName = new QualifiedObjectName(CATALOG_SCHEMA_NAME.getCatalogName(), CATALOG_SCHEMA_NAME.getSchemaName(), "test_table"); - metadata.createTable(testSession, CATALOG_SCHEMA_NAME.getCatalogName(), someTable(tableName), false); + metadata.createTable(testSession, CATALOG_SCHEMA_NAME.getCatalogName(), someTable(tableName), FAIL); getFutureValue(dropSchemaTask.execute(dropSchema, queryStateMachine, emptyList(), NOOP)); assertFalse(metadata.schemaExists(testSession, CATALOG_SCHEMA_NAME)); diff --git a/core/trino-main/src/test/java/io/trino/execution/TestDropTableTask.java b/core/trino-main/src/test/java/io/trino/execution/TestDropTableTask.java index 0e1f1eb09fcc..79fcb4237689 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestDropTableTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestDropTableTask.java @@ -25,6 +25,7 @@ import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR; import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; +import static io.trino.spi.connector.SaveMode.FAIL; import static io.trino.testing.TestingHandles.TEST_CATALOG_NAME; import static io.trino.testing.assertions.TrinoExceptionAssert.assertTrinoExceptionThrownBy; import static org.assertj.core.api.Assertions.assertThat; @@ -36,7 +37,7 @@ public class TestDropTableTask public void testDropExistingTable() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); assertThat(metadata.getTableHandle(testSession, tableName)).isPresent(); getFutureValue(executeDropTable(asQualifiedName(tableName), false)); diff --git a/core/trino-main/src/test/java/io/trino/execution/TestDropViewTask.java b/core/trino-main/src/test/java/io/trino/execution/TestDropViewTask.java index 168612a1034b..88af3f3802ee 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestDropViewTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestDropViewTask.java @@ -24,6 +24,7 @@ import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR; +import static io.trino.spi.connector.SaveMode.FAIL; import static io.trino.testing.TestingHandles.TEST_CATALOG_NAME; import static io.trino.testing.assertions.TrinoExceptionAssert.assertTrinoExceptionThrownBy; import static org.assertj.core.api.Assertions.assertThat; @@ -65,7 +66,7 @@ public void testDropNotExistingViewIfExists() public void testDropViewOnTable() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeDropView(asQualifiedName(tableName), false))) .hasErrorCode(GENERIC_USER_ERROR) @@ -76,7 +77,7 @@ public void testDropViewOnTable() public void testDropViewOnTableIfExists() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeDropView(asQualifiedName(tableName), true))) .hasErrorCode(GENERIC_USER_ERROR) diff --git a/core/trino-main/src/test/java/io/trino/execution/TestRenameColumnTask.java b/core/trino-main/src/test/java/io/trino/execution/TestRenameColumnTask.java index 3e8544a39a4e..7441dc996980 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestRenameColumnTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestRenameColumnTask.java @@ -35,6 +35,7 @@ import static io.trino.spi.StandardErrorCode.COLUMN_ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.COLUMN_NOT_FOUND; import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; +import static io.trino.spi.connector.SaveMode.FAIL; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.RowType.rowType; import static io.trino.sql.QueryUtil.identifier; @@ -49,7 +50,7 @@ public class TestRenameColumnTask public void testRenameColumn() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, simpleTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, simpleTable(tableName), FAIL); TableHandle table = metadata.getTableHandle(testSession, tableName).get(); assertThat(metadata.getTableMetadata(testSession, table).getColumns()) .containsExactly(new ColumnMetadata("a", BIGINT), new ColumnMetadata("b", BIGINT)); @@ -82,7 +83,7 @@ public void testRenameColumnNotExistingTableIfExists() public void testRenameMissingColumn() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, simpleTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, simpleTable(tableName), FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeRenameColumn(asQualifiedName(tableName), QualifiedName.of("missing_column"), identifier("test"), false, false))) .hasErrorCode(COLUMN_NOT_FOUND) @@ -93,7 +94,7 @@ public void testRenameMissingColumn() public void testRenameColumnIfExists() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, simpleTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, simpleTable(tableName), FAIL); TableHandle table = metadata.getTableHandle(testSession, tableName).get(); getFutureValue(executeRenameColumn(asQualifiedName(tableName), QualifiedName.of("missing_column"), identifier("test"), false, true)); @@ -146,7 +147,7 @@ public void testRenameFieldNotExistingTableIfExists() public void testRenameMissingField() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, simpleTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, simpleTable(tableName), FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeRenameColumn(asQualifiedName(tableName), QualifiedName.of("missing_column"), identifier("x"), false, false))) .hasErrorCode(COLUMN_NOT_FOUND) @@ -157,7 +158,7 @@ public void testRenameMissingField() public void testRenameFieldIfExists() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, simpleTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, simpleTable(tableName), FAIL); TableHandle table = metadata.getTableHandle(testSession, tableName).get(); getFutureValue(executeRenameColumn(asQualifiedName(tableName), QualifiedName.of("c"), identifier("x"), false, true)); @@ -169,7 +170,7 @@ public void testRenameFieldIfExists() public void testUnsupportedRenameDuplicatedField() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, rowTable(tableName, new RowType.Field(Optional.of("a"), BIGINT), new RowType.Field(Optional.of("a"), BIGINT)), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, rowTable(tableName, new RowType.Field(Optional.of("a"), BIGINT), new RowType.Field(Optional.of("a"), BIGINT)), FAIL); TableHandle table = metadata.getTableHandle(testSession, tableName).get(); assertThat(metadata.getTableMetadata(testSession, table).getColumns()) .isEqualTo(ImmutableList.of(new ColumnMetadata("col", RowType.rowType( @@ -184,7 +185,7 @@ public void testUnsupportedRenameDuplicatedField() public void testUnsupportedRenameToExistingField() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, rowTable(tableName, new RowType.Field(Optional.of("a"), BIGINT), new RowType.Field(Optional.of("b"), BIGINT)), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, rowTable(tableName, new RowType.Field(Optional.of("a"), BIGINT), new RowType.Field(Optional.of("b"), BIGINT)), FAIL); TableHandle table = metadata.getTableHandle(testSession, tableName).get(); assertThat(metadata.getTableMetadata(testSession, table).getColumns()) .isEqualTo(ImmutableList.of(new ColumnMetadata("col", RowType.rowType( diff --git a/core/trino-main/src/test/java/io/trino/execution/TestRenameMaterializedViewTask.java b/core/trino-main/src/test/java/io/trino/execution/TestRenameMaterializedViewTask.java index e409421539ed..6189b34633bc 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestRenameMaterializedViewTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestRenameMaterializedViewTask.java @@ -25,6 +25,7 @@ import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.spi.StandardErrorCode.TABLE_ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; +import static io.trino.spi.connector.SaveMode.FAIL; import static io.trino.testing.TestingHandles.TEST_CATALOG_NAME; import static io.trino.testing.assertions.TrinoExceptionAssert.assertTrinoExceptionThrownBy; import static org.assertj.core.api.Assertions.assertThat; @@ -67,7 +68,7 @@ public void testRenameNotExistingMaterializedViewIfExists() public void testRenameMaterializedViewOnTable() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeRenameMaterializedView(asQualifiedName(tableName), qualifiedName("existing_table_new")))) .hasErrorCode(TABLE_NOT_FOUND) @@ -78,7 +79,7 @@ public void testRenameMaterializedViewOnTable() public void testRenameMaterializedViewOnTableIfExists() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeRenameMaterializedView(asQualifiedName(tableName), qualifiedName("existing_table_new"), true))) .hasErrorCode(TABLE_NOT_FOUND) @@ -91,7 +92,7 @@ public void testRenameMaterializedViewTargetTableExists() QualifiedObjectName materializedViewName = qualifiedObjectName("existing_materialized_view"); metadata.createMaterializedView(testSession, materializedViewName, someMaterializedView(), false, false); QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeRenameMaterializedView(asQualifiedName(materializedViewName), asQualifiedName(tableName)))) .hasErrorCode(TABLE_ALREADY_EXISTS) diff --git a/core/trino-main/src/test/java/io/trino/execution/TestRenameTableTask.java b/core/trino-main/src/test/java/io/trino/execution/TestRenameTableTask.java index ff0ffb35e7ab..1ab35e7a1239 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestRenameTableTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestRenameTableTask.java @@ -25,6 +25,7 @@ import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR; import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; +import static io.trino.spi.connector.SaveMode.FAIL; import static io.trino.testing.TestingHandles.TEST_CATALOG_NAME; import static io.trino.testing.assertions.TrinoExceptionAssert.assertTrinoExceptionThrownBy; import static org.assertj.core.api.Assertions.assertThat; @@ -37,7 +38,7 @@ public void testRenameExistingTable() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); QualifiedObjectName newTableName = qualifiedObjectName("existing_view_new"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); getFutureValue(executeRenameTable(asQualifiedName(tableName), asQualifiedName(newTableName), false)); assertThat(metadata.getTableHandle(testSession, tableName)).isEmpty(); @@ -111,7 +112,7 @@ public void testRenameTableOnMaterializedViewIfExists() public void testRenameTableTargetViewExists() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); QualifiedName viewName = qualifiedName("existing_view"); metadata.createView(testSession, QualifiedObjectName.valueOf(viewName.toString()), someView(), false); @@ -124,7 +125,7 @@ public void testRenameTableTargetViewExists() public void testRenameTableTargetMaterializedViewExists() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); QualifiedObjectName materializedViewName = qualifiedObjectName("existing_materialized_view"); metadata.createMaterializedView(testSession, materializedViewName, someMaterializedView(), false, false); diff --git a/core/trino-main/src/test/java/io/trino/execution/TestRenameViewTask.java b/core/trino-main/src/test/java/io/trino/execution/TestRenameViewTask.java index fdfb6f5d87dd..2bee2d32cbd0 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestRenameViewTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestRenameViewTask.java @@ -26,6 +26,7 @@ import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR; import static io.trino.spi.StandardErrorCode.TABLE_ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; +import static io.trino.spi.connector.SaveMode.FAIL; import static io.trino.testing.TestingHandles.TEST_CATALOG_NAME; import static io.trino.testing.assertions.TrinoExceptionAssert.assertTrinoExceptionThrownBy; import static org.assertj.core.api.Assertions.assertThat; @@ -59,7 +60,7 @@ public void testRenameNotExistingView() public void testRenameViewOnTable() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeRenameView(asQualifiedName(tableName), qualifiedName("existing_table_new")))) .hasErrorCode(TABLE_NOT_FOUND) @@ -83,7 +84,7 @@ public void testRenameViewTargetTableExists() QualifiedName viewName = qualifiedName("existing_view"); metadata.createView(testSession, QualifiedObjectName.valueOf(viewName.toString()), someView(), false); QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeRenameView(viewName, asQualifiedName(tableName)))) .hasErrorCode(TABLE_ALREADY_EXISTS) diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSetColumnTypeTask.java b/core/trino-main/src/test/java/io/trino/execution/TestSetColumnTypeTask.java index ca38de7f2fc1..8de522663eeb 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestSetColumnTypeTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestSetColumnTypeTask.java @@ -35,6 +35,7 @@ import static io.trino.spi.StandardErrorCode.AMBIGUOUS_NAME; import static io.trino.spi.StandardErrorCode.COLUMN_NOT_FOUND; import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; +import static io.trino.spi.connector.SaveMode.FAIL; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.RowType.rowType; @@ -50,7 +51,7 @@ public class TestSetColumnTypeTask public void testSetDataType() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); TableHandle table = metadata.getTableHandle(testSession, tableName).get(); assertThat(metadata.getTableMetadata(testSession, table).getColumns()) .isEqualTo(ImmutableList.of(new ColumnMetadata("test", BIGINT))); @@ -90,7 +91,7 @@ public void testSetDataTypeNotExistingColumn() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); QualifiedName columnName = QualifiedName.of("not_existing_column"); - metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeSetColumnType(asQualifiedName(tableName), columnName, toSqlType(INTEGER), false))) .hasErrorCode(COLUMN_NOT_FOUND) @@ -123,7 +124,7 @@ public void testSetDataTypeOnMaterializedView() public void testSetFieldDataTypeNotExistingColumn() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, rowTable(tableName, new Field(Optional.of("a"), BIGINT)), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, rowTable(tableName, new Field(Optional.of("a"), BIGINT)), FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeSetColumnType(asQualifiedName(tableName), QualifiedName.of("test", "a"), toSqlType(INTEGER), false))) .hasErrorCode(COLUMN_NOT_FOUND) @@ -134,7 +135,7 @@ public void testSetFieldDataTypeNotExistingColumn() public void testSetFieldDataTypeNotExistingField() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, rowTable(tableName, new Field(Optional.of("a"), BIGINT)), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, rowTable(tableName, new Field(Optional.of("a"), BIGINT)), FAIL); assertTrinoExceptionThrownBy(() -> getFutureValue(executeSetColumnType(asQualifiedName(tableName), QualifiedName.of("col", "b"), toSqlType(INTEGER), false))) .hasErrorCode(COLUMN_NOT_FOUND) @@ -145,7 +146,7 @@ public void testSetFieldDataTypeNotExistingField() public void testUnsupportedSetDataTypeDuplicatedField() { QualifiedObjectName tableName = qualifiedObjectName("existing_table"); - metadata.createTable(testSession, TEST_CATALOG_NAME, rowTable(tableName, new RowType.Field(Optional.of("a"), BIGINT), new RowType.Field(Optional.of("a"), BIGINT)), false); + metadata.createTable(testSession, TEST_CATALOG_NAME, rowTable(tableName, new RowType.Field(Optional.of("a"), BIGINT), new RowType.Field(Optional.of("a"), BIGINT)), FAIL); TableHandle table = metadata.getTableHandle(testSession, tableName).get(); assertThat(metadata.getTableMetadata(testSession, table).getColumns()) .isEqualTo(ImmutableList.of(new ColumnMetadata("col", RowType.rowType( diff --git a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java index 8081dee9aed0..00d26fc2b227 100644 --- a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java +++ b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java @@ -46,6 +46,7 @@ import io.trino.spi.connector.RowChangeParadigm; import io.trino.spi.connector.SampleApplicationResult; import io.trino.spi.connector.SampleType; +import io.trino.spi.connector.SaveMode; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SortItem; import io.trino.spi.connector.SystemTable; @@ -279,7 +280,7 @@ public void setSchemaAuthorization(Session session, CatalogSchemaName source, Tr } @Override - public void createTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, boolean ignoreExisting) + public void createTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, SaveMode saveMode) { throw new UnsupportedOperationException(); } @@ -399,7 +400,7 @@ public Optional getSupportedType(Session session, CatalogHandle catalogHan } @Override - public OutputTableHandle beginCreateTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, Optional layout) + public OutputTableHandle beginCreateTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, Optional layout, boolean replace) { throw new UnsupportedOperationException(); } diff --git a/core/trino-main/src/test/java/io/trino/operator/TestTableWriterOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestTableWriterOperator.java index 2bf8b2ddb42c..fa112a6bf0ca 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestTableWriterOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestTableWriterOperator.java @@ -302,7 +302,8 @@ private Operator createTableWriterOperator( schemaTableName, false, OptionalInt.empty(), - WriterScalingOptions.DISABLED), + WriterScalingOptions.DISABLED, + false), ImmutableList.of(0), session, statisticsAggregation, diff --git a/core/trino-main/src/test/java/io/trino/sql/analyzer/TestAnalyzer.java b/core/trino-main/src/test/java/io/trino/sql/analyzer/TestAnalyzer.java index 2d4329ffcca3..0ad12eec4f3e 100644 --- a/core/trino-main/src/test/java/io/trino/sql/analyzer/TestAnalyzer.java +++ b/core/trino-main/src/test/java/io/trino/sql/analyzer/TestAnalyzer.java @@ -178,6 +178,7 @@ import static io.trino.spi.StandardErrorCode.TYPE_MISMATCH; import static io.trino.spi.StandardErrorCode.VIEW_IS_RECURSIVE; import static io.trino.spi.StandardErrorCode.VIEW_IS_STALE; +import static io.trino.spi.connector.SaveMode.FAIL; import static io.trino.spi.session.PropertyMetadata.integerProperty; import static io.trino.spi.session.PropertyMetadata.stringProperty; import static io.trino.spi.type.BigintType.BIGINT; @@ -6754,14 +6755,14 @@ public void setup() new ColumnMetadata("b", BIGINT), new ColumnMetadata("c", BIGINT), new ColumnMetadata("d", BIGINT))), - false)); + FAIL)); SchemaTableName table2 = new SchemaTableName("s1", "t2"); inSetupTransaction(session -> metadata.createTable(session, TPCH_CATALOG, new ConnectorTableMetadata(table2, ImmutableList.of( new ColumnMetadata("a", BIGINT), new ColumnMetadata("b", BIGINT))), - false)); + FAIL)); SchemaTableName table3 = new SchemaTableName("s1", "t3"); inSetupTransaction(session -> metadata.createTable(session, TPCH_CATALOG, @@ -6769,7 +6770,7 @@ public void setup() new ColumnMetadata("a", BIGINT), new ColumnMetadata("b", BIGINT), ColumnMetadata.builder().setName("x").setType(BIGINT).setHidden(true).build())), - false)); + FAIL)); // table with a hidden column SchemaTableName table5 = new SchemaTableName("s1", "t5"); @@ -6777,7 +6778,7 @@ public void setup() new ConnectorTableMetadata(table5, ImmutableList.of( new ColumnMetadata("a", BIGINT), ColumnMetadata.builder().setName("b").setType(BIGINT).setHidden(true).build())), - false)); + FAIL)); // table with a varchar column SchemaTableName table6 = new SchemaTableName("s1", "t6"); @@ -6787,7 +6788,7 @@ public void setup() new ColumnMetadata("b", VARCHAR), new ColumnMetadata("c", BIGINT), new ColumnMetadata("d", BIGINT))), - false)); + FAIL)); // table with bigint, double, array of bigints and array of doubles column SchemaTableName table7 = new SchemaTableName("s1", "t7"); @@ -6797,7 +6798,7 @@ public void setup() new ColumnMetadata("b", DOUBLE), new ColumnMetadata("c", new ArrayType(BIGINT)), new ColumnMetadata("d", new ArrayType(DOUBLE)))), - false)); + FAIL)); // materialized view referencing table in same schema MaterializedViewDefinition materializedViewData1 = new MaterializedViewDefinition( @@ -6873,7 +6874,7 @@ public void setup() new ColumnMetadata("nested_bounded_varchar_column", anonymousRow(createVarcharType(3))), new ColumnMetadata("row_column", anonymousRow(TINYINT, createUnboundedVarcharType())), new ColumnMetadata("date_column", DATE))), - false)); + FAIL)); // for identifier chain resolving tests queryRunner.createCatalog(CATALOG_FOR_IDENTIFIER_CHAIN_TESTS, new StaticConnectorFactory("chain", new TestingConnector(new TestingMetadata())), ImmutableMap.of()); @@ -6886,39 +6887,39 @@ public void setup() inSetupTransaction(session -> metadata.createTable(session, CATALOG_FOR_IDENTIFIER_CHAIN_TESTS, new ConnectorTableMetadata(b, ImmutableList.of( new ColumnMetadata("x", VARCHAR))), - false)); + FAIL)); SchemaTableName t1 = new SchemaTableName("a", "t1"); inSetupTransaction(session -> metadata.createTable(session, CATALOG_FOR_IDENTIFIER_CHAIN_TESTS, new ConnectorTableMetadata(t1, ImmutableList.of( new ColumnMetadata("b", rowType))), - false)); + FAIL)); SchemaTableName t2 = new SchemaTableName("a", "t2"); inSetupTransaction(session -> metadata.createTable(session, CATALOG_FOR_IDENTIFIER_CHAIN_TESTS, new ConnectorTableMetadata(t2, ImmutableList.of( new ColumnMetadata("a", rowType))), - false)); + FAIL)); SchemaTableName t3 = new SchemaTableName("a", "t3"); inSetupTransaction(session -> metadata.createTable(session, CATALOG_FOR_IDENTIFIER_CHAIN_TESTS, new ConnectorTableMetadata(t3, ImmutableList.of( new ColumnMetadata("b", nestedRowType), new ColumnMetadata("c", BIGINT))), - false)); + FAIL)); SchemaTableName t4 = new SchemaTableName("a", "t4"); inSetupTransaction(session -> metadata.createTable(session, CATALOG_FOR_IDENTIFIER_CHAIN_TESTS, new ConnectorTableMetadata(t4, ImmutableList.of( new ColumnMetadata("b", doubleNestedRowType), new ColumnMetadata("c", BIGINT))), - false)); + FAIL)); SchemaTableName t5 = new SchemaTableName("a", "t5"); inSetupTransaction(session -> metadata.createTable(session, CATALOG_FOR_IDENTIFIER_CHAIN_TESTS, new ConnectorTableMetadata(t5, ImmutableList.of( new ColumnMetadata("b", singleFieldRowType))), - false)); + FAIL)); QualifiedObjectName tableViewAndMaterializedView = new QualifiedObjectName(TPCH_CATALOG, "s1", "table_view_and_materialized_view"); inSetupTransaction(session -> metadata.createMaterializedView( @@ -6956,7 +6957,7 @@ public void setup() new ConnectorTableMetadata( tableViewAndMaterializedView.asSchemaTableName(), ImmutableList.of(new ColumnMetadata("a", BIGINT))), - false)); + FAIL)); QualifiedObjectName tableAndView = new QualifiedObjectName(TPCH_CATALOG, "s1", "table_and_view"); inSetupTransaction(session -> metadata.createView( @@ -6970,7 +6971,7 @@ public void setup() new ConnectorTableMetadata( tableAndView.asSchemaTableName(), ImmutableList.of(new ColumnMetadata("a", BIGINT))), - false)); + FAIL)); QualifiedObjectName freshMaterializedView = new QualifiedObjectName(TPCH_CATALOG, "s1", "fresh_materialized_view"); inSetupTransaction(session -> metadata.createMaterializedView( diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/TestMaterializedViews.java b/core/trino-main/src/test/java/io/trino/sql/planner/TestMaterializedViews.java index cc6c3eb110a4..253be3ac8a24 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/TestMaterializedViews.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/TestMaterializedViews.java @@ -46,6 +46,7 @@ import java.util.Map; import java.util.Optional; +import static io.trino.spi.connector.SaveMode.FAIL; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.TinyintType.TINYINT; import static io.trino.spi.type.VarcharType.VARCHAR; @@ -91,7 +92,7 @@ protected LocalQueryRunner createLocalQueryRunner() ImmutableList.of( new ColumnMetadata("a", BIGINT), new ColumnMetadata("b", BIGINT))), - false); + FAIL); return null; }); @@ -105,7 +106,7 @@ protected LocalQueryRunner createLocalQueryRunner() ImmutableList.of( new ColumnMetadata("a", BIGINT), new ColumnMetadata("b", BIGINT))), - false); + FAIL); return null; }); @@ -119,7 +120,7 @@ protected LocalQueryRunner createLocalQueryRunner() ImmutableList.of( new ColumnMetadata("a", TINYINT), new ColumnMetadata("b", VARCHAR))), - false); + FAIL); return null; }); diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestRemoveEmptyMergeWriterRuleSet.java b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestRemoveEmptyMergeWriterRuleSet.java index 639732dff4ab..e264e86f0542 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestRemoveEmptyMergeWriterRuleSet.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestRemoveEmptyMergeWriterRuleSet.java @@ -90,7 +90,7 @@ private void testRemoveEmptyMergeRewrite(Rule rule, boolean pla List.of(rowCount)); return p.tableFinish( planWithExchange ? withExchange(p, merge, rowCount) : merge, - p.createTarget(catalogHandle, schemaTableName, true, WriterScalingOptions.ENABLED), + p.createTarget(catalogHandle, schemaTableName, true, WriterScalingOptions.ENABLED, false), rowCount); }) .matches(values("A")); diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java index 200757236374..c1ccf9f2ff1d 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java @@ -719,7 +719,7 @@ public TableFinishNode tableWithExchangeCreate(WriterTarget target, PlanNode sou rowCountSymbol); } - public CreateTarget createTarget(CatalogHandle catalogHandle, SchemaTableName schemaTableName, boolean multipleWritersPerPartitionSupported, OptionalInt maxWriterTasks, WriterScalingOptions writerScalingOptions) + public CreateTarget createTarget(CatalogHandle catalogHandle, SchemaTableName schemaTableName, boolean multipleWritersPerPartitionSupported, OptionalInt maxWriterTasks, WriterScalingOptions writerScalingOptions, boolean replace) { OutputTableHandle tableHandle = new OutputTableHandle( catalogHandle, @@ -731,12 +731,13 @@ public CreateTarget createTarget(CatalogHandle catalogHandle, SchemaTableName sc schemaTableName, multipleWritersPerPartitionSupported, maxWriterTasks, - writerScalingOptions); + writerScalingOptions, + replace); } - public CreateTarget createTarget(CatalogHandle catalogHandle, SchemaTableName schemaTableName, boolean multipleWritersPerPartitionSupported, WriterScalingOptions writerScalingOptions) + public CreateTarget createTarget(CatalogHandle catalogHandle, SchemaTableName schemaTableName, boolean multipleWritersPerPartitionSupported, WriterScalingOptions writerScalingOptions, boolean replace) { - return createTarget(catalogHandle, schemaTableName, multipleWritersPerPartitionSupported, OptionalInt.empty(), writerScalingOptions); + return createTarget(catalogHandle, schemaTableName, multipleWritersPerPartitionSupported, OptionalInt.empty(), writerScalingOptions, replace); } public MergeWriterNode merge(SchemaTableName schemaTableName, PlanNode mergeSource, Symbol mergeRow, Symbol rowId, List outputs) diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/sanity/TestValidateScaledWritersUsage.java b/core/trino-main/src/test/java/io/trino/sql/planner/sanity/TestValidateScaledWritersUsage.java index 5e7cc183a08c..5063fe6b30f5 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/sanity/TestValidateScaledWritersUsage.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/sanity/TestValidateScaledWritersUsage.java @@ -119,7 +119,7 @@ public void testScaledWritersUsedAndTargetSupportsIt(PartitioningHandle scaledWr PlanNode root = planBuilder.output( outputBuilder -> outputBuilder .source(planBuilder.tableWithExchangeCreate( - planBuilder.createTarget(catalog, schemaTableName, true, WriterScalingOptions.ENABLED), + planBuilder.createTarget(catalog, schemaTableName, true, WriterScalingOptions.ENABLED, false), tableWriterSource, symbol))); validatePlan(root); @@ -141,7 +141,7 @@ public void testScaledWritersUsedAndTargetDoesNotSupportScalingPerTask(Partition PlanNode root = planBuilder.output( outputBuilder -> outputBuilder .source(planBuilder.tableWithExchangeCreate( - planBuilder.createTarget(catalog, schemaTableName, true, new WriterScalingOptions(true, false)), + planBuilder.createTarget(catalog, schemaTableName, true, new WriterScalingOptions(true, false), false), tableWriterSource, symbol))); assertThatThrownBy(() -> validatePlan(root)) @@ -165,7 +165,7 @@ public void testScaledWritersUsedAndTargetDoesNotSupportScalingAcrossTasks(Parti PlanNode root = planBuilder.output( outputBuilder -> outputBuilder .source(planBuilder.tableWithExchangeCreate( - planBuilder.createTarget(catalog, schemaTableName, true, new WriterScalingOptions(false, true)), + planBuilder.createTarget(catalog, schemaTableName, true, new WriterScalingOptions(false, true), false), tableWriterSource, symbol))); assertThatThrownBy(() -> validatePlan(root)) @@ -188,7 +188,7 @@ public void testScaledWriterUsedAndTargetDoesNotSupportMultipleWritersPerPartiti PlanNode root = planBuilder.output( outputBuilder -> outputBuilder .source(planBuilder.tableWithExchangeCreate( - planBuilder.createTarget(catalog, schemaTableName, false, WriterScalingOptions.ENABLED), + planBuilder.createTarget(catalog, schemaTableName, false, WriterScalingOptions.ENABLED, false), tableWriterSource, symbol))); @@ -223,7 +223,7 @@ public void testScaledWriterWithMultipleSourceExchangesAndTargetDoesNotSupportMu PlanNode root = planBuilder.output( outputBuilder -> outputBuilder .source(planBuilder.tableWithExchangeCreate( - planBuilder.createTarget(catalog, schemaTableName, false, WriterScalingOptions.ENABLED), + planBuilder.createTarget(catalog, schemaTableName, false, WriterScalingOptions.ENABLED, false), tableWriterSource, symbol))); diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java index bbb6202aba72..405e36037dc5 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java @@ -62,6 +62,8 @@ import static io.trino.spi.StandardErrorCode.NOT_FOUND; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.StandardErrorCode.UNSUPPORTED_TABLE_TYPE; +import static io.trino.spi.connector.SaveMode.IGNORE; +import static io.trino.spi.connector.SaveMode.REPLACE; import static io.trino.spi.expression.Constant.FALSE; import static io.trino.spi.expression.StandardFunctions.AND_FUNCTION_NAME; import static java.util.Collections.emptyList; @@ -481,12 +483,30 @@ default void setSchemaAuthorization(ConnectorSession session, String schemaName, * Creates a table using the specified table metadata. * * @throws TrinoException with {@code ALREADY_EXISTS} if the table already exists and {@param ignoreExisting} is not set + * @deprecated use {@link #createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, SaveMode saveMode)} */ + @Deprecated default void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting) { throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables"); } + /** + * Creates a table using the specified table metadata. + * IGNORE means the table is created using CREATE ... IF NOT EXISTS syntax. + * REPLACE means the table is created using CREATE OR REPLACE syntax. + * + * @throws TrinoException with {@code ALREADY_EXISTS} if the table already exists and {@param savemode} is FAIL. + */ + default void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, SaveMode saveMode) + { + if (saveMode == REPLACE) { + throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables"); + } + // Delegate to deprecated SPI to not break existing connectors + createTable(session, tableMetadata, saveMode == IGNORE); + } + /** * Drops the specified table * @@ -726,12 +746,34 @@ default void finishStatisticsCollection(ConnectorSession session, ConnectorTable * new TrinoException(NOT_SUPPORTED, "This connector does not support query retries") * * unless {@code retryMode} is set to {@code NO_RETRIES}. + * + * @deprecated use {@link #beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout, RetryMode retryMode, boolean replace)} */ + @Deprecated default ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout, RetryMode retryMode) { throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with data"); } + /** + * Begin the atomic creation of a table with data. + * + *

+ * If connector does not support execution with retries, the method should throw: + *

+     *     new TrinoException(NOT_SUPPORTED, "This connector does not support query retries")
+     * 
+ * unless {@code retryMode} is set to {@code NO_RETRIES}. + */ + default ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout, RetryMode retryMode, boolean replace) + { + // Redirect to deprecated SPI to not break existing connectors + if (!replace) { + return beginCreateTable(session, tableMetadata, layout, retryMode); + } + throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables"); + } + /** * Finish a table creation with data after the data is written. */ diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/SaveMode.java b/core/trino-spi/src/main/java/io/trino/spi/connector/SaveMode.java new file mode 100644 index 000000000000..5c6eb0aaffdd --- /dev/null +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/SaveMode.java @@ -0,0 +1,20 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spi.connector; + +public enum SaveMode { + IGNORE, + REPLACE, + FAIL +} diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md index 76278a6973db..2658e31af157 100644 --- a/docs/src/main/sphinx/connector/iceberg.md +++ b/docs/src/main/sphinx/connector/iceberg.md @@ -1276,6 +1276,35 @@ FROM example.testdb."customer_orders$snapshots" ORDER BY committed_at DESC ``` +(iceberg-create-or-replace)= + +#### Replacing tables + +The connector supports replacing a table as an atomic operation. Atomic table +replacement creates a new snapshot with the new table definition (see +{doc}`/sql/create-table` and {doc}`/sql/create-table-as`), but keeps table history. + +The new table after replacement is completely new and separate from the old table. +Only the name of the table remains identical. Earlier snapshots can be retrieved +through Iceberg's [time travel](iceberg-time-travel). + +For example a partitioned table `my_table` can be replaced by completely new +definition. + +``` +CREATE TABLE my_table ( + a BIGINT, + b DATE, + c BIGINT) +WITH (partitioning = ARRAY['a']); + +CREATE OR REPLACE TABLE my_table +WITH (sorted_by = ARRAY['a']) +AS SELECT * from another_table; +``` + +Earlier snapshots can be retrieved through Iceberg's [time travel](iceberg-time-travel). + (iceberg-time-travel)= ##### Time travel queries @@ -1302,6 +1331,14 @@ SELECT * FROM example.testdb.customer_orders FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 09:59:29.803 Europe/Vienna' ``` +The connector allows to create a new snapshot through Iceberg's [replace table](iceberg-create-or-replace). + +``` +CREATE OR REPLACE TABLE example.testdb.customer_orders AS +SELECT * +FROM example.testdb.customer_orders FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 09:59:29.803 Europe/Vienna' +``` + You can use a date to specify a point a time in the past for using a snapshot of a table in a query. Assuming that the session time zone is `Europe/Vienna` the following queries are equivalent: diff --git a/docs/src/main/sphinx/sql/create-table-as.md b/docs/src/main/sphinx/sql/create-table-as.md index bc2cb58f0a2a..f985ce44d932 100644 --- a/docs/src/main/sphinx/sql/create-table-as.md +++ b/docs/src/main/sphinx/sql/create-table-as.md @@ -3,7 +3,7 @@ ## Synopsis ```text -CREATE TABLE [ IF NOT EXISTS ] table_name [ ( column_alias, ... ) ] +CREATE [ OR REPLACE ] TABLE [ IF NOT EXISTS ] table_name [ ( column_alias, ... ) ] [ COMMENT table_comment ] [ WITH ( property_name = expression [, ...] ) ] AS query @@ -15,9 +15,16 @@ AS query Create a new table containing the result of a {doc}`select` query. Use {doc}`create-table` to create an empty table. +The optional `OR REPLACE` clause causes an existing table with the +specified name to be replaced with the new table definition. Support +for table replacement varies across connectors. Refer to the +connector documentation for details. + The optional `IF NOT EXISTS` clause causes the error to be suppressed if the table already exists. +`OR REPLACE` and `IF NOT EXISTS` cannot be used together. + The optional `WITH` clause can be used to set properties on the newly created table. To list all available table properties, run the following query: diff --git a/docs/src/main/sphinx/sql/create-table.md b/docs/src/main/sphinx/sql/create-table.md index 56e4ebd77665..49af4848da04 100644 --- a/docs/src/main/sphinx/sql/create-table.md +++ b/docs/src/main/sphinx/sql/create-table.md @@ -3,7 +3,7 @@ ## Synopsis ```text -CREATE TABLE [ IF NOT EXISTS ] +CREATE [ OR REPLACE ] TABLE [ IF NOT EXISTS ] table_name ( { column_name data_type [ NOT NULL ] [ COMMENT comment ] @@ -22,9 +22,16 @@ table_name ( Create a new, empty table with the specified columns. Use {doc}`create-table-as` to create a table with data. +The optional `OR REPLACE` clause causes an existing table with the +specified name to be replaced with the new table definition. Support +for table replacement varies across connectors. Refer to the +connector documentation for details. + The optional `IF NOT EXISTS` clause causes the error to be suppressed if the table already exists. +`OR REPLACE` and `IF NOT EXISTS` cannot be used together. + The optional `WITH` clause can be used to set properties on the newly created table or on single columns. To list all available table properties, run the following query: diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java index a54c6ab6c052..871d35cea0d3 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java @@ -55,6 +55,7 @@ import io.trino.spi.connector.RowChangeParadigm; import io.trino.spi.connector.SampleApplicationResult; import io.trino.spi.connector.SampleType; +import io.trino.spi.connector.SaveMode; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SchemaTablePrefix; import io.trino.spi.connector.SortItem; @@ -431,6 +432,14 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe } } + @Override + public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, SaveMode saveMode) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + delegate.createTable(session, tableMetadata, saveMode); + } + } + @Override public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) { @@ -543,6 +552,14 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con } } + @Override + public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout, RetryMode retryMode, boolean replace) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.beginCreateTable(session, tableMetadata, layout, retryMode, replace); + } + } + @Override public Optional finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments, Collection computedStatistics) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 4189347b1575..52a3f67e5030 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -82,6 +82,7 @@ import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.RowChangeParadigm; +import io.trino.spi.connector.SaveMode; import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SchemaTablePrefix; @@ -834,10 +835,10 @@ public void setSchemaAuthorization(ConnectorSession session, String schemaName, } @Override - public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting) + public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, SaveMode saveMode) { Optional layout = getNewTableLayout(session, tableMetadata); - finishCreateTable(session, beginCreateTable(session, tableMetadata, layout, NO_RETRIES), ImmutableList.of(), ImmutableList.of()); + finishCreateTable(session, beginCreateTable(session, tableMetadata, layout, NO_RETRIES, saveMode == SaveMode.REPLACE), ImmutableList.of(), ImmutableList.of()); } @Override @@ -889,18 +890,26 @@ public Optional getSupportedType(ConnectorSession sessio } @Override - public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout, RetryMode retryMode) + public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout, RetryMode retryMode, boolean replace) { verify(transaction == null, "transaction already set"); String schemaName = tableMetadata.getTable().getSchemaName(); if (!schemaExists(session, schemaName)) { throw new SchemaNotFoundException(schemaName); } - transaction = newCreateTableTransaction(catalog, tableMetadata, session); + if (replace) { + IcebergTableHandle table = (IcebergTableHandle) getTableHandle(session, tableMetadata.getTableSchema().getTable(), Optional.empty(), Optional.empty()); + if (table != null) { + verifyTableVersionForUpdate(table); + Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); + validateNotModifyingOldSnapshot(table, icebergTable); + } + } + transaction = newCreateTableTransaction(catalog, tableMetadata, session, replace); Location location = Location.of(transaction.table().location()); TrinoFileSystem fileSystem = fileSystemFactory.create(session); try { - if (fileSystem.listFiles(location).hasNext()) { + if (!replace && fileSystem.listFiles(location).hasNext()) { throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, format("" + "Cannot create a table on a non-empty location: %s, set 'iceberg.unique-table-location=true' in your Iceberg catalog properties " + "to use unique table locations for every table.", location)); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index 6a196cab870a..c13276cb22b1 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -629,7 +629,7 @@ public static Schema schemaFromMetadata(List columns) return new Schema(icebergSchema.asStructType().fields()); } - public static Transaction newCreateTableTransaction(TrinoCatalog catalog, ConnectorTableMetadata tableMetadata, ConnectorSession session) + public static Transaction newCreateTableTransaction(TrinoCatalog catalog, ConnectorTableMetadata tableMetadata, ConnectorSession session, boolean replace) { SchemaTableName schemaTableName = tableMetadata.getTable(); Schema schema = schemaFromMetadata(tableMetadata.getColumns()); @@ -638,6 +638,14 @@ public static Transaction newCreateTableTransaction(TrinoCatalog catalog, Connec String targetPath = getTableLocation(tableMetadata.getProperties()) .orElseGet(() -> catalog.defaultTableLocation(session, schemaTableName)); + if (replace) { + return catalog.newCreateOrReplaceTableTransaction(session, schemaTableName, schema, partitionSpec, sortOrder, targetPath, createTableProperties(tableMetadata)); + } + return catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, sortOrder, targetPath, createTableProperties(tableMetadata)); + } + + private static Map createTableProperties(ConnectorTableMetadata tableMetadata) + { ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder(); IcebergFileFormat fileFormat = IcebergTableProperties.getFileFormat(tableMetadata.getProperties()); propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toIceberg().toString()); @@ -655,8 +663,7 @@ public static Transaction newCreateTableTransaction(TrinoCatalog catalog, Connec if (tableMetadata.getComment().isPresent()) { propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get()); } - - return catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, sortOrder, targetPath, propertiesBuilder.buildOrThrow()); + return propertiesBuilder.buildOrThrow(); } /** diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java index 36d09f2d0d19..855f66de3f35 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java @@ -231,6 +231,12 @@ protected void refreshFromMetadataLocation(String newLocation) return; } + // a table that is replaced doesn't need its metadata reloaded + if (newLocation == null) { + shouldRefresh = false; + return; + } + TableMetadata newMetadata; try { newMetadata = Failsafe.with(RetryPolicy.builder() diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java index 0c77792299b9..a6f3fcbad350 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java @@ -32,6 +32,7 @@ import io.trino.spi.connector.ConnectorTableMetadata; import io.trino.spi.connector.ConnectorViewDefinition; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.type.ArrayType; import io.trino.spi.type.CharType; import io.trino.spi.type.MapType; @@ -43,6 +44,7 @@ import io.trino.spi.type.Type; import io.trino.spi.type.TypeManager; import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; @@ -96,6 +98,7 @@ import static java.util.UUID.randomUUID; import static org.apache.iceberg.TableMetadata.newTableMetadata; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.iceberg.Transactions.createOrReplaceTableTransaction; import static org.apache.iceberg.Transactions.createTableTransaction; public abstract class AbstractTrinoCatalog @@ -202,6 +205,46 @@ protected Transaction newCreateTableTransaction( return createTableTransaction(schemaTableName.toString(), ops, metadata); } + protected Transaction newCreateOrReplaceTableTransaction( + ConnectorSession session, + SchemaTableName schemaTableName, + Schema schema, + PartitionSpec partitionSpec, + SortOrder sortOrder, + String location, + Map properties, + Optional owner) + { + BaseTable table; + Optional metadata = Optional.empty(); + try { + table = (BaseTable) loadTable(session, new SchemaTableName(schemaTableName.getSchemaName(), schemaTableName.getTableName())); + metadata = Optional.of(table.operations().current()); + } + catch (TableNotFoundException ignored) { + // ignored + } + IcebergTableOperations operations = tableOperationsProvider.createTableOperations( + this, + session, + schemaTableName.getSchemaName(), + schemaTableName.getTableName(), + owner, + Optional.of(location)); + TableMetadata newMetaData; + if (metadata.isPresent()) { + operations.initializeFromMetadata(metadata.get()); + newMetaData = operations.current() + // don't inherit table properties from earlier snapshots + .replaceProperties(properties) + .buildReplacement(schema, partitionSpec, sortOrder, location, properties); + } + else { + newMetaData = newTableMetadata(schema, partitionSpec, sortOrder, location, properties); + } + return createOrReplaceTableTransaction(schemaTableName.toString(), operations, newMetaData); + } + protected String createNewTableName(String baseTableName) { String tableNameLocationComponent = escapeTableName(baseTableName); @@ -273,7 +316,7 @@ protected SchemaTableName createMaterializedViewStorageTable(ConnectorSession se }); ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(storageTable, columns, storageTableProperties, Optional.empty()); - Transaction transaction = IcebergUtil.newCreateTableTransaction(this, tableMetadata, session); + Transaction transaction = IcebergUtil.newCreateTableTransaction(this, tableMetadata, session, false); AppendFiles appendFiles = transaction.newAppend(); commit(appendFiles, session); transaction.commitTransaction(); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java index afd25eab19a2..03ba84d0f533 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java @@ -95,6 +95,15 @@ Transaction newCreateTableTransaction( String location, Map properties); + Transaction newCreateOrReplaceTableTransaction( + ConnectorSession session, + SchemaTableName schemaTableName, + Schema schema, + PartitionSpec partitionSpec, + SortOrder sortOrder, + String location, + Map properties); + void registerTable(ConnectorSession session, SchemaTableName tableName, TableMetadata tableMetadata); void unregisterTable(ConnectorSession session, SchemaTableName tableName); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java index 19e638c9f118..2c069ae085a9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -701,6 +701,27 @@ public Transaction newCreateTableTransaction( Optional.of(session.getUser())); } + @Override + public Transaction newCreateOrReplaceTableTransaction( + ConnectorSession session, + SchemaTableName schemaTableName, + Schema schema, + PartitionSpec partitionSpec, + SortOrder sortOrder, + String location, + Map properties) + { + return newCreateOrReplaceTableTransaction( + session, + schemaTableName, + schema, + partitionSpec, + sortOrder, + location, + properties, + Optional.of(session.getUser())); + } + @Override public void registerTable(ConnectorSession session, SchemaTableName schemaTableName, TableMetadata tableMetadata) throws TrinoException diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index 6cace7b54e78..e010255898c9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -280,6 +280,26 @@ public Transaction newCreateTableTransaction( isUsingSystemSecurity ? Optional.empty() : Optional.of(session.getUser())); } + @Override + public Transaction newCreateOrReplaceTableTransaction( + ConnectorSession session, + SchemaTableName schemaTableName, + Schema schema, PartitionSpec partitionSpec, + SortOrder sortOrder, + String location, + Map properties) + { + return newCreateOrReplaceTableTransaction( + session, + schemaTableName, + schema, + partitionSpec, + sortOrder, + location, + properties, + isUsingSystemSecurity ? Optional.empty() : Optional.of(session.getUser())); + } + @Override public void registerTable(ConnectorSession session, SchemaTableName schemaTableName, TableMetadata tableMetadata) throws TrinoException diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java index 920be5a5dede..5d04d469baba 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java @@ -29,7 +29,6 @@ import io.trino.spi.connector.ConnectorViewDefinition; import io.trino.spi.connector.RelationColumnsMetadata; import io.trino.spi.connector.RelationCommentMetadata; -import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.security.TrinoPrincipal; @@ -210,9 +209,6 @@ public Transaction newCreateTableTransaction( String location, Map properties) { - if (!listNamespaces(session, Optional.of(schemaTableName.getSchemaName())).contains(schemaTableName.getSchemaName())) { - throw new SchemaNotFoundException(schemaTableName.getSchemaName()); - } return newCreateTableTransaction( session, schemaTableName, @@ -224,6 +220,27 @@ public Transaction newCreateTableTransaction( Optional.of(session.getUser())); } + @Override + public Transaction newCreateOrReplaceTableTransaction( + ConnectorSession session, + SchemaTableName schemaTableName, + Schema schema, + PartitionSpec partitionSpec, + SortOrder sortOrder, + String location, + Map properties) + { + return newCreateOrReplaceTableTransaction( + session, + schemaTableName, + schema, + partitionSpec, + sortOrder, + location, + properties, + Optional.of(session.getUser())); + } + @Override public void registerTable(ConnectorSession session, SchemaTableName tableName, TableMetadata tableMetadata) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java index 7f4a8872a18b..7de7de1c58d1 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java @@ -253,6 +253,27 @@ public Transaction newCreateTableTransaction( Optional.of(session.getUser())); } + @Override + public Transaction newCreateOrReplaceTableTransaction( + ConnectorSession session, + SchemaTableName schemaTableName, + Schema schema, + PartitionSpec partitionSpec, + SortOrder sortOrder, + String location, + Map properties) + { + return newCreateOrReplaceTableTransaction( + session, + schemaTableName, + schema, + partitionSpec, + sortOrder, + location, + properties, + Optional.of(session.getUser())); + } + @Override public void registerTable(ConnectorSession session, SchemaTableName tableName, TableMetadata tableMetadata) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java index 57f41315f343..527a7d6603b5 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java @@ -236,6 +236,24 @@ public Transaction newCreateTableTransaction( .createTransaction(); } + @Override + public Transaction newCreateOrReplaceTableTransaction( + ConnectorSession session, + SchemaTableName schemaTableName, + Schema schema, + PartitionSpec partitionSpec, + SortOrder sortOrder, + String location, + Map properties) + { + return restSessionCatalog.buildTable(convert(session), toIdentifier(schemaTableName), schema) + .withPartitionSpec(partitionSpec) + .withSortOrder(sortOrder) + .withLocation(location) + .withProperties(properties) + .createOrReplaceTransaction(); + } + @Override public void registerTable(ConnectorSession session, SchemaTableName tableName, TableMetadata tableMetadata) { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java index 33fdd90ea008..16605509d5f3 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java @@ -170,6 +170,50 @@ public void testDeleteRowsConcurrently() } } + @Test + public void testCreateOrReplaceTable() + { + try (TestTable table = new TestTable( + getQueryRunner()::execute, + "test_create_or_replace", + " AS SELECT BIGINT '42' a, DOUBLE '-38.5' b")) { + assertThat(query("SELECT a, b FROM " + table.getName())) + .matches("VALUES (BIGINT '42', -385e-1)"); + + long v1SnapshotId = getMostRecentSnapshotId(table.getName()); + + assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " AS SELECT BIGINT '-42' a, DOUBLE '38.5' b", 1); + assertThat(query("SELECT a, b FROM " + table.getName())) + .matches("VALUES (BIGINT '-42', 385e-1)"); + + assertThat(query("SELECT COUNT(snapshot_id) FROM \"" + table.getName() + "$history\"")) + .matches("VALUES BIGINT '2'"); + + assertThat(query("SELECT a, b FROM " + table.getName() + " FOR VERSION AS OF " + v1SnapshotId)) + .matches("VALUES (BIGINT '42', -385e-1)"); + } + } + + @Test + public void testCreateOrReplaceTableChangeColumnNamesAndTypes() + { + String tableName = "test_create_or_replace_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT BIGINT '42' a, DOUBLE '-38.5' b", 1); + assertThat(query("SELECT CAST(a AS bigint), b FROM " + tableName)) + .matches("VALUES (BIGINT '42', -385e-1)"); + + long v1SnapshotId = getMostRecentSnapshotId(tableName); + + assertUpdate("CREATE OR REPLACE TABLE " + tableName + " AS SELECT VARCHAR 'test' c, VARCHAR 'test2' d", 1); + assertThat(query("SELECT c, d FROM " + tableName)) + .matches("VALUES (VARCHAR 'test', VARCHAR 'test2')"); + + assertThat(query("SELECT a, b FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId)) + .matches("VALUES (BIGINT '42', -385e-1)"); + + assertUpdate("DROP TABLE " + tableName); + } + @Test public void testRegisterTableWithTableLocation() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 1a48e3279995..03c2987e5ab7 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -217,7 +217,8 @@ public void initStorageTimePrecision() protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) { return switch (connectorBehavior) { - case SUPPORTS_REPORTING_WRITTEN_BYTES -> true; + case SUPPORTS_CREATE_OR_REPLACE_TABLE, + SUPPORTS_REPORTING_WRITTEN_BYTES -> true; case SUPPORTS_ADD_COLUMN_NOT_NULL_CONSTRAINT, SUPPORTS_RENAME_MATERIALIZED_VIEW_ACROSS_SCHEMAS, SUPPORTS_TOPN_PUSHDOWN, @@ -6267,6 +6268,109 @@ public void testDeleteRetainsTableHistory() assertUpdate("DROP TABLE " + tableName); } + @Test + public void testCreateOrReplaceTableSnapshots() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_or_replace_", " AS SELECT BIGINT '42' a, DOUBLE '-38.5' b")) { + long v1SnapshotId = getCurrentSnapshotId(table.getName()); + + assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " AS SELECT BIGINT '-42' a, DOUBLE '38.5' b", 1); + assertThat(query("SELECT CAST(a AS bigint), b FROM " + table.getName())) + .matches("VALUES (BIGINT '-42', 385e-1)"); + + assertThat(query("SELECT a, b FROM " + table.getName() + " FOR VERSION AS OF " + v1SnapshotId)) + .matches("VALUES (BIGINT '42', -385e-1)"); + } + } + + @Test + public void testCreateOrReplaceTableChangeColumnNamesAndTypes() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_or_replace_", " AS SELECT BIGINT '42' a, DOUBLE '-38.5' b")) { + long v1SnapshotId = getCurrentSnapshotId(table.getName()); + + assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " AS SELECT CAST(ARRAY[ROW('test')] AS ARRAY(ROW(field VARCHAR))) a, VARCHAR 'test2' b", 1); + assertThat(query("SELECT * FROM " + table.getName())) + .matches("VALUES (CAST(ARRAY[ROW('test')] AS ARRAY(ROW(field VARCHAR))), VARCHAR 'test2')"); + + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF " + v1SnapshotId)) + .matches("VALUES (BIGINT '42', -385e-1)"); + } + } + + @Test + public void testCreateOrReplaceTableChangePartitionedTableIntoUnpartitioned() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_or_replace_", " WITH (partitioning=ARRAY['a']) AS SELECT BIGINT '42' a, 'some data' b UNION ALL SELECT BIGINT '43' a, 'another data' b")) { + long v1SnapshotId = getCurrentSnapshotId(table.getName()); + + assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " WITH (sorted_by=ARRAY['a']) AS SELECT BIGINT '22' a, 'new data' b", 1); + assertThat(query("SELECT * FROM " + table.getName())) + .matches("VALUES (BIGINT '22', CAST('new data' AS VARCHAR))"); + + assertThat(query("SELECT partition FROM \"" + table.getName() + "$partitions\"")) + .matches("VALUES (ROW(CAST (ROW(NULL) AS ROW(a BIGINT))))"); + + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF " + v1SnapshotId)) + .matches("VALUES (BIGINT '42', CAST('some data' AS VARCHAR)), (BIGINT '43', CAST('another data' AS VARCHAR))"); + + assertThat((String) computeScalar("SHOW CREATE TABLE " + table.getName())) + .contains("sorted_by = ARRAY['a ASC NULLS FIRST']"); + assertThat((String) computeScalar("SHOW CREATE TABLE " + table.getName())) + .doesNotContain("partitioning = ARRAY['a']"); + } + } + + @Test + public void testCreateOrReplaceTableChangeUnpartitionedTableIntoPartitioned() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_or_replace_", " WITH (sorted_by=ARRAY['a']) AS SELECT BIGINT '22' a, CAST('some data' AS VARCHAR) b")) { + long v1SnapshotId = getCurrentSnapshotId(table.getName()); + + assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " WITH (partitioning=ARRAY['a']) AS SELECT BIGINT '42' a, 'some data' b UNION ALL SELECT BIGINT '43' a, 'another data' b", 2); + assertThat(query("SELECT * FROM " + table.getName())) + .matches("VALUES (BIGINT '42', CAST('some data' AS VARCHAR)), (BIGINT '43', CAST('another data' AS VARCHAR))"); + + assertThat(query("SELECT partition FROM \"" + table.getName() + "$partitions\"")) + .matches("VALUES (ROW(CAST (ROW(BIGINT '42') AS ROW(a BIGINT)))), (ROW(CAST (ROW(BIGINT '43') AS ROW(a BIGINT))))"); + + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF " + v1SnapshotId)) + .matches("VALUES (BIGINT '22', CAST('some data' AS VARCHAR))"); + + assertThat((String) computeScalar("SHOW CREATE TABLE " + table.getName())) + .contains("partitioning = ARRAY['a']"); + assertThat((String) computeScalar("SHOW CREATE TABLE " + table.getName())) + .doesNotContain("sorted_by = ARRAY['a ASC NULLS FIRST']"); + } + } + + @Test + public void testCreateOrReplaceTableWithComments() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_or_replace_", " (a BIGINT COMMENT 'This is a column') COMMENT 'This is a table'")) { + long v1SnapshotId = getCurrentSnapshotId(table.getName()); + + assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " AS SELECT 1 a", 1); + assertThat(query("SELECT * FROM " + table.getName())) + .matches("VALUES 1"); + + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF " + v1SnapshotId)) + .returnsEmptyResult(); + + assertThat(getTableComment(getSession().getCatalog().orElseThrow(), getSession().getSchema().orElseThrow(), table.getName())) + .isNull(); + assertThat(getColumnComment(table.getName(), "a")) + .isNull(); + + assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " (a BIGINT COMMENT 'This is a column') COMMENT 'This is a table'"); + + assertThat(getTableComment(getSession().getCatalog().orElseThrow(), getSession().getSchema().orElseThrow(), table.getName())) + .isEqualTo("This is a table"); + assertThat(getColumnComment(table.getName(), "a")) + .isEqualTo("This is a column"); + } + } + @Test public void testMergeSimpleSelectPartitioned() { diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergRedirectionToHive.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergRedirectionToHive.java index eab36dd07c3a..ca8afab6b255 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergRedirectionToHive.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergRedirectionToHive.java @@ -239,6 +239,19 @@ public void testUpdate() onTrino().executeQuery("DROP TABLE " + hiveTableName); } + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testCreateOrReplaceTable() + { + String tableName = "iceberg_create_or_replace_hive_" + randomNameSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createHiveTable(hiveTableName, false); + + assertQueryFailure(() -> onTrino().executeQuery("CREATE OR REPLACE TABLE " + icebergTableName + " (d integer)")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): line 1:1: Table '" + icebergTableName + "' of unsupported type already exists"); + } + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) public void testDropTable() { diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java index b91c2ef5b998..7645b33341b1 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java @@ -16,6 +16,7 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.UncheckedTimeoutException; import io.airlift.log.Logger; import io.airlift.units.DataSize; @@ -120,6 +121,7 @@ import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CREATE_FEDERATED_MATERIALIZED_VIEW; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CREATE_MATERIALIZED_VIEW; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CREATE_MATERIALIZED_VIEW_GRACE_PERIOD; +import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CREATE_OR_REPLACE_TABLE; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CREATE_SCHEMA; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CREATE_TABLE; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CREATE_TABLE_WITH_COLUMN_COMMENT; @@ -3338,6 +3340,112 @@ public void testCreateSchemaWithNonLowercaseOwnerName() assertUpdate(newSession, "DROP SCHEMA " + schemaName); } + @Test + public void testCreateOrReplaceTableWhenTableDoesNotExist() + { + skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE)); + String table = "test_create_or_replace_" + randomNameSuffix(); + if (!hasBehavior(SUPPORTS_CREATE_OR_REPLACE_TABLE)) { + assertQueryFails("CREATE OR REPLACE TABLE " + table + " (a bigint, b double, c varchar(50))", "This connector does not support replacing tables"); + return; + } + + try { + assertUpdate("CREATE OR REPLACE TABLE " + table + " (a bigint, b double, c varchar(50))"); + assertQueryReturnsEmptyResult("SELECT * FROM " + table); + } finally { + assertUpdate("DROP TABLE IF EXISTS " + table); + } + } + + @Test + public void testCreateOrReplaceTableAsSelectWhenTableDoesNotExists() + { + skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE)); + String table = "test_create_or_replace_" + randomNameSuffix(); + @Language("SQL") String query = "SELECT nationkey, name, regionkey FROM nation"; + @Language("SQL") String rowCountQuery = "SELECT count(*) FROM nation"; + if (!hasBehavior(SUPPORTS_CREATE_OR_REPLACE_TABLE)) { + assertQueryFails("CREATE OR REPLACE TABLE " + table + " AS " + query, "This connector does not support replacing tables"); + return; + } + + try { + assertUpdate("CREATE OR REPLACE TABLE " + table + " AS " + query, rowCountQuery); + assertQuery("SELECT * FROM " + table, query); + } finally { + assertUpdate("DROP TABLE IF EXISTS " + table); + } + } + + @Test + public void testCreateOrReplaceTableWhenTableAlreadyExistsSameSchema() + { + skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE)); + if (!hasBehavior(SUPPORTS_CREATE_OR_REPLACE_TABLE)) { + // covered in testCreateOrReplaceTableWhenTableDoesNotExist + return; + } + + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_or_replace_", "AS SELECT CAST(1 AS BIGINT) AS nationkey, 'test' AS name, CAST(2 AS BIGINT) AS regionkey FROM nation LIMIT 1")) { + @Language("SQL") String query = "SELECT nationkey, name, regionkey FROM nation"; + @Language("SQL") String rowCountQuery = "SELECT count(*) FROM nation"; + assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " AS " + query, rowCountQuery); + assertQuery("SELECT * FROM " + table.getName(), query); + } + } + + @Test + public void testCreateOrReplaceTableWhenTableAlreadyExistsSameSchemaNoData() + { + skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE)); + if (!hasBehavior(SUPPORTS_CREATE_OR_REPLACE_TABLE)) { + // covered in testCreateOrReplaceTableWhenTableDoesNotExist + return; + } + + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_or_replace_", " AS SELECT nationkey, name, regionkey FROM nation")) { + assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " AS SELECT nationkey, name, regionkey FROM nation WITH NO DATA", 0L); + assertQueryReturnsEmptyResult("SELECT * FROM " + table.getName()); + } + } + + @Test + public void testCreateOrReplaceTableWithNewColumnNames() + { + skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE)); + if (!hasBehavior(SUPPORTS_CREATE_OR_REPLACE_TABLE)) { + // covered in testCreateOrReplaceTableWhenTableDoesNotExist + return; + } + + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_or_replace_", " AS SELECT nationkey, name, regionkey FROM nation")) { + assertTableColumnNames(table.getName(), "nationkey", "name", "regionkey"); + @Language("SQL") String query = "SELECT nationkey AS nationkey_new, name AS name_new_2, regionkey AS region_key_new FROM nation"; + @Language("SQL") String rowCountQuery = "SELECT count(*) FROM nation"; + assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " AS " + query, rowCountQuery); + assertTableColumnNames(table.getName(), "nationkey_new", "name_new_2", "region_key_new"); + assertQuery("SELECT * FROM " + table.getName(), query); + } + } + + @Test + public void testCreateOrReplaceTableWithDifferentDataType() + { + skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE)); + if (!hasBehavior(SUPPORTS_CREATE_OR_REPLACE_TABLE)) { + // covered in testCreateOrReplaceTableWhenTableDoesNotExist + return; + } + + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_or_replace_", " AS SELECT nationkey, name FROM nation")) { + @Language("SQL") String query = "SELECT name AS nationkey, nationkey AS name FROM nation"; + @Language("SQL") String rowCountQuery = "SELECT count(*) FROM nation"; + assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " AS " + query, rowCountQuery); + assertQuery(getSession(), "SELECT * FROM " + table.getName(), query); + } + } + @Test public void testCreateSchemaWithLongName() { @@ -4918,6 +5026,87 @@ protected void verifyConcurrentAddColumnFailurePermissible(Exception e) throw new AssertionError("Unexpected concurrent add column failure", e); } + // Repeat test with invocationCount for better test coverage, since the tested aspect is inherently non-deterministic. + @Test(timeOut = 60_000, invocationCount = 4) + public void testCreateOrReplaceTableConcurrently() + throws Exception + { + if (!hasBehavior(SUPPORTS_CREATE_OR_REPLACE_TABLE)) { + // Already handled in testCreateOrReplaceTableWhenTableDoesNotExist + return; + } + + int threads = 4; + int numOfCreateOrReplaceStatements = 4; + int numOfReads = 16; + CyclicBarrier barrier = new CyclicBarrier(threads + 1); + ExecutorService executor = newFixedThreadPool(threads + 1); + List> futures = new ArrayList<>(); + try (TestTable table = createTableWithOneIntegerColumn("test_create_or_replace")) { + String tableName = table.getName(); + + getQueryRunner().execute("CREATE OR REPLACE TABLE " + tableName + " AS SELECT 1 a"); + assertThat(query("SELECT * FROM " + tableName)).matches("VALUES 1"); + + /// One thread submits some CREATE OR REPLACE statements + futures.add(executor.submit(() -> { + barrier.await(30, SECONDS); + IntStream.range(0, numOfCreateOrReplaceStatements).forEach(index -> { + try { + getQueryRunner().execute("CREATE OR REPLACE TABLE " + tableName + " AS SELECT * FROM (VALUES (1), (2)) AS t(a) "); + } catch (Exception e) { + RuntimeException trinoException = getTrinoExceptionCause(e); + try { + throw new AssertionError("Unexpected concurrent CREATE OR REPLACE failure", trinoException); + } catch (Throwable verifyFailure) { + if (verifyFailure != e) { + verifyFailure.addSuppressed(e); + } + throw verifyFailure; + } + } + }); + return null; + })); + // Other 4 threads continue try to read the same table, none of the reads should fail. + IntStream.range(0, threads) + .forEach(threadNumber -> futures.add(executor.submit(() -> { + barrier.await(30, SECONDS); + IntStream.range(0, numOfReads).forEach(readIndex -> { + try { + MaterializedResult result = computeActual("SELECT * FROM " + tableName); + if (result.getRowCount() == 1) { + assertEqualsIgnoreOrder(result.getMaterializedRows(), List.of(new MaterializedRow(List.of(1)))); + } + else { + assertEqualsIgnoreOrder(result.getMaterializedRows(), List.of(new MaterializedRow(List.of(1)), new MaterializedRow(List.of(2)))); + } + } + catch (Exception e) { + RuntimeException trinoException = getTrinoExceptionCause(e); + try { + throw new AssertionError("Unexpected concurrent CREATE OR REPLACE failure", trinoException); + } + catch (Throwable verifyFailure) { + if (verifyFailure != e) { + verifyFailure.addSuppressed(e); + } + throw verifyFailure; + } + } + }); + return null; + }))); + futures.forEach(Futures::getUnchecked); + getQueryRunner().execute("CREATE OR REPLACE TABLE " + tableName + " AS SELECT * FROM (VALUES (1), (2), (3)) AS t(a)"); + assertThat(query("SELECT * FROM " + tableName)).matches("VALUES 1, 2, 3"); + } + finally { + executor.shutdownNow(); + executor.awaitTermination(30, SECONDS); + } + } + protected TestTable createTableWithOneIntegerColumn(String namePrefix) { return new TestTable(getQueryRunner()::execute, namePrefix, "(col integer)"); diff --git a/testing/trino-testing/src/main/java/io/trino/testing/TestingConnectorBehavior.java b/testing/trino-testing/src/main/java/io/trino/testing/TestingConnectorBehavior.java index d5b3844e18d7..99162b9058d8 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/TestingConnectorBehavior.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/TestingConnectorBehavior.java @@ -75,6 +75,7 @@ public enum TestingConnectorBehavior SUPPORTS_DROP_SCHEMA_CASCADE(SUPPORTS_CREATE_SCHEMA), SUPPORTS_CREATE_TABLE, + SUPPORTS_CREATE_OR_REPLACE_TABLE(false), SUPPORTS_CREATE_TABLE_WITH_DATA(SUPPORTS_CREATE_TABLE), SUPPORTS_CREATE_TABLE_WITH_TABLE_COMMENT(SUPPORTS_CREATE_TABLE), SUPPORTS_CREATE_TABLE_WITH_COLUMN_COMMENT(SUPPORTS_CREATE_TABLE), @@ -137,6 +138,7 @@ public enum TestingConnectorBehavior (name().equals("SUPPORTS_CANCELLATION") || name().equals("SUPPORTS_DYNAMIC_FILTER_PUSHDOWN") || name().equals("SUPPORTS_JOIN_PUSHDOWN") || + name().equals("SUPPORTS_CREATE_OR_REPLACE_TABLE") || name().equals("SUPPORTS_REPORTING_WRITTEN_BYTES") || name().equals("SUPPORTS_MULTI_STATEMENT_WRITES")), "Every behavior should be expected to be true by default. Having mixed defaults makes reasoning about tests harder. False default provided for %s",