[PATCH V2 net-next 2/2] net: qrtr: Add support for processing DEL_PROC type control message

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



For certain rproc's like modem, when it goes down and endpoint gets
un-registered, DEL_PROC control message gets forwarded to other
remote nodes. So remote nodes should listen on the message,
wakeup all local waiters waiting for tx_resume notifications
(which will never come) and also forward the message to all
local qrtr sockets like QMI etc. Adding the support here.

Introduced a new rx worker here, because endpoint_post can get called in
atomic contexts, but processing of DEL_PROC needs to acquire node
qrtr_tx mutex.

Signed-off-by: Sricharan Ramabadhran <quic_srichara@xxxxxxxxxxx>
---
  [v2] Fixed a sparse warning.

 include/uapi/linux/qrtr.h |  1 +
 net/qrtr/af_qrtr.c        | 65 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 66 insertions(+)

diff --git a/include/uapi/linux/qrtr.h b/include/uapi/linux/qrtr.h
index f7e2fb3d752b..1c920159d610 100644
--- a/include/uapi/linux/qrtr.h
+++ b/include/uapi/linux/qrtr.h
@@ -26,6 +26,7 @@ enum qrtr_pkt_type {
 	QRTR_TYPE_PING          = 9,
 	QRTR_TYPE_NEW_LOOKUP	= 10,
 	QRTR_TYPE_DEL_LOOKUP	= 11,
+	QRTR_TYPE_DEL_PROC	= 13,
 };
 
 struct qrtr_ctrl_pkt {
diff --git a/net/qrtr/af_qrtr.c b/net/qrtr/af_qrtr.c
index e5cf4245c3dc..016b946f308b 100644
--- a/net/qrtr/af_qrtr.c
+++ b/net/qrtr/af_qrtr.c
@@ -3,6 +3,7 @@
  * Copyright (c) 2015, Sony Mobile Communications Inc.
  * Copyright (c) 2013, The Linux Foundation. All rights reserved.
  */
+#include <linux/kthread.h>
 #include <linux/module.h>
 #include <linux/netlink.h>
 #include <linux/qrtr.h>
@@ -122,6 +123,9 @@ static DEFINE_XARRAY_ALLOC(qrtr_ports);
  * @qrtr_tx_lock: lock for qrtr_tx_flow inserts
  * @rx_queue: receive queue
  * @item: list item for broadcast list
+ * @kworker: worker thread for recv work
+ * @task: task to run the worker thread
+ * @read_data: scheduled work for recv work
  */
 struct qrtr_node {
 	struct mutex ep_lock;
@@ -134,6 +138,9 @@ struct qrtr_node {
 
 	struct sk_buff_head rx_queue;
 	struct list_head item;
+	struct kthread_worker kworker;
+	struct task_struct *task;
+	struct kthread_work read_data;
 };
 
 /**
@@ -186,6 +193,9 @@ static void __qrtr_node_release(struct kref *kref)
 	list_del(&node->item);
 	mutex_unlock(&qrtr_node_lock);
 
+	kthread_flush_worker(&node->kworker);
+	kthread_stop(node->task);
+
 	skb_queue_purge(&node->rx_queue);
 
 	/* Free tx flow counters */
@@ -526,6 +536,9 @@ int qrtr_endpoint_post(struct qrtr_endpoint *ep, const void *data, size_t len)
 
 	if (cb->type == QRTR_TYPE_RESUME_TX) {
 		qrtr_tx_resume(node, skb);
+	} else if (cb->type == QRTR_TYPE_DEL_PROC) {
+		skb_queue_tail(&node->rx_queue, skb);
+		kthread_queue_work(&node->kworker, &node->read_data);
 	} else {
 		ipc = qrtr_port_lookup(cb->dst_port);
 		if (!ipc)
@@ -574,6 +587,50 @@ static struct sk_buff *qrtr_alloc_ctrl_packet(struct qrtr_ctrl_pkt **pkt,
 	return skb;
 }
 
+/* Handle DEL_PROC control message */
+static void qrtr_node_rx_work(struct kthread_work *work)
+{
+	struct qrtr_node *node = container_of(work, struct qrtr_node,
+					      read_data);
+	struct radix_tree_iter iter;
+	struct qrtr_tx_flow *flow;
+	struct qrtr_ctrl_pkt *pkt;
+	struct qrtr_sock *ipc;
+	struct sk_buff *skb;
+	void __rcu **slot;
+
+	while ((skb = skb_dequeue(&node->rx_queue)) != NULL) {
+		struct qrtr_cb *cb = (struct qrtr_cb *)skb->cb;
+
+		ipc = qrtr_port_lookup(cb->dst_port);
+		if (!ipc) {
+			kfree_skb(skb);
+			continue;
+		}
+
+		if (cb->type == QRTR_TYPE_DEL_PROC) {
+			/* Free tx flow counters */
+			mutex_lock(&node->qrtr_tx_lock);
+			radix_tree_for_each_slot(slot, &node->qrtr_tx_flow, &iter, 0) {
+				flow = rcu_dereference_raw(*slot);
+				wake_up_interruptible_all(&flow->resume_tx);
+			}
+			mutex_unlock(&node->qrtr_tx_lock);
+
+			/* Translate DEL_PROC to BYE for local enqueue */
+			cb->type = QRTR_TYPE_BYE;
+			pkt = (struct qrtr_ctrl_pkt *)skb->data;
+			memset(pkt, 0, sizeof(*pkt));
+			pkt->cmd = cpu_to_le32(QRTR_TYPE_BYE);
+
+			if (sock_queue_rcv_skb(&ipc->sk, skb))
+				kfree_skb(skb);
+
+			qrtr_port_put(ipc);
+		}
+	}
+}
+
 /**
  * qrtr_endpoint_register() - register a new endpoint
  * @ep: endpoint to register
@@ -599,6 +656,14 @@ int qrtr_endpoint_register(struct qrtr_endpoint *ep, unsigned int nid)
 	node->nid = QRTR_EP_NID_AUTO;
 	node->ep = ep;
 
+	kthread_init_work(&node->read_data, qrtr_node_rx_work);
+	kthread_init_worker(&node->kworker);
+	node->task = kthread_run(kthread_worker_fn, &node->kworker, "qrtr_rx");
+	if (IS_ERR(node->task)) {
+		kfree(node);
+		return -ENOMEM;
+	}
+
 	INIT_RADIX_TREE(&node->qrtr_tx_flow, GFP_KERNEL);
 	mutex_init(&node->qrtr_tx_lock);
 
-- 
2.34.1




[Index of Archives]     [Linux ARM Kernel]     [Linux ARM]     [Linux Omap]     [Fedora ARM]     [Linux for Sparc]     [IETF Annouce]     [Security]     [Bugtraq]     [Linux MIPS]     [ECOS]     [Asterisk Internet PBX]     [Linux API]

  Powered by Linux