[PATCH rdma-core 09/14] tests: Add traffic helper methods

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



From: Maxim Chicherin <maximc@xxxxxxxxxxxx>

This patch adds common methods needed for data path such as creation
of send/recv work requests, post_send/recv wrappers etc.

Signed-off-by: Maxim Chicherin <maximc@xxxxxxxxxxxx>
---
 tests/utils.py | 151 ++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 150 insertions(+), 1 deletion(-)

diff --git a/tests/utils.py b/tests/utils.py
index 20a7e8d38e54..881ce4d03634 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -8,6 +8,9 @@ from string import ascii_lowercase as al
 import unittest
 import random
 
+from pyverbs.pyverbs_error import PyverbsError, PyverbsRDMAError
+from pyverbs.addr import AHAttr, AH, GlobalRoute
+from pyverbs.wr import SGE, SendWR, RecvWR
 from pyverbs.qp import QPCap, QPInitAttrEx
 import pyverbs.device as d
 import pyverbs.enums as e
@@ -23,6 +26,7 @@ MIN_DM_LOG_ALIGN = 0
 MAX_DM_LOG_ALIGN = 6
 # Raw Packet QP supports TSO header, which creates a larger send WQE.
 MAX_RAW_PACKET_SEND_WR = 2500
+GRH_SIZE = 40
 
 
 def get_mr_length():
@@ -242,9 +246,154 @@ def wc_status_to_str(status):
     except KeyError:
         return 'Unknown WC status ({s})'.format(s=status)
 
+# Traffic helpers
+
+def get_send_wr(agr_obj, is_server):
+    """
+    Creates a single SGE Send WR for agr_obj's QP type. The content of the
+    message is either 's' for server side or 'c' for client side.
+    :param agr_obj: Aggregation object which contains all resources necessary
+    :param is_server: Indicates whether this is server or client side
+    :return: send wr
+    """
+    qp_type = agr_obj.qp.qp_type
+    mr = agr_obj.mr
+    if qp_type == e.IBV_QPT_UD:
+        send_sge = SGE(mr.buf + GRH_SIZE, agr_obj.msg_size, mr.lkey)
+    else:
+        send_sge = SGE(mr.buf, agr_obj.msg_size, mr.lkey)
+    msg = agr_obj.msg_size * ('s' if is_server else 'c')
+    mr.write(msg, agr_obj.msg_size)
+    return SendWR(num_sge=1, sg=[send_sge])
+
+
+def get_recv_wr(agr_obj):
+    """
+    Creates a single SGE Recv WR for agr_obj's QP type.
+    :param agr_obj: Aggregation object which contains all resources necessary
+    :return: recv wr
+    """
+    qp_type = agr_obj.qp.qp_type
+    mr = agr_obj.mr
+    if qp_type == e.IBV_QPT_UD:
+        recv_sge = SGE(mr.buf, agr_obj.msg_size + GRH_SIZE, mr.lkey)
+    else:
+        recv_sge = SGE(mr.buf, agr_obj.msg_size, mr.lkey)
+    return RecvWR(sg=[recv_sge], num_sge=1)
+
+
+def post_send(agr_obj, send_wr, gid_index, port):
+    """
+    Post a single send WR to the QP. Post_send's second parameter (send bad wr)
+    is ignored for simplicity. For UD traffic an address vector is added as
+    well.
+    :param agr_obj: aggregation object which contains all resources necessary
+    :param send_wr: Send work request to post send
+    :param gid_index: Local gid index
+    :param port: IB port number
+    :return: None
+    """
+    qp_type = agr_obj.qp.qp_type
+    if qp_type == e.IBV_QPT_UD:
+        gr = GlobalRoute(dgid=agr_obj.ctx.query_gid(port, gid_index),
+                         sgid_index=gid_index)
+        ah_attr = AHAttr(port_num=port, is_global=1, gr=gr,
+                         dlid=agr_obj.port_attr.lid)
+        ah = AH(agr_obj.pd, attr=ah_attr)
+        send_wr.set_wr_ud(ah, agr_obj.rqpn, agr_obj.UD_QKEY)
+    agr_obj.qp.post_send(send_wr, None)
+
+
+def post_recv(qp, recv_wr, num_wqes=1):
+    """
+    Call the QP's post_recv() method <num_wqes> times. Post_recv's second
+    parameter (recv bad wr) is ignored for simplicity.
+    :param qp: QP which posts receive work request
+    :param recv_wr: Receive work request to post
+    :param num_wqes: Number of WQEs to post
+    :return: None
+    """
+    for _ in range(num_wqes):
+        qp.post_recv(recv_wr, None)
 
