Skip to content

Commit 80c52ad

Browse files
committed
Add table_changes function for delta lake
table_changes allows reading a stream of change data feed (CDF) entries. since_version is optional and exclusive. If since_version is not provided, the entire history of the table will be read.
1 parent a703569 commit 80c52ad

25 files changed

+1554
-8
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.base.util;
15+
16+
import com.google.errorprone.annotations.FormatMethod;
17+
import io.trino.spi.TrinoException;
18+
19+
import static io.trino.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
20+
import static java.lang.String.format;
21+
22+
public final class Functions
23+
{
24+
private Functions() {}
25+
26+
@FormatMethod
27+
public static void checkFunctionArgument(boolean condition, String message, Object... args)
28+
{
29+
if (!condition) {
30+
throw new TrinoException(INVALID_FUNCTION_ARGUMENT, format(message, args));
31+
}
32+
}
33+
}

plugin/trino-delta-lake/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,12 @@
196196
<scope>runtime</scope>
197197
</dependency>
198198

199+
<dependency>
200+
<groupId>io.trino</groupId>
201+
<artifactId>trino-memory-context</artifactId>
202+
<scope>runtime</scope>
203+
</dependency>
204+
199205
<dependency>
200206
<groupId>io.airlift</groupId>
201207
<artifactId>log-manager</artifactId>

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,13 @@ public boolean isBaseColumn()
185185
return projectionInfo.isEmpty();
186186
}
187187

