diff --git a/backend/backend/settings/base.py b/backend/backend/settings/base.py index b6cbb2599d..94cb55b3ae 100644 --- a/backend/backend/settings/base.py +++ b/backend/backend/settings/base.py @@ -200,6 +200,8 @@ def get_required_setting(setting_key: str, default: str | None = None) -> str | MAX_PARALLEL_FILE_BATCHES_MAX_VALUE = int( os.environ.get("MAX_PARALLEL_FILE_BATCHES_MAX_VALUE", 100) ) +# Maximum number of times a file can be executed in a workflow +MAX_FILE_EXECUTION_COUNT = int(os.environ.get("MAX_FILE_EXECUTION_COUNT", 3)) CELERY_RESULT_CHORD_RETRY_INTERVAL = float( os.environ.get("CELERY_RESULT_CHORD_RETRY_INTERVAL", "3") diff --git a/backend/sample.env b/backend/sample.env index 0ed0ce9b99..bd62165546 100644 --- a/backend/sample.env +++ b/backend/sample.env @@ -217,6 +217,10 @@ FILE_EXECUTION_TRACKER_TTL_IN_SECOND=18000 # File execution tracker completed TTL in seconds (10 minutes) FILE_EXECUTION_TRACKER_COMPLETED_TTL_IN_SECOND=600 +# Maximum number of times a file can be executed in ETL/TASK workflows +# Default: 3 (file is permanently skipped after 3 execution attempts) +MAX_FILE_EXECUTION_COUNT=3 + # Runner polling timeout (3 hours) MAX_RUNNER_POLLING_WAIT_SECONDS=10800 # Runner polling interval (2 seconds) diff --git a/backend/workflow_manager/internal_views.py b/backend/workflow_manager/internal_views.py index 411604488c..94ecd7f76e 100644 --- a/backend/workflow_manager/internal_views.py +++ b/backend/workflow_manager/internal_views.py @@ -502,6 +502,7 @@ def update_status(self, request, id=None): from workflow_manager.workflow_v2.enums import ExecutionStatus status_enum = ExecutionStatus(validated_data["status"]) + logger.info(f"Updating status for execution {id} to {status_enum}") execution.update_execution( status=status_enum, error=error_message, @@ -2305,6 +2306,9 @@ def post(self, request): file_history_queryset, request, "workflow__organization" ) + # Get max execution count for this workflow (for worker decision making) + max_execution_count = workflow.get_max_execution_count() + # Get full file history details for cached results file_histories = file_history_queryset.values( "cache_key", @@ -2313,6 +2317,8 @@ def post(self, request): "error", "file_path", "provider_file_uuid", + "execution_count", + "status", ) # Build response with both processed hashes (for compatibility) and full details @@ -2328,16 +2334,21 @@ def post(self, request): "error": fh["error"], "file_path": fh["file_path"], "provider_file_uuid": fh["provider_file_uuid"], + "execution_count": fh["execution_count"], + "status": fh["status"], + "max_execution_count": max_execution_count, # Include for worker logic } logger.info( - f"File history batch check: {len(processed_file_hashes)}/{len(file_hashes)} files already processed" + f"File history batch check: {len(processed_file_hashes)}/{len(file_hashes)} files already processed " + f"(max_execution_count: {max_execution_count})" ) return Response( { "processed_file_hashes": processed_file_hashes, # For backward compatibility "file_history_details": file_history_details, # Full details for cached results + "max_execution_count": max_execution_count, # Global max for this workflow } ) @@ -2470,10 +2481,17 @@ def post(self, request): ) logger.info( - f"Created file history entry {file_history.id} for file {file_name}" + f"Created/updated file history entry {file_history.id} for file {file_name} " + f"(execution_count: {file_history.execution_count})" ) - return Response({"created": True, "file_history_id": str(file_history.id)}) + return Response( + 
{ + "created": True, + "file_history_id": str(file_history.id), + "execution_count": file_history.execution_count, + } + ) except Exception as e: logger.error(f"File history creation failed: {str(e)}") diff --git a/backend/workflow_manager/workflow_v2/file_history_helper.py b/backend/workflow_manager/workflow_v2/file_history_helper.py index 0929f1557b..687f88dfe9 100644 --- a/backend/workflow_manager/workflow_v2/file_history_helper.py +++ b/backend/workflow_manager/workflow_v2/file_history_helper.py @@ -3,7 +3,7 @@ from typing import Any from django.conf import settings -from django.db.models import Q +from django.db.models import F, Q from django.db.utils import IntegrityError from django.utils import timezone from utils.cache_service import CacheService @@ -238,6 +238,61 @@ def _get_reprocessing_interval_from_config( workflow_log.log_error(logger=logger, message=error_msg) return None + @staticmethod + def _safe_str(value: Any) -> str: + """Convert value to string, return empty string if None. + + Args: + value: Value to convert + + Returns: + str: String representation or empty string + """ + return "" if value is None else str(value) + + @staticmethod + def _truncate_hash(file_hash: str | None) -> str: + """Truncate hash for logging purposes. + + Args: + file_hash: Hash string to truncate + + Returns: + str: Truncated hash (first 16 chars) or 'None' if missing + """ + return file_hash[:16] if file_hash else "None" + + @staticmethod + def _increment_file_history( + file_history: FileHistory, + status: ExecutionStatus, + result: Any, + metadata: str | None, + error: str | None, + ) -> FileHistory: + """Update existing file history with incremented execution count. + + Args: + file_history: FileHistory instance to update + status: New execution status + result: Execution result + metadata: Execution metadata + error: Error message if any + + Returns: + FileHistory: Updated file history instance + """ + FileHistory.objects.filter(id=file_history.id).update( + execution_count=F("execution_count") + 1, + status=status, + result=str(result), + metadata=FileHistoryHelper._safe_str(metadata), + error=FileHistoryHelper._safe_str(error), + ) + # Refresh from DB to get updated values + file_history.refresh_from_db() + return file_history + @staticmethod def create_file_history( file_hash: FileHash, @@ -248,7 +303,11 @@ def create_file_history( error: str | None = None, is_api: bool = False, ) -> FileHistory: - """Create a new file history record or return existing one. + """Create a new file history record or increment existing one's execution count. + + This method implements execution count tracking: + - If file history exists: increments execution_count atomically + - If file history is new: creates with execution_count=1 Args: file_hash (FileHash): The file hash for the file. @@ -260,44 +319,64 @@ def create_file_history( is_api (bool): Whether this is for API workflow (affects file_path handling). Returns: - FileHistory: Either newly created or existing file history record. + FileHistory: Either newly created or updated file history record. 
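+
+        Note (illustrative of the behaviour below): the first call for a given
+        cache_key creates the row with execution_count=1; a later call for the
+        same file updates status/result/metadata/error and atomically bumps the
+        count via F("execution_count") + 1.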
""" file_path = file_hash.file_path if not is_api else None - # Prepare data for creation + # Check if file history already exists + existing_history = FileHistoryHelper.get_file_history( + workflow=workflow, + cache_key=file_hash.file_hash, + provider_file_uuid=file_hash.provider_file_uuid, + file_path=file_path, + ) + + if existing_history: + # File history exists - increment execution count atomically + updated_history = FileHistoryHelper._increment_file_history( + existing_history, status, result, metadata, error + ) + logger.info( + f"Updated FileHistory record (execution_count: {updated_history.execution_count}) - " + f"file_name='{file_hash.file_name}', file_path='{file_hash.file_path}', " + f"file_hash='{FileHistoryHelper._truncate_hash(file_hash.file_hash)}', " + f"workflow={workflow}" + ) + return updated_history + + # File history doesn't exist - create new record with execution_count=1 create_data = { "workflow": workflow, "cache_key": file_hash.file_hash, "provider_file_uuid": file_hash.provider_file_uuid, "status": status, "result": str(result), - "metadata": str(metadata) if metadata else "", - "error": str(error) if error else "", + "metadata": FileHistoryHelper._safe_str(metadata), + "error": FileHistoryHelper._safe_str(error), "file_path": file_path, + "execution_count": 1, } try: - # Try to create the file history record file_history = FileHistory.objects.create(**create_data) logger.info( - f"Created new FileHistory record - " + f"Created new FileHistory record (execution_count: 1) - " f"file_name='{file_hash.file_name}', file_path='{file_hash.file_path}', " - f"file_hash='{file_hash.file_hash[:16] if file_hash.file_hash else 'None'}', " + f"file_hash='{FileHistoryHelper._truncate_hash(file_hash.file_hash)}', " f"workflow={workflow}" ) return file_history except IntegrityError as e: - # Race condition detected - another worker created the record - # Try to retrieve the existing record + # Race condition: another worker created the record between our check and create logger.info( - f"FileHistory constraint violation (expected in concurrent environment) - " + f"FileHistory constraint violation (race condition) - " f"file_name='{file_hash.file_name}', file_path='{file_hash.file_path}', " - f"file_hash='{file_hash.file_hash[:16] if file_hash.file_hash else 'None'}', " - f"workflow={workflow}. Error: {str(e)}" + f"file_hash='{FileHistoryHelper._truncate_hash(file_hash.file_hash)}', " + f"workflow={workflow}. 
Error: {e!s}" ) - # Use the existing get_file_history method to retrieve the record + # Retrieve the record created by another worker and increment it existing_record = FileHistoryHelper.get_file_history( workflow=workflow, cache_key=file_hash.file_hash, @@ -306,18 +385,22 @@ def create_file_history( ) if existing_record: - logger.info( - f"Retrieved existing FileHistory record after constraint violation - " - f"ID: {existing_record.id}, workflow={workflow}" + # Increment the existing record + updated_record = FileHistoryHelper._increment_file_history( + existing_record, status, result, metadata, error ) - return existing_record - else: - # This should rarely happen, but if we can't find the existing record, - # log the issue and re-raise the original exception - logger.error( - f"Failed to retrieve existing FileHistory record after constraint violation - " - f"file_name='{file_hash.file_name}', workflow={workflow}" + logger.info( + f"Retrieved and updated existing FileHistory record (execution_count: {updated_record.execution_count}) - " + f"ID: {updated_record.id}, workflow={workflow}" ) + return updated_record + + # This should rarely happen - existing record not found after IntegrityError + logger.exception( + f"Failed to retrieve existing FileHistory record after constraint violation - " + f"file_name='{file_hash.file_name}', workflow={workflow}" + ) + raise @staticmethod def clear_history_for_workflow( diff --git a/backend/workflow_manager/workflow_v2/file_history_views.py b/backend/workflow_manager/workflow_v2/file_history_views.py new file mode 100644 index 0000000000..316cec5937 --- /dev/null +++ b/backend/workflow_manager/workflow_v2/file_history_views.py @@ -0,0 +1,178 @@ +import logging + +from django.shortcuts import get_object_or_404 +from rest_framework import status, viewsets +from rest_framework.decorators import action +from rest_framework.exceptions import ValidationError +from rest_framework.permissions import IsAuthenticated +from rest_framework.response import Response +from utils.pagination import CustomPagination + +from workflow_manager.workflow_v2.models.file_history import FileHistory +from workflow_manager.workflow_v2.models.workflow import Workflow +from workflow_manager.workflow_v2.permissions import IsWorkflowOwnerOrShared +from workflow_manager.workflow_v2.serializers import FileHistorySerializer + +logger = logging.getLogger(__name__) + +MAX_BULK_DELETE_LIMIT = 100 + + +class FileHistoryViewSet(viewsets.ReadOnlyModelViewSet): + """ViewSet for file history operations with filtering support.""" + + serializer_class = FileHistorySerializer + lookup_field = "id" + permission_classes = [IsAuthenticated, IsWorkflowOwnerOrShared] + pagination_class = CustomPagination + + def _validate_execution_count(self, value, param_name): + """Validate execution count parameter is a non-negative integer. 
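+
+        For example (illustrative values), "3" is accepted and returned as the
+        int 3, while "-1" or "abc" raise ValidationError keyed by the given
+        parameter name.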
+ + Args: + value: The value to validate + param_name: Name of the parameter for error messages + + Returns: + int: The validated integer value + + Raises: + ValidationError: If value is invalid + """ + try: + int_value = int(value) + if int_value < 0: + raise ValidationError({param_name: "Must be a non-negative integer"}) + return int_value + except (ValueError, TypeError): + raise ValidationError({param_name: "Must be a valid integer"}) + + def get_queryset(self): + """Get file histories for workflow with filters.""" + # Reuse cached workflow from permission check + if hasattr(self.request, "_workflow_cache"): + workflow = self.request._workflow_cache + else: + # Fallback if permission didn't run (shouldn't happen) + workflow_id = self.kwargs.get("workflow_id") + workflow = get_object_or_404(Workflow, id=workflow_id) + + queryset = FileHistory.objects.filter(workflow=workflow) + + # Apply simple filters from query parameters + status_param = self.request.query_params.get("status") + if status_param: + status_list = [s.strip() for s in status_param.split(",")] + queryset = queryset.filter(status__in=status_list) + + exec_min = self.request.query_params.get("execution_count_min") + if exec_min: + exec_min_val = self._validate_execution_count(exec_min, "execution_count_min") + queryset = queryset.filter(execution_count__gte=exec_min_val) + + exec_max = self.request.query_params.get("execution_count_max") + if exec_max: + exec_max_val = self._validate_execution_count(exec_max, "execution_count_max") + queryset = queryset.filter(execution_count__lte=exec_max_val) + + file_path_param = self.request.query_params.get("file_path") + if file_path_param: + # Support partial matching (case-insensitive) + queryset = queryset.filter(file_path__icontains=file_path_param) + + return queryset.order_by("-created_at") + + def destroy(self, request, workflow_id=None, id=None): + """Delete single file history by ID.""" + file_history = self.get_object() + file_history_id = file_history.id + + file_history.delete() + + logger.info(f"Deleted file history {file_history_id} for workflow {workflow_id}") + + return Response( + {"message": "File history deleted successfully"}, status=status.HTTP_200_OK + ) + + @action(detail=False, methods=["post"]) + def clear(self, request, workflow_id=None): + """Clear file histories with filters or by specific IDs. + + Supports two modes: + 1. ID-based deletion: Pass {"ids": [...]} to delete specific records (max 100) + 2. Filter-based deletion: Pass filters like status, execution_count_min, etc. + + At least one filter or IDs must be provided to prevent accidental deletion + of all records. 
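+
+        Example request bodies (illustrative values only):
+            {"ids": ["<file-history-uuid>"]}
+            {"status": ["ERROR"], "execution_count_min": 3}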
+ """ + # Extract all filter parameters upfront + ids = request.data.get("ids", []) + status_list = request.data.get("status", []) + exec_min = request.data.get("execution_count_min") + exec_max = request.data.get("execution_count_max") + file_path_param = request.data.get("file_path") + + # Safeguard: require at least one filter or IDs to prevent accidental mass deletion + has_criteria = bool( + ids + or status_list + or exec_min is not None + or exec_max is not None + or file_path_param + ) + if not has_criteria: + return Response( + { + "error": "At least one filter (ids, status, execution_count_min, " + "execution_count_max, or file_path) must be provided" + }, + status=status.HTTP_400_BAD_REQUEST, + ) + + workflow = get_object_or_404(Workflow, id=workflow_id) + queryset = FileHistory.objects.filter(workflow=workflow) + + # Check for ID-based deletion + if ids: + if len(ids) > MAX_BULK_DELETE_LIMIT: + return Response( + { + "error": f"Cannot delete more than {MAX_BULK_DELETE_LIMIT} " + f"items at once. Received {len(ids)} IDs." + }, + status=status.HTTP_400_BAD_REQUEST, + ) + queryset = queryset.filter(id__in=ids) + else: + # Apply filters from request body (filter-based deletion) + if status_list: + queryset = queryset.filter(status__in=status_list) + + if exec_min is not None: + exec_min_val = self._validate_execution_count( + exec_min, "execution_count_min" + ) + queryset = queryset.filter(execution_count__gte=exec_min_val) + + if exec_max is not None: + exec_max_val = self._validate_execution_count( + exec_max, "execution_count_max" + ) + queryset = queryset.filter(execution_count__lte=exec_max_val) + + if file_path_param: + queryset = queryset.filter(file_path__icontains=file_path_param) + + deleted_count, _ = queryset.delete() + + logger.info( + f"Cleared {deleted_count} file history records for workflow {workflow_id}" + ) + + return Response( + { + "deleted_count": deleted_count, + "message": f"{deleted_count} file history records deleted", + } + ) diff --git a/backend/workflow_manager/workflow_v2/migrations/0018_filehistory_execution_count_and_more.py b/backend/workflow_manager/workflow_v2/migrations/0018_filehistory_execution_count_and_more.py new file mode 100644 index 0000000000..1e02961d76 --- /dev/null +++ b/backend/workflow_manager/workflow_v2/migrations/0018_filehistory_execution_count_and_more.py @@ -0,0 +1,59 @@ +# Generated by Django 4.2.1 on 2025-11-24 12:41 + +import django.core.validators +from django.contrib.postgres.indexes import GinIndex +from django.contrib.postgres.operations import TrigramExtension +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("workflow_v2", "0017_workflow_shared_to_org_workflow_shared_users"), + ] + + operations = [ + # Enable PostgreSQL trigram extension for fast partial text matching + TrigramExtension(), + migrations.AddField( + model_name="filehistory", + name="execution_count", + field=models.IntegerField( + db_comment="Number of times this file has been processed", default=1 + ), + ), + migrations.AddField( + model_name="workflow", + name="max_file_execution_count", + field=models.IntegerField( + blank=True, + db_comment="Maximum times a file can be executed. null=use org/global default. 
Only enforced for ETL/TASK workflows.", + null=True, + validators=[django.core.validators.MinValueValidator(1)], + ), + ), + migrations.AddIndex( + model_name="filehistory", + index=models.Index(fields=["workflow", "status"], name="idx_fh_wf_status"), + ), + migrations.AddIndex( + model_name="filehistory", + index=models.Index( + fields=["workflow", "execution_count"], name="idx_fh_wf_exec_cnt" + ), + ), + migrations.AddIndex( + model_name="filehistory", + index=models.Index( + fields=["workflow", "file_path"], name="idx_fh_workflow_filepath" + ), + ), + # Add GIN trigram index for fast partial file_path search (10-250x faster) + migrations.AddIndex( + model_name="filehistory", + index=GinIndex( + fields=["file_path"], + name="idx_fh_file_path_trgm", + opclasses=["gin_trgm_ops"], + ), + ), + ] diff --git a/backend/workflow_manager/workflow_v2/models/file_history.py b/backend/workflow_manager/workflow_v2/models/file_history.py index 1935b246c9..ca0094e936 100644 --- a/backend/workflow_manager/workflow_v2/models/file_history.py +++ b/backend/workflow_manager/workflow_v2/models/file_history.py @@ -1,5 +1,6 @@ import uuid +from django.contrib.postgres.indexes import GinIndex from django.db import models from django.db.models import Q from utils.models.base_model import BaseModel @@ -20,6 +21,29 @@ def is_completed(self) -> bool: """ return self.status is not None and self.status == ExecutionStatus.COMPLETED.value + def __str__(self) -> str: + """String representation of FileHistory.""" + return f"FileHistory({self.id}, CacheKey: {self.cache_key}, Status: {self.status}, Count: {self.execution_count})" + + def has_exceeded_limit(self, workflow: "Workflow") -> bool: + """Check if this file has exceeded its maximum execution count. + + For API workflows, this always returns False (no limit enforcement). + For ETL/TASK workflows, checks against configured limit. + + Args: + workflow: The workflow being executed. + + Returns: + bool: True if file has exceeded limit and should be skipped. 
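+
+        Example (illustrative; ``history``, ``etl_workflow`` and ``api_workflow``
+        are hypothetical objects):
+            >>> history.execution_count = 3
+            >>> history.has_exceeded_limit(etl_workflow)  # get_max_execution_count() == 3
+            True
+            >>> history.has_exceeded_limit(api_workflow)  # API workflows are never skipped
+            False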
+ """ + # API workflows don't enforce execution limits (track count but don't skip) + if workflow.deployment_type == Workflow.WorkflowType.API: + return False + + max_count = workflow.get_max_execution_count() + return self.execution_count >= max_count + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) cache_key = models.CharField( max_length=HASH_LENGTH, @@ -39,6 +63,10 @@ def is_completed(self) -> bool: choices=ExecutionStatus.choices, db_comment="Latest status of execution", ) + execution_count = models.IntegerField( + default=1, + db_comment="Number of times this file has been processed", + ) error = models.TextField( blank=True, default="", @@ -59,6 +87,18 @@ class Meta: models.Index( fields=["workflow", "created_at"], name="idx_fh_workflow_created" ), + models.Index(fields=["workflow", "status"], name="idx_fh_wf_status"), + models.Index( + fields=["workflow", "execution_count"], name="idx_fh_wf_exec_cnt" + ), + models.Index( + fields=["workflow", "file_path"], name="idx_fh_workflow_filepath" + ), + GinIndex( + fields=["file_path"], + name="idx_fh_file_path_trgm", + opclasses=["gin_trgm_ops"], + ), ] constraints = [ # Legacy behavior: file_path is not present or is null diff --git a/backend/workflow_manager/workflow_v2/models/workflow.py b/backend/workflow_manager/workflow_v2/models/workflow.py index 01f4e036af..a286662a19 100644 --- a/backend/workflow_manager/workflow_v2/models/workflow.py +++ b/backend/workflow_manager/workflow_v2/models/workflow.py @@ -1,6 +1,8 @@ import uuid from account_v2.models import User +from django.conf import settings +from django.core.validators import MinValueValidator from django.db import models from utils.models.base_model import BaseModel from utils.models.organization_mixin import ( @@ -67,6 +69,12 @@ class ExecutionAction(models.TextChoices): destination_settings = models.JSONField( null=True, db_comment="Settings for the Destination module" ) + max_file_execution_count = models.IntegerField( + null=True, + blank=True, + validators=[MinValueValidator(1)], + db_comment="Maximum times a file can be executed. null=use org/global default. Only enforced for ETL/TASK workflows.", + ) created_by = models.ForeignKey( User, @@ -98,6 +106,21 @@ class ExecutionAction(models.TextChoices): def __str__(self) -> str: return f"{self.id}, name: {self.workflow_name}" + def get_max_execution_count(self) -> int: + """Get maximum execution count from configuration hierarchy. + + Priority: workflow setting > organization setting > global Django setting + + Returns: + int: Maximum execution count limit. + """ + # Check workflow-level setting first + if self.max_file_execution_count is not None: + return self.max_file_execution_count + + # Fall back to global Django setting (from backend.settings.execution_config) + return settings.MAX_FILE_EXECUTION_COUNT + class Meta: verbose_name = "Workflow" verbose_name_plural = "Workflows" diff --git a/backend/workflow_manager/workflow_v2/permissions.py b/backend/workflow_manager/workflow_v2/permissions.py new file mode 100644 index 0000000000..8d38435156 --- /dev/null +++ b/backend/workflow_manager/workflow_v2/permissions.py @@ -0,0 +1,45 @@ +from django.shortcuts import get_object_or_404 +from rest_framework.permissions import BasePermission + +from workflow_manager.workflow_v2.models.workflow import Workflow + + +class IsWorkflowOwnerOrShared(BasePermission): + """Permission class to check if user has access to a workflow. + + Checks: + 1. User is the workflow owner (created_by) + 2. 
User has shared access (in shared_users) + 3. Workflow is shared to user's organization (shared_to_org) + + Caches the workflow on request object to avoid duplicate fetching. + """ + + message = "You do not have permission to access this workflow" + + def has_permission(self, request, view): + workflow_id = view.kwargs.get("workflow_id") + + if not workflow_id: + return False + + # Cache workflow on request to avoid re-fetching in get_queryset + if not hasattr(request, "_workflow_cache"): + request._workflow_cache = get_object_or_404( + Workflow.objects.select_related( + "created_by", "organization" + ).prefetch_related("shared_users"), + id=workflow_id, + ) + + workflow = request._workflow_cache + user = request.user + + # Check access: owner OR shared user OR shared to organization + has_access = ( + workflow.created_by == user + or user in workflow.shared_users.all() + or (workflow.shared_to_org and workflow.organization == user.organization) + ) + + return has_access diff --git a/backend/workflow_manager/workflow_v2/serializers.py b/backend/workflow_manager/workflow_v2/serializers.py index 26e377a6af..6442d1e1fa 100644 --- a/backend/workflow_manager/workflow_v2/serializers.py +++ b/backend/workflow_manager/workflow_v2/serializers.py @@ -131,10 +131,35 @@ class Meta: class FileHistorySerializer(ModelSerializer): + max_execution_count = SerializerMethodField() + has_exceeded_limit = SerializerMethodField() + class Meta: model = FileHistory fields = "__all__" + def get_max_execution_count(self, obj: FileHistory) -> int: + """Get the maximum execution count from the associated workflow. + + Args: + obj: FileHistory instance + + Returns: + int: Maximum execution count from workflow configuration + """ + return obj.workflow.get_max_execution_count() + + def get_has_exceeded_limit(self, obj: FileHistory) -> bool: + """Check if this file has exceeded its execution limit. 
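+
+        Delegates to FileHistory.has_exceeded_limit with the record's own
+        workflow, so records under API workflows always report False here.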
+ + Args: + obj: FileHistory instance + + Returns: + bool: True if file has exceeded limit and should be skipped + """ + return obj.has_exceeded_limit(obj.workflow) + class SharedUserListSerializer(ModelSerializer): """Serializer for returning workflow with shared user details.""" diff --git a/backend/workflow_manager/workflow_v2/urls/workflow.py b/backend/workflow_manager/workflow_v2/urls/workflow.py index 50f6de1f6b..e9f404fed2 100644 --- a/backend/workflow_manager/workflow_v2/urls/workflow.py +++ b/backend/workflow_manager/workflow_v2/urls/workflow.py @@ -3,6 +3,7 @@ from workflow_manager.workflow_v2.execution_log_view import WorkflowExecutionLogViewSet from workflow_manager.workflow_v2.execution_view import WorkflowExecutionViewSet +from workflow_manager.workflow_v2.file_history_views import FileHistoryViewSet from workflow_manager.workflow_v2.views import WorkflowViewSet workflow_list = WorkflowViewSet.as_view( @@ -24,6 +25,12 @@ workflow_schema = WorkflowViewSet.as_view({"get": "get_schema"}) can_update = WorkflowViewSet.as_view({"get": "can_update"}) list_shared_users = WorkflowViewSet.as_view({"get": "list_of_shared_users"}) + +# File History views +file_history_list = FileHistoryViewSet.as_view({"get": "list"}) +file_history_detail = FileHistoryViewSet.as_view({"get": "retrieve", "delete": "destroy"}) +file_history_clear = FileHistoryViewSet.as_view({"post": "clear"}) + urlpatterns = format_suffix_patterns( [ path("", workflow_list, name="workflow-list"), @@ -69,5 +76,21 @@ workflow_schema, name="workflow-schema", ), + # File History nested routes + path( + "/file-histories/", + file_history_list, + name="workflow-file-history-list", + ), + path( + "/file-histories//", + file_history_detail, + name="workflow-file-history-detail", + ), + path( + "/file-histories/clear/", + file_history_clear, + name="workflow-file-history-clear", + ), ] ) diff --git a/backend/workflow_manager/workflow_v2/views.py b/backend/workflow_manager/workflow_v2/views.py index e2d8e21acd..ec6fb46b77 100644 --- a/backend/workflow_manager/workflow_v2/views.py +++ b/backend/workflow_manager/workflow_v2/views.py @@ -1067,25 +1067,31 @@ def file_history_batch_lookup_internal(request): f"cache_key={fh.cache_key}, provider_file_uuid={fh.provider_file_uuid}, file_path={fh.file_path}" ) + # Serialize once for all matches (optimization) + is_completed_result = fh.is_completed() + serializer = FileHistorySerializer(fh) + file_history_data = serializer.data + # Update response data for all matches for result_identifier in matched_identifiers: - is_completed_result = fh.is_completed() logger.info( f"DEBUG: FileHistoryBatch - Found record for UUID: {fh.provider_file_uuid}, " f"Path: {fh.file_path}, Status: {fh.status}, is_completed(): {is_completed_result}, " + f"has_exceeded_limit: {file_history_data.get('has_exceeded_limit')}, " f"result_identifier: {result_identifier}" ) - serializer = FileHistorySerializer(fh) response_data[result_identifier] = { "found": True, "is_completed": is_completed_result, - "file_history": serializer.data, + "file_history": file_history_data, } logger.info( f"DEBUG: FileHistoryBatch - Response updated for {result_identifier}: " - f"found=True, is_completed={is_completed_result}, status={fh.status}" + f"found=True, is_completed={is_completed_result}, status={fh.status}, " + f"max_execution_count={file_history_data.get('max_execution_count')}, " + f"has_exceeded_limit={file_history_data.get('has_exceeded_limit')}" ) logger.info( diff --git a/frontend/src/components/agency/agency/Agency.jsx 
b/frontend/src/components/agency/agency/Agency.jsx index 98040134fe..8d4059f49f 100644 --- a/frontend/src/components/agency/agency/Agency.jsx +++ b/frontend/src/components/agency/agency/Agency.jsx @@ -42,6 +42,7 @@ import { pipelineService } from "../../pipelines-or-deployments/pipeline-service import { InputOutput } from "../input-output/InputOutput"; import { ToolSelectionSidebar } from "../tool-selection-sidebar/ToolSelectionSidebar.jsx"; import { SpinnerLoader } from "../../widgets/spinner-loader/SpinnerLoader.jsx"; +import FileHistoryModal from "../../pipelines-or-deployments/file-history-modal/FileHistoryModal.jsx"; function Agency() { const [steps, setSteps] = useState([]); const [workflowProgress, setWorkflowProgress] = useState(0); @@ -96,6 +97,7 @@ function Agency() { const [openFileUploadModal, setOpenFileUploadModal] = useState(false); const [fileList, setFileList] = useState([]); const [wfExecutionParams, setWfExecutionParams] = useState([]); + const [fileHistoryModalOpen, setFileHistoryModalOpen] = useState(false); const { setPostHogCustomEvent } = usePostHogEvents(); @@ -1002,6 +1004,9 @@ function Agency() { case "run-workflow": handleRunWorkflow(); break; + case "view-file-history": + setFileHistoryModalOpen(true); + break; case "clear-history": handleClearFileMarker(); break; @@ -1017,6 +1022,12 @@ function Agency() { icon: , disabled: isClearingFileHistory || loadingType === "EXECUTE", }, + { + key: "view-file-history", + label: "View File History", + icon: , + disabled: isClearingFileHistory || loadingType === "EXECUTE", + }, { key: "clear-history", label: "Clear Processed File History", @@ -1318,6 +1329,14 @@ function Agency() { onToolSelect={handleToolSelection} onSave={() => setShowToolSelectionSidebar(false)} /> + + {/* File History Modal */} + ); diff --git a/frontend/src/components/pipelines-or-deployments/file-history-modal/FileHistoryModal.css b/frontend/src/components/pipelines-or-deployments/file-history-modal/FileHistoryModal.css new file mode 100644 index 0000000000..814c879014 --- /dev/null +++ b/frontend/src/components/pipelines-or-deployments/file-history-modal/FileHistoryModal.css @@ -0,0 +1,122 @@ +.file-history-modal .file-history-content { + display: flex; + flex-direction: column; + gap: 16px; +} + +.file-history-modal .filters-section { + padding: 10px 12px; + background-color: #fafafa; + border: 1px solid #e8e8e8; + border-radius: 6px; + margin-bottom: 10px; +} + +.file-history-modal .action-buttons { + display: flex; + justify-content: flex-start; + padding: 8px 0; +} + +.file-history-modal .ant-table-wrapper { + margin-top: 8px; +} + +/* Status tag colors */ +.file-history-modal .ant-tag { + font-weight: 500; +} + +/* Mobile responsiveness - Small screens (< 576px) */ +@media (max-width: 575px) { + .file-history-modal .filters-section { + padding: 8px 10px; + } + + .file-history-modal .filter-buttons { + margin-top: 4px !important; + width: 100%; + } + + .file-history-modal .filter-buttons .ant-btn { + flex: 1; + } + + .file-history-modal .ant-space-compact { + display: flex; + width: 100%; + } + + .file-history-modal .action-buttons { + flex-direction: column; + gap: 8px; + } + + .file-history-modal .action-buttons .ant-space { + width: 100%; + } + + .file-history-modal .action-buttons .ant-btn { + width: 100%; + justify-content: center; + } + + /* Adjust modal padding on mobile */ + .file-history-modal .ant-modal-body { + padding: 12px; + } + + .file-history-modal .ant-modal-header { + padding: 12px 16px; + } +} + +/* Tablet responsiveness - 
Medium screens (576px - 767px) */ +@media (min-width: 576px) and (max-width: 767px) { + .file-history-modal .filters-section { + padding: 10px 12px; + } + + .file-history-modal .filter-buttons { + margin-top: 16px !important; + } +} + +/* Desktop - Large screens (>= 768px) */ +@media (min-width: 768px) { + .file-history-modal .filter-buttons { + justify-content: flex-start; + } +} + +/* Utility classes for inline style replacement */ +.file-history-modal .filter-label { + font-size: 13px; +} + +.file-history-modal .full-width { + width: 100%; +} + +.file-history-modal .flex-button { + flex: 1; +} + +/* Table cell content styles */ +.file-history-modal .file-path-text { + flex: 1; + max-width: 400px; +} + +.file-history-modal .error-text { + flex: 1; + max-width: 230px; +} + +.file-history-modal .cell-content { + width: 100%; +} + +.warning-icon { + color: #faad14; +} diff --git a/frontend/src/components/pipelines-or-deployments/file-history-modal/FileHistoryModal.jsx b/frontend/src/components/pipelines-or-deployments/file-history-modal/FileHistoryModal.jsx new file mode 100644 index 0000000000..e8dc422ec3 --- /dev/null +++ b/frontend/src/components/pipelines-or-deployments/file-history-modal/FileHistoryModal.jsx @@ -0,0 +1,707 @@ +import { + Table, + Modal, + Button, + Select, + InputNumber, + Input, + Space, + Typography, + Popconfirm, + Tag, + Row, + Col, + message, +} from "antd"; +import { + DeleteOutlined, + ReloadOutlined, + FilterOutlined, + ClearOutlined, + CopyOutlined, + ExclamationCircleFilled, +} from "@ant-design/icons"; +import PropTypes from "prop-types"; +import { useState, useEffect } from "react"; + +import { workflowService } from "../../workflows/workflow/workflow-service.js"; +import { useAlertStore } from "../../../store/alert-store.js"; +import { useExceptionHandler } from "../../../hooks/useExceptionHandler.jsx"; +import { copyToClipboard } from "../../../helpers/GetStaticData"; +import "./FileHistoryModal.css"; + +const { Text } = Typography; + +const MAX_BULK_DELETE = 100; + +const FileHistoryModal = ({ open, setOpen, workflowId, workflowName }) => { + const [fileHistories, setFileHistories] = useState([]); + const [loading, setLoading] = useState(false); + const [selectedRowKeys, setSelectedRowKeys] = useState([]); + const { setAlertDetails } = useAlertStore(); + const handleException = useExceptionHandler(); + const workflowApiService = workflowService(); + + // Filter states + const [statusFilter, setStatusFilter] = useState([]); + const [executionCountMin, setExecutionCountMin] = useState(null); + const [executionCountMax, setExecutionCountMax] = useState(null); + const [filePathFilter, setFilePathFilter] = useState(""); + + // Bulk delete confirmation states + const [showBulkDeleteConfirm, setShowBulkDeleteConfirm] = useState(false); + const [fetchingCount, setFetchingCount] = useState(false); + const [bulkDeleteCount, setBulkDeleteCount] = useState(0); + + // Track applied filter values (used for data fetching, pagination, and bulk clear) + // These are separate from input values to prevent fetching with unapplied filters + const [appliedFilters, setAppliedFilters] = useState({ + status: [], + executionCountMin: null, + executionCountMax: null, + filePath: "", + }); + + // Pagination states + const [pagination, setPagination] = useState({ + current: 1, + pageSize: 10, + total: 0, + }); + + const statusOptions = [ + { label: "Error", value: "ERROR" }, + { label: "Completed", value: "COMPLETED" }, + ]; + + // Check if filters have been applied (for "Clear with 
Filters" button) + const hasAppliedFilters = Boolean( + appliedFilters.status?.length > 0 || + appliedFilters.executionCountMin !== null || + appliedFilters.executionCountMax !== null || + appliedFilters.filePath + ); + + // Check if input values differ from applied values (for Apply button indicator) + const hasUnappliedChanges = (() => { + // Compare status arrays (order-independent) + const statusEqual = + statusFilter.length === appliedFilters.status.length && + statusFilter.every((s) => appliedFilters.status.includes(s)); + + return ( + !statusEqual || + executionCountMin !== appliedFilters.executionCountMin || + executionCountMax !== appliedFilters.executionCountMax || + filePathFilter !== appliedFilters.filePath + ); + })(); + + // Copy to clipboard helper + const handleCopy = async (text, label = "Text") => { + const success = await copyToClipboard(text); + if (success) { + message.success(`${label} copied to clipboard!`); + } else { + message.error("Failed to copy to clipboard"); + } + }; + + // Fetch file histories with optional filters parameter + // If filters are passed, use them; otherwise use appliedFilters from state + const fetchFileHistoriesWithFilters = async ( + page = 1, + pageSize = 10, + filters = null + ) => { + if (!workflowId) { + setAlertDetails({ + type: "error", + content: "Workflow ID is missing. Unable to fetch file histories.", + }); + return; + } + + const filtersToUse = filters || appliedFilters; + + setLoading(true); + try { + const params = { + page, + page_size: pageSize, + }; + + // Use provided filters for data fetching + if (filtersToUse.status?.length > 0) { + params.status = filtersToUse.status.join(","); + } + if ( + filtersToUse.executionCountMin !== null && + filtersToUse.executionCountMin !== undefined + ) { + params.execution_count_min = filtersToUse.executionCountMin; + } + if ( + filtersToUse.executionCountMax !== null && + filtersToUse.executionCountMax !== undefined + ) { + params.execution_count_max = filtersToUse.executionCountMax; + } + if (filtersToUse.filePath) { + params.file_path = filtersToUse.filePath; + } + + const response = await workflowApiService.getFileHistories( + workflowId, + params + ); + + const data = response?.data?.results || []; + setFileHistories(data); + setPagination({ + current: page, + pageSize: pageSize, + total: response?.data?.count || 0, + }); + } catch (err) { + // Extract more detailed error message + const errorMessage = + err?.response?.data?.message || + err?.response?.data?.error || + err?.response?.data?.detail || + err?.message || + "Failed to fetch file histories"; + + setAlertDetails({ + type: "error", + content: errorMessage, + }); + } finally { + setLoading(false); + } + }; + + // Alias for backward compatibility (pagination, refresh, etc.) 
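+  // Usage (illustrative): fetchFileHistories(2, pagination.pageSize) re-queries
+  // page 2 with the currently applied filters, not the unapplied input values.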
+ const fetchFileHistories = (page, pageSize) => + fetchFileHistoriesWithFilters(page, pageSize); + + // Load data on open or filter change + useEffect(() => { + if (open && workflowId) { + fetchFileHistories(1, pagination.pageSize); + setSelectedRowKeys([]); + } + }, [open, workflowId]); + + // Handle filter apply + const handleApplyFilters = () => { + // Save current input values as applied filters + const newFilters = { + status: statusFilter, + executionCountMin: executionCountMin, + executionCountMax: executionCountMax, + filePath: filePathFilter, + }; + setAppliedFilters(newFilters); + // Pass new filters directly (state update is async, so can't rely on appliedFilters yet) + fetchFileHistoriesWithFilters(1, pagination.pageSize, newFilters); + setSelectedRowKeys([]); + }; + + // Handle filter reset + const handleResetFilters = () => { + setStatusFilter([]); + setExecutionCountMin(null); + setExecutionCountMax(null); + setFilePathFilter(""); + // Reset applied filters + const emptyFilters = { + status: [], + executionCountMin: null, + executionCountMax: null, + filePath: "", + }; + setAppliedFilters(emptyFilters); + // Fetch with empty filters + fetchFileHistoriesWithFilters(1, pagination.pageSize, emptyFilters); + }; + + // Handle pagination change + const handleTableChange = (paginationConfig) => { + fetchFileHistories(paginationConfig.current, paginationConfig.pageSize); + }; + + // Delete single file history + const handleDeleteSingle = async (fileHistoryId) => { + try { + await workflowApiService.deleteFileHistory(workflowId, fileHistoryId); + setAlertDetails({ + type: "success", + message: "File history deleted successfully", + }); + // Refresh data + fetchFileHistories(pagination.current, pagination.pageSize); + setSelectedRowKeys([]); + } catch (err) { + setAlertDetails(handleException(err)); + } + }; + + // Delete selected file histories (bulk delete by IDs) + const handleDeleteSelected = async () => { + if (selectedRowKeys.length === 0) return; + + if (selectedRowKeys.length > MAX_BULK_DELETE) { + setAlertDetails({ + type: "error", + content: `Cannot delete more than ${MAX_BULK_DELETE} items at once. 
Please select fewer items.`, + }); + return; + } + + setLoading(true); + try { + const response = await workflowApiService.bulkDeleteFileHistoriesByIds( + workflowId, + selectedRowKeys + ); + + setAlertDetails({ + type: "success", + message: + response?.data?.message || + `${selectedRowKeys.length} file histories deleted successfully`, + }); + + // Refresh data + fetchFileHistories(pagination.current, pagination.pageSize); + setSelectedRowKeys([]); + } catch (err) { + setAlertDetails(handleException(err)); + } finally { + setLoading(false); + } + }; + + // Prepare bulk clear - fetch count for confirmation using applied filters + const handlePrepareBulkClear = async () => { + setFetchingCount(true); + try { + const params = { + page: 1, + page_size: 1, // We only need count, not actual data + }; + + // Use applied filters (not input values) + if (appliedFilters.status?.length > 0) { + params.status = appliedFilters.status.join(","); + } + if ( + appliedFilters.executionCountMin !== null && + appliedFilters.executionCountMin !== undefined + ) { + params.execution_count_min = appliedFilters.executionCountMin; + } + if ( + appliedFilters.executionCountMax !== null && + appliedFilters.executionCountMax !== undefined + ) { + params.execution_count_max = appliedFilters.executionCountMax; + } + if (appliedFilters.filePath) { + params.file_path = appliedFilters.filePath; + } + + const response = await workflowApiService.getFileHistories( + workflowId, + params + ); + + const count = response?.data?.count || 0; + if (count > 0) { + setBulkDeleteCount(count); + setShowBulkDeleteConfirm(true); + } + // Note: count === 0 case is handled by disabling the button when pagination.total === 0 + } catch (err) { + setAlertDetails(handleException(err)); + } finally { + setFetchingCount(false); + } + }; + + // Perform bulk clear after user confirms using applied filters + const performBulkClear = async () => { + setShowBulkDeleteConfirm(false); + setLoading(true); + try { + const filters = {}; + + // Use applied filters (not input values) + if (appliedFilters.status?.length > 0) { + filters.status = appliedFilters.status; + } + if ( + appliedFilters.executionCountMin !== null && + appliedFilters.executionCountMin !== undefined + ) { + filters.execution_count_min = appliedFilters.executionCountMin; + } + if ( + appliedFilters.executionCountMax !== null && + appliedFilters.executionCountMax !== undefined + ) { + filters.execution_count_max = appliedFilters.executionCountMax; + } + if (appliedFilters.filePath) { + filters.file_path = appliedFilters.filePath; + } + + const response = await workflowApiService.bulkClearFileHistories( + workflowId, + filters + ); + + setAlertDetails({ + type: "success", + message: + response?.data?.message || + `${response?.data?.deleted_count} file histories cleared`, + }); + + // Refresh data + fetchFileHistories(1, pagination.pageSize); + setSelectedRowKeys([]); + } catch (err) { + setAlertDetails(handleException(err)); + } finally { + setLoading(false); + } + }; + + // Row selection config + const rowSelection = { + selectedRowKeys, + onChange: (selectedKeys) => { + setSelectedRowKeys(selectedKeys); + }, + }; + + // Table columns + const columns = [ + { + title: "File Path", + dataIndex: "file_path", + key: "file_path", + width: "40%", + ellipsis: true, + render: (text) => ( + + + {text || "N/A"} + + {text && ( + + + ), + }, + ]; + + return ( + setOpen(false)} + footer={null} + width="90%" + className="file-history-modal" + > +
+ {/* Filters Section */} +
+ + + + + File Path: + + setFilePathFilter(e.target.value)} + allowClear + onPressEnter={handleApplyFilters} + /> + + + + + + Status: + +