Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions airflow/api_connexion/openapi/internal_api.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

---
x-send-defaults: true
openapi: 3.0.0
x-api-id: json-rpc-example
info:
title: JSON-RPC OpenAPI
version: 1.0.0
description: Example of how to describe a JSON-RPC 2 API in OpenAPI
servers:
- url: /internal/v1
description: Apache Airflow Internal API.
paths:
"/rpcapi":
post:
operationId: rpcapi
deprecated: false
summary: Example of JSON-RPC2 Post
x-openapi-router-controller: airflow.api_internal.rpc_api
operationId: json_rpc
description: Example post using JSON-RPC params.
tags:
- JSONRPC
parameters: []
responses:
'200':
description: Successful response
requestBody:
x-body-name: body
required: true
content:
application/json:
schema:
type: object
required:
- method
- jsonrpc
- params
properties:
jsonrpc:
type: string
default: '2.0'
description: JSON-RPC Version (2.0)
method:
type: string
default: example
description: Method name
params:
title: Parameters
type: object
x-headers: []
x-explorer-enabled: true
x-proxy-enabled: true
x-samples-enabled: true
components:
schemas:
JsonRpcRequired:
type: object
required:
- method
- jsonrpc
properties:
method:
type: string
default: examplePost
description: Method name
jsonrpc:
type: string
default: '2.0'
description: JSON-RPC Version (2.0)
discriminator:
propertyName: method_name
examplePost:
allOf:
- "$ref": "#/components/schemas/JsonRpcRequired"
- type: object
properties:
params:
title: Parameters
type: object
tags: []
16 changes: 16 additions & 0 deletions airflow/api_internal/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
82 changes: 82 additions & 0 deletions airflow/api_internal/internal_api_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import inspect
import json
import requests
from airflow.exceptions import AirflowException
from airflow.api_internal.rpc_api import METHODS


# TODO read these from configuration
use_internal_api = False
url = "http://127.0.0.1:50051/internal/v1/rpcapi"

def remove_none(d: dict) -> dict:
if isinstance(d, dict):
for k, v in list(d.items()):
if v is None:
del d[k]
else:
remove_none(v)
if isinstance(d, list):
for v in d:
remove_none(v)
return d


def internal_api_call(
method_name: str,
):
headers = {
"Content-Type": "application/json",
}

def jsonrpc_request(params_json):
data = {"jsonrpc": "2.0", "method": method_name, "params": params_json}

response = requests.post(url, data=json.dumps(data), headers=headers)
if response.status_code != 200:
print(f"Internal API error {response.content}")
raise AirflowException(
f"Got {response.status_code}:{response.reason} when submitting the internal api call."
)
return response.content

def inner(func):
def make_call(*args, **kwargs):

if use_internal_api:
print("internal_api_call")
bound = inspect.signature(func).bind(*args, **kwargs)
arguments_dict = dict(bound.arguments)
if "session" in arguments_dict:
del arguments_dict["session"]
handler = METHODS[method_name]
result = jsonrpc_request(handler.args_to_json(arguments_dict))
if result is not None:
return handler.result_from_json(result)
else:
return

print("standard call")
return func(*args, **kwargs)

return make_call

return inner

108 changes: 108 additions & 0 deletions airflow/api_internal/rpc_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import Callable
from flask import Response
import json
import logging

from airflow.callbacks.callback_requests import CallbackRequest, callback_from_dict

from typing import Any, Tuple, List
from airflow.api_connexion.types import APIResponse
from airflow.dag_processing.processor import DagFileProcessor

# internal_api_method : (module, class, method_name)
# PLEASE KEEP BACKWARD COMPATIBLE:
# - do not remove/change 'internal_api_method' string.
# - when adding more parameters (default values)


class InternalApiHandler:
def __init__(
self,
func: Callable,
args_to_json: Callable[[dict], str] = json.dumps,
args_from_json: Callable[[str], dict] = lambda x: x,
result_to_json: Callable[[Any], str] = json.dumps,
result_from_json: Callable[[str], Any] = lambda x: x,
):
self.func = func
self.args_to_json = args_to_json
self.args_from_json = args_from_json
self.result_to_json = result_to_json
self.result_from_json = result_from_json


def json_to_tuple(json_object: str) -> Tuple[Any, Any]:
return tuple(json.loads(json_object))


def process_file_args_to_json(args: dict) -> dict:
result_dict = args.copy()
callback_requests: List[CallbackRequest] = args["callback_requests"]
callbacks: List[dict] = []

for callback in callback_requests:
d = callback.to_dict().copy()
d['type'] = str(callback.__class__.__name__)
callbacks.append(d)
result_dict["callback_requests"] = callbacks
return result_dict


def process_file_args_from_json(args: dict) -> dict:
result_dict = args.copy()
callback_requests: List[dict] = result_dict["callback_requests"]
callbacks: List[CallbackRequest] = []

for callback in callback_requests:
type_name = callback['type']
del callback['type']
callbacks.append(callback_from_dict(callback, type_name))
result_dict["callback_requests"] = callbacks
return result_dict


processor = DagFileProcessor(
dag_ids=[], log=logging.getLogger('airflow'), dag_directory="/opt/airflow/airflow/example_dags"
)
METHODS = {
'update_import_errors': InternalApiHandler(func=DagFileProcessor.update_import_errors),
'process_file': InternalApiHandler(
func=processor.process_file,
args_to_json=process_file_args_to_json,
args_from_json=process_file_args_from_json,
result_from_json=json_to_tuple,
),
}

# handler for /internal/v1/rpcapi
def json_rpc(
body: dict,
) -> APIResponse:
"""Process internal API request."""
method_name = body.get("method")
params = body.get("params")
handler = METHODS[method_name]
args = handler.args_from_json(params)
output = handler.func(**args)
if output is not None:
output_json = handler.result_to_json(output)
else:
output_json = ''
return Response(response=output_json, headers={"Content-Type": "application/json"})
36 changes: 31 additions & 5 deletions airflow/callbacks/callback_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,20 @@ def __eq__(self, other):
def __repr__(self):
return str(self.__dict__)

def to_dict(self) -> dict:
return self.__dict__

def to_json(self) -> str:
return json.dumps(self.__dict__)

@classmethod
def from_dict(cls, dict_obj: str):
return cls(**dict_obj)

@classmethod
def from_json(cls, json_str: str):
json_object = json.loads(json_str)
return cls(**json_object)
return cls.from_dict(**json_object)


class TaskCallbackRequest(CallbackRequest):
Expand All @@ -83,18 +90,27 @@ def __init__(
self.simple_task_instance = simple_task_instance
self.is_failure_callback = is_failure_callback

def to_json(self) -> str:
def to_dict(self) -> dict:
dict_obj = self.__dict__.copy()
dict_obj["simple_task_instance"] = self.simple_task_instance.as_dict()
return json.dumps(dict_obj)
return dict_obj

def to_json(self) -> str:
return json.dumps(self.to_dict())

@classmethod
def from_json(cls, json_str: str):
from airflow.models.taskinstance import SimpleTaskInstance

kwargs = json.loads(json_str)
simple_ti = SimpleTaskInstance.from_dict(obj_dict=kwargs.pop("simple_task_instance"))
return cls(simple_task_instance=simple_ti, **kwargs)
return cls.from_dict(kwargs)

@classmethod
def from_dict(cls, dict_obj: str):
from airflow.models.taskinstance import SimpleTaskInstance

simple_ti = SimpleTaskInstance.from_dict(obj_dict=dict_obj.pop("simple_task_instance"))
return cls(simple_task_instance=simple_ti, **dict_obj)


class DagCallbackRequest(CallbackRequest):
Expand Down Expand Up @@ -142,3 +158,13 @@ def __init__(
):
super().__init__(full_filepath, processor_subdir=processor_subdir, msg=msg)
self.dag_id = dag_id


def callback_from_dict(dict_obj: dict, type_name: str) -> CallbackRequest:
if type_name == "TaskCallbackRequest":
return TaskCallbackRequest.from_dict(dict_obj=dict_obj)
if type_name == "DagCallbackRequest":
return DagCallbackRequest.from_dict(dict_obj=dict_obj)
if type_name == "SlaCallbackRequest":
return SlaCallbackRequest.from_dict(dict_obj=dict_obj)
return CallbackRequest.from_dict(dict_obj=dict_obj)
Loading