Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added validation, error handling and some debug messages for credentials #416

Merged
merged 3 commits into from
May 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

### Added
- Validate port list to be sent to openvas. [#411](https://github.com/greenbone/ospd-openvas/pull/411)
- Validate credentials to be sent to openvas. [#416](https://github.com/greenbone/ospd-openvas/pull/416)

### Changed
### Removed
Expand Down
21 changes: 19 additions & 2 deletions ospd_openvas/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -1264,10 +1264,27 @@ def exec_scan(self, scan_id: str):

# Set credentials
if not scan_prefs.prepare_credentials_for_openvas():
error = (
'All authentifications contain errors.'
+ 'Starting unauthenticated scan instead.'
)
self.add_scan_error(
scan_id, name='', host='', value='Malformed credential.'
scan_id,
name='',
host='',
value=error,
)
do_not_launch = True
logger.error(error)
errors = scan_prefs.get_error_messages()
for e in errors:
error = 'Malformed credential. ' + e
self.add_scan_error(
scan_id,
name='',
host='',
value=error,
)
logger.error(error)

if not scan_prefs.prepare_plugins_for_openvas():
self.add_scan_error(
Expand Down
119 changes: 103 additions & 16 deletions ospd_openvas/preferencehandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,20 @@ def __init__(

self.nvti = nvticache

self.errors = []

def prepare_scan_id_for_openvas(self):
"""Create the openvas scan id and store it in the redis kb.
Return the openvas scan_id.
"""
self.kbdb.add_scan_id(self.scan_id)

def get_error_messages(self) -> List:
"""Returns the Error List and reset it"""
ret = self.errors
self.errors = []
return ret

@property
def target_options(self) -> Dict:
"""Return target options from Scan collection"""
Expand Down Expand Up @@ -577,8 +585,7 @@ def prepare_scan_params_for_openvas(self, ospd_params: Dict[str, Dict]):
if prefs_val:
self.kbdb.add_scan_preferences(self.scan_id, prefs_val)

@staticmethod
def build_credentials_as_prefs(credentials: Dict) -> List[str]:
def build_credentials_as_prefs(self, credentials: Dict) -> List[str]:
"""Parse the credential dictionary.
Arguments:
credentials: Dictionary with the credentials.
Expand All @@ -595,23 +602,36 @@ def build_credentials_as_prefs(credentials: Dict) -> List[str]:
username = cred_params.get('username', '')
password = cred_params.get('password', '')

# Check service ssh
if service == 'ssh':
port = cred_params.get('port', '')
cred_prefs_list.append('auth_port_ssh|||' + '{0}'.format(port))
cred_prefs_list.append(
OID_SSH_AUTH
+ ':1:'
+ 'entry:SSH login '
+ 'name:|||{0}'.format(username)
)
# For ssh check the Port
port = cred_params.get('port', '22')
if not port:
port = '22'
warning = (
"Missing port number for ssh credentials."
+ " Using default port 22."
)
logger.warning(warning)
elif not port.isnumeric():
self.errors.append(
"Port for SSH '" + port + "' is not a valid number."
)
continue
elif int(port) > 65535 or int(port) < 1:
self.errors.append(
"Port for SSH is out of range (1-65535): " + port
)
continue
# For ssh check the credential type
if cred_type == 'up':
cred_prefs_list.append(
OID_SSH_AUTH
+ ':3:'
+ 'password:SSH password '
+ '(unsafe!):|||{0}'.format(password)
)
else:
elif cred_type == 'usk':
private = cred_params.get('private', '')
cred_prefs_list.append(
OID_SSH_AUTH
Expand All @@ -625,7 +645,30 @@ def build_credentials_as_prefs(credentials: Dict) -> List[str]:
+ 'file:SSH private key:|||'
+ '{0}'.format(private)
)
if service == 'smb':
elif cred_type:
self.errors.append(
"Unknown Credential Type for SSH: "
+ cred_type
+ ". Use 'up' for Username + Password"
+ " or 'usk' for Username + SSH Key."
)
continue
else:
self.errors.append(
"Missing Credential Type for SSH."
+ " Use 'up' for Username + Password"
+ " or 'usk' for Username + SSH Key."
)
continue
cred_prefs_list.append('auth_port_ssh|||' + '{0}'.format(port))
cred_prefs_list.append(
OID_SSH_AUTH
+ ':1:'
+ 'entry:SSH login '
+ 'name:|||{0}'.format(username)
)
# Check servic smb
elif service == 'smb':
cred_prefs_list.append(
OID_SMB_AUTH
+ ':1:entry'
Expand All @@ -637,7 +680,8 @@ def build_credentials_as_prefs(credentials: Dict) -> List[str]:
+ 'password:SMB password:|||'
+ '{0}'.format(password)
)
if service == 'esxi':
# Check service esxi
elif service == 'esxi':
cred_prefs_list.append(
OID_ESXI_AUTH
+ ':1:entry:'
Expand All @@ -650,13 +694,47 @@ def build_credentials_as_prefs(credentials: Dict) -> List[str]:
+ 'password:ESXi login password:|||'
+ '{0}'.format(password)
)

if service == 'snmp':
# Check service snmp
elif service == 'snmp':
community = cred_params.get('community', '')
auth_algorithm = cred_params.get('auth_algorithm', '')
privacy_password = cred_params.get('privacy_password', '')
privacy_algorithm = cred_params.get('privacy_algorithm', '')

if not privacy_algorithm:
if privacy_password:
self.errors.append(
"When no privacy algorithm is used, the privacy"
+ " password also has to be empty."
)
continue
elif (
not privacy_algorithm == "aes"
and not privacy_algorithm == "des"
):
self.errors.append(
"Unknows privacy algorithm used: "
+ privacy_algorithm
+ ". Use 'aes', 'des' or '' (none)."
)
continue

if not auth_algorithm:
self.errors.append(
"Missing authentification algorithm for SNMP."
+ " Use 'md5' or 'sha1'."
)
continue
elif (
not auth_algorithm == "md5" and not auth_algorithm == "sha1"
):
self.errors.append(
"Unknown authentification algorithm: "
+ auth_algorithm
+ ". Use 'md5' or 'sha1'."
)
continue

cred_prefs_list.append(
OID_SNMP_AUTH
+ ':1:'
Expand Down Expand Up @@ -691,20 +769,29 @@ def build_credentials_as_prefs(credentials: Dict) -> List[str]:
+ 'radio:SNMPv3 Privacy Algorithm:|||'
+ '{0}'.format(privacy_algorithm)
)
elif service:
self.errors.append(
"Unknown service type for credential: " + service
)
else:
self.errors.append("Missing service type for credential.")

return cred_prefs_list

def prepare_credentials_for_openvas(self) -> bool:
"""Get the credentials from the scan collection and store them
in the kb."""
logger.debug("Looking for given Credentials...")
credentials = self.scan_collection.get_credentials(self.scan_id)
if credentials:
cred_prefs = self.build_credentials_as_prefs(credentials)
if cred_prefs:
self.kbdb.add_credentials_to_scan_preferences(
self.scan_id, cred_prefs
)

logger.debug("Credentials added to the kb.")
else:
logger.debug("No credentials found.")
if credentials and not cred_prefs:
return False

Expand Down
Loading