Skip to content

Eng 2735 Add order by and limit parameters for V2 workflow results #1240

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions integration_tests/backend/test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@


class TestBackend:
# V2
GET_WORKFLOWS_TEMPLATE = "/api/v2/workflows"
GET_DAG_RESULTS_TEMPLATE = "/api/v2/workflow/%s/results"

LIST_WORKFLOW_SAVED_OBJECTS_TEMPLATE = "/api/workflow/%s/objects"
GET_TEST_INTEGRATION_TEMPLATE = "/api/integration/%s/test"
Expand Down Expand Up @@ -318,3 +320,72 @@ def test_endpoint_workflows_get(self):
for key in keys:
assert key in v2_workflow
assert v2_workflow["user_id"] == user_id

def test_endpoint_dag_results_get(self):
    """Exercise the V2 DAG results endpoint, including `order_by`, `order_descending` and `limit`."""
    flow_id, n_runs = self.flows["flow_with_metrics_and_checks"]
    resp = self.get_response(self.GET_DAG_RESULTS_TEMPLATE % flow_id).json()

    assert len(resp) == n_runs

    fields = ["id", "dag_id", "exec_state"]

    def check_structure(resp, all_succeeded=False):
        # Every result must expose the expected top-level fields; optionally
        # also require that each run finished without any failure information.
        for result in resp:
            for field in fields:
                assert field in result
            if all_succeeded:
                assert result["exec_state"]["status"] == "succeeded"
                # Identity comparison is the correct way to test for None.
                assert result["exec_state"]["failure_type"] is None
                assert result["exec_state"]["error"] is None

    check_structure(resp, all_succeeded=True)

    # Using the order parameter
    flow_id, n_runs = self.flows["flow_with_failure"]
    resp = self.get_response(
        self.GET_DAG_RESULTS_TEMPLATE % flow_id + "?order_by=status",
    ).json()

    check_structure(resp)
    statuses = [result["exec_state"]["status"] for result in resp]
    sorted_statuses = sorted(statuses, reverse=True)  # Descending order
    assert statuses == sorted_statuses

    # Default is descending
    flow_id, n_runs = self.flows["flow_with_failure"]
    resp = self.get_response(
        self.GET_DAG_RESULTS_TEMPLATE % flow_id + "?order_by=status&order_descending=true",
    ).json()

    check_structure(resp)
    descending_statuses = [result["exec_state"]["status"] for result in resp]
    assert statuses == descending_statuses

    # Ascending works
    flow_id, n_runs = self.flows["flow_with_failure"]
    resp = self.get_response(
        self.GET_DAG_RESULTS_TEMPLATE % flow_id + "?order_by=status&order_descending=false",
    ).json()

    check_structure(resp)
    ascending_statuses = [result["exec_state"]["status"] for result in resp]
    assert descending_statuses[::-1] == ascending_statuses

    # Using the limit parameter
    resp = self.get_response(
        self.GET_DAG_RESULTS_TEMPLATE % flow_id + "?limit=1",
    ).json()

    check_structure(resp)
    assert len(resp) == 1

    # Using both the order and limit parameters
    resp = self.get_response(
        self.GET_DAG_RESULTS_TEMPLATE % flow_id + "?order_by=status&limit=1",
    ).json()

    check_structure(resp)
    workflow_status = [result["exec_state"]["status"] for result in resp]
    assert len(workflow_status) == 1
    workflow_status = workflow_status[0]
    # With a descending sort and limit=1, the single row must be the first sorted status.
    assert workflow_status == sorted_statuses[0]
4 changes: 3 additions & 1 deletion src/golang/cmd/server/handler/get_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ func (h *GetWorkflowHandler) Perform(ctx context.Context, interfaceArgs interfac
dags[dbDAG.ID] = constructedDAG
}