188+
@JsonIgnore
189+
public Type getType()
190+
{
191+
return projectionInfo.map(DeltaLakeColumnProjectionInfo::getType)
192+
.orElse(baseType);
193+
}
194+
188195
@Override
189196
public int hashCode()
190197
{

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConnector.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
import io.trino.spi.connector.SystemTable;
3333
import io.trino.spi.connector.TableProcedureMetadata;
3434
import io.trino.spi.eventlistener.EventListener;
35+
import io.trino.spi.function.FunctionProvider;
3536
import io.trino.spi.procedure.Procedure;
37+
import io.trino.spi.ptf.ConnectorTableFunction;
3638
import io.trino.spi.session.PropertyMetadata;
3739
import io.trino.spi.transaction.IsolationLevel;
3840

@@ -68,6 +70,8 @@ public class DeltaLakeConnector
6870
// Delta lake is not transactional but we use Trino transaction boundaries to create a per-query
6971
// caching Hive metastore clients. DeltaLakeTransactionManager is used to store those.
7072
private final DeltaLakeTransactionManager transactionManager;
73+
private final Set<ConnectorTableFunction> tableFunctions;
74+
private final FunctionProvider functionProvider;
7175

7276
public DeltaLakeConnector(
7377
LifeCycleManager lifeCycleManager,
@@ -84,7 +88,9 @@ public DeltaLakeConnector(
8488
List<PropertyMetadata<?>> analyzeProperties,
8589
Optional<ConnectorAccessControl> accessControl,
8690
Set<EventListener> eventListeners,
87-
DeltaLakeTransactionManager transactionManager)
91+
DeltaLakeTransactionManager transactionManager,
92+
Set<ConnectorTableFunction> tableFunctions,
93+
FunctionProvider functionProvider)
8894
{
8995
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
9096
this.splitManager = requireNonNull(splitManager, "splitManager is null");
@@ -104,6 +110,8 @@ public DeltaLakeConnector(
104110
this.accessControl = requireNonNull(accessControl, "accessControl is null");
105111
this.eventListeners = ImmutableSet.copyOf(requireNonNull(eventListeners, "eventListeners is null"));
106112
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
113+
this.tableFunctions = ImmutableSet.copyOf(requireNonNull(tableFunctions, "tableFunctions is null"));
114+
this.functionProvider = requireNonNull(functionProvider, "functionProvider is null");
107115
}
108116

109117
@Override
@@ -223,4 +231,16 @@ public Set<ConnectorCapabilities> getCapabilities()
223231
{
224232
return immutableEnumSet(NOT_NULL_COLUMN_CONSTRAINT);
225233
}
234+
235+
@Override
236+
public Set<ConnectorTableFunction> getTableFunctions()
237+
{
238+
return tableFunctions;
239+
}
240+
241+
@Override
242+
public Optional<FunctionProvider> getFunctionProvider()
243+
{
244+
return Optional.of(functionProvider);
245+
}
226246
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.deltalake;
15+
16+
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesProcessorProvider;
17+
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesTableFunctionHandle;
18+
import io.trino.spi.function.FunctionProvider;
19+
import io.trino.spi.ptf.ConnectorTableFunctionHandle;
20+
import io.trino.spi.ptf.TableFunctionProcessorProvider;
21+
22+
import javax.inject.Inject;
23+
24+
import static java.util.Objects.requireNonNull;
25+
26+
public class DeltaLakeFunctionProvider
27+
implements FunctionProvider
28+
{
29+
private final TableChangesProcessorProvider tableChangesProcessorProvider;
30+
31+
@Inject
32+
public DeltaLakeFunctionProvider(TableChangesProcessorProvider tableChangesProcessorProvider)
33+
{
34+
this.tableChangesProcessorProvider = requireNonNull(tableChangesProcessorProvider, "tableChangesProcessorProvider is null");
35+
}
36+
37+
@Override
38+
public TableFunctionProcessorProvider getTableFunctionProcessorProvider(ConnectorTableFunctionHandle functionHandle)
39+
{
40+
if (functionHandle instanceof TableChangesTableFunctionHandle) {
41+
return tableChangesProcessorProvider;
42+
}
43+
throw new UnsupportedOperationException("Unsupported function: " + functionHandle);
44+
}
45+
}

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,8 @@ public class DeltaLakeMetadata
288288
public static final String ADD_COLUMN_OPERATION = "ADD COLUMNS";
289289
public static final String INSERT_OPERATION = "WRITE";
290290
public static final String MERGE_OPERATION = "MERGE";
291+
public static final String UPDATE_OPERATION = "UPDATE"; // used by old Trino versions and Spark
292+
public static final String DELETE_OPERATION = "DELETE"; // used by old Trino versions and Spark
291293
public static final String OPTIMIZE_OPERATION = "OPTIMIZE";
292294
public static final String SET_TBLPROPERTIES_OPERATION = "SET TBLPROPERTIES";
293295
public static final String CHANGE_COLUMN_OPERATION = "CHANGE COLUMN";

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import io.trino.plugin.base.CatalogName;
2424
import io.trino.plugin.base.security.ConnectorAccessControlModule;
2525
import io.trino.plugin.base.session.SessionPropertiesProvider;
26+
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesFunctionProvider;
27+
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesProcessorProvider;
2628
import io.trino.plugin.deltalake.procedure.DropExtendedStatsProcedure;
2729
import io.trino.plugin.deltalake.procedure.FlushMetadataCacheProcedure;
2830
import io.trino.plugin.deltalake.procedure.OptimizeTableProcedure;
@@ -57,7 +59,9 @@
5759
import io.trino.spi.connector.ConnectorPageSourceProvider;
5860
import io.trino.spi.connector.ConnectorSplitManager;
5961
import io.trino.spi.connector.TableProcedureMetadata;
62+
import io.trino.spi.function.FunctionProvider;
6063
import io.trino.spi.procedure.Procedure;
64+
import io.trino.spi.ptf.ConnectorTableFunction;
6165

6266
import javax.inject.Singleton;
6367

@@ -141,6 +145,10 @@ public void setup(Binder binder)
141145

142146
Multibinder<TableProcedureMetadata> tableProcedures = newSetBinder(binder, TableProcedureMetadata.class);
143147
tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON);
148+
149+
newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON);
150+
binder.bind(FunctionProvider.class).to(DeltaLakeFunctionProvider.class).in(Scopes.SINGLETON);
151+
binder.bind(TableChangesProcessorProvider.class).in(Scopes.SINGLETON);
144152
}
145153

146154
@Singleton

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515

