M lib/command.rb => lib/command.rb +19 -11
@@ 4,6 4,7 @@ require "sentry-ruby"
require "statsd-instrument"
require_relative "customer_repo"
+require_relative "session_manager"
class Command
def self.execution
@@ 27,6 28,8 @@ class Command
end
class Execution
+ class Timeout < SessionManager::Timeout; end
+
class FinalStanza
attr_reader :stanza
@@ 59,8 62,10 @@ class Command
reply.status = :executing
end
yield stanza if block_given?
- COMMAND_MANAGER.write(stanza).then do |new_iq|
+ COMMAND_MANAGER.write(stanza).then { |new_iq|
@iq = new_iq
+ }.catch_only(SessionManager::Timeout) do
+ EMPromise.reject(Timeout.new)
end
end
@@ 103,23 108,26 @@ class Command
protected
def catch_after(promise)
- promise.catch_only(FinalStanza) { |e|
+ promise.catch_only(Blather::Stanza::Iq::Command) { |iq|
+ next EMPromise.reject(iq) unless iq.cancel?
+
+ finish(status: :canceled)
+ }.catch_only(Timeout) {}.catch_only(FinalStanza) { |e|
@blather << e.stanza
}.catch do |e|
log_error(e)
- finish(
- @format_error.call(e), type: :error
- ).catch_only(FinalStanza) do |to_send|
- @blather << to_send.stanza
- end
+ send_final_error(e)
+ end
+ end
+
+ def send_final_error(e)
+ finish(@format_error.call(e), type: :error).catch_only(FinalStanza) do |s|
+ @blather << s.stanza
end
end
def log_error(e)
- @log.error(
- "Error raised during #{iq.node}: #{e.class}",
- e
- )
+ @log.error("Error raised during #{iq.node}: #{e.class}", e)
if e.is_a?(::Exception)
sentry_hub.capture_exception(e)
else
A lib/session_manager.rb => lib/session_manager.rb +39 -0
@@ 0,0 1,39 @@
+# frozen_string_literal: true
+
+class SessionManager
+ class Timeout < StandardError; end
+
+ def initialize(blather, id_msg, timeout: 5, error_if: nil)
+ @blather = blather
+ @sessions = {}
+ @id_msg = id_msg
+ @timeout = timeout
+ @error_if = error_if
+ end
+
+ def promise_for(stanza)
+ id = "#{stanza.to.stripped}/#{stanza.public_send(@id_msg)}"
+ @sessions.fetch(id) do
+ @sessions[id] = EMPromise.new
+ EM.add_timer(@timeout) do
+ @sessions.delete(id)&.reject(Timeout.new)
+ end
+ @sessions[id]
+ end
+ end
+
+ def write(stanza)
+ promise = promise_for(stanza)
+ @blather << stanza
+ promise
+ end
+
+ def fulfill(stanza)
+ id = "#{stanza.from.stripped}/#{stanza.public_send(@id_msg)}"
+ if stanza.error? || @error_if&.call(stanza)
+ @sessions.delete(id)&.reject(stanza)
+ else
+ @sessions.delete(id)&.fulfill(stanza)
+ end
+ end
+end
M sgx_jmp.rb => sgx_jmp.rb +1 -36
@@ 72,6 72,7 @@ require_relative "lib/payment_methods"
require_relative "lib/registration"
require_relative "lib/transaction"
require_relative "lib/web_register_manager"
+require_relative "lib/session_manager"
require_relative "lib/statsd"
ELECTRUM = Electrum.new(**CONFIG[:electrum])
@@ 275,42 276,6 @@ message :error? do |m|
LOG.error "MESSAGE ERROR", stanza: m
end
-class SessionManager
- def initialize(blather, id_msg, timeout: 5, error_if: nil)
- @blather = blather
- @sessions = {}
- @id_msg = id_msg
- @timeout = timeout
- @error_if = error_if
- end
-
- def promise_for(stanza)
- id = "#{stanza.to.stripped}/#{stanza.public_send(@id_msg)}"
- @sessions.fetch(id) do
- @sessions[id] = EMPromise.new
- EM.add_timer(@timeout) do
- @sessions.delete(id)&.reject(:timeout)
- end
- @sessions[id]
- end
- end
-
- def write(stanza)
- promise = promise_for(stanza)
- @blather << stanza
- promise
- end
-
- def fulfill(stanza)
- id = "#{stanza.from.stripped}/#{stanza.public_send(@id_msg)}"
- if stanza.error? || @error_if&.call(stanza)
- @sessions.delete(id)&.reject(stanza)
- else
- @sessions.delete(id)&.fulfill(stanza)
- end
- end
-end
-
IQ_MANAGER = SessionManager.new(self, :id)
COMMAND_MANAGER = SessionManager.new(
self,