Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NPKit GPU event support #310

Merged
merged 17 commits into from
Jun 13, 2024
82 changes: 82 additions & 0 deletions .azure-pipelines/ut.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,85 @@ jobs:
export PATH=/usr/local/mpi/bin:$PATH
mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x
workingDirectory: '$(System.DefaultWorkingDirectory)'

- job: UnitTestWithNpKit
timeoutInMinutes: 30
pool:
name: mscclpp
strategy:
matrix:
cuda11:
containerImage: ghcr.io/microsoft/mscclpp/mscclpp:base-dev-cuda11.8
cuda12:
containerImage: ghcr.io/microsoft/mscclpp/mscclpp:base-dev-cuda12.2

container:
image: $[ variables['containerImage'] ]
options: --privileged --ipc=host --gpus=all --ulimit memlock=-1:-1

steps:
- task: Bash@3
yzygitzh marked this conversation as resolved.
Show resolved Hide resolved
name: Build
displayName: Build
inputs:
targetType: 'inline'
script: |
mkdir build && cd build
cmake -DCMAKE_BUILD_TYPE=Release -DNPKIT_FLAGS="-DENABLE_NPKIT -DENABLE_NPKIT_EVENT_TIME_SYNC_CPU -DENABLE_NPKIT_EVENT_TIME_SYNC_GPU -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT" ..
make -j
workingDirectory: '$(System.DefaultWorkingDirectory)'

- task: Bash@3
name: LockGPUClock
displayName: Lock GPU clock frequency
inputs:
targetType: 'inline'
script: |
sudo nvidia-smi -pm 1
for i in $(seq 0 $(( $(nvidia-smi -L | wc -l) - 1 ))); do
sudo nvidia-smi -ac $(nvidia-smi --query-gpu=clocks.max.memory,clocks.max.sm --format=csv,noheader,nounits -i $i | sed 's/\ //') -i $i
done
workingDirectory: '$(System.DefaultWorkingDirectory)'

- task: Bash@3
name: MpUnitTests
displayName: Run mscclpp multi-process unit tests
inputs:
targetType: 'inline'
script: |
set -e
rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output
export PATH=/usr/local/mpi/bin:$PATH
export NPKIT_DUMP_DIR=./npkit_dump
mpirun -tag-output -np 2 ./build/test/mp_unit_tests --gtest_filter="ExecutorTest.TwoNodesAllreduce"
python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output
grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json
grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_ENTRY ./npkit_output/npkit_event_trace.json
grep -q NPKIT_EVENT_EXECUTOR_WAIT_ENTRY ./npkit_output/npkit_event_trace.json
grep -q NPKIT_EVENT_EXECUTOR_READ_REDUCE_COPY_SEND_ENTRY ./npkit_output/npkit_event_trace.json
workingDirectory: '$(System.DefaultWorkingDirectory)'

- task: Bash@3
name: PyTests
displayName: Run pytests
inputs:
targetType: 'inline'
script: |
set -e
rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output
export PATH=/usr/local/mpi/bin:$PATH
export NPKIT_DUMP_DIR=./npkit_dump
mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x -k 'test_executor[allreduce.json'
python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output
grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json
grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_ENTRY ./npkit_output/npkit_event_trace.json
grep -q NPKIT_EVENT_EXECUTOR_WAIT_ENTRY ./npkit_output/npkit_event_trace.json
grep -q NPKIT_EVENT_EXECUTOR_READ_REDUCE_COPY_SEND_ENTRY ./npkit_output/npkit_event_trace.json
rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output
mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x -k 'test_executor[allreduce_packet.json'
python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output
grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json
grep -q NPKIT_EVENT_EXECUTOR_COPY_PACKET_ENTRY ./npkit_output/npkit_event_trace.json
grep -q NPKIT_EVENT_EXECUTOR_PUT_PACKET_ENTRY ./npkit_output/npkit_event_trace.json
grep -q NPKIT_EVENT_EXECUTOR_REDUCE_SEND_PACKET_ENTRY ./npkit_output/npkit_event_trace.json
workingDirectory: '$(System.DefaultWorkingDirectory)'
5 changes: 2 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake)

