diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 393e65176bb..bdfc41f10c7 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -325,8 +325,9 @@ def execute_command(self, command): with self.lock, self.commandQueue.mutex: for com in self.commandQueue.queue: if com['taskId'] == command['taskId']: - logger.info("Command with taskId = {cid} was rescheduled by server. " - "Fail report on cancelled command won't be sent with heartbeat.".format(cid=taskId)) + logger.info('Command with taskId = {cid} was rescheduled by server. ' + 'Fail report on cancelled command won\'t be sent with heartbeat.'.format(cid=taskId)) + self.commandStatuses.delete_command_data(command['taskId']) return # final result to stdout diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py index 567ea2d3d58..d27dad44d33 100644 --- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py +++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py @@ -58,6 +58,11 @@ def __init__(self, initializer_module): self.log_max_symbols_size = initializer_module.config.log_max_symbols_size self.reported_reports = set() + def delete_command_data(self, key): + # delete stale data about this command + with self.lock: + self.reported_reports.discard(key) + self.current_state.pop(key, None) def put_command_status(self, command, report): """ @@ -66,11 +71,8 @@ def put_command_status(self, command, report): from ActionQueue import ActionQueue key = command['taskId'] - # delete stale data about this command - with self.lock: - self.reported_reports.discard(key) - self.current_state.pop(key, None) + self.delete_command_data(key) is_sent, correlation_id = self.force_update_to_server({command['clusterId']: [report]}) updatable = report['status'] == CommandStatus.in_progress and self.command_update_output