Skip to content

Eng 2735 Add order by and limit parameters for V2 workflow results #1240

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions integration_tests/backend/test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@


class TestBackend:
# V2
GET_WORKFLOWS_TEMPLATE = "/api/v2/workflows"
GET_DAG_RESULTS_TEMPLATE = "/api/v2/workflow/%s/results"

LIST_WORKFLOW_SAVED_OBJECTS_TEMPLATE = "/api/workflow/%s/objects"
GET_TEST_INTEGRATION_TEMPLATE = "/api/integration/%s/test"
Expand Down Expand Up @@ -318,3 +320,56 @@ def test_endpoint_workflows_get(self):
for key in keys:
assert key in v2_workflow
assert v2_workflow["user_id"] == user_id

def test_endpoint_dag_results_get(self):
    """Exercise the V2 DAG-results endpoint with and without `order_by` / `limit`."""
    # NOTE(review): `order_by` and `limit` are passed via `additional_headers` below;
    # confirm that `get_response` delivers them where the endpoint reads them.
    flow_id, n_runs = self.flows["flow_with_metrics_and_checks"]
    resp = self.get_response(self.GET_DAG_RESULTS_TEMPLATE % flow_id).json()

    # Without any parameters, one result per run is expected.
    assert len(resp) == n_runs

    fields = ["id", "dag_id", "exec_state"]

    def check_structure(resp):
        for result in resp:
            for field in fields:
                assert field in result
            assert result["exec_state"]["status"] == "succeeded"
            # `is None` is the correct identity check; `== None` can be
            # fooled by objects that override `__eq__`.
            assert result["exec_state"]["failure_type"] is None
            assert result["exec_state"]["error"] is None

    check_structure(resp)

    # Using the order parameter
    flow_id, n_runs = self.flows["flow_with_failure"]
    resp = self.get_response(
        self.GET_DAG_RESULTS_TEMPLATE % flow_id,
        additional_headers={
            "order_by": "status",
        },
    ).json()
    statuses = [result["exec_state"]["status"] for result in resp]
    sorted_statuses = sorted(statuses, reverse=True)  # Descending order
    assert statuses == sorted_statuses

    # Using the limit parameter
    resp = self.get_response(
        self.GET_DAG_RESULTS_TEMPLATE % flow_id,
        additional_headers={
            "limit": "1",
        },
    ).json()
    assert len(resp) == 1

    # Using both the order and limit parameters
    resp = self.get_response(
        self.GET_DAG_RESULTS_TEMPLATE % flow_id,
        additional_headers={
            "order_by": "status",
            "limit": "1",
        },
    ).json()

    # The single returned result must be the first one of the ordered listing.
    workflow_status = [result["exec_state"]["status"] for result in resp]
    assert len(workflow_status) == 1
    workflow_status = workflow_status[0]
    assert workflow_status == sorted_statuses[0]
2 changes: 1 addition & 1 deletion src/golang/cmd/server/handler/get_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (h *GetWorkflowHandler) Perform(ctx context.Context, interfaceArgs interfac
dags[dbDAG.ID] = constructedDAG
}

dagResults, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowID, h.Database)
dagResults, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowID, "", -1, h.Database)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar here, let's make a comment to these default values, something like:

(
...,
"", // order_by_desc
-1, // limit, -1 means no limit
)

if err != nil {
return emptyResp, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error occurred when retrieving workflow.")
}
Expand Down
2 changes: 1 addition & 1 deletion src/golang/cmd/server/handler/get_workflow_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (h *GetWorkflowHistoryHandler) Perform(ctx context.Context, interfaceArgs i
return nil, http.StatusBadRequest, errors.Wrap(err, fmt.Sprintf("Workflow %v does not exist.", args.workflowId))
}

results, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowId, h.Database)
results, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowId, "", -1, h.Database)
if err != nil && err != database.ErrNoRows() { // Don't return an error if there are just no rows.
return nil, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error while retrieving workflow runs.")
}
Expand Down
25 changes: 23 additions & 2 deletions src/golang/cmd/server/handler/v2/dag_results_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,26 @@ import (
// Method: GET
// Params:
// `workflowId`: ID for `workflow` object
// `dagResultID`: ID for `workflow_dag_result` object
// Request:
// Headers:
// `api-key`: user's API Key
// Parameters:
// `order_by`:
// Optional single column to order the results by, in descending order. Must be a DAG result column name, without the table prefix.
// `limit`:
// Optional limit on the number of DAG results returned. Defaults to all of them.
// Response:
// Body:
// serialized `[]response.DAGResult`

// dagResultsGetArgs holds the parsed request arguments for listing the DAG results of a workflow.
type dagResultsGetArgs struct {
*aq_context.AqContext
workflowID uuid.UUID

// An empty string means that the order is not set.
orderBy string
// A negative value for limit (eg. -1) means that the limit is not set.
limit int
}

type DAGResultsGetHandler struct {
Expand All @@ -60,9 +69,21 @@ func (h *DAGResultsGetHandler) Prepare(r *http.Request) (interface{}, int, error
return nil, http.StatusBadRequest, err
}

limit, err := (parser.LimitQueryParser{}).Parse(r)
if err != nil {
return nil, http.StatusBadRequest, err
}

orderBy, err := (parser.OrderByQueryParser{}).Parse(r, models.AllDAGResultCols())
if err != nil {
return nil, http.StatusBadRequest, err
}

return &dagResultsGetArgs{
AqContext: aqContext,
workflowID: workflowID,
orderBy: orderBy,
limit: limit,
}, http.StatusOK, nil
}

Expand All @@ -83,7 +104,7 @@ func (h *DAGResultsGetHandler) Perform(ctx context.Context, interfaceArgs interf
return nil, http.StatusBadRequest, errors.Wrap(err, "The organization does not own this workflow.")
}

dbDAGResults, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowID, h.Database)
dbDAGResults, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowID, args.orderBy, args.limit, h.Database)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error reading dag results.")
}
Expand Down
25 changes: 25 additions & 0 deletions src/golang/cmd/server/request/parser/query_limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package parser

import (
"net/http"
"strconv"

"github.com/dropbox/godropbox/errors"
)

// LimitQueryParser parses the optional `limit` URL query parameter.
type LimitQueryParser struct{}

// Parse returns the integer value of the `limit` query parameter of r.
// It returns -1 when the parameter is absent or empty. A value that is not
// a valid integer yields an error together with -1.
func (LimitQueryParser) Parse(r *http.Request) (int, error) {
	limitVal := r.URL.Query().Get("limit")
	if limitVal == "" {
		return -1, nil
	}

	limit, err := strconv.Atoi(limitVal)
	if err != nil {
		// The value is read from the URL query string, not from a header,
		// so the message names the query parameter.
		return -1, errors.Wrap(err, "Invalid limit query parameter.")
	}

	return limit, nil
}
32 changes: 32 additions & 0 deletions src/golang/cmd/server/request/parser/query_order_by.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package parser

import (
"net/http"

"github.com/dropbox/godropbox/errors"
)

// OrderByQueryParser parses the optional `order_by` URL query parameter.
type OrderByQueryParser struct{}

// Parse returns the value of the `order_by` query parameter of r after
// checking that it exactly matches one entry of tableColumns. It returns an
// empty string when the parameter is absent or empty, and an error when the
// value does not match any entry of tableColumns.
func (OrderByQueryParser) Parse(r *http.Request, tableColumns []string) (string, error) {
	orderByVal := r.URL.Query().Get("order_by")
	if orderByVal == "" {
		return "", nil
	}

	// Only an exact match against a known column is accepted.
	for _, column := range tableColumns {
		if column == orderByVal {
			return orderByVal, nil
		}
	}

	// There is no underlying error to wrap here, so create a fresh one
	// instead of wrapping a nil error.
	return "", errors.New("Invalid order_by value.")
}
2 changes: 1 addition & 1 deletion src/golang/lib/airflow/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func syncWorkflowDag(
return err
}

dagResults, err := dagResultRepo.GetByWorkflow(ctx, dag.WorkflowID, DB)
dagResults, err := dagResultRepo.GetByWorkflow(ctx, dag.WorkflowID, "", -1, DB)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion src/golang/lib/engine/aq_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ func (eng *aqEngine) DeleteWorkflow(
dagIDs = append(dagIDs, dag.ID)
}

