Re: [PATCH 5/9] ipvs: use adaptive pause in master thread

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



	Hello,

On Thu, 5 Apr 2012, Pablo Neira Ayuso wrote:

> I think you can control when the kernel thread is woken up with a
> counting semaphore. The counter of that semaphore will be initially
> set to zero. Then, you can up() the semaphore once per new buffer
> that you enqueue to the sender.
> 
> feeder:
>         add message to sync buffer
>         if buffer full:
>                 enqueue buffer to sender_thread
>                 up(s)
> 
> sender_thread:
>         while (1) {
>                 down(s)
>                 retrieve message from queue
>                 send message
>         }
> 
> It seems to me like the classical producer/consumer problem that you
> can resolve with semaphores.

	May be it is possible to use up/down but we
have to handle the kthread_should_stop check and also
I prefer to reduce the wakeup events. So, I'm trying
another solution which is appended just for review.

> Under congestion the situation is complicated. At some point you'll
> end up dropping messages.
> 
> You may want to increase the socket queue to delay the moment at which
> we start dropping messages. You can expose the socke buffer length via
> /proc interface I guess (not sure if you're already doing that or
> suggesting to use the global socket buffer length).

	I'm still thinking if sndbuf value should be exported,
currently users have to modify the global default/max value.
But in below version I'm trying to handle the sndbuf overflow
by blocking for write_space event. By this way we should work
with any sndbuf configuration.

> You also can define some mechanism to reduce the amount of events,
> some state filtering so you only propagate important states.
> 
> Some partially reliable protocol, so the backup can request messages
> that got lost in a smart way would can also in handy. Basically, the
> master only retransmits the current state, not the whole sequence of
> messages (this is good under congestion, since you save messages).
> I implement that in conntrackd, but that's more complex solution,
> of course. I'd start with something simple.

	The patch "reduce sync rate with time thresholds"
that follows the discussed one in the changeset has such
purpose to reduce the events, in tests the sync traffic is
reduced ~10 times. But it does not modify the current
protocol, it adds a very limited logic for retransmissions.

	Here is the new version for review. It compiles
and does not crash in simple tests.

ipvs: wakeup master thread

	High rate of sync messages in master can lead to
overflowing the socket buffer and dropping the messages.
Fixed pause of 1 second is not suitable for loaded masters,
so allow packet processing to wakeup the master thread
once per 32 messages (IPVS_SYNC_WAKEUP_RATE), so that
we do not send in long bursts even when socket buffer is
very large.

	Add hard limit for the queued messages before sending
by using "sync_qlen_max" sysctl var. It defaults to 1/32 of
the memory pages but actually represents number of messages.
It will protect us from allocating large parts of memory
when the sending rate is lower than the queuing rate.

	Change the master thread to detect and block on
SNDBUF overflow, so that we do not drop messages when
the socket limit is low but the sync_qlen_max limit is
not reached. On ENOBUFS or other errors just drop the
messages.

	Use lower pause (200ms) to wait for messages, it
takes effect when the sync rate is low. It has two purposes,
to limit the delay for queued messages and to avoid
many wakeups when no messages are queued.

	Finally, make sure kthread_should_stop is checked
properly in TASK_INTERRUPTIBLE state when going to sleep
to avoid delays in thread stopping.

Signed-off-by: Julian Anastasov <ja@xxxxxx>
---
 include/net/ip_vs.h             |   15 ++++++++
 net/netfilter/ipvs/ip_vs_ctl.c  |    8 ++++
 net/netfilter/ipvs/ip_vs_sync.c |   74 +++++++++++++++++++++++++++++---------
 3 files changed, 79 insertions(+), 18 deletions(-)

diff --git a/include/net/ip_vs.h b/include/net/ip_vs.h
index 2bdee51..539f557 100644
--- a/include/net/ip_vs.h
+++ b/include/net/ip_vs.h
@@ -870,6 +870,7 @@ struct netns_ipvs {
 #endif
 	int			sysctl_snat_reroute;
 	int			sysctl_sync_ver;
+	int			sysctl_sync_qlen_max;
 	int			sysctl_cache_bypass;
 	int			sysctl_expire_nodest_conn;
 	int			sysctl_expire_quiescent_template;
@@ -890,6 +891,8 @@ struct netns_ipvs {
 	struct timer_list	est_timer;	/* Estimation timer */
 	/* ip_vs_sync */
 	struct list_head	sync_queue;
+	int			sync_queue_len;
+	unsigned int		sync_queue_sent;
 	spinlock_t		sync_lock;
 	struct ip_vs_sync_buff  *sync_buff;
 	spinlock_t		sync_buff_lock;
@@ -912,6 +915,8 @@ struct netns_ipvs {
 #define DEFAULT_SYNC_THRESHOLD	3
 #define DEFAULT_SYNC_PERIOD	50
 #define DEFAULT_SYNC_VER	1
+#define IPVS_SYNC_WAKEUP_RATE	32
+#define IPVS_SYNC_QLEN_MAX	(IPVS_SYNC_WAKEUP_RATE * 4)
 
 #ifdef CONFIG_SYSCTL
 
@@ -930,6 +935,11 @@ static inline int sysctl_sync_ver(struct netns_ipvs *ipvs)
 	return ipvs->sysctl_sync_ver;
 }
 
+static inline int sysctl_sync_qlen_max(struct netns_ipvs *ipvs)
+{
+	return ipvs->sysctl_sync_qlen_max;
+}
+
 #else
 
 static inline int sysctl_sync_threshold(struct netns_ipvs *ipvs)
@@ -947,6 +957,11 @@ static inline int sysctl_sync_ver(struct netns_ipvs *ipvs)
 	return DEFAULT_SYNC_VER;
 }
 
+static inline int sysctl_sync_qlen_max(struct netns_ipvs *ipvs)
+{
+	return IPVS_SYNC_QLEN_MAX;
+}
+
 #endif
 
 /*
diff --git a/net/netfilter/ipvs/ip_vs_ctl.c b/net/netfilter/ipvs/ip_vs_ctl.c
index 964d426..e3280ae 100644
--- a/net/netfilter/ipvs/ip_vs_ctl.c
+++ b/net/netfilter/ipvs/ip_vs_ctl.c
@@ -1718,6 +1718,12 @@ static struct ctl_table vs_vars[] = {
 		.proc_handler	= &proc_do_sync_mode,
 	},
 	{
+		.procname	= "sync_qlen_max",
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= proc_dointvec,
+	},
+	{
 		.procname	= "cache_bypass",
 		.maxlen		= sizeof(int),
 		.mode		= 0644,
@@ -3662,6 +3668,8 @@ int __net_init ip_vs_control_net_init_sysctl(struct net *net)
 	tbl[idx++].data = &ipvs->sysctl_snat_reroute;
 	ipvs->sysctl_sync_ver = 1;
 	tbl[idx++].data = &ipvs->sysctl_sync_ver;
+	ipvs->sysctl_sync_qlen_max = nr_free_buffer_pages() / 32;
+	tbl[idx++].data = &ipvs->sysctl_sync_qlen_max;
 	tbl[idx++].data = &ipvs->sysctl_cache_bypass;
 	tbl[idx++].data = &ipvs->sysctl_expire_nodest_conn;
 	tbl[idx++].data = &ipvs->sysctl_expire_quiescent_template;
diff --git a/net/netfilter/ipvs/ip_vs_sync.c b/net/netfilter/ipvs/ip_vs_sync.c
index 0e36679..b36f89b 100644
--- a/net/netfilter/ipvs/ip_vs_sync.c
+++ b/net/netfilter/ipvs/ip_vs_sync.c
@@ -312,6 +312,7 @@ static inline struct ip_vs_sync_buff *sb_dequeue(struct netns_ipvs *ipvs)
 				struct ip_vs_sync_buff,
 				list);
 		list_del(&sb->list);
+		ipvs->sync_queue_len--;
 	}
 	spin_unlock_bh(&ipvs->sync_lock);
 
@@ -358,9 +359,13 @@ static inline void sb_queue_tail(struct netns_ipvs *ipvs)
 	struct ip_vs_sync_buff *sb = ipvs->sync_buff;
 
 	spin_lock(&ipvs->sync_lock);
-	if (ipvs->sync_state & IP_VS_STATE_MASTER)
+	if (ipvs->sync_state & IP_VS_STATE_MASTER &&
+	    ipvs->sync_queue_len < sysctl_sync_qlen_max(ipvs)) {
 		list_add_tail(&sb->list, &ipvs->sync_queue);
-	else
+		ipvs->sync_queue_len++;
+		if (!((++ipvs->sync_queue_sent) & (IPVS_SYNC_WAKEUP_RATE-1)))
+			wake_up_process(ipvs->master_thread);
+	} else
 		ip_vs_sync_buff_release(sb);
 	spin_unlock(&ipvs->sync_lock);
 }
@@ -405,10 +410,11 @@ void ip_vs_sync_switch_mode(struct net *net, int mode)
 		ipvs->sync_buff = NULL;
 	} else {
 		spin_lock_bh(&ipvs->sync_lock);
-		if (ipvs->sync_state & IP_VS_STATE_MASTER)
+		if (ipvs->sync_state & IP_VS_STATE_MASTER) {
 			list_add_tail(&ipvs->sync_buff->list,
 				      &ipvs->sync_queue);
-		else
+			ipvs->sync_queue_len++;
+		} else
 			ip_vs_sync_buff_release(ipvs->sync_buff);
 		spin_unlock_bh(&ipvs->sync_lock);
 	}
@@ -1392,18 +1398,22 @@ ip_vs_send_async(struct socket *sock, const char *buffer, const size_t length)
 	return len;
 }
 
-static void
+static int
 ip_vs_send_sync_msg(struct socket *sock, struct ip_vs_sync_mesg *msg)
 {
 	int msize;
+	int ret;
 
 	msize = msg->size;
 
 	/* Put size in network byte order */
 	msg->size = htons(msg->size);
 
-	if (ip_vs_send_async(sock, (char *)msg, msize) != msize)
-		pr_err("ip_vs_send_async error\n");
+	ret = ip_vs_send_async(sock, (char *)msg, msize);
+	if (ret >= 0 || ret == -EAGAIN)
+		return ret;
+	pr_err("ip_vs_send_async error %d\n", ret);
+	return 0;
 }
 
 static int