# Options
option(ENABLE_TRACE "Enable tracing" OFF)
option(USE_NPKIT "Use NPKIT" ON)
option(BUILD_TESTS "Build tests" ON)
option(BUILD_PYTHON_BINDINGS "Build Python bindings" ON)
option(USE_CUDA "Use NVIDIA/CUDA." OFF)
Expand Down Expand Up @@ -119,8 +118,8 @@ endif()
if(ENABLE_TRACE)
target_compile_definitions(mscclpp_obj PRIVATE ENABLE_TRACE)
endif()
if(USE_NPKIT)
target_compile_definitions(mscclpp_obj PRIVATE ENABLE_NPKIT)
if(NPKIT_FLAGS)
target_compile_definitions(mscclpp_obj PRIVATE ${NPKIT_FLAGS})
endif()

# libmscclpp
Expand Down
97 changes: 97 additions & 0 deletions include/mscclpp/npkit/npkit.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#ifndef NPKIT_H_
#define NPKIT_H_

#include <mscclpp/device.hpp>
#include <mscclpp/gpu_utils.hpp>
#include <mscclpp/npkit/npkit_event.hpp>
#include <mscclpp/npkit/npkit_struct.hpp>
#include <string>
#include <thread>
#include <vector>

#if defined(__HIP_PLATFORM_AMD__)
#define NPKIT_GET_GPU_TIMESTAMP wall_clock64
#else
#define NPKIT_GET_GPU_TIMESTAMP clock64
#endif

#define NPKIT_SHM_NUM_EVENTS 64

class NpKit {
public:
static const uint64_t kNumGpuEventBuffers = 1024;

static const uint64_t kNumCpuEventBuffers = 64;

static void Init(int rank);

static void Dump(const std::string& dump_dir);

static void Shutdown();

static NpKitEventCollectContext* GetGpuEventCollectContexts();

#if defined(MSCCLPP_DEVICE_COMPILE)
static MSCCLPP_DEVICE_INLINE void CollectGpuEventShm(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp,
NpKitEvent* event_buffer, uint64_t* event_buffer_head) {
if (*event_buffer_head < NPKIT_SHM_NUM_EVENTS) {
if (threadIdx.x == 0) {
NpKitEvent& event = event_buffer[*event_buffer_head];
event.fields.type = type;
event.fields.size = size;
event.fields.rsvd = rsvd;
event.fields.timestamp = timestamp;
}
(*event_buffer_head)++;
}
}

static MSCCLPP_DEVICE_INLINE void StoreGpuEventShm(NpKitEventCollectContext* npKitEventCollectContexts,
NpKitEvent* event_buffer, uint64_t event_buffer_head) {
#if defined(MSCCLPP_DEVICE_HIP)
__synclds();
#else // !defined(MSCCLPP_DEVICE_HIP)
__syncthreads();
#endif // !defined(MSCCLPP_DEVICE_HIP)
NpKitEventCollectContext* npKitCtx = npKitEventCollectContexts + blockIdx.x;
NpKitEvent* global_event_buffer = npKitCtx->event_buffer;
uint64_t global_event_buffer_head = npKitCtx->event_buffer_head;
for (size_t i = threadIdx.x; i < event_buffer_head * sizeof(NpKitEvent) / sizeof(int4); i += blockDim.x) {
((int4*)(global_event_buffer + global_event_buffer_head))[i] = ((int4*)event_buffer)[i];
}
if (threadIdx.x == 0) {
npKitCtx->event_buffer_head += event_buffer_head;
}
}
#endif

static void CollectCpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, int channel_id);

static uint64_t* GetCpuTimestamp();

private:
static void CpuTimestampUpdateThread();

// 64K * 1024 * 16B = 1GB per GPU
static const uint64_t kMaxNumGpuEventsPerBuffer = 1ULL << 16;

// 64K * 2 (send/recv) * (1024/64) = 2M, 2M * 64 * 16B = 2GB per CPU
static const uint64_t kMaxNumCpuEventsPerBuffer = 1ULL << 21;

