[PATCH dlm/next 12/12] fs: dlm: add send ack threshold and append acks to msgs

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This patch changes when we send an ack back to tell the other side
that it can free messages because they have arrived on the receiver
node. Because of random reconnects, e.g. TCP resets, this is also
handled at the application layer so that DLM does not run into a
deadlock state.

The current handling has the following problems:

1. We end up in situations where we send out only a 16-byte ack
   message and no other messages, whereas DLM has logic to combine as
   many messages as it can into one send() socket call. This behaviour
   can be observed with "trace-cmd start -e dlm_recv" by watching for
   a ret field of 16 bytes.

2. When the processing of DLM messages never ends because we receive
   a lot of messages, we never send an ack back, as that only happens
   when the processing loop ends.

This patch introduces a likely and an unlikely threshold case. The
likely case sends an ack back on the transmit path once the number of
messages delivered to the upper layer protocol exceeds the threshold.
This solves issue 1 because the ack is sent when another normal DLM
message is sent. It solves issue 2 because it is not part of the
processing loop.

There is, however, an unlikely case. It has a bigger threshold and is
triggered when we only receive messages and do not send any message
back. This case prevents the sending node from keeping a lot of
messages for a long time, as we still occasionally send acks back to
tell the sender to finally release them.

The atomic cmpxchg() is there to couple the sending of an ack
atomically with the reset of the upper layer protocol delivery counter.

Signed-off-by: Alexander Aring <aahringo@xxxxxxxxxx>
---
 fs/dlm/lowcomms.c | 30 -------------------
 fs/dlm/midcomms.c | 76 +++++++++++++++++++----------------------------
 2 files changed, 31 insertions(+), 75 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 345a316ae54c..68092f953830 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -860,30 +860,8 @@ struct dlm_processed_nodes {
 	struct list_head list;
 };
 
-static void add_processed_node(int nodeid, struct list_head *processed_nodes)
-{
-	struct dlm_processed_nodes *n;
-
-	list_for_each_entry(n, processed_nodes, list) {
-		/* we already remembered this node */
-		if (n->nodeid == nodeid)
-			return;
-	}
-
-	/* if it's fails in worst case we simple don't send an ack back.
-	 * We try it next time.
-	 */
-	n = kmalloc(sizeof(*n), GFP_NOFS);
-	if (!n)
-		return;
-
-	n->nodeid = nodeid;
-	list_add(&n->list, processed_nodes);
-}
-
 static void process_dlm_messages(struct work_struct *work)
 {
-	struct dlm_processed_nodes *n, *n_tmp;
 	struct processqueue_entry *pentry;
 	LIST_HEAD(processed_nodes);
 
@@ -902,7 +880,6 @@ static void process_dlm_messages(struct work_struct *work)
 	for (;;) {
 		dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
 					    pentry->buflen);
-		add_processed_node(pentry->nodeid, &processed_nodes);
 		free_processqueue_entry(pentry);
 
 		spin_lock(&processqueue_lock);
@@ -917,13 +894,6 @@ static void process_dlm_messages(struct work_struct *work)
 		list_del(&pentry->list);
 		spin_unlock(&processqueue_lock);
 	}
-
-	/* send ack back after we processed couple of messages */
-	list_for_each_entry_safe(n, n_tmp, &processed_nodes, list) {
-		list_del(&n->list);
-		dlm_midcomms_receive_done(n->nodeid);
-		kfree(n);
-	}
 }
 
 /* Data received from remote end */
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 70286737eb0a..e1a0df67b566 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -148,6 +148,8 @@
 /* 5 seconds wait to sync ending of dlm */
 #define DLM_SHUTDOWN_TIMEOUT	msecs_to_jiffies(5000)
 #define DLM_VERSION_NOT_SET	0
