Protocol::WebSocket / Source / Protocol / WebSocket / Connection

class Connection

Wraps a framer and implements connection-specific interactions like reading and writing text.

Definitions

def initialize(framer, mask: nil, **options)

Signature

parameter mask String

4-byte mask to be used for frames generated by this connection.

Implementation

# Set up a connection around the given framer.
# @parameter framer [Framer] The framer used for reading and writing frames.
# @parameter mask [String | Boolean | Nil] Optional 4-byte mask used for frames generated by this connection.
# @parameter options [Hash] Additional keyword options; not used by this method.
def initialize(framer, mask: nil, **options)
	# New connections start out open with no partially received message:
	@state = :open
	@frames = []
	
	# Start from the full set of reserved bits defined by the frame:
	@reserved = Frame::RESERVED
	
	@mask = mask
	@framer = framer
	
	# By default the connection acts as its own reader and writer:
	@reader = self
	@writer = self
end

attr :framer

Signature

attribute Framer

The framer which is used for reading and writing frames.

attr :mask

Signature

attribute String | Boolean | Nil

The optional mask which is used when generating frames.

attr :reserved

Signature

attribute Integer

The allowed reserved bits.

attr_accessor :frames

Signature

attribute Array(Frame)

Buffered frames which form part of a complete message.

attr_accessor :reader

Signature

attribute Object

The reader which is used to unpack frames into messages.

attr_accessor :writer

Signature

attribute Object

The writer which is used to pack messages into frames.

def reserve!(bit)

Reserve a bit in the reserved flags for an extension.

Signature

parameter bit Integer

The bit to reserve, see RESERVED = RSV1 | RSV2 | RSV3 for more details.

Implementation

# Claim one of the reserved flag bits, for example on behalf of an extension.
# @parameter bit [Integer] The bit to reserve.
# @returns [Boolean] Always `true` when the bit was successfully reserved.
# @raises [ArgumentError] If the bit is not present in the currently reserved set.
def reserve!(bit)
	available = @reserved & bit
	raise ArgumentError, "Unable to use #{bit}!" if available.zero?
	
	# Remove the bit from the reserved set so it can only be claimed once:
	@reserved = @reserved & ~bit
	
	true
end

def flush

Flush the underlying framer to ensure all buffered data is written to the connection.

Implementation

# Flush the underlying framer by delegating to its `flush` method.
# @returns [Object] Whatever the framer's `flush` returns.
def flush
	@framer.flush
end

def open!

Transition the connection to the open state (the default for new connections).

Implementation

# Put the connection into the open state, which is also the state assigned by #initialize.
# @returns [Connection] This connection.
def open!
	@state = :open
	
	self
end

def close!(...)

If not already closed, transition the connection to the closed state and send a close frame. Will try to send a close frame with the specified code and reason, but will ignore any errors that occur while sending.

Implementation

# Transition to the closed state, attempting to send a close frame on the way.
# Does nothing if the connection is already closed. All arguments are forwarded to #send_close.
# @returns [Connection] This connection.
def close!(...)
	return self if @state == :closed
	
	# Mark the connection closed first, so the state changes even if sending fails:
	@state = :closed
	
	begin
		send_close(...)
	rescue
		# Sending the close frame is best-effort; failures are deliberately swallowed.
	end
	
	self
end

def closed?

Signature

returns Boolean

True if the connection is in the closed state.

Implementation

# Whether the connection has transitioned to the closed state.
# @returns [Boolean] True when the internal state is `:closed`.
def closed?
	@state == :closed
end

def close(...)

Immediately transition the connection to the closed state and close the underlying connection. Any data not yet read will be lost.

Implementation

# Transition to the closed state and then close the underlying framer.
# All arguments are forwarded unchanged to #close!.
# @returns [Object] Whatever the framer's `close` returns.
def close(...)
	# Marks the connection closed and attempts to send a close frame:
	close!(...)
	
	@framer.close
end

def close_write(error = nil)

Close the connection gracefully, sending a close frame with the specified error code and reason. If an error occurs while sending the close frame, the connection will be closed immediately. You may continue to read data from the connection after calling this method, but you should not write any more data.

