diff options
-rw-r--r-- | TODO | 1 | ||||
-rw-r--r-- | cmogstored.c | 12 | ||||
-rw-r--r-- | cmogstored.h | 6 | ||||
-rw-r--r-- | fdmap.c | 23 | ||||
-rw-r--r-- | http.c | 28 | ||||
-rw-r--r-- | mgmt.c | 24 | ||||
-rw-r--r-- | queue_loop.c | 32 | ||||
-rw-r--r-- | svc.c | 1 | ||||
-rw-r--r-- | test/graceful_quit.rb | 206 | ||||
-rw-r--r-- | test/http.rb | 2 | ||||
-rw-r--r-- | test/ruby.mk | 3 | ||||
-rw-r--r-- | test/test_helper.rb | 4 |
12 files changed, 332 insertions, 10 deletions
@@ -1,4 +1,3 @@ -* graceful shutdown * hot management interface * tunable thread counts * keepalive expiry/timeouts? diff --git a/cmogstored.c b/cmogstored.c index 2231224..1baad7d 100644 --- a/cmogstored.c +++ b/cmogstored.c @@ -290,6 +290,15 @@ static bool svc_quit_each(void *svcptr, void *ignored) return true; } +static bool svc_queue_set(void *svcptr, void *queue) +{ + struct mog_svc *svc = svcptr; + + svc->queue = queue; + + return true; +} + _Noreturn static void cmogstored_exit(void) { /* call atexit() handlers and make valgrind happy */ @@ -297,6 +306,9 @@ _Noreturn static void cmogstored_exit(void) mog_svc_dev_shutdown(); mog_queue_stop(mog_notify_queue); mog_svc_dev_shutdown(); + mog_svc_each(svc_queue_set, mog_notify_queue); + mog_fdmap_requeue(mog_notify_queue); + mog_queue_quit_loop(mog_notify_queue); exit(EXIT_SUCCESS); } diff --git a/cmogstored.h b/cmogstored.h index 90af6fb..fa1ac3e 100644 --- a/cmogstored.h +++ b/cmogstored.h @@ -114,7 +114,6 @@ struct mog_mgmt { struct mog_queue; struct mog_svc { - int svc_alive; int docroot_fd; const char *docroot; @@ -258,6 +257,8 @@ struct mog_fd { }; struct mog_fd *mog_fd_get(int fd); void mog_fd_put(struct mog_fd *mfd); +void mog_fdmap_requeue(struct mog_queue *quit_queue); +extern size_t mog_nr_active_at_quit; #include "fdmap.h" /* alloc.c */ @@ -351,6 +352,7 @@ void mog_thrpool_quit(struct mog_thrpool *); void mog_mgmt_writev(struct mog_mgmt *, struct iovec *, int iovcnt); void mog_mgmt_post_accept(int fd, struct mog_svc *); void mog_mgmt_queue_step(struct mog_fd *); +void mog_mgmt_quit_step(struct mog_fd *); /* queue_epoll.c */ struct mog_queue * mog_queue_new(void); @@ -376,6 +378,7 @@ void mog_close(int fd); /* mog_queue_loop.c */ void * mog_queue_loop(void *arg); +void mog_queue_quit_loop(struct mog_queue *queue); /* queue_step.c */ void mog_queue_step(struct mog_fd *mfd); @@ -406,6 +409,7 @@ enum mog_next mog_http_get_in_progress(struct mog_fd *); /* http.c */ void mog_http_post_accept(int fd, struct mog_svc *); void mog_http_queue_step(struct mog_fd *); +void mog_http_quit_step(struct mog_fd *); char *mog_http_path(struct mog_http *, char *buf); /* http_dav.c */ @@ -3,6 +3,7 @@ * License: GPLv3 or later (see COPYING for details) */ #include "cmogstored.h" +#include "activeq.h" #define FD_PAD_SIZE ((size_t)128) verify(sizeof(struct mog_fd) <= FD_PAD_SIZE); static int max_fd; @@ -10,6 +11,7 @@ static size_t fd_heaps; static const size_t FD_PER_HEAP = 256; static unsigned char **fd_map; static pthread_mutex_t fd_lock = PTHREAD_MUTEX_INITIALIZER; +size_t mog_nr_active_at_quit; static inline struct mog_fd *aref(size_t fd) { @@ -95,3 +97,24 @@ void mog_fd_put(struct mog_fd *mfd) mfd->fd_type = MOG_FD_TYPE_UNUSED; mog_close(fd); } + +/* called during shutdown, no other threads are running when this is called */ +void mog_fdmap_requeue(struct mog_queue *quit_queue) +{ + int fd; + struct mog_fd *mfd; + + for (fd = max_fd - 1; fd >= 0; fd--) { + mfd = aref(fd); + switch (mfd->fd_type) { + case MOG_FD_TYPE_MGMT: + case MOG_FD_TYPE_HTTP: + mfd->queue_state = MOG_QUEUE_STATE_NEW; + assert((mfd->in_queue = 0) == 0 && "in_queue check"); + mog_activeq_insert(quit_queue, mfd); + mog_nr_active_at_quit++; + default: + break; + } + } +} @@ -40,9 +40,6 @@ http_defer_rbuf(struct mog_http *http, struct mog_rbuf *rbuf, size_t buf_len) static void http_process_client(struct mog_http *http, char *buf, size_t buf_len) { - if (!http->svc->svc_alive) - http->persistent = 0; - switch (http->http_method) { case MOG_HTTP_METHOD_NONE: assert(0 && "BUG: unset HTTP method"); case MOG_HTTP_METHOD_GET: mog_http_get_open(http, buf, false); break; @@ -205,6 +202,31 @@ void mog_http_queue_step(struct mog_fd *mfd) } } +/* called during graceful shutdown instead of mog_http_queue_step */ +void mog_http_quit_step(struct mog_fd *mfd) +{ + struct mog_http *http = &mfd->as.http; + struct mog_queue *q = http->svc->queue; + + /* centralize all queue transitions here: */ + switch (http_queue_step(mfd)) { + case MOG_NEXT_WAIT_RD: + if (http->forward || http->rbuf) { + mog_idleq_push(q, mfd, MOG_QEV_RD); + return; + } + /* fall-through */ + case MOG_NEXT_CLOSE: + mog_nr_active_at_quit--; + http_close(mfd); + return; + case MOG_NEXT_ACTIVE: mog_activeq_push(q, mfd); return; + case MOG_NEXT_WAIT_WR: mog_idleq_push(q, mfd, MOG_QEV_WR); return; + case MOG_NEXT_IGNORE: + assert(0 && "refused to put HTTP client into ignore state"); + } +} + /* called immediately after accept(), this initializes the mfd (once) */ void mog_http_post_accept(int fd, struct mog_svc *svc) { @@ -216,6 +216,30 @@ void mog_mgmt_queue_step(struct mog_fd *mfd) } } +/* called during graceful shutdown instead of mog_mgmt_queue_step */ +void mog_mgmt_quit_step(struct mog_fd *mfd) +{ + struct mog_mgmt *mgmt = &mfd->as.mgmt; + struct mog_queue *q = mgmt->svc->queue; + + /* centralize all queue transitions here: */ + switch (mgmt_queue_step(mfd)) { + case MOG_NEXT_WAIT_RD: + if (mgmt->forward || mgmt->rbuf) { + mog_idleq_push(q, mfd, MOG_QEV_RD); + return; + } + /* fall-through */ + case MOG_NEXT_IGNORE: /* no new iostat watchers during shutdown */ + case MOG_NEXT_CLOSE: + mog_nr_active_at_quit--; + mgmt_close(mfd); + return; + case MOG_NEXT_ACTIVE: mog_activeq_push(q, mfd); return; + case MOG_NEXT_WAIT_WR: mog_idleq_push(q, mfd, MOG_QEV_WR); return; + } +} + /* called immediately after accept(), this initializes the mfd (once) */ void mog_mgmt_post_accept(int fd, struct mog_svc *svc) { diff --git a/queue_loop.c b/queue_loop.c index 117c0d6..3e788b9 100644 --- a/queue_loop.c +++ b/queue_loop.c @@ -85,3 +85,35 @@ void * mog_queue_loop(void *arg) return NULL; } + +static void queue_quit_step(struct mog_fd *mfd) +{ + switch (mfd->fd_type) { + case MOG_FD_TYPE_MGMT: mog_mgmt_quit_step(mfd); return; + case MOG_FD_TYPE_HTTP: mog_http_quit_step(mfd); return; + case MOG_FD_TYPE_FILE: + case MOG_FD_TYPE_QUEUE: + case MOG_FD_TYPE_SVC: + assert(0 && "invalid fd_type in queue_quit_step"); + default: + break; + } +} + +/* called at shutdown when only one thread is active */ +void mog_queue_quit_loop(struct mog_queue *queue) +{ + struct mog_fd *mfd; + + while (mog_nr_active_at_quit) { + assert(mog_nr_active_at_quit <= (size_t)INT_MAX + && "mog_nr_active_at_quit underflow"); + + if ((mfd = mog_activeq_trytake(queue))) { + queue_quit_step(mfd); + } else { + if ((mfd = mog_idleq_wait(queue, -1))) + queue_quit_step(mfd); + } + } +} @@ -86,7 +86,6 @@ struct mog_svc * mog_svc_new(const char *docroot) svc = xzalloc(sizeof(struct mog_svc)); svc->http_fd = svc->httpget_fd = svc->mgmt_fd = -1; svc->docroot = docroot; - svc->svc_alive = 1; svc->docroot_fd = fd; svc->dir = dir; svc->put_perms = (~mog_umask) & 0666; diff --git a/test/graceful_quit.rb b/test/graceful_quit.rb new file mode 100644 index 0000000..9c26020 --- /dev/null +++ b/test/graceful_quit.rb @@ -0,0 +1,206 @@ +#!/usr/bin/env ruby +# -*- encoding: binary -*- +# Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> +# License: GPLv3 or later (see COPYING for details) +require 'test/test_helper' +require 'net/http' + +class TestGracefulQuit < Test::Unit::TestCase + def setup + @tmpdir = Dir.mktmpdir('cmogstored-graceful-quit-test') + @to_close = [] + @host = TEST_HOST + + srv = TCPServer.new(@host, 0) + @http = srv.addr[1] + srv.close + + srv = TCPServer.new(@host, 0) + @mgmt= srv.addr[1] + srv.close + + @err = Tempfile.new("stderr") + cmd = [ "cmogstored", "--docroot=#@tmpdir", + "--httplisten=#@host:#@http", "--mgmtlisten=#@host:#@mgmt", + "--maxconns=500" ] + vg = ENV["VALGRIND"] and cmd = vg.split(/\s+/).concat(cmd) + @pid = fork { + #$stderr.reopen(@err) + @err.close + exec(*cmd) + } + end + + def wait_for_eof(client) + assert_raises(EOFError) do + begin + client.read_nonblock(666) + rescue Errno::EAGAIN + end while true + end + end + + def teardown + @to_close.each { |io| io.close unless io.closed? } + FileUtils.rm_rf(@tmpdir) + end + + def test_iostat_watcher_shutdown + client = get_client(666, @mgmt) + client.write "watch\n" + 2.times { assert_kind_of String, client.gets } + Process.kill(:QUIT, @pid) + + line = "" + 10.times { line = client.gets or break } + assert_nil line + wait_for_eof(client) + _, status = Process.waitpid2(@pid) + assert status.success?, status.inspect + end if `which iostat 2>/dev/null`.chomp.size != 0 && + RUBY_PLATFORM !~ /kfreebsd-gnu/ + + def test_http_get_huge_file + Dir.mkdir("#@tmpdir/dev666") + big = 100 * 1024 * 1024 + File.open("#@tmpdir/dev666/sparse-file.fid", "w") do |fp| + fp.seek(big - 1) + fp.write('.') + end + client = get_client(666, @http) + client.write("GET /dev666/sparse-file.fid HTTP/1.1\r\n" \ + "Host: example.com\r\n\r\n") + buf = client.readpartial(666) + _, body = buf.split(/\r\n\r\n/, 2) + Process.kill(:QUIT, @pid) + + bytes = 0 + buf = "" + begin + t_yield + client.readpartial(666666, buf) + bytes += buf.size + rescue EOFError + break + end while true + assert_equal bytes + body.bytesize, big + assert_raises(EOFError) { client.read_nonblock(666) } + _, status = Process.waitpid2(@pid) + assert status.success?, status.inspect + end + + def test_http_trickle_get + client = get_client(666, @http) + client.write("GET /") + t_yield + Process.kill(:QUIT, @pid) + t_yield + client.write("dev666/sparse-file.fid HTTP/1.1\r\n") + t_yield + client.write("Host: example.com\r\n\r\n") + buf = client.readpartial(666) + head, body = buf.split(/\r\n\r\n/, 2) + assert_equal "", body + assert_equal "HTTP/1.1 404 Not Found", head.split(/\r\n/)[0] + wait_for_eof(client) + _, status = Process.waitpid2(@pid) + assert status.success?, status.inspect + end + + def test_http_trickle_get_pipelined + client = get_client(666, @http) + client.write("GET /") + t_yield + Process.kill(:QUIT, @pid) + t_yield + client.write("dev666/trickle-get-pipelined.fid HTTP/1.1\r\n") + t_yield + client.write("Host: example.com\r\n\r\nGET /") + + buf = client.readpartial(666) + head, body = buf.split(/\r\n\r\n/, 2) + assert_equal "", body + assert_equal "HTTP/1.1 404 Not Found", head.split(/\r\n/)[0] + + client.write("dev666/trickle-get-pipelined.fid HTTP/1.1\r\n") + client.write("Host: example.com\r\n\r\n") + + buf = client.readpartial(666) + head, body = buf.split(/\r\n\r\n/, 2) + assert_equal "", body + assert_equal "HTTP/1.1 404 Not Found", head.split(/\r\n/)[0] + + wait_for_eof(client) + _, status = Process.waitpid2(@pid) + assert status.success?, status.inspect + end + + def test_http_trickle_put + client = get_client(666, @http) + client.write("PUT /dev666/blarg.fid HTTP/1.1\r\n" \ + "Host: example.com\r\n" \ + "Content-Length: 5\r\n\r\n") + t_yield + Process.kill(:QUIT, @pid) + "hihi".each_byte do |x| + t_yield + client.write(x.chr) + end + + t_yield + client.write("\nGET") + + buf = client.readpartial(666) + head, body = buf.split(/\r\n\r\n/, 2) + assert_equal "", body + assert_equal "HTTP/1.1 201 Created", head.split(/\r\n/)[0] + assert_equal "hihi\n", File.read("#@tmpdir/dev666/blarg.fid") + + client.write(" /dev666/blarg.fid HTTP/1.0\r\n\r\n") + buf = client.read + head, body = buf.split(/\r\n\r\n/, 2) + assert_equal "hihi\n", body + assert_equal "HTTP/1.1 200 OK", head.split(/\r\n/)[0] + + wait_for_eof(client) + _, status = Process.waitpid2(@pid) + assert status.success?, status.inspect + end + + def test_mgmt_md5_huge_file + Dir.mkdir("#@tmpdir/dev666") + big = 100 * 1024 * 1024 + File.open("#@tmpdir/dev666/sparse-file.fid", "w") do |fp| + fp.seek(big - 1) + fp.write('.') + end + client = get_client(666, @mgmt) + client.write("MD5 /dev666/sparse-file.fid\r\n") + t_yield + Process.kill(:QUIT, @pid) + t_yield + + buf = client.gets + assert_nil client.gets + expect = "/dev666/sparse-file.fid MD5=56bb745c25f52e8378a9ca49b7cfd27f\r\n" + assert_equal expect, buf + _, status = Process.waitpid2(@pid) + assert status.success?, status.inspect + end + + def test_mgmt_size_trickle + client = get_client(666, @mgmt) + client.write("size ") + t_yield + Process.kill(:QUIT, @pid) + t_yield + client.write("/dev666/missing.fid\r\n") + + buf = client.gets + assert_nil client.gets + assert_equal "/dev666/missing.fid -1\r\n", buf + + _, status = Process.waitpid2(@pid) + assert status.success?, status.inspect + end +end diff --git a/test/http.rb b/test/http.rb index f5a19de..ad21af4 100644 --- a/test/http.rb +++ b/test/http.rb @@ -74,6 +74,7 @@ class TestHTTP < Test::Unit::TestCase end def test_get_huge + return unless IO.respond_to?(:copy_stream) Dir.mkdir("#@tmpdir/dev666") big = 100 * 1024 * 1024 File.open("#@tmpdir/dev666/sparse-file.fid", "w") do |fp| @@ -84,7 +85,6 @@ class TestHTTP < Test::Unit::TestCase buf = @client.readpartial(600) _, body = buf.split(/\r\n\r\n/, 2) - return unless IO.respond_to?(:copy_stream) bytes = IO.copy_stream(@client, "/dev/null") assert_equal bytes + body.bytesize, big end diff --git a/test/ruby.mk b/test/ruby.mk index 2a399d6..c511f57 100644 --- a/test/ruby.mk +++ b/test/ruby.mk @@ -1,5 +1,6 @@ RB_TESTS_FAST = test/cmogstored-cfg.rb test/http_dav.rb test/http_range.rb \ test/http_put.rb RB_TESTS_SLOW = test/mgmt-usage.rb test/mgmt.rb test/mgmt-iostat.rb \ - test/http.rb test/http_put_slow.rb test/http_chunked_put.rb + test/http.rb test/http_put_slow.rb test/http_chunked_put.rb \ + test/graceful_quit.rb RB_TESTS = $(RB_TESTS_FAST) $(RB_TESTS_SLOW) diff --git a/test/test_helper.rb b/test/test_helper.rb index a040054..9cdb3fd 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -25,9 +25,9 @@ def t_yield sleep 0.015 end -def get_client(tries = 300) +def get_client(tries = 300, port = @port) begin - c = TCPSocket.new(@host, @port) + c = TCPSocket.new(@host, port) @to_close << c return c rescue |