/*
* Copyright (C) 2012-2015 all contributors <cmogstored-public@bogomips.org>
* License: GPLv3 or later (see COPYING for details)
*/
#include "cmogstored.h"
/*
 * Decide which client this thread services next: the currently-active
 * mfd, or a freshly-ready one from the idle queue.
 */
static struct mog_fd *queue_xchg_maybe(struct mog_queue *q, struct mog_fd *mfd)
{
	struct mog_fd *fresh_mfd;

	/*
	 * idle, just-ready clients are the most important.
	 * A zero timeout is used here since epoll_wait() is
	 * optimized for the non-blocking case.
	 */
	fresh_mfd = mog_idleq_wait(q, 0);

	if (!fresh_mfd)
		/*
		 * no new work came up, so keep processing the
		 * currently-active mfd in this thread
		 */
		return mfd;

	/*
	 * We got a more important client: hand the current mfd off to
	 * the active queue for another thread to service while this
	 * thread takes the more recently-active client.
	 */
	mog_activeq_push(q, mfd);
	return fresh_mfd;
}
/* passed as a start_routine to pthread_create */
/*
 * Main event loop for a queue worker thread; passed as a start_routine
 * to pthread_create.  `arg` is the struct mog_queue this thread drains.
 * Runs forever (the return NULL is unreachable; it only silences
 * compilers that cannot see the loop never exits).
 */
void * mog_queue_loop(void *arg)
{
	struct mog_queue *q = arg;
	struct mog_fd *mfd = NULL;

	syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread ready",
	       (unsigned long)pthread_self());

	for (;;) {
		/* block indefinitely until some client becomes ready */
		while (mfd == NULL)
			mfd = mog_idleq_wait(q, -1);

		/* run one step of the state machine, then reschedule */
		switch (mog_queue_step(mfd)) {
		case MOG_NEXT_ACTIVE:
			/* mfd still runnable; maybe swap for a fresher one */
			mfd = queue_xchg_maybe(q, mfd);
			break;
		case MOG_NEXT_WAIT_RD:
			/* mfd blocked on read; park it, grab other work */
			mfd = mog_queue_xchg(q, mfd, MOG_QEV_RD);
			break;
		case MOG_NEXT_WAIT_WR:
			/* mfd blocked on write; park it, grab other work */
			mfd = mog_queue_xchg(q, mfd, MOG_QEV_WR);
			break;
		case MOG_NEXT_IGNORE:
		case MOG_NEXT_CLOSE:
			/* already handled; wait for the next ready client */
			mfd = mog_idleq_wait(q, -1);
		}
	}

	return NULL;
}
/*
 * Run one shutdown step for a single client, dispatched on fd_type.
 * Only MGMT and HTTP/HTTPGET clients are valid here.
 */
static void queue_quit_step(struct mog_fd *mfd)
{
	switch (mfd->fd_type) {
	case MOG_FD_TYPE_HTTP:
	case MOG_FD_TYPE_HTTPGET:
		mog_http_quit_step(mfd);
		return;
	case MOG_FD_TYPE_MGMT:
		mog_mgmt_quit_step(mfd);
		return;
	case MOG_FD_TYPE_FILE:
	case MOG_FD_TYPE_QUEUE:
	case MOG_FD_TYPE_SVC:
		/* these types never reach the quit loop */
		assert(0 && "invalid fd_type in queue_quit_step");
		/* fallthrough */
	default:
		break;
	}
}
/* called at shutdown when only one thread is active */
/*
 * Drain remaining active clients at shutdown; called when only one
 * thread is left running, so no locking is needed around the counter.
 */
void mog_queue_quit_loop(struct mog_queue *queue)
{
	while (mog_nr_active_at_quit) {
		struct mog_fd *mfd;

		assert(mog_nr_active_at_quit <= (size_t)INT_MAX
		       && "mog_nr_active_at_quit underflow");

		mfd = mog_idleq_wait_intr(queue, -1);
		if (mfd != NULL)
			queue_quit_step(mfd);
	}
}