module Marconi.ChainIndex.Indexers where
+
import Cardano.Api (Block (Block), BlockHeader (BlockHeader), BlockInMode (BlockInMode), CardanoMode,
+
ChainPoint (ChainPoint, ChainPointAtGenesis), Hash, ScriptData, SlotNo, Tx (Tx), chainPointToSlotNo)
+
import Cardano.Api qualified as C
+
import Cardano.Api.Shelley qualified as C
+
import Cardano.Api.Shelley qualified as Shelley
+
import Cardano.BM.Setup (withTrace)
+
import Cardano.BM.Trace (logError)
+
import Cardano.BM.Tracing (defaultConfigStdout)
+
import Cardano.Ledger.Alonzo.TxWitness qualified as Alonzo
+
import Cardano.Streaming (ChainSyncEvent (RollBackward, RollForward), ChainSyncEventException (NoIntersectionFound),
+
withChainSyncEventStream)
+
import Cardano.Streaming qualified as CS
import Control.Concurrent (MVar, forkIO, modifyMVar_, newMVar, readMVar)
+
import Control.Concurrent.MVar (modifyMVar)
import Control.Concurrent.QSemN (QSemN, newQSemN, signalQSemN, waitQSemN)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan (TChan, dupTChan, newBroadcastTChanIO, readTChan, writeTChan)
import Control.Lens.Operators ((^.))
import Control.Monad (forever, void)
import Control.Monad.Trans.Class (lift)
+
import Control.Monad.Trans.Except (runExceptT)
import Data.List (findIndex, foldl1', intersect)
import Data.Map qualified as Map
import Data.Maybe (fromMaybe, mapMaybe)
import Data.Text qualified as TS
+
import Data.Word (Word64)
import Database.SQLite.Simple qualified as SQL
-
import Streaming.Prelude qualified as S
-
import Cardano.Api (Block (Block), BlockHeader (BlockHeader), BlockInMode (BlockInMode), CardanoMode,
-
ChainPoint (ChainPoint, ChainPointAtGenesis), Hash, ScriptData, SlotNo, Tx (Tx), chainPointToSlotNo)
-
import Cardano.Api qualified as C
-
import Cardano.Api.Shelley qualified as Shelley
-
import Cardano.BM.Setup (withTrace)
-
import Cardano.BM.Trace (logError)
-
import Cardano.BM.Tracing (defaultConfigStdout)
-
import Cardano.Ledger.Alonzo.TxWitness qualified as Alonzo
-
import Cardano.Streaming (ChainSyncEvent (RollBackward, RollForward), ChainSyncEventException (NoIntersectionFound),
-
withChainSyncEventStream)
-
import Cardano.Streaming qualified as CS
-
import Marconi.ChainIndex.Logging (logging)
-
import Prettyprinter (defaultLayoutOptions, layoutPretty, pretty, (<+>))
-
import Prettyprinter.Render.Text (renderStrict)
import Marconi.ChainIndex.Indexers.AddressDatum (AddressDatumDepth (AddressDatumDepth), AddressDatumHandle,
import Marconi.ChainIndex.Indexers.AddressDatum qualified as AddressDatum
import Marconi.ChainIndex.Indexers.Datum (DatumIndex)
import Marconi.ChainIndex.Indexers.Datum qualified as Datum
+
import Marconi.ChainIndex.Indexers.EpochSPD (EpochSPDHandle, EpochSPDIndex)
+
import Marconi.ChainIndex.Indexers.EpochSPD qualified as EpochSPD
import Marconi.ChainIndex.Indexers.EpochStakepoolSize qualified as EpochStakepoolSize
import Marconi.ChainIndex.Indexers.MintBurn qualified as MintBurn
import Marconi.ChainIndex.Indexers.ScriptTx qualified as ScriptTx
import Marconi.ChainIndex.Indexers.Utxo qualified as Utxo
+
import Marconi.ChainIndex.Logging (logging)
+
import Marconi.ChainIndex.Node.Client.GenesisConfig (NetworkConfigFile (NetworkConfigFile), initExtLedgerStateVar,
+
mkProtocolInfoCardano, readCardanoGenesisConfig, readNetworkConfig,
+
renderGenesisConfigError)
import Marconi.ChainIndex.Types (TargetAddresses)
import Marconi.Core.Index.VSplit qualified as Ix
import Marconi.Core.Storable qualified as Storable
+
import Ouroboros.Consensus.Config qualified as O
+
import Ouroboros.Consensus.Ledger.Abstract qualified as O
+
import Ouroboros.Consensus.Ledger.Extended qualified as O
+
import Ouroboros.Consensus.Node qualified as O
+
import Prettyprinter (defaultLayoutOptions, layoutPretty, pretty, (<+>))
+
import Prettyprinter.Render.Text (renderStrict)
+
import Streaming.Prelude qualified as S
+
import System.Directory (createDirectoryIfMissing)
+
import System.FilePath (takeDirectory, (</>))
getDatums :: BlockInMode CardanoMode -> [(SlotNo, (Hash ScriptData, ScriptData))]
scriptDataFromCardanoTxBody :: C.TxBody era -> Map (Hash ScriptData) ScriptData
-
scriptDataFromCardanoTxBody (Shelley.ShelleyTxBody _ _ _ (C.TxBodyScriptData _ dats _) _ _) =
+
scriptDataFromCardanoTxBody (C.ShelleyTxBody _ _ _ (C.TxBodyScriptData _ dats _) _ _) =
extractData :: Alonzo.TxDats era -> Map (Hash ScriptData) ScriptData
extractData (Alonzo.TxDats' xs) =
-
. fmap ((\x -> (C.hashScriptData x, x)) . Shelley.fromAlonzoData)
+
. fmap ((\x -> (C.hashScriptData x, x)) . C.fromAlonzoData)
scriptDataFromCardanoTxBody _ = mempty
data Coordinator = Coordinator
-
{ _channel :: TChan (ChainSyncEvent (BlockInMode CardanoMode))
+
{ _channel :: !(TChan (ChainSyncEvent (BlockInMode CardanoMode)))
+
, _indexerCount :: !Int
initialCoordinator :: Int -> IO Coordinator
initialCoordinator indexerCount =
Coordinator <$> newBroadcastTChanIO
:: (Utxo.UtxoIndexer -> IO ()) -- ^ callback function used in the queryApi thread, needs to be non-blocking
-> Maybe TargetAddresses -- ^ Target addresses to filter for
-
-> Coordinator -> TChan (ChainSyncEvent (BlockInMode CardanoMode)) -> FilePath -> IO (IO (), MVar Utxo.UtxoIndexer)
+
-> TChan (ChainSyncEvent (BlockInMode CardanoMode))
+
-> IO (IO (), MVar Utxo.UtxoIndexer)
utxoWorker_ callback depth maybeTargetAddresses Coordinator{_barrier} ch path = do
ix <- Utxo.open path depth
RollForward (BlockInMode (Block (BlockHeader slotNo bh _) txs) _) _ -> do
-- TODO Redo. Inefficient filtering
-
fmap (\targetAddrs -> \addr -> addr `elem` targetAddrs)
AddressDatum.toAddressDatumIndexEvent addressFilter txs (C.ChainPoint slotNo bh)
-- * Epoch stakepool size indexer
+
epochStakepoolSizeWorker_
+
-> (Storable.State EpochSPDHandle -> IO ())
+
-> Word64 -- Security param
+
-> TChan (ChainSyncEvent (BlockInMode CardanoMode))
+
-> IO (IO b, MVar EpochSPDIndex)
+
epochStakepoolSizeWorker_
+
let ledgerStateDir = takeDirectory dbPath </> "ledgerStates"
+
createDirectoryIfMissing False ledgerStateDir
+
indexerMVar <- newMVar =<< EpochSPD.open dbPath ledgerStateDir securityParam
+
nodeConfigE <- runExceptT $ readNetworkConfig (NetworkConfigFile nodeConfigPath)
+
nodeConfig <- either (error . show) pure nodeConfigE
+
genesisConfigE <- runExceptT $ readCardanoGenesisConfig nodeConfig
+
genesisConfig <- either (error . show . renderGenesisConfigError) pure genesisConfigE
+
let initialLedgerState = O.ledgerState $ initExtLedgerStateVar genesisConfig
+
hfLedgerConfig = O.topLevelConfigLedger $ O.pInfoConfig (mkProtocolInfoCardano genesisConfig)
+
loop currentLedgerState maybeEpochNo = do
+
chainSyncEvent <- atomically $ readTChan ch
+
newLedgerState <- case chainSyncEvent of
+
RollForward [email protected](C.BlockInMode (C.Block (C.BlockHeader slotNo bh bn) _) _) chainTip -> do
+
-- Compute new LedgerState given block and old LedgerState
+
$ O.tickThenReapplyLedgerResult
+
(C.toConsensusBlock blockInMode)
+
let newEpochNo = EpochSPD.getEpochNo newLedgerState
+
-- If the block is rollbackable, we always store the LedgerState. If the block is
+
-- immutable, we only store it right before a new epoch.
+
-- let isLastEventOfEpoch = maybeEpochNo /= newEpochNo
+
let isLastEventOfEpoch = maybeEpochNo /= newEpochNo
+
EpochSPD.toStorableEvent
+
modifyMVar_ indexerMVar $ Storable.insert storableEvent
+
readMVar indexerMVar >>= onInsert -- refresh the query STM/CPS with new storage pointers/counters state
+
RollBackward C.ChainPointAtGenesis _ct -> do