Merge #1348

1348: LocalTxSubmission real implementation (remove ‘localTxSubmissionNull’ placeholder) r=KtorZ a=KtorZ

Issue Number

#1346

Overview

  • 9b89a053bb4f3e40a0f6909d89e3bc594c50579d Allow mapping on ‘NextBlocksResult’s cursor This makes the type slightly more flexible and avoid pushing down too much unrelated context regarding cursors. See Byron.Network adjustments in the next commit

  • c9fba67e393d9f52bc113f4be554dca5534fe3f5 define compatibility type translation between sealedtx and byron gentx

  • ba8c30d48e9d982b1991d75e76f8529e287e3ad5 rename ‘NetworkClientCmd’ to ‘ChainSyncCmd’

    • And move ‘Cursor’ definition out of the chain sync client
  • Also renamed ‘queue’ to ‘chainSyncQ’ to make the distinction with the upcoming local tx submission queue clearer

  • 93865ae0b4f7465e943ade1429c7d00e29be11f0 wire up ‘LocalTxSubmission’ protocol in Byron Network

Comments

Testing will come soon, I first need to enable the transaction submission through the API (see #1347) and start a local blockchain.

Co-authored-by: KtorZ [email protected]

View on GitHub
File Changes
    bp = blockchainParameters @n

                      
    serveApp socket = do
-
        let nl = newNetworkLayer nullTracer bp addrInfo (versionData @n)
+
        nl <- newNetworkLayer nullTracer bp addrInfo (versionData @n)
        byronApi   <- apiLayer (newTransactionLayer @n) nl
        icarusApi  <- apiLayer (newTransactionLayer @n) nl
        startServer socket byronApi icarusApi $> ExitSuccess
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
+
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE TupleSections #-}
    , toEpochSlots
    , toPoint
    , toSlotNo
+
    , toGenTx

                      
    , fromByronBlock
    , fromTxAux
    ( AbstractHash (..), hash )
import Cardano.Wallet.Primitive.AddressDerivation
    ( NetworkDiscriminant (..) )
+
import Cardano.Wallet.Unsafe
+
    ( unsafeDeserialiseCbor, unsafeFromHex )
import Codec.SerialiseTerm
    ( CodecCBORTerm )
import Data.Coerce
    ( posixSecondsToUTCTime )
import Data.Word
    ( Word16, Word32 )
+
import GHC.Stack
+
    ( HasCallStack )
import Ouroboros.Consensus.Ledger.Byron
-
    ( ByronBlock (..), ByronHash (..) )
+
    ( ByronBlock (..), ByronHash (..), GenTx (..), decodeByronGenTx )
import Ouroboros.Network.Block
    ( BlockNo (..)
    , ChainHash (..)
import qualified Crypto.Hash as Crypto
import qualified Data.ByteArray as BA
import qualified Data.ByteString as BS
+
import qualified Data.ByteString.Lazy as BL
import qualified Data.List.NonEmpty as NE
import qualified Ouroboros.Network.Block as O
import qualified Ouroboros.Network.Point as Point

                      
-
import Cardano.Wallet.Unsafe
-
    ( unsafeFromHex )
-

                      
import qualified Cardano.Wallet.Primitive.Types as W

                      
data Byron
toSlotNo =
    SlotNo . W.flatSlot byronEpochLength

                      
+
-- | SealedTx are the result of rightfully constructed byron transactions so, it
+
-- is relatively safe to unserialize them from CBOR.
+
toGenTx :: HasCallStack => W.SealedTx -> GenTx ByronBlock
+
toGenTx =
+
    unsafeDeserialiseCbor decodeByronGenTx . BL.fromStrict . W.getSealedTx
+

                      
fromByronBlock :: W.Hash "Genesis" -> ByronBlock -> W.Block
fromByronBlock genesisHash byronBlk = case byronBlockRaw byronBlk of
  ABOBBlock blk  ->
    , genesisTip
    , toByronHash
    , toEpochSlots
+
    , toGenTx
    , toPoint
    )
import Cardano.Wallet.Logging
    ( Cursor
    , ErrGetBlock (..)
    , ErrNetworkUnavailable (..)
+
    , ErrPostTx (..)
    , NetworkLayer (..)
    , NextBlocksResult (..)
+
    , mapCursor
    )
import Codec.SerialiseTerm
    ( CodecCBORTerm )
import Control.Monad.IO.Class
    ( MonadIO )
import Control.Monad.Trans.Except
-
    ( ExceptT (..), withExceptT )
+
    ( ExceptT (..), throwE, withExceptT )
import Control.Tracer
    ( Tracer, contramap )
import Data.ByteString.Lazy
    , NodeToClientVersion (..)
    , NodeToClientVersionData (..)
    , connectTo
-
    , localTxSubmissionClientNull
    )
