Skip to content

Commit d1a9bf5

Browse files
committed
Merge id and name arguments to one field
1 parent bd626d5 commit d1a9bf5

File tree

6 files changed

+56
-107
lines changed

6 files changed

+56
-107
lines changed

integration_tests/sdk/aqueduct_tests/flow_test.py

+9-11
Original file line numberDiff line numberDiff line change
@@ -571,15 +571,13 @@ def noop():
571571
)
572572

573573
fetched_with_id = client.flow(flow.id())
574-
fetched_with_name = client.flow(flow_name=flow.name())
575-
fetched_with_id_and_name = client.flow(flow_id=flow.id(), flow_name=flow.name())
574+
# fetched_with_name = client.flow(flow_identifier=flow.name())
576575

577-
assert fetched_with_id.id() == fetched_with_name.id()
578-
assert fetched_with_id.id() == fetched_with_id_and_name.id()
576+
# assert fetched_with_id.id() == fetched_with_name.id()
579577

580-
# Failure case: flow id and name do not match
578+
# Failure case: flow name does not match
581579
with pytest.raises(InvalidUserArgumentException):
582-
client.flow(flow_id=flow.id(), flow_name="not a real flow")
580+
client.flow(flow_identifier="not a real flow")
583581

584582

585583
def test_refresh_flow_with_name(client, flow_name, engine):
@@ -598,11 +596,11 @@ def noop():
598596
engine=engine,
599597
)
600598

601-
# Failure case: flow id and name do not match
599+
# Failure case: name does not match
602600
with pytest.raises(InvalidUserArgumentException):
603-
client.trigger(flow_id=flow.id(), flow_name="not a real flow")
601+
client.trigger(flow_identifier="not a real flow")
604602

605-
client.trigger(flow_name=flow.name())
603+
client.trigger(flow_identifier=flow.id())
606604

607605

608606
def test_delete_flow_with_name(client, flow_name, engine):
@@ -623,9 +621,9 @@ def noop():
623621

624622
# Failure case: flow id and name do not match
625623
with pytest.raises(InvalidUserArgumentException):
626-
client.delete_flow(flow_id=flow.id(), flow_name="not a real flow")
624+
client.delete_flow(flow_identifier="not a real flow")
627625

628-
client.delete_flow(flow_name=flow.name())
626+
client.delete_flow(flow_identifier=flow.id())
629627

630628

