Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/aleph/vm/orchestrator/views/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ async def stream_logs(request: web.Request) -> web.StreamResponse:
logger.debug(message)

await ws.send_json({"type": log_type, "message": message})
queue.task_done()

finally:
await ws.close()
Expand Down
50 changes: 33 additions & 17 deletions src/aleph/vm/utils/logs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import logging
from collections.abc import Callable, Generator
from datetime import datetime
from datetime import datetime, timedelta
from typing import TypedDict

from systemd import journal
Expand Down Expand Up @@ -32,31 +32,47 @@
For more information refer to the sd-journal(3) manpage
and systemd.journal module documentation.
"""
r = journal.Reader()
r.add_match(SYSLOG_IDENTIFIER=stdout_identifier)
r.add_match(SYSLOG_IDENTIFIER=stderr_identifier)
queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
journal_reader = journal.Reader()
journal_reader.add_match(SYSLOG_IDENTIFIER=stdout_identifier)
journal_reader.add_match(SYSLOG_IDENTIFIER=stderr_identifier)
queue: asyncio.Queue = asyncio.Queue(maxsize=5)
tasks: list[asyncio.Task] = []

def _ready_for_read() -> None:
change_type = r.process() # reset fd status
if change_type != journal.APPEND:
return
loop = asyncio.get_event_loop()

async def process_messages() -> None:
"""Enqueue all the available log entries, wait if queue is full, then wait for new message via add_reader"""
# Remove reader so we don't get called again while processing
loop.remove_reader(journal_reader.fileno())

Check warning on line 46 in src/aleph/vm/utils/logs.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/utils/logs.py#L46

Added line #L46 was not covered by tests
entry: EntryDict
for entry in r:
for entry in journal_reader:
log_type = "stdout" if entry["SYSLOG_IDENTIFIER"] == stdout_identifier else "stderr"
msg = entry["MESSAGE"]
asyncio.create_task(queue.put((log_type, msg)))
# will wait if queue is full
await queue.put((log_type, msg))
journal_reader.process() # reset fd status
journal_reader.process() # reset fd status

Check warning on line 54 in src/aleph/vm/utils/logs.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/utils/logs.py#L52-L54

Added lines #L52 - L54 were not covered by tests
# Call _ready_for_read read when entries are readable again, this is non-blocking
loop.add_reader(journal_reader.fileno(), _ready_for_read)

Check warning on line 56 in src/aleph/vm/utils/logs.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/utils/logs.py#L56

Added line #L56 was not covered by tests

def _ready_for_read() -> None:
# wrapper around process_messages as add_reader don't take an async func
task = loop.create_task(process_messages(), name=f"process_messages-queue-{id(queue)}")
tasks.append(task)
task.add_done_callback(tasks.remove)

if skip_past:
r.seek_tail()
# seek_tail doesn't work see https://github.com/systemd/systemd/issues/17662
journal_reader.seek_realtime(datetime.now() - timedelta(seconds=10))

Check warning on line 66 in src/aleph/vm/utils/logs.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/utils/logs.py#L66

Added line #L66 was not covered by tests

loop = asyncio.get_event_loop()
loop.add_reader(r.fileno(), _ready_for_read)
_ready_for_read()

def do_cancel():
logger.info(f"cancelling reader {r}")
loop.remove_reader(r.fileno())
r.close()
logger.info(f"cancelling queue and reader {journal_reader}")
loop.remove_reader(journal_reader.fileno())
for task in tasks:
task.cancel()
journal_reader.close()

return queue, do_cancel

Expand Down
15 changes: 15 additions & 0 deletions tests/supervisor/test_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from asyncio import QueueEmpty

from aleph.vm.utils.logs import make_logs_queue


def test_make_logs_queue():
stdout_identifier = "test_stdout"
stderr_identifier = "test_stderr"
queue, do_cancel = make_logs_queue(stdout_identifier, stderr_identifier)
import pytest

with pytest.raises(QueueEmpty):
while queue.get_nowait():
queue.task_done()

Check warning on line 14 in tests/supervisor/test_log.py

View check run for this annotation

Codecov / codecov/patch

tests/supervisor/test_log.py#L14

Added line #L14 was not covered by tests
do_cancel()
Loading