Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/cpp/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@

nixl_example = executable('nixl_example',
'nixl_example.cpp',
dependencies: [nixl_dep, nixl_infra, nixl_common_deps],
dependencies: [nixl_dep, nixl_infra, nixl_common_deps, nixl_test_utils_dep],
include_directories: [nixl_inc_dirs, utils_inc_dirs],
link_with: [serdes_lib],
install: true)

if etcd_dep.found()
etcd_example = executable('nixl_etcd_example',
'nixl_etcd_example.cpp',
dependencies: [nixl_dep, nixl_infra, nixl_common_deps],
dependencies: [nixl_dep, nixl_infra, nixl_common_deps, nixl_test_utils_dep],
include_directories: [nixl_inc_dirs, utils_inc_dirs],
link_with: [serdes_lib],
install: true)
Expand Down
63 changes: 33 additions & 30 deletions examples/cpp/nixl_etcd_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
* limitations under the License.
*/
#include <iostream>
#include <cassert>
#include <thread>
#include <chrono>
#include <cstring>

#include "nixl.h"
#include "test_utils.h"


// Change these values to match your etcd setup
const std::string ETCD_ENDPOINT = "http://localhost:2379";
Expand Down Expand Up @@ -130,7 +131,7 @@ int main() {
std::vector<nixl_backend_t> plugins;

ret1 = A1.getAvailPlugins(plugins);
assert (ret1 == NIXL_SUCCESS);
nixl_exit_on_failure(ret1, "Failed to get available plugins", AGENT1_NAME);

std::cout << "Available plugins:\n";

Expand All @@ -140,8 +141,8 @@ int main() {
ret1 = A1.getPluginParams("UCX", mems1, init1);
ret2 = A2.getPluginParams("UCX", mems2, init2);

assert (ret1 == NIXL_SUCCESS);
assert (ret2 == NIXL_SUCCESS);
nixl_exit_on_failure(ret1, "Failed to get plugin params for UCX", AGENT1_NAME);
nixl_exit_on_failure(ret2, "Failed to get plugin params for UCX", AGENT2_NAME);

std::cout << "Params before init:\n";
printParams(init1, mems1);
Expand All @@ -151,25 +152,25 @@ int main() {
nixlBackendH* ucx1, *ucx2;
ret1 = A1.createBackend("UCX", init1, ucx1);
ret2 = A2.createBackend("UCX", init2, ucx2);

assert (ret1 == NIXL_SUCCESS);
assert (ret2 == NIXL_SUCCESS);
nixl_exit_on_failure(ret1, "Failed to create UCX backend", AGENT1_NAME);
nixl_exit_on_failure(ret2, "Failed to create UCX backend", AGENT2_NAME);

ret1 = A1.getBackendParams(ucx1, mems1, init1);
ret2 = A2.getBackendParams(ucx2, mems2, init2);

assert (ret1 == NIXL_SUCCESS);
assert (ret2 == NIXL_SUCCESS);
nixl_exit_on_failure(ret1, "Failed to get UCX backend params", AGENT1_NAME);
nixl_exit_on_failure(ret2, "Failed to get UCX backend params", AGENT2_NAME);


std::cout << "Params after init:\n";
printParams(init1, mems1);
printParams(init2, mems2);

// Register memory with both agents
status = registerMemory(&addr1, &A1, &dlist1, &extra_params1, ucx1, 0xaa);
assert(status == NIXL_SUCCESS);
nixl_exit_on_failure(status, "Failed to register memory", AGENT1_NAME);
status = registerMemory(&addr2, &A2, &dlist2, &extra_params2, ucx2, 0xbb);
assert(status == NIXL_SUCCESS);
nixl_exit_on_failure(status, "Failed to register memory", AGENT2_NAME);

std::cout << "\nEtcd Metadata Exchange Demo\n";
std::cout << "==========================\n";
Expand All @@ -179,10 +180,10 @@ int main() {

// Both agents send their metadata to etcd
status = A1.sendLocalMD();
assert(status == NIXL_SUCCESS);
nixl_exit_on_failure(status, "Failed to send local MD", AGENT1_NAME);

status = A2.sendLocalMD();
assert(status == NIXL_SUCCESS);
nixl_exit_on_failure(status, "Failed to send local MD", AGENT2_NAME);

// Give etcd time to process
std::this_thread::sleep_for(std::chrono::seconds(1));
Expand All @@ -192,11 +193,11 @@ int main() {

// Agent1 fetches metadata for Agent2
status = A1.fetchRemoteMD(AGENT2_NAME);
assert(status == NIXL_SUCCESS);
nixl_exit_on_failure(status, "Failed to fetch remote MD", AGENT1_NAME);

// Agent2 fetches metadata for Agent1
status = A2.fetchRemoteMD(AGENT1_NAME);
assert(status == NIXL_SUCCESS);
nixl_exit_on_failure(status, "Failed to fetch remote MD", AGENT2_NAME);

// Do transfer from Agent 1 to Agent 2
size_t req_size = 8;
Expand Down Expand Up @@ -229,8 +230,10 @@ int main() {
extra_params1.hasNotif = true;
ret1 = A1.createXferReq(NIXL_WRITE, req_src_descs, req_dst_descs, AGENT2_NAME, req_handle, &extra_params1);
std::cout << "Xfer request created, status: " << nixlEnumStrings::statusStr(ret1) << std::endl;
nixl_exit_on_failure(ret1, "Failed to create Xfer Req", AGENT1_NAME);

status = A1.postXferReq(req_handle);
nixl_exit_on_failure((status >= NIXL_SUCCESS), "Failed to post Xfer Req", AGENT1_NAME);

std::cout << "Transfer was posted\n";

Expand All @@ -240,20 +243,20 @@ int main() {
while (status != NIXL_SUCCESS || n_notifs == 0) {
if (status != NIXL_SUCCESS) status = A1.getXferStatus(req_handle);
if (n_notifs == 0) ret2 = A2.getNotifs(notif_map);
assert (status >= 0);
assert (ret2 == NIXL_SUCCESS);
nixl_exit_on_failure((status >= NIXL_SUCCESS), "Failed to get Xfer status", AGENT1_NAME);
nixl_exit_on_failure(ret2, "Failed to get notifs", AGENT2_NAME);
n_notifs = notif_map.size();
}

std::cout << "Transfer verified\n";

ret1 = A1.releaseXferReq(req_handle);
assert (ret1 == NIXL_SUCCESS);
nixl_exit_on_failure(ret1, "Failed to release Xfer Req", AGENT1_NAME);

ret1 = A1.deregisterMem(dlist1, &extra_params1);
ret2 = A2.deregisterMem(dlist2, &extra_params2);
assert (ret1 == NIXL_SUCCESS);
assert (ret2 == NIXL_SUCCESS);
nixl_exit_on_failure(ret1, "Failed to deregister memory", AGENT1_NAME);
nixl_exit_on_failure(ret2, "Failed to deregister memory", AGENT2_NAME);

// 3. Partial Metadata Exchange
std::cout << "\n3. Sending partial metadata to etcd...\n";
Expand All @@ -274,36 +277,36 @@ int main() {

// Send partial metadata
status = A1.sendLocalPartialMD(empty_dlist1, &conn_params1);
assert(status == NIXL_SUCCESS);
nixl_exit_on_failure(status, "Failed to send local partial MD", AGENT1_NAME);

status = A2.sendLocalPartialMD(empty_dlist2, &conn_params2);
assert(status == NIXL_SUCCESS);
nixl_exit_on_failure(status, "Failed to send local partial MD", AGENT2_NAME);

// Send once partial with different label
conn_params1.metadataLabel = PARTIAL_LABEL_2;
status = A1.sendLocalPartialMD(empty_dlist1, &conn_params1);
assert(status == NIXL_SUCCESS);
nixl_exit_on_failure(status, "Failed to send local partial MD", AGENT1_NAME);

conn_params2.metadataLabel = PARTIAL_LABEL_2;
status = A2.sendLocalPartialMD(empty_dlist2, &conn_params2);
assert(status == NIXL_SUCCESS);
nixl_exit_on_failure(status, "Failed to send local partial MD", AGENT2_NAME);

nixl_opt_args_t fetch_params;
fetch_params.metadataLabel = PARTIAL_LABEL_1;
status = A1.fetchRemoteMD(AGENT2_NAME, &fetch_params);
assert(status == NIXL_SUCCESS);
nixl_exit_on_failure(status, "Failed to fetch remote MD", AGENT1_NAME);

status = A2.fetchRemoteMD(AGENT1_NAME, &fetch_params);
assert(status == NIXL_SUCCESS);
nixl_exit_on_failure(status, "Failed to fetch remote MD", AGENT2_NAME);

std::this_thread::sleep_for(std::chrono::seconds(1));

// 4. Invalidate Metadata
std::cout << "\n4. Invalidating metadata in etcd...\n";

// Invalidate agent1's metadata
// Invalidate AGENT1_NAME's metadata
status = A1.invalidateLocalMD();
assert(status == NIXL_SUCCESS);
nixl_exit_on_failure(status, "Failed to invalidate local MD", AGENT1_NAME);

std::this_thread::sleep_for(std::chrono::seconds(1));

Expand All @@ -316,14 +319,14 @@ int main() {
// Try invalidating again, this should log a debug message
std::cout << "Trying to invalidate again...\n";
status = A1.invalidateLocalMD();
assert(status == NIXL_SUCCESS);
nixl_exit_on_failure(status, "Failed to invalidate local MD", AGENT1_NAME);

std::this_thread::sleep_for(std::chrono::seconds(1));

// 5. Fetch metadata with invalid label. This should not block forever and print error message.
std::cout << "\n5. Fetching metadata with invalid label...\n";
status = A2.fetchRemoteMD("INVALID_AGENT", &fetch_params);
assert(status == NIXL_SUCCESS);
nixl_exit_on_failure(status, "Failed to fetch remote MD", AGENT2_NAME);

std::this_thread::sleep_for(std::chrono::seconds(1));

Expand Down
52 changes: 27 additions & 25 deletions examples/cpp/nixl_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <sys/time.h>

#include "nixl.h"
#include "test_utils.h"


std::string agent1("Agent001");
std::string agent2("Agent002");
Expand All @@ -29,7 +31,7 @@ void check_buf(void* buf, size_t len) {

// Do some checks on the data.
for(size_t i = 0; i<len; i++){
assert (((uint8_t*) buf)[i] == 0xbb);
nixl_exit_on_failure((((uint8_t *)buf)[i] == 0xbb), "Data mismatch!", agent1);
}
}

Expand Down Expand Up @@ -89,7 +91,7 @@ main(int argc, char **argv) {
std::vector<nixl_backend_t> plugins;

ret1 = A1.getAvailPlugins(plugins);
assert (ret1 == NIXL_SUCCESS);
nixl_exit_on_failure(ret1, "Failed to get available plugins", agent1);

std::cout << "Available plugins:\n";

Expand All @@ -100,8 +102,8 @@ main(int argc, char **argv) {
ret1 = A1.getPluginParams(backend, mems1, init1);
ret2 = A2.getPluginParams(backend, mems2, init2);

assert (ret1 == NIXL_SUCCESS);
assert (ret2 == NIXL_SUCCESS);
nixl_exit_on_failure(ret1, "Failed to get plugin params", agent1);
nixl_exit_on_failure(ret2, "Failed to get plugin params", agent2);

std::cout << "Params before init:\n";
printParams(init1, mems1);
Expand All @@ -115,14 +117,14 @@ main(int argc, char **argv) {
extra_params1.backends.push_back(bknd1);
extra_params2.backends.push_back(bknd2);

assert (ret1 == NIXL_SUCCESS);
assert (ret2 == NIXL_SUCCESS);
nixl_exit_on_failure(ret1, "Failed to create " + backend + " backend", agent1);
nixl_exit_on_failure(ret2, "Failed to create " + backend + " backend", agent2);

ret1 = A1.getBackendParams(bknd1, mems1, init1);
ret2 = A2.getBackendParams(bknd2, mems2, init2);

assert (ret1 == NIXL_SUCCESS);
assert (ret2 == NIXL_SUCCESS);
nixl_exit_on_failure(ret1, "Failed to get " + backend + " backend params", agent1);
nixl_exit_on_failure(ret2, "Failed to get " + backend + " backend params", agent2);

std::cout << "Params after init:\n";
printParams(init1, mems1);
Expand Down Expand Up @@ -161,25 +163,22 @@ main(int argc, char **argv) {

ret1 = A1.registerMem(dlist1, &extra_params1);
ret2 = A2.registerMem(dlist2, &extra_params2);

assert (ret1 == NIXL_SUCCESS);
assert (ret2 == NIXL_SUCCESS);
nixl_exit_on_failure(ret1, "Failed to register memory", agent1);
nixl_exit_on_failure(ret2, "Failed to register memory", agent2);

std::string meta1;
ret1 = A1.getLocalMD(meta1);
std::string meta2;
ret2 = A2.getLocalMD(meta2);

assert (ret1 == NIXL_SUCCESS);
assert (ret2 == NIXL_SUCCESS);
nixl_exit_on_failure(ret1, "Failed to get local MD", agent1);
nixl_exit_on_failure(ret2, "Failed to get local MD", agent2);

std::cout << "Agent1's Metadata: " << meta1 << "\n";
std::cout << "Agent2's Metadata: " << meta2 << "\n";

ret1 = A1.loadRemoteMD (meta2, ret_s1);

assert (ret1 == NIXL_SUCCESS);
assert (ret2 == NIXL_SUCCESS);
nixl_exit_on_failure(ret1, "Failed to load remote MD", agent1);

size_t req_size = 8;
size_t dst_offset = 8;
Expand All @@ -204,9 +203,10 @@ main(int argc, char **argv) {
extra_params1.notifMsg = "notification";
extra_params1.hasNotif = true;
ret1 = A1.createXferReq(NIXL_WRITE, req_src_descs, req_dst_descs, agent2, req_handle, &extra_params1);
assert (ret1 == NIXL_SUCCESS);
nixl_exit_on_failure(ret1, "Failed to create Xfer Req", agent1);

nixl_status_t status = A1.postXferReq(req_handle);
nixl_exit_on_failure((status >= NIXL_SUCCESS), "Failed to post Xfer Req", agent1);

std::cout << "Transfer was posted\n";

Expand All @@ -216,31 +216,33 @@ main(int argc, char **argv) {
while (status != NIXL_SUCCESS || n_notifs == 0) {
if (status != NIXL_SUCCESS) status = A1.getXferStatus(req_handle);
if (n_notifs == 0) ret2 = A2.getNotifs(notif_map);
assert (status >= 0);
assert (ret2 == NIXL_SUCCESS);
nixl_exit_on_failure((status >= NIXL_SUCCESS), "Failed to post Xfer Req", agent1);
nixl_exit_on_failure(ret2, "Failed to get notifs", agent2);
n_notifs = notif_map.size();
}

std::vector<std::string> agent1_notifs = notif_map[agent1];
assert (agent1_notifs.size() == 1);
assert (agent1_notifs.front() == "notification");
nixl_exit_on_failure((agent1_notifs.size() == 1), "Incorrect notif size", agent1);
nixl_exit_on_failure(
(agent1_notifs.front() == "notification"), "Incorrect notification", agent1);

notif_map[agent1].clear(); // Redundant, for testing
notif_map.clear();
n_notifs = 0;

std::cout << "Transfer verified\n";

ret1 = A1.releaseXferReq(req_handle);
assert (ret1 == NIXL_SUCCESS);
nixl_exit_on_failure(ret1, "Failed to release Xfer Req", agent1);

ret1 = A1.deregisterMem(dlist1, &extra_params1);
ret2 = A2.deregisterMem(dlist2, &extra_params2);
assert (ret1 == NIXL_SUCCESS);
assert (ret2 == NIXL_SUCCESS);
nixl_exit_on_failure(ret1, "Failed to deregister memory", agent1);
nixl_exit_on_failure(ret2, "Failed to deregister memory", agent2);

//only initiator should call invalidate
ret1 = A1.invalidateRemoteMD(agent2);
assert (ret1 == NIXL_SUCCESS);
nixl_exit_on_failure(ret1, "Failed to invalidate remote MD", agent1);

free(addr1);
free(addr2);
Expand Down
9 changes: 5 additions & 4 deletions meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,11 @@ plugins_inc_dirs = include_directories('src/plugins')
utils_inc_dirs = include_directories('src/utils')

subdir('src')

if get_option('buildtype') != 'release'
subdir('test')
subdir('examples')
if get_option('build_tests')
subdir('test')
endif
if get_option('build_examples')
subdir('examples')
endif

if get_option('install_headers')
Expand Down
2 changes: 2 additions & 0 deletions meson_options.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ option('log_level', type: 'combo', choices: ['trace', 'debug', 'info', 'warning'
option('rust', type: 'boolean', value: false, description: 'Build Rust bindings')

# Tests
option('build_tests', type: 'boolean', value: true, description: 'Build all tests')
option('build_examples', type: 'boolean', value: true, description: 'Build all examples')
option('test_all_plugins', type: 'boolean', value: false, description: 'Testing all plugins in addition to the mocks..')
3 changes: 3 additions & 0 deletions src/infra/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@ nixl_build_lib = library('nixl_build',
install: true)

nixl_infra = declare_dependency(link_with: nixl_build_lib)

# Test utilities library that can depend on nixl_dep (created after nixl_dep is defined)
# This will be defined in a separate meson file to avoid circular dependencies
Loading