/*
 * Ruby bindings for TDB (trivial database).
 *
 * NOTE(review): the system header names below were reconstructed; the
 * copy under review had them stripped.  They must provide mode_t,
 * O_RDWR/O_CREAT, errno/E* constants and pthread mutexes -- confirm
 * against the original file.
 */
#include "rbtdb.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>

/* this protects the global list of tdb objects maintained by libtdb */
static pthread_mutex_t big_lock = PTHREAD_MUTEX_INITIALIZER;
static VALUE cTDB, cERR;
static VALUE exc_hash;
static VALUE hashes;

/* must be a macro to prevent GC from killing converted 'val's */
#define TO_TDB_DATA(data,val) do { \
	StringValue(val); \
	(data).dptr = (unsigned char *)RSTRING_PTR(val); \
	(data).dsize = RSTRING_LEN(val); \
} while (0)

/* defines TDB::ERR::<name> and records it in exc_hash under ecode */
static void init_exc(enum TDB_ERROR ecode, const char *name)
{
	VALUE exc = rb_define_class_under(cERR, name, cERR);

	rb_hash_aset(exc_hash, INT2NUM(ecode), exc);
}

/* creates TDB::ERR and one subclass per TDB_ERROR code */
static void init_errors(void)
{
	cERR = rb_define_class_under(cTDB, "ERR", rb_eStandardError);
	exc_hash = rb_hash_new();
	rb_global_variable(&exc_hash);

	init_exc(TDB_ERR_CORRUPT, "CORRUPT");
	init_exc(TDB_ERR_IO, "IO");
	init_exc(TDB_ERR_LOCK, "LOCK");
	init_exc(TDB_ERR_OOM, "OOM");
	init_exc(TDB_ERR_EXISTS, "EXISTS");
	init_exc(TDB_ERR_NOLOCK, "NOLOCK");
	init_exc(TDB_ERR_LOCK_TIMEOUT, "LOCK_TIMEOUT");
	init_exc(TDB_ERR_EINVAL, "EINVAL");
	init_exc(TDB_ERR_NOEXIST, "NOEXIST");
	init_exc(TDB_ERR_RDONLY, "RDONLY");
#ifdef HAVE_CONST_TDB_ERR_NESTING
	init_exc(TDB_ERR_NESTING, "NESTING");
#endif /* HAVE_CONST_TDB_ERR_NESTING */
}

/*
 * Raises the TDB::ERR subclass matching the last error recorded on tdb.
 * Calling this with no pending error (TDB_SUCCESS) is an internal bug.
 */
static void my_raise(struct tdb_context *tdb)
{
	enum TDB_ERROR ecode = tdb_error(tdb);
	const char *str = tdb_errorstr(tdb);
	VALUE exc = Qnil;

	switch (ecode) {
	case TDB_SUCCESS:
		rb_bug("attempted to raise with no error");
	case TDB_ERR_CORRUPT:
	case TDB_ERR_IO:
	case TDB_ERR_LOCK:
	case TDB_ERR_OOM:
	case TDB_ERR_EXISTS:
	case TDB_ERR_NOLOCK:
	case TDB_ERR_LOCK_TIMEOUT:
	case TDB_ERR_EINVAL:
	case TDB_ERR_NOEXIST:
	case TDB_ERR_RDONLY:
#ifdef HAVE_CONST_TDB_ERR_NESTING
	case TDB_ERR_NESTING:
#endif /* HAVE_CONST_TDB_ERR_NESTING */
		exc = rb_hash_aref(exc_hash, INT2NUM(ecode));
	}
	if (NIL_P(exc))
		rb_bug("non-existent exception: %s\n", str);
	rb_raise(exc, "%s", str);
}

/* fills TDB::HASHES with name => function-pointer-as-integer entries */
static void init_hashes(void)
{
#define HF(x) \
rb_hash_aset(hashes,ID2SYM(rb_intern(#x)),ULONG2NUM((unsigned long)rbtdb_##x))
	HF(siphash24);
	HF(murmur1);
	HF(murmur1_aligned);
	HF(murmur2);
	HF(murmur2a);
	HF(murmur2_neutral);
	HF(murmur2_aligned);
	HF(murmur3a);
	HF(murmur3f);
	HF(fnv1a);
	HF(djb2);
	HF(djb3);
	HF(jenkins_lookup3);
	HF(default);
#undef HF
}

#if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) /* Ruby 2.1+ */
#  include <ruby/thread.h>
#  define WITHOUT_GVL(fn,a,ubf,b) \
	rb_thread_call_without_gvl((fn),(a),(ubf),(b))
#elif defined(HAVE_RB_THREAD_BLOCKING_REGION) /* Ruby 1.9-2.0 */
typedef VALUE (*my_blocking_fn_t)(void*);
#  define WITHOUT_GVL(fn,a,ubf,b) \
	rb_thread_blocking_region((my_blocking_fn_t)(fn),(a),(ubf),(b))
#else /* Ruby 1.8 */
/* partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */
#  include <rubysig.h>
#  define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
typedef void rb_unblock_function_t(void *);
typedef void * rb_blocking_function_t(void *);
static void * WITHOUT_GVL(rb_blocking_function_t *func, void *data1,
			rb_unblock_function_t *ubf, void *data2)
{
	void *rv;

	TRAP_BEG;
	rv = func(data1);
	TRAP_END;

	return rv;
}
#endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */

/* fn must have the exact type void *fn(void *) */
#define my_tbr(fn,data) WITHOUT_GVL((void *)(fn),(data),RUBY_UBF_IO,0)

/* GC free hook: closes a TDB the user never closed explicitly */
static void gcfree(void *ptr)
{
	struct tdb_context *tdb = ptr;

	/* no error checking in GC :< */
	if (tdb) {
		(void)pthread_mutex_lock(&big_lock);
		(void)tdb_close(tdb);
		(void)pthread_mutex_unlock(&big_lock);
	}
}

static VALUE alloc(VALUE klass)
{
	return Data_Wrap_Struct(klass, NULL, gcfree, NULL);
}

/*
 * Returns the tdb_context wrapped by self; raises IOError if it is
 * closed (or was never opened) and check_opened is non-zero.
 */
static struct tdb_context *db(VALUE self, int check_opened)
{
	struct tdb_context *tdb;

	Data_Get_Struct(self, struct tdb_context, tdb);

	if (!tdb && check_opened)
		rb_raise(rb_eIOError, "closed database");

	return tdb;
}

/* arguments for tdb_open_ex, filled in by set_args */
struct open_args {
	const char *name;
	int hash_size;
	int tdb_flags;
	int open_flags;
	mode_t mode;
	struct tdb_logging_context *log_ctx;
	tdb_hash_func hash_fn;
};

static void * nogvl_open(void *ptr)
{
	struct open_args *o = ptr;
	struct tdb_context *tdb;

	pthread_mutex_lock(&big_lock);
	tdb = tdb_open_ex(o->name, o->hash_size, o->tdb_flags,
	                  o->open_flags, o->mode, o->log_ctx, o->hash_fn);
	pthread_mutex_unlock(&big_lock);

	return (void *)tdb;
}

/* initializes *o with defaults, then applies any options given in opts */
static void set_args(VALUE self, struct open_args *o, VALUE opts)
{
	VALUE tmp;

	o->name = NULL;
	o->hash_size = 0; /* default */
	o->tdb_flags = TDB_DEFAULT;
	o->open_flags = O_RDWR | O_CREAT;
	o->mode = 0666;
	o->log_ctx = NULL;
	o->hash_fn = NULL;

	if (NIL_P(opts))
		return;
	Check_Type(opts, T_HASH);

	tmp = rb_hash_aref(opts, ID2SYM(rb_intern("hash_size")));
	if (!NIL_P(tmp))
		o->hash_size = NUM2INT(tmp);

	tmp = rb_hash_aref(opts, ID2SYM(rb_intern("mode")));
	if (!NIL_P(tmp))
		o->mode = NUM2UINT(tmp);

	tmp = rb_hash_aref(opts, ID2SYM(rb_intern("open_flags")));
	if (!NIL_P(tmp))
		o->open_flags = NUM2INT(tmp);

	tmp = rb_hash_aref(opts, ID2SYM(rb_intern("tdb_flags")));
	if (!NIL_P(tmp))
		o->tdb_flags = NUM2INT(tmp);

	tmp = rb_hash_aref(opts, ID2SYM(rb_intern("hash")));
	if (!NIL_P(tmp)) {
		VALUE num = rb_hash_aref(hashes, tmp);

		if (NIL_P(num)) {
			tmp = rb_inspect(tmp);
			rb_raise(rb_eArgError,
			         "`%s' is not a valid hash function",
			         StringValuePtr(tmp));
		}

		o->hash_fn = (tdb_hash_func)NUM2ULONG(num);
	}

	tmp = rb_hash_aref(opts, ID2SYM(rb_intern("threadsafe")));
	if (RTEST(tmp))
		rb_funcall(self, rb_intern("threadsafe!"), 0);
}

/*
 * :call-seq:
 *
 *	TDB.new("/path/to/file") -> TDB
 *	TDB.new("/path/to/file", :hash_size => 666) -> TDB
 *	TDB.new("/path/to/file", :hash => :murmur2) -> TDB
 *	TDB.new("/path/to/file", :open_flags => IO::RDONLY) -> TDB
 *	TDB.new("/path/to/file", :tdb_flags => TDB::NOSYNC) -> TDB
 *
 * Initializes a TDB context.  It takes several options.
 *
 * :hash_size - the number of buckets, this is the most important tuning
 * parameter when creating large databases.  This parameter only affects
 * the creation of new databases.
 *
 * :open_flags - a bit mask of IO flags passed directly to open(2),
 * File.open-compatible flags are accepted.
 *
 * :hash - any of the hashes described in Hash_Functions.
 * This must remain the same for all clients.
 *
 * :tdb_flags - a bitmask of any combination of TDB::CLEAR_IF_FIRST,
 * TDB::INTERNAL, TDB::NOLOCK, TDB::NOMMAP, TDB::CONVERT,
 * TDB::BIGENDIAN, TDB::NOSYNC, TDB::SEQNUM, TDB::VOLATILE,
 * TDB::ALLOW_NESTING, TDB::DISALLOW_NESTING, TDB::INCOMPATIBLE_HASH
 *
 * :mode - octal mode mask passed to open(2)
 */
static VALUE init(int argc, VALUE *argv, VALUE self)
{
	struct tdb_context *tdb = db(self, 0);
	VALUE path, opts;
	struct open_args o;

	if (tdb)
		rb_raise(rb_eRuntimeError, "TDB already initialized");
	rb_scan_args(argc, argv, "11", &path, &opts);
	set_args(self, &o, opts);

	/* a nil path means an in-memory database */
	if (NIL_P(path))
		o.tdb_flags |= TDB_INTERNAL;
	else
		o.name = StringValueCStr(path);

	tdb = (struct tdb_context *)my_tbr(nogvl_open, &o);
	if (!tdb) {
		/* resource exhaustion may be relieved by collecting garbage */
		switch (errno) {
		case ENOMEM:
		case EMFILE:
		case ENFILE:
			rb_gc();
			tdb = (struct tdb_context *)my_tbr(nogvl_open, &o);
		}
		if (!tdb)
			rb_sys_fail("tdb_open_ex");
	}
	DATA_PTR(self) = tdb;

	return self;
}

/* tdb_close can do a lot, including cancelling transactions and munmap */
static void * nogvl_close(void *ptr)
{
	struct tdb_context *tdb = ptr;
	long rv;

	pthread_mutex_lock(&big_lock);
	rv = tdb_close(tdb);
	pthread_mutex_unlock(&big_lock);

	return (void *)rv;
}

static VALUE tdbclose(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	/* detach first so GC never double-closes, even if close fails */
	DATA_PTR(self) = NULL;

	if ((long)my_tbr(nogvl_close, tdb) == -1)
		rb_sys_fail("tdb_close");

	return Qnil;
}

static VALUE closed(VALUE self)
{
	struct tdb_context *tdb = db(self, 0);

	return tdb ? Qfalse : Qtrue;
}

#ifdef HAVE_RB_THREAD_CALL_WITH_GVL
/* missing prototype in ruby.h: */
void *rb_thread_call_with_gvl(void *(*func)(void *), void *data);
#else
static void * my_rb_thread_call_with_gvl(void *(*func)(void *), void *data)
{
	return (*func)(data);
}
#define rb_thread_call_with_gvl my_rb_thread_call_with_gvl
#endif /* !HAVE_RB_THREAD_CALL_WITH_GVL */

/*
 * We avoid the extra malloc/free pair enforced by tdb_fetch.  We
 * use tdb_parse_record to give us pointers to (hopefully) mmap-ed
 * regions and create a String object directly off that region.
 *
 * The union is reused across phases: it holds the key before parsing,
 * then the value length/pointer during the copy, and finally a copy of
 * `value' as a "record was found" marker for nogvl_parse_record.
 */
struct fetch_parse_args {
	struct tdb_context *tdb;
	union {
		TDB_DATA key;
		long value_len;
		char *value_ptr;
		VALUE value;
	} as;
	VALUE value;
};

static VALUE str_new_tdb_data(TDB_DATA *val)
{
	return rb_str_new((const char *)val->dptr, val->dsize);
}

/* runs with the GVL: sizes the destination String for the record */
static void *gvl_str_resize(void *data)
{
	struct fetch_parse_args *f = data;

	rb_str_resize(f->value, f->as.value_len);
	f->as.value_ptr = RSTRING_PTR(f->value);

	return NULL;
}

/* tdb_parse_record callback: copies the record into f->value */
static int fetch_parse(TDB_DATA key, TDB_DATA val, void *data)
{
	struct fetch_parse_args *f = data;

	f->as.value_len = val.dsize;
	(void)rb_thread_call_with_gvl(gvl_str_resize, data);
	memcpy(f->as.value_ptr, val.dptr, val.dsize);
	f->as.value = f->value;

	return 0;
}

static void * nogvl_parse_record(void *ptr)
{
	struct fetch_parse_args *f = ptr;

	if (tdb_parse_record(f->tdb, f->as.key, fetch_parse, ptr) == -1)
		return (void *)Qnil;

	/* fetch_parse only sets as.value when it actually copied a record */
	return (void *)(f->value == f->as.value ? f->value : Qnil);
}

/*
 * fetch(key [, buffer]) -> String or nil
 *
 * Returns the value stored under key, or nil if absent.  When buffer
 * is given it is reused (and returned) instead of a new String.
 */
static VALUE fetch(int argc, VALUE *argv, VALUE self)
{
	struct fetch_parse_args f;
	VALUE key;

	rb_scan_args(argc, argv, "11", &key, &f.value);
	if (NIL_P(f.value)) {
		f.value = rb_str_new(0, 0);
	} else {
		StringValue(f.value);
		/*
		 * rb_str_set_len refuses to touch a String sharing its
		 * buffer with another (e.g. a fresh #dup), so unshare first.
		 */
		rb_str_modify(f.value);
		rb_str_set_len(f.value, 0);
	}

	f.tdb = db(self, 1);
	TO_TDB_DATA(f.as.key, key);

	return (VALUE)my_tbr(nogvl_parse_record, &f);
}

struct store_args {
	struct tdb_context *tdb;
	TDB_DATA key;
	TDB_DATA val;
	int flag;
};

static void * nogvl_store(void *ptr)
{
	struct store_args *s = ptr;
	long rc = tdb_store(s->tdb, s->key, s->val, s->flag);

	return (void *)rc;
}

/*
 * Common store implementation.  flag is 0, TDB_INSERT or TDB_MODIFY.
 * With soft set, an "already exists" (insert) or "does not exist"
 * (modify) failure returns nil instead of raising.  Returns val.
 */
static VALUE rbtdb_store(VALUE self, VALUE key, VALUE val, int flag, int soft)
{
	struct store_args s;

	s.tdb = db(self, 1);
	TO_TDB_DATA(s.key, key);
	TO_TDB_DATA(s.val, val);
	s.flag = flag;

	if ((long)my_tbr(nogvl_store, &s) == -1) {
		if (soft) {
			int ecode = tdb_error(s.tdb);

			if ((flag == TDB_INSERT) && (ecode == TDB_ERR_EXISTS))
				return Qnil;
			if ((flag == TDB_MODIFY) && (ecode == TDB_ERR_NOEXIST))
				return Qnil;
		}
		my_raise(s.tdb);
	}

	return val;
}

static VALUE store(VALUE self, VALUE key, VALUE val)
{
	return rbtdb_store(self, key, val, 0, 0);
}

static VALUE insert_bang(VALUE self, VALUE key, VALUE val)
{
	return rbtdb_store(self, key, val, TDB_INSERT, 0);
}

static VALUE insert(VALUE self, VALUE key, VALUE val)
{
	return rbtdb_store(self, key, val, TDB_INSERT, 1);
}

static VALUE modify_bang(VALUE self, VALUE key, VALUE val)
{
	return rbtdb_store(self, key, val, TDB_MODIFY, 0);
}

static VALUE modify(VALUE self, VALUE key, VALUE val)
{
	return rbtdb_store(self, key, val, TDB_MODIFY, 1);
}

struct exists_args {
	struct tdb_context *tdb;
	TDB_DATA key;
};

static void * nogvl_exists(void *ptr)
{
	struct exists_args *e = ptr;

	return (void *)(tdb_exists(e->tdb, e->key) == 0 ? Qfalse : Qtrue);
}

static VALUE has_key(VALUE self, VALUE key)
{
	struct exists_args e;

	e.tdb = db(self, 1);
	TO_TDB_DATA(e.key, key);

	return (VALUE)my_tbr(nogvl_exists, &e);
}

/* state is the rb_protect tag of a failed yield, or 0 */
struct traverse_args {
	struct tdb_context *tdb;
	TDB_DATA key;
	TDB_DATA val;
	int state;
};

static VALUE protected_yield(VALUE val)
{
	VALUE *kv = (VALUE *)val;

	return rb_yield_values(2, kv[0], kv[1]);
}

/* runs with the GVL: yields one key/value pair, trapping any exception */
static void *my_yield(void *data)
{
	struct traverse_args *t = data;
	VALUE kv[2];

	kv[0] = str_new_tdb_data(&t->key);
	kv[1] = str_new_tdb_data(&t->val);

	rb_protect(protected_yield, (VALUE)kv, &t->state);

	return NULL;
}

/* tdb_traverse callback: a non-zero return stops the traversal */
static int
traverse_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA val, void *data)
{
	struct traverse_args *t = data;

	t->key = key;
	t->val = val;
	(void)rb_thread_call_with_gvl(my_yield, t);

	return t->state;
}

static void * nogvl_traverse(void *ptr)
{
	struct traverse_args *t = ptr;

	(void)tdb_traverse(t->tdb, traverse_fn, t);

	return (void *)Qfalse;
}

static VALUE each(VALUE self)
{
	struct traverse_args t;

	t.tdb = db(self, 1);
	t.state = 0;

	my_tbr(nogvl_traverse, &t);
	/* re-raise (or re-throw/break) whatever the block did, locks released */
	if (t.state)
		rb_jump_tag(t.state);
	return self;
}

struct delete_args {
	struct tdb_context *tdb;
	TDB_DATA key;
};

static void * nogvl_delete(void *ptr)
{
	struct delete_args *d = ptr;

	return (void *)(tdb_delete(d->tdb, d->key) == 0 ? Qtrue : Qfalse);
}

/* deletes key; returns true on success, false otherwise */
static VALUE nuke(VALUE self, VALUE key)
{
	struct delete_args d;

	d.tdb = db(self, 1);
	TO_TDB_DATA(d.key, key);

	return (VALUE)my_tbr(nogvl_delete, &d);
}

static VALUE aref(VALUE self, VALUE key)
{
	return fetch(1, &key, self);
}

/* delete(key [, buffer]) -> old value or nil */
static VALUE delete(int argc, VALUE *argv, VALUE self)
{
	VALUE rc = fetch(argc, argv, self);

	/* the record may vanish between fetch and delete: report it absent */
	if (!NIL_P(rc) && nuke(self, argv[0]) == Qfalse)
		return Qnil;
	return rc;
}

/*
 * Signature shared by tdb_lockall*, tdb_unlockall*, tdb_wipe_all and
 * tdb_repack.  These return int, so they must not be handed to my_tbr
 * directly: calling them through a void *(*)(void *) pointer is
 * undefined behavior, and the caller would read a full pointer-sized
 * register of which only the low int is defined.
 */
typedef int (*ctx_fn)(struct tdb_context *);

struct ctx_fn_args {
	ctx_fn fn;
	struct tdb_context *tdb;
};

/* correctly-typed trampoline: runs a->fn(a->tdb) without the GVL */
static void * nogvl_ctx_fn(void *ptr)
{
	struct ctx_fn_args *a = ptr;

	return (void *)(long)a->fn(a->tdb);
}

/* returns the int result of fn(tdb), called with the GVL released */
static int call_ctx_fn(ctx_fn fn, struct tdb_context *tdb)
{
	struct ctx_fn_args a;

	a.fn = fn;
	a.tdb = tdb;

	return (int)(long)my_tbr(nogvl_ctx_fn, &a);
}

static VALUE lockall(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	if (call_ctx_fn(tdb_lockall, tdb))
		my_raise(tdb);

	return Qtrue;
}

/* returns false instead of raising when the lock is already held */
static VALUE trylockall(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	if (call_ctx_fn(tdb_lockall_nonblock, tdb)) {
		if (tdb_error(tdb) == TDB_ERR_LOCK)
			return Qfalse;
		my_raise(tdb);
	}
	return Qtrue;
}

static VALUE unlockall(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	if (call_ctx_fn(tdb_unlockall, tdb))
		my_raise(tdb);
	return Qtrue;
}

static VALUE lockall_read(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	if (call_ctx_fn(tdb_lockall_read, tdb))
		my_raise(tdb);
	return Qtrue;
}

/* returns false instead of raising when the lock is already held */
static VALUE trylockall_read(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	if (call_ctx_fn(tdb_lockall_read_nonblock, tdb)) {
		if (tdb_error(tdb) == TDB_ERR_LOCK)
			return Qfalse;
		my_raise(tdb);
	}
	return Qtrue;
}

static VALUE unlockall_read(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	if (call_ctx_fn(tdb_unlockall_read, tdb))
		my_raise(tdb);
	return Qtrue;
}

static VALUE lockall_mark(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	if (call_ctx_fn(tdb_lockall_mark, tdb))
		my_raise(tdb);
	return Qtrue;
}

static VALUE lockall_unmark(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	if (call_ctx_fn(tdb_lockall_unmark, tdb))
		my_raise(tdb);
	return Qtrue;
}

/*
 * clears out the database
 */
static VALUE clear(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	if (call_ctx_fn(tdb_wipe_all, tdb))
		my_raise(tdb);
	return self;
}

#ifdef HAVE_TDB_REPACK
/* repacks a database to reduce fragmentation, available with tdb 1.2.x+ */
static VALUE repack(VALUE self)
{
	struct tdb_context *tdb = db(self, 1);

	if (call_ctx_fn(tdb_repack, tdb))
		my_raise(tdb);
	return self;
}
#endif /* HAVE_TDB_REPACK */

void Init_tdb_ext(void)
{
	cTDB = rb_define_class("TDB", rb_cObject);

	hashes = rb_hash_new();

	/*
	 * Available hash functions, the key is the name of the hash
	 * and the value is a pointer for internal usage.
	 */
	rb_define_const(cTDB, "HASHES", hashes);

	rb_define_alloc_func(cTDB, alloc);
	rb_include_module(cTDB, rb_mEnumerable);

	rb_define_method(cTDB, "initialize", init, -1);
	rb_define_method(cTDB, "close", tdbclose, 0);
	rb_define_method(cTDB, "closed?", closed, 0);

	rb_define_method(cTDB, "fetch", fetch, -1);
	rb_define_method(cTDB, "[]", aref, 1);
	rb_define_method(cTDB, "store", store, 2);
	rb_define_method(cTDB, "[]=", store, 2);
	rb_define_method(cTDB, "insert!", insert_bang, 2);
	rb_define_method(cTDB, "modify!", modify_bang, 2);
	rb_define_method(cTDB, "insert", insert, 2);
	rb_define_method(cTDB, "modify", modify, 2);

	rb_define_method(cTDB, "key?", has_key, 1);
	rb_define_method(cTDB, "has_key?", has_key, 1);
	rb_define_method(cTDB, "include?", has_key, 1);
	rb_define_method(cTDB, "member?", has_key, 1);
	rb_define_method(cTDB, "each", each, 0);
	rb_define_method(cTDB, "nuke!", nuke, 1);
	rb_define_method(cTDB, "delete", delete, -1);

	rb_define_method(cTDB, "lockall", lockall, 0);
	rb_define_method(cTDB, "trylockall", trylockall, 0);
	rb_define_method(cTDB, "unlockall", unlockall, 0);
	rb_define_method(cTDB, "lockall_read", lockall_read, 0);
	rb_define_method(cTDB, "trylockall_read", trylockall_read, 0);
	rb_define_method(cTDB, "unlockall_read", unlockall_read, 0);
	rb_define_method(cTDB, "lockall_mark", lockall_mark, 0);
	rb_define_method(cTDB, "lockall_unmark", lockall_unmark, 0);
	rb_define_method(cTDB, "clear", clear, 0);
#ifdef HAVE_TDB_REPACK
	rb_define_method(cTDB, "repack", repack, 0);
#endif /* HAVE_TDB_REPACK */

	init_errors();
	init_hashes();

	/* just a readability place holder */
	rb_define_const(cTDB, "DEFAULT", UINT2NUM(TDB_DEFAULT));

	/* clear database if we are the only one with it open */
	rb_define_const(cTDB, "CLEAR_IF_FIRST", UINT2NUM(TDB_CLEAR_IF_FIRST));

	/* don't store on disk, use in-memory database */
	rb_define_const(cTDB, "INTERNAL", UINT2NUM(TDB_INTERNAL));

	/* don't do any locking */
	rb_define_const(cTDB, "NOLOCK", UINT2NUM(TDB_NOLOCK));

	/* don't use mmap */
	rb_define_const(cTDB, "NOMMAP", UINT2NUM(TDB_NOMMAP));

	/* convert endian (internal use) */
	rb_define_const(cTDB, "CONVERT", UINT2NUM(TDB_CONVERT));

	/* header is big-endian (internal use) */
	rb_define_const(cTDB, "BIGENDIAN", UINT2NUM(TDB_BIGENDIAN));

	/* don't use synchronous transactions */
	rb_define_const(cTDB, "NOSYNC", UINT2NUM(TDB_NOSYNC));

	/* maintain a sequence number */
	rb_define_const(cTDB, "SEQNUM", UINT2NUM(TDB_SEQNUM));

	/* Activate the per-hashchain freelist, default 5 */
	rb_define_const(cTDB, "VOLATILE", UINT2NUM(TDB_VOLATILE));

#ifdef TDB_ALLOW_NESTING
	/* Allow transactions to nest */
	rb_define_const(cTDB, "ALLOW_NESTING", UINT2NUM(TDB_ALLOW_NESTING));
#endif

#ifdef TDB_DISALLOW_NESTING
	/* Disallow transactions to nest */
	rb_define_const(cTDB, "DISALLOW_NESTING", UINT2NUM(TDB_DISALLOW_NESTING));
#endif

#ifdef TDB_INCOMPATIBLE_HASH
	/* Better hashing, but can't be opened by tdb < 1.2.6. */
	rb_define_const(cTDB, "INCOMPATIBLE_HASH", UINT2NUM(TDB_INCOMPATIBLE_HASH));
#endif
	rbtdb_init_tdb_hash_functions();
}

/*
 * Document-class: TDB
 *
 * <code>
 *	tdb = TDB.new("/path/to/file", :open_flags => IO::RDWR|IO::CREAT)
 *	tdb.store("HELLO", "world")
 *	tdb.fetch("HELLO")  -> "world"
 *	tdb.delete("HELLO") -> "world"
 * </code>
 */