Merged
20 changes: 20 additions & 0 deletions docs/guide/delta.md
@@ -80,3 +80,23 @@ Delta processing can be combined with [retry processing](./retry.md) to create a

1. Processes only new or changed records (delta)
2. Reprocesses records with errors or that are missing (retry)

## Using Delta with Restricted Methods

By default, delta updates cannot be combined with the following methods:

1. `merge`
2. `union`
3. `distinct`
4. `agg`
5. `group_by`

These methods are restricted because they may produce **unexpected results** when used with delta processing. Delta runs the chain only on a subset of rows (new and changed records), while methods like `distinct`, `agg`, or `group_by` are designed to operate on the entire dataset.

Similarly, combining delta with methods like `merge` or `union` may result in duplicated rows when merging with a static dataset.

If you still need to use these methods together with delta, you can override this restriction by passing the `delta_unsafe` flag:

```python
delta_unsafe=True
```
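For instance, a minimal sketch of combining delta with `merge` (the dataset and column names here are hypothetical):

```python
import datachain as dc

# Static lookup dataset merged into every delta run.
static = dc.read_dataset("labels")

events = (
    dc.read_dataset(
        "events",
        delta=True,
        delta_on="id",
        delta_unsafe=True,  # merge() below would otherwise raise under delta
    )
    .merge(static, on="id")
    .save("events_enriched")
)
```

Because only new and changed rows pass through the chain, it is up to you to verify that the result is consistent with what a full run would produce.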
46 changes: 27 additions & 19 deletions src/datachain/delta.py
@@ -4,7 +4,7 @@
from typing import TYPE_CHECKING, Callable, Optional, TypeVar, Union

import datachain
-from datachain.dataset import DatasetDependency
+from datachain.dataset import DatasetDependency, DatasetRecord
from datachain.error import DatasetNotFoundError
from datachain.project import Project

@@ -30,9 +30,10 @@ def delta_disabled(

@wraps(method)
def _inner(self: T, *args: "P.args", **kwargs: "P.kwargs") -> T:
-        if self.delta:
+        if self.delta and not self._delta_unsafe:
raise NotImplementedError(
f"Delta update cannot be used with {method.__name__}"
f"Cannot use {method.__name__} with delta datasets - may cause"
" inconsistency. Use delta_unsafe flag to allow this operation."
)
return method(self, *args, **kwargs)
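For context, a runnable sketch of how this guard behaves; the `Chain` class below is a hypothetical stand-in for `DataChain`, not the real implementation:

```python
from functools import wraps


def delta_disabled(method):
    # Simplified form of the decorator above: block the call in delta
    # mode unless the caller opted in via delta_unsafe.
    @wraps(method)
    def _inner(self, *args, **kwargs):
        if self.delta and not self._delta_unsafe:
            raise NotImplementedError(
                f"Cannot use {method.__name__} with delta datasets - may cause"
                " inconsistency. Use delta_unsafe flag to allow this operation."
            )
        return method(self, *args, **kwargs)

    return _inner


class Chain:  # hypothetical stand-in for DataChain
    def __init__(self, delta=False, delta_unsafe=False):
        self.delta = delta
        self._delta_unsafe = delta_unsafe

    @delta_disabled
    def distinct(self):
        return self


Chain(delta=True, delta_unsafe=True).distinct()  # allowed
Chain(delta=True).distinct()                     # raises NotImplementedError
```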

@@ -128,6 +129,7 @@ def _get_retry_chain(


def _get_source_info(
+    source_ds: DatasetRecord,
name: str,
namespace_name: str,
project_name: str,
@@ -154,25 +156,23 @@
indirect=False,
)

-    dep = dependencies[0]
-    if not dep:
+    source_ds_dep = next((d for d in dependencies if d.name == source_ds.name), None)
> **Contributor:** was it caught by the test?
>
> **Contributor (author):** yes, the test was inconsistent and there was a bug when things like merge were used; we didn't expect more than one dependency before, but I guess now it can happen

+    if not source_ds_dep:
# Starting dataset was removed, back off to normal dataset creation
return None, None, None, None, None

-    source_ds_project = catalog.metastore.get_project(dep.project, dep.namespace)
-    source_ds_name = dep.name
-    source_ds_version = dep.version
-    source_ds_latest_version = catalog.get_dataset(
-        source_ds_name,
-        namespace_name=source_ds_project.namespace.name,
-        project_name=source_ds_project.name,
-    ).latest_version
+    # Refresh starting dataset to have new versions if they are created
+    source_ds = catalog.get_dataset(
+        source_ds.name,
+        namespace_name=source_ds.project.namespace.name,
+        project_name=source_ds.project.name,
+    )

return (
-        source_ds_name,
-        source_ds_project,
-        source_ds_version,
-        source_ds_latest_version,
+        source_ds.name,
+        source_ds.project,
+        source_ds_dep.version,
+        source_ds.latest_version,
dependencies,
)
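The lookup by name matters because, as the review thread above explains, a chain that merges in another dataset can carry several dependencies, so `dependencies[0]` is not guaranteed to be the starting dataset. A hedged illustration with stand-in objects (not the real `DatasetDependency`):

```python
from types import SimpleNamespace

# Stand-ins for dependency records after a chain that merged in "labels";
# dependency order is not guaranteed, so index 0 may be the wrong dataset.
dependencies = [
    SimpleNamespace(name="labels", version="1.0.0"),
    SimpleNamespace(name="events", version="2.3.0"),
]

# Look up the starting dataset ("events") by name instead of by position.
source_ds_dep = next((d for d in dependencies if d.name == "events"), None)
assert source_ds_dep is not None and source_ds_dep.version == "2.3.0"
```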

@@ -244,7 +244,14 @@ def delta_retry_update(
source_ds_version,
source_ds_latest_version,
dependencies,
-    ) = _get_source_info(name, namespace_name, project_name, latest_version, catalog)
+    ) = _get_source_info(
+        dc._query.starting_step.dataset,  # type: ignore[union-attr]
+        name,
+        namespace_name,
+        project_name,
+        latest_version,
+        catalog,
+    )

# If source_ds_name is None, starting dataset was removed
if source_ds_name is None:
@@ -267,8 +274,9 @@
if dependencies:
dependencies = copy(dependencies)
dependencies = [d for d in dependencies if d is not None]
+        source_ds_dep = next(d for d in dependencies if d.name == source_ds_name)
# Update to latest version
-        dependencies[0].version = source_ds_latest_version  # type: ignore[union-attr]
+        source_ds_dep.version = source_ds_latest_version  # type: ignore[union-attr]

# Handle retry functionality if enabled
if delta_retry:
8 changes: 8 additions & 0 deletions src/datachain/lib/dc/datachain.py
@@ -193,6 +193,7 @@ def __init__(
self._setup: dict = setup or {}
self._sys = _sys
self._delta = False
+        self._delta_unsafe = False
self._delta_on: Optional[Union[str, Sequence[str]]] = None
self._delta_result_on: Optional[Union[str, Sequence[str]]] = None
self._delta_compare: Optional[Union[str, Sequence[str]]] = None
@@ -216,6 +217,7 @@ def _as_delta(
right_on: Optional[Union[str, Sequence[str]]] = None,
compare: Optional[Union[str, Sequence[str]]] = None,
delta_retry: Optional[Union[bool, str]] = None,
+        delta_unsafe: bool = False,
) -> "Self":
"""Marks this chain as delta, which means special delta process will be
called on saving dataset for optimization"""
@@ -226,6 +228,7 @@
self._delta_result_on = right_on
self._delta_compare = compare
self._delta_retry = delta_retry
+        self._delta_unsafe = delta_unsafe
return self

@property
@@ -238,6 +241,10 @@ def delta(self) -> bool:
"""Returns True if this chain is ran in "delta" update mode"""
return self._delta

+    @property
+    def delta_unsafe(self) -> bool:
+        return self._delta_unsafe

@property
def schema(self) -> dict[str, DataType]:
"""Get schema of the chain."""
@@ -328,6 +335,7 @@ def _evolve(
right_on=self._delta_result_on,
compare=self._delta_compare,
delta_retry=self._delta_retry,
+            delta_unsafe=self._delta_unsafe,
)

return chain
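A hedged sketch of what this propagation gives you (dataset and column names are hypothetical; assumes `filter` evolves the chain as shown above):

```python
import datachain as dc
from datachain import C

# Flags set by _as_delta survive _evolve, so a derived chain keeps the
# same delta configuration as the chain it came from.
chain = dc.read_dataset("events", delta=True, delta_on="id", delta_unsafe=True)
derived = chain.filter(C("id") > 0)  # _evolve runs under the hood

assert derived.delta and derived.delta_unsafe
```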
4 changes: 4 additions & 0 deletions src/datachain/lib/dc/datasets.py
@@ -40,6 +40,7 @@ def read_dataset(
delta_result_on: Optional[Union[str, Sequence[str]]] = None,
delta_compare: Optional[Union[str, Sequence[str]]] = None,
delta_retry: Optional[Union[bool, str]] = None,
+    delta_unsafe: bool = False,
update: bool = False,
) -> "DataChain":
"""Get data from a saved Dataset. It returns the chain itself.
@@ -80,6 +81,8 @@
update: If True always checks for newer versions available on Studio, even if
some version of the dataset exists locally already. If False (default), it
will only fetch the dataset from Studio if it is not found locally.
+        delta_unsafe: Allow restricted ops in delta: merge, agg, union, group_by,
+            distinct.


Example:
@@ -205,6 +208,7 @@
right_on=delta_result_on,
compare=delta_compare,
delta_retry=delta_retry,
+        delta_unsafe=delta_unsafe,
)

return chain
5 changes: 5 additions & 0 deletions src/datachain/lib/dc/storage.py
@@ -43,6 +43,7 @@ def read_storage(
delta_result_on: Optional[Union[str, Sequence[str]]] = None,
delta_compare: Optional[Union[str, Sequence[str]]] = None,
delta_retry: Optional[Union[bool, str]] = None,
+    delta_unsafe: bool = False,
client_config: Optional[dict] = None,
) -> "DataChain":
"""Get data from storage(s) as a list of file with all file attributes.
@@ -77,6 +78,9 @@
(error mode)
- True: Reprocess records missing from the result dataset (missing mode)
- None: No retry processing (default)
+        delta_unsafe: Allow restricted ops in delta: merge, agg, union, group_by,
+            distinct. Caller must ensure datasets are consistent and not partially
+            updated.

Returns:
DataChain: A DataChain object containing the file information.
@@ -218,6 +222,7 @@ def lst_fn(ds_name, lst_uri):
right_on=delta_result_on,
compare=delta_compare,
delta_retry=delta_retry,
+        delta_unsafe=delta_unsafe,
)

return storage_chain
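A hedged usage sketch for this entry point (the bucket URI is hypothetical): `distinct` is normally blocked under delta, so the new flag opts in explicitly:

```python
import datachain as dc

unique_files = (
    dc.read_storage(
        "s3://my-bucket/data/",
        delta=True,
        delta_on="file.path",
        delta_unsafe=True,  # distinct() is restricted under delta by default
    )
    .distinct("file.path")
    .save("unique-files")
)
```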