fix(connector): Delta Lake checkpoint file reading by upgrading Delta Kernel API and updating Snapshot method calls #27434
Merged
agrawalreetika merged 1 commit into prestodb:master on Mar 31, 2026
Conversation
Contributor
Reviewer's Guide
Upgrades the Presto Delta connector to Delta Kernel API 4.0.0 and updates snapshot usage to the new API, so that snapshots (including those loaded from checkpoint files) are read correctly and snapshot accessors no longer require an Engine parameter.
Sequence diagram for listing Delta Lake files with updated snapshot API
sequenceDiagram
actor User
participant PrestoCoordinator
participant DeltaMetadata
participant DeltaClient
participant DeltaTable
participant Engine
participant Snapshot
participant ScanBuilder
User->>PrestoCoordinator: Submit SELECT query
PrestoCoordinator->>DeltaMetadata: getTableHandle
DeltaMetadata->>DeltaClient: getTable
DeltaClient->>Engine: get
DeltaClient->>DeltaTable: getLatestSnapshot(Engine)
DeltaTable-->>DeltaClient: Snapshot
DeltaClient->>Snapshot: getVersion()
Snapshot-->>DeltaClient: version
DeltaClient-->>DeltaMetadata: DeltaTable (locked to version)
PrestoCoordinator->>DeltaMetadata: listTableFiles
DeltaMetadata->>DeltaClient: listFiles(session, deltaTable)
DeltaClient->>Engine: get
DeltaClient->>DeltaTable: getSnapshotAsOfVersion(Engine, snapshotId)
DeltaTable-->>DeltaClient: Snapshot
DeltaClient->>Snapshot: getScanBuilder()
Snapshot-->>DeltaClient: ScanBuilder
DeltaClient->>ScanBuilder: build()
ScanBuilder-->>DeltaClient: Scan
DeltaClient->>Scan: getScanFiles(Engine)
Scan-->>DeltaClient: CloseableIterator<FilteredColumnarBatch>
DeltaClient-->>DeltaMetadata: iterator of files
DeltaMetadata-->>PrestoCoordinator: table handle with files
PrestoCoordinator-->>User: query results
Sequence diagram for inferring Delta table schema using snapshot scan builder
sequenceDiagram
participant DeltaClient
participant DeltaConfig
participant Snapshot
participant ScanBuilder
participant Engine
participant ColumnarBatchIterator as CloseableIterator
DeltaClient->>Snapshot: getScanBuilder()
Snapshot-->>DeltaClient: ScanBuilder
DeltaClient->>ScanBuilder: build()
ScanBuilder-->>DeltaClient: Scan
DeltaClient->>Scan: getScanFiles(Engine)
Scan-->>DeltaClient: CloseableIterator<FilteredColumnarBatch>
DeltaClient->>ColumnarBatchIterator: iterate hasNext()/next()
ColumnarBatchIterator-->>DeltaClient: Row with partition values
DeltaClient->>Snapshot: getSchema()
Snapshot-->>DeltaClient: StructType schema
DeltaClient->>DeltaConfig: isCaseSensitivePartitionsEnabled()
DeltaConfig-->>DeltaClient: boolean
DeltaClient-->>DeltaClient: build List<DeltaColumn> from schema fields and partition values
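The last step of the diagram above (building `List<DeltaColumn>` from schema fields and partition values, gated by `isCaseSensitivePartitionsEnabled()`) can be sketched as follows. This is only an illustration of one plausible matching rule, assuming partition names are compared after lower-casing when case sensitivity is disabled; `ColumnSketch` and its `build` method are hypothetical stand-ins and are not taken from the connector's source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for the connector's DeltaColumn (name, type, partitionKey).
final class ColumnSketch {
    final String name;
    final String type;
    final boolean partitionKey;

    ColumnSketch(String name, String type, boolean partitionKey) {
        this.name = name;
        this.type = type;
        this.partitionKey = partitionKey;
    }

    // schemaFields: column name -> type name, in schema order.
    // partitionNames: keys observed in a scan file's partition values.
    // caseSensitive: assumed meaning of isCaseSensitivePartitionsEnabled().
    static List<ColumnSketch> build(Map<String, String> schemaFields,
                                    Set<String> partitionNames,
                                    boolean caseSensitive) {
        Set<String> keys = partitionNames.stream()
                .map(n -> normalize(n, caseSensitive))
                .collect(Collectors.toSet());
        List<ColumnSketch> columns = new ArrayList<>();
        for (Map.Entry<String, String> field : schemaFields.entrySet()) {
            boolean isPartition = keys.contains(normalize(field.getKey(), caseSensitive));
            columns.add(new ColumnSketch(field.getKey(), field.getValue(), isPartition));
        }
        return columns;
    }

    // Lower-case names only when matching is case-insensitive.
    private static String normalize(String name, boolean caseSensitive) {
        return caseSensitive ? name : name.toLowerCase(Locale.ROOT);
    }
}
```

With a schema column `Region` and a partition key `region`, this sketch marks the column as a partition key only when `caseSensitive` is `false`.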
Updated class diagram for DeltaClient snapshot interactions
classDiagram
class DeltaClient {
+Optional~DeltaTable~ getTable(DeltaConfig config, SchemaTableName schemaTableName)
+CloseableIterator~FilteredColumnarBatch~ listFiles(ConnectorSession session, DeltaTable deltaTable)
-static Snapshot getSnapshotAsOfTimestamp(Table deltaTable, Engine deltaEngine, long timestampMillis)
-static List~DeltaColumn~ getSchema(DeltaConfig config, SchemaTableName tableName, Engine deltaEngine, Snapshot snapshot)
}
class DeltaTable {
+Optional~Long~ getSnapshotId()
+Snapshot getLatestSnapshot(Engine deltaEngine)
+Snapshot getSnapshotAsOfVersion(Engine deltaEngine, long version)
}
class Snapshot {
+long getVersion()
+ScanBuilder getScanBuilder()
+StructType getSchema()
}
class Engine {
+static Optional~Engine~ get()
}
class ScanBuilder {
+Scan build()
}
class Scan {
+CloseableIterator~FilteredColumnarBatch~ getScanFiles(Engine deltaEngine)
}
class DeltaColumn {
+String name
+String type
+boolean partitionKey
}
class DeltaConfig {
+boolean isCaseSensitivePartitionsEnabled()
}
class SchemaTableName {
+String getSchemaName()
+String getTableName()
}
class Table {
+Snapshot getSnapshot(Engine deltaEngine)
}
DeltaClient --> DeltaTable : uses
DeltaClient --> Snapshot : uses
DeltaClient --> Engine : uses
DeltaClient --> ScanBuilder : uses
DeltaClient --> DeltaConfig : uses
DeltaClient --> SchemaTableName : uses
DeltaTable --> Snapshot : returns
Snapshot --> ScanBuilder : returns
ScanBuilder --> Scan : builds
Scan --> FilteredColumnarBatch : iterates
Snapshot --> StructType : returns schema
DeltaClient --> DeltaColumn : constructs
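The call shape shown in the diagrams can be sketched with stand-in interfaces. The types below only mirror the names and signatures from the class diagram; they are hypothetical local declarations, not the real `io.delta.kernel` classes, and `String` is a placeholder for `FilteredColumnarBatch`. The point is that the table lookups and `getScanFiles` still take an `Engine`, while the `Snapshot` accessors do not.

```java
import java.util.Iterator;

// Stand-in types that only mirror the class diagram; NOT the real Delta Kernel API.
interface Engine {}

interface Scan {
    Iterator<String> getScanFiles(Engine engine); // String stands in for FilteredColumnarBatch
}

interface ScanBuilder {
    Scan build();
}

interface Snapshot {
    long getVersion();            // no Engine argument, as in the diagram
    ScanBuilder getScanBuilder(); // no Engine argument, as in the diagram
}

interface Table {
    Snapshot getLatestSnapshot(Engine engine);
    Snapshot getSnapshotAsOfVersion(Engine engine, long version);
}

final class SnapshotFlow {
    // Step 1 (getTable in the diagram): resolve the latest snapshot and pin its version.
    static long pinVersion(Table table, Engine engine) {
        return table.getLatestSnapshot(engine).getVersion();
    }

    // Step 2 (listFiles in the diagram): reopen the pinned version and iterate its scan files.
    static Iterator<String> listFiles(Table table, Engine engine, long pinnedVersion) {
        Snapshot snapshot = table.getSnapshotAsOfVersion(engine, pinnedVersion);
        return snapshot.getScanBuilder().build().getScanFiles(engine);
    }
}
```

Pinning the version first and reopening it later keeps file listing consistent with the table handle even if new commits land in between.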
File-Level Changes
Force-pushed from 5cf31b2 to bff33af
Contributor
Author
Thank you @agrawalreetika for the review and approval!
Contributor
@agrawalreetika imported this issue as lakehouse/presto #27434
tdcmeehan
approved these changes
Mar 31, 2026
hantangwangd
approved these changes
Mar 31, 2026
Member
Thanks @imsayari404
This was referenced Mar 31, 2026
bibith4 pushed a commit to bibith4/presto that referenced this pull request Apr 1, 2026
… Kernel API and updating Snapshot method calls (prestodb#27434)

## Description

A bug was identified in the Delta Lake connector's snapshot handling when a checkpoint file is present in the `_delta_log` directory. When Delta Lake performs checkpointing (periodically compacting the transaction log into a .checkpoint.parquet file), the Presto Delta connector was failing to correctly read the snapshot state from the checkpoint. This caused the scan to return no files/data, even though the table metadata was resolved successfully.

## Motivation and Context

Resolved the below error:

```
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2737)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3888)
    at org.apache.hadoop.fs.PrestoFileSystemCache.createFileSystem(PrestoFileSystemCache.java:149)
    at org.apache.hadoop.fs.PrestoFileSystemCache.getInternal(PrestoFileSystemCache.java:108)
    at org.apache.hadoop.fs.PrestoFileSystemCache.get(PrestoFileSystemCache.java:62)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:612)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:366)
    at org.apache.parquet.hadoop.ParquetReader$Builder.build(ParquetReader.java:346)
    at io.delta.kernel.defaults.internal.parquet.ParquetFileReader$1.initParquetReaderIfRequired(ParquetFileReader.java:143)
    at io.delta.kernel.defaults.internal.parquet.ParquetFileReader$1.hasNext(ParquetFileReader.java:73)
    at io.delta.kernel.defaults.engine.DefaultParquetHandler$1.hasNext(DefaultParquetHandler.java:71)
    at io.delta.kernel.defaults.engine.DefaultParquetHandler$1.hasNext(DefaultParquetHandler.java:82)
    at io.delta.kernel.internal.replay.ActionsIterator$1.hasNext(ActionsIterator.java:238)
    at io.delta.kernel.internal.replay.ActionsIterator$2.hasNext(ActionsIterator.java:373)
    at io.delta.kernel.internal.replay.ActionsIterator.tryEnsureNextActionsIterIsReady(ActionsIterator.java:154)
    at io.delta.kernel.internal.replay.ActionsIterator.hasNext(ActionsIterator.java:98)
    at io.delta.kernel.internal.replay.LogReplay.loadTableProtocolAndMetadata(LogReplay.java:212)
    at io.delta.kernel.internal.replay.LogReplay.<init>(LogReplay.java:134)
    at io.delta.kernel.internal.snapshot.SnapshotManager.createSnapshot(SnapshotManager.java:535)
    at io.delta.kernel.internal.snapshot.SnapshotManager.getCoordinatedCommitsAwareSnapshot(SnapshotManager.java:496)
    at io.delta.kernel.internal.snapshot.SnapshotManager.lambda$getSnapshotAtInit$14(SnapshotManager.java:485)
    at java.base/java.util.Optional.map(Optional.java:260)
    at io.delta.kernel.internal.snapshot.SnapshotManager.getSnapshotAtInit(SnapshotManager.java:485)
    at io.delta.kernel.internal.snapshot.SnapshotManager.buildLatestSnapshot(SnapshotManager.java:126)
    at io.delta.kernel.internal.TableImpl.getLatestSnapshot(TableImpl.java:93)
    at com.facebook.presto.delta.DeltaClient.getSnapshot(DeltaClient.java:122)
    at com.facebook.presto.delta.DeltaClient.getTable(DeltaClient.java:91)
    at com.facebook.presto.delta.DeltaMetadata.getTableHandle(DeltaMetadata.java:221)
    at com.facebook.presto.delta.DeltaMetadata.getTableHandle(DeltaMetadata.java:73)
    at com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata.getTableHandle(ClassLoaderSafeConnectorMetadata.java:234)
    at com.facebook.presto.metadata.MetadataUtil.lambda$getOptionalTableHandle$4(MetadataUtil.java:178)
    at java.base/java.util.Optional.orElseGet(Optional.java:364)
    at com.facebook.presto.metadata.MetadataUtil.getOptionalTableHandle(MetadataUtil.java:178)
    at com.facebook.presto.metadata.MetadataManager$1.getTableHandle(MetadataManager.java:1666)
    at com.facebook.presto.util.MetadataUtils.lambda$getTableColumnMetadata$2(MetadataUtils.java:83)
    at com.facebook.presto.common.RuntimeStats.recordWallTime(RuntimeStats.java:140)
    at com.facebook.presto.util.MetadataUtils.getTableColumnMetadata(MetadataUtils.java:81)
    at com.facebook.presto.util.MetadataUtils.getTableColumnsMetadata(MetadataUtils.java:54)
    at com.facebook.presto.sql.analyzer.StatementAnalyzer$Visitor.visitTable(StatementAnalyzer.java:2275)
    at com.facebook.presto.sql.analyzer.StatementAnalyzer$Visitor.visitTable(StatementAnalyzer.java:455)
    at com.facebook.presto.sql.tree.Table.accept(Table.java:60)
    at com.facebook.presto.sql.tree.AstVisitor.process(AstVisitor.java:27)
    at com.facebook.presto.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:477)
    at com.facebook.presto.sql.analyzer.StatementAnalyzer$Visitor.analyzeFrom(StatementAnalyzer.java:4277)
    at com.facebook.presto.sql.analyzer.StatementAnalyzer$Visitor.visitQuerySpecification(StatementAnalyzer.java:2955)
    at com.facebook.presto.sql.analyzer.StatementAnalyzer$Visitor.visitQuerySpecification(StatementAnalyzer.java:455)
    at com.facebook.presto.sql.tree.QuerySpecification.accept(QuerySpecification.java:138)
    at com.facebook.presto.sql.tree.AstVisitor.process(AstVisitor.java:27)
    at com.facebook.presto.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:477)
    at com.facebook.presto.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:485)
    at com.facebook.presto.sql.analyzer.StatementAnalyzer$Visitor.visitQuery(StatementAnalyzer.java:1509)
    at com.facebook.presto.sql.analyzer.StatementAnalyzer$Visitor.visitQuery(StatementAnalyzer.java:455)
    at com.facebook.presto.sql.tree.Query.accept(Query.java:105)
    at com.facebook.presto.sql.tree.AstVisitor.process(AstVisitor.java:27)
    at com.facebook.presto.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:477)
    at com.facebook.presto.sql.analyzer.StatementAnalyzer.analyze(StatementAnalyzer.java:427)
    at com.facebook.presto.sql.analyzer.StatementAnalyzer.analyze(StatementAnalyzer.java:447)
    at com.facebook.presto.sql.analyzer.Analyzer.analyzeSemantic(Analyzer.java:141)
    at com.facebook.presto.sql.analyzer.BuiltInQueryAnalyzer.analyze(BuiltInQueryAnalyzer.java:134)
    at com.facebook.presto.execution.SqlQueryExecution.lambda$new$0(SqlQueryExecution.java:220)
    at com.facebook.presto.common.RuntimeStats.recordWallAndCpuTime(RuntimeStats.java:158)
    at com.facebook.presto.execution.SqlQueryExecution.<init>(SqlQueryExecution.java:220)
    at com.facebook.presto.execution.SqlQueryExecution.<init>(SqlQueryExecution.java:112)
    at com.facebook.presto.execution.SqlQueryExecution$SqlQueryExecutionFactory.createQueryExecution(SqlQueryExecution.java:1022)
    at com.facebook.presto.dispatcher.LocalDispatchQueryFactory.lambda$createDispatchQuery$0(LocalDispatchQueryFactory.java:171)
    at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
    at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:75)
    at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2641)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2735)
    ... 70 more
```

## Impact

<!---Describe any public API or user-facing feature change or any performance impact-->

## Test Plan

<!---Please fill in how you tested your change-->

## Contributor checklist

- [ ] Please make sure your submission complies with our [contributing guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md), in particular [code style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style) and [commit standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards).
- [ ] PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
- [ ] Documented new properties (with its default value), SQL syntax, functions, or other functionality.
- [ ] If release notes are required, they follow the [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines).
- [ ] Adequate tests were added if applicable.
- [ ] CI passed.
- [ ] If adding new dependencies, verified they have an [OpenSSF Scorecard](https://securityscorecards.dev/#the-checks) score of 5.0 or higher (or obtained explicit TSC approval for lower scores).

## Release Notes

Please follow [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines) and fill in the release notes below.

```
== NO RELEASE NOTE ==
```
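The root cause in the trace above is a `ClassNotFoundException` for `org.apache.hadoop.fs.s3a.S3AFileSystem`, thrown while Hadoop's `Configuration` resolves the filesystem class. When debugging this kind of failure, a `Class.forName` probe against a specific class loader can confirm whether the class is visible from that loader. The helper below is a minimal diagnostic sketch; `ClasspathProbe` is a hypothetical name and is not part of Presto, and which class loader to probe (for example the connector plugin's loader) is an assumption that depends on where the check runs.

```java
// Hypothetical diagnostic helper, not part of Presto or Delta Kernel.
final class ClasspathProbe {
    // Returns true if className can be loaded by the given loader (without initializing it).
    static boolean isLoadable(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the thread context class loader is the loader of interest here.
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        String name = "org.apache.hadoop.fs.s3a.S3AFileSystem";
        System.out.println(name + " loadable: " + isLoadable(name, loader));
    }
}
```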
15 tasks
Description
A bug was identified in the Delta Lake connector's snapshot handling when a checkpoint file is present in the `_delta_log` directory. When Delta Lake performs checkpointing (periodically compacting the transaction log into a `.checkpoint.parquet` file), the Presto Delta connector failed to read the snapshot state from the checkpoint correctly. This caused the scan to return no files/data, even though the table metadata was resolved successfully.
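To make the checkpoint behaviour concrete: commit files in `_delta_log` are named with a zero-padded 20-digit version and a `.json` suffix, and a single-file checkpoint uses the same version with a `.checkpoint.parquet` suffix. A reader rebuilds a snapshot from the newest checkpoint plus the JSON commits after it. The sketch below shows only that selection step in plain Java. `DeltaLogPlan` and its methods are hypothetical, this is not how Delta Kernel implements log replay, and multi-part checkpoints and the `_last_checkpoint` hint file are deliberately ignored.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of Presto or Delta Kernel.
final class DeltaLogPlan {
    private static final Pattern COMMIT = Pattern.compile("(\\d{20})\\.json");
    private static final Pattern CHECKPOINT = Pattern.compile("(\\d{20})\\.checkpoint\\.parquet");

    // Highest version that has a single-file checkpoint, if any.
    static OptionalLong latestCheckpoint(List<String> fileNames) {
        return fileNames.stream()
                .map(CHECKPOINT::matcher)
                .filter(Matcher::matches)
                .mapToLong(m -> Long.parseLong(m.group(1)))
                .max();
    }

    // Commit versions that must be replayed on top of the checkpoint, ascending.
    static List<Long> commitsToReplay(List<String> fileNames) {
        long start = latestCheckpoint(fileNames).orElse(-1L);
        List<Long> versions = new ArrayList<>();
        for (String name : fileNames) {
            Matcher m = COMMIT.matcher(name);
            if (m.matches()) {
                long version = Long.parseLong(m.group(1));
                if (version > start) {
                    versions.add(version);
                }
            }
        }
        Collections.sort(versions);
        return versions;
    }
}
```

If the checkpoint Parquet file cannot be read, everything up to that version is missing from the reconstructed state, which is why a failure in the Parquet reader surfaces during snapshot creation.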
Motivation and Context
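Resolved the following error, raised while the Delta Kernel Parquet reader was loading the table snapshot: `java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found`.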