diff options
Diffstat (limited to 'ext/kgio/connect.c')
-rw-r--r-- | ext/kgio/connect.c | 80 |
1 files changed, 72 insertions, 8 deletions
diff --git a/ext/kgio/connect.c b/ext/kgio/connect.c index 42ab44c..21b3f7c 100644 --- a/ext/kgio/connect.c +++ b/ext/kgio/connect.c @@ -1,5 +1,7 @@ #include "kgio.h" +#include "my_fileno.h" #include "sock_for_fd.h" +#include "blocking_io_region.h" static void close_fail(int fd, const char *msg) { @@ -131,6 +133,72 @@ static VALUE tcp_connect(VALUE klass, VALUE ip, VALUE port, int io_wait) &addr, hints.ai_addrlen); } +static struct sockaddr *sockaddr_from(socklen_t *addrlen, VALUE addr) +{ + if (TYPE(addr) == T_STRING) { + *addrlen = (socklen_t)RSTRING_LEN(addr); + return (struct sockaddr *)(RSTRING_PTR(addr)); + } + rb_raise(rb_eTypeError, "invalid address"); + return NULL; +} + +#if defined(MSG_FASTOPEN) && defined(HAVE_RB_THREAD_BLOCKING_REGION) +#ifndef HAVE_RB_STR_SUBSEQ +#define rb_str_subseq rb_str_substr +#endif +struct tfo_args { + int fd; + void *buf; + size_t buflen; + struct sockaddr *addr; + socklen_t addrlen; +}; + +static VALUE tfo_sendto(void *_a) +{ + struct tfo_args *a = _a; + ssize_t w; + + w = sendto(a->fd, a->buf, a->buflen, MSG_FASTOPEN, a->addr, a->addrlen); + return (VALUE)w; +} + +/* + * call-seq: + * + * s = Kgio::Socket.new(:INET, :STREAM) + * addr = Socket.pack_sockaddr_in("example.com", 80) + * s.fastopen("hello world", addr) -> nil + * + * Starts a TCP connection using TCP Fast Open. This uses a blocking + * sendto() syscall and is only available on Ruby 1.9 or later. + * This raises exceptions (including Errno::EINPROGRESS/Errno::EAGAIN) + * on errors. Using this is only recommended for blocking sockets. + * s.setsockopt(:SOCKET, :SNDTIMEO, [1,0].pack("l_l_")) + */ +static VALUE fastopen(VALUE sock, VALUE buf, VALUE addr) +{ + struct tfo_args a; + VALUE str = (TYPE(buf) == T_STRING) ? buf : rb_obj_as_string(buf); + ssize_t w; + + a.fd = my_fileno(sock); + a.buf = RSTRING_PTR(str); + a.buflen = (size_t)RSTRING_LEN(str); + a.addr = sockaddr_from(&a.addrlen, addr); + + /* n.b. rb_thread_blocking_region preserves errno */ + w = (ssize_t)rb_thread_io_blocking_region(tfo_sendto, &a, a.fd); + if (w < 0) + rb_sys_fail("sendto"); + if ((size_t)w == a.buflen) + return Qnil; + + return rb_str_subseq(str, w, a.buflen - w); +} +#endif /* MSG_FASTOPEN */ + /* * call-seq: * @@ -225,14 +293,8 @@ static VALUE stream_connect(VALUE klass, VALUE addr, int io_wait) { int domain; socklen_t addrlen; - struct sockaddr *sockaddr; + struct sockaddr *sockaddr = sockaddr_from(&addrlen, addr); - if (TYPE(addr) == T_STRING) { - sockaddr = (struct sockaddr *)(RSTRING_PTR(addr)); - addrlen = (socklen_t)RSTRING_LEN(addr); - } else { - rb_raise(rb_eTypeError, "invalid address"); - } switch (((struct sockaddr_storage *)(sockaddr))->ss_family) { case AF_UNIX: domain = PF_UNIX; break; case AF_INET: domain = PF_INET; break; @@ -316,7 +378,9 @@ void init_kgio_connect(void) rb_define_singleton_method(cKgio_Socket, "new", kgio_new, -1); rb_define_singleton_method(cKgio_Socket, "connect", kgio_connect, 1); rb_define_singleton_method(cKgio_Socket, "start", kgio_start, 1); - +#if defined(MSG_FASTOPEN) && defined(HAVE_RB_THREAD_BLOCKING_REGION) + rb_define_method(cKgio_Socket, "fastopen", fastopen, 2); +#endif /* * Document-class: Kgio::TCPSocket * |