about summary refs log tree commit homepage
path: root/lib/rainbows/rev
diff options
context:
space:
mode:
Diffstat (limited to 'lib/rainbows/rev')
-rw-r--r--lib/rainbows/rev/client.rb13
-rw-r--r--lib/rainbows/rev/master.rb15
2 files changed, 22 insertions, 6 deletions
diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb
index b7c1c78..bc85fbd 100644
--- a/lib/rainbows/rev/client.rb
+++ b/lib/rainbows/rev/client.rb
@@ -42,6 +42,19 @@ module Rainbows
         super(buf)
       end
 
+      def on_readable
+        buf = @_io.kgio_tryread(16384)
+        case buf
+        when :wait_readable
+        when nil # eof
+          close
+        else
+          on_read buf
+        end
+      rescue Errno::ECONNRESET
+        close
+      end
+
       # queued, optional response bodies, it should only be unpollable "fast"
       # devices where read(2) is uninterruptable.  Unfortunately, NFS and ilk
       # are also part of this.  We'll also stick DeferredResponse bodies in
diff --git a/lib/rainbows/rev/master.rb b/lib/rainbows/rev/master.rb
index 21b583a..8e5d4ef 100644
--- a/lib/rainbows/rev/master.rb
+++ b/lib/rainbows/rev/master.rb
@@ -2,20 +2,23 @@
 # :enddoc:
 require 'rainbows/rev'
 
-class Rainbows::Rev::Master < Rev::AsyncWatcher
+class Rainbows::Rev::Master < Rev::IOWatcher
 
   def initialize(queue)
-    super()
+    @reader, @writer = Kgio::Pipe.new
+    super(@reader)
     @queue = queue
   end
 
   def <<(output)
     @queue << output
-    signal
+    @writer.kgio_trywrite("\0")
   end
 
-  def on_signal
-    client, response = @queue.pop
-    client.response_write(response)
+  def on_readable
+    if String === @reader.kgio_tryread(1)
+      client, response = @queue.pop
+      client.response_write(response)
+    end
   end
 end