import Ouroboros.Network.Point
    ( fromWithOrigin )
import Ouroboros.Network.Protocol.Handshake.Version
    ( DictVersion (..), simpleSingletonVersions )
import Ouroboros.Network.Protocol.LocalTxSubmission.Client
-
    ( LocalTxSubmissionClient (..), localTxSubmissionClientPeer )
+
    ( LocalTxClientStIdle (..)
+
    , LocalTxSubmissionClient (..)
+
    , localTxSubmissionClientPeer
+
    )
import Ouroboros.Network.Protocol.LocalTxSubmission.Codec
    ( codecLocalTxSubmission )
import Ouroboros.Network.Protocol.LocalTxSubmission.Type
-- stateful and the node's keep track of the associated connection's cursor.
data instance Cursor (m Byron) = Cursor
    (Point ByronBlock)
-
    (TQueue m (NetworkClientCmd m))
+
    (TQueue m (ChainSyncCmd m))

                      
-- | Create an instance of the network layer
newNetworkLayer
        -- ^ Socket for communicating with the node
    -> (NodeToClientVersionData, CodecCBORTerm Text NodeToClientVersionData)
        -- ^ Codecs for the node's client
-
    -> NetworkLayer IO (IO Byron) ByronBlock
-
newNetworkLayer tr bp addrInfo versionData = NetworkLayer
-
    { currentNodeTip = _currentNodeTip
-
    , nextBlocks = _nextBlocks
-
    , initCursor = _initCursor
-
    , cursorSlotId = _cursorSlotId
-
    , postTx = _postTx
-
    , staticBlockchainParameters = _staticBlockchainParameters
-
    , stakeDistribution = _stakeDistribution
-
    , getAccountBalance = _getAccountBalance
-
    }
+
    -> IO (NetworkLayer IO (IO Byron) ByronBlock)
+
newNetworkLayer tr bp addrInfo versionData = do
+
    localTxSubmissionQ <- atomically newTQueue
+
    pure NetworkLayer
+
        { currentNodeTip = _currentNodeTip
+
        , nextBlocks = _nextBlocks
+
        , initCursor = _initCursor localTxSubmissionQ
+
        , cursorSlotId = _cursorSlotId
+
        , postTx = _postTx localTxSubmissionQ
+
        , staticBlockchainParameters = _staticBlockchainParameters
+
        , stakeDistribution = _stakeDistribution
+
        , getAccountBalance = _getAccountBalance
+
        }
  where
-
    _initCursor headers = do
-
        queue <- atomically newTQueue
-
        link =<< async
-
            (connectClient (mkNetworkClient tr bp queue) versionData addrInfo)
+
    _initCursor localTxSubmissionQ headers = do
+
        chainSyncQ <- atomically newTQueue
+
        let client = mkNetworkClient tr bp chainSyncQ localTxSubmissionQ
+
        link =<< async (connectClient client versionData addrInfo)

                      
        let points = genesisPoint : (toPoint <$> headers)
-
        queue `send` CmdFindIntersection points >>= \case
+
        chainSyncQ `send` CmdFindIntersection points >>= \case
            Right(Just intersection) ->
-
                pure $ Cursor intersection queue
+
                pure $ Cursor intersection chainSyncQ
            _ -> fail
                "initCursor: intersection not found? This can't happen \
                \because we always give at least the genesis point..."

                      
-
    _nextBlocks (Cursor _ queue) = withExceptT ErrGetBlockNetworkUnreachable $ do
-
        ExceptT (queue `send` CmdNextBlocks)
+
    _nextBlocks (Cursor _ chainSyncQ) = do
+
        let toCursor point = Cursor point chainSyncQ
+
        fmap (mapCursor toCursor) $ withExceptT ErrGetBlockNetworkUnreachable $
+
            ExceptT (chainSyncQ `send` CmdNextBlocks)

                      
    _cursorSlotId (Cursor point _) = do
        fromSlotNo $ fromWithOrigin (SlotNo 0) $ pointSlot point
    _currentNodeTip =
        notImplemented "currentNodeTip"

                      
-
    _postTx =
