# frozen_string_literal: true

# Entry point for the XMPP component: loads configuration from the Dhall
# file given as ARGV[0], wires up Sentry, Braintree, Redis, Postgres and
# Electrum, and registers the Blather stanza handlers (messages, service
# discovery and ad-hoc commands).

require "pg/em/connection_pool"
require "bigdecimal"
require "blather/client/dsl" # Require this first to not auto-include
require "blather/client"
require "braintree"
require "date"
require "dhall"
require "em-hiredis"
require "em_promise"
require "ruby-bandwidth-iris"
require "sentry-ruby"
require "statsd-instrument"

Sentry.init

# Configuration is loaded once at boot; keys are symbolized on load.
CONFIG =
  Dhall::Coder
  .new(safe: Dhall::Coder::JSON_LIKE + [Symbol, Proc])
  .load(ARGV[0], transform_keys: ->(k) { k&.to_sym })

# Make the Blather DSL available on the top-level object only.
singleton_class.class_eval do
  include Blather::DSL
  Blather::DSL.append_features(self)
end

require_relative "lib/alt_top_up_form"
require_relative "lib/add_bitcoin_address"
require_relative "lib/backend_sgx"
require_relative "lib/bandwidth_tn_order"
require_relative "lib/btc_sell_prices"
require_relative "lib/buy_account_credit_form"
require_relative "lib/command_list"
require_relative "lib/customer"
require_relative "lib/electrum"
require_relative "lib/em"
require_relative "lib/payment_methods"
require_relative "lib/registration"
require_relative "lib/transaction"
require_relative "lib/web_register_manager"

ELECTRUM = Electrum.new(**CONFIG[:electrum])

Faraday.default_adapter = :em_synchrony
BandwidthIris::Client.global_options = {
  account_id: CONFIG[:creds][:account],
  username: CONFIG[:creds][:username],
  password: CONFIG[:creds][:password]
}

# Builds a fresh Sentry hub for handling a single stanza, so breadcrumbs
# and user data recorded for one request do not leak into another.
#
# @param stanza the incoming stanza; its bare sender JID becomes the user
# @param name [String, nil] optional transaction name for the scope
# @return the new hub
# @raise [RuntimeError] if Sentry.init has not been called
def new_sentry_hub(stanza, name: nil)
  hub = Sentry.get_current_hub&.new_from_top
  raise "Sentry.init has not been called" unless hub

  hub.push_scope
  hub.current_scope.clear_breadcrumbs
  hub.current_scope.set_transaction_name(name) if name
  hub.current_scope.set_user(jid: stanza.from.stripped.to_s)
  hub
end

# Braintree is not async, so wrap in EM.defer for now
#
# Every method the underlying gateway responds to is forwarded through
# EM.promise_defer and returns a PromiseChain instead of a plain value.
class AsyncBraintree
  # Extra keyword arguments are accepted and ignored so the whole
  # CONFIG[:braintree] hash can be splatted in.
  def initialize(environment:, merchant_id:, public_key:, private_key:, **)
    @gateway = Braintree::Gateway.new(
      environment: environment,
      merchant_id: merchant_id,
      public_key: public_key,
      private_key: private_key
    )
  end

  def respond_to_missing?(m, *)
    @gateway.respond_to?(m)
  end

  def method_missing(m, *args)
    return super unless respond_to_missing?(m, *args)

    EM.promise_defer(klass: PromiseChain) do
      @gateway.public_send(m, *args)
    end
  end

  # A promise on which any unknown method call is queued up to be sent to
  # the eventual value, so gateway calls can be chained fluently.
  class PromiseChain < EMPromise
    def respond_to_missing?(*)
      false # We don't actually know what we respond to...
    end

    def method_missing(m, *args)
      return super if respond_to_missing?(m, *args)

      self.then { |o| o.public_send(m, *args) }
    end
  end
end

BRAINTREE = AsyncBraintree.new(**CONFIG[:braintree])

# Last-resort error handler: logs the error to stderr, reports it to
# Sentry (through +hub+ when given, otherwise the global Sentry) and
# terminates the process with exit status 1.
#
# @param e an Exception, or any other rejection value
# @param hub optional per-request Sentry hub
def panic(e, hub=nil)
  m = e.respond_to?(:message) ? e.message : e
  warn "Error raised during event loop: #{e.class}: #{m}"
  warn e.backtrace if e.respond_to?(:backtrace)
  if e.is_a?(::Exception)
    (hub || Sentry).capture_exception(e, hint: { background: false })
  else
    (hub || Sentry).capture_message(e.to_s, hint: { background: false })
  end
  exit 1
