--- a/include/linux/sunrpc/svc.h
+++ b/include/linux/sunrpc/svc.h
@@ -35,6 +35,7 @@ struct svc_pool {
spinlock_t sp_lock; /* protects sp_sockets */
struct list_head sp_sockets; /* pending sockets */
unsigned int sp_nrthreads; /* # of threads in pool */
+ unsigned long *sp_idle_map; /* idle threads */
struct xarray sp_thread_xa;
/* statistics on pool operation */
@@ -189,6 +190,8 @@ extern u32 svc_max_payload(const struct svc_rqst *rqstp);
#define RPCSVC_MAXPAGES ((RPCSVC_MAXPAYLOAD+PAGE_SIZE-1)/PAGE_SIZE \
+ 2 + 1)
+#define RPCSVC_MAXPOOLTHREADS (256)
+
/*
* The context of a single thread, including the request currently being
* processed.
@@ -238,8 +241,7 @@ struct svc_rqst {
#define RQ_SPLICE_OK (4) /* turned off in gss privacy
* to prevent encrypting page
* cache pages */
-#define RQ_BUSY (5) /* request is busy */
-#define RQ_DATA (6) /* request has data */
+#define RQ_DATA (5) /* request has data */
unsigned long rq_flags; /* flags field */
u32 rq_thread_id; /* xarray index */
ktime_t rq_qtime; /* enqueue time */
--- a/include/trace/events/sunrpc.h
+++ b/include/trace/events/sunrpc.h
@@ -1600,7 +1600,6 @@ DEFINE_SVCXDRBUF_EVENT(sendto);
svc_rqst_flag(USEDEFERRAL) \
svc_rqst_flag(DROPME) \
svc_rqst_flag(SPLICE_OK) \
- svc_rqst_flag(BUSY) \
svc_rqst_flag_end(DATA)
#undef svc_rqst_flag
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -509,6 +509,12 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
INIT_LIST_HEAD(&pool->sp_sockets);
spin_lock_init(&pool->sp_lock);
xa_init_flags(&pool->sp_thread_xa, XA_FLAGS_ALLOC);
+ /* All threads initially marked "busy" */
+ pool->sp_idle_map =
+ bitmap_zalloc_node(RPCSVC_MAXPOOLTHREADS, GFP_KERNEL,
+ svc_pool_map_get_node(i));
+ if (!pool->sp_idle_map)
+ return NULL;
percpu_counter_init(&pool->sp_sockets_queued, 0, GFP_KERNEL);
percpu_counter_init(&pool->sp_threads_woken, 0, GFP_KERNEL);
@@ -594,6 +600,8 @@ svc_destroy(struct kref *ref)
percpu_counter_destroy(&pool->sp_threads_starved);
xa_destroy(&pool->sp_thread_xa);
+ bitmap_free(pool->sp_idle_map);
+ pool->sp_idle_map = NULL;
}
kfree(serv->sv_pools);
kfree(serv);
@@ -645,7 +653,6 @@ svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)
folio_batch_init(&rqstp->rq_fbatch);
- __set_bit(RQ_BUSY, &rqstp->rq_flags);
rqstp->rq_server = serv;
rqstp->rq_pool = pool;
@@ -675,7 +682,7 @@ static struct svc_rqst *
svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
{
static const struct xa_limit limit = {
- .max = UINT_MAX,
+ .max = RPCSVC_MAXPOOLTHREADS,
};
struct svc_rqst *rqstp;
int ret;
@@ -720,18 +727,24 @@ struct svc_rqst *svc_pool_wake_idle_thread(struct svc_serv *serv,
struct svc_pool *pool)
{
struct svc_rqst *rqstp;
- unsigned long index;
+ unsigned long bit;
- xa_for_each(&pool->sp_thread_xa, index, rqstp) {
- if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags))
- continue;
+ bit = 0;
+ do {
+ bit = find_next_bit(pool->sp_idle_map, pool->sp_nrthreads, bit);
+ if (bit == pool->sp_nrthreads)
+ goto out_starved;
+ } while (!test_and_clear_bit(bit, pool->sp_idle_map));
- WRITE_ONCE(rqstp->rq_qtime, ktime_get());
- wake_up_process(rqstp->rq_task);
- percpu_counter_inc(&pool->sp_threads_woken);
- return rqstp;
- }
+ rqstp = xa_find(&pool->sp_thread_xa, &bit, bit, XA_PRESENT);
+ if (!rqstp)
+ goto out_starved;
+ WRITE_ONCE(rqstp->rq_qtime, ktime_get());
+ wake_up_process(rqstp->rq_task);
+ percpu_counter_inc(&pool->sp_threads_woken);
+ return rqstp;
+out_starved:
trace_svc_pool_starved(serv, pool);
percpu_counter_inc(&pool->sp_threads_starved);
return NULL;
@@ -765,7 +778,8 @@ svc_pool_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *stat
}
found_pool:
- rqstp = xa_find(&pool->sp_thread_xa, &zero, U32_MAX, XA_PRESENT);
+ rqstp = xa_find(&pool->sp_thread_xa, &zero, RPCSVC_MAXPOOLTHREADS,
+ XA_PRESENT);
if (rqstp) {
__xa_erase(&pool->sp_thread_xa, rqstp->rq_thread_id);
task = rqstp->rq_task;
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -734,6 +734,18 @@ rqst_should_sleep(struct svc_rqst *rqstp)
return true;
}
+static void svc_rqst_mark_idle(struct svc_rqst *rqstp)
+{
+ set_bit(rqstp->rq_thread_id, rqstp->rq_pool->sp_idle_map);
+ smp_mb__after_atomic();
+}
+
+static void svc_rqst_mark_busy(struct svc_rqst *rqstp)
+{
+ clear_bit(rqstp->rq_thread_id, rqstp->rq_pool->sp_idle_map);
+ smp_mb__after_atomic();
+}
+
static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
{
struct svc_pool *pool = rqstp->rq_pool;
@@ -755,8 +767,7 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
set_current_state(TASK_INTERRUPTIBLE);
smp_mb__before_atomic();
clear_bit(SP_CONGESTED, &pool->sp_flags);
- clear_bit(RQ_BUSY, &rqstp->rq_flags);
- smp_mb__after_atomic();
+ svc_rqst_mark_idle(rqstp);
if (likely(rqst_should_sleep(rqstp)))
time_left = schedule_timeout(timeout);
@@ -765,8 +776,12 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
try_to_freeze();
- set_bit(RQ_BUSY, &rqstp->rq_flags);
- smp_mb__after_atomic();
+ /* Post-sleep: look for more work.
+ *
+ * Note: If we were awoken, then this rqstp has already
+ * been marked busy.
+ */
+ svc_rqst_mark_busy(rqstp);
rqstp->rq_xprt = svc_xprt_dequeue(pool);
if (rqstp->rq_xprt) {
trace_svc_pool_awoken(pool, rqstp);
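The hunk above arranges the sleep path so a thread publishes its idle bit before it commits to sleeping, while the waker claims the thread (clears the bit) before wake_up_process(); a woken thread is therefore already busy, and the svc_rqst_mark_busy() after the sleep only matters when the wait simply times out. Below is a rough userspace sketch of that handshake, not part of the patch: the worker/waker split, the one-bit-per-worker map, and the pthread mutex/condition variable standing in for the kernel's prepare-to-sleep/wake_up_process() protocol are all illustrative assumptions.

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <unistd.h>

static atomic_ulong idle_map;           /* bit set == worker is idle */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t wake = PTHREAD_COND_INITIALIZER;
static int work_pending;

static void *worker(void *arg)
{
        unsigned long me = 1UL << (long)arg;

        pthread_mutex_lock(&lock);
        atomic_fetch_or(&idle_map, me);         /* like svc_rqst_mark_idle() */
        while (!work_pending)
                pthread_cond_wait(&wake, &lock);
        work_pending = 0;
        /* Like svc_rqst_mark_busy(): a no-op here because the waker already
         * cleared our bit, but needed on a timeout path. */
        atomic_fetch_and(&idle_map, ~me);
        pthread_mutex_unlock(&lock);
        printf("worker %ld handling work\n", (long)arg);
        return NULL;
}

static int wake_idle_worker(void)
{
        unsigned long map = atomic_load(&idle_map);

        if (!map)
                return -1;                      /* "pool starved" */
        /* Claim the lowest idle worker before signalling it. */
        atomic_fetch_and(&idle_map, ~(map & -map));
        pthread_mutex_lock(&lock);
        work_pending = 1;
        pthread_cond_signal(&wake);
        pthread_mutex_unlock(&lock);
        return 0;
}

int main(void)
{
        pthread_t t;

        pthread_create(&t, NULL, worker, (void *)0L);
        while (wake_idle_worker())              /* retry until the worker is idle */
                usleep(1000);
        pthread_join(&t, NULL);
        return 0;
}

The ordering is the point of the sketch: the idle bit becomes visible before the worker can go to sleep, so a waker either sees it and claims the worker, or misses it and reports starvation, mirroring the smp_mb__after_atomic() pairing in svc_rqst_mark_idle().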