Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement workflow dags endpoint #1330

Merged
merged 10 commits into from
May 19, 2023
15 changes: 15 additions & 0 deletions integration_tests/backend/test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import pytest
import requests
import utils
from aqueduct.constants.enums import RuntimeType
from aqueduct.models.response_models import (
GetArtifactResultResponse,
GetDagResponse,
GetDagResultResponse,
GetNodeArtifactResponse,
GetNodeOperatorResponse,
Expand All @@ -30,6 +32,7 @@ class TestBackend:
# V2
GET_WORKFLOWS_TEMPLATE = "/api/v2/workflows"

GET_DAGS_TEMPLATE = "/api/v2/workflow/%s/dags"
GET_DAG_RESULTS_TEMPLATE = "/api/v2/workflow/%s/results"
GET_NODES_RESULTS_TEMPLATE = "/api/v2/workflow/%s/result/%s/nodes/results"

Expand Down Expand Up @@ -358,6 +361,18 @@ def test_endpoint_workflows_get(self):
assert key in v2_workflow
assert v2_workflow["user_id"] == user_id

def test_endpoint_workflow_dags_get(self):
flow_id, _ = self.flows["flow_with_metrics_and_checks"]
resp = self.get_response(self.GET_DAGS_TEMPLATE % flow_id)
resp = resp.json()

assert len(resp) == 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To check the format of the response, I cast to a Response class object. I think you can do that for this too. As bonus, it will be easier for me to integrate into the SDK if the Response classes are already written up.

for dag_dict in resp:
dag = GetDagResponse(**dag_dict)
assert dag.workflow_id == flow_id
assert dag.created_at != ""
assert dag.engine_config.type == RuntimeType.AQUEDUCT

def test_endpoint_dag_results_get(self):
flow_id, n_runs = self.flows["flow_with_metrics_and_checks"]
resp = self.get_response(self.GET_DAG_RESULTS_TEMPLATE % flow_id).json()
Expand Down
9 changes: 8 additions & 1 deletion sdk/aqueduct/models/response_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
SerializationType,
)
from aqueduct.models.artifact import ArtifactMetadata
from aqueduct.models.dag import Metadata, RetentionPolicy, Schedule
from aqueduct.models.dag import EngineConfig, Metadata, RetentionPolicy, Schedule
from aqueduct.models.execution_state import ExecutionState
from aqueduct.models.operators import LoadSpec, Operator, OperatorSpec
from aqueduct.models.utils import human_readable_timestamp
Expand All @@ -23,6 +23,13 @@ class ArtifactResult(BaseModel):


# V2 Responses
class GetDagResponse(BaseModel):
id: uuid.UUID
workflow_id: uuid.UUID
created_at: str
engine_config: EngineConfig


