~singpolyma/cheogram

248f58010212fbb48b508c8d5683e7fd147e1c6e — Stephen Paul Weber 12 days ago 7a900b7 + 5c74171
Merge branch 'stats'

* stats:
  Add stats counter for cmd list fetch
  Add stats counter for adhoc-bot cmd run
  Add stats counter for adhoc-bot msg received
  Add stats counter for service start up
  Add stats counter for malformed cacheobb url
1 files changed, 41 insertions(+), 8 deletions(-)

M Main.hs
M Main.hs => Main.hs +41 -8
@@ 735,6 735,7 @@ handleRegister _ _ iq _ = do

data ComponentContext = ComponentContext {
	db :: DB.DB,
	pushStatsd :: [StatsD.Stat] -> IO (),
	smsJid :: Maybe JID,
	registrationJids :: [JID],
	adhocBotMessage :: Message -> STM (),


@@ 1016,7 1017,7 @@ componentStanza (ComponentContext { db, componentJid }) (ReceivedIQ iq@(IQ { iqF
	| iqType iq `elem` [IQGet, IQSet],
	  [query] <- isNamed (fromString "{jabber:iq:register}query") p = do
		handleRegister db componentJid iq query
componentStanza (ComponentContext { db, componentJid, maybeAvatar, sendIQ }) (ReceivedIQ (IQ { iqType = IQGet, iqFrom = Just from, iqTo = Just to, iqID = id, iqPayload = Just p }))
componentStanza (ComponentContext { db, pushStatsd, componentJid, maybeAvatar, sendIQ }) (ReceivedIQ (IQ { iqType = IQGet, iqFrom = Just from, iqTo = Just to, iqID = id, iqPayload = Just p }))
	| Nothing <- jidNode to,
	  [q] <- isNamed (fromString "{http://jabber.org/protocol/disco#info}query") p = do
		payload <- cheogramDiscoInfo db componentJid sendIQ from (Just q)


@@ 1029,6 1030,9 @@ componentStanza (ComponentContext { db, componentJid, maybeAvatar, sendIQ }) (Re
	| Nothing <- jidNode to,
	  [s"http://jabber.org/protocol/commands"] ==
	    mapMaybe (attributeText (s"node")) (isNamed (fromString "{http://jabber.org/protocol/disco#items}query") p) = do

		pushStatsd [StatsD.stat ["cmd-list", "fetch"] 1 "c" Nothing]

		routeQueryOrReply db componentJid from componentJid ("CHEOGRAM%query-then-send-command-list%" ++ extra) queryCommandList (commandList componentJid id to from [])
	| Nothing <- jidNode to,
	  [_] <- isNamed (s"{vcard-temp}vCard") p =


@@ 1287,6 1291,7 @@ cacheOneOOB magic pushStatsd jingleStore jingleStoreURL oob
						}
					)
	| otherwise = do
		pushStatsd [StatsD.stat ["cache", "oob", "malformed"] 1 "c" Nothing]
		log "cacheOneOOB MALFORMED" oob
		return (Nothing, oob)
	where


@@ 1306,11 1311,32 @@ cacheOOB magic pushStatsd jingleStore jingleStoreURL m@(XMPP.Message { XMPP.mess
	(body, noOobsNoBody) = partition (\el -> XML.elementName el == bodyName) noOobs
	(oobs, noOobs) = partition (\el -> XML.elementName el == oobName) payloads

component :: DB.DB
                     -> Redis.Connection
                     -> ([StatsD.Stat] -> UIO ())
                     -> Text
                     -> Text
                     -> Maybe Avatar
                     -> (Message -> UIO Message)
                     -> (IQ -> UIO (STM (Maybe IQ)))
                     -> (IQ -> XMPP ())
                     -> (Message -> STM ())
                     -> TChan RoomPresences
                     -> TChan RejoinManagerCommand
                     -> TChan JoinPartDebounce
                     -> TChan StanzaRec
                     -> TChan ReceivedStanza
                     -> (IQ -> IO (Maybe IQ))
                     -> (IQ -> XMPP ())
                     -> JID
                     -> [JID]
                     -> [Text]
                     -> XMPP ()
component db redis pushStatsd backendHost did maybeAvatar cacheOOB sendIQ iqReceiver adhocBotMessage toRoomPresences toRejoinManager toJoinPartDebouncer toComponent toStanzaProcessor processDirectMessageRouteConfig jingleHandler componentJid registrationJids conferenceServers = do
	sendThread <- forkXMPP $ forever $ flip catchError (log "component EXCEPTION") $ do
		stanza <- liftIO $ hasLocked "read toComponent" $ atomically $ readTChan toComponent

		pushStatsd [StatsD.stat ["stanzas", "out"] 1 "c" Nothing]
		UIO.lift $ pushStatsd [StatsD.stat ["stanzas", "out"] 1 "c" Nothing]
		putStanza =<< (liftIO . ensureId) stanza

	recvThread <- forkXMPP $ forever $ flip catchError (\e -> do


@@ 1324,7 1350,7 @@ component db redis pushStatsd backendHost did maybeAvatar cacheOOB sendIQ iqRece

	flip catchError (\e -> liftIO (log "component part 2 EXCEPTION" e >> killThread sendThread >> killThread recvThread)) $ forever $ do
		stanza <- liftIO $ hasLocked "read toStanzaProcessor" $ atomicUIO $ readTChan toStanzaProcessor
		pushStatsd [StatsD.stat ["stanzas", "in"] 1 "c" Nothing]
		UIO.lift $ pushStatsd [StatsD.stat ["stanzas", "in"] 1 "c" Nothing]
		liftIO $ forkIO $ case stanza of
			(ReceivedPresence p@(Presence { presenceType = PresenceAvailable, presenceFrom = Just from, presenceTo = Just to }))
				| Just returnFrom <- parseJID (bareTxt to ++ s"/capsQuery") ->


@@ 1499,7 1525,7 @@ component db redis pushStatsd backendHost did maybeAvatar cacheOOB sendIQ iqRece
				  (nameNamespace $ elementName p) `elem` [Just (s"urn:xmpp:jingle:1"), Just (s"http://jabber.org/protocol/ibb")] -> do
					jingleHandler iq
				| otherwise -> liftIO $
					mapM_ sendToComponent =<< componentStanza (ComponentContext db backendTo registrationJids adhocBotMessage cacheOOB toRoomPresences toRejoinManager toJoinPartDebouncer processDirectMessageRouteConfig componentJid sendIQ maybeAvatar) stanza
					mapM_ sendToComponent =<< componentStanza (ComponentContext db (UIO.lift . pushStatsd) backendTo registrationJids adhocBotMessage cacheOOB toRoomPresences toRejoinManager toJoinPartDebouncer processDirectMessageRouteConfig componentJid sendIQ maybeAvatar) stanza
	where
	mapToComponent = mapToBackend (formatJID componentJid)
	sendToComponent = hasLocked "sendToComponent" . atomically . writeTChan toComponent


@@ 2075,8 2101,8 @@ joinPartDebouncer db backendHost sendToComponent componentJid toRoomPresences to
			(_, state') -> return state'


adhocBotManager :: (UIO.Unexceptional m) => DB.DB -> JID -> (XMPP.Message -> UIO.UIO ()) -> (XMPP.IQ -> UIO.UIO (STM (Maybe XMPP.IQ))) -> (STM XMPP.Message) -> m ()
adhocBotManager db componentJid sendMessage sendIQ messages = do
adhocBotManager :: (UIO.Unexceptional m) => DB.DB -> ([StatsD.Stat] -> UIO ()) -> JID -> (XMPP.Message -> UIO.UIO ()) -> (XMPP.IQ -> UIO.UIO (STM (Maybe XMPP.IQ))) -> (STM XMPP.Message) -> m ()
adhocBotManager db pushStatsd componentJid sendMessage sendIQ messages = do
	cleanupChan <- atomicUIO newTChan
	statefulManager cleanupChan Map.empty
	where


@@ 2087,12 2113,16 @@ adhocBotManager db componentJid sendMessage sendIQ messages = do

	processMessage cleanupChan sessions message = do
		-- XXX: At some point this should not include resource, but it makes it easy to test for now
		UIO.lift $ pushStatsd [StatsD.stat ["adhoc-bot", "msg-recv"] 1 "c" Nothing]

		let key = bareTxt <$> (XMPP.stanzaFrom message)
		sessions' <- case Map.lookup key sessions of
			Just input -> input message >> return sessions
			Nothing -> do
				newChan <- atomicUIO newTChan

				UIO.forkFinally (adhocBotSession db componentJid sendMessage sendIQ (readTChan newChan) message) (\result -> do
						pushStatsd [StatsD.stat ["adhoc-bot", "cmd-run"] 1 "c" Nothing]
						fromIO_ $ either (log "adhocBotManager") (const $ return ()) result
						atomicUIO $ writeTChan cleanupChan key
					)


@@ 2158,10 2188,11 @@ main = do
			toRejoinManager <- atomically newTChan

			statsd <- openStatsD statsdHost (show statsdPort) ["cheogram"]
			let pushStatsd = void . UIO.fromIO . StatsD.push statsd

			(sendIQ, iqReceiver) <- iqManager $ atomicUIO . writeTChan sendToComponent . mkStanzaRec
			adhocBotMessages <- atomically newTChan
			void $ forkFinally (adhocBotManager db componentJid (atomicUIO . writeTChan sendToComponent . mkStanzaRec) sendIQ (readTChan adhocBotMessages)) (log "adhocBotManagerTOP")
			void $ forkFinally (adhocBotManager db pushStatsd componentJid (atomicUIO . writeTChan sendToComponent . mkStanzaRec) sendIQ (readTChan adhocBotMessages)) (log "adhocBotManagerTOP")

			void $ forkFinally (void $ joinPartDebouncer db backendHost (atomically . writeTChan sendToComponent) componentJid toRoomPresences toJoinPartDebouncer) (log "joinPartDebouncerTOP")
			void $ forkFinally (void $ roomPresences db toRoomPresences) (log "roomPresencesTOP")


@@ 2268,11 2299,13 @@ main = do
						Nothing -> iqNotImplemented iq
				)

			let pushStatsd = void . UIO.fromIO . StatsD.push statsd

			maybeAvatar <- mapM mkAvatar maybeAvatarPath

			log "" "runComponent STARTING"

			UIO.lift $ pushStatsd [StatsD.stat ["service", "start"] 1 "c" Nothing]

			log "runComponent ENDED" =<< runComponent (Server componentJid host port) secret
				(component db presenceRedis (UIO.lift . pushStatsd) backendHost did maybeAvatar (cacheOOB magic (UIO.lift . pushStatsd) jingleStore jingleStoreURL) sendIQ iqReceiver (writeTChan adhocBotMessages) toRoomPresences toRejoinManager toJoinPartDebouncer sendToComponent toStanzaProcessor processDirectMessageRouteConfig jingleHandler componentJid [registrationJid] conferences)
		_ -> log "ERROR" "Bad arguments"