From 1a8cd6b0d86138f91e0b381b2473dd9284a9aa32 Mon Sep 17 00:00:00 2001 From: Andrew Onishuk Date: Sun, 29 Jul 2018 13:11:30 +0300 Subject: [PATCH] AMBARI-24349. Rescheduled and canceled tasks stay in progress forever (aonishuk) --- .../src/main/python/ambari_agent/ActionQueue.py | 1 + .../src/main/python/ambari_agent/CommandStatusDict.py | 10 ++++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 393e65176bb..072083aa4df 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -327,6 +327,7 @@ def execute_command(self, command): if com['taskId'] == command['taskId']: logger.info("Command with taskId = {cid} was rescheduled by server. " "Fail report on cancelled command won't be sent with heartbeat.".format(cid=taskId)) + self.commandStatuses.delete_command_data(command['taskId']) return # final result to stdout diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py index 567ea2d3d58..d27dad44d33 100644 --- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py +++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py @@ -58,6 +58,11 @@ def __init__(self, initializer_module): self.log_max_symbols_size = initializer_module.config.log_max_symbols_size self.reported_reports = set() + def delete_command_data(self, key): + # delete stale data about this command + with self.lock: + self.reported_reports.discard(key) + self.current_state.pop(key, None) def put_command_status(self, command, report): """ @@ -66,11 +71,8 @@ def put_command_status(self, command, report): from ActionQueue import ActionQueue key = command['taskId'] - # delete stale data about this command - with self.lock: - self.reported_reports.discard(key) - self.current_state.pop(key, None) + self.delete_command_data(key) is_sent, correlation_id = self.force_update_to_server({command['clusterId']: [report]}) updatable = report['status'] == CommandStatus.in_progress and self.command_update_output