Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airflow/cli/commands/scheduler_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from airflow.utils import cli as cli_utils
from airflow.utils.cli import process_subdir
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.scarf import scarf_analytics
from airflow.utils.scheduler_health import serve_health_check

log = logging.getLogger(__name__)
Expand All @@ -55,6 +56,8 @@ def scheduler(args: Namespace):
"""Start Airflow Scheduler."""
print(settings.HEADER)

scarf_analytics()

run_command_with_daemon_option(
args=args,
process_name="scheduler",
Expand Down
22 changes: 22 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2591,3 +2591,25 @@ sensors:
type: float
example: ~
default: "604800"
telemetry_collection:
description: |
Airflow integrates `Scarf <https://about.scarf.sh/>`__ to collect basic telemetry data during operation.
This data assists Airflow maintainers in better understanding how Airflow is used.
Insights gained from this telemetry are critical for prioritizing patches, minor releases, and
security fixes. Additionally, this information supports key decisions related to the development road map.
Check the FAQ doc for more information on what data is collected.

Deployments can opt-out of analytics by setting the ``enabled`` option
to ``False``, or the ``SCARF_ANALYTICS=false`` environment variable.
Comment thread
jscheffl marked this conversation as resolved.
Outdated
Individual users can easily opt-out of analytics in various ways documented in the
`Scarf Do Not Track docs <https://docs.scarf.sh/gateway/#do-not-track>`__.

options:
enabled:
description: |
Enable or disable telemetry data collection and sending via Scarf.
version_added: 2.10.0
type: boolean
example: ~
default: "True"
see_also: ":ref:`Airflow telemetry FAQ <airflow-telemetry-faq>`"
7 changes: 7 additions & 0 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,13 @@ def initialize():
atexit.register(dispose_orm)


def is_telemetry_collection_enabled() -> bool:
"""Check if scarf analytics is enabled."""
return conf.getboolean("telemetry_collection", "enabled", fallback=True) and (
os.getenv("SCARF_ANALYTICS", "").strip().lower() != "false"
)


# Const stuff

KILOBYTE = 1024
Expand Down
89 changes: 89 additions & 0 deletions airflow/utils/scarf.py
Comment thread
kaxil marked this conversation as resolved.
Outdated
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import platform
from urllib.parse import urlencode

import httpx
from packaging.version import parse

from airflow import __version__ as airflow_version, settings
from airflow.configuration import conf


def scarf_analytics():
if not settings.is_telemetry_collection_enabled():
return

# Exclude pre-releases and dev versions
if _version_is_prerelease(airflow_version):
return

scarf_domain = "https://apacheairflow.gateway.scarf.sh/scheduler"
Comment thread
kaxil marked this conversation as resolved.
Outdated

try:
platform_sys, arch = get_platform_info()

params = {
"version": airflow_version,
"python_version": get_python_version(),
"platform": platform_sys,
"arch": arch,
"database": get_database_name(),
"db_version": get_database_version(),
"executor": get_executor(),
}

query_string = urlencode(params)
scarf_url = f"{scarf_domain}?{query_string}"

httpx.get(scarf_url, timeout=5.0)
except Exception:
pass


def _version_is_prerelease(version: str) -> bool:
return parse(version).is_prerelease


def get_platform_info() -> tuple[str, str]:
return platform.system(), platform.machine()


def get_database_version() -> str:
if settings.engine is None:
return "None"

version_info = settings.engine.dialect.server_version_info
# Example: (1, 2, 3) -> "1.2.3"
return ".".join(map(str, version_info)) if version_info else "None"


def get_database_name() -> str:
Comment thread
kaxil marked this conversation as resolved.
Outdated
if settings.engine is None:
return "None"
return settings.engine.dialect.name


def get_executor() -> str:
return conf.get("core", "EXECUTOR")


def get_python_version() -> str:
return platform.python_version()
3 changes: 3 additions & 0 deletions airflow/www/templates/airflow/dags.html
Original file line number Diff line number Diff line change
Expand Up @@ -490,4 +490,7 @@ <h2>{{ page_title }}</h2>
return false;
}
</script>
{% if scarf_url %}
<img referrerpolicy="no-referrer-when-downgrade" src="{{ scarf_url }}" width="0" height="0" alt="" style="display:none;" />
{% endif %}
{% endblock %}
34 changes: 33 additions & 1 deletion airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
from airflow.timetables._cron import CronMixin
from airflow.timetables.base import DataInterval, TimeRestriction
from airflow.timetables.simple import ContinuousTimetable
from airflow.utils import json as utils_json, timezone, yaml
from airflow.utils import json as utils_json, scarf, timezone, yaml
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.dag_edges import dag_edges
from airflow.utils.db import get_query_count
Expand Down Expand Up @@ -217,6 +217,32 @@ def get_safe_url(url):
return redirect_url.geturl()


