S3 Key sensor deferrable #31749
Conversation
@syedahsn perhaps you meant to tag @pankajastro :)

There is already a PR I have created here for the same: #31018

@syedahsn, you probably missed my comment in the other PR; see #31018 (comment). Since duplication is the issue, can you do this when the other PR is merged? The other PR also added some DAGs which we can run in system tests once deferrable mode is supported in system tests.
```python
for i in range(len(self.bucket_keys)):
    bucket_key_names.append(
        S3Hook.get_s3_bucket_key(self.bucket_name, self.bucket_keys[i], "bucket_name", "bucket_key")
    )
    bucket_name = bucket_key_names[i][0]
    key = bucket_key_names[i][1]
```
This can use a rewrite with enumerate
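(For illustration, a sketch of the rewrite being suggested; it assumes, as the indexing above implies, that `S3Hook.get_s3_bucket_key` returns a `(bucket_name, key)` pair:)

```python
# enumerate() yields the index and the element together, avoiding
# the range(len(...)) / repeated-indexing pattern.
for i, bucket_key in enumerate(self.bucket_keys):
    bucket_key_names.append(
        S3Hook.get_s3_bucket_key(self.bucket_name, bucket_key, "bucket_name", "bucket_key")
    )
    bucket_name, key = bucket_key_names[i]
```

If the index is only used to read back the element just appended, unpacking the return value directly would remove the indexing altogether.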
```python
if self.check_fn is not None:
    for files in event["files_list"]:
        results.append(self.check_fn(files))
    return all(results)
```
Why build the entire results list and then call all()? Instead you can just do

```python
for f in event["files_list"]:
    if not self.check_fn(f):
        return False
return True
```

or

```python
return all(self.check_fn(f) for f in event["files_list"])
```

```python
    results.append(False)
    continue
# Reduce the set of metadata to size only
files_list.append(list(map(lambda f: {"Size": f["Size"]}, key_matches)))
```
This list-building code can be improved by using iterators
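(One plausible reading of this suggestion, sketched here rather than taken from the PR: a comprehension replaces the `map`/`lambda` pair. If this payload ends up in a serialized trigger event it still needs to be a concrete list, so the comprehension stays eager:)

```python
# Same result as list(map(lambda f: {"Size": f["Size"]}, key_matches)),
# but without the lambda indirection.
files_list.append([{"Size": f["Size"]} for f in key_matches])
```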
```python
wildcard_keys = []
obj = []
bucket_key_names = []
for i in range(len(self.bucket_keys)):
```
This loop looks very much like poke in S3KeySensor. Can they share one single implementation?
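(A hypothetical sketch of one way the two code paths could converge; the helper name `_resolve_bucket_keys` is invented here for illustration and is not part of the PR:)

```python
def _resolve_bucket_keys(self) -> list[tuple[str, str]]:
    """Normalize every configured key into a (bucket_name, key) pair."""
    return [
        S3Hook.get_s3_bucket_key(self.bucket_name, bucket_key, "bucket_name", "bucket_key")
        for bucket_key in self.bucket_keys
    ]
```

Both `poke` and the trigger could then iterate over the pairs returned by this single helper instead of each maintaining its own range-based loop.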
I saw your comment, but I decided to open this PR to get your thoughts (as well as the community's) on the approach I took for this sensor. As I mentioned in the initial PR for this sensor, one concern is the amount of code that is being duplicated for the async case. I think that keeping code duplication to a minimum is very important because it will lead to a code base that is easier to maintain, and less error-prone. I'm willing to wait for the merge of #31018 before addressing the code duplication, but because of the differences in our approaches, it would mean that I would end up removing a lot of the code introduced in #31018. I'll leave it to you to decide whether we should merge the initial PR and I address the code duplication in a follow-up PR, or whether we collaborate now to come up with a suitable solution.
The PR allows the S3KeySensor to be run in deferrable mode. I refactored the existing S3KeySensor to pull out the API calls, leaving a common `process_files` method that is used in both the deferrable and non-deferrable cases. This reduces a lot of the code duplication. Some duplication is unavoidable, like writing `head_object_async` and `get_file_metadata_async`, but where possible I tried to keep code duplication to a minimum. The unit tests for the S3KeySensorTrigger follow the same pattern of testing as the S3KeySensor.
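(For readers unfamiliar with the pattern the description refers to, here is a minimal, self-contained sketch of the deferrable-sensor hand-off; the class and trigger names are illustrative, not the PR's actual code:)

```python
from __future__ import annotations

from typing import Any, AsyncIterator

from airflow.exceptions import AirflowException
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent


class ExampleTrigger(BaseTrigger):
    """Runs in the triggerer; polls asynchronously and fires one event."""

    def serialize(self) -> tuple[str, dict[str, Any]]:
        return ("path.to.ExampleTrigger", {})

    async def run(self) -> AsyncIterator[TriggerEvent]:
        # Real code would await async S3 calls (e.g. head_object_async) here.
        yield TriggerEvent({"status": "success"})


class ExampleDeferrableSensor(BaseSensorOperator):
    """Illustrative only: shows where shared logic like process_files fits."""

    def __init__(self, *, deferrable: bool = False, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.deferrable = deferrable

    def execute(self, context) -> None:
        if not self.deferrable:
            # Classic mode: BaseSensorOperator drives the poke() loop.
            super().execute(context)
        else:
            # Deferrable mode: release the worker slot; the triggerer
            # resumes us at execute_complete once the event fires.
            self.defer(trigger=ExampleTrigger(), method_name="execute_complete")

    def execute_complete(self, context, event: dict[str, Any]) -> None:
        # Shared post-processing (a process_files-style helper) runs here.
        if event.get("status") != "success":
            raise AirflowException("Trigger reported failure")
```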
@sunank200 @ephraimbuddy @pankajkoti
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in newsfragments.