Skip to content

Commit

Permalink
Add NPKit GPU event support (#310)
Browse files Browse the repository at this point in the history
  • Loading branch information
yzygitzh authored Jun 13, 2024
1 parent 80aefe5 commit 76328fe
Show file tree
Hide file tree
Showing 21 changed files with 500 additions and 165 deletions.
82 changes: 82 additions & 0 deletions .azure-pipelines/ut.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,85 @@ jobs:
export PATH=/usr/local/mpi/bin:$PATH
mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x
workingDirectory: '$(System.DefaultWorkingDirectory)'

- job: UnitTestWithNpKit
timeoutInMinutes: 30
pool:
name: mscclpp
strategy:
matrix:
cuda11:
containerImage: ghcr.io/microsoft/mscclpp/mscclpp:base-dev-cuda11.8
cuda12:
containerImage: ghcr.io/microsoft/mscclpp/mscclpp:base-dev-cuda12.2

container:
image: $[ variables['containerImage'] ]
options: --privileged --ipc=host --gpus=all --ulimit memlock=-1:-1

steps:
- task: Bash@3
name: Build
displayName: Build
inputs:
targetType: 'inline'
script: |
mkdir build && cd build
cmake -DCMAKE_BUILD_TYPE=Release -DNPKIT_FLAGS="-DENABLE_NPKIT -DENABLE_NPKIT_EVENT_TIME_SYNC_CPU -DENABLE_NPKIT_EVENT_TIME_SYNC_GPU -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT" ..
make -j
workingDirectory: '$(System.DefaultWorkingDirectory)'

- task: Bash@3
name: LockGPUClock
displayName: Lock GPU clock frequency
inputs:
targetType: 'inline'
script: |
sudo nvidia-smi -pm 1
for i in $(seq 0 $(( $(nvidia-smi -L | wc -l) - 1 ))); do
sudo nvidia-smi -ac $(nvidia-smi --query-gpu=clocks.max.memory,clocks.max.sm --format=csv,noheader,nounits -i $i | sed 's/\ //') -i $i
done
workingDirectory: '$(System.DefaultWorkingDirectory)'

- task: Bash@3
name: MpUnitTests
displayName: Run mscclpp multi-process unit tests
inputs:
targetType: 'inline'
script: |
set -e
rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output
export PATH=/usr/local/mpi/bin:$PATH
export NPKIT_DUMP_DIR=./npkit_dump
mpirun -tag-output -np 2 ./build/test/mp_unit_tests --gtest_filter="ExecutorTest.TwoNodesAllreduce"
python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output
grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json
grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_ENTRY ./npkit_output/npkit_event_trace.json
grep -q NPKIT_EVENT_EXECUTOR_WAIT_ENTRY ./npkit_output/npkit_event_trace.json
grep -q NPKIT_EVENT_EXECUTOR_READ_REDUCE_COPY_SEND_ENTRY ./npkit_output/npkit_event_trace.json
workingDirectory: '$(System.DefaultWorkingDirectory)'

- task: Bash@3
name: PyTests
displayName: Run pytests
inputs:
targetType: 'inline'
script: |
set -e
rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output
export PATH=/usr/local/mpi/bin:$PATH
export NPKIT_DUMP_DIR=./npkit_dump
mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x -k 'test_executor[allreduce.json'
python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output
grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json
grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_ENTRY ./npkit_output/npkit_event_trace.json
grep -q NPKIT_EVENT_EXECUTOR_WAIT_ENTRY ./npkit_output/npkit_event_trace.json
grep -q NPKIT_EVENT_EXECUTOR_READ_REDUCE_COPY_SEND_ENTRY ./npkit_output/npkit_event_trace.json
rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output
mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x -k 'test_executor[allreduce_packet.json'
python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output
grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json
grep -q NPKIT_EVENT_EXECUTOR_COPY_PACKET_ENTRY ./npkit_output/npkit_event_trace.json
grep -q NPKIT_EVENT_EXECUTOR_PUT_PACKET_ENTRY ./npkit_output/npkit_event_trace.json
grep -q NPKIT_EVENT_EXECUTOR_REDUCE_SEND_PACKET_ENTRY ./npkit_output/npkit_event_trace.json
workingDirectory: '$(System.DefaultWorkingDirectory)'
5 changes: 2 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake)

