Async::IO Source: Async::IO::Stream

class Stream

Definitions

def read(size = nil)

Reads size bytes from the stream. If size is not specified, read until end of file.

Implementation

# Reads `size` bytes from the stream, or everything up to end of stream when no size is given.
# @param size [Integer, nil] the number of bytes wanted; a falsy value means "read until end of stream".
# @return an empty binary string when `size` is 0, otherwise whatever `consume_read_buffer(size)` returns.
def read(size = nil)
	return String.new(encoding: Encoding::BINARY) if size == 0
	
	if size
		# Keep topping up the buffer until it can satisfy the request, or the stream ends:
		while !@eof && @read_buffer.bytesize < size
			missing = size - @read_buffer.bytesize
			
			# Always request at least one block so we don't issue lots of tiny reads:
			fill_read_buffer([missing, @block_size].max)
		end
	else
		# No size requested, so drain the underlying stream completely:
		fill_read_buffer until @eof
	end
	
	consume_read_buffer(size)
end

def read_partial(size = nil)

Read at most size bytes from the stream. Will avoid reading from the underlying stream if possible.

Implementation

# Reads at most `size` bytes, preferring data that is already buffered.
# @param size [Integer, nil] upper bound on the number of bytes to return; nil means "whatever is buffered".
# @return an empty binary string when `size` is 0, otherwise whatever `consume_read_buffer(size)` returns.
def read_partial(size = nil)
	return String.new(encoding: Encoding::BINARY) if size == 0
	
	# Only go to the underlying stream when nothing is buffered and end of stream has not been seen:
	fill_read_buffer if !@eof && @read_buffer.empty?
	
	consume_read_buffer(size)
end

def read_until(pattern, offset = 0, chomp: true)

Efficiently read data from the stream until encountering pattern.

Implementation

# Reads from the stream up to the first occurrence of `pattern`, filling the buffer as needed.
# @param pattern [String] the delimiter to search for.
# @param offset [Integer] position in the read buffer at which the first search starts.
# @param chomp [Boolean] when truthy the delimiter is left out of the result; otherwise it is included.
# @return the data before (or including) the delimiter, or nil if `fill_read_buffer` reports no more data before a match is found. The delimiter is always removed from the buffer on a match.
def read_until(pattern, offset = 0, chomp: true)
	pattern_size = pattern.bytesize
	
	# A match may straddle old and new data, so re-scan the last (pattern_size - 1) bytes after each fill.
	overlap = pattern_size - 1
	
	index = @read_buffer.index(pattern, offset)
	
	while index.nil?
		# Resume searching just before the end of what has already been scanned:
		offset = [@read_buffer.bytesize - overlap, 0].max
		
		return unless fill_read_buffer
		
		index = @read_buffer.index(pattern, offset)
	end
	
	match_end = index + pattern_size
	
	@read_buffer.freeze
	matched = @read_buffer.byteslice(0, chomp ? index : match_end)
	@read_buffer = @read_buffer.byteslice(match_end, @read_buffer.bytesize)
	
	matched
end

def flush

Flushes buffered data to the stream.

Implementation

# Writes any buffered output to the underlying io.
# Does nothing (and returns nil) when the write buffer is empty; otherwise returns the result of the `@writing.acquire` block.
def flush
	unless @write_buffer.empty?
		@writing.acquire do
			# Exchange the two buffers so new writes land in the (empty) former drain buffer:
			pending = @write_buffer
			@write_buffer = @drain_buffer
			@drain_buffer = pending
			
			begin
				@io.write(@drain_buffer)
			ensure
				# Whether or not the write succeeded, the drain buffer is emptied; on failure that data is lost.
				@drain_buffer.clear
			end
		end
	end
end

def write(string)

Writes string to the buffer. When the buffer is full or #sync is true the buffer is flushed to the underlying io.

Implementation

# Appends `string` to the write buffer, flushing once the buffer holds at least `@block_size` bytes.
# @param string [String] the data to buffer.
# @return [Integer] the byte size of `string`.
def write(string)
	@write_buffer << string
	
	# The buffer has reached the block size, so push it out:
	flush if @write_buffer.bytesize >= @block_size
	
	string.bytesize
end

def <<(string)

Writes string to the stream and returns self.

Implementation

# Writes `string` via #write and returns the receiver, so calls can be chained.
# @param string [String] the data to write.
# @return [self]
def <<(string)
	write(string)
	
	self
end

def close

Best effort to flush any unwritten data, and then close the underlying IO.

Implementation

# Closes the underlying io after a best-effort flush. Does nothing if the io is already closed.
def close
	unless @io.closed?
		begin
			flush
		rescue
			# Deliberately ignored: reporting a failed flush would mean #close raising exceptions.
		ensure
			# The io is closed regardless of how the flush went:
			@io.close
		end
	end
end

def eof?

Returns true if the stream is at end of file, which means there is no more data to be read.

Implementation

# Reports whether there is nothing left to read.
# @return false while buffered data remains, true once end of stream has been recorded, otherwise the answer of the underlying io's `eof?`.
def eof?
	# Buffered data can still be consumed, so we are not at the end:
	return false unless @read_buffer.empty?
	
	# A previous fill already hit end of stream:
	return true if @eof
	
	@io.eof?
end

def fill_read_buffer(size = @block_size)

Fills the buffer from the underlying stream.

Implementation

# Pulls more data from the underlying stream into the read buffer.
# @param size [Integer] how many bytes to request; capped at `@maximum_read_size`.
# @return [Boolean] true if `sysread` produced data, false otherwise (in which case `@eof` is set).
def fill_read_buffer(size = @block_size)
	# Cap the request: asking the underlying `read` system call for too much at once can fail.
	size = @maximum_read_size if size > @maximum_read_size
	
	# Flushing pending output before reading effectively ties the input and output streams together.
	flush
	
	if @read_buffer.empty?
		# Nothing buffered, so read straight into the read buffer:
		return true if sysread(size, @read_buffer)
	elsif (chunk = sysread(size, @input_buffer))
		# Data is already buffered, so read into the input buffer and append the result:
		@read_buffer << chunk
		
		return true
	end
	
	# Neither path produced data:
	@eof = true
	
	false
end

def consume_read_buffer(size = nil)

Consumes at most size bytes from the buffer.

Implementation

# Removes and returns at most `size` bytes from the front of the read buffer.
# @param size [Integer, nil] maximum number of bytes to take; nil takes everything buffered.
# @return the consumed data, or nil when end of stream has been reached and nothing is buffered.
def consume_read_buffer(size = nil)
	# At end of stream with an empty buffer there is nothing to hand out:
	return nil if @eof && @read_buffer.empty?
	
	if size.nil? || size >= @read_buffer.bytesize
		# The whole buffer is wanted: hand it over and start a fresh one.
		whole = @read_buffer
		@read_buffer = Buffer.new
		
		return whole
	end
	
	# `slice!(0, size)` was rejected by the original author as using more memory.
	# The old buffer is not reused, and freezing it first is intended to avoid the hidden copy byteslice would otherwise make.
	@read_buffer.freeze
	
	head = @read_buffer.byteslice(0, size)
	@read_buffer = @read_buffer.byteslice(size, @read_buffer.bytesize)
	
	head
end