~singpolyma/sgx-jmp

sgx-jmp/lib/command.rb -rw-r--r-- 3.1 KiB
1ef966f3Amolith Eliminate a registration race condition 19 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# frozen_string_literal: true

require "sentry-ruby"
require "statsd-instrument"

require_relative "customer_repo"
require_relative "session_manager"

# An ad-hoc command: a node and display name plus the block that runs each
# time the command is executed. The class-level helpers forward to whichever
# Execution is currently held in Thread.current[:execution].
class Command
	class << self
		# The Execution held in Thread.current[:execution] (nil if none was set).
		def execution
			Thread.current[:execution]
		end

		# Stores +exe+ in Thread.current[:execution].
		def execution=(exe)
			Thread.current[:execution] = exe
		end

		# Forwards to Execution#reply on the current execution.
		def reply(stanza=nil, &blk)
			execution.reply(stanza, &blk)
		end

		# Forwards to Execution#finish on the current execution.
		def finish(*args, **kwargs, &blk)
			execution.finish(*args, **kwargs, &blk)
		end

		# Forwards to Execution#customer on the current execution.
		def customer
			execution.customer
		end

		# Forwards to Execution#log on the current execution.
		def log
			execution.log
		end
	end

	# State for one run of a command: the IQ currently being answered, the
	# customer lookup and the plumbing used to send replies.
	class Execution
		# Rejection value used by #reply when writing the stanza timed out.
		class Timeout < SessionManager::Timeout; end

		# Rejection value carrying the last stanza of a command; #finish rejects
		# with it and #catch_after writes the wrapped stanza out.
		class FinalStanza
			attr_reader :stanza

			def initialize(stanza)
				@stanza = stanza
			end
		end

		attr_reader :customer_repo, :log, :iq

		# customer_repo:: must respond to +find_by_jid+
		# blather::       receives final stanzas via <tt><<</tt>
		# format_error::  callable turning a rejection value into note text
		# iq::            the command IQ that started this execution
		def initialize(customer_repo, blather, format_error, iq)
			@iq = iq
			@blather = blather
			@format_error = format_error
			@customer_repo = customer_repo
			# Use the logger found in Thread.current[:log], else the global LOG
			@log = Thread.current[:log] || LOG
		end

		# Counts the command in StatsD, then (inside a promise callback) makes
		# this the current Command.execution, yields self to the given block and
		# routes whatever the block produced through #catch_after.
		# Returns the resulting promise.
		def execute
			StatsD.increment("command", tags: ["node:#{iq.node}"])
			EMPromise.resolve(nil).then do
				Command.execution = self
				outcome = yield self
				catch_after(EMPromise.resolve(outcome))
			end
		end

		# Writes +stanza+ through COMMAND_MANAGER; when none is given, a reply
		# to the current IQ with status :executing is used. The stanza is
		# yielded to the block, if any, before being written.
		#
		# The returned promise resolves with the next IQ (also stored in @iq).
		# A rejection with a Blather::Stanza::Iq is stored in @iq only when it
		# is a +set+, and is passed on as a rejection either way. A
		# SessionManager::Timeout rejection becomes an Execution::Timeout.
		def reply(stanza=nil)
			unless stanza
				stanza = iq.reply
				stanza.status = :executing
			end
			yield stanza if block_given?

			written = COMMAND_MANAGER.write(stanza).then { |next_iq|
				@iq = next_iq
			}
			tracked = written.catch_only(Blather::Stanza::Iq) { |next_iq|
				@iq = next_iq if next_iq.set?
				EMPromise.reject(next_iq)
			}
			tracked.catch_only(SessionManager::Timeout) {
				EMPromise.reject(Timeout.new)
			}
		end

		# Builds the closing reply to the current IQ with the given +status+,
		# lets the block (if any) adjust it, attaches +text+ as a note of
		# +type+ when text is given, and then always rejects with a FinalStanza
		# wrapping that reply.
		def finish(text=nil, type: :info, status: :completed)
			final = @iq.reply
			final.status = status
			adjusted = yield(final) if block_given?
			EMPromise.resolve(adjusted).then {
				next unless text

				final.note_type = type
				final.note_text = text
			}.then { EMPromise.reject(FinalStanza.new(final)) }
		end

		# Memoised promise for the customer whose JID (stripped) sent the IQ.
		# Once the lookup resolves, Sentry is given the customer id and the JID.
		def customer
			return @customer if @customer

			lookup = @customer_repo.find_by_jid(@iq.from.stripped)
			@customer = lookup.then do |found|
				Sentry.set_user(
					id: found.customer_id,
					jid: @iq.from.stripped
				)
				found
			end
		end

	protected

		# Attaches the terminal rejection handlers to +promise+:
		# * a rejected command IQ that is a cancel finishes with status
		#   :canceled; any other command IQ is passed on as a rejection
		# * an Execution::Timeout resolves to nil
		# * a FinalStanza has its stanza written to @blather
		# * anything else (Strings are wrapped in RuntimeError first) is sent
		#   with #send_final_error and then rejected again
		def catch_after(promise)
			promise.catch_only(Blather::Stanza::Iq::Command) { |cmd|
				cmd.cancel? ? finish(status: :canceled) : EMPromise.reject(cmd)
			}.catch_only(Timeout) {
				nil
			}.catch_only(FinalStanza) { |final|
				@blather << final.stanza
			}.catch do |err|
				err = RuntimeError.new(err) if err.is_a?(String)
				send_final_error(err)
				EMPromise.reject(err)
			end
		end

		# Gives +e+ a singleton +replied?+ returning true (presumably so later
		# handlers can tell a reply already went out; confirm against callers),
		# then finishes with the formatted error as an :error note and writes
		# the resulting stanza to @blather.
		def send_final_error(e)
			def e.replied?
				true
			end

			note = @format_error.call(e)
			finish(note, type: :error).catch_only(FinalStanza) { |final|
				@blather << final.stanza
			}
		end
	end

	attr_reader :node, :name

	# node::          command node, used in the default registration guards
	# name::          display name, exposed through #name
	# customer_repo:: passed to every Execution
	# list_for::      callable behind #list_for?; by default true when +tel+
	#                 is truthy
	# format_error::  callable producing the note text for a failure; by
	#                 default the error's message, else its +to_s+
	# blk::           body of the command, run with the Execution
	def initialize(
		node,
		name,
		customer_repo: CustomerRepo.new,
		list_for: ->(tel:, **) { !!tel },
		format_error: ->(e) { e.respond_to?(:message) ? e.message : e.to_s },
		&blk
	)
		@blk = blk
		@format_error = format_error
		@list_for = list_for
		@customer_repo = customer_repo
		@name = name
		@node = node
	end

	# Installs a command handler on +blather+ for the given +guards+; every
	# matching IQ gets a fresh Execution that runs the command's block.
	# Returns self.
	def register(blather, guards: [:execute?, { node: @node, sessionid: nil }])
		blather.command(*guards) { |iq|
			run = Execution.new(@customer_repo, blather, @format_error, iq)
			run.execute(&@blk)
		}
		self
	end

	# Result of calling the +list_for+ callable with the given keywords.
	def list_for?(**kwargs)
		@list_for.(**kwargs)
	end
end