diff options
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | Makefile.am | 52 | ||||
-rw-r--r-- | TODO | 1 | ||||
-rw-r--r-- | accept.c | 10 | ||||
-rw-r--r-- | accept_loop.c | 31 | ||||
-rw-r--r-- | alloc.c | 5 | ||||
-rwxr-xr-x | autogen.sh | 2 | ||||
-rw-r--r-- | bind_listen.c | 23 | ||||
-rw-r--r-- | cfg.c | 22 | ||||
-rw-r--r-- | cfg_parser.rl | 7 | ||||
-rw-r--r-- | chunk_parser.rl | 48 | ||||
-rw-r--r-- | cmogstored.c | 93 | ||||
-rw-r--r-- | cmogstored.h | 136 | ||||
-rw-r--r-- | configure.ac | 9 | ||||
-rw-r--r-- | defaults.h | 4 | ||||
-rw-r--r-- | dev.c | 70 | ||||
-rw-r--r-- | exit.c | 15 | ||||
-rw-r--r-- | fdmap.c | 9 | ||||
-rw-r--r-- | fsck_queue.c | 1 | ||||
-rw-r--r-- | http.c | 142 | ||||
-rw-r--r-- | http_common.rl | 38 | ||||
-rw-r--r-- | http_dav.c | 10 | ||||
-rw-r--r-- | http_get.c | 54 | ||||
-rw-r--r-- | http_parser.rl | 79 | ||||
-rw-r--r-- | http_put.c | 183 | ||||
-rw-r--r-- | inherit.c | 38 | ||||
-rw-r--r-- | ioutil.c | 3 | ||||
-rw-r--r-- | listen_parser.h | 3 | ||||
-rw-r--r-- | listen_parser.rl | 3 | ||||
-rw-r--r-- | listen_parser_common.rl | 14 | ||||
-rw-r--r-- | listen_parser_internal.c | 5 | ||||
-rw-r--r-- | m4/ld_wrap.m4 | 21 | ||||
-rw-r--r-- | m4/systemtap.m4 | 37 | ||||
-rw-r--r-- | mgmt.c | 49 | ||||
-rw-r--r-- | mgmt_fn.c | 5 | ||||
-rw-r--r-- | mgmt_parser.rl | 15 | ||||
-rw-r--r-- | mnt.c | 32 | ||||
-rw-r--r-- | nameinfo.c | 48 | ||||
-rw-r--r-- | notify.c | 6 | ||||
-rw-r--r-- | notify.h | 2 | ||||
-rw-r--r-- | packaddr.h | 54 | ||||
-rw-r--r-- | path_parser.h | 14 | ||||
-rw-r--r-- | path_parser.rl | 21 | ||||
-rw-r--r-- | probes.d | 24 | ||||
-rw-r--r-- | process.c | 6 | ||||
-rw-r--r-- | queue_common.c | 18 | ||||
-rw-r--r-- | queue_epoll.c | 14 | ||||
-rw-r--r-- | queue_kqueue.c | 20 | ||||
-rw-r--r-- | queue_loop.c | 15 | ||||
-rw-r--r-- | sig.c | 23 | ||||
-rw-r--r-- | svc.c | 169 | ||||
-rw-r--r-- | svc_dev.c | 58 | ||||
-rw-r--r-- | tapset/http_request.stp | 40 | ||||
-rw-r--r-- | test/.gitignore | 1 | ||||
-rw-r--r-- | test/chunk-parser-1.c | 61 | ||||
-rw-r--r-- | test/epoll-wrap.c | 53 | ||||
-rw-r--r-- | test/epoll_enospc.rb | 100 | ||||
-rw-r--r-- | test/fdmap-1.c | 4 | ||||
-rw-r--r-- | test/http-parser-1.c | 125 | ||||
-rw-r--r-- | test/http_put6_fail.rb | 86 | ||||
-rw-r--r-- | test/mgmt.rb | 41 | ||||
-rw-r--r-- | test/queue-idle-1.c | 11 | ||||
-rw-r--r-- | test/ruby.mk | 3 | ||||
-rw-r--r-- | test/test_helper.rb | 2 | ||||
-rw-r--r-- | test/thrpool-1.c | 4 | ||||
-rw-r--r-- | thrpool.c | 205 | ||||
-rw-r--r-- | trace.h | 8 | ||||
-rw-r--r-- | trywrite.c | 19 | ||||
-rw-r--r-- | upgrade.c | 12 | ||||
-rw-r--r-- | util.h | 18 | ||||
-rw-r--r-- | valid_path.rl | 3 | ||||
-rw-r--r-- | yield.c | 29 |
72 files changed, 1758 insertions, 830 deletions
@@ -11,6 +11,7 @@ /*.cache /*.in /.version +/.gnulib-version /ChangeLog* /NEWS* /_build @@ -29,6 +30,7 @@ /listen_parser.c /mgmt_parser.c /valid_put_path.c +probes.h /stamp-h1 /tmp /valid_path.c diff --git a/Makefile.am b/Makefile.am index 4c9f19a..b2cbd38 100644 --- a/Makefile.am +++ b/Makefile.am @@ -2,6 +2,7 @@ ACLOCAL_AMFLAGS = -I m4 AM_CPPFLAGS = -I$(top_builddir)/lib -I$(top_srcdir)/lib AM_CFLAGS = $(WARN_CFLAGS) $(GCC_ATOMICS_CFLAGS) $(PTHREAD_CFLAGS) AM_LDFLAGS = $(LIBGNU_LIBDEPS) $(LIB_CLOCK_GETTIME) +DTRACE = @DTRACE@ SUBDIRS = lib # slow.mk is auto-generated by the maintainer (see GNUmakefile) @@ -66,9 +67,12 @@ mog_src += mkpath_for.c mog_src += mnt.c mog_src += mnt.h mog_src += mnt_usable.c +mog_src += nameinfo.c mog_src += nostd/setproctitle.h mog_src += notify.c mog_src += notify.h +mog_src += packaddr.h +mog_src += path_parser.h mog_src += pidfile.c mog_src += process.c mog_src += queue_common.c @@ -84,9 +88,11 @@ mog_src += sig.c mog_src += svc.c mog_src += svc_dev.c mog_src += thrpool.c +mog_src += trace.h mog_src += trywrite.c mog_src += util.h mog_src += upgrade.c +mog_src += yield.c LDADD = $(LIBINTL) $(top_builddir)/lib/libgnu.a libnostd.a noinst_LIBRARIES = libnostd.a @@ -99,18 +105,19 @@ RAGEL = ragel RL_MAIN = cfg_parser.rl iostat_parser.rl listen_parser.rl mgmt_parser.rl \ valid_path.rl http_parser.rl chunk_parser.rl valid_put_path.rl RL_CGEN = $(RL_MAIN:.rl=.c) -RL_ALL = listen_parser_common.rl http_common.rl $(RL_MAIN) +RL_ALL = listen_parser_common.rl http_common.rl path_parser.rl $(RL_MAIN) cfg_parser.c: cfg_parser.rl listen_parser_common.rl listen_parser.c: listen_parser.rl listen_parser_common.rl -http_parser.c: http_parser.rl http_common.rl +http_parser.c: http_parser.rl http_common.rl path_parser.rl +mgmt_parser.c: path_parser.rl chunk_parser.c: chunk_parser.rl http_common.rl .rl.c: $(AM_V_GEN)$(RAGEL) $< -C $(RLFLAGS) -o $@ -BUILT_SOURCES = $(top_srcdir)/.version +BUILT_SOURCES = $(top_srcdir)/.version $(top_srcdir)/.gnulib-version bin_PROGRAMS = cmogstored -cmogstored_SOURCES = $(mog_src) $(RL_CGEN) cmogstored.c +cmogstored_SOURCES = $(mog_src) $(RL_CGEN) cmogstored.c probes.d RUBY = ruby PERL = perl @@ -125,13 +132,14 @@ PERL_LOG_FLAGS = -v SLOWRB_LOG_COMPILER = RUBY="$(RUBY)" AM_SLOWRB_LOG_FLAGS = top_srcdir="$(top_srcdir)" include $(top_srcdir)/test/ruby.mk -check_PROGRAMS = test/valid-path-1 test/trywrite-1 \ +check_tests = test/valid-path-1 test/trywrite-1 \ test/cfg-parser-1 test/fdmap-1 test/thrpool-1 \ test/queue-idle-1 \ test/http-parser-1 test/chunk-parser-1 \ test/ioutil-1 +check_PROGRAMS = $(check_tests) -TESTS = $(SLOW_RB_FILES) $(RB_TESTS_FAST) $(check_PROGRAMS) $(PERL_TESTS) +TESTS = $(SLOW_RB_FILES) $(RB_TESTS_FAST) $(check_tests) $(PERL_TESTS) # we need TMPDIR to work in a place where iostat(1) gives stats test_tmpdir = $(top_builddir)/tmp @@ -142,7 +150,7 @@ $(PERL_TESTS:.perl=.log):: $(bin_PROGRAMS) $(test_tmpdir)/.stamp $(test_tmpdir)/.stamp: @mkdir -p $(test_tmpdir) && > $@ -test_COMMON = $(mog_src) $(RL_CGEN) check.h +test_COMMON = $(mog_src) $(RL_CGEN) check.h probes.d test_valid_path_1_SOURCES = test/valid-path-1.c $(test_COMMON) test_trywrite_1_SOURCES = test/trywrite-1.c $(test_COMMON) @@ -154,6 +162,15 @@ test_http_parser_1_SOURCES = test/http-parser-1.c $(test_COMMON) test_chunk_parser_1_SOURCES = test/chunk-parser-1.c $(test_COMMON) test_ioutil_1_SOURCES = test/ioutil-1.c $(test_COMMON) +if HAVE_LD_WRAP +if HAVE_EPOLL +check_PROGRAMS += test/epoll-wrap +test_epoll_wrap_SOURCES = $(cmogstored_SOURCES) test/epoll-wrap.c +test_epoll_wrap_LDFLAGS = $(cmogstored_LDFLAGS) $(AM_LDFLAGS) \ + -Wl,--wrap=epoll_ctl -Wl,--wrap=epoll_create +endif # HAVE_EPOLL +endif # HAVE_LD_WRAP + HELP2MAN = help2man dist_man_MANS = cmogstored.1 @@ -184,6 +201,8 @@ publish: NEWS.atom.xml NEWS ChangeLog .PHONY: publish +tapsets = tapset/http_request.stp + EXTRA_DIST = $(RB_TESTS) $(RL_CGEN) $(RL_ALL) $(PERL_TESTS) $(extra_doc) \ .gitignore Rakefile autogen.sh GNUmakefile bsd/README doc m4 \ test/test_helper.rb test/iostat-mock.rb \ @@ -191,8 +210,9 @@ EXTRA_DIST = $(RB_TESTS) $(RL_CGEN) $(RL_ALL) $(PERL_TESTS) $(extra_doc) \ cmogstored.x .ctags $(SLOW_RB_FILES) \ build-aux/.gitignore build-aux/snippet/.gitignore \ tests/.gitignore \ - $(top_srcdir)/.version \ - test/valgrind.supp nostd/README + $(top_srcdir)/.version $(top_srcdir)/.gnulib-version \ + test/valgrind.supp nostd/README \ + $(tapsets) TESTS_ENVIRONMENT = PATH=$(top_builddir):$$PATH TMPDIR=$(test_tmpdir) @@ -212,5 +232,19 @@ include $(top_srcdir)/build-aux/pgo.mk $(top_srcdir)/.version: echo $(VERSION) > $@-t && mv $@-t $@ +$(top_srcdir)/.gnulib-version: + cd $(top_srcdir) && ./autogen.sh dist-hook: echo $(VERSION) > $(distdir)/.tarball-version + +if ENABLE_SYSTEMTAP +probes.h: probes.d + $(DTRACE) -C -h -s $< -o $@ + +probes.o: probes.d + $(DTRACE) -C -G -s $< -o $@ + +EXTRA_DIST += probes.h +BUILT_SOURCES += probes.h +LDADD += probes.o +endif @@ -4,3 +4,4 @@ * reduce/minimize memory/stack usage * optional fsync/fdatasync/O_SYNC/msync for PUT * fallocate support? slow emulation interfaces can be a problem... +* inotify (and kqueue) support for detecting new device directories @@ -4,16 +4,16 @@ */ #include "cmogstored.h" -struct mog_accept * -mog_accept_init(int fd, struct mog_svc *svc, post_accept_fn fn) +struct mog_fd * +mog_accept_init(int fd, struct mog_svc *svc, + struct mog_addrinfo *a, mog_post_accept_fn fn) { - struct mog_fd *mfd = mog_fd_get(fd); + struct mog_fd *mfd = mog_fd_init(fd, MOG_FD_TYPE_ACCEPT); struct mog_accept *ac = &mfd->as.accept; - mfd->fd = fd; ac->post_accept_fn = fn; ac->svc = svc; memset(&ac->thrpool, 0, sizeof(struct mog_thrpool)); - return ac; + return mfd; } diff --git a/accept_loop.c b/accept_loop.c index d5eb840..874e6d7 100644 --- a/accept_loop.c +++ b/accept_loop.c @@ -40,8 +40,10 @@ MOG_NOINLINE static void accept_error_check(struct mog_accept *ac) switch (errno) { case ECONNABORTED: + /* common error, nothing we can do about it */ case EINTR: - return; /* common errors, nothing we can do about it */ + /* we'll hit mog_thr_test_quit when we restart the loop */ + return; case EBADF: assert(0 && "BUG, called accept on bad FD"); case ENOTSOCK: @@ -74,11 +76,6 @@ MOG_NOINLINE static void accept_error_check(struct mog_accept *ac) } } -static void accept_loop_cleanup(void *ignored) -{ - mog_alloc_quit(); -} - /* * passed as the start_routine argument to pthread_create. * This function may run concurrently in multiple threads. @@ -90,23 +87,21 @@ void *mog_accept_loop(void *arg) { struct mog_accept *ac = arg; int accept_fd = mog_fd_of(ac)->fd; - - pthread_cleanup_push(accept_loop_cleanup, NULL); + union mog_sockaddr msa; for (;;) { - /* pthread cancellation point */ - int client_fd = mog_accept_fn(accept_fd, NULL, NULL); + struct sockaddr *sa = mog_sockaddr_sa(&msa); + socklen_t salen = (socklen_t)sizeof(msa); + int client_fd; + + mog_thr_test_quit(); + client_fd = mog_accept_fn(accept_fd, sa, &salen); - if (client_fd >= 0) { - mog_cancel_disable(); - ac->post_accept_fn(client_fd, ac->svc); - mog_cancel_enable(); - } else { + if (client_fd >= 0) + ac->post_accept_fn(client_fd, ac->svc, &msa, salen); + else accept_error_check(ac); - } } - pthread_cleanup_pop(1); - return NULL; } @@ -58,6 +58,11 @@ _Noreturn void mog_oom(void) abort(); } +void mog_oom_if_null(const void *ptr) +{ + if (!ptr) + mog_oom(); +} /* * Cache alignment is important for sub-pagesized allocations @@ -1,7 +1,9 @@ #!/bin/sh >> test/slow.mk +glver="$(gnulib-tool --version | head -n1)" if gnulib-tool --update && autoreconf -i then + echo "$glver" > .gnulib-version exit 0 fi cat HACKING diff --git a/bind_listen.c b/bind_listen.c index 104f6ee..e1f0168 100644 --- a/bind_listen.c +++ b/bind_listen.c @@ -4,18 +4,6 @@ */ #include "cmogstored.h" -/* Under FreeBSD, TCP_NOPUSH is inherited by accepted sockets */ -static int tcp_nopush_prepare(int fd) -{ - socklen_t len = (socklen_t)sizeof(int); - int val = 1; - - if (MOG_TCP_NOPUSH == 0) - return 0; - - return setsockopt(fd, IPPROTO_TCP, MOG_TCP_NOPUSH, &val, len); -} - /* * TODO * - configurable socket buffer sizes (where to put config?) @@ -27,7 +15,7 @@ static int tcp_nopush_prepare(int fd) * http://labs.apnic.net/blabs/?p=57 */ -static int set_tcp_opts(int fd, const char *accept_filter) +static int set_tcp_opts(int fd) { int val; socklen_t len = sizeof(int); @@ -45,15 +33,10 @@ static int set_tcp_opts(int fd, const char *accept_filter) rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, len); if (rc < 0) return rc; - if (accept_filter) { - if (strcmp(accept_filter, "httpready") == 0) - rc = tcp_nopush_prepare(fd); - } - return rc; } -int mog_bind_listen(struct addrinfo *r, const char *accept_filter) +int mog_bind_listen(struct addrinfo *r) { /* see if we inherited the socket, first */ int fd = mog_inherit_get(r->ai_addr, r->ai_addrlen); @@ -73,7 +56,7 @@ int mog_bind_listen(struct addrinfo *r, const char *accept_filter) * everywhere yet (in 2012). */ if (mog_set_cloexec(fd, true) == 0 && - set_tcp_opts(fd, accept_filter) == 0 && + set_tcp_opts(fd) == 0 && bind(fd, r->ai_addr, r->ai_addrlen) == 0 && listen(fd, 1024) == 0) break; @@ -53,8 +53,7 @@ static void cfg_atexit(void) __attribute__((constructor)) static void cfg_init(void) { all_cfg = hash_initialize(7, NULL, cfg_hash, cfg_cmp, cfg_free); - if (!all_cfg) - mog_oom(); + mog_oom_if_null(all_cfg); atexit(cfg_atexit); } @@ -181,15 +180,17 @@ void mog_cfg_validate_or_die(struct mog_cfg *cli) mog_set_maxconns(cli->maxconns); } -static int bind_or_die(struct mog_addrinfo *a, const char *accept_filter) +static struct mog_fd * +bind_or_die(struct mog_addrinfo *a, struct mog_svc *svc, mog_post_accept_fn fn) { int fd; - if (a == NULL) return -1; - fd = mog_bind_listen(a->addr, accept_filter); - if (fd >= 0) return fd; + if (a == NULL) return NULL; + fd = mog_bind_listen(a->addr); + if (fd < 0) + die_errno("addr=%s failed to bind+listen", a->orig); - die_errno("addr=%s failed to bind+listen", a->orig); + return mog_accept_init(fd, svc, a, fn); } static bool svc_from_cfg(void *cfg_ptr, void *ignored) @@ -202,13 +203,14 @@ static bool svc_from_cfg(void *cfg_ptr, void *ignored) if (!svc) die("failed to load svc from docroot=%s", cfg->docroot); - svc->mgmt_fd = bind_or_die(cfg->mgmtlisten, "dataready"); + svc->mgmt_mfd = bind_or_die(cfg->mgmtlisten, svc, mog_mgmt_post_accept); if (cfg->server && strcmp(cfg->server, "none") == 0) return true; - svc->http_fd = bind_or_die(cfg->httplisten, "httpready"); - svc->httpget_fd = bind_or_die(cfg->httpgetlisten, "httpready"); + svc->http_mfd = bind_or_die(cfg->httplisten, svc, mog_http_post_accept); + svc->httpget_mfd = bind_or_die(cfg->httpgetlisten, svc, + mog_httpget_post_accept); return true; } diff --git a/cfg_parser.rl b/cfg_parser.rl index f14e495..9394ea7 100644 --- a/cfg_parser.rl +++ b/cfg_parser.rl @@ -36,20 +36,20 @@ static char *mystrdup(const char *key, char *mark_beg, const char *p) mgmtlisten = lws* "mgmtlisten" sep listen comment* (eor) > { a = mog_listen_parse_internal(mark_beg, mark_len, - port_beg, port_len); + port_beg, port_len, sa_family); if (!a) return -1; cfg->mgmtlisten = a; }; httplisten = lws* "httplisten" sep listen comment* eor > { a = mog_listen_parse_internal(mark_beg, mark_len, - port_beg, port_len); + port_beg, port_len, sa_family); if (!a) return -1; cfg->httplisten = a; }; httpgetlisten = lws* "httpgetlisten" sep listen comment* eor > { a = mog_listen_parse_internal(mark_beg, mark_len, - port_beg, port_len); + port_beg, port_len, sa_family); if (!a) return -1; cfg->httpgetlisten = a; }; @@ -111,6 +111,7 @@ int mog_cfg_parse(struct mog_cfg *cfg, char *buf, size_t len) char *port_beg = NULL; size_t mark_len = 0; size_t port_len = 0; + sa_family_t sa_family = AF_INET; struct mog_addrinfo *a; int cs; diff --git a/chunk_parser.rl b/chunk_parser.rl index 9234f71..e339a87 100644 --- a/chunk_parser.rl +++ b/chunk_parser.rl @@ -26,69 +26,69 @@ static inline off_t hexchar2off(int xdigit) off_t buf_remain; size_t wr_len; - if (http->content_len == 0) { /* final chunk */ - http->chunk_state = MOG_CHUNK_STATE_TRAILER; + if (http->_p.content_len == 0) { /* final chunk */ + http->_p.chunk_state = MOG_CHUNK_STATE_TRAILER; fhold; /* XXX this feels wrong ... */ if (fpc >= buf) { assert(fc == '\n' && "bad chunk end"); - http->line_end = to_u16(fpc - buf); + http->_p.line_end = to_u16(fpc - buf); } fgoto more_trailers; } - assert(http->content_len > 0 && "impossible content_len"); + assert(http->_p.content_len > 0 && "impossible content_len"); buf_remain = len - (fpc - buf); if (buf_remain == 0) fbreak; assert(buf_remain > 0 && "impossible buf_remain"); - wr_len = MIN((size_t)http->content_len, (size_t)buf_remain); + wr_len = MIN((size_t)http->_p.content_len, (size_t)buf_remain); assert(wr_len != 0 && "invalid wr_len"); if (! mog_http_write_full(http->forward, fpc, wr_len)) fbreak; - http->content_len -= wr_len; + http->_p.content_len -= wr_len; p += wr_len - 1; assert(p < pe && "buffer overrun"); - if (http->content_len > 0) { + if (http->_p.content_len > 0) { really_done = 1; /* let caller handle reading the rest of the body */ fbreak; } /* next chunk header */ - http->chunk_state = MOG_CHUNK_STATE_SIZE; + http->_p.chunk_state = MOG_CHUNK_STATE_SIZE; if (wr_len == buf_remain) { - if (http->content_len == 0) + if (http->_p.content_len == 0) fgoto main; really_done = 1; fbreak; } /* more chunks in this buffer */ - assert(http->content_len == 0 && + assert(http->_p.content_len == 0 && "bad content_len at chunk end"); fgoto main; }; chunk = "\r\n"? # account for trailing CRLF in previous chunk (xdigit+) $ { - off_t prev = http->content_len; + off_t prev = http->_p.content_len; - http->content_len *= 16; - http->content_len += hexchar2off(fc); - if (http->content_len < prev) { + http->_p.content_len *= 16; + http->_p.content_len += hexchar2off(fc); + if (http->_p.content_len < prev) { errno = ERANGE; - http->content_len = -1; + http->_p.content_len = -1; fbreak; } } (any -- [\r\n])* - '\r' '\n' @ { http->chunk_state = MOG_CHUNK_STATE_DATA; } + '\r' '\n' @ { http->_p.chunk_state = MOG_CHUNK_STATE_DATA; } chunk_data; main := chunk+; }%% @@ -100,12 +100,12 @@ void mog_chunk_init(struct mog_http *http) int cs; %% write init; - assert(http->chunked && "not chunked"); + assert(http->_p.chunked && "not chunked"); http->cs = cs; - http->line_end = 0; - http->content_len = 0; - http->offset = 0; - http->chunk_state = MOG_CHUNK_STATE_SIZE; + http->_p.line_end = 0; + http->_p.content_len = 0; + http->_p.buf_off = 0; + http->_p.chunk_state = MOG_CHUNK_STATE_SIZE; } enum mog_parser_state @@ -114,7 +114,7 @@ mog_chunk_parse(struct mog_http *http, char *buf, size_t len) char *p, *pe, *eof = NULL; int cs = http->cs; int really_done = 0; - size_t off = http->offset; + size_t off = http->_p.buf_off; assert(http->wbuf == NULL && "unwritten data in buffer"); assert(off <= len && "http offset past end of buffer"); @@ -132,13 +132,13 @@ mog_chunk_parse(struct mog_http *http, char *buf, size_t len) cs = chunk_parser_first_final; http->cs = cs; - http->offset = p - buf; + http->_p.buf_off = p - buf; if (cs == chunk_parser_error || errno) return MOG_PARSER_ERROR; assert(p <= pe && "buffer overflow after chunk parse"); - assert(http->offset <= len && "offset longer than len"); + assert(http->_p.buf_off <= len && "offset longer than len"); if (http->cs == chunk_parser_first_final) return MOG_PARSER_DONE; return MOG_PARSER_CONTINUE; diff --git a/cmogstored.c b/cmogstored.c index 35b08f9..31d8e64 100644 --- a/cmogstored.c +++ b/cmogstored.c @@ -14,13 +14,12 @@ static struct mog_fd *master_selfwake; static sig_atomic_t sigchld_hit; static sig_atomic_t do_exit; static sig_atomic_t do_upgrade; -static size_t nthr; -static bool have_mgmt; static pid_t master_pid; static pid_t upgrade_pid; -static unsigned long worker_processes; static bool iostat_running; +static struct mog_main mog_main; + #define CFG_KEY(f) -((int)offsetof(struct mog_cfg,f) + 1) static struct argp_option options[] = { { .name = "daemonize", .key = 'd', @@ -125,8 +124,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) break; case -'M': mog_cfg_multi = true; break; case -'W': - check_strtoul(&worker_processes, arg, "worker-processes"); - if (worker_processes > UINT_MAX) + check_strtoul(&mog_main.worker_processes, arg, + "worker-processes"); + if (mog_main.worker_processes > UINT_MAX) die("--worker-processes exceeded (max=%u)", UINT_MAX); break; case ARGP_KEY_ARG: @@ -261,55 +261,8 @@ MOG_NOINLINE static void setup(int argc, char *argv[]) master_pid = getpid(); - /* 10 - 100 threads based on number of devices, same as mogstored */ - nthr = mog_mkusage_all() * 10; - nthr = MAX(10, nthr); - nthr = MIN(100, nthr); -} - -/* Hash iterator function */ -static bool svc_start_each(void *svcptr, void *qptr) -{ - struct mog_svc *svc = svcptr; - struct mog_queue *q = qptr; - struct mog_accept *ac; - size_t athr = (size_t)num_processors(NPROC_CURRENT); - - /* - * try to distribute accept() callers between workers more evenly - * with wake-one accept() behavior by trimming down on acceptors - */ - if (worker_processes) { - athr /= worker_processes; - if (athr == 0) - athr = 1; - } - - svc->queue = q; - - if (svc->mgmt_fd >= 0) { - have_mgmt = true; - ac = mog_accept_init(svc->mgmt_fd, svc, mog_mgmt_post_accept); - - /* - * mgmt port is rarely used and always persistent, so it - * does not need multiple threads for blocking accept() - */ - mog_thrpool_start(&ac->thrpool, 1, mog_accept_loop, ac); - } - - if (svc->http_fd >= 0) { - ac = mog_accept_init(svc->http_fd, svc, mog_http_post_accept); - mog_thrpool_start(&ac->thrpool, athr, mog_accept_loop, ac); - } - - if (svc->httpget_fd >= 0) { - ac = mog_accept_init(svc->httpget_fd, svc, - mog_httpget_post_accept); - mog_thrpool_start(&ac->thrpool, athr, mog_accept_loop, ac); - } - - return true; + /* set svc->nmogdev on all svc */ + mog_mkusage_all(); } static void worker_wakeup_handler(int signum) @@ -418,18 +371,17 @@ static void upgrade_handler(void) static void main_worker_loop(const pid_t parent) { - mog_cancel_disable(); /* mog_idleq_wait() now relies on this */ while (parent == 0 || parent == getppid()) { - mog_notify_wait(have_mgmt); + mog_notify_wait(mog_main.have_mgmt); if (sigchld_hit) sigchld_handler(); if (do_upgrade) upgrade_handler(); if (do_exit) cmogstored_exit(); - if (have_mgmt) + if (mog_main.have_mgmt) mog_mnt_refresh(); - else if (have_mgmt && !iostat_running && !do_exit) + else if (mog_main.have_mgmt && !iostat_running && !do_exit) /* * maybe iostat was not installed/available/usable at * startup, but became usable later @@ -443,14 +395,13 @@ static void main_worker_loop(const pid_t parent) static void run_worker(const pid_t parent) { - struct mog_queue *q = mog_queue_new(); - mog_notify_init(); siginit(worker_wakeup_handler); - mog_thrpool_start(&q->thrpool, nthr, mog_queue_loop, q); - have_mgmt = false; - mog_svc_each(svc_start_each, q); /* this will set have_mgmt */ - if (have_mgmt) { + + /* this can set mog_main->have_mgmt */ + mog_svc_each(mog_svc_start_each, &mog_main); + + if (mog_main.have_mgmt) { iostat_running = mog_iostat_respawn(0); if (!iostat_running) syslog(LOG_WARNING, "iostat(1) not available/running"); @@ -467,8 +418,6 @@ static void fork_worker(unsigned worker_id) if (pid > 0) { mog_process_register(pid, worker_id); } else if (pid == 0) { - /* workers have no workers of their own */ - worker_processes = 0; mog_process_reset(); /* worker will call mog_intr_enable() later in notify loop */ @@ -512,7 +461,7 @@ static void process_died(pid_t pid, int status) switch (id) { case MOG_PROC_IOSTAT: - assert(worker_processes == 0 && + assert(mog_main.worker_processes == 0 && "master process registered iostat process"); iostat_died(pid, status); return; @@ -536,12 +485,12 @@ static void process_died(pid_t pid, int status) static void run_master(void) { unsigned id; - size_t running = worker_processes; + size_t running = mog_main.worker_processes; master_selfwake = mog_selfwake_new(); siginit(master_wakeup_handler); - for (id = 0; id < worker_processes; id++) + for (id = 0; id < mog_main.worker_processes; id++) fork_worker(id); while (running > 0) { @@ -565,8 +514,8 @@ int main(int argc, char *argv[], char *envp[]) mog_intr_disable(); setup(argc, argv); /* this daemonizes */ - mog_process_init(worker_processes); - if (worker_processes == 0) + mog_process_init(mog_main.worker_processes); + if (mog_main.worker_processes == 0) run_worker(0); else run_master(); @@ -583,6 +532,6 @@ void cmogstored_quit(void) "SIGQUIT failed on master process (pid=%d): %m", master_pid); } else { - worker_wakeup_handler(SIGQUIT); + CHECK(int, 0, kill(getpid(), SIGQUIT)); } } diff --git a/cmogstored.h b/cmogstored.h index e72c071..1134827 100644 --- a/cmogstored.h +++ b/cmogstored.h @@ -70,6 +70,7 @@ #include "defaults.h" #include "iostat.h" #include "mnt.h" +#include "packaddr.h" #define MOG_WR_ERROR ((void *)-1) #define MOG_IOSTAT (MAP_FAILED) @@ -122,7 +123,8 @@ struct mog_mgmt { int cs; enum mog_prio prio; struct mog_fd *forward; - size_t offset; + uint32_t buf_off; + uint32_t mog_devid; size_t mark[2]; struct mog_rbuf *rbuf; struct mog_wbuf *wbuf; /* uncommonly needed */ @@ -133,21 +135,29 @@ struct mog_mgmt { }; struct mog_queue; +struct mog_svc; struct mog_svc { int docroot_fd; const char *docroot; + size_t nmogdev; + size_t user_set_aio_threads; /* only touched by main/notify thread */ + size_t user_req_aio_threads; /* protected by aio_threads_lock */ + size_t thr_per_dev; /* private */ DIR *dir; + pthread_mutex_t by_mog_devid_lock; + Hash_table *by_mog_devid; Hash_table *by_st_dev; pthread_mutex_t devstats_lock; struct mog_queue *queue; LIST_HEAD(mgmt_head, mog_mgmt) devstats_subscribers; + SIMPLEQ_ENTRY(mog_svc) qentry; mode_t put_perms; mode_t mkcol_perms; - int http_fd; - int httpget_fd; - int mgmt_fd; + struct mog_fd *http_mfd; + struct mog_fd *httpget_mfd; + struct mog_fd *mgmt_mfd; uint32_t idle_timeout; }; @@ -169,35 +179,44 @@ enum mog_chunk_state { struct mog_http { int cs; - enum mog_http_method http_method:8; - unsigned persistent:1; - unsigned chunked:1; - unsigned has_trailer_md5:1; - unsigned has_expect_md5:1; - unsigned has_content_range:1; /* for PUT */ - unsigned has_range:1; /* for GET */ - unsigned skip_rbuf_defer:1; - enum mog_chunk_state chunk_state:2; - uint8_t path_tip; - uint8_t path_end; - uint16_t line_end; - uint16_t tmp_tip; + struct { + /* only needs 4 bits, but we use 8 for alignment */ + enum mog_http_method http_method:8; + unsigned persistent:1; + unsigned chunked:1; + unsigned has_md5:1; + unsigned has_content_range:1; /* for PUT */ + unsigned has_range:1; /* for GET */ + unsigned skip_rbuf_defer:1; + enum mog_chunk_state chunk_state:2; + uint8_t path_tip; + uint8_t path_end; + uint16_t line_end; + uint16_t tmp_tip; + uint32_t buf_off; + uint32_t mog_devid; + off_t range_beg; + off_t range_end; + off_t content_len; + } _p; struct mog_fd *forward; - size_t offset; - off_t range_beg; - off_t range_end; - off_t content_len; struct mog_rbuf *rbuf; struct mog_wbuf *wbuf; /* uncommonly needed */ struct mog_svc *svc; uint8_t expect_md5[16]; + struct mog_packaddr mpa; +} __attribute__((packed)); + +struct mog_thread { + pthread_t thr; + unsigned *do_quit; }; struct mog_thrpool { pthread_mutex_t lock; size_t n_threads; size_t want_threads; - pthread_t *threads; + struct mog_thread *threads; void *(*start_fn)(void *); void *start_arg; }; @@ -215,13 +234,16 @@ struct mog_queue { }; /* accept.c */ -typedef void (*post_accept_fn)(int fd, struct mog_svc *); +typedef void (*mog_post_accept_fn)(int fd, struct mog_svc *, + union mog_sockaddr *, socklen_t); struct mog_accept { struct mog_svc *svc; - post_accept_fn post_accept_fn; + mog_post_accept_fn post_accept_fn; + struct mog_addrinfo *addrinfo; /* shared with cfg */ struct mog_thrpool thrpool; }; -struct mog_accept * mog_accept_init(int fd, struct mog_svc *, post_accept_fn); +struct mog_fd *mog_accept_init(int fd, struct mog_svc *, + struct mog_addrinfo *, mog_post_accept_fn); void * mog_accept_loop(void *ac); struct mog_digest { @@ -244,6 +266,7 @@ struct mog_file { #include "notify.h" /* sig.c */ +extern sigset_t mog_emptyset; void mog_intr_disable(void); void mog_intr_enable(void); void mog_sleep(long seconds); @@ -280,7 +303,6 @@ struct mog_fd { struct mog_svc *svc; } as; }; -struct mog_fd *mog_fd_get(int fd); void mog_fd_put(struct mog_fd *mfd); void mog_fdmap_requeue(struct mog_queue *quit_queue); size_t mog_fdmap_expire(uint32_t sec); @@ -299,6 +321,7 @@ void mog_rbuf_free(struct mog_rbuf *); void mog_rbuf_free_and_null(struct mog_rbuf **); void *mog_fsbuf_get(size_t *size); void mog_alloc_quit(void); +void mog_oom_if_null(const void *); #define die_errno(...) do { \ error(EXIT_FAILURE, errno, __VA_ARGS__); \ @@ -320,10 +343,16 @@ struct mog_svc *mog_svc_new(const char *docroot); typedef int (*mog_scandev_cb)(const struct mog_dev *, struct mog_svc *); size_t mog_svc_each(Hash_processor processor, void *data); void mog_svc_upgrade_prepare(void); +bool mog_svc_start_each(void *svc_ptr, void *have_mgmt_ptr); +void mog_svc_thrpool_rescale(struct mog_svc *, size_t ndev_new); +void mog_svc_aio_threads_enqueue(struct mog_svc *, size_t nr); +void mog_svc_aio_threads_handler(void); /* dev.c */ -struct mog_dev * mog_dev_new(struct mog_svc *, uint32_t mog_devid); +struct mog_dev * mog_dev_for(struct mog_svc *, uint32_t mog_devid); int mog_dev_mkusage(const struct mog_dev *, struct mog_svc *); +size_t mog_dev_hash(const void *, size_t tablesize); +bool mog_dev_cmp(const void *a, const void *b); /* valid_path.rl */ int mog_valid_path(const char *buf, size_t len); @@ -345,7 +374,7 @@ void mog_pidfile_upgrade_abort(void); bool mog_svc_devstats_broadcast(void *svc, void *ignored); void mog_svc_devstats_subscribe(struct mog_mgmt *); void mog_svc_dev_shutdown(void); -size_t mog_mkusage_all(void); +void mog_mkusage_all(void); /* cloexec_detect.c */ extern bool mog_cloexec_atomic; @@ -375,17 +404,20 @@ char *mog_canonpath(const char *path, enum canonicalize_mode_t canon_mode); char *mog_canonpath_die(const char *path, enum canonicalize_mode_t canon_mode); /* thrpool.c */ +void mog_thr_test_quit(void); void mog_thrpool_start(struct mog_thrpool *, size_t n, void *(*start_fn)(void *), void *arg); void mog_thrpool_quit(struct mog_thrpool *, struct mog_queue *); -void mog_thrpool_set_n_threads(struct mog_queue *q, size_t size); void mog_thrpool_process_queue(void); +void mog_thrpool_set_size(struct mog_thrpool *, size_t size); /* mgmt.c */ void mog_mgmt_writev(struct mog_mgmt *, struct iovec *, int iovcnt); -void mog_mgmt_post_accept(int fd, struct mog_svc *); +void mog_mgmt_post_accept(int fd, struct mog_svc *, + union mog_sockaddr *, socklen_t); enum mog_next mog_mgmt_queue_step(struct mog_fd *) MOG_CHECK; void mog_mgmt_quit_step(struct mog_fd *); +void mog_mgmt_drop(struct mog_fd *); /* queue_epoll.c */ struct mog_queue * mog_queue_new(void); @@ -404,7 +436,7 @@ struct mog_addrinfo { void mog_addrinfo_free(struct mog_addrinfo **); /* bind_listen.c */ -int mog_bind_listen(struct addrinfo *, const char *accept_filter); +int mog_bind_listen(struct addrinfo *); /* close.c */ void mog_close(int fd); @@ -433,23 +465,27 @@ void mog_http_init(struct mog_http *, struct mog_svc *); enum mog_parser_state mog_http_parse(struct mog_http *, char *buf, size_t len); /* http_get.c */ -void mog_http_get_open(struct mog_http *, char *buf); +void mog_http_get_open(struct mog_fd *, char *buf); enum mog_next mog_http_get_in_progress(struct mog_fd *); /* http.c */ -void mog_http_post_accept(int fd, struct mog_svc *); -void mog_httpget_post_accept(int fd, struct mog_svc *); +void mog_http_post_accept(int fd, struct mog_svc *, + union mog_sockaddr *, socklen_t); +void mog_httpget_post_accept(int fd, struct mog_svc *, + union mog_sockaddr *, socklen_t); enum mog_next mog_http_queue_step(struct mog_fd *) MOG_CHECK; void mog_http_quit_step(struct mog_fd *); char *mog_http_path(struct mog_http *, char *buf); void mog_http_reset(struct mog_http *); +void mog_http_unlink_ftmp(struct mog_http *); +void mog_http_drop(struct mog_fd *); /* http_dav.c */ -void mog_http_delete(struct mog_http *http, char *buf); -void mog_http_mkcol(struct mog_http *http, char *buf); +void mog_http_delete(struct mog_fd *, char *buf); +void mog_http_mkcol(struct mog_fd *, char *buf); /* http_put.c */ -void mog_http_put(struct mog_http *http, char *buf, size_t buf_len); +void mog_http_put(struct mog_fd *, char *buf, size_t buf_len); enum mog_next mog_http_put_in_progress(struct mog_fd *); bool mog_http_write_full(struct mog_fd *file_mfd, char *buf, size_t buf_len); @@ -474,6 +510,7 @@ int mog_mkpath_for(struct mog_svc *svc, char *path); /* queue_common.c */ struct mog_queue *mog_queue_init(int queue_fd); void mog_queue_stop(struct mog_queue *keep); +void mog_queue_drop(struct mog_fd *); /* fsck_queue.c */ bool mog_fsck_queue_ready(struct mog_fd *mfd) MOG_CHECK; @@ -511,6 +548,12 @@ void mog_iou_active(dev_t); # define MOG_TCP_NOPUSH (0) #endif +/* publically visible attributes of the current process */ +struct mog_main { + unsigned long worker_processes; + bool have_mgmt; +}; + /* cmogstored.c */ void cmogstored_quit(void); @@ -543,19 +586,20 @@ pid_t mog_upgrade_spawn(void); /* exit.c */ _Noreturn void cmogstored_exit(void); +verify(sizeof(in_port_t) <= sizeof(uint16_t)); /* * We only deal with ipv4 and ipv6 addresses (and no human-friendly * hostnames/service names), so we can use smaller constants than the * standard NI_MAXHOST/NI_MAXSERV values (1025 and 32 respectively). * This reduces our per-thread stack usage and keeps caches hotter. */ -#define MOG_NI_MAXHOST (INET6_ADDRSTRLEN) -#define MOG_NI_MAXSERV (sizeof(":65536")) - -/* avoid sockaddr_storage since that bigger than we need */ -union mog_sockaddr { - struct sockaddr_in in; - struct sockaddr_in6 in6; - struct sockaddr sa; - unsigned char bytes[1]; +struct mog_ni { + char ni_host[INET6_ADDRSTRLEN + sizeof("[]") - 1]; + char ni_serv[sizeof(":65536")]; }; + +/* nameinfo.c */ +void mog_nameinfo(struct mog_packaddr *, struct mog_ni *); + +/* yield.c */ +void mog_yield(void); diff --git a/configure.ac b/configure.ac index adb37f3..7194216 100644 --- a/configure.ac +++ b/configure.ac @@ -36,6 +36,9 @@ dnl gnulib *at functions aren't thread-safe, ask for the real thing AC_CHECK_FUNCS([openat renameat mkdirat fstatat unlinkat]) AC_CHECK_FUNCS([epoll_wait epoll_pwait ppoll]) +AC_CHECK_FUNCS([epoll_ctl], [HAVE_EPOLL=1], [HAVE_EPOLL=0]) +AC_SUBST(HAVE_EPOLL) +AM_CONDITIONAL(HAVE_EPOLL, test "x$HAVE_EPOLL" = "x1") dnl libkqueue should work in the future AC_CHECK_FUNCS([kqueue]) @@ -45,6 +48,9 @@ AC_CHECK_FUNCS([sendfile]) AC_CHECK_FUNCS([open_memstream]) AC_CHECK_FUNCS([posix_fadvise]) +dnl non-standard, but common +AC_CHECK_FUNCS([pthread_yield]) + dnl gnulib doesn't actually define SOCK_NONBLOCK/SOCK_CLOEXEC, and dnl even if it did, emulation wouldn't be thread-safe AC_CHECK_FUNCS([accept4]) @@ -61,5 +67,8 @@ case $build_os in esac ]) +CM_SYSTEMTAP +CM_LD_WRAP + AC_CONFIG_FILES([Makefile lib/Makefile]) AC_OUTPUT @@ -3,3 +3,7 @@ #define MOG_DEFAULT_MGMTLISTEN "0.0.0.0:7501" #define MOG_DEFAULT_DOCROOT "/var/mogdata" #define MOG_DEFAULT_CONFIGFILE "/etc/mogilefs/mogstored.conf" +#define MOG_DEVID_MAX (16777215) /* MEDIUMINT in DB */ + +/* TODO: update if MogileFS supports FIDs >= 10,000,000,000 */ +#define MOG_PATH_MAX (sizeof("/dev16777215/0/000/000/0123456789.fid")) @@ -4,7 +4,7 @@ */ #include "cmogstored.h" -struct mog_dev * mog_dev_new(struct mog_svc *svc, uint32_t mog_devid) +static struct mog_dev *mog_dev_new(struct mog_svc *svc, uint32_t mog_devid) { struct mog_dev *dev; struct stat sb; @@ -29,6 +29,64 @@ struct mog_dev * mog_dev_new(struct mog_svc *svc, uint32_t mog_devid) return dev; } +struct mog_dev *mog_dev_for(struct mog_svc *svc, uint32_t mog_devid) +{ + struct mog_dev finder; + struct mog_dev *ret; + + finder.devid = mog_devid; + + CHECK(int, 0, pthread_mutex_lock(&svc->by_mog_devid_lock)); + ret = hash_lookup(svc->by_mog_devid, &finder); + if (ret) { + struct stat sb; + + /* + * devXXX dir existed before, but is no longer readable + * Possible FS/device error, it could come back, so do + * not remove here. + */ + if (mog_stat(svc, ret->prefix, &sb) < 0) + goto out; + + /* st_dev may change due to remount, update if needed */ + ret->st_dev = sb.st_dev; + } else { /* create a new dev */ + ret = mog_dev_new(svc, mog_devid); + + if (!ret) + goto out; /* could not stat */ + + switch (hash_insert_if_absent(svc->by_mog_devid, ret, NULL)) { + case 0: + assert(0 && "mog_dev existed while adding"); + abort(); + case 1: break; /* OK, inserted */ + default: mog_oom(); + } + } +out: + CHECK(int, 0, pthread_mutex_unlock(&svc->by_mog_devid_lock)); + + return ret; +} + + +size_t mog_dev_hash(const void *x, size_t tablesize) +{ + const struct mog_dev *dev = x; + + return dev->devid % tablesize; +} + +bool mog_dev_cmp(const void *a, const void *b) +{ + const struct mog_dev *dev_a = a; + const struct mog_dev *dev_b = b; + + return dev_a->devid == dev_b->devid; +} + static int emit_usage( const struct mog_dev *dev, struct mog_svc *svc, int fd, struct statvfs *v) @@ -84,10 +142,16 @@ const struct mog_dev *dev, struct mog_svc *svc, int fd, struct statvfs *v) int mog_dev_mkusage(const struct mog_dev *dev, struct mog_svc *svc) { struct statvfs v; - char *usage_path = xasprintf("%s/usage", dev->prefix); - char *tmp_path = xasprintf("%s.%x", usage_path, (unsigned)getpid()); + char *usage_path; + char *tmp_path; int fd = -1; + if (!svc->mgmt_mfd) + return 0; + + usage_path = xasprintf("%s/usage", dev->prefix); + tmp_path = xasprintf("%s.%x", usage_path, (unsigned)getpid()); + if (mog_unlink(svc, tmp_path) < 0 && errno != ENOENT) goto out; errno = 0; @@ -5,17 +5,16 @@ #include "cmogstored.h" #include "nostd/setproctitle.h" -static void acceptor_quit(int *fdp) +static void acceptor_quit(struct mog_fd **mfdp) { - int fd = *fdp; + struct mog_fd *mfd = *mfdp; - if (fd >= 0) { - struct mog_fd *mfd = mog_fd_get(fd); + if (mfd) { struct mog_accept *ac = &mfd->as.accept; mog_thrpool_quit(&ac->thrpool, NULL); mog_fd_put(mfd); - *fdp = -1; + *mfdp = NULL; } } @@ -23,9 +22,9 @@ static bool svc_quit_accept_i(void *svcptr, void *ignored) { struct mog_svc *svc = svcptr; - acceptor_quit(&svc->mgmt_fd); - acceptor_quit(&svc->http_fd); - acceptor_quit(&svc->httpget_fd); + acceptor_quit(&svc->mgmt_mfd); + acceptor_quit(&svc->http_mfd); + acceptor_quit(&svc->httpget_mfd); return true; } @@ -115,7 +115,7 @@ MOG_NOINLINE static struct mog_fd * grow_ref(size_t fd) * Look up a mog_fd structure based on fd. This means memory is reused * by us just as FDs are reused by the kernel. */ -struct mog_fd *mog_fd_get(int fd) +static struct mog_fd *mog_fd_get(int fd) { assert(fd >= 0 && "FD is negative"); if (MOG_LIKELY(fd < mog_sync_fetch(&max_fd))) @@ -230,8 +230,9 @@ static size_t expire_http(struct mog_fd *mfd, uint32_t msec) tcp_timedout(&info, msec)) { if (shutdown(mfd->fd, SHUT_RDWR) == 0) return 1; - - syslog(LOG_WARNING, "BUG? expire_http,shutdown: %m"); + if (errno != ENOTCONN) + syslog(LOG_WARNING, + "BUG? expire_http,shutdown: %m"); } } else { assert(errno != EINVAL && "BUG: getsockopt: EINVAL"); @@ -299,7 +300,7 @@ out: * 4) close sockets. */ for (fd = (int)expired * 8; --fd >= 0; ) - sched_yield(); + mog_yield(); return expired; } diff --git a/fsck_queue.c b/fsck_queue.c index 22f3d38..c99a9ed 100644 --- a/fsck_queue.c +++ b/fsck_queue.c @@ -37,6 +37,7 @@ static void fsck_queue_atexit(void) MOG_NOINLINE static void fsck_queue_once(void) { fsck_queues = hash_initialize(7, NULL, fq_hash, fq_cmp, free); + mog_oom_if_null(fsck_queues); atexit(fsck_queue_atexit); } @@ -3,6 +3,7 @@ * License: GPLv3 or later (see COPYING for details) */ #include "cmogstored.h" +#include "trace.h" #include "http.h" /* @@ -33,15 +34,15 @@ static void http_defer_rbuf(struct mog_http *http, struct mog_rbuf *rbuf, size_t buf_len) { struct mog_rbuf *old = http->rbuf; - size_t defer_bytes = buf_len - http->offset; - char *src = rbuf->rptr + http->offset; + size_t defer_bytes = buf_len - http->_p.buf_off; + char *src = rbuf->rptr + http->_p.buf_off; - if (http->skip_rbuf_defer) { - http->skip_rbuf_defer = 0; + if (http->_p.skip_rbuf_defer) { + http->_p.skip_rbuf_defer = 0; return; } - assert(http->offset >= 0 && "http->offset negative"); + assert(http->_p.buf_off >= 0 && "http->_p.buf_off negative"); assert(defer_bytes <= MOG_RBUF_MAX_SIZE && "defer bytes overflow"); if (defer_bytes == 0) { @@ -55,19 +56,21 @@ http_defer_rbuf(struct mog_http *http, struct mog_rbuf *rbuf, size_t buf_len) memcpy(http->rbuf->rptr, src, defer_bytes); http->rbuf->rsize = defer_bytes; } - http->offset = 0; + http->_p.buf_off = 0; } static void -http_process_client(struct mog_http *http, char *buf, size_t buf_len) +http_process_client(struct mog_fd *mfd, char *buf, size_t buf_len) { - switch (http->http_method) { + struct mog_http *http = &mfd->as.http; + + switch (http->_p.http_method) { case MOG_HTTP_METHOD_NONE: assert(0 && "BUG: unset HTTP method"); - case MOG_HTTP_METHOD_GET: mog_http_get_open(http, buf); break; - case MOG_HTTP_METHOD_HEAD: mog_http_get_open(http, buf); break; - case MOG_HTTP_METHOD_DELETE: mog_http_delete(http, buf); break; - case MOG_HTTP_METHOD_MKCOL: mog_http_mkcol(http, buf); break; - case MOG_HTTP_METHOD_PUT: mog_http_put(http, buf, buf_len); break; + case MOG_HTTP_METHOD_GET: mog_http_get_open(mfd, buf); break; + case MOG_HTTP_METHOD_HEAD: mog_http_get_open(mfd, buf); break; + case MOG_HTTP_METHOD_DELETE: mog_http_delete(mfd, buf); break; + case MOG_HTTP_METHOD_MKCOL: mog_http_mkcol(mfd, buf); break; + case MOG_HTTP_METHOD_PUT: mog_http_put(mfd, buf, buf_len); break; } } @@ -87,10 +90,36 @@ MOG_NOINLINE static void http_close(struct mog_fd *mfd) * their connection to save ourselves bandwidth/cycles */ tcp_push(mfd, false); + mog_packaddr_free(&http->mpa); mog_fd_put(mfd); } +void mog_http_unlink_ftmp(struct mog_http *http) +{ + struct mog_file *file = &http->forward->as.file; + + if (!file->tmppath) + return; + + if (mog_unlink(http->svc, file->tmppath) != 0) + syslog(LOG_ERR, "Failed to unlink %s (in %s): %m", + file->tmppath, http->svc->docroot); +} + +/* called if epoll/kevent is out-of-space */ +void mog_http_drop(struct mog_fd *mfd) +{ + struct mog_http *http = &mfd->as.http; + + assert(http->forward != MOG_IOSTAT); + if (http->forward) { + mog_http_unlink_ftmp(http); + mog_file_close(http->forward); + } + http_close(mfd); +} + /* returns true if we can continue queue step, false if not */ static enum mog_next http_wbuf_in_progress(struct mog_http *http) { @@ -99,10 +128,10 @@ static enum mog_next http_wbuf_in_progress(struct mog_http *http) case MOG_WRSTATE_ERR: return MOG_NEXT_CLOSE; case MOG_WRSTATE_DONE: - if (!http->persistent) return MOG_NEXT_CLOSE; + if (!http->_p.persistent) return MOG_NEXT_CLOSE; if (http->forward == NULL) mog_http_reset(http); - assert(http->offset == 0 && "bad offset"); + assert(http->_p.buf_off == 0 && "bad offset"); return MOG_NEXT_ACTIVE; case MOG_WRSTATE_BUSY: /* unlikely, we never put anything big in wbuf */ @@ -114,7 +143,7 @@ static enum mog_next http_wbuf_in_progress(struct mog_http *http) static enum mog_next http_forward_in_progress(struct mog_fd *mfd) { - enum mog_http_method method = mfd->as.http.http_method; + enum mog_http_method method = mfd->as.http._p.http_method; if (method == MOG_HTTP_METHOD_GET) return mog_http_get_in_progress(mfd); @@ -130,7 +159,7 @@ static enum mog_next http_queue_step(struct mog_fd *mfd) struct mog_rbuf *rbuf; char *buf; ssize_t r; - off_t off; + uint32_t off; size_t buf_len = 0; enum mog_parser_state state; @@ -142,20 +171,23 @@ static enum mog_next http_queue_step(struct mog_fd *mfd) /* we may have pipelined data in http->rbuf */ rbuf = http->rbuf ? http->rbuf : mog_rbuf_get(MOG_RBUF_BASE_SIZE); buf = rbuf->rptr; - off = http->offset; - assert(off >= 0 && "offset is negative"); + off = http->_p.buf_off; assert(off < rbuf->rcapa && "offset is too big"); if (http->rbuf) { /* request got pipelined, resuming now */ buf_len = http->rbuf->rsize; - assert(http->offset <= buf_len && "bad offset from pipelining"); + assert(http->_p.buf_off <= buf_len + && "bad offset from pipelining"); assert(buf_len <= http->rbuf->rcapa && "bad rsize stashed"); - if (http->offset < buf_len) + if (http->_p.buf_off < buf_len) goto parse; } reread: r = read(mfd->fd, buf + off, rbuf->rcapa - off); if (r > 0) { + if (off == 0) + TRACE(CMOGSTORED_HTTP_REQ_BEGIN(false)); + buf_len = r + off; parse: state = mog_http_parse(http, buf, buf_len); @@ -166,17 +198,17 @@ parse: case MOG_PARSER_CONTINUE: assert(http->wbuf == NULL && "tried to write (and failed) with partial req"); - if (http->offset >= rbuf->rcapa) { + if (http->_p.buf_off >= rbuf->rcapa) { rbuf->rsize = buf_len; http->rbuf = rbuf = mog_rbuf_grow(rbuf); if (!rbuf) goto err400; buf = rbuf->rptr; } - off = http->offset; + off = http->_p.buf_off; goto reread; case MOG_PARSER_DONE: - http_process_client(http, buf, buf_len); + http_process_client(mfd, buf, buf_len); if (http->wbuf == MOG_WR_ERROR) return MOG_NEXT_CLOSE; if (http->wbuf) { @@ -185,15 +217,20 @@ parse: } else if (http->forward) { http_defer_rbuf(http, rbuf, buf_len); return http_forward_in_progress(mfd); - } else if (!http->persistent) { + } else if (!http->_p.persistent) { return MOG_NEXT_CLOSE; } else { + /* pipelined request */ + if (buf_len) + TRACE(CMOGSTORED_HTTP_REQ_BEGIN(true)); + http_defer_rbuf(http, rbuf, buf_len); mog_http_reset(http); } return MOG_NEXT_ACTIVE; } } else if (r == 0) { /* client shut down */ + TRACE(CMOGSTORED_HTTP_RDCLOSE(buf_len)); return MOG_NEXT_CLOSE; } else { switch (errno) { @@ -207,8 +244,11 @@ parse: case EINTR: goto reread; case ECONNRESET: case ENOTCONN: + /* these errors are too common to log, normally */ + TRACE(CMOGSTORED_HTTP_RDERR(buf_len, errno)); return MOG_NEXT_CLOSE; default: + TRACE(CMOGSTORED_HTTP_RDERR(buf_len, errno)); syslog(LOG_NOTICE, "http client died: %m"); return MOG_NEXT_CLOSE; } @@ -262,24 +302,49 @@ void mog_http_quit_step(struct mog_fd *mfd) } } -/* called immediately after accept(), this initializes the mfd (once) */ -void mog_http_post_accept(int fd, struct mog_svc *svc) +/* stringify the address for tracers */ +static MOG_NOINLINE void +trace_http_accepted(struct mog_fd *mfd) +{ +#ifdef HAVE_SYSTEMTAP + struct mog_packaddr *mpa = &mfd->as.http.mpa; + struct mog_ni ni; + + mog_nameinfo(mpa, &ni); + TRACE(CMOGSTORED_HTTP_ACCEPTED(mfd->fd, ni.ni_host, ni.ni_serv)); +#endif /* !HAVE_SYSTEMTAP */ +} + +static void http_post_accept_common(struct mog_fd *mfd, struct mog_svc *svc, + union mog_sockaddr *msa, socklen_t salen) { - struct mog_fd *mfd = mog_fd_init(fd, MOG_FD_TYPE_HTTP); struct mog_http *http = &mfd->as.http; mog_http_init(http, svc); + mog_packaddr_init(&http->mpa, msa, salen); + + if (TRACE_ENABLED(CMOGSTORED_HTTP_ACCEPTED)) + trace_http_accepted(mfd); + mog_idleq_add(svc->queue, mfd, MOG_QEV_RD); } /* called immediately after accept(), this initializes the mfd (once) */ -void mog_httpget_post_accept(int fd, struct mog_svc *svc) +void mog_http_post_accept(int fd, struct mog_svc *svc, + union mog_sockaddr *msa, socklen_t salen) +{ + struct mog_fd *mfd = mog_fd_init(fd, MOG_FD_TYPE_HTTP); + + http_post_accept_common(mfd, svc, msa, salen); +} + +/* called immediately after accept(), this initializes the mfd (once) */ +void mog_httpget_post_accept(int fd, struct mog_svc *svc, + union mog_sockaddr *msa, socklen_t salen) { struct mog_fd *mfd = mog_fd_init(fd, MOG_FD_TYPE_HTTPGET); - struct mog_http *http = &mfd->as.http; - mog_http_init(http, svc); - mog_idleq_add(svc->queue, mfd, MOG_QEV_RD); + http_post_accept_common(mfd, svc, msa, salen); } /* @@ -288,15 +353,16 @@ void mog_httpget_post_accept(int fd, struct mog_svc *svc) */ char *mog_http_path(struct mog_http *http, char *buf) { - char *path = buf + http->path_tip; - size_t len = http->path_end - http->path_tip; + char *path = buf + http->_p.path_tip; + size_t len = http->_p.path_end - http->_p.path_tip; - assert(http->path_end > http->path_tip && "bad HTTP path from parser"); + assert(http->_p.path_end > http->_p.path_tip + && "bad HTTP path from parser"); if (! mog_valid_path(path, len)) return NULL; - if (http->http_method == MOG_HTTP_METHOD_PUT) { + if (http->_p.http_method == MOG_HTTP_METHOD_PUT) { if (!mog_valid_put_path(path, len)) { errno = EINVAL; return NULL; @@ -330,12 +396,12 @@ mog_http_resp0( dst = CPY("\r\nDate: "); now = mog_now(); dst = mempcpy(dst, now->httpdate, sizeof(now->httpdate)-1); - if (alive && http->persistent) { + if (alive && http->_p.persistent) { dst = CPY("\r\nContent-Length: 0" "\r\nContent-Type: text/plain" "\r\nConnection: keep-alive\r\n\r\n"); } else { - http->persistent = 0; + http->_p.persistent = 0; dst = CPY("\r\nContent-Length: 0" "\r\nContent-Type: text/plain" "\r\nConnection: close\r\n\r\n"); diff --git a/http_common.rl b/http_common.rl index 8df5fed..a0fdd5a 100644 --- a/http_common.rl +++ b/http_common.rl @@ -6,29 +6,35 @@ machine http_common; LWS = (' ' | '\t'); - LF = '\n' > { http->line_end = to_u16(fpc - buf); }; + LF = '\n' > { http->_p.line_end = to_u16(fpc - buf); }; eor = LWS*'\r'LF; CTL = (cntrl | 127); header_name = [a-zA-Z0-9\-]+; header_value = (any -- (LWS|CTL))(any -- CTL)*; sep = (LWS*)|(eor LWS+); - b64_val = ([a-zA-Z0-9/+]{22}) > { http->tmp_tip = to_u16(fpc - buf); } - "==" - eor > { + b64_val = ([a-zA-Z0-9/+]{22}) > { + http->_p.tmp_tip = to_u16(fpc - buf); + } + "==" + eor > { uint16_t tmp_end = to_u16(fpc - buf); - char *in = buf + http->tmp_tip; - size_t inlen = tmp_end - http->tmp_tip; + char *in = buf + http->_p.tmp_tip; + size_t inlen = tmp_end - http->_p.tmp_tip; char *out = (char *)http->expect_md5; size_t outlen = sizeof(http->expect_md5); bool rc; + /* + * Ragel already validated the allowable bytes, + * so base64_decode_ctx must succeed: + */ rc = base64_decode_ctx(NULL, in, inlen, out, &outlen); assert(rc == true && outlen == 16 && "base64_decoder broke for HTTP"); - http->has_expect_md5 = 1; - }; - content_md5 = "Content-MD5:"i sep ( b64_val ) $! { - if (!http->has_expect_md5) { + http->_p.has_md5 = 1; + }; + content_md5 = "Content-MD5:"i sep ( b64_val ) $! { + if (!http->_p.has_md5) { errno = EINVAL; fbreak; } @@ -38,10 +44,10 @@ }; trailer_line = ( content_md5 ) $! { - if (http->line_end > 0) { - assert(buf[http->line_end] == '\n' - && "bad http->line_end"); - p = buf + http->line_end + 1; + if (http->_p.line_end > 0) { + assert(buf[http->_p.line_end] == '\n' + && "bad http->_p.line_end"); + p = buf + http->_p.line_end + 1; } else { p = buf; } @@ -49,8 +55,8 @@ fgoto ignored_trailer; }; trailers = trailer_line* '\r''\n' > { - http->chunk_state = MOG_CHUNK_STATE_DONE; - http->line_end = to_u16(fpc - buf); + http->_p.chunk_state = MOG_CHUNK_STATE_DONE; + http->_p.line_end = to_u16(fpc - buf); really_done = 1; fbreak; }; @@ -8,12 +8,13 @@ #include "cmogstored.h" #include "http.h" -void mog_http_delete(struct mog_http *http, char *buf) +void mog_http_delete(struct mog_fd *mfd, char *buf) { + struct mog_http *http = &mfd->as.http; int rc; char *path; - if (mog_fd_of(http)->fd_type == MOG_FD_TYPE_HTTPGET) { + if (mfd->fd_type == MOG_FD_TYPE_HTTPGET) { mog_http_resp(http, "405 Method Not Allowed", true); return; } @@ -45,11 +46,12 @@ forbidden: } while(0)); } -void mog_http_mkcol(struct mog_http *http, char *buf) +void mog_http_mkcol(struct mog_fd *mfd, char *buf) { + struct mog_http *http = &mfd->as.http; char *path; - if (mog_fd_of(http)->fd_type == MOG_FD_TYPE_HTTPGET) { + if (mfd->fd_type == MOG_FD_TYPE_HTTPGET) { mog_http_resp(http, "405 Method Not Allowed", true); return; } @@ -60,42 +60,43 @@ static off_t http_get_resp_hdr(struct mog_http *http, struct stat *sb) mog_http_date(modified, MOG_HTTPDATE_CAPA, &sb->st_mtime); /* validate ranges */ - if (http->has_range) { + if (http->_p.has_range) { long long offset; - if (http->range_end < 0 && http->range_beg < 0) + if (http->_p.range_end < 0 && http->_p.range_beg < 0) goto bad_range; - if (http->range_beg >= sb->st_size) + if (http->_p.range_beg >= sb->st_size) goto bad_range; /* bytes=M-N where M > N */ - if (http->range_beg >= 0 && http->range_end >= 0 - && http->range_beg > http->range_end) + if (http->_p.range_beg >= 0 && http->_p.range_end >= 0 + && http->_p.range_beg > http->_p.range_end) goto bad_range; - if (http->range_end < 0) { /* bytes=M- */ + if (http->_p.range_end < 0) { /* bytes=M- */ /* bytes starting at M until EOF */ - assert(http->range_beg >= 0 && "should've sent 416"); - offset = (long long)http->range_beg; + assert(http->_p.range_beg >= 0 && "should've sent 416"); + offset = (long long)http->_p.range_beg; count = (long long)(sb->st_size - offset); - } else if (http->range_beg < 0) { /* bytes=-N */ + } else if (http->_p.range_beg < 0) { /* bytes=-N */ /* last N bytes */ - assert(http->range_end >= 0 && "should've sent 416"); - offset = (long long)(sb->st_size - http->range_end); + assert(http->_p.range_end >= 0 && "should've sent 416"); + offset = (long long)(sb->st_size - http->_p.range_end); /* serve the entire file if client requested too much */ if (offset < 0) goto resp_200; count = (long long)(sb->st_size - offset); } else { /* bytes=M-N*/ - assert(http->range_beg >= 0 && http->range_end >= 0 + assert(http->_p.range_beg >= 0 + && http->_p.range_end >= 0 && "should've sent 416"); - offset = (long long)http->range_beg; + offset = (long long)http->_p.range_beg; /* truncate responses to current file size */ - if (http->range_end >= sb->st_size) - http->range_end = sb->st_size - 1; - count = (long long)http->range_end + 1 - offset; + if (http->_p.range_end >= sb->st_size) + http->_p.range_end = sb->st_size - 1; + count = (long long)http->_p.range_end + 1 - offset; } assert(count > 0 && "bad count for 206 response"); @@ -123,7 +124,7 @@ static off_t http_get_resp_hdr(struct mog_http *http, struct stat *sb) count, /* Content-Length */ offset, offset + count - 1, /* bytes M-N */ (long long)sb->st_size, - http->persistent ? "keep-alive" : "close"); + http->_p.persistent ? "keep-alive" : "close"); } else { resp_200: count = (long long)sb->st_size; @@ -140,7 +141,7 @@ resp_200: now->httpdate, modified, count, - http->persistent ? "keep-alive" : "close"); + http->_p.persistent ? "keep-alive" : "close"); } /* TODO: put down the crack pipe and refactor this */ @@ -151,7 +152,7 @@ bad_range: mog_file_close(http->forward); http->forward = NULL; } else { - assert(http->http_method == MOG_HTTP_METHOD_HEAD + assert(http->_p.http_method == MOG_HTTP_METHOD_HEAD && "not HTTP HEAD"); } rc = snprintf(buf, len, @@ -165,14 +166,14 @@ bad_range: "\r\n", now->httpdate, (long long)sb->st_size, - http->persistent ? "keep-alive" : "close"); + http->_p.persistent ? "keep-alive" : "close"); } assert(rc > 0 && rc < len && "we suck at snprintf"); len = (size_t)rc; assert(http->wbuf == NULL && "tried to write to a busy client"); - if (http->http_method == MOG_HTTP_METHOD_HEAD) + if (http->_p.http_method == MOG_HTTP_METHOD_HEAD) count = 0; http->wbuf = mog_trysend(mog_fd_of(http)->fd, buf, len, (off_t)count); @@ -180,8 +181,9 @@ bad_range: return (off_t)count; } -void mog_http_get_open(struct mog_http *http, char *buf) +void mog_http_get_open(struct mog_fd *mfd, char *buf) { + struct mog_http *http = &mfd->as.http; struct stat sb; struct mog_file *file = NULL; char *path = mog_http_path(http, buf); @@ -194,7 +196,7 @@ void mog_http_get_open(struct mog_http *http, char *buf) if (path[1] == '\0') { /* keep "mogadm check" happy */ sb.st_mtime = 0; sb.st_size = 0; - } else if (http->http_method == MOG_HTTP_METHOD_HEAD) { + } else if (http->_p.http_method == MOG_HTTP_METHOD_HEAD) { if (mog_stat(http->svc, path, &sb) < 0) goto err; if (!S_ISREG(sb.st_mode)) goto forbidden; } else { @@ -272,20 +274,20 @@ retry: case_EAGAIN: return MOG_NEXT_WAIT_WR; case EINTR: goto retry; } - http->persistent = 0; + http->_p.persistent = 0; } else { /* w == 0 */ /* * if we can't fulfill the value set by our Content-Length: * header, we must kill the TCP connection */ - http->persistent = 0; + http->_p.persistent = 0; syslog(LOG_ERR, "sendfile()-d 0 bytes at offset=%lld; file truncated?", (long long)file->foff); } done: mog_file_close(http->forward); - if (http->persistent) { + if (http->_p.persistent) { mog_http_reset(http); return MOG_NEXT_ACTIVE; } diff --git a/http_parser.rl b/http_parser.rl index d84d0ce..0622d62 100644 --- a/http_parser.rl +++ b/http_parser.rl @@ -24,75 +24,75 @@ static bool length_incr(off_t *len, unsigned c) %%{ machine http_parser; include http_common "http_common.rl"; + include path_parser "path_parser.rl"; ignored_header := header_name ':' sep header_value eor @ { fgoto more_headers; }; - mog_path = '/'[a-zA-Z0-9/\.\-]{0,36}; # only stuff MogileFS will use - GET = "GET "> { http->http_method = MOG_HTTP_METHOD_GET; }; - HEAD = "HEAD "> { http->http_method = MOG_HTTP_METHOD_HEAD; }; - PUT = "PUT "> { http->http_method = MOG_HTTP_METHOD_PUT; }; - DELETE = "DELETE "> { http->http_method = MOG_HTTP_METHOD_DELETE; }; - MKCOL = "MKCOL "> { http->http_method = MOG_HTTP_METHOD_MKCOL; }; + GET = "GET "> { http->_p.http_method = MOG_HTTP_METHOD_GET; }; + HEAD = "HEAD "> { http->_p.http_method = MOG_HTTP_METHOD_HEAD; }; + PUT = "PUT "> { http->_p.http_method = MOG_HTTP_METHOD_PUT; }; + DELETE = "DELETE "> { http->_p.http_method = MOG_HTTP_METHOD_DELETE; }; + MKCOL = "MKCOL "> { http->_p.http_method = MOG_HTTP_METHOD_MKCOL; }; # no HTTP/0.9 for now, sorry (not :P) req_line = (HEAD|GET|PUT|DELETE|MKCOL) ("http://" [^/]+)? - '/'*(mog_path) > { http->path_tip = to_u8(fpc - buf); } + '/'*(mog_path) > { http->_p.path_tip = to_u8(fpc - buf); } # TODO: maybe folks use query string/fragments for logging... - (" HTTP/1.") > { http->path_end = to_u8(fpc - buf); } - ('0'|'1'> { http->persistent = 1; }) '\r'LF; + (" HTTP/1.") > { http->_p.path_end = to_u8(fpc - buf); } + ('0'|'1'> { http->_p.persistent = 1; }) '\r'LF; content_length = "Content-Length:"i sep (digit+) $ { - if (!length_incr(&http->content_len, fc)) + if (!length_incr(&http->_p.content_len, fc)) fbreak; } $! { errno = EINVAL; fbreak; } eor; content_range = "Content-Range:"i sep "bytes"LWS+ (digit+) $ { - if (!length_incr(&http->range_beg, fc)) + if (!length_incr(&http->_p.range_beg, fc)) fbreak; } $! { errno = EINVAL; fbreak; } "-" (digit+) $ { - if (!length_incr(&http->range_end, fc)) + if (!length_incr(&http->_p.range_end, fc)) fbreak; } $! { errno = EINVAL; fbreak; } "/*" - eor > { http->has_content_range = 1; }; + eor > { http->_p.has_content_range = 1; }; range = "Range:"i sep ( "bytes=" > { - http->range_beg = http->range_end = -1; + http->_p.range_beg = http->_p.range_end = -1; } (digit*) $ { - if (http->range_beg < 0) - http->range_beg = 0; - if (!length_incr(&http->range_beg, fc)) + if (http->_p.range_beg < 0) + http->_p.range_beg = 0; + if (!length_incr(&http->_p.range_beg, fc)) fbreak; } '-' (digit*) $ { - if (http->range_end < 0) - http->range_end = 0; - if (!length_incr(&http->range_end, fc)) + if (http->_p.range_end < 0) + http->_p.range_end = 0; + if (!length_incr(&http->_p.range_end, fc)) fbreak; } ) $! { errno = EINVAL; fbreak; } - eor @ { http->has_range = 1; }; + eor @ { http->_p.has_range = 1; }; transfer_encoding_chunked = "Transfer-Encoding:"i sep - "chunked"i eor > { http->chunked = 1; }; + "chunked"i eor > { http->_p.chunked = 1; }; trailer = "Trailer:"i sep - (("Content-MD5"i @ { http->has_trailer_md5 = 1; }) + (("Content-MD5"i @ { http->_p.has_md5 = 1; }) | header_name | ',') eor; connection = "Connection:"i sep - (("close"i @ { http->persistent = 0; }) | - ("keep-alive"i @ { http->persistent = 1; })) eor; + (("close"i @ { http->_p.persistent = 0; }) | + ("keep-alive"i @ { http->_p.persistent = 1; })) eor; header_line = ( content_length | transfer_encoding_chunked | @@ -102,11 +102,11 @@ static bool length_incr(off_t *len, unsigned c) content_md5 | connection ) $! { - assert(http->line_end > 0 && + assert(http->_p.line_end > 0 && "no previous request/header line"); - assert(buf[http->line_end] == '\n' && - "bad http->line_end"); - p = buf + http->line_end + 1; + assert(buf[http->_p.line_end] == '\n' && + "bad http->_p.line_end"); + p = buf + http->_p.line_end + 1; assert(p <= pe && "overflow"); fgoto ignored_header; }; @@ -120,21 +120,21 @@ static bool length_incr(off_t *len, unsigned c) void mog_http_reset_parser(struct mog_http *http) { int cs; - struct mog_rbuf *rbuf = http->rbuf; - struct mog_svc *svc = http->svc; %% write init; - memset(http, 0, sizeof(struct mog_http)); http->cs = cs; - http->rbuf = rbuf; - http->svc = svc; + memset(&http->_p, 0, sizeof(http->_p)); + + /* these should probably be in mog_http_init */ + http->forward = NULL; + http->wbuf = NULL; } void mog_http_init(struct mog_http *http, struct mog_svc *svc) { - http->svc = svc; - http->rbuf = NULL; mog_http_reset_parser(http); + http->rbuf = NULL; + http->svc = svc; } enum mog_parser_state @@ -143,7 +143,8 @@ mog_http_parse(struct mog_http *http, char *buf, size_t len) char *p, *pe, *eof = NULL; int cs = http->cs; int really_done = 0; - size_t off = http->offset; + size_t off = http->_p.buf_off; + uint32_t *mog_devid = &http->_p.mog_devid; assert(http->wbuf == NULL && "unwritten data in buffer"); assert(off <= len && "http offset past end of buffer"); @@ -161,13 +162,13 @@ mog_http_parse(struct mog_http *http, char *buf, size_t len) cs = http_parser_first_final; http->cs = cs; - http->offset = p - buf; + http->_p.buf_off = p - buf; if (cs == http_parser_error || errno) return MOG_PARSER_ERROR; assert(p <= pe && "buffer overflow after http parse"); - assert(http->offset <= len && "offset longer than len"); + assert(http->_p.buf_off <= len && "offset longer than len"); if (http->cs == http_parser_first_final) return MOG_PARSER_DONE; return MOG_PARSER_CONTINUE; @@ -14,18 +14,9 @@ static __thread struct { static void file_close_null(struct mog_http *http) { - struct mog_file *file; - if (http->forward == NULL) return; - - file = &http->forward->as.file; - - if (file->tmppath) { - if (mog_unlink(http->svc, file->tmppath) != 0) - syslog(LOG_ERR, "Failed to unlink %s (in %s): %m", - file->tmppath, http->svc->docroot); - } + mog_http_unlink_ftmp(http); mog_file_close(http->forward); http->forward = NULL; } @@ -173,7 +164,7 @@ static enum mog_next http_put_commit(struct mog_http *http) if (http->wbuf && http->wbuf != MOG_WR_ERROR) return MOG_NEXT_WAIT_WR; - if (!http->persistent || http->wbuf == MOG_WR_ERROR) + if (!http->_p.persistent || http->wbuf == MOG_WR_ERROR) return MOG_NEXT_CLOSE; mog_http_reset(http); return MOG_NEXT_ACTIVE; @@ -182,65 +173,65 @@ static enum mog_next http_put_commit(struct mog_http *http) static void stash_advance_rbuf(struct mog_http *http, char *buf, size_t buf_len) { struct mog_rbuf *rbuf = http->rbuf; - size_t end = http->line_end + 1; + size_t end = http->_p.line_end + 1; - if (http->line_end == 0 || buf_len <= end) { - http->offset = 0; + if (http->_p.line_end == 0 || buf_len <= end) { + http->_p.buf_off = 0; mog_rbuf_free_and_null(&http->rbuf); return; } - assert(buf[http->line_end] == '\n' && "line_end is not LF"); + assert(buf[http->_p.line_end] == '\n' && "line_end is not LF"); assert(buf_len <= MOG_RBUF_MAX_SIZE && "bad rbuf size"); - assert(end <= http->offset && "invalid line end"); + assert(end <= http->_p.buf_off && "invalid line end"); if (rbuf == NULL) http->rbuf = rbuf = mog_rbuf_new(MOG_RBUF_BASE_SIZE); memmove(rbuf->rptr, buf + end, buf_len - end); rbuf->rsize = buf_len - end; - http->offset -= end; - if (http->tmp_tip >= end) - http->tmp_tip -= end; - http->line_end = 0; + http->_p.buf_off -= end; + if (http->_p.tmp_tip >= end) + http->_p.tmp_tip -= end; + http->_p.line_end = 0; } static void chunked_body_after_header(struct mog_http *http, char *buf, size_t buf_len) { - size_t tmpoff = http->offset; + size_t tmpoff = http->_p.buf_off; mog_chunk_init(http); - http->offset = tmpoff; + http->_p.buf_off = tmpoff; switch (mog_chunk_parse(http, buf, buf_len)) { case MOG_PARSER_ERROR: (void)write_err(http, "400 Bad Request"); return; case MOG_PARSER_CONTINUE: - assert(http->chunk_state != MOG_CHUNK_STATE_DONE); + assert(http->_p.chunk_state != MOG_CHUNK_STATE_DONE); /* fall through */ case MOG_PARSER_DONE: - switch (http->chunk_state) { + switch (http->_p.chunk_state) { case MOG_CHUNK_STATE_SIZE: - assert(http->offset == buf_len + assert(http->_p.buf_off == buf_len && "HTTP chunk parser didn't finish size"); return; case MOG_CHUNK_STATE_DATA: - assert(http->offset == buf_len + assert(http->_p.buf_off == buf_len && "HTTP chunk parser didn't finish data"); return; case MOG_CHUNK_STATE_TRAILER: - assert(http->offset > 0 && - "http->offset unset while in trailer"); + assert(http->_p.buf_off > 0 && + "http->_p.buf_off unset while in trailer"); stash_advance_rbuf(http, buf, buf_len); - http->skip_rbuf_defer = 1; + http->_p.skip_rbuf_defer = 1; return; case MOG_CHUNK_STATE_DONE: put_commit_resp(http); - assert(http->offset > 0 && - "http->offset unset after chunk body done"); + assert(http->_p.buf_off > 0 && + "http->_p.buf_off unset after chunk body done"); stash_advance_rbuf(http, buf, buf_len); - http->skip_rbuf_defer = 1; + http->_p.skip_rbuf_defer = 1; } } } @@ -248,45 +239,46 @@ chunked_body_after_header(struct mog_http *http, char *buf, size_t buf_len) static void identity_body_after_header(struct mog_http *http, char *buf, size_t buf_len) { - size_t body_len = buf_len - http->offset; - char *body_ptr = buf + http->offset; + size_t body_len = buf_len - http->_p.buf_off; + char *body_ptr = buf + http->_p.buf_off; - if (http->content_len < body_len) - body_len = http->content_len; + if (http->_p.content_len < body_len) + body_len = http->_p.content_len; if (body_len == 0) return; - http->offset += body_len; + http->_p.buf_off += body_len; if (!mog_http_write_full(http->forward, body_ptr, body_len)) (void)write_err(http, NULL); } static bool lengths_ok(struct mog_http *http) { - if (http->content_len < 0) + if (http->_p.content_len < 0) return false; /* ERANGE */ - if (http->has_content_range) { - if (http->chunked) + if (http->_p.has_content_range) { + if (http->_p.chunked) return false; - if (http->range_end < 0 || http->range_beg < 0) + if (http->_p.range_end < 0 || http->_p.range_beg < 0) return false; /* ERANGE */ - assert(http->range_end >= 0 && http->range_beg >= 0 && + assert(http->_p.range_end >= 0 && http->_p.range_beg >= 0 && "bad range, http_parser.rl broken"); /* can't end after we start */ - if (http->range_end < http->range_beg) + if (http->_p.range_end < http->_p.range_beg) return false; /* * Content-Length should match Content-Range boundaries * WARNING: Eric Wong sucks at arithmetic, check this: */ - if (http->content_len >= 0) { - off_t expect = http->range_end - http->range_beg + 1; + if (http->_p.content_len >= 0) { + off_t expect = http->_p.range_end - + http->_p.range_beg + 1; - if (http->content_len != expect) + if (http->_p.content_len != expect) return false; } } @@ -324,7 +316,7 @@ static struct mog_file * open_put(struct mog_http *http, char *path) * we can't do an atomic rename(2) on successful PUT * if we have a partial upload */ - if (http->has_content_range) { + if (http->_p.has_content_range) { http->forward = mog_file_open_put(http->svc, path, O_CREAT); if (http->forward == NULL) return NULL; @@ -354,20 +346,21 @@ static struct mog_file * open_put(struct mog_http *http, char *path) file->path = xstrdup(path); assert(file->foff == 0 && "file->foff should be zero"); - if (http->has_content_range) - file->foff = http->range_beg; - if (http->has_trailer_md5 || http->has_expect_md5) + if (http->_p.has_content_range) + file->foff = http->_p.range_beg; + if (http->_p.has_md5) mog_digest_init(&file->digest, GC_MD5); return file; } -void mog_http_put(struct mog_http *http, char *buf, size_t buf_len) +void mog_http_put(struct mog_fd *mfd, char *buf, size_t buf_len) { + struct mog_http *http = &mfd->as.http; char *path; struct mog_file *file; - if (mog_fd_of(http)->fd_type == MOG_FD_TYPE_HTTPGET) { + if (mfd->fd_type == MOG_FD_TYPE_HTTPGET) { mog_http_resp(http, "405 Method Not Allowed", false); return; } @@ -387,12 +380,12 @@ void mog_http_put(struct mog_http *http, char *buf, size_t buf_len) if (file == NULL) goto err; - if (buf_len == http->offset) { + if (buf_len == http->_p.buf_off) { /* we got the HTTP header in one read() */ - if (http->chunked) { + if (http->_p.chunked) { mog_rbuf_free_and_null(&http->rbuf); mog_chunk_init(http); - http->offset = buf_len; + http->_p.buf_off = buf_len; } return; } @@ -400,10 +393,10 @@ void mog_http_put(struct mog_http *http, char *buf, size_t buf_len) * otherwise we got part of the request body with the header, * write partially read body */ - assert(buf_len > http->offset && http->offset > 0 - && "http->offset is wrong"); + assert(buf_len > http->_p.buf_off && http->_p.buf_off > 0 + && "http->_p.buf_off is wrong"); - if (http->chunked) + if (http->_p.chunked) chunked_body_after_header(http, buf, buf_len); else identity_body_after_header(http, buf, buf_len); @@ -440,41 +433,14 @@ static unsigned last_data_recv(int fd) MOG_NOINLINE static void read_err_dbg(struct mog_fd *mfd, ssize_t r) { - union mog_sockaddr any; - char addrbuf[MOG_NI_MAXHOST]; - char portbuf[MOG_NI_MAXSERV]; - const char *addr; - static const char no_ip[] = "unconnected"; + int save_errno = errno; + struct mog_ni ni; const char *path = "(unknown)"; long long bytes = -1; const char *errfmt; - unsigned last; + unsigned last = last_data_recv(mfd->fd); - PRESERVE_ERRNO(last = last_data_recv(mfd->fd)); - - portbuf[0] = 0; - PRESERVE_ERRNO(do { - socklen_t len = (socklen_t)sizeof(any); - socklen_t addrlen = (socklen_t)sizeof(addrbuf); - socklen_t portlen = (socklen_t)(sizeof(portbuf)); - int rc = getpeername(mfd->fd, &any.sa, &len); - - if (rc == 0) { - rc = getnameinfo(&any.sa, len, addrbuf, addrlen, - portbuf + 1, portlen - 1, - NI_NUMERICHOST|NI_NUMERICSERV); - if (rc == 0) { - addr = addrbuf; - portbuf[0] = ':'; - } else { - addr = gai_strerror(rc); - } - } else { - syslog(LOG_ERR, "getpeername() failed for fd=%d: %m", - mfd->fd); - addr = no_ip; - } - } while (0)); + mog_nameinfo(&mfd->as.http.mpa, &ni); if (mfd->as.http.forward) { path = mfd->as.http.forward->as.file.path; @@ -484,11 +450,12 @@ MOG_NOINLINE static void read_err_dbg(struct mog_fd *mfd, ssize_t r) #define PFX "PUT %s failed from %s%s after %lld bytes: " errfmt = (r == 0) ? PFX"premature EOF" : PFX"%m"; #undef PFX - syslog(LOG_ERR, errfmt, path, addr, portbuf, bytes); + errno = save_errno; + syslog(LOG_ERR, errfmt, path, ni.ni_host, ni.ni_serv, bytes); if (last != (unsigned)-1) syslog(LOG_ERR, "last_data_recv=%ums from %s%s for PUT %s", - last, addr, portbuf, path); + last, ni.ni_host, ni.ni_serv, path); } static enum mog_next identity_put_in_progress(struct mog_fd *mfd) @@ -502,9 +469,9 @@ static enum mog_next identity_put_in_progress(struct mog_fd *mfd) assert(http->wbuf == NULL && "can't receive file with http->wbuf"); assert(http->forward && http->forward != MOG_IOSTAT && "bad forward"); - need = http->content_len - http->forward->as.file.foff; - if (http->has_content_range) - need += http->range_beg; + need = http->_p.content_len - http->forward->as.file.foff; + if (http->_p.has_content_range) + need += http->_p.range_beg; if (need == 0) return http_put_commit(http); @@ -549,19 +516,19 @@ again: assert(http->wbuf == NULL && "can't receive file with http->wbuf"); assert(http->forward && http->forward != MOG_IOSTAT && "bad forward"); - switch (http->chunk_state) { + switch (http->_p.chunk_state) { case MOG_CHUNK_STATE_DATA: assert(http->rbuf == NULL && "unexpected http->rbuf"); - if (http->content_len == 0) { /* final chunk */ - http->chunk_state = MOG_CHUNK_STATE_TRAILER; - http->offset = 0; + if (http->_p.content_len == 0) { /* final chunk */ + http->_p.chunk_state = MOG_CHUNK_STATE_TRAILER; + http->_p.buf_off = 0; goto chunk_state_trailer; } - assert(http->content_len > 0 && "bad chunk length"); + assert(http->_p.content_len > 0 && "bad chunk length"); /* read the chunk into memory */ buf = mog_fsbuf_get(&buf_len); - if (buf_len > http->content_len) - buf_len = http->content_len; + if (buf_len > http->_p.content_len) + buf_len = http->_p.content_len; do { r = read(mfd->fd, buf, buf_len); } while (r < 0 && errno == EINTR); @@ -571,10 +538,10 @@ again: if (!mog_http_write_full(http->forward, buf, r)) return write_err(http, NULL); - http->content_len -= r; + http->_p.content_len -= r; /* chunk is complete */ - if (http->content_len == 0) + if (http->_p.content_len == 0) mog_chunk_init(http); goto again; case MOG_CHUNK_STATE_TRAILER: @@ -610,15 +577,15 @@ chunk_state_trailer: case MOG_PARSER_ERROR: return write_err(http, "400 Bad Request"); case MOG_PARSER_CONTINUE: - assert(http->chunk_state != MOG_CHUNK_STATE_DONE); + assert(http->_p.chunk_state != MOG_CHUNK_STATE_DONE); case MOG_PARSER_DONE: - switch (http->chunk_state) { + switch (http->_p.chunk_state) { case MOG_CHUNK_STATE_SIZE: if (in_trailer) assert(0 && "bad chunk state: size"); /* client is trickling chunk size :< */ mog_rbuf_free_and_null(&http->rbuf); - http->offset = 0; + http->_p.buf_off = 0; goto again; case MOG_CHUNK_STATE_DATA: if (in_trailer) @@ -635,7 +602,7 @@ chunk_state_trailer: /* pipelined HTTP request after trailers! */ if (http->rbuf) assert(http->rbuf->rsize > 0 - && http->offset == 0 + && http->_p.buf_off == 0 && "bad rbuf"); return http_put_commit(http); } @@ -657,7 +624,7 @@ read_err: enum mog_next mog_http_put_in_progress(struct mog_fd *mfd) { - if (mfd->as.http.chunked) + if (mfd->as.http._p.chunked) return chunked_put_in_progress(mfd); return identity_put_in_progress(mfd); @@ -6,7 +6,7 @@ static Hash_table *listeners; /* yes, we'll scale to 10K listen sockets, L10K! */ struct listener { - union mog_sockaddr as; + union mog_sockaddr msa; socklen_t len; int fd; }; @@ -17,7 +17,7 @@ static bool listener_cmp(const void *a, const void *b) const struct listener *lb = b; return (la->len == lb->len) && - (memcmp(&la->as.sa, &lb->as.sa, lb->len) == 0); + (memcmp(&la->msa, &lb->msa, lb->len) == 0); } static size_t listener_hash(const void *x, size_t tablesize) @@ -27,7 +27,7 @@ static size_t listener_hash(const void *x, size_t tablesize) socklen_t i; for (i = 0; i < l->len; i++) - value = (value * 31 + l->as.bytes[i]) % tablesize; + value = (value * 31 + l->msa.bytes[i]) % tablesize; return value; } @@ -36,21 +36,23 @@ static void register_listen_fd(int fd) { struct listener tmp; struct listener *ins; - int flags = NI_NUMERICHOST | NI_NUMERICSERV; - char hbuf[MOG_NI_MAXHOST]; - char sbuf[MOG_NI_MAXSERV]; - int rc; + struct mog_ni ni; + struct mog_packaddr mpa; + struct sockaddr *sa = mog_sockaddr_sa(&tmp.msa); - tmp.len = (socklen_t)sizeof(tmp.as); - if (getsockname(fd, &tmp.as.sa, &tmp.len) != 0) + tmp.len = (socklen_t)sizeof(tmp.msa); + if (getsockname(fd, sa, &tmp.len) != 0) die_errno("getsockname(fd=%d) failed", fd); - rc = getnameinfo(&tmp.as.sa, tmp.len, - hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), flags); - if (rc != 0) - die("getnameinfo failed: %s", gai_strerror(rc)); + if (sa->sa_family != AF_INET && sa->sa_family != AF_INET6) + die("invalid address family=%d (not AF_INET/AF_INET6)", + (int)sa->sa_family); - syslog(LOG_INFO, "inherited %s:%s on fd=%d", hbuf, sbuf, fd); + mog_packaddr_init(&mpa, &tmp.msa, tmp.len); + + mog_nameinfo(&mpa, &ni); + + syslog(LOG_INFO, "inherited %s%s on fd=%d", ni.ni_host, ni.ni_serv, fd); ins = xmalloc(sizeof(*ins)); *ins = tmp; @@ -58,7 +60,8 @@ static void register_listen_fd(int fd) switch (hash_insert_if_absent(listeners, ins, NULL)) { case 0: - die("duplicate listener %s:%s on fd=%d", hbuf, sbuf, fd); + die("duplicate listener %s%s on fd=%d", + ni.ni_host, ni.ni_serv, fd); break; case 1: /* success */ return; @@ -107,7 +110,7 @@ int mog_inherit_get(struct sockaddr *addr, socklen_t len) return fd; l.len = len; - memcpy(&l.as.sa, addr, len); + memcpy(&l.msa.bytes, addr, len); in = hash_delete(listeners, &l); if (in) { @@ -132,8 +135,7 @@ void mog_inherit_init(void) return; listeners = hash_initialize(3, NULL, listener_hash, listener_cmp, free); - if (!listeners) - die("failed to initialize inherited listeners hash"); + mog_oom_if_null(listeners); atexit(listeners_cleanup); fds = xstrdup(orig); @@ -39,8 +39,7 @@ __attribute__((destructor)) static void iou_destructor(void) __attribute__((constructor)) static void iou_constructor(void) { dev_iou = hash_initialize(7, NULL, iou_hash, iou_cmp, free); - if (!dev_iou) - mog_oom(); + mog_oom_if_null(dev_iou); } static bool cleanup_begin_i(void *ent, void *unused) diff --git a/listen_parser.h b/listen_parser.h index 7555846..fcfa08a 100644 --- a/listen_parser.h +++ b/listen_parser.h @@ -2,4 +2,5 @@ * Copyright (C) 2012-2013, Eric Wong <normalperson@yhbt.net> * License: GPLv3 or later (see COPYING for details) */ -struct mog_addrinfo * mog_listen_parse_internal(char *, size_t, char *, size_t); +struct mog_addrinfo * +mog_listen_parse_internal(char *, size_t, char *, size_t, sa_family_t); diff --git a/listen_parser.rl b/listen_parser.rl index f68a821..c77c706 100644 --- a/listen_parser.rl +++ b/listen_parser.rl @@ -10,7 +10,7 @@ main := listen '\0'> { a = mog_listen_parse_internal(mark_beg, mark_len, - port_beg, port_len); + port_beg, port_len, sa_family); }; }%% @@ -23,6 +23,7 @@ static struct mog_addrinfo *listen_parse(char *str) char *port_beg = NULL; size_t mark_len = 0; size_t port_len = 0; + sa_family_t sa_family = AF_INET; struct mog_addrinfo *a = NULL; int cs; diff --git a/listen_parser_common.rl b/listen_parser_common.rl index 17b1933..ffb260b 100644 --- a/listen_parser_common.rl +++ b/listen_parser_common.rl @@ -7,12 +7,20 @@ ipv4 = (digit+ '.' digit+ '.' digit+ '.' digit+) > { mark_beg = fpc; } - @ { mark_len = fpc - mark_beg + 1; }; + @ { + mark_len = fpc - mark_beg + 1; + sa_family = AF_INET; + }; + ipv6 = '[' + ((xdigit|':')+) + > { mark_beg = fpc; } + @ { mark_len = fpc - mark_beg + 1; } + ']' @ { sa_family = AF_INET6; }; port = (digit+) > { port_beg = fpc; } @ { port_len = fpc - port_beg + 1; }; - listen = (((ipv4)? ':')? port ) $! { - syslog(LOG_ERR, "bad character in IPv4 address: %c", fc); + listen = (((ipv4|ipv6)? ':')? port ) $! { + syslog(LOG_ERR, "bad character in IP address: %c", fc); }; }%% diff --git a/listen_parser_internal.c b/listen_parser_internal.c index e64ba46..0d18c2a 100644 --- a/listen_parser_internal.c +++ b/listen_parser_internal.c @@ -7,7 +7,8 @@ struct mog_addrinfo * mog_listen_parse_internal( - char *mark_beg, size_t mark_len, char *port_beg, size_t port_len) + char *mark_beg, size_t mark_len, char *port_beg, size_t port_len, + sa_family_t sa_family) { const char *node = NULL; struct addrinfo hints; @@ -16,7 +17,7 @@ mog_listen_parse_internal( int s; memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_INET; + hints.ai_family = sa_family; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_PASSIVE; hints.ai_protocol = IPPROTO_TCP; diff --git a/m4/ld_wrap.m4 b/m4/ld_wrap.m4 new file mode 100644 index 0000000..6b949e2 --- /dev/null +++ b/m4/ld_wrap.m4 @@ -0,0 +1,21 @@ +dnl check for --wrap support in ld (expected of GNU ld) +AC_DEFUN([CM_LD_WRAP],[ +AC_CACHE_CHECK([if gcc/ld supports -Wl,--wrap], +[cm_cv_ld_wrap], +[if test "$enable_shared" = no +then + cm_cv_ld_wrap="not needed, shared libraries are disabled" +else + cm_ldflags_save="$LDFLAGS" + LDFLAGS="-Wl,--wrap=free" + AC_LINK_IFELSE([AC_LANG_PROGRAM([ + #include <stdlib.h> + static void __wrap_free(void *ptr) { __real_free(ptr); } + ],[ free(NULL); ] + )], + [cm_cv_ld_wrap=yes], + [cm_cv_ld_wrap=no]) + LDFLAGS="$cm_ldflags_save" +fi]) + AM_CONDITIONAL([HAVE_LD_WRAP], test "x$cm_cv_ld_wrap" = "xyes") +]) diff --git a/m4/systemtap.m4 b/m4/systemtap.m4 new file mode 100644 index 0000000..dccb9b3 --- /dev/null +++ b/m4/systemtap.m4 @@ -0,0 +1,37 @@ +dnl systemtap support +dnl enable automatically if dtrace and sdt.h are available +AC_DEFUN([CM_SYSTEMTAP],[ +AC_CHECK_PROGS(DTRACE, dtrace) +AC_CHECK_HEADER([sys/sdt.h], [SDT_H_FOUND='yes'], [SDT_H_FOUND='no']) +AS_IF([test "x${DTRACE}" != x && test $SDT_H_FOUND = yes], + AC_CACHE_CHECK([for sys/sdt.h usability], + [cm_cv_sdt_h_usable], [ + AC_TRY_COMPILE([ + #include <sys/sdt.h> + void foo(void) { STAP_PROBE(foo, foo); } + ],[], + [cm_cv_sdt_h_usable=yes], + [cm_cv_sdt_h_usable=no])])) +AS_IF([test $cm_cv_sdt_h_usable = yes], + [ENABLE_SYSTEMTAP=yes], [ENABLE_SYSTEMTAP=no]) + +dnl support explicit --disable-systemtap or --enable-systemtap +AC_MSG_CHECKING([whether to include systemtap tracing support]) +AC_ARG_ENABLE([systemtap], + [AS_HELP_STRING([--enable-systemtap], + [Enable inclusion of systemtap trace support])], + [ENABLE_SYSTEMTAP="${enableval}"]) +AM_CONDITIONAL([ENABLE_SYSTEMTAP], [test x$ENABLE_SYSTEMTAP = xyes]) +AC_MSG_RESULT(${ENABLE_SYSTEMTAP}) + +dnl maybe somebody forced --enable-systemtap w/o dtrace or sdt.h +if test "x${ENABLE_SYSTEMTAP}" = xyes +then + AS_IF([test "x${DTRACE}" = x], [AC_MSG_ERROR([dtrace not found])]) + AS_IF([test $SDT_H_FOUND = no], + [ AC_MSG_ERROR([systemtap support needs sys/sdt.h header]) ]) + AS_IF([test $cm_cv_sdt_h_usable = no], + [ AC_MSG_ERROR([sys/sdt.h header is not usable (clang?)]) ]) + AC_DEFINE([HAVE_SYSTEMTAP], [1], [Define to 1 if using sdt probes.]) +fi +]) @@ -3,6 +3,7 @@ * License: GPLv3 or later (see COPYING for details) */ #include "cmogstored.h" +#include "trace.h" #include "mgmt.h" #include "digest.h" #include "ioprio.h" @@ -81,6 +82,16 @@ MOG_NOINLINE static void mgmt_close(struct mog_fd *mfd) mog_fd_put(mfd); } +/* called if epoll/kevent is out-of-space */ +void mog_mgmt_drop(struct mog_fd *mfd) +{ + struct mog_mgmt *mgmt = &mfd->as.mgmt; + + if (mgmt->forward && mgmt->forward != MOG_IOSTAT) + mog_file_close(mgmt->forward); + mgmt_close(mfd); +} + void mog_mgmt_writev(struct mog_mgmt *mgmt, struct iovec *iov, int iovcnt) { struct mog_fd *mfd = mog_fd_of(mgmt); @@ -121,10 +132,10 @@ static void mgmt_defer_rbuf(struct mog_mgmt *mgmt, struct mog_rbuf *rbuf, size_t buf_len) { struct mog_rbuf *old = mgmt->rbuf; - size_t defer_bytes = buf_len - mgmt->offset; - char *src = rbuf->rptr + mgmt->offset; + size_t defer_bytes = buf_len - mgmt->buf_off; + char *src = rbuf->rptr + mgmt->buf_off; - assert(mgmt->offset >= 0 && "mgmt->offset negative"); + assert(mgmt->buf_off >= 0 && "mgmt->buf_off negative"); assert(defer_bytes <= MOG_RBUF_BASE_SIZE && "defer bytes overflow"); if (defer_bytes == 0) { @@ -138,7 +149,7 @@ mgmt_defer_rbuf(struct mog_mgmt *mgmt, struct mog_rbuf *rbuf, size_t buf_len) memcpy(mgmt->rbuf->rptr, src, defer_bytes); mgmt->rbuf->rsize = defer_bytes; } - mgmt->offset = 0; + mgmt->buf_off = 0; } /* @@ -163,7 +174,7 @@ static enum mog_next mgmt_queue_step(struct mog_fd *mfd) /* we may have pipelined data in mgmt->rbuf */ rbuf = mgmt->rbuf ? mgmt->rbuf : mog_rbuf_get(MOG_RBUF_BASE_SIZE); buf = rbuf->rptr; - off = mgmt->offset; + off = mgmt->buf_off; assert(off >= 0 && "offset is negative"); assert(off < MOG_RBUF_BASE_SIZE && "offset is too big"); if (mgmt->rbuf && off == 0) { @@ -186,13 +197,13 @@ parse: case MOG_PARSER_CONTINUE: assert(mgmt->wbuf == NULL && "tried to write (and failed) with partial req"); - if (mgmt->offset == MOG_RBUF_BASE_SIZE) { + if (mgmt->buf_off == MOG_RBUF_BASE_SIZE) { assert(buf_len == MOG_RBUF_BASE_SIZE && "bad rbuf"); syslog(LOG_ERR, "mgmt request too large"); return MOG_NEXT_CLOSE; } - off = mgmt->offset; + off = mgmt->buf_off; goto reread; case MOG_PARSER_DONE: if (mgmt->forward == MOG_IOSTAT) @@ -205,6 +216,7 @@ parse: return mgmt->wbuf ? MOG_NEXT_WAIT_WR : MOG_NEXT_ACTIVE; } } else if (r == 0) { /* client shut down */ + TRACE(CMOGSTORED_MGMT_RDCLOSE(mfd, buf_len)); return MOG_NEXT_CLOSE; } else { switch (errno) { @@ -215,8 +227,11 @@ parse: case EINTR: goto reread; case ECONNRESET: case ENOTCONN: + /* these errors are too common to log, normally */ + TRACE(CMOGSTORED_MGMT_RDERR(mfd, buf_len, errno)); return MOG_NEXT_CLOSE; default: + TRACE(CMOGSTORED_MGMT_RDERR(mfd, buf_len, errno)); syslog(LOG_NOTICE, "mgmt client died: %m"); return MOG_NEXT_CLOSE; } @@ -268,12 +283,30 @@ void mog_mgmt_quit_step(struct mog_fd *mfd) } } +/* stringify the address for tracers */ +static MOG_NOINLINE void +trace_mgmt_accepted( + struct mog_fd *mfd, union mog_sockaddr *msa, socklen_t salen) +{ +#ifdef HAVE_SYSTEMTAP + struct mog_packaddr mpa; + struct mog_ni ni; + + mog_nameinfo(&mpa, &ni); + TRACE(CMOGSTORED_MGMT_ACCEPTED(mfd->fd, ni.ni_host, ni.ni_serv)); +#endif /* !HAVE_SYSTEMTAP */ +} + /* called immediately after accept(), this initializes the mfd (once) */ -void mog_mgmt_post_accept(int fd, struct mog_svc *svc) +void mog_mgmt_post_accept(int fd, struct mog_svc *svc, + union mog_sockaddr *msa, socklen_t salen) { struct mog_fd *mfd = mog_fd_init(fd, MOG_FD_TYPE_MGMT); struct mog_mgmt *mgmt = &mfd->as.mgmt; + if (TRACE_ENABLED(CMOGSTORED_MGMT_ACCEPTED)) + trace_mgmt_accepted(mfd, msa, salen); + mog_mgmt_init(mgmt, svc); mog_idleq_add(svc->queue, mfd, MOG_QEV_RD); } @@ -183,7 +183,6 @@ void mog_mgmt_fn_aio_threads(struct mog_mgmt *mgmt, char *buf) { char *end; unsigned long long nr; - struct mog_queue *q = mgmt->svc->queue; char *nptr = buf + mgmt->mark[0]; char *eor = nptr + mgmt->mark[1] - mgmt->mark[0]; struct iovec iov; @@ -194,8 +193,8 @@ void mog_mgmt_fn_aio_threads(struct mog_mgmt *mgmt, char *buf) nr = strtoull(nptr, &end, 10); assert(*end == 0 && "ragel misfed mog_mgmt_fn_set_aio_threads"); - if (nr > 0 && nr <= 100) - mog_thrpool_set_n_threads(q, nr); + if (nr > 0 && nr <= (size_t)INT_MAX) + mog_svc_aio_threads_enqueue(mgmt->svc, nr); IOV_STR(&iov, "\r\n"); mog_mgmt_writev(mgmt, &iov, 1); diff --git a/mgmt_parser.rl b/mgmt_parser.rl index e183cc3..d8342a1 100644 --- a/mgmt_parser.rl +++ b/mgmt_parser.rl @@ -11,15 +11,15 @@ */ static void set_prio_fsck(struct mog_mgmt *mgmt) { - if (mgmt->svc->mgmt_fd >= 0) + if (mgmt->svc->mgmt_mfd) mgmt->prio = MOG_PRIO_FSCK; } %%{ machine mgmt_parser; + include path_parser "path_parser.rl"; eor = '\r'?'\n'; - path = "/"[a-zA-Z0-9/\.\-]*; reason = ' '("fsck" @ { set_prio_fsck(mgmt); } | [a-zA-Z0-9_]+); invalid_line := ( [ \t]* @@ -32,7 +32,7 @@ static void set_prio_fsck(struct mog_mgmt *mgmt) fbreak; }; size = ( - "size "(path) > { mgmt->mark[0] = fpc - buf; } + "size "(mog_path) > { mgmt->mark[0] = fpc - buf; } eor > { mgmt->mark[1] = fpc - buf; } @ { mog_mgmt_fn_size(mgmt, buf); fbreak; } ); @@ -43,7 +43,7 @@ static void set_prio_fsck(struct mog_mgmt *mgmt) "SHA-1" @ { mgmt->alg = GC_SHA1; } ) " " - (path) > { mgmt->mark[0] = fpc - buf; } + (mog_path) > { mgmt->mark[0] = fpc - buf; } ( reason? eor) > { mgmt->mark[1] = fpc - buf; } @ { mog_mgmt_fn_digest(mgmt, buf); fbreak; } ); @@ -99,7 +99,8 @@ mog_mgmt_parse(struct mog_mgmt *mgmt, char *buf, size_t len) char *p, *pe, *eof = NULL; int cs = mgmt->cs; int really_done = 0; - size_t off = mgmt->offset; + size_t off = mgmt->buf_off; + uint32_t *mog_devid = &mgmt->mog_devid; assert(mgmt->wbuf == NULL && "unwritten data in buffer"); assert(off <= len && "mgmt offset past end of buffer"); @@ -116,13 +117,13 @@ mog_mgmt_parse(struct mog_mgmt *mgmt, char *buf, size_t len) cs = mgmt_parser_first_final; mgmt->cs = cs; - mgmt->offset = p - buf; + mgmt->buf_off = p - buf; if (cs == mgmt_parser_error) return MOG_PARSER_ERROR; assert(p <= pe && "buffer overflow after mgmt parse"); - assert(mgmt->offset <= len && "offset longer than len"); + assert(mgmt->buf_off <= len && "offset longer than len"); if (mgmt->cs == mgmt_parser_first_final) return MOG_PARSER_DONE; return MOG_PARSER_CONTINUE; @@ -62,8 +62,8 @@ static Hash_table * mnt_new(size_t n) { Hash_table *rv = hash_initialize(n, NULL, me_hash, me_cmp, me_free); - if (!rv) - mog_oom(); + mog_oom_if_null(rv); + return rv; } @@ -159,11 +159,11 @@ static void timed_init_once(void) break; /* this must succeed, keep looping */ - if (mog_pthread_create_retry(rc)) { + if (mog_pthread_create_retryable(rc)) { if ((++tries % 1024) == 0) warn("pthread_create: %s (tries: %lu)", strerror(rc), tries); - sched_yield(); + mog_yield(); } else { assert(0 && "pthread_create usage error"); } @@ -285,9 +285,12 @@ void mog_mnt_release(const struct mount_entry *me) CHECK(int, 0, pthread_mutex_unlock(&by_dev_lock) ); } +#define MOG_DEV_T_INVAL ((dev_t)-1) + struct mnt_update { const char *prefix; size_t prefixlen; + dev_t st_rdev; char util[MOG_IOUTIL_LEN]; }; @@ -297,6 +300,10 @@ struct mnt_update { */ static bool me_update_match(struct mount_entry *me, struct mnt_update *update) { + if (update->st_rdev != MOG_DEV_T_INVAL + && me->me_dev == update->st_rdev) + return true; + if (strlen(me->me_devname) < update->prefixlen) return false; return memcmp(update->prefix, me->me_devname, update->prefixlen) == 0; @@ -334,10 +341,27 @@ static bool update_util_each(void *ent, void *upd) void mog_mnt_update_util(struct mog_iostat *iostat) { struct mnt_update update; + struct stat st; const char *devsuffix = iostat->dev; update.prefix = xasprintf("/dev/%s", devsuffix); update.prefixlen = strlen(update.prefix); + + /* + * st_rdev matching is necessary for cryptmount(8) on Linux, where + * /dev/mapper/FOO is NOT a symlink to /dev/dm-N, but /dev/dm-N + * and /dev/mapper/FOO both refer to the same device (where + * /dev/mapper/FOO is the mounted device name, mountlist never + * sees /dev/dm-N). + * + * FIXME: parse /proc/partitions under Linux like mogstored does + * may avoid this stat. + */ + if (stat(update.prefix, &st) == 0 && S_ISBLK(st.st_mode)) + update.st_rdev = st.st_rdev; + else + update.st_rdev = MOG_DEV_T_INVAL; + assert(sizeof(update.util) == sizeof(iostat->util)); memcpy(&update.util, iostat->util, sizeof(update.util)); diff --git a/nameinfo.c b/nameinfo.c new file mode 100644 index 0000000..f5af802 --- /dev/null +++ b/nameinfo.c @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2012-2013, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" +#include <arpa/inet.h> + +/* + * small replacement for getnameinfo(3), this only handles numeric types + * for IPv4 and IPv6 and uses the compact mog_ni structure to reduce + * stack usage in error reporting. + */ +void mog_nameinfo(struct mog_packaddr *mpa, struct mog_ni *ni) +{ + char *hostptr = ni->ni_host; + size_t hostlen = sizeof(ni->ni_host) - (sizeof("[]") - 1); + char *servptr = ni->ni_serv + 1; /* offset for ':' */ + size_t servlen = sizeof(ni->ni_serv) - 1; /* offset for ':' */ + int rc; + const void *src; + const char *ret; + + if (mpa->sa_family == AF_INET6) { + hostptr[0] = '['; /* leading '[' */ + src = mpa->as.in6_ptr; + hostptr++; + } else { + assert(mpa->sa_family == AF_INET && "bad family"); + src = &mpa->as.in_addr; + } + + ret = inet_ntop(mpa->sa_family, src, hostptr, (socklen_t)hostlen); + + /* terminate serv string on error */ + assert(ret == hostptr && "inet_ntop"); + ni->ni_serv[0] = ':'; + + /* add trailing ']' */ + if (mpa->sa_family == AF_INET6) { + hostlen = strlen(hostptr); + hostptr[hostlen] = ']'; + hostptr[hostlen + 1] = 0; + } + + /* port number */ + rc = snprintf(servptr, servlen, "%u", (unsigned)ntohs(mpa->port)); + assert(rc > 0 && rc < (int)servlen && "we suck at snprintf"); +} @@ -50,8 +50,8 @@ static void note_run(void) if (note_xchg(MOG_NOTIFY_DEVICE_REFRESH, 1, 0)) global_mkusage(); - if (note_xchg(MOG_NOTIFY_SET_N_THREADS, 1, 0)) - mog_thrpool_process_queue(); + if (note_xchg(MOG_NOTIFY_AIO_THREADS, 1, 0)) + mog_svc_aio_threads_handler(); } /* drain the pipe and process notifications */ @@ -109,7 +109,7 @@ void mog_notify(enum mog_notification note) { switch (note) { case MOG_NOTIFY_DEVICE_REFRESH: - case MOG_NOTIFY_SET_N_THREADS: + case MOG_NOTIFY_AIO_THREADS: note_xchg(note, 0, 1); mog_selfwake_interrupt(); break; @@ -5,7 +5,7 @@ enum mog_notification { MOG_NOTIFY_SIGNAL = -1, MOG_NOTIFY_DEVICE_REFRESH = 0, - MOG_NOTIFY_SET_N_THREADS = 1, + MOG_NOTIFY_AIO_THREADS = 1, MOG_NOTIFY_MAX }; diff --git a/packaddr.h b/packaddr.h new file mode 100644 index 0000000..fdfb9cb --- /dev/null +++ b/packaddr.h @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2012-2013, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ + +/* + * avoid sockaddr_storage since that bigger than we need + * This is meant to be cast to "struct sockaddr" via mog_sockaddr_sa + */ +union mog_sockaddr { + struct sockaddr_in in; + struct sockaddr_in6 in6; + uint8_t bytes[sizeof(struct sockaddr_in6)]; +}; + +static inline struct sockaddr *mog_sockaddr_sa(union mog_sockaddr *msa) +{ + assert((void *)msa == (void *)&msa->bytes); + return (struct sockaddr *)msa; +} + +/* this is the relevant part we may store in "struct mog_fd" */ +struct mog_packaddr { + sa_family_t sa_family; + in_port_t port; + union { + struct in6_addr *in6_ptr; + struct in_addr in_addr; + } as; +} __attribute__((packed)); + +static inline void mog_packaddr_free(struct mog_packaddr *mpa) +{ + if (mpa->sa_family != AF_INET) + free(mpa->as.in6_ptr); +} + +static inline void mog_packaddr_init(struct mog_packaddr *mpa, + union mog_sockaddr *msa, socklen_t salen) +{ + mpa->sa_family = msa->in.sin_family; + + if (mpa->sa_family == AF_INET) { + mpa->port = msa->in.sin_port; + memcpy(&mpa->as.in_addr, &msa->in.sin_addr, + sizeof(struct in_addr)); + } else { + assert(mpa->sa_family == AF_INET6 && "invalid sa_family"); + mpa->port = msa->in6.sin6_port; + mpa->as.in6_ptr = xmalloc(sizeof(struct in6_addr)); + memcpy(mpa->as.in6_ptr, &msa->in6.sin6_addr, + sizeof(struct in6_addr)); + } +} diff --git a/path_parser.h b/path_parser.h new file mode 100644 index 0000000..8be9a4d --- /dev/null +++ b/path_parser.h @@ -0,0 +1,14 @@ +/* + * Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "config.h" +#include <stdint.h> + +static void mog_devid_incr(uint32_t *mog_devid, unsigned c) +{ + *mog_devid *= 10; + *mog_devid += c - '0'; + + /* no overflow checking here */ +} diff --git a/path_parser.rl b/path_parser.rl new file mode 100644 index 0000000..fc77c98 --- /dev/null +++ b/path_parser.rl @@ -0,0 +1,21 @@ +/* + * Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ + +/* + * caller needs to setup: uint32_t *mog_devid = ... + */ +%%{ + machine path_parser; + + devid = "dev" + (digit+) $ { + /* no overflow checking here, no need */ + *mog_devid *= 10; + *mog_devid += fc - '0'; + } + '/'; + # only stuff MogileFS will use + mog_path = '/' (devid)? [a-zA-Z0-9/\.\-]{0,36}; +}%% diff --git a/probes.d b/probes.d new file mode 100644 index 0000000..4be0d6c --- /dev/null +++ b/probes.d @@ -0,0 +1,24 @@ +/* + * note: probe names are upper-case in the source and prefixed by provider + * So we'll have trace points like: + * CMOGSTORED_HTTP_RDERR for the "http_rderr" trace point + * + * Warning: probe points are currently an unstable interface and likely + * to change in 2013 + */ +provider cmogstored { + probe http_rderr(size_t buf_len, int err); + probe http_rdclose(size_t buf_len); + probe http_accepted(int fd, const char *host, const char *port); + + probe http_req_begin(bool pipelined); + /* DWARF: http_process_client */ + /* DWARF: mog_http_get_open */ + probe http_req_end(); + + probe mgmt_accepted(int fd, const char *host, const char *port); + probe mgmt_rderr(struct mog_fd *mfd, size_t buf_len, int err); + probe mgmt_rdclose(struct mog_fd *mfd, size_t buf_len); + + probe write_buffered(); +}; @@ -38,8 +38,7 @@ void mog_process_init(size_t nr) if (nr < 3) nr = 3; processes = hash_initialize(nr, NULL, process_hash, process_cmp, free); - if (processes == NULL) - mog_oom(); + mog_oom_if_null(processes); } void mog_process_reset(void) @@ -123,8 +122,7 @@ void mog_process_register(pid_t pid, unsigned id) p->pid = pid; p->id = id; - if (hash_insert(processes, p) == NULL) - mog_oom(); + mog_oom_if_null(hash_insert(processes, p)); } /* diff --git a/queue_common.c b/queue_common.c index 79a5869..b9c2f99 100644 --- a/queue_common.c +++ b/queue_common.c @@ -46,3 +46,21 @@ void mog_queue_stop(struct mog_queue *keep) mog_fd_put(mfd); } } + +void mog_queue_drop(struct mog_fd *mfd) +{ + switch (mfd->fd_type) { + case MOG_FD_TYPE_HTTP: + case MOG_FD_TYPE_HTTPGET: + mog_http_drop(mfd); + return; + case MOG_FD_TYPE_MGMT: + mog_mgmt_drop(mfd); + return; + default: + syslog(LOG_ERR, + "dropping fd_type=%d, functionality may be compromised", + mfd->fd_type); + mog_fd_put(mfd); + } +} diff --git a/queue_epoll.c b/queue_epoll.c index e2e8222..c704ebb 100644 --- a/queue_epoll.c +++ b/queue_epoll.c @@ -101,7 +101,8 @@ struct mog_queue * mog_queue_new(void) return mog_queue_init(epoll_fd); } -static struct mog_fd * epoll_event_check(int rc, struct epoll_event *event) +static struct mog_fd * +epoll_event_check(int rc, struct epoll_event *event) { struct mog_fd *mfd; @@ -117,6 +118,7 @@ static struct mog_fd * epoll_event_check(int rc, struct epoll_event *event) if (errno != EINTR) /* rc could be > 1 if the kernel is broken :P */ die_errno("epoll_wait() failed with (%d)", rc); + return NULL; } @@ -132,13 +134,11 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout) bool cancellable = timeout != 0; if (cancellable) - mog_cancel_enable(); + mog_thr_test_quit(); /* epoll_wait is a cancellation point since glibc 2.4 */ rc = epoll_wait(q->queue_fd, &event, 1, timeout); - if (cancellable) - mog_cancel_disable(); return epoll_event_check(rc, &event); } @@ -146,10 +146,8 @@ struct mog_fd * mog_idleq_wait_intr(struct mog_queue *q, int timeout) { int rc; struct epoll_event event; - sigset_t set; - CHECK(int, 0, sigemptyset(&set)); - rc = epoll_pwait(q->queue_fd, &event, 1, timeout, &set); + rc = epoll_pwait(q->queue_fd, &event, 1, timeout, &mog_emptyset); return epoll_event_check(rc, &event); } @@ -160,7 +158,7 @@ epoll_ctl_error(struct mog_queue *q, struct mog_fd *mfd) case ENOMEM: case ENOSPC: syslog(LOG_ERR, "epoll_ctl: %m, dropping file descriptor"); - mog_fd_put(mfd); + mog_queue_drop(mfd); return; default: syslog(LOG_ERR, "unhandled epoll_ctl() error: %m"); diff --git a/queue_kqueue.c b/queue_kqueue.c index 6d2da43..9772ca3 100644 --- a/queue_kqueue.c +++ b/queue_kqueue.c @@ -19,13 +19,6 @@ struct mog_queue * mog_queue_new(void) return mog_queue_init(kqueue_fd); } -static void check_cancel(void) -{ - mog_cancel_enable(); - pthread_testcancel(); - mog_cancel_disable(); -} - /* * grabs one active event off the event queue */ @@ -51,16 +44,11 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout) * in kevent(). This allows us to wake up an respond to a * cancellation request (since kevent() is not a cancellation point). */ - if (cancellable) { - check_cancel(); - mog_intr_enable(); - } + if (cancellable) + mog_thr_test_quit(); rc = kevent(q->queue_fd, NULL, 0, &event, 1, tsp); - if (cancellable) - PRESERVE_ERRNO( mog_intr_disable() ); - if (rc > 0) { mfd = event.udata; mog_fd_check_out(mfd); @@ -69,7 +57,7 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout) return mfd; } if (cancellable) - check_cancel(); + mog_thr_test_quit(); if (rc == 0) return NULL; @@ -97,7 +85,7 @@ kevent_add_error(struct mog_queue *q, struct mog_fd *mfd) case ENOMEM: syslog(LOG_ERR, "kevent(EV_ADD) out-of-space, dropping file descriptor"); - mog_fd_put(mfd); + mog_queue_drop(mfd); return; default: syslog(LOG_ERR, "unhandled kevent(EV_ADD) error: %m"); diff --git a/queue_loop.c b/queue_loop.c index 019e30b..f8a03a9 100644 --- a/queue_loop.c +++ b/queue_loop.c @@ -4,15 +4,6 @@ */ #include "cmogstored.h" -static void queue_loop_cleanup(void *arg) -{ - unsigned long self = (unsigned long)pthread_self(); - - syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread shutting down...", self); - mog_alloc_quit(); - syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread done", self); -} - static struct mog_fd *queue_xchg_maybe(struct mog_queue *q, struct mog_fd *mfd) { /* @@ -46,8 +37,6 @@ void * mog_queue_loop(void *arg) struct mog_queue *q = arg; struct mog_fd *mfd = NULL; - pthread_cleanup_push(queue_loop_cleanup, NULL); - mog_cancel_disable(); syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread ready", (unsigned long)pthread_self()); @@ -71,8 +60,6 @@ void * mog_queue_loop(void *arg) } } - pthread_cleanup_pop(1); - return NULL; } @@ -101,7 +88,7 @@ void mog_queue_quit_loop(struct mog_queue *queue) assert(mog_nr_active_at_quit <= (size_t)INT_MAX && "mog_nr_active_at_quit underflow"); - if ((mfd = mog_idleq_wait(queue, -1))) + if ((mfd = mog_idleq_wait_intr(queue, -1))) queue_quit_step(mfd); } } @@ -8,20 +8,23 @@ * we block signals in pool threads, only the main thread receives signals */ -void mog_intr_disable(void) +static sigset_t fullset; +sigset_t mog_emptyset; + +__attribute__((constructor)) static void sig_init(void) { - sigset_t set; + CHECK(int, 0, sigfillset(&fullset)); + CHECK(int, 0, sigemptyset(&mog_emptyset)); +} - CHECK(int, 0, sigfillset(&set)); - CHECK(int, 0, pthread_sigmask(SIG_SETMASK, &set, NULL)); +void mog_intr_disable(void) +{ + CHECK(int, 0, pthread_sigmask(SIG_SETMASK, &fullset, NULL)); } void mog_intr_enable(void) { - sigset_t set; - - CHECK(int, 0, sigemptyset(&set)); - CHECK(int, 0, pthread_sigmask(SIG_SETMASK, &set, NULL)); + CHECK(int, 0, pthread_sigmask(SIG_SETMASK, &mog_emptyset, NULL)); } /* @@ -47,7 +50,6 @@ static void sleeper(struct timespec *tsp, const sigset_t *sigmask) /* thread-safe, interruptible sleep, negative seconds -> sleep forever */ void mog_sleep(long seconds) { - sigset_t set; struct timespec ts; struct timespec *tsp; @@ -59,6 +61,5 @@ void mog_sleep(long seconds) tsp = &ts; } - CHECK(int, 0, sigemptyset(&set)); - sleeper(tsp, &set); + sleeper(tsp, &mog_emptyset); } @@ -12,6 +12,18 @@ static pthread_mutex_t svc_lock = PTHREAD_MUTEX_INITIALIZER; static Hash_table *by_docroot; /* enforce one mog_svc per docroot: */ static mode_t mog_umask; +/* + * maintain an internal queue of requests for the "server aio_threads = N" + * command in the side channel. The worker handling the client request must + * tell the main thread do change thread counts asynchronously (because + * the worker thread handling the request may die from a thread count + * reduction, so we have a worker thread make a fire-and-forget request + * to the notify thread. + */ +static pthread_mutex_t aio_threads_lock = PTHREAD_MUTEX_INITIALIZER; +static SIMPLEQ_HEAD(sq, mog_svc) aio_threads_qhead = + SIMPLEQ_HEAD_INITIALIZER(aio_threads_qhead); + static void svc_free(void *ptr) { struct mog_svc *svc = ptr; @@ -22,6 +34,8 @@ static void svc_free(void *ptr) mog_free(svc->docroot); if (svc->by_st_dev) hash_free(svc->by_st_dev); + if (svc->by_mog_devid) + hash_free(svc->by_mog_devid); free(svc); } @@ -48,8 +62,7 @@ static void svc_atexit(void) /* called atexit */ static void svc_once(void) { by_docroot = hash_initialize(7, NULL, svc_hash, svc_cmp, svc_free); - if (!by_docroot) - mog_oom(); + mog_oom_if_null(by_docroot); mog_umask = umask(0); umask(mog_umask); @@ -86,14 +99,18 @@ struct mog_svc * mog_svc_new(const char *docroot) svc_once(); svc = xzalloc(sizeof(struct mog_svc)); - svc->http_fd = svc->httpget_fd = svc->mgmt_fd = -1; svc->docroot = docroot; svc->docroot_fd = fd; svc->dir = dir; svc->put_perms = (~mog_umask) & 0666; svc->mkcol_perms = (~mog_umask) & 0777; + svc->thr_per_dev = 10; svc->idle_timeout = 5; CHECK(int, 0, pthread_mutex_init(&svc->devstats_lock, NULL)); + CHECK(int, 0, pthread_mutex_init(&svc->by_mog_devid_lock, NULL)); + svc->by_mog_devid = hash_initialize(7, NULL, mog_dev_hash, + mog_dev_cmp, free); + mog_oom_if_null(svc->by_mog_devid); switch (hash_insert_if_absent(by_docroot, svc, NULL)) { case 0: @@ -119,10 +136,10 @@ size_t mog_svc_each(Hash_processor processor, void *data) return rv; } -static bool cloexec_disable(int fd) +static bool cloexec_disable(struct mog_fd *mfd) { - if (fd >= 0) - CHECK(int, 0, mog_set_cloexec(fd, false)); + if (mfd) + CHECK(int, 0, mog_set_cloexec(mfd->fd, false)); return true; } @@ -130,9 +147,9 @@ static bool svc_cloexec_off_i(void *svcptr, void *unused) { struct mog_svc *svc = svcptr; - return (cloexec_disable(svc->mgmt_fd) - && cloexec_disable(svc->http_fd) - && cloexec_disable(svc->httpget_fd)); + return (cloexec_disable(svc->mgmt_mfd) + && cloexec_disable(svc->http_mfd) + && cloexec_disable(svc->httpget_mfd)); } /* @@ -143,3 +160,137 @@ void mog_svc_upgrade_prepare(void) { (void)hash_do_for_each(by_docroot, svc_cloexec_off_i, NULL); } + +/* this is only called by the main (notify) thread */ +void mog_svc_thrpool_rescale(struct mog_svc *svc, size_t ndev_new) +{ + size_t size = ndev_new * svc->thr_per_dev; + struct mog_thrpool *tp = &svc->queue->thrpool; + + /* respect user-setting */ + if (svc->user_set_aio_threads) { + if (tp->n_threads >= ndev_new) + return; + + syslog(LOG_WARNING, + "server aio_threads=%zu is less than devcount=%zu", + tp->n_threads, ndev_new); + + return; + } + + if (size < svc->thr_per_dev) + size = svc->thr_per_dev; + + if (svc->nmogdev) + syslog(LOG_INFO, + "devcount(%zu->%zu), updating server aio_threads=%zu", + svc->nmogdev, ndev_new, size); + mog_thrpool_set_size(tp, size); +} + +/* Hash iterator function */ +bool mog_svc_start_each(void *svc_ptr, void *main_ptr) +{ + struct mog_svc *svc = svc_ptr; + struct mog_main *mog_main = main_ptr; + struct mog_accept *ac; + size_t athr = (size_t)num_processors(NPROC_CURRENT); + struct mog_queue *q = mog_queue_new(); + size_t nthr = svc->nmogdev * svc->thr_per_dev; + + if (!nthr) + nthr = svc->thr_per_dev; + + /* + * try to distribute accept() callers between workers more evenly + * with wake-one accept() behavior by trimming down on acceptors + * having too many acceptor threads does not make sense, these + * threads are only bounded by lock contention and local bus speeds. + * Increasing threads here just leads to lock contention inside the + * kernel (accept/accept4/EPOLL_CTL_ADD) + */ + athr = mog_main->worker_processes > 1 ? 1 : MIN(2, athr); + + svc->queue = q; + mog_thrpool_start(&q->thrpool, nthr, mog_queue_loop, q); + + if (svc->mgmt_mfd) { + mog_main->have_mgmt = true; + ac = &svc->mgmt_mfd->as.accept; + + /* + * mgmt port is rarely used and always persistent, so it + * does not need multiple threads for blocking accept() + */ + mog_thrpool_start(&ac->thrpool, 1, mog_accept_loop, ac); + } + + if (svc->http_mfd) { + ac = &svc->http_mfd->as.accept; + mog_thrpool_start(&ac->thrpool, athr, mog_accept_loop, ac); + } + + if (svc->httpget_mfd) { + ac = &svc->httpget_mfd->as.accept; + mog_thrpool_start(&ac->thrpool, athr, mog_accept_loop, ac); + } + + return true; +} + +/* + * Fire and forget, we must run the actual thread count manipulation + * in the main notify thread because we may end up terminating the + * thread which invoked this. + * + * Called by threads inside the thrpool to wake-up the main/notify thread. + */ +void mog_svc_aio_threads_enqueue(struct mog_svc *svc, size_t size) +{ + size_t prev_enq; + + CHECK(int, 0, pthread_mutex_lock(&aio_threads_lock)); + + prev_enq = svc->user_req_aio_threads; + svc->user_req_aio_threads = size; + if (!prev_enq) + /* put into the queue so main thread can process it */ + SIMPLEQ_INSERT_TAIL(&aio_threads_qhead, svc, qentry); + + CHECK(int, 0, pthread_mutex_unlock(&aio_threads_lock)); + + /* wake up the main thread so it can process the queue */ + mog_notify(MOG_NOTIFY_AIO_THREADS); +} + +/* this runs in the main (notify) thread */ +void mog_svc_aio_threads_handler(void) +{ + struct mog_svc *svc; + + /* guard against requests bundled in one wakeup by looping here */ + for (;;) { + size_t req_size = 0; + + CHECK(int, 0, pthread_mutex_lock(&aio_threads_lock)); + svc = SIMPLEQ_FIRST(&aio_threads_qhead); + if (svc) { + SIMPLEQ_REMOVE_HEAD(&aio_threads_qhead, qentry); + req_size = svc->user_req_aio_threads; + svc->user_req_aio_threads = 0; + } + CHECK(int, 0, pthread_mutex_unlock(&aio_threads_lock)); + + /* + * spurious wakeup is possible since we loop here + * (and we must loop, see above comment) + */ + if (svc == NULL || req_size == 0) + return; + + syslog(LOG_INFO, "server aio_threads=%zu", req_size); + svc->user_set_aio_threads = req_size; + mog_thrpool_set_size(&svc->queue->thrpool, req_size); + } +} @@ -38,28 +38,21 @@ static void devlist_free(void *x) free(devlist); } -static size_t devid_hash(const void *x, size_t tablesize) -{ - const struct mog_dev *dev = x; - - return dev->devid % tablesize; -} - -static bool devid_cmp(const void *a, const void *b) -{ - const struct mog_dev *dev_a = a; - const struct mog_dev *dev_b = b; - - return dev_a->devid == dev_b->devid; -} - static struct mog_devlist * mog_devlist_new(dev_t st_dev) { struct mog_devlist *devlist = xmalloc(sizeof(struct mog_devlist)); devlist->st_dev = st_dev; devlist->by_mogdevid = hash_initialize(7, NULL, - devid_hash, devid_cmp, free); + mog_dev_hash, mog_dev_cmp, + + /* + * elements are freed when + * svc->by_mog_devid is freed + */ + NULL); + + mog_oom_if_null(devlist->by_mogdevid); return devlist; } @@ -99,8 +92,7 @@ static void svc_init_dev_hash(struct mog_svc *svc) svc->by_st_dev = hash_initialize(7, NULL, devlist_hash, devlist_cmp, devlist_free); - if (!svc->by_st_dev) - mog_oom(); + mog_oom_if_null(svc->by_st_dev); } static int svc_scandev(struct mog_svc *svc, size_t *nr, mog_scandev_cb cb) @@ -108,9 +100,6 @@ static int svc_scandev(struct mog_svc *svc, size_t *nr, mog_scandev_cb cb) struct dirent *ent; int rc = 0; - if (svc->mgmt_fd < 0) - return 0; - CHECK(int, 0, pthread_mutex_lock(&svc->devstats_lock)); svc_init_dev_hash(svc); rewinddir(svc->dir); @@ -127,15 +116,17 @@ static int svc_scandev(struct mog_svc *svc, size_t *nr, mog_scandev_cb cb) mog_devid = strtoul(ent->d_name + 3, &end, 10); if (*end != 0) continue; - if (mog_devid > 0xffffff) continue; /* MEDIUMINT in DB */ + if (mog_devid > MOG_DEVID_MAX) continue; - dev = mog_dev_new(svc, (uint32_t)mog_devid); + dev = mog_dev_for(svc, (uint32_t)mog_devid); if (!dev) continue; devlist = svc_devlist(svc, dev->st_dev); devhash = devlist->by_mogdevid; - if (cb) rc |= cb(dev, svc); /* mog_dev_mkusage */ + if (cb) + rc |= cb(dev, svc); /* mog_dev_mkusage */ + switch (hash_insert_if_absent(devhash, dev, NULL)) { case 0: free(dev); @@ -284,18 +275,21 @@ void mog_svc_dev_shutdown(void) mog_svc_each(devstats_shutdown_i, NULL); } -static bool svc_mkusage_each(void *svc, void *nr) +static bool svc_mkusage_each(void *svcptr, void *ignored) { - svc_scandev((struct mog_svc *)svc, nr, mog_dev_mkusage); + struct mog_svc *svc = svcptr; + size_t ndev = 0; + + svc_scandev(svc, &ndev, mog_dev_mkusage); + + if (svc->queue && (svc->nmogdev != ndev)) + mog_svc_thrpool_rescale(svc, ndev); + svc->nmogdev = ndev; return true; } -size_t mog_mkusage_all(void) +void mog_mkusage_all(void) { - size_t nr = 0; - - mog_svc_each(svc_mkusage_each, &nr); - - return nr; + mog_svc_each(svc_mkusage_each, NULL); } diff --git a/tapset/http_request.stp b/tapset/http_request.stp new file mode 100644 index 0000000..2f1514a --- /dev/null +++ b/tapset/http_request.stp @@ -0,0 +1,40 @@ +/* keyed by: [pid(),fd] */ +global cmogstored_http_req_begin; +global cmogstored_http_pipelined; +global cmogstored_http_addr; + +probe process("cmogstored").mark("http_accepted") { + fd = $arg1; + host = user_string($arg2); + serv = user_string($arg3); + host_serv = sprintf("%s%s", host, serv); + printf("% 8d accepted %s\n", fd, host_serv); + cmogstored_http_addr[pid(),fd] = host_serv; +} + +probe process("cmogstored").function("http_close") { + fd = @cast($mfd, "struct mog_fd")->fd; + printf("% 8d closing\n", fd); + delete cmogstored_http_addr[pid(),fd]; +} + +probe process("cmogstored").mark("http_req_begin") { + fd = @cast($mfd, "struct mog_fd")->fd; + is_pipelined = $arg1; + cmogstored_http_req_begin[pid(),fd] = gettimeofday_us(); + cmogstored_http_pipelined[pid(),fd] = is_pipelined; +} + +probe process("cmogstored").function("http_process_client") { + fd = @cast($mfd, "struct mog_fd")->fd; + starttime = cmogstored_http_req_begin[pid(),fd]; + diff = gettimeofday_us() - starttime; + + is_pipelined = cmogstored_http_pipelined[pid(),fd]; + printf("% 8d http_process_client time %ldus (pipelined:%s)\n", + fd, diff, is_pipelined ? "true" : "false"); +} + +probe process("cmogstored").mark("write_buffered") { + printf("% 8d blocked with %lu bytes to write\n", $fd, $len); +} diff --git a/test/.gitignore b/test/.gitignore index 9d883f4..02f2051 100644 --- a/test/.gitignore +++ b/test/.gitignore @@ -1,3 +1,4 @@ /.* slow.mk +epoll-wrap *.so diff --git a/test/chunk-parser-1.c b/test/chunk-parser-1.c index 6804a17..fd1de49 100644 --- a/test/chunk-parser-1.c +++ b/test/chunk-parser-1.c @@ -23,14 +23,13 @@ static void buf_set(const char *s) { struct mog_file *file; - http->chunked = 1; + http->_p.chunked = 1; reset(); tmpfp = tmpfile(); assert(tmpfp != NULL && "tmpfile(3) failed"); tmpfd = fileno(tmpfp); assert(tmpfd >= 0 && "invalid fd"); - http->forward = mog_fd_get(tmpfd); - http->forward->fd = tmpfd; + http->forward = mog_fd_init(tmpfd, MOG_FD_TYPE_FILE); file = &http->forward->as.file; file->foff = 0; buf = xstrdup(s); @@ -46,16 +45,16 @@ int main(void) buf_set("666\r\n"); state = mog_chunk_parse(http, buf, len); assert(state == MOG_PARSER_DONE); - assert(http->content_len == 0x666); - assert(http->chunk_state == MOG_CHUNK_STATE_DATA); + assert(http->_p.content_len == 0x666); + assert(http->_p.chunk_state == MOG_CHUNK_STATE_DATA); } if ("incomplete chunk") { buf_set("666\r"); state = mog_chunk_parse(http, buf, len); assert(state == MOG_PARSER_CONTINUE); - assert(http->content_len == 0x666); - assert(http->chunk_state == MOG_CHUNK_STATE_SIZE); + assert(http->_p.content_len == 0x666); + assert(http->_p.chunk_state == MOG_CHUNK_STATE_SIZE); } if ("bad chunk") { @@ -68,8 +67,8 @@ int main(void) buf_set("abcde; foo=bar\r\n"); state = mog_chunk_parse(http, buf, len); assert(state == MOG_PARSER_DONE); - assert(http->content_len == 0xabcde); - assert(http->chunk_state == MOG_CHUNK_STATE_DATA); + assert(http->_p.content_len == 0xabcde); + assert(http->_p.chunk_state == MOG_CHUNK_STATE_DATA); } if ("chunk with complete header and data") { @@ -77,8 +76,8 @@ int main(void) buf_set("5\r\nabcde"); state = mog_chunk_parse(http, buf, len); assert(state == MOG_PARSER_CONTINUE); - assert(http->content_len == 0); - assert(http->chunk_state == MOG_CHUNK_STATE_SIZE); + assert(http->_p.content_len == 0); + assert(http->_p.chunk_state == MOG_CHUNK_STATE_SIZE); assert(sizeof(tmp) == pread(tmpfd, tmp, sizeof(tmp), 0)); assert(0 == memcmp(tmp, "abcde", sizeof(tmp))); } @@ -88,33 +87,33 @@ int main(void) buf_set("5\r\nabcde\r\n3"); state = mog_chunk_parse(http, buf, len); assert(state == MOG_PARSER_CONTINUE); - assert(http->content_len == 3); - assert(http->chunk_state == MOG_CHUNK_STATE_SIZE); + assert(http->_p.content_len == 3); + assert(http->_p.chunk_state == MOG_CHUNK_STATE_SIZE); assert(sizeof(tmp) == pread(tmpfd, tmp, sizeof(tmp), 0)); assert(0 == memcmp(tmp, "abcde", sizeof(tmp))); - assert(http->offset == len); + assert(http->_p.buf_off == len); } if ("multiple chunks with end") { char tmp[8]; buf_set("5\r\nabcde\r\n3\r\n123\r\n0\r\n\r\n"); state = mog_chunk_parse(http, buf, len); assert(state == MOG_PARSER_DONE); - assert(http->chunk_state == MOG_CHUNK_STATE_DONE); + assert(http->_p.chunk_state == MOG_CHUNK_STATE_DONE); assert(sizeof(tmp) == pread(tmpfd, tmp, sizeof(tmp), 0)); assert(0 == memcmp(tmp, "abcde123", sizeof(tmp))); - assert(http->offset == len); + assert(http->_p.buf_off == len); } if ("multiple chunks with trailer") { char tmp[8]; buf_set("5\r\nabcde\r\n3\r\n123\r\n0\r\nFoo: bar\r\n\r\n"); state = mog_chunk_parse(http, buf, len); - assert(http->chunk_state == MOG_CHUNK_STATE_DONE); + assert(http->_p.chunk_state == MOG_CHUNK_STATE_DONE); assert(state == MOG_PARSER_DONE); - assert(http->content_len == 0); + assert(http->_p.content_len == 0); assert(sizeof(tmp) == pread(tmpfd, tmp, sizeof(tmp), 0)); assert(0 == memcmp(tmp, "abcde123", sizeof(tmp))); - assert(http->offset == len); + assert(http->_p.buf_off == len); } if ("multiple chunks with almost end") { @@ -122,11 +121,11 @@ int main(void) buf_set("5\r\nabcde\r\n3\r\n123\r\n0\r\n"); state = mog_chunk_parse(http, buf, len); assert(state == MOG_PARSER_DONE); - assert(http->chunk_state == MOG_CHUNK_STATE_DATA); - assert(http->content_len == 0); + assert(http->_p.chunk_state == MOG_CHUNK_STATE_DATA); + assert(http->_p.content_len == 0); assert(sizeof(tmp) == pread(tmpfd, tmp, sizeof(tmp), 0)); assert(0 == memcmp(tmp, "abcde123", sizeof(tmp))); - assert(http->offset == len); + assert(http->_p.buf_off == len); } if ("multiple chunks with almost end (more)") { @@ -134,11 +133,11 @@ int main(void) buf_set("5\r\nabcde\r\n3\r\n123\r\n0\r\n\r"); state = mog_chunk_parse(http, buf, len); assert(state == MOG_PARSER_CONTINUE); - assert(http->chunk_state == MOG_CHUNK_STATE_TRAILER); - assert(http->content_len == 0); + assert(http->_p.chunk_state == MOG_CHUNK_STATE_TRAILER); + assert(http->_p.content_len == 0); assert(sizeof(tmp) == pread(tmpfd, tmp, sizeof(tmp), 0)); assert(0 == memcmp(tmp, "abcde123", sizeof(tmp))); - assert(http->offset == len); + assert(http->_p.buf_off == len); } if ("multiple chunks with incomplete") { @@ -146,11 +145,11 @@ int main(void) buf_set("5\r\nabcde\r\n3\r\n12"); state = mog_chunk_parse(http, buf, len); assert(state == MOG_PARSER_DONE); - assert(http->chunk_state == MOG_CHUNK_STATE_DATA); - assert(http->content_len == 1); + assert(http->_p.chunk_state == MOG_CHUNK_STATE_DATA); + assert(http->_p.content_len == 1); assert(sizeof(tmp) == pread(tmpfd, tmp, sizeof(tmp), 0)); assert(0 == memcmp(tmp, "abcde12", sizeof(tmp))); - assert(http->offset == len); + assert(http->_p.buf_off == len); } if ("incomplete data") { @@ -158,11 +157,11 @@ int main(void) buf_set("666\r\nabc"); state = mog_chunk_parse(http, buf, len); assert(state == MOG_PARSER_DONE); - assert(http->chunk_state == MOG_CHUNK_STATE_DATA); - assert(http->content_len == (0x666 - sizeof(tmp))); + assert(http->_p.chunk_state == MOG_CHUNK_STATE_DATA); + assert(http->_p.content_len == (0x666 - sizeof(tmp))); assert(sizeof(tmp) == pread(tmpfd, tmp, sizeof(tmp), 0)); assert(0 == memcmp(tmp, "abc", sizeof(tmp))); - assert(http->offset == len); + assert(http->_p.buf_off == len); } reset(); diff --git a/test/epoll-wrap.c b/test/epoll-wrap.c new file mode 100644 index 0000000..a527e20 --- /dev/null +++ b/test/epoll-wrap.c @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2012-2013, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +/* + * fault injection wrapper for epoll + */ +#include "cmogstored.h" +#if defined(HAVE_EPOLL_WAIT) && ! MOG_LIBKQUEUE +static sig_atomic_t epoll_ctl_fail; +#define EMIT(s) write(STDERR_FILENO, (s), sizeof(s)-1) + +/* test/epoll_enospc depends on the following line */ +static const char msg[] = "epoll_ctl failure injection\n"; + +int __real_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); +int __real_epoll_create(int flags); + +int __wrap_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) +{ + if (epoll_ctl_fail) { + EMIT(msg); + errno = epoll_ctl_fail; + return -1; + } + + return __real_epoll_ctl(epfd, op, fd, event); +} + +static void set_wrap_epoll_ctl(int signum) +{ + if (signum == SIGTTIN) { + epoll_ctl_fail = ENOSPC; + EMIT("epoll_ctl ENOSPC on\n"); + } else { + epoll_ctl_fail = 0; + EMIT("epoll_ctl ENOSPC off\n"); + } +} + +int __wrap_epoll_create(int flags) +{ + struct sigaction sa; + + memset(&sa, 0, sizeof(struct sigaction)); + CHECK(int, 0, sigemptyset(&sa.sa_mask) ); + sa.sa_handler = set_wrap_epoll_ctl; + CHECK(int, 0, sigaction(SIGTTIN, &sa, NULL)); + CHECK(int, 0, sigaction(SIGTTOU, &sa, NULL)); + + return __real_epoll_create(flags); +} +#endif /* defined(HAVE_EPOLL_WAIT) && ! MOG_LIBKQUEUE */ diff --git a/test/epoll_enospc.rb b/test/epoll_enospc.rb new file mode 100644 index 0000000..5c254c0 --- /dev/null +++ b/test/epoll_enospc.rb @@ -0,0 +1,100 @@ +#!/usr/bin/env ruby +# -*- encoding: binary -*- +# Copyright (C) 2012-2013, Eric Wong <normalperson@yhbt.net> +# License: GPLv3 or later (see COPYING for details) +require 'test/test_helper' + +TEST_PROG = 'test/epoll-wrap' +has_epoll = false +if File.exist?(TEST_PROG) && `which lsof 2>/dev/null`.strip.size > 0 + s = `strings test/epoll-wrap`.split(/\n/) + has_epoll = !!s.grep(/epoll_ctl failure injection/)[0] +end + +class TestEpollEnospc < Test::Unit::TestCase + def setup + @tmpdir = Dir.mktmpdir('cmogstored-epoll-enospc-test') + @to_close = [] + @host = TEST_HOST + srv = TCPServer.new(@host, 0) + @port = srv.addr[1] + srv.close + @err = Tempfile.new("stderr") + cmd = [ TEST_PROG, "--docroot=#@tmpdir", "--httplisten=#@host:#@port", + "--maxconns=500" ] + vg = ENV["VALGRIND"] and cmd = vg.split(/\s+/).concat(cmd) + @pid = fork { + $stderr.reopen(@err) + @err.close + exec(*cmd) + } + @client = get_client + end + + def wait_for(sec, reason) + stop = Time.now + sec + begin + return if yield + sleep 0.1 + end while Time.now < stop + assert false, reason + end + + def test_close_file + sparse_file_prepare + @client.write "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n" + buf = @client.readpartial(1000) + assert_match(/\r\n\r\n\z/, buf) + Process.kill(:TTIN, @pid) + + wait_for(5, "ENOSPC injection signal") do + File.readlines(@err.path).grep(/ENOSPC on/)[0] + end + @client.write("GET /dev666/sparse-file.fid HTTP/1.1\r\n" \ + "Host: example.com\r\n\r\n") + sleep 1 + bytes = 0 + buf = "" + begin + bytes += @client.readpartial(666666, buf).bytesize + rescue EOFError + break + end while true + + wait_for(5, "failure injection message") do + File.readlines(@err.path).grep(/epoll_ctl failure injection/)[0] + end + + wait_for(5, "sparse file close") do + `lsof -p #@pid` !~ /sparse-file\.fid/ + end + end + + def teardown + Process.kill(:QUIT, @pid) rescue nil + _, status = Process.waitpid2(@pid) + @to_close.each { |io| io.close unless io.closed? } + FileUtils.rm_rf(@tmpdir) + @err.rewind + #$stderr.write(@err.read) + assert status.success?, status.inspect + end + + def sparse_file_prepare(big = nil) + Dir.mkdir("#@tmpdir/dev666") + if nil == big + big = 1024 * 1024 * 500 # only 500M + big /= 10 if ENV["VALGRIND"] # valgrind slows us down enough :P + end + File.open("#@tmpdir/dev666/sparse-file.fid", "w") do |fp| + begin + fp.seek(big - 1) + rescue Errno::EINVAL, Errno::ENOSPC + big /= 2 + warn "trying large file size: #{big}" + retry + end + fp.write('.') + end + end +end if has_epoll diff --git a/test/fdmap-1.c b/test/fdmap-1.c index de30545..d4176f0 100644 --- a/test/fdmap-1.c +++ b/test/fdmap-1.c @@ -10,7 +10,7 @@ int main(void) int open_max = (int)sysconf(_SC_OPEN_MAX); int i; - mfd = mog_fd_get(0); + mfd = mog_fd_init(0, MOG_FD_TYPE_UNUSED); { struct mog_mgmt *mgmt = &mfd->as.mgmt; @@ -18,7 +18,7 @@ int main(void) } for (i = 0; i < open_max; i++) { - mfd = mog_fd_get(i); + mfd = mog_fd_init(i, MOG_FD_TYPE_UNUSED); assert(mfd && "mfd unset"); } diff --git a/test/http-parser-1.c b/test/http-parser-1.c index 688e0a5..053237a 100644 --- a/test/http-parser-1.c +++ b/test/http-parser-1.c @@ -14,8 +14,8 @@ static void assert_path_equal(const char *str) { size_t slen = strlen(str); - assert(0 == memcmp(str, buf + http->path_tip, slen)); - assert(http->path_end == http->path_tip + slen); + assert(0 == memcmp(str, buf + http->_p.path_tip, slen)); + assert(http->_p.path_end == http->_p.path_tip + slen); } static void reset(void) @@ -36,9 +36,9 @@ int main(void) if ("normal HTTP GET request") { buf_set("GET /foo HTTP/1.1\r\nHost: 127.6.6.6\r\n\r\n"); state = mog_http_parse(http, buf, len); - assert(http->http_method == MOG_HTTP_METHOD_GET + assert(http->_p.http_method == MOG_HTTP_METHOD_GET && "http_method should be GET"); - assert(http->persistent && "not persistent"); + assert(http->_p.persistent && "not persistent"); assert(state == MOG_PARSER_DONE && "parser not done"); assert_path_equal("/foo"); } @@ -46,9 +46,9 @@ int main(void) if ("normal HTTP GET request with redundant leading slash") { buf_set("GET //foo HTTP/1.1\r\nHost: 127.6.6.6\r\n\r\n"); state = mog_http_parse(http, buf, len); - assert(http->http_method == MOG_HTTP_METHOD_GET + assert(http->_p.http_method == MOG_HTTP_METHOD_GET && "http_method should be GET"); - assert(http->persistent && "not persistent"); + assert(http->_p.persistent && "not persistent"); assert(state == MOG_PARSER_DONE && "parser not done"); assert_path_equal("/foo"); } @@ -59,9 +59,9 @@ int main(void) "Connection: close\r\n" "\r\n"); state = mog_http_parse(http, buf, len); - assert(http->http_method == MOG_HTTP_METHOD_GET + assert(http->_p.http_method == MOG_HTTP_METHOD_GET && "http_method should be GET"); - assert(http->persistent == 0 && "should not be persistent"); + assert(http->_p.persistent == 0 && "should not be persistent"); assert(state == MOG_PARSER_DONE && "parser not done"); assert_path_equal("/foo"); } @@ -71,9 +71,9 @@ int main(void) "Connection:\r\n keep-alive\r\n" "\r\n"); state = mog_http_parse(http, buf, len); - assert(http->http_method == MOG_HTTP_METHOD_GET + assert(http->_p.http_method == MOG_HTTP_METHOD_GET && "http_method should be GET"); - assert(http->persistent == 1 && "should be persistent"); + assert(http->_p.persistent == 1 && "should be persistent"); assert(state == MOG_PARSER_DONE && "parser not done"); assert_path_equal("/foo"); } @@ -99,9 +99,9 @@ int main(void) "Host: 127.6.6.6\r\n" "\r\n"); state = mog_http_parse(http, buf, len); - assert(http->http_method == MOG_HTTP_METHOD_HEAD + assert(http->_p.http_method == MOG_HTTP_METHOD_HEAD && "http_method should be HEAD "); - assert(http->persistent == 1 && "should be persistent"); + assert(http->_p.persistent == 1 && "should be persistent"); assert(state == MOG_PARSER_DONE && "parser not done"); assert_path_equal("/foo"); } @@ -113,14 +113,14 @@ int main(void) "\r\n" "partial body request"); state = mog_http_parse(http, buf, len); - assert(http->content_len == 12345); - assert(http->http_method == MOG_HTTP_METHOD_PUT + assert(http->_p.content_len == 12345); + assert(http->_p.http_method == MOG_HTTP_METHOD_PUT && "http_method should be PUT"); - assert(http->persistent == 1 && "should be persistent"); + assert(http->_p.persistent == 1 && "should be persistent"); assert(state == MOG_PARSER_DONE && "parser not done"); assert_path_equal("/foo"); - assert(strcmp(buf + http->offset, "partial body request") == 0 - && "buffer repositioned to body start"); + assert(strcmp(buf + http->_p.buf_off, "partial body request") + == 0 && "buffer repositioned to body start"); } if ("HTTP/1.1 PUT chunked request header") { @@ -130,14 +130,14 @@ int main(void) "\r\n" "16\r\npartial..."); state = mog_http_parse(http, buf, len); - assert(http->chunked); - assert(http->has_trailer_md5 == 0); - assert(http->http_method == MOG_HTTP_METHOD_PUT + assert(http->_p.chunked); + assert(http->_p.has_md5 == 0); + assert(http->_p.http_method == MOG_HTTP_METHOD_PUT && "http_method should be PUT"); - assert(http->persistent == 1 && "should be persistent"); + assert(http->_p.persistent == 1 && "should be persistent"); assert(state == MOG_PARSER_DONE && "parser not done"); assert_path_equal("/foo"); - assert(strcmp(buf + http->offset, "16\r\npartial...") == 0 + assert(strcmp(buf + http->_p.buf_off, "16\r\npartial...") == 0 && "buffer repositioned to body start"); } @@ -149,16 +149,16 @@ int main(void) "\r\n" "16\r\npartial..."); state = mog_http_parse(http, buf, len); - assert(http->range_beg == 666); - assert(http->range_end == 666666); - assert(http->has_content_range == 1); - assert(http->has_trailer_md5 == 0); - assert(http->http_method == MOG_HTTP_METHOD_PUT + assert(http->_p.range_beg == 666); + assert(http->_p.range_end == 666666); + assert(http->_p.has_content_range == 1); + assert(http->_p.has_md5 == 0); + assert(http->_p.http_method == MOG_HTTP_METHOD_PUT && "http_method should be PUT"); - assert(http->persistent == 1 && "should be persistent"); + assert(http->_p.persistent == 1 && "should be persistent"); assert(state == MOG_PARSER_DONE && "parser not done"); assert_path_equal("/foo"); - assert(strcmp(buf + http->offset, "16\r\npartial...") == 0 + assert(strcmp(buf + http->_p.buf_off, "16\r\npartial...") == 0 && "buffer repositioned to body start"); } @@ -170,14 +170,14 @@ int main(void) "\r\n" "16\r\npartial..."); state = mog_http_parse(http, buf, len); - assert(http->chunked); - assert(http->has_trailer_md5 == 1); - assert(http->http_method == MOG_HTTP_METHOD_PUT + assert(http->_p.chunked); + assert(http->_p.has_md5 == 1); + assert(http->_p.http_method == MOG_HTTP_METHOD_PUT && "http_method should be PUT"); - assert(http->persistent == 1 && "should be persistent"); + assert(http->_p.persistent == 1 && "should be persistent"); assert(state == MOG_PARSER_DONE && "parser not done"); assert_path_equal("/foo"); - assert(strcmp(buf + http->offset, "16\r\npartial...") == 0 + assert(strcmp(buf + http->_p.buf_off, "16\r\npartial...") == 0 && "buffer repositioned to body start"); } @@ -186,11 +186,11 @@ int main(void) "Host: 127.6.6.6\r\n" "\r\n"); state = mog_http_parse(http, buf, len); - assert(http->content_len == 0); - assert(http->has_trailer_md5 == 0); - assert(http->http_method == MOG_HTTP_METHOD_DELETE + assert(http->_p.content_len == 0); + assert(http->_p.has_md5 == 0); + assert(http->_p.http_method == MOG_HTTP_METHOD_DELETE && "http_method should be DELETE"); - assert(http->persistent == 1 && "should be persistent"); + assert(http->_p.persistent == 1 && "should be persistent"); assert(state == MOG_PARSER_DONE && "parser not done"); assert_path_equal("/foo"); } @@ -200,11 +200,11 @@ int main(void) "Host: 127.6.6.6\r\n" "\r\n"); state = mog_http_parse(http, buf, len); - assert(http->content_len == 0); - assert(http->has_trailer_md5 == 0); - assert(http->http_method == MOG_HTTP_METHOD_MKCOL + assert(http->_p.content_len == 0); + assert(http->_p.has_md5 == 0); + assert(http->_p.http_method == MOG_HTTP_METHOD_MKCOL && "http_method should be MKCOL"); - assert(http->persistent == 1 && "should be persistent"); + assert(http->_p.persistent == 1 && "should be persistent"); assert(state == MOG_PARSER_DONE && "parser not done"); assert_path_equal("/foo"); } @@ -215,12 +215,12 @@ int main(void) "Range: bytes=5-55\r\n" "\r\n"); state = mog_http_parse(http, buf, len); - assert(http->has_range == 1); - assert(http->range_beg == 5 && "range_beg didn't match"); - assert(http->range_end == 55 && "range_end didn't match"); - assert(http->http_method == MOG_HTTP_METHOD_GET + assert(http->_p.has_range == 1); + assert(http->_p.range_beg == 5 && "range_beg didn't match"); + assert(http->_p.range_end == 55 && "range_end didn't match"); + assert(http->_p.http_method == MOG_HTTP_METHOD_GET && "http_method should be GET"); - assert(http->persistent == 1 && "should be persistent"); + assert(http->_p.persistent == 1 && "should be persistent"); assert(state == MOG_PARSER_DONE && "parser not done"); assert_path_equal("/foo"); } @@ -231,12 +231,12 @@ int main(void) "Range: bytes=-55\r\n" "\r\n"); state = mog_http_parse(http, buf, len); - assert(http->has_range == 1); - assert(http->range_beg == -1 && "range_beg didn't match"); - assert(http->range_end == 55 && "range_end didn't match"); - assert(http->http_method == MOG_HTTP_METHOD_GET + assert(http->_p.has_range == 1); + assert(http->_p.range_beg == -1 && "range_beg didn't match"); + assert(http->_p.range_end == 55 && "range_end didn't match"); + assert(http->_p.http_method == MOG_HTTP_METHOD_GET && "http_method should be GET"); - assert(http->persistent == 1 && "should be persistent"); + assert(http->_p.persistent == 1 && "should be persistent"); assert(state == MOG_PARSER_DONE && "parser not done"); assert_path_equal("/foo"); } @@ -247,16 +247,27 @@ int main(void) "Range: bytes=55-\r\n" "\r\n"); state = mog_http_parse(http, buf, len); - assert(http->has_range == 1); - assert(http->range_beg == 55 && "range_beg didn't match"); - assert(http->range_end == -1 && "range_end didn't match"); - assert(http->http_method == MOG_HTTP_METHOD_GET + assert(http->_p.has_range == 1); + assert(http->_p.range_beg == 55 && "range_beg didn't match"); + assert(http->_p.range_end == -1 && "range_end didn't match"); + assert(http->_p.http_method == MOG_HTTP_METHOD_GET && "http_method should be GET"); - assert(http->persistent == 1 && "should be persistent"); + assert(http->_p.persistent == 1 && "should be persistent"); assert(state == MOG_PARSER_DONE && "parser not done"); assert_path_equal("/foo"); } + if ("HTTP/1.1 devid parse") { + buf_set("GET /dev666/0/1.fid HTTP/1.0\r\n" + "\r\n"); + state = mog_http_parse(http, buf, len); + assert(http->_p.http_method == MOG_HTTP_METHOD_GET + && "http_method should be GET"); + assert(http->_p.mog_devid == 666 && "dev666 set"); + assert(state == MOG_PARSER_DONE && "parser not done"); + assert_path_equal("/dev666/0/1.fid"); + } + reset(); return 0; } diff --git a/test/http_put6_fail.rb b/test/http_put6_fail.rb new file mode 100644 index 0000000..aee9778 --- /dev/null +++ b/test/http_put6_fail.rb @@ -0,0 +1,86 @@ +#!/usr/bin/env ruby +# -*- encoding: binary -*- +# Copyright (C) 2012-2013, Eric Wong <normalperson@yhbt.net> +# License: GPLv3 or later (see COPYING for details) +require 'test/test_helper' +require 'net/http' +require 'stringio' + +class TestHTTPPut6Fail < Test::Unit::TestCase + def setup + @skip = true + @tmpdir = Dir.mktmpdir('cmogstored-httpput6-test') + Dir.mkdir("#@tmpdir/dev666") + @to_close = [] + @host = TEST_HOST6 + srv = TCPServer.new(@host, 0) + @port = srv.addr[1] + srv.close + @err = Tempfile.new("stderr") + cmd = [ "cmogstored", "--docroot=#@tmpdir", "--httplisten=[#@host]:#@port", + "--maxconns=500" ] + vg = ENV["VALGRIND"] and cmd = vg.split(/\s+/).concat(cmd) + @pid = fork { + $stderr.reopen(@err) + @err.close + exec(*cmd) + } + @client = get_client + @skip = false + rescue Errno::EAFNOSUPPORT + # host does not have IPv6 + end + + def teardown + return if @skip + Process.kill(:QUIT, @pid) rescue nil + _, status = Process.waitpid2(@pid) + @to_close.each { |io| io.close unless io.closed? } + @err.rewind + assert status.success?, status.inspect + ensure + FileUtils.rm_rf(@tmpdir) + end + + def test_put_premature_eof + return if @skip + path = "/dev666/foo" + url = "http://[#@host]:#@port#{path}" + req = "PUT #{url} HTTP/1.1\r\n" \ + "Host: [#@host]:#@port\r\n" \ + "Content-Length: 666\r\n" \ + "\r\n" \ + "abcde" + @client.write(req) + @client.shutdown(Socket::SHUT_WR) + addr = Regexp.escape("[#{@client.addr[3]}]:#{@client.addr[1]}") + assert_nil @client.read(1) + assert ! File.exist?("#@tmpdir#{path}") + buf = File.read(@err.path) + assert_match(%r{PUT #{path} failed from #{addr} after 5 bytes:}, buf) + if RUBY_PLATFORM =~ /linux/ + assert_match(%r{last_data_recv=\d+ms from #{addr} for PUT #{path}}, buf) + end + end + + def test_put_premature_eof_chunked + return if @skip + path = "/dev666/foo" + url = "http://[#@host]:#@port#{path}" + req = "PUT #{url} HTTP/1.1\r\n" \ + "Host: [#@host]:#@port\r\n" \ + "Transfer-Encoding: chunked\r\n" \ + "\r\n" \ + "666\r\nf" + @client.write(req) + @client.shutdown(Socket::SHUT_WR) + addr = Regexp.escape("[#{@client.addr[3]}]:#{@client.addr[1]}") + assert_nil @client.read(1) + assert ! File.exist?("#@tmpdir#{path}") + buf = File.read(@err.path) + assert_match(%r{PUT #{path} failed from #{addr} after 1 bytes:}, buf) + if RUBY_PLATFORM =~ /linux/ + assert_match(%r{last_data_recv=\d+ms from #{addr} for PUT #{path}}, buf) + end + end +end diff --git a/test/mgmt.rb b/test/mgmt.rb index 373cd69..7101245 100644 --- a/test/mgmt.rb +++ b/test/mgmt.rb @@ -270,17 +270,19 @@ class TestMgmt < Test::Unit::TestCase t_yield # wait for threads to spawn taskdir = "/proc/#@pid/task" glob = "#{taskdir}/*" - nr_threads = Dir[glob].size if File.directory?(taskdir) + prev_threads = Dir[glob].size if File.directory?(taskdir) @client.write "server aio_threads = 1\r\n" assert_equal "\r\n", @client.gets if RUBY_PLATFORM =~ /linux/ assert File.directory?(taskdir), "/proc not mounted on Linux?" end if File.directory?(taskdir) - while nr_threads == Dir[glob].size && (tries -= 1) > 0 + while prev_threads == Dir[glob].size && (tries -= 1) > 0 sleep(0.1) end - assert nr_threads != Dir[glob].size + cur_threads = Dir[glob].size + assert prev_threads != cur_threads, + "prev_threads=#{prev_threads} != cur_threads=#{cur_threads}" end @client.write "server aio_threads=6\r\n" assert_equal "\r\n", @client.gets @@ -288,6 +290,39 @@ class TestMgmt < Test::Unit::TestCase assert_match(%r{ERROR: unknown command}, @client.gets) end + def test_aio_threads_spam + tries = 1000 + @client.write "WTF\r\n" + assert_match(%r{ERROR: unknown command}, @client.gets) + t_yield # wait for threads to spawn + 100.times do |i| + @client.write "server aio_threads = 1\r\n" + assert_equal "\r\n", @client.readpartial(4) + @client.write "server aio_threads = 2\r\n" + assert_equal "\r\n", @client.readpartial(4) + end + end + + def test_giant_devid_skip + max = 16777215 # devid is MEDIUMINT in DB + Dir.mkdir("#@tmpdir/dev#{max}") + Dir.mkdir("#@tmpdir/dev#{max + 1}") + @client.write "watch\n" + lines = [] + + 2.times do # 2 times in case we're slow + begin + line = @client.gets + lines << line + end until line == ".\n" + end + + assert lines.grep(/\b#{max}\b/)[0] + assert_nil lines.grep(/\b#{max + 1}\b/)[0] + assert File.exist?("#@tmpdir/dev#{max}/usage") + assert ! File.exist?("#@tmpdir/dev#{max + 1}/usage") + end + def test_iostat_watch Dir.mkdir("#@tmpdir/dev666") @client.write "watch\n" diff --git a/test/queue-idle-1.c b/test/queue-idle-1.c index 5d592c1..3b71923 100644 --- a/test/queue-idle-1.c +++ b/test/queue-idle-1.c @@ -12,8 +12,7 @@ static void setup(void) { q = mog_queue_new(); pipe_or_die(fds); - mfd = mog_fd_get(fds[0]); - mfd->fd = fds[0]; + mfd = mog_fd_init(fds[0], MOG_FD_TYPE_UNUSED); mog_set_nonblocking(fds[0], true); assert(read(fds[0], buf, sizeof(buf)) == -1 && @@ -30,9 +29,10 @@ static void test_nonblocking(void) setup(); mog_idleq_add(q, mfd, MOG_QEV_RD); - assert(NULL == mog_idleq_wait(q, 0) && "q wait should return NULL"); + assert(NULL == mog_idleq_wait_intr(q, 0) + && "q wait should return NULL"); assert(1 == write(fds[1], ".", 1) && "couldn't write"); - assert(mfd == mog_idleq_wait(q, 0) && "q wait should return mfd"); + assert(mfd == mog_idleq_wait_intr(q, 0) && "q wait should return mfd"); teardown(); } @@ -54,8 +54,7 @@ static void test_blocking(void) mog_idleq_add(q, mfd, MOG_QEV_RD); CHECK(int, 0, pthread_create(&thr, NULL, wait_then_write, NULL)); printf("start wait: %d\n", (int)time(NULL)); - mog_cancel_disable(); - assert(mfd == mog_idleq_wait(q, -1)); + assert(mfd == mog_idleq_wait_intr(q, -1)); printf(" end wait: %d\n", (int)time(NULL)); assert(1 == read(fds[0], buf, 1) && "read failed"); assert(buf[0] == 'B' && "didn't read expected 'B'"); diff --git a/test/ruby.mk b/test/ruby.mk index b3323ca..c82a1ec 100644 --- a/test/ruby.mk +++ b/test/ruby.mk @@ -1,5 +1,6 @@ RB_TESTS_FAST = test/cmogstored-cfg.rb test/http_dav.rb test/http_range.rb \ - test/http_put.rb test/http_getonly.rb test/inherit.rb test/upgrade.rb + test/http_put.rb test/http_getonly.rb test/inherit.rb test/upgrade.rb \ + test/http_put6_fail.rb test/epoll_enospc.rb RB_TESTS_SLOW = test/mgmt-usage.rb test/mgmt.rb test/mgmt-iostat.rb \ test/http.rb test/http_put_slow.rb test/http_chunked_put.rb \ test/graceful_quit.rb test/http_idle_expire.rb diff --git a/test/test_helper.rb b/test/test_helper.rb index 153ca9f..b08999f 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -21,6 +21,8 @@ TEST_HOST = ENV["TEST_HOST"] || (RUBY_PLATFORM =~ /linux/ ? "127.#{rand(256)}.#{rand(256)}.#{rand(256)}" : "127.0.0.1") +TEST_HOST6 = ENV["TEST_HOST6"] || "::1" + # expand relative paths, --daemonize chdirs path = ENV["PATH"].split(/:/) ENV["PATH"] = path.map { |x| File.expand_path(x) }.join(":") diff --git a/test/thrpool-1.c b/test/thrpool-1.c index 7df099e..67aaff2 100644 --- a/test/thrpool-1.c +++ b/test/thrpool-1.c @@ -24,12 +24,10 @@ void *fn(void *xarg) t.tv_sec++; } - mog_cancel_disable(); CHECK(int, 0, pthread_mutex_lock(&lock)); pthread_cond_timedwait(&cond, &lock, &t); CHECK(int, 0, pthread_mutex_unlock(&lock)); - mog_cancel_enable(); - pthread_testcancel(); + mog_thr_test_quit(); } assert(strcmp("whazzup", s) == 0 && "arg changed"); @@ -4,6 +4,14 @@ */ #include "cmogstored.h" +static __thread unsigned mog_do_quit; +struct mog_thr_start_arg { + struct mog_thrpool *tp; + pthread_mutex_t mtx; + pthread_cond_t cond; + unsigned *do_quit; +}; + /* * we can lower this if we can test with lower values, NPTL minimum is 16K. * We also use syslog() and *printf() functions which take a lot of @@ -22,27 +30,59 @@ #endif static const size_t stacksize = (size_t)MOG_THR_STACK_SIZE; -static pthread_mutex_t sat_lock = PTHREAD_MUTEX_INITIALIZER; -struct sat_arg; -struct sat_arg { - struct mog_queue *queue; - size_t size; - SIMPLEQ_ENTRY(sat_arg) qentry; -}; +static sigset_t quitset; + +__attribute__((constructor)) static void thrpool_init(void) +{ + CHECK(int, 0, sigfillset(&quitset)); + CHECK(int, 0, sigdelset(&quitset, SIGURG)); +} -static SIMPLEQ_HEAD(sq, sat_arg) satqhead = SIMPLEQ_HEAD_INITIALIZER(satqhead); +/* child thread notifies the parent about its readiness */ +static void *thr_start_wrapper(void *ptr) +{ + struct mog_thr_start_arg *arg = ptr; + struct mog_thrpool *tp; + + mog_do_quit = 0; + CHECK(int, 0, pthread_sigmask(SIG_SETMASK, &quitset, NULL)); + CHECK(int, 0, pthread_mutex_lock(&arg->mtx)); + + arg->do_quit = &mog_do_quit; + tp = arg->tp; /* arg becomes invalid once we unlock */ + + CHECK(int, 0, pthread_cond_signal(&arg->cond)); + CHECK(int, 0, pthread_mutex_unlock(&arg->mtx)); + + return tp->start_fn(tp->start_arg); +} + +/* child thread tests if its quit flag is set and exits if it is */ +void mog_thr_test_quit(void) +{ + if (__sync_add_and_fetch(&mog_do_quit, 0) != 0) { + mog_alloc_quit(); + pthread_exit(NULL); + } +} /* - * kevent() sleep is not a cancellation point, so it's possible for - * a thread to sleep on it if the cancel request arrived right after - * we checked for cancellation + * we no longer rely on pthreads cancellation, so our explicit checks for + * thread quitting requires us to continuously signal a thread for death + * in case it enters a sleeping syscall (epoll_wait/kevent) immediately + * after checking the mog_do_quit TLS variable */ static void poke(pthread_t thr, int sig) { int err; + /* + * This is an uncommon code path and only triggered when + * we lower thread counts or shut down + */ while ((err = pthread_kill(thr, sig)) == 0) - sched_yield(); + mog_yield(); + assert(err == ESRCH && "pthread_kill() usage bug"); } @@ -57,7 +97,7 @@ thr_create_fail_retry(struct mog_thrpool *tp, size_t size, syslog(LOG_ERR, "pthread_create: %m (tries: %lu)", *nr_eagain); } - sched_yield(); + mog_yield(); return true; } else { errno = err; @@ -68,49 +108,67 @@ thr_create_fail_retry(struct mog_thrpool *tp, size_t size, } } -static void thrpool_set_size(struct mog_thrpool *tp, size_t size) +static bool +thrpool_add(struct mog_thrpool *tp, size_t size, unsigned long *nr_eagain) +{ + struct mog_thr_start_arg arg = { + .mtx = PTHREAD_MUTEX_INITIALIZER, + .cond = PTHREAD_COND_INITIALIZER, + }; + pthread_t *thr; + pthread_attr_t attr; + size_t bytes = (tp->n_threads + 1) * sizeof(struct mog_thread); + int rc; + + assert(tp && "tp no defined"); + arg.tp = tp; + tp->threads = xrealloc(tp->threads, bytes); + + CHECK(int, 0, pthread_attr_init(&attr)); + + if (stacksize > 0) + CHECK(int, 0, pthread_attr_setstacksize(&attr, stacksize)); + + thr = &tp->threads[tp->n_threads].thr; + + CHECK(int, 0, pthread_mutex_lock(&arg.mtx)); + rc = pthread_create(thr, &attr, thr_start_wrapper, &arg); + CHECK(int, 0, pthread_attr_destroy(&attr)); + if (rc == 0) { + CHECK(int, 0, pthread_cond_wait(&arg.cond, &arg.mtx)); + tp->threads[tp->n_threads].do_quit = arg.do_quit; + } + CHECK(int, 0, pthread_mutex_unlock(&arg.mtx)); + + if (rc == 0) { + tp->n_threads++; + *nr_eagain = 0; + } else if (mog_pthread_create_retryable(rc)) { + if (!thr_create_fail_retry(tp, size, nr_eagain, rc)) + return false; + } else { + assert(rc == 0 && "pthread_create usage error"); + } + return true; +} + +void mog_thrpool_set_size(struct mog_thrpool *tp, size_t size) { unsigned long nr_eagain = 0; CHECK(int, 0, pthread_mutex_lock(&tp->lock)); - while (size > tp->n_threads) { - pthread_t *thr; - pthread_attr_t attr; - size_t bytes = (tp->n_threads + 1) * sizeof(pthread_t); - int rc; - - tp->threads = xrealloc(tp->threads, bytes); - - CHECK(int, 0, pthread_attr_init(&attr)); - - if (stacksize > 0) { - CHECK(int, 0, - pthread_attr_setstacksize(&attr, stacksize)); - } - - thr = tp->threads + tp->n_threads; - rc = pthread_create(thr, &attr, tp->start_fn, tp->start_arg); - CHECK(int, 0, pthread_attr_destroy(&attr)); - - if (rc == 0) { - tp->n_threads++; - nr_eagain = 0; - } else if (mog_pthread_create_retry(rc)) { - if (!thr_create_fail_retry(tp, size, &nr_eagain, rc)) - goto out; - } else { - assert(rc == 0 && "pthread_create usage error"); - } - } + while (size > tp->n_threads && thrpool_add(tp, size, &nr_eagain)) + /* nothing */; if (tp->n_threads > size) { size_t i; int err; + /* set the do_quit flag for all threads we kill */ for (i = size; i < tp->n_threads; i++) { - CHECK(int, 0, pthread_cancel(tp->threads[i])); - err = pthread_kill(tp->threads[i], SIGURG); + __sync_add_and_fetch(tp->threads[i].do_quit, 1); + err = pthread_kill(tp->threads[i].thr, SIGURG); switch (err) { case 0: @@ -121,79 +179,36 @@ static void thrpool_set_size(struct mog_thrpool *tp, size_t size) } } + /* keep poking them to kick them out out epoll_wait/kevent */ for (i = size; i < tp->n_threads; i++) { - poke(tp->threads[i], SIGURG); + poke(tp->threads[i].thr, SIGURG); - CHECK(int, 0, pthread_join(tp->threads[i], NULL)); + CHECK(int, 0, pthread_join(tp->threads[i].thr, NULL)); } tp->n_threads = size; } -out: CHECK(int, 0, pthread_mutex_unlock(&tp->lock)); } -/* - * fire and forget, we must run the actual thread count manipulation - * in the main notify thread because we may end up terminating the - * thread which invoked this. - */ -void mog_thrpool_set_n_threads(struct mog_queue *q, size_t size) -{ - struct sat_arg *arg; - - /* this gets free'ed in mog_thrpool_process_queue() */ - arg = xmalloc(sizeof(struct sat_arg)); - arg->size = size; - arg->queue = q; - - /* put into the queue so main thread can process it */ - CHECK(int, 0, pthread_mutex_lock(&sat_lock)); - SIMPLEQ_INSERT_TAIL(&satqhead, arg, qentry); - CHECK(int, 0, pthread_mutex_unlock(&sat_lock)); - - /* wake up the main thread so it can process the queue */ - mog_notify(MOG_NOTIFY_SET_N_THREADS); -} - -/* this runs in the main (notify) thread */ -void mog_thrpool_process_queue(void) -{ - /* guard against requests bundled in one wakeup by looping here */ - for (;;) { - struct sat_arg *arg; - - CHECK(int, 0, pthread_mutex_lock(&sat_lock)); - arg = SIMPLEQ_FIRST(&satqhead); - if (arg) - SIMPLEQ_REMOVE_HEAD(&satqhead, qentry); - CHECK(int, 0, pthread_mutex_unlock(&sat_lock)); - - if (arg == NULL) - return; - - syslog(LOG_INFO, "server aio_threads=%u", (unsigned)arg->size); - thrpool_set_size(&arg->queue->thrpool, arg->size); - free(arg); - } -} - void mog_thrpool_start(struct mog_thrpool *tp, size_t n, void *(*start_fn)(void *), void *arg) { + /* we may be started on a new server before device dirs exist */ if (n == 0) n = 1; + tp->threads = NULL; tp->n_threads = 0; tp->start_fn = start_fn; tp->start_arg = arg; CHECK(int, 0, pthread_mutex_init(&tp->lock, NULL)); - thrpool_set_size(tp, n); + mog_thrpool_set_size(tp, n); } void mog_thrpool_quit(struct mog_thrpool *tp, struct mog_queue *q) { - thrpool_set_size(tp, 0); + mog_thrpool_set_size(tp, 0); CHECK(int, 0, pthread_mutex_destroy(&tp->lock)); mog_free_and_null(&tp->threads); } @@ -0,0 +1,8 @@ +#ifdef HAVE_SYSTEMTAP +# include "probes.h" +# define TRACE(probe) probe +# define TRACE_ENABLED(probe) probe ## _ENABLED() +#else /* HAVE_SYSTEMTAP */ +# define TRACE(probe) +# define TRACE_ENABLED(probe) (0) +#endif /* !HAVE_SYSTEMTAP */ @@ -3,6 +3,7 @@ * License: GPLv3 or later (see COPYING for details) */ #include "cmogstored.h" +#include "trace.h" struct mog_wbuf { size_t len; @@ -76,30 +77,32 @@ enum mog_write_state mog_tryflush(int fd, struct mog_wbuf **x) */ void * mog_trywritev(int fd, struct iovec *iov, int iovcnt) { - ssize_t total = 0; + ssize_t len = 0; ssize_t w; int i; for (i = 0; i < iovcnt; i++) - total += iov[i].iov_len; + len += iov[i].iov_len; - if (total == 0) + if (len == 0) return NULL; retry: w = writev(fd, iov, iovcnt); - if (w == total) { + if (w == len) { return NULL; } else if (w < 0) { switch (errno) { - case_EAGAIN: return wbuf_newv(total, iov, iovcnt); + case_EAGAIN: + TRACE(CMOGSTORED_WRITE_BUFFERED()); + return wbuf_newv(len, iov, iovcnt); case EINTR: goto retry; } return MOG_WR_ERROR; } else { struct iovec *new_iov = iov; - total -= w; + len -= w; /* skip over iovecs we've already written completely */ for (i = 0; i < iovcnt; i++, new_iov++) { @@ -147,7 +150,9 @@ void * mog_trysend(int fd, void *buf, size_t len, off_t more) if (w < 0) { switch (errno) { - case_EAGAIN: return wbuf_new(buf, len); + case_EAGAIN: + TRACE(CMOGSTORED_WRITE_BUFFERED()); + return wbuf_new(buf, len); case EINTR: continue; } return MOG_WR_ERROR; @@ -58,16 +58,16 @@ void mog_upgrade_prepare(int argc, char *argv[], char *envp[]) } /* writes one comma-delimited fd to fp */ -static bool emit_fd(FILE *fp, int fd) +static bool emit_fd(FILE *fp, struct mog_fd *mfd) { int r; /* no error, just the FD isn't used */ - if (fd < 0) + if (mfd == NULL) return true; errno = 0; - r = fprintf(fp, "%d,", fd); + r = fprintf(fp, "%d,", mfd->fd); if (r > 0) return true; if (errno == 0) @@ -81,9 +81,9 @@ static bool svc_emit_fd_i(void *svcptr, void *_fp) FILE *fp = _fp; struct mog_svc *svc = svcptr; - return (emit_fd(fp, svc->mgmt_fd) - && emit_fd(fp, svc->http_fd) - && emit_fd(fp, svc->httpget_fd)); + return (emit_fd(fp, svc->mgmt_mfd) + && emit_fd(fp, svc->http_mfd) + && emit_fd(fp, svc->httpget_mfd)); } /* returns the PID of the newly spawned child */ @@ -36,22 +36,6 @@ static inline void mog_free(const void *ptr) assert(checkvar==(expect)&& "BUG" && __FILE__ && __LINE__); \ } while (0) -static inline void mog_cancel_enable(void) -{ - int old; - - CHECK(int, 0, pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old)); - assert(old == PTHREAD_CANCEL_DISABLE && "redundant cancel enable"); -} - -static inline void mog_cancel_disable(void) -{ - int old; - - CHECK(int, 0, pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old)); - assert(old == PTHREAD_CANCEL_ENABLE && "redundant cancel disable"); -} - /* compiler should optimize this away */ __attribute__((const)) static inline off_t off_t_max(void) { @@ -84,7 +68,7 @@ static inline int mog_set_cloexec(int fd, const bool set) return fcntl(fd, F_SETFD, set ? FD_CLOEXEC : 0); } -static inline bool mog_pthread_create_retry(const int err) +static inline bool mog_pthread_create_retryable(const int err) { /* * older versions of glibc return ENOMEM instead of EAGAIN diff --git a/valid_path.rl b/valid_path.rl index 9d26cfe..2d365c7 100644 --- a/valid_path.rl +++ b/valid_path.rl @@ -31,8 +31,7 @@ static bool path_traversal_found(const char *buf, size_t len) int mog_valid_path(const char *buf, size_t len) { - /* TODO: update if MogileFS supports FIDs >= 10,000,000,000 */ - if (len >= (sizeof("/dev16777215/0/000/000/0123456789.fid"))) + if (len >= MOG_PATH_MAX) return 0; return ! path_traversal_found(buf, len); @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" +#ifndef HAVE_PTHREAD_YIELD +# define pthread_yield() (void)sched_yield() +#endif + +/* + * pthread_yield may migrate us to the same CPU as the task we're waiting + * on, so just keep yielding for every CPU we have as this throttles + * our ability to spam SIGURG. This means the threads we're trying to + * gracefully kill off can finish their work and check their mog_do_quit + * flag sooner + * + * We only use this as a last resort when normal wakeups/notifications + * are not usable (e.g. recovering from out-of-resource problems) + */ +void mog_yield(void) +{ + static unsigned long nproc_all; + unsigned long i; + + if (!nproc_all) + nproc_all = num_processors(NPROC_ALL) * 2; + for (i = 0; i < nproc_all; i++) + pthread_yield(); +} |