Merge pull request #1240 from input-output-hk/damien/798/implement-anyhow-context-aggregator-runtime
Implement anyhow context in aggregator runtime
Implement anyhow context in aggregator runtime
[[package]]
name = "mithril-aggregator"
version = "0.3.95"
version = "0.3.96"
dependencies = [
"anyhow",
"async-trait",
[package]
name = "mithril-aggregator"
version = "0.3.95"
version = "0.3.96"
description = "A Mithril Aggregator server"
authors = { workspace = true }
edition = { workspace = true }
use anyhow::Context;
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use slog_scope::{debug, info, warn};
use std::{path::Path, path::PathBuf, sync::Arc};
.dependencies
.certifier_service
.get_open_message(signed_entity_type)
.await?
.await
.with_context(|| format!("CertifierService can not get open message for signed_entity_type: '{signed_entity_type}'"))?
{
Some(existing_open_message) if !existing_open_message.is_certified => {
info!(
// which will avoid this fix
let chain_beacon: Beacon = self.get_beacon_from_chain().await?;
self.update_beacon(&chain_beacon).await?;
let protocol_message = self.compute_protocol_message(signed_entity_type).await?;
let protocol_message = self.compute_protocol_message(signed_entity_type).await.with_context(|| format!("AggregatorRunner can not compute protocol message for signed_entity_type: '{signed_entity_type}'"))?;
Some(
self.create_open_message(signed_entity_type, &protocol_message)
.await?,
.await
.with_context(|| format!("AggregatorRunner can not create open message for signed_entity_type: '{signed_entity_type}'"))?,
)
}
};
debug!("RUNNER: get_current_non_certified_open_message");
let signed_entity_types = vec![
SignedEntityType::MithrilStakeDistribution(
self.dependencies.ticker_service.get_current_epoch().await?,
self.dependencies
.ticker_service
.get_current_epoch()
.await
.with_context(|| "DependencyContainer can not get current epoch")?,
),
SignedEntityType::CardanoImmutableFilesFull(
self.dependencies
.ticker_service
.get_current_immutable_beacon()
.await?,
.await
.with_context(|| "DependencyContainer can not get current immutable beacon")?,
),
];
for signed_entity_type in signed_entity_types {
if let Some(open_message) = self
.get_current_non_certified_open_message_for_signed_entity_type(&signed_entity_type)
.await?
.await
.with_context(|| format!("AggregatorRunner can not get current non certified open message for signed entity type: '{}'", &signed_entity_type))?
{
return Ok(Some(open_message));
}
debug!("RUNNER: update protocol parameters"; "beacon" => #?new_beacon);
let protocol_parameters = self.dependencies.config.protocol_parameters.clone();
self.update_beacon(new_beacon).await?;
self.update_beacon(new_beacon)
.await
.with_context(|| format!("AggregatorRunner can not update beacon: '{new_beacon}'"))?;
self.dependencies
.multi_signer
.write()
.await
.update_protocol_parameters(&protocol_parameters.into())
.update_protocol_parameters(&protocol_parameters.clone().into())
.await
.map_err(|e| e.into())
.map_err(|e| anyhow!(e))
.with_context(|| {
format!(
"Multi Signer can not update protocol parameters: '{:?}'",
&protocol_parameters
)
})
}
async fn compute_protocol_message(
) -> StdResult<()> {
debug!("RUNNER: saving pending certificate");
let signed_entity_type = pending_certificate.signed_entity_type.clone();
self.dependencies
.certificate_pending_store
.save(pending_certificate)
.await
.map_err(|e| e.into())
.map_err(|e| anyhow!(e))
.with_context(|| format!("CertificatePendingStore can not save pending certificate with signed_entity_type: '{signed_entity_type}'"))
}
async fn drop_pending_certificate(&self) -> StdResult<Option<CertificatePending>> {
.certifier_service
.create_certificate(signed_entity_type)
.await
.with_context(|| {
format!(
"CertifierService can not create certificate for signed_entity_type: '{signed_entity_type}'"
)
})
}
async fn create_artifact(
self.dependencies
.signed_entity_service
.create_artifact(signed_entity_type.to_owned(), certificate)
.await?;
.await
.with_context(|| {
format!(
"SignedEntityService can not create artifact for signed_entity_type: '{signed_entity_type}' with certificate hash: '{}'",
certificate.hash
)
})?;
Ok(())
}
.dependencies
.era_reader
.read_era_epoch_token(beacon.epoch)
.await?;
.await
.with_context(|| {
format!(
"EraReader can not get era epoch token for current epoch: '{}'",
beacon.epoch
)
})?;
let current_era = token.get_current_supported_era()?;
let current_era = token
.get_current_supported_era()
.with_context(|| "EraEpochToken can not get current supported era")?;
self.dependencies
.era_checker
.change_era(current_era, token.get_current_epoch());
runtime::{AggregatorRunnerTrait, RuntimeError},
};
use anyhow::Context;
use mithril_common::entities::Beacon;
use slog_scope::{crit, info, trace, warn};
use std::fmt::Display;
match self.state.clone() {
AggregatorState::Idle(state) => {
let chain_beacon = self.runner.get_beacon_from_chain().await?;
let chain_beacon =
self.runner.get_beacon_from_chain().await.with_context(|| {
"AggregatorRuntime in the state IDLE can not get current beacon from chain"
})?;
info!(
"→ new Beacon settings found, trying to transition to READY";
});
}
AggregatorState::Ready(state) => {
let chain_beacon: Beacon = self.runner.get_beacon_from_chain().await?;
let chain_beacon: Beacon =
self.runner.get_beacon_from_chain().await.with_context(|| {
"AggregatorRuntime in the state READY can not get current beacon from chain"
})?;
if chain_beacon
.compare_to_older(&state.current_beacon)
} else if let Some(open_message) = self
.runner
.get_current_non_certified_open_message()
.await?
.await.with_context(|| "AggregatorRuntime can not get the current non certified open message")?
{
// transition READY > SIGNING
info!("→ transitioning to SIGNING");
let new_state = self
.transition_from_ready_to_signing(chain_beacon, open_message)
.await?;
.transition_from_ready_to_signing(chain_beacon.clone(), open_message.clone())
.await.with_context(|| format!("AggregatorRuntime can not perform a transition from READY state to SIGNING with entity_type: '{:?}'", open_message.signed_entity_type))?;
self.state = AggregatorState::Signing(new_state);
} else {
// READY > READY
}
}
AggregatorState::Signing(state) => {
let chain_beacon: Beacon = self.runner.get_beacon_from_chain().await?;
let chain_beacon: Beacon =
self.runner.get_beacon_from_chain().await.with_context(|| {
"AggregatorRuntime in the state SIGNING can not get current beacon from chain"
})?;
let has_newer_open_message = if let Some(open_message_new) = self
.runner
.get_current_non_certified_open_message_for_signed_entity_type(
&state.open_message.signed_entity_type,
)
.await?
.await
.with_context(|| format!("AggregatorRuntime can not get the current non certified open message for signed entity type: '{}'", &state.open_message.signed_entity_type))?
{
open_message_new.signed_entity_type != state.open_message.signed_entity_type
} else {
Alonzo builds