Add support for concurrent write on Iceberg transformed column #24160

Merged

@pajaks pajaks commented Nov 18, 2024

Description

Add support for concurrent writes in scenarios like:

-- create test table in iceberg
create table all_defaults_partitioned
with (
  partitioning = array['month(shipdate)']
)
as select * from tpch.sf1000.lineitem

-- first session
update all_defaults_partitioned
set orderkey = 654
where shipdate = date '1995-01-01'

-- second session 
update all_defaults_partitioned
set orderkey = 765
where shipdate = date '1996-12-01'
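
To illustrate why these two updates should not conflict: per the Iceberg spec, the `month` transform maps a date to the number of months since 1970-01, so the two `shipdate` values land in different partitions. A minimal sketch, assuming a hypothetical helper `monthOrdinal` (it is not part of Trino or Iceberg):

```java
import java.time.LocalDate;

final class MonthTransformSketch {
    // Hypothetical helper: months elapsed since 1970-01, which is how the
    // Iceberg spec defines the result of the month partition transform.
    static int monthOrdinal(LocalDate date) {
        return (date.getYear() - 1970) * 12 + (date.getMonthValue() - 1);
    }

    public static void main(String[] args) {
        int first = monthOrdinal(LocalDate.of(1995, 1, 1));   // first session
        int second = monthOrdinal(LocalDate.of(1996, 12, 1)); // second session
        System.out.println(first);  // prints 300
        System.out.println(second); // prints 323
        System.out.println(first != second ? "disjoint partitions" : "same partition");
    }
}
```

The two sessions resolve to month ordinals 300 and 323, so they touch disjoint partitions.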

Release notes

## Iceberg
* Fix failure when writing concurrently to tables partitioned on [transformed](https://iceberg.apache.org/spec/#partition-transforms) columns

@cla-bot cla-bot bot added the cla-signed label Nov 18, 2024
@github-actions github-actions bot added the iceberg Iceberg connector label Nov 18, 2024
@pajaks pajaks force-pushed the pajaks/iceberg_partition_concurrent_writes branch from 0f00cdd to ff872d7 Compare November 18, 2024 11:10
@@ -2740,7 +2740,8 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col

RowDelta rowDelta = transaction.newRowDelta();
table.getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId()));
- TupleDomain<IcebergColumnHandle> dataColumnPredicate = table.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId()));
+ TupleDomain<IcebergColumnHandle> dataColumnPredicate = table.getEnforcedPredicate().intersect(table.getUnenforcedPredicate())
Contributor

Why do we intersect enforced with the unenforced predicate?
They are rather unrelated.

Member Author

@pajaks pajaks Nov 20, 2024

My understanding is that both carry information about the predicate.
The difference, taken from @raunaqmorarka's explanation:

enforced is the part of the predicate that is guaranteed to be satisfied by the connector, so the engine will not apply it on its side.
unenforced is the part of the predicate that the connector cannot guarantee, even if it is able to use it to reduce output, so the engine will apply it to the connector output.

@raunaqmorarka Correct me if I'm wrong here
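
A toy model of that distinction, assuming a table partitioned by `month(shipdate)`. The `DataFile` record and both methods are hypothetical and only illustrate the idea; they are not Trino APIs. The connector can skip whole files using the partition value, but rows inside a kept file still have to be filtered by the engine:

```java
import java.time.LocalDate;
import java.util.List;
import java.util.function.Predicate;

final class UnenforcedPredicateSketch {
    // Hypothetical stand-in for one data file of a month-partitioned table.
    record DataFile(int year, int month, List<LocalDate> rows) {}

    // Connector side: prune whole files by their month partition value only.
    // Rows inside a kept file are not filtered, so the predicate stays "unenforced".
    static List<LocalDate> connectorScan(List<DataFile> files, LocalDate wanted) {
        return files.stream()
                .filter(f -> f.year() == wanted.getYear() && f.month() == wanted.getMonthValue())
                .flatMap(f -> f.rows().stream())
                .toList();
    }

    // Engine side: re-apply the unenforced predicate to the connector output.
    static List<LocalDate> engineFilter(List<LocalDate> rows, Predicate<LocalDate> predicate) {
        return rows.stream().filter(predicate).toList();
    }

    public static void main(String[] args) {
        LocalDate wanted = LocalDate.of(1995, 1, 1);
        List<DataFile> files = List.of(
                new DataFile(1995, 1, List.of(wanted, LocalDate.of(1995, 1, 15))),
                new DataFile(1996, 12, List.of(LocalDate.of(1996, 12, 1))));
        List<LocalDate> scanned = connectorScan(files, wanted);
        System.out.println("connector output: " + scanned);
        System.out.println("after engine filter: " + engineFilter(scanned, wanted::equals));
    }
}
```

Here the connector drops the 1996-12 file but still returns the 1995-01-15 row, which only the engine-side filter removes.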

Contributor

{2:part:date=[ SortedRangeSet[type=date, ranges=1, {[2024-01-01]}] ]}

I'm checking io.trino.plugin.iceberg.TestIcebergLocalConcurrentWritesTest#testConcurrentUpdateWithOverlappingPartitionTransformation (BTW really cool that we have this battery of new concurrency tests)

While debugging your code, it's unclear to me why the partition predicate is not an "enforced predicate".

Member Author

Marking such predicates on transformed columns as enforced would mean pushing those values down, and the connector would then also need to filter files and rows when reading from the table. That isn't supported right now and would require more work.

Member Author

> They are rather unrelated.

We already join those in IcebergSplitSource:

TupleDomain<IcebergColumnHandle> effectivePredicate = TupleDomain.intersect(

Member

We are able to prune files in IcebergSplitSource based on the unenforced filter, but not rows (hence "unenforced"). Filters on identity-partitioned columns are enforced at the row level because the data is partitioned by value and because of PartitionConstraintMatcher in IcebergSplitSource.
I think it's valid to intersect the unenforced filter here because org.apache.iceberg.MergingSnapshotProducer internally checks whether any files matching the given filter have been added to the table.
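
A rough sketch of the kind of check described above, not Iceberg's actual implementation. `AddedFile`, `mayContain`, and `hasConflict` are hypothetical names; the assumption is that each concurrently added file exposes a min/max range for `shipdate`, and that a commit conflicts only if some added file may match the filter:

```java
import java.time.LocalDate;
import java.util.List;

final class ConflictDetectionSketch {
    // Hypothetical summary of a data file: the smallest and largest shipdate it holds.
    record AddedFile(String path, LocalDate minDate, LocalDate maxDate) {}

    // True when the file's [min, max] range could contain the filtered date.
    static boolean mayContain(AddedFile file, LocalDate filterDate) {
        return !filterDate.isBefore(file.minDate()) && !filterDate.isAfter(file.maxDate());
    }

    // A commit conflicts if any file added since the starting snapshot may match the filter.
    static boolean hasConflict(List<AddedFile> addedSinceSnapshot, LocalDate filterDate) {
        return addedSinceSnapshot.stream().anyMatch(f -> mayContain(f, filterDate));
    }

    public static void main(String[] args) {
        // File written by the other session, covering December 1996 only.
        List<AddedFile> added = List.of(
                new AddedFile("dec-1996.parquet", LocalDate.of(1996, 12, 1), LocalDate.of(1996, 12, 31)));
        System.out.println(hasConflict(added, LocalDate.of(1995, 1, 1)));  // prints false
        System.out.println(hasConflict(added, LocalDate.of(1996, 12, 1))); // prints true
    }
}
```

Because the check works at file granularity, a filter that can only prune files (and not rows) is still precise enough for this purpose.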

@pajaks pajaks force-pushed the pajaks/iceberg_partition_concurrent_writes branch 2 times, most recently from 8da17cc to c9670a6 Compare November 21, 2024 10:40
@pajaks pajaks marked this pull request as ready for review November 21, 2024 10:46
@pajaks pajaks force-pushed the pajaks/iceberg_partition_concurrent_writes branch from c9670a6 to 7623816 Compare November 25, 2024 11:44
@pajaks pajaks force-pushed the pajaks/iceberg_partition_concurrent_writes branch from 7623816 to 5df5282 Compare November 26, 2024 15:07
@pajaks pajaks requested a review from ebyhr November 26, 2024 15:08
@pajaks pajaks force-pushed the pajaks/iceberg_partition_concurrent_writes branch from 5df5282 to bb147d6 Compare November 27, 2024 08:28
Member

@raunaqmorarka raunaqmorarka left a comment

LGTM, but I would like a second opinion on it.
@alexjo2144 @findepi @losipiuk @hashhar @electrum are any of you able to weigh in here?

@pajaks pajaks force-pushed the pajaks/iceberg_partition_concurrent_writes branch from bb147d6 to 8daa87b Compare November 27, 2024 12:43
@pajaks pajaks force-pushed the pajaks/iceberg_partition_concurrent_writes branch from 8daa87b to 36c8476 Compare November 27, 2024 14:10
TupleDomain<IcebergColumnHandle> convertibleUnenforcedPredicate = table.getUnenforcedPredicate().filter((_, domain) -> isConvertableToIcebergExpression(domain));
TupleDomain<IcebergColumnHandle> effectivePredicate = dataColumnPredicate.intersect(convertibleUnenforcedPredicate);
if (!effectivePredicate.isAll()) {
rowDelta.conflictDetectionFilter(toIcebergExpression(effectivePredicate));
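
To show what this intersection does, here is a simplified model. It does not use Trino's `TupleDomain`; a predicate is a hypothetical map from column name to allowed values, where a missing column is unconstrained and an empty map stands for "all". As I read the diff, with only the enforced predicate a filter on a transformed column contributes nothing, so the conflict filter stays unset; intersecting with the unenforced part narrows it:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class PredicateIntersectionSketch {
    // Toy predicate: column name -> allowed values. A missing column is unconstrained,
    // so an empty map plays the role of "all".
    static Map<String, Set<String>> intersect(Map<String, Set<String>> left, Map<String, Set<String>> right) {
        Map<String, Set<String>> result = new HashMap<>();
        left.forEach((column, values) -> result.put(column, new HashSet<>(values)));
        // For columns constrained on both sides, keep only the values allowed by both.
        right.forEach((column, values) -> result.merge(column, new HashSet<>(values), (existing, incoming) -> {
            existing.retainAll(incoming);
            return existing;
        }));
        return result;
    }

    static boolean isAll(Map<String, Set<String>> predicate) {
        return predicate.isEmpty();
    }

    public static void main(String[] args) {
        Map<String, Set<String>> enforced = Map.of(); // nothing enforced for month(shipdate)
        Map<String, Set<String>> unenforced = Map.of("shipdate", Set.of("1995-01-01"));
        Map<String, Set<String>> effective = intersect(enforced, unenforced);
        if (!isAll(effective)) {
            // In the real code this is where the conflict detection filter would be set.
            System.out.println("conflict filter: " + effective);
        }
    }
}
```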
Contributor

Pretty cool @pajaks 🎉

Outstanding test coverage.

Member Author

pajaks commented Nov 28, 2024

@ebyhr @raunaqmorarka Can this be merged?

@ebyhr ebyhr merged commit 5b82e10 into trinodb:master Nov 28, 2024
42 checks passed
@github-actions github-actions bot added this to the 467 milestone Nov 28, 2024