diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java index 8ee552c98930..86966ee6f438 100644 --- a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.lakehouse; +import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; import io.airlift.bootstrap.LifeCycleManager; import io.trino.plugin.hive.HiveSchemaProperties; @@ -26,10 +27,13 @@ import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.function.FunctionProvider; +import io.trino.spi.function.table.ConnectorTableFunction; import io.trino.spi.session.PropertyMetadata; import io.trino.spi.transaction.IsolationLevel; import java.util.List; +import java.util.Optional; import java.util.Set; import static com.google.common.collect.Sets.immutableEnumSet; @@ -51,6 +55,8 @@ public class LakehouseConnector private final LakehouseSessionProperties sessionProperties; private final LakehouseTableProperties tableProperties; private final IcebergMaterializedViewProperties materializedViewProperties; + private final Set<ConnectorTableFunction> tableFunctions; + private final FunctionProvider functionProvider; @Inject public LakehouseConnector( @@ -62,7 +68,9 @@ public LakehouseConnector( LakehouseNodePartitioningProvider nodePartitioningProvider, LakehouseSessionProperties sessionProperties, LakehouseTableProperties tableProperties, - IcebergMaterializedViewProperties materializedViewProperties) + IcebergMaterializedViewProperties materializedViewProperties, + Set<ConnectorTableFunction> tableFunctions, + FunctionProvider functionProvider) { this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null"); this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); @@ -73,6 +81,8 @@ public LakehouseConnector( this.sessionProperties = requireNonNull(sessionProperties, "sessionProperties is null"); this.tableProperties = requireNonNull(tableProperties, "tableProperties is null"); this.materializedViewProperties = requireNonNull(materializedViewProperties, "materializedViewProperties is null"); + this.tableFunctions = ImmutableSet.copyOf(requireNonNull(tableFunctions, "tableFunctions is null")); + this.functionProvider = requireNonNull(functionProvider, "functionProvider is null"); } @Override @@ -159,4 +169,16 @@ public Set<ConnectorCapabilities> getCapabilities() { return immutableEnumSet(NOT_NULL_COLUMN_CONSTRAINT, MATERIALIZED_VIEW_GRACE_PERIOD); } + + @Override + public Set<ConnectorTableFunction> getTableFunctions() + { + return tableFunctions; + } + + @Override + public Optional<FunctionProvider> getFunctionProvider() + { + return Optional.of(functionProvider); + } } diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseDeltaModule.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseDeltaModule.java index 7f68de3b361b..26f61be45824 100644 --- a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseDeltaModule.java +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseDeltaModule.java @@ -21,6 +21,7 @@ import io.trino.plugin.deltalake.DeltaLakeConfig; import io.trino.plugin.deltalake.DeltaLakeExecutorModule; import 
io.trino.plugin.deltalake.DeltaLakeFileSystemFactory; +import io.trino.plugin.deltalake.DeltaLakeFunctionProvider; import io.trino.plugin.deltalake.DeltaLakeMergeResult; import io.trino.plugin.deltalake.DeltaLakeMetadataFactory; import io.trino.plugin.deltalake.DeltaLakeNodePartitioningProvider; @@ -32,6 +33,7 @@ import io.trino.plugin.deltalake.DeltaLakeTableProperties; import io.trino.plugin.deltalake.DeltaLakeTransactionManager; import io.trino.plugin.deltalake.DeltaLakeWriterStats; +import io.trino.plugin.deltalake.functions.tablechanges.TableChangesProcessorProvider; import io.trino.plugin.deltalake.metastore.DeltaLakeMetastoreModule; import io.trino.plugin.deltalake.metastore.DeltaLakeTableMetadataScheduler; import io.trino.plugin.deltalake.metastore.NoOpVendedCredentialsProvider; @@ -103,6 +105,9 @@ protected void setup(Binder binder) binder.bind(DeltaLakeTableMetadataScheduler.class).in(Scopes.SINGLETON); newExporter(binder).export(DeltaLakeTableMetadataScheduler.class).withGeneratedName(); + binder.bind(TableChangesProcessorProvider.class).in(Scopes.SINGLETON); + binder.bind(DeltaLakeFunctionProvider.class).in(Scopes.SINGLETON); + jsonCodecBinder(binder).bindJsonCodec(DataFileInfo.class); jsonCodecBinder(binder).bindJsonCodec(DeltaLakeMergeResult.class); jsonCodecBinder(binder).bindJsonCodec(ExtendedStatistics.class); diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseFunctionProvider.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseFunctionProvider.java new file mode 100644 index 000000000000..bdfc48d0cf96 --- /dev/null +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseFunctionProvider.java @@ -0,0 +1,53 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.lakehouse; + +import com.google.inject.Inject; +import io.trino.plugin.deltalake.DeltaLakeFunctionProvider; +import io.trino.plugin.deltalake.functions.tablechanges.TableChangesTableFunctionHandle; +import io.trino.plugin.iceberg.functions.IcebergFunctionProvider; +import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionHandle; +import io.trino.spi.function.FunctionProvider; +import io.trino.spi.function.table.ConnectorTableFunctionHandle; +import io.trino.spi.function.table.TableFunctionProcessorProviderFactory; + +import static java.util.Objects.requireNonNull; + +public class LakehouseFunctionProvider + implements FunctionProvider +{ + private final DeltaLakeFunctionProvider deltaLakeFunctionProvider; + private final IcebergFunctionProvider icebergFunctionProvider; + + @Inject + public LakehouseFunctionProvider( + DeltaLakeFunctionProvider deltaLakeFunctionProvider, + IcebergFunctionProvider icebergFunctionProvider) + { + this.deltaLakeFunctionProvider = requireNonNull(deltaLakeFunctionProvider, "deltaLakeFunctionProvider is null"); + this.icebergFunctionProvider = requireNonNull(icebergFunctionProvider, "icebergFunctionProvider is null"); + } + + @Override + public TableFunctionProcessorProviderFactory getTableFunctionProcessorProviderFactory(ConnectorTableFunctionHandle functionHandle) + { + if (functionHandle instanceof TableChangesTableFunctionHandle) { + return deltaLakeFunctionProvider.getTableFunctionProcessorProviderFactory(functionHandle); + } + if (functionHandle instanceof TableChangesFunctionHandle) { + return icebergFunctionProvider.getTableFunctionProcessorProviderFactory(functionHandle); + } + throw new UnsupportedOperationException("Unsupported function: " + functionHandle); + } +} diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java index 96cde88f842f..ab9aea1a5b1f 100644 --- a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java @@ -41,6 +41,7 @@ import io.trino.plugin.iceberg.catalog.glue.IcebergGlueCatalogModule; import io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule; import io.trino.plugin.iceberg.fileio.ForwardingFileIoFactory; +import io.trino.plugin.iceberg.functions.IcebergFunctionProvider; import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; import static io.airlift.configuration.ConfigBinder.configBinder; @@ -75,6 +76,8 @@ protected void setup(Binder binder) binder.bind(ForwardingFileIoFactory.class).in(Scopes.SINGLETON); + binder.bind(IcebergFunctionProvider.class).in(Scopes.SINGLETON); + install(switch (buildConfigObject(MetastoreTypeConfig.class).getMetastoreType()) { case THRIFT -> new IcebergHiveMetastoreCatalogModule(); case FILE -> new IcebergFileMetastoreCatalogModule(); diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java index 6050c8fa54f0..e0f2b164aeec 100644 --- a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java @@ -24,7 +24,12 @@ import io.trino.plugin.hive.orc.OrcWriterConfig; import 
io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.plugin.hive.parquet.ParquetWriterConfig; +import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProcessorProviderFactory; +import io.trino.plugin.lakehouse.functions.tablechanges.TableChangesFunctionProvider; +import io.trino.spi.function.FunctionProvider; +import io.trino.spi.function.table.ConnectorTableFunction; +import static com.google.inject.multibindings.Multibinder.newSetBinder; import static io.airlift.configuration.ConfigBinder.configBinder; import static org.weakref.jmx.guice.ExportBinder.newExporter; @@ -53,6 +58,10 @@ protected void setup(Binder binder) binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON); newExporter(binder).export(FileFormatDataSourceStats.class).withGeneratedName(); + newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON); + binder.bind(FunctionProvider.class).to(LakehouseFunctionProvider.class).in(Scopes.SINGLETON); + binder.bind(TableChangesFunctionProcessorProviderFactory.class).in(Scopes.SINGLETON); + binder.bind(Key.get(boolean.class, HideDeltaLakeTables.class)).toInstance(false); } } diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/functions/tablechanges/TableChangesFunction.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/functions/tablechanges/TableChangesFunction.java new file mode 100644 index 000000000000..97a80e1b23a3 --- /dev/null +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/functions/tablechanges/TableChangesFunction.java @@ -0,0 +1,96 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.lakehouse.functions.tablechanges; + +import com.google.common.collect.ImmutableList; +import io.trino.plugin.deltalake.metastore.NotADeltaLakeTableException; +import io.trino.plugin.iceberg.UnknownTableTypeException; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorAccessControl; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.function.table.AbstractConnectorTableFunction; +import io.trino.spi.function.table.Argument; +import io.trino.spi.function.table.ScalarArgumentSpecification; +import io.trino.spi.function.table.TableFunctionAnalysis; + +import java.util.Map; + +import static io.trino.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT; +import static io.trino.spi.function.table.ReturnTypeSpecification.GenericTable.GENERIC_TABLE; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.VarcharType.VARCHAR; + +public class TableChangesFunction + extends AbstractConnectorTableFunction +{ + private static final String FUNCTION_NAME = "table_changes"; + private static final String SCHEMA_NAME = "system"; + private static final String NAME = "table_changes"; + public static final String SCHEMA_NAME_ARGUMENT = "SCHEMA_NAME"; + private static final String TABLE_NAME_ARGUMENT = "TABLE_NAME"; + private static final String START_SNAPSHOT_VAR_NAME = "START_SNAPSHOT_ID"; + private static final String END_SNAPSHOT_VAR_NAME = "END_SNAPSHOT_ID"; + private static final String SINCE_VERSION_ARGUMENT = "SINCE_VERSION"; + + private final io.trino.plugin.deltalake.functions.tablechanges.TableChangesFunction deltaLakeTableChangesFunction; + private final io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunction icebergTableChangesFunction; + + public TableChangesFunction( + io.trino.plugin.deltalake.functions.tablechanges.TableChangesFunction deltaLakeTableChangesFunction, + io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunction icebergTableChangesFunction) + { + super( + SCHEMA_NAME, + NAME, + ImmutableList.of( + ScalarArgumentSpecification.builder().name(SCHEMA_NAME_ARGUMENT).type(VARCHAR).build(), + ScalarArgumentSpecification.builder().name(TABLE_NAME_ARGUMENT).type(VARCHAR).build(), + ScalarArgumentSpecification.builder().name(START_SNAPSHOT_VAR_NAME).type(BIGINT).defaultValue(null).build(), + ScalarArgumentSpecification.builder().name(END_SNAPSHOT_VAR_NAME).type(BIGINT).defaultValue(null).build(), + ScalarArgumentSpecification.builder().name(SINCE_VERSION_ARGUMENT).type(BIGINT).defaultValue(null).build()), + GENERIC_TABLE); + this.deltaLakeTableChangesFunction = deltaLakeTableChangesFunction; + this.icebergTableChangesFunction = icebergTableChangesFunction; + } + + @Override + public TableFunctionAnalysis analyze( + ConnectorSession session, + ConnectorTransactionHandle transaction, + Map<String, Argument> arguments, + ConnectorAccessControl accessControl) + { + try { + return deltaLakeTableChangesFunction.analyze(session, transaction, arguments, accessControl); + } + catch (NotADeltaLakeTableException _) { + checkNonNull(arguments.get(START_SNAPSHOT_VAR_NAME), START_SNAPSHOT_VAR_NAME); + checkNonNull(arguments.get(END_SNAPSHOT_VAR_NAME), END_SNAPSHOT_VAR_NAME); + try { + return icebergTableChangesFunction.analyze(session, transaction, arguments, accessControl); + } + catch (UnknownTableTypeException e) { + throw new TrinoException(INVALID_FUNCTION_ARGUMENT, "table_changes function is not supported for the given table type", e); + } + } 
+ } + + private void checkNonNull(Object argumentValue, String argumentName) + { + if (argumentValue == null) { + throw new TrinoException(INVALID_FUNCTION_ARGUMENT, FUNCTION_NAME + " argument " + argumentName + " may not be null"); + } + } +} diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/functions/tablechanges/TableChangesFunctionProvider.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/functions/tablechanges/TableChangesFunctionProvider.java new file mode 100644 index 000000000000..8dfe2d095208 --- /dev/null +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/functions/tablechanges/TableChangesFunctionProvider.java @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.lakehouse.functions.tablechanges; + +import com.google.inject.Inject; +import com.google.inject.Provider; +import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorTableFunction; +import io.trino.plugin.deltalake.DeltaLakeMetadataFactory; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; +import io.trino.spi.function.table.ConnectorTableFunction; +import io.trino.spi.type.TypeManager; + +import static java.util.Objects.requireNonNull; + +public class TableChangesFunctionProvider + implements Provider<ConnectorTableFunction> +{ + private final DeltaLakeMetadataFactory deltaLakeMetadataFactory; + + private final TrinoCatalogFactory trinoCatalogFactory; + private final TypeManager typeManager; + + @Inject + public TableChangesFunctionProvider( + DeltaLakeMetadataFactory deltaLakeMetadataFactory, + TrinoCatalogFactory trinoCatalogFactory, + TypeManager typeManager) + { + this.deltaLakeMetadataFactory = requireNonNull(deltaLakeMetadataFactory, "deltaLakeMetadataFactory is null"); + this.trinoCatalogFactory = requireNonNull(trinoCatalogFactory, "trinoCatalogFactory is null"); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + } + + @Override + public ConnectorTableFunction get() + { + io.trino.plugin.deltalake.functions.tablechanges.TableChangesFunction deltaLakeTableChangesFunction = + new io.trino.plugin.deltalake.functions.tablechanges.TableChangesFunction(deltaLakeMetadataFactory); + io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunction icebergTableChangesFunction = + new io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunction(trinoCatalogFactory, typeManager); + return new ClassLoaderSafeConnectorTableFunction(new TableChangesFunction(deltaLakeTableChangesFunction, icebergTableChangesFunction), getClass().getClassLoader()); + } +} diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java index 955bdd2da1ee..63dde98d44c7 100644 --- a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java +++ 
b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java @@ -16,6 +16,7 @@ import org.junit.jupiter.api.Test; import static io.trino.plugin.lakehouse.TableType.DELTA; +import static io.trino.testing.TestingNames.randomNameSuffix; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -59,4 +60,98 @@ public void testShowCreateTable() type = 'DELTA' )\\E"""); } + + @Test + void testTableChangesFunction() + { + String tableName = "test_table_changes_function_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (page_url VARCHAR, domain VARCHAR, views INTEGER) WITH (change_data_feed_enabled = true)"); + + assertUpdate("INSERT INTO " + tableName + " VALUES('url1', 'domain1', 1), ('url2', 'domain2', 2), ('url3', 'domain3', 3)", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES('url4', 'domain4', 4), ('url5', 'domain5', 2), ('url6', 'domain6', 6)", 3); + + assertUpdate("UPDATE " + tableName + " SET page_url = 'url22' WHERE views = 2", 2); + assertTableChangesQuery("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "'))", + """ + VALUES + ('url1', 'domain1', 1, 'insert', BIGINT '1'), + ('url2', 'domain2', 2, 'insert', BIGINT '1'), + ('url3', 'domain3', 3, 'insert', BIGINT '1'), + ('url4', 'domain4', 4, 'insert', BIGINT '2'), + ('url5', 'domain5', 2, 'insert', BIGINT '2'), + ('url6', 'domain6', 6, 'insert', BIGINT '2'), + ('url2', 'domain2', 2, 'update_preimage', BIGINT '3'), + ('url22', 'domain2', 2, 'update_postimage', BIGINT '3'), + ('url5', 'domain5', 2, 'update_preimage', BIGINT '3'), + ('url22', 'domain5', 2, 'update_postimage', BIGINT '3') + """); + + assertUpdate("DELETE FROM " + tableName + " WHERE views = 2", 2); + assertTableChangesQuery("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', null, null, 3))", + """ + VALUES + ('url22', 'domain2', 2, 'delete', BIGINT '4'), + ('url22', 'domain5', 2, 'delete', BIGINT '4') + """); + + assertTableChangesQuery("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "')) ORDER BY _commit_version, _change_type, domain", + """ + VALUES + ('url1', 'domain1', 1, 'insert', BIGINT '1'), + ('url2', 'domain2', 2, 'insert', BIGINT '1'), + ('url3', 'domain3', 3, 'insert', BIGINT '1'), + ('url4', 'domain4', 4, 'insert', BIGINT '2'), + ('url5', 'domain5', 2, 'insert', BIGINT '2'), + ('url6', 'domain6', 6, 'insert', BIGINT '2'), + ('url22', 'domain2', 2, 'update_postimage', BIGINT '3'), + ('url22', 'domain5', 2, 'update_postimage', BIGINT '3'), + ('url2', 'domain2', 2, 'update_preimage', BIGINT '3'), + ('url5', 'domain5', 2, 'update_preimage', BIGINT '3'), + ('url22', 'domain2', 2, 'delete', BIGINT '4'), + ('url22', 'domain5', 2, 'delete', BIGINT '4') + """); + } + + private void assertTableChangesQuery(String sql, String expectedResult) + { + assertThat(query(sql)) + .result() + .exceptColumns("_commit_timestamp") + .skippingTypesCheck() + .matches(expectedResult); + } + + @Test + void testTableChangesFunctionFailures() + { + String tableName = "test_table_changes_function_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (page_url VARCHAR, domain VARCHAR, views INTEGER) WITH (change_data_feed_enabled = true)"); + + assertUpdate("INSERT INTO " + tableName + " VALUES('url1', 'domain1', 1), ('url2', 'domain2', 2), ('url3', 'domain3', 3)", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES('url4', 'domain4', 4), ('url5', 
'domain5', 2), ('url6', 'domain6', 6)", 3); + + assertThat(query("SELECT * FROM TABLE(system.table_changes())")) + .failure().hasMessage("line 1:21: Missing argument: SCHEMA_NAME"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(NOSCHEMA))")) + .failure().hasMessage("line 1:42: Column 'noschema' cannot be resolved"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA))")) + .failure().hasMessage("line 1:42: Missing argument: TABLE_NAME"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "'))")) + .succeeds(); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', 'not-a-number', null, null))")) + .failure().hasMessage("line 1:100: Cannot cast type varchar(12) to bigint"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', null, 'not-a-number', null))")) + .failure().hasMessage("line 1:106: Cannot cast type varchar(12) to bigint"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', null, null, 'not-a-number'))")) + .failure().hasMessage("line 1:112: Cannot cast type varchar(12) to bigint"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', null, null, 1))")) + .succeeds(); + } } diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java index 1745312c5c41..f1249ee2f8f6 100644 --- a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java +++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java @@ -18,6 +18,7 @@ import org.junit.jupiter.api.Test; import static io.trino.plugin.lakehouse.TableType.HIVE; +import static io.trino.testing.TestingNames.randomNameSuffix; import static org.assertj.core.api.Assertions.assertThat; public class TestLakehouseHiveConnectorSmokeTest @@ -66,4 +67,41 @@ comment varchar(152) type = 'HIVE' )"""); } + + @Test + void testTableChangesFunctionFailures() + { + String tableName = "test_table_changes_function_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (page_url VARCHAR, domain VARCHAR, views INTEGER) WITH (change_data_feed_enabled = true)"); + + assertUpdate("INSERT INTO " + tableName + " VALUES('url1', 'domain1', 1), ('url2', 'domain2', 2), ('url3', 'domain3', 3)", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES('url4', 'domain4', 4), ('url5', 'domain5', 2), ('url6', 'domain6', 6)", 3); + + assertThat(query("SELECT * FROM TABLE(system.table_changes())")) + .failure().hasMessageMatching("line 1:21: Missing argument: SCHEMA_NAME"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(NOSCHEMA))")) + .failure().hasMessageMatching("line 1:42: Column 'noschema' cannot be resolved"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA))")) + .failure().hasMessageMatching("line 1:42: Missing argument: TABLE_NAME"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "'))")) + .failure().hasMessageMatching("table_changes argument START_SNAPSHOT_ID may not be null"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', 'not-a-number', null, null))")) + .failure().hasMessage("line 1:100: Cannot 
cast type varchar(12) to bigint"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', null, 'not-a-number', null))")) + .failure().hasMessage("line 1:106: Cannot cast type varchar(12) to bigint"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', null, null, 'not-a-number'))")) + .failure().hasMessage("line 1:112: Cannot cast type varchar(12) to bigint"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', 100))")) + .failure().hasMessageMatching("table_changes argument END_SNAPSHOT_ID may not be null"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', 100, 200))")) + .failure().hasMessageMatching("table_changes function is not supported for the given table type"); + } } diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java index 42bfaa102b22..d815e60e1924 100644 --- a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java +++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java @@ -13,9 +13,17 @@ */ package io.trino.plugin.lakehouse; +import com.google.common.collect.Iterables; +import io.trino.testing.sql.TestTable; import org.junit.jupiter.api.Test; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; + import static io.trino.plugin.lakehouse.TableType.ICEBERG; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static java.lang.String.format; +import static java.time.ZoneOffset.UTC; import static org.assertj.core.api.Assertions.assertThat; public class TestLakehouseIcebergConnectorSmokeTest @@ -44,4 +52,95 @@ public void testShowCreateTable() type = 'ICEBERG' )\\E"""); } + + @Test + void testTableChangesFunction() + { + DateTimeFormatter instantMillisFormatter = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSVV").withZone(UTC); + + try (TestTable table = newTrinoTable( + "test_table_changes_function_", + "AS SELECT nationkey, name FROM tpch.tiny.nation WITH NO DATA")) { + long initialSnapshot = getMostRecentSnapshotId(table.getName()); + assertUpdate("INSERT INTO " + table.getName() + " SELECT nationkey, name FROM nation", 25); + long snapshotAfterInsert = getMostRecentSnapshotId(table.getName()); + String snapshotAfterInsertTime = getSnapshotTime(table.getName(), snapshotAfterInsert).format(instantMillisFormatter); + + assertQuery( + "SELECT nationkey, name, _change_type, _change_version_id, to_iso8601(_change_timestamp), _change_ordinal " + + "FROM TABLE(system.table_changes(CURRENT_SCHEMA, '%s', %s, %s))".formatted(table.getName(), initialSnapshot, snapshotAfterInsert), + "SELECT nationkey, name, 'insert', %s, '%s', 0 FROM nation".formatted(snapshotAfterInsert, snapshotAfterInsertTime)); + + // Run with named arguments + assertQuery( + "SELECT nationkey, name, _change_type, _change_version_id, to_iso8601(_change_timestamp), _change_ordinal " + + "FROM TABLE(system.table_changes(schema_name => CURRENT_SCHEMA, table_name => '%s', start_snapshot_id => %s, end_snapshot_id => %s))" + .formatted(table.getName(), initialSnapshot, snapshotAfterInsert), + "SELECT nationkey, name, 'insert', %s, '%s', 0 FROM nation".formatted(snapshotAfterInsert, snapshotAfterInsertTime)); + + 
assertUpdate("DELETE FROM " + table.getName(), 25); + long snapshotAfterDelete = getMostRecentSnapshotId(table.getName()); + String snapshotAfterDeleteTime = getSnapshotTime(table.getName(), snapshotAfterDelete).format(instantMillisFormatter); + + assertQuery( + "SELECT nationkey, name, _change_type, _change_version_id, to_iso8601(_change_timestamp), _change_ordinal " + + "FROM TABLE(system.table_changes(CURRENT_SCHEMA, '%s', %s, %s))".formatted(table.getName(), snapshotAfterInsert, snapshotAfterDelete), + "SELECT nationkey, name, 'delete', %s, '%s', 0 FROM nation".formatted(snapshotAfterDelete, snapshotAfterDeleteTime)); + + assertQuery( + "SELECT nationkey, name, _change_type, _change_version_id, to_iso8601(_change_timestamp), _change_ordinal " + + "FROM TABLE(system.table_changes(CURRENT_SCHEMA, '%s', %s, %s))".formatted(table.getName(), initialSnapshot, snapshotAfterDelete), + "SELECT nationkey, name, 'insert', %s, '%s', 0 FROM nation UNION SELECT nationkey, name, 'delete', %s, '%s', 1 FROM nation".formatted( + snapshotAfterInsert, snapshotAfterInsertTime, snapshotAfterDelete, snapshotAfterDeleteTime)); + } + } + + private long getMostRecentSnapshotId(String tableName) + { + return (long) Iterables.getOnlyElement(getQueryRunner().execute(format("SELECT snapshot_id FROM \"%s$snapshots\" ORDER BY committed_at DESC LIMIT 1", tableName)) + .getOnlyColumnAsSet()); + } + + private ZonedDateTime getSnapshotTime(String tableName, long snapshotId) + { + return (ZonedDateTime) Iterables.getOnlyElement(getQueryRunner().execute(format("SELECT committed_at FROM \"%s$snapshots\" WHERE snapshot_id = %s", tableName, snapshotId)) + .getOnlyColumnAsSet()); + } + + @Test + void testTableChangesFunctionFailures() + { + String tableName = "test_table_changes_function_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (page_url VARCHAR, domain VARCHAR, views INTEGER) WITH (change_data_feed_enabled = true)"); + + assertUpdate("INSERT INTO " + tableName + " VALUES('url1', 'domain1', 1), ('url2', 'domain2', 2), ('url3', 'domain3', 3)", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES('url4', 'domain4', 4), ('url5', 'domain5', 2), ('url6', 'domain6', 6)", 3); + + assertThat(query("SELECT * FROM TABLE(system.table_changes())")) + .failure().hasMessageMatching("line 1:21: Missing argument: SCHEMA_NAME"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(NOSCHEMA))")) + .failure().hasMessageMatching("line 1:42: Column 'noschema' cannot be resolved"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA))")) + .failure().hasMessageMatching("line 1:42: Missing argument: TABLE_NAME"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "'))")) + .failure().hasMessageMatching("table_changes arguments may not be null"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', 'not-a-number', null, null))")) + .failure().hasMessage("line 1:100: Cannot cast type varchar(12) to bigint"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', null, 'not-a-number', null))")) + .failure().hasMessage("line 1:106: Cannot cast type varchar(12) to bigint"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', null, null, 'not-a-number'))")) + .failure().hasMessage("line 1:112: Cannot cast type varchar(12) to bigint"); + + assertThat(query("SELECT * FROM 
TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', 100))")) + .failure().hasMessageMatching("table_changes argument END_SNAPSHOT_ID may not be null"); + + assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', 100, 200))")) + .failure().hasMessageMatching("Snapshot not found in Iceberg table history: 100"); + } }
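
A minimal usage sketch derived from the smoke tests above; the table names and snapshot ids are hypothetical placeholders. The unified system.table_changes function tries the Delta Lake analysis first and falls back to Iceberg, so a Delta table is typically queried with since_version while an Iceberg table requires both start_snapshot_id and end_snapshot_id:

    -- Delta Lake table created WITH (change_data_feed_enabled = true):
    -- all changes recorded after table version 3
    SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, 'orders_delta', null, null, 3));

    -- Iceberg table, using named arguments with two known snapshot ids
    SELECT *
    FROM TABLE(system.table_changes(
        schema_name => CURRENT_SCHEMA,
        table_name => 'orders_iceberg',
        start_snapshot_id => 7737212636236732342,
        end_snapshot_id => 8873421994351602568));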