# frozen_string_literal: true

require "pg/em/connection_pool"
require "bigdecimal"
require "blather/client/dsl" # Require this first to not auto-include
require "blather/client"
require "braintree"
require "date"
require "dhall"
require "em-hiredis"
require "em_promise"
require "ougai"
require "ruby-bandwidth-iris"
require "sentry-ruby"
require "singleton" # SentryOugai::SentryLogger includes Singleton
require "statsd-instrument"

# Flush log lines immediately instead of buffering them.
$stdout.sync = true
LOG = Ougai::Logger.new($stdout)
LOG.level = ENV.fetch("LOG_LEVEL", "info")
LOG.formatter = Ougai::Formatters::Readable.new(
  nil,
  nil,
  plain: !$stdout.isatty
)
Blather.logger = LOG
EM::Hiredis.logger = LOG
StatsD.logger = LOG
LOG.info "Starting"

Sentry.init do |config|
  config.logger = LOG
  config.breadcrumbs_logger = [:sentry_logger]
end

# Mixed into LOG so that every logged entry is also recorded as a
# Sentry breadcrumb.
module SentryOugai
  # Singleton carrier for Sentry's breadcrumb-logging mixin.
  class SentryLogger
    include Sentry::Breadcrumb::SentryLogger
    include Singleton
  end

  # Logs through the regular logger first, then adds a breadcrumb using
  # the message, or the exception's string form when there is no message.
  def _log(severity, message=nil, ex=nil, data=nil, &block)
    super
    SentryLogger.instance.add_breadcrumb(severity, message || ex.to_s, &block)
  end
end
LOG.extend SentryOugai

# Configuration is a Dhall expression whose path is the first CLI argument.
# Keys are converted to symbols while loading.
CONFIG =
  Dhall::Coder
  .new(safe: Dhall::Coder::JSON_LIKE + [Symbol, Proc])
  .load(ARGV[0], transform_keys: ->(k) { k&.to_sym })

singleton_class.class_eval do
  include Blather::DSL
  Blather::DSL.append_features(self)
end

require_relative "lib/polyfill"
require_relative "lib/alt_top_up_form"
require_relative "lib/add_bitcoin_address"
require_relative "lib/backend_sgx"
require_relative "lib/bandwidth_tn_order"
require_relative "lib/btc_sell_prices"
require_relative "lib/buy_account_credit_form"
require_relative "lib/command"
require_relative "lib/command_list"
require_relative "lib/customer"
require_relative "lib/customer_repo"
require_relative "lib/electrum"
require_relative "lib/expiring_lock"
require_relative "lib/em"
require_relative "lib/low_balance"
require_relative "lib/payment_methods"
require_relative "lib/registration"
require_relative "lib/transaction"
require_relative "lib/web_register_manager"
require_relative "lib/session_manager"
require_relative "lib/statsd"

ELECTRUM = Electrum.new(**CONFIG[:electrum])
EM::Hiredis::Client.load_scripts_from("./redis_lua")

Faraday.default_adapter = :em_synchrony
BandwidthIris::Client.global_options = {
  account_id: CONFIG[:creds][:account],
  username: CONFIG[:creds][:username],
  password: CONFIG[:creds][:password]
}

# Builds a dedicated Sentry hub for handling one stanza.
#
# The hub is derived from the current hub, gets a fresh scope with no
# breadcrumbs, and is tagged with the bare JID of the stanza's sender.
#
# @param stanza [#from] the stanza being handled
# @param name [String, nil] optional transaction name for the scope
# @return the new hub
# @raise [RuntimeError] when there is no current hub (Sentry.init not run)
def new_sentry_hub(stanza, name: nil)
  hub = Sentry.get_current_hub&.new_from_top
  raise "Sentry.init has not been called" unless hub

  hub.push_scope
  hub.current_scope.clear_breadcrumbs
  hub.current_scope.set_transaction_name(name) if name
  hub.current_scope.set_user(jid: stanza.from.stripped.to_s)
  hub
end