dagResults, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowID, h.Database)
// No ordering and no limit: an empty order_by disables ORDER BY and a limit of -1 disables LIMIT.
// orderDescending is passed as true, but it is ignored when order_by is empty.
dagResults, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowID, "", -1, true, h.Database)
if err != nil {
return emptyResp, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error occurred when retrieving workflow.")
}
Expand Down
4 changes: 3 additions & 1 deletion src/golang/cmd/server/handler/get_workflow_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ func (h *GetWorkflowHistoryHandler) Perform(ctx context.Context, interfaceArgs i
return nil, http.StatusBadRequest, errors.Wrap(err, fmt.Sprintf("Workflow %v does not exist.", args.workflowId))
}

results, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowId, h.Database)
// No ordering and no limit: an empty order_by disables ORDER BY and a limit of -1 disables LIMIT.
// orderDescending is passed as true, but it is ignored when order_by is empty.
results, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowId, "", -1, true, h.Database)
if err != nil && err != database.ErrNoRows() { // Don't return an error if there are just no rows.
return nil, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error while retrieving workflow runs.")
}
Expand Down
39 changes: 35 additions & 4 deletions src/golang/cmd/server/handler/v2/dag_results_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,30 @@ import (
// Method: GET
// Params:
// `workflowId`: ID for `workflow` object
// `dagResultID`: ID for `workflow_dag_result` object
// Request:
// Headers:
// `api-key`: user's API Key
// Parameters:
// `order_by`:
// Optional single column of `workflow_dag_result` to order the results by. Pass the bare column name; the table prefix is added when the query is built.
// `order_descending`:
// Optional boolean specifying whether order_by should be ascending or descending.
// `limit`:
// Optional limit on the number of DAG results returned. Defaults to all of them.
// Response:
// Body:
// serialized `[]response.DAGResult`

// dagResultsGetArgs holds the parsed arguments for a DAG results GET request.
type dagResultsGetArgs struct {
*aq_context.AqContext
workflowID uuid.UUID

// An empty string means that the order is not set.
orderBy string
// Whether the ordering is descending. Default is descending (true).
orderDescending bool
// A negative value for limit (eg. -1) means that the limit is not set.
limit int
}

type DAGResultsGetHandler struct {
Expand All @@ -60,9 +73,27 @@ func (h *DAGResultsGetHandler) Prepare(r *http.Request) (interface{}, int, error
return nil, http.StatusBadRequest, err
}

limit, err := (parser.LimitQueryParser{}).Parse(r)
if err != nil {
return nil, http.StatusBadRequest, err
}

orderBy, err := (parser.OrderByQueryParser{}).Parse(r, models.AllDAGResultCols())
if err != nil {
return nil, http.StatusBadRequest, err
}

descending, err := (parser.OrderDescendingQueryParser{}).Parse(r)
if err != nil {
return nil, http.StatusBadRequest, err
}

return &dagResultsGetArgs{
AqContext: aqContext,
workflowID: workflowID,
AqContext: aqContext,
workflowID: workflowID,
orderBy: orderBy,
orderDescending: descending,
limit: limit,
}, http.StatusOK, nil
}

Expand All @@ -83,7 +114,7 @@ func (h *DAGResultsGetHandler) Perform(ctx context.Context, interfaceArgs interf
return nil, http.StatusBadRequest, errors.Wrap(err, "The organization does not own this workflow.")
}

dbDAGResults, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowID, h.Database)
dbDAGResults, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowID, args.orderBy, args.limit, args.orderDescending, h.Database)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error reading dag results.")
}
Expand Down
25 changes: 25 additions & 0 deletions src/golang/cmd/server/request/parser/query_limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package parser

import (
"net/http"
"strconv"

"github.com/dropbox/godropbox/errors"
)

// LimitQueryParser extracts the optional `limit` query parameter from a request.
type LimitQueryParser struct{}

