Description
Apache Airflow version
3.0.6
If "Other Airflow 2 version" selected, which one?
No response
What happened?
When trying to pull a value from XCom, the error message below shows up in the `airflow-dag-processor` logs. We created a custom notifier that extends `BaseNotifier`. In it, we get the `TaskInstance` with `ti = context['ti']`, collect all the task IDs with `task_ids: list[str] = list(context['dag'].task_dict.keys())`, and pull from XCom with `report_df = ti.xcom_pull(key='report_df', task_ids=task_ids) if ti.xcom_pull(key='report_df', task_ids=task_ids) is not None else ''`.
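For reference, the notifier described above can be sketched roughly as follows. This is a minimal reconstruction, not the reporter's actual code: the class name `ReportNotifier` and the helper `pull_report` are hypothetical, the import path `airflow.sdk.bases.notifier` is assumed for Airflow 3, and the stub `BaseNotifier` exists only so the sketch can run without Airflow installed. Unlike the one-liner above, it calls `xcom_pull` once.

```python
from typing import Any

try:
    # Assumed Airflow 3 import path; adjust to your installation.
    from airflow.sdk.bases.notifier import BaseNotifier
except ImportError:
    # Stand-in so this sketch runs without Airflow installed.
    class BaseNotifier:
        def notify(self, context: dict[str, Any]) -> None:
            raise NotImplementedError


def pull_report(context: dict[str, Any]) -> Any:
    """Pull 'report_df' from XCom for every task in the DAG (hypothetical helper)."""
    ti = context["ti"]
    task_ids: list[str] = list(context["dag"].task_dict.keys())
    # Pull once, then fall back to '' when nothing was pushed.
    report_df = ti.xcom_pull(key="report_df", task_ids=task_ids)
    return report_df if report_df is not None else ""


class ReportNotifier(BaseNotifier):  # hypothetical name
    def notify(self, context: dict[str, Any]) -> None:
        report_df = pull_report(context)
        # ... build and send the notification from report_df here ...
```

The notifier would then be attached with `on_success_callback=ReportNotifier()`, as described under "How to reproduce".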
What you think should happen instead?
It should return the XCom value, or `None` if it doesn't exist.
How to reproduce
Create a custom notifier that extends `BaseNotifier`. Somewhere in the DAG, optionally push a value to XCom. Set `on_success_callback` to your custom notifier and, in its `notify()` function, pull from XCom using the code above. Then look at the `airflow-dag-processor` logs to see the error messages.
Operating System
Debian
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==9.13.0
apache-airflow-providers-common-compat==1.7.3
apache-airflow-providers-common-io==1.6.2
apache-airflow-providers-common-sql==1.27.5
apache-airflow-providers-fab==2.4.2
apache-airflow-providers-google==17.2.0
apache-airflow-providers-http==5.3.4
apache-airflow-providers-microsoft-mssql==4.3.2
apache-airflow-providers-openai==1.6.2
apache-airflow-providers-oracle==4.2.0
apache-airflow-providers-postgres==6.3.0
apache-airflow-providers-sftp==5.4.0
apache-airflow-providers-smtp==2.2.0
apache-airflow-providers-ssh==4.1.3
apache-airflow-providers-standard==1.6.0
Deployment
Other Docker-based deployment
Deployment details
Custom Docker image based on the official Docker images
Anything else?
Airflow DAG Processor log message
2025-09-10 10:31:54 [error ] Unable to decode message [supervisor] body={'key': 'report_df', 'dag_id': 'Oracle-DB-Check', 'run_id': 'manual__2025-09-10T15:31:49.020767+00:00', 'task_id': 'check_dw_prod', 'start': None, 'stop': None, 'step': None, 'include_prior_dates': False, 'type': 'GetXComSequenceSlice'}
2025-09-10T15:31:54.097275837Z ╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
2025-09-10T15:31:54.097279587Z │ /home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py:601 │
2025-09-10T15:31:54.097281128Z │ in handle_requests │
2025-09-10T15:31:54.097282170Z │ │
2025-09-10T15:31:54.097283337Z │ 598 │ │ │ request = yield │
2025-09-10T15:31:54.097284587Z │ 599 │ │ │ │
2025-09-10T15:31:54.097285587Z │ 600 │ │ │ try: │
2025-09-10T15:31:54.097286670Z │ ❱ 601 │ │ │ │ msg = self.decoder.validate_python(request.body) │
2025-09-10T15:31:54.097287837Z │ 602 │ │ │ except Exception: │
2025-09-10T15:31:54.097289003Z │ 603 │ │ │ │ log.exception("Unable to decode message", body=request.body) │
2025-09-10T15:31:54.097290795Z │ 604 │ │ │ │ continue │
2025-09-10T15:31:54.097291837Z │ │
2025-09-10T15:31:54.097292795Z │ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
2025-09-10T15:31:54.097299003Z │ │ log = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, │ │
2025-09-10T15:31:54.097301128Z │ │ context_class=None, initial_values={'logger_name': 'supervisor'}, │ │
2025-09-10T15:31:54.097302795Z │ │ logger_factory_args=())> │ │
2025-09-10T15:31:54.097304087Z │ │ msg = MaskSecret( │ │
2025-09-10T15:31:54.097305378Z │ │ │ value='<secret_value_hidden>', │ │
2025-09-10T15:31:54.097306670Z │ │ │ name='dispatch_api_key', │ │
2025-09-10T15:31:54.097309837Z │ │ │ type='MaskSecret' │ │
2025-09-10T15:31:54.097311295Z │ │ ) │ │
2025-09-10T15:31:54.097312337Z │ │ request = _RequestFrame( │ │
2025-09-10T15:31:54.097313503Z │ │ │ id=1, │ │
2025-09-10T15:31:54.097314628Z │ │ │ body={ │ │
2025-09-10T15:31:54.097315795Z │ │ │ │ 'key': 'report_df', │ │
2025-09-10T15:31:54.097321962Z │ │ │ │ 'dag_id': 'Oracle-DB-Check', │ │
2025-09-10T15:31:54.097323253Z │ │ │ │ 'run_id': 'manual__2025-09-10T15:31:49.020767+00:00', │ │
2025-09-10T15:31:54.097324503Z │ │ │ │ 'task_id': 'check_dw_prod', │ │
2025-09-10T15:31:54.097325795Z │ │ │ │ 'start': None, │ │
2025-09-10T15:31:54.097327087Z │ │ │ │ 'stop': None, │ │
2025-09-10T15:31:54.097328337Z │ │ │ │ 'step': None, │ │
2025-09-10T15:31:54.097329462Z │ │ │ │ 'include_prior_dates': False, │ │
2025-09-10T15:31:54.097331628Z │ │ │ │ 'type': 'GetXComSequenceSlice' │ │
2025-09-10T15:31:54.097332753Z │ │ │ } │ │
2025-09-10T15:31:54.097333837Z │ │ ) │ │
2025-09-10T15:31:54.097334878Z │ │ self = <DagFileProcessorProcess id=UUID('01993441-38d2-7bcc-8bbd-53306533f95a') pid=177> │ │
2025-09-10T15:31:54.097336670Z │ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
2025-09-10T15:31:54.097339170Z │ │
2025-09-10T15:31:54.097340253Z │ /home/airflow/.local/lib/python3.12/site-packages/pydantic/type_adapter.py:421 in │
2025-09-10T15:31:54.097341462Z │ validate_python │
2025-09-10T15:31:54.097342545Z │ │
2025-09-10T15:31:54.097343545Z │ 418 │ │ │ │ code='validate-by-alias-and-name-false', │
2025-09-10T15:31:54.097344837Z │ 419 │ │ │ ) │
2025-09-10T15:31:54.097346045Z │ 420 │ │ │
2025-09-10T15:31:54.097347128Z │ ❱ 421 │ │ return self.validator.validate_python( │
2025-09-10T15:31:54.097348378Z │ 422 │ │ │ object, │
2025-09-10T15:31:54.097349420Z │ 423 │ │ │ strict=strict, │
2025-09-10T15:31:54.097350378Z │ 424 │ │ │ from_attributes=from_attributes, │
2025-09-10T15:31:54.097351420Z │ │
2025-09-10T15:31:54.097352462Z │ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
2025-09-10T15:31:54.097353920Z │ │ by_alias = None │ │
2025-09-10T15:31:54.097355087Z │ │ by_name = None │ │
2025-09-10T15:31:54.097357045Z │ │ context = None │ │
2025-09-10T15:31:54.097358212Z │ │ experimental_allow_partial = False │ │
2025-09-10T15:31:54.097359295Z │ │ from_attributes = None │ │
2025-09-10T15:31:54.097360420Z │ │ object = { │ │
2025-09-10T15:31:54.097361670Z │ │ │ 'key': 'report_df', │ │
2025-09-10T15:31:54.097363545Z │ │ │ 'dag_id': 'Oracle-DB-Check', │ │
2025-09-10T15:31:54.097364837Z │ │ │ 'run_id': 'manual__2025-09-10T15:31:49.020767+00:00', │ │
2025-09-10T15:31:54.097365962Z │ │ │ 'task_id': 'check_dw_prod', │ │
2025-09-10T15:31:54.097367212Z │ │ │ 'start': None, │ │
2025-09-10T15:31:54.097368295Z │ │ │ 'stop': None, │ │
2025-09-10T15:31:54.097369545Z │ │ │ 'step': None, │ │
2025-09-10T15:31:54.097370587Z │ │ │ 'include_prior_dates': False, │ │
2025-09-10T15:31:54.097371753Z │ │ │ 'type': 'GetXComSequenceSlice' │ │
2025-09-10T15:31:54.097373795Z │ │ } │ │
2025-09-10T15:31:54.097374837Z │ │ self = TypeAdapter(Annotated[Union[DagFileParsingResult, │ │
2025-09-10T15:31:54.097376003Z │ │ GetConnection, GetVariable, PutVariable, DeleteVariable, │ │
2025-09-10T15:31:54.097377253Z │ │ GetPrevSuccessfulDagRun, GetPreviousDagRun, MaskSecret], │ │
2025-09-10T15:31:54.097378295Z │ │ FieldInfo(annotation=NoneType, required=True, │ │
2025-09-10T15:31:54.097379712Z │ │ discriminator='type')]) │ │
2025-09-10T15:31:54.097380920Z │ │ strict = None │ │
2025-09-10T15:31:54.097382128Z │ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
2025-09-10T15:31:54.097383295Z ╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
2025-09-10T15:31:54.097384420Z ValidationError: 1 validation error for
2025-09-10T15:31:54.097385337Z tagged-union[DagFileParsingResult,GetConnection,GetVariable,PutVariable,DeleteVariable,GetPrevSucces
2025-09-10T15:31:54.097386462Z sfulDagRun,GetPreviousDagRun,MaskSecret]
2025-09-10T15:31:54.097387295Z Input tag 'GetXComSequenceSlice' found using 'type' does not match any of the expected tags:
2025-09-10T15:31:54.097388378Z 'DagFileParsingResult', 'GetConnection', 'GetVariable', 'PutVariable', 'DeleteVariable',
2025-09-10T15:31:54.097389545Z 'GetPrevSuccessfulDagRun', 'GetPreviousDagRun', 'MaskSecret' [type=union_tag_invalid,
2025-09-10T15:31:54.097391545Z input_value={'key': 'report_df', 'dag... 'GetXComSequenceSlice'}, input_type=dict]
2025-09-10T15:31:54.097392878Z For further information visit https://errors.pydantic.dev/2.11/v/union_tag_invalid
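The traceback indicates that the request is handled by `DagFileProcessorProcess`, whose decoder accepts only the eight message types listed in the `ValidationError`, none of which is an XCom request. The sketch below is a simplified, stdlib-only illustration of that tag check. It is not Airflow's or pydantic's actual code, and `check_tag` is a hypothetical name; the tag list is copied from the log above.

```python
# Message tags the DAG processor's decoder accepts, per the ValidationError above.
DAG_PROCESSOR_TAGS = frozenset({
    "DagFileParsingResult",
    "GetConnection",
    "GetVariable",
    "PutVariable",
    "DeleteVariable",
    "GetPrevSuccessfulDagRun",
    "GetPreviousDagRun",
    "MaskSecret",
})


def check_tag(body: dict) -> str:
    """Return the message tag, or raise if the decoder would not recognize it."""
    tag = body.get("type")
    if tag not in DAG_PROCESSOR_TAGS:
        raise ValueError(f"Input tag {tag!r} does not match any of the expected tags")
    return tag


# The request body from the log is rejected because of its 'type' value.
request_body = {"key": "report_df", "task_id": "check_dw_prod", "type": "GetXComSequenceSlice"}
try:
    check_tag(request_body)
except ValueError as err:
    print(err)
```

If this reading is right, any `xcom_pull` issued from a callback that runs inside the DAG processor would hit the same rejection, regardless of whether the key exists.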
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct