From 5fc21f7eab0d10a107e1478f79423e58bca1f62b Mon Sep 17 00:00:00 2001 From: ilongin Date: Wed, 27 Aug 2025 16:51:22 +0200 Subject: [PATCH 1/3] added fix for delta retry --- src/datachain/delta.py | 8 +++++- tests/func/test_retry.py | 42 ++++++++++++++++++++++++++++++++ tests/unit/lib/test_datachain.py | 7 ++++++ 3 files changed, 56 insertions(+), 1 deletion(-) diff --git a/src/datachain/delta.py b/src/datachain/delta.py index 1c3792abe..6d070599b 100644 --- a/src/datachain/delta.py +++ b/src/datachain/delta.py @@ -124,7 +124,13 @@ def _get_retry_chain( # Subtract also diff chain since some items might be picked # up by `delta=True` itself (e.g. records got modified AND are missing in the # result dataset atm) - return retry_chain.subtract(diff_chain, on=on) if retry_chain else None + return ( + retry_chain.diff( + diff_chain, on="id", added=True, same=True, modified=False, deleted=False + ) + if retry_chain + else None + ) def _get_source_info( diff --git a/tests/func/test_retry.py b/tests/func/test_retry.py index e8f025610..8556a9c16 100644 --- a/tests/func/test_retry.py +++ b/tests/func/test_retry.py @@ -425,3 +425,45 @@ def test_delta_and_delta_retry_no_duplicates(test_session): assert len(ids_in_result) == 4 assert len(set(ids_in_result)) == 4 # No duplicate IDs assert set(ids_in_result) == {1, 2, 3, 4} + + +def test_repeating_errors(test_session): + from collections.abc import Iterator + + def create_input(num_values): + ( + dc.read_values( + id=[i + 1 for i in range(num_values)], + name=[str(i + 1) for i in range(num_values)], + session=test_session, + ).save("input") + ) + + def run_delta(): + def func(id) -> Iterator[tuple[int, str, str]]: + yield id, "name1", "error" + yield id, "name2", "error" + + return ( + dc.read_dataset( + "input", + delta=True, + delta_on="id", + delta_result_on="id", + delta_retry="error", + session=test_session, + ) + .gen(func, output={"id": int, "name": str, "error": str}) + .save("processed_data") + ) + return dc.read_dataset("processed_data") + + create_input(1) + ch1 = run_delta() + assert sorted(ch1.collect("id")) == [1, 1] + create_input(2) + ch2 = run_delta() + assert sorted(ch2.collect("id")) == [1, 1, 1, 1, 2, 2] + create_input(3) + ch3 = run_delta() + assert sorted(ch3.collect("id")) == [1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 3, 3] diff --git a/tests/unit/lib/test_datachain.py b/tests/unit/lib/test_datachain.py index d5985506f..ec1cb3a1d 100644 --- a/tests/unit/lib/test_datachain.py +++ b/tests/unit/lib/test_datachain.py @@ -2231,6 +2231,13 @@ def test_subtract(test_session): assert set(chain4.subtract(chain5, on="d", right_on="a").to_list()) == {(3, "z")} +def test_subtract_duplicated_rows(test_session): + chain1 = dc.read_values(id=[1, 1], name=["1", "1"], session=test_session) + chain2 = dc.read_values(id=[2], name=["2"], session=test_session) + sub = chain1.subtract(chain2, on="id") + assert set(sub.to_list()) == {(1, "1"), (1, "1")} + + def test_subtract_error(test_session): chain1 = dc.read_values(a=[1, 1, 2], b=["x", "y", "z"], session=test_session) chain2 = dc.read_values(a=[1, 2], b=["x", "y"], session=test_session) From 40b3a9d2fe91de3fcb1a6ae6ac85c58cd1a8def3 Mon Sep 17 00:00:00 2001 From: ilongin Date: Thu, 28 Aug 2025 11:32:55 +0200 Subject: [PATCH 2/3] fixing test --- tests/func/test_retry.py | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/tests/func/test_retry.py b/tests/func/test_retry.py index 8556a9c16..bb66af270 100644 --- a/tests/func/test_retry.py +++ b/tests/func/test_retry.py @@ -1,3 +1,4 @@ +from collections.abc import Iterator from datetime import datetime, timezone from typing import TYPE_CHECKING @@ -428,17 +429,6 @@ def test_delta_and_delta_retry_no_duplicates(test_session): def test_repeating_errors(test_session): - from collections.abc import Iterator - - def create_input(num_values): - ( - dc.read_values( - id=[i + 1 for i in range(num_values)], - name=[str(i + 1) for i in range(num_values)], - session=test_session, - ).save("input") - ) - def run_delta(): def func(id) -> Iterator[tuple[int, str, str]]: yield id, "name1", "error" @@ -446,7 +436,7 @@ def func(id) -> Iterator[tuple[int, str, str]]: return ( dc.read_dataset( - "input", + "sample_data", delta=True, delta_on="id", delta_result_on="id", @@ -458,12 +448,20 @@ def func(id) -> Iterator[tuple[int, str, str]]: ) return dc.read_dataset("processed_data") - create_input(1) + _create_sample_data( + test_session, ids=list(range(1)), contents=[str(i) for i in range(1)] + ) ch1 = run_delta() - assert sorted(ch1.collect("id")) == [1, 1] - create_input(2) + assert sorted(ch1.collect("id")) == [0, 0] + + _create_sample_data( + test_session, ids=list(range(2)), contents=[str(i) for i in range(2)] + ) ch2 = run_delta() - assert sorted(ch2.collect("id")) == [1, 1, 1, 1, 2, 2] - create_input(3) + assert sorted(ch2.collect("id")) == [0, 0, 0, 0, 1, 1] + + _create_sample_data( + test_session, ids=list(range(3)), contents=[str(i) for i in range(3)] + ) ch3 = run_delta() - assert sorted(ch3.collect("id")) == [1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 3, 3] + assert sorted(ch3.collect("id")) == [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2] From e0c0937d993da79848a9bccfacf9dc7c2170bf0a Mon Sep 17 00:00:00 2001 From: ilongin Date: Fri, 29 Aug 2025 23:36:44 +0200 Subject: [PATCH 3/3] distinct duplicated error rows --- src/datachain/delta.py | 6 ++++-- tests/func/test_retry.py | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/datachain/delta.py b/src/datachain/delta.py index 1b0c66b26..9704cdb58 100644 --- a/src/datachain/delta.py +++ b/src/datachain/delta.py @@ -125,10 +125,12 @@ def _get_retry_chain( # Subtract also diff chain since some items might be picked # up by `delta=True` itself (e.g. records got modified AND are missing in the # result dataset atm) + on = [on] if isinstance(on, str) else on + return ( retry_chain.diff( - diff_chain, on="id", added=True, same=True, modified=False, deleted=False - ) + diff_chain, on=on, added=True, same=True, modified=False, deleted=False + ).distinct(*on) if retry_chain else None ) diff --git a/tests/func/test_retry.py b/tests/func/test_retry.py index bb66af270..1178c150b 100644 --- a/tests/func/test_retry.py +++ b/tests/func/test_retry.py @@ -458,10 +458,10 @@ def func(id) -> Iterator[tuple[int, str, str]]: test_session, ids=list(range(2)), contents=[str(i) for i in range(2)] ) ch2 = run_delta() - assert sorted(ch2.collect("id")) == [0, 0, 0, 0, 1, 1] + assert sorted(ch2.collect("id")) == [0, 0, 1, 1] _create_sample_data( test_session, ids=list(range(3)), contents=[str(i) for i in range(3)] ) ch3 = run_delta() - assert sorted(ch3.collect("id")) == [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2] + assert sorted(ch3.collect("id")) == [0, 0, 1, 1, 2, 2]