diff --git a/airflow/api_connexion/openapi/internal_api.yaml b/airflow/api_connexion/openapi/internal_api.yaml
new file mode 100644
index 0000000000000..128689f8218db
--- /dev/null
+++ b/airflow/api_connexion/openapi/internal_api.yaml
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+x-send-defaults: true
+openapi: 3.0.0
+x-api-id: airflow-internal-api
+info:
+  title: Airflow Internal API
+  version: 1.0.0
+  description: Internal API of Apache Airflow, exposed as JSON-RPC 2.0 over HTTP.
+servers:
+  - url: /internal/v1
+    description: Apache Airflow Internal API.
+paths:
+  "/rpcapi":
+    post:
+      deprecated: false
+      summary: Internal API JSON-RPC endpoint
+      x-openapi-router-controller: airflow.api_internal.rpc_api
+      operationId: json_rpc
+      description: Invoke an internal API method using JSON-RPC 2.0 params.
+      tags:
+        - JSONRPC
+      parameters: []
+      responses:
+        '200':
+          description: Successful response
+      requestBody:
+        x-body-name: body
+        required: true
+        content:
+          application/json:
+            schema:
+              type: object
+              required:
+                - method
+                - jsonrpc
+                - params
+              properties:
+                jsonrpc:
+                  type: string
+                  default: '2.0'
+                  description: JSON-RPC Version (2.0)
+                method:
+                  type: string
+                  default: example
+                  description: Method name
+                params:
+                  title: Parameters
+                  type: object
+x-headers: []
+x-explorer-enabled: true
+x-proxy-enabled: true
+x-samples-enabled: true
+components:
+  schemas:
+    JsonRpcRequired:
+      type: object
+      required:
+        - method
+        - jsonrpc
+      properties:
+        method:
+          type: string
+          default: examplePost
+          description: Method name
+        jsonrpc:
+          type: string
+          default: '2.0'
+          description: JSON-RPC Version (2.0)
+      discriminator:
+        propertyName: method_name
+    examplePost:
+      allOf:
+        - "$ref": "#/components/schemas/JsonRpcRequired"
+        - type: object
+          properties:
+            params:
+              title: Parameters
+              type: object
+tags: []
diff --git a/airflow/api_internal/__init__.py b/airflow/api_internal/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/airflow/api_internal/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
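A request that satisfies the schema above could look like the following minimal sketch. It assumes the server added in this change is running on its default port; the method name and the empty params are purely illustrative:

    import json

    import requests

    # Hypothetical call against the /internal/v1/rpcapi endpoint defined above.
    payload = {
        "jsonrpc": "2.0",
        "method": "process_file",  # must be a key of airflow.api_internal.rpc_api.METHODS
        "params": {},  # JSON object carrying the keyword arguments of the target method
    }
    response = requests.post(
        "http://127.0.0.1:50051/internal/v1/rpcapi",
        data=json.dumps(payload),
        headers={"Content-Type": "application/json"},
    )
    response.raise_for_status()
    print(response.content)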
diff --git a/airflow/api_internal/internal_api_decorator.py b/airflow/api_internal/internal_api_decorator.py
new file mode 100644
index 0000000000000..b5f2769c70578
--- /dev/null
+++ b/airflow/api_internal/internal_api_decorator.py
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+import json
+import logging
+
+import requests
+
+from airflow.api_internal.rpc_api import METHODS
+from airflow.exceptions import AirflowException
+
+log = logging.getLogger(__name__)
+
+# TODO: read these from configuration
+use_internal_api = False
+url = "http://127.0.0.1:50051/internal/v1/rpcapi"
+
+
+def remove_none(d: dict) -> dict:
+    """Recursively drop None values from a dict (and from dicts nested in lists)."""
+    if isinstance(d, dict):
+        for k, v in list(d.items()):
+            if v is None:
+                del d[k]
+            else:
+                remove_none(v)
+    if isinstance(d, list):
+        for v in d:
+            remove_none(v)
+    return d
+
+
+def internal_api_call(
+    method_name: str,
+):
+    headers = {
+        "Content-Type": "application/json",
+    }
+
+    def jsonrpc_request(params_json):
+        data = {"jsonrpc": "2.0", "method": method_name, "params": params_json}
+
+        response = requests.post(url, data=json.dumps(data), headers=headers)
+        if response.status_code != 200:
+            log.error("Internal API error: %s", response.content)
+            raise AirflowException(
+                f"Got {response.status_code}:{response.reason} when submitting the internal api call."
+            )
+        return response.content
+
+    def inner(func):
+        def make_call(*args, **kwargs):
+            if use_internal_api:
+                log.debug("Calling %s through the internal API", method_name)
+                bound = inspect.signature(func).bind(*args, **kwargs)
+                arguments_dict = dict(bound.arguments)
+                if "session" in arguments_dict:
+                    # Database sessions cannot be serialized; the server side opens its own.
+                    del arguments_dict["session"]
+                handler = METHODS[method_name]
+                result = jsonrpc_request(handler.args_to_json(arguments_dict))
+                if result:
+                    return handler.result_from_json(result)
+                return None
+
+            log.debug("Calling %s locally", method_name)
+            return func(*args, **kwargs)
+
+        return make_call
+
+    return inner
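A minimal sketch of how the decorator above is meant to be used. The method name "example_method" is hypothetical and would have to be registered in rpc_api.METHODS; only "process_file" and "update_import_errors" are registered in this change:

    from airflow.api_internal.internal_api_decorator import internal_api_call


    @internal_api_call("example_method")  # hypothetical key, must exist in rpc_api.METHODS
    def example_method(dag_id: str, session=None) -> dict:
        # With use_internal_api=False this body runs locally. With use_internal_api=True the
        # arguments (minus the non-serializable session) are sent to the internal API server
        # and this body is never executed.
        return {"dag_id": dag_id}


    result = example_method(dag_id="example_bash_operator")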
diff --git a/airflow/api_internal/rpc_api.py b/airflow/api_internal/rpc_api.py
new file mode 100644
index 0000000000000..8fa7f6a66bec5
--- /dev/null
+++ b/airflow/api_internal/rpc_api.py
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import logging
+from typing import Any, Callable, List, Tuple
+
+from flask import Response
+
+from airflow.api_connexion.types import APIResponse
+from airflow.callbacks.callback_requests import CallbackRequest, callback_from_dict
+from airflow.dag_processing.processor import DagFileProcessor
+
+# Registry of internal API methods, keyed by the 'method' name sent in the JSON-RPC request.
+# PLEASE KEEP BACKWARD COMPATIBLE:
+# - do not remove or rename existing method names,
+# - when adding more parameters, give them default values.
+
+
+class InternalApiHandler:
+    """Couples an internal API method with its argument and result (de)serializers."""
+
+    def __init__(
+        self,
+        func: Callable,
+        # 'params' travels as a JSON object inside the JSON-RPC envelope, so the argument
+        # (de)serializers map dicts to JSON-serializable dicts; only results are JSON strings.
+        args_to_json: Callable[[dict], dict] = lambda x: x,
+        args_from_json: Callable[[dict], dict] = lambda x: x,
+        result_to_json: Callable[[Any], str] = json.dumps,
+        result_from_json: Callable[[Any], Any] = json.loads,
+    ):
+        self.func = func
+        self.args_to_json = args_to_json
+        self.args_from_json = args_from_json
+        self.result_to_json = result_to_json
+        self.result_from_json = result_from_json
+
+
+def json_to_tuple(json_object: str) -> Tuple[Any, Any]:
+    return tuple(json.loads(json_object))
+
+
+def process_file_args_to_json(args: dict) -> dict:
+    result_dict = args.copy()
+    callback_requests: List[CallbackRequest] = args["callback_requests"]
+    callbacks: List[dict] = []
+
+    for callback in callback_requests:
+        d = callback.to_dict().copy()
+        d['type'] = type(callback).__name__
+        callbacks.append(d)
+    result_dict["callback_requests"] = callbacks
+    return result_dict
+
+
+def process_file_args_from_json(args: dict) -> dict:
+    result_dict = args.copy()
+    callback_requests: List[dict] = result_dict["callback_requests"]
+    callbacks: List[CallbackRequest] = []
+
+    for callback in callback_requests:
+        type_name = callback['type']
+        del callback['type']
+        callbacks.append(callback_from_dict(callback, type_name))
+    result_dict["callback_requests"] = callbacks
+    return result_dict
+
+
+# TODO: read dag_directory (and the other settings) from configuration.
+processor = DagFileProcessor(
+    dag_ids=[], log=logging.getLogger('airflow'), dag_directory="/opt/airflow/airflow/example_dags"
+)
+METHODS = {
+    'update_import_errors': InternalApiHandler(func=DagFileProcessor.update_import_errors),
+    'process_file': InternalApiHandler(
+        func=processor.process_file,
+        args_to_json=process_file_args_to_json,
+        args_from_json=process_file_args_from_json,
+        result_from_json=json_to_tuple,
+    ),
+}
+
+
+# handler for /internal/v1/rpcapi
+def json_rpc(
+    body: dict,
+) -> APIResponse:
+    """Process internal API request."""
+    method_name = body.get("method")
+    params = body.get("params")
+    handler = METHODS[method_name]
+    args = handler.args_from_json(params)
+    output = handler.func(**args)
+    if output is not None:
+        output_json = handler.result_to_json(output)
+    else:
+        output_json = ''
+    return Response(response=output_json, headers={"Content-Type": "application/json"})
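Registering another method amounts to adding an entry to METHODS, optionally with custom (de)serializers. A hypothetical sketch (get_import_error_count does not exist in this change):

    import json

    from airflow.api_internal.rpc_api import METHODS, InternalApiHandler


    def get_import_error_count(dag_id: str) -> int:
        # Hypothetical server-side implementation that would query the metadata database.
        raise NotImplementedError


    METHODS["get_import_error_count"] = InternalApiHandler(
        func=get_import_error_count,
        # Arguments already travel as a JSON object, so the identity defaults are enough here;
        # the integer result is JSON-encoded on the server and decoded again on the client.
        result_to_json=json.dumps,
        result_from_json=json.loads,
    )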
diff --git a/airflow/callbacks/callback_requests.py b/airflow/callbacks/callback_requests.py
index d8c36cb7532b0..e35d3a74d6d9a 100644
--- a/airflow/callbacks/callback_requests.py
+++ b/airflow/callbacks/callback_requests.py
@@ -50,13 +50,20 @@ def __eq__(self, other):
     def __repr__(self):
         return str(self.__dict__)
 
+    def to_dict(self) -> dict:
+        return self.__dict__
+
     def to_json(self) -> str:
         return json.dumps(self.__dict__)
 
+    @classmethod
+    def from_dict(cls, dict_obj: dict):
+        return cls(**dict_obj)
+
     @classmethod
     def from_json(cls, json_str: str):
         json_object = json.loads(json_str)
-        return cls(**json_object)
+        return cls.from_dict(json_object)
 
 
 class TaskCallbackRequest(CallbackRequest):
@@ -83,18 +90,27 @@ def __init__(
         self.simple_task_instance = simple_task_instance
         self.is_failure_callback = is_failure_callback
 
-    def to_json(self) -> str:
+    def to_dict(self) -> dict:
         dict_obj = self.__dict__.copy()
         dict_obj["simple_task_instance"] = self.simple_task_instance.as_dict()
-        return json.dumps(dict_obj)
+        return dict_obj
+
+    def to_json(self) -> str:
+        return json.dumps(self.to_dict())
 
     @classmethod
     def from_json(cls, json_str: str):
         from airflow.models.taskinstance import SimpleTaskInstance
 
         kwargs = json.loads(json_str)
-        simple_ti = SimpleTaskInstance.from_dict(obj_dict=kwargs.pop("simple_task_instance"))
-        return cls(simple_task_instance=simple_ti, **kwargs)
+        return cls.from_dict(kwargs)
+
+    @classmethod
+    def from_dict(cls, dict_obj: dict):
+        from airflow.models.taskinstance import SimpleTaskInstance
+
+        simple_ti = SimpleTaskInstance.from_dict(obj_dict=dict_obj.pop("simple_task_instance"))
+        return cls(simple_task_instance=simple_ti, **dict_obj)
 
 
 class DagCallbackRequest(CallbackRequest):
@@ -142,3 +158,13 @@ def __init__(
     ):
         super().__init__(full_filepath, processor_subdir=processor_subdir, msg=msg)
         self.dag_id = dag_id
+
+
+def callback_from_dict(dict_obj: dict, type_name: str) -> CallbackRequest:
+    if type_name == "TaskCallbackRequest":
+        return TaskCallbackRequest.from_dict(dict_obj=dict_obj)
+    if type_name == "DagCallbackRequest":
+        return DagCallbackRequest.from_dict(dict_obj=dict_obj)
+    if type_name == "SlaCallbackRequest":
+        return SlaCallbackRequest.from_dict(dict_obj=dict_obj)
+    return CallbackRequest.from_dict(dict_obj=dict_obj)
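The new to_dict/from_dict/callback_from_dict helpers let the RPC layer ship callback requests as plain dicts plus a type tag (see process_file_args_to_json/process_file_args_from_json above). A small sketch of the round trip, with illustrative values:

    from airflow.callbacks.callback_requests import DagCallbackRequest, callback_from_dict

    request = DagCallbackRequest(
        full_filepath="/files/dags/example.py",
        dag_id="example_dag",
        run_id="manual__2023-01-01",
        processor_subdir=None,
        is_failure_callback=False,
        msg="example",
    )
    payload = request.to_dict()          # plain, JSON-serializable dict
    type_name = type(request).__name__   # shipped alongside the payload as the "type" tag
    restored = callback_from_dict(payload, type_name)
    assert restored == request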
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index bed5fc22499be..3190ac50367a7 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -951,6 +951,29 @@ def string_lower_type(val):
     ("--include-dags",), help="If passed, DAG specific permissions will also be synced.", action="store_true"
 )
 
+# internal API client
+ARG_NUM_REPEATS = Arg(
+    ("--num-repeats",),
+    type=positive_int(allow_zero=False),
+    default=1,
+    help="The number of times to repeat the operation.",
+)
+ARG_USE_GRPC = Arg(
+    ("--use-grpc",),
+    default=False,
+    action='store_true',
+    help="Whether to use gRPC for the test calls.",
+)
+ARG_NUM_CALLBACKS = Arg(
+    ("--num-callbacks",),
+    type=positive_int(allow_zero=False),
+    default=1,
+    help="Multiplier for the number of callbacks passed with each file.",
+)
+ARG_TEST = Arg(
+    ('--test',), help='Test scenario to run.', type=str, choices=['file_processor'], default="file_processor"
+)
+
 # triggerer
 ARG_CAPACITY = Arg(
     ("--capacity",),
@@ -1503,6 +1526,33 @@ class GroupCommand(NamedTuple):
         ),
     ),
 )
+
+INTERNAL_API_COMMANDS = (
+    ActionCommand(
+        name='server',
+        help="Start an internal API server instance",
+        func=lazy_load_command('airflow.cli.commands.internal_api_server_command.internal_api_server'),
+        args=(
+            ARG_PID,
+            ARG_DAEMON,
+            ARG_STDOUT,
+            ARG_STDERR,
+            ARG_LOG_FILE,
+        ),
+    ),
+    ActionCommand(
+        name='test-client',
+        help="Test client for the internal API",
+        func=lazy_load_command('airflow.cli.commands.internal_api_client_command.internal_api_client'),
+        args=(
+            ARG_NUM_REPEATS,
+            ARG_NUM_CALLBACKS,
+            ARG_USE_GRPC,
+            ARG_TEST,
+        ),
+    ),
+)
+
 CONNECTIONS_COMMANDS = (
     ActionCommand(
         name='get',
@@ -1877,6 +1927,11 @@ class GroupCommand(NamedTuple):
         help="Database operations",
         subcommands=DB_COMMANDS,
     ),
+    GroupCommand(
+        name='internal-api',
+        help='Internal API commands',
+        subcommands=INTERNAL_API_COMMANDS,
+    ),
     ActionCommand(
         name='kerberos',
         help="Start a kerberos ticket renewer",
full_filepath=f"{example_dags_folder}/example_bash_operator.py", + dag_id="example_bash_operator", + msg="Error message", + processor_subdir=example_dags_folder, + ), + ] + callbacks: List[CallbackRequest] = [] + for i in range(num_callbacks): + callbacks.extend(callback_base) + sum_dags = 0 + sum_errors = 0 + for file in Path(example_dags_folder).iterdir(): + if file.is_file() and file.name.endswith(".py"): + dags, errors = process_file(file_path=str(file), callback_requests=callbacks, pickle_dags=True) + sum_dags += dags + sum_errors += errors + console.print(f"Found {sum_dags} dags with {sum_errors} errors") + return sum_dags, sum_errors + + +def file_processor_test(num_callbacks: int, num_repeats: int): + total_dags = 0 + total_errors = 0 + for i in range(num_repeats): + dags, errors = process_example_files(num_callbacks) + total_dags += dags + total_errors += errors + console.print(f"Total found {total_dags} dags with {total_errors} errors") + + +@cli_utils.action_cli +def internal_api_client(args): + num_repeats = args.num_repeats + if args.test == "file_processor": + file_processor_test(args.num_callbacks, num_repeats=num_repeats) + else: + console.print(f"[red]Wrong test {args.test}") diff --git a/airflow/cli/commands/internal_api_server_command.py b/airflow/cli/commands/internal_api_server_command.py new file mode 100644 index 0000000000000..789be9138863b --- /dev/null +++ b/airflow/cli/commands/internal_api_server_command.py @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""Internal API command""" +import signal + +import daemon +from daemon.pidfile import TimeoutPIDLockFile + +from airflow.jobs.internal_api_job import InternalAPIJob +from airflow.utils import cli as cli_utils +from airflow.utils.cli import setup_locations, setup_logging, sigint_handler, sigquit_handler + + +@cli_utils.action_cli +def internal_api_server(args): + """Starts Internal API server""" + job = InternalAPIJob() + + if args.daemon: + pid, stdout, stderr, log_file = setup_locations( + "internal_api", args.pid, args.stdout, args.stderr, args.log_file + ) + handle = setup_logging(log_file) + with open(stdout, 'w+') as stdout_handle, open(stderr, 'w+') as stderr_handle: + ctx = daemon.DaemonContext( + pidfile=TimeoutPIDLockFile(pid, -1), + files_preserve=[handle], + stdout=stdout_handle, + stderr=stderr_handle, + ) + with ctx: + job.run() + + else: + signal.signal(signal.SIGINT, sigint_handler) + signal.signal(signal.SIGTERM, sigint_handler) + signal.signal(signal.SIGQUIT, sigquit_handler) + job.run() diff --git a/airflow/jobs/internal_api_job.py b/airflow/jobs/internal_api_job.py new file mode 100644 index 0000000000000..4dfbf17b490bf --- /dev/null +++ b/airflow/jobs/internal_api_job.py @@ -0,0 +1,104 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/airflow/jobs/internal_api_job.py b/airflow/jobs/internal_api_job.py
new file mode 100644
index 0000000000000..4dfbf17b490bf
--- /dev/null
+++ b/airflow/jobs/internal_api_job.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+import signal
+import sys
+from os import path
+
+from connexion import App
+from flask import Flask
+
+from airflow.jobs.base_job import BaseJob
+
+ROOT_APP_DIR = path.abspath(path.join(path.dirname(__file__), path.pardir, path.pardir))
+
+
+class InternalAPIJob(BaseJob):
+    """InternalAPIJob serves the internal JSON-RPC API (via Connexion) to run database operations."""
+
+    __mapper_args__ = {'polymorphic_identity': 'InternalApiJob'}
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        # Connexion application serving the internal API; created in serve().
+        self.app = None
+        self._exiting = False
+
+    def serve(self):
+        """Run a Connexion application that serves the internal API specification."""
+        base_path = '/internal/v1'
+        flask_app = Flask(__name__)
+        spec_dir = path.join(ROOT_APP_DIR, 'airflow', 'api_connexion', 'openapi')
+
+        self.app = App(__name__, specification_dir=spec_dir, skip_error_handlers=True)
+        self.app.app = flask_app
+        self.app.add_api(
+            specification='internal_api.yaml',
+            base_path=base_path,
+            validate_responses=True,
+            strict_validation=True,
+        )
+        self.app.run(port=50051)
+
+    def register_signals(self) -> None:
+        """Register signals that stop child processes"""
+        signal.signal(signal.SIGINT, self._exit_gracefully)
+        signal.signal(signal.SIGTERM, self._exit_gracefully)
+
+    def on_kill(self):
+        """
+        Called when there is an external kill command (via the heartbeat
+        mechanism, for example)
+        """
+        # The development server used by this prototype stops when the process exits.
+        self.log.info("Internal API server received an external kill request")
+
+    def _exit_gracefully(self, signum, frame) -> None:
+        """Clean up on exit signals to avoid leaving orphan processes."""
+        # The first time, try to exit nicely
+        if not self._exiting:
+            self._exiting = True
+            self.log.info("Exiting gracefully upon receiving signal %s", signum)
+            sys.exit(os.EX_OK)
+        else:
+            self.log.warning("Forcing exit due to second exit signal %s", signum)
+            sys.exit(os.EX_SOFTWARE)
+
+    def _execute(self) -> None:
+        self.log.info("Starting the internal API")
+        try:
+            # Serve the JSON-RPC API defined in internal_api.yaml
+            self.serve()
+        except KeyboardInterrupt:
+            self.log.info("Internal API server terminated")
+        except Exception:
+            self.log.exception("Exception when executing InternalAPIJob._execute")
+            raise
+        finally:
+            self.log.info("Exited the internal API server loop")
+
+
+if __name__ == '__main__':
+    job = InternalAPIJob()
+    job.run()
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index e8b19f03dca51..3a651aeb6854a 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2615,6 +2615,11 @@ def as_dict(self):
                 if not val or isinstance(val, str):
                     continue
                 new_dict.update({key: val.isoformat()})
+            if key == 'executor_config':
+                val = new_dict[key]
+                if 'pod_override' in val:
+                    new_dict[key]['pod_override'] = val['pod_override'].to_dict()
+
         return new_dict
 
     @classmethod
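The executor_config handling added to SimpleTaskInstance.as_dict above exists because a pod_override holds a kubernetes V1Pod, which json.dumps cannot serialize. A sketch of the effect, mirroring the fixture used by the test client in this change (it assumes the kubernetes client library is installed):

    import datetime
    import json

    from kubernetes.client import models as k8s

    from airflow.models.taskinstance import SimpleTaskInstance, TaskInstanceKey

    simple_ti = SimpleTaskInstance(
        task_id="run_this_last",
        dag_id="example_python_operator",
        run_id="run_id",
        start_date=datetime.datetime.now(),
        end_date=datetime.datetime.now(),
        try_number=1,
        map_index=1,
        state="RUNNING",
        executor_config={
            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"})),
        },
        pool="pool",
        queue="queue",
        key=TaskInstanceKey(dag_id="dag", task_id="task_id", run_id="run_id"),
        run_as_user="user",
    )
    as_dict = simple_ti.as_dict()
    # pod_override is now a plain dict, so the whole structure can be JSON-encoded.
    assert isinstance(as_dict["executor_config"]["pod_override"], dict)
    print(json.dumps(as_dict))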