class Select
A pure-Ruby implementation of the event selector.
Definitions
def initialize(loop)
Initialize the selector with the given event loop fiber.
Implementation
# Initialize the selector with the given event loop fiber.
#
# @parameter loop [Fiber] The event loop fiber.
def initialize(loop)
# The fiber that `transfer`, `yield` and `io_wait` switch to.
@loop = loop
# Waiters registered by `io_wait`, keyed by the identity of the IO object.
@waiting = Hash.new.compare_by_identity
# `wakeup` only signals the interrupt while this flag is set.
@blocked = false
# Entries queued by `resume`, `yield`, `raise` and `push`.
@ready = Queue.new
# NOTE(review): `Interrupt.attach` is handed this partially-initialized selector; presumably it relies on the state assigned above - confirm before reordering.
@interrupt = Interrupt.attach(self)
# Exposed through the `idle_duration` reader; starts at zero.
@idle_duration = 0.0
end
attr :loop
Signature
-
attribute
Fiber
The event loop fiber.
attr :idle_duration
Signature
-
attribute
Float
This is the amount of time the event loop was idle during the last select call.
def wakeup
Wake up the event loop if it is currently sleeping.
Implementation
# Wake up the event loop if it is currently sleeping.
#
# @returns [Boolean] `true` if the interrupt was signalled, `false` if the selector was not blocked.
def wakeup
  # Nothing to do unless the selector has flagged itself as blocked.
  return false unless @blocked

  @interrupt.signal
  true
end
def close
Close the selector and release any resources.
Implementation
# Close the selector and release any resources.
def close
  @interrupt.close

  # Drop the references to the event loop fiber and to the registered waiters.
  @loop = @waiting = nil
end
def transfer
Transfer from the current fiber to the event loop.
Implementation
# Transfer from the current fiber to the event loop.
#
# @returns [Object] Whatever `@loop.transfer` returns once control comes back to the calling fiber.
def transfer
@loop.transfer
end
def resume(fiber, *arguments)
Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.
Implementation
# Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.
#
# @parameter fiber [Fiber] The fiber to transfer control to.
# @parameter arguments [Array] The arguments passed through to `fiber.transfer`.
# @returns [Object] The value `fiber.transfer` returns once this fiber is switched back in.
def resume(fiber, *arguments)
  optional = Optional.new(Fiber.current)
  @ready.push(optional)

  fiber.transfer(*arguments)
ensure
  # `optional` is still nil if `Optional.new` itself raised; guard the call so
  # that error is not replaced by a NoMethodError (same pattern as `io_wait`).
  optional&.nullify
end
def yield
Yield from the current fiber back to the event loop. Put the current fiber into the ready list.
Implementation
# Yield from the current fiber back to the event loop. Put the current fiber into the ready list.
#
# @returns [Object] The value `@loop.transfer` returns once this fiber is switched back in.
def yield
  optional = Optional.new(Fiber.current)
  @ready.push(optional)

  @loop.transfer
ensure
  # `optional` is still nil if `Optional.new` itself raised; guard the call so
  # that error is not replaced by a NoMethodError (same pattern as `io_wait`).
  optional&.nullify
end
def push(fiber)
Append the given fiber into the ready list.
Implementation
# Append the given fiber into the ready list.
#
# @parameter fiber [Fiber] The fiber to queue; it is pushed as-is, without the wrapper that `resume`, `yield` and `raise` use.
def push(fiber)
@ready.push(fiber)
end
def raise(fiber, *arguments)
Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.
Implementation
# Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.
#
# @parameter fiber [Fiber] The fiber to raise the exception in.
# @parameter arguments [Array] The arguments passed through to `fiber.raise`.
# @returns [Object] The value `fiber.raise` returns once this fiber is switched back in.
def raise(fiber, *arguments)
  optional = Optional.new(Fiber.current)
  @ready.push(optional)

  fiber.raise(*arguments)
ensure
  # `optional` is still nil if `Optional.new` itself raised; guard the call so
  # that error is not replaced by a NoMethodError (same pattern as `io_wait`).
  optional&.nullify
