~singpolyma/em_fiberscheduler

2e456fea57802b8d69d567fba512b4bb8fd6d147 — Stephen Paul Weber 4 months ago 48534fa
Do not resume dead Fibers

They may be dead due to a Fiber#raise, especially due to Timeout
1 files changed, 26 insertions(+), 8 deletions(-)

M lib/em_fiberscheduler.rb
M lib/em_fiberscheduler.rb => lib/em_fiberscheduler.rb +26 -8
@@ 50,7 50,7 @@ module EventMachine
		end

		def kernel_sleep(duration)
			fiber = Fiber.current
			fiber = FiberWhenAlive.current
			EM.schedule do
				EM.add_timer(duration) { fiber.resume }
			end


@@ 61,8 61,8 @@ module EventMachine
			if timeout
				fiber = Fiber.current
				@timeouts[fiber] = EM.add_timer(timeout) {
					@timeouts.delte(fiber)
					fiber.resume
					@timeouts.delete(fiber)
					FiberWhenAlive.new(fiber).resume
				}
			end
			Fiber.yield


@@ 71,11 71,11 @@ module EventMachine
		def unblock(_, fiber)
			timeout = @timeouts.delete(fiber)
			EM.cancel_timer(timeout) if timeout
			EM.next_tick { fiber.resume }
			EM.next_tick { FiberWhenAlive.new(fiber).resume }
		end

		def io_wait(io_or_fd, events, timeout=nil)
			fiber = Fiber.current
			fiber = FiberWhenAlive.current
			EM.schedule do
				conn = EM.watch(io_or_fd, IOWait)
				conn.fiber = fiber


@@ 88,7 88,7 @@ module EventMachine
		end

		def process_wait(pid, flags)
			fiber = Fiber.current
			fiber = FiberWhenAlive.current
			EM.schedule do
				ProcessWait.for(fiber, pid, flags).call
			end


@@ 97,7 97,7 @@ module EventMachine

		def timeout_after(duration, exception_class, *exception_arguments, &block)
			timer = nil
			fiber = Fiber.current
			fiber = FiberWhenAlive.current
			EM.schedule do
				timer = EM.add_timer(duration) {
					fiber.raise(exception_class, *exception_arguments)


@@ 109,7 109,7 @@ module EventMachine
		end

		def address_resolve(hostname)
			fiber = Fiber.current
			fiber = FiberWhenAlive.current
			EM.schedule do
				defer = DNS::Resolver.resolve(hostname)
				defer.callback(&fiber.method(:resume))


@@ 188,5 188,23 @@ module EventMachine
				detach
			end
		end

		class FiberWhenAlive
			def self.current
				new(Fiber.current)
			end

			def initialize(fiber)
				@fiber = fiber
			end

			def resume(*args)
				@fiber.resume(*args) if @fiber.alive?
			end

			def raise(*args)
				@fiber.raise(*args) if @fiber.alive?
			end
		end
	end
end