Skip to content

Commit

Permalink
Merge id and name arguments to one field
Browse files Browse the repository at this point in the history
  • Loading branch information
jpurusho65 committed May 16, 2023
1 parent 942dec1 commit 8a4775e
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 89 deletions.
57 changes: 16 additions & 41 deletions sdk/aqueduct/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,29 +491,22 @@ def list_flows(self) -> List[Dict[str, str]]:

def flow(
self,
flow_id: Optional[Union[str, uuid.UUID]] = None,
flow_name: Optional[str] = None,
flow_identifier: Optional[Union[str, uuid.UUID]] = None,
) -> Flow:
"""Fetches a flow corresponding to the given flow id.
Args:
flow_id:
Used to identify the flow to fetch from the system.
Between `flow_id` and `flow_name`, at least one must be provided.
If both are specified, they must correspond to the same flow.
flow_name:
flow_identifier:
Used to identify the flow to fetch from the system.
Use either the flow name or id as identifier to fetch
from the system.
Raises:
InvalidUserArgumentException:
If the provided flow id or name does not correspond to a flow the client knows about.
"""
flows = [(flow.id, flow.name) for flow in globals.__GLOBAL_API_CLIENT__.list_workflows()]
flow_id = find_flow_with_user_supplied_id_and_name(
flows,
flow_id,
flow_name,
)
flow_id = find_flow_with_user_supplied_id_and_name(flows, flow_identifier)

