
Conversation

@ilongin ilongin commented Aug 27, 2025

Currently, if multiple error rows are generated with the same id (the column used for matching), then, for some unexplained reason, subtract filters out the "duplicates" and leaves only one error row in the result. I've also added a test for subtract that should have failed as well, but for some as-yet-unknown reason it passes, which means that subtract behaves differently in the test than in the retry delta code.
Instead of .subtract(), .diff() is now used, which works as expected. Note that .subtract() was working fine with the ClickHouse DB (also not explainable right now).
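
For reference, a sketch of the change in src/datachain/delta.py, reconstructed from the diff discussed in the review below (surrounding code abridged):

```python
# Before: on SQLite this dropped "duplicate" error rows sharing the same id
# return retry_chain.subtract(diff_chain, on=on) if retry_chain else None

# After: diff() configured to keep added and same rows, so repeated error
# rows with the same id survive into the retry chain
return (
    retry_chain.diff(
        diff_chain,
        on=on,
        added=True,
        same=True,
        modified=False,
        deleted=False,
    )
    if retry_chain
    else None
)
```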

This issue is related to https://github.com/iterative/studio/issues/12068

Summary by Sourcery

Fix delta retry logic to correctly include multiple error rows with the same id by replacing subtract with diff, and add tests to validate duplicate handling.

Bug Fixes:

  • Preserve repeated error rows in delta retry by using diff instead of subtract

Tests:

  • Add functional test for repeating errors in delta retry to verify multiple rows per id
  • Add unit test to ensure subtract retains duplicated rows by id


sourcery-ai bot commented Aug 27, 2025

Reviewer's Guide

This PR replaces the use of .subtract() with .diff() in the delta retry chain to correctly handle multiple error rows sharing the same id, and it adds both functional and unit tests to ensure duplicate rows are preserved.

Sequence diagram for delta retry chain with diff instead of subtract

```mermaid
sequenceDiagram
    participant RetryChain
    participant DiffChain
    participant Result
    RetryChain->>DiffChain: diff(on="id", added=true, same=true, modified=false, deleted=false)
    DiffChain-->>Result: Returns all error rows, including duplicates with same id
    Result-->>RetryChain: Retry chain proceeds with correct error rows
```

Class diagram for RetryChain and DiffChain interaction update

```mermaid
classDiagram
    class RetryChain {
        +diff(chain, on, added, same, modified, deleted)
    }
    class DiffChain
    RetryChain --> DiffChain: uses diff()
    %% Previously: RetryChain --> DiffChain: uses subtract()
```

File-Level Changes

| Change | Details | Files |
| --- | --- | --- |
| Switch retry logic from subtract to diff to preserve duplicate error rows | Replaced `retry_chain.subtract(...)` with `retry_chain.diff(...)`; configured diff to include added and same rows only (`added=True`, `same=True`) | `src/datachain/delta.py` |
| Add functional test covering repeating error rows in delta retry | Introduced `test_repeating_errors` with helper functions `create_input` and `run_delta`; asserted correct counts for duplicate ids across multiple runs | `tests/func/test_retry.py` |
| Add unit test for the subtract method with duplicated rows | Added `test_subtract_duplicated_rows` to validate that subtract preserves duplicates; asserted that the subtract output contains both identical rows | `tests/unit/lib/test_datachain.py` |



@sourcery-ai sourcery-ai bot left a comment


Hey there - I've reviewed your changes and they look great!


Comment on lines +2234 to +2238
```python
def test_subtract_duplicated_rows(test_session):
    chain1 = dc.read_values(id=[1, 1], name=["1", "1"], session=test_session)
    chain2 = dc.read_values(id=[2], name=["2"], session=test_session)
    sub = chain1.subtract(chain2, on="id")
    assert set(sub.to_list()) == {(1, "1"), (1, "1")}
```
Contributor

suggestion (testing): Consider adding assertions for edge cases with more complex duplicates.

Adding tests with duplicate rows that differ in 'name', or with duplicates in the right chain, will better validate the subtract logic against varied duplication scenarios.

Suggested change

```python
def test_subtract_duplicated_rows(test_session):
    # Left chain has duplicate rows with same id and name
    chain1 = dc.read_values(id=[1, 1], name=["1", "1"], session=test_session)
    chain2 = dc.read_values(id=[2], name=["2"], session=test_session)
    sub = chain1.subtract(chain2, on="id")
    assert set(sub.to_list()) == {(1, "1"), (1, "1")}


def test_subtract_left_duplicates_different_names(test_session):
    # Left chain has duplicate ids but different names
    chain1 = dc.read_values(id=[1, 1, 2], name=["a", "b", "c"], session=test_session)
    chain2 = dc.read_values(id=[2], name=["c"], session=test_session)
    sub = chain1.subtract(chain2, on="id")
    # Only rows with id=1 should remain, both "a" and "b"
    assert set(sub.to_list()) == {(1, "a"), (1, "b")}


def test_subtract_right_duplicates(test_session):
    # Right chain has duplicate rows
    chain1 = dc.read_values(id=[1, 2, 3], name=["x", "y", "z"], session=test_session)
    chain2 = dc.read_values(id=[2, 2], name=["y", "y"], session=test_session)
    sub = chain1.subtract(chain2, on="id")
    # Only rows with id=1 and id=3 should remain
    assert set(sub.to_list()) == {(1, "x"), (3, "z")}
```
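
Worth noting about both versions of the test: Python's set() collapses equal tuples, so {(1, "1"), (1, "1")} is the same set as {(1, "1")}, and the assertion passes whether subtract returns one row or two. A count-preserving variant, as a minimal sketch, would actually pin down the duplicate behavior:

```python
# Comparing sorted lists (not sets) verifies that both duplicate rows are
# really returned; sorted() guards against nondeterministic row order.
rows = sorted(chain1.subtract(chain2, on="id").to_list())
assert rows == [(1, "1"), (1, "1")]
```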

```python
        .gen(func, output={"id": int, "name": str, "error": str})
        .save("processed_data")
    )
    return dc.read_dataset("processed_data")
```
Contributor

suggestion (code-quality): Remove unreachable code (remove-unreachable-code)

Suggested change

```diff
-    return dc.read_dataset("processed_data")
```

```diff
@@ -124,7 +124,13 @@ def _get_retry_chain(
     # Subtract also diff chain since some items might be picked
```
Contributor

The comment needs an update.

```diff
     # result dataset atm)
-    return retry_chain.subtract(diff_chain, on=on) if retry_chain else None
+    return (
+        retry_chain.diff(
```
Contributor

Do we really want these semantics? Do we really want to retry multiple times per item? It seems wrong in most cases, tbh.

Contributor Author

This line just replaces .subtract() with .diff(), which should produce the same output (nothing should change). The reason I added this is that, for some still unexplained reason, in the CLI (SQLite) subtract works differently here than expected (i.e., wrong). Because of this strange behavior it was accidentally removing duplicates in that client's example, and I thought the Studio / CH code was the issue and CLI / SQLite was OK, but it was the other way around.

So, to break it down:

  1. The client issue is not actually an issue / bug. Duplicates are expected because of their code; please look at my explanation to them here.
  2. On the other hand, there is an issue in CLI / SQLite caused by this wrong subtract behavior, which I accidentally found when debugging the client "issue"; it is fixed by using .diff() instead, which works as expected, and now CH and SQLite behave the same / consistently.
  3. I couldn't reproduce this wrong .subtract() SQLite behavior in an isolated test (added in this PR), which means it's probably related to the specific context of the delta logic, but I don't think we should spend more time on it (I've already spent too much, IMO).

Contributor

> The client issue is not actually an issue / bug. Duplicates are expected because of their code; please look at my explanation to them here.

I think I understand the reason; my question is: do we really want to keep those semantics, or would it be better to deduplicate records based on `on`? So, even if the target table has multiple errors with the same measurement_ids, we take each only once. I think that is more expected.

This will solve the issue from the product perspective and remove this discrepancy anyway. (Though it is an interesting thing; it might hurt us somewhere else down the road.)
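
For reference, a minimal sketch of the deduplication alternative being discussed, assuming DataChain's distinct() accepts the match column name (the variable names are illustrative, not from this PR):

```python
# Hypothetical: keep a single error row per value of the matching column
# before building the retry chain, instead of retrying every duplicate.
deduped_errors = retry_chain.distinct("id")
```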

Contributor Author

Yea, it's an interesting question, but I think we should not do that, as we might disrupt specific client logic. Note that those error rows from the client issue are not completely the same (they differ in one column), and the first question is which one to discard and which one to keep. If they don't want this kind of behavior, they can easily fix it in their business logic.

Contributor

Tbh, looking more into this, I think retrying once is the only right option here. We are retrying input items, and there is a single one in this case, unless I'm missing something. We are not retrying outputs.

They could indeed change their logic, but it might be very inconvenient. It is fine to use gen and produce multiple items per single input row.

Contributor

It also doesn't make sense that the shape keeps changing on a retry run with a very regular gen.

Contributor Author

But we are not retrying input items; the whole implementation now gets error rows from the result, which makes sense, as that's where the errors will be. Output is not 1-1 with input, especially with gen, which can generate an arbitrary number of rows (including error rows), and input rows are not kept after it. In that case we cannot know which input produced a given error at the end.
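
To illustrate, a hypothetical gen UDF (a sketch, not code from this PR; it assumes gen binds its input column by the parameter name):

```python
from collections.abc import Iterator

import datachain as dc


def process(id: int) -> Iterator[tuple[int, str, str]]:
    # One input row fans out into several output rows, two of which are
    # error rows sharing the same id; the input rows are not kept, so an
    # error row cannot be traced back to a unique input afterwards.
    yield id, "chunk-a", "timeout"
    yield id, "chunk-b", "timeout"
    yield id, "chunk-c", ""


chain = dc.read_values(id=[1]).gen(
    process, output={"id": int, "name": str, "error": str}
)
```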

Contributor Author

If they didn't generate 2 error rows from one input row, there wouldn't be any issue.

Contributor Author

Aha, sorry, we do merge source records with error rows based on the `on` column, and we use the source schema...

Contributor

They might need multiple errors. Moreover, it can be a mix (errors and not). It would be wrong and complicated to change it to generate a single row in case of any errors.

```python
def test_repeating_errors(test_session):
    from collections.abc import Iterator

    def create_input(num_values):
```
Contributor

Can we reuse the existing helpers in this file:

_create_sample_data
_simple_process
_process_with_errors

etc.?

Contributor Author

Added _create_sample_data. The others are not useful for this test.

@shcheklein
Contributor

So, does subtract itself work fine or not? Is this expected behavior? I'm not sure I fully understand the description...


codecov bot commented Aug 27, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 88.74%. Comparing base (80b5787) to head (e0c0937).
⚠️ Report is 1 commit behind head on main.

Additional details and impacted files

```text
@@           Coverage Diff           @@
##             main    #1310   +/-   ##
=======================================
  Coverage   88.74%   88.74%
=======================================
  Files         155      155
  Lines       14148    14149    +1
  Branches     1999     1999
=======================================
+ Hits        12556    12557    +1
  Misses       1125     1125
  Partials      467      467
```
| Flag | Coverage Δ |
| --- | --- |
| datachain | 88.68% <100.00%> (+<0.01%) ⬆️ |

Flags with carried forward coverage won't be shown.

| Files with missing lines | Coverage Δ |
| --- | --- |
| src/datachain/delta.py | 92.59% <100.00%> (+0.09%) ⬆️ |


cloudflare-workers-and-pages bot commented Aug 28, 2025

Deploying datachain-documentation with Cloudflare Pages

Latest commit: e0c0937
Status: ✅  Deploy successful!
Preview URL: https://764521ce.datachain-documentation.pages.dev
Branch Preview URL: https://ilongin-12068-duplicated-row.datachain-documentation.pages.dev


@ilongin ilongin requested a review from shcheklein August 28, 2025 09:41
@ilongin ilongin merged commit 3607c0b into main Aug 29, 2025
37 of 38 checks passed
@ilongin ilongin deleted the ilongin/12068-duplicated-rows-delta-retry branch August 29, 2025 22:01
