Streams of no return
The Stream of blocks never ends, move out-of-band communication to be exception based.
The Stream of blocks never ends, move out-of-band communication to be exception based.
optionsNetworkId
optionsChainPoint
(doSimple optionsExample)
doSimple ::
Example ->
( SimpleChainSyncEvent,
withSimpleChainSyncEventStream,
ChainSyncEvent (..),
EventStreamResult (..),
ChainSyncEventException (..),
)
where
import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgDone, SendMsgFindIntersect, SendMsgRequestNext),
ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound),
ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward))
import Control.Concurrent (Chan, MVar, newChan, newEmptyMVar, putMVar, readChan, takeMVar, writeChan)
import Control.Concurrent (Chan, newChan, readChan, writeChan)
import Control.Concurrent.Async (withAsync)
import Control.Exception (SomeException, catch)
import Control.Exception (Exception, throw)
import GHC.Generics (Generic)
import Streaming (Of, Stream)
import Streaming.Prelude qualified as S
type SimpleChainSyncEvent = ChainSyncEvent (BlockInMode CardanoMode)
data EventStreamResult
data ChainSyncEventException
= NoIntersectionFound
deriving (Show)
instance Exception ChainSyncEventException
withSimpleChainSyncEventStream ::
FilePath ->
NetworkId ->
-- | The point on the chain to start streaming from
ChainPoint ->
(Stream (Of SimpleChainSyncEvent) IO EventStreamResult -> IO b) ->
(Stream (Of SimpleChainSyncEvent) IO r -> IO b) ->
IO b
withSimpleChainSyncEventStream socketPath networkId point consumer = do
-- The chain-sync client runs in a different thread. It needs to send us
-- two kind of information 1) if it has managed to establish a connection
-- and found an intersection 2) the blocks it gets from the protocol.
--
-- I encapsulated both this information in a single MVar (Maybe Chan _)
--
-- The MVar needs to be written to by the client.
--
-- If the MVar has Nothing written to it, the client has run into issues
-- preventing it from finding an intersection.
--
-- If the MVar has (Just c) written to it, the client has succesfully
-- found an intersection and blocks are going to be available from the
-- channel c.
--
-- TODO the client needs to be able to reinitialise and keep going if the
-- connection fails.
mChan <- newEmptyMVar
-- The chain-sync client runs in a different thread and it will send us
-- block through this channel.
chan <- newChan
let client = chainSyncStreamingClient point mChan
let client = chainSyncStreamingClient point chan
localNodeClientProtocols =
LocalNodeClientProtocols
-- FIXME this comes from the config file but Cardano.Api does not expose readNetworkConfig!
epochSlots = EpochSlots 40
clientThread =
clientThread = do
connectToLocalNode connectInfo localNodeClientProtocols
-- FIXME this is still not good enough, if an exception arises
-- after the client has started streaming, the consumer code
-- below will ignore the value of the MVar and will be stuck
-- waiting on the chan.
`catch` \(_ :: SomeException) -> putMVar mChan Nothing
-- the only reason the clien can terminate successfully is if it
-- doesn't find an intersection, we report that case to the
-- consumer as an exception
throw NoIntersectionFound
-- All exceptions in the client thread are passed to the consumer thread
-- TODO the client should be able to reinitialise and keep going if the
-- connection fails.
-- FIXME we still have a problem here, if the client dies while we are
-- waiting on the channel we get a BlockedIndefinitelyOnMVar right away
-- before the exception that killed the client
withAsync clientThread $ \_ -> do
mc <- takeMVar mChan
case mc of
Nothing ->
consumer $ return NoIntersectionFound
Just c -> do
consumer $ S.repeatM $ readChan c
consumer $ S.repeatM $ readChan chan
-- | `chainSyncStreamingClient` is the client that connects to a local node
-- and runs the chain-sync mini-protocol.
-- note in `withSimpleChainSyncEventStream`
chainSyncStreamingClient ::
ChainPoint ->
MVar (Maybe (Chan (ChainSyncEvent e))) ->
Chan (ChainSyncEvent e) ->
ChainSyncClient e ChainPoint ChainTip IO ()
chainSyncStreamingClient point mChan =
chainSyncStreamingClient point chan =
ChainSyncClient $ pure $ SendMsgFindIntersect [point] onIntersect
where
onIntersect =
ClientStIntersect
{ recvMsgIntersectFound = \_ _ ->
ChainSyncClient $ do
c <- newChan
putMVar mChan (Just c)
sendRequestNext c,
ChainSyncClient sendRequestNext,
recvMsgIntersectNotFound = \_ ->
ChainSyncClient $ do
putMVar mChan Nothing
pure $ SendMsgDone ()
}
sendRequestNext c =
sendRequestNext =
pure $ SendMsgRequestNext onNext (pure onNext)
where
onNext =
ClientStNext
{ recvMsgRollForward = \bim ct ->
ChainSyncClient $ do
writeChan c (RollForward bim ct)
sendRequestNext c,
writeChan chan (RollForward bim ct)
sendRequestNext,
recvMsgRollBackward = \cp ct ->
ChainSyncClient $ do
writeChan c (RollBackward cp ct)
sendRequestNext c
writeChan chan (RollBackward cp ct)
sendRequestNext
}
Docs for running SMASH against docker.
Fixup prose in configuration docs.
Co-authored-by: Ubuntu <[email protected]>
Adding Coin
XFAIL on node issue #3859