
[1/6] net/rds: Avoid queuing superfluous send and recv work

Message ID 20250227042638.82553-2-allison.henderson@oracle.com (mailing list archive)
State Changes Requested
Delegated to: Netdev Maintainers
Series RDS bug fix collection

Checks

Context Check Description
netdev/series_format warning Target tree name not specified in the subject
netdev/tree_selection success Guessed tree name to be net-next, async
netdev/ynl success Generated files up to date; no warnings/errors; no diff in generated;
netdev/fixes_present success Fixes tag not required for -next series
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 0 this patch: 0
netdev/build_tools success No tools touched, skip
netdev/cc_maintainers warning 6 maintainers not CCed: linux-rdma@vger.kernel.org rds-devel@oss.oracle.com kuba@kernel.org horms@kernel.org edumazet@google.com pabeni@redhat.com
netdev/build_clang success Errors and warnings before: 0 this patch: 0
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/deprecated_api success None detected
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 12 this patch: 12
netdev/checkpatch warning WARNING: line length of 90 exceeds 80 columns
netdev/build_clang_rust success No Rust files in patch. Skipping build
netdev/kdoc success Errors and warnings before: 30 this patch: 30
netdev/source_inline success Was 0 now: 0

Commit Message

Allison Henderson Feb. 27, 2025, 4:26 a.m. UTC
From: Håkon Bugge <haakon.bugge@oracle.com>

Work on a connection path's workqueue is queued from many places and
contexts.  Define two more bits in cp_flags and add helper functions so
that the work is only queued when it is not already queued.

This avoids overloading the work queue and degrading connection
management performance, since connection management uses the same work
queue.

Signed-off-by: Håkon Bugge <haakon.bugge@oracle.com>
Signed-off-by: Gerd Rausch <gerd.rausch@oracle.com>
Signed-off-by: Allison Henderson <allison.henderson@oracle.com>
---
 net/rds/connection.c | 11 ++++++++---
 net/rds/ib_recv.c    |  2 +-
 net/rds/ib_send.c    |  2 +-
 net/rds/rds.h        | 33 +++++++++++++++++++++++++++++++++
 net/rds/send.c       |  6 +++---
 net/rds/tcp.c        | 12 ++++++++++--
 net/rds/tcp_recv.c   |  2 +-
 net/rds/tcp_send.c   |  2 +-
 net/rds/threads.c    | 25 +++++++++++++++++--------
 9 files changed, 75 insertions(+), 20 deletions(-)
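
In short, the pattern the patch introduces (shown in full in the diff below) is to guard each
queue_delayed_work() call on cp_send_w/cp_recv_w with a per-path flag bit, so that a caller such as
rds_sendmsg() goes from

	queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);

to

	rds_cond_queue_send_work(cpath, 1);

where the helper only queues the work if RDS_SEND_WORK_QUEUED was not already set, and the worker
clears the bit again when it runs.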

Comments

Jakub Kicinski March 1, 2025, 12:19 a.m. UTC | #1
On Wed, 26 Feb 2025 21:26:33 -0700 allison.henderson@oracle.com wrote:
> +	/* clear_bit() does not imply a memory barrier */
> +	smp_mb__before_atomic();
> +	clear_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags);
> +	/* clear_bit() does not imply a memory barrier */
> +	smp_mb__after_atomic();

I'm guessing the comments were added because checkpatch asked for them.
The comments are supposed to indicate what this barrier pairs with.
I don't see the purpose of these barriers, please document..
Allison Henderson March 5, 2025, 12:38 a.m. UTC | #2
On Fri, 2025-02-28 at 16:19 -0800, Jakub Kicinski wrote:
> On Wed, 26 Feb 2025 21:26:33 -0700 allison.henderson@oracle.com wrote:
> > +	/* clear_bit() does not imply a memory barrier */
> > +	smp_mb__before_atomic();
> > +	clear_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags);
> > +	/* clear_bit() does not imply a memory barrier */
> > +	smp_mb__after_atomic();
> 
> I'm guessing the comments were added because checkpatch asked for them.
> The comments are supposed to indicate what this barrier pairs with.
> I don't see the purpose of these barriers, please document..

Hi Jakub,

I think the comments meant to refer to the implicit memory barrier in "test_and_set_bit".  It looks like it has assembly
code to set the barrier if CONFIG_SMP is set.  How about we change the comments to: "pairs with implicit memory barrier
in test_and_set_bit()" ?  Let me know what you think.

Thanks!
Allison
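
For reference, a minimal sketch of what that suggested comment would look like in the two send-side
helpers this patch adds (the comment wording is only a proposal at this point, and whether the
barriers are needed at all is the open question in the rest of this thread):

	static inline void rds_clear_queued_send_work_bit(struct rds_conn_path *cp)
	{
		/* Pairs with the implicit memory barrier of test_and_set_bit()
		 * in rds_cond_queue_send_work(); clear_bit() alone is not ordered.
		 */
		smp_mb__before_atomic();
		clear_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags);
		smp_mb__after_atomic();
	}

	static inline void rds_cond_queue_send_work(struct rds_conn_path *cp,
						    unsigned long delay)
	{
		/* test_and_set_bit() is a value-returning atomic RMW, so it is
		 * fully ordered on its own.
		 */
		if (!test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
			queue_delayed_work(rds_wq, &cp->cp_send_w, delay);
	}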
Jakub Kicinski March 5, 2025, 12:44 a.m. UTC | #3
On Wed, 5 Mar 2025 00:38:41 +0000 Allison Henderson wrote:
> > I'm guessing the comments were added because checkpatch asked for them.
> > The comments are supposed to indicate what this barrier pairs with.
> > I don't see the purpose of these barriers, please document..  
> 
> Hi Jakub,
> 
> I think the comments meant to refer to the implicit memory barrier in
> "test_and_set_bit".  It looks like it has assembly code to set the
> barrier if CONFIG_SMP is set.  How about we change the comments to:
> "pairs with implicit memory barrier in test_and_set_bit()" ?  Let me
> know what you think.

Okay, but what is the purpose. The commit message does not explain 
at all why these barriers are needed.
Allison Henderson March 6, 2025, 4:41 p.m. UTC | #4
On Tue, 2025-03-04 at 16:44 -0800, Jakub Kicinski wrote:
> On Wed, 5 Mar 2025 00:38:41 +0000 Allison Henderson wrote:
> > > I'm guessing the comments were added because checkpatch asked for them.
> > > The comments are supposed to indicate what this barrier pairs with.
> > > I don't see the purpose of these barriers, please document..  
> > 
> > Hi Jakub,
> > 
> > I think the comments meant to refer to the implicit memory barrier in
> > "test_and_set_bit".  It looks like it has assembly code to set the
> > barrier if CONFIG_SMP is set.  How about we change the comments to:
> > "pairs with implicit memory barrier in test_and_set_bit()" ?  Let me
> > know what you think.
> 
> Okay, but what is the purpose. The commit message does not explain 
> at all why these barriers are needed.

Hi Jakub,

I think it's to make sure the clearing of the bit is the last operation done for the calling function, in this case
rds_queue_reconnect.  The purpose of the barrier in test_and_set is to make sure the bit is checked before proceeding to
any further operations (in our case queuing reconnect items).  So it would make sense that the clearing of the bit
should happen only after we are done with all such operations.  I found some documentation for smp_mb__*_atomic in
Documentation/memory-barriers.txt that mentions these functions are used for atomic RMW bitop functions like clear_bit
and set_bit, since they do not imply a memory barrier themselves.  Perhaps the original comment meant to reference that
too.

I hope that helps?  If so, maybe we can expand the comment further to something like:
/*
 * pairs with implicit memory barrier in calls to test_and_set_bit with RDS_RECONNECT_PENDING
 * Used to ensure the bit is on only while reconnect operations are in progress
 */
 
Let me know what you think, or if that's too much or too little detail.

Thanks!
Allison
Jakub Kicinski March 6, 2025, 6:18 p.m. UTC | #5
On Thu, 6 Mar 2025 16:41:35 +0000 Allison Henderson wrote:
> I think it's to make sure the clearing of the bit is the last
> operation done for the calling function, in this case
> rds_queue_reconnect.  The purpose of the barrier in test_and_set is
> to make sure the bit is checked before proceeding to any further
> operations (in our case queuing reconnect items).

Let's be precise, can you give an example of 2 execution threads
and memory accesses which have to be ordered.
Allison Henderson March 7, 2025, 8:28 p.m. UTC | #6
On Thu, 2025-03-06 at 10:18 -0800, Jakub Kicinski wrote:
> On Thu, 6 Mar 2025 16:41:35 +0000 Allison Henderson wrote:
> > I think it's to make sure the clearing of the bit is the last
> > operation done for the calling function, in this case
> > rds_queue_reconnect.  The purpose of the barrier in test_and_set is
> > to make sure the bit is checked before proceeding to any further
> > operations (in our case queuing reconnect items).
> 
> Let's be precise, can you give an example of 2 execution threads
> and memory accesses which have to be ordered.

Hi Jakub,

I just realized my last response referred to bits and functions in the next patch instead of this one.  Apologies for
the confusion!  For this thread example though, I think a pair of threads in rds_send_worker and rds_sendmsg would be a
good example?  How about this:

Thread A:
  Calls rds_send_worker()
    calls rds_clear_queued_send_work_bit()
      clears RDS_SEND_WORK_QUEUED in cp->cp_flags
    calls rds_send_xmit()
    calls cond_resched()

Thread B:
   Calls rds_sendmsg()
   Calls rds_send_xmit
   Calls rds_cond_queue_send_work 
      checks and sets RDS_SEND_WORK_QUEUED in cp->cp_flags


So in this example the memory barriers ensure that the clearing of the bit is properly seen by thread B.  Without these
memory barriers in rds_clear_queued_send_work_bit(), rds_cond_queue_send_work() could see stale values in cp->cp_flags
and incorrectly assume that the work is still queued, leading to potential missed work processing.

I hope that helps?  Let me know either way, or if there is anything else that would help clarify.  If it helps at
all, I think there's a similar use case in commit 93093ea1f059, though it's the other way around, with the barriers
around the set_bit() and the implicit barriers in the test_and_clear_bit().  And I think on CPUs with strongly ordered
memory the barriers do not expand to anything in that case.

Let me know if this helps!
Thank you!

Allison
Jakub Kicinski March 8, 2025, 2:53 a.m. UTC | #7
On Fri, 7 Mar 2025 20:28:57 +0000 Allison Henderson wrote:
> > Let's be precise, can you give an example of 2 execution threads
> > and memory accesses which have to be ordered.  
> 
> Hi Jakub,
> 
> I just realized my last response referred to bits and functions in the next patch instead of this one.  Apologies for
> the confusion!  For this thread example though, I think a pair of threads in rds_send_worker and rds_sendmsg would be a
> good example?  How about this:
> 
> Thread A:
>   Calls rds_send_worker()
>     calls rds_clear_queued_send_work_bit()
>       clears RDS_SEND_WORK_QUEUED in cp->cp_flags
>     calls rds_send_xmit()
>     calls cond_resched()
> 
> Thread B:
>    Calls rds_sendmsg()
>    Calls rds_send_xmit
>    Calls rds_cond_queue_send_work 
>       checks and sets RDS_SEND_WORK_QUEUED in cp->cp_flags

We need at least two memory locations if we want to talk about ordering.
In your example we have cp_flags, but the rest is code.
What's the second memory location.
Take a look at e592b5110b3e9393 for an example of a good side by side
thread execution.. listing(?):

    Thread1 (oa_tc6_start_xmit)     Thread2 (oa_tc6_spi_thread_handler)
    ---------------------------     -----------------------------------
    - if waiting_tx_skb is NULL
                                    - if ongoing_tx_skb is NULL
                                    - ongoing_tx_skb = waiting_tx_skb
    - waiting_tx_skb = skb
                                    - waiting_tx_skb = NULL
                                    ...
                                    - ongoing_tx_skb = NULL
    - if waiting_tx_skb is NULL
    - waiting_tx_skb = skb


This makes it pretty clear what fields are at play and how the race
happens.
Allison Henderson March 12, 2025, 7:50 a.m. UTC | #8
On Fri, 2025-03-07 at 18:53 -0800, Jakub Kicinski wrote:
> On Fri, 7 Mar 2025 20:28:57 +0000 Allison Henderson wrote:
> > > Let's be precise, can you give an example of 2 execution threads
> > > and memory accesses which have to be ordered.  
> > 
> > Hi Jakub,
> > 
> > I just realized my last response referred to bits and functions in the next patch instead of this one.  Apologies for
> > the confusion!  For this thread example though, I think a pair of threads in rds_send_worker and rds_sendmsg would be a
> > good example?  How about this:
> > 
> > Thread A:
> >   Calls rds_send_worker()
> >     calls rds_clear_queued_send_work_bit()
> >       clears RDS_SEND_WORK_QUEUED in cp->cp_flags
> >     calls rds_send_xmit()
> >     calls cond_resched()
> > 
> > Thread B:
> >    Calls rds_sendmsg()
> >    Calls rds_send_xmit
> >    Calls rds_cond_queue_send_work 
> >       checks and sets RDS_SEND_WORK_QUEUED in cp->cp_flags
> 
> We need at least two memory locations if we want to talk about ordering.
> In your example we have cp_flags, but the rest is code.
> What's the second memory location.
> Take a look at e592b5110b3e9393 for an example of a good side by side
> thread execution.. listing(?):
> 
>     Thread1 (oa_tc6_start_xmit)     Thread2 (oa_tc6_spi_thread_handler)
>     ---------------------------     -----------------------------------
>     - if waiting_tx_skb is NULL
>                                     - if ongoing_tx_skb is NULL
>                                     - ongoing_tx_skb = waiting_tx_skb
>     - waiting_tx_skb = skb
>                                     - waiting_tx_skb = NULL
>                                     ...
>                                     - ongoing_tx_skb = NULL
>     - if waiting_tx_skb is NULL
>     - waiting_tx_skb = skb
> 
> 
> This makes it pretty clear what fields are at play and how the race
> happens.
Hi Jakub,

I suppose the second address would have to be the queue itself, wouldn't it?  We have a flag that's meant to keep
threads from racing to access a queue, so the addresses of interest would be the flag and the queue, which is
cp->cp_send_w in the send example.  So if we adjust our example to include the queue access, it would look like
this:

Thread A:                                          Thread B:
-----------------------------------                -----------------------------------
                                                   Calls rds_sendmsg()
                                                     Calls rds_send_xmit()
                                                       Calls rds_cond_queue_send_work()
Calls rds_send_worker()
  calls rds_clear_queued_send_work_bit()
    clears RDS_SEND_WORK_QUEUED in cp->cp_flags
                                                         checks RDS_SEND_WORK_QUEUED in cp->cp_flags
                                                         but sees stale value
                                                         skips queuing on cp->cp_send_w when it should not
    Calls rds_send_xmit()
      Calls rds_cond_queue_send_work()
        queues work on cp->cp_send_w

And then if we have the barriers, then the example would look like this:

Thread A:                                          Thread B:
-----------------------------------                -----------------------------------
                                                   Calls rds_sendmsg()
                                                     Calls rds_send_xmit()
                                                       Calls rds_cond_queue_send_work()
Calls rds_send_worker()
  calls rds_clear_queued_send_work_bit()
    clears RDS_SEND_WORK_QUEUED in cp->cp_flags
                                                         checks RDS_SEND_WORK_QUEUED in cp->cp_flags
                                                         queues work on cp->cp_send_w
    Calls rds_send_xmit()
      Calls rds_cond_queue_send_work()
        skips queueing work on cp->cp_send_w

I think the barriers also make sure thread A's call to rds_send_xmit() happens after the clear_bit().  Otherwise it
could be reordered, and then we would get another missed work item there too.  I hope this helps?  Let me know if that
makes sense or if you think there's a better way it could be managed.  Thank you!

Allison
Jakub Kicinski March 26, 2025, 4:42 p.m. UTC | #9
On Wed, 12 Mar 2025 07:50:11 +0000 Allison Henderson wrote:
> Thread A:					Thread B:
> -----------------------------------		-----------------------------------
> 						Calls rds_sendmsg()
> 						   Calls rds_send_xmit()
> 						      Calls rds_cond_queue_send_work()   
> Calls rds_send_worker()					
>   calls rds_clear_queued_send_work_bit()		   
>     clears RDS_SEND_WORK_QUEUED in cp->cp_flags		
>       						         checks RDS_SEND_WORK_QUEUED in cp->cp_flags

But if the two threads run in parallel, what prevents this check
from happening fully before the previous line on the "Thread A" side?

Please take a look at netif_txq_try_stop() for an example of 
a memory-barrier based algo.

>                                                        Queues work on cp->cp_send_w
>     Calls rds_send_xmit()
>        Calls rds_cond_queue_send_work()
>           skips queueing work on cp->cp_send_w
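
For readers following along, a generic sketch of the kind of barrier-based scheme being referred to
here: publish the work first, clear the flag before re-checking, and rely on the full ordering of
the flag RMW so that one of the two sides always notices pending work.  This is only an
illustration of the pattern, not the actual netif_txq_try_stop() code, and enqueue_message(),
have_pending_messages() and send_one() are hypothetical helpers:

	/* Producer: make the work visible, then try to schedule the worker. */
	static void producer_side(struct rds_conn_path *cp)
	{
		enqueue_message(cp);	/* A: publish the work (hypothetical helper) */

		/* test_and_set_bit() is fully ordered, so A is visible to other
		 * CPUs before the bit is tested here.
		 */
		if (!test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
			queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
	}

	/* Worker: clear the flag *before* re-checking for work, so any message
	 * published after the re-check finds the bit clear and queues the
	 * worker again; nothing is lost in either interleaving.
	 */
	static void worker_side(struct rds_conn_path *cp)
	{
		clear_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags);
		smp_mb__after_atomic();		/* order the clear against the re-check */

		while (have_pending_messages(cp))	/* B: re-check after the clear */
			send_one(cp);
	}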
Allison Henderson April 2, 2025, 1:34 a.m. UTC | #10
On Wed, 2025-03-26 at 09:42 -0700, Jakub Kicinski wrote:
> On Wed, 12 Mar 2025 07:50:11 +0000 Allison Henderson wrote:
> > Thread A:					Thread B:
> > -----------------------------------		-----------------------------------
> > 						Calls rds_sendmsg()
> > 						   Calls rds_send_xmit()
> > 						      Calls rds_cond_queue_send_work()   
> > Calls rds_send_worker()					
> >   calls rds_clear_queued_send_work_bit()		   
> >     clears RDS_SEND_WORK_QUEUED in cp->cp_flags		
> >       						         checks RDS_SEND_WORK_QUEUED in cp->cp_flags
> 
> But if the two threads run in parallel, what prevents this check
> from happening fully before the previous line on the "Thread A" side?
> 
> Please take a look at netif_txq_try_stop() for an example of 
> a memory-barrier based algo.
> 
> >                                                        Queues work on cp->cp_send_w
> >     Calls rds_send_xmit()
> >        Calls rds_cond_queue_send_work()
> >           skips queueing work on cp->cp_send_w

Hi Jakub,

I had a look at the example, how about we move the barriers from rds_clear_queued_send_work_bit into
rds_cond_queue_send_work?  Then we have something like this:

static inline void rds_cond_queue_send_work(struct rds_conn_path *cp, unsigned long delay)
{
	/* Ensure prior clear_bit operations for RDS_SEND_WORK_QUEUED are observed */
	smp_mb__before_atomic();

	if (!test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
		queue_delayed_work(rds_wq, &cp->cp_send_w, delay);

	/* Ensure the RDS_SEND_WORK_QUEUED bit is observed before proceeding */
	smp_mb__after_atomic();
}

I think that's more like what's in the example, and in line with what this patch is trying to do.  Let me know what you
think.

Thank you for the reviews!
Allison
Jakub Kicinski April 2, 2025, 4:18 p.m. UTC | #11
On Wed, 2 Apr 2025 01:34:40 +0000 Allison Henderson wrote:
> I had a look at the example, how about we move the barriers from
> rds_clear_queued_send_work_bit into rds_cond_queue_send_work?  Then
> we have something like this:
> 
> static inline void rds_cond_queue_send_work(struct rds_conn_path *cp,
> 					    unsigned long delay)
> {
> 	/* Ensure prior clear_bit operations for RDS_SEND_WORK_QUEUED are observed */
> 	smp_mb__before_atomic();
> 
> 	if (!test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
> 		queue_delayed_work(rds_wq, &cp->cp_send_w, delay);
> 
> 	/* Ensure the RDS_SEND_WORK_QUEUED bit is observed before proceeding */
> 	smp_mb__after_atomic();
> }
> 
> I think that's more like what's in the example, and in line with what
> this patch is trying to do.  Let me know what you think.

Sorry, this still feels like a cargo cult to me.
Let's get a clear explanation of what the barriers order 
or just skip the patch.
Allison Henderson April 3, 2025, 1:27 a.m. UTC | #12
On Wed, 2025-04-02 at 09:18 -0700, Jakub Kicinski wrote:
> On Wed, 2 Apr 2025 01:34:40 +0000 Allison Henderson wrote:
> > I had a look at the example, how about we move the barriers from
> > rds_clear_queued_send_work_bit into rds_cond_queue_send_work?  Then
> > we have something like this:
> > 
> > static inline void rds_cond_queue_send_work(struct rds_conn_path *cp,
> > 					    unsigned long delay)
> > {
> > 	/* Ensure prior clear_bit operations for RDS_SEND_WORK_QUEUED are observed */
> > 	smp_mb__before_atomic();
> > 
> > 	if (!test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
> > 		queue_delayed_work(rds_wq, &cp->cp_send_w, delay);
> > 
> > 	/* Ensure the RDS_SEND_WORK_QUEUED bit is observed before proceeding */
> > 	smp_mb__after_atomic();
> > }
> > 
> > I think that's more like what's in the example, and in line with what
> > this patch is trying to do.  Let me know what you think.
> 
> Sorry, this still feels like a cargo cult to me.
> Let's get a clear explanation of what the barriers order 
> or just skip the patch.
> 
Sure, I'm out of town tomorrow through Monday, but I'll see what I can do when I get back Tues.  I'm working on some
other fixes I'd like to add to the set, but they need the performance assist to not time out in the selftests, so I'll
have to find something else.

Allison

Patch

diff --git a/net/rds/connection.c b/net/rds/connection.c
index c749c5525b40..1d80586fdda2 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -445,9 +445,14 @@  static void rds_conn_path_destroy(struct rds_conn_path *cp)
 	if (!cp->cp_transport_data)
 		return;
 
-	/* make sure lingering queued work won't try to ref the conn */
-	cancel_delayed_work_sync(&cp->cp_send_w);
-	cancel_delayed_work_sync(&cp->cp_recv_w);
+	/* make sure lingering queued work won't try to ref the
+	 * conn. If there is work queued, we cancel it (and set the
+	 * bit to avoid any re-queueing)
+	 */
+	if (test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
+		cancel_delayed_work_sync(&cp->cp_send_w);
+	if (test_and_set_bit(RDS_RECV_WORK_QUEUED, &cp->cp_flags))
+		cancel_delayed_work_sync(&cp->cp_recv_w);
 
 	rds_conn_path_drop(cp, true);
 	flush_work(&cp->cp_down_w);
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index e53b7f266bd7..4ecee1ff1c26 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -457,7 +457,7 @@  void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp)
 	    (must_wake ||
 	    (can_wait && rds_ib_ring_low(&ic->i_recv_ring)) ||
 	    rds_ib_ring_empty(&ic->i_recv_ring))) {
-		queue_delayed_work(rds_wq, &conn->c_recv_w, 1);
+		rds_cond_queue_recv_work(conn->c_path + 0, 1);
 	}
 	if (can_wait)
 		cond_resched();
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 4190b90ff3b1..95edd4cb8528 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -419,7 +419,7 @@  void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits)
 
 	atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits);
 	if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags))
-		queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+		rds_cond_queue_send_work(conn->c_path + 0, 0);
 
 	WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384);
 
diff --git a/net/rds/rds.h b/net/rds/rds.h
index dc360252c515..c9a22d0e887b 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -90,6 +90,8 @@  enum {
 #define RDS_IN_XMIT		2
 #define RDS_RECV_REFILL		3
 #define	RDS_DESTROY_PENDING	4
+#define RDS_SEND_WORK_QUEUED	5
+#define RDS_RECV_WORK_QUEUED	6
 
 /* Max number of multipaths per RDS connection. Must be a power of 2 */
 #define	RDS_MPATH_WORKERS	8
@@ -791,6 +793,37 @@  void __rds_conn_path_error(struct rds_conn_path *cp, const char *, ...);
 #define rds_conn_path_error(cp, fmt...) \
 	__rds_conn_path_error(cp, KERN_WARNING "RDS: " fmt)
 
+extern struct workqueue_struct *rds_wq;
+static inline void rds_cond_queue_send_work(struct rds_conn_path *cp, unsigned long delay)
+{
+	if (!test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
+		queue_delayed_work(rds_wq, &cp->cp_send_w, delay);
+}
+
+static inline void rds_clear_queued_send_work_bit(struct rds_conn_path *cp)
+{
+	/* clear_bit() does not imply a memory barrier */
+	smp_mb__before_atomic();
+	clear_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags);
+	/* clear_bit() does not imply a memory barrier */
+	smp_mb__after_atomic();
+}
+
+static inline void rds_cond_queue_recv_work(struct rds_conn_path *cp, unsigned long delay)
+{
+	if (!test_and_set_bit(RDS_RECV_WORK_QUEUED, &cp->cp_flags))
+		queue_delayed_work(rds_wq, &cp->cp_recv_w, delay);
+}
+
+static inline void rds_clear_queued_recv_work_bit(struct rds_conn_path *cp)
+{
+	/* clear_bit() does not imply a memory barrier */
+	smp_mb__before_atomic();
+	clear_bit(RDS_RECV_WORK_QUEUED, &cp->cp_flags);
+	/* clear_bit() does not imply a memory barrier */
+	smp_mb__after_atomic();
+}
+
 static inline int
 rds_conn_path_transition(struct rds_conn_path *cp, int old, int new)
 {
diff --git a/net/rds/send.c b/net/rds/send.c
index 09a280110654..6329cc8ec246 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -458,7 +458,7 @@  int rds_send_xmit(struct rds_conn_path *cp)
 			if (rds_destroy_pending(cp->cp_conn))
 				ret = -ENETUNREACH;
 			else
-				queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+				rds_cond_queue_send_work(cp, 1);
 			rcu_read_unlock();
 		} else if (raced) {
 			rds_stats_inc(s_send_lock_queue_raced);
@@ -1380,7 +1380,7 @@  int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 		if (rds_destroy_pending(cpath->cp_conn))
 			ret = -ENETUNREACH;
 		else
-			queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
+			rds_cond_queue_send_work(cpath, 1);
 		rcu_read_unlock();
 	}
 	if (ret)
@@ -1473,7 +1473,7 @@  rds_send_probe(struct rds_conn_path *cp, __be16 sport,
 	/* schedule the send work on rds_wq */
 	rcu_read_lock();
 	if (!rds_destroy_pending(cp->cp_conn))
-		queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+		rds_cond_queue_send_work(cp, 0);
 	rcu_read_unlock();
 
 	rds_message_put(rm);
diff --git a/net/rds/tcp.c b/net/rds/tcp.c
index 0581c53e6517..b3f2c6e27b59 100644
--- a/net/rds/tcp.c
+++ b/net/rds/tcp.c
@@ -168,8 +168,16 @@  void rds_tcp_reset_callbacks(struct socket *sock,
 	atomic_set(&cp->cp_state, RDS_CONN_RESETTING);
 	wait_event(cp->cp_waitq, !test_bit(RDS_IN_XMIT, &cp->cp_flags));
 	/* reset receive side state for rds_tcp_data_recv() for osock  */
-	cancel_delayed_work_sync(&cp->cp_send_w);
-	cancel_delayed_work_sync(&cp->cp_recv_w);
+
+	/* make sure lingering queued work won't try to ref the
+	 * conn. If there is work queued, we cancel it (and set the bit
+	 * to avoid any re-queueing)
+	 */
+
+	if (test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
+		cancel_delayed_work_sync(&cp->cp_send_w);
+	if (test_and_set_bit(RDS_RECV_WORK_QUEUED, &cp->cp_flags))
+		cancel_delayed_work_sync(&cp->cp_recv_w);
 	lock_sock(osock->sk);
 	if (tc->t_tinc) {
 		rds_inc_put(&tc->t_tinc->ti_inc);
diff --git a/net/rds/tcp_recv.c b/net/rds/tcp_recv.c
index 7997a19d1da3..ab9fc150f974 100644
--- a/net/rds/tcp_recv.c
+++ b/net/rds/tcp_recv.c
@@ -327,7 +327,7 @@  void rds_tcp_data_ready(struct sock *sk)
 	if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM) {
 		rcu_read_lock();
 		if (!rds_destroy_pending(cp->cp_conn))
-			queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+			rds_cond_queue_recv_work(cp, 0);
 		rcu_read_unlock();
 	}
 out:
diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c
index 7d284ac7e81a..cecd3dbde58d 100644
--- a/net/rds/tcp_send.c
+++ b/net/rds/tcp_send.c
@@ -201,7 +201,7 @@  void rds_tcp_write_space(struct sock *sk)
 	rcu_read_lock();
 	if ((refcount_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf &&
 	    !rds_destroy_pending(cp->cp_conn))
-		queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+		rds_cond_queue_send_work(cp, 0);
 	rcu_read_unlock();
 
 out:
diff --git a/net/rds/threads.c b/net/rds/threads.c
index 1f424cbfcbb4..eedae5653051 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -89,8 +89,10 @@  void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
 	set_bit(0, &cp->cp_conn->c_map_queued);
 	rcu_read_lock();
 	if (!rds_destroy_pending(cp->cp_conn)) {
-		queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
-		queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+		rds_clear_queued_send_work_bit(cp);
+		rds_cond_queue_send_work(cp, 0);
+		rds_clear_queued_recv_work_bit(cp);
+		rds_cond_queue_recv_work(cp, 0);
 	}
 	rcu_read_unlock();
 	cp->cp_conn->c_proposed_version = RDS_PROTOCOL_VERSION;
@@ -193,9 +195,11 @@  void rds_send_worker(struct work_struct *work)
 	struct rds_conn_path *cp = container_of(work,
 						struct rds_conn_path,
 						cp_send_w.work);
+	unsigned long delay;
 	int ret;
 
 	if (rds_conn_path_state(cp) == RDS_CONN_UP) {
+		rds_clear_queued_send_work_bit(cp);
 		clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags);
 		ret = rds_send_xmit(cp);
 		cond_resched();
@@ -203,15 +207,17 @@  void rds_send_worker(struct work_struct *work)
 		switch (ret) {
 		case -EAGAIN:
 			rds_stats_inc(s_send_immediate_retry);
-			queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+			delay = 0;
 			break;
 		case -ENOMEM:
 			rds_stats_inc(s_send_delayed_retry);
-			queue_delayed_work(rds_wq, &cp->cp_send_w, 2);
+			delay = 2;
 			break;
 		default:
-			break;
+			return;
 		}
+
+		rds_cond_queue_send_work(cp, delay);
 	}
 }
 
@@ -220,23 +226,26 @@  void rds_recv_worker(struct work_struct *work)
 	struct rds_conn_path *cp = container_of(work,
 						struct rds_conn_path,
 						cp_recv_w.work);
+	unsigned long delay;
 	int ret;
 
 	if (rds_conn_path_state(cp) == RDS_CONN_UP) {
+		rds_clear_queued_recv_work_bit(cp);
 		ret = cp->cp_conn->c_trans->recv_path(cp);
 		rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
 		switch (ret) {
 		case -EAGAIN:
 			rds_stats_inc(s_recv_immediate_retry);
-			queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+			delay = 0;
 			break;
 		case -ENOMEM:
 			rds_stats_inc(s_recv_delayed_retry);
-			queue_delayed_work(rds_wq, &cp->cp_recv_w, 2);
+			delay = 2;
 			break;
 		default:
-			break;
+			return;
 		}
+		rds_cond_queue_recv_work(cp, delay);
 	}
 }