~singpolyma/sgx-jmp

540996de59cef98525e2067da6951eb17f648a94 — Stephen Paul Weber 1 year, 1 month ago 045da39 inbound-call-v2
WIP
5 files changed, 388 insertions(+), 0 deletions(-)

M Gemfile
A lib/roda_async.rb
A public/beep.mp3
M sgx_jmp.rb
A web.rb
M Gemfile => Gemfile +3 -0
@@ 14,9 14,11 @@ gem "em_promise.rb", "~> 0.0.3"
gem "eventmachine"
gem "money-open-exchange-rates"
gem "ougai"
gem "roda"
gem "ruby-bandwidth-iris"
gem "sentry-ruby", "<= 4.3.1"
gem "statsd-instrument", git: "https://github.com/singpolyma/statsd-instrument.git", branch: "graphite"
gem "time-hash"
gem "value_semantics", git: "https://github.com/singpolyma/value_semantics"

group(:development) do


@@ 24,6 26,7 @@ group(:development) do
	gem "pry-remote-em"
	gem "pry-rescue"
	gem "pry-stack_explorer"
	gem "roda-bin"
end

group(:test) do

A lib/roda_async.rb => lib/roda_async.rb +59 -0
@@ 0,0 1,59 @@
# frozen_string_literal: true

require "eventmachine"
require "promise"

module RodaAsync
	def self.configure(*, &block)
		@error_handler = block || ->(e) { EM.next_tick { raise e } }
	end

	def self.error_handler
		@error_handler
	end

	class DeferrableBody
		include EventMachine::Deferrable

		def to_proc
			@body_callback
		end

		def each(&block)
			@body_callback = block
		end
	end

	module RequestMethods
		def streaming_body
			body = DeferrableBody.new
			yield body.to_proc
			body
		end

		def block_result(result)
			case result
			when DeferrableBody
				super("")
				status, headers, = response.finish
				@env["async.callback"][[status, headers, result]]
				throw :async
			when Promise
				result.then { |r|
					super(r)
					@env["async.callback"][response.finish]
				}.catch(&RodaAsync.error_handler)
				throw :async
			when EM::Deferrable
				result.callback do |r|
					super(r)
					@env["async.callback"][response.finish]
				end
				result.errback(&RodaAsync.error_handler)
				throw :async
			else
				super(result)
			end
		end
	end
end

A public/beep.mp3 => public/beep.mp3 +0 -0
M sgx_jmp.rb => sgx_jmp.rb +6 -0
@@ 45,6 45,9 @@ module SentryOugai
end
LOG.extend SentryOugai

require "thin"
require_relative "./web"

CONFIG =
	Dhall::Coder
	.new(safe: Dhall::Coder::JSON_LIKE + [Symbol, Proc])


@@ 189,6 192,9 @@ when_ready do
		ping.from = CONFIG[:component][:jid]
		self << ping
	end

	CallHandler.blather = self
	Thin::Server.start("::1", 8080, CallHandler.freeze.app, signals: false)
end

# workqueue_count MUST be 0 or else Blather uses threads!

A web.rb => web.rb +320 -0
@@ 0,0 1,320 @@
# frozen_string_literal: true

require "em_promise"
require "em-http"
require "em-http/middleware/json_response"
require "erb" # for ERB::Util
require "nokogiri"
require "roda"

require_relative "./lib/roda_async"

if ENV["RACK_ENV"] == "development"
	require "pry-rescue"
	use PryRescue::Rack
end

module TransferTargets
	def self.from_redis_for(tel)
		REDIS.get("catapult_fwd-#{tel}").then { |fwd|
			fwd ? self.for(fwd) : []
		}
	end

	def self.for(*uris)
		uris.map do |uri|
			case uri
			when /^tel:/
				Tel.new(uri)
			when /^sip:/
				SIP.new(uri)
			when /^xmpp:/
				XMPP.new(uri)
			else
				raise "Unknown forward URI: #{uri}"
			end
		end
	end

	class Tel
		def initialize(uri)
			@tel = uri.sub(/^tel:/, "")
		end

		def add_to_xml(xml)
			xml.PhoneNumber @tel
		end
	end

	class SIP
		def initialize(uri)
			@uri = uri
		end

		def add_to_xml(xml)
			xml.SipUri @uri
		end
	end

	class XMPP
		def initialize(uri)
			@jid = uri.sub(/^xmpp:/, "")
		end

		def add_to_xml(xml)
			xml.SipUri "sip:#{ERB::Util.url_encode(@jid)}@sip.cheogram.com"
		end
	end
end