# Options
option(ENABLE_TRACE "Enable tracing" OFF)
option(USE_NPKIT "Use NPKIT" ON)
option(BUILD_TESTS "Build tests" ON)
option(BUILD_PYTHON_BINDINGS "Build Python bindings" ON)
option(USE_CUDA "Use NVIDIA/CUDA." OFF)
Expand Down Expand Up @@ -119,8 +118,8 @@ endif()
if(ENABLE_TRACE)
target_compile_definitions(mscclpp_obj PRIVATE ENABLE_TRACE)
endif()
if(USE_NPKIT)
target_compile_definitions(mscclpp_obj PRIVATE ENABLE_NPKIT)
if(NPKIT_FLAGS)
target_compile_definitions(mscclpp_obj PRIVATE ${NPKIT_FLAGS})
endif()

# libmscclpp
Expand Down
97 changes: 97 additions & 0 deletions include/mscclpp/npkit/npkit.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#ifndef NPKIT_H_
#define NPKIT_H_

#include <mscclpp/device.hpp>
#include <mscclpp/gpu_utils.hpp>
#include <mscclpp/npkit/npkit_event.hpp>
#include <mscclpp/npkit/npkit_struct.hpp>
#include <string>
#include <thread>
#include <vector>

// Name of the device-side timestamp function to use when recording GPU events:
// `wall_clock64` when building for the AMD HIP platform, `clock64` otherwise.
#if defined(__HIP_PLATFORM_AMD__)
#define NPKIT_GET_GPU_TIMESTAMP wall_clock64
#else
#define NPKIT_GET_GPU_TIMESTAMP clock64
#endif

// Upper bound on the number of events NpKit::CollectGpuEventShm records into a
// per-block staging buffer; further events are dropped once the head reaches it.
#define NPKIT_SHM_NUM_EVENTS 64

/// Static-only facade for NPKit event collection on GPU and CPU.
///
/// Host-side entry points (Init / Dump / Shutdown / CollectCpuEvent / accessors)
/// are declared here and defined out of line. The device-side helpers are only
/// compiled when MSCCLPP_DEVICE_COMPILE is defined.
class NpKit {
 public:
  // Number of GPU event buffers (see the sizing comment on kMaxNumGpuEventsPerBuffer).
  static const uint64_t kNumGpuEventBuffers = 1024;

  // Number of CPU event buffers (see the sizing comment on kMaxNumCpuEventsPerBuffer).
  static const uint64_t kNumCpuEventBuffers = 64;

  /// Sets up NPKit state for the given rank. Defined out of line.
  static void Init(int rank);

  /// Writes collected events under `dump_dir`. Defined out of line.
  static void Dump(const std::string& dump_dir);

  /// Releases NPKit state. Defined out of line.
  static void Shutdown();

  /// Returns the array of per-block collect contexts that device code indexes
  /// by `blockIdx.x` in StoreGpuEventShm.
  static NpKitEventCollectContext* GetGpuEventCollectContexts();

#if defined(MSCCLPP_DEVICE_COMPILE)
  /// Records one event into a staging buffer.
  ///
  /// Only thread 0 of the block writes the event payload, but every calling
  /// thread advances its own `*event_buffer_head`, so all callers stay in step.
  /// When the head has already reached NPKIT_SHM_NUM_EVENTS the event is
  /// silently dropped.
  ///
  /// @param type              Event type identifier.
  /// @param size              Value stored in the event's `size` field.
  /// @param rsvd              Value stored in the event's `rsvd` field.
  /// @param timestamp         Value stored in the event's `timestamp` field.
  /// @param event_buffer      Staging buffer that receives the event.
  /// @param event_buffer_head In/out count of events recorded so far.
  static MSCCLPP_DEVICE_INLINE void CollectGpuEventShm(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp,
                                                       NpKitEvent* event_buffer, uint64_t* event_buffer_head) {
    if (*event_buffer_head < NPKIT_SHM_NUM_EVENTS) {
      if (threadIdx.x == 0) {
        NpKitEvent& event = event_buffer[*event_buffer_head];
        event.fields.type = type;
        event.fields.size = size;
        event.fields.rsvd = rsvd;
        event.fields.timestamp = timestamp;
      }
      (*event_buffer_head)++;
    }
  }

