csrc/host_ir/lower.cpp (2 changes: 1 addition & 1 deletion)

@@ -171,7 +171,7 @@ std::unique_ptr<hir::HostIrContainer> HostIrLower::lower(
     tv->setMemoryType(MemoryType::Global);
   }

-  hir_pass::StreamParallelType().runPass(hic.get());
+  hir_pass::StreamParallelType(params_).runPass(hic.get());

   hir_pass::ConvertOpToCommunication(params_).runPass(hic.get());

csrc/host_ir/pass/stream_parallel_type.cpp (99 changes: 71 additions & 28 deletions)

@@ -15,6 +15,7 @@
 #include <ir/internal_base_nodes.h>
 #include <ir/utils.h>
 #include <kernel_ir.h>
+#include <multidevice/utils.h>
 #include <ops/all_ops.h>
 #include <ops/utils.h>

@@ -277,7 +278,8 @@ std::vector<Expr*> addTensorAllocations(
 // Step 3: Process for-loop bodies by slicing tensors
 std::vector<Expr*> processForLoopBodies(
     std::vector<Expr*> top_level_exprs,
-    const IdModel& id_model) {
+    const IdModel& id_model,
+    const CommunicatorBackend& communicator_backend) {
   TensorSlicingCache tensor_slicing_cache;

   for (auto* expr : top_level_exprs) {
@@ -289,12 +291,12 @@
     std::vector<Expr*> new_loop_body;

     // Lambda to process a tensor in a for-loop body
-    auto processTensor = [&](Expr*& expr, TensorView* tensor) {
+    auto processTensor = [&](Expr*& expr, TensorView* tensor, Val* index) {
       if (auto stream_idx =
               findStreamAxisIndex(tensor, for_loop->iterDomain(), id_model);
           stream_idx != -1) {
         auto [slicing, is_new] =
-            tensor_slicing_cache.get(tensor, stream_idx, for_loop->index());
+            tensor_slicing_cache.get(tensor, stream_idx, index);
         if (is_new) {
           new_loop_body.push_back(slicing);
         }
@@ -306,6 +308,26 @@
       }
     };

+    auto* my_device_id = IrBuilder::create<NamedScalar>("rank", DataType::Int);
+    // Indexing must differ depending on whether the pipeline lowers to a P2P
+    // ring backed by CUDA IPC or to a collective-based pipeline. For a
+    // collective-based pipeline, all ranks must index the tensors uniformly,
+    // because successive collectives must be posted in a globally coherent
+    // order. (This could be relaxed by using one process group per tile,
+    // distinguished by tags, but that unfortunately hurts performance.) The
+    // CUDA-IPC P2P case instead needs a ring pattern in which each rank sends
+    // to and receives from exactly one peer, so indexing must be offset by
+    // the rank. This is needed for two reasons: 1) performance-wise, a ring
+    // uses the network more efficiently than having all ranks send to or
+    // receive from a single device, and 2) our semantics for sharing memory
+    // handles can only express this type of scenario. A P2P backend based on
+    // ProcessGroup can relax condition 2), because it has no explicit need to
+    // share the memory handle.
+    auto tensor_index = communicator_backend == CommunicatorBackend::kCuda
+        ? mod(add(my_device_id, for_loop->index()), for_loop->stop())
+        : for_loop->index();
+
     for (auto* body_expr : for_loop->body().exprs()) {
       // We have special handling for when an axis passes from DIDx to Stream
       // parallel type in one expression. This case should be lowered to a P2P
@@ -360,61 +382,81 @@
           "expected a stream parallelized first axis on the output but got ",
           output_tv);

-      auto* peer = for_loop->index();
-      auto* my_device_id =
-          IrBuilder::create<NamedScalar>("rank", DataType::Int);
+      auto send_peer = (communicator_backend == CommunicatorBackend::kCuda)
+          ? mod(add(for_loop->stop(), sub(my_device_id, for_loop->index())),
+                for_loop->stop())
+          : for_loop->index();
+      auto recv_peer = tensor_index;
       auto* is_sending_to_self =
-          IrBuilder::create<kir::Predicate>(eq(peer, my_device_id));
+          IrBuilder::create<kir::Predicate>(eq(send_peer, my_device_id));
       auto if_then_else =
           IrBuilder::create<kir::IfThenElse>(is_sending_to_self);

       auto [slicing_input, is_new] = tensor_slicing_cache.get(
           input_tv,
           /*dim=*/0,
           /*index=*/FusionGuard::getCurFusion()->zeroVal());
-      auto [slicing_output, is_new_] = tensor_slicing_cache.get(
-          output_tv, /*dim=*/0, /*index=*/for_loop->index());
+      auto [slicing_output, is_new_] =
+          tensor_slicing_cache.get(output_tv, /*dim=*/0, /*index=*/recv_peer);

       auto* local_copy = IrBuilder::create<LoadStoreOp>(
           LoadStoreOpType::Set, slicing_output->out(), slicing_input->out());

       if_then_else->thenBody().push_back(slicing_input);
       if_then_else->thenBody().push_back(local_copy);

-      // Using Start/EndCoalescing here is important to 1) avoid hangs because
-      // of a wrong global order of send/recv and 2) enjoy full bi-directional
-      // bandwith.
-      auto start_coalescing = IrBuilder::create<hir::StartCoalescing>();
       auto recv = IrBuilder::create<P2PCommunication>(
           P2PCommunicationType::RECV,
           slicing_output->out(),
-          /*peer*/ for_loop->index(),
-          CommunicatorBackend::kNccl);
+          recv_peer,
+          communicator_backend);
       auto send = IrBuilder::create<P2PCommunication>(
           P2PCommunicationType::SEND,
           input_tv,
-          /*peer*/ for_loop->index(),
-          CommunicatorBackend::kNccl);
-      auto end_coalescing = IrBuilder::create<hir::EndCoalescing>();
-      auto wait = IrBuilder::create<hir::Wait>(end_coalescing);
-
-      if_then_else->elseBody().push_back(start_coalescing);
-      if_then_else->elseBody().push_back(recv);
-      if_then_else->elseBody().push_back(send);
-      if_then_else->elseBody().push_back(end_coalescing);
-      if_then_else->elseBody().push_back(wait);
+          send_peer,
+          communicator_backend);
+      if (communicator_backend == CommunicatorBackend::kNccl) {
+        // Using Start/EndCoalescing here is important to 1) avoid hangs caused
+        // by a wrong global order of sends/recvs and 2) enjoy full
+        // bi-directional bandwidth.
+        auto start_coalescing = IrBuilder::create<hir::StartCoalescing>();
+        auto end_coalescing = IrBuilder::create<hir::EndCoalescing>();
+        auto wait = IrBuilder::create<hir::Wait>(end_coalescing);
+
+        if_then_else->elseBody().push_back(start_coalescing);
+        if_then_else->elseBody().push_back(recv);
+        if_then_else->elseBody().push_back(send);
+        if_then_else->elseBody().push_back(end_coalescing);
+        if_then_else->elseBody().push_back(wait);
+      } else if (communicator_backend == CommunicatorBackend::kCuda) {
+        auto share_mem_handles = IrBuilder::create<hir::ShareMemHandles>(
+            std::vector<P2PCommunication*>({recv, send}));
+        auto wait_send = IrBuilder::create<hir::Wait>(send);
+        auto wait_recv = IrBuilder::create<hir::Wait>(recv);
+
+        if_then_else->elseBody().push_back(share_mem_handles);
+        if_then_else->elseBody().push_back(send);
+        if_then_else->elseBody().push_back(recv);
+        if_then_else->elseBody().push_back(wait_send);
+        if_then_else->elseBody().push_back(wait_recv);
+      } else {
+        NVF_THROW(
+            "Unsupported communicator backend for lowering stream parallel "
+            "type into p2p: ",
+            communicator_backend);
+      }

       new_loop_body.push_back(slicing_output);
       new_loop_body.push_back(if_then_else);
     } else {
       // Process inputs and outputs normally
       for (auto* input :
            ir_utils::filterByType<TensorView>(body_expr->inputs())) {
-        processTensor(body_expr, input);
+        processTensor(body_expr, input, tensor_index);
       }
       for (auto* output :
            ir_utils::filterByType<TensorView>(body_expr->outputs())) {
-        processTensor(body_expr, output);
+        processTensor(body_expr, output, tensor_index);
       }
       new_loop_body.push_back(body_expr);
     }
@@ -553,7 +595,8 @@ void StreamParallelType::passImplementation(Fusion* fusion) {
   top_level_exprs = addTensorAllocations(std::move(top_level_exprs), id_model);

   // Step 3: Process for-loop bodies by slicing tensors
-  top_level_exprs = processForLoopBodies(std::move(top_level_exprs), id_model);
+  top_level_exprs = processForLoopBodies(
+      std::move(top_level_exprs), id_model, params_.communicator_backend);

   // Step 4: Add stream management and synchronization
   top_level_exprs = addStreamManagement(std::move(top_level_exprs));
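
To make the ring arithmetic in the new kCuda path concrete: with world size `s`, at iteration `i`, rank `r` indexes/receives slice `(r + i) % s` and sends to peer `(s + r - i) % s`. A minimal sketch checking that this pairing is consistent (plain Python, not nvFuser API; `send_peer`, `recv_peer`, and the world size are illustrative names of ours):

```python
# Sketch: verify the send/recv peer arithmetic used by the kCuda branch.

def send_peer(rank: int, i: int, s: int) -> int:
    # Mirrors: mod(add(for_loop->stop(), sub(my_device_id, for_loop->index())), stop)
    return (s + rank - i) % s

def recv_peer(rank: int, i: int, s: int) -> int:
    # Mirrors: mod(add(my_device_id, for_loop->index()), for_loop->stop())
    return (rank + i) % s

s = 8  # hypothetical world size
for i in range(s):
    for r in range(s):
        # If rank r sends to q at iteration i, q must expect a receive from r.
        q = send_peer(r, i, s)
        assert recv_peer(q, i, s) == r
    # Every rank pairs with a distinct peer: a ring shift, not a hot spot.
    assert len({send_peer(r, i, s) for r in range(s)}) == s
```

At `i == 0` both peers collapse to the rank itself, which is exactly the case the `IfThenElse` above routes to a local copy instead of a communication.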
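The requirement that collectives be posted "in a globally coherent order" can also be illustrated with a toy scheduling model (our own construction, not nvFuser code): a blocking collective completes only once every rank has it at the head of its posting queue, so rank-dependent orders deadlock.

```python
# Toy model of collective posting order (illustrative assumption only).
from collections import deque

def completes(post_orders: list[list[str]]) -> bool:
    queues = [deque(order) for order in post_orders]
    while any(queues):
        progressed = False
        for cid in {q[0] for q in queues if q}:
            # cid completes only if every rank with pending work posts it next.
            if all(not q or q[0] == cid for q in queues):
                for q in queues:
                    if q:
                        q.popleft()
                progressed = True
                break
        if not progressed:
            return False  # ranks disagree on the next collective: deadlock
    return True

# Uniform indexing: every rank posts A then B -> completes.
assert completes([["A", "B"], ["A", "B"]])
# Rank-offset indexing with collectives: orders diverge -> deadlock.
assert not completes([["A", "B"], ["B", "A"]])
```

This is why the collective path keeps the unmodified `for_loop->index()` on every rank, while rank-offset indexing is reserved for the P2P ring, where each pairing involves only two ranks.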
csrc/host_ir/pass/stream_parallel_type.h (8 changes: 8 additions & 0 deletions)

@@ -8,6 +8,7 @@
 #pragma once

 #include <fusion.h>
+#include <host_ir/lower.h>
 #include <host_ir/pass/optimization_pass.h>

 namespace nvfuser::hir_pass {
@@ -25,11 +26,18 @@ namespace nvfuser::hir_pass {
 class StreamParallelType : public OptimizationPass<StreamParallelType> {
   friend class OptimizationPass<StreamParallelType>;

+ public:
+  StreamParallelType(const HostIrLowerParams& params = HostIrLowerParams())
+      : params_(params) {}
+
  protected:
   void passImplementation(Fusion* fusion);
   static constexpr std::string_view name() {
     return "StreamParallelType";
   }
+
+ private:
+  HostIrLowerParams params_;
 };

 } // namespace nvfuser::hir_pass
python/python_direct/enum.cpp (3 changes: 2 additions & 1 deletion)

@@ -53,7 +53,8 @@ void bindEnums(py::module& nvfuser) {
   py::enum_<CommunicatorBackend>(
       nvfuser, "CommunicatorBackend", py::module_local())
       .value("nccl", CommunicatorBackend::kNccl)
-      .value("ucc", CommunicatorBackend::kUcc);
+      .value("ucc", CommunicatorBackend::kUcc)
+      .value("cuda", CommunicatorBackend::kCuda);

   py::enum_<SchedulerType>(nvfuser, "SchedulerType", py::module_local())
       .value("none", SchedulerType::None)
tests/python/multidevice/test_overlap.py (4 changes: 3 additions & 1 deletion)

@@ -81,7 +81,9 @@ def multidevice_schedule(fd, tensors, num_devices) -> None:


 @pytest.mark.mpi
-@pytest.mark.parametrize("backend_type", [CommunicatorBackend.nccl])
+@pytest.mark.parametrize(
+    "backend_type", [CommunicatorBackend.nccl, CommunicatorBackend.cuda]
+)
 def test_overlap_allgather_matmul_shard_outermost(
     multidevice_direct_test, benchmark, backend_type
 ):