-rw-r--r--   accept_loop.c         22
-rw-r--r--   cmogstored.c           1
-rw-r--r--   cmogstored.h           9
-rw-r--r--   mnt.c                  2
-rw-r--r--   queue_epoll.c         11
-rw-r--r--   queue_kqueue.c        11
-rw-r--r--   queue_loop.c          13
-rw-r--r--   sig.c                 17
-rw-r--r--   test/queue-idle-1.c    1
-rw-r--r--   test/thrpool-1.c       4
-rw-r--r--   thrpool.c            142
-rw-r--r--   util.h                32
12 files changed, 128 insertions, 137 deletions
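
Taken together, the changes retire pthread cancellation (pthread_cancel(), pthread_testcancel(), the asynchronous cancel setup in sig.c, and the pthread_cleanup_push()/pop() pairs) in favor of an explicit quit protocol: every pool thread owns a __thread counter, hands its address to the pool during a startup handshake, and polls it via mog_thr_test_quit() at loop boundaries; to retire a thread, the pool increments that counter and keeps delivering SIGURG until the thread exits, so a worker that slipped into epoll_wait() or kevent() right after its check still gets woken. The standalone sketch below condenses the protocol to a single worker. It is a hedged illustration, not cmogstored code: the names (worker, on_sigurg, worker_done) are invented, worker_done simplifies cmogstored's poke-until-ESRCH loop, and pause() stands in for epoll_wait()/kevent(). Build with: cc -pthread quit-sketch.c

#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <signal.h>
#include <stdio.h>
#include <unistd.h>

static __thread unsigned do_quit;       /* every thread gets its own copy */
static unsigned *worker_quit;           /* pool's pointer to the worker's copy */
static unsigned worker_done;            /* worker sets this on its way out */
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

static void on_sigurg(int sig)
{
        (void)sig;      /* no-op; delivery alone interrupts sleeping syscalls */
}

static void *worker(void *arg)
{
        (void)arg;

        /* startup handshake: publish the address of our __thread flag */
        pthread_mutex_lock(&mtx);
        worker_quit = &do_quit;
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mtx);

        for (;;) {
                /* explicit quit check, like mog_thr_test_quit() */
                if (__sync_add_and_fetch(&do_quit, 0) != 0) {
                        __sync_add_and_fetch(&worker_done, 1);
                        pthread_exit(NULL);
                }
                pause();        /* stand-in for epoll_wait()/kevent() */
        }
}

int main(void)
{
        pthread_t thr;

        signal(SIGURG, on_sigurg);

        pthread_mutex_lock(&mtx);
        if (pthread_create(&thr, NULL, worker, NULL) != 0)
                return 1;
        while (worker_quit == NULL)     /* wait out spurious wakeups, too */
                pthread_cond_wait(&cond, &mtx);
        pthread_mutex_unlock(&mtx);

        /*
         * mark the worker for death, then keep poking: the flag may have
         * been set right after the check but before pause() began
         */
        __sync_add_and_fetch(worker_quit, 1);
        while (__sync_add_and_fetch(&worker_done, 0) == 0) {
                pthread_kill(thr, SIGURG);
                sched_yield();
        }
        pthread_join(thr, NULL);
        puts("worker quit cleanly");
        return 0;
}
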
diff --git a/accept_loop.c b/accept_loop.c
index dab9259..dd9c929 100644
--- a/accept_loop.c
+++ b/accept_loop.c
@@ -40,8 +40,10 @@ MOG_NOINLINE static void accept_error_check(struct mog_accept *ac)
 
         switch (errno) {
         case ECONNABORTED:
+                /* common error, nothing we can do about it */
         case EINTR:
-                return; /* common errors, nothing we can do about it */
+                /* we'll hit mog_thr_test_quit when we restart the loop */
+                return;
         case EBADF:
                 assert(0 && "BUG, called accept on bad FD");
         case ENOTSOCK:
@@ -74,11 +76,6 @@ MOG_NOINLINE static void accept_error_check(struct mog_accept *ac)
         }
 }
 
-static void accept_loop_cleanup(void *ignored)
-{
-        mog_alloc_quit();
-}
-
 /*
  * passed as the start_routine argument to pthread_create.
  * This function may run concurrently in multiple threads.
@@ -92,26 +89,19 @@ void *mog_accept_loop(void *arg)
         int accept_fd = mog_fd_of(ac)->fd;
         union mog_sockaddr msa;
 
-        mog_cancel_prepare();
-        pthread_cleanup_push(accept_loop_cleanup, NULL);
-
         for (;;) {
                 struct sockaddr *sa = mog_sockaddr_sa(&msa);
                 socklen_t salen = (socklen_t)sizeof(msa);
                 int client_fd;
 
-                /* pthread cancellation point */
+                mog_thr_test_quit();
                 client_fd = mog_accept_fn(accept_fd, sa, &salen);
 
-                if (client_fd >= 0) {
+                if (client_fd >= 0)
                         ac->post_accept_fn(client_fd, ac->svc, sa, salen);
-                } else {
-                        mog_testcancel();
+                else
                         accept_error_check(ac);
-                }
         }
 
-        pthread_cleanup_pop(1);
-
         return NULL;
 }
diff --git a/cmogstored.c b/cmogstored.c
index bec2137..31d8e64 100644
--- a/cmogstored.c
+++ b/cmogstored.c
@@ -371,7 +371,6 @@ static void upgrade_handler(void)
 
 static void main_worker_loop(const pid_t parent)
 {
-        mog_cancel_disable(); /* mog_idleq_wait() now relies on this */
         while (parent == 0 || parent == getppid()) {
                 mog_notify_wait(mog_main.have_mgmt);
                 if (sigchld_hit)
diff --git a/cmogstored.h b/cmogstored.h
index a91f393..7b1f164 100644
--- a/cmogstored.h
+++ b/cmogstored.h
@@ -207,11 +207,16 @@ struct mog_http {
         struct mog_packaddr mpa;
 } __attribute__((packed));
 
+struct mog_thread {
+        pthread_t thr;
+        unsigned *do_quit;
+};
+
 struct mog_thrpool {
         pthread_mutex_t lock;
         size_t n_threads;
         size_t want_threads;
-        pthread_t *threads;
+        struct mog_thread *threads;
         void *(*start_fn)(void *);
         void *start_arg;
 };
@@ -262,7 +267,6 @@ struct mog_file {
 
 /* sig.c */
 extern sigset_t mog_emptyset;
-void mog_cancel_prepare(void);
 void mog_intr_disable(void);
 void mog_intr_enable(void);
 void mog_sleep(long seconds);
@@ -400,6 +404,7 @@ char *mog_canonpath(const char *path, enum canonicalize_mode_t canon_mode);
 char *mog_canonpath_die(const char *path, enum canonicalize_mode_t canon_mode);
 
 /* thrpool.c */
+void mog_thr_test_quit(void);
 void mog_thrpool_start(struct mog_thrpool *, size_t n,
                        void *(*start_fn)(void *), void *arg);
 void mog_thrpool_quit(struct mog_thrpool *, struct mog_queue *);
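
A note on the new struct mog_thread: do_quit is a pointer because the flag it refers to is __thread storage (mog_do_quit in thrpool.c below), so each thread owns a distinct instance at a distinct address, and only the owning thread can take that address; the pool reaches it solely through the pointer published during startup. A tiny standalone demonstration of that property (illustrative names, not cmogstored code; build with cc -pthread):

#include <pthread.h>
#include <stdio.h>

static __thread unsigned do_quit;       /* one instance per thread */

static void *report(void *arg)
{
        (void)arg;
        /* the same symbol resolves to a different address in each thread */
        printf("&do_quit in this thread: %p\n", (void *)&do_quit);
        return NULL;
}

int main(void)
{
        pthread_t a, b;

        if (pthread_create(&a, NULL, report, NULL) != 0)
                return 1;
        if (pthread_create(&b, NULL, report, NULL) != 0)
                return 1;
        pthread_join(a, NULL);
        pthread_join(b, NULL);
        report(NULL);   /* the main thread has its own copy, too */
        return 0;
}
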
diff --git a/mnt.c b/mnt.c
index bb4e645..9fb2e68 100644
--- a/mnt.c
+++ b/mnt.c
@@ -159,7 +159,7 @@ static void timed_init_once(void)
                         break;
 
                 /* this must succeed, keep looping */
-                if (mog_pthread_create_retry(rc)) {
+                if (mog_pthread_create_retryable(rc)) {
                         if ((++tries % 1024) == 0)
                                 warn("pthread_create: %s (tries: %lu)",
                                      strerror(rc), tries);
diff --git a/queue_epoll.c b/queue_epoll.c
index c2a7159..bc7b513 100644
--- a/queue_epoll.c
+++ b/queue_epoll.c
@@ -102,7 +102,7 @@ struct mog_queue * mog_queue_new(void)
 }
 
 static struct mog_fd *
-epoll_event_check(int rc, struct epoll_event *event, bool cancellable)
+epoll_event_check(int rc, struct epoll_event *event)
 {
         struct mog_fd *mfd;
 
@@ -119,9 +119,6 @@ epoll_event_check(int rc, struct epoll_event *event, bool cancellable)
                 /* rc could be > 1 if the kernel is broken :P */
                 die_errno("epoll_wait() failed with (%d)", rc);
 
-        if (cancellable)
-                mog_testcancel();
-
         return NULL;
 }
 
@@ -137,12 +134,12 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout)
         bool cancellable = timeout != 0;
 
         if (cancellable)
-                mog_testcancel();
+                mog_thr_test_quit();
 
         /* epoll_wait is a cancellation point since glibc 2.4 */
         rc = epoll_wait(q->queue_fd, &event, 1, timeout);
 
-        return epoll_event_check(rc, &event, cancellable);
+        return epoll_event_check(rc, &event);
 }
 
 struct mog_fd * mog_idleq_wait_intr(struct mog_queue *q, int timeout)
@@ -151,7 +148,7 @@ struct mog_fd * mog_idleq_wait_intr(struct mog_queue *q, int timeout)
         struct epoll_event event;
 
         rc = epoll_pwait(q->queue_fd, &event, 1, timeout, &mog_emptyset);
-        return epoll_event_check(rc, &event, false);
+        return epoll_event_check(rc, &event);
 }
 
 MOG_NOINLINE static void
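
With the cancellable argument gone from epoll_event_check(), waking a sleeping worker is entirely the job of poke()'s SIGURG in thrpool.c: per signal(7), epoll_wait() is never restarted after a signal handler runs, so delivery forces a -1/EINTR return and the loop comes back around to mog_thr_test_quit(). A minimal Linux-only sketch of that wake-up path (illustrative, not cmogstored code; the sleep(1) is a crude stand-in for proper synchronization):

#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <sys/epoll.h>
#include <unistd.h>

static void on_sigurg(int sig)
{
        (void)sig;      /* no-op handler; we only want the EINTR */
}

static void *sleeper(void *arg)
{
        int epfd = *(int *)arg;
        struct epoll_event event;

        /* no FDs are registered, so this sleeps until a signal lands */
        int rc = epoll_wait(epfd, &event, 1, -1);

        assert(rc < 0 && errno == EINTR);
        puts("epoll_wait() interrupted by SIGURG");
        return NULL;
}

int main(void)
{
        pthread_t thr;
        int epfd = epoll_create1(0);

        assert(epfd >= 0);
        signal(SIGURG, on_sigurg);
        if (pthread_create(&thr, NULL, sleeper, &epfd) != 0)
                return 1;
        sleep(1);       /* crude, but lets the thread block first */
        pthread_kill(thr, SIGURG);
        pthread_join(thr, NULL);
        close(epfd);
        return 0;
}
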
diff --git a/queue_kqueue.c b/queue_kqueue.c
index b419486..9b28c5b 100644
--- a/queue_kqueue.c
+++ b/queue_kqueue.c
@@ -19,13 +19,6 @@ struct mog_queue * mog_queue_new(void)
         return mog_queue_init(kqueue_fd);
 }
 
-static void check_cancel(void)
-{
-        mog_cancel_enable();
-        pthread_testcancel();
-        mog_cancel_disable();
-}
-
 /*
  * grabs one active event off the event queue
  */
@@ -52,7 +45,7 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout)
          * cancellation request (since kevent() is not a cancellation point).
          */
         if (cancellable)
-                mog_testcancel();
+                mog_thr_test_quit();
 
         rc = kevent(q->queue_fd, NULL, 0, &event, 1, tsp);
 
@@ -64,7 +57,7 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout)
                 return mfd;
         }
         if (cancellable)
-                mog_testcancel();
+                mog_thr_test_quit();
         if (rc == 0)
                 return NULL;
 
diff --git a/queue_loop.c b/queue_loop.c
index 77c8620..f8a03a9 100644
--- a/queue_loop.c
+++ b/queue_loop.c
@@ -4,15 +4,6 @@
  */
 #include "cmogstored.h"
 
-static void queue_loop_cleanup(void *arg)
-{
-        unsigned long self = (unsigned long)pthread_self();
-
-        syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread shutting down...", self);
-        mog_alloc_quit();
-        syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread done", self);
-}
-
 static struct mog_fd *queue_xchg_maybe(struct mog_queue *q, struct mog_fd *mfd)
 {
         /*
@@ -46,8 +37,6 @@ void * mog_queue_loop(void *arg)
         struct mog_queue *q = arg;
         struct mog_fd *mfd = NULL;
 
-        mog_cancel_prepare();
-        pthread_cleanup_push(queue_loop_cleanup, NULL);
         syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread ready",
                (unsigned long)pthread_self());
 
@@ -71,8 +60,6 @@ void * mog_queue_loop(void *arg)
                 }
         }
 
-        pthread_cleanup_pop(1);
-
         return NULL;
 }
 
diff --git a/sig.c b/sig.c
index 8c052f6..8220eac 100644
--- a/sig.c
+++ b/sig.c
@@ -9,27 +9,12 @@
  */
 
 static sigset_t fullset;
-static sigset_t cancelset;
 sigset_t mog_emptyset;
 
-__attribute__((constructor)) void sig_init(void)
+__attribute__((constructor)) static void sig_init(void)
 {
         CHECK(int, 0, sigfillset(&fullset));
         CHECK(int, 0, sigemptyset(&mog_emptyset));
-        CHECK(int, 0, sigfillset(&cancelset));
-        CHECK(int, 0, sigdelset(&cancelset, SIGURG));
-}
-
-/* this runs at the start of every thread managed by thrpool */
-void mog_cancel_prepare(void)
-{
-        int old;
-
-        CHECK(int, 0, pthread_sigmask(SIG_SETMASK, &cancelset, NULL));
-        mog_cancel_disable();
-        CHECK(int, 0, pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &old));
-        assert(old == PTHREAD_CANCEL_DEFERRED
-               && "async cancel enabled redundantly");
 }
 
 void mog_intr_disable(void)
diff --git a/test/queue-idle-1.c b/test/queue-idle-1.c
index 1343720..3b71923 100644
--- a/test/queue-idle-1.c
+++ b/test/queue-idle-1.c
@@ -54,7 +54,6 @@ static void test_blocking(void)
         mog_idleq_add(q, mfd, MOG_QEV_RD);
         CHECK(int, 0, pthread_create(&thr, NULL, wait_then_write, NULL));
         printf("start wait: %d\n", (int)time(NULL));
-        mog_cancel_disable();
         assert(mfd == mog_idleq_wait_intr(q, -1));
         printf("  end wait: %d\n", (int)time(NULL));
         assert(1 == read(fds[0], buf, 1) && "read failed");
diff --git a/test/thrpool-1.c b/test/thrpool-1.c
index 7df099e..67aaff2 100644
--- a/test/thrpool-1.c
+++ b/test/thrpool-1.c
@@ -24,12 +24,10 @@ void *fn(void *xarg)
                         t.tv_sec++;
                 }
 
-                mog_cancel_disable();
                 CHECK(int, 0, pthread_mutex_lock(&lock));
                 pthread_cond_timedwait(&cond, &lock, &t);
                 CHECK(int, 0, pthread_mutex_unlock(&lock));
-                mog_cancel_enable();
-                pthread_testcancel();
+                mog_thr_test_quit();
         }
         assert(strcmp("whazzup", s) == 0 && "arg changed");
 
diff --git a/thrpool.c b/thrpool.c
index 96246a8..a0c0bc2 100644
--- a/thrpool.c
+++ b/thrpool.c
@@ -4,6 +4,14 @@
  */
 #include "cmogstored.h"
 
+static __thread unsigned mog_do_quit;
+struct mog_thr_start_arg {
+        struct mog_thrpool *tp;
+        pthread_mutex_t mtx;
+        pthread_cond_t cond;
+        unsigned *do_quit;
+};
+
 /*
  * we can lower this if we can test with lower values, NPTL minimum is 16K.
  * We also use syslog() and *printf() functions which take a lot of
@@ -22,17 +30,59 @@
 #endif
 static const size_t stacksize = (size_t)MOG_THR_STACK_SIZE;
 
+static sigset_t quitset;
+
+__attribute__((constructor)) static void thrpool_init(void)
+{
+        CHECK(int, 0, sigfillset(&quitset));
+        CHECK(int, 0, sigdelset(&quitset, SIGURG));
+}
+
+/* child thread notifies the parent about its readiness */
+static void *thr_start_wrapper(void *ptr)
+{
+        struct mog_thr_start_arg *arg = ptr;
+        struct mog_thrpool *tp;
+
+        mog_do_quit = 0;
+        CHECK(int, 0, pthread_sigmask(SIG_SETMASK, &quitset, NULL));
+        CHECK(int, 0, pthread_mutex_lock(&arg->mtx));
+
+        arg->do_quit = &mog_do_quit;
+        tp = arg->tp; /* arg becomes invalid once we unlock */
+
+        CHECK(int, 0, pthread_cond_signal(&arg->cond));
+        CHECK(int, 0, pthread_mutex_unlock(&arg->mtx));
+
+        return tp->start_fn(tp->start_arg);
+}
+
+/* child thread tests if its quit flag is set and exits if it is */
+void mog_thr_test_quit(void)
+{
+        if (__sync_add_and_fetch(&mog_do_quit, 0) != 0) {
+                mog_alloc_quit();
+                pthread_exit(NULL);
+        }
+}
+
 /*
- * kevent() sleep is not a cancellation point, so it's possible for
- * a thread to sleep on it if the cancel request arrived right after
- * we checked for cancellation
+ * we no longer rely on pthreads cancellation, so our explicit checks for
+ * thread quitting require us to continuously signal a thread for death
+ * in case it enters a sleeping syscall (epoll_wait/kevent) immediately
+ * after checking the mog_do_quit TLS variable
  */
 static void poke(pthread_t thr, int sig)
 {
         int err;
 
+        /*
+         * This is an uncommon code path and only triggered when
+         * we lower thread counts or shut down
+         */
         while ((err = pthread_kill(thr, sig)) == 0)
                 sched_yield();
+
         assert(err == ESRCH && "pthread_kill() usage bug");
 }
 
@@ -58,49 +108,67 @@ thr_create_fail_retry(struct mog_thrpool *tp, size_t size,
         }
 }
 
+static bool
+thrpool_add(struct mog_thrpool *tp, size_t size, unsigned long *nr_eagain)
+{
+        struct mog_thr_start_arg arg = {
+                .mtx = PTHREAD_MUTEX_INITIALIZER,
+                .cond = PTHREAD_COND_INITIALIZER,
+        };
+        pthread_t *thr;
+        pthread_attr_t attr;
+        size_t bytes = (tp->n_threads + 1) * sizeof(struct mog_thread);
+        int rc;
+
+        assert(tp && "tp not defined");
+        arg.tp = tp;
+        tp->threads = xrealloc(tp->threads, bytes);
+
+        CHECK(int, 0, pthread_attr_init(&attr));
+
+        if (stacksize > 0)
+                CHECK(int, 0, pthread_attr_setstacksize(&attr, stacksize));
+
+        thr = &tp->threads[tp->n_threads].thr;
+
+        CHECK(int, 0, pthread_mutex_lock(&arg.mtx));
+        rc = pthread_create(thr, &attr, thr_start_wrapper, &arg);
+        CHECK(int, 0, pthread_attr_destroy(&attr));
+        if (rc == 0) {
+                CHECK(int, 0, pthread_cond_wait(&arg.cond, &arg.mtx));
+                tp->threads[tp->n_threads].do_quit = arg.do_quit;
+        }
+        CHECK(int, 0, pthread_mutex_unlock(&arg.mtx));
+
+        if (rc == 0) {
+                tp->n_threads++;
+                *nr_eagain = 0;
+        } else if (mog_pthread_create_retryable(rc)) {
+                if (!thr_create_fail_retry(tp, size, nr_eagain, rc))
+                        return false;
+        } else {
+                assert(rc == 0 && "pthread_create usage error");
+        }
+        return true;
+}
+
 void mog_thrpool_set_size(struct mog_thrpool *tp, size_t size)
 {
         unsigned long nr_eagain = 0;
 
         CHECK(int, 0, pthread_mutex_lock(&tp->lock));
-        while (size > tp->n_threads) {
-                pthread_t *thr;
-                pthread_attr_t attr;
-                size_t bytes = (tp->n_threads + 1) * sizeof(pthread_t);
-                int rc;
 
-                tp->threads = xrealloc(tp->threads, bytes);
-
-                CHECK(int, 0, pthread_attr_init(&attr));
-
-                if (stacksize > 0) {
-                        CHECK(int, 0,
-                              pthread_attr_setstacksize(&attr, stacksize));
-                }
-
-                thr = tp->threads + tp->n_threads;
-
-                rc = pthread_create(thr, &attr, tp->start_fn, tp->start_arg);
-                CHECK(int, 0, pthread_attr_destroy(&attr));
-
-                if (rc == 0) {
-                        tp->n_threads++;
-                        nr_eagain = 0;
-                } else if (mog_pthread_create_retry(rc)) {
-                        if (!thr_create_fail_retry(tp, size, &nr_eagain, rc))
-                                goto out;
-                } else {
-                        assert(rc == 0 && "pthread_create usage error");
-                }
-        }
+        while (size > tp->n_threads && thrpool_add(tp, size, &nr_eagain))
+                /* nothing */;
 
         if (tp->n_threads > size) {
                 size_t i;
                 int err;
 
+                /* set the do_quit flag for all threads we kill */
                 for (i = size; i < tp->n_threads; i++) {
-                        CHECK(int, 0, pthread_cancel(tp->threads[i]));
-                        err = pthread_kill(tp->threads[i], SIGURG);
+                        __sync_add_and_fetch(tp->threads[i].do_quit, 1);
+                        err = pthread_kill(tp->threads[i].thr, SIGURG);
 
                         switch (err) {
                         case 0:
@@ -111,14 +179,14 @@ void mog_thrpool_set_size(struct mog_thrpool *tp, size_t size)
                         }
                 }
 
+                /* keep poking them to kick them out of epoll_wait/kevent */
                 for (i = size; i < tp->n_threads; i++) {
-                        poke(tp->threads[i], SIGURG);
+                        poke(tp->threads[i].thr, SIGURG);
 
-                        CHECK(int, 0, pthread_join(tp->threads[i], NULL));
+                        CHECK(int, 0, pthread_join(tp->threads[i].thr, NULL));
                 }
                 tp->n_threads = size;
         }
-out:
         CHECK(int, 0, pthread_mutex_unlock(&tp->lock));
 }
 
diff --git a/util.h b/util.h
index 82f737e..32d5b38 100644
--- a/util.h
+++ b/util.h
@@ -36,36 +36,6 @@ static inline void mog_free(const void *ptr)
         assert(checkvar==(expect)&& "BUG" && __FILE__ && __LINE__); \
         } while (0)
 
-static inline void mog_cancel_enable(void)
-{
-        int old;
-
-        CHECK(int, 0, pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old));
-        assert(old == PTHREAD_CANCEL_DISABLE && "redundant cancel enable");
-}
-
-static inline void mog_cancel_disable(void)
-{
-        int old;
-
-        CHECK(int, 0, pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old));
-        assert(old == PTHREAD_CANCEL_ENABLE && "redundant cancel disable");
-}
-
-static inline void mog_testcancel(void)
-{
-        int old;
-
-        mog_cancel_enable();
-
-        /* make sure we are already using the async cancel type */
-        assert(0 == pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &old)
-               && old == PTHREAD_CANCEL_ASYNCHRONOUS
-               && "asynchronous cancel not previously enabled");
-
-        mog_cancel_disable();
-}
-
 /* compiler should optimize this away */
 __attribute__((const)) static inline off_t off_t_max(void)
 {
@@ -98,7 +68,7 @@ static inline int mog_set_cloexec(int fd, const bool set)
         return fcntl(fd, F_SETFD, set ? FD_CLOEXEC : 0);
 }
 
-static inline bool mog_pthread_create_retry(const int err)
+static inline bool mog_pthread_create_retryable(const int err)
 {
         /*
          * older versions of glibc return ENOMEM instead of EAGAIN