Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions rust/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ Add the following to your `Cargo.toml`:

```toml
[dependencies]
chainlink-data-streams-report = "1.2.1"
chainlink-data-streams-sdk = { version = "1.2.1", features = ["full"] }
chainlink-data-streams-report = "1.2.2"
chainlink-data-streams-sdk = { version = "1.2.2", features = ["full"] }
```

#### Features
Expand Down Expand Up @@ -110,8 +110,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let api_key = "YOUR_API_KEY_GOES_HERE";
let user_secret = "YOUR_USER_SECRET_GOES_HERE";
let rest_url = "https://api.testnet-dataengine.chain.link";
let ws_url = "wss://ws.testnet-dataengine.chain.link";
let rest_url = "https://api.dataengine.chain.link";
let ws_url = "wss://ws.dataengine.chain.link";

let eth_usd_feed_id =
ID::from_hex_str("0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782")
Expand Down
2 changes: 1 addition & 1 deletion rust/crates/report/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "chainlink-data-streams-report"
version = "1.2.1"
version = "1.2.2"
edition = "2021"
description = "Chainlink Data Streams Report"
license = "MIT"
Expand Down
4 changes: 2 additions & 2 deletions rust/crates/sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "chainlink-data-streams-sdk"
version = "1.2.1"
version = "1.2.2"
edition = "2021"
rust-version = "1.70"
description = "Chainlink Data Streams client SDK"
Expand All @@ -11,7 +11,7 @@ exclude = ["/target/*", "examples/*", "tests/*", "docs/*", "book/*"]
keywords = ["chainlink"]

