Eng 1241 Add ability to fetch an artifact from an existing flow version #157
Conversation
@Fanjia-Yan, this mostly looks good, but I left a few comments and nits inline.
Let's also please make sure to title our pull requests clearly -- GitHub defaults to pulling from the branch name, but that's a little unhelpful/unclear in this case.
Thanks!
integration_tests/sdk/flow_test.py (outdated):

```python
wait_for_flow_runs(client, flow.id(), num_runs=1)
artifact_return = flow.get(get_artifact_name())
assert artifact_return.name == get_artifact_name()
```
Is there any other metadata returned that we can check other than the name of the artifact?
sdk/aqueduct/flow.py (outdated):

```python
print(json.dumps(self.list_runs(), sort_keys=False, indent=4))

def get(self, artifact_name: str) -> Artifact:
```
`get()` feels a little ambiguous to me. Are we getting code, data, metadata? Feels like a generic `get()` method should support all of those. I would suggest doing something specific like either `flow.artifact(name)` or `flow.get_artifact(name)`.
@kenxu95, any thoughts?
I think `flow.artifact(name)` is consistent with how we usually do things.
sdk/aqueduct/flow.py (outdated):

```python
assert (
    len(resp.workflow_dag_results) > 0
), "Every flow must have at least one run attached to it."
latest_result = resp.workflow_dag_results[-1]
```
Are we assuming here that when we have a handle to a `flow`, it always points to the latest version @kenxu95?
I guess I'm really asking how this interacts with versioning -- can we access older versions?
Yeah, @Fanjia-Yan, this method shouldn't be on the flow; it should be on the flow run. A flow itself is actually dag-agnostic right now, since a dag is always associated with a particular run. You can also grab a run with `flow.latest()` or `flow.fetch()`.
So checking the artifact name is fine, but it's incomplete. We should also make sure that calling `artifact.get()` returns the same value as before! An artifact fetched from a flow should be functionally indistinguishable from an artifact that was locally created.

In order for that to work, you'll have to modify the `get_workflow` route to also return the serialized functions corresponding to each dag element.
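The round-trip check described above can be sketched as follows. This is a hedged illustration, not the real SDK: `FakeBackend` and this `Artifact` class are stand-ins for whatever storage and artifact classes Aqueduct actually uses.

```python
# Stand-in for the Aqueduct backend: stores published artifact values
# and serves them back by name.
class FakeBackend:
    def __init__(self):
        self._store = {}

    def publish(self, name, value):
        self._store[name] = value

    def fetch(self, name):
        return self._store[name]


# Stand-in artifact: get() resolves the current value from the backend.
class Artifact:
    def __init__(self, name, backend):
        self.name = name
        self._backend = backend

    def get(self):
        return self._backend.fetch(self.name)


backend = FakeBackend()
backend.publish("predictions", [0.1, 0.9])

local = Artifact("predictions", backend)
fetched = Artifact("predictions", backend)  # as if via flow.artifact(...)

# Name equality alone is incomplete; the resolved values must match too.
assert fetched.name == local.name
assert fetched.get() == local.get() == [0.1, 0.9]
```

The point of the test is exactly this pair of assertions: a fetched artifact and a locally created one should be interchangeable both in identity and in the value `get()` returns.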
Many changes to this PR after last review
This is awesome! I have a number of comments but thanks for getting this working.
There are two other things I thought of while reviewing:
- Did you want to rename the generic artifact to `GenericArtifact`? We can probably just do it (here or in a separate PR).
- A returned artifact currently cannot be used to construct a new dag, right? Because the underlying dag objects are different. Could we add a defensive check in there for now, where if you attempt to use a returned artifact in new computation, we'd throw an error? This might be an interesting follow-up :)
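The defensive check suggested above could look roughly like this. It is a sketch using a `from_flow_run` flag of the kind this PR later introduces; `wrap_operator` and the exception type are hypothetical names, not the real decorator machinery.

```python
class Artifact:
    def __init__(self, name, from_flow_run=False):
        self.name = name
        # Marks artifacts that were fetched from an existing flow run.
        self.from_flow_run = from_flow_run


def wrap_operator(*inputs):
    # Reject artifacts fetched from a previous flow run: their
    # underlying dag objects differ from locally created ones, so
    # using them in a new dag would fail in confusing ways later.
    for artifact in inputs:
        if artifact.from_flow_run:
            raise RuntimeError(
                "Artifacts fetched from a flow run cannot be used "
                "to construct a new dag."
            )
    return Artifact("output")


local = Artifact("local_input")
fetched = Artifact("fetched_input", from_flow_run=True)

assert wrap_operator(local).name == "output"
try:
    wrap_operator(fetched)  # fails loudly instead of building a broken dag
except RuntimeError:
    pass
```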
sdk/aqueduct/api_client.py (outdated):

```python
    self.EXPORT_FUNCTION_ROUTE % str(operator.id), self.use_https
)
operator_resp = requests.get(operator_url, headers=headers)
operator.change_file(operator_resp.content)
```
Instead of `change_file()` on the operator, I'd like to keep our current abstraction where the dag class is the only thing that can modify underlying operator and artifact objects. Having fewer places that can modify the dag will make things easier to maintain and lead to fewer bugs.

Would you be able to use `dag.update_operator_spec()` instead? You can probably write a helper to construct the new operator spec from the old one plus the serialized contents.
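A minimal sketch of that abstraction, assuming simplified shapes for the spec and dag (the real `OperatorSpec`/`Dag` classes will differ): the dag owns all mutation via `update_operator_spec()`, and a small helper builds the new spec from the old one plus the serialized function bytes.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class OperatorSpec:
    name: str
    serialized_function: bytes = b""


class Dag:
    def __init__(self, specs):
        self._specs = {spec.name: spec for spec in specs}

    def update_operator_spec(self, name, new_spec):
        # The only entry point that mutates operator state.
        if name not in self._specs:
            raise KeyError(name)
        self._specs[name] = new_spec

    def spec(self, name):
        return self._specs[name]


def with_serialized_function(old_spec, contents):
    # Helper: construct a new spec from the old one + serialized
    # contents, instead of mutating the operator in place.
    return replace(old_spec, serialized_function=contents)


dag = Dag([OperatorSpec(name="predict")])
new_spec = with_serialized_function(dag.spec("predict"), b"\x80bytes")
dag.update_operator_spec("predict", new_spec)
assert dag.spec("predict").serialized_function == b"\x80bytes"
```

Freezing the spec dataclass makes the "construct, don't mutate" rule hard to violate by accident.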
@kenxu95 Since `get_workflow` returns a `WorkflowDagResponse` class, the serialized function is inserted into `WorkflowDagResponse` rather than the `dag` class. Therefore I believe `update_operator_spec()` should be under `WorkflowDagResponse`?
The following is the update after the last review
It's getting better! Thanks for fixing the incompatible dag issue. There are still a lot of code quality and abstraction issues here, so I still need to block. I think maybe there was a miscommunication about what abstractions are present in the SDK, especially around how we update the dag.
sdk/aqueduct/api_client.py (outdated):

```python
    self.EXPORT_FUNCTION_ROUTE % str(operator.id), self.use_https
)
operator_resp = requests.get(operator_url, headers=headers)
work_flow_dag.update_operator_spec(operator, operator_resp)
```
OK, this is better, but I was really saying this APIClient layer should be as thin as possible: it should literally just take in a single operator id and return the serialized function for that operator. The double for loop belongs in the caller.

Question: does an operator always correspond to only a single workflow dag? Is there a case where an operator corresponds to multiple workflow dags on the backend? cc @cw75
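The "thin client" shape being asked for might look like the sketch below. The route string comes from the PR description; the class names and the fake transport are assumptions for illustration.

```python
# Route format taken from the PR description.
EXPORT_FUNCTION_ROUTE = "/api/function/%s/export"


class APIClient:
    def __init__(self, transport):
        self._transport = transport  # e.g. a requests-like session

    def export_serialized_function(self, operator_id: str) -> bytes:
        # One operator id in, one payload out: no dag knowledge here.
        return self._transport.get(EXPORT_FUNCTION_ROUTE % operator_id)


class FakeTransport:
    # Stand-in for an HTTP session; echoes the URL it was asked for.
    def get(self, url):
        return ("payload for " + url).encode()


client = APIClient(FakeTransport())

# The caller owns the loop over dag elements, not the client.
payloads = {
    op_id: client.export_serialized_function(op_id)
    for op_id in ["op-1", "op-2"]
}
assert payloads["op-1"] == b"payload for /api/function/op-1/export"
```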
After the last review, several things have been changed. This version looks nicer since several lines of code are taken away from the api_client file :)
Awesome! Thanks for pushing this through 👍 Left a few stylistic/cleanup comments but looks good to me. I think @vsreekanti also needs to unblock.
sdk/aqueduct/flow_run.py (outdated):

```python
artifact_from_dag = flow_run_dag.get_artifacts_by_name(name)

if artifact_from_dag is None:
    raise ArtifactNotFoundException("The artifact name provided does not exist.")
```
I wonder if it might be nicer for the user if we return None when the artifact doesn't exist? Maybe they just want to check for the existence of an artifact, and it's a bit less annoying to check for None than it is to catch an error.
If you add that, could you also put that in the function comment string?
I think that's a good idea!
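Putting the two suggestions together, the agreed behavior might look like this sketch: return `None` for a missing artifact and document that in the docstring. The `FlowRun` class here is a simplified stand-in.

```python
from typing import Optional


class FlowRun:
    def __init__(self, artifacts_by_name):
        self._artifacts = artifacts_by_name

    def artifact(self, name: str) -> Optional[str]:
        """Return the artifact with the given name.

        Returns:
            The matching artifact, or None if no artifact with that
            name exists in this flow run.
        """
        return self._artifacts.get(name)


run = FlowRun({"features": "features-artifact"})
assert run.artifact("features") == "features-artifact"
# Existence check without a try/except:
assert run.artifact("missing") is None
```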
sdk/aqueduct/flow_run.py (outdated):

```python
elif get_artifact_type(artifact_from_dag) is ArtifactType.PARAM:
    return ParamArtifact(self._api_client, self._dag, artifact_from_dag.id, True)

return None
```
Here I actually think we should raise an exception (aka fail very loudly), since this means something went wrong in our system. Is there an Internal error we can throw?
@kenxu95 I think this error is really unlikely to trigger, as an artifact falls into one of the four categories. However, if there is to be an error, I think it is likely to be either "InvalidArtifactError" (which does not exist yet in error.py) or "UnprocessableEntityError" (the exception raised when the Aqueduct system fails to process certain inputs).
Or maybe we can throw the general AqueductError?
Let's throw an `InternalAqueductError`. The reason that this needs to be internal is that it's definitely something that's gone wrong in our system, and is not the user's fault.

This error should never happen, as you pointed out, but it's important in our code to make sure that we cover all assumptions. If any assumption breaks, we should also fail loudly so that the person who broke the assumption can fix the issue. In this case, the assumption is that the artifact is one of four types, but this can easily change (imagine someone adds a new type on the backend but doesn't find this function and doesn't update it).
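That fail-loudly pattern can be sketched as below. `InternalAqueductError` and the `ArtifactType` members are modeled here rather than imported, and the exact four type names are assumptions; the shape of the dispatch is what matters.

```python
from enum import Enum


class InternalAqueductError(Exception):
    """Something went wrong inside the system; not the user's fault."""


class ArtifactType(Enum):
    TABLE = "table"
    NUMERIC = "numeric"
    BOOL = "bool"
    PARAM = "param"


def wrap_artifact(artifact_type: ArtifactType) -> str:
    if artifact_type is ArtifactType.TABLE:
        return "TableArtifact"
    elif artifact_type is ArtifactType.NUMERIC:
        return "NumericArtifact"
    elif artifact_type is ArtifactType.BOOL:
        return "BoolArtifact"
    elif artifact_type is ArtifactType.PARAM:
        return "ParamArtifact"
    # Covers the "can never happen" case loudly instead of silently
    # returning None, so a new backend type that skips this function
    # is caught immediately.
    raise InternalAqueductError(
        "Unexpected artifact type: %s" % artifact_type
    )


assert wrap_artifact(ArtifactType.PARAM) == "ParamArtifact"
```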
Hey @Fanjia-Yan, Kenny knows this codebase much better than me, so if he's signed off on the PR, you should be good to go. 🙂 No need to wait for my approval here.
Describe your changes and why you are making these changes

This PR adds a `get(artifact_name)` feature to the `flow` object. By passing in an artifact name, the function returns the Artifact associated with that name; if none exists, it raises an ArtifactNotFound error.

I have created an integration test: the test creates a workflow and publishes it online. After the workflow has finished, I use `get()` to extract the Artifact given the artifact name. Finally, I check that the returned Artifact object has the same name as the one provided.
Related issue number (if any)
Eng 1241
Checklist before requesting a review
Update
Many changes to this PR after last review
get_workflow()
so that theworkflowresponse
also has serialized function attached to each operator. To get the serialized function, I route "/api/function/%s/export" and request by operator_idflow.get()
intoflow.artifact()
. Inflow.artifact()
, we first fetch theartifact.Artifact
object with matchingartifact_name
and convert it togeneric_artifact.Artifact
so that we can call.get()
.flow.artifact(output_artifact.name())
. And check for name equality and dataframe equality between locally created artifact and fetched from server artifact.Update 2.0
flow.artifact()
toflowrun.artifact()
export_serialized_function()
to handle the getting serialized function into the dag_response rather than putting them all inget_workflow()
operator
is crossing the the abstraction barrier. Therefore, in theapi_client
, I eliminated any operation onoperator
and put them intoworkflow_dag_response
artifact
fromflowrun.arifact()
, I create a boolean parameter for Artifact(TableArtifact etc.)from_flow_run
. The parameter defaults to false but can be set to true when creating the Artifact Object. Then, indecorator.py
, when the operators and artifacts are wrapped, I check whether any input artifactfrom_flow_run
is set to true. If so return a Exception.Update 3.0
After last review, several things have been changed:
dag.py
instead ofapi_client.py
flow.py: construct_flow_run()
instead ofapi_client.py: get_workflow()
_from_flow_run
parameter for InputArtifactThis version looks nicer since serveral lines of codes are taken away from the api_client file :)