diff --git a/src/ucp/api/ucp.h b/src/ucp/api/ucp.h index d68995af4fe..47e8867e5e8 100644 --- a/src/ucp/api/ucp.h +++ b/src/ucp/api/ucp.h @@ -165,11 +165,25 @@ enum ucp_worker_params_field { UCP_WORKER_PARAM_FIELD_CPU_MASK = UCS_BIT(1), /**< Worker's CPU bitmap */ UCP_WORKER_PARAM_FIELD_EVENTS = UCS_BIT(2), /**< Worker's events bitmap */ UCP_WORKER_PARAM_FIELD_USER_DATA = UCS_BIT(3), /**< User data */ - UCP_WORKER_PARAM_FIELD_EVENT_FD = UCS_BIT(4) /**< External event file + UCP_WORKER_PARAM_FIELD_EVENT_FD = UCS_BIT(4), /**< External event file descriptor */ + UCP_WORKER_PARAM_FIELD_FLAGS = UCS_BIT(5) /**< Worker flags */ }; +/** + * @ingroup UCP_WORKER + * @brief UCP worker flags + * + * This enumeration allows specifying flags for @ref ucp_worker_params_t.flags, + * which is used as parameter for @ref ucp_worker_create. + */ +typedef enum { + UCP_WORKER_FLAG_IGNORE_REQUEST_LEAK = UCS_BIT(0) /**< Do not print warnings + about request leaks */ +} ucp_worker_flags_t; + + /** * @ingroup UCP_WORKER * @brief UCP listener parameters field mask. @@ -1117,6 +1131,14 @@ typedef struct ucp_worker_params { */ int event_fd; + /** + * Worker flags. + * This value is optional. + * If @ref UCP_WORKER_PARAM_FIELD_FLAGS is not set in the field_mask, the + * value of this field will default to 0. + */ + uint64_t flags; + } ucp_worker_params_t; diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 67dd6523f7f..8f743e7209f 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -1806,7 +1806,8 @@ static void ucp_worker_destroy_mpools(ucp_worker_h worker) ucs_mpool_cleanup(&worker->reg_mp, 1); ucs_mpool_cleanup(&worker->am_mp, 1); ucs_mpool_cleanup(&worker->rkey_mp, 1); - ucs_mpool_cleanup(&worker->req_mp, 1); + ucs_mpool_cleanup(&worker->req_mp, + !(worker->flags & UCP_WORKER_FLAG_IGNORE_REQUEST_LEAK)); } /* All the ucp endpoints will share the configurations. No need for every ep to @@ -1981,7 +1982,12 @@ ucs_status_t ucp_worker_create(ucp_context_h context, } uct_thread_mode = UCS_THREAD_MODE_SINGLE; - worker->flags = 0; + + /* copy user flags, and mask-out unsupported flags for compatibility */ + worker->flags = UCP_PARAM_VALUE(WORKER, params, flags, FLAGS, 0) & + UCS_MASK(UCP_WORKER_INTERNAL_FLAGS_SHIFT); + UCS_STATIC_ASSERT(UCP_WORKER_FLAG_IGNORE_REQUEST_LEAK < + UCS_BIT(UCP_WORKER_INTERNAL_FLAGS_SHIFT)); if (params->field_mask & UCP_WORKER_PARAM_FIELD_THREAD_MODE) { #if ENABLE_MT diff --git a/src/ucp/core/ucp_worker.h b/src/ucp/core/ucp_worker.h index 1481a362720..d98974576ed 100644 --- a/src/ucp/core/ucp_worker.h +++ b/src/ucp/core/ucp_worker.h @@ -59,9 +59,21 @@ * UCP worker flags */ enum { - UCP_WORKER_FLAG_EXTERNAL_EVENT_FD = UCS_BIT(0), /**< worker event fd is external */ - UCP_WORKER_FLAG_EDGE_TRIGGERED = UCS_BIT(1), /**< events are edge-triggered */ - UCP_WORKER_FLAG_MT = UCS_BIT(2) /**< MT locking is required */ + /** Internal worker flags start from this bit index, to co-exist with user + * flags specified when worker is created */ + UCP_WORKER_INTERNAL_FLAGS_SHIFT = 32, + + /** MT locking is required */ + UCP_WORKER_FLAG_MT = + UCS_BIT(UCP_WORKER_INTERNAL_FLAGS_SHIFT + 0), + + /** Events are edge-triggered */ + UCP_WORKER_FLAG_EDGE_TRIGGERED = + UCS_BIT(UCP_WORKER_INTERNAL_FLAGS_SHIFT + 1), + + /** Worker event fd is external */ + UCP_WORKER_FLAG_EXTERNAL_EVENT_FD = + UCS_BIT(UCP_WORKER_INTERNAL_FLAGS_SHIFT + 2) }; @@ -202,7 +214,7 @@ struct ucp_worker_cm { * UCP worker (thread context). */ typedef struct ucp_worker { - unsigned flags; /* Worker flags */ + uint64_t flags; /* Worker flags */ ucs_async_context_t async; /* Async context for this worker */ ucp_context_h context; /* Back-reference to UCP context */ uint64_t uuid; /* Unique ID for wireup */ diff --git a/test/gtest/common/test.cc b/test/gtest/common/test.cc index 0cdcb8a46eb..ebacb777ccf 100644 --- a/test/gtest/common/test.cc +++ b/test/gtest/common/test.cc @@ -207,69 +207,86 @@ void test_base::push_debug_message_with_limit(std::vector& vec, } ucs_log_func_rc_t -test_base::hide_errors_logger(const char *file, unsigned line, const char *function, - ucs_log_level_t level, - const ucs_log_component_config_t *comp_conf, - const char *message, va_list ap) +test_base::common_logger(ucs_log_level_t log_level_to_handle, bool print, + std::vector &messages_vec, size_t limit, + const char *file, unsigned line, const char *function, + ucs_log_level_t level, + const ucs_log_component_config_t *comp_conf, + const char *message, va_list ap) { - if (level == UCS_LOG_LEVEL_ERROR) { - pthread_mutex_lock(&m_logger_mutex); - va_list ap2; - va_copy(ap2, ap); - m_errors.push_back(format_message(message, ap2)); - va_end(ap2); - level = UCS_LOG_LEVEL_DEBUG; - pthread_mutex_unlock(&m_logger_mutex); + if (level != log_level_to_handle) { + return UCS_LOG_FUNC_RC_CONTINUE; + } + + // dump the formatted message to a stringstream + va_list ap2; + va_copy(ap2, ap); + std::istringstream iss(format_message(message, ap2)); + va_end(ap2); + + // save each line of the message to messages_vec, and print it if reqeusted + pthread_mutex_lock(&m_logger_mutex); + std::string message_line; + while (getline(iss, message_line, '\n')) { + push_debug_message_with_limit(messages_vec, message_line, limit); + if (print) { + UCS_TEST_MESSAGE << "< " << message_line << " >"; + } + } + pthread_mutex_unlock(&m_logger_mutex); + + // if the message was not printed, pass it to default handler in debug level + if (!print) { + ucs_log_default_handler(file, line, function, UCS_LOG_LEVEL_DEBUG, + comp_conf, message, ap); } - ucs_log_default_handler(file, line, function, level, - &ucs_global_opts.log_component, message, ap); return UCS_LOG_FUNC_RC_STOP; } ucs_log_func_rc_t -test_base::hide_warns_logger(const char *file, unsigned line, const char *function, - ucs_log_level_t level, +test_base::hide_errors_logger(const char *file, unsigned line, + const char *function, ucs_log_level_t level, + const ucs_log_component_config_t *comp_conf, + const char *message, va_list ap) +{ + return common_logger(UCS_LOG_LEVEL_ERROR, false, m_errors, + std::numeric_limits::max(), file, line, + function, level, comp_conf, message, ap); +} + +ucs_log_func_rc_t +test_base::hide_warns_logger(const char *file, unsigned line, + const char *function, ucs_log_level_t level, const ucs_log_component_config_t *comp_conf, const char *message, va_list ap) { - if (level == UCS_LOG_LEVEL_WARN) { - pthread_mutex_lock(&m_logger_mutex); - va_list ap2; - va_copy(ap2, ap); - m_warnings.push_back(format_message(message, ap2)); - va_end(ap2); - level = UCS_LOG_LEVEL_DEBUG; - pthread_mutex_unlock(&m_logger_mutex); - } - - ucs_log_default_handler(file, line, function, level, - &ucs_global_opts.log_component, message, ap); - return UCS_LOG_FUNC_RC_STOP; + return common_logger(UCS_LOG_LEVEL_WARN, false, m_warnings, + std::numeric_limits::max(), file, line, + function, level, comp_conf, message, ap); } ucs_log_func_rc_t -test_base::wrap_errors_logger(const char *file, unsigned line, const char *function, - ucs_log_level_t level, +test_base::wrap_errors_logger(const char *file, unsigned line, + const char *function, ucs_log_level_t level, const ucs_log_component_config_t *comp_conf, const char *message, va_list ap) { - /* Ignore warnings about empty memory pool */ - if (level == UCS_LOG_LEVEL_ERROR) { - pthread_mutex_lock(&m_logger_mutex); - std::istringstream iss(format_message(message, ap)); - std::string text; - while (getline(iss, text, '\n')) { - push_debug_message_with_limit(m_errors, text, 1000); - UCS_TEST_MESSAGE << "< " << text << " >"; - } - pthread_mutex_unlock(&m_logger_mutex); - return UCS_LOG_FUNC_RC_STOP; - } + return common_logger(UCS_LOG_LEVEL_ERROR, true, m_errors, 1000, file, line, + function, level, comp_conf, message, ap); +} - return UCS_LOG_FUNC_RC_CONTINUE; +ucs_log_func_rc_t +test_base::wrap_warns_logger(const char *file, unsigned line, + const char *function, ucs_log_level_t level, + const ucs_log_component_config_t *comp_conf, + const char *message, va_list ap) +{ + return common_logger(UCS_LOG_LEVEL_WARN, true, m_warnings, 1000, file, line, + function, level, comp_conf, message, ap); } + unsigned test_base::num_errors() { return m_total_errors - m_num_errors_before; diff --git a/test/gtest/common/test.h b/test/gtest/common/test.h index d974907e822..f91bd15277b 100644 --- a/test/gtest/common/test.h +++ b/test/gtest/common/test.h @@ -108,6 +108,12 @@ class test_base { const ucs_log_component_config_t *comp_conf, const char *message, va_list ap); + static ucs_log_func_rc_t + wrap_warns_logger(const char *file, unsigned line, const char *function, + ucs_log_level_t level, + const ucs_log_component_config_t *comp_conf, + const char *message, va_list ap); + unsigned num_errors(); unsigned num_warnings(); @@ -136,6 +142,14 @@ class test_base { const std::string& message, const size_t limit); + static ucs_log_func_rc_t + common_logger(ucs_log_level_t log_level_to_handle, bool print, + std::vector &messages_vec, size_t limit, + const char *file, unsigned line, const char *function, + ucs_log_level_t level, + const ucs_log_component_config_t *comp_conf, + const char *message, va_list ap); + static void *thread_func(void *arg); pthread_barrier_t m_barrier; diff --git a/test/gtest/ucp/test_ucp_worker.cc b/test/gtest/ucp/test_ucp_worker.cc index 3fb08a84f67..d3d5113e6fc 100644 --- a/test/gtest/ucp/test_ucp_worker.cc +++ b/test/gtest/ucp/test_ucp_worker.cc @@ -58,7 +58,7 @@ class test_ucp_worker_discard : public ucp_test { pending_reqs.push_back(req); - if (func == ucp_wireup_msg_progress) { + if (func == ucp_wireup_msg_progress) { req->send.ep = &m_fake_ep; } @@ -462,3 +462,88 @@ UCS_TEST_P(test_ucp_worker_discard, wireup_ep_flush_ok_not_wait_comp) { } UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_worker_discard, all, "all") + + +class test_ucp_worker_request_leak : public ucp_test { +public: + enum { + LEAK_CHECK, + LEAK_IGNORE + }; + + static void get_test_variants(std::vector &variants) + { + add_variant_with_value(variants, UCP_FEATURE_TAG, LEAK_CHECK, + "leak_check"); + add_variant_with_value(variants, UCP_FEATURE_TAG, LEAK_IGNORE, + "leak_ignore"); + } + + bool ignore_leak() + { + return get_variant_value(0) == LEAK_IGNORE; + } + + /// @override + virtual ucp_worker_params_t get_worker_params() + { + ucp_worker_params_t params = ucp_test::get_worker_params(); + if (ignore_leak()) { + params.field_mask |= UCP_WORKER_PARAM_FIELD_FLAGS; + params.flags |= UCP_WORKER_FLAG_IGNORE_REQUEST_LEAK; + } + return params; + } + + /// @override + virtual void init() + { + ucp_test::init(); + sender().connect(&receiver(), get_ep_params()); + } + + /// @override + virtual void cleanup() + { + if (ignore_leak()) { + // Should not have warnings if leak check is off + ucp_test::cleanup(); + } else { + scoped_log_handler wrap_warn(wrap_warns_logger); + ucp_test::cleanup(); + check_leak_warnings(); // Leak check is enabled - expect warnings + } + } + +private: + void check_leak_warnings() + { + EXPECT_EQ(2u, m_warnings.size()); + for (size_t i = 0; i < m_warnings.size(); ++i) { + std::string::size_type pos = m_warnings[i].find( + "not returned to mpool ucp_requests"); + EXPECT_NE(std::string::npos, pos); + } + } +}; + +UCS_TEST_P(test_ucp_worker_request_leak, tag_send_recv) +{ + ucp_request_param_t param; + param.op_attr_mask = UCP_OP_ATTR_FLAG_NO_IMM_CMPL; + void *sreq = ucp_tag_send_nbx(sender().ep(), NULL, 0, 0, ¶m); + ASSERT_TRUE(UCS_PTR_IS_PTR(sreq)); + + void *rreq = ucp_tag_recv_nbx(receiver().worker(), NULL, 0, 0, 0, ¶m); + ASSERT_TRUE(UCS_PTR_IS_PTR(rreq)); + + UCS_TEST_MESSAGE << "send req: " << sreq << ", recv req: " << rreq; + while ((ucp_request_check_status(sreq) != UCS_OK) || + (ucp_request_check_status(rreq) != UCS_OK)) { + progress(); + } + + // Exit the test without releasing the requests +} + +UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_worker_request_leak, all, "all")