Skip to content

Commit 3e1efc8

Browse files
Adding logs to worker-logging
1 parent 8d19e7a commit 3e1efc8

File tree

1 file changed

+41
-1
lines changed

1 file changed

+41
-1
lines changed

backend/workflow_manager/workflow_v2/execution_log_utils.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,25 @@ def process_log_history_from_cache(
9393
)
9494

9595
for log_data in sorted_logs:
96+
# Display log content for all levels in workflow-logging service
97+
log_level = log_data.data.get("level", "INFO")
98+
log_message = log_data.data.get("log") or log_data.data.get(
99+
"message", "No message"
100+
)
101+
log_stage = log_data.data.get("stage", "UNKNOWN_STAGE")
102+
103+
# Display log with appropriate level
104+
if log_level == "ERROR":
105+
logger.error(f"Processing ERROR log [{log_stage}]: {log_message}")
106+
elif log_level == "WARNING":
107+
logger.warning(f"Processing WARNING log [{log_stage}]: {log_message}")
108+
elif log_level == "INFO":
109+
logger.info(f"Processing INFO log [{log_stage}]: {log_message}")
110+
elif log_level == "DEBUG":
111+
logger.debug(f"Processing DEBUG log [{log_stage}]: {log_message}")
112+
else:
113+
logger.info(f"Processing {log_level} log [{log_stage}]: {log_message}")
114+
96115
execution = execution_map.get(log_data.execution_id)
97116
if not execution:
98117
error_msg = f"Execution not found for execution_id: {log_data.execution_id}, skipping log push"
@@ -127,7 +146,28 @@ def process_log_history_from_cache(
127146
# Bulk insert logs for each organization
128147
processed_count = 0
129148
for organization_id, logs in organization_logs.items():
130-
logger.info(f"Storing {len(logs)} logs for org: {organization_id}")
149+
# Count logs by level for better visibility
150+
log_counts: dict[str, int] = {}
151+
for log in logs:
152+
level = log.data.get("level", "INFO")
153+
log_counts[level] = log_counts.get(level, 0) + 1
154+
155+
# Display log count summary
156+
if "ERROR" in log_counts:
157+
count_summary = ", ".join(
158+
[f"{count} {level}" for level, count in log_counts.items()]
159+
)
160+
logger.error(
161+
f"Storing logs for org {organization_id}: {count_summary} (total: {len(logs)})"
162+
)
163+
else:
164+
count_summary = ", ".join(
165+
[f"{count} {level}" for level, count in log_counts.items()]
166+
)
167+
logger.info(
168+
f"Storing logs for org {organization_id}: {count_summary} (total: {len(logs)})"
169+
)
170+
131171
ExecutionLog.objects.bulk_create(objs=logs, ignore_conflicts=True)
132172
processed_count += len(logs)
133173

0 commit comments

Comments
 (0)