diff --git a/CHANGELOG.txt b/CHANGELOG.txt index 4ab42775891b8..289964bd0417b 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -2090,7 +2090,7 @@ Improvements - [AIRFLOW-5583] Extend the 'DAG Details' page to display the start_date / end_date (#6235) - [AIRFLOW-6250] Ensure on_failure_callback always has a populated context (#6812) - [AIRFLOW-6222] http hook logs response body for any failure (#6779) -- [AIRFLOW-6260] Drive _cmd config option by env var (``AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD`` for example) (#6801) +- [AIRFLOW-6260] Drive _cmd config option by env var (``AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_CMD`` for example) (#6801) - [AIRFLOW-6168] Allow proxy_fix middleware of webserver to be configurable (#6723) - [AIRFLOW-5931] Use os.fork when appropriate to speed up task execution. (#6627) - [AIRFLOW-4145] Allow RBAC roles permissions, ViewMenu to be over-rideable (#4960) diff --git a/UPDATING.md b/UPDATING.md index 04da03d37f892..e68350735d555 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -243,6 +243,23 @@ Smart sensors, an "early access" feature added in Airflow 2, are now deprecated See [Migrating to Deferrable Operators](https://airflow.apache.org/docs/apache-airflow/2.2.4/concepts/smart-sensors.html#migrating-to-deferrable-operators) for details on how to migrate. +### Database configuration moved to new section + +The following configuration options have been moved from `[core]` to the new `[database]` section. However, when reading the new option, the old option will be checked to see if it exists. If it does, a DeprecationWarning will be issued and the old option will be used instead. + +- sql_alchemy_conn +- sql_engine_encoding +- sql_engine_collation_for_ids +- sql_alchemy_pool_enabled +- sql_alchemy_pool_size +- sql_alchemy_max_overflow +- sql_alchemy_pool_recycle +- sql_alchemy_pool_pre_ping +- sql_alchemy_schema +- sql_alchemy_connect_args +- load_default_connections +- max_db_retries + ## Airflow 2.2.3 No breaking changes.
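Note: the fallback described above is handled transparently by the config parser. A minimal sketch (illustrative only, not part of this diff) of what it looks like from user code, assuming an un-migrated airflow.cfg that still sets the option under [core]:

    from airflow.configuration import conf

    # Reads [database] sql_alchemy_conn. If the option is only set under the old
    # [core] section, its value is returned and a DeprecationWarning is emitted,
    # per the deprecated_options mapping added to airflow/configuration.py below.
    sql_alchemy_conn = conf.get("database", "sql_alchemy_conn")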
diff --git a/airflow/cli/commands/info_command.py b/airflow/cli/commands/info_command.py index 97088ca99704e..fc03615210af3 100644 --- a/airflow/cli/commands/info_command.py +++ b/airflow/cli/commands/info_command.py @@ -221,7 +221,7 @@ def get_fullname(o): def _airflow_info(self): executor = configuration.conf.get("core", "executor") sql_alchemy_conn = self.anonymizer.process_url( - configuration.conf.get("core", "SQL_ALCHEMY_CONN", fallback="NOT AVAILABLE") + configuration.conf.get("database", "SQL_ALCHEMY_CONN", fallback="NOT AVAILABLE") ) dags_folder = self.anonymizer.process_path( configuration.conf.get("core", "dags_folder", fallback="NOT AVAILABLE") diff --git a/airflow/cli/commands/standalone_command.py b/airflow/cli/commands/standalone_command.py index 82a082e853f6b..3860942adb056 100644 --- a/airflow/cli/commands/standalone_command.py +++ b/airflow/cli/commands/standalone_command.py @@ -157,7 +157,7 @@ def calculate_env(self): executor_constants.LOCAL_EXECUTOR, executor_constants.SEQUENTIAL_EXECUTOR, ]: - if "sqlite" in conf.get("core", "sql_alchemy_conn"): + if "sqlite" in conf.get("database", "sql_alchemy_conn"): self.print_output("standalone", "Forcing executor to SequentialExecutor") env["AIRFLOW__CORE__EXECUTOR"] = executor_constants.SEQUENTIAL_EXECUTOR else: diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 3acbdcc3d645f..17eca40bc016d 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -60,111 +60,6 @@ type: string example: ~ default: "SequentialExecutor" - - name: sql_alchemy_conn - description: | - The SqlAlchemy connection string to the metadata database. - SqlAlchemy supports many different database engines. - More information here: - http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri - version_added: ~ - type: string - sensitive: true - example: ~ - default: "sqlite:///{AIRFLOW_HOME}/airflow.db" - - name: sql_alchemy_engine_args - description: | - Extra engine specific keyword args passed to SQLAlchemy's create_engine, as a JSON-encoded value - version_added: 2.3.0 - type: string - sensitive: true - example: '{"arg1": True}' - default: ~ - - name: sql_engine_encoding - description: | - The encoding for the databases - version_added: 1.10.1 - type: string - example: ~ - default: "utf-8" - - name: sql_engine_collation_for_ids - description: | - Collation for ``dag_id``, ``task_id``, ``key`` columns in case they have different encoding. - By default this collation is the same as the database collation, however for ``mysql`` and ``mariadb`` - the default is ``utf8mb3_bin`` so that the index sizes of our index keys will not exceed - the maximum size of allowed index when collation is set to ``utf8mb4`` variant - (see https://github.com/apache/airflow/pull/17603#issuecomment-901121618). - version_added: 2.0.0 - type: string - example: ~ - default: ~ - - name: sql_alchemy_pool_enabled - description: | - If SqlAlchemy should pool database connections. - version_added: ~ - type: string - example: ~ - default: "True" - - name: sql_alchemy_pool_size - description: | - The SqlAlchemy pool size is the maximum number of database connections - in the pool. 0 indicates no limit. - version_added: ~ - type: string - example: ~ - default: "5" - - name: sql_alchemy_max_overflow - description: | - The maximum overflow size of the pool. 
- When the number of checked-out connections reaches the size set in pool_size, - additional connections will be returned up to this limit. - When those additional connections are returned to the pool, they are disconnected and discarded. - It follows then that the total number of simultaneous connections the pool will allow - is pool_size + max_overflow, - and the total number of "sleeping" connections the pool will allow is pool_size. - max_overflow can be set to ``-1`` to indicate no overflow limit; - no limit will be placed on the total number of concurrent connections. Defaults to ``10``. - version_added: 1.10.4 - type: string - example: ~ - default: "10" - - name: sql_alchemy_pool_recycle - description: | - The SqlAlchemy pool recycle is the number of seconds a connection - can be idle in the pool before it is invalidated. This config does - not apply to sqlite. If the number of DB connections is ever exceeded, - a lower config value will allow the system to recover faster. - version_added: ~ - type: string - example: ~ - default: "1800" - - name: sql_alchemy_pool_pre_ping - description: | - Check connection at the start of each connection pool checkout. - Typically, this is a simple statement like "SELECT 1". - More information here: - https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic - version_added: 1.10.6 - type: string - example: ~ - default: "True" - - name: sql_alchemy_schema - description: | - The schema to use for the metadata database. - SqlAlchemy supports databases with the concept of multiple schemas. - version_added: 1.10.3 - type: string - example: ~ - default: "" - - name: sql_alchemy_connect_args - description: | - Import path for connect args in SqlAlchemy. Defaults to an empty dict. - This is useful when you want to configure db engine args that SqlAlchemy won't parse - in connection string. - See https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine.params.connect_args - version_added: 1.10.11 - type: string - example: ~ - default: ~ - name: parallelism description: | This defines the maximum number of task instances that can run concurrently in Airflow @@ -212,15 +107,6 @@ type: string example: ~ default: "True" - - name: load_default_connections - description: | - Whether to load the default connections that ship with Airflow. It's good to - get started, but you probably want to set this to ``False`` in a production - environment - version_added: 1.10.10 - type: string - example: ~ - default: "True" - name: plugins_folder description: | Path to the folder containing Airflow plugins @@ -435,15 +321,6 @@ type: boolean example: ~ default: "True" - - name: max_db_retries - description: | - Number of times the code should be retried in case of DB Operational Errors. - Not all transactions will be retried as it can cause undesired state. - Currently it is only used in ``DagFileProcessor.process_file`` to retry ``dagbag.sync_to_db``. - version_added: 2.0.0 - type: integer - example: ~ - default: "3" - name: hide_sensitive_var_conn_fields description: | Hide sensitive Variables or Connection extra json keys from UI and task logs when set to True @@ -480,6 +357,133 @@ example: ~ default: "1024" +- name: database + description: ~ + options: + - name: sql_alchemy_conn + description: | + The SqlAlchemy connection string to the metadata database. + SqlAlchemy supports many different database engines. 
+ More information here: + http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri + version_added: 2.3.0 + type: string + sensitive: true + example: ~ + default: "sqlite:///{AIRFLOW_HOME}/airflow.db" + - name: sql_alchemy_engine_args + description: | + Extra engine specific keyword args passed to SQLAlchemy's create_engine, as a JSON-encoded value + version_added: 2.3.0 + type: string + sensitive: true + example: '{"arg1": True}' + default: ~ + - name: sql_engine_encoding + description: | + The encoding for the databases + version_added: 2.3.0 + type: string + example: ~ + default: "utf-8" + - name: sql_engine_collation_for_ids + description: | + Collation for ``dag_id``, ``task_id``, ``key`` columns in case they have different encoding. + By default this collation is the same as the database collation, however for ``mysql`` and ``mariadb`` + the default is ``utf8mb3_bin`` so that the index sizes of our index keys will not exceed + the maximum size of allowed index when collation is set to ``utf8mb4`` variant + (see https://github.com/apache/airflow/pull/17603#issuecomment-901121618). + version_added: 2.3.0 + type: string + example: ~ + default: ~ + - name: sql_alchemy_pool_enabled + description: | + If SqlAlchemy should pool database connections. + version_added: 2.3.0 + type: string + example: ~ + default: "True" + - name: sql_alchemy_pool_size + description: | + The SqlAlchemy pool size is the maximum number of database connections + in the pool. 0 indicates no limit. + version_added: 2.3.0 + type: string + example: ~ + default: "5" + - name: sql_alchemy_max_overflow + description: | + The maximum overflow size of the pool. + When the number of checked-out connections reaches the size set in pool_size, + additional connections will be returned up to this limit. + When those additional connections are returned to the pool, they are disconnected and discarded. + It follows then that the total number of simultaneous connections the pool will allow + is pool_size + max_overflow, + and the total number of "sleeping" connections the pool will allow is pool_size. + max_overflow can be set to ``-1`` to indicate no overflow limit; + no limit will be placed on the total number of concurrent connections. Defaults to ``10``. + version_added: 2.3.0 + type: string + example: ~ + default: "10" + - name: sql_alchemy_pool_recycle + description: | + The SqlAlchemy pool recycle is the number of seconds a connection + can be idle in the pool before it is invalidated. This config does + not apply to sqlite. If the number of DB connections is ever exceeded, + a lower config value will allow the system to recover faster. + version_added: 2.3.0 + type: string + example: ~ + default: "1800" + - name: sql_alchemy_pool_pre_ping + description: | + Check connection at the start of each connection pool checkout. + Typically, this is a simple statement like "SELECT 1". + More information here: + https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic + version_added: 2.3.0 + type: string + example: ~ + default: "True" + - name: sql_alchemy_schema + description: | + The schema to use for the metadata database. + SqlAlchemy supports databases with the concept of multiple schemas. + version_added: 2.3.0 + type: string + example: ~ + default: "" + - name: sql_alchemy_connect_args + description: | + Import path for connect args in SqlAlchemy. Defaults to an empty dict. 
+ This is useful when you want to configure db engine args that SqlAlchemy won't parse + in connection string. + See https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine.params.connect_args + version_added: 2.3.0 + type: string + example: ~ + default: ~ + - name: load_default_connections + description: | + Whether to load the default connections that ship with Airflow. It's good to + get started, but you probably want to set this to ``False`` in a production + environment + version_added: 2.3.0 + type: string + example: ~ + default: "True" + - name: max_db_retries + description: | + Number of times the code should be retried in case of DB Operational Errors. + Not all transactions will be retried as it can cause undesired state. + Currently it is only used in ``DagFileProcessor.process_file`` to retry ``dagbag.sync_to_db``. + version_added: 2.3.0 + type: integer + example: ~ + default: "3" + - name: logging description: ~ options: diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 706f83608ded5..91c3b6b6017d3 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -53,66 +53,6 @@ default_timezone = utc # full import path to the class when using a custom executor. executor = SequentialExecutor -# The SqlAlchemy connection string to the metadata database. -# SqlAlchemy supports many different database engines. -# More information here: -# http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri -sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/airflow.db - -# Extra engine specific keyword args passed to SQLAlchemy's create_engine, as a JSON-encoded value -# Example: sql_alchemy_engine_args = {{"arg1": True}} -# sql_alchemy_engine_args = - -# The encoding for the databases -sql_engine_encoding = utf-8 - -# Collation for ``dag_id``, ``task_id``, ``key`` columns in case they have different encoding. -# By default this collation is the same as the database collation, however for ``mysql`` and ``mariadb`` -# the default is ``utf8mb3_bin`` so that the index sizes of our index keys will not exceed -# the maximum size of allowed index when collation is set to ``utf8mb4`` variant -# (see https://github.com/apache/airflow/pull/17603#issuecomment-901121618). -# sql_engine_collation_for_ids = - -# If SqlAlchemy should pool database connections. -sql_alchemy_pool_enabled = True - -# The SqlAlchemy pool size is the maximum number of database connections -# in the pool. 0 indicates no limit. -sql_alchemy_pool_size = 5 - -# The maximum overflow size of the pool. -# When the number of checked-out connections reaches the size set in pool_size, -# additional connections will be returned up to this limit. -# When those additional connections are returned to the pool, they are disconnected and discarded. -# It follows then that the total number of simultaneous connections the pool will allow -# is pool_size + max_overflow, -# and the total number of "sleeping" connections the pool will allow is pool_size. -# max_overflow can be set to ``-1`` to indicate no overflow limit; -# no limit will be placed on the total number of concurrent connections. Defaults to ``10``. -sql_alchemy_max_overflow = 10 - -# The SqlAlchemy pool recycle is the number of seconds a connection -# can be idle in the pool before it is invalidated. This config does -# not apply to sqlite. 
If the number of DB connections is ever exceeded, -# a lower config value will allow the system to recover faster. -sql_alchemy_pool_recycle = 1800 - -# Check connection at the start of each connection pool checkout. -# Typically, this is a simple statement like "SELECT 1". -# More information here: -# https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic -sql_alchemy_pool_pre_ping = True - -# The schema to use for the metadata database. -# SqlAlchemy supports databases with the concept of multiple schemas. -sql_alchemy_schema = - -# Import path for connect args in SqlAlchemy. Defaults to an empty dict. -# This is useful when you want to configure db engine args that SqlAlchemy won't parse -# in connection string. -# See https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine.params.connect_args -# sql_alchemy_connect_args = - # This defines the maximum number of task instances that can run concurrently in Airflow # regardless of scheduler count and worker count. Generally, this value is reflective of # the number of task instances with the running state in the metadata database. @@ -140,11 +80,6 @@ max_active_runs_per_dag = 16 # environment load_examples = True -# Whether to load the default connections that ship with Airflow. It's good to -# get started, but you probably want to set this to ``False`` in a production -# environment -load_default_connections = True - # Path to the folder containing Airflow plugins plugins_folder = {AIRFLOW_HOME}/plugins @@ -249,11 +184,6 @@ lazy_load_plugins = True # loaded from module. lazy_discover_providers = True -# Number of times the code should be retried in case of DB Operational Errors. -# Not all transactions will be retried as it can cause undesired state. -# Currently it is only used in ``DagFileProcessor.process_file`` to retry ``dagbag.sync_to_db``. -max_db_retries = 3 - # Hide sensitive Variables or Connection extra json keys from UI and task logs when set to True # # (Connection passwords are always hidden in logs) @@ -273,6 +203,77 @@ default_pool_task_slot_count = 128 # mapped tasks from clogging the scheduler. max_map_length = 1024 +[database] +# The SqlAlchemy connection string to the metadata database. +# SqlAlchemy supports many different database engines. +# More information here: +# http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri +sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/airflow.db + +# Extra engine specific keyword args passed to SQLAlchemy's create_engine, as a JSON-encoded value +# Example: sql_alchemy_engine_args = {{"arg1": True}} +# sql_alchemy_engine_args = + +# The encoding for the databases +sql_engine_encoding = utf-8 + +# Collation for ``dag_id``, ``task_id``, ``key`` columns in case they have different encoding. +# By default this collation is the same as the database collation, however for ``mysql`` and ``mariadb`` +# the default is ``utf8mb3_bin`` so that the index sizes of our index keys will not exceed +# the maximum size of allowed index when collation is set to ``utf8mb4`` variant +# (see https://github.com/apache/airflow/pull/17603#issuecomment-901121618). +# sql_engine_collation_for_ids = + +# If SqlAlchemy should pool database connections. +sql_alchemy_pool_enabled = True + +# The SqlAlchemy pool size is the maximum number of database connections +# in the pool. 0 indicates no limit. +sql_alchemy_pool_size = 5 + +# The maximum overflow size of the pool. 
+# When the number of checked-out connections reaches the size set in pool_size, +# additional connections will be returned up to this limit. +# When those additional connections are returned to the pool, they are disconnected and discarded. +# It follows then that the total number of simultaneous connections the pool will allow +# is pool_size + max_overflow, +# and the total number of "sleeping" connections the pool will allow is pool_size. +# max_overflow can be set to ``-1`` to indicate no overflow limit; +# no limit will be placed on the total number of concurrent connections. Defaults to ``10``. +sql_alchemy_max_overflow = 10 + +# The SqlAlchemy pool recycle is the number of seconds a connection +# can be idle in the pool before it is invalidated. This config does +# not apply to sqlite. If the number of DB connections is ever exceeded, +# a lower config value will allow the system to recover faster. +sql_alchemy_pool_recycle = 1800 + +# Check connection at the start of each connection pool checkout. +# Typically, this is a simple statement like "SELECT 1". +# More information here: +# https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic +sql_alchemy_pool_pre_ping = True + +# The schema to use for the metadata database. +# SqlAlchemy supports databases with the concept of multiple schemas. +sql_alchemy_schema = + +# Import path for connect args in SqlAlchemy. Defaults to an empty dict. +# This is useful when you want to configure db engine args that SqlAlchemy won't parse +# in connection string. +# See https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine.params.connect_args +# sql_alchemy_connect_args = + +# Whether to load the default connections that ship with Airflow. It's good to +# get started, but you probably want to set this to ``False`` in a production +# environment +load_default_connections = True + +# Number of times the code should be retried in case of DB Operational Errors. +# Not all transactions will be retried as it can cause undesired state. +# Currently it is only used in ``DagFileProcessor.process_file`` to retry ``dagbag.sync_to_db``. +max_db_retries = 3 + [logging] # The folder where airflow should store its log files. # This path must be absolute. diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg index 09430e55d74dd..2f9b6fa264b13 100644 --- a/airflow/config_templates/default_test.cfg +++ b/airflow/config_templates/default_test.cfg @@ -33,9 +33,7 @@ unit_test_mode = True dags_folder = {TEST_DAGS_FOLDER} plugins_folder = {TEST_PLUGINS_FOLDER} executor = SequentialExecutor -sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db load_examples = True -load_default_connections = True donot_pickle = True max_active_tasks_per_dag = 16 dags_are_paused_at_creation = False @@ -47,6 +45,10 @@ default_task_retries = 0 # This is a hack, too many tests assume DAGs are already in the DB. 
We need to fix those tests instead store_serialized_dags = False +[database] +sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db +load_default_connections = True + [logging] base_log_folder = {AIRFLOW_HOME}/logs logging_level = INFO diff --git a/airflow/configuration.py b/airflow/configuration.py index d8eb8216ce859..748e975d8330a 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -132,7 +132,7 @@ class AirflowConfigParser(ConfigParser): # These configs can also be fetched from Secrets backend # following the "{section}__{name}__secret" pattern sensitive_config_values = { - ('core', 'sql_alchemy_conn'), + ('database', 'sql_alchemy_conn'), ('core', 'fernet_key'), ('celery', 'broker_url'), ('celery', 'flower_basic_auth'), @@ -188,6 +188,18 @@ class AirflowConfigParser(ConfigParser): ('logging', 'worker_log_server_port'): ('celery', 'worker_log_server_port', '2.2.0'), ('api', 'access_control_allow_origins'): ('api', 'access_control_allow_origin', '2.2.0'), ('api', 'auth_backends'): ('api', 'auth_backend', '2.3'), + ('database', 'sql_alchemy_conn'): ('core', 'sql_alchemy_conn', '2.3'), + ('database', 'sql_engine_encoding'): ('core', 'sql_engine_encoding', '2.3'), + ('database', 'sql_engine_collation_for_ids'): ('core', 'sql_engine_collation_for_ids', '2.3'), + ('database', 'sql_alchemy_pool_enabled'): ('core', 'sql_alchemy_pool_enabled', '2.3'), + ('database', 'sql_alchemy_pool_size'): ('core', 'sql_alchemy_pool_size', '2.3'), + ('database', 'sql_alchemy_max_overflow'): ('core', 'sql_alchemy_max_overflow', '2.3'), + ('database', 'sql_alchemy_pool_recycle'): ('core', 'sql_alchemy_pool_recycle', '2.3'), + ('database', 'sql_alchemy_pool_pre_ping'): ('core', 'sql_alchemy_pool_pre_ping', '2.3'), + ('database', 'sql_alchemy_schema'): ('core', 'sql_alchemy_schema', '2.3'), + ('database', 'sql_alchemy_connect_args'): ('core', 'sql_alchemy_connect_args', '2.3'), + ('database', 'load_default_connections'): ('core', 'load_default_connections', '2.3'), + ('database', 'max_db_retries'): ('core', 'max_db_retries', '2.3'), } # A mapping of old default values that we want to change and warn the user @@ -323,7 +335,7 @@ def _upgrade_auth_backends(self): def _upgrade_postgres_metastore_conn(self): """As of sqlalchemy 1.4, scheme `postgres+psycopg2` must be replaced with `postgresql`""" - section, key = 'core', 'sql_alchemy_conn' + section, key = 'database', 'sql_alchemy_conn' old_value = self.get(section, key) bad_scheme = 'postgres+psycopg2' good_scheme = 'postgresql' @@ -360,7 +372,7 @@ def _validate_config_dependencies(self): 'DebugExecutor', 'SequentialExecutor', ) - is_sqlite = "sqlite" in self.get('core', 'sql_alchemy_conn') + is_sqlite = "sqlite" in self.get('database', 'sql_alchemy_conn') if is_sqlite and is_executor_without_sqlite_support: raise AirflowConfigException(f"error: cannot use sqlite with the {self.get('core', 'executor')}") if is_sqlite: diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index ab781b45d804a..4cb11cff82928 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -395,7 +395,7 @@ def __init__( os.set_blocking(self._direct_scheduler_conn.fileno(), False) self._parallelism = conf.getint('scheduler', 'parsing_processes') - if conf.get('core', 'sql_alchemy_conn').startswith('sqlite') and self._parallelism > 1: + if conf.get('database', 'sql_alchemy_conn').startswith('sqlite') and self._parallelism > 1: self.log.warning( "Because we cannot use more than 1 thread (parsing_processes = " "%d) 
when using sqlite. So we set parallelism to 1.", diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 628207df6b487..60cda34c13344 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -136,7 +136,7 @@ def __init__( self._log = log # Check what SQL backend we use - sql_conn: str = conf.get('core', 'sql_alchemy_conn').lower() + sql_conn: str = conf.get('database', 'sql_alchemy_conn').lower() self.using_sqlite = sql_conn.startswith('sqlite') self.using_mysql = sql_conn.startswith('mysql') # Dag Processor agent - not used in Dag Processor standalone mode. diff --git a/airflow/kubernetes/pod_template_file_examples/dags_in_image_template.yaml b/airflow/kubernetes/pod_template_file_examples/dags_in_image_template.yaml index 103d5cf48cb8f..79a4e470564cc 100644 --- a/airflow/kubernetes/pod_template_file_examples/dags_in_image_template.yaml +++ b/airflow/kubernetes/pod_template_file_examples/dags_in_image_template.yaml @@ -34,7 +34,7 @@ spec: secretKeyRef: name: RELEASE-NAME-fernet-key key: fernet-key - - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN + - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN valueFrom: secretKeyRef: name: RELEASE-NAME-airflow-metadata diff --git a/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml b/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml index cc4614996c760..237f14ced0820 100644 --- a/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml +++ b/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml @@ -34,7 +34,7 @@ spec: secretKeyRef: name: RELEASE-NAME-fernet-key key: fernet-key - - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN + - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN valueFrom: secretKeyRef: name: RELEASE-NAME-airflow-metadata diff --git a/airflow/kubernetes/pod_template_file_examples/git_sync_template.yaml b/airflow/kubernetes/pod_template_file_examples/git_sync_template.yaml index 6d99761a1f4e1..021c113832adf 100644 --- a/airflow/kubernetes/pod_template_file_examples/git_sync_template.yaml +++ b/airflow/kubernetes/pod_template_file_examples/git_sync_template.yaml @@ -55,7 +55,7 @@ spec: secretKeyRef: name: RELEASE-NAME-fernet-key key: fernet-key - - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN + - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN valueFrom: secretKeyRef: name: RELEASE-NAME-airflow-metadata diff --git a/airflow/kubernetes_executor_templates/basic_template.yaml b/airflow/kubernetes_executor_templates/basic_template.yaml index a6eb83f8ad8eb..579857a2443c1 100644 --- a/airflow/kubernetes_executor_templates/basic_template.yaml +++ b/airflow/kubernetes_executor_templates/basic_template.yaml @@ -38,7 +38,7 @@ spec: secretKeyRef: name: airflow-fernet-key key: fernet-key - - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN + - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN valueFrom: secretKeyRef: name: airflow-airflow-metadata diff --git a/airflow/models/base.py b/airflow/models/base.py index c586f2019ca28..478bd904eb83c 100644 --- a/airflow/models/base.py +++ b/airflow/models/base.py @@ -23,7 +23,7 @@ from airflow.configuration import conf -SQL_ALCHEMY_SCHEMA = conf.get("core", "SQL_ALCHEMY_SCHEMA") +SQL_ALCHEMY_SCHEMA = conf.get("database", "SQL_ALCHEMY_SCHEMA") metadata = ( None if not SQL_ALCHEMY_SCHEMA or SQL_ALCHEMY_SCHEMA.isspace() else MetaData(schema=SQL_ALCHEMY_SCHEMA) @@ -35,7 +35,7 @@ def get_id_collation_args(): """Get SQLAlchemy args to use for COLLATION""" - collation = conf.get('core', 'sql_engine_collation_for_ids', fallback=None) + 
collation = conf.get('database', 'sql_engine_collation_for_ids', fallback=None) if collation: return {'collation': collation} else: @@ -49,7 +49,7 @@ def get_id_collation_args(): # # We cannot use session/dialect as at this point we are trying to determine the right connection # parameters, so we use the connection - conn = conf.get('core', 'sql_alchemy_conn', fallback='') + conn = conf.get('database', 'sql_alchemy_conn', fallback='') if conn.startswith('mysql') or conn.startswith("mariadb"): return {'collation': 'utf8mb3_bin'} return {} diff --git a/airflow/settings.py b/airflow/settings.py index b161dd98386ff..86b22fab9311f 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -240,7 +240,7 @@ def configure_vars(): global DAGS_FOLDER global PLUGINS_FOLDER global DONOT_MODIFY_HANDLERS - SQL_ALCHEMY_CONN = conf.get('core', 'SQL_ALCHEMY_CONN') + SQL_ALCHEMY_CONN = conf.get('database', 'SQL_ALCHEMY_CONN') DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER')) PLUGINS_FOLDER = conf.get('core', 'plugins_folder', fallback=os.path.join(AIRFLOW_HOME, 'plugins')) @@ -262,8 +262,8 @@ def configure_orm(disable_connection_pool=False): global Session engine_args = prepare_engine_args(disable_connection_pool) - if conf.has_option('core', 'sql_alchemy_connect_args'): - connect_args = conf.getimport('core', 'sql_alchemy_connect_args') + if conf.has_option('database', 'sql_alchemy_connect_args'): + connect_args = conf.getimport('database', 'sql_alchemy_connect_args') else: connect_args = {} @@ -321,16 +321,18 @@ def prepare_engine_args(disable_connection_pool=False): default_args = default.copy() break - engine_args: dict = conf.getjson('core', 'sql_alchemy_engine_args', fallback=default_args) # type: ignore + engine_args: dict = conf.getjson( + 'database', 'sql_alchemy_engine_args', fallback=default_args + ) # type: ignore - if disable_connection_pool or not conf.getboolean('core', 'SQL_ALCHEMY_POOL_ENABLED'): + if disable_connection_pool or not conf.getboolean('database', 'SQL_ALCHEMY_POOL_ENABLED'): engine_args['poolclass'] = NullPool log.debug("settings.prepare_engine_args(): Using NullPool") elif not SQL_ALCHEMY_CONN.startswith('sqlite'): # Pool size engine args not supported by sqlite. # If no config value is defined for the pool size, select a reasonable value. # 0 means no limit, which could lead to exceeding the Database connection limit. - pool_size = conf.getint('core', 'SQL_ALCHEMY_POOL_SIZE', fallback=5) + pool_size = conf.getint('database', 'SQL_ALCHEMY_POOL_SIZE', fallback=5) # The maximum overflow size of the pool. # When the number of checked-out connections reaches the size set in pool_size, @@ -342,20 +344,20 @@ def prepare_engine_args(disable_connection_pool=False): # max_overflow can be set to -1 to indicate no overflow limit; # no limit will be placed on the total number # of concurrent connections. Defaults to 10. - max_overflow = conf.getint('core', 'SQL_ALCHEMY_MAX_OVERFLOW', fallback=10) + max_overflow = conf.getint('database', 'SQL_ALCHEMY_MAX_OVERFLOW', fallback=10) # The DB server already has a value for wait_timeout (number of seconds after # which an idle sleeping connection should be killed). Since other DBs may # co-exist on the same server, SQLAlchemy should set its # pool_recycle to an equal or smaller value. - pool_recycle = conf.getint('core', 'SQL_ALCHEMY_POOL_RECYCLE', fallback=1800) + pool_recycle = conf.getint('database', 'SQL_ALCHEMY_POOL_RECYCLE', fallback=1800) # Check connection at the start of each connection pool checkout. 
# Typically, this is a simple statement like “SELECT 1”, but may also make use # of some DBAPI-specific method to test the connection for liveness. # More information here: # https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic - pool_pre_ping = conf.getboolean('core', 'SQL_ALCHEMY_POOL_PRE_PING', fallback=True) + pool_pre_ping = conf.getboolean('database', 'SQL_ALCHEMY_POOL_PRE_PING', fallback=True) log.debug( "settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, " @@ -386,7 +388,7 @@ def prepare_engine_args(disable_connection_pool=False): # Allow the user to specify an encoding for their DB otherwise default # to utf-8 so jobs & users with non-latin1 characters can still use us. - engine_args['encoding'] = conf.get('core', 'SQL_ENGINE_ENCODING', fallback='utf-8') + engine_args['encoding'] = conf.get('database', 'SQL_ENGINE_ENCODING', fallback='utf-8') return engine_args diff --git a/airflow/utils/db.py b/airflow/utils/db.py index adbe82c70cde7..4d4732e11de8e 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -641,7 +641,7 @@ def initdb(session: Session = NEW_SESSION): """Initialize Airflow database.""" upgradedb(session=session) - if conf.getboolean('core', 'LOAD_DEFAULT_CONNECTIONS'): + if conf.getboolean('database', 'LOAD_DEFAULT_CONNECTIONS'): create_default_connections(session=session) with create_global_lock(session=session, lock=DBLocks.MIGRATIONS): diff --git a/airflow/utils/retries.py b/airflow/utils/retries.py index c4124c8de8195..89703fc33a25a 100644 --- a/airflow/utils/retries.py +++ b/airflow/utils/retries.py @@ -25,7 +25,7 @@ from airflow.configuration import conf -MAX_DB_RETRIES = conf.getint('core', 'max_db_retries', fallback=3) +MAX_DB_RETRIES = conf.getint('database', 'max_db_retries', fallback=3) def run_with_db_retries(max_retries: int = MAX_DB_RETRIES, logger: Optional[logging.Logger] = None, **kwargs): diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py index 61b32d5d550ba..c240a9445690b 100644 --- a/airflow/utils/sqlalchemy.py +++ b/airflow/utils/sqlalchemy.py @@ -34,7 +34,7 @@ utc = pendulum.tz.timezone('UTC') -using_mysql = conf.get('core', 'sql_alchemy_conn').lower().startswith('mysql') +using_mysql = conf.get('database', 'sql_alchemy_conn').lower().startswith('mysql') class UtcDateTime(TypeDecorator): @@ -100,7 +100,7 @@ class ExtendedJSON(TypeDecorator): def db_supports_json(self): """Checks if the database supports JSON (i.e. 
is NOT MSSQL)""" - return not conf.get("core", "sql_alchemy_conn").startswith("mssql") + return not conf.get("database", "sql_alchemy_conn").startswith("mssql") def load_dialect_impl(self, dialect) -> "TypeEngine": if self.db_supports_json(): diff --git a/airflow/www/app.py b/airflow/www/app.py index 67f1cc6c84c47..21b595e981513 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -77,12 +77,12 @@ def create_app(config=None, testing=False): flask_app.config.from_pyfile(settings.WEBSERVER_CONFIG, silent=True) flask_app.config['APP_NAME'] = conf.get(section="webserver", key="instance_name", fallback="Airflow") flask_app.config['TESTING'] = testing - flask_app.config['SQLALCHEMY_DATABASE_URI'] = conf.get('core', 'SQL_ALCHEMY_CONN') + flask_app.config['SQLALCHEMY_DATABASE_URI'] = conf.get('database', 'SQL_ALCHEMY_CONN') url = make_url(flask_app.config['SQLALCHEMY_DATABASE_URI']) if url.drivername == 'sqlite' and url.database and not url.database.startswith('/'): raise AirflowConfigException( - f'Cannot use relative path: `{conf.get("core", "SQL_ALCHEMY_CONN")}` to connect to sqlite. ' + f'Cannot use relative path: `{conf.get("database", "SQL_ALCHEMY_CONN")}` to connect to sqlite. ' 'Please use absolute path such as `sqlite:////tmp/airflow.db`.' ) diff --git a/chart/templates/_helpers.yaml b/chart/templates/_helpers.yaml index 78e03c4f4fa40..2b49b848ad273 100644 --- a/chart/templates/_helpers.yaml +++ b/chart/templates/_helpers.yaml @@ -44,6 +44,7 @@ If release name contains chart name it will be used as a full name. name: {{ template "fernet_key_secret" . }} key: fernet-key {{- end }} + # Kept for backward compatibility with Airflow <2.3; this option moved to [database] in 2.3 {{- if .Values.enableBuiltInSecretEnvVars.AIRFLOW__CORE__SQL_ALCHEMY_CONN }} - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN valueFrom: @@ -51,6 +52,13 @@ If release name contains chart name it will be used as a full name. name: {{ template "airflow_metadata_secret" . }} key: connection {{- end }} + {{- if .Values.enableBuiltInSecretEnvVars.AIRFLOW__DATABASE__SQL_ALCHEMY_CONN }} + - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN + valueFrom: + secretKeyRef: + name: {{ template "airflow_metadata_secret" .
}} + key: connection + {{- end }} {{- if .Values.enableBuiltInSecretEnvVars.AIRFLOW_CONN_AIRFLOW_DB }} - name: AIRFLOW_CONN_AIRFLOW_DB valueFrom: diff --git a/chart/values.schema.json b/chart/values.schema.json index 5baa5a6f2d0e8..a35b9e109a565 100644 --- a/chart/values.schema.json +++ b/chart/values.schema.json @@ -741,6 +741,11 @@ "type": "boolean", "default": true }, + "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN": { + "description": "Enable ``AIRFLOW__DATABASE__SQL_ALCHEMY_CONN`` variable to be read from the Metadata Secret", + "type": "boolean", + "default": true + }, "AIRFLOW_CONN_AIRFLOW_DB": { "description": "Enable ``AIRFLOW_CONN_AIRFLOW_DB`` variable to be read from the Metadata Secret", "type": "boolean", @@ -2710,7 +2715,7 @@ "x-docsSection": "Common", "default": null, "examples": [ - "from airflow import configuration as conf\n\n# The SQLAlchemy connection string.\nSQLALCHEMY_DATABASE_URI = conf.get('core', 'SQL_ALCHEMY_CONN')\n\n# Flask-WTF flag for CSRF\nCSRF_ENABLED = True" + "from airflow import configuration as conf\n\n# The SQLAlchemy connection string.\nSQLALCHEMY_DATABASE_URI = conf.get('database', 'SQL_ALCHEMY_CONN')\n\n# Flask-WTF flag for CSRF\nCSRF_ENABLED = True" ] }, "service": { diff --git a/chart/values.yaml b/chart/values.yaml index 2f06f98f67151..f9cf985e1e15e 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -245,7 +245,9 @@ secret: [] # in this case disable setting of those variables by setting the relevant configuration to false. enableBuiltInSecretEnvVars: AIRFLOW__CORE__FERNET_KEY: true + # Kept for backward compatibility with Airflow <2.3; this option moved to [database] in 2.3 AIRFLOW__CORE__SQL_ALCHEMY_CONN: true + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: true AIRFLOW_CONN_AIRFLOW_DB: true AIRFLOW__WEBSERVER__SECRET_KEY: true AIRFLOW__CELERY__CELERY_RESULT_BACKEND: true @@ -888,7 +890,7 @@ webserver: # from airflow import configuration as conf # # The SQLAlchemy connection string. - # SQLALCHEMY_DATABASE_URI = conf.get('core', 'SQL_ALCHEMY_CONN') + # SQLALCHEMY_DATABASE_URI = conf.get('database', 'SQL_ALCHEMY_CONN') # # Flask-WTF flag for CSRF # CSRF_ENABLED = True diff --git a/docs/apache-airflow/cli-and-env-variables-ref.rst b/docs/apache-airflow/cli-and-env-variables-ref.rst index d62412c6393ed..5bcaec4afaaa9 100644 --- a/docs/apache-airflow/cli-and-env-variables-ref.rst +++ b/docs/apache-airflow/cli-and-env-variables-ref.rst @@ -61,7 +61,7 @@ Environment Variables This is only supported by the following config options: -* ``sql_alchemy_conn`` in ``[core]`` section +* ``sql_alchemy_conn`` in ``[database]`` section * ``fernet_key`` in ``[core]`` section * ``broker_url`` in ``[celery]`` section * ``flower_basic_auth`` in ``[celery]`` section diff --git a/docs/apache-airflow/howto/set-config.rst b/docs/apache-airflow/howto/set-config.rst index a31885b2da1b5..906e436e4134b 100644 --- a/docs/apache-airflow/howto/set-config.rst +++ b/docs/apache-airflow/howto/set-config.rst @@ -29,21 +29,21 @@ For example, the metadata database connection string can either be set in ``airf .. code-block:: ini - [core] + [database] sql_alchemy_conn = my_conn_string or by creating a corresponding environment variable: .. code-block:: bash - export AIRFLOW__CORE__SQL_ALCHEMY_CONN=my_conn_string + export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=my_conn_string You can also derive the connection string at run time by appending ``_cmd`` to the key like this: ..
code-block:: ini - [core] + [database] sql_alchemy_conn_cmd = bash_command_to_run You can also derive the connection string at run time by appending ``_secret`` to the key like this: .. code-block:: ini - [core] + [database] sql_alchemy_conn_secret = sql_alchemy_conn # You can also add a nested path # example: - # sql_alchemy_conn_secret = core/sql_alchemy_conn + # sql_alchemy_conn_secret = database/sql_alchemy_conn This will retrieve config option from Secret Backends e.g Hashicorp Vault. See :ref:`Secrets Backends` for more details. The following config options support this ``_cmd`` and ``_secret`` version: -* ``sql_alchemy_conn`` in ``[core]`` section +* ``sql_alchemy_conn`` in ``[database]`` section * ``fernet_key`` in ``[core]`` section * ``broker_url`` in ``[celery]`` section * ``flower_basic_auth`` in ``[celery]`` section @@ -76,14 +76,14 @@ the same way the usual config options can. For example: .. code-block:: bash - export AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD=bash_command_to_run + export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_CMD=bash_command_to_run Similarly, ``_secret`` config options can also be set using a corresponding environment variable. For example: .. code-block:: bash - export AIRFLOW__CORE__SQL_ALCHEMY_CONN_SECRET=sql_alchemy_conn + export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_SECRET=sql_alchemy_conn .. note:: The config options must follow the config prefix naming convention defined within the secrets backend. This means that ``sql_alchemy_conn`` is not defined with a connection prefix, but with config prefix. For example it should be named as ``airflow/config/sql_alchemy_conn`` @@ -92,9 +92,9 @@ The idea behind this is to not store passwords on boxes in plain text files. The universal order of precedence for all configuration options is as follows: -#. set as an environment variable (``AIRFLOW__CORE__SQL_ALCHEMY_CONN``) -#. set as a command environment variable (``AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD``) -#. set as a secret environment variable (``AIRFLOW__CORE__SQL_ALCHEMY_CONN_SECRET``) +#. set as an environment variable (``AIRFLOW__DATABASE__SQL_ALCHEMY_CONN``) +#. set as a command environment variable (``AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_CMD``) +#. set as a secret environment variable (``AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_SECRET``) #. set in ``airflow.cfg`` #. command in ``airflow.cfg`` #. secret key in ``airflow.cfg`` diff --git a/docs/apache-airflow/howto/set-up-database.rst b/docs/apache-airflow/howto/set-up-database.rst index e848b741a0f9b..749ad774551fb 100644 --- a/docs/apache-airflow/howto/set-up-database.rst +++ b/docs/apache-airflow/howto/set-up-database.rst @@ -44,18 +44,18 @@ Database URI ------------ Airflow uses SQLAlchemy to connect to the database, which requires you to configure the Database URL. -You can do this in option ``sql_alchemy_conn`` in section ``[core]``. It is also common to configure -this option with ``AIRFLOW__CORE__SQL_ALCHEMY_CONN`` environment variable. +You can do this in option ``sql_alchemy_conn`` in section ``[database]``. It is also common to configure +this option with the ``AIRFLOW__DATABASE__SQL_ALCHEMY_CONN`` environment variable. .. note:: For more information on setting the configuration, see :doc:`/howto/set-config`. -If you want to check the current value, you can use ``airflow config get-value core sql_alchemy_conn`` command as in +If you want to check the current value, you can use ``airflow config get-value database sql_alchemy_conn`` command as in the example below. ..
code-block:: bash - $ airflow config get-value core sql_alchemy_conn + $ airflow config get-value database sql_alchemy_conn sqlite:////tmp/airflow/airflow.db The exact format description is described in the SQLAlchemy documentation, see `Database Urls `__. We will also show you some examples below. @@ -228,7 +228,7 @@ For more information regarding setup of the PostgreSQL connection, see `PostgreS services will close idle connections after some time of inactivity (typically 300 seconds), which results with error ``The error: psycopg2.operationalerror: SSL SYSCALL error: EOF detected``. The ``keepalive`` settings can be changed via ``sql_alchemy_connect_args`` configuration parameter - :doc:`../configurations-ref` in ``[core]`` section. You can configure the args for example in your + :doc:`../configurations-ref` in ``[database]`` section. You can configure the args for example in your local_settings.py and the ``sql_alchemy_connect_args`` should be a full import path to the dictionary that stores the configuration parameters. You can read about `Postgres Keepalives `_. diff --git a/docs/apache-airflow/production-deployment.rst b/docs/apache-airflow/production-deployment.rst index c2076040fb511..c67da9a50c4b6 100644 --- a/docs/apache-airflow/production-deployment.rst +++ b/docs/apache-airflow/production-deployment.rst @@ -93,7 +93,7 @@ e.g. metadata DB, password, etc. You can accomplish this using the format :envva .. code-block:: bash - AIRFLOW__CORE__SQL_ALCHEMY_CONN=my_conn_id + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=my_conn_id AIRFLOW__WEBSERVER__BASE_URL=http://host:port Some configurations such as the Airflow Backend connection URI can be derived from bash commands as well: diff --git a/docs/apache-airflow/start/docker-compose.yaml b/docs/apache-airflow/start/docker-compose.yaml index 02945a034c797..b052ea708f60c 100644 --- a/docs/apache-airflow/start/docker-compose.yaml +++ b/docs/apache-airflow/start/docker-compose.yaml @@ -49,6 +49,8 @@ x-airflow-common: environment: &airflow-common-env AIRFLOW__CORE__EXECUTOR: CeleryExecutor + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow + # For backward compatibility with Airflow <2.3 AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 diff --git a/docs/docker-stack/README.md b/docs/docker-stack/README.md index 96010fed98c2a..430eefb9294fa 100644 --- a/docs/docker-stack/README.md +++ b/docs/docker-stack/README.md @@ -64,7 +64,7 @@ are in default in the ``/opt/airflow/dags`` folder and logs are in the ``/opt/ai The working directory is ``/opt/airflow`` by default. -If no `AIRFLOW__CORE__SQL_ALCHEMY_CONN` variable is set then SQLite database is created in +If no `AIRFLOW__DATABASE__SQL_ALCHEMY_CONN` variable is set then SQLite database is created in ``${AIRFLOW_HOME}/airflow.db``. For example commands that start Airflow see: [Executing commands](https://airflow.apache.org/docs/docker-stack/entrypoint.html#entrypoint-commands). diff --git a/docs/docker-stack/index.rst b/docs/docker-stack/index.rst index db8476f19887a..a6baecbe6171a 100644 --- a/docs/docker-stack/index.rst +++ b/docs/docker-stack/index.rst @@ -83,7 +83,7 @@ are in default in the ``/opt/airflow/dags`` folder and logs are in the ``/opt/ai The working directory is ``/opt/airflow`` by default.
-If no :envvar:`AIRFLOW__CORE__SQL_ALCHEMY_CONN` variable is set then SQLite database is created in +If no :envvar:`AIRFLOW__DATABASE__SQL_ALCHEMY_CONN` variable is set then SQLite database is created in ``${AIRFLOW_HOME}/airflow.db``. For example commands that start Airflow see: :ref:`entrypoint:commands`. diff --git a/docs/helm-chart/airflow-configuration.rst b/docs/helm-chart/airflow-configuration.rst index a703f51591d20..90ccd7d3707c5 100644 --- a/docs/helm-chart/airflow-configuration.rst +++ b/docs/helm-chart/airflow-configuration.rst @@ -42,7 +42,7 @@ configuration prior to installing and deploying the service. .. note:: - The ``AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS`` variable is not used by the Chart. Airflow Helm Chart is + The ``AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS`` variable is not used by the Chart. Airflow Helm Chart is intended to be used as production deployment and loading default connections is not supposed to be handled during Chart installation. The Chart is intended to install and configure the Apache Airflow software and create database structure, but not to fill-in the data which should be managed by the users. diff --git a/docs/helm-chart/production-guide.rst b/docs/helm-chart/production-guide.rst index 573f9ef278996..3ddd68f701582 100644 --- a/docs/helm-chart/production-guide.rst +++ b/docs/helm-chart/production-guide.rst @@ -453,7 +453,7 @@ Here is the full list of secrets that can be disabled and replaced by ``_CMD`` a | Default secret name if secret name not specified | Use a different Kubernetes Secret | Airflow Environment Variable | +=======================================================+==========================================+==================================================+ | ``-airflow-metadata`` | ``.Values.data.metadataSecretName`` | | ``AIRFLOW_CONN_AIRFLOW_DB`` | -| | | | ``AIRFLOW__CORE__SQL_ALCHEMY_CONN`` | +| | | | ``AIRFLOW__DATABASE__SQL_ALCHEMY_CONN`` | +-------------------------------------------------------+------------------------------------------+--------------------------------------------------+ | ``-fernet-key`` | ``.Values.fernetKeySecretName`` | ``AIRFLOW__CORE__FERNET_KEY`` | +-------------------------------------------------------+------------------------------------------+--------------------------------------------------+ diff --git a/scripts/ci/docker-compose/backend-mssql-bullseye.yml b/scripts/ci/docker-compose/backend-mssql-bullseye.yml index 4cf4d94570192..f5953a0bfef7a 100644 --- a/scripts/ci/docker-compose/backend-mssql-bullseye.yml +++ b/scripts/ci/docker-compose/backend-mssql-bullseye.yml @@ -20,7 +20,7 @@ services: airflow: environment: - BACKEND=mssql - - AIRFLOW__CORE__SQL_ALCHEMY_CONN=mssql+pyodbc://sa:Airflow123@mssql:1433/airflow?driver=ODBC+Driver+18+for+SQL+Server&Encrypt=No + - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=mssql+pyodbc://sa:Airflow123@mssql:1433/airflow?driver=ODBC+Driver+18+for+SQL+Server&Encrypt=No - AIRFLOW__CELERY__RESULT_BACKEND=db+mssql+pyodbc://sa:Airflow123@mssql:1433/airflow?driver=ODBC+Driver+18+for+SQL+Server&Encrypt=No - AIRFLOW__CORE__EXECUTOR=LocalExecutor depends_on: diff --git a/scripts/ci/docker-compose/backend-mssql-buster.yml b/scripts/ci/docker-compose/backend-mssql-buster.yml index 7fc5540e467a0..8e06ffde68753 100644 --- a/scripts/ci/docker-compose/backend-mssql-buster.yml +++ b/scripts/ci/docker-compose/backend-mssql-buster.yml @@ -20,7 +20,7 @@ services: airflow: environment: - BACKEND=mssql - - 
AIRFLOW__CORE__SQL_ALCHEMY_CONN=mssql+pyodbc://sa:Airflow123@mssql:1433/airflow?driver=ODBC+Driver+17+for+SQL+Server
+      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=mssql+pyodbc://sa:Airflow123@mssql:1433/airflow?driver=ODBC+Driver+17+for+SQL+Server
       - AIRFLOW__CELERY__RESULT_BACKEND=db+mssql+pyodbc://sa:Airflow123@mssql:1433/airflow?driver=ODBC+Driver+17+for+SQL+Server
       - AIRFLOW__CORE__EXECUTOR=LocalExecutor
     depends_on:
diff --git a/scripts/ci/docker-compose/backend-mysql.yml b/scripts/ci/docker-compose/backend-mysql.yml
index 7bca2235d4d5e..aaef6b8c578e2 100644
--- a/scripts/ci/docker-compose/backend-mysql.yml
+++ b/scripts/ci/docker-compose/backend-mysql.yml
@@ -20,7 +20,7 @@ services:
   airflow:
     environment:
       - BACKEND=mysql
-      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql://root@mysql/airflow?charset=utf8mb4
+      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=mysql://root@mysql/airflow?charset=utf8mb4
       - AIRFLOW__CELERY__RESULT_BACKEND=db+mysql://root@mysql/airflow?charset=utf8mb4
       - AIRFLOW__CORE__EXECUTOR=LocalExecutor
     depends_on:
diff --git a/scripts/ci/docker-compose/backend-postgres.yml b/scripts/ci/docker-compose/backend-postgres.yml
index b0a839ed3fd85..4f7374768fb4a 100644
--- a/scripts/ci/docker-compose/backend-postgres.yml
+++ b/scripts/ci/docker-compose/backend-postgres.yml
@@ -20,7 +20,7 @@ services:
   airflow:
     environment:
       - BACKEND=postgres
-      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://postgres:airflow@postgres/airflow
+      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://postgres:airflow@postgres/airflow
       - AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://postgres:airflow@postgres/airflow
       - AIRFLOW__CORE__EXECUTOR=LocalExecutor
     depends_on:
diff --git a/scripts/ci/docker-compose/backend-sqlite.yml b/scripts/ci/docker-compose/backend-sqlite.yml
index 934e7fc7e0790..947023be46a92 100644
--- a/scripts/ci/docker-compose/backend-sqlite.yml
+++ b/scripts/ci/docker-compose/backend-sqlite.yml
@@ -20,7 +20,7 @@ services:
   airflow:
     environment:
       - BACKEND=sqlite
-      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=${SQLITE_URL}
+      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=${SQLITE_URL}
       - AIRFLOW__CORE__EXECUTOR=SequentialExecutor
     volumes:
       - /dev/urandom:/dev/random  # Required to get non-blocking entropy source
diff --git a/tests/charts/test_airflow_common.py b/tests/charts/test_airflow_common.py
index 460916877b989..8fd1576a9f9d2 100644
--- a/tests/charts/test_airflow_common.py
+++ b/tests/charts/test_airflow_common.py
@@ -224,6 +224,7 @@ def test_should_disable_some_variables(self):
             values={
                 "enableBuiltInSecretEnvVars": {
                     "AIRFLOW__CORE__SQL_ALCHEMY_CONN": False,
+                    "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN": False,
                     "AIRFLOW__WEBSERVER__SECRET_KEY": False,
                     "AIRFLOW__CELERY__RESULT_BACKEND": False,
                     "AIRFLOW__ELASTICSEARCH__HOST": False,
@@ -263,6 +264,7 @@ def test_have_all_variables(self):
         expected_vars = [
             'AIRFLOW__CORE__FERNET_KEY',
             'AIRFLOW__CORE__SQL_ALCHEMY_CONN',
+            'AIRFLOW__DATABASE__SQL_ALCHEMY_CONN',
             'AIRFLOW_CONN_AIRFLOW_DB',
             'AIRFLOW__WEBSERVER__SECRET_KEY',
             'AIRFLOW__CELERY__CELERY_RESULT_BACKEND',
diff --git a/tests/cli/commands/test_info_command.py b/tests/cli/commands/test_info_command.py
index 0f04722265f0f..cd9b5adda574d 100644
--- a/tests/cli/commands/test_info_command.py
+++ b/tests/cli/commands/test_info_command.py
@@ -96,7 +96,7 @@ def unique_items(items):
             ("core", "dags_folder"): "TEST_DAGS_FOLDER",
             ("core", "plugins_folder"): "TEST_PLUGINS_FOLDER",
             ("logging", "base_log_folder"): "TEST_LOG_FOLDER",
-            ('core', 'sql_alchemy_conn'): 'postgresql+psycopg2://postgres:airflow@postgres/airflow',
+            ('database', 'sql_alchemy_conn'): 'postgresql+psycopg2://postgres:airflow@postgres/airflow',
             ('logging', 'remote_logging'): 'True',
             ('logging', 'remote_base_log_folder'): 's3://logs-name',
         }
@@ -144,7 +144,7 @@ def test_tools_info(self):

     @conf_vars(
         {
-            ('core', 'sql_alchemy_conn'): 'postgresql+psycopg2://postgres:airflow@postgres/airflow',
+            ('database', 'sql_alchemy_conn'): 'postgresql+psycopg2://postgres:airflow@postgres/airflow',
         }
     )
     def test_show_info(self):
@@ -157,7 +157,7 @@ def test_show_info(self):

     @conf_vars(
         {
-            ('core', 'sql_alchemy_conn'): 'postgresql+psycopg2://postgres:airflow@postgres/airflow',
+            ('database', 'sql_alchemy_conn'): 'postgresql+psycopg2://postgres:airflow@postgres/airflow',
         }
     )
     def test_show_info_anonymize(self):
@@ -177,7 +177,7 @@ def setup_parser():
 class TestInfoCommandMockHttpx:
     @conf_vars(
         {
-            ('core', 'sql_alchemy_conn'): 'postgresql+psycopg2://postgres:airflow@postgres/airflow',
+            ('database', 'sql_alchemy_conn'): 'postgresql+psycopg2://postgres:airflow@postgres/airflow',
         }
     )
     def test_show_info_anonymize_fileio(self, httpx_mock, setup_parser):
diff --git a/tests/core/test_config_templates.py b/tests/core/test_config_templates.py
index e69f38f75b215..09b24bd4328e6 100644
--- a/tests/core/test_config_templates.py
+++ b/tests/core/test_config_templates.py
@@ -26,6 +26,7 @@
 DEFAULT_AIRFLOW_SECTIONS = [
     'core',
+    'database',
     "logging",
     "metrics",
     'secrets',
@@ -58,6 +59,7 @@
 DEFAULT_TEST_SECTIONS = [
     'core',
+    'database',
     "logging",
     'cli',
     'api',
diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py
index 68efdbbb911f8..839877b8fb658 100644
--- a/tests/core/test_configuration.py
+++ b/tests/core/test_configuration.py
@@ -126,7 +126,7 @@ def test_conf_as_dict_source(self):
         # test display_source
         cfg_dict = conf.as_dict(display_source=True)
         assert cfg_dict['core']['load_examples'][1] == 'airflow.cfg'
-        assert cfg_dict['core']['load_default_connections'][1] == 'airflow.cfg'
+        assert cfg_dict['database']['load_default_connections'][1] == 'airflow.cfg'
         assert cfg_dict['testsection']['testkey'] == ('< hidden >', 'env var')

     def test_conf_as_dict_sensitive(self):
@@ -810,12 +810,12 @@ def test_as_dict_works_without_sensitive_cmds(self):
         )

     def test_as_dict_respects_sensitive_cmds(self):
-        conf_conn = conf['core']['sql_alchemy_conn']
+        conf_conn = conf['database']['sql_alchemy_conn']
         test_conf = copy.deepcopy(conf)
         test_conf.read_string(
             textwrap.dedent(
                 """
-                [core]
+                [database]
                 sql_alchemy_conn_cmd = echo -n my-super-secret-conn
                 """
             )
@@ -824,20 +824,20 @@
         conf_materialize_cmds = test_conf.as_dict(display_sensitive=True, raw=True, include_cmds=True)
         conf_maintain_cmds = test_conf.as_dict(display_sensitive=True, raw=True, include_cmds=False)

-        assert 'sql_alchemy_conn' in conf_materialize_cmds['core']
-        assert 'sql_alchemy_conn_cmd' not in conf_materialize_cmds['core']
+        assert 'sql_alchemy_conn' in conf_materialize_cmds['database']
+        assert 'sql_alchemy_conn_cmd' not in conf_materialize_cmds['database']

-        if conf_conn == test_conf.airflow_defaults['core']['sql_alchemy_conn']:
-            assert conf_materialize_cmds['core']['sql_alchemy_conn'] == 'my-super-secret-conn'
+        if conf_conn == test_conf.airflow_defaults['database']['sql_alchemy_conn']:
+            assert conf_materialize_cmds['database']['sql_alchemy_conn'] == 'my-super-secret-conn'

-        assert 'sql_alchemy_conn_cmd' in conf_maintain_cmds['core']
-        assert conf_maintain_cmds['core']['sql_alchemy_conn_cmd'] == 'echo -n my-super-secret-conn'
+        assert 'sql_alchemy_conn_cmd' in conf_maintain_cmds['database']
+        assert conf_maintain_cmds['database']['sql_alchemy_conn_cmd'] == 'echo -n my-super-secret-conn'

-        if conf_conn == test_conf.airflow_defaults['core']['sql_alchemy_conn']:
-            assert 'sql_alchemy_conn' not in conf_maintain_cmds['core']
-        else:
-            assert 'sql_alchemy_conn' in conf_maintain_cmds['core']
-            assert conf_maintain_cmds['core']['sql_alchemy_conn'] == conf_conn
+        if conf_conn == test_conf.airflow_defaults['database']['sql_alchemy_conn']:
+            assert 'sql_alchemy_conn' not in conf_maintain_cmds['database']
+        else:
+            assert 'sql_alchemy_conn' in conf_maintain_cmds['database']
+            assert conf_maintain_cmds['database']['sql_alchemy_conn'] == conf_conn

     def test_gettimedelta(self):
         test_config = '''
diff --git a/tests/core/test_sqlalchemy_config.py b/tests/core/test_sqlalchemy_config.py
index 37d4cfe895c6e..4dd154f0c2c70 100644
--- a/tests/core/test_sqlalchemy_config.py
+++ b/tests/core/test_sqlalchemy_config.py
@@ -69,11 +69,11 @@ def test_sql_alchemy_connect_args(
     ):
         config = {
             (
-                'core',
+                'database',
                 'sql_alchemy_connect_args',
             ): 'tests.core.test_sqlalchemy_config.SQL_ALCHEMY_CONNECT_ARGS',
-            ('core', 'sql_alchemy_engine_args'): '{"arg": 1}',
-            ('core', 'sql_alchemy_pool_enabled'): 'False',
+            ('database', 'sql_alchemy_engine_args'): '{"arg": 1}',
+            ('database', 'sql_alchemy_pool_enabled'): 'False',
         }
         with conf_vars(config):
             settings.configure_orm()
@@ -96,8 +96,8 @@ def test_sql_alchemy_invalid_connect_args(
         self, mock_create_engine, mock_sessionmaker, mock_scoped_session, mock_setup_event_handlers
     ):
         config = {
-            ('core', 'sql_alchemy_connect_args'): 'does.not.exist',
-            ('core', 'sql_alchemy_pool_enabled'): 'False',
+            ('database', 'sql_alchemy_connect_args'): 'does.not.exist',
+            ('database', 'sql_alchemy_pool_enabled'): 'False',
         }
         with pytest.raises(AirflowConfigException):
             with conf_vars(config):
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index a85843a7bebf6..0f95dea446039 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -148,7 +148,7 @@ def test_remove_file_clears_import_error(self, tmpdir):

         child_pipe, parent_pipe = multiprocessing.Pipe()

-        async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
+        async_mode = 'sqlite' not in conf.get('database', 'sql_alchemy_conn')
         manager = DagFileProcessorManager(
             dag_directory=tmpdir,
             max_runs=1,
@@ -183,7 +183,7 @@ def test_max_runs_when_no_files(self):
         child_pipe, parent_pipe = multiprocessing.Pipe()

         with TemporaryDirectory(prefix="empty-airflow-dags-") as dags_folder:
-            async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
+            async_mode = 'sqlite' not in conf.get('database', 'sql_alchemy_conn')
             manager = DagFileProcessorManager(
                 dag_directory=dags_folder,
                 max_runs=1,
@@ -678,7 +678,7 @@ def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmpdir):

         child_pipe, parent_pipe = multiprocessing.Pipe()

-        async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
+        async_mode = 'sqlite' not in conf.get('database', 'sql_alchemy_conn')
         manager = DagFileProcessorManager(
             dag_directory=tmpdir,
             max_runs=1,
@@ -876,7 +876,7 @@ class path, thus when reloading logging module the airflow.processor_manager
         # Launch a process through DagFileProcessorAgent, which will try
         # reload the logging module.
         test_dag_path = TEST_DAG_FOLDER / 'test_scheduler_dags.py'
-        async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
+        async_mode = 'sqlite' not in conf.get('database', 'sql_alchemy_conn')

         log_file_loc = conf.get('logging', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')
         try:
@@ -904,7 +904,7 @@ def test_parse_once(self):

         clear_db_dags()
         test_dag_path = TEST_DAG_FOLDER / 'test_scheduler_dags.py'
-        async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
+        async_mode = 'sqlite' not in conf.get('database', 'sql_alchemy_conn')
         processor_agent = DagFileProcessorAgent(test_dag_path, 1, timedelta(days=365), [], False, async_mode)
         processor_agent.start()
         if not async_mode:
@@ -926,7 +926,7 @@ def test_parse_once(self):

     def test_launch_process(self):
         test_dag_path = TEST_DAG_FOLDER / 'test_scheduler_dags.py'
-        async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
+        async_mode = 'sqlite' not in conf.get('database', 'sql_alchemy_conn')

         log_file_loc = conf.get('logging', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')
         try:
diff --git a/tests/models/test_base.py b/tests/models/test_base.py
index 283fa69f792d1..2ddb33db3db68 100644
--- a/tests/models/test_base.py
+++ b/tests/models/test_base.py
@@ -31,17 +31,17 @@
         param(
             "mysql://host/the_database",
             {"collation": "ascii"},
-            {('core', 'sql_engine_collation_for_ids'): 'ascii'},
+            {('database', 'sql_engine_collation_for_ids'): 'ascii'},
             id="mysql with explicit config",
         ),
         param(
             "postgresql://host/the_database",
             {"collation": "ascii"},
-            {('core', 'sql_engine_collation_for_ids'): 'ascii'},
+            {('database', 'sql_engine_collation_for_ids'): 'ascii'},
             id="postgres with explicit config",
         ),
     ],
 )
 def test_collation(dsn, expected, extra):
-    with conf_vars({('core', 'sql_alchemy_conn'): dsn, **extra}):
+    with conf_vars({('database', 'sql_alchemy_conn'): dsn, **extra}):
         assert expected == get_id_collation_args()
diff --git a/tests/test_utils/perf/sql_queries.py b/tests/test_utils/perf/sql_queries.py
index a142b09e02e25..13f404d948060 100644
--- a/tests/test_utils/perf/sql_queries.py
+++ b/tests/test_utils/perf/sql_queries.py
@@ -27,7 +27,7 @@
 os.environ["AIRFLOW__CORE__DAGS_FOLDER"] = DAG_FOLDER
 os.environ["AIRFLOW__DEBUG__SQLALCHEMY_STATS"] = "True"
 os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
-os.environ["AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS"] = "True"
+os.environ["AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS"] = "True"

 # Here we setup simpler logger to avoid any code changes in
 # Airflow core code base