# Braintree is not async, so wrap in EM.defer for now
class AsyncBraintree
  # Accepts (and ignores) extra keyword arguments so the whole config
  # section can be splatted in.
  def initialize(environment:, merchant_id:, public_key:, private_key:, **)
    @gateway = Braintree::Gateway.new(
      environment: environment,
      merchant_id: merchant_id,
      public_key: public_key,
      private_key: private_key
    )
    @gateway.config.logger = LOG
  end

  def respond_to_missing?(m, *)
    @gateway.respond_to?(m)
  end

  # Forwards any method the gateway responds to, running it via
  # EM.promise_defer and returning a PromiseChain for the result.
  def method_missing(m, *args)
    return super unless respond_to_missing?(m, *args)

    EM.promise_defer(klass: PromiseChain) do
      @gateway.public_send(m, *args)
    end
  end

  # Promise whose unknown methods are sent to the eventual value, so
  # calls can be chained before the deferred result is available.
  class PromiseChain < EMPromise
    def respond_to_missing?(*)
      false # We don't actually know what we respond to...
    end

    def method_missing(m, *args)
      return super if respond_to_missing?(m, *args)

      self.then { |o| o.public_send(m, *args) }
    end
  end
end

BRAINTREE = AsyncBraintree.new(**CONFIG[:braintree])

# Last-resort error handler: logs at fatal level, reports to Sentry
# (through the given hub when there is one) and exits the process.
#
# @param e [Exception, Object] the error; non-exceptions are reported as
#   messages using their string form
# @param hub optional Sentry hub to report through instead of Sentry itself
def panic(e, hub=nil)
  (Thread.current[:log] || LOG).fatal(
    "Error raised during event loop: #{e.class}",
    e
  )
  if e.is_a?(::Exception)
    (hub || Sentry).capture_exception(e, hint: { background: false })
  else
    (hub || Sentry).capture_message(e.to_s, hint: { background: false })
  end
  exit 1
end

EM.error_handler(&method(:panic))

# Waits for one notification on the given connection, loads the customer
# whose id is the notification payload, sends that customer's low-balance
# notification, then waits again. Any failure goes to panic.
def poll_for_notify(db)
  db.wait_for_notify_defer.then { |notify|
    Customer.for_customer_id(notify[:extra])
  }.then(&LowBalance.method(:for)).then(&:notify!).then {
    poll_for_notify(db)
  }.catch(&method(:panic))
end

when_ready do
  LOG.info "Ready"
  BLATHER = self
  REDIS = EM::Hiredis.connect
  BTC_SELL_PRICES = BTCSellPrices.new(REDIS, CONFIG[:oxr_app_id])
  DB = PG::EM::ConnectionPool.new(dbname: "jmp") do |conn|
    conn.type_map_for_results = PG::BasicTypeMapForResults.new(conn)
    conn.type_map_for_queries = PG::BasicTypeMapForQueries.new(conn)
  end

  DB.hold do |conn|
    conn.query("LISTEN low_balance")
    # Re-announce every customer already under the threshold so they are
    # picked up by the listener started below.
    conn.query("SELECT customer_id FROM balances WHERE balance < 5").each do |c|
      conn.query("SELECT pg_notify('low_balance', $1)", c.values)
    end
    poll_for_notify(conn)
  end

  # Ping the server once an hour.
  EM.add_periodic_timer(3600) do
    ping = Blather::Stanza::Iq::Ping.new(:get, CONFIG[:server][:host])
    ping.from = CONFIG[:component][:jid]
    self << ping
  end
end

# workqueue_count MUST be 0 or else Blather uses threads!
setup(
  CONFIG[:component][:jid],
  CONFIG[:component][:secret],
  CONFIG[:server][:host],
  CONFIG[:server][:port],
  nil,
  nil,
  workqueue_count: 0
)

message to: /\Aaccount@/, body: /./ do |m|
  StatsD.increment("deprecated_account_bot")

  self << m.reply.tap do |out|
    out.body = "This bot is deprecated. Please talk to xmpp:cheogram.com"
  end
end

# Matches a sender whose domain is the configured SGX, either as a bare
# domain or after "@", optionally followed by a resource. The configured
# value is escaped so its dots only match literal dots.
from_sgx = /(\A|@)#{Regexp.escape(CONFIG[:sgx])}(\/|\Z)/