def build_scarf_url(dags_count: int) -> str:
"""Build the URL for the Scarf telemetry collection."""
if not settings.is_telemetry_collection_enabled():
return ""

scarf_domain = "https://apacheairflow.gateway.scarf.sh"

platform_sys, platform_arch = scarf.get_platform_info()
db_version = scarf.get_database_version()
db_name = scarf.get_database_name()
Comment thread
kaxil marked this conversation as resolved.
Outdated
executor = scarf.get_executor()
python_version = scarf.get_python_version()

# Path Format:
# /{version}/{python_version}/{platform}/{arch}/{database}/{db_version}/{executor}/{num_dags}
#
# This path redirects to a Pixel tracking URL
scarf_url = (
f"{scarf_domain}/webserver"
f"/{version}/{python_version}"
f"/{platform_sys}/{platform_arch}/{db_name}/{db_version}/{executor}/{dags_count}"
)

return scarf_url


def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag):
"""Get Execution Data, Base Date & Number of runs from a Request."""
date_time = www_request.args.get("execution_date")
Expand Down Expand Up @@ -1034,6 +1060,11 @@ def _iter_parsed_moved_data_table_names():
"warning",
)

try:
scarf_url = build_scarf_url(dags_count=all_dags_count)
except Exception:
scarf_url = ""

return self.render_template(
"airflow/dags.html",
dags=dags,
Expand Down Expand Up @@ -1072,6 +1103,7 @@ def _iter_parsed_moved_data_table_names():
sorting_direction=arg_sorting_direction,
auto_refresh_interval=conf.getint("webserver", "auto_refresh_interval"),
dataset_triggered_next_run_info=dataset_triggered_next_run_info,
scarf_url=scarf_url,
)

@expose("/datasets")
Expand Down
24 changes: 24 additions & 0 deletions docs/apache-airflow/faq.rst
Original file line number Diff line number Diff line change
Expand Up @@ -522,3 +522,27 @@ This means ``explicit_defaults_for_timestamp`` is disabled in your mysql server

#. Set ``explicit_defaults_for_timestamp = 1`` under the ``mysqld`` section in your ``my.cnf`` file.
#. Restart the Mysql server.

Does Airflow collect any telemetry data?
Comment thread
kaxil marked this conversation as resolved.
----------------------------------------

.. _airflow-telemetry-faq:

Airflow integrates `Scarf <https://about.scarf.sh/>`__ to collect basic telemetry data during operation.
This data assists Airflow maintainers in better understanding how Airflow is used.
Insights gained from this telemetry are critical for prioritizing patches, minor releases, and
security fixes. Additionally, this information supports key decisions related to the development road map.

Comment thread
kaxil marked this conversation as resolved.
Deployments can opt-out of analytics by setting the :ref:`[telemetry_collection] enabled <config:telemetry_collection__enabled>`
option to ``False``, or the ``SCARF_ANALYTICS=false`` environment variable.
Comment thread
jscheffl marked this conversation as resolved.
Outdated
Individual users can easily opt-out of analytics in various ways documented in the
`Scarf Do Not Track docs <https://docs.scarf.sh/gateway/#do-not-track>`__.

The telemetry data collected is limited to the following:

- Airflow version
- Python version
- Operating system & machine architecture
- Executor
- Metadata DB type & its version
- Number of DAGs
6 changes: 6 additions & 0 deletions docs/apache-airflow/installation/installing-from-pypi.rst
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,12 @@ dependencies compatible with just airflow core at the moment Airflow was release
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"


.. note::

