Merged
src/datachain/delta.py (10 changes: 9 additions & 1 deletion)

@@ -125,7 +125,15 @@ def _get_retry_chain(
     # Subtract also diff chain since some items might be picked
     # up by `delta=True` itself (e.g. records got modified AND are missing in the
     # result dataset atm)
-    return retry_chain.subtract(diff_chain, on=on) if retry_chain else None
+    on = [on] if isinstance(on, str) else on
+
+    return (
+        retry_chain.diff(

Contributor: This comment needs an update.
Contributor: Do we really want these semantics? Do we really want to retry multiple times per item? It seems wrong in most cases, tbh.

Contributor (author): This line just replaces `.subtract()` with `.diff()`, which should produce the same output (nothing should change). The reason I added it is that, for some still unexplained reason, `subtract` here works differently than expected (wrongly) in the CLI (SQLite). Because of this strange behavior it was accidentally removing duplicates in that client's example, and I thought the Studio / CH code was the issue while CLI / SQLite was fine, but it was the other way around.

So to break it down:

  1. The client issue is not actually an issue / bug. Duplicates are expected because of their code; please look at my explanation to them here.
  2. On the other hand, there is an issue in CLI / SQLite caused by this wrong `subtract` behavior, which I accidentally found while debugging the client "issue". It is fixed by using `.diff()` instead, which works as expected, so CH and SQLite now behave consistently.
  3. I couldn't reproduce this wrong `.subtract()` SQLite behavior in an isolated test (added in this PR), which means it's probably related to the specific context of the delta logic, but I don't think we should spend more time on it (I've already spent too much, IMO).
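For context, `subtract` is expected to behave like an anti-join on the key columns: every left row whose key has no match on the right survives, including duplicate rows. A minimal plain-Python model of that expected behavior (rows as dicts and the `on` list of field names are simplifications, not DataChain internals):

```python
def subtract(left, right, on):
    """Anti-join: keep left rows whose key does not appear in right.

    Duplicate left rows with a surviving key must all be kept; the
    SQLite backend was accidentally dropping such duplicates here.
    """
    right_keys = {tuple(row[k] for k in on) for row in right}
    return [row for row in left if tuple(row[k] for k in on) not in right_keys]


left = [{"id": 1, "name": "1"}, {"id": 1, "name": "1"}, {"id": 2, "name": "2"}]
right = [{"id": 2, "name": "2"}]
result = subtract(left, right, on=["id"])  # both id=1 rows survive
```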

Contributor:

> The client issue is not actually an issue / bug. Duplicates are expected because of their code; please look at my explanation to them here.

I think I understand the reason. My question is: do we really want to keep those semantics, or would it be better to deduplicate records based on `on`? So even if the target table has multiple errors with the same measurement_ids, we take each only once. I think that is more expected.

This will solve the issue from the product perspective and remove this discrepancy anyway (though it is an interesting point: it might hurt us somewhere else down the road).
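The deduplication proposed above can be modeled as a keep-first-per-key pass. This is a sketch of the proposal only, not DataChain's `.distinct()` implementation; the `measurement_id` field is borrowed from the example in the thread:

```python
def distinct_on(rows, on):
    """Keep only the first row for each key, dropping later duplicates."""
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row[k] for k in on)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept


errors = [
    {"measurement_id": 7, "error": "timeout"},
    {"measurement_id": 7, "error": "parse"},  # same key, different error column
    {"measurement_id": 8, "error": "timeout"},
]
deduped = distinct_on(errors, on=["measurement_id"])
```

Note that which duplicate survives is arbitrary here (first wins), which is exactly the "which one to discard" question raised below.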

Contributor (author): Yeah, it's an interesting question, but I think we should not do that, as we might disrupt specific client logic. Note that those error rows from the client issue are not completely identical (they differ in one column), so the first question is which one to discard and which one to keep. If they don't want this kind of behavior, they can easily fix it in their business logic.

Contributor: Tbh, looking more into this, I think retrying once is the only right option here. We are retrying input items, and there is a single one in this case, unless I'm missing something. We are not retrying outputs.

They could indeed change their logic, but it might be very inconvenient. It is fine to use `gen` and produce multiple items per single input row.

Contributor: It also doesn't make sense that the shape keeps changing across retry runs with a perfectly regular `gen`.

Contributor (author): But we are not retrying input items; the whole implementation now gets error rows from the result, which makes sense, as that's where the errors will be. Output is not 1:1 with input, especially with `gen`, which can generate an arbitrary number of rows (including error rows), and input rows are not kept after it. In that case we cannot know which input produced a given error.

Contributor (author): If they didn't generate 2 error rows from one input row, there wouldn't be any issue.

Contributor (author): Aha, sorry, we do merge source records with error rows based on the `on` column, and we use the source schema.

Contributor: They might need multiple errors. Moreover, it can be a mix (errors and non-errors). It would be wrong and complicated to change it to generate a single row in case of any errors.

+            diff_chain, on=on, added=True, same=True, modified=False, deleted=False
+        ).distinct(*on)
+        if retry_chain
+        else None
+    )
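Taken together, the hunk selects distinct keys to retry: error rows are collected from the result, keys already covered by the delta diff are skipped, and each remaining key is taken once. A rough plain-Python model of that selection; the row and field shapes are hypothetical simplifications of the retry_chain / diff_chain interplay:

```python
def retry_keys(result_rows, diff_keys, on):
    """Distinct keys of error rows, minus keys the delta diff already covers."""
    seen = set()
    keys = []
    for row in result_rows:
        if not row.get("error"):
            continue  # only failed rows are retried
        key = tuple(row[k] for k in on)
        if key in seen or key in diff_keys:
            continue  # retry each key at most once
        seen.add(key)
        keys.append(key)
    return keys


result = [
    {"id": 0, "error": "boom"},
    {"id": 0, "error": "boom"},  # two error rows produced from one input row
    {"id": 1, "error": ""},
]
pending = retry_keys(result, diff_keys=set(), on=["id"])  # key 0 appears once
```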


def _get_source_info(

tests/func/test_retry.py (40 changes: 40 additions & 0 deletions)

@@ -1,3 +1,4 @@
from collections.abc import Iterator
from datetime import datetime, timezone
from typing import TYPE_CHECKING

@@ -425,3 +426,42 @@ def test_delta_and_delta_retry_no_duplicates(test_session):
assert len(ids_in_result) == 4
assert len(set(ids_in_result)) == 4 # No duplicate IDs
assert set(ids_in_result) == {1, 2, 3, 4}


def test_repeating_errors(test_session):
def run_delta():
def func(id) -> Iterator[tuple[int, str, str]]:
yield id, "name1", "error"
yield id, "name2", "error"

return (
dc.read_dataset(
"sample_data",
delta=True,
delta_on="id",
delta_result_on="id",
delta_retry="error",
session=test_session,
)
.gen(func, output={"id": int, "name": str, "error": str})
.save("processed_data")
)
return dc.read_dataset("processed_data")
Contributor: Suggestion (code-quality): remove unreachable code (remove-unreachable-code); the preceding return already exits run_delta.

Suggested change: delete the line

    return dc.read_dataset("processed_data")

_create_sample_data(
test_session, ids=list(range(1)), contents=[str(i) for i in range(1)]
)
ch1 = run_delta()
assert sorted(ch1.collect("id")) == [0, 0]

_create_sample_data(
test_session, ids=list(range(2)), contents=[str(i) for i in range(2)]
)
ch2 = run_delta()
assert sorted(ch2.collect("id")) == [0, 0, 1, 1]

_create_sample_data(
test_session, ids=list(range(3)), contents=[str(i) for i in range(3)]
)
ch3 = run_delta()
assert sorted(ch3.collect("id")) == [0, 0, 1, 1, 2, 2]
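The accumulation this test checks can be modeled as follows: every run reprocesses errored keys plus newly arrived ids, replacing their old rows, and the gen fans each key out into exactly two rows, so the shape stays at 2 rows per id across retry runs. A toy model of that loop (a sketch, not the DataChain engine):

```python
def run_delta(all_ids, result):
    """One delta+retry run: reprocess errored and new keys, replace their rows."""
    have = {row[0] for row in result}
    errored = {row[0] for row in result if row[2]}  # row[2] holds the error field
    pending = errored | (set(all_ids) - have)
    kept = [row for row in result if row[0] not in pending]
    for i in sorted(pending):  # each pending id fans out into two rows
        kept.extend([(i, "name1", "error"), (i, "name2", "error")])
    return kept


result = []
result = run_delta(range(1), result)  # ids present: [0, 0]
result = run_delta(range(2), result)  # ids present: [0, 0, 1, 1]
result = run_delta(range(3), result)  # ids present: [0, 0, 1, 1, 2, 2]
```

Because old error rows are replaced rather than appended, retrying id 0 on every run never inflates the result.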
tests/unit/lib/test_datachain.py (7 changes: 7 additions & 0 deletions)

@@ -2230,6 +2230,13 @@ def test_subtract(test_session):
assert set(chain4.subtract(chain5, on="d", right_on="a").to_list()) == {(3, "z")}


def test_subtract_duplicated_rows(test_session):
    chain1 = dc.read_values(id=[1, 1], name=["1", "1"], session=test_session)
    chain2 = dc.read_values(id=[2], name=["2"], session=test_session)
    sub = chain1.subtract(chain2, on="id")
    # Compare as a sorted list so the duplicate row is actually asserted on
    # (a set literal would collapse the two identical tuples into one).
    assert sorted(sub.to_list()) == [(1, "1"), (1, "1")]
Comment on lines +2233 to +2237
Contributor: Suggestion (testing): consider adding assertions for edge cases with more complex duplicates. Tests with duplicate rows that differ in `name`, or with duplicates in the right chain, would better validate the subtract logic against varied duplication scenarios.

Suggested change:

def test_subtract_duplicated_rows(test_session):
    # Left chain has duplicate rows with the same id and name
    chain1 = dc.read_values(id=[1, 1], name=["1", "1"], session=test_session)
    chain2 = dc.read_values(id=[2], name=["2"], session=test_session)
    sub = chain1.subtract(chain2, on="id")
    # Sorted-list comparison keeps the duplicate visible to the assertion
    assert sorted(sub.to_list()) == [(1, "1"), (1, "1")]


def test_subtract_left_duplicates_different_names(test_session):
    # Left chain has duplicate ids but different names
    chain1 = dc.read_values(id=[1, 1, 2], name=["a", "b", "c"], session=test_session)
    chain2 = dc.read_values(id=[2], name=["c"], session=test_session)
    sub = chain1.subtract(chain2, on="id")
    # Only rows with id=1 should remain, both "a" and "b"
    assert set(sub.to_list()) == {(1, "a"), (1, "b")}


def test_subtract_right_duplicates(test_session):
    # Right chain has duplicate rows
    chain1 = dc.read_values(id=[1, 2, 3], name=["x", "y", "z"], session=test_session)
    chain2 = dc.read_values(id=[2, 2], name=["y", "y"], session=test_session)
    sub = chain1.subtract(chain2, on="id")
    # Only rows with id=1 and id=3 should remain
    assert set(sub.to_list()) == {(1, "x"), (3, "z")}



def test_subtract_error(test_session):
chain1 = dc.read_values(a=[1, 1, 2], b=["x", "y", "z"], session=test_session)
chain2 = dc.read_values(a=[1, 2], b=["x", "y"], session=test_session)