Async::GRPC › Source › Async › GRPC › DispatcherMiddleware

class DispatcherMiddleware

Middleware that dispatches gRPC requests to registered services. Each request is routed by the service name parsed from its request path.

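Once its services are registered (see the example below), the dispatcher is passed to the HTTP server as the application: server = Async::HTTP::Server.for(endpoint, dispatcher)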

Example: Registering services:

dispatcher = DispatcherMiddleware.new
dispatcher.register("hello.Greeter", GreeterService.new(GreeterInterface, "hello.Greeter"))
dispatcher.register("world.Greeter", WorldService.new(WorldInterface, "world.Greeter"))

Signature

Definitions

def initialize(app = nil, services: {})

Initialize the dispatcher.

Signature

parameter app #call | Nil

The next middleware in the chain

parameter services Hash

Optional initial services hash (service_name => service_instance)

Implementation

# Initialize the dispatcher.
# @parameter app [#call | Nil] The next middleware in the chain; forwarded to the superclass initializer.
# @parameter services [Hash] Optional initial services hash (service_name => service_instance).
def initialize(app = nil, services: {})
	super(app)
	# The hash is kept as given (it is not copied), so entries added later by `register` land in this same object.
	@services = services
end

def register(service_name, service)

Register a service.

Signature

parameter service_name String

Service name (e.g., "hello.Greeter")

parameter service Async::GRPC::Service

Service instance

Implementation

# Register a service under the given name, replacing any service previously stored under that name.
# @parameter service_name [String] Service name (e.g., "hello.Greeter").
# @parameter service [Async::GRPC::Service] Service instance.
# @returns [Async::GRPC::Service] The service that was just registered.
def register(service_name, service)
	@services.store(service_name, service)
end

def dispatch(request)

Dispatch the request to the appropriate service.

Signature

parameter request Protocol::HTTP::Request

The HTTP request

returns Protocol::HTTP::Response

The HTTP response

raises Protocol::GRPC::Error

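If the service or method is not found, the registered service's own name does not match the requested name, or the service does not implement the handler method (all reported with status UNIMPLEMENTED)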

Implementation

# Dispatch the request to the appropriate service.
#
# The service and method names are parsed from the request path, resolved against the registered services,
# and the matching handler is invoked through `dispatch_to_service`. Streaming RPCs are started inside an
# `Async` block; unary RPCs are invoked inline before the response is returned.
#
# @parameter request [Protocol::HTTP::Request] The HTTP request.
# @returns [Protocol::HTTP::Response] A 200 response whose body is the writable gRPC output body.
# @raises [Protocol::GRPC::Error] With status `UNIMPLEMENTED` if the service, method or handler cannot be resolved.
def dispatch(request)
	# Every resolution failure below is reported with the same gRPC status:
	unimplemented = lambda do |message|
		Protocol::GRPC::Error.new(Protocol::GRPC::Status::UNIMPLEMENTED, message)
	end
	
	# Split the request path into service and method names:
	service_name, method_name = Protocol::GRPC::Methods.parse_path(request.path)
	
	# Resolve the registered service:
	service = @services[service_name]
	raise unimplemented.call("Service not found: #{service_name}") unless service
	
	# The service must report the same name it was registered under:
	unless service_name == service.service_name
		raise unimplemented.call("Service name mismatch: expected #{service.service_name}, got #{service_name}")
	end
	
	# Resolve the RPC description for the requested method:
	description = service.rpc_descriptions[method_name]
	raise unimplemented.call("Method not found: #{method_name}") unless description
	
	handler = description.method
	request_class = description.request_class
	response_class = description.response_class
	
	# Private handler methods are accepted as well (second argument `true`):
	unless service.respond_to?(handler, true)
		raise unimplemented.call("Handler method not implemented: #{handler}")
	end
	
	# Wrap the bodies for gRPC message framing; the response reuses the request's encoding:
	encoding = request.headers["grpc-encoding"]
	input = Protocol::GRPC::Body::ReadableBody.new(request.body, message_class: request_class, encoding: encoding)
	output = Protocol::GRPC::Body::WritableBody.new(message_class: response_class, encoding: encoding)
	
	# Assemble the response headers and the response itself:
	headers = Protocol::HTTP::Headers.new([], nil, policy: Protocol::GRPC::HEADER_POLICY)
	headers["content-type"] = "application/grpc+proto"
	headers["grpc-encoding"] = encoding if encoding
	
	response = Protocol::HTTP::Response[200, headers, output]
	
	# A deadline is only started when the request carries a timeout:
	timeout = Protocol::GRPC::Methods.parse_timeout(request.headers["grpc-timeout"])
	deadline = timeout ? Async::Deadline.start(timeout) : nil
	
	# The call context ties the request, the response and the deadline together:
	call = Protocol::GRPC::Call.new(request, response, deadline: deadline)
	
	# Unary call: invoke the handler inline, then hand back the response.
	unless description.streaming?
		dispatch_to_service(service, handler, input, output, call, deadline)
		return response
	end
	
	# Streaming call: the handler runs inside an Async block, with that task passed as its parent.
	Async do |task|
		dispatch_to_service(service, handler, input, output, call, deadline, parent: task)
	end
	
	response
end