-
Notifications
You must be signed in to change notification settings - Fork 134
Implement checkpoints usage in local #1367
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Reviewer's GuideThis PR introduces a local job lifecycle by implementing a central JobManager service that creates, links, and finalizes DataChain jobs for plain Python script runs, and wires it into the checkpoint and save logic so that local runs behave like SaaS-managed jobs. Sequence diagram for local script job lifecycle with JobManagersequenceDiagram
participant User as actor User
participant Script as "Python Script"
participant JobManager
participant Session
participant Metastore
participant Catalog
User->>Script: Run script (python my_script.py)
Script->>JobManager: get_or_create(session)
JobManager->>Metastore: get_last_job_by_name(script_path)
Metastore-->>JobManager: return last job (if any)
JobManager->>Metastore: create_job(name=script_path, parent_job_id=last_job.id)
Metastore-->>JobManager: return new job id
JobManager->>Metastore: get_job(new_job_id)
Metastore-->>JobManager: return new job
JobManager-->>Script: return job
Script->>Catalog: create_dataset(..., job_id=job.id)
Catalog->>Metastore: create_checkpoint(job.id, hash)
Note over JobManager: On exit, JobManager finalizes job status
Class diagram for new JobManager and Job lifecycle changesclassDiagram
class JobManager {
+Job job
+JobStatus status
+bool owned
+get_or_create(session)
+reset()
+finalize_success(session)
+finalize_failure(session, exc_type, exc_value, tb)
}
class Job {
+str id
+str name
+str query
+JobStatus status
+str parent_job_id
}
JobManager --> Job : manages
JobManager --> "Session" : uses
Job --> "Job" : parent_job_id (optional)
class Metastore {
+get_job(job_id)
+get_last_job_by_name(name)
+create_job(...)
+set_job_status(job_id, status, ...)
}
JobManager --> Metastore : interacts
class DataChain {
+save(...)
+_resolve_checkpoint(...)
}
DataChain --> JobManager : uses
DataChain --> Job : uses
Metastore --> Job : creates/queries
class Catalog {
+create_dataset(..., job_id)
}
DataChain --> Catalog : uses
Catalog --> Job : links via job_id
File-Level Changes
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
Deploying datachain-documentation with
|
| Latest commit: |
855c841
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://cfc3c586.datachain-documentation.pages.dev |
| Branch Preview URL: | https://ilongin-1361-local-checkpoin.datachain-documentation.pages.dev |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes - here's some feedback:
- Move atexit and sys.excepthook registration to a class‐level singleton (instead of per-instance) so hooks aren’t registered multiple times when multiple JobManager instances are created.
- Wrap the existing sys.excepthook instead of clobbering it—store the prior hook and invoke it after finalize_failure to avoid losing upstream exception handling.
- Consider making get_user_script_source more robust (e.g. handle module runs via
-mor scripts without.pyextensions) so that source detection works in more launch scenarios.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Move atexit and sys.excepthook registration to a class‐level singleton (instead of per-instance) so hooks aren’t registered multiple times when multiple JobManager instances are created.
- Wrap the existing sys.excepthook instead of clobbering it—store the prior hook and invoke it after finalize_failure to avoid losing upstream exception handling.
- Consider making get_user_script_source more robust (e.g. handle module runs via `-m` or scripts without `.py` extensions) so that source detection works in more launch scenarios.
## Individual Comments
### Comment 1
<location> `tests/unit/test_job_manager.py:102-109` </location>
<code_context>
+ assert job2.parent_job_id == job1.id
+
+
+def test_get_or_create_fallback_query(test_session, patch_argv):
+ # Patch to return None for script source
+ with patch("datachain.job.get_user_script_source", return_value=None):
+ jm = JobManager()
+ job = jm.get_or_create(test_session)
+
+ assert job.query.startswith("python ")
+ assert job.name.endswith("script.py")
+
+
</code_context>
<issue_to_address>
**suggestion (testing):** Consider testing fallback behavior for interactive sessions.
Please add a test case for when sys.argv is empty or not a script, to verify correct fallback query and job name behavior.
```suggestion
def test_get_or_create_fallback_query(test_session, patch_argv):
# Patch to return None for script source
with patch("datachain.job.get_user_script_source", return_value=None):
jm = JobManager()
job = jm.get_or_create(test_session)
assert job.query.startswith("python ")
assert job.name.endswith("script.py")
def test_get_or_create_interactive_fallback(test_session, monkeypatch):
# Simulate interactive session: sys.argv is empty
monkeypatch.setattr("sys.argv", [])
with patch("datachain.job.get_user_script_source", return_value=None):
jm = JobManager()
job = jm.get_or_create(test_session)
# Check fallback query and job name for interactive session
assert job.query.startswith("python ")
assert job.name == "interactive"
```
</issue_to_address>
### Comment 2
<location> `tests/conftest.py:1080-1089` </location>
<code_context>
return compressed_parquet_data(dog_entries("1.0.0"), src_uri)
+
+
+@pytest.fixture(autouse=True)
+def disable_jobmanager_hooks(request):
+ # Run the test
+ yield
+
+ # After test finishes: remove only JobManager hooks, unless explicitly requested
+ if "use_jobmanager_hooks" not in request.keywords:
+ for fn in list(JobManager._hook_refs):
+ try:
+ atexit.unregister(fn)
+ except Exception as e: # noqa: BLE001
+ print(f"Failed to unregister atexit hook: {e}")
+ JobManager._hook_refs.clear()
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test that explicitly enables JobManager hooks.
Since hooks are disabled by default, adding a test with the 'use_jobmanager_hooks' marker would help confirm that hook registration and execution work correctly when enabled.
</issue_to_address>
### Comment 3
<location> `src/datachain/data_storage/metastore.py:1610-1612` </location>
<code_context>
def get_last_job_by_name(self, name: str, conn=None) -> Optional["Job"]:
query = (
self._jobs_query()
.where(self._jobs.c.name == name)
.order_by(self._jobs.c.created_at.desc())
.limit(1)
)
results = list(self.db.execute(query, conn=conn))
if not results:
return None
return self._parse_job(results[0])
</code_context>
<issue_to_address>
**suggestion (code-quality):** We've found these issues:
- Lift code into else after jump in control flow ([`reintroduce-else`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/reintroduce-else/))
- Replace if statement with if expression ([`assign-if-exp`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/assign-if-exp/))
```suggestion
return None if not results else self._parse_job(results[0])
```
</issue_to_address>
### Comment 4
<location> `tests/unit/test_job_manager.py:126-129` </location>
<code_context>
def test_finalize_failure_delegates_to_sys_excepthook(
test_session, patch_argv, patch_user_script
):
jm = JobManager()
job = jm.get_or_create(test_session)
called = {}
def fake_excepthook(exc_type, exc_value, tb):
called["exc"] = (exc_type, str(exc_value))
sys.__excepthook__, old_hook = fake_excepthook, sys.__excepthook__
try:
try:
raise ValueError("bad stuff")
except ValueError as e:
jm.finalize_failure(test_session, type(e), e, e.__traceback__)
finally:
sys.__excepthook__ = old_hook
assert "bad stuff" in called["exc"][1]
db_job = test_session.catalog.metastore.get_job(job.id)
assert db_job.status == JobStatus.FAILED
</code_context>
<issue_to_address>
**suggestion (code-quality):** Merge nested try-statement into a single try ([`flatten-nested-try`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/flatten-nested-try/))
```suggestion
raise ValueError("bad stuff")
except ValueError as e:
jm.finalize_failure(test_session, type(e), e, e.__traceback__)
```
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
2b570d9 to
0921aea
Compare
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1367 +/- ##
==========================================
+ Coverage 87.74% 87.78% +0.03%
==========================================
Files 160 160
Lines 15141 15207 +66
Branches 2171 2178 +7
==========================================
+ Hits 13286 13349 +63
+ Misses 1356 1352 -4
- Partials 499 506 +7
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes - here's some feedback:
- Consider moving the atexit and excepthook registration to a class‐level guard so hooks are only registered once per process, avoiding duplicate registrations when multiple JobManager instances are created.
- The abstract metastore interface now requires get_last_job_by_name; please ensure all supported backend implementations (not just SQLite) override this method to avoid runtime errors in other environments.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider moving the atexit and excepthook registration to a class‐level guard so hooks are only registered once per process, avoiding duplicate registrations when multiple JobManager instances are created.
- The abstract metastore interface now requires get_last_job_by_name; please ensure all supported backend implementations (not just SQLite) override this method to avoid runtime errors in other environments.
## Individual Comments
### Comment 1
<location> `src/datachain/lib/udf.py:165` </location>
<code_context>
inputs and outputs.
"""
parts = [
- hash_callable(self._func),
+ hash_callable(self._func) if self._func else "",
self.params.hash() if self.params else "",
self.output.hash(),
</code_context>
<issue_to_address>
**question (bug_risk):** Returning an empty string for missing _func may affect hash uniqueness.
Using an empty string may cause hash collisions for UDFs without a function. Consider using a distinct marker to preserve hash uniqueness.
</issue_to_address>
### Comment 2
<location> `tests/func/test_datachain.py:1670-1672` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 3
<location> `tests/test_job_manager_e2e.py:9` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Don't import test modules. ([`dont-import-test-modules`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/dont-import-test-modules))
<details><summary>Explanation</summary>Don't import test modules.
Tests should be self-contained and don't depend on each other.
If a helper function is used by multiple tests,
define it in a helper module,
instead of importing one test from the other.
</details>
</issue_to_address>
### Comment 4
<location> `tests/test_job_manager_e2e.py:73-76` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like loops, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 5
<location> `tests/unit/lib/test_checkpoints.py:48-51` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 6
<location> `tests/unit/lib/test_checkpoints.py:66-70` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 7
<location> `src/datachain/data_storage/metastore.py:1610-1612` </location>
<code_context>
def get_last_job_by_name(self, name: str, conn=None) -> Optional["Job"]:
query = (
self._jobs_query()
.where(self._jobs.c.name == name)
.order_by(self._jobs.c.created_at.desc())
.limit(1)
)
results = list(self.db.execute(query, conn=conn))
if not results:
return None
return self._parse_job(results[0])
</code_context>
<issue_to_address>
**suggestion (code-quality):** We've found these issues:
- Lift code into else after jump in control flow ([`reintroduce-else`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/reintroduce-else/))
- Replace if statement with if expression ([`assign-if-exp`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/assign-if-exp/))
```suggestion
return None if not results else self._parse_job(results[0])
```
</issue_to_address>
### Comment 8
<location> `tests/unit/test_job_manager.py:128-131` </location>
<code_context>
def test_finalize_failure_delegates_to_previous_excepthook(
test_session, patch_argv, patch_user_script
):
jm = JobManager()
job = jm.get_or_create(test_session)
called = {}
def fake_excepthook(exc_type, exc_value, tb):
called["exc"] = (exc_type, str(exc_value))
# Replace the previous hook that JobManager saved
old_hook = jm._previous_excepthook
jm._previous_excepthook = fake_excepthook
try:
try:
raise ValueError("bad stuff")
except ValueError as e:
jm.finalize_failure(test_session, type(e), e, e.__traceback__)
finally:
jm._previous_excepthook = old_hook
assert "bad stuff" in called["exc"][1]
db_job = test_session.catalog.metastore.get_job(job.id)
assert db_job.status == JobStatus.FAILED
</code_context>
<issue_to_address>
**suggestion (code-quality):** Merge nested try-statement into a single try ([`flatten-nested-try`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/flatten-nested-try/))
```suggestion
raise ValueError("bad stuff")
except ValueError as e:
jm.finalize_failure(test_session, type(e), e, e.__traceback__)
```
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
src/datachain/lib/udf.py
Outdated
| inputs and outputs. | ||
| """ | ||
| parts = [ | ||
| hash_callable(self._func), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question (bug_risk): Returning an empty string for missing _func may affect hash uniqueness.
Using an empty string may cause hash collisions for UDFs without a function. Consider using a distinct marker to preserve hash uniqueness.
| catalog = test_session.catalog | ||
|
|
||
| dc.read_values(num=[1, 2, 3], session=test_session).save("nums") | ||
| dc.read_values(num=list(range(1000)), session=test_session).save("nums") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it better to patch the value of the variable that defines the batch size or something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good! how much of this is AI generated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All. I just did maybe 5-6 prompts to make some things better, but even the first version would be good enough I think...for now I found AI to be most useful in generating docs and helping understand the cause of non trivial bugs.
| interrupt_exit_code = -2 | ||
| # (canceled by KeyboardInterrupt). Also accept 130 explicitly as some Python | ||
| # scripts may exit with that code directly. | ||
| interrupt_exit_codes = (-2, 130) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ilongin why do we have this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ilongin ping
Enables checkpoint usage with just running
python my_job.py.For that to work couple of changes were required:
Jobinstance for every dataset, regardless of if it's created with justpython my_script.pyor from SaaS. When we create dataset from SaaS then SaaS is taking care of job lifecycle (creating, updating state at the end etc.) but on local run it's managed indatachainitself. Job instance in local run has name as the full path to the script, so 2 sequential runs of the same script means the next job created is a child of previous job / run.JobManagerwas added to be a single place which we can ask to get aJobfor current process and which updates job state at the end in local env. IfDATACHAIN_JOB_IDenv is present, it means we manage job's lifecycle from outside and that job is just returned from manager, but otherwise new job is being created.JobManageris used inDataChain.save()and in current checkpoints testsJobManagerand e2e tests for checking if job is present when running local script.Summary by Sourcery
Centralize and standardize job lifecycle management in DataChain by adopting JobManager for both SaaS and local runs, integrating job IDs into save and checkpoint workflows, and bolstering coverage with new unit and end-to-end tests.
New Features:
Enhancements:
Tests: