diff options
-rw-r--r-- | accept_loop.c | 22 | ||||
-rw-r--r-- | cmogstored.c | 1 | ||||
-rw-r--r-- | cmogstored.h | 9 | ||||
-rw-r--r-- | mnt.c | 2 | ||||
-rw-r--r-- | queue_epoll.c | 11 | ||||
-rw-r--r-- | queue_kqueue.c | 11 | ||||
-rw-r--r-- | queue_loop.c | 13 | ||||
-rw-r--r-- | sig.c | 17 | ||||
-rw-r--r-- | test/queue-idle-1.c | 1 | ||||
-rw-r--r-- | test/thrpool-1.c | 4 | ||||
-rw-r--r-- | thrpool.c | 142 | ||||
-rw-r--r-- | util.h | 32 |
12 files changed, 128 insertions, 137 deletions
diff --git a/accept_loop.c b/accept_loop.c index dab9259..dd9c929 100644 --- a/accept_loop.c +++ b/accept_loop.c @@ -40,8 +40,10 @@ MOG_NOINLINE static void accept_error_check(struct mog_accept *ac) switch (errno) { case ECONNABORTED: + /* common error, nothing we can do about it */ case EINTR: - return; /* common errors, nothing we can do about it */ + /* we'll hit mog_thr_test_quit when we restart the loop */ + return; case EBADF: assert(0 && "BUG, called accept on bad FD"); case ENOTSOCK: @@ -74,11 +76,6 @@ MOG_NOINLINE static void accept_error_check(struct mog_accept *ac) } } -static void accept_loop_cleanup(void *ignored) -{ - mog_alloc_quit(); -} - /* * passed as the start_routine argument to pthread_create. * This function may run concurrently in multiple threads. @@ -92,26 +89,19 @@ void *mog_accept_loop(void *arg) int accept_fd = mog_fd_of(ac)->fd; union mog_sockaddr msa; - mog_cancel_prepare(); - pthread_cleanup_push(accept_loop_cleanup, NULL); - for (;;) { struct sockaddr *sa = mog_sockaddr_sa(&msa); socklen_t salen = (socklen_t)sizeof(msa); int client_fd; - /* pthread cancellation point */ + mog_thr_test_quit(); client_fd = mog_accept_fn(accept_fd, sa, &salen); - if (client_fd >= 0) { + if (client_fd >= 0) ac->post_accept_fn(client_fd, ac->svc, sa, salen); - } else { - mog_testcancel(); + else accept_error_check(ac); - } } - pthread_cleanup_pop(1); - return NULL; } diff --git a/cmogstored.c b/cmogstored.c index bec2137..31d8e64 100644 --- a/cmogstored.c +++ b/cmogstored.c @@ -371,7 +371,6 @@ static void upgrade_handler(void) static void main_worker_loop(const pid_t parent) { - mog_cancel_disable(); /* mog_idleq_wait() now relies on this */ while (parent == 0 || parent == getppid()) { mog_notify_wait(mog_main.have_mgmt); if (sigchld_hit) diff --git a/cmogstored.h b/cmogstored.h index a91f393..7b1f164 100644 --- a/cmogstored.h +++ b/cmogstored.h @@ -207,11 +207,16 @@ struct mog_http { struct mog_packaddr mpa; } __attribute__((packed)); +struct mog_thread { + pthread_t thr; + unsigned *do_quit; +}; + struct mog_thrpool { pthread_mutex_t lock; size_t n_threads; size_t want_threads; - pthread_t *threads; + struct mog_thread *threads; void *(*start_fn)(void *); void *start_arg; }; @@ -262,7 +267,6 @@ struct mog_file { /* sig.c */ extern sigset_t mog_emptyset; -void mog_cancel_prepare(void); void mog_intr_disable(void); void mog_intr_enable(void); void mog_sleep(long seconds); @@ -400,6 +404,7 @@ char *mog_canonpath(const char *path, enum canonicalize_mode_t canon_mode); char *mog_canonpath_die(const char *path, enum canonicalize_mode_t canon_mode); /* thrpool.c */ +void mog_thr_test_quit(void); void mog_thrpool_start(struct mog_thrpool *, size_t n, void *(*start_fn)(void *), void *arg); void mog_thrpool_quit(struct mog_thrpool *, struct mog_queue *); @@ -159,7 +159,7 @@ static void timed_init_once(void) break; /* this must succeed, keep looping */ - if (mog_pthread_create_retry(rc)) { + if (mog_pthread_create_retryable(rc)) { if ((++tries % 1024) == 0) warn("pthread_create: %s (tries: %lu)", strerror(rc), tries); diff --git a/queue_epoll.c b/queue_epoll.c index c2a7159..bc7b513 100644 --- a/queue_epoll.c +++ b/queue_epoll.c @@ -102,7 +102,7 @@ struct mog_queue * mog_queue_new(void) } static struct mog_fd * -epoll_event_check(int rc, struct epoll_event *event, bool cancellable) +epoll_event_check(int rc, struct epoll_event *event) { struct mog_fd *mfd; @@ -119,9 +119,6 @@ epoll_event_check(int rc, struct epoll_event *event, bool cancellable) /* rc could be > 1 if the kernel is broken :P */ die_errno("epoll_wait() failed with (%d)", rc); - if (cancellable) - mog_testcancel(); - return NULL; } @@ -137,12 +134,12 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout) bool cancellable = timeout != 0; if (cancellable) - mog_testcancel(); + mog_thr_test_quit(); /* epoll_wait is a cancellation point since glibc 2.4 */ rc = epoll_wait(q->queue_fd, &event, 1, timeout); - return epoll_event_check(rc, &event, cancellable); + return epoll_event_check(rc, &event); } struct mog_fd * mog_idleq_wait_intr(struct mog_queue *q, int timeout) @@ -151,7 +148,7 @@ struct mog_fd * mog_idleq_wait_intr(struct mog_queue *q, int timeout) struct epoll_event event; rc = epoll_pwait(q->queue_fd, &event, 1, timeout, &mog_emptyset); - return epoll_event_check(rc, &event, false); + return epoll_event_check(rc, &event); } MOG_NOINLINE static void diff --git a/queue_kqueue.c b/queue_kqueue.c index b419486..9b28c5b 100644 --- a/queue_kqueue.c +++ b/queue_kqueue.c @@ -19,13 +19,6 @@ struct mog_queue * mog_queue_new(void) return mog_queue_init(kqueue_fd); } -static void check_cancel(void) -{ - mog_cancel_enable(); - pthread_testcancel(); - mog_cancel_disable(); -} - /* * grabs one active event off the event queue */ @@ -52,7 +45,7 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout) * cancellation request (since kevent() is not a cancellation point). */ if (cancellable) - mog_testcancel(); + mog_thr_test_quit(); rc = kevent(q->queue_fd, NULL, 0, &event, 1, tsp); @@ -64,7 +57,7 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout) return mfd; } if (cancellable) - mog_testcancel(); + mog_thr_test_quit(); if (rc == 0) return NULL; diff --git a/queue_loop.c b/queue_loop.c index 77c8620..f8a03a9 100644 --- a/queue_loop.c +++ b/queue_loop.c @@ -4,15 +4,6 @@ */ #include "cmogstored.h" -static void queue_loop_cleanup(void *arg) -{ - unsigned long self = (unsigned long)pthread_self(); - - syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread shutting down...", self); - mog_alloc_quit(); - syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread done", self); -} - static struct mog_fd *queue_xchg_maybe(struct mog_queue *q, struct mog_fd *mfd) { /* @@ -46,8 +37,6 @@ void * mog_queue_loop(void *arg) struct mog_queue *q = arg; struct mog_fd *mfd = NULL; - mog_cancel_prepare(); - pthread_cleanup_push(queue_loop_cleanup, NULL); syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread ready", (unsigned long)pthread_self()); @@ -71,8 +60,6 @@ void * mog_queue_loop(void *arg) } } - pthread_cleanup_pop(1); - return NULL; } @@ -9,27 +9,12 @@ */ static sigset_t fullset; -static sigset_t cancelset; sigset_t mog_emptyset; -__attribute__((constructor)) void sig_init(void) +__attribute__((constructor)) static void sig_init(void) { CHECK(int, 0, sigfillset(&fullset)); CHECK(int, 0, sigemptyset(&mog_emptyset)); - CHECK(int, 0, sigfillset(&cancelset)); - CHECK(int, 0, sigdelset(&cancelset, SIGURG)); -} - -/* this runs at the start of every thread managed by thrpool */ -void mog_cancel_prepare(void) -{ - int old; - - CHECK(int, 0, pthread_sigmask(SIG_SETMASK, &cancelset, NULL)); - mog_cancel_disable(); - CHECK(int, 0, pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &old)); - assert(old == PTHREAD_CANCEL_DEFERRED - && "async cancel enabled redundantly"); } void mog_intr_disable(void) diff --git a/test/queue-idle-1.c b/test/queue-idle-1.c index 1343720..3b71923 100644 --- a/test/queue-idle-1.c +++ b/test/queue-idle-1.c @@ -54,7 +54,6 @@ static void test_blocking(void) mog_idleq_add(q, mfd, MOG_QEV_RD); CHECK(int, 0, pthread_create(&thr, NULL, wait_then_write, NULL)); printf("start wait: %d\n", (int)time(NULL)); - mog_cancel_disable(); assert(mfd == mog_idleq_wait_intr(q, -1)); printf(" end wait: %d\n", (int)time(NULL)); assert(1 == read(fds[0], buf, 1) && "read failed"); diff --git a/test/thrpool-1.c b/test/thrpool-1.c index 7df099e..67aaff2 100644 --- a/test/thrpool-1.c +++ b/test/thrpool-1.c @@ -24,12 +24,10 @@ void *fn(void *xarg) t.tv_sec++; } - mog_cancel_disable(); CHECK(int, 0, pthread_mutex_lock(&lock)); pthread_cond_timedwait(&cond, &lock, &t); CHECK(int, 0, pthread_mutex_unlock(&lock)); - mog_cancel_enable(); - pthread_testcancel(); + mog_thr_test_quit(); } assert(strcmp("whazzup", s) == 0 && "arg changed"); @@ -4,6 +4,14 @@ */ #include "cmogstored.h" +static __thread unsigned mog_do_quit; +struct mog_thr_start_arg { + struct mog_thrpool *tp; + pthread_mutex_t mtx; + pthread_cond_t cond; + unsigned *do_quit; +}; + /* * we can lower this if we can test with lower values, NPTL minimum is 16K. * We also use syslog() and *printf() functions which take a lot of @@ -22,17 +30,59 @@ #endif static const size_t stacksize = (size_t)MOG_THR_STACK_SIZE; +static sigset_t quitset; + +__attribute__((constructor)) static void thrpool_init(void) +{ + CHECK(int, 0, sigfillset(&quitset)); + CHECK(int, 0, sigdelset(&quitset, SIGURG)); +} + +/* child thread notifies the parent about its readiness */ +static void *thr_start_wrapper(void *ptr) +{ + struct mog_thr_start_arg *arg = ptr; + struct mog_thrpool *tp; + + mog_do_quit = 0; + CHECK(int, 0, pthread_sigmask(SIG_SETMASK, &quitset, NULL)); + CHECK(int, 0, pthread_mutex_lock(&arg->mtx)); + + arg->do_quit = &mog_do_quit; + tp = arg->tp; /* arg becomes invalid once we unlock */ + + CHECK(int, 0, pthread_cond_signal(&arg->cond)); + CHECK(int, 0, pthread_mutex_unlock(&arg->mtx)); + + return tp->start_fn(tp->start_arg); +} + +/* child thread tests if its quit flag is set and exits if it is */ +void mog_thr_test_quit(void) +{ + if (__sync_add_and_fetch(&mog_do_quit, 0) != 0) { + mog_alloc_quit(); + pthread_exit(NULL); + } +} + /* - * kevent() sleep is not a cancellation point, so it's possible for - * a thread to sleep on it if the cancel request arrived right after - * we checked for cancellation + * we no longer rely on pthreads cancellation, so our explicit checks for + * thread quitting requires us to continuously signal a thread for death + * in case it enters a sleeping syscall (epoll_wait/kevent) immediately + * after checking the mog_do_quit TLS variable */ static void poke(pthread_t thr, int sig) { int err; + /* + * This is an uncommon code path and only triggered when + * we lower thread counts or shut down + */ while ((err = pthread_kill(thr, sig)) == 0) sched_yield(); + assert(err == ESRCH && "pthread_kill() usage bug"); } @@ -58,49 +108,67 @@ thr_create_fail_retry(struct mog_thrpool *tp, size_t size, } } +static bool +thrpool_add(struct mog_thrpool *tp, size_t size, unsigned long *nr_eagain) +{ + struct mog_thr_start_arg arg = { + .mtx = PTHREAD_MUTEX_INITIALIZER, + .cond = PTHREAD_COND_INITIALIZER, + }; + pthread_t *thr; + pthread_attr_t attr; + size_t bytes = (tp->n_threads + 1) * sizeof(struct mog_thread); + int rc; + + assert(tp && "tp no defined"); + arg.tp = tp; + tp->threads = xrealloc(tp->threads, bytes); + + CHECK(int, 0, pthread_attr_init(&attr)); + + if (stacksize > 0) + CHECK(int, 0, pthread_attr_setstacksize(&attr, stacksize)); + + thr = &tp->threads[tp->n_threads].thr; + + CHECK(int, 0, pthread_mutex_lock(&arg.mtx)); + rc = pthread_create(thr, &attr, thr_start_wrapper, &arg); + CHECK(int, 0, pthread_attr_destroy(&attr)); + if (rc == 0) { + CHECK(int, 0, pthread_cond_wait(&arg.cond, &arg.mtx)); + tp->threads[tp->n_threads].do_quit = arg.do_quit; + } + CHECK(int, 0, pthread_mutex_unlock(&arg.mtx)); + + if (rc == 0) { + tp->n_threads++; + *nr_eagain = 0; + } else if (mog_pthread_create_retryable(rc)) { + if (!thr_create_fail_retry(tp, size, nr_eagain, rc)) + return false; + } else { + assert(rc == 0 && "pthread_create usage error"); + } + return true; +} + void mog_thrpool_set_size(struct mog_thrpool *tp, size_t size) { unsigned long nr_eagain = 0; CHECK(int, 0, pthread_mutex_lock(&tp->lock)); - while (size > tp->n_threads) { - pthread_t *thr; - pthread_attr_t attr; - size_t bytes = (tp->n_threads + 1) * sizeof(pthread_t); - int rc; - tp->threads = xrealloc(tp->threads, bytes); - - CHECK(int, 0, pthread_attr_init(&attr)); - - if (stacksize > 0) { - CHECK(int, 0, - pthread_attr_setstacksize(&attr, stacksize)); - } - - thr = tp->threads + tp->n_threads; - - rc = pthread_create(thr, &attr, tp->start_fn, tp->start_arg); - CHECK(int, 0, pthread_attr_destroy(&attr)); - - if (rc == 0) { - tp->n_threads++; - nr_eagain = 0; - } else if (mog_pthread_create_retry(rc)) { - if (!thr_create_fail_retry(tp, size, &nr_eagain, rc)) - goto out; - } else { - assert(rc == 0 && "pthread_create usage error"); - } - } + while (size > tp->n_threads && thrpool_add(tp, size, &nr_eagain)) + /* nothing */; if (tp->n_threads > size) { size_t i; int err; + /* set the do_quit flag for all threads we kill */ for (i = size; i < tp->n_threads; i++) { - CHECK(int, 0, pthread_cancel(tp->threads[i])); - err = pthread_kill(tp->threads[i], SIGURG); + __sync_add_and_fetch(tp->threads[i].do_quit, 1); + err = pthread_kill(tp->threads[i].thr, SIGURG); switch (err) { case 0: @@ -111,14 +179,14 @@ void mog_thrpool_set_size(struct mog_thrpool *tp, size_t size) } } + /* keep poking them to kick them out out epoll_wait/kevent */ for (i = size; i < tp->n_threads; i++) { - poke(tp->threads[i], SIGURG); + poke(tp->threads[i].thr, SIGURG); - CHECK(int, 0, pthread_join(tp->threads[i], NULL)); + CHECK(int, 0, pthread_join(tp->threads[i].thr, NULL)); } tp->n_threads = size; } -out: CHECK(int, 0, pthread_mutex_unlock(&tp->lock)); } @@ -36,36 +36,6 @@ static inline void mog_free(const void *ptr) assert(checkvar==(expect)&& "BUG" && __FILE__ && __LINE__); \ } while (0) -static inline void mog_cancel_enable(void) -{ - int old; - - CHECK(int, 0, pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old)); - assert(old == PTHREAD_CANCEL_DISABLE && "redundant cancel enable"); -} - -static inline void mog_cancel_disable(void) -{ - int old; - - CHECK(int, 0, pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old)); - assert(old == PTHREAD_CANCEL_ENABLE && "redundant cancel disable"); -} - -static inline void mog_testcancel(void) -{ - int old; - - mog_cancel_enable(); - - /* make sure we are already using the async cancel type */ - assert(0 == pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &old) - && old == PTHREAD_CANCEL_ASYNCHRONOUS - && "asynchronous cancel not previously enabled"); - - mog_cancel_disable(); -} - /* compiler should optimize this away */ __attribute__((const)) static inline off_t off_t_max(void) { @@ -98,7 +68,7 @@ static inline int mog_set_cloexec(int fd, const bool set) return fcntl(fd, F_SETFD, set ? FD_CLOEXEC : 0); } -static inline bool mog_pthread_create_retry(const int err) +static inline bool mog_pthread_create_retryable(const int err) { /* * older versions of glibc return ENOMEM instead of EAGAIN |