From b7461e37682073fadf0743b6055cabdb3269d092 Mon Sep 17 00:00:00 2001 From: Nazarii Hnydyn <nazariig@mellanox.com> Date: Thu, 21 Feb 2019 20:30:16 +0200 Subject: [PATCH] Fixed review comments. Signed-off-by: Nazarii Hnydyn <nazariig@mellanox.com> --- sonic-xcvrd/scripts/xcvrd | 170 +++++++++++++++++++++++--------------- 1 file changed, 103 insertions(+), 67 deletions(-) diff --git a/sonic-xcvrd/scripts/xcvrd b/sonic-xcvrd/scripts/xcvrd index 0095e6516..40d478dbc 100644 --- a/sonic-xcvrd/scripts/xcvrd +++ b/sonic-xcvrd/scripts/xcvrd @@ -62,6 +62,7 @@ VOLT_UNIT = 'Volts' POWER_UNIT = 'dBm' BIAS_UNIT = 'mA' +TIMEOUT_SECS = 1 RUN = True #========================== Syslog wrappers ========================== @@ -219,7 +220,7 @@ def beautify_dom_info_dict(dom_info_dict): dom_info_dict['tx3power'] = strip_unit_and_beautify(dom_info_dict['tx3power'], POWER_UNIT) dom_info_dict['tx4power'] = strip_unit_and_beautify(dom_info_dict['tx4power'], POWER_UNIT) -# update all the sfp info to db +# update port sfp info in db def post_port_sfp_info_to_db(logical_port_name, table): ganged_port = False ganged_member_num = 1 @@ -258,7 +259,7 @@ def post_port_sfp_info_to_db(logical_port_name, table): log_error("This functionality is currently not implemented for this platform") sys.exit(3) -# update dom sensor info to db +# update port dom sensor info in db def post_port_dom_info_to_db(logical_port_name, table): ganged_port = False ganged_member_num = 1 @@ -307,8 +308,24 @@ def post_port_dom_info_to_db(logical_port_name, table): log_error("This functionality is currently not implemented for this platform") sys.exit(3) -# del sfp and dom info from db -def del_port_sfp_dom_info_to_db(logical_port_name, int_tbl, dom_tbl): +# update port dom/sfp info in db +def post_port_sfp_dom_info_to_db(): + # Connect to STATE_DB and create transceiver dom/sfp info tables + state_db = swsscommon.DBConnector(swsscommon.STATE_DB, + REDIS_HOSTNAME, + REDIS_PORT, + REDIS_TIMEOUT_MSECS) + int_tbl = swsscommon.Table(state_db, "TRANSCEIVER_INFO") + dom_tbl = swsscommon.Table(state_db, "TRANSCEIVER_DOM_SENSOR") + + # Post all the current interface dom/sfp info to STATE_DB + logical_port_list = platform_sfputil.logical + for logical_port_name in logical_port_list: + post_port_sfp_info_to_db(logical_port_name, int_tbl) + post_port_dom_info_to_db(logical_port_name, dom_tbl) + +# delete port dom/sfp info from db +def del_port_sfp_dom_info_from_db(logical_port_name, int_tbl, dom_tbl): ganged_port = False ganged_member_num = 1 @@ -332,29 +349,74 @@ def del_port_sfp_dom_info_to_db(logical_port_name, int_tbl, dom_tbl): log_error("This functionality is currently not implemented for this platform") sys.exit(3) -# Timer thread wrapper class to update dom info to DB periodically +# wait for port config is done +def wait_for_port_config_done(): + # Connect to APPL_DB abd subscribe to PORT table notifications + appl_db = swsscommon.DBConnector(swsscommon.APPL_DB, + REDIS_HOSTNAME, + REDIS_PORT, + REDIS_TIMEOUT_MSECS) + + sel = swsscommon.Select() + sst = swsscommon.SubscriberStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME) + sel.addSelectable(sst) + + # Make sure this daemon started after all port configured + while RUN: + (state, c) = sel.select(SELECT_TIMEOUT_MSECS) + if state == swsscommon.Select.TIMEOUT: + continue + if state != swsscommon.Select.OBJECT: + log_warning("sel.select() did not return swsscommon.Select.OBJECT") + continue + + (key, op, fvp) = sst.pop() + if key in ["PortConfigDone", "PortInitDone"]: + break + +# Thread wrapper class to update dom info periodically class dom_info_update_task: - def __init__(self, table): + def __init__(self): + self.task_thread = None self.task_stopping_event = threading.Event() - self.task_timer = None - self.dom_table = table - def task_run(self): - if self.task_stopping_event.isSet(): - log_error("Error: dom info update thread received stop event, exiting...") - return + def task_worker(self): + log_info("Start DOM monitoring loop") + + # Connect to STATE_DB and create transceiver dom/sfp info tables + state_db = swsscommon.DBConnector(swsscommon.STATE_DB, + REDIS_HOSTNAME, + REDIS_PORT, + REDIS_TIMEOUT_MSECS) + int_tbl = swsscommon.Table(state_db, "TRANSCEIVER_INFO") + dom_tbl = swsscommon.Table(state_db, "TRANSCEIVER_DOM_SENSOR") + + # Start loop to update dom info in DB periodically + while not self.task_stopping_event.wait(DOM_INFO_UPDATE_PERIOD_SECS): + logical_port_list = platform_sfputil.logical + for logical_port_name in logical_port_list: + post_port_dom_info_to_db(logical_port_name, dom_tbl) + # Delete all the information from DB and then exit logical_port_list = platform_sfputil.logical for logical_port_name in logical_port_list: - post_port_dom_info_to_db(logical_port_name, self.dom_table) + del_port_sfp_dom_info_from_db(logical_port_name, int_tbl, dom_tbl) + + log_info("Stop DOM monitoring loop") + + def task_run(self): + if self.task_stopping_event.is_set(): + return - self.task_timer = threading.Timer(DOM_INFO_UPDATE_PERIOD_SECS, self.task_run) - self.task_timer.start() + self.task_thread = threading.Thread(target=self.task_worker) + self.task_thread.start() def task_stop(self): + if self.task_stopping_event.is_set(): + return + self.task_stopping_event.set() - self.task_timer.cancel() - self.task_timer.join() + self.task_thread.join() # Process wrapper class to update sfp state info periodically class sfp_state_update_task: @@ -365,7 +427,15 @@ class sfp_state_update_task: def task_worker(self, stopping_event): log_info("Start SFP monitoring loop") - # Start loop to listen to the SFP change event + # Connect to STATE_DB and create transceiver dom/sfp info tables + state_db = swsscommon.DBConnector(swsscommon.STATE_DB, + REDIS_HOSTNAME, + REDIS_PORT, + REDIS_TIMEOUT_MSECS) + int_tbl = swsscommon.Table(state_db, "TRANSCEIVER_INFO") + dom_tbl = swsscommon.Table(state_db, "TRANSCEIVER_DOM_SENSOR") + + # Start loop to listen to the sfp change event while not stopping_event.is_set(): status, port_dict = platform_sfputil.get_transceiver_change_event() if status: @@ -390,7 +460,7 @@ class sfp_state_update_task: else: # If get_transceiver_change_event() return error, will clean up the DB and then exit # TODO: next step need to define more error types to handle accordingly. - log_error("Method get_transceiver_change_event() returned error, exiting...") + log_error("Method get_transceiver_change_event() returned error. Exiting...") os.kill(os.getppid(), signal.SIGTERM) break @@ -404,7 +474,7 @@ class sfp_state_update_task: self.task_process.start() def task_stop(self): - if not self.task_process.is_alive: + if self.task_stopping_event.is_set(): return self.task_stopping_event.set() @@ -423,7 +493,7 @@ def main(): # Load platform-specific sfputil class err = load_platform_sfputil() if err != 0: - log_error("failed to load sfputil") + log_error("Failed to load sfputil", True) sys.exit(1) # Load port info @@ -435,45 +505,16 @@ def main(): log_error("Error reading port info (%s)" % str(e), True) sys.exit(2) - # Connect to STATE_DB and create transceiver info/dom info table - state_db = swsscommon.DBConnector(swsscommon.STATE_DB, - REDIS_HOSTNAME, - REDIS_PORT, - REDIS_TIMEOUT_MSECS) - int_tbl = swsscommon.Table(state_db, "TRANSCEIVER_INFO") - dom_tbl = swsscommon.Table(state_db, "TRANSCEIVER_DOM_SENSOR") - - # Connect to APPL_DB abd subscribe to PORT table notifications - appl_db = swsscommon.DBConnector(swsscommon.APPL_DB, - REDIS_HOSTNAME, - REDIS_PORT, - REDIS_TIMEOUT_MSECS) - - sel = swsscommon.Select() - sst = swsscommon.SubscriberStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME) - sel.addSelectable(sst) - - # Make sure this daemon started after all port configured. - while RUN: - (state, c) = sel.select(SELECT_TIMEOUT_MSECS) - if state == swsscommon.Select.TIMEOUT: - continue - if state != swsscommon.Select.OBJECT: - log_warning("sel.select() did not return swsscommon.Select.OBJECT") - continue - - (key, op, fvp) = sst.pop() - if key in ["PortConfigDone", "PortInitDone"]: - break + # Make sure this daemon started after all port configured + log_info("Wait for port config is done") + wait_for_port_config_done() - # Post all the current interface SFP info to STATE_DB - logical_port_list = platform_sfputil.logical - for logical_port_name in logical_port_list: - post_port_sfp_info_to_db(logical_port_name, int_tbl) - post_port_dom_info_to_db(logical_port_name, dom_tbl) + # Post all the current interface dom/sfp info to STATE_DB + log_info("Post all port DOM/SFP info to DB") + post_port_sfp_dom_info_to_db() - # Start the dom sensor info update timer thread - dom_info_update = dom_info_update_task(dom_tbl) + # Start the dom sensor info update thread + dom_info_update = dom_info_update_task() dom_info_update.task_run() # Start the sfp state info update process @@ -484,20 +525,15 @@ def main(): log_info("Start daemon main loop") while RUN: - time.sleep(1) + time.sleep(TIMEOUT_SECS) log_info("Stop daemon main loop") - # Stop the sfp state info update process - sfp_state_update.task_stop() - - # Stop the dom info update timer + # Stop the dom sensor info update thread dom_info_update.task_stop() - # Clean all the information from DB and then exit - logical_port_list = platform_sfputil.logical - for logical_port_name in logical_port_list: - del_port_sfp_dom_info_to_db(logical_port_name, int_tbl, dom_tbl) + # Stop the sfp state info update process + sfp_state_update.task_stop() log_info("Shutting down...")