Skip to content

Commit

Permalink
Data Plane - Initial P2P and RDMA Get (#112)
Browse files Browse the repository at this point in the history
This PR is the first stage of bring back the data plane to the SRF runtime.

This adds the following to the `data_plane` namespace:
- `Callbacks` - a struct of static methods used to handle UCX callback on locally initiated UCX transactions, e.g. issuing a tagged send or and RDMA GET.
- `Request` - a struct which holds the state of an async transaction. This object holds a bit more data than just a promise/future pair. I figure the API will have two ways to kick off an async transaction, one that takes a ref to a `Request` and another that return a `Request`. The latter requires a heap allocation, so the former could be used as a subtle optimization for structured concurrency.
- `DataPlaneServerWorker` which is the Runnable that drives the UCX worker's progress method which ultimately executes the UCX callbacks. More functionality will be added to this component over time, specifically using `ucp_nb_probe` to match any incoming events who's payloads were larger than the pre-posted buffers.

The remaining work in this PR is moving the ucx tests into the internal tests binary and re-enables the RDMA get test.

This is not a complete implementation of the UCX Data Plane. #144 was created to address the WIP state.

Authors:
  - Ryan Olson (https://github.com/ryanolson)

Approvers:
  - Devin Robison (https://github.com/drobison00)

URL: #112
  • Loading branch information
ryanolson authored Jul 26, 2022
1 parent e5dd099 commit d1a3569
Show file tree
Hide file tree
Showing 36 changed files with 987 additions and 522 deletions.
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ include(cmake/setup_iwyu.cmake)

# Keep all source files sorted!!!
add_library(libsrf
src/internal/data_plane/callbacks.cpp
src/internal/data_plane/client.cpp
src/internal/data_plane/resources.cpp
src/internal/data_plane/request.cpp
src/internal/data_plane/server.cpp
src/internal/executor/executor.cpp
src/internal/executor/iexecutor.cpp
Expand Down
22 changes: 7 additions & 15 deletions include/srf/codable/encoded_object.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <typeindex>
#include <utility>
#include <vector>

namespace srf::codable {

/**
Expand All @@ -60,7 +61,7 @@ class EncodedObject
const protos::EncodedObject& proto() const;

/**
* @brief Access const memory::buffer_view of the RemoteDescriptor at the required index
* @brief Access const memory::buffer_view of the RemoteMemoryDescriptor at the required index
* @return memory::const_buffer_view
*/
memory::const_buffer_view memory_block(std::size_t idx) const;
Expand Down Expand Up @@ -119,20 +120,20 @@ class EncodedObject
memory::buffer_view mutable_memory_block(std::size_t idx) const;

/**
* @brief Converts a memory block to a RemoteDescriptor proto
* @brief Converts a memory block to a RemoteMemoryDescriptor proto
*
* @param view
* @return protos::RemoteDescriptor
* @return protos::RemoteMemoryDescriptor
*/
static protos::RemoteDescriptor encode_descriptor(memory::const_buffer_view view);
static protos::RemoteMemoryDescriptor encode_descriptor(memory::const_buffer_view view);

/**
* @brief Converts a RemoteDescriptor proto to a mutable memory block
* @brief Converts a RemoteMemoryDescriptor proto to a mutable memory block
*
* @param desc
* @return memory::buffer_view
*/
static memory::buffer_view decode_descriptor(const protos::RemoteDescriptor& desc);
static memory::buffer_view decode_descriptor(const protos::RemoteMemoryDescriptor& desc);

/**
* @brief Add a custom protobuf meta data to the descriptor list
Expand Down Expand Up @@ -266,13 +267,4 @@ MetaDataT EncodedObject::meta_data(std::size_t idx) const
return meta_data;
}

std::size_t EncodedObject::add_buffer(std::shared_ptr<memory::memory_resource> mr, std::size_t bytes)
{
CHECK(m_context_acquired);
memory::buffer buff(bytes, mr);
auto index = add_memory_block(buff);
m_buffers[index] = std::move(buff);
return index;
}

} // namespace srf::codable
4 changes: 2 additions & 2 deletions protos/srf/protos/codable.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ enum MemoryKind
None = 99;
}

message RemoteDescriptor
message RemoteMemoryDescriptor
{
uint32 instance_id = 1;
uint32 object_id = 2;
Expand Down Expand Up @@ -63,7 +63,7 @@ message Descriptor
{
oneof desc
{
RemoteDescriptor remote_desc = 1;
RemoteMemoryDescriptor remote_desc = 1;
PackedDescriptor packed_desc = 2;
EagerDescriptor eager_desc = 3;
MetaDataDescriptor meta_data_desc = 4;
Expand Down
5 changes: 5 additions & 0 deletions src/internal/data_plane/WIP.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Work In Progress

Files in this directory should be considered a WIP up until this file is removed from the directory.

See: https://github.com/nv-morpheus/SRF/issues/144
87 changes: 87 additions & 0 deletions src/internal/data_plane/callbacks.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "internal/data_plane/callbacks.hpp"

#include "internal/data_plane/request.hpp"

#include <glog/logging.h>
#include <ucp/api/ucp.h>

namespace srf::internal::data_plane {

void Callbacks::send(void* request, ucs_status_t status, void* user_data)
{
DVLOG(10) << "send callback start for request " << request;

DCHECK(user_data);
auto* user_req = static_cast<Request*>(user_data);
DCHECK(user_req->m_state == Request::State::Running);

if (user_req->m_rkey != nullptr)
{
ucp_rkey_destroy(reinterpret_cast<ucp_rkey_h>(user_req->m_rkey));
}

if (status == UCS_OK)
{
ucp_request_free(request);
user_req->m_request = nullptr;
user_req->m_state = Request::State::OK;
}

else if (status == UCS_ERR_CANCELED)
{
ucp_request_free(request);
user_req->m_request = nullptr;
user_req->m_state = Request::State::Cancelled;
}
else
{
// todo(ryan) - set the promise exception ptr
LOG(FATAL) << "data_plane: pre_posted_recv_callback failed with status: " << ucs_status_string(status);
user_req->m_state = Request::State::Error;
}
}

void Callbacks::recv(void* request, ucs_status_t status, const ucp_tag_recv_info_t* msg_info, void* user_data)
{
DCHECK(user_data);
auto* user_req = static_cast<Request*>(user_data);
DCHECK(user_req->m_state == Request::State::Running);

if (status == UCS_OK) // cpp20 [[likely]]
{
ucp_request_free(request);
user_req->m_request = nullptr;
user_req->m_state = Request::State::OK;
}
else if (status == UCS_ERR_CANCELED)
{
ucp_request_free(request);
user_req->m_request = nullptr;
user_req->m_state = Request::State::Cancelled;
}
else
{
// todo(ryan) - set the promise exception ptr
LOG(FATAL) << "data_plane: pre_posted_recv_callback failed with status: " << ucs_status_string(status);
user_req->m_state = Request::State::Error;
}
}

} // namespace srf::internal::data_plane
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,19 @@
* limitations under the License.
*/

#include "internal/data_plane/client_worker.hpp"
#pragma once

#include "internal/ucx/worker.hpp"

#include <boost/fiber/operations.hpp>
#include <ucp/api/ucp_compat.h>
#include <ucp/api/ucp.h>
#include <ucs/memory/memory_type.h>
#include <ucs/type/status.h>

namespace srf::internal::data_plane {

void DataPlaneClientWorker::on_data(void*&& data)
struct Callbacks final
{
while (ucp_request_is_completed(data) == 0)
{
if (m_worker->progress() != 0U)
{
continue;
}
boost::this_fiber::yield();
}
ucp_request_release(data);
}
// internal point-to-point
static void send(void* request, ucs_status_t status, void* user_data);
static void recv(void* request, ucs_status_t status, const ucp_tag_recv_info_t* msg_info, void* user_data);
};

} // namespace srf::internal::data_plane
Loading

0 comments on commit d1a3569

Please sign in to comment.