Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ clean: ## Removes build and test artifacts
build-whl: ## Build installable whl file
rm -rf dev/include/*
rm -rf dist/*
mkdir -p dev/include
hatch build
cp dist/* dev/include/

Expand Down
25 changes: 25 additions & 0 deletions dagfactory/dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@
except ImportError:
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator

# The provider's HTTP operator was renamed in providers-http 4.11.0:
# prefer the new HttpOperator name and fall back to SimpleHttpOperator.
# HTTP_OPERATOR_CLASS stays None when neither import works, in which case
# the operator is imported dynamically instead.
HTTP_OPERATOR_CLASS = None
try:
    from airflow.providers.http.operators.http import HttpOperator
except ImportError:
    try:
        from airflow.providers.http.operators.http import SimpleHttpOperator
    except ImportError:
        pass
    else:
        HTTP_OPERATOR_CLASS = SimpleHttpOperator
else:
    HTTP_OPERATOR_CLASS = HttpOperator

# http sensor was moved in 2.4
try:
Expand Down Expand Up @@ -428,6 +441,18 @@ def make_task(operator: str, task_params: Dict[str, Any]) -> BaseOperator:
else None
)

# HttpOperator
if HTTP_OPERATOR_CLASS and issubclass(operator_obj, HTTP_OPERATOR_CLASS):
headers = task_params.get("headers", {})
content_type = headers.get("Content-Type", "").lower()

if "data" in task_params and "application/json" in content_type:
task_params["data"]: Callable = utils.get_json_serialized_callable(task_params["data"])

if "Content-Type" not in headers:
headers["Content-Type"] = "application/json"
task_params["headers"] = headers

DagBuilder.adjust_general_task_params(task_params)

expand_kwargs: Dict[str, Union[Dict[str, Any], Any]] = {}
Expand Down
25 changes: 25 additions & 0 deletions dagfactory/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import ast
import importlib.util
import json
import logging
import os
import re
Expand Down Expand Up @@ -320,3 +321,27 @@ def parse_list_datasets(datasets: Union[List[str], str]) -> str:
if isinstance(datasets, list):
datasets = " & ".join(datasets)
return datasets


def get_json_serialized_callable(data_obj: Union[dict, str]):
    """
    Build a callable that returns ``data_obj`` as a JSON serialized string.

    A dict is serialized once, up front, with ``json.dumps``. A str is only
    validated with ``json.loads`` and is then returned unchanged (it is not
    re-serialized). The returned callable accepts and ignores any keyword
    arguments.

    :param data_obj: dict or JSON-formatted str
    :returns: callable returning JSON serialized str
    :raises ValueError: if ``data_obj`` is a str that is not valid JSON; the
        original ``json.JSONDecodeError`` is attached as ``__cause__``
    :raises TypeError: if ``data_obj`` is neither a dict nor a str
    """
    if isinstance(data_obj, dict):
        serialized_json = json.dumps(data_obj)
    elif isinstance(data_obj, str):
        try:
            # Parse only to validate; the parsed value itself is discarded.
            json.loads(data_obj)
        except json.JSONDecodeError as e:
            # Chain explicitly so the traceback shows the decode error as the cause.
            raise ValueError(f"Invalid JSON provided: {e}") from e
        serialized_json = data_obj
    else:
        raise TypeError(f"data_obj must be a dict or str, not {type(data_obj)}")

    return lambda **kwargs: serialized_json
26 changes: 26 additions & 0 deletions dev/dags/example_http_operator_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import os
from pathlib import Path

# Probe which operator name the installed HTTP provider exposes; the flag
# below decides which example YAML config gets loaded.
try:
    from airflow.providers.http.operators.http import HttpOperator
except ImportError:
    HTTP_OPERATOR_AVAILABLE = False
else:
    HTTP_OPERATOR_AVAILABLE = True

# The following import is here so Airflow parses this file
# from airflow import DAG
import dagfactory

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"

# The config directory can be overridden through the CONFIG_ROOT_DIR environment variable.
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

# Use the HttpOperator config when that class is importable, otherwise the
# SimpleHttpOperator variant.
config_file = str(
    CONFIG_ROOT_DIR
    / ("example_http_operator_task.yml" if HTTP_OPERATOR_AVAILABLE else "example_simple_http_operator_task.yml")
)

example_dag_factory = dagfactory.DagFactory(config_file)

# Both calls receive this module's globals(); presumably so the factory can
# remove and then register DAG objects at module level — confirm against dagfactory.
example_dag_factory.clean_dags(globals())
example_dag_factory.generate_dags(globals())
35 changes: 35 additions & 0 deletions dev/dags/example_http_operator_task.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
default:
default_args:
catchup: false
start_date: 2025-03-20

http_operator_example_dag:
default_args:
owner: "@owner"
description: "this is an HttpOperator dag"
schedule_interval: "0 3 * * *"
tags: ['http']
render_template_as_native_obj: True
tasks:
send_request_json:
operator: airflow.providers.http.operators.http.HttpOperator
http_conn_id: "example_host"
method: "POST"
endpoint: "/run_test"
data:
data: "fake_data"
format: "json"
headers:
Content-Type: application/json
log_response: True
send_request_plain_text:
operator: airflow.providers.http.operators.http.HttpOperator
http_conn_id: "example_host"
method: "POST"
endpoint: "/run_test"
data:
data: "fake_data"
test: "plain_text"
headers:
Content-Type: text/plain
log_response: True
35 changes: 35 additions & 0 deletions dev/dags/example_simple_http_operator_task.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
default:
default_args:
catchup: false
start_date: 2025-03-20

http_operator_example_dag:
default_args:
owner: "@owner"
description: "this is an HttpOperator dag"
schedule_interval: "0 3 * * *"
tags: ['http']
render_template_as_native_obj: True
tasks:
send_request_json:
operator: airflow.providers.http.operators.http.SimpleHttpOperator
http_conn_id: "example_host"
method: "POST"
endpoint: "/run_test"
data:
data: "fake_data"
format: "json"
Comment thread
pankajastro marked this conversation as resolved.
headers:
Content-Type: application/json
log_response: True
send_request_plain_text:
operator: airflow.providers.http.operators.http.SimpleHttpOperator
http_conn_id: "example_host"
method: "POST"
endpoint: "/run_test"
data:
data: "fake_data"
test: "plain_text"
headers:
Content-Type: text/plain
log_response: True
4 changes: 0 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow}
python = ["3.8", "3.9", "3.10", "3.11", "3.12"]
airflow = ["2.2", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"]

[tool.hatch.envs.tests.overrides]
matrix.airflow.dependencies = [
{ value = "typing_extensions<4.6", if = ["2.6"] },
]
Comment thread
pankajastro marked this conversation as resolved.

[tool.hatch.envs.tests.scripts]
freeze = "pip freeze"
Expand Down
35 changes: 35 additions & 0 deletions tests/fixtures/dag_factory_http_operator_task.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
default:
default_args:
catchup: false
start_date: 2025-03-20

http_operator_example_dag:
default_args:
owner: "@owner"
description: "this is an HttpOperator dag"
schedule_interval: "0 3 * * *"
tags: ['http']
render_template_as_native_obj: True
tasks:
send_request_json:
operator: airflow.providers.http.operators.http.HttpOperator
http_conn_id: "example_host"
method: "POST"
endpoint: "/run_test"
data:
data: "fake_data"
format: "json"
headers:
Content-Type: application/json
log_response: True
send_request_plain_text:
operator: airflow.providers.http.operators.http.HttpOperator
http_conn_id: "example_host"
method: "POST"
endpoint: "/run_test"
data:
data: "fake_data"
test: "plain_text"
headers:
Content-Type: text/plain
log_response: True
35 changes: 35 additions & 0 deletions tests/fixtures/dag_factory_simple_http_operator_task.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
default:
default_args:
catchup: false
start_date: 2025-03-20

simple_http_operator_example_dag:
default_args:
owner: "@owner"
description: "this is a SimpleHttpOperator dag"
schedule_interval: "0 3 * * *"
tags: ['http']
render_template_as_native_obj: True
tasks:
send_request_json:
operator: airflow.operators.http_operator.SimpleHttpOperator
http_conn_id: "example_host"
method: "POST"
endpoint: "/run_test"
data:
data: "fake_data"
format: "json"
headers:
Content-Type: application/json
log_response: True
send_request_plain_text:
operator: airflow.operators.http_operator.SimpleHttpOperator
http_conn_id: "example_host"
method: "POST"
endpoint: "/run_test"
data:
data: "fake_data"
test: "plain_text"
headers:
Content-Type: text/plain
log_response: True
Loading