diff options
author | Eric Wong <e@yhbt.net> | 2010-09-27 00:11:43 +0000 |
---|---|---|
committer | Eric Wong <e@yhbt.net> | 2010-09-27 00:11:43 +0000 |
commit | 6fbde1518578dd1b828efcecaf2caf893bddc110 (patch) | |
tree | 75261028e9d6c1644eacfd1e5ad881d08a4ea2bd | |
parent | fdfecc6d815bab8dfc1d8ad6758a66d44ab51e31 (diff) | |
download | kgio-6fbde1518578dd1b828efcecaf2caf893bddc110.tar.gz |
These initiate (but do not wait for) non-blocking connects.
-rw-r--r-- | ext/kgio/kgio_ext.c | 115 | ||||
-rw-r--r-- | test/test_tcp_connect.rb | 17 | ||||
-rw-r--r-- | test/test_unix_connect.rb | 17 |
3 files changed, 110 insertions, 39 deletions
diff --git a/ext/kgio/kgio_ext.c b/ext/kgio/kgio_ext.c index e301971..eac04a5 100644 --- a/ext/kgio/kgio_ext.c +++ b/ext/kgio/kgio_ext.c @@ -448,7 +448,7 @@ static VALUE set_nonblock(VALUE mod, VALUE boolean) } static VALUE -my_connect(VALUE klass, int domain, void *addr, socklen_t addrlen) +my_connect(VALUE klass, int io_wait, int domain, void *addr, socklen_t addrlen) { int rc; int fd = socket(domain, SOCK_STREAM, 0); @@ -473,8 +473,10 @@ my_connect(VALUE klass, int domain, void *addr, socklen_t addrlen) if (errno == EINPROGRESS) { VALUE io = sock_for_fd(klass, fd); - errno = EAGAIN; - wait_writable(io); + if (io_wait) { + errno = EAGAIN; + wait_writable(io); + } return io; } rb_sys_fail("connect"); @@ -482,27 +484,7 @@ my_connect(VALUE klass, int domain, void *addr, socklen_t addrlen) return sock_for_fd(klass, fd); } -/* - * call-seq: - * - * Kgio::TCPSocket.new('127.0.0.1', 80) -> socket - * - * Creates a new Kgio::TCPSocket object and initiates a - * non-blocking connection. The caller should select/poll - * on the socket for writability before attempting to write - * or optimistically attempt a write and handle Kgio::WaitWritable - * or Errno::EAGAIN. - * - * Unlike the TCPSocket.new in Ruby, this does NOT perform DNS - * lookups (which is subject to a different set of timeouts and - * best handled elsewhere). - * - * This is only intended as a convenience for testing, - * Kgio::Socket.new (along with a cached/memoized addr argument) - * is recommended for applications that repeatedly connect to - * the same backend servers. - */ -static VALUE kgio_tcp_connect(VALUE klass, VALUE ip, VALUE port) +static VALUE tcp_connect(VALUE klass, VALUE ip, VALUE port, int io_wait) { struct sockaddr_in addr = { 0 }; @@ -511,32 +493,46 @@ static VALUE kgio_tcp_connect(VALUE klass, VALUE ip, VALUE port) switch (inet_pton(AF_INET, StringValuePtr(ip), &addr.sin_addr)) { case 1: - return my_connect(klass, PF_INET, &addr, sizeof(addr)); + return my_connect(klass, io_wait, PF_INET, &addr, sizeof(addr)); case -1: rb_sys_fail("inet_pton"); } - rb_raise(rb_eArgError, "invalid address: %s", - StringValuePtr(ip)); + rb_raise(rb_eArgError, "invalid address: %s", StringValuePtr(ip)); + return Qnil; } /* * call-seq: * - * Kgio::UNIXSocket.new("/path/to/unix/socket") -> socket + * Kgio::TCPSocket.new('127.0.0.1', 80) -> socket * - * Creates a new Kgio::UNIXSocket object and initiates a + * Creates a new Kgio::TCPSocket object and initiates a * non-blocking connection. The caller should select/poll * on the socket for writability before attempting to write * or optimistically attempt a write and handle Kgio::WaitWritable * or Errno::EAGAIN. * + * Unlike the TCPSocket.new in Ruby, this does NOT perform DNS + * lookups (which is subject to a different set of timeouts and + * best handled elsewhere). + * * This is only intended as a convenience for testing, * Kgio::Socket.new (along with a cached/memoized addr argument) * is recommended for applications that repeatedly connect to * the same backend servers. */ -static VALUE kgio_unix_connect(VALUE klass, VALUE path) +static VALUE kgio_tcp_connect(VALUE klass, VALUE ip, VALUE port) +{ + return tcp_connect(klass, ip, port, 1); +} + +static VALUE kgio_tcp_start(VALUE klass, VALUE ip, VALUE port) +{ + return tcp_connect(klass, ip, port, 0); +} + +static VALUE unix_connect(VALUE klass, VALUE path, int io_wait) { struct sockaddr_un addr = { 0 }; long len; @@ -551,22 +547,36 @@ static VALUE kgio_unix_connect(VALUE klass, VALUE path) memcpy(addr.sun_path, RSTRING_PTR(path), len); addr.sun_family = AF_UNIX; - return my_connect(klass, PF_UNIX, &addr, sizeof(addr)); + return my_connect(klass, io_wait, PF_UNIX, &addr, sizeof(addr)); } /* * call-seq: * - * addr = Socket.pack_sockaddr_in(80, 'example.com') - * Kgio::Socket.new(addr) -> socket + * Kgio::UNIXSocket.new("/path/to/unix/socket") -> socket * - * addr = Socket.pack_sockaddr_un("/tmp/unix.sock") - * Kgio::Socket.new(addr) -> socket + * Creates a new Kgio::UNIXSocket object and initiates a + * non-blocking connection. The caller should select/poll + * on the socket for writability before attempting to write + * or optimistically attempt a write and handle Kgio::WaitWritable + * or Errno::EAGAIN. * - * Generic connect method for addr generated by Socket.pack_sockaddr_in - * or Socket.pack_sockaddr_un + * This is only intended as a convenience for testing, + * Kgio::Socket.new (along with a cached/memoized addr argument) + * is recommended for applications that repeatedly connect to + * the same backend servers. */ -static VALUE kgio_connect(VALUE klass, VALUE addr) +static VALUE kgio_unix_connect(VALUE klass, VALUE path) +{ + return unix_connect(klass, path, 1); +} + +static VALUE kgio_unix_start(VALUE klass, VALUE path) +{ + return unix_connect(klass, path, 0); +} + +static VALUE stream_connect(VALUE klass, VALUE addr, int io_wait) { int domain; socklen_t addrlen; @@ -588,9 +598,33 @@ static VALUE kgio_connect(VALUE klass, VALUE addr) rb_raise(rb_eArgError, "invalid address family"); } - return my_connect(klass, domain, sockaddr, addrlen); + return my_connect(klass, io_wait, domain, sockaddr, addrlen); } +static VALUE kgio_connect(VALUE klass, VALUE addr) +{ + return stream_connect(klass, addr, 1); +} + +static VALUE kgio_start(VALUE klass, VALUE addr) +{ + return stream_connect(klass, addr, 0); +} + +/* + * call-seq: + * + * addr = Socket.pack_sockaddr_in(80, 'example.com') + * Kgio::Socket.new(addr) -> socket + * + * addr = Socket.pack_sockaddr_un("/tmp/unix.sock") + * Kgio::Socket.new(addr) -> socket + * + * Generic connect method for addr generated by Socket.pack_sockaddr_in + * or Socket.pack_sockaddr_un + */ + + void Init_kgio_ext(void) { VALUE mKgio = rb_define_module("Kgio"); @@ -640,6 +674,7 @@ void Init_kgio_ext(void) rb_define_attr(mSocketMethods, "kgio_addr", 1, 1); rb_include_module(cSocket, mSocketMethods); rb_define_singleton_method(cSocket, "new", kgio_connect, 1); + rb_define_singleton_method(cSocket, "start", kgio_start, 1); cUNIXServer = rb_const_get(rb_cObject, rb_intern("UNIXServer")); cUNIXServer = rb_define_class_under(mKgio, "UNIXServer", cUNIXServer); @@ -653,11 +688,13 @@ void Init_kgio_ext(void) cTCPSocket = rb_define_class_under(mKgio, "TCPSocket", cTCPSocket); rb_include_module(cTCPSocket, mSocketMethods); rb_define_singleton_method(cTCPSocket, "new", kgio_tcp_connect, 2); + rb_define_singleton_method(cTCPSocket, "start", kgio_tcp_start, 2); cUNIXSocket = rb_const_get(rb_cObject, rb_intern("UNIXSocket")); cUNIXSocket = rb_define_class_under(mKgio, "UNIXSocket", cUNIXSocket); rb_include_module(cUNIXSocket, mSocketMethods); rb_define_singleton_method(cUNIXSocket, "new", kgio_unix_connect, 1); + rb_define_singleton_method(cUNIXSocket, "start", kgio_unix_start, 1); iv_kgio_addr = rb_intern("@kgio_addr"); init_sock_for_fd(); diff --git a/test/test_tcp_connect.rb b/test/test_tcp_connect.rb index 028f852..bad2146 100644 --- a/test/test_tcp_connect.rb +++ b/test/test_tcp_connect.rb @@ -34,6 +34,14 @@ class TestKgioTcpConnect < Test::Unit::TestCase assert_equal nil, sock.kgio_write("HELLO") end + def test_start + sock = Kgio::Socket.start(@addr) + assert_kind_of Kgio::Socket, sock + ready = IO.select(nil, [ sock ]) + assert_equal sock, ready[1][0] + assert_equal nil, sock.kgio_write("HELLO") + end + def test_tcp_socket_new_invalid assert_raises(ArgumentError) { Kgio::TCPSocket.new('example.com', 80) } assert_raises(ArgumentError) { Kgio::TCPSocket.new('999.999.999.999', 80) } @@ -47,6 +55,15 @@ class TestKgioTcpConnect < Test::Unit::TestCase assert_equal nil, sock.kgio_write("HELLO") end + def test_socket_start + Kgio::wait_writable = :wait_writable + sock = SubSocket.start(@addr) + assert_nil sock.foo + ready = IO.select(nil, [ sock ]) + assert_equal sock, ready[1][0] + assert_equal nil, sock.kgio_write("HELLO") + end + def test_wait_writable_set Kgio::wait_writable = :wait_writable sock = SubSocket.new(@addr) diff --git a/test/test_unix_connect.rb b/test/test_unix_connect.rb index 458149d..4b7519c 100644 --- a/test/test_unix_connect.rb +++ b/test/test_unix_connect.rb @@ -48,6 +48,23 @@ class TestKgioUnixConnect < Test::Unit::TestCase assert_equal nil, sock.kgio_write("HELLO") end + def test_start + sock = Kgio::Socket.start(@addr) + assert_instance_of Kgio::Socket, sock + ready = IO.select(nil, [ sock ]) + assert_equal sock, ready[1][0] + assert_equal nil, sock.kgio_write("HELLO") + end + + def test_socket_start + Kgio::wait_writable = :wait_writable + sock = SubSocket.start(@addr) + assert_nil sock.foo + ready = IO.select(nil, [ sock ]) + assert_equal sock, ready[1][0] + assert_equal nil, sock.kgio_write("HELLO") + end + def test_wait_writable_set Kgio::wait_writable = :wait_writable sock = SubSocket.new(@addr) |