diff mbox

[3/6] kvm tools: Introduce generic IO threadpool

Message ID 1303998045-22932-3-git-send-email-levinsasha928@gmail.com (mailing list archive)
State New, archived
Headers show

Commit Message

Sasha Levin April 28, 2011, 1:40 p.m. UTC
This patch adds a generic pool to create a common interface for working with threads within the kvm tool.
Main idea here is using this threadpool for all I/O threads instead of having every I/O module write it's own thread code.

The process of working with the thread pool is supposed to be very simple.
During initialization, Each module which is interested in working with the threadpool will call threadpool__add_jobtype with the callback function and a void* parameter. For example, virtio modules will register every virt_queue as a new job type.
During operation, When theres work to do for a specific job, the module will signal it to the queue and would expect the callback to be called with proper parameters. It is assured that the callback will be called once for every signal action and each callback will be called only once at a time (i.e. callback functions themselves don't need to handle threading).

Signed-off-by: Sasha Levin <levinsasha928@gmail.com>
---
 tools/kvm/Makefile                 |    1 +
 tools/kvm/include/kvm/threadpool.h |   16 ++++
 tools/kvm/kvm-run.c                |    5 +
 tools/kvm/threadpool.c             |  171 ++++++++++++++++++++++++++++++++++++
 4 files changed, 193 insertions(+), 0 deletions(-)
 create mode 100644 tools/kvm/include/kvm/threadpool.h
 create mode 100644 tools/kvm/threadpool.c

Comments

Asias He April 29, 2011, 7:08 a.m. UTC | #1
On 04/28/2011 09:40 PM, Sasha Levin wrote:
> This patch adds a generic pool to create a common interface for working with threads within the kvm tool.
> Main idea here is using this threadpool for all I/O threads instead of having every I/O module write it's own thread code.
> 
> The process of working with the thread pool is supposed to be very simple.
> During initialization, Each module which is interested in working with the threadpool will call threadpool__add_jobtype with the callback function and a void* parameter. For example, virtio modules will register every virt_queue as a new job type.
> During operation, When theres work to do for a specific job, the module will signal it to the queue and would expect the callback to be called with proper parameters. It is assured that the callback will be called once for every signal action and each callback will be called only once at a time (i.e. callback functions themselves don't need to handle threading).
> 
> Signed-off-by: Sasha Levin <levinsasha928@gmail.com>
> ---
>  tools/kvm/Makefile                 |    1 +
>  tools/kvm/include/kvm/threadpool.h |   16 ++++
>  tools/kvm/kvm-run.c                |    5 +
>  tools/kvm/threadpool.c             |  171 ++++++++++++++++++++++++++++++++++++
>  4 files changed, 193 insertions(+), 0 deletions(-)
>  create mode 100644 tools/kvm/include/kvm/threadpool.h
>  create mode 100644 tools/kvm/threadpool.c
> 
> diff --git a/tools/kvm/Makefile b/tools/kvm/Makefile
> index 1b0c76e..fbce14d 100644
> --- a/tools/kvm/Makefile
> +++ b/tools/kvm/Makefile
> @@ -36,6 +36,7 @@ OBJS    += kvm-cmd.o
>  OBJS    += kvm-run.o
>  OBJS    += qcow.o
>  OBJS    += mptable.o
> +OBJS    += threadpool.o
>  
>  DEPS	:= $(patsubst %.o,%.d,$(OBJS))
>  
> diff --git a/tools/kvm/include/kvm/threadpool.h b/tools/kvm/include/kvm/threadpool.h
> new file mode 100644
> index 0000000..25b5eb8
> --- /dev/null
> +++ b/tools/kvm/include/kvm/threadpool.h
> @@ -0,0 +1,16 @@
> +#ifndef KVM__THREADPOOL_H
> +#define KVM__THREADPOOL_H
> +
> +#include <stdint.h>
> +
> +struct kvm;
> +
> +typedef void (*kvm_thread_callback_fn_t)(struct kvm *kvm, void *data);
> +
> +int thread_pool__init(unsigned long thread_count);
> +
> +void *thread_pool__add_jobtype(struct kvm *kvm, kvm_thread_callback_fn_t callback, void *data);
> +
> +void thread_pool__signal_work(void *job);
> +
> +#endif
> diff --git a/tools/kvm/kvm-run.c b/tools/kvm/kvm-run.c
> index 071157a..97a17dd 100644
> --- a/tools/kvm/kvm-run.c
> +++ b/tools/kvm/kvm-run.c
> @@ -24,6 +24,7 @@
>  #include <kvm/pci.h>
>  #include <kvm/term.h>
>  #include <kvm/ioport.h>
> +#include <kvm/threadpool.h>
>  
>  /* header files for gitish interface  */
>  #include <kvm/kvm-run.h>
> @@ -312,6 +313,7 @@ int kvm_cmd_run(int argc, const char **argv, const char *prefix)
>  	int i;
>  	struct virtio_net_parameters net_params;
>  	char *hi;
> +	unsigned int nr_online_cpus;
>  
>  	signal(SIGALRM, handle_sigalrm);
>  	signal(SIGQUIT, handle_sigquit);
> @@ -457,6 +459,9 @@ int kvm_cmd_run(int argc, const char **argv, const char *prefix)
>  
>  	kvm__init_ram(kvm);
>  
> +	nr_online_cpus = sysconf(_SC_NPROCESSORS_ONLN);
> +	thread_pool__init(nr_online_cpus);

We may benefit from more threads than the number of hardware thread we
have. Currently, virtio_console consumes two,  virio_net consumes two,
and virtio_blk consumes one. Can we adjust the thread pool size when
devices register to use thread pool?

> +
>  	for (i = 0; i < nrcpus; i++) {
>  		if (pthread_create(&kvm_cpus[i]->thread, NULL, kvm_cpu_thread, kvm_cpus[i]) != 0)
>  			die("unable to create KVM VCPU thread");
> diff --git a/tools/kvm/threadpool.c b/tools/kvm/threadpool.c
> new file mode 100644
> index 0000000..e78db3a
> --- /dev/null
> +++ b/tools/kvm/threadpool.c
> @@ -0,0 +1,171 @@
> +#include "kvm/threadpool.h"
> +#include "kvm/mutex.h"
> +
> +#include <linux/kernel.h>
> +#include <linux/list.h>
> +#include <pthread.h>
> +#include <stdbool.h>
> +
> +struct thread_pool__job_info {
> +	kvm_thread_callback_fn_t callback;
> +	struct kvm *kvm;
> +	void *data;
> +
> +	int signalcount;
> +	pthread_mutex_t mutex;
> +
> +	struct list_head queue;
> +};

Does 'struct thread_pool__job' sound better?

> +static pthread_mutex_t	job_mutex		= PTHREAD_MUTEX_INITIALIZER;
> +static pthread_mutex_t	thread_mutex	= PTHREAD_MUTEX_INITIALIZER;
> +static pthread_cond_t	job_cond		= PTHREAD_COND_INITIALIZER;

These mutex and cond are global. As the number of thread/job grows,
there may be a lot of contention.

> +
> +static LIST_HEAD(head);
> +
> +static pthread_t	*threads;
> +static long			threadcount;
> +
> +static struct thread_pool__job_info *thread_pool__job_info_pop(void)
> +{
> +	struct thread_pool__job_info *job;
> +
> +	if (list_empty(&head))
> +		return NULL;
> +
> +	job = list_first_entry(&head, struct thread_pool__job_info, queue);
> +	list_del(&job->queue);
> +
> +	return job;
> +}
> +
> +static void thread_pool__job_info_push(struct thread_pool__job_info *job)
> +{
> +	list_add_tail(&job->queue, &head);
> +}
> +
> +static struct thread_pool__job_info *thread_pool__job_info_pop_locked(void)
> +{
> +	struct thread_pool__job_info *job;
> +
> +	mutex_lock(&job_mutex);
> +	job = thread_pool__job_info_pop();
> +	mutex_unlock(&job_mutex);
> +	return job;
> +}
> +
> +static void thread_pool__job_info_push_locked(struct thread_pool__job_info *job)
> +{
> +	mutex_lock(&job_mutex);
> +	thread_pool__job_info_push(job);
> +	mutex_unlock(&job_mutex);
> +}
> +
> +static void thread_pool__handle_job(struct thread_pool__job_info *job)
> +{
> +	while (job) {
> +		job->callback(job->kvm, job->data);
> +
> +		mutex_lock(&job->mutex);
> +
> +		if (--job->signalcount > 0)
> +			/* If the job was signaled again while we were working */
> +			thread_pool__job_info_push_locked(job);
> +
> +		mutex_unlock(&job->mutex);
> +
> +		job = thread_pool__job_info_pop_locked();
> +	}
> +}
> +
> +static void thread_pool__threadfunc_cleanup(void *param)
> +{
> +	mutex_unlock(&job_mutex);
> +}
> +
> +static void *thread_pool__threadfunc(void *param)
> +{
> +	pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL);
> +
> +	for (;;) {
> +		struct thread_pool__job_info *curjob;
> +
> +		mutex_lock(&job_mutex);
> +		pthread_cond_wait(&job_cond, &job_mutex);
> +		curjob = thread_pool__job_info_pop();
> +		mutex_unlock(&job_mutex);
> +
> +		if (curjob)
> +			thread_pool__handle_job(curjob);
> +	}
> +
> +	pthread_cleanup_pop(0);
> +
> +	return NULL;
> +}
> +
> +static int thread_pool__addthread(void)
> +{
> +	int res;
> +	void *newthreads;
> +
> +	mutex_lock(&thread_mutex);
> +	newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t));
> +	if (newthreads == NULL) {
> +		mutex_unlock(&thread_mutex);
> +		return -1;
> +	}
> +
> +	threads = newthreads;
> +
> +	res = pthread_create(threads + threadcount, NULL,
> +							thread_pool__threadfunc, NULL);
> +
> +	if (res == 0)
> +		threadcount++;
> +	mutex_unlock(&thread_mutex);
> +
> +	return res;
> +}
> +
> +int thread_pool__init(unsigned long thread_count)
> +{
> +	unsigned long i;
> +
> +	for (i = 0 ; i < thread_count ; i++)
> +		if (thread_pool__addthread() < 0)
> +			return i;
> +
> +	return i;
> +}
> +
> +void *thread_pool__add_jobtype(struct kvm *kvm,
> +								kvm_thread_callback_fn_t callback,
> +								void *data)

