Async::Service::Supervisor::Envoy — Source — Async::Service::Supervisor::Envoy::Monitor

class Monitor

Represents a supervisor monitor that publishes worker endpoints to Envoy using xDS.

Definitions

def initialize(bind: nil, delegate: Delegate.new, control_plane: Async::GRPC::XDS::ControlPlane.new, **options)

Initialize the monitor.

Signature

parameter bind String | Nil

The optional address for the xDS control plane server.

parameter delegate Delegate

The delegate used to map supervisor state into Envoy endpoints.

parameter control_plane Async::GRPC::XDS::ControlPlane

The xDS control plane to update.

Implementation

# Initialize the monitor.
#
# @parameter bind [String | Nil] The optional address for the xDS control plane server.
# @parameter delegate [Delegate] The delegate used to map supervisor state into Envoy endpoints.
# @parameter control_plane [Async::GRPC::XDS::ControlPlane] The xDS control plane to update.
# @parameter options [Hash] Any remaining keyword options; passed unchanged to the superclass initializer.
def initialize(bind: nil, delegate: Delegate.new, control_plane: Async::GRPC::XDS::ControlPlane.new, **options)
	super(**options)
	
	# Caller-supplied configuration and collaborators.
	@bind, @delegate, @control_plane = bind, delegate, control_plane
	
	# Registered supervisor controllers, keyed by controller id.
	@controllers = {}
	# NOTE(review): presumably the clusters last pushed to the control plane; confirm against reconcile.
	@published_clusters = {}
	# Assigned by #run when a bind address was given; nil until then.
	@server_task = nil
	# Serializes access to the controller registry and reconciliation.
	@mutex = Mutex.new
end

attr :control_plane

Signature

attribute Async::GRPC::XDS::ControlPlane

The xDS control plane receiving cluster and endpoint updates.

attr :delegate

Signature

attribute Delegate

The delegate used to map supervisor state into Envoy endpoints.

def register(supervisor_controller)

Register a supervisor worker with Envoy.

Signature

parameter supervisor_controller Object

The supervisor controller describing the worker.

returns void

Implementation

# Register a supervisor worker with Envoy.
#
# The controller is recorded under its id and a reconcile pass is run, both
# while holding the monitor's mutex.
#
# @parameter supervisor_controller [Object] The supervisor controller describing the worker; must respond to `id`.
def register(supervisor_controller)
	@mutex.synchronize do
		identifier = supervisor_controller.id
		@controllers.store(identifier, supervisor_controller)
		
		reconcile
	end
end

def remove(supervisor_controller)

Remove a supervisor worker from Envoy.

Signature

parameter supervisor_controller Object

The supervisor controller describing the worker.

returns void

Implementation

# Remove a supervisor worker from Envoy.
#
# The entry stored under the controller's id is deleted and a reconcile pass
# is run, both while holding the monitor's mutex. Removing an id that is not
# present simply deletes nothing; the reconcile pass still runs.
#
# @parameter supervisor_controller [Object] The supervisor controller describing the worker; must respond to `id`.
def remove(supervisor_controller)
	@mutex.synchronize do
		identifier = supervisor_controller.id
		@controllers.delete(identifier)
		
		reconcile
	end
end

def run(parent: Async::Task.current)

Run the monitor and optional xDS server task.

Signature

parameter parent Async::Task

The parent task used for the xDS server.

returns Async::Task

The monitor task.

Implementation

# Run the monitor and optional xDS server task.
#
# The superclass `run` is always invoked first. When a bind address was
# configured, a second task is started on `parent` that serves the control
# plane over HTTP/2 at that address; that task is kept in `@server_task`.
#
# @parameter parent [Async::Task] The parent task used for the xDS server.
# @returns [Async::Task] The monitor task, i.e. whatever the superclass `run` returned.
def run(parent: Async::Task.current)
	monitor_task = super(parent: parent)
	
	# Without a bind address there is no xDS server to start.
	return monitor_task unless @bind
	
	@server_task = parent.async do
		listen_endpoint = Async::HTTP::Endpoint.parse(@bind, protocol: Async::HTTP::Protocol::HTTP2)
		xds_server = Async::GRPC::XDS::Server.new(@control_plane)
		
		xds_server.run(listen_endpoint)
	end
	
	monitor_task
end

def as_json

Convert the currently published endpoints to JSON-compatible data.

Signature

returns Hash

The clusters and endpoint hashes.

Implementation

# Convert the currently published endpoints to JSON-compatible data.
#
# The cluster data is built while holding the monitor's mutex.
#
# @returns [Hash] A hash with a single `:clusters` key holding the result of `build_clusters`.
def as_json
	clusters = @mutex.synchronize {build_clusters}
	
	{clusters: clusters}
end

def run_once

Refresh endpoint health and publish updated EDS state.

Signature

returns void

Implementation

# Refresh endpoint health and publish updated EDS state.
#
# Performs a single reconcile pass while holding the monitor's mutex.
def run_once
	@mutex.synchronize {reconcile}
end