Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions structlog_gcp/base.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,35 @@
import structlog.processors
from structlog.typing import Processor

from . import errors, processors
from . import error_reporting, processors


def build_processors(
service: str | None = None,
version: str | None = None,
) -> list[Processor]:
procs = []

procs.extend(processors.CoreCloudLogging().setup())
procs.extend(processors.LogSeverity().setup())
procs.extend(processors.CodeLocation().setup())
procs.extend(errors.ReportException().setup())
procs.extend(errors.ReportError(["CRITICAL"]).setup())
procs.extend(errors.ServiceContext(service, version).setup())
procs.extend(processors.FormatAsCloudLogging().setup())
procs: list[Processor] = []

# Add a timestamp in ISO 8601 format.
procs.append(structlog.processors.TimeStamper(fmt="iso"))
procs.append(processors.init_cloud_logging)

procs.extend(processors.setup_log_severity())
procs.extend(processors.setup_code_location())

# Errors: log exceptions
procs.extend(error_reporting.setup_exceptions())

# Errors: formatter for Error Reporting
procs.append(error_reporting.ReportError(["CRITICAL"]))

# Errors: add service context
procs.append(error_reporting.ServiceContext(service, version))

# Finally: Cloud Logging formatter
procs.append(processors.finalize_cloud_logging)

# Format as JSON
procs.append(structlog.processors.JSONRenderer())

return procs
83 changes: 35 additions & 48 deletions structlog_gcp/errors.py → structlog_gcp/error_reporting.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,13 @@
import os

import structlog
import structlog.processors
from structlog.typing import EventDict, Processor, WrappedLogger

from .types import CLOUD_LOGGING_KEY, ERROR_EVENT_TYPE, SOURCE_LOCATION_KEY


class ServiceContext:
def __init__(self, service: str | None = None, version: str | None = None) -> None:
# https://cloud.google.com/functions/docs/configuring/env-var#runtime_environment_variables_set_automatically
if service is None:
service = os.environ.get("K_SERVICE", "unknown service")

if version is None:
version = os.environ.get("K_REVISION", "unknown version")

self.service_context = {"service": service, "version": version}

def setup(self) -> list[Processor]:
return [self]

def __call__(
self, logger: WrappedLogger, method_name: str, event_dict: EventDict
) -> EventDict:
"""Add a service context in which an error has occurred.

This is part of the Error Reporting API, so it's only added when an error happens.
"""

event_type = event_dict[CLOUD_LOGGING_KEY].get("@type")
if event_type != ERROR_EVENT_TYPE:
return event_dict

# https://cloud.google.com/error-reporting/reference/rest/v1beta1/ServiceContext
event_dict[CLOUD_LOGGING_KEY]["serviceContext"] = self.service_context

return event_dict
def setup_exceptions(log_level: str = "CRITICAL") -> list[Processor]:
return [structlog.processors.format_exc_info, ReportException(log_level)]


class ReportException:
Expand All @@ -47,9 +19,6 @@ class ReportException:
def __init__(self, log_level: str = "CRITICAL") -> None:
self.log_level = log_level

def setup(self) -> list[Processor]:
return [structlog.processors.format_exc_info, self]

def __call__(
self, logger: WrappedLogger, method_name: str, event_dict: EventDict
) -> EventDict:
Expand Down Expand Up @@ -80,19 +49,6 @@ class ReportError:
def __init__(self, severities: list[str]) -> None:
self.severities = severities

def setup(self) -> list[Processor]:
return [self]

def _build_service_context(self) -> dict[str, str]:
# https://cloud.google.com/error-reporting/reference/rest/v1beta1/ServiceContext
service_context = {
# https://cloud.google.com/functions/docs/configuring/env-var#runtime_environment_variables_set_automatically
"service": os.environ.get("K_SERVICE", "unknown service"),
"version": os.environ.get("K_REVISION", "unknown version"),
}

return service_context

