From e4ae222f9581112e1d20742d44ff1a989e36224d Mon Sep 17 00:00:00 2001
From: Olivier Le Thanh Duong
Date: Wed, 8 Jan 2025 10:19:56 +0100
Subject: [PATCH 1/3] Fix VM logs websocket endpoint getting stuck

Jira ticket: ALEPH-316

At some point the websocket endpoint stopped sending log entries to the
client.

Original report from Arnaud:
    "aleph instance logs" output gets stuck.
    VM started and is accessible, but logs seem frozen when querying
    them via the CLI.

Analysis:
The log entries are in journald and retrievable, but they don't get put
on the internal queue. My guess is that there is a locking bug between
the journal polling and the queue max length: if the queue fills up and
then gets emptied, the polling doesn't reset unless a new message
arrives in journald.

Solution:
Rework the journald-to-Queue implementation.

Note: I have tried many combinations and variations of the logic and
rewrote that code block multiple times; this version works as far as I
can tell. The interaction between asyncio's select handling, the
journald fd reset and async tasks is complicated and confusing. Other
versions broke in some way or other, sometimes subtly. For example:
- Past logs working but new log entries not getting consumed
- Queue (re)filling starting only when a new entry was created
- Infinite tasks always popping up
- Multiple tasks consuming the same fd
- Polling never resetting and the consumer being called in a loop

Some of these issues get hidden by the TCP buffer on the websocket and
only pop up after some time, depending on the network or on how the
client interacts. So beware if you try to rewrite that logic. Also DO
NOT TRUST ChatGPT or Deepseek on this; they will produce nice-looking
code that does not work properly.
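To illustrate, here is a minimal sketch of the pattern that ended up
working (the names here are illustrative only, not the actual module
code; see the diff below for the real implementation):

```python
import asyncio

from systemd import journal


async def pump(reader: journal.Reader, queue: asyncio.Queue, loop: asyncio.AbstractEventLoop) -> None:
    # Stop watching the fd while draining, so this coroutine is not re-entered
    loop.remove_reader(reader.fileno())
    for entry in reader:
        # Blocks when the queue is full: this backpressure is what
        # wedged the old callback-based implementation
        await queue.put(entry["MESSAGE"])
    reader.process()  # reset the fd readiness state
    # Re-arm the watcher only after the backlog is fully drained
    loop.add_reader(reader.fileno(), lambda: loop.create_task(pump(reader, queue, loop)))
```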
To test:
Start an instance or a program on your dev CRN. Fetch the logs using
`aleph instance logs` with the --domain option.

For testing you can insert additional log entries using the systemd-cat
command:
```bash
echo 1 $(date) | systemd-cat -t vm-63faf8b5db1cf8d965e6a464a0cb8062af8e7df131729e48738342d956f29ace-stdout
```
or
```bash
for ((i=1; i<=400; i++)); do
    echo $i
    echo $i | systemd-cat -t vm-63faf8b5db1cf8d965e6a464a0cb8062af8e7df131729e48738342d956f29ace-stdout
    sleep 1
done
```

The logs on the server side can be checked using:
```
journalctl -t vm-63faf8b5db1cf8d965e6a464a0cb8062af8e7df131729e48738342d956f29ace-stderr -t vm-63faf8b5db1cf8d965e6a464a0cb8062af8e7df131729e48738342d956f29ace-stdout -f
```

For reproducing or debugging issues, adding an asyncio.sleep(0.2)
inside the websocket handling code usually helps: put it in the
innermost loop of the `stream_logs` method in
src/aleph/vm/orchestrator/views/operator.py.
---
 src/aleph/vm/orchestrator/views/operator.py |  1 +
 src/aleph/vm/utils/logs.py                  | 29 ++++++++++++++-------
 2 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/src/aleph/vm/orchestrator/views/operator.py b/src/aleph/vm/orchestrator/views/operator.py
index fc7436d70..b808e94fe 100644
--- a/src/aleph/vm/orchestrator/views/operator.py
+++ b/src/aleph/vm/orchestrator/views/operator.py
@@ -93,6 +93,7 @@ async def stream_logs(request: web.Request) -> web.StreamResponse:
             logger.debug(message)
 
             await ws.send_json({"type": log_type, "message": message})
+            queue.task_done()
 
     finally:
         await ws.close()

diff --git a/src/aleph/vm/utils/logs.py b/src/aleph/vm/utils/logs.py
index 1bf0dc449..005016040 100644
--- a/src/aleph/vm/utils/logs.py
+++ b/src/aleph/vm/utils/logs.py
@@ -1,8 +1,8 @@
 import asyncio
 import logging
 from collections.abc import Callable, Generator
-from datetime import datetime
-from typing import TypedDict
+from datetime import datetime, timedelta
+from typing import List, TypedDict
 
 from systemd import journal
 
@@ -35,27 +35,38 @@ def make_logs_queue(stdout_identifier, stderr_identifier, skip_past=False) -> tu
     r = journal.Reader()
     r.add_match(SYSLOG_IDENTIFIER=stdout_identifier)
     r.add_match(SYSLOG_IDENTIFIER=stderr_identifier)
-    queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
+    queue: asyncio.Queue = asyncio.Queue(maxsize=5)
+    tasks: List[asyncio.Future] = []
 
-    def _ready_for_read() -> None:
-        change_type = r.process()  # reset fd status
-        if change_type != journal.APPEND:
-            return
+    async def process_messages() -> None:
+        loop.remove_reader(r.fileno())
         entry: EntryDict
         for entry in r:
             log_type = "stdout" if entry["SYSLOG_IDENTIFIER"] == stdout_identifier else "stderr"
             msg = entry["MESSAGE"]
-            asyncio.create_task(queue.put((log_type, msg)))
+            await queue.put((log_type, msg))
+        r.process()  # reset fd status
+        r.process()  # reset fd status
+        loop.add_reader(r.fileno(), _ready_for_read)
+
+    def _ready_for_read() -> None:
+        task = loop.create_task(process_messages(), name=f"process_messages-queue-{id(queue)}")
+        tasks.append(task)
+        task.add_done_callback(tasks.remove)
 
     if skip_past:
-        r.seek_tail()
+        # seek_tail doesn't work, see https://github.com/systemd/systemd/issues/17662
+        r.seek_realtime(datetime.now() - timedelta(seconds=10))
 
     loop = asyncio.get_event_loop()
     loop.add_reader(r.fileno(), _ready_for_read)
+    r.process()
 
     def do_cancel():
         logger.info(f"cancelling reader {r}")
         loop.remove_reader(r.fileno())
+        for task in tasks:
+            task.cancel()
         r.close()
 
     return queue, do_cancel
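For context, the matching consumer side: `stream_logs` now acknowledges
every entry with `queue.task_done()` after it has been sent over the
websocket. A simplified sketch of that loop (not the full handler; the
commented-out sleep is the reproduction trick mentioned in the commit
message above):

```python
import asyncio

from aiohttp import web


async def consume(queue: asyncio.Queue, ws: web.WebSocketResponse) -> None:
    while True:
        log_type, message = await queue.get()
        # await asyncio.sleep(0.2)  # uncomment to reproduce the stall
        await ws.send_json({"type": log_type, "message": message})
        # Acknowledge only after the entry has been sent
        queue.task_done()
```

Acknowledging only after `send_json` means a slow websocket client
exerts backpressure through the bounded queue all the way back to the
journald reader.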
From 61642de49a5f9fbfa7f013a499936e6bbc5b7d62 Mon Sep 17 00:00:00 2001
From: Olivier Le Thanh Duong
Date: Thu, 9 Jan 2025 09:48:33 +0100
Subject: [PATCH 2/3] Rework make_logs_queue to make it more readable

---
 src/aleph/vm/utils/logs.py | 39 +++++++++++++++++++++-----------------
 1 file changed, 22 insertions(+), 17 deletions(-)

diff --git a/src/aleph/vm/utils/logs.py b/src/aleph/vm/utils/logs.py
index 005016040..868aad7a3 100644
--- a/src/aleph/vm/utils/logs.py
+++ b/src/aleph/vm/utils/logs.py
@@ -2,7 +2,7 @@
 import logging
 from collections.abc import Callable, Generator
 from datetime import datetime, timedelta
-from typing import List, TypedDict
+from typing import TypedDict
 
 from systemd import journal
 
@@ -32,42 +32,47 @@ def make_logs_queue(stdout_identifier, stderr_identifier, skip_past=False) -> tu
     For more information refer to the sd-journal(3) manpage
     and systemd.journal module documentation.
     """
-    r = journal.Reader()
-    r.add_match(SYSLOG_IDENTIFIER=stdout_identifier)
-    r.add_match(SYSLOG_IDENTIFIER=stderr_identifier)
+    journal_reader = journal.Reader()
+    journal_reader.add_match(SYSLOG_IDENTIFIER=stdout_identifier)
+    journal_reader.add_match(SYSLOG_IDENTIFIER=stderr_identifier)
     queue: asyncio.Queue = asyncio.Queue(maxsize=5)
-    tasks: List[asyncio.Future] = []
+    tasks: list[asyncio.Task] = []
+
+    loop = asyncio.get_event_loop()
 
     async def process_messages() -> None:
-        loop.remove_reader(r.fileno())
+        """Enqueue all the available log entries, wait if the queue is full, then wait for new messages via add_reader"""
+        # Remove the reader so we don't get called again while processing
+        loop.remove_reader(journal_reader.fileno())
         entry: EntryDict
-        for entry in r:
+        for entry in journal_reader:
             log_type = "stdout" if entry["SYSLOG_IDENTIFIER"] == stdout_identifier else "stderr"
             msg = entry["MESSAGE"]
+            # Will wait if the queue is full
             await queue.put((log_type, msg))
-        r.process()  # reset fd status
-        r.process()  # reset fd status
-        loop.add_reader(r.fileno(), _ready_for_read)
+        journal_reader.process()  # reset fd status
+        journal_reader.process()  # reset fd status
+        # Call _ready_for_read when entries are readable again; this is non-blocking
+        loop.add_reader(journal_reader.fileno(), _ready_for_read)
 
     def _ready_for_read() -> None:
+        # Wrapper around process_messages, since add_reader doesn't take an async function
         task = loop.create_task(process_messages(), name=f"process_messages-queue-{id(queue)}")
         tasks.append(task)
         task.add_done_callback(tasks.remove)
 
     if skip_past:
         # seek_tail doesn't work, see https://github.com/systemd/systemd/issues/17662
-        r.seek_realtime(datetime.now() - timedelta(seconds=10))
+        journal_reader.seek_realtime(datetime.now() - timedelta(seconds=10))
 
-    loop = asyncio.get_event_loop()
-    loop.add_reader(r.fileno(), _ready_for_read)
-    r.process()
+    _ready_for_read()
 
     def do_cancel():
-        logger.info(f"cancelling reader {r}")
-        loop.remove_reader(r.fileno())
+        logger.info(f"cancelling queue and reader {journal_reader}")
+        loop.remove_reader(journal_reader.fileno())
         for task in tasks:
             task.cancel()
-        r.close()
+        journal_reader.close()
 
     return queue, do_cancel

From 7a50834f2f147f87826ac1e6e8b313947bdfecfa Mon Sep 17 00:00:00 2001
From: Olivier Le Thanh Duong
Date: Tue, 14 Jan 2025 16:01:44 +0100
Subject: [PATCH 3/3] Test test_make_logs_queue

---
 tests/supervisor/test_log.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
 create mode 100644 tests/supervisor/test_log.py

diff --git a/tests/supervisor/test_log.py b/tests/supervisor/test_log.py
new file mode 100644
index 000000000..23f27aaaa
--- /dev/null
+++ b/tests/supervisor/test_log.py
@@ -0,0 +1,15 @@
+from asyncio import QueueEmpty
+
+import pytest
+
+from aleph.vm.utils.logs import make_logs_queue
+
+
+def test_make_logs_queue():
+    stdout_identifier = "test_stdout"
+    stderr_identifier = "test_stderr"
+    queue, do_cancel = make_logs_queue(stdout_identifier, stderr_identifier)
+    with pytest.raises(QueueEmpty):
+        while queue.get_nowait():
+            queue.task_done()
+    do_cancel()