Skip to content

Commit cadf38e

Browse files
vinay-klebyhr
authored andcommitted
Add handling for absolute paths in the Delta transaction log
1 parent a0bcfb8 commit cadf38e

File tree

7 files changed

+467
-16
lines changed

7 files changed

+467
-16
lines changed

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,9 +358,13 @@ private List<DeltaLakeSplit> splitsForFile(
358358

359359
public static Location buildSplitPath(Location tableLocation, AddFileEntry addAction)
360360
{
361-
// paths are relative to the table location and are RFC 2396 URIs
361+
// paths are relative to the table location or absolute in case of shallow cloned table and are RFC 2396 URIs
362362
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file
363363
URI uri = URI.create(addAction.getPath());
364+
365+
if (uri.isAbsolute()) {
366+
return Location.of(uri.getScheme() + ":" + uri.getSchemeSpecificPart());
367+
}
364368
return tableLocation.appendPath(uri.getPath());
365369
}
366370
}

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesSplitSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ private Stream<ConnectorSplit> prepareSplits(long currentVersion, long tableRead
112112
if (!containsRemoveEntry) {
113113
for (DeltaLakeTransactionLogEntry entry : entries) {
114114
if (entry.getAdd() != null && entry.getAdd().isDataChange()) {
115+
// paths can be absolute as well in case of shallow-cloned tables
115116
AddFileEntry addEntry = entry.getAdd();
116117
splits.add(mapToDeltaLakeTableChangesSplit(
117118
commitInfo,

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,8 @@ private void doVacuum(
210210
alwaysFalse())) {
211211
retainedPaths = Stream.concat(
212212
activeAddEntries
213+
// paths can be absolute as well in case of shallow-cloned tables, and they shouldn't be deleted as part of vacuum because according to
214+
// delta-protocol absolute paths are inherited from base table and the vacuum procedure should only list and delete local file references
213215
.map(AddFileEntry::getPath),
214216
transactionLogAccess.getJsonEntries(
215217
fileSystem,

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/AddFileEntry.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ else if (stats.isPresent()) {
114114
this.parsedStats = resultParsedStats;
115115
}
116116

117+
/**
118+
* @see <a href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file">Delta Lake protocol</a>
119+
*/
117120
@JsonProperty
118121
public String getPath()
119122
{

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,45 @@ public class TestDeltaLakeSplitManager
101101
0);
102102
private final HiveTransactionHandle transactionHandle = new HiveTransactionHandle(true);
103103

104+
@Test
105+
public void testAbsolutePathSplits()
106+
throws Exception
107+
{
108+
testAbsolutePathSplits("file://path/to/file", "file://path/to/file");
109+
testAbsolutePathSplits("abfs://[email protected]/path/to/file", "abfs://[email protected]/path/to/file");
110+
testAbsolutePathSplits("hdfs://path/to/file", "hdfs://path/to/file");
111+
testAbsolutePathSplits("s3://my-s3-bucket/path/to//file", "s3://my-s3-bucket/path/to//file");
112+
testAbsolutePathSplits("s3://my-s3-bucket/path/to//file/", "s3://my-s3-bucket/path/to//file/");
113+
testAbsolutePathSplits("gs://my-gcp-bucket/path/to/file", "gs://my-gcp-bucket/path/to/file");
114+
testAbsolutePathSplits("abfs://[email protected]/+ab+/a%25/a%2525/path/to/file", "abfs://[email protected]/+ab+/a%/a%25/path/to/file");
115+
}
116+
117+
private void testAbsolutePathSplits(String absoluteRawEncodedFilePath, String absoluteDecodedParsedFilePath)
118+
throws Exception
119+
{
120+
long fileSize = 20_000;
121+
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(absoluteRawEncodedFilePath, fileSize));
122+
DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
123+
.setMaxSplitSize(DataSize.ofBytes(5_000));
124+
double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();
125+
126+
DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig);
127+
List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
128+
List<DeltaLakeSplit> expected = ImmutableList.of(
129+
makeSplit(absoluteDecodedParsedFilePath, 0, 5_000, fileSize, minimumAssignedSplitWeight),
130+
makeSplit(absoluteDecodedParsedFilePath, 5_000, 5_000, fileSize, minimumAssignedSplitWeight),
131+
makeSplit(absoluteDecodedParsedFilePath, 10_000, 5_000, fileSize, minimumAssignedSplitWeight),
132+
makeSplit(absoluteDecodedParsedFilePath, 15_000, 5_000, fileSize, minimumAssignedSplitWeight));
133+
134+
assertThat(splits).isEqualTo(expected);
135+
}
136+
104137
@Test
105138
public void testSplitSizes()
106139
throws ExecutionException, InterruptedException
107140
{
108141
long fileSize = 50_000;
109-
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(fileSize));
142+
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(FILE_PATH, fileSize));
110143
DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
111144
.setMaxSplitSize(DataSize.ofBytes(20_000));
112145
double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();
@@ -115,9 +148,9 @@ public void testSplitSizes()
115148
List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
116149

117150
List<DeltaLakeSplit> expected = ImmutableList.of(
118-
makeSplit(0, 20_000, fileSize, minimumAssignedSplitWeight),
119-
makeSplit(20_000, 20_000, fileSize, minimumAssignedSplitWeight),
120-
makeSplit(40_000, 10_000, fileSize, minimumAssignedSplitWeight));
151+
makeSplit(FULL_PATH, 0, 20_000, fileSize, minimumAssignedSplitWeight),
152+
makeSplit(FULL_PATH, 20_000, 20_000, fileSize, minimumAssignedSplitWeight),
153+
makeSplit(FULL_PATH, 40_000, 10_000, fileSize, minimumAssignedSplitWeight));
121154

122155
assertThat(splits).isEqualTo(expected);
123156
}
@@ -128,7 +161,7 @@ public void testSplitsFromMultipleFiles()
128161
{
129162
long firstFileSize = 1_000;
130163
long secondFileSize = 20_000;
131-
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(firstFileSize), addFileEntryOfSize(secondFileSize));
164+
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(FILE_PATH, firstFileSize), addFileEntryOfSize(FILE_PATH, secondFileSize));
132165
DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
133166
.setMaxSplitSize(DataSize.ofBytes(10_000));
134167
double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();
@@ -137,9 +170,9 @@ public void testSplitsFromMultipleFiles()
137170

138171
List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
139172
List<DeltaLakeSplit> expected = ImmutableList.of(
140-
makeSplit(0, 1_000, firstFileSize, minimumAssignedSplitWeight),
141-
makeSplit(0, 10_000, secondFileSize, minimumAssignedSplitWeight),
142-
makeSplit(10_000, 10_000, secondFileSize, minimumAssignedSplitWeight));
173+
makeSplit(FULL_PATH, 0, 1_000, firstFileSize, minimumAssignedSplitWeight),
174+
makeSplit(FULL_PATH, 0, 10_000, secondFileSize, minimumAssignedSplitWeight),
175+
makeSplit(FULL_PATH, 10_000, 10_000, secondFileSize, minimumAssignedSplitWeight));
143176
assertThat(splits).isEqualTo(expected);
144177
}
145178

