Skip to content

Commit fd35308

Browse files
authored
Fix: queued scan count (#850)
Wait 0, 10, 20, 30, ..., or 90 miliseconds when add a new scan in the queue. This try to avoid a race condition counting the amount of queued scans when many task are started at the same time. This leads into wrong queue position number shown in the logs and could jump the max_queued_scans setting in some cases.
1 parent 52ef7a0 commit fd35308

13 files changed

+268
-230
lines changed

ospd/command/command.py

+85-81
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343

4444

4545
class BaseCommand(metaclass=InitSubclassMeta):
46-
4746
name = None
4847
description = None
4948
attributes = None
@@ -544,99 +543,104 @@ def handle_xml(self, xml: Element) -> bytes:
544543
Return:
545544
Response string for <start_scan> command.
546545
"""
546+
with self._daemon.scan_collection.scan_collection_lock:
547+
current_queued_scans = self._daemon.get_count_queued_scans()
548+
if (
549+
self._daemon.max_queued_scans
550+
and current_queued_scans >= self._daemon.max_queued_scans
551+
):
552+
logger.info(
553+
'Maximum number of queued scans set to %d reached.',
554+
self._daemon.max_queued_scans,
555+
)
556+
raise OspdCommandError(
557+
'Maximum number of queued scans set to '
558+
f'{str(self._daemon.max_queued_scans)} reached.',
559+
'start_scan',
560+
)
547561

548-
current_queued_scans = self._daemon.get_count_queued_scans()
549-
if (
550-
self._daemon.max_queued_scans
551-
and current_queued_scans >= self._daemon.max_queued_scans
552-
):
553-
logger.info(
554-
'Maximum number of queued scans set to %d reached.',
555-
self._daemon.max_queued_scans,
556-
)
557-
raise OspdCommandError(
558-
'Maximum number of queued scans set to '
559-
f'{str(self._daemon.max_queued_scans)} reached.',
560-
'start_scan',
561-
)
562-
563-
target_str = xml.get('target')
564-
ports_str = xml.get('ports')
565-
566-
# For backward compatibility, if target and ports attributes are set,
567-
# <targets> element is ignored.
568-
if target_str is None or ports_str is None:
569-
target_element = xml.find('targets/target')
570-
if target_element is None:
571-
raise OspdCommandError('No targets or ports', 'start_scan')
562+
target_str = xml.get('target')
563+
ports_str = xml.get('ports')
564+
565+
# For backward compatibility, if target and ports attributes
566+
# are set, <targets> element is ignored.
567+
if target_str is None or ports_str is None:
568+
target_element = xml.find('targets/target')
569+
if target_element is None:
570+
raise OspdCommandError('No targets or ports', 'start_scan')
571+
else:
572+
scan_target = OspRequest.process_target_element(
573+
target_element
574+
)
572575
else:
573-
scan_target = OspRequest.process_target_element(target_element)
574-
else:
575-
scan_target = {
576-
'hosts': target_str,
577-
'ports': ports_str,
578-
'credentials': {},
579-
'exclude_hosts': '',
580-
'finished_hosts': '',
581-
'options': {},
582-
}
583-
logger.warning(
584-
"Legacy start scan command format is being used, which "
585-
"is deprecated since 20.08. Please read the documentation "
586-
"for start scan command."
587-
)
576+
scan_target = {
577+
'hosts': target_str,
578+
'ports': ports_str,
579+
'credentials': {},
580+
'exclude_hosts': '',
581+
'finished_hosts': '',
582+
'options': {},
583+
}
584+
logger.warning(
585+
"Legacy start scan command format is being used, which "
586+
"is deprecated since 20.08. Please read the documentation "
587+
"for start scan command."
588+
)
588589

589-
scan_id = xml.get('scan_id')
590-
if scan_id is not None and scan_id != '' and not valid_uuid(scan_id):
591-
raise OspdCommandError('Invalid scan_id UUID', 'start_scan')
590+
scan_id = xml.get('scan_id')
591+
if (
592+
scan_id is not None
593+
and scan_id != ''
594+
and not valid_uuid(scan_id)
595+
):
596+
raise OspdCommandError('Invalid scan_id UUID', 'start_scan')
597+
598+
if xml.get('parallel'):
599+
logger.warning(
600+
"parallel attribute of start_scan will be ignored, sice "
601+
"parallel scan is not supported by OSPd."
602+
)
592603

593-
if xml.get('parallel'):
594-
logger.warning(
595-
"parallel attribute of start_scan will be ignored, sice "
596-
"parallel scan is not supported by OSPd."
604+
scanner_params = xml.find('scanner_params')
605+
if scanner_params is None:
606+
scanner_params = {}
607+
608+
# params are the parameters we got from the <scanner_params> XML.
609+
params = self._daemon.preprocess_scan_params(scanner_params)
610+
611+
# VTS is an optional element. If present should not be empty.
612+
vt_selection = {} # type: Dict
613+
scanner_vts = xml.find('vt_selection')
614+
if scanner_vts is not None:
615+
if len(scanner_vts) == 0:
616+
raise OspdCommandError('VTs list is empty', 'start_scan')
617+
else:
618+
vt_selection = OspRequest.process_vts_params(scanner_vts)
619+
620+
scan_params = self._daemon.process_scan_params(params)
621+
scan_id_aux = scan_id
622+
scan_id = self._daemon.create_scan(
623+
scan_id, scan_target, scan_params, vt_selection
597624
)
598625

599-
scanner_params = xml.find('scanner_params')
600-
if scanner_params is None:
601-
scanner_params = {}
602-
603-
# params are the parameters we got from the <scanner_params> XML.
604-
params = self._daemon.preprocess_scan_params(scanner_params)
626+
if not scan_id:
627+
id_ = Element('id')
628+
id_.text = scan_id_aux
629+
return simple_response_str('start_scan', 100, 'Continue', id_)
605630

606-
# VTS is an optional element. If present should not be empty.
607-
vt_selection = {} # type: Dict
608-
scanner_vts = xml.find('vt_selection')
609-
if scanner_vts is not None:
610-
if len(scanner_vts) == 0:
611-
raise OspdCommandError('VTs list is empty', 'start_scan')
612-
else:
613-
vt_selection = OspRequest.process_vts_params(scanner_vts)
614-
615-
scan_params = self._daemon.process_scan_params(params)
616-
scan_id_aux = scan_id
617-
scan_id = self._daemon.create_scan(
618-
scan_id, scan_target, scan_params, vt_selection
619-
)
631+
logger.info(
632+
'Scan %s added to the queue in position %d.',
633+
scan_id,
634+
self._daemon.get_count_queued_scans() + 1,
635+
)
620636

621-
if not scan_id:
622637
id_ = Element('id')
623-
id_.text = scan_id_aux
624-
return simple_response_str('start_scan', 100, 'Continue', id_)
625-
626-
logger.info(
627-
'Scan %s added to the queue in position %d.',
628-
scan_id,
629-
current_queued_scans + 1,
630-
)
631-
632-
id_ = Element('id')
633-
id_.text = scan_id
638+
id_.text = scan_id
634639

635-
return simple_response_str('start_scan', 200, 'OK', id_)
640+
return simple_response_str('start_scan', 200, 'OK', id_)
636641

637642

638643
class GetMemoryUsage(BaseCommand):
639-
640644
name = "get_memory_usage"
641645
description = "print the memory consumption of all processes"
642646
attributes = {

ospd/ospd.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -542,10 +542,10 @@ def handle_client_stream(self, stream: Stream) -> None:
542542
except (AttributeError, ValueError) as message:
543543
logger.error(message)
544544
return
545-
except (ssl.SSLError) as exception:
545+
except ssl.SSLError as exception:
546546
logger.debug('Error: %s', exception)
547547
break
548-
except (socket.timeout) as exception:
548+
except socket.timeout as exception:
549549
logger.debug('Request timeout: %s', exception)
550550
break
551551

ospd/resultlist.py

-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,6 @@ def add_result_to_list(
138138
qod: str = '',
139139
uri: str = '',
140140
) -> None:
141-
142141
result = OrderedDict() # type: Dict
143142
result['type'] = result_type
144143
result['name'] = name

ospd/scan.py

+4
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,13 @@ def __init__(self, file_storage_dir: str) -> None:
7777
) # type: Optional[multiprocessing.managers.SyncManager]
7878
self.scans_table = dict() # type: Dict
7979
self.file_storage_dir = file_storage_dir
80+
self.scan_collection_lock = (
81+
None
82+
) # type: Optional[multiprocessing.managers.Lock]
8083

8184
def init(self):
8285
self.data_manager = multiprocessing.Manager()
86+
self.scan_collection_lock = self.data_manager.RLock()
8387

8488
def add_result(
8589
self,

ospd_openvas/daemon.py

-4
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,6 @@ def __init__(
519519
self._mqtt_broker_port = mqtt_broker_port
520520

521521
def init(self, server: BaseServer) -> None:
522-
523522
notus_handler = NotusResultHandler(self.report_results)
524523

525524
if self._mqtt_broker_address:
@@ -610,7 +609,6 @@ def get_feed_info(self) -> Dict[str, Any]:
610609
feed_info = {}
611610
with feed_info_file.open(encoding='utf-8') as fcontent:
612611
for line in fcontent:
613-
614612
try:
615613
key, value = line.split('=', 1)
616614
except ValueError:
@@ -1000,7 +998,6 @@ def report_results(self, results: list, scan_id: str) -> bool:
1000998

1001999
@staticmethod
10021000
def is_openvas_process_alive(openvas_process: psutil.Popen) -> bool:
1003-
10041001
try:
10051002
if openvas_process.status() == psutil.STATUS_ZOMBIE:
10061003
logger.debug("Process is a Zombie, waiting for it to clean up")
@@ -1191,7 +1188,6 @@ def exec_scan(self, scan_id: str):
11911188

11921189
got_results = False
11931190
while True:
1194-
11951191
openvas_process_is_alive = self.is_openvas_process_alive(
11961192
openvas_process
11971193
)

ospd_openvas/notus.py

-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ class Cache:
6767
def __init__(
6868
self, main_db: MainDB, prefix: str = "internal/notus/advisories"
6969
):
70-
7170
self._main_db = main_db
7271
# Check if it was previously uploaded
7372
self.ctx, _ = OpenvasDB.find_database_by_pattern(

ospd_openvas/nvticache.py

-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838

3939

4040
class NVTICache(BaseDB):
41-
4241
QOD_TYPES = {
4342
'exploit': '100',
4443
'remote_vul': '99',

0 commit comments

Comments
 (0)