Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENG 3008 Fix registered with no run workflows #1374

Merged
merged 11 commits into from
May 30, 2023
32 changes: 32 additions & 0 deletions src/ui/common/src/components/pages/workflow/id/hook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
import { NodeResultsMap, NodesMap } from '../../../../handlers/responses/node';
import { DagResultResponse } from '../../../../handlers/responses/workflow';
import { getPathPrefix } from '../../../../utils/getPathPrefix';
import { getLatestDagResult } from '../../../../utils/shared';

export type useWorkflowIdsOutputs = {
workflowId: string;
Expand Down Expand Up @@ -187,3 +188,34 @@ export function useWorkflowNodesResults(
),
};
}

export function useLatestDagResultOrDag(apiKey: string, workflowId: string) {
const {
data: dagResults,
error: dagResultsError,
isLoading: dagResultsLoading,
} = useDagResultsGetQuery({
apiKey: apiKey,
workflowId: workflowId,
});

const latestDagResult = getLatestDagResult(dagResults ?? []); // undefined if not available

const dagIdFromLatestDagResult = latestDagResult?.dag_id;

const {
data: dags,
error: dagsError,
isLoading: dagsLoading,
} = useDagsGetQuery(
{
apiKey: apiKey,
workflowId: workflowId,
},
{
skip: dagIdFromLatestDagResult,
}
);

return { latestDagResult, dag: (dags ?? [])[0] };
}
129 changes: 46 additions & 83 deletions src/ui/common/src/components/pages/workflows/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
} from '../../../handlers/AqueductApi';
import UserProfile from '../../../utils/auth';
import getPathPrefix from '../../../utils/getPathPrefix';
import ExecutionStatus from '../../../utils/shared';
import ExecutionStatus, { getLatestDagResult } from '../../../utils/shared';
import { getWorkflowEngineTypes } from '../../../utils/workflows';
import DefaultLayout from '../../layouts/default';
import { BreadcrumbLink } from '../../layouts/NavBar';
Expand All @@ -18,7 +18,11 @@ import {
SortType,
} from '../../tables/PaginatedSearchTable';
import { LayoutProps } from '../types';
import { useWorkflowNodes, useWorkflowNodesResults } from '../workflow/id/hook';
import {
useLatestDagResultOrDag,
useWorkflowNodes,
useWorkflowNodesResults,
} from '../workflow/id/hook';
import CheckItem from './components/CheckItem';
import ExecutionStatusLink from './components/ExecutionStatusLink';
import MetricItem from './components/MetricItem';
Expand Down Expand Up @@ -84,22 +88,6 @@ const WorkflowsPage: React.FC<Props> = ({ user, Layout = DefaultLayout }) => {
},
];

