diff --git a/dockers/docker-macsec/cli-plugin-tests/conftest.py b/dockers/docker-macsec/cli-plugin-tests/conftest.py new file mode 100644 index 000000000000..8812a8dd2ce3 --- /dev/null +++ b/dockers/docker-macsec/cli-plugin-tests/conftest.py @@ -0,0 +1,29 @@ +import pytest +import mock_tables # lgtm [py/unused-import] +from unittest import mock + +@pytest.fixture() +def mock_cfgdb(): + cfgdb = mock.Mock() + CONFIG = { + 'PORT': { + 'Ethernet0': { + } + } + } + + def get_entry(table, key): + if table not in CONFIG or key not in CONFIG[table]: + return {} + return CONFIG[table][key] + + def set_entry(table, key, data): + CONFIG.setdefault(table, {}) + CONFIG[table].setdefault(key, {}) + CONFIG[table][key] = data + + cfgdb.get_entry = mock.Mock(side_effect=get_entry) + cfgdb.set_entry = mock.Mock(side_effect=set_entry) + + yield cfgdb + diff --git a/dockers/docker-macsec/cli-plugin-tests/mock_tables.py b/dockers/docker-macsec/cli-plugin-tests/mock_tables.py new file mode 120000 index 000000000000..ea114a425cbf --- /dev/null +++ b/dockers/docker-macsec/cli-plugin-tests/mock_tables.py @@ -0,0 +1 @@ +../../docker-dhcp-relay/cli-plugin-tests/mock_tables.py \ No newline at end of file diff --git a/dockers/docker-macsec/cli-plugin-tests/test_config_macsec.py b/dockers/docker-macsec/cli-plugin-tests/test_config_macsec.py index 474fc8e5f381..c0c0d25d38c4 100644 --- a/dockers/docker-macsec/cli-plugin-tests/test_config_macsec.py +++ b/dockers/docker-macsec/cli-plugin-tests/test_config_macsec.py @@ -1,138 +1,144 @@ import sys +import traceback from unittest import mock - -# from utilities_common.db import Db - -# sys.path.append('../cli/config/plugins/') -# import macsec - -# from click.testing import CliRunner -# import config.main as config - -# def test_plugin_registration(self): -# cli = mock.MagicMock() -# macsec.register(cli) -# cli.add_command.assert_called_once_with(macsec.macsec) - -# def test_macsec_default_profile(): -# runner = CliRunner() -# db = Db() - -# profile_name = "test" -# primary_cak = "01234567890123456789012345678912" -# primary_ckn = "01234567890123456789012345678912" -# result = runner.invoke(config.config.commands["macsec"].commands["profile"].commands["add"], [profile_name, "--primary_cak=" + primary_cak,"--primary_ckn=" + primary_ckn], obj=db) -# assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) -# profile_table = db.cfgdb.get_entry("MACSEC_PROFILE", profile_name) -# assert profile_table -# assert profile_table["priority"] == "255" -# assert profile_table["cipher_suite"] == "GCM-AES-128" -# assert profile_table["primary_cak"] == primary_cak -# assert profile_table["primary_ckn"] == primary_ckn -# assert profile_table["policy"] == "security" -# assert "enable_replay_protect" not in profile_table -# assert "replay_window" not in profile_table -# assert profile_table["send_sci"] == "true" -# assert "rekey_period" not in profile_table - -# result = runner.invoke(config.config.commands["macsec"].commands["profile"].commands["del"], [profile_name], obj=db) -# assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) -# profile_table = db.cfgdb.get_entry("MACSEC_PROFILE", profile_name) -# assert not profile_table - - -# def test_macsec_valid_profile(): -# runner = CliRunner() -# db = Db() - -# profile_name = "test" -# profile_map = { -# "primary_cak": "0123456789012345678901234567891201234567890123456789012345678912", -# "primary_ckn": "01234567890123456789012345678912", -# "priority": 64, -# "cipher_suite": "GCM-AES-XPN-256", -# "policy": "integrity_only", -# "enable_replay_protect": None, -# "replay_window": 100, -# "no_send_sci": None, -# "rekey_period": 30 * 60, -# } -# options = [profile_name] -# for k, v in profile_map.items(): -# options.append("--" + k) -# if v is not None: -# options[-1] += "=" + str(v) - -# result = runner.invoke(config.config.commands["macsec"].commands["profile"].commands["add"], options, obj=db) -# assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) -# profile_table = db.cfgdb.get_entry("MACSEC_PROFILE", profile_name) -# assert profile_table -# assert profile_table["priority"] == str(profile_map["priority"]) -# assert profile_table["cipher_suite"] == profile_map["cipher_suite"] -# assert profile_table["primary_cak"] == profile_map["primary_cak"] -# assert profile_table["primary_ckn"] == profile_map["primary_ckn"] -# assert profile_table["policy"] == profile_map["policy"] -# if "enable_replay_protect" in profile_map: -# assert "enable_replay_protect" in profile_table and profile_table["enable_replay_protect"] == "true" -# assert profile_table["replay_window"] == str(profile_map["replay_window"]) -# if "send_sci" in profile_map: -# assert profile_table["send_sci"] == "true" -# if "no_send_sci" in profile_map: -# assert profile_table["send_sci"] == "false" -# if "rekey_period" in profile_map: -# assert profile_table["rekey_period"] == str(profile_map["rekey_period"]) - - -# def test_macsec_invalid_profile(): -# runner = CliRunner() -# db = Db() - -# # Loss primary cak and primary ckn -# result = runner.invoke(config.config.commands["macsec"].commands["profile"].commands["add"], ["test"], obj=db) -# assert result.exit_code != 0 - -# # Invalid primary cak -# result = runner.invoke(config.config.commands["macsec"].commands["profile"].commands["add"], ["test", "--primary_cak=abcdfghjk90123456789012345678912","--primary_ckn=01234567890123456789012345678912", "--cipher_suite=GCM-AES-128"], obj=db) -# assert result.exit_code != 0 - -# # Invalid primary cak length -# result = runner.invoke(config.config.commands["macsec"].commands["profile"].commands["add"], ["test", "--primary_cak=01234567890123456789012345678912","--primary_ckn=01234567890123456789012345678912", "--cipher_suite=GCM-AES-256"], obj=db) -# assert result.exit_code != 0 - - -# def test_macsec_port(): -# runner = CliRunner() -# db = Db() - -# result = runner.invoke(config.config.commands["macsec"].commands["profile"].commands["add"], ["test", "--primary_cak=01234567890123456789012345678912","--primary_ckn=01234567890123456789012345678912"], obj=db) -# assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) -# result = runner.invoke(config.config.commands["macsec"].commands["port"].commands["add"], ["Ethernet0", "test"], obj=db) -# assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) -# port_table = db.cfgdb.get_entry("PORT", "Ethernet0") -# assert port_table -# assert port_table["macsec"] == "test" - -# result = runner.invoke(config.config.commands["macsec"].commands["port"].commands["del"], ["Ethernet0"], obj=db) -# assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) -# port_table = db.cfgdb.get_entry("PORT", "Ethernet0") -# assert not port_table["macsec"] - - -# def test_macsec_invalid_operation(): -# runner = CliRunner() -# db = Db() - -# # Enable nonexisted profile -# result = runner.invoke(config.config.commands["macsec"].commands["port"].commands["add"], ["Ethernet0", "test"], obj=db) -# assert result.exit_code != 0 - -# # Delete nonexisted profile -# result = runner.invoke(config.config.commands["macsec"].commands["profile"].commands["del"], ["test"], obj=db) -# assert result.exit_code != 0 - -# result = runner.invoke(config.config.commands["macsec"].commands["profile"].commands["add"], ["test", "--primary_cak=01234567890123456789012345678912","--primary_ckn=01234567890123456789012345678912"], obj=db) -# assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) -# # Repeat add profile -# result = runner.invoke(config.config.commands["macsec"].commands["profile"].commands["add"], ["test", "--primary_cak=01234567890123456789012345678912","--primary_ckn=01234567890123456789012345678912"], obj=db) -# assert result.exit_code != 0 +from click.testing import CliRunner +from utilities_common.db import Db + +sys.path.append('../cli/config/plugins/') +import macsec + + +profile_name = "test" +primary_cak = "01234567890123456789012345678912" +primary_ckn = "01234567890123456789012345678912" + + +class TestConfigMACsec(object): + def test_plugin_registration(self): + cli = mock.MagicMock() + macsec.register(cli) + cli.commands['macsec'].add_command.assert_called_once_with(macsec.macsec) + + def test_default_profile(self, mock_cfgdb): + runner = CliRunner() + db = Db() + db.cfgdb = mock_cfgdb + result = runner.invoke(macsec.macsec.commands["profile"].commands["add"], + [profile_name, "--primary_cak=" + primary_cak,"--primary_ckn=" + primary_ckn], + obj=db) + assert result.exit_code == 0 + profile_table = db.cfgdb.get_entry("MACSEC_PROFILE", profile_name) + assert profile_table + assert profile_table["priority"] == "255" + assert profile_table["cipher_suite"] == "GCM-AES-128" + assert profile_table["primary_cak"] == primary_cak + assert profile_table["primary_ckn"] == primary_ckn + assert profile_table["policy"] == "security" + assert "enable_replay_protect" not in profile_table + assert "replay_window" not in profile_table + assert profile_table["send_sci"] == "true" + assert "rekey_period" not in profile_table + + result = runner.invoke(macsec.macsec.commands["profile"].commands["del"], [profile_name], obj=db) + assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + profile_table = db.cfgdb.get_entry("MACSEC_PROFILE", profile_name) + assert not profile_table + + def test_macsec_valid_profile(self, mock_cfgdb): + runner = CliRunner() + db = Db() + db.cfgdb = mock_cfgdb + + profile_name = "test" + profile_map = { + "primary_cak": "0123456789012345678901234567891201234567890123456789012345678912", + "primary_ckn": "01234567890123456789012345678912", + "priority": 64, + "cipher_suite": "GCM-AES-XPN-256", + "policy": "integrity_only", + "enable_replay_protect": None, + "replay_window": 100, + "no_send_sci": None, + "rekey_period": 30 * 60, + } + options = [profile_name] + for k, v in profile_map.items(): + options.append("--" + k) + if v is not None: + options[-1] += "=" + str(v) + + result = runner.invoke(macsec.macsec.commands["profile"].commands["add"], options, obj=db) + assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + profile_table = db.cfgdb.get_entry("MACSEC_PROFILE", profile_name) + assert profile_table + assert profile_table["priority"] == str(profile_map["priority"]) + assert profile_table["cipher_suite"] == profile_map["cipher_suite"] + assert profile_table["primary_cak"] == profile_map["primary_cak"] + assert profile_table["primary_ckn"] == profile_map["primary_ckn"] + assert profile_table["policy"] == profile_map["policy"] + if "enable_replay_protect" in profile_map: + assert "enable_replay_protect" in profile_table and profile_table["enable_replay_protect"] == "true" + assert profile_table["replay_window"] == str(profile_map["replay_window"]) + if "send_sci" in profile_map: + assert profile_table["send_sci"] == "true" + if "no_send_sci" in profile_map: + assert profile_table["send_sci"] == "false" + if "rekey_period" in profile_map: + assert profile_table["rekey_period"] == str(profile_map["rekey_period"]) + + def test_macsec_invalid_profile(self, mock_cfgdb): + runner = CliRunner() + db = Db() + db.cfgdb = mock_cfgdb + + # Loss primary cak and primary ckn + result = runner.invoke(macsec.macsec.commands["profile"].commands["add"], ["test"], obj=db) + assert result.exit_code != 0 + + # Invalid primary cak + result = runner.invoke(macsec.macsec.commands["profile"].commands["add"], ["test", "--primary_cak=abcdfghjk90123456789012345678912","--primary_ckn=01234567890123456789012345678912", "--cipher_suite=GCM-AES-128"], obj=db) + assert result.exit_code != 0 + + # Invalid primary cak length + result = runner.invoke(macsec.macsec.commands["profile"].commands["add"], ["test", "--primary_cak=01234567890123456789012345678912","--primary_ckn=01234567890123456789012345678912", "--cipher_suite=GCM-AES-256"], obj=db) + assert result.exit_code != 0 + + + def test_macsec_port(self, mock_cfgdb): + runner = CliRunner() + db = Db() + db.cfgdb = mock_cfgdb + + result = runner.invoke(macsec.macsec.commands["profile"].commands["add"], ["test", "--primary_cak=01234567890123456789012345678912","--primary_ckn=01234567890123456789012345678912"], obj=db) + assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + result = runner.invoke(macsec.macsec.commands["port"].commands["add"], ["Ethernet0", "test"], obj=db) + assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + port_table = db.cfgdb.get_entry("PORT", "Ethernet0") + assert port_table + assert port_table["macsec"] == "test" + + result = runner.invoke(macsec.macsec.commands["port"].commands["del"], ["Ethernet0"], obj=db) + assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + port_table = db.cfgdb.get_entry("PORT", "Ethernet0") + assert not port_table["macsec"] + + + def test_macsec_invalid_operation(self, mock_cfgdb): + runner = CliRunner() + db = Db() + db.cfgdb = mock_cfgdb + + # Enable nonexisted profile + result = runner.invoke(macsec.macsec.commands["port"].commands["add"], ["Ethernet0", "test"], obj=db) + assert result.exit_code != 0 + + # Delete nonexisted profile + result = runner.invoke(macsec.macsec.commands["profile"].commands["del"], ["test"], obj=db) + assert result.exit_code != 0 + + result = runner.invoke(macsec.macsec.commands["profile"].commands["add"], ["test", "--primary_cak=01234567890123456789012345678912","--primary_ckn=01234567890123456789012345678912"], obj=db) + assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + # Repeat add profile + result = runner.invoke(macsec.macsec.commands["profile"].commands["add"], ["test", "--primary_cak=01234567890123456789012345678912","--primary_ckn=01234567890123456789012345678912"], obj=db) + assert result.exit_code != 0 diff --git a/dockers/docker-macsec/cli/config/plugins/macsec.py b/dockers/docker-macsec/cli/config/plugins/macsec.py index ad0b68f48934..53578f0c7e8c 100644 --- a/dockers/docker-macsec/cli/config/plugins/macsec.py +++ b/dockers/docker-macsec/cli/config/plugins/macsec.py @@ -103,7 +103,6 @@ def add_profile(db, profile, priority, cipher_suite, primary_cak, primary_ckn, p Add MACsec profile """ ctx = click.get_current_context() - profile_entry = db.cfgdb.get_entry('MACSEC_PROFILE', profile) if not len(profile_entry) == 0: ctx.fail("{} already exists".format(profile)) @@ -169,7 +168,7 @@ def del_profile(db, profile): def register(cli): - cli.add_command(macsec) + cli.commands['macsec'].add_command(macsec) if __name__ == '__main__': diff --git a/rules/docker-macsec.mk b/rules/docker-macsec.mk index 64251c686f6c..c364f60a60d1 100644 --- a/rules/docker-macsec.mk +++ b/rules/docker-macsec.mk @@ -17,15 +17,13 @@ $(DOCKER_MACSEC)_LOAD_DOCKERS += $(DOCKER_CONFIG_ENGINE_BULLSEYE) $(DOCKER_MACSEC)_INSTALL_PYTHON_WHEELS = $(SONIC_UTILITIES_PY3) $(DOCKER_MACSEC)_INSTALL_DEBS = $(PYTHON3_SWSSCOMMON) $(LIBYANG_PY3) -SONIC_DOCKER_IMAGES += $(DOCKER_MACSEC) -# ifeq ($(INCLUDE_MACSEC), y) -# SONIC_INSTALL_DOCKER_IMAGES += $(DOCKER_MACSEC) -# endif +ifeq ($(INCLUDE_MACSEC), y) +SONIC_INSTALL_DOCKER_IMAGES += $(DOCKER_MACSEC) +endif -SONIC_DOCKER_DBG_IMAGES += $(DOCKER_MACSEC_DBG) -# ifeq ($(INCLUDE_MACSEC), y) -# SONIC_INSTALL_DOCKER_DBG_IMAGES += $(DOCKER_MACSEC_DBG) -# endif +ifeq ($(INCLUDE_MACSEC), y) +SONIC_INSTALL_DOCKER_DBG_IMAGES += $(DOCKER_MACSEC_DBG) +endif ifeq ($(INCLUDE_MACSEC),y) ifeq ($(INSTALL_DEBUG_TOOLS),y) @@ -44,8 +42,6 @@ $(DOCKER_MACSEC)_RUN_OPT += -v /host/warmboot:/var/warmboot $(DOCKER_MACSEC)_CLI_CONFIG_PLUGIN = /cli/config/plugins/macsec.py -# $(DOCKER_MACSEC)_CLI_SHOW_PLUGIN = /cli/show/plugins/show_dhcp_relay.py - $(DOCKER_MACSEC)_FILES += $(SUPERVISOR_PROC_EXIT_LISTENER_SCRIPT) SONIC_BULLSEYE_DOCKERS += $(DOCKER_MACSEC)