about summary refs log tree commit homepage
path: root/thrpool.c
diff options
context:
space:
mode:
Diffstat (limited to 'thrpool.c')
-rw-r--r--thrpool.c142
1 files changed, 105 insertions, 37 deletions
diff --git a/thrpool.c b/thrpool.c
index 96246a8..a0c0bc2 100644
--- a/thrpool.c
+++ b/thrpool.c
@@ -4,6 +4,14 @@
  */
 #include "cmogstored.h"
 
+static __thread unsigned mog_do_quit;
+struct mog_thr_start_arg {
+        struct mog_thrpool *tp;
+        pthread_mutex_t mtx;
+        pthread_cond_t cond;
+        unsigned *do_quit;
+};
+
 /*
  * we can lower this if we can test with lower values, NPTL minimum is 16K.
  * We also use syslog() and *printf() functions which take a lot of
@@ -22,17 +30,59 @@
 #endif
 static const size_t stacksize = (size_t)MOG_THR_STACK_SIZE;
 
+static sigset_t quitset;
+
+__attribute__((constructor)) static void thrpool_init(void)
+{
+        CHECK(int, 0, sigfillset(&quitset));
+        CHECK(int, 0, sigdelset(&quitset, SIGURG));
+}
+
+/* child thread notifies the parent about its readiness */
+static void *thr_start_wrapper(void *ptr)
+{
+        struct mog_thr_start_arg *arg = ptr;
+        struct mog_thrpool *tp;
+
+        mog_do_quit = 0;
+        CHECK(int, 0, pthread_sigmask(SIG_SETMASK, &quitset, NULL));
+        CHECK(int, 0, pthread_mutex_lock(&arg->mtx));
+
+        arg->do_quit = &mog_do_quit;
+        tp = arg->tp; /* arg becomes invalid once we unlock */
+
+        CHECK(int, 0, pthread_cond_signal(&arg->cond));
+        CHECK(int, 0, pthread_mutex_unlock(&arg->mtx));
+
+        return tp->start_fn(tp->start_arg);
+}
+
+/* child thread tests if its quit flag is set and exits if it is */
+void mog_thr_test_quit(void)
+{
+        if (__sync_add_and_fetch(&mog_do_quit, 0) != 0) {
+                mog_alloc_quit();
+                pthread_exit(NULL);
+        }
+}
+
 /*
- * kevent() sleep is not a cancellation point, so it's possible for
- * a thread to sleep on it if the cancel request arrived right after
- * we checked for cancellation
+ * we no longer rely on pthreads cancellation, so our explicit checks for
+ * thread quitting requires us to continuously signal a thread for death
+ * in case it enters a sleeping syscall (epoll_wait/kevent) immediately
+ * after checking the mog_do_quit TLS variable
  */
 static void poke(pthread_t thr, int sig)
 {
         int err;
 
+        /*
+         * This is an uncommon code path and only triggered when
+         * we lower thread counts or shut down
+         */
         while ((err = pthread_kill(thr, sig)) == 0)
                 sched_yield();
+
         assert(err == ESRCH && "pthread_kill() usage bug");
 }
 
@@ -58,49 +108,67 @@ thr_create_fail_retry(struct mog_thrpool *tp, size_t size,
         }
 }
 
+static bool
+thrpool_add(struct mog_thrpool *tp, size_t size, unsigned long *nr_eagain)
+{
+        struct mog_thr_start_arg arg = {
+                .mtx = PTHREAD_MUTEX_INITIALIZER,
+                .cond = PTHREAD_COND_INITIALIZER,
+        };
+        pthread_t *thr;
+        pthread_attr_t attr;
+        size_t bytes = (tp->n_threads + 1) * sizeof(struct mog_thread);
+        int rc;
+
+        assert(tp && "tp no defined");
+        arg.tp = tp;
+        tp->threads = xrealloc(tp->threads, bytes);
+
+        CHECK(int, 0, pthread_attr_init(&attr));
+
+        if (stacksize > 0)
+                CHECK(int, 0, pthread_attr_setstacksize(&attr, stacksize));
+
+        thr = &tp->threads[tp->n_threads].thr;
+
+        CHECK(int, 0, pthread_mutex_lock(&arg.mtx));
+        rc = pthread_create(thr, &attr, thr_start_wrapper, &arg);
+        CHECK(int, 0, pthread_attr_destroy(&attr));
+        if (rc == 0) {
+                CHECK(int, 0, pthread_cond_wait(&arg.cond, &arg.mtx));
+                tp->threads[tp->n_threads].do_quit = arg.do_quit;
+        }
+        CHECK(int, 0, pthread_mutex_unlock(&arg.mtx));
+
+        if (rc == 0) {
+                tp->n_threads++;
+                *nr_eagain = 0;
+        } else if (mog_pthread_create_retryable(rc)) {
+                if (!thr_create_fail_retry(tp, size, nr_eagain, rc))
+                        return false;
+        } else {
+                assert(rc == 0 && "pthread_create usage error");
+        }
+        return true;
+}
+
 void mog_thrpool_set_size(struct mog_thrpool *tp, size_t size)
 {
         unsigned long nr_eagain = 0;
 
         CHECK(int, 0, pthread_mutex_lock(&tp->lock));
-        while (size > tp->n_threads) {
-                pthread_t *thr;
-                pthread_attr_t attr;
-                size_t bytes = (tp->n_threads + 1) * sizeof(pthread_t);
-                int rc;
 
-                tp->threads = xrealloc(tp->threads, bytes);
-
-                CHECK(int, 0, pthread_attr_init(&attr));
-
-                if (stacksize > 0) {
-                        CHECK(int, 0,
-                              pthread_attr_setstacksize(&attr, stacksize));
-                }
-
-                thr = tp->threads + tp->n_threads;
-
-                rc = pthread_create(thr, &attr, tp->start_fn, tp->start_arg);
-                CHECK(int, 0, pthread_attr_destroy(&attr));
-
-                if (rc == 0) {
-                        tp->n_threads++;
-                        nr_eagain = 0;
-                } else if (mog_pthread_create_retry(rc)) {
-                        if (!thr_create_fail_retry(tp, size, &nr_eagain, rc))
-                                goto out;
-                } else {
-                        assert(rc == 0 && "pthread_create usage error");
-                }
-        }
+        while (size > tp->n_threads && thrpool_add(tp, size, &nr_eagain))
+                /* nothing */;
 
         if (tp->n_threads > size) {
                 size_t i;
                 int err;
 
+                /* set the do_quit flag for all threads we kill */
                 for (i = size; i < tp->n_threads; i++) {
-                        CHECK(int, 0, pthread_cancel(tp->threads[i]));
-                        err = pthread_kill(tp->threads[i], SIGURG);
+                        __sync_add_and_fetch(tp->threads[i].do_quit, 1);
+                        err = pthread_kill(tp->threads[i].thr, SIGURG);
 
                         switch (err) {
                         case 0:
@@ -111,14 +179,14 @@ void mog_thrpool_set_size(struct mog_thrpool *tp, size_t size)
                         }
                 }
 
+                /* keep poking them to kick them out out epoll_wait/kevent */
                 for (i = size; i < tp->n_threads; i++) {
-                        poke(tp->threads[i], SIGURG);
+                        poke(tp->threads[i].thr, SIGURG);
 
-                        CHECK(int, 0, pthread_join(tp->threads[i], NULL));
+                        CHECK(int, 0, pthread_join(tp->threads[i].thr, NULL));
                 }
                 tp->n_threads = size;
         }
-out:
         CHECK(int, 0, pthread_mutex_unlock(&tp->lock));
 }