Currently seq_next is only be read on the receive side which processed in an ordered way. The seq_send is being protected by locks. To being able to read the seq_next value on send side as well we convert it to an atomic_t value. The atomic_cmpxchg() is probably not necessary, however the atomic_inc() depends on a if coniditional and this should be handled in an atomic context. Signed-off-by: Alexander Aring <aahringo@xxxxxxxxxx> --- fs/dlm/midcomms.c | 40 +++++++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c index 9c66cb853d17..70286737eb0a 100644 --- a/fs/dlm/midcomms.c +++ b/fs/dlm/midcomms.c @@ -152,8 +152,8 @@ struct midcomms_node { int nodeid; uint32_t version; - uint32_t seq_send; - uint32_t seq_next; + atomic_t seq_send; + atomic_t seq_next; /* These queues are unbound because we cannot drop any message in dlm. * We could send a fence signal for a specific node to the cluster * manager if queues hits some maximum value, however this handling @@ -317,8 +317,8 @@ static void midcomms_node_reset(struct midcomms_node *node) { pr_debug("reset node %d\n", node->nodeid); - node->seq_next = DLM_SEQ_INIT; - node->seq_send = DLM_SEQ_INIT; + atomic_set(&node->seq_next, DLM_SEQ_INIT); + atomic_set(&node->seq_send, DLM_SEQ_INIT); node->version = DLM_VERSION_NOT_SET; node->flags = 0; @@ -492,9 +492,19 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, struct midcomms_node *node, uint32_t seq) { - if (seq == node->seq_next) { - node->seq_next++; + bool is_expected_seq; + uint32_t oval, nval; + do { + oval = atomic_read(&node->seq_next); + is_expected_seq = (oval == seq); + if (!is_expected_seq) + break; + + nval = oval + 1; + } while (atomic_cmpxchg(&node->seq_next, oval, nval) != oval); + + if (is_expected_seq) { switch (p->header.h_cmd) { case DLM_FIN: spin_lock(&node->state_lock); @@ -503,7 +513,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, switch (node->state) { case DLM_ESTABLISHED: - dlm_send_ack(node->nodeid, node->seq_next); + dlm_send_ack(node->nodeid, nval); /* passive shutdown DLM_LAST_ACK case 1 * additional we check if the node is used by @@ -522,14 +532,14 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, } break; case DLM_FIN_WAIT1: - dlm_send_ack(node->nodeid, node->seq_next); + dlm_send_ack(node->nodeid, nval); node->state = DLM_CLOSING; set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags); pr_debug("switch node %d to state %s\n", node->nodeid, dlm_state_str(node->state)); break; case DLM_FIN_WAIT2: - dlm_send_ack(node->nodeid, node->seq_next); + dlm_send_ack(node->nodeid, nval); midcomms_node_reset(node); pr_debug("switch node %d to state %s\n", node->nodeid, dlm_state_str(node->state)); @@ -557,11 +567,11 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, /* retry to ack message which we already have by sending back * current node->seq_next number as ack. */ - if (seq < node->seq_next) - dlm_send_ack(node->nodeid, node->seq_next); + if (seq < oval) + dlm_send_ack(node->nodeid, oval); log_print_ratelimited("ignore dlm msg because seq mismatch, seq: %u, expected: %u, nodeid: %d", - seq, node->seq_next, node->nodeid); + seq, oval, node->nodeid); } } @@ -992,7 +1002,7 @@ void dlm_midcomms_receive_done(int nodeid) switch (node->state) { case DLM_ESTABLISHED: spin_unlock(&node->state_lock); - dlm_send_ack(node->nodeid, node->seq_next); + dlm_send_ack(node->nodeid, atomic_read(&node->seq_next)); break; default: spin_unlock(&node->state_lock); @@ -1058,7 +1068,7 @@ static void midcomms_new_msg_cb(void *data) list_add_tail_rcu(&mh->list, &mh->node->send_queue); spin_unlock_bh(&mh->node->send_queue_lock); - mh->seq = mh->node->seq_send++; + mh->seq = atomic_fetch_inc(&mh->node->seq_send); } static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid, @@ -1530,7 +1540,7 @@ static void midcomms_new_rawmsg_cb(void *data) switch (h->h_cmd) { case DLM_OPTS: if (!h->u.h_seq) - h->u.h_seq = cpu_to_le32(rd->node->seq_send++); + h->u.h_seq = cpu_to_le32(atomic_fetch_inc(&rd->node->seq_send)); break; default: break; -- 2.31.1