Merge pull request #3027 from Zeegomo/refactor-grpc-client
Refactor grpc methods
Refactor grpc methods
use tracing::{span, Level};
use tracing_futures::Instrument;
use std::convert::identity;
use std::time::Duration;
const PROCESS_TIMEOUT_GET_BLOCK_TIP: u64 = 5;
handle_get_headers(storage, ids, handle),
);
}
ClientMsg::GetHeadersRange(checkpoints, to, handle) => {
ClientMsg::PullHeaders(from, to, handle) => {
let storage = task_data.storage.clone();
info.timeout_spawn_fallible(
"GetHeadersRange",
"PullHeaders",
Duration::from_secs(PROCESS_TIMEOUT_GET_HEADERS_RANGE),
handle_get_headers_range(storage, checkpoints, to, handle),
handle_get_headers_range(storage, from, to, handle),
);
}
ClientMsg::GetBlocks(ids, handle) => {
tip.header().clone()
}
async fn handle_get_headers_range(
fn get_block_from_storage(storage: &Storage, id: HeaderHash) -> Result<Block, Error> {
match storage.get(id) {
Ok(Some(block)) => Ok(block),
Ok(None) => Err(Error::not_found(format!(
"block {} is not known to this node",
id
))),
Err(e) => Err(e.into()),
}
}
// Stop after sending the first Err() variant
//
// Common base for GetBlocks and GetHeaders
async fn fuse_send_items<T, V>(
items: T,
reply_handle: ReplyStreamHandle<V>,
) -> Result<(), ReplySendError>
where
T: IntoIterator<Item = Result<V, Error>>,
{
let mut sink = reply_handle.start_sending();
for item in items.into_iter() {
let err = item.is_err();
sink.feed(item).await?;
if err {
break;
}
}
sink.close().await
}
// Send a range of blocks info directly from the storage to the stream.
// The starting point is determined by the closest ancestor of 'to'
// among the blocks specified in 'from'.
// The transformation function is applied to the block contents before
// sending it.
//
// Commong behavior for PullHeaders, PullBlocks, PullBlocksToTip
async fn send_range_from_storage<T, F>(
storage: Storage,
checkpoints: Vec<HeaderHash>,
from: Vec<HeaderHash>,
to: HeaderHash,
handle: ReplyStreamHandle<Header>,
) -> Result<(), ReplySendError> {
let res = storage.find_closest_ancestor(checkpoints, to);
match res {
Ok(maybe_ancestor) => {
let depth = maybe_ancestor.map(|ancestor| ancestor.distance);
storage
.send_branch_with(to, depth, handle, |block| block.header())
.await
}
f: F,
handle: ReplyStreamHandle<T>,
) -> Result<(), ReplySendError>
where
F: FnMut(Block) -> T,
F: Send + 'static,
T: Send + 'static,
{
let closest_ancestor = storage
.find_closest_ancestor(from, to)
.map_err(Into::into)
.and_then(move |maybe_ancestor| {
maybe_ancestor
.map(|ancestor| (to, ancestor.distance))
.ok_or_else(|| Error::not_found("Could not find a known block in `from`"))
});
match closest_ancestor {
Ok((to, depth)) => storage.send_branch_with(to, Some(depth), handle, f).await,
Err(e) => {
handle.reply_error(e.into());
handle.reply_error(e);
Ok(())
}
}
ids: Vec<HeaderHash>,
handle: ReplyStreamHandle<Block>,
) -> Result<(), ReplySendError> {
let mut sink = handle.start_sending();
for id in ids {
let res = match storage.get(id) {
Ok(Some(block)) => Ok(block),
Ok(None) => Err(Error::not_found(format!(
"block {} is not known to this node",
id
))),
Err(e) => Err(e.into()),
};
let is_err = res.is_err();
sink.send(res).await?;
if is_err {
break;
}
}
sink.close().await
fuse_send_items(
ids.into_iter()
.map(|id| get_block_from_storage(&storage, id)),
handle,
)
.await
}
async fn handle_get_headers(
storage: Storage,
ids: Vec<HeaderHash>,
handle: ReplyStreamHandle<Header>,
) -> Result<(), ReplySendError> {
let mut sink = handle.start_sending();
for id in ids {
let res = match storage.get(id) {
Ok(Some(block)) => Ok(block.header()),
Ok(None) => Err(Error::not_found(format!(
"block {} is not known to this node",
id
))),
Err(e) => Err(e.into()),
};
let is_err = res.is_err();
sink.send(res).await?;
if is_err {
break;
}
}
sink.close().await
fuse_send_items(
ids.into_iter()
.map(|id| get_block_from_storage(&storage, id).map(|block| block.header())),
handle,
)
.await
}
async fn handle_get_headers_range(
storage: Storage,
from: Vec<HeaderHash>,
to: HeaderHash,
handle: ReplyStreamHandle<Header>,
) -> Result<(), ReplySendError> {
send_range_from_storage(storage, from, to, |block| block.header(), handle).await
}
async fn handle_pull_blocks(
to: HeaderHash,
handle: ReplyStreamHandle<Block>,
) -> Result<(), ReplySendError> {
use crate::intercom::Error as IntercomError;
let res = storage
.find_closest_ancestor(from, to)
.map_err(Into::into)
.and_then(move |maybe_ancestor| {
maybe_ancestor
.map(|ancestor| (to, ancestor.distance))
.ok_or_else(|| IntercomError::not_found("`from` not found"))
});
match res {
Ok((to, depth)) => storage.send_branch(to, Some(depth), handle).await,
Err(e) => {
handle.reply_error(e);
Ok(())
}
}
send_range_from_storage(storage, from, to, identity, handle).await
}
pub enum ClientMsg {
GetBlockTip(ReplyHandle<Header>),
GetHeaders(Vec<HeaderHash>, ReplyStreamHandle<Header>),
GetHeadersRange(Vec<HeaderHash>, HeaderHash, ReplyStreamHandle<Header>),
PullHeaders(Vec<HeaderHash>, HeaderHash, ReplyStreamHandle<Header>),
GetBlocks(Vec<HeaderHash>, ReplyStreamHandle<Block>),
PullBlocks(Vec<HeaderHash>, HeaderHash, ReplyStreamHandle<Block>),
PullBlocksToTip(Vec<HeaderHash>, ReplyStreamHandle<Block>),
.field(ids)
.field(&format_args!("_"))
.finish(),
ClientMsg::GetHeadersRange(from, to, _) => f
.debug_tuple("GetHeadersRange")
ClientMsg::PullHeaders(from, to, _) => f
.debug_tuple("PullHeaders")
.field(from)
.field(to)
.field(&format_args!("_"))
let (reply_handle, future) = intercom::stream_reply(buffer_sizes::outbound::HEADERS);
let future = future.instrument(span.clone());
debug_assert!(self.incoming_solicitation.is_none());
self.incoming_solicitation = Some(ClientMsg::GetHeadersRange(from, to, reply_handle));
self.incoming_solicitation = Some(ClientMsg::PullHeaders(from, to, reply_handle));
let mut client = self.inner.clone();
self.global_state.spawn(
async move {
let (handle, future) = intercom::stream_reply(buffer_sizes::outbound::HEADERS);
let future = future.instrument(span.clone());
let client_box = self.channels.client_box.clone();
send_message(client_box, ClientMsg::GetHeadersRange(from, to, handle))
send_message(client_box, ClientMsg::PullHeaders(from, to, handle))
.instrument(span)
.await?;
let stream = future.await?;
let (_server, config) = fixture.bootstrap_node();
let client = Config::attach_to_local_node(config.get_p2p_listen_port()).client();
assert_eq!(
MockClientError::InvalidRequest(format!("not found (block not found)")),
MockClientError::InvalidRequest(format!(
"not found (Could not find a known block in `from`)"
)),
client
.pull_headers(&[], TestGen::hash().into())
.await
add triggers