# UDF Checkpoints #1422
@@ -11,10 +11,10 @@

Checkpoints are available for both local script runs and Studio executions.

When you run a Python script locally (e.g., `python my_script.py`), DataChain automatically:

1. **Creates a job** for the script execution, using the script's absolute path as the job name
2. **Tracks previous runs** by finding the last job with the same script name
3. **Calculates hashes** for each dataset save operation based on the DataChain operations chain
4. **Creates checkpoints** after each successful `.save()` call, storing the hash
5. **Checks for existing checkpoints** on subsequent runs - if a matching checkpoint exists from the previous run, DataChain skips the save and reuses the existing dataset

This means that if your script creates multiple datasets and fails partway through, the next run will skip recreating the datasets that were already successfully saved.
@@ -37,7 +37,7 @@

When triggering jobs through the Studio interface:

2. **Checkpoint control** is explicit - you choose between:
   - **Run from scratch**: Ignores any existing checkpoints and recreates all datasets
   - **Continue from last checkpoint**: Resumes from the last successful checkpoint, skipping already-completed stages
3. **Job linking between runs** is handled automatically by the system
4. **Checkpoint behavior** during execution is the same as local runs: datasets are saved at each `.save()` call and can be reused on retry
@@ -75,7 +75,7 @@ result = (

**First run:** The script executes all three stages and creates three datasets: `filtered_data`, `transformed_data`, and `final_results`. If the script fails during Stage 3, only `filtered_data` and `transformed_data` are saved.

**Second run:** DataChain detects that `filtered_data` and `transformed_data` were already created in the previous run with matching hashes. It skips recreating them and proceeds directly to Stage 3, creating only `final_results`.
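The example's code sits above this hunk and is elided from the diff; for context, here is a minimal sketch consistent with the described stages (the specific operations and the `category` column are illustrative assumptions, not the original code):

```python
import datachain as dc
from datachain import C

result = (
    dc.read_csv("data.csv")
    .filter(C("value") > 0)          # Stage 1
    .save("filtered_data")           # checkpoint 1
    .mutate(doubled=C("value") * 2)  # Stage 2
    .save("transformed_data")        # checkpoint 2
    .group_by(                       # Stage 3
        total=dc.func.sum("doubled"),
        partition_by="category",
    )
    .save("final_results")           # checkpoint 3
)
```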
## When Checkpoints Are Used
@@ -84,27 +84,20 @@

Checkpoints are automatically used when:

- Running a Python script locally (e.g., `python my_script.py`)
- The script has been run before
- A dataset with the same name is being saved
- The chain hash matches a checkpoint from the previous run

Checkpoints are **not** used when:

- Running code interactively (Python REPL, Jupyter notebooks)
- Running code as a module (e.g., `python -m mymodule`)
- The `DATACHAIN_IGNORE_CHECKPOINTS` environment variable is set (see below)

## Resetting Checkpoints

To ignore existing checkpoints and run your script from scratch, set the `DATACHAIN_IGNORE_CHECKPOINTS` environment variable:

```bash
DATACHAIN_IGNORE_CHECKPOINTS=1 python my_script.py
```

This forces DataChain to recreate all datasets, regardless of existing checkpoints.
@@ -121,7 +114,7 @@

When running `python my_script.py`, DataChain uses the **absolute path** to the script as the job name:

```
/home/user/projects/my_script.py
```

This allows DataChain to link runs of the same script together, enabling checkpoint lookup across runs.
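A hypothetical illustration of how such a job name can be derived (not DataChain's actual implementation):

```python
import os
import sys

# Absolute path of the running script, e.g. /home/user/projects/my_script.py
job_name = os.path.abspath(sys.argv[0])
print(job_name)
```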
### Interactive or Module Execution (Checkpoints Disabled)
@@ -140,7 +133,7 @@

For each `.save()` operation, DataChain calculates a hash based on:

1. The hash of the previous checkpoint in the current job (if any)
2. The hash of the current DataChain operations chain

This creates a chain of hashes that uniquely identifies each stage of data processing. On subsequent runs, DataChain matches these hashes against checkpoints from the previous run and skips recreating datasets where the hashes match.
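A minimal sketch of the chaining idea (illustrative only, not DataChain's actual hashing code):

```python
import hashlib

def chain_hash(prev_checkpoint_hash: str | None, ops_chain_repr: str) -> str:
    """Combine the previous checkpoint's hash with the current chain's hash."""
    h = hashlib.sha256()
    if prev_checkpoint_hash:
        h.update(prev_checkpoint_hash.encode())
    h.update(ops_chain_repr.encode())
    return h.hexdigest()

# Stage hashes form a chain: changing stage 1 changes every later hash.
h1 = chain_hash(None, "read_csv('data.csv') | filter(value > 0)")
h2 = chain_hash(h1, "mutate(doubled=value * 2)")
```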
### Hash Invalidation
@@ -198,29 +191,153 @@

```python
for ds in dc.datasets():
    print(ds.name)
```

## UDF-Level Checkpoints

In addition to dataset-level checkpointing via `.save()`, DataChain automatically creates checkpoints for individual UDFs (`.map()`, `.gen()`, `.agg()`) during execution.

**Two levels of checkpointing:**

- **Dataset checkpoints** (via `.save()`): When you explicitly save a dataset, it's persisted and can be used in other scripts. If you re-run the same chain with unchanged code, DataChain skips recreation and reuses the saved dataset.
- **UDF checkpoints** (automatic): Each UDF execution is automatically checkpointed. If a UDF completes successfully, it's skipped entirely on re-run (if the code is unchanged). If a UDF fails mid-execution, only the unprocessed rows are recomputed on re-run.

**Key differences:**

- `.save()` creates a named dataset that persists even if your script fails later, and can be used in other scripts
- UDF checkpoints are automatic and internal - they optimize execution within a single script by skipping or resuming UDFs

For `.map()` and `.gen()`, **DataChain saves processed rows continuously during UDF execution**. This means:

- If a UDF **completes successfully**, a checkpoint is created and the entire UDF is skipped on re-run (unless the code changes)
- If a UDF **fails mid-execution**, the next run continues from where it left off, skipping already-processed rows - even if you've modified the UDF code to fix a bug

**Note:** For `.agg()`, checkpoints are created when the aggregation completes successfully, but partial results are not tracked. If an aggregation fails partway through, it restarts from scratch on the next run.
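A schematic chain showing where each kind of checkpoint applies (`get_width` and `make_tiles` are hypothetical UDFs):

```python
import datachain as dc

(
    dc.read_dataset("images")
    .map(width=get_width)      # UDF checkpoint: skipped entirely on re-run once completed
    .gen(tiles=make_tiles)     # UDF checkpoint: resumes from unprocessed rows if it failed
    .save("processed_images")  # dataset checkpoint: named, persisted, reusable in other scripts
)
```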
### How It Works

When executing `.map()` or `.gen()`, DataChain:

1. **Saves processed rows incrementally** as the UDF processes your dataset
2. **Creates a checkpoint** when the UDF completes successfully
3. **Allows you to fix bugs and continue** - if the UDF fails, you can modify the code and re-run, skipping already-processed rows
4. **Invalidates the checkpoint if you change the UDF after successful completion** - completed UDFs are recomputed from scratch if the code changes

For `.agg()`, checkpoints are only created upon successful completion, without incremental progress tracking.
### Example: Fixing a Bug Mid-Execution

```python
import datachain as dc
from datachain import File
from PIL import Image

def process_image(file: File) -> int:
    # Bug: this will fail on some images
    img = Image.open(file.get_local_path())
    return img.size[0]

(
    dc.read_dataset("images")
    .map(width=process_image)
    .save("image_dimensions")
)
```
**First run:** The script processes 50% of the images successfully, then fails on a corrupted image.

**After fixing the bug:**
```python
from datachain import File
from PIL import Image

def process_image(file: File) -> int:
    # Fixed: handle corrupted images gracefully
    try:
        img = Image.open(file.get_local_path())
        return img.size[0]
    except Exception:
        return 0
```

**Second run:** DataChain automatically skips the 50% of images that were already processed successfully and continues processing the remaining images using the fixed code. You don't lose any progress from the first run.
### When UDF Checkpoints Are Invalidated

DataChain distinguishes between two types of UDF changes:

#### 1. Code-Only Changes (Bug Fixes) - Continues from Partial Results

When you fix a bug in your UDF code **without changing the output type**, DataChain allows you to continue from where the UDF failed. This is the key benefit of UDF-level checkpoints - you don't lose progress when fixing bugs.
**Example: bug fix without an output change**

```python
# First run - fails partway through
def process(num: int) -> int:
    if num > 100:
        raise Exception("Bug!")  # Oops, a bug!
    return num * 10

# Second run - continues from where it failed
def process(num: int) -> int:
    return num * 10  # Bug fixed! ✓ Continues from partial results
```

In this case, DataChain will skip already-processed rows and continue processing the remaining rows with your fixed code.
#### 2. Output Schema Changes - Forces Re-run from Scratch

When you change the **output type** of your UDF, DataChain automatically detects this and reruns the entire UDF from scratch. This prevents schema mismatches that would cause errors or corrupt data.

**Example: output change**

```python
# First run - fails partway through
def process(num: int) -> int:
    if num > 100:
        raise Exception("Bug!")
    return num * 10

# Second run - output type changed
def process(num: int) -> str:
    return f"value_{num * 10}"  # Output type changed! ✗ Reruns from scratch
```

In this case, DataChain detects that the output type changed from `int` to `str` and discards partial results to avoid schema incompatibility. All rows will be reprocessed with the new output type.
#### Changes That Invalidate In-Progress UDF Checkpoints

Partial results are automatically discarded when you change:

- **Output type** - Changes to the `output` parameter or return type annotations
- **Operations before the UDF** - Any changes to the data processing chain before the UDF

#### Changes That Invalidate Completed UDF Checkpoints

Once a UDF completes successfully, its checkpoint is tied to the UDF function code. If you modify the function and re-run the script, DataChain will detect the change and recompute the entire UDF from scratch.

Changes that invalidate completed UDF checkpoints:

- **Modifying the UDF function logic** - Any code changes inside the function
- **Changing function parameters or output types** - Changes to input/output specifications
- **Altering any operations before the UDF in the chain** - Changes to upstream data processing

**Key takeaway:** For in-progress (partial) UDFs, you can fix bugs freely as long as the output stays the same. For completed UDFs, any code change triggers a full recomputation.
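A hedged sketch of upstream invalidation (`process` is a hypothetical UDF): changing an operation before the UDF changes the chain hash, so partial results are discarded and the UDF reruns from scratch.

```python
import datachain as dc
from datachain import C

# First run: fails partway through `process`
(
    dc.read_csv("data.csv")
    .filter(C("value") > 0)   # upstream operation
    .map(result=process)
    .save("output")
)

# Second run: the filter threshold changed, so the chain before the UDF
# hashes differently - `process` recomputes from scratch, not from partials.
(
    dc.read_csv("data.csv")
    .filter(C("value") > 10)  # changed upstream operation
    .map(result=process)
    .save("output")
)
```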
## Limitations

When running locally:

- **Script-based:** Code must be run as a script (not interactively or as a module).
- **Same script path:** The script must be run from the same absolute path for linking to previous runs to work.
- **Threading/Multiprocessing:** Checkpoints are automatically disabled when Python threading or multiprocessing is detected, to prevent race conditions. Any checkpoints created before threading starts remain valid for future runs. DataChain's built-in `parallel` setting for UDF execution is not affected by this limitation.

These limitations don't apply when running on Studio, where job linking between runs is handled automatically by the platform.
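To illustrate the threading caveat (a sketch; `process_image` as above, `background_task` is hypothetical):

```python
import threading

import datachain as dc

# DataChain-managed parallelism: checkpoints stay enabled.
(
    dc.read_dataset("images")
    .settings(parallel=4)      # built-in parallel UDF execution
    .map(width=process_image)
    .save("image_dimensions")  # checkpointed as usual
)

# User-managed threading: checkpoints are disabled from this point on,
# though checkpoints created above remain valid for future runs.
threading.Thread(target=background_task).start()
```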
### UDF Hashing Limitations

DataChain computes checkpoint hashes by inspecting UDF code and metadata. Certain types of callables cannot be reliably hashed:

- **Built-in functions** (`len`, `str`, `int`, etc.): Their bytecode is not accessible, so a random hash is generated on each run. Checkpoints using these functions will not be reused.
- **C extensions**: Same limitation as built-ins - no accessible bytecode means a new hash on each run.
- **Mock objects**: `Mock(side_effect=...)` cannot be reliably hashed because the side effect is not discoverable via inspection. Use regular functions instead.
- **Dynamically generated callables**: If a callable is created via `exec`/`eval`, or its behavior depends on runtime state, the hash reflects only the method's code, not the captured state.

To ensure checkpoints work correctly, define your UDFs as regular Python functions (with `def`) or lambda expressions.
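A schematic contrast (`chain` stands in for any DataChain instance):

```python
# Reliably hashed: a plain function defined with `def` - its checkpoint
# is reused across runs as long as the code is unchanged.
def double(num: int) -> int:
    return num * 2

chain.map(doubled=double)

# Not reliably hashed: a built-in has no accessible bytecode, so it gets
# a new random hash each run and its checkpoint is never reused.
chain.map(as_text=str, output=str)
```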
## Future Plans

### Partial Result Tracking for Aggregations

Currently, `.agg()` creates checkpoints only upon successful completion, without tracking partial progress. Future versions will extend the same incremental progress tracking that `.map()` and `.gen()` have to aggregations, allowing them to resume from where they failed rather than restarting from scratch.

Review discussion on this section:

> **Contributor:** Do we need "Future Plans" in docs? 🤔
>
> **Author:** Not sure, I thought it was a good idea, but maybe you are right... it could be confusing. @shcheklein, what is your take?
>
> **Contributor:** Both are fine; we need partial progress though (just these days, brainspace is running a 7h-long agg job with ~400 inputs, each taking 30 mins).
>
> **Contributor:** Just double-check we have a checkbox to follow up.
>
> **Contributor:** ping
>
> **Author:** We already have a draft PR for this, which I will finish when this branch gets merged.
---

@@ -0,0 +1,101 @@

```python
import uuid
from dataclasses import dataclass
from datetime import datetime
from enum import Enum


class CheckpointEventType(str, Enum):
    """Types of checkpoint events."""

    # UDF events
    UDF_SKIPPED = "UDF_SKIPPED"
    UDF_CONTINUED = "UDF_CONTINUED"
    UDF_FROM_SCRATCH = "UDF_FROM_SCRATCH"

    # Dataset save events
    DATASET_SAVE_SKIPPED = "DATASET_SAVE_SKIPPED"
    DATASET_SAVE_COMPLETED = "DATASET_SAVE_COMPLETED"


class CheckpointStepType(str, Enum):
    """Types of checkpoint steps."""

    UDF_MAP = "UDF_MAP"
    UDF_GEN = "UDF_GEN"
    DATASET_SAVE = "DATASET_SAVE"


@dataclass
class CheckpointEvent:
    """
    Represents a checkpoint event for debugging and visibility.

    Checkpoint events are logged during job execution to track checkpoint
    decisions (skip, continue, run from scratch) and provide visibility
    into what happened during script execution.
    """

    id: str
    job_id: str
    run_group_id: str | None
    timestamp: datetime
    event_type: CheckpointEventType
    step_type: CheckpointStepType
    udf_name: str | None = None
    dataset_name: str | None = None
    checkpoint_hash: str | None = None
    hash_partial: str | None = None
    hash_input: str | None = None
    hash_output: str | None = None
    rows_input: int | None = None
    rows_processed: int | None = None
    rows_output: int | None = None
    rows_input_reused: int | None = None
    rows_output_reused: int | None = None
    rerun_from_job_id: str | None = None
    details: dict | None = None

    @classmethod
    def parse(  # noqa: PLR0913
        cls,
        id: str | uuid.UUID,
        job_id: str,
        run_group_id: str | None,
        timestamp: datetime,
        event_type: str,
        step_type: str,
        udf_name: str | None,
        dataset_name: str | None,
        checkpoint_hash: str | None,
        hash_partial: str | None,
        hash_input: str | None,
        hash_output: str | None,
        rows_input: int | None,
        rows_processed: int | None,
        rows_output: int | None,
        rows_input_reused: int | None,
        rows_output_reused: int | None,
        rerun_from_job_id: str | None,
        details: dict | None,
    ) -> "CheckpointEvent":
        return cls(
            id=str(id),
            job_id=job_id,
            run_group_id=run_group_id,
            timestamp=timestamp,
            event_type=CheckpointEventType(event_type),
            step_type=CheckpointStepType(step_type),
            udf_name=udf_name,
            dataset_name=dataset_name,
            checkpoint_hash=checkpoint_hash,
            hash_partial=hash_partial,
            hash_input=hash_input,
            hash_output=hash_output,
            rows_input=rows_input,
            rows_processed=rows_processed,
            rows_output=rows_output,
            rows_input_reused=rows_input_reused,
            rows_output_reused=rows_output_reused,
            rerun_from_job_id=rerun_from_job_id,
            details=details,
        )
```