RE: [PATCH] dmaengine: idxd: fix submission race window

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Hi Dave,

> Konstantin observed that when descriptors are submitted, the descriptor is
> added to the pending list after the submission. This creates a race window
> with the slight possibility that the descriptor can complete before it
> gets added to the pending list and this window would cause the completion
> handler to miss processing the descriptor.
> 
> To address the issue, the addition of the descriptor to the pending list
> must be done before it gets submitted to the hardware. However, submitting
> to swq with ENQCMDS instruction can cause a failure with the condition of
> either wq is full or wq is not "active".
> 
> With the descriptor allocation being the gate to the wq capacity, it is not
> possible to hit a retry with ENQCMDS submission to the swq. The only
> possible failure can happen is when wq is no longer "active" due to hw
> error and therefore we are moving towards taking down the portal. Given
> this is a rare condition and there's no longer concern over I/O
> performance, the driver can walk the completion lists in order to retrieve
> and abort the descriptor.
> 
> The error path will set the descriptor to aborted status. It will take the
> work list lock to prevent further processing of worklist. It will do a
> delete_all on the pending llist to retrieve all descriptors on the pending
> llist. The delete_all action does not require a lock. It will walk through
> the acquired llist to find the aborted descriptor while add all remaining
> descriptors to the work list since it holds the lock. If it does not find
> the aborted descriptor on the llist, it will walk through the work
> list. And if it still does not find the descriptor, then it means the
> interrupt handler has removed the desc from the llist but is pending on the work
> list lock and will process it once the error path releases the lock.
> 
> Fixes: eb15e7154fbf ("dmaengine: idxd: add interrupt handle request and release support")
> Reported-by: Konstantin Ananyev <konstantin.ananyev@xxxxxxxxx>
> Signed-off-by: Dave Jiang <dave.jiang@xxxxxxxxx>
> ---
>  drivers/dma/idxd/idxd.h   |   14 ++++++++
>  drivers/dma/idxd/irq.c    |   36 +++++++++++----------
>  drivers/dma/idxd/submit.c |   77 ++++++++++++++++++++++++++++++++++++++++-----
>  3 files changed, 101 insertions(+), 26 deletions(-)
> 
> diff --git a/drivers/dma/idxd/idxd.h b/drivers/dma/idxd/idxd.h
> index 1f0991dec679..0f27374eae4b 100644
> --- a/drivers/dma/idxd/idxd.h
> +++ b/drivers/dma/idxd/idxd.h
> @@ -294,6 +294,14 @@ struct idxd_desc {
>  	struct idxd_wq *wq;
>  };
> 
> +/*
> + * This is software defined error for the completion status. We overload the error code
> + * that will never appear in completion status and only SWERR register.
> + */
> +enum idxd_completion_status {
> +	IDXD_COMP_DESC_ABORT = 0xff,
> +};
> +
>  #define confdev_to_idxd(dev) container_of(dev, struct idxd_device, conf_dev)
>  #define confdev_to_wq(dev) container_of(dev, struct idxd_wq, conf_dev)
> 
> @@ -480,4 +488,10 @@ static inline void perfmon_init(void) {}
>  static inline void perfmon_exit(void) {}
>  #endif
> 
> +static inline void complete_desc(struct idxd_desc *desc, enum idxd_complete_type reason)
> +{
> +	idxd_dma_complete_txd(desc, reason);
> +	idxd_free_desc(desc->wq, desc);
> +}
> +
>  #endif
> diff --git a/drivers/dma/idxd/irq.c b/drivers/dma/idxd/irq.c
> index 7a2cf0512501..8fd52937537a 100644
> --- a/drivers/dma/idxd/irq.c
> +++ b/drivers/dma/idxd/irq.c
> @@ -245,12 +245,6 @@ static inline bool match_fault(struct idxd_desc *desc, u64 fault_addr)
>  	return false;
>  }
> 
> -static inline void complete_desc(struct idxd_desc *desc, enum idxd_complete_type reason)
> -{
> -	idxd_dma_complete_txd(desc, reason);
> -	idxd_free_desc(desc->wq, desc);
> -}
> -
>  static int irq_process_pending_llist(struct idxd_irq_entry *irq_entry,
>  				     enum irq_work_type wtype,
>  				     int *processed, u64 data)
> @@ -258,8 +252,7 @@ static int irq_process_pending_llist(struct idxd_irq_entry *irq_entry,
>  	struct idxd_desc *desc, *t;
>  	struct llist_node *head;
>  	int queued = 0;
> -	unsigned long flags;
> -	enum idxd_complete_type reason;
> +	enum idxd_complete_type reason = IDXD_COMPLETE_NORMAL;
> 
>  	*processed = 0;
>  	head = llist_del_all(&irq_entry->pending_llist);
> @@ -272,16 +265,20 @@ static int irq_process_pending_llist(struct idxd_irq_entry *irq_entry,
>  		reason = IDXD_COMPLETE_DEV_FAIL;
> 
>  	llist_for_each_entry_safe(desc, t, head, llnode) {
> -		if (desc->completion->status) {
> -			if ((desc->completion->status & DSA_COMP_STATUS_MASK) != DSA_COMP_SUCCESS)
> +		u8 status = desc->completion->status & DSA_COMP_STATUS_MASK;
> +
> +		if (status) {
> +			if (status == IDXD_COMP_DESC_ABORT)
> +				reason = IDXD_COMPLETE_ABORT;
> +			else if (status != DSA_COMP_SUCCESS)
>  				match_fault(desc, data);
>  			complete_desc(desc, reason);

reason was overwritten above for one particular desc, but not restored.
So all further descs will be completed with reason == IDXD_COMPLETE_ABORT.
I think we need either to reset it on each iteration, or:

	if (status == IDXD_COMP_DESC_ABORT) {
		complete_desc(desc, IDXD_COMPLETE_ABORT);
	} else {
		if (status != DSA_COMP_SUCCESS)
			match_fault(desc, data);
		complete_desc(desc, reason);
	}
   

>  			(*processed)++;
>  		} else {
> -			spin_lock_irqsave(&irq_entry->list_lock, flags);
> +			spin_lock_bh(&irq_entry->list_lock);
>  			list_add_tail(&desc->list,
>  				      &irq_entry->work_list);
> -			spin_unlock_irqrestore(&irq_entry->list_lock, flags);
> +			spin_unlock_bh(&irq_entry->list_lock);
>  			queued++;
>  		}
>  	}
> @@ -295,10 +292,9 @@ static int irq_process_work_list(struct idxd_irq_entry *irq_entry,
>  				 int *processed, u64 data)
>  {
>  	int queued = 0;
> -	unsigned long flags;
>  	LIST_HEAD(flist);
>  	struct idxd_desc *desc, *n;
> -	enum idxd_complete_type reason;
> +	enum idxd_complete_type reason = IDXD_COMPLETE_NORMAL;
> 
>  	*processed = 0;
>  	if (wtype == IRQ_WORK_NORMAL)
> @@ -310,9 +306,9 @@ static int irq_process_work_list(struct idxd_irq_entry *irq_entry,
>  	 * This lock protects list corruption from access of list outside of the irq handler
>  	 * thread.
>  	 */
> -	spin_lock_irqsave(&irq_entry->list_lock, flags);
> +	spin_lock_bh(&irq_entry->list_lock);

I am not really an expert in kernel spinlock flavours...
Can you explain to me why we can now replace _irqsave_ with _bh_?

>  	if (list_empty(&irq_entry->work_list)) {
> -		spin_unlock_irqrestore(&irq_entry->list_lock, flags);
> +		spin_unlock_bh(&irq_entry->list_lock);
>  		return 0;
>  	}
> 
> @@ -326,10 +322,14 @@ static int irq_process_work_list(struct idxd_irq_entry *irq_entry,
>  		}
>  	}
> 
> -	spin_unlock_irqrestore(&irq_entry->list_lock, flags);
> +	spin_unlock_bh(&irq_entry->list_lock);
> 
>  	list_for_each_entry(desc, &flist, list) {
> -		if ((desc->completion->status & DSA_COMP_STATUS_MASK) != DSA_COMP_SUCCESS)
> +		u8 status = desc->completion->status & DSA_COMP_STATUS_MASK;
> +
> +		if (desc->completion->status == IDXD_COMP_DESC_ABORT)
> +			reason = IDXD_COMPLETE_ABORT;
> +		else if (status != DSA_COMP_SUCCESS)
>  			match_fault(desc, data);
>  		complete_desc(desc, reason);

Same comment as above for pending list.

>  	}
> diff --git a/drivers/dma/idxd/submit.c b/drivers/dma/idxd/submit.c
> index 736def129fa8..d99a3aeb038c 100644
> --- a/drivers/dma/idxd/submit.c
> +++ b/drivers/dma/idxd/submit.c
> @@ -88,9 +88,66 @@ void idxd_free_desc(struct idxd_wq *wq, struct idxd_desc *desc)
>  	sbitmap_queue_clear(&wq->sbq, desc->id, cpu);
>  }
> 
> +static void list_abort_desc(struct idxd_wq *wq, struct idxd_irq_entry *ie,
> +			    struct idxd_desc *desc)
> +{
> +	struct idxd_desc *d, *n;
> +
> +	lockdep_assert_held(&ie->list_lock);
> +	list_for_each_entry_safe(d, n, &ie->work_list, list) {
> +		if (d == desc) {
> +			list_del(&d->list);
> +			complete_desc(desc, IDXD_COMPLETE_ABORT);
> +			return;
> +		}
> +	}
> +
> +	/*
> +	 * At this point, the desc needs to be aborted is held by the completion
> +	 * handler where it has taken it off the pending list but has not added to the
> +	 * work list. It will be cleaned up by the interrupt handler when it sees the
> +	 * IDXD_COMP_DESC_ABORT for completion status.
> +	 */
> +}
> +
> +static void llist_abort_desc(struct idxd_wq *wq, struct idxd_irq_entry *ie,
> +			     struct idxd_desc *desc)
> +{
> +	struct idxd_desc *d, *t;
> +	struct llist_node *head;
> +	bool found = false;
> +
> +	desc->completion->status = IDXD_COMP_DESC_ABORT;
> +	/*
> +	 * Grab the list lock so it will block the irq thread handler. This allows the
> +	 * abort code to locate the descriptor need to be aborted.
> +	 */
> +	spin_lock_bh(&ie->list_lock);
> +	head = llist_del_all(&ie->pending_llist);
> +	if (!head) {
> +		list_abort_desc(wq, ie, desc);
> +		spin_unlock_bh(&ie->list_lock);
> +		return;
> +	}
> +
> +	llist_for_each_entry_safe(d, t, head, llnode) {
> +		if (d == desc) {
> +			complete_desc(desc, IDXD_COMPLETE_ABORT);
> +			found = true;
> +			continue;
> +		}
> +		list_add_tail(&desc->list, &ie->work_list);
> +	}
> +
> +	if (!found)
> +		list_abort_desc(wq, ie, desc);
> +	spin_unlock_bh(&ie->list_lock);
> +}

There is one thing that worries me in that approach:
From what I see, right now idxd calls complete_desc() without list_lock held.
I think it is better to keep things like that.
I suppose it should be easy enough to re-arrange your code a bit:
1) Make list_abort_desc() not call complete_desc(), but simply
   return the desc if found, or NULL otherwise.
2) In llist_abort_desc(), grab the lock and try to find the desc in one of the lists.
   If found in the work_list, remove it from the work_list, but don't call complete() yet.
   Then release the lock and do complete().

Something like that, to be more specific:

static void llist_abort_desc(struct idxd_wq *wq, struct idxd_irq_entry *ie,
			     struct idxd_desc *desc)
{
	struct idxd_desc *d, *found, *t;
	struct llist_node *head;

	desc->completion->status = IDXD_COMP_DESC_ABORT;
	found = NULL;

	spin_lock_bh(&ie->list_lock);
	head = llist_del_all(&ie->pending_llist);
	if (head != NULL) {
		llist_for_each_entry_safe(d, t, head, llnode) {
			if (d == desc) {
				found = desc;
				continue;
			}
			list_add_tail(&d->list, &ie->work_list);
		}
	}

	if (found == NULL)
		found = list_abort_desc(wq, ie, desc);

	spin_unlock_bh(&ie->list_lock);

	if (found != NULL)
		complete_desc(found, IDXD_COMPLETE_ABORT);
}
 

> +
>  int idxd_submit_desc(struct idxd_wq *wq, struct idxd_desc *desc)
>  {
>  	struct idxd_device *idxd = wq->idxd;
> +	struct idxd_irq_entry *ie = NULL;
>  	void __iomem *portal;
>  	int rc;
> 
> @@ -108,6 +165,16 @@ int idxd_submit_desc(struct idxd_wq *wq, struct idxd_desc *desc)
>  	 * even on UP because the recipient is a device.
>  	 */
>  	wmb();
> +
> +	/*
> +	 * Pending the descriptor to the lockless list for the irq_entry
> +	 * that we designated the descriptor to.
> +	 */
> +	if (desc->hw->flags & IDXD_OP_FLAG_RCI) {
> +		ie = &idxd->irq_entries[desc->vector];
> +		llist_add(&desc->llnode, &ie->pending_llist);
> +	}
> +
>  	if (wq_dedicated(wq)) {
>  		iosubmit_cmds512(portal, desc->hw, 1);
>  	} else {
> @@ -120,18 +187,12 @@ int idxd_submit_desc(struct idxd_wq *wq, struct idxd_desc *desc)
>  		rc = enqcmds(portal, desc->hw);
>  		if (rc < 0) {
>  			percpu_ref_put(&wq->wq_active);
> +			if (ie)
> +				llist_abort_desc(wq, ie, desc);
>  			return rc;
>  		}
>  	}
> 
>  	percpu_ref_put(&wq->wq_active);
> -
> -	/*
> -	 * Pending the descriptor to the lockless list for the irq_entry
> -	 * that we designated the descriptor to.
> -	 */
> -	if (desc->hw->flags & IDXD_OP_FLAG_RCI)
> -		llist_add(&desc->llnode, &idxd->irq_entries[desc->vector].pending_llist);
> -
>  	return 0;
>  }
> 





[Index of Archives]     [Linux Kernel]     [Linux ARM (vger)]     [Linux ARM MSM]     [Linux Omap]     [Linux Arm]     [Linux Tegra]     [Fedora ARM]     [Linux for Samsung SOC]     [eCos]     [Linux PCI]     [Linux Fastboot]     [Gcc Help]     [Git]     [DCCP]     [IETF Announce]     [Security]     [Linux MIPS]     [Yosemite Campsites]

  Powered by Linux