diff --git a/backend/workflow_manager/endpoint_v2/destination.py b/backend/workflow_manager/endpoint_v2/destination.py index 00209b4b7c..b47e0e66a6 100644 --- a/backend/workflow_manager/endpoint_v2/destination.py +++ b/backend/workflow_manager/endpoint_v2/destination.py @@ -126,16 +126,24 @@ def _get_source_endpoint_for_workflow( def validate(self) -> None: connection_type = self.endpoint.connection_type - connector: ConnectorInstance = self.endpoint.connector_instance + connector: ConnectorInstance | None = self.endpoint.connector_instance + if connection_type is None: + error_msg = "Missing destination connection type" + self.workflow_log.log_error(logger, error_msg) raise MissingDestinationConnectionType() if connection_type not in WorkflowEndpoint.ConnectionType.values: + error_msg = f"Invalid destination connection type: {connection_type}" + self.workflow_log.log_error(logger, error_msg) raise InvalidDestinationConnectionType() - if ( + # Check if connector is required but missing + requires_connector = ( connection_type != WorkflowEndpoint.ConnectionType.API and connection_type != WorkflowEndpoint.ConnectionType.MANUALREVIEW - and connector is None - ): + ) + if requires_connector and connector is None: + error_msg = "Destination connector not configured" + self.workflow_log.log_error(logger, error_msg) raise DestinationConnectorNotConfigured() # Validate database connection if it's a database destination @@ -147,10 +155,18 @@ def validate(self) -> None: connector_settings=connector.connector_metadata, ) engine = db_class.get_engine() + self.workflow_log.log_info(logger, "Database connection test successful") if hasattr(engine, "close"): engine.close() + except ConnectorError as e: + error_msg = f"Database connector validation failed: {e}" + self.workflow_log.log_error(logger, error_msg) + logger.exception(error_msg) + raise except Exception as e: - logger.error(f"Database connection failed: {str(e)}") + error_msg = f"Unexpected error during database validation: {e}" + self.workflow_log.log_error(logger, error_msg) + logger.exception(error_msg) raise def _should_handle_hitl( diff --git a/backend/workflow_manager/endpoint_v2/source.py b/backend/workflow_manager/endpoint_v2/source.py index 78c9b66764..0f4bccae13 100644 --- a/backend/workflow_manager/endpoint_v2/source.py +++ b/backend/workflow_manager/endpoint_v2/source.py @@ -124,11 +124,20 @@ def _get_endpoint_for_workflow( def validate(self) -> None: connection_type = self.endpoint.connection_type connector: ConnectorInstance = self.endpoint.connector_instance + if connection_type is None: + error_msg = "Missing source connection type" + self.workflow_log.log_error(logger, error_msg) raise MissingSourceConnectionType() + if connection_type not in WorkflowEndpoint.ConnectionType.values: + error_msg = f"Invalid source connection type: {connection_type}" + self.workflow_log.log_error(logger, error_msg) raise InvalidSourceConnectionType() + if connection_type != WorkflowEndpoint.ConnectionType.API and connector is None: + error_msg = "Source connector not configured" + self.workflow_log.log_error(logger, error_msg) raise SourceConnectorNotConfigured() def valid_file_patterns(self, required_patterns: list[Any]) -> list[str]: diff --git a/backend/workflow_manager/utils/workflow_log.py b/backend/workflow_manager/utils/workflow_log.py index d7ccad856f..cbd351c91a 100644 --- a/backend/workflow_manager/utils/workflow_log.py +++ b/backend/workflow_manager/utils/workflow_log.py @@ -4,12 +4,7 @@ from utils.local_context import StateStore from unstract.core.pubsub_helper import LogPublisher -from unstract.workflow_execution.enums import ( - LogComponent, - LogLevel, - LogStage, - LogState, -) +from unstract.workflow_execution.enums import LogComponent, LogLevel, LogStage, LogState class WorkflowLog: @@ -22,7 +17,8 @@ def __init__( pipeline_id: str | None = None, ): log_events_id: str | None = StateStore.get(Common.LOG_EVENTS_ID) - self.messaging_channel = log_events_id if log_events_id else pipeline_id + # Ensure messaging_channel is never None - use execution_id as fallback + self.messaging_channel = log_events_id or pipeline_id or str(execution_id) self.execution_id = str(execution_id) self.file_execution_id = str(file_execution_id) if file_execution_id else None self.organization_id = str(organization_id) if organization_id else None diff --git a/workers/shared/workflow/destination_connector.py b/workers/shared/workflow/destination_connector.py index ab4c6a13d1..29c68d31a3 100644 --- a/workers/shared/workflow/destination_connector.py +++ b/workers/shared/workflow/destination_connector.py @@ -20,15 +20,20 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Optional -from shared.enums import QueueResultStatus +from shared.enums import DestinationConfigKey, QueueResultStatus # Import database utils (stable path) from shared.infrastructure.database.utils import WorkerDatabaseUtils +from shared.infrastructure.logging import WorkerLogger +from shared.infrastructure.logging.helpers import log_file_error, log_file_info from shared.models.result_models import QueueResult +from shared.utils.api_result_cache import get_api_cache_manager from shared.utils.manual_review_factory import ( get_manual_review_service, has_manual_review_plugin, ) +from shared.workflow.connectors.service import WorkerConnectorService +from shared.workflow.logger_helper import WorkflowLoggerHelper from unstract.connectors.connectorkit import Connectorkit from unstract.connectors.exceptions import ConnectorError @@ -53,12 +58,6 @@ ExecutionFileHandler, ) -from ..enums import DestinationConfigKey -from ..infrastructure.logging import WorkerLogger -from ..infrastructure.logging.helpers import log_file_error, log_file_info -from ..utils.api_result_cache import get_api_cache_manager -from .connectors.service import WorkerConnectorService - if TYPE_CHECKING: from ..api_client import InternalAPIClient @@ -198,6 +197,9 @@ def __init__(self, config: DestinationConfig, workflow_log=None): self.settings = config.settings self.workflow_log = workflow_log + # Initialize logger helper for safe logging operations + self.logger_helper = WorkflowLoggerHelper(workflow_log) + # Store destination connector instance details self.connector_id = config.connector_id self.connector_settings = config.connector_settings @@ -927,7 +929,7 @@ def insert_into_db( logger.info(f"Successfully inserted data into database table {table_name}") # Log to UI with file_execution_id for better correlation - if self.workflow_log and hasattr(self, "current_file_execution_id"): + if hasattr(self, "current_file_execution_id"): log_file_info( self.workflow_log, self.current_file_execution_id, @@ -1175,7 +1177,7 @@ def copy_output_to_output_directory( logger.error(error_message) # Log to UI with file_execution_id - if self.workflow_log and hasattr(self, "current_file_execution_id"): + if hasattr(self, "current_file_execution_id"): log_file_info( self.workflow_log, self.current_file_execution_id, @@ -1189,7 +1191,7 @@ def copy_output_to_output_directory( logger.info(success_message) # Log to UI - if self.workflow_log and hasattr(self, "current_file_execution_id"): + if hasattr(self, "current_file_execution_id"): log_file_info( self.workflow_log, self.current_file_execution_id, @@ -1202,7 +1204,7 @@ def copy_output_to_output_directory( logger.info(success_message) # Log to UI - if self.workflow_log and hasattr(self, "current_file_execution_id"): + if hasattr(self, "current_file_execution_id"): log_file_info( self.workflow_log, self.current_file_execution_id, @@ -1214,7 +1216,7 @@ def copy_output_to_output_directory( logger.error(error_msg, exc_info=True) # Log error to UI - if self.workflow_log and hasattr(self, "current_file_execution_id"): + if hasattr(self, "current_file_execution_id"): log_file_info( self.workflow_log, self.current_file_execution_id, diff --git a/workers/shared/workflow/logger_helper.py b/workers/shared/workflow/logger_helper.py new file mode 100644 index 0000000000..63a6c5801d --- /dev/null +++ b/workers/shared/workflow/logger_helper.py @@ -0,0 +1,48 @@ +"""Workflow logger helper for safe logging operations. + +This module provides a helper class to handle workflow logging operations +safely, eliminating the need for repetitive conditional checks throughout +the codebase. +""" + +import logging + +from shared.infrastructure.logging.workflow_logger import WorkerWorkflowLogger + + +class WorkflowLoggerHelper: + """Helper class for safe workflow logging operations. + + This class encapsulates the logic for safely logging messages when a + workflow_log instance is available, eliminating repetitive conditional + checks throughout the connector classes. + """ + + def __init__(self, workflow_log: WorkerWorkflowLogger | None = None) -> None: + """Initialize the logger helper. + + Args: + workflow_log: Optional workflow log instance that provides + log_info and log_error methods. + """ + self.workflow_log = workflow_log + + def log_info(self, logger: logging.Logger, message: str) -> None: + """Safely log info message if workflow_log is available. + + Args: + logger: The standard Python logger instance + message: The message to log + """ + if self.workflow_log: + self.workflow_log.log_info(logger, message) + + def log_error(self, logger: logging.Logger, message: str) -> None: + """Safely log error message if workflow_log is available. + + Args: + logger: The standard Python logger instance + message: The error message to log + """ + if self.workflow_log: + self.workflow_log.log_error(logger, message) diff --git a/workers/shared/workflow/source_connector.py b/workers/shared/workflow/source_connector.py index 49941bfc0e..950b54e462 100644 --- a/workers/shared/workflow/source_connector.py +++ b/workers/shared/workflow/source_connector.py @@ -13,9 +13,10 @@ from dataclasses import dataclass from typing import Any -from unstract.core.data_models import ConnectionType as CoreConnectionType +from shared.infrastructure.logging.logger import WorkerLogger +from shared.workflow.logger_helper import WorkflowLoggerHelper -from ..infrastructure.logging.logger import WorkerLogger +from unstract.core.data_models import ConnectionType as CoreConnectionType logger = WorkerLogger.get_logger(__name__) @@ -94,6 +95,7 @@ def __init__(self, config: SourceConfig, workflow_log=None): self.connection_type = config.connection_type self.settings = config.settings self.workflow_log = workflow_log + self.logger_helper = WorkflowLoggerHelper(workflow_log) # Store connector instance details self.connector_id = config.connector_id @@ -110,27 +112,39 @@ def get_fsspec_fs(self): This method replicates backend logic for getting filesystem access. """ - if self.connection_type == self.ConnectionType.API_STORAGE: - # API storage uses workflow execution storage - from unstract.filesystem import FileStorageType, FileSystem + try: + if self.connection_type == self.ConnectionType.API_STORAGE: + # API storage uses workflow execution storage + from unstract.filesystem import FileStorageType, FileSystem + + file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) + return file_system.get_file_storage() - file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) - return file_system.get_file_storage() + if not self.connector_id or not self.connector_settings: + error_msg = ( + "Source connector not configured - missing connector_id or settings" + ) + self.logger_helper.log_error(logger, error_msg) + raise Exception(error_msg) - if not self.connector_id or not self.connector_settings: - raise Exception("Source connector not configured") + # Get the connector instance using connectorkit + from unstract.connectors.connectorkit import Connectorkit - # Get the connector instance using connectorkit - from unstract.connectors.connectorkit import Connectorkit + connectorkit = Connectorkit() + connector_class = connectorkit.get_connector_class_by_connector_id( + self.connector_id + ) + connector_instance = connector_class(self.connector_settings) - connectorkit = Connectorkit() - connector_class = connectorkit.get_connector_class_by_connector_id( - self.connector_id - ) - connector_instance = connector_class(self.connector_settings) + # Get fsspec filesystem + fs = connector_instance.get_fsspec_fs() + return fs - # Get fsspec filesystem - return connector_instance.get_fsspec_fs() + except Exception as e: + error_msg = f"Failed to initialize source connector filesystem: {str(e)}" + self.logger_helper.log_error(logger, error_msg) + logger.error(error_msg) + raise def read_file_content(self, file_path: str) -> bytes: """Read file content from source connector. @@ -164,7 +178,6 @@ def list_files( List of file information dictionaries """ fs = self.get_fsspec_fs() - # Implementation would list files using fsspec # This is a simplified version try: @@ -186,7 +199,9 @@ def list_files( return files except Exception as e: - logger.error(f"Failed to list files from source: {e}") + error_msg = f"Failed to list files from source connector directory '{input_directory}': {str(e)}" + self.logger_helper.log_error(logger, error_msg) + logger.error(error_msg) return [] def validate(self) -> None: @@ -198,11 +213,15 @@ def validate(self) -> None: self.ConnectionType.API, self.ConnectionType.API_STORAGE, ]: - raise Exception(f"Invalid source connection type: {connection_type}") + error_msg = f"Invalid source connection type: {connection_type}" + self.logger_helper.log_error(logger, error_msg) + raise Exception(error_msg) if connection_type == self.ConnectionType.FILESYSTEM: if not self.connector_id or not self.connector_settings: - raise Exception("Filesystem source requires connector configuration") + error_msg = "Filesystem source requires connector configuration" + self.logger_helper.log_error(logger, error_msg) + raise Exception(error_msg) def get_config(self) -> SourceConfig: """Get serializable configuration for the source connector."""