// Parse returns the integer value of the `limit` query parameter of r.
// When the parameter is absent or empty it returns -1 and a nil error.
// When the parameter is not a valid integer it returns -1 and a wrapped
// conversion error.
func (LimitQueryParser) Parse(r *http.Request) (int, error) {
	rawLimit := r.URL.Query().Get("limit")
	if rawLimit == "" {
		return -1, nil
	}

	parsed, convErr := strconv.Atoi(rawLimit)
	if convErr != nil {
		return -1, errors.Wrap(convErr, "Invalid limit parameter.")
	}

	return parsed, nil
}
32 changes: 32 additions & 0 deletions src/golang/cmd/server/request/parser/query_order_by.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package parser

import (
"net/http"

"github.com/dropbox/godropbox/errors"
)

// OrderByQueryParser extracts and validates the optional `order_by` query
// parameter from a request.
type OrderByQueryParser struct{}

// Parse returns the value of the `order_by` query parameter of r.
// When the parameter is absent or empty it returns "" and a nil error.
// When the value is not one of tableColumns it returns "" and an error;
// the match is exact and case-sensitive.
func (OrderByQueryParser) Parse(r *http.Request, tableColumns []string) (string, error) {
	orderBy := r.URL.Query().Get("order_by")
	if orderBy == "" {
		return "", nil
	}

	for _, column := range tableColumns {
		if column == orderBy {
			return orderBy, nil
		}
	}

	// There is no underlying error to wrap here, so create a fresh one
	// instead of wrapping a nil error.
	return "", errors.New("Invalid order_by value.")
}
31 changes: 31 additions & 0 deletions src/golang/cmd/server/request/parser/query_order_descending.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package parser

import (
"net/http"
"strings"

"github.com/dropbox/godropbox/errors"
)

// OrderDescendingQueryParser extracts the optional `order_descending` query
// parameter from a request.
type OrderDescendingQueryParser struct{}

// Parse returns the boolean value of the `order_descending` query parameter
// of r. The comparison is case-insensitive and only "true" and "false" are
// accepted. When the parameter is absent or empty it returns true (descending)
// and a nil error. Any other value yields true together with an error.
func (OrderDescendingQueryParser) Parse(r *http.Request) (bool, error) {
	raw := r.URL.Query().Get("order_descending")
	if raw == "" {
		return true, nil
	}

	switch strings.ToLower(raw) {
	case "true":
		return true, nil
	case "false":
		return false, nil
	default:
		// There is no underlying error to wrap here, so create a fresh one
		// instead of wrapping a nil error.
		return true, errors.New("Invalid order_descending value.")
	}
}
4 changes: 3 additions & 1 deletion src/golang/lib/airflow/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ func syncWorkflowDag(
return err
}

dagResults, err := dagResultRepo.GetByWorkflow(ctx, dag.WorkflowID, DB)
// No ordering and no limit: an empty order_by disables ORDER BY and a limit of -1 disables LIMIT.
// orderDescending is passed as true, but it is ignored when order_by is empty.
dagResults, err := dagResultRepo.GetByWorkflow(ctx, dag.WorkflowID, "", -1, true, DB)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion src/golang/lib/engine/aq_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,9 @@ func (eng *aqEngine) DeleteWorkflow(
dagIDs = append(dagIDs, dag.ID)
}

