Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion src/ucp/api/ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,25 @@ enum ucp_worker_params_field {
UCP_WORKER_PARAM_FIELD_CPU_MASK = UCS_BIT(1), /**< Worker's CPU bitmap */
UCP_WORKER_PARAM_FIELD_EVENTS = UCS_BIT(2), /**< Worker's events bitmap */
UCP_WORKER_PARAM_FIELD_USER_DATA = UCS_BIT(3), /**< User data */
UCP_WORKER_PARAM_FIELD_EVENT_FD = UCS_BIT(4) /**< External event file
UCP_WORKER_PARAM_FIELD_EVENT_FD = UCS_BIT(4), /**< External event file
descriptor */
UCP_WORKER_PARAM_FIELD_FLAGS = UCS_BIT(5) /**< Worker flags */
};


/**
* @ingroup UCP_WORKER
* @brief UCP worker flags
*
* This enumeration allows specifying flags for @ref ucp_worker_params_t.flags,
* which is used as parameter for @ref ucp_worker_create.
*/
typedef enum {
UCP_WORKER_FLAG_IGNORE_REQUEST_LEAK = UCS_BIT(0) /**< Do not print warnings
about request leaks */
} ucp_worker_flags_t;


/**
* @ingroup UCP_WORKER
* @brief UCP listener parameters field mask.
Expand Down Expand Up @@ -1117,6 +1131,14 @@ typedef struct ucp_worker_params {
*/
int event_fd;

/**
* Worker flags.
* This value is optional.
* If @ref UCP_WORKER_PARAM_FIELD_FLAGS is not set in the field_mask, the
* value of this field will default to 0.
*/
uint64_t flags;

} ucp_worker_params_t;


Expand Down
10 changes: 8 additions & 2 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -1806,7 +1806,8 @@ static void ucp_worker_destroy_mpools(ucp_worker_h worker)
ucs_mpool_cleanup(&worker->reg_mp, 1);
ucs_mpool_cleanup(&worker->am_mp, 1);
ucs_mpool_cleanup(&worker->rkey_mp, 1);
ucs_mpool_cleanup(&worker->req_mp, 1);
ucs_mpool_cleanup(&worker->req_mp,
!(worker->flags & UCP_WORKER_FLAG_IGNORE_REQUEST_LEAK));
}

/* All the ucp endpoints will share the configurations. No need for every ep to
Expand Down Expand Up @@ -1981,7 +1982,12 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
}

uct_thread_mode = UCS_THREAD_MODE_SINGLE;
worker->flags = 0;

/* copy user flags, and mask-out unsupported flags for compatibility */
worker->flags = UCP_PARAM_VALUE(WORKER, params, flags, FLAGS, 0) &
UCS_MASK(UCP_WORKER_INTERNAL_FLAGS_SHIFT);
UCS_STATIC_ASSERT(UCP_WORKER_FLAG_IGNORE_REQUEST_LEAK <
UCS_BIT(UCP_WORKER_INTERNAL_FLAGS_SHIFT));

