@@ -398,6 +398,9 @@ cdef extern from '<infiniband/verbs.h>':
IBV_XRCD_INIT_ATTR_OFLAGS
IBV_XRCD_INIT_ATTR_RESERVED
+ cpdef enum:
+ IBV_WC_STANDARD_FLAGS
+
cdef unsigned long long IBV_DEVICE_RAW_SCATTER_FCS
cdef unsigned long long IBV_DEVICE_PCI_WRITE_END_PADDING
@@ -7,6 +7,7 @@ rdma_python_test(tests
test_addr.py
base.py
test_cq.py
+ test_cqex.py
test_device.py
test_mr.py
test_pd.py
new file mode 100644
@@ -0,0 +1,75 @@
+from pyverbs.cq import CqInitAttrEx, CQEX
+import pyverbs.enums as e
+from pyverbs.mr import MR
+
+from tests.base import RCResources, UDResources, XRCResources, RDMATestCase
+import tests.utils as u
+
+
+def create_ex_cq(res):
+ """
+ Create an Extended CQ using res's context and assign it to res's cq member.
+ IBV_WC_STANDARD_FLAGS is used for WC flags to avoid support differences
+ between devices.
+ :param res: An instance of TrafficResources
+ """
+ wc_flags = e.IBV_WC_STANDARD_FLAGS
+ cia = CqInitAttrEx(cqe=2000, wc_flags=wc_flags)
+ res.cq = CQEX(res.ctx, cia)
+
+
+class CqExUD(UDResources):
+ def create_cq(self):
+ create_ex_cq(self)
+
+ def create_mr(self):
+ self.mr = MR(self.pd, self.msg_size + self.GRH_SIZE,
+ e.IBV_ACCESS_LOCAL_WRITE)
+
+
+class CqExRC(RCResources):
+ def create_cq(self):
+ create_ex_cq(self)
+
+
+class CqExXRC(XRCResources):
+ def create_cq(self):
+ create_ex_cq(self)
+
+
+class CqExTestCase(RDMATestCase):
+ """
+ Run traffic over the existing UD, RC and XRC infrastructure, but use
+ ibv_cq_ex instead of legacy ibv_cq
+ """
+ def setUp(self):
+ super().setUp()
+ self.iters = 100
+ self.qp_dict = {'ud': CqExUD, 'rc': CqExRC, 'xrc': CqExXRC}
+
+ def create_players(self, qp_type):
+ client = self.qp_dict[qp_type](self.dev_name, self.ib_port,
+ self.gid_index)
+ server = self.qp_dict[qp_type](self.dev_name, self.ib_port,
+ self.gid_index)
+ if qp_type == 'xrc':
+ client.pre_run(server.psns, server.qps_num)
+ server.pre_run(client.psns, client.qps_num)
+ else:
+ client.pre_run(server.psn, server.qpn)
+ server.pre_run(client.psn, client.qpn)
+ return client, server
+
+ def test_ud_traffic_cq_ex(self):
+ client, server = self.create_players('ud')
+ u.traffic(client, server, self.iters, self.gid_index, self.ib_port,
+ is_cq_ex=True)
+
+ def test_rc_traffic_cq_ex(self):
+ client, server = self.create_players('rc')
+ u.traffic(client, server, self.iters, self.gid_index, self.ib_port,
+ is_cq_ex=True)
+
+ def test_xrc_traffic_cq_ex(self):
+ client, server = self.create_players('xrc')
+ u.xrc_traffic(client, server, is_cq_ex=True)
@@ -13,6 +13,7 @@ from pyverbs.addr import AHAttr, AH, GlobalRoute
from pyverbs.wr import SGE, SendWR, RecvWR
from pyverbs.qp import QPCap, QPInitAttrEx
from tests.base import XRCResources
+from pyverbs.cq import PollCqAttr
import pyverbs.device as d
import pyverbs.enums as e
@@ -342,6 +343,37 @@ def poll_cq(cq, count=1):
return wcs
+def poll_cq_ex(cqex, count=1):
+ """
+ Poll <count> completions from the extended CQ.
+ :param cq: CQEX to poll from
+ :param count: How many completions to poll
+ :return: None
+ """
+ poll_attr = PollCqAttr()
+ ret = cqex.start_poll(poll_attr)
+ while ret == 2: # ENOENT
+ ret = cqex.start_poll(poll_attr)
+ if ret != 0:
+ raise PyverbsRDMAErrno('Failed to poll CQ')
+ count -= 1
+ if cqex.status != e.IBV_WC_SUCCESS:
+ raise PyverbsRDMAErrno('Completion status is {s}'.
+ format(s=cqex.status))
+ # Now poll the rest of the packets
+ while count > 0:
+ ret = cqex.poll_next()
+ while ret == 2:
+ ret = cqex.poll_next()
+ if ret != 0:
+ raise PyverbsRDMAErrno('Failed to poll CQ')
+ if cqex.status != e.IBV_WC_SUCCESS:
+ raise PyverbsRDMAErrno('Completion status is {s}'.
+ format(s=cqex.status))
+ count -= 1
+ cqex.end_poll()
+
+
def validate(received_str, is_server, msg_size):
"""
Validates the received buffer against the expected result.
@@ -366,7 +398,7 @@ def validate(received_str, is_server, msg_size):
format(exp=expected_str, rcv=received_str))
-def traffic(client, server, iters, gid_idx, port):
+def traffic(client, server, iters, gid_idx, port, is_cq_ex=False):
"""
Runs basic traffic between two sides
:param client: client side, clients base class is BaseTraffic
@@ -374,8 +406,10 @@ def traffic(client, server, iters, gid_idx, port):
:param iters: number of traffic iterations
:param gid_idx: local gid index
:param port: IB port
+ :param is_cq_ex: If True, use poll_cq_ex() rather than poll_cq()
:return:
"""
+ poll = poll_cq_ex if is_cq_ex else poll_cq
s_recv_wr = get_recv_wr(server)
c_recv_wr = get_recv_wr(client)
post_recv(client.qp, c_recv_wr, client.num_msgs)
@@ -383,21 +417,21 @@ def traffic(client, server, iters, gid_idx, port):
for _ in range(iters):
c_send_wr = get_send_wr(client, False)
post_send(client, c_send_wr, gid_idx, port)
- poll_cq(client.cq)
- poll_cq(server.cq)
+ poll(client.cq)
+ poll(server.cq)
post_recv(client.qp, c_recv_wr)
msg_received = server.mr.read(server.msg_size, 0)
validate(msg_received, True, server.msg_size)
s_send_wr = get_send_wr(server, True)
post_send(server, s_send_wr, gid_idx, port)
- poll_cq(server.cq)
- poll_cq(client.cq)
+ poll(server.cq)
+ poll(client.cq)
post_recv(server.qp, s_recv_wr)
msg_received = client.mr.read(client.msg_size, 0)
validate(msg_received, False, client.msg_size)
-def xrc_traffic(client, server):
+def xrc_traffic(client, server, is_cq_ex=False):
"""
Runs basic xrc traffic, this function assumes that number of QPs, which
server and client have are equal, server.send_qp[i] is connected to
@@ -408,8 +442,10 @@ def xrc_traffic(client, server):
of XRCResources class
:param server: Aggregation object of the passive side, should be an instance
of XRCResources class
+ :param is_cq_ex: If True, use poll_cq_ex() rather than poll_cq()
:return: None
"""
+ poll = poll_cq_ex if is_cq_ex else poll_cq
client_srqn = client.srq.get_srq_num()
server_srqn = server.srq.get_srq_num()
s_recv_wr = get_recv_wr(server)
@@ -421,15 +457,15 @@ def xrc_traffic(client, server):
c_send_wr = get_send_wr(client, False)
c_send_wr.set_qp_type_xrc(server_srqn)
client.sqp_lst[i].post_send(c_send_wr)
- poll_cq(client.cq)
- poll_cq(server.cq)
+ poll(client.cq)
+ poll(server.cq)
msg_received = server.mr.read(server.msg_size, 0)
validate(msg_received, True, server.msg_size)
s_send_wr = get_send_wr(server, True)
s_send_wr.set_qp_type_xrc(client_srqn)
server.sqp_lst[i].post_send(s_send_wr)
- poll_cq(server.cq)
- poll_cq(client.cq)
+ poll(server.cq)
+ poll(client.cq)
msg_received = client.mr.read(client.msg_size, 0)
validate(msg_received, False, client.msg_size)
Add UD/RC/XRC traffic tests which use an extended CQ instead of the legacy one. Signed-off-by: Noa Osherovich <noaos@mellanox.com> --- pyverbs/libibverbs_enums.pxd | 3 ++ tests/CMakeLists.txt | 1 + tests/test_cqex.py | 75 ++++++++++++++++++++++++++++++++++++ tests/utils.py | 56 ++++++++++++++++++++++----- 4 files changed, 125 insertions(+), 10 deletions(-) create mode 100644 tests/test_cqex.py