diff --git a/csrc/multidevice/cuda_p2p.h b/csrc/multidevice/cuda_p2p.h index 9c840f3987f..de3ff70be72 100644 --- a/csrc/multidevice/cuda_p2p.h +++ b/csrc/multidevice/cuda_p2p.h @@ -13,6 +13,8 @@ namespace nvfuser { enum class P2pProtocol { Get, Put }; +P2pProtocol getP2pProtocol(); + std::ostream& operator<<(std::ostream& os, P2pProtocol protocol); // Returns the prescribed P2P protocol based on NVFUSER_ENABLE option diff --git a/tests/cpp/test_multidevice_host_ir_overlap.cpp b/tests/cpp/test_multidevice_host_ir_overlap.cpp index 21b849d952d..3ccd4bfb0d9 100644 --- a/tests/cpp/test_multidevice_host_ir_overlap.cpp +++ b/tests/cpp/test_multidevice_host_ir_overlap.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -1094,10 +1095,13 @@ TEST_F( TEST_F( RingAllgatherOverlapTest, - DISABLED_RingAllgatherBasedPipeliningHostIRImplementationCudaIpc) { + RingAllgatherBasedPipeliningHostIRImplementationCudaIpc) { if (communicator_->size() == 1) { GTEST_SKIP() << "Skipping test for single device"; } + if (getP2pProtocol() == P2pProtocol::Put) { + GTEST_SKIP() << "Skipping test for CudaIpc P2pProtocol::Put"; + } auto hic = std::make_unique(); FusionGuard::setCurFusion(hic.get()); @@ -1141,10 +1145,13 @@ TEST_F( CircularBufferLoopStage::NotApplicable, /*circular_buffer_loop_stage_depth=*/0); - auto* stream_index = - mod(add(i, j), IrBuilder::create(params.number_of_streams)); - auto* set_stream = IrBuilder::create( - IrBuilder::create(stream_index)); + auto* num_streams = IrBuilder::create(params.number_of_streams); + auto* curr_stream_index = mod(add(i, j), num_streams); + auto* next_stream_index = mod(add(i, add(j, step_j)), num_streams); + auto* set_curr_stream = IrBuilder::create( + IrBuilder::create(curr_stream_index)); + auto* set_next_stream = IrBuilder::create( + IrBuilder::create(next_stream_index)); auto* my_device_index_val = IrBuilder::create(my_device_index_); auto* number_of_steps_per_ring_val = @@ -1194,25 +1201,27 @@ TEST_F( std::vector grouped_communications = {send, recv}; auto share_mem_handles = IrBuilder::create( std::move(grouped_communications)); - auto* wait_send = IrBuilder::create(send); - auto* wait_recv = IrBuilder::create(recv); auto* comm_cond = ne(j, sub(stop_j, hic->oneVal())); auto* comm_predicate = IrBuilder::create(comm_cond); auto* if_not_last_ring_step_post_comms = IrBuilder::create(comm_predicate); + + // Nonblocking--just signals the buffer is ready for the get transfer if_not_last_ring_step_post_comms->thenBody().push_back(send); + if_not_last_ring_step_post_comms->thenBody().push_back(set_next_stream); + // Block in recvPost on the next stream to do the get transfer if_not_last_ring_step_post_comms->thenBody().push_back(recv); + if_not_last_ring_step_post_comms->thenBody().push_back(set_curr_stream); - auto* cond = ne(j, hic->zeroVal()); - auto* wait_predicate = IrBuilder::create(cond); - auto* if_not_first_ring_step_wait = - IrBuilder::create(wait_predicate); - if_not_first_ring_step_wait->thenBody().push_back(wait_send); - if_not_first_ring_step_wait->thenBody().push_back(wait_recv); + // For the get protocol, recvWait is a NOP + // At the same time, sendWait will block waiting for the buffer to be + // IpcSemaphore::kReady but since on this stream we recvPosted the buffer last + // iteration, when that finishes it will be marked kReady anyways. So waiting + // for it to be kReady is unnecessary std::vector loop_j_body = { - set_stream, + set_curr_stream, tmp1->definition(), tmp2->definition(), tmp3->definition(), @@ -1220,7 +1229,6 @@ TEST_F( tva_j_next_slice->definition(), tvc_j->definition(), share_mem_handles, - if_not_first_ring_step_wait, if_not_last_ring_step_post_comms, mm}; for (Expr* expr : loop_j_body) { @@ -1230,30 +1238,6 @@ TEST_F( hic->pushBackTopLevelExprs(for_loop_i); - // Synchronize all streams - auto* i_stream = - IrBuilder::create(DataType::Index); // running index of the for-loop - auto* start_stream = hic->zeroVal(); - auto* stop_stream = - IrBuilder::create(params.number_of_streams, DataType::Index); - auto* step_stream = hic->oneVal(); - auto* for_loop_stream = IrBuilder::create( - /*IterDomain=*/makeContigConcreteTensor({params.number_of_streams}) - ->axis(0), - /*index=*/i_stream, - start_stream, - stop_stream, - step_stream, - /*vectorize=*/false, - /*vectorize_shift=*/nullptr, - /*unroll_required=*/false, - CircularBufferLoopStage::NotApplicable, - /*circular_buffer_loop_stage_depth=*/0); - auto* sync_stream = IrBuilder::create( - IrBuilder::create(i_stream)); - for_loop_stream->body().push_back(sync_stream); - hic->pushBackTopLevelExprs(for_loop_stream); - hic->addOutput(tmp1); hic->addOutput(tmp2); hic->addOutput(tmp3); @@ -1274,8 +1258,11 @@ TEST_F( {tvc_unsharded, tc_unsharded_}}; hie.runWithInput(std::move(inputs)); - + cudaDeviceSynchronize(); + communicator_->barrier(); validate(); + cudaDeviceSynchronize(); + communicator_->barrier(); } } @@ -1405,30 +1392,6 @@ TEST_F( hic->pushBackTopLevelExprs(for_loop_i); - // Synchronize all streams - auto* i_stream = - IrBuilder::create(DataType::Index); // running index of the for-loop - auto* start_stream = hic->zeroVal(); - auto* stop_stream = - IrBuilder::create(params.number_of_streams, DataType::Index); - auto* step_stream = hic->oneVal(); - auto* for_loop_stream = IrBuilder::create( - /*IterDomain=*/makeContigConcreteTensor({params.number_of_streams}) - ->axis(0), - /*index=*/i_stream, - start_stream, - stop_stream, - step_stream, - /*vectorize=*/false, - /*vectorize_shift=*/nullptr, - /*unroll_required=*/false, - CircularBufferLoopStage::NotApplicable, - /*circular_buffer_loop_stage_depth=*/0); - auto* sync_stream = IrBuilder::create( - IrBuilder::create(i_stream)); - for_loop_stream->body().push_back(sync_stream); - hic->pushBackTopLevelExprs(for_loop_stream); - hic->addOutput(tmp1); hic->addOutput(tmp2); hic->addOutput(tmp3);