From a4761976324a0e2597f762e7e8bd0ec9d342ba21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Bompard?= Date: Fri, 5 Jul 2024 17:41:50 +0200 Subject: [PATCH] Indicate which package a schema comes from when missing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes: #187 Signed-off-by: Aurélien Bompard --- fedora_messaging/message.py | 63 +++++++++++++++++++++---- news/187.feature | 1 + tests/integration/test_api.py | 1 + tests/unit/test_cli.py | 14 ++++-- tests/unit/test_message.py | 89 ++++++++++++++++++++++++++++++----- 5 files changed, 144 insertions(+), 24 deletions(-) create mode 100644 news/187.feature diff --git a/fedora_messaging/message.py b/fedora_messaging/message.py index df790d3a..bc174df0 100644 --- a/fedora_messaging/message.py +++ b/fedora_messaging/message.py @@ -27,8 +27,10 @@ import datetime import json import logging +import re import uuid -from importlib.metadata import entry_points +from importlib.metadata import distribution as get_distribution +from importlib.metadata import entry_points, PackageNotFoundError import jsonschema import pika @@ -65,6 +67,7 @@ # Maps string names of message types to classes and back _schema_name_to_class = {} _class_to_schema_name = {} +_schema_name_to_package = {} # Used to load the registry automatically on first use _registry_loaded = False @@ -83,18 +86,32 @@ def get_class(schema_name): Returns: Message: A sub-class of :class:`Message` to create the message from. """ + + return _get_class_from_headers(dict(fedora_messaging_schema=schema_name)) + + +def _get_class_from_headers(headers): global _registry_loaded if not _registry_loaded: load_message_classes() + schema_name = headers["fedora_messaging_schema"] try: return _schema_name_to_class[schema_name] except KeyError: + schema_package = headers.get("fedora_messaging_schema_package") + if schema_package: + package_text = f"You can install the missing schema from package {schema_package!r}" + else: + package_text = ( + "Either install the package with its schema definition or define a schema" + ) + _log.warning( - 'The schema "%s" is not in the schema registry! Either install ' - "the package with its schema definition or define a schema. " + 'The schema "%s" is not in the schema registry! %s. ' "Falling back to the default schema...", schema_name, + package_text, ) return Message @@ -124,6 +141,24 @@ def get_name(cls): ) from e +def _get_distribution_from_module(module): + if not module: + return None + module_parts = module.split(".") + while module_parts: + try: + distribution = get_distribution(".".join(module_parts)) + try: + distribution_name = distribution.name + except AttributeError: # pragma: no cover + # COMPAT: Python <= 3.9 + distribution_name = distribution.metadata["Name"] + except PackageNotFoundError: + return _get_distribution_from_module(".".join(module_parts[:-1])) + # Normalize the name: PEP 503 plus dashes as underscores. + return re.sub(r"[-_.]+", "-", distribution_name).lower().replace("-", "_") + + def load_message_classes(): """Load the 'fedora.messages' entry points and register the message classes.""" try: @@ -141,6 +176,12 @@ def load_message_classes(): ) _schema_name_to_class[message.name] = cls _class_to_schema_name[cls] = message.name + try: + module = message.module + except AttributeError: # pragma: no cover + # COMPAT: Python <= 3.8 + module = message.pattern.match(message.value).group("module") + _schema_name_to_package[message.name] = _get_distribution_from_module(module) global _registry_loaded _registry_loaded = True @@ -167,7 +208,7 @@ def get_message(routing_key, properties, body): properties.headers = {} try: - MessageClass = get_class(properties.headers["fedora_messaging_schema"]) + MessageClass = _get_class_from_headers(properties.headers) except KeyError: _log.error( "Message (headers=%r, body=%r) arrived without a schema header." @@ -320,6 +361,7 @@ class attribute, although this is a convenient approach. Users are "enum": [DEBUG, INFO, WARNING, ERROR], }, "fedora_messaging_schema": {"type": "string"}, + "fedora_messaging_schema_package": {"type": "string"}, "sent-at": {"type": "string"}, }, } @@ -347,7 +389,10 @@ def _build_properties(self, headers): # Consumers use this to determine what schema to use and if they're out # of date. headers = headers.copy() - headers["fedora_messaging_schema"] = get_name(self.__class__) + headers["fedora_messaging_schema"] = schema_name = get_name(self.__class__) + schema_package = _schema_name_to_package.get(schema_name) + if schema_package: + headers["fedora_messaging_schema_package"] = schema_package now = datetime.datetime.now(tz=datetime.timezone.utc).replace(microsecond=0) headers["sent-at"] = now.isoformat() headers["fedora_messaging_severity"] = self.severity @@ -723,9 +768,11 @@ def load_message(message_dict): jsonschema.validate(message_dict, SERIALIZED_MESSAGE_SCHEMA) except jsonschema.exceptions.ValidationError as e: raise ValidationError(e) from e - MessageClass = get_class( - message_dict.get("headers", {}).get("fedora_messaging_schema", "base.message") - ) + try: + MessageClass = _get_class_from_headers(message_dict.get("headers", {})) + except KeyError: + # No "fedora_messaging_schema" header + MessageClass = Message message = MessageClass( body=message_dict["body"], topic=message_dict["topic"], diff --git a/news/187.feature b/news/187.feature new file mode 100644 index 00000000..cb0c01af --- /dev/null +++ b/news/187.feature @@ -0,0 +1 @@ +Indicate which package a schema comes from when missing diff --git a/tests/integration/test_api.py b/tests/integration/test_api.py index 23f16681..d5c652a5 100644 --- a/tests/integration/test_api.py +++ b/tests/integration/test_api.py @@ -120,6 +120,7 @@ def test_twisted_consume_halt_consumer(queue_and_binding): expected_headers = { "fedora_messaging_severity": 20, "fedora_messaging_schema": "base.message", + "fedora_messaging_schema_package": "fedora_messaging", "priority": 0, "niceness": "very", } diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py index b43be92f..855312c9 100644 --- a/tests/unit/test_cli.py +++ b/tests/unit/test_cli.py @@ -703,8 +703,10 @@ def test_save_recorded_messages_when_limit_is_reached(self): test_recorder = cli.Recorder(2, mock_file) test_recorder.collect_message(msg1) mock_file.write.assert_called_with( - '{"body": {"test_key1": "test_value1"}, "headers"' - ': {"fedora_messaging_schema": "base.message", "fedora_messaging_severity": 20, ' + '{"body": {"test_key1": "test_value1"}, "headers": {' + '"fedora_messaging_schema": "base.message", ' + '"fedora_messaging_schema_package": "fedora_messaging", ' + '"fedora_messaging_severity": 20, ' '"priority": 0, "sent-at": "2018-11-18T10:11:41+00:00"}, ' '"id": "273ed91d-b8b5-487a-9576-95b9fbdf3eec", ' '"priority": 0, "queue": null, "topic": "test_topic1"}\n' @@ -716,9 +718,11 @@ def test_save_recorded_messages_when_limit_is_reached(self): assert the_exception.exit_code == 0 assert test_recorder.counter == 2 mock_file.write.assert_called_with( - '{"body": {"test_key2": "test_value2"}, "headers": ' - '{"fedora_messaging_schema": "base.message", "fedora_messaging_severity": ' - '20, "priority": 0, "sent-at": "2018-11-18T10:11:41+00:00"}, "id": ' + '{"body": {"test_key2": "test_value2"}, "headers": {' + '"fedora_messaging_schema": "base.message", ' + '"fedora_messaging_schema_package": "fedora_messaging", ' + '"fedora_messaging_severity": 20, ' + '"priority": 0, "sent-at": "2018-11-18T10:11:41+00:00"}, "id": ' '"273ed91d-b8b5-487a-9576-95b9fbdf3eec", "priority": 0, "queue": null, ' '"topic": "test_topic2"}\n' ) diff --git a/tests/unit/test_message.py b/tests/unit/test_message.py index 573b592c..d0faed5d 100644 --- a/tests/unit/test_message.py +++ b/tests/unit/test_message.py @@ -60,6 +60,23 @@ def test_missing_headers(self): ) assert isinstance(received_msg, message.Message) + def test_missing_schema(self, caplog): + """Assert a missing schema package gives an informative log.""" + msg = message.Message() + msg._headers = { + "fedora_messaging_schema": "dummy", + "fedora_messaging_schema_package": "dummy-package", + "fedora_messaging_severity": message.INFO, + } + received_msg = message.get_message( + msg._encoded_routing_key, msg._properties, msg._encoded_body + ) + assert isinstance(received_msg, message.Message) + assert caplog.messages == [ + 'The schema "dummy" is not in the schema registry! You can install the missing schema ' + "from package 'dummy-package'. Falling back to the default schema..." + ] + @mock.patch.dict(message._class_to_schema_name, {DeprecatedMessage: "deprecated_message_id"}) @mock.patch.dict(message._schema_name_to_class, {"deprecated_message_id": DeprecatedMessage}) def test_deprecated(self, caplog): @@ -86,6 +103,7 @@ def test_proper_message(self): test_id = "test id" test_headers = { "fedora_messaging_schema": "base.message", + "fedora_messaging_schema_package": "fedora_messaging", "fedora_messaging_severity": message.WARNING, } test_properties = pika.BasicProperties( @@ -100,9 +118,11 @@ def test_proper_message(self): test_msg.queue = test_queue expected_json = ( - '{"body": {"test_key": "test_value"}, "headers": {"fedora_messaging_schema": ' - '"base.message", "fedora_messaging_severity": 30}, "id": "test id", ' - '"priority": 2, "queue": "test queue", "topic": "test topic"}\n' + '{"body": {"test_key": "test_value"}, "headers": {' + '"fedora_messaging_schema": "base.message", ' + '"fedora_messaging_schema_package": "fedora_messaging", ' + '"fedora_messaging_severity": 30' + '}, "id": "test id", "priority": 2, "queue": "test queue", "topic": "test topic"}\n' ) assert expected_json == message.dumps(test_msg) @@ -114,6 +134,7 @@ def test_proper_message_multiple(self): test_id = "test id" test_headers = { "fedora_messaging_schema": "base.message", + "fedora_messaging_schema_package": "fedora_messaging", "fedora_messaging_severity": message.WARNING, } test_properties = pika.BasicProperties( @@ -128,11 +149,17 @@ def test_proper_message_multiple(self): test_msg.queue = test_queue test_msg2.queue = test_queue expected_json = ( - '{"body": {"test_key": "test_value"}, "headers": {"fedora_messaging_schema": ' - '"base.message", "fedora_messaging_severity": 30}, "id": "test id", ' + '{"body": {"test_key": "test_value"}, "headers": {' + '"fedora_messaging_schema": "base.message", ' + '"fedora_messaging_schema_package": "fedora_messaging", ' + '"fedora_messaging_severity": 30' + '}, "id": "test id", ' '"priority": 0, "queue": "test queue", "topic": "test topic"}\n' - '{"body": {"test_key": "test_value"}, "headers": {"fedora_messaging_schema": ' - '"base.message", "fedora_messaging_severity": 30}, "id": "test id", ' + '{"body": {"test_key": "test_value"}, "headers": {' + '"fedora_messaging_schema": "base.message", ' + '"fedora_messaging_schema_package": "fedora_messaging", ' + '"fedora_messaging_severity": 30' + '}, "id": "test id", ' '"priority": 0, "queue": "test queue", "topic": "test topic"}\n' ) @@ -152,8 +179,11 @@ class TestMessageLoads: def test_proper_json(self): """Assert loading single message from json work.""" message_json = ( - '{"topic": "test topic", "headers": {"fedora_messaging_schema": "base.message", ' - '"fedora_messaging_severity": 30}, "id": "test id", "body": ' + '{"topic": "test topic", "headers": {' + '"fedora_messaging_schema": "base.message", ' + '"fedora_messaging_schema_package": "fedora_messaging", ' + '"fedora_messaging_severity": 30' + '}, "id": "test id", "body": ' '{"test_key": "test_value"}, "priority": 2, "queue": "test queue"}\n' ) messages = message.loads(message_json) @@ -167,6 +197,7 @@ def test_proper_json(self): assert 2 == test_message.priority assert message.WARNING == test_message._headers["fedora_messaging_severity"] assert "base.message" == test_message._headers["fedora_messaging_schema"] + assert "fedora_messaging" == test_message._headers["fedora_messaging_schema_package"] def test_improper_json(self): """Assert proper exception is raised when improper json is provided.""" @@ -184,10 +215,11 @@ def test_missing_headers(self): } test_message = message.load_message(message_dict) assert test_message._headers["fedora_messaging_schema"] == "base.message" + assert test_message._headers["fedora_messaging_schema_package"] == "fedora_messaging" assert test_message._headers["fedora_messaging_severity"] == message.INFO assert "sent-at" in test_message._headers - def test_missing_messaging_schema(self): + def test_missing_messaging_schema_header(self, caplog): """Assert the default schema is used when messaging schema is missing.""" message_dict = { "id": "test id", @@ -198,6 +230,27 @@ def test_missing_messaging_schema(self): } test_message = message.load_message(message_dict) assert isinstance(test_message, message.Message) + assert caplog.messages == [] + + def test_missing_messaging_schema(self, caplog): + """Assert a helpful message is logged when the schema is missing.""" + message_dict = { + "id": "test id", + "topic": "test topic", + "headers": { + "fedora_messaging_schema": "dummy", + "fedora_messaging_schema_package": "dummy-package", + "fedora_messaging_severity": 30, + }, + "body": {"test_key": "test_value"}, + "queue": "test queue", + } + test_message = message.load_message(message_dict) + assert isinstance(test_message, message.Message) + assert caplog.messages == [ + 'The schema "dummy" is not in the schema registry! You can install the missing schema ' + "from package 'dummy-package'. Falling back to the default schema..." + ] def test_missing_body(self): """Assert proper exception is raised when body is missing.""" @@ -366,13 +419,16 @@ def test_properties_default(self): assert "sent-at" in msg._properties.headers assert "fedora_messaging_schema" in msg._properties.headers assert msg._properties.headers["fedora_messaging_schema"] == "base.message" + assert "fedora_messaging_schema_package" in msg._properties.headers + assert msg._properties.headers["fedora_messaging_schema_package"] == "fedora_messaging" def test_headers(self): msg = message.Message(headers={"foo": "bar"}) assert "foo" in msg._properties.headers assert msg._properties.headers["foo"] == "bar" - # The fedora_messaging_schema key must also be added when headers are given. + # The fedora_messaging_schema keys must also be added when headers are given. assert msg._properties.headers["fedora_messaging_schema"] == "base.message" + assert msg._properties.headers["fedora_messaging_schema_package"] == "fedora_messaging" def test_severity_default_header_set(self): """Assert the default severity is placed in the header if unspecified.""" @@ -526,9 +582,16 @@ def flatpaks(self): @mock.patch.dict(message._class_to_schema_name, {CustomMessage: "custom_id"}) +@mock.patch.dict(message._schema_name_to_package, {"custom_id": "custom-package"}) class TestCustomMessage: """Tests for a Message subclass that provides filter headers""" + def test_schema_headers(self): + """Assert schema name and package are placed in the message headers.""" + msg = CustomMessage(body={}) + assert msg._headers.get("fedora_messaging_schema") == "custom_id" + assert msg._headers.get("fedora_messaging_schema_package") == "custom-package" + def test_usernames(self): """Assert usernames are placed in the message headers.""" msg = CustomMessage(body={"users": ["jcline", "abompard"]}) @@ -656,3 +719,7 @@ def test_get_name_autoload_once(self): with mock.patch.dict(message._class_to_schema_name, {}, clear=True): with pytest.raises(TypeError): message.get_name("this.is.not.an.entrypoint") + + def test_get_distribution_from_module(self): + """Assert getting the distribution from a non-existing module does not crash.""" + assert message._get_distribution_from_module("does.not.exist") is None