Skip to content

Commit eddba0d

Browse files
committed
Add table_changes(tableName, startVersion, endVersion) function for delta lake
table_changes allows reading the stream of CDF (change data feed) entries. Both startVersion and endVersion are optional. If startVersion is not provided, the function starts reading from the first entry. startVersion is exclusive, endVersion is inclusive.
1 parent 7ccb04d commit eddba0d

20 files changed

+1373
-6
lines changed

docs/src/main/sphinx/connector/delta-lake.rst

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -724,6 +724,25 @@ The output of the query has the following columns:
724724
- ``boolean``
725725
- Whether or not the operation appended data
726726

727+
Functions
728+
---------------
729+
730+
The connector exposes the following functions:
731+
732+
table_changes
733+
^^^^^^^^^^^^^
734+
735+
Allows reading Change Data Feed (CDF) entries:
736+
737+
.. code-block:: sql
738+
739+
SELECT * FROM TABLE(system.table_changes('test_schema.test_table', startVersion, endVersion))
740+
741+
``startVersion`` - type ``BIGINT``, exclusive, changes will be read from ``startVersion + 1``
742+
743+
``endVersion`` - type ``BIGINT``, inclusive, changes will be read up to ``endVersion``
744+
745+
If ``startVersion`` is not provided, the entire history will be read.
727746

728747
Performance
729748
-----------

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConnector.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
import io.trino.spi.connector.SystemTable;
3333
import io.trino.spi.connector.TableProcedureMetadata;
3434
import io.trino.spi.eventlistener.EventListener;
35+
import io.trino.spi.function.FunctionProvider;
3536
import io.trino.spi.procedure.Procedure;
37+
import io.trino.spi.ptf.ConnectorTableFunction;
3638
import io.trino.spi.session.PropertyMetadata;
3739
import io.trino.spi.transaction.IsolationLevel;
3840

@@ -68,6 +70,8 @@ public class DeltaLakeConnector
6870
// Delta lake is not transactional but we use Trino transaction boundaries to create a per-query
6971
// caching Hive metastore clients. DeltaLakeTransactionManager is used to store those.
7072
private final DeltaLakeTransactionManager transactionManager;
73+
private final Set<ConnectorTableFunction> tableFunctions;
74+
private final FunctionProvider functionProvider;
7175

