On Thu, Jan 27, 2022 at 03:37:47PM -0600, Bob Pearson wrote: > /** > - * __rxe_init_mca - initialize a new mca holding lock > + * __rxe_init_mca_rcu - initialize a new mca holding lock > * @qp: qp object > * @mcg: mcg object > * @mca: empty space for new mca > @@ -280,7 +281,7 @@ void rxe_cleanup_mcg(struct kref *kref) > * > * Returns: 0 on success else an error > */ > -static int __rxe_init_mca(struct rxe_qp *qp, struct rxe_mcg *mcg, > +static int __rxe_init_mca_rcu(struct rxe_qp *qp, struct rxe_mcg *mcg, > struct rxe_mca *mca) There is nothing "rcu" about this function.. > @@ -324,14 +325,14 @@ static int rxe_attach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp) > int err; > > /* check to see if the qp is already a member of the group */ > - spin_lock_bh(&rxe->mcg_lock); > - list_for_each_entry(mca, &mcg->qp_list, qp_list) { > + rcu_read_lock(); > + list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) { > if (mca->qp == qp) { > - spin_unlock_bh(&rxe->mcg_lock); > + rcu_read_unlock(); > return 0; > } > } > - spin_unlock_bh(&rxe->mcg_lock); > + rcu_read_unlock(); Ok.. > @@ -340,16 +341,19 @@ static int rxe_attach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp) > > spin_lock_bh(&rxe->mcg_lock); > /* re-check to see if someone else just attached qp */ > - list_for_each_entry(mca, &mcg->qp_list, qp_list) { > + rcu_read_lock(); Do not hold the RCU if you are holding the write side spinlock. All mutations o fthe list must hold mcg_lock. > + list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) { > if (mca->qp == qp) { > + rcu_read_unlock(); > kfree(new_mca); > err = 0; > goto done; > } > } > + rcu_read_unlock(); > > mca = new_mca; > - err = __rxe_init_mca(qp, mcg, mca); > + err = __rxe_init_mca_rcu(qp, mcg, mca); > if (err) > kfree(mca); Which looks since the list_add is still inside the spinlock > done: > @@ -359,21 +363,23 @@ static int rxe_attach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp) > } > > /** > - * __rxe_cleanup_mca - cleanup mca object holding lock > + * __rxe_cleanup_mca_rcu - cleanup mca object holding lock > * @mca: mca object > * @mcg: mcg object > * > * Context: caller must hold a reference to mcg and rxe->mcg_lock > */ > -static void __rxe_cleanup_mca(struct rxe_mca *mca, struct rxe_mcg *mcg) > +static void __rxe_cleanup_mca_rcu(struct rxe_mca *mca, struct rxe_mcg *mcg) Also not rcu, list_del must hold the write side spinlock. > { > - list_del(&mca->qp_list); > + list_del_rcu(&mca->qp_list); > > atomic_dec(&mcg->qp_num); > atomic_dec(&mcg->rxe->mcg_attach); > atomic_dec(&mca->qp->mcg_num); > > rxe_drop_ref(mca->qp); > + > + kfree_rcu(mca, rcu); OK > } > > /** > @@ -386,22 +392,29 @@ static void __rxe_cleanup_mca(struct rxe_mca *mca, struct rxe_mcg *mcg) > static int rxe_detach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp) > { > struct rxe_dev *rxe = mcg->rxe; > - struct rxe_mca *mca, *tmp; > + struct rxe_mca *mca; > + int ret; > > spin_lock_bh(&rxe->mcg_lock); > - list_for_each_entry_safe(mca, tmp, &mcg->qp_list, qp_list) { > + rcu_read_lock(); > + list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) { As before, don't hold the rcu when holding the write side lock > if (mca->qp == qp) { > - __rxe_cleanup_mca(mca, mcg); > - if (atomic_read(&mcg->qp_num) <= 0) > - kref_put(&mcg->ref_cnt, __rxe_cleanup_mcg); > - spin_unlock_bh(&rxe->mcg_lock); > - kfree(mca); > - return 0; > + rcu_read_unlock(); > + goto found; > } > } > + rcu_read_unlock(); > + ret = -EINVAL; > + goto done; > +found: > + __rxe_cleanup_mca_rcu(mca, mcg); > + if (atomic_read(&mcg->qp_num) <= 0) > + kref_put(&mcg->ref_cnt, __rxe_cleanup_mcg); This is confusing, why an atomic and a refcount with an atomic? Isn't qpnum == 0 the same as list_empty(qp_list) ? > diff --git a/drivers/infiniband/sw/rxe/rxe_recv.c b/drivers/infiniband/sw/rxe/rxe_recv.c > index 357a6cea1484..7f2ea61a52c1 100644 > +++ b/drivers/infiniband/sw/rxe/rxe_recv.c > @@ -267,13 +267,13 @@ static void rxe_rcv_mcast_pkt(struct sk_buff *skb) > qp_array = kmalloc_array(nmax, sizeof(qp), GFP_KERNEL); > > n = 0; > - spin_lock_bh(&rxe->mcg_lock); > - list_for_each_entry(mca, &mcg->qp_list, qp_list) { > + rcu_read_lock(); > + list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) { > qp_array[n++] = mca->qp; > if (n == nmax) > break; > } > - spin_unlock_bh(&rxe->mcg_lock); > + rcu_read_unlock(); > kref_put(&mcg->ref_cnt, rxe_cleanup_mcg); I have no idea how this works, what keeps 'qp' valid and prevents it from being free'd once we leave the locking? Remember the mca can be in concurrent progress to free so qp is just garbage under RCU at this point. Jason