41 changes: 41 additions & 0 deletions .clang-format
@@ -0,0 +1,41 @@
---
BasedOnStyle: Google

IndentWidth: 4
ColumnLimit: 100
ContinuationIndentWidth: 8
UseTab: Never
MaxEmptyLinesToKeep: 2

SortIncludes: true
CompactNamespaces: false
ReflowComments: true

DerivePointerAlignment: false
PointerAlignment: Left

AllowShortIfStatementsOnASingleLine: true
AllowShortFunctionsOnASingleLine: Inline

AccessModifierOffset: -4

AllowShortBlocksOnASingleLine: true
ReferenceAlignment: Right
AlignConsecutiveDeclarations: Consecutive
AlwaysBreakAfterReturnType: None
AlignOperands: DontAlign
AlignAfterOpenBracket: DontAlign
BreakBeforeBraces: Linux
BraceWrapping:
AfterClass: false
AfterControlStatement: false
AfterEnum: false
AfterFunction: true
AfterNamespace: true
AfterStruct: false
AfterUnion: false
BeforeCatch: true

ConstructorInitializerAllOnOneLineOrOnePerLine: false

IndentCaseLabels: true
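
For orientation, the sketch below shows roughly how a small function comes out under these options: 4-space indentation, the opening brace of a function body on its own line (Linux brace style), short if bodies kept on a single line, left-aligned pointers, and aligned consecutive declarations. It is illustrative only; the function and variable names are invented, and actual clang-format output can differ in minor details.

// Illustrative sketch, not part of this change.
int scaleValue(int* value, int rounds)
{
    int    count = 0;
    double scale = 1.0;

    if (nullptr == value) { return -1; }
    while (count < rounds) {
        scale *= 2.0;
        count++;
    }
    *value = static_cast<int>(scale);
    return 0;
}

The single-line if form matches what the reformatted main.cpp below now uses, for example the new "if (worker.isInitiator()) { count = target_device; }" line.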
122 changes: 49 additions & 73 deletions benchmark/nixlbench/src/main.cpp
@@ -15,23 +15,27 @@
* limitations under the License.
*/

#include "config.h"
#include <iostream>
#include <gflags/gflags.h>
#include <nixl.h>
#include <sys/time.h>
#include <gflags/gflags.h>
#include "utils/utils.h"

#include <iostream>

#include "config.h"
#include "utils/scope_guard.h"
#include "utils/utils.h"
#include "worker/nixl/nixl_worker.h"
#if HAVE_NVSHMEM && HAVE_CUDA
#include "worker/nvshmem/nvshmem_worker.h"
#endif
#include <unistd.h>
#include <memory>

#include <csignal>
#include <memory>

static std::pair<size_t, size_t> getStrideScheme(xferBenchWorker &worker, int num_threads) {
int initiator_device, target_device;
static std::pair<size_t, size_t> getStrideScheme(xferBenchWorker &worker, int num_threads)
{
int initiator_device, target_device;
size_t buffer_size, count, stride;

initiator_device = xferBenchConfig::num_initiator_dev;
@@ -44,22 +44,14 @@ static std::pair<size_t, size_t> getStrideScheme(xferBenchWorker &worker, int nu
// TODO: add macro for schemes
// Maybe, we can squeeze ONE_TO_MANY and MANY_TO_ONE into TP scheme
if (XFERBENCH_SCHEME_ONE_TO_MANY == xferBenchConfig::scheme) {
if (worker.isInitiator()) {
count = target_device;
}
if (worker.isInitiator()) { count = target_device; }
} else if (XFERBENCH_SCHEME_MANY_TO_ONE == xferBenchConfig::scheme) {
if (worker.isTarget()) {
count = initiator_device;
}
if (worker.isTarget()) { count = initiator_device; }
} else if (XFERBENCH_SCHEME_TP == xferBenchConfig::scheme) {
if (worker.isInitiator()) {
if (initiator_device < target_device) {
count = target_device / initiator_device;
}
if (initiator_device < target_device) { count = target_device / initiator_device; }
} else if (worker.isTarget()) {
if (target_device < initiator_device) {
count = initiator_device / target_device;
}
if (target_device < initiator_device) { count = initiator_device / target_device; }
}
}
stride = buffer_size / count;
@@ -68,14 +64,13 @@
}

static std::vector<std::vector<xferBenchIOV>> createTransferDescLists(xferBenchWorker &worker,
std::vector<std::vector<xferBenchIOV>> &iov_lists,
size_t block_size,
size_t batch_size,
int num_threads) {
std::vector<std::vector<xferBenchIOV>> &iov_lists, size_t block_size, size_t batch_size,
int num_threads)
{
auto [count, stride] = getStrideScheme(worker, num_threads);
std::vector<std::vector<xferBenchIOV>> xfer_lists;

for (const auto &iov_list: iov_lists) {
for (const auto &iov_list : iov_lists) {
std::vector<xferBenchIOV> xfer_list;

for (const auto &iov : iov_list) {
@@ -84,9 +79,8 @@ static std::vector<std::vector<xferBenchIOV>> createTransferDescLists(xferBenchW

for (size_t j = 0; j < batch_size; j++) {
size_t block_offset = ((j * block_size) % iov.len);
xfer_list.push_back(xferBenchIOV((iov.addr + dev_offset) + block_offset,
block_size,
iov.devId));
xfer_list.push_back(xferBenchIOV(
(iov.addr + dev_offset) + block_offset, block_size, iov.devId));
}
}
}
@@ -97,24 +91,20 @@ static std::vector<std::vector<xferBenchIOV>> createTransferDescLists(xferBenchW
return xfer_lists;
}

static int processBatchSizes(xferBenchWorker &worker,
std::vector<std::vector<xferBenchIOV>> &iov_lists,
size_t block_size, int num_threads) {
static int processBatchSizes(xferBenchWorker &worker,
std::vector<std::vector<xferBenchIOV>> &iov_lists, size_t block_size, int num_threads)
{
for (size_t batch_size = xferBenchConfig::start_batch_size;
!worker.signaled() &&
batch_size <= xferBenchConfig::max_batch_size;
batch_size *= 2) {
auto local_trans_lists = createTransferDescLists(worker,
iov_lists,
block_size,
batch_size,
num_threads);
!worker.signaled() && batch_size <= xferBenchConfig::max_batch_size; batch_size *= 2) {
auto local_trans_lists =
createTransferDescLists(worker, iov_lists, block_size, batch_size, num_threads);

if (worker.isTarget()) {
worker.exchangeIOV(local_trans_lists);
worker.poll(block_size);

if (xferBenchConfig::check_consistency && xferBenchConfig::op_type == XFERBENCH_OP_WRITE) {
if (xferBenchConfig::check_consistency &&
xferBenchConfig::op_type == XFERBENCH_OP_WRITE) {
xferBenchUtils::checkConsistency(local_trans_lists);
}
if (IS_PAIRWISE_AND_SG()) {
@@ -123,28 +113,26 @@ static int processBatchSizes(xferBenchWorker &worker,
xferBenchUtils::printStats(true, block_size, batch_size, 0);
}
} else if (worker.isInitiator()) {
std::vector<std::vector<xferBenchIOV>> remote_trans_lists(worker.exchangeIOV(local_trans_lists));
std::vector<std::vector<xferBenchIOV>> remote_trans_lists(
worker.exchangeIOV(local_trans_lists));

auto result = worker.transfer(block_size,
local_trans_lists,
remote_trans_lists);
if (std::holds_alternative<int>(result)) {
return 1;
}
auto result = worker.transfer(block_size, local_trans_lists, remote_trans_lists);
if (std::holds_alternative<int>(result)) { return 1; }

if (xferBenchConfig::check_consistency && xferBenchConfig::op_type == XFERBENCH_OP_READ) {
if (xferBenchConfig::check_consistency &&
xferBenchConfig::op_type == XFERBENCH_OP_READ) {
xferBenchUtils::checkConsistency(local_trans_lists);
}

xferBenchUtils::printStats(false, block_size, batch_size,
std::get<double>(result));
xferBenchUtils::printStats(false, block_size, batch_size, std::get<double>(result));
}
}

return 0;
}

static std::unique_ptr<xferBenchWorker> createWorker(int *argc, char ***argv) {
static std::unique_ptr<xferBenchWorker> createWorker(int* argc, char*** argv)
{
if (xferBenchConfig::worker_type == "nixl") {
std::vector<std::string> devices = xferBenchConfig::parseDeviceList();
if (devices.empty()) {
@@ -165,21 +153,18 @@ static std::unique_ptr<xferBenchWorker> createWorker(int *argc, char ***argv) {
}
}

int main(int argc, char *argv[]) {
int main(int argc, char* argv[])
{
gflags::ParseCommandLineFlags(&argc, &argv, true);

int ret = xferBenchConfig::loadFromFlags();
if (0 != ret) {
return EXIT_FAILURE;
}
if (0 != ret) { return EXIT_FAILURE; }

int num_threads = xferBenchConfig::num_threads;

// Create the appropriate worker based on worker configuration
std::unique_ptr<xferBenchWorker> worker_ptr = createWorker(&argc, &argv);
if (!worker_ptr) {
return EXIT_FAILURE;
}
if (!worker_ptr) { return EXIT_FAILURE; }

std::signal(SIGINT, worker_ptr->signalHandler);

@@ -191,34 +176,25 @@ int main(int argc, char *argv[]) {
}

std::vector<std::vector<xferBenchIOV>> iov_lists = worker_ptr->allocateMemory(num_threads);
auto mem_guard = make_scope_guard ([&] {
worker_ptr->deallocateMemory(iov_lists);
});
auto mem_guard = make_scope_guard([&] { worker_ptr->deallocateMemory(iov_lists); });

ret = worker_ptr->exchangeMetadata();
if (0 != ret) {
return EXIT_FAILURE;
}
if (0 != ret) { return EXIT_FAILURE; }

if (worker_ptr->isInitiator() && worker_ptr->isMasterRank()) {
xferBenchConfig::printConfig();
xferBenchUtils::printStatsHeader();
}

for (size_t block_size = xferBenchConfig::start_block_size;
!worker_ptr->signaled() &&
block_size <= xferBenchConfig::max_block_size;
block_size *= 2) {
!worker_ptr->signaled() && block_size <= xferBenchConfig::max_block_size;
block_size *= 2) {
ret = processBatchSizes(*worker_ptr, iov_lists, block_size, num_threads);
if (0 != ret) {
return EXIT_FAILURE;
}
if (0 != ret) { return EXIT_FAILURE; }
}

ret = worker_ptr->synchronize(); // Make sure environment is not used anymore
if (0 != ret) {
return EXIT_FAILURE;
}
ret = worker_ptr->synchronize(); // Make sure environment is not used anymore
if (0 != ret) { return EXIT_FAILURE; }

gflags::ShutDownCommandLineFlags();

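
A side note on the utils/scope_guard.h include this diff adds to main.cpp: memory cleanup now runs through make_scope_guard, so deallocateMemory() is invoked on every return path, including the early EXIT_FAILURE returns. The sketch below is a generic version of that idiom for readers unfamiliar with it; it is an assumption about the shape of the helper, not the actual utils/scope_guard.h in nixlbench, and the Sketch-suffixed names are invented.

#include <utility>

// Generic scope-guard sketch (assumed shape, not the real utils/scope_guard.h):
// stores a callable and runs it when the guard goes out of scope. Relies on
// C++17 guaranteed copy elision, since the guard is neither copyable nor movable.
template <typename F>
class ScopeGuardSketch {
public:
    explicit ScopeGuardSketch(F fn) : fn_(std::move(fn)) {}
    ~ScopeGuardSketch() { fn_(); }  // cleanup runs on every exit path of the scope

    ScopeGuardSketch(const ScopeGuardSketch&) = delete;
    ScopeGuardSketch& operator=(const ScopeGuardSketch&) = delete;

private:
    F fn_;
};

template <typename F>
ScopeGuardSketch<F> makeScopeGuardSketch(F fn)
{
    return ScopeGuardSketch<F>(std::move(fn));
}

// Usage, mirroring the pattern in main():
//     auto mem_guard = makeScopeGuardSketch([&] { worker_ptr->deallocateMemory(iov_lists); });

Tying cleanup to scope exit this way keeps the growing number of early returns in main() from each needing an explicit deallocation call.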