A IQManager.hs => IQManager.hs +75 -0
@@ 0,0 1,75 @@
+module IQManager (iqManager) where
+
+import Prelude ()
+import BasicPrelude
+import Control.Concurrent.STM (
+ STM, TMVar, TVar, modifyTVar', newEmptyTMVar, newTVar, orElse,
+ readTVar, takeTMVar, tryPutTMVar, writeTVar
+ )
+import Control.Concurrent.STM.Delay (newDelay, waitDelay)
+import UnexceptionalIO.Trans (Unexceptional)
+import qualified Data.Map.Strict as Map
+import qualified Network.Protocol.XMPP as XMPP
+import qualified Data.UUID as UUID
+import qualified Data.UUID.V4 as UUID
+
+import Util
+
+type ResponseMap = Map.Map (Maybe Text) (TMVar XMPP.IQ)
+
+iqSendTimeoutMicroseconds :: Int
+iqSendTimeoutMicroseconds = 5000000
+
+iqDefaultID :: (Unexceptional m) => XMPP.IQ -> m XMPP.IQ
+iqDefaultID iq@XMPP.IQ { XMPP.iqID = Just _ } = return iq
+iqDefaultID iq = do
+ uuid <- fromIO_ UUID.nextRandom
+ return $ iq {
+ XMPP.iqID = Just $ UUID.toText uuid
+ }
+
+iqSenderUnexceptional :: (Unexceptional m) =>
+ (XMPP.IQ -> m ())
+ -> TVar ResponseMap
+ -> XMPP.IQ
+ -> m (STM (Maybe XMPP.IQ))
+iqSenderUnexceptional sender responseMapVar iq = do
+ iqToSend <- iqDefaultID iq
+ timeout <- fromIO_ $ newDelay iqSendTimeoutMicroseconds
+ iqResponseVar <- atomicUIO newEmptyTMVar
+ atomicUIO $ modifyTVar' responseMapVar $
+ Map.insert (XMPP.iqID iqToSend) iqResponseVar
+ sender iqToSend
+ return (
+ (waitDelay timeout *> pure Nothing)
+ `orElse`
+ fmap Just (takeTMVar iqResponseVar)
+ )
+
+iqReceiver :: (Unexceptional m) => TVar ResponseMap -> XMPP.IQ -> m (Maybe XMPP.IQ)
+iqReceiver responseMapVar receivedIQ
+ | XMPP.iqType receivedIQ `elem` [XMPP.IQResult, XMPP.IQError] = do
+ maybeIqResponseVar <- atomicUIO $ do
+ responseMap <- readTVar responseMapVar
+ let (maybeIqResponseVar, responseMap') =
+ Map.updateLookupWithKey
+ (const $ const Nothing)
+ (XMPP.iqID receivedIQ) responseMap
+ writeTVar responseMapVar $! responseMap'
+ return maybeIqResponseVar
+ case maybeIqResponseVar of
+ Just iqResponseVar -> do
+ atomicUIO $ tryPutTMVar iqResponseVar receivedIQ
+ return Nothing
+ Nothing -> return (Just receivedIQ)
+ | otherwise = return $ Just receivedIQ
+
+iqManager :: (Unexceptional m1, Unexceptional m2, Unexceptional m3) =>
+ (XMPP.IQ -> m2 ()) ->
+ m1 (XMPP.IQ -> m2 (STM (Maybe XMPP.IQ)), XMPP.IQ -> m3 (Maybe XMPP.IQ))
+iqManager sender = do
+ responseMapVar <- atomicUIO $ newTVar Map.empty
+ return (
+ iqSenderUnexceptional sender responseMapVar,
+ iqReceiver responseMapVar
+ )
M Util.hs => Util.hs +13 -0
@@ 5,8 5,12 @@ import BasicPrelude
import Control.Applicative (many)
import Control.Concurrent
(ThreadId, forkFinally, myThreadId, throwTo)
+import Control.Concurrent.STM (STM, atomically)
import Data.Time.Clock (UTCTime)
import Data.Time.Format (parseTimeM, defaultTimeLocale)
+import Data.Void (absurd)
+import UnexceptionalIO (Unexceptional)
+import qualified UnexceptionalIO as UIO
import qualified Control.Exception as Ex
import qualified Data.Attoparsec.Text as Atto
import qualified Data.Text as Text
@@ 15,9 19,18 @@ import qualified Network.Protocol.XMPP as XMPP
import qualified Config
+instance Unexceptional XMPP.XMPP where
+ lift = liftIO . UIO.lift
+
s :: (IsString s) => String -> s
s = fromString
+fromIO_ :: (Unexceptional m) => IO a -> m a
+fromIO_ = fmap (either absurd id) . UIO.fromIO' (error . show)
+
+atomicUIO :: (Unexceptional m) => STM a -> m a
+atomicUIO = fromIO_ . atomically
+
escapeJid :: Text -> Text
escapeJid txt = mconcat result
where
M cheogram-muc-bridge.cabal => cheogram-muc-bridge.cabal +8 -3
@@ 17,16 17,21 @@ common defs
basic-prelude >=0.7 && <0.8,
bytestring >=0.10 && <0.11,
containers >=0.5 && <0.6,
- dhall >= 1.24 && < 2.0,
+ dhall >= 1.24 && <2.0,
errors >=2.3 && <2.4,
- network >= 2.6.3 && < 2.7,
+ network >= 2.6.3 && <2.7,
network-protocol-xmpp >=0.4 && <0.5,
sqlite-simple >= 0.4 && <0.5,
+ stm >= 2.4 && <3.0,
+ stm-delay >= 0.1 && < 0.2,
text >=1.2 && <1.3,
time >=1.5 && <2.0,
+ unexceptionalio >= 0.5 && <0.6,
+ unexceptionalio-trans >= 0.5 && <0.6,
+ uuid >= 1.3 && <2.0,
xml-types >=0.3 && <0.4
executable gateway
import: defs
main-is: gateway.hs
- other-modules: Router, Util, Config, ConfigFile, Session
+ other-modules: Router, Util, Config, ConfigFile, Session, IQManager
M gateway.hs => gateway.hs +47 -3
@@ 5,6 5,9 @@ import BasicPrelude
import System.IO
(stdout, stderr, hSetBuffering, BufferMode(LineBuffering))
import Control.Error (exceptT, justZ)
+import Control.Concurrent (threadDelay)
+import Control.Concurrent.STM (STM)
+import qualified Database.SQLite.Simple as DB
import qualified Data.Text as T
import qualified Data.XML.Types as XML
import qualified Network.Protocol.XMPP as XMPP
@@ 13,6 16,7 @@ import qualified Config
import qualified Session
import Router
import Util
+import IQManager
hasMucCode :: Int -> XMPP.Presence -> Bool
hasMucCode code XMPP.Presence { XMPP.presencePayloads = p } =
@@ 126,11 130,47 @@ handleIq _ _ = return ()
joinFromBridge :: Config.Config -> XMPP.JID -> XMPP.XMPP ()
joinFromBridge config muc = do
- Session.mkSession config XMPP.PresenceAvailable Nothing muc
- XMPP.putStanza $ (mucJoin muc (Config.nick config)) {
+ Session.mkSession config XMPP.PresenceAvailable Nothing target
+ XMPP.putStanza presence
+ where
+ Just target = XMPP.presenceTo presence
+ presence = (mucJoin muc (Config.nick config)) {
XMPP.presenceFrom = Just $ Config.bridgeJid config
}
+pingSuccessError :: XML.Element -> [XML.Element]
+pingSuccessError = uncurry (<|>) . (uncurry (<|>) . (
+ XML.isNamed (s"{urn:ietf:params:xml:ns:xmpp-stanzas}service-unavaliable")
+ &&&
+ XML.isNamed (s"{urn:ietf:params:xml:ns:xmpp-stanzas}feature-not-implemented")
+ ) &&&
+ XML.isNamed (s"{urn:ietf:params:xml:ns:xmpp-stanzas}item-not-found")
+ )
+
+selfPings :: Config.Config -> (XMPP.IQ -> XMPP.XMPP (STM (Maybe XMPP.IQ))) -> XMPP.XMPP ()
+selfPings config sendIQ = forever $ do
+ liftIO $ threadDelay 60000000
+ sessions <- liftIO $ DB.query_ (Config.db config) (s"SELECT source_muc, source_nick, target_muc, target_nick FROM sessions")
+ forM_ sessions $ \(sourceMuc, sourceNick, targetMuc, targetNick) -> void $ forkXMPP $ do
+ let Just target = XMPP.parseJID (targetMuc ++ s"/" ++ targetNick)
+ reply <- (atomicUIO =<<) $ sendIQ $ (XMPP.emptyIQ XMPP.IQGet) {
+ XMPP.iqFrom = sourceJid sourceMuc sourceNick,
+ XMPP.iqTo = Just target,
+ XMPP.iqPayload = Just $ XML.Element (s"{urn:xmpp:ping}ping") [] []
+ }
+ if (XMPP.iqType <$> reply) == Just XMPP.IQResult then return () else
+ case pingSuccessError =<< XML.elementChildren =<< justZ (XMPP.iqPayload =<< reply) of
+ (_:_) -> return ()
+ _ | sourceMuc == mempty -> joinFromBridge config target
+ _ ->
+ Session.sendPresence config ((mucJoin target targetNick) {
+ XMPP.presenceFrom = XMPP.parseJID (sourceMuc ++ s"/" ++ sourceNick)
+ }) target
+ where
+ sourceJid muc nick
+ | muc == mempty = Just $ Config.bridgeJid config
+ | otherwise = proxyJid config <$> XMPP.parseJID (muc ++ s"/" ++ nick)
+
main :: IO ()
main = do
hSetBuffering stdout LineBuffering
@@ 145,13 185,17 @@ main = do
exceptT print return $
runRoutedComponent server (Config.secret config) $ do
+ (sendIQ, iqReceiver) <- iqManager XMPP.putStanza
forM_ (Config.mucs config) $ \bridge -> do
joinFromBridge config (Config.muc1 bridge)
joinFromBridge config (Config.muc2 bridge)
+ void $ forkXMPP $ selfPings config sendIQ
return $ defaultRoutes {
presenceRoute = handlePresence config,
presenceErrorRoute = handlePresenceError config,
messageGroupChatRoute = handleGroupChat config,
messageRoute = handleMessage config,
- iqRoute = handleIq config
+ iqRoute = \iq -> do
+ maybeIq <- iqReceiver iq
+ forM_ maybeIq $ handleIq config
}