about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--TODO1
-rw-r--r--cmogstored.c12
-rw-r--r--cmogstored.h6
-rw-r--r--fdmap.c23
-rw-r--r--http.c28
-rw-r--r--mgmt.c24
-rw-r--r--queue_loop.c32
-rw-r--r--svc.c1
-rw-r--r--test/graceful_quit.rb206
-rw-r--r--test/http.rb2
-rw-r--r--test/ruby.mk3
-rw-r--r--test/test_helper.rb4
12 files changed, 332 insertions, 10 deletions
diff --git a/TODO b/TODO
index 08f52b3..3b6a4b5 100644
--- a/TODO
+++ b/TODO
@@ -1,4 +1,3 @@
-* graceful shutdown
 * hot management interface
 * tunable thread counts
 * keepalive expiry/timeouts?
diff --git a/cmogstored.c b/cmogstored.c
index 2231224..1baad7d 100644
--- a/cmogstored.c
+++ b/cmogstored.c
@@ -290,6 +290,15 @@ static bool svc_quit_each(void *svcptr, void *ignored)
         return true;
 }
 
+static bool svc_queue_set(void *svcptr, void *queue)
+{
+        struct mog_svc *svc = svcptr;
+
+        svc->queue = queue;
+
+        return true;
+}
+
 _Noreturn static void cmogstored_exit(void)
 {
         /* call atexit() handlers and make valgrind happy */
@@ -297,6 +306,9 @@ _Noreturn static void cmogstored_exit(void)
         mog_svc_dev_shutdown();
         mog_queue_stop(mog_notify_queue);
         mog_svc_dev_shutdown();
+        mog_svc_each(svc_queue_set, mog_notify_queue);
+        mog_fdmap_requeue(mog_notify_queue);
+        mog_queue_quit_loop(mog_notify_queue);
         exit(EXIT_SUCCESS);
 }
 
diff --git a/cmogstored.h b/cmogstored.h
index 90af6fb..fa1ac3e 100644
--- a/cmogstored.h
+++ b/cmogstored.h
@@ -114,7 +114,6 @@ struct mog_mgmt {
 
 struct mog_queue;
 struct mog_svc {
-        int svc_alive;
         int docroot_fd;
         const char *docroot;
 
@@ -258,6 +257,8 @@ struct mog_fd {
 };
 struct mog_fd *mog_fd_get(int fd);
 void mog_fd_put(struct mog_fd *mfd);
+void mog_fdmap_requeue(struct mog_queue *quit_queue);
+extern size_t mog_nr_active_at_quit;
 #include "fdmap.h"
 
 /* alloc.c */
@@ -351,6 +352,7 @@ void mog_thrpool_quit(struct mog_thrpool *);
 void mog_mgmt_writev(struct mog_mgmt *, struct iovec *, int iovcnt);
 void mog_mgmt_post_accept(int fd, struct mog_svc *);
 void mog_mgmt_queue_step(struct mog_fd *);
+void mog_mgmt_quit_step(struct mog_fd *);
 
 /* queue_epoll.c */
 struct mog_queue * mog_queue_new(void);
@@ -376,6 +378,7 @@ void mog_close(int fd);
 
 /* mog_queue_loop.c */
 void * mog_queue_loop(void *arg);
+void mog_queue_quit_loop(struct mog_queue *queue);
 
 /* queue_step.c */
 void mog_queue_step(struct mog_fd *mfd);
@@ -406,6 +409,7 @@ enum mog_next mog_http_get_in_progress(struct mog_fd *);
 /* http.c */
 void mog_http_post_accept(int fd, struct mog_svc *);
 void mog_http_queue_step(struct mog_fd *);
+void mog_http_quit_step(struct mog_fd *);
 char *mog_http_path(struct mog_http *, char *buf);
 
 /* http_dav.c */
diff --git a/fdmap.c b/fdmap.c
index 565a8e9..80d86d5 100644
--- a/fdmap.c
+++ b/fdmap.c
@@ -3,6 +3,7 @@
  * License: GPLv3 or later (see COPYING for details)
  */
 #include "cmogstored.h"
+#include "activeq.h"
 #define FD_PAD_SIZE ((size_t)128)
 verify(sizeof(struct mog_fd) <= FD_PAD_SIZE);
 static int max_fd;
@@ -10,6 +11,7 @@ static size_t fd_heaps;
 static const size_t FD_PER_HEAP = 256;
 static unsigned char **fd_map;
 static pthread_mutex_t fd_lock = PTHREAD_MUTEX_INITIALIZER;
+size_t mog_nr_active_at_quit;
 
 static inline struct mog_fd *aref(size_t fd)
 {
@@ -95,3 +97,24 @@ void mog_fd_put(struct mog_fd *mfd)
         mfd->fd_type = MOG_FD_TYPE_UNUSED;
         mog_close(fd);
 }
+
+/* called during shutdown, no other threads are running when this is called */
+void mog_fdmap_requeue(struct mog_queue *quit_queue)
+{
+        int fd;
+        struct mog_fd *mfd;
+
+        for (fd = max_fd - 1; fd >= 0; fd--) {
+                mfd = aref(fd);
+                switch (mfd->fd_type) {
+                case MOG_FD_TYPE_MGMT:
+                case MOG_FD_TYPE_HTTP:
+                        mfd->queue_state = MOG_QUEUE_STATE_NEW;
+                        assert((mfd->in_queue = 0) == 0 && "in_queue check");
+                        mog_activeq_insert(quit_queue, mfd);
+                        mog_nr_active_at_quit++;
+                default:
+                        break;
+                }
+        }
+}
diff --git a/http.c b/http.c
index 681dec3..b689714 100644
--- a/http.c
+++ b/http.c
@@ -40,9 +40,6 @@ http_defer_rbuf(struct mog_http *http, struct mog_rbuf *rbuf, size_t buf_len)
 static void
 http_process_client(struct mog_http *http, char *buf, size_t buf_len)
 {
-        if (!http->svc->svc_alive)
-                http->persistent = 0;
-
         switch (http->http_method) {
         case MOG_HTTP_METHOD_NONE: assert(0 && "BUG: unset HTTP method");
         case MOG_HTTP_METHOD_GET: mog_http_get_open(http, buf, false); break;
@@ -205,6 +202,31 @@ void mog_http_queue_step(struct mog_fd *mfd)
         }
 }
 
+/* called during graceful shutdown instead of mog_http_queue_step */
+void mog_http_quit_step(struct mog_fd *mfd)
+{
+        struct mog_http *http = &mfd->as.http;
+        struct mog_queue *q = http->svc->queue;
+
+        /* centralize all queue transitions here: */
+        switch (http_queue_step(mfd)) {
+        case MOG_NEXT_WAIT_RD:
+                if (http->forward || http->rbuf) {
+                        mog_idleq_push(q, mfd, MOG_QEV_RD);
+                        return;
+                }
+                /* fall-through */
+        case MOG_NEXT_CLOSE:
+                mog_nr_active_at_quit--;
+                http_close(mfd);
+                return;
+        case MOG_NEXT_ACTIVE: mog_activeq_push(q, mfd); return;
+        case MOG_NEXT_WAIT_WR: mog_idleq_push(q, mfd, MOG_QEV_WR); return;
+        case MOG_NEXT_IGNORE:
+                assert(0 && "refused to put HTTP client into ignore state");
+        }
+}
+
 /* called immediately after accept(), this initializes the mfd (once) */
 void mog_http_post_accept(int fd, struct mog_svc *svc)
 {
diff --git a/mgmt.c b/mgmt.c
index 51c13c2..60036ec 100644
--- a/mgmt.c
+++ b/mgmt.c
@@ -216,6 +216,30 @@ void mog_mgmt_queue_step(struct mog_fd *mfd)
         }
 }
 
+/* called during graceful shutdown instead of mog_mgmt_queue_step */
+void mog_mgmt_quit_step(struct mog_fd *mfd)
+{
+        struct mog_mgmt *mgmt = &mfd->as.mgmt;
+        struct mog_queue *q = mgmt->svc->queue;
+
+        /* centralize all queue transitions here: */
+        switch (mgmt_queue_step(mfd)) {
+        case MOG_NEXT_WAIT_RD:
+                if (mgmt->forward || mgmt->rbuf) {
+                        mog_idleq_push(q, mfd, MOG_QEV_RD);
+                        return;
+                }
+                /* fall-through */
+        case MOG_NEXT_IGNORE: /* no new iostat watchers during shutdown */
+        case MOG_NEXT_CLOSE:
+                mog_nr_active_at_quit--;
+                mgmt_close(mfd);
+                return;
+        case MOG_NEXT_ACTIVE: mog_activeq_push(q, mfd); return;
+        case MOG_NEXT_WAIT_WR: mog_idleq_push(q, mfd, MOG_QEV_WR); return;
+        }
+}
+
 /* called immediately after accept(), this initializes the mfd (once) */
 void mog_mgmt_post_accept(int fd, struct mog_svc *svc)
 {
diff --git a/queue_loop.c b/queue_loop.c
index 117c0d6..3e788b9 100644
--- a/queue_loop.c
+++ b/queue_loop.c
@@ -85,3 +85,35 @@ void * mog_queue_loop(void *arg)
 
         return NULL;
 }
+
+static void queue_quit_step(struct mog_fd *mfd)
+{
+        switch (mfd->fd_type) {
+        case MOG_FD_TYPE_MGMT: mog_mgmt_quit_step(mfd); return;
+        case MOG_FD_TYPE_HTTP: mog_http_quit_step(mfd); return;
+        case MOG_FD_TYPE_FILE:
+        case MOG_FD_TYPE_QUEUE:
+        case MOG_FD_TYPE_SVC:
+                assert(0 && "invalid fd_type in queue_quit_step");
+        default:
+                break;
+        }
+}
+
+/* called at shutdown when only one thread is active */
+void mog_queue_quit_loop(struct mog_queue *queue)
+{
+        struct mog_fd *mfd;
+
+        while (mog_nr_active_at_quit) {
+                assert(mog_nr_active_at_quit <= (size_t)INT_MAX
+                       && "mog_nr_active_at_quit underflow");
+
+                if ((mfd = mog_activeq_trytake(queue))) {
+                        queue_quit_step(mfd);
+                } else {
+                        if ((mfd = mog_idleq_wait(queue, -1)))
+                                queue_quit_step(mfd);
+                }
+        }
+}
diff --git a/svc.c b/svc.c
index e1ed4ed..8f13657 100644
--- a/svc.c
+++ b/svc.c
@@ -86,7 +86,6 @@ struct mog_svc * mog_svc_new(const char *docroot)
         svc = xzalloc(sizeof(struct mog_svc));
         svc->http_fd = svc->httpget_fd = svc->mgmt_fd = -1;
         svc->docroot = docroot;
-        svc->svc_alive = 1;
         svc->docroot_fd = fd;
         svc->dir = dir;
         svc->put_perms = (~mog_umask) & 0666;
diff --git a/test/graceful_quit.rb b/test/graceful_quit.rb
new file mode 100644
index 0000000..9c26020
--- /dev/null
+++ b/test/graceful_quit.rb
@@ -0,0 +1,206 @@
+#!/usr/bin/env ruby
+# -*- encoding: binary -*-
+# Copyright (C) 2012, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (see COPYING for details)
+require 'test/test_helper'
+require 'net/http'
+
+class TestGracefulQuit < Test::Unit::TestCase
+  def setup
+    @tmpdir = Dir.mktmpdir('cmogstored-graceful-quit-test')
+    @to_close = []
+    @host = TEST_HOST
+
+    srv = TCPServer.new(@host, 0)
+    @http = srv.addr[1]
+    srv.close
+
+    srv = TCPServer.new(@host, 0)
+    @mgmt= srv.addr[1]
+    srv.close
+
+    @err = Tempfile.new("stderr")
+    cmd = [ "cmogstored", "--docroot=#@tmpdir",
+            "--httplisten=#@host:#@http", "--mgmtlisten=#@host:#@mgmt",
+            "--maxconns=500" ]
+    vg = ENV["VALGRIND"] and cmd = vg.split(/\s+/).concat(cmd)
+    @pid = fork {
+      #$stderr.reopen(@err)
+      @err.close
+      exec(*cmd)
+    }
+  end
+
+  def wait_for_eof(client)
+    assert_raises(EOFError) do
+      begin
+        client.read_nonblock(666)
+      rescue Errno::EAGAIN
+      end while true
+    end
+  end
+
+  def teardown
+    @to_close.each { |io| io.close unless io.closed? }
+    FileUtils.rm_rf(@tmpdir)
+  end
+
+  def test_iostat_watcher_shutdown
+    client = get_client(666, @mgmt)
+    client.write "watch\n"
+    2.times { assert_kind_of String, client.gets }
+    Process.kill(:QUIT, @pid)
+
+    line = ""
+    10.times { line = client.gets or break }
+    assert_nil line
+    wait_for_eof(client)
+    _, status = Process.waitpid2(@pid)
+    assert status.success?, status.inspect
+  end if `which iostat 2>/dev/null`.chomp.size != 0 &&
+         RUBY_PLATFORM !~ /kfreebsd-gnu/
+
+  def test_http_get_huge_file
+    Dir.mkdir("#@tmpdir/dev666")
+    big = 100 * 1024 * 1024
+    File.open("#@tmpdir/dev666/sparse-file.fid", "w") do |fp|
+      fp.seek(big - 1)
+      fp.write('.')
+    end
+    client = get_client(666, @http)
+    client.write("GET /dev666/sparse-file.fid HTTP/1.1\r\n" \
+                 "Host: example.com\r\n\r\n")
+    buf = client.readpartial(666)
+    _, body = buf.split(/\r\n\r\n/, 2)
+    Process.kill(:QUIT, @pid)
+
+    bytes = 0
+    buf = ""
+    begin
+      t_yield
+      client.readpartial(666666, buf)
+      bytes += buf.size
+    rescue EOFError
+      break
+    end while true
+    assert_equal bytes + body.bytesize, big
+    assert_raises(EOFError) { client.read_nonblock(666) }
+    _, status = Process.waitpid2(@pid)
+    assert status.success?, status.inspect
+  end
+
+  def test_http_trickle_get
+    client = get_client(666, @http)
+    client.write("GET /")
+    t_yield
+    Process.kill(:QUIT, @pid)
+    t_yield
+    client.write("dev666/sparse-file.fid HTTP/1.1\r\n")
+    t_yield
+    client.write("Host: example.com\r\n\r\n")
+    buf = client.readpartial(666)
+    head, body = buf.split(/\r\n\r\n/, 2)
+    assert_equal "", body
+    assert_equal "HTTP/1.1 404 Not Found", head.split(/\r\n/)[0]
+    wait_for_eof(client)
+    _, status = Process.waitpid2(@pid)
+    assert status.success?, status.inspect
+  end
+
+  def test_http_trickle_get_pipelined
+    client = get_client(666, @http)
+    client.write("GET /")
+    t_yield
+    Process.kill(:QUIT, @pid)
+    t_yield
+    client.write("dev666/trickle-get-pipelined.fid HTTP/1.1\r\n")
+    t_yield
+    client.write("Host: example.com\r\n\r\nGET /")
+
+    buf = client.readpartial(666)
+    head, body = buf.split(/\r\n\r\n/, 2)
+    assert_equal "", body
+    assert_equal "HTTP/1.1 404 Not Found", head.split(/\r\n/)[0]
+
+    client.write("dev666/trickle-get-pipelined.fid HTTP/1.1\r\n")
+    client.write("Host: example.com\r\n\r\n")
+
+    buf = client.readpartial(666)
+    head, body = buf.split(/\r\n\r\n/, 2)
+    assert_equal "", body
+    assert_equal "HTTP/1.1 404 Not Found", head.split(/\r\n/)[0]
+
+    wait_for_eof(client)
+    _, status = Process.waitpid2(@pid)
+    assert status.success?, status.inspect
+  end
+
+  def test_http_trickle_put
+    client = get_client(666, @http)
+    client.write("PUT /dev666/blarg.fid HTTP/1.1\r\n" \
+                 "Host: example.com\r\n" \
+                 "Content-Length: 5\r\n\r\n")
+    t_yield
+    Process.kill(:QUIT, @pid)
+    "hihi".each_byte do |x|
+      t_yield
+      client.write(x.chr)
+    end
+
+    t_yield
+    client.write("\nGET")
+
+    buf = client.readpartial(666)
+    head, body = buf.split(/\r\n\r\n/, 2)
+    assert_equal "", body
+    assert_equal "HTTP/1.1 201 Created", head.split(/\r\n/)[0]
+    assert_equal "hihi\n", File.read("#@tmpdir/dev666/blarg.fid")
+
+    client.write(" /dev666/blarg.fid HTTP/1.0\r\n\r\n")
+    buf = client.read
+    head, body = buf.split(/\r\n\r\n/, 2)
+    assert_equal "hihi\n", body
+    assert_equal "HTTP/1.1 200 OK", head.split(/\r\n/)[0]
+
+    wait_for_eof(client)
+    _, status = Process.waitpid2(@pid)
+    assert status.success?, status.inspect
+  end
+
+  def test_mgmt_md5_huge_file
+    Dir.mkdir("#@tmpdir/dev666")
+    big = 100 * 1024 * 1024
+    File.open("#@tmpdir/dev666/sparse-file.fid", "w") do |fp|
+      fp.seek(big - 1)
+      fp.write('.')
+    end
+    client = get_client(666, @mgmt)
+    client.write("MD5 /dev666/sparse-file.fid\r\n")
+    t_yield
+    Process.kill(:QUIT, @pid)
+    t_yield
+
+    buf = client.gets
+    assert_nil client.gets
+    expect = "/dev666/sparse-file.fid MD5=56bb745c25f52e8378a9ca49b7cfd27f\r\n"
+    assert_equal expect, buf
+    _, status = Process.waitpid2(@pid)
+    assert status.success?, status.inspect
+  end
+
+  def test_mgmt_size_trickle
+    client = get_client(666, @mgmt)
+    client.write("size ")
+    t_yield
+    Process.kill(:QUIT, @pid)
+    t_yield
+    client.write("/dev666/missing.fid\r\n")
+
+    buf = client.gets
+    assert_nil client.gets
+    assert_equal "/dev666/missing.fid -1\r\n", buf
+
+    _, status = Process.waitpid2(@pid)
+    assert status.success?, status.inspect
+  end
+end
diff --git a/test/http.rb b/test/http.rb
index f5a19de..ad21af4 100644
--- a/test/http.rb
+++ b/test/http.rb
@@ -74,6 +74,7 @@ class TestHTTP < Test::Unit::TestCase
   end
 
   def test_get_huge
+    return unless IO.respond_to?(:copy_stream)
     Dir.mkdir("#@tmpdir/dev666")
     big = 100 * 1024 * 1024
     File.open("#@tmpdir/dev666/sparse-file.fid", "w") do |fp|
@@ -84,7 +85,6 @@ class TestHTTP < Test::Unit::TestCase
     buf = @client.readpartial(600)
     _, body = buf.split(/\r\n\r\n/, 2)
 
-    return unless IO.respond_to?(:copy_stream)
     bytes = IO.copy_stream(@client, "/dev/null")
     assert_equal bytes + body.bytesize, big
   end
diff --git a/test/ruby.mk b/test/ruby.mk
index 2a399d6..c511f57 100644
--- a/test/ruby.mk
+++ b/test/ruby.mk
@@ -1,5 +1,6 @@
 RB_TESTS_FAST = test/cmogstored-cfg.rb test/http_dav.rb test/http_range.rb \
   test/http_put.rb
 RB_TESTS_SLOW = test/mgmt-usage.rb test/mgmt.rb test/mgmt-iostat.rb \
- test/http.rb test/http_put_slow.rb test/http_chunked_put.rb
+ test/http.rb test/http_put_slow.rb test/http_chunked_put.rb \
+ test/graceful_quit.rb
 RB_TESTS = $(RB_TESTS_FAST) $(RB_TESTS_SLOW)
diff --git a/test/test_helper.rb b/test/test_helper.rb
index a040054..9cdb3fd 100644
--- a/test/test_helper.rb
+++ b/test/test_helper.rb
@@ -25,9 +25,9 @@ def t_yield
   sleep 0.015
 end
 
-def get_client(tries = 300)
+def get_client(tries = 300, port = @port)
   begin
-    c = TCPSocket.new(@host, @port)
+    c = TCPSocket.new(@host, port)
     @to_close << c
     return c
   rescue