Merged
58 commits
24a82a5
added hash functions
ilongin Sep 8, 2025
eedd2b8
added hash functions to all steps, not implemented for now
ilongin Sep 8, 2025
1c8dcce
using custom hash method
ilongin Sep 8, 2025
b051b9a
added tests for select and select except
ilongin Sep 8, 2025
72dc833
fixing method name
Sep 9, 2025
8cc6582
added hash for filter
Sep 10, 2025
172e677
added one filter test case
ilongin Sep 10, 2025
781e935
Merge branch 'main' into ilongin/1319-datachain-hash
ilongin Sep 10, 2025
dfcf0a9
added hash for SQLMutate
ilongin Sep 10, 2025
7862515
added order by hash and tests
ilongin Sep 10, 2025
f17ea92
added sqllimit hash
ilongin Sep 10, 2025
46051be
added sqloffset hash
ilongin Sep 10, 2025
7032af4
added count hash
ilongin Sep 10, 2025
01cbd9c
added distinct hash
ilongin Sep 10, 2025
26e64a3
added union hash logic
ilongin Sep 10, 2025
579ab94
added test for sqlunion hash
ilongin Sep 10, 2025
119316c
added hash for join
ilongin Sep 10, 2025
e60ff36
added hash for group by
ilongin Sep 11, 2025
590acb1
added hash subtract
ilongin Sep 11, 2025
214eb6f
Merge branch 'main' into ilongin/1319-datachain-hash
ilongin Sep 11, 2025
f8e2f40
added comments
ilongin Sep 11, 2025
c4a493d
added hash for udf
Sep 14, 2025
caba039
finish adding udf hash tests
Sep 14, 2025
5c26a8c
created hash utils
Sep 14, 2025
1d6d64c
added hash utils test file
Sep 15, 2025
94d7258
removing prints
Sep 15, 2025
279dc06
added hash callable test
Sep 15, 2025
93b4f7c
updated query step hash and added tests
Sep 15, 2025
bf03730
added basic datachain hash tests
Sep 15, 2025
46abe90
added new test
ilongin Sep 16, 2025
3e260b3
added more steps
ilongin Sep 16, 2025
d75b483
added diff test which is currently inconsistent
ilongin Sep 16, 2025
2d83c18
skipping tests
ilongin Sep 16, 2025
7cbef9c
renamed var
ilongin Sep 16, 2025
0d53b4a
fixing tests
ilongin Sep 17, 2025
125cbd1
fixing diff
ilongin Sep 17, 2025
b36fd66
fixing column element
ilongin Sep 17, 2025
b5cfaea
updated naming
ilongin Sep 17, 2025
bae876f
renamned hash methods
ilongin Sep 17, 2025
041e8e8
refactoring
ilongin Sep 17, 2025
99ca59a
Merge branch 'main' into ilongin/1319-datachain-hash
ilongin Sep 17, 2025
3def3bd
refactoring and more test cases
ilongin Sep 19, 2025
1cb1216
changed to Union
ilongin Sep 22, 2025
6c631e5
Merge branch 'main' into ilongin/1319-datachain-hash
ilongin Sep 23, 2025
4d7fbf0
added func rand in hash tests
ilongin Sep 23, 2025
d188adf
using ensure sequence
ilongin Sep 23, 2025
fd1099d
added test with multiple chained diffs
ilongin Sep 23, 2025
64a9988
fixing hash test
ilongin Sep 23, 2025
32c987c
fixing hash callable
ilongin Sep 23, 2025
17e5e19
debugging
ilongin Sep 23, 2025
718c515
fixing hash functions
ilongin Sep 23, 2025
731636b
fixing tests to work for saas
ilongin Sep 23, 2025
6b77576
added correct window function
ilongin Sep 23, 2025
43670d1
added more tests
ilongin Sep 23, 2025
9af27b4
Merge branch 'main' into ilongin/1319-datachain-hash
ilongin Sep 24, 2025
29ba477
fixing calling right hash method
ilongin Sep 24, 2025
e6576d2
added prints
ilongin Sep 24, 2025
d9458e1
returned to random hash for now
ilongin Sep 24, 2025
20 changes: 7 additions & 13 deletions src/datachain/diff/__init__.py
@@ -1,5 +1,3 @@
import random
import string
from collections.abc import Sequence
from enum import Enum
from typing import TYPE_CHECKING, Optional, Union
@@ -11,16 +9,12 @@
if TYPE_CHECKING:
from datachain.lib.dc import DataChain


C = Column


def get_status_col_name() -> str:
"""Returns new unique status col name"""
return "diff_" + "".join(
random.choice(string.ascii_letters) # noqa: S311
for _ in range(10)
)
STATUS_COL_NAME = "diff_7aeed3aa17ba4d50b8d1c368c76e16a6"
LEFT_DIFF_COL_NAME = "diff_95f95344064a4b819c8625cd1a5cfc2b"
RIGHT_DIFF_COL_NAME = "diff_5808838a49b54849aa461d7387376d34"
@ilongin ilongin (Contributor Author) Sep 17, 2025

This was needed to make the hash function consistent. Before, we were generating random names for these temporary columns, which are added during the diff process and removed at the end, but that caused the .hash() of a chain containing .diff() to be inconsistent, since column names added with mutate are part of the hash calculation.

The workaround is not to generate a fully random column name each time, but to set a constant that is already random enough not to collide with users' columns; since it is constant, the hash is consistent as well.

Contributor

I am sorry, but I can't resist putting it here 😅
[image]
Although the question is: how will it work with multiple diff operations applied to the same chain? 🤔 Are there any cases where they might overlap?

Contributor Author

I just didn't have any other ideas for how to fix this, but it shouldn't affect our diff logic. Good point about multiple diffs in the single chain - I've added the test for that and it works fine.

Regarding randomization: those random names were used for columns so they wouldn't collide with user columns, and when you think about it, it's not very different from just setting a large "random" UUID. The chance of collision is the same, ~0... but it does look strange in code, I do agree with that :D

@dreadatour dreadatour (Contributor) Sep 23, 2025

Regarding randomization, those randoms were used for column names to not collide with user columns and when you think about it, it's not very different than just setting large "random" uuid.

Yes, I know 😅 This image is just a perfect fit for this case, I could not resist 🤣

Good point about multiple diffs in the single chain - I've added the test for that and it works fine.

Awesome! Thank you! 🙏
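The consistency issue this thread settles can be illustrated with a toy model. Everything below (the hashing scheme, the helper names) is illustrative only, not the actual datachain internals; only the constant column name itself comes from the diff above:

```python
# Illustrative sketch: why constant helper-column names keep a chain hash
# stable while per-run random names (the old approach) do not.
import hashlib
import random
import string

# Constant name taken from the diff above.
STATUS_COL_NAME = "diff_7aeed3aa17ba4d50b8d1c368c76e16a6"

def random_col_name() -> str:
    # Old approach: a fresh random name on every call.
    return "diff_" + "".join(random.choice(string.ascii_letters) for _ in range(10))

def chain_hash(step_columns: list[str]) -> str:
    # Stand-in for a chain hash: column names added by steps (e.g. mutate)
    # feed into the digest.
    return hashlib.sha256("|".join(step_columns).encode()).hexdigest()

# Two identical chains built with random helper names hash differently:
unstable = chain_hash(["id", random_col_name()]) != chain_hash(["id", random_col_name()])
# With a constant helper name the hash is reproducible:
stable = chain_hash(["id", STATUS_COL_NAME]) == chain_hash(["id", STATUS_COL_NAME])
```

Collision risk is unchanged: one fixed 128-bit-random suffix is no more likely to clash with a user column than a fresh random suffix per run.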



class CompareStatus(str, Enum):
@@ -101,9 +95,9 @@ def _to_list(obj: Optional[Union[str, Sequence[str]]]) -> Optional[list[str]]:
compare = right_compare = [c for c in cols if c in right_cols and c not in on] # type: ignore[misc]

# get diff column names
diff_col = status_col or get_status_col_name()
ldiff_col = get_status_col_name()
rdiff_col = get_status_col_name()
diff_col = status_col or STATUS_COL_NAME
ldiff_col = LEFT_DIFF_COL_NAME
rdiff_col = RIGHT_DIFF_COL_NAME

# adding helper diff columns, which will be removed after
left = left.mutate(**{ldiff_col: 1})
@@ -227,7 +221,7 @@ def compare_and_split(
)
```
"""
status_col = get_status_col_name()
status_col = STATUS_COL_NAME

res = _compare(
left,
147 changes: 147 additions & 0 deletions src/datachain/hash_utils.py
@@ -0,0 +1,147 @@
import hashlib
import inspect
import json
import textwrap
from collections.abc import Sequence
from typing import TypeVar, Union

from sqlalchemy.sql.elements import (
BinaryExpression,
BindParameter,
ColumnElement,
Label,
Over,
UnaryExpression,
)
from sqlalchemy.sql.functions import Function

T = TypeVar("T", bound=ColumnElement)
ColumnLike = Union[str, T]


def serialize_column_element(expr: Union[str, ColumnElement]) -> dict: # noqa: PLR0911
"""
Recursively serialize a SQLAlchemy ColumnElement into a deterministic structure.
Contributor

can we use str(query) or something like this? seems too complicated atm

Contributor Author

It's not possible because:

  1. It depends on the dialect. By default SQLAlchemy uses the default dialect, but if the table from which we generate the expression is bound to an engine, it will create a different string representation for different DBs.
  2. Bind parameter names are unstable - there is no guarantee that col == 5 will always produce "col = :param_1"; sometimes it can produce "col = :param_2".
  3. SQLAlchemy version changes - the stringification format is not a stable API. A minor upgrade could produce different SQL text for the same expression → inconsistent hashes.

Contributor

It depends on a dialect. By default SQLAlchemy uses default dialect, but if table from which we generate expression is bound to engine it will create different string representation for different DBs

do we care about this for hash?

SQLAlchemy version changes - The stringification format is not a stable API. A minor upgrade could produce different SQL text for the same expression → inconsistent hashes.

same - is it important? it seems not tbh ...

Bind parameter names are unstable - there is no guarantee that col == 5 will always produce "col = :param_1" .. sometimes it can produce "col = :param_2"

what are the cases?

Contributor Author

I think we should care about all 3 of those items.

About the unstable param names - I think the param names depend on ordering, which should not affect the hash (e.g. the order of elements in an and_ clause, etc.). But in addition, constants are not included in the representation, so these two expressions produce the same hash even though they are clearly different:

>>> from datachain import C
>>> expr = C("age") == 20
>>> str(expr)
'age = :age_1'
>>> expr = C("age") == 30
>>> str(expr)
'age = :age_1'

@shcheklein shcheklein (Contributor) Sep 26, 2025

. but in addition, constants are not included in representation

can we / don't we take all values separately in the Step? it should be simple I guess?

I don't think we should care about order - again, we are complicating this ahead of time I feel

I think we should care about all 3 of those items.

could you elaborate please?

@ilongin ilongin (Contributor Author) Sep 26, 2025

can we / don't we take all values separately in the Step? it should be simple I guess?

The input to .filter() (and many other DataChain methods) is a list of sqlalchemy.ColumnElement, which can be a column, binary expression, unary expression, over, or function (all the classes handled in the questionable serialize_column_element function, btw).
So in this example: chain.filter(C("numbers") > 3, C("numbers") < 10) we have 2 arguments, both binary expressions. If we just did str(e) for each expression, we would not get the constant values in that string -> the hash is incorrect. To get the constant values we need to walk the binary expression down the tree, and that is exactly what serialize_column_element does, but it also returns a correct hashable representation (dicts), which is then hashed in another method.
TL;DR: it's impossible to achieve a correct / consistent hash just by calling str() on the inputs.

BTW str(dc.func.and_(C("numbers") > 3, C("file.path").glob("*.jpg"))) == and().

I think we should care about all 3 of those items.

could you elaborate please?

  1. If we don't have to, I don't think hash calculation should be dialect dependent. The more generic the hash, the better. If the hash is coupled to a dialect, two identical chains will produce different hashes locally and on Studio, and we always want to avoid this.
  2. Explained above why str(expression) is not enough - it's not consistent and won't work correctly for hash calculation.
  3. str(expr) is just a string representation of the query class and is prone to changes - I don't think it's meant to be used for important parts of the code, let alone hash calculation, but more for printing queries in logs etc.

@shcheklein shcheklein (Contributor) Sep 27, 2025

  1. Can we use something like str(expr.compile(..., compile_kwargs={"literal_binds": True}))?
    1 and 3. Still not clear why it should be that stable. It is made to restart a job right away, but not to make it portable. If we need it - we can add it later.

I would say - Let's simplify unless we really have a blocker for a basic requirement.

Contributor

Thanks for the detailed response!

Contributor Author

My very first thought was to use compile with literal_binds, but I quickly decided not to. Besides the fact that the current implementation is straightforward and not complex, there are multiple reasons not to go with compile, each of them a deal breaker IMO:

  1. The already mentioned consistency across compilers - maybe it's not a direct requirement now, but it's still nice to have consistent hashing across every backend. What matters now is that otherwise we wouldn't be able to write tests that expect a specific hash value, as Studio (PG) and local (SQLite) tests would end up with different values, which means we couldn't test for regressions - and I would also need to refactor the current tests.
  2. Alias naming - SQLAlchemy sometimes automatically adds aliases to compiled statements, and those are really not stable; I'm not sure what they depend on, but I've read it can be due to compiler internal state. There might also be issues with whitespace, parentheses, changing order of elements in AND / OR functions, etc.
  3. I've read in the past that literal_binds are not completely stable - they do not work in some cases, though this would need further investigation. In general, that method doesn't guarantee stable results, and that is an issue for hashing.
  4. It's 30-40% slower than plain Python object traversal (this might not be that important, though).

"""

# Binary operations: col > 5, col1 + col2, etc.
if isinstance(expr, BinaryExpression):
op = (
expr.operator.__name__
if hasattr(expr.operator, "__name__")
else str(expr.operator)
)
return {
"type": "binary",
"op": op,
"left": serialize_column_element(expr.left),
"right": serialize_column_element(expr.right),
}

# Unary operations: -col, NOT col, etc.
if isinstance(expr, UnaryExpression):
op = (
expr.operator.__name__
if expr.operator is not None and hasattr(expr.operator, "__name__")
else str(expr.operator)
)

return {
"type": "unary",
"op": op,
"element": serialize_column_element(expr.element), # type: ignore[arg-type]
}

# Function calls: func.lower(col), func.count(col), etc.
if isinstance(expr, Function):
return {
"type": "function",
"name": expr.name,
"clauses": [serialize_column_element(c) for c in expr.clauses],
}

# Window functions: func.row_number().over(partition_by=..., order_by=...)
if isinstance(expr, Over):
return {
"type": "window",
"function": serialize_column_element(expr.element),
"partition_by": [
serialize_column_element(p) for p in getattr(expr, "partition_by", [])
],
"order_by": [
serialize_column_element(o) for o in getattr(expr, "order_by", [])
],
}

# Labeled expressions: col.label("alias")
if isinstance(expr, Label):
return {
"type": "label",
"name": expr.name,
"element": serialize_column_element(expr.element),
}

# Bound values (constants)
if isinstance(expr, BindParameter):
return {"type": "bind", "value": expr.value}

# Plain columns
if hasattr(expr, "name"):
return {"type": "column", "name": expr.name}

# Fallback: stringify unknown nodes
return {"type": "other", "repr": str(expr)}


def hash_column_elements(columns: Sequence[ColumnLike]) -> str:
"""
Hash a list of ColumnElements deterministically, dialect agnostic.
Only accepts ordered iterables (like list or tuple).
"""
serialized = [serialize_column_element(c) for c in columns]
json_str = json.dumps(serialized, sort_keys=True) # stable JSON
return hashlib.sha256(json_str.encode("utf-8")).hexdigest()
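The determinism of this last step rests on json.dumps(sort_keys=True). A minimal standalone sketch (plain dicts stand in for serialized ColumnElements):

```python
# Sketch: sorted-key JSON makes the digest independent of dict key order.
import hashlib
import json

def hash_serialized(items: list[dict]) -> str:
    # Same scheme as hash_column_elements: stable JSON, then SHA-256.
    json_str = json.dumps(items, sort_keys=True)
    return hashlib.sha256(json_str.encode("utf-8")).hexdigest()

# Identical content, different key insertion order -> identical hash.
a = hash_serialized([{"type": "column", "name": "age"}])
b = hash_serialized([{"name": "age", "type": "column"}])
# List order still matters, as it should for ordered column sequences.
c = hash_serialized([{"type": "column", "name": "age"}, {"type": "bind", "value": 5}])
d = hash_serialized([{"type": "bind", "value": 5}, {"type": "column", "name": "age"}])
```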


def hash_callable(func):
"""
Calculate a hash from a callable.
Rules:
- Named functions (def) → use source code for stable, cross-version hashing
- Lambdas → use bytecode (deterministic in same Python runtime)
"""
if not callable(func):
raise TypeError("Expected a callable")

# Determine if it is a lambda
is_lambda = func.__name__ == "<lambda>"

if not is_lambda:
# Try to get exact source of named function
try:
lines, _ = inspect.getsourcelines(func)
Contributor

why do we extract source code here? do we include it in the hash? (this is not what is expected)

Contributor Author

Yes, we use the UDF function source to calculate the hash. I know we spoke about how the user should be able to change UDF function code and continue from that UDF, and this doesn't block that.

These are implementation details / my plans for UDF checkpoints:

  1. At the beginning of a UDF, create a special partial checkpoint of the chain that enters the UDF (without the UDF itself). All partial results will be connected to this checkpoint, and it stays active until the UDF is 100% successful.
  2. When the UDF is 100% successful / done, we create another normal / non-partial checkpoint that now includes the UDF itself in the calculation (here we use the function code for the hash). This lets us continue from this UDF if something breaks in the future (avoiding UDF re-calculation), and also lets us recalculate the UDF if the user decides to change the UDF code later, even after the UDF has completed successfully (maybe the user realized there is still something missing in that UDF). I think this is important, as we need to give the user the ability to re-run that UDF from the beginning without re-running the whole job, for example.

Let me know if you see any issues with the approach.

Contributor

yep, my main concern is complexity, while it doesn't fully solve the issue - anything outside the function's code can change in a way that changes the UDF - e.g. it uses some helper that changed, or some package? We won't be able to detect this - so unless I'm missing something, I would keep it simple, skip recalculation no matter what, and have some mechanism to trigger a full recalculation (or a partial one on the UDF side - e.g. by introducing some version of the function as a param).

Contributor Author

True, this hash will only catch changes to the function code itself, not changes to helper methods or libraries. But my feeling was that it's good enough for users (we can clearly communicate this to them), and it's definitely better than only giving the option to recalculate the whole job... idk.

BTW, it's not that hard to make helper-method changes affect the hash as well; the only problematic part is external libs (for them we could only include name + type in the hash, so the hash would change if the user stops using one or adds another, etc.)

Contributor

it's def better than to only give option to recalculate the whole job.

100%

this hash will only catch function code changes itself, not the helper method code changes or lib changes.

the worst thing here is that it will be unpredictable - sometimes it works, sometimes it doesn't. I don't think we want that level of magic

let's please simplify this, at least for now

payload = textwrap.dedent("".join(lines)).strip()
except (OSError, TypeError):
# Fallback: bytecode if source not available
payload = func.__code__.co_code
else:
# For lambdas, fall back directly to bytecode
payload = func.__code__.co_code

# Normalize annotations
annotations = {
k: getattr(v, "__name__", str(v)) for k, v in func.__annotations__.items()
}

# Extras to distinguish functions with same code but different metadata
extras = {
"name": func.__name__,
"defaults": func.__defaults__,
"annotations": annotations,
}

# Compute SHA256
h = hashlib.sha256()
h.update(str(payload).encode() if isinstance(payload, str) else payload)
h.update(str(extras).encode())
return h.hexdigest()
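A condensed sketch of this source-vs-bytecode strategy, showing that it is deterministic for both named functions and lambdas within one runtime. This is a simplification of the hash_callable shown above (the real one also mixes in annotations):

```python
# Sketch: named functions hash by source, lambdas fall back to bytecode.
import hashlib
import inspect
import textwrap

def func_hash(fn) -> str:
    if fn.__name__ != "<lambda>":
        try:
            # Source-based payload: stable across runs of the same file.
            lines, _ = inspect.getsourcelines(fn)
            payload = textwrap.dedent("".join(lines)).strip().encode()
        except (OSError, TypeError):
            # Source unavailable (e.g. defined via exec) -> bytecode.
            payload = fn.__code__.co_code
    else:
        # Lambdas: bytecode, deterministic within the same interpreter.
        payload = fn.__code__.co_code
    h = hashlib.sha256()
    h.update(payload)
    # Distinguish same-bodied callables with different metadata.
    h.update(str((fn.__name__, fn.__defaults__)).encode())
    return h.hexdigest()

def add_one(x):
    return x + 1

named_stable = func_hash(add_one) == func_hash(add_one)
lambda_stable = func_hash(lambda x: x + 1) == func_hash(lambda x: x + 1)
```

As the thread notes, this only tracks the callable's own code: edits to helpers it calls, or to imported libraries, do not change the digest.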
10 changes: 9 additions & 1 deletion src/datachain/lib/dc/datachain.py
@@ -209,6 +209,14 @@ def __repr__(self) -> str:
self.print_schema(file=file)
return file.getvalue()

def hash(self) -> str:
"""
Calculates a SHA hash of this chain. Hash calculation is fast and
consistent. It takes into account all the steps added to the chain and
their inputs. The order of the steps is significant.
"""
return self._query.hash()
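The order sensitivity noted in the docstring can be sketched with a simple digest-chaining scheme. This is illustrative only, assuming hypothetical per-step hashes; it is not the actual DatasetQuery.hash implementation:

```python
# Sketch: a chain hash built from per-step digests is order-sensitive.
import hashlib

def chain_hash(step_hashes: list[str]) -> str:
    # Feed raw digest bytes of each step, in order, into one SHA-256.
    h = hashlib.sha256()
    for s in step_hashes:
        h.update(bytes.fromhex(s))
    return h.hexdigest()

# Hypothetical step digests for a filter and a limit step.
filter_h = hashlib.sha256(b"filter: num > 1").hexdigest()
limit_h = hashlib.sha256(b"limit: 10").hexdigest()

forward = chain_hash([filter_h, limit_h])
reversed_ = chain_hash([limit_h, filter_h])
```

Swapping the two steps yields a different chain hash, so `.filter(...).limit(10)` and `.limit(10).filter(...)` would be treated as different chains.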

def _as_delta(
self,
on: Optional[Union[str, Sequence[str]]] = None,
@@ -682,7 +690,7 @@ def save( # type: ignore[override]

if job_id := os.getenv("DATACHAIN_JOB_ID"):
catalog.metastore.create_checkpoint(
job_id, # type: ignore[arg-type]
job_id,
_hash=hashlib.sha256( # TODO this will be replaced with self.hash()
str(uuid4()).encode()
).hexdigest(),
7 changes: 7 additions & 0 deletions src/datachain/lib/signal_schema.py
@@ -1,4 +1,6 @@
import copy
import hashlib
import json
import warnings
from collections.abc import Iterator, Sequence
from dataclasses import dataclass
@@ -257,6 +259,11 @@ def serialize(self) -> dict[str, Any]:
signals["_custom_types"] = custom_types
return signals

def hash(self) -> str:
"""Create SHA hash of this schema"""
json_str = json.dumps(self.serialize(), sort_keys=True, separators=(",", ":"))
Contributor

how often / when do we call it?

it is a very heavy operation AFAIU ...

what if the implementation becomes unstable (e.g. some model names are different each time, etc.) - will we catch it with tests? how can we guarantee this?

Contributor Author

We call it for each UDF in the chain when we calculate the whole chain hash. The chain hash is calculated:

  1. On .save()
  2. In each UDF itself

IMO we should not worry about this being a bottleneck. I will test with some big schema to be 100% sure, but since this only converts columns and their types to a string (it doesn't do any IO or similar), I would guess it's not even on the same page as UDF calculation or applying all the steps in the chain.

Regarding instability, can you give an example of something that could happen? I'm trying to come up with a scenario but don't have many ideas... SignalSchema.hash() depends on schema serialization, which should always be consistent; otherwise we have a big issue in our system regardless of checkpoints.

Contributor

I don't tbh ... I remember we have things like random names for generated DataModels in some places ... SignalSchema overall and its serialization are quite complex - I think it is worth reviewing it at a high level

Contributor

In each UDF itself

Once? or per each input?

Contributor Author

For each UDF we calculate it twice, for params and outputs - both are instances of SignalSchema in the UDF codebase.

return hashlib.sha256(json_str.encode("utf-8")).hexdigest()

@staticmethod
def _split_subtypes(type_name: str) -> list[str]:
"""This splits a list of subtypes, including proper square bracket handling."""
20 changes: 20 additions & 0 deletions src/datachain/lib/udf.py
@@ -1,3 +1,4 @@
import hashlib
import sys
import traceback
from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence
@@ -12,6 +13,7 @@
from datachain.asyn import AsyncMapper
from datachain.cache import temporary_cache
from datachain.dataset import RowDict
from datachain.hash_utils import hash_callable
from datachain.lib.convert.flatten import flatten
from datachain.lib.file import DataModel, File
from datachain.lib.utils import AbstractUDF, DataChainError, DataChainParamsError
@@ -61,6 +63,9 @@ class UDFAdapter:
batch_size: Optional[int] = None
batch: int = 1

def hash(self) -> str:
return self.inner.hash()

def get_batching(self, use_partitioning: bool = False) -> BatchingStrategy:
if use_partitioning:
return Partition()
@@ -151,6 +156,21 @@ def __init__(self):
self.output = None
self._func = None

def hash(self) -> str:
"""
Creates a SHA hash of this UDF function. It takes into account the
function, its inputs, and its outputs.
"""
parts = [
hash_callable(self._func),
self.params.hash() if self.params else "",
self.output.hash(),
]

return hashlib.sha256(
b"".join([bytes.fromhex(part) for part in parts])
).hexdigest()
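The digest combination above can be sketched in isolation. The `combine` helper below is hypothetical, but it mirrors the `bytes.fromhex` join shown in the diff, where a missing params hash contributes an empty string (and thus empty bytes):

```python
# Sketch: combine per-component hex digests (function, params, output)
# into one digest by concatenating their raw bytes and hashing again.
import hashlib

def combine(parts: list[str]) -> str:
    # bytes.fromhex("") == b"", so an absent component ("" for params=None)
    # simply contributes nothing to the outer digest.
    return hashlib.sha256(b"".join(bytes.fromhex(p) for p in parts)).hexdigest()

# Hypothetical component digests.
func_h = hashlib.sha256(b"func-source").hexdigest()
params_h = hashlib.sha256(b"params-schema").hexdigest()
output_h = hashlib.sha256(b"output-schema").hexdigest()

with_params = combine([func_h, params_h, output_h])
without_params = combine([func_h, "", output_h])  # params is None -> ""
```

Presence or absence of the params schema changes the UDF hash, and so does any change in a component digest.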

def process(self, *args, **kwargs):
"""Processing function that needs to be defined by user"""
if not self._func: