diff --git a/airflow/config_templates/config.yml.schema.json b/airflow/config_templates/config.yml.schema.json index 9fef382fa448d..1b433060900c0 100644 --- a/airflow/config_templates/config.yml.schema.json +++ b/airflow/config_templates/config.yml.schema.json @@ -36,7 +36,7 @@ }, "sensitive": { "type": "boolean", - "description": "When true, this option is sensitive and can be specified using AIRFLOW__{section}___{name}__SECRET or AIRFLOW__{section}___{name}__CMD environment variables. See: airflow.configuration.AirflowConfigParser.sensitive_config_values" + "description": "When true, this option is sensitive and can be specified using AIRFLOW__{section}__{name}_SECRET or AIRFLOW__{section}__{name}_CMD environment variables. See: airflow.configuration.AirflowConfigParser.sensitive_config_values" } }, "required": [ diff --git a/airflow/configuration.py b/airflow/configuration.py index 91b8a4e94e512..a33b3f3998e78 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -59,6 +59,8 @@ ConfigSectionSourcesType = Dict[str, Union[str, Tuple[str, str]]] ConfigSourcesType = Dict[str, ConfigSectionSourcesType] +ENV_VAR_PREFIX = 'AIRFLOW__' + def _parse_sqlite_version(s: str) -> Tuple[int, ...]: match = _SQLITE3_VERSION_PATTERN.match(s) @@ -144,7 +146,7 @@ class AirflowConfigParser(ConfigParser): """Custom Airflow Configparser supporting defaults and deprecated options""" # These configuration elements can be fetched as the stdout of commands - # following the "{section}__{name}__cmd" pattern, the idea behind this + # following the "{section}__{name}_cmd" pattern, the idea behind this # is to not store password on boxes in text files. 
# These configs can also be fetched from Secrets backend # following the "{section}__{name}__secret" pattern @@ -435,10 +437,8 @@ def _create_future_warning(name: str, section: str, current_value: Any, new_valu FutureWarning, ) - ENV_VAR_PREFIX = 'AIRFLOW__' - def _env_var_name(self, section: str, key: str) -> str: - return f'{self.ENV_VAR_PREFIX}{section.upper()}__{key.upper()}' + return f'{ENV_VAR_PREFIX}{section.upper()}__{key.upper()}' def _get_env_var_option(self, section: str, key: str): # must have format AIRFLOW__{SECTION}__{KEY} (note double underscore) @@ -461,23 +461,53 @@ def _get_env_var_option(self, section: str, key: str): def _get_cmd_option(self, section: str, key: str): fallback_key = key + '_cmd' - # if this is a valid command key... if (section, key) in self.sensitive_config_values: if super().has_option(section, fallback_key): command = super().get(section, fallback_key) return run_command(command) return None + def _get_cmd_option_from_config_sources( + self, config_sources: ConfigSourcesType, section: str, key: str + ) -> Optional[str]: + fallback_key = key + '_cmd' + if (section, key) in self.sensitive_config_values: + section_dict = config_sources.get(section) + if section_dict is not None: + command_value = section_dict.get(fallback_key) + if command_value is not None: + if isinstance(command_value, str): + command = command_value + else: + command = command_value[0] + return run_command(command) + return None + def _get_secret_option(self, section: str, key: str) -> Optional[str]: """Get Config option values from Secret Backend""" fallback_key = key + '_secret' - # if this is a valid secret key... 
if (section, key) in self.sensitive_config_values: if super().has_option(section, fallback_key): secrets_path = super().get(section, fallback_key) return _get_config_value_from_secret_backend(secrets_path) return None + def _get_secret_option_from_config_sources( + self, config_sources: ConfigSourcesType, section: str, key: str + ) -> Optional[str]: + fallback_key = key + '_secret' + if (section, key) in self.sensitive_config_values: + section_dict = config_sources.get(section) + if section_dict is not None: + secrets_path_value = section_dict.get(fallback_key) + if secrets_path_value is not None: + if isinstance(secrets_path_value, str): + secrets_path = secrets_path_value + else: + secrets_path = secrets_path_value[0] + return _get_config_value_from_secret_backend(secrets_path) + return None + def get_mandatory_value(self, section: str, key: str, **kwargs) -> str: value = self.get(section, key, **kwargs) if value is None: @@ -859,7 +889,16 @@ def as_dict( ('airflow.cfg', self), ] - self._replace_config_with_display_sources(config_sources, configs, display_source, raw) + self._replace_config_with_display_sources( + config_sources, + configs, + display_source, + raw, + self.deprecated_options, + include_cmds=include_cmds, + include_env=include_env, + include_secret=include_secret, + ) # add env vars and overwrite because they have priority if include_env: @@ -889,7 +928,7 @@ def _include_secrets( raw: bool, ): for (section, key) in self.sensitive_config_values: - value: Optional[str] = self._get_secret_option(section, key) + value: Optional[str] = self._get_secret_option_from_config_sources(config_sources, section, key) if value: if not display_sensitive: value = '< hidden >' @@ -910,17 +949,20 @@ def _include_commands( raw: bool, ): for (section, key) in self.sensitive_config_values: - opt = self._get_cmd_option(section, key) + opt = self._get_cmd_option_from_config_sources(config_sources, section, key) if not opt: continue + opt_to_set: Union[str, Tuple[str, 
str], None] = opt if not display_sensitive: - opt = '< hidden >' + opt_to_set = '< hidden >' if display_source: - opt = (opt, 'cmd') + opt_to_set = (str(opt_to_set), 'cmd') elif raw: - opt = opt.replace('%', '%%') - config_sources.setdefault(section, OrderedDict()).update({key: opt}) - del config_sources[section][key + '_cmd'] + opt_to_set = str(opt_to_set).replace('%', '%%') + if opt_to_set is not None: + dict_to_update: Dict[str, Union[str, Tuple[str, str]]] = {key: opt_to_set} + config_sources.setdefault(section, OrderedDict()).update(dict_to_update) + del config_sources[section][key + '_cmd'] def _include_envs( self, @@ -930,7 +972,7 @@ def _include_envs( raw: bool, ): for env_var in [ - os_environment for os_environment in os.environ if os_environment.startswith(self.ENV_VAR_PREFIX) + os_environment for os_environment in os.environ if os_environment.startswith(ENV_VAR_PREFIX) ]: try: _, section, key = env_var.split('__', 2) @@ -1010,13 +1052,82 @@ def _replace_config_with_display_sources( configs: Iterable[Tuple[str, ConfigParser]], display_source: bool, raw: bool, + deprecated_options: Dict[Tuple[str, str], Tuple[str, str, str]], + include_env: bool, + include_cmds: bool, + include_secret: bool, ): for (source_name, config) in configs: for section in config.sections(): AirflowConfigParser._replace_section_config_with_display_sources( - config, config_sources, display_source, raw, section, source_name + config, + config_sources, + display_source, + raw, + section, + source_name, + deprecated_options, + configs, + include_env=include_env, + include_cmds=include_cmds, + include_secret=include_secret, ) + @staticmethod + def _deprecated_value_is_set_in_config( + deprecated_section: str, + deprecated_key: str, + configs: Iterable[Tuple[str, ConfigParser]], + ) -> bool: + for config_type, config in configs: + if config_type == 'default': + continue + try: + deprecated_section_array = config.items(section=deprecated_section, raw=True) + for (key_candidate, _) in 
deprecated_section_array: + if key_candidate == deprecated_key: + return True + except NoSectionError: + pass + return False + + @staticmethod + def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool: + return ( + os.environ.get(f'{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}') + is not None + ) + + @staticmethod + def _deprecated_command_is_set_in_config( + deprecated_section: str, deprecated_key: str, configs: Iterable[Tuple[str, ConfigParser]] + ) -> bool: + return AirflowConfigParser._deprecated_value_is_set_in_config( + deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_cmd", configs=configs + ) + + @staticmethod + def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool: + return ( + os.environ.get(f'{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD') + is not None + ) + + @staticmethod + def _deprecated_secret_is_set_in_config( + deprecated_section: str, deprecated_key: str, configs: Iterable[Tuple[str, ConfigParser]] + ) -> bool: + return AirflowConfigParser._deprecated_value_is_set_in_config( + deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_secret", configs=configs + ) + + @staticmethod + def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool: + return ( + os.environ.get(f'{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET') + is not None + ) + @staticmethod def _replace_section_config_with_display_sources( config: ConfigParser, @@ -1025,9 +1136,46 @@ def _replace_section_config_with_display_sources( raw: bool, section: str, source_name: str, + deprecated_options: Dict[Tuple[str, str], Tuple[str, str, str]], + configs: Iterable[Tuple[str, ConfigParser]], + include_env: bool, + include_cmds: bool, + include_secret: bool, ): sect = config_sources.setdefault(section, OrderedDict()) for (k, val) in config.items(section=section, 
raw=raw): + deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None)) + if deprecated_section and deprecated_key: + if source_name == 'default': + # If deprecated entry has some non-default value set for any of the sources requested, + # We should NOT set default for the new entry (because it will override anything + # coming from the deprecated ones) + if AirflowConfigParser._deprecated_value_is_set_in_config( + deprecated_section, deprecated_key, configs + ): + continue + if include_env and AirflowConfigParser._deprecated_variable_is_set( + deprecated_section, deprecated_key + ): + continue + if include_cmds and ( + AirflowConfigParser._deprecated_variable_command_is_set( + deprecated_section, deprecated_key + ) + or AirflowConfigParser._deprecated_command_is_set_in_config( + deprecated_section, deprecated_key, configs + ) + ): + continue + if include_secret and ( + AirflowConfigParser._deprecated_variable_secret_is_set( + deprecated_section, deprecated_key + ) + or AirflowConfigParser._deprecated_secret_is_set_in_config( + deprecated_section, deprecated_key, configs + ) + ): + continue if display_source: sect[k] = (val, source_name) else: diff --git a/tests/config_templates/deprecated.cfg b/tests/config_templates/deprecated.cfg new file mode 100644 index 0000000000000..cab0b6e7fa2d5 --- /dev/null +++ b/tests/config_templates/deprecated.cfg @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +# This is the template for Airflow's unit test configuration. When Airflow runs +# unit tests, it looks for a configuration file at $AIRFLOW_HOME/unittests.cfg. +# If it doesn't exist, Airflow uses this template to generate it by replacing +# variables in curly braces with their global values from configuration.py. + +# Users should not modify this file; they should customize the generated +# unittests.cfg instead. +[core] +sql_alchemy_conn = mysql:// diff --git a/tests/config_templates/deprecated_cmd.cfg b/tests/config_templates/deprecated_cmd.cfg new file mode 100644 index 0000000000000..ff2a9268dd691 --- /dev/null +++ b/tests/config_templates/deprecated_cmd.cfg @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +# This is the template for Airflow's unit test configuration. 
When Airflow runs +# unit tests, it looks for a configuration file at $AIRFLOW_HOME/unittests.cfg. +# If it doesn't exist, Airflow uses this template to generate it by replacing +# variables in curly braces with their global values from configuration.py. + +# Users should not modify this file; they should customize the generated +# unittests.cfg instead. + +[core] +sql_alchemy_conn_cmd = echo -n "postgresql://" diff --git a/tests/config_templates/deprecated_secret.cfg b/tests/config_templates/deprecated_secret.cfg new file mode 100644 index 0000000000000..6c4cda091af6e --- /dev/null +++ b/tests/config_templates/deprecated_secret.cfg @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +# This is the template for Airflow's unit test configuration. When Airflow runs +# unit tests, it looks for a configuration file at $AIRFLOW_HOME/unittests.cfg. +# If it doesn't exist, Airflow uses this template to generate it by replacing +# variables in curly braces with their global values from configuration.py. + +# Users should not modify this file; they should customize the generated +# unittests.cfg instead. 
+ +[core] +sql_alchemy_conn_secret = secret_path diff --git a/tests/config_templates/empty.cfg b/tests/config_templates/empty.cfg new file mode 100644 index 0000000000000..36906c07cfde6 --- /dev/null +++ b/tests/config_templates/empty.cfg @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +# This is the template for Airflow's unit test configuration. When Airflow runs +# unit tests, it looks for a configuration file at $AIRFLOW_HOME/unittests.cfg. +# If it doesn't exist, Airflow uses this template to generate it by replacing +# variables in curly braces with their global values from configuration.py. + +# Users should not modify this file; they should customize the generated +# unittests.cfg instead. 
diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py index 70a51806c4cdb..991695a969d08 100644 --- a/tests/core/test_configuration.py +++ b/tests/core/test_configuration.py @@ -42,6 +42,14 @@ ) from tests.test_utils.config import conf_vars from tests.test_utils.reset_warning_registry import reset_warning_registry +from tests.utils.test_config import ( + remove_all_configurations, + set_deprecated_options, + set_sensitive_config_values, + use_config, +) + +HOME_DIR = os.path.expanduser('~') @unittest.mock.patch.dict( @@ -511,89 +519,6 @@ def test_broker_transport_options(self): assert isinstance(section_dict['_test_only_float'], float) assert isinstance(section_dict['_test_only_string'], str) - @conf_vars( - { - ("celery", "worker_concurrency"): None, - ("celery", "celeryd_concurrency"): None, - } - ) - def test_deprecated_options(self): - # Guarantee we have a deprecated setting, so we test the deprecation - # lookup even if we remove this explicit fallback - conf.deprecated_options = { - ('celery', 'worker_concurrency'): ('celery', 'celeryd_concurrency', '2.0.0'), - } - - # Remove it so we are sure we use the right setting - conf.remove_option('celery', 'worker_concurrency') - - with pytest.warns(DeprecationWarning): - with mock.patch.dict('os.environ', AIRFLOW__CELERY__CELERYD_CONCURRENCY="99"): - assert conf.getint('celery', 'worker_concurrency') == 99 - - with pytest.warns(DeprecationWarning), conf_vars({('celery', 'celeryd_concurrency'): '99'}): - assert conf.getint('celery', 'worker_concurrency') == 99 - - @conf_vars( - { - ('logging', 'logging_level'): None, - ('core', 'logging_level'): None, - } - ) - def test_deprecated_options_with_new_section(self): - # Guarantee we have a deprecated setting, so we test the deprecation - # lookup even if we remove this explicit fallback - conf.deprecated_options = { - ('logging', 'logging_level'): ('core', 'logging_level', '2.0.0'), - } - - # Remove it so we are sure we use the right setting - 
conf.remove_option('core', 'logging_level') - conf.remove_option('logging', 'logging_level') - - with pytest.warns(DeprecationWarning): - with mock.patch.dict('os.environ', AIRFLOW__CORE__LOGGING_LEVEL="VALUE"): - assert conf.get('logging', 'logging_level') == "VALUE" - - with pytest.warns(DeprecationWarning), conf_vars({('core', 'logging_level'): 'VALUE'}): - assert conf.get('logging', 'logging_level') == "VALUE" - - @conf_vars( - { - ("celery", "result_backend"): None, - ("celery", "celery_result_backend"): None, - ("celery", "celery_result_backend_cmd"): None, - } - ) - def test_deprecated_options_cmd(self): - # Guarantee we have a deprecated setting, so we test the deprecation - # lookup even if we remove this explicit fallback - conf.deprecated_options[('celery', "result_backend")] = 'celery', 'celery_result_backend', '2.0.0' - conf.sensitive_config_values.add(('celery', 'celery_result_backend')) - - conf.remove_option('celery', 'result_backend') - with conf_vars({('celery', 'celery_result_backend_cmd'): '/bin/echo 99'}): - with pytest.warns(DeprecationWarning): - tmp = None - if 'AIRFLOW__CELERY__RESULT_BACKEND' in os.environ: - tmp = os.environ.pop('AIRFLOW__CELERY__RESULT_BACKEND') - assert conf.getint('celery', 'result_backend') == 99 - if tmp: - os.environ['AIRFLOW__CELERY__RESULT_BACKEND'] = tmp - - def test_deprecated_values_from_conf(self): - test_conf = AirflowConfigParser(default_config='') - # Guarantee we have deprecated settings, so we test the deprecation - # lookup even if we remove this explicit fallback - test_conf.deprecated_values = { - 'core': {'hostname_callable': (re.compile(r':'), r'.', '2.1')}, - } - test_conf.read_dict({'core': {'hostname_callable': 'socket:getfqdn'}}) - - with pytest.warns(FutureWarning): - test_conf.validate() - assert test_conf.get('core', 'hostname_callable') == 'socket.getfqdn' - def test_auth_backends_adds_session(self): test_conf = AirflowConfigParser(default_config='') # Guarantee we have deprecated settings, 
so we test the deprecation @@ -616,95 +541,6 @@ def test_auth_backends_adds_session(self): == 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session' ) - @pytest.mark.parametrize( - "old, new", - [ - ( - ("api", "auth_backend", "airflow.api.auth.backend.basic_auth"), - ( - "api", - "auth_backends", - "airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session", - ), - ), - ( - ("core", "sql_alchemy_conn", "postgres+psycopg2://localhost/postgres"), - ("database", "sql_alchemy_conn", "postgresql://localhost/postgres"), - ), - ], - ) - def test_deprecated_env_vars_upgraded_and_removed(self, old, new): - test_conf = AirflowConfigParser(default_config='') - old_section, old_key, old_value = old - new_section, new_key, new_value = new - old_env_var = test_conf._env_var_name(old_section, old_key) - new_env_var = test_conf._env_var_name(new_section, new_key) - - with pytest.warns(FutureWarning): - with unittest.mock.patch.dict('os.environ', **{old_env_var: old_value}): - # Can't start with the new env var existing... - os.environ.pop(new_env_var, None) - - test_conf.validate() - assert test_conf.get(new_section, new_key) == new_value - # We also need to make sure the deprecated env var is removed - # so that any subprocesses don't use it in place of our updated - # value. - assert old_env_var not in os.environ - # and make sure we track the old value as well, under the new section/key - assert test_conf.upgraded_values[(new_section, new_key)] == old_value - - @pytest.mark.parametrize( - "conf_dict", - [ - {}, # Even if the section is absent from config file, environ still needs replacing. 
- {'core': {'hostname_callable': 'socket:getfqdn'}}, - ], - ) - def test_deprecated_values_from_environ(self, conf_dict): - def make_config(): - test_conf = AirflowConfigParser(default_config='') - # Guarantee we have a deprecated setting, so we test the deprecation - # lookup even if we remove this explicit fallback - test_conf.deprecated_values = { - 'core': {'hostname_callable': (re.compile(r':'), r'.', '2.1')}, - } - test_conf.read_dict(conf_dict) - test_conf.validate() - return test_conf - - with pytest.warns(FutureWarning): - with unittest.mock.patch.dict('os.environ', AIRFLOW__CORE__HOSTNAME_CALLABLE='socket:getfqdn'): - test_conf = make_config() - assert test_conf.get('core', 'hostname_callable') == 'socket.getfqdn' - - with reset_warning_registry(): - with warnings.catch_warnings(record=True) as warning: - with unittest.mock.patch.dict( - 'os.environ', - AIRFLOW__CORE__HOSTNAME_CALLABLE='CarrierPigeon', - ): - test_conf = make_config() - assert test_conf.get('core', 'hostname_callable') == 'CarrierPigeon' - assert [] == warning - - def test_deprecated_funcs(self): - for func in [ - 'load_test_config', - 'get', - 'getboolean', - 'getfloat', - 'getint', - 'has_option', - 'remove_option', - 'as_dict', - 'set', - ]: - with mock.patch(f'airflow.configuration.conf.{func}') as mock_method: - with pytest.warns(DeprecationWarning): - getattr(configuration, func)() - mock_method.assert_called_once() - def test_command_from_env(self): test_cmdenv_config = '''[testcmdenv] itsacommand = NOT OK @@ -877,6 +713,20 @@ def test_as_dict_respects_sensitive_cmds(self): assert 'sql_alchemy_conn' in conf_maintain_cmds['database'] assert conf_maintain_cmds['database']['sql_alchemy_conn'] == conf_conn + @mock.patch.dict( + 'os.environ', {"AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_CMD": "echo -n 'postgresql://'"}, clear=True + ) + def test_as_dict_respects_sensitive_cmds_from_env(self): + test_conf = copy.deepcopy(conf) + test_conf.read_string("") + + conf_materialize_cmds = 
test_conf.as_dict(display_sensitive=True, raw=True, include_cmds=True) + + assert 'sql_alchemy_conn' in conf_materialize_cmds['database'] + assert 'sql_alchemy_conn_cmd' not in conf_materialize_cmds['database'] + + assert conf_materialize_cmds['database']['sql_alchemy_conn'] == 'postgresql://' + def test_gettimedelta(self): test_config = ''' [invalid] @@ -940,3 +790,474 @@ def test_gettimedelta(self): assert test_conf.gettimedelta('valid', 'key6') == datetime.timedelta(seconds=300) assert isinstance(test_conf.gettimedelta('default', 'key7'), type(None)) assert test_conf.gettimedelta('default', 'key7') is None + + +class TestDeprecatedConf: + @conf_vars( + { + ("celery", "worker_concurrency"): None, + ("celery", "celeryd_concurrency"): None, + } + ) + def test_deprecated_options(self): + # Guarantee we have a deprecated setting, so we test the deprecation + # lookup even if we remove this explicit fallback + with set_deprecated_options( + deprecated_options={('celery', 'worker_concurrency'): ('celery', 'celeryd_concurrency', '2.0.0')} + ): + # Remove it so we are sure we use the right setting + conf.remove_option('celery', 'worker_concurrency') + + with pytest.warns(DeprecationWarning): + with mock.patch.dict('os.environ', AIRFLOW__CELERY__CELERYD_CONCURRENCY="99"): + assert conf.getint('celery', 'worker_concurrency') == 99 + + with pytest.warns(DeprecationWarning), conf_vars({('celery', 'celeryd_concurrency'): '99'}): + assert conf.getint('celery', 'worker_concurrency') == 99 + + @conf_vars( + { + ('logging', 'logging_level'): None, + ('core', 'logging_level'): None, + } + ) + def test_deprecated_options_with_new_section(self): + # Guarantee we have a deprecated setting, so we test the deprecation + # lookup even if we remove this explicit fallback + with set_deprecated_options( + deprecated_options={('logging', 'logging_level'): ('core', 'logging_level', '2.0.0')} + ): + # Remove it so we are sure we use the right setting + conf.remove_option('core', 
'logging_level') + conf.remove_option('logging', 'logging_level') + + with pytest.warns(DeprecationWarning): + with mock.patch.dict('os.environ', AIRFLOW__CORE__LOGGING_LEVEL="VALUE"): + assert conf.get('logging', 'logging_level') == "VALUE" + + with pytest.warns(DeprecationWarning), conf_vars({('core', 'logging_level'): 'VALUE'}): + assert conf.get('logging', 'logging_level') == "VALUE" + + @conf_vars( + { + ("celery", "result_backend"): None, + ("celery", "celery_result_backend"): None, + ("celery", "celery_result_backend_cmd"): None, + } + ) + def test_deprecated_options_cmd(self): + # Guarantee we have a deprecated setting, so we test the deprecation + # lookup even if we remove this explicit fallback + with set_deprecated_options( + deprecated_options={('celery', "result_backend"): ('celery', 'celery_result_backend', '2.0.0')} + ), set_sensitive_config_values(sensitive_config_values={('celery', 'celery_result_backend')}): + conf.remove_option('celery', 'result_backend') + with conf_vars({('celery', 'celery_result_backend_cmd'): '/bin/echo 99'}): + with pytest.warns(DeprecationWarning): + tmp = None + if 'AIRFLOW__CELERY__RESULT_BACKEND' in os.environ: + tmp = os.environ.pop('AIRFLOW__CELERY__RESULT_BACKEND') + assert conf.getint('celery', 'result_backend') == 99 + if tmp: + os.environ['AIRFLOW__CELERY__RESULT_BACKEND'] = tmp + + def test_deprecated_values_from_conf(self): + test_conf = AirflowConfigParser( + default_config=""" +[core] +executor=SequentialExecutor +[database] +sql_alchemy_conn=sqlite://test +""" + ) + # Guarantee we have deprecated settings, so we test the deprecation + # lookup even if we remove this explicit fallback + test_conf.deprecated_values = { + 'core': {'hostname_callable': (re.compile(r':'), r'.', '2.1')}, + } + test_conf.read_dict({'core': {'hostname_callable': 'socket:getfqdn'}}) + + with pytest.warns(FutureWarning): + test_conf.validate() + assert test_conf.get('core', 'hostname_callable') == 'socket.getfqdn' + + 
@pytest.mark.parametrize( + "old, new", + [ + ( + ("api", "auth_backend", "airflow.api.auth.backend.basic_auth"), + ( + "api", + "auth_backends", + "airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session", + ), + ), + ( + ("core", "sql_alchemy_conn", "postgres+psycopg2://localhost/postgres"), + ("database", "sql_alchemy_conn", "postgresql://localhost/postgres"), + ), + ], + ) + def test_deprecated_env_vars_upgraded_and_removed(self, old, new): + test_conf = AirflowConfigParser( + default_config=""" +[core] +executor=SequentialExecutor +[database] +sql_alchemy_conn=sqlite://test +""" + ) + old_section, old_key, old_value = old + new_section, new_key, new_value = new + old_env_var = test_conf._env_var_name(old_section, old_key) + new_env_var = test_conf._env_var_name(new_section, new_key) + + with pytest.warns(FutureWarning): + with unittest.mock.patch.dict('os.environ', **{old_env_var: old_value}): + # Can't start with the new env var existing... + os.environ.pop(new_env_var, None) + + test_conf.validate() + assert test_conf.get(new_section, new_key) == new_value + # We also need to make sure the deprecated env var is removed + # so that any subprocesses don't use it in place of our updated + # value. + assert old_env_var not in os.environ + # and make sure we track the old value as well, under the new section/key + assert test_conf.upgraded_values[(new_section, new_key)] == old_value + + @pytest.mark.parametrize( + "conf_dict", + [ + {}, # Even if the section is absent from config file, environ still needs replacing. 
+ {'core': {'hostname_callable': 'socket:getfqdn'}}, + ], + ) + def test_deprecated_values_from_environ(self, conf_dict): + def make_config(): + test_conf = AirflowConfigParser( + default_config=""" +[core] +executor=SequentialExecutor +[database] +sql_alchemy_conn=sqlite://test +""" + ) + # Guarantee we have a deprecated setting, so we test the deprecation + # lookup even if we remove this explicit fallback + test_conf.deprecated_values = { + 'core': {'hostname_callable': (re.compile(r':'), r'.', '2.1')}, + } + test_conf.read_dict(conf_dict) + test_conf.validate() + return test_conf + + with pytest.warns(FutureWarning): + with unittest.mock.patch.dict('os.environ', AIRFLOW__CORE__HOSTNAME_CALLABLE='socket:getfqdn'): + test_conf = make_config() + assert test_conf.get('core', 'hostname_callable') == 'socket.getfqdn' + + with reset_warning_registry(): + with warnings.catch_warnings(record=True) as warning: + with unittest.mock.patch.dict( + 'os.environ', + AIRFLOW__CORE__HOSTNAME_CALLABLE='CarrierPigeon', + ): + test_conf = make_config() + assert test_conf.get('core', 'hostname_callable') == 'CarrierPigeon' + assert [] == warning + + def test_deprecated_funcs(self): + for func in [ + 'load_test_config', + 'get', + 'getboolean', + 'getfloat', + 'getint', + 'has_option', + 'remove_option', + 'as_dict', + 'set', + ]: + with mock.patch(f'airflow.configuration.conf.{func}') as mock_method: + with pytest.warns(DeprecationWarning): + getattr(configuration, func)() + mock_method.assert_called_once() + + @pytest.mark.parametrize("display_source", [True, False]) + @mock.patch.dict('os.environ', {}, clear=True) + def test_conf_as_dict_when_deprecated_value_in_config(self, display_source: bool): + with use_config(config="deprecated.cfg"): + cfg_dict = conf.as_dict( + display_source=display_source, + raw=True, + display_sensitive=True, + include_env=False, + include_cmds=False, + ) + assert cfg_dict['core'].get('sql_alchemy_conn') == ( + ('mysql://', "airflow.cfg") if 
display_source else 'mysql://' + ) + # database should be None because the deprecated value is set in config + assert cfg_dict['database'].get('sql_alchemy_conn') is None + if not display_source: + remove_all_configurations() + conf.read_dict(dictionary=cfg_dict) + os.environ.clear() + assert conf.get('database', 'sql_alchemy_conn') == 'mysql://' + + @pytest.mark.parametrize("display_source", [True, False]) + @mock.patch.dict('os.environ', {"AIRFLOW__CORE__SQL_ALCHEMY_CONN": "postgresql://"}, clear=True) + def test_conf_as_dict_when_deprecated_value_in_both_env_and_config(self, display_source: bool): + with use_config(config="deprecated.cfg"): + cfg_dict = conf.as_dict( + display_source=display_source, + raw=True, + display_sensitive=True, + include_env=True, + include_cmds=False, + ) + assert cfg_dict['core'].get('sql_alchemy_conn') == ( + ('postgresql://', "env var") if display_source else 'postgresql://' + ) + # database should be None because the deprecated value is set in env value + assert cfg_dict['database'].get('sql_alchemy_conn') is None + if not display_source: + remove_all_configurations() + conf.read_dict(dictionary=cfg_dict) + os.environ.clear() + assert conf.get('database', 'sql_alchemy_conn') == 'postgresql://' + + @pytest.mark.parametrize("display_source", [True, False]) + @mock.patch.dict('os.environ', {"AIRFLOW__CORE__SQL_ALCHEMY_CONN": "postgresql://"}, clear=True) + def test_conf_as_dict_when_deprecated_value_in_both_env_and_config_exclude_env( + self, display_source: bool + ): + with use_config(config="deprecated.cfg"): + cfg_dict = conf.as_dict( + display_source=display_source, + raw=True, + display_sensitive=True, + include_env=False, + include_cmds=False, + ) + assert cfg_dict['core'].get('sql_alchemy_conn') == ( + ('mysql://', "airflow.cfg") if display_source else 'mysql://' + ) + # database should be None because the deprecated value is set in env value + assert cfg_dict['database'].get('sql_alchemy_conn') is None + if not display_source: 
+ remove_all_configurations() + conf.read_dict(dictionary=cfg_dict) + os.environ.clear() + assert conf.get('database', 'sql_alchemy_conn') == 'mysql://' + + @pytest.mark.parametrize("display_source", [True, False]) + @mock.patch.dict('os.environ', {"AIRFLOW__CORE__SQL_ALCHEMY_CONN": "postgresql://"}, clear=True) + def test_conf_as_dict_when_deprecated_value_in_env(self, display_source: bool): + with use_config(config="empty.cfg"): + cfg_dict = conf.as_dict( + display_source=display_source, raw=True, display_sensitive=True, include_env=True + ) + assert cfg_dict['core'].get('sql_alchemy_conn') == ( + ('postgresql://', "env var") if display_source else 'postgresql://' + ) + # database should be None because the deprecated value is set in env value + assert cfg_dict['database'].get('sql_alchemy_conn') is None + if not display_source: + remove_all_configurations() + conf.read_dict(dictionary=cfg_dict) + os.environ.clear() + assert conf.get('database', 'sql_alchemy_conn') == 'postgresql://' + + @pytest.mark.parametrize("display_source", [True, False]) + @mock.patch.dict('os.environ', {}, clear=True) + def test_conf_as_dict_when_both_conf_and_env_are_empty(self, display_source: bool): + with use_config(config="empty.cfg"): + cfg_dict = conf.as_dict(display_source=display_source, raw=True, display_sensitive=True) + assert cfg_dict['core'].get('sql_alchemy_conn') is None + # database should be taken from default because the deprecated value is missing in config + assert cfg_dict['database'].get('sql_alchemy_conn') == ( + (f'sqlite:///{HOME_DIR}/airflow/airflow.db', "default") + if display_source + else f'sqlite:///{HOME_DIR}/airflow/airflow.db' + ) + if not display_source: + remove_all_configurations() + conf.read_dict(dictionary=cfg_dict) + os.environ.clear() + assert conf.get('database', 'sql_alchemy_conn') == f'sqlite:///{HOME_DIR}/airflow/airflow.db' + + @pytest.mark.parametrize("display_source", [True, False]) + @mock.patch.dict('os.environ', {}, clear=True) + def 
test_conf_as_dict_when_deprecated_value_in_cmd_config(self, display_source: bool): + with use_config(config="deprecated_cmd.cfg"): + cfg_dict = conf.as_dict( + display_source=display_source, + raw=True, + display_sensitive=True, + include_env=True, + include_cmds=True, + ) + assert cfg_dict['core'].get('sql_alchemy_conn') == ( + ('postgresql://', "cmd") if display_source else 'postgresql://' + ) + # database should be None because the deprecated value is set in env value + assert cfg_dict['database'].get('sql_alchemy_conn') is None + if not display_source: + remove_all_configurations() + conf.read_dict(dictionary=cfg_dict) + os.environ.clear() + assert conf.get('database', 'sql_alchemy_conn') == 'postgresql://' + + @pytest.mark.parametrize("display_source", [True, False]) + @mock.patch.dict( + 'os.environ', {"AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD": "echo -n 'postgresql://'"}, clear=True + ) + def test_conf_as_dict_when_deprecated_value_in_cmd_env(self, display_source: bool): + with use_config(config="empty.cfg"): + cfg_dict = conf.as_dict( + display_source=display_source, + raw=True, + display_sensitive=True, + include_env=True, + include_cmds=True, + ) + assert cfg_dict['core'].get('sql_alchemy_conn') == ( + ('postgresql://', "cmd") if display_source else 'postgresql://' + ) + # database should be None because the deprecated value is set in env value + assert cfg_dict['database'].get('sql_alchemy_conn') is None + if not display_source: + remove_all_configurations() + conf.read_dict(dictionary=cfg_dict) + os.environ.clear() + assert conf.get('database', 'sql_alchemy_conn') == 'postgresql://' + + @pytest.mark.parametrize("display_source", [True, False]) + @mock.patch.dict( + 'os.environ', {"AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD": "echo -n 'postgresql://'"}, clear=True + ) + def test_conf_as_dict_when_deprecated_value_in_cmd_disabled_env(self, display_source: bool): + with use_config(config="empty.cfg"): + cfg_dict = conf.as_dict( + display_source=display_source, + 
raw=True, + display_sensitive=True, + include_env=True, + include_cmds=False, + ) + assert cfg_dict['core'].get('sql_alchemy_conn') is None + assert cfg_dict['database'].get('sql_alchemy_conn') == ( + (f'sqlite:///{HOME_DIR}/airflow/airflow.db', 'default') + if display_source + else f'sqlite:///{HOME_DIR}/airflow/airflow.db' + ) + if not display_source: + remove_all_configurations() + conf.read_dict(dictionary=cfg_dict) + os.environ.clear() + assert conf.get('database', 'sql_alchemy_conn') == f'sqlite:///{HOME_DIR}/airflow/airflow.db' + + @pytest.mark.parametrize("display_source", [True, False]) + @mock.patch.dict('os.environ', {}, clear=True) + def test_conf_as_dict_when_deprecated_value_in_cmd_disabled_config(self, display_source: bool): + with use_config(config="deprecated_cmd.cfg"): + cfg_dict = conf.as_dict( + display_source=display_source, + raw=True, + display_sensitive=True, + include_env=True, + include_cmds=False, + ) + assert cfg_dict['core'].get('sql_alchemy_conn') is None + assert cfg_dict['database'].get('sql_alchemy_conn') == ( + (f'sqlite:///{HOME_DIR}/airflow/airflow.db', 'default') + if display_source + else f'sqlite:///{HOME_DIR}/airflow/airflow.db' + ) + if not display_source: + remove_all_configurations() + conf.read_dict(dictionary=cfg_dict) + os.environ.clear() + assert conf.get('database', 'sql_alchemy_conn') == f'sqlite:///{HOME_DIR}/airflow/airflow.db' + + @pytest.mark.parametrize("display_source", [True, False]) + @mock.patch.dict('os.environ', {"AIRFLOW__CORE__SQL_ALCHEMY_CONN_SECRET": "secret_path'"}, clear=True) + @mock.patch("airflow.configuration.get_custom_secret_backend") + def test_conf_as_dict_when_deprecated_value_in_secrets( + self, get_custom_secret_backend, display_source: bool + ): + get_custom_secret_backend.return_value.get_config.return_value = "postgresql://" + with use_config(config="empty.cfg"): + cfg_dict = conf.as_dict( + display_source=display_source, + raw=True, + display_sensitive=True, + include_env=True, + 
include_secret=True, + ) + assert cfg_dict['core'].get('sql_alchemy_conn') == ( + ('postgresql://', "secret") if display_source else 'postgresql://' + ) + # database should be None because the deprecated value is set in env value + assert cfg_dict['database'].get('sql_alchemy_conn') is None + if not display_source: + remove_all_configurations() + conf.read_dict(dictionary=cfg_dict) + os.environ.clear() + assert conf.get('database', 'sql_alchemy_conn') == 'postgresql://' + + @pytest.mark.parametrize("display_source", [True, False]) + @mock.patch.dict('os.environ', {"AIRFLOW__CORE__SQL_ALCHEMY_CONN_SECRET": "secret_path'"}, clear=True) + @mock.patch("airflow.configuration.get_custom_secret_backend") + def test_conf_as_dict_when_deprecated_value_in_secrets_disabled_env( + self, get_custom_secret_backend, display_source: bool + ): + get_custom_secret_backend.return_value.get_config.return_value = "postgresql://" + with use_config(config="empty.cfg"): + cfg_dict = conf.as_dict( + display_source=display_source, + raw=True, + display_sensitive=True, + include_env=True, + include_secret=False, + ) + assert cfg_dict['core'].get('sql_alchemy_conn') is None + assert cfg_dict['database'].get('sql_alchemy_conn') == ( + (f'sqlite:///{HOME_DIR}/airflow/airflow.db', 'default') + if display_source + else f'sqlite:///{HOME_DIR}/airflow/airflow.db' + ) + if not display_source: + remove_all_configurations() + conf.read_dict(dictionary=cfg_dict) + os.environ.clear() + assert conf.get('database', 'sql_alchemy_conn') == f'sqlite:///{HOME_DIR}/airflow/airflow.db' + + @pytest.mark.parametrize("display_source", [True, False]) + @mock.patch("airflow.configuration.get_custom_secret_backend") + @mock.patch.dict('os.environ', {}, clear=True) + def test_conf_as_dict_when_deprecated_value_in_secrets_disabled_config( + self, get_custom_secret_backend, display_source: bool + ): + get_custom_secret_backend.return_value.get_config.return_value = "postgresql://" + with 
use_config(config="deprecated_secret.cfg"): + cfg_dict = conf.as_dict( + display_source=display_source, + raw=True, + display_sensitive=True, + include_env=True, + include_secret=False, + ) + assert cfg_dict['core'].get('sql_alchemy_conn') is None + assert cfg_dict['database'].get('sql_alchemy_conn') == ( + (f'sqlite:///{HOME_DIR}/airflow/airflow.db', 'default') + if display_source + else f'sqlite:///{HOME_DIR}/airflow/airflow.db' + ) + if not display_source: + remove_all_configurations() + conf.read_dict(dictionary=cfg_dict) + os.environ.clear() + assert conf.get('database', 'sql_alchemy_conn') == f'sqlite:///{HOME_DIR}/airflow/airflow.db' diff --git a/tests/utils/test_config.py b/tests/utils/test_config.py new file mode 100644 index 0000000000000..4728d33b4bd7d --- /dev/null +++ b/tests/utils/test_config.py @@ -0,0 +1,79 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import logging +from contextlib import contextmanager +from pathlib import Path +from typing import Dict, Set, Tuple + +from airflow.configuration import conf + +log = logging.getLogger(__name__) + +# TODO(potiuk) change the tests use better approach sing Pytest fixtures rather than +# `unit_test_mode` parameter. 
# It's potentially disruptive so we should not do it **JUST** yet


def remove_all_configurations() -> Tuple[Dict, Dict]:
    """
    Detach every loaded section from the global ``conf`` object.

    The parser's private ``_sections`` and ``_proxies`` mappings are replaced by
    fresh empty dicts. The previous mappings are not mutated, so they can be put
    back later with ``restore_all_configurations``.

    :return: the ``(sections, proxies)`` mappings that were installed before the call
    """
    old_sections, old_proxies = (conf._sections, conf._proxies)
    conf._sections = {}
    conf._proxies = {}
    return old_sections, old_proxies


def restore_all_configurations(sections: Dict, proxies: Dict) -> None:
    """
    Re-install mappings previously returned by ``remove_all_configurations``.

    :param sections: mapping to assign to ``conf._sections``
    :param proxies: mapping to assign to ``conf._proxies``
    """
    conf._sections = sections  # type: ignore
    conf._proxies = proxies  # type: ignore


@contextmanager
def use_config(config: str):
    """
    Temporarily replace the loaded configuration with a test configuration file.

    All currently loaded sections are removed, ``config`` is read into the global
    ``conf`` object, and the original sections are restored when the ``with``
    block exits, whether normally or through an exception.

    :param config: file name of the configuration to load; it is resolved against
        the ``config_templates`` directory that sits one level above this file's
        directory
    """
    sections, proxies = remove_all_configurations()
    try:
        # The load happens inside the ``try`` so that a failure while reading
        # cannot leave the global ``conf`` stripped of its sections for the
        # tests that run afterwards.
        # NOTE(review): the stock ``ConfigParser.read`` skips files it cannot
        # open, so a misspelled name presumably yields an empty configuration
        # rather than an error -- confirm for ``AirflowConfigParser``.
        conf.read(str(Path(__file__).parents[1] / "config_templates" / config))
        yield
    finally:
        restore_all_configurations(sections, proxies)


@contextmanager
def set_deprecated_options(deprecated_options: Dict[Tuple[str, str], Tuple[str, str, str]]):
    """
    Temporarily replace ``conf.deprecated_options`` with the mapping provided.

    The previous mapping is put back when the ``with`` block exits, even if the
    block raises.

    :param deprecated_options: mapping assigned to ``conf.deprecated_options``
        for the duration of the block
    """
    old_deprecated_options = conf.deprecated_options
    conf.deprecated_options = deprecated_options
    try:
        yield
    finally:
        conf.deprecated_options = old_deprecated_options


@contextmanager
def set_sensitive_config_values(sensitive_config_values: Set[Tuple[str, str]]):
    """
    Temporarily replace ``conf.sensitive_config_values`` with the set provided.

    The previous set is put back when the ``with`` block exits, even if the
    block raises.

    :param sensitive_config_values: set assigned to
        ``conf.sensitive_config_values`` for the duration of the block
    """
    old_sensitive_config_values = conf.sensitive_config_values
    conf.sensitive_config_values = sensitive_config_values
    try:
        yield
    finally:
        conf.sensitive_config_values = old_sensitive_config_values