~singpolyma/em_fiberscheduler

29835d2425e8a52287c67f4e419219da2de0ea03 — Stephen Paul Weber 4 months ago 8f76970
Refactor process_wait
1 files changed, 44 insertions(+), 19 deletions(-)

M lib/em_fiberscheduler.rb
M lib/em_fiberscheduler.rb => lib/em_fiberscheduler.rb +44 -19
@@ 87,25 87,7 @@ module EventMachine
		def process_wait(pid, flags)
			fiber = Fiber.current
			EM.schedule do
				conn = EM.watch_process(pid, WatchProcess)
				conn.fiber = fiber
			rescue Unsupported
				fiber do
					# Inspired by bruno-/fiber_scheduler
					reader, writer = IO.pipe
					thread = Thread.new do
						Process::Status.wait(pid, flags)
					ensure
						writer.close
					end

					io_wait(reader, IO::READABLE)
					fiber.resume(thread.value)
				ensure
					reader.close
					writer.close
					thread&.kill
				end
				ProcessWait.for(fiber, pid, flags).call
			end
			Fiber.yield
		end


@@ 133,6 115,49 @@ module EventMachine
			Fiber.yield
		end

		class ProcessWait
			def self.for(fiber, pid, flags)
				# The only way to check this is to just try it
				conn = EM.watch_process(pid, WatchProcess)
				conn.fiber = fiber
				new
			rescue Unsupported
				Fallback.new(fiber, pid, flags)
			end

			def call; end

			# Inspired by bruno-/fiber_scheduler
			class Fallback
				def initialize(fiber, pid, flags)
					@fiber = fiber
					@pid = pid
					@flags = flags
					@reader, @writer = IO.pipe
				end

				def thread
					@thread ||= Thread.new do
						Process::Status.wait(@pid, @flags)
					ensure
						@writer.close
					end
				end

				def call
					thread
					Fiber.schedule do
						Fiber.scheduler&.io_wait(@reader, IO::READABLE)
						@fiber.resume(thread.value)
					ensure
						@reader.close
						@writer.close
						@thread&.kill
					end
				end
			end
		end

		module WatchProcess
			attr_writer :fiber