def __call__(
self, logger: WrappedLogger, method_name: str, event_dict: EventDict
) -> EventDict:
Expand All @@ -108,6 +64,37 @@ def __call__(

event_dict[CLOUD_LOGGING_KEY]["@type"] = ERROR_EVENT_TYPE
event_dict[CLOUD_LOGGING_KEY]["context"] = error_context
event_dict[CLOUD_LOGGING_KEY]["serviceContext"] = self._build_service_context()

# "serviceContext" should be added by the ServiceContext processor.
# event_dict[CLOUD_LOGGING_KEY]["serviceContext"]

return event_dict


class ServiceContext:
def __init__(self, service: str | None = None, version: str | None = None) -> None:
# https://cloud.google.com/functions/docs/configuring/env-var#runtime_environment_variables_set_automatically
if service is None:
service = os.environ.get("K_SERVICE", "unknown service")

if version is None:
version = os.environ.get("K_REVISION", "unknown version")

self.service_context = {"service": service, "version": version}

def __call__(
self, logger: WrappedLogger, method_name: str, event_dict: EventDict
) -> EventDict:
"""Add a service context in which an error has occurred.

This is part of the Error Reporting API, so it's only added when an error happens.
"""

event_type = event_dict[CLOUD_LOGGING_KEY].get("@type")
if event_type != ERROR_EVENT_TYPE:
return event_dict

# https://cloud.google.com/error-reporting/reference/rest/v1beta1/ServiceContext
event_dict[CLOUD_LOGGING_KEY]["serviceContext"] = self.service_context

return event_dict
122 changes: 55 additions & 67 deletions structlog_gcp/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,40 @@
from .types import CLOUD_LOGGING_KEY, SOURCE_LOCATION_KEY


class CoreCloudLogging:
"""Initialize the Google Cloud Logging event message"""
def setup_log_severity() -> list[Processor]:
return [structlog.processors.add_log_level, LogSeverity()]


def setup(self) -> list[Processor]:
return [
# If some value is in bytes, decode it to a unicode str.
structlog.processors.UnicodeDecoder(),
# Add a timestamp in ISO 8601 format.
structlog.processors.TimeStamper(fmt="iso"),
self,
def setup_code_location() -> list[Processor]:
call_site_processors = structlog.processors.CallsiteParameterAdder(
parameters=[
structlog.processors.CallsiteParameter.PATHNAME,
structlog.processors.CallsiteParameter.MODULE,
structlog.processors.CallsiteParameter.FUNC_NAME,
structlog.processors.CallsiteParameter.LINENO,
]
)

def __call__(
self, logger: WrappedLogger, method_name: str, event_dict: EventDict
) -> EventDict:
value = {
"message": event_dict.pop("event"),
"time": event_dict.pop("timestamp"),
}
return [call_site_processors, code_location]

event_dict[CLOUD_LOGGING_KEY] = value
return event_dict

def init_cloud_logging(
logger: WrappedLogger, method_name: str, event_dict: EventDict
) -> EventDict:
"""Initialize the Google Cloud Logging event message"""

value = {
"message": event_dict.pop("event"),
"time": event_dict.pop("timestamp"),
}

event_dict[CLOUD_LOGGING_KEY] = value
return event_dict


class FormatAsCloudLogging:
def finalize_cloud_logging(
logger: WrappedLogger, method_name: str, event_dict: EventDict
) -> EventDict:
"""Finalize the Google Cloud Logging event message and replace the logging event.

This is not exactly the format the Cloud Logging directly ingests, but
Expand All @@ -43,29 +52,28 @@ class FormatAsCloudLogging:
See: https://cloud.google.com/logging/docs/structured-logging#special-payload-fields
"""

def setup(self) -> list[Processor]:
return [self]
# Take out the Google Cloud Logging set of fields from the event dict
gcp_event: EventDict = event_dict.pop(CLOUD_LOGGING_KEY)

def __call__(
self, logger: WrappedLogger, method_name: str, event_dict: EventDict
) -> EventDict:
# Take out the Google Cloud Logging set of fields from the event dict
gcp_event: EventDict = event_dict.pop(CLOUD_LOGGING_KEY)
# Override whatever is left from the event dict with the content of all
# the Google Cloud Logging-formatted fields.
event_dict.update(gcp_event)

# Override whatever is left from the event dict with the content of all
# the Google Cloud Logging-formatted fields.
event_dict.update(gcp_event)
# Fields which are not known by Google Cloud Logging will be added to
# the `jsonPayload` field.
#
# See the `message` field documentation in:
# https://cloud.google.com/logging/docs/structured-logging#special-payload-fields

# Fields which are not known by Google Cloud Logging will be added to
# the `jsonPayload` field.
# See the `message` field documentation in:
# https://cloud.google.com/logging/docs/structured-logging#special-payload-fields

return event_dict
return event_dict


class LogSeverity:
"""Set the severity using the Google Cloud Logging severities"""
"""Set the severity using the Google Cloud Logging severities.


See: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity
"""

def __init__(self) -> None:
self.default = "notset"
Expand All @@ -84,17 +92,10 @@ def __init__(self) -> None:
# "emergency": "EMERGENCY", # One or more systems are unusable.
}

def setup(self) -> list[Processor]:
# Add log level to event dict.
return [structlog.processors.add_log_level, self]

def __call__(
self, logger: WrappedLogger, method_name: str, event_dict: EventDict
) -> EventDict:
"""Format a Python log level value as a GCP log severity.

See: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity
"""
"""Format a Python log level value as a GCP log severity."""

log_level = event_dict.pop("level")
severity = self.mapping.get(log_level, self.default)
Expand All @@ -103,30 +104,17 @@ def __call__(
return event_dict


class CodeLocation:
def code_location(
logger: WrappedLogger, method_name: str, event_dict: EventDict
) -> EventDict:
"""Inject the location of the logging message into the logs"""

def setup(self) -> list[Processor]:
# Add callsite parameters.
call_site_proc = structlog.processors.CallsiteParameterAdder(
parameters=[
structlog.processors.CallsiteParameter.PATHNAME,
structlog.processors.CallsiteParameter.MODULE,
structlog.processors.CallsiteParameter.FUNC_NAME,
structlog.processors.CallsiteParameter.LINENO,
]
)
return [call_site_proc, self]
location = {
"file": event_dict.pop("pathname"),
"line": str(event_dict.pop("lineno")),
"function": f"{event_dict.pop('module')}:{event_dict.pop('func_name')}",
}

def __call__(
self, logger: WrappedLogger, method_name: str, event_dict: EventDict
) -> EventDict:
location = {
"file": event_dict.pop("pathname"),
"line": str(event_dict.pop("lineno")),
"function": f"{event_dict.pop('module')}:{event_dict.pop('func_name')}",
}

event_dict[CLOUD_LOGGING_KEY][SOURCE_LOCATION_KEY] = location
event_dict[CLOUD_LOGGING_KEY][SOURCE_LOCATION_KEY] = location

return event_dict
return event_dict
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def mock_logger_env():
):
yield


@pytest.fixture
def logger(mock_logger_env):
"""Setup a logger for testing and return it"""
Expand Down
6 changes: 3 additions & 3 deletions tests/test_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def test_normal(stdout, logger):
"severity": "INFO",
"time": "2023-04-01T08:00:00.000000Z",
}
assert expected == msg
assert msg == expected


def test_error(stdout, logger):
Expand Down Expand Up @@ -57,7 +57,7 @@ def test_error(stdout, logger):
"stack_trace": "oh noes\nTraceback blabla",
"time": "2023-04-01T08:00:00.000000Z",
}
assert expected == msg
assert msg == expected


def test_service_context_default(stdout, logger):
Expand Down Expand Up @@ -144,4 +144,4 @@ def test_extra_labels(stdout, logger):
"test4": {"foo": "bar"},
"test5": {"date": "datetime.date(2023, 1, 1)"},
}
assert expected == msg
assert msg == expected