From 05de51061dacb9422e527b37825424afaf5d85c8 Mon Sep 17 00:00:00 2001 From: Huy Duong Date: Sat, 18 Nov 2023 12:15:39 +0000 Subject: [PATCH 1/5] Add XCom tab to Grid --- airflow/www/static/js/api/index.ts | 3 + airflow/www/static/js/api/useTaskXcom.ts | 71 +++++++++++ airflow/www/static/js/dag/details/index.tsx | 40 ++++++- .../details/taskInstance/Xcom/XcomEntry.tsx | 82 +++++++++++++ .../dag/details/taskInstance/Xcom/index.tsx | 110 ++++++++++++++++++ airflow/www/templates/airflow/dag.html | 2 + 6 files changed, 305 insertions(+), 3 deletions(-) create mode 100644 airflow/www/static/js/api/useTaskXcom.ts create mode 100644 airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx create mode 100644 airflow/www/static/js/dag/details/taskInstance/Xcom/index.tsx diff --git a/airflow/www/static/js/api/index.ts b/airflow/www/static/js/api/index.ts index 782a4f99a1acd..f04a9bc887f6b 100644 --- a/airflow/www/static/js/api/index.ts +++ b/airflow/www/static/js/api/index.ts @@ -48,6 +48,7 @@ import usePools from "./usePools"; import useDags from "./useDags"; import useDagRuns from "./useDagRuns"; import useHistoricalMetricsData from "./useHistoricalMetricsData"; +import { useTaskXcom, useTaskXcomCollection } from "./useTaskXcom"; axios.interceptors.request.use((config) => { config.paramsSerializer = { @@ -91,4 +92,6 @@ export { useTaskInstance, useUpstreamDatasetEvents, useHistoricalMetricsData, + useTaskXcom, + useTaskXcomCollection, }; diff --git a/airflow/www/static/js/api/useTaskXcom.ts b/airflow/www/static/js/api/useTaskXcom.ts new file mode 100644 index 0000000000000..3db62d8d3f1ef --- /dev/null +++ b/airflow/www/static/js/api/useTaskXcom.ts @@ -0,0 +1,71 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import type { API } from "src/types"; +import { getMetaValue } from "src/utils"; +import { useQuery } from "react-query"; +import axios, { AxiosResponse } from "axios"; + +// tryNumber is not required to get XCom keys or values but is used +// in query key so refetch will occur if new tries are available +interface TaskXcomCollectionProps extends API.GetXcomEntriesVariables { + tryNumber: number; +} +interface TaskXcomProps extends API.GetXcomEntryVariables { + tryNumber: number; +} + +export const useTaskXcomCollection = ({ + dagId, + dagRunId, + taskId, + mapIndex, + tryNumber, +}: TaskXcomCollectionProps) => + useQuery(["taskXcoms", dagId, dagRunId, taskId, mapIndex, tryNumber], () => + axios.get( + getMetaValue("task_xcom_entries_api") + .replace("_DAG_RUN_ID_", dagRunId) + .replace("_TASK_ID_", taskId), + { params: { map_index: mapIndex } } + ) + ); + +export const useTaskXcom = ({ + dagId, + dagRunId, + taskId, + mapIndex, + xcomKey, + tryNumber, +}: TaskXcomProps) => + useQuery( + ["taskXcom", dagId, dagRunId, taskId, mapIndex, xcomKey, tryNumber], + () => + axios.get( + getMetaValue("task_xcom_entry_api") + .replace("_DAG_RUN_ID_", dagRunId) + .replace("_TASK_ID_", taskId) + .replace("_XCOM_KEY_", xcomKey), + { params: { map_index: mapIndex } } + ), + { + enabled: !!xcomKey, + } + ); diff --git a/airflow/www/static/js/dag/details/index.tsx b/airflow/www/static/js/dag/details/index.tsx index b476d61950ee3..ecfda9e6ffef5 100644 --- a/airflow/www/static/js/dag/details/index.tsx +++ b/airflow/www/static/js/dag/details/index.tsx @@ -39,6 +39,7 @@ import { MdReorder, MdCode, MdOutlineViewTimeline, + MdSyncAlt, } from "react-icons/md"; import { BiBracket } from "react-icons/bi"; import URLSearchParamsWrapper from "src/utils/URLSearchParamWrapper"; @@ -58,6 +59,7 @@ import ClearRun from "./dagRun/ClearRun"; import MarkRunAs from "./dagRun/MarkRunAs"; import ClearInstance from "./taskInstance/taskActions/ClearInstance"; import MarkInstanceAs from "./taskInstance/taskActions/MarkInstanceAs"; +import Xcom from "./taskInstance/Xcom"; const dagId = getMetaValue("dag_id")!; @@ -80,6 +82,8 @@ const tabToIndex = (tab?: string) => { case "logs": case "mapped_tasks": return 4; + case "xcom": + return 5; case "details": default: return 0; @@ -90,7 +94,8 @@ const indexToTab = ( index: number, taskId: string | null, showLogs: boolean, - showMappedTasks: boolean + showMappedTasks: boolean, + showXcom: boolean ) => { switch (index) { case 1: @@ -103,6 +108,9 @@ const indexToTab = ( if (showMappedTasks) return "mapped_tasks"; if (showLogs) return "logs"; return undefined; + case 5: + if (showXcom) return "xcom"; + return undefined; case 0: default: return undefined; @@ -138,6 +146,7 @@ const Details = ({ const isGroupOrMappedTaskSummary = isGroup || isMappedTaskSummary; const showLogs = !!(isTaskInstance && !isGroupOrMappedTaskSummary); const showMappedTasks = !!(isTaskInstance && isMappedTaskSummary && !isGroup); + const showXcom = !!(isTaskInstance && !isGroupOrMappedTaskSummary); const [searchParams, setSearchParams] = useSearchParams(); const tab = searchParams.get(TAB_PARAM) || undefined; @@ -146,12 +155,18 @@ const Details = ({ const onChangeTab = useCallback( (index: number) => { const params = new URLSearchParamsWrapper(searchParams); - const newTab = indexToTab(index, taskId, showLogs, showMappedTasks); + const newTab = indexToTab( + index, + taskId, + showLogs, + showMappedTasks, + showXcom + ); if (newTab) params.set(TAB_PARAM, newTab); else params.delete(TAB_PARAM); setSearchParams(params); }, - [setSearchParams, searchParams, showLogs, showMappedTasks, taskId] + [setSearchParams, searchParams, showLogs, showMappedTasks, showXcom, taskId] ); useEffect(() => { @@ -268,6 +283,14 @@ const Details = ({ )} + {showXcom && ( + + + + XCom + + + )} @@ -336,6 +359,17 @@ const Details = ({ /> )} + {showXcom && run && ( + + + + )} diff --git a/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx b/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx new file mode 100644 index 0000000000000..6d28579ce14fd --- /dev/null +++ b/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx @@ -0,0 +1,82 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Alert, AlertIcon, Spinner, Td, Text, Tr } from "@chakra-ui/react"; +import React from "react"; +import { useTaskXcom } from "src/api"; +import type { Dag, DagRun, TaskInstance } from "src/types"; + +interface Props { + dagId: Dag["id"]; + dagRunId: DagRun["runId"]; + taskId: TaskInstance["taskId"]; + mapIndex?: TaskInstance["mapIndex"]; + xcomKey: string; + tryNumber: TaskInstance["tryNumber"]; +} + +const XcomEntry = ({ + dagId, + dagRunId, + taskId, + mapIndex, + xcomKey, + tryNumber, +}: Props) => { + const { + data: xcom, + isLoading, + error, + } = useTaskXcom({ + dagId, + dagRunId, + taskId, + mapIndex, + xcomKey, + tryNumber: tryNumber || 1, + }); + + let content = {xcom?.value}; + if (isLoading) { + content = ; + } else if (error) { + content = ( + + + Error loading XCom entry + + ); + } else if (!xcom) { + content = ( + + + No value found for XCom key + + ); + } + + return ( + + {xcomKey} + {content} + + ); +}; + +export default XcomEntry; diff --git a/airflow/www/static/js/dag/details/taskInstance/Xcom/index.tsx b/airflow/www/static/js/dag/details/taskInstance/Xcom/index.tsx new file mode 100644 index 0000000000000..2c17af8a19851 --- /dev/null +++ b/airflow/www/static/js/dag/details/taskInstance/Xcom/index.tsx @@ -0,0 +1,110 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import React, { useRef } from "react"; +import type { Dag, DagRun, TaskInstance } from "src/types"; +import { + Table, + Text, + Thead, + Tbody, + Tr, + Td, + Spinner, + Alert, + AlertIcon, + Box, +} from "@chakra-ui/react"; +import { useTaskXcomCollection } from "src/api"; +import { useOffsetTop } from "src/utils"; +import XcomEntry from "./XcomEntry"; + +interface Props { + dagId: Dag["id"]; + dagRunId: DagRun["runId"]; + taskId: TaskInstance["taskId"]; + mapIndex?: TaskInstance["mapIndex"]; + tryNumber: TaskInstance["tryNumber"]; +} + +const Xcom = ({ dagId, dagRunId, taskId, mapIndex, tryNumber }: Props) => { + const taskXcomRef = useRef(null); + const offsetTop = useOffsetTop(taskXcomRef); + + const { + data: xcomCollection, + isLoading, + error, + } = useTaskXcomCollection({ + dagId, + dagRunId, + taskId, + mapIndex, + tryNumber: tryNumber || 1, + }); + + return ( + + {isLoading && } + {!!error && ( + + + An error occurred while fetching task XCom. + + )} + {xcomCollection && + (xcomCollection.totalEntries === 0 ? ( + No XCom + ) : ( + + + + + + + + + {xcomCollection.xcomEntries?.map((xcomEntry) => ( + + ))} + +
+ Key + + Value +
+ ))} +
+ ); +}; + +export default Xcom; diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html index 40440d3fd6672..fcf114d2d5a2f 100644 --- a/airflow/www/templates/airflow/dag.html +++ b/airflow/www/templates/airflow/dag.html @@ -70,6 +70,8 @@ + + From 170847b69c768baf3d34e8128c1da08ab53f1e08 Mon Sep 17 00:00:00 2001 From: Huy Duong Date: Sun, 26 Nov 2023 20:47:52 +0000 Subject: [PATCH 2/5] Combine showLogs and showXcom logic evaluation to isIndividualTaskInstance --- airflow/www/static/js/dag/details/index.tsx | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/airflow/www/static/js/dag/details/index.tsx b/airflow/www/static/js/dag/details/index.tsx index ecfda9e6ffef5..ebc5a06baba1d 100644 --- a/airflow/www/static/js/dag/details/index.tsx +++ b/airflow/www/static/js/dag/details/index.tsx @@ -144,9 +144,12 @@ const Details = ({ const isMappedTaskSummary = isMapped && mapIndex === undefined && taskId; const isGroup = !!children; const isGroupOrMappedTaskSummary = isGroup || isMappedTaskSummary; - const showLogs = !!(isTaskInstance && !isGroupOrMappedTaskSummary); + const isIndividualTaskInstance = !!( + isTaskInstance && !isGroupOrMappedTaskSummary + ); + const showLogs = isIndividualTaskInstance; + const showXcom = isIndividualTaskInstance; const showMappedTasks = !!(isTaskInstance && isMappedTaskSummary && !isGroup); - const showXcom = !!(isTaskInstance && !isGroupOrMappedTaskSummary); const [searchParams, setSearchParams] = useSearchParams(); const tab = searchParams.get(TAB_PARAM) || undefined; From 824049d068b63565f659a214aba7bdd3e52e6aeb Mon Sep 17 00:00:00 2001 From: Huy Duong Date: Sun, 26 Nov 2023 20:51:49 +0000 Subject: [PATCH 3/5] Remove link to /xcom page from UI grid view --- airflow/www/static/js/dag/details/taskInstance/Nav.tsx | 3 --- airflow/www/templates/airflow/dag.html | 1 - 2 files changed, 4 deletions(-) diff --git a/airflow/www/static/js/dag/details/taskInstance/Nav.tsx b/airflow/www/static/js/dag/details/taskInstance/Nav.tsx index 3541240ce3255..1e5d84d8ba48b 100644 --- a/airflow/www/static/js/dag/details/taskInstance/Nav.tsx +++ b/airflow/www/static/js/dag/details/taskInstance/Nav.tsx @@ -30,7 +30,6 @@ const isK8sExecutor = getMetaValue("k8s_or_k8scelery_executor") === "True"; const taskInstancesUrl = getMetaValue("task_instances_list_url"); const renderedK8sUrl = getMetaValue("rendered_k8s_url"); const renderedTemplatesUrl = getMetaValue("rendered_templates_url"); -const xcomUrl = getMetaValue("xcom_url"); const taskUrl = getMetaValue("task_url"); const gridUrl = getMetaValue("grid_url"); @@ -52,7 +51,6 @@ const Nav = forwardRef( }); const detailsLink = `${taskUrl}&${params}`; const renderedLink = `${renderedTemplatesUrl}&${params}`; - const xcomLink = `${xcomUrl}&${params}`; const k8sLink = `${renderedK8sUrl}&${params}`; const listParams = new URLSearchParamsWrapper({ _flt_3_dag_id: dagId, @@ -88,7 +86,6 @@ const Nav = forwardRef( {isSubDag && ( Zoom into SubDag )} - XCom )} - From b2b6fab43f47a5a1b25f79b0dc9d53bb0ecadeb2 Mon Sep 17 00:00:00 2001 From: Huy Duong Date: Sun, 26 Nov 2023 21:10:51 +0000 Subject: [PATCH 4/5] Use consistent naming to distinguish XcomCollection and XcomEntry --- airflow/www/static/js/api/index.ts | 4 ++-- airflow/www/static/js/api/useTaskXcom.ts | 2 +- airflow/www/static/js/dag/details/index.tsx | 4 ++-- .../js/dag/details/taskInstance/Xcom/XcomEntry.tsx | 4 ++-- .../static/js/dag/details/taskInstance/Xcom/index.tsx | 10 ++++++++-- 5 files changed, 15 insertions(+), 9 deletions(-) diff --git a/airflow/www/static/js/api/index.ts b/airflow/www/static/js/api/index.ts index f04a9bc887f6b..6369a819d26ef 100644 --- a/airflow/www/static/js/api/index.ts +++ b/airflow/www/static/js/api/index.ts @@ -48,7 +48,7 @@ import usePools from "./usePools"; import useDags from "./useDags"; import useDagRuns from "./useDagRuns"; import useHistoricalMetricsData from "./useHistoricalMetricsData"; -import { useTaskXcom, useTaskXcomCollection } from "./useTaskXcom"; +import { useTaskXcomEntry, useTaskXcomCollection } from "./useTaskXcom"; axios.interceptors.request.use((config) => { config.paramsSerializer = { @@ -92,6 +92,6 @@ export { useTaskInstance, useUpstreamDatasetEvents, useHistoricalMetricsData, - useTaskXcom, + useTaskXcomEntry, useTaskXcomCollection, }; diff --git a/airflow/www/static/js/api/useTaskXcom.ts b/airflow/www/static/js/api/useTaskXcom.ts index 3db62d8d3f1ef..1faa19005a906 100644 --- a/airflow/www/static/js/api/useTaskXcom.ts +++ b/airflow/www/static/js/api/useTaskXcom.ts @@ -47,7 +47,7 @@ export const useTaskXcomCollection = ({ ) ); -export const useTaskXcom = ({ +export const useTaskXcomEntry = ({ dagId, dagRunId, taskId, diff --git a/airflow/www/static/js/dag/details/index.tsx b/airflow/www/static/js/dag/details/index.tsx index ebc5a06baba1d..c9eb432463141 100644 --- a/airflow/www/static/js/dag/details/index.tsx +++ b/airflow/www/static/js/dag/details/index.tsx @@ -59,7 +59,7 @@ import ClearRun from "./dagRun/ClearRun"; import MarkRunAs from "./dagRun/MarkRunAs"; import ClearInstance from "./taskInstance/taskActions/ClearInstance"; import MarkInstanceAs from "./taskInstance/taskActions/MarkInstanceAs"; -import Xcom from "./taskInstance/Xcom"; +import XcomCollection from "./taskInstance/Xcom"; const dagId = getMetaValue("dag_id")!; @@ -364,7 +364,7 @@ const Details = ({ )} {showXcom && run && ( - { +const XcomCollection = ({ + dagId, + dagRunId, + taskId, + mapIndex, + tryNumber, +}: Props) => { const taskXcomRef = useRef(null); const offsetTop = useOffsetTop(taskXcomRef); @@ -107,4 +113,4 @@ const Xcom = ({ dagId, dagRunId, taskId, mapIndex, tryNumber }: Props) => { ); }; -export default Xcom; +export default XcomCollection; From fec1baedc812921bae3b621eb8dc9a9782f5817f Mon Sep 17 00:00:00 2001 From: Huy Duong Date: Sat, 2 Dec 2023 11:27:32 +0000 Subject: [PATCH 5/5] Refactor boolean vars --- airflow/www/static/js/dag/details/index.tsx | 52 +++++++++++---------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/airflow/www/static/js/dag/details/index.tsx b/airflow/www/static/js/dag/details/index.tsx index c9eb432463141..3c555c701e191 100644 --- a/airflow/www/static/js/dag/details/index.tsx +++ b/airflow/www/static/js/dag/details/index.tsx @@ -93,9 +93,8 @@ const tabToIndex = (tab?: string) => { const indexToTab = ( index: number, taskId: string | null, - showLogs: boolean, - showMappedTasks: boolean, - showXcom: boolean + isTaskInstance: boolean, + isMappedTaskSummary: boolean ) => { switch (index) { case 1: @@ -105,11 +104,11 @@ const indexToTab = ( case 3: return "code"; case 4: - if (showMappedTasks) return "mapped_tasks"; - if (showLogs) return "logs"; + if (isMappedTaskSummary) return "mapped_tasks"; + if (isTaskInstance) return "logs"; return undefined; case 5: - if (showXcom) return "xcom"; + if (isTaskInstance) return "xcom"; return undefined; case 0: default: @@ -132,7 +131,6 @@ const Details = ({ } = useSelection(); const isDag = !runId && !taskId; const isDagRun = runId && !taskId; - const isTaskInstance = taskId && runId; const { data: { dagRuns, groups }, @@ -140,16 +138,21 @@ const Details = ({ const group = getTask({ taskId, task: groups }); const children = group?.children; const isMapped = group?.isMapped; - - const isMappedTaskSummary = isMapped && mapIndex === undefined && taskId; const isGroup = !!children; - const isGroupOrMappedTaskSummary = isGroup || isMappedTaskSummary; - const isIndividualTaskInstance = !!( - isTaskInstance && !isGroupOrMappedTaskSummary + + const isMappedTaskSummary = !!( + taskId && + runId && + !isGroup && + isMapped && + mapIndex === undefined + ); + const isTaskInstance = !!( + taskId && + runId && + !isGroup && + !isMappedTaskSummary ); - const showLogs = isIndividualTaskInstance; - const showXcom = isIndividualTaskInstance; - const showMappedTasks = !!(isTaskInstance && isMappedTaskSummary && !isGroup); const [searchParams, setSearchParams] = useSearchParams(); const tab = searchParams.get(TAB_PARAM) || undefined; @@ -161,15 +164,14 @@ const Details = ({ const newTab = indexToTab( index, taskId, - showLogs, - showMappedTasks, - showXcom + isTaskInstance, + isMappedTaskSummary ); if (newTab) params.set(TAB_PARAM, newTab); else params.delete(TAB_PARAM); setSearchParams(params); }, - [setSearchParams, searchParams, showLogs, showMappedTasks, showXcom, taskId] + [setSearchParams, searchParams, isTaskInstance, isMappedTaskSummary, taskId] ); useEffect(() => { @@ -270,7 +272,7 @@ const Details = ({ Code - {showLogs && ( + {isTaskInstance && ( @@ -278,7 +280,7 @@ const Details = ({ )} - {showMappedTasks && ( + {isMappedTaskSummary && ( @@ -286,7 +288,7 @@ const Details = ({ )} - {showXcom && ( + {isTaskInstance && ( @@ -330,7 +332,7 @@ const Details = ({ - {showLogs && run && ( + {isTaskInstance && run && ( )} - {showMappedTasks && ( + {isMappedTaskSummary && ( )} - {showXcom && run && ( + {isTaskInstance && (