about summary refs log tree commit homepage
path: root/cmogstored.c
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2012-03-14 16:02:25 -0700
committerEric Wong <normalperson@yhbt.net>2012-03-14 23:35:47 +0000
commit610005513249eea5da18e03defc91bbf295ec3bf (patch)
tree1e4f5da2a79b4b9e313d5621dc11345d1e96b526 /cmogstored.c
parentccdea5e664f71080578d0e188273a136e3498e42 (diff)
downloadcmogstored-610005513249eea5da18e03defc91bbf295ec3bf.tar.gz
Setting this to a positive value ensures we stay running if
there are any remotely triggerable crashes.  Hopefully users
will still send (good) bug reports in this case so we can
fix them.

We may also be able to use this feature to reduce unavoidable
contention in some places:

* kernel FD table
* epoll/kqueue descriptor
* global active queue
* malloc()
Diffstat (limited to 'cmogstored.c')
-rw-r--r--cmogstored.c204
1 files changed, 179 insertions, 25 deletions
diff --git a/cmogstored.c b/cmogstored.c
index f58b816..e7e7d50 100644
--- a/cmogstored.c
+++ b/cmogstored.c
@@ -12,6 +12,12 @@ const char *argp_program_version = THIS" "PACKAGE_VERSION;
 static sig_atomic_t sigchld_nr;
 static sig_atomic_t do_exit;
 static size_t nthr;
+static unsigned long worker_processes;
+static Hash_table *workers;
+struct worker {
+        pid_t pid;
+        unsigned long id;
+};
 
 #define CFG_KEY(f) -((int)offsetof(struct mog_cfg,f) + 1)
 static struct argp_option options[] = {
@@ -43,6 +49,11 @@ static struct argp_option options[] = {
           /* hidden for now, don't break compat with Perl mogstored */
           .name = "multi", .key = 'M', .flags = OPTION_HIDDEN
         },
+        {
+          /* hidden for now, don't break compat with Perl mogstored */
+          .name = "worker-processes", .key = 'W', .flags = OPTION_HIDDEN,
+          .arg = "<number>",
+        },
         { NULL }
 };
 
@@ -57,17 +68,17 @@ static void new_cfg_or_die(const char *config)
             config, errno ? strerror(errno) : "parser error");
 }
 
-static void maxconns_or_die(struct mog_cfg *cfg, const char *s)
+static void check_strtoul(unsigned long *dst, const char *s, const char *key)
 {
         char *end;
 
         errno = 0;
-        cfg->maxconns = strtoul(s, &end, 10);
+        *dst = strtoul(s, &end, 10);
         if (errno)
-                die("failed to parse maxconns=%s (%s)\n", s, strerror(errno));
+                die("failed to parse --%s=%s (%s)\n", key, s, strerror(errno));
         if (*end)
-                die("failed to parse maxconns=%s (invalid character: %c)\n",
-                    s, *end);
+                die("failed to parse --%s=%s (invalid character: %c)\n",
+                    key, s, *end);
 }
 
 static void addr_or_die(struct mog_addrinfo **dst, const char *key, char *s)
@@ -87,7 +98,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state)
         case CFG_KEY(docroot): cfg->docroot = xstrdup(arg); break;
         case CFG_KEY(pidfile): cfg->pidfile = xstrdup(arg); break;
         case CFG_KEY(configfile): new_cfg_or_die(arg); break;
-        case CFG_KEY(maxconns): maxconns_or_die(cfg, arg); break;
+        case CFG_KEY(maxconns):
+                check_strtoul(&cfg->maxconns, arg, "maxconns");
+                break;
         case CFG_KEY(httplisten):
                 addr_or_die(&cfg->httplisten, "httplisten", arg);
                 break;
@@ -99,6 +112,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state)
                 mog_cfg_check_server(cfg);
                 break;
         case 'M': mog_cfg_multi = true; break;
+        case 'W':
+                check_strtoul(&worker_processes, arg, "worker-processes");
+                break;
         case ARGP_KEY_ARG:
                 argp_usage(state);
         case ARGP_KEY_END:
@@ -173,8 +189,6 @@ static void daemonize(void)
         if (pid > 0) /* intermediate parent */
                 exit(EXIT_SUCCESS);
 
-        mog_notify_init(); /* this must be done post-fork() for kqueue */
-
         if (chdir("/") < 0)
                 die("chdir(/) failed: %s", strerror(errno));
         fd = open("/dev/null", O_RDWR);
@@ -196,12 +210,17 @@ static void daemonize(void)
         mog_close(ready_pipe[1]);
 }
 
+#ifndef LOG_PERROR
+# define LOG_PERROR 0
+#endif
+
 MOG_NOINLINE static void setup(int argc, char *argv[])
 {
         int pid_fd = -1;
         static struct argp argp = { options, parse_opt, NULL, summary };
         int mask = 0;
 
+        mog_mnt_refresh();
         argp_parse(&argp, argc, argv, 0, NULL, &mog_cli);
         mog_cfg_validate_or_die(&mog_cli);
         mog_cfg_svc_start_or_die(&mog_cli);
@@ -210,7 +229,12 @@ MOG_NOINLINE static void setup(int argc, char *argv[])
 
         /* TODO: make logging configurable */
         {
-                openlog(THIS, LOG_ODELAY|LOG_PID, LOG_DAEMON);
+                int option = LOG_PID;
+
+                if (!mog_cli.daemonize)
+                        option |= LOG_PERROR;
+
+                openlog(THIS, option, LOG_DAEMON);
                 mask |= LOG_MASK(LOG_EMERG);
                 mask |= LOG_MASK(LOG_ALERT);
                 mask |= LOG_MASK(LOG_CRIT);
@@ -224,8 +248,6 @@ MOG_NOINLINE static void setup(int argc, char *argv[])
 
         if (mog_cli.daemonize)
                 daemonize();
-        else
-                mog_notify_init();
 
         if (pid_fd >= 0 && mog_pidfile_commit(pid_fd) < 0)
                 syslog(LOG_ERR,
@@ -313,7 +335,7 @@ _Noreturn static void cmogstored_exit(void)
         exit(EXIT_SUCCESS);
 }
 
-static void cmogstored_wakeup_handler(int signum)
+static void worker_wakeup_handler(int signum)
 {
         switch (signum) {
         case SIGCHLD: ++sigchld_nr; break;
@@ -325,7 +347,32 @@ static void cmogstored_wakeup_handler(int signum)
         mog_notify(MOG_NOTIFY_SIGNAL);
 }
 
-static void cmogstored_siginit(void)
+static bool kill_child(void *workent, void *ignored)
+{
+        struct worker *worker = workent;
+
+        if (kill(worker->pid, do_exit) == 0)
+                return true;
+
+        /* race between receiving a signal and waitpid2, ignore */
+        if (errno == ESRCH)
+                return true;
+        syslog(LOG_ERR, "could not signal worker[%lu] pid=%d: %m",
+               worker->id, (int)worker->pid);
+        return true;
+}
+
+static void master_wakeup_handler(int signum)
+{
+        switch (signum) {
+        case SIGQUIT:
+        case SIGTERM:
+        case SIGINT:
+                do_exit = signum;
+        }
+}
+
+static void siginit(void (*wakeup_handler)(int))
 {
         struct sigaction sa;
 
@@ -335,7 +382,7 @@ static void cmogstored_siginit(void)
         sa.sa_handler = SIG_IGN;
         CHECK(int, 0, sigaction(SIGPIPE, &sa, NULL));
 
-        sa.sa_handler = cmogstored_wakeup_handler;
+        sa.sa_handler = wakeup_handler;
         CHECK(int, 0, sigaction(SIGTERM, &sa, NULL));
         CHECK(int, 0, sigaction(SIGINT, &sa, NULL));
 
@@ -349,9 +396,9 @@ static void cmogstored_siginit(void)
         CHECK(int, 0, sigaction(SIGCHLD, &sa, NULL));
 }
 
-static void main_loop(struct mog_queue *q)
+static void main_loop(struct mog_queue *q, const pid_t parent)
 {
-        for (;;) {
+        while (parent == 0 || parent == getppid()) {
                 mog_notify_wait();
                 if (do_exit)
                         cmogstored_exit();
@@ -361,22 +408,129 @@ static void main_loop(struct mog_queue *q)
                         sigchld_nr = 0;
                 }
         }
+
+        syslog(LOG_INFO, "parent=%d abandoned us, dying", parent);
+        cmogstored_exit();
 }
 
-int main(int argc, char *argv[])
+static void run_worker(const pid_t parent)
 {
-        struct mog_queue *q;
+        struct mog_queue *q = mog_queue_new();
 
-        mog_mnt_refresh();
-        setup(argc, argv);
-
-        q = mog_queue_new();
-        mog_intr_disable();
-        cmogstored_siginit();
+        mog_notify_init();
+        siginit(worker_wakeup_handler);
         mog_thrpool_start(&q->thrpool, nthr, mog_queue_loop, q);
         mog_svc_each(svc_start_each, q);
         mog_iostat_spawn();
-        main_loop(q);
+        main_loop(q, parent);
+}
+
+static bool worker_cmp(const void *a, const void *b)
+{
+        const struct worker *wa = a;
+        const struct worker *wb = b;
+
+        return wa->pid == wb->pid;
+}
+
+static size_t worker_hash(const void *x, size_t tablesize)
+{
+        const struct worker *w = x;
+
+        return w->pid % tablesize;
+}
+
+static void fork_worker(unsigned long worker_id)
+{
+        struct worker *worker;
+        pid_t pid;
+        pid_t parent = getpid(); /* not using getppid() since it's racy */
+
+        mog_intr_disable();
+        pid = fork();
+        if (pid > 0) {
+                worker = xmalloc(sizeof(struct worker));
+                worker->pid = pid;
+                worker->id = worker_id;
+                if (hash_insert(workers, worker) == NULL)
+                        mog_oom();
+                mog_intr_enable();
+        } else if (pid == 0) {
+                /* worker will call mog_intr_enable() later in notify loop */
+                hash_free(workers);
+                run_worker(parent);
+                exit(EXIT_SUCCESS);
+        } else {
+                PRESERVE_ERRNO( mog_intr_enable() );
+                syslog(LOG_ERR, "fork() failed: %m, sleeping for 10s");
+                sleep(10);
+        }
+}
+
+static void worker_died(int status, struct worker *tmp)
+{
+        struct worker *died = hash_delete(workers, tmp);
+
+        /*
+         * we got an unexpected pid, just ignore it, it could
+         * be a reparented iostat at shutdown
+         */
+        if (died == NULL)
+                return;
+
+        tmp->id = died->id;
+        free(died);
+
+        if (do_exit)
+                return;
+
+        syslog(LOG_INFO,
+               "worker[%lu] pid=%d died with status=%d, respawning",
+               tmp->id, (int)tmp->pid, status);
+        fork_worker(tmp->id);
+}
+
+static void run_master(void)
+{
+        unsigned long id;
+
+        workers = hash_initialize(worker_processes, NULL,
+                                      worker_hash, worker_cmp, free);
+
+        siginit(master_wakeup_handler);
+
+        for (id = 0; id < worker_processes; id++)
+                fork_worker(id);
+
+        while (! do_exit || hash_get_n_entries(workers) > 0) {
+                int status;
+                struct worker tmp;
+
+                tmp.pid = waitpid(-1, &status, 0);
+
+                if (tmp.pid > 0)
+                        worker_died(status, &tmp);
+                if (tmp.pid < 0 && errno != EINTR)
+                        syslog(LOG_WARNING, "waitpid failed: %m");
+
+                if (do_exit && do_exit != SIGCONT) {
+                        hash_do_for_each(workers, kill_child, NULL);
+                        do_exit = SIGCONT; /* only kill once */
+                }
+        }
+        hash_free(workers);
+}
+
+int main(int argc, char *argv[])
+{
+        setup(argc, argv); /* this daemonizes */
+
+        if (worker_processes == 0) {
+                mog_intr_disable();
+                run_worker(0);
+        } else {
+                run_master();
+        }
 
         return 0;
 }