[PATCH] Use unix socket for local multicast loop

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Instead of relying on the multicast loop functionality of the kernel, we
now use a unix socket pair created by socketpair to deliver multicast
messages to the local node. This handles problems with an improperly
configured local firewall: if output/input to/from the ethernet interface
is blocked, the node is still able to create a single-node membership.

The downside of the patch is that a membership is always created, so the
"Totem is unable to form a cluster..." message will never appear (the same
applies to the continuous_gather key).

Signed-off-by: Jan Friesse <jfriesse@xxxxxxxxxx>
---
 exec/totemudp.c |  175 ++++++++++++++++++++++++++++++++++++++++++++++---------
 1 files changed, 148 insertions(+), 27 deletions(-)

diff --git a/exec/totemudp.c b/exec/totemudp.c
index 371746d..e702a32 100644
--- a/exec/totemudp.c
+++ b/exec/totemudp.c
@@ -94,6 +94,13 @@ struct totemudp_socket {
 	int mcast_recv;
 	int mcast_send;
 	int token;
+	/*
+	 * Socket used for local multicast delivery. We don't rely on multicast
+	 * loop and rather this UNIX DGRAM socket is used. Socket is created by
+	 * socketpair call and they are used in same way as pipe (so [0] is read
+	 * end and [1] is write end)
+	 */
+	int local_mcast_loop[2];
 };
 
 struct totemudp_instance {
@@ -381,6 +388,20 @@ static inline void mcast_sendmsg (
 		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
 			"sendmsg(mcast) failed (non-critical)");
 	}
+
+	/*
+	 * Transmit multicast message to local unix mcast loop
+	 * An error here is recovered by totemsrp
+	 */
+	msg_mcast.msg_name = NULL;
+	msg_mcast.msg_namelen = 0;
+
+	res = sendmsg (instance->totemudp_sockets.local_mcast_loop[1], &msg_mcast,
+		MSG_NOSIGNAL);
+	if (res < 0) {
+		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
+			"sendmsg(local mcast loop) failed (non-critical)");
+	}
 }
 
 
