From 70fb68cdd200da1d47f7192959b326970388e67c Mon Sep 17 00:00:00 2001 From: Andrew Onishuk Date: Tue, 24 Jul 2018 18:32:45 +0300 Subject: [PATCH] AMBARI-24349. Rescheduled and canceled tasks stay in progress forever (aonishuk) --- .../src/main/python/ambari_agent/ActionQueue.py | 1 + .../src/main/python/ambari_agent/CommandStatusDict.py | 10 ++++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index e1830f47b68..866781e6ebc 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -350,6 +350,7 @@ def execute_command(self, command): if com['taskId'] == command['taskId']: logger.info('Command with taskId = {cid} was rescheduled by server. ' 'Fail report on cancelled command won\'t be sent with heartbeat.'.format(cid=taskId)) + self.commandStatuses.delete_command_data(command['taskId']) return # final result to stdout diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py index 84af058c8f6..c3af4bf6f7f 100644 --- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py +++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py @@ -57,6 +57,11 @@ def __init__(self, initializer_module): self.log_max_symbols_size = initializer_module.config.log_max_symbols_size self.reported_reports = set() + def delete_command_data(self, key): + # delete stale data about this command + with self.lock: + self.reported_reports.discard(key) + self.current_state.pop(key, None) def put_command_status(self, command, report): """ @@ -65,11 +70,8 @@ def put_command_status(self, command, report): from ActionQueue import ActionQueue key = command['taskId'] - # delete stale data about this command - with self.lock: - self.reported_reports.discard(key) - self.current_state.pop(key, None) + self.delete_command_data(key) is_sent, correlation_id = self.force_update_to_server({command['clusterId']: [report]}) updatable = report['status'] == ActionQueue.IN_PROGRESS_STATUS and self.command_update_output