Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ enum {
UCP_REQUEST_FLAG_CALLBACK = UCS_BIT(6),
UCP_REQUEST_FLAG_PROTO_INITIALIZED = UCS_BIT(7),
UCP_REQUEST_FLAG_SYNC = UCS_BIT(8),
UCP_REQUEST_FLAG_PROTO_AMO_PACKED = UCS_BIT(9),
UCP_REQUEST_FLAG_OFFLOADED = UCS_BIT(10),
UCP_REQUEST_FLAG_BLOCK_OFFLOAD = UCS_BIT(11),
UCP_REQUEST_FLAG_STREAM_RECV_WAITALL = UCS_BIT(12),
Expand Down
8 changes: 8 additions & 0 deletions src/ucp/proto/proto_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,14 @@ void ucp_proto_request_zcopy_reset(ucp_request_t *request)
ucp_proto_request_zcopy_clean(request, UCP_DT_MASK_ALL);
}

void ucp_proto_get_reset(ucp_request_t *request)
{
if (request->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED) {
ucp_send_request_id_release(request);
request->flags &= ~UCP_REQUEST_FLAG_PROTO_INITIALIZED;
}
}

int ucp_proto_is_short_supported(const ucp_proto_select_param_t *select_param)
{
/* Short protocol requires contig/host */
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/proto/proto_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,4 +278,6 @@ void ucp_proto_request_zcopy_abort(ucp_request_t *request, ucs_status_t status);

void ucp_proto_request_zcopy_reset(ucp_request_t *request);

void ucp_proto_get_reset(ucp_request_t *request);

#endif
11 changes: 9 additions & 2 deletions src/ucp/rma/amo_offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ ucp_proto_amo_progress(uct_pending_req_t *self, ucp_operation_id_t op_id,
spriv->super.rkey_index);

if (!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)) {
pack_arg(req, op_size);
if (!(req->flags & UCP_REQUEST_FLAG_PROTO_AMO_PACKED)) {
pack_arg(req, op_size);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also need to update ucp_atomic_op_nbx so it will return UCS_OK only if UCP_REQUEST_FLAG_PROTO_AMO_PACKED flag is set, and then also (for debug build) set buffer to NULL to make sure it's not used anymore

req->flags |= UCP_REQUEST_FLAG_PROTO_AMO_PACKED;
}

if (op_id != UCP_OP_ID_AMO_POST) {
ucp_proto_completion_init(&req->send.state.uct_comp,
ucp_proto_amo_completed);
Expand Down Expand Up @@ -198,7 +202,10 @@ ucp_proto_amo_init(const ucp_proto_init_params_t *init_params,
.desc = #_sub_id, \
.init = ucp_amo_init_##_id, \
.query = ucp_proto_single_query, \
.progress = {ucp_amo_progress_##_id} \
.progress = {ucp_amo_progress_##_id}, \
.abort = (ucp_request_abort_func_t) \
ucs_empty_function_fatal_not_implemented_void, \
.reset = ucp_proto_request_bcopy_reset \
};

#define UCP_PROTO_AMO_REGISTER_MTYPE(_id, _op_id, _bits) \
Expand Down
20 changes: 12 additions & 8 deletions src/ucp/rma/amo_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,15 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_atomic_op_nbx,
ep, &ucp_rkey_config(ep->worker, rkey)->proto_select,
rkey->cfg_index, req, op_id, buffer, 1, param->datatype,
op_size, param, 0, 0);
if (ucs_likely(req->flags & UCP_REQUEST_FLAG_PROTO_AMO_PACKED)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to set AMO_PACKED flag only for t protocols, to avoid the branch on "op_id == UCP_OP_ID_AMO_POST" in line 264?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It depends on if pack_arg must be called for fetch (non-post) ops as well. As far as I can see, in current implementation, it does.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe introduce another flag , UCP_REQUEST_FLAG_PROTO_AMO_COMPLETED?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO it's confusing since buffered (packed) value actually does not complete operation. Also such flag will have different behavior for different ops.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

#if ENABLE_ASSERT
req->send.state.dt_iter.type.contig.buffer = NULL;
#endif
} else {
/* We cannot return UCS_OK if value was not packed, need to block
* freeing or reusage of buffer */
goto out;
}
} else {
status = UCP_RKEY_RESOLVE(rkey, ep, amo);
if (status != UCS_OK) {
Expand Down Expand Up @@ -272,16 +281,11 @@ ucs_status_t ucp_atomic_post(ucp_ep_h ep, ucp_atomic_post_op_t opcode, uint64_t
.op_attr_mask = UCP_OP_ATTR_FIELD_DATATYPE,
.datatype = ucp_dt_make_contig(op_size)
};
ucs_status_ptr_t status_p;
void *request;

status_p = ucp_atomic_op_nbx(ep, ucp_post_atomic_op_table[opcode], &value,
request = ucp_atomic_op_nbx(ep, ucp_post_atomic_op_table[opcode], &value,
1, remote_addr, rkey, &param);
if (UCS_PTR_IS_PTR(status_p)) {
ucp_request_release(status_p);
return UCS_OK;
}

return UCS_PTR_STATUS(status_p);
return ucp_rma_wait(ep->worker, request, "post");
}

static inline ucs_status_t
Expand Down
16 changes: 11 additions & 5 deletions src/ucp/rma/amo_sw.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ ucp_amo_sw_progress(uct_pending_req_t *self, uct_pack_callback_t pack_cb,
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
ucs_status_t status;

ucs_assert(req->flags & UCP_REQUEST_FLAG_PROTO_AMO_PACKED);

req->send.lane = ucp_ep_get_am_lane(req->send.ep);
if (fetch) {
ucp_send_request_id_alloc(req);
Expand Down Expand Up @@ -360,14 +362,18 @@ ucp_proto_amo_sw_progress(uct_pending_req_t *self, uct_pack_callback_t pack_cb,
ucs_status_t status;

if (!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)) {
if (!(req->flags & UCP_REQUEST_FLAG_PROTO_AMO_PACKED)) {
ucp_datatype_iter_next_pack(&req->send.state.dt_iter,
req->send.ep->worker, SIZE_MAX,
&next_iter, &req->send.amo.value);
req->flags |= UCP_REQUEST_FLAG_PROTO_AMO_PACKED;
}

status = ucp_ep_resolve_remote_id(req->send.ep, spriv->super.lane);
if (status != UCS_OK) {
return status;
}

ucp_datatype_iter_next_pack(&req->send.state.dt_iter,
req->send.ep->worker, SIZE_MAX,
&next_iter, &req->send.amo.value);
req->flags |= UCP_REQUEST_FLAG_PROTO_INITIALIZED;
}

Expand Down Expand Up @@ -426,7 +432,7 @@ ucp_proto_t ucp_get_amo_post_proto = {
.query = ucp_proto_single_query,
.progress = {ucp_proto_amo_sw_progress_post},
.abort = (ucp_request_abort_func_t)ucs_empty_function_fatal_not_implemented_void,
.reset = (ucp_request_reset_func_t)ucs_empty_function_fatal_not_implemented_void
.reset = ucp_proto_request_bcopy_reset
};

static ucs_status_t ucp_proto_amo_sw_progress_fetch(uct_pending_req_t *self)
Expand Down Expand Up @@ -455,5 +461,5 @@ ucp_proto_t ucp_get_amo_fetch_proto = {
.query = ucp_proto_single_query,
.progress = {ucp_proto_amo_sw_progress_fetch},
.abort = (ucp_request_abort_func_t)ucs_empty_function_fatal_not_implemented_void,
.reset = (ucp_request_reset_func_t)ucs_empty_function_fatal_not_implemented_void
.reset = ucp_proto_get_reset
};
2 changes: 1 addition & 1 deletion src/ucp/rma/get_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,5 @@ ucp_proto_t ucp_get_am_bcopy_proto = {
.query = ucp_proto_single_query,
.progress = {ucp_proto_get_am_bcopy_progress},
.abort = (ucp_request_abort_func_t)ucs_empty_function_fatal_not_implemented_void,
.reset = (ucp_request_reset_func_t)ucs_empty_function_fatal_not_implemented_void
.reset = ucp_proto_get_reset
};
4 changes: 2 additions & 2 deletions src/ucp/rma/get_offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ ucp_proto_t ucp_get_offload_bcopy_proto = {
.query = ucp_proto_multi_query,
.progress = {ucp_proto_get_offload_bcopy_progress},
.abort = (ucp_request_abort_func_t)ucs_empty_function_fatal_not_implemented_void,
.reset = (ucp_request_reset_func_t)ucs_empty_function_fatal_not_implemented_void
.reset = ucp_proto_request_bcopy_reset
};

static UCS_F_ALWAYS_INLINE ucs_status_t
Expand Down Expand Up @@ -206,5 +206,5 @@ ucp_proto_t ucp_get_offload_zcopy_proto = {
.query = ucp_proto_multi_query,
.progress = {ucp_proto_get_offload_zcopy_progress},
.abort = (ucp_request_abort_func_t)ucs_empty_function_fatal_not_implemented_void,
.reset = (ucp_request_reset_func_t)ucs_empty_function_fatal_not_implemented_void
.reset = ucp_proto_request_zcopy_reset
};
2 changes: 1 addition & 1 deletion src/ucp/rma/put_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,5 +115,5 @@ ucp_proto_t ucp_put_am_bcopy_proto = {
.query = ucp_proto_multi_query,
.progress = {ucp_proto_put_am_bcopy_progress},
.abort = ucp_proto_request_bcopy_abort,
.reset = (ucp_request_reset_func_t)ucs_empty_function_fatal_not_implemented_void
.reset = ucp_proto_request_bcopy_reset
};
6 changes: 3 additions & 3 deletions src/ucp/rma/put_offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ ucp_proto_t ucp_put_offload_short_proto = {
.query = ucp_proto_single_query,
.progress = {ucp_proto_put_offload_short_progress},
.abort = (ucp_request_abort_func_t)ucs_empty_function_fatal_not_implemented_void,
.reset = (ucp_request_reset_func_t)ucs_empty_function_fatal_not_implemented_void
.reset = ucp_proto_request_bcopy_reset
};

static size_t ucp_proto_put_offload_bcopy_pack(void *dest, void *arg)
Expand Down Expand Up @@ -188,7 +188,7 @@ ucp_proto_t ucp_put_offload_bcopy_proto = {
.query = ucp_proto_multi_query,
.progress = {ucp_proto_put_offload_bcopy_progress},
.abort = ucp_proto_request_bcopy_abort,
.reset = (ucp_request_reset_func_t)ucs_empty_function_fatal_not_implemented_void
.reset = ucp_proto_request_bcopy_reset
};

static UCS_F_ALWAYS_INLINE ucs_status_t
Expand Down Expand Up @@ -273,5 +273,5 @@ ucp_proto_t ucp_put_offload_zcopy_proto = {
.query = ucp_proto_multi_query,
.progress = {ucp_proto_put_offload_zcopy_progress},
.abort = ucp_proto_request_zcopy_abort,
.reset = (ucp_request_reset_func_t)ucs_empty_function_fatal_not_implemented_void
.reset = ucp_proto_request_zcopy_reset
};