@@ -266,17 +266,6 @@ enum {
RQ_DATA, /* request has data */
};
-/**
- * svc_thread_busy - check if a thread as busy
- * @rqstp: the thread which might be busy
- *
- * A thread is only busy when it is not an the idle list.
- */
-static inline bool svc_thread_busy(const struct svc_rqst *rqstp)
-{
- return !llist_on_list(&rqstp->rq_idle);
-}
-
#define SVC_NET(rqst) (rqst->rq_xprt ? rqst->rq_xprt->xpt_net : rqst->rq_bc_net)
/*
@@ -642,7 +642,6 @@ svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)
folio_batch_init(&rqstp->rq_fbatch);
- init_llist_node(&rqstp->rq_idle);
rqstp->rq_server = serv;
rqstp->rq_pool = pool;
@@ -704,17 +703,16 @@ void svc_pool_wake_idle_thread(struct svc_pool *pool)
struct llist_node *ln;
rcu_read_lock();
- spin_lock_bh(&pool->sp_lock);
- ln = llist_del_first_init(&pool->sp_idle_threads);
- spin_unlock_bh(&pool->sp_lock);
+ ln = READ_ONCE(pool->sp_idle_threads.first);
if (ln) {
rqstp = llist_entry(ln, struct svc_rqst, rq_idle);
-
WRITE_ONCE(rqstp->rq_qtime, ktime_get());
- wake_up_process(rqstp->rq_task);
+ if (!task_is_running(rqstp->rq_task)) {
+ wake_up_process(rqstp->rq_task);
+ trace_svc_wake_up(rqstp->rq_task->pid);
+ percpu_counter_inc(&pool->sp_threads_woken);
+ }
rcu_read_unlock();
- percpu_counter_inc(&pool->sp_threads_woken);
- trace_svc_wake_up(rqstp->rq_task->pid);
return;
}
rcu_read_unlock();
@@ -732,20 +732,19 @@ static void svc_thread_wait_for_work(struct svc_rqst *rqstp)
if (svc_thread_should_sleep(rqstp)) {
set_current_state(TASK_IDLE | TASK_FREEZABLE);
llist_add(&rqstp->rq_idle, &pool->sp_idle_threads);
+ if (likely(svc_thread_should_sleep(rqstp)))
+ schedule();
- if (unlikely(!svc_thread_should_sleep(rqstp)))
- /* Work just became available. This thread cannot simply
- * choose not to sleep as it *must* wait until removed.
- * So wake the first waiter - whether it is this
- * thread or some other, it will get the work done.
+ while (!llist_del_first_this(&pool->sp_idle_threads,
+ &rqstp->rq_idle)) {
+ /* Work just became available. This thread can only
+ * handle it after removing rqstp from the idle
+ * list. If that attempt failed, some other thread
+ * must have queued itself after finding no
+ * work to do, so that thread has taken responsibly
+ * for this new work. This thread can safely sleep
+ * until woken again.
*/
- svc_pool_wake_idle_thread(pool);
-
- /* Since a thread cannot remove itself from an llist,
- * schedule until someone else removes @rqstp from
- * the idle list.
- */
- while (!svc_thread_busy(rqstp)) {
schedule();
set_current_state(TASK_IDLE | TASK_FREEZABLE);
}
@@ -835,6 +834,15 @@ static void svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)
svc_xprt_release(rqstp);
}
+static void svc_thread_wake_next(struct svc_rqst *rqstp)
+{
+ if (!svc_thread_should_sleep(rqstp))
+ /* More work pending after I dequeued some,
+ * wake another worker
+ */
+ svc_pool_wake_idle_thread(rqstp->rq_pool);
+}
+
/**
* svc_recv - Receive and process the next request on any transport
* @rqstp: an idle RPC service thread
@@ -854,13 +862,16 @@ void svc_recv(struct svc_rqst *rqstp)
clear_bit(SP_TASK_PENDING, &pool->sp_flags);
- if (svc_thread_should_stop(rqstp))
+ if (svc_thread_should_stop(rqstp)) {
+ svc_thread_wake_next(rqstp);
return;
+ }
rqstp->rq_xprt = svc_xprt_dequeue(pool);
if (rqstp->rq_xprt) {
struct svc_xprt *xprt = rqstp->rq_xprt;
+ svc_thread_wake_next(rqstp);
/* Normally we will wait up to 5 seconds for any required
* cache information to be provided. When there are no
* idle threads, we reduce the wait time.
@@ -885,6 +896,7 @@ void svc_recv(struct svc_rqst *rqstp)
if (req) {
list_del(&req->rq_bc_list);
spin_unlock_bh(&serv->sv_cb_lock);
+ svc_thread_wake_next(rqstp);
svc_process_bc(req, rqstp);
return;
Currently if several items of work become available in quick succession, that number of threads (if available) will be woken. By the time some of them wake up another thread that was already cache-warm might have come along and completed the work. Anecdotal evidence suggests as many as 15% of wakes find nothing to do once they get to the point of looking. This patch changes svc_pool_wake_idle_thread() to wake the first thread on the queue but NOT remove it. Subsequent calls will wake the same thread. Once that thread starts it will dequeue itself and after dequeueing some work to do, it will wake the next thread if there is more work ready. This results in a more orderly increase in the number of busy threads. As a bonus, this allows us to reduce locking around the idle queue. svc_pool_wake_idle_thread() no longer needs to take a lock (beyond rcu_read_lock()) as it doesn't manipulate the queue, it just looks at the first item. The thread itself can avoid locking by using the new llist_del_first_this() interface. This will safely remove the thread itself if it is the head. If it isn't the head, it will do nothing. If multiple threads call this concurrently only one will succeed. The others will do nothing, so no corruption can result. If a thread wakes up and finds that it cannot dequeue itself that means either - that it wasn't woken because it was the head of the queue. Maybe the freezer woke it. In that case it can go back to sleep (after trying to freeze of course). - some other thread found there was nothing to do very recently, and placed itself on the head of the queue in front of this thread. It must check again after placing itself there, so it can be deemed to be responsible for any pending work, and this thread can go back to sleep until woken. No code ever tests for busy threads any more. Only each thread itself cares if it is busy. So svc_thread_busy() is no longer needed. Signed-off-by: NeilBrown <neilb@suse.de> --- include/linux/sunrpc/svc.h | 11 ----------- net/sunrpc/svc.c | 14 ++++++-------- net/sunrpc/svc_xprt.c | 38 +++++++++++++++++++++++++------------- 3 files changed, 31 insertions(+), 32 deletions(-)