Signature

parameter error Error | Nil

The error that occurred, if any.

Implementation

# Send a close frame to begin closing the connection.
# With an error, the frame carries `Error::INTERNAL_ERROR` and the error's message; otherwise #send_close is called with its defaults.
# @parameter error [Error | Nil] The error that occurred, if any.
# @returns [Object] Whatever #send_close returns.
def close_write(error = nil)
	return send_close unless error
	
	send_close(Error::INTERNAL_ERROR, error.message)
end

def shutdown

Close the connection gracefully. This will send a close frame and wait for the remote end to respond with a close frame. Any data received after the close frame is sent will be ignored. If you want to process this data, use #close_write instead, and read the data before calling #close.

Implementation

# Send a close frame (unless already closed) and then drain incoming frames.
# Every frame read while draining is discarded.
# @returns [Nil]
def shutdown
	if @state != :closed
		send_close
	end
	
	# Keep reading until `read_frame` stops returning frames, which it does once the connection is closed:
	while read_frame
		# Nothing to do with the frame; it is intentionally dropped.
	end
end

def read_frame

Read a frame from the framer, and apply it to the connection.

Implementation

# Read a single frame from the framer and apply it to this connection.
# If a block is given, the frame is yielded to it before being applied.
# Any `StandardError` raised along the way closes the connection and is then re-raised.
# @yields {|frame| ...} Optional; invoked with the frame before it is applied.
# @returns [Object | Nil] The frame that was read, or `nil` if the connection is already closed.
# @raises [ProtocolError] If the frame sets any flag that is still present in the reserved bits.
def read_frame
	return nil if closed?
	
	frame = @framer.read_frame
	
	# Any bit still present in `@reserved` must not be set on an incoming frame:
	unless (frame.flags & @reserved).zero?
		raise ProtocolError, "Received frame with reserved flags set!"
	end
	
	yield frame if block_given?
	
	# The frame is handed this connection; presumably it calls back into one of the `receive_*` methods (confirm against the frame classes):
	frame.apply(self)
	
	return frame
rescue ProtocolError => error
	# Close using the code carried by the protocol error, then propagate it:
	close(error.code, error.message)
	raise
rescue => error
	# Any other failure closes the connection with the generic protocol error code:
	close(Error::PROTOCOL_ERROR, error.message)
	raise
end

def write_frame(frame)

Write a frame to the framer. Note: This does not immediately write the frame to the connection, you must call #flush to ensure the frame is written.

Implementation

# Hand a frame to the framer for writing. The framer is not flushed here; call #flush to do that.
# @parameter frame [Frame] The frame to write.
# @returns [Frame] The same frame that was given.
def write_frame(frame)
	@framer.write_frame(frame)
	
	frame
end

def receive_text(frame)

Receive a text frame from the connection.

Implementation

# Accept a text frame as the first frame of a new message.
# @parameter frame [Frame] The text frame that was received.
# @returns [Array] The frame buffer, now containing the frame.
# @raises [ProtocolError] If frames of an unfinished message are already buffered.
def receive_text(frame)
	# A text frame may only start a message, never continue one:
	raise ProtocolError, "Received text, but expecting continuation!" unless @frames.empty?
	
	@frames << frame
end

def receive_binary(frame)

Receive a binary frame for the connection.

Implementation

# Accept a binary frame as the first frame of a new message.
# @parameter frame [Frame] The binary frame that was received.
# @returns [Array] The frame buffer, now containing the frame.
# @raises [ProtocolError] If frames of an unfinished message are already buffered.
def receive_binary(frame)
	# A binary frame may only start a message, never continue one:
	raise ProtocolError, "Received binary, but expecting continuation!" unless @frames.empty?
	
	@frames << frame
end

def receive_continuation(frame)

Receive a continuation frame for the connection.

Implementation

# Accept a continuation frame for the message that is currently being buffered.
# @parameter frame [Frame] The continuation frame that was received.
# @returns [Array] The frame buffer, now containing the frame.
# @raises [ProtocolError] If no message is in progress.
def receive_continuation(frame)
	# A continuation is only valid after an initial text or binary frame:
	raise ProtocolError, "Received unexpected continuation!" unless @frames.any?
	
	@frames << frame
