DB::Postgres > Source > DB > Postgres > Native > Connection

class Connection

A native FFI connection to the PostgreSQL client library.

Definitions

def self.connect(types: DEFAULT_TYPES, **options)

Establish a connection to the PostgreSQL server.

Signature

parameter types Hash

Type mapping configuration.

parameter options Hash

Connection options (database, username, password, host, port, etc.).

returns Connection

A new connected instance.

raises Error

If the connection fails.

Implementation

# Establish a connection to the PostgreSQL server.
#
# The `database` and `username` options are renamed to the `dbname` and `user`
# keys before being handed to the native library. The connection is started in
# nonblocking mode and polled until it reports `:ok` or `:failed`.
#
# If the attempt fails, or is interrupted by any exception before the wrapper
# has been constructed, the IO wrapper is closed and the native handle is
# released with `Native.finish` so that it is not leaked.
#
# @parameter types [Hash] Type mapping configuration.
# @parameter options [Hash] Connection options (database, username, password, host, port, etc.).
# @returns [Connection] A new connected instance.
# @raises [Error] If the connection fails.
def self.connect(types: DEFAULT_TYPES, **options)
	# Postgres expects "dbname" as the key name:
	if database = options.delete(:database)
		options[:dbname] = database
	end
	
	# Postgres expects "user" as the key name:
	if username = options.delete(:username)
		options[:user] = username
	end
	
	keys = Strings.new(options.keys)
	values = Strings.new(options.values)
	
	pointer = Native.connect_start_params(keys.array, values.array, 1)
	io = nil
	connected = false
	
	begin
		Native.set_nonblocking(pointer, 1)
		
		# The descriptor is owned by the native connection, so the wrapper must not close it:
		io = ::IO.new(Native.socket(pointer), "r+", autoclose: false)
		
		while status = Native.connect_poll(pointer)
			break if status == :ok || status == :failed
			
			# one of :wait_readable or :wait_writable
			io.send(status)
		end
		
		if status == :failed
			# The message must be read before the handle is released in the ensure block:
			raise Error, "Could not connect: #{Native.error_message(pointer)}"
		end
		
		connection = self.new(pointer, io, types)
		connected = true
		
		return connection
	ensure
		# Single cleanup path for both a failed status and an unexpected exception:
		unless connected
			io&.close
			Native.finish(pointer)
		end
	end
end

def initialize(address, io, types)

Initialize a native connection wrapper.

Signature

parameter address FFI::Pointer

The pointer to the native connection.

parameter io IO

The IO object for the socket.

parameter types Hash

Type mapping configuration.

Implementation

# Initialize a native connection wrapper.
#
# @parameter address [FFI::Pointer] The pointer to the native connection; forwarded to the superclass.
# @parameter io [IO] The IO object for the socket.
# @parameter types [Hash] Type mapping configuration.
def initialize(address, io, types)
	super(address)
	
	# Retained for later waiting/closing and for result type mapping:
	@io, @types = io, types
end

attr :types

Signature

attribute Hash

The type mapping configuration.

def status

Return the status of the connection.

Implementation

# Return the status of the connection.
#
# @returns The value reported by `Native.status` for this connection.
def status
	return Native.status(self)
end

def error_message

Return the last error message.

Implementation

# Return the last error message.
#
# @returns The value reported by `Native.error_message` for this connection.
def error_message
	return Native.error_message(self)
end

def socket

Return the underlying socket used for IO.

Implementation

# Return the underlying socket used for IO.
#
# @returns The value reported by `Native.socket` for this connection.
def socket
	return Native.socket(self)
end

def close

Close the connection.

Implementation

# Close the connection.
#
# The native connection is finished first, then the IO wrapper is closed.
def close
	# Release the native connection:
	Native.finish(self)
	
	# Then close the Ruby-side IO wrapper:
	return @io.close
end

def escape_literal(value)

Escape a literal string value for safe inclusion in SQL queries.

Signature

parameter value String

The value to escape.

returns String

The escaped string with quotes.

Implementation

# Escape a literal string value for safe inclusion in SQL queries.
#
# @parameter value [String] The value to escape; converted with `to_s` first.
# @returns [String] The escaped string with quotes.
def escape_literal(value)
	text = value.to_s
	
	# The native call is given the byte length, not the character count:
	buffer = Native.escape_literal(self, text, text.bytesize)
	
	# Copy the contents into a Ruby string before releasing the native buffer:
	escaped = buffer.read_string
	Native.free_memory(buffer)
	
	escaped
