class DispatcherMiddleware
Represents middleware that dispatches gRPC requests to registered services. Handles routing based on service name from the request path.
server = Async::HTTP::Server.for(endpoint, dispatcher)
Example: Registering services:
dispatcher = DispatcherMiddleware.new
dispatcher.register("hello.Greeter", GreeterService.new(GreeterInterface, "hello.Greeter"))
dispatcher.register("world.Greeter", WorldService.new(WorldInterface, "world.Greeter"))
Signature
Definitions
def initialize(app = nil, services: {})
Initialize the dispatcher.
Signature
-
parameter
app (#call | Nil): The next middleware in the chain
-
parameter
services (Hash): Optional initial services hash (service_name => service_instance)
Implementation
# Build a dispatcher, optionally seeded with an existing service registry.
#
# The given hash is stored as-is (not copied), so services added later through
# `register` are written into that same hash object.
#
# @parameter app [#call | Nil] The next middleware in the chain; forwarded to the superclass initializer.
# @parameter services [Hash] Optional initial services hash (service_name => service_instance).
def initialize(app = nil, services: {})
  # Explicit argument list: only `app` is forwarded, the `services` keyword is not.
  super(app)

  @services = services
end
def register(service_name, service)
Register a service.
Signature
-
parameter
service_name (String): Service name (e.g., "hello.Greeter")
-
parameter
service (Async::GRPC::Service): Service instance
Implementation
# Add a service to the registry under the given name.
#
# An existing entry with the same name is replaced. The return value is the
# service that was stored.
#
# @parameter service_name [String] Service name (e.g., "hello.Greeter").
# @parameter service [Async::GRPC::Service] Service instance.
def register(service_name, service)
  @services.store(service_name, service)
end
def dispatch(request)
Dispatch the request to the appropriate service.
Signature
-
parameter
request (Protocol::HTTP::Request): The HTTP request
-
returns
Protocol::HTTP::Response The HTTP response
-
raises
Protocol::GRPC::Error If service or method is not found
Implementation
# Route a gRPC request to the handler method of the matching registered service.
#
# The service and method names are parsed from the request path. The service is
# looked up in the registry and the method in the service's RPC descriptions; any
# failed lookup raises an UNIMPLEMENTED error. The handler is then invoked through
# `dispatch_to_service` with a readable request body, a writable response body and
# a call context, and the prepared response object is returned.
#
# @parameter request [Protocol::HTTP::Request] The HTTP request.
# @returns [Protocol::HTTP::Response] The HTTP response.
# @raises [Protocol::GRPC::Error] If service or method is not found.
def dispatch(request)
  # The request path identifies both the service and the RPC method.
  service_name, method_name = Protocol::GRPC::Methods.parse_path(request.path)

  # Look up the registered service.
  service = @services[service_name]
  raise Protocol::GRPC::Error.new(
    Protocol::GRPC::Status::UNIMPLEMENTED,
    "Service not found: #{service_name}"
  ) unless service

  # The registry key must agree with the name the service itself reports.
  raise Protocol::GRPC::Error.new(
    Protocol::GRPC::Status::UNIMPLEMENTED,
    "Service name mismatch: expected #{service.service_name}, got #{service_name}"
  ) unless service_name == service.service_name

  # Look up the RPC description for the requested method.
  rpc = service.rpc_descriptions[method_name]
  raise Protocol::GRPC::Error.new(
    Protocol::GRPC::Status::UNIMPLEMENTED,
    "Method not found: #{method_name}"
  ) unless rpc

  handler, request_class, response_class = rpc.method, rpc.request_class, rpc.response_class

  # The handler may be private, hence include_all = true.
  raise Protocol::GRPC::Error.new(
    Protocol::GRPC::Status::UNIMPLEMENTED,
    "Handler method not implemented: #{handler}"
  ) unless service.respond_to?(handler, true)

  # Message bodies: both directions use the encoding announced by the request.
  encoding = request.headers["grpc-encoding"]
  reader = Protocol::GRPC::Body::ReadableBody.new(request.body, message_class: request_class, encoding: encoding)
  writer = Protocol::GRPC::Body::WritableBody.new(message_class: response_class, encoding: encoding)

  # Response headers; the request's encoding is echoed back only when present.
  headers = Protocol::HTTP::Headers.new([], nil, policy: Protocol::GRPC::HEADER_POLICY)
  headers["content-type"] = "application/grpc+proto"
  if encoding
    headers["grpc-encoding"] = encoding
  end

  response = Protocol::HTTP::Response[200, headers, writer]

  # A deadline exists only when the request carried a parseable grpc-timeout;
  # otherwise `deadline` stays nil.
  timeout = Protocol::GRPC::Methods.parse_timeout(request.headers["grpc-timeout"])
  deadline = Async::Deadline.start(timeout) if timeout

  # Call context handed to the handler, wrapping request and response.
  call = Protocol::GRPC::Call.new(request, response, deadline: deadline)

  if rpc.streaming?
    # Streaming RPC: invoke the handler inside an Async block, passing the task as parent.
    Async do |task|
      dispatch_to_service(service, handler, reader, writer, call, deadline, parent: task)
    end
  else
    # Unary RPC: invoke the handler directly.
    dispatch_to_service(service, handler, reader, writer, call, deadline)
  end

  response
end