From: Pete Zaitcev <zaitcev@redhat.com>
To: Jeff Garzik <jeff@garzik.org>
Cc: Project Hail List <hail-devel@vger.kernel.org>
Subject: [Patch 1/1] tabled: switch to ncld
Date: Mon, 8 Feb 2010 21:42:14 -0700 [thread overview]
Message-ID: <20100208214214.16f39ec8@redhat.com> (raw)
No new functionality just yet, only a switch-over.
Signed-Off-By: Pete Zaitcev <zaitcev@redhat.com>
---
server/cldu.c | 789 +++++++++++++-----------------------------------
1 file changed, 215 insertions(+), 574 deletions(-)
diff --git a/server/cldu.c b/server/cldu.c
index 7e176d4..aecf336 100644
--- a/server/cldu.c
+++ b/server/cldu.c
@@ -29,8 +29,8 @@
#include <unistd.h>
#include <event.h>
#include <errno.h>
-#include <cldc.h>
#include <elist.h>
+#include <ncld.h>
#include "tabled.h"
#define ALIGN8(n) ((8 - ((n) & 7)) & 7)
@@ -49,60 +49,38 @@ struct cld_host {
struct cld_session {
bool forced_hosts; /* Administrator overrode default CLD */
- bool sess_open;
- struct cldc_udp *lib; /* library state */
- struct event lib_timer;
- int retry_cnt;
- int last_recv_err;
+ bool is_dead;
+ struct ncld_sess *nsp; /* library state */
/*
* For code sanity and being isomorphic with conventional programming
* using sleep(), neither of the timers must ever be active simultane-
* ously with any other. But using one timer structure is too annoying.
*/
- struct event tm_retry;
+ // struct event tm_relock;
struct event tm_rescan;
- struct event tm_reopen;
int actx; /* Active host cldv[actx] */
struct cld_host cldv[N_CLD];
char *thisgroup;
char *thishost;
- struct event ev; /* Associated with fd */
char *cfname; /* /tabled-group directory */
- struct cldc_fh *cfh; /* /tabled-group directory, keep open for scan */
+ struct ncld_fh *cfh; /* /tabled-group directory, keep open for scan */
char *ffname; /* /tabled-group/thishost */
- struct cldc_fh *ffh; /* /tabled-group/thishost, keep open for lock */
+ struct ncld_fh *ffh; /* /tabled-group/thishost, keep open for lock */
char *xfname; /* /chunk-GROUP directory */
- struct cldc_fh *xfh; /* /chunk-GROUP directory */
- char *yfname; /* /chunk-GROUP/NID file */
- struct cldc_fh *yfh; /* /chunk-GROUP/NID file */
struct list_head chunks; /* found in xfname, struct chunk_node */
};
static int cldu_set_cldc(struct cld_session *sp, int newactive);
-static int cldu_new_sess(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_open_c_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_open_f_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_lock_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_put_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static void try_open_x(struct cld_session *sp);
-static int cldu_open_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_get_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_close_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static void next_chunk(struct cld_session *sp);
-static int cldu_open_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_get_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_close_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
+static int scan_chunks(struct cld_session *sp);
+static void next_chunk(struct cld_session *sp, struct chunk_node *np);
static void add_remote(const char *name);
static void add_chunk_node(struct cld_session *sp, const char *name);
-static struct timeval cldu_retry_delay = { 5, 0 };
static struct timeval cldu_rescan_delay = { 50, 0 };
-static struct timeval cldu_reopen_delay = { 3, 0 };
struct hail_log cldu_hail_log = {
.func = applog,
@@ -169,170 +147,68 @@ err_oom:
return 0;
}
-static void cldu_tm_retry(int fd, short events, void *userdata)
-{
- struct cld_session *sp = userdata;
-
- if (++sp->retry_cnt >= 5) {
- applog(LOG_INFO, "Out of retries for %s, bailing", sp->xfname);
- exit(1);
- }
- if (debugging)
- applog(LOG_DEBUG, "Trying to open %s", sp->xfname);
- try_open_x(sp);
-}
-
static void cldu_tm_rescan(int fd, short events, void *userdata)
{
struct cld_session *sp = userdata;
+ int newactive;
/* Add rescanning for tabled nodes as well. FIXME */
if (debugging)
applog(LOG_DEBUG, "Rescanning for Chunks in %s", sp->xfname);
- try_open_x(sp);
-}
-
-static void cldu_tm_reopen(int fd, short events, void *userdata)
-{
- struct cld_session *sp = userdata;
-
- if (debugging)
- applog(LOG_DEBUG, "Trying to reopen %d storage nodes",
- tabled_srv.num_stor);
- if (stor_update_cb() < 1)
- evtimer_add(&sp->tm_reopen, &cldu_reopen_delay);
-}
-static void cldu_event(int fd, short events, void *userdata)
-{
- struct cld_session *sp = userdata;
- int rc;
-
- if (!sp->lib) {
- applog(LOG_WARNING, "Stray UDP event");
- return;
- }
-
- rc = cldc_udp_receive_pkt(sp->lib);
- if (rc) {
- if (rc != sp->last_recv_err) {
- if (rc < -1000) /* our internal code */
- applog(LOG_INFO,
- "cldc_udp_receive_pkt failed: %d", rc);
- else
- applog(LOG_INFO,
- "cldc_udp_receive_pkt failed: %s",
- strerror(-rc));
- sp->last_recv_err = rc;
- }
- /*
- * Reacting to ICMP messages is a bad idea, because
- * - it makes us loop hard in case CLD is down, unless we
- * insert additional tricky timeouts
- * - it deals poorly with transient problems like CLD reboots
- */
-#if 0
- if (rc == -ECONNREFUSED) { /* ICMP tells us */
- int newactive;
- // evtimer_del(&sp->tm);
- cldc_kill_sess(sp->lib->sess);
- sp->lib->sess = NULL;
- newactive = cldu_nextactive(sp);
- if (cldu_set_cldc(sp, newactive))
- return;
- // evtimer_add(&sp->tm, &cldc_to_delay);
+ if (sp->is_dead) {
+ ncld_sess_close(sp->nsp);
+ sp->nsp = NULL;
+ sp->is_dead = 0;
+ newactive = cldu_nextactive(sp);
+ if (cldu_set_cldc(sp, newactive)) {
+ evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
+ return;
}
- return;
-#endif
- }
-}
-
-static bool cldu_p_timer_ctl(void *priv, bool add,
- int (*cb)(struct cldc_session *, void *),
- void *cb_priv, time_t secs)
-{
- struct cld_session *sp = priv;
- struct cldc_udp *udp = sp->lib;
- struct timeval tv = { secs, 0 };
-
- if (add) {
- udp->cb = cb;
- udp->cb_private = cb_priv;
- return evtimer_add(&sp->lib_timer, &tv) == 0;
- } else {
- return evtimer_del(&sp->lib_timer) == 0;
}
-}
-static int cldu_p_pkt_send(void *priv, const void *addr, size_t addrlen,
- const void *buf, size_t buflen)
-{
- struct cld_session *sp = priv;
- return cldc_udp_pkt_send(sp->lib, addr, addrlen, buf, buflen);
+ scan_chunks(sp);
+ evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
}
-static void cldu_udp_timer_event(int fd, short events, void *userdata)
-
-{
- struct cld_session *sp = userdata;
- struct cldc_udp *udp = sp->lib;
-
- if (udp->cb)
- udp->cb(udp->sess, udp->cb_private);
-}
-
-static void cldu_p_event(void *priv, struct cldc_session *csp,
- struct cldc_fh *fh, uint32_t what)
+static void cldu_sess_event(void *priv, uint32_t what)
{
struct cld_session *sp = priv;
- int newactive;
if (what == CE_SESS_FAILED) {
- sp->sess_open = false;
- if (sp->lib->sess != csp)
- applog(LOG_ERR, "Stray session failed, sid " SIDFMT,
- SIDARG(csp->sid));
- else
- applog(LOG_ERR, "Session failed, sid " SIDFMT,
- SIDARG(csp->sid));
- // evtimer_del(&sp->tm);
- sp->lib->sess = NULL;
- newactive = cldu_nextactive(sp);
- if (cldu_set_cldc(sp, newactive))
- return;
- // evtimer_add(&sp->tm, &cldc_to_delay);
+ applog(LOG_ERR, "Session failed, sid " SIDFMT,
+ SIDARG(sp->nsp->udp->sess->sid));
+ sp->is_dead = true;
} else {
- if (csp)
+ if (sp->nsp)
applog(LOG_INFO, "cldc event 0x%x sid " SIDFMT,
- what, SIDARG(csp->sid));
+ what, SIDARG(sp->nsp->udp->sess->sid));
else
applog(LOG_INFO, "cldc event 0x%x no sid", what);
}
}
-static struct cldc_ops cld_ops = {
- .timer_ctl = cldu_p_timer_ctl,
- .pkt_send = cldu_p_pkt_send,
- .event = cldu_p_event,
- .errlog = applog,
-};
-
/*
- * Open the library, start its session, and reguster its socket with libevent.
+ * Open the library, start its session, pre-open files, and set timers.
* Our session remains consistent in case of an error in this function,
* so that we can continue and retry meaningfuly.
*/
static int cldu_set_cldc(struct cld_session *sp, int newactive)
{
struct cldc_host *hp;
- struct cldc_udp *lib;
- struct cldc_call_opts copts;
+ struct ncld_read *nrp;
+ char buf[100];
+ const char *ptr;
+ int dir_len;
+ int total_len, rec_len, name_len;
+ int len;
+ struct timespec tm;
+ int error;
int rc;
- if (sp->lib) {
- event_del(&sp->ev);
- cldc_udp_free(sp->lib);
- sp->lib = NULL;
+ if (sp->nsp) {
+ ncld_sess_close(sp->nsp);
+ sp->nsp = NULL;
}
sp->actx = newactive;
@@ -342,105 +218,36 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
}
hp = &sp->cldv[sp->actx].h;
- evtimer_set(&sp->lib_timer, cldu_udp_timer_event, sp);
-
- rc = cldc_udp_new(hp->host, hp->port, &sp->lib);
- if (rc) {
- applog(LOG_ERR, "cldc_udp_new(%s,%u) error: %d",
- hp->host, hp->port, rc);
- goto err_lib_new;
- }
- lib = sp->lib;
-
if (debugging)
applog(LOG_INFO, "Selected CLD host %s port %u",
hp->host, hp->port);
- /*
- * This is a little iffy: we assume that it's ok to re-issue
- * event_set() for an event that was unregistered with event_del().
- * In any case, there's no other way to set the file descriptor.
- */
- event_set(&sp->ev, sp->lib->fd, EV_READ | EV_PERSIST, cldu_event, sp);
-
- if (event_add(&sp->ev, NULL) < 0) {
- applog(LOG_INFO, "Failed to add CLD event");
- goto err_event;
- }
-
- memset(&copts, 0, sizeof(struct cldc_call_opts));
- copts.cb = cldu_new_sess;
- copts.private = sp;
- rc = cldc_new_sess(&cld_ops, &copts, lib->addr, lib->addr_len,
- "tabled", "tabled", sp, &lib->sess);
- if (rc) {
- applog(LOG_INFO,
- "Failed to start CLD session on host %s port %u",
- hp->host, hp->port);
- goto err_sess;
- }
-
- // if (debugging)
- // lib->sess->verbose = true;
-
- return 0;
-
-err_sess:
-err_event:
- cldc_udp_free(sp->lib);
- sp->lib = NULL;
-err_lib_new:
-err_addr:
- return -1;
-}
-
-static int cldu_new_sess(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
- struct cld_session *sp = carg->private;
- struct cldc_call_opts copts;
- int rc;
-
- if (errc != CLE_OK) {
- applog(LOG_INFO, "New CLD session creation failed: %d", errc);
- return 0;
+ sp->nsp = ncld_sess_open(hp->host, hp->port, &error,
+ cldu_sess_event, sp, "tabled", "tabled");
+ if (sp->nsp == NULL) {
+ if (error < 1000) {
+ applog(LOG_ERR, "ncld_sess_open(%s,%u) error: %s",
+ hp->host, hp->port, strerror(error));
+ } else {
+ applog(LOG_ERR, "ncld_sess_open(%s,%u) error: %d",
+ hp->host, hp->port, error);
+ }
+ goto err_nsess;
}
- sp->sess_open = true;
applog(LOG_INFO, "New CLD session created, sid " SIDFMT,
- SIDARG(sp->lib->sess->sid));
+ SIDARG(sp->nsp->udp->sess->sid));
/*
* First, make sure the base directory exists.
*/
- memset(&copts, 0, sizeof(copts));
- copts.cb = cldu_open_c_cb;
- copts.private = sp;
- rc = cldc_open(sp->lib->sess, &copts, sp->cfname,
- COM_READ | COM_WRITE | COM_CREATE | COM_DIRECTORY,
- CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->cfh);
- if (rc) {
- applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->cfname, rc);
- }
- return 0;
-}
-
-static int cldu_open_c_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
- struct cld_session *sp = carg->private;
- struct cldc_call_opts copts;
- int rc;
-
- if (errc != CLE_OK) {
- applog(LOG_ERR, "CLD open(%s) failed: %d", sp->cfname, errc);
- return 0;
- }
- if (sp->cfh == NULL) {
- applog(LOG_ERR, "CLD open(%s) failed: NULL fh", sp->cfname);
- return 0;
- }
- if (!sp->cfh->valid) {
- applog(LOG_ERR, "CLD open(%s) failed: invalid fh", sp->cfname);
- return 0;
+ sp->cfh = ncld_open(sp->nsp, sp->cfname,
+ COM_READ | COM_WRITE | COM_CREATE | COM_DIRECTORY,
+ &error, 0 /* CE_MASTER_FAILOVER | CE_SESS_FAILED */,
+ NULL, NULL);
+ if (!sp->cfh) {
+ applog(LOG_ERR, "CLD open(%s) failed: %d", sp->cfname, error);
+ goto err_copen;
}
if (debugging)
@@ -449,65 +256,38 @@ static int cldu_open_c_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
/*
* Then, create the membership file for us.
*/
- memset(&copts, 0, sizeof(copts));
- copts.cb = cldu_open_f_cb;
- copts.private = sp;
- rc = cldc_open(sp->lib->sess, &copts, sp->ffname,
- COM_WRITE | COM_LOCK | COM_CREATE,
- CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->ffh);
- if (rc) {
- applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->ffname, rc);
- }
- return 0;
-}
-
-static int cldu_open_f_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
- struct cld_session *sp = carg->private;
- struct cldc_call_opts copts;
- int rc;
-
- if (errc != CLE_OK) {
- applog(LOG_ERR, "CLD open(%s) failed: %d", sp->ffname, errc);
- return 0;
- }
- if (sp->ffh == NULL) {
- applog(LOG_ERR, "CLD open(%s) failed: NULL fh", sp->ffname);
- return 0;
- }
- if (!sp->ffh->valid) {
- applog(LOG_ERR, "CLD open(%s) failed: invalid fh", sp->ffname);
- return 0;
+ sp->ffh = ncld_open(sp->nsp, sp->ffname,
+ COM_WRITE | COM_LOCK | COM_CREATE,
+ &error, 0, NULL, NULL);
+ if (!sp->ffh) {
+ applog(LOG_ERR, "CLD open(%s) failed: %d", sp->ffname, error);
+ goto err_fopen;
}
if (debugging)
applog(LOG_DEBUG, "CLD file \"%s\" created", sp->ffname);
- /*
- * Lock the file, in case two hosts got the same hostname.
- */
- memset(&copts, 0, sizeof(copts));
- copts.cb = cldu_lock_cb;
- copts.private = sp;
- rc = cldc_lock(sp->ffh, &copts, 0, false);
- if (rc) {
- applog(LOG_ERR, "cldc_lock call error %d", rc);
- }
+ for (;;) {
+ rc = ncld_trylock(sp->ffh);
+ if (!rc)
+ break;
- return 0;
-}
+ applog(LOG_ERR, "CLD lock(%s) failed: %d", sp->ffname, rc);
+ if (rc != CLE_LOCK_CONFLICT + 1100)
+ goto err_lock;
-static int cldu_lock_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
- struct cld_session *sp = carg->private;
- char buf[100];
- int len;
- struct cldc_call_opts copts;
- int rc;
-
- if (errc != CLE_OK) {
- applog(LOG_ERR, "CLD lock(%s) failed: %d", sp->ffname, errc);
- return 0;
+ /*
+ * The usual reason why we get a lock conflict is
+ * restarting too quickly and hitting the previous lock
+ * that is going to disappear soon.
+ *
+ * FIXME: However, it may also be that a master
+ * is ok and we should become a slave, e.g. start TDB.
+ * We do not support multi-node, but we should.
+ */
+ tm.tv_sec = 10;
+ tm.tv_nsec = 0;
+ nanosleep(&tm, NULL);
}
/*
@@ -515,65 +295,30 @@ static int cldu_lock_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
*/
len = snprintf(buf, sizeof(buf), "port: %u\n", tabled_srv.rep_port);
if (len >= sizeof(buf)) {
- applog(LOG_ERR,
- "internal error: overflow in cldu_lock_cb (%d)", len);
- return 0;
+ applog(LOG_ERR, "internal error: overflow for port (%d)", len);
+ goto err_wmem;
}
- memset(&copts, 0, sizeof(copts));
- copts.cb = cldu_put_cb;
- copts.private = sp;
- rc = cldc_put(sp->ffh, &copts, buf, len);
+ rc = ncld_write(sp->ffh, buf, len);
if (rc) {
- applog(LOG_ERR, "cldc_put(%s) call error: %d", sp->ffname, rc);
- }
-
- return 0;
-}
-
-static int cldu_put_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
- struct cld_session *sp = carg->private;
- struct cldc_call_opts copts;
- int rc;
-
- if (errc != CLE_OK) {
- applog(LOG_ERR, "CLD put(%s) failed: %d", sp->ffname, errc);
- return 0;
+ applog(LOG_ERR, "CLD put(%s) failed: %d", sp->ffname, rc);
+ goto err_write;
}
/*
* Read the directory.
*/
- memset(&copts, 0, sizeof(copts));
- copts.cb = cldu_get_1_cb;
- copts.private = sp;
- rc = cldc_get(sp->cfh, &copts, false);
- if (rc) {
- applog(LOG_ERR, "cldc_get(%s) call error: %d", sp->cfname, rc);
- }
-
- return 0;
-}
-
-static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
- struct cld_session *sp = carg->private;
- const char *ptr;
- int dir_len;
- int total_len, rec_len, name_len;
- char buf[65];
-
- if (errc != CLE_OK) {
- applog(LOG_ERR, "CLD get(%s) failed: %d", sp->cfname, errc);
- return 0;
+ nrp = ncld_get(sp->cfh, &error);
+ if (!nrp) {
+ applog(LOG_ERR, "CLD get(%s) failed: %d", sp->cfname, error);
+ goto err_dread;
}
if (debugging)
applog(LOG_DEBUG, "Known tabled nodes");
- ptr = carg->u.get.buf;
- dir_len = carg->u.get.size;
+ ptr = nrp->ptr;
+ dir_len = nrp->length;
while (dir_len) {
name_len = GUINT16_FROM_LE(*(uint16_t *)ptr);
rec_len = name_len + 2;
@@ -598,6 +343,8 @@ static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
dir_len -= total_len;
}
+ ncld_read_free(nrp);
+
/*
* If configuration gives us storage nodes, we shortcut scanning
* of CLD, because:
@@ -609,96 +356,78 @@ static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
if (debugging)
applog(LOG_DEBUG, "Trying to open %d storage nodes",
tabled_srv.num_stor);
- if (stor_update_cb() < 1) {
- evtimer_add(&sp->tm_reopen, &cldu_reopen_delay);
+ while (stor_update_cb() < 1) {
+ tm.tv_sec = 3;
+ tm.tv_nsec = 0;
+ nanosleep(&tm, NULL);
+ if (debugging)
+ applog(LOG_DEBUG,
+ "Trying to reopen %d storage nodes",
+ tabled_srv.num_stor);
}
return 0;
}
- sp->retry_cnt = 0;
- try_open_x(sp);
return 0;
-}
-
-/*
- * Open the xfname, so we can collect registered Chunk servers.
- */
-static void try_open_x(struct cld_session *sp)
-{
- struct cldc_call_opts copts;
- int rc;
- memset(&copts, 0, sizeof(copts));
- copts.cb = cldu_open_x_cb;
- copts.private = sp;
- rc = cldc_open(sp->lib->sess, &copts, sp->xfname,
- COM_READ | COM_DIRECTORY,
- CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->xfh);
- if (rc) {
- applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->xfname, rc);
- }
+err_dread:
+err_write:
+err_wmem:
+err_lock:
+ ncld_close(sp->ffh); /* session-close closes these, maybe drop */
+err_fopen:
+ ncld_close(sp->cfh);
+err_copen:
+ ncld_sess_close(sp->nsp);
+ sp->nsp = NULL;
+err_nsess:
+err_addr:
+ return -1;
}
-static int cldu_open_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
+static int scan_chunks(struct cld_session *sp)
{
- struct cld_session *sp = carg->private;
- struct cldc_call_opts copts;
- int rc;
-
- if (errc != CLE_OK) {
- if (errc == CLE_INODE_INVAL || errc == CLE_NAME_INVAL) {
+ struct ncld_fh *xfh; /* /chunk-GROUP directory */
+ struct ncld_read *nrp;
+ struct chunk_node *np;
+ const char *ptr;
+ int dir_len;
+ int total_len, rec_len, name_len;
+ char buf[65];
+ int error;
+
+ xfh = ncld_open(sp->nsp, sp->xfname, COM_READ | COM_DIRECTORY,
+ &error, 0 /* CE_MASTER_FAILOVER | CE_SESS_FAILED */,
+ NULL, NULL);
+ if (!xfh) {
+ if (error == CLE_INODE_INVAL + 1100 ||
+ error == CLE_NAME_INVAL + 1100) {
applog(LOG_ERR, "%s: open failed, retrying",
sp->xfname);
- evtimer_add(&sp->tm_retry, &cldu_retry_delay);
+ return 1;
} else {
applog(LOG_ERR, "CLD open(%s) failed: %d",
- sp->xfname, errc);
+ sp->xfname, error);
/* XXX we're dead, why not exit(1) right away? */
+ return -1;
}
- return 0;
- }
- if (sp->xfh == NULL) {
- applog(LOG_ERR, "CLD open(%s) failed: NULL fh", sp->xfname);
- return 0;
- }
- if (!sp->xfh->valid) {
- applog(LOG_ERR, "CLD open(%s) failed: invalid fh", sp->xfname);
- return 0;
}
/*
* Read the directory.
*/
- memset(&copts, 0, sizeof(copts));
- copts.cb = cldu_get_x_cb;
- copts.private = sp;
- rc = cldc_get(sp->xfh, &copts, false);
- if (rc) {
- applog(LOG_ERR, "cldc_get(%s) call error: %d", sp->cfname, rc);
- }
- return 0;
-}
-
-static int cldu_get_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
- struct cld_session *sp = carg->private;
- struct cldc_call_opts copts;
- int rc;
- const char *ptr;
- int dir_len;
- int total_len, rec_len, name_len;
- char buf[65];
-
- if (errc != CLE_OK) {
- applog(LOG_ERR, "CLD get(%s) failed: %d", sp->xfname, errc);
- return 0;
+ nrp = ncld_get(xfh, &error);
+ if (!nrp) {
+ ncld_close(xfh);
+ applog(LOG_ERR, "CLD get(%s) failed: %d", sp->xfname, error);
+ return -1;
}
if (debugging)
applog(LOG_DEBUG, "Known Chunk nodes");
- ptr = carg->u.get.buf;
- dir_len = carg->u.get.size;
+ ptr = nrp->ptr;
+ dir_len = nrp->length;
while (dir_len) {
name_len = GUINT16_FROM_LE(*(uint16_t *)ptr);
rec_len = name_len + 2;
@@ -718,176 +447,82 @@ static int cldu_get_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
dir_len -= total_len;
}
- memset(&copts, 0, sizeof(copts));
- copts.cb = cldu_close_x_cb;
- copts.private = sp;
- rc = cldc_close(sp->xfh, &copts);
- if (rc) {
- applog(LOG_ERR, "cldc_close call error %d", rc);
- }
- return 0;
-}
-
-static int cldu_close_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
- struct cld_session *sp = carg->private;
- // struct cldc_call_opts copts;
- // int rc;
-
- if (errc != CLE_OK) {
- applog(LOG_ERR, "CLD close(%s) failed: %d", sp->xfname, errc);
- return 0;
- }
+ ncld_read_free(nrp);
+ ncld_close(xfh);
+ /*
+ * Scan the collected directory contents and fill the entries.
+ */
if (list_empty(&sp->chunks)) {
applog(LOG_INFO, "%s: No Chunk nodes found, retrying",
sp->xfname);
- if (evtimer_add(&sp->tm_retry, &cldu_retry_delay) != 0) {
- applog(LOG_ERR, "evtimer_add error %s",
- strerror(errno));
- }
- } else {
- next_chunk(sp);
+ return 1;
}
- return 0;
-}
-
-static void next_chunk(struct cld_session *sp)
-{
- struct chunk_node *np;
- char *mem;
- struct cldc_call_opts copts;
- int rc;
-
- np = list_entry(sp->chunks.next, struct chunk_node, link);
-
- if (asprintf(&mem, "/chunk-%s/%s", sp->thisgroup, np->name) == -1) {
- applog(LOG_WARNING, "OOM in cldu");
- return;
- }
- sp->yfname = mem;
-
- memset(&copts, 0, sizeof(copts));
- copts.cb = cldu_open_y_cb;
- copts.private = sp;
- rc = cldc_open(sp->lib->sess, &copts, sp->yfname,
- COM_READ,
- CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->yfh);
- if (rc) {
- applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->yfname, rc);
- }
-}
-
-static int cldu_open_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
- struct cld_session *sp = carg->private;
- struct cldc_call_opts copts;
- int rc;
-
- if (errc != CLE_OK) {
- applog(LOG_ERR, "CLD open(%s) failed: %d", sp->yfname, errc);
- free(sp->yfname);
- sp->yfname = NULL;
- return 0;
- }
- if (sp->yfh == NULL) {
- applog(LOG_ERR, "CLD open(%s) failed: NULL fh", sp->yfname);
- free(sp->yfname);
- sp->yfname = NULL;
- return 0;
- }
- if (!sp->yfh->valid) {
- applog(LOG_ERR, "CLD open(%s) failed: invalid fh", sp->yfname);
- free(sp->yfname);
- sp->yfname = NULL;
- return 0;
+ while (!list_empty(&sp->chunks)) {
+ np = list_entry(sp->chunks.next, struct chunk_node, link);
+ next_chunk(sp, np);
+ list_del(&np->link);
}
/*
- * Read the Chunk's parameter file.
+ * Poke the dispatch about the possible changes in the
+ * configuration of Chunk.
+ *
+ * It's possible that the CLD directories have many entries,
+ * but no useable Chunk servers. In that case, treat everything
+ * like a usual retry.
+ *
+ * For the case of normal operation, we also set up a rescan, for now.
+ * In the future, we'll subscribe for change notification. FIXME.
*/
- memset(&copts, 0, sizeof(copts));
- copts.cb = cldu_get_y_cb;
- copts.private = sp;
- rc = cldc_get(sp->yfh, &copts, false);
- if (rc) {
- applog(LOG_ERR, "cldc_get(%s) call error: %d", sp->yfname, rc);
- }
- return 0;
-}
+ if (!stor_update_cb())
+ return 1;
-static int cldu_get_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
- struct cld_session *sp = carg->private;
- struct cldc_call_opts copts;
- int rc;
- const char *ptr;
- int len;
-
- if (errc != CLE_OK) {
- applog(LOG_ERR, "CLD get(%s) failed: %d", sp->yfname, errc);
- goto close_and_next; /* spaghetti */
- }
-
- ptr = carg->u.get.buf;
- len = carg->u.get.size;
- stor_parse(sp->yfname, ptr, len);
-
-close_and_next:
- memset(&copts, 0, sizeof(copts));
- copts.cb = cldu_close_y_cb;
- copts.private = sp;
- rc = cldc_close(sp->yfh, &copts);
- if (rc) {
- applog(LOG_ERR, "cldc_close call error %d", rc);
- }
return 0;
}
-static int cldu_close_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
+static void next_chunk(struct cld_session *sp, struct chunk_node *np)
{
- struct cld_session *sp = carg->private;
- struct chunk_node *np;
- // struct cldc_call_opts copts;
- // int rc;
+ char *mem;
+ char *yfname; /* /chunk-GROUP/NID file */
+ struct ncld_fh *yfh; /* /chunk-GROUP/NID file */
+ struct ncld_read *nrp;
+ int error;
- if (errc != CLE_OK) {
- applog(LOG_ERR, "CLD close(%s) failed: %d", sp->yfname, errc);
- return 0;
+ if (asprintf(&mem, "/chunk-%s/%s", sp->thisgroup, np->name) == -1) {
+ applog(LOG_WARNING, "OOM in cldu");
+ goto err_mem;
}
+ yfname = mem;
- free(sp->yfname);
- sp->yfname = NULL;
-
- np = list_entry(sp->chunks.next, struct chunk_node, link);
- list_del(&np->link);
-
- if (!list_empty(&sp->chunks)) {
- next_chunk(sp);
- return 0;
+ yfh = ncld_open(sp->nsp, yfname, COM_READ, &error,
+ 0 /* CE_MASTER_FAILOVER | CE_SESS_FAILED */,
+ NULL, NULL);
+ if (!yfh) {
+ applog(LOG_ERR, "CLD open(%s) failed: %d", yfname, error);
+ goto err_open;
}
/*
- * No more chunks to consider in this cycle, we're all done.
- * Now, poke the dispatch about the possible changes in the
- * configuration of Chunk.
- *
- * It's possible that the CLD directories are full of all garbage,
- * but no useable Chunk servers. In that case, treat everything
- * like a usual retry.
- *
- * For the case of normal operation, we also set up a rescan, for now.
- * In the future, we'll subscribe for change notification. FIXME.
+ * Read the Chunk's parameter file.
*/
- if (stor_update_cb()) {
- evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
- } else {
- if (evtimer_add(&sp->tm_retry, &cldu_retry_delay) != 0) {
- applog(LOG_ERR, "evtimer_add error %s",
- strerror(errno));
- }
- }
- return 0;
+ nrp = ncld_get(yfh, &error);
+ if (!nrp) {
+ applog(LOG_ERR, "CLD get(%s) failed: %d", yfname, error);
+ goto err_get;
+ }
+ stor_parse(yfname, nrp->ptr, nrp->length);
+ ncld_read_free(nrp);
+ ncld_close(yfh);
+ free(yfname);
+ return;
+
+err_get:
+ ncld_close(yfh);
+err_open:
+ free(yfname);
+err_mem:
+ return;
}
/*
@@ -937,7 +572,7 @@ static struct cld_session ses;
*/
void cld_init()
{
- cldc_init();
+ ncld_init();
// memset(&ses, 0, sizeof(struct cld_session));
INIT_LIST_HEAD(&ses.chunks);
@@ -949,10 +584,10 @@ void cld_init()
int cld_begin(const char *thishost, const char *thisgroup)
{
static struct cld_session *sp = &ses;
+ struct timespec tm;
+ int retry_cnt;
- evtimer_set(&ses.tm_retry, cldu_tm_retry, &ses);
evtimer_set(&ses.tm_rescan, cldu_tm_rescan, &ses);
- evtimer_set(&ses.tm_reopen, cldu_tm_reopen, &ses);
if (cldu_setgroup(sp, thisgroup, thishost)) {
/* Already logged error */
@@ -999,8 +634,21 @@ int cld_begin(const char *thishost, const char *thisgroup)
goto err_net;
}
+ retry_cnt = 0;
+ for (;;) {
+ if (!scan_chunks(sp))
+ break;
+ if (++retry_cnt == 5)
+ goto err_scan;
+ tm.tv_sec = 5;
+ tm.tv_nsec = 0;
+ nanosleep(&tm, NULL);
+ }
+
+ evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
return 0;
+err_scan:
err_net:
err_addr:
err_group:
@@ -1034,12 +682,9 @@ void cld_end(void)
static struct cld_session *sp = &ses;
int i;
- if (sp->lib) {
- event_del(&sp->ev);
- // if (sp->sess_open) /* kill it always, include half-open */
- cldc_kill_sess(sp->lib->sess);
- cldc_udp_free(sp->lib);
- sp->lib = NULL;
+ if (sp->nsp) {
+ ncld_sess_close(sp->nsp);
+ sp->nsp = NULL;
}
if (!sp->forced_hosts) {
@@ -1051,9 +696,7 @@ void cld_end(void)
}
}
- evtimer_del(&sp->tm_retry);
evtimer_del(&sp->tm_rescan);
- evtimer_del(&sp->tm_reopen);
free(sp->cfname);
sp->cfname = NULL;
@@ -1061,8 +704,6 @@ void cld_end(void)
sp->ffname = NULL;
free(sp->xfname);
sp->xfname = NULL;
- free(sp->yfname);
- sp->yfname = NULL;
free(sp->thisgroup);
sp->thisgroup = NULL;
free(sp->thishost);
next reply other threads:[~2010-02-09 4:42 UTC|newest]
Thread overview: 3+ messages / expand[flat|nested] mbox.gz Atom feed top
2010-02-09 4:42 Pete Zaitcev [this message]
2010-02-09 5:06 ` [Patch 1/1] tabled: switch to ncld Jeff Garzik
2010-02-14 2:44 ` Jeff Garzik
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20100208214214.16f39ec8@redhat.com \
--to=zaitcev@redhat.com \
--cc=hail-devel@vger.kernel.org \
--cc=jeff@garzik.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).