Event › Source › IO › Event › Debug › Selector

class Selector

Enforces the selector interface and delegates operations to a wrapped selector instance.

You can enable this in the default selector by setting the IO_EVENT_DEBUG_SELECTOR environment variable. In addition, you can log all selector operations to a file by setting the IO_EVENT_DEBUG_SELECTOR_LOG environment variable. This is useful for debugging and understanding the behavior of the event loop.

Definitions

def self.wrap(selector, env = ENV)

Wrap the given selector with debugging.

Signature

parameter selector Selector

The selector to wrap.

parameter env Hash

The environment to read configuration from.

Implementation

# Wrap the given selector with debugging.
#
# When `IO_EVENT_DEBUG_SELECTOR_LOG` is present in `env`, the named file is opened for writing and handed to the new instance as its log.
#
# @parameter selector [Selector] The selector to wrap.
# @parameter env [Hash] The environment to read configuration from.
# @returns [Object] Whatever `new` returns for the given selector and log.
def self.wrap(selector, env = ENV)
	log_path = env["IO_EVENT_DEBUG_SELECTOR_LOG"]
	log = log_path ? File.open(log_path, "w") : nil
	
	return self.new(selector, log: log)
rescue
	# Construction failed, so no instance took ownership of the log file;
	# close it here instead of leaking the descriptor, then re-raise the
	# original error unchanged.
	log&.close
	raise
end

def initialize(selector, log: nil)

Initialize the debug selector with the given selector and optional log.

Signature

parameter selector Selector

The selector to wrap.

parameter log IO

The log to write debug messages to.

Implementation

# Initialize the debug selector with the given selector and optional log.
#
# @parameter selector [Selector] The selector to wrap.
# @parameter log [IO] The log to write debug messages to.
# @raises [RuntimeError] If the current fiber is not `selector.loop`.
def initialize(selector, log: nil)
	@selector = selector
	
	# Bookkeeping tables; no method visible in this section reads them.
	@readable, @writable, @priority = {}, {}, {}
	
	# The wrapper must be constructed on the fiber that drives the selector.
	Kernel::raise "Selector must be initialized on event loop fiber!" unless Fiber.current == selector.loop
	
	@log = log
end

def idle_duration

The idle duration of the underlying selector.

Signature

returns Numeric

The idle duration.

Implementation

# The idle duration of the underlying selector.
#
# @returns [Numeric] The idle duration reported by the wrapped selector.
def idle_duration
	return @selector.idle_duration
end

def now

The current time.

Signature

returns Numeric

The current time.

Implementation

# The current time, read from the monotonic clock.
#
# @returns [Numeric] The current monotonic time in (floating point) seconds.
def now
	::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :float_second)
end

def log(message)

  • asynchronous

Log the given message.

Signature

asynchronous

Will block the calling fiber and the entire event loop.

Implementation

# Log the given message, prefixed with the current time from `now`.
#
# Does nothing when no log was configured.
#
# @asynchronous Will block the calling fiber and the entire event loop.
# @parameter message [String] The message to write.
def log(message)
	if @log
		# Write in blocking mode, as documented above.
		Fiber.blocking do
			line = format("T+%10.1f; %s", now, message)
			@log.puts(line)
		end
	end
end

def wakeup

Wake up the selector.

Implementation

# Wake up the selector; forwarded to the wrapped selector without logging.
def wakeup
	return @selector.wakeup
end

def close

Close the selector.

Implementation

# Close the selector.
#
# The attempt is logged before the state check, so a repeated close is still recorded in the log.
#
# @raises [RuntimeError] If the selector has already been closed.
def close
	log "Closing selector"
	
	Kernel::raise "Selector already closed!" if @selector.nil?
	
	@selector.close
	
	# Drop the reference so a second close is detected above.
	@selector = nil
end

def transfer

Transfer from the calling fiber to the selector.

Implementation

# Transfer from the calling fiber to the selector, logging the hand-off first.
def transfer
	message = "Transfering to event loop"
	log(message)
	
	return @selector.transfer
end

def resume(*arguments)

Resume the given fiber with the given arguments.

Implementation

# Resume the given fiber with the given arguments.
#
# All arguments are logged and then forwarded unchanged to the wrapped selector.
def resume(*arguments)
	message = "Resuming fiber with #{arguments.inspect}"
	log(message)
	
	return @selector.resume(*arguments)
end

def yield

Yield to the selector.

Implementation

