# frozen_string_literal: true require "sentry-ruby" require "statsd-instrument" require_relative "customer_repo" require_relative "session_manager" class Command def self.execution Thread.current[:execution] end def self.execution=(exe) Thread.current[:execution] = exe end def self.reply(stanza=nil, &blk) execution.reply(stanza, &blk) end def self.finish(*args, **kwargs, &blk) execution.finish(*args, **kwargs, &blk) end def self.customer execution.customer end def self.log execution.log end class Execution class Timeout < SessionManager::Timeout; end class FinalStanza attr_reader :stanza def initialize(stanza) @stanza = stanza end end attr_reader :customer_repo, :log, :iq def initialize(customer_repo, blather, format_error, iq) @customer_repo = customer_repo @blather = blather @format_error = format_error @iq = iq @log = Thread.current[:log] || LOG end def execute StatsD.increment("command", tags: ["node:#{iq.node}"]) EMPromise.resolve(nil).then { Command.execution = self catch_after(EMPromise.resolve(yield self)) } end def reply(stanza=nil) stanza ||= iq.reply.tap { |reply| reply.status = :executing } yield stanza if block_given? COMMAND_MANAGER.write(stanza).then { |new_iq| @iq = new_iq }.catch_only(Blather::Stanza::Iq) { |new_iq| @iq = new_iq if new_iq.set? EMPromise.reject(new_iq) }.catch_only(SessionManager::Timeout) do EMPromise.reject(Timeout.new) end end def finish(text=nil, type: :info, status: :completed) reply = @iq.reply reply.status = status EMPromise.resolve(block_given? ? yield(reply) : nil).then { if text reply.note_type = type reply.note_text = text end }.then do EMPromise.reject(FinalStanza.new(reply)) end end def customer @customer ||= @customer_repo.find_by_jid(@iq.from.stripped).then { |c| Sentry.set_user( id: c.customer_id, jid: @iq.from.stripped ) c } end protected def catch_after(promise) promise.catch_only(Blather::Stanza::Iq::Command) { |iq| next EMPromise.reject(iq) unless iq.cancel? finish(status: :canceled) }.catch_only(Timeout) { nil }.catch_only(FinalStanza) { |e| @blather << e.stanza }.catch do |e| e = RuntimeError.new(e) if e.is_a?(String) send_final_error(e) EMPromise.reject(e) end end def send_final_error(e) def e.replied? true end finish(@format_error.call(e), type: :error).catch_only(FinalStanza) do |s| @blather << s.stanza end end end attr_reader :node, :name def initialize( node, name, customer_repo: CustomerRepo.new, list_for: ->(tel:, **) { !!tel }, format_error: ->(e) { e.respond_to?(:message) ? e.message : e.to_s }, &blk ) @node = node @name = name @customer_repo = customer_repo @list_for = list_for @format_error = format_error @blk = blk end def register(blather, guards: [:execute?, { node: @node, sessionid: nil }]) blather.command(*guards) do |iq| Execution.new(@customer_repo, blather, @format_error, iq).execute(&@blk) end self end def list_for?(**kwargs) @list_for.call(**kwargs) end end