@@ -11,9 +11,26 @@ import unittest
from .model import DriverModel
from .environ import environ
+from .failnth import FaultIterator
logger = logging.getLogger(__name__)
+class _AssertRaisesFaultContext(unittest.case._AssertRaisesContext):
+ def __enter__(self):
+ environ.enter_fault_inject()
+ return self
+
+ def __exit__(self, exc_type, exc_value, tb):
+ if not environ.exit_fault_inject():
+ return False
+ if exc_type is None:
+ return True
+ if issubclass(exc_type, self.expected):
+ return True
+ if issubclass(exc_type, AssertionError):
+ return True
+ return super().__exit__(exc_type, exc_value, tb)
+
class DriverTest(unittest.TestCase, DriverModel):
def __init__(self, methodName=None):
super().__init__(methodName)
@@ -35,7 +52,10 @@ class DriverTest(unittest.TestCase, DriverModel):
super().tearDown()
def _callTestMethod(self, method):
- method()
+ fault = FaultIterator()
+ for nth in iter(fault):
+ logger.debug(f"fault inject: nth={nth}")
+ method()
self.assertFault()
def assertRegEqual(self, reg, data, msg=None):
@@ -54,3 +74,10 @@ class DriverTest(unittest.TestCase, DriverModel):
msg = environ.check_failure()
if msg:
raise self.failureException(msg)
+
+ def assertRaisesFault(self, *args, **kwargs):
+ context = _AssertRaisesFaultContext(OSError, self)
+ try:
+ return context.handle('assertRaises', args, kwargs)
+ finally:
+ context = None
@@ -12,6 +12,7 @@ import subprocess
from pathlib import Path
from .device import Device
+from .environ import environ
logger = logging.getLogger(__name__)
@@ -54,9 +55,11 @@ class Driver(object):
subprocess.check_output(
["/sbin/modprobe", self.module], stderr=subprocess.STDOUT
)
+ environ.notify_insmod(self.module)
def remove_mdule(self):
logger.debug(f'rmmod {self.module}')
+ environ.notify_rmmod()
subprocess.check_output(["/sbin/rmmod", self.module])
def setup(self):
@@ -9,6 +9,7 @@
import logging
from .dmesg import KernelMessage
+from .faulter import FaultInject
from .memleak import Kmemleak
logger = logging.getLogger(__name__)
@@ -17,12 +18,15 @@ class Environ(object):
def __init__(self):
self.kmsg = KernelMessage()
self.leak = Kmemleak()
+ self.fault = FaultInject()
def setup(self):
self.kmsg.setup()
self.leak.setup()
+ self.fault.setup()
def teardown(self):
+ self.fault.teardown()
self.leak.teardown()
self.kmsg.teardown()
@@ -36,4 +40,26 @@ class Environ(object):
return msg
return self.leak.check_failure()
+ def enable_fault_inject(self, feature):
+ """Enable fault injection feature"""
+ self.fault.enable_feature(feature)
+
+ def fault_running(self):
+ """Fault injection has been enabled"""
+ return self.fault.running
+
+ def enter_fault_inject(self):
+ """Enter fault injection"""
+ self.fault.start_features()
+
+ def exit_fault_inject(self):
+ """Exit fault injection"""
+ return self.fault.stop_features()
+
+ def notify_insmod(self, name):
+ self.fault.filter_module(name)
+
+ def notify_rmmod(self):
+ self.fault.filter_module(None)
+
environ = Environ()
new file mode 100755
@@ -0,0 +1,57 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+#
+# Kernel device driver verification
+#
+# Copyright (C) 2022-2023 Huawei Technologies Co., Ltd
+# Author: Wei Yongjun <weiyongjun1@huawei.com>
+
+import os
+import re
+import logging
+
+from pathlib import Path
+from .environ import environ
+
+logger = logging.getLogger(__name__)
+
+class FaultIterator(object):
+ def __init__(self, max_loop = 0):
+ self._max_loop = max_loop
+ self._cur_fail = 0
+ self._max_fail = 3
+ self.path = Path(f"/proc/self/fail-nth")
+
+ def __iter__(self):
+ self.nth = -1
+ return self
+
+ def __next__(self):
+ self.nth += 1
+ if not self.nth:
+ return self.nth
+ if not environ.fault_running():
+ logger.debug('fault inject not running')
+ raise StopIteration
+ if not os.path.exists(self.path):
+ logger.debug('fault inject not exists')
+ raise StopIteration
+ if self._max_loop and self._max_loop < self.nth:
+ raise StopIteration
+ if self.read_nth() > 0:
+ self.write_nth(0)
+ self._cur_fail += 1
+ if self._cur_fail >= self._max_fail:
+ logger.debug('end fault inject')
+ raise StopIteration
+ else:
+ self._cur_fail = 0
+ self.write_nth(self.nth)
+ return self.nth
+
+ def read_nth(self):
+ return int(self.path.read_text().rstrip())
+
+ def write_nth(self, val):
+ logger.debug(f"write {val} to fail-nth")
+ self.path.write_text(str(val))
new file mode 100755
@@ -0,0 +1,48 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+#
+# Kernel device driver verification
+#
+# Copyright (C) 2022-2023 Huawei Technologies Co., Ltd
+# Author: Wei Yongjun <weiyongjun1@huawei.com>
+
+from .faults import FailModule
+
+class FaultInject(object):
+ def __init__(self):
+ self.enabled = False
+ self.running = False
+ self.features = []
+ for subclass in FailModule.__subclasses__():
+ self.features.append(subclass())
+
+ def setup(self):
+ pass
+
+ def teardown(self):
+ self.running = False
+
+ def start_features(self):
+ if not self.enabled:
+ return
+ for feature in self.features:
+ feature.start()
+ self.running = True
+
+ def stop_features(self):
+ if not self.enabled:
+ return False
+ for feature in self.features:
+ feature.stop()
+ return True
+
+ def filter_module(self, module):
+ for feature in self.features:
+ feature.filter_module(module)
+
+ def enable_feature(self, name):
+ for feature in self.features:
+ if name in [feature.key, 'all']:
+ if feature.has_support:
+ feature.enabled = True
+ self.enabled = True
new file mode 100755
@@ -0,0 +1,13 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+#
+# Kernel device driver verification
+#
+# Copyright (C) 2022-2023 Huawei Technologies Co., Ltd
+# Author: Wei Yongjun <weiyongjun1@huawei.com>
+
+__all__ = ['FailModule']
+
+from .fail import FailModule
+from .slab import FailSlab
+from .page import FailPage
new file mode 100755
@@ -0,0 +1,86 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+#
+# Kernel device driver verification
+#
+# Copyright (C) 2022-2023 Huawei Technologies Co., Ltd
+# Author: Wei Yongjun <weiyongjun1@huawei.com>
+
+import re
+import os
+import logging
+from pathlib import Path
+
+logger = logging.getLogger(__name__)
+
+class FailModule(object):
+ name = None
+ key = None
+
+ def __init__(self):
+ self.has_support = os.path.exists(f"/sys/kernel/debug/{self.name}")
+ self.ftext = Path(f"/sys/kernel/debug/{self.name}/require-start")
+ self.fdata = Path(f"/sys/kernel/debug/{self.name}/require-end")
+ self.fdepth = Path(f"/sys/kernel/debug/{self.name}/stacktrace-depth")
+ self.nowarn = Path(f"/sys/kernel/debug/{self.name}/verbose")
+ self.enabled = False
+ self.module = None
+
+ def feature_enabled(self):
+ if not self.has_support:
+ return False
+ return self.enabled
+
+ def filter_module(self, name):
+ if name is None:
+ self.module = None
+ else:
+ self.module = re.sub('-', '_', name)
+
+ def enable_verbose(self):
+ if not self.feature_enabled():
+ return
+ self.nowarn.write_text('1')
+
+ def disable_verbose(self):
+ if not self.feature_enabled():
+ return
+ self.nowarn.write_text('0')
+
+ def enable_module_filter(self):
+ if not self.feature_enabled():
+ return
+ if self.module is None:
+ return
+ logger.debug(f"enter module filter for fail {self.name}")
+ mtext = Path(f"/sys/module/{self.module}/sections/.text")
+ mdata = Path(f"/sys/module/{self.module}/sections/.data")
+ self.ftext.write_text(mtext.read_text().rstrip())
+ self.fdata.write_text(mdata.read_text().rstrip())
+ self.fdepth.write_text('32')
+
+ def disable_module_filter(self):
+ if not self.feature_enabled():
+ return
+ if self.module is None:
+ return
+ logger.debug(f"exit module filter for fail {self.name}")
+ self.ftext.write_text('0')
+ self.fdata.write_text('0')
+ self.fdepth.write_text('32')
+
+ def enable_feature(self):
+ pass
+
+ def disable_feature(self):
+ pass
+
+ def start(self):
+ self.enable_module_filter()
+ self.enable_verbose()
+ self.enable_feature()
+
+ def stop(self):
+ self.disable_feature()
+ self.disable_verbose()
+ self.disable_module_filter()
new file mode 100755
@@ -0,0 +1,40 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+#
+# Kernel device driver verification
+#
+# Copyright (C) 2022-2023 Huawei Technologies Co., Ltd
+# Author: Wei Yongjun <weiyongjun1@huawei.com>
+
+import logging
+from pathlib import Path
+
+from .fail import FailModule
+
+FAILPAGE_IGNORE_HMEM = '/sys/kernel/debug/fail_page_alloc/ignore-gfp-highmem'
+FAILPAGE_IGNORE_WAIT = '/sys/kernel/debug/fail_page_alloc/ignore-gfp-wait'
+
+logger = logging.getLogger(__name__)
+
+class FailPage(FailModule):
+ name = 'fail_page_alloc'
+ key = 'page'
+
+ def __init__(self):
+ super().__init__()
+ self.ignore_hmem = Path(FAILPAGE_IGNORE_HMEM)
+ self.ignore_wait = Path(FAILPAGE_IGNORE_WAIT)
+
+ def enable_feature(self):
+ if not self.feature_enabled():
+ return
+ logger.debug("enter fail page injection")
+ self.ignore_hmem.write_text('N')
+ self.ignore_wait.write_text('N')
+
+ def disable_feature(self):
+ if not self.feature_enabled():
+ return
+ logger.debug("exit fail page injection")
+ self.ignore_hmem.write_text('Y')
+ self.ignore_wait.write_text('Y')
new file mode 100755
@@ -0,0 +1,36 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+#
+# Kernel device driver verification
+#
+# Copyright (C) 2022-2023 Huawei Technologies Co., Ltd
+# Author: Wei Yongjun <weiyongjun1@huawei.com>
+
+import logging
+from pathlib import Path
+
+from .fail import FailModule
+
+FAILSLAB_IGNORE = '/sys/kernel/debug/failslab/ignore-gfp-wait'
+
+logger = logging.getLogger(__name__)
+
+class FailSlab(FailModule):
+ name = 'failslab'
+ key = 'slab'
+
+ def __init__(self):
+ super().__init__()
+ self.ignore = Path(FAILSLAB_IGNORE)
+
+ def enable_feature(self):
+ if not self.feature_enabled():
+ return
+ logger.debug("enter fail slab injection")
+ self.ignore.write_text('N')
+
+ def disable_feature(self):
+ if not self.feature_enabled():
+ return
+ logger.debug("exit fail slab injection")
+ self.ignore.write_text('Y')