diff mbox series

[-next,01/14] kddv/core: Implement a ligth weight device driver test framework

Message ID 20231118104040.386381-2-zhangxiaoxu@huaweicloud.com (mailing list archive)
State New, archived
Headers show
Series Implement a ligth weight device driver test framework | expand

Commit Message

huaweicloud Nov. 18, 2023, 10:40 a.m. UTC
From: Zhang Xiaoxu <zhangxiaoxu5@huawei.com>

Implement a ligth weight device driver test framework, KDDV(Kernel
Device Driver Verification). Which using eBPF based bus controllers
to mockup device chipsets.

Directory structure as below:

  $ tree tools/testing/kddv/kddv
  tools/testing/kddv/kddv/
  ├── core           # general data structure of the kddv
  │   ├── __init__.py
  │   ├── ddunit.py  # basic testcase class
  │   ├── device.py  # device common info and operations
  │   ├── driver.py  # driver controller, load modules and bind device
  │   ├── mockup.py  # bpf controller, mockup hardware
  │   └── model.py   # drivers common info and operations
  ├── __init__.py
  ├── Makefile
  └── tests             # test case folder
      └── __init__.py

Signed-off-by: Wei Yongjun <weiyongjun1@huawei.com>
Signed-off-by: Zhang Xiaoxu <zhangxiaoxu5@huawei.com>
---
 tools/testing/kddv/.gitignore             |   3 +
 tools/testing/kddv/Makefile               |  25 ++++
 tools/testing/kddv/kddv/Makefile          |  22 +++
 tools/testing/kddv/kddv/__init__.py       |   0
 tools/testing/kddv/kddv/core/__init__.py  |   7 +
 tools/testing/kddv/kddv/core/ddunit.py    |  44 ++++++
 tools/testing/kddv/kddv/core/device.py    |  78 +++++++++++
 tools/testing/kddv/kddv/core/driver.py    |  79 +++++++++++
 tools/testing/kddv/kddv/core/mockup.py    | 157 ++++++++++++++++++++++
 tools/testing/kddv/kddv/core/model.py     |  87 ++++++++++++
 tools/testing/kddv/kddv/tests/__init__.py |   0
 11 files changed, 502 insertions(+)
 create mode 100644 tools/testing/kddv/.gitignore
 create mode 100644 tools/testing/kddv/Makefile
 create mode 100644 tools/testing/kddv/kddv/Makefile
 create mode 100755 tools/testing/kddv/kddv/__init__.py
 create mode 100755 tools/testing/kddv/kddv/core/__init__.py
 create mode 100755 tools/testing/kddv/kddv/core/ddunit.py
 create mode 100755 tools/testing/kddv/kddv/core/device.py
 create mode 100755 tools/testing/kddv/kddv/core/driver.py
 create mode 100755 tools/testing/kddv/kddv/core/mockup.py
 create mode 100755 tools/testing/kddv/kddv/core/model.py
 create mode 100755 tools/testing/kddv/kddv/tests/__init__.py
diff mbox series

Patch

