Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 28 additions & 28 deletions ambari-agent/conf/unix/agent-multiplier.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ def __init__(self, args):
print("*** Params ***")
print("Start: %d" % self.start)
print("Num: %d" % self.num)
print("Prefix: %s" % self.prefix)
print("Command: %s" % self.command)
print(f"Prefix: {self.prefix}")
print(f"Command: {self.command}")

# All hostnames that will be managed by Ambari Agents on this host
self.hosts = []
Expand All @@ -105,7 +105,7 @@ def parse_configs(self):
Parse the configuration file to set the config params.
"""
if not os.path.exists(self.CONFIG_FILE):
print("Did not find Agent Multiplier config file: %s" % str(self.CONFIG_FILE))
print(f"Did not find Agent Multiplier config file: {str(self.CONFIG_FILE)}")
sys.exit(-1)

params = {}
Expand Down Expand Up @@ -141,11 +141,11 @@ def validate(self):
errors.append("Prefix is a required field")

if not os.path.isfile(self.source_config_file):
errors.append("Ambari Agent config file does not exist at %s" % self.source_config_file)
errors.append(f"Ambari Agent config file does not exist at {self.source_config_file}")

valid_commands = set(["start", "stop", "restart", "status"])
if self.command is None or self.command not in valid_commands:
errors.append("Command must be one of %s" % ", ".join(valid_commands))
errors.append(f"Command must be one of {', '.join(valid_commands)}")

if len(errors) > 0:
print("Error:")
Expand All @@ -171,24 +171,24 @@ def bootstrap(self):

for dir in [host_home_dir, host_log_dir, host_config_dir, host_pid_dir, host_prefix, host_cache_dir]:
if not os.path.isdir(dir):
print("Creating dir %s" % (dir))
print(f"Creating dir {dir}")
os.makedirs(dir)

# Copy config file
host_config_file = os.path.join(host_config_dir, "ambari-agent.ini")
if not os.path.isfile(host_config_file):
print("Copying config file %s" % str(host_config_file))
print(f"Copying config file {str(host_config_file)}")
shutil.copyfile(self.source_config_file, host_config_file)

# Copy version file
version_file = os.path.join(host_prefix, "version")
if not os.path.isfile(version_file):
print("Copying version file %s" % str(version_file))
print(f"Copying version file {str(version_file)}")
shutil.copyfile(self.source_version_file, version_file)

# Copy cache dir content
if not os.path.isdir(os.path.join(host_cache_dir, "stacks")):
print("Copying cache directory content %s" % str(host_cache_dir))
print(f"Copying cache directory content {str(host_cache_dir)}")
self.copytree(self.cache_dir, host_cache_dir)

# Create hostname.sh script to use custom FQDN for each agent.
Expand Down Expand Up @@ -227,7 +227,7 @@ def create_host_name_script(self, host_name, host_name_script):
"echo HOSTNAME"
with open(str(host_name_script), "w+") as f:
f.writelines(template.replace("HOSTNAME", host_name))
subprocess.call("chmod +x %s" % host_name_script, shell=True)
subprocess.call(f"chmod +x {host_name_script}", shell=True)

def change_config(self, config_file, config_dict):
"""
Expand All @@ -238,7 +238,7 @@ def change_config(self, config_file, config_dict):
# TODO, allow appending configs to [AGENT] section.

if not os.path.exists(config_file):
print("ERROR. Did not file config file: %s" % config_file)
print(f"ERROR. Did not file config file: {config_file}")
return

lines = []
Expand All @@ -265,11 +265,11 @@ def change_config(self, config_file, config_dict):
# TODO, if can append configs, then this is not needed.
if len(configs_found) < len(config_dict.keys()):
missing_configs = set(config_dict.keys()) - configs_found
print("ERROR: Did not find all required configs. Missing: %s" % ", ".join(missing_configs))
print(f"ERROR: Did not find all required configs. Missing: {', '.join(missing_configs)}")
sys.exit(-1)

if len(configs_changed) > 0:
print("Making changes to file %s" % config_file)
print(f"Making changes to file {config_file}")
with open(config_file, "w") as f:
f.writelines(new_lines)