631629
def test_flow_with_failed_compute_operators(

integration_tests/sdk/data_integration_tests/s3_test.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ def test_s3_fetch_directory_with_delete(flow_manager, data_integration):
327327
# Delete the workflow and include artifact into deletion.
328328
tables = flow.list_saved_objects()
329329
flow_manager._client.delete_flow(
330-
flow_id=str(flow.id()),
330+
flow_identifier=str(flow.id()),
331331
saved_objects_to_delete=tables,
332332
force=True,
333333
)

integration_tests/sdk/shared/flow_helpers.py

+8-8
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ def publish_flow_test(
107107
time.sleep(30)
108108

109109
# Manually trigger a run to match behavior of non-Airflow engines
110-
client.trigger(flow_id=flow.id())
110+
client.trigger(flow_identifier=flow.id())
111111

112112
if should_block:
113113
wait_for_flow_runs(
@@ -168,7 +168,7 @@ def stop_condition(
168168
if all(str(flow_id) != flow_dict["flow_id"] for flow_dict in client.list_flows()):
169169
return False
170170

171-
flow = client.flow(flow_id)
171+
flow = client.flow(flow_identifier=flow_id)
172172

173173
# Last run is prepended to this list. We need to reverse it in order to compare with expected statuses,
174174
# which is sorted in chronologically ascending order.
@@ -227,20 +227,20 @@ def delete_flow(client: aqueduct.Client, workflow_id: uuid.UUID) -> None:
227227
def delete_all_flows(client: aqueduct.Client) -> None:
228228
flows = client.list_flows()
229229
for flow in flows:
230-
flow_name = flow["name"]
230+
flow_id = flow["flow_id"]
231231
try:
232232
client.delete_flow(
233-
flow_name=flow_name,
234-
saved_objects_to_delete=client.flow(flow_name=flow_name).list_saved_objects(),
233+
flow_identifier=flow_id,
234+
saved_objects_to_delete=client.flow(flow_identifier=flow_id).list_saved_objects(),
235235
)
236236
except Exception as e:
237-
print("Error deleting workflow %s with exception: %s" % (flow_name, e))
237+
print("Error deleting workflow %s with exception: %s" % (flow_id, e))
238238
else:
239-
print("Successfully deleted workflow %s" % flow_name)
239+
print("Successfully deleted workflow %s" % flow_id)
240240

241241
# Try deleting Airflow DAG file if it exists
242242
# TODO ENG-2868 - Only do this for Airflow workflows
243-
dag_path = f"{flow_name}_airflow.py"
243+
dag_path = f"{flow_id}_airflow.py"
244244
if os.path.exists(dag_path):
245245
os.remove(dag_path)
246246

sdk/aqueduct/client.py

+14-39
Original file line numberDiff line numberDiff line change
@@ -491,29 +491,22 @@ def list_flows(self) -> List[Dict[str, str]]:
491491

492492
def flow(
493493
self,
494-
flow_id: Optional[Union[str, uuid.UUID]] = None,
495-
flow_name: Optional[str] = None,
494+
flow_identifier: Optional[Union[str, uuid.UUID]] = None,
496495
) -> Flow:
497496
"""Fetches a flow corresponding to the given flow id.
498497
499498
Args:
500-
flow_id:
501-
Used to identify the flow to fetch from the system.
502-
Between `flow_id` and `flow_name`, at least one must be provided.
503-
If both are specified, they must correspond to the same flow.
504-
flow_name:
499+
flow_identifier:
505500
Used to identify the flow to fetch from the system.
501+
Use either the flow name or id as identifier to fetch
502+
from the system.
506503
507504
Raises:
508505
InvalidUserArgumentException:
509506
If the provided flow id or name does not correspond to a flow the client knows about.
510507
"""
511508
flows = [(flow.id, flow.name) for flow in globals.__GLOBAL_API_CLIENT__.list_workflows()]
512-
flow_id = find_flow_with_user_supplied_id_and_name(
513-
flows,
514-
flow_id,
515-
flow_name,
516-
)
509+
flow_id = find_flow_with_user_supplied_id_and_name(flows, flow_identifier)
517510

518511
return Flow(
519512
flow_id,
@@ -774,19 +767,14 @@ def publish_flow(
774767

775768
def trigger(
776769
self,
777-
flow_id: Optional[Union[str, uuid.UUID]] = None,
778-
flow_name: Optional[str] = None,
770+
flow_identifier: Optional[Union[str, uuid.UUID]] = None,
779771
parameters: Optional[Dict[str, Any]] = None,
780772
) -> None:
781773
"""Immediately triggers another run of the provided flow.
782774
783775
Args:
784-
flow_id:
785-
The id of the flow to delete.
786-
Between `flow_id` and `flow_name`, at least one must be provided.
787-
If both are specified, they must correspond to the same flow.
788-
flow_name:
789-
The name of the flow to delete.
776+
flow_identifier:
777+
The uuid or name of the flow to delete.
790778
parameters:
791779
A map containing custom values to use for the designated parameters. The mapping
792780
is expected to be from parameter name to the custom value. These custom values
@@ -802,7 +790,7 @@ def trigger(
802790
"""
803791
param_specs: Dict[str, ParamSpec] = {}
804792
if parameters is not None:
805-
flow = self.flow(flow_id)
793+
flow = self.flow(flow_identifier)
806794
latest_run = flow.latest()
807795

808796
# NOTE: this is a defense check against triggering runs that haven't run yet.
@@ -818,17 +806,12 @@ def trigger(
818806
param_specs[name] = construct_param_spec(new_val, artifact_type)
819807

820808
flows = [(flow.id, flow.name) for flow in globals.__GLOBAL_API_CLIENT__.list_workflows()]
821-
flow_id = find_flow_with_user_supplied_id_and_name(
822-
flows,
823-
flow_id,
824-
flow_name,
825-
)
809+
flow_id = find_flow_with_user_supplied_id_and_name(flows, flow_identifier)
826810
globals.__GLOBAL_API_CLIENT__.refresh_workflow(flow_id, param_specs)
827811

828812
def delete_flow(
829813
self,
830-
flow_id: Optional[Union[str, uuid.UUID]] = None,
831-
flow_name: Optional[str] = None,
814+
flow_identifier: Optional[Union[str, uuid.UUID]] = None,
832815
saved_objects_to_delete: Optional[
833816
DefaultDict[Union[str, BaseResource], List[SavedObjectUpdate]]
834817
] = None,
@@ -837,12 +820,8 @@ def delete_flow(
837820
"""Deletes a flow object.
838821
839822
Args:
840-
flow_id:
841-
The id of the flow to delete.
842-
Between `flow_id` and `flow_name`, at least one must be provided.
843-
If both are specified, they must correspond to the same flow.
844-
flow_name:
845-
The name of the flow to delete.
823+
flow_identifier:
824+
The id of the flow to delete. Must be name or uuid
846825
saved_objects_to_delete:
847826
The tables or storage paths to delete grouped by integration name.
848827
force:
@@ -859,11 +838,7 @@ def delete_flow(
859838
saved_objects_to_delete = defaultdict()
860839

861840
flows = [(flow.id, flow.name) for flow in globals.__GLOBAL_API_CLIENT__.list_workflows()]
862-
flow_id = find_flow_with_user_supplied_id_and_name(
863-
flows,
864-
flow_id,
865-
flow_name,
866-
)
841+
flow_id = find_flow_with_user_supplied_id_and_name(flows, flow_identifier)
867842

868843
# TODO(ENG-410): This method gives no indication as to whether the flow
869844
# was successfully deleted.

sdk/aqueduct/tests/flow_test.py

+10-16
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,20 @@
77

88

99
def test_find_flow_with_user_supplied_id_and_name():
10-
flow_1_id = "9740eb10-d77d-4393-a9bc-42862d7008e0"
11-
flow_2_id = "6d9d7b93-028f-48b1-b723-ebd9e66ac867"
12-
flow_3_id = "431841d6-aac5-450f-b395-66e110ba547b"
10+
flow_1_id = uuid.UUID("9740eb10-d77d-4393-a9bc-42862d7008e0")
11+
flow_2_id = uuid.UUID("6d9d7b93-028f-48b1-b723-ebd9e66ac867")
12+
flow_3_id = uuid.UUID("431841d6-aac5-450f-b395-66e110ba547b")
1313

1414
flows = [
15-
(uuid.UUID(flow_1_id), "flow_1"),
16-
(uuid.UUID(flow_2_id), "flow_2"),
17-
(uuid.UUID(flow_3_id), "flow_3"),
15+
(flow_1_id, "flow_1"),
16+
(flow_2_id, "flow_2"),
1817
]
1918

20-
flow_id = find_flow_with_user_supplied_id_and_name(flows, flow_id=flow_1_id)
21-
assert flow_id == flow_1_id
19+
flow_id = find_flow_with_user_supplied_id_and_name(flows, flow_identifier=flow_1_id)
20+
assert flow_id == str(flow_1_id)
2221

23-
flow_id = find_flow_with_user_supplied_id_and_name(flows, flow_name="flow_1")
24-
assert flow_id == flow_1_id
25-
26-
flow_id = find_flow_with_user_supplied_id_and_name(flows, flow_id=flow_1_id, flow_name="flow_1")
27-
assert flow_id == flow_1_id
22+
flow_id = find_flow_with_user_supplied_id_and_name(flows, flow_identifier="flow_2")
23+
assert flow_id == "flow_2"
2824

2925
with pytest.raises(InvalidUserArgumentException):
30-
flow_id = find_flow_with_user_supplied_id_and_name(
31-
flows, flow_id=flow_1_id, flow_name="flow_2"
32-
)
26+
find_flow_with_user_supplied_id_and_name(flows, flow_identifier=flow_3_id)

sdk/aqueduct/utils/utils.py

+14-32
Original file line numberDiff line numberDiff line change
@@ -213,43 +213,25 @@ def generate_engine_config(
213213

214214

215215
def find_flow_with_user_supplied_id_and_name(
216-
flows: List[Tuple[uuid.UUID, str]],
217-
flow_id: Optional[Union[str, uuid.UUID]] = None,
218-
flow_name: Optional[str] = None,
216+
flows: List[Tuple[uuid.UUID, str]], flow_identifier: Optional[Union[str, uuid.UUID]] = None
219217
) -> str:
220-
"""Verifies that the user supplied flow id and name correspond
221-
to an actual flow in `flows`. Only one of `flow_id` and `flow_name` is necessary,
222-
but if both are provided, they must match to the same flow. It returns the
223-
string version of the matching flow's id.
218+
"""Verifies that the user supplied flow_identifier correspond
219+
to an actual flow in `flows`. Must be either uuid or name that
220+
must match to the same flow. It returns the string version of
221+
the matching flow's id.
224222
"""
225-
if not flow_id and not flow_name:
223+
if not flow_identifier:
226224
raise InvalidUserArgumentException(
227-
"Must supply at least one of the following:`flow_id` or `flow_name`"
225+
"Must supply a valid flow identifier, either name or uuid"
228226
)
229227

230-
if flow_id:
231-
flow_id_str = parse_user_supplied_id(flow_id)
228+
flow_id_str = parse_user_supplied_id(flow_identifier)
229+
230+
if isinstance(flow_identifier, uuid.UUID) or is_string_valid_uuid(flow_identifier):
232231
if all(uuid.UUID(flow_id_str) != flow[0] for flow in flows):
233-
raise InvalidUserArgumentException("Unable to find a flow with id %s" % flow_id)
234-
235-
if flow_name:
236-
flow_id_str_from_name = None
237-
for flow in flows:
238-
if flow[1] == flow_name:
239-
flow_id_str_from_name = str(flow[0])
240-
break
241-
242-
if not flow_id_str_from_name:
243-
raise InvalidUserArgumentException("Unable to find a flow with name %s" % flow_name)
244-
245-
if flow_id and flow_id_str != flow_id_str_from_name:
246-
# User supplied both flow_id and flow_name, but they do not
247-
# correspond to the same flow
248-
raise InvalidUserArgumentException(
249-
"The flow with id %s does not correspond to the flow with name %s"
250-
% (flow_id, flow_name)
251-
)
252-
253-
return flow_id_str_from_name
232+
raise InvalidUserArgumentException("Unable to find a flow with id %s" % flow_identifier)
233+
elif isinstance(flow_identifier, str):
234+
if all(flow_id_str != flow[1] for flow in flows):
235+
raise InvalidUserArgumentException("Unable to find a flow with name %s" % flow_id_str)
254236

255237
return flow_id_str

0 commit comments

Comments
 (0)