-
        notImplemented "postTx"
+
    _postTx localTxSubmissionQ tx = do
+
        result <- withExceptT ErrPostTxNetworkUnreachable $
+
            ExceptT (localTxSubmissionQ `send` CmdSubmitTx (toGenTx tx))
+
        case result of
+
            Nothing  -> pure ()
+
            Just err -> throwE $ ErrPostTxBadRequest $ T.pack err

                      
    _stakeDistribution =
        notImplemented "stakeDistribution"
-- callback.
--
-- See also 'send' for invoking commands.
-
data NetworkClientCmd (m :: * -> *)
+
data ChainSyncCmd (m :: * -> *)
    = CmdFindIntersection
        [Point ByronBlock]
        (Maybe (Point ByronBlock) -> m ())
    | CmdNextBlocks
-
        (NextBlocksResult (m Byron) ByronBlock -> m ())
+
        (NextBlocksResult (Point ByronBlock) ByronBlock -> m ())
    | CmdCurrentNodeTip
        (Tip ByronBlock -> m ())

                      
+
-- | Sending command to the localTxSubmission client. See also 'ChainSyncCmd'.
+
data LocalTxSubmissionCmd (m :: * -> *)
+
    = CmdSubmitTx
+
        (GenTx ByronBlock)
+
        (Maybe String -> m ())
+

                      
-- | Helper function to easily send commands to the node's client and read
-- responses back.
--
-- AwaitReply
send
    :: (MonadSTM m, MonadAsync m, MonadTimer m)
-
    => TQueue m (NetworkClientCmd m)
-
    -> ((a -> m ()) -> NetworkClientCmd m)
+
    => TQueue m (cmd m)
+
    -> ((a -> m ()) -> cmd m)
    -> m (Either ErrNetworkUnavailable a)
send queue cmd = do
    tvar <- newEmptyTMVarM
        -- ^ Base trace for underlying protocols
    -> W.BlockchainParameters
        -- ^ Static blockchain parameters
-
    -> TQueue m (NetworkClientCmd m)
-
        -- ^ Communication channel with the node
+
    -> TQueue m (ChainSyncCmd m)
+
        -- ^ Communication channel with the ChainSync client
+
    -> TQueue m (LocalTxSubmissionCmd m)
+
        -- ^ Communication channel with the LocalTxSubmission client
    -> NetworkClient m
-
mkNetworkClient tr bp queue =
+
mkNetworkClient tr bp chainSyncQ localTxSubmissionQ =
    OuroborosInitiatorApplication $ \pid -> \case
        ChainSyncWithBlocksPtcl ->
            let tr' = contramap (T.pack . show) $ trMessage tr in
-
            chainSyncWithBlocks tr' pid (W.getGenesisBlockHash bp) queue
+
            chainSyncWithBlocks tr' pid (W.getGenesisBlockHash bp) chainSyncQ
        LocalTxSubmissionPtcl ->
-
            localTxSubmission nullTracer pid
+
            let tr' = contramap (T.pack . show) $ trMessage tr in
+
            localTxSubmission tr' pid localTxSubmissionQ

                      
-- Connect a client to a network, see `mkNetworkClient` to construct a network
-- client interface.
        -- ^ An abstract peer identifier for 'runPeer'
    -> W.Hash "Genesis"
        -- ^ Hash of the genesis block
-
    -> TQueue m (NetworkClientCmd m)
+
    -> TQueue m (ChainSyncCmd m)
        -- ^ We use a 'TQueue' as a communication channel to drive queries from
        -- outside of the network client to the client itself.
        -- Requests are pushed to the queue which are then transformed into

                      
    -- * Interface
      NetworkLayer (..)
    , NextBlocksResult (..)
+
    , mapCursor
    , Cursor
    , follow
    , FollowAction (..)
