Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,13 @@ private List<DeltaLakeSplit> splitsForFile(

/**
 * Resolves the file location for an {@code add} action of the transaction log.
 *
 * @param tableLocation base location of the Delta table
 * @param addAction the {@code add} entry whose {@code path} is resolved
 * @return the absolute location of the data file referenced by {@code addAction}
 */
public static Location buildSplitPath(Location tableLocation, AddFileEntry addAction)
{
    // Paths are relative to the table location, or absolute in the case of a shallow-cloned
    // table, and are RFC 2396 URIs:
    // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file
    URI uri = URI.create(addAction.getPath());

    if (uri.isAbsolute()) {
        // Rebuild the location from scheme + scheme-specific part; URI.getSchemeSpecificPart()
        // returns the decoded form, so percent-encoded characters are decoded exactly once
        // (e.g. "a%2525" in the log becomes "a%25" in the split path).
        return Location.of(uri.getScheme() + ":" + uri.getSchemeSpecificPart());
    }
    return tableLocation.appendPath(uri.getPath());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ private Stream<ConnectorSplit> prepareSplits(long currentVersion, long tableRead
if (!containsRemoveEntry) {
for (DeltaLakeTransactionLogEntry entry : entries) {
if (entry.getAdd() != null && entry.getAdd().isDataChange()) {
// paths can be absolute as well in case of shallow-cloned tables
AddFileEntry addEntry = entry.getAdd();
splits.add(mapToDeltaLakeTableChangesSplit(
commitInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ private void doVacuum(
alwaysFalse())) {
retainedPaths = Stream.concat(
activeAddEntries
// paths can be absolute as well in case of shallow-cloned tables, and they shouldn't be deleted as part of vacuum because according to
// delta-protocol absolute paths are inherited from base table and the vacuum procedure should only list and delete local file references
.map(AddFileEntry::getPath),
transactionLogAccess.getJsonEntries(
fileSystem,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ else if (stats.isPresent()) {
this.parsedStats = resultParsedStats;
}

/**
* @see <a href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file">Delta Lake protocol</a>
*/
@JsonProperty
public String getPath()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,45 @@ public class TestDeltaLakeSplitManager
0);
private final HiveTransactionHandle transactionHandle = new HiveTransactionHandle(true);

@Test
public void testAbsolutePathSplits()
        throws Exception
{
    // Each pair: raw (possibly percent-encoded) absolute path as stored in the transaction
    // log, and the decoded form expected in the generated splits.
    String[][] rawAndDecodedPaths = {
            {"file://path/to/file", "file://path/to/file"},
            {"abfs://[email protected]/path/to/file", "abfs://[email protected]/path/to/file"},
            {"hdfs://path/to/file", "hdfs://path/to/file"},
            {"s3://my-s3-bucket/path/to//file", "s3://my-s3-bucket/path/to//file"},
            {"s3://my-s3-bucket/path/to//file/", "s3://my-s3-bucket/path/to//file/"},
            {"gs://my-gcp-bucket/path/to/file", "gs://my-gcp-bucket/path/to/file"},
            {"abfs://[email protected]/+ab+/a%25/a%2525/path/to/file", "abfs://[email protected]/+ab+/a%/a%25/path/to/file"},
    };
    for (String[] paths : rawAndDecodedPaths) {
        testAbsolutePathSplits(paths[0], paths[1]);
    }
}

private void testAbsolutePathSplits(String absoluteRawEncodedFilePath, String absoluteDecodedParsedFilePath)
        throws Exception
{
    // A 20 kB file with a 5 kB max split size yields exactly four equal-sized splits,
    // each carrying the decoded absolute path.
    long totalFileSize = 20_000;
    long maxSplitSize = 5_000;

    List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(absoluteRawEncodedFilePath, totalFileSize));
    DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
            .setMaxSplitSize(DataSize.ofBytes(maxSplitSize));
    double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();

    DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig);
    List<DeltaLakeSplit> actualSplits = getSplits(splitManager, deltaLakeConfig);

    ImmutableList.Builder<DeltaLakeSplit> expectedSplits = ImmutableList.builder();
    for (long start = 0; start < totalFileSize; start += maxSplitSize) {
        expectedSplits.add(makeSplit(absoluteDecodedParsedFilePath, start, maxSplitSize, totalFileSize, minimumAssignedSplitWeight));
    }

    assertThat(actualSplits).isEqualTo(expectedSplits.build());
}

@Test
public void testSplitSizes()
throws ExecutionException, InterruptedException
{
long fileSize = 50_000;
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(fileSize));
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(FILE_PATH, fileSize));
DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
.setMaxSplitSize(DataSize.ofBytes(20_000));
double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();
Expand All @@ -115,9 +148,9 @@ public void testSplitSizes()
List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);

List<DeltaLakeSplit> expected = ImmutableList.of(
makeSplit(0, 20_000, fileSize, minimumAssignedSplitWeight),
makeSplit(20_000, 20_000, fileSize, minimumAssignedSplitWeight),
makeSplit(40_000, 10_000, fileSize, minimumAssignedSplitWeight));
makeSplit(FULL_PATH, 0, 20_000, fileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 20_000, 20_000, fileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 40_000, 10_000, fileSize, minimumAssignedSplitWeight));

assertThat(splits).isEqualTo(expected);
}
Expand All @@ -128,7 +161,7 @@ public void testSplitsFromMultipleFiles()
{
long firstFileSize = 1_000;
long secondFileSize = 20_000;
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(firstFileSize), addFileEntryOfSize(secondFileSize));
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(FILE_PATH, firstFileSize), addFileEntryOfSize(FILE_PATH, secondFileSize));
DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
.setMaxSplitSize(DataSize.ofBytes(10_000));
double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();
Expand All @@ -137,9 +170,9 @@ public void testSplitsFromMultipleFiles()

List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
List<DeltaLakeSplit> expected = ImmutableList.of(
makeSplit(0, 1_000, firstFileSize, minimumAssignedSplitWeight),
makeSplit(0, 10_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(10_000, 10_000, secondFileSize, minimumAssignedSplitWeight));
makeSplit(FULL_PATH, 0, 1_000, firstFileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 0, 10_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 10_000, 10_000, secondFileSize, minimumAssignedSplitWeight));
assertThat(splits).isEqualTo(expected);
}

Expand Down Expand Up @@ -211,15 +244,15 @@ public Stream<AddFileEntry> getActiveFiles(
new DefaultCachingHostAddressProvider());
}

private AddFileEntry addFileEntryOfSize(long fileSize)
private AddFileEntry addFileEntryOfSize(String path, long fileSize)
{
return new AddFileEntry(FILE_PATH, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of(), Optional.empty());
return new AddFileEntry(path, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of(), Optional.empty());
}

private DeltaLakeSplit makeSplit(long start, long splitSize, long fileSize, double minimumAssignedSplitWeight)
private DeltaLakeSplit makeSplit(String path, long start, long splitSize, long fileSize, double minimumAssignedSplitWeight)
{
SplitWeight splitWeight = SplitWeight.fromProportion(clamp((double) fileSize / splitSize, minimumAssignedSplitWeight, 1.0));
return new DeltaLakeSplit(FULL_PATH, start, splitSize, fileSize, Optional.empty(), 0, Optional.empty(), splitWeight, TupleDomain.all(), ImmutableMap.of());
return new DeltaLakeSplit(path, start, splitSize, fileSize, Optional.empty(), 0, Optional.empty(), splitWeight, TupleDomain.all(), ImmutableMap.of());
}

private List<DeltaLakeSplit> getSplits(DeltaLakeSplitManager splitManager, DeltaLakeConfig deltaLakeConfig)
Expand Down
Loading