  /// Flushes the staged events of the calling block into that block's global
  /// event buffer and advances the global head.
  ///
  /// Must be called by every thread of the block: it contains block-wide
  /// barriers.
  ///
  /// @param npKitEventCollectContexts Array of collect contexts, indexed by `blockIdx.x`.
  /// @param event_buffer              Staging buffer filled by CollectGpuEventShm.
  /// @param event_buffer_head         Number of staged events to flush.
  static MSCCLPP_DEVICE_INLINE void StoreGpuEventShm(NpKitEventCollectContext* npKitEventCollectContexts,
                                                     NpKitEvent* event_buffer, uint64_t event_buffer_head) {
    // Make thread 0's staged event writes visible to the whole block before copying.
#if defined(MSCCLPP_DEVICE_HIP)
    __synclds();
#else   // !defined(MSCCLPP_DEVICE_HIP)
    __syncthreads();
#endif  // !defined(MSCCLPP_DEVICE_HIP)
    NpKitEventCollectContext* npKitCtx = npKitEventCollectContexts + blockIdx.x;
    NpKitEvent* global_event_buffer = npKitCtx->event_buffer;
    uint64_t global_event_buffer_head = npKitCtx->event_buffer_head;
    // The copy is done in int4-sized chunks, strided across the threads of the block.
    // NOTE(review): nothing here checks the destination against the global buffer's
    // capacity — presumably kMaxNumGpuEventsPerBuffer; confirm whether a bound is needed.
    const size_t num_chunks = event_buffer_head * sizeof(NpKitEvent) / sizeof(int4);
    for (size_t i = threadIdx.x; i < num_chunks; i += blockDim.x) {
      ((int4*)(global_event_buffer + global_event_buffer_head))[i] = ((int4*)event_buffer)[i];
    }
    // Every thread reads npKitCtx->event_buffer_head above to compute its destination.
    // Without this barrier thread 0 could advance the head before a thread in another
    // warp has read it, making that thread copy to the wrong offset.
    __syncthreads();
    if (threadIdx.x == 0) {
      npKitCtx->event_buffer_head += event_buffer_head;
    }
  }
#endif

  /// Records one CPU-side event for the given channel. Defined out of line.
  static void CollectCpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, int channel_id);

  /// Returns the pointer to the CPU timestamp value. Defined out of line.
  static uint64_t* GetCpuTimestamp();

 private:
  // Body of the thread held in cpu_timestamp_update_thread_. Defined out of line.
  static void CpuTimestampUpdateThread();

  // 64K * 1024 * 16B = 1GB per GPU
  static const uint64_t kMaxNumGpuEventsPerBuffer = 1ULL << 16;

  // 64K * 2 (send/recv) * (1024/64) = 2M, 2M * 64 * 16B = 2GB per CPU
  static const uint64_t kMaxNumCpuEventsPerBuffer = 1ULL << 21;

  static std::vector<mscclpp::UniqueCudaPtr<NpKitEvent>> gpu_event_buffers_;
  static std::vector<std::unique_ptr<NpKitEvent[]>> cpu_event_buffers_;

  static mscclpp::UniqueCudaPtr<NpKitEventCollectContext> gpu_collect_contexts_;
  static std::unique_ptr<NpKitEventCollectContext[]> cpu_collect_contexts_;

  static uint64_t rank_;

  static mscclpp::UniqueCudaHostPtr<uint64_t> cpu_timestamp_;
  static std::unique_ptr<std::thread> cpu_timestamp_update_thread_;
  // NOTE(review): `volatile` is not a synchronization primitive; if this flag is
  // shared between threads, std::atomic<bool> would be the safer type. Changing it
  // requires updating the out-of-line definition too, so it is left as is here.
  static volatile bool cpu_timestamp_update_thread_should_stop_;
};

