Environment
Trino 451 / Delta Lake Connector
Delta tables generated via Spark 3.5.1 / Delta 3.1.0
Delta tables have Deletion Vectors enabled and are maintained via MERGE statements that update and delete rows
Description
For context, we have a collection of Spark jobs that incrementally snapshot JDBC sources (e.g. MySQL) and update a Delta table via a MERGE statement roughly once a day. All of our MySQL tables have an auto-incrementing primary key id that we use as our merge key.
We use deletion vectors for all of these tables for performance reasons, and have noticed a bug we believe exists within Trino.
In some of our tables, we can find "duplicates" of a given id, even though the aggregate counts line up exactly. Moreover, this only happens when filtering on the merge key, not with any other filters.
For example, say we started snapshotting a MySQL table on 07/29, with its Delta table having 100 rows and a contiguous block of ids 0-99. We can validate via our Spark runtime that this table indeed has no duplicates. Running something like select count(*), count(distinct id) from table in Trino will also return 100 for both, indicating no duplicate ids exist in the table, as expected.
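Purely as an illustration of the kind of check we run (the table and schema names here are placeholders, not our actual tables), the Trino validation query looks like:
select count(*) as total_rows, count(distinct id) as distinct_ids
from delta.scratch.some_table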
However, sometimes querying a certain primary key will yield duplicate results. For example, select * from table where id = 50 would return:
id | filter_column | created_at | updated_at
----------+----------------+------------------------------+-----------------------------
50 | foo | 2022-10-12 09:07:36.000 UTC | 2024-07-29 16:06:20.000 UTC
50 | foo | 2022-10-12 09:07:36.000 UTC | 2024-08-07 00:12:56.000 UTC
In our observations, the query seems to return both the latest version of the row and the first version of the row ever written. However, a similar query in Spark returns the expected single (up-to-date) row, indicating this is an issue with how Trino reads the table and not with the table itself.
To make things a bit more confusing, this doesn't happen if you filter on columns other than the merge key. For instance:
select * from table where filter_column = 'foo' -> returns only the one up-to-date row for id = 50
select * from table where filter_column = 'foo' AND id = 50 -> still returns both rows
This also doesn't
Reproducing
It's hard to build a reproducible case, but this seems to happen to every row in our tables where:
- The table has deletion vectors enabled and is on Delta Lake >= 3.1.0, which is when support for using DVs in MERGE statements was added (release notes)
- The row has been updated via a MERGE statement (see the history check sketched below)
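One way to confirm that a table version was produced by a MERGE is to read the Delta connector's $history metadata table from Trino. This is a sketch rather than something from our pipeline, and the catalog, schema, and table names are placeholders:
select version, timestamp, operation
from delta.scratch."some_table$history"
order by version desc
Versions with operation = 'MERGE' are the ones written by the merge jobs.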
My best attempt to reproduce is below (it looks like Trino just added support for writing DVs, so maybe it's reproducible this way).
Warning
This is a best effort at reproducing and is not indicative of how we generate our Delta Tables. Our Delta Tables are written via Spark, with a call similar to this:
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()

Create the table:
create table delta.scratch.foo (id int, updated_at timestamp)

Enable DVs on the table. Not sure if it's possible to do in Trino.
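As a side note, and as an assumption about the setup rather than something stated above, deletion vectors can be enabled on an existing table from Spark SQL via a table property (the table name is a placeholder for however the table is registered in Spark):
alter table scratch.foo set tblproperties ('delta.enableDeletionVectors' = 'true')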
Insert a row in the table:
insert into delta.scratch.foo values (1, TIMESTAMP '1970-01-01')

Run a merge that would update a row:
MERGE into delta.scratch.foo target using (select 1 as id, timestamp '2024-01-01' as updated_at) u
on (target.id = u.id)
when matched
then update set updated_at = u.updated_at;

If this reproduces the error, you should see something like this:
id | updated_at
----+----------------------------
1 | 2024-01-01 00:00:00.000000
1 | 1970-01-01 00:00:00.000000
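The exact query behind the output above isn't spelled out in the steps; a point lookup on the merge key, something like the following, is what's assumed here:
select * from delta.scratch.foo where id = 1
A run that does not reproduce the bug would return only the 2024-01-01 row.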