~singpolyma/em_fiberscheduler

ed23246a06982427e73bdd35e554bfd765329fa2 — Stephen Paul Weber 4 months ago 87a7b92
Import io_read and io_write from fiber_scheduler
3 files changed, 83 insertions(+), 0 deletions(-)

M .rubocop.yml
M lib/em_fiberscheduler.rb
A lib/em_fiberscheduler/from_fiber_scheduler.rb
M .rubocop.yml => .rubocop.yml +1 -0
@@ 41,6 41,7 @@ Naming/MethodParameterName:
    - iq
    - db
    - to
    - io

Layout/IndentationStyle:
  Enabled: false

M lib/em_fiberscheduler.rb => lib/em_fiberscheduler.rb +2 -0
@@ 3,6 3,8 @@
require "eventmachine"
require "fiber"

require_relative "em_fiberscheduler/from_fiber_scheduler"

module EventMachine
	def self.work_left?
		if @timers.empty? && @next_tick_queue.empty? && defers_finished?

A lib/em_fiberscheduler/from_fiber_scheduler.rb => lib/em_fiberscheduler/from_fiber_scheduler.rb +80 -0
@@ 0,0 1,80 @@
# frozen_string_literal: true

# From https://github.com/bruno-/fiber_scheduler/blob/main/lib/fiber_scheduler/selector.rb
# rubocop:disable Metrics/AbcSize
# rubocop:disable Metrics/MethodLength
if RUBY_VERSION >= "3.1.0"
	module EventMachine
		class FiberScheduler
			EAGAIN = Errno::EAGAIN::Errno

			def io_read(io, buffer, length)
				offset = 0

				loop do
					maximum_size = buffer.size - offset

					result = Fiber.new(blocking: true) {
						io.read_nonblock(maximum_size, exception: false)
					}.resume

					case result
					when :wait_readable
						return -EAGAIN unless maximum_size.positive?

						io_wait(io, IO::READABLE)
					when :wait_writable
						return -EAGAIN unless maximum_size.positive?

						io_wait(io, IO::WRITABLE)
					when nil
						break
					else
						buffer.set_string(result, offset)

						size = result.bytesize
						offset += size
						break if size >= length

						length -= size
					end
				end

				offset
			end

			def io_write(io, buffer, length)
				offset = 0

				loop do
					maximum_size = buffer.size - offset

					chunk = buffer.get_string(offset, maximum_size)
					result = Fiber.new(blocking: true) {
						io.write_nonblock(chunk, exception: false)
					}.resume

					case result
					when :wait_readable
						return -EAGAIN unless length.positive?

						io_wait(io, IO::READABLE)
					when :wait_writable
						return -EAGAIN unless length.positive?

						io_wait(io, IO::WRITABLE)
					else
						offset += result
						break if result >= length

						length -= result
					end
				end

				offset
			end
		end
	end
end
# rubocop:enable Metrics/AbcSize
# rubocop:enable Metrics/MethodLength