feat: Add ClpKeyValuePairStreamHandler to support logging dictionary type log events into CLP key-value pair IR format. #46

Open
wants to merge 10 commits into base: main
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -14,7 +14,7 @@ readme = "README.md"
requires-python = ">=3.7"
dependencies = [
"backports.zoneinfo >= 0.2.1; python_version < '3.9'",
-    "clp-ffi-py >= 0.0.11",
+    "clp-ffi-py == 0.1.0b1",
Reviewer comment (Member):
Is the plan to release clp-ffi-py and update the version here before merging this PR?

"typing-extensions >= 3.7.4",
"tzlocal == 5.1; python_version < '3.8'",
"tzlocal >= 5.2; python_version >= '3.8'",
94 changes: 94 additions & 0 deletions src/clp_logging/auto_generated_kv_pairs_utils.py
@@ -0,0 +1,94 @@
import logging
from typing import Any, Dict, Optional

ZONED_TIMESTAMP_KEY: str = "zoned_timestamp"
ZONED_TIMESTAMP_UTC_EPOCH_MS_KEY: str = "utc_epoch_ms"
Reviewer comment (Member):
  • Aren't all epoch timestamps UTC?
  • How about unix_timestamp_ms?
    • Technically, ms is ambiguous since it could mean Ms or ms due to capitalization, so I would rather use millisecs for the least ambiguity; but I guess that would be a larger change across our codebase.

ZONED_TIMESTAMP_TZ_KEY: str = "timezone"
Reviewer comment (Member) on lines +4 to +6:

Do we need the ZONED_? Especially considering the user may pass in None for the timezone?

Reviewer comment (Member):
Don't we want to use UTC offset rather than TZ_ID for all the reasons we discussed earlier this year?


LEVEL_KEY: str = "level"
LEVEL_NO_KEY: str = "no"
LEVEL_NAME_KEY: str = "name"

SOURCE_CONTEXT_KEY: str = "source_context"
SOURCE_CONTEXT_PATH_KEY: str = "path"
SOURCE_CONTEXT_LINE_KEY: str = "line"

LOGLIB_GENERATED_MSG_KEY: str = "loglib_generated_msg"


class AutoGeneratedKeyValuePairsBuffer:
"""
A reusable buffer for creating auto-generated key-value pairs for log
events.
Reviewer comment (Member) on lines +21 to +22:

Suggested change
A reusable buffer for creating auto-generated key-value pairs for log
events.
A reusable buffer for auto-generated key-value pairs.


This buffer maintains a predefined dictionary structure for common metadata
fields, allowing efficient reuse without creating new dictionaries for each
log event.
Reviewer comment (Member) on lines +24 to +26:

Suggested change
This buffer maintains a predefined dictionary structure for common metadata
fields, allowing efficient reuse without creating new dictionaries for each
log event.
This buffer maintains a predefined dictionary for common metadata fields,
to enable efficient reuse without creating new dictionaries for each log
event.

"""

def __init__(self) -> None:
self._buf: Dict[str, Any] = {
ZONED_TIMESTAMP_KEY: {
ZONED_TIMESTAMP_UTC_EPOCH_MS_KEY: None,
ZONED_TIMESTAMP_TZ_KEY: None,
},
LEVEL_KEY: {
LEVEL_NO_KEY: None,
LEVEL_NAME_KEY: None,
},
SOURCE_CONTEXT_KEY: {
SOURCE_CONTEXT_PATH_KEY: None,
SOURCE_CONTEXT_LINE_KEY: None,
},
}

def generate(
self, timestamp: int, timezone: Optional[str], record: logging.LogRecord
) -> Dict[str, Any]:
"""
Generated auto-generated key-value pairs by populating the underlying
buffer with the given log event metadata.
Reviewer comment (Member) on lines +49 to +50:

Suggested change
Generated auto-generated key-value pairs by populating the underlying
buffer with the given log event metadata.
Generates the auto-generated key-value pairs by populating the
underlying buffer with the given log event metadata.


:param timestamp: The Unix epoch timestamp in millisecond of the log
event.
:param timezone: The timezone of the log event, or None if not
applicable.
Reviewer comment (Member) on lines +52 to +55:

Suggested change
:param timestamp: The Unix epoch timestamp in millisecond of the log
event.
:param timezone: The timezone of the log event, or None if not
applicable.
:param timestamp: The log event's millisecond Unix timestamp.
:param timezone: The timezone of the log event, or `None` if UTC.

:param record: The LogRecord containing metadata for the log event.
:return: The populated underlying buffer as the auto-generated key-value
pairs.
"""

self._buf[ZONED_TIMESTAMP_KEY][ZONED_TIMESTAMP_UTC_EPOCH_MS_KEY] = timestamp
self._buf[ZONED_TIMESTAMP_KEY][ZONED_TIMESTAMP_TZ_KEY] = timezone

# NOTE: We don't serialize all the metadata given by `record`. Currently, we only add the
# following metadata into auto-generated kv pairs:
# - log level
# - source context
Reviewer comment (Member) on lines +64 to +67:

Suggested change
# NOTE: We don't serialize all the metadata given by `record`. Currently, we only add the
# following metadata into auto-generated kv pairs:
# - log level
# - source context
# NOTE: We don't add all the metadata contained in `record`. Instead, we only add the
# following fields:
# - Log level
# - Source context


self._buf[LEVEL_KEY][LEVEL_NO_KEY] = record.levelno
self._buf[LEVEL_KEY][LEVEL_NAME_KEY] = record.levelname

self._buf[SOURCE_CONTEXT_KEY][SOURCE_CONTEXT_PATH_KEY] = record.pathname
self._buf[SOURCE_CONTEXT_KEY][SOURCE_CONTEXT_LINE_KEY] = record.lineno

return self._buf


def create_loglib_generated_log_event_as_auto_generated_kv_pairs(
timestamp: int, timezone: Optional[str], msg: str
) -> Dict[str, Any]:
"""
:param timestamp: The Unix epoch timestamp in millisecond of the log event.
:param timezone: The timezone of the log event, or None if not applicable.
Reviewer comment (Member) on lines +82 to +83:

Suggested change
:param timestamp: The Unix epoch timestamp in millisecond of the log event.
:param timezone: The timezone of the log event, or None if not applicable.
:param timestamp: The log event's millisecond Unix timestamp.
:param timezone: The timezone of the log event, or `None` if UTC.

:param msg: The log message generated by the logging library.
:return: The auto-generated key-value pairs that represents a log event generated by the logging
library itself.
Reviewer comment (Member) on lines +85 to +86:

Suggested change
:return: The auto-generated key-value pairs that represents a log event generated by the logging
library itself.
:return: The auto-generated key-value pairs that represent a log event generated by the logging
library itself.

"""
return {
ZONED_TIMESTAMP_KEY: {
ZONED_TIMESTAMP_UTC_EPOCH_MS_KEY: timestamp,
ZONED_TIMESTAMP_TZ_KEY: timezone,
},
LOGLIB_GENERATED_MSG_KEY: msg,
}
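For readers skimming the diff, the shape of the dict returned by `create_loglib_generated_log_event_as_auto_generated_kv_pairs` can be sketched in plain Python. The constants are copied from the new module above; the shortened helper name `loglib_event` is used here purely for illustration:

```python
from typing import Any, Dict, Optional

# Key constants copied from auto_generated_kv_pairs_utils.py in this PR.
ZONED_TIMESTAMP_KEY = "zoned_timestamp"
ZONED_TIMESTAMP_UTC_EPOCH_MS_KEY = "utc_epoch_ms"
ZONED_TIMESTAMP_TZ_KEY = "timezone"
LOGLIB_GENERATED_MSG_KEY = "loglib_generated_msg"


def loglib_event(timestamp: int, timezone: Optional[str], msg: str) -> Dict[str, Any]:
    # Mirrors create_loglib_generated_log_event_as_auto_generated_kv_pairs:
    # a zoned timestamp plus the library-generated message.
    return {
        ZONED_TIMESTAMP_KEY: {
            ZONED_TIMESTAMP_UTC_EPOCH_MS_KEY: timestamp,
            ZONED_TIMESTAMP_TZ_KEY: timezone,
        },
        LOGLIB_GENERATED_MSG_KEY: msg,
    }


event = loglib_event(1700000000000, "America/Toronto", "soft timeout")
print(event[ZONED_TIMESTAMP_KEY][ZONED_TIMESTAMP_UTC_EPOCH_MS_KEY])  # 1700000000000
```

The same nested structure is what `AutoGeneratedKeyValuePairsBuffer` pre-allocates and reuses, avoiding a fresh dictionary per log event.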
181 changes: 177 additions & 4 deletions src/clp_logging/handlers.py
@@ -3,19 +3,25 @@
import socket
import sys
import time
import warnings
from abc import ABCMeta, abstractmethod
from math import floor
from pathlib import Path
from queue import Empty, Queue
from signal import SIGINT, signal, SIGTERM
from threading import Thread, Timer
from types import FrameType
-from typing import Callable, ClassVar, Dict, IO, Optional, Tuple, Union
+from typing import Any, Callable, ClassVar, Dict, IO, Optional, Tuple, Union

import tzlocal
-from clp_ffi_py.ir import FourByteEncoder
+from clp_ffi_py.ir import FourByteEncoder, Serializer
from clp_ffi_py.utils import serialize_dict_to_msgpack
from zstandard import FLUSH_FRAME, ZstdCompressionWriter, ZstdCompressor

from clp_logging.auto_generated_kv_pairs_utils import (
AutoGeneratedKeyValuePairsBuffer,
create_loglib_generated_log_event_as_auto_generated_kv_pairs,
)
from clp_logging.protocol import (
BYTE_ORDER,
EOF_CHAR,
@@ -31,6 +37,8 @@

DEFAULT_LOG_FORMAT: str = " %(levelname)s %(name)s %(message)s"
WARN_PREFIX: str = " [WARN][clp_logging]"
AUTO_GENERATED_KV_PAIRS_KEY: str = "auto_generated_kv_pairs"
USER_GENERATED_KV_PAIRS_KEY: str = "user_generated_kv_pairs"


def _init_timeinfo(fmt: Optional[str], tz: Optional[str]) -> Tuple[str, str]:
@@ -216,11 +224,11 @@ def __init__(
self.timeout_fn: Callable[[], None] = timeout_fn
self.next_hard_timeout_ts: int = ULONG_MAX
self.min_soft_timeout_delta: int = ULONG_MAX
-        self.ostream: Optional[Union[ZstdCompressionWriter, IO[bytes]]] = None
+        self.ostream: Optional[Union[ZstdCompressionWriter, IO[bytes], Serializer]] = None
self.hard_timeout_thread: Optional[Timer] = None
self.soft_timeout_thread: Optional[Timer] = None

-    def set_ostream(self, ostream: Union[ZstdCompressionWriter, IO[bytes]]) -> None:
+    def set_ostream(self, ostream: Union[ZstdCompressionWriter, IO[bytes], Serializer]) -> None:
self.ostream = ostream

def timeout(self) -> None:
@@ -792,3 +800,168 @@ def __init__(
super().__init__(
open(fpath, mode), enable_compression, timestamp_format, timezone, loglevel_timeout
)


class ClpKeyValuePairStreamHandler(logging.Handler):
"""
A custom logging handler that processes log events containing key-value
pairs and serializes them into the CLP key-value pair IR format.
Reviewer comment (Member) on lines +807 to +808:

Suggested change
A custom logging handler that processes log events containing key-value
pairs and serializes them into the CLP key-value pair IR format.
A custom logging handler that serializes key-value pair log events into the
CLP key-value pair IR format.


Differences from `logging.StreamHandler`:
- Expects log events (`logging.LogRecord`) to include key-value pairs represented as a Python
dictionary.
Reviewer comment (Member) on lines +811 to +812:

Is this a difference since StreamHandler accepts a LogRecord and LogRecord can have the msg be an arbitrary object?

- Serializes the key-value pairs into the CLP key-value pair IR format before writing to the
stream.

Rules for key-value pair representation:
- Key:
- Must be of type `str`.
- Value:
- Must be one of the following types:
- Primitive types: `int`, `float`, `str`, `bool`, or `None`.
- Arrays:
- May contain primitive values, dictionaries, or nested arrays.
- Can be empty.
- Dictionaries:
- Must adhere to the same key-value rules.
- Can be empty.

Reviewer comment (Member) on lines +816 to +828:

NTS: This needs refactoring.

:param stream: A writable byte output stream to which the handler will write the serialized IR
byte sequences.
Reviewer comment (Member) on lines +829 to +830:

Suggested change
:param stream: A writable byte output stream to which the handler will write the serialized IR
byte sequences.
:param stream: A writable byte output stream to which the handler will write the serialized IR
byte sequences.

:param enable_compression: Whether to compress the serialized IR byte sequences using zstd.
:param loglevel_timeout: Customized timeout configuration.
"""

def __init__(
self,
stream: IO[bytes],
enable_compression: bool = True,
timezone: Optional[str] = None,
loglevel_timeout: Optional[CLPLogLevelTimeout] = None,
) -> None:
super().__init__()

self._enable_compression: bool = enable_compression
self._tz: Optional[str] = timezone
self._loglevel_timeout: Optional[CLPLogLevelTimeout] = loglevel_timeout
self._serializer: Optional[Serializer] = None
self._formatter: Optional[logging.Formatter] = None
self._ostream: IO[bytes] = stream

self._auto_generated_kv_pairs_buf: AutoGeneratedKeyValuePairsBuffer = (
AutoGeneratedKeyValuePairsBuffer()
)

self._init_new_serializer(stream)

# override
def setFormatter(self, fmt: Optional[logging.Formatter]) -> None:
if fmt is None:
return
warnings.warn(
f"Formatter is currently not supported by {self.__class__.__name__}",
category=RuntimeWarning,
)
self._formatter = fmt

# override
def emit(self, record: logging.LogRecord) -> None:
"""
Overrides `logging.Handler.emit` to ensure `logging.Handler.handleError`
is always called and avoid requiring a `logging.LogRecord` to call
internal writing functions.
Reviewer comment (Member) on lines +871 to +872:

I don't understand the "avoid requiring...`" part.


:param record: The log event to serialize.
"""
try:
self._write(record)
except Exception:
self.handleError(record)

# Added to `logging.StreamHandler` in python 3.7
Reviewer comment (Member):
Suggested change
# Added to `logging.StreamHandler` in python 3.7
# Added to `logging.StreamHandler` in Python 3.7

# override
def setStream(self, stream: IO[bytes]) -> Optional[IO[bytes]]:
if stream is self._ostream:
return None
old_stream: IO[bytes] = self._ostream
self._ostream = stream
# TODO: The following call will close the old stream. However, `logging.StreamHandler`'s
# implementation will only flush the stream but leave it opened. To support this behaviour,
# we need `clp_ffi_py.ir.Serializer` to allow closing the serializer without closing the
# underlying output stream.
self._init_new_serializer(stream)
return old_stream

# override
def close(self) -> None:
if self._serializer is None:
Reviewer comment (Member):
We can use _is_closed() here, no?

return
if self._loglevel_timeout:
self._loglevel_timeout.timeout()
# NOTE: Closing the serializer will ensure that any buffered results are flushed and the
# underlying output stream is properly closed.
self._serializer.close()
self._serializer = None
super().close()

def _is_closed(self) -> bool:
return self._serializer is None

def _init_new_serializer(self, stream: IO[bytes]) -> None:
"""
Initializes a new serializer that writes to the given stream.

:param stream: The stream that the underlying serializer will write to.
"""
cctx: ZstdCompressor = ZstdCompressor()
self._serializer = Serializer(
Reviewer comment (Member):
  • Shouldn't we close the existing Serializer before replacing it?
  • We should note that we close the stream in the docstring, right?

cctx.stream_writer(stream) if self._enable_compression else stream
)

def _write(self, record: logging.LogRecord) -> None:
"""
Writes the log event into the underlying serializer.

:param record: The log event to serialize.
:raise IOError: If the handler has been already closed.
:raise TypeError: If `record.msg` is not a Python dictionary.
"""
if self._is_closed():
raise IOError("The handler has been closed.")
Reviewer comment (Member):
Why IOError instead of RuntimeError used in other handlers?

Reviewer comment (Member):
Suggested change
raise IOError("The handler has been closed.")
raise IOError("Stream already closed.")


if not isinstance(record.msg, dict):
raise TypeError("The log msg must be a valid Python dictionary.")

curr_ts: int = floor(time.time() * 1000)

if self._loglevel_timeout is not None:
self._loglevel_timeout.update(record.levelno, curr_ts, self._log_level_timeout_callback)

self._serialize_kv_pair_log_event(
self._auto_generated_kv_pairs_buf.generate(curr_ts, self._tz, record), record.msg
)

def _serialize_kv_pair_log_event(
self, auto_generated_kv_pairs: Dict[str, Any], user_generated_kv_pairs: Dict[str, Any]
) -> None:
"""
:param auto_generated_kv_pairs: A dict of auto generated kv pairs.
:param user_generated_kv_pairs: A dict of user generated kv pairs.
"""
log_event: Dict[str, Any] = {
AUTO_GENERATED_KV_PAIRS_KEY: auto_generated_kv_pairs,
USER_GENERATED_KV_PAIRS_KEY: user_generated_kv_pairs,
}
Reviewer comment (Member) on lines +951 to +954:

Hm, I would've thought that before we have support for auto-generated kv-pairs in clp-ffi-py, we would make everything a user-generated kv-pair, except auto-generated kv-pairs would be prefixed with @. The current approach makes it so that the user has no way of knowing how to query their logs unless they know what we're doing here, right?

assert self._serializer is not None
Reviewer comment (Member):
Why assert here but not in _log_level_timeout_callback?

self._serializer.serialize_log_event_from_msgpack_map(serialize_dict_to_msgpack(log_event))

def _log_level_timeout_callback(self, msg: str) -> None:
"""
Callback for `CLPLogLevelTimeout` to log internal errors.

:param msg: The message sent from `CLPLogLevelTimeout`.
"""
curr_ts: int = floor(time.time() * 1000)
self._serialize_kv_pair_log_event(
create_loglib_generated_log_event_as_auto_generated_kv_pairs(curr_ts, self._tz, msg), {}
)
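The handler docstring above enumerates which key-value shapes are accepted (string keys; primitive, list, or nested-dict values). A stdlib-only validator sketch — hypothetical, not part of this PR or of clp_logging — makes those rules concrete:

```python
from typing import Any, Dict


def is_valid_kv_value(value: Any) -> bool:
    # Primitive types: int, float, str, bool, or None.
    if value is None or isinstance(value, (bool, int, float, str)):
        return True
    # Arrays: may contain primitives, dicts, or nested arrays; may be empty.
    if isinstance(value, list):
        return all(is_valid_kv_value(item) for item in value)
    # Dictionaries: must adhere to the same key-value rules; may be empty.
    if isinstance(value, dict):
        return is_valid_kv_pairs(value)
    return False


def is_valid_kv_pairs(kv_pairs: Dict[Any, Any]) -> bool:
    # Every key must be a str; every value must satisfy the value rules.
    return all(
        isinstance(key, str) and is_valid_kv_value(val)
        for key, val in kv_pairs.items()
    )


print(is_valid_kv_pairs({"user": "alice", "attempts": [1, 2], "ctx": {"ok": True}}))  # True
print(is_valid_kv_pairs({1: "non-str key"}))  # False
```

The actual handler does not validate this way; it relies on msgpack serialization to reject unsupported values, so this sketch only illustrates the documented contract.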
9 changes: 8 additions & 1 deletion tests/__init__.py
@@ -1,7 +1,11 @@
import unittest
from typing import Iterable, Optional, Union

-from tests.test_handlers import TestCLPBase, TestCLPSegmentStreamingBase
+from tests.test_handlers import (
+    TestCLPBase,
+    TestClpKeyValuePairLoggingBase,
+    TestCLPSegmentStreamingBase,
+)


def add_tests(suite: unittest.TestSuite, loader: unittest.TestLoader, test_class: type) -> None:
@@ -35,4 +39,7 @@ def load_tests(
for seg_test_class in TestCLPSegmentStreamingBase.__subclasses__():
add_tests(suite, loader, seg_test_class)

for kv_pair_handler_test_class in TestClpKeyValuePairLoggingBase.__subclasses__():
add_tests(suite, loader, kv_pair_handler_test_class)

return suite
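Since the new tests exercise dictionary log events end to end, it may help to see how a dict passed to `logger.info()` reaches a handler unchanged as `record.msg`. The following stdlib-only sketch (the `CapturingHandler` class is hypothetical, not from this PR) mimics the type check that `ClpKeyValuePairStreamHandler._write` performs:

```python
import logging
from typing import Any, Dict, List


class CapturingHandler(logging.Handler):
    """Stdlib-only stand-in that captures record.msg, mimicking how
    ClpKeyValuePairStreamHandler consumes dictionary log events."""

    def __init__(self) -> None:
        super().__init__()
        self.captured: List[Dict[str, Any]] = []

    def emit(self, record: logging.LogRecord) -> None:
        # Mirrors the PR's type check: the msg must be a dict.
        if not isinstance(record.msg, dict):
            raise TypeError("The log msg must be a valid Python dictionary.")
        self.captured.append(record.msg)


logger = logging.getLogger("kv_demo")
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo output away from the root logger
handler = CapturingHandler()
logger.addHandler(handler)

# Logging a dict: the dict object arrives intact as record.msg.
logger.info({"event": "login", "user": "alice", "attempts": [1, 2]})
print(handler.captured[0]["user"])  # alice
```

This is the contract the first reviewer comment on lines +811 to +812 touches on: `logging.LogRecord.msg` may be any object, so the handler restricts it to dicts rather than extending the record format.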