[PATCH rdma-core 8/8] tests: Add test using the new post send API

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Add simple traffic tests that use the new post_send API.
Currently tested include:
- UD: send, send with immediate
- RC: send, send with immediate, RDMA write, RDMA read, atomic fetch
  and add, atomic compare and swap, bind memory window
- XRC: send, send with immediate

The existing traffic methods - traffic() and xrc_traffic() were
modified to support usage of the new API and are now checking whether
a send_op was provided or not.

As RDMA read and write do not require receive WQEs to be posted, an
extra traffic method, rdma_traffic, was added to the tests' utils
section.

Creation of a customized memory region is now available in the tests'
utils section.

Signed-off-by: Noa Osherovich <noaos@xxxxxxxxxxxx>
Reviewed-by: Edward Srouji <edwards@xxxxxxxxxxxx>
---
 tests/CMakeLists.txt |   1 +
 tests/test_odp.py    |  13 +-
 tests/test_qpex.py   | 295 +++++++++++++++++++++++++++++++++++++++++++
 tests/utils.py       | 207 +++++++++++++++++++++++++-----
 4 files changed, 478 insertions(+), 38 deletions(-)
 mode change 100755 => 100644 tests/CMakeLists.txt
 create mode 100644 tests/test_qpex.py

diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
old mode 100755
new mode 100644
index 6d702425886c..74930f69e19d
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -12,6 +12,7 @@ rdma_python_test(tests
   test_mr.py
   test_pd.py
   test_qp.py
+  test_qpex.py
   test_odp.py
   test_parent_domain.py
   test_rdmacm.py
diff --git a/tests/test_odp.py b/tests/test_odp.py
index d412a7792951..742ad81abb89 100755
--- a/tests/test_odp.py
+++ b/tests/test_odp.py
@@ -1,5 +1,5 @@
+from tests.utils import requires_odp, traffic, xrc_traffic, create_custom_mr
 from tests.base import RCResources, UDResources, XRCResources
-from tests.utils import requires_odp, traffic, xrc_traffic
 from tests.base import RDMATestCase
 from pyverbs.mr import MR
 import pyverbs.enums as e
@@ -8,21 +8,20 @@ import pyverbs.enums as e
 class OdpUD(UDResources):
     @requires_odp('ud')
     def create_mr(self):
-        self.mr = MR(self.pd, self.msg_size + self.GRH_SIZE,
-                     e.IBV_ACCESS_LOCAL_WRITE | e.IBV_ACCESS_ON_DEMAND)
+        self.mr = create_custom_mr(self, e.IBV_ACCESS_ON_DEMAND,
+                                   self.msg_size + self.GRH_SIZE)
 
 
 class OdpRC(RCResources):
     @requires_odp('rc')
     def create_mr(self):
-        self.mr = MR(self.pd, self.msg_size,
-                     e.IBV_ACCESS_LOCAL_WRITE | e.IBV_ACCESS_ON_DEMAND)
+        self.mr = create_custom_mr(self, e.IBV_ACCESS_ON_DEMAND)
+
 
 class OdpXRC(XRCResources):
     @requires_odp('xrc')
     def create_mr(self):
-        self.mr = MR(self.pd, self.msg_size,
-                     e.IBV_ACCESS_LOCAL_WRITE | e.IBV_ACCESS_ON_DEMAND)
+        self.mr = create_custom_mr(self, e.IBV_ACCESS_ON_DEMAND)
 
 
 class OdpTestCase(RDMATestCase):
diff --git a/tests/test_qpex.py b/tests/test_qpex.py
new file mode 100644
index 000000000000..922010bce3e5
--- /dev/null
+++ b/tests/test_qpex.py
@@ -0,0 +1,295 @@
+import unittest
+import random
+
+from pyverbs.qp import QPCap, QPInitAttrEx, QPAttr, QPEx, QP
+from pyverbs.pyverbs_error import PyverbsRDMAError
+from pyverbs.mr import MW, MWBindInfo
+from pyverbs.base import inc_rkey
+import pyverbs.enums as e
+from pyverbs.mr import MR
+
+from tests.base import UDResources, RCResources, RDMATestCase, XRCResources
+import tests.utils as u
+
+
+def create_qp_ex(agr_obj, qp_type, send_flags):
+    if qp_type == e.IBV_QPT_XRC_SEND:
+        cap = QPCap(max_send_wr=agr_obj.num_msgs, max_recv_wr=0, max_recv_sge=0,
+                    max_send_sge=1)
+    else:
+        cap = QPCap(max_send_wr=agr_obj.num_msgs, max_recv_wr=agr_obj.num_msgs,
+                    max_recv_sge=1, max_send_sge=1)
+    qia = QPInitAttrEx(cap=cap, qp_type=qp_type, scq=agr_obj.cq,
+                       rcq=agr_obj.cq, pd=agr_obj.pd, send_ops_flags=send_flags,
+                       comp_mask=e.IBV_QP_INIT_ATTR_PD |
+                                 e.IBV_QP_INIT_ATTR_SEND_OPS_FLAGS)
+    qp_attr = QPAttr(port_num=agr_obj.ib_port)
+    if qp_type == e.IBV_QPT_UD:
+        qp_attr.qkey = agr_obj.UD_QKEY
+        qp_attr.pkey_index = agr_obj.UD_PKEY_INDEX
+    if qp_type == e.IBV_QPT_RC:
+        qp_attr.qp_access_flags = e.IBV_ACCESS_REMOTE_WRITE | \
+                                  e.IBV_ACCESS_REMOTE_READ | \
+                                  e.IBV_ACCESS_REMOTE_ATOMIC
+    try:
+        # We don't have capability bits for this
+        qp = QPEx(agr_obj.ctx, qia, qp_attr)
+    except PyverbsRDMAError as exp:
+        if 'Operation not supported' in exp.args[0]:
+            raise unittest.SkipTest('Extended QP not supported on this device')
+        raise exp
+    return qp
+
+
+class QpExUDSend(UDResources):
+    def create_qp(self):
+        self.qp = create_qp_ex(self, e.IBV_QPT_UD, e.IBV_QP_EX_WITH_SEND)
+
+
+class QpExRCSend(RCResources):
+    def create_qp(self):
+        self.qp = create_qp_ex(self, e.IBV_QPT_RC, e.IBV_QP_EX_WITH_SEND)
+
+
+class QpExXRCSend(XRCResources):
+    def create_qp(self):
+        qp_attr = QPAttr(port_num=self.ib_port)
+        qp_attr.pkey_index = 0
+        for _ in range(self.qp_count):
+            attr_ex = QPInitAttrEx(qp_type=e.IBV_QPT_XRC_RECV,
+                                   comp_mask=e.IBV_QP_INIT_ATTR_XRCD,
+                                   xrcd=self.xrcd)
+            qp_attr.qp_access_flags = e.IBV_ACCESS_REMOTE_WRITE | \
+                                      e.IBV_ACCESS_REMOTE_READ
+            recv_qp = QP(self.ctx, attr_ex, qp_attr)
+            self.rqp_lst.append(recv_qp)
+
+            send_qp = create_qp_ex(self, e.IBV_QPT_XRC_SEND, e.IBV_QP_EX_WITH_SEND)
+            self.sqp_lst.append(send_qp)
+            self.qps_num.append((recv_qp.qp_num, send_qp.qp_num))
+            self.psns.append(random.getrandbits(24))
+
+
+class QpExUDSendImm(UDResources):
+    def create_qp(self):
+        self.qp = create_qp_ex(self, e.IBV_QPT_UD, e.IBV_QP_EX_WITH_SEND_WITH_IMM)
+
+
+class QpExRCSendImm(RCResources):
+    def create_qp(self):
+        self.qp = create_qp_ex(self, e.IBV_QPT_RC, e.IBV_QP_EX_WITH_SEND_WITH_IMM)
+
+
+class QpExXRCSendImm(XRCResources):
+    def create_qp(self):
+        qp_attr = QPAttr(port_num=self.ib_port)
+        qp_attr.pkey_index = 0
+        for _ in range(self.qp_count):
+            attr_ex = QPInitAttrEx(qp_type=e.IBV_QPT_XRC_RECV,
+                                   comp_mask=e.IBV_QP_INIT_ATTR_XRCD,
+                                   xrcd=self.xrcd)
+            qp_attr.qp_access_flags = e.IBV_ACCESS_REMOTE_WRITE | \
+                                      e.IBV_ACCESS_REMOTE_READ
+            recv_qp = QP(self.ctx, attr_ex, qp_attr)
+            self.rqp_lst.append(recv_qp)
+
+            send_qp = create_qp_ex(self, e.IBV_QPT_XRC_SEND,
+                                   e.IBV_QP_EX_WITH_SEND_WITH_IMM)
+            self.sqp_lst.append(send_qp)
+            self.qps_num.append((recv_qp.qp_num, send_qp.qp_num))
+            self.psns.append(random.getrandbits(24))
+
+
+class QpExRCRDMAWrite(RCResources):
+    def create_qp(self):
+        self.qp = create_qp_ex(self, e.IBV_QPT_RC, e.IBV_QP_EX_WITH_RDMA_WRITE)
+
+    def create_mr(self):
+        self.mr = u.create_custom_mr(self, e.IBV_ACCESS_REMOTE_WRITE)
+
+
+class QpExRCRDMAWriteImm(RCResources):
+    def create_qp(self):
+        self.qp = create_qp_ex(self, e.IBV_QPT_RC,
+                               e.IBV_QP_EX_WITH_RDMA_WRITE_WITH_IMM)
+
+    def create_mr(self):
+        self.mr = u.create_custom_mr(self, e.IBV_ACCESS_REMOTE_WRITE)
+
+
+class QpExRCRDMARead(RCResources):
+    def create_qp(self):
+        self.qp = create_qp_ex(self, e.IBV_QPT_RC, e.IBV_QP_EX_WITH_RDMA_READ)
+
+    def create_mr(self):
+        self.mr = u.create_custom_mr(self, e.IBV_ACCESS_REMOTE_READ)
+
+
+class QpExRCAtomicCmpSwp(RCResources):
+    def create_qp(self):
+        self.qp = create_qp_ex(self, e.IBV_QPT_RC,
+                               e.IBV_QP_EX_WITH_ATOMIC_CMP_AND_SWP)
+        self.mr = u.create_custom_mr(self, e.IBV_ACCESS_REMOTE_ATOMIC)
+
+
+class QpExRCAtomicFetchAdd(RCResources):
+    def create_qp(self):
+        self.qp = create_qp_ex(self, e.IBV_QPT_RC,
+                               e.IBV_QP_EX_WITH_ATOMIC_FETCH_AND_ADD)
+        self.mr = u.create_custom_mr(self, e.IBV_ACCESS_REMOTE_ATOMIC)
+
+
+class QpExRCBindMw(RCResources):
+    def create_qp(self):
+        self.qp = create_qp_ex(self, e.IBV_QPT_RC, e.IBV_QP_EX_WITH_BIND_MW)
+
+    def create_mr(self):
+        self.mr = u.create_custom_mr(self, e.IBV_ACCESS_REMOTE_WRITE)
+
+
+class QpExTestCase(RDMATestCase):
+    """ Run traffic using the new post send API. """
+    def setUp(self):
+        super().setUp()
+        self.iters = 100
+        self.qp_dict = {'ud_send': QpExUDSend, 'rc_send': QpExRCSend,
+                        'xrc_send': QpExXRCSend, 'ud_send_imm': QpExUDSendImm,
+                        'rc_send_imm': QpExRCSendImm,
+                        'xrc_send_imm': QpExXRCSendImm,
+                        'rc_write': QpExRCRDMAWrite,
+                        'rc_write_imm': QpExRCRDMAWriteImm,
+                        'rc_read': QpExRCRDMARead,
+                        'rc_cmp_swp': QpExRCAtomicCmpSwp,
+                        'rc_fetch_add': QpExRCAtomicFetchAdd,
+                        'rc_bind_mw': QpExRCBindMw}
+
+    def create_players(self, qp_type):
+        client = self.qp_dict[qp_type](self.dev_name, self.ib_port,
+                                       self.gid_index)
+        server = self.qp_dict[qp_type](self.dev_name, self.ib_port,
+                                       self.gid_index)
+        if 'xrc' in qp_type:
+            client.pre_run(server.psns, server.qps_num)
+            server.pre_run(client.psns, client.qps_num)
+        else:
+            client.pre_run(server.psn, server.qpn)
+            server.pre_run(client.psn, client.qpn)
+        return client, server
+
+    def test_qp_ex_ud_send(self):
+        client, server = self.create_players('ud_send')
+        u.traffic(client, server, self.iters, self.gid_index, self.ib_port,
+                  is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_SEND)
+
+    def test_qp_ex_rc_send(self):
+        client, server = self.create_players('rc_send')
+        u.traffic(client, server, self.iters, self.gid_index, self.ib_port,
+                  is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_SEND)
+
+    def test_qp_ex_xrc_send(self):
+        client, server = self.create_players('xrc_send')
+        u.xrc_traffic(client, server, send_op=e.IBV_QP_EX_WITH_SEND)
+
+    def test_qp_ex_ud_send_imm(self):
+        client, server = self.create_players('ud_send_imm')
+        u.traffic(client, server, self.iters, self.gid_index, self.ib_port,
+                  is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_SEND_WITH_IMM)
+
+    def test_qp_ex_rc_send_imm(self):
+        client, server = self.create_players('rc_send_imm')
+        u.traffic(client, server, self.iters, self.gid_index, self.ib_port,
+                  is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_SEND_WITH_IMM)
+
+    def test_qp_ex_xrc_send_imm(self):
+        client, server = self.create_players('xrc_send_imm')
+        u.xrc_traffic(client, server, send_op=e.IBV_QP_EX_WITH_SEND_WITH_IMM)
+
+    def test_qp_ex_rc_rdma_write(self):
+        client, server = self.create_players('rc_write')
+        client.rkey = server.mr.rkey
+        server.rkey = client.mr.rkey
+        client.raddr = server.mr.buf
+        server.raddr = client.mr.buf
+        u.rdma_traffic(client, server, self.iters, self.gid_index, self.ib_port,
+                       is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_RDMA_WRITE)
+
+    def test_qp_ex_rc_rdma_write_imm(self):
+        client, server = self.create_players('rc_write_imm')
+        client.rkey = server.mr.rkey
+        server.rkey = client.mr.rkey
+        client.raddr = server.mr.buf
+        server.raddr = client.mr.buf
+        u.traffic(client, server, self.iters, self.gid_index, self.ib_port,
+                  is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_RDMA_WRITE_WITH_IMM)
+
+    def test_qp_ex_rc_rdma_read(self):
+        client, server = self.create_players('rc_read')
+        client.rkey = server.mr.rkey
+        server.rkey = client.mr.rkey
+        client.raddr = server.mr.buf
+        server.raddr = client.mr.buf
+        server.mr.write('s' * server.msg_size, server.msg_size)
+        u.rdma_traffic(client, server, self.iters, self.gid_index, self.ib_port,
+                       is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_RDMA_READ)
+
+    def test_qp_ex_rc_atomic_cmp_swp(self):
+        client, server = self.create_players('rc_cmp_swp')
+        client.msg_size = 8  # Atomic work on 64b operators
+        server.msg_size = 8
+        client.rkey = server.mr.rkey
+        server.rkey = client.mr.rkey
+        client.raddr = server.mr.buf
+        server.raddr = client.mr.buf
+        server.mr.write('s' * 8, 8)
+        u.rdma_traffic(client, server, self.iters, self.gid_index, self.ib_port,
+                       is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_ATOMIC_CMP_AND_SWP)
+
+    def test_qp_ex_rc_atomic_fetch_add(self):
+        client, server = self.create_players('rc_fetch_add')
+        client.msg_size = 8  # Atomic work on 64b operators
+        server.msg_size = 8
+        client.rkey = server.mr.rkey
+        server.rkey = client.mr.rkey
+        client.raddr = server.mr.buf
+        server.raddr = client.mr.buf
+        server.mr.write('s' * 8, 8)
+        u.rdma_traffic(client, server, self.iters, self.gid_index, self.ib_port,
+                       is_cq_ex=False, send_op=e.IBV_QP_EX_WITH_ATOMIC_FETCH_AND_ADD)
+
+    def test_qp_ex_rc_bind_mw(self):
+        """
+        Verify bind memory window operation using the new post_send API.
+        Instead of checking through regular pingpong style traffic, we'll
+        do as follows:
+        - Register an MR with remote write access
+        - Bind a MW without remote write permission to the MR
+        - Verify that remote write fails
+        Since it's a unique flow, it's an integral part of that test rather
+        than a utility method.
+        """
+        client, server = self.create_players('rc_bind_mw')
+        client_sge = u.get_send_element(client, False)[1]
+        # Create a MW and bind it
+        server.qp.wr_start()
+        server.qp.wr_id = 0x123
+        server.qp.wr_flags = e.IBV_SEND_SIGNALED
+        bind_info = MWBindInfo(server.mr, server.mr.buf, server.mr.length,
+                               e.IBV_ACCESS_LOCAL_WRITE)
+        mw = MW(server.pd, mw_type=e.IBV_MW_TYPE_2)
+        new_key = inc_rkey(server.mr.rkey)
+        server.qp.wr_bind_mw(mw, new_key, bind_info)
+        server.qp.wr_complete()
+        u.poll_cq(server.cq)
+        # Verify remote write fails
+        client.qp.wr_start()
+        client.qp.wr_id = 0x124
+        client.qp.wr_flags = e.IBV_SEND_SIGNALED
+        client.qp.wr_rdma_write(new_key, server.mr.buf)
+        client.qp.wr_set_sge(client_sge)
+        client.qp.wr_complete()
+        try:
+            u.poll_cq(client.cq)
+        except PyverbsRDMAError as exp:
+            if 'Completion status is Remote access error' not in exp.args[0]:
+                raise exp
+
diff --git a/tests/utils.py b/tests/utils.py
index 47eacfee35e5..20132a7cf40b 100755
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -7,6 +7,7 @@ from itertools import combinations as com
 from string import ascii_lowercase as al
 import unittest
 import random
+import socket
 
 from pyverbs.pyverbs_error import PyverbsError, PyverbsRDMAError
 from pyverbs.addr import AHAttr, AH, GlobalRoute
@@ -16,6 +17,7 @@ from tests.base import XRCResources
 from pyverbs.cq import PollCqAttr
 import pyverbs.device as d
 import pyverbs.enums as e
+from pyverbs.mr import MR
 
 MAX_MR_SIZE = 4194304
 # Some HWs limit DM address and length alignment to 4 for read and write
@@ -29,6 +31,7 @@ MAX_DM_LOG_ALIGN = 6
 # Raw Packet QP supports TSO header, which creates a larger send WQE.
 MAX_RAW_PACKET_SEND_WR = 2500
 GRH_SIZE = 40
+IMM_DATA = 1234
 
 
 def get_mr_length():
@@ -197,7 +200,7 @@ def random_qp_create_flags(qpt, attr_ex):
 
 def random_qp_init_attr_ex(attr_ex, attr, qpt=None):
     """
-    Create a random-valued QPInitAttrEX object with the given QP type.
+    Create a random-valued QPInitAttrEx object with the given QP type.
     QP type affects QP capabilities, so allow users to set it and still get
     valid attributes.
     :param attr_ex: Extended device attributes for capability checks
@@ -251,24 +254,38 @@ def wc_status_to_str(status):
     except KeyError:
         return 'Unknown WC status ({s})'.format(s=status)
 
+
+def create_custom_mr(agr_obj, additional_access_flags=0, size=None):
+    """
+    Creates a memory region using the aggregation object's PD.
+    If size is None, the agr_obj's message size is used to set the MR's size.
+    The access flags are local write and the additional_access_flags.
+    :param agr_obj: The aggregation object that creates the MR
+    :param additional_access_flags: Addition access flags to set in the MR
+    :param size: MR's length. If None, agr_obj.msg_size is used.
+    """
+    mr_length = size if size else agr_obj.msg_size
+    return MR(agr_obj.pd, mr_length,
+              e.IBV_ACCESS_LOCAL_WRITE | additional_access_flags)
+
 # Traffic helpers
 
-def get_send_wr(agr_obj, is_server):
+def get_send_element(agr_obj, is_server):
     """
-    Creates a single SGE Send WR for agr_obj's QP type. The content of the
-    message is either 's' for server side or 'c' for client side.
+    Creates a single SGE and a single Send WR for agr_obj's QP type. The content
+    of the message is either 's' for server side or 'c' for client side.
     :param agr_obj: Aggregation object which contains all resources necessary
     :param is_server: Indicates whether this is server or client side
-    :return: send wr
+    :return: send wr and its SGE
     """
+    mr = agr_obj.mr
     qp_type = agr_obj.sqp_lst[0].qp_type if isinstance(agr_obj, XRCResources) \
                 else agr_obj.qp.qp_type
-    mr = agr_obj.mr
     offset = GRH_SIZE if qp_type == e.IBV_QPT_UD else 0
-    send_sge = SGE(mr.buf + offset, agr_obj.msg_size, mr.lkey)
     msg = (agr_obj.msg_size + offset) * ('s' if is_server else 'c')
     mr.write(msg, agr_obj.msg_size + offset)
-    return SendWR(num_sge=1, sg=[send_sge])
+    sge = SGE(mr.buf + offset, agr_obj.msg_size, mr.lkey)
+    return SendWR(num_sge=1, sg=[sge]), sge
 
 
 def get_recv_wr(agr_obj):
@@ -286,6 +303,64 @@ def get_recv_wr(agr_obj):
     return RecvWR(sg=[recv_sge], num_sge=1)
 
 
+def get_global_ah(agr_obj, gid_index, port):
+    gr = GlobalRoute(dgid=agr_obj.ctx.query_gid(port, gid_index),
+                     sgid_index=gid_index)
+    ah_attr = AHAttr(port_num=port, is_global=1, gr=gr,
+                     dlid=agr_obj.port_attr.lid)
+    return AH(agr_obj.pd, attr=ah_attr)
+
+
+def xrc_post_send(agr_obj, qp_num, send_object, gid_index, port, send_op=None):
+    agr_obj.qp = agr_obj.sqp_lst[qp_num]
+    if send_op:
+        post_send_ex(agr_obj, send_object, gid_index, port, send_op)
+    else:
+        post_send(agr_obj, send_object, gid_index, port)
+
+
+def post_send_ex(agr_obj, send_object, gid_index, port, send_op=None):
+    qp_type = agr_obj.qp.qp_type
+    agr_obj.qp.wr_start()
+    agr_obj.qp.wr_id = 0x123
+    agr_obj.qp.wr_flags = e.IBV_SEND_SIGNALED
+    if send_op == e.IBV_QP_EX_WITH_SEND:
+        agr_obj.qp.wr_send()
+    elif send_op == e.IBV_QP_EX_WITH_RDMA_WRITE:
+        agr_obj.qp.wr_rdma_write(agr_obj.rkey, agr_obj.raddr)
+    elif send_op == e.IBV_QP_EX_WITH_SEND_WITH_IMM:
+        agr_obj.qp.wr_send_imm(IMM_DATA)
+    elif send_op == e.IBV_QP_EX_WITH_RDMA_WRITE_WITH_IMM:
+        agr_obj.qp.wr_rdma_write_imm(agr_obj.rkey, agr_obj.raddr, IMM_DATA)
+    elif send_op == e.IBV_QP_EX_WITH_RDMA_READ:
+        agr_obj.qp.wr_rdma_read(agr_obj.rkey, agr_obj.raddr)
+    elif send_op == e.IBV_QP_EX_WITH_ATOMIC_CMP_AND_SWP:
+        # We're checking the returned value (remote's content), so cmp/swp
+        # values are of no importance.
+        agr_obj.qp.wr_atomic_cmp_swp(agr_obj.rkey, agr_obj.raddr, 42, 43)
+    elif send_op == e.IBV_QP_EX_WITH_ATOMIC_FETCH_AND_ADD:
+        agr_obj.qp.wr_atomic_fetch_add(agr_obj.rkey, agr_obj.raddr, 1)
+    elif send_op == e.IBV_QP_EX_WITH_BIND_MW:
+        bind_info = MWBindInfo(agr_obj.mr, agr_obj.mr.buf, agr_obj.mr.rkey,
+                               e.IBV_ACCESS_REMOTE_WRITE)
+        mw = MW(agr_obj.pd, mw_type=e.IBV_MW_TYPE_2)
+        # A new rkey is needed to be set into bind_info, modify rkey
+        agr_obj.qp.wr_bind_mw(mw, agr_obj.mr.rkey + 12, bind_info)
+        agr_obj.qp.wr_complete()
+        return
+        #agr_obj.qp.wr_start()
+        #agr_obj.qp.wr_id = 0x123
+        #agr_obj.qp.wr_flags = e.IBV_SEND_SIGNALED
+        #agr_obj.qp.wr_send()
+    if qp_type == e.IBV_QPT_UD:
+        ah = get_global_ah(agr_obj, gid_index, port)
+        agr_obj.qp.wr_set_ud_addr(ah, agr_obj.rqpn, agr_obj.UD_QKEY)
+    if qp_type == e.IBV_QPT_XRC_SEND:
+        agr_obj.qp.wr_set_xrc_srqn(agr_obj.remote_srqn)
+    agr_obj.qp.wr_set_sge(send_object)
+    agr_obj.qp.wr_complete()
+
+
 def post_send(agr_obj, send_wr, gid_index, port):
     """
     Post a single send WR to the QP. Post_send's second parameter (send bad wr)
@@ -299,11 +374,7 @@ def post_send(agr_obj, send_wr, gid_index, port):
     """
     qp_type = agr_obj.qp.qp_type
     if qp_type == e.IBV_QPT_UD:
-        gr = GlobalRoute(dgid=agr_obj.ctx.query_gid(port, gid_index),
-                         sgid_index=gid_index)
-        ah_attr = AHAttr(port_num=port, is_global=1, gr=gr,
-                         dlid=agr_obj.port_attr.lid)
-        ah = AH(agr_obj.pd, attr=ah_attr)
+        ah = get_global_ah(agr_obj, gid_index, port)
         send_wr.set_wr_ud(ah, agr_obj.rqpn, agr_obj.UD_QKEY)
     agr_obj.qp.post_send(send_wr, None)
 
@@ -321,7 +392,7 @@ def post_recv(qp, recv_wr, num_wqes=1):
         qp.post_recv(recv_wr, None)
 
 
-def poll_cq(cq, count=1):
+def poll_cq(cq, count=1, data=None):
     """
     Poll <count> completions from the CQ.
     Note: This function calls the blocking poll() method of the CQ
@@ -329,6 +400,8 @@ def poll_cq(cq, count=1):
     single CQ event when events are used.
     :param cq: CQ to poll from
     :param count: How many completions to poll
+    :param data: In case of a work request with immediate, the immediate data
+                 to be compared after poll
     :return: An array of work completions of length <count>, None
              when events are used
     """
@@ -339,15 +412,21 @@ def poll_cq(cq, count=1):
             if wc.status != e.IBV_WC_SUCCESS:
                 raise PyverbsRDMAError('Completion status is {s}'.
                                        format(s=wc_status_to_str(wc.status)))
+            if data:
+                if wc.wc_flags & e.IBV_WC_WITH_IMM == 0:
+                    raise PyverbsRDMAError('Completion without immediate')
+                assert socket.ntohl(wc.imm_data) == data
         count -= nc
     return wcs
 
 
-def poll_cq_ex(cqex, count=1):
+def poll_cq_ex(cqex, count=1, data=None):
     """
     Poll <count> completions from the extended CQ.
     :param cq: CQEX to poll from
     :param count: How many completions to poll
+    :param data: In case of a work request with immediate, the immediate data
+                 to be compared after poll
     :return: None
     """
     poll_attr = PollCqAttr()
@@ -360,6 +439,8 @@ def poll_cq_ex(cqex, count=1):
     if cqex.status != e.IBV_WC_SUCCESS:
         raise PyverbsRDMAErrno('Completion status is {s}'.
                                format(s=cqex.status))
+    if data:
+        assert data == socket.ntohl(cqex.read_imm_data())
     # Now poll the rest of the packets
     while count > 0:
         ret = cqex.poll_next()
@@ -370,6 +451,8 @@ def poll_cq_ex(cqex, count=1):
         if cqex.status != e.IBV_WC_SUCCESS:
             raise PyverbsRDMAErrno('Completion status is {s}'.
                                    format(s=cqex.status))
+        if data:
+            assert data == socket.ntohl(cqex.read_imm_data())
         count -= 1
     cqex.end_poll()
 
@@ -398,7 +481,13 @@ def validate(received_str, is_server, msg_size):
                 format(exp=expected_str, rcv=received_str))
 
 
-def traffic(client, server, iters, gid_idx, port, is_cq_ex=False):
+def send(agr_obj, send_wr, gid_index, port, send_op=None):
+    if send_op:
+        return post_send_ex(agr_obj, send_wr, gid_index, port, send_op)
+    return post_send(agr_obj, send_wr, gid_index, port)
+
+
+def traffic(client, server, iters, gid_idx, port, is_cq_ex=False, send_op=None):
     """
     Runs basic traffic between two sides
     :param client: client side, clients base class is BaseTraffic
@@ -407,32 +496,83 @@ def traffic(client, server, iters, gid_idx, port, is_cq_ex=False):
     :param gid_idx: local gid index
     :param port: IB port
     :param is_cq_ex: If True, use poll_cq_ex() rather than poll_cq()
+    :param send_op: If not None, new post send API is assumed.
     :return:
     """
     poll = poll_cq_ex if is_cq_ex else poll_cq
+    if send_op == e.IBV_QP_EX_WITH_SEND_WITH_IMM or \
+       send_op == e.IBV_QP_EX_WITH_RDMA_WRITE_WITH_IMM:
+        imm_data = IMM_DATA
+    else:
+        imm_data = None
+    # Using the new post send API, we need the SGE, not the SendWR
+    send_element_idx = 1 if send_op else 0
     s_recv_wr = get_recv_wr(server)
     c_recv_wr = get_recv_wr(client)
     post_recv(client.qp, c_recv_wr, client.num_msgs)
     post_recv(server.qp, s_recv_wr, server.num_msgs)
     read_offset = GRH_SIZE if client.qp.qp_type == e.IBV_QPT_UD else 0
     for _ in range(iters):
-        c_send_wr = get_send_wr(client, False)
-        post_send(client, c_send_wr, gid_idx, port)
+        c_send_wr = get_send_element(client, False)[send_element_idx]
+        send(client, c_send_wr, gid_idx, port, send_op)
         poll(client.cq)
-        poll(server.cq)
+        poll(server.cq, data=imm_data)
         post_recv(server.qp, s_recv_wr)
         msg_received = server.mr.read(server.msg_size, read_offset)
         validate(msg_received, True, server.msg_size)
-        s_send_wr = get_send_wr(server, True)
-        post_send(server, s_send_wr, gid_idx, port)
+        s_send_wr = get_send_element(server, True)[send_element_idx]
+        send(server, s_send_wr, gid_idx, port, send_op)
         poll(server.cq)
-        poll(client.cq)
+        poll(client.cq, data=imm_data)
         post_recv(client.qp, c_recv_wr)
         msg_received = client.mr.read(client.msg_size, read_offset)
         validate(msg_received, False, client.msg_size)
 
 
-def xrc_traffic(client, server, is_cq_ex=False):
+def rdma_traffic(client, server, iters, gid_idx, port, is_cq_ex=False, send_op=None):
+    """
+    Runs basic RDMA traffic between two sides. No receive WQEs are posted. For
+    RDMA send with immediate, use traffic().
+    :param client: client side, clients base class is BaseTraffic
+    :param server: server side, servers base class is BaseTraffic
+    :param iters: number of traffic iterations
+    :param gid_idx: local gid index
+    :param port: IB port
+    :param is_cq_ex: If True, use poll_cq_ex() rather than poll_cq()
+    :param send_op: If not None, new post send API is assumed.
+    :return:
+    """
+    # Using the new post send API, we need the SGE, not the SendWR
+    send_element_idx = 1 if send_op else 0
+    same_side_check = (send_op == e.IBV_QP_EX_WITH_RDMA_READ or
+                       send_op == e.IBV_QP_EX_WITH_ATOMIC_CMP_AND_SWP or
+                       send_op == e.IBV_QP_EX_WITH_ATOMIC_FETCH_AND_ADD)
+    for _ in range(iters):
+        c_send_wr = get_send_element(client, False)[send_element_idx]
+        send(client, c_send_wr, gid_idx, port, send_op)
+        poll_cq(client.cq)
+        if same_side_check:
+            msg_received = client.mr.read(client.msg_size, 0)
+        else:
+            msg_received = server.mr.read(server.msg_size, 0)
+        validate(msg_received, False if same_side_check else True,
+                 server.msg_size)
+        s_send_wr = get_send_element(server, True)[send_element_idx]
+        if same_side_check:
+            client.mr.write('c' * client.msg_size, client.msg_size)
+        send(server, s_send_wr, gid_idx, port, send_op)
+        poll_cq(server.cq)
+        if same_side_check:
+            msg_received = server.mr.read(client.msg_size, 0)
+        else:
+            msg_received = client.mr.read(server.msg_size, 0)
+        validate(msg_received, True if same_side_check else False,
+                 client.msg_size)
+        if same_side_check:
+            server.mr.write('s' * server.msg_size, server.msg_size)
+
+
+def xrc_traffic(client, server, is_cq_ex=False, send_op=None):
     """
     Runs basic xrc traffic, this function assumes that number of QPs, which
     server and client have are equal, server.send_qp[i] is connected to
@@ -444,27 +584,32 @@ def xrc_traffic(client, server, is_cq_ex=False):
     :param server: Aggregation object of the passive side, should be an instance
     of XRCResources class
     :param is_cq_ex: If True, use poll_cq_ex() rather than poll_cq()
+    :param send_op: If not None, new post send API is assumed.
     :return: None
     """
     poll = poll_cq_ex if is_cq_ex else poll_cq
-    client_srqn = client.srq.get_srq_num()
-    server_srqn = server.srq.get_srq_num()
+    server.remote_srqn = client.srq.get_srq_num()
+    client.remote_srqn = server.srq.get_srq_num()
     s_recv_wr = get_recv_wr(server)
     c_recv_wr = get_recv_wr(client)
     post_recv(client.srq, c_recv_wr, client.qp_count*client.num_msgs)
     post_recv(server.srq, s_recv_wr, server.qp_count*server.num_msgs)
+    # Using the new post send API, we need the SGE, not the SendWR
+    send_element_idx = 1 if send_op else 0
     for _ in range(client.num_msgs):
         for i in range(server.qp_count):
-            c_send_wr = get_send_wr(client, False)
-            c_send_wr.set_qp_type_xrc(server_srqn)
-            client.sqp_lst[i].post_send(c_send_wr)
+            c_send_wr = get_send_element(client, False)[send_element_idx]
+            if send_op is None:
+                c_send_wr.set_qp_type_xrc(client.remote_srqn)
+            xrc_post_send(client, i, c_send_wr, 0, 0, send_op)
             poll(client.cq)
             poll(server.cq)
             msg_received = server.mr.read(server.msg_size, 0)
             validate(msg_received, True, server.msg_size)
-            s_send_wr = get_send_wr(server, True)
-            s_send_wr.set_qp_type_xrc(client_srqn)
-            server.sqp_lst[i].post_send(s_send_wr)
+            s_send_wr = get_send_element(server, True)[send_element_idx]
+            if send_op is None:
+                s_send_wr.set_qp_type_xrc(server.remote_srqn)
+            xrc_post_send(server, i, s_send_wr, 0, 0, send_op)
             poll(server.cq)
             poll(client.cq)
             msg_received = client.mr.read(client.msg_size, 0)
-- 
2.21.0





[Index of Archives]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Photo]     [Yosemite News]     [Yosemite Photos]     [Linux Kernel]     [Linux SCSI]     [XFree86]

  Powered by Linux