module Marconi.ChainIndex.Indexers where
+
import Cardano.Api (Block (Block), BlockHeader (BlockHeader), BlockInMode (BlockInMode), CardanoMode,
+
ChainPoint (ChainPoint, ChainPointAtGenesis), Hash, ScriptData, SlotNo, Tx (Tx), chainPointToSlotNo)
+
import Cardano.Api qualified as C
+
import Cardano.Api.Shelley qualified as C
+
import Cardano.Ledger.Alonzo.TxWitness qualified as Alonzo
+
import Cardano.Streaming (ChainSyncEvent (RollBackward, RollForward), ChainSyncEventException (NoIntersectionFound), withChainSyncEventStream)
+
import Cardano.Streaming qualified as CS
import Control.Concurrent (MVar, forkIO, modifyMVar_, newMVar, readMVar)
+
import Control.Concurrent.MVar (modifyMVar)
import Control.Concurrent.QSemN (QSemN, newQSemN, signalQSemN, waitQSemN)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan (TChan, dupTChan, newBroadcastTChanIO, readTChan, writeTChan)
import Control.Lens.Operators ((^.))
import Control.Monad (forever, void)
import Control.Monad.Trans.Class (lift)
+
import Control.Monad.Trans.Except (runExceptT)
import Data.List (findIndex, foldl1', intersect)
import Data.Map qualified as Map
import Data.Maybe (fromMaybe, mapMaybe)
import Data.Text qualified as TS
import Database.SQLite.Simple qualified as SQL
import Streaming.Prelude qualified as S
-
import Cardano.Api (Block (Block), BlockHeader (BlockHeader), BlockInMode (BlockInMode), CardanoMode,
-
ChainPoint (ChainPoint, ChainPointAtGenesis), Hash, ScriptData, SlotNo, Tx (Tx), chainPointToSlotNo)
-
import Cardano.Api qualified as C
import Cardano.Api.Shelley qualified as Shelley
import Cardano.BM.Setup (withTrace)
import Cardano.BM.Trace (logError)
import Cardano.BM.Tracing (defaultConfigStdout)
-
import Cardano.Ledger.Alonzo.TxWitness qualified as Alonzo
-
import Cardano.Streaming (ChainSyncEvent (RollBackward, RollForward), ChainSyncEventException (NoIntersectionFound),
-
withChainSyncEventStream)
-
import Cardano.Streaming qualified as CS
import Marconi.ChainIndex.Logging (logging)
import Prettyprinter (defaultLayoutOptions, layoutPretty, pretty, (<+>))
import Prettyprinter.Render.Text (renderStrict)
+
import Data.Word (Word64)
import Marconi.ChainIndex.Indexers.AddressDatum (AddressDatumDepth (AddressDatumDepth), AddressDatumHandle,
import Marconi.ChainIndex.Indexers.AddressDatum qualified as AddressDatum
import Marconi.ChainIndex.Indexers.Datum (DatumIndex)
import Marconi.ChainIndex.Indexers.Datum qualified as Datum
+
import Marconi.ChainIndex.Indexers.EpochSPD (EpochSPDHandle, EpochSPDIndex)
+
import Marconi.ChainIndex.Indexers.EpochSPD qualified as EpochSPD
import Marconi.ChainIndex.Indexers.EpochStakepoolSize qualified as EpochStakepoolSize
import Marconi.ChainIndex.Indexers.MintBurn qualified as MintBurn
import Marconi.ChainIndex.Indexers.ScriptTx qualified as ScriptTx
import Marconi.ChainIndex.Indexers.Utxo qualified as Utxo
+
import Marconi.ChainIndex.Node.Client.GenesisConfig (NetworkConfigFile (NetworkConfigFile), initExtLedgerStateVar,
+
mkProtocolInfoCardano, readCardanoGenesisConfig, readNetworkConfig,
+
renderGenesisConfigError)
import Marconi.ChainIndex.Types (TargetAddresses)
import Marconi.Core.Index.VSplit qualified as Ix
import Marconi.Core.Storable qualified as Storable
+
import Ouroboros.Consensus.Config qualified as O
+
import Ouroboros.Consensus.Ledger.Abstract qualified as O
+
import Ouroboros.Consensus.Ledger.Extended qualified as O
+
import Ouroboros.Consensus.Node qualified as O
+
import System.Directory (createDirectoryIfMissing)
+
import System.FilePath (takeDirectory, (</>))
getDatums :: BlockInMode CardanoMode -> [(SlotNo, (Hash ScriptData, ScriptData))]
scriptDataFromCardanoTxBody :: C.TxBody era -> Map (Hash ScriptData) ScriptData
-
scriptDataFromCardanoTxBody (Shelley.ShelleyTxBody _ _ _ (C.TxBodyScriptData _ dats _) _ _) =
+
scriptDataFromCardanoTxBody (C.ShelleyTxBody _ _ _ (C.TxBodyScriptData _ dats _) _ _) =
extractData :: Alonzo.TxDats era -> Map (Hash ScriptData) ScriptData
extractData (Alonzo.TxDats' xs) =
-
. fmap ((\x -> (C.hashScriptData x, x)) . Shelley.fromAlonzoData)
+
. fmap ((\x -> (C.hashScriptData x, x)) . C.fromAlonzoData)
scriptDataFromCardanoTxBody _ = mempty
data Coordinator = Coordinator
-
{ _channel :: TChan (ChainSyncEvent (BlockInMode CardanoMode))
+
{ _channel :: !(TChan (ChainSyncEvent (BlockInMode CardanoMode)))
+
, _indexerCount :: !Int
initialCoordinator :: Int -> IO Coordinator
initialCoordinator indexerCount =
Coordinator <$> newBroadcastTChanIO
:: (Utxo.UtxoIndexer -> IO ()) -- ^ callback function used in the queryApi thread, needs to be non-blocking
-> Maybe TargetAddresses -- ^ Target addresses to filter for
-
-> Coordinator -> TChan (ChainSyncEvent (BlockInMode CardanoMode)) -> FilePath -> IO (IO (), MVar Utxo.UtxoIndexer)
+
-> TChan (ChainSyncEvent (BlockInMode CardanoMode))
+
-> IO (IO (), MVar Utxo.UtxoIndexer)
utxoWorker_ callback depth maybeTargetAddresses Coordinator{_barrier} ch path = do
ix <- Utxo.open path depth
RollForward (BlockInMode (Block (BlockHeader slotNo bh _) txs) _) _ -> do
-- TODO Redo. Inefficient filtering
-
fmap (\targetAddrs -> \addr -> addr `elem` targetAddrs)
AddressDatum.toAddressDatumIndexEvent addressFilter txs (C.ChainPoint slotNo bh)
-- * Epoch stakepool size indexer
+
epochStakepoolSizeWorker_
+
-> (Storable.State EpochSPDHandle -> IO ())
+
-> Word64 -- Security param
+
-> TChan (ChainSyncEvent (BlockInMode CardanoMode))
+
-> IO (IO b, MVar EpochSPDIndex)
+
epochStakepoolSizeWorker_
+
let ledgerStateDir = takeDirectory dbPath </> "ledgerStates"
+
createDirectoryIfMissing False ledgerStateDir
+
indexerMVar <- newMVar =<< EpochSPD.open dbPath ledgerStateDir securityParam
+
nodeConfigE <- runExceptT $ readNetworkConfig (NetworkConfigFile nodeConfigPath)
+
nodeConfig <- either (error . show) pure nodeConfigE
+
genesisConfigE <- runExceptT $ readCardanoGenesisConfig nodeConfig
+
genesisConfig <- either (error . show . renderGenesisConfigError) pure genesisConfigE
+
let initialLedgerState = O.ledgerState $ initExtLedgerStateVar genesisConfig
+
hfLedgerConfig = O.topLevelConfigLedger $ O.pInfoConfig (mkProtocolInfoCardano genesisConfig)
+
loop currentLedgerState maybeEpochNo = do
+
chainSyncEvent <- atomically $ readTChan ch
+
newLedgerState <- case chainSyncEvent of
+
RollForward blockInMode@(C.BlockInMode (C.Block (C.BlockHeader slotNo bh bn) _) _) chainTip -> do
+
-- Compute new LedgerState given block and old LedgerState
+
$ O.tickThenReapplyLedgerResult
+
(C.toConsensusBlock blockInMode)
+
let newEpochNo = EpochSPD.getEpochNo newLedgerState
+
-- If the block is rollbackable, we always store the LedgerState. If the block is
+
-- immutable, we only store it right before a new epoch.
+
-- let isLastEventOfEpoch = maybeEpochNo /= newEpochNo
+
let isLastEventOfEpoch = maybeEpochNo /= newEpochNo
+
EpochSPD.toStorableEvent
+
modifyMVar_ indexerMVar $ Storable.insert storableEvent
+
readMVar indexerMVar >>= onInsert -- refresh the query STM/CPS with new storage pointers/counters state
+
RollBackward C.ChainPointAtGenesis _ct -> do
+
modifyMVar_ indexerMVar $ \ix -> fromMaybe ix <$> Storable.rewind C.ChainPointAtGenesis ix
+
pure initialLedgerState
+
modifyMVar indexerMVar $ \ix -> do
+
newIndex <- fromMaybe ix <$> Storable.rewind cp ix
+
-- The possible points from which we can possibly rollback should be available
+
-- in the buffer events and from the resumable points.
+
-- For that assumption to be correct, we absolutely need
+
-- * to make sure that 'EpochSPD.open' was called with the correct