Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.SortOrder;

Expand All @@ -33,7 +34,8 @@ public record CommitTaskData(
Optional<String> referencedDataFile,
Optional<List<Long>> fileSplitOffsets,
int sortOrderId,
Optional<byte[]> serializedDeletionVector)
Optional<byte[]> serializedDeletionVector,
Optional<RewriteInfo> rewriteInfo)
{
public CommitTaskData
{
Expand All @@ -47,5 +49,65 @@ public record CommitTaskData(
requireNonNull(fileSplitOffsets, "fileSplitOffsets is null");
checkArgument(content == FileContent.DATA || sortOrderId == SortOrder.unsorted().orderId(), "Sorted order id can be present only for data files");
requireNonNull(serializedDeletionVector, "serializedDeletionVector is null");
requireNonNull(rewriteInfo, "rewriteInfo is null");
checkArgument(rewriteInfo.isEmpty() || content == FileContent.DATA, "rewriteInfo can only be present for DATA content, got %s", content);
}

/**
 * Convenience constructor for commit tasks that carry no copy-on-write
 * rewrite information; {@code rewriteInfo} defaults to {@link Optional#empty()}.
 */
public CommitTaskData(
        String path,
        IcebergFileFormat fileFormat,
        long fileSizeInBytes,
        MetricsWrapper metrics,
        String partitionSpecJson,
        Optional<String> partitionDataJson,
        FileContent content,
        Optional<String> referencedDataFile,
        Optional<List<Long>> fileSplitOffsets,
        int sortOrderId,
        Optional<byte[]> serializedDeletionVector)
{
    this(
            path,
            fileFormat,
            fileSizeInBytes,
            metrics,
            partitionSpecJson,
            partitionDataJson,
            content,
            referencedDataFile,
            fileSplitOffsets,
            sortOrderId,
            serializedDeletionVector,
            Optional.empty());
}

/**
 * Describes the source data file replaced during a copy-on-write rewrite,
 * together with any delete files that become dangling once the old data
 * file is removed from the table.
 */
public record RewriteInfo(
        String oldFilePath,
        long oldFileSizeInBytes,
        long oldRecordCount,
        IcebergFileFormat oldFileFormat,
        List<DanglingDeleteFile> danglingDeleteFiles)
{
    public RewriteInfo(
            String oldFilePath,
            long oldFileSizeInBytes,
            long oldRecordCount,
            IcebergFileFormat oldFileFormat,
            List<DanglingDeleteFile> danglingDeleteFiles)
    {
        this.oldFilePath = requireNonNull(oldFilePath, "oldFilePath is null");
        this.oldFileFormat = requireNonNull(oldFileFormat, "oldFileFormat is null");
        checkArgument(oldFileSizeInBytes >= 0, "oldFileSizeInBytes is negative");
        checkArgument(oldRecordCount >= 0, "oldRecordCount is negative");
        this.oldFileSizeInBytes = oldFileSizeInBytes;
        this.oldRecordCount = oldRecordCount;
        // Defensive snapshot so the record stays immutable even if the caller
        // passed a mutable list
        this.danglingDeleteFiles = ImmutableList.copyOf(requireNonNull(danglingDeleteFiles, "danglingDeleteFiles is null"));
    }

    /**
     * Creates a {@code RewriteInfo} with no dangling delete files.
     */
    public RewriteInfo(String oldFilePath, long oldFileSizeInBytes, long oldRecordCount, IcebergFileFormat oldFileFormat)
    {
        this(oldFilePath, oldFileSizeInBytes, oldRecordCount, oldFileFormat, ImmutableList.of());
    }
}

/**
 * A delete file that will no longer apply to any live data file after a
 * copy-on-write rewrite removes the data file it referenced.
 */
public record DanglingDeleteFile(
        String path,
        long fileSizeInBytes,
        long recordCount,
        String partitionSpecJson,
        Optional<String> partitionDataJson,
        Long contentOffset,
        Long contentSizeInBytes,
        String referencedDataFile)
{
    public DanglingDeleteFile(
            String path,
            long fileSizeInBytes,
            long recordCount,
            String partitionSpecJson,
            Optional<String> partitionDataJson,
            Long contentOffset,
            Long contentSizeInBytes,
            String referencedDataFile)
    {
        this.path = requireNonNull(path, "path is null");
        this.fileSizeInBytes = fileSizeInBytes;
        this.recordCount = recordCount;
        this.partitionSpecJson = requireNonNull(partitionSpecJson, "partitionSpecJson is null");
        this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
        // NOTE(review): contentOffset, contentSizeInBytes and referencedDataFile
        // are intentionally left unchecked -- presumably nullable for delete
        // files that lack those attributes; confirm against the callers
        this.contentOffset = contentOffset;
        this.contentSizeInBytes = contentSizeInBytes;
        this.referencedDataFile = referencedDataFile;
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.inject.Inject;
import io.airlift.log.Logger;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.iceberg.delete.DeleteFile;
import io.trino.plugin.iceberg.delete.DeletionVector;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.SourcePage;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.mapping.NameMapping;

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;

/**
* Reads an Iceberg data file, filters out deleted positions, and writes the
* surviving rows to a new data file. Used by workers during copy-on-write
* DML operations to rewrite affected data files before the coordinator commits.
*
* <p>This class is stateless and thread-safe; a single instance is shared
* across all queries in the connector.
*/
public class CopyOnWriteFileRewriter
{
private static final Logger log = Logger.get(CopyOnWriteFileRewriter.class);
private final IcebergPageSourceProviderFactory pageSourceProviderFactory;
private final IcebergFileWriterFactory fileWriterFactory;
private final IcebergFileSystemFactory fileSystemFactory;
private final TypeManager typeManager;

@Inject
public CopyOnWriteFileRewriter(
IcebergPageSourceProviderFactory pageSourceProviderFactory,
IcebergFileWriterFactory fileWriterFactory,
IcebergFileSystemFactory fileSystemFactory,
TypeManager typeManager)
{
this.pageSourceProviderFactory = requireNonNull(pageSourceProviderFactory, "pageSourceProviderFactory is null");
this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
}

public record RewriteMetrics(
long originalRecordCount,
long newRecordCount,
long deletedRowCount,
long originalFileSizeBytes,
long newFileSizeBytes,
long rewriteDurationMs) {}

public record RewriteResult(
Optional<DataFile> newFile,
Closeable rollbackAction,
RewriteMetrics metrics,
Metrics newFileMetrics) {}

/**
* Reads {@code originalPath}, filters rows marked in {@code deletionVector},
* and writes surviving rows to a new data file.
*
* @return a {@link RewriteResult} containing the new DataFile (empty if
* all rows were deleted), metrics, and a rollback action that
* deletes the newly written file on failure
*/
public RewriteResult rewriteFile(
ConnectorSession session,
String originalPath,
long originalFileSize,
DeletionVector deletionVector,
Schema schema,
PartitionSpec partitionSpec,
Optional<PartitionData> partitionData,
List<DeleteFile> preExistingDeletes,
long sourceDataSequenceNumber,
OptionalLong sourceFileFirstRowId,
IcebergFileFormat sourceFileFormat,
IcebergFileFormat writeFileFormat,
MetricsConfig metricsConfig,
Map<String, String> fileIoProperties,
Map<String, String> tableProperties,
LocationProvider locationProvider,
Optional<NameMapping> nameMapping)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
{
long startTimeMs = System.currentTimeMillis();
TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), requireNonNull(fileIoProperties, "fileIoProperties is null"));

String fileName = writeFileFormat.toIceberg().addExtension(session.getQueryId() + "-" + randomUUID());
String newFilePath = partitionData
.map(partition -> locationProvider.newDataLocation(partitionSpec, partition, fileName))
.orElseGet(() -> locationProvider.newDataLocation(fileName));

IcebergFileWriter writer = fileWriterFactory.createDataFileWriter(
fileSystem, Location.of(newFilePath), schema, session,
writeFileFormat, metricsConfig, tableProperties);

long originalRecordCount = readFilterWrite(
session, writer, originalPath, originalFileSize, deletionVector,
schema, partitionSpec, partitionData,
preExistingDeletes, sourceDataSequenceNumber, sourceFileFirstRowId,
sourceFileFormat, fileIoProperties, nameMapping);

Closeable rollbackAction = writer.commit();
IcebergFileWriter.FileMetrics fileMetrics = writer.getFileMetrics();
Metrics fullMetrics = fileMetrics.metrics();
long newRecordCount = fullMetrics.recordCount();
long newFileSize = writer.getWrittenBytes();
Optional<DataFile> newFile = Optional.empty();
if (newRecordCount > 0) {
DataFiles.Builder newBuilder = DataFiles.builder(partitionSpec)
.withPath(newFilePath)
.withFormat(writeFileFormat.toIceberg())
.withFileSizeInBytes(newFileSize)
.withMetrics(fullMetrics);
fileMetrics.splitOffsets().ifPresent(newBuilder::withSplitOffsets);
partitionData.ifPresent(newBuilder::withPartition);
newFile = Optional.of(newBuilder.build());
}
else {
// All rows were deleted -- clean up the empty output file to avoid orphans
try {
rollbackAction.close();
}
catch (IOException e) {
log.warn(e, "Failed to delete empty CoW rewrite output file: %s", newFilePath);
}
rollbackAction = () -> {};
}

long durationMs = System.currentTimeMillis() - startTimeMs;
long deletedRowCount = originalRecordCount - newRecordCount;
RewriteMetrics metrics = new RewriteMetrics(
originalRecordCount, newRecordCount, deletedRowCount,
originalFileSize, newFileSize, durationMs);

log.debug("CoW rewrite: %s -> %s (%d rows -> %d rows, %d deleted, %d bytes -> %d bytes, %d ms)",
originalPath, newFile.isPresent() ? newFilePath : "<removed>",
originalRecordCount, newRecordCount, deletedRowCount,
originalFileSize, newFileSize, durationMs);

return new RewriteResult(newFile, rollbackAction, metrics, fullMetrics);
}

private long readFilterWrite(
ConnectorSession session,
IcebergFileWriter writer,
String originalPath,
long originalFileSize,
DeletionVector deletionVector,
Schema schema,
PartitionSpec partitionSpec,
Optional<PartitionData> partitionData,
List<DeleteFile> preExistingDeletes,
long sourceDataSequenceNumber,
OptionalLong sourceFileFirstRowId,
IcebergFileFormat fileFormat,
Map<String, String> fileIoProperties,
Optional<NameMapping> nameMapping)
{
List<IcebergColumnHandle> dataColumns = schema.columns().stream()
.map(column -> getColumnHandle(column, typeManager))
.collect(toImmutableList());
List<IcebergColumnHandle> readColumns = new ArrayList<>(dataColumns);
readColumns.add(getColumnHandle(MetadataColumns.ROW_POSITION, typeManager));

IcebergPageSourceProvider pageSourceProvider = pageSourceProviderFactory.createPageSourceProvider();

long originalRecordCount = 0;
try (ConnectorPageSource source = pageSourceProvider.createPageSource(
session, readColumns, schema, partitionSpec, partitionData.orElseGet(() -> new PartitionData(new Object[0])),
preExistingDeletes, DynamicFilter.EMPTY, TupleDomain.all(), TupleDomain.all(),
originalPath, 0, originalFileSize, originalFileSize, 0,
fileFormat, fileIoProperties, sourceDataSequenceNumber, sourceFileFirstRowId, nameMapping)) {
int dataColumnCount = dataColumns.size();
int[] dataChannels = new int[dataColumnCount];
for (int channel = 0; channel < dataColumnCount; channel++) {
dataChannels[channel] = channel;
}
while (!source.isFinished()) {
SourcePage sourcePage = source.getNextSourcePage();
if (sourcePage == null) {
continue;
}
Page page = sourcePage.getPage();
Page dataPage = page.getColumns(dataChannels);
int positionCount = dataPage.getPositionCount();
originalRecordCount += positionCount;
Block rowPositionBlock = page.getBlock(dataColumnCount);

int[] kept = new int[positionCount];
int keptCount = 0;
for (int i = 0; i < positionCount; i++) {
long rowPosition = BIGINT.getLong(rowPositionBlock, i);
if (!deletionVector.isRowDeleted(rowPosition)) {
kept[keptCount++] = i;
}
}

if (keptCount > 0) {
writer.appendRows(dataPage.getPositions(kept, 0, keptCount));
}
}
}
catch (Throwable t) {
try {
writer.rollback();
}
catch (Throwable rollbackFailure) {
t.addSuppressed(rollbackFailure);
}
if (t instanceof IOException ioException) {
throw new UncheckedIOException("Failed to rewrite data file: " + originalPath, ioException);
}
if (t instanceof RuntimeException runtimeException) {
throw runtimeException;
}
if (t instanceof Error error) {
throw error;
}
throw new RuntimeException("Failed to rewrite data file: " + originalPath, t);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
return originalRecordCount;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.trino.plugin.iceberg;

import com.google.inject.BindingAnnotation;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
 * Guice binding annotation qualifying injection points related to Iceberg
 * copy-on-write rewrite support.
 *
 * <p>NOTE(review): the concrete binding target is not visible here --
 * see the connector module that uses this qualifier for what it binds.
 */
@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@BindingAnnotation
public @interface ForIcebergCopyOnWriteRewrite {}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public class IcebergColumnHandle
public static final int DATA_CHANGE_ORDINAL_ID = Integer.MIN_VALUE + 6;

public static final int TRINO_MERGE_SOURCE_ROW_ID = Integer.MIN_VALUE + 7;
public static final int TRINO_MERGE_FILE_FORMAT = Integer.MIN_VALUE + 8;
public static final int TRINO_MERGE_FILE_SIZE = Integer.MIN_VALUE + 9;
public static final int TRINO_MERGE_FILE_RECORD_COUNT = Integer.MIN_VALUE + 10;
public static final int TRINO_MERGE_DATA_SEQUENCE_NUMBER = Integer.MIN_VALUE + 11;
public static final int TRINO_MERGE_FILE_FIRST_ROW_ID = Integer.MIN_VALUE + 12;

private final ColumnIdentity baseColumnIdentity;
private final Type baseType;
Expand Down
Loading
Loading