
Unsafe delta #1279

Merged
ilongin merged 19 commits into main from ilongin/1276-unsafe-delta
Aug 29, 2025

Conversation

@ilongin
Contributor

@ilongin ilongin commented Aug 7, 2025

Added ability to use methods like merge, union etc. with delta updates, which are normally disabled.

Summary by Sourcery

Enable unsafe datachain operations during delta updates by introducing a delta_unsafe flag to override method restrictions and ensure correct handling in chain evolution and dataset reads.

New Features:

  • Add delta_unsafe parameter to DataChain to permit unsafe methods (merge, union, group_by, agg, distinct) during delta updates
  • Extend read_dataset and read_storage APIs to accept delta_unsafe flag and propagate it through the chain setup
  • Modify delta_disabled decorator to bypass method restrictions when delta_unsafe is enabled

Tests:

  • Add functional test to verify delta updates with unsafe operations maintain correct dependencies and merge data as expected

@sourcery-ai
Contributor

sourcery-ai bot commented Aug 7, 2025

Reviewer's Guide

This PR introduces a new delta_unsafe flag to DataChain, propagates it through the API to override default delta restrictions on operations like merge/union, updates the delta-disabled guard to respect this flag, enhances documentation, and adds a functional test to verify merge behavior under unsafe delta updates.

Sequence diagram for method invocation with delta_unsafe override

sequenceDiagram
    participant User
    participant DataChain
    participant DeltaGuard
    User->>DataChain: Call merge() on delta DataChain
    DataChain->>DeltaGuard: Check delta and delta_unsafe
    alt delta_unsafe is False
        DeltaGuard-->>User: Raise NotImplementedError
    else delta_unsafe is True
        DeltaGuard-->>DataChain: Allow merge()
    end
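
The guard in the sequence diagram above could look roughly like the following decorator. This is a minimal, self-contained sketch and not the code in src/datachain/delta.py: the `Chain` class and the error message are assumptions made for illustration; only the names `delta_disabled`, `delta`, `delta_unsafe` and the `NotImplementedError` come from this PR.

```python
from functools import wraps


def delta_disabled(method):
    """Block `method` on delta chains unless the chain opted into delta_unsafe."""

    @wraps(method)
    def wrapper(self, *args, **kwargs):
        if self.delta and not self.delta_unsafe:
            # Assumed message; the real wording may differ.
            raise NotImplementedError(
                f"{method.__name__} is not supported for delta updates "
                "unless delta_unsafe=True"
            )
        return method(self, *args, **kwargs)

    return wrapper


class Chain:
    """Hypothetical stand-in for DataChain, holding only the two flags."""

    def __init__(self, delta=False, delta_unsafe=False):
        self.delta = delta
        self.delta_unsafe = delta_unsafe

    @delta_disabled
    def merge(self, other):
        return "merged"
```

With this shape, non-delta chains and delta chains that set `delta_unsafe` reach `merge`, while a plain delta chain fails before the method body runs.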

Class diagram for DataChain with delta_unsafe support

classDiagram
    class DataChain {
        - _delta: bool
        - _delta_unsafe: bool
        + delta: bool
        + delta_unsafe: bool
        + _as_delta(..., delta_unsafe: bool)
    }
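
As a rough sketch of the class diagram above, the flag can be stored next to `_delta` and carried along whenever the chain is copied. The class below is a hypothetical stand-in, not the DataChain implementation: the copy-based `_evolve` and the default value of `delta_unsafe` are assumptions.

```python
import copy


class Chain:
    """Hypothetical stand-in for DataChain; only the delta flags are modelled."""

    def __init__(self):
        self._delta = False
        self._delta_unsafe = False

    @property
    def delta(self) -> bool:
        return self._delta

    @property
    def delta_unsafe(self) -> bool:
        return self._delta_unsafe

    def _as_delta(self, delta_unsafe: bool = False) -> "Chain":
        """Return a copy marked as a delta chain, recording the unsafe opt-in."""
        new = copy.copy(self)
        new._delta = True
        new._delta_unsafe = delta_unsafe
        return new

    def _evolve(self) -> "Chain":
        """Return the next chain in a pipeline; a shallow copy keeps both flags."""
        return copy.copy(self)
```

The point of the sketch is that every chain derived from a delta chain still reports `delta_unsafe`, so a guard on a later method call sees the original opt-in.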

File-Level Changes

Change: Introduce and propagate delta_unsafe flag in DataChain
Details:
  • Add _delta_unsafe attribute and delta_unsafe property in the chain class
  • Extend _as_delta and _evolve methods to accept and forward delta_unsafe
  • Add delta_unsafe parameter to the read_dataset and read_storage functions
Files:
  • src/datachain/lib/dc/datachain.py
  • src/datachain/lib/dc/datasets.py
  • src/datachain/lib/dc/storage.py

Change: Allow operations during unsafe delta mode
Details:
  • Update the delta_disabled decorator to skip the restriction when delta_unsafe is true
Files:
  • src/datachain/delta.py

Change: Enhance documentation for delta_unsafe usage
Details:
  • Document delta_unsafe in read_dataset docstring
  • Document delta_unsafe in read_storage docstring
Files:
  • src/datachain/lib/dc/datasets.py
  • src/datachain/lib/dc/storage.py

Change: Add functional test for unsafe delta merge
Details:
  • Create test_delta_update_unsafe to verify merge behavior under delta_unsafe
  • Assert dependencies and record outputs across delta versions
Files:
  • tests/func/test_delta.py


@ilongin ilongin marked this pull request as draft August 7, 2025 11:42
@cloudflare-workers-and-pages

cloudflare-workers-and-pages bot commented Aug 7, 2025

Deploying datachain-documentation with Cloudflare Pages

Latest commit: dedd402
Status: ✅  Deploy successful!
Preview URL: https://1ab6a9dc.datachain-documentation.pages.dev
Branch Preview URL: https://ilongin-1276-unsafe-delta.datachain-documentation.pages.dev

Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey @ilongin - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments

### Comment 1
<location> `tests/func/test_delta.py:179` </location>
<code_context>
+    ])
+    """
+
+    assert (dc.read_dataset(ds_name, version="1.0.0").order_by("file.path")).to_list(
+        "file.path", "value"
+    ) == [
+        ("img1.jpg", 1),
+        ("img2.jpg", 2),
</code_context>

<issue_to_address>
Consider adding a test for error conditions when delta_unsafe is not used.

Please add a test to confirm that calling `merge` in delta mode without `delta_unsafe=True` raises the expected exception, ensuring the safety mechanism works as intended.
</issue_to_address>
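
A test of the kind requested here might take the following shape. This is only a sketch: `FakeDeltaChain` is a hypothetical stand-in for a chain read with `delta=True`, and the real test would exercise DataChain itself. It uses the standard-library `unittest` so that it runs without extra dependencies.

```python
import unittest


class FakeDeltaChain:
    """Hypothetical stand-in for a DataChain read with delta=True."""

    def __init__(self, delta_unsafe=False):
        self.delta_unsafe = delta_unsafe

    def merge(self, other, on):
        # Mirrors the guard described in this PR: restricted unless opted in.
        if not self.delta_unsafe:
            raise NotImplementedError("merge is disabled for delta updates")
        return self


class TestDeltaGuard(unittest.TestCase):
    def test_merge_raises_without_delta_unsafe(self):
        with self.assertRaises(NotImplementedError):
            FakeDeltaChain().merge(FakeDeltaChain(), on="id")

    def test_merge_allowed_with_delta_unsafe(self):
        chain = FakeDeltaChain(delta_unsafe=True)
        self.assertIs(chain.merge(FakeDeltaChain(), on="id"), chain)
```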

### Comment 2
<location> `tests/func/test_delta.py:170` </location>
<code_context>
+    # second version of delta dataset
+    create_delta_dataset(ds_name)
+
+    """
+    assert _get_dependencies(catalog, ds_name, "1.0.1") == [
+        (dependency_ds_name, "1.0.1")
</code_context>

<issue_to_address>
Remove or clarify commented-out assertions.

If the commented-out assertions are obsolete, please remove them. If they're still needed, clarify their intent or mark them with a TODO.
</issue_to_address>


@codecov

codecov bot commented Aug 7, 2025

Codecov Report

❌ Patch coverage is 83.33333% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 88.74%. Comparing base (9439a24) to head (dedd402).
⚠️ Report is 2 commits behind head on main.

Files with missing lines            Patch %   Lines
src/datachain/delta.py              85.71%    0 Missing and 1 partial ⚠️
src/datachain/lib/dc/datachain.py   80.00%    1 Missing ⚠️
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #1279      +/-   ##
==========================================
- Coverage   88.75%   88.74%   -0.01%     
==========================================
  Files         155      155              
  Lines       14143    14146       +3     
  Branches     1999     1999              
==========================================
+ Hits        12552    12554       +2     
- Misses       1124     1125       +1     
  Partials      467      467              
Flag        Coverage Δ
datachain   88.68% <83.33%> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown.

Files with missing lines            Coverage Δ
src/datachain/lib/dc/datasets.py    95.12% <ø> (ø)
src/datachain/lib/dc/storage.py     100.00% <ø> (ø)
src/datachain/delta.py              92.50% <85.71%> (-0.19%) ⬇️
src/datachain/lib/dc/datachain.py   91.42% <80.00%> (-0.08%) ⬇️

@shcheklein
Contributor

@ilongin I don't think it is a priority atm.

@ilongin ilongin linked an issue Aug 7, 2025 that may be closed by this pull request
4 tasks
@ilongin ilongin marked this pull request as ready for review August 15, 2025 11:42
Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey there - I've reviewed your changes - here's some feedback:

  • Consider adding the delta_unsafe flag to the chain's _setup dictionary so it's persisted when serializing the chain.
  • The docstrings for delta_unsafe list some allowed methods but omit 'merge'; update the documentation to include all permitted operations.
  • The test_delta_update_unsafe function is lengthy and repeats setup code—refactor it using fixtures or helper functions to improve clarity and reduce duplication.
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments

### Comment 1
<location> `tests/func/test_delta.py:96` </location>
<code_context>
     create_delta_dataset(ds_name)


</code_context>

<issue_to_address>
Suggestion to test other unsafe operations beyond merge.

Please add tests for agg, union, group_by, and distinct to verify they work as expected with delta_unsafe enabled.
</issue_to_address>

<suggested_fix>
<<<<<<< SEARCH
    create_delta_dataset(ds_name)
=======
    create_delta_dataset(ds_name)

def test_delta_agg_unsafe(test_session, tmp_dir, tmp_path):
    catalog = test_session.catalog
    ds_name = "agg_ds"
    create_delta_dataset(ds_name)
    ds = catalog.get_dataset(ds_name)
    result = ds.agg({"value": "sum"}, delta_unsafe=True)
    assert result is not None

def test_delta_union_unsafe(test_session, tmp_dir, tmp_path):
    catalog = test_session.catalog
    ds_name1 = "union_ds1"
    ds_name2 = "union_ds2"
    create_delta_dataset(ds_name1)
    create_delta_dataset(ds_name2)
    ds1 = catalog.get_dataset(ds_name1)
    ds2 = catalog.get_dataset(ds_name2)
    result = ds1.union(ds2, delta_unsafe=True)
    assert result is not None

def test_delta_group_by_unsafe(test_session, tmp_dir, tmp_path):
    catalog = test_session.catalog
    ds_name = "groupby_ds"
    create_delta_dataset(ds_name)
    ds = catalog.get_dataset(ds_name)
    result = ds.group_by("category").agg({"value": "mean"}, delta_unsafe=True)
    assert result is not None

def test_delta_distinct_unsafe(test_session, tmp_dir, tmp_path):
    catalog = test_session.catalog
    ds_name = "distinct_ds"
    create_delta_dataset(ds_name)
    ds = catalog.get_dataset(ds_name)
    result = ds.distinct(delta_unsafe=True)
    assert result is not None
>>>>>>> REPLACE

</suggested_fix>


@@ -96,6 +96,101 @@ def create_delta_dataset(ds_name):
create_delta_dataset(ds_name)
Contributor

suggestion (testing): Suggestion to test other unsafe operations beyond merge.

Please add tests for agg, union, group_by, and distinct to verify they work as expected with delta_unsafe enabled.

Contributor

Resolve this?

Contributor

If you've added tests for the other unsafe operations (agg, union, group_by, distinct) with delta_unsafe enabled and verified they pass, then yes, you can resolve this suggestion. If not, it's best to add those tests to ensure full coverage of the new functionality before resolving.

Let me know if you need help with the test implementations or want to discuss specific cases!

@datachain-ai datachain-ai deleted a comment from sourcery-ai bot Aug 16, 2025
Contributor

@dmpetrov dmpetrov left a comment

Great change. I know it's not fully done but provided some comments.

Main point: unit tests are missing. We should focus on unit tests over functional tests here. The current tests feel heavier than needed - they check merge dependencies and merge value correctness, while what we really need is to test if unsafe blocks the exception.

It might be a great idea to check the dependencies and merge - please create separate tests for this. It's not related to unsafe logic. Unit tests are strongly preferred.

will only fetch the dataset from Studio if it is not found locally.
delta_unsafe: If True, allows the use of methods that are normally unsafe
and forbidden during a delta update. The following methods will be
permitted: `merge`, `agg`, `union`, `group_by`, and `distinct`.
Contributor

It needs to be shortened - too many words. Or we can shed more light on the reasons. Like:

Allow restricted ops in delta: `merge`, `agg`, `union`, `group_by`, `distinct`.
Caller must ensure datasets are consistent and not partially updated.
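
For context, a call that relies on this flag might look like the sketch below. It follows the `delta_unsafe=True, ).merge(merge_ds, on="id", inner=True).save(ds_name)` pattern from the functional test in this PR; the function name, the dataset names passed in, and the choice of `"id"` as the `delta_on` key are assumptions for illustration.

```python
def save_delta_merge(source_name: str, lookup_name: str, result_name: str) -> None:
    """Save `result_name` as a delta update of `source_name` merged with `lookup_name`."""
    # Imported lazily so the sketch can be loaded without DataChain installed.
    import datachain as dc

    lookup = dc.read_dataset(lookup_name)
    (
        dc.read_dataset(
            source_name,
            delta=True,
            delta_on="id",      # assumed unique key of the source rows
            delta_unsafe=True,  # opt in to merge during a delta update
        )
        .merge(lookup, on="id", inner=True)
        .save(result_name)
    )
```

Without `delta_unsafe=True`, the `merge` call on the delta chain is expected to be rejected by the guard this PR modifies.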

Contributor

We also have a guide for delta updates - I think we can put a separate paragraph there, ## Using delta with merge, agg, etc ..., where we can explain some of the reasoning behind it.

Contributor Author

Yeah, that's a good idea. I've added a new section in the docs and simplified this function's docs at the same time.

def test_delta_update_unsafe(test_session, tmp_dir, tmp_path):
catalog = test_session.catalog
default_namespace_name = catalog.metastore.default_namespace_name
default_project_name = catalog.metastore.default_project_name
Contributor

Do we need to deal with the namespaces here? Can we just run it in the default namespace?

Contributor Author

@ilongin ilongin Aug 20, 2025

I need to check the full dataset name, and since these tests must run in Studio as well, I cannot just hardcode local, as the default namespace is different there.

# first version of delta dataset
create_delta_dataset(ds_name)
assert sorted(_get_dependencies(catalog, ds_name, "1.0.0")) == sorted(
[(dependency_ds_name, "1.0.0"), (dependency_ds_merge_name, "1.0.0")]
Contributor

How is the dependency check relevant to delta unsafe? Can we avoid testing it here?

Contributor

Looked deeper - yes, the dependency check deserves its own test. But it should not be combined with the unsafe test. Please extract or remove it (if one exists already).

Contributor Author

I would keep the dependency check in this test since this is a func test. I can create unit tests to test more granular details. This needs to be checked with unsafe, as we are using things like merge that affect dependencies.

@ilongin ilongin requested a review from dmpetrov August 20, 2025 14:02
some version of the dataset exists locally already. If False (default), it
will only fetch the dataset from Studio if it is not found locally.
delta_unsafe: Allow restricted ops in delta: merge, agg, union, group_by,
distinct. Caller must ensure datasets are consistent and not partially
Contributor

can we clarify that we mean results datasets?

Contributor Author

@ilongin ilongin Aug 21, 2025

I'm not sure I'm following what you meant by results dataset. Actually, when I read this sentence I'm not sure I completely understand it in general. @dmpetrov can you clarify what you meant by it?

These operations are not safe for different reasons, e.g.:

  1. union / merge -> producing duplicates
  2. distinct -> must be called on whole dataset which conflicts with delta logic which runs delta part on new + updated rows and then merges results
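
The `distinct` case can be illustrated with plain Python lists. This is a toy model, not DataChain code: it assumes a delta update applies the chain only to the new rows and then appends that output to the previous result, which is a simplification of the behaviour described above.

```python
def distinct(rows):
    """Drop duplicate rows while preserving first-seen order."""
    return list(dict.fromkeys(rows))


# Version 1 of the source, and the result computed from it in full.
source_v1 = ["cat", "dog", "cat"]
result_v1 = distinct(source_v1)

# New rows arrive; one of them repeats a value that is already in result_v1.
new_rows = ["dog", "bird"]

# Toy delta update: process only the new rows, then append to the old result.
delta_result = result_v1 + distinct(new_rows)

# Full recomputation over the whole source, for comparison.
full_result = distinct(source_v1 + new_rows)

print(delta_result)  # ['cat', 'dog', 'dog', 'bird']
print(full_result)   # ['cat', 'dog', 'bird']
```

The delta path keeps a second `dog` because `distinct` never saw the rows from the previous version, which is why the operation is restricted unless the caller opts in.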

Contributor

When you mention datasets in the message, what datasets are we talking about? It should be clear to end users

Contributor Author

Removed that sentence altogether (see #1279 (comment))

delta_unsafe=True,
).merge(merge_ds, on="id", inner=True).save(ds_name)

assert set(_get_dependencies(catalog, ds_name, "1.0.1")) == {
Contributor

Probably we need a proper method for dependencies? Or even chain.dataset should be returning a record that can be expanded to dependencies ...

Contributor Author

@ilongin ilongin Aug 21, 2025

We already have a method to return dependencies in Catalog, but this one is just a helper for tests to extract the dataset name + version out of it, as those are what is being asserted.

Contributor

Don't you think users might need the same? Not sure why this can't be accessed via the chain itself.

Contributor Author

We haven't had this kind of request yet. If needed, we can add it to chain (public API), but I would do it in a separate issue / PR as it's really not related to this one.

dependency_ds_name = (
f"{default_namespace_name}.{default_project_name}.{starting_ds_name}"
)
dependency_ds_merge_name = (
Contributor

Is it only to use _get_dependencies(catalog, ds_name, "1.0.1")? (quite unfortunate tbh)

Contributor Author

It's to check if dependencies are ok, but you are right, it's not needed. I don't think testing the correct namespace / project is in scope of these tests, so I will just return the pure dataset name from that helper method and assert with that in the tests.

Contributor

@shcheklein shcheklein left a comment

some minor comments, otherwise good to go I think

Contributor

@dreadatour dreadatour left a comment

Looks good to me 🤔👍

Contributor

@dmpetrov dmpetrov left a comment

Approving to move faster. But the unit-test issue is still here - too much in the functional test without a need, and no unit tests.

@shcheklein
Contributor

Just so I understand and we are on the same page about the terminology we use:

https://github.com/iterative/datachain/pull/1279/files#diff-2a81289be4719523a37fa0369702d59af21a78b3522f155ebbbfb48c4e4ed2e8R382 - is this one unit or not, for example? (no matter what directory it is in).

@dmpetrov
Contributor

is this one unit or not, for example?

yes, these are unit and should be in the unit test dir. the previous one is not.

@shcheklein
Contributor

shcheklein commented Aug 27, 2025

yes, these are unit and should be in the unit test dir.

yep, I don't call them unit (I call unit tests those that usually test only a very particular function, module in isolation ... usually with mocks, etc, etc), but that's fine ... I'm totally fine to call them whatever and place them in unit or func.

the previous one is not.

you mean test_delta_update_unsafe or something else?

@dmpetrov
Contributor

dmpetrov commented Aug 27, 2025

the previous one is not.

you mean test_delta_update_unsafe or something else?

yep

@shcheklein
Contributor

yep

okay, I see

my 2cs on this - that one looks heavy indeed (some cleanup can be done?) - but it is a single test for the whole unsafe stuff and it is also using in-memory db AFAIU, no file systems, etc, etc. Some complexity comes from the delta itself - requires multiple versions of source / destinations, etc, etc. Not sure what would be a better alternative here tbh if we want to run delta + merge.


dep = dependencies[0]
if not dep:
source_ds_dep = next((d for d in dependencies if d.name == source_ds.name), None)
Contributor

Was it caught by the test?

Contributor Author

Yes, the test was inconsistent and there was a bug when things like merge were used - we didn't expect more than one dependency before, but I guess now it can happen.
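
The fix discussed in this thread, looking the source dependency up by name instead of taking the first entry, can be sketched in isolation as follows. The `Dependency` dataclass, the helper name, and the dataset names are hypothetical; only the `next(... if d.name == ...)` lookup mirrors the changed line quoted above.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class Dependency:
    """Hypothetical stand-in for a dataset dependency record."""

    name: str
    version: str


def find_source_dependency(
    dependencies: Iterable[Optional[Dependency]], source_name: str
) -> Optional[Dependency]:
    """Return the dependency that matches the source dataset, if any."""
    return next(
        (d for d in dependencies if d is not None and d.name == source_name),
        None,
    )


deps = [Dependency("lookup_ds", "1.0.0"), Dependency("source_ds", "1.0.1")]
# Taking deps[0] would pick the merged-in dataset here, not the delta source.
source_dep = find_source_dependency(deps, "source_ds")
```

Once a merge adds a second dependency, the order of the list no longer identifies the delta source, so a lookup by name is the safer choice.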

@ilongin ilongin merged commit 2717473 into main Aug 29, 2025
37 of 38 checks passed
@ilongin ilongin deleted the ilongin/1276-unsafe-delta branch August 29, 2025 04:42
Development

Successfully merging this pull request may close these issues.

Merge after delta update

4 participants