end
def ready?
Signature
-
returns
Boolean
Whether the ready list is not empty, i.e. there are fibers ready to be resumed.
Implementation
# Whether the ready list is not empty, i.e. there are fibers ready to be resumed.
#
# @returns [Boolean] `true` when `@ready` holds at least one queued entry.
def ready?
!@ready.empty?
end
def io_wait(fiber, io, events)
Wait for the given IO to become readable or writable.
Signature
-
parameter
fiber
Fiber
The fiber that is waiting.
-
parameter
io
IO
The IO object to wait on.
-
parameter
events
Integer
The events to wait for.
Implementation
# Wait for the given IO to become readable or writable.
#
# @parameter fiber [Fiber] The fiber that is waiting.
# @parameter io [IO] The IO object to wait on.
# @parameter events [Integer] The events to wait for.
# @returns [Object] The value `@loop.transfer` returns once this fiber is switched back in.
def io_wait(fiber, io, events)
  # Whatever is already registered for this IO is handed to the new waiter.
  previous = @waiting[io]
  waiter = @waiting[io] = Waiter.new(fiber, events, previous)

  @loop.transfer
ensure
  # `waiter` is nil if registration failed before it was assigned.
  waiter&.invalidate
end
def io_select(readable, writable, priority, timeout)
Wait for multiple IO objects to become readable or writable.
Signature
-
parameter
readable
Array(IO)
The list of IO objects to wait for readability.
-
parameter
writable
Array(IO)
The list of IO objects to wait for writability.
-
parameter
priority
Array(IO)
The list of IO objects to wait for priority events.
Implementation
# Wait for multiple IO objects to become readable or writable.
#
# @parameter readable [Array(IO)] The list of IO objects to wait for readability.
# @parameter writable [Array(IO)] The list of IO objects to wait for writability.
# @parameter priority [Array(IO)] The list of IO objects to wait for priority events.
# @parameter timeout [Numeric | Nil] Passed through to `IO.select` unchanged.
# @returns [Array | Nil] The result of `IO.select`.
def io_select(readable, writable, priority, timeout)
  # Run the select on a separate thread and wait for its result.
  selecting = Thread.new { IO.select(readable, writable, priority, timeout) }
  selecting.value
end
def io_read(fiber, io, buffer, length, offset = 0)
Read from the given IO to the buffer.
Signature
-
parameter
length
Integer
The minimum number of bytes to read.
-
parameter
offset
Integer
The offset into the buffer to read to.
Implementation
# Read from the given IO to the buffer.
#
# @parameter fiber [Fiber] The fiber handed to `io_wait` when the read has to be retried.
# @parameter io [IO] The IO object to read from.
# @parameter buffer [Object] The buffer to read into (presumably an `IO::Buffer`).
# @parameter length [Integer] The minimum number of bytes to read.
# @parameter offset [Integer] The offset into the buffer to read to.
# @returns [Integer] The total number of bytes read, or the negative result of `buffer.read` when `again?` does not classify it as retryable.
def io_read(fiber, io, buffer, length, offset = 0)
  total = 0

  Selector.nonblock(io) do
    while true
      result = Fiber.blocking { buffer.read(io, 0, offset) }

      if result > 0
        # Progress: account for it and stop once the minimum has been satisfied.
        total += result
        break if total >= length
        offset += result
      elsif result.zero?
        # Nothing more was read; report what has been read so far.
        break
      elsif again?(result)
        # Retryable failure: wait for readability, then try again.
        self.io_wait(fiber, io, IO::READABLE)
      else
        # Any other negative result is handed back to the caller as-is.
        return result
      end
    end
  end

  return total
