about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--cmogstored.c204
-rw-r--r--cmogstored.h1
-rw-r--r--iostat_process.c1
-rw-r--r--test/cmogstored-cfg.rb73
4 files changed, 253 insertions, 26 deletions
diff --git a/cmogstored.c b/cmogstored.c
index f58b816..e7e7d50 100644
--- a/cmogstored.c
+++ b/cmogstored.c
@@ -12,6 +12,12 @@ const char *argp_program_version = THIS" "PACKAGE_VERSION;
 static sig_atomic_t sigchld_nr;
 static sig_atomic_t do_exit;
 static size_t nthr;
+static unsigned long worker_processes;
+static Hash_table *workers;
+struct worker {
+        pid_t pid;
+        unsigned long id;
+};
 
 #define CFG_KEY(f) -((int)offsetof(struct mog_cfg,f) + 1)
 static struct argp_option options[] = {
@@ -43,6 +49,11 @@ static struct argp_option options[] = {
           /* hidden for now, don't break compat with Perl mogstored */
           .name = "multi", .key = 'M', .flags = OPTION_HIDDEN
         },
+        {
+          /* hidden for now, don't break compat with Perl mogstored */
+          .name = "worker-processes", .key = 'W', .flags = OPTION_HIDDEN,
+          .arg = "<number>",
+        },
         { NULL }
 };
 
@@ -57,17 +68,17 @@ static void new_cfg_or_die(const char *config)
             config, errno ? strerror(errno) : "parser error");
 }
 
-static void maxconns_or_die(struct mog_cfg *cfg, const char *s)
+static void check_strtoul(unsigned long *dst, const char *s, const char *key)
 {
         char *end;
 
         errno = 0;
-        cfg->maxconns = strtoul(s, &end, 10);
+        *dst = strtoul(s, &end, 10);
         if (errno)
-                die("failed to parse maxconns=%s (%s)\n", s, strerror(errno));
+                die("failed to parse --%s=%s (%s)\n", key, s, strerror(errno));
         if (*end)
-                die("failed to parse maxconns=%s (invalid character: %c)\n",
-                    s, *end);
+                die("failed to parse --%s=%s (invalid character: %c)\n",
+                    key, s, *end);
 }
 
 static void addr_or_die(struct mog_addrinfo **dst, const char *key, char *s)
@@ -87,7 +98,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state)
         case CFG_KEY(docroot): cfg->docroot = xstrdup(arg); break;
         case CFG_KEY(pidfile): cfg->pidfile = xstrdup(arg); break;
         case CFG_KEY(configfile): new_cfg_or_die(arg); break;
-        case CFG_KEY(maxconns): maxconns_or_die(cfg, arg); break;
+        case CFG_KEY(maxconns):
+                check_strtoul(&cfg->maxconns, arg, "maxconns");
+                break;
         case CFG_KEY(httplisten):
                 addr_or_die(&cfg->httplisten, "httplisten", arg);
                 break;
@@ -99,6 +112,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state)
                 mog_cfg_check_server(cfg);
                 break;
         case 'M': mog_cfg_multi = true; break;
+        case 'W':
+                check_strtoul(&worker_processes, arg, "worker-processes");
+                break;
         case ARGP_KEY_ARG:
                 argp_usage(state);
         case ARGP_KEY_END:
@@ -173,8 +189,6 @@ static void daemonize(void)
         if (pid > 0) /* intermediate parent */
                 exit(EXIT_SUCCESS);
 
-        mog_notify_init(); /* this must be done post-fork() for kqueue */
-
         if (chdir("/") < 0)
                 die("chdir(/) failed: %s", strerror(errno));
         fd = open("/dev/null", O_RDWR);
@@ -196,12 +210,17 @@ static void daemonize(void)
         mog_close(ready_pipe[1]);
 }
 
+#ifndef LOG_PERROR
+# define LOG_PERROR 0
+#endif
+
 MOG_NOINLINE static void setup(int argc, char *argv[])
 {
         int pid_fd = -1;
         static struct argp argp = { options, parse_opt, NULL, summary };
         int mask = 0;
 
+        mog_mnt_refresh();
         argp_parse(&argp, argc, argv, 0, NULL, &mog_cli);
         mog_cfg_validate_or_die(&mog_cli);
         mog_cfg_svc_start_or_die(&mog_cli);
@@ -210,7 +229,12 @@ MOG_NOINLINE static void setup(int argc, char *argv[])
 
         /* TODO: make logging configurable */
         {
-                openlog(THIS, LOG_ODELAY|LOG_PID, LOG_DAEMON);
+                int option = LOG_PID;
+
+                if (!mog_cli.daemonize)
+                        option |= LOG_PERROR;
+
+                openlog(THIS, option, LOG_DAEMON);
                 mask |= LOG_MASK(LOG_EMERG);
                 mask |= LOG_MASK(LOG_ALERT);
                 mask |= LOG_MASK(LOG_CRIT);
@@ -224,8 +248,6 @@ MOG_NOINLINE static void setup(int argc, char *argv[])
 
         if (mog_cli.daemonize)
                 daemonize();
-        else
-                mog_notify_init();
 
         if (pid_fd >= 0 && mog_pidfile_commit(pid_fd) < 0)
                 syslog(LOG_ERR,
@@ -313,7 +335,7 @@ _Noreturn static void cmogstored_exit(void)
         exit(EXIT_SUCCESS);
 }
 
-static void cmogstored_wakeup_handler(int signum)
+static void worker_wakeup_handler(int signum)
 {
         switch (signum) {
         case SIGCHLD: ++sigchld_nr; break;
@@ -325,7 +347,32 @@ static void cmogstored_wakeup_handler(int signum)
         mog_notify(MOG_NOTIFY_SIGNAL);
 }
 
-static void cmogstored_siginit(void)
+static bool kill_child(void *workent, void *ignored)
+{
+        struct worker *worker = workent;
+
+        if (kill(worker->pid, do_exit) == 0)
+                return true;
+
+        /* race between receiving a signal and waitpid(), ignore */
+        if (errno == ESRCH)
+                return true;
+        syslog(LOG_ERR, "could not signal worker[%lu] pid=%d: %m",
+               worker->id, (int)worker->pid);
+        return true;
+}
+
+static void master_wakeup_handler(int signum)
+{
+        switch (signum) {
+        case SIGQUIT:
+        case SIGTERM:
+        case SIGINT:
+                do_exit = signum;
+        }
+}
+
+static void siginit(void (*wakeup_handler)(int))
 {
         struct sigaction sa;
 
@@ -335,7 +382,7 @@ static void cmogstored_siginit(void)
         sa.sa_handler = SIG_IGN;
         CHECK(int, 0, sigaction(SIGPIPE, &sa, NULL));
 
-        sa.sa_handler = cmogstored_wakeup_handler;
+        sa.sa_handler = wakeup_handler;
         CHECK(int, 0, sigaction(SIGTERM, &sa, NULL));
         CHECK(int, 0, sigaction(SIGINT, &sa, NULL));
 
@@ -349,9 +396,9 @@ static void cmogstored_siginit(void)
         CHECK(int, 0, sigaction(SIGCHLD, &sa, NULL));
 }
 
-static void main_loop(struct mog_queue *q)
+static void main_loop(struct mog_queue *q, const pid_t parent)
 {
-        for (;;) {
+        while (parent == 0 || parent == getppid()) {
                 mog_notify_wait();
                 if (do_exit)
                         cmogstored_exit();
@@ -361,22 +408,129 @@ static void main_loop(struct mog_queue *q)
                         sigchld_nr = 0;
                 }
         }
+
+        syslog(LOG_INFO, "parent=%d abandoned us, dying", parent);
+        cmogstored_exit();
 }
 
-int main(int argc, char *argv[])
+static void run_worker(const pid_t parent)
 {
-        struct mog_queue *q;
+        struct mog_queue *q = mog_queue_new();
 
-        mog_mnt_refresh();
-        setup(argc, argv);
-
-        q = mog_queue_new();
-        mog_intr_disable();
-        cmogstored_siginit();
+        mog_notify_init();
+        siginit(worker_wakeup_handler);
         mog_thrpool_start(&q->thrpool, nthr, mog_queue_loop, q);
         mog_svc_each(svc_start_each, q);
         mog_iostat_spawn();
-        main_loop(q);
+        main_loop(q, parent);
+}
+
+static bool worker_cmp(const void *a, const void *b)
+{
+        const struct worker *wa = a;
+        const struct worker *wb = b;
+
+        return wa->pid == wb->pid;
+}
+
+static size_t worker_hash(const void *x, size_t tablesize)
+{
+        const struct worker *w = x;
+
+        return w->pid % tablesize;
+}
+
+static void fork_worker(unsigned long worker_id)
+{
+        struct worker *worker;
+        pid_t pid;
+        pid_t parent = getpid(); /* not using getppid() since it's racy */
+
+        mog_intr_disable();
+        pid = fork();
+        if (pid > 0) {
+                worker = xmalloc(sizeof(struct worker));
+                worker->pid = pid;
+                worker->id = worker_id;
+                if (hash_insert(workers, worker) == NULL)
+                        mog_oom();
+                mog_intr_enable();
+        } else if (pid == 0) {
+                /* worker will call mog_intr_enable() later in notify loop */
+                hash_free(workers);
+                run_worker(parent);
+                exit(EXIT_SUCCESS);
+        } else {
+                PRESERVE_ERRNO( mog_intr_enable() );
+                syslog(LOG_ERR, "fork() failed: %m, sleeping for 10s");
+                sleep(10);
+        }
+}
+
+static void worker_died(int status, struct worker *tmp)
+{
+        struct worker *died = hash_delete(workers, tmp);
+
+        /*
+         * We reaped a pid that is not in the workers table; ignore it.
+         * It could be a reparented iostat at shutdown.
+         */
+        if (died == NULL)
+                return;
+
+        tmp->id = died->id;
+        free(died);
+
+        if (do_exit)
+                return;
+
+        syslog(LOG_INFO,
+               "worker[%lu] pid=%d died with status=%d, respawning",
+               tmp->id, (int)tmp->pid, status);
+        fork_worker(tmp->id);
+}
+
+static void run_master(void)
+{
+        unsigned long id;
+
+        workers = hash_initialize(worker_processes, NULL,
+                                      worker_hash, worker_cmp, free);
+
+        siginit(master_wakeup_handler);
+
+        for (id = 0; id < worker_processes; id++)
+                fork_worker(id);
+
+        while (! do_exit || hash_get_n_entries(workers) > 0) {
+                int status;
+                struct worker tmp;
+
+                tmp.pid = waitpid(-1, &status, 0);
+
+                if (tmp.pid > 0)
+                        worker_died(status, &tmp);
+                if (tmp.pid < 0 && errno != EINTR)
+                        syslog(LOG_WARNING, "waitpid failed: %m");
+
+                if (do_exit && do_exit != SIGCONT) {
+                        hash_do_for_each(workers, kill_child, NULL);
+                        do_exit = SIGCONT; /* only kill once */
+                }
+        }
+        hash_free(workers);
+}
+
+int main(int argc, char *argv[])
+{
+        setup(argc, argv); /* this daemonizes */
+
+        if (worker_processes == 0) {
+                mog_intr_disable();
+                run_worker(0);
+        } else {
+                run_master();
+        }
 
         return 0;
 }
diff --git a/cmogstored.h b/cmogstored.h
index f9d49a7..820697f 100644
--- a/cmogstored.h
+++ b/cmogstored.h
@@ -18,6 +18,7 @@
 #include <sys/stat.h>
 #include <sys/socket.h>
 #include <sys/ioctl.h>
+#include <sys/wait.h>
 #include <signal.h>
 #include <stddef.h>
 #include <stdint.h>
diff --git a/iostat_process.c b/iostat_process.c
index f1a6165..935736a 100644
--- a/iostat_process.c
+++ b/iostat_process.c
@@ -8,7 +8,6 @@
  * regardless of the number of mog_svc objects we have.
  */
 #include "cmogstored.h"
-#include <sys/wait.h>
 
 static pid_t iostat_pid;
 static time_t iostat_last_fail;
diff --git a/test/cmogstored-cfg.rb b/test/cmogstored-cfg.rb
index e438402..553e81a 100644
--- a/test/cmogstored-cfg.rb
+++ b/test/cmogstored-cfg.rb
@@ -28,6 +28,79 @@ class TestCmogstoredConfig < Test::Unit::TestCase
     assert_kind_of String, c.gets
   end
 
+  def children(pid = @pid)
+    pids = `ps --ppid #{pid} --no-headers -o pid 2>/dev/null`.strip
+    if RUBY_PLATFORM =~ /linux/
+      assert $?.success?, $?.inspect
+    end
+    pids.split(/\s+/).grep(/\A\d+\z/).map { |x| x.to_i }.sort
+  end
+
+  def test_worker_processes
+    nproc = 2
+    @cmd << "--worker-processes=#{nproc}"
+    @cmd << "--mgmtlisten=#@host:#@port"
+    tmp = Tempfile.new("err")
+    @pid = fork do
+      $stderr.reopen(tmp)
+      exec(*@cmd)
+    end
+    pre_kill # ensure workers are running
+    pids = children
+
+    # kill gently
+    pids.each do |pid|
+      begin
+        Process.kill(:QUIT, pid)
+        t_yield
+      rescue Errno::ESRCH
+        break
+      end while true
+    end
+
+    pre_kill # ensure workers are respawned
+
+    if pids[0]
+      p pids if $VERBOSE
+      new_pids = children
+      assert pids != new_pids, "new=#{new_pids.inspect} old=#{pids.inspect}"
+      pids.each do |pid|
+        assert_raises(Errno::ESRCH) { Process.kill(0, pid) }
+      end
+    end
+
+    # kill brutally
+    pids = children
+    pids.each do |pid|
+      begin
+        Process.kill(:KILL, pid)
+        t_yield
+      rescue Errno::ESRCH
+        break
+      end while true
+    end
+
+    pre_kill # ensure workers are respawned
+
+    if pids[0]
+      p pids if $VERBOSE
+      new_pids = children
+      assert pids != new_pids, "new=#{new_pids.inspect} old=#{pids.inspect}"
+      pids.each do |pid|
+        assert_raises(Errno::ESRCH) { Process.kill(0, pid) }
+      end
+    end
+
+    Process.kill(:QUIT, @pid)
+    _, status = Process.waitpid2(@pid)
+    assert status.success?, status.inspect
+    if new_pids[0]
+      new_pids.each do |pid|
+        assert_raises(Errno::ESRCH) { Process.kill(0, pid) }
+      end
+    end
+  end
+
   def test_config_file
     tmp = Tempfile.new("cmogstored-cfg-test")
     tmp.write("mgmtlisten = #@host:#@port\n")