From: Harish Chegondi <harish.chegondi@xxxxxxxxx> Usage of a single lock prevents posting and processing of receive work queue entries from progressing simultaneously and impacts overall performance. Fracture the single lock used for posting and processing Receive Work Queue Entries (RWQEs) to allow the circular buffer to be filled and emptied at the same time. Two new spinlocks - one for the producers and one for the consumers - are used for posting and processing RWQEs simultaneously, and the two indices are defined on two different cache lines. The threshold count is used to avoid reading the other index in a different cache line every time. Signed-off-by: Harish Chegondi <harish.chegondi@xxxxxxxxx> Signed-off-by: Kamenee Arumugam <kamenee.arumugam@xxxxxxxxx> Reviewed-by: Mike Marciniszyn <mike.marciniszyn@xxxxxxxxx> Signed-off-by: Dennis Dalessandro <dennis.dalessandro@xxxxxxxxx> --- drivers/infiniband/sw/rdmavt/qp.c | 265 ++++++++++++++++++++++++++---------- drivers/infiniband/sw/rdmavt/qp.h | 2 drivers/infiniband/sw/rdmavt/rc.c | 41 +++--- drivers/infiniband/sw/rdmavt/srq.c | 95 ++++++++----- include/rdma/rdmavt_qp.h | 30 ++++ 5 files changed, 308 insertions(+), 125 deletions(-) diff --git a/drivers/infiniband/sw/rdmavt/qp.c b/drivers/infiniband/sw/rdmavt/qp.c index a1bd8cf..f3c77e2 100644 --- a/drivers/infiniband/sw/rdmavt/qp.c +++ b/drivers/infiniband/sw/rdmavt/qp.c @@ -57,6 +57,8 @@ #include "vt.h" #include "trace.h" +#define RVT_RWQ_COUNT_THRESHOLD 16 + static void rvt_rc_timeout(struct timer_list *t); /* @@ -807,6 +809,63 @@ static void rvt_remove_qp(struct rvt_dev_info *rdi, struct rvt_qp *qp) } /** + * rvt_free_rq - free memory allocated for rvt_rq struct + * @rq: receive queue data structure + */ +static void rvt_free_rq(struct rvt_rq *rq) +{ + kvfree(rq->kwq); + rq->kwq = NULL; + vfree(rq->wq); + rq->wq = NULL; +} + +/** + * rvt_alloc_rq - allocate memory for user or kernel buffer + * @rq: receive queue data structure + * @size: number of request queue 
entries + * @node: The NUMA node + * @udata: user data; non-NULL for a user queue, NULL for a kernel queue + * + * Return: 0 on success, -ENOMEM if memory allocation failed. + * This function is used by both shared receive + * queues and non-shared receive queues to allocate + * memory. + */ +int rvt_alloc_rq(struct rvt_rq *rq, u32 size, int node, + struct ib_udata *udata) +{ + if (udata) { + rq->wq = + vmalloc_user(sizeof(struct rvt_rwq) + + size); + if (!rq->wq) + goto bail; + /* need kwq with no buffers */ + rq->kwq = kzalloc_node(sizeof(*rq->kwq), + GFP_KERNEL, node); + if (!rq->kwq) + goto bail; + rq->kwq->curr_wq = rq->wq->wq; + } else { + /* need kwq with buffers */ + rq->kwq = + vzalloc_node(sizeof(struct rvt_krwq) + + size, node); + if (!rq->kwq) + goto bail; + rq->kwq->curr_wq = rq->kwq->wq; + } + + spin_lock_init(&rq->kwq->p_lock); + spin_lock_init(&rq->kwq->c_lock); + return 0; +bail: + rvt_free_rq(rq); + return -ENOMEM; +} + +/** * rvt_init_qp - initialize the QP state to the reset state * @qp: the QP to init or reinit * @type: the QP type @@ -855,9 +914,10 @@ static void rvt_init_qp(struct rvt_dev_info *rdi, struct rvt_qp *qp, qp->r_head_ack_queue = 0; qp->s_tail_ack_queue = 0; qp->s_num_rd_atomic = 0; - if (qp->r_rq.wq) { - qp->r_rq.wq->head = 0; - qp->r_rq.wq->tail = 0; + if (qp->r_rq.kwq) { + qp->r_rq.kwq->head = 0; + qp->r_rq.kwq->tail = 0; + qp->r_rq.kwq->count = qp->r_rq.size; } qp->r_sge.num_sge = 0; atomic_set(&qp->s_reserved_used, 0); @@ -1051,17 +1111,12 @@ struct ib_qp *rvt_create_qp(struct ib_pd *ibpd, qp->r_rq.max_sge = init_attr->cap.max_recv_sge; sz = (sizeof(struct ib_sge) * qp->r_rq.max_sge) + sizeof(struct rvt_rwqe); - if (udata) - qp->r_rq.wq = vmalloc_user( - sizeof(struct rvt_rwq) + - qp->r_rq.size * sz); - else - qp->r_rq.wq = vzalloc_node( - sizeof(struct rvt_rwq) + - qp->r_rq.size * sz, - rdi->dparms.node); - if (!qp->r_rq.wq) + err = rvt_alloc_rq(&qp->r_rq, qp->r_rq.size * sz, + rdi->dparms.node, udata); + if (err) { + ret = ERR_PTR(err); goto 
bail_driver_priv; + } } /* @@ -1071,7 +1126,6 @@ struct ib_qp *rvt_create_qp(struct ib_pd *ibpd, spin_lock_init(&qp->r_lock); spin_lock_init(&qp->s_hlock); spin_lock_init(&qp->s_lock); - spin_lock_init(&qp->r_rq.lock); atomic_set(&qp->refcount, 0); atomic_set(&qp->local_ops_pending, 0); init_waitqueue_head(&qp->wait); @@ -1208,8 +1262,8 @@ struct ib_qp *rvt_create_qp(struct ib_pd *ibpd, rvt_free_qpn(&rdi->qp_dev->qpn_table, qp->ibqp.qp_num); bail_rq_wq: - if (!qp->ip) - vfree(qp->r_rq.wq); + vfree(qp->r_rq.wq); + kvfree(qp->r_rq.kwq); bail_driver_priv: rdi->driver_f.qp_priv_free(rdi, qp); @@ -1240,6 +1294,7 @@ int rvt_error_qp(struct rvt_qp *qp, enum ib_wc_status err) struct ib_wc wc; int ret = 0; struct rvt_dev_info *rdi = ib_to_rvt(qp->ibqp.device); + unsigned long flags; lockdep_assert_held(&qp->r_lock); lockdep_assert_held(&qp->s_lock); @@ -1275,30 +1330,40 @@ int rvt_error_qp(struct rvt_qp *qp, enum ib_wc_status err) } wc.status = IB_WC_WR_FLUSH_ERR; - if (qp->r_rq.wq) { - struct rvt_rwq *wq; + if (qp->r_rq.kwq) { u32 head; u32 tail; - - spin_lock(&qp->r_rq.lock); - + struct rvt_rwq *wq = NULL; + struct rvt_krwq *kwq = NULL; + + spin_lock_irqsave(&qp->r_rq.kwq->c_lock, flags); + if (qp->ip) { + wq = qp->r_rq.wq; + head = wq->head; + tail = wq->tail; + } else { + kwq = qp->r_rq.kwq; + head = kwq->head; + tail = kwq->tail; + } /* sanity check pointers before trusting them */ - wq = qp->r_rq.wq; - head = wq->head; if (head >= qp->r_rq.size) head = 0; - tail = wq->tail; if (tail >= qp->r_rq.size) tail = 0; while (tail != head) { - wc.wr_id = rvt_get_rwqe_ptr(&qp->r_rq, tail)->wr_id; + wc.wr_id = + rvt_get_rwqe_ptr(&qp->r_rq, tail)->wr_id; if (++tail >= qp->r_rq.size) tail = 0; - rvt_cq_enter(ibcq_to_rvtcq(qp->ibqp.recv_cq), &wc, 1); + rvt_cq_enter(ibcq_to_rvtcq(qp->ibqp.recv_cq), + &wc, 1); } - wq->tail = tail; - - spin_unlock(&qp->r_rq.lock); + if (qp->ip) + wq->tail = tail; + else + kwq->tail = tail; + spin_unlock_irqrestore(&qp->r_rq.kwq->c_lock, flags); } else 
if (qp->ibqp.event_handler) { ret = 1; } @@ -1640,9 +1705,8 @@ int rvt_destroy_qp(struct ib_qp *ibqp) if (qp->ip) kref_put(&qp->ip->ref, rvt_release_mmap_info); - else - vfree(qp->r_rq.wq); - vfree(qp->s_wq); + kvfree(qp->r_rq.kwq); + rdi->driver_f.qp_priv_free(rdi, qp); kfree(qp->s_ack_queue); rdma_destroy_ah_attr(&qp->remote_ah_attr); @@ -1727,8 +1791,9 @@ int rvt_post_recv(struct ib_qp *ibqp, const struct ib_recv_wr *wr, const struct ib_recv_wr **bad_wr) { struct rvt_qp *qp = ibqp_to_rvtqp(ibqp); - struct rvt_rwq *wq = qp->r_rq.wq; unsigned long flags; + struct rvt_krwq *wq = qp->r_rq.kwq; + int qp_err_flush = (ib_rvt_state_ops[qp->state] & RVT_FLUSH_RECV) && !qp->ibqp.srq; @@ -1748,12 +1813,12 @@ int rvt_post_recv(struct ib_qp *ibqp, const struct ib_recv_wr *wr, return -EINVAL; } - spin_lock_irqsave(&qp->r_rq.lock, flags); + spin_lock_irqsave(&qp->r_rq.kwq->p_lock, flags); next = wq->head + 1; if (next >= qp->r_rq.size) next = 0; - if (next == wq->tail) { - spin_unlock_irqrestore(&qp->r_rq.lock, flags); + if (next == READ_ONCE(wq->tail)) { + spin_unlock_irqrestore(&qp->r_rq.kwq->p_lock, flags); *bad_wr = wr; return -ENOMEM; } @@ -1776,10 +1841,10 @@ int rvt_post_recv(struct ib_qp *ibqp, const struct ib_recv_wr *wr, * Make sure queue entry is written * before the head index. 
*/ - smp_wmb(); + smp_store_release(&wq->head, next); wq->head = next; } - spin_unlock_irqrestore(&qp->r_rq.lock, flags); + spin_unlock_irqrestore(&qp->r_rq.kwq->p_lock, flags); } return 0; } @@ -2151,7 +2216,7 @@ int rvt_post_srq_recv(struct ib_srq *ibsrq, const struct ib_recv_wr *wr, const struct ib_recv_wr **bad_wr) { struct rvt_srq *srq = ibsrq_to_rvtsrq(ibsrq); - struct rvt_rwq *wq; + struct rvt_krwq *wq; unsigned long flags; for (; wr; wr = wr->next) { @@ -2164,13 +2229,13 @@ int rvt_post_srq_recv(struct ib_srq *ibsrq, const struct ib_recv_wr *wr, return -EINVAL; } - spin_lock_irqsave(&srq->rq.lock, flags); - wq = srq->rq.wq; + spin_lock_irqsave(&srq->rq.kwq->p_lock, flags); + wq = srq->rq.kwq; next = wq->head + 1; if (next >= srq->rq.size) next = 0; - if (next == wq->tail) { - spin_unlock_irqrestore(&srq->rq.lock, flags); + if (next == READ_ONCE(wq->tail)) { + spin_unlock_irqrestore(&srq->rq.kwq->p_lock, flags); *bad_wr = wr; return -ENOMEM; } @@ -2181,9 +2246,9 @@ int rvt_post_srq_recv(struct ib_srq *ibsrq, const struct ib_recv_wr *wr, for (i = 0; i < wr->num_sge; i++) wqe->sg_list[i] = wr->sg_list[i]; /* Make sure queue entry is written before the head index. 
*/ - smp_wmb(); + smp_store_release(&wq->head, next); wq->head = next; - spin_unlock_irqrestore(&srq->rq.lock, flags); + spin_unlock_irqrestore(&srq->rq.kwq->p_lock, flags); } return 0; } @@ -2239,6 +2304,50 @@ static int init_sge(struct rvt_qp *qp, struct rvt_rwqe *wqe) return 0; } +/* + * get_count - count numbers of request work queue entries + * in circular buffer + * @rq: data structure for request queue entry + * @tail: tail indices of the circular buffer + * @head: head indices of the circular buffer + * + * Return - total number of entries in the circular buffer + */ +static u32 get_count(struct rvt_rq *rq, u32 tail, u32 head) +{ + u32 count; + + count = head; + + if (count >= rq->size) + count = 0; + if (count < tail) + count += rq->size - tail; + else + count -= tail; + + return count; +} + +/* + * get_rvt_head - get head indices of the circular buffer + * @rq: data structure for request queue entry + * @ip: the QP + * + * Return - head index value + */ +static u32 get_rvt_head(struct rvt_rq *rq, void *ip) +{ + u32 head; + + if (ip) + head = READ_ONCE(rq->wq->head); + else + head = rq->kwq->head; + + return head; +} + /** * rvt_get_rwqe - copy the next RWQE into the QP's RWQE * @qp: the QP @@ -2253,39 +2362,54 @@ int rvt_get_rwqe(struct rvt_qp *qp, bool wr_id_only) { unsigned long flags; struct rvt_rq *rq; + struct rvt_krwq *kwq; struct rvt_rwq *wq; struct rvt_srq *srq; struct rvt_rwqe *wqe; void (*handler)(struct ib_event *, void *); u32 tail; + u32 head; int ret; + void *ip = NULL; if (qp->ibqp.srq) { srq = ibsrq_to_rvtsrq(qp->ibqp.srq); handler = srq->ibsrq.event_handler; rq = &srq->rq; + ip = srq->ip; } else { srq = NULL; handler = NULL; rq = &qp->r_rq; + ip = qp->ip; } - spin_lock_irqsave(&rq->lock, flags); + spin_lock_irqsave(&rq->kwq->c_lock, flags); if (!(ib_rvt_state_ops[qp->state] & RVT_PROCESS_RECV_OK)) { ret = 0; goto unlock; } + kwq = rq->kwq; + if (ip) { + wq = rq->wq; + tail = wq->tail; + } else { + tail = kwq->tail; + } - wq = rq->wq; - 
tail = wq->tail; /* Validate tail before using it since it is user writable. */ if (tail >= rq->size) tail = 0; - if (unlikely(tail == wq->head)) { + + if (kwq->count < RVT_RWQ_COUNT_THRESHOLD) { + head = get_rvt_head(rq, ip); + kwq->count = get_count(rq, tail, head); + } + if (unlikely(kwq->count == 0)) { ret = 0; goto unlock; } - /* Make sure entry is read after head index is read. */ + /* Make sure entry is read after the count is read. */ smp_rmb(); wqe = rvt_get_rwqe_ptr(rq, tail); /* @@ -2295,43 +2419,42 @@ int rvt_get_rwqe(struct rvt_qp *qp, bool wr_id_only) */ if (++tail >= rq->size) tail = 0; - wq->tail = tail; + if (ip) + wq->tail = tail; + else + kwq->tail = tail; if (!wr_id_only && !init_sge(qp, wqe)) { ret = -1; goto unlock; } qp->r_wr_id = wqe->wr_id; + kwq->count--; ret = 1; set_bit(RVT_R_WRID_VALID, &qp->r_aflags); if (handler) { - u32 n; - /* * Validate head pointer value and compute * the number of remaining WQEs. */ - n = wq->head; - if (n >= rq->size) - n = 0; - if (n < tail) - n += rq->size - tail; - else - n -= tail; - if (n < srq->limit) { - struct ib_event ev; - - srq->limit = 0; - spin_unlock_irqrestore(&rq->lock, flags); - ev.device = qp->ibqp.device; - ev.element.srq = qp->ibqp.srq; - ev.event = IB_EVENT_SRQ_LIMIT_REACHED; - handler(&ev, srq->ibsrq.srq_context); - goto bail; + if (kwq->count < srq->limit) { + kwq->count = get_count(rq, tail, get_rvt_head(rq, ip)); + if (kwq->count < srq->limit) { + struct ib_event ev; + + srq->limit = 0; + spin_unlock_irqrestore(&rq->kwq->c_lock, flags); + ev.device = qp->ibqp.device; + ev.element.srq = qp->ibqp.srq; + ev.event = IB_EVENT_SRQ_LIMIT_REACHED; + handler(&ev, srq->ibsrq.srq_context); + goto bail; + } } } + unlock: - spin_unlock_irqrestore(&rq->lock, flags); + spin_unlock_irqrestore(&rq->kwq->c_lock, flags); bail: return ret; } diff --git a/drivers/infiniband/sw/rdmavt/qp.h b/drivers/infiniband/sw/rdmavt/qp.h index 6d88397..01031d7 100644 --- a/drivers/infiniband/sw/rdmavt/qp.h +++ 
b/drivers/infiniband/sw/rdmavt/qp.h @@ -68,4 +68,6 @@ int rvt_post_srq_recv(struct ib_srq *ibsrq, const struct ib_recv_wr *wr, const struct ib_recv_wr **bad_wr); int rvt_wss_init(struct rvt_dev_info *rdi); void rvt_wss_exit(struct rvt_dev_info *rdi); +int rvt_alloc_rq(struct rvt_rq *rq, u32 size, int node, + struct ib_udata *udata); #endif /* DEF_RVTQP_H */ diff --git a/drivers/infiniband/sw/rdmavt/rc.c b/drivers/infiniband/sw/rdmavt/rc.c index 6131cc5..0ddddaf 100644 --- a/drivers/infiniband/sw/rdmavt/rc.c +++ b/drivers/infiniband/sw/rdmavt/rc.c @@ -105,25 +105,34 @@ __be32 rvt_compute_aeth(struct rvt_qp *qp) u32 min, max, x; u32 credits; struct rvt_rwq *wq = qp->r_rq.wq; + struct rvt_krwq *kwq = qp->r_rq.kwq; u32 head; u32 tail; - /* sanity check pointers before trusting them */ - head = wq->head; - if (head >= qp->r_rq.size) - head = 0; - tail = wq->tail; - if (tail >= qp->r_rq.size) - tail = 0; - /* - * Compute the number of credits available (RWQEs). - * There is a small chance that the pair of reads are - * not atomic, which is OK, since the fuzziness is - * resolved as further ACKs go out. - */ - credits = head - tail; - if ((int)credits < 0) - credits += qp->r_rq.size; + credits = READ_ONCE(kwq->count); + if (credits == 0) { + /* sanity check pointers before trusting them */ + if (qp->ip) { + head = READ_ONCE(wq->head); + tail = READ_ONCE(wq->tail); + } else { + head = READ_ONCE(kwq->head); + tail = READ_ONCE(kwq->tail); + } + if (head >= qp->r_rq.size) + head = 0; + if (tail >= qp->r_rq.size) + tail = 0; + /* + * Compute the number of credits available (RWQEs). + * There is a small chance that the pair of reads are + * not atomic, which is OK, since the fuzziness is + * resolved as further ACKs go out. + */ + credits = head - tail; + if ((int)credits < 0) + credits += qp->r_rq.size; + } /* * Binary search the credit table to find the code to * use. 
diff --git a/drivers/infiniband/sw/rdmavt/srq.c b/drivers/infiniband/sw/rdmavt/srq.c index 78e06fc..e654643 100644 --- a/drivers/infiniband/sw/rdmavt/srq.c +++ b/drivers/infiniband/sw/rdmavt/srq.c @@ -51,7 +51,7 @@ #include "srq.h" #include "vt.h" - +#include "qp.h" /** * rvt_driver_srq_init - init srq resources on a per driver basis * @rdi: rvt dev structure @@ -80,6 +80,7 @@ struct ib_srq *rvt_create_srq(struct ib_pd *ibpd, struct rvt_srq *srq; u32 sz; struct ib_srq *ret; + int err; if (srq_init_attr->srq_type != IB_SRQT_BASIC) return ERR_PTR(-EOPNOTSUPP); @@ -101,11 +102,9 @@ struct ib_srq *rvt_create_srq(struct ib_pd *ibpd, srq->rq.max_sge = srq_init_attr->attr.max_sge; sz = sizeof(struct ib_sge) * srq->rq.max_sge + sizeof(struct rvt_rwqe); - srq->rq.wq = udata ? - vmalloc_user(sizeof(struct rvt_rwq) + srq->rq.size * sz) : - vzalloc_node(sizeof(struct rvt_rwq) + srq->rq.size * sz, - dev->dparms.node); - if (!srq->rq.wq) { + err = rvt_alloc_rq(&srq->rq, (srq->rq.size * sz), + dev->dparms.node, udata); + if (err) { ret = ERR_PTR(-ENOMEM); goto bail_srq; } @@ -115,7 +114,6 @@ struct ib_srq *rvt_create_srq(struct ib_pd *ibpd, * See rvt_mmap() for details. */ if (udata && udata->outlen >= sizeof(__u64)) { - int err; u32 s = sizeof(struct rvt_rwq) + srq->rq.size * sz; srq->ip = @@ -137,7 +135,6 @@ struct ib_srq *rvt_create_srq(struct ib_pd *ibpd, /* * ib_create_srq() will initialize srq->ibsrq. 
*/ - spin_lock_init(&srq->rq.lock); srq->limit = srq_init_attr->attr.srq_limit; spin_lock(&dev->n_srqs_lock); @@ -162,6 +159,7 @@ struct ib_srq *rvt_create_srq(struct ib_pd *ibpd, kfree(srq->ip); bail_wq: vfree(srq->rq.wq); + kvfree(srq->rq.kwq); bail_srq: kfree(srq); return ret; @@ -182,30 +180,34 @@ int rvt_modify_srq(struct ib_srq *ibsrq, struct ib_srq_attr *attr, { struct rvt_srq *srq = ibsrq_to_rvtsrq(ibsrq); struct rvt_dev_info *dev = ib_to_rvt(ibsrq->device); - struct rvt_rwq *wq; int ret = 0; + struct rvt_rq tmp_rq; + struct rvt_krwq *kwq; + struct rvt_rwq *wq; if (attr_mask & IB_SRQ_MAX_WR) { - struct rvt_rwq *owq; + struct rvt_krwq *okwq; + struct rvt_rwq *owq = NULL; struct rvt_rwqe *p; u32 sz, size, n, head, tail; /* Check that the requested sizes are below the limits. */ if ((attr->max_wr > dev->dparms.props.max_srq_wr) || ((attr_mask & IB_SRQ_LIMIT) ? - attr->srq_limit : srq->limit) > attr->max_wr) + attr->srq_limit : srq->limit) > attr->max_wr) { return -EINVAL; + } sz = sizeof(struct rvt_rwqe) + srq->rq.max_sge * sizeof(struct ib_sge); size = attr->max_wr + 1; - wq = udata ? - vmalloc_user(sizeof(struct rvt_rwq) + size * sz) : - vzalloc_node(sizeof(struct rvt_rwq) + size * sz, - dev->dparms.node); - if (!wq) - return -ENOMEM; - + memset(&tmp_rq, 0, sizeof(tmp_rq)); + ret = rvt_alloc_rq(&tmp_rq, size * sz, dev->dparms.node, + udata); + if (ret) { + ret = -ENOMEM; + goto bail_free; + } /* Check that we can write the offset to mmap. */ if (udata && udata->inlen >= sizeof(__u64)) { __u64 offset_addr; @@ -223,14 +225,20 @@ int rvt_modify_srq(struct ib_srq *ibsrq, struct ib_srq_attr *attr, goto bail_free; } - spin_lock_irq(&srq->rq.lock); + spin_lock_irq(&srq->rq.kwq->c_lock); /* * validate head and tail pointer values and compute * the number of remaining WQEs. 
*/ - owq = srq->rq.wq; - head = owq->head; - tail = owq->tail; + okwq = srq->rq.kwq; + if (udata) { + owq = srq->rq.wq; + head = owq->head; + tail = owq->tail; + } else { + head = okwq->head; + tail = okwq->tail; + } if (head >= srq->rq.size || tail >= srq->rq.size) { ret = -EINVAL; goto bail_unlock; @@ -245,7 +253,14 @@ int rvt_modify_srq(struct ib_srq *ibsrq, struct ib_srq_attr *attr, goto bail_unlock; } n = 0; - p = wq->wq; + if (udata) { + wq = tmp_rq.wq; + p = wq->wq; + } else { + kwq = tmp_rq.kwq; + p = kwq->wq; + } + while (tail != head) { struct rvt_rwqe *wqe; int i; @@ -260,22 +275,31 @@ int rvt_modify_srq(struct ib_srq *ibsrq, struct ib_srq_attr *attr, if (++tail >= srq->rq.size) tail = 0; } - srq->rq.wq = wq; + + srq->rq.kwq = tmp_rq.kwq; + if (udata) { + srq->rq.wq = tmp_rq.wq; + tmp_rq.wq->head = n; + tmp_rq.wq->tail = 0; + } else { + tmp_rq.kwq->head = n; + tmp_rq.kwq->tail = 0; + } srq->rq.size = size; - wq->head = n; - wq->tail = 0; if (attr_mask & IB_SRQ_LIMIT) srq->limit = attr->srq_limit; - spin_unlock_irq(&srq->rq.lock); + spin_unlock_irq(&srq->rq.kwq->c_lock); - vfree(owq); + if (udata) + vfree(owq); + kvfree(okwq); if (srq->ip) { struct rvt_mmap_info *ip = srq->ip; struct rvt_dev_info *dev = ib_to_rvt(srq->ibsrq.device); u32 s = sizeof(struct rvt_rwq) + size * sz; - rvt_update_mmap_info(dev, ip, s, wq); + rvt_update_mmap_info(dev, ip, s, tmp_rq.wq); /* * Return the offset to mmap. 
@@ -299,19 +323,21 @@ int rvt_modify_srq(struct ib_srq *ibsrq, struct ib_srq_attr *attr, spin_unlock_irq(&dev->pending_lock); } } else if (attr_mask & IB_SRQ_LIMIT) { - spin_lock_irq(&srq->rq.lock); + spin_lock_irq(&srq->rq.kwq->c_lock); if (attr->srq_limit >= srq->rq.size) ret = -EINVAL; else srq->limit = attr->srq_limit; - spin_unlock_irq(&srq->rq.lock); + spin_unlock_irq(&srq->rq.kwq->c_lock); } return ret; bail_unlock: - spin_unlock_irq(&srq->rq.lock); + spin_unlock_irq(&srq->rq.kwq->c_lock); bail_free: - vfree(wq); + kvfree(tmp_rq.kwq); + if (udata) + vfree(tmp_rq.wq); return ret; } @@ -347,8 +373,7 @@ int rvt_destroy_srq(struct ib_srq *ibsrq) spin_unlock(&dev->n_srqs_lock); if (srq->ip) kref_put(&srq->ip->ref, rvt_release_mmap_info); - else - vfree(srq->rq.wq); + kvfree(srq->rq.kwq); kfree(srq); return 0; diff --git a/include/rdma/rdmavt_qp.h b/include/rdma/rdmavt_qp.h index 1ef17a6..c40dfff 100644 --- a/include/rdma/rdmavt_qp.h +++ b/include/rdma/rdmavt_qp.h @@ -178,12 +178,36 @@ struct rvt_swqe { struct rvt_sge sg_list[0]; }; +/** + * struct rvt_krwq - kernel struct receive work request + * @p_lock: lock to protect producer of the kernel buffer + * @head: index of next entry to fill + * @c_lock: lock to protect consumer of the kernel buffer + * @tail: index of next entry to pull + * @count: count of total receive entries posted. + * @wq: receive work request queue entries. + * + * This structure is used to contain the head pointer, + * tail pointer and receive work queue entries for kernel + * mode user. + */ +struct rvt_krwq { + spinlock_t p_lock; /* protect producer */ + u32 head; /* new work requests posted to the head */ + + /* protect consumer */ + spinlock_t c_lock ____cacheline_aligned_in_smp; + u32 tail; /* receives pull requests from here. 
*/ + u32 count; /* approx count of receive entries posted */ + struct rvt_rwqe *curr_wq; + struct rvt_rwqe wq[0]; +}; + struct rvt_rq { struct rvt_rwq *wq; + struct rvt_krwq *kwq; u32 size; /* size of RWQE array */ u8 max_sge; - /* protect changes in this struct */ - spinlock_t lock ____cacheline_aligned_in_smp; }; /* @@ -449,7 +473,7 @@ struct rvt_mcast { static inline struct rvt_rwqe *rvt_get_rwqe_ptr(struct rvt_rq *rq, unsigned n) { return (struct rvt_rwqe *) - ((char *)rq->wq->wq + + ((char *)rq->kwq->curr_wq + (sizeof(struct rvt_rwqe) + rq->max_sge * sizeof(struct ib_sge)) * n); }