before(
  :iq,
  type: [:error, :result],
  to: /\Acustomer_/,
  from: from_sgx
) { |iq| halt if IQ_MANAGER.fulfill(iq) }

# Anything else the SGX sends to a customer_* address is handed to that
# customer, and normal handler processing is halted.
before nil, to: /\Acustomer_/, from: from_sgx do |s|
  StatsD.increment("stanza_customer")

  sentry_hub = new_sentry_hub(s, name: "stanza_customer")
  CustomerRepo.new.find(
    s.to.node.delete_prefix("customer_")
  ).then { |customer|
    sentry_hub.current_scope.set_user(
      id: customer.customer_id,
      jid: s.from.stripped.to_s
    )
    customer.stanza_to(s)
  }.catch { |e| panic(e, sentry_hub) }
  halt
end

ADDRESSES_NS = "http://jabber.org/protocol/address"

# Messages from the SGX addressed to the component itself: look for an
# address element whose jid starts with "customer_", rewrite the stanza
# for that customer and send it on. Without such an address, pass.
message(
  to: /\A#{Regexp.escape(CONFIG[:component][:jid])}\Z/,
  from: from_sgx
) do |m|
  StatsD.increment("inbound_group_text")
  sentry_hub = new_sentry_hub(m, name: "message")

  address = m.find("ns:addresses", ns: ADDRESSES_NS).first
    &.find("ns:address", ns: ADDRESSES_NS)
    &.find { |el| el["jid"].to_s.start_with?("customer_") }
  pass unless address

  CustomerRepo.new.find(
    Blather::JID.new(address["jid"].to_s).node.delete_prefix("customer_")
  ).then { |customer|
    m.from = m.from.with(domain: CONFIG[:component][:jid])
    m.to = m.to.with(domain: customer.jid.domain)
    address["jid"] = customer.jid.to_s
    BLATHER << m
  }.catch { |e| panic(e, sentry_hub) }
end

# Ignore groupchat messages
# Especially if we have the component join MUC for notifications
message(type: :groupchat) { true }

# Truthy when the message has a non-empty body or carries an OOB element.
def billable_message(m)
  (m.body && !m.body.empty?) ||
    m.find("ns:x", ns: OOB.registered_ns).first
end

# Every other message: record usage when billable, hand the stanza to the
# sending customer, then notify the admin room when usage exceeds 500.
message do |m|
  StatsD.increment("message")

  sentry_hub = new_sentry_hub(m, name: "message")
  today = Time.now.utc.to_date
  CustomerRepo.new.find_by_jid(m.from.stripped).then { |customer|
    sentry_hub.current_scope.set_user(
      id: customer.customer_id,
      jid: m.from.stripped.to_s
    )
    EMPromise.all([
      (customer.incr_message_usage if billable_message(m)),
      customer.stanza_from(m)
    ]).then { customer }
  }.then { |customer|
    ExpiringLock.new("jmp_usage_notify-#{customer.customer_id}").with do
      # NOTE(review): the range runs from today back to 30 days ago;
      # presumably message_usage expects it in that order -- confirm.
      customer.message_usage((today..(today - 30))).then do |usage|
        next unless usage > 500

        BLATHER.join(CONFIG[:notify_admin], "sgx-jmp")
        BLATHER.say(
          CONFIG[:notify_admin],
          "#{customer.customer_id} has used #{usage} messages since #{today - 30}",
          :groupchat
        )
      end
    end
  }.catch { |e| panic(e, sentry_hub) }
end

# NOTE(review): this is registered after the unguarded message handler
# above, which never calls pass; confirm this handler is reachable.
message :error? do |m|
  StatsD.increment("message_error")

  LOG.error "MESSAGE ERROR", stanza: m
end

IQ_MANAGER = SessionManager.new(self, :id)
COMMAND_MANAGER = SessionManager.new(
  self,
  :sessionid,
  timeout: 60 * 60,
  error_if: ->(s) { s.cancel? }
)
web_register_manager = WebRegisterManager.new