Is thread_pool__add_job() better?

> +{
> +	struct thread_pool__job_info *job = calloc(1, sizeof(*job));
> +
> +	*job = (struct thread_pool__job_info) {
> +		.kvm		= kvm,
> +		.data		= data,
> +		.callback	= callback,
> +		.mutex		= PTHREAD_MUTEX_INITIALIZER
> +	};
> +
> +	return job;
> +}
> +
> +void thread_pool__signal_work(void *job)

I think thread_pool__signal_job() or thread_pool__do_job()
would be more consistent.

Consumer of this API can simply use it with: thread_pool_{add,do}_job().

> +{
> +	struct thread_pool__job_info *jobinfo = job;
> +
> +	if (jobinfo == NULL)
> +		return;
> +
> +	mutex_lock(&jobinfo->mutex);
> +	if (jobinfo->signalcount++ == 0)
> +		thread_pool__job_info_push_locked(job);
> +	mutex_unlock(&jobinfo->mutex);
> +
> +	pthread_cond_signal(&job_cond);
> +}
Sasha Levin April 29, 2011, 11:12 a.m. UTC | #2
On Fri, 2011-04-29 at 15:08 +0800, Asias He wrote:
On 04/28/2011 09:40 PM, Sasha Levin wrote:
> > +	nr_online_cpus = sysconf(_SC_NPROCESSORS_ONLN);
> > +	thread_pool__init(nr_online_cpus);
> 
> We may benefit from more threads than the number of hardware thread we
> have. Currently, virtio_console consumes two,  virio_net consumes two,
> and virtio_blk consumes one. Can we adjust the thread pool size when
> devices register to use thread pool?

How many threads do we want to have the threadpool use?
Currently theres a thread allocated for each VCPU + _SC_NPROCESSORS_ONLN
I/O threads.

Should we just allocate another thread for each device that registers
within the threadpool? This number can grow to be pretty big once we
start adding multiple devices.
diff mbox

Patch

diff --git a/tools/kvm/Makefile b/tools/kvm/Makefile
index 1b0c76e..fbce14d 100644
--- a/tools/kvm/Makefile
+++ b/tools/kvm/Makefile
@@ -36,6 +36,7 @@  OBJS    += kvm-cmd.o
 OBJS    += kvm-run.o
 OBJS    += qcow.o
 OBJS    += mptable.o
+OBJS    += threadpool.o
 
 DEPS	:= $(patsubst %.o,%.d,$(OBJS))
 
diff --git a/tools/kvm/include/kvm/threadpool.h b/tools/kvm/include/kvm/threadpool.h
new file mode 100644
index 0000000..25b5eb8
--- /dev/null
+++ b/tools/kvm/include/kvm/threadpool.h
@@ -0,0 +1,16 @@ 
+#ifndef KVM__THREADPOOL_H
+#define KVM__THREADPOOL_H
+
+#include <stdint.h>
+
+struct kvm;
+
+typedef void (*kvm_thread_callback_fn_t)(struct kvm *kvm, void *data);
+
+int thread_pool__init(unsigned long thread_count);
+
+void *thread_pool__add_jobtype(struct kvm *kvm, kvm_thread_callback_fn_t callback, void *data);
+
+void thread_pool__signal_work(void *job);
+
+#endif
diff --git a/tools/kvm/kvm-run.c b/tools/kvm/kvm-run.c
index 071157a..97a17dd 100644
--- a/tools/kvm/kvm-run.c
+++ b/tools/kvm/kvm-run.c
@@ -24,6 +24,7 @@ 
 #include <kvm/pci.h>
 #include <kvm/term.h>
 #include <kvm/ioport.h>
