about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@yhbt.net>2010-11-18 14:37:05 -0800
committerEric Wong <e@yhbt.net>2010-11-18 15:02:17 -0800
commitc69955e64648ab6a3471a54f7885a320428682f9 (patch)
tree20ff90133eaebdc4cec347a3dc8ea2d1d9ab3afe
parentf1b497e601ed2acb54f75dc989d0a5ec7afebca0 (diff)
downloadkgio-c69955e64648ab6a3471a54f7885a320428682f9.tar.gz
This removes the global Kgio.wait_*able accesors and requires
each class to define (or fall back to) the Kgio::DefaultWaiters
methods.
-rw-r--r--README6
-rw-r--r--ext/kgio/connect.c8
-rw-r--r--ext/kgio/kgio.h4
-rw-r--r--ext/kgio/read_write.c13
-rw-r--r--ext/kgio/wait.c165
-rw-r--r--test/lib_read_write.rb20
-rw-r--r--test/test_connect_fd_leak.rb4
-rw-r--r--test/test_tcp_connect.rb5
-rw-r--r--test/test_unix_connect.rb4
9 files changed, 68 insertions, 161 deletions
diff --git a/README b/README
index 4dc37ce..0f1a71c 100644
--- a/README
+++ b/README
@@ -15,9 +15,9 @@ applications.
 * Returns the unwritten portion of the string on partial writes,
   making it ideal for buffering unwritten data.
 
-* May be assigned Kgio.wait_writable= and Kgio.wait_readable=
-  methods to allow socket/pipe objects to make custom callbacks
-  (such as adding the file descriptor to a poll set and yielding
+* May call any method defined to be "kgio_wait_writable" or
+  "kgio_wait_readable" methods to allow socket/pipe objects to make custom
+  callbacks (such as adding the file descriptor to a poll set and yielding
   the current Fiber).
 
 * Uses
diff --git a/ext/kgio/connect.c b/ext/kgio/connect.c
index 1f670db..4e46704 100644
--- a/ext/kgio/connect.c
+++ b/ext/kgio/connect.c
@@ -46,7 +46,7 @@ my_connect(VALUE klass, int io_wait, int domain, void *addr, socklen_t addrlen)
 
                         if (io_wait) {
                                 errno = EAGAIN;
-                                kgio_call_wait_writable(io, fd);
+                                (void)kgio_call_wait_writable(io);
                         }
                         return io;
                 }
@@ -81,7 +81,7 @@ static VALUE tcp_connect(VALUE klass, VALUE ip, VALUE port, int io_wait)
  * Creates a new Kgio::TCPSocket object and initiates a
  * non-blocking connection.
  *
- * This may block and call any method assigned to Kgio.wait_writable.
+ * This may block and call any method defined to kgio_wait_writable.
  *
  * Unlike the TCPSocket.new in Ruby, this does NOT perform DNS
  * lookups (which is subject to a different set of timeouts and
@@ -138,7 +138,7 @@ static VALUE unix_connect(VALUE klass, VALUE path, int io_wait)
  * Creates a new Kgio::UNIXSocket object and initiates a
  * non-blocking connection.
  *
- * This may block and call any method assigned to Kgio.wait_writable.
+ * This may block and call any method defined to kgio_wait_writable.
  */
 static VALUE kgio_unix_connect(VALUE klass, VALUE path)
 {
@@ -197,7 +197,7 @@ static VALUE stream_connect(VALUE klass, VALUE addr, int io_wait)
  * Creates a generic Kgio::Socket object and initiates a
  * non-blocking connection.
  *
- * This may block and call any method assigned to Kgio.wait_writable.
+ * This may block and call any method assigned to kgio_wait_writable.
  */
 static VALUE kgio_connect(VALUE klass, VALUE addr)
 {
diff --git a/ext/kgio/kgio.h b/ext/kgio/kgio.h
index 74c01b5..78445e3 100644
--- a/ext/kgio/kgio.h
+++ b/ext/kgio/kgio.h
@@ -34,7 +34,7 @@ void init_kgio_read_write(void);
 void init_kgio_accept(void);
 void init_kgio_connect(void);
 
-void kgio_call_wait_writable(VALUE io, int fd);
-void kgio_call_wait_readable(VALUE io, int fd);
+VALUE kgio_call_wait_writable(VALUE io);
+VALUE kgio_call_wait_readable(VALUE io);
 
 #endif /* KGIO_H */
diff --git a/ext/kgio/read_write.c b/ext/kgio/read_write.c
index 3b208fe..7ba2925 100644
--- a/ext/kgio/read_write.c
+++ b/ext/kgio/read_write.c
@@ -67,7 +67,7 @@ static int read_check(struct io_args *a, long n, const char *msg, int io_wait)
                 rb_str_set_len(a->buf, 0);
                 if (errno == EAGAIN) {
                         if (io_wait) {
-                                kgio_call_wait_readable(a->io, a->fd);
+                                (void)kgio_call_wait_readable(a->io);
 
                                 /* buf may be modified in other thread/fiber */
                                 rb_str_resize(a->buf, a->len);
@@ -112,8 +112,8 @@ retry:
  * Reads at most maxlen bytes from the stream socket.  Returns with a
  * newly allocated buffer, or may reuse an existing buffer if supplied.
  *
- * Calls the method assigned to Kgio.wait_readable, or blocks in a
- * thread-safe manner for writability.
+ * Calls whatever is is defined to be the kgio_wait_readable method
+ * for the class.
  *
  * Returns nil on EOF.
  *
@@ -232,7 +232,7 @@ done:
                         long written = RSTRING_LEN(a->buf) - a->len;
 
                         if (io_wait) {
-                                kgio_call_wait_writable(a->io, a->fd);
+                                (void)kgio_call_wait_writable(a->io);
 
                                 /* buf may be modified in other thread/fiber */
                                 a->len = RSTRING_LEN(a->buf) - written;
@@ -278,9 +278,8 @@ retry:
  *
  * Returns nil when the write completes.
  *
- * Calls the method Kgio.wait_writable if it is set.  Otherwise this
- * blocks in a thread-safe manner until all data is written or a
- * fatal error occurs.
+ * Calls whatever is is defined to be the kgio_wait_writable method
+ * for the class.
  */
 static VALUE kgio_write(VALUE io, VALUE str)
 {
diff --git a/ext/kgio/wait.c b/ext/kgio/wait.c
index 9cfcbdb..76c46db 100644
--- a/ext/kgio/wait.c
+++ b/ext/kgio/wait.c
@@ -1,154 +1,81 @@
 #include "kgio.h"
 
-static ID io_wait_rd, io_wait_wr;
+static ID id_wait_rd, id_wait_wr;
 
 /*
  * avoiding rb_thread_select() or similar since rb_io_wait_*able can be
  * made to use poll() later on.  It's highly unlikely Ruby will move to
- * use an edge-triggered event notification, so assign EAGAIN is safe...
+ * use an edge-triggered event notification, so assigning EAGAIN is
+ * probably safe...
  */
-static VALUE force_wait_readable(VALUE self)
-{
-        errno = EAGAIN;
-        if (!rb_io_wait_readable(my_fileno(self)))
-                rb_sys_fail("wait readable");
 
-        return self;
-}
 
-static VALUE force_wait_writable(VALUE self)
+/*
+ * Blocks the running Thread indefinitely until +self+ IO object is writable.
+ * This method is automatically called by default whenever kgio_read needs
+ * to block on input.
+ *
+ * Users of alternative threading/fiber libraries are
+ * encouraged to override this method in their subclasses or modules to
+ * work with their threading/blocking methods.
+ */
+static VALUE kgio_wait_readable(VALUE self)
 {
         errno = EAGAIN;
-        if (!rb_io_wait_writable(my_fileno(self)))
-                rb_sys_fail("wait writable");
+        if (!rb_io_wait_readable(my_fileno(self)))
+                rb_sys_fail("kgio_wait_readable");
 
         return self;
 }
 
-void kgio_call_wait_readable(VALUE io, int fd)
-{
-        /*
-         * we _NEVER_ set errno = EAGAIN here by default so we can work
-         * (or fail hard) with edge-triggered epoll()
-         */
-        if (io_wait_rd) {
-                (void)rb_funcall(io, io_wait_rd, 0, 0);
-        } else {
-                if (!rb_io_wait_readable(fd))
-                        rb_sys_fail("wait readable");
-        }
-}
-
-void kgio_call_wait_writable(VALUE io, int fd)
-{
-        /*
-         * we _NEVER_ set errno = EAGAIN here by default so we can work
-         * (or fail hard) with edge-triggered epoll()
-         */
-        if (io_wait_wr) {
-                (void)rb_funcall(io, io_wait_wr, 0, 0);
-        } else {
-                if (!rb_io_wait_writable(fd))
-                        rb_sys_fail("wait writable");
-        }
-}
-
 /*
- * call-seq:
- *
- *        Kgio.wait_readable = :method_name
- *        Kgio.wait_readable = nil
- *
- * Sets a method for kgio_read to call when a read would block.
- * This is useful for non-blocking frameworks that use Fibers,
- * as the method referred to this may cause the current Fiber
- * to yield execution.
- *
- * A special value of nil will cause Ruby to wait using the
- * rb_io_wait_readable() function.
+ * blocks the running Thread indefinitely until +self+ IO object is writable
+ * This method is automatically called whenever kgio_write needs to
+ * block on output.
+ * Users of alternative threading/fiber libraries are
+ * encouraged to override this method in their subclasses or modules to
+ * work with their threading/blocking methods.
  */
-static VALUE set_wait_rd(VALUE mod, VALUE sym)
+static VALUE kgio_wait_writable(VALUE self)
 {
-        switch (TYPE(sym)) {
-        case T_SYMBOL:
-                io_wait_rd = SYM2ID(sym);
-                return sym;
-        case T_NIL:
-                io_wait_rd = 0;
-                return sym;
-        }
-        rb_raise(rb_eTypeError, "must be a symbol or nil");
-        return sym;
-}
+        errno = EAGAIN;
+        if (!rb_io_wait_writable(my_fileno(self)))
+                rb_sys_fail("kgio_wait_writable");
 
-/*
- * call-seq:
- *
- *        Kgio.wait_writable = :method_name
- *        Kgio.wait_writable = nil
- *
- * Sets a method for kgio_write to call when a read would block.
- * This is useful for non-blocking frameworks that use Fibers,
- * as the method referred to this may cause the current Fiber
- * to yield execution.
- *
- * A special value of nil will cause Ruby to wait using the
- * rb_io_wait_writable() function.
- */
-static VALUE set_wait_wr(VALUE mod, VALUE sym)
-{
-        switch (TYPE(sym)) {
-        case T_SYMBOL:
-                io_wait_wr = SYM2ID(sym);
-                return sym;
-        case T_NIL:
-                io_wait_wr = 0;
-                return sym;
-        }
-        rb_raise(rb_eTypeError, "must be a symbol or nil");
-        return sym;
+        return self;
 }
 
-/*
- * call-seq:
- *
- *        Kgio.wait_writable        -> Symbol or nil
- *
- * Returns the symbolic method name of the method assigned to
- * call when EAGAIN is occurs on a Kgio::PipeMethods#kgio_write
- * or Kgio::SocketMethods#kgio_write call
- */
-static VALUE wait_wr(VALUE mod)
+VALUE kgio_call_wait_writable(VALUE io)
 {
-        return io_wait_wr ? ID2SYM(io_wait_wr) : Qnil;
+        return rb_funcall(io, id_wait_wr, 0, 0);
 }
 
-/*
- * call-seq:
- *
- *        Kgio.wait_readable        -> Symbol or nil
- *
- * Returns the symbolic method name of the method assigned to
- * call when EAGAIN is occurs on a Kgio::PipeMethods#kgio_read
- * or Kgio::SocketMethods#kgio_read call.
- */
-static VALUE wait_rd(VALUE mod)
+VALUE kgio_call_wait_readable(VALUE io)
 {
-        return io_wait_rd ? ID2SYM(io_wait_rd) : Qnil;
+        return rb_funcall(io, id_wait_rd, 0, 0);
 }
 
 void init_kgio_wait(void)
 {
         VALUE mKgio = rb_define_module("Kgio");
+
+        /*
+         * Document-module: Kgio::DefaultWaiters
+         *
+         * This module contains default kgio_wait_readable and
+         * kgio_wait_writable methods that block indefinitely (in a
+         * thread-safe manner) until an IO object is read or writable.
+         * This module is included in the Kgio::PipeMethods and
+         * Kgio::SocketMethods modules used by all bundled IO-derived
+         * objects.
+         */
         VALUE mWaiters = rb_define_module_under(mKgio, "DefaultWaiters");
 
+        id_wait_rd = rb_intern("kgio_wait_readable");
+        id_wait_wr = rb_intern("kgio_wait_writable");
+
         rb_define_method(mWaiters, "kgio_wait_readable",
-                         force_wait_readable, 0);
+                         kgio_wait_readable, 0);
         rb_define_method(mWaiters, "kgio_wait_writable",
-                         force_wait_writable, 0);
-
-        rb_define_singleton_method(mKgio, "wait_readable=", set_wait_rd, 1);
-        rb_define_singleton_method(mKgio, "wait_writable=", set_wait_wr, 1);
-        rb_define_singleton_method(mKgio, "wait_readable", wait_rd, 0);
-        rb_define_singleton_method(mKgio, "wait_writable", wait_wr, 0);
+                         kgio_wait_writable, 0);
 }
diff --git a/test/lib_read_write.rb b/test/lib_read_write.rb
index c11b3af..20bdd59 100644
--- a/test/lib_read_write.rb
+++ b/test/lib_read_write.rb
@@ -13,9 +13,6 @@ module LibReadWriteTest
       @rd.close unless @rd.closed?
       @wr.close unless @wr.closed?
     end
-    assert_nothing_raised do
-      Kgio.wait_readable = Kgio.wait_writable = nil
-    end
   end
 
   def test_read_zero
@@ -205,11 +202,10 @@ module LibReadWriteTest
 
   def test_monster_write_wait_writable
     @wr.instance_variable_set :@nr, 0
-    def @wr.wait_writable
+    def @wr.kgio_wait_writable
       @nr += 1
       IO.select(nil, [self])
     end
-    Kgio.wait_writable = :wait_writable
     buf = "." * 1024 * 1024 * 10
     thr = Thread.new { @wr.kgio_write(buf) }
     readed = @rd.read(buf.size)
@@ -220,7 +216,6 @@ module LibReadWriteTest
   end
 
   def test_wait_readable_ruby_default
-    assert_nothing_raised { Kgio.wait_readable = nil }
     elapsed = 0
     foo = nil
     t0 = Time.now
@@ -243,7 +238,6 @@ module LibReadWriteTest
     rescue Errno::EAGAIN
       break
     end while true
-    assert_nothing_raised { Kgio.wait_writable = nil }
     elapsed = 0
     foo = nil
     t0 = Time.now
@@ -261,10 +255,9 @@ module LibReadWriteTest
   end
 
   def test_wait_readable_method
-    def @rd.moo
+    def @rd.kgio_wait_readable
       defined?(@z) ? raise(RuntimeError, "Hello") : @z = "HI"
     end
-    assert_nothing_raised { Kgio.wait_readable = :moo }
     foo = nil
     begin
       foo = @rd.kgio_read(5)
@@ -277,18 +270,16 @@ module LibReadWriteTest
   end
 
   def test_tryread_wait_readable_method
-    def @rd.moo
+    def @rd.kgio_wait_readable
       raise "Hello"
     end
-    assert_nothing_raised { Kgio.wait_readable = :moo }
     assert_equal :wait_readable, @rd.kgio_tryread(5)
   end
 
   def test_trywrite_wait_readable_method
-    def @wr.moo
+    def @wr.kgio_wait_writable
       raise "Hello"
     end
-    assert_nothing_raised { Kgio.wait_writable = :moo }
     tmp = []
     buf = "." * 1024
     10000.times { tmp << @wr.kgio_trywrite(buf) }
@@ -296,10 +287,9 @@ module LibReadWriteTest
   end
 
   def test_wait_writable_method
-    def @wr.moo
+    def @wr.kgio_wait_writable
       defined?(@z) ? raise(RuntimeError, "Hello") : @z = "HI"
     end
-    assert_nothing_raised { Kgio.wait_writable = :moo }
     n = []
     begin
       loop { n << @wr.kgio_write("HIHIHIHIHIHI") }
diff --git a/test/test_connect_fd_leak.rb b/test/test_connect_fd_leak.rb
index 5889e3a..f6a8543 100644
--- a/test/test_connect_fd_leak.rb
+++ b/test/test_connect_fd_leak.rb
@@ -5,10 +5,6 @@ require 'kgio'
 
 class TestConnectFDLeak < Test::Unit::TestCase
 
-  def teardown
-    Kgio.wait_readable = Kgio.wait_writable = nil
-  end
-
   def test_unix_socket
     nr = 0
     path = "/non/existent/path"
diff --git a/test/test_tcp_connect.rb b/test/test_tcp_connect.rb
index bad2146..194a630 100644
--- a/test/test_tcp_connect.rb
+++ b/test/test_tcp_connect.rb
@@ -5,7 +5,7 @@ require 'kgio'
 
 class SubSocket < Kgio::Socket
   attr_accessor :foo
-  def wait_writable
+  def kgio_wait_writable
     @foo = "waited"
   end
 end
@@ -23,7 +23,6 @@ class TestKgioTcpConnect < Test::Unit::TestCase
     @srv.close unless @srv.closed?
     Kgio.accept_cloexec = true
     Kgio.accept_nonblock = false
-    Kgio.wait_readable = Kgio.wait_writable = nil
   end
 
   def test_new
@@ -56,7 +55,6 @@ class TestKgioTcpConnect < Test::Unit::TestCase
   end
 
   def test_socket_start
-    Kgio::wait_writable = :wait_writable
     sock = SubSocket.start(@addr)
     assert_nil sock.foo
     ready = IO.select(nil, [ sock ])
@@ -65,7 +63,6 @@ class TestKgioTcpConnect < Test::Unit::TestCase
   end
 
   def test_wait_writable_set
-    Kgio::wait_writable = :wait_writable
     sock = SubSocket.new(@addr)
     assert_equal "waited", sock.foo
     assert_equal nil, sock.kgio_write("HELLO")
diff --git a/test/test_unix_connect.rb b/test/test_unix_connect.rb
index 4b7519c..f99a877 100644
--- a/test/test_unix_connect.rb
+++ b/test/test_unix_connect.rb
@@ -6,7 +6,7 @@ require 'tempfile'
 
 class SubSocket < Kgio::Socket
   attr_accessor :foo
-  def wait_writable
+  def kgio_wait_writable
     @foo = "waited"
   end
 end
@@ -57,7 +57,6 @@ class TestKgioUnixConnect < Test::Unit::TestCase
   end
 
   def test_socket_start
-    Kgio::wait_writable = :wait_writable
     sock = SubSocket.start(@addr)
     assert_nil sock.foo
     ready = IO.select(nil, [ sock ])
@@ -66,7 +65,6 @@ class TestKgioUnixConnect < Test::Unit::TestCase
   end
 
   def test_wait_writable_set
-    Kgio::wait_writable = :wait_writable
     sock = SubSocket.new(@addr)
     assert_kind_of Kgio::Socket, sock
     assert_instance_of SubSocket, sock