From: Kaike Wan <kaike.wan@xxxxxxxxx>

This patch implements a wait mechanism for TID allocation. When there
are not enough TID resources for a request, the requesting qp will be
put on a waiting list and will be woken up later when the resources
become available.

Signed-off-by: Mitko Haralanov <mitko.haralanov@xxxxxxxxx>
Signed-off-by: Ashutosh Dixit <ashutosh.dixit@xxxxxxxxx>
Signed-off-by: Mike Marciniszyn <mike.marciniszyn@xxxxxxxxx>
Signed-off-by: Kaike Wan <kaike.wan@xxxxxxxxx>
Signed-off-by: Dennis Dalessandro <dennis.dalessandro@xxxxxxxxx>
---
 drivers/infiniband/hw/hfi1/qp.h       |    2 
 drivers/infiniband/hw/hfi1/tid_rdma.c |  277 +++++++++++++++++++++++++++++++++
 2 files changed, 279 insertions(+), 0 deletions(-)
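Notes:

For reviewers who want the queuing discipline in isolation, here is a
minimal standalone userspace sketch of the same pattern, assuming a
plain pthread mutex in place of exp_lock and a boolean in place of the
HFI1_S_WAIT_TID_SPACE flag. Every name in it (struct pool, struct
waiter, pool_alloc_or_queue(), pool_free(), must_wait()) is invented
for illustration; none of this is hfi1 or kernel code.

#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

struct waiter {
	struct waiter *next;
	int id;
	bool waiting;			/* stands in for HFI1_S_WAIT_TID_SPACE */
};

struct pool {
	pthread_mutex_t lock;		/* stands in for rcd->exp_lock */
	int free_resources;		/* stands in for free TID entries */
	struct waiter *head, *tail;	/* FIFO wait list */
};

/* kernel_tid_waiters() analogue: true means the caller must queue. */
static bool must_wait(struct pool *p, struct waiter *w)
{
	/* caller holds p->lock */
	return p->head && !(p->head == w && w->waiting);
}

/* Allocation analogue: take a resource or join the FIFO tail. */
static bool pool_alloc_or_queue(struct pool *p, struct waiter *w)
{
	bool ok = false;

	pthread_mutex_lock(&p->lock);
	if (!must_wait(p, w) && p->free_resources > 0) {
		p->free_resources--;
		if (p->head == w) {	/* dequeue_tid_waiter() analogue */
			p->head = w->next;
			if (!p->head)
				p->tail = NULL;
			w->waiting = false;
		}
		ok = true;
	} else if (!w->waiting) {	/* queue_qp_for_tid_wait() analogue */
		w->waiting = true;
		w->next = NULL;
		if (p->tail)
			p->tail->next = w;
		else
			p->head = w;
		p->tail = w;
	}
	pthread_mutex_unlock(&p->lock);
	return ok;
}

/* Release analogue: return a resource and report who to wake next. */
static struct waiter *pool_free(struct pool *p)
{
	struct waiter *next;

	pthread_mutex_lock(&p->lock);
	p->free_resources++;
	next = p->head;			/* get head before dropping lock */
	pthread_mutex_unlock(&p->lock);
	return next;			/* caller wakes this waiter */
}

int main(void)
{
	struct pool p = { PTHREAD_MUTEX_INITIALIZER, 1, NULL, NULL };
	struct waiter a = { .id = 1 }, b = { .id = 2 };
	struct waiter *next;

	pool_alloc_or_queue(&p, &a);	/* a takes the only resource */
	pool_alloc_or_queue(&p, &b);	/* none left: b joins the FIFO */
	next = pool_free(&p);		/* a is done; the head (b) is next */
	printf("waiter %d retry: %s\n", next->id,
	       pool_alloc_or_queue(&p, next) ? "allocated" : "still waiting");
	return 0;
}

The property the sketch mirrors is fairness: once any waiter is
queued, later allocators must queue as well, even when resources are
free, so the head of the FIFO is always served first.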
diff --git a/drivers/infiniband/hw/hfi1/qp.h b/drivers/infiniband/hw/hfi1/qp.h
index 078cff7..38030e9 100644
--- a/drivers/infiniband/hw/hfi1/qp.h
+++ b/drivers/infiniband/hw/hfi1/qp.h
@@ -75,11 +75,13 @@ static inline int hfi1_send_ok(struct rvt_qp *qp)
  * HFI1_S_AHG_VALID - ahg header valid on chip
  * HFI1_S_AHG_CLEAR - have send engine clear ahg state
  * HFI1_S_WAIT_PIO_DRAIN - qp waiting for PIOs to drain
+ * HFI1_S_WAIT_TID_SPACE - a QP is waiting for TID resource
  * HFI1_S_MIN_BIT_MASK - the lowest bit that can be used by hfi1
  */
 #define HFI1_S_AHG_VALID	0x80000000
 #define HFI1_S_AHG_CLEAR	0x40000000
 #define HFI1_S_WAIT_PIO_DRAIN	0x20000000
+#define HFI1_S_WAIT_TID_SPACE	0x10000000
 #define HFI1_S_MIN_BIT_MASK	0x01000000
 
 /*
diff --git a/drivers/infiniband/hw/hfi1/tid_rdma.c b/drivers/infiniband/hw/hfi1/tid_rdma.c
index 585c796..1ddcc01 100644
--- a/drivers/infiniband/hw/hfi1/tid_rdma.c
+++ b/drivers/infiniband/hw/hfi1/tid_rdma.c
@@ -319,6 +319,195 @@ void tid_rdma_conn_error(struct rvt_qp *qp)
 }
 
 /**
+ * DOC: lock ordering
+ *
+ * There are two locks involved with the queuing
+ * routines: the qp s_lock and the exp_lock.
+ *
+ * Since the tid space allocation is called from
+ * the send engine, the qp s_lock is already held.
+ *
+ * The allocation routines will get the exp_lock.
+ *
+ * The first_qp() call is provided to allow the head of
+ * the rcd wait queue to be fetched under the exp_lock and
+ * followed by a drop of the exp_lock.
+ *
+ * Any qp in the wait list will have the qp reference count held
+ * to hold the qp in memory.
+ */
+
+/*
+ * Return the head of the rcd wait list
+ *
+ * Must hold the exp_lock.
+ *
+ * Get a reference to the QP to hold the QP in memory.
+ *
+ * The caller must release the reference when the local
+ * pointer is no longer being used.
+ */
+static struct rvt_qp *first_qp(struct hfi1_ctxtdata *rcd,
+			       struct tid_queue *queue)
+	__must_hold(&rcd->exp_lock)
+{
+	struct hfi1_qp_priv *priv;
+
+	lockdep_assert_held(&rcd->exp_lock);
+	priv = list_first_entry_or_null(&queue->queue_head,
+					struct hfi1_qp_priv,
+					tid_wait);
+	if (!priv)
+		return NULL;
+	rvt_get_qp(priv->owner);
+	return priv->owner;
+}
+
+/**
+ * kernel_tid_waiters - determine rcd wait
+ * @rcd: the receive context
+ * @queue: the queue to check
+ * @qp: the head of the qp being processed
+ *
+ * This routine will return false IFF
+ * the list is empty or the head of the
+ * list is the indicated qp.
+ *
+ * Must hold the qp s_lock and the exp_lock.
+ *
+ * Return:
+ * false if either of the conditions below is satisfied:
+ * 1. The list is empty or
+ * 2. The indicated qp is at the head of the list and the
+ *    HFI1_S_WAIT_TID_SPACE bit is set in qp->s_flags.
+ * true is returned otherwise.
+ */
+static bool kernel_tid_waiters(struct hfi1_ctxtdata *rcd,
+			       struct tid_queue *queue, struct rvt_qp *qp)
+	__must_hold(&rcd->exp_lock) __must_hold(&qp->s_lock)
+{
+	struct rvt_qp *fqp;
+	bool ret = true;
+
+	lockdep_assert_held(&qp->s_lock);
+	lockdep_assert_held(&rcd->exp_lock);
+	fqp = first_qp(rcd, queue);
+	if (!fqp || (fqp == qp && (qp->s_flags & HFI1_S_WAIT_TID_SPACE)))
+		ret = false;
+	rvt_put_qp(fqp);
+	return ret;
+}
+
+/**
+ * dequeue_tid_waiter - dequeue the qp from the list
+ * @rcd: the receive context
+ * @queue: the queue to operate on
+ * @qp: the qp to remove from the wait list
+ *
+ * This routine removes the indicated qp from the
+ * wait list if it is there.
+ *
+ * This should be done after the hardware flow and
+ * tid array resources have been allocated.
+ *
+ * Must hold the qp s_lock and the rcd exp_lock.
+ *
+ * The s_lock is assumed to protect the s_flags
+ * field so that the HFI1_S_WAIT_TID_SPACE flag can
+ * be tested reliably.
+ */
+static void dequeue_tid_waiter(struct hfi1_ctxtdata *rcd,
+			       struct tid_queue *queue, struct rvt_qp *qp)
+	__must_hold(&rcd->exp_lock) __must_hold(&qp->s_lock)
+{
+	struct hfi1_qp_priv *priv = qp->priv;
+
+	lockdep_assert_held(&qp->s_lock);
+	lockdep_assert_held(&rcd->exp_lock);
+	if (list_empty(&priv->tid_wait))
+		return;
+	list_del_init(&priv->tid_wait);
+	qp->s_flags &= ~HFI1_S_WAIT_TID_SPACE;
+	queue->dequeue++;
+	rvt_put_qp(qp);
+}
+
+/**
+ * queue_qp_for_tid_wait - suspend QP on tid space
+ * @rcd: the receive context
+ * @queue: the queue to join
+ * @qp: the qp
+ *
+ * The qp is inserted at the tail of the rcd
+ * wait queue and the HFI1_S_WAIT_TID_SPACE s_flag is set.
+ *
+ * Must hold the qp s_lock and the exp_lock.
+ */
+static void queue_qp_for_tid_wait(struct hfi1_ctxtdata *rcd,
+				  struct tid_queue *queue, struct rvt_qp *qp)
+	__must_hold(&rcd->exp_lock) __must_hold(&qp->s_lock)
+{
+	struct hfi1_qp_priv *priv = qp->priv;
+
+	lockdep_assert_held(&qp->s_lock);
+	lockdep_assert_held(&rcd->exp_lock);
+	if (list_empty(&priv->tid_wait)) {
+		qp->s_flags |= HFI1_S_WAIT_TID_SPACE;
+		list_add_tail(&priv->tid_wait, &queue->queue_head);
+		priv->tid_enqueue = ++queue->enqueue;
+		rvt_get_qp(qp);
+	}
+}
+
+/**
+ * __trigger_tid_waiter - trigger tid waiter
+ * @qp: the qp
+ *
+ * This is a private entry point to schedule the qp
+ * assuming the caller is holding the qp->s_lock.
+ */
+static void __trigger_tid_waiter(struct rvt_qp *qp)
+	__must_hold(&qp->s_lock)
+{
+	lockdep_assert_held(&qp->s_lock);
+	if (!(qp->s_flags & HFI1_S_WAIT_TID_SPACE))
+		return;
+	hfi1_schedule_send(qp);
+}
+
+/**
+ * tid_rdma_schedule_tid_wakeup - schedule wakeup for a qp
+ * @qp: the qp
+ *
+ * Trigger a schedule for a waiting qp in a deadlock-safe
+ * manner. The qp reference is held prior
+ * to this call via first_qp().
+ *
+ * If the qp trigger was already scheduled (!rval)
+ * then the reference is dropped, otherwise the resume
+ * or the destroy cancel will dispatch the reference.
+ */
+static void tid_rdma_schedule_tid_wakeup(struct rvt_qp *qp)
+{
+	struct hfi1_qp_priv *priv;
+	struct hfi1_ibport *ibp;
+	struct hfi1_pportdata *ppd;
+	struct hfi1_devdata *dd;
+	bool rval;
+
+	if (!qp)
+		return;
+
+	priv = qp->priv;
+	ibp = to_iport(qp->ibqp.device, qp->port_num);
+	ppd = ppd_from_ibp(ibp);
+	dd = dd_from_ibdev(qp->ibqp.device);
+
+	rval = queue_work_on(priv->s_sde ?
+			     priv->s_sde->cpu :
+			     cpumask_first(cpumask_of_node(dd->node)),
+			     ppd->hfi1_wq,
+			     &priv->tid_rdma.trigger_work);
+	if (!rval)
+		rvt_put_qp(qp);
+}
+
+/**
  * tid_rdma_trigger_resume - field a trigger work request
  * @work - the work item
  *
@@ -327,6 +516,57 @@ void tid_rdma_conn_error(struct rvt_qp *qp)
  */
 static void tid_rdma_trigger_resume(struct work_struct *work)
 {
+	struct tid_rdma_qp_params *tr;
+	struct hfi1_qp_priv *priv;
+	struct rvt_qp *qp;
+
+	tr = container_of(work, struct tid_rdma_qp_params, trigger_work);
+	priv = container_of(tr, struct hfi1_qp_priv, tid_rdma);
+	qp = priv->owner;
+	spin_lock_irq(&qp->s_lock);
+	if (qp->s_flags & HFI1_S_WAIT_TID_SPACE) {
+		spin_unlock_irq(&qp->s_lock);
+		hfi1_do_send(priv->owner, true);
+	} else {
+		spin_unlock_irq(&qp->s_lock);
+	}
+	rvt_put_qp(qp);
+}
+
+/**
+ * _tid_rdma_flush_wait - unwind any tid space wait
+ * @qp: the qp
+ * @queue: the queue to unwind
+ *
+ * This is called when resetting a qp to
+ * allow a destroy or reset to get rid
+ * of any tid space linkage and reference counts.
+ */
+static void _tid_rdma_flush_wait(struct rvt_qp *qp, struct tid_queue *queue)
+	__must_hold(&qp->s_lock)
+{
+	struct hfi1_qp_priv *priv;
+
+	if (!qp)
+		return;
+	lockdep_assert_held(&qp->s_lock);
+	priv = qp->priv;
+	qp->s_flags &= ~HFI1_S_WAIT_TID_SPACE;
+	spin_lock(&priv->rcd->exp_lock);
+	if (!list_empty(&priv->tid_wait)) {
+		list_del_init(&priv->tid_wait);
+		qp->s_flags &= ~HFI1_S_WAIT_TID_SPACE;
+		queue->dequeue++;
+		rvt_put_qp(qp);
+	}
+	spin_unlock(&priv->rcd->exp_lock);
+}
+
+void tid_rdma_flush_wait(struct rvt_qp *qp)
+	__must_hold(&qp->s_lock)
+{
+	struct hfi1_qp_priv *priv = qp->priv;
+
+	_tid_rdma_flush_wait(qp, &priv->rcd->flow_queue);
+	_tid_rdma_flush_wait(qp, &priv->rcd->rarr_queue);
 }
 
 void hfi1_compute_tid_rdma_flow_wt(void)
@@ -429,6 +669,7 @@ static int hfi1_kern_setup_hw_flow(struct hfi1_ctxtdata *rcd,
 {
 	struct hfi1_qp_priv *qpriv = (struct hfi1_qp_priv *)qp->priv;
 	struct tid_flow_state *fs = &qpriv->flow_state;
+	struct rvt_qp *fqp;
 	unsigned long flags;
 	int ret = 0;
 
@@ -437,6 +678,8 @@ static int hfi1_kern_setup_hw_flow(struct hfi1_ctxtdata *rcd,
 		return ret;
 
 	spin_lock_irqsave(&rcd->exp_lock, flags);
+	if (kernel_tid_waiters(rcd, &rcd->flow_queue, qp))
+		goto queue;
 
 	ret = kern_reserve_flow(rcd, fs->last_index);
 	if (ret < 0)
@@ -450,10 +693,15 @@ static int hfi1_kern_setup_hw_flow(struct hfi1_ctxtdata *rcd,
 	fs->generation = kern_setup_hw_flow(rcd, fs->index);
 	fs->psn = 0;
 	fs->flags = 0;
+	dequeue_tid_waiter(rcd, &rcd->flow_queue, qp);
+	/* get head before dropping lock */
+	fqp = first_qp(rcd, &rcd->flow_queue);
 	spin_unlock_irqrestore(&rcd->exp_lock, flags);
+	tid_rdma_schedule_tid_wakeup(fqp);
 
 	return 0;
 queue:
+	queue_qp_for_tid_wait(rcd, &rcd->flow_queue, qp);
 	spin_unlock_irqrestore(&rcd->exp_lock, flags);
 	return -EAGAIN;
 }
@@ -462,6 +710,7 @@ void hfi1_kern_clear_hw_flow(struct hfi1_ctxtdata *rcd, struct rvt_qp *qp)
 {
 	struct hfi1_qp_priv *qpriv = (struct hfi1_qp_priv *)qp->priv;
 	struct tid_flow_state *fs = &qpriv->flow_state;
+	struct rvt_qp *fqp;
 	unsigned long flags;
 
 	if (fs->index >= RXE_NUM_TID_FLOWS)
@@ -473,7 +722,16 @@ void hfi1_kern_clear_hw_flow(struct hfi1_ctxtdata *rcd, struct rvt_qp *qp)
 	fs->psn = 0;
 	fs->generation = KERN_GENERATION_RESERVED;
 
+	/* get head before dropping lock */
+	fqp = first_qp(rcd, &rcd->flow_queue);
 	spin_unlock_irqrestore(&rcd->exp_lock, flags);
+
+	if (fqp == qp) {
+		__trigger_tid_waiter(fqp);
+		rvt_put_qp(fqp);
+	} else {
+		tid_rdma_schedule_tid_wakeup(fqp);
+	}
 }
 
 void hfi1_kern_init_ctxt_generations(struct hfi1_ctxtdata *rcd)
@@ -1076,6 +1334,7 @@ static int hfi1_kern_exp_rcv_setup(struct tid_rdma_request *req,
 	struct hfi1_ctxtdata *rcd = req->rcd;
 	struct hfi1_qp_priv *qpriv = req->qp->priv;
 	unsigned long flags;
+	struct rvt_qp *fqp;
 	u16 clear_tail = req->clear_tail;
 
 	lockdep_assert_held(&req->qp->s_lock);
@@ -1102,6 +1361,9 @@ static int hfi1_kern_exp_rcv_setup(struct tid_rdma_request *req,
 	}
 
 	spin_lock_irqsave(&rcd->exp_lock, flags);
+	if (kernel_tid_waiters(rcd, &rcd->rarr_queue, flow->req->qp))
+		goto queue;
+
 	/*
 	 * At this point we know the number of pagesets and hence the number of
 	 * TID's to map the segment. Allocate the TID's from the TID groups. If
@@ -1132,11 +1394,16 @@ static int hfi1_kern_exp_rcv_setup(struct tid_rdma_request *req,
 		full_flow_psn(flow, flow->flow_state.spsn);
 	qpriv->flow_state.psn += flow->npkts;
 
+	dequeue_tid_waiter(rcd, &rcd->rarr_queue, flow->req->qp);
+	/* get head before dropping lock */
+	fqp = first_qp(rcd, &rcd->rarr_queue);
 	spin_unlock_irqrestore(&rcd->exp_lock, flags);
+	tid_rdma_schedule_tid_wakeup(fqp);
 	req->setup_head = (req->setup_head + 1) & (req->n_max_flows - 1);
 	return 0;
 queue:
+	queue_qp_for_tid_wait(rcd, &rcd->rarr_queue, flow->req->qp);
 	spin_unlock_irqrestore(&rcd->exp_lock, flags);
 	return -EAGAIN;
 }
@@ -1159,6 +1426,7 @@ int hfi1_kern_exp_rcv_clear(struct tid_rdma_request *req)
 	struct hfi1_ctxtdata *rcd = req->rcd;
 	unsigned long flags;
 	int i;
+	struct rvt_qp *fqp;
 
 	lockdep_assert_held(&req->qp->s_lock);
 	/* Exit if we have nothing in the flow circular buffer */
@@ -1171,6 +1439,8 @@ int hfi1_kern_exp_rcv_clear(struct tid_rdma_request *req)
 		kern_unprogram_rcv_group(flow, i);
 	/* To prevent double unprogramming */
 	flow->tnode_cnt = 0;
+	/* get head before dropping lock */
+	fqp = first_qp(rcd, &rcd->rarr_queue);
 	spin_unlock_irqrestore(&rcd->exp_lock, flags);
 
 	dma_unmap_flow(flow);
@@ -1178,6 +1448,13 @@ int hfi1_kern_exp_rcv_clear(struct tid_rdma_request *req)
 	hfi1_tid_rdma_reset_flow(flow);
 	req->clear_tail = (req->clear_tail + 1) & (req->n_max_flows - 1);
 
+	if (fqp == req->qp) {
+		__trigger_tid_waiter(fqp);
+		rvt_put_qp(fqp);
+	} else {
+		tid_rdma_schedule_tid_wakeup(fqp);
+	}
+
 	return 0;
 }
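A closing note on the wakeup path: queue_work_on() returns false when
the work item is already pending, in which case no new invocation of
trigger_work will run to consume the reference taken by first_qp(), so
tid_rdma_schedule_tid_wakeup() must drop it on the spot. The sketch
below shows just that reference hand-off in standalone userspace C11;
struct qp_stub, schedule_trigger(), and the other names are invented
stand-ins, not driver or kernel APIs.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

struct qp_stub {
	atomic_int refcount;		/* stands in for the rvt qp refcount */
	atomic_bool work_pending;	/* stands in for a queued work item */
};

static void get_ref(struct qp_stub *qp)
{
	atomic_fetch_add(&qp->refcount, 1);
}

static void put_ref(struct qp_stub *qp)
{
	if (atomic_fetch_sub(&qp->refcount, 1) == 1)
		free(qp);		/* last reference frees the object */
}

/* Mimics queue_work_on(): succeeds only if not already pending. */
static bool schedule_trigger(struct qp_stub *qp)
{
	bool expected = false;

	return atomic_compare_exchange_strong(&qp->work_pending,
					      &expected, true);
}

/* Mimics tid_rdma_schedule_tid_wakeup(): caller already holds a ref. */
static void schedule_tid_wakeup(struct qp_stub *qp)
{
	if (!qp)
		return;
	if (!schedule_trigger(qp))
		put_ref(qp);	/* work already pending: drop the extra ref */
	/* otherwise the deferred work inherits the reference */
}

/* Mimics tid_rdma_trigger_resume(): runs later, consumes the ref. */
static void trigger_resume(struct qp_stub *qp)
{
	atomic_store(&qp->work_pending, false);
	/* ... resume the send engine here ... */
	put_ref(qp);
}

int main(void)
{
	struct qp_stub *qp = calloc(1, sizeof(*qp));

	atomic_init(&qp->refcount, 1);	/* creation reference */
	atomic_init(&qp->work_pending, false);
	get_ref(qp);			/* first_qp() analogue */
	schedule_tid_wakeup(qp);	/* hands the ref to the work */
	get_ref(qp);			/* a second wakeup attempt... */
	schedule_tid_wakeup(qp);	/* ...finds work pending, drops ref */
	trigger_resume(qp);		/* deferred work runs, drops its ref */
	printf("refcount now %d\n", atomic_load(&qp->refcount));  /* 1 */
	put_ref(qp);			/* drop creation ref; frees qp */
	return 0;
}

The same rule shows up throughout the patch: every path that fetches a
head with first_qp() either hands the reference to the work item or
releases it itself, so a qp can neither leak nor be freed while a
wakeup is in flight.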