~singpolyma/sgx-jmp

ref: f571b5e726750cf8c4523c22d77e110194c3e16a sgx-jmp/lib/command.rb -rw-r--r-- 3.7 KiB
f571b5e7Stephen Paul Weber Postgres#query_one 1 year, 7 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# frozen_string_literal: true

require "sentry-ruby"
require "statsd-instrument"

require_relative "customer_repo"
require_relative "session_manager"

# A named command node that can be registered with Blather, plus
# class-level shortcuts that forward to the Execution stored for the
# currently running command.
class Command
	# The Execution stored for the current context, or nil if none was set.
	# Thread#[] storage is fiber-local in Ruby.
	def self.execution
		Thread.current[:execution]
	end

	# Store +exe+ as the Execution for the current context.
	def self.execution=(exe)
		Thread.current[:execution] = exe
	end

	# Shortcut for Execution#reply on the current execution.
	def self.reply(stanza=nil, &blk)
		execution.reply(stanza, &blk)
	end

	# Shortcut for Execution#finish on the current execution.
	def self.finish(*args, **kwargs, &blk)
		execution.finish(*args, **kwargs, &blk)
	end

	# Shortcut for Execution#customer on the current execution.
	def self.customer
		execution.customer
	end

	# Shortcut for Execution#log on the current execution.
	def self.log
		execution.log
	end

	# State for a single run of a command: the most recent iq received,
	# the customer lookup, a logger and the Sentry hub/scope.
	class Execution
		# Rejection value used by #reply when the session manager times out.
		class Timeout < SessionManager::Timeout; end

		# Rejection value wrapping the last stanza to send for this
		# execution.  #finish rejects with it and #catch_after sends it.
		class FinalStanza
			attr_reader :stanza

			def initialize(stanza)
				@stanza = stanza
			end
		end

		attr_reader :customer_repo, :log, :iq

		# customer_repo:: must respond to find_by_jid (see #customer)
		# blather:: receives outgoing final stanzas via <<
		# format_error:: callable turning an error into note text
		# iq:: the stanza that started this execution
		def initialize(customer_repo, blather, format_error, iq)
			@customer_repo = customer_repo
			@blather = blather
			@format_error = format_error
			@iq = iq
			# LOG is a constant defined outside this file
			@log = LOG.child(node: iq.node)
		end

		# Run the given block for this execution.
		#
		# Counts the command in StatsD, then (inside a promise chain, so
		# that raised errors become rejections) makes this the current
		# Command.execution, sets up the Sentry scope, yields self and
		# routes the outcome through #catch_after.  Anything that still
		# rejects after that is handed to +panic+, which is not defined
		# in this file.
		def execute
			StatsD.increment("command", tags: ["node:#{iq.node}"])
			EMPromise.resolve(nil).then {
				Command.execution = self
				sentry_hub
				catch_after(EMPromise.resolve(yield self))
			}.catch(&method(:panic))
		end

		# Send an intermediate stanza and wait for the next iq.
		#
		# stanza:: stanza to send; defaults to a reply to the current iq
		#          with status :executing.  Yielded to the block, if any,
		#          before being written.
		#
		# Returns the promise from COMMAND_MANAGER.write:
		# * on success the resolved iq becomes the current #iq
		# * a rejection with a Blather::Stanza::Iq is passed along, after
		#   becoming the current #iq when it is a "set"
		# * a SessionManager::Timeout rejection becomes Execution::Timeout
		def reply(stanza=nil)
			stanza ||= iq.reply.tap { |reply| reply.status = :executing }
			yield stanza if block_given?
			COMMAND_MANAGER.write(stanza).then { |new_iq|
				@iq = new_iq
			}.catch_only(Blather::Stanza::Iq) { |new_iq|
				@iq = new_iq if new_iq.set?
				EMPromise.reject(new_iq)
			}.catch_only(SessionManager::Timeout) do
				EMPromise.reject(Timeout.new)
			end
		end

		# Build the final reply to the current iq and stop the command.
		#
		# text:: optional note text; when given, the note is set after the
		#        block runs and so overrides any note the block set
		# type:: note type used with +text+
		# status:: status of the reply
		#
		# Always returns a rejected promise carrying a FinalStanza, so the
		# rest of the caller's promise chain is skipped; #catch_after is
		# what actually sends the stanza.
		def finish(text=nil, type: :info, status: :completed)
			reply = @iq.reply
			reply.status = status
			yield reply if block_given?
			if text
				reply.note_type = type
				reply.note_text = text
			end
			EMPromise.reject(FinalStanza.new(reply))
		end

		# The Sentry hub for this execution.  On first use a new scope is
		# pushed and tagged with the command node and the sender's bare JID.
		def sentry_hub
			return @sentry_hub if @sentry_hub

			# Stored on Fiber-local in 4.3.1 and earlier
			# https://github.com/getsentry/sentry-ruby/issues/1495
			@sentry_hub = Sentry.get_current_hub
			raise "Sentry.init has not been called" unless @sentry_hub

			@sentry_hub.push_scope
			@sentry_hub.current_scope.clear_breadcrumbs
			@sentry_hub.current_scope.set_transaction_name(@iq.node)
			@sentry_hub.current_scope.set_user(jid: @iq.from.stripped.to_s)
			@sentry_hub
		end

		# Memoized result of looking up the customer by the sender's bare
		# JID.  Once the lookup yields a customer, the Sentry user is
		# replaced with one that also carries the customer id.
		def customer
			@customer ||= @customer_repo.find_by_jid(@iq.from.stripped).then { |c|
				sentry_hub.current_scope.set_user(
					id: c.customer_id,
					# Stringified so it matches what #sentry_hub reports
					jid: @iq.from.stripped.to_s
				)
				c
			}
		end

	protected

		# Attach the standard rejection handling to +promise+:
		# * a command iq that is a cancel => finish with status :canceled
		#   (the resulting FinalStanza is sent by the handler below);
		#   any other command iq keeps propagating as a rejection
		# * Timeout => swallowed, nothing is sent
		# * FinalStanza => its stanza is written to blather
		# * anything else => logged, reported, and sent as an error note
		def catch_after(promise)
			promise.catch_only(Blather::Stanza::Iq::Command) { |iq|
				next EMPromise.reject(iq) unless iq.cancel?

				finish(status: :canceled)
			}.catch_only(Timeout) { nil }.catch_only(FinalStanza) { |e|
				@blather << e.stanza
			}.catch do |e|
				log_error(e)
				send_final_error(e)
			end
		end

		# Finish with the formatted error as an error note and send the
		# resulting stanza right away.
		def send_final_error(e)
			finish(@format_error.call(e), type: :error).catch_only(FinalStanza) do |s|
				@blather << s.stanza
			end
		end

		# Log +e+ and report it to Sentry.  Rejection values need not be
		# exceptions, so non-exceptions are reported as plain messages.
		def log_error(e)
			@log.error("Error raised during #{iq.node}: #{e.class}", e)
			if e.is_a?(::Exception)
				sentry_hub.capture_exception(e)
			else
				sentry_hub.capture_message(e.to_s)
			end
		end
	end

	attr_reader :node, :name

	# node:: command node, used in the default guards of #register
	# name:: command name, exposed through the reader
	# customer_repo:: passed to each Execution
	# list_for:: callable behind #list_for?; the default is true whenever
	#            a truthy tel: is given
	# format_error:: callable producing the note text for an error; the
	#                default uses #message when available, else #to_s
	# blk:: body of the command, called with the Execution
	def initialize(
		node,
		name,
		customer_repo: CustomerRepo.new,
		list_for: ->(tel:, **) { !!tel },
		format_error: ->(e) { e.respond_to?(:message) ? e.message : e.to_s },
		&blk
	)
		@node = node
		@name = name
		@customer_repo = customer_repo
		@list_for = list_for
		@format_error = format_error
		@blk = blk
	end

	# Register a handler on +blather+ that starts a fresh Execution of
	# this command for every iq matching +guards+.  By default that is an
	# execute request for this node with no sessionid.  Returns self.
	def register(blather, guards: [:execute?, { node: @node, sessionid: nil }])
		blather.command(*guards) do |iq|
			Execution.new(@customer_repo, blather, @format_error, iq).execute(&@blk)
		end
		self
	end

	# Result of calling the list_for callable with the given keywords.
	def list_for?(**kwargs)
		@list_for.call(**kwargs)
	end
end