static std::vector<mscclpp::UniqueCudaPtr<NpKitEvent>> gpu_event_buffers_;
static std::vector<std::unique_ptr<NpKitEvent[]>> cpu_event_buffers_;

static mscclpp::UniqueCudaPtr<NpKitEventCollectContext> gpu_collect_contexts_;
static std::unique_ptr<NpKitEventCollectContext[]> cpu_collect_contexts_;

static uint64_t rank_;

static mscclpp::UniqueCudaHostPtr<uint64_t> cpu_timestamp_;
static std::unique_ptr<std::thread> cpu_timestamp_update_thread_;
static volatile bool cpu_timestamp_update_thread_should_stop_;
};

#endif
18 changes: 18 additions & 0 deletions include/mscclpp/npkit/npkit_event.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#ifndef NPKIT_EVENT_H_
#define NPKIT_EVENT_H_

#define NPKIT_EVENT_INVALID 0x0

#define NPKIT_EVENT_TIME_SYNC_GPU 0x1
#define NPKIT_EVENT_TIME_SYNC_CPU 0x2

#define NPKIT_EVENT_EXECUTOR_INIT_ENTRY 0x3
#define NPKIT_EVENT_EXECUTOR_INIT_EXIT 0x4

#define NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY 0x5
#define NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT 0x15

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ struct NpKitEventCollectContext {

#pragma pack(pop)

#endif
#endif
1 change: 1 addition & 0 deletions python/mscclpp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
PacketType,
version,
is_nvls_supported,
npkit,
)

__version__ = version()
Expand Down
2 changes: 2 additions & 0 deletions python/mscclpp/core_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ extern void register_utils(nb::module_& m);
extern void register_numa(nb::module_& m);
extern void register_nvls(nb::module_& m);
extern void register_executor(nb::module_& m);
extern void register_npkit(nb::module_& m);

template <typename T>
void def_nonblocking_future(nb::handle& m, const std::string& typestr) {
Expand Down Expand Up @@ -189,4 +190,5 @@ NB_MODULE(_mscclpp, m) {
register_numa(m);
register_nvls(m);
register_executor(m);
register_npkit(m);
}
16 changes: 16 additions & 0 deletions python/mscclpp/npkit_py.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#include <nanobind/nanobind.h>
#include <nanobind/stl/string.h>

#include <mscclpp/npkit/npkit.hpp>

namespace nb = nanobind;

void register_npkit(nb::module_ &m) {
nb::module_ sub_m = m.def_submodule("npkit", "NPKit functions");
sub_m.def("init", &NpKit::Init);
sub_m.def("dump", &NpKit::Dump);
sub_m.def("shutdown", &NpKit::Shutdown);
}
7 changes: 7 additions & 0 deletions python/test/executor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
Executor,
ExecutionPlan,
PacketType,
npkit,
)
import mscclpp.comm as mscclpp_comm

Expand Down Expand Up @@ -87,6 +88,9 @@ def main(
mscclpp_group = mscclpp_comm.CommGroup(MPI.COMM_WORLD)
cp.cuda.Device(mscclpp_group.my_rank % mscclpp_group.nranks_per_node).use()
executor = Executor(mscclpp_group.communicator)
npkit_dump_dir = os.getenv("NPKIT_DUMP_DIR")
if npkit_dump_dir is not None:
npkit.init(mscclpp_group.my_rank)
execution_plan = ExecutionPlan(execution_paln_name, execution_plan_path)

cp.random.seed(seed)
Expand Down Expand Up @@ -119,6 +123,9 @@ def main(

mscclpp_group.barrier()
execution_time = bench_time(100, 10, executor_func)
if npkit_dump_dir is not None:
npkit.dump(npkit_dump_dir)
npkit.shutdown()
print(
f"Rank: {MPI.COMM_WORLD.rank} Execution time: {execution_time} us, "
f"data size: {sendbuf.nbytes} bytes data type: {dtype().dtype.name} "
Expand Down
7 changes: 7 additions & 0 deletions python/test/test_mscclpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
TcpBootstrap,
Transport,
is_nvls_supported,
npkit,
)
import mscclpp.comm as mscclpp_comm
from mscclpp.utils import KernelBuilder, pack
Expand Down Expand Up @@ -603,6 +604,9 @@ def test_executor(mpi_group: MpiGroup, filename: str):
project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
mscclpp_group = mscclpp_comm.CommGroup(mpi_group.comm)
executor = Executor(mscclpp_group.communicator)
npkit_dump_dir = os.getenv("NPKIT_DUMP_DIR")
if npkit_dump_dir is not None:
npkit.init(mscclpp_group.my_rank)
execution_plan = ExecutionPlan("allreduce_pairs", os.path.join(project_dir, "test", "execution-files", filename))

nelems = 1024 * 1024
Expand All @@ -629,3 +633,6 @@ def test_executor(mpi_group: MpiGroup, filename: str):
)
stream.synchronize()
assert cp.allclose(sendbuf, expected, atol=1e-3 * mpi_group.comm.size)
if npkit_dump_dir is not None:
npkit.dump(npkit_dump_dir)
npkit.shutdown()
4 changes: 3 additions & 1 deletion src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@

#include "connection.hpp"

#if defined(ENABLE_NPKIT)
#include <mscclpp/npkit/npkit.hpp>
#endif
#include <mscclpp/utils.hpp>
#include <sstream>
#include <thread>

#include "debug.h"
#include "endpoint.hpp"
#include "infiniband/verbs.h"
#include "npkit/npkit.h"

namespace mscclpp {

Expand Down
32 changes: 28 additions & 4 deletions src/executor/execution_kernel.cu
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,43 @@ void ExecutionKernel::launchKernel(int rank, int nthreadblocks, int nthreads, vo
switch (dataType) {
case DataType::INT32:
executionKernel<int32_t, PacketType><<<nthreadblocks, nthreads, sharedMemSize, stream>>>(
rank, (int32_t*)src, (int32_t*)dst, (int32_t*)scratch, scratchSize, plan, flag);
rank, (int32_t*)src, (int32_t*)dst, (int32_t*)scratch, scratchSize, plan, flag
#if defined(ENABLE_NPKIT)
,
NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp());
#else
);
#endif
break;
case DataType::UINT32:
executionKernel<uint32_t><<<nthreadblocks, nthreads, sharedMemSize, stream>>>(
rank, (uint32_t*)src, (uint32_t*)dst, (uint32_t*)scratch, scratchSize, plan, flag);
rank, (uint32_t*)src, (uint32_t*)dst, (uint32_t*)scratch, scratchSize, plan, flag
#if defined(ENABLE_NPKIT)
,
NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp());
#else
);
#endif
break;
case DataType::FLOAT16:
executionKernel<half><<<nthreadblocks, nthreads, sharedMemSize, stream>>>(
rank, (half*)src, (half*)dst, (half*)scratch, scratchSize, plan, flag);
rank, (half*)src, (half*)dst, (half*)scratch, scratchSize, plan, flag
#if defined(ENABLE_NPKIT)
,
NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp());
#else
);
#endif
break;
case DataType::FLOAT32:
executionKernel<float><<<nthreadblocks, nthreads, sharedMemSize, stream>>>(
rank, (float*)src, (float*)dst, (float*)scratch, scratchSize, plan, flag);
rank, (float*)src, (float*)dst, (float*)scratch, scratchSize, plan, flag
#if defined(ENABLE_NPKIT)
,
NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp());
#else
);
#endif
break;
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/executor/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,11 @@ struct Executor::Impl {
DataType dataType, cudaStream_t stream, PacketType packetType) {
static uint32_t flag = 0;
int nthreadblocks = context.deviceExecutionPlans.size();
#if defined(ENABLE_NPKIT)
size_t sharedMemSize = sizeof(DeviceExecutionPlan) + NPKIT_SHM_NUM_EVENTS * sizeof(NpKitEvent);
#else
size_t sharedMemSize = sizeof(DeviceExecutionPlan);
#endif
switch (packetType) {
case PacketType::LL16:
ExecutionKernel::launchKernel<LL16Packet>(
Expand Down
Loading
Loading