@@ -398,6 +419,12 @@ int totemudp_finalize (
 	if (instance->totemudp_sockets.mcast_send > 0) {
 		close (instance->totemudp_sockets.mcast_send);
 	}
+	if (instance->totemudp_sockets.local_mcast_loop[0] > 0) {
+		qb_loop_poll_del (instance->totemudp_poll_handle,
+			instance->totemudp_sockets.local_mcast_loop[0]);
+		close (instance->totemudp_sockets.local_mcast_loop[0]);
+		close (instance->totemudp_sockets.local_mcast_loop[1]);
+	}
 	if (instance->totemudp_sockets.token > 0) {
 		qb_loop_poll_del (instance->totemudp_poll_handle,
 			instance->totemudp_sockets.token);
@@ -564,6 +591,12 @@ static void timer_function_netif_check_timeout (
 	if (instance->totemudp_sockets.mcast_send > 0) {
 		close (instance->totemudp_sockets.mcast_send);
 	}
+	if (instance->totemudp_sockets.local_mcast_loop[0] > 0) {
+		qb_loop_poll_del (instance->totemudp_poll_handle,
+			instance->totemudp_sockets.local_mcast_loop[0]);
+		close (instance->totemudp_sockets.local_mcast_loop[0]);
+		close (instance->totemudp_sockets.local_mcast_loop[1]);
+	}
 	if (instance->totemudp_sockets.token > 0) {
 		qb_loop_poll_del (instance->totemudp_poll_handle,
 			instance->totemudp_sockets.token);
@@ -611,6 +644,12 @@ static void timer_function_netif_check_timeout (
 	qb_loop_poll_add (
 		instance->totemudp_poll_handle,
 		QB_LOOP_MED,
+		instance->totemudp_sockets.local_mcast_loop[0],
+		POLLIN, instance, net_deliver_fn);
+
+	qb_loop_poll_add (
+		instance->totemudp_poll_handle,
+		QB_LOOP_MED,
 		instance->totemudp_sockets.token,
 		POLLIN, instance, net_deliver_fn);
 
@@ -685,6 +724,7 @@ static int totemudp_build_sockets_ip (
 	int res;
 	int flag;
 	uint8_t sflag;
+	int i;
 
 	/*
 	 * Create multicast recv socket
@@ -727,6 +767,27 @@ static int totemudp_build_sockets_ip (
 	}
 
 	/*
+	 * Create local multicast loop socket
+	 */
+	if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets->local_mcast_loop) == -1) {
+		LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
+			"socket() failed");
+		return (-1);
+	}
+
+	for (i = 0; i < 2; i++) {
+		totemip_nosigpipe (sockets->local_mcast_loop[i]);
+		res = fcntl (sockets->local_mcast_loop[i], F_SETFL, O_NONBLOCK);
+		if (res == -1) {
+			LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
+				"Could not set non-blocking operation on multicast socket");
+			return (-1);
+		}
+	}
+
+
+
+	/*
 	 * Setup mcast send socket
 	 */
 	sockets->mcast_send = socket (bindnet_address->family, SOCK_DGRAM, 0);
@@ -808,12 +869,34 @@ static int totemudp_build_sockets_ip (
 	/*
 	 * Set buffer sizes to avoid overruns
 	 */
-	 res = setsockopt (sockets->mcast_recv, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
-	 res = setsockopt (sockets->mcast_send, SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
+	res = setsockopt (sockets->mcast_recv, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
+	if (res == -1) {
+		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
+			"Unable to set SO_RCVBUF size on UDP mcast socket");
+		return (-1);
+	}
+	res = setsockopt (sockets->mcast_send, SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
+	if (res == -1) {
+		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
+			"Unable to set SO_SNDBUF size on UDP mcast socket");
+		return (-1);
+	}
+	res = setsockopt (sockets->local_mcast_loop[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
+	if (res == -1) {
+		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
+			"Unable to set SO_RCVBUF size on UDP local mcast loop socket");
+		return (-1);
+	}
+	res = setsockopt (sockets->local_mcast_loop[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
+	if (res == -1) {
+		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
+			"Unable to set SO_SNDBUF size on UDP local mcast loop socket");
+		return (-1);
+	}
 
 	res = getsockopt (sockets->mcast_recv, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen);
 	if (res == 0) {
-	 	log_printf (instance->totemudp_log_level_debug,
+		log_printf (instance->totemudp_log_level_debug,
 			"Receive multicast socket recv buffer size (%d bytes).", recvbuf_size);
 	}
 
@@ -823,6 +906,19 @@ static int totemudp_build_sockets_ip (
 			"Transmit multicast socket send buffer size (%d bytes).", sendbuf_size);
 	}
 
+	res = getsockopt (sockets->local_mcast_loop[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen);
+	if (res == 0) {
+		log_printf (instance->totemudp_log_level_debug,
+			"Local receive multicast loop socket recv buffer size (%d bytes).", recvbuf_size);
+	}
+
+	res = getsockopt (sockets->local_mcast_loop[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, &optlen);
+	if (res == 0) {
+		log_printf (instance->totemudp_log_level_debug,
+			"Local transmit multicast loop socket send buffer size (%d bytes).", sendbuf_size);
+	}
+
+
 	/*
 	 * Join group membership on socket
 	 */
@@ -875,13 +971,13 @@ static int totemudp_build_sockets_ip (
 	}
 
 	/*
-	 * Turn on multicast loopback
+	 * Turn off multicast loopback
 	 */
 
-	flag = 1;
+	flag = 0;
 	switch ( bindnet_address->family ) {
 		case AF_INET:
-		sflag = 1;
+		sflag = 0;
 		res = setsockopt (sockets->mcast_send, IPPROTO_IP, IP_MULTICAST_LOOP,
 			&sflag, sizeof (sflag));
 		break;
@@ -891,7 +987,7 @@ static int totemudp_build_sockets_ip (
 	}
 	if (res == -1) {
 		LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
-			"Unable to turn on multicast loopback");
+			"Unable to turn off multicast loopback");
 		return (-1);
 	}
 
@@ -1126,18 +1222,30 @@ int totemudp_recv_flush (void *udp_context)
 	struct pollfd ufd;
 	int nfds;
 	int res = 0;
+	int i;
+	int sock;
 
 	instance->flushing = 1;
 
-	do {
-		ufd.fd = instance->totemudp_sockets.mcast_recv;
-		ufd.events = POLLIN;
-		nfds = poll (&ufd, 1, 0);
-		if (nfds == 1 && ufd.revents & POLLIN) {
-		net_deliver_fn (instance->totemudp_sockets.mcast_recv,
-			ufd.revents, instance);
+	for (i = 0; i < 2; i++) {
+		sock = -1;
+		if (i == 0) {
+		    sock = instance->totemudp_sockets.mcast_recv;
+		}
+		if (i == 1) {
+		    sock = instance->totemudp_sockets.local_mcast_loop[0];
 		}
-	} while (nfds == 1);
+		assert(sock != -1);
+
+		do {
+			ufd.fd = sock;
+			ufd.events = POLLIN;
+			nfds = poll (&ufd, 1, 0);
+			if (nfds == 1 && ufd.revents & POLLIN) {
+			net_deliver_fn (sock, ufd.revents, instance);
+			}
+		} while (nfds == 1);
+	}
 
 	instance->flushing = 0;
 
@@ -1251,6 +1359,8 @@ extern int totemudp_recv_mcast_empty (
 	struct pollfd ufd;
 	int nfds;
 	int msg_processed = 0;
+	int i;
+	int sock;
 
 	/*
 	 * Receive datagram
@@ -1275,19 +1385,30 @@ extern int totemudp_recv_mcast_empty (
 	msg_recv.msg_accrightslen = 0;
 #endif
 
-	do {
-		ufd.fd = instance->totemudp_sockets.mcast_recv;
-		ufd.events = POLLIN;
-		nfds = poll (&ufd, 1, 0);
-		if (nfds == 1 && ufd.revents & POLLIN) {
-			res = recvmsg (instance->totemudp_sockets.mcast_recv, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT);
-			if (res != -1) {
-				msg_processed = 1;
-			} else {
-				msg_processed = -1;
-			}
+	for (i = 0; i < 2; i++) {
+		sock = -1;
+		if (i == 0) {
+		    sock = instance->totemudp_sockets.mcast_recv;
+		}
+		if (i == 1) {
+		    sock = instance->totemudp_sockets.local_mcast_loop[0];
 		}
-	} while (nfds == 1);
+		assert(sock != -1);
+
+		do {
+			ufd.fd = sock;
+			ufd.events = POLLIN;
+			nfds = poll (&ufd, 1, 0);
+			if (nfds == 1 && ufd.revents & POLLIN) {
+				res = recvmsg (sock, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT);
+				if (res != -1) {
+					msg_processed = 1;
+				} else {
+					msg_processed = -1;
+				}
+			}
+		} while (nfds == 1);
+	}
 
 	return (msg_processed);
 }
-- 
1.7.1

_______________________________________________
discuss mailing list
discuss@xxxxxxxxxxxx
http://lists.corosync.org/mailman/listinfo/discuss


[Index of Archives]     [Linux Clusters]     [Corosync Project]     [Linux USB Devel]     [Linux Audio Users]     [Photo]     [Yosemite News]    [Yosemite Photos]    [Linux Kernel]     [Linux SCSI]     [X.Org]

  Powered by Linux