~singpolyma/biboumi

09b10cc80146c1ac2a0d5c53c6c8469b934189f2 — louiz’ 4 years ago ba97c44
Throttle all commands sent to IRC servers

fix #3354
M CHANGELOG.rst => CHANGELOG.rst +3 -0
@@ 5,6 5,9 @@ Version 9.0
- Messages from unjoined resources are now rejected instead of being accepted.
  This helps clients understand that they are not in the room (because of
  some connection issue for example).
- All commands sent to IRC servers are now throttled to avoid being
  disconnected for excess flood. The limit value can be customized using the
  ad-hoc configuration form on a server JID.

Version 8.3 - 2018-06-01
========================

M doc/biboumi.1.rst => doc/biboumi.1.rst +6 -0
@@ 682,6 682,12 @@ On a server JID (e.g on the JID chat.freenode.org@biboumi.example.com)
      this is NOT a password that will be sent to NickServ (or some author
      authentication service), some server (notably Freenode) use it as if it
      was sent to NickServ to identify your nickname.
    * Throttle limit: specifies a number of messages that can be sent
      without a limit, before the throttling takes place. When messages
      are throttled, only one command per second is sent to the server.
      The default is 10. You can lower this value if you are ever kicked
      for excess flood. If the value is 0, all messages are throttled. To
      disable this feature, just set a high value, like 999.

- get-irc-connection-info: Returns some information about the IRC server,
  for the executing user. It lets the user know if they are connected to

M src/database/database.hpp => src/database/database.hpp +4 -1
@@ 86,13 86,16 @@ class Database

  struct Address: Column<std::string> { static constexpr auto name = "address_"; };

  struct ThrottleLimit: Column<std::size_t> { static constexpr auto name = "throttlelimit_";
      ThrottleLimit(): Column<std::size_t>(10) {} };

  using MucLogLineTable = Table<Id, Uuid, Owner, IrcChanName, IrcServerName, Date, Body, Nick>;
  using MucLogLine = MucLogLineTable::RowType;

  using GlobalOptionsTable = Table<Id, Owner, MaxHistoryLength, RecordHistory, GlobalPersistent>;
  using GlobalOptions = GlobalOptionsTable::RowType;

  using IrcServerOptionsTable = Table<Id, Owner, Server, Pass, TlsPorts, Ports, Username, Realname, VerifyCert, TrustedFingerprint, EncodingOut, EncodingIn, MaxHistoryLength, Address, Nick>;
  using IrcServerOptionsTable = Table<Id, Owner, Server, Pass, TlsPorts, Ports, Username, Realname, VerifyCert, TrustedFingerprint, EncodingOut, EncodingIn, MaxHistoryLength, Address, Nick, ThrottleLimit>;
  using IrcServerOptions = IrcServerOptionsTable::RowType;

  using IrcChannelOptionsTable = Table<Id, Owner, Server, Channel, EncodingOut, EncodingIn, MaxHistoryLength, Persistent, RecordHistoryOptional>;

M src/irc/irc_client.cpp => src/irc/irc_client.cpp +42 -21
@@ 135,7 135,7 @@ IrcClient::IrcClient(std::shared_ptr<Poller>& poller, std::string hostname,
                     std::string realname, std::string user_hostname,
                     Bridge& bridge):
  TCPClientSocketHandler(poller),
  hostname(std::move(hostname)),
  hostname(hostname),
  user_hostname(std::move(user_hostname)),
  username(std::move(username)),
  realname(std::move(realname)),


