Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- #534: Converted integration API test to pytest - test_test_env_reuse.py
- #534: Converted integration API test to pytest - test_populate_data.py
- #534: Converted integration API test to pytest - test_hash_symlink_loops.py
- #534: Converted integration API test to pytest - test_termination_handler.py

## features
- #517: Added docker-db 2025-1-3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ def get_queue_content(q: Queue) -> list[str]:


class TestTerminationHandler(unittest.TestCase):
"""
Deprecated. Replaced by ./test/unit/test_termination_handler.py
"""

def test_success(self):
q = Queue()
Expand Down
163 changes: 60 additions & 103 deletions test/unit/test_termination_handler.py
Original file line number Diff line number Diff line change
@@ -1,113 +1,70 @@
import dataclasses
import io
import multiprocessing as mp
import re
import time
from contextlib import (
ExitStack,
redirect_stderr,
redirect_stdout,
)
from test.matchers import regex_matcher
import sys
from importlib import reload
from multiprocessing import Queue

import pytest
import exasol_integration_test_docker_environment.cli.termination_handler

from exasol_integration_test_docker_environment.cli.termination_handler import (
TerminationHandler,
)
from exasol_integration_test_docker_environment.lib.base.dependency_logger_base_task import (
DependencyLoggerBaseTask,
)
from exasol_integration_test_docker_environment.lib.base.run_task import (
generate_root_task,
run_task,
)

class StdoutQueue:
def __init__(self, wrapped_queue: Queue):
self.queue = wrapped_queue
self.old_stdout = sys.stdout

class CompositeFailingTask(DependencyLoggerBaseTask):
def write(self, msg):
self.queue.put(msg)

def register_required(self):
self.dependencies = self.register_dependencies(
[self.create_child_task(FailingTask1), self.create_child_task(FailingTask2)]
)
return self.dependencies
def flush(self):
self.old_stdout.flush()

def run_task(self):
pass


class FailingTask1(DependencyLoggerBaseTask):

def run_task(self):
# The sleep is needed to garantuee that both FailingTasks are started before one fails,
# because otherwise the other one won't be started
time.sleep(0.1)
raise RuntimeError(f"Error in {self.__class__.__name__} occurred.")


class FailingTask2(DependencyLoggerBaseTask):

def run_task(self):
# The sleep is needed to garantuee that both FailingTasks are started before one fails,
# because otherwise the other one won't be started
time.sleep(0.1)
raise RuntimeError(f"Error in {self.__class__.__name__} occurred.")


@dataclasses.dataclass
class Capture:
err: str
out: str


@pytest.fixture(scope="module")
def capture_output_of_test() -> Capture:
out = io.StringIO()
err = io.StringIO()
with ExitStack() as stack:
# We can't use capsys, because this is a function scope fixture,
# but we want to execute the task only once, due to the sleep
stack.enter_context(redirect_stdout(out))
stack.enter_context(redirect_stderr(err))
stack.enter_context(pytest.raises(SystemExit))
stack.enter_context(TerminationHandler())

def task_creator():
return generate_root_task(task_class=CompositeFailingTask)

run_task(task_creator=task_creator, workers=3)
return Capture(out=out.getvalue(), err=err.getvalue())


def test_command_runtime(capture_output_of_test):
assert capture_output_of_test.err == regex_matcher(
"The command failed after .* s with:"
)


def test_composite_task_failed(capture_output_of_test):
assert capture_output_of_test.err == regex_matcher(
r".*Task failure message: Task CompositeFailingTask.* \(or any of it's subtasks\) failed\.",
re.DOTALL,
)


def test_sub_tasks_failed(capture_output_of_test):
assert capture_output_of_test.err == regex_matcher(
r".*Following task failures were caught during the execution:", re.DOTALL
)


def test_sub_task1_error(capture_output_of_test):
assert capture_output_of_test.err == regex_matcher(
r".*RuntimeError: Error in FailingTask1 occurred.", re.DOTALL
)


def test_sub_task2_error(capture_output_of_test):
assert capture_output_of_test.err == regex_matcher(
r".*RuntimeError: Error in FailingTask2 occurred.", re.DOTALL
)
def run_positive(queue: Queue) -> None:
stdout_queue = StdoutQueue(queue)
sys.stdout = stdout_queue
sys.stderr = stdout_queue
# we need to release the termination module here, otherwise the new sys.stdout/sys.stderr might not be set correctly
m = reload(exasol_integration_test_docker_environment.cli.termination_handler)
with m.TerminationHandler():
pass


def test_out_empty(capture_output_of_test):
assert capture_output_of_test.out == ""
def run_with_unknown_error(queue: Queue) -> None:
stdout_queue = StdoutQueue(queue)
sys.stdout = stdout_queue
sys.stderr = stdout_queue
# we need to release the termination module here, otherwise the new sys.stdout/sys.stderr might not be set correctly
m = reload(exasol_integration_test_docker_environment.cli.termination_handler)
with m.TerminationHandler():
raise RuntimeError("unknown error")


def get_queue_content(q: Queue) -> list[str]:
result = []
while not q.empty():
result.append(q.get(block=False))
return result


def test_success():
q = Queue()
p = mp.Process(target=run_positive, args=(q,))
p.start()
p.join()
res = get_queue_content(q)
assert any(
re.match(r"^The command took .+ s$", line) for line in res
), f"Result {res} doesn't contain 'The command took'"
assert p.exitcode == 0


def test_unknown_error():
q = Queue()
p = mp.Process(target=run_with_unknown_error, args=(q,))
p.start()
p.join()
res = get_queue_content(q)
assert any(re.match(r"^The command failed after .+ s with:$", line) for line in res)
assert any("Caught exception:unknown error" == line for line in res)
assert any('raise RuntimeError("unknown error")' in line for line in res)
assert p.exitcode == 1
Loading