diff options
Diffstat (limited to 'thrpool.c')
-rw-r--r-- | thrpool.c | 142 |
1 file changed, 105 insertions, 37 deletions
@@ -4,6 +4,14 @@ */ #include "cmogstored.h" +static __thread unsigned mog_do_quit; +struct mog_thr_start_arg { + struct mog_thrpool *tp; + pthread_mutex_t mtx; + pthread_cond_t cond; + unsigned *do_quit; +}; + /* * we can lower this if we can test with lower values, NPTL minimum is 16K. * We also use syslog() and *printf() functions which take a lot of @@ -22,17 +30,59 @@ #endif static const size_t stacksize = (size_t)MOG_THR_STACK_SIZE; +static sigset_t quitset; + +__attribute__((constructor)) static void thrpool_init(void) +{ + CHECK(int, 0, sigfillset(&quitset)); + CHECK(int, 0, sigdelset(&quitset, SIGURG)); +} + +/* child thread notifies the parent about its readiness */ +static void *thr_start_wrapper(void *ptr) +{ + struct mog_thr_start_arg *arg = ptr; + struct mog_thrpool *tp; + + mog_do_quit = 0; + CHECK(int, 0, pthread_sigmask(SIG_SETMASK, &quitset, NULL)); + CHECK(int, 0, pthread_mutex_lock(&arg->mtx)); + + arg->do_quit = &mog_do_quit; + tp = arg->tp; /* arg becomes invalid once we unlock */ + + CHECK(int, 0, pthread_cond_signal(&arg->cond)); + CHECK(int, 0, pthread_mutex_unlock(&arg->mtx)); + + return tp->start_fn(tp->start_arg); +} + +/* child thread tests if its quit flag is set and exits if it is */ +void mog_thr_test_quit(void) +{ + if (__sync_add_and_fetch(&mog_do_quit, 0) != 0) { + mog_alloc_quit(); + pthread_exit(NULL); + } +} + /* - * kevent() sleep is not a cancellation point, so it's possible for - * a thread to sleep on it if the cancel request arrived right after - * we checked for cancellation + * we no longer rely on pthreads cancellation, so our explicit checks for + * thread quitting require us to continuously signal a thread for death + * in case it enters a sleeping syscall (epoll_wait/kevent) immediately + * after checking the mog_do_quit TLS variable */ static void poke(pthread_t thr, int sig) { int err; + /* + * This is an uncommon code path and only triggered when + * we lower thread counts or shut down + */ while ((err = 
pthread_kill(thr, sig)) == 0) sched_yield(); + assert(err == ESRCH && "pthread_kill() usage bug"); } @@ -58,49 +108,67 @@ thr_create_fail_retry(struct mog_thrpool *tp, size_t size, } } +static bool +thrpool_add(struct mog_thrpool *tp, size_t size, unsigned long *nr_eagain) +{ + struct mog_thr_start_arg arg = { + .mtx = PTHREAD_MUTEX_INITIALIZER, + .cond = PTHREAD_COND_INITIALIZER, + }; + pthread_t *thr; + pthread_attr_t attr; + size_t bytes = (tp->n_threads + 1) * sizeof(struct mog_thread); + int rc; + + assert(tp && "tp no defined"); + arg.tp = tp; + tp->threads = xrealloc(tp->threads, bytes); + + CHECK(int, 0, pthread_attr_init(&attr)); + + if (stacksize > 0) + CHECK(int, 0, pthread_attr_setstacksize(&attr, stacksize)); + + thr = &tp->threads[tp->n_threads].thr; + + CHECK(int, 0, pthread_mutex_lock(&arg.mtx)); + rc = pthread_create(thr, &attr, thr_start_wrapper, &arg); + CHECK(int, 0, pthread_attr_destroy(&attr)); + if (rc == 0) { + CHECK(int, 0, pthread_cond_wait(&arg.cond, &arg.mtx)); + tp->threads[tp->n_threads].do_quit = arg.do_quit; + } + CHECK(int, 0, pthread_mutex_unlock(&arg.mtx)); + + if (rc == 0) { + tp->n_threads++; + *nr_eagain = 0; + } else if (mog_pthread_create_retryable(rc)) { + if (!thr_create_fail_retry(tp, size, nr_eagain, rc)) + return false; + } else { + assert(rc == 0 && "pthread_create usage error"); + } + return true; +} + void mog_thrpool_set_size(struct mog_thrpool *tp, size_t size) { unsigned long nr_eagain = 0; CHECK(int, 0, pthread_mutex_lock(&tp->lock)); - while (size > tp->n_threads) { - pthread_t *thr; - pthread_attr_t attr; - size_t bytes = (tp->n_threads + 1) * sizeof(pthread_t); - int rc; - tp->threads = xrealloc(tp->threads, bytes); - - CHECK(int, 0, pthread_attr_init(&attr)); - - if (stacksize > 0) { - CHECK(int, 0, - pthread_attr_setstacksize(&attr, stacksize)); - } - - thr = tp->threads + tp->n_threads; - - rc = pthread_create(thr, &attr, tp->start_fn, tp->start_arg); - CHECK(int, 0, pthread_attr_destroy(&attr)); - - if (rc 
== 0) { - tp->n_threads++; - nr_eagain = 0; - } else if (mog_pthread_create_retry(rc)) { - if (!thr_create_fail_retry(tp, size, &nr_eagain, rc)) - goto out; - } else { - assert(rc == 0 && "pthread_create usage error"); - } - } + while (size > tp->n_threads && thrpool_add(tp, size, &nr_eagain)) + /* nothing */; if (tp->n_threads > size) { size_t i; int err; + /* set the do_quit flag for all threads we kill */ for (i = size; i < tp->n_threads; i++) { - CHECK(int, 0, pthread_cancel(tp->threads[i])); - err = pthread_kill(tp->threads[i], SIGURG); + __sync_add_and_fetch(tp->threads[i].do_quit, 1); + err = pthread_kill(tp->threads[i].thr, SIGURG); switch (err) { case 0: @@ -111,14 +179,14 @@ void mog_thrpool_set_size(struct mog_thrpool *tp, size_t size) } } + /* keep poking them to kick them out of epoll_wait/kevent */ for (i = size; i < tp->n_threads; i++) { - poke(tp->threads[i], SIGURG); + poke(tp->threads[i].thr, SIGURG); - CHECK(int, 0, pthread_join(tp->threads[i], NULL)); + CHECK(int, 0, pthread_join(tp->threads[i].thr, NULL)); } tp->n_threads = size; } -out: CHECK(int, 0, pthread_mutex_unlock(&tp->lock)); } |