From 3c4a62fb6a396db031e60d3bc40c6f0055fb15fb Mon Sep 17 00:00:00 2001
From: codluca
Date: Wed, 8 Oct 2025 20:39:57 +0300
Subject: [PATCH] Add support for table_changes function for Lakehouse

The table_changes function is supported by the Delta Lake and Iceberg
modules. The Lakehouse TableChangesFunction does the initial analysis:
it delegates to the Delta Lake TableChangesFunction and, if the table is
not a Delta Lake table, falls back to the Iceberg TableChangesFunction.
The LakehouseFunctionProvider returns the
TableFunctionProcessorProviderFactory from Delta Lake or Iceberg,
depending on the functionHandle previously generated by the
TableChangesFunction.
---
 .../plugin/lakehouse/LakehouseConnector.java  | 24 ++++-
 .../lakehouse/LakehouseDeltaModule.java       |  5 +
 .../lakehouse/LakehouseFunctionProvider.java  | 53 ++++++++++
 .../lakehouse/LakehouseIcebergModule.java     |  3 +
 .../plugin/lakehouse/LakehouseModule.java     |  9 ++
 .../tablechanges/TableChangesFunction.java    | 96 ++++++++++++++++++
 .../TableChangesFunctionProvider.java         | 54 ++++++++++
 .../TestLakehouseDeltaConnectorSmokeTest.java | 95 ++++++++++++++++++
 .../TestLakehouseHiveConnectorSmokeTest.java  | 38 +++++++
 ...estLakehouseIcebergConnectorSmokeTest.java | 99 +++++++++++++++++++
 10 files changed, 475 insertions(+), 1 deletion(-)
 create mode 100644 plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseFunctionProvider.java
 create mode 100644 plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/functions/tablechanges/TableChangesFunction.java
 create mode 100644 plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/functions/tablechanges/TableChangesFunctionProvider.java
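
For reviewers, a usage sketch of the new function (CURRENT_SCHEMA is kept as-is;
the table names 'orders_delta' and 'orders_iceberg' and the snapshot ids are
illustrative, with argument shapes taken from the smoke tests in this patch):

    -- Delta Lake table: snapshot arguments may stay null, SINCE_VERSION is optional
    SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, 'orders_delta'));
    SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, 'orders_delta', null, null, 3));

    -- Iceberg table: START_SNAPSHOT_ID and END_SNAPSHOT_ID must both be non-null
    SELECT *
    FROM TABLE(system.table_changes(
        schema_name => CURRENT_SCHEMA,
        table_name => 'orders_iceberg',
        start_snapshot_id => 100,
        end_snapshot_id => 200));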
diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java
index 8ee552c98930..86966ee6f438 100644
--- a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java
+++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java
@@ -13,6 +13,7 @@
  */
 package io.trino.plugin.lakehouse;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
 import io.airlift.bootstrap.LifeCycleManager;
 import io.trino.plugin.hive.HiveSchemaProperties;
@@ -26,10 +27,13 @@
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorSplitManager;
 import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.function.FunctionProvider;
+import io.trino.spi.function.table.ConnectorTableFunction;
 import io.trino.spi.session.PropertyMetadata;
 import io.trino.spi.transaction.IsolationLevel;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 import static com.google.common.collect.Sets.immutableEnumSet;
@@ -51,6 +55,8 @@ public class LakehouseConnector
     private final LakehouseSessionProperties sessionProperties;
     private final LakehouseTableProperties tableProperties;
     private final IcebergMaterializedViewProperties materializedViewProperties;
+    private final Set<ConnectorTableFunction> tableFunctions;
+    private final FunctionProvider functionProvider;
 
     @Inject
     public LakehouseConnector(
@@ -62,7 +68,9 @@ public LakehouseConnector(
             LakehouseNodePartitioningProvider nodePartitioningProvider,
             LakehouseSessionProperties sessionProperties,
             LakehouseTableProperties tableProperties,
-            IcebergMaterializedViewProperties materializedViewProperties)
+            IcebergMaterializedViewProperties materializedViewProperties,
+            Set<ConnectorTableFunction> tableFunctions,
+            FunctionProvider functionProvider)
     {
         this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
         this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
@@ -73,6 +81,8 @@ public LakehouseConnector(
         this.sessionProperties = requireNonNull(sessionProperties, "sessionProperties is null");
         this.tableProperties = requireNonNull(tableProperties, "tableProperties is null");
         this.materializedViewProperties = requireNonNull(materializedViewProperties, "materializedViewProperties is null");
+        this.tableFunctions = ImmutableSet.copyOf(requireNonNull(tableFunctions, "tableFunctions is null"));
+        this.functionProvider = requireNonNull(functionProvider, "functionProvider is null");
     }
 
     @Override
@@ -159,4 +169,16 @@ public Set<ConnectorCapabilities> getCapabilities()
     {
         return immutableEnumSet(NOT_NULL_COLUMN_CONSTRAINT, MATERIALIZED_VIEW_GRACE_PERIOD);
     }
+
+    @Override
+    public Set<ConnectorTableFunction> getTableFunctions()
+    {
+        return tableFunctions;
+    }
+
+    @Override
+    public Optional<FunctionProvider> getFunctionProvider()
+    {
+        return Optional.of(functionProvider);
+    }
 }
diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseDeltaModule.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseDeltaModule.java
index 7f68de3b361b..26f61be45824 100644
--- a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseDeltaModule.java
+++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseDeltaModule.java
@@ -21,6 +21,7 @@
 import io.trino.plugin.deltalake.DeltaLakeConfig;
 import io.trino.plugin.deltalake.DeltaLakeExecutorModule;
 import io.trino.plugin.deltalake.DeltaLakeFileSystemFactory;
+import io.trino.plugin.deltalake.DeltaLakeFunctionProvider;
 import io.trino.plugin.deltalake.DeltaLakeMergeResult;
 import io.trino.plugin.deltalake.DeltaLakeMetadataFactory;
 import io.trino.plugin.deltalake.DeltaLakeNodePartitioningProvider;
@@ -32,6 +33,7 @@
 import io.trino.plugin.deltalake.DeltaLakeTableProperties;
 import io.trino.plugin.deltalake.DeltaLakeTransactionManager;
 import io.trino.plugin.deltalake.DeltaLakeWriterStats;
+import io.trino.plugin.deltalake.functions.tablechanges.TableChangesProcessorProvider;
 import io.trino.plugin.deltalake.metastore.DeltaLakeMetastoreModule;
 import io.trino.plugin.deltalake.metastore.DeltaLakeTableMetadataScheduler;
 import io.trino.plugin.deltalake.metastore.NoOpVendedCredentialsProvider;
@@ -103,6 +105,9 @@ protected void setup(Binder binder)
         binder.bind(DeltaLakeTableMetadataScheduler.class).in(Scopes.SINGLETON);
         newExporter(binder).export(DeltaLakeTableMetadataScheduler.class).withGeneratedName();
 
+        binder.bind(TableChangesProcessorProvider.class).in(Scopes.SINGLETON);
+        binder.bind(DeltaLakeFunctionProvider.class).in(Scopes.SINGLETON);
+
         jsonCodecBinder(binder).bindJsonCodec(DataFileInfo.class);
         jsonCodecBinder(binder).bindJsonCodec(DeltaLakeMergeResult.class);
         jsonCodecBinder(binder).bindJsonCodec(ExtendedStatistics.class);
diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseFunctionProvider.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseFunctionProvider.java
new file mode 100644
index 000000000000..bdfc48d0cf96
--- /dev/null
+++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseFunctionProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse;
+
+import com.google.inject.Inject;
+import io.trino.plugin.deltalake.DeltaLakeFunctionProvider;
+import io.trino.plugin.deltalake.functions.tablechanges.TableChangesTableFunctionHandle;
+import io.trino.plugin.iceberg.functions.IcebergFunctionProvider;
+import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionHandle;
+import io.trino.spi.function.FunctionProvider;
+import io.trino.spi.function.table.ConnectorTableFunctionHandle;
+import io.trino.spi.function.table.TableFunctionProcessorProviderFactory;
+
+import static java.util.Objects.requireNonNull;
+
+public class LakehouseFunctionProvider
+        implements FunctionProvider
+{
+    private final DeltaLakeFunctionProvider deltaLakeFunctionProvider;
+    private final IcebergFunctionProvider icebergFunctionProvider;
+
+    @Inject
+    public LakehouseFunctionProvider(
+            DeltaLakeFunctionProvider deltaLakeFunctionProvider,
+            IcebergFunctionProvider icebergFunctionProvider)
+    {
+        this.deltaLakeFunctionProvider = requireNonNull(deltaLakeFunctionProvider, "deltaLakeFunctionProvider is null");
+        this.icebergFunctionProvider = requireNonNull(icebergFunctionProvider, "icebergFunctionProvider is null");
+    }
+
+    @Override
+    public TableFunctionProcessorProviderFactory getTableFunctionProcessorProviderFactory(ConnectorTableFunctionHandle functionHandle)
+    {
+        if (functionHandle instanceof TableChangesTableFunctionHandle) {
+            return deltaLakeFunctionProvider.getTableFunctionProcessorProviderFactory(functionHandle);
+        }
+        if (functionHandle instanceof TableChangesFunctionHandle) {
+            return icebergFunctionProvider.getTableFunctionProcessorProviderFactory(functionHandle);
+        }
+        throw new UnsupportedOperationException("Unsupported function: " + functionHandle);
+    }
+}
diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java
index 96cde88f842f..ab9aea1a5b1f 100644
--- a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java
+++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java
@@ -41,6 +41,7 @@
 import io.trino.plugin.iceberg.catalog.glue.IcebergGlueCatalogModule;
 import io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule;
 import io.trino.plugin.iceberg.fileio.ForwardingFileIoFactory;
+import io.trino.plugin.iceberg.functions.IcebergFunctionProvider;
 
 import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
 import static io.airlift.configuration.ConfigBinder.configBinder;
@@ -75,6 +76,8 @@ protected void setup(Binder binder)
 
         binder.bind(ForwardingFileIoFactory.class).in(Scopes.SINGLETON);
 
+        binder.bind(IcebergFunctionProvider.class).in(Scopes.SINGLETON);
+
         install(switch (buildConfigObject(MetastoreTypeConfig.class).getMetastoreType()) {
             case THRIFT -> new IcebergHiveMetastoreCatalogModule();
             case FILE -> new IcebergFileMetastoreCatalogModule();
diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java
index 6050c8fa54f0..e0f2b164aeec 100644
--- a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java
+++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java
@@ -24,7 +24,12 @@
 import io.trino.plugin.hive.orc.OrcWriterConfig;
 import io.trino.plugin.hive.parquet.ParquetReaderConfig;
 import io.trino.plugin.hive.parquet.ParquetWriterConfig;
+import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProcessorProviderFactory;
+import io.trino.plugin.lakehouse.functions.tablechanges.TableChangesFunctionProvider;
+import io.trino.spi.function.FunctionProvider;
+import io.trino.spi.function.table.ConnectorTableFunction;
 
+import static com.google.inject.multibindings.Multibinder.newSetBinder;
 import static io.airlift.configuration.ConfigBinder.configBinder;
 import static org.weakref.jmx.guice.ExportBinder.newExporter;
 
@@ -53,6 +58,10 @@ protected void setup(Binder binder)
         binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
         newExporter(binder).export(FileFormatDataSourceStats.class).withGeneratedName();
 
+        newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON);
+        binder.bind(FunctionProvider.class).to(LakehouseFunctionProvider.class).in(Scopes.SINGLETON);
+        binder.bind(TableChangesFunctionProcessorProviderFactory.class).in(Scopes.SINGLETON);
+
         binder.bind(Key.get(boolean.class, HideDeltaLakeTables.class)).toInstance(false);
     }
 }
diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/functions/tablechanges/TableChangesFunction.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/functions/tablechanges/TableChangesFunction.java
new file mode 100644
index 000000000000..97a80e1b23a3
--- /dev/null
+++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/functions/tablechanges/TableChangesFunction.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse.functions.tablechanges;
+
+import com.google.common.collect.ImmutableList;
+import io.trino.plugin.deltalake.metastore.NotADeltaLakeTableException;
+import io.trino.plugin.iceberg.UnknownTableTypeException;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ConnectorAccessControl;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.function.table.AbstractConnectorTableFunction;
+import io.trino.spi.function.table.Argument;
+import io.trino.spi.function.table.ScalarArgumentSpecification;
+import io.trino.spi.function.table.TableFunctionAnalysis;
+
+import java.util.Map;
+
+import static io.trino.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
+import static io.trino.spi.function.table.ReturnTypeSpecification.GenericTable.GENERIC_TABLE;
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+
+public class TableChangesFunction
+        extends AbstractConnectorTableFunction
+{
+    private static final String FUNCTION_NAME = "table_changes";
+    private static final String SCHEMA_NAME = "system";
+    private static final String NAME = "table_changes";
+    public static final String SCHEMA_NAME_ARGUMENT = "SCHEMA_NAME";
+    private static final String TABLE_NAME_ARGUMENT = "TABLE_NAME";
+    private static final String START_SNAPSHOT_VAR_NAME = "START_SNAPSHOT_ID";
+    private static final String END_SNAPSHOT_VAR_NAME = "END_SNAPSHOT_ID";
+    private static final String SINCE_VERSION_ARGUMENT = "SINCE_VERSION";
+
+    private final io.trino.plugin.deltalake.functions.tablechanges.TableChangesFunction deltaLakeTableChangesFunction;
+    private final io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunction icebergTableChangesFunction;
+
+    public TableChangesFunction(
+            io.trino.plugin.deltalake.functions.tablechanges.TableChangesFunction deltaLakeTableChangesFunction,
+            io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunction icebergTableChangesFunction)
+    {
+        super(
+                SCHEMA_NAME,
+                NAME,
+                ImmutableList.of(
+                        ScalarArgumentSpecification.builder().name(SCHEMA_NAME_ARGUMENT).type(VARCHAR).build(),
+                        ScalarArgumentSpecification.builder().name(TABLE_NAME_ARGUMENT).type(VARCHAR).build(),
+                        ScalarArgumentSpecification.builder().name(START_SNAPSHOT_VAR_NAME).type(BIGINT).defaultValue(null).build(),
+                        ScalarArgumentSpecification.builder().name(END_SNAPSHOT_VAR_NAME).type(BIGINT).defaultValue(null).build(),
+                        ScalarArgumentSpecification.builder().name(SINCE_VERSION_ARGUMENT).type(BIGINT).defaultValue(null).build()),
+                GENERIC_TABLE);
+        this.deltaLakeTableChangesFunction = deltaLakeTableChangesFunction;
+        this.icebergTableChangesFunction = icebergTableChangesFunction;
+    }
+
+    @Override
+    public TableFunctionAnalysis analyze(
+            ConnectorSession session,
+            ConnectorTransactionHandle transaction,
+            Map<String, Argument> arguments,
+            ConnectorAccessControl accessControl)
+    {
+        try {
+            return deltaLakeTableChangesFunction.analyze(session, transaction, arguments, accessControl);
+        }
+        catch (NotADeltaLakeTableException _) {
+            // arguments always contains an entry for every declared argument, so check the scalar value, not the wrapper
+            checkNonNull(((io.trino.spi.function.table.ScalarArgument) arguments.get(START_SNAPSHOT_VAR_NAME)).getValue(), START_SNAPSHOT_VAR_NAME);
+            checkNonNull(((io.trino.spi.function.table.ScalarArgument) arguments.get(END_SNAPSHOT_VAR_NAME)).getValue(), END_SNAPSHOT_VAR_NAME);
+            try {
+                return icebergTableChangesFunction.analyze(session, transaction, arguments, accessControl);
+            }
+            catch (UnknownTableTypeException e) {
+                throw new TrinoException(INVALID_FUNCTION_ARGUMENT, "table_changes function is not supported for the given table type");
+            }
+        }
+    }
+
+    private void checkNonNull(Object argumentValue, String argumentName)
+    {
+        if (argumentValue == null) {
+            throw new TrinoException(INVALID_FUNCTION_ARGUMENT, FUNCTION_NAME + " argument " + argumentName + " may not be null");
+        }
+    }
+}
diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/functions/tablechanges/TableChangesFunctionProvider.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/functions/tablechanges/TableChangesFunctionProvider.java
new file mode 100644
index 000000000000..8dfe2d095208
--- /dev/null
+++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/functions/tablechanges/TableChangesFunctionProvider.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse.functions.tablechanges;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorTableFunction;
+import io.trino.plugin.deltalake.DeltaLakeMetadataFactory;
+import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
+import io.trino.spi.function.table.ConnectorTableFunction;
+import io.trino.spi.type.TypeManager;
+
+import static java.util.Objects.requireNonNull;
+
+public class TableChangesFunctionProvider
+        implements Provider<ConnectorTableFunction>
+{
+    private final DeltaLakeMetadataFactory deltaLakeMetadataFactory;
+
+    private final TrinoCatalogFactory trinoCatalogFactory;
+    private final TypeManager typeManager;
+
+    @Inject
+    public TableChangesFunctionProvider(
+            DeltaLakeMetadataFactory deltaLakeMetadataFactory,
+            TrinoCatalogFactory trinoCatalogFactory,
+            TypeManager typeManager)
+    {
+        this.deltaLakeMetadataFactory = requireNonNull(deltaLakeMetadataFactory, "deltaLakeMetadataFactory is null");
+        this.trinoCatalogFactory = requireNonNull(trinoCatalogFactory, "trinoCatalogFactory is null");
+        this.typeManager = requireNonNull(typeManager, "typeManager is null");
+    }
+
+    @Override
+    public ConnectorTableFunction get()
+    {
+        io.trino.plugin.deltalake.functions.tablechanges.TableChangesFunction deltaLakeTableChangesFunction =
+                new io.trino.plugin.deltalake.functions.tablechanges.TableChangesFunction(deltaLakeMetadataFactory);
+        io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunction icebergTableChangesFunction =
+                new io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunction(trinoCatalogFactory, typeManager);
+        return new ClassLoaderSafeConnectorTableFunction(new TableChangesFunction(deltaLakeTableChangesFunction, icebergTableChangesFunction), getClass().getClassLoader());
+    }
+}
diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java
index 955bdd2da1ee..63dde98d44c7 100644
--- a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java
+++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java
@@ -16,6 +16,7 @@
 import org.junit.jupiter.api.Test;
 
 import static io.trino.plugin.lakehouse.TableType.DELTA;
+import static io.trino.testing.TestingNames.randomNameSuffix;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -59,4 +60,98 @@ public void testShowCreateTable()
                 type = 'DELTA'
                 )\\E""");
     }
+
+    @Test
+    void testTableChangesFunction()
+    {
+        String tableName = "test_table_changes_function_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " (page_url VARCHAR, domain VARCHAR, views INTEGER) WITH (change_data_feed_enabled = true)");
+
+        assertUpdate("INSERT INTO " + tableName + " VALUES('url1', 'domain1', 1), ('url2', 'domain2', 2), ('url3', 'domain3', 3)", 3);
+        assertUpdate("INSERT INTO " + tableName + " VALUES('url4', 'domain4', 4), ('url5', 'domain5', 2), ('url6', 'domain6', 6)", 3);
+
+        assertUpdate("UPDATE " + tableName + " SET page_url = 'url22' WHERE views = 2", 2);
+        assertTableChangesQuery("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "'))",
+                """
+                VALUES
+                    ('url1', 'domain1', 1, 'insert', BIGINT '1'),
+                    ('url2', 'domain2', 2, 'insert', BIGINT '1'),
+                    ('url3', 'domain3', 3, 'insert', BIGINT '1'),
+                    ('url4', 'domain4', 4, 'insert', BIGINT '2'),
+                    ('url5', 'domain5', 2, 'insert', BIGINT '2'),
+                    ('url6', 'domain6', 6, 'insert', BIGINT '2'),
+                    ('url2', 'domain2', 2, 'update_preimage', BIGINT '3'),
+                    ('url22', 'domain2', 2, 'update_postimage', BIGINT '3'),
+                    ('url5', 'domain5', 2, 'update_preimage', BIGINT '3'),
+                    ('url22', 'domain5', 2, 'update_postimage', BIGINT '3')
+                """);
+
+        assertUpdate("DELETE FROM " + tableName + " WHERE views = 2", 2);
+        assertTableChangesQuery("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', null, null, 3))",
+                """
+                VALUES
+                    ('url22', 'domain2', 2, 'delete', BIGINT '4'),
+                    ('url22', 'domain5', 2, 'delete', BIGINT '4')
+                """);
+
+        assertTableChangesQuery("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "')) ORDER BY _commit_version, _change_type, domain",
+                """
+                VALUES
+                    ('url1', 'domain1', 1, 'insert', BIGINT '1'),
+                    ('url2', 'domain2', 2, 'insert', BIGINT '1'),
+                    ('url3', 'domain3', 3, 'insert', BIGINT '1'),
+                    ('url4', 'domain4', 4, 'insert', BIGINT '2'),
+                    ('url5', 'domain5', 2, 'insert', BIGINT '2'),
+                    ('url6', 'domain6', 6, 'insert', BIGINT '2'),
+                    ('url22', 'domain2', 2, 'update_postimage', BIGINT '3'),
+                    ('url22', 'domain5', 2, 'update_postimage', BIGINT '3'),
+                    ('url2', 'domain2', 2, 'update_preimage', BIGINT '3'),
+                    ('url5', 'domain5', 2, 'update_preimage', BIGINT '3'),
+                    ('url22', 'domain2', 2, 'delete', BIGINT '4'),
+                    ('url22', 'domain5', 2, 'delete', BIGINT '4')
+                """);
+    }
+
+    private void assertTableChangesQuery(String sql, String expectedResult)
+    {
+        assertThat(query(sql))
+                .result()
+                .exceptColumns("_commit_timestamp")
+                .skippingTypesCheck()
+                .matches(expectedResult);
+    }
+
+    @Test
+    void testTableChangesFunctionFailures()
+    {
+        String tableName = "test_table_changes_function_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " (page_url VARCHAR, domain VARCHAR, views INTEGER) WITH (change_data_feed_enabled = true)");
+
+        assertUpdate("INSERT INTO " + tableName + " VALUES('url1', 'domain1', 1), ('url2', 'domain2', 2), ('url3', 'domain3', 3)", 3);
+        assertUpdate("INSERT INTO " + tableName + " VALUES('url4', 'domain4', 4), ('url5', 'domain5', 2), ('url6', 'domain6', 6)", 3);
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes())"))
+                .failure().hasMessage("line 1:21: Missing argument: SCHEMA_NAME");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(NOSCHEMA))"))
+                .failure().hasMessage("line 1:42: Column 'noschema' cannot be resolved");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA))"))
+                .failure().hasMessage("line 1:42: Missing argument: TABLE_NAME");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "'))"))
+                .succeeds();
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', 'not-a-number', null, null))"))
+                .failure().hasMessage("line 1:100: Cannot cast type varchar(12) to bigint");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', null, 'not-a-number', null))"))
+                .failure().hasMessage("line 1:106: Cannot cast type varchar(12) to bigint");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', null, null, 'not-a-number'))"))
+                .failure().hasMessage("line 1:112: Cannot cast type varchar(12) to bigint");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', null, null, 1))"))
+                .succeeds();
+    }
 }
diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java
index 1745312c5c41..f1249ee2f8f6 100644
--- a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java
+++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java
@@ -18,6 +18,7 @@
 import org.junit.jupiter.api.Test;
 
 import static io.trino.plugin.lakehouse.TableType.HIVE;
+import static io.trino.testing.TestingNames.randomNameSuffix;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class TestLakehouseHiveConnectorSmokeTest
@@ -66,4 +67,41 @@ comment varchar(152)
                 type = 'HIVE'
                 )""");
     }
+
+    @Test
+    void testTableChangesFunctionFailures()
+    {
+        String tableName = "test_table_changes_function_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " (page_url VARCHAR, domain VARCHAR, views INTEGER) WITH (change_data_feed_enabled = true)");
+
+        assertUpdate("INSERT INTO " + tableName + " VALUES('url1', 'domain1', 1), ('url2', 'domain2', 2), ('url3', 'domain3', 3)", 3);
+        assertUpdate("INSERT INTO " + tableName + " VALUES('url4', 'domain4', 4), ('url5', 'domain5', 2), ('url6', 'domain6', 6)", 3);
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes())"))
+                .failure().hasMessageMatching("line 1:21: Missing argument: SCHEMA_NAME");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(NOSCHEMA))"))
+                .failure().hasMessageMatching("line 1:42: Column 'noschema' cannot be resolved");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA))"))
+                .failure().hasMessageMatching("line 1:42: Missing argument: TABLE_NAME");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "'))"))
+                .failure().hasMessageMatching("table_changes argument .* may not be null");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', 'not-a-number', null, null))"))
+                .failure().hasMessage("line 1:100: Cannot cast type varchar(12) to bigint");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', null, 'not-a-number', null))"))
+                .failure().hasMessage("line 1:106: Cannot cast type varchar(12) to bigint");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', null, null, 'not-a-number'))"))
+                .failure().hasMessage("line 1:112: Cannot cast type varchar(12) to bigint");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', 100))"))
+                .failure().hasMessageMatching("table_changes argument .* may not be null");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', 100, 200))"))
+                .failure().hasMessageMatching("table_changes function is not supported for the given table type");
+    }
 }
diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java
index 42bfaa102b22..d815e60e1924 100644
--- a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java
+++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java
@@ -13,9 +13,17 @@
  */
 package io.trino.plugin.lakehouse;
 
+import com.google.common.collect.Iterables;
+import io.trino.testing.sql.TestTable;
 import org.junit.jupiter.api.Test;
 
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+
 import static io.trino.plugin.lakehouse.TableType.ICEBERG;
+import static io.trino.testing.TestingNames.randomNameSuffix;
+import static java.lang.String.format;
+import static java.time.ZoneOffset.UTC;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class TestLakehouseIcebergConnectorSmokeTest
@@ -44,4 +52,95 @@ public void testShowCreateTable()
                 type = 'ICEBERG'
                 )\\E""");
     }
+
+    @Test
+    void testTableChangesFunction()
+    {
+        DateTimeFormatter instantMillisFormatter = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSVV").withZone(UTC);
+
+        try (TestTable table = newTrinoTable(
+                "test_table_changes_function_",
+                "AS SELECT nationkey, name FROM tpch.tiny.nation WITH NO DATA")) {
+            long initialSnapshot = getMostRecentSnapshotId(table.getName());
+            assertUpdate("INSERT INTO " + table.getName() + " SELECT nationkey, name FROM nation", 25);
+            long snapshotAfterInsert = getMostRecentSnapshotId(table.getName());
+            String snapshotAfterInsertTime = getSnapshotTime(table.getName(), snapshotAfterInsert).format(instantMillisFormatter);
+
+            assertQuery(
+                    "SELECT nationkey, name, _change_type, _change_version_id, to_iso8601(_change_timestamp), _change_ordinal " +
+                            "FROM TABLE(system.table_changes(CURRENT_SCHEMA, '%s', %s, %s))".formatted(table.getName(), initialSnapshot, snapshotAfterInsert),
+                    "SELECT nationkey, name, 'insert', %s, '%s', 0 FROM nation".formatted(snapshotAfterInsert, snapshotAfterInsertTime));
+
+            // Run with named arguments
+            assertQuery(
+                    "SELECT nationkey, name, _change_type, _change_version_id, to_iso8601(_change_timestamp), _change_ordinal " +
+                            "FROM TABLE(system.table_changes(schema_name => CURRENT_SCHEMA, table_name => '%s', start_snapshot_id => %s, end_snapshot_id => %s))"
+                            .formatted(table.getName(), initialSnapshot, snapshotAfterInsert),
+                    "SELECT nationkey, name, 'insert', %s, '%s', 0 FROM nation".formatted(snapshotAfterInsert, snapshotAfterInsertTime));
+
+            assertUpdate("DELETE FROM " + table.getName(), 25);
+            long snapshotAfterDelete = getMostRecentSnapshotId(table.getName());
+            String snapshotAfterDeleteTime = getSnapshotTime(table.getName(), snapshotAfterDelete).format(instantMillisFormatter);
+
+            assertQuery(
+                    "SELECT nationkey, name, _change_type, _change_version_id, to_iso8601(_change_timestamp), _change_ordinal " +
+                            "FROM TABLE(system.table_changes(CURRENT_SCHEMA, '%s', %s, %s))".formatted(table.getName(), snapshotAfterInsert, snapshotAfterDelete),
+                    "SELECT nationkey, name, 'delete', %s, '%s', 0 FROM nation".formatted(snapshotAfterDelete, snapshotAfterDeleteTime));
+
+            assertQuery(
+                    "SELECT nationkey, name, _change_type, _change_version_id, to_iso8601(_change_timestamp), _change_ordinal " +
+                            "FROM TABLE(system.table_changes(CURRENT_SCHEMA, '%s', %s, %s))".formatted(table.getName(), initialSnapshot, snapshotAfterDelete),
+                    "SELECT nationkey, name, 'insert', %s, '%s', 0 FROM nation UNION SELECT nationkey, name, 'delete', %s, '%s', 1 FROM nation".formatted(
+                            snapshotAfterInsert, snapshotAfterInsertTime, snapshotAfterDelete, snapshotAfterDeleteTime));
+        }
+    }
+
+    private long getMostRecentSnapshotId(String tableName)
+    {
+        return (long) Iterables.getOnlyElement(getQueryRunner().execute(format("SELECT snapshot_id FROM \"%s$snapshots\" ORDER BY committed_at DESC LIMIT 1", tableName))
+                .getOnlyColumnAsSet());
+    }
+
+    private ZonedDateTime getSnapshotTime(String tableName, long snapshotId)
+    {
+        return (ZonedDateTime) Iterables.getOnlyElement(getQueryRunner().execute(format("SELECT committed_at FROM \"%s$snapshots\" WHERE snapshot_id = %s", tableName, snapshotId))
+                .getOnlyColumnAsSet());
+    }
+
+    @Test
+    void testTableChangesFunctionFailures()
+    {
+        String tableName = "test_table_changes_function_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " (page_url VARCHAR, domain VARCHAR, views INTEGER) WITH (change_data_feed_enabled = true)");
+
+        assertUpdate("INSERT INTO " + tableName + " VALUES('url1', 'domain1', 1), ('url2', 'domain2', 2), ('url3', 'domain3', 3)", 3);
+        assertUpdate("INSERT INTO " + tableName + " VALUES('url4', 'domain4', 4), ('url5', 'domain5', 2), ('url6', 'domain6', 6)", 3);
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes())"))
+                .failure().hasMessageMatching("line 1:21: Missing argument: SCHEMA_NAME");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(NOSCHEMA))"))
+                .failure().hasMessageMatching("line 1:42: Column 'noschema' cannot be resolved");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA))"))
+                .failure().hasMessageMatching("line 1:42: Missing argument: TABLE_NAME");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "'))"))
+                .failure().hasMessageMatching("table_changes argument .* may not be null");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', 'not-a-number', null, null))"))
+                .failure().hasMessage("line 1:100: Cannot cast type varchar(12) to bigint");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', null, 'not-a-number', null))"))
+                .failure().hasMessage("line 1:106: Cannot cast type varchar(12) to bigint");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', null, null, 'not-a-number'))"))
+                .failure().hasMessage("line 1:112: Cannot cast type varchar(12) to bigint");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', 100))"))
+                .failure().hasMessageMatching("table_changes argument .* may not be null");
+
+        assertThat(query("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "', 100, 200))"))
+                .failure().hasMessageMatching("Snapshot not found in Iceberg table history: 100");
+    }
 }