132 changes: 132 additions & 0 deletions plugin/trino-iceberg/COPY_ON_WRITE.md
# Trino Iceberg Connector - Copy-on-Write Support

## Overview

The Trino Iceberg connector now supports both **Copy-on-Write (CoW)** and **Merge-on-Read (MoR)** strategies for row-level operations (DELETE, UPDATE, and MERGE). This enables users to optimize performance based on their specific workload patterns.

| Mode | Best For | Advantages | Trade-offs |
|------|----------|------------|------------|
| **Copy-on-Write** | Read-heavy workloads, dimension tables | Fast reads (no merge overhead) | Slower writes (file rewriting) |
| **Merge-on-Read** | Write-heavy workloads, streaming data | Fast writes (small delete files) | Slower reads (merge at query time) |
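
As a rough illustration of the trade-off (the numbers and helper below are hypothetical, not connector internals): a CoW DELETE rewrites every data file that contains a matched row, while a MoR DELETE writes only a small position-delete entry per deleted row.

```java
// Back-of-the-envelope comparison of bytes written per DELETE.
// All sizes are illustrative assumptions, not connector measurements.
public class WriteCostSketch
{
    // CoW: every data file containing a matched row is rewritten in full
    static long cowBytesWritten(long[] affectedFileSizes)
    {
        long total = 0;
        for (long size : affectedFileSizes) {
            total += size;
        }
        return total;
    }

    // MoR: roughly one position-delete entry (file path + row position) per deleted row
    static long morBytesWritten(long deletedRows, long bytesPerDeleteEntry)
    {
        return deletedRows * bytesPerDeleteEntry;
    }

    public static void main(String[] args)
    {
        long cow = cowBytesWritten(new long[] {128L << 20, 128L << 20}); // two 128 MiB files touched
        long mor = morBytesWritten(10, 64); // 10 deleted rows, ~64 bytes per entry
        System.out.println(cow / mor); // write amplification factor
    }
}
```

Deleting ten rows spread across two 128 MiB files costs hundreds of megabytes of writes under CoW but well under a kilobyte under MoR; CoW pays that price once at write time, which is why its reads stay fast.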

## Quick Start

### Enable Copy-on-Write

```sql
-- Enable CoW for specific operations (table-level)
ALTER TABLE my_table SET PROPERTIES write_delete_mode = 'copy-on-write';
ALTER TABLE my_table SET PROPERTIES write_update_mode = 'copy-on-write';
ALTER TABLE my_table SET PROPERTIES write_merge_mode = 'copy-on-write';

-- Create table with CoW enabled
CREATE TABLE events (
id BIGINT,
event_time TIMESTAMP,
data VARCHAR
)
WITH (
format_version = 2, -- Required for row-level operations
write_delete_mode = 'copy-on-write'
);
```

### Usage Examples

```sql
-- Delete with CoW (rewrites affected files)
DELETE FROM events WHERE event_time < TIMESTAMP '2023-01-01';

-- Update with CoW (rewrites affected files with updated rows)
UPDATE events SET data = 'updated' WHERE id = 123;

-- Merge with CoW (rewrites affected files with merged data)
MERGE INTO events t
USING new_events s ON t.id = s.id
WHEN MATCHED THEN UPDATE SET data = s.data
WHEN NOT MATCHED THEN INSERT (id, event_time, data) VALUES (s.id, s.event_time, s.data);
```

## Configuration

### Table Properties

| Property | Description | Default | Values |
|----------|-------------|---------|--------|
| `write_delete_mode` | Controls DELETE behavior | `merge-on-read` | `merge-on-read`, `copy-on-write` |
| `write_update_mode` | Controls UPDATE behavior | `merge-on-read` | `merge-on-read`, `copy-on-write` |
| `write_merge_mode` | Controls MERGE behavior | `merge-on-read` | `merge-on-read`, `copy-on-write` |

**Note:** Requires Iceberg format version 2 or higher.

## Performance Considerations

### When to Choose Copy-on-Write

- Read-heavy analytical workloads
- Dimension tables with infrequent updates
- When read performance is prioritized over write performance
- When storage cost is not a primary concern

### Performance Tuning

1. **Compact small files** before CoW operations:
```sql
ALTER TABLE schema.table EXECUTE optimize;
```

2. **Use partitioning** to limit rewrite scope to affected partitions

3. **Batch operations** to minimize file rewrites:
```sql
-- Better: One batch update
UPDATE events SET data = 'updated' WHERE id IN (1, 2, 3, 4, 5);

-- Worse: Multiple small updates
UPDATE events SET data = 'updated' WHERE id = 1;
UPDATE events SET data = 'updated' WHERE id = 2;
-- etc.
```

4. **Monitor file counts** using system tables:
```sql
SELECT * FROM "schema"."table$files";
```

## Implementation Architecture

CoW operations rewrite affected data files in full, omitting deleted rows and writing updated rows with their new values:

1. **Detection**: Identifies DELETE, UPDATE, or MERGE operations with CoW mode
2. **Manifest Scan**: Scans table manifests to locate the affected data files
3. **Position Delete Processing**: Tracks deleted row positions in a bitmap
4. **File Rewriting**: Reads original files, filters out deleted rows, writes new files
5. **Atomic Commit**: Uses Iceberg's RewriteFiles API for consistent state
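
Steps 3 and 4 can be sketched with a plain `BitSet` standing in for the connector's deletion vector (class and method names here are illustrative, not the actual implementation):

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Illustrative CoW rewrite: deleted row positions are tracked in a bitmap,
// then surviving rows are copied into a replacement file (a List here).
public class CowRewriteSketch
{
    static List<String> rewriteExcludingDeleted(List<String> originalRows, BitSet deletedPositions)
    {
        List<String> rewritten = new ArrayList<>();
        for (int position = 0; position < originalRows.size(); position++) {
            if (!deletedPositions.get(position)) {
                rewritten.add(originalRows.get(position)); // row survives the rewrite
            }
        }
        return rewritten;
    }
}
```

The real rewrite streams Parquet/ORC data rather than materializing rows, but the filtering logic is the same: a row is dropped exactly when its position is set in the delete bitmap.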

## Current Limitations

1. **Equality Deletes**: Only position deletes supported (value-based deletes not yet implemented)
2. **Format Version**: Requires Iceberg format v2 or higher
3. **File-Level Rewrites**: Entire files rewritten, not row groups
4. **Manifest Scanning**: Linear scan for file lookup (no caching/indexing)

## Troubleshooting

### Common Issues

- **"Could not find X file(s) to delete in table manifests"**
Cause: Concurrent modification or file cleanup
Solution: Retry operation (Iceberg handles conflicts automatically)

- **Slow CoW operations**
Cause: Large files or many small files
Solution: Run `ALTER TABLE ... EXECUTE optimize` before CoW operations, or increase worker resources

- **High write amplification**
Expected behavior: CoW rewrites entire files
Solution: Use MoR for write-heavy workloads, and batch operations to reduce the number of rewrites

## References

- **Iceberg Spec**: [Row-level Deletes](https://iceberg.apache.org/spec/#row-level-deletes)
- **Related Issues**: Fixes: #26161
@@ -205,6 +205,8 @@ private PositionDeleteWriter createPositionDeleteWriter(String dataFilePath, Par
private Slice writePositionDeletes(PositionDeleteWriter writer, DeletionVector rowsToDelete)
{
try {
// PositionDeleteWriter.write() already commits the writer internally
// Calling commit() again would cause "Cannot commit transaction: last operation has not committed" error
CommitTaskData task = writer.write(rowsToDelete);
writtenBytes += task.fileSizeInBytes();
return wrappedBuffer(jsonCodec.toJsonBytes(task));
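The comment above documents a double-commit pitfall; a toy model (hypothetical class, not the connector's `PositionDeleteWriter`) shows why a second explicit `commit()` must fail when `write()` already commits internally:

```java
// Toy writer whose write() commits internally, mirroring the behavior
// described in the comment above; calling commit() again is rejected.
public class SelfCommittingWriter
{
    private boolean committed;

    public long write(long[] deletedPositions)
    {
        long bytes = deletedPositions.length * 8L; // pretend each position serializes to 8 bytes
        commit(); // write() finishes the transaction itself
        return bytes;
    }

    public void commit()
    {
        if (committed) {
            throw new IllegalStateException("Cannot commit transaction: last operation has not committed");
        }
        committed = true;
    }
}
```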
@@ -52,6 +52,8 @@
import io.trino.plugin.iceberg.delete.DeletionVectorWriter;
import io.trino.plugin.iceberg.delete.DeletionVectorWriter.DeletionVectorInfo;
import io.trino.plugin.iceberg.functions.IcebergFunctionProvider;
import io.trino.plugin.iceberg.UpdateKind;
import io.trino.plugin.iceberg.UpdateMode;
import io.trino.plugin.iceberg.procedure.IcebergAddFilesFromTableHandle;
import io.trino.plugin.iceberg.procedure.IcebergAddFilesHandle;
import io.trino.plugin.iceberg.procedure.IcebergDropExtendedStatsHandle;
@@ -429,7 +431,10 @@
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNullElse;
import static java.util.UUID.randomUUID;
import static java.util.function.Function.identity;
import static io.airlift.slice.Slices.utf8Slice;
import static java.util.stream.Collectors.joining;
import static org.apache.iceberg.IcebergManifestUtils.liveEntries;
import static org.apache.iceberg.MetadataTableType.ALL_ENTRIES;
@@ -518,6 +523,9 @@ public class IcebergMetadata
private final Duration materializedViewRefreshSnapshotRetentionPeriod;
private final Map<IcebergTableHandle, AtomicReference<TableStatistics>> tableStatisticsCache = new ConcurrentHashMap<>();
private final DeletionVectorWriter deletionVectorWriter;
private final IcebergPageSourceProvider pageSourceProvider;
private final IcebergFileWriterFactory fileWriterFactory;
private static final Logger log = Logger.get(IcebergMetadata.class);

private Transaction transaction;
private OptionalLong fromSnapshotForRefresh = OptionalLong.empty();
@@ -538,7 +546,9 @@ public IcebergMetadata(
ExecutorService icebergPlanningExecutor,
ExecutorService icebergFileDeleteExecutor,
int materializedViewRefreshMaxSnapshotsToExpire,
Duration materializedViewRefreshSnapshotRetentionPeriod)
Duration materializedViewRefreshSnapshotRetentionPeriod,
IcebergPageSourceProvider pageSourceProvider,
IcebergFileWriterFactory fileWriterFactory)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
@@ -556,6 +566,8 @@ public IcebergMetadata(
this.deletionVectorWriter = requireNonNull(deletionVectorWriter, "deletionVectorWriter is null");
this.materializedViewRefreshMaxSnapshotsToExpire = materializedViewRefreshMaxSnapshotsToExpire;
this.materializedViewRefreshSnapshotRetentionPeriod = materializedViewRefreshSnapshotRetentionPeriod;
this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
}

@Override
@@ -52,6 +52,8 @@ public class IcebergMetadataFactory
private final DeletionVectorWriter deletionVectorWriter;
private final int materializedViewRefreshMaxSnapshotsToExpire;
private final Duration materializedViewRefreshSnapshotRetentionPeriod;
private final IcebergPageSourceProviderFactory pageSourceProviderFactory;
private final IcebergFileWriterFactory fileWriterFactory;

@Inject
public IcebergMetadataFactory(
@@ -67,6 +69,8 @@ public IcebergMetadataFactory(
@ForIcebergMetadata ExecutorService metadataExecutorService,
@ForIcebergPlanning ExecutorService icebergPlanningExecutor,
@ForIcebergFileDelete ExecutorService icebergFileDeleteExecutor,
IcebergPageSourceProviderFactory pageSourceProviderFactory,
IcebergFileWriterFactory fileWriterFactory,
IcebergConfig config)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
@@ -96,6 +100,8 @@ public IcebergMetadataFactory(
this.icebergFileDeleteExecutor = requireNonNull(icebergFileDeleteExecutor, "icebergFileDeleteExecutor is null");
this.materializedViewRefreshMaxSnapshotsToExpire = config.getMaterializedViewRefreshMaxSnapshotsToExpire();
this.materializedViewRefreshSnapshotRetentionPeriod = config.getMaterializedViewRefreshSnapshotRetentionPeriod();
this.pageSourceProviderFactory = requireNonNull(pageSourceProviderFactory, "pageSourceProviderFactory is null");
this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
}

public IcebergMetadata create(ConnectorIdentity identity)
@@ -116,6 +122,8 @@ public IcebergMetadata create(ConnectorIdentity identity)
icebergPlanningExecutor,
icebergFileDeleteExecutor,
materializedViewRefreshMaxSnapshotsToExpire,
materializedViewRefreshSnapshotRetentionPeriod);
materializedViewRefreshSnapshotRetentionPeriod,
(IcebergPageSourceProvider) pageSourceProviderFactory.createPageSourceProvider(),
fileWriterFactory);
}
}
@@ -467,7 +467,16 @@ private TupleDomain<IcebergColumnHandle> prunePredicate(
}

Set<IcebergColumnHandle> partitionColumns = partitionKeys.keySet().stream()
.map(fieldId -> getColumnHandle(tableSchema.findField(fieldId), typeManager))
.map(fieldId -> {
Types.NestedField field = tableSchema.findField(fieldId);
if (field == null) {
// This can happen if the field no longer exists in the schema due to schema evolution
// Return null and filter out nulls below
return null;
}
return getColumnHandle(field, typeManager);
})
.filter(Objects::nonNull)
.collect(toImmutableSet());
Supplier<Map<ColumnHandle, NullableValue>> partitionValues = memoize(() -> getPartitionValues(partitionColumns, partitionKeys));
if (!partitionMatchesPredicate(partitionColumns, partitionValues, unenforcedPredicate)) {
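The map-to-null-then-filter idiom used above can be shown in isolation (illustrative types, not the connector's `IcebergColumnHandle` machinery): field ids dropped by schema evolution resolve to null and are filtered out before collecting.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch of resolving partition field ids against an evolved schema:
// ids no longer present map to null and are dropped before collecting.
public class SchemaEvolutionSketch
{
    static Set<String> resolveExistingFields(Set<Integer> fieldIds, Map<Integer, String> schemaById)
    {
        return fieldIds.stream()
                .map(schemaById::get) // null when the field no longer exists
                .filter(Objects::nonNull)
                .collect(Collectors.toSet());
    }
}
```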