end

def receive_close(frame)

Receive a close frame from the connection.

Implementation

# Handle a close frame received from the remote end.
# The connection enters the closed state, passing the received code and reason on to #close!.
# @parameter frame [Frame] The close frame that was received.
# @returns [Nil] When the frame has no code or the code is `Error::NO_ERROR`.
# @raises [ClosedError] If the frame carries a code other than `Error::NO_ERROR`.
def receive_close(frame)
	code, reason = frame.unpack
	
	# Receiving a close frame always moves the connection into the closed state:
	close!(code, reason)
	
	abnormal = code && code != Error::NO_ERROR
	raise ClosedError.new(reason, code) if abnormal
end

def send_ping(data = "")

Send a ping frame with the specified data.

Signature

parameter data String

The data to send in the ping frame.

Implementation

# Build a ping frame carrying the given data and write it to the connection.
# @parameter data [String] The data to send in the ping frame.
# @returns [PingFrame] The frame that was written.
# @raises [ProtocolError] If the connection is closed.
def send_ping(data = "")
	raise ProtocolError, "Cannot send ping in state #{@state}" if @state == :closed
	
	ping = PingFrame.new(mask: @mask)
	ping.pack(data)
	
	write_frame(ping)
end

def receive_ping(frame)

Receive a ping frame from the connection.

Implementation

# Answer a ping frame by writing the reply built by the frame itself.
# @parameter frame [Frame] The ping frame that was received.
# @returns [Frame] The reply frame that was written.
# @raises [ProtocolError] If the connection is closed.
def receive_ping(frame)
	raise ProtocolError, "Cannot receive ping in state #{@state}" if @state == :closed
	
	# The reply is masked with this connection's mask:
	reply = frame.reply(mask: @mask)
	
	write_frame(reply)
end

def receive_pong(frame)

Receive a pong frame from the connection. By default, this method does nothing.

Implementation

# Receive a pong frame from the connection. This implementation does nothing with it.
# @parameter frame [Frame] The pong frame that was received; unused here.
# @returns [Nil]
def receive_pong(frame)
	# Ignore.
end

def receive_frame(frame)

Receive a frame that is not a control frame. By default, this method raises a Protocol::WebSocket::ProtocolError.

Implementation

# Fallback handler which rejects the given frame.
# @parameter frame [Frame] The frame that was received.
# @raises [ProtocolError] Always, with a message that includes the frame.
def receive_frame(frame)
	raise ProtocolError, "Unhandled frame: #{frame}"
end

def pack_text_frame(buffer, **options)

Pack a text frame with the specified buffer. This is used by the #writer interface.

Signature

parameter buffer String

The text to pack into the frame.

returns TextFrame

The packed frame.

Implementation

# Build a text frame containing the given buffer, masked with this connection's mask.
# @parameter buffer [String] The text to pack into the frame.
# @parameter options [Hash] Accepted for interface compatibility; not used by this implementation.
# @returns [TextFrame] The packed frame.
def pack_text_frame(buffer, **options)
	text_frame = TextFrame.new(mask: @mask)
	text_frame.pack(buffer)
	
	text_frame
end

def send_text(buffer, **options)

Send a text frame with the specified buffer.

Signature

parameter buffer String

The text to send.

Implementation

# Pack the buffer into a text frame using the configured writer, then write that frame.
# @parameter buffer [String] The text to send.
# @parameter options [Hash] Keyword options forwarded to the writer's `pack_text_frame`.
# @returns [Object] Whatever #write_frame returns.
def send_text(buffer, **options)
	frame = @writer.pack_text_frame(buffer, **options)
	
	write_frame(frame)
end

def pack_binary_frame(buffer, **options)

Pack a binary frame with the specified buffer. This is used by the #writer interface.

Signature

parameter buffer String

The binary data to pack into the frame.

returns BinaryFrame

The packed frame.

Implementation

