From 0afba081c3378126309233f5bca55eed0b59f0cc Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Tue, 4 Jun 2024 14:00:40 +0000 Subject: [PATCH 01/16] Add NPKit GPU event support --- CMakeLists.txt | 5 +- include/mscclpp/npkit/npkit.hpp | 76 ++++++ include/mscclpp/npkit/npkit_event.hpp | 18 ++ .../mscclpp/npkit/npkit_struct.hpp | 2 +- python/mscclpp/nvls_py.cpp | 4 + src/connection.cc | 4 +- src/executor/execution_kernel.cu | 30 ++- src/executor/executor.cc | 7 + src/include/execution_kernel.hpp | 98 ++++++- src/npkit/npkit.cc | 97 +++++-- src/npkit/npkit.h | 66 ----- src/npkit/npkit_event.h | 23 -- test/executor_test.cc | 28 +- tools/npkit/npkit_trace_generator.py | 255 +++++++++--------- 14 files changed, 454 insertions(+), 259 deletions(-) create mode 100644 include/mscclpp/npkit/npkit.hpp create mode 100644 include/mscclpp/npkit/npkit_event.hpp rename src/npkit/npkit_struct.h => include/mscclpp/npkit/npkit_struct.hpp (98%) delete mode 100644 src/npkit/npkit.h delete mode 100644 src/npkit/npkit_event.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 31525f9c9..58918eec5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,7 +15,6 @@ list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake) # Options option(ENABLE_TRACE "Enable tracing" OFF) -option(USE_NPKIT "Use NPKIT" ON) option(BUILD_TESTS "Build tests" ON) option(BUILD_PYTHON_BINDINGS "Build Python bindings" ON) option(USE_CUDA "Use NVIDIA/CUDA." OFF) @@ -119,8 +118,8 @@ endif() if(ENABLE_TRACE) target_compile_definitions(mscclpp_obj PRIVATE ENABLE_TRACE) endif() -if(USE_NPKIT) - target_compile_definitions(mscclpp_obj PRIVATE ENABLE_NPKIT) +if(NPKIT_FLAGS) + target_compile_definitions(mscclpp_obj PRIVATE ${NPKIT_FLAGS}) endif() # libmscclpp diff --git a/include/mscclpp/npkit/npkit.hpp b/include/mscclpp/npkit/npkit.hpp new file mode 100644 index 000000000..ca4e7bf68 --- /dev/null +++ b/include/mscclpp/npkit/npkit.hpp @@ -0,0 +1,76 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +#ifndef NPKIT_H_ +#define NPKIT_H_ + +#include +#include + +#include +#include +#include + +#if defined(__HIP_PLATFORM_AMD__) +#define NPKIT_GET_GPU_TIMESTAMP wall_clock64 +#else +#define NPKIT_GET_GPU_TIMESTAMP clock64 +#endif + +#define NPKIT_SHM_NUM_EVENTS 64 + +class NpKit { + public: + static const uint64_t kNumGpuEventBuffers = 1024; + + static const uint64_t kNumCpuEventBuffers = 64; + + static void Init(int rank); + + static void Dump(const std::string& dump_dir); + + static void Shutdown(); + + static NpKitEventCollectContext* GetGpuEventCollectContexts(); + + static inline __device__ void CollectGpuEventShm(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, + NpKitEvent* event_buffer, uint64_t* event_buffer_head) { + if (*event_buffer_head < NPKIT_SHM_NUM_EVENTS) { + if (threadIdx.x == 0) { + NpKitEvent& event = event_buffer[*event_buffer_head]; + event.fields.type = type; + event.fields.size = size; + event.fields.rsvd = rsvd; + event.fields.timestamp = timestamp; + } + (*event_buffer_head)++; + } + } + + static void CollectCpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, int channel_id); + + static uint64_t* GetCpuTimestamp(); + + private: + static void CpuTimestampUpdateThread(); + + // 64K * 1024 * 16B = 1GB per GPU + static const uint64_t kMaxNumGpuEventsPerBuffer = 1ULL << 16; + + // 64K * 2 (send/recv) * (1024/64) = 2M, 2M * 64 * 16B = 2GB per CPU + static const uint64_t kMaxNumCpuEventsPerBuffer = 1ULL << 21; + + static std::vector> gpu_event_buffers_; + static std::vector> cpu_event_buffers_; + + static mscclpp::UniqueCudaPtr gpu_collect_contexts_; + static std::unique_ptr cpu_collect_contexts_; + + static uint64_t rank_; + + static mscclpp::UniqueCudaHostPtr cpu_timestamp_; + static std::unique_ptr cpu_timestamp_update_thread_; + static volatile bool cpu_timestamp_update_thread_should_stop_; +}; + +#endif diff --git a/include/mscclpp/npkit/npkit_event.hpp b/include/mscclpp/npkit/npkit_event.hpp new file mode 100644 index 000000000..22d2ecc22 --- /dev/null +++ b/include/mscclpp/npkit/npkit_event.hpp @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +#ifndef NPKIT_EVENT_H_ +#define NPKIT_EVENT_H_ + +#define NPKIT_EVENT_INVALID 0x0 + +#define NPKIT_EVENT_TIME_SYNC_GPU 0x1 +#define NPKIT_EVENT_TIME_SYNC_CPU 0x2 + +#define NPKIT_EVENT_EXECUTOR_INIT_ENTRY 0x3 +#define NPKIT_EVENT_EXECUTOR_INIT_EXIT 0x4 + +#define NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY 0x5 +#define NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT 0x15 + +#endif diff --git a/src/npkit/npkit_struct.h b/include/mscclpp/npkit/npkit_struct.hpp similarity index 98% rename from src/npkit/npkit_struct.h rename to include/mscclpp/npkit/npkit_struct.hpp index 62b417f24..44de35357 100644 --- a/src/npkit/npkit_struct.h +++ b/include/mscclpp/npkit/npkit_struct.hpp @@ -25,4 +25,4 @@ struct NpKitEventCollectContext { #pragma pack(pop) -#endif \ No newline at end of file +#endif diff --git a/python/mscclpp/nvls_py.cpp b/python/mscclpp/nvls_py.cpp index 819a7c6b0..652d9e68a 100644 --- a/python/mscclpp/nvls_py.cpp +++ b/python/mscclpp/nvls_py.cpp @@ -34,5 +34,9 @@ void register_nvls(nb::module_& m) { .def("get_multicast_min_granularity", &NvlsConnection::getMultiCastMinGranularity); m.def("connect_nvls_collective", &connectNvlsCollective, nb::arg("communicator"), nb::arg("allRanks"), +#if (USE_NVLS) nb::arg("bufferSize") = NvlsConnection::DefaultNvlsBufferSize); +#else + nb::arg("bufferSize") = 0); +#endif } diff --git a/src/connection.cc b/src/connection.cc index b5fd5b9b9..fc3724c08 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -3,6 +3,9 @@ #include "connection.hpp" +#if defined(ENABLE_NPKIT) +#include +#endif #include #include #include @@ -10,7 +13,6 @@ #include "debug.h" #include "endpoint.hpp" #include "infiniband/verbs.h" -#include "npkit/npkit.h" namespace mscclpp { diff --git a/src/executor/execution_kernel.cu b/src/executor/execution_kernel.cu index 4e96af9ab..4f2138f68 100644 --- a/src/executor/execution_kernel.cu +++ b/src/executor/execution_kernel.cu @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. +#include + #include "execution_kernel.hpp" #if defined(MSCCLPP_DEVICE_CUDA) @@ -13,19 +15,39 @@ void ExecutionKernel::launchKernel(int rank, int nthreadblocks, int nthreads, vo switch (dataType) { case DataType::INT32: executionKernel<<>>( - rank, (int32_t*)src, (int32_t*)dst, (int32_t*)scratch, scratchSize, plan, flag); + rank, (int32_t*)src, (int32_t*)dst, (int32_t*)scratch, scratchSize, plan, flag +#if defined(ENABLE_NPKIT) + , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); +#else + ); +#endif break; case DataType::UINT32: executionKernel<<>>( - rank, (uint32_t*)src, (uint32_t*)dst, (uint32_t*)scratch, scratchSize, plan, flag); + rank, (uint32_t*)src, (uint32_t*)dst, (uint32_t*)scratch, scratchSize, plan, flag +#if defined(ENABLE_NPKIT) + , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); +#else + ); +#endif break; case DataType::FLOAT16: executionKernel<<>>( - rank, (half*)src, (half*)dst, (half*)scratch, scratchSize, plan, flag); + rank, (half*)src, (half*)dst, (half*)scratch, scratchSize, plan, flag +#if defined(ENABLE_NPKIT) + , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); +#else + ); +#endif break; case DataType::FLOAT32: executionKernel<<>>( - rank, (float*)src, (float*)dst, (float*)scratch, scratchSize, plan, flag); + rank, (float*)src, (float*)dst, (float*)scratch, scratchSize, plan, flag +#if defined(ENABLE_NPKIT) + , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); +#else + ); +#endif break; } } diff --git a/src/executor/executor.cc b/src/executor/executor.cc index 950f02012..ce962e446 100644 --- a/src/executor/executor.cc +++ b/src/executor/executor.cc @@ -2,6 +2,9 @@ // Licensed under the MIT license. #include +#if defined(ENABLE_NPKIT) +#include +#endif #include #include #include @@ -260,7 +263,11 @@ struct Executor::Impl { DataType dataType, cudaStream_t stream, PacketType packetType) { static uint32_t flag = 0; int nthreadblocks = context.deviceExecutionPlans.size(); +#if defined(ENABLE_NPKIT) + size_t sharedMemSize = sizeof(DeviceExecutionPlan) + NPKIT_SHM_NUM_EVENTS * sizeof(NpKitEvent); +#else size_t sharedMemSize = sizeof(DeviceExecutionPlan); +#endif switch (packetType) { case PacketType::LL16: ExecutionKernel::launchKernel( diff --git a/src/include/execution_kernel.hpp b/src/include/execution_kernel.hpp index 9b2b77f4d..e56621557 100644 --- a/src/include/execution_kernel.hpp +++ b/src/include/execution_kernel.hpp @@ -18,6 +18,10 @@ #define __synclds() asm volatile("s_waitcnt lgkmcnt(0) \n s_barrier"); #endif // defined(MSCCLPP_DEVICE_HIP) +#if defined(ENABLE_NPKIT) +#include +#endif + namespace { template MSCCLPP_DEVICE_INLINE To bit_cast(const From& src) { @@ -331,10 +335,21 @@ MSCCLPP_DEVICE_INLINE void handleReduceSend(T* dst, uint32_t dstOffsetByBytes, T template __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* input, T* output, T* scratch, - size_t scratchSize, DeviceExecutionPlan* plan, uint32_t flag) { + size_t scratchSize, DeviceExecutionPlan* plan, uint32_t flag +#if defined(ENABLE_NPKIT) + , NpKitEventCollectContext* npKitEventCollectContexts, uint64_t* cpuTimestamp) { +#else + ) { +#endif extern __shared__ int4 sharedMem[]; int bid = blockIdx.x; int tid = threadIdx.x; +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT) + uint64_t npkit_timestamp_entry = 0; + if (tid == 0) { + npkit_timestamp_entry = NPKIT_GET_GPU_TIMESTAMP(); + } +#endif DeviceExecutionPlan* localPlan = plan + bid; for (size_t i = tid; i < sizeof(DeviceExecutionPlan) / sizeof(int4); i += blockDim.x) { sharedMem[i] = ((int4*)localPlan)[i]; @@ -344,14 +359,43 @@ __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* inpu #else // !defined(MSCCLPP_DEVICE_HIP) __syncthreads(); #endif // !defined(MSCCLPP_DEVICE_HIP) + localPlan = (DeviceExecutionPlan*)sharedMem; int nOperations = localPlan->nOperations; Operation* operations = localPlan->operations; DeviceHandle* smChannels = localPlan->channels.smChannels; DeviceHandle* proxyChannels = localPlan->channels.proxyChannels; +#if defined(ENABLE_NPKIT) + NpKitEvent* event_buffer = (NpKitEvent *)((char *)sharedMem + sizeof(DeviceExecutionPlan)); + uint64_t event_buffer_head = 0; +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_CPU) + NpKit::CollectGpuEventShm(NPKIT_EVENT_TIME_SYNC_CPU, 0, 0, *cpuTimestamp, + event_buffer, &event_buffer_head); +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_GPU) + NpKit::CollectGpuEventShm(NPKIT_EVENT_TIME_SYNC_GPU, 0, 0, NPKIT_GET_GPU_TIMESTAMP(), + event_buffer, &event_buffer_head); +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT) + NpKit::CollectGpuEventShm(NPKIT_EVENT_EXECUTOR_INIT_ENTRY, 0, 0, npkit_timestamp_entry, + event_buffer, &event_buffer_head); + NpKit::CollectGpuEventShm(NPKIT_EVENT_EXECUTOR_INIT_EXIT, 0, 0, NPKIT_GET_GPU_TIMESTAMP(), + event_buffer, &event_buffer_head); +#endif + for (int i = 0; i < nOperations; i++) { Operation& op = operations[i]; + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY) + NpKit::CollectGpuEventShm(NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY + (int)op.type, op.size, 0, NPKIT_GET_GPU_TIMESTAMP(), + event_buffer, &event_buffer_head); +#endif + if (op.type == OperationType::BARRIER) { __syncthreads(); } else if (op.type == OperationType::SIGNAL) { @@ -395,7 +439,31 @@ __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* inpu handleReduceSend(dst, op.dstOffset, src, op.srcOffset, tmp, op.inputOffsets, smChannels, op.outputChannelIndexes, op.outputOffsets, op.nOutputs, op.size); } + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT) + NpKit::CollectGpuEventShm(NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT + (int)op.type, op.size, 0, NPKIT_GET_GPU_TIMESTAMP(), + event_buffer, &event_buffer_head); +#endif + + } + +#if defined(ENABLE_NPKIT) +#if defined(MSCCLPP_DEVICE_HIP) + __synclds(); +#else // !defined(MSCCLPP_DEVICE_HIP) + __syncthreads(); +#endif // !defined(MSCCLPP_DEVICE_HIP) + NpKitEventCollectContext* npKitCtx = npKitEventCollectContexts + bid; + NpKitEvent* global_event_buffer = npKitCtx->event_buffer; + uint64_t global_event_buffer_head = npKitCtx->event_buffer_head; + for (size_t i = tid; i < event_buffer_head * sizeof(NpKitEvent) / sizeof(int4); i += blockDim.x) { + ((int4*)(global_event_buffer + global_event_buffer_head))[i] = ((int4*)event_buffer)[i]; } + if (tid == 0) { + npKitCtx->event_buffer_head += event_buffer_head; + } +#endif + } #endif // defined(MSCCLPP_DEVICE_COMPILE) @@ -409,19 +477,39 @@ class ExecutionKernel { switch (dataType) { case DataType::INT32: executionKernel<<>>( - rank, (int32_t*)src, (int32_t*)dst, (int32_t*)scratch, scratchSize, plan, flag); + rank, (int32_t*)src, (int32_t*)dst, (int32_t*)scratch, scratchSize, plan, flag +#if defined(ENABLE_NPKIT) + , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); +#else + ); +#endif break; case DataType::UINT32: executionKernel<<>>( - rank, (uint32_t*)src, (uint32_t*)dst, (uint32_t*)scratch, scratchSize, plan, flag); + rank, (uint32_t*)src, (uint32_t*)dst, (uint32_t*)scratch, scratchSize, plan, flag +#if defined(ENABLE_NPKIT) + , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); +#else + ); +#endif break; case DataType::FLOAT16: executionKernel<<>>( - rank, (half*)src, (half*)dst, (half*)scratch, scratchSize, plan, flag); + rank, (half*)src, (half*)dst, (half*)scratch, scratchSize, plan, flag +#if defined(ENABLE_NPKIT) + , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); +#else + ); +#endif break; case DataType::FLOAT32: executionKernel<<>>( - rank, (float*)src, (float*)dst, (float*)scratch, scratchSize, plan, flag); + rank, (float*)src, (float*)dst, (float*)scratch, scratchSize, plan, flag +#if defined(ENABLE_NPKIT) + , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); +#else + ); +#endif break; } } diff --git a/src/npkit/npkit.cc b/src/npkit/npkit.cc index 466806d1f..19dd89349 100644 --- a/src/npkit/npkit.cc +++ b/src/npkit/npkit.cc @@ -1,13 +1,14 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -#include "npkit.h" - -#include - #include #include +#include + #include +#include + +#include "debug.h" uint64_t NpKit::rank_ = 0; @@ -16,41 +17,85 @@ std::vector> NpKit::cpu_event_buffers_; mscclpp::UniqueCudaPtr NpKit::gpu_collect_contexts_; std::unique_ptr NpKit::cpu_collect_contexts_; -uint64_t NpKit::cpu_base_system_timestamp_ = 0; -uint64_t NpKit::cpu_base_steady_timestamp_ = 0; + +mscclpp::UniqueCudaHostPtr NpKit::cpu_timestamp_; +std::unique_ptr NpKit::cpu_timestamp_update_thread_; +volatile bool NpKit::cpu_timestamp_update_thread_should_stop_ = false; + +void NpKit::CpuTimestampUpdateThread() { + uint64_t init_system_clock = std::chrono::system_clock::now().time_since_epoch().count(); + uint64_t init_steady_clock = std::chrono::steady_clock::now().time_since_epoch().count(); + uint64_t curr_steady_clock = 0; + volatile uint64_t* volatile_cpu_timestamp_ = cpu_timestamp_.get(); + while (!cpu_timestamp_update_thread_should_stop_) { + curr_steady_clock = std::chrono::steady_clock::now().time_since_epoch().count(); + *volatile_cpu_timestamp_ = init_system_clock + (curr_steady_clock - init_steady_clock); + } +} void NpKit::Init(int rank) { +#if defined(ENABLE_NPKIT) uint64_t i = 0; NpKitEventCollectContext ctx; ctx.event_buffer_head = 0; rank_ = rank; // Init event data structures - gpu_collect_contexts_ = mscclpp::allocUniqueCuda(kNumGpuEventBuffers); - for (i = 0; i < kNumGpuEventBuffers; i++) { + gpu_collect_contexts_ = mscclpp::allocUniqueCuda(NpKit::kNumGpuEventBuffers); + for (i = 0; i < NpKit::kNumGpuEventBuffers; i++) { gpu_event_buffers_.emplace_back(mscclpp::allocUniqueCuda(kMaxNumGpuEventsPerBuffer)); ctx.event_buffer = gpu_event_buffers_[i].get(); mscclpp::memcpyCuda(gpu_collect_contexts_.get() + i, &ctx, 1); } - cpu_collect_contexts_ = std::make_unique(kNumCpuEventBuffers); - for (i = 0; i < kNumCpuEventBuffers; i++) { + cpu_collect_contexts_ = std::make_unique(NpKit::kNumCpuEventBuffers); + for (i = 0; i < NpKit::kNumCpuEventBuffers; i++) { cpu_event_buffers_.emplace_back(std::make_unique(kMaxNumCpuEventsPerBuffer)); ctx.event_buffer = cpu_event_buffers_[i].get(); cpu_collect_contexts_[i] = ctx; } // Init timestamp - cpu_base_system_timestamp_ = std::chrono::system_clock::now().time_since_epoch().count(); - cpu_base_steady_timestamp_ = std::chrono::steady_clock::now().time_since_epoch().count(); + cpu_timestamp_ = mscclpp::makeUniqueCudaHost(); + volatile uint64_t* volatile_cpu_timestamp = cpu_timestamp_.get(); + *volatile_cpu_timestamp = std::chrono::system_clock::now().time_since_epoch().count(); + cpu_timestamp_update_thread_should_stop_ = false; + cpu_timestamp_update_thread_ = std::make_unique(CpuTimestampUpdateThread); +#else + WARN("NpKit::Init(%d) : MSCCLpp library was not built with NPKit enabled.", rank); +#endif +} + +#if defined(ENABLE_NPKIT) +static int GetGpuClockRateInKhz() { + int dev_id; +#if defined(__HIP_PLATFORM_AMD__) + hipDeviceProp_t dev_prop; + char gcn_arch[256]; + MSCCLPP_CUDATHROW(hipGetDevice(&dev_id)); + MSCCLPP_CUDATHROW(hipGetDeviceProperties(&dev_prop, dev_id)); + char *gcnArchNameToken = strtok(dev_prop.gcnArchName, ":"); + strcpy(gcn_arch, gcnArchNameToken); + if (strncmp("gfx94", gcn_arch, 5) == 0) + return 100000; + else + return 25000; +#else + cudaDeviceProp dev_prop; + MSCCLPP_CUDATHROW(cudaGetDevice(&dev_id)); + MSCCLPP_CUDATHROW(cudaGetDeviceProperties(&dev_prop, dev_id)); + return dev_prop.clockRate; +#endif } +#endif void NpKit::Dump(const std::string& dump_dir) { +#if defined(ENABLE_NPKIT) uint64_t i = 0; std::string dump_file_path; // Dump CPU events - for (i = 0; i < kNumCpuEventBuffers; i++) { + for (i = 0; i < NpKit::kNumCpuEventBuffers; i++) { dump_file_path = dump_dir; dump_file_path += "/cpu_events_rank_"; dump_file_path += std::to_string(rank_); @@ -80,7 +125,7 @@ void NpKit::Dump(const std::string& dump_dir) { clock_period_den_file.close(); // Dump GPU events, reuse CPU struct - for (i = 0; i < kNumGpuEventBuffers; i++) { + for (i = 0; i < NpKit::kNumGpuEventBuffers; i++) { dump_file_path = dump_dir; dump_file_path += "/gpu_events_rank_"; dump_file_path += std::to_string(rank_); @@ -98,17 +143,21 @@ void NpKit::Dump(const std::string& dump_dir) { dump_file_path = dump_dir; dump_file_path += "/gpu_clock_rate_rank_"; dump_file_path += std::to_string(rank_); - cudaDeviceProp dev_prop; - int dev; - MSCCLPP_CUDATHROW(cudaGetDevice(&dev)); - MSCCLPP_CUDATHROW(cudaGetDeviceProperties(&dev_prop, dev)); - std::string clock_rate_str = std::to_string(dev_prop.clockRate); + std::string clock_rate_str = std::to_string(GetGpuClockRateInKhz()); auto gpu_clock_rate_file = std::fstream(dump_file_path, std::ios::out); gpu_clock_rate_file.write(clock_rate_str.c_str(), clock_rate_str.length()); gpu_clock_rate_file.close(); +#else + WARN("NpKit::Dump(%s) : MSCCLpp library was not built with NPKit enabled.", dump_dir.c_str()); +#endif } void NpKit::Shutdown() { +#if defined(ENABLE_NPKIT) + // Stop CPU timestamp updating thread + cpu_timestamp_update_thread_should_stop_ = true; + cpu_timestamp_update_thread_->join(); + // Free CPU event data structures cpu_event_buffers_.clear(); cpu_collect_contexts_.reset(); @@ -116,6 +165,11 @@ void NpKit::Shutdown() { // Free GPU event data structures gpu_event_buffers_.clear(); gpu_collect_contexts_.reset(); + + // Free timestamp + cpu_timestamp_update_thread_.reset(); + cpu_timestamp_.reset(); +#endif } NpKitEventCollectContext* NpKit::GetGpuEventCollectContexts() { return gpu_collect_contexts_.get(); } @@ -132,7 +186,4 @@ void NpKit::CollectCpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t } } -uint64_t NpKit::GetCpuTimestamp() { - uint64_t cpu_curr_steady_timestamp_ = std::chrono::steady_clock::now().time_since_epoch().count(); - return cpu_base_steady_timestamp_ + (cpu_curr_steady_timestamp_ - cpu_base_steady_timestamp_); -} +uint64_t* NpKit::GetCpuTimestamp() { return cpu_timestamp_.get(); } diff --git a/src/npkit/npkit.h b/src/npkit/npkit.h deleted file mode 100644 index 21ba928ae..000000000 --- a/src/npkit/npkit.h +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -#ifndef NPKIT_H_ -#define NPKIT_H_ - -#include -#include -#include - -#include "npkit_event.h" -#include "npkit_struct.h" - -class NpKit { - public: - static const uint64_t kNumGpuEventBuffers = 512; - - static const uint64_t kNumCpuEventBuffers = 32; - - static void Init(int rank); - - static void Dump(const std::string& dump_dir); - - static void Shutdown(); - - static NpKitEventCollectContext* GetGpuEventCollectContexts(); - -#ifdef __CUDACC__ - static inline __device__ void CollectGpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, - NpKitEventCollectContext* ctx) { - uint64_t event_buffer_head = ctx->event_buffer_head; - if (event_buffer_head < kMaxNumGpuEventsPerBuffer) { - NpKitEvent& event = ctx->event_buffer[event_buffer_head]; - event.fields.type = type; - event.fields.size = size; - event.fields.rsvd = rsvd; - event.fields.timestamp = timestamp; - ctx->event_buffer_head++; - } - } -#endif // __CUDACC__ - - static void CollectCpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, int channel_id); - - static uint64_t GetCpuTimestamp(); - - private: - // 64K * 512 * 16B = 512MB per GPU - static const uint64_t kMaxNumGpuEventsPerBuffer = 1ULL << 16; - - // 64K * 2 (send/recv) * (512/32) = 2M, 2M * 32 * 16B = 1GB per CPU - static const uint64_t kMaxNumCpuEventsPerBuffer = 1ULL << 21; - - static std::vector> gpu_event_buffers_; - static std::vector> cpu_event_buffers_; - - static mscclpp::UniqueCudaPtr gpu_collect_contexts_; - static std::unique_ptr cpu_collect_contexts_; - - static uint64_t cpu_base_system_timestamp_; - static uint64_t cpu_base_steady_timestamp_; - - static uint64_t rank_; -}; - -#endif \ No newline at end of file diff --git a/src/npkit/npkit_event.h b/src/npkit/npkit_event.h deleted file mode 100644 index f17e71363..000000000 --- a/src/npkit/npkit_event.h +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -#ifndef NPKIT_EVENT_H_ -#define NPKIT_EVENT_H_ - -#define NPKIT_EVENT_INVALID 0x0 - -#define NPKIT_EVENT_TIME_SYNC_GPU 0x1 -#define NPKIT_EVENT_TIME_SYNC_CPU 0x2 - -#define NPKIT_EVENT_SM_REDUCE_ENTRY 0x3 -#define NPKIT_EVENT_SM_REDUCE_EXIT 0x4 - -#define NPKIT_EVENT_IB_SEND_DATA_ENTRY 0x5 -#define NPKIT_EVENT_IB_SEND_FLAG_ENTRY 0x6 -#define NPKIT_EVENT_IB_SEND_EXIT 0x7 - -#define NPKIT_EVENT_DMA_SEND_DATA_ENTRY 0x8 -#define NPKIT_EVENT_DMA_SEND_FLAG_ENTRY 0x9 -#define NPKIT_EVENT_DMA_SEND_EXIT 0xA - -#endif \ No newline at end of file diff --git a/test/executor_test.cc b/test/executor_test.cc index a30691dde..90d0ac22c 100644 --- a/test/executor_test.cc +++ b/test/executor_test.cc @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -74,11 +75,14 @@ double benchTime(int rank, std::shared_ptr bootstrap, std::s } int main(int argc, char* argv[]) { - if (argc != 5) { + if (argc != 8) { std::cerr << "Usage: " << argv[0] << " " << " " << " " - << " " << std::endl; + << " " + << " " + << " " + << " " << std::endl; return 1; } @@ -93,6 +97,9 @@ int main(int argc, char* argv[]) { const std::string executionPlanName = argv[2]; const std::string executionPlanPath = argv[3]; const int nthreadsPerBlock = std::stoi(argv[4]); + const int niters = std::stoi(argv[5]); + const int ngraphIters = std::stoi(argv[6]); + const int enableNpKit = std::stoi(argv[7]); std::shared_ptr bootstrap; mscclpp::UniqueId id; @@ -103,11 +110,26 @@ int main(int argc, char* argv[]) { std::shared_ptr communicator = std::make_shared(bootstrap); std::shared_ptr executor = std::make_shared(communicator); + if (enableNpKit) { + NpKit::Init(rank); + } + mscclpp::ExecutionPlan plan(executionPlanName, executionPlanPath); std::shared_ptr sendbuff = mscclpp::allocExtSharedCuda(bufferSize); std::vector dataHost(bufferSize / sizeof(int), rank); MSCCLPP_CUDATHROW(cudaMemcpy(sendbuff.get(), dataHost.data(), bufferSize, cudaMemcpyHostToDevice)); - double deltaSec = benchTime(rank, bootstrap, executor, plan, sendbuff, bufferSize, nthreadsPerBlock, 200, 20); + double deltaSec = benchTime(rank, bootstrap, executor, plan, sendbuff, bufferSize, nthreadsPerBlock, niters, ngraphIters); + + if (enableNpKit) { + const char* npkitDumpDir = getenv("NPKIT_DUMP_DIR"); + if (npkitDumpDir == nullptr) { + std::cerr << "NPKIT_DUMP_DIR is empty" << std::endl; + } else { + NpKit::Dump(npkitDumpDir); + } + NpKit::Shutdown(); + } + std::cout << "Rank " << rank << ": " << bufferSize << " bytes " << deltaSec * 1.e6 << " us" << std::endl; MPI_Finalize(); return 0; diff --git a/tools/npkit/npkit_trace_generator.py b/tools/npkit/npkit_trace_generator.py index 4f2bc1b5f..5ab924116 100644 --- a/tools/npkit/npkit_trace_generator.py +++ b/tools/npkit/npkit_trace_generator.py @@ -2,152 +2,163 @@ # Licensed under the MIT License. import argparse -import json import os +import json +from queue import Queue def parse_npkit_event_header(npkit_event_header_path): - npkit_event_def = {"id_to_type": {}, "type_to_id": {}} - with open(npkit_event_header_path, "r") as f: + npkit_event_def = {'id_to_type': {}, 'type_to_id': {}} + executor_ops = [ + 'BARRIER', + 'PUT', + 'PUT_PACKET', + 'GET', + 'COPY', + 'COPY_PACKET', + 'SIGNAL', + 'WAIT', + 'FLUSH', + 'REDUCE', + 'REDUCE_PACKET', + 'REDUCE_SEND', + 'REDUCE_SEND_PACKET', + 'READ_REDUCE_COPY', + 'READ_REDUCE_COPY_SEND', + ] + executor_op_to_offset = {} + for executor_op in executor_ops: + executor_op_to_offset[executor_op] = len(executor_op_to_offset) + with open(npkit_event_header_path, 'r') as f: lines = [x.strip() for x in f.readlines() if len(x.strip()) != 0] line_idx = 0 while line_idx < len(lines): - if lines[line_idx].startswith("#define NPKIT_EVENT_"): + if lines[line_idx].startswith('#define NPKIT_EVENT_'): fields = lines[line_idx].split() if len(fields) == 3: event_type = fields[1] event_id = int(fields[2], 0) - npkit_event_def["type_to_id"][event_type] = event_id - npkit_event_def["id_to_type"][event_id] = event_type + if lines[line_idx].startswith('#define NPKIT_EVENT_EXECUTOR_OP_BASE'): + for executor_op in executor_op_to_offset: + real_event_id = event_id + executor_op_to_offset[executor_op] + if 'ENTRY' in lines[line_idx]: + event_type = 'NPKIT_EVENT_EXECUTOR_%s_ENTRY' % executor_op + elif 'EXIT' in lines[line_idx]: + event_type = 'NPKIT_EVENT_EXECUTOR_%s_EXIT' % executor_op + npkit_event_def['type_to_id'][event_type] = real_event_id + npkit_event_def['id_to_type'][real_event_id] = event_type + else: + npkit_event_def['type_to_id'][event_type] = event_id + npkit_event_def['id_to_type'][event_id] = event_type line_idx += 1 return npkit_event_def - -def trim_event_name(event_type): - list_event_type_name = event_type.split("_") - if "NPKIT" in list_event_type_name: - list_event_type_name.remove("NPKIT") - if "EVENT" in list_event_type_name: - list_event_type_name.remove("EVENT") - if "ENTRY" in list_event_type_name: - list_event_type_name.remove("ENTRY") - return "_".join(list_event_type_name) - - def parse_gpu_clock_scale(gpu_clock_file_path): - with open(gpu_clock_file_path, "r") as f: + with open(gpu_clock_file_path, 'r') as f: freq_in_khz = f.read() return float(freq_in_khz) * 1e3 / 1e6 - def parse_cpu_clock_scale(cpu_clock_den_file_path, cpu_clock_num_file_path): - with open(cpu_clock_num_file_path, "r") as f: + with open(cpu_clock_num_file_path, 'r') as f: num = float(f.read()) - with open(cpu_clock_den_file_path, "r") as f: + with open(cpu_clock_den_file_path, 'r') as f: den = float(f.read()) return den / num / 1e6 - def parse_gpu_event(event_bytes): return { - "id": int.from_bytes(event_bytes[0:1], byteorder="little", signed=False), - "size": int.from_bytes(event_bytes[1:5], byteorder="little", signed=False), - "rsvd": int.from_bytes(event_bytes[5:8], byteorder="little", signed=False), - "timestamp": int.from_bytes(event_bytes[8:16], byteorder="little", signed=False), + 'id': int.from_bytes(event_bytes[0:1], byteorder='little', signed=False), + 'size': int.from_bytes(event_bytes[1:5], byteorder='little', signed=False), + 'rsvd': int.from_bytes(event_bytes[5:8], byteorder='little', signed=False), + 'timestamp': int.from_bytes(event_bytes[8:16], byteorder='little', signed=False) } - def parse_cpu_event(event_bytes): return { - "id": int.from_bytes(event_bytes[0:1], byteorder="little", signed=False), - "size": int.from_bytes(event_bytes[1:5], byteorder="little", signed=False), - "slot": int.from_bytes(event_bytes[5:8], byteorder="little", signed=False), - "timestamp": int.from_bytes(event_bytes[8:16], byteorder="little", signed=False), + 'id': int.from_bytes(event_bytes[0:1], byteorder='little', signed=False), + 'size': int.from_bytes(event_bytes[1:5], byteorder='little', signed=False), + 'slot': int.from_bytes(event_bytes[5:8], byteorder='little', signed=False), + 'timestamp': int.from_bytes(event_bytes[8:16], byteorder='little', signed=False) } - def parse_gpu_event_file(npkit_dump_dir, npkit_event_def, rank, buf_idx, gpu_clock_scale, cpu_clock_scale): - gpu_event_file_path = os.path.join(npkit_dump_dir, "gpu_events_rank_%d_buf_%d" % (rank, buf_idx)) + gpu_event_file_path = os.path.join(npkit_dump_dir, 'gpu_events_rank_%d_buf_%d' % (rank, buf_idx)) raw_event_size = 16 curr_cpu_base_time = None curr_gpu_base_time = None gpu_events = [] event_type_to_seq = {} - with open(gpu_event_file_path, "rb") as f: + with open(gpu_event_file_path, 'rb') as f: raw_content = f.read() raw_content_size = len(raw_content) raw_content_idx = 0 while raw_content_idx < raw_content_size: parsed_gpu_event = parse_gpu_event(raw_content[raw_content_idx : raw_content_idx + raw_event_size]) - if npkit_event_def["id_to_type"][parsed_gpu_event["id"]] == "NPKIT_EVENT_TIME_SYNC_CPU": - curr_cpu_base_time = parsed_gpu_event["timestamp"] / cpu_clock_scale + if npkit_event_def['id_to_type'][parsed_gpu_event['id']] == 'NPKIT_EVENT_TIME_SYNC_CPU': + curr_cpu_base_time = parsed_gpu_event['timestamp'] / cpu_clock_scale curr_gpu_base_time = None - elif npkit_event_def["id_to_type"][parsed_gpu_event["id"]] == "NPKIT_EVENT_TIME_SYNC_GPU": + elif npkit_event_def['id_to_type'][parsed_gpu_event['id']] == 'NPKIT_EVENT_TIME_SYNC_GPU': if curr_gpu_base_time is None: - curr_gpu_base_time = parsed_gpu_event["timestamp"] / gpu_clock_scale + curr_gpu_base_time = parsed_gpu_event['timestamp'] / gpu_clock_scale else: if curr_gpu_base_time is None: - curr_gpu_base_time = parsed_gpu_event["timestamp"] / gpu_clock_scale - event_type = npkit_event_def["id_to_type"][parsed_gpu_event["id"]] - phase = "B" if event_type.endswith("_ENTRY") else "E" - gpu_events.append( - { - "ph": phase, - "ts": curr_cpu_base_time + parsed_gpu_event["timestamp"] / gpu_clock_scale - curr_gpu_base_time, - "pid": rank, - "tid": buf_idx + 1, - } - ) - if phase == "B": + curr_gpu_base_time = parsed_gpu_event['timestamp'] / gpu_clock_scale + event_type = npkit_event_def['id_to_type'][parsed_gpu_event['id']] + phase = 'B' if event_type.endswith('_ENTRY') else 'E' + gpu_events.append({ + 'ph': phase, + 'ts': curr_cpu_base_time + parsed_gpu_event['timestamp'] / gpu_clock_scale - curr_gpu_base_time, + 'pid': rank, + 'tid': buf_idx + 1 + }) + if phase == 'B': if event_type not in event_type_to_seq: event_type_to_seq[event_type] = 0 - gpu_events[-1].update( - { - "name": trim_event_name(event_type), - "cat": "GPU", - "args": { - "rank": rank, - "buf_idx": buf_idx, - "seq": event_type_to_seq[event_type], - "rsvd_0": parsed_gpu_event["rsvd"], - "size_0": parsed_gpu_event["size"], - }, + gpu_events[-1].update({ + 'name': event_type, + 'cat': 'GPU', + 'args': { + 'rank': rank, + 'buf_idx': buf_idx, + 'seq': event_type_to_seq[event_type], + 'rsvd_0': parsed_gpu_event['rsvd'], + 'size_0': parsed_gpu_event['size'] } - ) + }) event_type_to_seq[event_type] += 1 else: - gpu_events[-1]["args"] = { - "size": parsed_gpu_event["size"], - "rsvd": parsed_gpu_event["rsvd"], - } - delta_time = gpu_events[-1]["ts"] - gpu_events[-2]["ts"] - gpu_events[-1]["args"]["bw (GB/s)"] = gpu_events[-1]["args"]["size"] / delta_time / 1e3 + gpu_events[-1]['args'] = {'size': parsed_gpu_event['size'], 'rsvd': parsed_gpu_event['rsvd']} + delta_time = gpu_events[-1]['ts'] - gpu_events[-2]['ts'] + gpu_events[-1]['args']['bw (GB/s)'] = 0. if delta_time == 0. else gpu_events[-1]['args']['size'] / delta_time / 1e3 raw_content_idx += raw_event_size return gpu_events - def parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clock_scale): - cpu_event_file_path = os.path.join(npkit_dump_dir, "cpu_events_rank_%d_channel_%d" % (rank, channel)) + cpu_event_file_path = os.path.join(npkit_dump_dir, 'cpu_events_rank_%d_channel_%d' % (rank, channel)) raw_event_size = 16 cpu_events = [] event_type_to_seq = {} fiber_is_usable = [] - fiber_open_info = [] + fiber_open_ts = [] slot_to_fiber_id = {} channel_shift = 1000 - with open(cpu_event_file_path, "rb") as f: + with open(cpu_event_file_path, 'rb') as f: raw_content = f.read() raw_content_size = len(raw_content) raw_content_idx = 0 while raw_content_idx < raw_content_size: parsed_cpu_event = parse_cpu_event(raw_content[raw_content_idx : raw_content_idx + raw_event_size]) - event_type = npkit_event_def["id_to_type"][parsed_cpu_event["id"]] - phase = "B" if event_type.endswith("_ENTRY") else "E" - cpu_events.append({"ph": phase, "ts": parsed_cpu_event["timestamp"] / cpu_clock_scale, "pid": rank}) - slot = parsed_cpu_event["slot"] - if phase == "B": + event_type = npkit_event_def['id_to_type'][parsed_cpu_event['id']] + phase = 'B' if event_type.endswith('_ENTRY') else 'E' + cpu_events.append({ + 'ph': phase, + 'ts': parsed_cpu_event['timestamp'] / cpu_clock_scale, + 'pid': rank + }) + slot = parsed_cpu_event['slot'] + if phase == 'B': # Open fiber event fiber_id = 0 while fiber_id < len(fiber_is_usable): @@ -156,96 +167,80 @@ def parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clo fiber_id += 1 if fiber_id == len(fiber_is_usable): fiber_is_usable.append(True) - fiber_open_info.append({"ts": 0.0, "size": 0}) + fiber_open_ts.append(0.0) slot_to_fiber_id[slot] = fiber_id - fiber_open_info[fiber_id]["ts"] = cpu_events[-1]["ts"] - fiber_open_info[fiber_id]["size"] = parsed_cpu_event["size"] + fiber_open_ts[fiber_id] = cpu_events[-1]['ts'] fiber_is_usable[fiber_id] = False if event_type not in event_type_to_seq: event_type_to_seq[event_type] = 0 - cpu_events[-1].update( - { - "name": trim_event_name(event_type), - "cat": "CPU", - "args": { - "rank": rank, - "channel": channel, - "slot": parsed_cpu_event["slot"], - "seq": event_type_to_seq[event_type], - "size_0": parsed_cpu_event["size"], - }, + cpu_events[-1].update({ + 'name': event_type, + 'cat': 'CPU', + 'args': { + 'rank': rank, + 'channel': channel, + 'slot': parsed_cpu_event['slot'], + 'seq': event_type_to_seq[event_type], + 'size_0': parsed_cpu_event['size'] } - ) + }) event_type_to_seq[event_type] += 1 else: # Close fiber event fiber_id = slot_to_fiber_id[slot] slot_to_fiber_id.pop(slot) - last_ts = fiber_open_info[fiber_id]["ts"] - last_size = fiber_open_info[fiber_id]["size"] + last_ts = fiber_open_ts[fiber_id] fiber_is_usable[fiber_id] = True - delta_time = max(0.001, cpu_events[-1]["ts"] - last_ts) - cpu_events[-1]["args"] = { - "size_1": parsed_cpu_event["size"], - "size": max(last_size, parsed_cpu_event["size"]), - } - cpu_events[-1]["args"]["bw (GB/s)"] = cpu_events[-1]["args"]["size"] / delta_time / 1e3 + delta_time = max(0.001, cpu_events[-1]['ts'] - last_ts) + cpu_events[-1]['args'] = {'size': parsed_cpu_event['size']} + cpu_events[-1]['args']['bw (GB/s)'] = 0. if delta_time == 0. else cpu_events[-1]['args']['size'] / delta_time / 1e3 - cpu_events[-1]["tid"] = fiber_id + (channel + 1) * channel_shift + cpu_events[-1]['tid'] = fiber_id + (channel + 1) * channel_shift raw_content_idx += raw_event_size return cpu_events - def convert_npkit_dump_to_trace(npkit_dump_dir, output_dir, npkit_event_def): files_in_dump_dir = next(os.walk(npkit_dump_dir))[2] - gpu_event_files = [x for x in files_in_dump_dir if x.startswith("gpu_events_rank_")] - cpu_event_files = [x for x in files_in_dump_dir if x.startswith("cpu_events_rank_")] + gpu_event_files = [x for x in files_in_dump_dir if x.startswith('gpu_events_rank_')] + cpu_event_files = [x for x in files_in_dump_dir if x.startswith('cpu_events_rank_')] - ranks = list(set([int(x.split("_rank_")[1].split("_")[0]) for x in gpu_event_files])) - buf_indices = list(set([int(x.split("_buf_")[1].split("_")[0]) for x in gpu_event_files])) - channels = list(set([int(x.split("_channel_")[1].split("_")[0]) for x in cpu_event_files])) + ranks = list(set([int(x.split('_rank_')[1].split('_')[0]) for x in gpu_event_files])) + buf_indices = list(set([int(x.split('_buf_')[1].split('_')[0]) for x in gpu_event_files])) + channels = list(set([int(x.split('_channel_')[1].split('_')[0]) for x in cpu_event_files])) - trace = {"traceEvents": []} + trace = {'traceEvents': []} for rank in ranks: - cpu_clock_den_file_path = os.path.join(npkit_dump_dir, "cpu_clock_period_den_rank_%d" % rank) - cpu_clock_num_file_path = os.path.join(npkit_dump_dir, "cpu_clock_period_num_rank_%d" % rank) + cpu_clock_den_file_path = os.path.join(npkit_dump_dir, 'cpu_clock_period_den_rank_%d' % rank) + cpu_clock_num_file_path = os.path.join(npkit_dump_dir, 'cpu_clock_period_num_rank_%d' % rank) cpu_clock_scale = parse_cpu_clock_scale(cpu_clock_den_file_path, cpu_clock_num_file_path) - gpu_clock_file_path = os.path.join(npkit_dump_dir, "gpu_clock_rate_rank_%d" % rank) + gpu_clock_file_path = os.path.join(npkit_dump_dir, 'gpu_clock_rate_rank_%d' % rank) gpu_clock_scale = parse_gpu_clock_scale(gpu_clock_file_path) for buf_idx in buf_indices: - gpu_events = parse_gpu_event_file( - npkit_dump_dir, npkit_event_def, rank, buf_idx, gpu_clock_scale, cpu_clock_scale - ) - trace["traceEvents"].extend(gpu_events) + gpu_events = parse_gpu_event_file(npkit_dump_dir, npkit_event_def, rank, buf_idx, gpu_clock_scale, cpu_clock_scale) + trace['traceEvents'].extend(gpu_events) for channel in channels: cpu_events = parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clock_scale) - trace["traceEvents"].extend(cpu_events) + trace['traceEvents'].extend(cpu_events) - trace["traceEvents"].sort(key=lambda x: x["ts"]) - trace["displayTimeUnit"] = "ns" + trace['traceEvents'].sort(key=lambda x : x['ts']) + trace['displayTimeUnit'] = 'ns' os.makedirs(output_dir, exist_ok=True) - with open(os.path.join(output_dir, "npkit_event_trace.json"), "w") as f: + with open(os.path.join(output_dir, 'npkit_event_trace.json'), 'w') as f: json.dump(trace, f) - -if __name__ == "__main__": +if __name__ == '__main__': parser = argparse.ArgumentParser() - parser.add_argument("--npkit_dump_dir", type=str, required=True, help="NPKit dump directory.") - parser.add_argument( - "--npkit_event_header_path", - type=str, - required=True, - help="Path to npkit_event.h.", - ) - parser.add_argument("--output_dir", type=str, required=True, help="Path to output directory.") + parser.add_argument('--npkit_dump_dir', type=str, required=True, help='NPKit dump directory.') + parser.add_argument('--npkit_event_header_path', type=str, required=True, help='Path to npkit_event.h.') + parser.add_argument('--output_dir', type=str, required=True, help='Path to output directory.') args = parser.parse_args() npkit_event_def = parse_npkit_event_header(args.npkit_event_header_path) From ff491d5a7d092bc1a7c1daa2be16e8340c191a11 Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Tue, 11 Jun 2024 08:20:37 +0000 Subject: [PATCH 02/16] fix bug --- include/mscclpp/npkit/npkit.hpp | 3 +++ src/executor/execution_kernel.cu | 2 ++ 2 files changed, 5 insertions(+) diff --git a/include/mscclpp/npkit/npkit.hpp b/include/mscclpp/npkit/npkit.hpp index ca4e7bf68..baa560ce8 100644 --- a/include/mscclpp/npkit/npkit.hpp +++ b/include/mscclpp/npkit/npkit.hpp @@ -5,6 +5,7 @@ #define NPKIT_H_ #include +#include #include #include @@ -33,6 +34,7 @@ class NpKit { static NpKitEventCollectContext* GetGpuEventCollectContexts(); +#if defined(MSCCLPP_DEVICE_COMPILE) static inline __device__ void CollectGpuEventShm(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, NpKitEvent* event_buffer, uint64_t* event_buffer_head) { if (*event_buffer_head < NPKIT_SHM_NUM_EVENTS) { @@ -46,6 +48,7 @@ class NpKit { (*event_buffer_head)++; } } +#endif static void CollectCpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, int channel_id); diff --git a/src/executor/execution_kernel.cu b/src/executor/execution_kernel.cu index 4f2138f68..6831ab572 100644 --- a/src/executor/execution_kernel.cu +++ b/src/executor/execution_kernel.cu @@ -1,7 +1,9 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. +#if defined(ENABLE_NPKIT) #include +#endif #include "execution_kernel.hpp" From dd1f7c5e21ea9665190c3e03d991197ffc09a967 Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Tue, 11 Jun 2024 08:43:52 +0000 Subject: [PATCH 03/16] fix lint --- include/mscclpp/npkit/npkit_event.hpp | 14 +- src/executor/execution_kernel.cu | 20 ++- src/include/execution_kernel.hpp | 56 +++--- src/npkit/npkit.cc | 6 +- test/executor_test.cc | 11 +- tools/npkit/npkit_trace_generator.py | 241 ++++++++++++++------------ 6 files changed, 185 insertions(+), 163 deletions(-) diff --git a/include/mscclpp/npkit/npkit_event.hpp b/include/mscclpp/npkit/npkit_event.hpp index 22d2ecc22..da0206c0f 100644 --- a/include/mscclpp/npkit/npkit_event.hpp +++ b/include/mscclpp/npkit/npkit_event.hpp @@ -4,15 +4,15 @@ #ifndef NPKIT_EVENT_H_ #define NPKIT_EVENT_H_ -#define NPKIT_EVENT_INVALID 0x0 +#define NPKIT_EVENT_INVALID 0x0 -#define NPKIT_EVENT_TIME_SYNC_GPU 0x1 -#define NPKIT_EVENT_TIME_SYNC_CPU 0x2 +#define NPKIT_EVENT_TIME_SYNC_GPU 0x1 +#define NPKIT_EVENT_TIME_SYNC_CPU 0x2 -#define NPKIT_EVENT_EXECUTOR_INIT_ENTRY 0x3 -#define NPKIT_EVENT_EXECUTOR_INIT_EXIT 0x4 +#define NPKIT_EVENT_EXECUTOR_INIT_ENTRY 0x3 +#define NPKIT_EVENT_EXECUTOR_INIT_EXIT 0x4 -#define NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY 0x5 -#define NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT 0x15 +#define NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY 0x5 +#define NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT 0x15 #endif diff --git a/src/executor/execution_kernel.cu b/src/executor/execution_kernel.cu index 6831ab572..91ba6d279 100644 --- a/src/executor/execution_kernel.cu +++ b/src/executor/execution_kernel.cu @@ -19,36 +19,40 @@ void ExecutionKernel::launchKernel(int rank, int nthreadblocks, int nthreads, vo executionKernel<<>>( rank, (int32_t*)src, (int32_t*)dst, (int32_t*)scratch, scratchSize, plan, flag #if defined(ENABLE_NPKIT) - , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); + , + NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); #else - ); + ); #endif break; case DataType::UINT32: executionKernel<<>>( rank, (uint32_t*)src, (uint32_t*)dst, (uint32_t*)scratch, scratchSize, plan, flag #if defined(ENABLE_NPKIT) - , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); + , + NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); #else - ); + ); #endif break; case DataType::FLOAT16: executionKernel<<>>( rank, (half*)src, (half*)dst, (half*)scratch, scratchSize, plan, flag #if defined(ENABLE_NPKIT) - , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); + , + NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); #else - ); + ); #endif break; case DataType::FLOAT32: executionKernel<<>>( rank, (float*)src, (float*)dst, (float*)scratch, scratchSize, plan, flag #if defined(ENABLE_NPKIT) - , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); + , + NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); #else - ); + ); #endif break; } diff --git a/src/include/execution_kernel.hpp b/src/include/execution_kernel.hpp index e56621557..26c69c17c 100644 --- a/src/include/execution_kernel.hpp +++ b/src/include/execution_kernel.hpp @@ -337,17 +337,19 @@ template __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* input, T* output, T* scratch, size_t scratchSize, DeviceExecutionPlan* plan, uint32_t flag #if defined(ENABLE_NPKIT) - , NpKitEventCollectContext* npKitEventCollectContexts, uint64_t* cpuTimestamp) { + , + NpKitEventCollectContext* npKitEventCollectContexts, uint64_t* cpuTimestamp) { #else - ) { +) { #endif extern __shared__ int4 sharedMem[]; int bid = blockIdx.x; int tid = threadIdx.x; -#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT) +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY) && \ + defined(ENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT) uint64_t npkit_timestamp_entry = 0; if (tid == 0) { - npkit_timestamp_entry = NPKIT_GET_GPU_TIMESTAMP(); + npkit_timestamp_entry = NPKIT_GET_GPU_TIMESTAMP(); } #endif DeviceExecutionPlan* localPlan = plan + bid; @@ -367,25 +369,25 @@ __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* inpu DeviceHandle* proxyChannels = localPlan->channels.proxyChannels; #if defined(ENABLE_NPKIT) - NpKitEvent* event_buffer = (NpKitEvent *)((char *)sharedMem + sizeof(DeviceExecutionPlan)); + NpKitEvent* event_buffer = (NpKitEvent*)((char*)sharedMem + sizeof(DeviceExecutionPlan)); uint64_t event_buffer_head = 0; #endif #if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_CPU) - NpKit::CollectGpuEventShm(NPKIT_EVENT_TIME_SYNC_CPU, 0, 0, *cpuTimestamp, - event_buffer, &event_buffer_head); + NpKit::CollectGpuEventShm(NPKIT_EVENT_TIME_SYNC_CPU, 0, 0, *cpuTimestamp, event_buffer, &event_buffer_head); #endif #if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_GPU) - NpKit::CollectGpuEventShm(NPKIT_EVENT_TIME_SYNC_GPU, 0, 0, NPKIT_GET_GPU_TIMESTAMP(), - event_buffer, &event_buffer_head); + NpKit::CollectGpuEventShm(NPKIT_EVENT_TIME_SYNC_GPU, 0, 0, NPKIT_GET_GPU_TIMESTAMP(), event_buffer, + &event_buffer_head); #endif -#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT) - NpKit::CollectGpuEventShm(NPKIT_EVENT_EXECUTOR_INIT_ENTRY, 0, 0, npkit_timestamp_entry, - event_buffer, &event_buffer_head); - NpKit::CollectGpuEventShm(NPKIT_EVENT_EXECUTOR_INIT_EXIT, 0, 0, NPKIT_GET_GPU_TIMESTAMP(), - event_buffer, &event_buffer_head); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY) && \ + defined(ENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT) + NpKit::CollectGpuEventShm(NPKIT_EVENT_EXECUTOR_INIT_ENTRY, 0, 0, npkit_timestamp_entry, event_buffer, + &event_buffer_head); + NpKit::CollectGpuEventShm(NPKIT_EVENT_EXECUTOR_INIT_EXIT, 0, 0, NPKIT_GET_GPU_TIMESTAMP(), event_buffer, + &event_buffer_head); #endif for (int i = 0; i < nOperations; i++) { @@ -393,7 +395,7 @@ __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* inpu #if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY) NpKit::CollectGpuEventShm(NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY + (int)op.type, op.size, 0, NPKIT_GET_GPU_TIMESTAMP(), - event_buffer, &event_buffer_head); + event_buffer, &event_buffer_head); #endif if (op.type == OperationType::BARRIER) { @@ -442,9 +444,8 @@ __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* inpu #if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT) NpKit::CollectGpuEventShm(NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT + (int)op.type, op.size, 0, NPKIT_GET_GPU_TIMESTAMP(), - event_buffer, &event_buffer_head); + event_buffer, &event_buffer_head); #endif - } #if defined(ENABLE_NPKIT) @@ -463,7 +464,6 @@ __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* inpu npKitCtx->event_buffer_head += event_buffer_head; } #endif - } #endif // defined(MSCCLPP_DEVICE_COMPILE) @@ -479,36 +479,40 @@ class ExecutionKernel { executionKernel<<>>( rank, (int32_t*)src, (int32_t*)dst, (int32_t*)scratch, scratchSize, plan, flag #if defined(ENABLE_NPKIT) - , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); + , + NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); #else - ); + ); #endif break; case DataType::UINT32: executionKernel<<>>( rank, (uint32_t*)src, (uint32_t*)dst, (uint32_t*)scratch, scratchSize, plan, flag #if defined(ENABLE_NPKIT) - , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); + , + NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); #else - ); + ); #endif break; case DataType::FLOAT16: executionKernel<<>>( rank, (half*)src, (half*)dst, (half*)scratch, scratchSize, plan, flag #if defined(ENABLE_NPKIT) - , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); + , + NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); #else - ); + ); #endif break; case DataType::FLOAT32: executionKernel<<>>( rank, (float*)src, (float*)dst, (float*)scratch, scratchSize, plan, flag #if defined(ENABLE_NPKIT) - , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); + , + NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); #else - ); + ); #endif break; } diff --git a/src/npkit/npkit.cc b/src/npkit/npkit.cc index 19dd89349..67f725fb2 100644 --- a/src/npkit/npkit.cc +++ b/src/npkit/npkit.cc @@ -1,10 +1,10 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -#include -#include #include +#include +#include #include #include @@ -74,7 +74,7 @@ static int GetGpuClockRateInKhz() { char gcn_arch[256]; MSCCLPP_CUDATHROW(hipGetDevice(&dev_id)); MSCCLPP_CUDATHROW(hipGetDeviceProperties(&dev_prop, dev_id)); - char *gcnArchNameToken = strtok(dev_prop.gcnArchName, ":"); + char* gcnArchNameToken = strtok(dev_prop.gcnArchName, ":"); strcpy(gcn_arch, gcnArchNameToken); if (strncmp("gfx94", gcn_arch, 5) == 0) return 100000; diff --git a/test/executor_test.cc b/test/executor_test.cc index 90d0ac22c..a40ca42f4 100644 --- a/test/executor_test.cc +++ b/test/executor_test.cc @@ -76,12 +76,8 @@ double benchTime(int rank, std::shared_ptr bootstrap, std::s int main(int argc, char* argv[]) { if (argc != 8) { - std::cerr << "Usage: " << argv[0] << " " - << " " - << " " - << " " - << " " - << " " + std::cerr << "Usage: " << argv[0] << " " << " " << " " + << " " << " " << " " << " " << std::endl; return 1; } @@ -118,7 +114,8 @@ int main(int argc, char* argv[]) { std::shared_ptr sendbuff = mscclpp::allocExtSharedCuda(bufferSize); std::vector dataHost(bufferSize / sizeof(int), rank); MSCCLPP_CUDATHROW(cudaMemcpy(sendbuff.get(), dataHost.data(), bufferSize, cudaMemcpyHostToDevice)); - double deltaSec = benchTime(rank, bootstrap, executor, plan, sendbuff, bufferSize, nthreadsPerBlock, niters, ngraphIters); + double deltaSec = + benchTime(rank, bootstrap, executor, plan, sendbuff, bufferSize, nthreadsPerBlock, niters, ngraphIters); if (enableNpKit) { const char* npkitDumpDir = getenv("NPKIT_DUMP_DIR"); diff --git a/tools/npkit/npkit_trace_generator.py b/tools/npkit/npkit_trace_generator.py index 5ab924116..8c15a3ac0 100644 --- a/tools/npkit/npkit_trace_generator.py +++ b/tools/npkit/npkit_trace_generator.py @@ -7,134 +7,147 @@ from queue import Queue + def parse_npkit_event_header(npkit_event_header_path): - npkit_event_def = {'id_to_type': {}, 'type_to_id': {}} + npkit_event_def = {"id_to_type": {}, "type_to_id": {}} executor_ops = [ - 'BARRIER', - 'PUT', - 'PUT_PACKET', - 'GET', - 'COPY', - 'COPY_PACKET', - 'SIGNAL', - 'WAIT', - 'FLUSH', - 'REDUCE', - 'REDUCE_PACKET', - 'REDUCE_SEND', - 'REDUCE_SEND_PACKET', - 'READ_REDUCE_COPY', - 'READ_REDUCE_COPY_SEND', + "BARRIER", + "PUT", + "PUT_PACKET", + "GET", + "COPY", + "COPY_PACKET", + "SIGNAL", + "WAIT", + "FLUSH", + "REDUCE", + "REDUCE_PACKET", + "REDUCE_SEND", + "REDUCE_SEND_PACKET", + "READ_REDUCE_COPY", + "READ_REDUCE_COPY_SEND", ] executor_op_to_offset = {} for executor_op in executor_ops: executor_op_to_offset[executor_op] = len(executor_op_to_offset) - with open(npkit_event_header_path, 'r') as f: + with open(npkit_event_header_path, "r") as f: lines = [x.strip() for x in f.readlines() if len(x.strip()) != 0] line_idx = 0 while line_idx < len(lines): - if lines[line_idx].startswith('#define NPKIT_EVENT_'): + if lines[line_idx].startswith("#define NPKIT_EVENT_"): fields = lines[line_idx].split() if len(fields) == 3: event_type = fields[1] event_id = int(fields[2], 0) - if lines[line_idx].startswith('#define NPKIT_EVENT_EXECUTOR_OP_BASE'): + if lines[line_idx].startswith("#define NPKIT_EVENT_EXECUTOR_OP_BASE"): for executor_op in executor_op_to_offset: real_event_id = event_id + executor_op_to_offset[executor_op] - if 'ENTRY' in lines[line_idx]: - event_type = 'NPKIT_EVENT_EXECUTOR_%s_ENTRY' % executor_op - elif 'EXIT' in lines[line_idx]: - event_type = 'NPKIT_EVENT_EXECUTOR_%s_EXIT' % executor_op - npkit_event_def['type_to_id'][event_type] = real_event_id - npkit_event_def['id_to_type'][real_event_id] = event_type + if "ENTRY" in lines[line_idx]: + event_type = "NPKIT_EVENT_EXECUTOR_%s_ENTRY" % executor_op + elif "EXIT" in lines[line_idx]: + event_type = "NPKIT_EVENT_EXECUTOR_%s_EXIT" % executor_op + npkit_event_def["type_to_id"][event_type] = real_event_id + npkit_event_def["id_to_type"][real_event_id] = event_type else: - npkit_event_def['type_to_id'][event_type] = event_id - npkit_event_def['id_to_type'][event_id] = event_type + npkit_event_def["type_to_id"][event_type] = event_id + npkit_event_def["id_to_type"][event_id] = event_type line_idx += 1 return npkit_event_def + def parse_gpu_clock_scale(gpu_clock_file_path): - with open(gpu_clock_file_path, 'r') as f: + with open(gpu_clock_file_path, "r") as f: freq_in_khz = f.read() return float(freq_in_khz) * 1e3 / 1e6 + def parse_cpu_clock_scale(cpu_clock_den_file_path, cpu_clock_num_file_path): - with open(cpu_clock_num_file_path, 'r') as f: + with open(cpu_clock_num_file_path, "r") as f: num = float(f.read()) - with open(cpu_clock_den_file_path, 'r') as f: + with open(cpu_clock_den_file_path, "r") as f: den = float(f.read()) return den / num / 1e6 + def parse_gpu_event(event_bytes): return { - 'id': int.from_bytes(event_bytes[0:1], byteorder='little', signed=False), - 'size': int.from_bytes(event_bytes[1:5], byteorder='little', signed=False), - 'rsvd': int.from_bytes(event_bytes[5:8], byteorder='little', signed=False), - 'timestamp': int.from_bytes(event_bytes[8:16], byteorder='little', signed=False) + "id": int.from_bytes(event_bytes[0:1], byteorder="little", signed=False), + "size": int.from_bytes(event_bytes[1:5], byteorder="little", signed=False), + "rsvd": int.from_bytes(event_bytes[5:8], byteorder="little", signed=False), + "timestamp": int.from_bytes(event_bytes[8:16], byteorder="little", signed=False), } + def parse_cpu_event(event_bytes): return { - 'id': int.from_bytes(event_bytes[0:1], byteorder='little', signed=False), - 'size': int.from_bytes(event_bytes[1:5], byteorder='little', signed=False), - 'slot': int.from_bytes(event_bytes[5:8], byteorder='little', signed=False), - 'timestamp': int.from_bytes(event_bytes[8:16], byteorder='little', signed=False) + "id": int.from_bytes(event_bytes[0:1], byteorder="little", signed=False), + "size": int.from_bytes(event_bytes[1:5], byteorder="little", signed=False), + "slot": int.from_bytes(event_bytes[5:8], byteorder="little", signed=False), + "timestamp": int.from_bytes(event_bytes[8:16], byteorder="little", signed=False), } + def parse_gpu_event_file(npkit_dump_dir, npkit_event_def, rank, buf_idx, gpu_clock_scale, cpu_clock_scale): - gpu_event_file_path = os.path.join(npkit_dump_dir, 'gpu_events_rank_%d_buf_%d' % (rank, buf_idx)) + gpu_event_file_path = os.path.join(npkit_dump_dir, "gpu_events_rank_%d_buf_%d" % (rank, buf_idx)) raw_event_size = 16 curr_cpu_base_time = None curr_gpu_base_time = None gpu_events = [] event_type_to_seq = {} - with open(gpu_event_file_path, 'rb') as f: + with open(gpu_event_file_path, "rb") as f: raw_content = f.read() raw_content_size = len(raw_content) raw_content_idx = 0 while raw_content_idx < raw_content_size: parsed_gpu_event = parse_gpu_event(raw_content[raw_content_idx : raw_content_idx + raw_event_size]) - if npkit_event_def['id_to_type'][parsed_gpu_event['id']] == 'NPKIT_EVENT_TIME_SYNC_CPU': - curr_cpu_base_time = parsed_gpu_event['timestamp'] / cpu_clock_scale + if npkit_event_def["id_to_type"][parsed_gpu_event["id"]] == "NPKIT_EVENT_TIME_SYNC_CPU": + curr_cpu_base_time = parsed_gpu_event["timestamp"] / cpu_clock_scale curr_gpu_base_time = None - elif npkit_event_def['id_to_type'][parsed_gpu_event['id']] == 'NPKIT_EVENT_TIME_SYNC_GPU': + elif npkit_event_def["id_to_type"][parsed_gpu_event["id"]] == "NPKIT_EVENT_TIME_SYNC_GPU": if curr_gpu_base_time is None: - curr_gpu_base_time = parsed_gpu_event['timestamp'] / gpu_clock_scale + curr_gpu_base_time = parsed_gpu_event["timestamp"] / gpu_clock_scale else: if curr_gpu_base_time is None: - curr_gpu_base_time = parsed_gpu_event['timestamp'] / gpu_clock_scale - event_type = npkit_event_def['id_to_type'][parsed_gpu_event['id']] - phase = 'B' if event_type.endswith('_ENTRY') else 'E' - gpu_events.append({ - 'ph': phase, - 'ts': curr_cpu_base_time + parsed_gpu_event['timestamp'] / gpu_clock_scale - curr_gpu_base_time, - 'pid': rank, - 'tid': buf_idx + 1 - }) - if phase == 'B': + curr_gpu_base_time = parsed_gpu_event["timestamp"] / gpu_clock_scale + event_type = npkit_event_def["id_to_type"][parsed_gpu_event["id"]] + phase = "B" if event_type.endswith("_ENTRY") else "E" + gpu_events.append( + { + "ph": phase, + "ts": curr_cpu_base_time + parsed_gpu_event["timestamp"] / gpu_clock_scale - curr_gpu_base_time, + "pid": rank, + "tid": buf_idx + 1, + } + ) + if phase == "B": if event_type not in event_type_to_seq: event_type_to_seq[event_type] = 0 - gpu_events[-1].update({ - 'name': event_type, - 'cat': 'GPU', - 'args': { - 'rank': rank, - 'buf_idx': buf_idx, - 'seq': event_type_to_seq[event_type], - 'rsvd_0': parsed_gpu_event['rsvd'], - 'size_0': parsed_gpu_event['size'] + gpu_events[-1].update( + { + "name": event_type, + "cat": "GPU", + "args": { + "rank": rank, + "buf_idx": buf_idx, + "seq": event_type_to_seq[event_type], + "rsvd_0": parsed_gpu_event["rsvd"], + "size_0": parsed_gpu_event["size"], + }, } - }) + ) event_type_to_seq[event_type] += 1 else: - gpu_events[-1]['args'] = {'size': parsed_gpu_event['size'], 'rsvd': parsed_gpu_event['rsvd']} - delta_time = gpu_events[-1]['ts'] - gpu_events[-2]['ts'] - gpu_events[-1]['args']['bw (GB/s)'] = 0. if delta_time == 0. else gpu_events[-1]['args']['size'] / delta_time / 1e3 + gpu_events[-1]["args"] = {"size": parsed_gpu_event["size"], "rsvd": parsed_gpu_event["rsvd"]} + delta_time = gpu_events[-1]["ts"] - gpu_events[-2]["ts"] + gpu_events[-1]["args"]["bw (GB/s)"] = ( + 0.0 if delta_time == 0.0 else gpu_events[-1]["args"]["size"] / delta_time / 1e3 + ) raw_content_idx += raw_event_size return gpu_events + def parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clock_scale): - cpu_event_file_path = os.path.join(npkit_dump_dir, 'cpu_events_rank_%d_channel_%d' % (rank, channel)) + cpu_event_file_path = os.path.join(npkit_dump_dir, "cpu_events_rank_%d_channel_%d" % (rank, channel)) raw_event_size = 16 cpu_events = [] event_type_to_seq = {} @@ -144,21 +157,17 @@ def parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clo slot_to_fiber_id = {} channel_shift = 1000 - with open(cpu_event_file_path, 'rb') as f: + with open(cpu_event_file_path, "rb") as f: raw_content = f.read() raw_content_size = len(raw_content) raw_content_idx = 0 while raw_content_idx < raw_content_size: parsed_cpu_event = parse_cpu_event(raw_content[raw_content_idx : raw_content_idx + raw_event_size]) - event_type = npkit_event_def['id_to_type'][parsed_cpu_event['id']] - phase = 'B' if event_type.endswith('_ENTRY') else 'E' - cpu_events.append({ - 'ph': phase, - 'ts': parsed_cpu_event['timestamp'] / cpu_clock_scale, - 'pid': rank - }) - slot = parsed_cpu_event['slot'] - if phase == 'B': + event_type = npkit_event_def["id_to_type"][parsed_cpu_event["id"]] + phase = "B" if event_type.endswith("_ENTRY") else "E" + cpu_events.append({"ph": phase, "ts": parsed_cpu_event["timestamp"] / cpu_clock_scale, "pid": rank}) + slot = parsed_cpu_event["slot"] + if phase == "B": # Open fiber event fiber_id = 0 while fiber_id < len(fiber_is_usable): @@ -169,22 +178,24 @@ def parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clo fiber_is_usable.append(True) fiber_open_ts.append(0.0) slot_to_fiber_id[slot] = fiber_id - fiber_open_ts[fiber_id] = cpu_events[-1]['ts'] + fiber_open_ts[fiber_id] = cpu_events[-1]["ts"] fiber_is_usable[fiber_id] = False if event_type not in event_type_to_seq: event_type_to_seq[event_type] = 0 - cpu_events[-1].update({ - 'name': event_type, - 'cat': 'CPU', - 'args': { - 'rank': rank, - 'channel': channel, - 'slot': parsed_cpu_event['slot'], - 'seq': event_type_to_seq[event_type], - 'size_0': parsed_cpu_event['size'] + cpu_events[-1].update( + { + "name": event_type, + "cat": "CPU", + "args": { + "rank": rank, + "channel": channel, + "slot": parsed_cpu_event["slot"], + "seq": event_type_to_seq[event_type], + "size_0": parsed_cpu_event["size"], + }, } - }) + ) event_type_to_seq[event_type] += 1 else: # Close fiber event @@ -193,54 +204,60 @@ def parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clo last_ts = fiber_open_ts[fiber_id] fiber_is_usable[fiber_id] = True - delta_time = max(0.001, cpu_events[-1]['ts'] - last_ts) - cpu_events[-1]['args'] = {'size': parsed_cpu_event['size']} - cpu_events[-1]['args']['bw (GB/s)'] = 0. if delta_time == 0. else cpu_events[-1]['args']['size'] / delta_time / 1e3 + delta_time = max(0.001, cpu_events[-1]["ts"] - last_ts) + cpu_events[-1]["args"] = {"size": parsed_cpu_event["size"]} + cpu_events[-1]["args"]["bw (GB/s)"] = ( + 0.0 if delta_time == 0.0 else cpu_events[-1]["args"]["size"] / delta_time / 1e3 + ) - cpu_events[-1]['tid'] = fiber_id + (channel + 1) * channel_shift + cpu_events[-1]["tid"] = fiber_id + (channel + 1) * channel_shift raw_content_idx += raw_event_size return cpu_events + def convert_npkit_dump_to_trace(npkit_dump_dir, output_dir, npkit_event_def): files_in_dump_dir = next(os.walk(npkit_dump_dir))[2] - gpu_event_files = [x for x in files_in_dump_dir if x.startswith('gpu_events_rank_')] - cpu_event_files = [x for x in files_in_dump_dir if x.startswith('cpu_events_rank_')] + gpu_event_files = [x for x in files_in_dump_dir if x.startswith("gpu_events_rank_")] + cpu_event_files = [x for x in files_in_dump_dir if x.startswith("cpu_events_rank_")] - ranks = list(set([int(x.split('_rank_')[1].split('_')[0]) for x in gpu_event_files])) - buf_indices = list(set([int(x.split('_buf_')[1].split('_')[0]) for x in gpu_event_files])) - channels = list(set([int(x.split('_channel_')[1].split('_')[0]) for x in cpu_event_files])) + ranks = list(set([int(x.split("_rank_")[1].split("_")[0]) for x in gpu_event_files])) + buf_indices = list(set([int(x.split("_buf_")[1].split("_")[0]) for x in gpu_event_files])) + channels = list(set([int(x.split("_channel_")[1].split("_")[0]) for x in cpu_event_files])) - trace = {'traceEvents': []} + trace = {"traceEvents": []} for rank in ranks: - cpu_clock_den_file_path = os.path.join(npkit_dump_dir, 'cpu_clock_period_den_rank_%d' % rank) - cpu_clock_num_file_path = os.path.join(npkit_dump_dir, 'cpu_clock_period_num_rank_%d' % rank) + cpu_clock_den_file_path = os.path.join(npkit_dump_dir, "cpu_clock_period_den_rank_%d" % rank) + cpu_clock_num_file_path = os.path.join(npkit_dump_dir, "cpu_clock_period_num_rank_%d" % rank) cpu_clock_scale = parse_cpu_clock_scale(cpu_clock_den_file_path, cpu_clock_num_file_path) - gpu_clock_file_path = os.path.join(npkit_dump_dir, 'gpu_clock_rate_rank_%d' % rank) + gpu_clock_file_path = os.path.join(npkit_dump_dir, "gpu_clock_rate_rank_%d" % rank) gpu_clock_scale = parse_gpu_clock_scale(gpu_clock_file_path) for buf_idx in buf_indices: - gpu_events = parse_gpu_event_file(npkit_dump_dir, npkit_event_def, rank, buf_idx, gpu_clock_scale, cpu_clock_scale) - trace['traceEvents'].extend(gpu_events) + gpu_events = parse_gpu_event_file( + npkit_dump_dir, npkit_event_def, rank, buf_idx, gpu_clock_scale, cpu_clock_scale + ) + trace["traceEvents"].extend(gpu_events) for channel in channels: cpu_events = parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clock_scale) - trace['traceEvents'].extend(cpu_events) + trace["traceEvents"].extend(cpu_events) - trace['traceEvents'].sort(key=lambda x : x['ts']) - trace['displayTimeUnit'] = 'ns' + trace["traceEvents"].sort(key=lambda x: x["ts"]) + trace["displayTimeUnit"] = "ns" os.makedirs(output_dir, exist_ok=True) - with open(os.path.join(output_dir, 'npkit_event_trace.json'), 'w') as f: + with open(os.path.join(output_dir, "npkit_event_trace.json"), "w") as f: json.dump(trace, f) -if __name__ == '__main__': + +if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument('--npkit_dump_dir', type=str, required=True, help='NPKit dump directory.') - parser.add_argument('--npkit_event_header_path', type=str, required=True, help='Path to npkit_event.h.') - parser.add_argument('--output_dir', type=str, required=True, help='Path to output directory.') + parser.add_argument("--npkit_dump_dir", type=str, required=True, help="NPKit dump directory.") + parser.add_argument("--npkit_event_header_path", type=str, required=True, help="Path to npkit_event.h.") + parser.add_argument("--output_dir", type=str, required=True, help="Path to output directory.") args = parser.parse_args() npkit_event_def = parse_npkit_event_header(args.npkit_event_header_path) From 15758ec87886e8b8b467936b7b0dd8ca9a74db8c Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Tue, 11 Jun 2024 08:52:05 +0000 Subject: [PATCH 04/16] fix lint --- include/mscclpp/npkit/npkit.hpp | 7 +++---- test/executor_test.cc | 8 ++++++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/include/mscclpp/npkit/npkit.hpp b/include/mscclpp/npkit/npkit.hpp index baa560ce8..3560d503f 100644 --- a/include/mscclpp/npkit/npkit.hpp +++ b/include/mscclpp/npkit/npkit.hpp @@ -4,13 +4,12 @@ #ifndef NPKIT_H_ #define NPKIT_H_ -#include -#include -#include - #include #include #include +#include +#include +#include #if defined(__HIP_PLATFORM_AMD__) #define NPKIT_GET_GPU_TIMESTAMP wall_clock64 diff --git a/test/executor_test.cc b/test/executor_test.cc index a40ca42f4..d1700675d 100644 --- a/test/executor_test.cc +++ b/test/executor_test.cc @@ -76,8 +76,12 @@ double benchTime(int rank, std::shared_ptr bootstrap, std::s int main(int argc, char* argv[]) { if (argc != 8) { - std::cerr << "Usage: " << argv[0] << " " << " " << " " - << " " << " " << " " + std::cerr << "Usage: " << argv[0] << " " + << " " + << " " + << " " + << " " + << " " << " " << std::endl; return 1; } From 49dbc1b75f3f5dd5e4f79fd3672fb242f7fe1007 Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Tue, 11 Jun 2024 13:45:20 +0000 Subject: [PATCH 05/16] add tests --- .azure-pipelines/ut.yml | 72 +++++++++++++++++++++++++++++++++ include/mscclpp/npkit/npkit.hpp | 1 + python/mscclpp/__init__.py | 1 + python/mscclpp/core_py.cpp | 2 + python/mscclpp/npkit_py.cpp | 15 +++++++ python/test/executor_test.py | 7 ++++ python/test/test_mscclpp.py | 7 ++++ test/executor_test.cc | 18 +++------ test/mp_unit/executor_tests.cc | 9 +++++ test/mp_unit/mp_unit_tests.hpp | 1 + 10 files changed, 121 insertions(+), 12 deletions(-) create mode 100644 python/mscclpp/npkit_py.cpp diff --git a/.azure-pipelines/ut.yml b/.azure-pipelines/ut.yml index 78b679e8d..ccaaf761e 100644 --- a/.azure-pipelines/ut.yml +++ b/.azure-pipelines/ut.yml @@ -79,3 +79,75 @@ jobs: export PATH=/usr/local/mpi/bin:$PATH mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x workingDirectory: '$(System.DefaultWorkingDirectory)' + + - task: Bash@3 + name: BuildWithNpKit + displayName: Build with NPKit + inputs: + targetType: 'inline' + script: | + rm -rf build && mkdir build && cd build + cmake -DCMAKE_BUILD_TYPE=Release -DNPKIT_FLAGS="-DENABLE_NPKIT -DENABLE_NPKIT_EVENT_TIME_SYNC_CPU -DENABLE_NPKIT_EVENT_TIME_SYNC_GPU -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT" .. + make -j + workingDirectory: '$(System.DefaultWorkingDirectory)' + + - task: Bash@3 + name: MpUnitTestsWithNpKit + displayName: Run mscclpp multi-process unit tests (executor part) with NPKit + inputs: + targetType: 'inline' + script: | + set -e + rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output + export PATH=/usr/local/mpi/bin:$PATH + export NPKIT_DUMP_DIR=./npkit_dump + mpirun -tag-output -np 2 ./build/test/mp_unit_tests --gtest_filter="ExecutorTest.TwoNodesAllreduce" + python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output + grep -q NPKIT_EVENT_TIME_SYNC_CPU ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_TIME_SYNC_GPU ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_INIT_EXIT ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_EXIT ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_WAIT_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_WAIT_EXIT ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_READ_REDUCE_COPY_SEND_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_READ_REDUCE_COPY_SEND_EXIT ./npkit_output/npkit_event_trace.json + workingDirectory: '$(System.DefaultWorkingDirectory)' + + - task: Bash@3 + name: PyTestsWithNpKit + displayName: Run pytests (executor part) with NPKit + inputs: + targetType: 'inline' + script: | + set -e + rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output + export PATH=/usr/local/mpi/bin:$PATH + export NPKIT_DUMP_DIR=./npkit_dump + mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x -k 'test_executor[allreduce.json' + python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output + grep -q NPKIT_EVENT_TIME_SYNC_CPU ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_TIME_SYNC_GPU ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_INIT_EXIT ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_EXIT ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_WAIT_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_WAIT_EXIT ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_READ_REDUCE_COPY_SEND_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_READ_REDUCE_COPY_SEND_EXIT ./npkit_output/npkit_event_trace.json + rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output + mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x -k 'test_executor[allreduce_packet.json' + python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output + grep -q NPKIT_EVENT_TIME_SYNC_CPU ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_TIME_SYNC_GPU ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_INIT_EXIT ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_COPY_PACKET_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_COPY_PACKET_EXIT ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_PUT_PACKET_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_PUT_PACKET_EXIT ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_REDUCE_SEND_PACKET_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_REDUCE_SEND_PACKET_EXIT ./npkit_output/npkit_event_trace.json + workingDirectory: '$(System.DefaultWorkingDirectory)' diff --git a/include/mscclpp/npkit/npkit.hpp b/include/mscclpp/npkit/npkit.hpp index 3560d503f..36f1dbe09 100644 --- a/include/mscclpp/npkit/npkit.hpp +++ b/include/mscclpp/npkit/npkit.hpp @@ -4,6 +4,7 @@ #ifndef NPKIT_H_ #define NPKIT_H_ +#include #include #include #include diff --git a/python/mscclpp/__init__.py b/python/mscclpp/__init__.py index 0acc55fc5..c9df30cf1 100644 --- a/python/mscclpp/__init__.py +++ b/python/mscclpp/__init__.py @@ -25,6 +25,7 @@ PacketType, version, is_nvls_supported, + npkit, ) __version__ = version() diff --git a/python/mscclpp/core_py.cpp b/python/mscclpp/core_py.cpp index 8dc9df57b..a44256a0d 100644 --- a/python/mscclpp/core_py.cpp +++ b/python/mscclpp/core_py.cpp @@ -22,6 +22,7 @@ extern void register_utils(nb::module_& m); extern void register_numa(nb::module_& m); extern void register_nvls(nb::module_& m); extern void register_executor(nb::module_& m); +extern void register_npkit(nb::module_& m); template void def_nonblocking_future(nb::handle& m, const std::string& typestr) { @@ -189,4 +190,5 @@ NB_MODULE(_mscclpp, m) { register_numa(m); register_nvls(m); register_executor(m); + register_npkit(m); } diff --git a/python/mscclpp/npkit_py.cpp b/python/mscclpp/npkit_py.cpp new file mode 100644 index 000000000..71b591435 --- /dev/null +++ b/python/mscclpp/npkit_py.cpp @@ -0,0 +1,15 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +#include + +#include + +namespace nb = nanobind; + +void register_npkit(nb::module_ &m) { + nb::module_ sub_m = m.def_submodule("npkit", "NPKit functions"); + sub_m.def("init", &NpKit::Init); + sub_m.def("dump", &NpKit::Dump); + sub_m.def("shutdown", &NpKit::Shutdown); +} diff --git a/python/test/executor_test.py b/python/test/executor_test.py index d744a4c1a..50808280a 100644 --- a/python/test/executor_test.py +++ b/python/test/executor_test.py @@ -7,6 +7,7 @@ Executor, ExecutionPlan, PacketType, + npkit, ) import mscclpp.comm as mscclpp_comm @@ -87,6 +88,9 @@ def main( mscclpp_group = mscclpp_comm.CommGroup(MPI.COMM_WORLD) cp.cuda.Device(mscclpp_group.my_rank % mscclpp_group.nranks_per_node).use() executor = Executor(mscclpp_group.communicator) + npkit_dump_dir = os.getenv('NPKIT_DUMP_DIR') + if npkit_dump_dir is not None: + npkit.init(mscclpp_group.my_rank) execution_plan = ExecutionPlan(execution_paln_name, execution_plan_path) cp.random.seed(seed) @@ -119,6 +123,9 @@ def main( mscclpp_group.barrier() execution_time = bench_time(100, 10, executor_func) + if npkit_dump_dir is not None: + npkit.dump(npkit_dump_dir) + npkit.shutdown() print( f"Rank: {MPI.COMM_WORLD.rank} Execution time: {execution_time} us, " f"data size: {sendbuf.nbytes} bytes data type: {dtype().dtype.name} " diff --git a/python/test/test_mscclpp.py b/python/test/test_mscclpp.py index c6014b84e..c2345e014 100644 --- a/python/test/test_mscclpp.py +++ b/python/test/test_mscclpp.py @@ -24,6 +24,7 @@ TcpBootstrap, Transport, is_nvls_supported, + npkit, ) import mscclpp.comm as mscclpp_comm from mscclpp.utils import KernelBuilder, pack @@ -603,6 +604,9 @@ def test_executor(mpi_group: MpiGroup, filename: str): project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) mscclpp_group = mscclpp_comm.CommGroup(mpi_group.comm) executor = Executor(mscclpp_group.communicator) + npkit_dump_dir = os.getenv('NPKIT_DUMP_DIR') + if npkit_dump_dir is not None: + npkit.init(mscclpp_group.my_rank) execution_plan = ExecutionPlan("allreduce_pairs", os.path.join(project_dir, "test", "execution-files", filename)) nelems = 1024 * 1024 @@ -629,3 +633,6 @@ def test_executor(mpi_group: MpiGroup, filename: str): ) stream.synchronize() assert cp.allclose(sendbuf, expected, atol=1e-3 * mpi_group.comm.size) + if npkit_dump_dir is not None: + npkit.dump(npkit_dump_dir) + npkit.shutdown() diff --git a/test/executor_test.cc b/test/executor_test.cc index d1700675d..24796dd4b 100644 --- a/test/executor_test.cc +++ b/test/executor_test.cc @@ -75,14 +75,13 @@ double benchTime(int rank, std::shared_ptr bootstrap, std::s } int main(int argc, char* argv[]) { - if (argc != 8) { + if (argc != 7) { std::cerr << "Usage: " << argv[0] << " " << " " << " " << " " << " " - << " " - << " " << std::endl; + << " " << std::endl; return 1; } @@ -99,7 +98,7 @@ int main(int argc, char* argv[]) { const int nthreadsPerBlock = std::stoi(argv[4]); const int niters = std::stoi(argv[5]); const int ngraphIters = std::stoi(argv[6]); - const int enableNpKit = std::stoi(argv[7]); + const char* npkitDumpDir = getenv("NPKIT_DUMP_DIR"); std::shared_ptr bootstrap; mscclpp::UniqueId id; @@ -110,7 +109,7 @@ int main(int argc, char* argv[]) { std::shared_ptr communicator = std::make_shared(bootstrap); std::shared_ptr executor = std::make_shared(communicator); - if (enableNpKit) { + if (npkitDumpDir != nullptr) { NpKit::Init(rank); } @@ -121,13 +120,8 @@ int main(int argc, char* argv[]) { double deltaSec = benchTime(rank, bootstrap, executor, plan, sendbuff, bufferSize, nthreadsPerBlock, niters, ngraphIters); - if (enableNpKit) { - const char* npkitDumpDir = getenv("NPKIT_DUMP_DIR"); - if (npkitDumpDir == nullptr) { - std::cerr << "NPKIT_DUMP_DIR is empty" << std::endl; - } else { - NpKit::Dump(npkitDumpDir); - } + if (npkitDumpDir != nullptr) { + NpKit::Dump(npkitDumpDir); NpKit::Shutdown(); } diff --git a/test/mp_unit/executor_tests.cc b/test/mp_unit/executor_tests.cc index fb1d104be..5baa2b67a 100644 --- a/test/mp_unit/executor_tests.cc +++ b/test/mp_unit/executor_tests.cc @@ -4,6 +4,7 @@ #include #include +#include #include "mp_unit_tests.hpp" @@ -30,9 +31,17 @@ void ExecutorTest::SetUp() { bootstrap->initialize(id); std::shared_ptr communicator = std::make_shared(bootstrap); executor = std::make_shared(communicator); + npkitDumpDir = getenv("NPKIT_DUMP_DIR"); + if (npkitDumpDir != nullptr) { + NpKit::Init(gEnv->rank); + } } void ExecutorTest::TearDown() { + if (npkitDumpDir != nullptr) { + NpKit::Dump(npkitDumpDir); + NpKit::Shutdown(); + } executor.reset(); MultiProcessTest::TearDown(); } diff --git a/test/mp_unit/mp_unit_tests.hpp b/test/mp_unit/mp_unit_tests.hpp index e13a05104..8afa8e917 100644 --- a/test/mp_unit/mp_unit_tests.hpp +++ b/test/mp_unit/mp_unit_tests.hpp @@ -170,5 +170,6 @@ class ExecutorTest : public MultiProcessTest { void TearDown() override; std::shared_ptr executor; + const char* npkitDumpDir; }; #endif // MSCCLPP_MP_UNIT_TESTS_HPP_ From a20ada943b41f9bf9e085a0f476e5f25ef039be8 Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Tue, 11 Jun 2024 13:48:52 +0000 Subject: [PATCH 06/16] fix lint --- python/test/executor_test.py | 2 +- python/test/test_mscclpp.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/test/executor_test.py b/python/test/executor_test.py index 50808280a..3a0bd2d74 100644 --- a/python/test/executor_test.py +++ b/python/test/executor_test.py @@ -88,7 +88,7 @@ def main( mscclpp_group = mscclpp_comm.CommGroup(MPI.COMM_WORLD) cp.cuda.Device(mscclpp_group.my_rank % mscclpp_group.nranks_per_node).use() executor = Executor(mscclpp_group.communicator) - npkit_dump_dir = os.getenv('NPKIT_DUMP_DIR') + npkit_dump_dir = os.getenv("NPKIT_DUMP_DIR") if npkit_dump_dir is not None: npkit.init(mscclpp_group.my_rank) execution_plan = ExecutionPlan(execution_paln_name, execution_plan_path) diff --git a/python/test/test_mscclpp.py b/python/test/test_mscclpp.py index c2345e014..4af3ddb36 100644 --- a/python/test/test_mscclpp.py +++ b/python/test/test_mscclpp.py @@ -604,7 +604,7 @@ def test_executor(mpi_group: MpiGroup, filename: str): project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) mscclpp_group = mscclpp_comm.CommGroup(mpi_group.comm) executor = Executor(mscclpp_group.communicator) - npkit_dump_dir = os.getenv('NPKIT_DUMP_DIR') + npkit_dump_dir = os.getenv("NPKIT_DUMP_DIR") if npkit_dump_dir is not None: npkit.init(mscclpp_group.my_rank) execution_plan = ExecutionPlan("allreduce_pairs", os.path.join(project_dir, "test", "execution-files", filename)) From 192ebe7bceeeb51f1ac37cb68ec9e9f3717df356 Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Tue, 11 Jun 2024 14:54:21 +0000 Subject: [PATCH 07/16] fix bug --- src/executor/execution_kernel.cu | 4 ---- src/executor/executor.cc | 3 --- 2 files changed, 7 deletions(-) diff --git a/src/executor/execution_kernel.cu b/src/executor/execution_kernel.cu index 91ba6d279..06079f439 100644 --- a/src/executor/execution_kernel.cu +++ b/src/executor/execution_kernel.cu @@ -1,10 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -#if defined(ENABLE_NPKIT) -#include -#endif - #include "execution_kernel.hpp" #if defined(MSCCLPP_DEVICE_CUDA) diff --git a/src/executor/executor.cc b/src/executor/executor.cc index ce962e446..b1530c7ee 100644 --- a/src/executor/executor.cc +++ b/src/executor/executor.cc @@ -2,9 +2,6 @@ // Licensed under the MIT license. #include -#if defined(ENABLE_NPKIT) -#include -#endif #include #include #include From a3f3166a80ab368f318ca6ebb7100675dbdb72ad Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Wed, 12 Jun 2024 00:28:06 +0800 Subject: [PATCH 08/16] Update execution_kernel.hpp --- src/include/execution_kernel.hpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/include/execution_kernel.hpp b/src/include/execution_kernel.hpp index 26c69c17c..7a5f34eeb 100644 --- a/src/include/execution_kernel.hpp +++ b/src/include/execution_kernel.hpp @@ -5,6 +5,9 @@ #define MSCCLPP_EXECUTION_KERNEL_HPP_ #include +#if defined(ENABLE_NPKIT) +#include +#endif #include #include #include @@ -18,10 +21,6 @@ #define __synclds() asm volatile("s_waitcnt lgkmcnt(0) \n s_barrier"); #endif // defined(MSCCLPP_DEVICE_HIP) -#if defined(ENABLE_NPKIT) -#include -#endif - namespace { template MSCCLPP_DEVICE_INLINE To bit_cast(const From& src) { From 94e4c5984d3469881a4744b7a0b3ab889f792671 Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Wed, 12 Jun 2024 01:25:45 +0000 Subject: [PATCH 09/16] fix pipeline --- .azure-pipelines/ut.yml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/.azure-pipelines/ut.yml b/.azure-pipelines/ut.yml index ccaaf761e..6eb11ce62 100644 --- a/.azure-pipelines/ut.yml +++ b/.azure-pipelines/ut.yml @@ -103,8 +103,6 @@ jobs: export NPKIT_DUMP_DIR=./npkit_dump mpirun -tag-output -np 2 ./build/test/mp_unit_tests --gtest_filter="ExecutorTest.TwoNodesAllreduce" python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output - grep -q NPKIT_EVENT_TIME_SYNC_CPU ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_TIME_SYNC_GPU ./npkit_output/npkit_event_trace.json grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json grep -q NPKIT_EVENT_EXECUTOR_INIT_EXIT ./npkit_output/npkit_event_trace.json grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_ENTRY ./npkit_output/npkit_event_trace.json @@ -127,8 +125,6 @@ jobs: export NPKIT_DUMP_DIR=./npkit_dump mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x -k 'test_executor[allreduce.json' python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output - grep -q NPKIT_EVENT_TIME_SYNC_CPU ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_TIME_SYNC_GPU ./npkit_output/npkit_event_trace.json grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json grep -q NPKIT_EVENT_EXECUTOR_INIT_EXIT ./npkit_output/npkit_event_trace.json grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_ENTRY ./npkit_output/npkit_event_trace.json @@ -140,8 +136,6 @@ jobs: rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x -k 'test_executor[allreduce_packet.json' python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output - grep -q NPKIT_EVENT_TIME_SYNC_CPU ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_TIME_SYNC_GPU ./npkit_output/npkit_event_trace.json grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json grep -q NPKIT_EVENT_EXECUTOR_INIT_EXIT ./npkit_output/npkit_event_trace.json grep -q NPKIT_EVENT_EXECUTOR_COPY_PACKET_ENTRY ./npkit_output/npkit_event_trace.json From 1a82ce6b430073cd024dd67ca3f39e2654c422e3 Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Wed, 12 Jun 2024 01:26:56 +0000 Subject: [PATCH 10/16] fix pipeline --- .azure-pipelines/ut.yml | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/.azure-pipelines/ut.yml b/.azure-pipelines/ut.yml index 6eb11ce62..7d30531f3 100644 --- a/.azure-pipelines/ut.yml +++ b/.azure-pipelines/ut.yml @@ -104,13 +104,9 @@ jobs: mpirun -tag-output -np 2 ./build/test/mp_unit_tests --gtest_filter="ExecutorTest.TwoNodesAllreduce" python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_INIT_EXIT ./npkit_output/npkit_event_trace.json grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_EXIT ./npkit_output/npkit_event_trace.json grep -q NPKIT_EVENT_EXECUTOR_WAIT_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_WAIT_EXIT ./npkit_output/npkit_event_trace.json grep -q NPKIT_EVENT_EXECUTOR_READ_REDUCE_COPY_SEND_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_READ_REDUCE_COPY_SEND_EXIT ./npkit_output/npkit_event_trace.json workingDirectory: '$(System.DefaultWorkingDirectory)' - task: Bash@3 @@ -126,22 +122,14 @@ jobs: mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x -k 'test_executor[allreduce.json' python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_INIT_EXIT ./npkit_output/npkit_event_trace.json grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_EXIT ./npkit_output/npkit_event_trace.json grep -q NPKIT_EVENT_EXECUTOR_WAIT_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_WAIT_EXIT ./npkit_output/npkit_event_trace.json grep -q NPKIT_EVENT_EXECUTOR_READ_REDUCE_COPY_SEND_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_READ_REDUCE_COPY_SEND_EXIT ./npkit_output/npkit_event_trace.json rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x -k 'test_executor[allreduce_packet.json' python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_INIT_EXIT ./npkit_output/npkit_event_trace.json grep -q NPKIT_EVENT_EXECUTOR_COPY_PACKET_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_COPY_PACKET_EXIT ./npkit_output/npkit_event_trace.json grep -q NPKIT_EVENT_EXECUTOR_PUT_PACKET_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_PUT_PACKET_EXIT ./npkit_output/npkit_event_trace.json grep -q NPKIT_EVENT_EXECUTOR_REDUCE_SEND_PACKET_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_REDUCE_SEND_PACKET_EXIT ./npkit_output/npkit_event_trace.json workingDirectory: '$(System.DefaultWorkingDirectory)' From ecefa5f5be58eaf8c59e6408a6bad3a81a57baf7 Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Wed, 12 Jun 2024 02:48:11 +0000 Subject: [PATCH 11/16] fix bug --- python/mscclpp/npkit_py.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/python/mscclpp/npkit_py.cpp b/python/mscclpp/npkit_py.cpp index 71b591435..0557b72d8 100644 --- a/python/mscclpp/npkit_py.cpp +++ b/python/mscclpp/npkit_py.cpp @@ -2,6 +2,7 @@ // Licensed under the MIT license. #include +#include #include From 83a83747147d38a6f39a24f8f2a99817c01b7f95 Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Wed, 12 Jun 2024 14:01:12 +0000 Subject: [PATCH 12/16] address comments 1 --- .azure-pipelines/ut-npkit.yml | 91 +++++++++++++++++++++++++++++++++++ .azure-pipelines/ut.yml | 54 --------------------- python/mscclpp/nvls_py.cpp | 4 -- src/npkit/npkit.cc | 10 ++-- 4 files changed, 96 insertions(+), 63 deletions(-) create mode 100644 .azure-pipelines/ut-npkit.yml diff --git a/.azure-pipelines/ut-npkit.yml b/.azure-pipelines/ut-npkit.yml new file mode 100644 index 000000000..4afe7abdc --- /dev/null +++ b/.azure-pipelines/ut-npkit.yml @@ -0,0 +1,91 @@ +trigger: +- main + +pr: + branches: + include: + - main + drafts: false + +jobs: +- job: UnitTestWithNpKit + timeoutInMinutes: 30 + pool: + name: mscclpp + strategy: + matrix: + cuda11: + containerImage: ghcr.io/microsoft/mscclpp/mscclpp:base-dev-cuda11.8 + cuda12: + containerImage: ghcr.io/microsoft/mscclpp/mscclpp:base-dev-cuda12.2 + + container: + image: $[ variables['containerImage'] ] + options: --privileged --ipc=host --gpus=all --ulimit memlock=-1:-1 + + steps: + - task: Bash@3 + name: Build + displayName: Build + inputs: + targetType: 'inline' + script: | + mkdir build && cd build + cmake -DCMAKE_BUILD_TYPE=Release -DNPKIT_FLAGS="-DENABLE_NPKIT -DENABLE_NPKIT_EVENT_TIME_SYNC_CPU -DENABLE_NPKIT_EVENT_TIME_SYNC_GPU -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT" .. + make -j + workingDirectory: '$(System.DefaultWorkingDirectory)' + + - task: Bash@3 + name: LockGPUClock + displayName: Lock GPU clock frequency + inputs: + targetType: 'inline' + script: | + sudo nvidia-smi -pm 1 + for i in $(seq 0 $(( $(nvidia-smi -L | wc -l) - 1 ))); do + sudo nvidia-smi -ac $(nvidia-smi --query-gpu=clocks.max.memory,clocks.max.sm --format=csv,noheader,nounits -i $i | sed 's/\ //') -i $i + done + workingDirectory: '$(System.DefaultWorkingDirectory)' + + - task: Bash@3 + name: MpUnitTests + displayName: Run mscclpp multi-process unit tests + inputs: + targetType: 'inline' + script: | + set -e + rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output + export PATH=/usr/local/mpi/bin:$PATH + export NPKIT_DUMP_DIR=./npkit_dump + mpirun -tag-output -np 2 ./build/test/mp_unit_tests --gtest_filter="ExecutorTest.TwoNodesAllreduce" + python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output + grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_WAIT_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_READ_REDUCE_COPY_SEND_ENTRY ./npkit_output/npkit_event_trace.json + workingDirectory: '$(System.DefaultWorkingDirectory)' + + - task: Bash@3 + name: PyTests + displayName: Run pytests + inputs: + targetType: 'inline' + script: | + set -e + rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output + export PATH=/usr/local/mpi/bin:$PATH + export NPKIT_DUMP_DIR=./npkit_dump + mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x -k 'test_executor[allreduce.json' + python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output + grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_WAIT_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_READ_REDUCE_COPY_SEND_ENTRY ./npkit_output/npkit_event_trace.json + rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output + mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x -k 'test_executor[allreduce_packet.json' + python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output + grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_COPY_PACKET_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_PUT_PACKET_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_REDUCE_SEND_PACKET_ENTRY ./npkit_output/npkit_event_trace.json + workingDirectory: '$(System.DefaultWorkingDirectory)' diff --git a/.azure-pipelines/ut.yml b/.azure-pipelines/ut.yml index 7d30531f3..78b679e8d 100644 --- a/.azure-pipelines/ut.yml +++ b/.azure-pipelines/ut.yml @@ -79,57 +79,3 @@ jobs: export PATH=/usr/local/mpi/bin:$PATH mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x workingDirectory: '$(System.DefaultWorkingDirectory)' - - - task: Bash@3 - name: BuildWithNpKit - displayName: Build with NPKit - inputs: - targetType: 'inline' - script: | - rm -rf build && mkdir build && cd build - cmake -DCMAKE_BUILD_TYPE=Release -DNPKIT_FLAGS="-DENABLE_NPKIT -DENABLE_NPKIT_EVENT_TIME_SYNC_CPU -DENABLE_NPKIT_EVENT_TIME_SYNC_GPU -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT" .. - make -j - workingDirectory: '$(System.DefaultWorkingDirectory)' - - - task: Bash@3 - name: MpUnitTestsWithNpKit - displayName: Run mscclpp multi-process unit tests (executor part) with NPKit - inputs: - targetType: 'inline' - script: | - set -e - rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output - export PATH=/usr/local/mpi/bin:$PATH - export NPKIT_DUMP_DIR=./npkit_dump - mpirun -tag-output -np 2 ./build/test/mp_unit_tests --gtest_filter="ExecutorTest.TwoNodesAllreduce" - python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output - grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_WAIT_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_READ_REDUCE_COPY_SEND_ENTRY ./npkit_output/npkit_event_trace.json - workingDirectory: '$(System.DefaultWorkingDirectory)' - - - task: Bash@3 - name: PyTestsWithNpKit - displayName: Run pytests (executor part) with NPKit - inputs: - targetType: 'inline' - script: | - set -e - rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output - export PATH=/usr/local/mpi/bin:$PATH - export NPKIT_DUMP_DIR=./npkit_dump - mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x -k 'test_executor[allreduce.json' - python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output - grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_WAIT_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_READ_REDUCE_COPY_SEND_ENTRY ./npkit_output/npkit_event_trace.json - rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output - mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x -k 'test_executor[allreduce_packet.json' - python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output - grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_COPY_PACKET_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_PUT_PACKET_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_REDUCE_SEND_PACKET_ENTRY ./npkit_output/npkit_event_trace.json - workingDirectory: '$(System.DefaultWorkingDirectory)' diff --git a/python/mscclpp/nvls_py.cpp b/python/mscclpp/nvls_py.cpp index 652d9e68a..819a7c6b0 100644 --- a/python/mscclpp/nvls_py.cpp +++ b/python/mscclpp/nvls_py.cpp @@ -34,9 +34,5 @@ void register_nvls(nb::module_& m) { .def("get_multicast_min_granularity", &NvlsConnection::getMultiCastMinGranularity); m.def("connect_nvls_collective", &connectNvlsCollective, nb::arg("communicator"), nb::arg("allRanks"), -#if (USE_NVLS) nb::arg("bufferSize") = NvlsConnection::DefaultNvlsBufferSize); -#else - nb::arg("bufferSize") = 0); -#endif } diff --git a/src/npkit/npkit.cc b/src/npkit/npkit.cc index 67f725fb2..54bac9d62 100644 --- a/src/npkit/npkit.cc +++ b/src/npkit/npkit.cc @@ -62,7 +62,7 @@ void NpKit::Init(int rank) { cpu_timestamp_update_thread_should_stop_ = false; cpu_timestamp_update_thread_ = std::make_unique(CpuTimestampUpdateThread); #else - WARN("NpKit::Init(%d) : MSCCLpp library was not built with NPKit enabled.", rank); + WARN("NpKit::Init(%d) : MSCCLPP library was not built with NPKit enabled.", rank); #endif } @@ -70,10 +70,10 @@ void NpKit::Init(int rank) { static int GetGpuClockRateInKhz() { int dev_id; #if defined(__HIP_PLATFORM_AMD__) - hipDeviceProp_t dev_prop; + cudaDeviceProp_t dev_prop; char gcn_arch[256]; - MSCCLPP_CUDATHROW(hipGetDevice(&dev_id)); - MSCCLPP_CUDATHROW(hipGetDeviceProperties(&dev_prop, dev_id)); + MSCCLPP_CUDATHROW(cudaGetDevice(&dev_id)); + MSCCLPP_CUDATHROW(cudaGetDeviceProperties(&dev_prop, dev_id)); char* gcnArchNameToken = strtok(dev_prop.gcnArchName, ":"); strcpy(gcn_arch, gcnArchNameToken); if (strncmp("gfx94", gcn_arch, 5) == 0) @@ -148,7 +148,7 @@ void NpKit::Dump(const std::string& dump_dir) { gpu_clock_rate_file.write(clock_rate_str.c_str(), clock_rate_str.length()); gpu_clock_rate_file.close(); #else - WARN("NpKit::Dump(%s) : MSCCLpp library was not built with NPKit enabled.", dump_dir.c_str()); + WARN("NpKit::Dump(%s) : MSCCLPP library was not built with NPKit enabled.", dump_dir.c_str()); #endif } From 742c28a9f98d42bcabc0404fd29211802612c6d7 Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Wed, 12 Jun 2024 14:24:21 +0000 Subject: [PATCH 13/16] address comments 2 --- include/mscclpp/npkit/npkit.hpp | 22 ++++++++++++++++++++-- src/include/execution_kernel.hpp | 27 ++++++--------------------- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/include/mscclpp/npkit/npkit.hpp b/include/mscclpp/npkit/npkit.hpp index 36f1dbe09..d0c7d7179 100644 --- a/include/mscclpp/npkit/npkit.hpp +++ b/include/mscclpp/npkit/npkit.hpp @@ -35,8 +35,8 @@ class NpKit { static NpKitEventCollectContext* GetGpuEventCollectContexts(); #if defined(MSCCLPP_DEVICE_COMPILE) - static inline __device__ void CollectGpuEventShm(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, - NpKitEvent* event_buffer, uint64_t* event_buffer_head) { + static MSCCLPP_DEVICE_INLINE void CollectGpuEventShm(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, + NpKitEvent* event_buffer, uint64_t* event_buffer_head) { if (*event_buffer_head < NPKIT_SHM_NUM_EVENTS) { if (threadIdx.x == 0) { NpKitEvent& event = event_buffer[*event_buffer_head]; @@ -48,6 +48,24 @@ class NpKit { (*event_buffer_head)++; } } + + static MSCCLPP_DEVICE_INLINE void StoreGpuEventShm(NpKitEventCollectContext* npKitEventCollectContexts, + uint64_t event_buffer_head) { +#if defined(MSCCLPP_DEVICE_HIP) + __synclds(); +#else // !defined(MSCCLPP_DEVICE_HIP) + __syncthreads(); +#endif // !defined(MSCCLPP_DEVICE_HIP) + NpKitEventCollectContext* npKitCtx = npKitEventCollectContexts + blockIdx.x; + NpKitEvent* global_event_buffer = npKitCtx->event_buffer; + uint64_t global_event_buffer_head = npKitCtx->event_buffer_head; + for (size_t i = threadIdx.x; i < event_buffer_head * sizeof(NpKitEvent) / sizeof(int4); i += blockDim.x) { + ((int4*)(global_event_buffer + global_event_buffer_head))[i] = ((int4*)event_buffer)[i]; + } + if (threadIdx.x == 0) { + npKitCtx->event_buffer_head += event_buffer_head; + } + } #endif static void CollectCpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, int channel_id); diff --git a/src/include/execution_kernel.hpp b/src/include/execution_kernel.hpp index 07a7d64c1..7b8616089 100644 --- a/src/include/execution_kernel.hpp +++ b/src/include/execution_kernel.hpp @@ -346,12 +346,15 @@ __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* inpu extern __shared__ int4 sharedMem[]; int bid = blockIdx.x; int tid = threadIdx.x; -#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY) && \ - defined(ENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT) +#if defined(ENABLE_NPKIT) + NpKitEvent* event_buffer = (NpKitEvent*)((char*)sharedMem + sizeof(DeviceExecutionPlan)); + uint64_t event_buffer_head = 0; +#if defined(ENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT) uint64_t npkit_timestamp_entry = 0; if (tid == 0) { npkit_timestamp_entry = NPKIT_GET_GPU_TIMESTAMP(); } +#endif #endif DeviceExecutionPlan* localPlan = plan + bid; for (size_t i = tid; i < sizeof(DeviceExecutionPlan) / sizeof(int4); i += blockDim.x) { @@ -369,11 +372,6 @@ __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* inpu DeviceHandle* smChannels = localPlan->channels.smChannels; DeviceHandle* proxyChannels = localPlan->channels.proxyChannels; -#if defined(ENABLE_NPKIT) - NpKitEvent* event_buffer = (NpKitEvent*)((char*)sharedMem + sizeof(DeviceExecutionPlan)); - uint64_t event_buffer_head = 0; -#endif - #if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_CPU) NpKit::CollectGpuEventShm(NPKIT_EVENT_TIME_SYNC_CPU, 0, 0, *cpuTimestamp, event_buffer, &event_buffer_head); #endif @@ -456,20 +454,7 @@ __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* inpu } #if defined(ENABLE_NPKIT) -#if defined(MSCCLPP_DEVICE_HIP) - __synclds(); -#else // !defined(MSCCLPP_DEVICE_HIP) - __syncthreads(); -#endif // !defined(MSCCLPP_DEVICE_HIP) - NpKitEventCollectContext* npKitCtx = npKitEventCollectContexts + bid; - NpKitEvent* global_event_buffer = npKitCtx->event_buffer; - uint64_t global_event_buffer_head = npKitCtx->event_buffer_head; - for (size_t i = tid; i < event_buffer_head * sizeof(NpKitEvent) / sizeof(int4); i += blockDim.x) { - ((int4*)(global_event_buffer + global_event_buffer_head))[i] = ((int4*)event_buffer)[i]; - } - if (tid == 0) { - npKitCtx->event_buffer_head += event_buffer_head; - } + NpKit::StoreGpuEventShm(npKitEventCollectContexts, event_buffer_head); #endif } #endif // defined(MSCCLPP_DEVICE_COMPILE) From 5d7c5aa27b681f3f777cc7483c3328b41e4084b9 Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Wed, 12 Jun 2024 14:32:40 +0000 Subject: [PATCH 14/16] fix whitespace --- src/include/execution_kernel.hpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/include/execution_kernel.hpp b/src/include/execution_kernel.hpp index 7b8616089..03ef13426 100644 --- a/src/include/execution_kernel.hpp +++ b/src/include/execution_kernel.hpp @@ -365,7 +365,6 @@ __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* inpu #else // !defined(MSCCLPP_DEVICE_HIP) __syncthreads(); #endif // !defined(MSCCLPP_DEVICE_HIP) - localPlan = (DeviceExecutionPlan*)sharedMem; int nOperations = localPlan->nOperations; Operation* operations = localPlan->operations; From 951f9286c8c79d9ebb3c7bd65cd30d27625f521b Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Thu, 13 Jun 2024 03:08:57 +0000 Subject: [PATCH 15/16] address comments 3 --- .azure-pipelines/ut-npkit.yml | 91 ----------------------------------- .azure-pipelines/ut.yml | 82 +++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 91 deletions(-) delete mode 100644 .azure-pipelines/ut-npkit.yml diff --git a/.azure-pipelines/ut-npkit.yml b/.azure-pipelines/ut-npkit.yml deleted file mode 100644 index 4afe7abdc..000000000 --- a/.azure-pipelines/ut-npkit.yml +++ /dev/null @@ -1,91 +0,0 @@ -trigger: -- main - -pr: - branches: - include: - - main - drafts: false - -jobs: -- job: UnitTestWithNpKit - timeoutInMinutes: 30 - pool: - name: mscclpp - strategy: - matrix: - cuda11: - containerImage: ghcr.io/microsoft/mscclpp/mscclpp:base-dev-cuda11.8 - cuda12: - containerImage: ghcr.io/microsoft/mscclpp/mscclpp:base-dev-cuda12.2 - - container: - image: $[ variables['containerImage'] ] - options: --privileged --ipc=host --gpus=all --ulimit memlock=-1:-1 - - steps: - - task: Bash@3 - name: Build - displayName: Build - inputs: - targetType: 'inline' - script: | - mkdir build && cd build - cmake -DCMAKE_BUILD_TYPE=Release -DNPKIT_FLAGS="-DENABLE_NPKIT -DENABLE_NPKIT_EVENT_TIME_SYNC_CPU -DENABLE_NPKIT_EVENT_TIME_SYNC_GPU -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT" .. - make -j - workingDirectory: '$(System.DefaultWorkingDirectory)' - - - task: Bash@3 - name: LockGPUClock - displayName: Lock GPU clock frequency - inputs: - targetType: 'inline' - script: | - sudo nvidia-smi -pm 1 - for i in $(seq 0 $(( $(nvidia-smi -L | wc -l) - 1 ))); do - sudo nvidia-smi -ac $(nvidia-smi --query-gpu=clocks.max.memory,clocks.max.sm --format=csv,noheader,nounits -i $i | sed 's/\ //') -i $i - done - workingDirectory: '$(System.DefaultWorkingDirectory)' - - - task: Bash@3 - name: MpUnitTests - displayName: Run mscclpp multi-process unit tests - inputs: - targetType: 'inline' - script: | - set -e - rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output - export PATH=/usr/local/mpi/bin:$PATH - export NPKIT_DUMP_DIR=./npkit_dump - mpirun -tag-output -np 2 ./build/test/mp_unit_tests --gtest_filter="ExecutorTest.TwoNodesAllreduce" - python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output - grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_WAIT_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_READ_REDUCE_COPY_SEND_ENTRY ./npkit_output/npkit_event_trace.json - workingDirectory: '$(System.DefaultWorkingDirectory)' - - - task: Bash@3 - name: PyTests - displayName: Run pytests - inputs: - targetType: 'inline' - script: | - set -e - rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output - export PATH=/usr/local/mpi/bin:$PATH - export NPKIT_DUMP_DIR=./npkit_dump - mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x -k 'test_executor[allreduce.json' - python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output - grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_WAIT_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_READ_REDUCE_COPY_SEND_ENTRY ./npkit_output/npkit_event_trace.json - rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output - mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x -k 'test_executor[allreduce_packet.json' - python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output - grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_COPY_PACKET_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_PUT_PACKET_ENTRY ./npkit_output/npkit_event_trace.json - grep -q NPKIT_EVENT_EXECUTOR_REDUCE_SEND_PACKET_ENTRY ./npkit_output/npkit_event_trace.json - workingDirectory: '$(System.DefaultWorkingDirectory)' diff --git a/.azure-pipelines/ut.yml b/.azure-pipelines/ut.yml index 78b679e8d..40a648809 100644 --- a/.azure-pipelines/ut.yml +++ b/.azure-pipelines/ut.yml @@ -79,3 +79,85 @@ jobs: export PATH=/usr/local/mpi/bin:$PATH mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x workingDirectory: '$(System.DefaultWorkingDirectory)' + +- job: UnitTestWithNpKit + timeoutInMinutes: 30 + pool: + name: mscclpp + strategy: + matrix: + cuda11: + containerImage: ghcr.io/microsoft/mscclpp/mscclpp:base-dev-cuda11.8 + cuda12: + containerImage: ghcr.io/microsoft/mscclpp/mscclpp:base-dev-cuda12.2 + + container: + image: $[ variables['containerImage'] ] + options: --privileged --ipc=host --gpus=all --ulimit memlock=-1:-1 + + steps: + - task: Bash@3 + name: Build + displayName: Build + inputs: + targetType: 'inline' + script: | + mkdir build && cd build + cmake -DCMAKE_BUILD_TYPE=Release -DNPKIT_FLAGS="-DENABLE_NPKIT -DENABLE_NPKIT_EVENT_TIME_SYNC_CPU -DENABLE_NPKIT_EVENT_TIME_SYNC_GPU -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT" .. + make -j + workingDirectory: '$(System.DefaultWorkingDirectory)' + + - task: Bash@3 + name: LockGPUClock + displayName: Lock GPU clock frequency + inputs: + targetType: 'inline' + script: | + sudo nvidia-smi -pm 1 + for i in $(seq 0 $(( $(nvidia-smi -L | wc -l) - 1 ))); do + sudo nvidia-smi -ac $(nvidia-smi --query-gpu=clocks.max.memory,clocks.max.sm --format=csv,noheader,nounits -i $i | sed 's/\ //') -i $i + done + workingDirectory: '$(System.DefaultWorkingDirectory)' + + - task: Bash@3 + name: MpUnitTests + displayName: Run mscclpp multi-process unit tests + inputs: + targetType: 'inline' + script: | + set -e + rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output + export PATH=/usr/local/mpi/bin:$PATH + export NPKIT_DUMP_DIR=./npkit_dump + mpirun -tag-output -np 2 ./build/test/mp_unit_tests --gtest_filter="ExecutorTest.TwoNodesAllreduce" + python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output + grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_WAIT_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_READ_REDUCE_COPY_SEND_ENTRY ./npkit_output/npkit_event_trace.json + workingDirectory: '$(System.DefaultWorkingDirectory)' + + - task: Bash@3 + name: PyTests + displayName: Run pytests + inputs: + targetType: 'inline' + script: | + set -e + rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output + export PATH=/usr/local/mpi/bin:$PATH + export NPKIT_DUMP_DIR=./npkit_dump + mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x -k 'test_executor[allreduce.json' + python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output + grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_SIGNAL_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_WAIT_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_READ_REDUCE_COPY_SEND_ENTRY ./npkit_output/npkit_event_trace.json + rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output + mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x -k 'test_executor[allreduce_packet.json' + python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output + grep -q NPKIT_EVENT_EXECUTOR_INIT_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_COPY_PACKET_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_PUT_PACKET_ENTRY ./npkit_output/npkit_event_trace.json + grep -q NPKIT_EVENT_EXECUTOR_REDUCE_SEND_PACKET_ENTRY ./npkit_output/npkit_event_trace.json + workingDirectory: '$(System.DefaultWorkingDirectory)' From a789cf6d9ef1bd8a4af5b98f062426ce87d7de9d Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Thu, 13 Jun 2024 04:26:53 +0000 Subject: [PATCH 16/16] fix bug --- include/mscclpp/npkit/npkit.hpp | 2 +- src/include/execution_kernel.hpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/include/mscclpp/npkit/npkit.hpp b/include/mscclpp/npkit/npkit.hpp index d0c7d7179..d2f98a7c5 100644 --- a/include/mscclpp/npkit/npkit.hpp +++ b/include/mscclpp/npkit/npkit.hpp @@ -50,7 +50,7 @@ class NpKit { } static MSCCLPP_DEVICE_INLINE void StoreGpuEventShm(NpKitEventCollectContext* npKitEventCollectContexts, - uint64_t event_buffer_head) { + NpKitEvent* event_buffer, uint64_t event_buffer_head) { #if defined(MSCCLPP_DEVICE_HIP) __synclds(); #else // !defined(MSCCLPP_DEVICE_HIP) diff --git a/src/include/execution_kernel.hpp b/src/include/execution_kernel.hpp index 03ef13426..834e0f3f8 100644 --- a/src/include/execution_kernel.hpp +++ b/src/include/execution_kernel.hpp @@ -453,7 +453,7 @@ __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* inpu } #if defined(ENABLE_NPKIT) - NpKit::StoreGpuEventShm(npKitEventCollectContexts, event_buffer_head); + NpKit::StoreGpuEventShm(npKitEventCollectContexts, event_buffer, event_buffer_head); #endif } #endif // defined(MSCCLPP_DEVICE_COMPILE)