const getLatestDagResult = (dagResults) =>
dagResults.reduce(
(prev, curr) =>
curr.exec_state?.timestamps?.pending_at
? new Date(prev.exec_state?.timestamps?.pending_at) <
new Date(curr.exec_state?.timestamps?.pending_at)
? curr
: prev
: curr,
{
exec_state: {
status: ExecutionStatus.Registered,
timestamps: { pending_at: 0 },
},
}
);
const LastRunComponent = (row) => {
const workflowId = row.id;

Expand Down Expand Up @@ -133,43 +121,30 @@ const WorkflowsPage: React.FC<Props> = ({ user, Layout = DefaultLayout }) => {
const workflowId = row.id;
const url = `${getPathPrefix()}/workflow/${workflowId}`;

const {
data: dagResults,
error: dagResultsError,
isLoading: dagResultsLoading,
} = useDagResultsGetQuery({
apiKey: user.apiKey,
workflowId: workflowId,
});
const { latestDagResult, dag } = useLatestDagResultOrDag(
user.apiKey,
workflowId
);
let status = ExecutionStatus.Unknown;

if (!dagResultsLoading && !dagResultsError && dagResults.length > 0) {
const latestDagResult = getLatestDagResult(dagResults);
if (latestDagResult) {
status = latestDagResult.exec_state.status;
}
if (latestDagResult) {
status = latestDagResult.exec_state.status;
} else if (dag) {
status = ExecutionStatus.Registered;
}

return <ExecutionStatusLink name={row.name} url={url} status={status} />;
},
'Last Run': LastRunComponent,
Engines: (row) => {
const workflowId = row.id;

const {
data: dagResults,
error: dagResultsError,
isLoading: dagResultsLoading,
} = useDagResultsGetQuery({
apiKey: user.apiKey,
workflowId: workflowId,
});

let latestDagId;
if (!dagResultsLoading && !dagResultsError && dagResults.length > 0) {
const latestDagResult = getLatestDagResult(dagResults);
latestDagId = latestDagResult.dag_id;
}
const { latestDagResult, dag: noRunDag } = useLatestDagResultOrDag(
user.apiKey,
workflowId
);

const latestDagId = latestDagResult?.dag_id ?? noRunDag?.id;

const {
data: dag,
error: dagError,
Expand All @@ -181,15 +156,17 @@ const WorkflowsPage: React.FC<Props> = ({ user, Layout = DefaultLayout }) => {
dagId: latestDagId,
},
{
skip: dagResultsLoading && latestDagId,
skip: !latestDagId || noRunDag,
}
);

const nodes = useWorkflowNodes(user.apiKey, workflowId, latestDagId);

let engines = ['Unknown'];
if (!dagLoading && !dagError && dag) {
const workflowDag = structuredClone(dag);
if (dag || noRunDag) {
const workflowDag = noRunDag
? structuredClone(noRunDag)
: structuredClone(dag);
workflowDag.operators = nodes.operators;
engines = getWorkflowEngineTypes(workflowDag);
}
Expand All @@ -211,22 +188,13 @@ const WorkflowsPage: React.FC<Props> = ({ user, Layout = DefaultLayout }) => {
Metrics: (row) => {
const workflowId = row.id;

const {
data: dagResults,
error: dagResultsError,
isLoading: dagResultsLoading,
} = useDagResultsGetQuery({
apiKey: user.apiKey,
workflowId: workflowId,
});

let latestDagResultId;
let latestDagId;
if (!dagResultsLoading && !dagResultsError && dagResults.length > 0) {
const latestDagResult = getLatestDagResult(dagResults);
latestDagResultId = latestDagResult.id;
latestDagId = latestDagResult.dag_id;
}
const { latestDagResult, dag } = useLatestDagResultOrDag(
user.apiKey,
workflowId
);

const latestDagResultId = latestDagResult?.id;
const latestDagId = latestDagResult?.dag_id ?? dag?.id;

const nodes = useWorkflowNodes(user.apiKey, workflowId, latestDagId);
const nodesResults = useWorkflowNodesResults(
Expand All @@ -243,30 +211,23 @@ const WorkflowsPage: React.FC<Props> = ({ user, Layout = DefaultLayout }) => {
metricId: op.id,
name: op.name,
value: nodesResults.artifacts[artifactId]?.content_serialized,
status: nodesResults.artifacts[artifactId]?.exec_state?.status,
status:
nodesResults.artifacts[artifactId]?.exec_state?.status ??
ExecutionStatus.Registered,
};
});
return <MetricItem metrics={metricNodes} />;
},
Checks: (row) => {
const workflowId = row.id;

const {
data: dagResults,
error: dagResultsError,
isLoading: dagResultsLoading,
} = useDagResultsGetQuery({
apiKey: user.apiKey,
workflowId: workflowId,
});

let latestDagResultId;
let latestDagId;
if (!dagResultsLoading && !dagResultsError && dagResults.length > 0) {
const latestDagResult = getLatestDagResult(dagResults);
latestDagResultId = latestDagResult.id;
latestDagId = latestDagResult.dag_id;
}
const { latestDagResult, dag } = useLatestDagResultOrDag(
user.apiKey,
workflowId
);

const latestDagResultId = latestDagResult?.id;
const latestDagId = latestDagResult?.dag_id ?? dag?.id;

const nodes = useWorkflowNodes(user.apiKey, workflowId, latestDagId);
const nodesResults = useWorkflowNodesResults(
Expand All @@ -282,7 +243,9 @@ const WorkflowsPage: React.FC<Props> = ({ user, Layout = DefaultLayout }) => {
return {
checkId: op.id,
name: op.name,
status: nodesResults.artifacts[artifactId]?.exec_state?.status,
status:
nodesResults.artifacts[artifactId]?.exec_state?.status ??
ExecutionStatus.Registered,
level: op.spec.check.level,
value: nodesResults.artifacts[artifactId]?.content_serialized,
timestamp:
Expand Down
41 changes: 37 additions & 4 deletions src/ui/common/src/components/tables/PaginatedSearchTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export type SortColumn = {

// The sequence of keys in the row object to access in order to get the
// value which should be compared for sort purposes.
sortAccessPath: string[];
sortAccessPath: (string | number)[];
};

export enum SortType {
Expand All @@ -50,7 +50,7 @@ export enum SortType {
}

type SortConfig = {
name: SortColumn;
sortColumn: SortColumn;
sortType: SortType;
};

Expand All @@ -75,6 +75,7 @@ export const PaginatedSearchTable: React.FC<PaginatedSearchTableProps> = ({
onChangeRowsPerPage,
savedRowsPerPage,
sortColumns = [],
defaultSortConfig,
}) => {
const [page, setPage] = useState(0);
const [rowsPerPage, setRowsPerPage] = useState(
Expand All @@ -89,7 +90,7 @@ export const PaginatedSearchTable: React.FC<PaginatedSearchTableProps> = ({
const [sortTypeMenuAnchor, setSortTypeMenuAnchor] =
useState<HTMLLIElement>(null);
const [sortConfig, setSortConfig] = useState({
sortColumn: { name: null, sortAccessPath: [] as string[] },
sortColumn: null,
sortType: SortType.None,
});

Expand All @@ -105,14 +106,46 @@ export const PaginatedSearchTable: React.FC<PaginatedSearchTableProps> = ({
return value;
};

const rowData = [...data].map((row) => {
let rowData = [...data].map((row) => {
const rowData = {};
columns.forEach((column) => {
rowData[column] = getColumnValue(row, column);
});
return rowData;
});

// Default ordering
if (defaultSortConfig) {
rowData = rowData.sort((r1, r2) => {
const col = defaultSortConfig.sortColumn;
let v1: PaginatedSearchTableRow | PaginatedSearchTableElement = r1;
let v2: PaginatedSearchTableRow | PaginatedSearchTableElement = r2;
for (const path of col.sortAccessPath) {
v1 = v1[path];
v2 = v2[path];
}

if (defaultSortConfig.sortType === SortType.Ascending) {
if (v1 > v2) {
return 1;
} else if (v1 < v2) {
return -1;
} else {
return 0;
}
} else {
// sortType === SortType.Descending
if (v1 > v2) {
return -1;
} else if (v1 < v2) {
return 1;
} else {
return 0;
}
}
});
}

const [rows, setRows] = useState(rowData);
const [orderedRows, setOrderedRows] = useState(rowData);

Expand Down
16 changes: 16 additions & 0 deletions src/ui/common/src/utils/shared.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { ArtifactResultResponse } from '../handlers/responses/node';
import { DagResultResponse } from '../handlers/responses/workflow';
import { TableRow } from './data';

export enum AWSCredentialType {
Expand Down Expand Up @@ -74,6 +75,21 @@ export const getArtifactResultTableRow = (
};
};

export function getLatestDagResult(
dagResults: DagResultResponse[]
): DagResultResponse {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I slightly prefer return an undefined here if there's no result available, and let the caller decide how to deal with undefined result

if (dagResults && dagResults.length > 0) {
return dagResults.reduce((prev, curr) =>
curr.exec_state?.timestamps?.pending_at
? new Date(prev.exec_state?.timestamps?.pending_at) <
new Date(curr.exec_state?.timestamps?.pending_at)
? curr
: prev
: curr
);
}
}

export const stringToExecutionStatus = (status: string): ExecutionStatus => {
let executionStatus = ExecutionStatus.Unknown;
switch (status) {
Expand Down