diff --git a/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml b/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml
index 77514e2e4812e..7e644736784ee 100644
--- a/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml
+++ b/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml
@@ -34,6 +34,7 @@ body:
- apache-hdfs
- apache-hive
- apache-impala
+ - apache-kafka
- apache-kylin
- apache-livy
- apache-pig
@@ -66,6 +67,7 @@ body:
- influxdb
- jdbc
- jenkins
+ - apache-kafka
- microsoft-azure
- microsoft-mssql
- microsoft-psrp
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 94a86bbea0d5c..aeee676f22dea 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -1271,6 +1271,11 @@ jobs:
breeze testing integration-tests --integration trino --integration kerberos
breeze stop
if: needs.build-info.outputs.runs-on != 'self-hosted'
+ - name: "Integration Tests Postgres: Kafka"
+ run: |
+ breeze testing integration-tests --integration kafka
+ breeze stop
+ if: needs.build-info.outputs.runs-on != 'self-hosted'
- name: "Integration Tests Postgres: all-testable"
run: breeze testing integration-tests --integration all-testable
if: needs.build-info.outputs.runs-on == 'self-hosted'
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 4a152a6c1dd7a..1c2da5a25a701 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -488,6 +488,7 @@ repos:
^docs/README.rst$|
^docs/apache-airflow-providers-amazon/secrets-backends/aws-ssm-parameter-store.rst$|
^docs/apache-airflow-providers-apache-hdfs/connections.rst$|
+ ^docs/apache-airflow-providers-apache-kafka/connections/kafka.rst$|
^docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst$|
^docs/apache-airflow-providers-microsoft-azure/connections/azure_cosmos.rst$|
^docs/conf.py$|
diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index e3e566c1ebe4b..2ec24a426d330 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -610,18 +610,18 @@ This is the full list of those extras:
.. START EXTRAS HERE
aiobotocore, airbyte, alibaba, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra,
-apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.impala, apache.kylin,
-apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, apache.webhdfs, arangodb, asana,
-async, atlas, atlassian.jira, aws, azure, cassandra, celery, cgroups, cloudant, cncf.kubernetes,
-common.sql, crypto, dask, databricks, datadog, dbt.cloud, deprecated_api, devel, devel_all,
-devel_ci, devel_hadoop, dingding, discord, doc, doc_gen, docker, druid, elasticsearch, exasol,
-facebook, ftp, gcp, gcp_api, github, github_enterprise, google, google_auth, grpc, hashicorp, hdfs,
-hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes, ldap, leveldb, microsoft.azure,
-microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas,
-openlineage, opsgenie, oracle, otel, pagerduty, pandas, papermill, password, pinot, plexus,
-postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry,
-sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, tableau, tabular, telegram,
-trino, vertica, virtualenv, webhdfs, winrm, zendesk
+apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.impala, apache.kafka,
+apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, apache.webhdfs,
+arangodb, asana, async, atlas, atlassian.jira, aws, azure, cassandra, celery, cgroups, cloudant,
+cncf.kubernetes, common.sql, crypto, dask, databricks, datadog, dbt.cloud, deprecated_api, devel,
+devel_all, devel_ci, devel_hadoop, dingding, discord, doc, doc_gen, docker, druid, elasticsearch,
+exasol, facebook, ftp, gcp, gcp_api, github, github_enterprise, google, google_auth, grpc,
+hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes, ldap, leveldb,
+microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc,
+openfaas, openlineage, opsgenie, oracle, otel, pagerduty, pandas, papermill, password, pinot,
+plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid,
+sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, tableau, tabular,
+telegram, trino, vertica, virtualenv, webhdfs, winrm, zendesk
.. END EXTRAS HERE
Provider packages
diff --git a/INSTALL b/INSTALL
index 1ca1b78eb570a..7fc9d1097cce4 100644
--- a/INSTALL
+++ b/INSTALL
@@ -95,18 +95,18 @@ The list of available extras:
# START EXTRAS HERE
aiobotocore, airbyte, alibaba, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra,
-apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.impala, apache.kylin,
-apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, apache.webhdfs, arangodb, asana,
-async, atlas, atlassian.jira, aws, azure, cassandra, celery, cgroups, cloudant, cncf.kubernetes,
-common.sql, crypto, dask, databricks, datadog, dbt.cloud, deprecated_api, devel, devel_all,
-devel_ci, devel_hadoop, dingding, discord, doc, doc_gen, docker, druid, elasticsearch, exasol,
-facebook, ftp, gcp, gcp_api, github, github_enterprise, google, google_auth, grpc, hashicorp, hdfs,
-hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes, ldap, leveldb, microsoft.azure,
-microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas,
-openlineage, opsgenie, oracle, otel, pagerduty, pandas, papermill, password, pinot, plexus,
-postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry,
-sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, tableau, tabular, telegram,
-trino, vertica, virtualenv, webhdfs, winrm, zendesk
+apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.impala, apache.kafka,
+apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, apache.webhdfs,
+arangodb, asana, async, atlas, atlassian.jira, aws, azure, cassandra, celery, cgroups, cloudant,
+cncf.kubernetes, common.sql, crypto, dask, databricks, datadog, dbt.cloud, deprecated_api, devel,
+devel_all, devel_ci, devel_hadoop, dingding, discord, doc, doc_gen, docker, druid, elasticsearch,
+exasol, facebook, ftp, gcp, gcp_api, github, github_enterprise, google, google_auth, grpc,
+hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes, ldap, leveldb,
+microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc,
+openfaas, openlineage, opsgenie, oracle, otel, pagerduty, pandas, papermill, password, pinot,
+plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid,
+sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, tableau, tabular,
+telegram, trino, vertica, virtualenv, webhdfs, winrm, zendesk
# END EXTRAS HERE
# For installing Airflow in development environments - see CONTRIBUTING.rst
diff --git a/airflow/provider.yaml.schema.json b/airflow/provider.yaml.schema.json
index fcb8851830d01..d961d672b0340 100644
--- a/airflow/provider.yaml.schema.json
+++ b/airflow/provider.yaml.schema.json
@@ -78,6 +78,7 @@
"gcp",
"gmp",
"google",
+ "kafka",
"protocol",
"service",
"software",
diff --git a/airflow/providers/apache/kafka/CHANGELOG.rst b/airflow/providers/apache/kafka/CHANGELOG.rst
new file mode 100644
index 0000000000000..cef7dda80708a
--- /dev/null
+++ b/airflow/providers/apache/kafka/CHANGELOG.rst
@@ -0,0 +1,25 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+Changelog
+---------
+
+1.0.0
+.....
+
+Initial version of the provider.
diff --git a/airflow/providers/apache/kafka/__init__.py b/airflow/providers/apache/kafka/__init__.py
new file mode 100644
index 0000000000000..217e5db960782
--- /dev/null
+++ b/airflow/providers/apache/kafka/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/kafka/hooks/__init__.py b/airflow/providers/apache/kafka/hooks/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/airflow/providers/apache/kafka/hooks/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/kafka/hooks/base.py b/airflow/providers/apache/kafka/hooks/base.py
new file mode 100644
index 0000000000000..bd3a2d3cc74b8
--- /dev/null
+++ b/airflow/providers/apache/kafka/hooks/base.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any
+
+from confluent_kafka.admin import AdminClient
+
+from airflow.compat.functools import cached_property
+from airflow.hooks.base import BaseHook
+
+
+class KafkaBaseHook(BaseHook):
+ """
+    A base hook for interacting with Apache Kafka.
+
+ :param kafka_config_id: The connection object to use, defaults to "kafka_default"
+ """
+
+ conn_name_attr = "kafka_config_id"
+ default_conn_name = "kafka_default"
+ conn_type = "kafka"
+ hook_name = "Apache Kafka"
+
+    def __init__(self, kafka_config_id=default_conn_name, *args, **kwargs):
+        """Initialize our Base."""
+        super().__init__()
+        self.kafka_config_id = kafka_config_id
+        # Evaluate the cached property eagerly so that configuration errors
+        # surface when the hook is constructed rather than on first use.
+        self.get_conn
+
+ @staticmethod
+ def get_ui_field_behaviour() -> dict[str, Any]:
+ """Returns custom field behaviour"""
+ return {
+ "hidden_fields": ["schema", "login", "password", "port", "host"],
+ "relabeling": {"extra": "Config Dict"},
+ "placeholders": {
+ "extra": '{"bootstrap.servers": "localhost:9092"}',
+ },
+ }
+
+ def _get_client(self, config):
+ raise NotImplementedError
+
+ @cached_property
+ def get_conn(self) -> Any:
+        """Get the configuration object."""
+ config = self.get_connection(self.kafka_config_id).extra_dejson
+
+ if not (config.get("bootstrap.servers", None)):
+ raise ValueError("config['bootstrap.servers'] must be provided.")
+
+ return self._get_client(config)
+
+    def test_connection(self) -> tuple[bool, str]:
+        """Test connectivity from the UI."""
+        try:
+            config = self.get_connection(self.kafka_config_id).extra_dejson
+            t = AdminClient(config).list_topics(timeout=10)
+            if t:
+                return True, "Connection successful."
+        except Exception as e:
+            return False, str(e)
+
+        return False, "Failed to establish connection."
diff --git a/airflow/providers/apache/kafka/hooks/client.py b/airflow/providers/apache/kafka/hooks/client.py
new file mode 100644
index 0000000000000..7613bfab220c0
--- /dev/null
+++ b/airflow/providers/apache/kafka/hooks/client.py
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any, Sequence
+
+from confluent_kafka import KafkaException
+from confluent_kafka.admin import AdminClient, NewTopic
+
+from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook
+
+
+class KafkaAdminClientHook(KafkaBaseHook):
+ """
+ A hook for interacting with the Kafka Cluster
+
+ :param kafka_config_id: The connection object to use, defaults to "kafka_default"
+ """
+
+ def __init__(self, kafka_config_id=KafkaBaseHook.default_conn_name) -> None:
+ super().__init__(kafka_config_id=kafka_config_id)
+
+ def _get_client(self, config) -> AdminClient:
+ return AdminClient(config)
+
+ def create_topic(
+ self,
+ topics: Sequence[Sequence[Any]],
+ ) -> None:
+        """Create one or more topics.
+
+ :param topics: a list of topics to create including the number of partitions for the topic
+ and the replication factor. Format: [ ("topic_name", number of partitions, replication factor)]
+ """
+ admin_client = self.get_conn
+
+ new_topics = [NewTopic(t[0], num_partitions=t[1], replication_factor=t[2]) for t in topics]
+
+ futures = admin_client.create_topics(new_topics)
+
+ for t, f in futures.items():
+ try:
+ f.result()
+ self.log.info("The topic %s has been created.", t)
+            except KafkaException as e:
+                # The exception wraps a KafkaError; ``name()`` is a method, not an attribute.
+                if e.args[0].name() == "TOPIC_ALREADY_EXISTS":
+ self.log.warning("The topic %s already exists.", t)
+ else:
+ raise
diff --git a/airflow/providers/apache/kafka/hooks/consume.py b/airflow/providers/apache/kafka/hooks/consume.py
new file mode 100644
index 0000000000000..9ab0361067125
--- /dev/null
+++ b/airflow/providers/apache/kafka/hooks/consume.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Sequence
+
+from confluent_kafka import Consumer
+
+from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook
+
+
+class KafkaConsumerHook(KafkaBaseHook):
+ """
+ A hook for creating a Kafka Consumer
+
+ :param kafka_config_id: The connection object to use, defaults to "kafka_default"
+ :param topics: A list of topics to subscribe to.
+ """
+
+ def __init__(self, topics: Sequence[str], kafka_config_id=KafkaBaseHook.default_conn_name) -> None:
+
+ super().__init__(kafka_config_id=kafka_config_id)
+ self.topics = topics
+
+ def _get_client(self, config) -> Consumer:
+ return Consumer(config)
+
+ def get_consumer(self) -> Consumer:
+ """Returns a Consumer that has been subscribed to topics."""
+ consumer = self.get_conn
+ consumer.subscribe(self.topics)
+
+ return consumer
diff --git a/airflow/providers/apache/kafka/hooks/produce.py b/airflow/providers/apache/kafka/hooks/produce.py
new file mode 100644
index 0000000000000..7e3a5bcf6e2ca
--- /dev/null
+++ b/airflow/providers/apache/kafka/hooks/produce.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from confluent_kafka import Producer
+
+from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook
+
+
+class KafkaProducerHook(KafkaBaseHook):
+ """
+ A hook for creating a Kafka Producer
+
+ :param kafka_config_id: The connection object to use, defaults to "kafka_default"
+ """
+
+ def __init__(self, kafka_config_id=KafkaBaseHook.default_conn_name) -> None:
+ super().__init__(kafka_config_id=kafka_config_id)
+
+ def _get_client(self, config) -> Producer:
+ return Producer(config)
+
+ def get_producer(self) -> Producer:
+ """Returns a producer object for sending messages to Kafka"""
+ producer = self.get_conn
+
+ self.log.info("Producer %s", producer)
+ return producer
diff --git a/airflow/providers/apache/kafka/operators/__init__.py b/airflow/providers/apache/kafka/operators/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/airflow/providers/apache/kafka/operators/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/kafka/operators/consume.py b/airflow/providers/apache/kafka/operators/consume.py
new file mode 100644
index 0000000000000..02c9db6556df6
--- /dev/null
+++ b/airflow/providers/apache/kafka/operators/consume.py
@@ -0,0 +1,187 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from functools import partial
+from typing import Any, Callable, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook
+from airflow.utils.module_loading import import_string
+
+VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"}
+
+
+class ConsumeFromTopicOperator(BaseOperator):
+    """An operator that consumes from one or more Kafka topics and processes the messages.
+
+    The operator creates a Kafka consumer that reads a batch of messages from the cluster and processes them
+    using the user-supplied callable function. The consumer will continue to read in batches until it reaches
+    the end of the log or a maximum number of messages is read.
+
+ :param kafka_config_id: The connection object to use, defaults to "kafka_default"
+ :param topics: A list of topics or regex patterns the consumer should subscribe to.
+    :param apply_function: The function that should be applied to messages fetched one at a time. May be
+        a callable or a dot-notation string (module path and function name delimited by a `.`).
+ :param apply_function_batch: The function that should be applied to a batch of messages fetched. Can not
+ be used with `apply_function`. Intended for transactional workloads where an expensive task might
+ be called before or after operations on the messages are taken.
+ :param apply_function_args: Additional arguments that should be applied to the callable, defaults to None
+ :param apply_function_kwargs: Additional key word arguments that should be applied to the callable
+ defaults to None
+    :param commit_cadence: When consumers should commit offsets ("never", "end_of_batch", "end_of_operator"),
+        defaults to "end_of_operator";
+        if "end_of_operator", commit() is called once after the operator has applied the apply_function to
+        at most max_messages messages;
+        if "end_of_batch", commit() is called after each batch of at most max_batch_size messages has been
+        processed by the apply_function;
+        if "never", close() is called without calling commit().
+ :param max_messages: The maximum total number of messages an operator should read from Kafka,
+ defaults to None implying read to the end of the topic.
+ :param max_batch_size: The maximum number of messages a consumer should read when polling,
+ defaults to 1000
+ :param poll_timeout: How long the Kafka consumer should wait before determining no more messages are
+ available, defaults to 60
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:ConsumeFromTopicOperator`
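+
+    Example (a minimal sketch; topic and function names are illustrative)::
+
+        consume = ConsumeFromTopicOperator(
+            task_id="consume_from_topic",
+            topics=["my_topic"],
+            apply_function="my_module.my_handler",
+            max_messages=100,
+            max_batch_size=10,
+        )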
+ """
+
+ BLUE = "#ffefeb"
+ ui_color = BLUE
+ template_fields = (
+ "topics",
+ "apply_function",
+ "apply_function_args",
+ "apply_function_kwargs",
+ "kafka_config_id",
+ )
+
+ def __init__(
+ self,
+ topics: str | Sequence[str],
+ kafka_config_id: str = "kafka_default",
+ apply_function: Callable[..., Any] | str | None = None,
+ apply_function_batch: Callable[..., Any] | str | None = None,
+ apply_function_args: Sequence[Any] | None = None,
+ apply_function_kwargs: dict[Any, Any] | None = None,
+ commit_cadence: str | None = "end_of_operator",
+ max_messages: int | None = None,
+ max_batch_size: int = 1000,
+ poll_timeout: float = 60,
+ **kwargs: Any,
+ ) -> None:
+
+ super().__init__(**kwargs)
+
+ self.topics = topics
+ self.apply_function = apply_function
+ self.apply_function_batch = apply_function_batch
+ self.apply_function_args = apply_function_args or ()
+ self.apply_function_kwargs = apply_function_kwargs or {}
+ self.kafka_config_id = kafka_config_id
+        self.commit_cadence = commit_cadence
+        # When max_messages is not supplied, store True as a sentinel meaning "read to the end of the topic".
+        self.max_messages = max_messages or True
+ self.max_batch_size = max_batch_size
+ self.poll_timeout = poll_timeout
+
+ if self.max_messages is True:
+ self.read_to_end = True
+ else:
+ self.read_to_end = False
+
+ if self.commit_cadence not in VALID_COMMIT_CADENCE:
+ raise AirflowException(
+ f"commit_cadence must be one of {VALID_COMMIT_CADENCE}. Got {self.commit_cadence}"
+ )
+
+        if self.max_messages is not True and self.max_batch_size > self.max_messages:
+            self.log.warning(
+                "max_batch_size (%s) > max_messages (%s). The consumer will read no more than max_messages.",
+                self.max_batch_size,
+                self.max_messages,
+            )
+
+ if self.commit_cadence == "never":
+ self.commit_cadence = None
+
+        if apply_function and apply_function_batch:
+            raise AirflowException(
+                "Only one of apply_function or apply_function_batch may be supplied, not both."
+            )
+
+ def execute(self, context) -> Any:
+
+ consumer = KafkaConsumerHook(topics=self.topics, kafka_config_id=self.kafka_config_id).get_consumer()
+
+ if isinstance(self.apply_function, str):
+ self.apply_function = import_string(self.apply_function)
+
+ if isinstance(self.apply_function_batch, str):
+ self.apply_function_batch = import_string(self.apply_function_batch)
+
+ if self.apply_function:
+ apply_callable = partial(
+ self.apply_function, *self.apply_function_args, **self.apply_function_kwargs # type: ignore
+ )
+
+ if self.apply_function_batch:
+ apply_callable = partial(
+ self.apply_function_batch, # type: ignore
+ *self.apply_function_args,
+ **self.apply_function_kwargs,
+ )
+
+ messages_left = self.max_messages
+
+ while self.read_to_end or (
+ messages_left > 0
+ ): # bool(True > 0) == True in the case where self.max_messages isn't set by the user
+
+ if not isinstance(messages_left, bool):
+ batch_size = self.max_batch_size if messages_left > self.max_batch_size else messages_left
+ else:
+ batch_size = self.max_batch_size
+
+ msgs = consumer.consume(num_messages=batch_size, timeout=self.poll_timeout)
+ messages_left -= len(msgs)
+
+            if not msgs:  # No messages returned from the poll: we have reached the end of the log.
+ self.log.info("Reached end of log. Exiting.")
+ break
+
+ if self.apply_function:
+ for m in msgs:
+ apply_callable(m)
+
+ if self.apply_function_batch:
+ apply_callable(msgs)
+
+ if self.commit_cadence == "end_of_batch":
+ self.log.info("committing offset at %s", self.commit_cadence)
+ consumer.commit()
+
+ if self.commit_cadence:
+ self.log.info("committing offset at %s", self.commit_cadence)
+ consumer.commit()
+
+ consumer.close()
+
+ return
diff --git a/airflow/providers/apache/kafka/operators/produce.py b/airflow/providers/apache/kafka/operators/produce.py
new file mode 100644
index 0000000000000..c6a05436aa5ac
--- /dev/null
+++ b/airflow/providers/apache/kafka/operators/produce.py
@@ -0,0 +1,130 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from functools import partial
+from typing import Any, Callable, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook
+from airflow.utils.module_loading import import_string
+
+local_logger = logging.getLogger("airflow")
+
+
+def acked(err, msg):
+    """Log the delivery status of a produced message."""
+    if err is not None:
+        local_logger.error("Failed to deliver message: %s", err)
+    else:
+        local_logger.info(
+            "Produced record to topic %s partition [%s] @ offset %s",
+            msg.topic(),
+            msg.partition(),
+            msg.offset(),
+        )
+
+
+class ProduceToTopicOperator(BaseOperator):
+    """An operator that produces messages to a Kafka topic.
+
+    Registers a producer to a Kafka topic and publishes the messages returned by the producer function.
+
+    :param kafka_config_id: The connection object to use, defaults to "kafka_default"
+    :param topic: The topic the producer should produce to
+    :param producer_function: The function that generates key/value pairs as messages for production
+    :param producer_function_args: Additional arguments to be applied to the producer callable,
+        defaults to None
+    :param producer_function_kwargs: Additional keyword arguments to be applied to the producer callable,
+        defaults to None
+    :param delivery_callback: The callback to apply after delivery (or failure) of a message,
+        defaults to None
+    :param synchronous: If writes to kafka should be fully synchronous, defaults to True
+    :param poll_timeout: How long of a delay should be applied when calling poll after production to kafka,
+        defaults to 0
+    :raises AirflowException: If ``topic`` or ``producer_function`` is not provided.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:ProduceToTopicOperator`
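+
+    Example (a minimal sketch; topic and function names are illustrative)::
+
+        def my_producer_function():
+            for i in range(5):
+                yield (f"key-{i}", f"value-{i}")
+
+        produce = ProduceToTopicOperator(
+            task_id="produce_to_topic",
+            topic="my_topic",
+            producer_function="my_module.my_producer_function",
+        )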
+ """
+
+ template_fields = (
+ "topic",
+ "producer_function",
+ "producer_function_args",
+ "producer_function_kwargs",
+ "kafka_config_id",
+ )
+
+ def __init__(
+ self,
+ topic: str,
+ producer_function: str | Callable[..., Any],
+ kafka_config_id: str = "kafka_default",
+ producer_function_args: Sequence[Any] | None = None,
+ producer_function_kwargs: dict[Any, Any] | None = None,
+ delivery_callback: str | None = None,
+ synchronous: bool = True,
+ poll_timeout: float = 0,
+ **kwargs: Any,
+ ) -> None:
+
+ super().__init__(**kwargs)
+
+ if delivery_callback:
+ dc = import_string(delivery_callback)
+ else:
+ dc = acked
+
+ self.kafka_config_id = kafka_config_id
+ self.topic = topic
+ self.producer_function = producer_function
+ self.producer_function_args = producer_function_args or ()
+ self.producer_function_kwargs = producer_function_kwargs or {}
+ self.delivery_callback = dc
+ self.synchronous = synchronous
+ self.poll_timeout = poll_timeout
+
+ if not (self.topic and self.producer_function):
+ raise AirflowException(
+ "topic and producer_function must be provided. Got topic="
+ f"{self.topic} and producer_function={self.producer_function}"
+ )
+
+ def execute(self, context) -> None:
+
+ # Get producer and callable
+ producer = KafkaProducerHook(kafka_config_id=self.kafka_config_id).get_producer()
+
+ if isinstance(self.producer_function, str):
+ self.producer_function = import_string(self.producer_function)
+
+ producer_callable = partial(
+ self.producer_function, # type: ignore
+ *self.producer_function_args,
+ **self.producer_function_kwargs,
+ )
+
+ # For each returned k/v in the callable : publish and flush if needed.
+ for k, v in producer_callable():
+ producer.produce(self.topic, key=k, value=v, on_delivery=self.delivery_callback)
+ producer.poll(self.poll_timeout)
+ if self.synchronous:
+ producer.flush()
+
+ producer.flush()
diff --git a/airflow/providers/apache/kafka/provider.yaml b/airflow/providers/apache/kafka/provider.yaml
new file mode 100644
index 0000000000000..6b3fdad1ab38a
--- /dev/null
+++ b/airflow/providers/apache/kafka/provider.yaml
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+package-name: apache-airflow-providers-apache-kafka
+name: Apache Kafka
+
+suspended: false
+description: |
+  `Apache Kafka <https://kafka.apache.org/>`__
+versions:
+ - 1.0.0
+
+dependencies:
+ - apache-airflow>=2.3.0
+ - asgiref
+ - confluent-kafka>=1.8.2
+
+integrations:
+ - integration-name: Apache Kafka
+ external-doc-url: https://kafka.apache.org/
+ logo: /integration-logos/apache/kafka.svg
+ tags: [apache]
+
+operators:
+ - integration-name: Apache Kafka
+ python-modules:
+ - airflow.providers.apache.kafka.operators.consume
+ - airflow.providers.apache.kafka.operators.produce
+
+hooks:
+ - integration-name: Apache Kafka
+ python-modules:
+ - airflow.providers.apache.kafka.hooks.base
+ - airflow.providers.apache.kafka.hooks.client
+ - airflow.providers.apache.kafka.hooks.consume
+ - airflow.providers.apache.kafka.hooks.produce
+
+sensors:
+ - integration-name: Apache Kafka
+ python-modules:
+ - airflow.providers.apache.kafka.sensors.kafka
+
+triggers:
+ - integration-name: Apache Kafka
+ python-modules:
+ - airflow.providers.apache.kafka.triggers.await_message
+
+connection-types:
+ - hook-class-name: airflow.providers.apache.kafka.hooks.base.KafkaBaseHook
+ connection-type: kafka
diff --git a/airflow/providers/apache/kafka/sensors/__init__.py b/airflow/providers/apache/kafka/sensors/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/airflow/providers/apache/kafka/sensors/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/kafka/sensors/kafka.py b/airflow/providers/apache/kafka/sensors/kafka.py
new file mode 100644
index 0000000000000..747c01ddc5aa7
--- /dev/null
+++ b/airflow/providers/apache/kafka/sensors/kafka.py
@@ -0,0 +1,219 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any, Callable, Sequence
+
+from airflow.models import BaseOperator
+from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
+
+
+class AwaitMessageSensor(BaseOperator):
+ """An Airflow sensor that defers until a specific message is published to Kafka.
+
+ The sensor creates a consumer that reads the Kafka log until it encounters a positive event.
+
+ The behavior of the consumer for this trigger is as follows:
+ - poll the Kafka topics for a message
+ - if no message returned, sleep
+ - process the message with provided callable and commit the message offset
+ - if callable returns any data, raise a TriggerEvent with the return data
+ - else continue to next message
+ - return event (as default xcom or specific xcom key)
+
+
+ :param kafka_config_id: The connection object to use, defaults to "kafka_default"
+ :param topics: Topics (or topic regex) to use for reading from
+ :param apply_function: The function to apply to messages to determine if an event occurred. As a dot
+ notation string.
+ :param apply_function_args: Arguments to be applied to the processing function,
+ defaults to None
+ :param apply_function_kwargs: Key word arguments to be applied to the processing function,
+ defaults to None
+    :param poll_timeout: How long the kafka consumer should wait for a message to arrive from the kafka
+        cluster, defaults to 1
+ :param poll_interval: How long the kafka consumer should sleep after reaching the end of the Kafka log,
+ defaults to 5
+ :param xcom_push_key: the name of a key to push the returned message to, defaults to None
+
+
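+    Example (a minimal sketch; topic and function names are illustrative)::
+
+        sensor = AwaitMessageSensor(
+            task_id="await_message",
+            topics=["my_topic"],
+            apply_function="my_module.check_message",
+            xcom_push_key="retrieved_message",
+        )
+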
+ """
+
+ BLUE = "#ffefeb"
+ ui_color = BLUE
+
+ template_fields = (
+ "topics",
+ "apply_function",
+ "apply_function_args",
+ "apply_function_kwargs",
+ "kafka_config_id",
+ )
+
+ def __init__(
+ self,
+ topics: Sequence[str],
+ apply_function: str,
+ kafka_config_id: str = "kafka_default",
+ apply_function_args: Sequence[Any] | None = None,
+ apply_function_kwargs: dict[Any, Any] | None = None,
+ poll_timeout: float = 1,
+ poll_interval: float = 5,
+ xcom_push_key=None,
+ **kwargs: Any,
+ ) -> None:
+
+ super().__init__(**kwargs)
+
+ self.topics = topics
+ self.apply_function = apply_function
+ self.apply_function_args = apply_function_args
+ self.apply_function_kwargs = apply_function_kwargs
+ self.kafka_config_id = kafka_config_id
+ self.poll_timeout = poll_timeout
+ self.poll_interval = poll_interval
+ self.xcom_push_key = xcom_push_key
+
+ def execute(self, context) -> Any:
+
+ self.defer(
+ trigger=AwaitMessageTrigger(
+ topics=self.topics,
+ apply_function=self.apply_function,
+ apply_function_args=self.apply_function_args,
+ apply_function_kwargs=self.apply_function_kwargs,
+ kafka_config_id=self.kafka_config_id,
+ poll_timeout=self.poll_timeout,
+ poll_interval=self.poll_interval,
+ ),
+ method_name="execute_complete",
+ )
+
+ def execute_complete(self, context, event=None):
+ if self.xcom_push_key:
+ self.xcom_push(context, key=self.xcom_push_key, value=event)
+ return event
+
+
+class AwaitMessageTriggerFunctionSensor(BaseOperator):
+    """An Airflow sensor that defers until a specific message is published to Kafka, triggers a
+    registered function, and then goes back to waiting for a message.
+
+
+    The behavior of the consumer for this sensor is as follows:
+    - poll the Kafka topics for a message
+    - if no message returned, sleep
+    - process the message with provided callable and commit the message offset
+    - if callable returns any data, invoke the event_triggered_function with the event
+    - defer again and continue waiting for the next matching message
+
+
+ :param kafka_config_id: The connection object to use, defaults to "kafka_default"
+ :param topics: Topics (or topic regex) to use for reading from
+ :param apply_function: The function to apply to messages to determine if an event occurred. As a dot
+ notation string.
+ :param event_triggered_function: The callable to trigger once the apply_function encounters a
+ positive event.
+ :param apply_function_args: Arguments to be applied to the processing function, defaults to None
+ :param apply_function_kwargs: Key word arguments to be applied to the processing function,
+ defaults to None
+ :param poll_timeout: How long the kafka consumer should wait for a message to arrive from the kafka
+ cluster, defaults to 1
+ :param poll_interval: How long the kafka consumer should sleep after reaching the end of the Kafka log,
+ defaults to 5
+
+
+ """
+
+ BLUE = "#ffefeb"
+ ui_color = BLUE
+
+ template_fields = (
+ "topics",
+ "apply_function",
+ "apply_function_args",
+ "apply_function_kwargs",
+ "kafka_config_id",
+ )
+
+ def __init__(
+ self,
+ topics: Sequence[str],
+ apply_function: str,
+ event_triggered_function: Callable,
+ kafka_config_id: str = "kafka_default",
+ apply_function_args: Sequence[Any] | None = None,
+ apply_function_kwargs: dict[Any, Any] | None = None,
+ poll_timeout: float = 1,
+ poll_interval: float = 5,
+ **kwargs: Any,
+ ) -> None:
+
+ super().__init__(**kwargs)
+
+ self.topics = topics
+ self.apply_function = apply_function
+ self.apply_function_args = apply_function_args
+ self.apply_function_kwargs = apply_function_kwargs
+ self.kafka_config_id = kafka_config_id
+ self.poll_timeout = poll_timeout
+ self.poll_interval = poll_interval
+ self.event_triggered_function = event_triggered_function
+
+ if not callable(self.event_triggered_function):
+ raise TypeError(
+                "parameter event_triggered_function is expected to be of type callable, "
+                f"got {type(event_triggered_function)}"
+ )
+
+ def execute(self, context, event=None) -> Any:
+
+ self.defer(
+ trigger=AwaitMessageTrigger(
+ topics=self.topics,
+ apply_function=self.apply_function,
+ apply_function_args=self.apply_function_args,
+ apply_function_kwargs=self.apply_function_kwargs,
+ kafka_config_id=self.kafka_config_id,
+ poll_timeout=self.poll_timeout,
+ poll_interval=self.poll_interval,
+ ),
+ method_name="execute_complete",
+ )
+
+ return event
+
+ def execute_complete(self, context, event=None):
+
+ self.event_triggered_function(event, **context)
+
+ self.defer(
+ trigger=AwaitMessageTrigger(
+ topics=self.topics,
+ apply_function=self.apply_function,
+ apply_function_args=self.apply_function_args,
+ apply_function_kwargs=self.apply_function_kwargs,
+ kafka_config_id=self.kafka_config_id,
+ poll_timeout=self.poll_timeout,
+ poll_interval=self.poll_interval,
+ ),
+ method_name="execute_complete",
+ )
diff --git a/airflow/providers/apache/kafka/triggers/__init__.py b/airflow/providers/apache/kafka/triggers/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/airflow/providers/apache/kafka/triggers/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/kafka/triggers/await_message.py b/airflow/providers/apache/kafka/triggers/await_message.py
new file mode 100644
index 0000000000000..7e7021f54be70
--- /dev/null
+++ b/airflow/providers/apache/kafka/triggers/await_message.py
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+from functools import partial
+from typing import Any, Sequence
+
+from asgiref.sync import sync_to_async
+
+from airflow import AirflowException
+from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+from airflow.utils.module_loading import import_string
+
+
+class AwaitMessageTrigger(BaseTrigger):
+    """A trigger that waits for a message matching specific criteria to arrive in Kafka.
+
+ The behavior of the consumer of this trigger is as follows:
+ - poll the Kafka topics for a message, if no message returned, sleep
+ - process the message with provided callable and commit the message offset:
+
+ - if callable returns any data, raise a TriggerEvent with the return data
+
+ - else continue to next message
+
+
+ :param kafka_config_id: The connection object to use, defaults to "kafka_default"
+ :param topics: The topic (or topic regex) that should be searched for messages
+ :param apply_function: the location of the function to apply to messages for determination of matching
+ criteria. (In python dot notation as a string)
+ :param apply_function_args: A set of arguments to apply to the callable, defaults to None
+    :param apply_function_kwargs: A set of key word arguments to apply to the callable, defaults to None
+ :param poll_timeout: How long the Kafka client should wait before returning from a poll request to
+ Kafka (seconds), defaults to 1
+    :param poll_interval: How long the trigger should sleep after reaching the end of the Kafka log
+        (seconds), defaults to 5
+
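+    Example apply function (a minimal sketch; the matching logic is illustrative)::
+
+        def await_function(message):
+            # Return a value to fire a TriggerEvent; returning None keeps the trigger waiting.
+            if message.value() == b"match":
+                return message.value().decode("utf-8")
+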
+ """
+
+ def __init__(
+ self,
+ topics: Sequence[str],
+ apply_function: str,
+ kafka_config_id: str = "kafka_default",
+ apply_function_args: Sequence[Any] | None = None,
+ apply_function_kwargs: dict[Any, Any] | None = None,
+ poll_timeout: float = 1,
+ poll_interval: float = 5,
+ ) -> None:
+        super().__init__()
+ self.topics = topics
+ self.apply_function = apply_function
+ self.apply_function_args = apply_function_args or ()
+ self.apply_function_kwargs = apply_function_kwargs or {}
+ self.kafka_config_id = kafka_config_id
+ self.poll_timeout = poll_timeout
+ self.poll_interval = poll_interval
+
+ def serialize(self) -> tuple[str, dict[str, Any]]:
+ return (
+ "airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger",
+ {
+ "topics": self.topics,
+ "apply_function": self.apply_function,
+ "apply_function_args": self.apply_function_args,
+ "apply_function_kwargs": self.apply_function_kwargs,
+ "kafka_config_id": self.kafka_config_id,
+ "poll_timeout": self.poll_timeout,
+ "poll_interval": self.poll_interval,
+ },
+ )
+
+ async def run(self):
+ consumer_hook = KafkaConsumerHook(topics=self.topics, kafka_config_id=self.kafka_config_id)
+
+ async_get_consumer = sync_to_async(consumer_hook.get_consumer)
+ consumer = await async_get_consumer()
+
+ async_poll = sync_to_async(consumer.poll)
+ async_commit = sync_to_async(consumer.commit)
+
+ processing_call = import_string(self.apply_function)
+ processing_call = partial(processing_call, *self.apply_function_args, **self.apply_function_kwargs)
+ async_message_process = sync_to_async(processing_call)
+ while True:
+
+ message = await async_poll(self.poll_timeout)
+
+ if message is None:
+ continue
+ elif message.error():
+ raise AirflowException(f"Error: {message.error()}")
+ else:
+
+ rv = await async_message_process(message)
+ if rv:
+ await async_commit(asynchronous=False)
+ yield TriggerEvent(rv)
+ else:
+ await async_commit(asynchronous=False)
+ await asyncio.sleep(self.poll_interval)
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 99f63598fbb26..358496e44453c 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -19,6 +19,7 @@
import contextlib
import enum
+import json
import logging
import os
import sys
@@ -39,7 +40,7 @@
from airflow.utils import helpers
# TODO: remove create_session once we decide to break backward compatibility
-from airflow.utils.session import NEW_SESSION, create_session, provide_session # noqa: F401
+from airflow.utils.session import NEW_SESSION, provide_session
if TYPE_CHECKING:
from alembic.runtime.environment import EnvironmentContext
@@ -364,6 +365,14 @@ def create_default_connections(session: Session = NEW_SESSION):
session,
)
merge_conn(Connection(conn_id="impala_default", conn_type="impala", host="localhost", port=21050))
+ merge_conn(
+ Connection(
+ conn_id="kafka_default",
+ conn_type="kafka",
+ extra=json.dumps({"bootstrap.servers": "broker:29092"}),
+ ),
+ session,
+ )
merge_conn(
Connection(
conn_id="kubernetes_default",
diff --git a/dev/breeze/src/airflow_breeze/global_constants.py b/dev/breeze/src/airflow_breeze/global_constants.py
index 22088120c843e..49cbf6be606ce 100644
--- a/dev/breeze/src/airflow_breeze/global_constants.py
+++ b/dev/breeze/src/airflow_breeze/global_constants.py
@@ -42,14 +42,7 @@
ALLOWED_BACKENDS = ["sqlite", "mysql", "postgres", "mssql"]
ALLOWED_PROD_BACKENDS = ["mysql", "postgres", "mssql"]
DEFAULT_BACKEND = ALLOWED_BACKENDS[0]
-TESTABLE_INTEGRATIONS = [
- "cassandra",
- "celery",
- "kerberos",
- "mongo",
- "pinot",
- "trino",
-]
+TESTABLE_INTEGRATIONS = ["cassandra", "celery", "kafka", "kerberos", "mongo", "pinot", "trino"]
OTHER_INTEGRATIONS = ["statsd"]
ALL_INTEGRATIONS = sorted(
[
diff --git a/docs/apache-airflow-providers-apache-kafka/commits.rst b/docs/apache-airflow-providers-apache-kafka/commits.rst
new file mode 100644
index 0000000000000..252b3943f5c61
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-kafka/commits.rst
@@ -0,0 +1,32 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Package apache-airflow-providers-apache-kafka
+------------------------------------------------------
+
+`Kafka <https://kafka.apache.org/>`__
+
+
+This is a detailed commit list of changes for versions of the provider package: ``kafka``.
+For the high-level changelog, see :doc:`package information including changelog <index>`.
+
+
+1.0.0
+.....
+
+Initial release of this provider.
diff --git a/docs/apache-airflow-providers-apache-kafka/connections/kafka.rst b/docs/apache-airflow-providers-apache-kafka/connections/kafka.rst
new file mode 100644
index 0000000000000..831653c3dc887
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-kafka/connections/kafka.rst
@@ -0,0 +1,43 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+.. _howto/connection:kafka:
+
+Apache Kafka Connection
+========================
+
+The Apache Kafka connection type configures a connection to Apache Kafka via the ``confluent-kafka`` Python package.
+
+.. |Kafka Connection| image:: kafka_connection.png
+ :width: 400
+ :alt: Kafka Connection Screenshot
+
+
+Default Connection IDs
+----------------------
+
+Kafka hooks and operators use ``kafka_default`` by default. This connection is very minimal and should not be assumed useful for more than the most trivial of testing.
+
+Configuring the Connection
+--------------------------
+
+Connections are configured as a JSON-serializable string provided to the ``extra`` field. A full list of
+parameters is described in the `Confluent Kafka python library <https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafka-client-configuration>`_.
+
+If you are defining the Airflow connection from the Airflow UI, the ``extra`` field will be renamed to ``Config Dict``.
+
+Most hooks and operators will check, at a minimum, that the ``bootstrap.servers`` key exists and has a valid value set.
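+
+For example, a connection equivalent to the default can be created programmatically (a minimal sketch;
+the broker address is illustrative):
+
+.. code-block:: python
+
+    import json
+
+    from airflow.models.connection import Connection
+
+    conn = Connection(
+        conn_id="kafka_default",
+        conn_type="kafka",
+        extra=json.dumps({"bootstrap.servers": "localhost:9092"}),
+    )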
diff --git a/docs/apache-airflow-providers-apache-kafka/connections/kafka_connection.png b/docs/apache-airflow-providers-apache-kafka/connections/kafka_connection.png
new file mode 100644
index 0000000000000..bd90be0a9b918
Binary files /dev/null and b/docs/apache-airflow-providers-apache-kafka/connections/kafka_connection.png differ
diff --git a/docs/apache-airflow-providers-apache-kafka/hooks.rst b/docs/apache-airflow-providers-apache-kafka/hooks.rst
new file mode 100644
index 0000000000000..a50f6cbb9f00b
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-kafka/hooks.rst
@@ -0,0 +1,70 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+Apache Kafka Hooks
+==================
+
+.. _howto/hook:KafkaBaseHook:
+
+KafkaBaseHook
+------------------------
+
+A base hook for interacting with Apache Kafka. Use this hook as a base class when creating your own Kafka hooks.
+For parameter definitions take a look at :class:`~airflow.providers.apache.kafka.hooks.base.KafkaBaseHook`.
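+
+For example, a subclass only needs to implement ``_get_client`` (a minimal sketch; the class name is
+illustrative):
+
+.. code-block:: python
+
+    from confluent_kafka import Consumer
+
+    from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook
+
+
+    class MyConsumerHook(KafkaBaseHook):
+        def _get_client(self, config):
+            # ``config`` is the connection's ``extra`` field as a dict.
+            return Consumer(config)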
+
+
+.. _howto/hook:KafkaAdminClientHook:
+
+KafkaAdminClientHook
+------------------------
+
+A hook for interacting with an Apache Kafka cluster.
+For parameter definitions take a look at :class:`~airflow.providers.apache.kafka.hooks.client.KafkaAdminClientHook`.
+
+Reference
+"""""""""
+
+For further information, look at `Apache Kafka Admin config documentation <https://kafka.apache.org/documentation/#adminclientconfigs>`_.
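+
+A minimal usage sketch (topic name, partition count, and replication factor are illustrative):
+
+.. code-block:: python
+
+    from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook
+
+    hook = KafkaAdminClientHook(kafka_config_id="kafka_default")
+    # Each entry is (topic_name, number_of_partitions, replication_factor).
+    hook.create_topic(topics=[("my_topic", 3, 1)])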
+
+
+.. _howto/hook:KafkaConsumerHook:
+
+KafkaConsumerHook
+------------------------
+
+A hook for creating a Kafka Consumer. This hook is used by the ``ConsumeFromTopicOperator`` and the ``AwaitMessageTrigger``.
+For parameter definitions take a look at :class:`~airflow.providers.apache.kafka.hooks.consume.KafkaConsumerHook`.
+
+Reference
+"""""""""
+
+For further information, see the `Apache Kafka Consumer documentation `_.
+
+
+.. _howto/hook:KafkaProducerHook:
+
+KafkaProducerHook
+------------------------
+
+A hook for creating a Kafka producer. This hook is used by the ``ProduceToTopicOperator``.
+For parameter definitions take a look at :class:`~airflow.providers.apache.kafka.hooks.produce.KafkaProducerHook`.
+
+Reference
+"""""""""
+
+For further information, see the `Apache Kafka Producer documentation `_.
diff --git a/docs/apache-airflow-providers-apache-kafka/index.rst b/docs/apache-airflow-providers-apache-kafka/index.rst
new file mode 100644
index 0000000000000..4a89f1c71cdf6
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-kafka/index.rst
@@ -0,0 +1,94 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+``apache-airflow-providers-apache-kafka``
+==========================================
+
+Content
+-------
+
+.. toctree::
+ :maxdepth: 1
+ :caption: Guides
+
+
+ Connection
+ Hooks
+ Operators
+ Sensors
+ Triggers
+
+
+.. toctree::
+ :maxdepth: 1
+ :caption: References
+
+ Python API <_api/airflow/providers/apache/kafka/index>
+
+.. toctree::
+ :hidden:
+ :caption: System tests
+
+ System Tests <_api/tests/system/providers/apache/kafka/index>
+
+
+.. toctree::
+ :maxdepth: 1
+ :caption: Resources
+
+ Example DAGs
+ PyPI Repository
+ Installing from sources
+
+.. toctree::
+ :maxdepth: 1
+ :caption: Commits
+
+ Detailed list of commits
+
+
+Package apache-airflow-providers-apache-kafka
+------------------------------------------------------
+
+`Apache Kafka `__
+
+
+Release: 1.0.0
+
+Provider package
+----------------
+
+This is a provider package for the ``apache.kafka`` provider. All classes for this provider package
+are in the ``airflow.providers.apache.kafka`` python package.
+
+Installation
+------------
+
+You can install this package on top of an existing Airflow 2 installation via
+``pip install apache-airflow-providers-apache-kafka`` (see ``Requirements`` below for the minimum Airflow version supported).
+
+Requirements
+------------
+
+=================== ==================
+PIP package         Version required
+=================== ==================
+``apache-airflow``  ``>=2.3.0``
+``asgiref``
+``confluent-kafka`` ``>=1.8.2``
+=================== ==================
+
+.. include:: ../../airflow/providers/apache/kafka/CHANGELOG.rst
diff --git a/docs/apache-airflow-providers-apache-kafka/installing-providers-from-sources.rst b/docs/apache-airflow-providers-apache-kafka/installing-providers-from-sources.rst
new file mode 100644
index 0000000000000..1c90205d15b3a
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-kafka/installing-providers-from-sources.rst
@@ -0,0 +1,18 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+.. include:: ../installing-providers-from-sources.rst
diff --git a/docs/apache-airflow-providers-apache-kafka/operators/index.rst b/docs/apache-airflow-providers-apache-kafka/operators/index.rst
new file mode 100644
index 0000000000000..2ec6133a0e1b0
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-kafka/operators/index.rst
@@ -0,0 +1,72 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+Apache Kafka Operators
+======================
+
+.. _howto/operator:ConsumeFromTopicOperator:
+
+ConsumeFromTopicOperator
+------------------------
+
+An operator that consumes messages from one or more Kafka topics and processes them.
+The operator creates a Kafka consumer that reads a batch of messages from the cluster and processes them with the user-supplied callable. The consumer continues to read in batches until it reaches the end of the log or a maximum number of messages has been read.
+
+For parameter definitions take a look at :class:`~airflow.providers.apache.kafka.operators.consume.ConsumeFromTopicOperator`.
+
+
+Using the operator
+""""""""""""""""""
+
+.. exampleinclude:: /../../tests/system/providers/apache/kafka/example_dag_hello_kafka.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_consume_from_topic]
+ :end-before: [END howto_operator_consume_from_topic]
+
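+In addition to the packaged example above, a minimal standalone sketch follows; parameter names such as
+``kafka_config_id`` and ``max_messages`` are assumptions, so verify them against the class documentation linked above:
+
+.. code-block:: python
+
+    from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
+
+
+    def print_message(message):
+        # ``message`` is a confluent-kafka Message object.
+        print(message.key(), message.value())
+
+
+    consume = ConsumeFromTopicOperator(
+        task_id="consume_from_topic",
+        kafka_config_id="kafka_default",
+        topics=["my_topic"],
+        apply_function=print_message,
+        max_messages=10,
+    )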
+
+Reference
+"""""""""
+
+For further information, see the `Apache Kafka Consumer documentation `_.
+
+
+.. _howto/operator:ProduceToTopicOperator:
+
+ProduceToTopicOperator
+------------------------
+
+An operator that produces messages to a Kafka topic.
+The operator creates a Kafka producer and publishes the messages supplied by a user-provided callable to the target topic.
+
+For parameter definitions take a look at :class:`~airflow.providers.apache.kafka.operators.produce.ProduceToTopicOperator`.
+
+Using the operator
+""""""""""""""""""
+
+.. exampleinclude:: /../../tests/system/providers/apache/kafka/example_dag_hello_kafka.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_produce_to_topic]
+ :end-before: [END howto_operator_produce_to_topic]
+
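+As with consuming, a minimal standalone sketch follows; parameter names are assumptions, so verify them against the
+class documentation linked above:
+
+.. code-block:: python
+
+    from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
+
+
+    def producer_function():
+        # Yield (key, value) pairs to be published to the topic.
+        for i in range(5):
+            yield (f"key-{i}", f"value-{i}")
+
+
+    produce = ProduceToTopicOperator(
+        task_id="produce_to_topic",
+        kafka_config_id="kafka_default",
+        topic="my_topic",
+        producer_function=producer_function,
+    )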
+
+Reference
+"""""""""
+
+For further information, see the `Apache Kafka Producer documentation `_.
diff --git a/docs/apache-airflow-providers-apache-kafka/sensors.rst b/docs/apache-airflow-providers-apache-kafka/sensors.rst
new file mode 100644
index 0000000000000..82b560f84d44e
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-kafka/sensors.rst
@@ -0,0 +1,75 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+Apache Kafka Sensors
+====================
+
+
+.. _howto/sensor:AwaitMessageSensor:
+
+AwaitMessageSensor
+------------------------
+
+A sensor that defers until a specific message is published to a Kafka topic.
+The sensor creates a consumer that reads messages from a Kafka topic until it finds a message that fulfills the
+criteria defined in the ``apply_function`` parameter.
+
+For parameter definitions take a look at :class:`~airflow.providers.apache.kafka.sensors.kafka.AwaitMessageSensor`.
+
+Using the sensor
+""""""""""""""""""
+
+
+.. exampleinclude:: /../../tests/system/providers/apache/kafka/example_dag_hello_kafka.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_sensor_await_message]
+ :end-before: [END howto_sensor_await_message]
+
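+A minimal standalone sketch is shown below. Because the ``apply_function`` runs in the triggerer process, it is
+typically passed as a dotted-path string pointing to an importable callable (an assumption to verify against the
+class documentation):
+
+.. code-block:: python
+
+    from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor
+
+    await_message = AwaitMessageSensor(
+        task_id="await_message",
+        kafka_config_id="kafka_default",
+        topics=["my_topic"],
+        # Hypothetical module path; the callable should return data only for
+        # messages that satisfy your criteria.
+        apply_function="my_package.my_module.check_message",
+    )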
+
+Reference
+"""""""""
+
+For further information, see the `Apache Kafka Consumer documentation `_.
+
+
+.. _howto/sensor:AwaitMessageTriggerFunctionSensor:
+
+AwaitMessageTriggerFunctionSensor
+---------------------------------
+
+Similar to the ``AwaitMessageSensor`` above, this sensor will defer until it consumes a message from a Kafka topic fulfilling the criteria
+of its ``apply_function``. Once a positive event is encountered, the ``AwaitMessageTriggerFunctionSensor`` will trigger a callable provided
+to ``event_triggered_function``.
+
+For parameter definitions take a look at :class:`~airflow.providers.apache.kafka.sensors.kafka.AwaitMessageTriggerFunctionSensor`.
+
+Using the sensor
+""""""""""""""""""
+
+.. exampleinclude:: /../../tests/system/providers/apache/kafka/example_dag_event_listener.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_sensor_await_message_trigger_function]
+ :end-before: [END howto_sensor_await_message_trigger_function]
+
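+A minimal standalone sketch follows; the call signature of ``event_triggered_function`` is an assumption, so check
+the class documentation:
+
+.. code-block:: python
+
+    from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageTriggerFunctionSensor
+
+
+    def react(event, **context):
+        # Hypothetical callback invoked once for every positive event.
+        print(f"matched message payload: {event}")
+
+
+    listener = AwaitMessageTriggerFunctionSensor(
+        task_id="listen_for_messages",
+        kafka_config_id="kafka_default",
+        topics=["my_topic"],
+        apply_function="my_package.my_module.check_message",
+        event_triggered_function=react,
+    )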
+
+Reference
+"""""""""
+
+For further information, see the `Apache Kafka Consumer documentation `_.
diff --git a/docs/apache-airflow-providers-apache-kafka/triggers.rst b/docs/apache-airflow-providers-apache-kafka/triggers.rst
new file mode 100644
index 0000000000000..14c74bcdf768b
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-kafka/triggers.rst
@@ -0,0 +1,30 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+Apache Kafka Triggers
+=====================
+
+.. _howto/triggers:AwaitMessageTrigger:
+
+AwaitMessageTrigger
+------------------------
+
+The ``AwaitMessageTrigger`` is a trigger that consumes messages polled from a Kafka topic and processes them with a provided callable.
+If the callable returns any data, a ``TriggerEvent`` is raised.
+
+For parameter definitions take a look at :class:`~airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger`.
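+
+A sketch of deferring to the trigger from a hypothetical custom operator follows; the trigger's constructor
+arguments are assumptions to verify against the class documentation:
+
+.. code-block:: python
+
+    from airflow.models import BaseOperator
+    from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
+
+
+    class WaitForRecordOperator(BaseOperator):
+        """Hypothetical deferrable operator built on AwaitMessageTrigger."""
+
+        def execute(self, context):
+            self.defer(
+                trigger=AwaitMessageTrigger(
+                    kafka_config_id="kafka_default",
+                    topics=["my_topic"],
+                    apply_function="my_package.my_module.check_message",
+                ),
+                method_name="execute_complete",
+            )
+
+        def execute_complete(self, context, event=None):
+            # ``event`` carries whatever data the apply_function returned.
+            return event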
diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst
index 017ad927caa2b..4477afdc1ae59 100644
--- a/docs/apache-airflow/extra-packages-ref.rst
+++ b/docs/apache-airflow/extra-packages-ref.rst
@@ -134,6 +134,8 @@ custom bash/python providers).
+---------------------+-----------------------------------------------------+------------------------------------------------+
| apache.impala | ``pip install 'apache-airflow[apache.impala]'`` | All Impala related operators & hooks |
+---------------------+-----------------------------------------------------+------------------------------------------------+
+| apache.kafka | ``pip install 'apache-airflow[apache.kafka]'`` | All Kafka related operators & hooks |
++---------------------+-----------------------------------------------------+------------------------------------------------+
| apache.kylin | ``pip install 'apache-airflow[apache.kylin]'`` | All Kylin related operators & hooks |
+---------------------+-----------------------------------------------------+------------------------------------------------+
| apache.livy | ``pip install 'apache-airflow[apache.livy]'`` | All Livy related operators, hooks & sensors |
diff --git a/docs/integration-logos/apache/kafka.svg b/docs/integration-logos/apache/kafka.svg
new file mode 100644
index 0000000000000..bdf6af9b89fed
--- /dev/null
+++ b/docs/integration-logos/apache/kafka.svg
@@ -0,0 +1,22 @@
+
+
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 3ffe00f60994d..38bac06ca8c1d 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -812,6 +812,8 @@ jthomas
Jupyter
jupyter
jupytercmd
+Kafka
+kafka
Kalibrr
Kamil
KEDA
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index 8e0b5846cbd71..79e919591815e 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -126,6 +126,14 @@
"common.sql"
]
},
+ "apache.kafka": {
+ "deps": [
+ "apache-airflow>=2.3.0",
+ "asgiref",
+ "confluent-kafka>=1.8.2"
+ ],
+ "cross-providers-deps": []
+ },
"apache.kylin": {
"deps": [
"apache-airflow>=2.3.0",
diff --git a/images/breeze/output-commands-hash.txt b/images/breeze/output-commands-hash.txt
index c9922142bd2f7..7eab1f50984e6 100644
--- a/images/breeze/output-commands-hash.txt
+++ b/images/breeze/output-commands-hash.txt
@@ -1,8 +1,8 @@
# This file is automatically generated by pre-commit. If you have a conflict with this file
# Please do not solve it but run `breeze setup regenerate-command-images`.
# This command should fix the conflict and regenerate help images that you have conflict with.
-main:83de6a9bf2b1afecd1f9ce4cd0493733
-build-docs:d449f8ee7b20545a2c7c46ad65226f94
+main:3b0efd589fb61236e9fb1e4422de78c9
+build-docs:3b89efaf5551b1782227cd382c019990
ci:fix-ownership:fee2c9ec9ef19686792002ae054fecdd
ci:free-space:47234aa0a60b0efd84972e6e797379f8
ci:get-workflow-info:01ee34c33ad62fa5dc33e0ac8773223f
@@ -36,16 +36,16 @@ prod-image:verify:31bc5efada1d70a0a31990025db1a093
prod-image:79bd4cc9de03ab7e1d75f025d75eee46
release-management:create-minor-branch:6a01066dce15e09fb269a8385626657c
release-management:generate-constraints:ae30d6ad49a1b2c15b61cb29080fd957
-release-management:generate-issue-content-providers:767a85195f6e686df63b8f8ea0fb7142
+release-management:generate-issue-content-providers:421c1b186818a6251c16f7f3b7807292
release-management:install-provider-packages:5838b06b78e3c5c6e8380024867a1a8d
release-management:prepare-airflow-package:3ac14ea6d2b09614959c0ec4fd564789
-release-management:prepare-provider-documentation:8dab0cba3d0bf3d36ec60d71c8c23d50
-release-management:prepare-provider-packages:9d803d0eb5f55d1a178fff2f7951eec8
+release-management:prepare-provider-documentation:b48d9c8af27d5e110364ed2454d23959
+release-management:prepare-provider-packages:cf41c33c6d6121efef1f1d97333e8710
release-management:release-prod-images:c9bc40938e0efad49e51ef66e83f9527
release-management:start-rc-process:6aafbaceabd7b67b9a1af4c2f59abc4c
release-management:start-release:acb384d86e02ff5fde1bf971897be17c
release-management:verify-provider-packages:566c60fb1bfdc5ed7c4be590736891b2
-release-management:878586136f3d17ecc38e63d969eb3d79
+release-management:a6d6b27d8705294bd55cd1281b05f4c8
setup:autocomplete:03343478bf1d0cf9c101d454cdb63b68
setup:check-all-params-in-groups:c3aca085350fc09451a6d502be9ee821
setup:config:3ffcd35dd24b486ddf1d08b797e3d017
@@ -53,12 +53,12 @@ setup:regenerate-command-images:aaf263095a037d2271640513d8c156fe
setup:self-upgrade:d02f70c7a230eae3463ceec2056b63fa
setup:version:123b462a421884dc2320ffc5e54b2478
setup:26f37743534e14f5aad5300aad920301
-shell:d77f43d3faadfce6c332beae1cf46d1c
-start-airflow:5e8460ac38f8e9ea2a0ac7e248fd7bc9
+shell:bd3e004a92ebcec8feb40fc5cd95872d
+start-airflow:ee5066f1420a489864b48bc4e5e472da
static-checks:543f0c776d0f198e80a0f75058445bb2
stop:e5aa686b4e53707ced4039d8414d5cd6
testing:docker-compose-tests:b86c044b24138af0659a05ed6331576c
testing:helm-tests:936cf28fd84ce4ff5113795fdae9624b
-testing:integration-tests:225ddb6243cce5fc64f4824b87adfd98
-testing:tests:b96f54a7e08986e2309af33141099e8d
-testing:8d1f02ebc1119bdf93e027a4f291237f
+testing:integration-tests:7865b62e9418ddb749511f8a801a49c2
+testing:tests:d301440c82391f9c21c29e7a45efd3b9
+testing:db7a6fc196906d4ead598d63b094c72f
diff --git a/images/breeze/output-commands.svg b/images/breeze/output-commands.svg
index 784715a8cce83..d1ce7944df5bd 100644
--- a/images/breeze/output-commands.svg
+++ b/images/breeze/output-commands.svg
@@ -35,8 +35,8 @@
.breeze-help-r1 { fill: #c5c8c6;font-weight: bold }
.breeze-help-r2 { fill: #c5c8c6 }
.breeze-help-r3 { fill: #d0b344;font-weight: bold }
-.breeze-help-r4 { fill: #868887 }
-.breeze-help-r5 { fill: #68a0b3;font-weight: bold }
+.breeze-help-r4 { fill: #68a0b3;font-weight: bold }
+.breeze-help-r5 { fill: #868887 }
.breeze-help-r6 { fill: #98a84b;font-weight: bold }
.breeze-help-r7 { fill: #8d7b39 }
@@ -190,50 +190,50 @@
-Usage: breeze [OPTIONS] COMMAND [ARGS]...
+Usage: breeze [OPTIONS] COMMAND [ARGS]...
-╭─ Basic flags ────────────────────────────────────────────────────────────────────────────────────────────────────────╮
-│--python-pPython major/minor version used in Airflow image for images.(>3.7< | 3.8 | 3.9 | 3.10)│
-│[default: 3.7] │
-│--backend-bDatabase backend to use.(>sqlite< | mysql | postgres | mssql)[default: sqlite]│
-│--postgres-version-PVersion of Postgres used.(>11< | 12 | 13 | 14 | 15)[default: 11]│
-│--mysql-version-MVersion of MySQL used.(>5.7< | 8)[default: 5.7]│
-│--mssql-version-SVersion of MsSQL used.(>2017-latest< | 2019-latest)[default: 2017-latest]│
-│--integrationIntegration(s) to enable when running (can be more than one). │
-│(all | all-testable | cassandra | celery | kerberos | mongo | otel | pinot | statsd | │
-│statsd | trino) │
-│--forward-credentials-fForward local credentials to container when running.│
-│--db-reset-dReset DB when entering the container.│
-│--max-timeMaximum time that the command should take - if it takes longer, the command will fail.│
-│(INTEGER RANGE) │
-│--github-repository-gGitHub repository used to pull, push run images.(TEXT)[default: apache/airflow]│
-╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
-╭─ Common options ─────────────────────────────────────────────────────────────────────────────────────────────────────╮
-│--verbose-vPrint verbose information about performed steps.│
-│--dry-run-DIf dry-run is set, commands are only printed, not executed.│
-│--answer-aForce answer to questions.(y | n | q | yes | no | quit)│
-│--help-hShow this message and exit.│
-╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
-╭─ Basic developer commands ───────────────────────────────────────────────────────────────────────────────────────────╮
-│start-airflow Enter breeze environment and starts all Airflow components in the tmux session. Compile assets │
-│if contents of www directory changed. │
-│static-checks Run static checks. │
-│build-docs Build documentation in the container. │
-│stop Stop running breeze environment. │
-│shell Enter breeze environment. this is the default command use when no other is selected. │
-│exec Joins the interactive shell of running airflow container. │
-│compile-www-assetsCompiles www assets. │
-│cleanup Cleans the cache of parameters, docker cache and optionally built CI/PROD images. │
-╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
-╭─ Advanced command groups ────────────────────────────────────────────────────────────────────────────────────────────╮
-│testing Tools that developers can use to run tests │
-│ci-image Tools that developers can use to manually manage CI images │
-│k8s Tools that developers use to run Kubernetes tests │
-│prod-image Tools that developers can use to manually manage PROD images │
-│setup Tools that developers can use to configure Breeze │
-│release-management Tools that release managers can use to prepare and manage Airflow releases │
-│ci Tools that CI workflows use to cleanup/manage CI environment │
-╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
+╭─ Basic flags ────────────────────────────────────────────────────────────────────────────────────────────────────────╮
+│--python-pPython major/minor version used in Airflow image for images.(>3.7< | 3.8 | 3.9 | 3.10)│
+│[default: 3.7] │
+│--backend-bDatabase backend to use.(>sqlite< | mysql | postgres | mssql)[default: sqlite]│
+│--postgres-version-PVersion of Postgres used.(>11< | 12 | 13 | 14 | 15)[default: 11]│
+│--mysql-version-MVersion of MySQL used.(>5.7< | 8)[default: 5.7]│
+│--mssql-version-SVersion of MsSQL used.(>2017-latest< | 2019-latest)[default: 2017-latest]│
+│--integrationIntegration(s) to enable when running (can be more than one). │
+│(all | all-testable | cassandra | celery | kafka | kerberos | mongo | otel | pinot | │
+│statsd | statsd | trino) │
+│--forward-credentials-fForward local credentials to container when running.│
+│--db-reset-dReset DB when entering the container.│
+│--max-timeMaximum time that the command should take - if it takes longer, the command will fail.│
+│(INTEGER RANGE) │
+│--github-repository-gGitHub repository used to pull, push run images.(TEXT)[default: apache/airflow]│
+╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
+╭─ Common options ─────────────────────────────────────────────────────────────────────────────────────────────────────╮
+│--verbose-vPrint verbose information about performed steps.│
+│--dry-run-DIf dry-run is set, commands are only printed, not executed.│
+│--answer-aForce answer to questions.(y | n | q | yes | no | quit)│
+│--help-hShow this message and exit.│
+╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
+╭─ Basic developer commands ───────────────────────────────────────────────────────────────────────────────────────────╮
+│start-airflow Enter breeze environment and starts all Airflow components in the tmux session. Compile assets │
+│if contents of www directory changed. │
+│static-checks Run static checks. │
+│build-docs Build documentation in the container. │
+│stop Stop running breeze environment. │
+│shell Enter breeze environment. this is the default command use when no other is selected. │
+│exec Joins the interactive shell of running airflow container. │
+│compile-www-assetsCompiles www assets. │
+│cleanup Cleans the cache of parameters, docker cache and optionally built CI/PROD images. │
+╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
+╭─ Advanced command groups ────────────────────────────────────────────────────────────────────────────────────────────╮
+│testing Tools that developers can use to run tests │
+│ci-image Tools that developers can use to manually manage CI images │
+│k8s Tools that developers use to run Kubernetes tests │
+│prod-image Tools that developers can use to manually manage PROD images │
+│setup Tools that developers can use to configure Breeze │
+│release-management Tools that release managers can use to prepare and manage Airflow releases │
+│ci Tools that CI workflows use to cleanup/manage CI environment │
+╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
diff --git a/images/breeze/output_build-docs.svg b/images/breeze/output_build-docs.svg
index d8c26d13745ed..9ff15eefc79d0 100644
--- a/images/breeze/output_build-docs.svg
+++ b/images/breeze/output_build-docs.svg
@@ -1,4 +1,4 @@
-