# Service discovery for the component's own JID.
disco_info to: Blather::JID.new(CONFIG[:component][:jid]) do |iq|
  reply = iq.reply
  reply.identities = [{
    name: "JMP.chat",
    type: "sms",
    category: "gateway"
  }]
  reply.features = [
    "http://jabber.org/protocol/disco#info",
    "http://jabber.org/protocol/commands"
  ]
  form = Blather::Stanza::X.find_or_create(reply.query)
  form.type = "result"
  form.fields = [
    {
      var: "FORM_TYPE",
      type: "hidden",
      value: "http://jabber.org/network/serverinfo"
    }
  ] + CONFIG[:xep0157]
  self << reply
end

# Service discovery for any other address.
disco_info do |iq|
  reply = iq.reply
  reply.identities = [{
    name: "JMP.chat",
    type: "sms",
    category: "client"
  }]
  reply.features = [
    "urn:xmpp:receipts"
  ]
  self << reply
end

# Lists the commands available to the requester. A failed customer lookup
# is turned into nil so a list can still be built.
disco_items node: "http://jabber.org/protocol/commands" do |iq|
  StatsD.increment("command_list")

  sentry_hub = new_sentry_hub(iq, name: iq.node)
  reply = iq.reply

  CustomerRepo.new.find_by_jid(iq.from.stripped).catch {
    nil
  }.then { |customer|
    CommandList.for(customer)
  }.then { |list|
    reply.items = list.map do |item|
      Blather::Stanza::DiscoItems::Item.new(
        iq.to,
        item[:node],
        item[:name]
      )
    end
    self << reply
  }.catch { |e| panic(e, sentry_hub) }
end

iq "/iq/ns:services", ns: "urn:xmpp:extdisco:2" do |iq|
  StatsD.increment("extdisco")

  reply = iq.reply
  reply << Nokogiri::XML::Builder.new {
    services(xmlns: "urn:xmpp:extdisco:2") do
      service(
        type: "sip",
        host: CONFIG[:sip_host]
      )
    end
  }.doc.root

  self << reply
end

# Registration: uses the existing customer or creates one when the lookup
# fails, then runs the registration flow.
Command.new(
  "jabber:iq:register",
  "Register",
  list_for: ->(*) { true }
) {
  Command.customer.catch {
    Sentry.add_breadcrumb(Sentry::Breadcrumb.new(message: "Customer.create"))
    Command.execution.customer_repo.create(Command.execution.iq.from.stripped)
  }.then { |customer|
    Sentry.add_breadcrumb(Sentry::Breadcrumb.new(message: "Registration.for"))
    Registration.for(customer, web_register_manager).then(&:write)
  }.then {
    StatsD.increment("registration.completed")
  }.catch_only(Command::Execution::FinalStanza) do |e|
    # A FinalStanza rejection is also counted as a completed registration,
    # then re-rejected unchanged.
    StatsD.increment("registration.completed")
    EMPromise.reject(e)
  end
}.register(self).then(&CommandList.method(:register))

# Commands that just pass through to the SGX
{
  "number-display" => ["Display JMP Number"],
  "configure-calls" => ["Configure Calls"],
  "record-voicemail-greeting" => [
    "Record Voicemail Greeting",
    list_for: ->(fwd: nil, **) { !!fwd }
  ]
}.each do |node, args|
  Command.new(node, *args) {
    Command.customer.then do |customer|
      customer.stanza_from(Command.execution.iq)
    end
  }.register(self, guards: [node: node]).then(&CommandList.method(:register))
end

Command.new(
  "credit cards",
  "Credit Card Settings and Management"
) {
  Command.customer.then do |customer|
    # Backslashes in the JID are percent-encoded before building the URL.
    url = CONFIG[:credit_card_url].call(
      customer.jid.to_s.gsub("\\", "%5C"),
      customer.customer_id
    )
    desc = "Manage credits cards and settings"
    Command.finish("#{desc}: #{url}") do |reply|
      oob = OOB.find_or_create(reply.command)
      oob.url = url
      oob.desc = desc
    end
  end
}.register(self).then(&CommandList.method(:register))

