Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions doc/dev/kafka.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Kafka

`ibex_bluesky_core` uses [the `bluesky-kafka` library](https://github.com/bluesky/bluesky-kafka) to send documents
emitted by the `RunEngine` to kafka. The kafka callback is automatically added by
{py:obj}`ibex_bluesky_core.run_engine.get_run_engine`, and so no user configuration is required - the callback is always
enabled.

Documents are encoded using [the `msgpack` format](https://msgpack.org/index.html).

The kafka broker to send to can be controlled using the `IBEX_BLUESKY_CORE_KAFKA_BROKER` environment variable, if
an instrument needs to override the default. The kafka topic will be `<INSTRUMENT>_bluesky`, where `INSTRUMENT` is the
instrument name with any NDX or NDH prefix stripped.

The message key will always be `doc` for bluesky documents; specifying a non-null key enforces message ordering.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ classifiers = [

dependencies = [
"bluesky", # Bluesky framework
"bluesky-kafka", # Bluesky-kafka integration
"ophyd-async[ca] == 0.12.3", # Device abstraction
"matplotlib", # Plotting
"lmfit", # Fitting
Expand Down
2 changes: 1 addition & 1 deletion src/ibex_bluesky_core/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
# Find the log directory, if already set in the environment, else use the default
log_location = os.environ.get("IBEX_BLUESKY_CORE_LOGS", DEFAULT_LOG_FOLDER)

INTERESTING_LOGGER_NAMES = ["ibex_bluesky_core", "bluesky", "ophyd_async"]
INTERESTING_LOGGER_NAMES = ["ibex_bluesky_core", "bluesky", "ophyd_async", "bluesky_kafka"]


@cache
Expand Down
32 changes: 31 additions & 1 deletion src/ibex_bluesky_core/run_engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,21 @@
from typing import Any, cast

import bluesky.preprocessors as bpp
import msgpack
from bluesky.run_engine import RunEngine, RunEngineResult
from bluesky.utils import DuringTask, Msg, RunEngineControlException, RunEngineInterrupted

from ibex_bluesky_core.callbacks import DocLoggingCallback
from ibex_bluesky_core.preprocessors import add_rb_number_processor

__all__ = ["get_run_engine", "run_plan"]
__all__ = ["get_kafka_topic_name", "get_run_engine", "run_plan"]


import os
import socket

from bluesky_kafka import Publisher

from ibex_bluesky_core.plan_stubs import CALL_QT_AWARE_MSG_KEY, CALL_SYNC_MSG_KEY
from ibex_bluesky_core.run_engine._msg_handlers import call_qt_aware_handler, call_sync_handler
from ibex_bluesky_core.utils import is_matplotlib_backend_qt
Expand All @@ -26,6 +32,21 @@
logger = logging.getLogger(__name__)


DEFAULT_KAFKA_BROKER = "livedata.isis.cclrc.ac.uk:31092"


def get_kafka_topic_name() -> str:
"""Get the name of the bluesky kafka topic for this machine."""
computer_name = os.environ.get("COMPUTERNAME", socket.gethostname()).upper()
computer_name = computer_name.upper()
if computer_name.startswith(("NDX", "NDH")):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should really pull this out of genie and into server_common but we'll do that later

name = computer_name[3:]
else:
name = computer_name

return f"{name}_bluesky"


class _DuringTask(DuringTask):
def block(self, blocking_event: Event) -> None:
"""On windows, event.wait() on the main thread is not interruptible by a CTRL-C.
Expand Down Expand Up @@ -103,6 +124,15 @@ def get_run_engine() -> RunEngine:
log_callback = DocLoggingCallback()
RE.subscribe(log_callback)

kafka_callback = Publisher(
topic=get_kafka_topic_name(),
bootstrap_servers=os.environ.get("IBEX_BLUESKY_CORE_KAFKA_BROKER", DEFAULT_KAFKA_BROKER),
key="doc",
serializer=msgpack.dumps,
producer_config={"enable.idempotence": True},
)
RE.subscribe(kafka_callback)

RE.register_command(CALL_SYNC_MSG_KEY, call_sync_handler)
RE.register_command(CALL_QT_AWARE_MSG_KEY, call_qt_aware_handler)

Expand Down
14 changes: 13 additions & 1 deletion tests/test_run_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import threading
from collections.abc import Generator
from typing import Any
from unittest import mock
from unittest.mock import MagicMock

import bluesky.plan_stubs as bps
Expand All @@ -11,7 +12,7 @@
from bluesky.run_engine import RunEngineResult
from bluesky.utils import Msg, RequestAbort, RunEngineInterrupted

from ibex_bluesky_core.run_engine import _DuringTask, get_run_engine, run_plan
from ibex_bluesky_core.run_engine import _DuringTask, get_kafka_topic_name, get_run_engine, run_plan
from ibex_bluesky_core.version import version


Expand Down Expand Up @@ -146,3 +147,14 @@ def plan():
result = run_plan(plan())
assert result.plan_result == "happy_path_result"
assert result.exit_status == "success"


def test_get_kafka_topic_name():
with mock.patch("ibex_bluesky_core.run_engine.os.environ.get", return_value="FOO"):
assert get_kafka_topic_name() == "FOO_bluesky"

with mock.patch("ibex_bluesky_core.run_engine.os.environ.get", return_value="NDXBAR"):
assert get_kafka_topic_name() == "BAR_bluesky"

with mock.patch("ibex_bluesky_core.run_engine.os.environ.get", return_value="NDHBAZ"):
assert get_kafka_topic_name() == "BAZ_bluesky"