Expand All @@ -280,7 +280,7 @@ def modify_etc_hosts_file(self):
"""
etc_hosts = "/etc/hosts"
if not os.path.isfile(etc_hosts):
print("ERROR. Did not find file %s" % etc_hosts)
print(f"ERROR. Did not find file {etc_hosts}")
return

lines = []
Expand All @@ -300,7 +300,7 @@ def modify_etc_hosts_file(self):
new_lines.append(line)

if line_changed:
print("Making changes to %s" % etc_hosts)
print(f"Making changes to {etc_hosts}")
with open(etc_hosts, "w") as f:
f.writelines(new_lines)

Expand All @@ -318,42 +318,42 @@ def run(self):
self.cmd_status()

def cmd_start(self):
print("Starting %d host(s)" % len(self.hosts))
print(f"Starting {len(self.hosts)} host(s)")
for host in self.hosts:
cmd = "ambari-agent start --home %s" % (host.home_dir)
cmd = f"ambari-agent start --home {host.home_dir}"
os.environ['AMBARI_AGENT_CONF_DIR'] = os.path.join(host.home_dir, "etc/ambari-agent/conf")
subprocess.call(cmd, shell=True, env=os.environ)

def cmd_stop(self):
print("Stopping %d host(s)" % len(self.hosts))
print(f"Stopping {len(self.hosts)} host(s)")
for host in self.hosts:
cmd = "ambari-agent stop --home %s" % (host.home_dir)
cmd = f"ambari-agent stop --home {host.home_dir}"
os.environ['AMBARI_AGENT_CONF_DIR'] = os.path.join(host.home_dir, "etc/ambari-agent/conf")
subprocess.call(cmd, shell=True, env=os.environ)

def cmd_restart(self):
print("Restarting %d host(s)" % len(self.hosts))
print(f"Restarting {len(self.hosts)} host(s)")
for host in self.hosts:
cmd = "ambari-agent restart --home %s" % (host.home_dir)
cmd = f"ambari-agent restart --home {host.home_dir}"
os.environ['AMBARI_AGENT_CONF_DIR'] = os.path.join(host.home_dir, "etc/ambari-agent/conf")
subprocess.call(cmd, shell=True, env=os.environ)

def cmd_status(self):
print("Summary of Agent Status:")
print("Total agents: %d\n" % len(self.hosts))
print(f"Total agents: {len(self.hosts)}\n")
(running_hosts, unknown_hosts, stopped_hosts) = self.aggregate_status()

print("Running agents: %d" % len(running_hosts))
print(f"Running agents: {len(running_hosts)}")
if self.verbose and len(running_hosts):
print("(%s)\n" % (", ".join(running_hosts)))
print(f"({', '.join(running_hosts)})\n")

print("Unknown agents: %d" % len(unknown_hosts))
print(f"Unknown agents: {len(unknown_hosts)}")
if self.verbose and len(unknown_hosts):
print("(%s)\n" % (", ".join(unknown_hosts)))
print(f"({', '.join(unknown_hosts)})\n")

print("Stopped agents: %d" % len(stopped_hosts))
print(f"Stopped agents: {len(stopped_hosts)}")
if self.verbose and len(stopped_hosts):
print("(%s)\n" % (", ".join(stopped_hosts)))
print(f"({', '.join(stopped_hosts)})\n")

def aggregate_status(self):
"""
Expand Down
8 changes: 4 additions & 4 deletions ambari-agent/conf/unix/upgrade_agent_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@

if os.path.isfile(CONFIG_FILE_BACKUP):
if os.path.isfile(CONFIG_FILE):
print("Upgrading configs in {0}".format(CONFIG_FILE))
print("Values will be updated from {0} except the following list: {1}, {2}".format(CONFIG_FILE_BACKUP, PROPERTIES_TO_REWRITE, SECTIONS_TO_REMOVE))
print(f"Upgrading configs in {CONFIG_FILE}")
print(f"Values will be updated from {CONFIG_FILE_BACKUP} except the following list: {PROPERTIES_TO_REWRITE}, {SECTIONS_TO_REMOVE}")

agent_config_backup = configparser.ConfigParser()
agent_config_backup.read(CONFIG_FILE_BACKUP)
Expand All @@ -54,6 +54,6 @@
with (open(CONFIG_FILE, "w")) as new_agent_config:
agent_config.write(new_agent_config)
else:
print("Values are not updated, configs {0} is not found".format(CONFIG_FILE))
print(f"Values are not updated, configs {CONFIG_FILE} is not found")
else:
print("Values are not updated, backup {0} is not found".format(CONFIG_FILE_BACKUP))
print(f"Values are not updated, backup {CONFIG_FILE_BACKUP} is not found")
8 changes: 4 additions & 4 deletions ambari-agent/conf/windows/service_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def agent_main():
possible_args = ' or '.join(str(x) for x in possible_args_numbers)
parser.error("Invalid number of arguments. Entered: " + str(len(args)) + ", required: " + possible_args)