end

EM.error_handler(&method(:panic))

# Connections are created once the component stream is ready.
when_ready do
  BLATHER = self
  REDIS = EM::Hiredis.connect
  BTC_SELL_PRICES = BTCSellPrices.new(REDIS, CONFIG[:oxr_app_id])
  DB = PG::EM::ConnectionPool.new(dbname: "jmp")
  DB.type_map_for_results = PG::BasicTypeMapForResults.new(DB)
  DB.type_map_for_queries = PG::BasicTypeMapForQueries.new(DB)

  # Ping the server once an hour.
  EM.add_periodic_timer(3600) do
    ping = Blather::Stanza::Iq::Ping.new(:get, CONFIG[:server][:host])
    ping.from = CONFIG[:component][:jid]
    self << ping
  end
end

# workqueue_count MUST be 0 or else Blather uses threads!
setup(
  CONFIG[:component][:jid],
  CONFIG[:component][:secret],
  CONFIG[:server][:host],
  CONFIG[:server][:port],
  nil,
  nil,
  workqueue_count: 0
)

# The old account@ bot only answers with a deprecation notice.
message to: /\Aaccount@/, body: /./ do |m|
  StatsD.increment("deprecated_account_bot")

  self << m.reply.tap do |out|
    out.body = "This bot is deprecated. Please talk to xmpp:cheogram.com"
  end
end

# IQ results/errors coming back from the SGX for a customer_ JID resolve
# the matching pending request, if there is one.
# NOTE(review): CONFIG[:sgx] is interpolated into the pattern unescaped, so
# regex metacharacters in it (such as ".") are treated as pattern syntax.
before(
  :iq,
  type: [:error, :result],
  to: /\Acustomer_/,
  from: /(\A|@)#{CONFIG[:sgx]}(\/|\Z)/
) { |iq| halt if IQ_MANAGER.fulfill(iq) }

# Any other stanza from the SGX addressed to customer_<id> is handed to
# that customer.
before nil, to: /\Acustomer_/, from: /(\A|@)#{CONFIG[:sgx]}(\/|\Z)/ do |s|
  StatsD.increment("stanza_customer")

  sentry_hub = new_sentry_hub(s, name: "stanza_customer")
  Customer.for_customer_id(
    s.to.node.delete_prefix("customer_")
  ).then { |customer|
    sentry_hub.current_scope.set_user(
      id: customer.customer_id,
      jid: s.from.stripped.to_s
    )
    customer.stanza_to(s)
  }.catch { |e| panic(e, sentry_hub) }

  halt
end