data NetworkLayer m target block = NetworkLayer
    { nextBlocks
        :: Cursor target
-
        -> ExceptT ErrGetBlock m (NextBlocksResult target block)
+
        -> ExceptT ErrGetBlock m (NextBlocksResult (Cursor target) block)
        -- ^ Starting from the given 'Cursor', fetches a contiguous sequence of
        -- blocks from the node, if they are available. An updated cursor will
        -- be returned with a 'RollFoward' result.

                      
-- | The result of 'nextBlocks', which is instructions for what the chain
-- consumer should do next.
-
data NextBlocksResult target block
+
data NextBlocksResult cursor block
    = AwaitReply
        -- ^ There are no blocks available from the node, so wait.
-
    | RollForward (Cursor target) BlockHeader [block]
+
    | RollForward cursor BlockHeader [block]
        -- ^ Apply the given contiguous non-empty sequence of blocks. Use the
        -- updated cursor to get the next batch. The given block header is the
        -- current tip of the node.
-
    | RollBackward (Cursor target)
+
    | RollBackward cursor
        -- ^ The chain consumer must roll back its state, then use the cursor to
        -- get the next batch of blocks.

                      
-
instance Functor (NextBlocksResult target) where
+
instance Functor (NextBlocksResult cursor) where
    fmap f = \case
        AwaitReply -> AwaitReply
        RollForward cur bh bs -> RollForward cur bh (fmap f bs)
        RollBackward cur -> RollBackward cur

                      
+
mapCursor :: (a -> b) -> NextBlocksResult a block -> NextBlocksResult b block
+
mapCursor fn = \case
+
    AwaitReply -> AwaitReply
+
    RollForward cur bh bs -> RollForward (fn cur) bh bs
+
    RollBackward cur -> RollBackward (fn cur)
+

                      
-- | @[email protected] enables the callback of @[email protected] to signal if the
-- chain-following should @[email protected], @[email protected], or if the current callback
-- should be forgotten and retried (@[email protected]).

                      
    _nextBlocks
        :: Cursor t
-
        -> ExceptT ErrGetBlock m (NextBlocksResult t block)
+
        -> ExceptT ErrGetBlock m (NextBlocksResult (Cursor t) block)
    _nextBlocks [email protected](Cursor localChain) = do
        lift (runExceptT _currentNodeTip) >>= \case
            Right _ -> do
        tryRollForward
            :: BlockHeader
            -> [block]
-
            -> NextBlocksResult t block
+
            -> NextBlocksResult (Cursor t) block
        tryRollForward tip = \case
            -- No more blocks to apply, no need to roll forward
            [] -> AwaitReply

                      
        rollBackward
            :: BlockHeader
-
            -> NextBlocksResult t block
+
            -> NextBlocksResult (Cursor t) block
        rollBackward point =
            RollBackward (cursorBackward point cursor)

                      
        recover
            :: BlockHeaders
-
            -> NextBlocksResult t block
+
            -> NextBlocksResult (Cursor t) block
        recover chain = case (blockHeadersBase chain, blockHeadersTip chain) of
            (Just baseH, Just tipH) | baseH /= tipH ->
                RollBackward (cursorBackward baseH cursor)
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
+
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
    , withNetworkLayer
    )
import Cardano.Wallet.Network
-
    ( ErrCurrentNodeTip (..)
+
    ( Cursor
+
    , ErrCurrentNodeTip (..)
    , ErrGetBlock (..)
    , NetworkLayer (..)
    , NextBlocksResult (..)
    let client = Jormungandr.mkJormungandrClient manager url
    runExceptT $ endpoint client resourceId

                      
-
instance Show (NextBlocksResult t b) where
+
instance Show (NextBlocksResult (Cursor t) b) where
    show AwaitReply = "AwaitReply"
    show (RollForward _ _ bs) = "RollForward " ++ show (length bs) ++ " blocks"
    show (RollBackward _) = "RollBackward"

                      
-
instance Eq (NextBlocksResult t b) where
+
instance Eq (NextBlocksResult (Cursor t) b) where
    a == b = show a == show b

                      
instance Arbitrary (Hash any) where
    arbitrary = Hash . BS.pack <$> vectorOf 32 arbitrary

                      
-
getRollForward :: NextBlocksResult target block -> Maybe [block]
+
getRollForward :: NextBlocksResult (Cursor t) block -> Maybe [block]
getRollForward AwaitReply = Nothing
getRollForward (RollForward _ _ bs) = Just bs
getRollForward (RollBackward _) = Nothing

                      
-
isRollForward :: NextBlocksResult target block -> Bool
+
isRollForward :: NextBlocksResult (Cursor t) block -> Bool
isRollForward = maybe False (not . null) . getRollForward

                      
isRollBackwardTo
-
    :: NetworkLayer m target block
+
    :: NetworkLayer m t block
    -> SlotId
-
    -> NextBlocksResult target block
+
    -> NextBlocksResult (Cursor t) block
    -> Bool
isRollBackwardTo nl sl = \case
    RollBackward cursor -> cursorSlotId nl cursor == sl