options.exit_message = "Ambari Agent '%s' completed successfully." % action
options.exit_message = f"Ambari Agent '{action}' completed successfully."
try:
if action == SETUP_ACTION:
setup(options)
Expand All @@ -236,14 +236,14 @@ def agent_main():
for warning in options.warnings:
print_warning_msg(warning)
pass
options.exit_message = "Ambari Agent '%s' completed with warnings." % action
options.exit_message = f"Ambari Agent '{action}' completed with warnings."
pass
except FatalException as e:
if e.reason is not None:
print_error_msg("Exiting with exit code {0}. \nREASON: {1}".format(e.code, e.reason))
print_error_msg(f"Exiting with exit code {e.code}. \nREASON: {e.reason}")
sys.exit(e.code)
except NonFatalException as e:
options.exit_message = "Ambari Agent '%s' completed with warnings." % action
options.exit_message = f"Ambari Agent '{action}' completed with warnings."
if e.reason is not None:
print_warning_msg(e.reason)

Expand Down
18 changes: 9 additions & 9 deletions ambari-agent/src/main/python/ambari_agent/ActionQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def interrupt(self):
def cancel(self, commands):
for command in commands:

logger.info("Canceling command with taskId = {tid}".format(tid = str(command['target_task_id'])))
logger.info(f"Canceling command with taskId = {str(command['target_task_id'])}")
if logger.isEnabledFor(logging.DEBUG):
logger.debug(pprint.pformat(command))

Expand Down Expand Up @@ -151,7 +151,7 @@ def run(self):
if 'commandParams' in command and 'command_retry_enabled' in command['commandParams']:
retry_able = command['commandParams']['command_retry_enabled'] == "true"
if retry_able:
logger.info("Kicking off a thread for the command, id={} taskId={}".format(command['commandId'], command['taskId']))
logger.info(f"Kicking off a thread for the command, id={command['commandId']} taskId={command['taskId']}")
t = threading.Thread(target=self.process_command, args=(command,))
t.daemon = True
t.start()
Expand Down Expand Up @@ -204,7 +204,7 @@ def process_command(self, command):
else:
logger.error("Unrecognized command %s", pprint.pformat(command))
except Exception:
logger.exception("Exception while processing {0} command".format(command_type))
logger.exception(f"Exception while processing {command_type} command")

def tasks_in_progress_or_pending(self):
return not self.commandQueue.empty() or self.recovery_manager.has_active_command()
Expand Down Expand Up @@ -271,7 +271,7 @@ def execute_command(self, command):

while retry_duration >= 0:
if taskId in self.taskIdsToCancel:
logger.info('Command with taskId = {0} canceled'.format(taskId))
logger.info(f'Command with taskId = {taskId} canceled')
command_canceled = True

self.taskIdsToCancel.discard(taskId)
Expand Down Expand Up @@ -303,7 +303,7 @@ def execute_command(self, command):
else:
status = CommandStatus.failed
if (command_result['exitcode'] == -signal.SIGTERM) or (command_result['exitcode'] == -signal.SIGKILL):
logger.info('Command with taskId = {cid} was canceled!'.format(cid=taskId))
logger.info(f'Command with taskId = {taskId} was canceled!')
command_canceled = True
self.taskIdsToCancel.discard(taskId)
break
Expand All @@ -314,7 +314,7 @@ def execute_command(self, command):
delay = retry_duration
retry_duration -= delay # allow one last attempt
command_result['stderr'] += "\n\nCommand failed. Retrying command execution ...\n\n"
logger.info("Retrying command with taskId = {cid} after a wait of {delay}".format(cid=taskId, delay=delay))
logger.info(f"Retrying command with taskId = {taskId} after a wait of {delay}")
if 'agentLevelParams' not in command:
command['agentLevelParams'] = {}

Expand All @@ -341,7 +341,7 @@ def execute_command(self, command):

# final result to stdout
command_result['stdout'] += '\n\nCommand completed successfully!\n' if status == CommandStatus.completed else '\n\nCommand failed after ' + str(num_attempts) + ' tries\n'
logger.info('Command with taskId = {cid} completed successfully!'.format(cid=taskId) if status == CommandStatus.completed else 'Command with taskId = {cid} failed after {attempts} tries'.format(cid=taskId, attempts=num_attempts))
logger.info(f'Command with taskId = {taskId} completed successfully!' if status == CommandStatus.completed else f'Command with taskId = {taskId} failed after {num_attempts} tries')

