EventSourceIOEventSelectorSelect

class Select

Definitions

attr :idle_duration

This is the amount of time the event loop was idle during the last select call.

def wakeup

If the event loop is currently sleeping, wake it up.

Implementation

def wakeup
	if @blocked
		@interrupt.signal
		
		return true
	end
	
	return false
end

def transfer

Transfer from the current fiber to the event loop.

Implementation

def transfer
	@loop.transfer
end

def resume(fiber, *arguments)

Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.

Implementation

def resume(fiber, *arguments)
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	fiber.transfer(*arguments)
ensure
	optional.nullify
end

def yield

Yield from the current fiber back to the event loop. Put the current fiber into the ready list.

Implementation

def yield
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	@loop.transfer
ensure
	optional.nullify
end

def push(fiber)

Append the given fiber into the ready list.

Implementation

def push(fiber)
	@ready.push(fiber)
end

def raise(fiber, *arguments)

Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.

Implementation

def raise(fiber, *arguments)
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	fiber.raise(*arguments)
ensure
	optional.nullify
end

def io_read(fiber, io, buffer, length, offset = 0)

Read from the given IO to the buffer.

Signature

parameter length Integer

The minimum number of bytes to read.

parameter offset Integer

The offset into the buffer to read to.

Implementation

def io_read(fiber, io, buffer, length, offset = 0)
	total = 0
	
	Selector.nonblock(io) do
		while true
			result = Fiber.blocking{buffer.read(io, 0, offset)}
			
			if result < 0
				if again?(result)
					self.io_wait(fiber, io, IO::READABLE)
				else
					return result
				end
			elsif result == 0
				break
			else
				total += result
				break if total >= length
				offset += result
			end
		end
	end
	
	return total
end

def io_read(fiber, io, buffer, length, offset = 0)

Ruby 3.2, most IO::Buffer support, but slightly clunky read/write methods.

Implementation

def io_read(fiber, io, buffer, length, offset = 0)
	total = 0
	
	Selector.nonblock(io) do
		maximum_size = buffer.size - offset
		while maximum_size > 0
			result = Fiber.blocking{buffer.read(io, maximum_size, offset)}
			
			if again?(result)
				if length > 0
					self.io_wait(fiber, io, IO::READABLE)
				else
					return result
				end
			elsif result < 0
				return result
			else
				total += result
				offset += result
				break if total >= length
			end
			
			maximum_size = buffer.size - offset
		end
	end
	
	return total
end

def io_read(fiber, _io, buffer, length, offset = 0)

Ruby <= 3.1, limited IO::Buffer support.

Implementation

def io_read(fiber, _io, buffer, length, offset = 0)
	# We need to avoid any internal buffering, so we use a duplicated IO object:
	io = IO.for_fd(_io.fileno, autoclose: false)
	
	total = 0
	
	maximum_size = buffer.size - offset
	while maximum_size > 0
		case result = blocking{io.read_nonblock(maximum_size, exception: false)}
		when :wait_readable
			if length > 0
				self.io_wait(fiber, io, IO::READABLE)
			else
				return EWOULDBLOCK
			end
		when :wait_writable
			if length > 0
				self.io_wait(fiber, io, IO::WRITABLE)
			else
				return EWOULDBLOCK
			end
		when nil
			break
		else
			buffer.set_string(result, offset)
			
			size = result.bytesize
			total += size
			offset += size
			break if size >= length
			length -= size
		end
		
		maximum_size = buffer.size - offset
	end
	
	return total
rescue IOError => error
	return -Errno::EBADF::Errno
rescue SystemCallError => error
	return -error.errno
end

def io_write(fiber, io, buffer, length, offset = 0)

Write to the given IO from the buffer.

Signature

parameter length Integer

The minimum number of bytes to write.

parameter offset Integer

The offset into the buffer to write from.

Implementation

def io_write(fiber, io, buffer, length, offset = 0)
	total = 0
	
	Selector.nonblock(io) do
		while true
			result = Fiber.blocking{buffer.write(io, 0, offset)}
			
			if result < 0
				if again?(result)
					self.io_wait(fiber, io, IO::READABLE)
				else
					return result
				end
			elsif result == 0
				break result
			else
				total += result
				break if total >= length
				offset += result
			end
		end
	end
	
	return total
end