Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions airflow/api_internal/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
16 changes: 16 additions & 0 deletions airflow/api_internal/endpoints/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
80 changes: 80 additions & 0 deletions airflow/api_internal/endpoints/rpc_api_endpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import json
import logging

from flask import Response

from airflow.api_connexion.types import APIResponse
from airflow.dag_processing.processor import DagFileProcessor
from airflow.serialization.serialized_objects import BaseSerialization

log = logging.getLogger(__name__)


def _build_methods_map(list) -> dict:
return {f"{func.__module__}.{func.__name__}": func for func in list}


METHODS_MAP = _build_methods_map(
[
DagFileProcessor.update_import_errors,
]
)


def internal_airflow_api(
body: dict,
) -> APIResponse:
"""Handler for Internal API /internal_api/v1/rpcapi endpoint."""
log.debug("Got request")
json_rpc = body.get("jsonrpc")
if json_rpc != "2.0":
log.error("Not jsonrpc-2.0 request.")
return Response(response="Expected jsonrpc 2.0 request.", status=400)

method_name = str(body.get("method"))
if method_name not in METHODS_MAP:
log.error("Unrecognized method: %s.", method_name)
return Response(response=f"Unrecognized method: {method_name}.", status=400)

handler = METHODS_MAP[method_name]
params = {}
try:
if body.get("params"):
params_json = json.loads(str(body.get("params")))
params = BaseSerialization.deserialize(params_json)
except Exception as err:
log.error("Error deserializing parameters.")
log.error(err)
return Response(response="Error deserializing parameters.", status=400)

log.debug("Calling method %.", {method_name})
try:
output = handler(**params)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized something here. Dont we have an issue here? All functions listed in METHODS_MAP are annotated with @internal_api_call, so when we execute these functions here, we'll trigger again the decorator and as such, send a rpc request. Am I wrong?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. I have an issue for that #28267
The idea is: some components (like Scheduler/Webserver/Internal API) will always run these methods locally

output_json = BaseSerialization.serialize(output)
log.debug("Returning response")
return Response(
response=json.dumps(output_json or "{}"), headers={"Content-Type": "application/json"}
)
except Exception as e:
log.error("Error when calling method %s.", method_name)
log.error(e)
return Response(response=f"Error executing method: {method_name}.", status=500)
114 changes: 114 additions & 0 deletions airflow/api_internal/internal_api_call.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import inspect
import json
from functools import wraps
from typing import Callable, TypeVar

import requests

from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.serialization.serialized_objects import BaseSerialization
from airflow.typing_compat import ParamSpec

PS = ParamSpec("PS")
RT = TypeVar("RT")


class InternalApiConfig:
"""Stores and caches configuration for Internal API."""

_initialized = False
_use_internal_api = False
_internal_api_endpoint = ""

@staticmethod
def get_use_internal_api():
if not InternalApiConfig._initialized:
InternalApiConfig._init_values()
return InternalApiConfig._use_internal_api

@staticmethod
def get_internal_api_endpoint():
if not InternalApiConfig._initialized:
InternalApiConfig._init_values()
return InternalApiConfig._internal_api_endpoint

@staticmethod
def _init_values():
use_internal_api = conf.getboolean("core", "database_access_isolation")
internal_api_endpoint = ""
if use_internal_api:
internal_api_url = conf.get("core", "internal_api_url")
internal_api_endpoint = internal_api_url + "/internal_api/v1/rpcapi"
if not internal_api_endpoint.startswith("http://"):
raise AirflowConfigException("[core]internal_api_url must start with http://")

InternalApiConfig._initialized = True
InternalApiConfig._use_internal_api = use_internal_api
InternalApiConfig._internal_api_endpoint = internal_api_endpoint


def internal_api_call(func: Callable[PS, RT | None]) -> Callable[PS, RT | None]:
"""Decorator for methods which may be executed in database isolation mode.

If [core]database_access_isolation is true then such method are not executed locally,
but instead RPC call is made to Database API (aka Internal API). This makes some components
decouple from direct Airflow database access.
Each decorated method must be present in METHODS list in airflow.api_internal.endpoints.rpc_api_endpoint.
Only static methods can be decorated. This decorator must be before "provide_session".

See [AIP-44](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API)
for more information .
"""
headers = {
"Content-Type": "application/json",
}

def make_jsonrpc_request(method_name: str, params_json: str) -> bytes:
data = {"jsonrpc": "2.0", "method": method_name, "params": params_json}
internal_api_endpoint = InternalApiConfig.get_internal_api_endpoint()
response = requests.post(url=internal_api_endpoint, data=json.dumps(data), headers=headers)
if response.status_code != 200:
raise AirflowException(
f"Got {response.status_code}:{response.reason} when sending the internal api request."
)
return response.content

@wraps(func)
def wrapper(*args, **kwargs) -> RT | None:
use_internal_api = InternalApiConfig.get_use_internal_api()
if not use_internal_api:
return func(*args, **kwargs)

