From 06af021c9f0bacd025f012fee6097dd594732084 Mon Sep 17 00:00:00 2001 From: Fu Hanxi Date: Tue, 28 Nov 2023 14:38:47 +0100 Subject: [PATCH] ci(pytest): stop ambiguous markers for multi-dut test case also add `tools/ci/idf_pytest/tests` for testing --- conftest.py | 68 +++-- pytest.ini | 1 + tools/ci/idf_pytest/constants.py | 105 +++++-- tools/ci/idf_pytest/plugin.py | 257 ++++++++++-------- tools/ci/idf_pytest/pytest.ini | 2 + tools/ci/idf_pytest/script.py | 49 +++- .../idf_pytest/tests/test_get_pytest_cases.py | 94 +++++++ tools/ci/idf_pytest/utils.py | 27 +- 8 files changed, 409 insertions(+), 194 deletions(-) create mode 100644 tools/ci/idf_pytest/pytest.ini create mode 100644 tools/ci/idf_pytest/tests/test_get_pytest_cases.py diff --git a/conftest.py b/conftest.py index 0fdc3ad45a..ed5bcaec0c 100644 --- a/conftest.py +++ b/conftest.py @@ -19,9 +19,9 @@ import logging import os import re import sys +import typing as t from copy import deepcopy from datetime import datetime -from typing import Callable, Optional import pytest from _pytest.config import Config @@ -34,13 +34,13 @@ try: from idf_ci_utils import IDF_PATH from idf_pytest.constants import DEFAULT_SDKCONFIG, ENV_MARKERS, SPECIAL_MARKERS, TARGET_MARKERS from idf_pytest.plugin import IDF_PYTEST_EMBEDDED_KEY, IdfPytestEmbedded - from idf_pytest.utils import format_case_id, get_target_marker_from_expr + from idf_pytest.utils import format_case_id except ImportError: sys.path.append(os.path.join(os.path.dirname(__file__), 'tools', 'ci')) from idf_ci_utils import IDF_PATH from idf_pytest.constants import DEFAULT_SDKCONFIG, ENV_MARKERS, SPECIAL_MARKERS, TARGET_MARKERS from idf_pytest.plugin import IDF_PYTEST_EMBEDDED_KEY, IdfPytestEmbedded - from idf_pytest.utils import format_case_id, get_target_marker_from_expr + from idf_pytest.utils import format_case_id try: import common_test_methods # noqa: F401 @@ -102,7 +102,7 @@ def test_case_name(request: FixtureRequest, target: str, config: str) -> str: @pytest.fixture @multi_dut_fixture -def build_dir(app_path: str, target: Optional[str], config: Optional[str]) -> str: +def build_dir(app_path: str, target: t.Optional[str], config: t.Optional[str]) -> str: """ Check local build dir with the following priority: @@ -138,7 +138,7 @@ def build_dir(app_path: str, target: Optional[str], config: Optional[str]) -> st @pytest.fixture(autouse=True) @multi_dut_fixture -def junit_properties(test_case_name: str, record_xml_attribute: Callable[[str, object], None]) -> None: +def junit_properties(test_case_name: str, record_xml_attribute: t.Callable[[str, object], None]) -> None: """ This fixture is autoused and will modify the junit report test case name to .. """ @@ -154,7 +154,7 @@ def set_test_case_name(request: FixtureRequest, test_case_name: str) -> None: # Log Util Functions # ###################### @pytest.fixture -def log_performance(record_property: Callable[[str, object], None]) -> Callable[[str, str], None]: +def log_performance(record_property: t.Callable[[str, object], None]) -> t.Callable[[str, str], None]: """ log performance item with pre-defined format to the console and record it under the ``properties`` tag in the junit report if available. @@ -172,7 +172,7 @@ def log_performance(record_property: Callable[[str, object], None]) -> Callable[ @pytest.fixture -def check_performance(idf_path: str) -> Callable[[str, float, str], None]: +def check_performance(idf_path: str) -> t.Callable[[str, float, str], None]: """ check if the given performance item meets the passing standard or not """ @@ -186,9 +186,9 @@ def check_performance(idf_path: str) -> Callable[[str, float, str], None]: """ def _find_perf_item(operator: str, path: str) -> float: - with open(path, 'r') as f: + with open(path) as f: data = f.read() - match = re.search(r'#define\s+IDF_PERFORMANCE_{}_{}\s+([\d.]+)'.format(operator, item.upper()), data) + match = re.search(fr'#define\s+IDF_PERFORMANCE_{operator}_{item.upper()}\s+([\d.]+)', data) return float(match.group(1)) # type: ignore def _check_perf(operator: str, standard_value: float) -> None: @@ -198,7 +198,7 @@ def check_performance(idf_path: str) -> Callable[[str, float, str], None]: ret = value >= standard_value if not ret: raise AssertionError( - "[Performance] {} value is {}, doesn't meet pass standard {}".format(item, value, standard_value) + f"[Performance] {item} value is {value}, doesn't meet pass standard {standard_value}" ) path_prefix = os.path.join(idf_path, 'components', 'idf_test', 'include') @@ -212,7 +212,7 @@ def check_performance(idf_path: str) -> Callable[[str, float, str], None]: for performance_file in performance_files: try: standard = _find_perf_item(op, performance_file) - except (IOError, AttributeError): + except (OSError, AttributeError): # performance file doesn't exist or match is not found in it continue @@ -221,13 +221,13 @@ def check_performance(idf_path: str) -> Callable[[str, float, str], None]: break if not found_item: - raise AssertionError('Failed to get performance standard for {}'.format(item)) + raise AssertionError(f'Failed to get performance standard for {item}') return real_func @pytest.fixture -def log_minimum_free_heap_size(dut: IdfDut, config: str) -> Callable[..., None]: +def log_minimum_free_heap_size(dut: IdfDut, config: str) -> t.Callable[..., None]: def real_func() -> None: res = dut.expect(r'Minimum free heap size: (\d+) bytes') logging.info( @@ -278,28 +278,52 @@ def pytest_addoption(parser: pytest.Parser) -> None: '--app-info-basedir', default=IDF_PATH, help='app info base directory. specify this value when you\'re building under a ' - 'different IDF_PATH. (Default: $IDF_PATH)', + 'different IDF_PATH. (Default: $IDF_PATH)', ) idf_group.addoption( '--app-info-filepattern', help='glob pattern to specify the files that include built app info generated by ' - '`idf-build-apps --collect-app-info ...`. will not raise ValueError when binary ' - 'paths not exist in local file system if not listed recorded in the app info.', + '`idf-build-apps --collect-app-info ...`. will not raise ValueError when binary ' + 'paths not exist in local file system if not listed recorded in the app info.', ) def pytest_configure(config: Config) -> None: # cli option "--target" - target = config.getoption('target') or '' + target = [_t.strip().lower() for _t in (config.getoption('target', '') or '').split(',') if _t.strip()] + + # add markers based on idf_pytest/constants.py + for name, description in { + **TARGET_MARKERS, + **ENV_MARKERS, + **SPECIAL_MARKERS, + }.items(): + config.addinivalue_line('markers', f'{name}: {description}') help_commands = ['--help', '--fixtures', '--markers', '--version'] for cmd in help_commands: if cmd in config.invocation_params.args: - target = 'unneeded' + target = ['unneeded'] break - if not target: # also could specify through markexpr via "-m" - target = get_target_marker_from_expr(config.getoption('markexpr') or '') + markexpr = config.getoption('markexpr') or '' + # check marker expr set via "pytest -m" + if not target and markexpr: + # we use `-m "esp32 and generic"` in our CI to filter the test cases + # this doesn't cover all use cases, but fit what we do in CI. + for marker in markexpr.split('and'): + marker = marker.strip() + if marker in TARGET_MARKERS: + target.append(marker) + + # "--target" must be set + if not target: + raise SystemExit( + """Pass `--target TARGET[,TARGET...]` to specify all targets the test cases are using. + - for single DUT, we run with `pytest --target esp32` + - for multi DUT, we run with `pytest --target esp32,esp32,esp32s2` to indicate all DUTs +""" + ) apps_list = None app_info_basedir = config.getoption('app_info_basedir') @@ -326,14 +350,10 @@ def pytest_configure(config: Config) -> None: config.stash[IDF_PYTEST_EMBEDDED_KEY] = IdfPytestEmbedded( target=target, - sdkconfig=config.getoption('sdkconfig'), apps_list=apps_list, ) config.pluginmanager.register(config.stash[IDF_PYTEST_EMBEDDED_KEY]) - for name, description in {**TARGET_MARKERS, **ENV_MARKERS, **SPECIAL_MARKERS}.items(): - config.addinivalue_line('markers', f'{name}: {description}') - def pytest_unconfigure(config: Config) -> None: _pytest_embedded = config.stash.get(IDF_PYTEST_EMBEDDED_KEY, None) diff --git a/pytest.ini b/pytest.ini index c9c0c43560..08ef25dfe7 100644 --- a/pytest.ini +++ b/pytest.ini @@ -12,6 +12,7 @@ addopts = --skip-check-coredump y --logfile-extension ".txt" --check-duplicates y + --ignore-glob */managed_components/* # ignore DeprecationWarning filterwarnings = diff --git a/tools/ci/idf_pytest/constants.py b/tools/ci/idf_pytest/constants.py index f5bb8743ae..ccbe00fb0d 100644 --- a/tools/ci/idf_pytest/constants.py +++ b/tools/ci/idf_pytest/constants.py @@ -6,7 +6,10 @@ Pytest Related Constants. Don't import third-party packages here. """ import os import typing as t +from collections import Counter from dataclasses import dataclass +from enum import Enum +from functools import cached_property from _pytest.python import Function from pytest_embedded.utils import to_list @@ -35,10 +38,11 @@ SPECIAL_MARKERS = { 'temp_skip': 'temp skip tests for specified targets both in ci and locally', 'nightly_run': 'tests should be executed as part of the nightly trigger pipeline', 'host_test': 'tests which should not be built at the build stage, and instead built in host_test stage', - 'qemu': 'build and test using qemu-system-xtensa, not real target', } ENV_MARKERS = { + # special markers + 'qemu': 'build and test using qemu, not real target', # single-dut markers 'generic': 'tests should be run on generic runners', 'flash_suspend': 'support flash suspend feature', @@ -89,7 +93,6 @@ ENV_MARKERS = { 'adc': 'ADC related tests should run on adc runners', 'xtal32k': 'Runner with external 32k crystal connected', 'no32kXtal': 'Runner with no external 32k crystal connected', - 'multi_dut_modbus_rs485': 'a pair of runners connected by RS485 bus', 'psramv0': 'Runner with PSRAM version 0', 'esp32eco3': 'Runner with esp32 eco3 connected', 'ecdsa_efuse': 'Runner with test ECDSA private keys programmed in efuse', @@ -98,6 +101,7 @@ ENV_MARKERS = { 'i2c_oled': 'Runner with ssd1306 I2C oled connected', 'httpbin': 'runner for tests that need to access the httpbin service', # multi-dut markers + 'multi_dut_modbus_rs485': 'a pair of runners connected by RS485 bus', 'ieee802154': 'ieee802154 related tests should run on ieee802154 runners.', 'openthread_br': 'tests should be used for openthread border router.', 'openthread_bbr': 'tests should be used for openthread border router linked to Internet.', @@ -113,6 +117,13 @@ ENV_MARKERS = { } +class CollectMode(str, Enum): + SINGLE_SPECIFIC = 'single_specific' + MULTI_SPECIFIC = 'multi_specific' + MULTI_ALL_WITH_PARAM = 'multi_all_with_param' + ALL = 'all' + + @dataclass class PytestApp: path: str @@ -122,38 +133,43 @@ class PytestApp: def __hash__(self) -> int: return hash((self.path, self.target, self.config)) + @cached_property + def build_dir(self) -> str: + return os.path.join(self.path, f'build_{self.target}_{self.config}') + @dataclass class PytestCase: - path: str - name: str - - apps: t.Set[PytestApp] - target: str + apps: t.List[PytestApp] item: Function def __hash__(self) -> int: return hash((self.path, self.name, self.apps, self.all_markers)) + @cached_property + def path(self) -> str: + return str(self.item.path) + + @cached_property + def name(self) -> str: + return self.item.originalname # type: ignore + + @cached_property + def targets(self) -> t.List[str]: + return [app.target for app in self.apps] + + @cached_property + def is_single_dut_test_case(self) -> bool: + return True if len(self.apps) == 1 else False + + # the following markers could be changed dynamically, don't use cached_property @property def all_markers(self) -> t.Set[str]: return {marker.name for marker in self.item.iter_markers()} - @property - def is_nightly_run(self) -> bool: - return 'nightly_run' in self.all_markers - @property def target_markers(self) -> t.Set[str]: - return {marker for marker in self.all_markers if marker in TARGET_MARKERS} - - @property - def env_markers(self) -> t.Set[str]: - return {marker for marker in self.all_markers if marker in ENV_MARKERS} - - @property - def skipped_targets(self) -> t.Set[str]: def _get_temp_markers_disabled_targets(marker_name: str) -> t.Set[str]: temp_marker = self.item.get_closest_marker(marker_name) @@ -179,4 +195,53 @@ class PytestCase: else: # we use `temp_skip` locally skip_targets = temp_skip_targets - return skip_targets + return {marker for marker in self.all_markers if marker in TARGET_MARKERS} - skip_targets + + @property + def env_markers(self) -> t.Set[str]: + return {marker for marker in self.all_markers if marker in ENV_MARKERS} + + @property + def target_with_amount_markers(self) -> t.Set[str]: + c: Counter = Counter() + for app in self.apps: + c[app.target] += 1 + + res = set() + for target, amount in c.items(): + if amount > 1: + res.add(f'{target}_{amount}') + else: + res.add(target) + + return res + + def all_built_in_app_lists(self, app_lists: t.Optional[t.List[str]] = None) -> bool: + if app_lists is None: + # ignore this feature + return True + + bin_found = [0] * len(self.apps) + for i, app in enumerate(self.apps): + if app.build_dir in app_lists: + bin_found[i] = 1 + + if sum(bin_found) == 0: + msg = f'Skip test case {self.name} because all following binaries are not listed in the app lists: ' + for app in self.apps: + msg += f'\n - {app.build_dir}' + + print(msg) + return False + + if sum(bin_found) == len(self.apps): + return True + + # some found, some not, looks suspicious + msg = f'Found some binaries of test case {self.name} are not listed in the app lists.' + for i, app in enumerate(self.apps): + if bin_found[i] == 0: + msg += f'\n - {app.build_dir}' + + msg += '\nMight be a issue of .build-test-rules.yml files' + return False diff --git a/tools/ci/idf_pytest/plugin.py b/tools/ci/idf_pytest/plugin.py index c5c5576a05..efae926208 100644 --- a/tools/ci/idf_pytest/plugin.py +++ b/tools/ci/idf_pytest/plugin.py @@ -1,9 +1,9 @@ # SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 -import logging import os import typing as t +from functools import cached_property from xml.etree import ElementTree as ET import pytest @@ -16,12 +16,13 @@ from pytest_embedded.plugin import parse_multi_dut_args from pytest_embedded.utils import find_by_suffix, to_list from pytest_ignore_test_results.ignore_results import ChildCase, ChildCasesStashKey -from .constants import DEFAULT_SDKCONFIG, PREVIEW_TARGETS, SUPPORTED_TARGETS, PytestApp, PytestCase -from .utils import format_case_id, merge_junit_files +from .constants import DEFAULT_SDKCONFIG, PREVIEW_TARGETS, SUPPORTED_TARGETS, CollectMode, PytestApp, PytestCase +from .utils import comma_sep_str_to_list, format_case_id, merge_junit_files IDF_PYTEST_EMBEDDED_KEY = pytest.StashKey['IdfPytestEmbedded']() ITEM_FAILED_CASES_KEY = pytest.StashKey[list]() ITEM_FAILED_KEY = pytest.StashKey[bool]() +ITEM_PYTEST_CASE_KEY = pytest.StashKey[PytestCase]() class IdfPytestEmbedded: @@ -33,80 +34,119 @@ class IdfPytestEmbedded: def __init__( self, - target: str, - sdkconfig: t.Optional[str] = None, + target: t.Union[t.List[str], str], + *, + single_target_duplicate_mode: bool = False, apps_list: t.Optional[t.List[str]] = None, ): - # CLI options to filter the test cases - self.target = target.lower() - self.sdkconfig = sdkconfig + if isinstance(target, str): + self.target = sorted(comma_sep_str_to_list(target)) + else: + self.target = sorted(target) + + if not self.target: + raise ValueError('`target` should not be empty') + + # these are useful while gathering all the multi-dut test cases + # when this mode is activated, + # + # pytest.mark.esp32 + # pytest.mark.parametrize('count', [2], indirect=True) + # def test_foo(dut): + # pass + # + # should be collected when running `pytest --target esp32` + # + # otherwise, it should be collected when running `pytest --target esp32,esp32` + self._single_target_duplicate_mode = single_target_duplicate_mode + self.apps_list = apps_list self.cases: t.List[PytestCase] = [] + @cached_property + def collect_mode(self) -> CollectMode: + if len(self.target) == 1: + if self.target[0] == CollectMode.MULTI_ALL_WITH_PARAM: + return CollectMode.MULTI_ALL_WITH_PARAM + else: + return CollectMode.SINGLE_SPECIFIC + else: + return CollectMode.MULTI_SPECIFIC + @staticmethod def get_param(item: Function, key: str, default: t.Any = None) -> t.Any: - # implement like this since this is a limitation of pytest, couldn't get fixture values while collecting - # https://github.com/pytest-dev/pytest/discussions/9689 + # funcargs is not calculated while collection + # callspec is something defined in parametrize if not hasattr(item, 'callspec'): return default return item.callspec.params.get(key, default) or default def item_to_pytest_case(self, item: Function) -> PytestCase: - count = 1 - case_path = str(item.path) - case_name = item.originalname - target = self.target + """ + Turn pytest item to PytestCase + """ + count = self.get_param(item, 'count', 1) - # funcargs is not calculated while collection - if hasattr(item, 'callspec'): - count = item.callspec.params.get('count', 1) - app_paths = to_list( - parse_multi_dut_args( - count, - self.get_param(item, 'app_path', os.path.dirname(case_path)), - ) - ) - configs = to_list(parse_multi_dut_args(count, self.get_param(item, 'config', 'default'))) - targets = to_list(parse_multi_dut_args(count, self.get_param(item, 'target', target))) - else: - app_paths = [os.path.dirname(case_path)] - configs = ['default'] - targets = [target] - - case_apps = set() - for i in range(count): - case_apps.add(PytestApp(app_paths[i], targets[i], configs[i])) - - return PytestCase( - case_path, - case_name, - case_apps, - self.target, - item, + # default app_path is where the test script locates + app_paths = to_list( + parse_multi_dut_args(count, os.path.relpath(self.get_param(item, 'app_path', os.path.dirname(item.path)))) ) + configs = to_list(parse_multi_dut_args(count, self.get_param(item, 'config', DEFAULT_SDKCONFIG))) + targets = to_list(parse_multi_dut_args(count, self.get_param(item, 'target', self.target[0]))) - @pytest.hookimpl(tryfirst=True) - def pytest_sessionstart(self, session: Session) -> None: - # same behavior for vanilla pytest-embedded '--target' - session.config.option.target = self.target + return PytestCase([PytestApp(app_paths[i], targets[i], configs[i]) for i in range(count)], item) @pytest.hookimpl(tryfirst=True) def pytest_collection_modifyitems(self, items: t.List[Function]) -> None: - item_to_case: t.Dict[Function, PytestCase] = {} + """ + Background info: - # Add Markers to the test cases + We're using `pytest.mark.[TARGET]` as a syntactic sugar to indicate that they are actually supported by all + the listed targets. For example, + + >>> @pytest.mark.esp32 + >>> @pytest.mark.esp32s2 + + should be treated as + + >>> @pytest.mark.parametrize('target', [ + >>> 'esp32', + >>> 'esp32s2', + >>> ], indirect=True) + + All single-dut test cases, and some of the multi-dut test cases with the same targets, are using this + way to indicate the supported targets. + + To avoid ambiguity, + + - when we're collecting single-dut test cases with esp32, we call + + `pytest --collect-only --target esp32` + + - when we're collecting multi-dut test cases, we list all the targets, even when they're the same + + `pytest --collect-only --target esp32,esp32` for two esp32 connected + `pytest --collect-only --target esp32,esp32s2` for esp32 and esp32s2 connected + + therefore, we have two different logic for searching test cases, explained in 2.1 and 2.2 + """ + # 1. Filter according to nighty_run related markers + if os.getenv('INCLUDE_NIGHTLY_RUN') == '1': + # nightly_run and non-nightly_run cases are both included + pass + elif os.getenv('NIGHTLY_RUN') == '1': + # only nightly_run cases are included + items[:] = [_item for _item in items if _item.get_closest_marker('nightly_run') is not None] + else: + # only non-nightly_run cases are included + items[:] = [_item for _item in items if _item.get_closest_marker('nightly_run') is None] + + # 2. Add markers according to special markers + item_to_case_dict: t.Dict[Function, PytestCase] = {} for item in items: - # generate PytestCase for each item - case = self.item_to_pytest_case(item) - item_to_case[item] = case - - # set default timeout 10 minutes for each case - if 'timeout' not in item.keywords: - item.add_marker(pytest.mark.timeout(10 * 60)) - - # add markers for special markers + item.stash[ITEM_PYTEST_CASE_KEY] = item_to_case_dict[item] = self.item_to_pytest_case(item) if 'supported_targets' in item.keywords: for _target in SUPPORTED_TARGETS: item.add_marker(_target) @@ -117,72 +157,55 @@ class IdfPytestEmbedded: for _target in [*SUPPORTED_TARGETS, *PREVIEW_TARGETS]: item.add_marker(_target) + # 3.1. CollectMode.SINGLE_SPECIFIC, like `pytest --target esp32` + if self.collect_mode == CollectMode.SINGLE_SPECIFIC: + filtered_items = [] + for item in items: + case = item_to_case_dict[item] + + # single-dut one + if case.is_single_dut_test_case and self.target[0] in case.target_markers: + filtered_items.append(item) + + # multi-dut ones and in single_target_duplicate_mode + elif self._single_target_duplicate_mode and not case.is_single_dut_test_case: + # ignore those test cases with `target` defined in parametrize, since these will be covered in 3.3 + if self.get_param(item, 'target', None) is None and self.target[0] in case.target_markers: + filtered_items.append(item) + + items[:] = filtered_items + # 3.2. CollectMode.MULTI_SPECIFIC, like `pytest --target esp32,esp32` + elif self.collect_mode == CollectMode.MULTI_SPECIFIC: + items[:] = [_item for _item in items if item_to_case_dict[_item].targets == self.target] + + # 3.3. CollectMode.MULTI_ALL_WITH_PARAM, intended to be used by `get_pytest_cases` + else: + items[:] = [ + _item + for _item in items + if not item_to_case_dict[_item].is_single_dut_test_case + and self.get_param(_item, 'target', None) is not None + ] + + # 4. filter by `self.apps_list`, skip the test case if not listed + # should only be used in CI + items[:] = [_item for _item in items if item_to_case_dict[_item].all_built_in_app_lists(self.apps_list)] + + # OKAY!!! All left ones will be executed, sort it and add more markers + items[:] = sorted( + items, key=lambda x: (os.path.dirname(x.path), self.get_param(x, 'config', DEFAULT_SDKCONFIG)) + ) + for item in items: + case = item_to_case_dict[item] + # set default timeout 10 minutes for each case + if 'timeout' not in item.keywords: + item.add_marker(pytest.mark.timeout(10 * 60)) + # add 'xtal_40mhz' tag as a default tag for esp32c2 target # only add this marker for esp32c2 cases - if self.target == 'esp32c2' and 'esp32c2' in case.target_markers and 'xtal_26mhz' not in case.all_markers: + if 'esp32c2' in self.target and 'esp32c2' in case.targets and 'xtal_26mhz' not in case.all_markers: item.add_marker('xtal_40mhz') - # Filter the test cases - filtered_items = [] - for item in items: - case = item_to_case[item] - # filter by "nightly_run" marker - if os.getenv('INCLUDE_NIGHTLY_RUN') == '1': - # Do not filter nightly_run cases - pass - elif os.getenv('NIGHTLY_RUN') == '1': - if not case.is_nightly_run: - logging.debug( - 'Skipping test case %s because of this test case is not a nightly run test case', item.name - ) - continue - else: - if case.is_nightly_run: - logging.debug( - 'Skipping test case %s because of this test case is a nightly run test case', item.name - ) - continue - - # filter by target - if self.target not in case.target_markers: - continue - - if self.target in case.skipped_targets: - continue - - # filter by sdkconfig - if self.sdkconfig: - if self.get_param(item, 'config', DEFAULT_SDKCONFIG) != self.sdkconfig: - continue - - # filter by apps_list, skip the test case if not listed - # should only be used in CI - if self.apps_list is not None: - bin_not_found = False - for case_app in case.apps: - # in ci, always use build__ as build dir - binary_path = os.path.join(case_app.path, f'build_{case_app.target}_{case_app.config}') - if binary_path not in self.apps_list: - logging.info( - 'Skipping test case %s because binary path %s is not listed in app info list files', - item.name, - binary_path, - ) - bin_not_found = True - break - - if bin_not_found: - continue - - # finally! - filtered_items.append(item) - - # sort the test cases with (app folder, config) - items[:] = sorted( - filtered_items, - key=lambda x: (os.path.dirname(x.path), self.get_param(x, 'config', DEFAULT_SDKCONFIG)) - ) - def pytest_report_collectionfinish(self, items: t.List[Function]) -> None: for item in items: self.cases.append(self.item_to_pytest_case(item)) diff --git a/tools/ci/idf_pytest/pytest.ini b/tools/ci/idf_pytest/pytest.ini new file mode 100644 index 0000000000..0ee949b898 --- /dev/null +++ b/tools/ci/idf_pytest/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +python_files = test_*.py diff --git a/tools/ci/idf_pytest/script.py b/tools/ci/idf_pytest/script.py index a970864478..7f16bc91e6 100644 --- a/tools/ci/idf_pytest/script.py +++ b/tools/ci/idf_pytest/script.py @@ -12,7 +12,7 @@ from idf_py_actions.constants import PREVIEW_TARGETS as TOOLS_PREVIEW_TARGETS from idf_py_actions.constants import SUPPORTED_TARGETS as TOOLS_SUPPORTED_TARGETS from pytest_embedded.utils import to_list -from .constants import PytestCase +from .constants import CollectMode, PytestCase from .plugin import IdfPytestEmbedded @@ -35,15 +35,25 @@ def get_pytest_files(paths: t.List[str]) -> t.List[str]: def get_pytest_cases( paths: t.Union[str, t.List[str]], - target: str = 'all', + target: str = CollectMode.ALL, marker_expr: t.Optional[str] = None, filter_expr: t.Optional[str] = None, ) -> t.List[PytestCase]: - if target == 'all': - targets = TOOLS_SUPPORTED_TARGETS + TOOLS_PREVIEW_TARGETS - else: - targets = [target] + """ + For single-dut test cases, `target` could be + - [TARGET], e.g. `esp32`, to get the test cases for the given target + - or `single_all`, to get all single-dut test cases + For multi-dut test cases, `target` could be + - [TARGET,[TARGET...]], e.g. `esp32,esp32s2`, to get the test cases for the given targets + - or `multi_all`, to get all multi-dut test cases + + :param paths: paths to search for pytest scripts + :param target: target to get test cases for, detailed above + :param marker_expr: pytest marker expression, `-m` + :param filter_expr: pytest filter expression, `-k` + :return: list of test cases + """ paths = to_list(paths) cases: t.List[PytestCase] = [] @@ -52,12 +62,12 @@ def get_pytest_cases( print(f'WARNING: no pytest scripts found for target {target} under paths {", ".join(paths)}') return cases - for target in targets: - collector = IdfPytestEmbedded(target) + def _get_pytest_cases(_target: str, _single_target_duplicate_mode: bool = False) -> t.List[PytestCase]: + collector = IdfPytestEmbedded(_target, single_target_duplicate_mode=_single_target_duplicate_mode) with io.StringIO() as buf: with redirect_stdout(buf): - cmd = ['--collect-only', *pytest_scripts, '--target', target, '-q'] + cmd = ['--collect-only', *pytest_scripts, '--target', _target, '-q'] if marker_expr: cmd.extend(['-m', marker_expr]) if filter_expr: @@ -66,11 +76,24 @@ def get_pytest_cases( if res.value != ExitCode.OK: if res.value == ExitCode.NO_TESTS_COLLECTED: - print(f'WARNING: no pytest app found for target {target} under paths {", ".join(paths)}') + print(f'WARNING: no pytest app found for target {_target} under paths {", ".join(paths)}') else: print(buf.getvalue()) - raise RuntimeError(f'pytest collection failed at {", ".join(paths)} with command \"{" ".join(cmd)}\"') + raise RuntimeError( + f'pytest collection failed at {", ".join(paths)} with command \"{" ".join(cmd)}\"' + ) - cases.extend(collector.cases) + return collector.cases # type: ignore - return cases + if target == CollectMode.ALL: + targets = TOOLS_SUPPORTED_TARGETS + TOOLS_PREVIEW_TARGETS + [CollectMode.MULTI_ALL_WITH_PARAM] + else: + targets = [target] + + for _target in targets: + if target == CollectMode.ALL: + cases.extend(_get_pytest_cases(_target, _single_target_duplicate_mode=True)) + else: + cases.extend(_get_pytest_cases(_target)) + + return sorted(cases, key=lambda x: (x.path, x.name, str(x.targets))) diff --git a/tools/ci/idf_pytest/tests/test_get_pytest_cases.py b/tools/ci/idf_pytest/tests/test_get_pytest_cases.py new file mode 100644 index 0000000000..64a54bcead --- /dev/null +++ b/tools/ci/idf_pytest/tests/test_get_pytest_cases.py @@ -0,0 +1,94 @@ +# SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import os +import sys +from pathlib import Path + +from idf_pytest.constants import CollectMode + +try: + from idf_pytest.script import get_pytest_cases +except ImportError: + sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..')) + + from idf_pytest.script import get_pytest_cases + +TEMPLATE_SCRIPT = ''' +import pytest + +@pytest.mark.esp32 +@pytest.mark.esp32s2 +def test_foo_single(dut): + pass + +@pytest.mark.parametrize( + 'count, target', [ + (2, 'esp32|esp32s2'), + (3, 'esp32s2|esp32s2|esp32s3'), + ], indirect=True +) +def test_foo_multi(dut): + pass + +@pytest.mark.esp32 +@pytest.mark.esp32s2 +@pytest.mark.parametrize( + 'count', [2], indirect=True +) +def test_foo_multi_with_marker(dut): + pass +''' + + +def test_get_pytest_cases_single_specific(tmp_path: Path) -> None: + script = tmp_path / 'pytest_get_pytest_cases_single_specific.py' + script.write_text(TEMPLATE_SCRIPT) + cases = get_pytest_cases([str(tmp_path)], 'esp32') + + assert len(cases) == 1 + assert cases[0].targets == ['esp32'] + + +def test_get_pytest_cases_multi_specific(tmp_path: Path) -> None: + script = tmp_path / 'pytest_get_pytest_cases_multi_specific.py' + script.write_text(TEMPLATE_SCRIPT) + cases = get_pytest_cases([str(tmp_path)], 'esp32s3,esp32s2, esp32s2') + + assert len(cases) == 1 + assert cases[0].targets == ['esp32s2', 'esp32s2', 'esp32s3'] + + +def test_get_pytest_cases_multi_all(tmp_path: Path) -> None: + script = tmp_path / 'pytest_get_pytest_cases_multi_all.py' + script.write_text(TEMPLATE_SCRIPT) + cases = get_pytest_cases([str(tmp_path)], CollectMode.MULTI_ALL_WITH_PARAM) + + assert len(cases) == 2 + assert cases[0].targets == ['esp32', 'esp32s2'] + assert cases[1].targets == ['esp32s2', 'esp32s2', 'esp32s3'] + + +def test_get_pytest_cases_all(tmp_path: Path) -> None: + script = tmp_path / 'pytest_get_pytest_cases_all.py' + script.write_text(TEMPLATE_SCRIPT) + cases = get_pytest_cases([str(tmp_path)], CollectMode.ALL) + + assert len(cases) == 6 + assert cases[0].targets == ['esp32', 'esp32s2'] + assert cases[0].name == 'test_foo_multi' + + assert cases[1].targets == ['esp32s2', 'esp32s2', 'esp32s3'] + assert cases[1].name == 'test_foo_multi' + + assert cases[2].targets == ['esp32', 'esp32'] + assert cases[2].name == 'test_foo_multi_with_marker' + + assert cases[3].targets == ['esp32s2', 'esp32s2'] + assert cases[3].name == 'test_foo_multi_with_marker' + + assert cases[4].targets == ['esp32'] + assert cases[4].name == 'test_foo_single' + + assert cases[5].targets == ['esp32s2'] + assert cases[5].name == 'test_foo_single' diff --git a/tools/ci/idf_pytest/utils.py b/tools/ci/idf_pytest/utils.py index aef1a776c5..1e06354668 100644 --- a/tools/ci/idf_pytest/utils.py +++ b/tools/ci/idf_pytest/utils.py @@ -6,10 +6,10 @@ import os import typing as t from xml.etree import ElementTree as ET -from .constants import TARGET_MARKERS - -def format_case_id(target: t.Optional[str], config: t.Optional[str], case: str, is_qemu: bool = False, params: t.Optional[dict] = None) -> str: +def format_case_id( + target: t.Optional[str], config: t.Optional[str], case: str, is_qemu: bool = False, params: t.Optional[dict] = None +) -> str: parts = [] if target: parts.append((str(target) + '_qemu') if is_qemu else str(target)) @@ -23,23 +23,6 @@ def format_case_id(target: t.Optional[str], config: t.Optional[str], case: str, return '.'.join(parts) -def get_target_marker_from_expr(markexpr: str) -> str: - candidates = set() - # we use `-m "esp32 and generic"` in our CI to filter the test cases - # this doesn't cover all use cases, but fit what we do in CI. - for marker in markexpr.split('and'): - marker = marker.strip() - if marker in TARGET_MARKERS: - candidates.add(marker) - - if len(candidates) > 1: - raise ValueError(f'Specified more than one target markers: {candidates}. Please specify no more than one.') - elif len(candidates) == 1: - return candidates.pop() - else: - raise ValueError('Please specify one target marker via "--target [TARGET]" or via "-m [TARGET]"') - - def merge_junit_files(junit_files: t.List[str], target_path: str) -> None: if len(junit_files) <= 1: return @@ -78,3 +61,7 @@ def merge_junit_files(junit_files: t.List[str], target_path: str) -> None: with open(target_path, 'wb') as fw: fw.write(ET.tostring(merged_testsuite)) + + +def comma_sep_str_to_list(s: str) -> t.List[str]: + return [s.strip() for s in s.split(',') if s.strip()]