[patch 080/102] ipc/msg: batch queue sender wakeups

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



From: Davidlohr Bueso <dave@xxxxxxxxxxxx>
Subject: ipc/msg: batch queue sender wakeups

Currently the use of wake_qs in sysv msg queues are only for the receiver
tasks that are blocked on the queue.  But blocked sender tasks (due to
queue size constraints) still are awoken with the ipc object lock held,
which can be a problem particularly for small sized queues and far from
gracious for -rt (just like it was for the receiver side).

The paths that actually wakeup a sender are obviously related to when we
are either getting rid of the queue or after (some) space is freed-up
after a receiver takes the msg (msgrcv).  Furthermore, with the exception
of msgrcv, we can always piggy-back on expunge_all that has its own tasks
lined-up for waking.  Finally, upon unlinking the message, it should be no
problem delaying the wakeups a bit until after we've released the lock.

Link: http://lkml.kernel.org/r/1469748819-19484-3-git-send-email-dave@xxxxxxxxxxxx
Signed-off-by: Davidlohr Bueso <dbueso@xxxxxxx>
Acked-by: Peter Zijlstra (Intel) <peterz@xxxxxxxxxxxxx>
Cc: Manfred Spraul <manfred@xxxxxxxxxxxxxxxx>
Cc: Sebastian Andrzej Siewior <bigeasy@xxxxxxxxxxxxx>
Signed-off-by: Andrew Morton <akpm@xxxxxxxxxxxxxxxxxxxx>
---

 ipc/msg.c |   30 ++++++++++++++++++++----------
 1 file changed, 20 insertions(+), 10 deletions(-)

diff -puN ipc/msg.c~ipc-msg-batch-queue-sender-wakeups ipc/msg.c
--- a/ipc/msg.c~ipc-msg-batch-queue-sender-wakeups
+++ a/ipc/msg.c
@@ -166,14 +166,15 @@ static inline void ss_del(struct msg_sen
 		list_del(&mss->list);
 }
 
-static void ss_wakeup(struct list_head *h, int kill)
+static void ss_wakeup(struct list_head *h,
+		      struct wake_q_head *wake_q, int kill)
 {
 	struct msg_sender *mss, *t;
 
 	list_for_each_entry_safe(mss, t, h, list) {
 		if (kill)
 			mss->list.next = NULL;
-		wake_up_process(mss->tsk);
+		wake_q_add(wake_q, mss->tsk);
 	}
 }
 
@@ -203,7 +204,7 @@ static void freeque(struct ipc_namespace
 	WAKE_Q(wake_q);
 
 	expunge_all(msq, -EIDRM, &wake_q);
-	ss_wakeup(&msq->q_senders, 1);
+	ss_wakeup(&msq->q_senders, &wake_q, 1);
 	msg_rmid(ns, msq);
 	ipc_unlock_object(&msq->q_perm);
 	wake_up_q(&wake_q);
@@ -331,7 +332,6 @@ static int msgctl_down(struct ipc_namesp
 	struct kern_ipc_perm *ipcp;
 	struct msqid64_ds uninitialized_var(msqid64);
 	struct msg_queue *msq;
-	WAKE_Q(wake_q);
 	int err;
 
 	if (cmd == IPC_SET) {
@@ -362,6 +362,9 @@ static int msgctl_down(struct ipc_namesp
 		freeque(ns, ipcp);
 		goto out_up;
 	case IPC_SET:
+	{
+		WAKE_Q(wake_q);
+
 		if (msqid64.msg_qbytes > ns->msg_ctlmnb &&
 		    !capable(CAP_SYS_RESOURCE)) {
 			err = -EPERM;
@@ -376,15 +379,21 @@ static int msgctl_down(struct ipc_namesp
 		msq->q_qbytes = msqid64.msg_qbytes;
 
 		msq->q_ctime = get_seconds();
-		/* sleeping receivers might be excluded by
+		/*
+		 * Sleeping receivers might be excluded by
 		 * stricter permissions.
 		 */
 		expunge_all(msq, -EAGAIN, &wake_q);
-		/* sleeping senders might be able to send
+		/*
+		 * Sleeping senders might be able to send
 		 * due to a larger queue size.
 		 */
-		ss_wakeup(&msq->q_senders, 0);
-		break;
+		ss_wakeup(&msq->q_senders, &wake_q, 0);
+		ipc_unlock_object(&msq->q_perm);
+		wake_up_q(&wake_q);
+
+		goto out_unlock1;
+	}
 	default:
 		err = -EINVAL;
 		goto out_unlock1;
@@ -392,7 +401,6 @@ static int msgctl_down(struct ipc_namesp
 
 out_unlock0:
 	ipc_unlock_object(&msq->q_perm);
-	wake_up_q(&wake_q);
 out_unlock1:
 	rcu_read_unlock();
 out_up:
@@ -809,6 +817,7 @@ long do_msgrcv(int msqid, void __user *b
 	struct msg_queue *msq;
 	struct ipc_namespace *ns;
 	struct msg_msg *msg, *copy = NULL;
+	WAKE_Q(wake_q);
 
 	ns = current->nsproxy->ipc_ns;
 
@@ -873,7 +882,7 @@ long do_msgrcv(int msqid, void __user *b
 			msq->q_cbytes -= msg->m_ts;
 			atomic_sub(msg->m_ts, &ns->msg_bytes);
 			atomic_dec(&ns->msg_hdrs);
-			ss_wakeup(&msq->q_senders, 0);
+			ss_wakeup(&msq->q_senders, &wake_q, 0);
 
 			goto out_unlock0;
 		}
@@ -945,6 +954,7 @@ long do_msgrcv(int msqid, void __user *b
 
 out_unlock0:
 	ipc_unlock_object(&msq->q_perm);
+	wake_up_q(&wake_q);
 out_unlock1:
 	rcu_read_unlock();
 	if (IS_ERR(msg)) {
_
--
To unsubscribe from this list: send the line "unsubscribe mm-commits" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [Kernel Archive]     [IETF Annouce]     [DCCP]     [Netdev]     [Networking]     [Security]     [Bugtraq]     [Yosemite]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Linux SCSI]
  Powered by Linux