#endif
18 changes: 18 additions & 0 deletions include/mscclpp/npkit/npkit_event.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#ifndef NPKIT_EVENT_H_
#define NPKIT_EVENT_H_

// NPKit event type identifiers.
// NOTE(review): CI passes this header to tools/npkit/npkit_trace_generator.py via
// --npkit_event_header_path, so its layout is presumably machine-parsed. Keep one
// plain "NAME VALUE" define per line and confirm against the tool before reformatting.
#define NPKIT_EVENT_INVALID 0x0

// Presumably markers used to correlate GPU and CPU timestamps — TODO confirm.
#define NPKIT_EVENT_TIME_SYNC_GPU 0x1
#define NPKIT_EVENT_TIME_SYNC_CPU 0x2

// Entry/exit of executor initialisation.
#define NPKIT_EVENT_EXECUTOR_INIT_ENTRY 0x3
#define NPKIT_EVENT_EXECUTOR_INIT_EXIT 0x4

// Base ids for executor operation events; the entry and exit bases are 0x10 apart.
// Presumably per-operation ids are derived as base + operation index (CI greps for
// names such as NPKIT_EVENT_EXECUTOR_SIGNAL_ENTRY that are not defined here) — TODO confirm.
#define NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY 0x5
#define NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT 0x15

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ struct NpKitEventCollectContext {

#pragma pack(pop)

#endif
#endif
1 change: 1 addition & 0 deletions python/mscclpp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
PacketType,
version,
is_nvls_supported,
npkit,
)

__version__ = version()
Expand Down
2 changes: 2 additions & 0 deletions python/mscclpp/core_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ extern void register_utils(nb::module_& m);
extern void register_numa(nb::module_& m);
extern void register_nvls(nb::module_& m);
extern void register_executor(nb::module_& m);
extern void register_npkit(nb::module_& m);

template <typename T>
void def_nonblocking_future(nb::handle& m, const std::string& typestr) {
Expand Down Expand Up @@ -189,4 +190,5 @@ NB_MODULE(_mscclpp, m) {
register_numa(m);
register_nvls(m);
register_executor(m);
register_npkit(m);
}
16 changes: 16 additions & 0 deletions python/mscclpp/npkit_py.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#include <nanobind/nanobind.h>
#include <nanobind/stl/string.h>

#include <mscclpp/npkit/npkit.hpp>

namespace nb = nanobind;

/// Adds the `npkit` submodule to `m` and binds the static NpKit entry points
/// Init, Dump and Shutdown as `init`, `dump` and `shutdown`.
void register_npkit(nb::module_ &m) {
  m.def_submodule("npkit", "NPKit functions")
      .def("init", &NpKit::Init)
      .def("dump", &NpKit::Dump)
      .def("shutdown", &NpKit::Shutdown);
}
7 changes: 7 additions & 0 deletions python/test/executor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
Executor,
ExecutionPlan,
PacketType,
npkit,
)
import mscclpp.comm as mscclpp_comm

