From 5b6761473bcfc0aef04bee34cc6769fea892a5bb Mon Sep 17 00:00:00 2001
From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com>
Date: Thu, 15 Dec 2022 11:02:13 +0800
Subject: [PATCH] [cherry-pick] [branch-2.10] Better Python garbage collection management for C++-owned objects (#16535) (#18921)

Co-authored-by: Zac Bentley
---
 .gitignore                                  |  1 +
 pulsar-client-cpp/python/pulsar/__init__.py | 17 ++++-
 pulsar-client-cpp/python/pulsar_test.py     | 32 ++++++++++
 pulsar-client-cpp/python/src/config.cc      | 70 +++++++--------------
 pulsar-client-cpp/python/src/utils.h        | 26 ++++++++
 5 files changed, 98 insertions(+), 48 deletions(-)

diff --git a/.gitignore b/.gitignore
index cfaf6c8ab075b..c1ed7e2710bcf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -46,6 +46,7 @@ target/
 
 # Python
 *.pyc
+.python-version
 
 # Perf tools
 *.hgrm
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index 3773702937088..67b5788e340c7 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -437,8 +437,7 @@ def __init__(self, service_url,
         conf.concurrent_lookup_requests(concurrent_lookup_requests)
         if log_conf_file_path:
             conf.log_conf_file_path(log_conf_file_path)
-        if logger:
-            conf.set_logger(logger)
+        conf.set_logger(self._prepare_logger(logger) if logger else None)
         if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
             conf.use_tls(True)
         if tls_trust_certs_file_path:
@@ -450,6 +449,20 @@ def __init__(self, service_url,
         self._client = _pulsar.Client(service_url, conf)
         self._consumers = []
 
+    @staticmethod
+    def _prepare_logger(logger):
+        """Wrap `logger` in a `log(level, message)` callable for the C++ client."""
+        import logging
+        def log(level, message):
+            # Turn off logging.logThreads for this call only; restore it even if the logger raises.
+            old_threads = logging.logThreads
+            logging.logThreads = False
+            try:
+                logger.log(logging.getLevelName(level), message)
+            finally:
+                logging.logThreads = old_threads
+        return log
+
     def create_producer(self, topic,
                         producer_name=None,
                         schema=schema.BytesSchema(),
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index 8e1c11342a4f8..4d71c189ae31c 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -19,6 +19,8 @@
 #
 
 
+import threading
+import logging
 from unittest import TestCase, main
 import time
 import os
@@ -1228,6 +1230,36 @@ def test_json_schema_encode(self):
         second_encode = schema.encode(record)
         self.assertEqual(first_encode, second_encode)
 
+    def test_logger_thread_leaks(self):
+        """Connecting with a custom logger must not leave extra Python threads behind."""
+        def _do_connect(close):
+            logger = logging.getLogger(str(threading.current_thread().ident))
+            logger.setLevel(logging.INFO)
+            client = pulsar.Client(
+                service_url="pulsar://localhost:6650",
+                io_threads=4,
+                message_listener_threads=4,
+                operation_timeout_seconds=1,
+                log_conf_file_path=None,
+                authentication=None,
+                logger=logger,
+            )
+            client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test")
+            if close:
+                client.close()
+
+        for should_close in (True, False):
+            self.assertEqual(threading.active_count(), 1, "Explicit close: {}; baseline is 1 thread".format(should_close))
+            _do_connect(should_close)
+            self.assertEqual(threading.active_count(), 1, "Explicit close: {}; synchronous connect doesn't leak threads".format(should_close))
+            threads = []
+            for _ in range(10):
+                threads.append(threading.Thread(target=_do_connect, args=(should_close,)))
+                threads[-1].start()
+            for thread in threads:
+                thread.join()
+            self.assertEqual(threading.active_count(), 1, "Explicit close: {}; threaded connect in parallel doesn't leak threads".format(should_close))
+
     def _check_value_error(self, fun):
         with self.assertRaises(ValueError):
             fun()
diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc
index 2dee1a1183d81..25f4ca51087a9 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -93,27 +93,22 @@ static ReaderConfiguration& ReaderConfiguration_setCryptoKeyReader(ReaderConfigu
     return conf;
 }
 
-class LoggerWrapper : public Logger {
-    PyObject* const _pyLogger;
-    const int _pythonLogLevel;
+// Logger that forwards each message to a captive Python callable as (level name, message).
+class LoggerWrapper : public Logger, public CaptivePythonObjectMixin {
     const std::unique_ptr<Logger> _fallbackLogger;
 
-    static constexpr int _getLogLevelValue(Level level) { return 10 + (level * 10); }
-
    public:
-    LoggerWrapper(PyObject* pyLogger, int pythonLogLevel, Logger* fallbackLogger)
-        : _pyLogger(pyLogger), _pythonLogLevel(pythonLogLevel), _fallbackLogger(fallbackLogger) {
-        Py_XINCREF(_pyLogger);
-    }
+    LoggerWrapper(PyObject* pyLogger, Logger* fallbackLogger)
+        : CaptivePythonObjectMixin(pyLogger), _fallbackLogger(fallbackLogger) {}
 
     LoggerWrapper(const LoggerWrapper&) = delete;
     LoggerWrapper(LoggerWrapper&&) noexcept = delete;
     LoggerWrapper& operator=(const LoggerWrapper&) = delete;
     LoggerWrapper& operator=(LoggerWrapper&&) = delete;
 
-    virtual ~LoggerWrapper() { Py_XDECREF(_pyLogger); }
-
-    bool isEnabled(Level level) { return _getLogLevelValue(level) >= _pythonLogLevel; }
+    bool isEnabled(Level level) {
+        return true;  // Python loggers are always enabled; they decide internally whether or not to log.
+    }
 
     void log(Level level, int line, const std::string& message) {
         if (!Py_IsInitialized()) {
@@ -121,66 +116,49 @@ class LoggerWrapper : public Logger {
             _fallbackLogger->log(level, line, message);
         } else {
             PyGILState_STATE state = PyGILState_Ensure();
-
+            // Save any pending Python error so the logging call below cannot clobber it.
+            PyObject *type, *value, *traceback;
+            PyErr_Fetch(&type, &value, &traceback);
             try {
                 switch (level) {
                     case Logger::LEVEL_DEBUG:
-                        py::call_method<void>(_pyLogger, "debug", message.c_str());
+                        py::call<void>(_captive, "DEBUG", message.c_str());
                         break;
                     case Logger::LEVEL_INFO:
-                        py::call_method<void>(_pyLogger, "info", message.c_str());
+                        py::call<void>(_captive, "INFO", message.c_str());
                         break;
                     case Logger::LEVEL_WARN:
-                        py::call_method<void>(_pyLogger, "warning", message.c_str());
+                        py::call<void>(_captive, "WARNING", message.c_str());
                         break;
                     case Logger::LEVEL_ERROR:
-                        py::call_method<void>(_pyLogger, "error", message.c_str());
+                        py::call<void>(_captive, "ERROR", message.c_str());
                         break;
                 }
-
             } catch (const py::error_already_set& e) {
+                PyErr_Print();
                 _fallbackLogger->log(level, line, message);
             }
-
+            PyErr_Restore(type, value, traceback);
             PyGILState_Release(state);
         }
     }
 };
 
-class LoggerWrapperFactory : public LoggerFactory {
+// Factory whose LoggerWrapper instances all forward to the same captive Python callable.
+class LoggerWrapperFactory : public LoggerFactory, public CaptivePythonObjectMixin {
     std::unique_ptr<LoggerFactory> _fallbackLoggerFactory{new ConsoleLoggerFactory};
-    PyObject* _pyLogger;
-    Optional<int> _pythonLogLevel{Optional<int>::empty()};
-
-    void initializePythonLogLevel() {
-        PyGILState_STATE state = PyGILState_Ensure();
-
-        try {
-            int level = py::call_method<int>(_pyLogger, "getEffectiveLevel");
-            _pythonLogLevel = Optional<int>::of(level);
-        } catch (const py::error_already_set& e) {
-            // Failed to get log level from _pyLogger, set it to empty to fallback to _fallbackLogger
-            _pythonLogLevel = Optional<int>::empty();
-        }
-
-        PyGILState_Release(state);
-    }
 
    public:
-    LoggerWrapperFactory(py::object pyLogger) {
-        _pyLogger = pyLogger.ptr();
-        Py_XINCREF(_pyLogger);
-        initializePythonLogLevel();
-    }
-
-    virtual ~LoggerWrapperFactory() { Py_XDECREF(_pyLogger); }
+    LoggerWrapperFactory(py::object pyLogger) : CaptivePythonObjectMixin(pyLogger.ptr()) {}
 
     Logger* getLogger(const std::string& fileName) {
         const auto fallbackLogger = _fallbackLoggerFactory->getLogger(fileName);
-        if (_pythonLogLevel.is_present()) {
-            return new LoggerWrapper(_pyLogger, _pythonLogLevel.value(), fallbackLogger);
-        } else {
+        // Compare with Py_None directly; a temporary py::object would change None's
+        // reference count, and this method does not acquire the GIL.
+        if (_captive == Py_None) {
             return fallbackLogger;
+        } else {
+            return new LoggerWrapper(_captive, fallbackLogger);
         }
     }
 };
diff --git a/pulsar-client-cpp/python/src/utils.h b/pulsar-client-cpp/python/src/utils.h
index 5be44732fb704..71d20b7ce4aee 100644
--- a/pulsar-client-cpp/python/src/utils.h
+++ b/pulsar-client-cpp/python/src/utils.h
@@ -49,3 +49,29 @@ struct CryptoKeyReaderWrapper {
     CryptoKeyReaderWrapper();
     CryptoKeyReaderWrapper(const std::string& publicKeyPath, const std::string& privateKeyPath);
 };
+
+// Mixin that keeps a strong reference to a Python object held from C++.
+// The reference count is only changed while holding the GIL, and the release
+// is skipped when Py_IsInitialized() reports that the interpreter is gone.
+class CaptivePythonObjectMixin {
+   protected:
+    PyObject* _captive;
+
+    explicit CaptivePythonObjectMixin(PyObject* captive) : _captive(captive) {
+        PyGILState_STATE state = PyGILState_Ensure();
+        Py_XINCREF(_captive);
+        PyGILState_Release(state);
+    }
+
+    // Non-copyable: a copy would release a reference it never acquired.
+    CaptivePythonObjectMixin(const CaptivePythonObjectMixin&) = delete;
+    CaptivePythonObjectMixin& operator=(const CaptivePythonObjectMixin&) = delete;
+
+    ~CaptivePythonObjectMixin() {
+        if (Py_IsInitialized()) {
+            PyGILState_STATE state = PyGILState_Ensure();
+            Py_XDECREF(_captive);
+            PyGILState_Release(state);
+        }
+    }
+};