Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 70 additions & 13 deletions frontend/src/components/logging/detailed-logs/DetailedLogs.jsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { useEffect, useState } from "react";
import { useEffect, useState, useRef } from "react";
import { useNavigate, useParams } from "react-router-dom";
import {
ArrowLeftOutlined,
Expand Down Expand Up @@ -61,9 +61,47 @@ const DetailedLogs = () => {
const [statusFilter, setStatusFilter] = useState(null);
const [searchText, setSearchText] = useState("");
const [searchTimeout, setSearchTimeout] = useState(null);
// Store interval ID for proper cleanup
const pollingIntervalRef = useRef(null);
// Store latest execution details for polling re-checks
const executionDetailsRef = useRef(null);

const filterOptions = ["COMPLETED", "PENDING", "ERROR", "EXECUTING"];

// Check if execution should continue polling
const shouldPoll = (executionDetails) => {
if (!executionDetails) return false;

const status = executionDetails?.status?.toLowerCase();
// Only poll EXECUTING or PENDING status
if (status !== "executing" && status !== "pending") {
return false;
}

// Check if execution is stale (>1 hour from creation)
if (!executionDetails?.createdAtRaw) return false;
const createdAt = new Date(executionDetails?.createdAtRaw);
if (!Number.isFinite(createdAt.getTime())) return false;
const now = new Date();
const oneHourInMs = 60 * 60 * 1000;
const timeDifference = now - createdAt;

if (timeDifference > oneHourInMs) {
// Stopping polling in case the execution is possibly stuck
return false;
}

return true;
};

// Clear polling interval
const clearPolling = () => {
if (pollingIntervalRef.current) {
clearInterval(pollingIntervalRef.current);
pollingIntervalRef.current = null;
}
};

const fetchExecutionDetails = async (id) => {
try {
const url = getUrl(`/execution/${id}/`);
Expand All @@ -74,6 +112,7 @@ const DetailedLogs = () => {
(item?.successful_files || 0) + (item?.failed_files || 0);
const progress = total > 0 ? Math.round((processed / total) * 100) : 0;
const formattedData = {
createdAtRaw: item?.created_at,
executedAt: formattedDateTime(item?.created_at),
executionId: item?.id,
progress,
Expand Down Expand Up @@ -128,17 +167,17 @@ const DetailedLogs = () => {
const params = { ...defaultParams, ...customParams };
const searchParams = new URLSearchParams();

Object.entries(params).forEach(([key, value]) => {
for (const [key, value] of Object.entries(params)) {
if (value !== null && value !== undefined) {
searchParams.append(key, value);
}
});
}

// Handle file status filter for MultipleChoiceFilter
if (statusFilter && statusFilter.length > 0) {
statusFilter.forEach((status) => {
for (const status of statusFilter) {
searchParams.append("status", status);
});
}
}

const response = await axiosPrivate.get(
Expand Down Expand Up @@ -289,21 +328,39 @@ const DetailedLogs = () => {
fetchExecutionFiles(id, pagination.current);
}, [pagination.current, ordering, statusFilter]);

// Keep ref updated with latest execution details
useEffect(() => {
executionDetailsRef.current = executionDetails;
}, [executionDetails]);

// Polling logic for execution status updates
useEffect(() => {
let interval = null;
if (executionDetails?.status === "EXECUTING") {
interval = setInterval(() => {
// Clear any existing interval first
clearPolling();

if (shouldPoll(executionDetails)) {
pollingIntervalRef.current = setInterval(() => {
// Re-check staleness inside polling cycle
// Handles scenario where execution runs for >1 hour
if (!shouldPoll(executionDetailsRef.current)) {
clearPolling();
return;
}

fetchExecutionDetails(id);
fetchExecutionFiles(id, pagination.current);
}, 5000);
}
return () => {
if (interval) {
clearInterval(interval);
}
};

// Cleanup when dependencies change
return clearPolling;
}, [executionDetails?.status, id, pagination.current]);

// Clear polling when component unmounts
useEffect(() => {
return clearPolling;
}, []);

useEffect(() => {
const initialColumns = columnsDetailedTable.reduce((acc, col) => {
acc[col.key] = true;
Expand Down
106 changes: 80 additions & 26 deletions frontend/src/components/logging/execution-logs/ExecutionLogs.jsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { DatePicker, Flex, Tabs, Typography } from "antd";
import { useNavigate, useParams } from "react-router-dom";
import { useEffect, useState } from "react";
import { useEffect, useState, useRef } from "react";

import { LogsTable } from "../logs-table/LogsTable";
import { DetailedLogs } from "../detailed-logs/DetailedLogs";
Expand Down Expand Up @@ -43,7 +43,10 @@ function ExecutionLogs() {
const [selectedDateRange, setSelectedDateRange] = useState([]);
const [datePickerValue, setDatePickerValue] = useState(null);
const [ordering, setOrdering] = useState(null);
const [pollingIds, setPollingIds] = useState(new Set());
// Store timeouts in a ref for proper cleanup
const pollingTimeoutsRef = useRef({});
// Store polling IDs in a ref to avoid stale closure issues
const pollingIdsRef = useRef(new Set());
const currentPath = location.pathname !== `/${sessionDetails?.orgName}/logs`;
const items = [
{
Expand Down Expand Up @@ -94,6 +97,49 @@ function ExecutionLogs() {
}
};

// Check if execution should continue polling
const shouldPoll = (item) => {
// Only poll EXECUTING or PENDING status
if (
item?.status?.toLowerCase() !== "executing" &&
item?.status?.toLowerCase() !== "pending"
) {
return false;
}

// Check if execution is stale (>1 hour from creation)
const createdAt = new Date(item?.createdAtRaw || item?.created_at);
if (!Number.isFinite(createdAt.getTime())) return false;
const now = new Date();
const oneHourInMs = 60 * 60 * 1000;
const timeDifference = now - createdAt;

if (timeDifference > oneHourInMs) {
// Stopping polling in case the execution is possibly stuck
return false;
}

return true;
};

// Clear a single polling timeout
const clearPollingTimeout = (id) => {
if (pollingTimeoutsRef.current[id]) {
clearTimeout(pollingTimeoutsRef.current[id]);
delete pollingTimeoutsRef.current[id];
}
};

// Clear all polling timeouts and reset state
const clearAllPolling = () => {
for (const id of Object.keys(pollingTimeoutsRef.current)) {
clearTimeout(pollingTimeoutsRef.current[id]);
}
pollingTimeoutsRef.current = {};
// Reset ref to keep it in sync
pollingIdsRef.current = new Set();
};

const pollExecutingRecord = async (id) => {
try {
const url = getUrl(`/execution/${id}/`);
Expand All @@ -116,50 +162,50 @@ function ExecutionLogs() {
progress,
processed,
total,
createdAtRaw: item?.created_at,
success: item?.status === "COMPLETED",
isError: item?.status === "ERROR",
status: item?.status,
workflowName: item?.workflow_name,
pipelineName: item?.pipeline_name || "Pipeline name not found",
successfulFiles: item?.successful_files,
failedFiles: item?.failed_files,
totalFiles: item?.total_files,
status: item?.status,
execution_time: item?.execution_time,
};

// If status is no longer executing, remove from polling
if (item?.status.toLowerCase() !== "executing") {
setPollingIds((prev) => {
const newSet = new Set(prev);
newSet.delete(id);
return newSet;
});
// If status should no longer be polled, remove from polling
if (!shouldPoll(item)) {
// Update ref to keep it in sync
pollingIdsRef.current.delete(id);
clearPollingTimeout(id);
}
}
return newData;
});

// Continue polling if still executing
if (item?.status === "EXECUTING") {
setTimeout(() => pollExecutingRecord(id), 5000); // Poll every 5 seconds
// Continue polling if should still poll
if (shouldPoll(item)) {
pollingTimeoutsRef.current[id] = setTimeout(
() => pollExecutingRecord(id),
5000
);
}
} catch (err) {
setPollingIds((prev) => {
const newSet = new Set(prev);
newSet.delete(id);
return newSet;
});
// Update ref to keep it in sync
pollingIdsRef.current.delete(id);
clearPollingTimeout(id);
}
};

const startPollingForExecuting = (records) => {
records.forEach((record) => {
if (record.status === "EXECUTING" && !pollingIds.has(record.key)) {
setPollingIds((prev) => {
const newSet = new Set(prev);
newSet.add(record.key);
return newSet;
});
for (const record of records) {
if (shouldPoll(record) && !pollingIdsRef.current.has(record.key)) {
// Update ref immediately to prevent stale closure issues
pollingIdsRef.current.add(record.key);
pollExecutingRecord(record.key);
}
});
}
};

const fetchLogs = async (page) => {
Expand Down Expand Up @@ -188,6 +234,7 @@ function ExecutionLogs() {
const progress = total > 0 ? Math.round((processed / total) * 100) : 0;
return {
key: item?.id,
createdAtRaw: item?.created_at,
executedAt: formattedDateTime(item?.created_at),
executedAtWithSeconds: formattedDateTimeWithSeconds(item?.created_at),
executionId: item?.id,
Expand Down Expand Up @@ -240,8 +287,15 @@ function ExecutionLogs() {
);
};

// Clear all polling when component unmounts or view changes
useEffect(() => {
return clearAllPolling;
}, [id, activeTab]);

useEffect(() => {
if (!currentPath) {
// Clear any existing polling when fetching new logs
clearAllPolling();
setDataList([]);
fetchLogs(pagination.current);
}
Expand Down