if (params->field_mask & UCP_WORKER_PARAM_FIELD_THREAD_MODE) {
#if ENABLE_MT
Expand Down
20 changes: 16 additions & 4 deletions src/ucp/core/ucp_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,21 @@
* UCP worker flags
*/
enum {
UCP_WORKER_FLAG_EXTERNAL_EVENT_FD = UCS_BIT(0), /**< worker event fd is external */
UCP_WORKER_FLAG_EDGE_TRIGGERED = UCS_BIT(1), /**< events are edge-triggered */
UCP_WORKER_FLAG_MT = UCS_BIT(2) /**< MT locking is required */
/** Internal worker flags start from this bit index, to co-exist with user
* flags specified when worker is created */
UCP_WORKER_INTERNAL_FLAGS_SHIFT = 32,

/** MT locking is required */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are values moved to allow combining with new params->flags?
Can you please add a comment and some check they are not mixed?

UCP_WORKER_FLAG_MT =
UCS_BIT(UCP_WORKER_INTERNAL_FLAGS_SHIFT + 0),

/** Events are edge-triggered */
UCP_WORKER_FLAG_EDGE_TRIGGERED =
UCS_BIT(UCP_WORKER_INTERNAL_FLAGS_SHIFT + 1),

/** Worker event fd is external */
UCP_WORKER_FLAG_EXTERNAL_EVENT_FD =
UCS_BIT(UCP_WORKER_INTERNAL_FLAGS_SHIFT + 2)
};


Expand Down Expand Up @@ -202,7 +214,7 @@ struct ucp_worker_cm {
* UCP worker (thread context).
*/
typedef struct ucp_worker {
unsigned flags; /* Worker flags */
uint64_t flags; /* Worker flags */
ucs_async_context_t async; /* Async context for this worker */
ucp_context_h context; /* Back-reference to UCP context */
uint64_t uuid; /* Unique ID for wireup */
Expand Down
105 changes: 61 additions & 44 deletions test/gtest/common/test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -207,69 +207,86 @@ void test_base::push_debug_message_with_limit(std::vector<std::string>& vec,
}

ucs_log_func_rc_t
test_base::hide_errors_logger(const char *file, unsigned line, const char *function,
ucs_log_level_t level,
const ucs_log_component_config_t *comp_conf,
const char *message, va_list ap)
test_base::common_logger(ucs_log_level_t log_level_to_handle, bool print,
std::vector<std::string> &messages_vec, size_t limit,
const char *file, unsigned line, const char *function,
ucs_log_level_t level,
const ucs_log_component_config_t *comp_conf,
const char *message, va_list ap)
{
if (level == UCS_LOG_LEVEL_ERROR) {
pthread_mutex_lock(&m_logger_mutex);
va_list ap2;
va_copy(ap2, ap);
m_errors.push_back(format_message(message, ap2));
va_end(ap2);
level = UCS_LOG_LEVEL_DEBUG;
pthread_mutex_unlock(&m_logger_mutex);
if (level != log_level_to_handle) {
return UCS_LOG_FUNC_RC_CONTINUE;
}

// dump the formatted message to a stringstream
va_list ap2;
va_copy(ap2, ap);
std::istringstream iss(format_message(message, ap2));
va_end(ap2);

// save each line of the message to messages_vec, and print it if reqeusted
pthread_mutex_lock(&m_logger_mutex);
std::string message_line;
while (getline(iss, message_line, '\n')) {
push_debug_message_with_limit(messages_vec, message_line, limit);
if (print) {
UCS_TEST_MESSAGE << "< " << message_line << " >";
}
}
pthread_mutex_unlock(&m_logger_mutex);

// if the message was not printed, pass it to default handler in debug level
if (!print) {
ucs_log_default_handler(file, line, function, UCS_LOG_LEVEL_DEBUG,
comp_conf, message, ap);
}

ucs_log_default_handler(file, line, function, level,
&ucs_global_opts.log_component, message, ap);
return UCS_LOG_FUNC_RC_STOP;
}

ucs_log_func_rc_t
test_base::hide_warns_logger(const char *file, unsigned line, const char *function,
ucs_log_level_t level,
test_base::hide_errors_logger(const char *file, unsigned line,
const char *function, ucs_log_level_t level,
const ucs_log_component_config_t *comp_conf,
const char *message, va_list ap)
{
return common_logger(UCS_LOG_LEVEL_ERROR, false, m_errors,
std::numeric_limits<size_t>::max(), file, line,
function, level, comp_conf, message, ap);
}

ucs_log_func_rc_t
test_base::hide_warns_logger(const char *file, unsigned line,
const char *function, ucs_log_level_t level,
const ucs_log_component_config_t *comp_conf,
const char *message, va_list ap)
{
if (level == UCS_LOG_LEVEL_WARN) {
pthread_mutex_lock(&m_logger_mutex);
va_list ap2;
va_copy(ap2, ap);
m_warnings.push_back(format_message(message, ap2));
va_end(ap2);
level = UCS_LOG_LEVEL_DEBUG;
pthread_mutex_unlock(&m_logger_mutex);
}

ucs_log_default_handler(file, line, function, level,
&ucs_global_opts.log_component, message, ap);
return UCS_LOG_FUNC_RC_STOP;
return common_logger(UCS_LOG_LEVEL_WARN, false, m_warnings,
std::numeric_limits<size_t>::max(), file, line,
function, level, comp_conf, message, ap);
}

ucs_log_func_rc_t
test_base::wrap_errors_logger(const char *file, unsigned line, const char *function,
ucs_log_level_t level,
test_base::wrap_errors_logger(const char *file, unsigned line,
const char *function, ucs_log_level_t level,
const ucs_log_component_config_t *comp_conf,
const char *message, va_list ap)
{
/* Ignore warnings about empty memory pool */
if (level == UCS_LOG_LEVEL_ERROR) {
pthread_mutex_lock(&m_logger_mutex);
std::istringstream iss(format_message(message, ap));
std::string text;
while (getline(iss, text, '\n')) {
push_debug_message_with_limit(m_errors, text, 1000);
UCS_TEST_MESSAGE << "< " << text << " >";
}
pthread_mutex_unlock(&m_logger_mutex);
return UCS_LOG_FUNC_RC_STOP;
}
return common_logger(UCS_LOG_LEVEL_ERROR, true, m_errors, 1000, file, line,
function, level, comp_conf, message, ap);
}

return UCS_LOG_FUNC_RC_CONTINUE;
ucs_log_func_rc_t
test_base::wrap_warns_logger(const char *file, unsigned line,
const char *function, ucs_log_level_t level,
const ucs_log_component_config_t *comp_conf,
const char *message, va_list ap)
{
return common_logger(UCS_LOG_LEVEL_WARN, true, m_warnings, 1000, file, line,
function, level, comp_conf, message, ap);
}


unsigned test_base::num_errors()
{
return m_total_errors - m_num_errors_before;
Expand Down
14 changes: 14 additions & 0 deletions test/gtest/common/test.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ class test_base {
const ucs_log_component_config_t *comp_conf,
const char *message, va_list ap);

static ucs_log_func_rc_t
wrap_warns_logger(const char *file, unsigned line, const char *function,
ucs_log_level_t level,
const ucs_log_component_config_t *comp_conf,
const char *message, va_list ap);

unsigned num_errors();

unsigned num_warnings();
Expand Down Expand Up @@ -136,6 +142,14 @@ class test_base {
const std::string& message,
const size_t limit);

static ucs_log_func_rc_t
common_logger(ucs_log_level_t log_level_to_handle, bool print,
std::vector<std::string> &messages_vec, size_t limit,
const char *file, unsigned line, const char *function,
ucs_log_level_t level,
const ucs_log_component_config_t *comp_conf,
const char *message, va_list ap);

static void *thread_func(void *arg);

pthread_barrier_t m_barrier;
Expand Down
87 changes: 86 additions & 1 deletion test/gtest/ucp/test_ucp_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class test_ucp_worker_discard : public ucp_test {

pending_reqs.push_back(req);

if (func == ucp_wireup_msg_progress) {
if (func == ucp_wireup_msg_progress) {
req->send.ep = &m_fake_ep;
}

Expand Down Expand Up @@ -462,3 +462,88 @@ UCS_TEST_P(test_ucp_worker_discard, wireup_ep_flush_ok_not_wait_comp) {
}

UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_worker_discard, all, "all")


class test_ucp_worker_request_leak : public ucp_test {
public:
enum {
LEAK_CHECK,
LEAK_IGNORE
};

static void get_test_variants(std::vector<ucp_test_variant> &variants)
{
add_variant_with_value(variants, UCP_FEATURE_TAG, LEAK_CHECK,
"leak_check");
add_variant_with_value(variants, UCP_FEATURE_TAG, LEAK_IGNORE,
"leak_ignore");
}

bool ignore_leak()
{
return get_variant_value(0) == LEAK_IGNORE;
}

/// @override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor extra / here and below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's supposed to be doxgen comment
https://www.doxygen.nl/manual/docblocks.html

A third alternative is to use a block of at least two C++ comment lines, where each line starts with an additional slash or an exclamation mark. Here are examples of the two cases:

///
/// ... text ...
///

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aa, ok

virtual ucp_worker_params_t get_worker_params()
{
ucp_worker_params_t params = ucp_test::get_worker_params();
if (ignore_leak()) {
params.field_mask |= UCP_WORKER_PARAM_FIELD_FLAGS;
params.flags |= UCP_WORKER_FLAG_IGNORE_REQUEST_LEAK;
}
return params;
}

/// @override
virtual void init()
{
ucp_test::init();
sender().connect(&receiver(), get_ep_params());
}

/// @override
virtual void cleanup()
{
if (ignore_leak()) {
// Should not have warnings if leak check is off
ucp_test::cleanup();
} else {
scoped_log_handler wrap_warn(wrap_warns_logger);
ucp_test::cleanup();
check_leak_warnings(); // Leak check is enabled - expect warnings
}
}

private:
void check_leak_warnings()
{
EXPECT_EQ(2u, m_warnings.size());
for (size_t i = 0; i < m_warnings.size(); ++i) {
std::string::size_type pos = m_warnings[i].find(
"not returned to mpool ucp_requests");
EXPECT_NE(std::string::npos, pos);
}
}
};

UCS_TEST_P(test_ucp_worker_request_leak, tag_send_recv)
{
ucp_request_param_t param;
param.op_attr_mask = UCP_OP_ATTR_FLAG_NO_IMM_CMPL;
void *sreq = ucp_tag_send_nbx(sender().ep(), NULL, 0, 0, &param);
ASSERT_TRUE(UCS_PTR_IS_PTR(sreq));

void *rreq = ucp_tag_recv_nbx(receiver().worker(), NULL, 0, 0, 0, &param);
ASSERT_TRUE(UCS_PTR_IS_PTR(rreq));

UCS_TEST_MESSAGE << "send req: " << sreq << ", recv req: " << rreq;
while ((ucp_request_check_status(sreq) != UCS_OK) ||
(ucp_request_check_status(rreq) != UCS_OK)) {
progress();
}

// Exit the test without releasing the requests
}

UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_worker_request_leak, all, "all")