From 68ed9c34b0d4a2eaa0e49def372deabe4eb55daf Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Mon, 25 Apr 2022 10:26:02 -0500 Subject: [PATCH] Run billing 3 at a time All at once kills the box --- bin/billing_monthly_cronjob | 103 +++++++++++++++++++++--------------- lib/blather_notify.rb | 16 ++++-- 2 files changed, 71 insertions(+), 48 deletions(-) diff --git a/bin/billing_monthly_cronjob b/bin/billing_monthly_cronjob index b603fb1..7fcbc41 100755 --- a/bin/billing_monthly_cronjob +++ b/bin/billing_monthly_cronjob @@ -30,62 +30,77 @@ BlatherNotify.start( CONFIG[:notify_using][:password] ) -promises = [] +def format(item) + if item.respond_to?(:note) && item.note && item.note.text != "" + item.note.text + elsif item.respond_to?(:to_xml) + item.to_xml + else + item.inspect + end +end -db.exec( - <<-SQL - SELECT customer_id - FROM customer_plans - WHERE expires_at <= LOCALTIMESTAMP + '4 days' - SQL -).each do |row| - EM.next_tick do - promises << BlatherNotify.execute( +class ExpiringCustomer + def initialize(customer_id) + @customer_id = customer_id + end + + def info + BlatherNotify.execute( "customer info", - { q: row["customer_id"] }.to_form(:submit) - ).then { |iq| - BlatherNotify.write_with_promise(BlatherNotify.command( - "customer info", - iq.sessionid - )) - }.then do |iq| - unless iq.form.field("action") - next "#{row["customer_id"]} not found" + { q: @customer_id }.to_form(:submit) + ).then do |iq| + @sessionid = iq.sessionid + unless iq.form.field("customer_id") + raise "#{@customer_id} not found" end - BlatherNotify.write_with_promise(BlatherNotify.command( - "customer info", - iq.sessionid, - action: :complete, - form: { action: "bill_plan" }.to_form(:submit) - )) + iq end end -end -one = Queue.new + def next + raise "Call info first" unless @sessionid -def format(item) - if item.respond_to?(:note) && item.note - item.note.text - elsif item.respond_to?(:to_xml) - item.to_xml - else - item.inspect + BlatherNotify.write_with_promise(BlatherNotify.command( + "customer info", + @sessionid + )) + end + + def bill_plan + raise "Call info first" unless @sessionid + + BlatherNotify.write_with_promise(BlatherNotify.command( + "customer info", + @sessionid, + action: :complete, + form: { action: "bill_plan" }.to_form(:submit) + )) end end -EM.add_timer(0) do - EMPromise.all(promises).then( - ->(all) { one << all }, - ->(err) { one << RuntimeError.new(format(err)) } - ) +one = Queue.new + +EM::Iterator.new(db.exec( + <<-SQL + SELECT customer_id + FROM customer_plans + WHERE expires_at <= LOCALTIMESTAMP + '4 days' + SQL +), 3).each(nil, -> { one << :done }) do |row, iter| + customer = ExpiringCustomer.new(row["customer_id"]) + customer.info.then { + customer.next + }.then { + customer.bill_plan + }.then { |result| + puts format(result) + iter.next + }.catch do |err| + one << (err.is_a?(Exception) ? err : RuntimeError.new(format(err))) + end end result = one.pop - raise result if result.is_a?(Exception) - -result.each do |item| - puts format(item) -end diff --git a/lib/blather_notify.rb b/lib/blather_notify.rb index a6e0adc..81deb18 100644 --- a/lib/blather_notify.rb +++ b/lib/blather_notify.rb @@ -40,14 +40,22 @@ module BlatherNotify @thread.join end - def self.write_with_promise(stanza) - promise = EMPromise.new - EM.add_timer(15) do + def self.timeout_promise(promise, timeout: 15) + timer = EM.add_timer(timeout) { promise.reject(:timeout) + } + + promise.then do + timer.cancel end + end + + def self.write_with_promise(stanza) + promise = EMPromise.new + timeout_promise(promise) client.write_with_handler(stanza) do |s| - if s.error? + if s.error? || s.type == :error promise.reject(s) else promise.fulfill(s) -- 2.34.7