-# Decorators
 
+def poll_cq(cq, count=1):
+    """
+    Poll <count> completions from the CQ.
+    Note: This function calls the blocking poll() method of the CQ
+    until <count> completions were received. Alternatively, gets a
+    single CQ event when events are used.
+    :param cq: CQ to poll from
+    :param count: How many completions to poll
+    :return: An array of work completions of length <count>, None
+             when events are used
+    """
+    wcs = None
+    while count > 0:
+        nc, wcs = cq.poll(count)
+        for wc in wcs:
+            if wc.status != e.IBV_WC_SUCCESS:
+                raise PyverbsRDMAError('Completion status is {s}'.
+                                       format(s=wc_status_to_str(wc.status)))
+        count -= nc
+    return wcs
+
+
+def validate(received_str, is_server, msg_size):
+    """
+    Validates the received buffer against the expected result.
+    The application should set client's send buffer to 'c's and the
+    server's send buffer to 's's.
+    If the expected buffer is different than the actual, an exception will
+    be raised.
+    :param received_str: The received buffer to check
+    :param is_server: Indicates whether this is the server (receiver) or
+                      client side
+    :param msg_size: the message size of the received packet
+    :return: None
+    """
+    expected_str = msg_size * ('c' if is_server else 's')
+    received_str = received_str.decode()
+    if received_str[0:msg_size] == \
+            expected_str[0:msg_size]:
+        return
+    else:
+        raise PyverbsError(
+            'Data validation failure: expected {exp}, received {rcv}'.
+                format(exp=expected_str, rcv=received_str))
+
+
+def traffic(client, server, iters, gid_idx, port):
+    """
+    Runs basic traffic between two sides
+    :param client: client side, clients base class is BaseTraffic
+    :param server: server side, servers base class is BaseTraffic
+    :param iters: number of traffic iterations
+    :param gid_idx: local gid index
+    :param port: IB port
+    :return:
+    """
+    s_recv_wr = get_recv_wr(server)
+    c_recv_wr = get_recv_wr(client)
+    post_recv(client.qp, c_recv_wr, client.num_msgs)
+    post_recv(server.qp, s_recv_wr, server.num_msgs)
+    for _ in range(iters):
+        c_send_wr = get_send_wr(client, False)
+        post_send(client, c_send_wr, gid_idx, port)
+        poll_cq(client.cq)
+        poll_cq(server.cq)
+        post_recv(client.qp, c_recv_wr)
+        msg_received = server.mr.read(server.msg_size, 0)
+        validate(msg_received, True, server.msg_size)
+        s_send_wr = get_send_wr(server, True)
+        post_send(server, s_send_wr, gid_idx, port)
+        poll_cq(server.cq)
+        poll_cq(client.cq)
+        post_recv(server.qp, s_recv_wr)
+        msg_received = client.mr.read(client.msg_size, 0)
+        validate(msg_received, False, client.msg_size)
+
+# Decorators
 def requires_odp(qp_type):
     def outer(func):
         def inner(instance):
-- 
2.21.0




[Index of Archives]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Photo]     [Yosemite News]     [Yosemite Photos]     [Linux Kernel]     [Linux SCSI]     [XFree86]

  Powered by Linux