[dependencies]
chainlink-data-streams-report = { path = "../report", version = "1.2.1" }
chainlink-data-streams-report = { path = "../report", version = "1.2.2" }
reqwest = { version = "0.11.20", features = ["json", "rustls-tls"] }
tokio = { version = "1.29.1", features = ["full"] }
tokio-tungstenite = { version = "0.20.1", features = [
Expand Down
8 changes: 4 additions & 4 deletions rust/crates/sdk/examples/wss_multiple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let api_key = "YOUR_API_KEY_GOES_HERE";
let user_secret = "YOUR_USER_SECRET_GOES_HERE";
let rest_url = "https://api.testnet-dataengine.chain.link";
let ws_url = "wss://ws.testnet-dataengine.chain.link,wss://ws.testnet-dataengine.chain.link";
let rest_url = "https://api.dataengine.chain.link";
let ws_url = "wss://ws.dataengine.chain.link";

let eth_usd_feed_id =
ID::from_hex_str("0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782")
ID::from_hex_str("0x000362205e10b3a147d02792eccee483dca6c7b44ecce7012cb8c6e0b68b3ae9")
.unwrap();
let btc_usd_feed_id: ID =
ID::from_hex_str("0x00037da06d56d083fe599397a4769a042d63aa73dc4ef57709d31e9971a5b439")
ID::from_hex_str("0x00039d9e45394f473ab1f050a1b963e6b05351e52d71e507509ada0c95ed75b8")
.unwrap();

let feed_ids = vec![eth_usd_feed_id, btc_usd_feed_id];
Expand Down
6 changes: 3 additions & 3 deletions rust/crates/sdk/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ impl Config {
/// .build()?;
///
/// // If you want to customize the configuration further, use the builder pattern
/// let ws_urls_multiple = "wss://api.testnet-dataengine.chain.link/ws,wss://api.testnet-dataengine.chain.link/ws";
///
/// // In HA mode, provide a single WebSocket URL — origins are discovered automatically
/// // via a HEAD request to the server (X-Cll-Available-Origins header).
/// let config_custom = Config::new(
/// api_key.to_string(),
/// user_secret.to_string(),
/// rest_url.to_string(),
/// ws_urls_multiple.to_string(),
/// ws_url.to_string(),
/// )
/// .with_ws_ha(WebSocketHighAvailability::Enabled) // Enable WebSocket High Availability Mode
/// .with_ws_max_reconnect(10) // Set maximum reconnection attempts to 10, instead of the default 5.
Expand Down
25 changes: 19 additions & 6 deletions rust/crates/sdk/src/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ impl CtxKey {
}

/// HTTP Header constants using `HeaderName` with `OnceLock` for lazy initialization
#[allow(dead_code)] // Currently unused
static CLL_AVAIL_ORIGINS_HEADER: OnceLock<HeaderName> = OnceLock::new();
#[allow(dead_code)] // Currently unused
static CLL_ORIGIN_HEADER: OnceLock<HeaderName> = OnceLock::new();
#[allow(dead_code)] // Currently unused
#[allow(dead_code)]
static CLL_INT_HEADER: OnceLock<HeaderName> = OnceLock::new();
static AUTHZ_HEADER: OnceLock<HeaderName> = OnceLock::new();
static AUTHZ_TS_HEADER: OnceLock<HeaderName> = OnceLock::new();
Expand All @@ -33,7 +31,6 @@ static HOST_HEADER: OnceLock<HeaderName> = OnceLock::new();

/// Functions to retrieve header constants, initializing them on first access

#[allow(dead_code)] // Currently unused
/// "X-Cll-Available-Origins"
pub fn get_cll_avail_origins_header() -> &'static HeaderName {
CLL_AVAIL_ORIGINS_HEADER.get_or_init(|| {
Expand All @@ -42,16 +39,15 @@ pub fn get_cll_avail_origins_header() -> &'static HeaderName {
})
}

#[allow(dead_code)] // Currently unused
/// "X-Cll-Origin"
pub fn get_cll_origin_header() -> &'static HeaderName {
CLL_ORIGIN_HEADER.get_or_init(|| {
HeaderName::from_str("X-Cll-Origin").expect("Invalid header name: X-Cll-Origin")
})
}

#[allow(dead_code)] // Currently unused
/// "X-Cll-Eng-Int"
#[allow(dead_code)]
pub fn get_cll_int_header() -> &'static HeaderName {
CLL_INT_HEADER.get_or_init(|| {
HeaderName::from_str("X-Cll-Eng-Int").expect("Invalid header name: X-Cll-Eng-Int")
Expand Down Expand Up @@ -86,3 +82,20 @@ pub fn get_authz_sig_header() -> &'static HeaderName {
pub fn get_host_header() -> &'static HeaderName {
HOST_HEADER.get_or_init(|| HeaderName::from_str("Host").expect("Invalid header name: Host"))
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_cll_origin_header_name() {
let h = get_cll_origin_header();
assert_eq!(h.as_str(), "x-cll-origin");
}

#[test]
fn test_cll_avail_origins_header_name() {
let h = get_cll_avail_origins_header();
assert_eq!(h.as_str(), "x-cll-available-origins");
}
}
171 changes: 166 additions & 5 deletions rust/crates/sdk/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,30 @@ mod monitor_connection;
use establish_connection::connect;
use monitor_connection::run_stream;

use crate::config::Config;
use crate::auth::generate_auth_headers;
use crate::config::{Config, WebSocketHighAvailability};
use crate::endpoints::get_cll_avail_origins_header;

use chainlink_data_streams_report::feed_id::ID;
use chainlink_data_streams_report::report::Report;

use reqwest::Client as HttpClient;
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{SystemTime, UNIX_EPOCH},
};
use tokio::{
net::TcpStream,
sync::{broadcast, mpsc, Mutex},
time::{sleep, Duration},
};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream as TungsteniteWebSocketStream};
use tracing::{debug, error, info};
use tracing::{debug, error, info, warn};

pub const DEFAULT_WS_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
pub const MIN_WS_RECONNECT_INTERVAL: Duration = Duration::from_millis(1000);
Expand Down Expand Up @@ -70,7 +74,7 @@ struct Stats {
#[derive(Debug)]
pub enum WebSocketConnection {
Single(TungsteniteWebSocketStream<MaybeTlsStream<TcpStream>>),
Multiple(Vec<TungsteniteWebSocketStream<MaybeTlsStream<TcpStream>>>),
Multiple(Vec<(TungsteniteWebSocketStream<MaybeTlsStream<TcpStream>>, String)>),
}

/// Stream represents a realtime report stream.
Expand Down Expand Up @@ -141,7 +145,26 @@ impl Stream {
active_connections: AtomicUsize::new(0),
});

let conn = connect(config, &feed_ids, stats.clone()).await?;
let origins: Vec<String> = if config.ws_ha == WebSocketHighAvailability::Enabled {
match fetch_ha_origins(config).await {
Ok(o) if !o.is_empty() => {
info!("HA mode: discovered {} origins", o.len());
o
}
Ok(_) => {
warn!("HA mode: no origins returned from HEAD request, degrading to single connection");
vec![]
}
Err(e) => {
warn!("HA mode: origin discovery failed ({}), degrading to single connection", e);
vec![]
}
}
} else {
vec![]
};

let conn = connect(config, &origins, &feed_ids, stats.clone()).await?;

