diff --git a/BREEZE.rst b/BREEZE.rst
index 7356774fa143a..b5e41f5b6f973 100644
--- a/BREEZE.rst
+++ b/BREEZE.rst
@@ -554,18 +554,28 @@ command takes care about it. This is needed when you want to run webserver insid
 
 Breeze cleanup
 --------------
 
-Breeze uses docker images heavily and those images are rebuild periodically. This might cause extra
-disk usage by the images. If you need to clean-up the images periodically you can run
-``breeze setup cleanup`` command (by default it will skip removing your images before cleaning up but you
-can also remove the images to clean-up everything by adding ``--all``).
+Sometimes you need to clean up your docker environment (and it is recommended you do that regularly). There
+are several reasons why you might want to do that.
 
-Those are all available flags of ``cleanup`` command:
+Breeze uses docker images heavily and those images are rebuilt periodically and might leave dangling, unused
+images in the docker cache. This might cause extra disk usage. Also, running various docker compose commands
+(for example running tests with ``breeze testing tests``) might create additional docker networks that might
+prevent new networks from being created. Those networks are not removed automatically by docker-compose.
+Also, Breeze uses its own cache to keep information about all images.
+
+All those unused images, networks and cache can be removed by running the ``breeze cleanup`` command. By default
+it will not remove the most recent images that you might need to run breeze commands, but you
+can also remove those breeze images to clean up everything by adding the ``--all`` flag (note that you will
+need to build the images again from scratch - pulling from the registry might take a while).
+Breeze will ask you to confirm each step, unless you specify the ``--answer yes`` flag.
+
+Those are all available flags of ``cleanup`` command:
 
 .. image:: ./images/breeze/output_cleanup.svg
   :target: https://raw.githubusercontent.com/apache/airflow/main/images/breeze/output_cleanup.svg
   :width: 100%
-  :alt: Breeze setup cleanup
+  :alt: Breeze cleanup
 
 Running arbitrary commands in container
 ---------------------------------------
diff --git a/dev/breeze/src/airflow_breeze/commands/main_command.py b/dev/breeze/src/airflow_breeze/commands/main_command.py
index 60a75982443c0..d82582fbd5ff5 100644
--- a/dev/breeze/src/airflow_breeze/commands/main_command.py
+++ b/dev/breeze/src/airflow_breeze/commands/main_command.py
@@ -46,6 +46,7 @@
 )
 from airflow_breeze.utils.confirm import Answer, user_confirm
 from airflow_breeze.utils.console import get_console
+from airflow_breeze.utils.docker_command_utils import remove_docker_networks
 from airflow_breeze.utils.path_utils import BUILD_CACHE_DIR
 from airflow_breeze.utils.run_utils import run_command
 from airflow_breeze.utils.shared_options import get_dry_run
@@ -249,10 +250,14 @@ def cleanup(all: bool):
                 sys.exit(0)
         else:
             get_console().print("[info]No locally downloaded images to remove[/]\n")
+    get_console().print("Removing unused networks")
+    given_answer = user_confirm("Are you sure with the removal of unused docker networks?")
+    if given_answer == Answer.YES:
+        remove_docker_networks()
     get_console().print("Pruning docker images")
-    given_answer = user_confirm("Are you sure with the removal?")
+    given_answer = user_confirm("Are you sure with the removal of docker images?")
     if given_answer == Answer.YES:
-        system_prune_command_to_execute = ["docker", "system", "prune"]
+        system_prune_command_to_execute = ["docker", "system", "prune", "-f"]
         run_command(
             system_prune_command_to_execute,
             check=False,
diff --git a/dev/breeze/src/airflow_breeze/commands/testing_commands.py b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
index a0033b9075e1c..82d9bbbb4ed75 100644
--- a/dev/breeze/src/airflow_breeze/commands/testing_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import json
 import os
 import re
 import sys
@@ -25,7 +26,11 @@
 from click import IntRange
 
 from airflow_breeze.commands.ci_image_commands import rebuild_or_pull_ci_image_if_needed
-from airflow_breeze.global_constants import ALLOWED_TEST_TYPE_CHOICES, all_selective_test_types
+from airflow_breeze.global_constants import (
+    ALLOWED_TEST_TYPE_CHOICES,
+    PROVIDER_PACKAGE_JSON_FILE,
+    all_selective_test_types,
+)
 from airflow_breeze.params.build_prod_params import BuildProdParams
 from airflow_breeze.params.shell_params import ShellParams
 from airflow_breeze.utils.ci_group import ci_group
@@ -56,6 +61,7 @@
     DOCKER_COMPOSE_COMMAND,
     get_env_variables_for_docker_commands,
     perform_environment_checks,
+    remove_docker_networks,
 )
 from airflow_breeze.utils.parallel import (
     GenericRegexpProgressMatcher,
@@ -129,7 +135,6 @@ def _run_test(
         env_variables["TEST_TIMEOUT"] = str(test_timeout)
     if db_reset:
         env_variables["DB_RESET"] = "true"
-    perform_environment_checks()
     env_variables["TEST_TYPE"] = exec_shell_params.test_type
     env_variables["SKIP_PROVIDER_TESTS"] = str(exec_shell_params.skip_provider_tests).lower()
     if "[" in exec_shell_params.test_type and not exec_shell_params.test_type.startswith("Providers"):
@@ -158,6 +163,7 @@ def _run_test(
     ]
     run_cmd.extend(list(extra_pytest_args))
     try:
+        remove_docker_networks(networks=[f"airflow-test-{project_name}_default"])
         result = run_command(
             run_cmd,
             env=env_variables,
@@ -198,6 +204,7 @@ def _run_test(
             check=False,
             verbose_override=False,
         )
+        remove_docker_networks(networks=[f"airflow-test-{project_name}_default"])
     return result.returncode, f"Test: {exec_shell_params.test_type}"
 
 
@@ -217,8 +224,9 @@ def _run_tests_in_pool(
     debug_resources: bool,
     skip_cleanup: bool,
 ):
-    with ci_group(f"Testing {' '.join(tests_to_run)}"):
-        all_params = [f"Test {test_type}" for test_type in tests_to_run]
+    escaped_tests = [test.replace("[", "\\[") for test in tests_to_run]
+    with ci_group(f"Testing {' '.join(escaped_tests)}"):
+        all_params = [f"{test_type}" for test_type in tests_to_run]
         with run_with_pool(
             parallelism=parallelism,
             all_params=all_params,
@@ -242,9 +250,10 @@ def _run_tests_in_pool(
                 )
                 for index, test_type in enumerate(tests_to_run)
             ]
+    escaped_tests = [test.replace("[", "\\[") for test in tests_to_run]
     check_async_run_results(
         results=results,
-        success=f"Tests {' '.join(tests_to_run)} completed successfully",
+        success=f"Tests {' '.join(escaped_tests)} completed successfully",
         outputs=outputs,
         include_success_outputs=include_success_outputs,
         skip_cleanup=skip_cleanup,
@@ -270,7 +279,7 @@ def run_tests_in_parallel(
     memory_available = psutil.virtual_memory()
     if memory_available.available < LOW_MEMORY_CONDITION and exec_shell_params.backend in ["mssql", "mysql"]:
         # Run heavy tests sequentially
-        heavy_test_types_to_run = {"Core", "Providers"} & set(test_types_list)
+        heavy_test_types_to_run = {"Core"} & set(test_types_list)
         if heavy_test_types_to_run:
             # some of those are requested
             get_console().print(
@@ -405,10 +414,18 @@ def tests(
     )
     rebuild_or_pull_ci_image_if_needed(command_params=exec_shell_params)
     cleanup_python_generated_files()
+    perform_environment_checks()
+    split_provider_test = False
     if run_in_parallel:
+        if parallelism > 4:
+            get_console().print(
+                "The parallelism is bigger than 4: splitting providers "
+                "into separate test types to make better use of parallelism"
+            )
+            split_provider_test = True
         run_tests_in_parallel(
             exec_shell_params=exec_shell_params,
-            test_types_list=test_types.split(" "),
+            test_types_list=generate_list_of_tests(test_types, split_provider_tests=split_provider_test),
             extra_pytest_args=extra_pytest_args,
             db_reset=db_reset,
             # Allow to pass information on whether to use full tests in the parallel execution mode
@@ -432,6 +449,42 @@ def tests(
     sys.exit(returncode)
 
 
+def generate_list_of_tests(test_types: str, split_provider_tests: bool) -> list[str]:
+    """
+    Generates the list of test types to run, based on the requested test types and whether to split providers.
+    :param test_types: test types as a space-separated string
+    :param split_provider_tests: whether to split provider tests into separate test types per provider. For a
+        small number of CPUs we do not want to split provider tests, as the overhead of initializing multiple
+        docker compose/DB instances is too high vs. the gain from running multiple providers in parallel.
+    :return: list of test types to run
+    """
+    split_test_types = test_types.split(" ")
+    if not split_provider_tests:
+        final_list = split_test_types
+    else:
+        final_list = []
+        for test_type in split_test_types:
+            if test_type.startswith("Providers"):
+                if test_type == "Providers" or test_type == "Providers[all]":
+                    final_list.extend(
+                        [
+                            f"Providers[{key}]"
+                            for key in json.loads(PROVIDER_PACKAGE_JSON_FILE.read_text()).keys()
+                        ]
+                    )
+                else:
+                    provider_keys = test_type.replace("Providers[", "").replace("]", "").split(",")
+                    final_list.extend([f"Providers[{provider}]" for provider in provider_keys])
+            else:
+                final_list.append(test_type)
+    # Make "Providers", "Providers[amazon]" and "WWW" tests first as they are the longest test types to run
+    # and should start early in the process. This makes sure that the last tests to run are small. If the
+    # last tests to run are long, the parallelism is not fully utilized, because the long last test
+    # will only use a single CPU while it is running alone.
+    final_list.sort(key=lambda x: x in ["Providers", "Providers[amazon]", "WWW"], reverse=True)
+    return final_list
+
+
 @testing.command(
     name="integration-tests",
     help="Run the specified integratio tests.",
@@ -495,6 +548,7 @@ def integration_tests(
         skip_provider_tests=skip_provider_tests,
     )
     cleanup_python_generated_files()
+    perform_environment_checks()
     returncode, _ = _run_test(
         exec_shell_params=exec_shell_params,
         extra_pytest_args=extra_pytest_args,
diff --git a/dev/breeze/src/airflow_breeze/utils/console.py b/dev/breeze/src/airflow_breeze/utils/console.py
index cf357e48b8b60..c1a14c86f5240 100644
--- a/dev/breeze/src/airflow_breeze/utils/console.py
+++ b/dev/breeze/src/airflow_breeze/utils/console.py
@@ -81,6 +81,10 @@ class Output(NamedTuple):
     def file(self) -> TextIO:
         return open(self.file_name, "a+t")
 
+    @property
+    def escaped_title(self) -> str:
+        return self.title.replace("[", "\\[")
+
 
 @lru_cache(maxsize=None)
 def get_console(output: Output | None = None) -> Console:
diff --git a/dev/breeze/src/airflow_breeze/utils/docker_command_utils.py b/dev/breeze/src/airflow_breeze/utils/docker_command_utils.py
index 98d341ce19c9e..e5699abb6b26d 100644
--- a/dev/breeze/src/airflow_breeze/utils/docker_command_utils.py
+++ b/dev/breeze/src/airflow_breeze/utils/docker_command_utils.py
@@ -22,7 +22,7 @@
 import sys
 from copy import deepcopy
 from random import randint
-from subprocess import CalledProcessError, CompletedProcess
+from subprocess import DEVNULL, CalledProcessError, CompletedProcess
 
 from airflow_breeze.params.build_ci_params import BuildCiParams
 from airflow_breeze.params.build_prod_params import BuildProdParams
@@ -769,3 +769,26 @@ def fix_ownership_using_docker():
         "/opt/airflow/scripts/in_container/run_fix_ownership.sh",
     ]
     run_command(cmd, text=True, env=env, check=False)
+
+
+def remove_docker_networks(networks: list[str] | None = None) -> None:
+    """
+    Removes specified docker networks. If no networks are specified, it removes all unused networks.
+    Errors are ignored (not even printed in the output), so you can safely call it without checking
+    if the networks exist.
+
+    :param networks: list of networks to remove
+    """
+    if networks is None:
+        run_command(
+            ["docker", "network", "prune", "-f"],
+            check=False,
+            stderr=DEVNULL,
+        )
+    else:
+        for network in networks:
+            run_command(
+                ["docker", "network", "rm", network],
+                check=False,
+                stderr=DEVNULL,
+            )
diff --git a/dev/breeze/src/airflow_breeze/utils/parallel.py b/dev/breeze/src/airflow_breeze/utils/parallel.py
index d8e22eb003f69..2c94e2c70f82c 100644
--- a/dev/breeze/src/airflow_breeze/utils/parallel.py
+++ b/dev/breeze/src/airflow_breeze/utils/parallel.py
@@ -68,7 +68,7 @@ def get_temp_file_name() -> str:
 def get_output_files(titles: list[str]) -> list[Output]:
     outputs = [Output(title=titles[i], file_name=get_temp_file_name()) for i in range(len(titles))]
     for out in outputs:
-        get_console().print(f"[info]Capturing output of {out.title}:[/] {out.file_name}")
+        get_console().print(f"[info]Capturing output of {out.escaped_title}:[/] {out.file_name}")
     return outputs
 
 
@@ -292,7 +292,7 @@ def print_single_progress(self, output: Output):
             else:
                 current_line = progress_lines[index]
             if current_line:
-                prefix = f"Progress: {output.title:<30}"
+                prefix = f"> {output.title[:40]:<38}"
                 if not first_line:
                     # remove job prefix after first line
                     prefix = " " * len(prefix)
@@ -301,7 +301,7 @@ def print_single_progress(self, output: Output):
         else:
             size = os.path.getsize(output.file_name) if Path(output.file_name).exists() else 0
             default_output = f"File: {output.file_name} Size: {size:>10} bytes"
-            get_console().print(f"Progress: {output.title[:30]:<30} {default_output:>161}")
+            get_console().print(f"> {output.escaped_title[:40]:<38} {default_output:>161}")
 
     def print_summary(self):
         import psutil
@@ -341,6 +341,7 @@ def print_async_summary(completed_list: list[ApplyResult]) -> None:
     get_console().print()
     for result in completed_list:
         return_code, info = result.get()
+        info = info.replace("[", "\\[")
         if return_code != 0:
             get_console().print(f"[error]NOK[/] for {info}: Return code: {return_code}.")
         else:
@@ -414,10 +415,10 @@ def check_async_run_results(
         else:
             message_type = MessageType.SUCCESS
         if message_type == MessageType.ERROR or include_success_outputs:
-            with ci_group(title=f"{outputs[i].title}", message_type=message_type):
+            with ci_group(title=f"{outputs[i].escaped_title}", message_type=message_type):
                 os.write(1, Path(outputs[i].file_name).read_bytes())
         else:
-            get_console().print(f"[success]{outputs[i].title}")
+            get_console().print(f"[success]{outputs[i].escaped_title} OK[/]")
     if summarize_on_ci != SummarizeAfter.NO_SUMMARY:
         regex = re.compile(summary_start_regexp) if summary_start_regexp is not None else None
         for i, result in enumerate(results):
@@ -430,7 +431,7 @@ def check_async_run_results(
             for line in Path(outputs[i].file_name).read_bytes().decode(errors="ignore").splitlines():
                 if not print_lines and (regex is None or regex.match(remove_ansi_colours(line))):
                     print_lines = True
-                    get_console().print(f"\n[info]Summary: {outputs[i].title:<30}:\n")
+                    get_console().print(f"\n[info]Summary: {outputs[i].escaped_title:<30}:\n")
                 if print_lines:
                     print(line)
     try:
diff --git a/tests/providers/smtp/hooks/smtp.py b/tests/providers/smtp/hooks/test_smtp.py
similarity index 100%
rename from tests/providers/smtp/hooks/smtp.py
rename to tests/providers/smtp/hooks/test_smtp.py
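For reference, below is a minimal standalone sketch of the test-type expansion and ordering that ``generate_list_of_tests`` in the patch above performs. The helper name ``expand_and_order`` and the provider list are hypothetical and exist only for illustration; the real code reads the available providers from ``PROVIDER_PACKAGE_JSON_FILE``.

```python
from __future__ import annotations

# Hypothetical, simplified illustration of the logic in generate_list_of_tests:
# expand "Providers"-style test types into per-provider test types and move the
# longest-running test types to the front. The provider list is made up for this
# sketch; Breeze reads the real one from PROVIDER_PACKAGE_JSON_FILE.
HYPOTHETICAL_PROVIDERS = ["amazon", "google", "http", "sqlite"]
LONGEST_RUNNING_FIRST = ["Providers", "Providers[amazon]", "WWW"]


def expand_and_order(test_types: str) -> list[str]:
    final_list: list[str] = []
    for test_type in test_types.split(" "):
        if test_type in ("Providers", "Providers[all]"):
            # "Providers" runs every provider, so it expands to one test type per provider
            final_list.extend(f"Providers[{key}]" for key in HYPOTHETICAL_PROVIDERS)
        elif test_type.startswith("Providers["):
            # "Providers[a,b]" expands to "Providers[a]" and "Providers[b]"
            keys = test_type.replace("Providers[", "").replace("]", "").split(",")
            final_list.extend(f"Providers[{key}]" for key in keys)
        else:
            final_list.append(test_type)
    # Stable sort on a boolean key: long-running test types move to the front,
    # everything else keeps its original order, so short jobs finish last.
    final_list.sort(key=lambda x: x in LONGEST_RUNNING_FIRST, reverse=True)
    return final_list


if __name__ == "__main__":
    print(expand_and_order("WWW Providers Core"))
    # ['WWW', 'Providers[amazon]', 'Providers[google]', 'Providers[http]', 'Providers[sqlite]', 'Core']
```

The stable sort with a boolean key is what front-loads the long-running test types without otherwise reordering the requested list, so the parallel run does not end with one long job occupying a single CPU while the rest sit idle.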