bound = inspect.signature(func).bind(*args, **kwargs)
arguments_dict = dict(bound.arguments)
if "session" in arguments_dict:
del arguments_dict["session"]
args_json = json.dumps(BaseSerialization.serialize(arguments_dict))
method_name = f"{func.__module__}.{func.__name__}"
result = make_jsonrpc_request(method_name, args_json)
if result:
return BaseSerialization.deserialize(json.loads(result))
else:
return None

return wrapper
91 changes: 91 additions & 0 deletions airflow/api_internal/openapi/internal_api_v1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

---
openapi: 3.0.2
info:
title: Airflow Internal API
version: 1.0.0
description: |
This is Airflow Internal API - which is a proxy for components running
customer code for connecting to Airflow Database.

It is not intended to be used by any external code.

You can find more information in AIP-44
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API


servers:
- url: /internal_api/v1
description: Airflow Internal API
paths:
"/rpcapi":
post:
operationId: rpcapi
deprecated: false
x-openapi-router-controller: airflow.api_internal.endpoints.rpc_api_endpoint
operationId: internal_airflow_api
tags:
- JSONRPC
parameters: []
responses:
'200':
description: Successful response
requestBody:
x-body-name: body
required: true
content:
application/json:
schema:
type: object
required:
- method
- jsonrpc
- params
properties:
jsonrpc:
type: string
default: '2.0'
description: JSON-RPC Version (2.0)
method:
type: string
description: Method name
params:
title: Parameters
type: string
x-headers: []
x-explorer-enabled: true
x-proxy-enabled: true
components:
schemas:
JsonRpcRequired:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one seems not used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which part exactly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The schema JsonRpcRequired

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not familiar with OpenAPI, but isn't is used for validation? I found that requests that missed "method" were automatically rejected.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the way how Open API specification describes it conformnce with JsonRPC. We should have it in. Various tools that might be later employed might use the information (swagger UI for example to generate the documentation and describe the API).

This is not crucial in our case, the Swagger UI and other OpenAPI artifacts are really "side-efffect" here - we have no-one to consume those artifacts as our API is purely internal and most of it is not really "fixed" and described (i.e. the actual methods and parameters are not really validated/processed by the API specification - because they are dynamically generated on both client and server side - so this is not actually very needed, but might be useful in the future in case we will do any kind of additional tooling that might rely on inspecting the specification and using the "metadata" about JsonRPC under the hood.

BTW. This is perfectly OK we do not have those methods and parameters described. This is not the "usual" API that you expose and document. Both client and server in this communication are guaranteed to have single source of truth (inspection of the method names and their parameters and serializing them using our own serializer).

So we are good here. The Open API specification we have looks cool:

  • single endpoint
  • JSonRpc conformance
  • no boiler-plate growing with every method added
  • single source of truth for the actual "schema" of passed data
  • the data is easy to inspect by human without extra tools (method names, and parameters send as JSON-serialized data).
  • guaranteed client-server compatibility

I really like it :)

type: object
required:
- method
- jsonrpc
properties:
method:
type: string
description: Method name
jsonrpc:
type: string
default: '2.0'
description: JSON-RPC Version (2.0)
discriminator:
propertyName: method_name
tags: []
20 changes: 20 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,19 @@
type: string
default: ~
example: '{"some_param": "some_value"}'
- name: database_access_isolation
description: (experimental) Whether components should use Airflow Internal API for DB connectivity.
version_added: 2.6.0
type: boolean
example: ~
default: "False"
- name: internal_api_url
description: |
(experimental)Airflow Internal API url. Only used if [core] database_access_isolation is True.
version_added: 2.6.0
type: string
default: ~
example: 'http://localhost:8080'

- name: database
description: ~
Expand Down Expand Up @@ -1482,6 +1495,13 @@
type: string
example: "dagrun_cleared,failed"
default: ~
- name: run_internal_api
description: |
Boolean for running Internal API in the webserver.
version_added: 2.6.0
type: boolean
example: ~
default: "False"

- name: email
description: |
Expand Down
10 changes: 10 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ daemon_umask = 0o077
# Example: dataset_manager_kwargs = {{"some_param": "some_value"}}
# dataset_manager_kwargs =

# (experimental) Whether components should use Airflow Internal API for DB connectivity.
database_access_isolation = False

# (experimental)Airflow Internal API url. Only used if [core] database_access_isolation is True.
# Example: internal_api_url = http://localhost:8080
# internal_api_url =

[database]
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines.
Expand Down Expand Up @@ -752,6 +759,9 @@ audit_view_excluded_events = gantt,landing_times,tries,duration,calendar,graph,g
# Example: audit_view_included_events = dagrun_cleared,failed
# audit_view_included_events =

# Boolean for running Internal API in the webserver.
run_internal_api = False

[email]

# Configuration email backend and whether to
Expand Down
Loading