Incremental (delta) update #928

Merged
ilongin merged 75 commits into main from ilongin/798-incremental-update on May 15, 2025
Conversation

@ilongin (Contributor) commented Feb 19, 2025

This PR adds the ability to do incremental (delta) updates of a dataset. To run a delta update, the user simply re-runs the whole script that creates the dataset, with one small modification: adding delta=True to the DataChain.save() method.

The general idea behind a delta update is to avoid re-building the whole dataset from its source (or sources) once the source has changed (new or modified files count as changes). Instead, we run the whole chain in "diff" mode and union the result with the latest version of the resulting dataset. Running the chain in "diff" mode means we run the chain as is, but every starting step (.from_storage() or .from_dataset()) returns only the diff between the latest version of that source and the version used the last time the chain was run and the dataset was created.
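
For intuition, here is a minimal conceptual sketch (plain DataChain calls, not the library internals) of what a delta update does for a chain with a single starting dataset; `apply_chain_steps` is a hypothetical placeholder for the user's own map/gen/filter steps:

```python
from datachain import DataChain

def delta_update(source_name, prev_version, result_name, apply_chain_steps):
    # Diff between the current source version and the version used in the last run
    source_now = DataChain.from_dataset(source_name)
    source_prev = DataChain.from_dataset(source_name, version=prev_version)
    changed = source_now.diff(source_prev)  # added + modified rows by default

    # Run the same chain steps, but only on the changed rows
    delta_part = apply_chain_steps(changed)

    # Union the delta with the latest version of the resulting dataset
    latest_result = DataChain.from_dataset(result_name)
    return latest_result.union(delta_part).save(result_name)
```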

Facts:

  1. The starting points mentioned above become direct dataset dependencies.
  2. A dataset can have multiple direct dependencies only if .union() or .merge() was used in the chain.
  3. Indirect dependencies are the ones 2 or more steps away from our dataset (see picture below). They can happen if one of the direct dependencies is a regular dataset (not a bucket-listing dataset), since that dataset can have its own direct (and indirect) dependencies, and so on.

!! NOTE that this PR implements only a subset of the above: the situation where there is a single direct dataset dependency. This means .union() won't work as expected (it will create duplicates), and .merge() might work as expected only if certain conditions are met. Follow-up PRs will cover the rest.

Example chain:

# query.py
(
    DataChain.from_storage( "gs://datachain-demo/50k-laion-files/")
    .save("laion", delta=True)
)

(
    DataChain.from_storage( "s3://ldb-public/remote/data-lakes/dogs-and-cats/", anon=True)
    .save("dogs-cats", delta=True)
)

(
    DataChain.from_dataset("dogs-cats")
    .union(DataChain.from_dataset("laion"))
    .save("dogs-cats-laion", delta=True)
)

In the above example there are 2 "starting points" when creating the dogs-cats-laion dataset:

  • DataChain.from_dataset("dogs-cats")
  • DataChain.from_dataset("laion")

Suppose that on the first run the chain was created with the dogs-cats dataset at version v3, and on the next run dogs-cats is at version v5. Then DataChain.from_dataset("dogs-cats") will return diff(dogs-cats@v5, dogs-cats@v3). The same goes for all other sources / dependencies, which can be datasets or direct listings (also datasets under the hood).
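
In code, that starting step effectively behaves like the following sketch (using the versions from this example):

```python
from datachain import DataChain

prev = DataChain.from_dataset("dogs-cats", version=3)  # version used in the previous run
curr = DataChain.from_dataset("dogs-cats", version=5)  # latest version
dogs_cats_delta = curr.diff(prev)  # only new / modified rows flow down the chain
```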

Note that from the standpoint of a delta update of a chain, only direct dataset dependencies are visible. Indirect dependencies are not taken into consideration, as they are created by another, unrelated chain (possibly in another query file).
For example, this is the dependency graph of the above example:

[Image: dependency graph of the example chain (Selection_011)]

In the graph above, the final dogs-cats-laion dataset has 2 direct dataset dependencies. Those in turn have listing datasets as their own direct dependencies, but from the point of view of this chain that is irrelevant.
If the user wants to update dogs-cats-laion taking into account changes in the s3 and gcs buckets, which are not its direct dependencies, then the chains for the dogs-cats and laion datasets must be run first (those chains can use delta updates as well). The easiest way is to put them in the same query file and re-run the whole thing, as in the example above.

Comparison between the current approach and the final / ideal one:

| Comparison | 1. Current approach | 3. Apply diff on every starting step |
| --- | --- | --- |
| Description | From the dataset dependencies of the resulting dataset we get the starting dataset name and version (a listing or any other "normal" dataset) and calculate the diff between that version and the latest version of the starting dataset. We then apply the chain functions to that diff and merge the result with the current version of the resulting dataset to create a new version of it. | Similar to the current approach, but the diff is done for every starting point of the chain / direct dependency. The idea is to run the whole chain in a special "delta" mode which ensures that each starting step (reading from a listing or a normal dataset) doesn't read the whole data but calculates the diff and runs on that. We would then union that "delta" chain with the current latest version of the dataset. The problem is that this is not currently possible to implement because of the structure of our codebase: diff is implemented at the DataChain ("upper") level, but it would need to be used at the DatasetQuery ("lower") level ... we need to refactor / remove DatasetQuery first. |
| Someone removes the starting dataset and all of its versions (e.g. a listing dataset) | We wouldn't have the exact listing version for the delta -> we would need to re-create the whole dataset as if it was run for the first time (without the delta performance gains). | We wouldn't have the exact listing version for the delta -> we would need to re-create the whole dataset as if it was run for the first time (without the delta performance gains). |
| A File object is removed in the resulting dataset (e.g. using one of the chain functions) | Delta update doesn't work. | Delta update doesn't work. |
| How it will look in the UI | The same as locally (just adding the delta=True flag in .save()). | The same as locally (just adding the delta=True flag in .save()). |
| Not supported DataChain methods | union() | / |
| agg(...) | Works fine when partition values cannot be found in both the delta and the old dataset (each partition's values are confined to one of the two sets). This is important because of how the delta update works -> we run the chain for the new + changed rows in the source and then union with the current dataset version to create a new version. An example that works fine is examples/computer_vision/openimage-detect.py (see the toy illustration below the table). | Same as the first column. |
| group_by(...) | Similar to .agg(): works only if identical groups are not found in both sets (the delta part and the old dataset). | Same as the first column. |
| distinct(...) | Similar to .agg(): works only if distinct values are not found in both sets (the delta part and the old dataset). | Same as the first column. |
| union() | This will always produce duplicates, as we are always doing a union between the diff and another full, non-diff dataset. | Works as expected, since all parts of the union are just diffs. |
| merge() | Works only if inner=True is used and, similar to agg(), only if the rows to be merged are not found in both sets (the delta / diff and the old dataset) but are isolated in one of them. | Same as the first column, except it should also work for inner=False. |
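
To make the agg() / group_by() caveat concrete, here is a toy illustration in plain Python (not actual DataChain code) of how a group that appears in both the delta part and the old dataset ends up duplicated after the union step:

```python
# Per-group counts produced by the first run (the "old" version of the dataset)
old_result = [("a", 2), ("b", 3)]

# On the second run, the new/changed source rows fall into groups "a" and "c",
# so aggregating only the delta produces its own row for group "a"
delta_result = [("a", 1), ("c", 1)]

# The union step of the delta update then duplicates group "a"
new_version = old_result + delta_result
print(new_version)  # [('a', 2), ('b', 3), ('a', 1), ('c', 1)]
```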

Q&A

Q: In the second approach the idea was to use delta on the source side (per each source). This would allow having a single-file source, for example. Both approaches don't allow that at the moment, AFAIU. Are there cases like this?
A: In both approaches, when we speak about a "source" we actually mean the source or starting dataset from which the resulting dataset is created. It can be a listing dataset if .from_storage() is used, or a regular dataset if .from_dataset() is used. Note that there can be multiple starting datasets in a chain, as in the example above with .union(). If I understood the question correctly, the suggestion is to get distinct values from the source column, compute a diff for each of them, apply the chain to each diff, and then union with the current dataset. This approach is similar to the ideal solution described in the description and the table, which should be done in the future. We need to look at the starting datasets, i.e. the direct dataset dependencies, rather than distinct source values, because some of those sources could have come from an indirect dependency (another dataset further down the tree that is created by another, unrelated chain).

Regarding the single file: yes, that one is tricky. With the current implementation it doesn't break, but the dataset is recalculated from scratch every time (delta doesn't work because the created dataset has no dependency: no listing is created since we just extract data from the file and create dataset rows). Example:

from pathlib import Path
import pandas as pd
from datachain import DataChain

DF_DATA = {
    "first_name": ["Alice", "Bob", "Charlie", "David", "Eva"],
    "age": [25, 30, 35, 40, 45],
    "city": ["New York", "Los Angeles", "Chicago", "Houston", "Phoenix"],
}
# Single local file: no listing dataset is created, so delta has nothing to diff against
path = Path.cwd() / "test.parquet"
pd.DataFrame(DF_DATA).to_parquet(path)
DataChain.from_storage(path.as_uri()).parse_tabular().save("tabular", delta=True)

@ilongin ilongin marked this pull request as draft February 19, 2025 15:13
@ilongin ilongin linked an issue Feb 19, 2025 that may be closed by this pull request

@dmpetrov (Contributor) commented:

@ilongin it would be great to extract all logic outside of the fat file dc.py to increment.py or dc_incremental.py

Also, should we call it incremental or delta? :) Delta seems better but I don't like it due to a conflict with Delta Lake. Any ideas? :)

@ilongin (Contributor Author) commented Feb 21, 2025

> @ilongin it would be great to extract all logic outside of the fat file dc.py to increment.py or dc_incremental.py

> Also, should we call it incremental or delta? :) Delta seems better but I don't like it due to a conflict with Delta Lake. Any ideas? :)

@dmpetrov one question just to be 100% sure. How do we deal with the different statuses: added, modified, removed, same?

My assumption is:

  1. Added records are appended to the previous dataset (its current latest version)
  2. Modified records replace the matched records from the previous dataset in the new dataset
  3. Deleted records -> do nothing about them, but maybe we should remove them from the new dataset??
  4. Same -> nothing to do here

Currently DataChain.diff() returns only added and changed records by default; for the other statuses explicit flags must be set.

Regarding the name: delta makes more sense if we are not just appending new records, otherwise it's more like incremental, but I don't have a strong opinion here... both sound reasonable to me.
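
For reference, a minimal sketch of the behaviour described in the comment above; the boolean flag names (added / deleted / modified / same) are an assumption based on this discussion, not a spec:

```python
from datachain import DataChain

old = DataChain.from_dataset("laion", version=1)
new = DataChain.from_dataset("laion", version=2)

# Default: only added and modified records come back
delta_default = new.diff(old)

# Other statuses have to be requested with explicit flags (names assumed)
delta_with_deletes = new.diff(old, deleted=True)
```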

@dmpetrov (Contributor) commented:

> Currently DataChain.diff() returns only added and changed records by default...

Let's use the same default for the incremental update.

> delta makes more sense

Then let's use Delta 🙂

codecov bot commented Feb 24, 2025

Codecov Report

Attention: Patch coverage is 94.82759% with 6 lines in your changes missing coverage. Please review.

Project coverage is 87.97%. Comparing base (41af6ff) to head (ee27458).
Report is 1 commit behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| src/datachain/delta.py | 95.55% | 1 Missing and 1 partial ⚠️ |
| src/datachain/diff/__init__.py | 71.42% | 1 Missing and 1 partial ⚠️ |
| src/datachain/lib/dc/datachain.py | 93.93% | 1 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #928      +/-   ##
==========================================
+ Coverage   87.93%   87.97%   +0.04%     
==========================================
  Files         147      148       +1     
  Lines       12655    12747      +92     
  Branches     1772     1783      +11     
==========================================
+ Hits        11128    11214      +86     
- Misses       1091     1094       +3     
- Partials      436      439       +3     
| Flag | Coverage Δ |
| --- | --- |
| datachain | 87.90% <94.82%> (+0.04%) ⬆️ |


@ilongin ilongin marked this pull request as ready for review February 25, 2025 15:34
@ilongin ilongin requested a review from shcheklein April 30, 2025 13:22
self.print_schema(file=file)
return file.getvalue()

def _as_delta(self, delta: bool = False) -> "Self":
Contributor

can we make it a regular constructor parameter (like we do with all other attrs) ... why do we need a special way of setting this up?

Contributor Author

The user should never set this parameter, so I wanted to "hide" it a little by making it a private attribute with a special private method to set it to True.

Contributor

I think users never actually use the DataChain constructor anyway - it is kind of private already (and it probably already has some attributes that are technical, not user-facing).

return self

@property
def delta(self) -> bool:
Contributor

is it public? do we want it to be public?

Contributor Author

I don't see a reason why it shouldn't be public. A user could check whether some chain is in "delta" mode or not. It is also used in some other internal methods, for which it doesn't need to be public.
I can make it private as well; I don't have a strong opinion.

Contributor

no, that's fine .. but if we keep it public we need proper docs for it then ... and an example if you have something in mind

Contributor

one minor thing that is left here @ilongin ... let's please take care of it

@shcheklein (Contributor) left a comment

Can we test it on the cattle care scenario? Video files, we extract and save frames for them / or extract and save subvideos ... (it is similar to one of the tutorials). Should it be working for such a scenario? (let me know if you need access to the code)

@ilongin (Contributor Author) commented Apr 30, 2025

> Can we test it on the cattle care scenario? Video files, we extract and save frames for them / or extract and save subvideos ... (it is similar to one of the tutorials). Should it be working for such a scenario? (let me know if you need access to the code)

Yes, please give me access to the code as I don't think I have it

@shcheklein (Contributor) commented:

> Yes, please give me access to the code as I don't think I have it

DMed you the link!
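
For context, a rough sketch of what the frame-extraction scenario could look like with a delta update enabled; the bucket path and the extract_frames UDF below are hypothetical, not the actual cattle-care code:

```python
from collections.abc import Iterator

from datachain import DataChain, File

def extract_frames(file: File) -> Iterator[str]:
    # Hypothetical stand-in for real frame extraction (e.g. via ffmpeg / OpenCV)
    yield f"{file.path}#frame-0"

(
    DataChain.from_storage("s3://videos-bucket/recordings/")  # hypothetical bucket
    .gen(frame=extract_frames)  # one output row per extracted frame
    .save("video-frames", delta=True)  # only new / changed videos get re-processed
)
```

Note that frames produced from a modified video would rely on the delta matching fields discussed further down this thread to replace, rather than duplicate, the old rows.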

If two rows have the same values, they are considered the same (e.g., they
could be different versions of the same row in a versioned source).
This is used in the delta update to calculate the diff.
delta_right_on: A list of fields in the final dataset that correspond to the
Contributor

what is the final dataset? is it the result itself of the whole chain?

should we then specify this in save? 🤔

Contributor Author

Yes, that's it. I don't think we should specify this in .save() as these fields can be specific to the source. For example, once we are able to handle multiple sources in a delta update, each source could have its own match fields that correspond to different fields in the final dataset. I was thinking of renaming this field but haven't found a better name...

Contributor

yep, right_on (why right, why not left?) is not clear at all

> once we are able to handle multiple sources in a delta update, each source could have its own match fields that correspond to different fields in the final dataset

are we going to do subtract per source? is it pretty much about changed / removed objects only?

Contributor Author

> yep, right_on (why right, why not left?) is not clear at all

I've updated it to delta_result_on and added a better explanation in the docs. Let me know if this is better.

> are we going to do subtract per source? is it pretty much about changed / removed objects only?

Currently delta works only for one (the first) "starting point", e.g. from_storage(...) or .from_dataset(...), but in the future it will work for any starting point in the chain. E.g. here we have 2 "starting points":

(
    dc.from_storage("s3://first-bucket", delta=True, delta_on=["id_first"], delta_result_on=["id"])
    .map(...)
    .union(dc.from_storage("s3://second-bucket", delta=True, delta_on=["id_second"], delta_result_on=["id"]))
    .mutate(id=ifelse(isnone(dc.C("id_first")), dc.C("id_second"), dc.C("id_first")))
    .select_except("id_first", "id_second")
)

In this example, id_first and id_second are both normalized to just id in the final dataset, and we then need to use that to match the "diff" datasets against the final dataset. Matching the "diff" datasets with the final dataset is important in order to keep only the latest modified files, i.e. to remove the old ones from the final dataset.

This is used in the delta update to calculate the diff.
delta_result_on: A list of fields in the resulting dataset that correspond
to the `delta_on` fields from the source.
This is needed to identify rows that have changed in the source but are
Contributor

do we / can we detect deletions btw?

Contributor Author

At the beginning we decided to ignore deletions for now. I do think we should reconsider this.

@shcheklein (Contributor) left a comment

If we tested at least on some real scenarios, let's merge it.

@ilongin (Contributor Author) commented May 13, 2025

> If we tested at least on some real scenarios, let's merge it.

Testing today on cattle-care with some scripts... found an issue with dataset dependencies and pushed a fix. I want to test a little bit more this evening and then merge.

@ilongin ilongin merged commit b57275a into main May 15, 2025
35 checks passed
@ilongin ilongin deleted the ilongin/798-incremental-update branch May 15, 2025 12:51