Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Indicate which package a schema comes from when missing #378

Merged
merged 1 commit into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 55 additions & 8 deletions fedora_messaging/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import datetime
import json
import logging
import re
import uuid
from importlib.metadata import entry_points
from importlib.metadata import distribution as get_distribution
from importlib.metadata import entry_points, PackageNotFoundError

import jsonschema
import pika
Expand Down Expand Up @@ -65,6 +67,7 @@
# Maps string names of message types to classes and back
_schema_name_to_class = {}
_class_to_schema_name = {}
_schema_name_to_package = {}

# Used to load the registry automatically on first use
_registry_loaded = False
Expand All @@ -83,18 +86,32 @@ def get_class(schema_name):
Returns:
Message: A sub-class of :class:`Message` to create the message from.
"""

return _get_class_from_headers(dict(fedora_messaging_schema=schema_name))


def _get_class_from_headers(headers):
global _registry_loaded
if not _registry_loaded:
load_message_classes()

schema_name = headers["fedora_messaging_schema"]
try:
return _schema_name_to_class[schema_name]
except KeyError:
schema_package = headers.get("fedora_messaging_schema_package")
if schema_package:
package_text = f"You can install the missing schema from package {schema_package!r}"
else:
package_text = (
"Either install the package with its schema definition or define a schema"
)

_log.warning(
'The schema "%s" is not in the schema registry! Either install '
"the package with its schema definition or define a schema. "
'The schema "%s" is not in the schema registry! %s. '
"Falling back to the default schema...",
schema_name,
package_text,
)
return Message

Expand Down Expand Up @@ -124,6 +141,24 @@ def get_name(cls):
) from e


def _get_distribution_from_module(module):
if not module:
return None
module_parts = module.split(".")
while module_parts:
try:
distribution = get_distribution(".".join(module_parts))
try:
distribution_name = distribution.name
except AttributeError: # pragma: no cover
# COMPAT: Python <= 3.9
distribution_name = distribution.metadata["Name"]
except PackageNotFoundError:
return _get_distribution_from_module(".".join(module_parts[:-1]))
# Normalize the name: PEP 503 plus dashes as underscores.
return re.sub(r"[-_.]+", "-", distribution_name).lower().replace("-", "_")


def load_message_classes():
"""Load the 'fedora.messages' entry points and register the message classes."""
try:
Expand All @@ -141,6 +176,12 @@ def load_message_classes():
)
_schema_name_to_class[message.name] = cls
_class_to_schema_name[cls] = message.name
try:
module = message.module
except AttributeError: # pragma: no cover
# COMPAT: Python <= 3.8
module = message.pattern.match(message.value).group("module")
_schema_name_to_package[message.name] = _get_distribution_from_module(module)
global _registry_loaded
_registry_loaded = True

Expand All @@ -167,7 +208,7 @@ def get_message(routing_key, properties, body):
properties.headers = {}

try:
MessageClass = get_class(properties.headers["fedora_messaging_schema"])
MessageClass = _get_class_from_headers(properties.headers)
except KeyError:
_log.error(
"Message (headers=%r, body=%r) arrived without a schema header."
Expand Down Expand Up @@ -320,6 +361,7 @@ class attribute, although this is a convenient approach. Users are
"enum": [DEBUG, INFO, WARNING, ERROR],
},
"fedora_messaging_schema": {"type": "string"},
"fedora_messaging_schema_package": {"type": "string"},
"sent-at": {"type": "string"},
},
}
Expand Down Expand Up @@ -347,7 +389,10 @@ def _build_properties(self, headers):
# Consumers use this to determine what schema to use and if they're out
# of date.
headers = headers.copy()
headers["fedora_messaging_schema"] = get_name(self.__class__)
headers["fedora_messaging_schema"] = schema_name = get_name(self.__class__)
schema_package = _schema_name_to_package.get(schema_name)
if schema_package:
headers["fedora_messaging_schema_package"] = schema_package
now = datetime.datetime.now(tz=datetime.timezone.utc).replace(microsecond=0)
headers["sent-at"] = now.isoformat()
headers["fedora_messaging_severity"] = self.severity
Expand Down Expand Up @@ -723,9 +768,11 @@ def load_message(message_dict):
jsonschema.validate(message_dict, SERIALIZED_MESSAGE_SCHEMA)
except jsonschema.exceptions.ValidationError as e:
raise ValidationError(e) from e
MessageClass = get_class(
message_dict.get("headers", {}).get("fedora_messaging_schema", "base.message")
)
try:
MessageClass = _get_class_from_headers(message_dict.get("headers", {}))
except KeyError:
# No "fedora_messaging_schema" header
MessageClass = Message
message = MessageClass(
body=message_dict["body"],
topic=message_dict["topic"],
Expand Down
1 change: 1 addition & 0 deletions news/187.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Indicate which package a schema comes from when missing
1 change: 1 addition & 0 deletions tests/integration/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def test_twisted_consume_halt_consumer(queue_and_binding):
expected_headers = {
"fedora_messaging_severity": 20,
"fedora_messaging_schema": "base.message",
"fedora_messaging_schema_package": "fedora_messaging",
"priority": 0,
"niceness": "very",
}
Expand Down
14 changes: 9 additions & 5 deletions tests/unit/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,8 +703,10 @@ def test_save_recorded_messages_when_limit_is_reached(self):
test_recorder = cli.Recorder(2, mock_file)
test_recorder.collect_message(msg1)
mock_file.write.assert_called_with(
'{"body": {"test_key1": "test_value1"}, "headers"'
': {"fedora_messaging_schema": "base.message", "fedora_messaging_severity": 20, '
'{"body": {"test_key1": "test_value1"}, "headers": {'
'"fedora_messaging_schema": "base.message", '
'"fedora_messaging_schema_package": "fedora_messaging", '
'"fedora_messaging_severity": 20, '
'"priority": 0, "sent-at": "2018-11-18T10:11:41+00:00"}, '
'"id": "273ed91d-b8b5-487a-9576-95b9fbdf3eec", '
'"priority": 0, "queue": null, "topic": "test_topic1"}\n'
Expand All @@ -716,9 +718,11 @@ def test_save_recorded_messages_when_limit_is_reached(self):
assert the_exception.exit_code == 0
assert test_recorder.counter == 2
mock_file.write.assert_called_with(
'{"body": {"test_key2": "test_value2"}, "headers": '
'{"fedora_messaging_schema": "base.message", "fedora_messaging_severity": '
'20, "priority": 0, "sent-at": "2018-11-18T10:11:41+00:00"}, "id": '
'{"body": {"test_key2": "test_value2"}, "headers": {'
'"fedora_messaging_schema": "base.message", '
'"fedora_messaging_schema_package": "fedora_messaging", '
'"fedora_messaging_severity": 20, '
'"priority": 0, "sent-at": "2018-11-18T10:11:41+00:00"}, "id": '
'"273ed91d-b8b5-487a-9576-95b9fbdf3eec", "priority": 0, "queue": null, '
'"topic": "test_topic2"}\n'
)
Expand Down
89 changes: 78 additions & 11 deletions tests/unit/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,23 @@ def test_missing_headers(self):
)
assert isinstance(received_msg, message.Message)

def test_missing_schema(self, caplog):
"""Assert a missing schema package gives an informative log."""
msg = message.Message()
msg._headers = {
"fedora_messaging_schema": "dummy",
"fedora_messaging_schema_package": "dummy-package",
"fedora_messaging_severity": message.INFO,
}
received_msg = message.get_message(
msg._encoded_routing_key, msg._properties, msg._encoded_body
)
assert isinstance(received_msg, message.Message)
assert caplog.messages == [
'The schema "dummy" is not in the schema registry! You can install the missing schema '
"from package 'dummy-package'. Falling back to the default schema..."
]

@mock.patch.dict(message._class_to_schema_name, {DeprecatedMessage: "deprecated_message_id"})
@mock.patch.dict(message._schema_name_to_class, {"deprecated_message_id": DeprecatedMessage})
def test_deprecated(self, caplog):
Expand All @@ -86,6 +103,7 @@ def test_proper_message(self):
test_id = "test id"
test_headers = {
"fedora_messaging_schema": "base.message",
"fedora_messaging_schema_package": "fedora_messaging",
"fedora_messaging_severity": message.WARNING,
}
test_properties = pika.BasicProperties(
Expand All @@ -100,9 +118,11 @@ def test_proper_message(self):

test_msg.queue = test_queue
expected_json = (
'{"body": {"test_key": "test_value"}, "headers": {"fedora_messaging_schema": '
'"base.message", "fedora_messaging_severity": 30}, "id": "test id", '
'"priority": 2, "queue": "test queue", "topic": "test topic"}\n'
'{"body": {"test_key": "test_value"}, "headers": {'
'"fedora_messaging_schema": "base.message", '
'"fedora_messaging_schema_package": "fedora_messaging", '
'"fedora_messaging_severity": 30'
'}, "id": "test id", "priority": 2, "queue": "test queue", "topic": "test topic"}\n'
)
assert expected_json == message.dumps(test_msg)

Expand All @@ -114,6 +134,7 @@ def test_proper_message_multiple(self):
test_id = "test id"
test_headers = {
"fedora_messaging_schema": "base.message",
"fedora_messaging_schema_package": "fedora_messaging",
"fedora_messaging_severity": message.WARNING,
}
test_properties = pika.BasicProperties(
Expand All @@ -128,11 +149,17 @@ def test_proper_message_multiple(self):
test_msg.queue = test_queue
test_msg2.queue = test_queue
expected_json = (
'{"body": {"test_key": "test_value"}, "headers": {"fedora_messaging_schema": '
'"base.message", "fedora_messaging_severity": 30}, "id": "test id", '
'{"body": {"test_key": "test_value"}, "headers": {'
'"fedora_messaging_schema": "base.message", '
'"fedora_messaging_schema_package": "fedora_messaging", '
'"fedora_messaging_severity": 30'
'}, "id": "test id", '
'"priority": 0, "queue": "test queue", "topic": "test topic"}\n'
'{"body": {"test_key": "test_value"}, "headers": {"fedora_messaging_schema": '
'"base.message", "fedora_messaging_severity": 30}, "id": "test id", '
'{"body": {"test_key": "test_value"}, "headers": {'
'"fedora_messaging_schema": "base.message", '
'"fedora_messaging_schema_package": "fedora_messaging", '
'"fedora_messaging_severity": 30'
'}, "id": "test id", '
'"priority": 0, "queue": "test queue", "topic": "test topic"}\n'
)

Expand All @@ -152,8 +179,11 @@ class TestMessageLoads:
def test_proper_json(self):
"""Assert loading single message from json work."""
message_json = (
'{"topic": "test topic", "headers": {"fedora_messaging_schema": "base.message", '
'"fedora_messaging_severity": 30}, "id": "test id", "body": '
'{"topic": "test topic", "headers": {'
'"fedora_messaging_schema": "base.message", '
'"fedora_messaging_schema_package": "fedora_messaging", '
'"fedora_messaging_severity": 30'
'}, "id": "test id", "body": '
'{"test_key": "test_value"}, "priority": 2, "queue": "test queue"}\n'
)
messages = message.loads(message_json)
Expand All @@ -167,6 +197,7 @@ def test_proper_json(self):
assert 2 == test_message.priority
assert message.WARNING == test_message._headers["fedora_messaging_severity"]
assert "base.message" == test_message._headers["fedora_messaging_schema"]
assert "fedora_messaging" == test_message._headers["fedora_messaging_schema_package"]

def test_improper_json(self):
"""Assert proper exception is raised when improper json is provided."""
Expand All @@ -184,10 +215,11 @@ def test_missing_headers(self):
}
test_message = message.load_message(message_dict)
assert test_message._headers["fedora_messaging_schema"] == "base.message"
assert test_message._headers["fedora_messaging_schema_package"] == "fedora_messaging"
assert test_message._headers["fedora_messaging_severity"] == message.INFO
assert "sent-at" in test_message._headers

def test_missing_messaging_schema(self):
def test_missing_messaging_schema_header(self, caplog):
"""Assert the default schema is used when messaging schema is missing."""
message_dict = {
"id": "test id",
Expand All @@ -198,6 +230,27 @@ def test_missing_messaging_schema(self):
}
test_message = message.load_message(message_dict)
assert isinstance(test_message, message.Message)
assert caplog.messages == []

def test_missing_messaging_schema(self, caplog):
"""Assert a helpful message is logged when the schema is missing."""
message_dict = {
"id": "test id",
"topic": "test topic",
"headers": {
"fedora_messaging_schema": "dummy",
"fedora_messaging_schema_package": "dummy-package",
"fedora_messaging_severity": 30,
},
"body": {"test_key": "test_value"},
"queue": "test queue",
}
test_message = message.load_message(message_dict)
assert isinstance(test_message, message.Message)
assert caplog.messages == [
'The schema "dummy" is not in the schema registry! You can install the missing schema '
"from package 'dummy-package'. Falling back to the default schema..."
]

def test_missing_body(self):
"""Assert proper exception is raised when body is missing."""
Expand Down Expand Up @@ -366,13 +419,16 @@ def test_properties_default(self):
assert "sent-at" in msg._properties.headers
assert "fedora_messaging_schema" in msg._properties.headers
assert msg._properties.headers["fedora_messaging_schema"] == "base.message"
assert "fedora_messaging_schema_package" in msg._properties.headers
assert msg._properties.headers["fedora_messaging_schema_package"] == "fedora_messaging"

def test_headers(self):
msg = message.Message(headers={"foo": "bar"})
assert "foo" in msg._properties.headers
assert msg._properties.headers["foo"] == "bar"
# The fedora_messaging_schema key must also be added when headers are given.
# The fedora_messaging_schema keys must also be added when headers are given.
assert msg._properties.headers["fedora_messaging_schema"] == "base.message"
assert msg._properties.headers["fedora_messaging_schema_package"] == "fedora_messaging"

def test_severity_default_header_set(self):
"""Assert the default severity is placed in the header if unspecified."""
Expand Down Expand Up @@ -526,9 +582,16 @@ def flatpaks(self):


@mock.patch.dict(message._class_to_schema_name, {CustomMessage: "custom_id"})
@mock.patch.dict(message._schema_name_to_package, {"custom_id": "custom-package"})
class TestCustomMessage:
"""Tests for a Message subclass that provides filter headers"""

def test_schema_headers(self):
"""Assert schema name and package are placed in the message headers."""
msg = CustomMessage(body={})
assert msg._headers.get("fedora_messaging_schema") == "custom_id"
assert msg._headers.get("fedora_messaging_schema_package") == "custom-package"

def test_usernames(self):
"""Assert usernames are placed in the message headers."""
msg = CustomMessage(body={"users": ["jcline", "abompard"]})
Expand Down Expand Up @@ -656,3 +719,7 @@ def test_get_name_autoload_once(self):
with mock.patch.dict(message._class_to_schema_name, {}, clear=True):
with pytest.raises(TypeError):
message.get_name("this.is.not.an.entrypoint")

def test_get_distribution_from_module(self):
"""Assert getting the distribution from a non-existing module does not crash."""
assert message._get_distribution_from_module("does.not.exist") is None