[PATCH for-next 13/24] IB/hfi1: Add wait mechanism for TID allocation

From: Kaike Wan <kaike.wan@xxxxxxxxx>

This patch implements a wait mechanism for TID allocation. When there are
not enough TID resources for a request, the requesting qp will be put
on a waiting list and will be woken up later when the resources become
available.

Signed-off-by: Mitko Haralanov <mitko.haralanov@xxxxxxxxx>
Signed-off-by: Ashutosh Dixit <ashutosh.dixit@xxxxxxxxx>
Signed-off-by: Mike Marciniszyn <mike.marciniszyn@xxxxxxxxx>
Signed-off-by: Kaike Wan <kaike.wan@xxxxxxxxx>
Signed-off-by: Dennis Dalessandro <dennis.dalessandro@xxxxxxxxx>
---
 drivers/infiniband/hw/hfi1/qp.h       |    2 
 drivers/infiniband/hw/hfi1/tid_rdma.c |  277 +++++++++++++++++++++++++++++++++
 2 files changed, 279 insertions(+), 0 deletions(-)
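
[Reviewer's note, not part of the commit.] For readers who want the shape of the mechanism before diving into the diff, below is a minimal userspace sketch of the queue-on-shortage / wake-the-head idea this patch implements. Every identifier in it (tid_pool, tid_waiter, alloc_tids, ...) is invented for illustration and does not exist in hfi1; the real code works on the rcd flow_queue/rarr_queue, the HFI1_S_WAIT_TID_SPACE flag in qp->s_flags, and rvt_get_qp()/rvt_put_qp() reference counting, all visible in the diff below. A second sketch after the diff illustrates the deferred-wakeup and reference discipline described in the DOC: lock ordering comment.

/*
 * Illustrative sketch only -- NOT hfi1 code; all identifiers are
 * invented.  It mirrors the pattern in this patch: a requester that
 * cannot be served is appended to a FIFO, the allocator serves no one
 * but the head of that FIFO, and every release wakes the new head so
 * waiters make progress strictly in arrival order.
 */
#include <stdio.h>

struct tid_waiter {
	int id;
	int queued;
	struct tid_waiter *next;
};

struct tid_pool {
	int free;			/* TID-like resources left    */
	struct tid_waiter *head, *tail;	/* FIFO of blocked requesters */
};

static void queue_waiter(struct tid_pool *p, struct tid_waiter *w)
{
	if (w->queued)
		return;
	w->queued = 1;
	w->next = NULL;
	if (p->tail)
		p->tail->next = w;
	else
		p->head = w;
	p->tail = w;
}

static void dequeue_head(struct tid_pool *p)
{
	struct tid_waiter *w = p->head;

	p->head = w->next;
	if (!p->head)
		p->tail = NULL;
	w->queued = 0;
	w->next = NULL;
}

/* Try to take @n resources for @w; on failure @w is left queued. */
static int alloc_tids(struct tid_pool *p, struct tid_waiter *w, int n)
{
	if (p->head && p->head != w) {	/* someone older is waiting  */
		queue_waiter(p, w);
		return -1;
	}
	if (p->free < n) {		/* not enough resources left */
		queue_waiter(p, w);
		return -1;
	}
	if (p->head == w)		/* we were the queued head   */
		dequeue_head(p);
	p->free -= n;
	return 0;
}

/* Return @n resources and wake whoever is now at the head. */
static void free_tids(struct tid_pool *p, int n)
{
	p->free += n;
	if (p->head)
		printf("wake requester %d\n", p->head->id);
}

int main(void)
{
	struct tid_pool pool = { .free = 4 };
	struct tid_waiter a = { .id = 1 }, b = { .id = 2 };

	alloc_tids(&pool, &a, 3);	/* succeeds, 1 resource left  */
	if (alloc_tids(&pool, &b, 2))	/* fails, b queues and waits  */
		printf("requester 2 must wait\n");
	free_tids(&pool, 3);		/* prints: wake requester 2   */
	if (!alloc_tids(&pool, &b, 2))	/* b retries and now succeeds */
		printf("requester 2 allocated\n");
	return 0;
}

In the patch, the -1 returns correspond to -EAGAIN from hfi1_kern_setup_hw_flow()/hfi1_kern_exp_rcv_setup() after queue_qp_for_tid_wait(), and the printf stands in for scheduling the waiting qp's send engine.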

diff --git a/drivers/infiniband/hw/hfi1/qp.h b/drivers/infiniband/hw/hfi1/qp.h
index 078cff7..38030e9 100644
--- a/drivers/infiniband/hw/hfi1/qp.h
+++ b/drivers/infiniband/hw/hfi1/qp.h
@@ -75,11 +75,13 @@ static inline int hfi1_send_ok(struct rvt_qp *qp)
  * HFI1_S_AHG_VALID - ahg header valid on chip
  * HFI1_S_AHG_CLEAR - have send engine clear ahg state
  * HFI1_S_WAIT_PIO_DRAIN - qp waiting for PIOs to drain
+ * HFI1_S_WAIT_TID_SPACE - a qp is waiting for TID resources
  * HFI1_S_MIN_BIT_MASK - the lowest bit that can be used by hfi1
  */
 #define HFI1_S_AHG_VALID         0x80000000
 #define HFI1_S_AHG_CLEAR         0x40000000
 #define HFI1_S_WAIT_PIO_DRAIN    0x20000000
+#define HFI1_S_WAIT_TID_SPACE    0x10000000
 #define HFI1_S_MIN_BIT_MASK      0x01000000
 
 /*
diff --git a/drivers/infiniband/hw/hfi1/tid_rdma.c b/drivers/infiniband/hw/hfi1/tid_rdma.c
index 585c796..1ddcc01 100644
--- a/drivers/infiniband/hw/hfi1/tid_rdma.c
+++ b/drivers/infiniband/hw/hfi1/tid_rdma.c
@@ -319,6 +319,195 @@ void tid_rdma_conn_error(struct rvt_qp *qp)
 }
 
 /**
+ * DOC: lock ordering
+ *
+ * There are two locks involved with the queuing
+ * routines: the qp s_lock and the exp_lock.
+ *
+ * Since the tid space allocation is called from
+ * the send engine, the qp s_lock is already held.
+ *
+ * The allocation routines will get the exp_lock.
+ *
+ * The first_qp() call is provided to allow the head of
+ * the rcd wait queue to be fetched under the exp_lock
+ * and then used after the exp_lock has been dropped.
+ *
+ * Any qp on the wait list has a reference held on it
+ * to keep the qp in memory.
+ */
+
+/*
+ * return head of rcd wait list
+ *
+ * Must hold the exp_lock.
+ *
+ * Get a reference to the QP to hold the QP in memory.
+ *
+ * The caller must release the reference when the
+ * returned qp is no longer being used.
+ */
+static struct rvt_qp *first_qp(struct hfi1_ctxtdata *rcd,
+			       struct tid_queue *queue)
+	__must_hold(&rcd->exp_lock)
+{
+	struct hfi1_qp_priv *priv;
+
+	lockdep_assert_held(&rcd->exp_lock);
+	priv = list_first_entry_or_null(&queue->queue_head,
+					struct hfi1_qp_priv,
+					tid_wait);
+	if (!priv)
+		return NULL;
+	rvt_get_qp(priv->owner);
+	return priv->owner;
+}
+
+/**
+ * kernel_tid_waiters - determine rcd wait
+ * @rcd: the receive context
+ * @queue: the queue to operate on
+ * @qp: the qp being processed
+ *
+ * This routine returns false IFF the wait list is
+ * empty or the indicated qp is at the head of the
+ * list with HFI1_S_WAIT_TID_SPACE set.
+ *
+ * Must hold the qp s_lock and the exp_lock.
+ *
+ * Return:
+ * false if either of the following is true:
+ * 1. The list is empty, or
+ * 2. The indicated qp is at the head of the list and the
+ *    HFI1_S_WAIT_TID_SPACE bit is set in qp->s_flags.
+ * Otherwise, true is returned.
+ */
+static bool kernel_tid_waiters(struct hfi1_ctxtdata *rcd,
+			       struct tid_queue *queue, struct rvt_qp *qp)
+	__must_hold(&rcd->exp_lock) __must_hold(&qp->s_lock)
+{
+	struct rvt_qp *fqp;
+	bool ret = true;
+
+	lockdep_assert_held(&qp->s_lock);
+	lockdep_assert_held(&rcd->exp_lock);
+	fqp = first_qp(rcd, queue);
+	if (!fqp || (fqp == qp && (qp->s_flags & HFI1_S_WAIT_TID_SPACE)))
+		ret = false;
+	rvt_put_qp(fqp);
+	return ret;
+}
+
+/**
+ * dequeue_tid_waiter - dequeue the qp from the list
+ * @rcd: the receive context
+ * @queue: the wait queue to remove the qp from
+ * @qp: the qp to remove from the wait list
+ *
+ * This routine removes the indicated qp from the
+ * wait list if it is there.
+ *
+ * This should be done after the hardware flow and
+ * tid array resources have been allocated.
+ *
+ * Must hold the qp s_lock and the rcd exp_lock.
+ *
+ * The s_lock is assumed to protect the s_flags field so
+ * that the HFI1_S_WAIT_TID_SPACE flag can be tested reliably.
+ */
+static void dequeue_tid_waiter(struct hfi1_ctxtdata *rcd,
+			       struct tid_queue *queue, struct rvt_qp *qp)
+	__must_hold(&rcd->exp_lock) __must_hold(&qp->s_lock)
+{
+	struct hfi1_qp_priv *priv = qp->priv;
+
+	lockdep_assert_held(&qp->s_lock);
+	lockdep_assert_held(&rcd->exp_lock);
+	if (list_empty(&priv->tid_wait))
+		return;
+	list_del_init(&priv->tid_wait);
+	qp->s_flags &= ~HFI1_S_WAIT_TID_SPACE;
+	queue->dequeue++;
+	rvt_put_qp(qp);
+}
+
+/**
+ * queue_qp_for_tid_wait - suspend QP on tid space
+ * @rcd: the receive context
+ * @queue: the wait queue to add the qp to
+ * @qp: the qp
+ *
+ * The qp is inserted at the tail of the rcd
+ * wait queue and the HFI1_S_WAIT_TID_SPACE s_flag is set.
+ *
+ * Must hold the qp s_lock and the exp_lock.
+ */
+static void queue_qp_for_tid_wait(struct hfi1_ctxtdata *rcd,
+				  struct tid_queue *queue, struct rvt_qp *qp)
+	__must_hold(&rcd->exp_lock) __must_hold(&qp->s_lock)
+{
+	struct hfi1_qp_priv *priv = qp->priv;
+
+	lockdep_assert_held(&qp->s_lock);
+	lockdep_assert_held(&rcd->exp_lock);
+	if (list_empty(&priv->tid_wait)) {
+		qp->s_flags |= HFI1_S_WAIT_TID_SPACE;
+		list_add_tail(&priv->tid_wait, &queue->queue_head);
+		priv->tid_enqueue = ++queue->enqueue;
+		rvt_get_qp(qp);
+	}
+}
+
+/**
+ * __trigger_tid_waiter - trigger tid waiter
+ * @qp: the qp
+ *
+ * This is a private entry point to schedule the qp,
+ * assuming the caller is holding the qp->s_lock.
+ */
+static void __trigger_tid_waiter(struct rvt_qp *qp)
+	__must_hold(&qp->s_lock)
+{
+	lockdep_assert_held(&qp->s_lock);
+	if (!(qp->s_flags & HFI1_S_WAIT_TID_SPACE))
+		return;
+	hfi1_schedule_send(qp);
+}
+
+/**
+ * tid_rdma_schedule_tid_wakeup - schedule wakeup for a qp
+ * @qp - the qp
+ *
+ * Trigger a schedule for a waiting qp in a deadlock-
+ * safe manner.  The qp reference is held prior
+ * to this call via first_qp().
+ *
+ * If the qp trigger work was already scheduled (!rval),
+ * the reference is dropped; otherwise the resume handler
+ * or the destroy-time cancel will release the reference.
+ */
+static void tid_rdma_schedule_tid_wakeup(struct rvt_qp *qp)
+{
+	struct hfi1_qp_priv *priv;
+	struct hfi1_ibport *ibp;
+	struct hfi1_pportdata *ppd;
+	struct hfi1_devdata *dd;
+	bool rval;
+
+	if (!qp)
+		return;
+
+	priv = qp->priv;
+	ibp = to_iport(qp->ibqp.device, qp->port_num);
+	ppd = ppd_from_ibp(ibp);
+	dd = dd_from_ibdev(qp->ibqp.device);
+
+	rval = queue_work_on(priv->s_sde ?
+			     priv->s_sde->cpu :
+			     cpumask_first(cpumask_of_node(dd->node)),
+			     ppd->hfi1_wq,
+			     &priv->tid_rdma.trigger_work);
+	if (!rval)
+		rvt_put_qp(qp);
+}
+
+/**
  * tid_rdma_trigger_resume - field a trigger work request
  * @work - the work item
  *
@@ -327,6 +516,57 @@ void tid_rdma_conn_error(struct rvt_qp *qp)
  */
 static void tid_rdma_trigger_resume(struct work_struct *work)
 {
+	struct tid_rdma_qp_params *tr;
+	struct hfi1_qp_priv *priv;
+	struct rvt_qp *qp;
+
+	tr = container_of(work, struct tid_rdma_qp_params, trigger_work);
+	priv = container_of(tr, struct hfi1_qp_priv, tid_rdma);
+	qp = priv->owner;
+	spin_lock_irq(&qp->s_lock);
+	if (qp->s_flags & HFI1_S_WAIT_TID_SPACE) {
+		spin_unlock_irq(&qp->s_lock);
+		hfi1_do_send(priv->owner, true);
+	} else {
+		spin_unlock_irq(&qp->s_lock);
+	}
+	rvt_put_qp(qp);
+}
+
+/**
+ * tid_rdma_flush_wait - unwind any tid space wait
+ * @qp: the qp
+ *
+ * This is called when a qp is reset so that a destroy
+ * or reset can release any tid space linkage and the
+ * associated reference counts.
+ */
+static void _tid_rdma_flush_wait(struct rvt_qp *qp, struct tid_queue *queue)
+	__must_hold(&qp->s_lock)
+{
+	struct hfi1_qp_priv *priv;
+
+	if (!qp)
+		return;
+	lockdep_assert_held(&qp->s_lock);
+	priv = qp->priv;
+	qp->s_flags &= ~HFI1_S_WAIT_TID_SPACE;
+	spin_lock(&priv->rcd->exp_lock);
+	if (!list_empty(&priv->tid_wait)) {
+		list_del_init(&priv->tid_wait);
+		qp->s_flags &= ~HFI1_S_WAIT_TID_SPACE;
+		queue->dequeue++;
+		rvt_put_qp(qp);
+	}
+	spin_unlock(&priv->rcd->exp_lock);
+}
+
+void tid_rdma_flush_wait(struct rvt_qp *qp)
+	__must_hold(&qp->s_lock)
+{
+	struct hfi1_qp_priv *priv = qp->priv;
+
+	_tid_rdma_flush_wait(qp, &priv->rcd->flow_queue);
+	_tid_rdma_flush_wait(qp, &priv->rcd->rarr_queue);
 }
 
 void hfi1_compute_tid_rdma_flow_wt(void)
@@ -429,6 +669,7 @@ static int hfi1_kern_setup_hw_flow(struct hfi1_ctxtdata *rcd,
 {
 	struct hfi1_qp_priv *qpriv = (struct hfi1_qp_priv *)qp->priv;
 	struct tid_flow_state *fs = &qpriv->flow_state;
+	struct rvt_qp *fqp;
 	unsigned long flags;
 	int ret = 0;
 
@@ -437,6 +678,8 @@ static int hfi1_kern_setup_hw_flow(struct hfi1_ctxtdata *rcd,
 		return ret;
 
 	spin_lock_irqsave(&rcd->exp_lock, flags);
+	if (kernel_tid_waiters(rcd, &rcd->flow_queue, qp))
+		goto queue;
 
 	ret = kern_reserve_flow(rcd, fs->last_index);
 	if (ret < 0)
@@ -450,10 +693,15 @@ static int hfi1_kern_setup_hw_flow(struct hfi1_ctxtdata *rcd,
 	fs->generation = kern_setup_hw_flow(rcd, fs->index);
 	fs->psn = 0;
 	fs->flags = 0;
+	dequeue_tid_waiter(rcd, &rcd->flow_queue, qp);
+	/* get head before dropping lock */
+	fqp = first_qp(rcd, &rcd->flow_queue);
 	spin_unlock_irqrestore(&rcd->exp_lock, flags);
 
+	tid_rdma_schedule_tid_wakeup(fqp);
 	return 0;
 queue:
+	queue_qp_for_tid_wait(rcd, &rcd->flow_queue, qp);
 	spin_unlock_irqrestore(&rcd->exp_lock, flags);
 	return -EAGAIN;
 }
@@ -462,6 +710,7 @@ void hfi1_kern_clear_hw_flow(struct hfi1_ctxtdata *rcd, struct rvt_qp *qp)
 {
 	struct hfi1_qp_priv *qpriv = (struct hfi1_qp_priv *)qp->priv;
 	struct tid_flow_state *fs = &qpriv->flow_state;
+	struct rvt_qp *fqp;
 	unsigned long flags;
 
 	if (fs->index >= RXE_NUM_TID_FLOWS)
@@ -473,7 +722,16 @@ void hfi1_kern_clear_hw_flow(struct hfi1_ctxtdata *rcd, struct rvt_qp *qp)
 	fs->psn = 0;
 	fs->generation = KERN_GENERATION_RESERVED;
 
+	/* get head before dropping lock */
+	fqp = first_qp(rcd, &rcd->flow_queue);
 	spin_unlock_irqrestore(&rcd->exp_lock, flags);
+
+	if (fqp == qp) {
+		__trigger_tid_waiter(fqp);
+		rvt_put_qp(fqp);
+	} else {
+		tid_rdma_schedule_tid_wakeup(fqp);
+	}
 }
 
 void hfi1_kern_init_ctxt_generations(struct hfi1_ctxtdata *rcd)
@@ -1076,6 +1334,7 @@ static int hfi1_kern_exp_rcv_setup(struct tid_rdma_request *req,
 	struct hfi1_ctxtdata *rcd = req->rcd;
 	struct hfi1_qp_priv *qpriv = req->qp->priv;
 	unsigned long flags;
+	struct rvt_qp *fqp;
 	u16 clear_tail = req->clear_tail;
 
 	lockdep_assert_held(&req->qp->s_lock);
@@ -1102,6 +1361,9 @@ static int hfi1_kern_exp_rcv_setup(struct tid_rdma_request *req,
 	}
 
 	spin_lock_irqsave(&rcd->exp_lock, flags);
+	if (kernel_tid_waiters(rcd, &rcd->rarr_queue, flow->req->qp))
+		goto queue;
+
 	/*
 	 * At this point we know the number of pagesets and hence the number of
 	 * TID's to map the segment. Allocate the TID's from the TID groups. If
@@ -1132,11 +1394,16 @@ static int hfi1_kern_exp_rcv_setup(struct tid_rdma_request *req,
 		full_flow_psn(flow, flow->flow_state.spsn);
 	qpriv->flow_state.psn += flow->npkts;
 
+	dequeue_tid_waiter(rcd, &rcd->rarr_queue, flow->req->qp);
+	/* get head before dropping lock */
+	fqp = first_qp(rcd, &rcd->rarr_queue);
 	spin_unlock_irqrestore(&rcd->exp_lock, flags);
+	tid_rdma_schedule_tid_wakeup(fqp);
 
 	req->setup_head = (req->setup_head + 1) & (req->n_max_flows - 1);
 	return 0;
 queue:
+	queue_qp_for_tid_wait(rcd, &rcd->rarr_queue, flow->req->qp);
 	spin_unlock_irqrestore(&rcd->exp_lock, flags);
 	return -EAGAIN;
 }
@@ -1159,6 +1426,7 @@ int hfi1_kern_exp_rcv_clear(struct tid_rdma_request *req)
 	struct hfi1_ctxtdata *rcd = req->rcd;
 	unsigned long flags;
 	int i;
+	struct rvt_qp *fqp;
 
 	lockdep_assert_held(&req->qp->s_lock);
 	/* Exit if we have nothing in the flow circular buffer */
@@ -1171,6 +1439,8 @@ int hfi1_kern_exp_rcv_clear(struct tid_rdma_request *req)
 		kern_unprogram_rcv_group(flow, i);
 	/* To prevent double unprogramming */
 	flow->tnode_cnt = 0;
+	/* get head before dropping lock */
+	fqp = first_qp(rcd, &rcd->rarr_queue);
 	spin_unlock_irqrestore(&rcd->exp_lock, flags);
 
 	dma_unmap_flow(flow);
@@ -1178,6 +1448,13 @@ int hfi1_kern_exp_rcv_clear(struct tid_rdma_request *req)
 	hfi1_tid_rdma_reset_flow(flow);
 	req->clear_tail = (req->clear_tail + 1) & (req->n_max_flows - 1);
 
+	if (fqp == req->qp) {
+		__trigger_tid_waiter(fqp);
+		rvt_put_qp(fqp);
+	} else {
+		tid_rdma_schedule_tid_wakeup(fqp);
+	}
+
 	return 0;
 }
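
[Reviewer's note, not part of the commit.] The sketch below (again with invented identifiers, compilable as plain userspace C) illustrates the deadlock-safe wakeup and reference discipline described in the DOC: lock ordering comment: the head waiter is fetched and referenced under the exp_lock via first_qp(), the lock is dropped, and the waiter is then either triggered directly when it is the qp whose s_lock we already hold, or handed to deferred work that drops the reference when it runs. (The real code can also drop the reference immediately when queue_work_on() reports the trigger work was already pending; the sketch leaves that refinement out.)

/*
 * Illustrative sketch only -- NOT hfi1 code; every identifier below is
 * invented.  Take a reference to the head waiter while the allocation
 * lock is held, drop the lock, then either trigger the waiter directly
 * (only when it is the qp whose own lock we already hold) or defer the
 * wakeup to a work item, which drops the reference when it runs.
 */
#include <stdio.h>

struct fake_qp {
	int id;
	int refcount;
	int waiting;		/* stands in for HFI1_S_WAIT_TID_SPACE */
};

static struct fake_qp *get_qp(struct fake_qp *qp)
{
	if (qp)
		qp->refcount++;
	return qp;
}

static void put_qp(struct fake_qp *qp)
{
	if (qp && --qp->refcount == 0)
		printf("qp %d: freed\n", qp->id);
}

/* Pretend deferred work: in the kernel this runs from a workqueue. */
static void deferred_wakeup(struct fake_qp *qp)
{
	if (qp->waiting)
		printf("qp %d: rescheduled from work item\n", qp->id);
	put_qp(qp);		/* reference taken by the releaser */
}

/* Release path: @head is the wait-queue head, @cur the qp whose lock we hold. */
static void release_and_wake(struct fake_qp *head, struct fake_qp *cur)
{
	struct fake_qp *fqp = get_qp(head);	/* "first_qp()" under the lock */

	/* ... allocation lock dropped here ... */
	if (!fqp)
		return;
	if (fqp == cur) {
		/* Our own lock is held: safe to trigger directly. */
		printf("qp %d: triggered directly\n", fqp->id);
		put_qp(fqp);
	} else {
		/* Foreign qp: never touch its lock here; defer the wakeup. */
		deferred_wakeup(fqp);
	}
}

int main(void)
{
	struct fake_qp a = { .id = 1, .refcount = 1, .waiting = 1 };
	struct fake_qp b = { .id = 2, .refcount = 1, .waiting = 1 };

	release_and_wake(&a, &a);	/* head is ourselves: direct trigger */
	release_and_wake(&b, &a);	/* head is another qp: deferred work */
	return 0;
}

In the patch, the direct-trigger arm corresponds to the fqp == qp branches in hfi1_kern_clear_hw_flow() and hfi1_kern_exp_rcv_clear(), and the deferred arm to tid_rdma_schedule_tid_wakeup().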
 