@@ 143,7 143,14 @@ IrcClient::IrcClient(std::shared_ptr<Poller>& poller, std::string hostname,
  bridge(bridge),
  welcomed(false),
  chanmodes({"", "", "", ""}),
  chantypes({'#', '&'})
  chantypes({'#', '&'}),
  tokens_bucket(Database::get_irc_server_options(bridge.get_bare_jid(), hostname).col<Database::ThrottleLimit>(), 1s, [this]() {
    if (message_queue.empty())
      return true;
    this->actual_send(std::move(this->message_queue.front()));
    this->message_queue.pop_front();
    return false;
  }, "TokensBucket" + this->hostname + this->bridge.get_jid())
{
#ifdef USE_DATABASE
  auto options = Database::get_irc_server_options(this->bridge.get_bare_jid(),


@@ 171,6 178,7 @@ IrcClient::~IrcClient()
  // This event may or may not exist (if we never got connected, it
  // doesn't), but it's ok
  TimedEventsManager::instance().cancel("PING" + this->hostname + this->bridge.get_jid());
  TimedEventsManager::instance().cancel("TokensBucket" + this->hostname + this->bridge.get_jid());
}

void IrcClient::start()


@@ 390,25 398,33 @@ void IrcClient::parse_in_buffer(const size_t)
    }
}

void IrcClient::send_message(IrcMessage&& message)
void IrcClient::actual_send(const IrcMessage& message)
{
  log_debug("IRC SENDING: (", this->get_hostname(), ") ", message);
  std::string res;
  if (!message.prefix.empty())
    res += ":" + std::move(message.prefix) + " ";
  res += message.command;
  for (const std::string& arg: message.arguments)
    {
      if (arg.find(' ') != std::string::npos ||
          (!arg.empty() && arg[0] == ':'))
        {
          res += " :" + arg;
          break;
        }
      res += " " + arg;
    }
  res += "\r\n";
  this->send_data(std::move(res));
   log_debug("IRC SENDING: (", this->get_hostname(), ") ", message);
    std::string res;
    if (!message.prefix.empty())
      res += ":" + message.prefix + " ";
    res += message.command;
    for (const std::string& arg: message.arguments)
      {
        if (arg.find(' ') != std::string::npos
            || (!arg.empty() && arg[0] == ':'))
          {
            res += " :" + arg;
            break;
          }
        res += " " + arg;
      }
    res += "\r\n";
    this->send_data(std::move(res));
 }

void IrcClient::send_message(IrcMessage message, bool throttle)
{
  if (this->tokens_bucket.use_token() || !throttle)
    this->actual_send(message);
  else
    message_queue.push_back(std::move(message));
}

void IrcClient::send_raw(const std::string& txt)


@@ 459,7 475,7 @@ void IrcClient::send_topic_command(const std::string& chan_name, const std::stri

void IrcClient::send_quit_command(const std::string& reason)
{
  this->send_message(IrcMessage("QUIT", {reason}));
  this->send_message(IrcMessage("QUIT", {reason}), false);
}

void IrcClient::send_join_command(const std::string& chan_name, const std::string& password)


@@ 1225,6 1241,11 @@ void IrcClient::on_channel_mode(const IrcMessage& message)
    }
}

void IrcClient::set_throttle_limit(std::size_t limit)
{
  this->tokens_bucket.set_limit(limit);
}

void IrcClient::on_user_mode(const IrcMessage& message)
{
  this->bridge.send_xmpp_message(this->hostname, "",

M src/irc/irc_client.hpp => src/irc/irc_client.hpp +10 -2
@@ 16,8 16,10 @@
#include <vector>
#include <string>
#include <stack>
#include <deque>
#include <map>
#include <set>
#include <utils/tokens_bucket.hpp>

class Bridge;



@@ 84,8 86,9 @@ public:
   * (actually, into our out_buf and signal the poller that we want to wach
   * for send events to be ready)
   */
  void send_message(IrcMessage&& message);
  void send_message(IrcMessage message, bool throttle=true);
  void send_raw(const std::string& txt);
  void actual_send(const IrcMessage& message);
  /**
   * Send the PONG irc command
   */


@@ 293,7 296,7 @@ public:
  const std::vector<char>& get_sorted_user_modes() const { return this->sorted_user_modes; }

  std::set<char> get_chantypes() const { return this->chantypes; }

  void set_throttle_limit(std::size_t limit);
  /**
   * Store the history limit that the client asked when joining this room.
   */


@@ 331,6 334,10 @@ private:
   */
  Bridge& bridge;
  /**
   * Where messaged are stored when they are throttled.
   */
  std::deque<IrcMessage> message_queue{};
  /**
   * The list of joined channels, indexed by name
   */
  std::unordered_map<std::string, std::unique_ptr<IrcChannel>> channels;


@@ 389,6 396,7 @@ private:
   * the WebIRC protocole.
   */
  Resolver dns_resolver;
  TokensBucket tokens_bucket;
};



M src/irc/irc_message.hpp => src/irc/irc_message.hpp +2 -2
@@ 14,9 14,9 @@ public:
  ~IrcMessage() = default;

  IrcMessage(const IrcMessage&) = delete;
  IrcMessage(IrcMessage&&) = delete;
  IrcMessage(IrcMessage&&) = default;
  IrcMessage& operator=(const IrcMessage&) = delete;
  IrcMessage& operator=(IrcMessage&&) = delete;
  IrcMessage& operator=(IrcMessage&&) = default;

  std::string prefix;
  std::string command;

A src/utils/tokens_bucket.hpp => src/utils/tokens_bucket.hpp +58 -0
@@ 0,0 1,58 @@
/**
 * Implementation of the token bucket algorithm.
 *
 * It uses a repetitive TimedEvent, started at construction, to fill the
 * bucket.
 *
 * Every n seconds, it executes the given callback. If the callback
 * returns true, we add a token (if the limit is not yet reached).
 *
 */

#pragma once

#include <utils/timed_events.hpp>
#include <logger/logger.hpp>

class TokensBucket
{
public:
  TokensBucket(std::size_t max_size, std::chrono::milliseconds fill_duration, std::function<bool()> callback, std::string name):
      limit(max_size),
      tokens(limit),
      fill_duration(fill_duration),
      callback(std::move(callback))
  {
    log_debug("creating TokensBucket with max size: ", max_size);
    TimedEvent event(std::move(fill_duration), [this]() { this->add_token(); }, std::move(name));
    TimedEventsManager::instance().add_event(std::move(event));
  }

  bool use_token()
  {
    if (this->tokens > 0)
      {
        this->tokens--;
        return true;
      }
    else
      return false;
  }

  void set_limit(std::size_t limit)
  {
    this->limit = limit;
  }

private:
  std::size_t limit;
  std::size_t tokens;
  std::chrono::milliseconds fill_duration;
  std::function<bool()> callback;

  void add_token()
  {
    if (this->callback() && this->tokens != limit)
      this->tokens++;
  }
};

M src/xmpp/biboumi_adhoc_commands.cpp => src/xmpp/biboumi_adhoc_commands.cpp +25 -1
@@ 366,6 366,15 @@ void ConfigureIrcServerStep1(XmppComponent&, AdhocSession& session, XmlNode& com
    }

  {
    XmlSubNode throttle_limit(x, "field");
    throttle_limit["var"] = "throttle_limit";
    throttle_limit["type"] = "text-single";
    throttle_limit["label"] = "Throttle limit";
    XmlSubNode value(throttle_limit, "value");
    value.set_inner(std::to_string(options.col<Database::ThrottleLimit>()));
  }

  {
  XmlSubNode encoding_out(x, "field");
  encoding_out["var"] = "encoding_out";
  encoding_out["type"] = "text-single";


@@ 392,8 401,10 @@ void ConfigureIrcServerStep1(XmppComponent&, AdhocSession& session, XmlNode& com
  }
}

void ConfigureIrcServerStep2(XmppComponent&, AdhocSession& session, XmlNode& command_node)
void ConfigureIrcServerStep2(XmppComponent& xmpp_component, AdhocSession& session, XmlNode& command_node)
{
  auto& biboumi_component = dynamic_cast<BiboumiComponent&>(xmpp_component);

  const XmlNode* x = command_node.get_child("x", "jabber:x:data");
  if (x)
    {


@@ 474,6 485,19 @@ void ConfigureIrcServerStep2(XmppComponent&, AdhocSession& session, XmlNode& com
          else if (field->get_tag("var") == "realname" && value)
            options.col<Database::Realname>() = value->get_inner();

          else if (field->get_tag("var") == "throttle_limit" && value)
            {
              options.col<Database::ThrottleLimit>() = std::stoull(value->get_inner());
              Bridge* bridge = biboumi_component.find_user_bridge(session.get_owner_jid());
              if (bridge)
                {
                  IrcClient* client = bridge->find_irc_client(server_domain);
                  if (client)
                    client->set_throttle_limit(options.col<Database::ThrottleLimit>());
                }

            }

          else if (field->get_tag("var") == "encoding_out" && value)
            options.col<Database::EncodingOut>() = value->get_inner();


M tests/end_to_end/__main__.py => tests/end_to_end/__main__.py +22 -7
@@ 1310,16 1310,16 @@ if __name__ == '__main__':
                     ]),

                     # Send a multi-line channel message
                     partial(send_stanza, "<message id='the-message-id' from='{jid_one}/{resource_one}' to='#foo%{irc_server_one}' type='groupchat'><body>un\ndeux\ntrois</body></message>"),
                     partial(send_stanza, "<message id='the-message-id' from='{jid_one}/{resource_one}' to='#foo%{irc_server_one}' type='groupchat'><body>a\nb\nc</body></message>"),
                     # Receive multiple messages, for each user
                     partial(expect_unordered, [
                         ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id='the-message-id'][@to='{jid_one}/{resource_one}'][@type='groupchat']/body[text()='un']",),
                         ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id][@to='{jid_one}/{resource_one}'][@type='groupchat']/body[text()='deux']",),
                         ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id][@to='{jid_one}/{resource_one}'][@type='groupchat']/body[text()='trois']",),
                         ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id='the-message-id'][@to='{jid_one}/{resource_one}'][@type='groupchat']/body[text()='a']",),
                         ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id][@to='{jid_one}/{resource_one}'][@type='groupchat']/body[text()='b']",),
                         ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id][@to='{jid_one}/{resource_one}'][@type='groupchat']/body[text()='c']",),

                         ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id][@to='{jid_two}/{resource_one}'][@type='groupchat']/body[text()='un']",),
                         ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id][@to='{jid_two}/{resource_one}'][@type='groupchat']/body[text()='deux']",),
                         ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id][@to='{jid_two}/{resource_one}'][@type='groupchat']/body[text()='trois']",),
                         ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id][@to='{jid_two}/{resource_one}'][@type='groupchat']/body[text()='a']",),
                         ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id][@to='{jid_two}/{resource_one}'][@type='groupchat']/body[text()='b']",),
                         ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id][@to='{jid_two}/{resource_one}'][@type='groupchat']/body[text()='c']",),
                     ])
                 ]),
        Scenario("channel_messages",


@@ 2073,6 2073,21 @@ if __name__ == '__main__':
        Scenario("join_history_limits",
                 [
                     handshake_sequence(),

                     # Disable the throttling because the test is based on timings
                     partial(send_stanza, "<iq type='set' id='id1' from='{jid_one}/{resource_one}' to='{irc_server_one}'><command xmlns='http://jabber.org/protocol/commands' node='configure' action='execute' /></iq>"),
                     partial(expect_stanza, "/iq[@type='result']",
                             after = partial(save_value, "sessionid", partial(extract_attribute, "/iq[@type='result']/commands:command[@node='configure']", "sessionid"))),
                     partial(send_stanza, "<iq type='set' id='id2' from='{jid_one}/{resource_one}' to='{irc_server_one}'>"
                                           "<command xmlns='http://jabber.org/protocol/commands' node='configure' sessionid='{sessionid}' action='next'>"
                                           "<x xmlns='jabber:x:data' type='submit'>"
                                           "<field var='ports'><value>6667</value></field>"
                                           "<field var='tls_ports'><value>6697</value><value>6670</value></field>"
                                           "<field var='throttle_limit'><value>9999</value></field>"
                                           "</x></command></iq>"),
                      partial(expect_stanza, "/iq[@type='result']/commands:command[@node='configure'][@status='completed']/commands:note[@type='info'][text()='Configuration successfully applied.']"),


                     partial(send_stanza,
                             "<presence from='{jid_one}/{resource_one}' to='#foo%{irc_server_one}/{nick_one}' />"),
                     connection_sequence("irc.localhost", '{jid_one}/{resource_one}'),