@@ -1,5 +1,8 @@
# SPDX-License-Identifier: (GPL-2.0 OR Linux-OpenIB)
# Copyright (c) 2018, Mellanox Technologies. All rights reserved. See COPYING file
+"""
+Test module for pyverbs' device module.
+"""
import unittest
import resource
import random
@@ -11,23 +14,26 @@ import pyverbs.device as d
PAGE_SIZE = resource.getpagesize()
-class device_test(unittest.TestCase):
+class DeviceTest(unittest.TestCase):
"""
Test various functionalities of the Device class.
"""
- def test_dev_list(self):
+
+ @staticmethod
+ def test_dev_list():
"""
Verify that it's possible to get IB devices list.
"""
- lst = d.get_device_list()
+ d.get_device_list()
- def test_open_dev(self):
+ @staticmethod
+ def test_open_dev():
"""
Test ibv_open_device()
"""
lst = d.get_device_list()
for dev in lst:
- ctx = d.Context(name=dev.name.decode())
+ d.Context(name=dev.name.decode())
def test_query_device(self):
"""
@@ -39,7 +45,8 @@ class device_test(unittest.TestCase):
attr = ctx.query_device()
self.verify_device_attr(attr)
- def test_query_gid(self):
+ @staticmethod
+ def test_query_gid():
"""
Test ibv_query_gid()
"""
@@ -50,6 +57,12 @@ class device_test(unittest.TestCase):
@staticmethod
def verify_device_attr(attr):
+ """
+ Helper method that verifies correctness of some members of DeviceAttr
+ object.
+ :param attr: A DeviceAttr object
+ :return: None
+ """
assert attr.node_guid != 0
assert attr.sys_image_guid != 0
assert attr.max_mr_size > PAGE_SIZE
@@ -78,6 +91,12 @@ class device_test(unittest.TestCase):
@staticmethod
def verify_port_attr(attr):
+ """
+ Helper method that verifies correctness of some members of PortAttr
+ object.
+ :param attr: A PortAttr object
+ :return: None
+ """
assert 'Invalid' not in d.phys_state_to_str(attr.state)
assert 'Invalid' not in d.translate_mtu(attr.max_mtu)
assert 'Invalid' not in d.translate_mtu(attr.active_mtu)
@@ -98,7 +117,8 @@ class device_test(unittest.TestCase):
port_attr = ctx.query_port(p + 1)
self.verify_port_attr(port_attr)
- def test_query_port_bad_flow(self):
+ @staticmethod
+ def test_query_port_bad_flow():
""" Verify that querying non-existing ports fails as expected """
lst = d.get_device_list()
for dev in lst:
@@ -111,14 +131,18 @@ class device_test(unittest.TestCase):
assert 'Failed to query port' in e.args[0]
assert 'Invalid argument' in e.args[0]
else:
- raise PyverbsRDMAError('Successfully queried non-existing port {p}'.\
- format(p=port))
+ raise PyverbsRDMAError(
+ 'Successfully queried non-existing port {p}'. \
+ format(p=port))
+
-class dm_test(unittest.TestCase):
+class DMTest(unittest.TestCase):
"""
Test various functionalities of the DM class.
"""
- def test_create_dm(self):
+
+ @staticmethod
+ def test_create_dm():
"""
test ibv_alloc_dm()
"""
@@ -134,7 +158,8 @@ class dm_test(unittest.TestCase):
with d.DM(ctx, dm_attrs):
pass
- def test_destroy_dm(self):
+ @staticmethod
+ def test_destroy_dm():
"""
test ibv_free_dm()
"""
@@ -150,7 +175,8 @@ class dm_test(unittest.TestCase):
dm = d.DM(ctx, dm_attrs)
dm.close()
- def test_create_dm_bad_flow(self):
+ @staticmethod
+ def test_create_dm_bad_flow():
"""
test ibv_alloc_dm() with an illegal size and comp mask
"""
@@ -163,22 +189,27 @@ class dm_test(unittest.TestCase):
dm_len = attr.max_dm_size + 1
dm_attrs = u.get_dm_attrs(dm_len)
try:
- dm = d.DM(ctx, dm_attrs)
+ d.DM(ctx, dm_attrs)
except PyverbsRDMAError as e:
- assert 'Failed to allocate device memory of size' in e.args[0]
+ assert 'Failed to allocate device memory of size' in \
+ e.args[0]
assert 'Max available size' in e.args[0]
else:
- raise PyverbsError('Created a DM with size larger than max reported')
+ raise PyverbsError(
+ 'Created a DM with size larger than max reported')
dm_attrs.comp_mask = random.randint(1, 100)
try:
- dm = d.DM(ctx, dm_attrs)
+ d.DM(ctx, dm_attrs)
except PyverbsRDMAError as e:
- assert 'Failed to allocate device memory of size' in e.args[0]
+ assert 'Failed to allocate device memory of size' in \
+ e.args[0]
else:
- raise PyverbsError('Created a DM with illegal comp mask {c}'.\
- format(c=dm_attrs.comp_mask))
+ raise PyverbsError(
+ 'Created a DM with illegal comp mask {c}'. \
+ format(c=dm_attrs.comp_mask))
- def test_destroy_dm_bad_flow(self):
+ @staticmethod
+ def test_destroy_dm_bad_flow():
"""
test calling ibv_free_dm() twice
"""
@@ -188,13 +219,15 @@ class dm_test(unittest.TestCase):
attr = ctx.query_device_ex()
if attr.max_dm_size == 0:
return
- dm_len = random.randrange(u.MIN_DM_SIZE, attr.max_dm_size, u.DM_ALIGNMENT)
+ dm_len = random.randrange(u.MIN_DM_SIZE, attr.max_dm_size,
+ u.DM_ALIGNMENT)
dm_attrs = u.get_dm_attrs(dm_len)
dm = d.DM(ctx, dm_attrs)
dm.close()
dm.close()
- def test_dm_write(self):
+ @staticmethod
+ def test_dm_write():
"""
Test writing to the device memory
"""
@@ -204,15 +237,18 @@ class dm_test(unittest.TestCase):
attr = ctx.query_device_ex()
if attr.max_dm_size == 0:
return
- dm_len = random.randrange(u.MIN_DM_SIZE, attr.max_dm_size, u.DM_ALIGNMENT)
+ dm_len = random.randrange(u.MIN_DM_SIZE, attr.max_dm_size,
+ u.DM_ALIGNMENT)
dm_attrs = u.get_dm_attrs(dm_len)
with d.DM(ctx, dm_attrs) as dm:
data_length = random.randrange(4, dm_len, u.DM_ALIGNMENT)
- data_offset = random.randrange(0, dm_len - data_length, u.DM_ALIGNMENT)
+ data_offset = random.randrange(0, dm_len - data_length,
+ u.DM_ALIGNMENT)
data = u.get_data(data_length)
dm.copy_to_dm(data_offset, data.encode(), data_length)
- def test_dm_write_bad_flow(self):
+ @staticmethod
+ def test_dm_write_bad_flow():
"""
Test writing to the device memory with bad offset and length
"""
@@ -222,21 +258,25 @@ class dm_test(unittest.TestCase):
attr = ctx.query_device_ex()
if attr.max_dm_size == 0:
return
- dm_len = random.randrange(u.MIN_DM_SIZE, attr.max_dm_size, u.DM_ALIGNMENT)
+ dm_len = random.randrange(u.MIN_DM_SIZE, attr.max_dm_size,
+ u.DM_ALIGNMENT)
dm_attrs = u.get_dm_attrs(dm_len)
with d.DM(ctx, dm_attrs) as dm:
data_length = random.randrange(4, dm_len, u.DM_ALIGNMENT)
- data_offset = random.randrange(0, dm_len - data_length, u.DM_ALIGNMENT)
- data_offset += 1 # offset needs to be a multiple of 4
+ data_offset = random.randrange(0, dm_len - data_length,
+ u.DM_ALIGNMENT)
+ data_offset += 1 # offset needs to be a multiple of 4
data = u.get_data(data_length)
try:
dm.copy_to_dm(data_offset, data.encode(), data_length)
except PyverbsRDMAError as e:
- assert 'Failed to copy to dm' in e.args[0]
+ assert 'Failed to copy to dm' in e.args[0]
else:
- raise PyverbsError('Wrote to device memory with a bad offset')
+ raise PyverbsError(
+ 'Wrote to device memory with a bad offset')
- def test_dm_read(self):
+ @staticmethod
+ def test_dm_read():
"""
Test reading from the device memory
"""
@@ -246,11 +286,13 @@ class dm_test(unittest.TestCase):
attr = ctx.query_device_ex()
if attr.max_dm_size == 0:
return
- dm_len = random.randrange(u.MIN_DM_SIZE, attr.max_dm_size, u.DM_ALIGNMENT)
+ dm_len = random.randrange(u.MIN_DM_SIZE, attr.max_dm_size,
+ u.DM_ALIGNMENT)
dm_attrs = u.get_dm_attrs(dm_len)
with d.DM(ctx, dm_attrs) as dm:
data_length = random.randrange(4, dm_len, u.DM_ALIGNMENT)
- data_offset = random.randrange(0, dm_len - data_length, u.DM_ALIGNMENT)
+ data_offset = random.randrange(0, dm_len - data_length,
+ u.DM_ALIGNMENT)
data = u.get_data(data_length)
dm.copy_to_dm(data_offset, data.encode(), data_length)
read_str = dm.copy_from_dm(data_offset, data_length)
@@ -1,5 +1,8 @@
# SPDX-License-Identifier: (GPL-2.0 OR Linux-OpenIB)
# Copyright (c) 2019, Mellanox Technologies. All rights reserved. See COPYING file
+"""
+Test module for pyverbs' mr module.
+"""
import unittest
import random
@@ -14,11 +17,12 @@ import pyverbs.enums as e
MAX_IO_LEN = 1048576
-class mr_test(unittest.TestCase):
+class MRTest(unittest.TestCase):
"""
Test various functionalities of the MR class.
"""
- def test_reg_mr(self):
+ @staticmethod
+ def test_reg_mr():
""" Test ibv_reg_mr() """
lst = d.get_device_list()
for dev in lst:
@@ -27,7 +31,8 @@ class mr_test(unittest.TestCase):
with MR(pd, u.get_mr_length(), u.get_access_flags()) as mr:
pass
- def test_dereg_mr(self):
+ @staticmethod
+ def test_dereg_mr():
""" Test ibv_dereg_mr() """
lst = d.get_device_list()
for dev in lst:
@@ -36,48 +41,52 @@ class mr_test(unittest.TestCase):
with MR(pd, u.get_mr_length(), u.get_access_flags()) as mr:
mr.close()
- def test_reg_mr_bad_flow(self):
+ @staticmethod
+ def test_reg_mr_bad_flow():
""" Verify that trying to register a MR with None PD fails """
try:
- mr = MR(None, random.randint(0, 10000), u.get_access_flags())
+ MR(None, random.randint(0, 10000), u.get_access_flags())
except TypeError as te:
assert 'expected pyverbs.pd.PD' in te.args[0]
assert 'got NoneType' in te.args[0]
else:
raise PyverbsRDMAErrno('Created a MR with None PD')
- def test_dereg_mr_twice(self):
+ @staticmethod
+ def test_dereg_mr_twice():
""" Verify that explicit call to MR's close() doesn't fails """
lst = d.get_device_list()
for dev in lst:
with d.Context(name=dev.name.decode()) as ctx:
with PD(ctx) as pd:
with MR(pd, u.get_mr_length(), u.get_access_flags()) as mr:
- # Pyverbs supports multiple destruction of objects, we are
- # not expecting an exception here.
+ # Pyverbs supports multiple destruction of objects,
+ # we are not expecting an exception here.
mr.close()
mr.close()
- def test_reg_mr_bad_flags(self):
+ @staticmethod
+ def test_reg_mr_bad_flags():
""" Verify that illegal flags combination fails as expected """
lst = d.get_device_list()
for dev in lst:
with d.Context(name=dev.name.decode()) as ctx:
with PD(ctx) as pd:
flags = random.sample([e.IBV_ACCESS_REMOTE_WRITE,
- e.IBV_ACCESS_REMOTE_ATOMIC],
+ e.IBV_ACCESS_REMOTE_ATOMIC],
random.randint(1, 2))
mr_flags = 0
for i in flags:
mr_flags += i.value
try:
- mr = MR(pd, u.get_mr_length(), mr_flags)
+ MR(pd, u.get_mr_length(), mr_flags)
except PyverbsRDMAError as err:
assert 'Failed to register a MR' in err.args[0]
else:
raise PyverbsRDMAError('Registered a MR with illegal falgs')
- def test_write(self):
+ @staticmethod
+ def test_write():
"""
Test writing to MR's buffer
"""
@@ -90,7 +99,8 @@ class mr_test(unittest.TestCase):
write_len = min(random.randint(1, MAX_IO_LEN), mr_len)
mr.write(u.get_data(write_len), write_len)
- def test_read(self):
+ @staticmethod
+ def test_read():
"""
Test reading from MR's buffer
"""
@@ -108,7 +118,8 @@ class mr_test(unittest.TestCase):
read_str = mr.read(read_len, offset).decode()
assert read_str in write_str
- def test_lkey(self):
+ @staticmethod
+ def test_lkey():
"""
Test reading lkey property
"""
@@ -118,9 +129,10 @@ class mr_test(unittest.TestCase):
with PD(ctx) as pd:
length = u.get_mr_length()
with MR(pd, length, u.get_access_flags()) as mr:
- lkey = mr.lkey
+ mr.lkey
- def test_rkey(self):
+ @staticmethod
+ def test_rkey():
"""
Test reading rkey property
"""
@@ -130,9 +142,10 @@ class mr_test(unittest.TestCase):
with PD(ctx) as pd:
length = u.get_mr_length()
with MR(pd, length, u.get_access_flags()) as mr:
- rkey = mr.rkey
+ mr.rkey
- def test_buffer(self):
+ @staticmethod
+ def test_buffer():
"""
Test reading buf property
"""
@@ -142,52 +155,58 @@ class mr_test(unittest.TestCase):
with PD(ctx) as pd:
length = u.get_mr_length()
with MR(pd, length, u.get_access_flags()) as mr:
- buf = mr.buf
+ mr.buf
-class mw_test(unittest.TestCase):
+class MWTest(unittest.TestCase):
"""
Test various functionalities of the MW class.
"""
- def test_reg_mw(self):
+ @staticmethod
+ def test_reg_mw():
""" Test ibv_alloc_mw() """
lst = d.get_device_list()
for dev in lst:
with d.Context(name=dev.name.decode()) as ctx:
with PD(ctx) as pd:
- with MW(pd, random.choice([e.IBV_MW_TYPE_1, e.IBV_MW_TYPE_2])) as mw:
+ with MW(pd, random.choice([e.IBV_MW_TYPE_1,
+ e.IBV_MW_TYPE_2])):
pass
- def test_dereg_mw(self):
+ @staticmethod
+ def test_dereg_mw():
""" Test ibv_dealloc_mw() """
lst = d.get_device_list()
for dev in lst:
with d.Context(name=dev.name.decode()) as ctx:
with PD(ctx) as pd:
- with MW(pd, random.choice([e.IBV_MW_TYPE_1, e.IBV_MW_TYPE_2])) as mw:
+ with MW(pd, random.choice([e.IBV_MW_TYPE_1,
+ e.IBV_MW_TYPE_2])) as mw:
mw.close()
- def test_reg_mw_wrong_type(self):
+ @staticmethod
+ def test_reg_mw_wrong_type():
""" Test ibv_alloc_mw() """
lst = d.get_device_list()
for dev in lst:
with d.Context(name=dev.name.decode()) as ctx:
with PD(ctx) as pd:
try:
- mw_type =random.randint(3, 100)
- mw = MW(pd, mw_type)
- except PyverbsRDMAError as e:
+ mw_type = random.randint(3, 100)
+ MW(pd, mw_type)
+ except PyverbsRDMAError:
pass
else:
raise PyverbsError('Created a MW with type {t}'.\
format(t=mw_type))
-class dm_mr_test(unittest.TestCase):
+class DMMRTest(unittest.TestCase):
"""
Test various functionalities of the DMMR class.
"""
- def test_create_dm_mr(self):
+ @staticmethod
+ def test_create_dm_mr():
"""
Test ibv_reg_dm_mr
"""
@@ -204,10 +223,11 @@ class dm_mr_test(unittest.TestCase):
with d.DM(ctx, dm_attrs) as dm:
dm_mr_len = random.randint(1, dm_len)
dm_mr_offset = random.randint(0, (dm_len - dm_mr_len))
- dm_mr = DMMR(pd, dm_mr_len, e.IBV_ACCESS_ZERO_BASED, dm=dm,
- offset=dm_mr_offset)
+ DMMR(pd, dm_mr_len, e.IBV_ACCESS_ZERO_BASED, dm=dm,
+ offset=dm_mr_offset)
- def test_destroy_dm_mr(self):
+ @staticmethod
+ def test_destroy_dm_mr():
"""
Test freeing of dm_mr
"""
@@ -219,11 +239,11 @@ class dm_mr_test(unittest.TestCase):
return
with PD(ctx) as pd:
dm_len = random.randrange(u.MIN_DM_SIZE, attr.max_dm_size,
- u.DM_ALIGNMENT)
+ u.DM_ALIGNMENT)
dm_attrs = u.get_dm_attrs(dm_len)
with d.DM(ctx, dm_attrs) as dm:
dm_mr_len = random.randint(1, dm_len)
dm_mr_offset = random.randint(0, (dm_len - dm_mr_len))
- dm_mr = DMMR(pd, dm_mr_len, e.IBV_ACCESS_ZERO_BASED, dm=dm,
- offset=dm_mr_offset)
+ dm_mr = DMMR(pd, dm_mr_len, e.IBV_ACCESS_ZERO_BASED,
+ dm=dm, offset=dm_mr_offset)
dm_mr.close()
@@ -1,6 +1,8 @@
# SPDX-License-Identifier: (GPL-2.0 OR Linux-OpenIB)
# Copyright (c) 2019, Mellanox Technologies. All rights reserved. See COPYING file
-
+"""
+Test module for pyverbs' pd module.
+"""
import unittest
import random
@@ -9,11 +11,12 @@ import pyverbs.device as d
from pyverbs.pd import PD
-class pd_test(unittest.TestCase):
+class PDTest(unittest.TestCase):
"""
Test various functionalities of the PD class.
"""
- def test_alloc_pd(self):
+ @staticmethod
+ def test_alloc_pd():
"""
Test ibv_alloc_pd()
"""
@@ -23,7 +26,8 @@ class pd_test(unittest.TestCase):
with PD(ctx):
pass
- def test_dealloc_pd(self):
+ @staticmethod
+ def test_dealloc_pd():
"""
Test ibv_dealloc_pd()
"""
@@ -33,7 +37,8 @@ class pd_test(unittest.TestCase):
with PD(ctx) as pd:
pd.close()
- def test_multiple_pd_creation(self):
+ @staticmethod
+ def test_multiple_pd_creation():
"""
Test multiple creations and destructions of a PD object
"""
@@ -44,19 +49,21 @@ class pd_test(unittest.TestCase):
with PD(ctx) as pd:
pd.close()
- def test_create_pd_none_ctx(self):
+ @staticmethod
+ def test_create_pd_none_ctx():
"""
Verify that PD can't be created with a None context
"""
try:
- pd = PD(None)
+ PD(None)
except TypeError as te:
assert 'expected pyverbs.device.Context' in te.args[0]
assert 'got NoneType' in te.args[0]
else:
raise PyverbsRDMAErrno('Created a PD with None context')
- def test_destroy_pd_twice(self):
+ @staticmethod
+ def test_destroy_pd_twice():
"""
Test bad flow cases in destruction of a PD object
"""
@@ -1,11 +1,15 @@
# SPDX-License-Identifier: (GPL-2.0 OR Linux-OpenIB)
# Copyright (c) 2019, Mellanox Technologies. All rights reserved. See COPYING file
+"""
+Provide some useful helper function for pyverbs' tests.
+"""
from string import ascii_lowercase as al
import random
import pyverbs.device as d
import pyverbs.enums as e
+
MAX_MR_SIZE = 4194304
# Some HWs limit DM address and length alignment to 4 for read and write
# operations. Use a minimal length and alignment that respect that.
@@ -16,15 +20,26 @@ DM_ALIGNMENT = 4
MIN_DM_LOG_ALIGN = 0
MAX_DM_LOG_ALIGN = 6
+
def get_mr_length():
- # Allocating large buffers typically fails
+ """
+ Provide a random value for MR length. We avoid large buffers as these
+ allocations typically fails.
+ :return: A random MR length
+ """
return random.randint(0, MAX_MR_SIZE)
def get_access_flags():
+ """
+ Provide random legal access flags for an MR.
+ Since remote write and remote atomic require local write permission, if
+ one of them is randomly selected without local write, local write will be
+ added as well.
+ :return: A random legal value for MR flags
+ """
vals = list(e.ibv_access_flags)
selected = random.sample(vals, random.randint(1, 7))
- # Remote write / remote atomic are not allowed without local write
if e.IBV_ACCESS_REMOTE_WRITE in selected or e.IBV_ACCESS_REMOTE_ATOMIC in selected:
if not e.IBV_ACCESS_LOCAL_WRITE in selected:
selected.append(e.IBV_ACCESS_LOCAL_WRITE)
@@ -35,10 +50,21 @@ def get_access_flags():
def get_data(length):
+ """
+ Randomizes data of a given length.
+ :param length: Length of the data
+ :return: A random data of the given length
+ """
return ''.join(random.choice(al) for i in range(length))
def get_dm_attrs(dm_len):
+ """
+ Initializes an AllocDmAttr member with the given length and random
+ alignment. It currently sets comp_mask = 0 since other comp_mask values
+ are not supported.
+ :param dm_len:
+ :return: An initialized AllocDmAttr object
+ """
align = random.randint(MIN_DM_LOG_ALIGN, MAX_DM_LOG_ALIGN)
- # Comp mask != 0 is not supported
return d.AllocDmAttr(dm_len, align, 0)