class Select
A pure-Ruby implementation of the event selector.
Definitions
def initialize(loop)
Initialize the selector with the given event loop fiber.
Implementation
def initialize(loop)
@loop = loop
@waiting = Hash.new.compare_by_identity
# Flag indicating whether the selector is currently blocked in a system call.
# Set to true when blocked in ::IO.select, false otherwise.
# Used by wakeup() to determine if an interrupt signal is needed.
@blocked = false
@ready = Queue.new
@interrupt = Interrupt.attach(self)
@idle_duration = 0.0
end
# @attribute [Fiber] The event loop fiber.
attr_reader :loop
Signature
-
attribute
Fiber
The event loop fiber.
# @attribute [Float] The amount of time the event loop was idle during the last select call.
attr_reader :idle_duration
Signature
-
attribute
Float
This is the amount of time the event loop was idle during the last select call.
def wakeup
Wake up the event loop if it is currently sleeping.
Implementation
# Wake up the event loop if it is currently sleeping.
#
# @returns [Boolean] true if the interrupt was signalled, false if the selector was not blocked.
def wakeup
  # Nothing to interrupt unless we are blocked in the select call:
  return false unless @blocked

  @interrupt.signal
  true
end
def close
Close the selector and release any resources.
Implementation
# Close the selector and release any resources.
#
# Closes the interrupt, then drops the references to the event loop and the waiting table.
def close
  @interrupt.close

  @loop = @waiting = nil
end
def transfer
Transfer from the current fiber to the event loop.
Implementation
# Transfer from the current fiber to the event loop.
#
# @returns [Object] Whatever the event loop's transfer call returns.
def transfer
  return @loop.transfer
end
def resume(fiber, *arguments)
Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.
Implementation
# Transfer from the current fiber to the specified fiber. The current fiber is placed on the ready list first.
#
# @parameter fiber [Fiber] The fiber to transfer to.
# @parameter arguments [Array] The arguments forwarded to the fiber's transfer call.
def resume(fiber, *arguments)
  current = Optional.new(Fiber.current)
  @ready << current

  fiber.transfer(*arguments)
ensure
  # The entry pushed above is nullified once control leaves this method, whether normally or through an exception.
  current.nullify
end
def yield
Yield from the current fiber back to the event loop. Put the current fiber into the ready list.
Implementation
# Yield from the current fiber back to the event loop. The current fiber is placed on the ready list first.
#
# @returns [Object] Whatever the event loop's transfer call returns.
def yield
  current = Optional.new(Fiber.current)
  @ready << current

  @loop.transfer
ensure
  # The entry pushed above is nullified once control leaves this method, whether normally or through an exception.
  current.nullify
end
def push(fiber)
Append the given fiber into the ready list.
Implementation
# Append the given fiber into the ready list.
#
# @parameter fiber [Fiber] The fiber to append.
def push(fiber)
  @ready << fiber
end
def raise(fiber, *arguments, **options)
Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.
Implementation
# Transfer to the given fiber and raise an exception. The current fiber is placed on the ready list first.
#
# @parameter fiber [Fiber] The fiber to raise the exception on.
# @parameter arguments [Array] The positional arguments forwarded to the fiber's raise call.
# @parameter options [Hash] The keyword arguments forwarded to the fiber's raise call.
def raise(fiber, *arguments, **options)
  current = Optional.new(Fiber.current)
  @ready << current

  fiber.raise(*arguments, **options)
ensure
  # The entry pushed above is nullified once control leaves this method, whether normally or through an exception.
  current.nullify
end
def ready?
Signature
-
returns
Boolean
Whether the ready list is not empty, i.e. there are fibers ready to be resumed.
Implementation
# @returns [Boolean] Whether the ready list is not empty, i.e. there are fibers ready to be resumed.
def ready?
  return false if @ready.empty?

  true
end
def io_wait(fiber, io, events)
Wait for the given IO to become readable or writable.
Signature
-
parameter
fiber
Fiber
The fiber that is waiting.
-
parameter
io
IO
The IO object to wait on.
-
parameter
events
Integer
The events to wait for.
Implementation
# Wait for the given IO to become readable or writable.
#
# @parameter fiber [Fiber] The fiber that is waiting.
# @parameter io [IO] The IO object to wait on.
# @parameter events [Integer] The events to wait for.
# @returns [Object] Whatever the event loop's transfer call returns.
def io_wait(fiber, io, events)
  # Any waiter already registered for this IO is handed to the new waiter, which then takes its place in the table:
  previous = @waiting[io]
  waiter = (@waiting[io] = Waiter.new(fiber, events, previous))

  @loop.transfer
ensure
  # waiter is still nil if registration itself failed, hence the safe navigation.
  waiter&.invalidate
