50 changes: 43 additions & 7 deletions src/datachain/data_storage/metastore.py
@@ -466,6 +466,8 @@ def create_job(
python_version: str | None = None,
params: dict[str, str] | None = None,
parent_job_id: str | None = None,
rerun_from_job_id: str | None = None,
run_group_id: str | None = None,
) -> str:
"""
Creates a new job.
@@ -1835,7 +1837,11 @@ def _jobs_columns() -> "list[SchemaItem]":
Column("params", JSON, nullable=False),
Column("metrics", JSON, nullable=False),
Column("parent_job_id", Text, nullable=True),
Column("rerun_from_job_id", Text, nullable=True),
Column("run_group_id", Text, nullable=True),
Index("idx_jobs_parent_job_id", "parent_job_id"),
Index("idx_jobs_rerun_from_job_id", "rerun_from_job_id"),
Index("idx_jobs_run_group_id", "run_group_id"),
]

@cached_property
@@ -1896,13 +1902,29 @@ def create_job(
python_version: str | None = None,
params: dict[str, str] | None = None,
parent_job_id: str | None = None,
rerun_from_job_id: str | None = None,
run_group_id: str | None = None,
conn: Any = None,
) -> str:
"""
Creates a new job.
Returns the job id.
"""
Comment on lines 1909 to 1912

Copilot AI Dec 18, 2025

The docstring should be expanded to document the new parameters rerun_from_job_id and run_group_id, especially their relationship and the validation requirements. Consider adding documentation that explains:

  • rerun_from_job_id: The ID of the job that this job is rerunning from (None for initial jobs)
  • run_group_id: The ID that groups all jobs in a rerun chain (automatically set to the job's own ID for initial jobs, must be provided when rerun_from_job_id is set)
  • The validation requirement that both must be provided together or neither
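
For illustration, a hedged sketch of the kind of docstring the comment asks for (the trimmed signature and wording here are illustrative, not from the PR):

def create_job(
    name: str,
    query: str,
    rerun_from_job_id: str | None = None,
    run_group_id: str | None = None,
) -> str:
    """Create a new job and return its id.

    Args:
        rerun_from_job_id: ID of the job this job is rerunning
            (None for the first job in a chain).
        run_group_id: ID shared by every job in a rerun chain. Must be
            provided together with rerun_from_job_id; for the first job
            it must be omitted and is set to the new job's own id.
    """
    ...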

job_id = str(uuid4())

# Validate run_group_id and rerun_from_job_id consistency
if rerun_from_job_id:
# Rerun job: run_group_id must be provided by caller
assert run_group_id is not None, (
"run_group_id must be provided when rerun_from_job_id is set"
)
else:
# First job: run_group_id should not be provided (we set it here)
assert run_group_id is None, (
"run_group_id should not be provided when rerun_from_job_id is not set"
)
Comment on lines +1918 to +1925

Copilot AI Dec 18, 2025

The validation uses assertions which will be removed in optimized Python code (when running with -O flag). For production code, consider using explicit if statements with ValueError or AssertionError exceptions instead. This ensures the validation always runs regardless of Python optimization settings, which is important for maintaining data integrity in the jobs table.

Suggested change
- assert run_group_id is not None, (
-     "run_group_id must be provided when rerun_from_job_id is set"
- )
- else:
-     # First job: run_group_id should not be provided (we set it here)
-     assert run_group_id is None, (
-         "run_group_id should not be provided when rerun_from_job_id is not set"
-     )
+ if run_group_id is None:
+     raise ValueError(
+         "run_group_id must be provided when rerun_from_job_id is set"
+     )
+ else:
+     # First job: run_group_id should not be provided (we set it here)
+     if run_group_id is not None:
+         raise ValueError(
+             "run_group_id should not be provided when rerun_from_job_id is not set"
+         )

run_group_id = job_id

self.db.execute(
self._jobs_insert().values(
id=job_id,
Expand All @@ -1918,6 +1940,8 @@ def create_job(
params=json.dumps(params or {}),
metrics=json.dumps({}),
parent_job_id=parent_job_id,
rerun_from_job_id=rerun_from_job_id,
run_group_id=run_group_id,
),
conn=conn,
)
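
For context, a minimal sketch of the calling pattern this validation implies (the positional name/query arguments mirror the tests in this PR; the script strings are made up):

# First job in a chain: omit run_group_id; create_job sets it to the new id.
first_id = metastore.create_job("my-script", "python my_script.py")
first = metastore.get_job(first_id)
assert first.run_group_id == first.id

# Rerun: pass both the job being rerun and its run group id.
rerun_id = metastore.create_job(
    "my-script",
    "python my_script.py",
    rerun_from_job_id=first.id,
    run_group_id=first.run_group_id,
)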
@@ -2191,35 +2215,47 @@ def link_dataset_version_to_job(
self.db.execute(update_query, conn=conn)

def get_ancestor_job_ids(self, job_id: str, conn=None) -> list[str]:
- # Use recursive CTE to walk up the parent chain
- # Format: WITH RECURSIVE ancestors(id, parent_job_id, depth) AS (...)
+ # Use recursive CTE to walk up the rerun chain
+ # Format: WITH RECURSIVE ancestors(id, rerun_from_job_id, run_group_id,
+ #         depth) AS (...)
# Include depth tracking to prevent infinite recursion in case of
# circular dependencies
ancestors_cte = (
self._jobs_select(
self._jobs.c.id.label("id"),
- self._jobs.c.parent_job_id.label("parent_job_id"),
+ self._jobs.c.rerun_from_job_id.label("rerun_from_job_id"),
+ self._jobs.c.run_group_id.label("run_group_id"),
literal(0).label("depth"),
)
.where(self._jobs.c.id == job_id)
.cte(name="ancestors", recursive=True)
)

# Recursive part: join with parent jobs, incrementing depth and checking limit
+ # Also ensure we only traverse jobs within the same run_group_id for safety
ancestors_recursive = ancestors_cte.union_all(
self._jobs_select(
self._jobs.c.id.label("id"),
- self._jobs.c.parent_job_id.label("parent_job_id"),
+ self._jobs.c.rerun_from_job_id.label("rerun_from_job_id"),
+ self._jobs.c.run_group_id.label("run_group_id"),
(ancestors_cte.c.depth + 1).label("depth"),
).select_from(
self._jobs.join(
ancestors_cte,
(
self._jobs.c.id
- == cast(ancestors_cte.c.parent_job_id, self._jobs.c.id.type)
+ == cast(ancestors_cte.c.rerun_from_job_id, self._jobs.c.id.type)
)
- & (ancestors_cte.c.parent_job_id.isnot(None))  # Stop at root jobs
- & (ancestors_cte.c.depth < JOB_ANCESTRY_MAX_DEPTH),
+ & (
+     ancestors_cte.c.rerun_from_job_id.isnot(None)
+ )  # Stop at root jobs
+ & (ancestors_cte.c.depth < JOB_ANCESTRY_MAX_DEPTH)
+ & (
+     self._jobs.c.run_group_id
+     == cast(
+         ancestors_cte.c.run_group_id, self._jobs.c.run_group_id.type
+     )
+ ),  # Safety: only traverse within same run group
Comment on lines +2253 to +2258

Copilot AI Dec 18, 2025

The safety check that filters by run_group_id could have issues with NULL values from legacy jobs created before this PR. In SQL, NULL == NULL evaluates to NULL (not TRUE), which means the recursive traversal might not work correctly for jobs with NULL run_group_id values.

Consider adding a check at the beginning of the function to handle the case where the starting job has a NULL run_group_id, or document that this function requires jobs to have non-NULL run_group_id values. For example, you could either:

  1. Return an empty list if the initial job's run_group_id is NULL
  2. Skip the run_group_id filter if the initial job's run_group_id is NULL (though this would reduce safety)
  3. Use a coalesce or IS NOT DISTINCT FROM comparison that handles NULLs properly
Suggested change
- & (
-     self._jobs.c.run_group_id
-     == cast(
-         ancestors_cte.c.run_group_id, self._jobs.c.run_group_id.type
-     )
- ),  # Safety: only traverse within same run group
+ & self._jobs.c.run_group_id.isnot_distinct_from(
+     cast(
+         ancestors_cte.c.run_group_id,
+         self._jobs.c.run_group_id.type,
+     )
+ ),  # Safety: only traverse within same run group (handles NULLs)

)
)
)
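
To make the traversal concrete, here is a hedged pure-Python equivalent of what the recursive CTE computes. Assumptions: the starting job is excluded from the result, 100 stands in for JOB_ANCESTRY_MAX_DEPTH, and the final SELECT is collapsed in this hunk:

def ancestor_job_ids_py(jobs, job_id, max_depth=100):
    """jobs maps id -> (rerun_from_job_id, run_group_id)."""
    ancestors = []
    rerun_from, group = jobs[job_id]
    depth = 0
    while rerun_from is not None and depth < max_depth:
        # Mirror of the SQL safety check: never leave the run group.
        if jobs[rerun_from][1] != group:
            break
        ancestors.append(rerun_from)
        rerun_from = jobs[rerun_from][0]
        depth += 1
    return ancestors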
6 changes: 6 additions & 0 deletions src/datachain/job.py
@@ -24,6 +24,8 @@ class Job:
error_message: str = ""
error_stack: str = ""
parent_job_id: str | None = None
rerun_from_job_id: str | None = None
run_group_id: str | None = None

@classmethod
def parse(
@@ -42,6 +44,8 @@ def parse(
params: str,
metrics: str,
parent_job_id: str | None,
rerun_from_job_id: str | None,
run_group_id: str | None,
) -> "Job":
return cls(
str(id),
@@ -58,4 +62,6 @@ def parse(
error_message,
error_stack,
str(parent_job_id) if parent_job_id else None,
str(rerun_from_job_id) if rerun_from_job_id else None,
str(run_group_id) if run_group_id else None,
)
4 changes: 2 additions & 2 deletions src/datachain/lib/dc/datachain.py
@@ -718,9 +718,9 @@ def _resolve_checkpoint(
_hash = self._calculate_job_hash(job.id)

if (
- job.parent_job_id
+ job.rerun_from_job_id
and not checkpoints_reset
- and metastore.find_checkpoint(job.parent_job_id, _hash)
+ and metastore.find_checkpoint(job.rerun_from_job_id, _hash)
):
# checkpoint found → find which dataset version to reuse

5 changes: 3 additions & 2 deletions src/datachain/query/session.py
@@ -154,7 +154,7 @@ def get_or_create_job(self) -> "Job":
script = str(uuid4())
python_version = f"{sys.version_info.major}.{sys.version_info.minor}"

- # try to find the parent job
+ # try to find the parent job for checkpoint/rerun chain

Copilot AI Dec 18, 2025

The comment says "try to find the parent job for checkpoint/rerun chain" but the code now uses rerun_from_job_id instead of parent_job_id. Consider updating the comment to "try to find the previous job in the checkpoint/rerun chain" to better reflect the new terminology and avoid confusion.

Suggested change
- # try to find the parent job for checkpoint/rerun chain
+ # try to find the previous job in the checkpoint/rerun chain

parent = self.catalog.metastore.get_last_job_by_name(script)

job_id = self.catalog.metastore.create_job(
@@ -163,7 +163,8 @@ def get_or_create_job(self) -> "Job":
query_type=JobQueryType.PYTHON,
status=JobStatus.RUNNING,
python_version=python_version,
- parent_job_id=parent.id if parent else None,
+ rerun_from_job_id=parent.id if parent else None,
+ run_group_id=parent.run_group_id if parent else None,
)
Session._CURRENT_JOB = self.catalog.metastore.get_job(job_id)
Session._OWNS_JOB = True
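
Read together with the validation in create_job, two consecutive runs of the same script should chain up as follows (a sketch of the expected attribute values, not code from the PR):

# Run 1: get_last_job_by_name(script) finds nothing.
#   job1.rerun_from_job_id is None; create_job sets job1.run_group_id = job1.id
# Run 2: job1 is found as the last job for the script.
#   job2.rerun_from_job_id == job1.id; job2.run_group_id == job1.run_group_id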
19 changes: 12 additions & 7 deletions tests/func/test_metastore.py
@@ -912,13 +912,14 @@ def test_get_job_status(metastore):
@pytest.mark.parametrize("depth", [0, 1, 2, 3, 5])
def test_get_ancestor_job_ids(metastore, depth):
"""Test get_ancestor_job_ids with different hierarchy depths."""
- # Create a chain of jobs with parent relationships
- # depth=0: single job with no parent
- # depth=1: job -> parent
- # depth=2: job -> parent -> grandparent
+ # Create a chain of jobs with rerun relationships
+ # depth=0: single job with no rerun ancestor
+ # depth=1: job -> rerun_from
+ # depth=2: job -> rerun_from -> rerun_from

job_ids = []
- parent_id = None
+ rerun_from_id = None
+ group_id = None

# Create jobs from root to leaf
for i in range(depth + 1):
@@ -928,10 +929,14 @@ def test_get_ancestor_job_ids(metastore, depth):
query_type=JobQueryType.PYTHON,
status=JobStatus.CREATED,
workers=1,
- parent_job_id=parent_id,
+ rerun_from_job_id=rerun_from_id,
+ run_group_id=group_id,
)
job_ids.append(job_id)
- parent_id = job_id
+ rerun_from_id = job_id
+ # First job sets the group_id
+ if group_id is None:
+     group_id = metastore.get_job(job_id).run_group_id

# The last job is the leaf (youngest)
leaf_job_id = job_ids[-1]
10 changes: 8 additions & 2 deletions tests/unit/lib/test_checkpoints.py
@@ -104,7 +104,8 @@ def test_checkpoints(
chain.save("nums2")
with pytest.raises(CustomMapperError):
chain.map(new=mapper_fail).save("nums3")
- first_job_id = test_session.get_or_create_job().id
+ first_job = test_session.get_or_create_job()
+ first_job_id = first_job.id

catalog.get_dataset("nums1")
catalog.get_dataset("nums2")
@@ -116,7 +117,12 @@ def test_checkpoints(
if use_datachain_job_id_env:
monkeypatch.setenv(
"DATACHAIN_JOB_ID",
metastore.create_job("my-job", "echo 1;", parent_job_id=first_job_id),
metastore.create_job(
"my-job",
"echo 1;",
rerun_from_job_id=first_job_id,
run_group_id=first_job.run_group_id,
),
)

chain.save("nums1")
2 changes: 1 addition & 1 deletion tests/unit/test_job_management.py
@@ -117,7 +117,7 @@ def test_get_or_create_links_to_parent(test_session, patch_argv, monkeypatch):
session2 = Session(catalog=test_session.catalog)
job2 = session2.get_or_create_job()

- assert job2.parent_job_id == job1.id
+ assert job2.rerun_from_job_id == job1.id

Copilot AI Dec 18, 2025

The test verifies that rerun_from_job_id is set correctly but doesn't verify that run_group_id is also set properly. Since these two fields must be provided together (as enforced by the validation in create_job()), the test should also assert that job2.run_group_id == job1.run_group_id (or more specifically, that job2.run_group_id == job1.id since the first job in a chain sets its own ID as the group ID).
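
A sketch of the extra assertions the comment suggests (assuming job1 is the first job in its chain, so its run_group_id equals its own id):

assert job2.run_group_id == job1.run_group_id
assert job2.run_group_id == job1.id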



def test_nested_sessions_share_same_job(test_session, patch_argv, monkeypatch):