added non-blocking root communicator #1478
base: develop
Changes from all commits
4835c73
926fd00
f53ca2f
5025063
d8faf4a
4a7e3f9
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,16 +49,49 @@ const char* mpiBlockingReceiveMessages(MPI_Comm comm) | |
return charArray; | ||
} | ||
|
||
const char* mpiNonBlockingReceiveMessages(MPI_Comm comm, int tag) | ||
{ | ||
const int mpiTag = (tag == 0) ? LJ_TAG : tag; | ||
char* charArray = nullptr; | ||
int messageSize = -1; | ||
MPI_Status mpiStatus; | ||
|
||
// Get size and source of MPI message | ||
int mpiFlag = 0; | ||
MPI_Iprobe(MPI_ANY_SOURCE, mpiTag, comm, &mpiFlag, &mpiStatus); | ||
|
||
if (mpiFlag == 1) { | ||
MPI_Get_count(&mpiStatus, MPI_CHAR, &messageSize); | ||
|
||
// Setup where to receive the char array | ||
charArray = new char[messageSize + 1]; | ||
charArray[messageSize] = '\0'; | ||
|
||
// Receive packed Message | ||
MPI_Recv(charArray, | ||
messageSize, | ||
MPI_CHAR, | ||
mpiStatus.MPI_SOURCE, | ||
mpiTag, | ||
comm, | ||
&mpiStatus); | ||
Comment on lines +70 to +77

As I understand the MPI API, this is actually a blocking [...]

Yes, that's correct. The non-blocking part is the call to MPI_Iprobe, but then the Recv is blocking. My intent here is to be sure that the receive is fully finished before anything else is done, but to not block any further execution if there are no messages to be received (i.e. when mpiFlag is false). I can change the function name to clarify the intent here.

To clarify the above point, MPI_Iprobe is used instead of MPI_Probe because the former will return with an mpiFlag value regardless of whether messages need to be received, whereas the latter is a blocking call that will only return if there is a message to be received.

Gotcha, the combination of [...]

This does also make me think about renaming this communicator to something like "NonCollectiveCommunicator" rather than "NonBlockingCommunicator". It's true that it calls these non-blocking functions, but I think the main feature is actually that we don't rely on collective calls to communicate messages to root.

I like that idea.
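The pattern discussed in this thread (a non-blocking probe followed by a receive that only runs when a message is known to be waiting) can be sketched without MPI. The following is a minimal illustration, not the PR's code: `MockMailbox`, `probeNow`, `receive`, and `pollOnceAndReceive` are hypothetical names, and the in-memory queue merely stands in for the MPI message queue.

```cpp
#include <cassert>
#include <deque>
#include <string>

// Hypothetical in-memory stand-in for an MPI message queue (not part of Axom or MPI).
struct MockMailbox
{
  std::deque<std::string> pending;

  // Plays the role of MPI_Iprobe: returns immediately and reports
  // whether a message is waiting.
  bool probeNow() const { return !pending.empty(); }

  // Plays the role of MPI_Recv: only valid once probeNow() reported a
  // message, so it never has to wait.
  std::string receive()
  {
    assert(!pending.empty());
    std::string msg = pending.front();
    pending.pop_front();
    return msg;
  }
};

// Poll once. If nothing is waiting, return false right away (this mirrors
// returning nullptr when mpiFlag is 0); otherwise receive into `out`.
bool pollOnceAndReceive(MockMailbox& box, std::string& out)
{
  if(!box.probeNow())
  {
    return false;
  }
  out = box.receive();
  return true;
}
```

Calling `pollOnceAndReceive` on an empty mailbox returns `false` without waiting, which is the property the thread attributes to MPI_Iprobe as opposed to MPI_Probe.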
||
} | ||
|
||
return charArray; | ||
} | ||
|
||
void mpiNonBlockingSendMessages(MPI_Comm comm, | ||
int destinationRank, | ||
- const char* packedMessagesToBeSent) | ||
+ const char* packedMessagesToBeSent, | ||
int tag) | ||
{ | ||
const int mpiTag = (tag == 0) ? LJ_TAG : tag; | ||
MPI_Request mpiRequest; | ||
MPI_Isend(const_cast<char*>(packedMessagesToBeSent), | ||
strlen(packedMessagesToBeSent), | ||
MPI_CHAR, | ||
destinationRank, | ||
- LJ_TAG, | ||
+ mpiTag, | ||
comm, | ||
&mpiRequest); | ||
MPI_Request_free(&mpiRequest); | ||
|
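Both the send and receive helpers in this file map a tag of 0 to the default `LJ_TAG`. For a message to be matched, the probe, the receive, and the send all need the same resolved tag, so one way to keep them in sync is to route every call through a single helper. A minimal sketch, assuming a hypothetical helper name `resolveTag` and a placeholder constant (the real `LJ_TAG` value is defined in the Lumberjack sources and is not shown in this diff):

```cpp
// Placeholder for illustration only; NOT the real LJ_TAG value.
constexpr int LJ_TAG_PLACEHOLDER = 1;

// Map a caller-supplied tag to the tag actually used for communication:
// 0 means "use the default", anything else is passed through unchanged.
inline int resolveTag(int tag)
{
  return (tag == 0) ? LJ_TAG_PLACEHOLDER : tag;
}
```

With such a helper, each MPI call would take `resolveTag(tag)` rather than the raw `tag` argument.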
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
// Copyright (c) 2017-2024, Lawrence Livermore National Security, LLC and | ||
// other Axom Project Developers. See the top-level LICENSE file for details. | ||
// | ||
// SPDX-License-Identifier: (BSD-3-Clause) | ||
|
||
/*! | ||
****************************************************************************** | ||
* | ||
* \file NonCollectiveRootCommunicator.cpp | ||
* | ||
* \brief Implementation of the NonCollectiveRootCommunicator class. | ||
* | ||
****************************************************************************** | ||
*/ | ||
|
||
#include "axom/lumberjack/NonCollectiveRootCommunicator.hpp" | ||
#include "axom/lumberjack/MPIUtility.hpp" | ||
|
||
namespace axom | ||
{ | ||
namespace lumberjack | ||
{ | ||
void NonCollectiveRootCommunicator::initialize(MPI_Comm comm, int ranksLimit) | ||
{ | ||
static int mpiTag = 32767; | ||
m_mpiComm = comm; | ||
MPI_Comm_rank(m_mpiComm, &m_mpiCommRank); | ||
MPI_Comm_size(m_mpiComm, &m_mpiCommSize); | ||
m_ranksLimit = ranksLimit; | ||
m_mpiTag = mpiTag; | ||
++mpiTag; | ||
} | ||
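The `initialize` function above hands each communicator instance its own tag by reading and then incrementing a function-local static counter. A minimal sketch of just that bookkeeping, with the MPI calls left out; `TaggedCommunicator` is a hypothetical stand-in, not the Axom class:

```cpp
// Hypothetical stand-in for the communicator; only the tag bookkeeping is modeled.
class TaggedCommunicator
{
public:
  void initialize()
  {
    // Function-local static: initialized once and shared by all instances,
    // so every call to initialize() sees the next unused value.
    static int nextTag = 32767;
    m_tag = nextTag;
    ++nextTag;
  }

  int tag() const { return m_tag; }

private:
  int m_tag = -1;  // -1 marks "not initialized yet" in this sketch
};
```

Because the counter is per process, this scheme presumably relies on every rank initializing its communicators in the same order so that matching instances end up with matching tags. Note also that the MPI standard only guarantees that the tag upper bound (`MPI_TAG_UB`) is at least 32767, so tags beyond the first depend on the implementation allowing larger values.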
|
||
void NonCollectiveRootCommunicator::finalize() { } | ||
|
||
int NonCollectiveRootCommunicator::rank() { return m_mpiCommRank; } | ||
|
||
void NonCollectiveRootCommunicator::ranksLimit(int value) { m_ranksLimit = value; } | ||
|
||
int NonCollectiveRootCommunicator::ranksLimit() { return m_ranksLimit; } | ||
|
||
int NonCollectiveRootCommunicator::numPushesToFlush() { return 1; } | ||
|
||
void NonCollectiveRootCommunicator::push(const char* packedMessagesToBeSent, | ||
std::vector<const char*>& receivedPackedMessages) | ||
{ | ||
if(m_mpiCommRank == 0) | ||
{ | ||
const char* currPackedMessages = nullptr; | ||
bool receive_messages = true; | ||
while(receive_messages) | ||
{ | ||
currPackedMessages = mpiNonBlockingReceiveMessages(m_mpiComm, m_mpiTag); | ||
|
||
if(isPackedMessagesEmpty(currPackedMessages)) | ||
{ | ||
if (currPackedMessages == nullptr ) | ||
{ | ||
receive_messages = false; | ||
} | ||
else | ||
{ | ||
delete [] currPackedMessages; | ||
} | ||
|
||
} | ||
else | ||
{ | ||
receivedPackedMessages.push_back(currPackedMessages); | ||
} | ||
|
||
currPackedMessages = nullptr; | ||
|
||
} | ||
} | ||
else | ||
{ | ||
if(isPackedMessagesEmpty(packedMessagesToBeSent) == false) | ||
{ | ||
mpiNonBlockingSendMessages(m_mpiComm, 0, packedMessagesToBeSent, m_mpiTag); | ||
} | ||
} | ||
} | ||
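The root branch of `push` keeps receiving until a poll finds nothing pending: a null result ends the loop, an empty buffer is freed and skipped, and a non-empty buffer is kept. A minimal sketch of that control flow, using a hypothetical `tryReceive` over an in-memory queue in place of `mpiNonBlockingReceiveMessages`, and `std::unique_ptr` in place of the raw `new`/`delete` used in the diff:

```cpp
#include <deque>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for mpiNonBlockingReceiveMessages: returns null when
// nothing is pending, otherwise the next buffer (which may be empty).
std::unique_ptr<std::string> tryReceive(std::deque<std::string>& pending)
{
  if(pending.empty())
  {
    return nullptr;
  }
  std::unique_ptr<std::string> msg(new std::string(pending.front()));
  pending.pop_front();
  return msg;
}

// Keep polling until a poll finds nothing; keep non-empty buffers, drop empty ones.
std::vector<std::string> drain(std::deque<std::string>& pending)
{
  std::vector<std::string> received;
  while(std::unique_ptr<std::string> msg = tryReceive(pending))
  {
    if(!msg->empty())
    {
      received.push_back(*msg);
    }
    // An empty buffer is released when msg goes out of scope; the loop continues.
  }
  return received;
}
```

The loop ends at the first poll that finds nothing, so a message arriving just afterwards is only picked up on the next call.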
|
||
bool NonCollectiveRootCommunicator::isOutputNode() | ||
{ | ||
if(m_mpiCommRank == 0) | ||
{ | ||
return true; | ||
} | ||
return false; | ||
} | ||
|
||
int NonCollectiveRootCommunicator::mpiTag() const { return m_mpiTag; } | ||
|
||
} // end namespace lumberjack | ||
} // end namespace axom |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
// Copyright (c) 2017-2024, Lawrence Livermore National Security, LLC and | ||
// other Axom Project Developers. See the top-level LICENSE file for details. | ||
// | ||
// SPDX-License-Identifier: (BSD-3-Clause) | ||
|
||
/*! | ||
******************************************************************************* | ||
* \file NonCollectiveRootCommunicator.hpp | ||
* | ||
* \brief This file contains the class definition of the | ||
* NonCollectiveRootCommunicator. | ||
******************************************************************************* | ||
*/ | ||
|
||
#ifndef NONCOLLECTIVEROOTCOMMUNICATOR_HPP | ||
#define NONCOLLECTIVEROOTCOMMUNICATOR_HPP | ||
|
||
#include "axom/lumberjack/Lumberjack.hpp" | ||
#include "axom/lumberjack/Communicator.hpp" | ||
|
||
namespace axom | ||
{ | ||
namespace lumberjack | ||
{ | ||
/*! | ||
******************************************************************************* | ||
* \class NonCollectiveRootCommunicator | ||
* | ||
* \brief Based off of RootCommunicator. This communicator pushes | ||
messages from any rank to root non-collectively, if any messages are sent. | ||
******************************************************************************* | ||
*/ | ||
class NonCollectiveRootCommunicator : public axom::lumberjack::Communicator | ||
{ | ||
public: | ||
/*! | ||
***************************************************************************** | ||
* \brief Called to initialize the Communicator. | ||
* | ||
* This performs any setup work the Communicator needs before doing any work. | ||
* It is required that this is called before using the Communicator. | ||
* | ||
* \param [in] comm The MPI Communicator | ||
* \param [in] ranksLimit Limit on how many ranks are individually tracked per | ||
* Message. | ||
***************************************************************************** | ||
*/ | ||
void initialize(MPI_Comm comm, int ranksLimit); | ||
|
||
/*! | ||
***************************************************************************** | ||
* \brief Called to finalize the Communicator. | ||
* | ||
* This performs any cleanup work the Communicator needs to do before going | ||
* away. It is required that this is the last function called by the | ||
* Communicator. | ||
***************************************************************************** | ||
*/ | ||
void finalize(); | ||
|
||
/*! | ||
***************************************************************************** | ||
* \brief Returns the MPI rank of this node | ||
***************************************************************************** | ||
*/ | ||
int rank(); | ||
|
||
/*! | ||
***************************************************************************** | ||
* \brief Sets the rank limit. | ||
* | ||
* This is the limit on how many ranks that generated a given message are | ||
* individually tracked per Message. After the limit has been reached, only | ||
* the Message::rankCount is incremented. | ||
* | ||
* \param [in] value Limits how many ranks are tracked per Message. | ||
***************************************************************************** | ||
*/ | ||
void ranksLimit(int value); | ||
|
||
/*! | ||
***************************************************************************** | ||
* \brief Returns the rank limit. | ||
* | ||
* This is the limit on how many ranks that generated a given message are | ||
* individually tracked per Message. After the limit has been reached, only | ||
* the Message::rankCount is incremented. | ||
***************************************************************************** | ||
*/ | ||
int ranksLimit(); | ||
|
||
/*! | ||
***************************************************************************** | ||
* \brief Function used by the Lumberjack class to indicate how many | ||
* individual pushes fully flush all currently held Message classes to the | ||
* root node. The Communicator class's tree structure dictates this. | ||
***************************************************************************** | ||
*/ | ||
int numPushesToFlush(); | ||
|
||
/*! | ||
***************************************************************************** | ||
* \brief This pushes all messages to the root node. | ||
* | ||
* All messages are pushed to the root node. This is the same as | ||
* RootCommunicator::pushMessagesFully for this Communicator. | ||
* | ||
* \param [in] packedMessagesToBeSent All of this rank's Message classes | ||
* packed into a single buffer. | ||
* \param [in,out] receivedPackedMessages Received packed message buffers from | ||
* this node's children. | ||
***************************************************************************** | ||
*/ | ||
void push(const char* packedMessagesToBeSent, | ||
std::vector<const char*>& receivedPackedMessages); | ||
|
||
/*! | ||
***************************************************************************** | ||
* \brief Function used by the Lumberjack to indicate whether this node should | ||
* be outputting messages. Only the root node outputs messages. | ||
***************************************************************************** | ||
*/ | ||
bool isOutputNode(); | ||
|
||
/*! | ||
***************************************************************************** | ||
* \brief Returns the MPI tag used for this communicator. | ||
***************************************************************************** | ||
*/ | ||
int mpiTag() const; | ||
|
||
private: | ||
MPI_Comm m_mpiComm; | ||
int m_mpiCommRank; | ||
int m_mpiCommSize; | ||
int m_ranksLimit; | ||
int m_mpiTag; | ||
}; | ||
|
||
} // end namespace lumberjack | ||
} // end namespace axom | ||
|
||
#endif |
MPI_Iprobe is nonblocking here, so is there a chance the mpiFlag is not set to true when it is expected to be? Would it be better to have this be a blocking MPI_Probe? Basing this comment off this stackoverflow post: https://stackoverflow.com/questions/43823458/mpi-iprobe-vs-mpi-probe

Additionally, if using MPI_Iprobe, should mpiFlag default be set to false, so it can be set to true only by a successful function call?
I think the mpiFlag will be set in either context to either true or false, but to your point, it is safer to initialize this as false.
The stackoverflow example illustrates an interesting but slightly different approach than what I'm intending to do. They are calling MPI_Iprobe in a while loop that does not exit until it returns a flag that is non-zero. In my case, I am checking to see if any messages need to be received only once, and if there are no messages, the function exits by returning nullptr. The intent in the stackoverflow example is to continuously monitor the status, whereas I'm only intending to periodically monitor the status whenever the code path enters into this function. Both could be relevant to the problem I'm trying to solve with this communicator, where the root rank needs to receive information from other ranks that they are aborting. I had a preference toward the latter option (periodically monitoring the status whenever the root rank reaches a point where it enters this code path) because it seemed to me like the more efficient option, even if it comes at a cost of sometimes not receiving the status before the program aborts. But I'm not really sure which option is best for this scenario. I'd be curious to hear your thoughts.
I agree, I would expect the latter option to have less overhead, doing a single poll with MPI_Iprobe instead of spinning on MPI_Iprobe until status is updated in the former case. Nevertheless, I might not be considering something, so am also curious if others have ideas.
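The trade-off between the two options in this thread can be illustrated with a small model that counts probe calls. This is a sketch under stated assumptions, not a benchmark and not MPI code: `DelayedSource`, `pollOnce`, and `spinUntilReady` are hypothetical names, and the `maxPolls` cap exists only so the example always terminates.

```cpp
// Hypothetical source whose message only becomes visible after a number of
// polls, standing in for a message that has not arrived yet.
struct DelayedSource
{
  int pollsUntilReady;
  int pollCount;

  explicit DelayedSource(int n) : pollsUntilReady(n), pollCount(0) { }

  // Non-blocking check in the spirit of MPI_Iprobe.
  bool probeNow()
  {
    ++pollCount;
    return pollCount > pollsUntilReady;
  }
};

// Option 1: check once and move on. Cheap, but can miss a message that
// arrives after the check.
bool pollOnce(DelayedSource& src) { return src.probeNow(); }

// Option 2: spin until the message shows up. Does not miss it, but keeps
// the caller busy polling. maxPolls is a safety cap added for this sketch.
bool spinUntilReady(DelayedSource& src, int maxPolls)
{
  for(int i = 0; i < maxPolls; ++i)
  {
    if(src.probeNow())
    {
      return true;
    }
  }
  return false;
}
```

For a source that becomes ready after three polls, `pollOnce` returns `false` after a single probe, while `spinUntilReady` returns `true` after four probes.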