From a458ce68441cdab67fc2ede0fa102de6d82d98d2 Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Tue, 16 May 2023 15:27:52 -0700 Subject: [PATCH 1/7] works --- integration_tests/backend/test_reads.py | 11 +++ src/golang/cmd/server/handler/v2/dags_get.go | 94 ++++++++++++++++++++ src/golang/cmd/server/routes/routes.go | 1 + src/golang/cmd/server/server/handlers.go | 5 ++ src/ui/common/src/handlers/AqueductApi.ts | 6 ++ src/ui/common/src/handlers/v2/DagsGet.ts | 15 ++++ 6 files changed, 132 insertions(+) create mode 100644 src/golang/cmd/server/handler/v2/dags_get.go create mode 100644 src/ui/common/src/handlers/v2/DagsGet.ts diff --git a/integration_tests/backend/test_reads.py b/integration_tests/backend/test_reads.py index 6b7b22cf7..c1a9f6993 100644 --- a/integration_tests/backend/test_reads.py +++ b/integration_tests/backend/test_reads.py @@ -29,6 +29,7 @@ class TestBackend: # V2 GET_WORKFLOWS_TEMPLATE = "/api/v2/workflows" + GET_DAGS_TEMPLATE = "/api/v2/workflow/%s/dags" GET_DAG_RESULTS_TEMPLATE = "/api/v2/workflow/%s/results" GET_NODES_RESULTS_TEMPLATE = "/api/v2/workflow/%s/result/%s/nodes/results" @@ -347,6 +348,16 @@ def test_endpoint_workflows_get(self): assert key in v2_workflow assert v2_workflow["user_id"] == user_id + def test_endpoint_workflow_dags_get(self): + flow_id, _ = self.flows["flow_with_metrics_and_checks"] + resp = self.get_response(self.GET_DAGS_TEMPLATE % flow_id) + resp = resp.json() + + assert len(resp) == 2 + assert "id" in resp[0] + assert resp[0]["workflow_id"] == str(flow_id) + assert "created_at" in resp[0] + def test_endpoint_dag_results_get(self): flow_id, n_runs = self.flows["flow_with_metrics_and_checks"] resp = self.get_response(self.GET_DAG_RESULTS_TEMPLATE % flow_id).json() diff --git a/src/golang/cmd/server/handler/v2/dags_get.go b/src/golang/cmd/server/handler/v2/dags_get.go new file mode 100644 index 000000000..1121eb0d1 --- /dev/null +++ b/src/golang/cmd/server/handler/v2/dags_get.go @@ -0,0 +1,94 @@ +package v2 + +import ( + "context" + "net/http" + + "github.com/aqueducthq/aqueduct/cmd/server/handler" + "github.com/aqueducthq/aqueduct/cmd/server/request/parser" + aq_context "github.com/aqueducthq/aqueduct/lib/context" + "github.com/aqueducthq/aqueduct/lib/database" + "github.com/aqueducthq/aqueduct/lib/functional/slices" + "github.com/aqueducthq/aqueduct/lib/models" + "github.com/aqueducthq/aqueduct/lib/repos" + "github.com/aqueducthq/aqueduct/lib/response" + "github.com/dropbox/godropbox/errors" + "github.com/google/uuid" +) + +// This file should map directly to +// src/ui/common/src/handlers/v2/DagsGet.tsx +// +// Route: /v2/workflow/{workflowId}/dags +// Method: GET +// Params: +// `workflowId`: ID for `workflow` object +// Request: +// Headers: +// `api-key`: user's API Key +// +// Response: +// Body: +// serialized `[]response.DAGResult` + +type dagsGetArgs struct { + *aq_context.AqContext + workflowID uuid.UUID +} + +type DAGsGetHandler struct { + handler.GetHandler + + Database database.Database + + WorkflowRepo repos.Workflow + DAGRepo repos.DAG +} + +func (*DAGsGetHandler) Name() string { + return "DAGsGet" +} + +func (h *DAGsGetHandler) Prepare(r *http.Request) (interface{}, int, error) { + aqContext, statusCode, err := aq_context.ParseAqContext(r.Context()) + if err != nil { + return nil, statusCode, err + } + + workflowID, err := (parser.WorkflowIDParser{}).Parse(r) + if err != nil { + return nil, http.StatusBadRequest, err + } + + return &dagsGetArgs{ + AqContext: aqContext, + workflowID: workflowID, + }, http.StatusOK, nil +} + +func 
(h *DAGsGetHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) { + args := interfaceArgs.(*dagsGetArgs) + + ok, err := h.WorkflowRepo.ValidateOrg( + ctx, + args.workflowID, + args.OrgID, + h.Database, + ) + if err != nil { + return nil, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error during workflow ownership validation.") + } + + if !ok { + return nil, http.StatusBadRequest, errors.Wrap(err, "The organization does not own this workflow.") + } + + dbDAGs, err := h.DAGRepo.GetByWorkflow(ctx, args.workflowID, h.Database) + if err != nil { + return nil, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error reading dag results.") + } + + return slices.Map(dbDAGs, func(dbDAG models.DAG) response.DAG { + return *response.NewDAGFromDBObject(&dbDAG) + }), http.StatusOK, nil +} diff --git a/src/golang/cmd/server/routes/routes.go b/src/golang/cmd/server/routes/routes.go index c696c6e30..8824c2c69 100644 --- a/src/golang/cmd/server/routes/routes.go +++ b/src/golang/cmd/server/routes/routes.go @@ -9,6 +9,7 @@ const ( ListStorageMigrationRoute = "/api/v2/storage-migrations" WorkflowsRoute = "/api/v2/workflows" WorkflowRoute = "/api/v2/workflow/{workflowID}" + DAGsRoute = "/api/v2/workflow/{workflowID}/dags" DAGRoute = "/api/v2/workflow/{workflowID}/dag/{dagID}" DAGResultsRoute = "/api/v2/workflow/{workflowID}/results" DAGResultRoute = "/api/v2/workflow/{workflowID}/result/{dagResultID}" diff --git a/src/golang/cmd/server/server/handlers.go b/src/golang/cmd/server/server/handlers.go index 7a7234452..8c896480c 100644 --- a/src/golang/cmd/server/server/handlers.go +++ b/src/golang/cmd/server/server/handlers.go @@ -23,6 +23,11 @@ func (s *AqServer) Handlers() map[string]handler.Handler { WorkflowRepo: s.WorkflowRepo, OperatorRepo: s.OperatorRepo, }, + routes.DAGsRoute: &v2.DAGsGetHandler{ + Database: s.Database, + WorkflowRepo: s.WorkflowRepo, + DAGRepo: s.DAGRepo, + }, routes.DAGResultRoute: &v2.DAGResultGetHandler{ Database: s.Database, WorkflowRepo: s.WorkflowRepo, diff --git a/src/ui/common/src/handlers/AqueductApi.ts b/src/ui/common/src/handlers/AqueductApi.ts index 43c0f31b4..29c6518bf 100644 --- a/src/ui/common/src/handlers/AqueductApi.ts +++ b/src/ui/common/src/handlers/AqueductApi.ts @@ -83,6 +83,7 @@ import { WorkflowsGetRequest, WorkflowsGetResponse, } from './v2/WorkflowsGet'; +import { dagsGetQuery, DagsGetRequest, DagsGetResponse } from './v2/DagsGet'; const { createApi, fetchBaseQuery } = ((rtkQueryRaw as any).default ?? 
rtkQueryRaw) as typeof rtkQueryRaw;
@@ -106,6 +107,10 @@ export const aqueductApi = createApi({
       query: (req) => dagOperatorsGetQuery(req),
       transformErrorResponse,
     }),
+    dagsGet: builder.query<DagsGetResponse, DagsGetRequest>({
+      query: (req) => dagsGetQuery(req),
+      transformErrorResponse,
+    }),
     dagResultGet: builder.query<DagResultGetResponse, DagResultGetRequest>({
       query: (req) => dagResultGetQuery(req),
       transformErrorResponse,
     }),
@@ -201,6 +206,7 @@ export const aqueductApi = createApi({
 
 export const {
   useDagGetQuery,
+  useDagsGetQuery,
   useDagOperatorsGetQuery,
   useDagResultGetQuery,
   useDagResultsGetQuery,
diff --git a/src/ui/common/src/handlers/v2/DagsGet.ts b/src/ui/common/src/handlers/v2/DagsGet.ts
new file mode 100644
index 000000000..ab66c2d87
--- /dev/null
+++ b/src/ui/common/src/handlers/v2/DagsGet.ts
@@ -0,0 +1,15 @@
+// This file should map exactly to
+// src/golang/cmd/server/handler/v2/dags_get.go
+
+import { APIKeyParameter } from '../parameters/Header';
+import { WorkflowIdParameter } from '../parameters/Path';
+import { DagResponse } from '../responses/workflow';
+
+export type DagsGetRequest = APIKeyParameter & WorkflowIdParameter
+
+export type DagsGetResponse = DagResponse[];
+
+export const dagsGetQuery = (req: DagsGetRequest) => ({
+  url: `workflow/${req.workflowId}/dags`,
+  headers: { 'api-key': req.apiKey },
+});
\ No newline at end of file

From f81c0650a31e64e25580fbd891c33a6326df9823 Mon Sep 17 00:00:00 2001
From: Wei Chen
Date: Tue, 16 May 2023 17:34:45 -0700
Subject: [PATCH 2/7] lint

---
 src/ui/common/src/handlers/AqueductApi.ts | 2 +-
 src/ui/common/src/handlers/v2/DagsGet.ts  | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/ui/common/src/handlers/AqueductApi.ts b/src/ui/common/src/handlers/AqueductApi.ts
index 29c6518bf..090e2e9a2 100644
--- a/src/ui/common/src/handlers/AqueductApi.ts
+++ b/src/ui/common/src/handlers/AqueductApi.ts
@@ -18,6 +18,7 @@ import {
   DagResultsGetRequest,
   DagResultsGetResponse,
 } from './v2/DagResultsGet';
+import { dagsGetQuery, DagsGetRequest, DagsGetResponse } from './v2/DagsGet';
 import {
   integrationOperatorsGetQuery,
   IntegrationOperatorsGetRequest,
@@ -83,7 +84,6 @@ import {
   WorkflowsGetRequest,
   WorkflowsGetResponse,
 } from './v2/WorkflowsGet';
-import { dagsGetQuery, DagsGetRequest, DagsGetResponse } from './v2/DagsGet';
 
 const { createApi, fetchBaseQuery } = ((rtkQueryRaw as any).default ??
rtkQueryRaw) as typeof rtkQueryRaw; diff --git a/src/ui/common/src/handlers/v2/DagsGet.ts b/src/ui/common/src/handlers/v2/DagsGet.ts index ab66c2d87..6d6901fba 100644 --- a/src/ui/common/src/handlers/v2/DagsGet.ts +++ b/src/ui/common/src/handlers/v2/DagsGet.ts @@ -5,11 +5,11 @@ import { APIKeyParameter } from '../parameters/Header'; import { WorkflowIdParameter } from '../parameters/Path'; import { DagResponse } from '../responses/workflow'; -export type DagsGetRequest = APIKeyParameter & WorkflowIdParameter +export type DagsGetRequest = APIKeyParameter & WorkflowIdParameter; export type DagsGetResponse = DagResponse[]; export const dagsGetQuery = (req: DagsGetRequest) => ({ url: `workflow/${req.workflowId}/dags`, headers: { 'api-key': req.apiKey }, -}); \ No newline at end of file +}); From ec9437968827c4c68a7ea75f6e7657234bfc5f05 Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Wed, 17 May 2023 13:28:38 -0700 Subject: [PATCH 3/7] comments --- integration_tests/backend/test_reads.py | 10 +++++++--- sdk/aqueduct/models/response_models.py | 9 ++++++++- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/integration_tests/backend/test_reads.py b/integration_tests/backend/test_reads.py index c1a9f6993..5e5140d41 100644 --- a/integration_tests/backend/test_reads.py +++ b/integration_tests/backend/test_reads.py @@ -6,8 +6,10 @@ import pytest import requests import utils +from aqueduct.constants.enums import RuntimeType from aqueduct.models.response_models import ( GetArtifactResultResponse, + GetDagResponse, GetDagResultResponse, GetNodeArtifactResponse, GetNodeOperatorResponse, @@ -354,9 +356,11 @@ def test_endpoint_workflow_dags_get(self): resp = resp.json() assert len(resp) == 2 - assert "id" in resp[0] - assert resp[0]["workflow_id"] == str(flow_id) - assert "created_at" in resp[0] + for dag_dict in resp: + dag = GetDagResponse(**dag_dict) + assert dag.workflow_id == flow_id + assert dag.created_at != "" + assert dag.engine_config.type == RuntimeType.AQUEDUCT def test_endpoint_dag_results_get(self): flow_id, n_runs = self.flows["flow_with_metrics_and_checks"] diff --git a/sdk/aqueduct/models/response_models.py b/sdk/aqueduct/models/response_models.py index 0a5d6d464..36aff2143 100644 --- a/sdk/aqueduct/models/response_models.py +++ b/sdk/aqueduct/models/response_models.py @@ -9,7 +9,7 @@ SerializationType, ) from aqueduct.models.artifact import ArtifactMetadata -from aqueduct.models.dag import Metadata, RetentionPolicy, Schedule +from aqueduct.models.dag import EngineConfig, Metadata, RetentionPolicy, Schedule from aqueduct.models.execution_state import ExecutionState from aqueduct.models.operators import LoadSpec, Operator, OperatorSpec from aqueduct.models.utils import human_readable_timestamp @@ -23,6 +23,13 @@ class ArtifactResult(BaseModel): # V2 Responses +class GetDagResponse(BaseModel): + id: uuid.UUID + workflow_id: uuid.UUID + created_at: str + engine_config: EngineConfig + + class GetDagResultResponse(BaseModel): """Represents the result of a single workflow run. 
From ad3b740870be2c0a30b0b7aee24605a67cf3237b Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Thu, 18 May 2023 13:23:15 -0700 Subject: [PATCH 4/7] Implement RTK mutations for workflow trigger / edit endpoints (#1332) --- integration_tests/backend/test_reads.py | 243 ++++++++++-------- sdk/aqueduct/models/response_models.py | 5 + .../v2/node_artifact_result_content_get.go | 56 +--- .../v2/node_check_result_content_get.go | 6 +- .../v2/node_metric_result_content_get.go | 6 +- .../workflow_delete.go} | 43 ++-- .../workflow_objects_get.go} | 30 ++- .../workflow_patch.go} | 33 ++- .../workflow_post.go} | 27 +- src/golang/cmd/server/routes/routes.go | 11 + src/golang/cmd/server/server/cron.go | 6 +- src/golang/cmd/server/server/handlers.go | 32 ++- src/ui/common/src/handlers/AqueductApi.ts | 52 ++++ .../common/src/handlers/v2/NodeArtifactGet.ts | 2 +- .../src/handlers/v2/NodeArtifactResultsGet.ts | 2 +- .../src/handlers/v2/NodeOperatorContentGet.ts | 2 +- .../common/src/handlers/v2/NodeOperatorGet.ts | 2 +- src/ui/common/src/handlers/v2/NodesGet.ts | 2 +- .../common/src/handlers/v2/NodesResultsGet.ts | 2 +- .../src/handlers/v2/WorkflowDeletePost.ts | 26 ++ .../src/handlers/v2/WorkflowEditPost.ts | 34 +++ src/ui/common/src/handlers/v2/WorkflowGet.ts | 2 +- .../src/handlers/v2/WorkflowObjectsGet.ts | 16 ++ .../src/handlers/v2/WorkflowTriggerPost.ts | 29 +++ 24 files changed, 432 insertions(+), 237 deletions(-) rename src/golang/cmd/server/handler/{delete_workflow.go => v2/workflow_delete.go} (93%) rename src/golang/cmd/server/handler/{list_workflow_objects.go => v2/workflow_objects_get.go} (92%) rename src/golang/cmd/server/handler/{edit_workflow.go => v2/workflow_patch.go} (88%) rename src/golang/cmd/server/handler/{refresh_workflow.go => v2/workflow_post.go} (81%) create mode 100644 src/ui/common/src/handlers/v2/WorkflowDeletePost.ts create mode 100644 src/ui/common/src/handlers/v2/WorkflowEditPost.ts create mode 100644 src/ui/common/src/handlers/v2/WorkflowObjectsGet.ts create mode 100644 src/ui/common/src/handlers/v2/WorkflowTriggerPost.ts diff --git a/integration_tests/backend/test_reads.py b/integration_tests/backend/test_reads.py index 0744efad3..e912f95a3 100644 --- a/integration_tests/backend/test_reads.py +++ b/integration_tests/backend/test_reads.py @@ -13,6 +13,7 @@ GetDagResultResponse, GetNodeArtifactResponse, GetNodeOperatorResponse, + GetNodeResultContentResponse, GetOperatorResultResponse, GetOperatorWithArtifactNodeResponse, ) @@ -511,29 +512,38 @@ def test_endpoint_node_artifact_get(self): assert sum(all_output_counts) == len(all_output_counts) - 1 assert set(all_output_counts) == set([0, 1]) - # TODO: ENG-2943 Investigate output - # >> {"error":"Unexpected error reading DAG.\nQuery returned no rows."} - # def test_endpoint_node_artifact_result_content_get(self): - # flow_id, n_runs = self.flows["flow_with_multiple_operators"] - # flow = self.client.flow(flow_id) - # workflow_resp = flow._get_workflow_resp() - # dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id - # dag_result_id = workflow_resp.workflow_dag_results[0].id - - # dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result( - # flow_id, - # dag_result_id, - # ) - # artifact_ids = list(dag_result_resp.artifacts.keys()) - # artifact_id = str(artifact_ids[0]) - - # resp = self.get_response(self.GET_NODE_ARTIFACT_RESULTS_TEMPLATE % (flow_id, dag_id, artifact_id)).json() - # downstream_ids = [GetArtifactResultResponse(**result).id for result in resp] - # for downstream_id in downstream_ids: - # 
artifact_result_id = str(downstream_id) - # resp = self.get_response(self.GET_NODE_ARTIFACT_RESULT_CONTENT_TEMPLATE % (flow_id, dag_id, artifact_id, artifact_result_id)).json() - # # One of these should be successful (direct descendent of operator) - # print(resp) + def test_endpoint_node_artifact_result_content_get(self): + for flow_id, _ in [ + self.flows["flow_with_metrics_and_checks"], + self.flows["flow_with_multiple_operators"], + ]: + flow = self.client.flow(flow_id) + workflow_resp = flow._get_workflow_resp() + dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id + dag_result_id = workflow_resp.workflow_dag_results[0].id + + dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result( + flow_id, + dag_result_id, + ) + artifact_ids = list(dag_result_resp.artifacts.keys()) + artifact_id = str(artifact_ids[0]) + + resp = self.get_response( + self.GET_NODE_ARTIFACT_RESULTS_TEMPLATE % (flow_id, dag_id, artifact_id) + ).json() + downstream_ids = [GetArtifactResultResponse(**result).id for result in resp] + for downstream_id in downstream_ids: + artifact_result_id = str(downstream_id) + resp = self.get_response( + self.GET_NODE_ARTIFACT_RESULT_CONTENT_TEMPLATE + % (flow_id, dag_id, artifact_id, artifact_result_id) + ) + assert resp.ok + resp_obj = GetNodeResultContentResponse(**resp.json()) + # One of these should be successful (direct descendent of operator) + assert not resp_obj.is_downsampled + assert len(resp_obj.content) > 0 def test_endpoint_node_artifact_results_get(self): for flow_id, _ in [ @@ -582,24 +592,25 @@ def test_endpoint_node_operator_get(self): assert str(result.id) == operator_id assert result.dag_id == dag_id - # TODO: ENG-2943 Investigate output - # >> {"error":"Unexpected error reading DAG.\nQuery returned no rows."} - # def test_endpoint_node_operator_content_get(self): - # flow_id, n_runs = self.flows["flow_with_multiple_operators"] - # flow = self.client.flow(flow_id) - # workflow_resp = flow._get_workflow_resp() - # dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id - # dag_result_id = workflow_resp.workflow_dag_results[0].id - - # dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result( - # flow_id, - # dag_result_id, - # ) - # operator_ids = list(dag_result_resp.operators.keys()) - # operator_id = str(operator_ids[0]) - - # resp = self.get_response(self.GET_NODE_OPERATOR_CONTENT_TEMPLATE % (flow_id, dag_id, operator_id)) - # print(resp.text) + def test_endpoint_node_operator_content_get(self): + flow_id, _ = self.flows["flow_with_multiple_operators"] + flow = self.client.flow(flow_id) + workflow_resp = flow._get_workflow_resp() + dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id + dag_result_id = workflow_resp.workflow_dag_results[0].id + + dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result( + flow_id, + dag_result_id, + ) + operator_ids = list(dag_result_resp.operators.keys()) + operator_id = str(operator_ids[0]) + + resp = self.get_response( + self.GET_NODE_OPERATOR_CONTENT_TEMPLATE % (flow_id, dag_id, operator_id) + ) + # The response is a form data. For now, we simply check the response's code. 
+ assert resp.ok def test_endpoint_node_metric_get(self): flow_id, _ = self.flows["flow_with_metrics_and_checks"] @@ -628,37 +639,50 @@ def test_endpoint_node_metric_get(self): assert len(result.inputs) == 1 assert len(result.outputs) == 1 - # TODO: ENG-2943 Investigate output - # >> {"error":"Unexpected error occurred when retrieving workflow dag.\nQuery returned no rows."} - # def test_endpoint_node_metric_result_content_get(self): - # flow_id, _ = self.flows["flow_with_metrics_and_checks"] - # flow = self.client.flow(flow_id) - # workflow_resp = flow._get_workflow_resp() - # dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id - # dag_result_id = workflow_resp.workflow_dag_results[0].id - - # dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result( - # flow_id, - # dag_result_id, - # ) - # operator_ids = [id for id in dag_result_resp.operators.keys() if dag_result_resp.operators[id].spec.metric] - # operator_id = str(operator_ids[0]) - - # resp = self.get_response( - # self.GET_NODE_METRIC_TEMPLATE % (flow_id, dag_id, operator_id) - # ).json() - - # result = GetOperatorWithArtifactNodeResponse(**resp) - - # artifact_id = result.artifact_id - - # resp = self.get_response(self.LIST_ARTIFACT_RESULTS_TEMPLATE % (flow_id, artifact_id)).json() - # results = resp["results"] - # # One of these should be correct for the DAG run and can get result content. - # for artifact_result in results: - # resp = self.get_response( - # self.GET_NODE_METRIC_RESULT_CONTENT_TEMPLATE % (flow_id, dag_id, operator_id, artifact_result["id"]) - # ).json() + def test_endpoint_node_metric_result_content_get(self): + for flow_id, _ in [ + self.flows["flow_with_metrics_and_checks"], + self.flows["flow_with_multiple_operators"], + ]: + flow = self.client.flow(flow_id) + workflow_resp = flow._get_workflow_resp() + dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id + dag_result_id = workflow_resp.workflow_dag_results[0].id + + dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result( + flow_id, + dag_result_id, + ) + operator_ids = [ + id + for id in dag_result_resp.operators.keys() + if dag_result_resp.operators[id].spec.metric + ] + operator_id = str(operator_ids[0]) + + resp = self.get_response( + self.GET_NODE_METRIC_TEMPLATE % (flow_id, dag_id, operator_id) + ).json() + + result = GetOperatorWithArtifactNodeResponse(**resp) + + artifact_id = result.artifact_id + + resp = self.get_response( + self.LIST_ARTIFACT_RESULTS_TEMPLATE % (flow_id, artifact_id) + ).json() + results = resp["results"] + # One of these should be correct for the DAG run and can get result content. 
+ for artifact_result in results: + resp = self.get_response( + self.GET_NODE_METRIC_RESULT_CONTENT_TEMPLATE + % (flow_id, dag_id, operator_id, artifact_result["id"]) + ) + assert resp.ok + resp_obj = GetNodeResultContentResponse(**resp.json()) + # One of these should be successful (direct descendent of operator) + assert not resp_obj.is_downsampled + assert len(resp_obj.content) > 0 def test_endpoint_node_check_get(self): flow_id, _ = self.flows["flow_with_metrics_and_checks"] @@ -687,34 +711,47 @@ def test_endpoint_node_check_get(self): assert len(result.inputs) == 1 assert len(result.outputs) == 0 - # TODO: ENG-2943 Investigate output - # >> {"error":"Unexpected error occurred when retrieving workflow dag.\nQuery returned no rows."} - # def test_endpoint_node_check_result_content_get(self): - # flow_id, _ = self.flows["flow_with_metrics_and_checks"] - # flow = self.client.flow(flow_id) - # workflow_resp = flow._get_workflow_resp() - # dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id - # dag_result_id = workflow_resp.workflow_dag_results[0].id - - # dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result( - # flow_id, - # dag_result_id, - # ) - # operator_ids = [id for id in dag_result_resp.operators.keys() if dag_result_resp.operators[id].spec.check] - # operator_id = str(operator_ids[0]) - - # resp = self.get_response( - # self.GET_NODE_CHECK_TEMPLATE % (flow_id, dag_id, operator_id) - # ).json() - - # result = GetOperatorWithArtifactNodeResponse(**resp) - - # artifact_id = result.artifact_id - - # resp = self.get_response(self.LIST_ARTIFACT_RESULTS_TEMPLATE % (flow_id, artifact_id)).json() - # results = resp["results"] - # # One of these should be correct for the DAG run and can get result content. - # for artifact_result in results: - # resp = self.get_response( - # self.GET_NODE_CHECK_RESULT_CONTENT_TEMPLATE % (flow_id, dag_id, operator_id, artifact_result["id"]) - # ).json() + def test_endpoint_node_check_result_content_get(self): + for flow_id, _ in [ + self.flows["flow_with_metrics_and_checks"], + self.flows["flow_with_multiple_operators"], + ]: + flow = self.client.flow(flow_id) + workflow_resp = flow._get_workflow_resp() + dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id + dag_result_id = workflow_resp.workflow_dag_results[0].id + + dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result( + flow_id, + dag_result_id, + ) + operator_ids = [ + id + for id in dag_result_resp.operators.keys() + if dag_result_resp.operators[id].spec.check + ] + operator_id = str(operator_ids[0]) + + resp = self.get_response( + self.GET_NODE_CHECK_TEMPLATE % (flow_id, dag_id, operator_id) + ).json() + + result = GetOperatorWithArtifactNodeResponse(**resp) + + artifact_id = result.artifact_id + + resp = self.get_response( + self.LIST_ARTIFACT_RESULTS_TEMPLATE % (flow_id, artifact_id) + ).json() + results = resp["results"] + # One of these should be correct for the DAG run and can get result content. 
+ for artifact_result in results: + resp = self.get_response( + self.GET_NODE_CHECK_RESULT_CONTENT_TEMPLATE + % (flow_id, dag_id, operator_id, artifact_result["id"]) + ) + assert resp.ok + resp_obj = GetNodeResultContentResponse(**resp.json()) + # One of these should be successful (direct descendent of operator) + assert not resp_obj.is_downsampled + assert len(resp_obj.content) > 0 diff --git a/sdk/aqueduct/models/response_models.py b/sdk/aqueduct/models/response_models.py index 12ea7d87d..46cc8e3d8 100644 --- a/sdk/aqueduct/models/response_models.py +++ b/sdk/aqueduct/models/response_models.py @@ -30,6 +30,11 @@ class GetDagResponse(BaseModel): engine_config: EngineConfig +class GetNodeResultContentResponse(BaseModel): + is_downsampled: bool + content: str + + class GetDagResultResponse(BaseModel): """Represents the result of a single workflow run. diff --git a/src/golang/cmd/server/handler/v2/node_artifact_result_content_get.go b/src/golang/cmd/server/handler/v2/node_artifact_result_content_get.go index e8c72bc06..c65e859bf 100644 --- a/src/golang/cmd/server/handler/v2/node_artifact_result_content_get.go +++ b/src/golang/cmd/server/handler/v2/node_artifact_result_content_get.go @@ -2,7 +2,6 @@ package v2 import ( "context" - "mime/multipart" "net/http" "github.com/aqueducthq/aqueduct/cmd/server/handler" @@ -17,11 +16,6 @@ import ( "github.com/google/uuid" ) -const ( - formIsDownsampledField = "is_downsampled" - formContentField = "content" -) - // This file should map directly to // src/ui/common/src/handlers/v2/NodeArtifactResultContentGet.tsx // @@ -106,9 +100,9 @@ func (h *NodeArtifactResultContentGetHandler) Perform(ctx context.Context, inter args := interfaceArgs.(*nodeResultGetArgs) emptyResp := &nodeResultGetResponse{} - dag, err := h.DAGRepo.GetByDAGResult( + dag, err := h.DAGRepo.Get( ctx, - args.nodeResultID, + args.dagID, h.Database, ) if err != nil { @@ -160,54 +154,8 @@ func (h *NodeArtifactResultContentGetHandler) Perform(ctx context.Context, inter return emptyResp, http.StatusOK, nil } - return emptyResp, http.StatusInternalServerError, errors.Wrap(err, "Failed to retrieve data for the artifact result.") - } else if !errors.Is(err, storage.ErrObjectDoesNotExist()) { return emptyResp, http.StatusInternalServerError, errors.Wrap(err, "Failed to retrieve data for the artifact result.") } return &nodeResultGetResponse{IsDownsampled: isDownsampled, Content: data}, http.StatusOK, nil } - -// This custom implementation of SendResponse constructs a multipart form response with two fields: -// 1: "metadata" contains a json serialized blob of artifact result metadata. -// 2: "data" contains the artifact result data blob generated the serialization method -// specified in the metadata field. -func (*NodeArtifactResultContentGetHandler) SendResponse(w http.ResponseWriter, interfaceResp interface{}) { - resp := interfaceResp.(*nodeResultGetResponse) - multipartWriter := multipart.NewWriter(w) - defer multipartWriter.Close() - - w.Header().Set("Content-Type", multipartWriter.FormDataContentType()) - - // The second argument is the file name, which is redundant but required by the UI to parse the file correctly. 
- formFieldWriter, err := multipartWriter.CreateFormFile(formIsDownsampledField, formIsDownsampledField) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - if resp.IsDownsampled { - _, err = formFieldWriter.Write([]byte{1}) - } else { - _, err = formFieldWriter.Write([]byte{0}) - } - - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - if len(resp.Content) > 0 { - formFieldWriter, err = multipartWriter.CreateFormFile(formContentField, formContentField) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - _, err = formFieldWriter.Write(resp.Content) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - } -} diff --git a/src/golang/cmd/server/handler/v2/node_check_result_content_get.go b/src/golang/cmd/server/handler/v2/node_check_result_content_get.go index 2879f60bc..764aea447 100644 --- a/src/golang/cmd/server/handler/v2/node_check_result_content_get.go +++ b/src/golang/cmd/server/handler/v2/node_check_result_content_get.go @@ -88,9 +88,9 @@ func (h *NodeCheckResultContentGetHandler) Perform(ctx context.Context, interfac args := interfaceArgs.(*nodeResultGetArgs) emptyResp := &nodeResultGetResponse{} - dag, err := h.DAGRepo.GetByDAGResult( + dag, err := h.DAGRepo.Get( ctx, - args.nodeResultID, + args.dagID, h.Database, ) if err != nil { @@ -148,8 +148,6 @@ func (h *NodeCheckResultContentGetHandler) Perform(ctx context.Context, interfac return emptyResp, http.StatusOK, nil } - return emptyResp, http.StatusInternalServerError, errors.Wrap(err, "Failed to retrieve data for the artifact result.") - } else if !errors.Is(err, storage.ErrObjectDoesNotExist()) { return emptyResp, http.StatusInternalServerError, errors.Wrap(err, "Failed to retrieve data for the artifact result.") } diff --git a/src/golang/cmd/server/handler/v2/node_metric_result_content_get.go b/src/golang/cmd/server/handler/v2/node_metric_result_content_get.go index 7f7c9601d..f877d7111 100644 --- a/src/golang/cmd/server/handler/v2/node_metric_result_content_get.go +++ b/src/golang/cmd/server/handler/v2/node_metric_result_content_get.go @@ -88,9 +88,9 @@ func (h *NodeMetricResultContentGetHandler) Perform(ctx context.Context, interfa args := interfaceArgs.(*nodeResultGetArgs) emptyResp := &nodeResultGetResponse{} - dag, err := h.DAGRepo.GetByDAGResult( + dag, err := h.DAGRepo.Get( ctx, - args.nodeResultID, + args.dagID, h.Database, ) if err != nil { @@ -148,8 +148,6 @@ func (h *NodeMetricResultContentGetHandler) Perform(ctx context.Context, interfa return emptyResp, http.StatusOK, nil } - return emptyResp, http.StatusInternalServerError, errors.Wrap(err, "Failed to retrieve data for the artifact result.") - } else if !errors.Is(err, storage.ErrObjectDoesNotExist()) { return emptyResp, http.StatusInternalServerError, errors.Wrap(err, "Failed to retrieve data for the artifact result.") } diff --git a/src/golang/cmd/server/handler/delete_workflow.go b/src/golang/cmd/server/handler/v2/workflow_delete.go similarity index 93% rename from src/golang/cmd/server/handler/delete_workflow.go rename to src/golang/cmd/server/handler/v2/workflow_delete.go index da71a13ae..28d805cf8 100644 --- a/src/golang/cmd/server/handler/delete_workflow.go +++ b/src/golang/cmd/server/handler/v2/workflow_delete.go @@ -1,4 +1,4 @@ -package handler +package v2 import ( "context" @@ -7,6 +7,7 @@ import ( "net/http" "time" + "github.com/aqueducthq/aqueduct/cmd/server/handler" 
"github.com/aqueducthq/aqueduct/cmd/server/routes" "github.com/aqueducthq/aqueduct/config" aq_context "github.com/aqueducthq/aqueduct/lib/context" @@ -37,7 +38,11 @@ type SavedObjectResult struct { Result shared.ExecutionState `json:"exec_state"` } -// Route: /workflow/{workflowId}/delete +// Route: +// +// v2/workflow/{workflowId}/delete +// workflow/{workflowId}/delete +// // Method: POST // Params: workflowId // Request: @@ -45,20 +50,20 @@ type SavedObjectResult struct { // Headers: // `api-key`: user's API Key // Body: -// json-serialized `deleteWorkflowInput` object. +// json-serialized `workflowDeleteInput` object. // -// Response: json-serialized `deleteWorkflowResponse` object. +// Response: json-serialized `workflowDeleteResponse` object. // -// The `DeleteWorkflowHandler` does a best effort at deleting a workflow and its dependencies, such as +// The `WorkflowDeleteHandler` does a best effort at deleting a workflow and its dependencies, such as // k8s resources, Postgres state, and output objects in the user's data warehouse. -type deleteWorkflowArgs struct { +type workflowDeleteArgs struct { *aq_context.AqContext WorkflowID uuid.UUID ExternalDelete map[string][]string Force bool } -type deleteWorkflowInput struct { +type workflowDeleteInput struct { // This is a map from integration_id to the serialized load spec we want to delete. ExternalDeleteLoadParams map[string][]string `json:"external_delete"` // `Force` serve as a safe-guard for client to confirm the deletion. @@ -67,14 +72,14 @@ type deleteWorkflowInput struct { Force bool `json:"force"` } -type deleteWorkflowResponse struct { +type workflowDeleteResponse struct { // This is a map from integration_id to a list of `SavedObjectResult` // implying if each object is successfully deleted. 
SavedObjectDeletionResults map[string][]SavedObjectResult `json:"saved_object_deletion_results"` } -type DeleteWorkflowHandler struct { - PostHandler +type WorkflowDeleteHandler struct { + handler.PostHandler Database database.Database Engine engine.Engine @@ -88,11 +93,11 @@ type DeleteWorkflowHandler struct { ArtifactResultRepo repos.ArtifactResult } -func (*DeleteWorkflowHandler) Name() string { - return "DeleteWorkflow" +func (*WorkflowDeleteHandler) Name() string { + return "WorkflowDelete" } -func (h *DeleteWorkflowHandler) Prepare(r *http.Request) (interface{}, int, error) { +func (h *WorkflowDeleteHandler) Prepare(r *http.Request) (interface{}, int, error) { aqContext, statuscode, err := aq_context.ParseAqContext(r.Context()) if err != nil { return nil, statuscode, err @@ -117,7 +122,7 @@ func (h *DeleteWorkflowHandler) Prepare(r *http.Request) (interface{}, int, erro return nil, http.StatusBadRequest, errors.New("The organization does not own this workflow.") } - var input deleteWorkflowInput + var input workflowDeleteInput err = json.NewDecoder(r.Body).Decode(&input) if err != nil { return nil, http.StatusBadRequest, errors.Wrap(err, "Unable to parse JSON input.") @@ -143,7 +148,7 @@ func (h *DeleteWorkflowHandler) Prepare(r *http.Request) (interface{}, int, erro } } - return &deleteWorkflowArgs{ + return &workflowDeleteArgs{ AqContext: aqContext, WorkflowID: workflowID, ExternalDelete: externalDelete, @@ -151,10 +156,10 @@ func (h *DeleteWorkflowHandler) Prepare(r *http.Request) (interface{}, int, erro }, http.StatusOK, nil } -func (h *DeleteWorkflowHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) { - args := interfaceArgs.(*deleteWorkflowArgs) +func (h *WorkflowDeleteHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) { + args := interfaceArgs.(*workflowDeleteArgs) - resp := deleteWorkflowResponse{} + resp := workflowDeleteResponse{} resp.SavedObjectDeletionResults = map[string][]SavedObjectResult{} nameToID := make(map[string]uuid.UUID, len(args.ExternalDelete)) @@ -285,7 +290,7 @@ func (h *DeleteWorkflowHandler) Perform(ctx context.Context, interfaceArgs inter func DeleteSavedObject( ctx context.Context, - args *deleteWorkflowArgs, + args *workflowDeleteArgs, integrationNameToID map[string]uuid.UUID, vaultObject vault.Vault, storageConfig *shared.StorageConfig, diff --git a/src/golang/cmd/server/handler/list_workflow_objects.go b/src/golang/cmd/server/handler/v2/workflow_objects_get.go similarity index 92% rename from src/golang/cmd/server/handler/list_workflow_objects.go rename to src/golang/cmd/server/handler/v2/workflow_objects_get.go index 23dfa5e21..af5e017a6 100644 --- a/src/golang/cmd/server/handler/list_workflow_objects.go +++ b/src/golang/cmd/server/handler/v2/workflow_objects_get.go @@ -1,9 +1,10 @@ -package handler +package v2 import ( "context" "net/http" + "github.com/aqueducthq/aqueduct/cmd/server/handler" "github.com/aqueducthq/aqueduct/cmd/server/routes" aq_context "github.com/aqueducthq/aqueduct/lib/context" "github.com/aqueducthq/aqueduct/lib/database" @@ -20,7 +21,8 @@ import ( log "github.com/sirupsen/logrus" ) -// Route: /workflow/{workflowId}/objects +// Route: +// v2/workflow/{workflowId}/objects // Method: GET // Params: // `workflowId`: ID for `workflow` object @@ -31,17 +33,17 @@ import ( // Body: // all objects written by `workflowId` -type ListWorkflowObjectsArgs struct { +type WorkflowObjectsGetArgs struct { *aq_context.AqContext workflowId uuid.UUID } -type 
ListWorkflowObjectsResponse struct { +type WorkflowObjectsGetResponse struct { LoadDetails []views.LoadOperator `json:"object_details"` } -type ListWorkflowObjectsHandler struct { - GetHandler +type WorkflowObjectsGetHandler struct { + handler.GetHandler Database database.Database @@ -51,11 +53,11 @@ type ListWorkflowObjectsHandler struct { ArtifactResultRepo repos.ArtifactResult } -func (*ListWorkflowObjectsHandler) Name() string { - return "ListWorkflowObjects" +func (*WorkflowObjectsGetHandler) Name() string { + return "WorkflowObjectsGet" } -func (h *ListWorkflowObjectsHandler) Prepare(r *http.Request) (interface{}, int, error) { +func (h *WorkflowObjectsGetHandler) Prepare(r *http.Request) (interface{}, int, error) { aqContext, statusCode, err := aq_context.ParseAqContext(r.Context()) if err != nil { return nil, statusCode, err @@ -80,15 +82,15 @@ func (h *ListWorkflowObjectsHandler) Prepare(r *http.Request) (interface{}, int, return nil, http.StatusBadRequest, errors.Wrap(err, "The organization does not own this workflow.") } - return &ListWorkflowObjectsArgs{ + return &WorkflowObjectsGetArgs{ AqContext: aqContext, workflowId: workflowID, }, http.StatusOK, nil } -func (h *ListWorkflowObjectsHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) { - args := interfaceArgs.(*ListWorkflowObjectsArgs) - emptyResp := ListWorkflowObjectsResponse{} +func (h *WorkflowObjectsGetHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) { + args := interfaceArgs.(*WorkflowObjectsGetArgs) + emptyResp := WorkflowObjectsGetResponse{} saveOpList, err := GetDistinctLoadOpsByWorkflow( ctx, @@ -102,7 +104,7 @@ func (h *ListWorkflowObjectsHandler) Perform(ctx context.Context, interfaceArgs return emptyResp, http.StatusInternalServerError, err } - return ListWorkflowObjectsResponse{ + return WorkflowObjectsGetResponse{ LoadDetails: saveOpList, }, http.StatusOK, nil } diff --git a/src/golang/cmd/server/handler/edit_workflow.go b/src/golang/cmd/server/handler/v2/workflow_patch.go similarity index 88% rename from src/golang/cmd/server/handler/edit_workflow.go rename to src/golang/cmd/server/handler/v2/workflow_patch.go index 7b3714d43..33fd7ab85 100644 --- a/src/golang/cmd/server/handler/edit_workflow.go +++ b/src/golang/cmd/server/handler/v2/workflow_patch.go @@ -1,10 +1,11 @@ -package handler +package v2 import ( "context" "encoding/json" "net/http" + "github.com/aqueducthq/aqueduct/cmd/server/handler" "github.com/aqueducthq/aqueduct/cmd/server/routes" aq_context "github.com/aqueducthq/aqueduct/lib/context" "github.com/aqueducthq/aqueduct/lib/database" @@ -18,7 +19,11 @@ import ( "github.com/google/uuid" ) -// Route: /workflow/{workflowId}/edit +// Route: +// +// /workflow/{workflowId}/edit +// v2/workflow/{workflowId}/edit +// // Method: POST // Params: workflowId // Request: @@ -26,11 +31,11 @@ import ( // Headers: // `api-key`: user's API Key // Body: -// serialized `editWorkflowInput` object. +// serialized `workflowPatchInput` object. 
// // Response: none -type EditWorkflowHandler struct { - PostHandler +type WorkflowPatchHandler struct { + handler.PostHandler Database database.Database Engine engine.Engine @@ -42,7 +47,7 @@ type EditWorkflowHandler struct { WorkflowRepo repos.Workflow } -type editWorkflowInput struct { +type workflowPatchInput struct { WorkflowName string `json:"name"` WorkflowDescription string `json:"description"` Schedule *shared.Schedule `json:"schedule"` @@ -50,7 +55,7 @@ type editWorkflowInput struct { NotificationSettings *shared.NotificationSettings `json:"notification_settings"` } -type editWorkflowArgs struct { +type workflowPatchArgs struct { workflowId uuid.UUID workflowName string workflowDescription string @@ -59,11 +64,11 @@ type editWorkflowArgs struct { notificationSettings *shared.NotificationSettings } -func (*EditWorkflowHandler) Name() string { - return "EditWorkflow" +func (*WorkflowPatchHandler) Name() string { + return "WorkflowPatch" } -func (h *EditWorkflowHandler) Prepare(r *http.Request) (interface{}, int, error) { +func (h *WorkflowPatchHandler) Prepare(r *http.Request) (interface{}, int, error) { aqContext, statusCode, err := aq_context.ParseAqContext(r.Context()) if err != nil { return nil, statusCode, err @@ -92,7 +97,7 @@ func (h *EditWorkflowHandler) Prepare(r *http.Request) (interface{}, int, error) return nil, http.StatusBadRequest, errors.Wrap(err, "The organization does not own this workflow.") } - var input editWorkflowInput + var input workflowPatchInput err = json.NewDecoder(r.Body).Decode(&input) if err != nil { return nil, http.StatusBadRequest, errors.New("Unable to parse JSON input.") @@ -117,7 +122,7 @@ func (h *EditWorkflowHandler) Prepare(r *http.Request) (interface{}, int, error) return nil, http.StatusBadRequest, errors.New("Edit request issued without any updates specified.") } - return &editWorkflowArgs{ + return &workflowPatchArgs{ workflowId: workflowID, workflowName: input.WorkflowName, workflowDescription: input.WorkflowDescription, @@ -127,8 +132,8 @@ func (h *EditWorkflowHandler) Prepare(r *http.Request) (interface{}, int, error) }, http.StatusOK, nil } -func (h *EditWorkflowHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) { - args := interfaceArgs.(*editWorkflowArgs) +func (h *WorkflowPatchHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) { + args := interfaceArgs.(*workflowPatchArgs) txn, err := h.Database.BeginTx(ctx) if err != nil { return nil, http.StatusInternalServerError, errors.Wrap(err, "Unable to update workflow.") diff --git a/src/golang/cmd/server/handler/refresh_workflow.go b/src/golang/cmd/server/handler/v2/workflow_post.go similarity index 81% rename from src/golang/cmd/server/handler/refresh_workflow.go rename to src/golang/cmd/server/handler/v2/workflow_post.go index c3a7e7029..2d6c8f9fe 100644 --- a/src/golang/cmd/server/handler/refresh_workflow.go +++ b/src/golang/cmd/server/handler/v2/workflow_post.go @@ -1,9 +1,10 @@ -package handler +package v2 import ( "context" "net/http" + "github.com/aqueducthq/aqueduct/cmd/server/handler" "github.com/aqueducthq/aqueduct/cmd/server/request" "github.com/aqueducthq/aqueduct/cmd/server/routes" aq_context "github.com/aqueducthq/aqueduct/lib/context" @@ -17,12 +18,16 @@ import ( "github.com/google/uuid" ) -type RefreshWorkflowArgs struct { +type WorkflowPostArgs struct { WorkflowId uuid.UUID Parameters map[string]param.Param } -// Route: /workflow/{workflowId}/refresh +// Route: +// +// 
workflow/{workflowId}/refresh +// v2/workflow/{workflowId}/refresh +// // Method: POST // Params: workflowId // Request: @@ -34,8 +39,8 @@ type RefreshWorkflowArgs struct { // // Refresh workflow creates a new workflow version by // triggering running a workflow run. -type RefreshWorkflowHandler struct { - PostHandler +type WorkflowPostHandler struct { + handler.PostHandler Database database.Database Engine engine.Engine @@ -43,11 +48,11 @@ type RefreshWorkflowHandler struct { WorkflowRepo repos.Workflow } -func (*RefreshWorkflowHandler) Name() string { - return "RefreshWorkflow" +func (*WorkflowPostHandler) Name() string { + return "WorkflowPost" } -func (h *RefreshWorkflowHandler) Prepare(r *http.Request) (interface{}, int, error) { +func (h *WorkflowPostHandler) Prepare(r *http.Request) (interface{}, int, error) { aqContext, statusCode, err := aq_context.ParseAqContext(r.Context()) if err != nil { return nil, statusCode, err @@ -81,14 +86,14 @@ func (h *RefreshWorkflowHandler) Prepare(r *http.Request) (interface{}, int, err return nil, http.StatusBadRequest, errors.Wrap(err, "The user-defined parameters could not be extracted in current format.") } - return &RefreshWorkflowArgs{ + return &WorkflowPostArgs{ WorkflowId: workflowID, Parameters: parameters, }, http.StatusOK, nil } -func (h *RefreshWorkflowHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) { - args := interfaceArgs.(*RefreshWorkflowArgs) +func (h *WorkflowPostHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) { + args := interfaceArgs.(*WorkflowPostArgs) emptyResp := struct{}{} diff --git a/src/golang/cmd/server/routes/routes.go b/src/golang/cmd/server/routes/routes.go index ba754d0d2..73f097e18 100644 --- a/src/golang/cmd/server/routes/routes.go +++ b/src/golang/cmd/server/routes/routes.go @@ -9,6 +9,7 @@ const ( ListStorageMigrationRoute = "/api/v2/storage-migrations" WorkflowsRoute = "/api/v2/workflows" WorkflowRoute = "/api/v2/workflow/{workflowID}" + WorkflowObjectsRoute = "/api/v2/workflow/{workflowID}/objects" DAGsRoute = "/api/v2/workflow/{workflowID}/dags" DAGRoute = "/api/v2/workflow/{workflowID}/dag/{dagID}" DAGResultsRoute = "/api/v2/workflow/{workflowID}/results" @@ -26,6 +27,16 @@ const ( NodeOperatorContentRoute = "/api/v2/workflow/{workflowID}/dag/{dagID}/node/operator/{nodeID}/content" NodesResultsRoute = "/api/v2/workflow/{workflowID}/result/{dagResultID}/nodes/results" + // V2 hacky routes + // These routes are supposed to be `v2/workflow/{workflowId}` + // with PATCH (edit) / POST (trigger) / DELETE method. + // However, it requires significant refactor of handler interfaces as we assumed + // routes is unique per handler. + // For now, we simply use the same handler for this route and v1 workflow edit route. 
+ WorkflowEditPostRoute = "/api/v2/workflow/{workflowId}/edit" + WorkflowTriggerPostRoute = "/api/v2/workflow/{workflowId}/trigger" + WorkflowDeletePostRoute = "/api/v2/workflow/{workflowId}/delete" + // V1 routes GetArtifactVersionsRoute = "/api/artifact/versions" GetArtifactResultRoute = "/api/artifact/{workflowDagResultId}/{artifactId}/result" diff --git a/src/golang/cmd/server/server/cron.go b/src/golang/cmd/server/server/cron.go index caf9f8a62..361613d14 100644 --- a/src/golang/cmd/server/server/cron.go +++ b/src/golang/cmd/server/server/cron.go @@ -4,7 +4,7 @@ import ( "context" "time" - "github.com/aqueducthq/aqueduct/cmd/server/handler" + v2 "github.com/aqueducthq/aqueduct/cmd/server/handler/v2" "github.com/aqueducthq/aqueduct/config" "github.com/aqueducthq/aqueduct/lib/database" "github.com/aqueducthq/aqueduct/lib/engine" @@ -40,14 +40,14 @@ func (s *AqServer) triggerMissedCronJobs( if lastExpectedTriggerTime > referenceTime.Unix() { // This means that the workflow should have been triggered, but it wasn't. // So we manually trigger the workflow here. - _, _, err := (&handler.RefreshWorkflowHandler{ + _, _, err := (&v2.WorkflowPostHandler{ Database: s.Database, Engine: s.AqEngine, WorkflowRepo: s.WorkflowRepo, }).Perform( ctx, - &handler.RefreshWorkflowArgs{ + &v2.WorkflowPostArgs{ WorkflowId: workflowId, }, ) diff --git a/src/golang/cmd/server/server/handlers.go b/src/golang/cmd/server/server/handlers.go index 85626cbb6..57d1278a1 100644 --- a/src/golang/cmd/server/server/handlers.go +++ b/src/golang/cmd/server/server/handlers.go @@ -169,7 +169,7 @@ func (s *AqServer) Handlers() map[string]handler.Handler { StorageMigrationRepo: s.StorageMigrationRepo, WorkflowRepo: s.WorkflowRepo, }, - routes.DeleteWorkflowRoute: &handler.DeleteWorkflowHandler{ + routes.DeleteWorkflowRoute: &v2.WorkflowDeleteHandler{ Database: s.Database, Engine: s.AqEngine, JobManager: s.JobManager, @@ -187,7 +187,17 @@ func (s *AqServer) Handlers() map[string]handler.Handler { IntegrationRepo: s.IntegrationRepo, }, - routes.EditWorkflowRoute: &handler.EditWorkflowHandler{ + routes.EditWorkflowRoute: &v2.WorkflowPatchHandler{ + Database: s.Database, + Engine: s.AqEngine, + + ArtifactRepo: s.ArtifactRepo, + DAGRepo: s.DAGRepo, + DAGEdgeRepo: s.DAGEdgeRepo, + OperatorRepo: s.OperatorRepo, + WorkflowRepo: s.WorkflowRepo, + }, + routes.WorkflowEditPostRoute: &v2.WorkflowPatchHandler{ Database: s.Database, Engine: s.AqEngine, @@ -248,7 +258,15 @@ func (s *AqServer) Handlers() map[string]handler.Handler { OperatorResultRepo: s.OperatorResultRepo, }, routes.GetUserProfileRoute: &handler.GetUserProfileHandler{}, - routes.ListWorkflowObjectsRoute: &handler.ListWorkflowObjectsHandler{ + routes.ListWorkflowObjectsRoute: &v2.WorkflowObjectsGetHandler{ + Database: s.Database, + + OperatorRepo: s.OperatorRepo, + WorkflowRepo: s.WorkflowRepo, + WorkflowDagRepo: s.DAGRepo, + ArtifactResultRepo: s.ArtifactResultRepo, + }, + routes.WorkflowObjectsRoute: &v2.WorkflowObjectsGetHandler{ Database: s.Database, OperatorRepo: s.OperatorRepo, @@ -380,7 +398,13 @@ func (s *AqServer) Handlers() map[string]handler.Handler { IntegrationRepo: s.IntegrationRepo, }, - routes.RefreshWorkflowRoute: &handler.RefreshWorkflowHandler{ + routes.RefreshWorkflowRoute: &v2.WorkflowPostHandler{ + Database: s.Database, + Engine: s.AqEngine, + + WorkflowRepo: s.WorkflowRepo, + }, + routes.WorkflowTriggerPostRoute: &v2.WorkflowPostHandler{ Database: s.Database, Engine: s.AqEngine, diff --git a/src/ui/common/src/handlers/AqueductApi.ts 
b/src/ui/common/src/handlers/AqueductApi.ts index fa9c7a97f..e9e902f85 100644 --- a/src/ui/common/src/handlers/AqueductApi.ts +++ b/src/ui/common/src/handlers/AqueductApi.ts @@ -94,16 +94,36 @@ import { NodesResultsGetRequest, NodesResultsGetResponse, } from './v2/NodesResultsGet'; +import { + workflowDeletePostQuery, + WorkflowDeletePostRequest, + WorkflowDeletePostResponse, +} from './v2/WorkflowDeletePost'; +import { + workflowEditPostQuery, + WorkflowEditPostRequest, + WorkflowEditPostResponse, +} from './v2/WorkflowEditPost'; import { workflowGetQuery, WorkflowGetRequest, WorkflowGetResponse, } from './v2/WorkflowGet'; +import { + workflowObjectsGetQuery, + WorkflowObjectsGetRequest, + WorkflowObjectsGetResponse, +} from './v2/WorkflowObjectsGet'; import { workflowsGetQuery, WorkflowsGetRequest, WorkflowsGetResponse, } from './v2/WorkflowsGet'; +import { + workflowTriggerPostQuery, + WorkflowTriggerPostRequest, + WorkflowTriggerPostResponse, +} from './v2/WorkflowTriggerPost'; const { createApi, fetchBaseQuery } = ((rtkQueryRaw as any).default ?? rtkQueryRaw) as typeof rtkQueryRaw; @@ -235,6 +255,34 @@ export const aqueductApi = createApi({ query: (req) => storageMigrationListQuery(req), transformErrorResponse, }), + workflowDeletePost: builder.mutation< + WorkflowDeletePostResponse, + WorkflowDeletePostRequest + >({ + query: (req) => workflowDeletePostQuery(req), + transformErrorResponse: transformErrorResponse, + }), + workflowEditPost: builder.mutation< + WorkflowEditPostResponse, + WorkflowEditPostRequest + >({ + query: (req) => workflowEditPostQuery(req), + transformErrorResponse: transformErrorResponse, + }), + workflowTriggerPost: builder.mutation< + WorkflowTriggerPostResponse, + WorkflowTriggerPostRequest + >({ + query: (req) => workflowTriggerPostQuery(req), + transformErrorResponse: transformErrorResponse, + }), + workflowObjectsGet: builder.query< + WorkflowObjectsGetResponse, + WorkflowObjectsGetRequest + >({ + query: (req) => workflowObjectsGetQuery(req), + transformErrorResponse, + }), workflowsGet: builder.query({ query: (req) => workflowsGetQuery(req), transformErrorResponse: transformErrorResponse, @@ -268,5 +316,9 @@ export const { useNodesGetQuery, useNodesResultsGetQuery, useWorkflowGetQuery, + useWorkflowObjectsGetQuery, useWorkflowsGetQuery, + useWorkflowDeletePostMutation, + useWorkflowEditPostMutation, + useWorkflowTriggerPostMutation, } = aqueductApi; diff --git a/src/ui/common/src/handlers/v2/NodeArtifactGet.ts b/src/ui/common/src/handlers/v2/NodeArtifactGet.ts index 206c9dbf2..97c65fd47 100644 --- a/src/ui/common/src/handlers/v2/NodeArtifactGet.ts +++ b/src/ui/common/src/handlers/v2/NodeArtifactGet.ts @@ -7,7 +7,7 @@ import { NodeIdParameter, WorkflowIdParameter, } from '../parameters/Path'; -import { ArtifactResponse } from '../responses/Node'; +import { ArtifactResponse } from '../responses/node'; export type NodeArtifactGetRequest = APIKeyParameter & DagIdParameter & diff --git a/src/ui/common/src/handlers/v2/NodeArtifactResultsGet.ts b/src/ui/common/src/handlers/v2/NodeArtifactResultsGet.ts index bde579895..0c24d9631 100644 --- a/src/ui/common/src/handlers/v2/NodeArtifactResultsGet.ts +++ b/src/ui/common/src/handlers/v2/NodeArtifactResultsGet.ts @@ -7,7 +7,7 @@ import { NodeIdParameter, WorkflowIdParameter, } from '../parameters/Path'; -import { ArtifactResultResponse } from '../responses/Node'; +import { ArtifactResultResponse } from '../responses/node'; export type NodeArtifactResultsGetRequest = APIKeyParameter & DagIdParameter & diff --git 
a/src/ui/common/src/handlers/v2/NodeOperatorContentGet.ts b/src/ui/common/src/handlers/v2/NodeOperatorContentGet.ts index a3e3e7f20..118f35528 100644 --- a/src/ui/common/src/handlers/v2/NodeOperatorContentGet.ts +++ b/src/ui/common/src/handlers/v2/NodeOperatorContentGet.ts @@ -7,7 +7,7 @@ import { NodeIdParameter, WorkflowIdParameter, } from '../parameters/Path'; -import { NodeContentResponse } from '../responses/Node'; +import { NodeContentResponse } from '../responses/node'; export type NodeOperatorContentGetRequest = APIKeyParameter & DagIdParameter & diff --git a/src/ui/common/src/handlers/v2/NodeOperatorGet.ts b/src/ui/common/src/handlers/v2/NodeOperatorGet.ts index 783778a67..852392ff0 100644 --- a/src/ui/common/src/handlers/v2/NodeOperatorGet.ts +++ b/src/ui/common/src/handlers/v2/NodeOperatorGet.ts @@ -7,7 +7,7 @@ import { NodeIdParameter, WorkflowIdParameter, } from '../parameters/Path'; -import { OperatorResponse } from '../responses/Node'; +import { OperatorResponse } from '../responses/node'; export type NodeOperatorGetRequest = APIKeyParameter & DagIdParameter & diff --git a/src/ui/common/src/handlers/v2/NodesGet.ts b/src/ui/common/src/handlers/v2/NodesGet.ts index ddbe9e179..7f5c8ae60 100644 --- a/src/ui/common/src/handlers/v2/NodesGet.ts +++ b/src/ui/common/src/handlers/v2/NodesGet.ts @@ -3,7 +3,7 @@ import { APIKeyParameter } from '../parameters/Header'; import { DagIdParameter, WorkflowIdParameter } from '../parameters/Path'; -import { NodesResponse } from '../responses/Node'; +import { NodesResponse } from '../responses/node'; export type NodesGetRequest = APIKeyParameter & DagIdParameter & diff --git a/src/ui/common/src/handlers/v2/NodesResultsGet.ts b/src/ui/common/src/handlers/v2/NodesResultsGet.ts index 9e5cd327f..fd551534f 100644 --- a/src/ui/common/src/handlers/v2/NodesResultsGet.ts +++ b/src/ui/common/src/handlers/v2/NodesResultsGet.ts @@ -3,7 +3,7 @@ import { APIKeyParameter } from '../parameters/Header'; import { DagResultIdParameter, WorkflowIdParameter } from '../parameters/Path'; -import { NodeResultsResponse } from '../responses/Node'; +import { NodeResultsResponse } from '../responses/node'; export type NodesResultsGetRequest = APIKeyParameter & DagResultIdParameter & diff --git a/src/ui/common/src/handlers/v2/WorkflowDeletePost.ts b/src/ui/common/src/handlers/v2/WorkflowDeletePost.ts new file mode 100644 index 000000000..565e30305 --- /dev/null +++ b/src/ui/common/src/handlers/v2/WorkflowDeletePost.ts @@ -0,0 +1,26 @@ +// This file should map exactly to +// src/golang/cmd/server/handler/v2/workflow_delete.go + +import { SavedObjectDeletion } from '../../utils/workflows'; +import { APIKeyParameter } from '../parameters/Header'; +import { WorkflowIdParameter } from '../parameters/Path'; + +export type WorkflowDeletePostRequest = APIKeyParameter & + WorkflowIdParameter & { + external_delete: { [integration_id: string]: string[] }; + force: boolean; + }; + +export type WorkflowDeletePostResponse = { + [id: string]: SavedObjectDeletion; +}; + +export const workflowDeletePostQuery = (req: WorkflowDeletePostRequest) => ({ + url: `workflow/${req.workflowId}/delete`, + method: 'POST', + headers: { 'api-key': req.apiKey }, + body: { + external_delete: req.external_delete, + force: req.force, + }, +}); diff --git a/src/ui/common/src/handlers/v2/WorkflowEditPost.ts b/src/ui/common/src/handlers/v2/WorkflowEditPost.ts new file mode 100644 index 000000000..6f4ee5522 --- /dev/null +++ b/src/ui/common/src/handlers/v2/WorkflowEditPost.ts @@ -0,0 +1,34 @@ +// This file should 
map exactly to
+// src/golang/cmd/server/handler/v2/workflow_patch.go
+
+import {
+  NotificationSettings,
+  RetentionPolicy,
+  WorkflowSchedule,
+} from '../../utils/workflows';
+import { APIKeyParameter } from '../parameters/Header';
+import { WorkflowIdParameter } from '../parameters/Path';
+
+export type WorkflowEditPostRequest = APIKeyParameter &
+  WorkflowIdParameter & {
+    name: string;
+    description: string;
+    schedule: WorkflowSchedule;
+    retention_policy: RetentionPolicy;
+    notification_settings: NotificationSettings;
+  };
+
+export type WorkflowEditPostResponse = Record<string, never>;
+
+export const workflowEditPostQuery = (req: WorkflowEditPostRequest) => ({
+  url: `workflow/${req.workflowId}/edit`,
+  method: 'POST',
+  headers: { 'api-key': req.apiKey },
+  body: {
+    name: req.name,
+    description: req.description,
+    schedule: req.schedule,
+    retention_policy: req.retention_policy,
+    notification_settings: req.notification_settings,
+  },
+});
diff --git a/src/ui/common/src/handlers/v2/WorkflowGet.ts b/src/ui/common/src/handlers/v2/WorkflowGet.ts
index 45e32f7be..bd81870a5 100644
--- a/src/ui/common/src/handlers/v2/WorkflowGet.ts
+++ b/src/ui/common/src/handlers/v2/WorkflowGet.ts
@@ -2,7 +2,7 @@
 // src/golang/cmd/server/handler/v2/workflow_get.go
 import { APIKeyParameter } from '../parameters/Header';
 import { WorkflowIdParameter } from '../parameters/Path';
-import { WorkflowResponse } from '../responses/Workflow';
+import { WorkflowResponse } from '../responses/workflow';
 
 export type WorkflowGetRequest = APIKeyParameter & WorkflowIdParameter;
 
diff --git a/src/ui/common/src/handlers/v2/WorkflowObjectsGet.ts b/src/ui/common/src/handlers/v2/WorkflowObjectsGet.ts
new file mode 100644
index 000000000..b8798581a
--- /dev/null
+++ b/src/ui/common/src/handlers/v2/WorkflowObjectsGet.ts
@@ -0,0 +1,16 @@
+// This file should map exactly to
+// src/golang/cmd/server/handler/v2/workflow_objects_get.go
+
+import { SavedObject } from 'src/utils/workflows';
+
+import { APIKeyParameter } from '../parameters/Header';
+import { WorkflowIdParameter } from '../parameters/Path';
+
+export type WorkflowObjectsGetRequest = APIKeyParameter & WorkflowIdParameter;
+
+export type WorkflowObjectsGetResponse = { [id: string]: SavedObject };
+
+export const workflowObjectsGetQuery = (req: WorkflowObjectsGetRequest) => ({
+  url: `workflow/${req.workflowId}/objects`,
+  headers: { 'api-key': req.apiKey },
+});
diff --git a/src/ui/common/src/handlers/v2/WorkflowTriggerPost.ts b/src/ui/common/src/handlers/v2/WorkflowTriggerPost.ts
new file mode 100644
index 000000000..47242e3b9
--- /dev/null
+++ b/src/ui/common/src/handlers/v2/WorkflowTriggerPost.ts
@@ -0,0 +1,29 @@
+// This file should map exactly to
+// src/golang/cmd/server/handler/v2/workflow_post.go
+
+import { APIKeyParameter } from '../parameters/Header';
+import { WorkflowIdParameter } from '../parameters/Path';
+
+export type WorkflowTriggerPostRequest = APIKeyParameter &
+  WorkflowIdParameter & {
+    serializedParams: string;
+  };
+
+export type WorkflowTriggerPostResponse = Record<string, never>;
+
+export const workflowTriggerPostQuery = (req: WorkflowTriggerPostRequest) => {
+  const parameters = new FormData();
+  parameters.append('parameters', req.serializedParams);
+  return {
+    url: `workflow/${req.workflowId}/trigger`,
+    method: 'POST',
+    // avoid built-in content-type override
+    // ref: https://github.com/reduxjs/redux-toolkit/issues/2287
+    prepareHeaders: (headers) => {
+      headers.set('api-key', req.apiKey);
+      headers.set('Content-Type', 'multipart/form-data');
+      return headers;
}, + body: parameters, + }; +}; From 27796b6aec60654786aaf426d869d06a80663a93 Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Thu, 18 May 2023 15:01:16 -0700 Subject: [PATCH 5/7] tests --- integration_tests/backend/test_reads.py | 142 ++++++++++++------------ 1 file changed, 68 insertions(+), 74 deletions(-) diff --git a/integration_tests/backend/test_reads.py b/integration_tests/backend/test_reads.py index e912f95a3..71b1d5548 100644 --- a/integration_tests/backend/test_reads.py +++ b/integration_tests/backend/test_reads.py @@ -640,49 +640,46 @@ def test_endpoint_node_metric_get(self): assert len(result.outputs) == 1 def test_endpoint_node_metric_result_content_get(self): - for flow_id, _ in [ - self.flows["flow_with_metrics_and_checks"], - self.flows["flow_with_multiple_operators"], - ]: - flow = self.client.flow(flow_id) - workflow_resp = flow._get_workflow_resp() - dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id - dag_result_id = workflow_resp.workflow_dag_results[0].id + flow_id, _ = self.flows["flow_with_metrics_and_checks"] + flow = self.client.flow(flow_id) + workflow_resp = flow._get_workflow_resp() + dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id + dag_result_id = workflow_resp.workflow_dag_results[0].id - dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result( - flow_id, - dag_result_id, - ) - operator_ids = [ - id - for id in dag_result_resp.operators.keys() - if dag_result_resp.operators[id].spec.metric - ] - operator_id = str(operator_ids[0]) + dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result( + flow_id, + dag_result_id, + ) + operator_ids = [ + id + for id in dag_result_resp.operators.keys() + if dag_result_resp.operators[id].spec.metric + ] + operator_id = str(operator_ids[0]) - resp = self.get_response( - self.GET_NODE_METRIC_TEMPLATE % (flow_id, dag_id, operator_id) - ).json() + resp = self.get_response( + self.GET_NODE_METRIC_TEMPLATE % (flow_id, dag_id, operator_id) + ).json() - result = GetOperatorWithArtifactNodeResponse(**resp) + result = GetOperatorWithArtifactNodeResponse(**resp) - artifact_id = result.artifact_id + artifact_id = result.artifact_id + resp = self.get_response( + self.LIST_ARTIFACT_RESULTS_TEMPLATE % (flow_id, artifact_id) + ).json() + results = resp["results"] + # One of these should be correct for the DAG run and can get result content. + for artifact_result in results: resp = self.get_response( - self.LIST_ARTIFACT_RESULTS_TEMPLATE % (flow_id, artifact_id) - ).json() - results = resp["results"] - # One of these should be correct for the DAG run and can get result content. 
- for artifact_result in results: - resp = self.get_response( - self.GET_NODE_METRIC_RESULT_CONTENT_TEMPLATE - % (flow_id, dag_id, operator_id, artifact_result["id"]) - ) - assert resp.ok - resp_obj = GetNodeResultContentResponse(**resp.json()) - # One of these should be successful (direct descendent of operator) - assert not resp_obj.is_downsampled - assert len(resp_obj.content) > 0 + self.GET_NODE_METRIC_RESULT_CONTENT_TEMPLATE + % (flow_id, dag_id, operator_id, artifact_result["id"]) + ) + assert resp.ok + resp_obj = GetNodeResultContentResponse(**resp.json()) + # One of these should be successful (direct descendent of operator) + assert not resp_obj.is_downsampled + assert len(resp_obj.content) > 0 def test_endpoint_node_check_get(self): flow_id, _ = self.flows["flow_with_metrics_and_checks"] @@ -712,46 +709,43 @@ def test_endpoint_node_check_get(self): assert len(result.outputs) == 0 def test_endpoint_node_check_result_content_get(self): - for flow_id, _ in [ - self.flows["flow_with_metrics_and_checks"], - self.flows["flow_with_multiple_operators"], - ]: - flow = self.client.flow(flow_id) - workflow_resp = flow._get_workflow_resp() - dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id - dag_result_id = workflow_resp.workflow_dag_results[0].id + flow_id, _ = self.flows["flow_with_metrics_and_checks"] + flow = self.client.flow(flow_id) + workflow_resp = flow._get_workflow_resp() + dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id + dag_result_id = workflow_resp.workflow_dag_results[0].id - dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result( - flow_id, - dag_result_id, - ) - operator_ids = [ - id - for id in dag_result_resp.operators.keys() - if dag_result_resp.operators[id].spec.check - ] - operator_id = str(operator_ids[0]) + dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result( + flow_id, + dag_result_id, + ) + operator_ids = [ + id + for id in dag_result_resp.operators.keys() + if dag_result_resp.operators[id].spec.check + ] + operator_id = str(operator_ids[0]) - resp = self.get_response( - self.GET_NODE_CHECK_TEMPLATE % (flow_id, dag_id, operator_id) - ).json() + resp = self.get_response( + self.GET_NODE_CHECK_TEMPLATE % (flow_id, dag_id, operator_id) + ).json() - result = GetOperatorWithArtifactNodeResponse(**resp) + result = GetOperatorWithArtifactNodeResponse(**resp) - artifact_id = result.artifact_id + artifact_id = result.artifact_id + resp = self.get_response( + self.LIST_ARTIFACT_RESULTS_TEMPLATE % (flow_id, artifact_id) + ).json() + results = resp["results"] + # One of these should be correct for the DAG run and can get result content. + for artifact_result in results: resp = self.get_response( - self.LIST_ARTIFACT_RESULTS_TEMPLATE % (flow_id, artifact_id) - ).json() - results = resp["results"] - # One of these should be correct for the DAG run and can get result content. 
- for artifact_result in results: - resp = self.get_response( - self.GET_NODE_CHECK_RESULT_CONTENT_TEMPLATE - % (flow_id, dag_id, operator_id, artifact_result["id"]) - ) - assert resp.ok - resp_obj = GetNodeResultContentResponse(**resp.json()) - # One of these should be successful (direct descendent of operator) - assert not resp_obj.is_downsampled - assert len(resp_obj.content) > 0 + self.GET_NODE_CHECK_RESULT_CONTENT_TEMPLATE + % (flow_id, dag_id, operator_id, artifact_result["id"]) + ) + assert resp.ok + resp_obj = GetNodeResultContentResponse(**resp.json()) + # One of these should be successful (direct descendent of operator) + assert not resp_obj.is_downsampled + assert len(resp_obj.content) > 0 From 5e730bd31fdc7e29412ff441106f0da05c6df1d4 Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Thu, 18 May 2023 15:22:01 -0700 Subject: [PATCH 6/7] RTK --- src/ui/common/src/handlers/v2/WorkflowObjectsGet.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ui/common/src/handlers/v2/WorkflowObjectsGet.ts b/src/ui/common/src/handlers/v2/WorkflowObjectsGet.ts index b8798581a..7a240229c 100644 --- a/src/ui/common/src/handlers/v2/WorkflowObjectsGet.ts +++ b/src/ui/common/src/handlers/v2/WorkflowObjectsGet.ts @@ -8,7 +8,7 @@ import { WorkflowIdParameter } from '../parameters/Path'; export type WorkflowObjectsGetRequest = APIKeyParameter & WorkflowIdParameter; -export type WorkflowObjectsGetResponse = { [id: string]: SavedObject }; +export type WorkflowObjectsGetResponse = { object_details: SavedObject[] }; export const workflowObjectsGetQuery = (req: WorkflowObjectsGetRequest) => ({ url: `workflow/${req.workflowId}/objects`, From c649d37678aa1aedec290675a29fc8966c1d73e6 Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Thu, 18 May 2023 16:20:01 -0700 Subject: [PATCH 7/7] Implement node hooks (#1340) --- src/golang/lib/response/node.go | 13 ++++-- .../src/components/pages/workflow/id/hook.ts | 42 +++++++++++++++++++ src/ui/common/src/handlers/responses/node.ts | 12 ++++++ 3 files changed, 64 insertions(+), 3 deletions(-) diff --git a/src/golang/lib/response/node.go b/src/golang/lib/response/node.go index f0be463df..26289c6d3 100644 --- a/src/golang/lib/response/node.go +++ b/src/golang/lib/response/node.go @@ -120,6 +120,7 @@ func NewArtifactFromDBObject(dbArtifactNode *views.ArtifactNode) *Artifact { type ArtifactResult struct { // Contains only the `result`. It mostly mirrors 'artifact_result' schema. ID uuid.UUID `json:"id"` + ArtifactID uuid.UUID `json:"artifact_id"` SerializationType shared.ArtifactSerializationType `json:"serialization_type"` // If `ContentSerialized` is set, the content is small and we directly send @@ -140,6 +141,7 @@ func NewArtifactResultFromDBObject( ) *ArtifactResult { result := &ArtifactResult{ ID: dbArtifactResult.ID, + ArtifactID: dbArtifactResult.ArtifactID, SerializationType: dbArtifactResult.Metadata.SerializationType, ContentPath: dbArtifactResult.ContentPath, ContentSerialized: content, @@ -179,14 +181,19 @@ func NewOperatorFromDBObject(dbOperatorNode *views.OperatorNode) *Operator { type OperatorResult struct { // Contains only the `result`. It mostly mirrors 'operator_result' schema. 
- ID uuid.UUID `json:"id"` - ExecState *shared.ExecutionState `json:"exec_state"` + ID uuid.UUID `json:"id"` + OperatorID uuid.UUID `json:"operator_id"` + ExecState *shared.ExecutionState `json:"exec_state"` } func NewOperatorResultFromDBObject( dbOperatorResult *models.OperatorResult, ) *OperatorResult { - result := &OperatorResult{ID: dbOperatorResult.ID} + result := &OperatorResult{ + ID: dbOperatorResult.ID, + OperatorID: dbOperatorResult.OperatorID, + } + if !dbOperatorResult.ExecState.IsNull { // make a copy of execState's value execStateVal := dbOperatorResult.ExecState.ExecutionState diff --git a/src/ui/common/src/components/pages/workflow/id/hook.ts b/src/ui/common/src/components/pages/workflow/id/hook.ts index 096862d1c..de77130a8 100644 --- a/src/ui/common/src/components/pages/workflow/id/hook.ts +++ b/src/ui/common/src/components/pages/workflow/id/hook.ts @@ -3,8 +3,13 @@ import { useDispatch, useSelector } from 'react-redux'; import { useParams } from 'react-router-dom'; import { BreadcrumbLink } from '../../../../components/layouts/NavBar'; +import { + useNodesGetQuery, + useNodesResultsGetQuery, +} from '../../../../handlers/AqueductApi'; import { handleGetWorkflowDag } from '../../../../handlers/getWorkflowDag'; import { handleGetWorkflowDagResult } from '../../../../handlers/getWorkflowDagResult'; +import { NodeResultsMap, NodesMap } from '../../../../handlers/responses/node'; import { WorkflowDagResultWithLoadingStatus } from '../../../../reducers/workflowDagResults'; import { WorkflowDagWithLoadingStatus } from '../../../../reducers/workflowDags'; import { AppDispatch, RootState } from '../../../../stores/store'; @@ -107,3 +112,40 @@ export default function useWorkflow( workflowDagResultWithLoadingStatus, }; } + +export function useWorkflowNodes( + apiKey: string, + workflowId: string, + dagId: string | undefined +): NodesMap { + const { data: nodes } = useNodesGetQuery( + { apiKey, workflowId, dagId }, + { skip: !workflowId || !dagId } + ); + return { + operators: Object.fromEntries( + (nodes?.operators ?? []).map((op) => [op.id, op]) + ), + artifacts: Object.fromEntries( + (nodes?.artifacts ?? []).map((artf) => [artf.id, artf]) + ), + }; +} +export function useWorkflowNodesResults( + apiKey: string, + workflowId: string, + dagResultId: string | undefined +): NodeResultsMap { + const { data: nodeResults } = useNodesResultsGetQuery( + { apiKey, workflowId, dagResultId }, + { skip: !workflowId || !dagResultId } + ); + return { + operators: Object.fromEntries( + (nodeResults?.operators ?? []).map((op) => [op.operator_id, op]) + ), + artifacts: Object.fromEntries( + (nodeResults?.artifacts ?? 
[]).map((artf) => [artf.artifact_id, artf]) + ), + }; +} diff --git a/src/ui/common/src/handlers/responses/node.ts b/src/ui/common/src/handlers/responses/node.ts index a217e39e7..065afd011 100644 --- a/src/ui/common/src/handlers/responses/node.ts +++ b/src/ui/common/src/handlers/responses/node.ts @@ -39,6 +39,7 @@ export type ArtifactResponse = { export type ArtifactResultResponse = { id: string; + artifact_id: string; serialization_type: SerializationType; content_path: string; content_serialized: string; @@ -57,6 +58,7 @@ export type OperatorResponse = { export type OperatorResultResponse = { id: string; + operator_id: string; exec_state?: ExecState; }; @@ -76,6 +78,16 @@ export type NodeResultsResponse = { // checks: OperatorWithArtifactNodeResultResponse[]; }; +export type NodesMap = { + operators: { [id: string]: OperatorResponse }; + artifacts: { [id: string]: ArtifactResponse }; +}; + +export type NodeResultsMap = { + operators: { [id: string]: OperatorResultResponse }; + artifacts: { [id: string]: ArtifactResultResponse }; +}; + export type NodeContentResponse = { name: string; data: string;
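
--- Illustrative usage note (not part of the patch series above) ---

The PATCH 7/7 hooks index node results by operator_id / artifact_id, so a consumer can join each node with its latest result by a plain ID lookup. Below is a minimal sketch of that join. It is an assumption-laden example, not code from the patch: the helper name, its import path, and the exec_state.status field it reads are all placeholders for illustration.

import { NodeResultsMap, NodesMap } from '../handlers/responses/node';

// Join each operator node with its latest result. useWorkflowNodesResults keys
// the result map by operator_id, which matches the operator node's own id, so
// a direct nodeResults.operators[op.id] lookup lines the two maps up.
export function summarizeOperatorStatuses(
  nodes: NodesMap,
  nodeResults: NodeResultsMap
): { [operatorId: string]: string } {
  return Object.fromEntries(
    Object.values(nodes.operators).map((op) => [
      op.id,
      // Fall back to a sentinel when no result row exists yet for this operator.
      nodeResults.operators[op.id]?.exec_state?.status ?? 'pending',
    ])
  );
}

Because both hooks skip their RTK queries until the relevant IDs are available, a caller can invoke this helper unconditionally and simply get an empty map while the DAG or DAG result is still loading.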