# Build a binary frame containing the given buffer, masked with this connection's mask.
# @parameter buffer [String] The binary data to pack into the frame.
# @parameter options [Hash] Accepted for interface compatibility; not used by this implementation.
# @returns [BinaryFrame] The packed frame.
def pack_binary_frame(buffer, **options)
	binary_frame = BinaryFrame.new(mask: @mask)
	binary_frame.pack(buffer)
	
	binary_frame
end

def send_binary(buffer, **options)

Send a binary frame with the specified buffer.

Signature

parameter buffer String

The binary data to send.

Implementation

# Pack the buffer into a binary frame using the configured writer, then write that frame.
# @parameter buffer [String] The binary data to send.
# @parameter options [Hash] Keyword options forwarded to the writer's `pack_binary_frame`.
# @returns [Object] Whatever #write_frame returns.
def send_binary(buffer, **options)
	frame = @writer.pack_binary_frame(buffer, **options)
	
	write_frame(frame)
end

def send_close(code = Error::NO_ERROR, reason = "")

Send a control frame with data containing a specified control sequence to begin the closing handshake. Does not close the connection until the remote end responds with a close frame.

Signature

parameter code Integer

The close code to send.

parameter reason String

The reason for closing the connection.

Implementation

# Write a close frame with the given code and reason, then flush it out.
# This does not change the connection's state.
# @parameter code [Integer] The close code to send.
# @parameter reason [String] The reason for closing the connection.
# @returns [Object] Whatever #flush returns.
def send_close(code = Error::NO_ERROR, reason = "")
	close_frame = CloseFrame.new(mask: @mask)
	close_frame.pack(code, reason)
	
	# Flush right away so the close frame is not left sitting in a buffer:
	write_frame(close_frame)
	flush
end

def write(message, **options)

Write a message to the connection.

Signature

parameter message Message

The message to send.

Implementation

# Write a message to the connection.
# Strings are sent as text when UTF-8 encoded and as binary otherwise; `Message` instances send themselves.
# @parameter message [Message | String] The message to send.
# @parameter options [Hash] Keyword options forwarded to the underlying send call.
# @returns [Object] The result of the underlying send call.
# @raises [ArgumentError] If the message is neither a `String` nor a `Message`.
def write(message, **options)
	case message
	when String
		# Plain strings are kept as a compatibility shim for the previous implementation; the encoding decides the frame type.
		return send_binary(message, **options) unless message.encoding == Encoding::UTF_8
		
		send_text(message, **options)
	when Message
		message.send(self, **options)
	else
		raise ArgumentError, "Unsupported message type: #{message}"
	end
end

def unpack_frames(frames)

The default implementation for reading a message buffer. This is used by the #reader interface.

Implementation

# The default implementation for turning buffered frames into a single message buffer.
# This is used by the #reader interface: #read calls it with whatever keyword options it was given, so they are accepted here and ignored.
# @parameter frames [Array(Frame)] The frames which together form one message.
# @parameter options [Hash] Accepted for interface compatibility; not used by this implementation.
# @returns [String] The unpacked payloads of all frames, concatenated in order.
def unpack_frames(frames, **options)
	frames.map(&:unpack).join
end

def read(**options)

Read a message from the connection. If an error occurs while reading the message, the connection will be closed.

If the message is fragmented, this method will buffer the frames until a complete message is received.

Implementation

# Read the next complete message from the connection.
# Frames are read until the most recently buffered frame reports that it is finished; the buffered frames are then unpacked by the reader and turned into a message by the first frame.
# @parameter options [Hash] Keyword options forwarded to the reader's `unpack_frames`.
# @returns [Object | Nil] The message, or `nil` if no further frames can be read.
# @raises [ProtocolError] Re-raised after the connection has been closed with the error's code and message.
def read(**options)
	# Flush any pending output before waiting for input:
	@framer.flush
	
	while read_frame
		next unless @frames.last&.finished?
		
		# Take ownership of the completed frames and reset the buffer for the next message:
		message_frames, @frames = @frames, []
		buffer = @reader.unpack_frames(message_frames, **options)
		
		return message_frames.first.read_message(buffer)
	end
rescue ProtocolError => error
	close(error.code, error.message)
	raise
end