-
Notifications
You must be signed in to change notification settings - Fork 670
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[sfputil] Firmware download/upgrade CLI support for QSFP-DD #1947
Changes from all commits
a65f974
2fe2671
f71a7f0
83d8b94
f53f590
e666291
716d3c6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,8 @@ | |
import sys | ||
import natsort | ||
import ast | ||
import time | ||
import datetime | ||
|
||
import subprocess | ||
import click | ||
|
@@ -26,12 +28,20 @@ | |
PLATFORM_JSON = 'platform.json' | ||
PORT_CONFIG_INI = 'port_config.ini' | ||
|
||
EXIT_FAIL = -1 | ||
EXIT_SUCCESS = 0 | ||
ERROR_PERMISSIONS = 1 | ||
ERROR_CHASSIS_LOAD = 2 | ||
ERROR_SFPUTILHELPER_LOAD = 3 | ||
ERROR_PORT_CONFIG_LOAD = 4 | ||
ERROR_NOT_IMPLEMENTED = 5 | ||
ERROR_INVALID_PORT = 6 | ||
SMBUS_BLOCK_WRITE_SIZE = 32 | ||
# Default host password as per CMIS spec: | ||
# http://www.qsfp-dd.com/wp-content/uploads/2021/05/CMIS5p0.pdf | ||
CDB_DEFAULT_HOST_PASSWORD = 0x00001011 | ||
|
||
MAX_LPL_FIRMWARE_BLOCK_SIZE = 116 #Bytes | ||
|
||
# TODO: We should share these maps and the formatting functions between sfputil and sfpshow | ||
QSFP_DATA_MAP = { | ||
|
@@ -225,6 +235,17 @@ | |
# Global logger instance | ||
log = logger.Logger(SYSLOG_IDENTIFIER) | ||
|
||
def is_sfp_present(port_name): | ||
physical_port = logical_port_to_physical_port_index(port_name) | ||
sfp = platform_chassis.get_sfp(physical_port) | ||
|
||
try: | ||
presence = sfp.get_presence() | ||
except NotImplementedError: | ||
click.echo("sfp get_presence() NOT implemented!", err=True) | ||
sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
||
return bool(presence) | ||
|
||
# ========================== Methods for formatting output ========================== | ||
|
||
|
@@ -408,6 +429,19 @@ def logical_port_name_to_physical_port_list(port_name): | |
else: | ||
return [int(port_name)] | ||
|
||
def logical_port_to_physical_port_index(port_name): | ||
if not platform_sfputil.is_logical_port(port_name): | ||
click.echo("Error: invalid port '{}'\n".format(port_name)) | ||
print_all_valid_port_values() | ||
sys.exit(ERROR_INVALID_PORT) | ||
|
||
physical_port = logical_port_name_to_physical_port_list(port_name)[0] | ||
if physical_port is None: | ||
click.echo("Error: No physical port found for logical port '{}'".format(port_name)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
return physical_port | ||
|
||
|
||
def print_all_valid_port_values(): | ||
click.echo("Valid values for port: {}\n".format(str(platform_sfputil.logical))) | ||
|
@@ -805,6 +839,37 @@ def lpmode(port): | |
|
||
click.echo(tabulate(output_table, table_header, tablefmt='simple')) | ||
|
||
def show_firmware_version(physical_port): | ||
try: | ||
sfp = platform_chassis.get_sfp(physical_port) | ||
api = sfp.get_xcvr_api() | ||
out = api.get_module_fw_info() | ||
click.echo(out['info']) | ||
except NotImplementedError: | ||
click.echo("This functionality is currently not implemented for this platform") | ||
sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
||
# 'fwversion' subcommand | ||
@show.command() | ||
@click.argument('port_name', metavar='<port_name>', required=True) | ||
def fwversion(port_name): | ||
"""Show firmware version of the transceiver""" | ||
|
||
physical_port = logical_port_to_physical_port_index(port_name) | ||
sfp = platform_chassis.get_sfp(physical_port) | ||
|
||
try: | ||
presence = sfp.get_presence() | ||
except NotImplementedError: | ||
click.echo("sfp get_presence() NOT implemented!") | ||
sys.exit(EXIT_FAIL) | ||
|
||
if not presence: | ||
click.echo("{}: SFP EEPROM not detected\n".format(port_name)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
show_firmware_version(physical_port) | ||
sys.exit(EXIT_SUCCESS) | ||
|
||
# 'lpmode' subgroup | ||
@cli.group() | ||
|
@@ -903,6 +968,265 @@ def reset(port_name): | |
|
||
i += 1 | ||
|
||
# 'firmware' subgroup | ||
@cli.group() | ||
def firmware(): | ||
"""Download/Upgrade firmware on the transceiver""" | ||
pass | ||
|
||
def run_firmware(port_name, mode): | ||
prgeor marked this conversation as resolved.
Show resolved
Hide resolved
prgeor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
Make the inactive firmware as the current running firmware | ||
@port_name: | ||
@mode: 0, 1, 2, 3 different modes to run the firmware | ||
Returns 1 on success, and exit_code = -1 on failure | ||
""" | ||
status = 0 | ||
physical_port = logical_port_to_physical_port_index(port_name) | ||
sfp = platform_chassis.get_sfp(physical_port) | ||
|
||
try: | ||
api = sfp.get_xcvr_api() | ||
except NotImplementedError: | ||
click.echo("This functionality is currently not implemented for this platform") | ||
sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
||
if mode == 0: | ||
click.echo("Running firmware: Non-hitless Reset to Inactive Image") | ||
elif mode == 1: | ||
click.echo("Running firmware: Hitless Reset to Inactive Image") | ||
elif mode == 2: | ||
click.echo("Running firmware: Attempt non-hitless Reset to Running Image") | ||
elif mode == 3: | ||
click.echo("Running firmware: Attempt Hitless Reset to Running Image") | ||
else: | ||
click.echo("Running firmware: Unknown mode {}".format(mode)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
try: | ||
status = api.cdb_run_firmware(mode) | ||
except NotImplementedError: | ||
prgeor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
click.echo("This functionality is not applicable for this transceiver") | ||
sys.exit(EXIT_FAIL) | ||
|
||
return status | ||
|
||
def commit_firmware(port_name): | ||
status = 0 | ||
physical_port = logical_port_to_physical_port_index(port_name) | ||
sfp = platform_chassis.get_sfp(physical_port) | ||
|
||
try: | ||
api = sfp.get_xcvr_api() | ||
except NotImplementedError: | ||
click.echo("This functionality is currently not implemented for this platform") | ||
sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
||
try: | ||
status = api.cdb_commit_firmware() | ||
except NotImplementedError: | ||
click.echo("This functionality is not applicable for this transceiver") | ||
|
||
return status | ||
|
||
def download_firmware(port_name, filepath): | ||
"""Download firmware on the transceiver""" | ||
try: | ||
fd = open(filepath, 'rb') | ||
fd.seek(0, 2) | ||
file_size = fd.tell() | ||
fd.seek(0, 0) | ||
except FileNotFoundError: | ||
click.echo("Firmware file {} NOT found".format(filepath)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
physical_port = logical_port_to_physical_port_index(port_name) | ||
sfp = platform_chassis.get_sfp(physical_port) | ||
try: | ||
api = sfp.get_xcvr_api() | ||
except NotImplementedError: | ||
click.echo("This functionality is NOT applicable to this platform") | ||
sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
||
try: | ||
fwinfo = api.get_module_fw_mgmt_feature() | ||
if fwinfo['status'] == True: | ||
startLPLsize, maxblocksize, lplonly_flag, autopaging_flag, writelength = fwinfo['feature'] | ||
else: | ||
click.echo("Failed to fetch CDB Firmware management features") | ||
sys.exit(EXIT_FAIL) | ||
except NotImplementedError: | ||
click.echo("This functionality is NOT applicable for this transceiver") | ||
sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
||
click.echo('CDB: Starting firmware download') | ||
startdata = fd.read(startLPLsize) | ||
status = api.cdb_start_firmware_download(startLPLsize, startdata, file_size) | ||
if status != 1: | ||
click.echo('CDB: Start firmware download failed - status {}'.format(status)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
# Increase the optoe driver's write max to speed up firmware download | ||
prgeor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
sfp.set_optoe_write_max(SMBUS_BLOCK_WRITE_SIZE) | ||
|
||
with click.progressbar(length=file_size, label="Downloading ...") as bar: | ||
address = 0 | ||
BLOCK_SIZE = MAX_LPL_FIRMWARE_BLOCK_SIZE if lplonly_flag else maxblocksize | ||
remaining = file_size - startLPLsize | ||
while remaining > 0: | ||
count = BLOCK_SIZE if remaining >= BLOCK_SIZE else remaining | ||
data = fd.read(count) | ||
if len(data) != count: | ||
click.echo("Firmware file read failed!") | ||
sys.exit(EXIT_FAIL) | ||
|
||
if lplonly_flag: | ||
status = api.cdb_lpl_block_write(address, data) | ||
else: | ||
status = api.cdb_epl_block_write(address, data, autopaging_flag, writelength) | ||
if (status != 1): | ||
click.echo("CDB: firmware download failed! - status {}".format(status)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
bar.update(count) | ||
address += count | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we make sure count matches len of the data read There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If that does not match, you trigger assert. However this is a runtime error. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. now, exiting |
||
remaining -= count | ||
|
||
# Restore the optoe driver's write max to '1' (default value) | ||
sfp.set_optoe_write_max(1) | ||
|
||
status = api.cdb_firmware_download_complete() | ||
click.echo('CDB: firmware download complete') | ||
return status | ||
|
||
# 'run' subcommand | ||
@firmware.command() | ||
@click.argument('port_name', required=True, default=None) | ||
@click.option('--mode', default="1", type=click.Choice(["0", "1", "2", "3"]), show_default=True, | ||
help="0 = Non-hitless Reset to Inactive Image\n \ | ||
1 = Hitless Reset to Inactive Image (Default)\n \ | ||
2 = Attempt non-hitless Reset to Running Image\n \ | ||
3 = Attempt Hitless Reset to Running Image\n") | ||
def run(port_name, mode): | ||
"""Run the firmware with default mode=1""" | ||
|
||
if not is_sfp_present(port_name): | ||
click.echo("{}: SFP EEPROM not detected\n".format(port_name)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
status = run_firmware(port_name, int(mode)) | ||
if status != 1: | ||
click.echo('Failed to run firmware in mode={}! CDB status: {}'.format(mode, status)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
click.echo("Firmware run in mode={} success".format(mode)) | ||
|
||
# 'commit' subcommand | ||
@firmware.command() | ||
@click.argument('port_name', required=True, default=None) | ||
def commit(port_name): | ||
"""Commit the running firmware""" | ||
|
||
if not is_sfp_present(port_name): | ||
click.echo("{}: SFP EEPROM not detected\n".format(port_name)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
status = commit_firmware(port_name) | ||
if status != 1: | ||
click.echo('Failed to commit firmware! CDB status: {}'.format(status)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
click.echo("Firmware commit successful") | ||
|
||
# 'upgrade' subcommand | ||
@firmware.command() | ||
@click.argument('port_name', required=True, default=None) | ||
@click.argument('filepath', required=True, default=None) | ||
def upgrade(port_name, filepath): | ||
prgeor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Upgrade firmware on the transceiver""" | ||
|
||
physical_port = logical_port_to_physical_port_index(port_name) | ||
|
||
if not is_sfp_present(port_name): | ||
click.echo("{}: SFP EEPROM not detected\n".format(port_name)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
show_firmware_version(physical_port) | ||
|
||
status = download_firmware(port_name, filepath) | ||
if status == 1: | ||
click.echo("Firmware download complete success") | ||
else: | ||
click.echo("Firmware download complete failed! CDB status = {}".format(status)) | ||
prgeor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
sys.exit(EXIT_FAIL) | ||
|
||
status = run_firmware(port_name, 1) | ||
if status != 1: | ||
click.echo('Failed to run firmware in mode=1 ! CDB status: {}'.format(status)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
click.echo("Firmware run in mode 1 successful") | ||
|
||
status = commit_firmware(port_name) | ||
if status != 1: | ||
click.echo('Failed to commit firmware! CDB status: {}'.format(status)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
click.echo("Firmware commit successful") | ||
|
||
# 'download' subcommand | ||
@firmware.command() | ||
@click.argument('port_name', required=True, default=None) | ||
@click.argument('filepath', required=True, default=None) | ||
def download(port_name, filepath): | ||
"""Download firmware on the transceiver""" | ||
|
||
if not is_sfp_present(port_name): | ||
click.echo("{}: SFP EEPROM not detected\n".format(port_name)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
start = time.time() | ||
status = download_firmware(port_name, filepath) | ||
if status == 1: | ||
click.echo("Firmware download complete success") | ||
else: | ||
click.echo("Firmware download complete failed! status = {}".format(status)) | ||
sys.exit(EXIT_FAIL) | ||
end = time.time() | ||
click.echo("Total download Time: {}".format(str(datetime.timedelta(seconds=end-start)))) | ||
|
||
|
||
# 'unlock' subcommand | ||
@firmware.command() | ||
@click.argument('port_name', required=True, default=None) | ||
@click.option('--password', type=click.INT, help="Password in integer\n") | ||
def unlock(port_name, password): | ||
"""Unlock the firmware download feature via CDB host password""" | ||
physical_port = logical_port_to_physical_port_index(port_name) | ||
sfp = platform_chassis.get_sfp(physical_port) | ||
|
||
if not is_sfp_present(port_name): | ||
click.echo("{}: SFP EEPROM not detected\n".format(port_name)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
try: | ||
api = sfp.get_xcvr_api() | ||
except NotImplementedError: | ||
click.echo("This functionality is currently not implemented for this platform") | ||
sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
||
if password is None: | ||
password = CDB_DEFAULT_HOST_PASSWORD | ||
try: | ||
status = api.cdb_enter_host_password(int(password)) | ||
except NotImplementedError: | ||
click.echo("This functionality is not applicable for this transceiver") | ||
sys.exit(EXIT_FAIL) | ||
|
||
if status == 1: | ||
click.echo("CDB: Host password accepted") | ||
else: | ||
click.echo("CDB: Host password NOT accepted! status = {}".format(status)) | ||
|
||
# 'version' subcommand | ||
@cli.command() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this function available in some other library? can we reuse?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i couldn't find one in sonic-utilities