
Conversation

@vandonr-amz
Contributor

aims:

  • reduce the amount of code we need to write when making operators deferrable
  • reduce the code's susceptibility to copy-paste mistakes, both:
    • in the text of the messages logged (I made that mistake already)
    • in the structure of the trigger events, by making them constants, built in one place
  • provide a unified way to pass events, which makes the code less surprising to read, i.e. easier to understand

It doesn't prevent using a custom way of doing things when needed, but it abstracts away the boilerplate code when nothing interesting happens.

I converted the Glue operators to this as a demo. If it's accepted, I can do a round on the existing deferrable implementations to convert them to this.
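For illustration, a rough sketch of the kind of helper these aims describe (illustrative only, not necessarily the PR's exact API; it assumes the usual status/message event payload):

```python
from airflow.triggers.base import TriggerEvent


def success_event(message: str) -> TriggerEvent:
    # one canonical event structure, built in a single place,
    # instead of a hand-written dict in every trigger
    return TriggerEvent({"status": "success", "message": message})


# in a trigger's run(), instead of
#   yield TriggerEvent({"status": "success", "message": "Crawl Complete"})
# one would write
#   yield success_event(self.crawler_name)
```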

boring-cyborg bot added the area:providers and provider:amazon (AWS/Amazon - related issues) labels on Jun 2, 2023
Contributor

@eladkal left a comment

Let's please try not to mix core with providers in the same PR unless we absolutely have to.
It confuses users, since providers and core don't have the same release cycle.

Can you split this into two PRs (where the provider PR depends on the core PR)?

Comment on lines 78 to 75
yield TriggerEvent({"status": "success", "message": "Crawl Complete"})
yield TriggerEvent.success(self.crawler_name)
Contributor

@eladkal Jun 2, 2023

This can be a problem, as the provider needs to be compatible with Airflow >= 2.4.0, but this function will only be added in 2.6.2+.

Contributor Author

OK, I'll split it out of this PR. I see how it's a breaking change, but I believe the Glue crawler deferrable mode has not yet shipped in a provider package, so if this is merged before the next wave, it should be no trouble.
Also, it was a bad implementation (on my part) because it didn't return the same thing as the non-deferred version, so we can see this more as a fix.

I'll do 3 PRs:
 - this fix on Glue
 - the core code change
 - the conversion to the new method

Contributor Author

done in #31694

Contributor

The issue is that you cannot use TriggerEvent.success in provider code, because the provider must be compatible with Airflow 2.4.

Contributor Author

Hmm, right...
So you're saying that I can introduce this, but it's going to be pretty useless, because we won't be able to use it for years, until we say that the minimum supported version is 2.7 🤔
Maybe I could introduce it as an AwsBaseDeferrableOperator or something, so that it'd be packaged the same way?

Member

Yes. To use it before 2.7+ becomes the minimum (which is not multiple years, but 12 months after we release 2.7), there has to be a back-compatible way of using it. Unfortunately, back-compatibility of core classes is important.

I think the code using the "success" event should fall back to a hard-coded version of it.
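A minimal sketch of that fallback, assuming TriggerEvent.success is the classmethod this PR proposes and that it builds the same status/message payload (the wrapper name here is hypothetical):

```python
from airflow.triggers.base import TriggerEvent


def success_event(message: str) -> TriggerEvent:
    # use the proposed classmethod when the installed core provides it...
    if hasattr(TriggerEvent, "success"):
        return TriggerEvent.success(message)
    # ...and fall back to the hard-coded structure on older Airflow versions
    return TriggerEvent({"status": "success", "message": message})
```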

Contributor Author

But it's the same for the default method and the default parameter. If Airflow is on an old version, defer will say it lacks a mandatory parameter, and even if we specify it, we won't find the default method with getattr when we want to call it.
None of this can work, I think? 🤔

Contributor

> But it's the same for the default method and the default parameter. If Airflow is on an old version, defer will say it lacks a mandatory parameter, and even if we specify it, we won't find the default method with getattr when we want to call it.

Welcome to backcompat puzzles, @vandonr-amz. They can be very confusing. In that particular example though, I think you could make method_name optional in BaseOperator.defer, because that change itself is backward compatible, and then in providers you just have to use the old signature until the min Airflow version is >= 2.x (when the signature was changed).
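A small sketch of that idea; the signature below is simplified, not the exact BaseOperator.defer signature:

```python
from __future__ import annotations

from datetime import timedelta


class BaseOperatorSketch:
    # simplified stand-in for BaseOperator.defer: method_name gains a default,
    # which is backward compatible because existing callers already pass it explicitly
    def defer(
        self,
        *,
        trigger,
        method_name: str = "execute_complete",
        kwargs: dict | None = None,
        timeout: timedelta | None = None,
    ) -> None:
        ...


# provider code keeps using the old explicit form until its minimum Airflow
# version is one where the default exists:
#   self.defer(trigger=my_trigger, method_name="execute_complete")
```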

Contributor Author

I understand it's the way to go, but it'd be pretty misleading to have an optional parameter that you have to know to fill in anyway, otherwise you break backcompat 🤔

Member

@pankajkoti left a comment

Nice idea, looks good to me, but I would like to see if there are any reservations about this.

@pankajkoti requested a review from dstandish on June 4, 2023
@dstandish
Contributor

My thoughts on this one...

Maybe we don't need this change. Maybe it's better, in a simple "boring" case such as this, to not go back to the task worker at all, and only (1) print "success" to the logs and (2) send an XCom. Triggers can already print to the logs; we'd just have to solve the XCom part, I think. And maybe with this in mind, Trigger.success would be better used for simply marking the task successful and ending it there, full stop, without having to go back to e.g. a Celery worker.

But if we are going to add a default method, the "default" in execute_complete_default seems unnecessary. Why not make execute_complete the default re-entry point? De facto, it already is. If we're going to add a default re-entry method, that's what I would choose. Either that, or something more agnostic such as trigger_reentry.

@vandonr-amz
Contributor Author

(1) print "success" to the logs and (2) send xcom.

yeah good idea. I'm not well versed in xcom, but I guess "sending xcom" just means writing some value in the DB ? Probably very doable from the triggerer

Why not make execute_complete the default re-entry point

Yeah I started by doing that, but it creates some trouble with mypy because some operators define this method with slightly different arguments
see https://github.com/apache/airflow/actions/runs/5159348955/jobs/9294276991
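For illustration, the kind of override mypy complains about (class and method shapes made up for the example): narrowing or otherwise changing the signature of an inherited re-entry method is reported as incompatible with the supertype.

```python
from __future__ import annotations

from typing import Any


class DeferrableBase:
    # hypothetical base re-entry method with the "standard" signature
    def execute_complete(self, context: dict[str, Any], event: dict[str, Any] | None = None) -> Any:
        ...


class SomeOperator(DeferrableBase):
    # dropping the default and narrowing `event` changes the signature,
    # so mypy reports the override as incompatible with the supertype
    def execute_complete(self, context: dict[str, Any], event: dict[str, Any]) -> None:
        ...
```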

@dstandish
Contributor

dstandish commented Jun 5, 2023

> Yeah, good idea. I'm not well versed in XCom, but I guess "sending an XCom" just means writing some value to the DB? Probably very doable from the triggerer.

The tricky part is that triggers run in asyncio, but interactions with the database (or a custom XCom backend) would probably be non-asyncio, blocking calls. So I think the initial solution would want to do something similar to what we did with logging: run an "XCom listener" in a thread that reads from a queue, and when a trigger exits with an XCom, it just sends it to that queue. That's my thought, anyway.

So it's non-trivial, but doable I think.
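Very roughly, the pattern described above could look something like this; every name here is hypothetical, and none of it is actual Airflow code:

```python
import queue
import threading


def write_xcom_to_db(key: str, value: object) -> None:
    """Placeholder for the blocking write to the metadata DB or a custom XCom backend."""
    print(f"persisting XCom {key}={value!r}")


# a plain thread drains the queue and performs the blocking writes,
# so the asyncio event loop running the triggers is never blocked
xcom_queue: queue.Queue = queue.Queue()


def xcom_listener() -> None:
    while True:
        key, value = xcom_queue.get()
        write_xcom_to_db(key, value)
        xcom_queue.task_done()


threading.Thread(target=xcom_listener, daemon=True, name="xcom-listener").start()

# from async trigger code, handing off an XCom is a non-blocking put:
# xcom_queue.put(("return_value", result))
```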

@dstandish
Contributor

> Why not make execute_complete the default re-entry point

> Yeah, I started by doing that, but it creates some trouble with mypy, because some operators define this method with slightly different arguments; see https://github.com/apache/airflow/actions/runs/5159348955/jobs/9294276991

Ah, I see... My first thought would be: just ignore those complaints in the providers. They are overriding the method, and that's OK. If in the future we want to remove the mypy complaint, we could change the provider method.

Another option, though, if truly nothing interesting is happening, is that we could do something more like this:
https://github.com/apache/airflow/blob/main/airflow/models/taskinstance.py#L1683

Here, at some point we added some logic for the triggerer where, if the trigger failed, we would send the traceback with next_method == 'fail'. It's conceivable we could add something here to handle the boring case, without a method on BaseOperator... but I'm not sure that's a good idea.
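Speculatively, handling the "boring" case at that point could look something like the sketch below. This is not actual Airflow code, just an illustration of routing on next_method without adding a method to BaseOperator; the success sentinel is invented here.

```python
from __future__ import annotations


def resume_after_deferral(task, next_method: str, next_kwargs: dict | None) -> None:
    kwargs = next_kwargs or {}
    if next_method == "fail":  # the existing failure path mentioned above
        raise RuntimeError(kwargs.get("traceback") or "trigger failed")
    if next_method == "success":  # hypothetical sentinel for the boring case
        # log and finish the task without re-entering any operator method
        print(kwargs.get("message", "trigger completed successfully"))
        return
    # normal re-entry: call whichever operator method the trigger requested
    getattr(task, next_method)(**kwargs)
```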

@dstandish
Contributor

@vandonr-amz created an issue for the "exit task from trigger" item here: #31718

@vandonr-amz
Contributor Author

Closing, as back-compat issues are going to make this unusable for quite a while, and having all this code in core with just flags around it saying not to use it yet is bad/dangerous IMO.

Maybe #31718 can be a good alternative.

@vandonr-amz closed this on Jun 6, 2023
@vandonr-amz deleted the vandonr/op branch on June 6, 2023