IO::EventSourceIOEventSelectorSelect

class Select

A pure-Ruby implementation of the event selector.

Definitions

def initialize(loop)

Initialize the selector with the given event loop fiber.

Implementation

def initialize(loop)
	@loop = loop
	
	@waiting = Hash.new.compare_by_identity
	
	# Flag indicating whether the selector is currently blocked in a system call.
	# Set to true when blocked in ::IO.select, false otherwise.
	# Used by wakeup() to determine if an interrupt signal is needed.
	@blocked = false
	
	@ready = Queue.new
	@interrupt = Interrupt.attach(self)
	
	@idle_duration = 0.0
end

attr :loop

Signature

attribute Fiber

The event loop fiber.

attr :idle_duration

Signature

attribute Float

This is the amount of time the event loop was idle during the last select call.

def wakeup

Wake up the event loop if it is currently sleeping.

Implementation

def wakeup
	if @blocked
		@interrupt.signal
		
		return true
	end
	
	return false
end

def close

Close the selector and release any resources.

Implementation

def close
	@interrupt.close
	
	@loop = nil
	@waiting = nil
end

def transfer

Transfer from the current fiber to the event loop.

Implementation

def transfer
	@loop.transfer
end

def resume(fiber, *arguments)

Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.

Implementation

def resume(fiber, *arguments)
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	fiber.transfer(*arguments)
ensure
	optional.nullify
end

def yield

Yield from the current fiber back to the event loop. Put the current fiber into the ready list.

Implementation

def yield
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	@loop.transfer
ensure
	optional.nullify
end

def push(fiber)

Append the given fiber into the ready list.

Implementation

def push(fiber)
	@ready.push(fiber)
end

def raise(fiber, *arguments, **options)

Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.

Implementation

def raise(fiber, *arguments, **options)
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	fiber.raise(*arguments, **options)
ensure
	optional.nullify
end

def ready?

Signature

returns Boolean

Whether the ready list is not empty, i.e. there are fibers ready to be resumed.

Implementation

def ready?
	!@ready.empty?
end

def io_wait(fiber, io, events)

Wait for the given IO to become readable or writable.

Signature

parameter fiber Fiber

The fiber that is waiting.

parameter io IO

The IO object to wait on.

parameter events Integer

The events to wait for.

Implementation

def io_wait(fiber, io, events)
	waiter = @waiting[io] = Waiter.new(fiber, events, @waiting[io])
	
	@loop.transfer
ensure
	waiter&.invalidate
end

def io_select(readable, writable, priority, timeout)

Wait for multiple IO objects to become readable or writable.

Signature

parameter readable Array(IO)

The list of IO objects to wait for readability.

parameter writable Array(IO)

The list of IO objects to wait for writability.

parameter priority Array(IO)

The list of IO objects to wait for priority events.

Implementation

def io_select(readable, writable, priority, timeout)
	Thread.new do
		IO.select(readable, writable, priority, timeout)
	end.value
end

def io_read(fiber, io, buffer, length, offset = 0)

Read from the given IO to the buffer.

Signature

parameter length Integer

The minimum number of bytes to read.

parameter offset Integer

The offset into the buffer to read to.

Implementation

def io_read(fiber, io, buffer, length, offset = 0)
	total = 0
	
	Selector.nonblock(io) do
		while true
			result = Fiber.blocking{buffer.read(io, 0, offset)}
			
			if result < 0
				if length > 0 and again?(result)
					self.io_wait(fiber, io, IO::READABLE)
				else
					return result
				end
			elsif result == 0
				break
			else
				total += result
				break if total >= length
				offset += result
			end
		end
	end
	
	return total
end

def io_read(fiber, io, buffer, length, offset = 0)

Ruby 3.2, most IO::Buffer support, but slightly clunky read/write methods.

Implementation

def io_read(fiber, io, buffer, length, offset = 0)
	total = 0
	
	Selector.nonblock(io) do
		maximum_size = buffer.size - offset
		while maximum_size > 0
			result = Fiber.blocking{buffer.read(io, maximum_size, offset)}
			
			if again?(result)
				if length > 0
					self.io_wait(fiber, io, IO::READABLE)
				else
					return result
				end
			elsif result < 0
				return result
			else
				total += result
				offset += result
				break if total >= length
			end
			
			maximum_size = buffer.size - offset
		end
	end
	
	return total
end

def io_write(fiber, io, buffer, length, offset = 0)

Write to the given IO from the buffer.

Signature

parameter length Integer

The minimum number of bytes to write.

parameter offset Integer

The offset into the buffer to write from.

Implementation

def io_write(fiber, io, buffer, length, offset = 0)
	total = 0
	
	Selector.nonblock(io) do
		while true
			result = Fiber.blocking{buffer.write(io, 0, offset)}
			
			if result < 0
				if length > 0 and again?(result)
					self.io_wait(fiber, io, IO::READABLE)
				else
					return result
				end
			elsif result == 0
				break result
			else
				total += result
				break if total >= length
				offset += result
			end
		end
	end
	
	return total
end

def process_wait(fiber, pid, flags)

Wait for a process to change state.

Signature

parameter fiber Fiber

The fiber to resume after waiting.

parameter pid Integer

The process ID to wait for.

parameter flags Integer

Flags to pass to Process::Status.wait.

returns Process::Status

The status of the waited process.

Implementation

def process_wait(fiber, pid, flags)
	Thread.new do
		Process::Status.wait(pid, flags)
	end.value
end

def select(duration = nil)

Wait for IO events or a timeout.

Signature

parameter duration Numeric | Nil

The maximum time to wait, or nil for no timeout.

returns Integer

The number of ready IO objects.

Implementation

def select(duration = nil)
	if pop_ready
		# If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop:
		duration = 0
	end
	
	readable = Array.new
	writable = Array.new
	priority = Array.new
	
	@waiting.delete_if do |io, waiter|
		if io.closed?
			true
		else
			waiter.each do |fiber, events|
				if (events & IO::READABLE) > 0
					readable << io
				end
				
				if (events & IO::WRITABLE) > 0
					writable << io
				end
				
				if (events & IO::PRIORITY) > 0
					priority << io
				end
			end

			false
		end
	end
	
	duration = 0 unless @ready.empty?
	error = nil
	
	if duration&.>(0)
		start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
	else
		@idle_duration = 0.0
	end
	
	# We need to handle interrupts on blocking IO. Every other implementation uses EINTR, but that doesn't work with `::IO.select` as it will retry the call on EINTR.
	Thread.handle_interrupt(::Exception => :on_blocking) do
		@blocked = true
		readable, writable, priority = ::IO.select(readable, writable, priority, duration)
	rescue ::Exception => error
		# Requeue below...
	ensure
		@blocked = false
		if start_time
			end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
			@idle_duration = end_time - start_time
		end
	end
	
	if error
		# Requeue the error into the pending exception queue:
		Thread.current.raise(error)
		return 0
	end
	
	ready = Hash.new(0).compare_by_identity
	
	readable&.each do |io|
		ready[io] |= IO::READABLE
	end
	
	writable&.each do |io|
		ready[io] |= IO::WRITABLE
	end
	
	priority&.each do |io|
		ready[io] |= IO::PRIORITY
	end
	
	ready.each do |io, events|
		@waiting.delete(io).dispatch(events) do |waiter|
			# Re-schedule the waiting IO:
			waiter.tail = @waiting[io]
			@waiting[io] = waiter
		end
	end
	
	return ready.size
end