diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e64c31465edd1..596fcb5e11c1f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -93,7 +93,7 @@ repos: - --fuzzy-match-generates-todo - id: insert-license name: Add license for all Python files - exclude: ^\.github/.*$|^airflow/_vendor/ + exclude: ^\.github/.*$|^airflow/_vendor/|.*_pb2.py$|.*_pb2.pyi$|.*_pb2_grpc.py$ files: \.py$|\.pyi$ args: - --comment-style @@ -150,7 +150,7 @@ repos: - id: black name: Run Black (the uncompromising Python code formatter) args: [--config=./pyproject.toml] - exclude: ^airflow/_vendor/ + exclude: ^airflow/_vendor/|.*_pb2.py$|.*_pb2.pyi$|.*_pb2_grpc.py$ - repo: https://github.com/asottile/blacken-docs rev: v1.12.1 hooks: @@ -194,7 +194,7 @@ repos: exclude: ^airflow/_vendor/|^images/breeze/output.*$ - id: fix-encoding-pragma name: Remove encoding header from python files - exclude: ^airflow/_vendor/ + exclude: ^airflow/_vendor/|.*_pb2.py$|.*_pb2.pyi$|.*_pb2_grpc.py$ args: - --remove - id: pretty-format-json @@ -213,7 +213,7 @@ repos: - id: pyupgrade name: Upgrade Python code automatically args: ["--py37-plus"] - exclude: ^airflow/_vendor/ + exclude: ^airflow/_vendor/|.*_pb2.py$|.*_pb2.pyi$|.*_pb2_grpc.py$ - repo: https://github.com/pre-commit/pygrep-hooks rev: v1.9.0 hooks: @@ -238,7 +238,7 @@ repos: name: Run isort to sort imports in Python files files: \.py$|\.pyi$ # To keep consistent with the global isort skip config defined in setup.cfg - exclude: ^airflow/_vendor/|^build/.*$|^venv/.*$|^\.tox/.*$ + exclude: ^airflow/_vendor/|^build/.*$|^venv/.*$|^\.tox/.*$|.*_pb2.py$|.*_pb2_grpc.py$ - repo: https://github.com/pycqa/pydocstyle rev: 6.1.1 hooks: @@ -785,6 +785,12 @@ repos: pass_filenames: false files: ^Dockerfile$|^Dockerfile.ci$|^scripts/docker/.*$ require_serial: true + - id: grpc-proto-compile + name: Compile GRPC proto to python code + entry: ./scripts/ci/pre_commit/pre_commit_grpc_compile.py + language: python + files: ^airflow/.*\.proto + additional_dependencies: ['mypy', 'grpcio-tools', 'mypy-protobuf', 'types-protobuf'] - id: check-changelog-has-no-duplicates name: Check changelogs for duplicate entries language: python @@ -860,7 +866,8 @@ repos: language: python entry: ./scripts/ci/pre_commit/pre_commit_mypy.py --namespace-packages files: \.py$ - exclude: ^provider_packages|^docs|^airflow/_vendor/|^airflow/providers|^airflow/migrations|^dev + exclude: "^provider_packages|^docs|^airflow/_vendor/|^airflow/providers\ + |^airflow/migrations|^dev|.*_pb2.py$|.*_pb2.pyi$|.*_pb2_grpc.py$" require_serial: true additional_dependencies: ['rich>=12.4.4', 'inputimeout'] - id: run-mypy @@ -884,7 +891,7 @@ repos: entry: ./scripts/ci/pre_commit/pre_commit_flake8.py files: \.py$ pass_filenames: true - exclude: ^airflow/_vendor/ + exclude: ^airflow/_vendor/|.*_pb2.py$|.*_pb2.pyi$|.*_pb2_grpc.py$ additional_dependencies: ['rich>=12.4.4', 'inputimeout'] - id: update-migration-references name: Update migration ref doc diff --git a/STATIC_CODE_CHECKS.rst b/STATIC_CODE_CHECKS.rst index ca386413b32f9..7699a3e483823 100644 --- a/STATIC_CODE_CHECKS.rst +++ b/STATIC_CODE_CHECKS.rst @@ -233,6 +233,8 @@ require Breeze Docker image to be build locally. 
+--------------------------------------------------------+------------------------------------------------------------------+---------+ | flynt | Run flynt string format converter for Python | | +--------------------------------------------------------+------------------------------------------------------------------+---------+ +| grpc-proto-compile | Compile GRPC proto to python code | | ++--------------------------------------------------------+------------------------------------------------------------------+---------+ | identity | Print input to the static check hooks for troubleshooting | | +--------------------------------------------------------+------------------------------------------------------------------+---------+ | insert-license | * Add license for all SQL files | | diff --git a/airflow/callbacks/callback_requests.py b/airflow/callbacks/callback_requests.py index 8112589cd0262..7aeca42cfce75 100644 --- a/airflow/callbacks/callback_requests.py +++ b/airflow/callbacks/callback_requests.py @@ -16,7 +16,12 @@ # under the License. import json -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, List, Optional + +from google.protobuf.internal.containers import RepeatedCompositeFieldContainer + +from airflow.internal_api.grpc import internal_api_pb2 +from airflow.internal_api.grpc.internal_api_pb2 import Callback if TYPE_CHECKING: from airflow.models.taskinstance import SimpleTaskInstance @@ -50,6 +55,28 @@ def from_json(cls, json_str: str): json_object = json.loads(json_str) return cls(**json_object) + def to_protobuf( + self, + ) -> Callback: + raise NotImplementedError() + + @staticmethod + def get_callbacks_from_protobuf( + callbacks: RepeatedCompositeFieldContainer[Callback], + ) -> List["CallbackRequest"]: + result_callbacks: List[CallbackRequest] = [] + for callback in callbacks: + type = callback.WhichOneof('callback_type') + if type == "task_request": + result_callbacks.append(TaskCallbackRequest.from_protobuf(callback.task_request)) + elif type == "dag_request": + result_callbacks.append(DagCallbackRequest.from_protobuf(callback.dag_request)) + elif type == 'sla_request': + result_callbacks.append(SlaCallbackRequest.from_protobuf(callback.sla_request)) + else: + raise ValueError(f"Bad type: {type}") + return result_callbacks + class TaskCallbackRequest(CallbackRequest): """ @@ -86,6 +113,27 @@ def from_json(cls, json_str: str): simple_ti = SimpleTaskInstance.from_dict(obj_dict=kwargs.pop("simple_task_instance")) return cls(simple_task_instance=simple_ti, **kwargs) + @classmethod + def from_protobuf(cls, request: internal_api_pb2.TaskCallbackRequest) -> "TaskCallbackRequest": + from airflow.models.taskinstance import SimpleTaskInstance + + return cls( + full_filepath=request.full_filepath, + simple_task_instance=SimpleTaskInstance.from_protobuf(request.task_instance), + is_failure_callback=request.is_failure_callback, + msg=request.message, + ) + + def to_protobuf(self) -> Callback: + return Callback( + task_request=internal_api_pb2.TaskCallbackRequest( + full_filepath=self.full_filepath, + task_instance=self.simple_task_instance.to_protobuf(), + is_failure_callback=self.is_failure_callback, + message=self.msg, + ) + ) + class DagCallbackRequest(CallbackRequest): """ @@ -111,6 +159,27 @@ def __init__( self.run_id = run_id self.is_failure_callback = is_failure_callback + @classmethod + def from_protobuf(cls, request: internal_api_pb2.DagCallbackRequest) -> "DagCallbackRequest": + return cls( + full_filepath=request.full_filepath, + 
dag_id=request.dag_id, + run_id=request.run_id, + is_failure_callback=request.is_failure_callback, + msg=request.message, + ) + + def to_protobuf(self) -> Callback: + return Callback( + dag_request=internal_api_pb2.DagCallbackRequest( + full_filepath=self.full_filepath, + dag_id=self.dag_id, + run_id=self.run_id, + is_failure_callback=self.is_failure_callback, + message=self.msg, + ) + ) + class SlaCallbackRequest(CallbackRequest): """ @@ -118,8 +187,22 @@ class SlaCallbackRequest(CallbackRequest): :param full_filepath: File Path to use to run the callback :param dag_id: DAG ID + :param msg: Additional Message that can be used for logging """ def __init__(self, full_filepath: str, dag_id: str, msg: Optional[str] = None): super().__init__(full_filepath, msg) self.dag_id = dag_id + + @classmethod + def from_protobuf(cls, request: internal_api_pb2.SlaCallbackRequest) -> "SlaCallbackRequest": + return cls(full_filepath=request.full_filepath, dag_id=request.dag_id, msg=request.message) + + def to_protobuf(self) -> Callback: + return Callback( + sla_request=internal_api_pb2.SlaCallbackRequest( + full_filepath=self.full_filepath, + dag_id=self.dag_id, + message=self.msg, + ) + ) diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index 11b38ebd31163..54310e3db007b 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -915,6 +915,29 @@ def string_lower_type(val): ("--include-dags",), help="If passed, DAG specific permissions will also be synced.", action="store_true" ) +# internal API client +ARG_NUM_REPEATS = Arg( + ("--num-repeats",), + type=positive_int(allow_zero=False), + default=1, + help="The number of times to repeat the operation.", +) +ARG_USE_GRPC = Arg( + ("--use-grpc",), + default=False, + action='store_true', + help="Whether to use GRPC for tests", +) +ARG_NUM_CALLBACKS = Arg( + ("--num-callbacks",), + type=positive_int(allow_zero=False), + default=1, + help="The multiplier for number of callbacks.", +) +ARG_TEST = Arg( + ('--test',), help='Choose test.', type=str, choices=['file_processor'], default="file_processor" +) + # triggerer ARG_CAPACITY = Arg( ("--capacity",), @@ -1461,6 +1484,33 @@ class GroupCommand(NamedTuple): ), ), ) + +INTERNAL_API_COMMANDS = ( + ActionCommand( + name='server', + help="Start an internal API server instance", + func=lazy_load_command('airflow.cli.commands.internal_api_server_command.internal_api_server'), + args=( + ARG_PID, + ARG_DAEMON, + ARG_STDOUT, + ARG_STDERR, + ARG_LOG_FILE, + ), + ), + ActionCommand( + name='test-client', + help="Test client for internal API", + func=lazy_load_command('airflow.cli.commands.internal_api_client_command.internal_api_client'), + args=( + ARG_NUM_REPEATS, + ARG_NUM_CALLBACKS, + ARG_USE_GRPC, + ARG_TEST, + ), + ), +) + CONNECTIONS_COMMANDS = ( ActionCommand( name='get', @@ -1811,6 +1861,11 @@ class GroupCommand(NamedTuple): help="Database operations", subcommands=DB_COMMANDS, ), + GroupCommand( + name='internal-api', + help='Internal API commands', + subcommands=INTERNAL_API_COMMANDS, + ), ActionCommand( name='kerberos', help="Start a kerberos ticket renewer", diff --git a/airflow/cli/commands/internal_api_client_command.py b/airflow/cli/commands/internal_api_client_command.py new file mode 100644 index 0000000000000..e6daa3be792d8 --- /dev/null +++ b/airflow/cli/commands/internal_api_client_command.py @@ -0,0 +1,111 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import datetime +import logging +from pathlib import Path +from typing import List + +import grpc +from kubernetes.client import models as k8s +from rich.console import Console + +from airflow.callbacks.callback_requests import ( + CallbackRequest, + DagCallbackRequest, + SlaCallbackRequest, + TaskCallbackRequest, +) +from airflow.dag_processing.processor import DagFileProcessor +from airflow.models.taskinstance import SimpleTaskInstance, TaskInstanceKey +from airflow.utils import cli as cli_utils + +console = Console(width=400, color_system="standard") + + +def process_example_files(num_callbacks: int, processor: DagFileProcessor): + example_dags_folder = Path(__file__).parents[3] / "airflow" / "example_dags" + callback_base = [ + TaskCallbackRequest( + full_filepath=str(example_dags_folder / "example_bash_operator.py"), + simple_task_instance=SimpleTaskInstance( + task_id="run_this_last", + dag_id="example_python_operator", + run_id="run_id", + start_date=datetime.datetime.now(), + end_date=datetime.datetime.now(), + try_number=1, + map_index=1, + state="RUNNING", + executor_config={ + "test": "test", + "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"})), + }, + pool="pool", + queue="queue", + key=TaskInstanceKey(dag_id="dag", task_id="task_id", run_id="run_id"), + run_as_user="user", + ), + ), + DagCallbackRequest( + full_filepath="file", + dag_id="example_bash_operator", + run_id="run_this_last", + is_failure_callback=False, + msg="Error Message", + ), + SlaCallbackRequest(full_filepath="file", dag_id="example_bash_operator", msg="Error message"), + ] + callbacks: List[CallbackRequest] = [] + for i in range(num_callbacks): + callbacks.extend(callback_base) + sum_dags = 0 + sum_errors = 0 + for file in example_dags_folder.iterdir(): + if file.is_file() and file.name.endswith(".py"): + dags, errors = processor.process_file( + file_path=str(file), callback_requests=callbacks, pickle_dags=True + ) + sum_dags += dags + sum_errors += errors + console.print(f"Found {sum_dags} dags with {sum_errors} errors") + return sum_dags, sum_errors + + +def file_processor_test(num_callbacks: int, processor: DagFileProcessor, num_repeats: int): + total_dags = 0 + total_errors = 0 + for i in range(num_repeats): + dags, errors = process_example_files(num_callbacks, processor) + total_dags += dags + total_errors += errors + console.print(f"Total found {total_dags} dags with {total_errors} errors") + + +@cli_utils.action_cli +def internal_api_client(args): + use_grpc = args.use_grpc + num_repeats = args.num_repeats + processor = DagFileProcessor( + dag_ids=[], + log=logging.getLogger('airflow'), + use_grpc=use_grpc, + channel=grpc.insecure_channel('localhost:50051') if use_grpc else None, + ) + if args.test == "file_processor": + file_processor_test(args.num_callbacks, processor=processor, 
num_repeats=num_repeats) + else: + console.print(f"[red]Wrong test {args.test}") diff --git a/airflow/cli/commands/internal_api_server_command.py b/airflow/cli/commands/internal_api_server_command.py new file mode 100644 index 0000000000000..789be9138863b --- /dev/null +++ b/airflow/cli/commands/internal_api_server_command.py @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Internal API command""" +import signal + +import daemon +from daemon.pidfile import TimeoutPIDLockFile + +from airflow.jobs.internal_api_job import InternalAPIJob +from airflow.utils import cli as cli_utils +from airflow.utils.cli import setup_locations, setup_logging, sigint_handler, sigquit_handler + + +@cli_utils.action_cli +def internal_api_server(args): + """Starts Internal API server""" + job = InternalAPIJob() + + if args.daemon: + pid, stdout, stderr, log_file = setup_locations( + "internal_api", args.pid, args.stdout, args.stderr, args.log_file + ) + handle = setup_logging(log_file) + with open(stdout, 'w+') as stdout_handle, open(stderr, 'w+') as stderr_handle: + ctx = daemon.DaemonContext( + pidfile=TimeoutPIDLockFile(pid, -1), + files_preserve=[handle], + stdout=stdout_handle, + stderr=stderr_handle, + ) + with ctx: + job.run() + + else: + signal.signal(signal.SIGINT, sigint_handler) + signal.signal(signal.SIGTERM, sigint_handler) + signal.signal(signal.SIGQUIT, sigquit_handler) + job.run() diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 6fd3d129ed960..f0d2114aacacb 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -27,6 +27,7 @@ from multiprocessing.connection import Connection as MultiprocessingConnection from typing import Iterator, List, Optional, Set, Tuple +from grpc import Channel from setproctitle import setproctitle from sqlalchemy import func, or_ from sqlalchemy.orm.session import Session @@ -40,6 +41,7 @@ ) from airflow.configuration import conf from airflow.exceptions import AirflowException, TaskNotFound +from airflow.internal_api.grpc import internal_api_pb2, internal_api_pb2_grpc from airflow.models import SlaMiss, errors from airflow.models.dag import DAG, DagModel from airflow.models.dagbag import DagBag @@ -354,11 +356,21 @@ class DagFileProcessor(LoggingMixin): UNIT_TEST_MODE: bool = conf.getboolean('core', 'UNIT_TEST_MODE') - def __init__(self, dag_ids: Optional[List[str]], log: logging.Logger): + def __init__( + self, + dag_ids: Optional[List[str]], + log: logging.Logger, + use_grpc: bool = False, + channel: Optional[Channel] = None, + ): super().__init__() self.dag_ids = dag_ids self._log = log self.dag_warnings: Set[Tuple[str, str]] = set() + self.use_grpc = use_grpc + self.channel = channel + if self.use_grpc: + self.stub = 
internal_api_pb2_grpc.FileProcessorServiceStub(channel) @provide_session def manage_slas(self, dag: DAG, session: Session = None) -> None: @@ -642,11 +654,14 @@ def execute_callbacks( @provide_session def _execute_dag_callbacks(self, dagbag: DagBag, request: DagCallbackRequest, session: Session): - dag = dagbag.dags[request.dag_id] - dag_run = dag.get_dagrun(run_id=request.run_id, session=session) - dag.handle_callback( - dagrun=dag_run, success=not request.is_failure_callback, reason=request.msg, session=session - ) + try: + dag = dagbag.dags[request.dag_id] + dag_run = dag.get_dagrun(run_id=request.run_id, session=session) + dag.handle_callback( + dagrun=dag_run, success=not request.is_failure_callback, reason=request.msg, session=session + ) + except Exception: + pass def _execute_task_callbacks(self, dagbag: DagBag, request: TaskCallbackRequest): simple_ti = request.simple_task_instance @@ -662,7 +677,7 @@ def _execute_task_callbacks(self, dagbag: DagBag, request: TaskCallbackRequest): self.log.info('Executed failure callback for %s in state %s', ti, ti.state) @provide_session - def process_file( + def process_file_db( self, file_path: str, callback_requests: List[CallbackRequest], @@ -733,3 +748,29 @@ def process_file( self.log.exception("Error logging DAG warnings.") return len(dagbag.dags), len(dagbag.import_errors) + + def process_file_grpc( + self, + file_path: str, + callback_requests: List[CallbackRequest], + pickle_dags: bool = False, + ) -> Tuple[int, int]: + request = internal_api_pb2.FileProcessorRequest(path=file_path, pickle_dags=pickle_dags) + for callback_request in callback_requests: + request.callbacks.append(callback_request.to_protobuf()) + res = self.stub.processFile(request) + return res.dagsFound, res.errorsFound + + def process_file( + self, + file_path: str, + callback_requests: List[CallbackRequest], + pickle_dags: bool = False, + ) -> Tuple[int, int]: + if self.use_grpc: + return self.process_file_grpc( + file_path=file_path, callback_requests=callback_requests, pickle_dags=pickle_dags + ) + return self.process_file_db( + file_path=file_path, callback_requests=callback_requests, pickle_dags=pickle_dags + ) diff --git a/airflow/internal_api/__init__.py b/airflow/internal_api/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/internal_api/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
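For context, a minimal usage sketch (not part of this patch): with the server started via `airflow internal-api server` and listening on the insecure port 50051 used above, a client can exercise the new FileProcessorService through the generated stub, mirroring the `process_file_grpc` path added in processor.py. The DAG file path, dag_id and run_id values below are placeholders, not taken from the diff.

```python
# Minimal sketch, assuming a server started with `airflow internal-api server`
# on localhost:50051; the file path and DAG/run identifiers are hypothetical.
import grpc

from airflow.callbacks.callback_requests import DagCallbackRequest
from airflow.internal_api.grpc import internal_api_pb2, internal_api_pb2_grpc

with grpc.insecure_channel("localhost:50051") as channel:
    stub = internal_api_pb2_grpc.FileProcessorServiceStub(channel)
    # Callback requests are wrapped into the Callback oneof by to_protobuf()
    callback = DagCallbackRequest(
        full_filepath="/files/dags/my_dag.py",
        dag_id="my_dag",
        run_id="manual__2022-01-01",
        is_failure_callback=False,
        msg="example message",
    ).to_protobuf()
    request = internal_api_pb2.FileProcessorRequest(
        path="/files/dags/my_dag.py",
        callbacks=[callback],
        pickle_dags=False,
    )
    response = stub.processFile(request)
    print(response.dagsFound, response.errorsFound)
```

On the server side the same RPC is handled by FileProcessorServiceServicer.processFile (airflow/internal_api/server.py further down), which delegates to DagFileProcessor.process_file and therefore to the existing database-backed process_file_db path.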
diff --git a/airflow/internal_api/grpc/internal_api.proto b/airflow/internal_api/grpc/internal_api.proto new file mode 100644 index 0000000000000..e546b943283aa --- /dev/null +++ b/airflow/internal_api/grpc/internal_api.proto @@ -0,0 +1,72 @@ +syntax = "proto3"; + +package airflow.internal_api; + +message TaskInstanceKey { + string dag_id = 1; + string task_id = 2; + string run_id = 3; + int32 try_number = 4; + int32 map_index = 5; +} + +message SimpleTaskInstance { + string dag_id = 1; + string task_id = 2; + string run_id = 3; + optional float start_date = 4; + optional float end_date = 5; + int32 try_number = 6; + int32 map_index = 7; + string state = 8; + bytes executor_config = 9; + string pool = 10; + string queue = 11; + TaskInstanceKey key = 12; + optional string run_as_user = 13; + optional int32 priority_weight = 14; +} + +message TaskCallbackRequest { + string full_filepath = 1; + SimpleTaskInstance task_instance = 2; + optional bool is_failure_callback = 3; + optional string message = 4; +} + +message DagCallbackRequest { + string full_filepath = 1; + string dag_id = 2; + string run_id = 3; + optional bool is_failure_callback = 4; + optional string message = 5; +} + +message SlaCallbackRequest { + string full_filepath = 1; + string dag_id = 2; + optional string message = 3; +} + +message Callback { + oneof callback_type { + TaskCallbackRequest task_request = 1; + DagCallbackRequest dag_request = 2; + SlaCallbackRequest sla_request = 3; + } +} + +message FileProcessorRequest{ + string path = 1; + repeated Callback callbacks = 2; + bool pickle_dags = 3; +} + +message FileProcessorResponse { + int64 dagsFound = 1; + int64 errorsFound = 2; +} + +service FileProcessorService { + rpc processFile (FileProcessorRequest) returns (FileProcessorResponse) {} +} diff --git a/airflow/internal_api/grpc/internal_api_pb2.py b/airflow/internal_api/grpc/internal_api_pb2.py new file mode 100644 index 0000000000000..e06b1c7a88b5e --- /dev/null +++ b/airflow/internal_api/grpc/internal_api_pb2.py @@ -0,0 +1,107 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! 
+# source: airflow/internal_api/grpc/internal_api.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n,airflow/internal_api/grpc/internal_api.proto\x12\x14\x61irflow.internal_api\"i\n\x0fTaskInstanceKey\x12\x0e\n\x06\x64\x61g_id\x18\x01 \x01(\t\x12\x0f\n\x07task_id\x18\x02 \x01(\t\x12\x0e\n\x06run_id\x18\x03 \x01(\t\x12\x12\n\ntry_number\x18\x04 \x01(\x05\x12\x11\n\tmap_index\x18\x05 \x01(\x05\"\x8d\x03\n\x12SimpleTaskInstance\x12\x0e\n\x06\x64\x61g_id\x18\x01 \x01(\t\x12\x0f\n\x07task_id\x18\x02 \x01(\t\x12\x0e\n\x06run_id\x18\x03 \x01(\t\x12\x17\n\nstart_date\x18\x04 \x01(\x02H\x00\x88\x01\x01\x12\x15\n\x08\x65nd_date\x18\x05 \x01(\x02H\x01\x88\x01\x01\x12\x12\n\ntry_number\x18\x06 \x01(\x05\x12\x11\n\tmap_index\x18\x07 \x01(\x05\x12\r\n\x05state\x18\x08 \x01(\t\x12\x17\n\x0f\x65xecutor_config\x18\t \x01(\x0c\x12\x0c\n\x04pool\x18\n \x01(\t\x12\r\n\x05queue\x18\x0b \x01(\t\x12\x32\n\x03key\x18\x0c \x01(\x0b\x32%.airflow.internal_api.TaskInstanceKey\x12\x18\n\x0brun_as_user\x18\r \x01(\tH\x02\x88\x01\x01\x12\x1c\n\x0fpriority_weight\x18\x0e \x01(\x05H\x03\x88\x01\x01\x42\r\n\x0b_start_dateB\x0b\n\t_end_dateB\x0e\n\x0c_run_as_userB\x12\n\x10_priority_weight\"\xc9\x01\n\x13TaskCallbackRequest\x12\x15\n\rfull_filepath\x18\x01 \x01(\t\x12?\n\rtask_instance\x18\x02 \x01(\x0b\x32(.airflow.internal_api.SimpleTaskInstance\x12 \n\x13is_failure_callback\x18\x03 \x01(\x08H\x00\x88\x01\x01\x12\x14\n\x07message\x18\x04 \x01(\tH\x01\x88\x01\x01\x42\x16\n\x14_is_failure_callbackB\n\n\x08_message\"\xa7\x01\n\x12\x44\x61gCallbackRequest\x12\x15\n\rfull_filepath\x18\x01 \x01(\t\x12\x0e\n\x06\x64\x61g_id\x18\x02 \x01(\t\x12\x0e\n\x06run_id\x18\x03 \x01(\t\x12 \n\x13is_failure_callback\x18\x04 \x01(\x08H\x00\x88\x01\x01\x12\x14\n\x07message\x18\x05 \x01(\tH\x01\x88\x01\x01\x42\x16\n\x14_is_failure_callbackB\n\n\x08_message\"]\n\x12SlaCallbackRequest\x12\x15\n\rfull_filepath\x18\x01 \x01(\t\x12\x0e\n\x06\x64\x61g_id\x18\x02 \x01(\t\x12\x14\n\x07message\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\n\n\x08_message\"\xe0\x01\n\x08\x43\x61llback\x12\x41\n\x0ctask_request\x18\x01 \x01(\x0b\x32).airflow.internal_api.TaskCallbackRequestH\x00\x12?\n\x0b\x64\x61g_request\x18\x02 \x01(\x0b\x32(.airflow.internal_api.DagCallbackRequestH\x00\x12?\n\x0bsla_request\x18\x03 \x01(\x0b\x32(.airflow.internal_api.SlaCallbackRequestH\x00\x42\x0f\n\rcallback_type\"l\n\x14\x46ileProcessorRequest\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x31\n\tcallbacks\x18\x02 \x03(\x0b\x32\x1e.airflow.internal_api.Callback\x12\x13\n\x0bpickle_dags\x18\x03 \x01(\x08\"?\n\x15\x46ileProcessorResponse\x12\x11\n\tdagsFound\x18\x01 \x01(\x03\x12\x13\n\x0b\x65rrorsFound\x18\x02 \x01(\x03\x32\x80\x01\n\x14\x46ileProcessorService\x12h\n\x0bprocessFile\x12*.airflow.internal_api.FileProcessorRequest\x1a+.airflow.internal_api.FileProcessorResponse\"\x00\x62\x06proto3') + + + +_TASKINSTANCEKEY = DESCRIPTOR.message_types_by_name['TaskInstanceKey'] +_SIMPLETASKINSTANCE = DESCRIPTOR.message_types_by_name['SimpleTaskInstance'] +_TASKCALLBACKREQUEST = DESCRIPTOR.message_types_by_name['TaskCallbackRequest'] +_DAGCALLBACKREQUEST = 
DESCRIPTOR.message_types_by_name['DagCallbackRequest'] +_SLACALLBACKREQUEST = DESCRIPTOR.message_types_by_name['SlaCallbackRequest'] +_CALLBACK = DESCRIPTOR.message_types_by_name['Callback'] +_FILEPROCESSORREQUEST = DESCRIPTOR.message_types_by_name['FileProcessorRequest'] +_FILEPROCESSORRESPONSE = DESCRIPTOR.message_types_by_name['FileProcessorResponse'] +TaskInstanceKey = _reflection.GeneratedProtocolMessageType('TaskInstanceKey', (_message.Message,), { + 'DESCRIPTOR' : _TASKINSTANCEKEY, + '__module__' : 'airflow.internal_api.grpc.internal_api_pb2' + # @@protoc_insertion_point(class_scope:airflow.internal_api.TaskInstanceKey) + }) +_sym_db.RegisterMessage(TaskInstanceKey) + +SimpleTaskInstance = _reflection.GeneratedProtocolMessageType('SimpleTaskInstance', (_message.Message,), { + 'DESCRIPTOR' : _SIMPLETASKINSTANCE, + '__module__' : 'airflow.internal_api.grpc.internal_api_pb2' + # @@protoc_insertion_point(class_scope:airflow.internal_api.SimpleTaskInstance) + }) +_sym_db.RegisterMessage(SimpleTaskInstance) + +TaskCallbackRequest = _reflection.GeneratedProtocolMessageType('TaskCallbackRequest', (_message.Message,), { + 'DESCRIPTOR' : _TASKCALLBACKREQUEST, + '__module__' : 'airflow.internal_api.grpc.internal_api_pb2' + # @@protoc_insertion_point(class_scope:airflow.internal_api.TaskCallbackRequest) + }) +_sym_db.RegisterMessage(TaskCallbackRequest) + +DagCallbackRequest = _reflection.GeneratedProtocolMessageType('DagCallbackRequest', (_message.Message,), { + 'DESCRIPTOR' : _DAGCALLBACKREQUEST, + '__module__' : 'airflow.internal_api.grpc.internal_api_pb2' + # @@protoc_insertion_point(class_scope:airflow.internal_api.DagCallbackRequest) + }) +_sym_db.RegisterMessage(DagCallbackRequest) + +SlaCallbackRequest = _reflection.GeneratedProtocolMessageType('SlaCallbackRequest', (_message.Message,), { + 'DESCRIPTOR' : _SLACALLBACKREQUEST, + '__module__' : 'airflow.internal_api.grpc.internal_api_pb2' + # @@protoc_insertion_point(class_scope:airflow.internal_api.SlaCallbackRequest) + }) +_sym_db.RegisterMessage(SlaCallbackRequest) + +Callback = _reflection.GeneratedProtocolMessageType('Callback', (_message.Message,), { + 'DESCRIPTOR' : _CALLBACK, + '__module__' : 'airflow.internal_api.grpc.internal_api_pb2' + # @@protoc_insertion_point(class_scope:airflow.internal_api.Callback) + }) +_sym_db.RegisterMessage(Callback) + +FileProcessorRequest = _reflection.GeneratedProtocolMessageType('FileProcessorRequest', (_message.Message,), { + 'DESCRIPTOR' : _FILEPROCESSORREQUEST, + '__module__' : 'airflow.internal_api.grpc.internal_api_pb2' + # @@protoc_insertion_point(class_scope:airflow.internal_api.FileProcessorRequest) + }) +_sym_db.RegisterMessage(FileProcessorRequest) + +FileProcessorResponse = _reflection.GeneratedProtocolMessageType('FileProcessorResponse', (_message.Message,), { + 'DESCRIPTOR' : _FILEPROCESSORRESPONSE, + '__module__' : 'airflow.internal_api.grpc.internal_api_pb2' + # @@protoc_insertion_point(class_scope:airflow.internal_api.FileProcessorResponse) + }) +_sym_db.RegisterMessage(FileProcessorResponse) + +_FILEPROCESSORSERVICE = DESCRIPTOR.services_by_name['FileProcessorService'] +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _TASKINSTANCEKEY._serialized_start=70 + _TASKINSTANCEKEY._serialized_end=175 + _SIMPLETASKINSTANCE._serialized_start=178 + _SIMPLETASKINSTANCE._serialized_end=575 + _TASKCALLBACKREQUEST._serialized_start=578 + _TASKCALLBACKREQUEST._serialized_end=779 + _DAGCALLBACKREQUEST._serialized_start=782 + _DAGCALLBACKREQUEST._serialized_end=949 + 
_SLACALLBACKREQUEST._serialized_start=951 + _SLACALLBACKREQUEST._serialized_end=1044 + _CALLBACK._serialized_start=1047 + _CALLBACK._serialized_end=1271 + _FILEPROCESSORREQUEST._serialized_start=1273 + _FILEPROCESSORREQUEST._serialized_end=1381 + _FILEPROCESSORRESPONSE._serialized_start=1383 + _FILEPROCESSORRESPONSE._serialized_end=1446 + _FILEPROCESSORSERVICE._serialized_start=1449 + _FILEPROCESSORSERVICE._serialized_end=1577 +# @@protoc_insertion_point(module_scope) diff --git a/airflow/internal_api/grpc/internal_api_pb2.pyi b/airflow/internal_api/grpc/internal_api_pb2.pyi new file mode 100644 index 0000000000000..9d7f06727ebba --- /dev/null +++ b/airflow/internal_api/grpc/internal_api_pb2.pyi @@ -0,0 +1,222 @@ +""" +@generated by mypy-protobuf. Do not edit manually! +isort:skip_file +""" +import builtins +import google.protobuf.descriptor +import google.protobuf.internal.containers +import google.protobuf.message +import typing +import typing_extensions + +DESCRIPTOR: google.protobuf.descriptor.FileDescriptor + +class TaskInstanceKey(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + DAG_ID_FIELD_NUMBER: builtins.int + TASK_ID_FIELD_NUMBER: builtins.int + RUN_ID_FIELD_NUMBER: builtins.int + TRY_NUMBER_FIELD_NUMBER: builtins.int + MAP_INDEX_FIELD_NUMBER: builtins.int + dag_id: typing.Text + task_id: typing.Text + run_id: typing.Text + try_number: builtins.int + map_index: builtins.int + def __init__(self, + *, + dag_id: typing.Text = ..., + task_id: typing.Text = ..., + run_id: typing.Text = ..., + try_number: builtins.int = ..., + map_index: builtins.int = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["dag_id",b"dag_id","map_index",b"map_index","run_id",b"run_id","task_id",b"task_id","try_number",b"try_number"]) -> None: ... +global___TaskInstanceKey = TaskInstanceKey + +class SimpleTaskInstance(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + DAG_ID_FIELD_NUMBER: builtins.int + TASK_ID_FIELD_NUMBER: builtins.int + RUN_ID_FIELD_NUMBER: builtins.int + START_DATE_FIELD_NUMBER: builtins.int + END_DATE_FIELD_NUMBER: builtins.int + TRY_NUMBER_FIELD_NUMBER: builtins.int + MAP_INDEX_FIELD_NUMBER: builtins.int + STATE_FIELD_NUMBER: builtins.int + EXECUTOR_CONFIG_FIELD_NUMBER: builtins.int + POOL_FIELD_NUMBER: builtins.int + QUEUE_FIELD_NUMBER: builtins.int + KEY_FIELD_NUMBER: builtins.int + RUN_AS_USER_FIELD_NUMBER: builtins.int + PRIORITY_WEIGHT_FIELD_NUMBER: builtins.int + dag_id: typing.Text + task_id: typing.Text + run_id: typing.Text + start_date: builtins.float + end_date: builtins.float + try_number: builtins.int + map_index: builtins.int + state: typing.Text + executor_config: builtins.bytes + pool: typing.Text + queue: typing.Text + @property + def key(self) -> global___TaskInstanceKey: ... + run_as_user: typing.Text + priority_weight: builtins.int + def __init__(self, + *, + dag_id: typing.Text = ..., + task_id: typing.Text = ..., + run_id: typing.Text = ..., + start_date: typing.Optional[builtins.float] = ..., + end_date: typing.Optional[builtins.float] = ..., + try_number: builtins.int = ..., + map_index: builtins.int = ..., + state: typing.Text = ..., + executor_config: builtins.bytes = ..., + pool: typing.Text = ..., + queue: typing.Text = ..., + key: typing.Optional[global___TaskInstanceKey] = ..., + run_as_user: typing.Optional[typing.Text] = ..., + priority_weight: typing.Optional[builtins.int] = ..., + ) -> None: ... 
+ def HasField(self, field_name: typing_extensions.Literal["_end_date",b"_end_date","_priority_weight",b"_priority_weight","_run_as_user",b"_run_as_user","_start_date",b"_start_date","end_date",b"end_date","key",b"key","priority_weight",b"priority_weight","run_as_user",b"run_as_user","start_date",b"start_date"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["_end_date",b"_end_date","_priority_weight",b"_priority_weight","_run_as_user",b"_run_as_user","_start_date",b"_start_date","dag_id",b"dag_id","end_date",b"end_date","executor_config",b"executor_config","key",b"key","map_index",b"map_index","pool",b"pool","priority_weight",b"priority_weight","queue",b"queue","run_as_user",b"run_as_user","run_id",b"run_id","start_date",b"start_date","state",b"state","task_id",b"task_id","try_number",b"try_number"]) -> None: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_end_date",b"_end_date"]) -> typing.Optional[typing_extensions.Literal["end_date"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_priority_weight",b"_priority_weight"]) -> typing.Optional[typing_extensions.Literal["priority_weight"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_run_as_user",b"_run_as_user"]) -> typing.Optional[typing_extensions.Literal["run_as_user"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_start_date",b"_start_date"]) -> typing.Optional[typing_extensions.Literal["start_date"]]: ... +global___SimpleTaskInstance = SimpleTaskInstance + +class TaskCallbackRequest(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + FULL_FILEPATH_FIELD_NUMBER: builtins.int + TASK_INSTANCE_FIELD_NUMBER: builtins.int + IS_FAILURE_CALLBACK_FIELD_NUMBER: builtins.int + MESSAGE_FIELD_NUMBER: builtins.int + full_filepath: typing.Text + @property + def task_instance(self) -> global___SimpleTaskInstance: ... + is_failure_callback: builtins.bool + message: typing.Text + def __init__(self, + *, + full_filepath: typing.Text = ..., + task_instance: typing.Optional[global___SimpleTaskInstance] = ..., + is_failure_callback: typing.Optional[builtins.bool] = ..., + message: typing.Optional[typing.Text] = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["_is_failure_callback",b"_is_failure_callback","_message",b"_message","is_failure_callback",b"is_failure_callback","message",b"message","task_instance",b"task_instance"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["_is_failure_callback",b"_is_failure_callback","_message",b"_message","full_filepath",b"full_filepath","is_failure_callback",b"is_failure_callback","message",b"message","task_instance",b"task_instance"]) -> None: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_is_failure_callback",b"_is_failure_callback"]) -> typing.Optional[typing_extensions.Literal["is_failure_callback"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_message",b"_message"]) -> typing.Optional[typing_extensions.Literal["message"]]: ... 
+global___TaskCallbackRequest = TaskCallbackRequest + +class DagCallbackRequest(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + FULL_FILEPATH_FIELD_NUMBER: builtins.int + DAG_ID_FIELD_NUMBER: builtins.int + RUN_ID_FIELD_NUMBER: builtins.int + IS_FAILURE_CALLBACK_FIELD_NUMBER: builtins.int + MESSAGE_FIELD_NUMBER: builtins.int + full_filepath: typing.Text + dag_id: typing.Text + run_id: typing.Text + is_failure_callback: builtins.bool + message: typing.Text + def __init__(self, + *, + full_filepath: typing.Text = ..., + dag_id: typing.Text = ..., + run_id: typing.Text = ..., + is_failure_callback: typing.Optional[builtins.bool] = ..., + message: typing.Optional[typing.Text] = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["_is_failure_callback",b"_is_failure_callback","_message",b"_message","is_failure_callback",b"is_failure_callback","message",b"message"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["_is_failure_callback",b"_is_failure_callback","_message",b"_message","dag_id",b"dag_id","full_filepath",b"full_filepath","is_failure_callback",b"is_failure_callback","message",b"message","run_id",b"run_id"]) -> None: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_is_failure_callback",b"_is_failure_callback"]) -> typing.Optional[typing_extensions.Literal["is_failure_callback"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_message",b"_message"]) -> typing.Optional[typing_extensions.Literal["message"]]: ... +global___DagCallbackRequest = DagCallbackRequest + +class SlaCallbackRequest(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + FULL_FILEPATH_FIELD_NUMBER: builtins.int + DAG_ID_FIELD_NUMBER: builtins.int + MESSAGE_FIELD_NUMBER: builtins.int + full_filepath: typing.Text + dag_id: typing.Text + message: typing.Text + def __init__(self, + *, + full_filepath: typing.Text = ..., + dag_id: typing.Text = ..., + message: typing.Optional[typing.Text] = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["_message",b"_message","message",b"message"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["_message",b"_message","dag_id",b"dag_id","full_filepath",b"full_filepath","message",b"message"]) -> None: ... + def WhichOneof(self, oneof_group: typing_extensions.Literal["_message",b"_message"]) -> typing.Optional[typing_extensions.Literal["message"]]: ... +global___SlaCallbackRequest = SlaCallbackRequest + +class Callback(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + TASK_REQUEST_FIELD_NUMBER: builtins.int + DAG_REQUEST_FIELD_NUMBER: builtins.int + SLA_REQUEST_FIELD_NUMBER: builtins.int + @property + def task_request(self) -> global___TaskCallbackRequest: ... + @property + def dag_request(self) -> global___DagCallbackRequest: ... + @property + def sla_request(self) -> global___SlaCallbackRequest: ... + def __init__(self, + *, + task_request: typing.Optional[global___TaskCallbackRequest] = ..., + dag_request: typing.Optional[global___DagCallbackRequest] = ..., + sla_request: typing.Optional[global___SlaCallbackRequest] = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["callback_type",b"callback_type","dag_request",b"dag_request","sla_request",b"sla_request","task_request",b"task_request"]) -> builtins.bool: ... 
+ def ClearField(self, field_name: typing_extensions.Literal["callback_type",b"callback_type","dag_request",b"dag_request","sla_request",b"sla_request","task_request",b"task_request"]) -> None: ... + def WhichOneof(self, oneof_group: typing_extensions.Literal["callback_type",b"callback_type"]) -> typing.Optional[typing_extensions.Literal["task_request","dag_request","sla_request"]]: ... +global___Callback = Callback + +class FileProcessorRequest(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + PATH_FIELD_NUMBER: builtins.int + CALLBACKS_FIELD_NUMBER: builtins.int + PICKLE_DAGS_FIELD_NUMBER: builtins.int + path: typing.Text + @property + def callbacks(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___Callback]: ... + pickle_dags: builtins.bool + def __init__(self, + *, + path: typing.Text = ..., + callbacks: typing.Optional[typing.Iterable[global___Callback]] = ..., + pickle_dags: builtins.bool = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["callbacks",b"callbacks","path",b"path","pickle_dags",b"pickle_dags"]) -> None: ... +global___FileProcessorRequest = FileProcessorRequest + +class FileProcessorResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + DAGSFOUND_FIELD_NUMBER: builtins.int + ERRORSFOUND_FIELD_NUMBER: builtins.int + dagsFound: builtins.int + errorsFound: builtins.int + def __init__(self, + *, + dagsFound: builtins.int = ..., + errorsFound: builtins.int = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["dagsFound",b"dagsFound","errorsFound",b"errorsFound"]) -> None: ... +global___FileProcessorResponse = FileProcessorResponse diff --git a/airflow/internal_api/grpc/internal_api_pb2_grpc.py b/airflow/internal_api/grpc/internal_api_pb2_grpc.py new file mode 100644 index 0000000000000..323902a70274c --- /dev/null +++ b/airflow/internal_api/grpc/internal_api_pb2_grpc.py @@ -0,0 +1,66 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +from airflow.internal_api.grpc import internal_api_pb2 as airflow_dot_internal__api_dot_grpc_dot_internal__api__pb2 + + +class FileProcessorServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. 
+ """ + self.processFile = channel.unary_unary( + '/airflow.internal_api.FileProcessorService/processFile', + request_serializer=airflow_dot_internal__api_dot_grpc_dot_internal__api__pb2.FileProcessorRequest.SerializeToString, + response_deserializer=airflow_dot_internal__api_dot_grpc_dot_internal__api__pb2.FileProcessorResponse.FromString, + ) + + +class FileProcessorServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + def processFile(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_FileProcessorServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'processFile': grpc.unary_unary_rpc_method_handler( + servicer.processFile, + request_deserializer=airflow_dot_internal__api_dot_grpc_dot_internal__api__pb2.FileProcessorRequest.FromString, + response_serializer=airflow_dot_internal__api_dot_grpc_dot_internal__api__pb2.FileProcessorResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'airflow.internal_api.FileProcessorService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. +class FileProcessorService(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def processFile(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/airflow.internal_api.FileProcessorService/processFile', + airflow_dot_internal__api_dot_grpc_dot_internal__api__pb2.FileProcessorRequest.SerializeToString, + airflow_dot_internal__api_dot_grpc_dot_internal__api__pb2.FileProcessorResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/airflow/internal_api/server.py b/airflow/internal_api/server.py new file mode 100644 index 0000000000000..9871bac81fbb8 --- /dev/null +++ b/airflow/internal_api/server.py @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from airflow.callbacks.callback_requests import CallbackRequest +from airflow.dag_processing.processor import DagFileProcessor +from airflow.internal_api.grpc import internal_api_pb2_grpc +from airflow.internal_api.grpc.internal_api_pb2 import FileProcessorRequest, FileProcessorResponse + + +class FileProcessorServiceServicer(internal_api_pb2_grpc.FileProcessorServiceServicer): + """Provides methods that implement functionality of File Processor service.""" + + def __init__(self, processor: DagFileProcessor): + super().__init__() + self.dag_file_processor = processor + + def processFile(self, request: FileProcessorRequest, context): + dags_found, errors_found = self.dag_file_processor.process_file( + file_path=request.path, + callback_requests=CallbackRequest.get_callbacks_from_protobuf(request.callbacks), + pickle_dags=request.pickle_dags, + ) + return FileProcessorResponse(dagsFound=dags_found, errorsFound=errors_found) diff --git a/airflow/jobs/internal_api_job.py b/airflow/jobs/internal_api_job.py new file mode 100644 index 0000000000000..2c61100479f33 --- /dev/null +++ b/airflow/jobs/internal_api_job.py @@ -0,0 +1,91 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import os
+import signal
+import sys
+from concurrent import futures
+
+import grpc
+
+from airflow.dag_processing.processor import DagFileProcessor
+from airflow.internal_api.grpc.internal_api_pb2_grpc import add_FileProcessorServiceServicer_to_server
+from airflow.internal_api.server import FileProcessorServiceServicer
+from airflow.jobs.base_job import BaseJob
+
+
+class InternalAPIJob(BaseJob):
+    """InternalAPIJob exposes a gRPC API that runs database operations for remote callers."""
+
+    __mapper_args__ = {'polymorphic_identity': 'InternalApiJob'}
+
+    def __init__(self, *args, **kwargs):
+        # Call superclass
+        super().__init__(*args, **kwargs)
+
+        # The gRPC server is created lazily in serve()
+        self.server = None
+        self._shutting_down = False
+
+    def serve(self):
+        processor = DagFileProcessor(dag_ids=[], log=self.log)
+        self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+        add_FileProcessorServiceServicer_to_server(FileProcessorServiceServicer(processor), self.server)
+        self.server.add_insecure_port('[::]:50051')
+        self.server.start()
+        self.server.wait_for_termination()
+
+    def register_signals(self) -> None:
+        """Register signal handlers that stop the gRPC server"""
+        signal.signal(signal.SIGINT, self._exit_gracefully)
+        signal.signal(signal.SIGTERM, self._exit_gracefully)
+
+    def on_kill(self):
+        """
+        Called when there is an external kill command (via the heartbeat
+        mechanism, for example)
+        """
+        if self.server:
+            # Stop immediately, aborting in-flight RPCs
+            self.server.stop(grace=None)
+
+    def _exit_gracefully(self, signum, frame) -> None:
+        """Helper method to shut down the gRPC server to avoid leaving orphan processes."""
+        # The first time, try to exit nicely
+        if not self._shutting_down:
+            self._shutting_down = True
+            self.log.info("Exiting gracefully upon receiving signal %s", signum)
+            if self.server:
+                # Give in-flight RPCs a short grace period to finish
+                self.server.stop(grace=10)
+        else:
+            self.log.warning("Forcing exit due to second exit signal %s", signum)
+            sys.exit(os.EX_SOFTWARE)
+
+    def _execute(self) -> None:
+        self.log.info("Starting the API")
+        try:
+            # Serve GRPC Server
+            self.serve()
+        except KeyboardInterrupt:
+            self.log.info("GRPC server terminated")
+        except Exception:
+            self.log.exception("Exception when executing InternalAPIJob.execute")
+            raise
+        finally:
+            # serve() has returned (or failed to start), so the gRPC server is no
+            # longer running; a second signal during shutdown force-kills the
+            # process in _exit_gracefully.
+ self.log.info("Exited GRPC loop") + + +if __name__ == '__main__': + job = InternalAPIJob() + job.run() diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 929842fd0da4c..eb0d9a71a389e 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -51,7 +51,6 @@ from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.retries import MAX_DB_RETRIES, run_with_db_retries from airflow.utils.session import provide_session -from airflow.utils.timeout import timeout if TYPE_CHECKING: import pathlib @@ -337,14 +336,14 @@ def parse(mod_name, filepath): if dagbag_import_timeout <= 0: # no parsing timeout return parse(mod_name, filepath) - timeout_msg = ( + timeout_msg = ( # noqa f"DagBag import timeout for {filepath} after {dagbag_import_timeout}s.\n" "Please take a look at these docs to improve your DAG import time:\n" f"* {get_docs_url('best-practices.html#top-level-python-code')}\n" f"* {get_docs_url('best-practices.html#reducing-dag-complexity')}" ) - with timeout(dagbag_import_timeout, error_message=timeout_msg): - return parse(mod_name, filepath) + # with timeout(dagbag_import_timeout, error_message=timeout_msg): + return parse(mod_name, filepath) def _load_modules_from_zip(self, filepath, safe_mode): mods = [] diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index d83ad11b04ba5..4a7b10e1a20cf 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -22,9 +22,11 @@ import math import operator import os +import pickle import signal import warnings from collections import defaultdict +from copy import deepcopy from datetime import datetime, timedelta from functools import partial from types import TracebackType @@ -96,6 +98,7 @@ UnmappableXComTypePushed, XComForMappingNotPushed, ) +from airflow.internal_api.grpc import internal_api_pb2 from airflow.models.base import Base, StringID from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent from airflow.models.log import Log @@ -409,6 +412,25 @@ def key(self) -> "TaskInstanceKey": """ return self + def to_protobuf(self) -> internal_api_pb2.TaskInstanceKey: + return internal_api_pb2.TaskInstanceKey( + dag_id=self.dag_id, + task_id=self.task_id, + run_id=self.run_id, + try_number=self.try_number, + map_index=self.map_index, + ) + + @classmethod + def from_protobuf(cls, key: internal_api_pb2.TaskInstanceKey) -> "TaskInstanceKey": + return cls( + dag_id=key.dag_id, + task_id=key.task_id, + run_id=key.run_id, + try_number=key.try_number, + map_index=key.map_index, + ) + class TaskInstance(Base, LoggingMixin): """ @@ -2663,6 +2685,63 @@ def from_dict(cls, obj_dict: dict) -> "SimpleTaskInstance": end_date = timezone.parse(end_date_str) return cls(**obj_dict, start_date=start_date, end_date=end_date, key=ti_key) + def serialize_executor_config(self) -> bytes: + from airflow.serialization.serialized_objects import BaseSerialization + + if isinstance(self.executor_config, dict) and 'pod_override' in self.executor_config: + value = deepcopy(self.executor_config) + value['pod_override'] = BaseSerialization()._serialize(value['pod_override']) + return pickle.dumps(value) + return pickle.dumps(self.executor_config) + + @classmethod + def deserialize_executor_config(cls, value: bytes) -> Any: + from airflow.serialization.enums import Encoding + from airflow.serialization.serialized_objects import BaseSerialization + + deserialized_value = pickle.loads(value) + if isinstance(deserialized_value, dict) and "pod_override" in deserialized_value: + 
pod_override = deserialized_value['pod_override']
+            if isinstance(pod_override, dict) and pod_override.get(Encoding.TYPE):
+                deserialized_value['pod_override'] = BaseSerialization()._deserialize(pod_override)
+        return deserialized_value
+
+    @classmethod
+    def from_protobuf(cls, instance: internal_api_pb2.SimpleTaskInstance) -> "SimpleTaskInstance":
+        return cls(
+            dag_id=instance.dag_id,
+            task_id=instance.task_id,
+            run_id=instance.run_id,
+            start_date=datetime.fromtimestamp(instance.start_date) if instance.start_date else None,
+            end_date=datetime.fromtimestamp(instance.end_date) if instance.end_date else None,
+            try_number=instance.try_number,
+            map_index=instance.map_index,
+            state=instance.state,
+            executor_config=SimpleTaskInstance.deserialize_executor_config(instance.executor_config),
+            pool=instance.pool,
+            queue=instance.queue,
+            key=TaskInstanceKey.from_protobuf(key=instance.key),
+            run_as_user=instance.run_as_user,
+            priority_weight=instance.priority_weight,
+        )
+
+    def to_protobuf(self) -> internal_api_pb2.SimpleTaskInstance:
+        return internal_api_pb2.SimpleTaskInstance(
+            dag_id=self.dag_id,
+            task_id=self.task_id,
+            run_id=self.run_id,
+            start_date=self.start_date.timestamp() if self.start_date else None,
+            end_date=self.end_date.timestamp() if self.end_date else None,
+            try_number=self.try_number,
+            map_index=self.map_index,
+            state=self.state,
+            executor_config=self.serialize_executor_config(),
+            pool=self.pool,
+            queue=self.queue,
+            key=self.key.to_protobuf(),
+            run_as_user=self.run_as_user,
+            priority_weight=self.priority_weight,
+        )
+
 
 STATICA_HACK = True
 globals()['kcah_acitats'[::-1].upper()] = False
diff --git a/dev/breeze/src/airflow_breeze/pre_commit_ids.py b/dev/breeze/src/airflow_breeze/pre_commit_ids.py
index a0a0970cb73ab..ed98f3ac288f4 100644
--- a/dev/breeze/src/airflow_breeze/pre_commit_ids.py
+++ b/dev/breeze/src/airflow_breeze/pre_commit_ids.py
@@ -73,6 +73,7 @@
     'end-of-file-fixer',
     'fix-encoding-pragma',
     'flynt',
+    'grpc-proto-compile',
     'identity',
     'insert-license',
     'isort',
diff --git a/images/breeze/output-commands-hash.txt b/images/breeze/output-commands-hash.txt
index c9ac2d5a594b1..295ca8d11d6f2 100644
--- a/images/breeze/output-commands-hash.txt
+++ b/images/breeze/output-commands-hash.txt
@@ -29,7 +29,7 @@ self-upgrade:b5437c0a1a91533a11ee9d0a9692369c
 setup-autocomplete:355b72dee171c2fcba46fc90ac7c97b0
 shell:4680295fdd8a276d51518d29360c365c
 start-airflow:92cf775a952439a32d409cd2536da507
-static-checks:03e96245e60c225ed94514ad7b42ceb0
+static-checks:32c1c053bda3ce076a8403aa1561885e
 stop:8ebd8a42f1003495d37b884de5ac7ce6
 tests:e39111ecbd33a65ababb3cbfbac2b069
 verify-image:a6b3c70957aea96a5d4d261f23359a2d
diff --git a/images/breeze/output-static-checks.svg b/images/breeze/output-static-checks.svg
index 37c6deb229b3e..ec3284c5cfc06 100644
--- a/images/breeze/output-static-checks.svg
+++ b/images/breeze/output-static-checks.svg
@@ -19,245 +19,245 @@
     font-weight: 700;
     }
 
-    .terminal-880726558-matrix {
+    .terminal-3568752477-matrix {
         font-family: Fira Code, monospace;
         font-size: 20px;
         line-height: 24.4px;
         font-variant-east-asian: full-width;
     }
 
-    .terminal-880726558-title {
+    .terminal-3568752477-title {
         font-size: 18px;
         font-weight: bold;
         font-family: arial;
     }
 
-    .terminal-880726558-r1 { fill: #c5c8c6;font-weight: bold }
-.terminal-880726558-r2 { fill: #c5c8c6 }
-.terminal-880726558-r3 { fill: #d0b344;font-weight: bold }
-.terminal-880726558-r4 { fill: #868887 }
-.terminal-880726558-r5 { fill: #68a0b3;font-weight: bold }
-.terminal-880726558-r6 { fill: #98a84b;font-weight: bold }
diff --git a/images/breeze/output-static-checks.svg b/images/breeze/output-static-checks.svg
index 37c6deb229b3e..ec3284c5cfc06 100644
--- a/images/breeze/output-static-checks.svg
+++ b/images/breeze/output-static-checks.svg
@@ -19,245 +19,245 @@
[regenerated terminal screenshot: the embedded CSS class prefix changes from
"terminal-880726558" to "terminal-3568752477", and the rendered `breeze static-checks`
help output is re-rendered so that the list of available --type values now includes
grpc-proto-compile (between flynt and identity); the usage line, the remaining check
names and the flag/option descriptions are otherwise only re-wrapped.]
diff --git a/scripts/ci/pre_commit/pre_commit_grpc_compile.py b/scripts/ci/pre_commit/pre_commit_grpc_compile.py
new file mode 100755
index 0000000000000..27a82a7ecc570
--- /dev/null
+++ b/scripts/ci/pre_commit/pre_commit_grpc_compile.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import shutil
+import subprocess
+import sys
+
+if __name__ not in ("__main__", "__mp_main__"):
+    raise SystemExit(
+        "This file is intended to be executed as an executable program. You cannot use it as a module. "
+        f"To run this script, run the ./{__file__} command [FILE] ..."
+    )
+
+GRPC_TOOLS_PROTOC = "grpc_tools.protoc"
+PROTOC_GEN_MYPY_NAME = "protoc-gen-mypy"
+
+if __name__ == '__main__':
+    protoc_gen_mypy_path = shutil.which(PROTOC_GEN_MYPY_NAME)
+    python_path = sys.executable
+    command = [
+        python_path,
+        "-m",
+        GRPC_TOOLS_PROTOC,
+        f"--plugin={PROTOC_GEN_MYPY_NAME}={protoc_gen_mypy_path}",
+        "--python_out",
+        ".",
+        "--grpc_python_out",
+        ".",
+        "--mypy_out",
+        ".",
+        "--proto_path",
+        ".",
+        *sys.argv[1:],
+    ]
+    subprocess.check_call(command)
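For context (not part of the diff): a minimal usage sketch of the new hook, assuming grpcio-tools
and mypy-protobuf are installed in the current environment so that grpc_tools.protoc and the
protoc-gen-mypy plugin are available; the proto path below is illustrative, not taken from this
change (pre-commit passes the matching .proto files automatically).

# Illustrative only: run the new script directly on one proto file from the repository root.
import subprocess

proto_file = "airflow/internal_api/grpc/internal_api.proto"  # hypothetical input path
subprocess.check_call(["./scripts/ci/pre_commit/pre_commit_grpc_compile.py", proto_file])

# With the standard grpc_tools / mypy-protobuf naming, the generated modules land next to the
# .proto source:
#   .../internal_api_pb2.py       - protobuf message classes (from --python_out)
#   .../internal_api_pb2.pyi      - type stubs (from --mypy_out via protoc-gen-mypy)
#   .../internal_api_pb2_grpc.py  - gRPC client/server stubs (from --grpc_python_out)

Because these modules are regenerated rather than hand-edited, they are treated as generated code
and are not meant to be reformatted or linted by the other hooks.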