# frozen_string_literal: true
require "sentry-ruby"
require "statsd-instrument"
require_relative "customer_repo"
require_relative "session_manager"
class Command
def self.execution
Thread.current[:execution]
end
def self.execution=(exe)
Thread.current[:execution] = exe
end
def self.reply(stanza=nil, &blk)
execution.reply(stanza, &blk)
end
def self.finish(*args, **kwargs, &blk)
execution.finish(*args, **kwargs, &blk)
end
def self.customer
execution.customer
end
def self.log
execution.log
end
class Execution
class Timeout < SessionManager::Timeout; end
class FinalStanza
attr_reader :stanza
def initialize(stanza)
@stanza = stanza
end
end
attr_reader :customer_repo, :log, :iq
def initialize(customer_repo, blather, format_error, iq)
@customer_repo = customer_repo
@blather = blather
@format_error = format_error
@iq = iq
@log = LOG.child(node: iq.node)
end
def execute
StatsD.increment("command", tags: ["node:#{iq.node}"])
EMPromise.resolve(nil).then {
Command.execution = self
sentry_hub
catch_after(EMPromise.resolve(yield self))
}.catch(&method(:panic))
end
def reply(stanza=nil)
stanza ||= iq.reply.tap { |reply| reply.status = :executing }
yield stanza if block_given?
COMMAND_MANAGER.write(stanza).then { |new_iq|
@iq = new_iq
}.catch_only(Blather::Stanza::Iq) { |new_iq|
@iq = new_iq if new_iq.set?
EMPromise.reject(new_iq)
}.catch_only(SessionManager::Timeout) do
EMPromise.reject(Timeout.new)
end
end
def finish(text=nil, type: :info, status: :completed)
reply = @iq.reply
reply.status = status
yield reply if block_given?
if text
reply.note_type = type
reply.note_text = text
end
EMPromise.reject(FinalStanza.new(reply))
end
def sentry_hub
return @sentry_hub if @sentry_hub
# Stored on Fiber-local in 4.3.1 and earlier
# https://github.com/getsentry/sentry-ruby/issues/1495
@sentry_hub = Sentry.get_current_hub
raise "Sentry.init has not been called" unless @sentry_hub
@sentry_hub.push_scope
@sentry_hub.current_scope.clear_breadcrumbs
@sentry_hub.current_scope.set_transaction_name(@iq.node)
@sentry_hub.current_scope.set_user(jid: @iq.from.stripped.to_s)
@sentry_hub
end
def customer
@customer ||= @customer_repo.find_by_jid(@iq.from.stripped).then { |c|
sentry_hub.current_scope.set_user(
id: c.customer_id,
jid: @iq.from.stripped
)
c
}
end
protected
def catch_after(promise)
promise.catch_only(Blather::Stanza::Iq::Command) { |iq|
next EMPromise.reject(iq) unless iq.cancel?
finish(status: :canceled)
}.catch_only(Timeout) { nil }.catch_only(FinalStanza) { |e|
@blather << e.stanza
}.catch do |e|
log_error(e)
send_final_error(e)
end
end
def send_final_error(e)
finish(@format_error.call(e), type: :error).catch_only(FinalStanza) do |s|
@blather << s.stanza
end
end
def log_error(e)
@log.error("Error raised during #{iq.node}: #{e.class}", e)
if e.is_a?(::Exception)
sentry_hub.capture_exception(e)
else
sentry_hub.capture_message(e.to_s)
end
end
end
attr_reader :node, :name
def initialize(
node,
name,
customer_repo: CustomerRepo.new,
list_for: ->(tel:, **) { !!tel },
format_error: ->(e) { e.respond_to?(:message) ? e.message : e.to_s },
&blk
)
@node = node
@name = name
@customer_repo = customer_repo
@list_for = list_for
@format_error = format_error
@blk = blk
end
def register(blather, guards: [:execute?, { node: @node, sessionid: nil }])
blather.command(*guards) do |iq|
Execution.new(@customer_repo, blather, @format_error, iq).execute(&@blk)
end
self
end
def list_for?(**kwargs)
@list_for.call(**kwargs)
end
end