diff --git a/tools/testing/kddv/.gitignore b/tools/testing/kddv/.gitignore
new file mode 100644
index 000000000000..159cec72565a
--- /dev/null
+++ b/tools/testing/kddv/.gitignore
@@ -0,0 +1,3 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+__pycache__/
+*.py[cod]
\ No newline at end of file
diff --git a/tools/testing/kddv/Makefile b/tools/testing/kddv/Makefile
new file mode 100644
index 000000000000..f4ef60a1342b
--- /dev/null
+++ b/tools/testing/kddv/Makefile
@@ -0,0 +1,25 @@ 
+# SPDX-License-Identifier: GPL-2.0
+include ../../scripts/Makefile.include
+
+SUBDIRS := kddv
+
+all:
+	@for SUBDIR in $(SUBDIRS); do \
+		$(MAKE) -C $$SUBDIR;\
+	done;
+
+install:
+ifdef INSTALL_PATH
+	@for SUBDIR in $(SUBDIRS); do \
+		$(MAKE) INSTALL_PATH=$(INSTALL_PATH)/$$SUBDIR -C $$SUBDIR install;\
+	done;
+else
+	$(error Error: set INSTALL_PATH to use install)
+endif
+
+clean:
+	@for SUBDIR in $(SUBDIRS); do \
+		$(MAKE) -C $$SUBDIR clean;\
+	done;
+
+.PHONY: all install clean
diff --git a/tools/testing/kddv/kddv/Makefile b/tools/testing/kddv/kddv/Makefile
new file mode 100644
index 000000000000..a68112154669
--- /dev/null
+++ b/tools/testing/kddv/kddv/Makefile
@@ -0,0 +1,22 @@ 
+# SPDX-License-Identifier: GPL-2.0
+include ../../../scripts/Makefile.include
+
+INSTALL ?= install
+
+all:
+	@for SUBDIR in $(SUBDIRS); do		\
+		$(MAKE) -C $$SUBDIR;		\
+	done;
+
+install:
+	$(INSTALL) -m 0755 -d $(INSTALL_PATH)
+	$(INSTALL) __init__.py $(INSTALL_PATH)
+	cp -rf core/ $(INSTALL_PATH)
+	cp -rf tests/ $(INSTALL_PATH)
+
+clean:
+	@for SUBDIR in $(SUBDIRS); do		\
+		$(MAKE) -C $$SUBDIR clean;	\
+	done;
+
+.PHONY: all install clean
diff --git a/tools/testing/kddv/kddv/__init__.py b/tools/testing/kddv/kddv/__init__.py
new file mode 100755
index 000000000000..e69de29bb2d1
diff --git a/tools/testing/kddv/kddv/core/__init__.py b/tools/testing/kddv/kddv/core/__init__.py
new file mode 100755
index 000000000000..45a35c909e86
--- /dev/null
+++ b/tools/testing/kddv/kddv/core/__init__.py
@@ -0,0 +1,7 @@ 
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+#
+# Kernel device driver verification
+#
+# Copyright (C) 2022-2023 Huawei Technologies Co., Ltd
+# Author: Wei Yongjun <weiyongjun1@huawei.com>
diff --git a/tools/testing/kddv/kddv/core/ddunit.py b/tools/testing/kddv/kddv/core/ddunit.py
new file mode 100755
index 000000000000..fd4ab9fe048c
--- /dev/null
+++ b/tools/testing/kddv/kddv/core/ddunit.py
@@ -0,0 +1,44 @@ 
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+#
+# Kernel device driver verification
+#
+# Copyright (C) 2022-2023 Huawei Technologies Co., Ltd
+# Author: Wei Yongjun <weiyongjun1@huawei.com>
+
+import logging
+import unittest
+
+from .model import DriverModel
+
+logger = logging.getLogger(__name__)
+
+class DriverTest(unittest.TestCase, DriverModel):
+    def __init__(self, methodName=None):
+        super().__init__(methodName)
+        DriverModel.__init__(self)
+
+    def setUp(self):
+        super().setUp()
+        try:
+            self.driver.setup()
+        except:
+            self.skipTest(f"Module {self.module_name} not found")
+        self.mockup.load()
+
+    def tearDown(self):
+        self.mockup.unload()
+        self.driver.teardown()
+        super().tearDown()
+
+    def assertRegEqual(self, reg, data, msg=None):
+        value = self.read_reg(reg)
+        self.assertEqual(value, data, msg)
+
+    def assertRegBitsEqual(self, reg, data, mask, msg=None):
+        value = self.read_reg(reg)
+        self.assertEqual(value & mask, data & mask, msg)
+
+    def assertRegsEqual(self, reg, data, msg=None):
+        value = self.read_regs(reg, len(data))
+        self.assertListEqual(value, data, msg)
diff --git a/tools/testing/kddv/kddv/core/device.py b/tools/testing/kddv/kddv/core/device.py
new file mode 100755
index 000000000000..db862890e9a4
--- /dev/null
+++ b/tools/testing/kddv/kddv/core/device.py
@@ -0,0 +1,78 @@ 
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+#
+# Kernel device driver verification
+#
+# Copyright (C) 2022-2023 Huawei Technologies Co., Ltd
+# Author: Wei Yongjun <weiyongjun1@huawei.com>
+
+import os
+import logging
+
+from pathlib import Path
+
+logger = logging.getLogger(__name__)
+
+class Device(object):
+    bus = None
+
+    def __init__(self, bus, driver, busid, addr, devid):
+        self.bus = bus
+        self.driver = driver
+        self.busid = busid
+        self.addr = addr
+        self.devid = devid
+        self.status = False
+        self.path = Path(f"/sys/bus/{self.bus}/devices/{self.device_id}")
+
+    def __enter__(self):
+        self.bind()
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.unbind()
+
+    def __del__(self):
+        self.unbind()
+
+    @classmethod
+    def create(cls, bus, driver, busid, addr, devid):
+        for subclass in cls.__subclasses__():
+            if subclass.bus == bus:
+                return subclass(bus, driver, busid, addr, devid)
+        return cls(bus, driver, busid, addr, devid)
+
+    @property
+    def device_id(self):
+        return self.devid
+
+    def bind(self):
+        if self.status is True:
+            return
+        self.status = True
+        self.driver.bind(self.device_id)
+
+    def unbind(self):
+        if self.status is False:
+            return
+        try:
+            self.driver.unbind(self.device_id)
+        except:
+            pass
+        self.status = False
+
+    def read_attr(self, attr):
+        path = self.path / attr
+        if not os.path.exists(path):
+            logger.info(f"attr '{attr}' not exists")
+            return None
+        logger.debug(f"read from {path}")
+        return path.read_text().rstrip()
+
+    def write_attr(self, attr, val):
+        path = self.path / attr
+        if not os.path.exists(path):
+            logger.info(f"attr '{attr}' not exists")
+            return
+        logger.debug(f"write '{val}' to {path}")
+        return path.write_bytes(val.encode())
diff --git a/tools/testing/kddv/kddv/core/driver.py b/tools/testing/kddv/kddv/core/driver.py
new file mode 100755
index 000000000000..55ad804068b5
--- /dev/null
+++ b/tools/testing/kddv/kddv/core/driver.py
@@ -0,0 +1,79 @@ 
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+#
+# Kernel device driver verification
+#
+# Copyright (C) 2022-2023 Huawei Technologies Co., Ltd
+# Author: Wei Yongjun <weiyongjun1@huawei.com>
+
+import os
+import logging
+import subprocess
+from pathlib import Path
+
+from .device import Device
+
+logger = logging.getLogger(__name__)
+
+class Driver(object):
+    def __init__(self, p):
+        self.bus = p.bus
+        self.domain_nr = p.domain_nr
+        self.bus_id = p.bus_id
+        self.parent_bus = p.parent_bus
+        self.driver = p.driver_name
+        self.module = p.module_name
+        self.depmod = p.dependencies
+        self.path = Path(f"/sys/bus/{self.bus}/drivers/{self.driver}")
+
+    def write_ctrl(self, spath, val):
+        path = self.path / spath
+        logger.debug(f"write '{val}' to {path}")
+        return path.write_text(val)
+
+    def disable_autoprobe(self):
+        logger.debug(f"disable {self.bus} drivers autoprobe")
+        path = Path(f"/sys/bus/{self.bus}/drivers_autoprobe")
+        path.write_text("0")
+
+    def enable_autoprobe(self):
+        logger.debug(f"enable {self.bus} drivers autoprobe")
+        path = Path(f"/sys/bus/{self.bus}/drivers_autoprobe")
+        path.write_text("1")
+
+    def probe_depmod(self):
+        if not self.depmod:
+            return
+        for mod in self.depmod:
+            logger.debug(f'modprobe {mod}')
+            subprocess.check_output(["/sbin/modprobe", mod])
+
+    def probe_module(self):
+        self.probe_depmod()
+        logger.debug(f'modprobe {self.module}')
+        subprocess.check_output(
+            ["/sbin/modprobe", self.module], stderr=subprocess.STDOUT
+        )
+
+    def remove_mdule(self):
+        logger.debug(f'rmmod {self.module}')
+        subprocess.check_output(["/sbin/rmmod", self.module])
+
+    def setup(self):
+        self.disable_autoprobe()
+        self.probe_module()
+
+    def teardown(self):
+        self.remove_mdule()
+        self.enable_autoprobe()
+
+    def bind(self, devid):
+        if os.path.exists(self.path / devid):
+            return
+        self.write_ctrl("bind", devid)
+
+    def unbind(self, devid):
+        self.write_ctrl("unbind", devid)
+
+    def device(self, addr, devid):
+        return Device.create(self.bus, self, self.bus_id, addr, devid)
diff --git a/tools/testing/kddv/kddv/core/mockup.py b/tools/testing/kddv/kddv/core/mockup.py
new file mode 100755
index 000000000000..b5e6c83c9164
--- /dev/null
+++ b/tools/testing/kddv/kddv/core/mockup.py
@@ -0,0 +1,157 @@ 
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+#
+# Kernel device driver verification
+#
+# Copyright (C) 2022-2023 Huawei Technologies Co., Ltd
+# Author: Wei Yongjun <weiyongjun1@huawei.com>
+
+import os
+import re
+import json
+import struct
+import logging
+import subprocess
+
+from pathlib import Path
+
+logger = logging.getLogger(__name__)
+
+ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+ROOT_BPF = os.path.join(ROOT_DIR, 'data', 'bpf')
+DEST_BPF = '/sys/fs/bpf'
+
+class Mockup(object):
+    bus = None
+
+    def __init__(self, bus, p) -> None:
+        self.bus = bus
+        self.bpf = p.bpf
+        self.addr = p.address
+        self.devid = p.device_id
+        self.regshift = p.regshift
+        self.regbytes = p.regbytes
+        self.valbytes = p.valbytes
+        self.regmaps = p.regmaps
+
+    @classmethod
+    def create(cls, bus, p):
+        for sub in cls.__subclasses__():
+            if sub.bus == bus:
+                return sub(bus, p)
+        return cls(bus, p)
+
+    @property
+    def device_id(self):
+        return f"{self.addr}"
+
+    @property
+    def bpftool(self):
+        return os.environ.get('BPFTOOL_PATH', 'bpftool')
+
+    def get_valbytes(self):
+        if self.valbytes == 3:
+            return 4
+        return self.valbytes
+
+    def search_file(self, path, filename):
+        for curfile in os.listdir(path):
+            abspath = os.path.join(path, curfile)
+            if os.path.isdir(abspath):
+                subfile = self.search_file(abspath, filename)
+                if subfile is not None:
+                    return subfile
+            if curfile == filename:
+                return abspath
+        return None
+
+    def bpf_prog_name(self):
+        bpf_name = re.sub("-", "_", os.path.basename(self.bpf))
+        return f'{bpf_name}'[:15]
+
+    def load_bpf(self):
+        if self.bpf is None:
+            return
+        bpf_file = self.search_file(ROOT_BPF, f"{self.bpf}.o")
+        if bpf_file is None:
+            logger.error(f'bpf file {self.bpf} not found')
+            return
+        logger.debug(f'load bpf {self.bpf}.o')
+        bpf_path = os.path.join(DEST_BPF, self.bpf_prog_name())
+        if os.path.exists(bpf_path):
+            os.unlink(bpf_path)
+        cmds = [self.bpftool, 'prog', 'load']
+        cmds += [bpf_file, bpf_path]
+        cmds += ['autoattach']
+        logger.debug(' '.join(cmds))
+        subprocess.check_output(cmds)
+
+    def unload_bpf(self):
+        if self.bpf is None:
+            return
+        logger.debug(f'unload bpf {self.bpf}.o')
+        bpf_path = os.path.join(DEST_BPF, self.bpf_prog_name())
+        if os.path.exists(bpf_path):
+            os.unlink(bpf_path)
+
+    def create_device(self):
+        pass
+
+    def remove_device(self):
+        pass
+
+    def load(self):
+        self.load_bpf()
+        self.load_regmaps()
+        self.create_device()
+
+    def unload(self):
+        self.remove_device()
+        self.unload_bpf()
+
+    def bpf_map_name(self):
+        bpf_name = re.sub("-", "_", self.bpf)
+        return f'regs_{bpf_name}'[:15]
+
+    def to_bpf_bytes(self, val, len):
+        return list("%d" % n for n in list(val.to_bytes(len, 'little')))
+
+    def load_regmaps(self):
+        for reg, value in self.regmaps.items():
+            if isinstance(value, list):
+                self.write_regs(reg, value)
+            else:
+                self.write_reg(reg, value)
+
+    def read_reg(self, addr):
+        if self.bpf is None:
+            return
+        cmds = [self.bpftool, 'map', 'lookup']
+        cmds += ['name', self.bpf_map_name()]
+        cmds += ['key']
+        cmds += self.to_bpf_bytes(addr, 4)
+        logger.debug(' '.join(cmds))
+        mapval = subprocess.check_output(cmds)
+        return json.loads(mapval).get("value", 0)
+
+    def read_regs(self, addr, len):
+        data = []
+        for i in range(len):
+            data += [self.read_reg(addr + (i << self.regshift))]
+        return data
+
+    def write_reg(self, addr, val):
+        if self.bpf is None:
+            return
+        cmds = [self.bpftool, 'map', 'update']
+        cmds += ['name', self.bpf_map_name()]
+        cmds += ['key']
+        cmds += self.to_bpf_bytes(addr, 4)
+        cmds += ['value']
+        cmds += self.to_bpf_bytes(val, self.get_valbytes())
+        logger.debug(' '.join(cmds))
+        subprocess.check_output(cmds)
+
+    def write_regs(self, addr, data):
+        for i in range(len(data)):
+            self.write_reg(addr + (i << self.regshift), data[i])
diff --git a/tools/testing/kddv/kddv/core/model.py b/tools/testing/kddv/kddv/core/model.py
new file mode 100755
index 000000000000..9da4716df7dc
--- /dev/null
+++ b/tools/testing/kddv/kddv/core/model.py
@@ -0,0 +1,87 @@ 
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+#
+# Kernel device driver verification
+#
+# Copyright (C) 2022-2023 Huawei Technologies Co., Ltd
+# Author: Wei Yongjun <weiyongjun1@huawei.com>
+
+from .driver import Driver
+from .mockup import Mockup
+
+class DriverModel(object):
+    bus = None
+    name = None
+    addr = 0x00
+
+    def __init__(self):
+        self.driver = Driver(self)
+        self.mockup = Mockup.create(self.bus, self)
+
+    @property
+    def driver_name(self):
+        return self.name
+
+    @property
+    def module_name(self):
+        return self.name
+
+    @property
+    def dependencies(self):
+        """List of module dependencies by running tests."""
+        return None
+
+    @property
+    def domain_nr(self):
+        return 0
+
+    @property
+    def bus_id(self):
+        return 0
+
+    @property
+    def parent_bus(self):
+        return None
+
+    @property
+    def bpf(self):
+        return None
+
+    @property
+    def address(self):
+        return self.addr
+
+    @property
+    def device_id(self):
+        return self.name
+
+    @property
+    def regshift(self):
+        return 0
+
+    @property
+    def regbytes(self):
+        return 1
+
+    @property
+    def valbytes(self):
+        return 1
+
+    @property
+    def regmaps(self):
+        return {}
+
+    def device(self):
+        return self.driver.device(self.address, self.device_id)
+
+    def read_reg(self, addr):
+        return self.mockup.read_reg(addr)
+
+    def read_regs(self, addr, len):
+        return self.mockup.read_regs(addr, len)
+
+    def write_reg(self, addr, val):
+        self.mockup.write_reg(addr, val)
+
+    def write_regs(self, addr, data):
+        self.mockup.write_regs(addr, data)
diff --git a/tools/testing/kddv/kddv/tests/__init__.py b/tools/testing/kddv/kddv/tests/__init__.py
new file mode 100755
index 000000000000..e69de29bb2d1