-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Add deferrable mode to AWS glue operators (Job & Crawl) #30948
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f03dc5a
8738c81
5b3e491
6ae0731
02b734b
0c46471
73fdb1f
6e7fc90
c835247
18cab8b
f144ec7
f7d5e2c
3f8367b
a0eab84
0dcd910
759f48f
ba0e528
8dd20fd
51e300e
2e74296
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from typing import Any, AsyncIterator | ||
|
|
||
| from airflow.providers.amazon.aws.hooks.glue import GlueJobHook | ||
| from airflow.triggers.base import BaseTrigger, TriggerEvent | ||
|
|
||
|
|
||
| class GlueJobCompleteTrigger(BaseTrigger): | ||
| """ | ||
| Watches for a glue job, triggers when it finishes | ||
| :param job_name: glue job name | ||
| :param run_id: the ID of the specific run to watch for that job | ||
| :param verbose: whether to print the job's logs in airflow logs or not | ||
| :param aws_conn_id: The Airflow connection used for AWS credentials. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| job_name: str, | ||
| run_id: str, | ||
| verbose: bool, | ||
| aws_conn_id: str, | ||
| ): | ||
| self.job_name = job_name | ||
| self.run_id = run_id | ||
| self.verbose = verbose | ||
| self.aws_conn_id = aws_conn_id | ||
|
|
||
| def serialize(self) -> tuple[str, dict[str, Any]]: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we add a test for this method?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I posted a comment about it here #30928 (comment) Simply testing that variables have the value I set them to is a bit pointless imho. It's not testing the behavior.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm, I see. But that test at least prevents accidentally updating the args like CI will catch if
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what I'm thinking is that no one is going to remove A good test here would be serialize, deserialize, and then assert that all fields of the object are the same, with an exclusion list. |
||
| return ( | ||
| # dynamically generate the fully qualified name of the class | ||
| self.__class__.__module__ + "." + self.__class__.__qualname__, | ||
vandonr-amz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| { | ||
| "job_name": self.job_name, | ||
| "run_id": self.run_id, | ||
| "verbose": str(self.verbose), | ||
| "aws_conn_id": self.aws_conn_id, | ||
| }, | ||
| ) | ||
|
|
||
| async def run(self) -> AsyncIterator[TriggerEvent]: | ||
| hook = GlueJobHook(aws_conn_id=self.aws_conn_id) | ||
| await hook.async_job_completion(self.job_name, self.run_id, self.verbose) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is more of a comment than any actionable item, but I wonder if it is possible to include job completion logic here rather than everything in the hook? Is there a benefit to keeping the completion logic in the hook? This ends up being a very thin trigger otherwise. But I understand that that is sometimes necessary.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just wanted to have the async method next to the non-async, in the hook. If we move the logic here, it'd make sense to move the logic of |
||
| yield TriggerEvent({"status": "success", "message": "Job done"}) | ||
Uh oh!
There was an error while loading. Please reload this page.