7276
public DeltaLakeConnector(
7377
LifeCycleManager lifeCycleManager,
@@ -84,7 +88,9 @@ public DeltaLakeConnector(
8488
List<PropertyMetadata<?>> analyzeProperties,
8589
Optional<ConnectorAccessControl> accessControl,
8690
Set<EventListener> eventListeners,
87-
DeltaLakeTransactionManager transactionManager)
91+
DeltaLakeTransactionManager transactionManager,
92+
Set<ConnectorTableFunction> tableFunctions,
93+
FunctionProvider functionProvider)
8894
{
8995
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
9096
this.splitManager = requireNonNull(splitManager, "splitManager is null");
@@ -104,6 +110,8 @@ public DeltaLakeConnector(
104110
this.accessControl = requireNonNull(accessControl, "accessControl is null");
105111
this.eventListeners = ImmutableSet.copyOf(requireNonNull(eventListeners, "eventListeners is null"));
106112
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
113+
this.tableFunctions = requireNonNull(tableFunctions, "tableFunctions is null");
114+
this.functionProvider = requireNonNull(functionProvider, "functionProvider is null");
107115
}
108116

109117
@Override
@@ -223,4 +231,16 @@ public Set<ConnectorCapabilities> getCapabilities()
223231
{
224232
return immutableEnumSet(NOT_NULL_COLUMN_CONSTRAINT);
225233
}
234+
235+
@Override
236+
public Set<ConnectorTableFunction> getTableFunctions()
237+
{
238+
return tableFunctions;
239+
}
240+
241+
@Override
242+
public Optional<FunctionProvider> getFunctionProvider()
243+
{
244+
return Optional.of(functionProvider);
245+
}
226246
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.deltalake;
15+
16+
import io.trino.filesystem.TrinoFileSystemFactory;
17+
import io.trino.plugin.deltalake.functions.tablechanges.DeltaLakeTableChangesFunction;
18+
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesProcessorProvider;
19+
import io.trino.spi.function.AggregationImplementation;
20+
import io.trino.spi.function.BoundSignature;
21+
import io.trino.spi.function.FunctionDependencies;
22+
import io.trino.spi.function.FunctionId;
23+
import io.trino.spi.function.FunctionProvider;
24+
import io.trino.spi.function.InvocationConvention;
25+
import io.trino.spi.function.ScalarFunctionImplementation;
26+
import io.trino.spi.function.SchemaFunctionName;
27+
import io.trino.spi.function.WindowFunctionSupplier;
28+
import io.trino.spi.ptf.TableFunctionProcessorProvider;
29+
30+
import javax.inject.Inject;
31+
32+
public class DeltaLakeFunctionProvider
33+
implements FunctionProvider
34+
{
35+
private final TrinoFileSystemFactory fileSystemFactory;
36+
private final DeltaLakeConfig deltaLakeConfig;
37+
38+
@Inject
39+
public DeltaLakeFunctionProvider(TrinoFileSystemFactory fileSystemFactory, DeltaLakeConfig deltaLakeConfig)
40+
{
41+
this.fileSystemFactory = fileSystemFactory;
42+
this.deltaLakeConfig = deltaLakeConfig;
43+
}
44+
45+
@Override
46+
public ScalarFunctionImplementation getScalarFunctionImplementation(FunctionId functionId, BoundSignature boundSignature, FunctionDependencies functionDependencies, InvocationConvention invocationConvention)
47+
{
48+
return null;
49+
}
50+
51+
@Override
52+
public AggregationImplementation getAggregationImplementation(FunctionId functionId, BoundSignature boundSignature, FunctionDependencies functionDependencies)
53+
{
54+
return null;
55+
}
56+
57+
@Override
58+
public WindowFunctionSupplier getWindowFunctionSupplier(FunctionId functionId, BoundSignature boundSignature, FunctionDependencies functionDependencies)
59+
{
60+
return null;
61+
}
62+
63+
@Override
64+
public TableFunctionProcessorProvider getTableFunctionProcessorProvider(SchemaFunctionName name)
65+
{
66+
if (name.equals(new SchemaFunctionName(DeltaLakeTableChangesFunction.SCHEMA_NAME, DeltaLakeTableChangesFunction.NAME))) {
67+
return new TableChangesProcessorProvider(fileSystemFactory, deltaLakeConfig);
68+
}
69+
return null;
70+
}
71+
}

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2750,7 +2750,7 @@ private static DeltaLakeColumnHandle toColumnHandle(ColumnMetadata column, Strin
27502750
return toColumnHandle(column, OptionalInt.empty(), physicalName, physicalType, partitionColumns);
27512751
}
27522752

2753-
private static DeltaLakeColumnHandle toColumnHandle(ColumnMetadata column, OptionalInt fieldId, String physicalName, Type physicalType, Collection<String> partitionColumns)
2753+
public static DeltaLakeColumnHandle toColumnHandle(ColumnMetadata column, OptionalInt fieldId, String physicalName, Type physicalType, Collection<String> partitionColumns)
27542754
{
27552755
boolean isPartitionKey = partitionColumns.stream().anyMatch(partition -> partition.equalsIgnoreCase(column.getName()));
27562756
return new DeltaLakeColumnHandle(

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@
6363
import io.trino.spi.connector.ConnectorSession;
6464
import io.trino.spi.connector.ConnectorSplitManager;
6565
import io.trino.spi.connector.TableProcedureMetadata;
66+
import io.trino.spi.function.FunctionProvider;
6667
import io.trino.spi.procedure.Procedure;
68+
import io.trino.spi.ptf.ConnectorTableFunction;
6769
import io.trino.spi.security.ConnectorIdentity;
6870

6971
import javax.inject.Singleton;
@@ -150,6 +152,9 @@ public void setup(Binder binder)
150152

151153
Multibinder<TableProcedureMetadata> tableProcedures = newSetBinder(binder, TableProcedureMetadata.class);
152154
tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON);
155+
156+
newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(DeltaLakeTableFunctionsProvider.class).in(Scopes.SINGLETON);
157+
binder.bind(FunctionProvider.class).to(DeltaLakeFunctionProvider.class).in(Scopes.SINGLETON);
153158
}
154159

155160
@Singleton

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515

1616
import com.google.common.collect.ImmutableList;
1717
import io.airlift.units.DataSize;
18+
import io.trino.filesystem.TrinoFileSystemFactory;
1819
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource;
20+
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesFunctionTableHandle;
21+
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesSplitSource;
1922
import io.trino.plugin.deltalake.metastore.DeltaLakeMetastore;
2023
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
2124
import io.trino.plugin.hive.HiveTransactionHandle;
@@ -29,9 +32,11 @@
2932
import io.trino.spi.connector.Constraint;
3033
import io.trino.spi.connector.DynamicFilter;
3134
import io.trino.spi.connector.FixedSplitSource;
35+
import io.trino.spi.function.SchemaFunctionName;
3236
import io.trino.spi.predicate.Domain;
3337
import io.trino.spi.predicate.NullableValue;
3438
import io.trino.spi.predicate.TupleDomain;
39+
import io.trino.spi.ptf.ConnectorTableFunctionHandle;
3540
import io.trino.spi.type.TypeManager;
3641

3742
import javax.inject.Inject;
@@ -57,6 +62,7 @@
5762
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getDynamicFilteringWaitTimeout;
5863
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getMaxInitialSplitSize;
5964
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getMaxSplitSize;
65+
import static io.trino.plugin.deltalake.functions.tablechanges.DeltaLakeTableChangesFunction.TABLE_CHANGES_NAME;
6066
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
6167
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.deserializePartitionValue;
6268
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -74,13 +80,15 @@ public class DeltaLakeSplitManager
7480
private final int maxSplitsPerSecond;
7581
private final int maxOutstandingSplits;
7682
private final double minimumAssignedSplitWeight;
83+
private final TrinoFileSystemFactory fileSystemFactory;
7784

7885
@Inject
7986
public DeltaLakeSplitManager(
8087
TypeManager typeManager,
8188
BiFunction<ConnectorSession, HiveTransactionHandle, DeltaLakeMetastore> metastoreProvider,
8289
ExecutorService executor,
83-
DeltaLakeConfig config)
90+
DeltaLakeConfig config,
91+
TrinoFileSystemFactory fileSystemFactory)
8492
{
8593
this.typeManager = requireNonNull(typeManager, "typeManager is null");
8694
this.metastoreProvider = requireNonNull(metastoreProvider, "metastoreProvider is null");
@@ -89,6 +97,7 @@ public DeltaLakeSplitManager(
8997
this.maxSplitsPerSecond = config.getMaxSplitsPerSecond();
9098
this.maxOutstandingSplits = config.getMaxOutstandingSplits();
9199
this.minimumAssignedSplitWeight = config.getMinimumAssignedSplitWeight();
100+
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystem is null");
92101
}
93102

94103
@Override
@@ -120,6 +129,15 @@ public ConnectorSplitSource getSplits(
120129
return new ClassLoaderSafeConnectorSplitSource(splitSource, DeltaLakeSplitManager.class.getClassLoader());
121130
}
122131

132+
@Override
133+
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, SchemaFunctionName name, ConnectorTableFunctionHandle function)
134+
{
135+
if (name.equals(TABLE_CHANGES_NAME)) {
136+
return new TableChangesSplitSource(session, getMetastore(session, transaction), fileSystemFactory, (TableChangesFunctionTableHandle) function);
137+
}
138+
return ConnectorSplitManager.super.getSplits(transaction, session, name, function);
139+
}
140+
123141
private Stream<DeltaLakeSplit> getSplits(
124142
ConnectorTransactionHandle transaction,
125143
DeltaLakeTableHandle tableHandle,
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.deltalake;
15+
16+
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorTableFunction;
17+
import io.trino.plugin.deltalake.functions.tablechanges.DeltaLakeTableChangesFunction;
18+
import io.trino.spi.ptf.ConnectorTableFunction;
19+
import io.trino.spi.type.TypeManager;
20+
21+
import javax.inject.Inject;
22+
import javax.inject.Provider;
23+
24+
import static java.util.Objects.requireNonNull;
25+
26+
public class DeltaLakeTableFunctionsProvider
27+
implements Provider<ConnectorTableFunction>
28+
{
29+
private final DeltaLakeMetadataFactory deltaLakeMetadataFactory;
30+
private final TypeManager typeManager;
31+
32+
@Inject
33+
public DeltaLakeTableFunctionsProvider(DeltaLakeMetadataFactory deltaLakeMetadataFactory, TypeManager typeManager)
34+
{
35+
this.deltaLakeMetadataFactory = requireNonNull(deltaLakeMetadataFactory, "deltaLakeMetadataFactory is null");
36+
this.typeManager = requireNonNull(typeManager, "typeManager is null");
37+
}
38+
39+
@Override
40+
public ConnectorTableFunction get()
41+
{
42+
return new ClassLoaderSafeConnectorTableFunction(
43+
new DeltaLakeTableChangesFunction(typeManager, deltaLakeMetadataFactory),
44+
getClass().getClassLoader());
45+
}
46+
}

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/InternalDeltaLakeConnectorFactory.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@
5353
import io.trino.spi.connector.ConnectorSplitManager;
5454
import io.trino.spi.connector.TableProcedureMetadata;
5555
import io.trino.spi.eventlistener.EventListener;
56+
import io.trino.spi.function.FunctionProvider;
5657
import io.trino.spi.procedure.Procedure;
58+
import io.trino.spi.ptf.ConnectorTableFunction;
5759
import io.trino.spi.type.TypeManager;
5860
import org.weakref.jmx.guice.MBeanModule;
5961

@@ -134,6 +136,9 @@ public static Connector createConnector(
134136
Set<Procedure> procedures = injector.getInstance(Key.get(new TypeLiteral<Set<Procedure>>() {}));
135137
Set<TableProcedureMetadata> tableProcedures = injector.getInstance(Key.get(new TypeLiteral<Set<TableProcedureMetadata>>() {}));
136138

139+
Set<ConnectorTableFunction> connectorTableFunctions = injector.getInstance(Key.get(new TypeLiteral<Set<ConnectorTableFunction>>() {}));
140+
FunctionProvider functionProvider = injector.getInstance(FunctionProvider.class);
141+
137142
return new DeltaLakeConnector(
138143
lifeCycleManager,
139144
new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader),
@@ -149,7 +154,9 @@ public static Connector createConnector(
149154
deltaLakeAnalyzeProperties.getAnalyzeProperties(),
150155
deltaAccessControl,
151156
eventListeners,
152-
transactionManager);
157+
transactionManager,
158+
connectorTableFunctions,
159+
functionProvider);
153160
}
154161
}
155162
}

0 commit comments

Comments
 (0)