Describe the bug
I am using Flytekit v1.15.1.
When I test an entire workflow file that contains conditionals and sub-workflows, passing the right parameters to a Flyte conditional still executes the wrong code block.
This is the Python test code:
```python
def test_ml_workflow(mocker):
    bucket_name = "test-bucket"
    prefix = "test-prefix"
    model_path = ""
    with task_mock(load_pretrained_model) as load_pretrained_model_mocked:
        # collect_data, no cold_start
        ml_workflow(bucket_name=bucket_name, prefix=prefix, model_path=model_path, collect_data=True, cold_start=False)
        print("-" * 30)
```
In the above test, even though `cold_start=False` is passed, Flyte calls the `cold_start_data_collect()` method (see code below).
It actually enters both methods: `data_collect()` and `cold_start_data_collect()`.
If `is_true()` is changed to `is_false()` only for the lowest-level conditional, the error is resolved and only `cold_start_data_collect()` is called, as expected.
Workflow code is below.
```python
import datetime

import joblib
import pandas as pd
from flytekit import workflow, conditional, task, StructuredDataset, LaunchPlan, CronSchedule
from flytekit.core.task import Echo


@task
def get_date_from_datetime() -> datetime.date:
    return datetime.datetime.now().date()


@task
def cold_start_data_collect() -> bool:
    date_result = get_date_from_datetime()
    print("Inside cold_start_data_collect", date_result)
    data = {'col1': [1, 2, 3], 'col2': ['A', 'B', 'C']}
    final_df = pd.DataFrame(data)
    return True


@task
def data_collect() -> bool:
    print("Inside data_collect")
    data = {'col1': [1, 2, 3], 'col2': ['A', 'B', 'C']}
    raw_data = pd.DataFrame(data)
    return True


@task
def preprocess_data() -> bool:
    return True


@task
def model_training() -> bool:
    return True


@task
def load_pretrained_model(model_path: str) -> bool:
    print("Inside load_pretrained_model")
    loaded_model = joblib.load(model_path)
    return True


echo = Echo(name="no_data_collection", inputs={"data_collect": bool})


@workflow
def data_collection_workflow(bucket_name: str, prefix: str, cold_start: bool) -> bool:
    level_3 = (conditional("cold_start?")
               .if_(cold_start.is_true())
               .then(data_collect())
               .else_()
               .then(cold_start_data_collect()))
    return True


@workflow
def old_workflow(collect_data: bool, bucket_name: str, prefix: str, cold_start: bool) -> bool:
    print("Inside old_workflow")
    level_2 = (conditional("cold_start?")
               .if_(collect_data.is_true())
               .then(data_collection_workflow(bucket_name=bucket_name, prefix=prefix, cold_start=cold_start))
               .else_()
               .then(echo(data_collect=False)))
    return True


@workflow
def ml_workflow(bucket_name: str, prefix: str, model_path: str, collect_data: bool = True, cold_start: bool = False):
    level_1 = (
        conditional("collect_data ?")
        .if_(collect_data.is_true())
        .then(old_workflow(
            collect_data=collect_data,
            bucket_name=bucket_name,
            prefix=prefix,
            cold_start=cold_start
        ))
        .else_()
        .then(load_pretrained_model(model_path))
    )


cron_lp = LaunchPlan.get_or_create(
    name="test_workflow_bug_lp",
    workflow=ml_workflow,
    default_inputs={
        "bucket_name": "",
        "prefix": "",
        "model_path": "",
        "collect_data": True,
        "cold_start": False
    }
)

if __name__ == "__main__":
    ml_workflow()
```
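To make the intended branch selection explicit, the nesting of the three conditionals in the workflow code above can be modeled in plain Python. This is only a sketch, not Flytekit code: the helper name `expected_calls` is hypothetical, and the sketch follows the conditions exactly as they are written in the posted workflows, where the `is_true()` branch of the innermost conditional leads to `data_collect()` and the `else_()` branch leads to `cold_start_data_collect()`. The point it illustrates is that exactly one leaf should run for any combination of inputs.

```python
from typing import List


def expected_calls(collect_data: bool, cold_start: bool) -> List[str]:
    """Plain-Python model of the conditional nesting in the posted workflows.

    Hypothetical helper for illustration only; it does not use Flytekit.
    """
    calls: List[str] = []
    if collect_data:  # level_1 in ml_workflow
        if collect_data:  # level_2 in old_workflow
            if cold_start:  # level_3 in data_collection_workflow
                calls.append("data_collect")
            else:
                calls.append("cold_start_data_collect")
        else:
            calls.append("no_data_collection")  # the Echo task
    else:
        calls.append("load_pretrained_model")
    return calls


# Whatever the inputs, a single leaf should be selected.
for collect_data in (True, False):
    for cold_start in (True, False):
        calls = expected_calls(collect_data, cold_start)
        assert len(calls) == 1
        print(f"collect_data={collect_data}, cold_start={cold_start}: {calls}")
```

Under this model, the test inputs `collect_data=True, cold_start=False` select a single leaf, so seeing the print statements of both `data_collect()` and `cold_start_data_collect()` in one run does not match it.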
Expected behavior
For this method, the correct code branch should be called, i.e. `cold_start_data_collect()` when `cold_start=True`.
Additional context to reproduce
No response
Screenshots
I am attaching two screenshots with the outputs.
Are you sure this issue hasn't been raised already?
Have you read the Code of Conduct?