dagResultsToDelete, err := eng.DAGResultRepo.GetByWorkflow(ctx, workflowObj.ID, txn)
dagResultsToDelete, err := eng.DAGResultRepo.GetByWorkflow(ctx, workflowObj.ID, "", -1, txn)
if err != nil {
return errors.Wrap(err, "Unexpected error occurred while retrieving workflow dag results.")
}
Expand Down
6 changes: 3 additions & 3 deletions src/golang/lib/models/dag_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,21 @@ type DAGResult struct {

// DAGResultCols returns a comma-separated string of all DAGResult columns.
func DAGResultCols() string {
return strings.Join(allDAGResultCols(), ",")
return strings.Join(AllDAGResultCols(), ",")
}

// DAGResultColsWithPrefix returns a comma-separated string of all
// DAGResult columns prefixed by the table name.
func DAGResultColsWithPrefix() string {
cols := allDAGResultCols()
cols := AllDAGResultCols()
for i, col := range cols {
cols[i] = fmt.Sprintf("%s.%s", DAGResultTable, col)
}

return strings.Join(cols, ",")
}

func allDAGResultCols() []string {
func AllDAGResultCols() []string {
return []string{
DAGResultID,
DAGResultDagID,
Expand Down
2 changes: 1 addition & 1 deletion src/golang/lib/repos/dag_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type dagResultReader interface {
GetBatch(ctx context.Context, IDs []uuid.UUID, DB database.Database) ([]models.DAGResult, error)

// GetByWorkflow returns the DAGResults of all DAGs associated with the Workflow with workflowID.
GetByWorkflow(ctx context.Context, workflowID uuid.UUID, DB database.Database) ([]models.DAGResult, error)
GetByWorkflow(ctx context.Context, workflowID uuid.UUID, orderBy string, limit int, DB database.Database) ([]models.DAGResult, error)

// GetKOffsetByWorkflow returns the DAGResults of all DAGs associated with the Workflow with workflowID
// except for the last k DAGResults ordered by DAGResult.CreatedAt.
Expand Down
15 changes: 13 additions & 2 deletions src/golang/lib/repos/sqlite/dag_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sqlite
import (
"context"
"fmt"
"strconv"
"time"

"github.com/aqueducthq/aqueduct/lib/database"
Expand Down Expand Up @@ -52,13 +53,23 @@ func (*dagResultReader) GetBatch(ctx context.Context, IDs []uuid.UUID, DB databa
return getDAGResults(ctx, DB, query, args...)
}

func (*dagResultReader) GetByWorkflow(ctx context.Context, workflowID uuid.UUID, DB database.Database) ([]models.DAGResult, error) {
func (*dagResultReader) GetByWorkflow(ctx context.Context, workflowID uuid.UUID, orderBy string, limit int, DB database.Database) ([]models.DAGResult, error) {
var orderByQuery string
if len(orderBy) > 0 {
orderByQuery = fmt.Sprintf(" ORDER BY %s.%s DESC", models.DAGResultTable, orderBy)
}

var limitQuery string
if limit >= 0 {
limitQuery = fmt.Sprintf(" LIMIT %s", strconv.Itoa(limit))
}

query := fmt.Sprintf(
`SELECT %s
FROM workflow_dag_result, workflow_dag
WHERE
workflow_dag_result.workflow_dag_id = workflow_dag.id
AND workflow_dag.workflow_id = $1;`,
AND workflow_dag.workflow_id = $1`+orderByQuery+limitQuery+`;`,
models.DAGResultColsWithPrefix(),
)
args := []interface{}{workflowID}
Expand Down
2 changes: 1 addition & 1 deletion src/golang/lib/repos/tests/dag_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (ts *TestSuite) TestDAGResult_GetByWorkflow() {

expectedDAGResults := ts.seedDAGResultWithDAG(2, []uuid.UUID{dag.ID, dag.ID})

actualDAGResults, err := ts.dagResult.GetByWorkflow(ts.ctx, dag.WorkflowID, ts.DB)
actualDAGResults, err := ts.dagResult.GetByWorkflow(ts.ctx, dag.WorkflowID, "", -1, ts.DB)
require.Nil(ts.T(), err)
requireDeepEqualDAGResults(ts.T(), expectedDAGResults, actualDAGResults)
}
Expand Down