Message ID | 20210930153037.1194279-11-vgoyal@redhat.com (mailing list archive) |
---|---|
State | New, archived |
Series | virtiofsd: Support notification queue and |
On Thu, Sep 30, 2021 at 11:30:34AM -0400, Vivek Goyal wrote:
> Add a new custom threadpool using posix threads that specifically
> service locking requests.
>
> In the case of a fcntl(SETLKW) request, if the guest is waiting
> for a lock or locks and issues a hard-reboot through SYSRQ then virtiofsd
> unblocks the blocked threads by sending a signal to them and waking
> them up.
>
> The current threadpool (GThreadPool) is not adequate to service the
> locking requests that result in a thread blocking. That is because
> GLib does not provide an API to cancel the request while it is
> serviced by a thread. In addition, a user might be running virtiofsd
> without a threadpool (--thread-pool-size=0), thus a locking request
> that blocks, will block the main virtqueue thread that services requests
> from servicing any other requests.
>
> The only exception occurs when the lock is of type F_UNLCK. In this case
> the request is serviced by the main virtqueue thread or a GThreadPool
> thread to avoid a deadlock, when all the threads in the custom threadpool
> are blocked.
>
> Then virtiofsd proceeds to cleanup the state of the threads, release
> them back to the system and re-initialize.

Is there another way to cancel SETLKW without resorting to a new thread
pool? Since this only matters when shutting down or restarting, can we
close all plock->fd file descriptors to kick the GThreadPool workers out
of fcntl()?

>
> Signed-off-by: Ioannis Angelakopoulos <iangelak@redhat.com>
> Signed-off-by: Vivek Goyal <vgoyal@redhat.com>
> ---
>  tools/virtiofsd/fuse_virtio.c         |  90 ++++++-
>  tools/virtiofsd/meson.build           |   1 +
>  tools/virtiofsd/passthrough_seccomp.c |   1 +
>  tools/virtiofsd/tpool.c               | 331 ++++++++++++++++++++++++++
>  tools/virtiofsd/tpool.h               |  18 ++
>  5 files changed, 440 insertions(+), 1 deletion(-)
>  create mode 100644 tools/virtiofsd/tpool.c
>  create mode 100644 tools/virtiofsd/tpool.h
>
> diff --git a/tools/virtiofsd/fuse_virtio.c b/tools/virtiofsd/fuse_virtio.c
> index 3b720c5d4a..c67c2e0e7a 100644
> --- a/tools/virtiofsd/fuse_virtio.c
> +++ b/tools/virtiofsd/fuse_virtio.c
> @@ -20,6 +20,7 @@
>  #include "fuse_misc.h"
>  #include "fuse_opt.h"
>  #include "fuse_virtio.h"
> +#include "tpool.h"
>
>  #include <sys/eventfd.h>
>  #include <sys/socket.h>
> @@ -612,6 +613,60 @@ out:
>      free(req);
>  }
>
> +/*
> + * If the request is a locking request, use a custom locking thread pool.
> + */
> +static bool use_lock_tpool(gpointer data, gpointer user_data)
> +{
> +    struct fv_QueueInfo *qi = user_data;
> +    struct fuse_session *se = qi->virtio_dev->se;
> +    FVRequest *req = data;
> +    VuVirtqElement *elem = &req->elem;
> +    struct fuse_buf fbuf = {};
> +    struct fuse_in_header *inhp;
> +    struct fuse_lk_in *lkinp;
> +    size_t lk_req_len;
> +    /* The 'out' part of the elem is from qemu */
> +    unsigned int out_num = elem->out_num;
> +    struct iovec *out_sg = elem->out_sg;
> +    size_t out_len = iov_size(out_sg, out_num);
> +    bool use_custom_tpool = false;
> +
> +    /*
> +     * If notifications are not enabled, no point in using cusotm lock
> +     * thread pool.
> +     */
> +    if (!se->notify_enabled) {
> +        return false;
> +    }
> +
> +    assert(se->bufsize > sizeof(struct fuse_in_header));
> +    lk_req_len = sizeof(struct fuse_in_header) + sizeof(struct fuse_lk_in);
> +
> +    if (out_len < lk_req_len) {
> +        return false;
> +    }
> +
> +    fbuf.mem = g_malloc(se->bufsize);
> +    copy_from_iov(&fbuf, out_num, out_sg, lk_req_len);

This looks inefficient: for every FUSE request we now malloc se->bufsize
and then copy lk_req_len bytes, only to free the memory again.

Is it possible to keep lk_req_len bytes on the stack instead?
On Mon, Oct 04, 2021 at 03:54:31PM +0100, Stefan Hajnoczi wrote:
> On Thu, Sep 30, 2021 at 11:30:34AM -0400, Vivek Goyal wrote:
> > Add a new custom threadpool using posix threads that specifically
> > service locking requests.
> >
> > In the case of a fcntl(SETLKW) request, if the guest is waiting
> > for a lock or locks and issues a hard-reboot through SYSRQ then virtiofsd
> > unblocks the blocked threads by sending a signal to them and waking
> > them up.
> >
> > The current threadpool (GThreadPool) is not adequate to service the
> > locking requests that result in a thread blocking. That is because
> > GLib does not provide an API to cancel the request while it is
> > serviced by a thread. In addition, a user might be running virtiofsd
> > without a threadpool (--thread-pool-size=0), thus a locking request
> > that blocks, will block the main virtqueue thread that services requests
> > from servicing any other requests.
> >
> > The only exception occurs when the lock is of type F_UNLCK. In this case
> > the request is serviced by the main virtqueue thread or a GThreadPool
> > thread to avoid a deadlock, when all the threads in the custom threadpool
> > are blocked.
> >
> > Then virtiofsd proceeds to cleanup the state of the threads, release
> > them back to the system and re-initialize.
>
> Is there another way to cancel SETLKW without resorting to a new thread
> pool? Since this only matters when shutting down or restarting, can we
> close all plock->fd file descriptors to kick the GThreadPool workers out
> of fcntl()?

I don't think that closing plock->fd will unblock fcntl().

SYSCALL_DEFINE3(fcntl, unsigned int, fd, unsigned int, cmd, unsigned long, arg)
{
        struct fd f = fdget_raw(fd);
}

IIUC, fdget_raw() will take a reference on the associated "struct file",
and after that the rest of the code will work with that "struct file".

static int do_lock_file_wait(struct file *filp, unsigned int cmd,
                             struct file_lock *fl)
{
..
..
        error = wait_event_interruptible(fl->fl_wait,
                                list_empty(&fl->fl_blocked_member));
..
..
}

And this should break upon receiving a signal. And the man page says the
same thing.

       F_OFD_SETLKW (struct flock *)
              As for F_OFD_SETLK, but if a conflicting lock is held on
              the file, then wait for that lock to be released. If a
              signal is caught while waiting, then the call is
              interrupted and (after the signal handler has returned)
              returns immediately (with return value -1 and errno set
              to EINTR; see signal(7)).

It would be nice if we didn't have to implement our own custom threadpool
just for locking. It would have been better if the glib thread pool
provided some facility for this.

[..]
> > diff --git a/tools/virtiofsd/fuse_virtio.c b/tools/virtiofsd/fuse_virtio.c
> > index 3b720c5d4a..c67c2e0e7a 100644
> > --- a/tools/virtiofsd/fuse_virtio.c
> > +++ b/tools/virtiofsd/fuse_virtio.c
> > @@ -20,6 +20,7 @@
> >  #include "fuse_misc.h"
> >  #include "fuse_opt.h"
> >  #include "fuse_virtio.h"
> > +#include "tpool.h"
> >
> >  #include <sys/eventfd.h>
> >  #include <sys/socket.h>
> > @@ -612,6 +613,60 @@ out:
> >      free(req);
> >  }
> >
> > +/*
> > + * If the request is a locking request, use a custom locking thread pool.
> > + */
> > +static bool use_lock_tpool(gpointer data, gpointer user_data)
> > +{
> > +    struct fv_QueueInfo *qi = user_data;
> > +    struct fuse_session *se = qi->virtio_dev->se;
> > +    FVRequest *req = data;
> > +    VuVirtqElement *elem = &req->elem;
> > +    struct fuse_buf fbuf = {};
> > +    struct fuse_in_header *inhp;
> > +    struct fuse_lk_in *lkinp;
> > +    size_t lk_req_len;
> > +    /* The 'out' part of the elem is from qemu */
> > +    unsigned int out_num = elem->out_num;
> > +    struct iovec *out_sg = elem->out_sg;
> > +    size_t out_len = iov_size(out_sg, out_num);
> > +    bool use_custom_tpool = false;
> > +
> > +    /*
> > +     * If notifications are not enabled, no point in using cusotm lock
> > +     * thread pool.
> > +     */
> > +    if (!se->notify_enabled) {
> > +        return false;
> > +    }
> > +
> > +    assert(se->bufsize > sizeof(struct fuse_in_header));
> > +    lk_req_len = sizeof(struct fuse_in_header) + sizeof(struct fuse_lk_in);
> > +
> > +    if (out_len < lk_req_len) {
> > +        return false;
> > +    }
> > +
> > +    fbuf.mem = g_malloc(se->bufsize);
> > +    copy_from_iov(&fbuf, out_num, out_sg, lk_req_len);
>
> This looks inefficient: for every FUSE request we now malloc se->bufsize
> and then copy lk_req_len bytes, only to free the memory again.
>
> Is it possible to keep lk_req_len bytes on the stack instead?

I guess it should be possible. se->bufsize is variable but lk_req_len
is known at compile time.

lk_req_len = sizeof(struct fuse_in_header) + sizeof(struct fuse_lk_in);

So we should be able to allocate this much space on the stack and point
fbuf.mem to it.

char buf[sizeof(struct fuse_in_header) + sizeof(struct fuse_lk_in)];
fbuf.mem = buf;

Will give it a try.

Vivek
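For illustration only, here is a minimal sketch of the stack-buffer idea discussed above. It is not the code from the patch or from any later revision. The demo_* structs and the DEMO_SETLKW constant are simplified, hypothetical stand-ins for the real FUSE wire definitions, and gather_prefix() is a made-up helper playing the role of copy_from_iov().

```c
#include <assert.h>
#include <fcntl.h>      /* F_UNLCK, F_WRLCK */
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include <sys/uio.h>    /* struct iovec */

/* Hypothetical stand-ins: the real FUSE structs carry more fields. */
struct demo_in_header { uint32_t len; uint32_t opcode; uint64_t unique; };
struct demo_lk_in { uint64_t fh; uint64_t owner; uint32_t type; uint32_t pid; };
#define DEMO_SETLKW 33u /* assumed opcode value, for illustration */

/* Copy at most 'len' bytes from a scatter-gather list; returns bytes copied. */
static size_t gather_prefix(void *dst, size_t len,
                            const struct iovec *sg, unsigned int num)
{
    size_t done = 0;

    assert(sg != NULL || num == 0);
    for (unsigned int i = 0; i < num && done < len; i++) {
        size_t n = sg[i].iov_len;

        if (n > len - done) {
            n = len - done;
        }
        memcpy((char *)dst + done, sg[i].iov_base, n);
        done += n;
    }
    return done;
}

/* True only for a SETLKW request that is not an unlock. */
static bool is_blocking_lock_request(const struct iovec *sg, unsigned int num)
{
    /* The size is a compile-time constant, so no heap allocation is needed. */
    unsigned char buf[sizeof(struct demo_in_header) +
                      sizeof(struct demo_lk_in)];
    struct demo_in_header hdr;
    struct demo_lk_in lk;

    if (gather_prefix(buf, sizeof(buf), sg, num) < sizeof(buf)) {
        return false;   /* too short to be a lock request */
    }

    /* Copy into typed structs rather than casting the byte buffer. */
    memcpy(&hdr, buf, sizeof(hdr));
    memcpy(&lk, buf + sizeof(hdr), sizeof(lk));

    return hdr.opcode == DEMO_SETLKW && lk.type != F_UNLCK;
}
```

The buffer holds only the header plus the lock argument, so the per-request g_malloc(se->bufsize)/g_free() pair is no longer needed for the check.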
On Mon, Oct 04, 2021 at 03:54:31PM +0100, Stefan Hajnoczi wrote:
> On Thu, Sep 30, 2021 at 11:30:34AM -0400, Vivek Goyal wrote:
> > Add a new custom threadpool using posix threads that specifically
> > service locking requests.
> >
> > In the case of a fcntl(SETLKW) request, if the guest is waiting
> > for a lock or locks and issues a hard-reboot through SYSRQ then virtiofsd
> > unblocks the blocked threads by sending a signal to them and waking
> > them up.
> >
> > The current threadpool (GThreadPool) is not adequate to service the
> > locking requests that result in a thread blocking. That is because
> > GLib does not provide an API to cancel the request while it is
> > serviced by a thread. In addition, a user might be running virtiofsd
> > without a threadpool (--thread-pool-size=0), thus a locking request
> > that blocks, will block the main virtqueue thread that services requests
> > from servicing any other requests.
> >
> > The only exception occurs when the lock is of type F_UNLCK. In this case
> > the request is serviced by the main virtqueue thread or a GThreadPool
> > thread to avoid a deadlock, when all the threads in the custom threadpool
> > are blocked.
> >
> > Then virtiofsd proceeds to cleanup the state of the threads, release
> > them back to the system and re-initialize.
>
> Is there another way to cancel SETLKW without resorting to a new thread
> pool? Since this only matters when shutting down or restarting, can we
> close all plock->fd file descriptors to kick the GThreadPool workers out
> of fcntl()?

Ok, I tested this. If a thread is blocked on OFD lock and another
thread closes associated "fd", it does not unblock the thread
which is blocked on lock. So closing OFD can't be used for unblocking
a thread.

Even if it could be, it can't be a replacement for a thread pool
in general as we can't block main thread otherwise it can deadlock.
But we could have used another glib thread pool (instead of a
custom thread pool which can handle signals to unblock threads).

If you are curious, here is my test program.

https://github.com/rhvgoyal/misc/blob/master/virtiofs-tests/ofd-lock.c

Comments in there explain how to use it. It can block on an OFD
lock and one can send SIGUSR1 which will close fd.

Thanks
Vivek
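A rough, self-contained sketch of the kind of experiment described above; this is not the linked ofd-lock.c. All names here are made up for illustration, it uses two open file descriptions in one process instead of two processes, and the fixed 200 ms sleeps are a crude assumption about scheduling, not a synchronization mechanism. It assumes Linux with OFD lock support.

```c
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <assert.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <unistd.h>

/* Fallback (Linux values) in case the libc headers hide the OFD commands. */
#ifndef F_OFD_SETLKW
#define F_OFD_SETLK  37
#define F_OFD_SETLKW 38
#endif

static atomic_int waiter_returned;

/* Blocks in F_OFD_SETLKW until the whole-file write lock can be taken. */
static void *waiter_thread(void *arg)
{
    /* l_pid must stay 0 for OFD locks; l_start = l_len = 0 is whole file */
    struct flock fl = { .l_type = F_WRLCK, .l_whence = SEEK_SET };

    assert(arg != NULL);
    fcntl(*(int *)arg, F_OFD_SETLKW, &fl);
    atomic_store(&waiter_returned, 1);
    return NULL;
}

/*
 * Returns 1 if the waiter is still blocked after its fd was closed by
 * another thread, 0 if it returned, -1 on setup failure.
 */
static int waiter_still_blocked_after_close(void)
{
    char path[] = "/tmp/ofd-close-XXXXXX";
    struct flock fl = { .l_type = F_WRLCK, .l_whence = SEEK_SET };
    pthread_t tid;
    int holder_fd, waiter_fd, still_blocked;

    holder_fd = mkstemp(path);
    if (holder_fd < 0) {
        return -1;
    }
    waiter_fd = open(path, O_RDWR);     /* second open file description */
    unlink(path);
    if (waiter_fd < 0 || fcntl(holder_fd, F_OFD_SETLK, &fl) < 0) {
        if (waiter_fd >= 0) {
            close(waiter_fd);
        }
        close(holder_fd);
        return -1;
    }

    atomic_store(&waiter_returned, 0);
    if (pthread_create(&tid, NULL, waiter_thread, &waiter_fd) != 0) {
        close(waiter_fd);
        close(holder_fd);
        return -1;
    }

    usleep(200 * 1000);                 /* let the waiter block in fcntl() */
    close(waiter_fd);                   /* the attempted "kick" */
    usleep(200 * 1000);
    still_blocked = !atomic_load(&waiter_returned);

    close(holder_fd);                   /* drop the held lock; waiter resumes */
    pthread_join(tid, NULL);
    return still_blocked;
}
```

On the systems described in this thread the function would be expected to return 1, i.e. close() alone does not wake the blocked thread.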
On Tue, Oct 05, 2021 at 04:09:35PM -0400, Vivek Goyal wrote:
> On Mon, Oct 04, 2021 at 03:54:31PM +0100, Stefan Hajnoczi wrote:
> > On Thu, Sep 30, 2021 at 11:30:34AM -0400, Vivek Goyal wrote:
> > > Add a new custom threadpool using posix threads that specifically
> > > service locking requests.
> > >
> > > In the case of a fcntl(SETLKW) request, if the guest is waiting
> > > for a lock or locks and issues a hard-reboot through SYSRQ then virtiofsd
> > > unblocks the blocked threads by sending a signal to them and waking
> > > them up.
> > >
> > > The current threadpool (GThreadPool) is not adequate to service the
> > > locking requests that result in a thread blocking. That is because
> > > GLib does not provide an API to cancel the request while it is
> > > serviced by a thread. In addition, a user might be running virtiofsd
> > > without a threadpool (--thread-pool-size=0), thus a locking request
> > > that blocks, will block the main virtqueue thread that services requests
> > > from servicing any other requests.
> > >
> > > The only exception occurs when the lock is of type F_UNLCK. In this case
> > > the request is serviced by the main virtqueue thread or a GThreadPool
> > > thread to avoid a deadlock, when all the threads in the custom threadpool
> > > are blocked.
> > >
> > > Then virtiofsd proceeds to cleanup the state of the threads, release
> > > them back to the system and re-initialize.
> >
> > Is there another way to cancel SETLKW without resorting to a new thread
> > pool? Since this only matters when shutting down or restarting, can we
> > close all plock->fd file descriptors to kick the GThreadPool workers out
> > of fcntl()?
>
> Ok, I tested this. If a thread is blocked on OFD lock and another
> thread closes associated "fd", it does not unblock the thread
> which is blocked on lock. So closing OFD can't be used for unblocking
> a thread.
>
> Even if it could be, it can't be a replacement for a thread pool
> in general as we can't block main thread otherwise it can deadlock.
> But we could have used another glib thread pool (instead of a
> custom thread pool which can handle signals to unblock threads).
>
> If you are curious, here is my test program.
>
> https://github.com/rhvgoyal/misc/blob/master/virtiofs-tests/ofd-lock.c
>
> Comments in there explain how to use it. It can block on an OFD
> lock and one can send SIGUSR1 which will close fd.

Thanks for investigating this! Too bad that the semantics of SETLKW are
not usable:

I ran two instances on my system so that the second instance blocks in
SETLKW and found the same thing. fcntl(fd, F_OFD_SETLKW, &flock) returns
success even though the other thread already closed the fd while the
main thread was blocked in fcntl().

Here is where it gets weird: lslocks(1) shows the OFD locks that are
acquired (process 1) and waiting (process 2). When process 1 terminates,
process 2 makes progress but lslocks(1) shows there are no OFD locks.

This suggests that when fcntl(2) returns success in process 2, the OFD
lock is immediately released by the kernel since the fd was already
closed beforehand. Process 2 would have no way of releasing the lock
since it already closed its fd. So the 0 return value does not really
mean success - there is no acquired OFD lock when fcntl(2) returns!

The problem is that fcntl(2) doesn't return early with -EBADFD or
similar while it is blocked, so we cannot use close(fd) to interrupt
it :(.

Stefan
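Since close(fd) cannot interrupt the wait, a signal remains the way out. Below is a small sketch of that mechanism under stated assumptions: it is single-threaded and uses SIGALRM from a repeating interval timer rather than pthread_kill() with SIGUSR1 as in the patch, and every name in it is made up for illustration. The repeating timer mirrors the patch's approach of re-sending the signal in a loop, so a signal that arrives before fcntl() has blocked is not fatal.

```c
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>

/* Fallback (Linux values) in case the libc headers hide the OFD commands. */
#ifndef F_OFD_SETLKW
#define F_OFD_SETLK  37
#define F_OFD_SETLKW 38
#endif

/* Empty on purpose: the handler only exists so the syscall gets EINTR. */
static void wake_handler(int sig)
{
    (void)sig;
}

/*
 * Blocks on an OFD lock held through another open file description and
 * lets a timer signal interrupt the wait. Returns the errno reported by
 * fcntl(), 0 if the lock was unexpectedly granted, -1 on setup failure.
 */
static int ofd_wait_interrupted_errno(void)
{
    char path[] = "/tmp/ofd-sig-XXXXXX";
    struct flock fl = { .l_type = F_WRLCK, .l_whence = SEEK_SET };
    struct itimerval tick = {
        .it_interval = { .tv_sec = 0, .tv_usec = 50000 },
        .it_value    = { .tv_sec = 0, .tv_usec = 50000 },
    };
    struct itimerval off;
    struct sigaction sa, old_sa;
    int holder_fd, waiter_fd, ret, err, rc;

    holder_fd = mkstemp(path);
    if (holder_fd < 0) {
        return -1;
    }
    waiter_fd = open(path, O_RDWR);     /* second open file description */
    unlink(path);
    if (waiter_fd < 0 || fcntl(holder_fd, F_OFD_SETLK, &fl) < 0) {
        if (waiter_fd >= 0) {
            close(waiter_fd);
        }
        close(holder_fd);
        return -1;
    }

    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = wake_handler;       /* sa_flags == 0: no SA_RESTART */
    sigemptyset(&sa.sa_mask);
    rc = sigaction(SIGALRM, &sa, &old_sa);
    assert(rc == 0);
    (void)rc;
    setitimer(ITIMER_REAL, &tick, NULL);

    ret = fcntl(waiter_fd, F_OFD_SETLKW, &fl);  /* blocks until signalled */
    err = errno;

    memset(&off, 0, sizeof(off));
    setitimer(ITIMER_REAL, &off, NULL);         /* stop the timer */
    sigaction(SIGALRM, &old_sa, NULL);
    close(waiter_fd);
    close(holder_fd);
    return ret == -1 ? err : 0;
}
```

Leaving SA_RESTART out of sa_flags is what matters here: with it set, the kernel would transparently restart the wait instead of returning EINTR to the caller.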
diff --git a/tools/virtiofsd/fuse_virtio.c b/tools/virtiofsd/fuse_virtio.c
index 3b720c5d4a..c67c2e0e7a 100644
--- a/tools/virtiofsd/fuse_virtio.c
+++ b/tools/virtiofsd/fuse_virtio.c
@@ -20,6 +20,7 @@
 #include "fuse_misc.h"
 #include "fuse_opt.h"
 #include "fuse_virtio.h"
+#include "tpool.h"
 
 #include <sys/eventfd.h>
 #include <sys/socket.h>
@@ -612,6 +613,60 @@ out:
     free(req);
 }
 
+/*
+ * If the request is a locking request, use a custom locking thread pool.
+ */
+static bool use_lock_tpool(gpointer data, gpointer user_data)
+{
+    struct fv_QueueInfo *qi = user_data;
+    struct fuse_session *se = qi->virtio_dev->se;
+    FVRequest *req = data;
+    VuVirtqElement *elem = &req->elem;
+    struct fuse_buf fbuf = {};
+    struct fuse_in_header *inhp;
+    struct fuse_lk_in *lkinp;
+    size_t lk_req_len;
+    /* The 'out' part of the elem is from qemu */
+    unsigned int out_num = elem->out_num;
+    struct iovec *out_sg = elem->out_sg;
+    size_t out_len = iov_size(out_sg, out_num);
+    bool use_custom_tpool = false;
+
+    /*
+     * If notifications are not enabled, no point in using cusotm lock
+     * thread pool.
+     */
+    if (!se->notify_enabled) {
+        return false;
+    }
+
+    assert(se->bufsize > sizeof(struct fuse_in_header));
+    lk_req_len = sizeof(struct fuse_in_header) + sizeof(struct fuse_lk_in);
+
+    if (out_len < lk_req_len) {
+        return false;
+    }
+
+    fbuf.mem = g_malloc(se->bufsize);
+    copy_from_iov(&fbuf, out_num, out_sg, lk_req_len);
+
+    inhp = fbuf.mem;
+    if (inhp->opcode != FUSE_SETLKW) {
+        goto out;
+    }
+
+    lkinp = fbuf.mem + sizeof(struct fuse_in_header);
+    if (lkinp->lk.type == F_UNLCK) {
+        goto out;
+    }
+
+    /* Its a blocking lock request. Use custom thread pool */
+    use_custom_tpool = true;
+out:
+    g_free(fbuf.mem);
+    return use_custom_tpool;
+}
+
 /* Thread function for individual queues, created when a queue is 'started' */
 static void *fv_queue_thread(void *opaque)
 {
@@ -619,6 +674,7 @@ static void *fv_queue_thread(void *opaque)
     struct VuDev *dev = &qi->virtio_dev->dev;
     struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
     struct fuse_session *se = qi->virtio_dev->se;
+    struct fv_ThreadPool *lk_tpool = NULL;
     GThreadPool *pool = NULL;
     GList *req_list = NULL;
 
@@ -631,6 +687,24 @@ static void *fv_queue_thread(void *opaque)
             fuse_log(FUSE_LOG_ERR, "%s: g_thread_pool_new failed\n", __func__);
             return NULL;
         }
+
+    }
+
+    /*
+     * Create the custom thread pool to handle blocking locking requests.
+     * Do not create for hiprio queue (qidx=0).
+     */
+    if (qi->qidx) {
+        fuse_log(FUSE_LOG_DEBUG, "%s: Creating a locking thread pool for"
+                 " Queue %d with size %d\n", __func__, qi->qidx, 4);
+        lk_tpool = fv_thread_pool_init(4);
+        if (!lk_tpool) {
+            fuse_log(FUSE_LOG_ERR, "%s: fv_thread_pool failed\n", __func__);
+            if (pool) {
+                g_thread_pool_free(pool, FALSE, TRUE);
+            }
+            return NULL;
+        }
     }
 
     fuse_log(FUSE_LOG_INFO, "%s: Start for queue %d kick_fd %d\n", __func__,
@@ -703,7 +777,17 @@ static void *fv_queue_thread(void *opaque)
 
             req->reply_sent = false;
 
-            if (!se->thread_pool_size) {
+            /*
+             * In every case we get the opcode of the request and check if it
+             * is a locking request. If yes, we assign the request to the
+             * custom thread pool, with the exception when the lock is of type
+             * F_UNCLK. In this case to avoid a deadlock when all the custom
+             * threads are blocked, the request is serviced by the main
+             * virtqueue thread or a thread in GThreadPool
+             */
+            if (use_lock_tpool(req, qi)) {
+                fv_thread_pool_push(lk_tpool, fv_queue_worker, req, qi);
+            } else if (!se->thread_pool_size) {
                 req_list = g_list_prepend(req_list, req);
             } else {
                 g_thread_pool_push(pool, req, NULL);
@@ -726,6 +810,10 @@ static void *fv_queue_thread(void *opaque)
         g_thread_pool_free(pool, FALSE, TRUE);
     }
 
+    if (lk_tpool) {
+        fv_thread_pool_destroy(lk_tpool);
+    }
+
     return NULL;
 }
 
diff --git a/tools/virtiofsd/meson.build b/tools/virtiofsd/meson.build
index c134ba633f..203cd5613a 100644
--- a/tools/virtiofsd/meson.build
+++ b/tools/virtiofsd/meson.build
@@ -6,6 +6,7 @@ executable('virtiofsd', files(
   'fuse_signals.c',
   'fuse_virtio.c',
   'helper.c',
+  'tpool.c',
   'passthrough_ll.c',
   'passthrough_seccomp.c'),
   dependencies: [seccomp, qemuutil, libcap_ng, vhost_user],
diff --git a/tools/virtiofsd/passthrough_seccomp.c b/tools/virtiofsd/passthrough_seccomp.c
index a3ce9f898d..cd24b40b78 100644
--- a/tools/virtiofsd/passthrough_seccomp.c
+++ b/tools/virtiofsd/passthrough_seccomp.c
@@ -116,6 +116,7 @@ static const int syscall_allowlist[] = {
     SCMP_SYS(write),
     SCMP_SYS(writev),
     SCMP_SYS(umask),
+    SCMP_SYS(nanosleep),
 };
 
 /* Syscalls used when --syslog is enabled */
diff --git a/tools/virtiofsd/tpool.c b/tools/virtiofsd/tpool.c
new file mode 100644
index 0000000000..f9aa41b0c5
--- /dev/null
+++ b/tools/virtiofsd/tpool.c
@@ -0,0 +1,331 @@
+/*
+ * custom threadpool for virtiofsd
+ *
+ * Copyright (C) 2021 Red Hat, Inc.
+ *
+ * Authors:
+ *     Ioannis Angelakopoulos <iangelak@redhat.com>
+ *     Vivek Goyal <vgoyal@redhat.com>
+ *
+ * SPDX-License-Identifier: GPL-2.0-or-later
+ */
+
+#include <pthread.h>
+#include <glib.h>
+#include <stdbool.h>
+#include <errno.h>
+#include "tpool.h"
+#include "fuse_log.h"
+
+struct fv_PoolReq {
+    struct fv_PoolReq *next;                        /* pointer to next task */
+    void (*worker_func)(void *arg1, void *arg2);    /* worker function */
+    void *arg1;                                     /* 1st arg: Request */
+    void *arg2;                                     /* 2nd arg: Virtqueue */
+};
+
+struct fv_PoolReqQueue {
+    pthread_mutex_t lock;
+    GQueue queue;
+    pthread_cond_t notify;                          /* Conditional variable */
+};
+
+struct fv_PoolThread {
+    pthread_t pthread;
+    int alive;
+    int id;
+    struct fv_ThreadPool *tpool;
+};
+
+struct fv_ThreadPool {
+    struct fv_PoolThread **threads;
+    struct fv_PoolReqQueue *req_queue;
+    pthread_mutex_t tp_lock;
+
+    /* Total number of threads created */
+    int num_threads;
+
+    /* Number of threads running now */
+    int nr_running;
+    int destroy_pool;
+};
+
+/* Initialize the Locking Request Queue */
+static struct fv_PoolReqQueue *fv_pool_request_queue_init(void)
+{
+    struct fv_PoolReqQueue *rq;
+
+    rq = g_new0(struct fv_PoolReqQueue, 1);
+    pthread_mutex_init(&(rq->lock), NULL);
+    pthread_cond_init(&(rq->notify), NULL);
+    g_queue_init(&rq->queue);
+    return rq;
+}
+
+/* Push a new locking request to the queue*/
+void fv_thread_pool_push(struct fv_ThreadPool *tpool,
+                         void (*worker_func)(void *, void *),
+                         void *arg1, void *arg2)
+{
+    struct fv_PoolReq *newreq;
+    struct fv_PoolReqQueue *rq = tpool->req_queue;
+
+    newreq = g_new(struct fv_PoolReq, 1);
+    newreq->worker_func = worker_func;
+    newreq->arg1 = arg1;
+    newreq->arg2 = arg2;
+    newreq->next = NULL;
+
+    /* Now add the request to the queue */
+    pthread_mutex_lock(&rq->lock);
+    g_queue_push_tail(&rq->queue, newreq);
+
+    /* Notify the threads that a request is available */
+    pthread_cond_signal(&rq->notify);
+    pthread_mutex_unlock(&rq->lock);
+
+}
+
+/* Pop a locking request from the queue*/
+static struct fv_PoolReq *fv_tpool_pop(struct fv_ThreadPool *tpool)
+{
+    struct fv_PoolReq *pool_req = NULL;
+    struct fv_PoolReqQueue *rq = tpool->req_queue;
+
+    pthread_mutex_lock(&rq->lock);
+
+    pool_req = g_queue_pop_head(&rq->queue);
+
+    if (!g_queue_is_empty(&rq->queue)) {
+        pthread_cond_signal(&rq->notify);
+    }
+    pthread_mutex_unlock(&rq->lock);
+
+    return pool_req;
+}
+
+static void fv_pool_request_queue_destroy(struct fv_ThreadPool *tpool)
+{
+    struct fv_PoolReq *pool_req;
+
+    while ((pool_req = fv_tpool_pop(tpool))) {
+        g_free(pool_req);
+    }
+
+    /* Now free the actual queue itself */
+    g_free(tpool->req_queue);
+}
+
+/*
+ * Signal handler for blcking threads that wait on a remote lock to be released
+ * Called when virtiofsd does cleanup and wants to wake up these threads
+ */
+static void fv_thread_signal_handler(int signal)
+{
+    fuse_log(FUSE_LOG_DEBUG, "Thread received a signal.\n");
+    return;
+}
+
+static bool is_pool_stopping(struct fv_ThreadPool *tpool)
+{
+    bool destroy = false;
+
+    pthread_mutex_lock(&tpool->tp_lock);
+    destroy = tpool->destroy_pool;
+    pthread_mutex_unlock(&tpool->tp_lock);
+
+    return destroy;
+}
+
+static void *fv_thread_do_work(void *thread)
+{
+    struct fv_PoolThread *worker = (struct fv_PoolThread *)thread;
+    struct fv_ThreadPool *tpool = worker->tpool;
+    struct fv_PoolReq *pool_request;
+    /* Actual worker function and arguments. Same as non locking requests */
+    void (*worker_func)(void*, void*);
+    void *arg1;
+    void *arg2;
+
+    while (1) {
+        if (is_pool_stopping(tpool)) {
+            break;
+        }
+
+        /*
+         * Get the queue lock first so that we can wait on the conditional
+         * variable afterwards
+         */
+        pthread_mutex_lock(&tpool->req_queue->lock);
+
+        /* Wait on the condition variable until it is available */
+        while (g_queue_is_empty(&tpool->req_queue->queue) &&
+               !is_pool_stopping(tpool)) {
+            pthread_cond_wait(&tpool->req_queue->notify,
+                              &tpool->req_queue->lock);
+        }
+
+        /* Unlock the queue for other threads */
+        pthread_mutex_unlock(&tpool->req_queue->lock);
+
+        if (is_pool_stopping(tpool)) {
+            break;
+        }
+
+        /* Now the request must be serviced */
+        pool_request = fv_tpool_pop(tpool);
+        if (pool_request) {
+            fuse_log(FUSE_LOG_DEBUG, "%s: Locking Thread:%d handling"
+                     " a request\n", __func__, worker->id);
+            worker_func = pool_request->worker_func;
+            arg1 = pool_request->arg1;
+            arg2 = pool_request->arg2;
+            worker_func(arg1, arg2);
+            g_free(pool_request);
+        }
+    }
+
+    /* Mark the thread as inactive */
+    pthread_mutex_lock(&tpool->tp_lock);
+    tpool->threads[worker->id]->alive = 0;
+    tpool->nr_running--;
+    pthread_mutex_unlock(&tpool->tp_lock);
+
+    return NULL;
+}
+
+/* Create a single thread that handles locking requests */
+static int fv_worker_thread_init(struct fv_ThreadPool *tpool,
+                                 struct fv_PoolThread **thread, int id)
+{
+    struct fv_PoolThread *worker;
+    int ret;
+
+    worker = g_new(struct fv_PoolThread, 1);
+    worker->tpool = tpool;
+    worker->id = id;
+    worker->alive = 1;
+
+    ret = pthread_create(&worker->pthread, NULL, fv_thread_do_work,
+                         worker);
+    if (ret) {
+        fuse_log(FUSE_LOG_ERR, "pthread_create() failed with err=%d\n", ret);
+        g_free(worker);
+        return ret;
+    }
+    pthread_detach(worker->pthread);
+    *thread = worker;
+    return 0;
+}
+
+static void send_signal_all(struct fv_ThreadPool *tpool)
+{
+    int i;
+
+    pthread_mutex_lock(&tpool->tp_lock);
+    for (i = 0; i < tpool->num_threads; i++) {
+        if (tpool->threads[i]->alive) {
+            pthread_kill(tpool->threads[i]->pthread, SIGUSR1);
+        }
+    }
+    pthread_mutex_unlock(&tpool->tp_lock);
+}
+
+static void do_pool_destroy(struct fv_ThreadPool *tpool, bool send_signal)
+{
+    int i, nr_running;
+
+    /* We want to destroy the pool */
+    pthread_mutex_lock(&tpool->tp_lock);
+    tpool->destroy_pool = 1;
+    pthread_mutex_unlock(&tpool->tp_lock);
+
+    /* Wake up threads waiting for requests */
+    pthread_mutex_lock(&tpool->req_queue->lock);
+    pthread_cond_broadcast(&tpool->req_queue->notify);
+    pthread_mutex_unlock(&tpool->req_queue->lock);
+
+    /* Send Signal and wait for all threads to exit. */
+    while (1) {
+        if (send_signal) {
+            send_signal_all(tpool);
+        }
+        pthread_mutex_lock(&tpool->tp_lock);
+        nr_running = tpool->nr_running;
+        pthread_mutex_unlock(&tpool->tp_lock);
+        if (!nr_running) {
+            break;
+        }
+        g_usleep(10000);
+    }
+
+    /* Destroy the locking request queue */
+    fv_pool_request_queue_destroy(tpool);
+    for (i = 0; i < tpool->num_threads; i++) {
+        g_free(tpool->threads[i]);
+    }
+
+    /* Now free the threadpool */
+    g_free(tpool->threads);
+    g_free(tpool);
+}
+
+void fv_thread_pool_destroy(struct fv_ThreadPool *tpool)
+{
+    if (!tpool) {
+        return;
+    }
+    do_pool_destroy(tpool, true);
+}
+
+static int register_sig_handler(void)
+{
+    struct sigaction sa;
+    sigemptyset(&sa.sa_mask);
+    sa.sa_flags = 0;
+    sa.sa_handler = fv_thread_signal_handler;
+    if (sigaction(SIGUSR1, &sa, NULL) == -1) {
+        fuse_log(FUSE_LOG_ERR, "Cannot register the signal handler:%s\n",
+                 strerror(errno));
+        return 1;
+    }
+    return 0;
+}
+
+/* Initialize the thread pool for the locking posix threads */
+struct fv_ThreadPool *fv_thread_pool_init(unsigned int thread_num)
+{
+    struct fv_ThreadPool *tpool = NULL;
+    int i, ret;
+
+    if (!thread_num) {
+        thread_num = 1;
+    }
+
+    if (register_sig_handler()) {
+        return NULL;
+    }
+    tpool = g_new0(struct fv_ThreadPool, 1);
+    pthread_mutex_init(&(tpool->tp_lock), NULL);
+
+    /* Initialize the Lock Request Queue */
+    tpool->req_queue = fv_pool_request_queue_init();
+
+    /* Create the threads in the pool */
+    tpool->threads = g_new(struct fv_PoolThread *, thread_num);
+
+    for (i = 0; i < thread_num; i++) {
+        ret = fv_worker_thread_init(tpool, &tpool->threads[i], i);
+        if (ret) {
+            goto out_err;
+        }
+        tpool->num_threads++;
+        tpool->nr_running++;
+    }
+
+    return tpool;
+out_err:
+    /* An error occurred. Cleanup and return NULL */
+    do_pool_destroy(tpool, false);
+    return NULL;
+}
diff --git a/tools/virtiofsd/tpool.h b/tools/virtiofsd/tpool.h
new file mode 100644
index 0000000000..48d67e9a50
--- /dev/null
+++ b/tools/virtiofsd/tpool.h
@@ -0,0 +1,18 @@
+/*
+ * custom threadpool for virtiofsd
+ *
+ * Copyright (C) 2021 Red Hat, Inc.
+ *
+ * Authors:
+ *     Ioannis Angelakopoulos <iangelak@redhat.com>
+ *     Vivek Goyal <vgoyal@redhat.com>
+ *
+ * SPDX-License-Identifier: GPL-2.0-or-later
+ */
+
+struct fv_ThreadPool;
+
+struct fv_ThreadPool *fv_thread_pool_init(unsigned int thread_num);
+void fv_thread_pool_destroy(struct fv_ThreadPool *tpool);
+void fv_thread_pool_push(struct fv_ThreadPool *tpool,
+                void (*worker_func)(void *, void *), void *arg1, void *arg2);