end

def escape_identifier(value)

Escape an identifier for safe inclusion in SQL queries.

Signature

parameter value String

The identifier to escape.

returns String

The escaped identifier with quotes.

Implementation

# Escape an identifier for safe inclusion in SQL queries.
#
# @parameter value [String] The identifier to escape; converted with `to_s` first.
# @returns [String] The escaped identifier with quotes.
def escape_identifier(value)
	name = value.to_s
	
	# The native call is given the byte length, not the character count:
	buffer = Native.escape_identifier(self, name, name.bytesize)
	
	# Copy the contents into a Ruby string before releasing the native buffer:
	escaped = buffer.read_string
	Native.free_memory(buffer)
	
	escaped
end

def single_row_mode!

Enable single row mode for streaming large result sets.

Implementation

# Enable single row mode for streaming large result sets.
#
# @returns The value reported by `Native.set_single_row_mode` for this connection.
def single_row_mode!
	return Native.set_single_row_mode(self)
end

def send_query(statement)

Send a query to the server for execution.

Signature

parameter statement String

The SQL statement to execute.

Implementation

# Send a query to the server for execution.
#
# The native return value is validated with `check!`, then the outgoing
# data is flushed.
#
# @parameter statement [String] The SQL statement to execute.
def send_query(statement)
	outcome = Native.send_query(self, statement)
	check!(outcome)
	
	flush
end

def next_result(types: @types)

Get the next result set from a multi-result query.

Signature

parameter types Hash

Type mapping to use for this result.

returns Result | Nil

The next result set, or nil if no more results.

raises Error

If the query resulted in an error.

Implementation

# Get the next result set from a multi-result query.
#
# @parameter types [Hash] Type mapping to use for this result.
# @returns [Result | Nil] The next result set, or nil if no more results.
# @raises [Error] If the query resulted in an error.
def next_result(types: @types)
	result = self.get_result
	
	# Nothing left to read:
	return unless result
	
	if Native.result_status(result) == :fatal_error
		# Capture the message before the native result is cleared:
		message = Native.result_error_message(result)
		Native.clear(result)
		
		raise Error, message
	end
	
	Result.new(self, types, result)
end

def discard_results

Silently discard any results that the application didn't read.

Implementation

# Silently discard any results that the application didn't read.
#
# Each pending result is cleared. A result in the copy-in state is ended
# with `put_copy_end`, and one in the copy-out state is drained with
# `flush_copy_out`.
#
# @returns [Nil]
def discard_results
	while (pending = self.get_result)
		# Read the status first; the result is not used after it is cleared:
		kind = Native.result_status(pending)
		Native.clear(pending)
		
		if kind == :copy_in
			self.put_copy_end("Discard results")
		elsif kind == :copy_out
			self.flush_copy_out
		end
	end
	
	nil
end

def flush

After sending any command or data on a nonblocking connection, call PQflush. If it returns 1, wait for the socket to become read- or write-ready. If it becomes write-ready, call PQflush again. If it becomes read-ready, call PQconsumeInput, then call PQflush again. Repeat until PQflush returns 0. (It is necessary to check for read-ready and drain the input with PQconsumeInput, because the server can block trying to send us data, e.g. NOTICE messages, and won't read our data until we read its.) Once PQflush returns 0, wait for the socket to be read-ready and then read the response as described above.

Implementation

# Flush queued outgoing data to the server on this nonblocking connection.
#
# `Native.flush` is called repeatedly. While it returns 1, the method waits
# on the socket, consumes any pending input (validated with `check!`), and
# tries again. It returns once `Native.flush` returns 0.
#
# Any other return value (libpq's PQflush reports failure as -1) raises
# instead of being retried; retrying a failed flush would loop forever.
#
# @raises [Error] If the native flush reports a failure.
def flush
	while true
		case Native.flush(self)
		when 0
			# Nothing left to send:
			return
		when 1
			# Data is still queued; wait for the socket, then drain input so the server is not blocked on us:
			@io.wait_any
			
			check! Native.consume_input(self)
		else
			raise Error, "Could not flush: #{Native.error_message(self)}"
		end
	end
end