Command.new(
  "top up",
  "Buy Account Credit by Credit Card",
  list_for: ->(payment_methods: [], **) { !payment_methods.empty? },
  format_error: ->(e) { "Failed to buy credit, system said: #{e.message}" }
) {
  Command.customer.then { |customer|
    BuyAccountCreditForm.for(customer).then do |credit_form|
      Command.reply { |reply|
        reply.allowed_actions = [:complete]
        credit_form.add_to_form(reply.form)
      }.then do |iq|
        Transaction.sale(customer, **credit_form.parse(iq.form))
      end
    end
  }.then { |transaction|
    transaction.insert.then do
      Command.finish("#{transaction} added to your account balance.")
    end
  }.catch_only(BuyAccountCreditForm::AmountValidationError) do |e|
    Command.finish(e.message, type: :error)
  end
}.register(self).then(&CommandList.method(:register))

Command.new(
  "alt top up",
  "Buy Account Credit by Bitcoin, Mail, or Interac eTransfer",
  list_for: ->(customer:, **) { !!customer&.currency }
) {
  Command.customer.then { |customer|
    EMPromise.all([AltTopUpForm.for(customer), customer])
  }.then do |(alt_form, customer)|
    Command.reply { |reply|
      reply.allowed_actions = [:complete]
      reply.command << alt_form.form
    }.then do |iq|
      AddBitcoinAddress.for(iq, alt_form, customer).write
    end
  end
}.register(self).then(&CommandList.method(:register))

Command.new(
  "reset sip account",
  "Create or Reset SIP Account"
) {
  Command.customer.then(&:reset_sip_account).then do |sip_account|
    Command.finish do |reply|
      reply.command << sip_account.form
    end
  end
}.register(self).then(&CommandList.method(:register))

Command.new(
  "usage",
  "Show Monthly Usage"
) {
  # NOTE(review): the range runs from today back to one month ago;
  # presumably usage_report expects it in that order -- confirm.
  report_for = (Date.today..(Date.today << 1))

  Command.customer.then { |customer|
    customer.usage_report(report_for)
  }.then do |usage_report|
    Command.finish do |reply|
      reply.command << usage_report.form
    end
  end
}.register(self).then(&CommandList.method(:register))

# Web registration: only the configured web-register JID may call this.
# The submitted jid is sent to the configured target in a "push-register"
# command; the "from" value of the result is then associated with the
# submitted telephone number.
command :execute?, node: "web-register", sessionid: nil do |iq|
  StatsD.increment("command", tags: ["node:#{iq.node}"])

  sentry_hub = new_sentry_hub(iq, name: iq.node)

  begin
    jid = iq.form.field("jid")&.value.to_s.strip
    tel = iq.form.field("tel")&.value.to_s.strip
    sentry_hub.current_scope.set_user(jid: jid, tel: tel)
    if iq.from.stripped != CONFIG[:web_register][:from]
      BLATHER << iq.as_error("forbidden", :auth)
    elsif jid == "" || tel !~ /\A\+\d+\Z/
      reply_with_note(iq, "Invalid JID or telephone number.", type: :error)
    else
      IQ_MANAGER.write(Blather::Stanza::Iq::Command.new.tap { |cmd|
        cmd.to = CONFIG[:web_register][:to]
        cmd.node = "push-register"
        cmd.form.fields = [var: "to", value: jid]
        cmd.form.type = "submit"
      }).then { |result|
        final_jid = result.form.field("from")&.value.to_s.strip
        web_register_manager[final_jid] = tel
        BLATHER << iq.reply.tap { |reply| reply.status = :completed }
      }.catch { |e| panic(e, sentry_hub) }
    end
  rescue StandardError => e
    # Synchronous failures are reported but do not stop the process.
    sentry_hub.capture_exception(e)
  end
end

command sessionid: /./ do |iq|
  COMMAND_MANAGER.fulfill(iq)
end

iq type: [:result, :error] do |iq|
  IQ_MANAGER.fulfill(iq)
end

iq type: [:get, :set] do |iq|
  StatsD.increment("unknown_iq")

  self << Blather::StanzaError.new(iq, "feature-not-implemented", :cancel)
end