AsyncSourceAsyncPromise

class Promise

A promise represents a value that will be available in the future. Unlike Condition, once resolved (or rejected), all future waits return immediately with the stored value or raise the stored exception.

This is thread-safe and integrates with the fiber scheduler.

Signature

public

Since Async v2.

Nested

Definitions

def initialize

Create a new promise.

Implementation

def initialize
	# nil = pending, :completed = success, :failed = failure, :cancelled = cancelled:
	@resolved = nil
	
	# Stores either the result value or the exception:
	@value = nil
	
	# Track how many fibers are currently waiting:
	@waiting = 0
	
	@mutex = Mutex.new
	@condition = ConditionVariable.new
end

def resolved?

Signature

returns Boolean

Whether the promise has been resolved or rejected.

Implementation

def resolved?
	@mutex.synchronize {!!@resolved}
end

def resolved

  • private

Signature

returns Symbol | Nil

The internal resolved state (:completed, :failed, :cancelled, or nil if pending).

private

For internal use by Task.

Implementation

def resolved
	@mutex.synchronize {@resolved}
end

def cancelled?

Signature

returns Boolean

Whether the promise has been cancelled.

Implementation

def cancelled?
	@mutex.synchronize {@resolved == :cancelled}
end

def failed?

Signature

returns Boolean

Whether the promise failed with an exception.

Implementation

def failed?
	@mutex.synchronize {@resolved == :failed}
end

def completed?

Signature

returns Boolean

Whether the promise has completed successfully.

Implementation

def completed?
	@mutex.synchronize {@resolved == :completed}
end

def waiting?

Signature

returns Boolean

Whether any fibers are currently waiting for this promise.

Implementation

def waiting?
	@mutex.synchronize {@waiting > 0}
end

def suppress_warnings!

  • private

Artificially mark that someone is waiting (useful for suppressing warnings).

Signature

private

Internal use only.

Implementation

def suppress_warnings!
	@mutex.synchronize {@waiting += 1}
end

def value

Non-blocking access to the current value. Returns nil if not yet resolved. Does not raise exceptions even if the promise was rejected or cancelled. For resolved promises, returns the raw stored value (result, exception, or cancel exception).

Signature

returns Object | Nil

The stored value, or nil if pending.

Implementation

def value
	@mutex.synchronize {@resolved ? @value : nil}
end

def wait

Wait for the promise to be resolved and return the value. If already resolved, returns immediately. If rejected, raises the stored exception.

Signature

returns Object

The resolved value.

raises Exception

The rejected or cancelled exception.

Implementation

def wait
	@mutex.synchronize do
		# Increment waiting count:
		@waiting += 1
		
		begin
			# Wait for resolution if not already resolved:
			@condition.wait(@mutex) unless @resolved
			
			# Return value or raise exception based on resolution type:
			if @resolved == :completed
				return @value
			else
				# Both :failed and :cancelled store exceptions in @value
				raise @value
			end
		ensure
			# Decrement waiting count when done:
			@waiting -= 1
		end
	end
end

def resolve(value)

Resolve the promise with a value. All current and future waiters will receive this value. Can only be called once - subsequent calls are ignored.

Signature

parameter value Object

The value to resolve the promise with.

Implementation

def resolve(value)
	@mutex.synchronize do
		return if @resolved
		
		@value = value
		@resolved = :completed
		
		# Wake up all waiting fibers:
		@condition.broadcast
	end
	
	return value
end

def reject(exception)

Reject the promise with an exception. All current and future waiters will receive this exception. Can only be called once - subsequent calls are ignored.

Signature

parameter exception Exception

The exception to reject the promise with.

Implementation

def reject(exception)
	@mutex.synchronize do
		return if @resolved
		
		@value = exception
		@resolved = :failed
		
		# Wake up all waiting fibers:
		@condition.broadcast
	end
	
	return nil
end

def cancel(exception = Cancel.new("Promise was cancelled!"))

Cancel the promise, indicating cancellation. All current and future waiters will receive nil. Can only be called on pending promises - no-op if already resolved.

Implementation

def cancel(exception = Cancel.new("Promise was cancelled!"))
	@mutex.synchronize do
		# No-op if already in any final state
		return if @resolved
		
		@value = exception
		@resolved = :cancelled
		
		# Wake up all waiting fibers:
		@condition.broadcast
	end
	
	return nil
end

def fulfill(&block)

Resolve the promise with the result of the block. If the block raises an exception, the promise will be rejected. If the promise was already resolved, the block will not be called.

Signature

yields {...}

The block to call to resolve the promise.

returns Object

The result of the block.

Implementation

def fulfill(&block)
	raise "Promise already resolved!" if @resolved
	
	begin
		return self.resolve(yield)
	rescue Cancel => exception
		return self.cancel(exception)
	rescue => error
		return self.reject(error)
	rescue Exception => exception
		self.reject(exception)
		raise
	ensure
		# Handle non-local exits (throw, etc.) that bypass normal flow:
		self.resolve(nil) unless @resolved
	end
end

Discussion