-
Notifications
You must be signed in to change notification settings - Fork 141
UDF Checkpoints cleanup #1454
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
UDF Checkpoints cleanup #1454
Changes from all commits
Commits
Show all changes
188 commits
Select commit
Hold shift + click to select a range
7f876e8
using session instead of catalog in udfstep
ilongin a5c4572
refactoring job creation in datachain
ilongin 70a44a6
implementing first phase of UDF checkpoints
ilongin f4c848b
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin d8a337a
refactoring
ilongin d7b5ed9
changing udf table names
ilongin 8752a9a
adding checkpoint tests and fixing cleaning udf tables in test
ilongin 862fe28
added udf checkpoint continue from partial results
ilongin b599429
added udf generator logic and tests
ilongin 3804b0c
merging with main
ilongin 7c05e0d
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin 20346e7
fixing logic
ilongin e0242c9
merging with main
ilongin 8fd41af
fixing issues and tests
ilongin 630e37b
refactoring tests
ilongin b31d44a
refactoring
ilongin b5bb8cd
refactoring
ilongin a5f0fcd
refactoring
ilongin 92590f7
refactoring udf table ownership logic
ilongin 3c2211d
refactoring
ilongin 181ea2e
refactoring tests
ilongin 88c2648
fixing cast of recursive sql
ilongin c0f46cb
using has_table instead checking metadata
ilongin 14e473b
fixing tests
ilongin d68d746
fixing cleaning table and partition by
ilongin 8e0339f
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin 08c9ec4
fixing test
ilongin bb50da7
fixing aggregator
ilongin bd7d978
fixing hash collision
ilongin 4ddee8f
merging with main
ilongin 5b80a87
refactoring and removing processed table
ilongin 9a6c71f
fixing tests
ilongin a2d6b34
fixing tests
ilongin 3621cde
returning
ilongin 437b63c
updated coverage
ilongin 76125d0
removed coverate sysmon
ilongin c644aa1
refactoring checkpoint cleaning
ilongin 5f6f183
Remove cleanup_checkpoints functionality for separate PR
ilongin 91f7da5
Add cleanup_checkpoints functionality
ilongin 709873c
fixing tests
ilongin e180338
fixing tests
ilongin aaf43f9
added udf checkpoint docs
ilongin 1488fab
refactoring
ilongin d7f3a50
fixing tests
ilongin 8a2ec11
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin 4379d11
fix creating processed table even in reset mode
ilongin 27033e5
added tests
ilongin d73d55d
refactoring processed tracking for generators
ilongin b15f1c9
refactoring tests
ilongin fd5019e
refactoring create_table method
ilongin e4e6de9
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin 1e7f941
fix re-run when UDF output changes
ilongin 6a5c140
Update src/datachain/cli/commands/misc.py
ilongin f914b7f
fixing docs and some code parts
ilongin 91aa89c
refactoring
ilongin 452ae72
returning sysmon
ilongin 2375237
renaming create_checkpoint method
ilongin 55b3846
simplified logic
ilongin 87a51f3
removing batch_callback
ilongin 43835f5
merging with main
ilongin 911a3dc
refactoring
ilongin ad2907d
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin 0b3e092
removing tracking_fiedl
ilongin e909118
fixing tests
ilongin 7bbe619
fixing ancestor job id find
ilongin fa5053b
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin 140e56b
refactor remove_checkpoint to accept only id
ilongin 54c4493
removed comment
ilongin e9f48f5
refactoring creating table
ilongin 38fa81d
refactoring
ilongin 8ab52ae
updated docs by removing parent verb
ilongin ced14b4
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin 0f88905
adding staging sufix for table atomicity when doing copy
ilongin 6f7e06f
break parent connection when reset flag is present
ilongin af784f2
fixing docs
ilongin 7703eb5
fixing docs and other small fixes
ilongin 4a7b4ca
fixing docs and other small fixes
ilongin a95a764
fixing comments
ilongin 8876a3b
discarding changes with garabage collecting method of cli
ilongin 13f3552
moving list_tables function to tests util
ilongin 157a437
unifying prepare_row functions
ilongin c9d3bb0
adding hash_input and hash_output as default args in apply method of …
ilongin 2227fe1
renaming sys_id to sys__processed_id
ilongin f459d60
removed not needed quote_schema from sqlite in removing tables for test
ilongin f93b49d
fixing issue with incomplete inputs in generator
ilongin a2a9a98
merging with main
ilongin 631cdb3
merging with main
ilongin 49e7641
added docs
ilongin f50058f
reorganizing tests
ilongin d11fb5d
var renaming
ilongin 96f9de9
added regression test for subtract
ilongin ec98372
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin c77858a
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin 9a51f9c
make hash_callable not fail if unexpected callalbe is input
ilongin 298bcf3
disable checkpoints in threading / multiprocess
aa11f80
added custom migration function for checkpoints
ilongin ee464fb
Merge branch 'ilongin/1392-udf-checkpoints' of github.com:datachain-a…
ilongin e2ab50b
renaming checkpointstable and removing not needed migration function
9cb16c7
Merge branch 'main' into ilongin/1392-udf-checkpoints
3685dca
fixing non determinisitc tests for CH
ilongin eba46b5
fixing bug with continuing udf processing
ilongin e58d742
fixing test
ilongin 1feac6d
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin 13c6aa0
fixing docs
ilongin a878df6
removed not needde comments
ilongin c61a13b
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin d88d68a
removed not needed flag
ilongin abccfbc
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin 809e9a3
removed not needed env var
ilongin ab6799f
renamed env var
ilongin fa00047
reduced number of parallel
ilongin da8fd5b
added envs to env docs
ilongin 115ea69
moved function to check concurrency for checkpoints from session to u…
ilongin 334f5fb
removed comment
ilongin cb18ee4
fixing correct parent id
ilongin d1f83f1
moving check if checkpoint is enabled because of concurency from meta…
ilongin 79655e7
removed partial constraint
ilongin b93f328
removing test
ilongin 4352423
refactoring test
ilongin 2504498
refactoring and merging with main
ilongin 97fe6ae
fixing comment
ilongin 26f1eef
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin 5049e96
Merge branch 'ilongin/1392-udf-checkpoints' into ilongin/1453-checkpo…
ilongin 985415d
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin 31fe6dd
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin f117751
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin 15751ba
returning old checkpoints table name
ilongin 85305b0
refactoring input table name hash
ilongin cafeadf
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin a022bc1
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin 27781df
using group id for input table name in udf
ilongin 05cf600
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin af0dc7f
using pid and thread ownership to determine if checkpoints are enable…
ilongin b108dc8
fixing test
ilongin 24c3894
refactoring tests
ilongin eb0e03a
refactoring tests
ilongin 9d93aad
removing not needed conditions
ilongin e8ec502
refactoring
ilongin 3bcfa18
fixing comment
ilongin 4dd9cd4
refactoring
ilongin 8fdfea2
merging with main
ilongin 34402f4
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin 8ea8b12
fixing race condition
ilongin acf79c8
adde safe_copy_table
ilongin 585d685
refactoring copy_table methods
ilongin adb828e
continuing UDF if parent partial table is not found
ilongin 797b6cd
added try/catch of missing table
ilongin 505f304
refactor transaction context usage
ilongin d702b21
optimized query
ilongin 8df4905
added thread lock
ilongin 659cc1c
updated docs with hashing limitations
ilongin b90a9d6
renaming function
ilongin 3431c10
removed unrelated lint exception
ilongin 19093a3
refactoring checkpoint tests
ilongin 6803345
fixing env vars and verbose comments
ilongin 8c57340
ading runtime error
ilongin d25b5af
refactoring
ilongin 7a44193
removing name and job_aware to hash method of DataChain
ilongin e267deb
refactoring
ilongin bee6e0f
merging with main
ilongin 0144c1a
refactoring
ilongin c457bf9
refactoring
ilongin 15650eb
added logs
ilongin 495f189
fixing env vars
ilongin 8e6b2e5
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin 56c6b78
refactoring tests
ilongin 8b16be4
removing not neededd monkeypatch
ilongin 88048de
merging with main
ilongin e903715
added more tests
ilongin 79c94f2
closing sqlite connections in test
ilongin a83fafe
moving get_table to db specific implementation
ilongin 1d77ac2
return get_table to db_engine
ilongin 0de0d3f
added job_id to hash
ilongin 8b8a8d3
improved logging
ilongin 0f61ea7
Added `CheckpointEvent` model to track checkpoint events (#1575)
ilongin 25def9c
added prints
ilongin 5a70e41
added print only when it is second job
ilongin 12a771c
removed not used var
ilongin 41847c9
removed print
ilongin 6279013
merging with base branch
ilongin 85e0c42
Merge branch 'main' into ilongin/1392-udf-checkpoints
ilongin 3e6601c
fixing reading files on udf continue
ilongin a9358d1
UDF checkpoint visibility (#1576)
ilongin 22ebd7d
merged with main
ilongin 1a04a7c
refactoring checkpoint events
ilongin 5998b8e
fixing lint
ilongin e83884b
adding missing tests and fixing issues
ilongin 6e18191
Merge branch 'ilongin/1392-udf-checkpoints' into ilongin/1453-checkpo…
ilongin File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -10,6 +10,7 @@ | |||||||||
| from contextlib import contextmanager, suppress | ||||||||||
| from copy import copy | ||||||||||
| from dataclasses import dataclass | ||||||||||
| from datetime import datetime, timedelta, timezone | ||||||||||
| from functools import cached_property, reduce | ||||||||||
| from typing import TYPE_CHECKING, Any | ||||||||||
| from uuid import uuid4 | ||||||||||
|
|
@@ -24,6 +25,7 @@ | |||||||||
| from tqdm.auto import tqdm | ||||||||||
|
|
||||||||||
| from datachain.cache import Cache | ||||||||||
| from datachain.checkpoint import Checkpoint | ||||||||||
| from datachain.client import Client | ||||||||||
| from datachain.dataset import ( | ||||||||||
| DATASET_PREFIX, | ||||||||||
|
|
@@ -2042,3 +2044,76 @@ def index( | |||||||||
| only_index=True, | ||||||||||
| ): | ||||||||||
| pass | ||||||||||
|
|
||||||||||
| def _remove_checkpoint(self, checkpoint: Checkpoint) -> None: | ||||||||||
| """Remove a checkpoint and its associated job-specific UDF tables.""" | ||||||||||
| # Remove the checkpoint from metastore first | ||||||||||
| self.metastore.remove_checkpoint(checkpoint.id) | ||||||||||
|
|
||||||||||
| # Remove job-specific tables for this checkpoint | ||||||||||
| # Table patterns: udf_{job_id}_{hash}_{suffix} | ||||||||||
| # where suffix can be: input, output, output_partial, processed | ||||||||||
| job_id_sanitized = checkpoint.job_id.replace("-", "") | ||||||||||
| table_prefix = f"udf_{job_id_sanitized}_{checkpoint.hash}_" | ||||||||||
| matching_tables = self.warehouse.db.list_tables(prefix=table_prefix) | ||||||||||
|
|
||||||||||
| if matching_tables: | ||||||||||
|
Comment on lines
+2058
to
+2060
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggestion (code-quality): Use named expression to simplify assignment and conditional (
Suggested change
|
||||||||||
| self.warehouse.cleanup_tables(matching_tables) | ||||||||||
|
|
||||||||||
| def cleanup_checkpoints(self, ttl_seconds: int | None = None) -> int: | ||||||||||
| """ | ||||||||||
| Clean up outdated checkpoints and their associated UDF tables. | ||||||||||
|
|
||||||||||
| Removes outdated checkpoints only if no jobs in the same run group have | ||||||||||
| active (non-outdated) checkpoints. | ||||||||||
|
|
||||||||||
| This prevents accumulation of checkpoints while ensuring that tables | ||||||||||
| are preserved when any related job still needs them. | ||||||||||
|
|
||||||||||
| Args: | ||||||||||
| ttl_seconds: Time-to-live in seconds. Checkpoints older than this | ||||||||||
| are considered outdated. If None, uses CHECKPOINT_TTL | ||||||||||
| environment variable or default. | ||||||||||
|
|
||||||||||
| Returns: | ||||||||||
| Number of checkpoints removed. | ||||||||||
| """ | ||||||||||
| if ttl_seconds is None: | ||||||||||
| ttl_seconds = int(os.environ.get("CHECKPOINT_TTL", str(TTL_INT))) | ||||||||||
|
|
||||||||||
| ttl_threshold = datetime.now(timezone.utc) - timedelta(seconds=ttl_seconds) | ||||||||||
|
|
||||||||||
| outdated_checkpoints = list( | ||||||||||
| self.metastore.list_checkpoints(created_before=ttl_threshold) | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| if not outdated_checkpoints: | ||||||||||
| return 0 | ||||||||||
|
|
||||||||||
| job_ids = list({ch.job_id for ch in outdated_checkpoints}) | ||||||||||
| jobs = {job.id: job for job in self.metastore.list_jobs_by_ids(job_ids)} | ||||||||||
|
|
||||||||||
| # Cache per run_group_id to avoid redundant checks | ||||||||||
| has_active_in_group_cache: dict[str, bool] = {} | ||||||||||
|
|
||||||||||
| removed_count = 0 | ||||||||||
| for ch in outdated_checkpoints: | ||||||||||
| job = jobs.get(ch.job_id) | ||||||||||
|
|
||||||||||
| if not job or not job.run_group_id: | ||||||||||
| self._remove_checkpoint(ch) | ||||||||||
| removed_count += 1 | ||||||||||
| continue | ||||||||||
|
|
||||||||||
| if job.run_group_id not in has_active_in_group_cache: | ||||||||||
| has_active_in_group_cache[job.run_group_id] = ( | ||||||||||
| self.metastore.has_active_checkpoints_in_run_group( | ||||||||||
| job.run_group_id, ttl_threshold | ||||||||||
| ) | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| if not has_active_in_group_cache[job.run_group_id]: | ||||||||||
| self._remove_checkpoint(ch) | ||||||||||
| removed_count += 1 | ||||||||||
|
|
||||||||||
| return removed_count | ||||||||||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: Sanitizing job_id by removing hyphens may not be sufficient for table name safety.
Consider validating job_id to exclude all invalid characters and reserved words to ensure table names remain safe and compliant.
Suggested implementation: