Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/config_templates/config.yml.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
},
"sensitive": {
"type": "boolean",
"description": "When true, this option is sensitive and can be specified using AIRFLOW__{section}___{name}__SECRET or AIRFLOW__{section}___{name}__CMD environment variables. See: airflow.configuration.AirflowConfigParser.sensitive_config_values"
"description": "When true, this option is sensitive and can be specified using AIRFLOW__{section}__{name}_SECRET or AIRFLOW__{section}__{name}_CMD environment variables. See: airflow.configuration.AirflowConfigParser.sensitive_config_values"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our docs were lying about the `__` prefix for CMD :).

}
},
"required": [
Expand Down
180 changes: 164 additions & 16 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
ConfigSectionSourcesType = Dict[str, Union[str, Tuple[str, str]]]
ConfigSourcesType = Dict[str, ConfigSectionSourcesType]

ENV_VAR_PREFIX = 'AIRFLOW__'


def _parse_sqlite_version(s: str) -> Tuple[int, ...]:
match = _SQLITE3_VERSION_PATTERN.match(s)
Expand Down Expand Up @@ -144,7 +146,7 @@ class AirflowConfigParser(ConfigParser):
"""Custom Airflow Configparser supporting defaults and deprecated options"""

# These configuration elements can be fetched as the stdout of commands
# following the "{section}__{name}__cmd" pattern, the idea behind this
# following the "{section}__{name}_cmd" pattern, the idea behind this
# is to not store password on boxes in text files.
# These configs can also be fetched from Secrets backend
# following the "{section}__{name}_secret" pattern
Expand Down Expand Up @@ -435,10 +437,8 @@ def _create_future_warning(name: str, section: str, current_value: Any, new_valu
FutureWarning,
)

ENV_VAR_PREFIX = 'AIRFLOW__'

def _env_var_name(self, section: str, key: str) -> str:
return f'{self.ENV_VAR_PREFIX}{section.upper()}__{key.upper()}'
return f'{ENV_VAR_PREFIX}{section.upper()}__{key.upper()}'

def _get_env_var_option(self, section: str, key: str):
# must have format AIRFLOW__{SECTION}__{KEY} (note double underscore)
Expand All @@ -461,23 +461,53 @@ def _get_env_var_option(self, section: str, key: str):

def _get_cmd_option(self, section: str, key: str):
    """Return the output of the ``<key>_cmd`` command configured for a sensitive option.

    Only ``(section, key)`` pairs listed in ``self.sensitive_config_values``
    are eligible. ``None`` is returned when the pair is not eligible or when
    the parent parser has no ``<key>_cmd`` option in ``section``.
    """
    if (section, key) not in self.sensitive_config_values:
        return None
    cmd_key = key + '_cmd'
    if not super().has_option(section, cmd_key):
        return None
    return run_command(super().get(section, cmd_key))

def _get_cmd_option_from_config_sources(
self, config_sources: ConfigSourcesType, section: str, key: str
) -> Optional[str]:
fallback_key = key + '_cmd'
if (section, key) in self.sensitive_config_values:
section_dict = config_sources.get(section)
if section_dict is not None:
command_value = section_dict.get(fallback_key)
if command_value is not None:
if isinstance(command_value, str):
command = command_value
else:
command = command_value[0]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In what cases does this end up as a list rather than a single string?

Copy link
Member Author

@potiuk potiuk May 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's potentially a Tuple of [value, source] - when ConfigSourcesType is produced with "display_source=True", section.get(key) will return a Tuple.

This is the main reason why, as a prerequisite of this change, I had to add typing to conf: #23716, because it was very difficult to reason about what types of values are returned where.

So the change here is really not anything I "knew" about - it's more MyPy telling me that this might be either string or Tuple[str, str].

Copy link
Member Author

@potiuk potiuk May 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW. Yeah. I would have done it differently actually and this whole conf requires rewriting at some point of time because the complexity of it grew over time with adding defaults, secrets, commands, displaying sources and finally deprecations which made it terribly complex overall - it took me good few hours to understand all the paths and understand how we should handle the deprecations to not fall into the trap of getting the "default" when there is a deprecation.

So this whole part I think needs to be rewritten - but for 2.3.1 I think this is the minimum set of changes, with very comprehensive test coverage covering all the test cases I could possibly imagine, so that we can actually safely cherry-pick that one to 2.3.1 (possibly).

return run_command(command)
return None

def _get_secret_option(self, section: str, key: str) -> Optional[str]:
    """Resolve a sensitive option through its ``<key>_secret`` entry.

    The value stored under ``<key>_secret`` in the parent parser is treated
    as a secrets path and handed to ``_get_config_value_from_secret_backend``.
    ``None`` is returned when ``(section, key)`` is not listed in
    ``self.sensitive_config_values`` or when no ``<key>_secret`` option exists.
    """
    if (section, key) not in self.sensitive_config_values:
        return None
    secret_key = key + '_secret'
    if not super().has_option(section, secret_key):
        return None
    return _get_config_value_from_secret_backend(super().get(section, secret_key))

def _get_secret_option_from_config_sources(
    self, config_sources: ConfigSourcesType, section: str, key: str
) -> Optional[str]:
    """Resolve a sensitive option via ``<key>_secret`` found in ``config_sources``.

    :param config_sources: mapping of section name to that section's options
    :param section: section the option belongs to
    :param key: option name without the ``_secret`` suffix
    :return: the value fetched from the secrets backend, or ``None`` when the
        option is not sensitive, the section is absent, or no ``<key>_secret``
        entry is present
    """
    if (section, key) not in self.sensitive_config_values:
        return None
    section_values = config_sources.get(section)
    if section_values is None:
        return None
    entry = section_values.get(key + '_secret')
    if entry is None:
        return None
    # An entry is either the bare path or a tuple whose first item is the path.
    secrets_path = entry if isinstance(entry, str) else entry[0]
    return _get_config_value_from_secret_backend(secrets_path)

def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
value = self.get(section, key, **kwargs)
if value is None:
Expand Down Expand Up @@ -859,7 +889,16 @@ def as_dict(
('airflow.cfg', self),
]

self._replace_config_with_display_sources(config_sources, configs, display_source, raw)
self._replace_config_with_display_sources(
config_sources,
configs,
display_source,
raw,
self.deprecated_options,
include_cmds=include_cmds,
include_env=include_env,
include_secret=include_secret,
)

# add env vars and overwrite because they have priority
if include_env:
Expand Down Expand Up @@ -889,7 +928,7 @@ def _include_secrets(
raw: bool,
):
for (section, key) in self.sensitive_config_values:
value: Optional[str] = self._get_secret_option(section, key)
value: Optional[str] = self._get_secret_option_from_config_sources(config_sources, section, key)
if value:
if not display_sensitive:
value = '< hidden >'
Expand All @@ -910,17 +949,20 @@ def _include_commands(
raw: bool,
):
for (section, key) in self.sensitive_config_values:
opt = self._get_cmd_option(section, key)
opt = self._get_cmd_option_from_config_sources(config_sources, section, key)
if not opt:
continue
opt_to_set: Union[str, Tuple[str, str], None] = opt
if not display_sensitive:
opt = '< hidden >'
opt_to_set = '< hidden >'
if display_source:
opt = (opt, 'cmd')
opt_to_set = (str(opt_to_set), 'cmd')
elif raw:
opt = opt.replace('%', '%%')
config_sources.setdefault(section, OrderedDict()).update({key: opt})
del config_sources[section][key + '_cmd']
opt_to_set = str(opt_to_set).replace('%', '%%')
if opt_to_set is not None:
dict_to_update: Dict[str, Union[str, Tuple[str, str]]] = {key: opt_to_set}
config_sources.setdefault(section, OrderedDict()).update(dict_to_update)
del config_sources[section][key + '_cmd']

def _include_envs(
self,
Expand All @@ -930,7 +972,7 @@ def _include_envs(
raw: bool,
):
for env_var in [
os_environment for os_environment in os.environ if os_environment.startswith(self.ENV_VAR_PREFIX)
os_environment for os_environment in os.environ if os_environment.startswith(ENV_VAR_PREFIX)
]:
try:
_, section, key = env_var.split('__', 2)
Expand Down Expand Up @@ -1010,13 +1052,82 @@ def _replace_config_with_display_sources(
configs: Iterable[Tuple[str, ConfigParser]],
display_source: bool,
raw: bool,
deprecated_options: Dict[Tuple[str, str], Tuple[str, str, str]],
include_env: bool,
include_cmds: bool,
include_secret: bool,
):
for (source_name, config) in configs:
for section in config.sections():
AirflowConfigParser._replace_section_config_with_display_sources(
config, config_sources, display_source, raw, section, source_name
config,
config_sources,
display_source,
raw,
section,
source_name,
deprecated_options,
configs,
include_env=include_env,
include_cmds=include_cmds,
include_secret=include_secret,
)

@staticmethod
def _deprecated_value_is_set_in_config(
    deprecated_section: str,
    deprecated_key: str,
    configs: Iterable[Tuple[str, ConfigParser]],
) -> bool:
    """Tell whether any non-default config source defines the deprecated option.

    :param deprecated_section: section name of the deprecated option
    :param deprecated_key: key of the deprecated option
    :param configs: ``(source_name, parser)`` pairs to inspect
    :return: ``True`` as soon as one parser, other than the one named
        ``'default'``, lists ``deprecated_key`` in ``deprecated_section``
    """
    for source_name, parser in configs:
        if source_name == 'default':
            continue
        try:
            entries = parser.items(section=deprecated_section, raw=True)
        except NoSectionError:
            # This source does not have the section at all; try the next one.
            continue
        if any(name == deprecated_key for name, _ in entries):
            return True
    return False

@staticmethod
def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """Tell whether the environment variable for a deprecated option is defined.

    The variable name is ``ENV_VAR_PREFIX`` followed by the upper-cased
    section, a double underscore, and the upper-cased key.
    """
    env_name = f'{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}'
    return env_name in os.environ

@staticmethod
def _deprecated_command_is_set_in_config(
    deprecated_section: str, deprecated_key: str, configs: Iterable[Tuple[str, ConfigParser]]
) -> bool:
    """Tell whether the ``_cmd`` variant of a deprecated option is set in ``configs``.

    Delegates to ``_deprecated_value_is_set_in_config`` with ``"_cmd"``
    appended to ``deprecated_key``.
    """
    command_key = deprecated_key + "_cmd"
    return AirflowConfigParser._deprecated_value_is_set_in_config(
        deprecated_section=deprecated_section,
        deprecated_key=command_key,
        configs=configs,
    )

@staticmethod
def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """Tell whether the ``_CMD`` environment variable for a deprecated option is defined.

    The variable name is ``ENV_VAR_PREFIX`` followed by the upper-cased
    section, a double underscore, the upper-cased key, and ``_CMD``.
    """
    env_name = f'{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD'
    return env_name in os.environ

@staticmethod
def _deprecated_secret_is_set_in_config(
    deprecated_section: str, deprecated_key: str, configs: Iterable[Tuple[str, ConfigParser]]
) -> bool:
    """Tell whether the ``_secret`` variant of a deprecated option is set in ``configs``.

    Delegates to ``_deprecated_value_is_set_in_config`` with ``"_secret"``
    appended to ``deprecated_key``.
    """
    secret_key = deprecated_key + "_secret"
    return AirflowConfigParser._deprecated_value_is_set_in_config(
        deprecated_section=deprecated_section,
        deprecated_key=secret_key,
        configs=configs,
    )

@staticmethod
def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool:
return (
os.environ.get(f'{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET')
Copy link
Member

@ashb ashb May 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably use _env_var_name() for consistency.

is not None
)

@staticmethod
def _replace_section_config_with_display_sources(
config: ConfigParser,
Expand All @@ -1025,9 +1136,46 @@ def _replace_section_config_with_display_sources(
raw: bool,
section: str,
source_name: str,
deprecated_options: Dict[Tuple[str, str], Tuple[str, str, str]],
configs: Iterable[Tuple[str, ConfigParser]],
include_env: bool,
include_cmds: bool,
include_secret: bool,
):
sect = config_sources.setdefault(section, OrderedDict())
for (k, val) in config.items(section=section, raw=raw):
deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None))
if deprecated_section and deprecated_key:
if source_name == 'default':
# If deprecated entry has some non-default value set for any of the sources requested,
# We should NOT set default for the new entry (because it will override anything
# coming from the deprecated ones)
if AirflowConfigParser._deprecated_value_is_set_in_config(
deprecated_section, deprecated_key, configs
):
continue
if include_env and AirflowConfigParser._deprecated_variable_is_set(
deprecated_section, deprecated_key
):
continue
if include_cmds and (
AirflowConfigParser._deprecated_variable_command_is_set(
deprecated_section, deprecated_key
)
or AirflowConfigParser._deprecated_command_is_set_in_config(
deprecated_section, deprecated_key, configs
)
):
continue
if include_secret and (
AirflowConfigParser._deprecated_variable_secret_is_set(
deprecated_section, deprecated_key
)
or AirflowConfigParser._deprecated_secret_is_set_in_config(
deprecated_section, deprecated_key, configs
)
):
continue
if display_source:
sect[k] = (val, source_name)
else:
Expand Down
28 changes: 28 additions & 0 deletions tests/config_templates/deprecated.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


# This is the template for Airflow's unit test configuration. When Airflow runs
# unit tests, it looks for a configuration file at $AIRFLOW_HOME/unittests.cfg.
# If it doesn't exist, Airflow uses this template to generate it by replacing
# variables in curly braces with their global values from configuration.py.

# Users should not modify this file; they should customize the generated
# unittests.cfg instead.
[core]
sql_alchemy_conn = mysql://
29 changes: 29 additions & 0 deletions tests/config_templates/deprecated_cmd.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


# This is the template for Airflow's unit test configuration. When Airflow runs
# unit tests, it looks for a configuration file at $AIRFLOW_HOME/unittests.cfg.
# If it doesn't exist, Airflow uses this template to generate it by replacing
# variables in curly braces with their global values from configuration.py.

# Users should not modify this file; they should customize the generated
# unittests.cfg instead.

[core]
sql_alchemy_conn_cmd = echo -n "postgresql://"
29 changes: 29 additions & 0 deletions tests/config_templates/deprecated_secret.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


# This is the template for Airflow's unit test configuration. When Airflow runs
# unit tests, it looks for a configuration file at $AIRFLOW_HOME/unittests.cfg.
# If it doesn't exist, Airflow uses this template to generate it by replacing
# variables in curly braces with their global values from configuration.py.

# Users should not modify this file; they should customize the generated
# unittests.cfg instead.

[core]
sql_alchemy_conn_secret = secret_path
Loading