# frozen_string_literal: true

require "sentry-ruby"
require "statsd-instrument"

require_relative "customer_repo"
require_relative "session_manager"

# A named command bound to a node. An instance stores the node, a display
# name, a listing predicate, an error formatter and the block that implements
# the command; #register hooks it up to a Blather client.
#
# The class-level helpers (Command.reply, Command.finish, ...) delegate to the
# Execution stored in Thread.current[:execution], which Execution#execute sets
# before running the command block. Note that Thread#[] storage is fiber-local
# in Ruby.
class Command
  # @return [Execution, nil] the execution stored for the current
  #   thread/fiber, or nil when none has been set
  def self.execution
    Thread.current[:execution]
  end

  # Delegates to Execution#reply on the current execution.
  def self.reply(stanza=nil, &blk)
    execution.reply(stanza, &blk)
  end

  # Delegates to Execution#finish on the current execution.
  def self.finish(*args, **kwargs, &blk)
    execution.finish(*args, **kwargs, &blk)
  end

  # Delegates to Execution#customer on the current execution.
  def self.customer
    execution.customer
  end

  # Delegates to Execution#log on the current execution.
  def self.log
    execution.log
  end

  # State for a single run of a command: the most recent iq, the customer
  # lookup, a child logger and the Sentry hub/scope used for error reporting.
  class Execution
    # Rejection value used by #reply when writing a stanza times out.
    class Timeout < SessionManager::Timeout; end

    # Wrapper used as a promise rejection value by #finish. #catch_after and
    # #send_final_error catch it and write the wrapped stanza to the client.
    class FinalStanza
      attr_reader :stanza

      def initialize(stanza)
        @stanza = stanza
      end
    end

    attr_reader :customer_repo, :log, :iq

    # @param customer_repo [#find_by_jid] used by #customer
    # @param blather [#<<] sink that final stanzas are written to
    # @param format_error [#call] turns a rejection value into note text
    # @param iq the stanza that started this execution; must respond to
    #   #node, #from and #reply
    def initialize(customer_repo, blather, format_error, iq)
      @customer_repo = customer_repo
      @blather = blather
      @format_error = format_error
      @iq = iq
      # LOG is not defined in this file; presumably a process-wide logger
      # that supports #child -- confirm against the application entry point.
      @log = LOG.child(node: iq.node)
    end

    # Runs the command block given to this method.
    #
    # Increments the "command" StatsD counter tagged with the node, then,
    # inside a promise chain, publishes this execution in
    # Thread.current[:execution], sets up the Sentry scope, yields self to the
    # block and routes the block's result through #catch_after.
    #
    # @yieldparam execution [Execution] self
    # @return the promise chain (EMPromise is defined outside this file)
    def execute
      StatsD.increment("command", tags: ["node:#{iq.node}"])
      EMPromise.resolve(nil).then {
        Thread.current[:execution] = self
        sentry_hub # called for its side effect of initialising the scope
        catch_after(yield self)
        # `panic` is not defined in this class; presumably a global
        # last-resort handler defined elsewhere -- confirm.
      }.catch(&method(:panic))
    end

    # Writes an intermediate stanza and stores the iq that comes back.
    #
    # When no stanza is given, a reply to the current iq is built with status
    # :executing. The stanza is yielded to the optional block for
    # customisation before being written through COMMAND_MANAGER (defined
    # outside this file).
    #
    # @param stanza [Object, nil] stanza to send, or nil to build the default
    # @yieldparam stanza the stanza about to be written
    # @return a promise; once the write resolves, @iq is replaced by the
    #   resolved value. A SessionManager::Timeout rejection is re-rejected
    #   as an Execution::Timeout.
    def reply(stanza=nil)
      stanza ||= iq.reply.tap do |reply|
        reply.status = :executing
      end
      yield stanza if block_given?
      COMMAND_MANAGER.write(stanza).then { |new_iq|
        @iq = new_iq
      }.catch_only(SessionManager::Timeout) do
        EMPromise.reject(Timeout.new)
      end
    end

    # Builds the final reply for the current iq and returns it as a rejection.
    #
    # Nothing is sent here: the reply is wrapped in a FinalStanza and returned
    # as a rejected promise, and the handlers in #catch_after (or
    # #send_final_error) write it to the client.
    #
    # @param text [String, nil] optional note text
    # @param type [Symbol] note type, applied only when text is given
    # @param status [Symbol] status set on the reply
    # @yieldparam reply the reply stanza, yielded before the note is applied
    # @return a promise rejected with a FinalStanza
    def finish(text=nil, type: :info, status: :completed)
      reply = @iq.reply
      reply.status = status
      yield reply if block_given?
      if text
        reply.note_type = type
        reply.note_text = text
      end
      EMPromise.reject(FinalStanza.new(reply))
    end

    # Returns the Sentry hub for this execution, initialising it on first use:
    # a new scope is pushed, breadcrumbs are cleared, and the transaction name
    # and user are set from the current iq.
    #
    # @raise [RuntimeError] when Sentry has no current hub
    def sentry_hub
      return @sentry_hub if @sentry_hub

      # Stored on Fiber-local in 4.3.1 and earlier
      # https://github.com/getsentry/sentry-ruby/issues/1495
      @sentry_hub = Sentry.get_current_hub
      raise "Sentry.init has not been called" unless @sentry_hub

      @sentry_hub.push_scope
      @sentry_hub.current_scope.clear_breadcrumbs
      @sentry_hub.current_scope.set_transaction_name(@iq.node)
      @sentry_hub.current_scope.set_user(jid: @iq.from.stripped.to_s)
      @sentry_hub
    end

    # Looks up the customer for the sender's stripped JID. The promise is
    # memoised, so the repository is queried once per execution. When the
    # lookup resolves, the Sentry user is updated with the customer id.
    #
    # @return a promise for the customer object
    def customer
      @customer ||= @customer_repo.find_by_jid(@iq.from.stripped).then do |c|
        sentry_hub.current_scope.set_user(
          id: c.customer_id,
          # Stringified so the value matches what #sentry_hub records and the
          # scope holds a plain String rather than a JID object.
          jid: @iq.from.stripped.to_s
        )
        c
      end
    end

  protected

    # Attaches the standard rejection handlers to the promise returned by the
    # command block. In order:
    #
    # * a rejected command iq: if it is a cancel, finish with status
    #   :canceled (which produces a FinalStanza handled below); otherwise the
    #   iq is re-rejected and falls through to the generic handler
    # * Execution::Timeout: swallowed without any reply
    # * FinalStanza: the wrapped stanza is written to the client
    # * anything else: logged, reported, and answered with an error note
    def catch_after(promise)
      promise.catch_only(Blather::Stanza::Iq::Command) { |iq|
        next EMPromise.reject(iq) unless iq.cancel?

        finish(status: :canceled)
      }.catch_only(Timeout) {}.catch_only(FinalStanza) { |e|
        @blather << e.stanza
      }.catch do |e|
        log_error(e)
        send_final_error(e)
      end
    end

    # Formats +e+ with the configured formatter and sends it as a final
    # :error note. #finish returns a rejection, so it is caught here and the
    # stanza is written straight away.
    def send_final_error(e)
      finish(@format_error.call(e), type: :error).catch_only(FinalStanza) do |s|
        @blather << s.stanza
      end
    end

    # Logs +e+ and reports it to Sentry: as an exception when it is one,
    # otherwise as a message built from its string form (rejection values are
    # not necessarily exceptions).
    def log_error(e)
      @log.error("Error raised during #{iq.node}: #{e.class}", e)
      if e.is_a?(::Exception)
        sentry_hub.capture_exception(e)
      else
        sentry_hub.capture_message(e.to_s)
      end
    end
  end

  attr_reader :node, :name

  # @param node [String] command node
  # @param name [String] display name
  # @param list_for [#call] predicate called by #list_for? with keyword
  #   arguments; the default is true when +tel+ is truthy
  # @param format_error [#call] turns a rejection value into user-facing
  #   text; the default uses #message when available, otherwise #to_s
  # @param blk the command body, later passed to Execution#execute
  def initialize(
    node,
    name,
    list_for: ->(tel:, **) { !!tel },
    format_error: ->(e) { e.respond_to?(:message) ? e.message : e.to_s },
    &blk
  )
    @node = node
    @name = name
    @list_for = list_for
    @format_error = format_error
    @blk = blk
  end

  # Registers a command handler on +blather+. For every iq passed to the
  # handler, a fresh CustomerRepo and Execution are created and the command
  # block is executed.
  #
  # @param blather client responding to #command
  # @param guards [Array] guards splatted into blather.command
  # @return [Command] self, so calls can be chained
  def register(blather, guards: [:execute?, node: @node, sessionid: nil])
    blather.command(*guards) do |iq|
      customer_repo = CustomerRepo.new
      Execution.new(customer_repo, blather, @format_error, iq).execute(&@blk)
    end
    self
  end

  # Calls the configured +list_for+ predicate with the given keyword
  # arguments and returns its result.
  def list_for?(**kwargs)
    @list_for.call(**kwargs)
  end
end