Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 3 additions & 6 deletions hyperspace/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ frame-system = { git = "https://github.com/paritytech/substrate", branch = "polk
frame-support = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.39", default-features = false }
prost = { version = "0.11", default-features = false }
serde_json = "1.0.74"
tokio-stream = "0.1.14"

[dev-dependencies]
derive_more = "0.99.17"
Expand All @@ -79,10 +80,6 @@ sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "polkad
build-metadata-from-ws = []
#near = ["dep:near"]
cosmos = ["dep:cosmos"]
testing = [
"primitives/testing",
"parachain/testing",
"cosmos/testing",
]
testing = ["primitives/testing", "parachain/testing", "cosmos/testing"]
default = ["cosmos"]
composable-beefy = []
composable-beefy = []
40 changes: 35 additions & 5 deletions hyperspace/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use ibc::{events::IbcEvent, Height};
use ibc_proto::google::protobuf::Any;
use metrics::handler::MetricsHandler;
use primitives::{Chain, IbcProvider, UndeliveredType, UpdateType};
use std::collections::HashSet;
use std::{collections::HashSet, time::Duration};

#[derive(Copy, Debug, Clone)]
pub enum Mode {
Expand Down Expand Up @@ -63,16 +63,32 @@ where

// loop forever
loop {
let future_chain_a = tokio::time::timeout(Duration::from_secs(60), chain_a_finality.next());
let future_chain_b = tokio::time::timeout(Duration::from_secs(60), chain_b_finality.next());
tokio::select! {
// new finality event from chain A
result = chain_a_finality.next(), if !first_executed => {
result = future_chain_a, if !first_executed => {
first_executed = true;
process_finality_event(&mut chain_a, &mut chain_b, &mut chain_a_metrics, mode, result, &mut chain_a_finality, &mut chain_b_finality).await?;
match result {
Ok(result) => {
process_finality_event(&mut chain_a, &mut chain_b, &mut chain_a_metrics, mode, result, &mut chain_a_finality, &mut chain_b_finality).await?;
},
Err(_) => {
chain_a_finality = reconnect_stream(&mut chain_a).await;
}
}
}
// new finality event from chain B
result = chain_b_finality.next() => {
result = future_chain_b => {
first_executed = false;
process_finality_event(&mut chain_b, &mut chain_a, &mut chain_b_metrics, mode, result, &mut chain_b_finality, &mut chain_a_finality).await?;
match result {
Ok(result) => {
process_finality_event(&mut chain_b, &mut chain_a, &mut chain_b_metrics, mode, result, &mut chain_b_finality, &mut chain_a_finality).await?;
}
Err(_) => {
chain_b_finality = reconnect_stream(&mut chain_b).await;
}
}
}
else => {
first_executed = false;
Expand Down Expand Up @@ -141,6 +157,20 @@ where
Ok(())
}

async fn reconnect_stream<C: Chain>(source: &mut C) -> RecentStream<C::FinalityEvent> {
log::warn!("Stream closed for {}", source.name());
loop {
match source.finality_notifications().await {
Ok(stream) => return RecentStream::new(stream),
Err(e) => {
log::error!("Failed to get finality notifications for {} {:?}. Trying again in 30 seconds...", source.name(), e);
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
let _ = source.reconnect().await;
},
};
}
}

async fn process_finality_event<A: Chain, B: Chain>(
source: &mut A,
sink: &mut B,
Expand Down
2 changes: 1 addition & 1 deletion hyperspace/core/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ macro_rules! chains {
Wasm(WasmChain),
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum AnyFinalityEvent {
$(
$(#[$($meta)*])*
Expand Down
74 changes: 48 additions & 26 deletions hyperspace/core/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,47 +1,69 @@
use futures::{Stream, StreamExt};
use std::{
pin::Pin,
sync::{Arc, Mutex},
task::Poll,
};
use std::{fmt::Debug, pin::Pin, task::Poll};
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;

#[derive(Debug)]
/// Keeps the most recent value of a stream and acts as stream itself.
pub struct RecentStream<T: Send + 'static> {
value: Arc<Mutex<Option<Option<T>>>>,
pub struct RecentStream<T: Send + Sync + 'static> {
pub value: watch::Receiver<Option<T>>,
// Arc<Mutex<Option<Option<T>>>>,
}

impl<T: Send + 'static> RecentStream<T> {
impl<T: Send + Sync + 'static + Clone + Debug> RecentStream<T> {
pub fn new(mut stream: impl Stream<Item = T> + Send + Unpin + 'static) -> Self {
let value = Arc::new(Mutex::new(Some(None)));
let value_cloned = value.clone();
let (tx, rx) = watch::channel(None);
tokio::spawn(async move {
while let Some(v) = stream.next().await {
*value_cloned.lock().unwrap() = Some(Some(v));
while let Some(v) = dbg!(stream.next().await) {
tx.send(Some(v)).unwrap();
}
*value_cloned.lock().unwrap() = None;
});
Self { value }
Self { value: rx }
}
}

impl<T: Send> Stream for RecentStream<T> {
impl<T: Clone + 'static + Send + Sync + Debug> Stream for RecentStream<T> {
type Item = T;

fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let mut value = this.value.lock().unwrap();
match value.as_mut() {
Some(v) => match v.take() {
Some(v) => Poll::Ready(Some(v)),
None => {
cx.waker().wake_by_ref();
Poll::Pending
},
},
None => Poll::Ready(None),
if self.value.has_changed().is_err() {
return Poll::Ready(None)
}
let v = WatchStream::new(self.value.clone());
tokio::pin!(v);
match v.poll_next(cx) {
Poll::Ready(Some(None)) => Poll::Pending,
Poll::Pending => Poll::Pending,
Poll::Ready(Some(data)) => Poll::Ready(data),
Poll::Ready(None) => Poll::Ready(None),
}
}
}

mod tests {

use super::*;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn recent_stream_test() {
// ensure that we:
// - only read latest value, and reads it only once
// - upon stream ends, gets a None
let (tx, rx) = tokio::sync::mpsc::channel(4);
let receiver_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
let mut recent_stream = RecentStream::new(receiver_stream);

tx.send("booba".to_string()).await.unwrap();
tx.send("hello".to_string()).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
assert_eq!(recent_stream.next().await, Some("hello".to_string()));
tx.send("goodbye".to_string()).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
assert_eq!(recent_stream.next().await, Some("goodbye".to_string()));
drop(tx);
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
assert_eq!(recent_stream.next().await, None);
}
}
2 changes: 1 addition & 1 deletion hyperspace/parachain/src/finality_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub enum FinalityProtocol {
}

/// Finality event for parachains
#[derive(Decode, Encode, Debug)]
#[derive(Decode, Encode, Debug, Clone)]
pub enum FinalityEvent {
Grandpa(
grandpa_light_client_primitives::justification::GrandpaJustification<
Expand Down
2 changes: 1 addition & 1 deletion hyperspace/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ pub enum UndeliveredType {
#[async_trait::async_trait]
pub trait IbcProvider {
/// Finality event type, passed on to [`Chain::query_latest_ibc_events`]
type FinalityEvent: Debug + Send + 'static;
type FinalityEvent: Debug + Send + Sync + 'static + Clone;
/// A representation of the transaction id for the chain
type TransactionId: Debug;
/// Asset Id
Expand Down
24 changes: 22 additions & 2 deletions utils/subxt/codegen/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use anyhow::anyhow;
use codec::{Decode, Input};
use frame_metadata::RuntimeMetadataPrefixed;
use frame_metadata::{RuntimeMetadata, RuntimeMetadataPrefixed};
use jsonrpsee::{
async_client::ClientBuilder,
client_transport::ws::{Uri, WsTransportClientBuilder},
Expand All @@ -39,7 +39,27 @@ pub async fn fetch_metadata_ws(url: &str) -> anyhow::Result<Vec<u8>> {
}

pub fn codegen<I: Input>(encoded: &mut I) -> anyhow::Result<String> {
let metadata = <RuntimeMetadataPrefixed as Decode>::decode(encoded)?;
let mut metadata = <RuntimeMetadataPrefixed as Decode>::decode(encoded)?;
match &mut metadata.1 {
RuntimeMetadata::V14(inner_metadata) =>
inner_metadata.pallets = inner_metadata
.pallets
.drain(..)
.filter(|pallet| {
pallet.name == "Ibc".to_string() ||
pallet.name == "AssetsRegistry".to_string() ||
pallet.name == "Sudo".to_string() ||
pallet.name == "System".to_string() ||
pallet.name == "Timestamp".to_string() ||
pallet.name == "Paras".to_string() ||
pallet.name == "Grandpa".to_string() ||
pallet.name == "Beefy".to_string() ||
pallet.name == "MmrLeaf".to_string() ||
pallet.name == "Babe".to_string()
})
.collect(),
_ => panic!("what"),
};
let generator = subxt_codegen::RuntimeGenerator::new(metadata);
let item_mod = syn::parse_quote!(
pub mod api {}
Expand Down
Loading