Remove the "WithLedgerState" stream.
It can be implemented separately, as done in Plutus.Streaming.LedgerState.
It can be implemented separately, as done in Plutus.Streaming.LedgerState.
import Control.Monad.IO.Class (MonadIO (liftIO))
import Data.Aeson qualified as Aeson
import Data.Maybe qualified as Maybe
import Options.Applicative (Alternative ((<|>)), Parser, auto, command, execParser, flag', help, helper, info, long,
metavar, option, progDesc, str, strOption, subparser, value, (<**>))
import Options.Applicative (Alternative ((<|>)), Parser, auto, execParser, flag', help, helper, info, long, metavar,
option, str, strOption, value, (<**>))
import Plutus.Streaming (ChainSyncEvent (RollBackward, RollForward), SimpleChainSyncEvent,
withChainSyncEventStreamWithLedgerState, withSimpleChainSyncEventStream)
withSimpleChainSyncEventStream)
import Plutus.Streaming.ChainIndex (utxoState)
import Streaming (Of, Stream)
import Streaming.Prelude qualified as S
| ChainIndex
deriving (Show, Read)
data Options
= Simple
{ optionsSocketPath :: String,
optionsNetworkId :: NetworkId,
optionsChainPoint :: ChainPoint,
optionsExample :: Example
}
| WithLedgerState
{ optionsNetworkConfigPath :: String,
optionsSocketPath :: String,
optionsNetworkId :: NetworkId,
optionsChainPoint :: ChainPoint
}
data Options = Options
{ optionsSocketPath :: String,
optionsNetworkId :: NetworkId,
optionsChainPoint :: ChainPoint,
optionsExample :: Example
}
deriving (Show)
optionsParser :: Parser Options
optionsParser =
subparser
( command "simple" (info simple (progDesc "simple"))
<> command "with-ledger-state" (info withLedgerState (progDesc "withLedgerSate"))
)
where
simple =
Simple
<$> strOption (long "socket-path" <> help "Node socket path")
<*> networkIdParser
<*> chainPointParser
<*> option auto (long "example" <> value Print)
withLedgerState =
WithLedgerState
<$> strOption (long "network-config-path" <> help "Node config path")
<*> strOption (long "socket-path" <> help "Node socket path")
<*> networkIdParser
<*> chainPointParser
Options
<$> strOption (long "socket-path" <> help "Node socket path")
<*> networkIdParser
<*> chainPointParser
<*> option auto (long "example" <> value Print)
networkIdParser :: Parser NetworkId
networkIdParser =
main :: IO ()
main = do
options <- execParser $ info (optionsParser <**> helper) mempty
case options of
Simple {optionsSocketPath, optionsNetworkId, optionsChainPoint, optionsExample} ->
withSimpleChainSyncEventStream
optionsSocketPath
optionsNetworkId
optionsChainPoint
(doSimple optionsExample)
WithLedgerState {optionsNetworkConfigPath, optionsNetworkId, optionsSocketPath, optionsChainPoint} ->
withChainSyncEventStreamWithLedgerState
optionsNetworkConfigPath
optionsSocketPath
optionsNetworkId
optionsChainPoint
S.print
Options {optionsSocketPath, optionsNetworkId, optionsChainPoint, optionsExample} <-
execParser $ info (optionsParser <**> helper) mempty
withSimpleChainSyncEventStream
optionsSocketPath
optionsNetworkId
optionsChainPoint
(doSimple optionsExample)
doSimple ::
Example ->
module Plutus.Streaming
( SimpleChainSyncEvent,
withSimpleChainSyncEventStream,
ChainSyncEventWithLedgerState,
withChainSyncEventStreamWithLedgerState,
ChainSyncEvent (..),
EventStreamResult (..),
)
where
import Cardano.Api (BlockInMode, CardanoMode, ChainPoint, ChainSyncClient (ChainSyncClient), ChainTip,
ConsensusModeParams (CardanoModeParams), EpochSlots (EpochSlots), LedgerEvent, LedgerState,
LedgerStateError, LocalChainSyncClient (LocalChainSyncClient),
ConsensusModeParams (CardanoModeParams), EpochSlots (EpochSlots),
LocalChainSyncClient (LocalChainSyncClient),
LocalNodeClientProtocols (LocalNodeClientProtocols, localChainSyncClient, localStateQueryClient, localTxSubmissionClient),
LocalNodeConnectInfo (LocalNodeConnectInfo, localConsensusModeParams, localNodeNetworkId, localNodeSocketPath),
NetworkId, ValidationMode (QuickValidation), chainSyncClientWithLedgerState, connectToLocalNode,
envSecurityParam, initialLedgerState)
NetworkId, connectToLocalNode)
import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgDone, SendMsgFindIntersect, SendMsgRequestNext),
ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound),
ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward))
import Control.Concurrent (Chan, MVar, newChan, newEmptyMVar, putMVar, readChan, takeMVar, writeChan)
import Control.Concurrent.Async (withAsync)
import Control.Exception (SomeException, catch)
import Control.Monad.Trans.Except (runExceptT)
import GHC.Generics (Generic)
import Streaming (Of, Stream)
import Streaming.Prelude qualified as S
type SimpleChainSyncEvent = ChainSyncEvent (BlockInMode CardanoMode)
type ChainSyncEventWithLedgerState = ChainSyncEvent (BlockInMode CardanoMode, Either LedgerStateError (LedgerState, [LedgerEvent]))
data EventStreamResult
= NoIntersectionFound
deriving (Show)
ChainPoint ->
(Stream (Of SimpleChainSyncEvent) IO EventStreamResult -> IO b) ->
IO b
withSimpleChainSyncEventStream filePath networkId point =
withClientStream (runChainSyncStreamingClient filePath networkId point)
withChainSyncEventStreamWithLedgerState ::
FilePath ->
FilePath ->
NetworkId ->
ChainPoint ->
(Stream (Of ChainSyncEventWithLedgerState) IO EventStreamResult -> IO b) ->
IO b
withChainSyncEventStreamWithLedgerState networkConfigPath filePath networkId point =
withClientStream (runChainSyncStreamingClientWithLedgerState networkConfigPath filePath networkId point)
-- This adapts a streaming client to a stream
withClientStream ::
(MVar (Maybe (Chan e)) -> IO r) ->
(Stream (Of e) IO EventStreamResult -> IO b) ->
IO b
withClientStream client consumer = do
-- We use a MVar as a synchronisation point to learn if the client as
-- successfully found an intersection. We rely on the fact that
-- client will write into m, telling us whether or not it has found an
-- intersection. If this doesn't happen we will be stuck waiting forever.
-- FIXME I haven't even thought about exception safety here.
m <- newEmptyMVar
withAsync (client m) $ \_ -> do
mc <- takeMVar m
case mc of
Nothing ->
consumer $ return NoIntersectionFound
Just c -> do
-- FIXME Client gets killed when the consumer finishes, we
-- should allow for a better clean up here
consumer $ S.repeatM $ readChan c
--
-- this can be replaced by the almost identical function in
-- Cardano.Protocol.Socket.Client.
--
-- TODO move to pipelined version
runChainSyncStreamingClient ::
FilePath ->
NetworkId ->
ChainPoint ->
MVar (Maybe (Chan SimpleChainSyncEvent)) ->
IO ()
runChainSyncStreamingClient socketPath networkId point mChan = do
withSimpleChainSyncEventStream socketPath networkId point consumer = do
mChan <- newEmptyMVar
let client = chainSyncStreamingClient point mChan
localNodeClientProtocols =
-- FIXME this comes from the config file but Cardano.Api does not expose readNetworkConfig!
epochSlots = EpochSlots 40
connectToLocalNode
connectInfo
localNodeClientProtocols
`catch` \(_ :: SomeException) -> putMVar mChan Nothing
runChainSyncStreamingClientWithLedgerState ::
FilePath ->
FilePath ->
NetworkId ->
ChainPoint ->
MVar (Maybe (Chan ChainSyncEventWithLedgerState)) ->
IO ()
runChainSyncStreamingClientWithLedgerState networkConfigFile socketPath networkId point mChan = do
ils <- runExceptT (initialLedgerState networkConfigFile)
case ils of
(Left _) ->
-- FIXME here we swallow the error but we could do better
putMVar mChan Nothing
(Right (env, ledgerState)) -> do
let client = chainSyncClientWithLedgerState env ledgerState QuickValidation (chainSyncStreamingClient point mChan)
clientThread =
connectToLocalNode connectInfo localNodeClientProtocols
`catch` \(_ :: SomeException) -> putMVar mChan Nothing
cardanoModeParams = CardanoModeParams . EpochSlots $ 10 * envSecurityParam env
connectInfo =
LocalNodeConnectInfo
{ localConsensusModeParams = cardanoModeParams,
localNodeNetworkId = networkId,
localNodeSocketPath = socketPath
}
localNodeClientProtocols =
LocalNodeClientProtocols
{ localChainSyncClient = LocalChainSyncClient client,
localTxSubmissionClient = Nothing,
localStateQueryClient = Nothing
}
connectToLocalNode
connectInfo
localNodeClientProtocols
withAsync clientThread $ \_ -> do
mc <- takeMVar mChan
case mc of
Nothing ->
consumer $ return NoIntersectionFound
Just c -> do
consumer $ S.repeatM $ readChan c
-- | This is the "core" client that connects to a local node and
-- runs the chain-sync mini-protocol. The only job of this client is to
Co-authored-by: Ubuntu <[email protected]>
Adding Coin
XFAIL on node issue #3859
And, provide a useful / informative error message down to clients.