LKML Archive on lore.kernel.org
help / color / mirror / Atom feed
* [PATCH 0/5] QRTR flow control improvements
@ 2019-05-08  6:06 Bjorn Andersson
  2019-05-08  6:06 ` [PATCH 1/5] net: qrtr: Move resume-tx transmission to recvmsg Bjorn Andersson
                   ` (4 more replies)
  0 siblings, 5 replies; 9+ messages in thread
From: Bjorn Andersson @ 2019-05-08  6:06 UTC (permalink / raw)
  To: David S. Miller, Arun Kumar Neelakantam, Chris Lew
  Cc: netdev, linux-kernel, linux-arm-msm

In order to prevent overconsumption of resources on the remote side QRTR
implements a flow control mechanism.

Move the handling of the incoming confirm_rx to the receiving process to ensure
incoming flow is controlled. Then implement outgoing flow control, using the
recommended algorithm of counting outstanding non-confirmed messages and
blocking when hitting a limit. The last three patches refactor the node
assignment and port lookup, in order to remove the worker in the receive path.

Bjorn Andersson (5):
  net: qrtr: Move resume-tx transmission to recvmsg
  net: qrtr: Implement outgoing flow control
  net: qrtr: Migrate node lookup tree to spinlock
  net: qrtr: Make qrtr_port_lookup() use RCU
  net: qrtr: Remove receive worker

 net/qrtr/qrtr.c | 265 +++++++++++++++++++++++++++++++++++-------------
 1 file changed, 196 insertions(+), 69 deletions(-)

-- 
2.18.0


^ permalink raw reply	[flat|nested] 9+ messages in thread

* [PATCH 1/5] net: qrtr: Move resume-tx transmission to recvmsg
  2019-05-08  6:06 [PATCH 0/5] QRTR flow control improvements Bjorn Andersson
@ 2019-05-08  6:06 ` Bjorn Andersson
  2019-05-09 10:23   ` Arun Kumar Neelakantam
  2019-05-08  6:06 ` [PATCH 2/5] net: qrtr: Implement outgoing flow control Bjorn Andersson
                   ` (3 subsequent siblings)
  4 siblings, 1 reply; 9+ messages in thread
From: Bjorn Andersson @ 2019-05-08  6:06 UTC (permalink / raw)
  To: David S. Miller, Arun Kumar Neelakantam, Chris Lew
  Cc: netdev, linux-kernel, linux-arm-msm

The confirm-rx bit is used to implement a per port flow control, in
order to make sure that no messages are dropped due to resource
exhaustion. Move the resume-tx transmission to recvmsg to only confirm
messages as they are consumed by the application.

Signed-off-by: Bjorn Andersson <bjorn.andersson@linaro.org>
---
 net/qrtr/qrtr.c | 60 +++++++++++++++++++++++++++----------------------
 1 file changed, 33 insertions(+), 27 deletions(-)

diff --git a/net/qrtr/qrtr.c b/net/qrtr/qrtr.c
index dd0e97f4f6c0..07a35362fba2 100644
--- a/net/qrtr/qrtr.c
+++ b/net/qrtr/qrtr.c
@@ -369,22 +369,11 @@ static void qrtr_port_put(struct qrtr_sock *ipc);
 static void qrtr_node_rx_work(struct work_struct *work)
 {
 	struct qrtr_node *node = container_of(work, struct qrtr_node, work);
-	struct qrtr_ctrl_pkt *pkt;
-	struct sockaddr_qrtr dst;
-	struct sockaddr_qrtr src;
 	struct sk_buff *skb;
 
 	while ((skb = skb_dequeue(&node->rx_queue)) != NULL) {
 		struct qrtr_sock *ipc;
-		struct qrtr_cb *cb;
-		int confirm;
-
-		cb = (struct qrtr_cb *)skb->cb;
-		src.sq_node = cb->src_node;
-		src.sq_port = cb->src_port;
-		dst.sq_node = cb->dst_node;
-		dst.sq_port = cb->dst_port;
-		confirm = !!cb->confirm_rx;
+		struct qrtr_cb *cb = (struct qrtr_cb *)skb->cb;
 
 		qrtr_node_assign(node, cb->src_node);
 
@@ -397,20 +386,6 @@ static void qrtr_node_rx_work(struct work_struct *work)
 
 			qrtr_port_put(ipc);
 		}
-
-		if (confirm) {
-			skb = qrtr_alloc_ctrl_packet(&pkt);
-			if (!skb)
-				break;
-
-			pkt->cmd = cpu_to_le32(QRTR_TYPE_RESUME_TX);
-			pkt->client.node = cpu_to_le32(dst.sq_node);
-			pkt->client.port = cpu_to_le32(dst.sq_port);
-
-			if (qrtr_node_enqueue(node, skb, QRTR_TYPE_RESUME_TX,
-					      &dst, &src))
-				break;
-		}
 	}
 }
 
@@ -822,6 +797,34 @@ static int qrtr_sendmsg(struct socket *sock, struct msghdr *msg, size_t len)
 	return rc;
 }
 
