Skip to content

Commit 92245da

Browse files
committed
Do not schedule execution of save operator if other computer operators fail in a DAG
Add unit test case Do not schedule execution of save operator if other computer operators fail in a DAG
1 parent cc66b07 commit 92245da

File tree

2 files changed

+70
-3
lines changed

2 files changed

+70
-3
lines changed

integration_tests/sdk/aqueduct_tests/flow_test.py

+38-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
import time
12
import uuid
23
from datetime import datetime, timedelta
34

45
import pandas as pd
56
import pytest
6-
from aqueduct.constants.enums import ExecutionStatus
7+
from aqueduct.constants.enums import ExecutionStatus, LoadUpdateMode
78
from aqueduct.error import InvalidRequestError, InvalidUserArgumentException
89

910
import aqueduct
@@ -616,3 +617,39 @@ def noop():
616617
client.delete_flow(flow_id=flow.id(), flow_name="not a real flow")
617618

618619
client.delete_flow(flow_name=flow.name())
620+
621+
622+
def test_flow_with_failed_compute_operators(
623+
client, flow_name, data_integration, engine, data_validator
624+
):
625+
"""
626+
Test if one or more compute operators fail, then the save/load operator does not succeed also.
627+
"""
628+
629+
@op
630+
def bar(arg):
631+
return 5 / 0
632+
633+
@op
634+
def baz(arg):
635+
time.sleep(10)
636+
return arg
637+
638+
@op
639+
def faz():
640+
return 123
641+
642+
table_name = generate_table_name()
643+
result = data_integration.sql("select * from hotel_reviews limit 5")
644+
test_data = bar.lazy(baz.lazy(result))
645+
save(data_integration, result, name=table_name, update_mode=LoadUpdateMode.REPLACE)
646+
647+
publish_flow_test(
648+
client,
649+
artifacts=[test_data, result],
650+
name=flow_name(),
651+
engine=engine,
652+
expected_statuses=[ExecutionStatus.FAILED],
653+
)
654+
655+
data_validator.check_saved_artifact_data_does_not_exist(result.id())

src/golang/lib/engine/aq_engine.go

+32-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
shared_utils "github.com/aqueducthq/aqueduct/lib/lib_utils"
1818
"github.com/aqueducthq/aqueduct/lib/models"
1919
"github.com/aqueducthq/aqueduct/lib/models/shared"
20+
operator_model "github.com/aqueducthq/aqueduct/lib/models/shared/operator"
2021
"github.com/aqueducthq/aqueduct/lib/models/shared/operator/param"
2122
"github.com/aqueducthq/aqueduct/lib/repos"
2223
"github.com/aqueducthq/aqueduct/lib/vault"
@@ -925,6 +926,11 @@ func (eng *aqEngine) execute(
925926

926927
// Kick off execution by starting all operators that don't have any inputs.
927928
for _, op := range dag.Operators() {
929+
log.Infof("Dag Operator %s [%d], Type: %s", op.Name(), len(dag.Operators()), op.Type())
930+
if op.Type() == operator_model.LoadType {
931+
log.Infof("Skipping save operator %s Type: %s", op.Name(), op.Type())
932+
continue
933+
}
928934
if opToDependencyCount[op.ID()] == 0 {
929935
inProgressOps[op.ID()] = op
930936
}
@@ -952,12 +958,17 @@ func (eng *aqEngine) execute(
952958

953959
start := time.Now()
954960

961+
// We defer save operations until all other computer operations are completed successfully.
962+
// This flag tracks whether the save operations are scheduled for execution.
963+
loadOpsDone := false
964+
955965
for len(inProgressOps) > 0 {
956966
if time.Since(start) > timeConfig.ExecTimeout {
957967
return errors.Newf("Reached timeout %s waiting for workflow to complete.", timeConfig.ExecTimeout)
958968
}
959969

960970
for _, op := range inProgressOps {
971+
log.Infof("Operator in progress %s [%d], Type: %s", op.Name(), len(inProgressOps), op.Type())
961972
if op.Dynamic() && !op.GetDynamicProperties().Prepared() {
962973
err = dynamic.PrepareCluster(
963974
ctx,
@@ -1079,26 +1090,45 @@ func (eng *aqEngine) execute(
10791090
}
10801091

10811092
for _, nextOp := range nextOps {
1093+
10821094
// Decrement the active dependency count for every downstream operator.
10831095
// Once this count reaches zero, we can schedule the next operator.
10841096
opToDependencyCount[nextOp.ID()] -= 1
10851097

10861098
if opToDependencyCount[nextOp.ID()] < 0 {
1087-
return errors.Newf("Internal error: operator %s has a negative dependnecy count.", op.Name())
1099+
return errors.Newf("Internal error: operator %s has a negative dependency count.", op.Name())
10881100
}
10891101

10901102
if opToDependencyCount[nextOp.ID()] == 0 {
10911103
// Defensive check: do not reschedule an already in-progress operator. This shouldn't actually
10921104
// matter because we only keep and update a single copy an on operator.
10931105
if _, ok := inProgressOps[nextOp.ID()]; !ok {
1094-
inProgressOps[nextOp.ID()] = nextOp
1106+
// In this pass only pick pending compute operations, and defer the save operations
1107+
// to the end.
1108+
if nextOp.Type() != operator_model.LoadType {
1109+
inProgressOps[nextOp.ID()] = nextOp
1110+
} else {
1111+
log.Infof("Skip load operator %s", nextOp.Name())
1112+
}
10951113
}
10961114
}
10971115
}
10981116
}
10991117

11001118
time.Sleep(timeConfig.OperatorPollInterval)
11011119
}
1120+
// There are no more computer operations to run. Run the save (load) operations to persist
1121+
// artifacts to DB. The save operations are scheduled at the end so data is persisted only if
1122+
// all preceding compute operations are successful.
1123+
if len(inProgressOps) == 0 && !loadOpsDone {
1124+
for _, saveOp := range workflowDag.Operators() {
1125+
if saveOp.Type() == operator_model.LoadType {
1126+
log.Infof("Scheduling load operator %s for execution", saveOp.Name())
1127+
inProgressOps[saveOp.ID()] = saveOp
1128+
}
1129+
}
1130+
loadOpsDone = true
1131+
}
11021132
}
11031133

11041134
if len(completedOps) != len(dag.Operators()) {

0 commit comments

Comments
 (0)