FIX-#376: Partition buffers of size larger than the _bigmpi.blocksize into blocks while sending and receiving with MPI #383

Merged 4 commits on Nov 9, 2023
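The change splits any buffer whose size exceeds pkl5._bigmpi.blocksize into consecutive blocks and sends or receives each block separately. Below is a minimal sketch of the boundary computation shared by all three changed functions; _partition_bounds is a hypothetical helper name introduced only for illustration, and the worked values are examples, not defaults:

from mpi4py.util import pkl5

def _partition_bounds(buffer_size, block_size=None):
    # Offsets [0, block_size, 2 * block_size, ..., buffer_size]; each
    # consecutive pair delimits one chunk to send or receive.
    if block_size is None:
        block_size = pkl5._bigmpi.blocksize  # largest chunk handed to MPI at once
    bounds = list(range(0, buffer_size, block_size))
    bounds.append(buffer_size)
    return bounds

# For example, with block_size=4 and buffer_size=10:
# _partition_bounds(10, 4) -> [0, 4, 8, 10], i.e. chunks [0:4], [4:8], [8:10]

A buffer no larger than one block still yields a single pair of offsets, so the small-message path degenerates to one Send/Recv as before.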
unidist/core/backends/mpi/core/communication.py: 50 changes (46 additions, 4 deletions)
@@ -463,7 +463,21 @@ def mpi_send_buffer(comm, buffer, dest_rank, data_type=MPI.CHAR, buffer_size=None):
"""
if buffer_size:
comm.send(buffer_size, dest=dest_rank, tag=common.MPITag.OBJECT)
comm.Send([buffer, data_type], dest=dest_rank, tag=common.MPITag.BUFFER)
else:
buffer_size = len(buffer)
# Maximum block size MPI is able to send/recv
block_size = pkl5._bigmpi.blocksize
arunjose696 marked this conversation as resolved.
Show resolved Hide resolved
partitions = list(range(0, buffer_size, block_size))
partitions.append(buffer_size)
num_partitions = len(partitions)
with pkl5._bigmpi as bigmpi:
for i in range(num_partitions):
if i + 1 < num_partitions:
comm.Send(
bigmpi(buffer[partitions[i] : partitions[i + 1]]),
dest=dest_rank,
tag=common.MPITag.BUFFER,
)


def mpi_isend_buffer(comm, buffer_size, buffer, dest_rank):
@@ -494,8 +508,21 @@ def mpi_isend_buffer(comm, buffer_size, buffer, dest_rank):
     requests = []
     h1 = comm.isend(buffer_size, dest=dest_rank, tag=common.MPITag.OBJECT)
     requests.append((h1, None))
-    h2 = comm.Isend([buffer, MPI.CHAR], dest=dest_rank, tag=common.MPITag.BUFFER)
-    requests.append((h2, buffer))
+    # Maximum block size MPI is able to send/recv
+    block_size = pkl5._bigmpi.blocksize
+    partitions = list(range(0, buffer_size, block_size))
+    partitions.append(buffer_size)
+    num_partitions = len(partitions)
+    with pkl5._bigmpi as bigmpi:
+        for i in range(num_partitions):
+            if i + 1 < num_partitions:
+                h2 = comm.Isend(
+                    bigmpi(buffer[partitions[i] : partitions[i + 1]]),
+                    dest=dest_rank,
+                    tag=common.MPITag.BUFFER,
+                )
+                requests.append((h2, buffer))

return requests


Expand Down Expand Up @@ -525,7 +552,22 @@ def mpi_recv_buffer(comm, source_rank, result_buffer=None):
     if result_buffer is None:
         buf_size = comm.recv(source=source_rank, tag=common.MPITag.OBJECT)
         result_buffer = bytearray(buf_size)
-    comm.Recv(result_buffer, source=source_rank, tag=common.MPITag.BUFFER)
+    else:
+        buf_size = len(result_buffer)
+    # Maximum block size MPI is able to send/recv
+    block_size = pkl5._bigmpi.blocksize
+    partitions = list(range(0, buf_size, block_size))
+    partitions.append(buf_size)
+    num_partitions = len(partitions)
+    with pkl5._bigmpi as bigmpi:
+        for i in range(num_partitions):
+            if i + 1 < num_partitions:
+                tmp_buffer = bytearray(partitions[i + 1] - partitions[i])
+                comm.Recv(
+                    bigmpi(tmp_buffer), source=source_rank, tag=common.MPITag.BUFFER
+                )
+                result_buffer[partitions[i] : partitions[i + 1]] = tmp_buffer

return result_buffer
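Both sides derive the same partition boundaries from the same size and the same pkl5._bigmpi.blocksize, so the number of Send calls on the sender matches the number of Recv calls on the receiver. A minimal two-rank sketch of how the changed helpers pair up; this is not part of the PR, the 3 GiB payload and the script name are assumptions for illustration, and these are internal unidist helpers rather than public API:

# run with: mpiexec -n 2 python example.py
from mpi4py import MPI
from unidist.core.backends.mpi.core import communication

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

payload_size = 3 * 1024**3  # assumed to exceed pkl5._bigmpi.blocksize

if rank == 0:
    data = bytearray(payload_size)
    # buffer_size is provided, so the size is sent first and the data follows in blocks
    communication.mpi_send_buffer(comm, data, dest_rank=1, buffer_size=payload_size)
elif rank == 1:
    # result_buffer is omitted, so the size is received first and the blocks are
    # reassembled into a bytearray of that size
    received = communication.mpi_recv_buffer(comm, source_rank=0)
    assert len(received) == payload_size

Passing buffer_size on the sender and omitting result_buffer on the receiver is one consistent pairing; the other pairing (no buffer_size, preallocated result_buffer) works because both sides then compute the boundaries from the buffer lengths they already hold.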

