Commit update0926

jingyd66 authored and committed Sep 26, 2024
1 parent adb61bb commit a32e8c8
Showing 2 changed files with 33 additions and 19 deletions.
50 changes: 32 additions & 18 deletions handler/analyzer/analyze_queue.py
@@ -18,15 +18,14 @@
import datetime
import os
import re

import csv
from tabulate import tabulate
from common.command import get_observer_version_by_sql
from handler.base_shell_handler import BaseShellHandler
from common.obdiag_exception import OBDIAGFormatException
from common.obdiag_exception import OBDIAGFormatException, OBDIAGDBConnException
from common.constant import const
from common.command import LocalClient, SshClient
from common.command import SshClient
from common.ob_log_level import OBLogLevel
from handler.meta.ob_error import OB_RET_DICT
from common.command import download_file, get_logfile_name_list, mkdir, delete_file
from common.tool import DirectoryUtil, TimeUtils, Util, StringUtils
from common.tool import Util
@@ -75,7 +74,7 @@ def __init__(self, context):
)
except Exception as e:
self.stdio.error("Failed to connect to database: {0}".format(e))
raise OBDIAGFormatException("Failed to connect to database: {0}".format(e))
raise OBDIAGDBConnException("Failed to connect to database: {0}".format(e))

def init_config(self):
self.nodes = self.context.cluster_config['servers']
@@ -96,11 +95,10 @@ def init_option(self):
to_option = Util.get_option(options, 'to')
since_option = Util.get_option(options, 'since')
store_dir_option = Util.get_option(options, 'store_dir')
files_option = Util.get_option(options, 'files')
tenant_option = Util.get_option(options, 'tenant')
queue_option = Util.get_option(options, 'queue')
if tenant_option is None:
self.stdio.error('Error: tenant must input ')
self.stdio.error('--tenant option was not provided')
return False
self.tenant = tenant_option
observer_version = self.get_version()
@@ -113,7 +111,7 @@
self.stdio.verbose("sql is {0}".format(sql))
sql_result = self.obconn.execute_sql_return_cursor_dictionary(sql).fetchall()
if len(sql_result) <= 0:
self.stdio.error('Error: tenant is {0} not in this cluster '.format(tenant_option))
self.stdio.error('tenant {0} is not in this cluster'.format(tenant_option))
return False
self.stdio.verbose("sql_result is {0}".format(sql_result))
for row in sql_result:
@@ -163,8 +161,8 @@ def get_version(self):
try:
observer_version = get_observer_version_by_sql(self.ob_cluster, self.stdio)
except Exception as e:
self.stdio.warn("AnalyzeHandler Failed to get observer version:{0}".format(e))
self.stdio.verbose("AnalyzeHandler.init get observer version: {0}".format(observer_version))
self.stdio.warn("AnalyzeQueueHandler failed to get observer version:{0}".format(e))
self.stdio.verbose("AnalyzeQueueHandler get observer version: {0}".format(observer_version))
return observer_version

def handle(self):
@@ -192,15 +190,18 @@ def handle_from_node(node):
self.nodes = nodes_new
for node in self.nodes:
handle_from_node(node)
self.stdio.print(analyze_tuples)
self.stdio.verbose(analyze_tuples)
table_data = []
headers = ['IP', 'Tenant Name', 'From_TimeStamp', 'To_TimeStamp', 'Is Queue', 'Queue Limit', 'Over Queue Limit Count', 'Max Queue']
headers = ['IP', 'Tenant Name', 'From_Time', 'To_Time', 'Is Queue', 'Queue Limit', 'Over Queue Limit Count', 'Max Queue']
for ip, info in analyze_tuples:
row = [ip, info['tenant_name'], info['from_datetime_timestamp'], info['to_datetime_timestamp'], info['is_queue'], info['queue_limit'], info['over_queue_limit'], info['max_queue']]
table_data.append(row)
queue_result = tabulate(table_data, headers=headers, tablefmt="pretty")
self.stdio.print("\nQueue Result:")
self.stdio.print(queue_result)
FileUtil.write_append(os.path.join(local_store_parent_dir, "result_details.txt"), str(queue_result))
last_info = "\nFor more details, please run cmd \033[32m' cat {0} '\033[0m\n".format(os.path.join(local_store_parent_dir, "result_details.txt"))
self.stdio.print(last_info)
return ObdiagResult(ObdiagResult.SUCCESS_CODE, data={"result": queue_result})

def __handle_from_node(self, node, local_store_parent_dir):
@@ -215,8 +216,7 @@ def __handle_from_node(self, node, local_store_parent_dir):
local_store_dir = "{0}/{1}".format(local_store_parent_dir, ssh_client.get_name())
DirectoryUtil.mkdir(path=local_store_dir, stdio=self.stdio)
except Exception as e:
ssh_failed = True
raise Exception("Please check the {0}".format(self.config_path))
raise Exception(("failed to handle from node: {0}, error: {1}".format(node, e)))

from_datetime_timestamp = TimeUtils.timestamp_to_filename_time(TimeUtils.datetime_to_timestamp(self.from_time_str))
to_datetime_timestamp = TimeUtils.timestamp_to_filename_time(TimeUtils.datetime_to_timestamp(self.to_time_str))
@@ -239,10 +239,9 @@ def __handle_from_node(self, node, local_store_parent_dir):
node_results.append(file_result)
delete_file(ssh_client, gather_dir_full_path, self.stdio)
ssh_client.ssh_close()
self.stdio.print(node_results)
self.__write_to_csv(local_store_parent_dir, node_results)
count, max_queue_value = self.count_and_find_max_queues(node_results, queue_limit)
self.stdio.print(count)
self.stdio.print(max_queue_value)
self.stdio.verbose("count:{0}, max_queue_value:{1}".format(count, max_queue_value))
result_dict['tenant_name'] = self.tenant
if max_queue_value > queue_limit:
result_dict['is_queue'] = 'yes'
@@ -253,7 +252,6 @@ def __handle_from_node(self, node, local_store_parent_dir):
result_dict['max_queue'] = max_queue_value
result_dict['from_datetime_timestamp'] = from_datetime_timestamp
result_dict['to_datetime_timestamp'] = to_datetime_timestamp
self.stdio.print(result_dict)
return result_dict

def count_and_find_max_queues(self, data, queue_limit):
@@ -395,3 +393,19 @@ def __parse_log_lines(self, file_full_path):

results.append(result)
return results

def __write_to_csv(self, local_store_parent_dir, data):
try:
if not data or not isinstance(data, list) or not data[0] or not isinstance(data[0], list):
raise ValueError("Data is not in the expected format. It should be a non-empty list of lists containing dictionaries.")
fieldnames = data[0][0].keys()
file_path = os.path.join(local_store_parent_dir, "node_results.csv")
with open(file_path, mode='w', newline='', encoding='utf-8') as file:
writer = csv.DictWriter(file, fieldnames=fieldnames)
writer.writeheader()
for row in data[0]:
writer.writerow(row)
except ValueError as ve:
self.stdio.exception(f"ValueError: {ve}")
except Exception as e:
self.stdio.exception(f"an unexpected error occurred: {e}")
2 changes: 1 addition & 1 deletion init_obdiag_cmd.sh
@@ -19,7 +19,7 @@ _obdiag_completion() {
;;
analyze)
if [ "$COMP_CWORD" -eq 2 ]; then
type_list="log flt_trace parameter variable index_space"
type_list="log flt_trace parameter variable index_space queue"
elif [ "${COMP_WORDS[2]}" = "parameter" ] && [ "$COMP_CWORD" -eq 3 ]; then
type_list="diff default"
elif [ "${COMP_WORDS[2]}" = "variable" ] && [ "$COMP_CWORD" -eq 3 ]; then
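
Given the completion entry above and the options read in init_option (tenant, since, from/to, store_dir), the new analyzer would presumably be invoked along these lines; the exact flag spellings are assumptions inferred from the option names, not confirmed by this diff:

    # hypothetical invocation; --tenant is required, the time window flags are optional
    obdiag analyze queue --tenant sys --since 30m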
