-
Notifications
You must be signed in to change notification settings - Fork 134
Incremental (delta) update #928
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
fa09d0b
99a5327
f01b3a2
8fa1534
67824e6
c11d797
ee6640d
5e446b5
71c3469
83366aa
a22916c
d9e4f26
c622ba4
58c27f0
ad5ee5a
079ba7a
de026e3
d1d066e
046731b
9f52c8b
802a934
f3a7b12
3e7fa37
07968ac
d504461
d7b8623
0464c16
8093000
b505df7
0dd71a2
932f64b
b2430c4
c02d689
638e737
2b29498
626ba5a
e1c0325
b0958a8
400a991
59d1713
ce35a6e
e085280
735af02
f3ebf97
59b7666
055ce91
eaa8dc6
7d0a283
95a206b
de72329
55269ab
b7b16ba
723a1a6
9135b2d
c670e33
773b22d
e8de5f2
e8d6f2d
803345a
08a4c1b
b0470ce
5470104
114e80a
2ab1759
594ef7d
567d63f
5ca0689
3debff1
10b95cd
e1f60c7
be704a2
5decfeb
ab9f9a3
e2f5bf3
ee27458
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,119 @@ | ||
| from collections.abc import Sequence | ||
| from copy import copy | ||
| from functools import wraps | ||
| from typing import TYPE_CHECKING, Callable, Optional, TypeVar, Union | ||
|
|
||
| import datachain | ||
| from datachain.dataset import DatasetDependency | ||
| from datachain.error import DatasetNotFoundError | ||
|
|
||
| if TYPE_CHECKING: | ||
| from typing_extensions import Concatenate, ParamSpec | ||
|
|
||
| from datachain.lib.dc import DataChain | ||
|
|
||
| P = ParamSpec("P") | ||
|
|
||
|
|
||
| T = TypeVar("T", bound="DataChain") | ||
|
|
||
|
|
||
| def delta_disabled( | ||
| method: "Callable[Concatenate[T, P], T]", | ||
| ) -> "Callable[Concatenate[T, P], T]": | ||
| """ | ||
| Decorator for disabling DataChain methods (e.g `.agg()` or `.union()`) to | ||
| work with delta updates. It throws `NotImplementedError` if chain on which | ||
| method is called is marked as delta. | ||
| """ | ||
|
|
||
| @wraps(method) | ||
| def _inner(self: T, *args: "P.args", **kwargs: "P.kwargs") -> T: | ||
| if self.delta: | ||
| raise NotImplementedError( | ||
| f"Delta update cannot be used with {method.__name__}" | ||
| ) | ||
| return method(self, *args, **kwargs) | ||
|
|
||
| return _inner | ||
|
|
||
|
|
||
| def _append_steps(dc: "DataChain", other: "DataChain"): | ||
| """Returns cloned chain with appended steps from other chain. | ||
| Steps are all those modification methods applied like filters, mappers etc. | ||
| """ | ||
| dc = dc.clone() | ||
| dc._query.steps += other._query.steps.copy() | ||
| dc.signals_schema = other.signals_schema | ||
| return dc | ||
|
|
||
|
|
||
| def delta_update( | ||
| dc: "DataChain", | ||
| name: str, | ||
| on: Union[str, Sequence[str]], | ||
| right_on: Optional[Union[str, Sequence[str]]] = None, | ||
| compare: Optional[Union[str, Sequence[str]]] = None, | ||
| ) -> tuple[Optional["DataChain"], Optional[list[DatasetDependency]], bool]: | ||
| """ | ||
| Creates new chain that consists of the last version of current delta dataset | ||
| plus diff from the source with all needed modifications. | ||
| This way we don't need to re-calculate the whole chain from the source again( | ||
| apply all the DataChain methods like filters, mappers, generators etc.) | ||
| but just the diff part which is very important for performance. | ||
|
|
||
| Note that currently delta update works only if there is only one direct dependency. | ||
| """ | ||
| catalog = dc.session.catalog | ||
| dc._query.apply_listing_pre_step() | ||
|
|
||
| try: | ||
| latest_version = catalog.get_dataset(name).latest_version | ||
| except DatasetNotFoundError: | ||
| # first creation of delta update dataset | ||
| return None, None, True | ||
|
|
||
| dependencies = catalog.get_dataset_dependencies( | ||
| name, latest_version, indirect=False | ||
| ) | ||
|
|
||
| dep = dependencies[0] | ||
| if not dep: | ||
| # starting dataset (e.g listing) was removed so we are backing off to normal | ||
| # dataset creation, as it was created first time | ||
| return None, None, True | ||
|
|
||
| source_ds_name = dep.name | ||
| source_ds_version = dep.version | ||
| source_ds_latest_version = catalog.get_dataset(source_ds_name).latest_version | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will this logic work with studio flags ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question. I assumed all datasets are present locally, and I think this will always be the case, since the only case where one of the dependencies is not present locally but in Studio is when we pull a dataset from it (we don't pull its dependencies). But when using delta update, the user is creating / saving a dataset, so this is not about pulling an existing one.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so, will I be able to use something like
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So in this example: dc.from_dataset("my-studio-ds", studio=True, delta=True).filter(...).save("my-delta-ds")This will happen:
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what happens if someone else is running this code and there are already a few versions of the dataset on Studio?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will be the same, the last version of
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so, should we actually check / pull two versions from Studio in this case (initial run) - to be able to get the diff?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so, this one is also not resolved yet ... can be a followup, but it seems like an issue for me |
||
| dependencies = copy(dependencies) | ||
| dependencies = [d for d in dependencies if d is not None] # filter out removed dep | ||
| dependencies[0].version = source_ds_latest_version # type: ignore[union-attr] | ||
|
|
||
| source_dc = datachain.read_dataset(source_ds_name, source_ds_version) | ||
| source_dc_latest = datachain.read_dataset(source_ds_name, source_ds_latest_version) | ||
|
|
||
| diff = source_dc_latest.compare(source_dc, on=on, compare=compare, deleted=False) | ||
| # We append all the steps from the original chain to diff, e.g filters, mappers. | ||
| diff = _append_steps(diff, dc) | ||
|
|
||
| # to avoid re-calculating diff multiple times | ||
| diff = diff.persist() | ||
|
|
||
| if diff.empty: | ||
| return None, None, False | ||
|
|
||
| # merging diff and the latest version of dataset | ||
| delta_chain = ( | ||
| datachain.read_dataset(name, latest_version) | ||
| .compare( | ||
| diff, | ||
| on=right_on or on, | ||
| added=True, | ||
| modified=False, | ||
| deleted=False, | ||
| ) | ||
| .union(diff) | ||
| ) | ||
|
|
||
| return delta_chain, dependencies, True # type: ignore[return-value] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,7 +30,7 @@ | |
| SAME = "S" | ||
|
|
||
|
|
||
| def _compare( # noqa: C901 | ||
| def _compare( # noqa: C901, PLR0912 | ||
| left: "DataChain", | ||
| right: "DataChain", | ||
| on: Union[str, Sequence[str]], | ||
|
|
@@ -77,14 +77,16 @@ | |
| cols_select = list(left.signals_schema.clone_without_sys_signals().values.keys()) | ||
|
|
||
| # getting correct on and right_on column names | ||
| on_ = on | ||
| on = left.signals_schema.resolve(*on).db_signals() # type: ignore[assignment] | ||
| right_on = right.signals_schema.resolve(*(right_on or on)).db_signals() # type: ignore[assignment] | ||
| right_on = right.signals_schema.resolve(*(right_on or on_)).db_signals() # type: ignore[assignment] | ||
|
|
||
| # getting correct compare and right_compare column names if they are defined | ||
| if compare: | ||
| compare_ = compare | ||
| compare = left.signals_schema.resolve(*compare).db_signals() # type: ignore[assignment] | ||
| right_compare = right.signals_schema.resolve( | ||
| *(right_compare or compare) | ||
| *(right_compare or compare_) | ||
| ).db_signals() # type: ignore[assignment] | ||
| elif not compare and len(cols) != len(right_cols): | ||
| # here we will mark all rows that are not added or deleted as modified since | ||
|
|
@@ -155,7 +157,11 @@ | |
| if status_col: | ||
| cols_select.append(diff_col) | ||
|
|
||
| dc_diff = dc_diff.select(*cols_select) | ||
| if not dc_diff._sys: | ||
| # TODO workaround when sys signal is not available in diff | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what does it mean?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After this change I had some issues in
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is it fixed? can we cleanup this?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, I will create a separate issue to deal with this later on. I don't think it's high priority, as this workaround works fine for now. It's tricky to fix, though, so that's why I don't want to spend too much time on it now
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. when does it happen? if we have an issue with context - let's put a link in this todo please ... it should be clear as much as possible from the context what is happening here (and why) what is |
||
| dc_diff = dc_diff.settings(sys=True).select(*cols_select).settings(sys=False) | ||
| else: | ||
| dc_diff = dc_diff.select(*cols_select) | ||
|
|
||
| # final schema is schema from the left chain with status column added if needed | ||
| dc_diff.signals_schema = ( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ | |
|
|
||
| from datachain import semver | ||
| from datachain.dataset import DatasetRecord | ||
| from datachain.delta import delta_disabled, delta_update | ||
| from datachain.func import literal | ||
| from datachain.func.base import Function | ||
| from datachain.func.func import Func | ||
|
|
@@ -72,6 +73,9 @@ | |
| P = ParamSpec("P") | ||
|
|
||
|
|
||
| T = TypeVar("T", bound="DataChain") | ||
|
|
||
|
|
||
| class DataChain: | ||
| """DataChain - a data structure for batch data processing and evaluation. | ||
|
|
||
|
|
@@ -164,6 +168,7 @@ | |
| self.signals_schema = signal_schema | ||
| self._setup: dict = setup or {} | ||
| self._sys = _sys | ||
| self._delta = False | ||
|
|
||
| def __repr__(self) -> str: | ||
| """Return a string representation of the chain.""" | ||
|
|
@@ -177,6 +182,32 @@ | |
| self.print_schema(file=file) | ||
| return file.getvalue() | ||
|
|
||
| def _as_delta( | ||
| self, | ||
| on: Optional[Union[str, Sequence[str]]] = None, | ||
| right_on: Optional[Union[str, Sequence[str]]] = None, | ||
| compare: Optional[Union[str, Sequence[str]]] = None, | ||
| ) -> "Self": | ||
| """Marks this chain as delta, which means special delta process will be | ||
| called on saving dataset for optimization""" | ||
| if on is None: | ||
| raise ValueError("'delta on' fields must be defined") | ||
| self._delta = True | ||
| self._delta_on = on | ||
| self._delta_result_on = right_on | ||
| self._delta_compare = compare | ||
| return self | ||
|
|
||
| @property | ||
| def empty(self) -> bool: | ||
| """Returns True if chain has zero number of rows""" | ||
| return not bool(self.count()) | ||
|
|
||
| @property | ||
| def delta(self) -> bool: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is it public? do we want it to be public?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see a reason why it shouldn't be public. User could check if some chain is in "delta" mode or not. It is also used in some other internal methods for which it doesn't need to be public.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no, that's fine .. but if we keep it public we need proper docs for it then ... and an example if you have something in mind
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. one minor thing that is left here @ilongin ... let's please take care of it |
||
| """Returns True if this chain is ran in "delta" update mode""" | ||
| return self._delta | ||
|
|
||
| @property | ||
| def schema(self) -> dict[str, DataType]: | ||
| """Get schema of the chain.""" | ||
|
|
@@ -254,9 +285,17 @@ | |
| signal_schema = copy.deepcopy(self.signals_schema) | ||
| if _sys is None: | ||
| _sys = self._sys | ||
| return type(self)( | ||
| chain = type(self)( | ||
| query, settings, signal_schema=signal_schema, setup=self._setup, _sys=_sys | ||
| ) | ||
| if self.delta: | ||
| chain = chain._as_delta( | ||
| on=self._delta_on, | ||
| right_on=self._delta_result_on, | ||
| compare=self._delta_compare, | ||
| ) | ||
|
|
||
| return chain | ||
|
|
||
| def settings( | ||
| self, | ||
|
|
@@ -463,7 +502,7 @@ | |
| attrs: Optional[list[str]] = None, | ||
| update_version: Optional[str] = "patch", | ||
| **kwargs, | ||
| ) -> "Self": | ||
| ) -> "DataChain": | ||
| """Save to a Dataset. It returns the chain itself. | ||
|
|
||
| Parameters: | ||
|
|
@@ -490,6 +529,35 @@ | |
| ) | ||
|
|
||
| schema = self.signals_schema.clone_without_sys_signals().serialize() | ||
| if self.delta and name: | ||
| delta_ds, dependencies, has_changes = delta_update( | ||
| self, | ||
| name, | ||
| on=self._delta_on, | ||
| right_on=self._delta_result_on, | ||
| compare=self._delta_compare, | ||
| ) | ||
|
|
||
| if delta_ds: | ||
| return self._evolve( | ||
| query=delta_ds._query.save( | ||
| name=name, | ||
| version=version, | ||
| feature_schema=schema, | ||
| dependencies=dependencies, | ||
| **kwargs, | ||
| ) | ||
| ) | ||
|
|
||
| if not has_changes: | ||
| # sources have not been changed so new version of resulting dataset | ||
| # would be the same as previous one. To avoid duplicating exact | ||
| # datasets, we won't create new version of it and we will return | ||
| # current latest version instead. | ||
| from .datasets import read_dataset | ||
|
|
||
| return read_dataset(name, **kwargs) | ||
|
|
||
| return self._evolve( | ||
| query=self._query.save( | ||
| name=name, | ||
|
|
@@ -615,6 +683,7 @@ | |
| signal_schema=udf_obj.output, | ||
| ) | ||
|
|
||
| @delta_disabled | ||
| def agg( | ||
| self, | ||
| func: Optional[Callable] = None, | ||
|
|
@@ -768,6 +837,7 @@ | |
|
|
||
| return self._evolve(query=self._query.order_by(*args)) | ||
|
|
||
| @delta_disabled | ||
| def distinct(self, arg: str, *args: str) -> "Self": # type: ignore[override] | ||
| """Removes duplicate rows based on uniqueness of some input column(s) | ||
| i.e if rows are found with the same value of input column(s), only one | ||
|
|
@@ -802,6 +872,7 @@ | |
| query=self._query.select(*columns), signal_schema=new_schema | ||
| ) | ||
|
|
||
| @delta_disabled # type: ignore[arg-type] | ||
| def group_by( | ||
| self, | ||
| *, | ||
|
|
@@ -1160,6 +1231,7 @@ | |
| schema = self.signals_schema.clone_without_file_signals() | ||
| return self.select(*schema.values.keys()) | ||
|
|
||
| @delta_disabled | ||
| def merge( | ||
| self, | ||
| right_ds: "DataChain", | ||
|
|
@@ -1268,6 +1340,7 @@ | |
|
|
||
| return ds | ||
|
|
||
| @delta_disabled | ||
| def union(self, other: "Self") -> "Self": | ||
| """Return the set union of the two datasets. | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.