class Customer
	def self.from_redis_for(tel)
		REDIS.get("catapult_jid-#{tel}").then { |jid|
			REDIS.mget(
				"catapult_ogm_url-#{jid}",
				"catapult_fwd_timout-#{jid}"
			).then { |(ogm_url, timeout)|
				new(
					jid,
					tel: tel,
					ogm: ogm_url ? OGM::Media.new(ogm_url) : OGM::TTS.new(jid),
					fwd_timeout: (timeout.nil? || timeout < 0) ? 300 : timeout
				)
			}
		}
	end

	attr_reader :ogm

	def initialize(jid, fwd_timeout:, tel:, ogm:)
		@jid = jid
		@tel = tel
		@ogm = ogm
		@fwd_timeout = fwd_timeout
	end

	def transfer(call_id)
		TransferTargets.from_redis_for(@tel).then { |targets|
			Transfer.for(call_id, targets, @fwd_timeout)
		}
	end

	def build_message(body, from:, subject: nil)
		m = Blather::Stanza::Message.new(@jid, body)
		m.from = from
		m.subject = subject if subject
		yield m if block_given?
		m
	end

	def voicemail_recording(media_url, from:)
		jmp_media_url = media_url.sub(
			/https:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
			"https://jmp.chat"
		)
		build_message(jmp_media_url, from: from, subject: "New Voicemail") do |m|
			m.add_child(Nokogiri::XML::Builder.new { |xml|
				xml.x("xmlns" => "jabber:x:oob") {
					xml.url jmp_media_url
					xml.desc "Voicemail Recording"
				}
			}.doc.root)
		end
	end

	module OGM
		class Media
			def initialize(url)
				@url = url
			end

			def add_to_xml(xml)
				xml.PlayAudio @url
			end
		end

		class TTS
			def initialize(jid)
				@jid = jid
			end

			def add_to_xml(xml)
				xml.SpeakSentence(
					"You have reached the voicemail of a user of JMP.chat. " \
					"Please send a text message, or leave a message after the tone."
				)
			end
		end
	end
end

class Transfer
	def self.for(call_id, targets, timeout)
		if timeout == 0 || targets.empty?
			NoTransfer.new(call_id)
		else
			new(call_id, targets, timeout)
		end
	end

	def initialize(call_id, targets, timeout)
		@call_id = call_id
		@targets = targets
		@timeout = timeout
	end

	def add_to_xml(xml)
		xml.Transfer(
			callTimeout: @timeout,
			transferCompleteUrl: "/calls/#{@call_id}/transfer_complete"
		) {
			@targets.each { |t| t.add_to_xml(xml) }
		}
	end

	class NoTransfer
		def initialize(call_id)
			@call_id = call_id
		end

		def add_to_xml(xml)
			xml.Redirect(redirectUrl: "/calls/#{call_id}/transfer_complete")
		end
	end
end

class CallHandler < Roda
	plugin :common_logger, $stdout
	plugin :json_parser
	plugin :public
	plugin RodaAsync do |e|
		if ENV["RACK_ENV"] == "development"
			Pry::rescued(e)
		else
			p e
		end
	end

	def self.blather=(b)
		@blather = b
	end

	def self.blather
		@blather
	end

	def sanitize_tel_candidate(candidate)
		if candidate.length < 3
			"13;phone-context=anonymous.phone-context.soprani.ca"
		elsif candidate[0] == "+" && /\A\d+\z/.match(candidate[1..-1])
			candidate
		elsif candidate == "Restricted"
			# TODO: add 'NONE', 'NULL', 'null', 'ANONYMOUS' (18, 7, 6, 5)?
			{
				"Restricted" => "14",
				"anonymous" => "15",
				"Anonymous" => "16",
				"unavailable" => "17",
				"Unavailable" => "18",
			}.fetch(candidate, "19")
		end
	end

	def from_jid
		[
			sanitize_tel_candidate(request.params["from"]),
			CONFIG[:component][:jid]
		].join("@")
	end

	route do |r|
		r.on "calls" do
r.post "status" do
p request.params
""
end

			r.on :call_id do |call_id|
				r.post "transfer_complete" do
					if ["hangup", "cancel"].include?(request.params["cause"])
						Nokogiri::XML::Builder.new { |xml|
							xml.Response { xml.Hangup }
						}.to_xml
					else
						Customer.from_redis_for(request.params["to"]).then { |cust|
							Nokogiri::XML::Builder.new { |xml|
								xml.Response {
									xml.Pause(duration: 2)
									cust.ogm.add_to_xml(xml)
									xml.PlayAudio("/beep.mp3")
									xml.Record(
										recordingAvailableUrl: "/calls/#{call_id}/voicemail_audio",
										transcribe: "true", # TODO
										transcriptionAvailableUrl: "/calls/#{call_id}/voicemail_transcription",
										fileFormat: "mp3"
									)
								}
							}.to_xml
						}
					end
				end

				r.post "voicemail_audio" do
					duration = Time.parse(request.params["endTime"]) - \
						Time.parse(request.params["startTime"])
					next unless duration > 5

					Customer.from_redis_for(request.params["to"]).then { |cust|
						CallHandler.blather << cust.voicemail_recording(
							request.params["mediaUrl"],
							from: from_jid
						)
						""
					}
				end

				r.post "voicemail_transcription" do
					EM::HttpRequest.new(
						request.params["transcription"]["url"],
						tls: { verify_peer: true }
					).tap { |conn|
						conn.use EM::Middleware::JSONResponse
					}.get(
						head: {
							"authorization" => [
								CONFIG[:creds][:username],
								CONFIG[:creds][:password]
							]
						}
					).then { |http|
						EMPromise.all([
							http.response["transcripts"]&.first&.[]("text"),
							Customer.from_redis_for(request.params["to"])
						])
					}.then { |(transcript, cust)|
						CallHandler.blather << cust.build_message(
							transcript,
							from: from_jid,
							subject: "Voicemail Transcription"
						)
						""
					}
				end
			end

			r.post do
				Customer.from_redis_for(request.params["to"]).then { |cust|
					EMPromise.all([cust, cust.transfer(request.params["callId"])])
				}.then { |(cust, transfer)|
					Nokogiri::XML::Builder.new { |xml|
						xml.Response {
							transfer.add_to_xml(xml)
						}
					}.to_xml
				}
			end
		end

		r.public if ENV["RACK_ENV"] != "production"
	end
end