2 changes: 2 additions & 0 deletions backend/backend/settings/base.py
@@ -200,6 +200,8 @@ def get_required_setting(setting_key: str, default: str | None = None) -> str |
MAX_PARALLEL_FILE_BATCHES_MAX_VALUE = int(
os.environ.get("MAX_PARALLEL_FILE_BATCHES_MAX_VALUE", 100)
)
# Maximum number of times a file can be executed in a workflow
MAX_FILE_EXECUTION_COUNT = int(os.environ.get("MAX_FILE_EXECUTION_COUNT", 3))

CELERY_RESULT_CHORD_RETRY_INTERVAL = float(
os.environ.get("CELERY_RESULT_CHORD_RETRY_INTERVAL", "3")
4 changes: 4 additions & 0 deletions backend/sample.env
@@ -217,6 +217,10 @@ FILE_EXECUTION_TRACKER_TTL_IN_SECOND=18000
# File execution tracker completed TTL in seconds (10 minutes)
FILE_EXECUTION_TRACKER_COMPLETED_TTL_IN_SECOND=600

# Maximum number of times a file can be executed in ETL/TASK workflows
# Default: 3 (file is permanently skipped after 3 execution attempts)
MAX_FILE_EXECUTION_COUNT=3

# Runner polling timeout (3 hours)
MAX_RUNNER_POLLING_WAIT_SECONDS=10800
# Runner polling interval (2 seconds)
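
A minimal sketch of the skip rule this setting encodes, assuming a hypothetical helper name; the real enforcement happens in the worker and file-history code changed below:

from django.conf import settings

def has_exhausted_attempts(execution_count: int) -> bool:
    """Hypothetical helper: True once a file has been attempted
    MAX_FILE_EXECUTION_COUNT times (default 3), after which it is
    permanently skipped for the workflow."""
    return execution_count >= settings.MAX_FILE_EXECUTION_COUNT
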
24 changes: 21 additions & 3 deletions backend/workflow_manager/internal_views.py
@@ -502,6 +502,7 @@ def update_status(self, request, id=None):
from workflow_manager.workflow_v2.enums import ExecutionStatus

status_enum = ExecutionStatus(validated_data["status"])
logger.info(f"Updating status for execution {id} to {status_enum}")
execution.update_execution(
status=status_enum,
error=error_message,
@@ -2305,6 +2306,9 @@ def post(self, request):
file_history_queryset, request, "workflow__organization"
)

# Get max execution count for this workflow (for worker decision making)
max_execution_count = workflow.get_max_execution_count()

# Get full file history details for cached results
file_histories = file_history_queryset.values(
"cache_key",
@@ -2313,6 +2317,8 @@
"error",
"file_path",
"provider_file_uuid",
"execution_count",
"status",
)

# Build response with both processed hashes (for compatibility) and full details
@@ -2328,16 +2334,21 @@
"error": fh["error"],
"file_path": fh["file_path"],
"provider_file_uuid": fh["provider_file_uuid"],
"execution_count": fh["execution_count"],
"status": fh["status"],
"max_execution_count": max_execution_count, # Include for worker logic
}

logger.info(
f"File history batch check: {len(processed_file_hashes)}/{len(file_hashes)} files already processed"
f"File history batch check: {len(processed_file_hashes)}/{len(file_hashes)} files already processed "
f"(max_execution_count: {max_execution_count})"
)

return Response(
{
"processed_file_hashes": processed_file_hashes, # For backward compatibility
"file_history_details": file_history_details, # Full details for cached results
"max_execution_count": max_execution_count, # Global max for this workflow
}
)

@@ -2470,10 +2481,17 @@ def post(self, request):
)

logger.info(
f"Created file history entry {file_history.id} for file {file_name}"
f"Created/updated file history entry {file_history.id} for file {file_name} "
f"(execution_count: {file_history.execution_count})"
)

return Response({"created": True, "file_history_id": str(file_history.id)})
return Response(
{
"created": True,
"file_history_id": str(file_history.id),
"execution_count": file_history.execution_count,
}
)

except Exception as e:
logger.error(f"File history creation failed: {str(e)}")
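
With execution_count, status, and max_execution_count now included in the batch-check response, a worker can decide per file whether to reuse a cached result, retry, or skip permanently. A sketch of that consumer-side decision; the status strings, the keying of file_history_details by file hash, and the payload values are illustrative assumptions, not defined by this PR:

def decide_action(details: dict | None, max_execution_count: int) -> str:
    """Illustrative worker-side decision for one file_history_details entry."""
    if details is None:
        return "process"  # no history yet: execute the file
    if details["status"] == "COMPLETED":
        return "use_cached_result"  # reuse the stored result
    if details["execution_count"] >= max_execution_count:
        return "skip_permanently"  # attempts exhausted (default 3)
    return "retry"  # previous attempt failed but the limit is not yet reached

# Example payload shape (values illustrative), as returned by the view above:
response = {
    "processed_file_hashes": ["abc123"],
    "file_history_details": {
        "abc123": {"status": "ERROR", "execution_count": 3, "max_execution_count": 3},
    },
    "max_execution_count": 3,
}
action = decide_action(
    response["file_history_details"].get("abc123"),
    response["max_execution_count"],
)  # -> "skip_permanently"
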
133 changes: 108 additions & 25 deletions backend/workflow_manager/workflow_v2/file_history_helper.py
@@ -3,7 +3,7 @@
from typing import Any

from django.conf import settings
from django.db.models import Q
from django.db.models import F, Q
from django.db.utils import IntegrityError
from django.utils import timezone
from utils.cache_service import CacheService
@@ -238,6 +238,61 @@ def _get_reprocessing_interval_from_config(
workflow_log.log_error(logger=logger, message=error_msg)
return None

@staticmethod
def _safe_str(value: Any) -> str:
"""Convert value to string, return empty string if None.

Args:
value: Value to convert

Returns:
str: String representation or empty string
"""
return "" if value is None else str(value)

@staticmethod
def _truncate_hash(file_hash: str | None) -> str:
"""Truncate hash for logging purposes.

Args:
file_hash: Hash string to truncate

Returns:
str: Truncated hash (first 16 chars) or 'None' if missing
"""
return file_hash[:16] if file_hash else "None"

@staticmethod
def _increment_file_history(
file_history: FileHistory,
status: ExecutionStatus,
result: Any,
metadata: str | None,
error: str | None,
) -> FileHistory:
"""Update existing file history with incremented execution count.

Args:
file_history: FileHistory instance to update
status: New execution status
result: Execution result
metadata: Execution metadata
error: Error message if any

Returns:
FileHistory: Updated file history instance
"""
FileHistory.objects.filter(id=file_history.id).update(
execution_count=F("execution_count") + 1,
status=status,
result=str(result),
metadata=FileHistoryHelper._safe_str(metadata),
error=FileHistoryHelper._safe_str(error),
)
# Refresh from DB to get updated values
file_history.refresh_from_db()
return file_history
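# Illustration (not part of this change): the F() expression above is what makes
# the increment atomic. A naive read-modify-write is race-prone - two workers can
# both read execution_count=1 and both save 2, losing one attempt:
#
#     fh = FileHistory.objects.get(id=file_history.id)
#     fh.execution_count += 1
#     fh.save(update_fields=["execution_count"])
#
# With execution_count=F("execution_count") + 1 the arithmetic runs in the
# database (UPDATE ... SET execution_count = execution_count + 1), so concurrent
# increments from parallel file batches are all counted.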

@staticmethod
def create_file_history(
file_hash: FileHash,
@@ -248,7 +303,11 @@ def create_file_history(
error: str | None = None,
is_api: bool = False,
) -> FileHistory:
"""Create a new file history record or return existing one.
"""Create a new file history record or increment existing one's execution count.

This method implements execution count tracking:
- If file history exists: increments execution_count atomically
- If file history is new: creates with execution_count=1

Args:
file_hash (FileHash): The file hash for the file.
@@ -260,44 +319,64 @@
is_api (bool): Whether this is for API workflow (affects file_path handling).

Returns:
FileHistory: Either newly created or existing file history record.
FileHistory: Either newly created or updated file history record.
"""
file_path = file_hash.file_path if not is_api else None

# Prepare data for creation
# Check if file history already exists
existing_history = FileHistoryHelper.get_file_history(
workflow=workflow,
cache_key=file_hash.file_hash,
provider_file_uuid=file_hash.provider_file_uuid,
file_path=file_path,
)

if existing_history:
# File history exists - increment execution count atomically
updated_history = FileHistoryHelper._increment_file_history(
existing_history, status, result, metadata, error
)
logger.info(
f"Updated FileHistory record (execution_count: {updated_history.execution_count}) - "
f"file_name='{file_hash.file_name}', file_path='{file_hash.file_path}', "
f"file_hash='{FileHistoryHelper._truncate_hash(file_hash.file_hash)}', "
f"workflow={workflow}"
)
return updated_history

# File history doesn't exist - create new record with execution_count=1
create_data = {
"workflow": workflow,
"cache_key": file_hash.file_hash,
"provider_file_uuid": file_hash.provider_file_uuid,
"status": status,
"result": str(result),
"metadata": str(metadata) if metadata else "",
"error": str(error) if error else "",
"metadata": FileHistoryHelper._safe_str(metadata),
"error": FileHistoryHelper._safe_str(error),
"file_path": file_path,
"execution_count": 1,
}

try:
# Try to create the file history record
file_history = FileHistory.objects.create(**create_data)
logger.info(
f"Created new FileHistory record - "
f"Created new FileHistory record (execution_count: 1) - "
f"file_name='{file_hash.file_name}', file_path='{file_hash.file_path}', "
f"file_hash='{file_hash.file_hash[:16] if file_hash.file_hash else 'None'}', "
f"file_hash='{FileHistoryHelper._truncate_hash(file_hash.file_hash)}', "
f"workflow={workflow}"
)
return file_history

except IntegrityError as e:
# Race condition detected - another worker created the record
# Try to retrieve the existing record
# Race condition: another worker created the record between our check and create
logger.info(
f"FileHistory constraint violation (expected in concurrent environment) - "
f"FileHistory constraint violation (race condition) - "
f"file_name='{file_hash.file_name}', file_path='{file_hash.file_path}', "
f"file_hash='{file_hash.file_hash[:16] if file_hash.file_hash else 'None'}', "
f"workflow={workflow}. Error: {str(e)}"
f"file_hash='{FileHistoryHelper._truncate_hash(file_hash.file_hash)}', "
f"workflow={workflow}. Error: {e!s}"
)

# Use the existing get_file_history method to retrieve the record
# Retrieve the record created by another worker and increment it
existing_record = FileHistoryHelper.get_file_history(
workflow=workflow,
cache_key=file_hash.file_hash,
@@ -306,18 +385,22 @@
)

if existing_record:
logger.info(
f"Retrieved existing FileHistory record after constraint violation - "
f"ID: {existing_record.id}, workflow={workflow}"
# Increment the existing record
updated_record = FileHistoryHelper._increment_file_history(
existing_record, status, result, metadata, error
)
return existing_record
else:
# This should rarely happen, but if we can't find the existing record,
# log the issue and re-raise the original exception
logger.error(
f"Failed to retrieve existing FileHistory record after constraint violation - "
f"file_name='{file_hash.file_name}', workflow={workflow}"
logger.info(
f"Retrieved and updated existing FileHistory record (execution_count: {updated_record.execution_count}) - "
f"ID: {updated_record.id}, workflow={workflow}"
)
return updated_record

# This should rarely happen - existing record not found after IntegrityError
logger.exception(
f"Failed to retrieve existing FileHistory record after constraint violation - "
f"file_name='{file_hash.file_name}', workflow={workflow}"
)
raise

@staticmethod
def clear_history_for_workflow(
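
A hedged usage sketch of the create-or-increment behaviour described in the docstring above; the fixture objects, the keyword arguments beyond those visible in this diff, and the status value are assumptions:

# First attempt for a file: a new row is created with execution_count=1.
history = FileHistoryHelper.create_file_history(
    file_hash=file_hash,            # FileHash for the incoming file (assumed fixture)
    workflow=workflow,              # Workflow instance (assumed fixture)
    status=ExecutionStatus.ERROR,   # illustrative: the first attempt failed
    result="",
)
assert history.execution_count == 1

# Second attempt for the same file and workflow: no new row is inserted; the
# existing one is updated through the atomic F()-based increment, and the worker
# can compare the counter against MAX_FILE_EXECUTION_COUNT to skip the file for good.
history = FileHistoryHelper.create_file_history(
    file_hash=file_hash,
    workflow=workflow,
    status=ExecutionStatus.ERROR,
    result="",
)
assert history.execution_count == 2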