diff --git a/lisa/microsoft/testsuites/dpdk/common.py b/lisa/microsoft/testsuites/dpdk/common.py index 931dfb2dd0..be4a085ff6 100644 --- a/lisa/microsoft/testsuites/dpdk/common.py +++ b/lisa/microsoft/testsuites/dpdk/common.py @@ -15,7 +15,7 @@ from lisa.operating_system import Debian, Fedora, Oracle, Posix, Suse, Ubuntu from lisa.tools import Git, Lscpu, Tar, Wget from lisa.tools.lscpu import CpuArchitecture -from lisa.util import UnsupportedDistroException +from lisa.util import UnsupportedDistroException, parse_version DPDK_STABLE_GIT_REPO = "https://dpdk.org/git/dpdk-stable" @@ -289,7 +289,7 @@ def force_dpdk_default_source(variables: Dict[str, Any]) -> None: variables["dpdk_source"] = DPDK_STABLE_GIT_REPO -_UBUNTU_LTS_VERSIONS = ["24.4.0", "22.4.0", "20.4.0", "18.4.0"] +_UBUNTU_LTS_VERSIONS = ["24.4.0", "22.4.0", "20.4.0"] # see https://ubuntu.com/about/release-cycle @@ -331,9 +331,8 @@ def check_dpdk_support(node: Node) -> None: f"is_latest_or_prerelease? ({is_ubuntu_latest_or_prerelease(node.os)})" f" is_lts_version? ({is_ubuntu_lts_version(node.os)})" ) - # TODO: undo special casing for 18.04 when it's usage is less common supported = ( - node.os.information.version == "18.4.0" + node.os.information.version > "18.4.0" or is_ubuntu_latest_or_prerelease(node.os) or is_ubuntu_lts_version(node.os) ) @@ -472,3 +471,89 @@ class Pmd(str, Enum): class DpdkGradeMetric(str, Enum): PPS = "pps" BPS = "bps" + + +class DpdkMpRole(str, Enum): + # dpdk multiprocessing allows numerous secondary processes to + # share a single primary context. Testpmd and other apps allow this + # to occur fairly transparently, but require it to be declared + # at start time. The primary process has a proc_id of '0' + # This is unfortunate, since it's the nice python default for + # integer arguments. + # + # Use this enum to differentiate between primary and secondary + # multiple process context types. There is no single process + # context type, because this argument will be passed as Optional. + # So DpdkMpRole is either None or PRIMARY | SECONDARY + + PRIMARY_PROCESS = "primary" + SECONDARY_PROCESS = "secondary" + + +class TestpmdForwardMode(str, Enum): + # Forwarding modes for the testpmd application. + # + # txonly : generate and transmit packets without validating received + # traffic (useful for pure transmit/throughput tests). + # rxonly : receive and count packets without transmitting (useful for + # pure receive/throughput or loss measurement). + # 5tswap : swap the 5‑tuple fields between source and destination for + # each packet (IP src/dst, L4 src/dst ports, and protocol), + # exercising header rewrite paths. + + TX_ONLY = "txonly" + RX_ONLY = "rxonly" + FIVE_TUPLE_SWAP = "5tswap" + + +_dpdk_default_source_dict = { + "Ubuntu": { + "20.4.0": "v25.11", + "22.4.0": "v24.11", + "24.4.0": "v24.11", + "25.4.0": "v25.11", + "26.4.0": "v25.11", + }, + "Debian": { + "10.0.0": "v22.11", + "11.0.0": "v24.11", + "12.0.0": "v24.11", + "13.0.0": "v25.11", + }, + "Redhat": { + "8.6.0": "v24.11", + "9.0.0": "v25.11", + }, + "CentOs": { + "8.6.0": "v24.11", + "9.0.0": "v25.11", + }, +} + + +def get_dpdk_default_source_version(node: Node) -> str: + # match major.minor os versions for supported distros + # to lkg dpdk versions for the source installation. + # Versions are evaluated at >= for the os version. + + os_version = node.os.information.version + os_match = _dpdk_default_source_dict.get(node.os.name, None) + if os_match is None: + raise UnsupportedDistroException( + node.os, + f"Unsupported distro {node.os.name}, cannot determine " + "default DPDK source version for testpmd.", + ) + for version_threshold, dpdk_version in os_match.items(): + if ( + os_version >= version_threshold + and os_version.major == parse_version(version_threshold).major + ): + return dpdk_version + # if we get here, the os version is too old to have a supported dpdk version + raise UnsupportedDistroException( + node.os, + f"Unsupported distro version {str(os_version)} for {node.os.name}. " + "Use a version >= the following versions: " + f"{', '.join(os_match.keys())}", + ) diff --git a/lisa/microsoft/testsuites/dpdk/dpdkperf.py b/lisa/microsoft/testsuites/dpdk/dpdkperf.py index b4ffec65ed..f954e95c8a 100644 --- a/lisa/microsoft/testsuites/dpdk/dpdkperf.py +++ b/lisa/microsoft/testsuites/dpdk/dpdkperf.py @@ -10,7 +10,6 @@ verify_dpdk_build, verify_dpdk_l3fwd_ntttcp_tcp, verify_dpdk_send_receive, - verify_dpdk_send_receive_multi_txrx_queue, ) from lisa import ( @@ -168,7 +167,7 @@ def perf_dpdk_minimal_failsafe_pmd( log: Logger, variables: Dict[str, Any], ) -> None: - self._run_dpdk_perf_test(Pmd.FAILSAFE, result, log, variables) + self._run_dpdk_testpmd_perf_test(Pmd.FAILSAFE, result, log, variables) @TestCaseMetadata( description=""" @@ -189,7 +188,7 @@ def perf_dpdk_minimal_netvsc_pmd( log: Logger, variables: Dict[str, Any], ) -> None: - self._run_dpdk_perf_test(Pmd.NETVSC, result, log, variables) + self._run_dpdk_testpmd_perf_test(Pmd.NETVSC, result, log, variables) @TestCaseMetadata( description=""" @@ -211,12 +210,12 @@ def perf_dpdk_multi_queue_failsafe_pmd( log: Logger, variables: Dict[str, Any], ) -> None: - self._run_dpdk_perf_test( + self._run_dpdk_testpmd_perf_test( Pmd.FAILSAFE, result, log, variables, - use_queues=True, + multiple_queues=True, ) @TestCaseMetadata( @@ -238,12 +237,12 @@ def perf_dpdk_multi_queue_netvsc_pmd( log: Logger, variables: Dict[str, Any], ) -> None: - self._run_dpdk_perf_test( + self._run_dpdk_testpmd_perf_test( Pmd.NETVSC, result, log, variables, - use_queues=True, + multiple_queues=True, ) @TestCaseMetadata( @@ -280,13 +279,14 @@ def perf_dpdk_l3fwd_ntttcp_tcp( is_perf_test=True, ) - def _run_dpdk_perf_test( + def _run_dpdk_testpmd_perf_test( self, pmd: Pmd, test_result: TestResult, log: Logger, variables: Dict[str, Any], - use_queues: bool = False, + multiple_queues: bool = False, + hugepage_size: HugePageSize = HugePageSize.HUGE_2MB, ) -> None: environment = test_result.environment assert environment, "fail to get environment from testresult" @@ -294,23 +294,15 @@ def _run_dpdk_perf_test( # run build + validation to populate results self._validate_core_counts_are_equal(test_result) try: - if use_queues: - send_kit, receive_kit = verify_dpdk_send_receive_multi_txrx_queue( - environment, - log, - variables, - pmd, - result=test_result, - ) - else: - send_kit, receive_kit = verify_dpdk_send_receive( - environment, - log, - variables, - pmd, - HugePageSize.HUGE_2MB, - result=test_result, - ) + send_kit, receive_kit = verify_dpdk_send_receive( + environment, + log, + variables, + pmd, + multiple_queues=multiple_queues, + hugepage_size=hugepage_size, + result=test_result, + ) except UnsupportedPackageVersionException as err: raise SkippedException(err) diff --git a/lisa/microsoft/testsuites/dpdk/dpdksuite.py b/lisa/microsoft/testsuites/dpdk/dpdksuite.py index ad4012fc76..6b27b135c0 100644 --- a/lisa/microsoft/testsuites/dpdk/dpdksuite.py +++ b/lisa/microsoft/testsuites/dpdk/dpdksuite.py @@ -17,6 +17,7 @@ from microsoft.testsuites.dpdk.dpdkovs import DpdkOvs from microsoft.testsuites.dpdk.dpdkutil import ( UIO_HV_GENERIC_SYSFS_PATH, + TestpmdForwardMode, UnsupportedPackageVersionException, check_send_receive_compatibility, do_parallel_cleanup, @@ -541,9 +542,9 @@ def run_testpmd_hotplug_send_test( raise SkippedException(err) testpmd = test_kit.testpmd test_nic = node.nics.get_secondary_nic() - testpmd_cmd = testpmd.generate_testpmd_command([test_nic], 0, "txonly") + testpmd_cmd = testpmd.generate_testpmd_command([test_nic], 0, "txonly", pmd) kit_cmd_pairs = { - test_kit: testpmd_cmd, + test_kit: [testpmd_cmd], } run_testpmd_concurrent( @@ -855,7 +856,7 @@ def verify_dpdk_send_receive_multi_txrx_queue_4k_mtu_netvsc( # allow configuring for different platforms mtu_size = 4000 try: - snd, rcv = verify_dpdk_send_receive_multi_txrx_queue( + verify_dpdk_send_receive_multi_txrx_queue( environment, log, variables, @@ -1077,6 +1078,44 @@ def verify_dpdk_send_receive_gb_hugepages_netvsc( except UnsupportedPackageVersionException as err: raise SkippedException(err) + @TestCaseMetadata( + description=""" + Tests a sender/receiver 5-tuple-swap forwarding setup + for direct netvsc pmd setup. + Sender sends the packets, receiver receives them. + We check the traffic received matches the amount sent + and received by the forwarder. + Test uses 1GB hugepages. + """, + priority=2, + requirement=simple_requirement( + min_core_count=32, + min_nic_count=2, + network_interface=Sriov(), + min_count=2, + unsupported_features=[Gpu, Infiniband], + ), + ) + def verify_dpdk_testpmd_5tswap_gb_hugepages_netvsc( + self, + environment: Environment, + log: Logger, + variables: Dict[str, Any], + result: TestResult, + ) -> None: + try: + verify_dpdk_send_receive( + environment, + log, + variables, + Pmd.NETVSC, + HugePageSize.HUGE_1GB, + result=result, + receiver_mode=TestpmdForwardMode.FIVE_TUPLE_SWAP, + ) + except UnsupportedPackageVersionException as err: + raise SkippedException(err) + @TestCaseMetadata( description=""" Run testpmd with multiple senders to a single receiver diff --git a/lisa/microsoft/testsuites/dpdk/dpdktestpmd.py b/lisa/microsoft/testsuites/dpdk/dpdktestpmd.py index a01d837bb1..79f286ece7 100644 --- a/lisa/microsoft/testsuites/dpdk/dpdktestpmd.py +++ b/lisa/microsoft/testsuites/dpdk/dpdktestpmd.py @@ -3,18 +3,22 @@ import re from pathlib import PurePath, PurePosixPath -from typing import Any, List, Tuple, Type +from typing import Any, List, Optional, Tuple, Type from assertpy import assert_that, fail from microsoft.testsuites.dpdk.common import ( + DPDK_STABLE_GIT_REPO, DependencyInstaller, Downloader, + DpdkMpRole, GitDownloader, Installer, OsPackageDependencies, PackageManagerInstall, + Pmd, TarDownloader, get_debian_backport_repo_args, + get_dpdk_default_source_version, is_url_for_git_repo, is_url_for_tarball, unsupported_os_thrower, @@ -96,22 +100,6 @@ # declare package/tool dependencies for DPDK source installation DPDK_SOURCE_INSTALL_PACKAGES = DependencyInstaller( requirements=[ - OsPackageDependencies( - matcher=lambda x: isinstance(x, Ubuntu) - and x.information.codename == "bionic", - packages=[ - "build-essential", - "libmnl-dev", - "libelf-dev", - "libnuma-dev", - "dpkg-dev", - "pkg-config", - "python3-pip", - # 18.04 doesn't need linux-modules-extra-azure - # since it will never have MANA support - ], - stop_on_match=True, - ), # install linux-modules-extra-azure if it's available for mana_ib # older debian kernels won't have mana_ib packaged, # so skip the check on those kernels. @@ -403,9 +391,16 @@ class DpdkTestpmd(Tool): @property def command(self) -> str: - if not self._testpmd_install_path: - return "testpmd" - return self._testpmd_install_path + # if dpdk is already installed, find the binary and check the version + if self.find_testpmd_binary(assert_on_fail=False): + pkgconfig = self.node.tools[Pkgconfig] + if pkgconfig.package_info_exists(self._dpdk_lib_name): + self._dpdk_version_info = pkgconfig.get_package_version( + self._dpdk_lib_name + ) + return self._testpmd_install_path + else: + return "dpdk-testpmd" _rte_target = "x86_64-native-linuxapp-gcc" @@ -469,7 +464,7 @@ def use_package_manager_install(self) -> bool: return False def generate_testpmd_include( - self, node_nic: NicInfo, vdev_id: int, force_netvsc: bool = False + self, node_nic: NicInfo, vdev_id: int, pmd: Pmd ) -> str: # handle generating different flags for pmds/device combos for testpmd @@ -492,42 +487,29 @@ def generate_testpmd_include( else: include_flag = "-w" - include_flag = f' {include_flag} "{node_nic.pci_slot}"' + include_flag = f'{include_flag} "{node_nic.pci_slot}"' # build pmd argument - if self.has_dpdk_version() and self.get_dpdk_version() < "18.11.0": - pmd_name = "net_failsafe" - pmd_flags = f"dev({node_nic.pci_slot}),dev(iface={node_nic.name},force=1)" - elif self.is_mana: - # mana selects by mac, just return the vdev info directly - if node_nic.module_name == "uio_hv_generic" or force_netvsc: - return f' --vdev="{node_nic.pci_slot},mac={node_nic.mac_addr}" ' - # if mana_ib is present, use mana friendly args - elif self.node.tools[Modprobe].module_exists("mana_ib"): + if pmd == Pmd.FAILSAFE: + if self.is_mana: return ( - f' --vdev="net_vdev_netvsc0,mac={node_nic.mac_addr}"' - f' --vdev="{node_nic.pci_slot},mac={node_nic.mac_addr}" ' + f'--vdev="net_vdev_netvsc{vdev_id},mac={node_nic.mac_addr}"' + f' --vdev="{node_nic.pci_slot},mac={node_nic.mac_addr}"' ) else: - # use eth interface for failsafe otherwise - # test will probably fail due to low throughput - pmd_name = "net_vdev_netvsc" - pmd_flags = f"iface={node_nic.name}" - # reset include flag for MANA since there is only one interface - include_flag = "" - else: - # mlnx setup for failsafe - pmd_name = "net_vdev_netvsc" - pmd_flags = f"iface={node_nic.name},force=1" - if node_nic.module_name == "hv_netvsc": - # primary/upper/master nic is bound to hv_netvsc - # when using net_failsafe implicitly or explicitly. - # Set up net_failsafe/net_vdev_netvsc args here - return f'--vdev="{pmd_name}{vdev_id},{pmd_flags}" ' + include_flag - elif node_nic.module_name == "uio_hv_generic": - # if using netvsc pmd, just let -w or -a select - # which device to use. No other args are needed. - return include_flag + return ( + f"{include_flag} " + f"--vdev=net_vdev_netvsc{vdev_id},iface={node_nic.name}" + ) + if pmd == Pmd.NETVSC: + # mana can use vports where there is only one pci device + # so can't use the easy mode there + if self.is_mana: + return f'--vdev="{node_nic.pci_slot},mac={node_nic.mac_addr}"' + else: + # otherwise just include the device id + # it's already been set up by the earlier driver binding + return include_flag else: # if we're all the way through and haven't picked a pmd, something # has gone wrong. fail fast @@ -544,11 +526,16 @@ def generate_testpmd_command( nic_to_include: List[NicInfo], vdev_id: int, mode: str, + pmd: Pmd, extra_args: str = "", multiple_queues: bool = False, service_cores: int = 1, mtu: int = 0, mbuf_size: int = 0, + mp_role: Optional[DpdkMpRole] = None, + num_procs: int = 0, + proc_id: int = 0, + core_list: Optional[List[int]] = None, ) -> str: # testpmd \ # -l \ @@ -560,11 +547,20 @@ def generate_testpmd_command( # --forward-mode=txonly \ # --eth-peer=, \ # --stats-period + # --proc-id + + if mp_role: + mp_args = self._generate_mp_arguments( + mp_role=mp_role, num_procs=num_procs, proc_id=proc_id + ) + else: + mp_args = "" # pick amount of queues for tx/rx (txq/rxq flag) # our tests use equal amounts for rx and tx + if multiple_queues: - if self.is_mana and mode == "txonly": + if self.is_mana and mode in ["rxonly", "5tswap"]: queues = 8 else: queues = 4 @@ -577,7 +573,7 @@ def generate_testpmd_command( # generate the flags for which devices to include in the tests nic_include_infos = [] for nic in nic_to_include: - nic_include_infos += [self.generate_testpmd_include(nic, vdev_id)] + nic_include_infos += [self.generate_testpmd_include(nic, vdev_id, pmd)] vdev_id += 1 # infer core count to assign based on number of queues @@ -604,14 +600,28 @@ def generate_testpmd_command( forwarding_cores = max_core_index - service_cores # core range argument - core_list = f"-l 1-{max_core_index}" + if core_list: + # validate that the provided core list has enough cores + # for forwarding plus the service core(s) + required_cores = forwarding_cores + service_cores + assert_that(len(core_list)).described_as( + f"core_list has {len(core_list)} cores, but {required_cores} " + f"are required ({forwarding_cores} " + f"forwarding + {service_cores} service)" + ).is_greater_than_or_equal_to(required_cores) + # override forwarding_cores with the actual count from core_list + forwarding_cores = len(core_list) - service_cores + core_list_arg = f"-l {','.join(map(str, core_list))}" + else: + core_list_arg = f"-l 1-{max_core_index}" + if extra_args: extra_args = extra_args.strip() else: extra_args = "" # mana pmd needs tx/rx descriptors declared. if self.is_mana: - extra_args += f" --txd={txd} --rxd={txd} --stats 2" + extra_args += f" --txd={txd} --rxd={txd} " if queues > 1: extra_args += f" --txq={queues} --rxq={queues}" @@ -661,9 +671,11 @@ def generate_testpmd_command( debug_logging = "--log-level netvsc,debug" nic_includes = " ".join(nic_include_infos) return ( - f"{self._testpmd_install_path} {core_list} " - f"{nic_includes} {debug_logging} -- --forward-mode={mode} " - f"-a --stats-period 2 --nb-cores={forwarding_cores} {extra_args} " + f"{self._testpmd_install_path} {core_list_arg} " + f"{nic_includes} {debug_logging} --proc-type=auto " + f"-- --forward-mode={mode} " + f"-a --stats-period 4 --nb-cores={forwarding_cores} " + f"{mp_args} {extra_args}" ) def run_for_n_seconds(self, cmd: str, timeout: int) -> str: @@ -853,8 +865,16 @@ def get_example_app_path(self, app_name: str) -> PurePath: def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) - self._dpdk_source = kwargs.pop("dpdk_source", PACKAGE_MANAGER_SOURCE) - self._dpdk_branch = kwargs.pop("dpdk_branch", "main") + # this should be set by initialize_node_resources first, + # but we'll also set a default here to avoid issues if Testpmd + # is ever used without calling that function first. + self._dpdk_source = kwargs.pop("dpdk_source", DPDK_STABLE_GIT_REPO) + try: + self._dpdk_branch = kwargs.pop( + "dpdk_branch", get_dpdk_default_source_version(self.node) + ) + except UnsupportedDistroException as err: + raise SkippedException(err) self._sample_apps_to_build = kwargs.pop("sample_apps", []) self._dpdk_version_info = VersionInfo(0, 0) self._testpmd_install_path: str = "" @@ -903,6 +923,45 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self._dpdk_lib_name ) + def _generate_mp_arguments( + self, mp_role: DpdkMpRole, num_procs: int, proc_id: int + ) -> str: + # Check and set multi_process arguments for testpmd. + mp_arguments = "" + + # IFF testpmd is being invoked with multiple processes, + # we must check that: + # primary process has a proc-id of 0 + # proc-id < num procs + # num_procs > 1 + # Otherwise we can omit all of these arguments + + assert_that(num_procs).described_as( + "Test bug: dpdk mp context requires num_procs arg > 1" + ).is_greater_than(1) + + if mp_role == DpdkMpRole.PRIMARY_PROCESS: + mp_arguments = f"--num-procs={num_procs} --proc-id 0" + elif mp_role == DpdkMpRole.SECONDARY_PROCESS: + # check the caller has provided a proc_id that makes sense, + # this would indicate a bug in the test case itself. + assert_that(proc_id).described_as( + "Test bug: dpdk proc-id argument must be > 0 for secondary process" + ).is_greater_than(0) + + assert_that(proc_id).described_as( + ( + f"Test bug: dpdk proc-id argument ({proc_id}) " + f"must be < num_procs argument ({num_procs})." + ) + ).is_less_than(num_procs) + mp_arguments = f"--num-procs={num_procs} --proc-id {proc_id}" + else: + raise LisaException( + "Test bug: no mp arguments defined for " f"dpdk mp role: {str(mp_role)}" + ) + return mp_arguments + def _determine_network_hardware(self) -> None: lspci = self.node.tools[Lspci] device_list = lspci.get_devices_by_type(DEVICE_TYPE_SRIOV) diff --git a/lisa/microsoft/testsuites/dpdk/dpdkutil.py b/lisa/microsoft/testsuites/dpdk/dpdkutil.py index 0274be7288..36605958fe 100644 --- a/lisa/microsoft/testsuites/dpdk/dpdkutil.py +++ b/lisa/microsoft/testsuites/dpdk/dpdkutil.py @@ -15,17 +15,20 @@ DPDK_STABLE_GIT_REPO, Downloader, DpdkGradeMetric, + DpdkMpRole, GitDownloader, Installer, PackageManagerInstall, Pmd, TarDownloader, + TestpmdForwardMode, check_dpdk_support, + get_dpdk_default_source_version, is_url_for_git_repo, is_url_for_tarball, update_kernel_from_repo, ) -from microsoft.testsuites.dpdk.dpdktestpmd import PACKAGE_MANAGER_SOURCE, DpdkTestpmd +from microsoft.testsuites.dpdk.dpdktestpmd import DpdkTestpmd from microsoft.testsuites.dpdk.rdmacore import ( RDMA_CORE_MANA_DEFAULT_SOURCE, RDMA_CORE_PACKAGE_MANAGER_DEPENDENCIES, @@ -126,19 +129,9 @@ def __init__( def _set_forced_source_by_distro(node: Node, variables: Dict[str, Any]) -> None: - # if mana is present, force a source build of 24.11 - # if no other source was provided. + # if mana is present, force a source build from DPDK_STABLE if none is set. if node.nics.is_mana_device_present(): variables["dpdk_source"] = variables.get("dpdk_source", DPDK_STABLE_GIT_REPO) - variables["dpdk_branch"] = variables.get("dpdk_branch", "v24.11") - # DPDK packages 17.11 which is EOL and doesn't have the - # net_vdev_netvsc pmd used for simple handling of hyper-v - # guests. Force stable source build on this platform. - # Default to 20.11 unless another version is provided by the - # user. 20.11 is the latest dpdk version for 18.04. - elif isinstance(node.os, Ubuntu) and node.os.information.version < "20.4.0": - variables["dpdk_source"] = variables.get("dpdk_source", DPDK_STABLE_GIT_REPO) - variables["dpdk_branch"] = variables.get("dpdk_branch", "v20.11") def get_rdma_core_installer( @@ -207,7 +200,7 @@ def generate_send_receive_run_info( multiple_queues: bool = False, use_service_cores: int = 1, set_mtu: int = 0, -) -> Dict[DpdkTestResources, str]: +) -> Dict[DpdkTestResources, List[str]]: snd_nic, rcv_nic = [x.node.nics.get_secondary_nic() for x in [sender, receiver]] # for MTU test: check that we can fetch the max MTU size for the NIC if set_mtu: @@ -223,10 +216,12 @@ def generate_send_receive_run_info( ) else: maxmtu_int = 0 + snd_cmd = sender.testpmd.generate_testpmd_command( [snd_nic], 0, "txonly", + pmd, extra_args=f"--tx-ip={snd_nic.ip_addr},{rcv_nic.ip_addr}", multiple_queues=multiple_queues, service_cores=use_service_cores, @@ -237,6 +232,7 @@ def generate_send_receive_run_info( [rcv_nic], 0, "rxonly", + pmd, multiple_queues=multiple_queues, service_cores=use_service_cores, mtu=set_mtu, @@ -244,13 +240,108 @@ def generate_send_receive_run_info( ) kit_cmd_pairs = { - sender: snd_cmd, - receiver: rcv_cmd, + sender: [snd_cmd], + receiver: [rcv_cmd], } return kit_cmd_pairs +def generate_5tswap_run_info( + pmd: Pmd, + sender: DpdkTestResources, + receiver: DpdkTestResources, + multiple_queues: bool = False, + use_service_cores: int = 1, + set_mtu: int = 0, +) -> Dict[DpdkTestResources, List[str]]: + snd_nic, rcv_nic = [x.node.nics.get_secondary_nic() for x in [sender, receiver]] + # for MTU test: check that we can fetch the max MTU size for the NIC + if set_mtu: + check_nic = sender.node.nics.get_primary_nic().lower + maxmtu = sender.node.tools[Ip].get_detail(check_nic, "maxmtu") + if not maxmtu: + raise SkippedException("Could not verify maxmtu for DPDK max mtu test.") + maxmtu_int = int(maxmtu) + if set_mtu > maxmtu_int: + raise SkippedException( + "Requested MTU size exceeds max mtu for DPDK mtu test: " + f"{set_mtu} > {maxmtu}." + ) + else: + maxmtu_int = 0 + + # Dynamically allocate cores for primary and secondary processes. + # We need non-overlapping core lists for each process. + # Use only cores from NUMA node 0 for better memory locality. + # Reserve core 0 for the OS, use remaining cores split between processes. + sender_lscpu = sender.node.tools[Lscpu] + sender_cpu_info = sender_lscpu.get_cpu_info() + # Get cores on NUMA node 0, excluding core 0 (reserved for OS) + sender_numa0_cores = sorted( + [cpu.cpu for cpu in sender_cpu_info if cpu.numa_node == 0 and cpu.cpu != 0] + ) + # Split cores between primary (txonly) and secondary (rxonly) processes + # Use even-indexed cores for primary, odd-indexed for secondary + primary_cores = sender_numa0_cores[::2] # every other core starting at index 0 + secondary_cores = sender_numa0_cores[1::2] # every other core starting at index 1 + + # similarly for receiver - use NUMA node 0 cores only + receiver_lscpu = receiver.node.tools[Lscpu] + receiver_cpu_info = receiver_lscpu.get_cpu_info() + receiver_available_cores = sorted( + [cpu.cpu for cpu in receiver_cpu_info if cpu.numa_node == 0 and cpu.cpu != 0] + ) + + snd_cmd = sender.testpmd.generate_testpmd_command( + [snd_nic], + 0, + "txonly", + pmd, + multiple_queues=multiple_queues, + service_cores=use_service_cores, + mtu=set_mtu, + mbuf_size=maxmtu_int, + mp_role=DpdkMpRole.PRIMARY_PROCESS, + num_procs=2, + proc_id=0, + core_list=primary_cores, + extra_args=f"--tx-ip={snd_nic.ip_addr},{rcv_nic.ip_addr}", + ) + snd_mp_cmd = sender.testpmd.generate_testpmd_command( + [snd_nic], + 0, + "rxonly", + pmd, + multiple_queues=multiple_queues, + service_cores=use_service_cores, + mtu=set_mtu, + mbuf_size=maxmtu_int, + mp_role=DpdkMpRole.SECONDARY_PROCESS, + num_procs=2, + proc_id=1, + core_list=secondary_cores, + ) + rcv_cmd = receiver.testpmd.generate_testpmd_command( + [rcv_nic], + 0, + "5tswap", + pmd, + multiple_queues=multiple_queues, + service_cores=use_service_cores, + mtu=set_mtu, + mbuf_size=maxmtu_int, + core_list=receiver_available_cores, + ) + + dpdk_kit_cmds = { + sender: [snd_cmd, snd_mp_cmd], + receiver: [rcv_cmd], + } + + return dpdk_kit_cmds + + def generate_testpmd_multiple_port_command( pmd: Pmd, senders: List[DpdkTestResources], @@ -294,7 +385,6 @@ def generate_testpmd_multiple_port_command( else: maxmtu_int = 0 kit_cmd_pairs: Dict[DpdkTestResources, str] = dict() - receiver_includes: List[str] = [] for i in range(len(senders)): # get the sender sender = senders[i] @@ -310,6 +400,7 @@ def generate_testpmd_multiple_port_command( [sender_nic], 0, "txonly", + pmd, extra_args=f"--tx-ip={sender_nic.ip_addr},{receiver_nic.ip_addr}", multiple_queues=multiple_queues, service_cores=use_service_cores, @@ -318,18 +409,13 @@ def generate_testpmd_multiple_port_command( ) # store this senders command kit_cmd_pairs[sender] = snd_cmd - # receiver needs multiple ports, so only generate the include. - receiver_include = receiver.testpmd.generate_testpmd_include( - receiver_nics[sender_subnet], i - ) - # and save it - receiver_includes += [receiver_include] # and generate the command with multiple ports for the single receiver: rcv_cmd = receiver.testpmd.generate_testpmd_command( list([receiver_nics[key] for key in receiver_nics]), 0, "rxonly", + pmd, multiple_queues=multiple_queues, service_cores=use_service_cores, mtu=set_mtu, @@ -417,12 +503,18 @@ def initialize_node_resources( sample_apps: Union[List[str], None] = None, test_nics: Union[List[NicInfo], None] = None, ) -> DpdkTestResources: - _set_forced_source_by_distro(node, variables) check_pmd_support(node, pmd) - - dpdk_source = variables.get("dpdk_source", PACKAGE_MANAGER_SOURCE) - dpdk_branch = variables.get("dpdk_branch", "") - rdma_source = variables.get("rdma_source", "") + # Set dpdk source defaults for supported distros if none is set by the user. + # This must be set ahead of setting the default for rdma-core since the default + # for one is picked based on the other. + dpdk_source = variables.get("dpdk_source", DPDK_STABLE_GIT_REPO) + try: + dpdk_branch = variables.get( + "dpdk_branch", get_dpdk_default_source_version(node) + ) + except UnsupportedDistroException as err: + raise SkippedException(err) + rdma_source = variables.get("rdma_source", RDMA_CORE_MANA_DEFAULT_SOURCE) rdma_branch = variables.get("rdma_branch", "") force_net_failsafe_pmd = variables.get("dpdk_force_net_failsafe_pmd", False) log.info( @@ -460,13 +552,18 @@ def initialize_node_resources( # create tool, initialize testpmd tool (installs dpdk) # use create over get to avoid skipping # reinitialization of tool when new arguments are present - testpmd: DpdkTestpmd = node.tools.create( - DpdkTestpmd, - dpdk_source=dpdk_source, - dpdk_branch=dpdk_branch, - sample_apps=sample_apps, - force_net_failsafe_pmd=force_net_failsafe_pmd, - ) + try: + testpmd: DpdkTestpmd = node.tools.create( + DpdkTestpmd, + dpdk_source=dpdk_source, + dpdk_branch=dpdk_branch, + sample_apps=sample_apps, + force_net_failsafe_pmd=force_net_failsafe_pmd, + ) + except UnsupportedDistroException as err: + # if DPDK installation or test is not supported, skip. + raise SkippedException(err) + # Tools will skip installation if the binary is present, so # force invoke install. Installer will skip if the correct # *type* of installation is already installed, @@ -531,12 +628,12 @@ def check_send_receive_compatibility(test_kits: List[DpdkTestResources]) -> None def run_testpmd_concurrent( - node_cmd_pairs: Dict[DpdkTestResources, str], + node_cmd_pairs: Dict[DpdkTestResources, List[str]], seconds: int, log: Logger, hotplug_sriov: bool = False, -) -> Dict[DpdkTestResources, str]: - output: Dict[DpdkTestResources, str] = dict() +) -> Dict[Tuple[DpdkTestResources, str], str]: + output: Dict[Tuple[DpdkTestResources, str], str] = dict() task_manager = start_testpmd_concurrent(node_cmd_pairs, seconds, log, output) if hotplug_sriov: @@ -572,24 +669,29 @@ def run_testpmd_concurrent( def start_testpmd_concurrent( - node_cmd_pairs: Dict[DpdkTestResources, str], + node_cmd_pairs: Dict[DpdkTestResources, List[str]], seconds: int, log: Logger, - output: Dict[DpdkTestResources, str], -) -> TaskManager[Tuple[DpdkTestResources, str]]: - cmd_pairs_as_tuples = deque(node_cmd_pairs.items()) - - def _collect_dict_result(result: Tuple[DpdkTestResources, str]) -> None: - output[result[0]] = result[1] + output: Dict[Tuple[DpdkTestResources, str], str], +) -> TaskManager[Tuple[DpdkTestResources, str, str]]: + command_pairs_as_tuples: List[Tuple[DpdkTestResources, str]] = [] + kits_and_commands = deque(node_cmd_pairs.items()) + for kit_and_commands in kits_and_commands: + kit, commands = kit_and_commands + for command in commands: + command_pairs_as_tuples.append((kit, command)) + + def _collect_dict_result(result: Tuple[DpdkTestResources, str, str]) -> None: + output[result[0], result[1]] = result[2] def _run_command_with_testkit( run_kit: Tuple[DpdkTestResources, str], - ) -> Tuple[DpdkTestResources, str]: + ) -> Tuple[DpdkTestResources, str, str]: testkit, cmd = run_kit - return (testkit, testkit.testpmd.run_for_n_seconds(cmd, seconds)) + return (testkit, cmd, testkit.testpmd.run_for_n_seconds(cmd, seconds)) task_manager = run_in_parallel_async( - [partial(_run_command_with_testkit, x) for x in cmd_pairs_as_tuples], + [partial(_run_command_with_testkit, x) for x in command_pairs_as_tuples], _collect_dict_result, ) @@ -665,7 +767,7 @@ def verify_dpdk_build( test_nic = node.nics.get_secondary_nic() testpmd_cmd = testpmd.generate_testpmd_command( - [test_nic], 0, "txonly", multiple_queues=multiple_queues + [test_nic], 0, "txonly", pmd, multiple_queues=multiple_queues ) testpmd.run_for_n_seconds(testpmd_cmd, 10) tx_pps = testpmd.get_mean_tx_pps() @@ -692,78 +794,203 @@ def verify_dpdk_send_receive( set_mtu: int = 0, check_sender_packet_drops: bool = False, grading_metric: DpdkGradeMetric = DpdkGradeMetric.PPS, + receiver_mode: TestpmdForwardMode = TestpmdForwardMode.RX_ONLY, ) -> Tuple[DpdkTestResources, DpdkTestResources]: - # helpful to have the public ips labeled for debugging - external_ips = [] - for node in environment.nodes.list(): - if isinstance(node, RemoteNode): - external_ips += node.connection_info[ - constants.ENVIRONMENTS_NODES_REMOTE_ADDRESS - ] - else: - raise SkippedException() - # skip MTU test if not on MANA (for now). - if set_mtu and not node.nics.is_mana_device_present(): - raise SkippedException("set mtu test is intended for MANA VMs only.") - log.debug((f"\nsender:{external_ips[0]}\nreceiver:{external_ips[1]}\n")) + # Validate nodes and log external IPs + _validate_nodes_and_log_ips(environment, log, set_mtu) - # get test duration variable if set - # enables long-running tests to shakeQoS and SLB issue + # Get test duration and timeouts test_duration: int = variables.get("dpdk_test_duration", 15) kill_timeout = test_duration + 5 + + # Initialize nodes concurrently test_kits = init_nodes_concurrent( environment, log, variables, pmd, hugepage_size=hugepage_size ) - check_send_receive_compatibility(test_kits) sender, receiver = test_kits - # annotate test result before starting + # Annotate and generate commands if result is not None: annotate_dpdk_test_result(test_kit=sender, test_result=result, log=log) - kit_cmd_pairs = generate_send_receive_run_info( + kit_cmd_pairs = _generate_receiver_commands( pmd, sender, receiver, - use_service_cores=use_service_cores, - multiple_queues=multiple_queues, - set_mtu=set_mtu, - ) - receive_timeout = kill_timeout + 10 - receive_result = receiver.node.tools[Timeout].start_with_timeout( - kit_cmd_pairs[receiver], - receive_timeout, - constants.SIGINT, - kill_timeout=receive_timeout, - ) - receive_result.wait_output("start packet forwarding") - sender_result = sender.node.tools[Timeout].start_with_timeout( - kit_cmd_pairs[sender], - test_duration, - constants.SIGINT, - kill_timeout=kill_timeout, + receiver_mode, + use_service_cores, + multiple_queues, + set_mtu, ) - results = dict() - results[sender] = sender.testpmd.process_testpmd_output(sender_result.wait_result()) - results[receiver] = receiver.testpmd.process_testpmd_output( - receive_result.wait_result() + # Run testpmd processes + sender_processes, receiver_processes = _run_testpmd_processes( + sender, receiver, kit_cmd_pairs, test_duration, kill_timeout ) - # helpful to have the outputs labeled - log.debug(f"\nSENDER:\n{results[sender]}") - log.debug(f"\nRECEIVER:\n{results[receiver]}") + # Collect results + _collect_testpmd_results( + sender, receiver, sender_processes, receiver_processes, log + ) - rcv_rx_pps = receiver.testpmd.get_mean_rx_pps() + # Grade and validate results snd_tx_pps = sender.testpmd.get_mean_tx_pps() + rcv_rx_pps = receiver.testpmd.get_mean_rx_pps() + log.info(f"receiver rx-pps: {rcv_rx_pps}") log.info(f"sender tx-pps: {snd_tx_pps}") + if result: + result.information["snd_pps"] = snd_tx_pps + result.information["rcv_pps"] = rcv_rx_pps + + # Check for kernel errors sender.dmesg.check_kernel_errors(force_run=True) receiver.dmesg.check_kernel_errors(force_run=True) + + # Grade based on metric + _grade_dpdk_results( + grading_metric, rcv_rx_pps, snd_tx_pps, result, sender, receiver, log + ) + + # Check packet drops + if check_sender_packet_drops: + sender.testpmd.check_tx_packet_drops() + receiver.testpmd.check_rx_packet_drops() + annotate_packet_drops(log, result, receiver) + + # Validate 5-tuple swap if applicable + if receiver_mode == TestpmdForwardMode.FIVE_TUPLE_SWAP: + _validate_5tswap_results( + sender, receiver, grading_metric, log, sender_processes + ) + + return sender, receiver + + +def _validate_nodes_and_log_ips( + environment: Environment, log: Logger, set_mtu: int +) -> None: + """Validate nodes are remote and check MTU test compatibility.""" + external_ips = [] + for node in environment.nodes.list(): + if not isinstance(node, RemoteNode): + raise SkippedException() + external_ips += node.connection_info[ + constants.ENVIRONMENTS_NODES_REMOTE_ADDRESS + ] + if set_mtu and not node.nics.is_mana_device_present(): + raise SkippedException("set mtu test is intended for MANA VMs only.") + log.debug((f"\nsender:{external_ips[0]}\nreceiver:{external_ips[1]}\n")) + + +def _generate_receiver_commands( + pmd: Pmd, + sender: DpdkTestResources, + receiver: DpdkTestResources, + receiver_mode: TestpmdForwardMode, + use_service_cores: int, + multiple_queues: bool, + set_mtu: int, +) -> Dict[DpdkTestResources, List[str]]: + """Generate testpmd commands based on receiver mode.""" + if receiver_mode == TestpmdForwardMode.RX_ONLY: + return generate_send_receive_run_info( + pmd, + sender, + receiver, + use_service_cores=use_service_cores, + multiple_queues=multiple_queues, + set_mtu=set_mtu, + ) + elif receiver_mode == TestpmdForwardMode.FIVE_TUPLE_SWAP: + return generate_5tswap_run_info( + pmd, + sender, + receiver, + use_service_cores=use_service_cores, + multiple_queues=multiple_queues, + set_mtu=set_mtu, + ) + else: + raise LisaException( + f"Unsupported TestpmdForwardMode for receiver: {receiver_mode}" + ) + + +def _run_testpmd_processes( + sender: DpdkTestResources, + receiver: DpdkTestResources, + kit_cmd_pairs: Dict[DpdkTestResources, List[str]], + test_duration: int, + kill_timeout: int, +) -> Tuple[List[Process], List[Process]]: + """Start testpmd processes on sender and receiver.""" + receiver_timeout = kill_timeout + 10 + receiver_processes: List[Process] = [] + sender_processes: List[Process] = [] + + for command in kit_cmd_pairs[receiver]: + proc = receiver.node.tools[Timeout].start_with_timeout( + command=command, + timeout=receiver_timeout, + signal=constants.SIGINT, + kill_timeout=receiver_timeout, + ) + proc.wait_output("start packet forwarding") + receiver_processes.append(proc) + + for command in kit_cmd_pairs[sender]: + proc = sender.node.tools[Timeout].start_with_timeout( + command=command, + timeout=test_duration, + signal=constants.SIGINT, + kill_timeout=kill_timeout, + ) + sender_processes.append(proc) + proc.wait_output("start packet forwarding") + + return sender_processes, receiver_processes + + +def _collect_testpmd_results( + sender: DpdkTestResources, + receiver: DpdkTestResources, + sender_processes: List[Process], + receiver_processes: List[Process], + log: Logger, +) -> Dict[DpdkTestResources, str]: + """Collect and log testpmd output from all processes.""" + results = {} + results[sender] = sender.testpmd.process_testpmd_output( + sender_processes[0].wait_result() + ) + results[receiver] = receiver.testpmd.process_testpmd_output( + receiver_processes[0].wait_result() + ) + + log.debug(f"\nSENDER:\n{results[sender]}") + for tx_secondary in [x.wait_result() for x in sender_processes[1:]]: + log.debug(f"\nSENDER_SECONDARY:\n{tx_secondary.stdout}") + log.debug(f"\nRECEIVER:\n{results[receiver]}") + for rx_secondary in [x.wait_result() for x in receiver_processes[1:]]: + log.debug(f"\nRECEIVER_SECONDARY:\n{rx_secondary.stdout}") + + return results + + +def _grade_dpdk_results( + grading_metric: DpdkGradeMetric, + rcv_rx_pps: int, + snd_tx_pps: int, + result: Optional[TestResult], + sender: DpdkTestResources, + receiver: DpdkTestResources, + log: Logger, +) -> None: + """Grade DPDK results based on selected metric.""" if grading_metric == DpdkGradeMetric.PPS: - # differences in NIC type throughput can lead to different snd/rcv counts assert_that(rcv_rx_pps).described_as( "Throughput for RECEIVE was below the correct order-of-magnitude" ).is_greater_than(DPDK_PPS_THRESHOLD) @@ -771,35 +998,61 @@ def verify_dpdk_send_receive( "Throughput for SEND was below the correct order of magnitude" ).is_greater_than(DPDK_PPS_THRESHOLD) elif grading_metric == DpdkGradeMetric.BPS: - # grading bits per second is non-trivial since - # Azure internal SKU information is not exposed to the guest, - # also it's difficult to look up the expected Mbps for a given SKU. - sender_gbps = sender.testpmd.check_bps_data("TX") receiver_gbps = receiver.testpmd.check_bps_data("RX") - - # so just annotate test result if it's available - # a test crashing because of no data or dpdk failing to start - # will still result in a fail. if result: result.information["tx_gbps"] = sender_gbps result.information["rx_gbps"] = receiver_gbps - else: - pass # no-op if no grading is required. - # sender packet drops are common when network bandwidth is - # artificially throttled by the sku, so checking sender - # is optional - if check_sender_packet_drops: - sender.testpmd.check_tx_packet_drops() - # verify receiver didn't drop most of the packets - receiver.testpmd.check_rx_packet_drops() +def _validate_5tswap_results( + sender: DpdkTestResources, + receiver: DpdkTestResources, + grading_metric: DpdkGradeMetric, + log: Logger, + sender_processes: List[Process], +) -> None: + """Validate 5-tuple swap packet forwarding results.""" + if grading_metric != DpdkGradeMetric.PPS: + return - # annotate the amount of dropped packets on the receiver - annotate_packet_drops(log, result, receiver) + rcv_rx_pps = receiver.testpmd.get_mean_rx_pps() + rcv_tx_pps = receiver.testpmd.get_mean_tx_pps() - return sender, receiver + log.info(f"receiver rx-pps: {rcv_rx_pps}") + assert_that(rcv_rx_pps).described_as("receiver received no packets!").is_not_zero() + + log.info(f"receiver tx-pps: {rcv_tx_pps}") + assert_that(rcv_tx_pps).described_as( + f"receiver forwarded no packets after receiving {rcv_rx_pps}" + ).is_not_zero() + + forwarded_over_received = abs(rcv_tx_pps / rcv_rx_pps) + assert_that(forwarded_over_received).described_as( + "receiver re-send pps was unexpectedly low!" + ).is_close_to(1.0, 0.2) + + # Verify sender secondary process received forwarded packets + tx_secondary_results = [x.wait_result() for x in sender_processes[1:]] + assert_that(tx_secondary_results).described_as( + "Sender secondary process result is missing!" + ).is_not_empty() + assert_that(tx_secondary_results[0].stdout).described_as( + "Sender secondary process output was empty" + ).is_not_empty() + + sender.testpmd.process_testpmd_output(tx_secondary_results[0]) + snd_rx_pps = sender.testpmd.get_mean_rx_pps() + + log.info(f"sender secondary rx-pps: {snd_rx_pps}") + assert_that(snd_rx_pps).described_as( + "Sender secondary process did not receive forwarded packets" + ).is_greater_than(DPDK_PPS_THRESHOLD) + + forwarded_over_received = abs(snd_rx_pps / rcv_tx_pps) + assert_that(forwarded_over_received).described_as( + "sender secondary process received pps was unexpectedly low!" + ).is_close_to(1.0, 0.25) def annotate_packet_drops( @@ -1277,10 +1530,10 @@ def verify_dpdk_l3fwd_ntttcp_tcp( # generate the dpdk include arguments to add to our commandline include_devices = [ fwd_kit.testpmd.generate_testpmd_include( - subnet_a_nics[forwarder], dpdk_port_a, force_netvsc=True + subnet_a_nics[forwarder], dpdk_port_a, pmd ), fwd_kit.testpmd.generate_testpmd_include( - subnet_b_nics[forwarder], dpdk_port_b, force_netvsc=True + subnet_b_nics[forwarder], dpdk_port_b, pmd ), ] diff --git a/lisa/tools/meson.py b/lisa/tools/meson.py index bcc6fb2782..d9d3d42494 100644 --- a/lisa/tools/meson.py +++ b/lisa/tools/meson.py @@ -14,7 +14,11 @@ class Meson(Tool): - _minimum_version = parse_version("0.52.0") + # Keep this in sync with the minimum Meson version required by the DPDK + # builds this tool is expected to support; raising it can break older + # distro/package-manager environments unless the install/bootstrap flow + # is updated as well. + _minimum_version = parse_version("0.57.2") @property def command(self) -> str: