Add simple traffic tests that use the new post_send API. The operations currently tested are: - UD: send, send with immediate - RC: send, send with immediate, RDMA write, RDMA read, atomic fetch and add, atomic compare and swap, bind memory window - XRC: send, send with immediate The existing traffic methods, traffic() and xrc_traffic(), were modified to support the new API and now check whether a send_op was provided. As RDMA read and write do not require receive WQEs to be posted, an extra traffic method, rdma_traffic, was added to the tests' utils section. Creation of a customized memory region is now available in the tests' utils section. Signed-off-by: Noa Osherovich <noaos@xxxxxxxxxxxx> Reviewed-by: Edward Srouji <edwards@xxxxxxxxxxxx> --- tests/CMakeLists.txt | 1 + tests/test_odp.py | 13 +- tests/test_qpex.py | 295 +++++++++++++++++++++++++++++++++++++++++++ tests/utils.py | 207 +++++++++++++++++++++++++----- 4 files changed, 478 insertions(+), 38 deletions(-) mode change 100755 => 100644 tests/CMakeLists.txt create mode 100644 tests/test_qpex.py diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt old mode 100755 new mode 100644 index 6d702425886c..74930f69e19d --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -12,6 +12,7 @@ rdma_python_test(tests test_mr.py test_pd.py test_qp.py + test_qpex.py test_odp.py test_parent_domain.py test_rdmacm.py diff --git a/tests/test_odp.py b/tests/test_odp.py index d412a7792951..742ad81abb89 100755 --- a/tests/test_odp.py +++ b/tests/test_odp.py @@ -1,5 +1,5 @@ +from tests.utils import requires_odp, traffic, xrc_traffic, create_custom_mr from tests.base import RCResources, UDResources, XRCResources -from tests.utils import requires_odp, traffic, xrc_traffic from tests.base import RDMATestCase from pyverbs.mr import MR import pyverbs.enums as e @@ -8,21 +8,20 @@ import pyverbs.enums as e class OdpUD(UDResources): @requires_odp('ud') def create_mr(self): - self.mr = MR(self.pd, self.msg_size + 
self.GRH_SIZE, - e.IBV_ACCESS_LOCAL_WRITE | e.IBV_ACCESS_ON_DEMAND) + self.mr = create_custom_mr(self, e.IBV_ACCESS_ON_DEMAND, + self.msg_size + self.GRH_SIZE) class OdpRC(RCResources): @requires_odp('rc') def create_mr(self): - self.mr = MR(self.pd, self.msg_size, - e.IBV_ACCESS_LOCAL_WRITE | e.IBV_ACCESS_ON_DEMAND) + self.mr = create_custom_mr(self, e.IBV_ACCESS_ON_DEMAND) + class OdpXRC(XRCResources): @requires_odp('xrc') def create_mr(self): - self.mr = MR(self.pd, self.msg_size, - e.IBV_ACCESS_LOCAL_WRITE | e.IBV_ACCESS_ON_DEMAND) + self.mr = create_custom_mr(self, e.IBV_ACCESS_ON_DEMAND) class OdpTestCase(RDMATestCase): diff --git a/tests/test_qpex.py b/tests/test_qpex.py new file mode 100644 index 000000000000..922010bce3e5 --- /dev/null +++ b/tests/test_qpex.py @@ -0,0 +1,295 @@ +import unittest +import random + +from pyverbs.qp import QPCap, QPInitAttrEx, QPAttr, QPEx, QP +from pyverbs.pyverbs_error import PyverbsRDMAError +from pyverbs.mr import MW, MWBindInfo +from pyverbs.base import inc_rkey +import pyverbs.enums as e +from pyverbs.mr import MR + +from tests.base import UDResources, RCResources, RDMATestCase, XRCResources +import tests.utils as u + + +def create_qp_ex(agr_obj, qp_type, send_flags): + if qp_type == e.IBV_QPT_XRC_SEND: + cap = QPCap(max_send_wr=agr_obj.num_msgs, max_recv_wr=0, max_recv_sge=0, + max_send_sge=1) + else: + cap = QPCap(max_send_wr=agr_obj.num_msgs, max_recv_wr=agr_obj.num_msgs, + max_recv_sge=1, max_send_sge=1) + qia = QPInitAttrEx(cap=cap, qp_type=qp_type, scq=agr_obj.cq, + rcq=agr_obj.cq, pd=agr_obj.pd, send_ops_flags=send_flags, + comp_mask=e.IBV_QP_INIT_ATTR_PD | + e.IBV_QP_INIT_ATTR_SEND_OPS_FLAGS) + qp_attr = QPAttr(port_num=agr_obj.ib_port) + if qp_type == e.IBV_QPT_UD: + qp_attr.qkey = agr_obj.UD_QKEY + qp_attr.pkey_index = agr_obj.UD_PKEY_INDEX + if qp_type == e.IBV_QPT_RC: + qp_attr.qp_access_flags = e.IBV_ACCESS_REMOTE_WRITE | \ + e.IBV_ACCESS_REMOTE_READ | \ + e.IBV_ACCESS_REMOTE_ATOMIC + try: + # We don't have 
capability bits for this + qp = QPEx(agr_obj.ctx, qia, qp_attr) + except PyverbsRDMAError as exp: + if 'Operation not supported' in exp.args[0]: + raise unittest.SkipTest('Extended QP not supported on this device') + raise exp + return qp + + +class QpExUDSend(UDResources): + def create_qp(self): + self.qp = create_qp_ex(self, e.IBV_QPT_UD, e.IBV_QP_EX_WITH_SEND) + + +class QpExRCSend(RCResources): + def create_qp(self): + self.qp = create_qp_ex(self, e.IBV_QPT_RC, e.IBV_QP_EX_WITH_SEND) + + +class QpExXRCSend(XRCResources): + def create_qp(self): + qp_attr = QPAttr(port_num=self.ib_port) + qp_attr.pkey_index = 0 + for _ in range(self.qp_count): + attr_ex = QPInitAttrEx(qp_type=e.IBV_QPT_XRC_RECV, + comp_mask=e.IBV_QP_INIT_ATTR_XRCD, + xrcd=self.xrcd) + qp_attr.qp_access_flags = e.IBV_ACCESS_REMOTE_WRITE | \ + e.IBV_ACCESS_REMOTE_READ + recv_qp = QP(self.ctx, attr_ex, qp_attr) + self.rqp_lst.append(recv_qp) + + send_qp = create_qp_ex(self, e.IBV_QPT_XRC_SEND, e.IBV_QP_EX_WITH_SEND) + self.sqp_lst.append(send_qp) + self.qps_num.append((recv_qp.qp_num, send_qp.qp_num)) + self.psns.append(random.getrandbits(24)) + + +class QpExUDSendImm(UDResources): + def create_qp(self): + self.qp = create_qp_ex(self, e.IBV_QPT_UD, e.IBV_QP_EX_WITH_SEND_WITH_IMM) + + +class QpExRCSendImm(RCResources): + def create_qp(self): + self.qp = create_qp_ex(self, e.IBV_QPT_RC, e.IBV_QP_EX_WITH_SEND_WITH_IMM) + + +class QpExXRCSendImm(XRCResources): + def create_qp(self): + qp_attr = QPAttr(port_num=self.ib_port) + qp_attr.pkey_index = 0 + for _ in range(self.qp_count): + attr_ex = QPInitAttrEx(qp_type=e.IBV_QPT_XRC_RECV, + comp_mask=e.IBV_QP_INIT_ATTR_XRCD, + xrcd=self.xrcd) + qp_attr.qp_access_flags = e.IBV_ACCESS_REMOTE_WRITE | \ + e.IBV_ACCESS_REMOTE_READ + recv_qp = QP(self.ctx, attr_ex, qp_attr) + self.rqp_lst.append(recv_qp) + + send_qp = create_qp_ex(self, e.IBV_QPT_XRC_SEND, + e.IBV_QP_EX_WITH_SEND_WITH_IMM) + self.sqp_lst.append(send_qp) + self.qps_num.append((recv_qp.qp_num, 
send_qp.qp_num)) + self.psns.append(random.getrandbits(24)) + + +class QpExRCRDMAWrite(RCResources): + def create_qp(self): + self.qp = create_qp_ex(self, e.IBV_QPT_RC, e.IBV_QP_EX_WITH_RDMA_WRITE) + + def create_mr(self): + self.mr = u.create_custom_mr(self, e.IBV_ACCESS_REMOTE_WRITE) + + +class QpExRCRDMAWriteImm(RCResources): + def create_qp(self): + self.qp = create_qp_ex(self, e.IBV_QPT_RC, + e.IBV_QP_EX_WITH_RDMA_WRITE_WITH_IMM) + + def create_mr(self): + self.mr = u.create_custom_mr(self, e.IBV_ACCESS_REMOTE_WRITE) + + +class QpExRCRDMARead(RCResources): + def create_qp(self): + self.qp = create_qp_ex(self, e.IBV_QPT_RC, e.IBV_QP_EX_WITH_RDMA_READ) + + def create_mr(self): + self.mr = u.create_custom_mr(self, e.IBV_ACCESS_REMOTE_READ) + + +class QpExRCAtomicCmpSwp(RCResources): + def create_qp(self): + self.qp = create_qp_ex(self, e.IBV_QPT_RC, + e.IBV_QP_EX_WITH_ATOMIC_CMP_AND_SWP) + self.mr = u.create_custom_mr(self, e.IBV_ACCESS_REMOTE_ATOMIC) + + +class QpExRCAtomicFetchAdd(RCResources): + def create_qp(self): + self.qp = create_qp_ex(self, e.IBV_QPT_RC, + e.IBV_QP_EX_WITH_ATOMIC_FETCH_AND_ADD) + self.mr = u.create_custom_mr(self, e.IBV_ACCESS_REMOTE_ATOMIC) + + +class QpExRCBindMw(RCResources): + def create_qp(self): + self.qp = create_qp_ex(self, e.IBV_QPT_RC, e.IBV_QP_EX_WITH_BIND_MW) + + def create_mr(self): + self.mr = u.create_custom_mr(self, e.IBV_ACCESS_REMOTE_WRITE) + + +class QpExTestCase(RDMATestCase): + """ Run traffic using the new post send API. 
""" + def setUp(self): + super().setUp() + self.iters = 100 + self.qp_dict = {'ud_send': QpExUDSend, 'rc_send': QpExRCSend, + 'xrc_send': QpExXRCSend, 'ud_send_imm': QpExUDSendImm, + 'rc_send_imm': QpExRCSendImm, + 'xrc_send_imm': QpExXRCSendImm, + 'rc_write': QpExRCRDMAWrite, + 'rc_write_imm': QpExRCRDMAWriteImm, + 'rc_read': QpExRCRDMARead, + 'rc_cmp_swp': QpExRCAtomicCmpSwp, + 'rc_fetch_add': QpExRCAtomicFetchAdd, + 'rc_bind_mw': QpExRCBindMw} + + def create_players(self, qp_type): + client = self.qp_dict[qp_type](self.dev_name, self.ib_port, + self.gid_index) + server = self.qp_dict[qp_type](self.dev_name, self.ib_port, + self.gid_index) + if 'xrc' in qp_type: + client.pre_run(server.psns, server.qps_num) + server.pre_run(client.psns, client.qps_num) + else: + client.pre_run(server.psn, server.qpn) + server.pre_run(client.psn, client.qpn) + return client, server + + def test_qp_ex_ud_send(self): + client, server = self.create_players('ud_send') + u.traffic(client, server, self.iters, self.gid_index, self.ib_port, + is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_SEND) + + def test_qp_ex_rc_send(self): + client, server = self.create_players('rc_send') + u.traffic(client, server, self.iters, self.gid_index, self.ib_port, + is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_SEND) + + def test_qp_ex_xrc_send(self): + client, server = self.create_players('xrc_send') + u.xrc_traffic(client, server, send_op=e.IBV_QP_EX_WITH_SEND) + + def test_qp_ex_ud_send_imm(self): + client, server = self.create_players('ud_send_imm') + u.traffic(client, server, self.iters, self.gid_index, self.ib_port, + is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_SEND_WITH_IMM) + + def test_qp_ex_rc_send_imm(self): + client, server = self.create_players('rc_send_imm') + u.traffic(client, server, self.iters, self.gid_index, self.ib_port, + is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_SEND_WITH_IMM) + + def test_qp_ex_xrc_send_imm(self): + client, server = self.create_players('xrc_send_imm') + u.xrc_traffic(client, server, 
send_op=e.IBV_QP_EX_WITH_SEND_WITH_IMM) + + def test_qp_ex_rc_rdma_write(self): + client, server = self.create_players('rc_write') + client.rkey = server.mr.rkey + server.rkey = client.mr.rkey + client.raddr = server.mr.buf + server.raddr = client.mr.buf + u.rdma_traffic(client, server, self.iters, self.gid_index, self.ib_port, + is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_RDMA_WRITE) + + def test_qp_ex_rc_rdma_write_imm(self): + client, server = self.create_players('rc_write_imm') + client.rkey = server.mr.rkey + server.rkey = client.mr.rkey + client.raddr = server.mr.buf + server.raddr = client.mr.buf + u.traffic(client, server, self.iters, self.gid_index, self.ib_port, + is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_RDMA_WRITE_WITH_IMM) + + def test_qp_ex_rc_rdma_read(self): + client, server = self.create_players('rc_read') + client.rkey = server.mr.rkey + server.rkey = client.mr.rkey + client.raddr = server.mr.buf + server.raddr = client.mr.buf + server.mr.write('s' * server.msg_size, server.msg_size) + u.rdma_traffic(client, server, self.iters, self.gid_index, self.ib_port, + is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_RDMA_READ) + + def test_qp_ex_rc_atomic_cmp_swp(self): + client, server = self.create_players('rc_cmp_swp') + client.msg_size = 8 # Atomic work on 64b operators + server.msg_size = 8 + client.rkey = server.mr.rkey + server.rkey = client.mr.rkey + client.raddr = server.mr.buf + server.raddr = client.mr.buf + server.mr.write('s' * 8, 8) + u.rdma_traffic(client, server, self.iters, self.gid_index, self.ib_port, + is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_ATOMIC_CMP_AND_SWP) + + def test_qp_ex_rc_atomic_fetch_add(self): + client, server = self.create_players('rc_fetch_add') + client.msg_size = 8 # Atomic work on 64b operators + server.msg_size = 8 + client.rkey = server.mr.rkey + server.rkey = client.mr.rkey + client.raddr = server.mr.buf + server.raddr = client.mr.buf + server.mr.write('s' * 8, 8) + u.rdma_traffic(client, server, self.iters, self.gid_index, 
self.ib_port, + is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_ATOMIC_FETCH_AND_ADD) + + def test_qp_ex_rc_bind_mw(self): + """ + Verify bind memory window operation using the new post_send API. + Instead of checking through regular pingpong style traffic, we'll + do as follows: + - Register an MR with remote write access + - Bind a MW without remote write permission to the MR + - Verify that remote write fails + Since it's a unique flow, it's an integral part of that test rather + than a utility method. + """ + client, server = self.create_players('rc_bind_mw') + client_sge = u.get_send_element(client, False)[1] + # Create a MW and bind it + server.qp.wr_start() + server.qp.wr_id = 0x123 + server.qp.wr_flags = e.IBV_SEND_SIGNALED + bind_info = MWBindInfo(server.mr, server.mr.buf, server.mr.length, + e.IBV_ACCESS_LOCAL_WRITE) + mw = MW(server.pd, mw_type=e.IBV_MW_TYPE_2) + new_key = inc_rkey(server.mr.rkey) + server.qp.wr_bind_mw(mw, new_key, bind_info) + server.qp.wr_complete() + u.poll_cq(server.cq) + # Verify remote write fails + client.qp.wr_start() + client.qp.wr_id = 0x124 + client.qp.wr_flags = e.IBV_SEND_SIGNALED + client.qp.wr_rdma_write(new_key, server.mr.buf) + client.qp.wr_set_sge(client_sge) + client.qp.wr_complete() + try: + u.poll_cq(client.cq) + except PyverbsRDMAError as exp: + if 'Completion status is Remote access error' not in exp.args[0]: + raise exp + diff --git a/tests/utils.py b/tests/utils.py index 47eacfee35e5..20132a7cf40b 100755 --- a/tests/utils.py +++ b/tests/utils.py @@ -7,6 +7,7 @@ from itertools import combinations as com from string import ascii_lowercase as al import unittest import random +import socket from pyverbs.pyverbs_error import PyverbsError, PyverbsRDMAError from pyverbs.addr import AHAttr, AH, GlobalRoute @@ -16,6 +17,7 @@ from tests.base import XRCResources from pyverbs.cq import PollCqAttr import pyverbs.device as d import pyverbs.enums as e +from pyverbs.mr import MR MAX_MR_SIZE = 4194304 # Some HWs limit DM address and 
length alignment to 4 for read and write @@ -29,6 +31,7 @@ MAX_DM_LOG_ALIGN = 6 # Raw Packet QP supports TSO header, which creates a larger send WQE. MAX_RAW_PACKET_SEND_WR = 2500 GRH_SIZE = 40 +IMM_DATA = 1234 def get_mr_length(): @@ -197,7 +200,7 @@ def random_qp_create_flags(qpt, attr_ex): def random_qp_init_attr_ex(attr_ex, attr, qpt=None): """ - Create a random-valued QPInitAttrEX object with the given QP type. + Create a random-valued QPInitAttrEx object with the given QP type. QP type affects QP capabilities, so allow users to set it and still get valid attributes. :param attr_ex: Extended device attributes for capability checks @@ -251,24 +254,38 @@ def wc_status_to_str(status): except KeyError: return 'Unknown WC status ({s})'.format(s=status) + +def create_custom_mr(agr_obj, additional_access_flags=0, size=None): + """ + Creates a memory region using the aggregation object's PD. + If size is None, the agr_obj's message size is used to set the MR's size. + The access flags are local write and the additional_access_flags. + :param agr_obj: The aggregation object that creates the MR + :param additional_access_flags: Addition access flags to set in the MR + :param size: MR's length. If None, agr_obj.msg_size is used. + """ + mr_length = size if size else agr_obj.msg_size + return MR(agr_obj.pd, mr_length, + e.IBV_ACCESS_LOCAL_WRITE | additional_access_flags) + # Traffic helpers -def get_send_wr(agr_obj, is_server): +def get_send_element(agr_obj, is_server): """ - Creates a single SGE Send WR for agr_obj's QP type. The content of the - message is either 's' for server side or 'c' for client side. + Creates a single SGE and a single Send WR for agr_obj's QP type. The content + of the message is either 's' for server side or 'c' for client side. 
:param agr_obj: Aggregation object which contains all resources necessary :param is_server: Indicates whether this is server or client side - :return: send wr + :return: send wr and its SGE """ + mr = agr_obj.mr qp_type = agr_obj.sqp_lst[0].qp_type if isinstance(agr_obj, XRCResources) \ else agr_obj.qp.qp_type - mr = agr_obj.mr offset = GRH_SIZE if qp_type == e.IBV_QPT_UD else 0 - send_sge = SGE(mr.buf + offset, agr_obj.msg_size, mr.lkey) msg = (agr_obj.msg_size + offset) * ('s' if is_server else 'c') mr.write(msg, agr_obj.msg_size + offset) - return SendWR(num_sge=1, sg=[send_sge]) + sge = SGE(mr.buf + offset, agr_obj.msg_size, mr.lkey) + return SendWR(num_sge=1, sg=[sge]), sge def get_recv_wr(agr_obj): @@ -286,6 +303,64 @@ def get_recv_wr(agr_obj): return RecvWR(sg=[recv_sge], num_sge=1) +def get_global_ah(agr_obj, gid_index, port): + gr = GlobalRoute(dgid=agr_obj.ctx.query_gid(port, gid_index), + sgid_index=gid_index) + ah_attr = AHAttr(port_num=port, is_global=1, gr=gr, + dlid=agr_obj.port_attr.lid) + return AH(agr_obj.pd, attr=ah_attr) + + +def xrc_post_send(agr_obj, qp_num, send_object, gid_index, port, send_op=None): + agr_obj.qp = agr_obj.sqp_lst[qp_num] + if send_op: + post_send_ex(agr_obj, send_object, gid_index, port, send_op) + else: + post_send(agr_obj, send_object, gid_index, port) + + +def post_send_ex(agr_obj, send_object, gid_index, port, send_op=None): + qp_type = agr_obj.qp.qp_type + agr_obj.qp.wr_start() + agr_obj.qp.wr_id = 0x123 + agr_obj.qp.wr_flags = e.IBV_SEND_SIGNALED + if send_op == e.IBV_QP_EX_WITH_SEND: + agr_obj.qp.wr_send() + elif send_op == e.IBV_QP_EX_WITH_RDMA_WRITE: + agr_obj.qp.wr_rdma_write(agr_obj.rkey, agr_obj.raddr) + elif send_op == e.IBV_QP_EX_WITH_SEND_WITH_IMM: + agr_obj.qp.wr_send_imm(IMM_DATA) + elif send_op == e.IBV_QP_EX_WITH_RDMA_WRITE_WITH_IMM: + agr_obj.qp.wr_rdma_write_imm(agr_obj.rkey, agr_obj.raddr, IMM_DATA) + elif send_op == e.IBV_QP_EX_WITH_RDMA_READ: + agr_obj.qp.wr_rdma_read(agr_obj.rkey, agr_obj.raddr) + 
elif send_op == e.IBV_QP_EX_WITH_ATOMIC_CMP_AND_SWP: + # We're checking the returned value (remote's content), so cmp/swp + # values are of no importance. + agr_obj.qp.wr_atomic_cmp_swp(agr_obj.rkey, agr_obj.raddr, 42, 43) + elif send_op == e.IBV_QP_EX_WITH_ATOMIC_FETCH_AND_ADD: + agr_obj.qp.wr_atomic_fetch_add(agr_obj.rkey, agr_obj.raddr, 1) + elif send_op == e.IBV_QP_EX_WITH_BIND_MW: + bind_info = MWBindInfo(agr_obj.mr, agr_obj.mr.buf, agr_obj.mr.rkey, + e.IBV_ACCESS_REMOTE_WRITE) + mw = MW(agr_obj.pd, mw_type=e.IBV_MW_TYPE_2) + # A new rkey is needed to be set into bind_info, modify rkey + agr_obj.qp.wr_bind_mw(mw, agr_obj.mr.rkey + 12, bind_info) + agr_obj.qp.wr_complete() + return + #agr_obj.qp.wr_start() + #agr_obj.qp.wr_id = 0x123 + #agr_obj.qp.wr_flags = e.IBV_SEND_SIGNALED + #agr_obj.qp.wr_send() + if qp_type == e.IBV_QPT_UD: + ah = get_global_ah(agr_obj, gid_index, port) + agr_obj.qp.wr_set_ud_addr(ah, agr_obj.rqpn, agr_obj.UD_QKEY) + if qp_type == e.IBV_QPT_XRC_SEND: + agr_obj.qp.wr_set_xrc_srqn(agr_obj.remote_srqn) + agr_obj.qp.wr_set_sge(send_object) + agr_obj.qp.wr_complete() + + def post_send(agr_obj, send_wr, gid_index, port): """ Post a single send WR to the QP. Post_send's second parameter (send bad wr) @@ -299,11 +374,7 @@ def post_send(agr_obj, send_wr, gid_index, port): """ qp_type = agr_obj.qp.qp_type if qp_type == e.IBV_QPT_UD: - gr = GlobalRoute(dgid=agr_obj.ctx.query_gid(port, gid_index), - sgid_index=gid_index) - ah_attr = AHAttr(port_num=port, is_global=1, gr=gr, - dlid=agr_obj.port_attr.lid) - ah = AH(agr_obj.pd, attr=ah_attr) + ah = get_global_ah(agr_obj, gid_index, port) send_wr.set_wr_ud(ah, agr_obj.rqpn, agr_obj.UD_QKEY) agr_obj.qp.post_send(send_wr, None) @@ -321,7 +392,7 @@ def post_recv(qp, recv_wr, num_wqes=1): qp.post_recv(recv_wr, None) -def poll_cq(cq, count=1): +def poll_cq(cq, count=1, data=None): """ Poll <count> completions from the CQ. 
Note: This function calls the blocking poll() method of the CQ @@ -329,6 +400,8 @@ def poll_cq(cq, count=1): single CQ event when events are used. :param cq: CQ to poll from :param count: How many completions to poll + :param data: In case of a work request with immediate, the immediate data + to be compared after poll :return: An array of work completions of length <count>, None when events are used """ @@ -339,15 +412,21 @@ def poll_cq(cq, count=1): if wc.status != e.IBV_WC_SUCCESS: raise PyverbsRDMAError('Completion status is {s}'. format(s=wc_status_to_str(wc.status))) + if data: + if wc.wc_flags & e.IBV_WC_WITH_IMM == 0: + raise PyverbsRDMAError('Completion without immediate') + assert socket.ntohl(wc.imm_data) == data count -= nc return wcs -def poll_cq_ex(cqex, count=1): +def poll_cq_ex(cqex, count=1, data=None): """ Poll <count> completions from the extended CQ. :param cq: CQEX to poll from :param count: How many completions to poll + :param data: In case of a work request with immediate, the immediate data + to be compared after poll :return: None """ poll_attr = PollCqAttr() @@ -360,6 +439,8 @@ def poll_cq_ex(cqex, count=1): if cqex.status != e.IBV_WC_SUCCESS: raise PyverbsRDMAErrno('Completion status is {s}'. format(s=cqex.status)) + if data: + assert data == socket.ntohl(cqex.read_imm_data()) # Now poll the rest of the packets while count > 0: ret = cqex.poll_next() @@ -370,6 +451,8 @@ def poll_cq_ex(cqex, count=1): if cqex.status != e.IBV_WC_SUCCESS: raise PyverbsRDMAErrno('Completion status is {s}'. 
format(s=cqex.status)) + if data: + assert data == socket.ntohl(cqex.read_imm_data()) count -= 1 cqex.end_poll() @@ -398,7 +481,13 @@ def validate(received_str, is_server, msg_size): format(exp=expected_str, rcv=received_str)) -def traffic(client, server, iters, gid_idx, port, is_cq_ex=False): +def send(agr_obj, send_wr, gid_index, port, send_op=None): + if send_op: + return post_send_ex(agr_obj, send_wr, gid_index, port, send_op) + return post_send(agr_obj, send_wr, gid_index, port) + + +def traffic(client, server, iters, gid_idx, port, is_cq_ex=False, send_op=None): """ Runs basic traffic between two sides :param client: client side, clients base class is BaseTraffic @@ -407,32 +496,83 @@ def traffic(client, server, iters, gid_idx, port, is_cq_ex=False): :param gid_idx: local gid index :param port: IB port :param is_cq_ex: If True, use poll_cq_ex() rather than poll_cq() + :param send_op: If not None, new post send API is assumed. :return: """ poll = poll_cq_ex if is_cq_ex else poll_cq + if send_op == e.IBV_QP_EX_WITH_SEND_WITH_IMM or \ + send_op == e.IBV_QP_EX_WITH_RDMA_WRITE_WITH_IMM: + imm_data = IMM_DATA + else: + imm_data = None + # Using the new post send API, we need the SGE, not the SendWR + send_element_idx = 1 if send_op else 0 s_recv_wr = get_recv_wr(server) c_recv_wr = get_recv_wr(client) post_recv(client.qp, c_recv_wr, client.num_msgs) post_recv(server.qp, s_recv_wr, server.num_msgs) read_offset = GRH_SIZE if client.qp.qp_type == e.IBV_QPT_UD else 0 for _ in range(iters): - c_send_wr = get_send_wr(client, False) - post_send(client, c_send_wr, gid_idx, port) + c_send_wr = get_send_element(client, False)[send_element_idx] + send(client, c_send_wr, gid_idx, port, send_op) poll(client.cq) - poll(server.cq) + poll(server.cq, data=imm_data) post_recv(server.qp, s_recv_wr) msg_received = server.mr.read(server.msg_size, read_offset) validate(msg_received, True, server.msg_size) - s_send_wr = get_send_wr(server, True) - post_send(server, s_send_wr, gid_idx, 
port) + s_send_wr = get_send_element(server, True)[send_element_idx] + send(server, s_send_wr, gid_idx, port, send_op) poll(server.cq) - poll(client.cq) + poll(client.cq, data=imm_data) post_recv(client.qp, c_recv_wr) msg_received = client.mr.read(client.msg_size, read_offset) validate(msg_received, False, client.msg_size) -def xrc_traffic(client, server, is_cq_ex=False): +def rdma_traffic(client, server, iters, gid_idx, port, is_cq_ex=False, send_op=None): + """ + Runs basic RDMA traffic between two sides. No receive WQEs are posted. For + RDMA send with immediate, use traffic(). + :param client: client side, clients base class is BaseTraffic + :param server: server side, servers base class is BaseTraffic + :param iters: number of traffic iterations + :param gid_idx: local gid index + :param port: IB port + :param is_cq_ex: If True, use poll_cq_ex() rather than poll_cq() + :param send_op: If not None, new post send API is assumed. + :return: + """ + # Using the new post send API, we need the SGE, not the SendWR + send_element_idx = 1 if send_op else 0 + same_side_check = (send_op == e.IBV_QP_EX_WITH_RDMA_READ or + send_op == e.IBV_QP_EX_WITH_ATOMIC_CMP_AND_SWP or + send_op == e.IBV_QP_EX_WITH_ATOMIC_FETCH_AND_ADD) + for _ in range(iters): + c_send_wr = get_send_element(client, False)[send_element_idx] + send(client, c_send_wr, gid_idx, port, send_op) + poll_cq(client.cq) + if same_side_check: + msg_received = client.mr.read(client.msg_size, 0) + else: + msg_received = server.mr.read(server.msg_size, 0) + validate(msg_received, False if same_side_check else True, + server.msg_size) + s_send_wr = get_send_element(server, True)[send_element_idx] + if same_side_check: + client.mr.write('c' * client.msg_size, client.msg_size) + send(server, s_send_wr, gid_idx, port, send_op) + poll_cq(server.cq) + if same_side_check: + msg_received = server.mr.read(client.msg_size, 0) + else: + msg_received = client.mr.read(server.msg_size, 0) + validate(msg_received, True if 
same_side_check else False, + client.msg_size) + if same_side_check: + server.mr.write('s' * server.msg_size, server.msg_size) + + +def xrc_traffic(client, server, is_cq_ex=False, send_op=None): """ Runs basic xrc traffic, this function assumes that number of QPs, which server and client have are equal, server.send_qp[i] is connected to @@ -444,27 +584,32 @@ def xrc_traffic(client, server, is_cq_ex=False): :param server: Aggregation object of the passive side, should be an instance of XRCResources class :param is_cq_ex: If True, use poll_cq_ex() rather than poll_cq() + :param send_op: If not None, new post send API is assumed. :return: None """ poll = poll_cq_ex if is_cq_ex else poll_cq - client_srqn = client.srq.get_srq_num() - server_srqn = server.srq.get_srq_num() + server.remote_srqn = client.srq.get_srq_num() + client.remote_srqn = server.srq.get_srq_num() s_recv_wr = get_recv_wr(server) c_recv_wr = get_recv_wr(client) post_recv(client.srq, c_recv_wr, client.qp_count*client.num_msgs) post_recv(server.srq, s_recv_wr, server.qp_count*server.num_msgs) + # Using the new post send API, we need the SGE, not the SendWR + send_element_idx = 1 if send_op else 0 for _ in range(client.num_msgs): for i in range(server.qp_count): - c_send_wr = get_send_wr(client, False) - c_send_wr.set_qp_type_xrc(server_srqn) - client.sqp_lst[i].post_send(c_send_wr) + c_send_wr = get_send_element(client, False)[send_element_idx] + if send_op is None: + c_send_wr.set_qp_type_xrc(client.remote_srqn) + xrc_post_send(client, i, c_send_wr, 0, 0, send_op) poll(client.cq) poll(server.cq) msg_received = server.mr.read(server.msg_size, 0) validate(msg_received, True, server.msg_size) - s_send_wr = get_send_wr(server, True) - s_send_wr.set_qp_type_xrc(client_srqn) - server.sqp_lst[i].post_send(s_send_wr) + s_send_wr = get_send_element(server, True)[send_element_idx] + if send_op is None: + s_send_wr.set_qp_type_xrc(server.remote_srqn) + xrc_post_send(server, i, s_send_wr, 0, 0, send_op) 
poll(server.cq) poll(client.cq) msg_received = client.mr.read(client.msg_size, 0) -- 2.21.0