@@ -211,15 +244,15 @@ public Stream<AddFileEntry> getActiveFiles(
211244
new DefaultCachingHostAddressProvider());
212245
}
213246

214-
private AddFileEntry addFileEntryOfSize(long fileSize)
247+
private AddFileEntry addFileEntryOfSize(String path, long fileSize)
215248
{
216-
return new AddFileEntry(FILE_PATH, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of(), Optional.empty());
249+
return new AddFileEntry(path, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of(), Optional.empty());
217250
}
218251

219-
private DeltaLakeSplit makeSplit(long start, long splitSize, long fileSize, double minimumAssignedSplitWeight)
252+
private DeltaLakeSplit makeSplit(String path, long start, long splitSize, long fileSize, double minimumAssignedSplitWeight)
220253
{
221254
SplitWeight splitWeight = SplitWeight.fromProportion(clamp((double) fileSize / splitSize, minimumAssignedSplitWeight, 1.0));
222-
return new DeltaLakeSplit(FULL_PATH, start, splitSize, fileSize, Optional.empty(), 0, Optional.empty(), splitWeight, TupleDomain.all(), ImmutableMap.of());
255+
return new DeltaLakeSplit(path, start, splitSize, fileSize, Optional.empty(), 0, Optional.empty(), splitWeight, TupleDomain.all(), ImmutableMap.of());
223256
}
224257

225258
private List<DeltaLakeSplit> getSplits(DeltaLakeSplitManager splitManager, DeltaLakeConfig deltaLakeConfig)

0 commit comments

Comments
 (0)