old mode 100755
new mode 100644
@@ -12,6 +12,7 @@ rdma_python_test(tests
test_mr.py
test_pd.py
test_qp.py
+ test_qpex.py
test_odp.py
test_parent_domain.py
test_rdmacm.py
@@ -1,5 +1,5 @@
+from tests.utils import requires_odp, traffic, xrc_traffic, create_custom_mr
from tests.base import RCResources, UDResources, XRCResources
-from tests.utils import requires_odp, traffic, xrc_traffic
from tests.base import RDMATestCase
from pyverbs.mr import MR
import pyverbs.enums as e
@@ -8,21 +8,20 @@ import pyverbs.enums as e
class OdpUD(UDResources):
@requires_odp('ud')
def create_mr(self):
- self.mr = MR(self.pd, self.msg_size + self.GRH_SIZE,
- e.IBV_ACCESS_LOCAL_WRITE | e.IBV_ACCESS_ON_DEMAND)
+ self.mr = create_custom_mr(self, e.IBV_ACCESS_ON_DEMAND,
+ self.msg_size + self.GRH_SIZE)
class OdpRC(RCResources):
@requires_odp('rc')
def create_mr(self):
- self.mr = MR(self.pd, self.msg_size,
- e.IBV_ACCESS_LOCAL_WRITE | e.IBV_ACCESS_ON_DEMAND)
+ self.mr = create_custom_mr(self, e.IBV_ACCESS_ON_DEMAND)
+
class OdpXRC(XRCResources):
@requires_odp('xrc')
def create_mr(self):
- self.mr = MR(self.pd, self.msg_size,
- e.IBV_ACCESS_LOCAL_WRITE | e.IBV_ACCESS_ON_DEMAND)
+ self.mr = create_custom_mr(self, e.IBV_ACCESS_ON_DEMAND)
class OdpTestCase(RDMATestCase):
new file mode 100644
@@ -0,0 +1,295 @@
+import unittest
+import random
+
+from pyverbs.qp import QPCap, QPInitAttrEx, QPAttr, QPEx, QP
+from pyverbs.pyverbs_error import PyverbsRDMAError
+from pyverbs.mr import MW, MWBindInfo
+from pyverbs.base import inc_rkey
+import pyverbs.enums as e
+from pyverbs.mr import MR
+
+from tests.base import UDResources, RCResources, RDMATestCase, XRCResources
+import tests.utils as u
+
+
+def create_qp_ex(agr_obj, qp_type, send_flags):
+    """
+    Create an extended QP (QPEx) using agr_obj's context, PD and CQ.
+    An XRC send QP is created without receive resources (max_recv_wr and
+    max_recv_sge are 0); any other QP type gets num_msgs receive WRs with a
+    single SGE each.
+    :param agr_obj: Aggregation object providing ctx, pd, cq, ib_port and
+                    num_msgs (and UD_QKEY / UD_PKEY_INDEX for UD QPs)
+    :param qp_type: The QP type to create (e.IBV_QPT_*)
+    :param send_flags: Send operations flags (e.IBV_QP_EX_WITH_*)
+    :return: The created QPEx
+    """
+    is_xrc_send = qp_type == e.IBV_QPT_XRC_SEND
+    recv_wr = 0 if is_xrc_send else agr_obj.num_msgs
+    recv_sge = 0 if is_xrc_send else 1
+    cap = QPCap(max_send_wr=agr_obj.num_msgs, max_recv_wr=recv_wr,
+                max_recv_sge=recv_sge, max_send_sge=1)
+    mask = e.IBV_QP_INIT_ATTR_PD | e.IBV_QP_INIT_ATTR_SEND_OPS_FLAGS
+    init_attr = QPInitAttrEx(cap=cap, qp_type=qp_type, scq=agr_obj.cq,
+                             rcq=agr_obj.cq, pd=agr_obj.pd,
+                             send_ops_flags=send_flags, comp_mask=mask)
+    attr = QPAttr(port_num=agr_obj.ib_port)
+    if qp_type == e.IBV_QPT_UD:
+        attr.qkey = agr_obj.UD_QKEY
+        attr.pkey_index = agr_obj.UD_PKEY_INDEX
+    elif qp_type == e.IBV_QPT_RC:
+        attr.qp_access_flags = (e.IBV_ACCESS_REMOTE_WRITE |
+                                e.IBV_ACCESS_REMOTE_READ |
+                                e.IBV_ACCESS_REMOTE_ATOMIC)
+    try:
+        # We don't have capability bits for this
+        return QPEx(agr_obj.ctx, init_attr, attr)
+    except PyverbsRDMAError as exp:
+        # Anything other than "not supported" is a genuine failure
+        if 'Operation not supported' not in exp.args[0]:
+            raise exp
+        raise unittest.SkipTest('Extended QP not supported on this device')
+
+
+class QpExUDSend(UDResources):
+    """ UD resources whose QP is an extended QP created with the send op. """
+    def create_qp(self):
+        send_ops = e.IBV_QP_EX_WITH_SEND
+        self.qp = create_qp_ex(self, e.IBV_QPT_UD, send_ops)
+
+
+class QpExRCSend(RCResources):
+    """ RC resources whose QP is an extended QP created with the send op. """
+    def create_qp(self):
+        send_ops = e.IBV_QP_EX_WITH_SEND
+        self.qp = create_qp_ex(self, e.IBV_QPT_RC, send_ops)
+
+
+class QpExXRCSend(XRCResources):
+    """ XRC resources whose send QPs are extended QPs with the send op. """
+    def create_qp(self):
+        """
+        Create qp_count pairs of QPs: a regular XRC receive QP and an
+        extended XRC send QP. Each pair's QP numbers are appended to qps_num
+        and a random 24-bit value is appended to psns.
+        """
+        attr = QPAttr(port_num=self.ib_port)
+        attr.pkey_index = 0
+        remote_access = e.IBV_ACCESS_REMOTE_WRITE | e.IBV_ACCESS_REMOTE_READ
+        for _ in range(self.qp_count):
+            recv_init_attr = QPInitAttrEx(qp_type=e.IBV_QPT_XRC_RECV,
+                                          comp_mask=e.IBV_QP_INIT_ATTR_XRCD,
+                                          xrcd=self.xrcd)
+            attr.qp_access_flags = remote_access
+            rqp = QP(self.ctx, recv_init_attr, attr)
+            self.rqp_lst.append(rqp)
+
+            sqp = create_qp_ex(self, e.IBV_QPT_XRC_SEND, e.IBV_QP_EX_WITH_SEND)
+            self.sqp_lst.append(sqp)
+            self.qps_num.append((rqp.qp_num, sqp.qp_num))
+            self.psns.append(random.getrandbits(24))
+
+
+class QpExUDSendImm(UDResources):
+    """ UD resources whose QP is an extended QP with send-with-immediate. """
+    def create_qp(self):
+        send_ops = e.IBV_QP_EX_WITH_SEND_WITH_IMM
+        self.qp = create_qp_ex(self, e.IBV_QPT_UD, send_ops)
+
+
+class QpExRCSendImm(RCResources):
+    """ RC resources whose QP is an extended QP with send-with-immediate. """
+    def create_qp(self):
+        send_ops = e.IBV_QP_EX_WITH_SEND_WITH_IMM
+        self.qp = create_qp_ex(self, e.IBV_QPT_RC, send_ops)
+
+
+class QpExXRCSendImm(XRCResources):
+    """ XRC resources whose send QPs are extended QPs with send-with-imm. """
+    def create_qp(self):
+        """
+        Create qp_count pairs of QPs: a regular XRC receive QP and an
+        extended XRC send QP. Each pair's QP numbers are appended to qps_num
+        and a random 24-bit value is appended to psns.
+        """
+        attr = QPAttr(port_num=self.ib_port)
+        attr.pkey_index = 0
+        remote_access = e.IBV_ACCESS_REMOTE_WRITE | e.IBV_ACCESS_REMOTE_READ
+        for _ in range(self.qp_count):
+            recv_init_attr = QPInitAttrEx(qp_type=e.IBV_QPT_XRC_RECV,
+                                          comp_mask=e.IBV_QP_INIT_ATTR_XRCD,
+                                          xrcd=self.xrcd)
+            attr.qp_access_flags = remote_access
+            rqp = QP(self.ctx, recv_init_attr, attr)
+            self.rqp_lst.append(rqp)
+
+            sqp = create_qp_ex(self, e.IBV_QPT_XRC_SEND,
+                               e.IBV_QP_EX_WITH_SEND_WITH_IMM)
+            self.sqp_lst.append(sqp)
+            self.qps_num.append((rqp.qp_num, sqp.qp_num))
+            self.psns.append(random.getrandbits(24))
+
+
+class QpExRCRDMAWrite(RCResources):
+    """ RC resources for RDMA write: extended QP and remote-writable MR. """
+    def create_qp(self):
+        send_ops = e.IBV_QP_EX_WITH_RDMA_WRITE
+        self.qp = create_qp_ex(self, e.IBV_QPT_RC, send_ops)
+
+    def create_mr(self):
+        remote_access = e.IBV_ACCESS_REMOTE_WRITE
+        self.mr = u.create_custom_mr(self, remote_access)
+
+
+class QpExRCRDMAWriteImm(RCResources):
+    """ RC resources for RDMA write with immediate over an extended QP. """
+    def create_qp(self):
+        send_ops = e.IBV_QP_EX_WITH_RDMA_WRITE_WITH_IMM
+        self.qp = create_qp_ex(self, e.IBV_QPT_RC, send_ops)
+
+    def create_mr(self):
+        remote_access = e.IBV_ACCESS_REMOTE_WRITE
+        self.mr = u.create_custom_mr(self, remote_access)
+
+
+class QpExRCRDMARead(RCResources):
+    """ RC resources for RDMA read: extended QP and remote-readable MR. """
+    def create_qp(self):
+        send_ops = e.IBV_QP_EX_WITH_RDMA_READ
+        self.qp = create_qp_ex(self, e.IBV_QPT_RC, send_ops)
+
+    def create_mr(self):
+        remote_access = e.IBV_ACCESS_REMOTE_READ
+        self.mr = u.create_custom_mr(self, remote_access)
+
+
+class QpExRCAtomicCmpSwp(RCResources):
+    """ RC resources for atomic compare and swap over an extended QP. """
+    def create_qp(self):
+        self.qp = create_qp_ex(self, e.IBV_QPT_RC,
+                               e.IBV_QP_EX_WITH_ATOMIC_CMP_AND_SWP)
+
+    def create_mr(self):
+        # The MR is created here rather than in create_qp(), matching the
+        # other QpEx* resource classes, so the resources end up with a
+        # single MR that allows remote atomic access.
+        self.mr = u.create_custom_mr(self, e.IBV_ACCESS_REMOTE_ATOMIC)
+
+
+class QpExRCAtomicFetchAdd(RCResources):
+    """ RC resources for atomic fetch and add over an extended QP. """
+    def create_qp(self):
+        self.qp = create_qp_ex(self, e.IBV_QPT_RC,
+                               e.IBV_QP_EX_WITH_ATOMIC_FETCH_AND_ADD)
+
+    def create_mr(self):
+        # The MR is created here rather than in create_qp(), matching the
+        # other QpEx* resource classes, so the resources end up with a
+        # single MR that allows remote atomic access.
+        self.mr = u.create_custom_mr(self, e.IBV_ACCESS_REMOTE_ATOMIC)
+
+
+class QpExRCBindMw(RCResources):
+    """ RC resources for MW binding: extended QP and remote-writable MR. """
+    def create_qp(self):
+        send_ops = e.IBV_QP_EX_WITH_BIND_MW
+        self.qp = create_qp_ex(self, e.IBV_QPT_RC, send_ops)
+
+    def create_mr(self):
+        remote_access = e.IBV_ACCESS_REMOTE_WRITE
+        self.mr = u.create_custom_mr(self, remote_access)
+
+
+class QpExTestCase(RDMATestCase):
+    """ Run traffic using the new post send API. """
+    def setUp(self):
+        super().setUp()
+        self.iters = 100
+        # Maps a test flavour to the resources class that implements it
+        self.qp_dict = {'ud_send': QpExUDSend, 'rc_send': QpExRCSend,
+                        'xrc_send': QpExXRCSend, 'ud_send_imm': QpExUDSendImm,
+                        'rc_send_imm': QpExRCSendImm,
+                        'xrc_send_imm': QpExXRCSendImm,
+                        'rc_write': QpExRCRDMAWrite,
+                        'rc_write_imm': QpExRCRDMAWriteImm,
+                        'rc_read': QpExRCRDMARead,
+                        'rc_cmp_swp': QpExRCAtomicCmpSwp,
+                        'rc_fetch_add': QpExRCAtomicFetchAdd,
+                        'rc_bind_mw': QpExRCBindMw}
+
+    def create_players(self, qp_type):
+        """
+        Create a client and a server of the resources class mapped to
+        qp_type, then hand each side the other's PSN(s) and QP number(s)
+        through pre_run().
+        :param qp_type: A key of self.qp_dict
+        :return: (client, server)
+        """
+        resources_cls = self.qp_dict[qp_type]
+        client = resources_cls(self.dev_name, self.ib_port, self.gid_index)
+        server = resources_cls(self.dev_name, self.ib_port, self.gid_index)
+        if 'xrc' in qp_type:
+            client.pre_run(server.psns, server.qps_num)
+            server.pre_run(client.psns, client.qps_num)
+        else:
+            client.pre_run(server.psn, server.qpn)
+            server.pre_run(client.psn, client.qpn)
+        return client, server
+
+    @staticmethod
+    def _exchange_mr_info(client, server):
+        """ Give each side the rkey and buffer address of the peer's MR. """
+        client.rkey = server.mr.rkey
+        server.rkey = client.mr.rkey
+        client.raddr = server.mr.buf
+        server.raddr = client.mr.buf
+
+    def _atomic_traffic(self, qp_type, send_op):
+        """ Run RDMA traffic for an atomic operation on 8-byte messages. """
+        client, server = self.create_players(qp_type)
+        client.msg_size = 8  # Atomic work on 64b operators
+        server.msg_size = 8
+        self._exchange_mr_info(client, server)
+        server.mr.write('s' * 8, 8)
+        u.rdma_traffic(client, server, self.iters, self.gid_index,
+                       self.ib_port, is_cq_ex=False, send_op=send_op)
+
+    def test_qp_ex_ud_send(self):
+        client, server = self.create_players('ud_send')
+        u.traffic(client, server, self.iters, self.gid_index, self.ib_port,
+                  is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_SEND)
+
+    def test_qp_ex_rc_send(self):
+        client, server = self.create_players('rc_send')
+        u.traffic(client, server, self.iters, self.gid_index, self.ib_port,
+                  is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_SEND)
+
+    def test_qp_ex_xrc_send(self):
+        client, server = self.create_players('xrc_send')
+        u.xrc_traffic(client, server, send_op=e.IBV_QP_EX_WITH_SEND)
+
+    def test_qp_ex_ud_send_imm(self):
+        client, server = self.create_players('ud_send_imm')
+        u.traffic(client, server, self.iters, self.gid_index, self.ib_port,
+                  is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_SEND_WITH_IMM)
+
+    def test_qp_ex_rc_send_imm(self):
+        client, server = self.create_players('rc_send_imm')
+        u.traffic(client, server, self.iters, self.gid_index, self.ib_port,
+                  is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_SEND_WITH_IMM)
+
+    def test_qp_ex_xrc_send_imm(self):
+        client, server = self.create_players('xrc_send_imm')
+        u.xrc_traffic(client, server, send_op=e.IBV_QP_EX_WITH_SEND_WITH_IMM)
+
+    def test_qp_ex_rc_rdma_write(self):
+        client, server = self.create_players('rc_write')
+        self._exchange_mr_info(client, server)
+        u.rdma_traffic(client, server, self.iters, self.gid_index,
+                       self.ib_port, is_cq_ex=False,
+                       send_op=e.IBV_QP_EX_WITH_RDMA_WRITE)
+
+    def test_qp_ex_rc_rdma_write_imm(self):
+        client, server = self.create_players('rc_write_imm')
+        self._exchange_mr_info(client, server)
+        # Write with immediate goes through traffic(), not rdma_traffic()
+        u.traffic(client, server, self.iters, self.gid_index, self.ib_port,
+                  is_cq_ex=False,
+                  send_op=e.IBV_QP_EX_WITH_RDMA_WRITE_WITH_IMM)
+
+    def test_qp_ex_rc_rdma_read(self):
+        client, server = self.create_players('rc_read')
+        self._exchange_mr_info(client, server)
+        server.mr.write('s' * server.msg_size, server.msg_size)
+        u.rdma_traffic(client, server, self.iters, self.gid_index,
+                       self.ib_port, is_cq_ex=False,
+                       send_op=e.IBV_QP_EX_WITH_RDMA_READ)
+
+    def test_qp_ex_rc_atomic_cmp_swp(self):
+        self._atomic_traffic('rc_cmp_swp',
+                             e.IBV_QP_EX_WITH_ATOMIC_CMP_AND_SWP)
+
+    def test_qp_ex_rc_atomic_fetch_add(self):
+        self._atomic_traffic('rc_fetch_add',
+                             e.IBV_QP_EX_WITH_ATOMIC_FETCH_AND_ADD)
+
+    def test_qp_ex_rc_bind_mw(self):
+        """
+        Verify bind memory window operation using the new post_send API.
+        Instead of checking through regular pingpong style traffic, we'll
+        do as follows:
+        - Register an MR with remote write access
+        - Bind a MW without remote write permission to the MR
+        - Verify that remote write fails
+        Since it's a unique flow, it's an integral part of that test rather
+        than a utility method.
+        """
+        client, server = self.create_players('rc_bind_mw')
+        client_sge = u.get_send_element(client, False)[1]
+        # Create a MW and bind it
+        server.qp.wr_start()
+        server.qp.wr_id = 0x123
+        server.qp.wr_flags = e.IBV_SEND_SIGNALED
+        bind_info = MWBindInfo(server.mr, server.mr.buf, server.mr.length,
+                               e.IBV_ACCESS_LOCAL_WRITE)
+        mw = MW(server.pd, mw_type=e.IBV_MW_TYPE_2)
+        new_key = inc_rkey(server.mr.rkey)
+        server.qp.wr_bind_mw(mw, new_key, bind_info)
+        server.qp.wr_complete()
+        u.poll_cq(server.cq)
+        # Verify remote write fails
+        client.qp.wr_start()
+        client.qp.wr_id = 0x124
+        client.qp.wr_flags = e.IBV_SEND_SIGNALED
+        client.qp.wr_rdma_write(new_key, server.mr.buf)
+        client.qp.wr_set_sge(client_sge)
+        client.qp.wr_complete()
+        try:
+            u.poll_cq(client.cq)
+        except PyverbsRDMAError as exp:
+            # Only a remote access error is the expected outcome
+            if 'Completion status is Remote access error' not in exp.args[0]:
+                raise
+        else:
+            # A successful completion means the MW did not restrict access
+            self.fail('Remote write through a MW without remote write '
+                      'access completed successfully')
+
@@ -7,6 +7,7 @@ from itertools import combinations as com
from string import ascii_lowercase as al
import unittest
import random
+import socket
from pyverbs.pyverbs_error import PyverbsError, PyverbsRDMAError
from pyverbs.addr import AHAttr, AH, GlobalRoute
@@ -16,6 +17,7 @@ from tests.base import XRCResources
from pyverbs.cq import PollCqAttr
import pyverbs.device as d
import pyverbs.enums as e
+from pyverbs.mr import MR
MAX_MR_SIZE = 4194304
# Some HWs limit DM address and length alignment to 4 for read and write
@@ -29,6 +31,7 @@ MAX_DM_LOG_ALIGN = 6
# Raw Packet QP supports TSO header, which creates a larger send WQE.
MAX_RAW_PACKET_SEND_WR = 2500
GRH_SIZE = 40
+IMM_DATA = 1234
def get_mr_length():
@@ -197,7 +200,7 @@ def random_qp_create_flags(qpt, attr_ex):
def random_qp_init_attr_ex(attr_ex, attr, qpt=None):
"""
- Create a random-valued QPInitAttrEX object with the given QP type.
+ Create a random-valued QPInitAttrEx object with the given QP type.
QP type affects QP capabilities, so allow users to set it and still get
valid attributes.
:param attr_ex: Extended device attributes for capability checks
@@ -251,24 +254,38 @@ def wc_status_to_str(status):
except KeyError:
return 'Unknown WC status ({s})'.format(s=status)
+
+def create_custom_mr(agr_obj, additional_access_flags=0, size=None):
+    """
+    Creates a memory region using the aggregation object's PD.
+    If size is None, the agr_obj's message size is used to set the MR's size.
+    The access flags are local write and the additional_access_flags.
+    :param agr_obj: The aggregation object that creates the MR
+    :param additional_access_flags: Additional access flags to set in the MR
+    :param size: MR's length. If None, agr_obj.msg_size is used.
+    :return: The created MR
+    """
+    # Compare against None explicitly so only an omitted size falls back to
+    # the message size, as documented.
+    mr_length = size if size is not None else agr_obj.msg_size
+    return MR(agr_obj.pd, mr_length,
+              e.IBV_ACCESS_LOCAL_WRITE | additional_access_flags)
+
# Traffic helpers
-def get_send_wr(agr_obj, is_server):
+def get_send_element(agr_obj, is_server):
"""
- Creates a single SGE Send WR for agr_obj's QP type. The content of the
- message is either 's' for server side or 'c' for client side.
+ Creates a single SGE and a single Send WR for agr_obj's QP type. The content
+ of the message is either 's' for server side or 'c' for client side.
:param agr_obj: Aggregation object which contains all resources necessary
:param is_server: Indicates whether this is server or client side
- :return: send wr
+ :return: send wr and its SGE
"""
+ mr = agr_obj.mr
qp_type = agr_obj.sqp_lst[0].qp_type if isinstance(agr_obj, XRCResources) \
else agr_obj.qp.qp_type
- mr = agr_obj.mr
offset = GRH_SIZE if qp_type == e.IBV_QPT_UD else 0
- send_sge = SGE(mr.buf + offset, agr_obj.msg_size, mr.lkey)
msg = (agr_obj.msg_size + offset) * ('s' if is_server else 'c')
mr.write(msg, agr_obj.msg_size + offset)
- return SendWR(num_sge=1, sg=[send_sge])
+ sge = SGE(mr.buf + offset, agr_obj.msg_size, mr.lkey)
+ return SendWR(num_sge=1, sg=[sge]), sge
def get_recv_wr(agr_obj):
@@ -286,6 +303,64 @@ def get_recv_wr(agr_obj):
return RecvWR(sg=[recv_sge], num_sge=1)
+def get_global_ah(agr_obj, gid_index, port):
+    """
+    Create an address handle with a global route on agr_obj's PD.
+    The destination GID is the one agr_obj's context returns for
+    (port, gid_index) and the destination LID is agr_obj.port_attr.lid.
+    :param agr_obj: Aggregation object providing ctx, pd and port_attr
+    :param gid_index: GID index, used for both the queried GID and the
+                      source GID index
+    :param port: Port number
+    :return: The created AH
+    """
+    dgid = agr_obj.ctx.query_gid(port, gid_index)
+    route = GlobalRoute(dgid=dgid, sgid_index=gid_index)
+    attr = AHAttr(port_num=port, is_global=1, gr=route,
+                  dlid=agr_obj.port_attr.lid)
+    return AH(agr_obj.pd, attr=attr)
+
+
+def xrc_post_send(agr_obj, qp_num, send_object, gid_index, port, send_op=None):
+    """
+    Post a single send on one of agr_obj's XRC send QPs.
+    :param agr_obj: Aggregation object holding the send QPs in sqp_lst
+    :param qp_num: Index of the send QP within agr_obj.sqp_lst
+    :param send_object: What to post, passed on unchanged to post_send_ex()
+                        or post_send()
+    :param gid_index: Local gid index
+    :param port: IB port
+    :param send_op: If set, the new post send API is used with this operation
+    :return: None
+    """
+    # post_send() and post_send_ex() both operate on agr_obj.qp
+    agr_obj.qp = agr_obj.sqp_lst[qp_num]
+    if not send_op:
+        post_send(agr_obj, send_object, gid_index, port)
+        return
+    post_send_ex(agr_obj, send_object, gid_index, port, send_op)
+
+
+def post_send_ex(agr_obj, send_object, gid_index, port, send_op=None):
+    """
+    Post a single work request on agr_obj.qp using the new post send API
+    (wr_start() ... wr_complete()).
+    :param agr_obj: Aggregation object. Its rkey and raddr are used for RDMA
+                    and atomic operations, rqpn and UD_QKEY for UD QPs and
+                    remote_srqn for XRC send QPs
+    :param send_object: SGE to attach to the work request (not used for bind
+                        MW)
+    :param gid_index: Local gid index (used to create the AH for UD QPs)
+    :param port: IB port (used to create the AH for UD QPs)
+    :param send_op: The operation to post (e.IBV_QP_EX_WITH_*)
+    :return: None
+    """
+    qp_type = agr_obj.qp.qp_type
+    agr_obj.qp.wr_start()
+    agr_obj.qp.wr_id = 0x123
+    agr_obj.qp.wr_flags = e.IBV_SEND_SIGNALED
+    if send_op == e.IBV_QP_EX_WITH_SEND:
+        agr_obj.qp.wr_send()
+    elif send_op == e.IBV_QP_EX_WITH_RDMA_WRITE:
+        agr_obj.qp.wr_rdma_write(agr_obj.rkey, agr_obj.raddr)
+    elif send_op == e.IBV_QP_EX_WITH_SEND_WITH_IMM:
+        agr_obj.qp.wr_send_imm(IMM_DATA)
+    elif send_op == e.IBV_QP_EX_WITH_RDMA_WRITE_WITH_IMM:
+        agr_obj.qp.wr_rdma_write_imm(agr_obj.rkey, agr_obj.raddr, IMM_DATA)
+    elif send_op == e.IBV_QP_EX_WITH_RDMA_READ:
+        agr_obj.qp.wr_rdma_read(agr_obj.rkey, agr_obj.raddr)
+    elif send_op == e.IBV_QP_EX_WITH_ATOMIC_CMP_AND_SWP:
+        # We're checking the returned value (remote's content), so cmp/swp
+        # values are of no importance.
+        agr_obj.qp.wr_atomic_cmp_swp(agr_obj.rkey, agr_obj.raddr, 42, 43)
+    elif send_op == e.IBV_QP_EX_WITH_ATOMIC_FETCH_AND_ADD:
+        agr_obj.qp.wr_atomic_fetch_add(agr_obj.rkey, agr_obj.raddr, 1)
+    elif send_op == e.IBV_QP_EX_WITH_BIND_MW:
+        # Only this branch needs the MW classes, so import them locally
+        from pyverbs.mr import MW, MWBindInfo
+        # The third argument of MWBindInfo is the length of the bound range,
+        # not a key.
+        bind_info = MWBindInfo(agr_obj.mr, agr_obj.mr.buf, agr_obj.mr.length,
+                               e.IBV_ACCESS_REMOTE_WRITE)
+        mw = MW(agr_obj.pd, mw_type=e.IBV_MW_TYPE_2)
+        # A new rkey is needed to be set into bind_info, modify rkey
+        agr_obj.qp.wr_bind_mw(mw, agr_obj.mr.rkey + 12, bind_info)
+        # Bind MW carries no SGE or address info, complete right away
+        agr_obj.qp.wr_complete()
+        return
+    if qp_type == e.IBV_QPT_UD:
+        ah = get_global_ah(agr_obj, gid_index, port)
+        agr_obj.qp.wr_set_ud_addr(ah, agr_obj.rqpn, agr_obj.UD_QKEY)
+    if qp_type == e.IBV_QPT_XRC_SEND:
+        agr_obj.qp.wr_set_xrc_srqn(agr_obj.remote_srqn)
+    agr_obj.qp.wr_set_sge(send_object)
+    agr_obj.qp.wr_complete()
+
+
def post_send(agr_obj, send_wr, gid_index, port):
"""
Post a single send WR to the QP. Post_send's second parameter (send bad wr)
@@ -299,11 +374,7 @@ def post_send(agr_obj, send_wr, gid_index, port):
"""
qp_type = agr_obj.qp.qp_type
if qp_type == e.IBV_QPT_UD:
- gr = GlobalRoute(dgid=agr_obj.ctx.query_gid(port, gid_index),
- sgid_index=gid_index)
- ah_attr = AHAttr(port_num=port, is_global=1, gr=gr,
- dlid=agr_obj.port_attr.lid)
- ah = AH(agr_obj.pd, attr=ah_attr)
+ ah = get_global_ah(agr_obj, gid_index, port)
send_wr.set_wr_ud(ah, agr_obj.rqpn, agr_obj.UD_QKEY)
agr_obj.qp.post_send(send_wr, None)
@@ -321,7 +392,7 @@ def post_recv(qp, recv_wr, num_wqes=1):
qp.post_recv(recv_wr, None)
-def poll_cq(cq, count=1):
+def poll_cq(cq, count=1, data=None):
"""
Poll <count> completions from the CQ.
Note: This function calls the blocking poll() method of the CQ
@@ -329,6 +400,8 @@ def poll_cq(cq, count=1):
single CQ event when events are used.
:param cq: CQ to poll from
:param count: How many completions to poll
+ :param data: In case of a work request with immediate, the immediate data
+ to be compared after poll
:return: An array of work completions of length <count>, None
when events are used
"""
@@ -339,15 +412,21 @@ def poll_cq(cq, count=1):
if wc.status != e.IBV_WC_SUCCESS:
raise PyverbsRDMAError('Completion status is {s}'.
format(s=wc_status_to_str(wc.status)))
+ if data:
+ if wc.wc_flags & e.IBV_WC_WITH_IMM == 0:
+ raise PyverbsRDMAError('Completion without immediate')
+ assert socket.ntohl(wc.imm_data) == data
count -= nc
return wcs
-def poll_cq_ex(cqex, count=1):
+def poll_cq_ex(cqex, count=1, data=None):
"""
Poll <count> completions from the extended CQ.
:param cq: CQEX to poll from
:param count: How many completions to poll
+ :param data: In case of a work request with immediate, the immediate data
+ to be compared after poll
:return: None
"""
poll_attr = PollCqAttr()
@@ -360,6 +439,8 @@ def poll_cq_ex(cqex, count=1):
if cqex.status != e.IBV_WC_SUCCESS:
raise PyverbsRDMAErrno('Completion status is {s}'.
format(s=cqex.status))
+ if data:
+ assert data == socket.ntohl(cqex.read_imm_data())
# Now poll the rest of the packets
while count > 0:
ret = cqex.poll_next()
@@ -370,6 +451,8 @@ def poll_cq_ex(cqex, count=1):
if cqex.status != e.IBV_WC_SUCCESS:
raise PyverbsRDMAErrno('Completion status is {s}'.
format(s=cqex.status))
+ if data:
+ assert data == socket.ntohl(cqex.read_imm_data())
count -= 1
cqex.end_poll()
@@ -398,7 +481,13 @@ def validate(received_str, is_server, msg_size):
format(exp=expected_str, rcv=received_str))
-def traffic(client, server, iters, gid_idx, port, is_cq_ex=False):
+def send(agr_obj, send_wr, gid_index, port, send_op=None):
+    """
+    Post a single send using either the new or the legacy post send API.
+    :param agr_obj: Aggregation object which contains all resources necessary
+    :param send_wr: What to post, passed on unchanged to the chosen function
+    :param gid_index: Local gid index
+    :param port: IB port
+    :param send_op: If set, post_send_ex() is used with this operation,
+                    otherwise post_send() is used
+    :return: The return value of the chosen post function
+    """
+    if not send_op:
+        return post_send(agr_obj, send_wr, gid_index, port)
+    return post_send_ex(agr_obj, send_wr, gid_index, port, send_op)
+
+
+def traffic(client, server, iters, gid_idx, port, is_cq_ex=False, send_op=None):
"""
Runs basic traffic between two sides
:param client: client side, clients base class is BaseTraffic
@@ -407,32 +496,83 @@ def traffic(client, server, iters, gid_idx, port, is_cq_ex=False):
:param gid_idx: local gid index
:param port: IB port
:param is_cq_ex: If True, use poll_cq_ex() rather than poll_cq()
+ :param send_op: If not None, new post send API is assumed.
:return:
"""
poll = poll_cq_ex if is_cq_ex else poll_cq
+ if send_op == e.IBV_QP_EX_WITH_SEND_WITH_IMM or \
+ send_op == e.IBV_QP_EX_WITH_RDMA_WRITE_WITH_IMM:
+ imm_data = IMM_DATA
+ else:
+ imm_data = None
+ # Using the new post send API, we need the SGE, not the SendWR
+ send_element_idx = 1 if send_op else 0
s_recv_wr = get_recv_wr(server)
c_recv_wr = get_recv_wr(client)
post_recv(client.qp, c_recv_wr, client.num_msgs)
post_recv(server.qp, s_recv_wr, server.num_msgs)
read_offset = GRH_SIZE if client.qp.qp_type == e.IBV_QPT_UD else 0
for _ in range(iters):
- c_send_wr = get_send_wr(client, False)
- post_send(client, c_send_wr, gid_idx, port)
+ c_send_wr = get_send_element(client, False)[send_element_idx]
+ send(client, c_send_wr, gid_idx, port, send_op)
poll(client.cq)
- poll(server.cq)
+ poll(server.cq, data=imm_data)
post_recv(server.qp, s_recv_wr)
msg_received = server.mr.read(server.msg_size, read_offset)
validate(msg_received, True, server.msg_size)
- s_send_wr = get_send_wr(server, True)
- post_send(server, s_send_wr, gid_idx, port)
+ s_send_wr = get_send_element(server, True)[send_element_idx]
+ send(server, s_send_wr, gid_idx, port, send_op)
poll(server.cq)
- poll(client.cq)
+ poll(client.cq, data=imm_data)
post_recv(client.qp, c_recv_wr)
msg_received = client.mr.read(client.msg_size, read_offset)
validate(msg_received, False, client.msg_size)
-def xrc_traffic(client, server, is_cq_ex=False):
+def rdma_traffic(client, server, iters, gid_idx, port, is_cq_ex=False, send_op=None):
+    """
+    Runs basic RDMA traffic between two sides. No receive WQEs are posted. For
+    RDMA write with immediate, use traffic().
+    For RDMA read and atomic operations the result lands in the initiator's
+    MR, so the initiator's buffer is validated; otherwise the peer's buffer
+    is validated.
+    :param client: client side, clients base class is BaseTraffic
+    :param server: server side, servers base class is BaseTraffic
+    :param iters: number of traffic iterations
+    :param gid_idx: local gid index
+    :param port: IB port
+    :param is_cq_ex: If True, use poll_cq_ex() rather than poll_cq()
+    :param send_op: If not None, new post send API is assumed.
+    :return:
+    """
+    # Honour is_cq_ex the same way traffic() does
+    poll = poll_cq_ex if is_cq_ex else poll_cq
+    # Using the new post send API, we need the SGE, not the SendWR
+    send_element_idx = 1 if send_op else 0
+    same_side_check = send_op in (e.IBV_QP_EX_WITH_RDMA_READ,
+                                  e.IBV_QP_EX_WITH_ATOMIC_CMP_AND_SWP,
+                                  e.IBV_QP_EX_WITH_ATOMIC_FETCH_AND_ADD)
+    for _ in range(iters):
+        c_send_object = get_send_element(client, False)[send_element_idx]
+        send(client, c_send_object, gid_idx, port, send_op)
+        poll(client.cq)
+        if same_side_check:
+            msg_received = client.mr.read(client.msg_size, 0)
+        else:
+            msg_received = server.mr.read(server.msg_size, 0)
+        validate(msg_received, not same_side_check, server.msg_size)
+        s_send_object = get_send_element(server, True)[send_element_idx]
+        if same_side_check:
+            # Restore the client's buffer before the server fetches it
+            client.mr.write('c' * client.msg_size, client.msg_size)
+        send(server, s_send_object, gid_idx, port, send_op)
+        poll(server.cq)
+        if same_side_check:
+            msg_received = server.mr.read(client.msg_size, 0)
+        else:
+            msg_received = client.mr.read(server.msg_size, 0)
+        validate(msg_received, same_side_check, client.msg_size)
+        if same_side_check:
+            # Restore the server's buffer for the next iteration
+            server.mr.write('s' * server.msg_size, server.msg_size)
+
+
+def xrc_traffic(client, server, is_cq_ex=False, send_op=None):
"""
Runs basic xrc traffic, this function assumes that number of QPs, which
server and client have are equal, server.send_qp[i] is connected to
@@ -444,27 +584,32 @@ def xrc_traffic(client, server, is_cq_ex=False):
:param server: Aggregation object of the passive side, should be an instance
of XRCResources class
:param is_cq_ex: If True, use poll_cq_ex() rather than poll_cq()
+ :param send_op: If not None, new post send API is assumed.
:return: None
"""
poll = poll_cq_ex if is_cq_ex else poll_cq
- client_srqn = client.srq.get_srq_num()
- server_srqn = server.srq.get_srq_num()
+ server.remote_srqn = client.srq.get_srq_num()
+ client.remote_srqn = server.srq.get_srq_num()
s_recv_wr = get_recv_wr(server)
c_recv_wr = get_recv_wr(client)
post_recv(client.srq, c_recv_wr, client.qp_count*client.num_msgs)
post_recv(server.srq, s_recv_wr, server.qp_count*server.num_msgs)
+ # Using the new post send API, we need the SGE, not the SendWR
+ send_element_idx = 1 if send_op else 0
for _ in range(client.num_msgs):
for i in range(server.qp_count):
- c_send_wr = get_send_wr(client, False)
- c_send_wr.set_qp_type_xrc(server_srqn)
- client.sqp_lst[i].post_send(c_send_wr)
+ c_send_wr = get_send_element(client, False)[send_element_idx]
+ if send_op is None:
+ c_send_wr.set_qp_type_xrc(client.remote_srqn)
+ xrc_post_send(client, i, c_send_wr, 0, 0, send_op)
poll(client.cq)
poll(server.cq)
msg_received = server.mr.read(server.msg_size, 0)
validate(msg_received, True, server.msg_size)
- s_send_wr = get_send_wr(server, True)
- s_send_wr.set_qp_type_xrc(client_srqn)
- server.sqp_lst[i].post_send(s_send_wr)
+ s_send_wr = get_send_element(server, True)[send_element_idx]
+ if send_op is None:
+ s_send_wr.set_qp_type_xrc(server.remote_srqn)
+ xrc_post_send(server, i, s_send_wr, 0, 0, send_op)
poll(server.cq)
poll(client.cq)
msg_received = client.mr.read(client.msg_size, 0)