Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
*/
package io.trino.plugin.deltalake;

import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeJsonFileStatistics;

import java.util.List;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

Expand All @@ -25,7 +27,8 @@ public record DataFileInfo(
long creationTime,
io.trino.plugin.deltalake.DataFileInfo.DataFileType dataFileType,
List<String> partitionValues,
DeltaLakeJsonFileStatistics statistics)
DeltaLakeJsonFileStatistics statistics,
Optional<DeletionVectorEntry> deletionVector)
{
public enum DataFileType
{
Expand All @@ -37,5 +40,6 @@ public enum DataFileType
{
requireNonNull(dataFileType, "dataFileType is null");
requireNonNull(statistics, "statistics is null");
requireNonNull(deletionVector, "deletionVector is null");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.concurrent.MoreFutures;
import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;
Expand All @@ -22,12 +23,19 @@
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoInputFile;
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.metadata.BlockMetadata;
import io.trino.parquet.metadata.ParquetMetadata;
import io.trino.parquet.reader.MetadataReader;
import io.trino.parquet.writer.ParquetWriterOptions;
import io.trino.plugin.deltalake.delete.RoaringBitmapArray;
import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
import io.trino.plugin.hive.FileFormatDataSourceStats;
import io.trino.plugin.hive.ReaderPageSource;
import io.trino.plugin.hive.parquet.ParquetFileWriter;
import io.trino.plugin.hive.parquet.ParquetPageSourceFactory;
import io.trino.plugin.hive.parquet.TrinoParquetDataSource;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
Expand All @@ -42,11 +50,10 @@
import jakarta.annotation.Nullable;
import org.apache.parquet.format.CompressionCodec;
import org.joda.time.DateTimeZone;
import org.roaringbitmap.longlong.LongBitmapDataProvider;
import org.roaringbitmap.longlong.Roaring64Bitmap;

import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -66,12 +73,17 @@
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.SYNTHESIZED;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_FILESYSTEM_ERROR;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.relativePath;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getCompressionCodec;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetWriterBlockSize;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetWriterPageSize;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetWriterPageValueCount;
import static io.trino.plugin.deltalake.DeltaLakeTypes.toParquetType;
import static io.trino.plugin.deltalake.DeltaLakeWriter.readStatistics;
import static io.trino.plugin.deltalake.delete.DeletionVectors.readDeletionVectors;
import static io.trino.plugin.deltalake.delete.DeletionVectors.toFileName;
import static io.trino.plugin.deltalake.delete.DeletionVectors.writeDeletionVectors;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.deserializePartitionValue;
import static io.trino.spi.block.RowBlock.getRowFieldsFromBlock;
import static io.trino.spi.predicate.Utils.nativeValueToBlock;
Expand Down Expand Up @@ -113,6 +125,10 @@ public class DeltaLakeMergeSink
private final int[] dataColumnsIndices;
private final int[] dataAndRowIdColumnsIndices;
private final DeltaLakeParquetSchemaMapping parquetSchemaMapping;
private final FileFormatDataSourceStats fileFormatDataSourceStats;
private final ParquetReaderOptions parquetReaderOptions;
private final boolean deletionVectorEnabled;
private final Map<String, DeletionVectorEntry> deletionVectors;

@Nullable
private DeltaLakeCdfPageSink cdfPageSink;
Expand All @@ -132,7 +148,11 @@ public DeltaLakeMergeSink(
int domainCompactionThreshold,
Supplier<DeltaLakeCdfPageSink> cdfPageSinkSupplier,
boolean cdfEnabled,
DeltaLakeParquetSchemaMapping parquetSchemaMapping)
DeltaLakeParquetSchemaMapping parquetSchemaMapping,
ParquetReaderOptions parquetReaderOptions,
FileFormatDataSourceStats fileFormatDataSourceStats,
boolean deletionVectorEnabled,
Map<String, DeletionVectorEntry> deletionVectors)
{
this.typeOperators = requireNonNull(typeOperators, "typeOperators is null");
this.session = requireNonNull(session, "session is null");
Expand All @@ -156,6 +176,10 @@ public DeltaLakeMergeSink(
this.cdfPageSinkSupplier = requireNonNull(cdfPageSinkSupplier);
this.cdfEnabled = cdfEnabled;
this.parquetSchemaMapping = requireNonNull(parquetSchemaMapping, "parquetSchemaMapping is null");
this.fileFormatDataSourceStats = requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null");
this.parquetReaderOptions = requireNonNull(parquetReaderOptions, "parquetReaderOptions is null");
this.deletionVectorEnabled = deletionVectorEnabled;
this.deletionVectors = ImmutableMap.copyOf(requireNonNull(deletionVectors, "deletionVectors is null"));
dataColumnsIndices = new int[tableColumnCount];
dataAndRowIdColumnsIndices = new int[tableColumnCount + 1];
for (int i = 0; i < tableColumnCount; i++) {
Expand Down Expand Up @@ -213,10 +237,10 @@ private void processDeletion(Page deletions, String cdfOperation)
FileDeletion deletion = fileDeletions.computeIfAbsent(filePath, _ -> new FileDeletion(partitionValues));

if (cdfOperation.equals(UPDATE_PREIMAGE_CDF_LABEL)) {
deletion.rowsDeletedByUpdate().addLong(rowPosition);
deletion.rowsDeletedByUpdate().add(rowPosition);
}
else {
deletion.rowsDeletedByDelete().addLong(rowPosition);
deletion.rowsDeletedByDelete().add(rowPosition);
}
}
}
Expand Down Expand Up @@ -308,8 +332,14 @@ public CompletableFuture<Collection<Slice>> finish()
.map(Slices::wrappedBuffer)
.forEach(fragments::add);

fileDeletions.forEach((path, deletion) ->
fragments.addAll(rewriteFile(path.toStringUtf8(), deletion)));
fileDeletions.forEach((path, deletion) -> {
if (deletionVectorEnabled) {
fragments.add(writeMergeResult(path, deletion));
}
else {
fragments.addAll(rewriteFile(path.toStringUtf8(), deletion));
}
});

if (cdfEnabled && cdfPageSink != null) { // cdf may be enabled but there may be no update/deletion so sink was not instantiated
MoreFutures.getDone(cdfPageSink.finish()).stream()
Expand All @@ -324,6 +354,82 @@ public CompletableFuture<Collection<Slice>> finish()
return completedFuture(fragments);
}

private Slice writeMergeResult(Slice path, FileDeletion deletion)
{
RoaringBitmapArray deletedRows = loadDeletionVector(Location.of(path.toStringUtf8()));
deletedRows.or(deletion.rowsDeletedByDelete());
deletedRows.or(deletion.rowsDeletedByUpdate());
Comment on lines +359 to +361
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a discussion with @findepi about whether, if the number of rows deleted from a file exceeds a certain threshold, we should proactively rewrite the file instead of accumulating deletion vectors.
Is there a potential follow-up here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a potential follow-up task. I don't expect we will handle it shortly though.


TrinoInputFile inputFile = fileSystem.newInputFile(Location.of(path.toStringUtf8()));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know size of deletion vector in advance from io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry#sizeInBytes or any other metadata ?
If we do, then using it in newInputFile would probably save a FS call.

Copy link
Copy Markdown
Member Author

@ebyhr ebyhr Aug 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This inputFile is a Parquet file, not deletion vector. I think that's possible, but it requires some refactoring. Let me handle in a follow-up.

try (ParquetDataSource dataSource = new TrinoParquetDataSource(inputFile, parquetReaderOptions, fileFormatDataSourceStats)) {
ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
long rowCount = parquetMetadata.getBlocks().stream().map(BlockMetadata::rowCount).mapToLong(Long::longValue).sum();
RoaringBitmapArray rowsRetained = new RoaringBitmapArray();
rowsRetained.addRange(0, rowCount);
rowsRetained.andNot(deletedRows);
if (rowsRetained.isEmpty()) {
// No rows are retained in the file, so we don't need to write deletion vectors.
return onlySourceFile(path.toStringUtf8(), deletion);
}
return writeDeletionVector(path.toStringUtf8(), inputFile.length(), inputFile.lastModified(), deletedRows, deletion, parquetMetadata, rowCount);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we write a new deletion vector, is it mandatory as per the spec that it should be a union of all deleted/updated rows so far, or is that just how we've implemented it ?
Is cleanup of old deletion vectors something already handled by some optimize/vaccum procedure in Trino ?

Copy link
Copy Markdown
Member Author

@ebyhr ebyhr Aug 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's mandatory for compatibility with Delta Lake. Otherwise, they return the wrong results if I remember correctly.

The cleanup should be handled in #22809

}
catch (IOException e) {
throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, "Error reading Parquet file: " + path, e);
}
}

/**
 * Builds a merge-result fragment that keeps the source data file as-is and attaches a new
 * deletion vector covering all rows deleted so far (pre-existing plus this merge's deletions).
 * <p>
 * The deletion vector file is written first. If constructing the merge result fails afterwards,
 * the freshly written deletion vector file is deleted on a best-effort basis so no orphan file
 * is left behind; any cleanup failure is attached as a suppressed exception.
 *
 * @param sourcePath absolute path of the existing Parquet data file
 * @param length size in bytes of the data file (reused in the re-registered file entry)
 * @param lastModified modification time of the data file
 * @param deletedRows union of previously deleted rows and rows deleted/updated by this merge
 * @param deletion per-file deletion bookkeeping, including partition values
 * @param parquetMetadata footer metadata of the data file, used to recompute statistics
 * @param rowCount total number of rows in the data file
 * @return JSON-encoded {@code DeltaLakeMergeResult} as a UTF-8 slice
 */
private Slice writeDeletionVector(
        String sourcePath,
        long length,
        Instant lastModified,
        RoaringBitmapArray deletedRows,
        FileDeletion deletion,
        ParquetMetadata parquetMetadata,
        long rowCount)
{
    String tablePath = rootTableLocation.toString();
    String sourceRelativePath = relativePath(tablePath, sourcePath);

    DeletionVectorEntry deletionVectorEntry;
    try {
        deletionVectorEntry = writeDeletionVectors(fileSystem, rootTableLocation, deletedRows);
    }
    catch (IOException e) {
        throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Unable to write deletion vector file", e);
    }

    try {
        // The data file itself is unchanged; it is re-registered with the deletion vector attached.
        // Use the partitionValues() accessor for consistency with onlySourceFile().
        DataFileInfo newFileInfo = new DataFileInfo(
                sourceRelativePath,
                length,
                lastModified.toEpochMilli(),
                DATA,
                deletion.partitionValues(),
                readStatistics(parquetMetadata, dataColumns, rowCount),
                Optional.of(deletionVectorEntry));
        DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.of(newFileInfo));
        return utf8Slice(mergeResultJsonCodec.toJson(result));
    }
    catch (Throwable e) {
        // Best-effort cleanup of the deletion vector file written above
        try {
            fileSystem.deleteFile(rootTableLocation.appendPath(toFileName(deletionVectorEntry.pathOrInlineDv())));
        }
        catch (IOException ex) {
            // Reference identity is the intended check (Throwable does not override equals):
            // never add an exception as its own suppressed exception
            if (e != ex) {
                e.addSuppressed(ex);
            }
        }
        throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Unable to write deletion vector file", e);
    }
}

/**
 * Emits a merge result that removes the source file entirely: every row in it was deleted,
 * so the transaction log needs only a "remove" entry and no replacement data file.
 */
private Slice onlySourceFile(String sourcePath, FileDeletion deletion)
{
    String tablePath = rootTableLocation.toString();
    String removedFileRelativePath = relativePath(tablePath, sourcePath);
    DeltaLakeMergeResult removal = new DeltaLakeMergeResult(
            deletion.partitionValues(),
            Optional.of(removedFileRelativePath),
            Optional.empty());
    return utf8Slice(mergeResultJsonCodec.toJson(removal));
}

// In spite of the name "Delta" Lake, we must rewrite the entire file to delete rows.
private List<Slice> rewriteFile(String sourcePath, FileDeletion deletion)
{
Expand Down Expand Up @@ -395,11 +501,26 @@ private ParquetFileWriter createParquetFileWriter(Location path, List<DeltaLakeC
}
}

/**
 * Loads the pre-existing deletion vector registered for the given data file, or returns an
 * empty bitmap when the file has no deletion vector entry in this merge handle.
 */
private RoaringBitmapArray loadDeletionVector(Location path)
{
    String dataFileRelativePath = relativePath(rootTableLocation.toString(), path.toString());
    DeletionVectorEntry entry = deletionVectors.get(dataFileRelativePath);
    if (entry == null) {
        // No deletion vector recorded for this file yet — nothing deleted so far
        return new RoaringBitmapArray();
    }
    try {
        return readDeletionVectors(fileSystem, rootTableLocation, entry);
    }
    catch (IOException e) {
        throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, "Error reading deletion vector file: " + path, e);
    }
}

