1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# frozen_string_literal: true
require "sentry-ruby"
require "statsd-instrument"
require_relative "customer_repo"
require_relative "session_manager"
class Command
def self.execution
Thread.current[:execution]
end
def self.execution=(exe)
Thread.current[:execution] = exe
end
def self.reply(stanza=nil, &blk)
execution.reply(stanza, &blk)
end
def self.finish(*args, **kwargs, &blk)
execution.finish(*args, **kwargs, &blk)
end
def self.customer
execution.customer
end
def self.log
execution.log
end
class Execution
class Timeout < SessionManager::Timeout; end
class FinalStanza
attr_reader :stanza
def initialize(stanza)
@stanza = stanza
end
end
attr_reader :customer_repo, :log, :iq
def initialize(customer_repo, blather, format_error, iq)
@customer_repo = customer_repo
@blather = blather
@format_error = format_error
@iq = iq
@log = LOG.child(node: iq.node)
end
def execute
StatsD.increment("command", tags: ["node:#{iq.node}"])
EMPromise.resolve(nil).then {
Command.execution = self
sentry_hub
catch_after(EMPromise.resolve(yield self))
}.catch(&method(:panic))
end
def reply(stanza=nil)
stanza ||= iq.reply.tap { |reply| reply.status = :executing }
yield stanza if block_given?
COMMAND_MANAGER.write(stanza).then { |new_iq|
@iq = new_iq
}.catch_only(Blather::Stanza::Iq) { |new_iq|
@iq = new_iq if new_iq.set?
EMPromise.reject(new_iq)
}.catch_only(SessionManager::Timeout) do
EMPromise.reject(Timeout.new)
end
end
def finish(text=nil, type: :info, status: :completed)
reply = @iq.reply
reply.status = status
yield reply if block_given?
if text
reply.note_type = type
reply.note_text = text
end
EMPromise.reject(FinalStanza.new(reply))
end
def sentry_hub
return @sentry_hub if @sentry_hub
# Stored on Fiber-local in 4.3.1 and earlier
# https://github.com/getsentry/sentry-ruby/issues/1495
@sentry_hub = Sentry.get_current_hub
raise "Sentry.init has not been called" unless @sentry_hub
@sentry_hub.push_scope
@sentry_hub.current_scope.clear_breadcrumbs
@sentry_hub.current_scope.set_transaction_name(@iq.node)
@sentry_hub.current_scope.set_user(jid: @iq.from.stripped.to_s)
@sentry_hub
end
def customer
@customer ||= @customer_repo.find_by_jid(@iq.from.stripped).then { |c|
sentry_hub.current_scope.set_user(
id: c.customer_id,
jid: @iq.from.stripped
)
c
}
end
protected
def catch_after(promise)
promise.catch_only(Blather::Stanza::Iq::Command) { |iq|
next EMPromise.reject(iq) unless iq.cancel?
finish(status: :canceled)
}.catch_only(Timeout) { nil }.catch_only(FinalStanza) { |e|
@blather << e.stanza
}.catch do |e|
log_error(e)
send_final_error(e)
end
end
def send_final_error(e)
finish(@format_error.call(e), type: :error).catch_only(FinalStanza) do |s|
@blather << s.stanza
end
end
def log_error(e)
@log.error("Error raised during #{iq.node}: #{e.class}", e)
if e.is_a?(::Exception)
sentry_hub.capture_exception(e)
else
sentry_hub.capture_message(e.to_s)
end
end
end
attr_reader :node, :name
def initialize(
node,
name,
customer_repo: CustomerRepo.new,
list_for: ->(tel:, **) { !!tel },
format_error: ->(e) { e.respond_to?(:message) ? e.message : e.to_s },
&blk
)
@node = node
@name = name
@customer_repo = customer_repo
@list_for = list_for
@format_error = format_error
@blk = blk
end
def register(blather, guards: [:execute?, { node: @node, sessionid: nil }])
blather.command(*guards) do |iq|
Execution.new(@customer_repo, blather, @format_error, iq).execute(&@blk)
end
self
end
def list_for?(**kwargs)
@list_for.call(**kwargs)
end
end