#include "rbtdb.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#ifdef HAVE_RUBY_ST_H
#  include <ruby/st.h>
#else
#  include <st.h>
#endif
#include <pthread.h>

/* this protects the global list of tdb objects maintained by libtdb */
static pthread_mutex_t big_lock = PTHREAD_MUTEX_INITIALIZER;
static VALUE cTDB, cERR;

/* maps enum TDB_ERROR codes to their TDB::ERR::* exception classes */
static st_table *exc_hash;

/* TDB::HASHES: hash function name (Symbol) => function address (Integer) */
static VALUE hashes;

/*
 * Points +data+ at the bytes of the String +val+.
 *
 * This must be a macro, not a function: StringValue() replaces the
 * caller's +val+ variable with the converted String, which keeps the
 * converted object referenced from the caller's stack so GC cannot free
 * the buffer +data+ points into.
 */
#define TO_TDB_DATA(data,val) do { \
	StringValue(val); \
	(data).dptr = (unsigned char *)RSTRING_PTR(val); \
	(data).dsize = RSTRING_LEN(val); \
} while (0)

/* defines TDB::ERR::<name> and registers it as the class raised for ecode */
static void init_exc(enum TDB_ERROR ecode, const char *name)
{
	VALUE exc = rb_define_class_under(cERR, name, cERR);

	st_insert(exc_hash, (st_data_t)ecode, (st_data_t)exc);
}

/* defines TDB::ERR and one subclass for each known tdb error code */
static void init_errors(void)
{
	cERR = rb_define_class_under(cTDB, "ERR", rb_eStandardError);
	exc_hash = st_init_numtable();

	init_exc(TDB_ERR_CORRUPT, "CORRUPT");
	init_exc(TDB_ERR_IO, "IO");
	init_exc(TDB_ERR_LOCK, "LOCK");
	init_exc(TDB_ERR_OOM, "OOM");
	init_exc(TDB_ERR_EXISTS, "EXISTS");
	init_exc(TDB_ERR_NOLOCK, "NOLOCK");
	init_exc(TDB_ERR_LOCK_TIMEOUT, "LOCK_TIMEOUT");
	init_exc(TDB_ERR_EINVAL, "EINVAL");
	init_exc(TDB_ERR_NOEXIST, "NOEXIST");
	init_exc(TDB_ERR_RDONLY, "RDONLY");
#ifdef HAVE_CONST_TDB_ERR_NESTING
	init_exc(TDB_ERR_NESTING, "NESTING");
#endif /* HAVE_CONST_TDB_ERR_NESTING */
}

/*
 * Raises the TDB::ERR subclass registered for the last error recorded on
 * +tdb+, using tdb_errorstr() as the message.
 *
 * Error codes with no registered subclass raise TDB::ERR itself instead of
 * reading an uninitialized class. Examples are codes from a newer libtdb,
 * or TDB_ERR_NESTING when HAVE_CONST_TDB_ERR_NESTING is not defined.
 *
 * Calling this while no error is recorded is an internal bug.
 */
static void my_raise(struct tdb_context *tdb)
{
	enum TDB_ERROR ecode = tdb_error(tdb);
	const char *str = tdb_errorstr(tdb);
	st_data_t found;
	VALUE exc = cERR;

	if (ecode == TDB_SUCCESS)
		rb_bug("attempted to raise with no error");
	if (st_lookup(exc_hash, (st_data_t)ecode, &found))
		exc = (VALUE)found;

	/* never use a non-literal format string */
	rb_raise(exc, "%s", str);
}

/*
 * Fills TDB::HASHES with the hash functions selectable via the :hash
 * option of TDB.new. The rbtdb_* functions are declared outside this
 * file (presumably in rbtdb.h).
 */