end
def io_select(readable, writable, priority, timeout)
Wait for multiple IO objects to become readable or writable.
Signature
-
parameter
readable
Array(IO)
The list of IO objects to wait for readability.
-
parameter
writable
Array(IO)
The list of IO objects to wait for writability.
-
parameter
priority
Array(IO)
The list of IO objects to wait for priority events.
Implementation
# Wait for multiple IO objects to become readable or writable.
#
# The blocking IO.select call runs on a separate thread and this method returns that thread's value.
#
# @parameter readable [Array(IO)] The list of IO objects to wait for readability.
# @parameter writable [Array(IO)] The list of IO objects to wait for writability.
# @parameter priority [Array(IO)] The list of IO objects to wait for priority events.
# @parameter timeout [Numeric | Nil] The timeout passed through to IO.select.
# @returns [Array | Nil] The result of IO.select.
def io_select(readable, writable, priority, timeout)
  selecting = Thread.new { IO.select(readable, writable, priority, timeout) }

  selecting.value
end
def io_read(fiber, io, buffer, length, offset = 0)
Read from the given IO to the buffer.
Signature
-
parameter
length
Integer
The minimum number of bytes to read.
-
parameter
offset
Integer
The offset into the buffer to read to.
Implementation
# Read from the given IO to the buffer.
#
# @parameter fiber [Fiber] The fiber handed to io_wait when the read has to wait.
# @parameter io [IO] The IO object to read from.
# @parameter buffer [Object] The buffer to read into (presumably an IO::Buffer; it must respond to read(io, length, offset)).
# @parameter length [Integer] The minimum number of bytes to read.
# @parameter offset [Integer] The offset into the buffer to read to.
# @returns [Integer] The total number of bytes read, or the negative result of a failed read.
def io_read(fiber, io, buffer, length, offset = 0)
  bytes_read = 0

  Selector.nonblock(io) do
    while true
      count = Fiber.blocking { buffer.read(io, 0, offset) }

      if count > 0
        bytes_read += count
        break if bytes_read >= length

        offset += count
      elsif count == 0
        # Nothing more was read, so stop with what we have:
        break
      elsif length > 0 && again?(count)
        # Negative result that again? accepts: wait for readability, then retry.
        io_wait(fiber, io, IO::READABLE)
      else
        # Any other negative result is handed back to the caller unchanged:
        return count
      end
    end
  end

  bytes_read
end
def io_read(fiber, io, buffer, length, offset = 0)
Variant for Ruby 3.2, which supports most of IO::Buffer but has slightly clunky read/write methods.
Implementation
# Read from the given IO to the buffer (variant for Ruby 3.2's IO::Buffer read method, which takes an explicit size).
#
# @parameter fiber [Fiber] The fiber handed to io_wait when the read has to wait.
# @parameter io [IO] The IO object to read from.
# @parameter buffer [IO::Buffer] The buffer to read into.
# @parameter length [Integer] The minimum number of bytes to read.
# @parameter offset [Integer] The offset into the buffer to read to.
# @returns [Integer] The total number of bytes read, or the negative result of a failed read.
def io_read(fiber, io, buffer, length, offset = 0)
  total = 0

  Selector.nonblock(io) do
    # Each read is limited to the space left in the buffer after the offset:
    maximum_size = buffer.size - offset

    while maximum_size > 0
      result = Fiber.blocking { buffer.read(io, maximum_size, offset) }

      if again?(result)
        if length > 0
          io_wait(fiber, io, IO::READABLE)
        else
          return result
        end
      elsif result < 0
        # Any other negative result is handed back to the caller unchanged:
        return result
      elsif result == 0
        # A zero-byte read with space remaining means end of stream. Without this break the loop would retry forever whenever fewer than `length` bytes had been read.
        break
      else
        total += result
        offset += result
        break if total >= length
      end

      maximum_size = buffer.size - offset
    end
  end

  total
end
def io_write(fiber, io, buffer, length, offset = 0)
Write to the given IO from the buffer.
Signature
-
parameter
length
Integer
The minimum number of bytes to write.
-
parameter
offset
Integer
The offset into the buffer to write from.
Implementation
# Write to the given IO from the buffer.
#
# @parameter fiber [Fiber] The fiber handed to io_wait when the write has to wait.
# @parameter io [IO] The IO object to write to.
# @parameter buffer [Object] The buffer to write from (presumably an IO::Buffer; it must respond to write(io, length, offset)).
# @parameter length [Integer] The minimum number of bytes to write.
# @parameter offset [Integer] The offset into the buffer to write from.
# @returns [Integer] The total number of bytes written, or the negative result of a failed write.
def io_write(fiber, io, buffer, length, offset = 0)
  total = 0

  Selector.nonblock(io) do
    while true
      result = Fiber.blocking { buffer.write(io, 0, offset) }

      if result < 0
        if length > 0 && again?(result)
          # The write could not proceed, so wait until the IO is writable (not readable) before retrying:
          io_wait(fiber, io, IO::WRITABLE)
        else
          # Any other negative result is handed back to the caller unchanged:
          return result
        end
      elsif result == 0
        # Nothing was written, so stop with what we have:
        break
      else
        total += result
        break if total >= length

        offset += result
      end
    end
  end

  total