dagResultsToDelete, err := eng.DAGResultRepo.GetByWorkflow(ctx, workflowObj.ID, txn)
// No ordering and no limit: an empty order_by disables ORDER BY and a limit of -1 disables LIMIT.
// orderDescending is passed as true, but it is ignored when order_by is empty.
dagResultsToDelete, err := eng.DAGResultRepo.GetByWorkflow(ctx, workflowObj.ID, "", -1, true, txn)
if err != nil {
return errors.Wrap(err, "Unexpected error occurred while retrieving workflow dag results.")
}
Expand Down
6 changes: 3 additions & 3 deletions src/golang/lib/models/dag_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,21 @@ type DAGResult struct {

// DAGResultCols returns a comma-separated string of all DAGResult columns.
func DAGResultCols() string {
return strings.Join(allDAGResultCols(), ",")
return strings.Join(AllDAGResultCols(), ",")
}

// DAGResultColsWithPrefix returns a comma-separated string of all
// DAGResult columns prefixed by the table name.
func DAGResultColsWithPrefix() string {
cols := allDAGResultCols()
cols := AllDAGResultCols()
for i, col := range cols {
cols[i] = fmt.Sprintf("%s.%s", DAGResultTable, col)
}

return strings.Join(cols, ",")
}

func allDAGResultCols() []string {
func AllDAGResultCols() []string {
return []string{
DAGResultID,
DAGResultDagID,
Expand Down
2 changes: 1 addition & 1 deletion src/golang/lib/repos/dag_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type dagResultReader interface {
GetBatch(ctx context.Context, IDs []uuid.UUID, DB database.Database) ([]models.DAGResult, error)

// GetByWorkflow returns the DAGResults of all DAGs associated with the Workflow with workflowID.
GetByWorkflow(ctx context.Context, workflowID uuid.UUID, DB database.Database) ([]models.DAGResult, error)
GetByWorkflow(ctx context.Context, workflowID uuid.UUID, orderBy string, limit int, orderDescending bool, DB database.Database) ([]models.DAGResult, error)

// GetKOffsetByWorkflow returns the DAGResults of all DAGs associated with the Workflow with workflowID
// except for the last k DAGResults ordered by DAGResult.CreatedAt.
Expand Down
23 changes: 21 additions & 2 deletions src/golang/lib/repos/sqlite/dag_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sqlite
import (
"context"
"fmt"
"strconv"
"time"

"github.com/aqueducthq/aqueduct/lib/database"
Expand Down Expand Up @@ -52,13 +53,31 @@ func (*dagResultReader) GetBatch(ctx context.Context, IDs []uuid.UUID, DB databa
return getDAGResults(ctx, DB, query, args...)
}

func (*dagResultReader) GetByWorkflow(ctx context.Context, workflowID uuid.UUID, DB database.Database) ([]models.DAGResult, error) {
func (*dagResultReader) GetByWorkflow(ctx context.Context, workflowID uuid.UUID, orderBy string, limit int, orderDescending bool, DB database.Database) ([]models.DAGResult, error) {
var orderByQuery string
if len(orderBy) > 0 {
orderByQuery = fmt.Sprintf(" ORDER BY %s.%s", models.DAGResultTable, orderBy)
if orderDescending {
orderByQuery = orderByQuery + " DESC"
} else {
orderByQuery = orderByQuery + " ASC"
}
}

var limitQuery string
if limit == 0 {
return []models.DAGResult{}, nil
}
if limit > 0 {
limitQuery = fmt.Sprintf(" LIMIT %s", strconv.Itoa(limit))
}

query := fmt.Sprintf(
`SELECT %s
FROM workflow_dag_result, workflow_dag
WHERE
workflow_dag_result.workflow_dag_id = workflow_dag.id
AND workflow_dag.workflow_id = $1;`,
AND workflow_dag.workflow_id = $1`+orderByQuery+limitQuery+`;`,
models.DAGResultColsWithPrefix(),
)
args := []interface{}{workflowID}
Expand Down
4 changes: 3 additions & 1 deletion src/golang/lib/repos/tests/dag_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ func (ts *TestSuite) TestDAGResult_GetByWorkflow() {

expectedDAGResults := ts.seedDAGResultWithDAG(2, []uuid.UUID{dag.ID, dag.ID})

actualDAGResults, err := ts.dagResult.GetByWorkflow(ts.ctx, dag.WorkflowID, ts.DB)
// No ordering and no limit: an empty order_by disables ORDER BY and a limit of -1 disables LIMIT.
// orderDescending is passed as true, but it is ignored when order_by is empty.
actualDAGResults, err := ts.dagResult.GetByWorkflow(ts.ctx, dag.WorkflowID, "", -1, true, ts.DB)
require.Nil(ts.T(), err)
requireDeepEqualDAGResults(ts.T(), expectedDAGResults, actualDAGResults)
}
Expand Down