role_result = self.commandStatuses.generate_report_template(command)
role_result.update({
Expand Down Expand Up @@ -405,9 +405,9 @@ def log_command_output(self, text, taskId):
chunks = split_on_chunks(hide_passwords(text), MAX_SYMBOLS_PER_LOG_MESSAGE)
if len(chunks) > 1:
for i in range(len(chunks)):
logger.info("Cmd log for taskId={0} and chunk {1}/{2} of log for command: \n".format(taskId, i+1, len(chunks)) + chunks[i])
logger.info(f"Cmd log for taskId={taskId} and chunk {i + 1}/{len(chunks)} of log for command: \n" + chunks[i])
else:
logger.info("Cmd log for taskId={0}: ".format(taskId) + chunks[0])
logger.info(f"Cmd log for taskId={taskId}: " + chunks[0])

def get_retry_delay(self, last_delay):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def reschedule(self):
# jobs without valid UUIDs should be unscheduled
if uuid_valid is False:
jobs_removed += 1
logger.info("[AlertScheduler] Unscheduling {0}".format(scheduled_job.name))
logger.info(f"[AlertScheduler] Unscheduling {scheduled_job.name}")
self._collector.remove_by_uuid(scheduled_job.name)
self.__scheduler.unschedule_job(scheduled_job)

Expand Down Expand Up @@ -222,7 +222,7 @@ def reschedule_all(self):
# unschedule all scheduled jobs
for scheduled_job in scheduled_jobs:
jobs_removed += 1
logger.info("[AlertScheduler] Unscheduling {0}".format(scheduled_job.name))
logger.info(f"[AlertScheduler] Unscheduling {scheduled_job.name}")
self._collector.remove_by_uuid(scheduled_job.name)
self.__scheduler.unschedule_job(scheduled_job)

Expand Down Expand Up @@ -255,7 +255,7 @@ def __load_definitions(self):

# cache the cluster and cluster hash after loading the JSON
if clusterName != '' and clusterHash is not None:
logger.info('[AlertScheduler] Caching cluster {0} with alert hash {1}'.format(clusterName, clusterHash))
logger.info(f'[AlertScheduler] Caching cluster {clusterName} with alert hash {clusterHash}')

for definition in command_json['alertDefinitions']:
alert = self.__json_to_callable(clusterName, hostName, publicHostName, Utils.get_mutable_copy(definition))
Expand All @@ -282,7 +282,7 @@ def __json_to_callable(self, clusterName, hostName, publicHostName, json_definit
source_type = source.get('type', '')

if logger.isEnabledFor(logging.DEBUG):
logger.debug("[AlertScheduler] Creating job type {0} with {1}".format(source_type, str(json_definition)))
logger.debug(f"[AlertScheduler] Creating job type {source_type} with {str(json_definition)}")


if source_type == AlertSchedulerHandler.TYPE_METRIC:
Expand Down
2 changes: 1 addition & 1 deletion ambari-agent/src/main/python/ambari_agent/AmbariAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def check_native_libs_support():
not_loaded_extensions.append("simplejson")

if not_loaded_extensions:
logger.warning("Some native extensions not available for module(s): {}, it may affect execution performance".format(",".join(not_loaded_extensions)))
logger.warning(f"Some native extensions not available for module(s): {','.join(not_loaded_extensions)}, it may affect execution performance")


def main():
Expand Down
2 changes: 1 addition & 1 deletion ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def get_resolved_config(cls, home_dir=""):
if os.path.exists(configPath):
config.read(configPath)
else:
raise Exception("No config found at {0}, use default".format(configPath))
raise Exception(f"No config found at {configPath}, use default")

except Exception as err:
logger.warn(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ def __init__(self, command, commandId, on_background_command_started, on_backgro
self.on_background_command_complete_callback = on_background_command_complete_callback

def __str__(self):
return "[BackgroundHandle: pid='{0}', status='{1}', exitCode='{2}', commandId='{3}']".format(self.pid, self.status, self.exitCode, self.commandId)
return f"[BackgroundHandle: pid='{self.pid}', status='{self.status}', exitCode='{self.exitCode}', commandId='{self.commandId}']"
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def cache_delete(self, cache_update, cache_hash):

for cluster_id in cache_update:
if not cluster_id in mutable_dict:
logger.error("Cannot do alert_definitions delete for cluster cluster_id={0}, because do not have information about the cluster".format(cluster_id))
logger.error(f"Cannot do alert_definitions delete for cluster cluster_id={cluster_id}, because do not have information about the cluster")
continue

# deleting whole cluster
Expand All @@ -95,7 +95,7 @@ def cache_delete(self, cache_update, cache_hash):
index_of_alert = self.get_alert_definition_index_by_id(mutable_dict, cluster_id, id_to_update)

if index_of_alert == None:
raise Exception("Cannot delete an alert with id={0}".format(id_to_update))
raise Exception(f"Cannot delete an alert with id={id_to_update}")

del mutable_dict[cluster_id]['alertDefinitions'][index_of_alert]

Expand Down
Loading