diff --git a/src/clp_logging/auto_generated_kv_pairs_utils.py b/src/clp_logging/auto_generated_kv_pairs_utils.py index c047d4a..6a6cb8c 100644 --- a/src/clp_logging/auto_generated_kv_pairs_utils.py +++ b/src/clp_logging/auto_generated_kv_pairs_utils.py @@ -75,7 +75,7 @@ def generate( return self._buf -def create_loglib_generated_log_event( +def create_loglib_generated_log_event_as_auto_generated_kv_pairs( timestamp: int, timezone: Optional[str], msg: str ) -> Dict[str, Any]: """ diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py index c387d5b..746827f 100644 --- a/src/clp_logging/handlers.py +++ b/src/clp_logging/handlers.py @@ -20,7 +20,7 @@ from clp_logging.auto_generated_kv_pairs_utils import ( AutoGeneratedKeyValuePairsBuffer, - create_loglib_generated_log_event, + create_loglib_generated_log_event_as_auto_generated_kv_pairs, ) from clp_logging.protocol import ( BYTE_ORDER, @@ -963,5 +963,5 @@ def _log_level_timeout_callback(self, msg: str) -> None: """ curr_ts: int = floor(time.time() * 1000) self._serialize_kv_pair_log_event( - create_loglib_generated_log_event(curr_ts, self._tz, msg), {} + create_loglib_generated_log_event_as_auto_generated_kv_pairs(curr_ts, self._tz, msg), {} ) diff --git a/tests/test_handlers.py b/tests/test_handlers.py index 22ec377..17ac958 100644 --- a/tests/test_handlers.py +++ b/tests/test_handlers.py @@ -28,6 +28,7 @@ LEVEL_KEY, LEVEL_NAME_KEY, LEVEL_NO_KEY, + LOGLIB_GENERATED_MSG_KEY, SOURCE_CONTEXT_KEY, SOURCE_CONTEXT_LINE_KEY, SOURCE_CONTEXT_PATH_KEY, @@ -947,70 +948,6 @@ class TestClpKeyValuePairHandlerLogLevelTimeoutBase(TestClpKeyValuePairLoggingBa _loglevel_timeout: CLPLogLevelTimeout - # override - def _close(self) -> None: - if self._loglevel_timeout.hard_timeout_thread is not None: - self._loglevel_timeout.hard_timeout_thread.cancel() - if self._loglevel_timeout.soft_timeout_thread is not None: - self._loglevel_timeout.soft_timeout_thread.cancel() - super()._close() - - def _setup_handler(self) -> None: - raise NotImplementedError("`_setup_handler` must be implemented by derived testers") - - def _test_timeout( - self, - loglevels: List[int], - log_delay: float, - hard_deltas: Dict[int, int], - soft_deltas: Dict[int, int], - expected_timeout_deltas: List[float], - ) -> None: - """ - Mirror of `TestCLPLogLevelTimeoutBase._test_timeout`. - - :param loglevels: Generate one log for each entry at given log level. - :param log_delay: (fraction of) seconds to sleep between `logger.log` calls. - :param hard_deltas: Deltas in ms to initialize `CLPLogLevelTimeout`. - :param soft_deltas: Deltas in ms to initialize `CLPLogLevelTimeout`. - :param expected_timeout_deltas: Expected elapsed time from start of logging to timeout `i`. - """ - - expected_timeout_count: int = len(expected_timeout_deltas) - # typing for multiprocess.Synchronized* has open issues - # https://github.com/python/typeshed/issues/8799 - # TODO: when the issue is closed we should update the typing here - timeout_ts: SynchronizedArray[c_double] = Array(c_double, [0.0] * expected_timeout_count) - timeout_count: Synchronized[int] = cast("Synchronized[int]", Value(c_int, 0)) - - def timeout_fn() -> None: - nonlocal timeout_ts - nonlocal timeout_count - timeout_ts[timeout_count.value] = c_double(time.time()) - timeout_count.value += 1 - - self._loglevel_timeout = CLPLogLevelTimeout(timeout_fn, hard_deltas, soft_deltas) - self._setup_handler() - - start_ts: float = time.time() - for i, loglevel in enumerate(loglevels): - self._log(loglevel, {"idx": i}) - time.sleep(log_delay) - - # We want sleep long enough so that the final expected timeout can occur, but also ensure - # `time.sleep` receives a non-negative number. - time_to_last_timeout: float = max(0, (start_ts + expected_timeout_deltas[-1]) - time.time()) - time.sleep(time_to_last_timeout) - - self._close_and_compare() - self.assertEqual(timeout_count.value, expected_timeout_count) - for i in range(expected_timeout_count): - self.assertAlmostEqual( - timeout_ts[i], # type: ignore - start_ts + expected_timeout_deltas[i], - delta=ASSERT_TIMESTAMP_DELTA_S, - ) - def test_pushback_soft_timeout(self) -> None: log_delay: float = LOG_DELAY_S soft_delta_s: float = log_delay * 2 @@ -1076,6 +1013,95 @@ def test_end_timeout(self) -> None: ], ) + def test_bad_timeout_dicts(self) -> None: + self._loglevel_timeout = CLPLogLevelTimeout(lambda: None, {}, {}) + self._setup_handler() + loglevel: int = logging.DEBUG + self._log(loglevel, {}) + self._close() + + deserialized_log_events: List[Dict[str, Any]] = self._read_clp() + self.assertEqual(len(deserialized_log_events), 3) + + self.assertEqual( + deserialized_log_events[0][AUTO_GENERATED_KV_PAIRS_KEY][LOGLIB_GENERATED_MSG_KEY], + ( + f" {WARN_PREFIX.lstrip()} log level {loglevel} not in self.hard_timeout_deltas; " + "defaulting to _HARD_TIMEOUT_DELTAS[logging.INFO].\n" + ), + ) + self.assertEqual( + deserialized_log_events[1][AUTO_GENERATED_KV_PAIRS_KEY][LOGLIB_GENERATED_MSG_KEY], + ( + f" {WARN_PREFIX.lstrip()} log level {loglevel} not in self.soft_timeout_deltas; " + "defaulting to _SOFT_TIMEOUT_DELTAS[logging.INFO].\n" + ), + ) + + # override + def _close(self) -> None: + if self._loglevel_timeout.hard_timeout_thread is not None: + self._loglevel_timeout.hard_timeout_thread.cancel() + if self._loglevel_timeout.soft_timeout_thread is not None: + self._loglevel_timeout.soft_timeout_thread.cancel() + super()._close() + + def _setup_handler(self) -> None: + raise NotImplementedError("`_setup_handler` must be implemented by derived testers") + + def _test_timeout( + self, + loglevels: List[int], + log_delay: float, + hard_deltas: Dict[int, int], + soft_deltas: Dict[int, int], + expected_timeout_deltas: List[float], + ) -> None: + """ + Mirror of `TestCLPLogLevelTimeoutBase._test_timeout`. + + :param loglevels: Generate one log for each entry at given log level. + :param log_delay: (fraction of) seconds to sleep between `logger.log` calls. + :param hard_deltas: Deltas in ms to initialize `CLPLogLevelTimeout`. + :param soft_deltas: Deltas in ms to initialize `CLPLogLevelTimeout`. + :param expected_timeout_deltas: Expected elapsed time from start of logging to timeout `i`. + """ + + expected_timeout_count: int = len(expected_timeout_deltas) + # typing for multiprocess.Synchronized* has open issues + # https://github.com/python/typeshed/issues/8799 + # TODO: when the issue is closed we should update the typing here + timeout_ts: SynchronizedArray[c_double] = Array(c_double, [0.0] * expected_timeout_count) + timeout_count: Synchronized[int] = cast("Synchronized[int]", Value(c_int, 0)) + + def timeout_fn() -> None: + nonlocal timeout_ts + nonlocal timeout_count + timeout_ts[timeout_count.value] = c_double(time.time()) + timeout_count.value += 1 + + self._loglevel_timeout = CLPLogLevelTimeout(timeout_fn, hard_deltas, soft_deltas) + self._setup_handler() + + start_ts: float = time.time() + for i, loglevel in enumerate(loglevels): + self._log(loglevel, {"idx": i}) + time.sleep(log_delay) + + # We want sleep long enough so that the final expected timeout can occur, but also ensure + # `time.sleep` receives a non-negative number. + time_to_last_timeout: float = max(0, (start_ts + expected_timeout_deltas[-1]) - time.time()) + time.sleep(time_to_last_timeout) + + self._close_and_compare() + self.assertEqual(timeout_count.value, expected_timeout_count) + for i in range(expected_timeout_count): + self.assertAlmostEqual( + timeout_ts[i], # type: ignore + start_ts + expected_timeout_deltas[i], + delta=ASSERT_TIMESTAMP_DELTA_S, + ) + class TestClpKeyValuePairStreamingHandlerRaw(TestClpKeyValuePairHandlerBase): """