Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1670,12 +1670,13 @@
or insert it into a database (depending of the backend)
This status is used by the scheduler to update the state of the task
The use of a database is highly recommended
When not specified, sql_alchemy_conn with a db+ scheme prefix will be used
http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
version_added: ~
type: string
sensitive: true
example: ~
default: "db+postgresql://postgres:airflow@postgres/airflow"
example: "db+postgresql://postgres:airflow@postgres/airflow"
default: ~
- name: flower_host
description: |
Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
Expand Down
4 changes: 3 additions & 1 deletion airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -846,8 +846,10 @@ broker_url = redis://redis:6379/0
# or insert it into a database (depending of the backend)
# This status is used by the scheduler to update the state of the task
# The use of a database is highly recommended
# When not specified, sql_alchemy_conn with a db+ scheme prefix will be used
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
result_backend = db+postgresql://postgres:airflow@postgres/airflow
# Example: result_backend = db+postgresql://postgres:airflow@postgres/airflow
# result_backend =

# Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
# it ``airflow celery flower``. This defines the IP that Celery Flower runs on
Expand Down
9 changes: 7 additions & 2 deletions airflow/config_templates/default_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ def _broker_supports_visibility_timeout(url):
if _broker_supports_visibility_timeout(broker_url):
broker_transport_options['visibility_timeout'] = 21600

if conf.has_option("celery", 'RESULT_BACKEND'):
result_backend = conf.get_mandatory_value('celery', 'RESULT_BACKEND')
else:
log.debug("Value for celery result_backend not found. Using sql_alchemy_conn with db+ prefix.")
result_backend = f'db+{conf.get("database", "SQL_ALCHEMY_CONN")}'

DEFAULT_CELERY_CONFIG = {
'accept_content': ['json'],
'event_serializer': 'json',
Expand All @@ -46,7 +52,7 @@ def _broker_supports_visibility_timeout(url):
'task_track_started': conf.getboolean('celery', 'task_track_started'),
'broker_url': broker_url,
'broker_transport_options': broker_transport_options,
'result_backend': conf.get('celery', 'RESULT_BACKEND'),
'result_backend': result_backend,
'worker_concurrency': conf.getint('celery', 'WORKER_CONCURRENCY'),
'worker_enable_remote_control': conf.getboolean('celery', 'worker_enable_remote_control'),
}
Expand Down Expand Up @@ -92,7 +98,6 @@ def _broker_supports_visibility_timeout(url):
f'all necessary certs and key ({e}).'
)

