2 changes: 2 additions & 0 deletions backend/backend/settings/base.py
@@ -200,6 +200,8 @@ def get_required_setting(setting_key: str, default: str | None = None) -> str |
MAX_PARALLEL_FILE_BATCHES_MAX_VALUE = int(
os.environ.get("MAX_PARALLEL_FILE_BATCHES_MAX_VALUE", 100)
)
# Maximum number of times a file can be executed in a workflow
MAX_FILE_EXECUTION_COUNT = int(os.environ.get("MAX_FILE_EXECUTION_COUNT", 3))

CELERY_RESULT_CHORD_RETRY_INTERVAL = float(
os.environ.get("CELERY_RESULT_CHORD_RETRY_INTERVAL", "3")
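Note: the body of `Workflow.get_max_execution_count()`, which `internal_views.py` calls further down in this diff, is not among the visible hunks. A minimal sketch of how that lookup could fall back from the per-workflow override (added by the migration at the end of this PR) to this global setting; the function below is an assumption for illustration, not the PR's code:

```python
from django.conf import settings


def resolve_max_execution_count(workflow) -> int:
    """Illustrative only: per-workflow override wins, else the global default."""
    if workflow.max_file_execution_count is not None:
        return workflow.max_file_execution_count
    return settings.MAX_FILE_EXECUTION_COUNT
```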
4 changes: 4 additions & 0 deletions backend/sample.env
@@ -217,6 +217,10 @@ FILE_EXECUTION_TRACKER_TTL_IN_SECOND=18000
# File execution tracker completed TTL in seconds (10 minutes)
FILE_EXECUTION_TRACKER_COMPLETED_TTL_IN_SECOND=600

# Maximum number of times a file can be executed in ETL/TASK workflows
# Default: 3 (file is permanently skipped after 3 execution attempts)
MAX_FILE_EXECUTION_COUNT=3

# Runner polling timeout (3 hours)
MAX_RUNNER_POLLING_WAIT_SECONDS=10800
# Runner polling interval (2 seconds)
24 changes: 21 additions & 3 deletions backend/workflow_manager/internal_views.py
@@ -502,6 +502,7 @@ def update_status(self, request, id=None):
from workflow_manager.workflow_v2.enums import ExecutionStatus

status_enum = ExecutionStatus(validated_data["status"])
logger.info(f"Updating status for execution {id} to {status_enum}")
execution.update_execution(
status=status_enum,
error=error_message,
@@ -2305,6 +2306,9 @@ def post(self, request):
file_history_queryset, request, "workflow__organization"
)

# Get max execution count for this workflow (for worker decision making)
max_execution_count = workflow.get_max_execution_count()

# Get full file history details for cached results
file_histories = file_history_queryset.values(
"cache_key",
@@ -2313,6 +2317,8 @@
"error",
"file_path",
"provider_file_uuid",
"execution_count",
"status",
)

# Build response with both processed hashes (for compatibility) and full details
@@ -2328,16 +2334,21 @@
"error": fh["error"],
"file_path": fh["file_path"],
"provider_file_uuid": fh["provider_file_uuid"],
"execution_count": fh["execution_count"],
"status": fh["status"],
"max_execution_count": max_execution_count, # Include for worker logic
}

logger.info(
f"File history batch check: {len(processed_file_hashes)}/{len(file_hashes)} files already processed"
f"File history batch check: {len(processed_file_hashes)}/{len(file_hashes)} files already processed "
f"(max_execution_count: {max_execution_count})"
)

return Response(
{
"processed_file_hashes": processed_file_hashes, # For backward compatibility
"file_history_details": file_history_details, # Full details for cached results
"max_execution_count": max_execution_count, # Global max for this workflow
}
)

@@ -2470,10 +2481,17 @@ def post(self, request):
)

logger.info(
f"Created file history entry {file_history.id} for file {file_name}"
f"Created/updated file history entry {file_history.id} for file {file_name} "
f"(execution_count: {file_history.execution_count})"
)

return Response({"created": True, "file_history_id": str(file_history.id)})
return Response(
{
"created": True,
"file_history_id": str(file_history.id),
"execution_count": file_history.execution_count,
}
)

except Exception as e:
logger.error(f"File history creation failed: {str(e)}")
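The worker that consumes this batch-check endpoint is outside the visible diff. The sketch below shows how a caller might combine `execution_count`, `status`, and `max_execution_count` from the response to decide whether to skip a file; the helper name, the "COMPLETED" status value, the assumption that `file_history_details` is keyed by file hash, and the skip rule (skip once the count reaches the maximum, per the sample.env comment) are all assumptions, not worker code from this PR:

```python
# Illustrative consumer of the batch file-history check response shown above.
# Only the response field names come from the endpoint; everything else here
# (function name, status value, keying of file_history_details) is assumed.

def should_skip_file(file_hash: str, response: dict) -> bool:
    details = response.get("file_history_details", {}).get(file_hash)
    if details is None:
        return False  # no history yet: process the file
    if details["status"] == "COMPLETED":
        return True  # already processed successfully: reuse cached result
    max_count = details.get("max_execution_count") or response["max_execution_count"]
    # Per the sample.env comment, a file is permanently skipped once it has
    # been attempted max_count times.
    return details["execution_count"] >= max_count
```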
65 changes: 52 additions & 13 deletions backend/workflow_manager/workflow_v2/file_history_helper.py
@@ -3,7 +3,7 @@
from typing import Any

from django.conf import settings
from django.db.models import Q
from django.db.models import F, Q
from django.db.utils import IntegrityError
from django.utils import timezone
from utils.cache_service import CacheService
@@ -239,7 +239,7 @@
return None

[SonarCloud Code Analysis check failure on line 242: Refactor this function to reduce its Cognitive Complexity from 23 to the 15 allowed. See https://sonarcloud.io/project/issues?id=Zipstack_unstract&issues=AZq0muc9l4UGbU48qy8K&open=AZq0muc9l4UGbU48qy8K&pullRequest=1676]

@staticmethod
def create_file_history(
file_hash: FileHash,
workflow: Workflow,
status: ExecutionStatus,
@@ -248,7 +248,7 @@
error: str | None = None,
is_api: bool = False,
) -> FileHistory:
"""Create a new file history record or return existing one.
"""Create a new file history record or increment existing one's execution count.

This method implements execution count tracking:
- If file history exists: increments execution_count atomically
- If file history is new: creates with execution_count=1

Args:
file_hash (FileHash): The file hash for the file.
@@ -260,11 +264,38 @@
is_api (bool): Whether this is for API workflow (affects file_path handling).

Returns:
FileHistory: Either newly created or existing file history record.
FileHistory: Either newly created or updated file history record.
"""
file_path = file_hash.file_path if not is_api else None

# Prepare data for creation
# Check if file history already exists
existing_history = FileHistoryHelper.get_file_history(
workflow=workflow,
cache_key=file_hash.file_hash,
provider_file_uuid=file_hash.provider_file_uuid,
file_path=file_path,
)

if existing_history:
# File history exists - increment execution count atomically
FileHistory.objects.filter(id=existing_history.id).update(
execution_count=F("execution_count") + 1,
status=status,
result=str(result),
metadata=str(metadata) if metadata else "",
error=str(error) if error else "",
)
# Refresh to get updated values
existing_history.refresh_from_db()
logger.info(
f"Updated FileHistory record (execution_count: {existing_history.execution_count}) - "
f"file_name='{file_hash.file_name}', file_path='{file_hash.file_path}', "
f"file_hash='{file_hash.file_hash[:16] if file_hash.file_hash else 'None'}', "
f"workflow={workflow}"
)
return existing_history

# File history doesn't exist - create new record with execution_count=1
create_data = {
"workflow": workflow,
"cache_key": file_hash.file_hash,
@@ -274,30 +305,29 @@
"metadata": str(metadata) if metadata else "",
"error": str(error) if error else "",
"file_path": file_path,
"execution_count": 1,
}

try:
# Try to create the file history record
file_history = FileHistory.objects.create(**create_data)
logger.info(
f"Created new FileHistory record - "
f"Created new FileHistory record (execution_count: 1) - "
f"file_name='{file_hash.file_name}', file_path='{file_hash.file_path}', "
f"file_hash='{file_hash.file_hash[:16] if file_hash.file_hash else 'None'}', "
f"workflow={workflow}"
)
return file_history

except IntegrityError as e:
# Race condition detected - another worker created the record
# Try to retrieve the existing record
# Race condition: another worker created the record between our check and create
logger.info(
f"FileHistory constraint violation (expected in concurrent environment) - "
f"FileHistory constraint violation (race condition) - "
f"file_name='{file_hash.file_name}', file_path='{file_hash.file_path}', "
f"file_hash='{file_hash.file_hash[:16] if file_hash.file_hash else 'None'}', "
f"workflow={workflow}. Error: {str(e)}"
)

# Use the existing get_file_history method to retrieve the record
# Retrieve the record created by another worker and increment it
existing_record = FileHistoryHelper.get_file_history(
workflow=workflow,
cache_key=file_hash.file_hash,
@@ -306,18 +336,27 @@
)

if existing_record:
# Increment the existing record
FileHistory.objects.filter(id=existing_record.id).update(
execution_count=F("execution_count") + 1,
status=status,
result=str(result),
metadata=str(metadata) if metadata else "",
error=str(error) if error else "",
)
existing_record.refresh_from_db()
logger.info(
f"Retrieved existing FileHistory record after constraint violation - "
f"Retrieved and updated existing FileHistory record (execution_count: {existing_record.execution_count}) - "
f"ID: {existing_record.id}, workflow={workflow}"
)
return existing_record
else:
# This should rarely happen, but if we can't find the existing record,
# log the issue and re-raise the original exception
# This should rarely happen
logger.error(
f"Failed to retrieve existing FileHistory record after constraint violation - "
f"file_name='{file_hash.file_name}', workflow={workflow}"
)
raise

@staticmethod
def clear_history_for_workflow(
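One design note on the increment above: updating through `F("execution_count") + 1` makes the database perform the addition inside a single UPDATE, so two workers finishing the same file at the same time cannot silently overwrite each other's count. A small side-by-side sketch of the hazard and the pattern the helper uses (the function names and ID variable are illustrative, not part of the PR):

```python
from django.db.models import F

from workflow_manager.workflow_v2.models.file_history import FileHistory


def increment_naive(file_history_id) -> None:
    # Lost-update hazard: two workers can both read N and both write N + 1.
    fh = FileHistory.objects.get(id=file_history_id)
    fh.execution_count += 1
    fh.save(update_fields=["execution_count"])


def increment_atomic(file_history_id) -> None:
    # Pattern used by create_file_history() above: the database evaluates
    # execution_count = execution_count + 1 in one UPDATE, so concurrent
    # increments serialize instead of clobbering each other.
    FileHistory.objects.filter(id=file_history_id).update(
        execution_count=F("execution_count") + 1
    )
```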
135 changes: 135 additions & 0 deletions backend/workflow_manager/workflow_v2/file_history_views.py
@@ -0,0 +1,135 @@
import logging

from django.conf import settings
from django.shortcuts import get_object_or_404
from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from utils.pagination import CustomPagination

from workflow_manager.workflow_v2.models.file_history import FileHistory
from workflow_manager.workflow_v2.models.workflow import Workflow
from workflow_manager.workflow_v2.permissions import IsWorkflowOwnerOrShared
from workflow_manager.workflow_v2.serializers import FileHistorySerializer

logger = logging.getLogger(__name__)


class FileHistoryViewSet(viewsets.ReadOnlyModelViewSet):
"""ViewSet for file history operations with filtering support."""

serializer_class = FileHistorySerializer
lookup_field = "id"
permission_classes = [IsAuthenticated, IsWorkflowOwnerOrShared]
pagination_class = CustomPagination

def get_queryset(self):
"""Get file histories for workflow with filters."""
# Reuse cached workflow from permission check
if hasattr(self.request, "_workflow_cache"):
workflow = self.request._workflow_cache
else:
# Fallback if permission didn't run (shouldn't happen)
workflow_id = self.kwargs.get("workflow_id")
workflow = get_object_or_404(Workflow, id=workflow_id)

queryset = FileHistory.objects.filter(workflow=workflow)

# Apply simple filters from query parameters
status_param = self.request.query_params.get("status")
if status_param:
status_list = [s.strip() for s in status_param.split(",")]
queryset = queryset.filter(status__in=status_list)

exec_min = self.request.query_params.get("execution_count_min")
if exec_min:
queryset = queryset.filter(execution_count__gte=int(exec_min))

exec_max = self.request.query_params.get("execution_count_max")
if exec_max:
queryset = queryset.filter(execution_count__lte=int(exec_max))

return queryset.order_by("-created_at")

def destroy(self, request, workflow_id=None, id=None):
"""Delete single file history by ID."""
file_history = self.get_object()
file_history_id = file_history.id

# Clear Redis cache for this file before deletion
self._clear_cache_for_file(workflow_id, file_history)

file_history.delete()

logger.info(f"Deleted file history {file_history_id} for workflow {workflow_id}")

return Response(
{"message": "File history deleted successfully"}, status=status.HTTP_200_OK
)

@action(detail=False, methods=["post"])
def clear(self, request, workflow_id=None):
"""Clear file histories with filters (direct delete)."""
workflow = get_object_or_404(Workflow, id=workflow_id)
queryset = FileHistory.objects.filter(workflow=workflow)

# Apply filters from request body
status_list = request.data.get("status", [])
if status_list:
queryset = queryset.filter(status__in=status_list)

exec_min = request.data.get("execution_count_min")
if exec_min:
queryset = queryset.filter(execution_count__gte=exec_min)

exec_max = request.data.get("execution_count_max")
if exec_max:
queryset = queryset.filter(execution_count__lte=exec_max)

# Delete directly (no dry run complexity)
deleted_count, _ = queryset.delete()

# Clear Redis cache pattern for workflow
self._clear_workflow_cache(workflow_id)

logger.info(
f"Cleared {deleted_count} file history records for workflow {workflow_id}"
)

return Response(
{
"deleted_count": deleted_count,
"message": f"{deleted_count} file history records deleted",
}
)

def _clear_cache_for_file(self, workflow_id, file_history):
"""Clear Redis cache for specific file."""
try:
from workflow_manager.workflow_v2.execution.active_file_manager import (
ActiveFileManager,
)

cache_key = ActiveFileManager._create_cache_key(
workflow_id, file_history.provider_file_uuid, file_history.file_path
)
DB = settings.FILE_ACTIVE_CACHE_REDIS_DB
from utils.cache import CacheService

CacheService.delete_key(cache_key, db=DB)
logger.debug(f"Cleared cache for file: {cache_key}")
except Exception as e:
logger.warning(f"Failed to clear cache for file: {e}")

def _clear_workflow_cache(self, workflow_id):
"""Clear all Redis cache for workflow."""
try:
pattern = f"file_active:{workflow_id}:*"
DB = settings.FILE_ACTIVE_CACHE_REDIS_DB
from utils.cache import CacheService

CacheService.clear_cache_optimized(pattern, db=DB)
logger.debug(f"Cleared cache pattern: {pattern}")
except Exception as e:
logger.warning(f"Failed to clear workflow cache: {e}")
@@ -0,0 +1,42 @@
# Generated by Django 4.2.1 on 2025-11-21 12:18

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("workflow_v2", "0017_workflow_shared_to_org_workflow_shared_users"),
]

operations = [
migrations.AddField(
model_name="filehistory",
name="execution_count",
field=models.IntegerField(
db_comment="Number of times this file has been processed", default=1
),
),
migrations.AddField(
model_name="workflow",
name="max_file_execution_count",
field=models.IntegerField(
blank=True,
db_comment="Maximum times a file can be executed. null=use org/global default. Only enforced for ETL/TASK workflows.",
null=True,
),
),
migrations.AddIndex(
model_name="filehistory",
index=models.Index(
fields=["workflow", "status"],
name="idx_fh_wf_status",
),
),
migrations.AddIndex(
model_name="filehistory",
index=models.Index(
fields=["workflow", "execution_count"],
name="idx_fh_wf_exec_cnt",
),
),
]
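The models.py changes that this migration mirrors are not among the visible hunks; based on the operations above, the added fields and indexes would look roughly like the sketch below (unrelated fields omitted, placement assumed):

```python
# Sketch inferred from the migration operations above; treat the surrounding
# model code as an assumption since models.py is not shown in this diff.
from django.db import models


class FileHistory(models.Model):
    execution_count = models.IntegerField(
        default=1,
        db_comment="Number of times this file has been processed",
    )

    class Meta:
        indexes = [
            models.Index(fields=["workflow", "status"], name="idx_fh_wf_status"),
            models.Index(
                fields=["workflow", "execution_count"], name="idx_fh_wf_exec_cnt"
            ),
        ]


class Workflow(models.Model):
    max_file_execution_count = models.IntegerField(
        null=True,
        blank=True,
        db_comment=(
            "Maximum times a file can be executed. null=use org/global default. "
            "Only enforced for ETL/TASK workflows."
        ),
    )
```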