On Thu, Feb 17, 2022 at 06:35:44PM -0600, Bob Pearson wrote:
> Replace spinlock with rcu read locks for read side operations
> on mca in rxe_recv.c and rxe_mcast.c.
>
> Signed-off-by: Bob Pearson <rpearsonhpe@xxxxxxxxx>
> ---
>  drivers/infiniband/sw/rxe/rxe_mcast.c | 97 +++++++++++++++++----------
>  drivers/infiniband/sw/rxe/rxe_recv.c  |  6 +-
>  drivers/infiniband/sw/rxe/rxe_verbs.h |  2 +
>  3 files changed, 67 insertions(+), 38 deletions(-)
>
> diff --git a/drivers/infiniband/sw/rxe/rxe_mcast.c b/drivers/infiniband/sw/rxe/rxe_mcast.c
> index 349a6fac2fcc..2bfec3748e1e 100644
> --- a/drivers/infiniband/sw/rxe/rxe_mcast.c
> +++ b/drivers/infiniband/sw/rxe/rxe_mcast.c
> @@ -17,6 +17,12 @@
>   * mca is created. It holds a pointer to the qp and is added to a list
>   * of qp's that are attached to the mcg. The qp_list is used to replicate
>   * mcast packets in the rxe receive path.
> + *
> + * The highest performance operations are mca list traversal when
> + * processing incoming multicast packets which need to be fanned out
> + * to the attached qp's. This list is protected by RCU locking for read
> + * operations and a spinlock in the rxe_dev struct for write operations.
> + * The red-black tree is protected by the same spinlock.
>   */
>
>  #include "rxe.h"
> @@ -288,7 +294,7 @@ static void rxe_destroy_mcg(struct rxe_mcg *mcg)
>  }
>
>  /**
> - * __rxe_init_mca - initialize a new mca holding lock
> + * __rxe_init_mca_rcu - initialize a new mca holding lock
>   * @qp: qp object
>   * @mcg: mcg object
>   * @mca: empty space for new mca
> @@ -298,8 +304,8 @@ static void rxe_destroy_mcg(struct rxe_mcg *mcg)
>   *
>   * Returns: 0 on success else an error
>   */
> -static int __rxe_init_mca(struct rxe_qp *qp, struct rxe_mcg *mcg,
> -			  struct rxe_mca *mca)
> +static int __rxe_init_mca_rcu(struct rxe_qp *qp, struct rxe_mcg *mcg,
> +			      struct rxe_mca *mca)
>  {

This isn't really 'rcu', it still has to hold the write side lock.

>  	struct rxe_dev *rxe = to_rdev(qp->ibqp.device);
>  	int n;
> @@ -322,7 +328,10 @@ static int __rxe_init_mca(struct rxe_qp *qp, struct rxe_mcg *mcg,
>  	rxe_add_ref(qp);
>  	mca->qp = qp;
>
> -	list_add_tail(&mca->qp_list, &mcg->qp_list);
> +	kref_get(&mcg->ref_cnt);
> +	mca->mcg = mcg;
> +
> +	list_add_tail_rcu(&mca->qp_list, &mcg->qp_list);

list_add_tail gets to be called rcu because it is slower than the
non-rcu safe version.

> -static void __rxe_cleanup_mca(struct rxe_mca *mca, struct rxe_mcg *mcg)
> +static void __rxe_destroy_mca(struct rcu_head *head)
>  {
> -	list_del(&mca->qp_list);
> +	struct rxe_mca *mca = container_of(head, typeof(*mca), rcu);
> +	struct rxe_mcg *mcg = mca->mcg;
>
>  	atomic_dec(&mcg->qp_num);
>  	atomic_dec(&mcg->rxe->mcg_attach);
> @@ -394,6 +404,19 @@ static void __rxe_cleanup_mca(struct rxe_mca *mca, struct rxe_mcg *mcg)
>  	kfree(mca);
> +/**
> + * __rxe_cleanup_mca_rcu - cleanup mca object holding lock
> + * @mca: mca object
> + * @mcg: mcg object
> + *
> + * Context: caller must hold a reference to mcg and rxe->mcg_lock
> + */
> +static void __rxe_cleanup_mca_rcu(struct rxe_mca *mca, struct rxe_mcg *mcg)
> +{
> +	list_del_rcu(&mca->qp_list);
> +	call_rcu(&mca->rcu, __rxe_destroy_mca);
> +}

I think this is OK now..
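To spell out the idiom the patch converges on here, below is a minimal,
self-contained sketch of the RCU deferred-free pattern: writers still
serialize on a spinlock, readers are lockless, and the kfree is deferred
past a grace period. All names in the sketch are illustrative, not the
actual rxe ones:

#include <linux/list.h>
#include <linux/rculist.h>
#include <linux/rcupdate.h>
#include <linux/slab.h>
#include <linux/spinlock.h>

struct item {
	struct list_head list;
	struct rcu_head rcu;
};

static LIST_HEAD(item_list);
static DEFINE_SPINLOCK(item_lock);	/* write-side lock, like mcg_lock */

static void item_free_rcu(struct rcu_head *head)
{
	/* runs only after a grace period, when no reader can still
	 * be traversing a list that contained this item
	 */
	kfree(container_of(head, struct item, rcu));
}

static void item_del(struct item *it)
{
	spin_lock_bh(&item_lock);	/* writers still serialize */
	list_del_rcu(&it->list);	/* unlink; readers may still see it */
	spin_unlock_bh(&item_lock);
	call_rcu(&it->rcu, item_free_rcu);	/* defer the actual kfree */
}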
> +
>  /**
>   * rxe_detach_mcg - detach qp from mcg
>   * @mcg: mcg object
> @@ -404,31 +427,35 @@ static void __rxe_cleanup_mca(struct rxe_mca *mca, struct rxe_mcg *mcg)
>  static int rxe_detach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp)
>  {
>  	struct rxe_dev *rxe = mcg->rxe;
> -	struct rxe_mca *mca, *tmp;
> -	unsigned long flags;
> +	struct rxe_mca *mca;
> +	int ret;
>
> -	spin_lock_irqsave(&rxe->mcg_lock, flags);
> -	list_for_each_entry_safe(mca, tmp, &mcg->qp_list, qp_list) {
> -		if (mca->qp == qp) {
> -			__rxe_cleanup_mca(mca, mcg);
> -
> -			/* if the number of qp's attached to the
> -			 * mcast group falls to zero go ahead and
> -			 * tear it down. This will not free the
> -			 * object since we are still holding a ref
> -			 * from the caller
> -			 */
> -			if (atomic_read(&mcg->qp_num) <= 0)
> -				__rxe_destroy_mcg(mcg);
> -
> -			spin_unlock_irqrestore(&rxe->mcg_lock, flags);
> -			return 0;
> -		}
> +	spin_lock_bh(&rxe->mcg_lock);
> +	list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) {
> +		if (mca->qp == qp)
> +			goto found;
>  	}
>
>  	/* we didn't find the qp on the list */
> -	spin_unlock_irqrestore(&rxe->mcg_lock, flags);
> -	return -EINVAL;
> +	ret = -EINVAL;
> +	goto done;
> +
> +found:
> +	__rxe_cleanup_mca_rcu(mca, mcg);
> +
> +	/* if the number of qp's attached to the
> +	 * mcast group falls to zero go ahead and
> +	 * tear it down. This will not free the
> +	 * object since we are still holding a ref
> +	 * from the caller
> +	 */

Fix the word wrap.

Would prefer to avoid the found label in the middle of the code.

> +	rcu_read_lock();
> +	list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) {
>  		/* protect the qp pointers in the list */
>  		rxe_add_ref(mca->qp);

The other approach you could take is to also RCU free the qp and allow
its kref to become zero here. But this is fine, I think.

Jason
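For completeness, the read side that the last comment refers to follows
the usual pin-under-rcu_read_lock() pattern. A rough sketch with
illustrative names (not the actual rxe_recv code; here the list entry
carries its own kref, whereas rxe instead pins the qp the mca points at):

#include <linux/kref.h>
#include <linux/rculist.h>
#include <linux/rcupdate.h>

struct pinned_item {
	struct list_head list;
	struct kref ref;
};

static void fan_out(struct list_head *head)
{
	struct pinned_item *it;

	rcu_read_lock();
	list_for_each_entry_rcu(it, head, list) {
		/* a writer may list_del_rcu() 'it' concurrently, but
		 * call_rcu() guarantees the memory is not reused until
		 * this read-side critical section ends; taking a
		 * reference pins 'it' past rcu_read_unlock().  This
		 * assumes a linked entry always holds at least one
		 * reference, so the kref cannot already be zero here;
		 * the alternative mentioned above would RCU-free the
		 * object and tolerate a zero kref instead.
		 */
		kref_get(&it->ref);
		/* ... hand 'it' off; the consumer drops the reference
		 * with kref_put() when done ...
		 */
	}
	rcu_read_unlock();
}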