On Tue, Oct 10, 2023 at 06:04:48PM -0400, Alexander Aring wrote: > If there is a burst of messages, the receive worker will fill up the > processing queue but be too slow to process dlm messages. This > patch will slow down the receiver worker to keep the buffer on the > socket layer to tell the sender to backoff. This is done by a threshold > to get the next buffers from the socket after all messages were > processed, which is done by a flush_workqueue(). However, this only occurs when we > have a message burst, e.g. when we create 1 million locks. If we put more > and more new messages to process in the processqueue we will soon run out > of memory. > > Signed-off-by: Alexander Aring <aahringo@xxxxxxxxxx> > --- > fs/dlm/lowcomms.c | 12 ++++++++++++ > 1 file changed, 12 insertions(+) > > diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c > index f7bc22e74db2..67f8dd8a05ef 100644 > --- a/fs/dlm/lowcomms.c > +++ b/fs/dlm/lowcomms.c > @@ -63,6 +63,7 @@ > #include "config.h" > > #define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000) > +#define DLM_MAX_PROCESS_BUFFERS 24 > #define NEEDED_RMEM (4*1024*1024) > > struct connection { > @@ -194,6 +195,7 @@ static const struct dlm_proto_ops *dlm_proto_ops; > #define DLM_IO_END 1 > #define DLM_IO_EOF 2 > #define DLM_IO_RESCHED 3 > +#define DLM_IO_FLUSH 4 > > static void process_recv_sockets(struct work_struct *work); > static void process_send_sockets(struct work_struct *work); > @@ -202,6 +204,7 @@ static void process_dlm_messages(struct work_struct *work); > static DECLARE_WORK(process_work, process_dlm_messages); > static DEFINE_SPINLOCK(processqueue_lock); > static bool process_dlm_messages_pending; > +static atomic_t processqueue_count; > static LIST_HEAD(processqueue); > > bool dlm_lowcomms_is_running(void) > @@ -874,6 +877,7 @@ static void process_dlm_messages(struct work_struct *work) > } > > list_del(&pentry->list); > + atomic_dec(&processqueue_count); > spin_unlock(&processqueue_lock); > > for (;;) { > @@ -891,6 
+895,7 @@ static void process_dlm_messages(struct work_struct *work) > } > > list_del(&pentry->list); > + atomic_dec(&processqueue_count); > spin_unlock(&processqueue_lock); > } > } > @@ -962,6 +967,7 @@ static int receive_from_sock(struct connection *con, int buflen) > con->rx_leftover); > > spin_lock(&processqueue_lock); > + ret = atomic_inc_return(&processqueue_count); > list_add_tail(&pentry->list, &processqueue); > if (!process_dlm_messages_pending) { > process_dlm_messages_pending = true; > @@ -969,6 +975,9 @@ static int receive_from_sock(struct connection *con, int buflen) > } > spin_unlock(&processqueue_lock); > > + if (ret > DLM_MAX_PROCESS_BUFFERS) > + return DLM_IO_FLUSH; > + > return DLM_IO_SUCCESS; > } > > @@ -1503,6 +1512,9 @@ static void process_recv_sockets(struct work_struct *work) > wake_up(&con->shutdown_wait); > /* CF_RECV_PENDING cleared */ > break; > + case DLM_IO_FLUSH: > + flush_workqueue(process_workqueue); > + fallthrough; > case DLM_IO_RESCHED: > cond_resched(); > queue_work(io_workqueue, &con->rwork); > -- > 2.39.3 > <formletter> This is not the correct way to submit patches for inclusion in the stable kernel tree. Please read: https://www.kernel.org/doc/html/latest/process/stable-kernel-rules.html for how to do this properly. </formletter>