rainbows.git  about / heads / tags
Unicorn for sleepy apps and slow clients
blob 7dcd2e4b911aaf5d3fd0188f46ade2eb0bb1d1bf 2217 bytes (raw)
$ git show v0.1.1:lib/rainbows/thread_spawn.rb	# shows this blob on the CLI

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
 
# -*- encoding: binary -*-
module Rainbows

  # Spawns a new thread for every client connection we accept().  This
  # model is recommended for platforms where spawning threads is
  # inexpensive.
  #
  # If you're connecting to external services and need to perform DNS
  # lookups, consider using the "resolv-replace" library, which replaces
  # parts of the core Socket package with concurrent DNS lookup
  # capabilities.
  module ThreadSpawn

    include Base

    # Main loop of a worker process: waits for the listening sockets in
    # LISTENERS to become readable, accept()s the pending client on each
    # readable listener and hands it to a newly spawned thread that runs
    # process_client.
    #
    # +worker+ is the worker object for this process.  This method reads
    # +worker.tmp+ (chmod-ed with an increasing counter on every
    # iteration; presumably a heartbeat for the master -- confirm in
    # Base) and +worker.nr+ (used in the log message and when reopening
    # logs on USR1).
    #
    # The loop ends when our parent is no longer +master_pid+ or when the
    # listeners become unusable (e.g. after the QUIT handler closed them);
    # the spawned threads are then joined with join_spawned_threads.
    def worker_loop(worker)
      init_worker_process(worker)
      threads = ThreadGroup.new
      alive = worker.tmp
      nr = 0
      limit = worker_connections

      # closing anything we IO.select on will raise EBADF
      trap(:USR1) { reopen_worker_logs(worker.nr) rescue nil }
      trap(:QUIT) { LISTENERS.map! { |s| s.close rescue nil } }
      [:TERM, :INT].each { |sig| trap(sig) { exit(0) } } # instant shutdown
      logger.info "worker=#{worker.nr} ready with ThreadSpawn"

      while alive && master_pid == Process.ppid
        ret = begin
          alive.chmod(nr += 1)
          IO.select(LISTENERS, nil, nil, timeout/2.0) or next
        rescue Errno::EINTR
          retry
        rescue Errno::EBADF, IOError, TypeError
          # The listeners are gone: EBADF/IOError for a closed descriptor,
          # TypeError because the QUIT handler's map! leaves nil entries in
          # LISTENERS.  Leave the loop right away; there is no select
          # result to iterate over.
          break
        end

        ret.first.each do |l|
          # never run more than +limit+ client threads at once
          while threads.list.size >= limit
            nuke_old_thread(threads)
          end
          c = begin
            l.accept_nonblock
          rescue Errno::EAGAIN, Errno::EINTR, Errno::ECONNABORTED
            # A readable listener does not guarantee that accept succeeds
            # (the pending connection may already be gone), so EAGAIN is
            # skipped just like an interrupted or aborted accept.
            next
          end
          threads.add(Thread.new(c) { |client| process_client(client) })
        end
      end
      join_spawned_threads(threads)
    end

    # Kills at most one thread from +threads.list+: the first one whose
    # +:t+ thread-local is set and is at least +timeout+ seconds in the
    # past.  The kill is logged as an error.  Threads without +:t+ are
    # skipped.  When no thread qualifies, the scheduler is asked to run
    # another thread instead.
    def nuke_old_thread(threads)
      expired = threads.list.find do |candidate|
        now = Time.now
        stamp = candidate[:t]
        stamp && (now - stamp) >= timeout
      end

      if expired
        expired.kill
        logger.error "killed #{expired.inspect} for being too old"
        nil
      else
        # nothing to kill, yield to another thread
        Thread.pass
      end
    end

    # Waits for every thread in +threads.list+ to finish, allowing the
    # whole set at most +timeout+ seconds in total.  An info message is
    # logged before and after the wait.
    #
    # +threads+ is anything that responds to +list+ with joinable
    # objects (worker_loop passes a ThreadGroup).
    def join_spawned_threads(threads)
      logger.info "Joining spawned threads..."
      # One fixed deadline shared by all threads: each join only gets what
      # is left of the budget, and elapsed time is never counted twice.
      deadline = Time.now + timeout
      threads.list.each do |thr|
        remaining = deadline - Time.now
        # Once the budget is spent, only poll (join(0)) the remaining
        # threads instead of passing a negative limit.
        thr.join(remaining > 0 ? remaining : 0)
      end
      logger.info "Done joining spawned threads."
    end

  end
end

git clone https://yhbt.net/rainbows.git