From 6d2ffd50c918ac81297a02845e8a752802d849dd Mon Sep 17 00:00:00 2001 From: Zhihong Zhang Date: Tue, 27 Aug 2024 17:18:20 -0400 Subject: [PATCH 1/2] Added check for duplicate RM request --- nvflare/apis/utils/reliable_message.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/nvflare/apis/utils/reliable_message.py b/nvflare/apis/utils/reliable_message.py index 18b765a314..f822d9c6f2 100644 --- a/nvflare/apis/utils/reliable_message.py +++ b/nvflare/apis/utils/reliable_message.py @@ -45,6 +45,7 @@ STATUS_NOT_RECEIVED = "not_received" STATUS_REPLIED = "replied" STATUS_ABORTED = "aborted" +STATUS_DUP_REQUEST = "dup_request" # Topics for Reliable Message TOPIC_RELIABLE_REQUEST = "RM.RELIABLE_REQUEST" @@ -227,6 +228,7 @@ class ReliableMessage: _topic_to_handle = {} _req_receivers = {} # tx id => receiver + _req_completed = {} # tx id => expiration _enabled = False _executor = None _query_interval = 1.0 @@ -293,6 +295,9 @@ def _receive_request(cls, topic: str, request: Shareable, fl_ctx: FLContext): # no handler registered for this topic! cls.error(fl_ctx, f"no handler registered for request {rm_topic=}") return make_reply(ReturnCode.TOPIC_UNKNOWN) + if cls._req_completed.get(tx_id): + cls.debug(fl_ctx, "Completed tx_id received") + return _status_reply(STATUS_DUP_REQUEST) receiver = cls._get_or_create_receiver(rm_topic, request, handler_f) cls.debug(fl_ctx, f"received request {rm_topic=}") return receiver.process(request, fl_ctx) @@ -337,6 +342,7 @@ def release_request_receiver(cls, receiver: _RequestReceiver, fl_ctx: FLContext) """ with cls._tx_lock: cls._req_receivers.pop(receiver.tx_id, None) + cls._register_completed_req(receiver.tx_id, receiver.tx_timeout) cls.debug(fl_ctx, f"released request receiver of TX {receiver.tx_id}") @classmethod @@ -580,6 +586,10 @@ def _send_request( # the ack is a status report - check status status = ack.get_header(HEADER_STATUS) if status and status != STATUS_NOT_RECEIVED: + if status == STATUS_DUP_REQUEST: + cls.debug(fl_ctx, "Duplicate req status received") + return ack + # status should never be STATUS_NOT_RECEIVED, unless there is a bug in the receiving logic # STATUS_NOT_RECEIVED is only possible during "query" phase. cls.debug(fl_ctx, f"received status ack: {rc=} {status=}") @@ -679,3 +689,15 @@ def _query_result( cls.debug(fl_ctx, f"will retry query in {cls._query_interval} secs: {rc=} {status=} {op=}") else: cls.debug(fl_ctx, f"will retry query in {cls._query_interval} secs: {rc=}") + + @classmethod + def _register_completed_req(cls, tx_id, tx_timeout): + # Remove expired entries, need to use a copy of the keys + now = time.time() + for key in list(cls._req_completed.keys()): + expiration = cls._req_completed.get(key) + if expiration and expiration < now: + cls._req_completed.pop(key, None) + + # Expire in 2 x tx_timeout + cls._req_completed[tx_id] = now + 2 * tx_timeout From cf40ab2f87e3f498f74bf571bab832218265bbad Mon Sep 17 00:00:00 2001 From: Zhihong Zhang Date: Tue, 27 Aug 2024 18:01:15 -0400 Subject: [PATCH 2/2] Addressed PR comment --- nvflare/apis/utils/reliable_message.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/nvflare/apis/utils/reliable_message.py b/nvflare/apis/utils/reliable_message.py index f822d9c6f2..54acb3b2f6 100644 --- a/nvflare/apis/utils/reliable_message.py +++ b/nvflare/apis/utils/reliable_message.py @@ -341,8 +341,8 @@ def release_request_receiver(cls, receiver: _RequestReceiver, fl_ctx: FLContext) """ with cls._tx_lock: - cls._req_receivers.pop(receiver.tx_id, None) cls._register_completed_req(receiver.tx_id, receiver.tx_timeout) + cls._req_receivers.pop(receiver.tx_id, None) cls.debug(fl_ctx, f"released request receiver of TX {receiver.tx_id}") @classmethod @@ -586,10 +586,6 @@ def _send_request( # the ack is a status report - check status status = ack.get_header(HEADER_STATUS) if status and status != STATUS_NOT_RECEIVED: - if status == STATUS_DUP_REQUEST: - cls.debug(fl_ctx, "Duplicate req status received") - return ack - # status should never be STATUS_NOT_RECEIVED, unless there is a bug in the receiving logic # STATUS_NOT_RECEIVED is only possible during "query" phase. cls.debug(fl_ctx, f"received status ack: {rc=} {status=}")