# Yield to the selector, logging the hand-off first.
def yield
	message = "Yielding to event loop"
	log(message)
	
	return @selector.yield
end

def push(fiber)

Push the given fiber to the selector ready list, such that it will be resumed on the next call to IO::Event::Debug::Selector#select.

Signature

parameter fiber Fiber

The fiber that is ready.

Implementation

# Push the given fiber to the selector ready list, such that it will be resumed on the next call to {select}.
#
# @parameter fiber [Fiber] The fiber that is ready.
def push(fiber)
	message = "Pushing fiber #{fiber.inspect} to ready list"
	log(message)
	
	return @selector.push(fiber)
end

def raise(fiber, *arguments)

Raise the given exception on the given fiber.

Signature

parameter fiber Fiber

The fiber to raise the exception on.

parameter arguments Array

The arguments to use when raising the exception.

Implementation

# Raise the given exception on the given fiber.
#
# @parameter fiber [Fiber] The fiber to raise the exception on.
# @parameter arguments [Array] The arguments to use when raising the exception.
def raise(fiber, *arguments)
	message = "Raising exception on fiber #{fiber.inspect} with #{arguments.inspect}"
	log(message)
	
	return @selector.raise(fiber, *arguments)
end

def ready?

Check if the selector is ready.

Signature

returns Boolean

Whether the selector is ready.

Implementation

# Check if the selector is ready.
#
# @returns [Boolean] Whether the wrapped selector reports itself as ready.
def ready?
	return @selector.ready?
end

def process_wait(*arguments)

Wait for the given process, forwarded to the underlying selector.

Implementation

# Wait for the given process, forwarded to the underlying selector.
#
# All arguments are logged and then passed through unchanged.
def process_wait(*arguments)
	message = "Waiting for process with #{arguments.inspect}"
	log(message)
	
	return @selector.process_wait(*arguments)
end

def io_wait(fiber, io, events)

Wait for the given IO, forwarded to the underlying selector.

Implementation

# Wait for the given IO, forwarded to the underlying selector.
#
# Only the IO and the requested events appear in the log line; the fiber is forwarded but not logged.
def io_wait(fiber, io, events)
	message = "Waiting for IO #{io.inspect} for events #{events.inspect}"
	log(message)
	
	return @selector.io_wait(fiber, io, events)
end

def io_read(fiber, io, buffer, length, offset = 0)

Read from the given IO, forwarded to the underlying selector.

Implementation

# Read from the given IO, forwarded to the underlying selector.
#
# The IO, buffer, length and offset are logged before the call is delegated.
def io_read(fiber, io, buffer, length, offset = 0)
	message = "Reading from IO #{io.inspect} with buffer #{buffer}; length #{length} offset #{offset}"
	log(message)
	
	return @selector.io_read(fiber, io, buffer, length, offset)
end

def io_write(fiber, io, buffer, length, offset = 0)

Write to the given IO, forwarded to the underlying selector.

Implementation

# Write to the given IO, forwarded to the underlying selector.
#
# The IO, buffer, length and offset are logged before the call is delegated.
def io_write(fiber, io, buffer, length, offset = 0)
	message = "Writing to IO #{io.inspect} with buffer #{buffer}; length #{length} offset #{offset}"
	log(message)
	
	return @selector.io_write(fiber, io, buffer, length, offset)
end

def respond_to?(name, include_private = false)

Check whether the underlying selector responds to the given method; the query is forwarded to the wrapped selector.

Implementation

# Report whether the wrapped selector responds to the given method.
#
# The answer comes entirely from the wrapped selector, not from this wrapper's own methods.
#
# @parameter name [Symbol] The method name to query.
# @parameter include_private [Boolean] Passed through to the wrapped selector's `respond_to?`.
# @returns [Boolean] Whatever the wrapped selector's `respond_to?` returns.
def respond_to?(name, include_private = false)
	return @selector.respond_to?(name, include_private)
end

def select(duration = nil)

Select for the given duration, forwarded to the underlying selector.

Implementation

# Select for the given duration, forwarded to the underlying selector.
#
# The call is logged before the fiber check, so misuse from the wrong fiber still shows up in the log.
#
# @parameter duration [Numeric | Nil] The duration passed through to the wrapped selector.
# @raises [RuntimeError] If the current fiber is not the wrapped selector's `loop`.
def select(duration = nil)
	log "Selecting for #{duration.inspect}"
	
	Kernel::raise "Selector must be run on event loop fiber!" unless Fiber.current == @selector.loop
	
	return @selector.select(duration)
end