class Connection
Wraps a framer and implements for implementing connection specific interactions like reading and writing text.
Definitions
def initialize(framer, mask: nil, **options)
Signature
-
parameter
mask
String
4-byte mask to be used for frames generated by this connection.
Implementation
def initialize(framer, mask: nil, **options)
@framer = framer
@mask = mask
@state = :open
@frames = []
@reserved = Frame::RESERVED
@reader = self
@writer = self
end
attr :framer
Signature
-
attribute
Framer
The framer which is used for reading and writing frames.
attr :mask
Signature
-
attribute
String | Boolean | Nil
The optional mask which is used when generating frames.
attr :reserved
Signature
-
attribute
Integer
The allowed reserved bits.
attr_accessor :frames
Signature
-
attribute
Array(Frame)
Buffered frames which form part of a complete message.
attr_accessor :reader
Signature
-
attribute
Object
The reader which is used to unpack frames into messages.
attr_accessor :writer
Signature
-
attribute
Object
The writer which is used to pack messages into frames.
def reserve!(bit)
Reserve a bit in the reserved flags for an extension.
Signature
-
parameter
bit
Integer
The bit to reserve, see
RESERVED = RSV1 | RSV2 | RSV3
for more details.
Implementation
def reserve!(bit)
if (@reserved & bit).zero?
raise ArgumentError, "Unable to use #{bit}!"
end
@reserved &= ~bit
return true
end
def flush
Flush the underlying framer to ensure all buffered data is written to the connection.
Implementation
def flush
@framer.flush
end
def open!
Transition the connection to the open state (the default for new connections).
Implementation
def open!
@state = :open
return self
end
def close!(...)
If not already closed, transition the connection to the closed state and send a close frame. Will try to send a close frame with the specified code and reason, but will ignore any errors that occur while sending.
Implementation
def close!(...)
unless @state == :closed
@state = :closed
begin
send_close(...)
rescue
# Ignore errors.
end
end
return self
end
def closed?
Signature
-
returns
Boolean
if the connection is in the closed state.
Implementation
def closed?
@state == :closed
end
def close(...)
Immediately transition the connection to the closed state and close the underlying connection. Any data not yet read will be lost.
Implementation
def close(...)
close!(...)
@framer.close
end
def close_write(error = nil)
Close the connection gracefully, sending a close frame with the specified error code and reason. If an error occurs while sending the close frame, the connection will be closed immediately. You may continue to read data from the connection after calling this method, but you should not write any more data.
Signature
-
parameter
error
Error | Nil
The error that occurred, if any.
Implementation
def close_write(error = nil)
if error
send_close(Error::INTERNAL_ERROR, error.message)
else
send_close
end
end
def shutdown
Close the connection gracefully. This will send a close frame and wait for the remote end to respond with a close frame. Any data received after the close frame is sent will be ignored. If you want to process this data, use #close_write
instead, and read the data before calling #close
.
Implementation
def shutdown
send_close unless @state == :closed
# `read_frame` will return nil after receiving a close frame:
while read_frame
# Drain the connection.
end
end
def read_frame
Read a frame from the framer, and apply it to the connection.
Implementation
def read_frame
return nil if closed?
frame = @framer.read_frame
unless (frame.flags & @reserved).zero?
raise ProtocolError, "Received frame with reserved flags set!"
end
yield frame if block_given?
frame.apply(self)
return frame
rescue ProtocolError => error
close(error.code, error.message)
raise
rescue => error
close(Error::PROTOCOL_ERROR, error.message)
raise
end
def write_frame(frame)
Write a frame to the framer.
Note: This does not immediately write the frame to the connection, you must call #flush
to ensure the frame is written.
Implementation
def write_frame(frame)
@framer.write_frame(frame)
return frame
end
def receive_text(frame)
Receive a text frame from the connection.
Implementation
def receive_text(frame)
if @frames.empty?
@frames << frame
else
raise ProtocolError, "Received text, but expecting continuation!"
end
end
def receive_binary(frame)
Receive a binary frame for the connection.
Implementation
def receive_binary(frame)
if @frames.empty?
@frames << frame
else
raise ProtocolError, "Received binary, but expecting continuation!"
end
end
def receive_continuation(frame)
Receive a continuation frame for the connection.
Implementation
def receive_continuation(frame)
if @frames.any?
@frames << frame
else
raise ProtocolError, "Received unexpected continuation!"
end
end
def receive_close(frame)
Receive a close frame from the connection.
Implementation
def receive_close(frame)
code, reason = frame.unpack
# On receiving a close frame, we must enter the closed state:
close!(code, reason)
if code and code != Error::NO_ERROR
raise ClosedError.new reason, code
end
end
def send_ping(data = "")
Send a ping frame with the specified data.
Signature
-
parameter
data
String
The data to send in the ping frame.
Implementation
def send_ping(data = "")
if @state != :closed
frame = PingFrame.new(mask: @mask)
frame.pack(data)
write_frame(frame)
else
raise ProtocolError, "Cannot send ping in state #{@state}"
end
end
def receive_ping(frame)
Receive a ping frame from the connection.
Implementation
def receive_ping(frame)
if @state != :closed
write_frame(frame.reply(mask: @mask))
else
raise ProtocolError, "Cannot receive ping in state #{@state}"
end
end
def receive_pong(frame)
Receive a pong frame from the connection. By default, this method does nothing.
Implementation
def receive_pong(frame)
# Ignore.
end
def receive_frame(frame)
Receive a frame that is not a control frame. By default, this method raises a class Protocol::WebSocket::ProtocolError
.
Implementation
def receive_frame(frame)
raise ProtocolError, "Unhandled frame: #{frame}"
end
def pack_text_frame(buffer, **options)
Pack a text frame with the specified buffer. This is used by the #writer
interface.
Signature
-
parameter
buffer
String
The text to pack into the frame.
-
returns
TextFrame
The packed frame.
Implementation
def pack_text_frame(buffer, **options)
frame = TextFrame.new(mask: @mask)
frame.pack(buffer)
return frame
end
def send_text(buffer, **options)
Send a text frame with the specified buffer.
Signature
-
parameter
buffer
String
The text to send.
Implementation
def send_text(buffer, **options)
write_frame(@writer.pack_text_frame(buffer, **options))
end
def pack_binary_frame(buffer, **options)
Pack a binary frame with the specified buffer. This is used by the #writer
interface.
Signature
-
parameter
buffer
String
The binary data to pack into the frame.
-
returns
BinaryFrame
The packed frame.
Implementation
def pack_binary_frame(buffer, **options)
frame = BinaryFrame.new(mask: @mask)
frame.pack(buffer)
return frame
end
def send_binary(buffer, **options)
Send a binary frame with the specified buffer.
Signature
-
parameter
buffer
String
The binary data to send.
Implementation
def send_binary(buffer, **options)
write_frame(@writer.pack_binary_frame(buffer, **options))
end
def send_close(code = Error::NO_ERROR, reason = "")
Send a control frame with data containing a specified control sequence to begin the closing handshake. Does not close the connection, until the remote end responds with a close frame.
Signature
-
parameter
code
Integer
The close code to send.
-
parameter
reason
String
The reason for closing the connection.
Implementation
def send_close(code = Error::NO_ERROR, reason = "")
frame = CloseFrame.new(mask: @mask)
frame.pack(code, reason)
self.write_frame(frame)
self.flush
end
def write(message, **options)
Write a message to the connection.
Signature
-
parameter
message
Message
The message to send.
Implementation
def write(message, **options)
case message
when String
# This is a compatibility shim for the previous implementation. We may want to eventually deprecate this use case... or maybe it's convenient enough to leave it around.
if message.encoding == Encoding::UTF_8
return send_text(message, **options)
else
return send_binary(message, **options)
end
when Message
message.send(self, **options)
else
raise ArgumentError, "Unsupported message type: #{message}"
end
end
def unpack_frames(frames)
The default implementation for reading a message buffer. This is used by the #reader
interface.
Implementation
def unpack_frames(frames)
frames.map(&:unpack).join("")
end
def read(**options)
Read a message from the connection. If an error occurs while reading the message, the connection will be closed.
If the message is fragmented, this method will buffer the frames until a complete message is received.
Implementation
def read(**options)
@framer.flush
while read_frame
if @frames.last&.finished?
frames = @frames
@frames = []
buffer = @reader.unpack_frames(frames, **options)
return frames.first.read_message(buffer)
end
end
rescue ProtocolError => error
close(error.code, error.message)
raise
end