end
def process_wait(fiber, pid, flags)
Wait for a process to change state.
Signature
-
parameter
fiber
Fiber
The fiber to resume after waiting.
-
parameter
pid
Integer
The process ID to wait for.
-
parameter
flags
Integer
Flags to pass to Process::Status.wait.
-
returns
Process::Status
The status of the waited process.
Implementation
# Wait for a process to change state.
#
# The blocking Process::Status.wait call runs on a separate thread and this method returns that thread's value.
#
# @parameter fiber [Fiber] The fiber to resume after waiting.
# @parameter pid [Integer] The process ID to wait for.
# @parameter flags [Integer] Flags to pass to Process::Status.wait.
# @returns [Process::Status] The status of the waited process.
def process_wait(fiber, pid, flags)
  waiting = Thread.new { Process::Status.wait(pid, flags) }

  waiting.value
end
def select(duration = nil)
Wait for IO events or a timeout.
Signature
-
parameter
duration
Numeric | Nil
The maximum time to wait, or nil for no timeout.
-
returns
Integer
The number of ready IO objects.
Implementation
# Wait for IO events or a timeout, then dispatch the events that occurred to the registered waiters.
#
# @parameter duration [Numeric | Nil] The maximum time to wait, or nil for no timeout.
# @returns [Integer] The number of ready IO objects, or 0 if the wait was cut short by an exception.
def select(duration = nil)
# NOTE(review): pop_ready is defined outside this view; presumably it processes the ready list and returns truthy when it handled anything - confirm.
if pop_ready
# If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop:
duration = 0
end
# One list per event type, to be passed to ::IO.select:
readable = Array.new
writable = Array.new
priority = Array.new
# Entries whose IO has been closed are removed from the waiting table (block returns true); all others are kept and contribute their IO to each list whose event bit is set.
@waiting.delete_if do |io, waiter|
if io.closed?
true
else
# NOTE(review): presumably yields every waiter chained on this IO as (fiber, events) - confirm against Waiter#each. An IO is appended once per matching waiter, so the lists may contain duplicates.
waiter.each do |fiber, events|
if (events & IO::READABLE) > 0
readable << io
end
if (events & IO::WRITABLE) > 0
writable << io
end
if (events & IO::PRIORITY) > 0
priority << io
end
end
false
end
end
# Don't block at all while the ready list still has entries:
duration = 0 unless @ready.empty?
error = nil
# Idle time is only measured when the duration is positive; for a zero or nil duration it is reset to 0.0 and start_time stays unset.
if duration&.>(0)
start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
else
@idle_duration = 0.0
end
# We need to handle interrupts on blocking IO. Every other implementation uses EINTR, but that doesn't work with `::IO.select` as it will retry the call on EINTR.
Thread.handle_interrupt(::Exception => :on_blocking) do
# @blocked is true only for the duration of the ::IO.select call; wakeup() checks it.
@blocked = true
# ::IO.select returns nil on timeout, in which case all three variables become nil.
readable, writable, priority = ::IO.select(readable, writable, priority, duration)
rescue ::Exception => error
# Requeue below...
ensure
@blocked = false
if start_time
end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@idle_duration = end_time - start_time
end
end
if error
# Requeue the error into the pending exception queue:
Thread.current.raise(error)
return 0
end
# Merge the three result lists into one event mask per IO; keys are compared by identity and missing keys start at 0.
ready = Hash.new(0).compare_by_identity
readable&.each do |io|
ready[io] |= IO::READABLE
end
writable&.each do |io|
ready[io] |= IO::WRITABLE
end
priority&.each do |io|
ready[io] |= IO::PRIORITY
end
# The waiter is removed from the table before dispatch.
# NOTE(review): dispatch presumably yields each waiter that must keep waiting - confirm against Waiter#dispatch. Yielded waiters are put back at the head of the chain for this IO.
ready.each do |io, events|
@waiting.delete(io).dispatch(events) do |waiter|
# Re-schedule the waiting IO:
waiter.tail = @waiting[io]
@waiting[io] = waiter
end
end
return ready.size
end