Airflow uses `Scarf <https://about.scarf.sh/>`__ to collect basic telemetry data during operation.
Check the :ref:`Airflow telemetry FAQ <airflow-telemetry-faq>` for more information about the data collected
and how to opt-out.

Troubleshooting
'''''''''''''''

Expand Down
25 changes: 24 additions & 1 deletion tests/core/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from airflow.api_internal.internal_api_call import InternalApiConfig
from airflow.exceptions import AirflowClusterPolicyViolation, AirflowConfigException
from airflow.settings import _ENABLE_AIP_44, TracebackSession
from airflow.settings import _ENABLE_AIP_44, TracebackSession, is_telemetry_collection_enabled
from airflow.utils.session import create_session
from tests.test_utils.config import conf_vars

Expand Down Expand Up @@ -324,3 +324,26 @@ def test_create_session_ctx_mgr_no_call_methods(mock_new, clear_internal_api):
assert session == m
method_calls = [x[0] for x in m.method_calls]
assert method_calls == [] # commit and close not called when using internal API


@pytest.mark.parametrize(
"env_var, conf_setting, is_enabled",
[
("false", "True", False), # env forces disable
("false", "False", False), # Both force disable
("False ", "False", False), # Both force disable
("true", "True", True), # Both enable
("true", "False", False), # Conf forces disable
(None, "True", True), # Default env, conf enables
(None, "False", False), # Default env, conf disables
],
)
def test_telemetry_collection_disabled(env_var, conf_setting, is_enabled):
conf_patch = conf_vars({("telemetry_collection", "enabled"): conf_setting})

if env_var is not None:
with conf_patch, patch.dict(os.environ, {"SCARF_ANALYTICS": env_var}):
assert is_telemetry_collection_enabled() == is_enabled
else:
with conf_patch:
assert is_telemetry_collection_enabled() == is_enabled
84 changes: 84 additions & 0 deletions tests/utils/test_scarf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import platform
from unittest import mock

import pytest

from airflow import __version__ as airflow_version
from airflow.configuration import conf
from airflow.utils.scarf import get_database_version, scarf_analytics


@pytest.mark.parametrize("is_enabled, is_prerelease", [(False, True), (True, True)])
@mock.patch("httpx.get")
def test_scarf_analytics_disabled(mock_get, is_enabled, is_prerelease):
with mock.patch("airflow.settings.is_telemetry_collection_enabled", return_value=is_enabled), mock.patch(
"airflow.utils.scarf._version_is_prerelease", return_value=is_prerelease
):
scarf_analytics()
mock_get.assert_not_called()


@mock.patch("airflow.settings.is_telemetry_collection_enabled", return_value=True)
@mock.patch("airflow.utils.scarf._version_is_prerelease", return_value=False)
@mock.patch("airflow.utils.scarf.get_database_version", return_value="12.3")
@mock.patch("airflow.utils.scarf.get_database_name", return_value="postgres")
@mock.patch("httpx.get")
def test_scarf_analytics(
mock_get,
mock_is_telemetry_collection_enabled,
mock_version_is_prerelease,
get_database_version,
get_database_name,
):
platform_sys = platform.system()
platform_machine = platform.machine()
python_version = platform.python_version()
executor = conf.get("core", "EXECUTOR")
scarf_endpoint = "https://apacheairflow.gateway.scarf.sh/scheduler"
scarf_analytics()

expected_scarf_url = (
f"{scarf_endpoint}?version={airflow_version}"
f"&python_version={python_version}"
f"&platform={platform_sys}"
f"&arch={platform_machine}"
f"&database=postgres"
f"&db_version=12.3"
f"&executor={executor}"
)

mock_get.assert_called_once_with(expected_scarf_url, timeout=5.0)


@pytest.mark.db_test
@pytest.mark.parametrize(
"version_info, expected_version",
[
((1, 2, 3), "1.2.3"), # Normal version tuple
(None, "None"), # No version info available
((1,), "1"), # Single element version tuple
((1, 2, 3, "beta", 4), "1.2.3.beta.4"), # Complex version tuple with strings
],
)
def test_get_database_version(version_info, expected_version):
with mock.patch("airflow.settings.engine.dialect.server_version_info", new=version_info):
assert get_database_version() == expected_version
Loading