class Connection
A native FFI connection to the PostgreSQL client library.
Definitions
def self.connect(types: DEFAULT_TYPES, **options)
Establish a connection to the PostgreSQL server.
Signature
-
parameter
typesHash Type mapping configuration.
-
parameter
optionsHash Connection options (database, username, password, host, port, etc.).
-
returns
Connection A new connected instance.
-
raises
Error If the connection fails.
Implementation
def self.connect(types: DEFAULT_TYPES, **options)
# Postgres expects "dbname" as the key name:
if database = options.delete(:database)
options[:dbname] = database
end
# Postgres expects "user" as the key name:
if username = options.delete(:username)
options[:user] = username
end
keys = Strings.new(options.keys)
values = Strings.new(options.values)
pointer = Native.connect_start_params(keys.array, values.array, 1)
Native.set_nonblocking(pointer, 1)
io = ::IO.new(Native.socket(pointer), "r+", autoclose: false)
while status = Native.connect_poll(pointer)
break if status == :ok || status == :failed
# one of :wait_readable or :wait_writable
io.send(status)
end
if status == :failed
io.close
error_message = Native.error_message(pointer)
Native.finish(pointer)
raise Error, "Could not connect: #{error_message}"
end
return self.new(pointer, io, types)
end
def initialize(address, io, types)
Initialize a native connection wrapper.
Signature
-
parameter
addressFFI::Pointer The pointer to the native connection.
-
parameter
ioIO The IO object for the socket.
-
parameter
typesHash Type mapping configuration.
Implementation
def initialize(address, io, types)
super(address)
@io = io
@types = types
end
attr :types
Signature
-
attribute
Hash The type mapping configuration.
def status
Return the status of the connection.
Implementation
def status
Native.status(self)
end
def error_message
Return the last error message.
Implementation
def error_message
Native.error_message(self)
end
def socket
Return the underlying socket used for IO.
Implementation
def socket
Native.socket(self)
end
def close
Close the connection.
Implementation
def close
Native.finish(self)
@io.close
end
def escape_literal(value)
Escape a literal string value for safe inclusion in SQL queries.
Signature
-
parameter
valueString The value to escape.
-
returns
String The escaped string with quotes.
Implementation
def escape_literal(value)
value = value.to_s
result = Native.escape_literal(self, value, value.bytesize)
string = result.read_string
Native.free_memory(result)
return string
end
def escape_identifier(value)
Escape an identifier for safe inclusion in SQL queries.
Signature
-
parameter
valueString The identifier to escape.
-
returns
String The escaped identifier with quotes.
Implementation
def escape_identifier(value)
value = value.to_s
result = Native.escape_identifier(self, value, value.bytesize)
string = result.read_string
Native.free_memory(result)
return string
end
def single_row_mode!
Enable single row mode for streaming large result sets.
Implementation
def single_row_mode!
Native.set_single_row_mode(self)
end
def send_query(statement)
Send a query to the server for execution.
Signature
-
parameter
statementString The SQL statement to execute.
Implementation
def send_query(statement)
check! Native.send_query(self, statement)
flush
end
def next_result(types: @types)
Get the next result set from a multi-result query.
Signature
-
parameter
typesHash Type mapping to use for this result.
-
returns
Result | Nil The next result set, or
nilif no more results.-
raises
Error If the query resulted in an error.
Implementation
def next_result(types: @types)
if result = self.get_result
status = Native.result_status(result)
if status == :fatal_error
message = Native.result_error_message(result)
Native.clear(result)
raise Error, message
end
return Result.new(self, types, result)
end
end
def discard_results
Silently discard any results that application didn't read.
Implementation
def discard_results
while result = self.get_result
status = Native.result_status(result)
Native.clear(result)
case status
when :copy_in
self.put_copy_end("Discard results")
when :copy_out
self.flush_copy_out
end
end
return nil
end
def flush
After sending any command or data on a nonblocking connection, call PQflush. If it returns 1, wait for the socket to become read- or write-ready. If it becomes write-ready, call PQflush again. If it becomes read-ready, call PQconsumeInput, then call PQflush again. Repeat until PQflush returns 0. (It is necessary to check for read-ready and drain the input with PQconsumeInput, because the server can block trying to send us data, e.g. NOTICE messages, and won't read our data until we read its.) Once PQflush returns 0, wait for the socket to be read-ready and then read the response as described above.
Implementation
def flush
while true
case Native.flush(self)
when 1
@io.wait_any
check! Native.consume_input(self)
when 0
return
end
end
end