diff options
-rw-r--r-- | cmogstored.c | 204 | ||||
-rw-r--r-- | cmogstored.h | 1 | ||||
-rw-r--r-- | iostat_process.c | 1 | ||||
-rw-r--r-- | test/cmogstored-cfg.rb | 73 |
4 files changed, 253 insertions, 26 deletions
diff --git a/cmogstored.c b/cmogstored.c index f58b816..e7e7d50 100644 --- a/cmogstored.c +++ b/cmogstored.c @@ -12,6 +12,12 @@ const char *argp_program_version = THIS" "PACKAGE_VERSION; static sig_atomic_t sigchld_nr; static sig_atomic_t do_exit; static size_t nthr; +static unsigned long worker_processes; +static Hash_table *workers; +struct worker { + pid_t pid; + unsigned long id; +}; #define CFG_KEY(f) -((int)offsetof(struct mog_cfg,f) + 1) static struct argp_option options[] = { @@ -43,6 +49,11 @@ static struct argp_option options[] = { /* hidden for now, don't break compat with Perl mogstored */ .name = "multi", .key = 'M', .flags = OPTION_HIDDEN }, + { + /* hidden for now, don't break compat with Perl mogstored */ + .name = "worker-processes", .key = 'W', .flags = OPTION_HIDDEN, + .arg = "<number>", + }, { NULL } }; @@ -57,17 +68,17 @@ static void new_cfg_or_die(const char *config) config, errno ? strerror(errno) : "parser error"); } -static void maxconns_or_die(struct mog_cfg *cfg, const char *s) +static void check_strtoul(unsigned long *dst, const char *s, const char *key) { char *end; errno = 0; - cfg->maxconns = strtoul(s, &end, 10); + *dst = strtoul(s, &end, 10); if (errno) - die("failed to parse maxconns=%s (%s)\n", s, strerror(errno)); + die("failed to parse --%s=%s (%s)\n", key, s, strerror(errno)); if (*end) - die("failed to parse maxconns=%s (invalid character: %c)\n", - s, *end); + die("failed to parse --%s=%s (invalid character: %c)\n", + key, s, *end); } static void addr_or_die(struct mog_addrinfo **dst, const char *key, char *s) @@ -87,7 +98,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) case CFG_KEY(docroot): cfg->docroot = xstrdup(arg); break; case CFG_KEY(pidfile): cfg->pidfile = xstrdup(arg); break; case CFG_KEY(configfile): new_cfg_or_die(arg); break; - case CFG_KEY(maxconns): maxconns_or_die(cfg, arg); break; + case CFG_KEY(maxconns): + check_strtoul(&cfg->maxconns, arg, "maxconns"); + break; case CFG_KEY(httplisten): addr_or_die(&cfg->httplisten, "httplisten", arg); break; @@ -99,6 +112,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) mog_cfg_check_server(cfg); break; case 'M': mog_cfg_multi = true; break; + case 'W': + check_strtoul(&worker_processes, arg, "worker-processes"); + break; case ARGP_KEY_ARG: argp_usage(state); case ARGP_KEY_END: @@ -173,8 +189,6 @@ static void daemonize(void) if (pid > 0) /* intermediate parent */ exit(EXIT_SUCCESS); - mog_notify_init(); /* this must be done post-fork() for kqueue */ - if (chdir("/") < 0) die("chdir(/) failed: %s", strerror(errno)); fd = open("/dev/null", O_RDWR); @@ -196,12 +210,17 @@ static void daemonize(void) mog_close(ready_pipe[1]); } +#ifndef LOG_PERROR +# define LOG_PERROR 0 +#endif + MOG_NOINLINE static void setup(int argc, char *argv[]) { int pid_fd = -1; static struct argp argp = { options, parse_opt, NULL, summary }; int mask = 0; + mog_mnt_refresh(); argp_parse(&argp, argc, argv, 0, NULL, &mog_cli); mog_cfg_validate_or_die(&mog_cli); mog_cfg_svc_start_or_die(&mog_cli); @@ -210,7 +229,12 @@ MOG_NOINLINE static void setup(int argc, char *argv[]) /* TODO: make logging configurable */ { - openlog(THIS, LOG_ODELAY|LOG_PID, LOG_DAEMON); + int option = LOG_PID; + + if (!mog_cli.daemonize) + option |= LOG_PERROR; + + openlog(THIS, option, LOG_DAEMON); mask |= LOG_MASK(LOG_EMERG); mask |= LOG_MASK(LOG_ALERT); mask |= LOG_MASK(LOG_CRIT); @@ -224,8 +248,6 @@ MOG_NOINLINE static void setup(int argc, char *argv[]) if (mog_cli.daemonize) daemonize(); - else - mog_notify_init(); if (pid_fd >= 0 && mog_pidfile_commit(pid_fd) < 0) syslog(LOG_ERR, @@ -313,7 +335,7 @@ _Noreturn static void cmogstored_exit(void) exit(EXIT_SUCCESS); } -static void cmogstored_wakeup_handler(int signum) +static void worker_wakeup_handler(int signum) { switch (signum) { case SIGCHLD: ++sigchld_nr; break; @@ -325,7 +347,32 @@ static void cmogstored_wakeup_handler(int signum) mog_notify(MOG_NOTIFY_SIGNAL); } -static void cmogstored_siginit(void) +static bool kill_child(void *workent, void *ignored) +{ + struct worker *worker = workent; + + if (kill(worker->pid, do_exit) == 0) + return true; + + /* race between receiving a signal and waitpid2, ignore */ + if (errno == ESRCH) + return true; + syslog(LOG_ERR, "could not signal worker[%lu] pid=%d: %m", + worker->id, (int)worker->pid); + return true; +} + +static void master_wakeup_handler(int signum) +{ + switch (signum) { + case SIGQUIT: + case SIGTERM: + case SIGINT: + do_exit = signum; + } +} + +static void siginit(void (*wakeup_handler)(int)) { struct sigaction sa; @@ -335,7 +382,7 @@ static void cmogstored_siginit(void) sa.sa_handler = SIG_IGN; CHECK(int, 0, sigaction(SIGPIPE, &sa, NULL)); - sa.sa_handler = cmogstored_wakeup_handler; + sa.sa_handler = wakeup_handler; CHECK(int, 0, sigaction(SIGTERM, &sa, NULL)); CHECK(int, 0, sigaction(SIGINT, &sa, NULL)); @@ -349,9 +396,9 @@ static void cmogstored_siginit(void) CHECK(int, 0, sigaction(SIGCHLD, &sa, NULL)); } -static void main_loop(struct mog_queue *q) +static void main_loop(struct mog_queue *q, const pid_t parent) { - for (;;) { + while (parent == 0 || parent == getppid()) { mog_notify_wait(); if (do_exit) cmogstored_exit(); @@ -361,22 +408,129 @@ static void main_loop(struct mog_queue *q) sigchld_nr = 0; } } + + syslog(LOG_INFO, "parent=%d abandoned us, dying", parent); + cmogstored_exit(); } -int main(int argc, char *argv[]) +static void run_worker(const pid_t parent) { - struct mog_queue *q; + struct mog_queue *q = mog_queue_new(); - mog_mnt_refresh(); - setup(argc, argv); - - q = mog_queue_new(); - mog_intr_disable(); - cmogstored_siginit(); + mog_notify_init(); + siginit(worker_wakeup_handler); mog_thrpool_start(&q->thrpool, nthr, mog_queue_loop, q); mog_svc_each(svc_start_each, q); mog_iostat_spawn(); - main_loop(q); + main_loop(q, parent); +} + +static bool worker_cmp(const void *a, const void *b) +{ + const struct worker *wa = a; + const struct worker *wb = b; + + return wa->pid == wb->pid; +} + +static size_t worker_hash(const void *x, size_t tablesize) +{ + const struct worker *w = x; + + return w->pid % tablesize; +} + +static void fork_worker(unsigned long worker_id) +{ + struct worker *worker; + pid_t pid; + pid_t parent = getpid(); /* not using getppid() since it's racy */ + + mog_intr_disable(); + pid = fork(); + if (pid > 0) { + worker = xmalloc(sizeof(struct worker)); + worker->pid = pid; + worker->id = worker_id; + if (hash_insert(workers, worker) == NULL) + mog_oom(); + mog_intr_enable(); + } else if (pid == 0) { + /* worker will call mog_intr_enable() later in notify loop */ + hash_free(workers); + run_worker(parent); + exit(EXIT_SUCCESS); + } else { + PRESERVE_ERRNO( mog_intr_enable() ); + syslog(LOG_ERR, "fork() failed: %m, sleeping for 10s"); + sleep(10); + } +} + +static void worker_died(int status, struct worker *tmp) +{ + struct worker *died = hash_delete(workers, tmp); + + /* + * we got an unexpected pid, just ignore it, it could + * be a reparented iostat at shutdown + */ + if (died == NULL) + return; + + tmp->id = died->id; + free(died); + + if (do_exit) + return; + + syslog(LOG_INFO, + "worker[%lu] pid=%d died with status=%d, respawning", + tmp->id, (int)tmp->pid, status); + fork_worker(tmp->id); +} + +static void run_master(void) +{ + unsigned long id; + + workers = hash_initialize(worker_processes, NULL, + worker_hash, worker_cmp, free); + + siginit(master_wakeup_handler); + + for (id = 0; id < worker_processes; id++) + fork_worker(id); + + while (! do_exit || hash_get_n_entries(workers) > 0) { + int status; + struct worker tmp; + + tmp.pid = waitpid(-1, &status, 0); + + if (tmp.pid > 0) + worker_died(status, &tmp); + if (tmp.pid < 0 && errno != EINTR) + syslog(LOG_WARNING, "waitpid failed: %m"); + + if (do_exit && do_exit != SIGCONT) { + hash_do_for_each(workers, kill_child, NULL); + do_exit = SIGCONT; /* only kill once */ + } + } + hash_free(workers); +} + +int main(int argc, char *argv[]) +{ + setup(argc, argv); /* this daemonizes */ + + if (worker_processes == 0) { + mog_intr_disable(); + run_worker(0); + } else { + run_master(); + } return 0; } diff --git a/cmogstored.h b/cmogstored.h index f9d49a7..820697f 100644 --- a/cmogstored.h +++ b/cmogstored.h @@ -18,6 +18,7 @@ #include <sys/stat.h> #include <sys/socket.h> #include <sys/ioctl.h> +#include <sys/wait.h> #include <signal.h> #include <stddef.h> #include <stdint.h> diff --git a/iostat_process.c b/iostat_process.c index f1a6165..935736a 100644 --- a/iostat_process.c +++ b/iostat_process.c @@ -8,7 +8,6 @@ * regardless of the number of mog_svc objects we have. */ #include "cmogstored.h" -#include <sys/wait.h> static pid_t iostat_pid; static time_t iostat_last_fail; diff --git a/test/cmogstored-cfg.rb b/test/cmogstored-cfg.rb index e438402..553e81a 100644 --- a/test/cmogstored-cfg.rb +++ b/test/cmogstored-cfg.rb @@ -28,6 +28,79 @@ class TestCmogstoredConfig < Test::Unit::TestCase assert_kind_of String, c.gets end + def children(pid = @pid) + pids = `ps --ppid #{pid} --no-headers -o pid 2>/dev/null`.strip + if RUBY_PLATFORM =~ /linux/ + assert $?.success?, $?.inspect + end + pids.split(/\s+/).grep(/\A\d+\z/).map { |x| x.to_i }.sort + end + + def test_worker_processes + nproc = 2 + @cmd << "--worker-processes=#{nproc}" + @cmd << "--mgmtlisten=#@host:#@port" + tmp = Tempfile.new("err") + @pid = fork do + $stderr.reopen(tmp) + exec(*@cmd) + end + pre_kill # ensure workers are running + pids = children + + # kill gently + pids.each do |pid| + begin + Process.kill(:QUIT, pid) + t_yield + rescue Errno::ESRCH + break + end while true + end + + pre_kill # ensure workers are respawned + + if pids[0] + p pids if $VERBOSE + new_pids = children + assert pids != new_pids, "new=#{new_pids.inspect} old=#{pids.inspect}" + pids.each do |pid| + assert_raises(Errno::ESRCH) { Process.kill(0, pid) } + end + end + + # kill brutally + pids = children + pids.each do |pid| + begin + Process.kill(:KILL, pid) + t_yield + rescue Errno::ESRCH + break + end while true + end + + pre_kill # ensure workers are respawned + + if pids[0] + p pids if $VERBOSE + new_pids = children + assert pids != new_pids, "new=#{new_pids.inspect} old=#{pids.inspect}" + pids.each do |pid| + assert_raises(Errno::ESRCH) { Process.kill(0, pid) } + end + end + + Process.kill(:QUIT, @pid) + _, status = Process.waitpid2(@pid) + assert status.success?, status.inspect + if new_pids[0] + new_pids.each do |pid| + assert_raises(Errno::ESRCH) { Process.kill(0, pid) } + end + end + end + def test_config_file tmp = Tempfile.new("cmogstored-cfg-test") tmp.write("mgmtlisten = #@host:#@port\n") |