Message ID | 20250227042638.82553-2-allison.henderson@oracle.com (mailing list archive)
---|---
State | Changes Requested
Delegated to: | Netdev Maintainers
Series | RDS bug fix collection
On Wed, 26 Feb 2025 21:26:33 -0700 allison.henderson@oracle.com wrote:
> +	/* clear_bit() does not imply a memory barrier */
> +	smp_mb__before_atomic();
> +	clear_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags);
> +	/* clear_bit() does not imply a memory barrier */
> +	smp_mb__after_atomic();

I'm guessing the comments were added because checkpatch asked for them.
The comments are supposed to indicate what this barrier pairs with.
I don't see the purpose of these barriers, please document..
On Fri, 2025-02-28 at 16:19 -0800, Jakub Kicinski wrote:
> On Wed, 26 Feb 2025 21:26:33 -0700 allison.henderson@oracle.com wrote:
> > +	/* clear_bit() does not imply a memory barrier */
> > +	smp_mb__before_atomic();
> > +	clear_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags);
> > +	/* clear_bit() does not imply a memory barrier */
> > +	smp_mb__after_atomic();
>
> I'm guessing the comments were added because checkpatch asked for them.
> The comments are supposed to indicate what this barrier pairs with.
> I don't see the purpose of these barriers, please document..

Hi Jakub,

I think the comments meant to refer to the implicit memory barrier in
"test_and_set_bit".  It looks like it has assembly code to set the
barrier if CONFIG_SMP is set.  How about we change the comments to:
"pairs with implicit memory barrier in test_and_set_bit()"?  Let me
know what you think.

Thanks!
Allison
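As background for the barrier question, Documentation/atomic_bitops.txt draws the
relevant distinction: clear_bit() and set_bit() are atomic RMW ops with no return
value and imply no ordering, while value-returning ops such as test_and_set_bit()
are documented as fully ordered.  A minimal illustrative sketch of that pattern
(my_flags, my_work, my_wq and the function names are made up, not the RDS code):

#include <linux/bitops.h>
#include <linux/workqueue.h>

#define MY_WORK_QUEUED	0

static unsigned long my_flags;
static struct delayed_work my_work;
static struct workqueue_struct *my_wq;

static void my_queue_work(unsigned long delay)
{
	/* test_and_set_bit() returns a value and is documented as fully
	 * ordered, so the bit test is not reordered against surrounding
	 * accesses such as the queueing below.
	 */
	if (!test_and_set_bit(MY_WORK_QUEUED, &my_flags))
		queue_delayed_work(my_wq, &my_work, delay);
}

static void my_work_done(void)
{
	/* clear_bit() is a non-value-returning atomic and implies no
	 * barrier on its own; smp_mb__before_atomic()/__after_atomic()
	 * are the documented way to add ordering around it when the
	 * algorithm actually needs it.
	 */
	smp_mb__before_atomic();
	clear_bit(MY_WORK_QUEUED, &my_flags);
	smp_mb__after_atomic();
}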
On Wed, 5 Mar 2025 00:38:41 +0000 Allison Henderson wrote:
> > I'm guessing the comments were added because checkpatch asked for them.
> > The comments are supposed to indicate what this barrier pairs with.
> > I don't see the purpose of these barriers, please document..
>
> Hi Jakub,
>
> I think the comments meant to refer to the implicit memory barrier in
> "test_and_set_bit".  It looks like it has assembly code to set the
> barrier if CONFIG_SMP is set.  How about we change the comments to:
> "pairs with implicit memory barrier in test_and_set_bit()"?  Let me
> know what you think.

Okay, but what is the purpose?  The commit message does not explain
at all why these barriers are needed.
On Tue, 2025-03-04 at 16:44 -0800, Jakub Kicinski wrote:
> On Wed, 5 Mar 2025 00:38:41 +0000 Allison Henderson wrote:
> > > I'm guessing the comments were added because checkpatch asked for them.
> > > The comments are supposed to indicate what this barrier pairs with.
> > > I don't see the purpose of these barriers, please document..
> >
> > Hi Jakub,
> >
> > I think the comments meant to refer to the implicit memory barrier in
> > "test_and_set_bit".  It looks like it has assembly code to set the
> > barrier if CONFIG_SMP is set.  How about we change the comments to:
> > "pairs with implicit memory barrier in test_and_set_bit()"?  Let me
> > know what you think.
>
> Okay, but what is the purpose?  The commit message does not explain
> at all why these barriers are needed.

Hi Jakub,

I think it's to make sure the clearing of the bit is the last operation
done for the calling function, in this case rds_queue_reconnect.  The
purpose of the barrier in test_and_set is to make sure the bit is checked
before proceeding to any further operations (in our case queuing reconnect
items).  So it would make sense that the clearing of the bit should happen
only after we are done with all such operations.

I found some documentation for smp_mb__*_atomic in
Documentation/memory-barriers.txt that mentions these functions are used
for atomic RMW bitop functions like clear_bit and set_bit, since they do
not imply a memory barrier themselves.  Perhaps the original comment meant
to reference that too.

I hope that helps?  If so, maybe we can expand the comment further to
something like:

/*
 * Pairs with the implicit memory barrier in calls to test_and_set_bit()
 * with RDS_RECONNECT_PENDING.  Used to ensure the bit is on only while
 * reconnect operations are in progress.
 */

Let me know what you think, or if that's too much or too little detail.

Thanks!
Allison
On Thu, 6 Mar 2025 16:41:35 +0000 Allison Henderson wrote:
> I think it's to make sure the clearing of the bit is the last
> operation done for the calling function, in this case
> rds_queue_reconnect. The purpose of the barrier in test_and_set is
> to make sure the bit is checked before proceeding to any further
> operations (in our case queuing reconnect items).

Let's be precise, can you give an example of 2 execution threads
and memory accesses which have to be ordered.
On Thu, 2025-03-06 at 10:18 -0800, Jakub Kicinski wrote:
> On Thu, 6 Mar 2025 16:41:35 +0000 Allison Henderson wrote:
> > I think it's to make sure the clearing of the bit is the last
> > operation done for the calling function, in this case
> > rds_queue_reconnect. The purpose of the barrier in test_and_set is
> > to make sure the bit is checked before proceeding to any further
> > operations (in our case queuing reconnect items).
>
> Let's be precise, can you give an example of 2 execution threads
> and memory accesses which have to be ordered.

Hi Jakub,

I just realized my last response referred to bits and functions in the
next patch instead of this one.  Apologies for the confusion!  For this
thread example though, I think a pair of threads in rds_send_worker()
and rds_sendmsg() would be a good example.  How about this:

Thread A:
    Calls rds_send_worker()
        calls rds_clear_queued_send_work_bit()
            clears RDS_SEND_WORK_QUEUED in cp->cp_flags
        calls rds_send_xmit()
        calls cond_resched()

Thread B:
    Calls rds_sendmsg()
        Calls rds_send_xmit()
            Calls rds_cond_queue_send_work()
                checks and sets RDS_SEND_WORK_QUEUED in cp->cp_flags

So in this example the memory barriers ensure that the clearing of the
bit is properly seen by thread B.  Without these memory barriers in
rds_clear_queued_send_work_bit(), rds_cond_queue_send_work() could see
stale values in cp->cp_flags and incorrectly assume that the work is
still queued, leading to potential missed work processing.

I hope that helps some?  Let me know if so/not or if there is anything
else that would help clarify.

If it helps at all, I think there's a similar use case in commit
93093ea1f059, though it's the other way around, with the barriers around
the set_bit() and the implicit barriers in the test_and_clear_bit().
And I think on CPUs with strongly ordered memory, the barriers do not
expand to anything in that case.

Let me know if this helps!

Thank you!
Allison
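For reference while reading the two-thread example above, these are the send-side
helpers this patch introduces in net/rds/rds.h (reproduced verbatim from the diff
at the end of the page):

static inline void rds_cond_queue_send_work(struct rds_conn_path *cp, unsigned long delay)
{
	if (!test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
		queue_delayed_work(rds_wq, &cp->cp_send_w, delay);
}

static inline void rds_clear_queued_send_work_bit(struct rds_conn_path *cp)
{
	/* clear_bit() does not imply a memory barrier */
	smp_mb__before_atomic();
	clear_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags);
	/* clear_bit() does not imply a memory barrier */
	smp_mb__after_atomic();
}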
On Fri, 7 Mar 2025 20:28:57 +0000 Allison Henderson wrote:
> > Let's be precise, can you give an example of 2 execution threads
> > and memory accesses which have to be ordered.
>
> Hi Jakub,
>
> I just realized my last response referred to bits and functions in the
> next patch instead of this one.  Apologies for the confusion!  For this
> thread example though, I think a pair of threads in rds_send_worker()
> and rds_sendmsg() would be a good example.  How about this:
>
> Thread A:
>     Calls rds_send_worker()
>         calls rds_clear_queued_send_work_bit()
>             clears RDS_SEND_WORK_QUEUED in cp->cp_flags
>         calls rds_send_xmit()
>         calls cond_resched()
>
> Thread B:
>     Calls rds_sendmsg()
>         Calls rds_send_xmit()
>             Calls rds_cond_queue_send_work()
>                 checks and sets RDS_SEND_WORK_QUEUED in cp->cp_flags

We need at least two memory locations if we want to talk about ordering.
In your example we have cp_flags, but the rest is code.
What's the second memory location?

Take a look at e592b5110b3e9393 for an example of a good side by side
thread execution listing:

Thread1 (oa_tc6_start_xmit)          Thread2 (oa_tc6_spi_thread_handler)
---------------------------          -----------------------------------
- if waiting_tx_skb is NULL
                                     - if ongoing_tx_skb is NULL
                                     - ongoing_tx_skb = waiting_tx_skb
- waiting_tx_skb = skb
                                     - waiting_tx_skb = NULL
...
                                     - ongoing_tx_skb = NULL
- if waiting_tx_skb is NULL
- waiting_tx_skb = skb

This makes it pretty clear what fields are at play and how the race
happens.
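In case it is useful to readers, the "two memory locations" point can be distilled
to the classic store-buffering shape.  The sketch below is a generic illustration
(x, y, r0, r1 and the cpu0/cpu1 names are made up, not RDS code) of the kind of
precise ordering statement being requested:

static int x, y;	/* the two memory locations */
static int r0, r1;	/* what each CPU observed   */

static void cpu0(void)		/* e.g. worker side: publish A, inspect B */
{
	WRITE_ONCE(x, 1);
	smp_mb();		/* order the store to x before the load of y */
	r0 = READ_ONCE(y);
}

static void cpu1(void)		/* e.g. submit side: publish B, inspect A */
{
	WRITE_ONCE(y, 1);
	smp_mb();		/* order the store to y before the load of x */
	r1 = READ_ONCE(x);
}

/* With both smp_mb() calls, the outcome r0 == 0 && r1 == 0 is forbidden:
 * at least one CPU must observe the other's store.  Drop either barrier
 * and that outcome becomes possible.
 */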
On Fri, 2025-03-07 at 18:53 -0800, Jakub Kicinski wrote:
> On Fri, 7 Mar 2025 20:28:57 +0000 Allison Henderson wrote:
> > > Let's be precise, can you give an example of 2 execution threads
> > > and memory accesses which have to be ordered.
> >
> > Hi Jakub,
> >
> > I just realized my last response referred to bits and functions in the
> > next patch instead of this one.  Apologies for the confusion!  For this
> > thread example though, I think a pair of threads in rds_send_worker()
> > and rds_sendmsg() would be a good example.  How about this:
> >
> > Thread A:
> >     Calls rds_send_worker()
> >         calls rds_clear_queued_send_work_bit()
> >             clears RDS_SEND_WORK_QUEUED in cp->cp_flags
> >         calls rds_send_xmit()
> >         calls cond_resched()
> >
> > Thread B:
> >     Calls rds_sendmsg()
> >         Calls rds_send_xmit()
> >             Calls rds_cond_queue_send_work()
> >                 checks and sets RDS_SEND_WORK_QUEUED in cp->cp_flags
>
> We need at least two memory locations if we want to talk about ordering.
> In your example we have cp_flags, but the rest is code.
> What's the second memory location?
>
> Take a look at e592b5110b3e9393 for an example of a good side by side
> thread execution listing:
>
> Thread1 (oa_tc6_start_xmit)          Thread2 (oa_tc6_spi_thread_handler)
> ---------------------------          -----------------------------------
> - if waiting_tx_skb is NULL
>                                      - if ongoing_tx_skb is NULL
>                                      - ongoing_tx_skb = waiting_tx_skb
> - waiting_tx_skb = skb
>                                      - waiting_tx_skb = NULL
> ...
>                                      - ongoing_tx_skb = NULL
> - if waiting_tx_skb is NULL
> - waiting_tx_skb = skb
>
> This makes it pretty clear what fields are at play and how the race
> happens.

Hi Jakub,

I suppose the second address would have to be the queue itself, wouldn't
it?  We have a flag that's meant to avoid threads racing to access a
queue, so it would make sense that the addresses of interest would be the
flag and the queue, which is cp->cp_send_w in the send example.  So if we
adjusted our example to include the queue access, then it would look like
this:

Thread A:                                       Thread B:
-----------------------------------             -----------------------------------
                                                Calls rds_sendmsg()
                                                    Calls rds_send_xmit()
                                                        Calls rds_cond_queue_send_work()
Calls rds_send_worker()
    calls rds_clear_queued_send_work_bit()
        clears RDS_SEND_WORK_QUEUED in cp->cp_flags
                                                            checks RDS_SEND_WORK_QUEUED in cp->cp_flags
                                                            but sees stale value
                                                            Skips queuing on cp->cp_send_w when it should not
    Calls rds_send_xmit()
        Calls rds_cond_queue_send_work()
            queues work on cp->cp_send_w

And then if we have the barriers, the example would look like this:

Thread A:                                       Thread B:
-----------------------------------             -----------------------------------
                                                Calls rds_sendmsg()
                                                    Calls rds_send_xmit()
                                                        Calls rds_cond_queue_send_work()
Calls rds_send_worker()
    calls rds_clear_queued_send_work_bit()
        clears RDS_SEND_WORK_QUEUED in cp->cp_flags
                                                            checks RDS_SEND_WORK_QUEUED in cp->cp_flags
                                                            Queues work on cp->cp_send_w
    Calls rds_send_xmit()
        Calls rds_cond_queue_send_work()
            skips queueing work on cp->cp_send_w

I think the barriers also make sure thread A's call to rds_send_xmit()
happens after the clear_bit() too.  Otherwise it may be possible that it
is reordered, and then we get another missed work item there too.

I hope this helps some?  Let me know if that makes sense or if you think
there's a better way it could be managed.

Thank you!
Allison
On Wed, 12 Mar 2025 07:50:11 +0000 Allison Henderson wrote:
> Thread A:                                       Thread B:
> -----------------------------------             -----------------------------------
>                                                 Calls rds_sendmsg()
>                                                     Calls rds_send_xmit()
>                                                         Calls rds_cond_queue_send_work()
> Calls rds_send_worker()
>     calls rds_clear_queued_send_work_bit()
>         clears RDS_SEND_WORK_QUEUED in cp->cp_flags
>                                                             checks RDS_SEND_WORK_QUEUED in cp->cp_flags

But if the two threads run in parallel, what prevents this check from
happening fully before the previous line on the "Thread A" side?

Please take a look at netif_txq_try_stop() for an example of
a memory-barrier based algo.

>                                                             Queues work on cp->cp_send_w
>     Calls rds_send_xmit()
>         Calls rds_cond_queue_send_work()
>             skips queueing work on cp->cp_send_w
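For readers unfamiliar with the referenced helper, the sketch below shows the rough
shape of the pattern netif_txq_try_stop() implements; it is an illustration with
made-up types and helpers (struct my_ring, my_ring_has_room(), my_ring_reclaim(),
MY_QUEUE_STOPPED), not the actual macro.  Each side writes one location, issues a
full barrier, then re-reads the location the other side writes, so the two sides
cannot both miss each other's update:

/* producer / xmit path */
static void tx_maybe_stop(struct my_ring *r)
{
	set_bit(MY_QUEUE_STOPPED, &r->flags);	/* location A: stop flag  */
	smp_mb();				/* A store vs. B load     */
	if (my_ring_has_room(r))		/* location B: ring state */
		clear_bit(MY_QUEUE_STOPPED, &r->flags);	/* consumer raced us */
}

/* consumer / completion path */
static void tx_complete(struct my_ring *r)
{
	my_ring_reclaim(r);			/* location B: free slots */
	smp_mb();				/* B store vs. A load     */
	if (test_bit(MY_QUEUE_STOPPED, &r->flags) && my_ring_has_room(r))
		clear_bit(MY_QUEUE_STOPPED, &r->flags);	/* wake the queue */
}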
On Wed, 2025-03-26 at 09:42 -0700, Jakub Kicinski wrote:
> On Wed, 12 Mar 2025 07:50:11 +0000 Allison Henderson wrote:
> > Thread A:                                       Thread B:
> > -----------------------------------             -----------------------------------
> >                                                 Calls rds_sendmsg()
> >                                                     Calls rds_send_xmit()
> >                                                         Calls rds_cond_queue_send_work()
> > Calls rds_send_worker()
> >     calls rds_clear_queued_send_work_bit()
> >         clears RDS_SEND_WORK_QUEUED in cp->cp_flags
> >                                                             checks RDS_SEND_WORK_QUEUED in cp->cp_flags
>
> But if the two threads run in parallel, what prevents this check from
> happening fully before the previous line on the "Thread A" side?
>
> Please take a look at netif_txq_try_stop() for an example of
> a memory-barrier based algo.
>
> >                                                             Queues work on cp->cp_send_w
> >     Calls rds_send_xmit()
> >         Calls rds_cond_queue_send_work()
> >             skips queueing work on cp->cp_send_w

Hi Jakub,

I had a look at the example; how about we move the barriers from
rds_clear_queued_send_work_bit() into rds_cond_queue_send_work()?  Then
we have something like this:

static inline void rds_cond_queue_send_work(struct rds_conn_path *cp,
					    unsigned long delay)
{
	/* Ensure prior clear_bit operations for RDS_SEND_WORK_QUEUED are observed */
	smp_mb__before_atomic();

	if (!test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
		queue_delayed_work(rds_wq, &cp->cp_send_w, delay);

	/* Ensure the RDS_SEND_WORK_QUEUED bit is observed before proceeding */
	smp_mb__after_atomic();
}

I think that's more like what's in the example, and in line with what
this patch is trying to do.  Let me know what you think.

Thank you for the reviews!
Allison
On Wed, 2 Apr 2025 01:34:40 +0000 Allison Henderson wrote:
> I had a look at the example; how about we move the barriers from
> rds_clear_queued_send_work_bit() into rds_cond_queue_send_work()?  Then
> we have something like this:
>
> static inline void rds_cond_queue_send_work(struct rds_conn_path *cp,
> 					    unsigned long delay)
> {
> 	/* Ensure prior clear_bit operations for RDS_SEND_WORK_QUEUED are observed */
> 	smp_mb__before_atomic();
>
> 	if (!test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
> 		queue_delayed_work(rds_wq, &cp->cp_send_w, delay);
>
> 	/* Ensure the RDS_SEND_WORK_QUEUED bit is observed before proceeding */
> 	smp_mb__after_atomic();
> }
>
> I think that's more like what's in the example, and in line with what
> this patch is trying to do.  Let me know what you think.

Sorry, this still feels like a cargo cult to me.
Let's get a clear explanation of what the barriers order
or just skip the patch.
On Wed, 2025-04-02 at 09:18 -0700, Jakub Kicinski wrote:
> On Wed, 2 Apr 2025 01:34:40 +0000 Allison Henderson wrote:
> > I had a look at the example; how about we move the barriers from
> > rds_clear_queued_send_work_bit() into rds_cond_queue_send_work()?  Then
> > we have something like this:
> >
> > static inline void rds_cond_queue_send_work(struct rds_conn_path *cp,
> > 					    unsigned long delay)
> > {
> > 	/* Ensure prior clear_bit operations for RDS_SEND_WORK_QUEUED are observed */
> > 	smp_mb__before_atomic();
> >
> > 	if (!test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
> > 		queue_delayed_work(rds_wq, &cp->cp_send_w, delay);
> >
> > 	/* Ensure the RDS_SEND_WORK_QUEUED bit is observed before proceeding */
> > 	smp_mb__after_atomic();
> > }
> >
> > I think that's more like what's in the example, and in line with what
> > this patch is trying to do.  Let me know what you think.
>
> Sorry, this still feels like a cargo cult to me.
> Let's get a clear explanation of what the barriers order
> or just skip the patch.

Sure.  I'm out of town tomorrow through Monday, but I'll see what I can do
when I get back Tuesday.  I'm working on some other fixes I'd like to add
to the set, but they need this performance assist to not time out in the
selftests, so I'll have to find something else.

Allison
diff --git a/net/rds/connection.c b/net/rds/connection.c
index c749c5525b40..1d80586fdda2 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -445,9 +445,14 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp)
 	if (!cp->cp_transport_data)
 		return;
 
-	/* make sure lingering queued work won't try to ref the conn */
-	cancel_delayed_work_sync(&cp->cp_send_w);
-	cancel_delayed_work_sync(&cp->cp_recv_w);
+	/* make sure lingering queued work won't try to ref the
+	 * conn. If there is work queued, we cancel it (and set the
+	 * bit to avoid any re-queueing)
+	 */
+	if (test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
+		cancel_delayed_work_sync(&cp->cp_send_w);
+	if (test_and_set_bit(RDS_RECV_WORK_QUEUED, &cp->cp_flags))
+		cancel_delayed_work_sync(&cp->cp_recv_w);
 
 	rds_conn_path_drop(cp, true);
 	flush_work(&cp->cp_down_w);
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index e53b7f266bd7..4ecee1ff1c26 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -457,7 +457,7 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp)
 	    (must_wake ||
 	    (can_wait && rds_ib_ring_low(&ic->i_recv_ring)) ||
 	    rds_ib_ring_empty(&ic->i_recv_ring))) {
-		queue_delayed_work(rds_wq, &conn->c_recv_w, 1);
+		rds_cond_queue_recv_work(conn->c_path + 0, 1);
 	}
 	if (can_wait)
 		cond_resched();
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 4190b90ff3b1..95edd4cb8528 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -419,7 +419,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits)
 
 	atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits);
 	if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags))
-		queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+		rds_cond_queue_send_work(conn->c_path + 0, 0);
 
 	WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384);
 
diff --git a/net/rds/rds.h b/net/rds/rds.h
index dc360252c515..c9a22d0e887b 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -90,6 +90,8 @@ enum {
 #define RDS_IN_XMIT		2
 #define RDS_RECV_REFILL		3
 #define RDS_DESTROY_PENDING	4
+#define RDS_SEND_WORK_QUEUED	5
+#define RDS_RECV_WORK_QUEUED	6
 
 /* Max number of multipaths per RDS connection. Must be a power of 2 */
 #define RDS_MPATH_WORKERS	8
@@ -791,6 +793,37 @@ void __rds_conn_path_error(struct rds_conn_path *cp, const char *, ...);
 #define rds_conn_path_error(cp, fmt...) \
 	__rds_conn_path_error(cp, KERN_WARNING "RDS: " fmt)
 
+extern struct workqueue_struct *rds_wq;
+static inline void rds_cond_queue_send_work(struct rds_conn_path *cp, unsigned long delay)
+{
+	if (!test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
+		queue_delayed_work(rds_wq, &cp->cp_send_w, delay);
+}
+
+static inline void rds_clear_queued_send_work_bit(struct rds_conn_path *cp)
+{
+	/* clear_bit() does not imply a memory barrier */
+	smp_mb__before_atomic();
+	clear_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags);
+	/* clear_bit() does not imply a memory barrier */
+	smp_mb__after_atomic();
+}
+
+static inline void rds_cond_queue_recv_work(struct rds_conn_path *cp, unsigned long delay)
+{
+	if (!test_and_set_bit(RDS_RECV_WORK_QUEUED, &cp->cp_flags))
+		queue_delayed_work(rds_wq, &cp->cp_recv_w, delay);
+}
+
+static inline void rds_clear_queued_recv_work_bit(struct rds_conn_path *cp)
+{
+	/* clear_bit() does not imply a memory barrier */
+	smp_mb__before_atomic();
+	clear_bit(RDS_RECV_WORK_QUEUED, &cp->cp_flags);
+	/* clear_bit() does not imply a memory barrier */
+	smp_mb__after_atomic();
+}
+
 static inline int
 rds_conn_path_transition(struct rds_conn_path *cp, int old, int new)
 {
diff --git a/net/rds/send.c b/net/rds/send.c
index 09a280110654..6329cc8ec246 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -458,7 +458,7 @@ int rds_send_xmit(struct rds_conn_path *cp)
 		if (rds_destroy_pending(cp->cp_conn))
 			ret = -ENETUNREACH;
 		else
-			queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+			rds_cond_queue_send_work(cp, 1);
 		rcu_read_unlock();
 	} else if (raced) {
 		rds_stats_inc(s_send_lock_queue_raced);
@@ -1380,7 +1380,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 		if (rds_destroy_pending(cpath->cp_conn))
 			ret = -ENETUNREACH;
 		else
-			queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
+			rds_cond_queue_send_work(cpath, 1);
 		rcu_read_unlock();
 	}
 	if (ret)
@@ -1473,7 +1473,7 @@ rds_send_probe(struct rds_conn_path *cp, __be16 sport,
 	/* schedule the send work on rds_wq */
 	rcu_read_lock();
 	if (!rds_destroy_pending(cp->cp_conn))
-		queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+		rds_cond_queue_send_work(cp, 0);
 	rcu_read_unlock();
 
 	rds_message_put(rm);
diff --git a/net/rds/tcp.c b/net/rds/tcp.c
index 0581c53e6517..b3f2c6e27b59 100644
--- a/net/rds/tcp.c
+++ b/net/rds/tcp.c
@@ -168,8 +168,16 @@ void rds_tcp_reset_callbacks(struct socket *sock,
 	atomic_set(&cp->cp_state, RDS_CONN_RESETTING);
 	wait_event(cp->cp_waitq, !test_bit(RDS_IN_XMIT, &cp->cp_flags));
 	/* reset receive side state for rds_tcp_data_recv() for osock */
-	cancel_delayed_work_sync(&cp->cp_send_w);
-	cancel_delayed_work_sync(&cp->cp_recv_w);
+
+	/* make sure lingering queued work won't try to ref the
+	 * conn. If there is work queued, we cancel it (and set the bit
+	 * to avoid any re-queueing)
+	 */
+
+	if (test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
+		cancel_delayed_work_sync(&cp->cp_send_w);
+	if (test_and_set_bit(RDS_RECV_WORK_QUEUED, &cp->cp_flags))
+		cancel_delayed_work_sync(&cp->cp_recv_w);
 	lock_sock(osock->sk);
 	if (tc->t_tinc) {
 		rds_inc_put(&tc->t_tinc->ti_inc);
diff --git a/net/rds/tcp_recv.c b/net/rds/tcp_recv.c
index 7997a19d1da3..ab9fc150f974 100644
--- a/net/rds/tcp_recv.c
+++ b/net/rds/tcp_recv.c
@@ -327,7 +327,7 @@ void rds_tcp_data_ready(struct sock *sk)
 	if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM) {
 		rcu_read_lock();
 		if (!rds_destroy_pending(cp->cp_conn))
-			queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+			rds_cond_queue_recv_work(cp, 0);
 		rcu_read_unlock();
 	}
 out:
diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c
index 7d284ac7e81a..cecd3dbde58d 100644
--- a/net/rds/tcp_send.c
+++ b/net/rds/tcp_send.c
@@ -201,7 +201,7 @@ void rds_tcp_write_space(struct sock *sk)
 	rcu_read_lock();
 	if ((refcount_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf &&
 	    !rds_destroy_pending(cp->cp_conn))
-		queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+		rds_cond_queue_send_work(cp, 0);
 	rcu_read_unlock();
 
 out:
diff --git a/net/rds/threads.c b/net/rds/threads.c
index 1f424cbfcbb4..eedae5653051 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -89,8 +89,10 @@ void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
 	set_bit(0, &cp->cp_conn->c_map_queued);
 	rcu_read_lock();
 	if (!rds_destroy_pending(cp->cp_conn)) {
-		queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
-		queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+		rds_clear_queued_send_work_bit(cp);
+		rds_cond_queue_send_work(cp, 0);
+		rds_clear_queued_recv_work_bit(cp);
+		rds_cond_queue_recv_work(cp, 0);
 	}
 	rcu_read_unlock();
 	cp->cp_conn->c_proposed_version = RDS_PROTOCOL_VERSION;
@@ -193,9 +195,11 @@ void rds_send_worker(struct work_struct *work)
 	struct rds_conn_path *cp = container_of(work,
						struct rds_conn_path,
						cp_send_w.work);
+	unsigned long delay;
 	int ret;
 
 	if (rds_conn_path_state(cp) == RDS_CONN_UP) {
+		rds_clear_queued_send_work_bit(cp);
 		clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags);
 		ret = rds_send_xmit(cp);
 		cond_resched();
@@ -203,15 +207,17 @@
 		switch (ret) {
 		case -EAGAIN:
 			rds_stats_inc(s_send_immediate_retry);
-			queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+			delay = 0;
 			break;
 		case -ENOMEM:
 			rds_stats_inc(s_send_delayed_retry);
-			queue_delayed_work(rds_wq, &cp->cp_send_w, 2);
+			delay = 2;
 			break;
 		default:
-			break;
+			return;
 		}
+
+		rds_cond_queue_send_work(cp, delay);
 	}
 }
 
@@ -220,23 +226,26 @@ void rds_recv_worker(struct work_struct *work)
 	struct rds_conn_path *cp = container_of(work,
						struct rds_conn_path,
						cp_recv_w.work);
+	unsigned long delay;
 	int ret;
 
 	if (rds_conn_path_state(cp) == RDS_CONN_UP) {
+		rds_clear_queued_recv_work_bit(cp);
 		ret = cp->cp_conn->c_trans->recv_path(cp);
 		rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
 		switch (ret) {
 		case -EAGAIN:
 			rds_stats_inc(s_recv_immediate_retry);
-			queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+			delay = 0;
 			break;
 		case -ENOMEM:
 			rds_stats_inc(s_recv_delayed_retry);
-			queue_delayed_work(rds_wq, &cp->cp_recv_w, 2);
+			delay = 2;
 			break;
 		default:
-			break;
+			return;
 		}
+		rds_cond_queue_recv_work(cp, delay);
 	}
 }