EventSourceIOEventSelectorSelect

class Select

A pure-Ruby implementation of the event selector.

Definitions

def initialize(loop)

Initialize the selector with the given event loop fiber.

Implementation

def initialize(loop)
	@loop = loop
	
	@waiting = Hash.new.compare_by_identity
	
	@blocked = false
	
	@ready = Queue.new
	@interrupt = Interrupt.attach(self)
	
	@idle_duration = 0.0
end

attr :loop

Signature

attribute Fiber

The event loop fiber.

attr :idle_duration

Signature

attribute Float

This is the amount of time the event loop was idle during the last select call.

def wakeup

Wake up the event loop if it is currently sleeping.

Implementation

def wakeup
	if @blocked
		@interrupt.signal
		
		return true
	end
	
	return false
end

def close

Close the selector and release any resources.

Implementation

def close
	@interrupt.close
	
	@loop = nil
	@waiting = nil
end

def transfer

Transfer from the current fiber to the event loop.

Implementation

def transfer
	@loop.transfer
end

def resume(fiber, *arguments)

Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.

Implementation

def resume(fiber, *arguments)
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	fiber.transfer(*arguments)
ensure
	optional.nullify
end

def yield

Yield from the current fiber back to the event loop. Put the current fiber into the ready list.

Implementation

def yield
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	@loop.transfer
ensure
	optional.nullify
end

def push(fiber)

Append the given fiber into the ready list.

Implementation

def push(fiber)
	@ready.push(fiber)
end

def raise(fiber, *arguments)

Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.

Implementation

def raise(fiber, *arguments)
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	fiber.raise(*arguments)
ensure
	optional.nullify
end

def ready?

Signature

returns Boolean

Whether the ready list is not empty, i.e. there are fibers ready to be resumed.

Implementation

def ready?
	!@ready.empty?
end

def io_wait(fiber, io, events)

Wait for the given IO to become readable or writable.

Signature

parameter fiber Fiber

The fiber that is waiting.

parameter io IO

The IO object to wait on.

parameter events Integer

The events to wait for.

Implementation

def io_wait(fiber, io, events)
	waiter = @waiting[io] = Waiter.new(fiber, events, @waiting[io])
	
	@loop.transfer
ensure
	waiter&.invalidate
end

def io_select(readable, writable, priority, timeout)

Wait for multiple IO objects to become readable or writable.

Signature

parameter readable Array(IO)

The list of IO objects to wait for readability.

parameter writable Array(IO)

The list of IO objects to wait for writability.

parameter priority Array(IO)

The list of IO objects to wait for priority events.

Implementation

def io_select(readable, writable, priority, timeout)
	Thread.new do
		IO.select(readable, writable, priority, timeout)
	end.value
end

def io_read(fiber, io, buffer, length, offset = 0)

Read from the given IO to the buffer.

Signature

parameter length Integer

The minimum number of bytes to read.

parameter offset Integer

The offset into the buffer to read to.

Implementation

def io_read(fiber, io, buffer, length, offset = 0)
	total = 0
	
	Selector.nonblock(io) do
		while true
			result = Fiber.blocking{buffer.read(io, 0, offset)}
			
			if result < 0
				if again?(result)
					self.io_wait(fiber, io, IO::READABLE)
				else
					return result
				end
			elsif result == 0
				break
			else
				total += result
				break if total >= length
				offset += result
			end
		end
	end
	
	return total
end

def io_read(fiber, io, buffer, length, offset = 0)

Ruby 3.2, most IO::Buffer support, but slightly clunky read/write methods.

Implementation

def io_read(fiber, io, buffer, length, offset = 0)
	total = 0
	
	Selector.nonblock(io) do
		maximum_size = buffer.size - offset
		while maximum_size > 0
			result = Fiber.blocking{buffer.read(io, maximum_size, offset)}
			
			if again?(result)
				if length > 0
					self.io_wait(fiber, io, IO::READABLE)
				else
					return result
				end
			elsif result < 0
				return result
			else
				total += result
				offset += result
				break if total >= length
			end
			
			maximum_size = buffer.size - offset
		end
	end
	
	return total
end

def io_read(fiber, _io, buffer, length, offset = 0)

Ruby <= 3.1, limited IO::Buffer support.

Implementation

def io_read(fiber, _io, buffer, length, offset = 0)
	# We need to avoid any internal buffering, so we use a duplicated IO object:
	io = IO.for_fd(_io.fileno, autoclose: false)
	
	total = 0
	
	maximum_size = buffer.size - offset
	while maximum_size > 0
		case result = blocking{io.read_nonblock(maximum_size, exception: false)}
		when :wait_readable
			if length > 0
				self.io_wait(fiber, io, IO::READABLE)
			else
				return EWOULDBLOCK
			end
		when :wait_writable
			if length > 0
				self.io_wait(fiber, io, IO::WRITABLE)
			else
				return EWOULDBLOCK
			end
		when nil
			break
		else
			buffer.set_string(result, offset)
			
			size = result.bytesize
			total += size
			offset += size
			break if size >= length
			length -= size
		end
		
		maximum_size = buffer.size - offset
	end
	
	return total
rescue IOError => error
	return -Errno::EBADF::Errno
rescue SystemCallError => error
	return -error.errno
end

def io_write(fiber, io, buffer, length, offset = 0)

Write to the given IO from the buffer.

Signature

parameter length Integer

The minimum number of bytes to write.

parameter offset Integer

The offset into the buffer to write from.

Implementation

def io_write(fiber, io, buffer, length, offset = 0)
	total = 0
	
	Selector.nonblock(io) do
		while true
			result = Fiber.blocking{buffer.write(io, 0, offset)}
			
			if result < 0
				if again?(result)
					self.io_wait(fiber, io, IO::READABLE)
				else
					return result
				end
			elsif result == 0
				break result
			else
				total += result
				break if total >= length
				offset += result
			end
		end
	end
	
	return total
end