+#include <kvm/threadpool.h>
 
 /* header files for gitish interface  */
 #include <kvm/kvm-run.h>
@@ -312,6 +313,7 @@  int kvm_cmd_run(int argc, const char **argv, const char *prefix)
 	int i;
 	struct virtio_net_parameters net_params;
 	char *hi;
+	unsigned int nr_online_cpus;
 
 	signal(SIGALRM, handle_sigalrm);
 	signal(SIGQUIT, handle_sigquit);
@@ -457,6 +459,9 @@  int kvm_cmd_run(int argc, const char **argv, const char *prefix)
 
 	kvm__init_ram(kvm);
 
+	nr_online_cpus = sysconf(_SC_NPROCESSORS_ONLN);
+	thread_pool__init(nr_online_cpus);
+
 	for (i = 0; i < nrcpus; i++) {
 		if (pthread_create(&kvm_cpus[i]->thread, NULL, kvm_cpu_thread, kvm_cpus[i]) != 0)
 			die("unable to create KVM VCPU thread");
diff --git a/tools/kvm/threadpool.c b/tools/kvm/threadpool.c
new file mode 100644
index 0000000..e78db3a
--- /dev/null
+++ b/tools/kvm/threadpool.c
@@ -0,0 +1,171 @@ 
+#include "kvm/threadpool.h"
+#include "kvm/mutex.h"
+
+#include <linux/kernel.h>
+#include <linux/list.h>
+#include <pthread.h>
+#include <stdbool.h>
+
+struct thread_pool__job_info {
+	kvm_thread_callback_fn_t callback;
+	struct kvm *kvm;
+	void *data;
+
+	int signalcount;
+	pthread_mutex_t mutex;
+
+	struct list_head queue;
+};
+
+static pthread_mutex_t	job_mutex		= PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t	thread_mutex	= PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t	job_cond		= PTHREAD_COND_INITIALIZER;
+
+static LIST_HEAD(head);
+
+static pthread_t	*threads;
+static long			threadcount;
+
+static struct thread_pool__job_info *thread_pool__job_info_pop(void)
+{
+	struct thread_pool__job_info *job;
+
+	if (list_empty(&head))
+		return NULL;
+
+	job = list_first_entry(&head, struct thread_pool__job_info, queue);
+	list_del(&job->queue);
+
+	return job;
+}
+
+static void thread_pool__job_info_push(struct thread_pool__job_info *job)
+{
+	list_add_tail(&job->queue, &head);
+}
+
+static struct thread_pool__job_info *thread_pool__job_info_pop_locked(void)
+{
+	struct thread_pool__job_info *job;
+
+	mutex_lock(&job_mutex);
+	job = thread_pool__job_info_pop();
+	mutex_unlock(&job_mutex);
+	return job;
+}
+
+static void thread_pool__job_info_push_locked(struct thread_pool__job_info *job)
+{
+	mutex_lock(&job_mutex);
+	thread_pool__job_info_push(job);
+	mutex_unlock(&job_mutex);
+}
+
+static void thread_pool__handle_job(struct thread_pool__job_info *job)
+{
+	while (job) {
+		job->callback(job->kvm, job->data);
+
+		mutex_lock(&job->mutex);
+
+		if (--job->signalcount > 0)
+			/* If the job was signaled again while we were working */
+			thread_pool__job_info_push_locked(job);
+
+		mutex_unlock(&job->mutex);
+
+		job = thread_pool__job_info_pop_locked();
+	}
+}
+
+static void thread_pool__threadfunc_cleanup(void *param)
+{
+	mutex_unlock(&job_mutex);
+}
+
+static void *thread_pool__threadfunc(void *param)
+{
+	pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL);
+
+	for (;;) {
+		struct thread_pool__job_info *curjob;
+
+		mutex_lock(&job_mutex);
+		pthread_cond_wait(&job_cond, &job_mutex);
+		curjob = thread_pool__job_info_pop();
+		mutex_unlock(&job_mutex);
+
+		if (curjob)
+			thread_pool__handle_job(curjob);
+	}
+
+	pthread_cleanup_pop(0);
+
+	return NULL;
+}
+
+static int thread_pool__addthread(void)
+{
+	int res;
+	void *newthreads;
+
+	mutex_lock(&thread_mutex);
+	newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t));
+	if (newthreads == NULL) {
+		mutex_unlock(&thread_mutex);
+		return -1;
+	}
+
+	threads = newthreads;
+
+	res = pthread_create(threads + threadcount, NULL,
+							thread_pool__threadfunc, NULL);
+
+	if (res == 0)
+		threadcount++;
+	mutex_unlock(&thread_mutex);
+
+	return res;
+}
+
+int thread_pool__init(unsigned long thread_count)
+{
+	unsigned long i;
+
+	for (i = 0 ; i < thread_count ; i++)
+		if (thread_pool__addthread() < 0)
+			return i;
+
+	return i;
+}
+
+void *thread_pool__add_jobtype(struct kvm *kvm,
+								kvm_thread_callback_fn_t callback,
+								void *data)
+{
+	struct thread_pool__job_info *job = calloc(1, sizeof(*job));
+
+	*job = (struct thread_pool__job_info) {
+		.kvm		= kvm,
+		.data		= data,
+		.callback	= callback,
+		.mutex		= PTHREAD_MUTEX_INITIALIZER
+	};
+
+	return job;
+}
+
+void thread_pool__signal_work(void *job)
+{
+	struct thread_pool__job_info *jobinfo = job;
+
+	if (jobinfo == NULL)
+		return;
+
+	mutex_lock(&jobinfo->mutex);
+	if (jobinfo->signalcount++ == 0)
+		thread_pool__job_info_push_locked(job);
+	mutex_unlock(&jobinfo->mutex);
+
+	pthread_cond_signal(&job_cond);
+}