1616
import com.google.common.collect.ImmutableList;
1717
import io.airlift.units.DataSize;
18+
import io.trino.filesystem.TrinoFileSystemFactory;
1819
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource;
20+
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesSplitSource;
21+
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesTableFunctionHandle;
1922
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
2023
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
2124
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
@@ -33,6 +36,7 @@
3336
import io.trino.spi.predicate.Domain;
3437
import io.trino.spi.predicate.NullableValue;
3538
import io.trino.spi.predicate.TupleDomain;
39+
import io.trino.spi.ptf.ConnectorTableFunctionHandle;
3640
import io.trino.spi.type.TypeManager;
3741

3842
import javax.inject.Inject;
@@ -76,13 +80,15 @@ public class DeltaLakeSplitManager
7680
private final int maxSplitsPerSecond;
7781
private final int maxOutstandingSplits;
7882
private final double minimumAssignedSplitWeight;
83+
private final TrinoFileSystemFactory fileSystemFactory;
7984

8085
@Inject
8186
public DeltaLakeSplitManager(
8287
TypeManager typeManager,
8388
TransactionLogAccess transactionLogAccess,
8489
ExecutorService executor,
85-
DeltaLakeConfig config)
90+
DeltaLakeConfig config,
91+
TrinoFileSystemFactory fileSystemFactory)
8692
{
8793
this.typeManager = requireNonNull(typeManager, "typeManager is null");
8894
this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
@@ -91,6 +97,7 @@ public DeltaLakeSplitManager(
9197
this.maxSplitsPerSecond = config.getMaxSplitsPerSecond();
9298
this.maxOutstandingSplits = config.getMaxOutstandingSplits();
9399
this.minimumAssignedSplitWeight = config.getMinimumAssignedSplitWeight();
100+
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
94101
}
95102

96103
@Override
@@ -122,6 +129,15 @@ public ConnectorSplitSource getSplits(
122129
return new ClassLoaderSafeConnectorSplitSource(splitSource, DeltaLakeSplitManager.class.getClassLoader());
123130
}
124131

132+
@Override
133+
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableFunctionHandle function)
134+
{
135+
if (function instanceof TableChangesTableFunctionHandle tableFunctionHandle) {
136+
return new TableChangesSplitSource(session, fileSystemFactory, tableFunctionHandle);
137+
}
138+
throw new UnsupportedOperationException("Unrecognized function: " + function);
139+
}
140+
125141
private Stream<DeltaLakeSplit> getSplits(
126142
DeltaLakeTableHandle tableHandle,
127143
ConnectorSession session,

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/InternalDeltaLakeConnectorFactory.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@
5656
import io.trino.spi.connector.ConnectorSplitManager;
5757
import io.trino.spi.connector.TableProcedureMetadata;
5858
import io.trino.spi.eventlistener.EventListener;
59+
import io.trino.spi.function.FunctionProvider;
5960
import io.trino.spi.procedure.Procedure;
61+
import io.trino.spi.ptf.ConnectorTableFunction;
6062
import io.trino.spi.type.TypeManager;
6163
import org.weakref.jmx.guice.MBeanModule;
6264

@@ -142,6 +144,9 @@ public static Connector createConnector(
142144
Set<Procedure> procedures = injector.getInstance(Key.get(new TypeLiteral<Set<Procedure>>() {}));
143145
Set<TableProcedureMetadata> tableProcedures = injector.getInstance(Key.get(new TypeLiteral<Set<TableProcedureMetadata>>() {}));
144146

147+
Set<ConnectorTableFunction> connectorTableFunctions = injector.getInstance(Key.get(new TypeLiteral<Set<ConnectorTableFunction>>() {}));
148+
FunctionProvider functionProvider = injector.getInstance(FunctionProvider.class);
149+
145150
return new DeltaLakeConnector(
146151
lifeCycleManager,
147152
new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader),
@@ -157,7 +162,9 @@ public static Connector createConnector(
157162
deltaLakeAnalyzeProperties.getAnalyzeProperties(),
158163
deltaAccessControl,
159164
eventListeners,
160-
transactionManager);
165+
transactionManager,
166+
connectorTableFunctions,
167+
functionProvider);
161168
}
162169
}
163170
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.deltalake.functions.tablechanges;
15+
16+
public enum TableChangesFileType
17+
{
18+
DATA_FILE,
19+
CDF_FILE,
20+
}

0 commit comments

Comments
 (0)