Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- #534: Converted integration API test to pytest - test_hash_temp_dir.py
- #534: Converted integration API test to pytest - test_hash_temp_dir_with_files.py
- #534: Converted integration API test to pytest - test_test_env_reuse.py
- #534: Converted integration API test to pytest - test_termination_handler.py

## features
- #517: Added docker-db 2025-1-3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ def get_queue_content(q: Queue) -> list[str]:


class TestTerminationHandler(unittest.TestCase):
"""
Deprecated. Replaced by ./test/unit/test_termination_handler.py
"""

def test_success(self):
q = Queue()
Expand Down
163 changes: 60 additions & 103 deletions test/unit/test_termination_handler.py
Original file line number Diff line number Diff line change
@@ -1,113 +1,70 @@
import dataclasses
import io
import multiprocessing as mp
import re
import time
from contextlib import (
ExitStack,
redirect_stderr,
redirect_stdout,
)
from test.matchers import regex_matcher
import sys
from importlib import reload
from multiprocessing import Queue

import pytest
import exasol_integration_test_docker_environment.cli.termination_handler

from exasol_integration_test_docker_environment.cli.termination_handler import (
TerminationHandler,
)
from exasol_integration_test_docker_environment.lib.base.dependency_logger_base_task import (
DependencyLoggerBaseTask,
)
from exasol_integration_test_docker_environment.lib.base.run_task import (
generate_root_task,
run_task,
)

class StdoutQueue:
def __init__(self, wrapped_queue: Queue):
self.queue = wrapped_queue
self.old_stdout = sys.stdout

class CompositeFailingTask(DependencyLoggerBaseTask):
def write(self, msg):
self.queue.put(msg)

def register_required(self):
self.dependencies = self.register_dependencies(
[self.create_child_task(FailingTask1), self.create_child_task(FailingTask2)]
)
return self.dependencies
def flush(self):
self.old_stdout.flush()

def run_task(self):
pass


class FailingTask1(DependencyLoggerBaseTask):

def run_task(self):
# The sleep is needed to garantuee that both FailingTasks are started before one fails,
# because otherwise the other one won't be started
time.sleep(0.1)
raise RuntimeError(f"Error in {self.__class__.__name__} occurred.")


class FailingTask2(DependencyLoggerBaseTask):

def run_task(self):
# The sleep is needed to garantuee that both FailingTasks are started before one fails,
# because otherwise the other one won't be started
time.sleep(0.1)
raise RuntimeError(f"Error in {self.__class__.__name__} occurred.")


@dataclasses.dataclass
class Capture:
err: str
out: str


@pytest.fixture(scope="module")
def capture_output_of_test() -> Capture:
out = io.StringIO()
err = io.StringIO()
with ExitStack() as stack:
# We can't use capsys, because this is a function scope fixture,
# but we want to execute the task only once, due to the sleep
stack.enter_context(redirect_stdout(out))
stack.enter_context(redirect_stderr(err))
stack.enter_context(pytest.raises(SystemExit))
stack.enter_context(TerminationHandler())

def task_creator():
return generate_root_task(task_class=CompositeFailingTask)

run_task(task_creator=task_creator, workers=3)
return Capture(out=out.getvalue(), err=err.getvalue())


def test_command_runtime(capture_output_of_test):
assert capture_output_of_test.err == regex_matcher(
"The command failed after .* s with:"
)


def test_composite_task_failed(capture_output_of_test):
assert capture_output_of_test.err == regex_matcher(
r".*Task failure message: Task CompositeFailingTask.* \(or any of it's subtasks\) failed\.",
re.DOTALL,
)


def test_sub_tasks_failed(capture_output_of_test):
assert capture_output_of_test.err == regex_matcher(
r".*Following task failures were caught during the execution:", re.DOTALL
)


def test_sub_task1_error(capture_output_of_test):
assert capture_output_of_test.err == regex_matcher(
r".*RuntimeError: Error in FailingTask1 occurred.", re.DOTALL
)


def test_sub_task2_error(capture_output_of_test):
assert capture_output_of_test.err == regex_matcher(
r".*RuntimeError: Error in FailingTask2 occurred.", re.DOTALL
)
def run_positive(queue: Queue) -> None:
stdout_queue = StdoutQueue(queue)
sys.stdout = stdout_queue
sys.stderr = stdout_queue
# we need to release the termination module here, otherwise the new sys.stdout/sys.stderr might not be set correctly
m = reload(exasol_integration_test_docker_environment.cli.termination_handler)
with m.TerminationHandler():
pass


def test_out_empty(capture_output_of_test):
assert capture_output_of_test.out == ""
def run_with_unknown_error(queue: Queue) -> None:
stdout_queue = StdoutQueue(queue)
sys.stdout = stdout_queue
sys.stderr = stdout_queue
# we need to release the termination module here, otherwise the new sys.stdout/sys.stderr might not be set correctly
m = reload(exasol_integration_test_docker_environment.cli.termination_handler)
with m.TerminationHandler():
raise RuntimeError("unknown error")


def get_queue_content(q: Queue) -> list[str]:
result = []
while not q.empty():
result.append(q.get(block=False))
return result


def test_success():
q = Queue()
p = mp.Process(target=run_positive, args=(q,))
p.start()
p.join()
res = get_queue_content(q)
assert any(
re.match(r"^The command took .+ s$", line) for line in res
), f"Result {res} doesn't contain 'The command took'"
assert p.exitcode == 0


def test_unknown_error():
q = Queue()
p = mp.Process(target=run_with_unknown_error, args=(q,))
p.start()
p.join()
res = get_queue_content(q)
assert any(re.match(r"^The command failed after .+ s with:$", line) for line in res)
assert any("Caught exception:unknown error" == line for line in res)
assert any('raise RuntimeError("unknown error")' in line for line in res)
assert p.exitcode == 1