static void init_hashes(void)
{
#define HF(x) \
	rb_hash_aset(hashes,ID2SYM(rb_intern(#x)),ULONG2NUM((unsigned long)rbtdb_##x))
	HF(murmur1);
	HF(murmur1_aligned);
	HF(murmur2);
	HF(murmur2a);
	HF(murmur2_neutral);
	HF(murmur2_aligned);
	HF(fnv1a);
	HF(djb2);
	HF(djb3);
	HF(jenkins_lookup3);
	HF(default);
#undef HF
}

#ifndef HAVE_RB_THREAD_BLOCKING_REGION
/* (very) partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */
#  include <rubysig.h>
typedef VALUE rb_blocking_function_t(void *);
static VALUE my_tbr(rb_blocking_function_t *fn, void *data)
{
	VALUE rv;

	TRAP_BEG;
	rv = fn(data);
	TRAP_END;

	return rv;
}
#else
/* runs fn(data) with the GVL released so other Ruby threads may run */
static VALUE my_tbr(rb_blocking_function_t *fn, void *data)
{
	return rb_thread_blocking_region(fn, data, RUBY_UBF_IO, 0);
}
#endif /* HAVE_RB_THREAD_BLOCKING_REGION */

/* GC free function: closes a still-open tdb under big_lock */
static void gcfree(void *ptr)
{
	struct tdb_context *tdb = ptr;

	/* no error checking in GC :< */
	if (tdb) {
		(void)pthread_mutex_lock(&big_lock);
		(void)tdb_close(tdb);
		(void)pthread_mutex_unlock(&big_lock);
	}
}

/* allocates an unopened TDB object; #initialize stores the context later */
static VALUE alloc(VALUE klass)
{
	return Data_Wrap_Struct(klass, NULL, gcfree, NULL);
}

/*
 * Returns the tdb_context wrapped by +self+ (NULL if closed or never
 * opened). With +check_opened+, raises IOError instead of returning NULL.
 */
static struct tdb_context *db(VALUE self, int check_opened)
{
	struct tdb_context *tdb;

	Data_Get_Struct(self, struct tdb_context, tdb);

	if (!tdb && check_opened)
		rb_raise(rb_eIOError, "closed database");

	return tdb;
}

/* arguments for tdb_open_ex(), passed through my_tbr() */
struct open_args {
	const char *name;
	int hash_size;
	int tdb_flags;
	int open_flags;
	mode_t mode;
	struct tdb_logging_context *log_ctx;
	tdb_hash_func hash_fn;
	int err; /* errno saved right after a failed tdb_open_ex() */
};

/*
 * Calls tdb_open_ex() under big_lock (runs without the GVL).
 * Returns the new context cast to VALUE, or 0 on failure with o->err set.
 */
static VALUE nogvl_open(void *ptr)
{
	struct open_args *o = ptr;
	struct tdb_context *tdb;

	pthread_mutex_lock(&big_lock);
	tdb = tdb_open_ex(o->name, o->hash_size, o->tdb_flags,
	                  o->open_flags, o->mode, o->log_ctx, o->hash_fn);
	/* capture errno before anything else can clobber it */
	o->err = tdb ? 0 : errno;
	pthread_mutex_unlock(&big_lock);

	return (VALUE)tdb;
}

/*
 * Fills +o+ with tdb_open_ex() defaults, then applies overrides from the
 * +opts+ Hash (:hash_size, :mode, :open_flags, :tdb_flags, :hash).
 *
 * A true :threadsafe option calls self.threadsafe! before the database is
 * opened. Raises TypeError if +opts+ is not a Hash, and ArgumentError for a
 * :hash name missing from TDB::HASHES.
 */
static void set_args(VALUE self, struct open_args *o, VALUE opts)
{
	VALUE tmp;

	o->name = NULL;
	o->hash_size = 0; /* default */
	o->tdb_flags = TDB_DEFAULT;
	o->open_flags = O_RDWR | O_CREAT;
	o->mode = 0666;
	o->log_ctx = NULL;
	o->hash_fn = NULL;
	o->err = 0;

	if (NIL_P(opts))
		return;
	Check_Type(opts, T_HASH);

	tmp = rb_hash_aref(opts, ID2SYM(rb_intern("hash_size")));
	if (!NIL_P(tmp))
		o->hash_size = NUM2INT(tmp);

	tmp = rb_hash_aref(opts, ID2SYM(rb_intern("mode")));
	if (!NIL_P(tmp))
		o->mode = NUM2UINT(tmp);

	tmp = rb_hash_aref(opts, ID2SYM(rb_intern("open_flags")));
	if (!NIL_P(tmp))
		o->open_flags = NUM2INT(tmp);

	tmp = rb_hash_aref(opts, ID2SYM(rb_intern("tdb_flags")));
	if (!NIL_P(tmp))
		o->tdb_flags = NUM2INT(tmp);

	tmp = rb_hash_aref(opts, ID2SYM(rb_intern("hash")));
	if (!NIL_P(tmp)) {
		VALUE num = rb_hash_aref(hashes, tmp);

		if (NIL_P(num)) {
			tmp = rb_inspect(tmp);
			rb_raise(rb_eArgError,
			         "`%s' is not a valid hash function",
			         StringValuePtr(tmp));
		}
		/* TDB::HASHES values are function addresses (see init_hashes) */
		o->hash_fn = (tdb_hash_func)NUM2ULONG(num);
	}

	tmp = rb_hash_aref(opts, ID2SYM(rb_intern("threadsafe")));
	if (RTEST(tmp))
		rb_funcall(self, rb_intern("threadsafe!"), 0);
}

/*
 * :call-seq:
 *
 *	TDB.new("/path/to/file") -> TDB
 *	TDB.new("/path/to/file", :hash_size => 666) -> TDB
 *	TDB.new("/path/to/file", :hash => :murmur2) -> TDB
 *	TDB.new("/path/to/file", :open_flags => IO::RDONLY) -> TDB
 *	TDB.new("/path/to/file", :tdb_flags => TDB::NOSYNC) -> TDB
 *	TDB.new(nil) -> TDB
 *
 * Initializes a TDB context. It takes several options.
 *
 * :hash_size - the number of buckets. This is the most important tuning
 * parameter when creating large databases. It only affects the
 * creation of new databases.
 *
 * :open_flags - a bit mask of IO flags passed directly to open(2).
 * File.open-compatible flags are accepted.
 *
 * :hash - any of the hashes described in Hash_Functions.
 * This must remain the same for all clients.
 *
 * :tdb_flags - a bitmask of any combination of TDB::CLEAR_IF_FIRST,
 * TDB::INTERNAL, TDB::NOLOCK, TDB::NOMMAP, TDB::CONVERT,
 * TDB::BIGENDIAN, TDB::NOSYNC, TDB::SEQNUM, TDB::VOLATILE,
 * TDB::ALLOW_NESTING, TDB::DISALLOW_NESTING, TDB::INCOMPATIBLE_HASH
 *
 * :mode - octal mode mask passed to open(2)
 *
 * :threadsafe - if true, calls #threadsafe! before opening
 *
 * A +nil+ path adds TDB::INTERNAL to the flags.
 * A RuntimeError is raised if the object was already initialized.
 * A SystemCallError is raised if the database cannot be opened.
 */
static VALUE init(int argc, VALUE *argv, VALUE self)
{
	struct tdb_context *tdb = db(self, 0);
	VALUE path, opts;
	struct open_args o;

	if (tdb)
		rb_raise(rb_eRuntimeError, "TDB already initialized");
	rb_scan_args(argc, argv, "11", &path, &opts);
	set_args(self, &o, opts);

	if (NIL_P(path))
		o.tdb_flags |= TDB_INTERNAL;
	else
		o.name = StringValuePtr(path);

	tdb = (struct tdb_context *)my_tbr(nogvl_open, &o);
	if (!tdb) {
		switch (o.err) {
		case ENOMEM:
		case EMFILE:
		case ENFILE:
			/* unreachable objects may be holding memory or descriptors */
			rb_gc();
			tdb = (struct tdb_context *)my_tbr(nogvl_open, &o);
			break;
		default:
			break;
		}
		if (!tdb) {
			/* rb_gc() and GVL reacquisition may have clobbered errno */
			errno = o.err;
			rb_sys_fail("tdb_open_ex");
		}
	}
	DATA_PTR(self) = tdb;

	return self;
}

/* tdb_close can do a lot, including cancel transactions and munmap */
static VALUE nogvl_close(void *ptr)
{
	struct tdb_context *tdb = ptr;
	VALUE rv;

	pthread_mutex_lock(&big_lock);
	rv = (VALUE)tdb_close(tdb);
	pthread_mutex_unlock(&big_lock);

	return rv;
}

/*
 * :call-seq:
 *
 *	tdb.close -> nil
 *
 * Closes the database. Raises IOError if it is already closed and
 * SystemCallError if tdb_close fails.
 */
static VALUE tdbclose(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	/* detach first so gcfree never sees a context we are closing */
	DATA_PTR(self) = NULL;

	if ((int)my_tbr(nogvl_close, tdb) == -1)
		rb_sys_fail("tdb_close");

	return Qnil;
}

/*
 * :call-seq:
 *
 *	tdb.closed? -> true or false
 *
 * Returns +true+ if the database is closed (or was never opened).
 */
static VALUE closed(VALUE self)
{
	struct tdb_context *tdb = db(self, 0);

	return tdb ? Qfalse : Qtrue;
}

#ifdef HAVE_RB_THREAD_CALL_WITH_GVL
/* missing prototype in ruby.h: */
void *rb_thread_call_with_gvl(void *(*func)(void *), void *data);
#else
/* no GVL is released without rb_thread_call_with_gvl, so call directly */
static void *
my_rb_thread_call_with_gvl(void *(*func)(void *), void *data)
{
	return (*func)(data);
}
#define rb_thread_call_with_gvl my_rb_thread_call_with_gvl
#endif /* !HAVE_RB_THREAD_CALL_WITH_GVL */

/*
 * We avoid the extra malloc/free pair enforced by tdb_fetch. We
 * use tdb_parse_record to give us pointers to (hopefully) mmap-ed
 * regions and create a String object directly off that region.
 */
struct fetch_parse_args {
	struct tdb_context *tdb;
	union {
		TDB_DATA key; /* input: the key to look up */
		TDB_DATA val; /* reused by the parser callback for the value */
	} as;
	VALUE value; /* resulting String (or nil) */
	int state; /* rb_protect state if creating the String raised */
};

/* copies the bytes described by +val+ into a new Ruby String */
static VALUE str_new_tdb_data(TDB_DATA *val)
{
	return rb_str_new((const char *)val->dptr, val->dsize);
}

/* rb_protect body: builds the fetched String */
static VALUE protected_str_new(VALUE ptr)
{
	struct fetch_parse_args *f = (struct fetch_parse_args *)ptr;

	return str_new_tdb_data(&f->as.val);
}

/*
 * Runs with the GVL held, called from inside tdb_parse_record. Any
 * exception (e.g. NoMemoryError) is captured in f->state rather than
 * being allowed to longjmp over libtdb while it holds locks.
 */
static void *gvl_str_new(void *data)
{
	struct fetch_parse_args *f = data;

	f->value = rb_protect(protected_str_new, (VALUE)f, &f->state);

	return NULL;
}

/* tdb_parse_record callback: reacquires the GVL to build the String */
static int fetch_parse(TDB_DATA key, TDB_DATA val, void *data)
{
	struct fetch_parse_args *f = data;

	f->as.val = val;
	(void)rb_thread_call_with_gvl(gvl_str_new, data);

	return 0;
}

/* runs without the GVL; returns the String or nil if tdb returns -1 */
static VALUE nogvl_parse_record(void *ptr)
{
	struct fetch_parse_args *f = ptr;

	if (tdb_parse_record(f->tdb, f->as.key, fetch_parse, ptr) == -1)
		return Qnil;

	return f->value;
}

/*
 * :call-seq:
 *
 *	tdb.fetch(key) -> String or nil
 *	tdb[key] -> String or nil
 *
 * Returns the value stored at +key+, or +nil+ if no record was returned.
 */
static VALUE fetch(VALUE self, VALUE key)
{
	struct fetch_parse_args f;
	VALUE rv;

	f.tdb = db(self, 1);
	TO_TDB_DATA(f.as.key, key);
	f.value = Qnil;
	f.state = 0;

	rv = my_tbr(nogvl_parse_record, &f);
	if (f.state) /* re-raise now that libtdb has released its locks */
		rb_jump_tag(f.state);

	return rv;
}

/* arguments for tdb_store(), passed through my_tbr() */
struct store_args {
	struct tdb_context *tdb;
	TDB_DATA key;
	TDB_DATA val;
	int flag;
};

/* runs without the GVL; returns tdb_store()'s int result cast to VALUE */
static VALUE nogvl_store(void *ptr)
{
	struct store_args *s = ptr;

	return (VALUE)tdb_store(s->tdb, s->key, s->val, s->flag);
}

/*
 * Shared implementation of store/insert/modify. Returns +val+ on success.
 * With +soft+, an existing key on TDB_INSERT or a missing key on
 * TDB_MODIFY returns nil instead of raising. Other failures raise a
 * TDB::ERR subclass.
 */
static VALUE rbtdb_store(VALUE self, VALUE key, VALUE val, int flag, int soft)
{
	struct store_args s;

	s.tdb = db(self, 1);
	TO_TDB_DATA(s.key, key);
	TO_TDB_DATA(s.val, val);
	s.flag = flag;

	if ((int)my_tbr(nogvl_store, &s) == -1) {
		if (soft) {
			int ecode = tdb_error(s.tdb);

			if ((flag == TDB_INSERT) && (ecode == TDB_ERR_EXISTS))
				return Qnil;
			if ((flag == TDB_MODIFY) && (ecode == TDB_ERR_NOEXIST))
				return Qnil;
		}
		my_raise(s.tdb);
	}

	return val;
}

/*
 * :call-seq:
 *
 *	tdb.store(key, val) -> val
 *	tdb[key] = val
 *
 * Stores +val+ at +key+, replacing any existing value.
 */
static VALUE store(VALUE self, VALUE key, VALUE val)
{
	return rbtdb_store(self, key, val, 0, 0);
}

/*
 * :call-seq:
 *
 *	tdb.insert!(key, val) -> val
 *
 * Stores +val+ only if +key+ is absent; raises TDB::ERR::EXISTS otherwise.
 */
static VALUE insert_bang(VALUE self, VALUE key, VALUE val)
{
	return rbtdb_store(self, key, val, TDB_INSERT, 0);
}

/*
 * :call-seq:
 *
 *	tdb.insert(key, val) -> val or nil
 *
 * Stores +val+ only if +key+ is absent; returns +nil+ if it exists.
 */
static VALUE insert(VALUE self, VALUE key, VALUE val)
{
	return rbtdb_store(self, key, val, TDB_INSERT, 1);
}

/*
 * :call-seq:
 *
 *	tdb.modify!(key, val) -> val
 *
 * Stores +val+ only if +key+ exists; raises TDB::ERR::NOEXIST otherwise.
 */
static VALUE modify_bang(VALUE self, VALUE key, VALUE val)
{
	return rbtdb_store(self, key, val, TDB_MODIFY, 0);
}

/*
 * :call-seq:
 *
 *	tdb.modify(key, val) -> val or nil
 *
 * Stores +val+ only if +key+ exists; returns +nil+ if it does not.
 */
static VALUE modify(VALUE self, VALUE key, VALUE val)
{
	return rbtdb_store(self, key, val, TDB_MODIFY, 1);
}

/* arguments for tdb_exists(), passed through my_tbr() */
struct exists_args {
	struct tdb_context *tdb;
	TDB_DATA key;
};

static VALUE nogvl_exists(void *ptr)
{
	struct exists_args *e = ptr;

	return tdb_exists(e->tdb, e->key) == 0 ? Qfalse : Qtrue;
}

/*
 * :call-seq:
 *
 *	tdb.key?(key) -> true or false
 *
 * Returns +true+ if a record exists for +key+.
 */
static VALUE has_key(VALUE self, VALUE key)
{
	struct exists_args e;

	e.tdb = db(self, 1);
	TO_TDB_DATA(e.key, key);

	return my_tbr(nogvl_exists, &e);
}

/* state shared between #each and the tdb_traverse callback */
struct traverse_args {
	struct tdb_context *tdb;
	TDB_DATA key;
	TDB_DATA val;
	int state; /* nonzero: the block raised/broke; stops traversal */
};

/*
 * rb_protect body: builds the key/value Strings and yields them. String
 * creation happens here so an allocation failure is also captured.
 */
static VALUE protected_yield(VALUE ptr)
{
	struct traverse_args *t = (struct traverse_args *)ptr;
	VALUE k = str_new_tdb_data(&t->key);
	VALUE v = str_new_tdb_data(&t->val);

	return rb_yield_values(2, k, v);
}

/* runs with the GVL held, called from inside tdb_traverse */
static void *my_yield(void *data)
{
	struct traverse_args *t = data;

	rb_protect(protected_yield, (VALUE)t, &t->state);

	return NULL;
}

/* tdb_traverse callback: a nonzero return stops the traversal */
static int
traverse_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA val, void *data)
{
	struct traverse_args *t = data;

	t->key = key;
	t->val = val;
	(void)rb_thread_call_with_gvl(my_yield, t);

	return t->state;
}

static VALUE nogvl_traverse(void *ptr)
{
	struct traverse_args *t = ptr;

	(void)tdb_traverse(t->tdb, traverse_fn, t);

	return Qfalse;
}

/*
 * :call-seq:
 *
 *	tdb.each { |key, value| ... } -> tdb
 *
 * Yields each key and value as Strings. An exception or non-local exit
 * from the block stops the traversal and is re-raised afterwards.
 */
static VALUE each(VALUE self)
{
	struct traverse_args t;

	t.tdb = db(self, 1);
	t.state = 0;

	my_tbr(nogvl_traverse, &t);
	if (t.state)
		rb_jump_tag(t.state);

	return self;
}

/* arguments for tdb_delete(), passed through my_tbr() */
struct delete_args {
	struct tdb_context *tdb;
	TDB_DATA key;
};

static VALUE nogvl_delete(void *ptr)
{
	struct delete_args *d = ptr;

	return tdb_delete(d->tdb, d->key) == 0 ? Qtrue : Qfalse;
}

/*
 * :call-seq:
 *
 *	tdb.nuke!(key) -> true or false
 *
 * Deletes the record at +key+. Returns +true+ if tdb_delete succeeded.
 */
static VALUE nuke(VALUE self, VALUE key)
{
	struct delete_args d;

	d.tdb = db(self, 1);
	TO_TDB_DATA(d.key, key);

	return my_tbr(nogvl_delete, &d);
}

/*
 * :call-seq:
 *
 *	tdb.delete(key) -> String or nil
 *
 * Deletes the record at +key+ and returns its former value, or +nil+ if
 * no value was fetched or the delete failed.
 */
static VALUE delete(VALUE self, VALUE key)
{
	VALUE rc = fetch(self, key);

	if (!NIL_P(rc) && nuke(self, key) == Qfalse)
		return Qnil;

	return rc;
}

/* signature shared by tdb_lockall(), tdb_wipe_all(), tdb_repack(), ... */
typedef int (*tdb_ctx_fn)(struct tdb_context *);

/* arguments for nogvl_ctx_fn(), passed through my_tbr() */
struct ctx_fn_args {
	tdb_ctx_fn fn;
	struct tdb_context *tdb;
	int rc;
};

static VALUE nogvl_ctx_fn(void *ptr)
{
	struct ctx_fn_args *a = ptr;

	a->rc = a->fn(a->tdb);

	return Qnil;
}

/*
 * Calls fn(tdb) without the GVL and returns its int result (0 on
 * success). This avoids calling libtdb through an incompatible
 * function-pointer type.
 */
static int ctx_fn_call(tdb_ctx_fn fn, struct tdb_context *tdb)
{
	struct ctx_fn_args a;

	a.fn = fn;
	a.tdb = tdb;
	a.rc = -1;
	(void)my_tbr(nogvl_ctx_fn, &a);

	return a.rc;
}

/*
 * :call-seq:
 *
 *	tdb.lockall -> true
 *
 * Calls tdb_lockall(). Raises a TDB::ERR subclass on failure.
 */
static VALUE lockall(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	if (ctx_fn_call(tdb_lockall, tdb))
		my_raise(tdb);

	return Qtrue;
}

/*
 * :call-seq:
 *
 *	tdb.trylockall -> true or false
 *
 * Calls tdb_lockall_nonblock(). Returns +false+ if tdb reports a lock
 * error (TDB::ERR::LOCK); other failures are raised.
 */
static VALUE trylockall(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	if (ctx_fn_call(tdb_lockall_nonblock, tdb)) {
		if (tdb_error(tdb) == TDB_ERR_LOCK)
			return Qfalse;
		my_raise(tdb);
	}

	return Qtrue;
}

/*
 * :call-seq:
 *
 *	tdb.unlockall -> true
 *
 * Calls tdb_unlockall(). Raises a TDB::ERR subclass on failure.
 */
static VALUE unlockall(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	if (ctx_fn_call(tdb_unlockall, tdb))
		my_raise(tdb);

	return Qtrue;
}

/*
 * :call-seq:
 *
 *	tdb.lockall_read -> true
 *
 * Calls tdb_lockall_read(). Raises a TDB::ERR subclass on failure.
 */
static VALUE lockall_read(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	if (ctx_fn_call(tdb_lockall_read, tdb))
		my_raise(tdb);

	return Qtrue;
}

/*
 * :call-seq:
 *
 *	tdb.trylockall_read -> true or false
 *
 * Calls tdb_lockall_read_nonblock(). Returns +false+ if tdb reports a
 * lock error (TDB::ERR::LOCK); other failures are raised.
 */
static VALUE trylockall_read(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	if (ctx_fn_call(tdb_lockall_read_nonblock, tdb)) {
		if (tdb_error(tdb) == TDB_ERR_LOCK)
			return Qfalse;
		my_raise(tdb);
	}

	return Qtrue;
}

/*
 * :call-seq:
 *
 *	tdb.unlockall_read -> true
 *
 * Calls tdb_unlockall_read(). Raises a TDB::ERR subclass on failure.
 */
static VALUE unlockall_read(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	if (ctx_fn_call(tdb_unlockall_read, tdb))
		my_raise(tdb);

	return Qtrue;
}

/*
 * :call-seq:
 *
 *	tdb.lockall_mark -> true
 *
 * Calls tdb_lockall_mark(). Raises a TDB::ERR subclass on failure.
 */
static VALUE lockall_mark(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	if (ctx_fn_call(tdb_lockall_mark, tdb))
		my_raise(tdb);

	return Qtrue;
}

/*
 * :call-seq:
 *
 *	tdb.lockall_unmark -> true
 *
 * Calls tdb_lockall_unmark(). Raises a TDB::ERR subclass on failure.
 */
static VALUE lockall_unmark(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	if (ctx_fn_call(tdb_lockall_unmark, tdb))
		my_raise(tdb);

	return Qtrue;
}

/*
 * clears out the database (tdb_wipe_all); returns self
 */
static VALUE clear(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	if (ctx_fn_call(tdb_wipe_all, tdb))
		my_raise(tdb);

	return self;
}

#ifdef HAVE_TDB_REPACK
/* repacks a database to reduce fragmentation, available with tdb 1.2.x+ */
static VALUE repack(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	if (ctx_fn_call(tdb_repack, tdb))
		my_raise(tdb);

	return self;
}
#endif /* HAVE_TDB_REPACK */

void Init_tdb_ext(void)
{
	cTDB = rb_define_class("TDB", rb_cObject);

	hashes = rb_hash_new();

	/*
	 * Available hash functions. The key is the name of the hash and
	 * the value is the function's address for internal usage; it is
	 * cast back to a function pointer when opening a database, so
	 * do not modify it.
	 */
	rb_define_const(cTDB, "HASHES", hashes);

	rb_define_alloc_func(cTDB, alloc);
	rb_include_module(cTDB, rb_mEnumerable);

	rb_define_method(cTDB, "initialize", init, -1);
	rb_define_method(cTDB, "close", tdbclose, 0);
	rb_define_method(cTDB, "closed?", closed, 0);

	rb_define_method(cTDB, "fetch", fetch, 1);
	rb_define_method(cTDB, "[]", fetch, 1);
	rb_define_method(cTDB, "store", store, 2);
	rb_define_method(cTDB, "[]=", store, 2);
	rb_define_method(cTDB, "insert!", insert_bang, 2);
	rb_define_method(cTDB, "modify!", modify_bang, 2);
	rb_define_method(cTDB, "insert", insert, 2);
	rb_define_method(cTDB, "modify", modify, 2);

	rb_define_method(cTDB, "key?", has_key, 1);
	rb_define_method(cTDB, "has_key?", has_key, 1);
	rb_define_method(cTDB, "include?", has_key, 1);
	rb_define_method(cTDB, "member?", has_key, 1);
	rb_define_method(cTDB, "each", each, 0);
	rb_define_method(cTDB, "nuke!", nuke, 1);
	rb_define_method(cTDB, "delete", delete, 1);

	rb_define_method(cTDB, "lockall", lockall, 0);
	rb_define_method(cTDB, "trylockall", trylockall, 0);
	rb_define_method(cTDB, "unlockall", unlockall, 0);
	rb_define_method(cTDB, "lockall_read", lockall_read, 0);
	rb_define_method(cTDB, "trylockall_read", trylockall_read, 0);
	rb_define_method(cTDB, "unlockall_read", unlockall_read, 0);
	rb_define_method(cTDB, "lockall_mark", lockall_mark, 0);
	rb_define_method(cTDB, "lockall_unmark", lockall_unmark, 0);
	rb_define_method(cTDB, "clear", clear, 0);
#ifdef HAVE_TDB_REPACK
	rb_define_method(cTDB, "repack", repack, 0);
#endif /* HAVE_TDB_REPACK */

	init_errors();
	init_hashes();

	/* just a readability place holder */
	rb_define_const(cTDB, "DEFAULT", UINT2NUM(TDB_DEFAULT));

	/* clear database if we are the only one with it open */
	rb_define_const(cTDB, "CLEAR_IF_FIRST", UINT2NUM(TDB_CLEAR_IF_FIRST));

	/* don't store on disk, use in-memory database */
	rb_define_const(cTDB, "INTERNAL", UINT2NUM(TDB_INTERNAL));

	/* don't do any locking */
	rb_define_const(cTDB, "NOLOCK", UINT2NUM(TDB_NOLOCK));

	/* don't use mmap */
	rb_define_const(cTDB, "NOMMAP", UINT2NUM(TDB_NOMMAP));

	/* convert endian (internal use) */
	rb_define_const(cTDB, "CONVERT", UINT2NUM(TDB_CONVERT));

	/* header is big-endian (internal use) */
	rb_define_const(cTDB, "BIGENDIAN", UINT2NUM(TDB_BIGENDIAN));

	/* don't use synchronous transactions */
	rb_define_const(cTDB, "NOSYNC", UINT2NUM(TDB_NOSYNC));

	/* maintain a sequence number */
	rb_define_const(cTDB, "SEQNUM", UINT2NUM(TDB_SEQNUM));

	/* Activate the per-hashchain freelist, default 5 */
	rb_define_const(cTDB, "VOLATILE", UINT2NUM(TDB_VOLATILE));

#ifdef TDB_ALLOW_NESTING
	/* Allow transactions to nest */
	rb_define_const(cTDB, "ALLOW_NESTING", UINT2NUM(TDB_ALLOW_NESTING));
#endif

#ifdef TDB_DISALLOW_NESTING
	/* Disallow transactions to nest */
	rb_define_const(cTDB, "DISALLOW_NESTING", UINT2NUM(TDB_DISALLOW_NESTING));
#endif

#ifdef TDB_INCOMPATIBLE_HASH
	/* Better hashing, but can't be opened by tdb < 1.2.6. */
	rb_define_const(cTDB, "INCOMPATIBLE_HASH", UINT2NUM(TDB_INCOMPATIBLE_HASH));
#endif
	rbtdb_init_tdb_hash_functions();
}

/*
 * Document-class: TDB
 *
 *	tdb = TDB.new("/path/to/file", :open_flags => IO::RDWR|IO::CREAT)
 *	tdb.store("HELLO", "world")
 *	tdb.fetch("HELLO") -> "world"
 *	tdb.delete("HELLO") -> "world"
 */