class Stream
Definitions
def read(size = nil)
Reads size
bytes from the stream. If size is not specified, read until end of file.
Implementation
def read(size = nil)
return String.new(encoding: Encoding::BINARY) if size == 0
if size
until @eof or @read_buffer.bytesize >= size
# Compute the amount of data we need to read from the underlying stream:
read_size = size - @read_buffer.bytesize
# Don't read less than @block_size to avoid lots of small reads:
fill_read_buffer(read_size > @block_size ? read_size : @block_size)
end
else
until @eof
fill_read_buffer
end
end
return consume_read_buffer(size)
end
def read_partial(size = nil)
Read at most size
bytes from the stream. Will avoid reading from the underlying stream if possible.
Implementation
def read_partial(size = nil)
return String.new(encoding: Encoding::BINARY) if size == 0
if !@eof and @read_buffer.empty?
fill_read_buffer
end
return consume_read_buffer(size)
end
def read_until(pattern, offset = 0, chomp: true)
Efficiently read data from the stream until encountering pattern.
Implementation
def read_until(pattern, offset = 0, chomp: true)
# We don't want to split on the pattern, so we subtract the size of the pattern.
split_offset = pattern.bytesize - 1
until index = @read_buffer.index(pattern, offset)
offset = @read_buffer.bytesize - split_offset
offset = 0 if offset < 0
return unless fill_read_buffer
end
@read_buffer.freeze
matched = @read_buffer.byteslice(0, index+(chomp ? 0 : pattern.bytesize))
@read_buffer = @read_buffer.byteslice(index+pattern.bytesize, @read_buffer.bytesize)
return matched
end
def flush
Flushes buffered data to the stream.
Implementation
def flush
return if @write_buffer.empty?
@writing.acquire do
# Flip the write buffer and drain buffer:
@write_buffer, @drain_buffer = @drain_buffer, @write_buffer
begin
@io.write(@drain_buffer)
ensure
# If the write operation fails, we still need to clear this buffer, and the data is essentially lost.
@drain_buffer.clear
end
end
end
def write(string)
Writes string
to the buffer. When the buffer is full or #sync is true the
buffer is flushed to the underlying io
.
Implementation
def write(string)
@write_buffer << string
if @write_buffer.bytesize >= @block_size
flush
end
return string.bytesize
end
def <<(string)
Writes string
to the stream and returns self.
Implementation
def <<(string)
write(string)
return self
end
def close
Best effort to flush any unwritten data, and then close the underling IO.
Implementation
def close
return if @io.closed?
begin
flush
rescue
# We really can't do anything here unless we want #close to raise exceptions.
ensure
@io.close
end
end
def eof?
Returns true if the stream is at file which means there is no more data to be read.
Implementation
def eof?
if !@read_buffer.empty?
return false
elsif @eof
return true
else
return @io.eof?
end
end
def fill_read_buffer(size = @block_size)
Fills the buffer from the underlying stream.
Implementation
def fill_read_buffer(size = @block_size)
# We impose a limit because the underlying `read` system call can fail if we request too much data in one go.
if size > @maximum_read_size
size = @maximum_read_size
end
# This effectively ties the input and output stream together.
flush
if @read_buffer.empty?
if sysread(size, @read_buffer)
# Console.logger.debug(self, name: "read") {@read_buffer.inspect}
return true
end
else
if chunk = sysread(size, @input_buffer)
@read_buffer << chunk
# Console.logger.debug(self, name: "read") {@read_buffer.inspect}
return true
end
end
# else for both cases above:
@eof = true
return false
end
def consume_read_buffer(size = nil)
Consumes at most size
bytes from the buffer.
Implementation
def consume_read_buffer(size = nil)
# If we are at eof, and the read buffer is empty, we can't consume anything.
return nil if @eof && @read_buffer.empty?
result = nil
if size.nil? or size >= @read_buffer.bytesize
# Consume the entire read buffer:
result = @read_buffer
@read_buffer = Buffer.new
else
# This approach uses more memory.
# result = @read_buffer.slice!(0, size)
# We know that we are not going to reuse the original buffer.
# But byteslice will generate a hidden copy. So let's freeze it first:
@read_buffer.freeze
result = @read_buffer.byteslice(0, size)
@read_buffer = @read_buffer.byteslice(size, @read_buffer.bytesize)
end
return result
end