22 changes: 16 additions & 6 deletions BREEZE.rst
@@ -554,18 +554,28 @@ command takes care about it. This is needed when you want to run webserver insid
Breeze cleanup
--------------

Breeze uses docker images heavily and those images are rebuild periodically. This might cause extra
disk usage by the images. If you need to clean-up the images periodically you can run
``breeze setup cleanup`` command (by default it will skip removing your images before cleaning up but you
can also remove the images to clean-up everything by adding ``--all``).
Sometimes you need to clean up your docker environment (and it is recommended that you do it regularly). There
are several reasons why you might want to do that.

Those are all available flags of ``cleanup`` command:
Breeze uses docker images heavily. Those images are rebuilt periodically and might leave dangling, unused
images in the docker cache, which causes extra disk usage. Also, running various docker compose commands
(for example running tests with ``breeze testing tests``) might create additional docker networks that can
prevent new networks from being created. Those networks are not removed automatically by docker-compose.
Finally, Breeze uses its own cache to keep information about all images.

All those unused images, networks and cache can be removed by running the ``breeze cleanup`` command. By default
it will not remove the most recent images that you might need to run breeze commands, but you
can also remove those breeze images to clean up everything by adding the ``--all`` flag (note that you will
then need to build the images again from scratch - pulling from the registry might take a while).

Breeze will ask you to confirm each step, unless you specify the ``--answer yes`` flag.
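
For example, to remove everything, including the Breeze images, without being asked for confirmation, you
could run::

    breeze cleanup --all --answer yes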

These are all the available flags of the ``cleanup`` command:

.. image:: ./images/breeze/output_cleanup.svg
:target: https://raw.githubusercontent.com/apache/airflow/main/images/breeze/output_cleanup.svg
:width: 100%
:alt: Breeze setup cleanup
:alt: Breeze cleanup

Running arbitrary commands in container
---------------------------------------
9 changes: 7 additions & 2 deletions dev/breeze/src/airflow_breeze/commands/main_command.py
@@ -46,6 +46,7 @@
)
from airflow_breeze.utils.confirm import Answer, user_confirm
from airflow_breeze.utils.console import get_console
from airflow_breeze.utils.docker_command_utils import remove_docker_networks
from airflow_breeze.utils.path_utils import BUILD_CACHE_DIR
from airflow_breeze.utils.run_utils import run_command
from airflow_breeze.utils.shared_options import get_dry_run
@@ -249,10 +250,14 @@ def cleanup(all: bool):
sys.exit(0)
else:
get_console().print("[info]No locally downloaded images to remove[/]\n")
get_console().print("Removing unused networks")
given_answer = user_confirm("Are you sure you want to remove unused docker networks?")
if given_answer == Answer.YES:
remove_docker_networks()
get_console().print("Pruning docker images")
given_answer = user_confirm("Are you sure with the removal?")
given_answer = user_confirm("Are you sure you want to prune docker images?")
if given_answer == Answer.YES:
system_prune_command_to_execute = ["docker", "system", "prune"]
# -f skips docker's own confirmation prompt - the user has already confirmed above
system_prune_command_to_execute = ["docker", "system", "prune", "-f"]
run_command(
system_prune_command_to_execute,
check=False,
68 changes: 61 additions & 7 deletions dev/breeze/src/airflow_breeze/commands/testing_commands.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import json
import os
import re
import sys
@@ -25,7 +26,11 @@
from click import IntRange

from airflow_breeze.commands.ci_image_commands import rebuild_or_pull_ci_image_if_needed
from airflow_breeze.global_constants import ALLOWED_TEST_TYPE_CHOICES, all_selective_test_types
from airflow_breeze.global_constants import (
ALLOWED_TEST_TYPE_CHOICES,
PROVIDER_PACKAGE_JSON_FILE,
all_selective_test_types,
)
from airflow_breeze.params.build_prod_params import BuildProdParams
from airflow_breeze.params.shell_params import ShellParams
from airflow_breeze.utils.ci_group import ci_group
@@ -56,6 +61,7 @@
DOCKER_COMPOSE_COMMAND,
get_env_variables_for_docker_commands,
perform_environment_checks,
remove_docker_networks,
)
from airflow_breeze.utils.parallel import (
GenericRegexpProgressMatcher,
@@ -129,7 +135,6 @@ def _run_test(
env_variables["TEST_TIMEOUT"] = str(test_timeout)
if db_reset:
env_variables["DB_RESET"] = "true"
perform_environment_checks()
env_variables["TEST_TYPE"] = exec_shell_params.test_type
env_variables["SKIP_PROVIDER_TESTS"] = str(exec_shell_params.skip_provider_tests).lower()
if "[" in exec_shell_params.test_type and not exec_shell_params.test_type.startswith("Providers"):
@@ -158,6 +163,7 @@ def _run_test(
]
run_cmd.extend(list(extra_pytest_args))
try:
remove_docker_networks(networks=[f"airflow-test-{project_name}_default"])
result = run_command(
run_cmd,
env=env_variables,
@@ -198,6 +204,7 @@ def _run_test(
check=False,
verbose_override=False,
)
remove_docker_networks(networks=[f"airflow-test-{project_name}_default"])
return result.returncode, f"Test: {exec_shell_params.test_type}"


@@ -217,8 +224,9 @@ def _run_tests_in_pool(
debug_resources: bool,
skip_cleanup: bool,
):
with ci_group(f"Testing {' '.join(tests_to_run)}"):
all_params = [f"Test {test_type}" for test_type in tests_to_run]
escaped_tests = [test.replace("[", "\\[") for test in tests_to_run]
with ci_group(f"Testing {' '.join(escaped_tests)}"):
all_params = [f"{test_type}" for test_type in tests_to_run]
with run_with_pool(
parallelism=parallelism,
all_params=all_params,
@@ -242,9 +250,10 @@
)
for index, test_type in enumerate(tests_to_run)
]
escaped_tests = [test.replace("[", "\\[") for test in tests_to_run]
check_async_run_results(
results=results,
success=f"Tests {' '.join(tests_to_run)} completed successfully",
success=f"Tests {' '.join(escaped_tests)} completed successfully",
outputs=outputs,
include_success_outputs=include_success_outputs,
skip_cleanup=skip_cleanup,
@@ -270,7 +279,7 @@ def run_tests_in_parallel(
memory_available = psutil.virtual_memory()
if memory_available.available < LOW_MEMORY_CONDITION and exec_shell_params.backend in ["mssql", "mysql"]:
# Run heavy tests sequentially
heavy_test_types_to_run = {"Core", "Providers"} & set(test_types_list)
heavy_test_types_to_run = {"Core"} & set(test_types_list)
Member Author commented:

I think this will make Providers tests safe to run on Public runners - each of them will require much less memory, and because each "provider" test type runs in its own docker container that is torn down immediately when the test type completes, we will continuously reclaim resources from each test type while we are running the tests on Public runners.

if heavy_test_types_to_run:
# some of those are requested
get_console().print(
@@ -405,10 +414,18 @@ def tests(
)
rebuild_or_pull_ci_image_if_needed(command_params=exec_shell_params)
cleanup_python_generated_files()
perform_environment_checks()
split_provider_test = False
if run_in_parallel:
if parallelism > 4:
get_console().print(
"The parallelism is bigger than 4, we want to split "
"providers into separate test types to gain on parallelism"
)
split_provider_test = True
run_tests_in_parallel(
exec_shell_params=exec_shell_params,
test_types_list=test_types.split(" "),
test_types_list=generate_list_of_tests(test_types, split_provider_tests=split_provider_test),
extra_pytest_args=extra_pytest_args,
db_reset=db_reset,
# Allow to pass information on whether to use full tests in the parallel execution mode
@@ -432,6 +449,42 @@ def tests(
sys.exit(returncode)


def generate_list_of_tests(test_types: str, split_provider_tests: bool) -> list[str]:
"""
Generates the list of tests to run based on the test types and whether we want to split provider tests.
:param test_types: test types in the form of a space-separated string
:param split_provider_tests: whether to split provider tests into separate test types per provider. For a small
number of CPUs we do not want to split provider tests, as the overhead of initializing multiple docker
compose/DB instances is too high compared to the gain from running multiple providers in parallel.
:return: list of test types to run
"""
split_test_types = test_types.split(" ")
if not split_provider_tests:
final_list = split_test_types
else:
final_list = []
for test_type in split_test_types:
if test_type.startswith("Providers"):
if test_type == "Providers" or test_types == "Providers[all]":
final_list.extend(
[
f"Providers[{key}]"
for key in json.loads(PROVIDER_PACKAGE_JSON_FILE.read_text()).keys()
]
)
else:
provider_keys = test_type.replace("Providers[", "").replace("]", "").split(",")
final_list.extend([f"Providers[{provider}]" for provider in provider_keys])
else:
final_list.append(test_type)
# Make "Providers", "Providers[amazon]" "WWW tests first as they are the longest test types to run and
# should start early in the process. This is to make sure that last tests to run are small. In case
# last tests to run are long then the parallelism is not fully utilized because the long last test
# will only use single CPU while it is running alone.
final_list.sort(key=lambda x: x in ["Providers", "Providers[amazon]", "WWW"], reverse=True)
return final_list
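
For illustration, here is roughly how this helper behaves for a couple of hypothetical calls (the test
types used here are just examples):

# With splitting enabled, a multi-provider test type is expanded into one test type per provider,
# and the long-running types ("Providers[amazon]", "WWW") are moved to the front of the list:
generate_list_of_tests("Core Providers[amazon,google] WWW", split_provider_tests=True)
# -> ["Providers[amazon]", "WWW", "Core", "Providers[google]"]

# Without splitting, the provider selection is kept as a single test type and only reordered:
generate_list_of_tests("Core Providers[amazon,google] WWW", split_provider_tests=False)
# -> ["WWW", "Core", "Providers[amazon,google]"]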


@testing.command(
name="integration-tests",
help="Run the specified integratio tests.",
@@ -495,6 +548,7 @@ def integration_tests(
skip_provider_tests=skip_provider_tests,
)
cleanup_python_generated_files()
perform_environment_checks()
returncode, _ = _run_test(
exec_shell_params=exec_shell_params,
extra_pytest_args=extra_pytest_args,
4 changes: 4 additions & 0 deletions dev/breeze/src/airflow_breeze/utils/console.py
@@ -81,6 +81,10 @@ class Output(NamedTuple):
def file(self) -> TextIO:
return open(self.file_name, "a+t")

@property
def escaped_title(self) -> str:
return self.title.replace("[", "\\[")
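
A small hypothetical illustration of why this escaping is needed: the rich console treats "[...]" as
markup, so a title such as "Providers[amazon]" has to be escaped before it is embedded in a markup string:

out = Output(title="Providers[amazon]", file_name="/tmp/example.log")  # hypothetical values
# Without escaping, rich would try to interpret "[amazon]" as console markup instead of printing it:
get_console().print(f"[info]Capturing output of {out.escaped_title}:[/] {out.file_name}")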


@lru_cache(maxsize=None)
def get_console(output: Output | None = None) -> Console:
25 changes: 24 additions & 1 deletion dev/breeze/src/airflow_breeze/utils/docker_command_utils.py
@@ -22,7 +22,7 @@
import sys
from copy import deepcopy
from random import randint
from subprocess import CalledProcessError, CompletedProcess
from subprocess import DEVNULL, CalledProcessError, CompletedProcess

from airflow_breeze.params.build_ci_params import BuildCiParams
from airflow_breeze.params.build_prod_params import BuildProdParams
@@ -769,3 +769,26 @@ def fix_ownership_using_docker():
"/opt/airflow/scripts/in_container/run_fix_ownership.sh",
]
run_command(cmd, text=True, env=env, check=False)


def remove_docker_networks(networks: list[str] | None = None) -> None:
"""
Removes specified docker networks. If no networks are specified, it removes all unused networks.
Errors are ignored (not even printed in the output), so you can safely call it without checking
if the networks exist.

:param networks: list of networks to remove
"""
if networks is None:
run_command(
["docker", "network", "prune", "-f"],
check=False,
stderr=DEVNULL,
)
else:
for network in networks:
run_command(
["docker", "network", "rm", network],
check=False,
stderr=DEVNULL,
)
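
A short note on how this helper is used in this change: "breeze cleanup" calls it without arguments to
prune all unused networks, while the test commands remove only the network of their own docker-compose
project, for example:

# Prune every unused docker network (as done by the cleanup command):
remove_docker_networks()
# Remove only the network created for a single docker-compose test project; errors are silently ignored
# if the network does not exist (project_name is provided by the calling test command):
remove_docker_networks(networks=[f"airflow-test-{project_name}_default"])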
13 changes: 7 additions & 6 deletions dev/breeze/src/airflow_breeze/utils/parallel.py
@@ -68,7 +68,7 @@ def get_temp_file_name() -> str:
def get_output_files(titles: list[str]) -> list[Output]:
outputs = [Output(title=titles[i], file_name=get_temp_file_name()) for i in range(len(titles))]
for out in outputs:
get_console().print(f"[info]Capturing output of {out.title}:[/] {out.file_name}")
get_console().print(f"[info]Capturing output of {out.escaped_title}:[/] {out.file_name}")
return outputs


@@ -292,7 +292,7 @@ def print_single_progress(self, output: Output):
else:
current_line = progress_lines[index]
if current_line:
prefix = f"Progress: {output.title:<30}"
prefix = f"> {output.title[:40]:<38}"
if not first_line:
# remove job prefix after first line
prefix = " " * len(prefix)
@@ -301,7 +301,7 @@
else:
size = os.path.getsize(output.file_name) if Path(output.file_name).exists() else 0
default_output = f"File: {output.file_name} Size: {size:>10} bytes"
get_console().print(f"Progress: {output.title[:30]:<30} {default_output:>161}")
get_console().print(f"> {output.escaped_title[:40]:<38} {default_output:>161}")

def print_summary(self):
import psutil
@@ -341,6 +341,7 @@ def print_async_summary(completed_list: list[ApplyResult]) -> None:
get_console().print()
for result in completed_list:
return_code, info = result.get()
info = info.replace("[", "\\[")
if return_code != 0:
get_console().print(f"[error]NOK[/] for {info}: Return code: {return_code}.")
else:
@@ -414,10 +415,10 @@ def check_async_run_results(
else:
message_type = MessageType.SUCCESS
if message_type == MessageType.ERROR or include_success_outputs:
with ci_group(title=f"{outputs[i].title}", message_type=message_type):
with ci_group(title=f"{outputs[i].escaped_title}", message_type=message_type):
os.write(1, Path(outputs[i].file_name).read_bytes())
else:
get_console().print(f"[success]{outputs[i].title}")
get_console().print(f"[success]{outputs[i].escaped_title} OK[/]")
if summarize_on_ci != SummarizeAfter.NO_SUMMARY:
regex = re.compile(summary_start_regexp) if summary_start_regexp is not None else None
for i, result in enumerate(results):
@@ -430,7 +431,7 @@
for line in Path(outputs[i].file_name).read_bytes().decode(errors="ignore").splitlines():
if not print_lines and (regex is None or regex.match(remove_ansi_colours(line))):
print_lines = True
get_console().print(f"\n[info]Summary: {outputs[i].title:<30}:\n")
get_console().print(f"\n[info]Summary: {outputs[i].escaped_title:<30}:\n")
if print_lines:
print(line)
try: