diff --git a/csrc/host_ir/lower.cpp b/csrc/host_ir/lower.cpp index b10bd6c4e46..041e51018da 100644 --- a/csrc/host_ir/lower.cpp +++ b/csrc/host_ir/lower.cpp @@ -171,7 +171,7 @@ std::unique_ptr HostIrLower::lower( tv->setMemoryType(MemoryType::Global); } - hir_pass::StreamParallelType().runPass(hic.get()); + hir_pass::StreamParallelType(params_).runPass(hic.get()); hir_pass::ConvertOpToCommunication(params_).runPass(hic.get()); diff --git a/csrc/host_ir/pass/stream_parallel_type.cpp b/csrc/host_ir/pass/stream_parallel_type.cpp index 874f2bc3926..391dab9c77b 100644 --- a/csrc/host_ir/pass/stream_parallel_type.cpp +++ b/csrc/host_ir/pass/stream_parallel_type.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -277,7 +278,8 @@ std::vector addTensorAllocations( // Step 3: Process for-loop bodies by slicing tensors std::vector processForLoopBodies( std::vector top_level_exprs, - const IdModel& id_model) { + const IdModel& id_model, + const CommunicatorBackend& communicator_backend) { TensorSlicingCache tensor_slicing_cache; for (auto* expr : top_level_exprs) { @@ -289,12 +291,12 @@ std::vector processForLoopBodies( std::vector new_loop_body; // Lambda to process a tensor in a for-loop body - auto processTensor = [&](Expr*& expr, TensorView* tensor) { + auto processTensor = [&](Expr*& expr, TensorView* tensor, Val* index) { if (auto stream_idx = findStreamAxisIndex(tensor, for_loop->iterDomain(), id_model); stream_idx != -1) { auto [slicing, is_new] = - tensor_slicing_cache.get(tensor, stream_idx, for_loop->index()); + tensor_slicing_cache.get(tensor, stream_idx, index); if (is_new) { new_loop_body.push_back(slicing); } @@ -306,6 +308,26 @@ std::vector processForLoopBodies( } }; + auto* my_device_id = IrBuilder::create("rank", DataType::Int); + // We need to make indexing different for when the pipeline will result in + // a p2p ring pipeline backed by cuda ipc, or will result in a collective + // based pipeline. 
On the one hand, for the case of collective-based + // pipeline, all ranks must index the tensors uniformly, because the + // successive collective must be posted in a globally coherent order (this + // can actually be relaxed by using different process groups, namely, one + // process group per tile, using tags, but this unfortunately hurts + // performance). On the other hand, the case with cuda ipc p2p needs a + // ring pattern where each rank sends and receives to one and only one + // peer, therefore, indexing must be offset by the rank. This is needed + // for two reasons, 1) performance-wise, this is a more efficient way to + // use the network than to have all ranks send or receive to/from one + // device 2) our semantics of sharing the memory handles can only express + // this type of scenario. P2p backed by ProcessGroup can relax condition + // 2) because there is no explicit need to share the memhandle. + auto tensor_index = communicator_backend == CommunicatorBackend::kCuda + ? mod(add(my_device_id, for_loop->index()), for_loop->stop()) + : for_loop->index(); + for (auto* body_expr : for_loop->body().exprs()) { // We have a special handling for when an axis pass from DIDx to Stream // parallel type in one expression. This case should be lowered to a P2P @@ -360,11 +382,13 @@ std::vector processForLoopBodies( "expected a stream parallelized first axis on the output but got ", output_tv); - auto* peer = for_loop->index(); - auto* my_device_id = - IrBuilder::create("rank", DataType::Int); + auto send_peer = (communicator_backend == CommunicatorBackend::kCuda) + ? 
mod(add(for_loop->stop(), sub(my_device_id, for_loop->index())), + for_loop->stop()) + : for_loop->index(); + auto recv_peer = tensor_index; auto* is_sending_to_self = - IrBuilder::create(eq(peer, my_device_id)); + IrBuilder::create(eq(send_peer, my_device_id)); auto if_then_else = IrBuilder::create(is_sending_to_self); @@ -372,8 +396,8 @@ std::vector processForLoopBodies( input_tv, /*dim=*/0, /*index=*/FusionGuard::getCurFusion()->zeroVal()); - auto [slicing_output, is_new_] = tensor_slicing_cache.get( - output_tv, /*dim=*/0, /*index=*/for_loop->index()); + auto [slicing_output, is_new_] = + tensor_slicing_cache.get(output_tv, /*dim=*/0, /*index=*/recv_peer); auto* local_copy = IrBuilder::create( LoadStoreOpType::Set, slicing_output->out(), slicing_input->out()); @@ -381,28 +405,46 @@ std::vector processForLoopBodies( if_then_else->thenBody().push_back(slicing_input); if_then_else->thenBody().push_back(local_copy); - // Using Start/EndCoalescing here is important to 1) avoid hangs because - // of a wrong global order of send/recv and 2) enjoy full bi-directional - // bandwith. 
- auto start_coalescing = IrBuilder::create(); auto recv = IrBuilder::create( P2PCommunicationType::RECV, slicing_output->out(), - /*peer*/ for_loop->index(), - CommunicatorBackend::kNccl); + recv_peer, + communicator_backend); auto send = IrBuilder::create( P2PCommunicationType::SEND, input_tv, - /*peer*/ for_loop->index(), - CommunicatorBackend::kNccl); - auto end_coalescing = IrBuilder::create(); - auto wait = IrBuilder::create(end_coalescing); - - if_then_else->elseBody().push_back(start_coalescing); - if_then_else->elseBody().push_back(recv); - if_then_else->elseBody().push_back(send); - if_then_else->elseBody().push_back(end_coalescing); - if_then_else->elseBody().push_back(wait); + send_peer, + communicator_backend); + if (communicator_backend == CommunicatorBackend::kNccl) { + // Using Start/EndCoalescing here is important to 1) avoid hangs + // because of a wrong global order of send/recv and 2) enjoy full + // bi-directional bandwidth. + auto start_coalescing = IrBuilder::create(); + auto end_coalescing = IrBuilder::create(); + auto wait = IrBuilder::create(end_coalescing); + + if_then_else->elseBody().push_back(start_coalescing); + if_then_else->elseBody().push_back(recv); + if_then_else->elseBody().push_back(send); + if_then_else->elseBody().push_back(end_coalescing); + if_then_else->elseBody().push_back(wait); + } else if (communicator_backend == CommunicatorBackend::kCuda) { + auto share_mem_handles = IrBuilder::create( + std::vector({recv, send})); + auto wait_send = IrBuilder::create(send); + auto wait_recv = IrBuilder::create(recv); + + if_then_else->elseBody().push_back(share_mem_handles); + if_then_else->elseBody().push_back(send); + if_then_else->elseBody().push_back(recv); + if_then_else->elseBody().push_back(wait_send); + if_then_else->elseBody().push_back(wait_recv); + } else { + NVF_THROW( + "Unsupported communicator backend for lowering stream parallel " + "type into p2p: ", + communicator_backend); + } 
new_loop_body.push_back(slicing_output); new_loop_body.push_back(if_then_else); @@ -410,11 +452,11 @@ std::vector processForLoopBodies( // Process inputs and outputs normally for (auto* input : ir_utils::filterByType(body_expr->inputs())) { - processTensor(body_expr, input); + processTensor(body_expr, input, tensor_index); } for (auto* output : ir_utils::filterByType(body_expr->outputs())) { - processTensor(body_expr, output); + processTensor(body_expr, output, tensor_index); } new_loop_body.push_back(body_expr); } @@ -553,7 +595,8 @@ void StreamParallelType::passImplementation(Fusion* fusion) { top_level_exprs = addTensorAllocations(std::move(top_level_exprs), id_model); // Step 3: Process for-loop bodies by slicing tensors - top_level_exprs = processForLoopBodies(std::move(top_level_exprs), id_model); + top_level_exprs = processForLoopBodies( + std::move(top_level_exprs), id_model, params_.communicator_backend); // Step 4: Add stream management and synchronization top_level_exprs = addStreamManagement(std::move(top_level_exprs)); diff --git a/csrc/host_ir/pass/stream_parallel_type.h b/csrc/host_ir/pass/stream_parallel_type.h index c98b2088915..d6ca28fc0b3 100644 --- a/csrc/host_ir/pass/stream_parallel_type.h +++ b/csrc/host_ir/pass/stream_parallel_type.h @@ -8,6 +8,7 @@ #pragma once #include +#include #include namespace nvfuser::hir_pass { @@ -25,11 +26,18 @@ namespace nvfuser::hir_pass { class StreamParallelType : public OptimizationPass { friend class OptimizationPass; + public: + StreamParallelType(const HostIrLowerParams& params = HostIrLowerParams()) + : params_(params) {} + protected: void passImplementation(Fusion* fusion); static constexpr std::string_view name() { return "StreamParallelType"; } + + private: + HostIrLowerParams params_; }; } // namespace nvfuser::hir_pass diff --git a/python/python_direct/enum.cpp b/python/python_direct/enum.cpp index 4f1a7caa914..2fd4f839e86 100644 --- a/python/python_direct/enum.cpp +++ b/python/python_direct/enum.cpp 
@@ -53,7 +53,8 @@ void bindEnums(py::module& nvfuser) { py::enum_( nvfuser, "CommunicatorBackend", py::module_local()) .value("nccl", CommunicatorBackend::kNccl) - .value("ucc", CommunicatorBackend::kUcc); + .value("ucc", CommunicatorBackend::kUcc) + .value("cuda", CommunicatorBackend::kCuda); py::enum_(nvfuser, "SchedulerType", py::module_local()) .value("none", SchedulerType::None) diff --git a/tests/python/multidevice/test_overlap.py b/tests/python/multidevice/test_overlap.py index 44e4bc3d079..30a8d8d83ea 100644 --- a/tests/python/multidevice/test_overlap.py +++ b/tests/python/multidevice/test_overlap.py @@ -81,7 +81,9 @@ def multidevice_schedule(fd, tensors, num_devices) -> None: @pytest.mark.mpi -@pytest.mark.parametrize("backend_type", [CommunicatorBackend.nccl]) +@pytest.mark.parametrize( + "backend_type", [CommunicatorBackend.nccl, CommunicatorBackend.cuda] +) def test_overlap_allgather_matmul_shard_outermost( multidevice_direct_test, benchmark, backend_type ):