Re: [PATCH RESEND 8/8] dlm: slow down filling up processing queue

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Tue, Oct 10, 2023 at 06:04:48PM -0400, Alexander Aring wrote:
> If there is a burst of message the receive worker will filling up the
> processing queue but where are too slow to process dlm messages. This
> patch will slow down the receiver worker to keep the buffer on the
> socket layer to tell the sender to backoff. This is done by a threshold
> to get the next buffers from the socket after all messages were
> processed done by a flush_workqueue(). This however only occurs when we
> have a message burst when we e.g. create 1 million locks. If we put more
> and more new messages to process in the processqueue we will soon run out
> of memory.
> 
> Signed-off-by: Alexander Aring <aahringo@xxxxxxxxxx>
> ---
>  fs/dlm/lowcomms.c | 12 ++++++++++++
>  1 file changed, 12 insertions(+)
> 
> diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
> index f7bc22e74db2..67f8dd8a05ef 100644
> --- a/fs/dlm/lowcomms.c
> +++ b/fs/dlm/lowcomms.c
> @@ -63,6 +63,7 @@
>  #include "config.h"
>  
>  #define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000)
> +#define DLM_MAX_PROCESS_BUFFERS 24
>  #define NEEDED_RMEM (4*1024*1024)
>  
>  struct connection {
> @@ -194,6 +195,7 @@ static const struct dlm_proto_ops *dlm_proto_ops;
>  #define DLM_IO_END 1
>  #define DLM_IO_EOF 2
>  #define DLM_IO_RESCHED 3
> +#define DLM_IO_FLUSH 4
>  
>  static void process_recv_sockets(struct work_struct *work);
>  static void process_send_sockets(struct work_struct *work);
> @@ -202,6 +204,7 @@ static void process_dlm_messages(struct work_struct *work);
>  static DECLARE_WORK(process_work, process_dlm_messages);
>  static DEFINE_SPINLOCK(processqueue_lock);
>  static bool process_dlm_messages_pending;
> +static atomic_t processqueue_count;
>  static LIST_HEAD(processqueue);
>  
>  bool dlm_lowcomms_is_running(void)
> @@ -874,6 +877,7 @@ static void process_dlm_messages(struct work_struct *work)
>  	}
>  
>  	list_del(&pentry->list);
> +	atomic_dec(&processqueue_count);
>  	spin_unlock(&processqueue_lock);
>  
>  	for (;;) {
> @@ -891,6 +895,7 @@ static void process_dlm_messages(struct work_struct *work)
>  		}
>  
>  		list_del(&pentry->list);
> +		atomic_dec(&processqueue_count);
>  		spin_unlock(&processqueue_lock);
>  	}
>  }
> @@ -962,6 +967,7 @@ static int receive_from_sock(struct connection *con, int buflen)
>  		con->rx_leftover);
>  
>  	spin_lock(&processqueue_lock);
> +	ret = atomic_inc_return(&processqueue_count);
>  	list_add_tail(&pentry->list, &processqueue);
>  	if (!process_dlm_messages_pending) {
>  		process_dlm_messages_pending = true;
> @@ -969,6 +975,9 @@ static int receive_from_sock(struct connection *con, int buflen)
>  	}
>  	spin_unlock(&processqueue_lock);
>  
> +	if (ret > DLM_MAX_PROCESS_BUFFERS)
> +		return DLM_IO_FLUSH;
> +
>  	return DLM_IO_SUCCESS;
>  }
>  
> @@ -1503,6 +1512,9 @@ static void process_recv_sockets(struct work_struct *work)
>  		wake_up(&con->shutdown_wait);
>  		/* CF_RECV_PENDING cleared */
>  		break;
> +	case DLM_IO_FLUSH:
> +		flush_workqueue(process_workqueue);
> +		fallthrough;
>  	case DLM_IO_RESCHED:
>  		cond_resched();
>  		queue_work(io_workqueue, &con->rwork);
> -- 
> 2.39.3
> 

<formletter>

This is not the correct way to submit patches for inclusion in the
stable kernel tree.  Please read:
    https://www.kernel.org/doc/html/latest/process/stable-kernel-rules.html
for how to do this properly.

</formletter>



[Index of Archives]     [Linux Kernel]     [Kernel Development Newbies]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite Hiking]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux