Skip to content

Commit f6d92dd

Browse files
committed
Add Iceberg system.bucket function to Lakehouse
LakehouseMetadata will delegate all calls related to functions to IcebergMetadata. The system.bucket function will always be delegated to Iceberg function provider. If there is no connector selected, the full query is like this: SELECT lakehouse.system.bucket('trino', 16)
1 parent 913b1c9 commit f6d92dd

File tree

6 files changed

+106
-1
lines changed

6 files changed

+106
-1
lines changed

plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,12 @@
2626
import io.trino.spi.connector.ConnectorSession;
2727
import io.trino.spi.connector.ConnectorSplitManager;
2828
import io.trino.spi.connector.ConnectorTransactionHandle;
29+
import io.trino.spi.function.FunctionProvider;
2930
import io.trino.spi.session.PropertyMetadata;
3031
import io.trino.spi.transaction.IsolationLevel;
3132

3233
import java.util.List;
34+
import java.util.Optional;
3335
import java.util.Set;
3436

3537
import static com.google.common.collect.Sets.immutableEnumSet;
@@ -51,6 +53,7 @@ public class LakehouseConnector
5153
private final LakehouseSessionProperties sessionProperties;
5254
private final LakehouseTableProperties tableProperties;
5355
private final IcebergMaterializedViewProperties materializedViewProperties;
56+
private final FunctionProvider functionProvider;
5457