result_backend = str(DEFAULT_CELERY_CONFIG['result_backend'])
if 'amqp://' in result_backend or 'redis://' in result_backend or 'rpc://' in result_backend:
log.warning(
"You have configured a result_backend of %s, it is highly recommended "
Expand Down
2 changes: 2 additions & 0 deletions chart/templates/_helpers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ If release name contains chart name it will be used as a full name.
key: webserver-secret-key
{{- end }}
{{- if or (eq .Values.executor "CeleryExecutor") (eq .Values.executor "CeleryKubernetesExecutor") }}
{{- if or (semverCompare "<2.4.0" .Values.airflowVersion) (.Values.data.resultBackendSecretName) (.Values.data.resultBackendConnection) }}
{{- if .Values.enableBuiltInSecretEnvVars.AIRFLOW__CELERY__CELERY_RESULT_BACKEND }}
# (Airflow 1.10.* variant)
- name: AIRFLOW__CELERY__CELERY_RESULT_BACKEND
Expand All @@ -89,6 +90,7 @@ If release name contains chart name it will be used as a full name.
name: {{ template "airflow_result_backend_secret" . }}
key: connection
{{- end }}
{{- end }}
{{- if .Values.enableBuiltInSecretEnvVars.AIRFLOW__CELERY__BROKER_URL }}
- name: AIRFLOW__CELERY__BROKER_URL
valueFrom:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#################################
{{- if not .Values.data.resultBackendSecretName }}
{{- if or (eq .Values.executor "CeleryExecutor") (eq .Values.executor "CeleryKubernetesExecutor") }}
{{- if or (semverCompare "<2.4.0" .Values.airflowVersion) (and (semverCompare ">=2.4.0" .Values.airflowVersion) .Values.data.resultBackendConnection) }}
{{- $connection := .Values.data.resultBackendConnection | default .Values.data.metadataConnection }}

{{- $resultBackendHost := $connection.host | default (printf "%s-%s" .Release.Name "postgresql") }}
Expand All @@ -45,3 +46,4 @@ data:
connection: {{ urlJoin (dict "scheme" (printf "db+%s" $connection.protocol) "userinfo" (printf "%s:%s" ($connection.user|urlquery) ($connection.pass | urlquery)) "host" (printf "%s:%s" $host $port) "path" (printf "/%s" $database) "query" $query) | b64enc | quote }}
{{- end }}
{{- end }}
{{- end }}
15 changes: 15 additions & 0 deletions chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,22 @@ extraEnvFrom: ~
# Airflow database & redis config
data:
# If secret names are provided, use those secrets
# These secrets must be created manually, eg:
#
# kind: Secret
# apiVersion: v1
# metadata:
# name: custom-airflow-metadata-secret
# type: Opaque
# data:
# connection: base64_encoded_connection_string

metadataSecretName: ~
# When providing secret names and using the same database for metadata and
# result backend, for Airflow < 2.4.0 it is necessary to create a separate
# secret for result backend but with a db+ scheme prefix.
# For Airflow >= 2.4.0 it is possible to not specify the secret again,
# as Airflow will use sql_alchemy_conn with a db+ scheme prefix by default.
resultBackendSecretName: ~
brokerUrlSecretName: ~

Expand Down
60 changes: 47 additions & 13 deletions tests/charts/test_basic_helm_chart.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,44 @@

from tests.charts.helm_template_generator import render_chart

OBJECT_COUNT_IN_BASIC_DEPLOYMENT = 35
OBJECT_COUNT_IN_BASIC_DEPLOYMENT = 34


class TestBaseChartTest(unittest.TestCase):
def test_basic_deployments(self):
def _get_values_with_version(self, values, version):
if version != "default":
values["airflowVersion"] = version
return values

def _get_object_count(self, version):
# TODO remove default from condition after airflow update
if version == "2.3.2" or version == "default":
return OBJECT_COUNT_IN_BASIC_DEPLOYMENT + 1
return OBJECT_COUNT_IN_BASIC_DEPLOYMENT

@parameterized.expand(["2.3.2", "2.4.0", "default"])
def test_basic_deployments(self, version):
expected_object_count_in_basic_deployment = self._get_object_count(version)
k8s_objects = render_chart(
"TEST-BASIC",
values={
"chart": {
'metadata': 'AA',
self._get_values_with_version(
values={
"chart": {
'metadata': 'AA',
},
'labels': {"TEST-LABEL": "TEST-VALUE"},
"fullnameOverride": "TEST-BASIC",
},
'labels': {"TEST-LABEL": "TEST-VALUE"},
"fullnameOverride": "TEST-BASIC",
},
version=version,
),
)
list_of_kind_names_tuples = {
(k8s_object['kind'], k8s_object['metadata']['name']) for k8s_object in k8s_objects
}
# TODO remove default from condition after airflow update
if version == "2.3.2" or version == "default":
assert ('Secret', 'TEST-BASIC-airflow-result-backend') in list_of_kind_names_tuples
list_of_kind_names_tuples.remove(('Secret', 'TEST-BASIC-airflow-result-backend'))
assert list_of_kind_names_tuples == {
('ServiceAccount', 'TEST-BASIC-create-user-job'),
('ServiceAccount', 'TEST-BASIC-migrate-database-job'),
Expand All @@ -53,7 +73,6 @@ def test_basic_deployments(self):
('ServiceAccount', 'TEST-BASIC-webserver'),
('ServiceAccount', 'TEST-BASIC-worker'),
('Secret', 'TEST-BASIC-airflow-metadata'),
('Secret', 'TEST-BASIC-airflow-result-backend'),
('Secret', 'TEST-BASIC-broker-url'),
('Secret', 'TEST-BASIC-fernet-key'),
('Secret', 'TEST-BASIC-webserver-secret-key'),
Expand All @@ -80,7 +99,7 @@ def test_basic_deployments(self):
('Job', 'TEST-BASIC-create-user'),
('Job', 'TEST-BASIC-run-airflow-migrations'),
}
assert OBJECT_COUNT_IN_BASIC_DEPLOYMENT == len(k8s_objects)
assert expected_object_count_in_basic_deployment == len(k8s_objects)
for k8s_object in k8s_objects:
labels = jmespath.search('metadata.labels', k8s_object) or {}
if 'helm.sh/chart' in labels:
Expand All @@ -94,16 +113,20 @@ def test_basic_deployments(self):
"TEST-LABEL"
), f"Missing label TEST-LABEL on {k8s_name}. Current labels: {labels}"

def test_basic_deployment_without_default_users(self):
@parameterized.expand(["2.3.2", "2.4.0", "default"])
def test_basic_deployment_without_default_users(self, version):
expected_object_count_in_basic_deployment = self._get_object_count(version)
k8s_objects = render_chart(
"TEST-BASIC",
values={"webserver": {"defaultUser": {'enabled': False}}},
values=self._get_values_with_version(
values={"webserver": {"defaultUser": {'enabled': False}}}, version=version
),
)
list_of_kind_names_tuples = [
(k8s_object['kind'], k8s_object['metadata']['name']) for k8s_object in k8s_objects
]
assert ('Job', 'TEST-BASIC-create-user') not in list_of_kind_names_tuples
assert OBJECT_COUNT_IN_BASIC_DEPLOYMENT - 2 == len(k8s_objects)
assert expected_object_count_in_basic_deployment - 2 == len(k8s_objects)

def test_network_policies_are_valid(self):
k8s_objects = render_chart(
Expand Down Expand Up @@ -139,6 +162,17 @@ def test_labels_are_valid(self):
values={
"labels": {"label1": "value1", "label2": "value2"},
"executor": "CeleryExecutor",
"data": {
"resultBackendConnection": {
"user": "someuser",
"pass": "somepass",
"host": "somehost",
"protocol": "postgresql",
"port": 7777,
"db": "somedb",
"sslmode": "allow",
}
},
"pgbouncer": {"enabled": True},
"redis": {"enabled": True},
"ingress": {"enabled": True},
Expand Down
Loading