For certain remote processors (rprocs) such as the modem, when the rproc goes down and its endpoint gets unregistered, a DEL_PROC control message is forwarded to the other remote nodes. Remote nodes should therefore listen for this message, wake up all local waiters that are waiting for tx_resume notifications (which will never arrive), and also forward the message to all local qrtr sockets such as QMI. Add support for this. A new rx worker is introduced because qrtr_endpoint_post() can be called in atomic context, whereas processing DEL_PROC needs to acquire the node's qrtr_tx_lock mutex. Signed-off-by: Sricharan Ramabadhran <quic_srichara@xxxxxxxxxxx> --- Right now DEL_PROC is sent only by some legacy targets; the latest targets use only BYE signalling, and only for local observers. Later, that needs to be changed to a broadcast that goes through the same DEL_PROC processing. include/uapi/linux/qrtr.h | 1 + net/qrtr/af_qrtr.c | 65 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+) diff --git a/include/uapi/linux/qrtr.h b/include/uapi/linux/qrtr.h index f7e2fb3..1c92015 100644 --- a/include/uapi/linux/qrtr.h +++ b/include/uapi/linux/qrtr.h @@ -26,6 +26,7 @@ enum qrtr_pkt_type { QRTR_TYPE_PING = 9, QRTR_TYPE_NEW_LOOKUP = 10, QRTR_TYPE_DEL_LOOKUP = 11, + QRTR_TYPE_DEL_PROC = 13, }; struct qrtr_ctrl_pkt { diff --git a/net/qrtr/af_qrtr.c b/net/qrtr/af_qrtr.c index 26197a0..426cea0 100644 --- a/net/qrtr/af_qrtr.c +++ b/net/qrtr/af_qrtr.c @@ -3,6 +3,7 @@ * Copyright (c) 2015, Sony Mobile Communications Inc. * Copyright (c) 2013, The Linux Foundation. All rights reserved. 
*/ +#include <linux/kthread.h> #include <linux/module.h> #include <linux/netlink.h> #include <linux/qrtr.h> @@ -122,6 +123,9 @@ static DEFINE_XARRAY_ALLOC(qrtr_ports); * @qrtr_tx_lock: lock for qrtr_tx_flow inserts * @rx_queue: receive queue * @item: list item for broadcast list + * @kworker: worker thread for recv work + * @task: task to run the worker thread + * @read_data: scheduled work for recv work */ struct qrtr_node { struct mutex ep_lock; @@ -134,6 +138,9 @@ struct qrtr_node { struct sk_buff_head rx_queue; struct list_head item; + struct kthread_worker kworker; + struct task_struct *task; + struct kthread_work read_data; }; /** @@ -186,6 +193,9 @@ static void __qrtr_node_release(struct kref *kref) list_del(&node->item); mutex_unlock(&qrtr_node_lock); + kthread_flush_worker(&node->kworker); + kthread_stop(node->task); + skb_queue_purge(&node->rx_queue); /* Free tx flow counters */ @@ -526,6 +536,9 @@ int qrtr_endpoint_post(struct qrtr_endpoint *ep, const void *data, size_t len) if (cb->type == QRTR_TYPE_RESUME_TX) { qrtr_tx_resume(node, skb); + } else if (cb->type == QRTR_TYPE_DEL_PROC) { + skb_queue_tail(&node->rx_queue, skb); + kthread_queue_work(&node->kworker, &node->read_data); } else { ipc = qrtr_port_lookup(cb->dst_port); if (!ipc) @@ -574,6 +587,50 @@ static struct sk_buff *qrtr_alloc_ctrl_packet(struct qrtr_ctrl_pkt **pkt, return skb; } +/* Handle DEL_PROC control message */ +static void qrtr_node_rx_work(struct kthread_work *work) +{ + struct qrtr_node *node = container_of(work, struct qrtr_node, + read_data); + struct qrtr_ctrl_pkt *pkt; + void __rcu **slot; + struct radix_tree_iter iter; + struct qrtr_tx_flow *flow; + struct sk_buff *skb; + struct qrtr_sock *ipc; + + while ((skb = skb_dequeue(&node->rx_queue)) != NULL) { + struct qrtr_cb *cb = (struct qrtr_cb *)skb->cb; + + ipc = qrtr_port_lookup(cb->dst_port); + if (!ipc) { + kfree_skb(skb); + continue; + } + + if (cb->type == QRTR_TYPE_DEL_PROC) { + /* Free tx flow counters */ + 
mutex_lock(&node->qrtr_tx_lock); + radix_tree_for_each_slot(slot, &node->qrtr_tx_flow, &iter, 0) { + flow = *slot; + wake_up_interruptible_all(&flow->resume_tx); + } + mutex_unlock(&node->qrtr_tx_lock); + + /* Translate DEL_PROC to BYE for local enqueue */ + cb->type = QRTR_TYPE_BYE; + pkt = (struct qrtr_ctrl_pkt *)skb->data; + memset(pkt, 0, sizeof(*pkt)); + pkt->cmd = cpu_to_le32(QRTR_TYPE_BYE); + + if (sock_queue_rcv_skb(&ipc->sk, skb)) + kfree_skb(skb); + + qrtr_port_put(ipc); + } + } +} + /** * qrtr_endpoint_register() - register a new endpoint * @ep: endpoint to register @@ -599,6 +656,14 @@ int qrtr_endpoint_register(struct qrtr_endpoint *ep, unsigned int nid) node->nid = QRTR_EP_NID_AUTO; node->ep = ep; + kthread_init_work(&node->read_data, qrtr_node_rx_work); + kthread_init_worker(&node->kworker); + node->task = kthread_run(kthread_worker_fn, &node->kworker, "qrtr_rx"); + if (IS_ERR(node->task)) { + kfree(node); + return -ENOMEM; + } + INIT_RADIX_TREE(&node->qrtr_tx_flow, GFP_KERNEL); mutex_init(&node->qrtr_tx_lock); -- 2.7.4