From 6b45526295eb68313075c19d66b86e6a524bc0a3 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 28 Feb 2013 09:00:50 +0000 Subject: mgmt: check for client death during fsck checksumming Clients may start an fsck checksum request and not be around to read the response. So detect client death and abort checksumming if we have a dead socket. This is not extensively tested and may be overkill. --- Makefile.am | 1 + cmogstored.h | 3 +++ digest.h | 3 ++- mgmt.c | 13 +++++++++++-- socket_alive.c | 39 +++++++++++++++++++++++++++++++++++++++ test/mgmt.rb | 53 ++++++++++++++++++++++++++++++++++------------------- 6 files changed, 90 insertions(+), 22 deletions(-) create mode 100644 socket_alive.c diff --git a/Makefile.am b/Makefile.am index 6fa520c..862cd53 100644 --- a/Makefile.am +++ b/Makefile.am @@ -81,6 +81,7 @@ mog_src += queue_step.c mog_src += selfwake.h mog_src += selfwake.c mog_src += sig.c +mog_src += socket_alive.c mog_src += svc.c mog_src += svc_dev.c mog_src += thrpool.c diff --git a/cmogstored.h b/cmogstored.h index e72c071..84d3b69 100644 --- a/cmogstored.h +++ b/cmogstored.h @@ -387,6 +387,9 @@ void mog_mgmt_post_accept(int fd, struct mog_svc *); enum mog_next mog_mgmt_queue_step(struct mog_fd *) MOG_CHECK; void mog_mgmt_quit_step(struct mog_fd *); +/* socket_alive.c */ +bool mog_socket_alive(struct mog_fd *); + /* queue_epoll.c */ struct mog_queue * mog_queue_new(void); void mog_idleq_add(struct mog_queue *, struct mog_fd *, enum mog_qev); diff --git a/digest.h b/digest.h index bef35b8..a013efa 100644 --- a/digest.h +++ b/digest.h @@ -5,7 +5,8 @@ enum mog_digest_next { MOG_DIGEST_CONTINUE = 0, MOG_DIGEST_EOF, - MOG_DIGEST_ERROR + MOG_DIGEST_ERROR, + MOG_DIGEST_ABORT }; /* XXX gc_hash_handle is a typedef which hides a pointer, ugh... 
*/ diff --git a/mgmt.c b/mgmt.c index b6b2aa5..5726ab7 100644 --- a/mgmt.c +++ b/mgmt.c @@ -12,6 +12,7 @@ static void mgmt_digest_step(struct mog_fd *mfd) struct mog_mgmt *mgmt = &mfd->as.mgmt; struct mog_fd *fmfd = mgmt->forward; enum mog_digest_next next; + struct mog_digest *digest = &fmfd->as.file.digest; /* * MOG_PRIO_FSCK means we're likely the _only_ thread handling @@ -21,13 +22,16 @@ static void mgmt_digest_step(struct mog_fd *mfd) int ioprio = mog_ioprio_drop(); do { - next = mog_digest_read(&fmfd->as.file.digest, fmfd->fd); + if (mog_socket_alive(mfd)) + next = mog_digest_read(digest, fmfd->fd); + else + next = MOG_DIGEST_ABORT; } while (next == MOG_DIGEST_CONTINUE); if (ioprio != -1) mog_ioprio_restore(ioprio); } else { - next = mog_digest_read(&fmfd->as.file.digest, fmfd->fd); + next = mog_digest_read(digest, fmfd->fd); } assert(mgmt->wbuf == NULL && "wbuf should be NULL here"); @@ -39,6 +43,11 @@ static void mgmt_digest_step(struct mog_fd *mfd) break; case MOG_DIGEST_ERROR: mog_mgmt_fn_digest_err(mgmt); + break; + case MOG_DIGEST_ABORT: + syslog(LOG_ERR, "fd=%d aborted while waiting for fsck", + mfd->fd); + mgmt->wbuf = MOG_WR_ERROR; /* fake the error */ } if (mgmt->prio == MOG_PRIO_FSCK) diff --git a/socket_alive.c b/socket_alive.c new file mode 100644 index 0000000..37e52a1 --- /dev/null +++ b/socket_alive.c @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2012-2013, Eric Wong + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" + +/* + * check for POLLERR/POLLHUP, or POLLIN with nothing left to read, to see if + * the socket died while queued. This can save us from expensive processing. 
+ */ +bool mog_socket_alive(struct mog_fd *mfd) +{ + static const short dead_events = POLLERR | POLLHUP; + struct pollfd fds = { .fd = mfd->fd, .events = POLLIN }; + int rc; + int nbytes; + + do { + rc = poll(&fds, 1, 0); + } while (rc < 0 && errno == EINTR); + + if (rc == 1) { + if (fds.revents & POLLIN) { + CHECK(int, 0, ioctl(mfd->fd, FIONREAD, &nbytes)); + return (nbytes <= 0) ? false : true; /* POLLIN but nothing to read: dead */ + } + return (fds.revents & dead_events) ? false : true; + } + if (rc == 0) + return true; /* no events reported by poll */ + + assert(rc < 0 && "poll returned unexpected value"); + + if (errno == ENOMEM || errno == EAGAIN) + return false; /* kernel is in trouble, abort */ + + assert(0 && "poll usage bug?"); + return false; +} diff --git a/test/mgmt.rb b/test/mgmt.rb index 373cd69..1086f56 100644 --- a/test/mgmt.rb +++ b/test/mgmt.rb @@ -4,6 +4,7 @@ # License: GPLv3 or later (see COPYING for details) require 'test/test_helper' require 'digest/md5' +require 'timeout' class TestMgmt < Test::Unit::TestCase def setup @@ -238,28 +239,42 @@ class TestMgmt < Test::Unit::TestCase # ensure aborted requests do not trigger failure in graceful shutdown def test_concurrent_md5_fsck_abort sparse_file_prepare + nabort = 5 File.open("#@tmpdir/dev666/sparse-file.fid") do |fp| - if fp.respond_to?(:advise) - # clear the cache - fp.advise(:dontneed) - req = "MD5 /dev666/sparse-file.fid fsck\r\n" - starter = get_client - clients = (1..5).map { get_client } - - starter.write(req) - threads = clients.map do |c| - Thread.new(c) do |client| - client.write(req) - client.shutdown - client.close - :ok - end + unless fp.respond_to?(:advise) + skip("IO#advise not supported, skipping test") rescue nil + return + end + # clear the cache + fp.advise(:dontneed) + req = "MD5 /dev666/sparse-file.fid fsck\r\n" + starter = get_client + clients = (1..nabort).map { get_client } + + starter.write(req) + threads = clients.map do |c| + Thread.new(c) do |client| + client.write(req) + client.shutdown + client.close + :ok end - threads.each { |thr| 
assert_equal :ok, thr.value } - line = starter.gets - assert_match(%r{\A/dev666/sparse-file\.fid MD5=[a-f0-9]{32}\r\n}, line) - starter.close end + threads.each { |thr| assert_equal :ok, thr.value } + line = starter.gets + assert_match(%r{\A/dev666/sparse-file\.fid MD5=[a-f0-9]{32}\r\n}, line) + starter.close + + lines = nil + Timeout.timeout(30) do + begin + @err.rewind + lines = @err.readlines.grep(/aborted while waiting for fsck/) + end while lines.empty? && sleep(0.05) + end + assert lines[0], lines.inspect + @err.rewind + @err.truncate(0) end end -- cgit v1.2.3-24-ge0c7