@@ -1428,32 +1438,58 @@ ip_vs_receive(struct socket *sock, char *buffer, const size_t buflen)
 	return len;
 }
 
+/* Get next buffer to send */
+static inline struct ip_vs_sync_buff *
+next_sync_buff(struct netns_ipvs *ipvs)
+{
+	struct ip_vs_sync_buff *sb;
+
+	sb = sb_dequeue(ipvs);
+	if (sb)
+		return sb;
+	/* Do not delay entries in buffer for more than 2 seconds */
+	return get_curr_sync_buff(ipvs, 2 * HZ);
+}
 
 static int sync_thread_master(void *data)
 {
 	struct ip_vs_sync_thread_data *tinfo = data;
 	struct netns_ipvs *ipvs = net_ipvs(tinfo->net);
+	struct sock *sk = tinfo->sock->sk;
 	struct ip_vs_sync_buff *sb;
 
 	pr_info("sync thread started: state = MASTER, mcast_ifn = %s, "
 		"syncid = %d\n",
 		ipvs->master_mcast_ifn, ipvs->master_syncid);
 
-	while (!kthread_should_stop()) {
-		while ((sb = sb_dequeue(ipvs))) {
-			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
-			ip_vs_sync_buff_release(sb);
+	for (;;) {
+		sb = next_sync_buff(ipvs);
+		if (!sb) {
+			set_current_state(TASK_INTERRUPTIBLE);
+			if (kthread_should_stop())
+				break;
+			schedule_timeout(HZ / 5);
+			continue;
 		}
 
-		/* check if entries stay in ipvs->sync_buff for 2 seconds */
-		sb = get_curr_sync_buff(ipvs, 2 * HZ);
-		if (sb) {
-			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
-			ip_vs_sync_buff_release(sb);
+retry:
+		if (unlikely(kthread_should_stop()))
+			break;
+		if (ip_vs_send_sync_msg(tinfo->sock, sb->mesg) < 0) {
+			int ret = 0;
+
+			__wait_event_interruptible(*sk_sleep(sk),
+						   sock_writeable(sk) ||
+						   kthread_should_stop(),
+						   ret);
+			goto retry;
 		}
-
-		schedule_timeout_interruptible(HZ);
+		ip_vs_sync_buff_release(sb);
 	}
+	__set_current_state(TASK_RUNNING);
+
+	if (sb)
+		ip_vs_sync_buff_release(sb);
 
 	/* clean up the sync_buff queue */
 	while ((sb = sb_dequeue(ipvs)))
@@ -1538,6 +1574,8 @@ int start_sync_thread(struct net *net, int state, char *mcast_ifn, __u8 syncid)
 		realtask = &ipvs->master_thread;
 		name = "ipvs_master:%d";
 		threadfn = sync_thread_master;
+		ipvs->sync_queue_len = 0;
+		ipvs->sync_queue_sent = 0;
 		sock = make_send_sock(net);
 	} else if (state == IP_VS_STATE_BACKUP) {
 		if (ipvs->backup_thread)
-- 
1.7.3.4

--
To unsubscribe from this list: send the line "unsubscribe lvs-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [Linux Filesystem Devel]     [Linux NFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux SCSI]     [X.Org]

  Powered by Linux