about summary refs log tree commit homepage
path: root/cmogstored.h
diff options
context:
space:
mode:
Diffstat (limited to 'cmogstored.h')
-rw-r--r--cmogstored.h136
1 files changed, 90 insertions, 46 deletions
diff --git a/cmogstored.h b/cmogstored.h
index e72c071..1134827 100644
--- a/cmogstored.h
+++ b/cmogstored.h
@@ -70,6 +70,7 @@
 #include "defaults.h"
 #include "iostat.h"
 #include "mnt.h"
+#include "packaddr.h"
 
 #define MOG_WR_ERROR ((void *)-1)
 #define MOG_IOSTAT (MAP_FAILED)
@@ -122,7 +123,8 @@ struct mog_mgmt {
         int cs;
         enum mog_prio prio;
         struct mog_fd *forward;
-        size_t offset;
+        uint32_t buf_off;
+        uint32_t mog_devid;
         size_t mark[2];
         struct mog_rbuf *rbuf;
         struct mog_wbuf *wbuf; /* uncommonly needed */
@@ -133,21 +135,29 @@ struct mog_mgmt {
 };
 
 struct mog_queue;
+struct mog_svc;
 struct mog_svc {
         int docroot_fd;
         const char *docroot;
+        size_t nmogdev;
+        size_t user_set_aio_threads; /* only touched by main/notify thread */
+        size_t user_req_aio_threads; /* protected by aio_threads_lock */
+        size_t thr_per_dev;
 
         /* private */
         DIR *dir;
+        pthread_mutex_t by_mog_devid_lock;
+        Hash_table *by_mog_devid;
         Hash_table *by_st_dev;
         pthread_mutex_t devstats_lock;
         struct mog_queue *queue;
         LIST_HEAD(mgmt_head, mog_mgmt) devstats_subscribers;
+        SIMPLEQ_ENTRY(mog_svc) qentry;
         mode_t put_perms;
         mode_t mkcol_perms;
-        int http_fd;
-        int httpget_fd;
-        int mgmt_fd;
+        struct mog_fd *http_mfd;
+        struct mog_fd *httpget_mfd;
+        struct mog_fd *mgmt_mfd;
         uint32_t idle_timeout;
 };
 
@@ -169,35 +179,44 @@ enum mog_chunk_state {
 
 struct mog_http {
         int cs;
-        enum mog_http_method http_method:8;
-        unsigned persistent:1;
-        unsigned chunked:1;
-        unsigned has_trailer_md5:1;
-        unsigned has_expect_md5:1;
-        unsigned has_content_range:1; /* for PUT */
-        unsigned has_range:1;         /* for GET */
-        unsigned skip_rbuf_defer:1;
-        enum mog_chunk_state chunk_state:2;
-        uint8_t path_tip;
-        uint8_t path_end;
-        uint16_t line_end;
-        uint16_t tmp_tip;
+        struct {
+                /* only needs 4 bits, but we use 8 for alignment */
+                enum mog_http_method http_method:8;
+                unsigned persistent:1;
+                unsigned chunked:1;
+                unsigned has_md5:1;
+                unsigned has_content_range:1; /* for PUT */
+                unsigned has_range:1;         /* for GET */
+                unsigned skip_rbuf_defer:1;
+                enum mog_chunk_state chunk_state:2;
+                uint8_t path_tip;
+                uint8_t path_end;
+                uint16_t line_end;
+                uint16_t tmp_tip;
+                uint32_t buf_off;
+                uint32_t mog_devid;
+                off_t range_beg;
+                off_t range_end;
+                off_t content_len;
+        } _p;
         struct mog_fd *forward;
-        size_t offset;
-        off_t range_beg;
-        off_t range_end;
-        off_t content_len;
         struct mog_rbuf *rbuf;
         struct mog_wbuf *wbuf; /* uncommonly needed */
         struct mog_svc *svc;
         uint8_t expect_md5[16];
+        struct mog_packaddr mpa;
+} __attribute__((packed));
+
+struct mog_thread {
+        pthread_t thr;
+        unsigned *do_quit;
 };
 
 struct mog_thrpool {
         pthread_mutex_t lock;
         size_t n_threads;
         size_t want_threads;
-        pthread_t *threads;
+        struct mog_thread *threads;
         void *(*start_fn)(void *);
         void *start_arg;
 };
@@ -215,13 +234,16 @@ struct mog_queue {
 };
 
 /* accept.c */
-typedef void (*post_accept_fn)(int fd, struct mog_svc *);
+typedef void (*mog_post_accept_fn)(int fd, struct mog_svc *,
+                                union mog_sockaddr *, socklen_t);
 struct mog_accept {
         struct mog_svc *svc;
-        post_accept_fn post_accept_fn;
+        mog_post_accept_fn post_accept_fn;
+        struct mog_addrinfo *addrinfo; /* shared with cfg */
         struct mog_thrpool thrpool;
 };
-struct mog_accept * mog_accept_init(int fd, struct mog_svc *, post_accept_fn);
+struct mog_fd *mog_accept_init(int fd, struct mog_svc *,
+                                struct mog_addrinfo *, mog_post_accept_fn);
 void * mog_accept_loop(void *ac);
 
 struct mog_digest {
@@ -244,6 +266,7 @@ struct mog_file {
 #include "notify.h"
 
 /* sig.c */
+extern sigset_t mog_emptyset;
 void mog_intr_disable(void);
 void mog_intr_enable(void);
 void mog_sleep(long seconds);
@@ -280,7 +303,6 @@ struct mog_fd {
                 struct mog_svc *svc;
         } as;
 };
-struct mog_fd *mog_fd_get(int fd);
 void mog_fd_put(struct mog_fd *mfd);
 void mog_fdmap_requeue(struct mog_queue *quit_queue);
 size_t mog_fdmap_expire(uint32_t sec);
@@ -299,6 +321,7 @@ void mog_rbuf_free(struct mog_rbuf *);
 void mog_rbuf_free_and_null(struct mog_rbuf **);
 void *mog_fsbuf_get(size_t *size);
 void mog_alloc_quit(void);
+void mog_oom_if_null(const void *);
 
 #define die_errno(...) do { \
         error(EXIT_FAILURE, errno, __VA_ARGS__); \
@@ -320,10 +343,16 @@ struct mog_svc *mog_svc_new(const char *docroot);
 typedef int (*mog_scandev_cb)(const struct mog_dev *, struct mog_svc *);
 size_t mog_svc_each(Hash_processor processor, void *data);
 void mog_svc_upgrade_prepare(void);
+bool mog_svc_start_each(void *svc_ptr, void *have_mgmt_ptr);
+void mog_svc_thrpool_rescale(struct mog_svc *, size_t ndev_new);
+void mog_svc_aio_threads_enqueue(struct mog_svc *, size_t nr);
+void mog_svc_aio_threads_handler(void);
 
 /* dev.c */
-struct mog_dev * mog_dev_new(struct mog_svc *, uint32_t mog_devid);
+struct mog_dev * mog_dev_for(struct mog_svc *, uint32_t mog_devid);
 int mog_dev_mkusage(const struct mog_dev *, struct mog_svc *);
+size_t mog_dev_hash(const void *, size_t tablesize);
+bool mog_dev_cmp(const void *a, const void *b);
 
 /* valid_path.rl */
 int mog_valid_path(const char *buf, size_t len);
@@ -345,7 +374,7 @@ void mog_pidfile_upgrade_abort(void);
 bool mog_svc_devstats_broadcast(void *svc, void *ignored);
 void mog_svc_devstats_subscribe(struct mog_mgmt *);
 void mog_svc_dev_shutdown(void);
-size_t mog_mkusage_all(void);
+void mog_mkusage_all(void);
 
 /* cloexec_detect.c */
 extern bool mog_cloexec_atomic;
@@ -375,17 +404,20 @@ char *mog_canonpath(const char *path, enum canonicalize_mode_t canon_mode);
 char *mog_canonpath_die(const char *path, enum canonicalize_mode_t canon_mode);
 
 /* thrpool.c */
+void mog_thr_test_quit(void);
 void mog_thrpool_start(struct mog_thrpool *, size_t n,
                        void *(*start_fn)(void *), void *arg);
 void mog_thrpool_quit(struct mog_thrpool *, struct mog_queue *);
-void mog_thrpool_set_n_threads(struct mog_queue *q, size_t size);
 void mog_thrpool_process_queue(void);
+void mog_thrpool_set_size(struct mog_thrpool *, size_t size);
 
 /* mgmt.c */
 void mog_mgmt_writev(struct mog_mgmt *, struct iovec *, int iovcnt);
-void mog_mgmt_post_accept(int fd, struct mog_svc *);
+void mog_mgmt_post_accept(int fd, struct mog_svc *,
+                                union mog_sockaddr *, socklen_t);
 enum mog_next mog_mgmt_queue_step(struct mog_fd *) MOG_CHECK;
 void mog_mgmt_quit_step(struct mog_fd *);
+void mog_mgmt_drop(struct mog_fd *);
 
 /* queue_epoll.c */
 struct mog_queue * mog_queue_new(void);
@@ -404,7 +436,7 @@ struct mog_addrinfo {
 void mog_addrinfo_free(struct mog_addrinfo **);
 
 /* bind_listen.c */
-int mog_bind_listen(struct addrinfo *, const char *accept_filter);
+int mog_bind_listen(struct addrinfo *);
 
 /* close.c */
 void mog_close(int fd);
@@ -433,23 +465,27 @@ void mog_http_init(struct mog_http *, struct mog_svc *);
 enum mog_parser_state mog_http_parse(struct mog_http *, char *buf, size_t len);
 
 /* http_get.c */
-void mog_http_get_open(struct mog_http *, char *buf);
+void mog_http_get_open(struct mog_fd *, char *buf);
 enum mog_next mog_http_get_in_progress(struct mog_fd *);
 
 /* http.c */
-void mog_http_post_accept(int fd, struct mog_svc *);
-void mog_httpget_post_accept(int fd, struct mog_svc *);
+void mog_http_post_accept(int fd, struct mog_svc *,
+                                union mog_sockaddr *, socklen_t);
+void mog_httpget_post_accept(int fd, struct mog_svc *,
+                                union mog_sockaddr *, socklen_t);
 enum mog_next mog_http_queue_step(struct mog_fd *) MOG_CHECK;
 void mog_http_quit_step(struct mog_fd *);
 char *mog_http_path(struct mog_http *, char *buf);
 void mog_http_reset(struct mog_http *);
+void mog_http_unlink_ftmp(struct mog_http *);
+void mog_http_drop(struct mog_fd *);
 
 /* http_dav.c */
-void mog_http_delete(struct mog_http *http, char *buf);
-void mog_http_mkcol(struct mog_http *http, char *buf);
+void mog_http_delete(struct mog_fd *, char *buf);
+void mog_http_mkcol(struct mog_fd *, char *buf);
 
 /* http_put.c */
-void mog_http_put(struct mog_http *http, char *buf, size_t buf_len);
+void mog_http_put(struct mog_fd *, char *buf, size_t buf_len);
 enum mog_next mog_http_put_in_progress(struct mog_fd *);
 bool mog_http_write_full(struct mog_fd *file_mfd, char *buf, size_t buf_len);
 
@@ -474,6 +510,7 @@ int mog_mkpath_for(struct mog_svc *svc, char *path);
 /* queue_common.c */
 struct mog_queue *mog_queue_init(int queue_fd);
 void mog_queue_stop(struct mog_queue *keep);
+void mog_queue_drop(struct mog_fd *);
 
 /* fsck_queue.c */
 bool mog_fsck_queue_ready(struct mog_fd *mfd) MOG_CHECK;
@@ -511,6 +548,12 @@ void mog_iou_active(dev_t);
 #  define MOG_TCP_NOPUSH (0)
 #endif
 
+/* publically visible attributes of the current process */
+struct mog_main {
+        unsigned long worker_processes;
+        bool have_mgmt;
+};
+
 /* cmogstored.c */
 void cmogstored_quit(void);
 
@@ -543,19 +586,20 @@ pid_t mog_upgrade_spawn(void);
 /* exit.c */
 _Noreturn void cmogstored_exit(void);
 
+verify(sizeof(in_port_t) <= sizeof(uint16_t));
 /*
  * We only deal with ipv4 and ipv6 addresses (and no human-friendly
  * hostnames/service names), so we can use smaller constants than the
  * standard NI_MAXHOST/NI_MAXSERV values (1025 and 32 respectively).
  * This reduces our per-thread stack usage and keeps caches hotter.
  */
-#define MOG_NI_MAXHOST (INET6_ADDRSTRLEN)
-#define MOG_NI_MAXSERV (sizeof(":65536"))
-
-/* avoid sockaddr_storage since that bigger than we need */
-union mog_sockaddr {
-        struct sockaddr_in in;
-        struct sockaddr_in6 in6;
-        struct sockaddr sa;
-        unsigned char bytes[1];
+struct mog_ni {
+        char ni_host[INET6_ADDRSTRLEN + sizeof("[]") - 1];
+        char ni_serv[sizeof(":65536")];
 };
+
+/* nameinfo.c */
+void mog_nameinfo(struct mog_packaddr *, struct mog_ni *);
+
+/* yield.c */
+void mog_yield(void);