Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ambari-agent/src/main/python/ambari_agent/ActionQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ def execute_command(self, command):
if com['taskId'] == command['taskId']:
logger.info("Command with taskId = {cid} was rescheduled by server. "
"Fail report on cancelled command won't be sent with heartbeat.".format(cid=taskId))
self.commandStatuses.delete_command_data(command['taskId'])
return

# final result to stdout
Expand Down
10 changes: 6 additions & 4 deletions ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ def __init__(self, initializer_module):
self.log_max_symbols_size = initializer_module.config.log_max_symbols_size
self.reported_reports = set()

def delete_command_data(self, key):
# delete stale data about this command
with self.lock:
self.reported_reports.discard(key)
self.current_state.pop(key, None)

def put_command_status(self, command, report):
"""
Expand All @@ -66,11 +71,8 @@ def put_command_status(self, command, report):
from ActionQueue import ActionQueue

key = command['taskId']

# delete stale data about this command
with self.lock:
self.reported_reports.discard(key)
self.current_state.pop(key, None)
self.delete_command_data(key)

is_sent, correlation_id = self.force_update_to_server({command['clusterId']: [report]})
updatable = report['status'] == CommandStatus.in_progress and self.command_update_output
Expand Down