# Ignore messages to component
# Especially if we have the component join MUC for notifications
message(to: /\A#{CONFIG[:component][:jid]}\Z/) { true }

# Whether a message counts towards usage: it has a non-empty body or
# carries an OOB <x/> element. A missing body is treated like an empty one
# rather than raising.
def billable_message(m)
  !m.body.to_s.empty? || m.find("ns:x", ns: OOB.registered_ns).first
end

# Messages from customers: count usage, pass the stanza on via
# Customer#stanza_from, and notify the admin chat when the 30-day usage
# exceeds 500 messages (unless the notify flag is already set in Redis).
message do |m|
  StatsD.increment("message")

  sentry_hub = new_sentry_hub(m, name: "message")
  today = Time.now.utc.to_date
  Customer.for_jid(m.from.stripped).then { |customer|
    sentry_hub.current_scope.set_user(
      id: customer.customer_id,
      jid: m.from.stripped.to_s
    )
    EMPromise.all([
      customer,
      (customer.incr_message_usage if billable_message(m)),
      REDIS.exists("jmp_usage_notify-#{customer.customer_id}"),
      customer.stanza_from(m)
    ])
  }.then { |(customer, _, already, _)|
    next if already == 1

    # NOTE(review): the range runs from the newest day to the oldest;
    # presumably Customer#message_usage expects that order -- confirm.
    customer.message_usage((today..(today - 30))).then do |usage|
      next unless usage > 500

      BLATHER.join(CONFIG[:notify_admin], "sgx-jmp")
      BLATHER.say(
        CONFIG[:notify_admin],
        "#{customer.customer_id} has used #{usage} messages since #{today - 30}",
        :groupchat
      )
      # em-hiredis passes arguments straight through as strings, so the
      # expiry has to be sent as a real SETEX; only the key's existence
      # is ever checked, the stored value is irrelevant.
      REDIS.setex(
        "jmp_usage_notify-#{customer.customer_id}",
        60 * 60 * 24,
        "1"
      )
    end
  }.catch { |e| panic(e, sentry_hub) }
end

# NOTE(review): this is registered after the catch-all message handler
# above; confirm Blather still dispatches error messages to it.
message :error? do |m|
  StatsD.increment("message_error")

  puts "MESSAGE ERROR: #{m.inspect}"
end

# Tracks outstanding requests and pairs each one with its response.
#
# Sessions are keyed by "<bare peer JID>/<id>", where the id is read from
# the stanza using the +id_msg+ method (e.g. :id or :sessionid).
class SessionManager
  # @param blather the object stanzas are written to with <<
  # @param id_msg [Symbol] stanza method that yields the session id
  # @param timeout [Numeric] seconds before a pending session is rejected
  def initialize(blather, id_msg, timeout: 5)
    @blather = blather
    @sessions = {}
    @id_msg = id_msg
    @timeout = timeout
  end

  # Returns the pending promise for the stanza's session, creating it (and
  # arming its timeout) when none exists yet. On timeout the promise is
  # rejected with :timeout.
  def promise_for(stanza)
    id = "#{stanza.to.stripped}/#{stanza.public_send(@id_msg)}"
    @sessions.fetch(id) do
      @sessions[id] = EMPromise.new
      EM.add_timer(@timeout) do
        @sessions.delete(id)&.reject(:timeout)
      end
      @sessions[id]
    end
  end

  # Sends the stanza and returns the promise for its response.
  def write(stanza)
    promise = promise_for(stanza)
    @blather << stanza
    promise
  end

  # Settles the session matching an incoming stanza: error stanzas reject
  # the promise, anything else fulfills it. Returns nil when no session
  # is pending for the stanza.
  def fulfill(stanza)
    id = "#{stanza.from.stripped}/#{stanza.public_send(@id_msg)}"
    if stanza.error?
      @sessions.delete(id)&.reject(stanza)
    else
      @sessions.delete(id)&.fulfill(stanza)
    end
  end
end

IQ_MANAGER = SessionManager.new(self, :id)
COMMAND_MANAGER = SessionManager.new(self, :sessionid, timeout: 60 * 60)
web_register_manager = WebRegisterManager.new

# disco#info for the component itself: gateway identity plus the
# serverinfo form fields taken from CONFIG[:xep0157].
disco_info to: Blather::JID.new(CONFIG[:component][:jid]) do |iq|
  reply = iq.reply
  reply.identities = [{
    name: "JMP.chat",
    type: "sms",
    category: "gateway"
  }]
  form = Blather::Stanza::X.find_or_create(reply.query)
  form.type = "result"
  form.fields = [
    {
      var: "FORM_TYPE",
      type: "hidden",
      value: "http://jabber.org/network/serverinfo"
    }
  ] + CONFIG[:xep0157]
  self << reply
end

# Lists the ad-hoc commands available to the requesting JID.
disco_items node: "http://jabber.org/protocol/commands" do |iq|
  StatsD.increment("command_list")

  sentry_hub = new_sentry_hub(iq, name: iq.node)
  reply = iq.reply

  CommandList.for(iq.from.stripped).then { |list|
    reply.items = list.map do |item|
      Blather::Stanza::DiscoItems::Item.new(
        iq.to,
        item[:node],
        item[:name]
      )
    end
    self << reply
  }.catch { |e| panic(e, sentry_hub) }
end

# External service discovery: advertises the configured SIP host.
iq "/iq/ns:services", ns: "urn:xmpp:extdisco:2" do |iq|
  StatsD.increment("extdisco")

  reply = iq.reply
  reply << Nokogiri::XML::Builder.new {
    services(xmlns: "urn:xmpp:extdisco:2") do
      service(
        type: "sip",
        host: CONFIG[:sip_host]
      )
    end
  }.doc.root

  self << reply
end

# Registration: looks the customer up, creating one when the lookup
# fails, then hands over to the Registration flow. A Blather::Stanza
# rejection from the flow is sent to the user as the reply.
command :execute?, node: "jabber:iq:register", sessionid: nil do |iq|
  StatsD.increment("command", tags: ["node:#{iq.node}"])

  sentry_hub = new_sentry_hub(iq, name: iq.node)
  EMPromise.resolve(nil).then {
    Customer.for_jid(iq.from.stripped)
  }.catch {
    sentry_hub.add_breadcrumb(Sentry::Breadcrumb.new(
      message: "Customer.create"
    ))
    Customer.create(iq.from.stripped)
  }.then { |customer|
    sentry_hub.current_scope.set_user(
      id: customer.customer_id,
      jid: iq.from.stripped.to_s
    )
    sentry_hub.add_breadcrumb(Sentry::Breadcrumb.new(
      message: "Registration.for"
    ))
    Registration.for(
      iq,
      customer,
      web_register_manager
    ).then(&:write)
  }.catch_only(Blather::Stanza) { |reply|
    self << reply
  }.catch { |e| panic(e, sentry_hub) }
end

# Completes a command with a single note.
#
# @param iq the command request being answered
# @param text [String] note text shown to the user
# @param type [Symbol] note type, e.g. :info or :error
def reply_with_note(iq, text, type: :info)
  reply = iq.reply
  reply.status = :completed
  reply.note_type = type
  reply.note_text = text

  self << reply
end

# Commands that just pass through to the SGX
command node: [
  "number-display",
  "configure-calls",
  "record-voicemail-greeting"
] do |iq|
  StatsD.increment("command", tags: ["node:#{iq.node}"])

  sentry_hub = new_sentry_hub(iq, name: iq.node)
  Customer.for_jid(iq.from.stripped).then { |customer|
    sentry_hub.current_scope.set_user(
      id: customer.customer_id,
      jid: iq.from.stripped.to_s
    )

    customer.stanza_from(iq)
  }.catch { |e| panic(e, sentry_hub) }
end

# Replies with the URL where the customer manages their credit cards.
command :execute?, node: "credit cards", sessionid: nil do |iq|
  StatsD.increment("command", tags: ["node:#{iq.node}"])

  sentry_hub = new_sentry_hub(iq, name: iq.node)
  reply = iq.reply
  reply.status = :completed

  Customer.for_jid(iq.from.stripped).then { |customer|
    sentry_hub.current_scope.set_user(
      id: customer.customer_id,
      jid: iq.from.stripped.to_s
    )

    oob = OOB.find_or_create(reply.command)
    oob.url = CONFIG[:credit_card_url].call(
      reply.to.stripped.to_s.gsub("\\", "%5C"),
      customer.customer_id
    )
    oob.desc = "Manage credits cards and settings"

    reply.note_type = :info
    reply.note_text = "#{oob.desc}: #{oob.url}"

    self << reply
  }.catch { |e| panic(e, sentry_hub) }
end

# Buys account credit: sends the credit form, waits for the user's
# submission, then records the sale. Failures are reported to the user as
# an error note; only a failure while reporting reaches panic.
command :execute?, node: "top up", sessionid: nil do |iq|
  StatsD.increment("command", tags: ["node:#{iq.node}"])

  sentry_hub = new_sentry_hub(iq, name: iq.node)
  reply = iq.reply
  reply.allowed_actions = [:complete]

  Customer.for_jid(iq.from.stripped).then { |customer|
    sentry_hub.current_scope.set_user(
      id: customer.customer_id,
      jid: iq.from.stripped.to_s
    )

    BuyAccountCreditForm.for(customer).then do |credit_form|
      credit_form.add_to_form(reply.form)
      COMMAND_MANAGER.write(reply).then { |iq2|
        [customer, credit_form, iq2]
      }
    end
  }.then { |(customer, credit_form, iq2)|
    iq = iq2 # This allows the catch to use it also
    Transaction.sale(customer, **credit_form.parse(iq2.form))
  }.then { |transaction|
    transaction.insert.then do
      reply_with_note(iq, "#{transaction} added to your account balance.")
    end
  }.catch_only(BuyAccountCreditForm::AmountValidationError) { |e|
    reply_with_note(iq, e.message, type: :error)
  }.catch { |e|
    sentry_hub.capture_exception(e)
    text = "Failed to buy credit, system said: #{e.message}"
    reply_with_note(iq, text, type: :error)
  }.catch { |e| panic(e, sentry_hub) }
end

# Alternative top-up: shows the alt top-up form, then continues with
# AddBitcoinAddress once the user responds.
command :execute?, node: "alt top up", sessionid: nil do |iq|
  StatsD.increment("command", tags: ["node:#{iq.node}"])

  sentry_hub = new_sentry_hub(iq, name: iq.node)
  reply = iq.reply
  reply.status = :executing
  reply.allowed_actions = [:complete]

  Customer.for_jid(iq.from.stripped).then { |customer|
    sentry_hub.current_scope.set_user(
      id: customer.customer_id,
      jid: iq.from.stripped.to_s
    )

    EMPromise.all([AltTopUpForm.for(customer), customer])
  }.then { |(alt_form, customer)|
    reply.command << alt_form.form

    COMMAND_MANAGER.write(reply).then do |iq2|
      AddBitcoinAddress.for(iq2, alt_form, customer).write
    end
  }.catch { |e| panic(e, sentry_hub) }
end

# Resets the customer's SIP account and replies with the resulting form.
command :execute?, node: "reset sip account", sessionid: nil do |iq|
  StatsD.increment("command", tags: ["node:#{iq.node}"])

  sentry_hub = new_sentry_hub(iq, name: iq.node)
  Customer.for_jid(iq.from.stripped).then { |customer|
    sentry_hub.current_scope.set_user(
      id: customer.customer_id,
      jid: iq.from.stripped.to_s
    )
    customer.reset_sip_account
  }.then { |sip_account|
    reply = iq.reply
    reply.command << sip_account.form
    BLATHER << reply
  }.catch { |e| panic(e, sentry_hub) }
end

# Replies with the customer's usage report form.
command :execute?, node: "usage", sessionid: nil do |iq|
  StatsD.increment("command", tags: ["node:#{iq.node}"])

  sentry_hub = new_sentry_hub(iq, name: iq.node)
  # NOTE(review): range runs from today back to one month ago; presumably
  # Customer#usage_report expects newest-first -- confirm.
  report_for = (Date.today..(Date.today << 1))

  Customer.for_jid(iq.from.stripped).then { |customer|
    sentry_hub.current_scope.set_user(
      id: customer.customer_id,
      jid: iq.from.stripped.to_s
    )

    customer.usage_report(report_for)
  }.then { |usage_report|
    reply = iq.reply
    reply.status = :completed
    reply.command << usage_report.form
    BLATHER << reply
  }.catch { |e| panic(e, sentry_hub) }
end

# Web registration: only the configured web_register sender may use it.
# Sends a push-register command to CONFIG[:web_register][:to] and stores
# the telephone number under the JID that comes back in the result.
command :execute?, node: "web-register", sessionid: nil do |iq|
  StatsD.increment("command", tags: ["node:#{iq.node}"])

  sentry_hub = new_sentry_hub(iq, name: iq.node)

  begin
    jid = iq.form.field("jid")&.value.to_s.strip
    tel = iq.form.field("tel")&.value.to_s.strip
    # Must use the per-request hub; no local named `hub` exists here.
    sentry_hub.current_scope.set_user(jid: jid, tel: tel)
    if iq.from.stripped != CONFIG[:web_register][:from]
      BLATHER << iq.as_error("forbidden", :auth)
    elsif jid == "" || tel !~ /\A\+\d+\Z/
      reply_with_note(iq, "Invalid JID or telephone number.", type: :error)
    else
      IQ_MANAGER.write(Blather::Stanza::Iq::Command.new.tap { |cmd|
        cmd.to = CONFIG[:web_register][:to]
        cmd.node = "push-register"
        cmd.form.fields = [var: "to", value: jid]
        cmd.form.type = "submit"
      }).then { |result|
        final_jid = result.form.field("from")&.value.to_s.strip
        web_register_manager[final_jid] = tel
        BLATHER << iq.reply.tap { |reply| reply.status = :completed }
      }.catch { |e| panic(e, sentry_hub) }
    end
  rescue StandardError => e
    # Synchronous failures are reported but do not take the process down.
    sentry_hub.capture_exception(e)
  end
end

# Follow-up stanzas of a running command resolve the pending session.
command sessionid: /./ do |iq|
  COMMAND_MANAGER.fulfill(iq)
end

# Responses to IQs this component sent out.
iq type: [:result, :error] do |iq|
  IQ_MANAGER.fulfill(iq)
end

# Any request that nothing above handled.
iq type: [:get, :set] do |iq|
  StatsD.increment("unknown_iq")

  self << Blather::StanzaError.new(iq, "feature-not-implemented", :cancel)
end