return Flow(
flow_id,
Expand Down Expand Up @@ -774,19 +767,14 @@ def publish_flow(

def trigger(
self,
flow_id: Optional[Union[str, uuid.UUID]] = None,
flow_name: Optional[str] = None,
flow_identifier: Optional[Union[str, uuid.UUID]] = None,
parameters: Optional[Dict[str, Any]] = None,
) -> None:
"""Immediately triggers another run of the provided flow.
Args:
flow_id:
The id of the flow to delete.
Between `flow_id` and `flow_name`, at least one must be provided.
If both are specified, they must correspond to the same flow.
flow_name:
The name of the flow to delete.
flow_identifier:
The uuid or name of the flow to delete.
parameters:
A map containing custom values to use for the designated parameters. The mapping
is expected to be from parameter name to the custom value. These custom values
Expand All @@ -802,7 +790,7 @@ def trigger(
"""
param_specs: Dict[str, ParamSpec] = {}
if parameters is not None:
flow = self.flow(flow_id)
flow = self.flow(flow_identifier)
latest_run = flow.latest()

# NOTE: this is a defense check against triggering runs that haven't run yet.
Expand All @@ -817,18 +805,13 @@ def trigger(
artifact_type = infer_artifact_type(new_val)
param_specs[name] = construct_param_spec(new_val, artifact_type)

flows = [(flow.id, flow.name) for flow in globals.__GLOBAL_API_CLIENT__.list_workflows()]
flow_id = find_flow_with_user_supplied_id_and_name(
flows,
flow_id,
flow_name,
)
flows = [flow_identifier for flow in globals.__GLOBAL_API_CLIENT__.list_workflows()]
flow_id = find_flow_with_user_supplied_id_and_name(flows, flow_identifier)
globals.__GLOBAL_API_CLIENT__.refresh_workflow(flow_id, param_specs)

def delete_flow(
self,
flow_id: Optional[Union[str, uuid.UUID]] = None,
flow_name: Optional[str] = None,
flow_identifier: Optional[Union[str, uuid.UUID]] = None,
saved_objects_to_delete: Optional[
DefaultDict[Union[str, BaseResource], List[SavedObjectUpdate]]
] = None,
Expand All @@ -837,12 +820,8 @@ def delete_flow(
"""Deletes a flow object.
Args:
flow_id:
The id of the flow to delete.
Between `flow_id` and `flow_name`, at least one must be provided.
If both are specified, they must correspond to the same flow.
flow_name:
The name of the flow to delete.
flow_identifier:
The id of the flow to delete. Must be name or uuid
saved_objects_to_delete:
The tables or storage paths to delete grouped by integration name.
force:
Expand All @@ -858,12 +837,8 @@ def delete_flow(
if saved_objects_to_delete is None:
saved_objects_to_delete = defaultdict()

flows = [(flow.id, flow.name) for flow in globals.__GLOBAL_API_CLIENT__.list_workflows()]
flow_id = find_flow_with_user_supplied_id_and_name(
flows,
flow_id,
flow_name,
)
flows = [flow_identifier for flow in globals.__GLOBAL_API_CLIENT__.list_workflows()]
flow_id = find_flow_with_user_supplied_id_and_name(flows, flow_identifier)

# TODO(ENG-410): This method gives no indication as to whether the flow
# was successfully deleted.
Expand Down
26 changes: 10 additions & 16 deletions sdk/aqueduct/tests/flow_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,20 @@


def test_find_flow_with_user_supplied_id_and_name():
flow_1_id = "9740eb10-d77d-4393-a9bc-42862d7008e0"
flow_2_id = "6d9d7b93-028f-48b1-b723-ebd9e66ac867"
flow_3_id = "431841d6-aac5-450f-b395-66e110ba547b"
flow_1_id = uuid.UUID("9740eb10-d77d-4393-a9bc-42862d7008e0")
flow_2_id = uuid.UUID("6d9d7b93-028f-48b1-b723-ebd9e66ac867")
flow_3_id = uuid.UUID("431841d6-aac5-450f-b395-66e110ba547b")

flows = [
(uuid.UUID(flow_1_id), "flow_1"),
(uuid.UUID(flow_2_id), "flow_2"),
(uuid.UUID(flow_3_id), "flow_3"),
(flow_1_id, "flow_1"),
(flow_2_id, "flow_2"),
]

flow_id = find_flow_with_user_supplied_id_and_name(flows, flow_id=flow_1_id)
assert flow_id == flow_1_id
flow_id = find_flow_with_user_supplied_id_and_name(flows, flow_identifier=flow_1_id)
assert flow_id == str(flow_1_id)

flow_id = find_flow_with_user_supplied_id_and_name(flows, flow_name="flow_1")
assert flow_id == flow_1_id

flow_id = find_flow_with_user_supplied_id_and_name(flows, flow_id=flow_1_id, flow_name="flow_1")
assert flow_id == flow_1_id
flow_id = find_flow_with_user_supplied_id_and_name(flows, flow_identifier="flow_2")
assert flow_id == "flow_2"

with pytest.raises(InvalidUserArgumentException):
flow_id = find_flow_with_user_supplied_id_and_name(
flows, flow_id=flow_1_id, flow_name="flow_2"
)
find_flow_with_user_supplied_id_and_name(flows, flow_identifier=flow_3_id)
46 changes: 14 additions & 32 deletions sdk/aqueduct/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,43 +213,25 @@ def generate_engine_config(


def find_flow_with_user_supplied_id_and_name(
flows: List[Tuple[uuid.UUID, str]],
flow_id: Optional[Union[str, uuid.UUID]] = None,
flow_name: Optional[str] = None,
flows: List[Tuple[uuid.UUID, str]], flow_identifier: Optional[Union[str, uuid.UUID]] = None
) -> str:
"""Verifies that the user supplied flow id and name correspond
to an actual flow in `flows`. Only one of `flow_id` and `flow_name` is necessary,
but if both are provided, they must match to the same flow. It returns the
string version of the matching flow's id.
"""Verifies that the user supplied flow_identifier correspond
to an actual flow in `flows`. Must be either uuid or name that
must match to the same flow. It returns the string version of
the matching flow's id.
"""
if not flow_id and not flow_name:
if not flow_identifier:
raise InvalidUserArgumentException(
"Must supply at least one of the following:`flow_id` or `flow_name`"
"Must supply a valid flow identifier, either name or uuid"
)

if flow_id:
flow_id_str = parse_user_supplied_id(flow_id)
flow_id_str = parse_user_supplied_id(flow_identifier)

if isinstance(flow_identifier, uuid.UUID):
if all(uuid.UUID(flow_id_str) != flow[0] for flow in flows):
raise InvalidUserArgumentException("Unable to find a flow with id %s" % flow_id)

if flow_name:
flow_id_str_from_name = None
for flow in flows:
if flow[1] == flow_name:
flow_id_str_from_name = str(flow[0])
break

if not flow_id_str_from_name:
raise InvalidUserArgumentException("Unable to find a flow with name %s" % flow_name)

if flow_id and flow_id_str != flow_id_str_from_name:
# User supplied both flow_id and flow_name, but they do not
# correspond to the same flow
raise InvalidUserArgumentException(
"The flow with id %s does not correspond to the flow with name %s"
% (flow_id, flow_name)
)

return flow_id_str_from_name
raise InvalidUserArgumentException("Unable to find a flow with id %s" % flow_identifier)
elif isinstance(flow_identifier, str):
if all(flow_id_str != flow[1] for flow in flows):
raise InvalidUserArgumentException("Unable to find a flow with name %s" % flow_id_str)

return flow_id_str

0 comments on commit 8a4775e

Please sign in to comment.