class GetDagResultResponse(BaseModel):
"""Represents the result of a single workflow run.

Expand Down
94 changes: 94 additions & 0 deletions src/golang/cmd/server/handler/v2/dags_get.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package v2

import (
"context"
"net/http"

"github.com/aqueducthq/aqueduct/cmd/server/handler"
"github.com/aqueducthq/aqueduct/cmd/server/request/parser"
aq_context "github.com/aqueducthq/aqueduct/lib/context"
"github.com/aqueducthq/aqueduct/lib/database"
"github.com/aqueducthq/aqueduct/lib/functional/slices"
"github.com/aqueducthq/aqueduct/lib/models"
"github.com/aqueducthq/aqueduct/lib/repos"
"github.com/aqueducthq/aqueduct/lib/response"
"github.com/dropbox/godropbox/errors"
"github.com/google/uuid"
)

// This file should map directly to
// src/ui/common/src/handlers/v2/DagsGet.tsx
//
// Route: /v2/workflow/{workflowId}/dags
// Method: GET
// Params:
// `workflowId`: ID for `workflow` object
// Request:
// Headers:
// `api-key`: user's API Key
//
// Response:
// Body:
// serialized `[]response.DAGResult`

type dagsGetArgs struct {
*aq_context.AqContext
workflowID uuid.UUID
}

type DAGsGetHandler struct {
handler.GetHandler

Database database.Database

WorkflowRepo repos.Workflow
DAGRepo repos.DAG
}

func (*DAGsGetHandler) Name() string {
return "DAGsGet"
}

func (h *DAGsGetHandler) Prepare(r *http.Request) (interface{}, int, error) {
aqContext, statusCode, err := aq_context.ParseAqContext(r.Context())
if err != nil {
return nil, statusCode, err
}

workflowID, err := (parser.WorkflowIDParser{}).Parse(r)
if err != nil {
return nil, http.StatusBadRequest, err
}

return &dagsGetArgs{
AqContext: aqContext,
workflowID: workflowID,
}, http.StatusOK, nil
}

func (h *DAGsGetHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) {
args := interfaceArgs.(*dagsGetArgs)

ok, err := h.WorkflowRepo.ValidateOrg(
ctx,
args.workflowID,
args.OrgID,
h.Database,
)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error during workflow ownership validation.")
}

if !ok {
return nil, http.StatusBadRequest, errors.Wrap(err, "The organization does not own this workflow.")
}

dbDAGs, err := h.DAGRepo.GetByWorkflow(ctx, args.workflowID, h.Database)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error reading dag results.")
}

return slices.Map(dbDAGs, func(dbDAG models.DAG) response.DAG {
return *response.NewDAGFromDBObject(&dbDAG)
}), http.StatusOK, nil
}
1 change: 1 addition & 0 deletions src/golang/cmd/server/routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const (
ListStorageMigrationRoute = "/api/v2/storage-migrations"
WorkflowsRoute = "/api/v2/workflows"
WorkflowRoute = "/api/v2/workflow/{workflowID}"
DAGsRoute = "/api/v2/workflow/{workflowID}/dags"
DAGRoute = "/api/v2/workflow/{workflowID}/dag/{dagID}"
DAGResultsRoute = "/api/v2/workflow/{workflowID}/results"
DAGResultRoute = "/api/v2/workflow/{workflowID}/result/{dagResultID}"
Expand Down
5 changes: 5 additions & 0 deletions src/golang/cmd/server/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ func (s *AqServer) Handlers() map[string]handler.Handler {
WorkflowRepo: s.WorkflowRepo,
OperatorRepo: s.OperatorRepo,
},
routes.DAGsRoute: &v2.DAGsGetHandler{
Database: s.Database,
WorkflowRepo: s.WorkflowRepo,
DAGRepo: s.DAGRepo,
},
routes.DAGResultRoute: &v2.DAGResultGetHandler{
Database: s.Database,
WorkflowRepo: s.WorkflowRepo,
Expand Down
6 changes: 6 additions & 0 deletions src/ui/common/src/handlers/AqueductApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
DagResultsGetRequest,
DagResultsGetResponse,
} from './v2/DagResultsGet';
import { dagsGetQuery, DagsGetRequest, DagsGetResponse } from './v2/DagsGet';
import {
integrationOperatorsGetQuery,
IntegrationOperatorsGetRequest,
Expand Down Expand Up @@ -126,6 +127,10 @@ export const aqueductApi = createApi({
query: (req) => dagOperatorsGetQuery(req),
transformErrorResponse,
}),
dagsGet: builder.query<DagsGetResponse, DagsGetRequest>({
query: (req) => dagsGetQuery(req),
transformErrorResponse,
}),
dagResultGet: builder.query<DagResultGetResponse, DagResultGetRequest>({
query: (req) => dagResultGetQuery(req),
transformErrorResponse,
Expand Down Expand Up @@ -243,6 +248,7 @@ export const aqueductApi = createApi({

export const {
useDagGetQuery,
useDagsGetQuery,
useDagOperatorsGetQuery,
useDagResultGetQuery,
useDagResultsGetQuery,
Expand Down
15 changes: 15 additions & 0 deletions src/ui/common/src/handlers/v2/DagsGet.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// This file should map exactly to
// src/golang/cmd/server/handler/v2/dags_get.go

import { APIKeyParameter } from '../parameters/Header';
import { WorkflowIdParameter } from '../parameters/Path';
import { DagResponse } from '../responses/workflow';

export type DagsGetRequest = APIKeyParameter & WorkflowIdParameter;

export type DagsGetResponse = DagResponse[];

export const dagsGetQuery = (req: DagsGetRequest) => ({
url: `workflow/${req.workflowId}/dags`,
headers: { 'api-key': req.apiKey },
});