+static int qrtr_resume_tx(struct qrtr_cb *cb)
+{
+	struct sockaddr_qrtr remote = { AF_QIPCRTR, cb->src_node, cb->src_port };
+	struct sockaddr_qrtr local = { AF_QIPCRTR, cb->dst_node, cb->dst_port };
+	struct qrtr_ctrl_pkt *pkt;
+	struct qrtr_node *node;
+	struct sk_buff *skb;
+	int ret;
+
+	node = qrtr_node_lookup(remote.sq_node);
+	if (!node)
+		return -EINVAL;
+
+	skb = qrtr_alloc_ctrl_packet(&pkt);
+	if (!skb)
+		return -ENOMEM;
+
+	pkt->cmd = cpu_to_le32(QRTR_TYPE_RESUME_TX);
+	pkt->client.node = cpu_to_le32(cb->dst_node);
+	pkt->client.port = cpu_to_le32(cb->dst_port);
+
+	ret = qrtr_node_enqueue(node, skb, QRTR_TYPE_RESUME_TX, &local, &remote);
+
+	qrtr_node_release(node);
+
+	return ret;
+}
+
 static int qrtr_recvmsg(struct socket *sock, struct msghdr *msg,
 			size_t size, int flags)
 {
@@ -844,6 +847,7 @@ static int qrtr_recvmsg(struct socket *sock, struct msghdr *msg,
 		release_sock(sk);
 		return rc;
 	}
+	cb = (struct qrtr_cb *)skb->cb;
 
 	copied = skb->len;
 	if (copied > size) {
@@ -857,7 +861,6 @@ static int qrtr_recvmsg(struct socket *sock, struct msghdr *msg,
 	rc = copied;
 
 	if (addr) {
-		cb = (struct qrtr_cb *)skb->cb;
 		addr->sq_family = AF_QIPCRTR;
 		addr->sq_node = cb->src_node;
 		addr->sq_port = cb->src_port;
@@ -865,6 +868,9 @@ static int qrtr_recvmsg(struct socket *sock, struct msghdr *msg,
 	}
 
 out:
+	if (cb->confirm_rx)
+		qrtr_resume_tx(cb);
+
 	skb_free_datagram(sk, skb);
 	release_sock(sk);
 
-- 
2.18.0


^ permalink raw reply	[flat|nested] 9+ messages in thread

* [PATCH 2/5] net: qrtr: Implement outgoing flow control
  2019-05-08  6:06 [PATCH 0/5] QRTR flow control improvements Bjorn Andersson
  2019-05-08  6:06 ` [PATCH 1/5] net: qrtr: Move resume-tx transmission to recvmsg Bjorn Andersson
@ 2019-05-08  6:06 ` Bjorn Andersson
  2019-05-08 16:41   ` David Miller
  2019-05-13 21:26   ` Chris Lew
  2019-05-08  6:06 ` [PATCH 3/5] net: qrtr: Migrate node lookup tree to spinlock Bjorn Andersson
                   ` (2 subsequent siblings)
  4 siblings, 2 replies; 9+ messages in thread
From: Bjorn Andersson @ 2019-05-08  6:06 UTC (permalink / raw)
  To: David S. Miller, Arun Kumar Neelakantam, Chris Lew
  Cc: netdev, linux-kernel, linux-arm-msm

In order to prevent overconsumption of resources on the remote side QRTR
implements a flow control mechanism.

The mechanism works by the sender keeping track of the number of
outstanding unconfirmed messages that have been transmitted to a
particular node/port pair.

Upon the count reaching a low watermark (L) the confirm_rx bit is set in
the outgoing message, and when the count reaches a high watermark (H)
transmission will be blocked until the reception of a resume_tx message
from the remote, which resets the counter to 0.

This guarantees that there will be at most 2H - L messages in flight.
Values chosen for L and H are 5 and 10 respectively.

Signed-off-by: Bjorn Andersson <bjorn.andersson@linaro.org>
---
 net/qrtr/qrtr.c | 143 +++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 136 insertions(+), 7 deletions(-)

diff --git a/net/qrtr/qrtr.c b/net/qrtr/qrtr.c
index 07a35362fba2..62abd622618d 100644
--- a/net/qrtr/qrtr.c
+++ b/net/qrtr/qrtr.c
@@ -16,6 +16,7 @@
 #include <linux/qrtr.h>
 #include <linux/termios.h>	/* For TIOCINQ/OUTQ */
 #include <linux/numa.h>
+#include <linux/wait.h>
 
 #include <net/sock.h>
 
@@ -121,6 +122,9 @@ static DEFINE_MUTEX(qrtr_port_lock);
  * @ep: endpoint
  * @ref: reference count for node
  * @nid: node id
+ * @qrtr_tx_flow: tree with tx counts per flow
+ * @resume_tx: waiters for a resume tx from the remote
+ * @qrtr_tx_lock: lock for qrtr_tx_flow
  * @rx_queue: receive queue
  * @work: scheduled work struct for recv work
  * @item: list item for broadcast list
@@ -131,11 +135,26 @@ struct qrtr_node {
 	struct kref ref;
 	unsigned int nid;
 
+	struct radix_tree_root qrtr_tx_flow;
+	struct wait_queue_head resume_tx;
+	struct mutex qrtr_tx_lock; /* for qrtr_tx_flow */
+
 	struct sk_buff_head rx_queue;
 	struct work_struct work;
 	struct list_head item;
 };
 
+/**
+ * struct qrtr_tx_flow - tx flow control
+ * @pending: number of messages sent on this flow since the last resume_tx
+ */
+struct qrtr_tx_flow {
+	atomic_t pending;
+};
+
+#define QRTR_TX_FLOW_HIGH	10
+#define QRTR_TX_FLOW_LOW	5
+
 static int qrtr_local_enqueue(struct qrtr_node *node, struct sk_buff *skb,
 			      int type, struct sockaddr_qrtr *from,
 			      struct sockaddr_qrtr *to);
@@ -150,7 +169,9 @@ static int qrtr_bcast_enqueue(struct qrtr_node *node, struct sk_buff *skb,
  */
 static void __qrtr_node_release(struct kref *kref)
 {
+	struct radix_tree_iter iter;
 	struct qrtr_node *node = container_of(kref, struct qrtr_node, ref);
+	void __rcu **slot;
 
 	if (node->nid != QRTR_EP_NID_AUTO)
 		radix_tree_delete(&qrtr_nodes, node->nid);
@@ -158,6 +179,12 @@ static void __qrtr_node_release(struct kref *kref)
 	list_del(&node->item);
 	mutex_unlock(&qrtr_node_lock);
 
+	/* Free tx flow counters */
+	radix_tree_for_each_slot(slot, &node->qrtr_tx_flow, &iter, 0) {
+		radix_tree_iter_delete(&node->qrtr_tx_flow, &iter, slot);
+		kfree(*slot);
+	}
+
 	skb_queue_purge(&node->rx_queue);
 	kfree(node);
 }
@@ -178,15 +205,106 @@ static void qrtr_node_release(struct qrtr_node *node)
 	kref_put_mutex(&node->ref, __qrtr_node_release, &qrtr_node_lock);
 }
 
+/**
+ * qrtr_tx_resume() - reset flow control counter
+ * @node:	qrtr_node that the QRTR_TYPE_RESUME_TX packet arrived on
+ * @skb:	resume_tx packet
+ */
+static void qrtr_tx_resume(struct qrtr_node *node, struct sk_buff *skb)
+{
+	struct qrtr_ctrl_pkt *pkt = (struct qrtr_ctrl_pkt *)skb->data;
+	struct qrtr_tx_flow *flow;
+	unsigned long key;
+	u64 remote_node = le32_to_cpu(pkt->client.node);
+	u32 remote_port = le32_to_cpu(pkt->client.port);
+
+	key = remote_node << 32 | remote_port;
+
+	flow = radix_tree_lookup(&node->qrtr_tx_flow, key);
+	if (flow)
+		atomic_set(&flow->pending, 0);
+
+	wake_up_interruptible_all(&node->resume_tx);
+
+	consume_skb(skb);
+}
+
+/**
+ * qrtr_tx_wait() - flow control for outgoing packets
+ * @node:	qrtr_node that the packet is to be send to
+ * @dest_node:	node id of the destination
+ * @dest_port:	port number of the destination
+ * @type:	type of message
+ *
+ * The flow control scheme is based around the low and high "watermarks". When
+ * the low watermark is passed the confirm_rx flag is set on the outgoing
+ * message, which will trigger the remote to send a control message of the type
+ * QRTR_TYPE_RESUME_TX to reset the counter. If the high watermark is hit
+ * further transmission should be paused.
+ *
+ * Return: 1 if confirm_rx should be set, 0 otherwise, or a negative errno on failure
+ */
+static int qrtr_tx_wait(struct qrtr_node *node, int dest_node, int dest_port,
+			int type)
+{
+	struct qrtr_tx_flow *flow;
+	unsigned long key = (u64)dest_node << 32 | dest_port;
+	int confirm_rx = 0;
+	int ret;
+
+	/* Never set confirm_rx on non-data packets */
+	if (type != QRTR_TYPE_DATA)
+		return 0;
+
+	mutex_lock(&node->qrtr_tx_lock);
+	flow = radix_tree_lookup(&node->qrtr_tx_flow, key);
+	if (!flow) {
+		flow = kzalloc(sizeof(*flow), GFP_KERNEL);
+		if (!flow)
+			confirm_rx = 1;
+		else
+			radix_tree_insert(&node->qrtr_tx_flow, key, flow);
+	}
+	mutex_unlock(&node->qrtr_tx_lock);
+
+	for (;;) {
+		ret = wait_event_interruptible(node->resume_tx,
+					       atomic_read(&flow->pending) < QRTR_TX_FLOW_HIGH ||
+					       !node->ep);
+		if (ret)
+			return ret;
+
+		if (!node->ep)
+			return -EPIPE;
+
+		mutex_lock(&node->qrtr_tx_lock);
+		if (atomic_read(&flow->pending) < QRTR_TX_FLOW_HIGH) {
+			confirm_rx = atomic_inc_return(&flow->pending) == QRTR_TX_FLOW_LOW;
+			mutex_unlock(&node->qrtr_tx_lock);
+			break;
+		}
+		mutex_unlock(&node->qrtr_tx_lock);
+	}
+
+	return confirm_rx;
+}
+
 /* Pass an outgoing packet socket buffer to the endpoint driver. */
 static int qrtr_node_enqueue(struct qrtr_node *node, struct sk_buff *skb,
 			     int type, struct sockaddr_qrtr *from,
 			     struct sockaddr_qrtr *to)
 {
 	struct qrtr_hdr_v1 *hdr;
+	int confirm_rx;
 	size_t len = skb->len;
 	int rc = -ENODEV;
 
+	confirm_rx = qrtr_tx_wait(node, to->sq_node, to->sq_port, type);
+	if (confirm_rx < 0) {
+		kfree_skb(skb);
+		return confirm_rx;
+	}
+
 	hdr = skb_push(skb, sizeof(*hdr));
 	hdr->version = cpu_to_le32(QRTR_PROTO_VER_1);
 	hdr->type = cpu_to_le32(type);
@@ -201,7 +319,7 @@ static int qrtr_node_enqueue(struct qrtr_node *node, struct sk_buff *skb,
 	}
 
 	hdr->size = cpu_to_le32(len);
-	hdr->confirm_rx = 0;
+	hdr->confirm_rx = !!confirm_rx;
 
 	skb_put_padto(skb, ALIGN(len, 4));
 
@@ -318,7 +436,8 @@ int qrtr_endpoint_post(struct qrtr_endpoint *ep, const void *data, size_t len)
 	if (len != ALIGN(size, 4) + hdrlen)
 		goto err;
 
-	if (cb->dst_port != QRTR_PORT_CTRL && cb->type != QRTR_TYPE_DATA)
+	if (cb->dst_port != QRTR_PORT_CTRL && cb->type != QRTR_TYPE_DATA &&
+	    cb->type != QRTR_TYPE_RESUME_TX)
 		goto err;
 
 	skb_put_data(skb, data + hdrlen, size);
@@ -377,14 +496,18 @@ static void qrtr_node_rx_work(struct work_struct *work)
 
 		qrtr_node_assign(node, cb->src_node);
 
-		ipc = qrtr_port_lookup(cb->dst_port);
-		if (!ipc) {
-			kfree_skb(skb);
+		if (cb->type == QRTR_TYPE_RESUME_TX) {
+			qrtr_tx_resume(node, skb);
 		} else {
-			if (sock_queue_rcv_skb(&ipc->sk, skb))
+			ipc = qrtr_port_lookup(cb->dst_port);
+			if (!ipc) {
 				kfree_skb(skb);
+			} else {
+				if (sock_queue_rcv_skb(&ipc->sk, skb))
+					kfree_skb(skb);
 
-			qrtr_port_put(ipc);
+				qrtr_port_put(ipc);
+			}
 		}
 	}
 }
@@ -415,6 +538,9 @@ int qrtr_endpoint_register(struct qrtr_endpoint *ep, unsigned int nid)
 	node->nid = QRTR_EP_NID_AUTO;
 	node->ep = ep;
 
+	INIT_RADIX_TREE(&node->qrtr_tx_flow, GFP_KERNEL);
+	init_waitqueue_head(&node->resume_tx);
+
 	qrtr_node_assign(node, nid);
 
 	mutex_lock(&qrtr_node_lock);
@@ -449,6 +575,9 @@ void qrtr_endpoint_unregister(struct qrtr_endpoint *ep)
 		qrtr_local_enqueue(NULL, skb, QRTR_TYPE_BYE, &src, &dst);
 	}
 
+	/* Wake up any transmitters waiting for resume-tx from the node */
+	wake_up_interruptible_all(&node->resume_tx);
+
 	qrtr_node_release(node);
 	ep->node = NULL;
 }
-- 
2.18.0


^ permalink raw reply	[flat|nested] 9+ messages in thread

* [PATCH 3/5] net: qrtr: Migrate node lookup tree to spinlock
  2019-05-08  6:06 [PATCH 0/5] QRTR flow control improvements Bjorn Andersson
  2019-05-08  6:06 ` [PATCH 1/5] net: qrtr: Move resume-tx transmission to recvmsg Bjorn Andersson
  2019-05-08  6:06 ` [PATCH 2/5] net: qrtr: Implement outgoing flow control Bjorn Andersson
@ 2019-05-08  6:06 ` Bjorn Andersson
  2019-05-08  6:06 ` [PATCH 4/5] net: qrtr: Make qrtr_port_lookup() use RCU Bjorn Andersson
  2019-05-08  6:06 ` [PATCH 5/5] net: qrtr: Remove receive worker Bjorn Andersson
  4 siblings, 0 replies; 9+ messages in thread
From: Bjorn Andersson @ 2019-05-08  6:06 UTC (permalink / raw)
  To: David S. Miller, Arun Kumar Neelakantam, Chris Lew
  Cc: netdev, linux-kernel, linux-arm-msm

Move operations on the qrtr_nodes radix tree under a separate spinlock
and make the qrtr_nodes tree GFP_ATOMIC, to allow operation from atomic
context in a subsequent patch.

Signed-off-by: Bjorn Andersson <bjorn.andersson@linaro.org>
---
 net/qrtr/qrtr.c | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/net/qrtr/qrtr.c b/net/qrtr/qrtr.c
index 62abd622618d..9075751028a2 100644
--- a/net/qrtr/qrtr.c
+++ b/net/qrtr/qrtr.c
@@ -16,6 +16,7 @@
 #include <linux/qrtr.h>
 #include <linux/termios.h>	/* For TIOCINQ/OUTQ */
 #include <linux/numa.h>
+#include <linux/spinlock.h>
 #include <linux/wait.h>
 
 #include <net/sock.h>
@@ -106,10 +107,11 @@ static inline struct qrtr_sock *qrtr_sk(struct sock *sk)
 static unsigned int qrtr_local_nid = NUMA_NO_NODE;
 
 /* for node ids */
-static RADIX_TREE(qrtr_nodes, GFP_KERNEL);
+static RADIX_TREE(qrtr_nodes, GFP_ATOMIC);
+static DEFINE_SPINLOCK(qrtr_nodes_lock);
 /* broadcast list */
 static LIST_HEAD(qrtr_all_nodes);
-/* lock for qrtr_nodes, qrtr_all_nodes and node reference */
+/* lock for qrtr_all_nodes and node reference */
 static DEFINE_MUTEX(qrtr_node_lock);
 
 /* local port allocation management */
@@ -171,10 +173,13 @@ static void __qrtr_node_release(struct kref *kref)
 {
 	struct radix_tree_iter iter;
 	struct qrtr_node *node = container_of(kref, struct qrtr_node, ref);
+	unsigned long flags;
 	void __rcu **slot;
 
+	spin_lock_irqsave(&qrtr_nodes_lock, flags);
 	if (node->nid != QRTR_EP_NID_AUTO)
 		radix_tree_delete(&qrtr_nodes, node->nid);
+	spin_unlock_irqrestore(&qrtr_nodes_lock, flags);
 
 	list_del(&node->item);
 	mutex_unlock(&qrtr_node_lock);
@@ -340,11 +345,12 @@ static int qrtr_node_enqueue(struct qrtr_node *node, struct sk_buff *skb,
 static struct qrtr_node *qrtr_node_lookup(unsigned int nid)
 {
 	struct qrtr_node *node;
+	unsigned long flags;
 
-	mutex_lock(&qrtr_node_lock);
+	spin_lock_irqsave(&qrtr_nodes_lock, flags);
 	node = radix_tree_lookup(&qrtr_nodes, nid);
 	node = qrtr_node_acquire(node);
-	mutex_unlock(&qrtr_node_lock);
+	spin_unlock_irqrestore(&qrtr_nodes_lock, flags);
 
 	return node;
 }
@@ -356,13 +362,15 @@ static struct qrtr_node *qrtr_node_lookup(unsigned int nid)
  */
 static void qrtr_node_assign(struct qrtr_node *node, unsigned int nid)
 {
+	unsigned long flags;
+
 	if (node->nid != QRTR_EP_NID_AUTO || nid == QRTR_EP_NID_AUTO)
 		return;
 
-	mutex_lock(&qrtr_node_lock);
+	spin_lock_irqsave(&qrtr_nodes_lock, flags);
 	radix_tree_insert(&qrtr_nodes, nid, node);
 	node->nid = nid;
-	mutex_unlock(&qrtr_node_lock);
+	spin_unlock_irqrestore(&qrtr_nodes_lock, flags);
 }
 
 /**
-- 
2.18.0


^ permalink raw reply	[flat|nested] 9+ messages in thread

* [PATCH 4/5] net: qrtr: Make qrtr_port_lookup() use RCU
  2019-05-08  6:06 [PATCH 0/5] QRTR flow control improvements Bjorn Andersson
                   ` (2 preceding siblings ...)
  2019-05-08  6:06 ` [PATCH 3/5] net: qrtr: Migrate node lookup tree to spinlock Bjorn Andersson
@ 2019-05-08  6:06 ` Bjorn Andersson
  2019-05-08  6:06 ` [PATCH 5/5] net: qrtr: Remove receive worker Bjorn Andersson
  4 siblings, 0 replies; 9+ messages in thread
From: Bjorn Andersson @ 2019-05-08  6:06 UTC (permalink / raw)
  To: David S. Miller, Arun Kumar Neelakantam, Chris Lew
  Cc: netdev, linux-kernel, linux-arm-msm

The important part of qrtr_port_lookup() wrt synchronization is that the
function returns a reference counted struct qrtr_sock, or fails.

As such we need only to ensure that no decrement of the object's
refcount happens in between the finding of the object in the idr and
qrtr_port_lookup()'s own increment of the object.

By using RCU and putting a synchronization point after we remove the
mapping from the idr, but before it can be released we achieve this -
with the benefit of not having to hold the mutex in qrtr_port_lookup().

Signed-off-by: Bjorn Andersson <bjorn.andersson@linaro.org>
---
 net/qrtr/qrtr.c | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/net/qrtr/qrtr.c b/net/qrtr/qrtr.c
index 9075751028a2..d2eef43a3124 100644
--- a/net/qrtr/qrtr.c
+++ b/net/qrtr/qrtr.c
@@ -602,11 +602,11 @@ static struct qrtr_sock *qrtr_port_lookup(int port)
 	if (port == QRTR_PORT_CTRL)
 		port = 0;
 
-	mutex_lock(&qrtr_port_lock);
+	rcu_read_lock();
 	ipc = idr_find(&qrtr_ports, port);
 	if (ipc)
 		sock_hold(&ipc->sk);
-	mutex_unlock(&qrtr_port_lock);
+	rcu_read_unlock();
 
 	return ipc;
 }
@@ -648,6 +648,10 @@ static void qrtr_port_remove(struct qrtr_sock *ipc)
 	mutex_lock(&qrtr_port_lock);
 	idr_remove(&qrtr_ports, port);
 	mutex_unlock(&qrtr_port_lock);
+
+	/* Ensure that if qrtr_port_lookup() did enter the RCU read section we
+	 * wait for it to increment the refcount */
+	synchronize_rcu();
 }
 
 /* Assign port number to socket.
-- 
2.18.0


^ permalink raw reply	[flat|nested] 9+ messages in thread

* [PATCH 5/5] net: qrtr: Remove receive worker
  2019-05-08  6:06 [PATCH 0/5] QRTR flow control improvements Bjorn Andersson
                   ` (3 preceding siblings ...)
  2019-05-08  6:06 ` [PATCH 4/5] net: qrtr: Make qrtr_port_lookup() use RCU Bjorn Andersson
@ 2019-05-08  6:06 ` Bjorn Andersson
  4 siblings, 0 replies; 9+ messages in thread
From: Bjorn Andersson @ 2019-05-08  6:06 UTC (permalink / raw)
  To: David S. Miller, Arun Kumar Neelakantam, Chris Lew
  Cc: netdev, linux-kernel, linux-arm-msm

Rather than enqueuing messages and scheduling a worker to deliver them
to the individual sockets we can now, thanks to the previous work, move
this directly into the endpoint callback.

This saves us a context switch per incoming message and removes the
possibility of an opportunistic suspend happening between the message
arriving from the endpoint and it ending up in the socket's receive
buffer.

Signed-off-by: Bjorn Andersson <bjorn.andersson@linaro.org>
---
 net/qrtr/qrtr.c | 54 ++++++++++++++++---------------------------------
 1 file changed, 17 insertions(+), 37 deletions(-)

diff --git a/net/qrtr/qrtr.c b/net/qrtr/qrtr.c
index d2eef43a3124..25cd5ddce5b9 100644
--- a/net/qrtr/qrtr.c
+++ b/net/qrtr/qrtr.c
@@ -163,6 +163,8 @@ static int qrtr_local_enqueue(struct qrtr_node *node, struct sk_buff *skb,
 static int qrtr_bcast_enqueue(struct qrtr_node *node, struct sk_buff *skb,
 			      int type, struct sockaddr_qrtr *from,
 			      struct sockaddr_qrtr *to);
+static struct qrtr_sock *qrtr_port_lookup(int port);
+static void qrtr_port_put(struct qrtr_sock *ipc);
 
 /* Release node resources and free the node.
  *
@@ -386,6 +388,7 @@ int qrtr_endpoint_post(struct qrtr_endpoint *ep, const void *data, size_t len)
 	struct qrtr_node *node = ep->node;
 	const struct qrtr_hdr_v1 *v1;
 	const struct qrtr_hdr_v2 *v2;
+	struct qrtr_sock *ipc;
 	struct sk_buff *skb;
 	struct qrtr_cb *cb;
 	unsigned int size;
@@ -450,8 +453,20 @@ int qrtr_endpoint_post(struct qrtr_endpoint *ep, const void *data, size_t len)
 
 	skb_put_data(skb, data + hdrlen, size);
 
-	skb_queue_tail(&node->rx_queue, skb);
-	schedule_work(&node->work);
+	qrtr_node_assign(node, cb->src_node);
+
+	if (cb->type == QRTR_TYPE_RESUME_TX) {
+		qrtr_tx_resume(node, skb);
+	} else {
+		ipc = qrtr_port_lookup(cb->dst_port);
+		if (!ipc)
+			goto err;
+
+		if (sock_queue_rcv_skb(&ipc->sk, skb))
+			goto err;
+
+		qrtr_port_put(ipc);
+	}
 
 	return 0;
 
@@ -486,40 +501,6 @@ static struct sk_buff *qrtr_alloc_ctrl_packet(struct qrtr_ctrl_pkt **pkt)
 	return skb;
 }
 
-static struct qrtr_sock *qrtr_port_lookup(int port);
-static void qrtr_port_put(struct qrtr_sock *ipc);
-
-/* Handle and route a received packet.
- *
- * This will auto-reply with resume-tx packet as necessary.
- */
-static void qrtr_node_rx_work(struct work_struct *work)
-{
-	struct qrtr_node *node = container_of(work, struct qrtr_node, work);
-	struct sk_buff *skb;
-
-	while ((skb = skb_dequeue(&node->rx_queue)) != NULL) {
-		struct qrtr_sock *ipc;
-		struct qrtr_cb *cb = (struct qrtr_cb *)skb->cb;
-
-		qrtr_node_assign(node, cb->src_node);
-
-		if (cb->type == QRTR_TYPE_RESUME_TX) {
-			qrtr_tx_resume(node, skb);
-		} else {
-			ipc = qrtr_port_lookup(cb->dst_port);
-			if (!ipc) {
-				kfree_skb(skb);
-			} else {
-				if (sock_queue_rcv_skb(&ipc->sk, skb))
-					kfree_skb(skb);
-
-				qrtr_port_put(ipc);
-			}
-		}
-	}
-}
-
 /**
  * qrtr_endpoint_register() - register a new endpoint
  * @ep: endpoint to register
@@ -539,7 +520,6 @@ int qrtr_endpoint_register(struct qrtr_endpoint *ep, unsigned int nid)
 	if (!node)
 		return -ENOMEM;
 
-	INIT_WORK(&node->work, qrtr_node_rx_work);
 	kref_init(&node->ref);
 	mutex_init(&node->ep_lock);
 	skb_queue_head_init(&node->rx_queue);
-- 
2.18.0


^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [PATCH 2/5] net: qrtr: Implement outgoing flow control
  2019-05-08  6:06 ` [PATCH 2/5] net: qrtr: Implement outgoing flow control Bjorn Andersson
@ 2019-05-08 16:41   ` David Miller
  2019-05-13 21:26   ` Chris Lew
  1 sibling, 0 replies; 9+ messages in thread
From: David Miller @ 2019-05-08 16:41 UTC (permalink / raw)
  To: bjorn.andersson; +Cc: aneela, clew, netdev, linux-kernel, linux-arm-msm

From: Bjorn Andersson <bjorn.andersson@linaro.org>
Date: Tue,  7 May 2019 23:06:40 -0700

> +static void qrtr_tx_resume(struct qrtr_node *node, struct sk_buff *skb)
> +{
> +	struct qrtr_ctrl_pkt *pkt = (struct qrtr_ctrl_pkt *)skb->data;
> +	struct qrtr_tx_flow *flow;
> +	unsigned long key;
> +	u64 remote_node = le32_to_cpu(pkt->client.node);
> +	u32 remote_port = le32_to_cpu(pkt->client.port);

Reverse christmas tree for the local variables please.

> +static int qrtr_tx_wait(struct qrtr_node *node, int dest_node, int dest_port,
> +			int type)
> +{
> +	struct qrtr_tx_flow *flow;
> +	unsigned long key = (u64)dest_node << 32 | dest_port;
> +	int confirm_rx = 0;
> +	int ret;

Likewise.

>  /* Pass an outgoing packet socket buffer to the endpoint driver. */
>  static int qrtr_node_enqueue(struct qrtr_node *node, struct sk_buff *skb,
>  			     int type, struct sockaddr_qrtr *from,
>  			     struct sockaddr_qrtr *to)
>  {
>  	struct qrtr_hdr_v1 *hdr;
> +	int confirm_rx;
>  	size_t len = skb->len;
>  	int rc = -ENODEV;

Likewise.

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [PATCH 1/5] net: qrtr: Move resume-tx transmission to recvmsg
  2019-05-08  6:06 ` [PATCH 1/5] net: qrtr: Move resume-tx transmission to recvmsg Bjorn Andersson
@ 2019-05-09 10:23   ` Arun Kumar Neelakantam
  0 siblings, 0 replies; 9+ messages in thread
From: Arun Kumar Neelakantam @ 2019-05-09 10:23 UTC (permalink / raw)
  To: Bjorn Andersson, David S. Miller, Chris Lew
  Cc: netdev, linux-kernel, linux-arm-msm


On 5/8/2019 11:36 AM, Bjorn Andersson wrote:
> The confirm-rx bit is used to implement a per port flow control, in
> order to make sure that no messages are dropped due to resource
> exhaustion. Move the resume-tx transmission to recvmsg to only confirm
> messages as they are consumed by the application.
>
> Signed-off-by: Bjorn Andersson <bjorn.andersson@linaro.org>
> ---
>   net/qrtr/qrtr.c | 60 +++++++++++++++++++++++++++----------------------
>   1 file changed, 33 insertions(+), 27 deletions(-)
>
> diff --git a/net/qrtr/qrtr.c b/net/qrtr/qrtr.c
> index dd0e97f4f6c0..07a35362fba2 100644
> --- a/net/qrtr/qrtr.c
> +++ b/net/qrtr/qrtr.c
> @@ -369,22 +369,11 @@ static void qrtr_port_put(struct qrtr_sock *ipc);
>   static void qrtr_node_rx_work(struct work_struct *work)
>   {
>   	struct qrtr_node *node = container_of(work, struct qrtr_node, work);
> -	struct qrtr_ctrl_pkt *pkt;
> -	struct sockaddr_qrtr dst;
> -	struct sockaddr_qrtr src;
>   	struct sk_buff *skb;
>   
>   	while ((skb = skb_dequeue(&node->rx_queue)) != NULL) {
>   		struct qrtr_sock *ipc;
> -		struct qrtr_cb *cb;
> -		int confirm;
> -
> -		cb = (struct qrtr_cb *)skb->cb;
> -		src.sq_node = cb->src_node;
> -		src.sq_port = cb->src_port;
> -		dst.sq_node = cb->dst_node;
> -		dst.sq_port = cb->dst_port;
> -		confirm = !!cb->confirm_rx;
> +		struct qrtr_cb *cb = (struct qrtr_cb *)skb->cb;
>   
>   		qrtr_node_assign(node, cb->src_node);
>   
> @@ -397,20 +386,6 @@ static void qrtr_node_rx_work(struct work_struct *work)
>   
>   			qrtr_port_put(ipc);
>   		}
> -
> -		if (confirm) {
> -			skb = qrtr_alloc_ctrl_packet(&pkt);
> -			if (!skb)
> -				break;
> -
> -			pkt->cmd = cpu_to_le32(QRTR_TYPE_RESUME_TX);
> -			pkt->client.node = cpu_to_le32(dst.sq_node);
> -			pkt->client.port = cpu_to_le32(dst.sq_port);
> -
> -			if (qrtr_node_enqueue(node, skb, QRTR_TYPE_RESUME_TX,
> -					      &dst, &src))
> -				break;
> -		}
>   	}
>   }
>   
> @@ -822,6 +797,34 @@ static int qrtr_sendmsg(struct socket *sock, struct msghdr *msg, size_t len)
>   	return rc;
>   }
>   
> +static int qrtr_resume_tx(struct qrtr_cb *cb)
This function name is adding some confusion with qrtr_tx_resume() in
the next patch. Can you please rename this function to qrtr_send_resume_tx()?
> +{
> +	struct sockaddr_qrtr remote = { AF_QIPCRTR, cb->src_node, cb->src_port };
> +	struct sockaddr_qrtr local = { AF_QIPCRTR, cb->dst_node, cb->dst_port };
> +	struct qrtr_ctrl_pkt *pkt;
> +	struct qrtr_node *node;
> +	struct sk_buff *skb;
> +	int ret;
> +
> +	node = qrtr_node_lookup(remote.sq_node);
> +	if (!node)
> +		return -EINVAL;
> +
> +	skb = qrtr_alloc_ctrl_packet(&pkt);
> +	if (!skb)
> +		return -ENOMEM;
> +
> +	pkt->cmd = cpu_to_le32(QRTR_TYPE_RESUME_TX);
> +	pkt->client.node = cpu_to_le32(cb->dst_node);
> +	pkt->client.port = cpu_to_le32(cb->dst_port);
> +
> +	ret = qrtr_node_enqueue(node, skb, QRTR_TYPE_RESUME_TX, &local, &remote);
> +
> +	qrtr_node_release(node);
> +
> +	return ret;
> +}
> +
>   static int qrtr_recvmsg(struct socket *sock, struct msghdr *msg,
>   			size_t size, int flags)
>   {
> @@ -844,6 +847,7 @@ static int qrtr_recvmsg(struct socket *sock, struct msghdr *msg,
>   		release_sock(sk);
>   		return rc;
>   	}
> +	cb = (struct qrtr_cb *)skb->cb;
>   
>   	copied = skb->len;
>   	if (copied > size) {
> @@ -857,7 +861,6 @@ static int qrtr_recvmsg(struct socket *sock, struct msghdr *msg,
>   	rc = copied;
>   
>   	if (addr) {
> -		cb = (struct qrtr_cb *)skb->cb;
>   		addr->sq_family = AF_QIPCRTR;
>   		addr->sq_node = cb->src_node;
>   		addr->sq_port = cb->src_port;
> @@ -865,6 +868,9 @@ static int qrtr_recvmsg(struct socket *sock, struct msghdr *msg,
>   	}
>   
>   out:
> +	if (cb->confirm_rx)
> +		qrtr_resume_tx(cb);
> +
>   	skb_free_datagram(sk, skb);
>   	release_sock(sk);
>   

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [PATCH 2/5] net: qrtr: Implement outgoing flow control
  2019-05-08  6:06 ` [PATCH 2/5] net: qrtr: Implement outgoing flow control Bjorn Andersson
  2019-05-08 16:41   ` David Miller
@ 2019-05-13 21:26   ` Chris Lew
  1 sibling, 0 replies; 9+ messages in thread
From: Chris Lew @ 2019-05-13 21:26 UTC (permalink / raw)
  To: Bjorn Andersson, David S. Miller, Arun Kumar Neelakantam
  Cc: netdev, linux-kernel, linux-arm-msm



On 5/7/2019 11:06 PM, Bjorn Andersson wrote:
> In order to prevent overconsumption of resources on the remote side QRTR
> implements a flow control mechanism.
> 
> The mechanism works by the sender keeping track of the number of
> outstanding unconfirmed messages that have been transmitted to a
> particular node/port pair.
> 
> Upon the count reaching a low watermark (L) the confirm_rx bit is set in the
> outgoing message, and when the count reaches a high watermark (H)
> transmission will be blocked until the reception of a resume_tx message
> from the remote, which resets the counter to 0.
> 
> This guarantees that there will be at most 2H - L messages in flight.
> Values chosen for L and H are 5 and 10 respectively.
> 
> Signed-off-by: Bjorn Andersson <bjorn.andersson@linaro.org>
> ---
>   net/qrtr/qrtr.c | 143 +++++++++++++++++++++++++++++++++++++++++++++---
>   1 file changed, 136 insertions(+), 7 deletions(-)
> 
> diff --git a/net/qrtr/qrtr.c b/net/qrtr/qrtr.c
> index 07a35362fba2..62abd622618d 100644
> --- a/net/qrtr/qrtr.c
> +++ b/net/qrtr/qrtr.c
> @@ -16,6 +16,7 @@
>   #include <linux/qrtr.h>
>   #include <linux/termios.h>	/* For TIOCINQ/OUTQ */
>   #include <linux/numa.h>
> +#include <linux/wait.h>
>   
>   #include <net/sock.h>
>   
> @@ -121,6 +122,9 @@ static DEFINE_MUTEX(qrtr_port_lock);
>    * @ep: endpoint
>    * @ref: reference count for node
>    * @nid: node id
> + * @qrtr_tx_flow: tree with tx counts per flow
> + * @resume_tx: waiters for a resume tx from the remote
> + * @qrtr_tx_lock: lock for qrtr_tx_flow
>    * @rx_queue: receive queue
>    * @work: scheduled work struct for recv work
>    * @item: list item for broadcast list
> @@ -131,11 +135,26 @@ struct qrtr_node {
>   	struct kref ref;
>   	unsigned int nid;
>   
> +	struct radix_tree_root qrtr_tx_flow;
> +	struct wait_queue_head resume_tx;
> +	struct mutex qrtr_tx_lock; /* for qrtr_tx_flow */
> +
>   	struct sk_buff_head rx_queue;
>   	struct work_struct work;
>   	struct list_head item;
>   };
>   
> +/**
> + * struct qrtr_tx_flow - tx flow control
> + * @pending: number of waiting senders
> + */
> +struct qrtr_tx_flow {
> +	atomic_t pending;
> +};
> +
> +#define QRTR_TX_FLOW_HIGH	10
> +#define QRTR_TX_FLOW_LOW	5
> +
>   static int qrtr_local_enqueue(struct qrtr_node *node, struct sk_buff *skb,
>   			      int type, struct sockaddr_qrtr *from,
>   			      struct sockaddr_qrtr *to);
> @@ -150,7 +169,9 @@ static int qrtr_bcast_enqueue(struct qrtr_node *node, struct sk_buff *skb,
>    */
>   static void __qrtr_node_release(struct kref *kref)
>   {
> +	struct radix_tree_iter iter;
>   	struct qrtr_node *node = container_of(kref, struct qrtr_node, ref);
> +	void __rcu **slot;
>   
>   	if (node->nid != QRTR_EP_NID_AUTO)
>   		radix_tree_delete(&qrtr_nodes, node->nid);
> @@ -158,6 +179,12 @@ static void __qrtr_node_release(struct kref *kref)
>   	list_del(&node->item);
>   	mutex_unlock(&qrtr_node_lock);
>   
> +	/* Free tx flow counters */
> +	radix_tree_for_each_slot(slot, &node->qrtr_tx_flow, &iter, 0) {
> +		radix_tree_iter_delete(&node->qrtr_tx_flow, &iter, slot);
> +		kfree(*slot);
> +	}
> +
>   	skb_queue_purge(&node->rx_queue);
>   	kfree(node);
>   }
> @@ -178,15 +205,106 @@ static void qrtr_node_release(struct qrtr_node *node)
>   	kref_put_mutex(&node->ref, __qrtr_node_release, &qrtr_node_lock);
>   }
>   
> +/**
> + * qrtr_tx_resume() - reset flow control counter
> + * @node:	qrtr_node that the QRTR_TYPE_RESUME_TX packet arrived on
> + * @skb:	resume_tx packet
> + */
> +static void qrtr_tx_resume(struct qrtr_node *node, struct sk_buff *skb)
> +{
> +	struct qrtr_ctrl_pkt *pkt = (struct qrtr_ctrl_pkt *)skb->data;
> +	struct qrtr_tx_flow *flow;
> +	unsigned long key;
> +	u64 remote_node = le32_to_cpu(pkt->client.node);
> +	u32 remote_port = le32_to_cpu(pkt->client.port);
> +
> +	key = remote_node << 32 | remote_port;
> +
> +	flow = radix_tree_lookup(&node->qrtr_tx_flow, key);
> +	if (flow)
> +		atomic_set(&flow->pending, 0);
> +
> +	wake_up_interruptible_all(&node->resume_tx);
> +
> +	consume_skb(skb);
> +}
> +
> +/**
> + * qrtr_tx_wait() - flow control for outgoing packets
> + * @node:	qrtr_node that the packet is to be sent to
> + * @dest_node:	node id of the destination
> + * @dest_port:	port number of the destination
> + * @type:	type of message
> + *
> + * The flow control scheme is based around the low and high "watermarks". When
> + * the low watermark is passed the confirm_rx flag is set on the outgoing
> + * message, which will trigger the remote to send a control message of the type
> + * QRTR_TYPE_RESUME_TX to reset the counter. If the high watermark is hit
> + * further transmission should be paused.
> + *
> + * Return: 1 if confirm_rx should be set, 0 otherwise or errno failure
> + */
> +static int qrtr_tx_wait(struct qrtr_node *node, int dest_node, int dest_port,
> +			int type)
> +{
> +	struct qrtr_tx_flow *flow;
> +	unsigned long key = (u64)dest_node << 32 | dest_port;
> +	int confirm_rx = 0;
> +	int ret;
> +
> +	/* Never set confirm_rx on non-data packets */
> +	if (type != QRTR_TYPE_DATA)
> +		return 0;
> +
> +	mutex_lock(&node->qrtr_tx_lock);
> +	flow = radix_tree_lookup(&node->qrtr_tx_flow, key);
> +	if (!flow) {
> +		flow = kzalloc(sizeof(*flow), GFP_KERNEL);
> +		if (!flow)
> +			confirm_rx = 1;
> +		else
> +			radix_tree_insert(&node->qrtr_tx_flow, key, flow);
> +	}
> +	mutex_unlock(&node->qrtr_tx_lock);
> +
> +	for (;;) {
> +		ret = wait_event_interruptible(node->resume_tx,
> +					       atomic_read(&flow->pending) < QRTR_TX_FLOW_HIGH ||
> +					       !node->ep);
> +		if (ret)
> +			return ret;
> +
> +		if (!node->ep)
> +			return -EPIPE;
> +
> +		mutex_lock(&node->qrtr_tx_lock);
> +		if (atomic_read(&flow->pending) < QRTR_TX_FLOW_HIGH) {
> +			confirm_rx = atomic_inc_return(&flow->pending) == QRTR_TX_FLOW_LOW;
> +			mutex_unlock(&node->qrtr_tx_lock);
> +			break;
> +		}
> +		mutex_unlock(&node->qrtr_tx_lock);
> +	}
> +
> +	return confirm_rx;
> +}
> +
>   /* Pass an outgoing packet socket buffer to the endpoint driver. */
>   static int qrtr_node_enqueue(struct qrtr_node *node, struct sk_buff *skb,
>   			     int type, struct sockaddr_qrtr *from,
>   			     struct sockaddr_qrtr *to)
>   {
>   	struct qrtr_hdr_v1 *hdr;
> +	int confirm_rx;
>   	size_t len = skb->len;
>   	int rc = -ENODEV;
>   
> +	confirm_rx = qrtr_tx_wait(node, to->sq_node, to->sq_port, type);
> +	if (confirm_rx < 0) {
> +		kfree_skb(skb);
> +		return confirm_rx;
> +	}
> +
>   	hdr = skb_push(skb, sizeof(*hdr));
>   	hdr->version = cpu_to_le32(QRTR_PROTO_VER_1);
>   	hdr->type = cpu_to_le32(type);
> @@ -201,7 +319,7 @@ static int qrtr_node_enqueue(struct qrtr_node *node, struct sk_buff *skb,
>   	}
>   
>   	hdr->size = cpu_to_le32(len);
> -	hdr->confirm_rx = 0;
> +	hdr->confirm_rx = !!confirm_rx;
>   
>   	skb_put_padto(skb, ALIGN(len, 4));
>   

We had issues where the underlying transport layer failed to send the
packet but the flow control count wasn't adjusted. Eventually we couldn't
send to that remote port if the packet carrying the control flag
(confirm_rx) bit was dropped by the transport.

> @@ -318,7 +436,8 @@ int qrtr_endpoint_post(struct qrtr_endpoint *ep, const void *data, size_t len)
>   	if (len != ALIGN(size, 4) + hdrlen)
>   		goto err;
>   
> -	if (cb->dst_port != QRTR_PORT_CTRL && cb->type != QRTR_TYPE_DATA)
> +	if (cb->dst_port != QRTR_PORT_CTRL && cb->type != QRTR_TYPE_DATA &&
> +	    cb->type != QRTR_TYPE_RESUME_TX)
>   		goto err;
>   
>   	skb_put_data(skb, data + hdrlen, size);
> @@ -377,14 +496,18 @@ static void qrtr_node_rx_work(struct work_struct *work)
>   
>   		qrtr_node_assign(node, cb->src_node);
>   
> -		ipc = qrtr_port_lookup(cb->dst_port);
> -		if (!ipc) {
> -			kfree_skb(skb);
> +		if (cb->type == QRTR_TYPE_RESUME_TX) {
> +			qrtr_tx_resume(node, skb);
>   		} else {
> -			if (sock_queue_rcv_skb(&ipc->sk, skb))
> +			ipc = qrtr_port_lookup(cb->dst_port);
> +			if (!ipc) {
>   				kfree_skb(skb);
> +			} else {
> +				if (sock_queue_rcv_skb(&ipc->sk, skb))
> +					kfree_skb(skb);
>   
> -			qrtr_port_put(ipc);
> +				qrtr_port_put(ipc);
> +			}
>   		}
>   	}
>   }
> @@ -415,6 +538,9 @@ int qrtr_endpoint_register(struct qrtr_endpoint *ep, unsigned int nid)
>   	node->nid = QRTR_EP_NID_AUTO;
>   	node->ep = ep;
>   
> +	INIT_RADIX_TREE(&node->qrtr_tx_flow, GFP_KERNEL);
> +	init_waitqueue_head(&node->resume_tx);
> +
>   	qrtr_node_assign(node, nid);
>   
>   	mutex_lock(&qrtr_node_lock);
> @@ -449,6 +575,9 @@ void qrtr_endpoint_unregister(struct qrtr_endpoint *ep)
>   		qrtr_local_enqueue(NULL, skb, QRTR_TYPE_BYE, &src, &dst);
>   	}
>   
> +	/* Wake up any transmitters waiting for resume-tx from the node */
> +	wake_up_interruptible_all(&node->resume_tx);
> +
>   	qrtr_node_release(node);
>   	ep->node = NULL;
>   }
> 

-- 
Qualcomm Innovation Center, Inc. is a member of Code Aurora Forum,
a Linux Foundation Collaborative Project

^ permalink raw reply	[flat|nested] 9+ messages in thread

end of thread, other threads:[~2019-05-13 21:26 UTC | newest]

Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-05-08  6:06 [PATCH 0/5] QRTR flow control improvements Bjorn Andersson
2019-05-08  6:06 ` [PATCH 1/5] net: qrtr: Move resume-tx transmission to recvmsg Bjorn Andersson
2019-05-09 10:23   ` Arun Kumar Neelakantam
2019-05-08  6:06 ` [PATCH 2/5] net: qrtr: Implement outgoing flow control Bjorn Andersson
2019-05-08 16:41   ` David Miller
2019-05-13 21:26   ` Chris Lew
2019-05-08  6:06 ` [PATCH 3/5] net: qrtr: Migrate node lookup tree to spinlock Bjorn Andersson
2019-05-08  6:06 ` [PATCH 4/5] net: qrtr: Make qrtr_port_lookup() use RCU Bjorn Andersson
2019-05-08  6:06 ` [PATCH 5/5] net: qrtr: Remove receive worker Bjorn Andersson

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).