diff options
Diffstat (limited to 'cmogstored.h')
-rw-r--r-- | cmogstored.h | 136 |
1 files changed, 90 insertions, 46 deletions
diff --git a/cmogstored.h b/cmogstored.h index e72c071..1134827 100644 --- a/cmogstored.h +++ b/cmogstored.h @@ -70,6 +70,7 @@ #include "defaults.h" #include "iostat.h" #include "mnt.h" +#include "packaddr.h" #define MOG_WR_ERROR ((void *)-1) #define MOG_IOSTAT (MAP_FAILED) @@ -122,7 +123,8 @@ struct mog_mgmt { int cs; enum mog_prio prio; struct mog_fd *forward; - size_t offset; + uint32_t buf_off; + uint32_t mog_devid; size_t mark[2]; struct mog_rbuf *rbuf; struct mog_wbuf *wbuf; /* uncommonly needed */ @@ -133,21 +135,29 @@ struct mog_mgmt { }; struct mog_queue; +struct mog_svc; struct mog_svc { int docroot_fd; const char *docroot; + size_t nmogdev; + size_t user_set_aio_threads; /* only touched by main/notify thread */ + size_t user_req_aio_threads; /* protected by aio_threads_lock */ + size_t thr_per_dev; /* private */ DIR *dir; + pthread_mutex_t by_mog_devid_lock; + Hash_table *by_mog_devid; Hash_table *by_st_dev; pthread_mutex_t devstats_lock; struct mog_queue *queue; LIST_HEAD(mgmt_head, mog_mgmt) devstats_subscribers; + SIMPLEQ_ENTRY(mog_svc) qentry; mode_t put_perms; mode_t mkcol_perms; - int http_fd; - int httpget_fd; - int mgmt_fd; + struct mog_fd *http_mfd; + struct mog_fd *httpget_mfd; + struct mog_fd *mgmt_mfd; uint32_t idle_timeout; }; @@ -169,35 +179,44 @@ enum mog_chunk_state { struct mog_http { int cs; - enum mog_http_method http_method:8; - unsigned persistent:1; - unsigned chunked:1; - unsigned has_trailer_md5:1; - unsigned has_expect_md5:1; - unsigned has_content_range:1; /* for PUT */ - unsigned has_range:1; /* for GET */ - unsigned skip_rbuf_defer:1; - enum mog_chunk_state chunk_state:2; - uint8_t path_tip; - uint8_t path_end; - uint16_t line_end; - uint16_t tmp_tip; + struct { + /* only needs 4 bits, but we use 8 for alignment */ + enum mog_http_method http_method:8; + unsigned persistent:1; + unsigned chunked:1; + unsigned has_md5:1; + unsigned has_content_range:1; /* for PUT */ + unsigned has_range:1; /* for GET */ + unsigned skip_rbuf_defer:1; + enum mog_chunk_state chunk_state:2; + uint8_t path_tip; + uint8_t path_end; + uint16_t line_end; + uint16_t tmp_tip; + uint32_t buf_off; + uint32_t mog_devid; + off_t range_beg; + off_t range_end; + off_t content_len; + } _p; struct mog_fd *forward; - size_t offset; - off_t range_beg; - off_t range_end; - off_t content_len; struct mog_rbuf *rbuf; struct mog_wbuf *wbuf; /* uncommonly needed */ struct mog_svc *svc; uint8_t expect_md5[16]; + struct mog_packaddr mpa; +} __attribute__((packed)); + +struct mog_thread { + pthread_t thr; + unsigned *do_quit; }; struct mog_thrpool { pthread_mutex_t lock; size_t n_threads; size_t want_threads; - pthread_t *threads; + struct mog_thread *threads; void *(*start_fn)(void *); void *start_arg; }; @@ -215,13 +234,16 @@ struct mog_queue { }; /* accept.c */ -typedef void (*post_accept_fn)(int fd, struct mog_svc *); +typedef void (*mog_post_accept_fn)(int fd, struct mog_svc *, + union mog_sockaddr *, socklen_t); struct mog_accept { struct mog_svc *svc; - post_accept_fn post_accept_fn; + mog_post_accept_fn post_accept_fn; + struct mog_addrinfo *addrinfo; /* shared with cfg */ struct mog_thrpool thrpool; }; -struct mog_accept * mog_accept_init(int fd, struct mog_svc *, post_accept_fn); +struct mog_fd *mog_accept_init(int fd, struct mog_svc *, + struct mog_addrinfo *, mog_post_accept_fn); void * mog_accept_loop(void *ac); struct mog_digest { @@ -244,6 +266,7 @@ struct mog_file { #include "notify.h" /* sig.c */ +extern sigset_t mog_emptyset; void mog_intr_disable(void); void mog_intr_enable(void); void mog_sleep(long seconds); @@ -280,7 +303,6 @@ struct mog_fd { struct mog_svc *svc; } as; }; -struct mog_fd *mog_fd_get(int fd); void mog_fd_put(struct mog_fd *mfd); void mog_fdmap_requeue(struct mog_queue *quit_queue); size_t mog_fdmap_expire(uint32_t sec); @@ -299,6 +321,7 @@ void mog_rbuf_free(struct mog_rbuf *); void mog_rbuf_free_and_null(struct mog_rbuf **); void *mog_fsbuf_get(size_t *size); void mog_alloc_quit(void); +void mog_oom_if_null(const void *); #define die_errno(...) do { \ error(EXIT_FAILURE, errno, __VA_ARGS__); \ @@ -320,10 +343,16 @@ struct mog_svc *mog_svc_new(const char *docroot); typedef int (*mog_scandev_cb)(const struct mog_dev *, struct mog_svc *); size_t mog_svc_each(Hash_processor processor, void *data); void mog_svc_upgrade_prepare(void); +bool mog_svc_start_each(void *svc_ptr, void *have_mgmt_ptr); +void mog_svc_thrpool_rescale(struct mog_svc *, size_t ndev_new); +void mog_svc_aio_threads_enqueue(struct mog_svc *, size_t nr); +void mog_svc_aio_threads_handler(void); /* dev.c */ -struct mog_dev * mog_dev_new(struct mog_svc *, uint32_t mog_devid); +struct mog_dev * mog_dev_for(struct mog_svc *, uint32_t mog_devid); int mog_dev_mkusage(const struct mog_dev *, struct mog_svc *); +size_t mog_dev_hash(const void *, size_t tablesize); +bool mog_dev_cmp(const void *a, const void *b); /* valid_path.rl */ int mog_valid_path(const char *buf, size_t len); @@ -345,7 +374,7 @@ void mog_pidfile_upgrade_abort(void); bool mog_svc_devstats_broadcast(void *svc, void *ignored); void mog_svc_devstats_subscribe(struct mog_mgmt *); void mog_svc_dev_shutdown(void); -size_t mog_mkusage_all(void); +void mog_mkusage_all(void); /* cloexec_detect.c */ extern bool mog_cloexec_atomic; @@ -375,17 +404,20 @@ char *mog_canonpath(const char *path, enum canonicalize_mode_t canon_mode); char *mog_canonpath_die(const char *path, enum canonicalize_mode_t canon_mode); /* thrpool.c */ +void mog_thr_test_quit(void); void mog_thrpool_start(struct mog_thrpool *, size_t n, void *(*start_fn)(void *), void *arg); void mog_thrpool_quit(struct mog_thrpool *, struct mog_queue *); -void mog_thrpool_set_n_threads(struct mog_queue *q, size_t size); void mog_thrpool_process_queue(void); +void mog_thrpool_set_size(struct mog_thrpool *, size_t size); /* mgmt.c */ void mog_mgmt_writev(struct mog_mgmt *, struct iovec *, int iovcnt); -void mog_mgmt_post_accept(int fd, struct mog_svc *); +void mog_mgmt_post_accept(int fd, struct mog_svc *, + union mog_sockaddr *, socklen_t); enum mog_next mog_mgmt_queue_step(struct mog_fd *) MOG_CHECK; void mog_mgmt_quit_step(struct mog_fd *); +void mog_mgmt_drop(struct mog_fd *); /* queue_epoll.c */ struct mog_queue * mog_queue_new(void); @@ -404,7 +436,7 @@ struct mog_addrinfo { void mog_addrinfo_free(struct mog_addrinfo **); /* bind_listen.c */ -int mog_bind_listen(struct addrinfo *, const char *accept_filter); +int mog_bind_listen(struct addrinfo *); /* close.c */ void mog_close(int fd); @@ -433,23 +465,27 @@ void mog_http_init(struct mog_http *, struct mog_svc *); enum mog_parser_state mog_http_parse(struct mog_http *, char *buf, size_t len); /* http_get.c */ -void mog_http_get_open(struct mog_http *, char *buf); +void mog_http_get_open(struct mog_fd *, char *buf); enum mog_next mog_http_get_in_progress(struct mog_fd *); /* http.c */ -void mog_http_post_accept(int fd, struct mog_svc *); -void mog_httpget_post_accept(int fd, struct mog_svc *); +void mog_http_post_accept(int fd, struct mog_svc *, + union mog_sockaddr *, socklen_t); +void mog_httpget_post_accept(int fd, struct mog_svc *, + union mog_sockaddr *, socklen_t); enum mog_next mog_http_queue_step(struct mog_fd *) MOG_CHECK; void mog_http_quit_step(struct mog_fd *); char *mog_http_path(struct mog_http *, char *buf); void mog_http_reset(struct mog_http *); +void mog_http_unlink_ftmp(struct mog_http *); +void mog_http_drop(struct mog_fd *); /* http_dav.c */ -void mog_http_delete(struct mog_http *http, char *buf); -void mog_http_mkcol(struct mog_http *http, char *buf); +void mog_http_delete(struct mog_fd *, char *buf); +void mog_http_mkcol(struct mog_fd *, char *buf); /* http_put.c */ -void mog_http_put(struct mog_http *http, char *buf, size_t buf_len); +void mog_http_put(struct mog_fd *, char *buf, size_t buf_len); enum mog_next mog_http_put_in_progress(struct mog_fd *); bool mog_http_write_full(struct mog_fd *file_mfd, char *buf, size_t buf_len); @@ -474,6 +510,7 @@ int mog_mkpath_for(struct mog_svc *svc, char *path); /* queue_common.c */ struct mog_queue *mog_queue_init(int queue_fd); void mog_queue_stop(struct mog_queue *keep); +void mog_queue_drop(struct mog_fd *); /* fsck_queue.c */ bool mog_fsck_queue_ready(struct mog_fd *mfd) MOG_CHECK; @@ -511,6 +548,12 @@ void mog_iou_active(dev_t); # define MOG_TCP_NOPUSH (0) #endif +/* publically visible attributes of the current process */ +struct mog_main { + unsigned long worker_processes; + bool have_mgmt; +}; + /* cmogstored.c */ void cmogstored_quit(void); @@ -543,19 +586,20 @@ pid_t mog_upgrade_spawn(void); /* exit.c */ _Noreturn void cmogstored_exit(void); +verify(sizeof(in_port_t) <= sizeof(uint16_t)); /* * We only deal with ipv4 and ipv6 addresses (and no human-friendly * hostnames/service names), so we can use smaller constants than the * standard NI_MAXHOST/NI_MAXSERV values (1025 and 32 respectively). * This reduces our per-thread stack usage and keeps caches hotter. */ -#define MOG_NI_MAXHOST (INET6_ADDRSTRLEN) -#define MOG_NI_MAXSERV (sizeof(":65536")) - -/* avoid sockaddr_storage since that bigger than we need */ -union mog_sockaddr { - struct sockaddr_in in; - struct sockaddr_in6 in6; - struct sockaddr sa; - unsigned char bytes[1]; +struct mog_ni { + char ni_host[INET6_ADDRSTRLEN + sizeof("[]") - 1]; + char ni_serv[sizeof(":65536")]; }; + +/* nameinfo.c */ +void mog_nameinfo(struct mog_packaddr *, struct mog_ni *); + +/* yield.c */ +void mog_yield(void); |