/*
 * Copyright (C) 2012-2018 all contributors <cmogstored-public@bogomips.org>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 */

#include "cmogstored.h"
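
/*
 * pending-notification flags, one slot per enum mog_notification value;
 * set via mog_notify() and consumed by note_run()
 */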
static int notes[MOG_NOTIFY_MAX];
static struct mog_fd *notify_mfd;
static time_t usage_file_updated_at;
static time_t usage_file_interval = 10;
struct mog_queue *mog_notify_queue;
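
/*
 * one-time setup: honor the MOG_DISK_USAGE_INTERVAL environment variable,
 * create the notification queue, and register the selfwake descriptor
 * (when available) so mog_notify() can wake the wait loop
 */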
void mog_notify_init(void)
{
	const char *interval = getenv("MOG_DISK_USAGE_INTERVAL");

	if (interval) {
		int i = atoi(interval);

		if (i > 0)
			usage_file_interval = (time_t)i;
	}

	assert(mog_notify_queue == NULL && "notify queue already initialized");
	assert(notify_mfd == NULL && "notify_mfd already initialized");

	mog_notify_queue = mog_queue_new();
	notify_mfd = mog_selfwake_new();

	if (notify_mfd) {
		struct mog_selfwake *notify = &notify_mfd->as.selfwake;

		assert(notify->writer && "notify writer not initialized");
		notify->queue = mog_notify_queue;
		mog_idleq_add(notify->queue, notify_mfd, MOG_QEV_RD);
	}
}
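
/* regenerate usage files for all devices and remember when we did it */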
static void global_mkusage(void)
{
	mog_mkusage_all();
	usage_file_updated_at = time(NULL);
}
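
/* atomically flip one notification flag; true only if we made the change */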
static inline bool note_xchg(enum mog_notification note, int from, int to)
{
	return __sync_bool_compare_and_swap(&notes[note], from, to);
}
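
/* run the handler for each notification whose flag is currently set */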
static void note_run(void)
{
	if (note_xchg(MOG_NOTIFY_DEVICE_REFRESH, 1, 0))
		global_mkusage();

	if (note_xchg(MOG_NOTIFY_AIO_THREADS, 1, 0))
		mog_svc_aio_threads_handler();
}

/* drain the pipe and process notifications */
static void note_queue_step(struct mog_fd *mfd)
{
	mog_selfwake_drain(mfd);
	note_run();
	mog_idleq_push(mfd->as.selfwake.queue, mfd, MOG_QEV_RD);
}
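
/* dispatch a ready descriptor from the notify queue based on its fd type */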
static void notify_queue_step(struct mog_fd *mfd)
{
	switch (mfd->fd_type) {
	case MOG_FD_TYPE_SELFWAKE: note_queue_step(mfd); return;
	case MOG_FD_TYPE_IOSTAT: mog_iostat_queue_step(mfd); return;
	default:
		assert(0 && mfd->fd_type && "bad fd_type in queue");
	}
}

/* this is the main loop of cmogstored */
void mog_notify_wait(bool need_usage_file)
{
	time_t next = usage_file_updated_at + usage_file_interval;
	time_t now = time(NULL);
	time_t timeout = next - now;
	struct mog_fd *mfd;

	if (next <= now)
		global_mkusage();

	/*
	 * epoll_wait() with timeout==0 can avoid some slow paths,
	 * so take anything that's already ready before sleeping
	 */
	while ((mfd = mog_idleq_wait(mog_notify_queue, 0)))
		notify_queue_step(mfd);
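
	/*
	 * a negative timeout sleeps until woken; the wait below presumably
	 * takes milliseconds (cf. the epoll_wait() note above), so convert
	 * the remaining seconds
	 */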
	if (need_usage_file == false)
		timeout = -1;
	else if (timeout > 0)
		timeout *= 1000;
	else
		timeout = 0;

	mfd = mog_idleq_wait_intr(mog_notify_queue, timeout);
	if (mfd)
		notify_queue_step(mfd);
	else if (errno == EINTR)
		note_run();
}

/* this is async-signal safe */
void mog_notify(enum mog_notification note)
{
	switch (note) {
	case MOG_NOTIFY_DEVICE_REFRESH:
	case MOG_NOTIFY_AIO_THREADS:
		note_xchg(note, 0, 1);
		mog_selfwake_interrupt();
		break;
	case MOG_NOTIFY_SIGNAL: break;
	default: assert(0 && "bad note passed");
	}

	mog_selfwake_trigger(notify_mfd);
}