-
Notifications
You must be signed in to change notification settings - Fork 4.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[cherry-pick]Add security hub API test case (#19377)
Add security hub API test case Fix #19269 Signed-off-by: Yang Jiao <[email protected]>
- Loading branch information
1 parent
0107eb3
commit 3c62b75
Showing
5 changed files
with
233 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
import base | ||
from v2_swagger_client.rest import ApiException | ||
from urllib.parse import quote | ||
|
||
|
||
class SecurityHub(base.Base): | ||
|
||
|
||
def __init__(self): | ||
super(SecurityHub, self).__init__(api_type="securityhub") | ||
|
||
|
||
def get_security_summary(self, with_dangerous_cve=True, with_dangerous_artifact=True, expect_status_code=200, expect_response_body=None, **kwargs): | ||
try: | ||
return_data, status_code, _ = self._get_client(**kwargs).get_security_summary_with_http_info(with_dangerous_cve=with_dangerous_cve, with_dangerous_artifact=with_dangerous_artifact) | ||
except ApiException as e: | ||
base._assert_status_code(expect_status_code, e.status) | ||
if expect_response_body is not None: | ||
base._assert_status_body(expect_response_body, e.body) | ||
return | ||
base._assert_status_code(expect_status_code, status_code) | ||
return return_data | ||
|
||
|
||
def list_vulnerabilities(self, q=None, tune_count=True, with_tag=True, page=1, page_size=10, expect_status_code=200, expect_response_body=None, **kwargs): | ||
try: | ||
if q is not None: | ||
q = quote(q) | ||
return_data, status_code, _ = self._get_client(**kwargs).list_vulnerabilities_with_http_info(q=q, tune_count=tune_count, with_tag=with_tag, page=page, page_size=page_size) | ||
else: | ||
return_data, status_code, _ = self._get_client(**kwargs).list_vulnerabilities_with_http_info(tune_count=tune_count, with_tag=with_tag, page=page, page_size=page_size) | ||
except ApiException as e: | ||
base._assert_status_code(expect_status_code, e.status) | ||
if expect_response_body is not None: | ||
base._assert_status_body(expect_response_body, e.body) | ||
return | ||
base._assert_status_code(expect_status_code, status_code) | ||
return return_data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,187 @@ | ||
import unittest | ||
|
||
from testutils import harbor_server, ADMIN_CLIENT, suppress_urllib3_warning | ||
from library.project import Project | ||
from library.user import User | ||
from library.repository import push_image_to_project | ||
from library.scan import Scan | ||
from library.artifact import Artifact | ||
from library.securityhub import SecurityHub | ||
|
||
class TestSecurityHub(unittest.TestCase): | ||
|
||
|
||
@suppress_urllib3_warning | ||
def setUp(self): | ||
self.project = Project() | ||
self.user = User() | ||
self.scan = Scan() | ||
self.artifact = Artifact() | ||
self.securityhub = SecurityHub() | ||
self.image = "ghcr.io/goharbor/notary-server-photon" | ||
self.new_image = "notary-server-photon" | ||
self.tag = "v2.2.0" | ||
self.digest = "sha256:379bf2c7cd55b4214ced7f9a885b46f81992eb01abebfd67693f5cb394611ad1" | ||
|
||
|
||
def testSecurityHub(self): | ||
""" | ||
Test case: | ||
Security Hub | ||
Test step and expected result: | ||
1. Create a new user(UA); | ||
2. Create a new project(PA) by user(UA); | ||
3. Push a new image(IA) in project(PA) by user(UA); | ||
4. Send scan image command and get tag(TA) information to check scan result, it should be finished; | ||
5. Get vulnerability system summary; | ||
6. Check the vulnerability system summary is correct; | ||
7. Check the Get the vulnerability list API; | ||
8. Check Get Vulnerability List API Search by severity; | ||
9. Check Get Vulnerability List API Search by project name; | ||
10. Check Get Vulnerability List API Search by repository name; | ||
11. Check Get Vulnerability List API Search by tag; | ||
12. Check Get Vulnerability List API Search by digest; | ||
13. Check Get Vulnerability List API Search by package; | ||
14. Check Get Vulnerability List API Search by cve_id; | ||
15. Check Get Vulnerability List API Search by cvss3; | ||
16. Check Get Vulnerability List API Search by all options; | ||
""" | ||
url = ADMIN_CLIENT["endpoint"] | ||
user_password = "Aa123456" | ||
|
||
# 1. Create user(UA) | ||
_, user_name = self.user.create_user(user_password=user_password, **ADMIN_CLIENT) | ||
user_client = dict(endpoint=url, username=user_name, password=user_password, with_scan_overview = True) | ||
|
||
# 2. Create private project(PA) by user(UA) | ||
project_id, project_name = self.project.create_project(metadata={"public": "false"}, **user_client) | ||
|
||
# 3. Push a new image(IA) in project(PA) by user(UA) | ||
repository_name, tag = push_image_to_project(project_name, harbor_server, user_name, user_password, self.image, self.tag, new_image=self.new_image) | ||
|
||
# 4. Send scan image command and get tag(TA) information to check scan result, it should be finished | ||
self.scan.scan_artifact(project_name, self.new_image, tag, **user_client) | ||
self.artifact.check_image_scan_result(project_name, self.new_image, tag, **user_client) | ||
|
||
# 5. Get vulnerability system summary | ||
security_summary = self.securityhub.get_security_summary(**ADMIN_CLIENT) | ||
|
||
# 6. Check the vulnerability system summary is correct | ||
self.check_security_summary(security_summary, repository_name) | ||
|
||
# 7. Check the Get the vulnerability list API | ||
vulnerabilities = self.securityhub.list_vulnerabilities(**ADMIN_CLIENT) | ||
self.check_vulnerabilities(vulnerabilities) | ||
|
||
# 8. Check Get Vulnerability List API Search by severity | ||
vulnerabilities = self.securityhub.list_vulnerabilities(q="severity=Critical", **ADMIN_CLIENT) | ||
self.check_vulnerabilities(vulnerabilities, severity="Critical") | ||
vulnerabilities = self.securityhub.list_vulnerabilities(q="severity=High", **ADMIN_CLIENT) | ||
self.check_vulnerabilities(vulnerabilities, severity="High") | ||
vulnerabilities = self.securityhub.list_vulnerabilities(q="severity=Medium", **ADMIN_CLIENT) | ||
self.check_vulnerabilities(vulnerabilities, severity="Medium") | ||
vulnerabilities = self.securityhub.list_vulnerabilities(q="severity=Low", **ADMIN_CLIENT) | ||
self.check_vulnerabilities(vulnerabilities, severity="Low") | ||
|
||
# 9. Check Get Vulnerability List API Search by project name | ||
vulnerabilities = self.securityhub.list_vulnerabilities(q="project_id=%s" % project_id, **ADMIN_CLIENT) | ||
self.check_vulnerabilities(vulnerabilities, project_id=project_id) | ||
vulnerability = vulnerabilities[0] | ||
|
||
# 10. Check Get Vulnerability List API Search by repository name | ||
vulnerabilities = self.securityhub.list_vulnerabilities(q="repository_name=%s" % repository_name, **ADMIN_CLIENT) | ||
self.check_vulnerabilities(vulnerabilities, repository_name=repository_name) | ||
|
||
# 11. Check Get Vulnerability List API Search by tag | ||
vulnerabilities = self.securityhub.list_vulnerabilities(q="tag=%s" % tag, **ADMIN_CLIENT) | ||
self.check_vulnerabilities(vulnerabilities, tag=tag) | ||
|
||
# 12. Check Get Vulnerability List API Search by digest | ||
vulnerabilities = self.securityhub.list_vulnerabilities(q="digest=%s" % self.digest, **ADMIN_CLIENT) | ||
self.check_vulnerabilities(vulnerabilities, digest=self.digest) | ||
|
||
# 13. Check Get Vulnerability List API Search by package | ||
vulnerabilities = self.securityhub.list_vulnerabilities(q="package=%s" % vulnerability.package, **ADMIN_CLIENT) | ||
self.check_vulnerabilities(vulnerabilities, package=vulnerability.package) | ||
|
||
# 14. Check Get Vulnerability List API Search by cve_id | ||
vulnerabilities = self.securityhub.list_vulnerabilities(q="cve_id=%s" % vulnerability.cve_id, **ADMIN_CLIENT) | ||
self.check_vulnerabilities(vulnerabilities, cve_id=vulnerability.cve_id) | ||
|
||
# 15. Check Get Vulnerability List API Search by cvss3 | ||
vulnerabilities = self.securityhub.list_vulnerabilities(q="cvss_score_v3=[%s~%s]" % (vulnerability.cvss_v3_score, vulnerability.cvss_v3_score), **ADMIN_CLIENT) | ||
self.check_vulnerabilities(vulnerabilities, cvss3_from=vulnerability.cvss_v3_score, cvss3_to=vulnerability.cvss_v3_score) | ||
|
||
# 16. Check Get Vulnerability List API Search by all options | ||
vulnerabilities = self.securityhub.list_vulnerabilities(q="severity=%s,project_id=%s,repository_name=%s,tag=%s,digest=%s,package=%s,cve_id=%s,cvss_score_v3=[%s~%s]" % | ||
(vulnerability.severity, vulnerability.project_id, vulnerability.repository_name, vulnerability.tags[0], vulnerability.digest, vulnerability.package, vulnerability.cve_id, vulnerability.cvss_v3_score, vulnerability.cvss_v3_score), **ADMIN_CLIENT) | ||
self.check_vulnerabilities(vulnerabilities, severity=vulnerability.severity, project_id=vulnerability.project_id, repository_name=vulnerability.repository_name, tag=vulnerability.tags[0], digest=vulnerability.digest, package=vulnerability.package, cve_id=vulnerability.cve_id, cvss3_from=vulnerability.cvss_v3_score, cvss3_to=vulnerability.cvss_v3_score) | ||
|
||
def check_security_summary(self, security_summary, repository_name): | ||
# Check the summary is correct | ||
self.assertTrue(security_summary.critical_cnt > 0) | ||
self.assertTrue(security_summary.high_cnt > 0) | ||
self.assertTrue(security_summary.medium_cnt > 0) | ||
self.assertTrue(security_summary.low_cnt > 0) | ||
self.assertTrue(security_summary.fixable_cnt > 0) | ||
self.assertTrue(security_summary.scanned_cnt > 0) | ||
self.assertTrue(security_summary.total_artifact > 0) | ||
self.assertTrue(security_summary.total_vuls > 0) | ||
# Check the dangerous artifacts is correct | ||
dangerous_artifacts = security_summary.dangerous_artifacts | ||
self.assertTrue(0 < len(dangerous_artifacts) <= 5) | ||
artifact_sorted_list = sorted(dangerous_artifacts, key=lambda artifact: (artifact.critical_cnt, artifact.high_cnt, artifact.medium_cnt), reverse=True) | ||
self.assertListEqual(dangerous_artifacts, artifact_sorted_list) | ||
dangerous_artifacts_dict = {item.repository_name: item for item in dangerous_artifacts} | ||
self.assertIn(repository_name, dangerous_artifacts_dict.keys()) | ||
for key, value in dangerous_artifacts_dict.items(): | ||
if key == repository_name: | ||
self.assertEqual(value.digest, self.digest) | ||
self.assertTrue(value.critical_cnt > 0) | ||
self.assertTrue(value.high_cnt > 0) | ||
self.assertTrue(value.medium_cnt > 0) | ||
break | ||
# Check the dangerous cves is correct | ||
dangerous_cves = security_summary.dangerous_cves | ||
self.assertTrue(0 < len(dangerous_cves) <= 5) | ||
cve_sorted_list = sorted(dangerous_cves, key=lambda cve: cve.cvss_score_v3) | ||
self.assertListEqual(dangerous_cves, cve_sorted_list) | ||
|
||
|
||
def check_vulnerabilities(self, vulnerabilities, severity=None, cve_id=None, cvss3_from=0.0, cvss3_to=10.0, project_id=None, repository_name=None, package=None, tag=None, digest=None): | ||
for vulnerability in vulnerabilities: | ||
self.assertEqual(vulnerability.severity, severity) if severity is not None else self.assertIsNotNone(vulnerability.severity) | ||
self.assertEqual(vulnerability.cve_id, cve_id) if cve_id is not None else self.assertIsNotNone(vulnerability.cve_id) | ||
self.assertEqual(vulnerability.project_id, project_id) if project_id is not None else self.assertIsNotNone(vulnerability.project_id) | ||
self.assertEqual(vulnerability.repository_name, repository_name) if repository_name is not None else self.assertIsNotNone(vulnerability.repository_name) | ||
self.assertEqual(vulnerability.package, package) if package is not None else self.assertIsNotNone(vulnerability.package) | ||
self.assertEqual(vulnerability.tags[0], tag) if tag is not None else self.assertIsNotNone(vulnerability.tags[0]) | ||
self.assertEqual(vulnerability.digest, digest) if digest is not None else self.assertIsNotNone(vulnerability.digest) | ||
self.assertTrue(cvss3_from <= vulnerability.cvss_v3_score <= cvss3_to) | ||
self.assertIsNotNone(vulnerability.desc) | ||
self.assertIsNotNone(vulnerability.links) | ||
self.assertIsNotNone(vulnerability.version) | ||
|
||
|
||
def testSecurityHubAPIValidate(self): | ||
""" | ||
Test case: | ||
Log Rotaion Permission API | ||
Test step and expected result: | ||
1. Create a new user(UA); | ||
2. User(UA) should not have permission to call get vulnerability system summary API; | ||
3. User(UA) should not have permission to call get Vulnerability List API; | ||
""" | ||
url = ADMIN_CLIENT["endpoint"] | ||
user_password = "Aa123456" | ||
# 1. Create user(UA) | ||
_, user_name = self.user.create_user(user_password=user_password, **ADMIN_CLIENT) | ||
user_client = dict(endpoint=url, username=user_name, password=user_password) | ||
# 2. User(UA) should not have permission to call get vulnerability system summary API | ||
self.securityhub.get_security_summary(expect_status_code=403, expect_response_body="FORBIDDEN", **user_client) | ||
# 3. User(UA) should not have permission to call get Vulnerability List API | ||
self.securityhub.list_vulnerabilities(expect_status_code=403, expect_response_body="FORBIDDEN", **user_client) | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters