~singpolyma/em_promise.rb

3b7ee7c83111d02467196c4911573e8168b240cd — Stephen Paul Weber 3 months ago 7232fcf 0.0.3
Defer through a Fiber trampoline

Instead of running all defer blocks on the EM reactor fiber, run in a dedicated
Fiber trampoline.  Inherit the trampoline from scope if present, or from the
promise you call #then on, or else make a new trampoline.  This means that all
callbacks on promises in a logical chain run in the same Fiber as each other,
but a different one from all other code, so Fiber-local variables are scoped to
the logical chain, like so:

    EMPromise.resolve(nil).then {
    	Thread.current[:fiber_local] = 123
    	promise_timer(1)
    }.then {
    	p Thread.current[:fiber_local] # 123
    }

This also allows us to mix promise-based and fiber-based code arbitrarily,
without any extra special helpers, including #sync itself, for example:

    EMPromise.resolve(nil).then {
		promise_timer(1).sync
		DB.exec("SQL")
    }
2 files changed, 64 insertions(+), 5 deletions(-)

M .rubocop.yml
M lib/em_promise.rb
M .rubocop.yml => .rubocop.yml +3 -0
@@ 1,3 1,6 @@
AllCops:
  TargetRubyVersion: 2.5

Metrics/LineLength:
  Max: 80


M lib/em_promise.rb => lib/em_promise.rb +61 -5
@@ 1,9 1,53 @@
# frozen_string_literal: true

require "eventmachine"
require "fiber"
require "promise"

class EMPromise < Promise
	def initialize(deferrable=nil)
	class Trampoline
		attr_accessor :ready

		def initialize
			@ready = true
			@fiber = Fiber.new do |blk|
				Thread.current[:_em_promise_trampoline] = self
				loop do
					result = blk.call
					self.ready = true
					blk = Fiber.yield result
					self.ready = false
				end
			end
		end

		def submit(&blk)
			EM.next_tick do
				if ready
					@fiber.resume(blk)
				else
					submit(&blk)
				end
			end
		end
	end

	# Make sure that self.class.new inherits the trampoline
	def class
		tramp = @trampoline
		klass = super.dup
		klass.define_singleton_method(:new) do |*args|
			super(*args, trampoline: tramp)
		end
		klass
	end

	def initialize(deferrable=nil, trampoline: nil)
		super()
		@trampoline =
			trampoline ||
			Thread.current[:_em_promise_trampoline] ||
			Trampoline.new
		fulfill(deferrable) if deferrable
	end



@@ 17,17 61,29 @@ class EMPromise < Promise
	end

	def defer
		EM.next_tick { yield }
		@trampoline.submit { yield }
	end

	def wait
		fiber = Fiber.current
		resume = proc do |arg|
			defer { fiber.resume(arg) }
		resume = proc do
			EM.next_tick { fiber.resume }
		end

		self.then(resume, resume)
		Fiber.yield

		# We might be in a trampoline, so keep that going
		tramp = Thread.current[:_em_promise_trampoline]
		raise "Trampoline already ready?" if tramp&.ready

		result = nil
		loop do
			tramp&.ready = true
			blk = Fiber.yield result
			tramp&.ready = false
			result = blk&.call
			break unless blk
		end
	end

	def self.reject(e)