[RFC,v2,0/5] sched: User Managed Concurrency Groups

Message ID 20220120155517.066795336@infradead.org

Peter Zijlstra Jan. 20, 2022, 3:55 p.m. UTC
Latest version, many changes since last time, still under heavy discussion.

Seems to work with the test-case I have (below), but that still has a few gaps,
coverage wise.

Still haven't done the SMP wakeup thing; I finally get the idea behind
worker-timeouts but haven't implemented that yet.

Happy hacking..

----
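/*
 * Build sketch (assumptions): compiled from the top of a kernel tree with
 * the UMCG patches applied, so that "include/uapi/linux/umcg.h" resolves,
 * and with a userspace-usable list.h (list_add_tail(), list_first_entry(),
 * ...) on the include path; then something like
 *
 *	gcc -pthread -o umcg-demo umcg-demo.c
 *
 * should do (the file name is arbitrary).
 */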

#define _GNU_SOURCE
#include <unistd.h>
#include <sys/types.h>
#include <sys/syscall.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <signal.h>
#include <stdbool.h>
#include <time.h>

#ifndef __NR_umcg_ctl
#define __NR_umcg_ctl  450
#define __NR_umcg_wait 451
#define __NR_umcg_kick 452
#endif

#include <linux/types.h>
#include <linux/list.h>
#include "include/uapi/linux/umcg.h"

/* kernel-style shorthands for the uapi __u32/__u64 types used below */
typedef __u32 u32;
typedef __u64 u64;

/* syscall wrappers */
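/*
 * As used below: umcg_ctl registers/unregisters the calling thread as a
 * UMCG server or worker, umcg_wait blocks the caller (optionally switching
 * to ::next_tid, optionally with a timeout), and umcg_kick pokes a running
 * worker back into the kernel (used here to drive preemption).
 */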

static inline int
sys_umcg_ctl(u32 flags, struct umcg_task *self, clockid_t which_clock)
{
	return syscall(__NR_umcg_ctl, flags, self, which_clock);
}

static inline int
sys_umcg_wait(u32 flags, u64 timo)
{
	return syscall(__NR_umcg_wait, flags, timo);
}

static inline int
sys_umcg_kick(u32 flags, pid_t tid)
{
	return syscall(__NR_umcg_kick, flags, tid);
}

/* the 'foo' scheduler */
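/*
 * A trivial FIFO scheduler: workers the kernel reports runnable are queued
 * on the server's list and the server picks them off the head one at a time.
 */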

struct foo_task {
	struct umcg_task	task;
	struct list_head	node;
	pid_t			tid;
};

struct foo_server {
	struct umcg_task	task;
	struct list_head	node;
	pid_t			tid;
	struct foo_task		*cur;
	int			workers;
};

void foo_add(struct foo_server *server, struct umcg_task *t)
{
	struct foo_task *foo = container_of(t, struct foo_task, task);

	t->runnable_workers_ptr = 0ULL;
	list_add_tail(&foo->node, &server->node);
}

struct foo_task *foo_pick_next(struct foo_server *server)
{
	struct foo_task *first = NULL;

	if (list_empty(&server->node))
		return first;

	first = list_first_entry(&server->node, struct foo_task, node);
	list_del(&first->node);
	return first;
}

#define NSEC_PER_SEC 1000000000ULL

u64 foo_time(void)
{
	struct timespec ts;
	clock_gettime(CLOCK_MONOTONIC, &ts);
	return (unsigned long long)ts.tv_sec * NSEC_PER_SEC + ts.tv_nsec;
}

void foo_yield(struct umcg_task *self)
{
	self->state = UMCG_TASK_RUNNABLE | UMCG_TF_COND_WAIT;
	sys_umcg_wait(0, 0);
}

#define TICK_NSEC NSEC_PER_SEC

static volatile bool foo_preemptible = false;
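/*
 * With an argument on the command line, main() sets foo_preemptible and the
 * server preempts the current worker every TICK_NSEC via UMCG_TF_PREEMPT
 * plus umcg_kick(); without it, the spinning worker yields co-operatively.
 */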

/* our workers */

static volatile bool done = false;

static void umcg_signal(int signr)
{
	done = true;
}

/* always running worker */
void *worker_fn0(void *arg)
{
	struct foo_server *server = arg;
	struct foo_task task = { };
	unsigned long i = 0;
	int ret;

	task.tid = gettid();
	task.task.server_tid = server->tid;
	task.task.state = UMCG_TASK_BLOCKED;

	printf("A == %d\n", gettid());

	ret = sys_umcg_ctl(UMCG_CTL_REGISTER|UMCG_CTL_WORKER, &task.task, CLOCK_MONOTONIC);
	if (ret) {
		perror("umcg_ctl(A): ");
		exit(-1);
	}

	__atomic_add_fetch(&server->workers, 1, __ATOMIC_RELAXED);

	while (!done) {
		int x = i++;

		if (!(x % 1000000)) {
			putchar('.');
			fflush(stdout);
		}

		/* co-operative or preemptible */
		if (!foo_preemptible && !(x % 10000000))
			foo_yield(&task.task);
	}

	printf("A == done\n");

	__atomic_add_fetch(&server->workers, -1, __ATOMIC_RELAXED);

	ret = sys_umcg_ctl(UMCG_CTL_UNREGISTER|UMCG_CTL_WORKER, &task.task, 0);
	if (ret) {
		perror("umcg_ctl(~A): ");
		exit(-1);
	}

	return NULL;
}

/* event driven worker */
void *worker_fn1(void *arg)
{
	struct foo_server *server = arg;
	struct foo_task task = { };
	int ret;

	task.tid = gettid();
	task.task.server_tid = server->tid;
	task.task.state = UMCG_TASK_BLOCKED;

	printf("B == %d\n", gettid());

	ret = sys_umcg_ctl(UMCG_CTL_REGISTER|UMCG_CTL_WORKER, &task.task, CLOCK_MONOTONIC);
	if (ret) {
		perror("umcg_ctl(B): ");
		exit(-1);
	}

	__atomic_add_fetch(&server->workers, 1, __ATOMIC_RELAXED);

	while (!done) {
		printf("B\n");
		fflush(stdout);

		sleep(1);
	}

	printf("B == done\n");

	__atomic_add_fetch(&server->workers, -1, __ATOMIC_RELAXED);

	ret = sys_umcg_ctl(UMCG_CTL_UNREGISTER|UMCG_CTL_WORKER, &task.task, 0);
	if (ret) {
		perror("umcg_ctl(~B): ");
		exit(-1);
	}

	return NULL;
}

/* another event driven worker, on a slower period */
void *worker_fn2(void *arg)
{
	struct foo_server *server = arg;
	struct foo_task task = { };
	int ret;

	task.tid = gettid();
	task.task.server_tid = server->tid;
	task.task.state = UMCG_TASK_BLOCKED;

	printf("C == %d\n", gettid());

	ret = sys_umcg_ctl(UMCG_CTL_REGISTER|UMCG_CTL_WORKER, &task.task, CLOCK_MONOTONIC);
	if (ret) {
		perror("umcg_ctl(C): ");
		exit(-1);
	}

	__atomic_add_fetch(&server->workers, 1, __ATOMIC_RELAXED);

	while (!done) {
		printf("C\n");
		fflush(stdout);

		sleep(2);
	}

	printf("C == done\n");

	__atomic_add_fetch(&server->workers, -1, __ATOMIC_RELAXED);

	ret = sys_umcg_ctl(UMCG_CTL_UNREGISTER|UMCG_CTL_WORKER, &task.task, 0);
	if (ret) {
		perror("umcg_ctl(~C): ");
		exit(-1);
	}

	return NULL;
}

/* the server */
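/*
 * The server loop: mark ourselves RUNNABLE, splice the kernel's runnable
 * notification list onto the local queue, pick the next worker (or stick
 * with the current one while it is still RUNNING), then umcg_wait() to run
 * it; EAGAIN means a wakeup, ETIMEDOUT drives the preemption tick.
 */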

int main(int argc, char **argv)
{
	struct umcg_task *runnable_ptr, *next;
	struct foo_server server = { };
	pthread_t worker[3];
	u64 timeout = 0;
	u32 tid;
	int ret;

	struct sigaction sa = {
		.sa_handler = umcg_signal,
	};

	sigaction(SIGINT, &sa, NULL);

	printf("server == %d\n", gettid());
	fflush(stdout);

	server.tid = gettid();
	INIT_LIST_HEAD(&server.node);
	server.task.server_tid = gettid();
	server.task.state = UMCG_TASK_RUNNING;

	ret = sys_umcg_ctl(UMCG_CTL_REGISTER, &server.task, CLOCK_MONOTONIC);
	if (ret) {
		perror("umcg_ctl: ");
		exit(-1);
	}

	pthread_create(&worker[0], NULL, worker_fn0, &server);
	pthread_create(&worker[1], NULL, worker_fn1, &server);
	pthread_create(&worker[2], NULL, worker_fn2, &server);

	if (argc > 1) {
		foo_preemptible = true;
		/*
		 * setup preemption tick
		 */
		timeout = foo_time() + TICK_NSEC;
	}

	while (!(done && !__atomic_load_n(&server.workers, __ATOMIC_RELAXED))) {
		/*
		 * Mark the server as runnable first, so we can detect
		 * additions to the runnable list after we read it.
		 */
		__atomic_store_n(&server.task.state,
				 UMCG_TASK_RUNNABLE | UMCG_TF_COND_WAIT,
				 __ATOMIC_RELAXED);

		/*
		 * Consume the runnable notification list and add
		 * the tasks to our local runqueue.
		 */
		runnable_ptr = (void *)__atomic_exchange_n(&server.task.runnable_workers_ptr,
							   0, __ATOMIC_SEQ_CST);
		while (runnable_ptr) {
			next = (void *)runnable_ptr->runnable_workers_ptr;
			foo_add(&server, runnable_ptr);
			runnable_ptr = next;
		}

		if (server.cur && server.cur->task.state == UMCG_TASK_RUNNING) {
			/*
			 * Assert ::next_tid still points there and has RUNNING bit on
			 */
			if (server.task.next_tid != (server.cur->tid | UMCG_TID_RUNNING)) {
				printf("current not running: %d %x\n",
				       server.task.next_tid & UMCG_TID_MASK,
				       server.task.next_tid & ~UMCG_TID_MASK);
				exit(-1);
			}

			putchar('x');
		} else {
			tid = 0;
			server.cur = foo_pick_next(&server);
			if (server.cur)
				tid = server.cur->tid;

			__atomic_store_n(&server.task.next_tid, tid, __ATOMIC_RELAXED);

			printf("pick: %d\n", tid);
		}
		fflush(stdout);

		ret = sys_umcg_wait(0, timeout);

		/*
		 * If we set ::next_tid but it hasn't been consumed by the
		 * syscall due to failure, make sure to put the task back on
		 * the queue, lest we leak it.
		 */
		tid = __atomic_load_n(&server.task.next_tid, __ATOMIC_RELAXED);
		if (tid && !(tid & UMCG_TID_RUNNING)) {
			foo_add(&server, &server.cur->task);
			server.cur = NULL;
			putchar('*');
		}

		if (!ret)
			continue;

		switch (errno) {
		case EAGAIN:
			/*
			 * Got a wakeup, try again.
			 */
			continue;

		case ETIMEDOUT:
			/*
			 * timeout: drive preemption
			 */
			putchar('t');
			fflush(stdout);

			/*
			 * Next tick..
			 */
			timeout += TICK_NSEC;

			/*
			 * If we have a current, cmpxchg set TF_PREEMPT and on success
			 * send it a signal to kick it into the kernel such that
			 * it might re-report itself runnable.
			 */
			if (server.cur) {
				struct foo_task *t = server.cur;
				u32 val = UMCG_TASK_RUNNING;
				u32 new = UMCG_TASK_RUNNING | UMCG_TF_PREEMPT;

				if (__atomic_compare_exchange_n(&t->task.state, &val, new,
								false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
					sys_umcg_kick(0, t->tid);
				}
			}
			/*
			 * Either way around, if the cmpxchg failed the task
			 * will have blocked and we should re-start the loop.
			 */
			continue;

		default:
			printf("errno: %d\n", errno);
			perror("wait:");
			exit(-1);
		}
	}

	pthread_join(worker[0], NULL);
	pthread_join(worker[1], NULL);
	pthread_join(worker[2], NULL);

	return 0;
}

Comments

Peter Oskolkov Jan. 20, 2022, 5:28 p.m. UTC | #1
On Thu, Jan 20, 2022 at 8:09 AM Peter Zijlstra <peterz@infradead.org> wrote:
>
> Latest version, many changes since last time, still under heavy discussion.

Thanks! I'll work on testing/integrating this patchset. I'll also
assume that my changes that added blocked worker list
(https://lore.kernel.org/lkml/20220113233940.3608440-5-posk@google.com/)
and server kicking
(https://lore.kernel.org/lkml/20220113233940.3608440-6-posk@google.com/)
are acceptable.

[...]
Peter Zijlstra Jan. 21, 2022, 8:01 a.m. UTC | #2
On Thu, Jan 20, 2022 at 09:28:11AM -0800, Peter Oskolkov wrote:
> On Thu, Jan 20, 2022 at 8:09 AM Peter Zijlstra <peterz@infradead.org> wrote:
> >
> > Latest version, many changes since last time, still under heavy discussion.
> 
> Thanks! I'll work on testing/integrating this patchset. I'll also
> assume that my changes that added blocked worker list
> (https://lore.kernel.org/lkml/20220113233940.3608440-5-posk@google.com/)

So the blocked list I don't mind, but I'm not sure about the use-case
you mentioned... it gets a mess vs PE, but I see why you'd want it, so I
need to think about it more.

> and server kicking
> (https://lore.kernel.org/lkml/20220113233940.3608440-6-posk@google.com/)
> are acceptable.

Still need to look at the server kicking thing...
Steven Rostedt Jan. 21, 2022, 6:01 p.m. UTC | #3
On Thu, 20 Jan 2022 16:55:17 +0100
Peter Zijlstra <peterz@infradead.org> wrote:

> Latest version, many changes since last time, still under heavy discussion.
> 
> Seems to work with the test-case I have (below), but that still has a few gaps,
> coverage wise.

Nice work, Peter.

> 
> Still haven't done the SMP wakeup thing; I finally get the idea behind
> worker-timeouts but haven't implemented that yet.
> 
> Happy hacking..
> 
> ----
>

Should the below be added to samples/ ?

-- Steve
 
> #define _GNU_SOURCE
> #include <unistd.h>
> #include <sys/types.h>
> #include <sys/syscall.h>
> #include <pthread.h>
> #include <stdio.h>
> #include <stdlib.h>
> #include <errno.h>
> #include <signal.h>

[..]
Peter Zijlstra Jan. 24, 2022, 8:20 a.m. UTC | #4
On Fri, Jan 21, 2022 at 01:01:45PM -0500, Steven Rostedt wrote:

> Should the below be added to samples/ ?

Maybe eventually, it needs a little TLC to build with clang at the very
least.