5558
@Inject
5659
public LakehouseConnector(
@@ -62,7 +65,8 @@ public LakehouseConnector(
6265
LakehouseNodePartitioningProvider nodePartitioningProvider,
6366
LakehouseSessionProperties sessionProperties,
6467
LakehouseTableProperties tableProperties,
65-
IcebergMaterializedViewProperties materializedViewProperties)
68+
IcebergMaterializedViewProperties materializedViewProperties,
69+
FunctionProvider functionProvider)
6670
{
6771
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
6872
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
@@ -73,6 +77,7 @@ public LakehouseConnector(
7377
this.sessionProperties = requireNonNull(sessionProperties, "sessionProperties is null");
7478
this.tableProperties = requireNonNull(tableProperties, "tableProperties is null");
7579
this.materializedViewProperties = requireNonNull(materializedViewProperties, "materializedViewProperties is null");
80+
this.functionProvider = requireNonNull(functionProvider, "functionProvider is null");
7681
}
7782

7883
@Override
@@ -159,4 +164,10 @@ public Set<ConnectorCapabilities> getCapabilities()
159164
{
160165
return immutableEnumSet(NOT_NULL_COLUMN_CONSTRAINT, MATERIALIZED_VIEW_GRACE_PERIOD);
161166
}
167+
168+
@Override
169+
public Optional<FunctionProvider> getFunctionProvider()
170+
{
171+
return Optional.of(functionProvider);
172+
}
162173
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.lakehouse;
15+
16+
import com.google.inject.Inject;
17+
import io.trino.plugin.iceberg.functions.IcebergFunctionProvider;
18+
import io.trino.spi.function.BoundSignature;
19+
import io.trino.spi.function.FunctionDependencies;
20+
import io.trino.spi.function.FunctionId;
21+
import io.trino.spi.function.FunctionProvider;
22+
import io.trino.spi.function.InvocationConvention;
23+
import io.trino.spi.function.ScalarFunctionImplementation;
24+
25+
import static java.util.Objects.requireNonNull;
26+
27+
public class LakehouseFunctionProvider
28+
implements FunctionProvider
29+
{
30+
private final IcebergFunctionProvider icebergFunctionProvider;
31+
32+
@Inject
33+
public LakehouseFunctionProvider(
34+
IcebergFunctionProvider icebergFunctionProvider)
35+
{
36+
this.icebergFunctionProvider = requireNonNull(icebergFunctionProvider, "icebergFunctionProvider is null");
37+
}
38+
39+
@Override
40+
public ScalarFunctionImplementation getScalarFunctionImplementation(
41+
FunctionId functionId,
42+
BoundSignature boundSignature,
43+
FunctionDependencies functionDependencies,
44+
InvocationConvention invocationConvention)
45+
{
46+
if ("bucket".equals(boundSignature.getName().getFunctionName())) {
47+
return icebergFunctionProvider.getScalarFunctionImplementation(functionId, boundSignature, functionDependencies, invocationConvention);
48+
}
49+
throw new UnsupportedOperationException("%s provides only 'bucket' scalar function".formatted(getClass().getName()));
50+
}
51+
}

plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import io.trino.plugin.iceberg.catalog.glue.IcebergGlueCatalogModule;
4242
import io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule;
4343
import io.trino.plugin.iceberg.fileio.ForwardingFileIoFactory;
44+
import io.trino.plugin.iceberg.functions.IcebergFunctionProvider;
4445

4546
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
4647
import static io.airlift.configuration.ConfigBinder.configBinder;
@@ -75,6 +76,8 @@ protected void setup(Binder binder)
7576

7677
binder.bind(ForwardingFileIoFactory.class).in(Scopes.SINGLETON);
7778

79+
binder.bind(IcebergFunctionProvider.class).in(Scopes.SINGLETON);
80+
7881
install(switch (buildConfigObject(MetastoreTypeConfig.class).getMetastoreType()) {
7982
case THRIFT -> new IcebergHiveMetastoreCatalogModule();
8083
case FILE -> new IcebergFileMetastoreCatalogModule();

plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@
8585
import io.trino.spi.connector.WriterScalingOptions;
8686
import io.trino.spi.expression.ConnectorExpression;
8787
import io.trino.spi.expression.Constant;
88+
import io.trino.spi.function.BoundSignature;
89+
import io.trino.spi.function.FunctionDependencyDeclaration;
90+
import io.trino.spi.function.FunctionId;
91+
import io.trino.spi.function.FunctionMetadata;
8892
import io.trino.spi.function.LanguageFunction;
8993
import io.trino.spi.function.SchemaFunctionName;
9094
import io.trino.spi.security.GrantInfo;
@@ -173,6 +177,30 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
173177
return hiveMetadata.getTableHandle(session, tableName, startVersion, endVersion);
174178
}
175179

180+
@Override
181+
public Collection<FunctionMetadata> listFunctions(ConnectorSession session, String schemaName)
182+
{
183+
return icebergMetadata.listFunctions(session, schemaName);
184+
}
185+
186+
@Override
187+
public Collection<FunctionMetadata> getFunctions(ConnectorSession session, SchemaFunctionName name)
188+
{
189+
return icebergMetadata.getFunctions(session, name);
190+
}
191+
192+
@Override
193+
public FunctionMetadata getFunctionMetadata(ConnectorSession session, FunctionId functionId)
194+
{
195+
return icebergMetadata.getFunctionMetadata(session, functionId);
196+
}
197+
198+
@Override
199+
public FunctionDependencyDeclaration getFunctionDependencies(ConnectorSession session, FunctionId functionId, BoundSignature boundSignature)
200+
{
201+
return icebergMetadata.getFunctionDependencies(session, functionId, boundSignature);
202+
}
203+
176204
@Override
177205
public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(ConnectorSession session, ConnectorAccessControl accessControl, ConnectorTableHandle tableHandle, String procedureName, Map<String, Object> executeProperties, RetryMode retryMode)
178206
{

plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import io.trino.plugin.hive.orc.OrcWriterConfig;
2525
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
2626
import io.trino.plugin.hive.parquet.ParquetWriterConfig;
27+
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProcessorProviderFactory;
28+
import io.trino.spi.function.FunctionProvider;
2729

2830
import static io.airlift.configuration.ConfigBinder.configBinder;
2931
import static org.weakref.jmx.guice.ExportBinder.newExporter;
@@ -53,6 +55,9 @@ protected void setup(Binder binder)
5355
binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
5456
newExporter(binder).export(FileFormatDataSourceStats.class).withGeneratedName();
5557

58+
binder.bind(FunctionProvider.class).to(LakehouseFunctionProvider.class).in(Scopes.SINGLETON);
59+
binder.bind(TableChangesFunctionProcessorProviderFactory.class).in(Scopes.SINGLETON);
60+
5661
binder.bind(Key.get(boolean.class, HideDeltaLakeTables.class)).toInstance(false);
5762
}
5863
}

plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseConnectorTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,4 +365,11 @@ public void testShowCreateTable()
365365
type = 'ICEBERG'
366366
)\\E""");
367367
}
368+
369+
@Test
370+
void testSystemBucket()
371+
{
372+
assertThat(query("SELECT lakehouse.system.bucket('trino', 16)"))
373+
.matches("VALUES 10");
374+
}
368375
}

0 commit comments

Comments
 (0)