Skip to content

Conversation

@mhenc
Copy link
Contributor

@mhenc mhenc commented Aug 24, 2022

Support running multiple standalone DagProcessor each configured to parse dags from different directory.

Changes:

  • add new column 'dag_directory' to DagModel and SerializedDagModel
  • Expose information about the dag_directory used by current DagProcessorManager (to set the value for DagModel/SerializedDagModel)
  • Make sure DagProcessor marks only its own dags are inactive (based on 'dag_directory')
  • Make sure DagProcessor receives Callbacks only for its own dags (based on 'dag_directory')
  • Add additional method for SchedulerJob to mark dags as stale if they were not updated within last 10 minutes (e.g. in case where one of DagProcessor was killed)

Usage:

  • make sure [scheduler]standalone_dag_processor is true
  • run each dag processor with
airflow dag-processor --subdir /files/dags/dags1
airflow dag-processor --subdir /files/dags/dags2

Part of https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation

@boring-cyborg boring-cyborg bot added area:providers area:Scheduler including HA (high availability) scheduler area:serialization kind:documentation provider:microsoft-azure Azure-related issues provider:google Google (including GCP) related issues labels Aug 24, 2022
@mhenc mhenc changed the title Multiple dp Support multiple DagProcessor parsing files from different locations. Aug 24, 2022
@mhenc mhenc force-pushed the multiple_dp branch 6 times, most recently from 9dd1396 to fb3298a Compare August 26, 2022 09:34
@mhenc mhenc closed this Aug 26, 2022
@mhenc mhenc reopened this Aug 26, 2022
@mhenc mhenc marked this pull request as ready for review August 26, 2022 09:38
@mhenc
Copy link
Contributor Author

mhenc commented Aug 26, 2022

Not sure what is the state of releasing 2.4.0 - if we can't fit this PR then it may wait until 2.5.0 I believe.

@mhenc mhenc changed the title Support multiple DagProcessor parsing files from different locations. Support multiple DagProcessors parsing files from different locations. Aug 26, 2022
Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than a global variable (which is what DagProcessorDirectory is, how about:

Add an attribute to the DagFileProcessorProcess constructor (which is passed down from the Manager), and add a dag_directory argument to DagBag.sync_to_db which can get passed down to DAG.bulk_write_to_db and SerializedDagModel.write_dag

@potiuk potiuk removed provider:google Google (including GCP) related issues provider:microsoft-azure Azure-related issues area:providers labels Aug 26, 2022
@mhenc mhenc force-pushed the multiple_dp branch 2 times, most recently from 7c76fec to edab1fc Compare August 29, 2022 12:23
@mhenc mhenc requested a review from ashb September 6, 2022 14:57
@ashb ashb merged commit f878854 into apache:main Sep 6, 2022
@mhenc mhenc deleted the multiple_dp branch September 6, 2022 19:26

# Only applicable if `[scheduler]standalone_dag_processor` is true.
# Time in seconds after which dags, which were not updated by Dag Processor are deactivated.
dag_stale_not_seen_duration = 600
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dag_stale_not_seen_duration --> any suggestion for a better name? this config name isn't easy understand

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eh.. naming things...

Let me be wild on that one:

deactivation_time_for_missing_dags_in_standalone_dag_processor_mode

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another suggestion from @ashb was mark_dag_stale_not_seen_in
Is it better?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of them are awful :)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually yours @potiuk helped me to understand the meaning of that parameter ;p

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Horrible names can also be best :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AIP-43 DAG processor separation AIP-43 area:Scheduler including HA (high availability) scheduler area:serialization kind:documentation type:new-feature Changelog: New Features

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants