Currently, parallel calls of dlm_add_cb() can happen either from the
DLM API or from the DLM message receive context. In this case a rwlock
is beneficial when both contexts run into the same critical section at
the same time. In the future more message receive contexts can run in
parallel, which makes this conversion of the per-lockspace spinlock to
a rwlock even more useful. In the far future the whole delayed callback
handling might no longer be necessary, once the synchronization is done
in a different way than it is now.

Signed-off-by: Alexander Aring <aahringo@xxxxxxxxxx>
---
 fs/dlm/ast.c          | 21 +++++++++++++++------
 fs/dlm/dlm_internal.h |  2 +-
 fs/dlm/lockspace.c    |  2 +-
 3 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 742b30b61c19..ce8f1f5dfa0c 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -178,11 +178,20 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
 	if (dlm_may_skip_callback(lkb, flags, mode, status, sbflags, NULL))
 		return;
 
-	spin_lock_bh(&ls->ls_cb_lock);
+retry:
+	read_lock_bh(&ls->ls_cb_lock);
 	if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
+		read_unlock_bh(&ls->ls_cb_lock);
+		write_lock_bh(&ls->ls_cb_lock);
+		if (!test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
+			write_unlock_bh(&ls->ls_cb_lock);
+			goto retry;
+		}
+
 		rv = dlm_get_queue_cb(lkb, flags, mode, status, sbflags, &cb);
 		if (!rv)
 			list_add(&cb->list, &ls->ls_cb_delay);
+		write_unlock_bh(&ls->ls_cb_lock);
 	} else {
 		if (test_bit(LSFL_SOFTIRQ, &ls->ls_flags)) {
 			dlm_run_callback(ls->ls_global_id, lkb->lkb_id, mode, flags,
@@ -195,8 +204,8 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
 			if (!rv)
 				queue_work(ls->ls_callback_wq, &cb->work);
 		}
+		read_unlock_bh(&ls->ls_cb_lock);
 	}
-	spin_unlock_bh(&ls->ls_cb_lock);
 }
 
 int dlm_callback_start(struct dlm_ls *ls)
@@ -225,9 +234,9 @@ void dlm_callback_suspend(struct dlm_ls *ls)
 	if (!test_bit(LSFL_FS, &ls->ls_flags))
 		return;
 
-	spin_lock_bh(&ls->ls_cb_lock);
+	write_lock_bh(&ls->ls_cb_lock);
 	set_bit(LSFL_CB_DELAY, &ls->ls_flags);
-	spin_unlock_bh(&ls->ls_cb_lock);
+	write_unlock_bh(&ls->ls_cb_lock);
 
 	if (ls->ls_callback_wq)
 		flush_workqueue(ls->ls_callback_wq);
@@ -245,7 +254,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
 		return;
 
 more:
-	spin_lock_bh(&ls->ls_cb_lock);
+	write_lock_bh(&ls->ls_cb_lock);
 	list_for_each_entry_safe(cb, safe, &ls->ls_cb_delay, list) {
 		list_del(&cb->list);
 		if (test_bit(LSFL_SOFTIRQ, &ls->ls_flags))
@@ -260,7 +269,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
 	empty = list_empty(&ls->ls_cb_delay);
 	if (empty)
 		clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
-	spin_unlock_bh(&ls->ls_cb_lock);
+	write_unlock_bh(&ls->ls_cb_lock);
 
 	sum += count;
 	if (!empty) {
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index e299d8d4d971..5a7fbfec26fb 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -653,7 +653,7 @@ struct dlm_ls {
 
 	/* recovery related */
 
-	spinlock_t		ls_cb_lock;
+	rwlock_t		ls_cb_lock;
 	struct list_head	ls_cb_delay; /* save for queue_work later */
 	struct task_struct	*ls_recoverd_task;
 	struct mutex		ls_recoverd_active;
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 5b3a4c32ac99..f6918f366faa 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -449,7 +449,7 @@ static int new_lockspace(const char *name, const char *cluster,
 	init_completion(&ls->ls_recovery_done);
 	ls->ls_recovery_result = -1;
 
-	spin_lock_init(&ls->ls_cb_lock);
+	rwlock_init(&ls->ls_cb_lock);
 	INIT_LIST_HEAD(&ls->ls_cb_delay);
 	ls->ls_recoverd_task = NULL;
 
-- 
2.43.0
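
Note for reviewers: a rwlock has no atomic read-to-write upgrade, hence
the drop/retake/re-check retry in dlm_add_cb() above. Below is a minimal
userspace sketch of that pattern only, using pthread rwlocks and a
hypothetical cb_delay flag as stand-ins for ls_cb_lock and LSFL_CB_DELAY;
it is an illustration, not kernel code.

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

static pthread_rwlock_t cb_lock = PTHREAD_RWLOCK_INITIALIZER;
static bool cb_delay;	/* protected by cb_lock */

static void add_cb(int id)
{
retry:
	/* fast path: read lock only, concurrent callers do not exclude each other */
	pthread_rwlock_rdlock(&cb_lock);
	if (cb_delay) {
		/*
		 * Upgrade: drop the read lock, take the write lock and
		 * re-check the flag, since another context may have
		 * cleared it in between; if so, start over.
		 */
		pthread_rwlock_unlock(&cb_lock);
		pthread_rwlock_wrlock(&cb_lock);
		if (!cb_delay) {
			pthread_rwlock_unlock(&cb_lock);
			goto retry;
		}

		printf("delay callback %d\n", id);	/* queue-for-later path */
		pthread_rwlock_unlock(&cb_lock);
	} else {
		printf("run callback %d\n", id);	/* immediate path */
		pthread_rwlock_unlock(&cb_lock);
	}
}

int main(void)
{
	add_cb(1);
	return 0;
}

The write lock is only taken on the rare suspended path (and by
suspend/resume themselves), so concurrent dlm_add_cb() callers normally
contend only on the read lock.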