Skip to content

Commit 7bbeb9b

Browse files
committed
Add support for table_changes function for Lakehouse
The table_changes function is supported by DeltaLake and Iceberg modules. The TableChangesFunction does the initial analisys. The Lakehouse TableChangesFunction will delegate to the DeltaLake TableChangesFunction, or, if the table is not found, to the Iceberg TableChangesFunction. The LakehouseFunctionProvider returns the TableFunctionProcessorProviderFactory from DeltaLak or Iceberg, depending on the previously functionHandle generated by the TableChangesFunction.
1 parent ff3deac commit 7bbeb9b

File tree

10 files changed

+475
-1
lines changed

10 files changed

+475
-1
lines changed

plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package io.trino.plugin.lakehouse;
1515

16+
import com.google.common.collect.ImmutableSet;
1617
import com.google.inject.Inject;
1718
import io.airlift.bootstrap.LifeCycleManager;
1819
import io.trino.plugin.hive.HiveSchemaProperties;
@@ -26,10 +27,13 @@
2627
import io.trino.spi.connector.ConnectorSession;
2728
import io.trino.spi.connector.ConnectorSplitManager;
2829
import io.trino.spi.connector.ConnectorTransactionHandle;
30+
import io.trino.spi.function.FunctionProvider;
31+
import io.trino.spi.function.table.ConnectorTableFunction;
2932
import io.trino.spi.session.PropertyMetadata;
3033
import io.trino.spi.transaction.IsolationLevel;
3134

3235
import java.util.List;
36+
import java.util.Optional;
3337
import java.util.Set;
3438

3539
import static com.google.common.collect.Sets.immutableEnumSet;
@@ -51,6 +55,8 @@ public class LakehouseConnector
5155
private final LakehouseSessionProperties sessionProperties;
5256
private final LakehouseTableProperties tableProperties;
5357
private final IcebergMaterializedViewProperties materializedViewProperties;
58+
private final Set<ConnectorTableFunction> tableFunctions;
59+
private final FunctionProvider functionProvider;
5460

5561
@Inject
5662
public LakehouseConnector(
@@ -62,7 +68,9 @@ public LakehouseConnector(
6268
LakehouseNodePartitioningProvider nodePartitioningProvider,
6369
LakehouseSessionProperties sessionProperties,
6470
LakehouseTableProperties tableProperties,
65-
IcebergMaterializedViewProperties materializedViewProperties)
71+
IcebergMaterializedViewProperties materializedViewProperties,
72+
Set<ConnectorTableFunction> tableFunctions,
73+
FunctionProvider functionProvider)
6674
{
6775
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
6876
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
@@ -73,6 +81,8 @@ public LakehouseConnector(
7381
this.sessionProperties = requireNonNull(sessionProperties, "sessionProperties is null");
7482
this.tableProperties = requireNonNull(tableProperties, "tableProperties is null");
7583
this.materializedViewProperties = requireNonNull(materializedViewProperties, "materializedViewProperties is null");
84+
this.tableFunctions = ImmutableSet.copyOf(requireNonNull(tableFunctions, "tableFunctions is null"));
85+
this.functionProvider = requireNonNull(functionProvider, "functionProvider is null");
7686
}
7787

7888
@Override
@@ -159,4 +169,16 @@ public Set<ConnectorCapabilities> getCapabilities()
159169
{
160170
return immutableEnumSet(NOT_NULL_COLUMN_CONSTRAINT, MATERIALIZED_VIEW_GRACE_PERIOD);
161171
}
172+
173+
@Override
174+
public Set<ConnectorTableFunction> getTableFunctions()
175+
{
176+
return tableFunctions;
177+
}
178+
179+
@Override
180+
public Optional<FunctionProvider> getFunctionProvider()
181+
{
182+
return Optional.of(functionProvider);
183+
}
162184
}

plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseDeltaModule.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.trino.plugin.deltalake.DeltaLakeConfig;
2222
import io.trino.plugin.deltalake.DeltaLakeExecutorModule;
2323
import io.trino.plugin.deltalake.DeltaLakeFileSystemFactory;
24+
import io.trino.plugin.deltalake.DeltaLakeFunctionProvider;
2425
import io.trino.plugin.deltalake.DeltaLakeMergeResult;
2526
import io.trino.plugin.deltalake.DeltaLakeMetadataFactory;
2627
import io.trino.plugin.deltalake.DeltaLakeNodePartitioningProvider;
@@ -32,6 +33,7 @@
3233
import io.trino.plugin.deltalake.DeltaLakeTableProperties;
3334
import io.trino.plugin.deltalake.DeltaLakeTransactionManager;
3435
import io.trino.plugin.deltalake.DeltaLakeWriterStats;
36+
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesProcessorProvider;
3537
import io.trino.plugin.deltalake.metastore.DeltaLakeMetastoreModule;
3638
import io.trino.plugin.deltalake.metastore.DeltaLakeTableMetadataScheduler;
3739
import io.trino.plugin.deltalake.metastore.NoOpVendedCredentialsProvider;
@@ -103,6 +105,9 @@ protected void setup(Binder binder)
103105
binder.bind(DeltaLakeTableMetadataScheduler.class).in(Scopes.SINGLETON);
104106
newExporter(binder).export(DeltaLakeTableMetadataScheduler.class).withGeneratedName();
105107

108+
binder.bind(TableChangesProcessorProvider.class).in(Scopes.SINGLETON);
109+
binder.bind(DeltaLakeFunctionProvider.class).in(Scopes.SINGLETON);
110+
106111
jsonCodecBinder(binder).bindJsonCodec(DataFileInfo.class);
107112
jsonCodecBinder(binder).bindJsonCodec(DeltaLakeMergeResult.class);
108113
jsonCodecBinder(binder).bindJsonCodec(ExtendedStatistics.class);
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.lakehouse;
15+
16+
import com.google.inject.Inject;
17+
import io.trino.plugin.deltalake.DeltaLakeFunctionProvider;
18+
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesTableFunctionHandle;
19+
import io.trino.plugin.iceberg.functions.IcebergFunctionProvider;
20+
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionHandle;
21+
import io.trino.spi.function.FunctionProvider;
22+
import io.trino.spi.function.table.ConnectorTableFunctionHandle;
23+
import io.trino.spi.function.table.TableFunctionProcessorProviderFactory;
24+
25+
import static java.util.Objects.requireNonNull;
26+
27+
public class LakehouseFunctionProvider
28+
implements FunctionProvider
29+
{
30+
private final DeltaLakeFunctionProvider deltaLakeFunctionProvider;
31+
private final IcebergFunctionProvider icebergFunctionProvider;
32+
33+
@Inject
34+
public LakehouseFunctionProvider(
35+
DeltaLakeFunctionProvider deltaLakeFunctionProvider,
36+
IcebergFunctionProvider icebergFunctionProvider)
37+
{
38+
this.deltaLakeFunctionProvider = requireNonNull(deltaLakeFunctionProvider, "deltaLakeFunctionProvider is null");
39+
this.icebergFunctionProvider = requireNonNull(icebergFunctionProvider, "icebergFunctionProvider is null");
40+
}
41+
42+
@Override
43+
public TableFunctionProcessorProviderFactory getTableFunctionProcessorProviderFactory(ConnectorTableFunctionHandle functionHandle)
44+
{
45+
if (functionHandle instanceof TableChangesTableFunctionHandle) {
46+
return deltaLakeFunctionProvider.getTableFunctionProcessorProviderFactory(functionHandle);
47+
}
48+
if (functionHandle instanceof TableChangesFunctionHandle) {
49+
return icebergFunctionProvider.getTableFunctionProcessorProviderFactory(functionHandle);
50+
}
51+
throw new UnsupportedOperationException("Unsupported function: " + functionHandle);
52+
}
53+
}

plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import io.trino.plugin.iceberg.catalog.glue.IcebergGlueCatalogModule;
4242
import io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule;
4343
import io.trino.plugin.iceberg.fileio.ForwardingFileIoFactory;
44+
import io.trino.plugin.iceberg.functions.IcebergFunctionProvider;
4445

4546
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
4647
import static io.airlift.configuration.ConfigBinder.configBinder;
@@ -75,6 +76,8 @@ protected void setup(Binder binder)
7576

7677
binder.bind(ForwardingFileIoFactory.class).in(Scopes.SINGLETON);
7778

79+
binder.bind(IcebergFunctionProvider.class).in(Scopes.SINGLETON);
80+
7881
install(switch (buildConfigObject(MetastoreTypeConfig.class).getMetastoreType()) {
7982
case THRIFT -> new IcebergHiveMetastoreCatalogModule();
8083
case FILE -> new IcebergFileMetastoreCatalogModule();

plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,12 @@
2424
import io.trino.plugin.hive.orc.OrcWriterConfig;
2525
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
2626
import io.trino.plugin.hive.parquet.ParquetWriterConfig;
27+
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProcessorProviderFactory;
28+
import io.trino.plugin.lakehouse.functions.tablechanges.TableChangesFunctionProvider;
29+
import io.trino.spi.function.FunctionProvider;
30+
import io.trino.spi.function.table.ConnectorTableFunction;
2731

32+
import static com.google.inject.multibindings.Multibinder.newSetBinder;
2833
import static io.airlift.configuration.ConfigBinder.configBinder;
2934
import static org.weakref.jmx.guice.ExportBinder.newExporter;
3035

@@ -53,6 +58,10 @@ protected void setup(Binder binder)
5358
binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
5459
newExporter(binder).export(FileFormatDataSourceStats.class).withGeneratedName();
5560

61+
newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON);
62+
binder.bind(FunctionProvider.class).to(LakehouseFunctionProvider.class).in(Scopes.SINGLETON);
63+
binder.bind(TableChangesFunctionProcessorProviderFactory.class).in(Scopes.SINGLETON);
64+
5665
binder.bind(Key.get(boolean.class, HideDeltaLakeTables.class)).toInstance(false);
5766
}
5867
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.lakehouse.functions.tablechanges;
15+
16+
import com.google.common.collect.ImmutableList;
17+
import io.trino.plugin.deltalake.metastore.NotADeltaLakeTableException;
18+
import io.trino.plugin.iceberg.UnknownTableTypeException;
19+
import io.trino.spi.TrinoException;
20+
import io.trino.spi.connector.ConnectorAccessControl;
21+
import io.trino.spi.connector.ConnectorSession;
22+
import io.trino.spi.connector.ConnectorTransactionHandle;
23+
import io.trino.spi.function.table.AbstractConnectorTableFunction;
24+
import io.trino.spi.function.table.Argument;
25+
import io.trino.spi.function.table.ScalarArgumentSpecification;
26+
import io.trino.spi.function.table.TableFunctionAnalysis;
27+
28+
import java.util.Map;
29+
30+
import static io.trino.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
31+
import static io.trino.spi.function.table.ReturnTypeSpecification.GenericTable.GENERIC_TABLE;
32+
import static io.trino.spi.type.BigintType.BIGINT;
33+
import static io.trino.spi.type.VarcharType.VARCHAR;
34+
35+
public class TableChangesFunction
36+
extends AbstractConnectorTableFunction
37+
{
38+
private static final String FUNCTION_NAME = "table_changes";
39+
private static final String SCHEMA_NAME = "system";
40+
private static final String NAME = "table_changes";
41+
public static final String SCHEMA_NAME_ARGUMENT = "SCHEMA_NAME";
42+
private static final String TABLE_NAME_ARGUMENT = "TABLE_NAME";
43+
private static final String START_SNAPSHOT_VAR_NAME = "START_SNAPSHOT_ID";
44+
private static final String END_SNAPSHOT_VAR_NAME = "END_SNAPSHOT_ID";
45+
private static final String SINCE_VERSION_ARGUMENT = "SINCE_VERSION";
46+
47+
private final io.trino.plugin.deltalake.functions.tablechanges.TableChangesFunction deltaLakeTableChangesFunction;
48+
private final io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunction icebergTableChangesFunction;
49+
50+
public TableChangesFunction(
51+
io.trino.plugin.deltalake.functions.tablechanges.TableChangesFunction deltaLakeTableChangesFunction,
52+
io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunction icebergTableChangesFunction)
53+
{
54+
super(
55+
SCHEMA_NAME,
56+
NAME,
57+
ImmutableList.of(
58+
ScalarArgumentSpecification.builder().name(SCHEMA_NAME_ARGUMENT).type(VARCHAR).build(),
59+
ScalarArgumentSpecification.builder().name(TABLE_NAME_ARGUMENT).type(VARCHAR).build(),
60+
ScalarArgumentSpecification.builder().name(START_SNAPSHOT_VAR_NAME).type(BIGINT).defaultValue(null).build(),
61+
ScalarArgumentSpecification.builder().name(END_SNAPSHOT_VAR_NAME).type(BIGINT).defaultValue(null).build(),
62+
ScalarArgumentSpecification.builder().name(SINCE_VERSION_ARGUMENT).type(BIGINT).defaultValue(null).build()),
63+
GENERIC_TABLE);
64+
this.deltaLakeTableChangesFunction = deltaLakeTableChangesFunction;
65+
this.icebergTableChangesFunction = icebergTableChangesFunction;
66+
}
67+
68+
@Override
69+
public TableFunctionAnalysis analyze(
70+
ConnectorSession session,
71+
ConnectorTransactionHandle transaction,
72+
Map<String, Argument> arguments,
73+
ConnectorAccessControl accessControl)
74+
{
75+
try {
76+
return deltaLakeTableChangesFunction.analyze(session, transaction, arguments, accessControl);
77+
}
78+
catch (NotADeltaLakeTableException _) {
79+
checkNonNull(arguments.get(START_SNAPSHOT_VAR_NAME), START_SNAPSHOT_VAR_NAME);
80+
checkNonNull(arguments.get(END_SNAPSHOT_VAR_NAME), END_SNAPSHOT_VAR_NAME);
81+
try {
82+
return icebergTableChangesFunction.analyze(session, transaction, arguments, accessControl);
83+
}
84+
catch (UnknownTableTypeException e) {
85+
throw new TrinoException(INVALID_FUNCTION_ARGUMENT, "table_changes function is not supported for the given table type");
86+
}
87+
}
88+
}
89+
90+
private void checkNonNull(Object argumentValue, String argumentName)
91+
{
92+
if (argumentValue == null) {
93+
throw new TrinoException(INVALID_FUNCTION_ARGUMENT, FUNCTION_NAME + " argument " + argumentName + " may not be null");
94+
}
95+
}
96+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.lakehouse.functions.tablechanges;
15+
16+
import com.google.inject.Inject;
17+
import com.google.inject.Provider;
18+
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorTableFunction;
19+
import io.trino.plugin.deltalake.DeltaLakeMetadataFactory;
20+
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
21+
import io.trino.spi.function.table.ConnectorTableFunction;
22+
import io.trino.spi.type.TypeManager;
23+
24+
import static java.util.Objects.requireNonNull;
25+
26+
public class TableChangesFunctionProvider
27+
implements Provider<ConnectorTableFunction>
28+
{
29+
private final DeltaLakeMetadataFactory deltaLakeMetadataFactory;
30+
31+
private final TrinoCatalogFactory trinoCatalogFactory;
32+
private final TypeManager typeManager;
33+
34+
@Inject
35+
public TableChangesFunctionProvider(
36+
DeltaLakeMetadataFactory deltaLakeMetadataFactory,
37+
TrinoCatalogFactory trinoCatalogFactory,
38+
TypeManager typeManager)
39+
{
40+
this.deltaLakeMetadataFactory = requireNonNull(deltaLakeMetadataFactory, "deltaLakeMetadataFactory is null");
41+
this.trinoCatalogFactory = requireNonNull(trinoCatalogFactory, "trinoCatalogFactory is null");
42+
this.typeManager = requireNonNull(typeManager, "typeManager is null");
43+
}
44+
45+
@Override
46+
public ConnectorTableFunction get()
47+
{
48+
io.trino.plugin.deltalake.functions.tablechanges.TableChangesFunction deltaLakeTableChangesFunction =
49+
new io.trino.plugin.deltalake.functions.tablechanges.TableChangesFunction(deltaLakeMetadataFactory);
50+
io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunction icebergTableChangesFunction =
51+
new io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunction(trinoCatalogFactory, typeManager);
52+
return new ClassLoaderSafeConnectorTableFunction(new TableChangesFunction(deltaLakeTableChangesFunction, icebergTableChangesFunction), getClass().getClassLoader());
53+
}
54+
}

0 commit comments

Comments
 (0)