end
def io_read(fiber, io, buffer, length, offset = 0)
Ruby 3.2: most IO::Buffer support is available, but the read/write methods are slightly clunky.
Implementation
# Read from the given IO to the buffer (Ruby 3.2 variant).
#
# @parameter fiber [Fiber] The fiber handed to `io_wait` when the read has to be retried.
# @parameter io [IO] The IO object to read from.
# @parameter buffer [Object] The buffer to read into (presumably an `IO::Buffer`); must respond to `size` and `read`.
# @parameter length [Integer] The minimum number of bytes to read.
# @parameter offset [Integer] The offset into the buffer to read to.
# @returns [Integer] The total number of bytes read, or a negative result of `buffer.read` (including a retryable one when `length` is zero).
def io_read(fiber, io, buffer, length, offset = 0)
  total = 0

  Selector.nonblock(io) do
    maximum_size = buffer.size - offset

    # Keep going only while there is room left in the buffer.
    while maximum_size > 0
      result = Fiber.blocking { buffer.read(io, maximum_size, offset) }

      if again?(result)
        # With no minimum requested the caller gets the retryable result back
        # instead of being suspended.
        return result unless length > 0

        self.io_wait(fiber, io, IO::READABLE)
      elsif result < 0
        return result
      elsif result == 0
        # End of stream: without this branch a short stream (total < length)
        # would be re-read forever, since no further progress is possible.
        break
      else
        total += result
        offset += result
        break if total >= length
      end

      maximum_size = buffer.size - offset
    end
  end

  return total
end
def io_read(fiber, _io, buffer, length, offset = 0)
Ruby <= 3.1, limited IO::Buffer support.
Implementation
# Read from the given IO to the buffer (Ruby <= 3.1 variant).
#
# @parameter fiber [Fiber] The fiber handed to `io_wait` when the read has to be retried.
# @parameter _io [IO] The IO object to read from; only its file descriptor is used.
# @parameter buffer [Object] The buffer to read into; must respond to `size` and `set_string`.
# @parameter length [Integer] The minimum number of bytes to read.
# @parameter offset [Integer] The offset into the buffer to read to.
# @returns [Integer] The total number of bytes read, `EWOULDBLOCK` when the read would block and `length` is zero, or a negated errno on failure.
def io_read(fiber, _io, buffer, length, offset = 0)
  # We need to avoid any internal buffering, so we use a duplicated IO object:
  io = IO.for_fd(_io.fileno, autoclose: false)

  total = 0

  # Loop while there is still room left in the buffer.
  while (maximum_size = buffer.size - offset) > 0
    case result = blocking { io.read_nonblock(maximum_size, exception: false) }
    when :wait_readable, :wait_writable
      # With no minimum requested, report that the read would block.
      return EWOULDBLOCK unless length > 0

      events = (result == :wait_readable) ? IO::READABLE : IO::WRITABLE
      self.io_wait(fiber, io, events)
    when nil
      # End of stream.
      break
    else
      buffer.set_string(result, offset)

      bytes = result.bytesize
      total += bytes
      offset += bytes

      # Stop once this chunk covers what is still outstanding.
      break if bytes >= length
      length -= bytes
    end
  end

  return total
rescue IOError
  return -Errno::EBADF::Errno
rescue SystemCallError => error
  return -error.errno
end
def io_write(fiber, io, buffer, length, offset = 0)
Write to the given IO from the buffer.
Signature
-
parameter
length
Integer
The minimum number of bytes to write.
-
parameter
offset
Integer
The offset into the buffer to write from.
Implementation
# Write to the given IO from the buffer.
#
# @parameter fiber [Fiber] The fiber handed to `io_wait` when the write has to be retried.
# @parameter io [IO] The IO object to write to.
# @parameter buffer [Object] The buffer to write from (presumably an `IO::Buffer`).
# @parameter length [Integer] The minimum number of bytes to write.
# @parameter offset [Integer] The offset into the buffer to write from.
# @returns [Integer] The total number of bytes written, or the negative result of `buffer.write` when `again?` does not classify it as retryable.
def io_write(fiber, io, buffer, length, offset = 0)
  total = 0

  Selector.nonblock(io) do
    while true
      result = Fiber.blocking { buffer.write(io, 0, offset) }

      if result < 0
        if again?(result)
          # A write that would block must wait until the IO is writable;
          # waiting for readability could stall until the peer sends data.
          self.io_wait(fiber, io, IO::WRITABLE)
        else
          return result
        end
      elsif result == 0
        # Nothing was written; report what has been written so far.
        break result
      else
        total += result
        break if total >= length
        offset += result
      end
    end
  end

  return total
end