
Conversation

@ilongin
Contributor

@ilongin ilongin commented Sep 9, 2025

Implements DataChain.hash(), which calculates a hash for the current definition of the chain (its steps). Two chains with identical steps should have the same hash, and the ordering of steps matters. Hash calculation is fast and consistent: it makes no DB calls, does not compile to SQL queries, and always produces the same result for the same chain definition.

The hash of a chain is calculated by combining the hashes of each step added to the chain. The hash of each step is generally calculated by hashing its inputs (e.g. the expressions passed to .filter()) and adding the step name to the calculation; a minimal sketch of this combining scheme is shown after the step list below.
The hash function is implemented for the following steps:

  • Select and SelectExcept
  • Mutate
  • Filter
  • OrderBy
  • Limit
  • Offset
  • Count
  • Distinct
  • Union
  • Join
  • GroupBy
  • Subtract
  • UDFSignal
  • RowGenerator
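
Conceptually, the combining scheme works like the following sketch (hypothetical helper names, not the actual implementation in the PR):

```python
import hashlib


def step_hash(step_name: str, inputs_hash: str) -> str:
    # A step's hash covers its name plus the hash of its inputs, so two steps
    # of the same type with different arguments produce different hashes.
    return hashlib.sha256(f"{step_name}:{inputs_hash}".encode()).hexdigest()


def chain_hash(start_hash: str, step_hashes: list[str]) -> str:
    # The chain hash folds the starting step and the ordered step hashes into
    # a single digest; reordering the steps changes the result.
    hasher = hashlib.sha256()
    hasher.update(start_hash.encode())
    for h in step_hashes:
        hasher.update(h.encode())
    return hasher.hexdigest()
```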

Limitations:

Currently the hash is inconsistent for chains that start with .read_values(). This is because, to be correct, we would need to calculate the hash of the whole input to .read_values(), and that is not implemented yet. It will be added in a follow-up.

Important changes


Summary by Sourcery

Introduce a hashing framework to compute fast, consistent SHA-256 hashes for DataChain definitions by combining step-level hashes without external dependencies.

New Features:

  • Add DataChain.hash() to compute a unique hash of the entire chain definition
  • Introduce hash_inputs and hash methods on the Step interface and implement them on all SQL and UDF step classes
  • Add SignalSchema.hash() to generate a deterministic schema-level hash
  • Create a new hash_utils module with functions to serialize and hash SQL expression elements and callables

Enhancements:

  • Replace dynamic diff column name generation with fixed constants for deterministic behavior
  • Add ensure_sequence helper in utils for consistent sequence handling
  • Seed stable dataset version UUID in test fixtures to enable reproducible hash tests
  • Extend UDFAdapter to propagate hash calls to underlying UDF wrappers

Tests:

  • Add comprehensive parameterized tests for hashing behavior of each query step and UDF step
  • Add unit tests for DataChain.hash(), SignalSchema.hash(), and all hash_utils functions to verify deterministic outputs

@sourcery-ai
Contributor

sourcery-ai bot commented Sep 9, 2025

Reviewer's Guide

This PR introduces a deterministic SHA-256 hashing framework across DataChain and its query steps, enabling chain- and step-level hashes based solely on in-memory definitions. It adds abstract hashing methods to the Step interface, concrete implementations for every SQL and UDF step, a new hash_utils module for serializing and hashing SQLAlchemy elements and callables, a schema-level hash in SignalSchema, stable diff column constants, and comprehensive tests to validate consistency.

ER diagram for stable diff column constants in datachain.diff

erDiagram
    DIFF_COLUMNS {
        STATUS_COL_NAME string
        LEFT_DIFF_COL_NAME string
        RIGHT_DIFF_COL_NAME string
    }
    DIFF_COLUMNS ||--o{ CompareStatus : uses
    DIFF_COLUMNS ||--o{ _compare : referenced_by
    DIFF_COLUMNS ||--o{ compare_and_split : referenced_by

Class diagram for new and updated hash methods in DataChain and Step classes

classDiagram
    class DataChain {
        +hash() str
    }
    class Step {
        <<abstract>>
        +hash_inputs() str
        +hash() str
    }
    class QueryStep {
        +hash() str
    }
    class SQLClause {
        +hash_inputs() str
    }
    class SQLSelect {
        +hash_inputs() str
    }
    class SQLSelectExcept {
        +hash_inputs() str
    }
    class SQLMutate {
        +hash_inputs() str
    }
    class SQLFilter {
        +hash_inputs() str
    }
    class SQLOrderBy {
        +hash_inputs() str
    }
    class SQLLimit {
        +hash_inputs() str
    }
    class SQLOffset {
        +hash_inputs() str
    }
    class SQLCount {
        +hash_inputs() str
    }
    class SQLDistinct {
        +hash_inputs() str
    }
    class SQLUnion {
        +hash_inputs() str
    }
    class SQLJoin {
        +hash_inputs() str
    }
    class SQLGroupBy {
        +hash_inputs() str
    }
    class UDFStep {
        +hash_inputs() str
    }
    class UDFAdapter {
        +hash() str
    }
    class UDF {
        +hash() str
    }
    class SignalSchema {
        +hash() str
    }
    Step <|-- QueryStep
    Step <|-- SQLClause
    SQLClause <|-- SQLSelect
    SQLClause <|-- SQLSelectExcept
    SQLClause <|-- SQLMutate
    SQLClause <|-- SQLFilter
    SQLClause <|-- SQLOrderBy
    SQLClause <|-- SQLLimit
    SQLClause <|-- SQLOffset
    SQLClause <|-- SQLCount
    SQLClause <|-- SQLDistinct
    SQLClause <|-- SQLGroupBy
    Step <|-- SQLUnion
    Step <|-- SQLJoin
    Step <|-- UDFStep
    UDFStep <|-- UDFAdapter
    UDFAdapter <|-- UDF
    DataChain --> "1" QueryStep
    UDFStep --> "1" UDFAdapter
    UDFAdapter --> "1" UDF
    UDF --> "1" SignalSchema

Class diagram for new hash_utils module functions

classDiagram
    class hash_utils {
        +serialize_column_element(expr) dict
        +hash_column_elements(columns) str
        +hash_callable(func) str
    }

File-Level Changes

Change Details Files
Add hashing framework to Step interface and implement step-specific hash_inputs and hash methods
  • Define abstract hash_inputs on the Step base class
  • Introduce default Step.hash combining class name and input hash
  • Implement hash_inputs in all SQLClause subclasses (Select, Mutate, Filter, etc.)
  • Provide hash in UDFStep, QueryStep, UDFAdapter, and RowGenerator
src/datachain/query/dataset.py
src/datachain/lib/udf.py
Implement chain-level hashing in DataChain
  • Add DataChain.hash delegating to the underlying query
  • Extend Query.hash to include starting step or dataset name and ordered step hashes
src/datachain/lib/dc/datachain.py
src/datachain/query/dataset.py
Introduce hash_utils for serializing and hashing SQL elements and callables
  • Add serialize_column_element for deterministic ColumnElement serialization
  • Implement hash_column_elements to compute SHA-256 over serialized expressions
  • Provide hash_callable for hashing Python callables
  • Add ensure_sequence helper in utils
src/datachain/hash_utils.py
src/datachain/utils.py
Add SignalSchema.hash method for schema-level hashing
  • Serialize SignalSchema via stable JSON and compute SHA-256
  • Add parameterized tests for schema hash
src/datachain/lib/signal_schema.py
tests/unit/lib/test_signal_schema.py
Replace dynamic diff column name generation with fixed constants
  • Remove get_status_col_name() in diff module
  • Define STATUS_COL_NAME, LEFT_DIFF_COL_NAME, RIGHT_DIFF_COL_NAME constants
  • Update diff logic to use fixed constants
src/datachain/diff/__init__.py
Add comprehensive tests for the hashing framework
  • Introduce tests for hash_utils, query step hashing, and chain-level hashing
  • Update conftest for stable dataset version in tests
  • Cover all supported step types and ordering scenarios
tests/unit/test_hash_utils.py
tests/unit/test_query_steps_hash.py
tests/unit/test_datachain_hash.py
tests/conftest.py

Possibly linked issues

  • Initial DataChain Commit #1: The PR implements the DataChain.hash() function by calculating and combining hashes for each step within the chain, directly fulfilling the issue's requirements.


@ilongin ilongin marked this pull request as draft September 9, 2025 23:24
Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey there - I've reviewed your changes - here's some feedback:

  • The PR adds a hash stub to almost every step class; consider centralizing step serialization and hashing logic (e.g., via a mixin or visitor) to avoid so much boilerplate.
  • QueryStep.hash currently returns only the dataset version UUID, which may not capture the full configuration (like selected columns or dependencies); make sure it reflects all relevant step attributes.
  • When building the overall DataChain.hash, you’re updating the hasher with raw step hashes without separators; consider adding explicit delimiters or length prefixes to prevent possible hash collisions (a sketch of such a scheme is shown below).
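
A length-prefixed combining scheme along the lines of this suggestion could look as follows; this is an illustrative sketch, not what the PR implements:

```python
import hashlib


def combine_hashes(step_hashes: list[str]) -> str:
    # Prefix each component with its byte length so that, e.g., ["ab", "c"]
    # and ["a", "bc"] can never produce the same input stream to the hasher.
    hasher = hashlib.sha256()
    for step_hash in step_hashes:
        part = step_hash.encode()
        hasher.update(len(part).to_bytes(4, "big"))
        hasher.update(part)
    return hasher.hexdigest()
```

Since each step hash is already a fixed-length hex digest, plain concatenation is unlikely to collide in practice, but length prefixes keep the scheme unambiguous if variable-length inputs are ever mixed in.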

Comment on lines 209 to 221
    def clone(self) -> "Self":
        return self.__class__(self.dq, self.catalog)

    def hash(self) -> str:
        raise NotImplementedError
Contributor

suggestion: Many hash methods raise NotImplementedError; consider a default implementation or clearer subclassing.

If these classes aren't intended for direct instantiation, mark them as abstract or provide a base hash() implementation to prevent runtime errors and clarify usage.

Suggested implementation:

```python
import contextlib
import hashlib
import inspect
import logging
import os
import abc
```

```python
    ) -> "StepResult":
        """Apply the processing step."""

class AbstractStep(abc.ABC):
    @abc.abstractmethod
    def hash(self) -> str:
        """Calculates hash for this step.
        Subclasses must implement this method.
        """
        raise NotImplementedError("Subclasses must implement the hash() method.")
```

You will need to:

  1. Ensure that any step classes inherit from AbstractStep instead of directly from object or other base classes.
  2. Remove any redundant @abstractmethod decorators from subclasses, as the base class now enforces this.
  3. If you want a default hash implementation, you can replace the raise NotImplementedError with a default logic.

@cloudflare-workers-and-pages

cloudflare-workers-and-pages bot commented Sep 9, 2025

Deploying datachain-documentation with Cloudflare Pages

Latest commit: d9458e1
Status: ✅  Deploy successful!
Preview URL: https://3ada55da.datachain-documentation.pages.dev
Branch Preview URL: https://ilongin-1319-datachain-hash.datachain-documentation.pages.dev

View logs

@codecov

codecov bot commented Sep 9, 2025

Codecov Report

❌ Patch coverage is 95.16129% with 6 lines in your changes missing coverage. Please review.
✅ Project coverage is 87.64%. Comparing base (2b89b64) to head (d9458e1).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
src/datachain/hash_utils.py 87.75% 4 Missing and 2 partials ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #1327      +/-   ##
==========================================
+ Coverage   87.58%   87.64%   +0.05%     
==========================================
  Files         158      159       +1     
  Lines       14702    14819     +117     
  Branches     2112     2124      +12     
==========================================
+ Hits        12877    12988     +111     
- Misses       1343     1347       +4     
- Partials      482      484       +2     
Flag Coverage Δ
datachain 87.57% <95.16%> (+0.05%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
src/datachain/diff/__init__.py 93.25% <100.00%> (-0.08%) ⬇️
src/datachain/lib/dc/datachain.py 91.23% <100.00%> (+0.02%) ⬆️
src/datachain/lib/signal_schema.py 96.16% <100.00%> (+0.03%) ⬆️
src/datachain/lib/udf.py 93.97% <100.00%> (+0.17%) ⬆️
src/datachain/query/dataset.py 93.81% <100.00%> (+0.35%) ⬆️
src/datachain/utils.py 80.68% <100.00%> (+0.27%) ⬆️
src/datachain/hash_utils.py 87.75% <87.75%> (ø)

@ilongin ilongin linked an issue Sep 11, 2025 that may be closed by this pull request
@ilongin
Contributor Author

ilongin commented Sep 23, 2025

Looks good to me overall.

One important note: IMO we have far fewer tests for such an important feature than we should have. I think it might be a good idea to ask AI to suggest more test cases for these changes.

The other important question is: are there any cases when we consider a chain "unhashable", and how do we behave in those cases? I can think of at least one example where it will be almost impossible to calculate a hash: using the SQL random() function inside the chain. There are many more (like using Python random inside from_values, etc.). How do we cover this scenario? Are there other scenarios like this?

I already used AI to add some additional test cases, but I agree, we could probably add more. I will ask mister ChatGPT to try harder :D

Regarding "unhashable" chains - currently I'm not able to find such a case. Note that we don't hash the generated SQL query or its params, just the "structure" of the chain, which is basically a Python construct. I've added a test case with func.rand() and it works as expected.

@dreadatour
Contributor

dreadatour commented Sep 23, 2025

Regarding "unhashable" chains - currently I'm not able to find such a case. Note that we don't hash generated SQL query or it's params, just the "structure" of the chain which is basically a Python construct. I've added a test case with func.rand() and it works as expected.

I meant not in terms of "getting a hash of some code which uses the random() function", but rather in general, on a higher level: does it make sense at all to get a hash for chains like this? Because on the next run the results could be unpredictable, especially if the chain broke in the middle and we want to re-run it.

So my question is more general: should we bring in a new "unhashable" term and consider some chains "unhashable", for proper evaluation of such chains? If so, this "unhashable" feature (result) should be implemented here, in this PR, I believe.

This is kind of a more general question, and we might want to do it in a follow-up, but IMO we need to think about this case anyway; now is just a good time to start thinking about it.

Contributor

@dreadatour dreadatour left a comment

Overall it looks good to me! I think we can move on 👍 as soon as all tests pass.

Just to summarize, open questions from me:

  • "Unhashable" chains - a general question to think about.
  • Many more tests would be great to have 👀 More complex scenarios, edge cases, failures, nested signals/chains, etc. This is quite an important feature for us, and it will be really hard to debug issues, so I would love to see this covered by tests from top to bottom.

@ilongin
Contributor Author

ilongin commented Sep 23, 2025

I meant not in terms of "getting a hash of some code which uses the random() function", but rather in general, on a higher level: does it make sense at all to get a hash for chains like this? Because on the next run the results could be unpredictable, especially if the chain broke in the middle and we want to re-run it.

So my question is more general: should we bring in a new "unhashable" term and consider some chains "unhashable", for proper evaluation of such chains? If so, this "unhashable" feature (result) should be implemented here, in this PR, I believe.

This is kind of a more general question, and we might want to do it in a follow-up, but IMO we need to think about this case anyway; now is just a good time to start thinking about it.

Ok, I see your point now. But I still don't see the reason for these "unhashable" chains. Even if someone, for example, added a random-value column with mutate and the code broke later in some UDF, so that on re-run the values of that column generated with random are taken from the saved results, I don't see how it could break the user's code / logic.. In any case, users will be aware of how checkpoints work, and if that's an issue for them they will have the ability to run the job from scratch every time, so I wouldn't complicate this for now by introducing an "unhashable" chain ... maybe this will change in the future though, can't be sure.

@dreadatour
Contributor

Ok, I see your point now. But I still don't see the reason for these "unhashable" chains. Even if someone, for example, added a random-value column with mutate and the code broke later in some UDF, so that on re-run the values of that column generated with random are taken from the saved results, I don't see how it could break the user's code / logic.. In any case, users will be aware of how checkpoints work, and if that's an issue for them they will have the ability to run the job from scratch every time, so I wouldn't complicate this for now by introducing an "unhashable" chain ... maybe this will change in the future though, can't be sure.

What about ORDER BY random()? 🤔 We will have duplicates in such cases.

@ilongin
Contributor Author

ilongin commented Sep 23, 2025

What about ORDER BY random()? 🤔 We will have duplicates in such cases.

🤔 ... Could you explain how duplicates could happen? ... just to explain - if something breaks in a UDF (let's say half of the UDF is done and half is not because an error happened), we will have the UDF input table saved and will use it to continue that UDF. So if something is calculated with ORDER BY random() before the UDF, it will stay that way - the inputs to the UDF won't be re-calculated..

@dreadatour
Contributor

What about ORDER BY random()? 🤔 We will have duplicates in such cases.

🤔 ... Could you explain how duplicates could happen? ... just to explain - if something breaks in a UDF (let's say half of the UDF is done and half is not because an error happened), we will have the UDF input table saved and will use it to continue that UDF. So if something is calculated with ORDER BY random() before the UDF, it will stay that way - the inputs to the UDF won't be re-calculated..

I didn't know about "we will have input udf table saved" 🤔 Does this mean we will save all the chain step results in temp tables?

@ilongin
Contributor Author

ilongin commented Sep 23, 2025

I didn't know about "we will have input udf table saved" 🤔 Does this mean we will save all the chain step results in temp tables?

Not all the chain steps, just the UDF ones, since other steps are not that "heavy" and don't do real processing .. alongside the checkpoint object we will save (actually just won't delete) the input table, the partial result table and a special new table for generators called the "processed inputs table" (which is needed to know which rows are processed).
Basically we will put checkpoints in 2 places:

  1. At the end of a successfully saved dataset from the chain - to know that we can completely skip calculation of this chain
  2. In the UDFs, where we will have 2 types of checkpoints - a partial checkpoint created at the beginning of the UDF which will be active until the UDF is completely successful -> then we save a non-partial (default / normal) checkpoint to know that we can completely skip this UDF on re-run.

@dreadatour
Contributor

I didn't know about "we will have input udf table saved" 🤔 Does this mean we will save all the chain step results in temp tables?

Not all the chain steps, just the UDF ones, since other steps are not that "heavy" and don't do real processing .. alongside the checkpoint object we will save (actually just won't delete) the input table, the partial result table and a special new table for generators called the "processed inputs table" (which is needed to know which rows are processed). Basically we will put checkpoints in 2 places:

  1. At the end of a successfully saved dataset from the chain - to know that we can completely skip calculation of this chain
  2. In the UDFs, where we will have 2 types of checkpoints - a partial checkpoint created at the beginning of the UDF which will be active until the UDF is completely successful -> then we save a non-partial (default / normal) checkpoint to know that we can completely skip this UDF on re-run.

Not sure if I follow all the logic here, but let's get back to this when we actually implement checkpoints.

@ilongin
Contributor Author

ilongin commented Sep 24, 2025

I didn't know about "we will have input udf table saved" 🤔 Does this mean we will save all the chain step results in temp tables?

Not all the chain steps, just the UDF ones, since other steps are not that "heavy" and don't do real processing .. alongside the checkpoint object we will save (actually just won't delete) the input table, the partial result table and a special new table for generators called the "processed inputs table" (which is needed to know which rows are processed). Basically we will put checkpoints in 2 places:

  1. At the end of a successfully saved dataset from the chain - to know that we can completely skip calculation of this chain
  2. In the UDFs, where we will have 2 types of checkpoints - a partial checkpoint created at the beginning of the UDF which will be active until the UDF is completely successful -> then we save a non-partial (default / normal) checkpoint to know that we can completely skip this UDF on re-run.

Not sure if I follow all the logic here, but let's get back to this when we actually implement checkpoints.

Yes, the logic is a little bit complex because of that "special" feature we want to provide, i.e. the partial checkpoints in UDFs. Without that it would've been much simpler.

@ilongin ilongin merged commit 945ac24 into main Sep 24, 2025
38 checks passed
@ilongin ilongin deleted the ilongin/1319-datachain-hash branch September 24, 2025 10:36

    def hash(self) -> str:
        """Create SHA hash of this schema"""
        json_str = json.dumps(self.serialize(), sort_keys=True, separators=(",", ":"))
Contributor

how often / when do we call it?

it is a very heavy operation AFAIU ...

what if the implementation becomes unstable (e.g. some model names are different each time, etc.) - will we catch it with tests? How can we guarantee this?

Contributor Author

We call it for each UDF in the chain when we calculate the whole chain hash. The chain hash will be calculated:

  1. On .save()
  2. In each UDF itself

IMO we should not worry about this being a bottleneck. I will test with some big schema to be 100% sure, but since this only converts columns and their types to strings (it doesn't do any IO or similar), I would guess it's not even on the same page as the UDF calculation or applying all the steps in the chain.

Regarding instability, can you give an example of something that could happen? I'm trying to come up with a scenario but don't have many ideas .. SignalSchema.hash() depends on schema serialization, which should always be consistent, otherwise we have a big issue in our system regardless of checkpoints.

Contributor

I don't tbh ... I remember we have things like random names for generated DataModels in some places ... SignalSchema overall and its serialization are quite complex - I think it is worth reviewing at a high level.

Contributor

In each UDF itself

Once? or per each input?

Contributor Author

For each UDF we calculate it twice, for params and for outputs - both are instances of SignalSchema in the UDF codebase.

    if not is_lambda:
        # Try to get exact source of named function
        try:
            lines, _ = inspect.getsourcelines(func)
Contributor

why do we extract the source code here? do we include it in the hash? (this is not what is expected)

Contributor Author

Yes, we use the UDF function source to calculate the hash. I know we discussed that the user should be able to change the UDF function code and continue from that UDF, and this doesn't block that.

These are implementation details / my plans for UDF checkpoints (a rough sketch of the flow follows the list):

  1. At the beginning of a UDF, create a special partial checkpoint of the chain that enters the UDF (without the UDF itself). All partial results will be connected to this checkpoint, and it will be active until the UDF is 100% successful.
  2. When the UDF is 100% successful / done, we create another normal / non-partial checkpoint that now includes the UDF itself in the calculation (here we use the function code for the hash). This lets us continue from this UDF if something breaks in the future (avoiding UDF re-calculation) and also lets us recalculate the UDF if the user decides to change the UDF code later, even after the UDF finished successfully (maybe the user realizes there is still something they missed in that UDF). I think this is important, as we need to give the user the ability to re-run that UDF from the beginning without the need to re-run the whole job, for example.
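
A rough sketch of that two-phase flow (hypothetical names; the real implementation belongs to the follow-up checkpoints work, not this PR):

```python
class Checkpoint:
    """Minimal in-memory stand-in for the planned checkpoint object."""

    def __init__(self, key: str, partial: bool = True):
        self.key = key
        self.partial = partial
        self.results: dict[int, object] = {}

    def is_processed(self, row_id: int) -> bool:
        return row_id in self.results

    def save_result(self, row_id: int, value: object) -> None:
        self.results[row_id] = value


def run_udf_with_checkpoints(chain_hash_before_udf: str, udf_hash: str, udf, rows):
    # Phase 1: a partial checkpoint keyed by the hash of the chain *before* the
    # UDF; partial results stay attached to it so a failed run can resume
    # without recomputing the UDF's inputs.
    cp = Checkpoint(key=chain_hash_before_udf, partial=True)
    for row_id, row in enumerate(rows):
        if not cp.is_processed(row_id):
            cp.save_result(row_id, udf(row))

    # Phase 2: once the UDF finished successfully, switch to a normal checkpoint
    # keyed by a hash that also covers the UDF itself (including its function
    # source), so a re-run can skip this UDF entirely unless its code changed.
    cp.key, cp.partial = udf_hash, False
    return cp
```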

Let me know if you see some issues in the approach.

Contributor

yep, my main concern is complexity while it doesn't fully solve the issue - anything outside the function code can change in a way that changes the UDF's behavior - e.g. it uses some helper that changed? or some package? We won't be able to detect this - so unless I'm missing something I would keep it simple: skip recalculation no matter what and have some mechanism to trigger a full recalculation (or a partial one on the UDF side - e.g. by introducing some version of the function as a param).

Contributor Author

True, this hash will only catch changes to the function code itself, not changes to helper methods or libraries. But my feeling was that it's good enough for users (we can clearly communicate this to them), and it's definitely better than only giving the option to recalculate the whole job. ... idk

BTW it's not that hard to make helper method changes affect the hash as well; the only problematic part is external libs (for them we can only include the name + type in the hash, so the hash will change if the user stops using one or adds another one, etc.)

Contributor

it's definitely better than only giving the option to recalculate the whole job.

100%

this hash will only catch changes to the function code itself, not changes to helper methods or libraries.

the worst thing here is that it will be unpredictable - sometimes it works, sometimes it doesn't. I don't think we want that level of magic

let's please simplify this, at least for now

    ],
)
def test_hash(schema, _hash):
    assert SignalSchema(schema).hash() == _hash
Contributor

what do we really test here? why three tests? we should probably make some minor changes to the same schema and test that the hash is different, and also test some more complex schema scenarios

Contributor Author

We test that a given schema always produces a consistent hash. We have an empty schema, a one-field schema with File, and a slightly more complex schema with multiple fields. I'm not sure what is wrong with these scenarios? I agree we could add more though.

Contributor

right, but how do I know that the hash is meaningful at all? what it possibly detects is that if some logic changes we will get a new hash - but I can predict that in most cases people will just update the expected value w/o paying attention

so, what is the intention here? to make sure it works for some edge cases?

do we need to test some actual logic (at least in other tests) - that it actually takes certain fields into account when it calculates the hash?

Contributor Author

The intention is to test that:

  1. Make sure it works
  2. Make sure it doesn't return random values every time which is basically the same as "2 identical schemas produce identical hashes"

I'm not sure what else we can test. The internal calculation of the hash is a "black box" to the tests.
We do have tests that calculate the hash of a whole meaningful chain that includes a UDF with schemas with actual fields. The above is just a simple unit test.

Contributor

🤔

The internal calculation of the hash is a "black box" to the tests.

100%

A more meaningful test is probably to see that some changes make the hash different, or that some other changes (e.g. the order of parameters, if we don't depend on it in some places) don't make it different

Make sure it doesn't return random values every time which is basically the same as "2 identical schemas produce identical hashes"

btw - we don't really test that here - there should be at least two calls then ...

Contributor Author

I will add those tests. I thought they were not needed since we always assert a specific expected hash (meaning every run must be consistent for the same structure), but you might be right that it's better to add it explicitly.



def test_read_storage(mock_get_listing):
    assert dc.read_storage("s3://bucket").hash() == (
Contributor

again, what are we actually testing here? what is the purpose of this kind of test?

Contributor Author

This is about testing the starting steps, as there is a little bit of special logic there compared to other steps. This is just a simple test; there are more complex tests after it. We test consistent hash calculation, as in other places.

Contributor

We test consistent hash calculation, as in other places.

what does this exactly mean? (what is "consistent" in this case?)

Contributor Author

Same chains produce the same hashes every time. In this case it's the simplest chain possible - just reading storage and saving it.


def serialize_column_element(expr: Union[str, ColumnElement]) -> dict:  # noqa: PLR0911
    """
    Recursively serialize a SQLAlchemy ColumnElement into a deterministic structure.
Contributor

can we use str(query) or something like this? seems too complicated atm

Contributor Author

It's not possible because:

  1. It depends on the dialect. By default SQLAlchemy uses the default dialect, but if the table from which we generate the expression is bound to an engine, it will create different string representations for different DBs
  2. Bind parameter names are unstable - there is no guarantee that col == 5 will always produce "col = :param_1" .. sometimes it can produce "col = :param_2"
  3. SQLAlchemy version changes - the stringification format is not a stable API. A minor upgrade could produce different SQL text for the same expression → inconsistent hashes

Contributor

It depends on the dialect. By default SQLAlchemy uses the default dialect, but if the table from which we generate the expression is bound to an engine, it will create different string representations for different DBs

do we care about this for hash?

SQLAlchemy version changes - the stringification format is not a stable API. A minor upgrade could produce different SQL text for the same expression → inconsistent hashes

same - is it important? it seems not tbh ...

Bind parameter names are unstable - there is no guarantee that col == 5 will always produce "col = :param_1" .. sometimes it can produce "col = :param_2"

what are the cases?

Contributor Author

I think we should care about all 3 of those items.

About the unstable param names - I think the param names depend on ordering, which should not affect the hash (e.g. the order of elements in an and_ clause, etc.) .. but in addition, constants are not included in the representation, so these 2 expressions produce the same hash even though they are clearly different:

>>> from datachain import C
>>> expr = C("age") == 20
>>> str(expr)
'age = :age_1'
>>> expr = C("age") == 30
>>> str(expr)
'age = :age_1'

Contributor

@shcheklein shcheklein Sep 26, 2025

.. but in addition, constants are not included in the representation

can we / don't we take all values separately in the Step? it should be simple I guess?

I don't think we should care about order - again, we are complicating this ahead of time I feel

I think we should care about all 3 of those items.

could you elaborate please?

Contributor Author

@ilongin ilongin Sep 26, 2025

can we / don't we take all values separately in the Step? it should be simple I guess?

The inputs to .filter() (and a lot of other DataChain methods) are lists of sqlalchemy.ColumnElement, which can be a column, a binary expression, a unary expression, an over clause, a function, etc. (all the classes that are handled in the questionable serialize_column_element function, btw).
So in this example: chain.filter(C("numbers") > 3, C("numbers") < 10) we have 2 arguments, both binary expressions. If we just did str(e) for each expression, we would not get the constant values in that string -> the hash is incorrect. To get the constant values we need to walk that binary expression down the tree, and that is exactly what serialize_column_element does, but it also returns a correct hashable representation (dicts), which is hashed afterwards in another method (a simplified sketch of this kind of traversal is included after this comment).
TL;DR: it's impossible to achieve a correct / consistent hash by just using str() on the inputs.

BTW str(dc.func.and_(C("numbers") > 3, C("file.path").glob("*.jpg"))) == and().

I think we should care about all 3 of those items.

could you elaborate please?

  1. If we don't have to, I don't think we should be dialect-dependent in the hash calculation. The more generic the hash is, the better. If the hash is coupled with the dialect, then 2 identical chains will produce different hashes locally and on Studio, and we always want to avoid this.
  2. Explained above why str(expression) is not enough - it's not consistent and won't work correctly for hash calculation
  3. str(expr) is just the string representation of the query class and is more prone to changes - I don't think it's meant to be used for important parts of the code, let alone hash calculation, but more for printing queries in logs etc.
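
To illustrate the kind of traversal described above, here is a simplified sketch; it is not the actual serialize_column_element from hash_utils, just an approximation of the idea:

```python
import hashlib
import json

from sqlalchemy.sql.elements import BinaryExpression, BindParameter, ColumnClause


def serialize_expr(expr):
    # Walk the expression tree, keeping only dialect-independent pieces:
    # column names, operator names, and literal values.
    if isinstance(expr, BindParameter):
        return {"type": "literal", "value": expr.value}
    if isinstance(expr, ColumnClause):
        return {"type": "column", "name": expr.name}
    if isinstance(expr, BinaryExpression):
        return {
            "type": "binary",
            "op": expr.operator.__name__,
            "left": serialize_expr(expr.left),
            "right": serialize_expr(expr.right),
        }
    # Fallback for node types this sketch doesn't handle explicitly.
    return {"type": "other", "repr": str(expr)}


def hash_expr(expr) -> str:
    payload = json.dumps(serialize_expr(expr), sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()
```

With something like this, expressions that differ only in their literal values (like the age == 20 vs age == 30 example above) hash differently, unlike plain str(expr).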

Contributor

@shcheklein shcheklein Sep 27, 2025

  1. Can we use something like str(expr.compile(..., compile_kwargs={"literal_binds": True}))? (see the example below)
  2. Regarding your points 1 and 3: it's still not clear why the hash should be that stable. It is meant to restart a job right away, not to make it portable. If we need that, we can add it later.

I would say: let's simplify unless we really have a blocker for a basic requirement.
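
For reference, the compile-based approach being suggested would be something along these lines; it inlines literal values, but as discussed below it comes with its own stability trade-offs:

```python
import hashlib

from sqlalchemy import column

expr = column("age") == 20

# Render the expression with literal values inlined instead of bind parameters.
sql_text = str(expr.compile(compile_kwargs={"literal_binds": True}))
print(sql_text)  # age = 20

print(hashlib.sha256(sql_text.encode()).hexdigest())
```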

Contributor

Thanks for the detailed response!

Contributor Author

My very first thought was to use compile with literal_binds, but I quickly decided not to. Besides the fact that the current implementation is also very straightforward and not complex, there are multiple reasons not to go with compile, each of which is a deal breaker IMO:

  1. The already mentioned consistency across compilers - maybe it's not a direct requirement now, but it's still nice to have consistent hashing across every backend, and what matters now is that we wouldn't be able to write tests that expect a specific hash value, as Studio (PG) and local (SQLite) tests would end up with different values, which means we couldn't test for regressions - this also means I would need to refactor the current tests.
  2. Alias naming - SQLAlchemy sometimes automatically adds aliases to compiled statements and those are really not stable; I'm not sure what they depend on, but I've read it can be due to compiler internal state. There might also be issues with whitespace, parentheses, changing order of elements in AND / OR functions, etc.
  3. I've read in the past that those literal_binds are not completely stable - they do not work for some cases .. though this would need further investigation .. in general that method doesn't guarantee stable results, and that is the issue for hashing.
  4. It's 30-40% slower than plain Python object traversal (this might not be that important though)

    offset: int

    def hash_inputs(self) -> str:
        return hashlib.sha256(str(self.offset).encode()).hexdigest()
Contributor

move hashlib.sha256(str(self.offset).encode()).hexdigest() into a helper? it will make the code a lot cleaner ... since it repeats so many times (for example, something like the sketch below)
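
A small helper along these lines would do it (hypothetical name, just illustrating the suggestion):

```python
import hashlib


def sha256_str(value: object) -> str:
    # Hash the string form of a simple scalar attribute.
    return hashlib.sha256(str(value).encode()).hexdigest()


# e.g. in SQLOffset:
#     def hash_inputs(self) -> str:
#         return sha256_str(self.offset)
```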
