From mboxrd@z Thu Jan 1 00:00:00 1970 X-Spam-Checker-Version: SpamAssassin 3.3.2 (2011-06-06) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: AS6939 64.71.128.0/18 X-Spam-Status: No, score=-1.6 required=3.0 tests=AWL,BAYES_00,FREEMAIL_FROM, MSGID_FROM_MTA_HEADER shortcircuit=no autolearn=ham version=3.3.2 Path: news.gmane.org!not-for-mail From: "=?utf-8?b?0K7RgNC40Lkg0KHQvtC60L7Qu9C+0LI=?=" Newsgroups: gmane.comp.lang.ruby.kgio.general Subject: Re: [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev` Date: Wed, 30 May 2012 23:55:16 +0400 Message-ID: References: <1338386216-14568-1-git-send-email-funny.falcon@gmail.com> <1338386216-14568-3-git-send-email-funny.falcon@gmail.com> NNTP-Posting-Host: plane.gmane.org Mime-Version: 1.0 Content-Type: text/plain; charset="us-ascii" X-Trace: dough.gmane.org 1338407753 13656 80.91.229.3 (30 May 2012 19:55:53 GMT) X-Complaints-To: usenet@dough.gmane.org NNTP-Posting-Date: Wed, 30 May 2012 19:55:53 +0000 (UTC) To: kgio@librelist.com Original-X-From: kgio@librelist.com Wed May 30 21:55:51 2012 Return-path: Envelope-to: gclrkg-kgio@m.gmane.org List-Archive: List-Help: List-Id: List-Post: List-Subscribe: List-Unsubscribe: Precedence: list Original-Sender: kgio@librelist.com Xref: news.gmane.org gmane.comp.lang.ruby.kgio.general:161 Archived-At: Received: from zedshaw.xen.prgmr.com ([64.71.167.205]) by plane.gmane.org with esmtp (Exim 4.69) (envelope-from ) id 1SZozh-0004zB-AX for gclrkg-kgio@m.gmane.org; Wed, 30 May 2012 21:55:46 +0200 Received: from zedshaw.xen.prgmr.com (localhost [IPv6:::1]) by zedshaw.xen.prgmr.com (Postfix) with ESMTP id 618DF21DCE8 for ; Wed, 30 May 2012 20:03:38 +0000 (UTC) Content-Transfer-Encoding: 7bit X-Content-Filtered-By: PublicInbox::Filter 0.0.1 With many tiny strings, the substitute writev function from writev_compat.h is faster. It seems that we should not always call the system writev, but decide depending on the shape of the data. I'll try playing with this tomorrow. 
2012/5/30 Sokolov Yura 'funny-falcon > Add methods for using writev(2) syscall for sending array of string in > a single syscall. This is more efficient than concatenating strings on > Ruby side or sending them one by one. > `#kgio_trywritev` returns array of strings which are not sent to the > socket. If there were objects other than string, they could be converted > using `#to_s` method, but this is not strictly applied, cause > `#kgio_*writev` tries to write at most `sysconf(_SC_IOV_MAX)` items > at once (for Linux its value is 1024). First string of returned array > could be part of string from array, so that you should assume it is not > in consistent state. > string, so that you could not rely on > --- > ext/kgio/extconf.rb | 3 + > ext/kgio/read_write.c | 209 > ++++++++++++++++++++++++++++++++++++++++++++++ > ext/kgio/writev_compat.h | 57 +++++++++++++ > test/lib_read_write.rb | 126 ++++++++++++++++++++++++++++ > 4 files changed, 395 insertions(+) > create mode 100644 ext/kgio/writev_compat.h > > diff --git a/ext/kgio/extconf.rb b/ext/kgio/extconf.rb > index f6bd0cc..5fb15ac 100644 > --- a/ext/kgio/extconf.rb > +++ b/ext/kgio/extconf.rb > @@ -23,6 +23,8 @@ have_type("struct sockaddr_storage", %w(sys/types.h > sys/socket.h)) or > have_func('accept4', %w(sys/socket.h)) > have_header("sys/select.h") > > +have_func("writev", "sys/uio.h") > + > if have_header('ruby/io.h') > rubyio = %w(ruby.h ruby/io.h) > have_struct_member("rb_io_t", "fd", rubyio) > @@ -50,5 +52,6 @@ have_func('rb_str_set_len') > have_func('rb_time_interval') > have_func('rb_wait_for_single_fd') > have_func('rb_str_subseq') > +have_func('rb_ary_subseq') > > create_makefile('kgio_ext') > diff --git a/ext/kgio/read_write.c b/ext/kgio/read_write.c > index 9924743..2fc0628 100644 > --- a/ext/kgio/read_write.c > +++ b/ext/kgio/read_write.c > @@ -1,6 +1,9 @@ > #include "kgio.h" > #include "my_fileno.h" > #include "nonblock.h" > +#ifdef HAVE_WRITEV > +# include > +#endif > static VALUE 
sym_wait_readable, sym_wait_writable; > static VALUE eErrno_EPIPE, eErrno_ECONNRESET; > static ID id_set_backtrace; > @@ -8,6 +11,14 @@ static ID id_set_backtrace; > #define rb_str_subseq rb_str_substr > #endif > > +#ifndef HAVE_RB_ARY_SUBSEQ > +static inline VALUE rb_ary_subseq(VALUE ary, long idx, long len) > +{ > + VALUE args[2] = {LONG2FIX(idx), LONG2FIX(len)}; > + return rb_ary_aref(2, args, ary); > +} > +#endif > + > /* > * we know MSG_DONTWAIT works properly on all stream sockets under Linux > * we can define this macro for other platforms as people care and > @@ -406,6 +417,171 @@ static VALUE kgio_trywrite(VALUE io, VALUE str) > return my_write(io, str, 0); > } > > +#ifndef HAVE_WRITEV > +# include "writev_compat.h" > +#endif > + > +static long iov_max = 128; /* this will be overrident in init */ > + > +struct io_args_v { > + VALUE io; > + VALUE buf; > + VALUE vec_buf; > + struct iovec *vec; > + int iov_cnt; > + int something_written; > + int fd; > +}; > + > +static void prepare_writev(struct io_args_v *a, VALUE io, VALUE ary) > +{ > + long vec_cnt; > + a->io = io; > + a->fd = my_fileno(io); > + a->something_written = 0; > + > + /* rb_ary_subseq will not copy array unless it modified */ > + a->buf = rb_ary_subseq(ary, 0, RARRAY_LEN(ary)); > + > + vec_cnt = RARRAY_LEN(ary); > + if (vec_cnt > iov_max) vec_cnt = iov_max; > + a->vec_buf = rb_str_tmp_new(sizeof(struct iovec) * vec_cnt); > +} > + > +static void fill_iovec(struct io_args_v *a) > +{ > + long i; > + > + a->iov_cnt = RARRAY_LEN(a->buf); > + if (a->iov_cnt > iov_max) a->iov_cnt = iov_max; > + rb_str_resize(a->vec_buf, sizeof(struct iovec) * a->iov_cnt); > + a->vec = (struct iovec*)RSTRING_PTR(a->vec_buf); > + > + for (i=0; i < a->iov_cnt; i++) { > + /* rb_ary_store could reallocate array, > + * so that ought to use RARRAY_PTR */ > + VALUE str = RARRAY_PTR(a->buf)[i]; > + if (TYPE(str) != T_STRING) { > + str = rb_obj_as_string(str); > + rb_ary_store(a->buf, i, str); > + } > + a->vec[i].iov_base = 
RSTRING_PTR(str); > + a->vec[i].iov_len = RSTRING_LEN(str); > + } > +} > + > +static long trim_writev_buffer(struct io_args_v *a, long n) > +{ > + long i, ary_len = RARRAY_LEN(a->buf), str_len = 0; > + VALUE *elem = RARRAY_PTR(a->buf), str = 0; > + > + for (i = 0; n && i < ary_len; i++, elem++) { > + str = *elem; > + str_len = RSTRING_LEN(str); > + n -= str_len; > + if (n < 0) break; > + } > + > + if (i == ary_len) { > + assert(n == 0 && "writev system call is broken"); > + a->buf = Qnil; > + return 0; > + } > + > + if (i > 0) { > + a->buf = rb_ary_subseq(a->buf, i, ary_len - i); > + } > + > + if (n < 0) { > + str = rb_str_subseq(str, str_len + n, -n); > + rb_ary_store(a->buf, 0, str); > + } > + return RARRAY_LEN(a->buf); > +} > + > +static int writev_check(struct io_args_v *a, long n, const char *msg, int > io_wait) > +{ > + if (n >= 0) { > + if (n > 0) a->something_written = 1; > + return trim_writev_buffer(a, n); > + } else if (n == -1) { > + if (errno == EINTR) { > + a->fd = my_fileno(a->io); > + return -1; > + } > + if (errno == EAGAIN) { > + if (io_wait) { > + (void)kgio_call_wait_writable(a->io); > + return -1; > + } else if (!a->something_written) { > + a->buf = sym_wait_writable; > + } > + return 0; > + } > + wr_sys_fail(msg); > + } > + return 0; > +} > + > +static VALUE my_writev(VALUE io, VALUE str, int io_wait) > +{ > + struct io_args_v a; > + long n; > + > + prepare_writev(&a, io, str); > + set_nonblocking(a.fd); > + > + do { > + fill_iovec(&a); > + n = (long)writev(a.fd, a.vec, a.iov_cnt); > + } while (writev_check(&a, n, "writev", io_wait) != 0); > + > + if (TYPE(a.buf) != T_SYMBOL) > + kgio_autopush_write(io); > + return a.buf; > +} > + > +/* > + * call-seq: > + * > + * io.kgio_writev(array) -> nil > + * > + * Returns nil when the write completes. > + * > + * This may block and call any method defined to +kgio_wait_writable+ > + * for the class. 
> + * > + * Note: it uses +Array()+ semantic for converting argument, so that > + * it will succeed if you pass something else. > + */ > +static VALUE kgio_writev(VALUE io, VALUE ary) > +{ > + VALUE array = rb_Array(ary); > + return my_writev(io, array, 1); > +} > + > +/* > + * call-seq: > + * > + * io.kgio_trywritev(array) -> nil, Array or :wait_writable > + * > + * Returns nil if the write was completed in full. > + * > + * Returns an Array of strings containing the unwritten portion > + * if EAGAIN was encountered, but some portion was successfully written. > + * > + * Returns :wait_writable if EAGAIN is encountered and nothing > + * was written. > + * > + * Note: it uses +Array()+ semantic for converting argument, so that > + * it will succeed if you pass something else. > + */ > +static VALUE kgio_trywritev(VALUE io, VALUE ary) > +{ > + VALUE array = rb_Array(ary); > + return my_writev(io, array, 0); > +} > + > #ifdef USE_MSG_DONTWAIT > /* > * This method behaves like Kgio::PipeMethods#kgio_write, except > @@ -489,6 +665,26 @@ static VALUE s_trywrite(VALUE mod, VALUE io, VALUE > str) > return my_write(io, str, 0); > } > > +/* > + * call-seq: > + * > + * Kgio.trywritev(io, array) -> nil, Array or :wait_writable > + * > + * Returns nil if the write was completed in full. > + * > + * Returns a Array of strings containing the unwritten portion if EAGAIN > + * was encountered, but some portion was successfully written. > + * > + * Returns :wait_writable if EAGAIN is encountered and nothing > + * was written. 
> + * > + * Maybe used in place of PipeMethods#kgio_trywritev for non-Kgio objects > + */ > +static VALUE s_trywritev(VALUE mod, VALUE io, VALUE ary) > +{ > + return kgio_trywritev(io, ary); > +} > + > void init_kgio_read_write(void) > { > VALUE mPipeMethods, mSocketMethods; > @@ -500,6 +696,7 @@ void init_kgio_read_write(void) > > rb_define_singleton_method(mKgio, "tryread", s_tryread, -1); > rb_define_singleton_method(mKgio, "trywrite", s_trywrite, 2); > + rb_define_singleton_method(mKgio, "trywritev", s_trywritev, 2); > rb_define_singleton_method(mKgio, "trypeek", s_trypeek, -1); > > /* > @@ -513,8 +710,10 @@ void init_kgio_read_write(void) > rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1); > rb_define_method(mPipeMethods, "kgio_read!", kgio_read_bang, -1); > rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1); > + rb_define_method(mPipeMethods, "kgio_writev", kgio_writev, 1); > rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1); > rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1); > + rb_define_method(mPipeMethods, "kgio_trywritev", kgio_trywritev, > 1); > > /* > * Document-module: Kgio::SocketMethods > @@ -527,8 +726,10 @@ void init_kgio_read_write(void) > rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1); > rb_define_method(mSocketMethods, "kgio_read!", kgio_recv_bang, -1); > rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1); > + rb_define_method(mSocketMethods, "kgio_writev", kgio_writev, 1); > rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1); > rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1); > + rb_define_method(mSocketMethods, "kgio_trywritev", kgio_trywritev, > 1); > rb_define_method(mSocketMethods, "kgio_trypeek", kgio_trypeek, -1); > rb_define_method(mSocketMethods, "kgio_peek", kgio_peek, -1); > > @@ -544,4 +745,12 @@ void init_kgio_read_write(void) > eErrno_ECONNRESET = rb_const_get(rb_mErrno, > rb_intern("ECONNRESET")); > 
rb_include_module(mPipeMethods, mWaiters); > rb_include_module(mSocketMethods, mWaiters); > + > +#ifdef HAVE_WRITEV > +# ifdef IOV_MAX > + iov_max = IOV_MAX; > +# else > + iov_max = sysconf(_SC_IOV_MAX); > +# endif > +#endif > } > diff --git a/ext/kgio/writev_compat.h b/ext/kgio/writev_compat.h > new file mode 100644 > index 0000000..fce489e > --- /dev/null > +++ b/ext/kgio/writev_compat.h > @@ -0,0 +1,57 @@ > +/* > + * this header for supporting strange systems which missing writev > + */ > +#if !defined(HAVE_WRITEV) && !defined(writev) > +#define writev writev_supl > +#define iovec iovec_supl > +#define WRITEV_MEMLIMIT (2*1024*1024) > +#define IOV_MAX 1024 > + > +struct iovec { > + void *iov_base; > + size_t iov_len; > +}; > + > +static ssize_t writev(int fd, const struct iovec *vec, int iov_cnt) > +{ > + int i; > + long result; > + size_t total = 0; > + const struct iovec *curv = vec; > + char *buf, *cur; > + > + if (iov_cnt == 0) return 0; > + > + i = 1; > + total = curv->iov_len; > + if ( total < WRITEV_MEMLIMIT ) { > + for (curv++; i < iov_cnt; i++, curv++) { > + size_t next = total + curv->iov_len; > + if ( next > WRITEV_MEMLIMIT ) break; > + total = next; > + } > + } > + iov_cnt = i; > + > + if (iov_cnt > 1) { > + cur = buf = (char*)malloc(total); > + if (!buf) rb_memerror(); > + > + curv = vec; > + for (i = 0; i < iov_cnt; i++, curv++) { > + memcpy(cur, curv->iov_base, curv->iov_len); > + cur += curv->iov_len; > + } > + } else { > + buf = vec->iov_base; > + } > + > + result = (long)write(fd, buf, total); > + > + if (iov_cnt > 1) { > + free(buf); > + } > + > + return result; > +} > +#endif > diff --git a/test/lib_read_write.rb b/test/lib_read_write.rb > index 6f345cb..7bd14ff 100644 > --- a/test/lib_read_write.rb > +++ b/test/lib_read_write.rb > @@ -21,6 +21,14 @@ module LibReadWriteTest > assert_nil @wr.kgio_trywrite("") > end > > + def test_writev_empty > + assert_nil @wr.kgio_writev([]) > + end > + > + def test_trywritev_empty > + assert_nil 
@wr.kgio_trywritev([]) > + end > + > def test_read_zero > assert_equal "", @rd.kgio_read(0) > buf = "foo" > @@ -116,6 +124,28 @@ module LibReadWriteTest > assert false, "should never get here (line:#{__LINE__})" > end > > + def test_writev_closed > + @rd.close > + begin > + loop { @wr.kgio_writev ["HI"] } > + rescue Errno::EPIPE, Errno::ECONNRESET => e > + assert_equal [], e.backtrace > + return > + end > + assert false, "should never get here (line:#{__LINE__})" > + end > + > + def test_trywritev_closed > + @rd.close > + begin > + loop { @wr.kgio_trywritev ["HI"] } > + rescue Errno::EPIPE, Errno::ECONNRESET => e > + assert_equal [], e.backtrace > + return > + end > + assert false, "should never get here (line:#{__LINE__})" > + end > + > def test_trywrite_full > buf = "\302\251" * 1024 * 1024 > buf2 = "" > @@ -153,6 +183,43 @@ module LibReadWriteTest > assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value > end > > + def test_trywritev_full > + buf = ["\302\251" * 128] * 8 * 1024 > + buf2 = "" > + dig = Digest::SHA1.new > + t = Thread.new do > + sleep 1 > + nr = 0 > + begin > + dig.update(@rd.readpartial(4096, buf2)) > + nr += buf2.size > + rescue EOFError > + break > + rescue => e > + end while true > + dig.hexdigest > + end > + 50.times do > + wr = buf > + begin > + rv = @wr.kgio_trywritev(wr) > + case rv > + when Array > + wr = rv > + when :wait_readable > + assert false, "should never get here line=#{__LINE__}" > + when :wait_writable > + IO.select(nil, [ @wr ]) > + else > + wr = false > + end > + end while wr > + end > + @wr.close > + t.join > + assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value > + end > + > def test_write_conv > assert_equal nil, @wr.kgio_write(10) > assert_equal "10", @rd.kgio_read(2) > @@ -214,6 +281,19 @@ module LibReadWriteTest > tmp.each { |count| assert_equal nil, count } > end > > + def test_trywritev_return_wait_writable > + tmp = [] > + tmp << @wr.kgio_trywritev(["HI"]) until tmp[-1] == :wait_writable > + 
assert :wait_writable === tmp[-1] > + assert(!(:wait_readable === tmp[-1])) > + assert_equal :wait_writable, tmp.pop > + assert tmp.size > 0 > + penultimate = tmp.pop > + assert(penultimate == "I" || penultimate == nil) > + assert tmp.size > 0 > + tmp.each { |count| assert_equal nil, count } > + end > + > def test_tryread_extra_buf_eagain_clears_buffer > tmp = "hello world" > rv = @rd.kgio_tryread(2, tmp) > @@ -248,6 +328,36 @@ module LibReadWriteTest > assert_equal buf, readed > end > > + def test_monster_trywritev > + buf, start = [], 0 > + while start < RANDOM_BLOB.size > + s = RANDOM_BLOB[start, 1000] > + start += s.size > + buf << s > + end > + rv = @wr.kgio_trywritev(buf) > + assert_kind_of Array, rv > + rv = rv.join > + assert rv.size < RANDOM_BLOB.size > + @rd.nonblock = false > + assert_equal(RANDOM_BLOB, @rd.read(RANDOM_BLOB.size - rv.size) + rv) > + end > + > + def test_monster_writev > + buf, start = [], 0 > + while start < RANDOM_BLOB.size > + s = RANDOM_BLOB[start, 10000] > + start += s.size > + buf << s > + end > + thr = Thread.new { @wr.kgio_writev(buf) } > + @rd.nonblock = false > + readed = @rd.read(RANDOM_BLOB.size) > + thr.join > + assert_nil thr.value > + assert_equal RANDOM_BLOB, readed > + end > + > def test_monster_write_wait_writable > @wr.instance_variable_set :@nr, 0 > def @wr.kgio_wait_writable > @@ -263,6 +373,22 @@ module LibReadWriteTest > assert @wr.instance_variable_get(:@nr) > 0 > end > > + def test_monster_writev_wait_writable > + @wr.instance_variable_set :@nr, 0 > + def @wr.kgio_wait_writable > + @nr += 1 > + IO.select(nil, [self]) > + end > + buf = ["." * 1024] * 1024 * 10 > + buf_size = buf.inject(0){|c, s| c + s.size} > + thr = Thread.new { @wr.kgio_writev(buf) } > + readed = @rd.read(buf_size) > + thr.join > + assert_nil thr.value > + assert_equal buf.join, readed > + assert @wr.instance_variable_get(:@nr) > 0 > + end > + > def test_wait_readable_ruby_default > elapsed = 0 > foo = nil > -- > 1.7.9.5 > >