about summary refs log tree commit homepage
path: root/ext/kgio/connect.c
diff options
context:
space:
mode:
Diffstat (limited to 'ext/kgio/connect.c')
-rw-r--r--ext/kgio/connect.c80
1 files changed, 72 insertions, 8 deletions
diff --git a/ext/kgio/connect.c b/ext/kgio/connect.c
index 42ab44c..21b3f7c 100644
--- a/ext/kgio/connect.c
+++ b/ext/kgio/connect.c
@@ -1,5 +1,7 @@
 #include "kgio.h"
+#include "my_fileno.h"
 #include "sock_for_fd.h"
+#include "blocking_io_region.h"
 
 static void close_fail(int fd, const char *msg)
 {
@@ -131,6 +133,72 @@ static VALUE tcp_connect(VALUE klass, VALUE ip, VALUE port, int io_wait)
                           &addr, hints.ai_addrlen);
 }
 
+static struct sockaddr *sockaddr_from(socklen_t *addrlen, VALUE addr)
+{
+        if (TYPE(addr) == T_STRING) {
+                *addrlen = (socklen_t)RSTRING_LEN(addr);
+                return (struct sockaddr *)(RSTRING_PTR(addr));
+        }
+        rb_raise(rb_eTypeError, "invalid address");
+        return NULL;
+}
+
+#if defined(MSG_FASTOPEN) && defined(HAVE_RB_THREAD_BLOCKING_REGION)
+#ifndef HAVE_RB_STR_SUBSEQ
+#define rb_str_subseq rb_str_substr
+#endif
+struct tfo_args {
+        int fd;
+        void *buf;
+        size_t buflen;
+        struct sockaddr *addr;
+        socklen_t addrlen;
+};
+
+static VALUE tfo_sendto(void *_a)
+{
+        struct tfo_args *a = _a;
+        ssize_t w;
+
+        w = sendto(a->fd, a->buf, a->buflen, MSG_FASTOPEN, a->addr, a->addrlen);
+        return (VALUE)w;
+}
+
+/*
+ * call-seq:
+ *
+ *        s = Kgio::Socket.new(:INET, :STREAM)
+ *        addr = Socket.pack_sockaddr_in("example.com", 80)
+ *        s.fastopen("hello world", addr) -> nil
+ *
+ * Starts a TCP connection using TCP Fast Open.  This uses a blocking
+ * sendto() syscall and is only available on Ruby 1.9 or later.
+ * This raises exceptions (including Errno::EINPROGRESS/Errno::EAGAIN)
+ * on errors.  Using this is only recommended for blocking sockets.
+ *         s.setsockopt(:SOCKET, :SNDTIMEO, [1,0].pack("l_l_"))
+ */
+static VALUE fastopen(VALUE sock, VALUE buf, VALUE addr)
+{
+        struct tfo_args a;
+        VALUE str = (TYPE(buf) == T_STRING) ? buf : rb_obj_as_string(buf);
+        ssize_t w;
+
+        a.fd = my_fileno(sock);
+        a.buf = RSTRING_PTR(str);
+        a.buflen = (size_t)RSTRING_LEN(str);
+        a.addr = sockaddr_from(&a.addrlen, addr);
+
+        /* n.b. rb_thread_blocking_region preserves errno */
+        w = (ssize_t)rb_thread_io_blocking_region(tfo_sendto, &a, a.fd);
+        if (w < 0)
+                rb_sys_fail("sendto");
+        if ((size_t)w == a.buflen)
+                return Qnil;
+
+        return rb_str_subseq(str, w, a.buflen - w);
+}
+#endif /* MSG_FASTOPEN */
+
 /*
  * call-seq:
  *
@@ -225,14 +293,8 @@ static VALUE stream_connect(VALUE klass, VALUE addr, int io_wait)
 {
         int domain;
         socklen_t addrlen;
-        struct sockaddr *sockaddr;
+        struct sockaddr *sockaddr = sockaddr_from(&addrlen, addr);
 
-        if (TYPE(addr) == T_STRING) {
-                sockaddr = (struct sockaddr *)(RSTRING_PTR(addr));
-                addrlen = (socklen_t)RSTRING_LEN(addr);
-        } else {
-                rb_raise(rb_eTypeError, "invalid address");
-        }
         switch (((struct sockaddr_storage *)(sockaddr))->ss_family) {
         case AF_UNIX: domain = PF_UNIX; break;
         case AF_INET: domain = PF_INET; break;
@@ -316,7 +378,9 @@ void init_kgio_connect(void)
         rb_define_singleton_method(cKgio_Socket, "new", kgio_new, -1);
         rb_define_singleton_method(cKgio_Socket, "connect", kgio_connect, 1);
         rb_define_singleton_method(cKgio_Socket, "start", kgio_start, 1);
-
+#if defined(MSG_FASTOPEN) && defined(HAVE_RB_THREAD_BLOCKING_REGION)
+        rb_define_method(cKgio_Socket, "fastopen", fastopen, 2);
+#endif
         /*
          * Document-class: Kgio::TCPSocket
          *