# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
from enum import Enum
import platform
import re
import os
import sys
import subprocess
import shlex
from collections import OrderedDict
import xml.etree.ElementTree as ET
import logging
import threading
import time
import shutil
import json
from pytest import ExitCode
from twisterlib.reports import ReportStatus
from twisterlib.error import ConfigurationError, StatusAttributeError
from twisterlib.environment import ZEPHYR_BASE, PYTEST_PLUGIN_INSTALLED
from twisterlib.handlers import Handler, terminate_process, SUPPORTED_SIMS_IN_PYTEST
from twisterlib.statuses import TwisterStatus
from twisterlib.testinstance import TestInstance
logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)
_WINDOWS = platform.system() == 'Windows'
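# Illustrative ztest result line matched by result_re below (hypothetical
# console output, shown only to document the capture groups):
#   "PASS - test_thread_create in 0.015 seconds"
# -> groups: ('PASS', 'test_', 'thread_create', '0.015')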
result_re = re.compile(r".*(PASS|FAIL|SKIP) - (test_)?(\S*) in (\d*[.,]?\d*) seconds")
class Harness:
GCOV_START = "GCOV_COVERAGE_DUMP_START"
GCOV_END = "GCOV_COVERAGE_DUMP_END"
FAULT = "ZEPHYR FATAL ERROR"
RUN_PASSED = "PROJECT EXECUTION SUCCESSFUL"
RUN_FAILED = "PROJECT EXECUTION FAILED"
run_id_pattern = r"RunID: (?P<run_id>.*)"
def __init__(self):
self._status = TwisterStatus.NONE
self.reason = None
self.type = None
self.regex = []
self.matches = OrderedDict()
self.ordered = True
self.id = None
self.fail_on_fault = True
self.fault = False
self.capture_coverage = False
self.next_pattern = 0
self.record = None
self.record_pattern = None
self.record_as_json = None
self.recording = []
self.ztest = False
self.detected_suite_names = []
self.run_id = None
self.matched_run_id = False
self.run_id_exists = False
self.instance: TestInstance | None = None
self.testcase_output = ""
self._match = False
@property
def status(self) -> TwisterStatus:
return self._status
@status.setter
def status(self, value: TwisterStatus) -> None:
# Check for illegal assignments by value
try:
key = value.name if isinstance(value, Enum) else value
self._status = TwisterStatus[key]
except KeyError:
raise StatusAttributeError(self.__class__, value)
def configure(self, instance):
self.instance = instance
config = instance.testsuite.harness_config
self.id = instance.testsuite.id
self.run_id = instance.run_id
if instance.testsuite.ignore_faults:
self.fail_on_fault = False
if config:
self.type = config.get('type', None)
self.regex = config.get('regex', [])
self.ordered = config.get('ordered', True)
self.record = config.get('record', {})
if self.record:
self.record_pattern = re.compile(self.record.get("regex", ""))
self.record_as_json = self.record.get("as_json")
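# Illustrative harness_config fragment (testcase.yaml) consumed by
# configure() above; the values are examples, not from a real suite:
#   harness: console
#   harness_config:
#     type: multi_line
#     ordered: true
#     regex:
#       - "Hello World!"
#     record:
#       regex: "RECORD:(?P<type>\\w+):(?P<metrics>.*)"
#       as_json: ["metrics"]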
def build(self):
pass
def get_testcase_name(self):
"""
Get current TestCase name.
"""
return self.id
def translate_record(self, record: dict) -> dict:
if self.record_as_json:
for k in self.record_as_json:
if k not in record:
continue
try:
record[k] = json.loads(record[k]) if record[k] else {}
except json.JSONDecodeError as parse_error:
logger.warning(f"HARNESS:{self.__class__.__name__}: recording JSON failed:"
f" {parse_error} for '{k}':'{record[k]}'")
# Don't set the Harness state to failed for recordings.
record[k] = { 'ERROR': { 'msg': str(parse_error), 'doc': record[k] } }
return record
def parse_record(self, line) -> re.Match:
match = None
if self.record_pattern:
match = self.record_pattern.search(line)
if match:
rec = self.translate_record({k: v.strip() for k, v in match.groupdict(default="").items()})
self.recording.append(rec)
return match
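# Example of parse_record() with the hypothetical record config above:
#   line:   'RECORD:benchmark:{"cycles": 42}'
#   record: {'type': 'benchmark', 'metrics': {'cycles': 42}}  # after as_json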
#
def process_test(self, line):
self.parse_record(line)
runid_match = re.search(self.run_id_pattern, line)
if runid_match:
run_id = runid_match.group("run_id")
self.run_id_exists = True
if run_id == str(self.run_id):
self.matched_run_id = True
if self.RUN_PASSED in line:
if self.fault:
self.status = TwisterStatus.FAIL
self.reason = "Fault detected while running test"
else:
self.status = TwisterStatus.PASS
if self.RUN_FAILED in line:
self.status = TwisterStatus.FAIL
self.reason = "Testsuite failed"
if self.fail_on_fault:
if self.FAULT == line:
self.fault = True
if self.GCOV_START in line:
self.capture_coverage = True
elif self.GCOV_END in line:
self.capture_coverage = False
class Robot(Harness):
is_robot_test = True
def configure(self, instance):
super().configure(instance)
self.instance = instance
config = instance.testsuite.harness_config
if config:
self.path = config.get('robot_testsuite', None)
self.option = config.get('robot_option', None)
def handle(self, line):
'''Test cases that use this harness rely on the results reported by
Robot Framework, which is invoked in run_robot_test(), so all this
handle() does is report PASS to avoid a timeout; nothing is written
into handler.log.
'''
self.instance.status = TwisterStatus.PASS
tc = self.instance.get_case_or_create(self.id)
tc.status = TwisterStatus.PASS
def run_robot_test(self, command, handler):
start_time = time.time()
env = os.environ.copy()
if self.option:
if isinstance(self.option, list):
for option in self.option:
for v in str(option).split():
command.append(f'{v}')
else:
for v in str(self.option).split():
command.append(f'{v}')
if self.path is None:
raise PytestHarnessException('The parameter robot_testsuite is mandatory')
if isinstance(self.path, list):
for suite in self.path:
command.append(os.path.join(handler.sourcedir, suite))
else:
command.append(os.path.join(handler.sourcedir, self.path))
with subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, cwd=self.instance.build_dir, env=env) as renode_test_proc:
out, _ = renode_test_proc.communicate()
self.instance.execution_time = time.time() - start_time
if renode_test_proc.returncode == 0:
self.instance.status = TwisterStatus.PASS
# All tests in one Robot file are treated as a single test case,
# so its status should match the instance status.
# Note that there should be only one testcase in the testcases list.
self.instance.testcases[0].status = TwisterStatus.PASS
else:
logger.error("Robot test failure: %s for %s" %
(handler.sourcedir, self.instance.platform.name))
self.instance.status = TwisterStatus.FAIL
self.instance.testcases[0].status = TwisterStatus.FAIL
if out:
with open(os.path.join(self.instance.build_dir, handler.log), "wt") as log:
log_msg = out.decode(sys.getdefaultencoding())
log.write(log_msg)
class Console(Harness):
def get_testcase_name(self):
'''
Get the current TestCase name.
A Console harness id contains only the TestSuite id, without a TestCase
name suffix. When a Ztest suite with a single test case is configured to
use this harness type for simplified output parsing (instead of the
Ztest harness), the name of that first test case is used if available.
'''
if self.instance and len(self.instance.testcases) == 1:
return self.instance.testcases[0].name
return super().get_testcase_name()
def configure(self, instance):
super().configure(instance)
if self.regex is None or len(self.regex) == 0:
self.status = TwisterStatus.FAIL
tc = self.instance.set_case_status_by_name(
self.get_testcase_name(),
TwisterStatus.FAIL,
f"HARNESS:{self.__class__.__name__}:no regex patterns configured."
)
raise ConfigurationError(self.instance.name, tc.reason)
if self.type == "one_line":
self.pattern = re.compile(self.regex[0])
self.patterns_expected = 1
elif self.type == "multi_line":
self.patterns = []
for r in self.regex:
self.patterns.append(re.compile(r))
self.patterns_expected = len(self.patterns)
else:
self.status = TwisterStatus.FAIL
tc = self.instance.set_case_status_by_name(
self.get_testcase_name(),
TwisterStatus.FAIL,
f"HARNESS:{self.__class__.__name__}:incorrect type={self.type}"
)
raise ConfigurationError(self.instance.name, tc.reason)
#
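# A one_line Console harness passes on the first line matching regex[0];
# a multi_line harness requires every pattern, in listed order when
# ordered is true. Hypothetical configs for each:
#   harness_config: {type: one_line, regex: ["Booting Zephyr OS"]}
#   harness_config: {type: multi_line, ordered: false,
#                    regex: ["thread_a: Hello", "thread_b: Hello"]}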
def handle(self, line):
if self.type == "one_line":
if self.pattern.search(line):
logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED:"
f"'{self.pattern.pattern}'")
self.next_pattern += 1
self.status = TwisterStatus.PASS
elif self.type == "multi_line" and self.ordered:
if (self.next_pattern < len(self.patterns) and
self.patterns[self.next_pattern].search(line)):
logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED("
f"{self.next_pattern + 1}/{self.patterns_expected}):"
f"'{self.patterns[self.next_pattern].pattern}'")
self.next_pattern += 1
if self.next_pattern >= len(self.patterns):
self.status = TwisterStatus.PASS
elif self.type == "multi_line" and not self.ordered:
for i, pattern in enumerate(self.patterns):
r = self.regex[i]
if pattern.search(line) and r not in self.matches:
self.matches[r] = line
logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED("
f"{len(self.matches)}/{self.patterns_expected}):"
f"'{pattern.pattern}'")
if len(self.matches) == len(self.regex):
self.status = TwisterStatus.PASS
else:
logger.error("Unknown harness_config type")
if self.fail_on_fault:
if self.FAULT in line:
self.fault = True
if self.GCOV_START in line:
self.capture_coverage = True
elif self.GCOV_END in line:
self.capture_coverage = False
self.process_test(line)
# Reset the resulting test state to FAIL when only ztest's
# 'PROJECT EXECUTION SUCCESSFUL' was seen but not all of the expected
# patterns were found in the output. This can happen when the pattern
# sequence has diverged from the test code, the test platform has
# console issues, or even some other test image was executed.
# TODO: Introduce an explicit match policy type to reject
# unexpected console output, allow missing patterns, deny duplicates.
if self.status == TwisterStatus.PASS and \
self.ordered and \
self.next_pattern < self.patterns_expected:
logger.error(f"HARNESS:{self.__class__.__name__}: failed with"
f" {self.next_pattern} of {self.patterns_expected}"
f" expected ordered patterns.")
self.status = TwisterStatus.FAIL
self.reason = "patterns did not match (ordered)"
if self.status == TwisterStatus.PASS and \
not self.ordered and \
len(self.matches) < self.patterns_expected:
logger.error(f"HARNESS:{self.__class__.__name__}: failed with"
f" {len(self.matches)} of {self.patterns_expected}"
f" expected unordered patterns.")
self.status = TwisterStatus.FAIL
self.reason = "patterns did not match (unordered)"
tc = self.instance.get_case_or_create(self.get_testcase_name())
if self.status == TwisterStatus.PASS:
tc.status = TwisterStatus.PASS
else:
tc.status = TwisterStatus.FAIL
class PytestHarnessException(Exception):
"""General exception for pytest."""
class Pytest(Harness):
def configure(self, instance: TestInstance):
super().configure(instance)
self.running_dir = instance.build_dir
self.source_dir = instance.testsuite.source_dir
self.report_file = os.path.join(self.running_dir, 'report.xml')
self.pytest_log_file_path = os.path.join(self.running_dir, 'twister_harness.log')
self.reserved_dut = None
self._output = []
def pytest_run(self, timeout):
try:
cmd = self.generate_command()
self.run_command(cmd, timeout)
except PytestHarnessException as pytest_exception:
logger.error(str(pytest_exception))
self.status = TwisterStatus.FAIL
self.instance.reason = str(pytest_exception)
finally:
self.instance.record(self.recording)
self._update_test_status()
if self.reserved_dut:
self.instance.handler.make_dut_available(self.reserved_dut)
def generate_command(self):
config = self.instance.testsuite.harness_config
handler: Handler = self.instance.handler
pytest_root = config.get('pytest_root', ['pytest']) if config else ['pytest']
pytest_args_yaml = config.get('pytest_args', []) if config else []
pytest_dut_scope = config.get('pytest_dut_scope', None) if config else None
command = [
'pytest',
'--twister-harness',
'-s', '-v',
f'--build-dir={self.running_dir}',
f'--junit-xml={self.report_file}',
'--log-file-level=DEBUG',
'--log-file-format=%(asctime)s.%(msecs)d:%(levelname)s:%(name)s: %(message)s',
f'--log-file={self.pytest_log_file_path}',
f'--platform={self.instance.platform.name}'
]
command.extend([os.path.normpath(os.path.join(
self.source_dir, os.path.expanduser(os.path.expandvars(src)))) for src in pytest_root])
if pytest_dut_scope:
command.append(f'--dut-scope={pytest_dut_scope}')
# Always pass output from the pytest test and the test image up to Twister log.
command.extend([
'--log-cli-level=DEBUG',
'--log-cli-format=%(levelname)s: %(message)s'
])
# Use the test timeout as the base timeout for pytest
base_timeout = handler.get_test_timeout()
command.append(f'--base-timeout={base_timeout}')
if handler.type_str == 'device':
command.extend(
self._generate_parameters_for_hardware(handler)
)
elif handler.type_str in SUPPORTED_SIMS_IN_PYTEST:
command.append(f'--device-type={handler.type_str}')
elif handler.type_str == 'build':
command.append('--device-type=custom')
else:
raise PytestHarnessException(f'Support for handler {handler.type_str} not implemented yet')
if handler.type_str != 'device':
for fixture in handler.options.fixture:
command.append(f'--twister-fixture={fixture}')
command.extend(pytest_args_yaml)
if handler.options.pytest_args:
command.extend(handler.options.pytest_args)
return command
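# Illustrative command this produces for a simulator run (paths, platform
# and timeout are placeholders, not real values):
#   pytest --twister-harness -s -v --build-dir=<build_dir>
#     --junit-xml=<build_dir>/report.xml --log-file-level=DEBUG ...
#     --platform=qemu_x86 <source_dir>/pytest
#     --log-cli-level=DEBUG ... --base-timeout=60 --device-type=qemu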
def _generate_parameters_for_hardware(self, handler: Handler):
command = ['--device-type=hardware']
hardware = handler.get_hardware()
if not hardware:
raise PytestHarnessException('Hardware is not available')
# update the instance with the device id to have it in the summary report
self.instance.dut = hardware.id
self.reserved_dut = hardware
if hardware.serial_pty:
command.append(f'--device-serial-pty={hardware.serial_pty}')
else:
command.extend([
f'--device-serial={hardware.serial}',
f'--device-serial-baud={hardware.baud}'
])
if hardware.flash_timeout:
command.append(f'--flash-timeout={hardware.flash_timeout}')
options = handler.options
if runner := hardware.runner or options.west_runner:
command.append(f'--runner={runner}')
if hardware.runner_params:
for param in hardware.runner_params:
command.append(f'--runner-params={param}')
if options.west_flash:
command.append(f'--west-flash-extra-args={options.west_flash}')
if board_id := hardware.probe_id or hardware.id:
command.append(f'--device-id={board_id}')
if hardware.product:
command.append(f'--device-product={hardware.product}')
if hardware.pre_script:
command.append(f'--pre-script={hardware.pre_script}')
if hardware.post_flash_script:
command.append(f'--post-flash-script={hardware.post_flash_script}')
if hardware.post_script:
command.append(f'--post-script={hardware.post_script}')
if hardware.flash_before:
command.append(f'--flash-before={hardware.flash_before}')
for fixture in hardware.fixtures:
command.append(f'--twister-fixture={fixture}')
return command
def run_command(self, cmd, timeout):
cmd, env = self._update_command_with_env_dependencies(cmd)
with subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=env
) as proc:
try:
reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
reader_t.start()
reader_t.join(timeout)
if reader_t.is_alive():
terminate_process(proc)
logger.warning('Timeout has occurred. It can be extended in the testspec file. '
f'Currently set to {timeout} seconds.')
self.instance.reason = 'Pytest timeout'
self.status = TwisterStatus.FAIL
proc.wait(timeout)
except subprocess.TimeoutExpired:
self.status = TwisterStatus.FAIL
proc.kill()
if proc.returncode in (ExitCode.INTERRUPTED, ExitCode.USAGE_ERROR, ExitCode.INTERNAL_ERROR):
self.status = TwisterStatus.ERROR
self.instance.reason = f'Pytest error - return code {proc.returncode}'
with open(self.pytest_log_file_path, 'w') as log_file:
log_file.write(shlex.join(cmd) + '\n\n')
log_file.write('\n'.join(self._output))
@staticmethod
def _update_command_with_env_dependencies(cmd):
'''
If the plugin wasn't installed by pip, point pytest at it by updating
PYTHONPATH and appending the -p argument to the pytest command.
'''
env = os.environ.copy()
if not PYTEST_PLUGIN_INSTALLED:
cmd.extend(['-p', 'twister_harness.plugin'])
pytest_plugin_path = os.path.join(ZEPHYR_BASE, 'scripts', 'pylib', 'pytest-twister-harness', 'src')
env['PYTHONPATH'] = pytest_plugin_path + os.pathsep + env.get('PYTHONPATH', '')
if _WINDOWS:
cmd_append_python_path = f'set PYTHONPATH={pytest_plugin_path};%PYTHONPATH% && '
else:
cmd_append_python_path = f'export PYTHONPATH={pytest_plugin_path}:${{PYTHONPATH}} && '
else:
cmd_append_python_path = ''
cmd_to_print = cmd_append_python_path + shlex.join(cmd)
logger.debug('Running pytest command: %s', cmd_to_print)
return cmd, env
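# When the plugin is not pip-installed, the effect is equivalent to this
# hypothetical POSIX shell transcript:
#   export PYTHONPATH=$ZEPHYR_BASE/scripts/pylib/pytest-twister-harness/src:$PYTHONPATH
#   pytest ... -p twister_harness.plugin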
def _output_reader(self, proc):
self._output = []
while proc.stdout.readable() and proc.poll() is None:
line = proc.stdout.readline().decode().strip()
if not line:
continue
self._output.append(line)
logger.debug('PYTEST: %s', line)
self.parse_record(line)
proc.communicate()
def _update_test_status(self):
if self.status == TwisterStatus.NONE:
self.instance.testcases = []
try:
self._parse_report_file(self.report_file)
except Exception as e:
logger.error(f'Error when parsing file {self.report_file}: {e}')
self.status = TwisterStatus.FAIL
finally:
if not self.instance.testcases:
self.instance.init_cases()
self.instance.status = self.status if self.status != TwisterStatus.NONE else \
TwisterStatus.FAIL
if self.instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
self.instance.reason = self.instance.reason or 'Pytest failed'
self.instance.add_missing_case_status(TwisterStatus.BLOCK, self.instance.reason)
def _parse_report_file(self, report):
tree = ET.parse(report)
root = tree.getroot()
if (elem_ts := root.find('testsuite')) is not None:
if elem_ts.get('failures') != '0':
self.status = TwisterStatus.FAIL
self.instance.reason = f"{elem_ts.get('failures')}/{elem_ts.get('tests')} pytest scenario(s) failed"
elif elem_ts.get('errors') != '0':
self.status = TwisterStatus.ERROR
self.instance.reason = 'Error during pytest execution'
elif elem_ts.get('skipped') == elem_ts.get('tests'):
self.status = TwisterStatus.SKIP
else:
self.status = TwisterStatus.PASS
self.instance.execution_time = float(elem_ts.get('time'))
for elem_tc in elem_ts.findall('testcase'):
tc = self.instance.add_testcase(f"{self.id}.{elem_tc.get('name')}")
tc.duration = float(elem_tc.get('time'))
elem = elem_tc.find('*')
if elem is None:
tc.status = TwisterStatus.PASS
else:
if elem.tag == ReportStatus.SKIP:
tc.status = TwisterStatus.SKIP
elif elem.tag == ReportStatus.FAIL:
tc.status = TwisterStatus.FAIL
else:
tc.status = TwisterStatus.ERROR
tc.reason = elem.get('message')
tc.output = elem.text
else:
self.status = TwisterStatus.SKIP
self.instance.reason = 'No tests collected'
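# Minimal shape of the junit report.xml that _parse_report_file() above
# expects (illustrative, as produced by pytest's --junit-xml):
#   <testsuites>
#     <testsuite tests="2" failures="1" errors="0" skipped="0" time="1.2">
#       <testcase name="test_a" time="0.5"/>
#       <testcase name="test_b" time="0.7"><failure message="..."/></testcase>
#     </testsuite>
#   </testsuites>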
class Gtest(Harness):
ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
_NAME_PATTERN = "[a-zA-Z_][a-zA-Z0-9_]*"
_SUITE_TEST_NAME_PATTERN = f"(?P<suite_name>{_NAME_PATTERN})\\.(?P<test_name>{_NAME_PATTERN})"
TEST_START_PATTERN = f".*\\[ RUN      \\] {_SUITE_TEST_NAME_PATTERN}"
TEST_PASS_PATTERN = f".*\\[       OK \\] {_SUITE_TEST_NAME_PATTERN}"
TEST_SKIP_PATTERN = f".*\\[ DISABLED \\] {_SUITE_TEST_NAME_PATTERN}"
TEST_FAIL_PATTERN = f".*\\[  FAILED  \\] {_SUITE_TEST_NAME_PATTERN}"
FINISHED_PATTERN = (
".*(?:\\[==========\\] Done running all tests\\.|"
+ "\\[----------\\] Global test environment tear-down)"
)
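# Standard googletest console lines these patterns are written against
# (suite/test names are hypothetical):
#   [ RUN      ] suite_name.test_name
#   [       OK ] suite_name.test_name (0 ms)
#   [  FAILED  ] suite_name.test_name (0 ms)
#   [==========] Done running all tests.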
def __init__(self):
super().__init__()
self.tc = None
self.has_failures = False
def handle(self, line):
# Strip ANSI escape sequences, as they interfere with the patterns
non_ansi_line = self.ANSI_ESCAPE.sub('', line)
if self.status != TwisterStatus.NONE:
return
# Check if we started running a new test
test_start_match = re.search(self.TEST_START_PATTERN, non_ansi_line)
if test_start_match:
# Add the suite name
suite_name = test_start_match.group("suite_name")
if suite_name not in self.detected_suite_names:
self.detected_suite_names.append(suite_name)
# Generate the internal name of the test
name = "{}.{}.{}".format(self.id, suite_name, test_start_match.group("test_name"))
# Assert that we don't already have a running test
assert (
self.tc is None
), "gTest error, {} didn't finish".format(self.tc)
# Check that the instance doesn't exist yet (prevents re-running)
tc = self.instance.get_case_by_name(name)
assert tc is None, "gTest error, {} running twice".format(tc)
# Create the test instance and set the context
tc = self.instance.get_case_or_create(name)
self.tc = tc
self.tc.status = TwisterStatus.STARTED
self.testcase_output += line + "\n"
self._match = True
# Check if the test run finished
finished_match = re.search(self.FINISHED_PATTERN, non_ansi_line)
if finished_match:
tc = self.instance.get_case_or_create(self.id)
if self.has_failures or self.tc is not None:
self.status = TwisterStatus.FAIL
tc.status = TwisterStatus.FAIL
else:
self.status = TwisterStatus.PASS
tc.status = TwisterStatus.PASS
return
# Check if the individual test finished
state, name = self._check_result(non_ansi_line)
if state == TwisterStatus.NONE or name is None:
# Nothing finished, keep processing lines
return
# Get the matching test and make sure it's the same as the current context
tc = self.instance.get_case_by_name(name)
assert (
tc is not None and tc == self.tc
), "gTest error, mismatched tests. Expected {} but got {}".format(self.tc, tc)
# Test finished, clear the context
self.tc = None
# Update the status of the test
tc.status = state
if tc.status == TwisterStatus.FAIL:
self.has_failures = True
tc.output = self.testcase_output
self.testcase_output = ""
self._match = False
def _check_result(self, line):
test_pass_match = re.search(self.TEST_PASS_PATTERN, line)
if test_pass_match:
return TwisterStatus.PASS, \
"{}.{}.{}".format(
self.id, test_pass_match.group("suite_name"),
test_pass_match.group("test_name")
)
test_skip_match = re.search(self.TEST_SKIP_PATTERN, line)
if test_skip_match:
return TwisterStatus.SKIP, \
"{}.{}.{}".format(
self.id, test_skip_match.group("suite_name"),
test_skip_match.group("test_name")
)
test_fail_match = re.search(self.TEST_FAIL_PATTERN, line)
if test_fail_match:
return TwisterStatus.FAIL, \
"{}.{}.{}".format(
self.id, test_fail_match.group("suite_name"),
test_fail_match.group("test_name")
)
return None, None
class Test(Harness):
__test__ = False  # tell pytest to skip this class when collecting tests
RUN_PASSED = "PROJECT EXECUTION SUCCESSFUL"
RUN_FAILED = "PROJECT EXECUTION FAILED"
test_suite_start_pattern = r"Running TESTSUITE (?P<suite_name>.*)"
ZTEST_START_PATTERN = r"START - (test_)?([a-zA-Z0-9_-]+)"
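# Illustrative ztest console lines matched below (names are hypothetical):
#   "Running TESTSUITE kernel_common" -> suite 'kernel_common' detected
#   "START - test_byteorder"          -> test case '<id>.byteorder' started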
def handle(self, line):
test_suite_match = re.search(self.test_suite_start_pattern, line)
if test_suite_match:
suite_name = test_suite_match.group("suite_name")
self.detected_suite_names.append(suite_name)
testcase_match = re.search(self.ZTEST_START_PATTERN, line)
if testcase_match:
name = "{}.{}".format(self.id, testcase_match.group(2))
tc = self.instance.get_case_or_create(name)
# Mark the test as started. If something goes wrong from here on, e.g.
# a timeout, it is most likely caused by this test, so in that case it
# should be marked as failed and not as blocked (not run).
tc.status = TwisterStatus.STARTED
if testcase_match or self._match:
self.testcase_output += line + "\n"
self._match = True
result_match = result_re.match(line)
# Some testcases are skipped based on predicates and do not show up
# during test execution; however, they are listed in the summary. Parse
# the summary for status and use that status instead.
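# Hypothetical summary line this matches:
#   "- PASS - [kernel_common.test_byteorder] duration = 0.010 seconds"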
summary_re = re.compile(r"- (PASS|FAIL|SKIP) - \[([^\.]*).(test_)?(\S*)\] duration = (\d*[.,]?\d*) seconds")
summary_match = summary_re.match(line)
if result_match:
matched_status = result_match.group(1)
name = "{}.{}".format(self.id, result_match.group(3))
tc = self.instance.get_case_or_create(name)
tc.status = TwisterStatus[matched_status]
if tc.status == TwisterStatus.SKIP:
tc.reason = "ztest skip"
tc.duration = float(result_match.group(4))
if tc.status == TwisterStatus.FAIL:
tc.output = self.testcase_output
self.testcase_output = ""
self._match = False
self.ztest = True
elif summary_match:
matched_status = summary_match.group(1)
self.detected_suite_names.append(summary_match.group(2))
name = "{}.{}".format(self.id, summary_match.group(4))
tc = self.instance.get_case_or_create(name)
tc.status = TwisterStatus[matched_status]
if tc.status == TwisterStatus.SKIP:
tc.reason = "ztest skip"
tc.duration = float(summary_match.group(5))
if tc.status == TwisterStatus.FAIL:
tc.output = self.testcase_output
self.testcase_output = ""
self._match = False
self.ztest = True
self.process_test(line)
if not self.ztest and self.status != TwisterStatus.NONE:
logger.debug(f"not a ztest and no state for {self.id}")
tc = self.instance.get_case_or_create(self.id)
if self.status == TwisterStatus.PASS:
tc.status = TwisterStatus.PASS
else:
tc.status = TwisterStatus.FAIL
tc.reason = "Test failure"
class Ztest(Test):
pass
class Bsim(Harness):
def build(self):
"""
Copying the application executable to BabbleSim's bin directory enables
running multidevice bsim tests after twister has built them.
"""
if self.instance is None:
return
original_exe_path: str = os.path.join(self.instance.build_dir, 'zephyr', 'zephyr.exe')
if not os.path.exists(original_exe_path):
logger.warning('Cannot copy bsim exe - cannot find original executable.')
return
bsim_out_path: str = os.getenv('BSIM_OUT_PATH', '')
if not bsim_out_path:
logger.warning('Cannot copy bsim exe - BSIM_OUT_PATH not provided.')
return
new_exe_name: str = self.instance.testsuite.harness_config.get('bsim_exe_name', '')
if new_exe_name:
new_exe_name = f'bs_{self.instance.platform.name}_{new_exe_name}'
else:
new_exe_name = self.instance.name
new_exe_name = f'bs_{new_exe_name}'
new_exe_name = new_exe_name.replace(os.path.sep, '_').replace('.', '_').replace('@', '_')
new_exe_path: str = os.path.join(bsim_out_path, 'bin', new_exe_name)
logger.debug(f'Copying executable from {original_exe_path} to {new_exe_path}')
shutil.copy(original_exe_path, new_exe_path)
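# Illustrative result, assuming BSIM_OUT_PATH=/opt/bsim, platform
# nrf52_bsim, and bsim_exe_name 'tests_mesh' in harness_config:
#   /opt/bsim/bin/bs_nrf52_bsim_tests_mesh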
class HarnessImporter:
@staticmethod
def get_harness(harness_name):
thismodule = sys.modules[__name__]
try:
if harness_name:
harness_class = getattr(thismodule, harness_name)
else:
harness_class = getattr(thismodule, 'Test')
return harness_class()
except AttributeError as e:
logger.debug(f"harness {harness_name} not implemented: {e}")
return None
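# Illustrative caller usage (as one might invoke it from the runner; the
# exact call site may differ): look up the harness class named in the
# test suite, falling back to Test when none is set:
#   harness = HarnessImporter.get_harness(instance.testsuite.harness.capitalize())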