Expand Down Expand Up @@ -87,6 +88,9 @@ def main(
mscclpp_group = mscclpp_comm.CommGroup(MPI.COMM_WORLD)
cp.cuda.Device(mscclpp_group.my_rank % mscclpp_group.nranks_per_node).use()
executor = Executor(mscclpp_group.communicator)
npkit_dump_dir = os.getenv("NPKIT_DUMP_DIR")
if npkit_dump_dir is not None:
npkit.init(mscclpp_group.my_rank)
execution_plan = ExecutionPlan(execution_paln_name, execution_plan_path)

cp.random.seed(seed)
Expand Down Expand Up @@ -119,6 +123,9 @@ def main(

mscclpp_group.barrier()
execution_time = bench_time(100, 10, executor_func)
if npkit_dump_dir is not None:
npkit.dump(npkit_dump_dir)
npkit.shutdown()
print(
f"Rank: {MPI.COMM_WORLD.rank} Execution time: {execution_time} us, "
f"data size: {sendbuf.nbytes} bytes data type: {dtype().dtype.name} "
Expand Down
7 changes: 7 additions & 0 deletions python/test/test_mscclpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
TcpBootstrap,
Transport,
is_nvls_supported,
npkit,
)
import mscclpp.comm as mscclpp_comm
from mscclpp.utils import KernelBuilder, pack
Expand Down Expand Up @@ -603,6 +604,9 @@ def test_executor(mpi_group: MpiGroup, filename: str):
project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
mscclpp_group = mscclpp_comm.CommGroup(mpi_group.comm)
executor = Executor(mscclpp_group.communicator)
npkit_dump_dir = os.getenv("NPKIT_DUMP_DIR")
if npkit_dump_dir is not None:
npkit.init(mscclpp_group.my_rank)
execution_plan = ExecutionPlan("allreduce_pairs", os.path.join(project_dir, "test", "execution-files", filename))

nelems = 1024 * 1024
Expand All @@ -629,3 +633,6 @@ def test_executor(mpi_group: MpiGroup, filename: str):
)
stream.synchronize()
assert cp.allclose(sendbuf, expected, atol=1e-3 * mpi_group.comm.size)
if npkit_dump_dir is not None:
npkit.dump(npkit_dump_dir)
npkit.shutdown()
4 changes: 3 additions & 1 deletion src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@

#include "connection.hpp"

#if defined(ENABLE_NPKIT)
#include <mscclpp/npkit/npkit.hpp>
#endif
#include <mscclpp/utils.hpp>
#include <sstream>
#include <thread>

#include "debug.h"
#include "endpoint.hpp"
#include "infiniband/verbs.h"
#include "npkit/npkit.h"

namespace mscclpp {

Expand Down
32 changes: 28 additions & 4 deletions src/executor/execution_kernel.cu
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,43 @@ void ExecutionKernel::launchKernel(int rank, int nthreadblocks, int nthreads, vo
switch (dataType) {
case DataType::INT32:
executionKernel<int32_t, PacketType><<<nthreadblocks, nthreads, sharedMemSize, stream>>>(
rank, (int32_t*)src, (int32_t*)dst, (int32_t*)scratch, scratchSize, plan, flag);
rank, (int32_t*)src, (int32_t*)dst, (int32_t*)scratch, scratchSize, plan, flag
#if defined(ENABLE_NPKIT)
,
NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp());
#else
);
#endif
break;
case DataType::UINT32:
executionKernel<uint32_t><<<nthreadblocks, nthreads, sharedMemSize, stream>>>(
rank, (uint32_t*)src, (uint32_t*)dst, (uint32_t*)scratch, scratchSize, plan, flag);
rank, (uint32_t*)src, (uint32_t*)dst, (uint32_t*)scratch, scratchSize, plan, flag
#if defined(ENABLE_NPKIT)
,
NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp());
#else
);
#endif
break;
case DataType::FLOAT16:
executionKernel<half><<<nthreadblocks, nthreads, sharedMemSize, stream>>>(
rank, (half*)src, (half*)dst, (half*)scratch, scratchSize, plan, flag);
rank, (half*)src, (half*)dst, (half*)scratch, scratchSize, plan, flag
#if defined(ENABLE_NPKIT)
,
NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp());
#else
);
#endif
break;
case DataType::FLOAT32:
executionKernel<float><<<nthreadblocks, nthreads, sharedMemSize, stream>>>(
rank, (float*)src, (float*)dst, (float*)scratch, scratchSize, plan, flag);
rank, (float*)src, (float*)dst, (float*)scratch, scratchSize, plan, flag
#if defined(ENABLE_NPKIT)
,
NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp());
#else
);
#endif
break;
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/executor/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,11 @@ struct Executor::Impl {
DataType dataType, cudaStream_t stream, PacketType packetType) {
static uint32_t flag = 0;
int nthreadblocks = context.deviceExecutionPlans.size();
#if defined(ENABLE_NPKIT)
size_t sharedMemSize = sizeof(DeviceExecutionPlan) + NPKIT_SHM_NUM_EVENTS * sizeof(NpKitEvent);
#else
size_t sharedMemSize = sizeof(DeviceExecutionPlan);
#endif
switch (packetType) {
case PacketType::LL16:
ExecutionKernel::launchKernel<LL16Packet>(
Expand Down
Loading

0 comments on commit 76328fe

Please sign in to comment.