let water_mark = Arc::new(Mutex::new(HashMap::new()));

Expand Down Expand Up @@ -176,6 +199,7 @@ impl Stream {

tokio::spawn(run_stream(
stream,
String::new(), // no X-Cll-Origin header for non-HA connections
report_sender,
shutdown_receiver,
stats,
Expand All @@ -185,7 +209,7 @@ impl Stream {
));
}
WebSocketConnection::Multiple(streams) => {
for stream in streams {
for (stream, origin) in streams {
let report_sender = self.report_sender.clone();
let shutdown_receiver = self.shutdown_sender.subscribe();
let stats = self.stats.clone();
Expand All @@ -195,6 +219,7 @@ impl Stream {

tokio::spawn(run_stream(
stream,
origin,
report_sender,
shutdown_receiver,
stats,
Expand Down Expand Up @@ -284,3 +309,139 @@ pub struct StatsSnapshot {
/// Current number of active connections
pub active_connections: usize,
}

fn parse_origins_from_header(header_value: &str) -> Vec<String> {
let inner = header_value
.strip_prefix('{')
.and_then(|s| s.strip_suffix('}'))
.unwrap_or(header_value);
if inner.is_empty() {
return vec![];
}
inner
.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect()
}

fn convert_ws_to_http_scheme(ws_url: &str) -> String {
if let Some(rest) = ws_url.strip_prefix("wss://") {
format!("https://{}", rest)
} else if let Some(rest) = ws_url.strip_prefix("ws://") {
format!("http://{}", rest)
} else {
ws_url.to_string()
}
}

async fn fetch_ha_origins(config: &Config) -> Result<Vec<String>, StreamError> {
let http = HttpClient::builder()
.danger_accept_invalid_certs(config.insecure_skip_verify.to_bool())
.build()
.map_err(|e| StreamError::ConnectionError(e.to_string()))?;

// Parse URL, normalize path to "/", keep scheme+host+port so the HMAC-signed
// path "/" matches the actual request path even when ws_url carries a subpath.
let http_url = {
let mut u = reqwest::Url::parse(&convert_ws_to_http_scheme(&config.ws_url))
.map_err(|e| StreamError::ConnectionError(format!("Invalid ws_url: {}", e)))?;
u.set_path("/");
u.set_query(None);
u.to_string()
};

let request_timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("System time error")
.as_millis();

let auth_headers = generate_auth_headers(
"HEAD",
"/",
b"",
&config.api_key,
&config.api_secret,
request_timestamp,
)?;

let response = http
.head(&http_url)
.headers(auth_headers)
.send()
.await
.map_err(|e| StreamError::ConnectionError(format!("HA origin discovery request failed: {}", e)))?;

if !response.status().is_success() {
return Err(StreamError::ConnectionError(format!(
"HA origin discovery HEAD request returned status {}",
response.status()
)));
}

let header_value = response
.headers()
.get(get_cll_avail_origins_header())
.and_then(|v| v.to_str().ok())
.unwrap_or("")
.to_string();

Ok(parse_origins_from_header(&header_value))
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_parse_origins_from_header_empty() {
assert_eq!(parse_origins_from_header(""), Vec::<String>::new());
}

#[test]
fn test_parse_origins_from_header_with_braces() {
let result = parse_origins_from_header("{001,002}");
assert_eq!(result, vec!["001".to_string(), "002".to_string()]);
}

#[test]
fn test_parse_origins_from_header_without_braces() {
let result = parse_origins_from_header("001,002");
assert_eq!(result, vec!["001".to_string(), "002".to_string()]);
}

#[test]
fn test_parse_origins_from_header_single_origin() {
let result = parse_origins_from_header("{001}");
assert_eq!(result, vec!["001".to_string()]);
}

#[test]
fn test_parse_origins_from_header_empty_braces() {
assert_eq!(parse_origins_from_header("{}"), Vec::<String>::new());
}

#[test]
fn test_convert_ws_scheme_wss() {
assert_eq!(
convert_ws_to_http_scheme("wss://ws.dataengine.chain.link"),
"https://ws.dataengine.chain.link"
);
}

#[test]
fn test_convert_ws_scheme_ws() {
assert_eq!(
convert_ws_to_http_scheme("ws://127.0.0.1:8080"),
"http://127.0.0.1:8080"
);
}

#[test]
fn test_convert_ws_scheme_passthrough() {
assert_eq!(
convert_ws_to_http_scheme("https://already.https.com"),
"https://already.https.com"
);
}
}
Loading
Loading