private Optional<DataFileInfo> rewriteParquetFile(Location path, FileDeletion deletion, DeltaLakeWriter fileWriter)
throws IOException
{
LongBitmapDataProvider rowsDeletedByDelete = deletion.rowsDeletedByDelete();
LongBitmapDataProvider rowsDeletedByUpdate = deletion.rowsDeletedByUpdate();
RoaringBitmapArray rowsDeletedByDelete = deletion.rowsDeletedByDelete();
RoaringBitmapArray rowsDeletedByUpdate = deletion.rowsDeletedByUpdate();
try (ConnectorPageSource connectorPageSource = createParquetPageSource(path).get()) {
long filePosition = 0;
while (!connectorPageSource.isFinished()) {
Expand All @@ -410,8 +531,8 @@ private Optional<DataFileInfo> rewriteParquetFile(Location path, FileDeletion de

int positionCount = page.getPositionCount();
int[] retained = new int[positionCount];
int[] deletedByDelete = new int[(int) rowsDeletedByDelete.getLongCardinality()];
int[] deletedByUpdate = new int[(int) rowsDeletedByUpdate.getLongCardinality()];
int[] deletedByDelete = new int[(int) rowsDeletedByDelete.cardinality()];
int[] deletedByUpdate = new int[(int) rowsDeletedByUpdate.cardinality()];
int retainedCount = 0;
int deletedByUpdateCount = 0;
int deletedByDeleteCount = 0;
Expand Down Expand Up @@ -529,8 +650,8 @@ public void abort()
private static class FileDeletion
{
private final List<String> partitionValues;
private final LongBitmapDataProvider rowsDeletedByDelete = new Roaring64Bitmap();
private final LongBitmapDataProvider rowsDeletedByUpdate = new Roaring64Bitmap();
private final RoaringBitmapArray rowsDeletedByDelete = new RoaringBitmapArray();
private final RoaringBitmapArray rowsDeletedByUpdate = new RoaringBitmapArray();
Comment thread
ebyhr marked this conversation as resolved.
Outdated

private FileDeletion(List<String> partitionValues)
{
Expand All @@ -544,12 +665,12 @@ public List<String> partitionValues()
return partitionValues;
}

public LongBitmapDataProvider rowsDeletedByDelete()
public RoaringBitmapArray rowsDeletedByDelete()
{
return rowsDeletedByDelete;
}

public LongBitmapDataProvider rowsDeletedByUpdate()
public RoaringBitmapArray rowsDeletedByUpdate()
{
return rowsDeletedByUpdate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,26 @@
*/
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableMap;
import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorTableHandle;

import java.util.Map;

import static java.util.Objects.requireNonNull;

public record DeltaLakeMergeTableHandle(
DeltaLakeTableHandle tableHandle,
DeltaLakeInsertTableHandle insertTableHandle)
DeltaLakeInsertTableHandle insertTableHandle,
Map<String, DeletionVectorEntry> deletionVectors)
implements ConnectorMergeTableHandle
{
// Compact canonical constructor: validates required components and defensively copies
// the deletion vector map into an immutable snapshot so the handle is safely shareable.
// NOTE(review): keys appear to be data-file relative paths (see lookup in
// DeltaLakeMergeSink.loadDeletionVector) — confirm against the producer of this map.
public DeltaLakeMergeTableHandle
{
requireNonNull(tableHandle, "tableHandle is null");
requireNonNull(insertTableHandle, "insertTableHandle is null");
// ImmutableMap.copyOf also rejects null keys/values
deletionVectors = ImmutableMap.copyOf(requireNonNull(deletionVectors, "deletionVectors is null"));
}

@Override
Expand Down
Loading