+#define DLM_SEND_ACK_BACK_MSG_THRESHOLD 32
+#define DLM_RECV_ACK_BACK_MSG_THRESHOLD (DLM_SEND_ACK_BACK_MSG_THRESHOLD * 8)
 
 struct midcomms_node {
 	int nodeid;
@@ -165,7 +167,7 @@ struct midcomms_node {
 #define DLM_NODE_FLAG_CLOSE	1
 #define DLM_NODE_FLAG_STOP_TX	2
 #define DLM_NODE_FLAG_STOP_RX	3
-#define DLM_NODE_ULP_DELIVERED	4
+	atomic_t ulp_delivered;
 	unsigned long flags;
 	wait_queue_head_t shutdown_wait;
 
@@ -319,6 +321,7 @@ static void midcomms_node_reset(struct midcomms_node *node)
 
 	atomic_set(&node->seq_next, DLM_SEQ_INIT);
 	atomic_set(&node->seq_send, DLM_SEQ_INIT);
+	atomic_set(&node->ulp_delivered, 0);
 	node->version = DLM_VERSION_NOT_SET;
 	node->flags = 0;
 
@@ -393,6 +396,28 @@ static int dlm_send_ack(int nodeid, uint32_t seq)
 	return 0;
 }
 
+static void dlm_send_ack_threshold(struct midcomms_node *node,
+				   uint32_t threshold)
+{
+	uint32_t oval, nval;
+	bool send_ack;
+
+	/* only the caller whose cmpxchg resets the counter sends the ack */
+	do {
+		oval = atomic_read(&node->ulp_delivered);
+		send_ack = (oval > threshold);
+		/* nothing to do unless the counter exceeds the threshold */
+		if (!send_ack)
+			break;
+
+		nval = 0;
+		/* try to reset ulp_delivered; retry if it changed since the read */
+	} while (atomic_cmpxchg(&node->ulp_delivered, oval, nval) != oval);
+
+	if (send_ack)
+		dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
+}
+
 static int dlm_send_fin(struct midcomms_node *node,
 			void (*ack_rcv)(struct midcomms_node *node))
 {
@@ -560,7 +585,9 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
 			WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
 			dlm_receive_buffer_3_2_trace(seq, p);
 			dlm_receive_buffer(p, node->nodeid);
-			set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
+			atomic_inc(&node->ulp_delivered);
+			/* unlikely case to send ack back when we don't transmit */
+			dlm_send_ack_threshold(node, DLM_RECV_ACK_BACK_MSG_THRESHOLD);
 			break;
 		}
 	} else {
@@ -969,49 +996,6 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
 	return ret;
 }
 
-void dlm_midcomms_receive_done(int nodeid)
-{
-	struct midcomms_node *node;
-	int idx;
-
-	idx = srcu_read_lock(&nodes_srcu);
-	node = nodeid2node(nodeid, 0);
-	if (!node) {
-		srcu_read_unlock(&nodes_srcu, idx);
-		return;
-	}
-
-	/* old protocol, we do nothing */
-	switch (node->version) {
-	case DLM_VERSION_3_2:
-		break;
-	default:
-		srcu_read_unlock(&nodes_srcu, idx);
-		return;
-	}
-
-	/* do nothing if we didn't delivered stateful to ulp */
-	if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED,
-				&node->flags)) {
-		srcu_read_unlock(&nodes_srcu, idx);
-		return;
-	}
-
-	spin_lock(&node->state_lock);
-	/* we only ack if state is ESTABLISHED */
-	switch (node->state) {
-	case DLM_ESTABLISHED:
-		spin_unlock(&node->state_lock);
-		dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
-		break;
-	default:
-		spin_unlock(&node->state_lock);
-		/* do nothing FIN has it's own ack send */
-		break;
-	}
-	srcu_read_unlock(&nodes_srcu, idx);
-}
-
 void dlm_midcomms_unack_msg_resend(int nodeid)
 {
 	struct midcomms_node *node;
@@ -1142,6 +1126,8 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
 			goto err;
 		}
 
+		/* send ack back if necessary */
+		dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD);
 		break;
 	default:
 		dlm_free_mhandle(mh);
-- 
2.31.1




[Index of Archives]     [Linux Kernel]     [Kernel Development Newbies]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite Hiking]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux