~singpolyma/jingle-xmpp

8e697719004d3b69b6af24a2f0645892cec539a7 — Christopher Vollick 2 years ago 5df1fb6
Complete Uploads Once They Reach Size

The spec for Jingle claims either side can terminate the session, but suggests
that the receiver should, since it knows when it's received things.

The current code expects the sender to do it, and Conversations expects the
receiver to do it, leading to both sides waiting for the other to do something.

This change on its own is insufficient to actually fix this, but it's a big
step. We pull the size information we're given out of the session
initialization and store it in a cache, similar to how other session
information is currently stored.

Then, when the socks server detects a connection, it checks to see if there's a
size recorded for this connection. If there isn't, that's fine, it just does
what it currently does, which is hope the other side will close the socket when
it's done so the EOF will unstick things.
But if there is a size listed, then when it gets to full size it now returns.

In order for this to be fixed something will need to terminate the session
after the chunks are done being read. Eventually this should live in this repo,
but for now the code lives in Cheogram as part of the handler for the file
upload finishing.

Rather than try to fix that now, we'll do that later and just make sure we
return, so that code is able to run.

Implementation notes:
- hGet was changed to hGetSome because hGet will buffer until it gets the right
  amount of bytes, or EOF
	Since we're expecting the upload to hang when it's done, we need the code to
  unstick so we can count the new total and realize we're done. hGetSome is
  like a non-blocking hGet, but it it blocks on 0. So if there's no data, it
  will wait, but as soon as there's any data it will return right away.
- If there are multiple files in the same session that would be bad, but it
	seems like none of the supported clients do that. Other things already may
  not work in that case
3 files changed, 78 insertions(+), 23 deletions(-)

M Jingle.hs
M Jingle/Socks5Server.hs
M StoreChunks.hs
M Jingle.hs => Jingle.hs +62 -14
@@ 116,15 116,17 @@ s5bPseudoTsid tsid jid1 jid2 =
sessionInitiate ::
	   (Text, Socket.PortNumber)
	-> (JingleTSID -> UIO ())
	-> (JingleTSID -> Int -> UIO ())
	-> XMPP.IQ
	-> Text
	-> [XML.Element]
	-> Text
	-> Maybe Int
	-> XMPP.XMPP ()
sessionInitiate hostPort newTransport iq@XMPP.IQ {
sessionInitiate hostPort newTransport setSize iq@XMPP.IQ {
	XMPP.iqTo = Just to,
	XMPP.iqFrom = Just from
} sid content contentName
} sid content contentName size
	| [tsid] <- elementAttributeText
	            (s"sid")
	            (s"{urn:xmpp:jingle:transports:s5b:1}transport")


@@ 132,6 134,10 @@ sessionInitiate hostPort newTransport iq@XMPP.IQ {
		liftIO $ UIO.run $ newTransport (JingleTSID tsid)
		liftIO $ UIO.run $ newTransport $ s5bPseudoTsid tsid from to
		liftIO $ UIO.run $ newTransport $ s5bPseudoTsid tsid to from
		liftIO $ UIO.run $ forM_ size $ \inner -> do
			setSize (JingleTSID tsid) inner
			setSize (s5bPseudoTsid tsid from to) inner
			setSize (s5bPseudoTsid tsid to from) inner
		XMPP.putStanza $ iqReply Nothing iq

		-- In a strange case of IQ use, we don't really care about the


@@ 156,7 162,7 @@ sessionInitiate hostPort newTransport iq@XMPP.IQ {
				(s"dontcare-session-accept") $
					sessionAccept sid contentName to $
					ibbTransport tsid
sessionInitiate _ _ iq _ _ _ = XMPP.putStanza $ iqError notImplemented iq
sessionInitiate _ _ _ iq _ _ _ _ = XMPP.putStanza $ iqError notImplemented iq

jingleAction :: Text -> XMPP.IQ -> Bool
jingleAction action iq =


@@ 171,6 177,27 @@ fileTransferDescription = uncurry (<|>) . (
	XML.isNamed (s"{urn:xmpp:jingle:apps:file-transfer:3}description")
	)

fileSizeFromDescription :: XML.Element -> Maybe Int
fileSizeFromDescription description = readZ $ textToString $ mconcat $
	XML.elementText =<< fileTransferSize =<<
	XML.elementChildren =<< fileTransferFile =<<
	XML.elementChildren description

fileTransferFile :: XML.Element -> [XML.Element]
fileTransferFile = uncurry (<|>) . (
	XML.isNamed (s"{urn:xmpp:jingle:apps:file-transfer:5}file")
	&&&
	XML.isNamed (s"{urn:xmpp:jingle:apps:file-transfer:3}file")
	)

fileTransferSize :: XML.Element -> [XML.Element]
fileTransferSize = uncurry (<|>) . (
	XML.isNamed (s"{urn:xmpp:jingle:apps:file-transfer:5}size")
	&&&
	XML.isNamed (s"{urn:xmpp:jingle:apps:file-transfer:3}size")
	)


jingleTransport :: XML.Element -> [XML.Element]
jingleTransport = uncurry (<|>) . (
	XML.isNamed (s"{urn:xmpp:jingle:transports:s5b:1}transport")


@@ 182,19 209,21 @@ jingleHandler' ::
	   (Text, Socket.PortNumber)
	-> (JingleSID -> XMPP.IQ -> UIO ())
	-> (JingleSID -> JingleTSID -> UIO ())
	-> (JingleTSID -> Int -> UIO ())
	-> XMPP.IQ
	-> [XML.Element]
	-> Text
	-> XMPP.XMPP ()
jingleHandler' hostPort newSession newTransport iq@XMPP.IQ {
jingleHandler' hostPort newSession newTransport setSize iq@XMPP.IQ {
	XMPP.iqFrom = Just from
} children sid
	| jingleAction (s"session-initiate") iq,
	  (_:_) <- fileTransferDescription `overChildrenOf` content = do
	  (desc:_) <- fileTransferDescription `overChildrenOf` content = do
		liftIO $ UIO.run $ newSession (JingleSID sid) iq
		sessionInitiate hostPort
			(newTransport (JingleSID sid))
			iq sid content contentName
			setSize
			iq sid content contentName (fileSizeFromDescription desc)
	| jingleAction (s"transport-info") iq,
	  (_:_) <- jingleTransport `overChildrenOf` content =
		XMPP.putStanza $ iqReply Nothing iq


@@ 219,18 248,19 @@ jingleHandler' hostPort newSession newTransport iq@XMPP.IQ {
	content = XML.isNamed (s"{urn:xmpp:jingle:1}content") =<< children
	contentName = fromMaybe mempty $
		XML.attributeText (s"name") =<< headZ content
jingleHandler' _ _ _ iq _ _ = XMPP.putStanza $ iqError notImplemented iq
jingleHandler' _ _ _ _ iq _ _ = XMPP.putStanza $ iqError notImplemented iq

jingleHandler ::
	   (Text, Socket.PortNumber)
	-> (JingleSID -> XMPP.IQ -> UIO ())
	-> (JingleSID -> JingleTSID -> UIO ())
	-> (JingleTSID -> Int -> UIO ())
	-> XMPP.IQ
	-> XML.Element
	-> XMPP.XMPP ()
jingleHandler hostPort newSession newTransport iq jingle
jingleHandler hostPort newSession newTransport setSize iq jingle
	| Just sid <- XML.attributeText (s"sid") jingle =
		jingleHandler' hostPort newSession newTransport
		jingleHandler' hostPort newSession newTransport setSize
			iq (XML.elementChildren jingle) sid
	| otherwise = XMPP.putStanza $ iqError notImplemented iq



@@ 251,7 281,7 @@ ibbHandler storePath transferFinished iq
			fmap sort (listDirectory dir)
		let tmpName = textToString sid ++ "/FINAL"
		(path, _, _, _) <- liftIO $ UIO.runEitherIO $
			storeChunks storePath tmpName $ do
			storeChunks Nothing storePath tmpName $ do
				chunkFiles <- fromIO_ $ readIORef chunkRef
				case chunkFiles of
					[] -> return mempty


@@ 281,12 311,13 @@ iqSetHandler ::
	-> (Text, Socket.PortNumber)
	-> (JingleSID -> XMPP.IQ -> UIO ())
	-> (JingleSID -> JingleTSID -> UIO ())
	-> (JingleTSID -> Int -> UIO ())
	-> (JingleTSID -> FilePath -> UIO ())
	-> XMPP.IQ
	-> XMPP.XMPP ()
iqSetHandler storePath hostPort newSession newTransport transportDone iq
iqSetHandler storePath hostPort newSession newTransport setSize transportDone iq
	| Just jingle <- child (s"{urn:xmpp:jingle:1}jingle") iq =
		jingleHandler hostPort newSession newTransport iq jingle
		jingleHandler hostPort newSession newTransport setSize iq jingle
	| Just (s"http://jabber.org/protocol/ibb") ==
	  (XML.nameNamespace =<< XML.elementName <$> XMPP.iqPayload iq) =
		ibbHandler storePath transportDone iq


@@ 312,6 343,22 @@ tsidToSidMap notifyBySid =
				Just sid -> notifyBySid sid path
	)

tsidToSizeMap :: (Unexceptional m) =>
	m (
		JingleTSID -> Int -> UIO (),
		JingleTSID -> UIO (Maybe Int)
	)
tsidToSizeMap =
	fromIO_ (Cache.newCache (Just $ TimeSpec 900 0)) >>= \cache ->
	return (
		\(JingleTSID tsid) size -> do
			fromIO_ $ Cache.purgeExpired cache
			fromIO_ $ Cache.insert cache tsid size
		,
		\(JingleTSID tsid) -> do
			fromIO_ $ Cache.lookup' cache tsid
	)

sidToIqMap :: (Unexceptional m) =>
	   (XMPP.IQ -> FilePath -> UIO ())
	-> m (


@@ 343,8 390,9 @@ setupJingleHandlers :: (Unexceptional m) =>
setupJingleHandlers storePath ports hostPort logger transferDoneIq = do
	(newSession, transferDone) <- sidToIqMap transferDoneIq
	(newTransport, transportDone) <- tsidToSidMap transferDone
	(setSize, getSize) <- tsidToSizeMap
	(fmap.fmap) (\() ->
			iqSetHandler storePath hostPort
				newSession newTransport transportDone
				newSession newTransport setSize transportDone
		) $
		Socks5Server.start storePath ports logger transportDone
		Socks5Server.start storePath ports logger getSize transportDone

M Jingle/Socks5Server.hs => Jingle/Socks5Server.hs +6 -4
@@ 19,9 19,10 @@ start :: (Unexceptional m) =>
	   FilePath
	-> [Socket.SockAddr]
	-> (String -> UIO ())
	-> (JingleTSID -> UIO (Maybe Int))
	-> (JingleTSID -> FilePath -> UIO ())
	-> m (Either IOError ())
start storePath ports logger transferFinished = UIO.fromIO' (error.show) $
start storePath ports logger getSize transferFinished = UIO.fromIO' (error.show) $
	forM_ ports $ \port -> do
		Just family <- return $ addrToFamily port
		socket <- Socket.socket family Socket.Stream 0


@@ 52,7 53,7 @@ start storePath ports logger transferFinished = UIO.fromIO' (error.show) $
		-- Should these just get logged, or should the process die if we
		-- cannot restart?
		either exceptionLogger return =<<
			start storePath [port] logger transferFinished
			start storePath [port] logger getSize transferFinished

	afterAfterAccept (Right (Right ())) = return ()
	afterAfterAccept (Right (Left e)) = exceptionLogger e


@@ 63,9 64,10 @@ start storePath ports logger transferFinished = UIO.fromIO' (error.show) $
		let SocksAddrDomainName addr = requestDstAddr request
		let taddr = decodeUtf8 addr
		let saddr = textToString taddr
		size <- UIO.run $ getSize (JingleTSID taddr)
		sendSerialized conn $ SocksResponse
			SocksReplySuccess (SocksAddrDomainName addr) 0
		h <- Socket.socketToHandle conn ReadMode
		(path, _, _, _) <- UIO.runEitherIO $ storeChunks storePath saddr
			(fromIO_ $ Just <$> ByteString.hGet h 4096)
		(path, _, _, _) <- UIO.runEitherIO $ storeChunks size storePath saddr
			(fromIO_ $ Just <$> ByteString.hGetSome h 4096)
		UIO.run $ transferFinished (JingleTSID taddr) path

M StoreChunks.hs => StoreChunks.hs +10 -5
@@ 21,7 21,8 @@ createLinkIfNotExist target link = do
	when (not exist) $ createFileLink target link

storeChunks :: (Unexceptional m) =>
	   FilePath
	   Maybe Int
	-> FilePath
	-> String
	-> UIO (Maybe ByteString)
	-> m (


@@ 29,19 30,22 @@ storeChunks :: (Unexceptional m) =>
			IOError
			(FilePath, Digest SHA1, Digest SHA256, Digest SHA512)
		)
storeChunks storePath tmpName getChunk = loop hashInit hashInit hashInit
storeChunks size storePath tmpName getChunk = loop 0 hashInit hashInit hashInit
	where
	tmpPath = storePath ++ "/tmp/" ++ tmpName
	cidPath digest = storePath ++ "/" ++ textToString (digestCID digest)
	loop sha1 sha256 sha512 = do
	loop currentSize sha1 sha256 sha512
		| Just totalSize <- size,
		  currentSize >= totalSize = finish sha1 sha256 sha512
	loop currentSize sha1 sha256 sha512 = do
		maybeChunk <- UIO.lift getChunk
		case maybeChunk of
			Nothing -> return $ Left $ userError "Failed to getChunk"
			Just chunk
				| ByteString.null chunk -> finish sha1 sha256 sha512
				| otherwise -> step sha1 sha256 sha512 chunk
				| otherwise -> step currentSize sha1 sha256 sha512 chunk

	step sha1 sha256 sha512 chunk = do
	step currentSize sha1 sha256 sha512 chunk = do
		result <- UIO.fromIO' (error.show) $ do
			createDirectoryIfMissing True (storePath ++ "/tmp")
			ByteString.appendFile tmpPath chunk


@@ 49,6 53,7 @@ storeChunks storePath tmpName getChunk = loop hashInit hashInit hashInit
			Left e -> return (Left e)
			Right () ->
				loop
					(currentSize + ByteString.length chunk)
					(hashUpdate sha1 chunk)
					(hashUpdate sha256 chunk)
					(hashUpdate sha512 chunk)