@@ -266,17 +266,6 @@ enum {
RQ_DATA, /* request has data */
};
-/**
- * svc_thread_busy - check if a thread as busy
- * @rqstp: the thread which might be busy
- *
- * A thread is only busy when it is not an the idle list.
- */
-static inline bool svc_thread_busy(const struct svc_rqst *rqstp)
-{
- return !llist_on_list(&rqstp->rq_idle);
-}
-
#define SVC_NET(rqst) (rqst->rq_xprt ? rqst->rq_xprt->xpt_net : rqst->rq_bc_net)
/*
@@ -642,7 +642,6 @@ svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)
folio_batch_init(&rqstp->rq_fbatch);
- init_llist_node(&rqstp->rq_idle);
rqstp->rq_server = serv;
rqstp->rq_pool = pool;
@@ -704,17 +703,16 @@ void svc_pool_wake_idle_thread(struct svc_pool *pool)
struct llist_node *ln;
rcu_read_lock();
- spin_lock_bh(&pool->sp_lock);
- ln = llist_del_first_init(&pool->sp_idle_threads);
- spin_unlock_bh(&pool->sp_lock);
+ ln = READ_ONCE(pool->sp_idle_threads.first);
if (ln) {
rqstp = llist_entry(ln, struct svc_rqst, rq_idle);
-
WRITE_ONCE(rqstp->rq_qtime, ktime_get());
- wake_up_process(rqstp->rq_task);
+ if (!task_is_running(rqstp->rq_task)) {
+ wake_up_process(rqstp->rq_task);
+ trace_svc_wake_up(rqstp->rq_task->pid);
+ percpu_counter_inc(&pool->sp_threads_woken);
+ }
rcu_read_unlock();
- percpu_counter_inc(&pool->sp_threads_woken);
- trace_svc_wake_up(rqstp->rq_task->pid);
return;
}
rcu_read_unlock();
@@ -732,20 +732,19 @@ static void svc_thread_wait_for_work(struct svc_rqst *rqstp)
if (svc_thread_should_sleep(rqstp)) {
set_current_state(TASK_IDLE | TASK_FREEZABLE);
llist_add(&rqstp->rq_idle, &pool->sp_idle_threads);
+ if (likely(svc_thread_should_sleep(rqstp)))
+ schedule();
- if (unlikely(!svc_thread_should_sleep(rqstp)))
- /* Work just became available. This thread cannot simply
- * choose not to sleep as it *must* wait until removed.
- * So wake the first waiter - whether it is this
- * thread or some other, it will get the work done.
+ while (!llist_del_first_this(&pool->sp_idle_threads,
+ &rqstp->rq_idle)) {
+ /* Work just became available. This thread can only
+ * handle it after removing rqstp from the idle
+ * list. If that attempt failed, some other thread
+ * must have queued itself after finding no
+ * work to do, so that thread has taken responsibly
+ * for this new work. This thread can safely sleep
+ * until woken again.
*/
- svc_pool_wake_idle_thread(pool);
-
- /* Since a thread cannot remove itself from an llist,
- * schedule until someone else removes @rqstp from
- * the idle list.
- */
- while (!svc_thread_busy(rqstp)) {
schedule();
set_current_state(TASK_IDLE | TASK_FREEZABLE);
}
@@ -835,6 +834,15 @@ static void svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)
svc_xprt_release(rqstp);
}
+static void svc_thread_wake_next(struct svc_rqst *rqstp)
+{
+ if (!svc_thread_should_sleep(rqstp))
+ /* More work pending after I dequeued some,
+ * wake another worker
+ */
+ svc_pool_wake_idle_thread(rqstp->rq_pool);
+}
+
/**
* svc_recv - Receive and process the next request on any transport
* @rqstp: an idle RPC service thread
@@ -854,13 +862,16 @@ void svc_recv(struct svc_rqst *rqstp)
clear_bit(SP_TASK_PENDING, &pool->sp_flags);
- if (svc_thread_should_stop(rqstp))
+ if (svc_thread_should_stop(rqstp)) {
+ svc_thread_wake_next(rqstp);
return;
+ }
rqstp->rq_xprt = svc_xprt_dequeue(pool);
if (rqstp->rq_xprt) {
struct svc_xprt *xprt = rqstp->rq_xprt;
+ svc_thread_wake_next(rqstp);
/* Normally we will wait up to 5 seconds for any required
* cache information to be provided. When there are no
* idle threads, we reduce the wait time.
@@ -885,6 +896,7 @@ void svc_recv(struct svc_rqst *rqstp)
if (req) {
list_del(&req->rq_bc_list);
spin_unlock_bh(&serv->sv_cb_lock);
+ svc_thread_wake_next(rqstp);
svc_process_bc(req, rqstp);
return;