diff --git a/.github/actions/build-docs/action.yml b/.github/actions/build-docs/action.yml deleted file mode 100644 index 01685b5..0000000 --- a/.github/actions/build-docs/action.yml +++ /dev/null @@ -1,9 +0,0 @@ -name: Build Documentation -description: 'Build Documentation.' - -runs: - using: composite - steps: - - name: Build Documentation - shell: bash - run: cargo doc --no-deps -p launchdarkly-server-sdk diff --git a/.github/actions/ci/action.yml b/.github/actions/ci/action.yml index 45648a7..b8db4ef 100644 --- a/.github/actions/ci/action.yml +++ b/.github/actions/ci/action.yml @@ -1,5 +1,15 @@ name: CI Workflow -description: 'Shared CI workflow.' +description: "Shared CI workflow." + +inputs: + cargo-flags: + description: "Flags to pass to cargo commands." + required: false + default: "" + cargo-test-flags: + description: "Flags to pass to cargo test commands." + required: false + default: "" runs: using: composite @@ -10,8 +20,8 @@ runs: - name: Run tests shell: bash - run: cargo test -p launchdarkly-server-sdk + run: cargo test ${{ inputs.cargo-flags }} ${{ inputs.cargo-test-flags }} -p launchdarkly-server-sdk - name: Run clippy checks shell: bash - run: cargo clippy -p launchdarkly-server-sdk -- -D warnings + run: cargo clippy ${{ inputs.cargo-flags }} -p launchdarkly-server-sdk -- -D warnings diff --git a/.github/actions/contract-tests/action.yml b/.github/actions/contract-tests/action.yml index a4f9438..4222eb4 100644 --- a/.github/actions/contract-tests/action.yml +++ b/.github/actions/contract-tests/action.yml @@ -1,11 +1,11 @@ name: Contract test runner -description: 'Reusable contract runner action' +description: "Reusable contract runner action" inputs: - tls_feature: - description: 'Which TLS feature do you want to enable?' + cargo-flags: + description: "Flags to pass to cargo commands." required: true token: - description: 'GH Token used for retrieving SDK test harness.' + description: "GH Token used for retrieving SDK test harness." required: true runs: @@ -13,11 +13,11 @@ runs: steps: - name: Build contract tests shell: bash - run: TLS_FEATURE="${{ inputs.tls_feature }}" make build-contract-tests + run: CARGO_FLAGS="${{ inputs.cargo-flags }}" make build-contract-tests - name: Start contract test service shell: bash - run: make start-contract-test-service-bg + run: CARGO_FLAGS="${{ inputs.cargo-flags }}" make start-contract-test-service-bg - uses: launchdarkly/gh-actions/actions/contract-tests@contract-tests-v1.0.2 with: diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ccac587..b7bb94e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -12,6 +12,34 @@ on: jobs: ci-build: runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + features: + - name: "default (aws-lc-rs for crypto)" + + - name: "no-features" + cargo-flags: "--no-default-features" + cargo-test-flags: "--lib" + skip_contract_tests: "true" + + - name: "hyper" + cargo-flags: "--no-default-features --features hyper" + cargo-test-flags: "--lib" + + - name: "hyper-rustls-native-roots" + cargo-flags: "--no-default-features --features hyper-rustls-native-roots" + + - name: "hyper-rustls-webpki-roots" + cargo-flags: "--no-default-features --features hyper-rustls-webpki-roots" + + - name: "native-tls" + cargo-flags: "--no-default-features --features native-tls" + + - name: "openssl crypto" + cargo-flags: "--no-default-features --features hyper-rustls-native-roots,crypto-openssl" + + name: CI (${{ matrix.features.name }}) steps: - uses: actions/checkout@v4 @@ -26,20 +54,33 @@ jobs: rustup component add rustfmt clippy - uses: ./.github/actions/ci + with: + cargo-flags: ${{ matrix.features.cargo-flags }} + cargo-test-flags: ${{ matrix.features.cargo-test-flags }} - - name: "Run contract tests with hyper_rustls" - uses: ./.github/actions/contract-tests + - uses: ./.github/actions/contract-tests + if: ${{ matrix.features.skip_contract_tests != 'true' }} with: - tls_feature: "rustls" + cargo-flags: ${{ matrix.features.cargo-flags }} token: ${{ secrets.GITHUB_TOKEN }} - - name: "Run contract tests with hyper_tls" - uses: ./.github/actions/contract-tests + build-docs: + runs-on: ubuntu-latest + name: Build Documentation (all features) + + steps: + - uses: actions/checkout@v4 with: - tls_feature: "tls" - token: ${{ secrets.GITHUB_TOKEN }} + fetch-depth: 0 + + - name: Setup rust tooling + run: rustup override set nightly + + - name: Install cargo-docs-rs + run: cargo install cargo-docs-rs - - uses: ./.github/actions/build-docs + - name: Build documentation + run: cargo docs-rs -p launchdarkly-server-sdk musl-build: runs-on: ubuntu-latest diff --git a/.github/workflows/manual-publish.yml b/.github/workflows/manual-publish.yml index b9d1e0b..76c5f30 100644 --- a/.github/workflows/manual-publish.yml +++ b/.github/workflows/manual-publish.yml @@ -27,7 +27,6 @@ jobs: rustup component add rustfmt clippy - uses: ./.github/actions/ci - - uses: ./.github/actions/build-docs - uses: launchdarkly/gh-actions/actions/release-secrets@release-secrets-v1.2.0 name: "Get crates.io token" diff --git a/Makefile b/Makefile index 3da13d4..4f069d0 100644 --- a/Makefile +++ b/Makefile @@ -1,15 +1,15 @@ TEMP_TEST_OUTPUT=/tmp/contract-test-service.log -TLS_FEATURE ?= rustls +CARGO_FLAGS ?= hyper-rustls-native-roots build-contract-tests: - cargo build -p contract-tests --release --no-default-features --features "$(TLS_FEATURE)" + cargo build -p contract-tests --release $(CARGO_FLAGS) start-contract-test-service: build-contract-tests @./target/release/contract-tests start-contract-test-service-bg: @echo "Test service output will be captured in $(TEMP_TEST_OUTPUT)" - @make start-contract-test-service >$(TEMP_TEST_OUTPUT) 2>&1 & + @$(MAKE) start-contract-test-service >$(TEMP_TEST_OUTPUT) 2>&1 & run-contract-tests: @curl -s https://raw.githubusercontent.com/launchdarkly/sdk-test-harness/main/downloader/run.sh \ diff --git a/contract-tests/Cargo.toml b/contract-tests/Cargo.toml index 803be90..06ebcde 100644 --- a/contract-tests/Cargo.toml +++ b/contract-tests/Cargo.toml @@ -9,18 +9,44 @@ license = "Apache-2.0" actix = "0.13.0" actix-web = "4.2.1" env_logger = "0.10.0" +eventsource-client = { version = "0.17.0" } +launchdarkly-sdk-transport = { version = "0.1.0" } log = "0.4.14" launchdarkly-server-sdk = { path = "../launchdarkly-server-sdk/", default-features = false, features = ["event-compression"]} serde = { version = "1.0.132", features = ["derive"] } serde_json = "1.0.73" futures = "0.3.12" -hyper = { version = "0.14.19", features = ["client"] } -hyper-rustls = { version = "0.24.1" , optional = true, features = ["http2"]} -hyper-tls = { version = "0.5.0", optional = true } +hyper-util = { version = "0.1", features = ["client-legacy", "http1", "http2", "tokio"] } +hyper-rustls = { version = "0.27", default-features = false, features = ["http1", "http2", "ring"], optional = true } +hyper-tls = { version = "0.6.0", optional = true } reqwest = { version = "0.12.4", features = ["default", "blocking", "json"] } async-mutex = "1.4.0" [features] -default = ["rustls"] -rustls = ["hyper-rustls/http1", "hyper-rustls/http2", "launchdarkly-server-sdk/rustls"] -tls = ["hyper-tls"] +default = ["hyper", "crypto-aws-lc-rs"] + +crypto-aws-lc-rs = ["launchdarkly-server-sdk/crypto-aws-lc-rs"] +crypto-openssl = ["launchdarkly-server-sdk/crypto-openssl"] + +hyper = [ + "launchdarkly-sdk-transport/hyper", + "eventsource-client/hyper" +] +hyper-rustls-native-roots = [ + "hyper", + "dep:hyper-rustls", + "launchdarkly-sdk-transport/hyper-rustls-native-roots", + "eventsource-client/hyper-rustls-native-roots" +] +hyper-rustls-webpki-roots = [ + "hyper", + "dep:hyper-rustls", + "launchdarkly-sdk-transport/hyper-rustls-webpki-roots", + "eventsource-client/hyper-rustls-webpki-roots" +] +native-tls = [ + "hyper", + "dep:hyper-tls", + "launchdarkly-sdk-transport/native-tls", + "eventsource-client/native-tls" +] diff --git a/contract-tests/src/client_entity.rs b/contract-tests/src/client_entity.rs index 3054b47..fdf5908 100644 --- a/contract-tests/src/client_entity.rs +++ b/contract-tests/src/client_entity.rs @@ -11,13 +11,15 @@ const DEFAULT_EVENTS_BASE_URL: &str = "https://events.launchdarkly.com"; use launchdarkly_server_sdk::{ ApplicationInfo, BuildError, Client, ConfigBuilder, Detail, EventProcessorBuilder, - FlagDetailConfig, FlagValue, NullEventProcessorBuilder, PollingDataSourceBuilder, + FlagDetailConfig, FlagFilter, FlagValue, NullEventProcessorBuilder, PollingDataSourceBuilder, ServiceEndpointsBuilder, StreamingDataSourceBuilder, }; +#[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))] +use crate::command_params::SecureModeHashResponse; use crate::command_params::{ ContextBuildParams, ContextConvertParams, ContextParam, ContextResponse, - MigrationOperationResponse, MigrationVariationResponse, SecureModeHashResponse, + MigrationOperationResponse, MigrationVariationResponse, }; use crate::HttpsConnector; use crate::{ @@ -37,6 +39,21 @@ impl ClientEntity { create_instance_params: CreateInstanceParams, connector: HttpsConnector, ) -> Result { + let proxy = create_instance_params + .configuration + .proxy + .unwrap_or_default() + .http_proxy + .unwrap_or_default(); + let mut transport_builder = launchdarkly_sdk_transport::HyperTransport::builder(); + if !proxy.is_empty() { + transport_builder = transport_builder.proxy_url(proxy.clone()); + } + + // Create fresh transports for this client to avoid shared connection pool issues + let transport = transport_builder + .build_with_connector(connector.clone()) + .map_err(|e| BuildError::InvalidConfig(e.to_string()))?; let mut config_builder = ConfigBuilder::new(&create_instance_params.configuration.credential); @@ -79,7 +96,7 @@ impl ClientEntity { if let Some(delay) = streaming.initial_retry_delay_ms { streaming_builder.initial_reconnect_delay(Duration::from_millis(delay)); } - streaming_builder.https_connector(connector.clone()); + streaming_builder.transport(transport.clone()); config_builder = config_builder.data_source(&streaming_builder); } else if let Some(polling) = create_instance_params.configuration.polling { @@ -91,15 +108,15 @@ impl ClientEntity { if let Some(delay) = polling.poll_interval_ms { polling_builder.poll_interval(Duration::from_millis(delay)); } - polling_builder.https_connector(connector.clone()); + polling_builder.transport(transport.clone()); config_builder = config_builder.data_source(&polling_builder); } else { // If we didn't specify streaming or polling, we fall back to basic streaming. The only - // customization we provide is the https connector to support testing multiple - // connectors. + // customization we provide is the transport to support testing multiple + // transport implementations. let mut streaming_builder = StreamingDataSourceBuilder::new(); - streaming_builder.https_connector(connector.clone()); + streaming_builder.transport(transport.clone()); config_builder = config_builder.data_source(&streaming_builder); } @@ -113,6 +130,7 @@ impl ClientEntity { processor_builder.capacity(capacity); } processor_builder.all_attributes_private(events.all_attributes_private); + processor_builder.compress_events(false); if let Some(e) = events.enable_gzip { processor_builder.compress_events(e); } @@ -124,7 +142,7 @@ impl ClientEntity { if let Some(attributes) = events.global_private_attributes { processor_builder.private_attributes(attributes); } - processor_builder.https_connector(connector.clone()); + processor_builder.transport(transport); processor_builder.omit_anonymous_contexts(events.omit_anonymous_contexts); config_builder.event_processor(&processor_builder) @@ -214,14 +232,17 @@ impl ClientEntity { ContextResponse::from(Self::context_convert(params)), ))) } + #[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))] "secureModeHash" => { let params = command .secure_mode_hash .ok_or("secureModeHash params should be set")?; + let hash = self + .client + .secure_mode_hash(¶ms.context) + .map_err(|e| e.to_string())?; Ok(Some(CommandResponse::SecureModeHash( - SecureModeHashResponse { - result: self.client.secure_mode_hash(¶ms.context), - }, + SecureModeHashResponse { result: hash }, ))) } "migrationVariation" => { @@ -551,7 +572,7 @@ impl ClientEntity { } if params.client_side_only { - config.client_side_only(); + config.flag_filter(FlagFilter::CLIENT); } if params.details_only_for_tracked_flags { diff --git a/contract-tests/src/command_params.rs b/contract-tests/src/command_params.rs index 5bbf495..2d19593 100644 --- a/contract-tests/src/command_params.rs +++ b/contract-tests/src/command_params.rs @@ -10,6 +10,7 @@ pub enum CommandResponse { EvaluateFlag(EvaluateFlagResponse), EvaluateAll(EvaluateAllFlagsResponse), ContextBuildOrConvert(ContextResponse), + #[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))] SecureModeHash(SecureModeHashResponse), MigrationVariation(MigrationVariationResponse), MigrationOperation(MigrationOperationResponse), @@ -25,6 +26,7 @@ pub struct CommandParams { pub identify_event: Option, pub context_build: Option, pub context_convert: Option, + #[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))] pub secure_mode_hash: Option, pub migration_variation: Option, pub migration_operation: Option, @@ -126,12 +128,14 @@ pub struct ContextConvertParams { pub input: String, } +#[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))] #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct SecureModeHashParams { pub context: Context, } +#[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))] #[derive(Serialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct SecureModeHashResponse { diff --git a/contract-tests/src/main.rs b/contract-tests/src/main.rs index 8c33b43..676b361 100644 --- a/contract-tests/src/main.rs +++ b/contract-tests/src/main.rs @@ -7,7 +7,6 @@ use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer, Responder, Resu use async_mutex::Mutex; use client_entity::ClientEntity; use futures::executor; -use hyper::client::HttpConnector; use launchdarkly_server_sdk::Reference; use serde::{self, Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; @@ -19,6 +18,12 @@ struct Status { capabilities: Vec, } +#[derive(Deserialize, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct ProxyParameters { + pub http_proxy: Option, +} + #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct StreamingParameters { @@ -73,6 +78,8 @@ pub struct Configuration { #[serde(default = "bool::default")] pub init_can_fail: bool, + pub proxy: Option, + pub streaming: Option, pub polling: Option, @@ -104,6 +111,8 @@ async fn status() -> impl Responder { "tags".to_string(), "service-endpoints".to_string(), "context-type".to_string(), + "http-proxy".to_string(), + #[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))] "secure-mode-hash".to_string(), "inline-context-all".to_string(), "anonymous-redaction".to_string(), @@ -208,41 +217,61 @@ struct AppState { https_connector: HttpsConnector, } -#[cfg(feature = "rustls")] -type HttpsConnector = hyper_rustls::HttpsConnector; -#[cfg(feature = "tls")] -type HttpsConnector = hyper_tls::HttpsConnector; +#[cfg(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots" +))] +type HttpsConnector = + hyper_rustls::HttpsConnector; + +#[cfg(feature = "native-tls")] +type HttpsConnector = hyper_tls::HttpsConnector; + +#[cfg(not(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" +)))] +type HttpsConnector = hyper_util::client::legacy::connect::HttpConnector; #[actix_web::main] + async fn main() -> std::io::Result<()> { env_logger::init(); - #[cfg(not(any(feature = "tls", feature = "rustls")))] - { - compile_error!("one of the { \"tls\", \"rustls\" } features must be enabled"); - } - #[cfg(all(feature = "tls", feature = "rustls"))] - { - compile_error!("only one of the { \"tls\", \"rustls\" } features can be enabled at a time"); - } - let (tx, rx) = mpsc::channel::<()>(); - #[cfg(feature = "rustls")] - let connector = hyper_rustls::HttpsConnectorBuilder::new() + #[cfg(feature = "hyper-rustls-native-roots")] + let https_connector = hyper_rustls::HttpsConnectorBuilder::new() .with_native_roots() + .expect("Failed to load native root certificates") .https_or_http() .enable_http1() .enable_http2() .build(); - #[cfg(feature = "tls")] - let connector = hyper_tls::HttpsConnector::new(); + #[cfg(feature = "hyper-rustls-webpki-roots")] + let https_connector = hyper_rustls::HttpsConnectorBuilder::new() + .with_webpki_roots() + .https_or_http() + .enable_http1() + .enable_http2() + .build(); + + #[cfg(feature = "native-tls")] + let https_connector = hyper_tls::HttpsConnector::new(); + + #[cfg(not(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" + )))] + let https_connector = hyper_util::client::legacy::connect::HttpConnector::new(); let state = web::Data::new(AppState { counter: Mutex::new(0), client_entities: Mutex::new(HashMap::new()), - https_connector: connector, + https_connector, }); let server = HttpServer::new(move || { diff --git a/launchdarkly-server-sdk/Cargo.toml b/launchdarkly-server-sdk/Cargo.toml index 1b5d6f3..143b606 100644 --- a/launchdarkly-server-sdk/Cargo.toml +++ b/launchdarkly-server-sdk/Cargo.toml @@ -14,13 +14,14 @@ exclude = [ ] [package.metadata.docs.rs] -features = ["event-compression"] +features = [] [dependencies] chrono = "0.4.19" crossbeam-channel = "0.5.1" data-encoding = "2.3.2" -eventsource-client = { version = "0.16.0", default-features = false } +eventsource-client = { version = "0.17.0" } +launchdarkly-sdk-transport = { version = "0.1.0" } futures = "0.3.12" log = "0.4.14" lru = { version = "0.16.3", default-features = false } @@ -33,11 +34,13 @@ parking_lot = "0.12.0" tokio-stream = { version = "0.1.8", features = ["sync"] } moka = { version = "0.12.1", features = ["sync"] } uuid = {version = "1.2.2", features = ["v4"] } -hyper = { version = "0.14.19", features = ["client", "http1", "http2", "tcp"] } -hyper-rustls = { version = "0.24.1" , optional = true} +http = "1.0" +bytes = "1.11" +bitflags = "2.4" rand = "0.9" flate2 = { version = "1.0.35", optional = true } -aws-lc-rs = "1.14.1" +aws-lc-rs = { version = "1.14.1", optional = true } +openssl = { version = "0.10.75", optional = true } [dev-dependencies] maplit = "1.0.1" @@ -52,14 +55,41 @@ reqwest = { version = "0.12.4", features = ["json"] } testing_logger = "0.1.1" [features] -default = ["rustls"] -rustls = ["hyper-rustls/http1", "hyper-rustls/http2", "eventsource-client/rustls"] +default = [ + "hyper-rustls-native-roots", + "crypto-aws-lc-rs", + "event-compression" +] + +crypto-aws-lc-rs = ["dep:aws-lc-rs"] +crypto-openssl = ["dep:openssl"] + +hyper = [ + "launchdarkly-sdk-transport/hyper", + "eventsource-client/hyper" +] +hyper-rustls-native-roots = [ + "hyper", + "launchdarkly-sdk-transport/hyper-rustls-native-roots", + "eventsource-client/hyper-rustls-native-roots" +] +hyper-rustls-webpki-roots = [ + "hyper", + "launchdarkly-sdk-transport/hyper-rustls-webpki-roots", + "eventsource-client/hyper-rustls-webpki-roots" +] +native-tls = [ + "hyper", + "launchdarkly-sdk-transport/native-tls", + "eventsource-client/native-tls" +] + event-compression = ["flate2"] [[example]] name = "print_flags" -required-features = ["rustls"] +required-features = ["hyper-rustls-native-roots"] [[example]] name = "progress" -required-features = ["rustls"] +required-features = ["hyper-rustls-native-roots"] diff --git a/launchdarkly-server-sdk/examples/custom_transport.rs b/launchdarkly-server-sdk/examples/custom_transport.rs new file mode 100644 index 0000000..9867abf --- /dev/null +++ b/launchdarkly-server-sdk/examples/custom_transport.rs @@ -0,0 +1,120 @@ +use bytes::Bytes; +use http::Request; +use launchdarkly_sdk_transport::{HttpTransport, ResponseFuture}; +use launchdarkly_server_sdk::{ConfigBuilder, EventProcessorBuilder}; +use std::time::Instant; + +/// Example of a custom transport that wraps another transport and adds logging. +/// +/// This demonstrates how to implement the HttpTransport trait to add middleware +/// functionality like logging, metrics, retries, circuit breakers, etc. +#[derive(Clone)] +struct LoggingTransport { + inner: T, +} + +impl LoggingTransport { + fn new(inner: T) -> Self { + Self { inner } + } +} + +impl HttpTransport for LoggingTransport { + fn request(&self, request: Request>) -> ResponseFuture { + let method = request.method().clone(); + let uri = request.uri().clone(); + let start = Instant::now(); + + println!("[REQUEST] {method} {uri}"); + + let inner = self.inner.clone(); + Box::pin(async move { + let result = inner.request(request).await; + let elapsed = start.elapsed(); + + match &result { + Ok(response) => { + println!( + "[RESPONSE] {} {} - Status: {} - Duration: {:?}", + method, + uri, + response.status(), + elapsed + ); + } + Err(e) => { + println!("[ERROR] {method} {uri} - Error: {e} - Duration: {elapsed:?}"); + } + } + + result + }) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Get SDK key from environment + let sdk_key = + std::env::var("LAUNCHDARKLY_SDK_KEY").unwrap_or_else(|_| "your-sdk-key".to_string()); + + if sdk_key == "your-sdk-key" { + eprintln!("Please set LAUNCHDARKLY_SDK_KEY environment variable"); + std::process::exit(1); + } + + // Create the base HTTPS transport + let base_transport = launchdarkly_sdk_transport::HyperTransport::new_https()?; + + // Wrap it with logging middleware + let logging_transport = LoggingTransport::new(base_transport); + + // Configure the SDK to use the custom transport + let config = ConfigBuilder::new(&sdk_key) + .event_processor( + EventProcessorBuilder::new() + .transport(logging_transport.clone()) + .flush_interval(std::time::Duration::from_secs(5)), + ) + .build()?; + + // Create the client - you'll see all HTTP requests logged + println!("Initializing LaunchDarkly client with logging transport..."); + let client = launchdarkly_server_sdk::Client::build(config)?; + client.start_with_default_executor(); + + // Wait for initialization + println!("Waiting for client initialization..."); + match client + .wait_for_initialization(std::time::Duration::from_secs(10)) + .await + { + Some(true) => { + println!("Client initialized successfully!"); + + // Evaluate a flag (will trigger HTTP events) + let context = launchdarkly_server_sdk::ContextBuilder::new("example-user-key") + .build() + .expect("Failed to create context"); + + let flag_value = client.bool_variation(&context, "example-flag", false); + println!("Flag 'example-flag' evaluated to: {flag_value}"); + + // Wait a bit to see event flushing + println!("Waiting to observe event flushing..."); + tokio::time::sleep(std::time::Duration::from_secs(6)).await; + } + Some(false) => { + eprintln!("Client failed to initialize"); + } + None => { + eprintln!("Client initialization timed out"); + } + } + + // Shutdown the client + println!("Shutting down client..."); + client.close(); + + Ok(()) +} diff --git a/launchdarkly-server-sdk/src/client.rs b/launchdarkly-server-sdk/src/client.rs index 4a1a4fb..c9b5a3d 100644 --- a/launchdarkly-server-sdk/src/client.rs +++ b/launchdarkly-server-sdk/src/client.rs @@ -156,6 +156,10 @@ pub struct Client { started: AtomicBool, offline: bool, daemon_mode: bool, + #[cfg_attr( + not(any(feature = "crypto-openssl", feature = "crypto-aws-lc-rs")), + allow(dead_code) + )] sdk_key: String, shutdown_broadcast: broadcast::Sender<()>, runtime: RwLock>, @@ -273,16 +277,6 @@ impl Client { Ok(true) } - /// This is an async method that will resolve once initialization is complete. - /// Initialization being complete does not mean that initialization was a success. - /// The return value from the method indicates if the client successfully initialized. - #[deprecated( - note = "blocking without a timeout is discouraged, use wait_for_initialization instead" - )] - pub async fn initialized_async(&self) -> bool { - self.initialized_async_internal().await - } - /// This is an async method that will resolve once initialization is complete or the specified /// timeout has occurred. /// @@ -352,6 +346,60 @@ impl Client { self.event_processor.flush(); } + /// Flush tells the client that all pending analytics events should be delivered as + /// soon as possible, and blocks until delivery is complete or the timeout expires. + /// + /// This method is particularly useful in short-lived execution environments like AWS Lambda + /// where you need to ensure events are sent before the function terminates. + /// + /// This method triggers a flush of events currently buffered and waits for that specific + /// flush to complete. Note that if periodic flushes or other flush operations are in-flight + /// when this is called, those may still be completing after this method returns. + /// + /// # Arguments + /// + /// * `timeout` - Maximum time to wait for flush to complete. Use `Duration::ZERO` to wait indefinitely. + /// + /// # Returns + /// + /// Returns `true` if flush completed successfully, `false` if timeout occurred. + /// + /// # Examples + /// + /// ```no_run + /// # use launchdarkly_server_sdk::{Client, ConfigBuilder}; + /// # use std::time::Duration; + /// # async fn example() { + /// # let client = Client::build(ConfigBuilder::new("sdk-key").build().unwrap()).unwrap(); + /// // Wait up to 5 seconds for flush to complete + /// let success = client.flush_blocking(Duration::from_secs(5)).await; + /// if !success { + /// eprintln!("Warning: flush timed out"); + /// } + /// # } + /// ``` + /// + /// For more information, see the Reference Guide: + /// . + pub async fn flush_blocking(&self, timeout: Duration) -> bool { + let event_processor = self.event_processor.clone(); + + let flush_future = + tokio::task::spawn_blocking(move || event_processor.flush_blocking(timeout)); + + if timeout == Duration::ZERO { + // Wait indefinitely + flush_future.await.unwrap_or(false) + } else { + // Apply timeout at async level too + match tokio::time::timeout(timeout, flush_future).await { + Ok(Ok(result)) => result, + Ok(Err(_)) => false, // spawn_blocking panicked + Err(_) => false, // Timeout + } + } + } + /// Identify reports details about a context. /// /// For more information, see the Reference Guide: @@ -541,15 +589,39 @@ impl Client { .try_map(|val| val.as_json(), default, eval::Error::WrongType) } + #[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))] /// Generates the secure mode hash value for a context. /// /// For more information, see the Reference Guide: /// . - pub fn secure_mode_hash(&self, context: &Context) -> String { - let key = aws_lc_rs::hmac::Key::new(aws_lc_rs::hmac::HMAC_SHA256, self.sdk_key.as_bytes()); - let tag = aws_lc_rs::hmac::sign(&key, context.canonical_key().as_bytes()); - - data_encoding::HEXLOWER.encode(tag.as_ref()) + pub fn secure_mode_hash(&self, context: &Context) -> Result { + #[cfg(feature = "crypto-aws-lc-rs")] + { + let key = + aws_lc_rs::hmac::Key::new(aws_lc_rs::hmac::HMAC_SHA256, self.sdk_key.as_bytes()); + let tag = aws_lc_rs::hmac::sign(&key, context.canonical_key().as_bytes()); + + Ok(data_encoding::HEXLOWER.encode(tag.as_ref())) + } + #[cfg(feature = "crypto-openssl")] + { + use openssl::hash::MessageDigest; + use openssl::pkey::PKey; + use openssl::sign::Signer; + + let key = PKey::hmac(self.sdk_key.as_bytes()) + .map_err(|e| format!("Failed to create HMAC key: {e}"))?; + let mut signer = Signer::new(MessageDigest::sha256(), &key) + .map_err(|e| format!("Failed to create signer: {e}"))?; + signer + .update(context.canonical_key().as_bytes()) + .map_err(|e| format!("Failed to update signer: {e}"))?; + let hmac = signer + .sign_to_vec() + .map_err(|e| format!("Failed to sign: {e}"))?; + + Ok(data_encoding::HEXLOWER.encode(&hmac)) + } } /// Returns an object that encapsulates the state of all feature flags for a given context. This @@ -828,7 +900,6 @@ mod tests { use crossbeam_channel::Receiver; use eval::{ContextBuilder, MultiContextBuilder}; use futures::FutureExt; - use hyper::client::HttpConnector; use launchdarkly_server_sdk_evaluation::{Flag, Reason, Segment}; use maplit::hashmap; use std::collections::HashMap; @@ -836,6 +907,7 @@ mod tests { use crate::data_source::MockDataSource; use crate::data_source_builders::MockDataSourceBuilder; + use crate::evaluation::FlagFilter; use crate::events::create_event_sender; use crate::events::event::{OutputEvent, VariationKey}; use crate::events::processor_builders::EventProcessorBuilder; @@ -861,19 +933,6 @@ mod tests { is_send_and_sync::() } - #[tokio::test] - async fn client_asynchronously_initializes() { - let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false); - client.start_with_default_executor(); - - let now = Instant::now(); - let initialized = client.initialized_async().await; - let elapsed_time = now.elapsed(); - assert!(initialized); - // Give ourself a good margin for thread scheduling. - assert!(elapsed_time.as_millis() > 500) - } - #[tokio::test] async fn client_asynchronously_initializes_within_timeout() { let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false); @@ -1075,6 +1134,7 @@ mod tests { "toplevel", &["prereq1", "prereq2"], false, + false, ))), ) .expect("patch should apply"); @@ -1123,7 +1183,7 @@ mod tests { .upsert( "prereq1", PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility( - "prereq1", false, + "prereq1", false, false, ))), ) .expect("patch should apply"); @@ -1133,7 +1193,7 @@ mod tests { .upsert( "prereq2", PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility( - "prereq2", false, + "prereq2", false, false, ))), ) .expect("patch should apply"); @@ -1147,6 +1207,7 @@ mod tests { "toplevel", &["prereq1", "prereq2"], true, + false, ))), ) .expect("patch should apply"); @@ -1156,7 +1217,7 @@ mod tests { .expect("Failed to create context"); let mut config = FlagDetailConfig::new(); - config.client_side_only(); + config.flag_filter(FlagFilter::CLIENT); let all_flags = client.all_flags_detail(&context, config); @@ -1408,7 +1469,7 @@ mod tests { impl PersistentDataStoreFactory for InMemoryPersistentDataStoreFactory { fn create_persistent_data_store( &self, - ) -> Result, std::io::Error> { + ) -> Result, std::io::Error> { let serialized_data = AllData::::try_from(self.data.clone())?; Ok(Box::new(InMemoryPersistentDataStore { @@ -1797,6 +1858,7 @@ mod tests { } #[test] + #[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))] fn secure_mode_hash() { let config = ConfigBuilder::new("secret") .offline(true) @@ -1808,12 +1870,15 @@ mod tests { .expect("Failed to create context"); assert_eq!( - client.secure_mode_hash(&context), + client + .secure_mode_hash(&context) + .expect("Hash should be computed"), "aa747c502a898200f9e4fa21bac68136f886a0e27aec70ba06daf2e2a5cb5597" ); } #[test] + #[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))] fn secure_mode_hash_with_multi_kind() { let config = ConfigBuilder::new("secret") .offline(true) @@ -1836,7 +1901,9 @@ mod tests { .expect("failed to build multi-context"); assert_eq!( - client.secure_mode_hash(&context), + client + .secure_mode_hash(&context) + .expect("Hash should be computed"), "5687e6383b920582ed50c2a96c98a115f1b6aad85a60579d761d9b8797415163" ); } @@ -2587,6 +2654,95 @@ mod tests { } } + #[tokio::test] + async fn client_flush_blocking_completes_successfully() { + let (client, event_rx) = make_mocked_client(); + client.start_with_default_executor(); + client.wait_for_initialization(Duration::from_secs(1)).await; + + let context = ContextBuilder::new("user-key") + .build() + .expect("Failed to create context"); + + client.identify(context); + + let result = client.flush_blocking(Duration::from_secs(5)).await; + assert!(result, "flush_blocking should complete successfully"); + + client.close(); + + let events = event_rx.iter().collect::>(); + assert!(!events.is_empty(), "Should have received identify event"); + } + + #[tokio::test] + async fn client_flush_blocking_with_zero_timeout() { + let (client, event_rx) = make_mocked_client(); + client.start_with_default_executor(); + client.wait_for_initialization(Duration::from_secs(1)).await; + + let context = ContextBuilder::new("user-key") + .build() + .expect("Failed to create context"); + + client.identify(context); + + let result = client.flush_blocking(Duration::ZERO).await; + assert!( + result, + "flush_blocking with zero timeout should complete successfully" + ); + + client.close(); + + let events = event_rx.iter().collect::>(); + assert!(!events.is_empty(), "Should have received identify event"); + } + + #[tokio::test] + async fn client_flush_blocking_with_no_events() { + let (client, _event_rx) = make_mocked_client(); + client.start_with_default_executor(); + client.wait_for_initialization(Duration::from_secs(1)).await; + + let result = client.flush_blocking(Duration::from_secs(1)).await; + assert!( + result, + "flush_blocking with no events should complete immediately" + ); + + client.close(); + } + + #[tokio::test] + async fn client_flush_blocking_multiple_concurrent_calls() { + let (client, event_rx) = make_mocked_client(); + client.start_with_default_executor(); + client.wait_for_initialization(Duration::from_secs(1)).await; + + let context = ContextBuilder::new("user-key") + .build() + .expect("Failed to create context"); + + client.identify(context); + + // Make multiple concurrent flush_blocking calls + let (result1, result2, result3) = tokio::join!( + client.flush_blocking(Duration::from_secs(5)), + client.flush_blocking(Duration::from_secs(5)), + client.flush_blocking(Duration::from_secs(5)), + ); + + assert!(result1, "First flush_blocking should succeed"); + assert!(result2, "Second flush_blocking should succeed"); + assert!(result3, "Third flush_blocking should succeed"); + + client.close(); + + let events = event_rx.iter().collect::>(); + assert!(!events.is_empty(), "Should have received identify event"); + } + fn make_mocked_client_with_delay( delay: u64, offline: bool, @@ -2600,7 +2756,8 @@ mod tests { .daemon_mode(daemon_mode) .data_source(MockDataSourceBuilder::new().data_source(updates)) .event_processor( - EventProcessorBuilder::::new().event_sender(Arc::new(event_sender)), + EventProcessorBuilder::::new() + .event_sender(Arc::new(event_sender)), ) .build() .expect("config should build"); @@ -2617,4 +2774,21 @@ mod tests { fn make_mocked_client() -> (Client, Receiver) { make_mocked_client_with_delay(0, false, false) } + + #[test] + fn client_builds_successfully() { + let config = ConfigBuilder::new("sdk-key") + .offline(true) + .build() + .expect("config should build"); + + let client = Client::build(config).expect("client should build successfully"); + + assert!( + !client.started.load(Ordering::SeqCst), + "client should not be started yet" + ); + assert!(client.offline, "client should be in offline mode"); + assert_eq!(client.sdk_key, "sdk-key", "sdk_key should match"); + } } diff --git a/launchdarkly-server-sdk/src/config.rs b/launchdarkly-server-sdk/src/config.rs index a43e2a1..5f5b6f3 100644 --- a/launchdarkly-server-sdk/src/config.rs +++ b/launchdarkly-server-sdk/src/config.rs @@ -1,11 +1,23 @@ use thiserror::Error; use crate::data_source_builders::{DataSourceFactory, NullDataSourceBuilder}; -use crate::events::processor_builders::{ - EventProcessorBuilder, EventProcessorFactory, NullEventProcessorBuilder, -}; + +#[cfg(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" +))] +use crate::events::processor_builders::EventProcessorBuilder; +use crate::events::processor_builders::{EventProcessorFactory, NullEventProcessorBuilder}; + use crate::stores::store_builders::{DataStoreFactory, InMemoryDataStoreBuilder}; -use crate::{ServiceEndpointsBuilder, StreamingDataSourceBuilder}; +use crate::ServiceEndpointsBuilder; +#[cfg(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" +))] +use crate::StreamingDataSourceBuilder; use std::borrow::Borrow; @@ -300,13 +312,30 @@ impl ConfigBuilder { Ok(Box::new(NullDataSourceBuilder::new())) } Some(builder) => Ok(builder), - #[cfg(feature = "rustls")] - None => Ok(Box::new(StreamingDataSourceBuilder::< - hyper_rustls::HttpsConnector, - >::new())), - #[cfg(not(feature = "rustls"))] + #[cfg(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" + ))] + None => { + let transport = launchdarkly_sdk_transport::HyperTransport::new_https() + .map_err(|e| { + BuildError::InvalidConfig(format!( + "failed to create default transport: {}", + e + )) + })?; + let mut builder = StreamingDataSourceBuilder::new(); + builder.transport(transport); + Ok(Box::new(builder)) + } + #[cfg(not(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" + )))] None => Err(BuildError::InvalidConfig( - "data source builder required when rustls is disabled".into(), + "data source builder required when hyper-rustls-native-roots, hyper-rustls-webpki-roots, or native-tls features are disabled".into(), )), }; let data_source_builder = data_source_builder_result?; @@ -319,13 +348,30 @@ impl ConfigBuilder { Ok(Box::new(NullEventProcessorBuilder::new())) } Some(builder) => Ok(builder), - #[cfg(feature = "rustls")] - None => Ok(Box::new(EventProcessorBuilder::< - hyper_rustls::HttpsConnector, - >::new())), - #[cfg(not(feature = "rustls"))] + #[cfg(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" + ))] + None => { + let transport = launchdarkly_sdk_transport::HyperTransport::new_https() + .map_err(|e| { + BuildError::InvalidConfig(format!( + "failed to create default transport: {}", + e + )) + })?; + let mut builder = EventProcessorBuilder::new(); + builder.transport(transport); + Ok(Box::new(builder)) + } + #[cfg(not(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" + )))] None => Err(BuildError::InvalidConfig( - "event processor factory required when rustls is disabled".into(), + "event processor factory required when hyper-rustls-native-roots, hyper-rustls-webpki-roots, or native-tls features are disabled".into(), )), }; let event_processor_builder = event_processor_builder_result?; @@ -373,6 +419,11 @@ mod tests { } #[test] + #[cfg(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" + ))] fn unconfigured_config_builder_handles_application_tags_correctly() { let builder = ConfigBuilder::new("sdk-key"); let config = builder.build().expect("config should build"); @@ -384,6 +435,11 @@ mod tests { #[test_case("Invalid id", "version", Some("application-version/version".to_string()))] #[test_case("id", "Invalid version", Some("application-id/id".to_string()))] #[test_case("Invalid id", "Invalid version", None)] + #[cfg(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" + ))] fn config_builder_handles_application_tags_appropriately( id: impl Into, version: impl Into, diff --git a/launchdarkly-server-sdk/src/data_source.rs b/launchdarkly-server-sdk/src/data_source.rs index 665f86e..47b5c88 100644 --- a/launchdarkly-server-sdk/src/data_source.rs +++ b/launchdarkly-server-sdk/src/data_source.rs @@ -7,15 +7,11 @@ use crate::LAUNCHDARKLY_TAGS_HEADER; use es::{Client, ClientBuilder, ReconnectOptionsBuilder}; use eventsource_client as es; use futures::StreamExt; -use hyper::client::connect::Connection; -use hyper::service::Service; -use hyper::Uri; use launchdarkly_server_sdk_evaluation::{Flag, Segment}; use parking_lot::RwLock; use serde::Deserialize; use std::sync::{Arc, Mutex, Once}; use std::time::Duration; -use tokio::io::{AsyncRead, AsyncWrite}; use tokio::sync::broadcast; use tokio::time; use tokio_stream::wrappers::{BroadcastStream, IntervalStream}; @@ -74,19 +70,13 @@ pub struct StreamingDataSource { impl StreamingDataSource { #[allow(clippy::result_large_err)] - pub fn new( + pub fn new( base_url: &str, sdk_key: &str, initial_reconnect_delay: Duration, tags: &Option, - connector: C, - ) -> std::result::Result - where - C: Service + Clone + Send + Sync + 'static, - C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin, - C::Future: Send + 'static, - C::Error: Into>, - { + transport: T, + ) -> std::result::Result { let stream_url = format!("{base_url}/all"); let client_builder = ClientBuilder::for_url(&stream_url)?; @@ -106,7 +96,7 @@ impl StreamingDataSource { } Ok(Self { - es_client: Box::new(client_builder.build_with_conn(connector)), + es_client: Box::new(client_builder.build_with_transport(transport)), }) } } @@ -377,14 +367,13 @@ mod tests { time::Duration, }; - use hyper::client::HttpConnector; use mockito::Matcher; use parking_lot::RwLock; use test_case::test_case; use tokio::sync::broadcast; use super::{DataSource, PollingDataSource, StreamingDataSource}; - use crate::feature_requester_builders::HyperFeatureRequesterBuilder; + use crate::feature_requester_builders::HttpFeatureRequesterBuilder; use crate::{stores::store::InMemoryDataStore, LAUNCHDARKLY_TAGS_HEADER}; #[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")] @@ -412,7 +401,8 @@ mod tests { "sdk-key", Duration::from_secs(0), &tag, - HttpConnector::new(), + launchdarkly_sdk_transport::HyperTransport::new() + .expect("Failed to create streaming data source"), ) .unwrap(); @@ -463,8 +453,9 @@ mod tests { let (shutdown_tx, _) = broadcast::channel::<()>(1); let initialized = Arc::new(AtomicBool::new(false)); - let hyper_builder = - HyperFeatureRequesterBuilder::new(&server.url(), "sdk-key", HttpConnector::new()); + let transport = launchdarkly_sdk_transport::HyperTransport::new() + .expect("Failed to create transport for polling data source"); + let hyper_builder = HttpFeatureRequesterBuilder::new(&server.url(), "sdk-key", transport); let polling = PollingDataSource::new( Arc::new(Mutex::new(Box::new(hyper_builder))), diff --git a/launchdarkly-server-sdk/src/data_source_builders.rs b/launchdarkly-server-sdk/src/data_source_builders.rs index 4789078..d0a56e3 100644 --- a/launchdarkly-server-sdk/src/data_source_builders.rs +++ b/launchdarkly-server-sdk/src/data_source_builders.rs @@ -1,13 +1,10 @@ use super::service_endpoints; use crate::data_source::{DataSource, NullDataSource, PollingDataSource, StreamingDataSource}; -use crate::feature_requester_builders::{FeatureRequesterFactory, HyperFeatureRequesterBuilder}; -use hyper::{client::connect::Connection, service::Service, Uri}; -#[cfg(feature = "rustls")] -use hyper_rustls::HttpsConnectorBuilder; +use crate::feature_requester_builders::{FeatureRequesterFactory, HttpFeatureRequesterBuilder}; +use launchdarkly_sdk_transport::HttpTransport; use std::sync::{Arc, Mutex}; use std::time::Duration; use thiserror::Error; -use tokio::io::{AsyncRead, AsyncWrite}; #[cfg(test)] use super::data_source; @@ -47,26 +44,25 @@ pub trait DataSourceFactory { /// Adjust the initial reconnect delay. /// ``` /// # use launchdarkly_server_sdk::{StreamingDataSourceBuilder, ConfigBuilder}; -/// # use hyper_rustls::HttpsConnector; -/// # use hyper::client::HttpConnector; +/// # use launchdarkly_sdk_transport::HyperTransport; /// # use std::time::Duration; /// # fn main() { -/// ConfigBuilder::new("sdk-key").data_source(StreamingDataSourceBuilder::>::new() +/// ConfigBuilder::new("sdk-key").data_source(StreamingDataSourceBuilder::::new() /// .initial_reconnect_delay(Duration::from_secs(10))); /// # } /// ``` #[derive(Clone)] -pub struct StreamingDataSourceBuilder { +pub struct StreamingDataSourceBuilder { initial_reconnect_delay: Duration, - connector: Option, + transport: Option, } -impl StreamingDataSourceBuilder { +impl StreamingDataSourceBuilder { /// Create a new instance of the [StreamingDataSourceBuilder] with default values. pub fn new() -> Self { Self { initial_reconnect_delay: DEFAULT_INITIAL_RECONNECT_DELAY, - connector: None, + transport: None, } } @@ -76,22 +72,18 @@ impl StreamingDataSourceBuilder { self } - /// Sets the connector for the event source client to use. This allows for re-use of a - /// connector between multiple client instances. This is especially useful for the + /// Sets the transport for the event source client to use. This allows for re-use of a + /// transport between multiple client instances. This is especially useful for the /// `sdk-test-harness` where many client instances are created throughout the test and reading /// the native certificates is a substantial portion of the runtime. - pub fn https_connector(&mut self, connector: C) -> &mut Self { - self.connector = Some(connector); + pub fn transport(&mut self, transport: T) -> &mut Self { + self.transport = Some(transport); self } } -impl DataSourceFactory for StreamingDataSourceBuilder -where - C: Service + Clone + Send + Sync + 'static, - C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin, - C::Future: Send + 'static, - C::Error: Into>, +impl DataSourceFactory + for StreamingDataSourceBuilder { fn build( &self, @@ -99,33 +91,41 @@ where sdk_key: &str, tags: Option, ) -> Result, BuildError> { - let data_source_result = match &self.connector { - #[cfg(feature = "rustls")] + let data_source_result = match &self.transport { + #[cfg(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" + ))] None => { - let connector = HttpsConnectorBuilder::new() - .with_native_roots() - .https_or_http() - .enable_http1() - .enable_http2() - .build(); + let transport = + launchdarkly_sdk_transport::HyperTransport::new_https().map_err(|e| { + BuildError::InvalidConfig(format!( + "failed to create default https transport: {e:?}" + )) + })?; Ok(StreamingDataSource::new( endpoints.streaming_base_url(), sdk_key, self.initial_reconnect_delay, &tags, - connector, + transport, )) } - #[cfg(not(feature = "rustls"))] + #[cfg(not(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" + )))] None => Err(BuildError::InvalidConfig( - "https connector required when rustls is disabled".into(), + "https connector required when hyper-rustls-native-roots, hyper-rustls-webpki-roots, or native-tls features are disabled".into(), )), - Some(connector) => Ok(StreamingDataSource::new( + Some(transport) => Ok(StreamingDataSource::new( endpoints.streaming_base_url(), sdk_key, self.initial_reconnect_delay, &tags, - connector.clone(), + transport.clone(), )), }; let data_source = data_source_result? @@ -138,7 +138,7 @@ where } } -impl Default for StreamingDataSourceBuilder { +impl Default for StreamingDataSourceBuilder { fn default() -> Self { StreamingDataSourceBuilder::new() } @@ -189,18 +189,17 @@ impl Default for NullDataSourceBuilder { /// Adjust the initial reconnect delay. /// ``` /// # use launchdarkly_server_sdk::{PollingDataSourceBuilder, ConfigBuilder}; -/// # use hyper_rustls::HttpsConnector; -/// # use hyper::client::HttpConnector; +/// # use launchdarkly_sdk_transport::HyperTransport; /// # use std::time::Duration; /// # fn main() { -/// ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::>::new() +/// ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::::new() /// .poll_interval(Duration::from_secs(60))); /// # } /// ``` #[derive(Clone)] -pub struct PollingDataSourceBuilder { +pub struct PollingDataSourceBuilder { poll_interval: Duration, - connector: Option, + transport: Option, } /// Contains methods for configuring the polling data source. @@ -219,20 +218,19 @@ pub struct PollingDataSourceBuilder { /// Adjust the poll interval. /// ``` /// # use launchdarkly_server_sdk::{PollingDataSourceBuilder, ConfigBuilder}; +/// # use launchdarkly_sdk_transport::HyperTransport; /// # use std::time::Duration; -/// # use hyper_rustls::HttpsConnector; -/// # use hyper::client::HttpConnector; /// # fn main() { -/// ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::>::new() +/// ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::::new() /// .poll_interval(Duration::from_secs(60))); /// # } /// ``` -impl PollingDataSourceBuilder { +impl PollingDataSourceBuilder { /// Create a new instance of the [PollingDataSourceBuilder] with default values. pub fn new() -> Self { Self { poll_interval: MINIMUM_POLL_INTERVAL, - connector: None, + transport: None, } } @@ -245,23 +243,17 @@ impl PollingDataSourceBuilder { self } - /// Sets the connector for the polling client to use. This allows for re-use of a connector + /// Sets the transport for the polling client to use. This allows for re-use of a transport /// between multiple client instances. This is especially useful for the `sdk-test-harness` /// where many client instances are created throughout the test and reading the native /// certificates is a substantial portion of the runtime. - pub fn https_connector(&mut self, connector: C) -> &mut Self { - self.connector = Some(connector); + pub fn transport(&mut self, transport: T) -> &mut Self { + self.transport = Some(transport); self } } -impl DataSourceFactory for PollingDataSourceBuilder -where - C: Service + Clone + Send + Sync + 'static, - C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin, - C::Future: Send + Unpin + 'static, - C::Error: Into>, -{ +impl DataSourceFactory for PollingDataSourceBuilder { fn build( &self, endpoints: &service_endpoints::ServiceEndpoints, @@ -269,30 +261,38 @@ where tags: Option, ) -> Result, BuildError> { let feature_requester_builder: Result, BuildError> = - match &self.connector { - #[cfg(feature = "rustls")] + match &self.transport { + #[cfg(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" + ))] None => { - let connector = HttpsConnectorBuilder::new() - .with_native_roots() - .https_or_http() - .enable_http1() - .enable_http2() - .build(); - - Ok(Box::new(HyperFeatureRequesterBuilder::new( + let transport = launchdarkly_sdk_transport::HyperTransport::new_https() + .map_err(|e| { + BuildError::InvalidConfig(format!( + "failed to create default https transport: {e:?}" + )) + })?; + + Ok(Box::new(HttpFeatureRequesterBuilder::new( endpoints.polling_base_url(), sdk_key, - connector, + transport, ))) } - #[cfg(not(feature = "rustls"))] + #[cfg(not(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" + )))] None => Err(BuildError::InvalidConfig( - "https connector required when rustls is disabled".into(), + "transport is required when hyper-rustls-native-roots, hyper-rustls-webpki-roots, or native-tls features are disabled".into(), )), - Some(connector) => Ok(Box::new(HyperFeatureRequesterBuilder::new( + Some(transport) => Ok(Box::new(HttpFeatureRequesterBuilder::new( endpoints.polling_base_url(), sdk_key, - connector.clone(), + transport.clone(), ))), }; @@ -309,7 +309,7 @@ where } } -impl Default for PollingDataSourceBuilder { +impl Default for PollingDataSourceBuilder { fn default() -> Self { PollingDataSourceBuilder::new() } @@ -355,13 +355,15 @@ impl DataSourceFactory for MockDataSourceBuilder { #[cfg(test)] mod tests { - use hyper::client::HttpConnector; + use bytes::Bytes; + use launchdarkly_sdk_transport::{HyperTransport, Request, ResponseFuture}; use super::*; #[test] fn default_stream_builder_has_correct_defaults() { - let builder: StreamingDataSourceBuilder = StreamingDataSourceBuilder::new(); + let builder: StreamingDataSourceBuilder = + StreamingDataSourceBuilder::new(); assert_eq!( builder.initial_reconnect_delay, @@ -370,29 +372,19 @@ mod tests { } #[test] - fn stream_builder_can_use_custom_connector() { + fn stream_builder_can_use_custom_transport() { #[derive(Debug, Clone)] - struct TestConnector; - impl hyper::service::Service for TestConnector { - type Response = tokio::net::TcpStream; - type Error = std::io::Error; - type Future = futures::future::BoxFuture<'static, Result>; - - fn poll_ready( - &mut self, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - std::task::Poll::Ready(Ok(())) - } + struct TestTransport; - fn call(&mut self, _req: hyper::Uri) -> Self::Future { + impl launchdarkly_sdk_transport::HttpTransport for TestTransport { + fn request(&self, _request: Request>) -> ResponseFuture { // this won't be called during the test unreachable!(); } } let mut builder = StreamingDataSourceBuilder::new(); - builder.https_connector(TestConnector); + builder.transport(TestTransport); assert!(builder .build( &crate::ServiceEndpointsBuilder::new().build().unwrap(), @@ -404,13 +396,13 @@ mod tests { #[test] fn default_polling_builder_has_correct_defaults() { - let builder = PollingDataSourceBuilder::::new(); + let builder = PollingDataSourceBuilder::::new(); assert_eq!(builder.poll_interval, MINIMUM_POLL_INTERVAL,); } #[test] fn initial_reconnect_delay_for_streaming_can_be_adjusted() { - let mut builder = StreamingDataSourceBuilder::<()>::new(); + let mut builder = StreamingDataSourceBuilder::::new(); builder.initial_reconnect_delay(Duration::from_secs(1234)); assert_eq!(builder.initial_reconnect_delay, Duration::from_secs(1234)); } diff --git a/launchdarkly-server-sdk/src/evaluation.rs b/launchdarkly-server-sdk/src/evaluation.rs index 75af925..ca0873f 100644 --- a/launchdarkly-server-sdk/src/evaluation.rs +++ b/launchdarkly-server-sdk/src/evaluation.rs @@ -1,4 +1,5 @@ use super::stores::store::DataStore; +use bitflags::bitflags; use serde::Serialize; use std::cell::RefCell; @@ -8,22 +9,61 @@ use launchdarkly_server_sdk_evaluation::{ use std::collections::HashMap; use std::time::SystemTime; +bitflags! { + /// Controls which flags are included based on their client-side availability settings. + /// + /// Use this with [FlagDetailConfig] to filter flags returned by [crate::Client::all_flags_detail]. + /// + /// # Examples + /// + /// ``` + /// # use launchdarkly_server_sdk::{FlagDetailConfig, FlagFilter}; + /// // Include only web/JavaScript client flags + /// let mut config = FlagDetailConfig::new(); + /// config.flag_filter(FlagFilter::CLIENT); + /// + /// // Include both web and mobile client flags + /// let mut config = FlagDetailConfig::new(); + /// config.flag_filter(FlagFilter::CLIENT | FlagFilter::MOBILE); + /// + /// // Include all flags (default) + /// let config = FlagDetailConfig::new(); // empty filter = no filtering + /// ``` + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + pub struct FlagFilter: u8 { + /// Include flags available to JavaScript/web client-side SDKs. + /// Filters to flags where `using_environment_id()` returns true. + const CLIENT = 0b01; + + /// Include flags available to mobile/desktop client-side SDKs. + /// Filters to flags where `using_mobile_key()` returns true. + const MOBILE = 0b10; + } +} + +impl Default for FlagFilter { + fn default() -> Self { + // Empty filter = include all flags (no filtering) + Self::empty() + } +} + /// Configuration struct to control the type of data returned from the [crate::Client::all_flags_detail] /// method. By default, each of the options default to false. However, you can selectively enable /// them by calling the appropriate functions. /// /// ``` -/// # use launchdarkly_server_sdk::FlagDetailConfig; +/// # use launchdarkly_server_sdk::{FlagDetailConfig, FlagFilter}; /// # fn main() { /// let mut config = FlagDetailConfig::new(); -/// config.client_side_only() +/// config.flag_filter(FlagFilter::CLIENT) /// .with_reasons() /// .details_only_for_tracked_flags(); /// # } /// ``` #[derive(Clone, Copy, Default)] pub struct FlagDetailConfig { - client_side_only: bool, + flag_filter: FlagFilter, with_reasons: bool, details_only_for_tracked_flags: bool, } @@ -34,16 +74,18 @@ impl FlagDetailConfig { /// By default, this config will include al flags and will not include reasons. pub fn new() -> Self { Self { - client_side_only: false, + flag_filter: FlagFilter::default(), with_reasons: false, details_only_for_tracked_flags: false, } } - /// Limit to only flags that are marked for use with the client-side SDK (by - /// default, all flags are included) - pub fn client_side_only(&mut self) -> &mut Self { - self.client_side_only = true; + /// Set the flag filter to control which flags are included. + /// + /// Pass an empty filter (default) to include all flags. + /// Use `FlagFilter::CLIENT`, `FlagFilter::MOBILE`, or combine them. + pub fn flag_filter(&mut self, filter: FlagFilter) -> &mut Self { + self.flag_filter = filter; self } @@ -148,8 +190,14 @@ impl FlagDetail { let mut flag_state = HashMap::new(); for (key, flag) in store.all_flags() { - if config.client_side_only && !flag.using_environment_id() { - continue; + if !config.flag_filter.is_empty() { + let matches_filter = (config.flag_filter.contains(FlagFilter::CLIENT) + && flag.using_environment_id()) + || (config.flag_filter.contains(FlagFilter::MOBILE) && flag.using_mobile_key()); + + if !matches_filter { + continue; + } } let event_recorder = DirectPrerequisiteRecorder::new(key.clone()); @@ -219,7 +267,7 @@ impl FlagDetail { #[cfg(test)] mod tests { - use crate::evaluation::FlagDetail; + use crate::evaluation::{FlagDetail, FlagFilter}; use crate::stores::store::DataStore; use crate::stores::store::InMemoryDataStore; use crate::stores::store_types::{PatchTarget, StorageItem}; @@ -230,6 +278,7 @@ mod tests { use crate::FlagDetailConfig; use assert_json_diff::assert_json_eq; use launchdarkly_server_sdk_evaluation::ContextBuilder; + use test_case::test_case; #[test] fn flag_detail_handles_default_configuration() { @@ -461,8 +510,12 @@ mod tests { let prereq1 = basic_flag("prereq1"); let prereq2 = basic_flag("prereq2"); - let toplevel = - basic_flag_with_prereqs_and_visibility("toplevel", &["prereq1", "prereq2"], false); + let toplevel = basic_flag_with_prereqs_and_visibility( + "toplevel", + &["prereq1", "prereq2"], + false, + false, + ); store .upsert("prereq1", PatchTarget::Flag(StorageItem::Item(prereq1))) @@ -512,12 +565,16 @@ mod tests { let mut store = InMemoryDataStore::new(); // These two prerequisites won't be visible to clients (environment ID) SDKs. - let prereq1 = basic_flag_with_visibility("prereq1", false); - let prereq2 = basic_flag_with_visibility("prereq2", false); + let prereq1 = basic_flag_with_visibility("prereq1", false, false); + let prereq2 = basic_flag_with_visibility("prereq2", false, false); // But, the top-level flag will. - let toplevel = - basic_flag_with_prereqs_and_visibility("toplevel", &["prereq1", "prereq2"], true); + let toplevel = basic_flag_with_prereqs_and_visibility( + "toplevel", + &["prereq1", "prereq2"], + true, + false, + ); store .upsert("prereq1", PatchTarget::Flag(StorageItem::Item(prereq1))) @@ -534,7 +591,7 @@ mod tests { let mut flag_detail = FlagDetail::new(true); let mut config = FlagDetailConfig::new(); - config.client_side_only(); + config.flag_filter(FlagFilter::CLIENT); flag_detail.populate(&store, &context, config); @@ -567,8 +624,12 @@ mod tests { let prereq1 = basic_off_flag("prereq1"); let prereq2 = basic_flag("prereq2"); - let toplevel = - basic_flag_with_prereqs_and_visibility("toplevel", &["prereq1", "prereq2"], true); + let toplevel = basic_flag_with_prereqs_and_visibility( + "toplevel", + &["prereq1", "prereq2"], + true, + false, + ); store .upsert("prereq1", PatchTarget::Flag(StorageItem::Item(prereq1))) @@ -610,4 +671,84 @@ mod tests { assert_json_eq!(expected, flag_detail); } + + #[test_case(FlagFilter::empty(), &["server-flag", "client-flag", "mobile-flag", "both-flag"] ; "empty filter includes all flags")] + #[test_case(FlagFilter::CLIENT, &["client-flag", "both-flag"] ; "client filter includes only client flags")] + #[test_case(FlagFilter::MOBILE, &["mobile-flag", "both-flag"] ; "mobile filter includes only mobile flags")] + #[test_case(FlagFilter::CLIENT | FlagFilter::MOBILE, &["client-flag", "mobile-flag", "both-flag"] ; "combined filter includes client or mobile flags")] + fn flag_filter_includes_correct_flags(filter: FlagFilter, expected_flags: &[&str]) { + let context = ContextBuilder::new("bob") + .build() + .expect("Failed to create context"); + let mut store = InMemoryDataStore::new(); + + // Add different types of flags + store + .upsert( + "server-flag", + PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility( + "server-flag", + false, + false, + ))), + ) + .expect("patch should apply"); + + store + .upsert( + "client-flag", + PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility( + "client-flag", + true, + false, + ))), + ) + .expect("patch should apply"); + + store + .upsert( + "mobile-flag", + PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility( + "mobile-flag", + false, + true, + ))), + ) + .expect("patch should apply"); + + store + .upsert( + "both-flag", + PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility( + "both-flag", + true, + true, + ))), + ) + .expect("patch should apply"); + + let mut flag_detail = FlagDetail::new(true); + let mut config = FlagDetailConfig::new(); + if !filter.is_empty() { + config.flag_filter(filter); + } + flag_detail.populate(&store, &context, config); + + // Assert expected flags are present + for expected_flag in expected_flags { + assert!( + flag_detail.evaluations.contains_key(*expected_flag), + "Expected flag '{expected_flag}' to be present" + ); + } + + // Assert count matches + assert_eq!( + flag_detail.evaluations.len(), + expected_flags.len(), + "Expected {} flags, got {}", + expected_flags.len(), + flag_detail.evaluations.len() + ); + } } diff --git a/launchdarkly-server-sdk/src/events/dispatcher.rs b/launchdarkly-server-sdk/src/events/dispatcher.rs index 5921fb5..73b86ac 100644 --- a/launchdarkly-server-sdk/src/events/dispatcher.rs +++ b/launchdarkly-server-sdk/src/events/dispatcher.rs @@ -114,6 +114,7 @@ impl EventDispatcher { }; let (send, recv) = bounded::<()>(1); + let mut flush_signal: Option> = None; loop { debug!("waiting for a batch to send"); @@ -121,12 +122,15 @@ impl EventDispatcher { loop { select! { recv(event_result_rx) -> result => match result { - Ok(result) if result.success => self.last_known_time = std::cmp::max(result.time_from_server, self.last_known_time), - Ok(result) if result.must_shutdown => { - self.disabled = true; - self.outbox.reset(); - }, - Ok(_) => continue, + Ok(result) => { + if result.success { + self.last_known_time = std::cmp::max(result.time_from_server, self.last_known_time); + } else if result.must_shutdown { + self.disabled = true; + self.outbox.reset(); + } + result.flush_signal.map(|s| s.send(())); + } Err(e) => { error!("event_result_rx is disconnected. Shutting down dispatcher: {e}"); return; @@ -136,6 +140,10 @@ impl EventDispatcher { recv(flush_ticker) -> _ => break, recv(inbox_rx) -> result => match result { Ok(EventDispatcherMessage::Flush) => break, + Ok(EventDispatcherMessage::FlushWithReply(reply_sender)) => { + flush_signal = Some(reply_sender); + break; + } Ok(EventDispatcherMessage::EventMessage(event)) => { if !self.disabled { self.process_event(event); @@ -166,6 +174,7 @@ impl EventDispatcher { } if self.disabled { + flush_signal.take().map(|s| s.send(())); continue; } @@ -177,10 +186,14 @@ impl EventDispatcher { let sender = self.events_configuration.event_sender.clone(); let results = event_result_tx.clone(); let send = send.clone(); + let fs = flush_signal.take(); rt.spawn(async move { - sender.send_event_data(payload, results).await; + sender.send_event_data(payload, results, fs).await; drop(send); }); + } else { + // No events to send, reply immediately to any waiting flush_blocking calls + flush_signal.take().map(|s| s.send(())); } } } @@ -317,6 +330,7 @@ pub(super) enum EventDispatcherMessage { EventMessage(InputEvent), Flush, Close(Sender<()>), + FlushWithReply(Sender<()>), } #[cfg(test)] @@ -828,6 +842,146 @@ mod tests { assert_eq!(event_rx.try_iter().count(), 1); } + #[test] + fn flush_blocking_returns_immediately_when_outbox_empty() { + let (event_sender, event_rx) = create_event_sender(); + let events_configuration = + create_events_configuration(event_sender, Duration::from_secs(100)); + let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity); + + let dispatcher_handle = thread::Builder::new() + .spawn(move || { + let mut dispatcher = create_dispatcher(events_configuration); + dispatcher.start(inbox_rx) + }) + .unwrap(); + + // Send FlushWithReply without any events + let (tx, rx) = bounded(1); + inbox_tx + .send(EventDispatcherMessage::FlushWithReply(tx)) + .expect("flush with reply failed"); + + // Should receive signal immediately since outbox is empty + rx.recv_timeout(Duration::from_millis(500)) + .expect("should receive flush signal immediately when outbox is empty"); + + let (close_tx, close_rx) = bounded(1); + inbox_tx + .send(EventDispatcherMessage::Close(close_tx)) + .expect("failed to close"); + close_rx.recv().expect("failed to notify on close"); + dispatcher_handle.join().unwrap(); + + assert_eq!(event_rx.try_iter().count(), 0); + } + + #[test] + fn flush_blocking_signals_after_send_completes() { + let (event_sender, event_rx) = create_event_sender(); + let events_configuration = + create_events_configuration(event_sender, Duration::from_secs(100)); + let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity); + + let dispatcher_handle = thread::Builder::new() + .spawn(move || { + let mut dispatcher = create_dispatcher(events_configuration); + dispatcher.start(inbox_rx) + }) + .unwrap(); + + let context = ContextBuilder::new("context") + .build() + .expect("Failed to create context"); + let event_factory = EventFactory::new(true); + + // Send an event + inbox_tx + .send(EventDispatcherMessage::EventMessage( + event_factory.new_identify(context), + )) + .expect("event send failed"); + + // Send FlushWithReply + let (tx, rx) = bounded(1); + inbox_tx + .send(EventDispatcherMessage::FlushWithReply(tx)) + .expect("flush with reply failed"); + + // Should receive signal after send completes + rx.recv_timeout(Duration::from_secs(5)) + .expect("should receive flush signal after send completes"); + + let (close_tx, close_rx) = bounded(1); + inbox_tx + .send(EventDispatcherMessage::Close(close_tx)) + .expect("failed to close"); + close_rx.recv().expect("failed to notify on close"); + dispatcher_handle.join().unwrap(); + + assert_eq!(event_rx.iter().count(), 1); + } + + #[test] + fn flush_blocking_with_multiple_concurrent_requests() { + let (event_sender, event_rx) = create_event_sender(); + let events_configuration = + create_events_configuration(event_sender, Duration::from_secs(100)); + let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity); + + let dispatcher_handle = thread::Builder::new() + .spawn(move || { + let mut dispatcher = create_dispatcher(events_configuration); + dispatcher.start(inbox_rx) + }) + .unwrap(); + + let context = ContextBuilder::new("context") + .build() + .expect("Failed to create context"); + let event_factory = EventFactory::new(true); + + // Send an event + inbox_tx + .send(EventDispatcherMessage::EventMessage( + event_factory.new_identify(context), + )) + .expect("event send failed"); + + // Send multiple FlushWithReply requests + let (tx1, rx1) = bounded(1); + let (tx2, rx2) = bounded(1); + let (tx3, rx3) = bounded(1); + + inbox_tx + .send(EventDispatcherMessage::FlushWithReply(tx1)) + .expect("flush1 failed"); + inbox_tx + .send(EventDispatcherMessage::FlushWithReply(tx2)) + .expect("flush2 failed"); + inbox_tx + .send(EventDispatcherMessage::FlushWithReply(tx3)) + .expect("flush3 failed"); + + // All should complete (though only first triggers actual send) + rx1.recv_timeout(Duration::from_secs(5)) + .expect("rx1 should complete"); + rx2.recv_timeout(Duration::from_secs(5)) + .expect("rx2 should complete"); + rx3.recv_timeout(Duration::from_secs(5)) + .expect("rx3 should complete"); + + let (close_tx, close_rx) = bounded(1); + inbox_tx + .send(EventDispatcherMessage::Close(close_tx)) + .expect("failed to close"); + close_rx.recv().expect("failed to notify on close"); + dispatcher_handle.join().unwrap(); + + // Should have one event from first flush (others had empty outbox) + assert_eq!(event_rx.iter().count(), 1); + } + fn create_dispatcher(events_configuration: EventsConfiguration) -> EventDispatcher { EventDispatcher::new(events_configuration) } diff --git a/launchdarkly-server-sdk/src/events/processor.rs b/launchdarkly-server-sdk/src/events/processor.rs index 9296602..a6f72f4 100644 --- a/launchdarkly-server-sdk/src/events/processor.rs +++ b/launchdarkly-server-sdk/src/events/processor.rs @@ -1,6 +1,7 @@ -use crossbeam_channel::{bounded, Sender}; +use crossbeam_channel::{bounded, RecvTimeoutError, Sender}; use std::sync::Once; use std::thread; +use std::time::Duration; use thiserror::Error; use super::dispatcher::{EventDispatcher, EventDispatcherMessage}; @@ -31,6 +32,23 @@ pub trait EventProcessor: Send + Sync { /// delivered. Subsequent calls to [EventProcessor::send] or [EventProcessor::flush] will be /// ignored. fn close(&self); + + /// Tells the event processor that all pending analytics events should be delivered as soon as + /// possible, and blocks until delivery is complete or the timeout expires. + /// + /// This method triggers a flush of events currently in the outbox and waits for that specific + /// flush to complete. Note that if periodic flushes or other flush operations are in-flight + /// when this is called, those may still be completing after this method returns. + /// + /// # Arguments + /// + /// * `timeout` - Maximum time to wait for flush to complete. Use `Duration::ZERO` to wait indefinitely. + /// + /// # Returns + /// + /// Returns `true` if flush completed successfully, `false` if timeout occurred or the event + /// processor has been shut down. + fn flush_blocking(&self, timeout: std::time::Duration) -> bool; } pub struct NullEventProcessor {} @@ -45,6 +63,9 @@ impl EventProcessor for NullEventProcessor { fn send(&self, _: InputEvent) {} fn flush(&self) {} fn close(&self) {} + fn flush_blocking(&self, _timeout: std::time::Duration) -> bool { + true + } } pub struct EventProcessorImpl { @@ -106,6 +127,43 @@ impl EventProcessor for EventProcessorImpl { let _ = receiver.recv(); } + + fn flush_blocking(&self, timeout: Duration) -> bool { + let (sender, receiver) = bounded::<()>(1); + + if self + .inbox_tx + .send(EventDispatcherMessage::FlushWithReply(sender)) + .is_err() + { + error!("Failed to send flush_blocking message"); + return false; + } + + if timeout == Duration::ZERO { + // Wait indefinitely + match receiver.recv() { + Ok(()) => true, + Err(_) => { + error!("flush_blocking failed: event processor shut down"); + false + } + } + } else { + // Wait with timeout + match receiver.recv_timeout(timeout) { + Ok(()) => true, + Err(RecvTimeoutError::Timeout) => { + warn!("flush_blocking timed out after {timeout:?}"); + false + } + Err(RecvTimeoutError::Disconnected) => { + error!("flush_blocking failed: event processor shut down"); + false + } + } + } + } } #[cfg(test)] @@ -125,6 +183,37 @@ mod tests { use super::*; + // Helper to create a failing event sender for testing + struct FailingEventSender { + should_shutdown: bool, + } + + impl FailingEventSender { + fn new(should_shutdown: bool) -> Self { + Self { should_shutdown } + } + } + + impl crate::events::sender::EventSender for FailingEventSender { + fn send_event_data( + &self, + _events: Vec, + result_tx: crossbeam_channel::Sender, + flush_signal: Option>, + ) -> futures::future::BoxFuture<'static, ()> { + let should_shutdown = self.should_shutdown; + Box::pin(async move { + // Simulate a failed HTTP send + let _ = result_tx.send(crate::events::sender::EventSenderResult { + time_from_server: 0, + success: false, + must_shutdown: should_shutdown, + flush_signal, + }); + }) + } + } + #[test] fn calling_close_on_processor_twice_returns() { let (event_sender, _) = create_event_sender(); @@ -372,4 +461,228 @@ mod tests { 1 ); } + + #[test] + fn flush_blocking_completes_successfully() { + let (event_sender, event_rx) = create_event_sender(); + let events_configuration = + create_events_configuration(event_sender, Duration::from_secs(100)); + let event_processor = + EventProcessorImpl::new(events_configuration).expect("failed to start ep"); + + let context = ContextBuilder::new("foo") + .build() + .expect("Failed to create context"); + let event_factory = EventFactory::new(true); + event_processor.send(event_factory.new_identify(context)); + + let result = event_processor.flush_blocking(Duration::from_secs(5)); + assert!(result, "flush_blocking should complete successfully"); + + event_processor.close(); + let events = event_rx.iter().collect::>(); + assert_eq!(events.len(), 1); + } + + #[test] + fn flush_blocking_with_very_short_timeout() { + let (event_sender, _) = create_event_sender(); + let events_configuration = + create_events_configuration(event_sender, Duration::from_secs(100)); + let event_processor = + EventProcessorImpl::new(events_configuration).expect("failed to start ep"); + + let event_factory = EventFactory::new(true); + + // Send many events to increase the chance of timeout + for i in 0..100 { + let ctx = ContextBuilder::new(format!("user-{i}")) + .build() + .expect("Failed to create context"); + event_processor.send(event_factory.new_identify(ctx)); + } + + // Very short timeout may or may not complete - just verify it doesn't panic + let _result = event_processor.flush_blocking(Duration::from_nanos(1)); + + event_processor.close(); + } + + #[test] + fn flush_blocking_with_zero_timeout_waits() { + let (event_sender, event_rx) = create_event_sender(); + let events_configuration = + create_events_configuration(event_sender, Duration::from_secs(100)); + let event_processor = + EventProcessorImpl::new(events_configuration).expect("failed to start ep"); + + let context = ContextBuilder::new("foo") + .build() + .expect("Failed to create context"); + let event_factory = EventFactory::new(true); + event_processor.send(event_factory.new_identify(context)); + + let result = event_processor.flush_blocking(Duration::ZERO); + assert!( + result, + "flush_blocking with zero timeout should complete successfully" + ); + + event_processor.close(); + let events = event_rx.iter().collect::>(); + assert_eq!(events.len(), 1); + } + + #[test] + fn flush_blocking_with_no_events_completes_immediately() { + let (event_sender, _) = create_event_sender(); + let events_configuration = + create_events_configuration(event_sender, Duration::from_secs(100)); + let event_processor = + EventProcessorImpl::new(events_configuration).expect("failed to start ep"); + + let result = event_processor.flush_blocking(Duration::from_secs(1)); + assert!( + result, + "flush_blocking with no events should complete immediately" + ); + + event_processor.close(); + } + + #[test] + fn null_processor_flush_blocking_returns_true() { + let processor = NullEventProcessor::new(); + assert!(processor.flush_blocking(Duration::from_secs(1))); + assert!(processor.flush_blocking(Duration::ZERO)); + } + + #[test] + fn flush_blocking_fails_when_processor_closed() { + let (event_sender, _) = create_event_sender(); + let events_configuration = + create_events_configuration(event_sender, Duration::from_secs(100)); + let event_processor = + EventProcessorImpl::new(events_configuration).expect("failed to start ep"); + + event_processor.close(); + + let result = event_processor.flush_blocking(Duration::from_secs(1)); + assert!( + !result, + "flush_blocking should fail when processor is closed" + ); + } + + #[test] + fn flush_blocking_completes_on_recoverable_http_failure() { + use std::collections::HashSet; + use std::num::NonZeroUsize; + use std::sync::Arc; + + let event_sender = FailingEventSender::new(false); + let events_configuration = crate::events::EventsConfiguration { + capacity: 5, + event_sender: Arc::new(event_sender), + flush_interval: Duration::from_secs(100), + context_keys_capacity: NonZeroUsize::new(5).expect("5 > 0"), + context_keys_flush_interval: Duration::from_secs(100), + all_attributes_private: false, + private_attributes: HashSet::new(), + omit_anonymous_contexts: false, + }; + let event_processor = + EventProcessorImpl::new(events_configuration).expect("failed to start ep"); + + let context = ContextBuilder::new("foo") + .build() + .expect("Failed to create context"); + let event_factory = EventFactory::new(true); + event_processor.send(event_factory.new_identify(context)); + + // Even though HTTP fails, flush_blocking should complete (not hang) + let result = event_processor.flush_blocking(Duration::from_secs(5)); + assert!( + result, + "flush_blocking should complete even when HTTP send fails (recoverable)" + ); + + event_processor.close(); + } + + #[test] + fn flush_blocking_completes_on_unrecoverable_http_failure() { + use std::collections::HashSet; + use std::num::NonZeroUsize; + use std::sync::Arc; + + let event_sender = FailingEventSender::new(true); + let events_configuration = crate::events::EventsConfiguration { + capacity: 5, + event_sender: Arc::new(event_sender), + flush_interval: Duration::from_secs(100), + context_keys_capacity: NonZeroUsize::new(5).expect("5 > 0"), + context_keys_flush_interval: Duration::from_secs(100), + all_attributes_private: false, + private_attributes: HashSet::new(), + omit_anonymous_contexts: false, + }; + let event_processor = + EventProcessorImpl::new(events_configuration).expect("failed to start ep"); + + let context = ContextBuilder::new("foo") + .build() + .expect("Failed to create context"); + let event_factory = EventFactory::new(true); + event_processor.send(event_factory.new_identify(context)); + + // Even with must_shutdown=true, flush_blocking should complete (not hang) + let result = event_processor.flush_blocking(Duration::from_secs(5)); + assert!( + result, + "flush_blocking should complete even when HTTP send fails (unrecoverable)" + ); + + event_processor.close(); + } + + #[test] + fn flush_blocking_with_multiple_events_and_http_failures() { + use std::collections::HashSet; + use std::num::NonZeroUsize; + use std::sync::Arc; + + let event_sender = FailingEventSender::new(false); + let events_configuration = crate::events::EventsConfiguration { + capacity: 5, + event_sender: Arc::new(event_sender), + flush_interval: Duration::from_secs(100), + context_keys_capacity: NonZeroUsize::new(5).expect("5 > 0"), + context_keys_flush_interval: Duration::from_secs(100), + all_attributes_private: false, + private_attributes: HashSet::new(), + omit_anonymous_contexts: false, + }; + let event_processor = + EventProcessorImpl::new(events_configuration).expect("failed to start ep"); + + let event_factory = EventFactory::new(true); + + // Send multiple events + for i in 0..10 { + let ctx = ContextBuilder::new(format!("user-{i}")) + .build() + .expect("Failed to create context"); + event_processor.send(event_factory.new_identify(ctx)); + } + + // flush_blocking should complete even with multiple events and HTTP failures + let result = event_processor.flush_blocking(Duration::from_secs(5)); + assert!( + result, + "flush_blocking should complete with multiple events despite HTTP failures" + ); + + event_processor.close(); + } } diff --git a/launchdarkly-server-sdk/src/events/processor_builders.rs b/launchdarkly-server-sdk/src/events/processor_builders.rs index 75b7e8e..5a2440a 100644 --- a/launchdarkly-server-sdk/src/events/processor_builders.rs +++ b/launchdarkly-server-sdk/src/events/processor_builders.rs @@ -4,17 +4,13 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use hyper::client::connect::Connection; -use hyper::service::Service; -use hyper::Uri; -#[cfg(feature = "rustls")] -use hyper_rustls::HttpsConnectorBuilder; +use http::Uri; use launchdarkly_server_sdk_evaluation::Reference; use thiserror::Error; -use tokio::io::{AsyncRead, AsyncWrite}; -use crate::events::sender::HyperEventSender; +use crate::events::sender::HttpEventSender; use crate::{service_endpoints, LAUNCHDARKLY_TAGS_HEADER}; +use launchdarkly_sdk_transport::HttpTransport; use super::processor::{ EventProcessor, EventProcessorError, EventProcessorImpl, NullEventProcessor, @@ -65,16 +61,15 @@ pub trait EventProcessorFactory { /// Adjust the flush interval /// ``` /// # use launchdarkly_server_sdk::{EventProcessorBuilder, ConfigBuilder}; -/// # use hyper_rustls::HttpsConnector; -/// # use hyper::client::HttpConnector; +/// # use launchdarkly_sdk_transport::HyperTransport; /// # use std::time::Duration; /// # fn main() { -/// ConfigBuilder::new("sdk-key").event_processor(EventProcessorBuilder::>::new() +/// ConfigBuilder::new("sdk-key").event_processor(EventProcessorBuilder::::new() /// .flush_interval(Duration::from_secs(10))); /// # } /// ``` #[derive(Clone)] -pub struct EventProcessorBuilder { +pub struct EventProcessorBuilder { capacity: usize, flush_interval: Duration, context_keys_capacity: NonZeroUsize, @@ -82,19 +77,13 @@ pub struct EventProcessorBuilder { event_sender: Option>, all_attributes_private: bool, private_attributes: HashSet, - connector: Option, + transport: Option, omit_anonymous_contexts: bool, compress_events: bool, // diagnostic_recording_interval: Duration } -impl EventProcessorFactory for EventProcessorBuilder -where - C: Service + Clone + Send + Sync + 'static, - C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin, - C::Future: Send + Unpin + 'static, - C::Error: Into>, -{ +impl EventProcessorFactory for EventProcessorBuilder { fn build( &self, endpoints: &service_endpoints::ServiceEndpoints, @@ -113,35 +102,42 @@ where // NOTE: This would only be possible under unit testing conditions. if let Some(event_sender) = &self.event_sender { Ok(event_sender.clone()) - } else if let Some(connector) = &self.connector { - Ok(Arc::new(HyperEventSender::new( - connector.clone(), - hyper::Uri::from_str(url_string.as_str()).unwrap(), + } else if let Some(transport) = &self.transport { + Ok(Arc::new(HttpEventSender::new( + transport.clone(), + Uri::from_str(url_string.as_str()).unwrap(), sdk_key, default_headers, self.compress_events, ))) } else { - #[cfg(feature = "rustls")] + #[cfg(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" + ))] { - let connector = HttpsConnectorBuilder::new() - .with_native_roots() - .https_or_http() - .enable_http1() - .enable_http2() - .build(); - - Ok(Arc::new(HyperEventSender::new( - connector, - hyper::Uri::from_str(url_string.as_str()).unwrap(), + let transport = launchdarkly_sdk_transport::HyperTransport::new_https().map_err(|e| { + BuildError::InvalidConfig(format!( + "failed to create default https transport: {}", + e + )) + })?; + Ok(Arc::new(HttpEventSender::new( + transport, + Uri::from_str(url_string.as_str()).unwrap(), sdk_key, default_headers, self.compress_events, ))) } - #[cfg(not(feature = "rustls"))] + #[cfg(not(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" + )))] Err(BuildError::InvalidConfig( - "https connector is required when rustls is disabled".into(), + "transport is required when hyper-rustls-native-roots, hyper-rustls-webpki-roots, or native-tls features are disabled".into(), )) }; let event_sender = event_sender_result?; @@ -168,7 +164,7 @@ where } } -impl EventProcessorBuilder { +impl EventProcessorBuilder { /// Create a new [EventProcessorBuilder] with all default values. pub fn new() -> Self { Self { @@ -181,7 +177,10 @@ impl EventProcessorBuilder { all_attributes_private: false, private_attributes: HashSet::new(), omit_anonymous_contexts: false, - connector: None, + transport: None, + #[cfg(feature = "event-compression")] + compress_events: true, + #[cfg(not(feature = "event-compression"))] compress_events: false, } } @@ -245,12 +244,12 @@ impl EventProcessorBuilder { self } - /// Sets the connector for the event sender to use. This allows for re-use of a connector + /// Sets the transport for the event sender to use. This allows for re-use of a transport /// between multiple client instances. This is especially useful for the `sdk-test-harness` /// where many client instances are created throughout the test and reading the native /// certificates is a substantial portion of the runtime. - pub fn https_connector(&mut self, connector: C) -> &mut Self { - self.connector = Some(connector); + pub fn transport(&mut self, transport: T) -> &mut Self { + self.transport = Some(transport); self } @@ -265,11 +264,10 @@ impl EventProcessorBuilder { #[cfg(feature = "event-compression")] /// Should the event payload sent to LaunchDarkly use gzip compression. By - /// default this is false to prevent backward breaking compatibility issues with - /// older versions of the relay proxy. + /// default this is true. // - /// Customers not using the relay proxy are strongly encouraged to enable this - /// feature to reduce egress bandwidth cost. + /// Customers using the relay proxy are encouraged to disable this feature to avoid unnecessary + /// CPU overhead, as the relay proxy will decompress & recompress the payloads. pub fn compress_events(&mut self, enabled: bool) -> &mut Self { self.compress_events = enabled; self @@ -283,7 +281,7 @@ impl EventProcessorBuilder { } } -impl Default for EventProcessorBuilder { +impl Default for EventProcessorBuilder { fn default() -> Self { Self::new() } @@ -324,7 +322,6 @@ impl Default for NullEventProcessorBuilder { #[cfg(test)] mod tests { - use hyper::client::HttpConnector; use launchdarkly_server_sdk_evaluation::ContextBuilder; use maplit::hashset; use mockito::Matcher; @@ -336,28 +333,31 @@ mod tests { #[test] fn default_builder_has_correct_defaults() { - let builder = EventProcessorBuilder::::new(); + let builder = EventProcessorBuilder::::new(); assert_eq!(builder.capacity, DEFAULT_EVENT_CAPACITY); assert_eq!(builder.flush_interval, DEFAULT_FLUSH_POLL_INTERVAL); } #[test] fn capacity_can_be_adjusted() { - let mut builder = EventProcessorBuilder::::new(); + let mut builder = + EventProcessorBuilder::::new(); builder.capacity(1234); assert_eq!(builder.capacity, 1234); } #[test] fn flush_interval_can_be_adjusted() { - let mut builder = EventProcessorBuilder::::new(); + let mut builder = + EventProcessorBuilder::::new(); builder.flush_interval(Duration::from_secs(1234)); assert_eq!(builder.flush_interval, Duration::from_secs(1234)); } #[test] fn context_keys_capacity_can_be_adjusted() { - let mut builder = EventProcessorBuilder::::new(); + let mut builder = + EventProcessorBuilder::::new(); let cap = NonZeroUsize::new(1234).expect("1234 > 0"); builder.context_keys_capacity(cap); assert_eq!(builder.context_keys_capacity, cap); @@ -365,7 +365,8 @@ mod tests { #[test] fn context_keys_flush_interval_can_be_adjusted() { - let mut builder = EventProcessorBuilder::::new(); + let mut builder = + EventProcessorBuilder::::new(); builder.context_keys_flush_interval(Duration::from_secs(1000)); assert_eq!( builder.context_keys_flush_interval, @@ -375,7 +376,8 @@ mod tests { #[test] fn all_attribute_private_can_be_adjusted() { - let mut builder = EventProcessorBuilder::::new(); + let mut builder = + EventProcessorBuilder::::new(); assert!(!builder.all_attributes_private); builder.all_attributes_private(true); @@ -384,7 +386,8 @@ mod tests { #[test] fn attribte_names_can_be_adjusted() { - let mut builder = EventProcessorBuilder::::new(); + let mut builder = + EventProcessorBuilder::::new(); assert!(builder.private_attributes.is_empty()); builder.private_attributes(hashset!["name"]); @@ -393,6 +396,11 @@ mod tests { #[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")] #[test_case(None, Matcher::Missing)] + #[cfg(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" + ))] fn processor_sends_correct_headers(tag: Option, matcher: impl Into) { let mut server = mockito::Server::new(); let mock = server @@ -409,7 +417,7 @@ mod tests { .build() .expect("Service endpoints failed to be created"); - let builder = EventProcessorBuilder::::new(); + let builder = EventProcessorBuilder::::new(); let processor = builder .build(&service_endpoints, "sdk-key", tag) .expect("Processor failed to build"); diff --git a/launchdarkly-server-sdk/src/events/sender.rs b/launchdarkly-server-sdk/src/events/sender.rs index 93ebefb..162e84c 100644 --- a/launchdarkly-server-sdk/src/events/sender.rs +++ b/launchdarkly-server-sdk/src/events/sender.rs @@ -4,6 +4,7 @@ use crate::{ }; use chrono::DateTime; use crossbeam_channel::Sender; +use launchdarkly_sdk_transport::HttpTransport; use std::collections::HashMap; #[cfg(feature = "event-compression")] @@ -13,12 +14,9 @@ use flate2::Compression; #[cfg(feature = "event-compression")] use std::io::Write; +use bytes::Bytes; use futures::future::BoxFuture; -use hyper::{client::connect::Connection, service::Service, Uri}; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - time::{sleep, Duration}, -}; +use tokio::time::{sleep, Duration}; use uuid::Uuid; use super::event::OutputEvent; @@ -27,6 +25,7 @@ pub struct EventSenderResult { pub(super) time_from_server: u128, pub(super) success: bool, pub(super) must_shutdown: bool, + pub(super) flush_signal: Option>, } pub trait EventSender: Send + Sync { @@ -34,14 +33,15 @@ pub trait EventSender: Send + Sync { &self, events: Vec, result_tx: Sender, + flush_signal: Option>, ) -> BoxFuture<'_, ()>; } #[derive(Clone)] -pub struct HyperEventSender { - url: hyper::Uri, +pub struct HttpEventSender { + url: http::Uri, sdk_key: String, - http: hyper::Client, + transport: T, default_headers: HashMap<&'static str, String>, // used with event-compression feature @@ -49,16 +49,10 @@ pub struct HyperEventSender { compress_events: bool, } -impl HyperEventSender -where - C: Service + Clone + Send + Sync + 'static, - C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin, - C::Future: Send + Unpin + 'static, - C::Error: Into>, -{ +impl HttpEventSender { pub fn new( - connector: C, - url: hyper::Uri, + transport: T, + url: http::Uri, sdk_key: &str, default_headers: HashMap<&'static str, String>, compress_events: bool, @@ -66,13 +60,13 @@ where Self { url, sdk_key: sdk_key.to_owned(), - http: hyper::Client::builder().build(connector), + transport, default_headers, compress_events, } } - fn get_server_time_from_response(&self, response: &hyper::Response) -> u128 { + fn get_server_time_from_response(&self, response: &http::Response) -> u128 { let date_value = response .headers() .get("date") @@ -88,17 +82,12 @@ where } } -impl EventSender for HyperEventSender -where - C: Service + Clone + Send + Sync + 'static, - C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin, - C::Future: Send + Unpin + 'static, - C::Error: Into>, -{ +impl EventSender for HttpEventSender { fn send_event_data( &self, events: Vec, result_tx: Sender, + flush_signal: Option>, ) -> BoxFuture<'_, ()> { Box::pin(async move { let uuid = Uuid::new_v4(); @@ -139,7 +128,7 @@ where sleep(Duration::from_secs(1)).await; } - let mut request_builder = hyper::Request::builder() + let mut request_builder = http::Request::builder() .uri(self.url.clone()) .method("POST") .header("Content-Type", "application/json") @@ -155,9 +144,12 @@ where request_builder = request_builder.header(*default_header.0, default_header.1.as_str()); } - let request = request_builder.body(hyper::Body::from(payload.clone())); - let result = self.http.request(request.unwrap()).await; + // Create request with Bytes body for transport + let body_bytes = Bytes::from(payload.clone()); + let request = request_builder.body(Some(body_bytes)).unwrap(); + + let result = self.transport.request(request).await; let response = match result { Ok(response) => response, @@ -171,6 +163,7 @@ where success: false, time_from_server: 0, must_shutdown: false, + flush_signal, }) .unwrap(); return; @@ -182,6 +175,7 @@ where success: true, time_from_server: self.get_server_time_from_response(&response), must_shutdown: false, + flush_signal, }); return; } @@ -192,6 +186,7 @@ where success: false, time_from_server: 0, must_shutdown: true, + flush_signal, }) .unwrap(); return; @@ -203,6 +198,7 @@ where success: false, time_from_server: 0, must_shutdown: false, + flush_signal, }) .unwrap(); }) @@ -227,7 +223,8 @@ impl EventSender for InMemoryEventSender { &self, events: Vec, sender: Sender, - ) -> BoxFuture<()> { + flush_signal: Option>, + ) -> BoxFuture<'_, ()> { Box::pin(async move { for event in events { self.event_tx.send(event).unwrap(); @@ -238,6 +235,7 @@ impl EventSender for InMemoryEventSender { time_from_server: 0, success: true, must_shutdown: true, + flush_signal, }) .unwrap(); }) @@ -251,18 +249,18 @@ mod tests { use std::str::FromStr; use test_case::test_case; - #[test_case(hyper::StatusCode::CONTINUE, true)] - #[test_case(hyper::StatusCode::OK, true)] - #[test_case(hyper::StatusCode::MULTIPLE_CHOICES, true)] - #[test_case(hyper::StatusCode::BAD_REQUEST, true)] - #[test_case(hyper::StatusCode::UNAUTHORIZED, false)] - #[test_case(hyper::StatusCode::REQUEST_TIMEOUT, true)] - #[test_case(hyper::StatusCode::CONFLICT, false)] - #[test_case(hyper::StatusCode::TOO_MANY_REQUESTS, true)] - #[test_case(hyper::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE, false)] - #[test_case(hyper::StatusCode::INTERNAL_SERVER_ERROR, true)] - fn can_determine_recoverable_errors(status: hyper::StatusCode, is_recoverable: bool) { - assert_eq!(is_recoverable, is_http_error_recoverable(status.as_u16())); + #[test_case(100, true; "100 CONTINUE is recoverable")] + #[test_case(200, true; "200 OK is recoverable")] + #[test_case(300, true; "300 MULTIPLE_CHOICES is recoverable")] + #[test_case(400, true; "400 BAD_REQUEST is recoverable")] + #[test_case(401, false; "401 UNAUTHORIZED is not recoverable")] + #[test_case(408, true; "408 REQUEST_TIMEOUT is recoverable")] + #[test_case(409, false; "409 CONFLICT is not recoverable")] + #[test_case(429, true; "429 TOO_MANY_REQUESTS is recoverable")] + #[test_case(431, false; "431 REQUEST_HEADER_FIELDS_TOO_LARGE is not recoverable")] + #[test_case(500, true; "500 INTERNAL_SERVER_ERROR is recoverable")] + fn can_determine_recoverable_errors(status: u16, is_recoverable: bool) { + assert_eq!(is_recoverable, is_http_error_recoverable(status)); } #[tokio::test] @@ -278,7 +276,7 @@ mod tests { let (tx, rx) = bounded::(5); let event_sender = build_event_sender(server.url()); - event_sender.send_event_data(vec![], tx).await; + event_sender.send_event_data(vec![], tx, None).await; let sender_result = rx.recv().unwrap(); assert!(sender_result.success); @@ -298,7 +296,7 @@ mod tests { let (tx, rx) = bounded::(5); let event_sender = build_event_sender(server.url()); - event_sender.send_event_data(vec![], tx).await; + event_sender.send_event_data(vec![], tx, None).await; let sender_result = rx.recv().expect("Failed to receive sender_result"); assert!(!sender_result.success); @@ -318,7 +316,7 @@ mod tests { let (tx, rx) = bounded::(5); let event_sender = build_event_sender(server.url()); - event_sender.send_event_data(vec![], tx).await; + event_sender.send_event_data(vec![], tx, None).await; let sender_result = rx.recv().expect("Failed to receive sender_result"); assert!(!sender_result.success); @@ -344,7 +342,7 @@ mod tests { let (tx, rx) = bounded::(5); let event_sender = build_event_sender(server.url()); - event_sender.send_event_data(vec![], tx).await; + event_sender.send_event_data(vec![], tx, None).await; let sender_result = rx.recv().expect("Failed to receive sender_result"); assert!(sender_result.success); @@ -352,12 +350,16 @@ mod tests { assert_eq!(sender_result.time_from_server, 1234567890000); } - fn build_event_sender(url: String) -> HyperEventSender { + fn build_event_sender( + url: String, + ) -> HttpEventSender { let url = format!("{}/bulk", &url); - let url = hyper::Uri::from_str(&url).expect("Failed parsing the mock server url"); + let url = http::Uri::from_str(&url).expect("Failed parsing the mock server url"); - HyperEventSender::new( - hyper::client::HttpConnector::new(), + let transport = launchdarkly_sdk_transport::HyperTransport::new() + .expect("Failed to create HyperTransport"); + HttpEventSender::new( + transport, url, "sdk-key", HashMap::<&str, String>::new(), diff --git a/launchdarkly-server-sdk/src/feature_requester.rs b/launchdarkly-server-sdk/src/feature_requester.rs index 5cea2b4..2c65f5e 100644 --- a/launchdarkly-server-sdk/src/feature_requester.rs +++ b/launchdarkly-server-sdk/src/feature_requester.rs @@ -1,8 +1,9 @@ use crate::reqwest::is_http_error_recoverable; +use bytes::Bytes; use futures::future::BoxFuture; -use hyper::Body; +use futures::stream::StreamExt; +use launchdarkly_sdk_transport::HttpTransport; use std::collections::HashMap; -use std::sync::Arc; use super::stores::store_types::AllData; use launchdarkly_server_sdk_evaluation::{Flag, Segment}; @@ -20,23 +21,23 @@ pub trait FeatureRequester: Send { fn get_all(&mut self) -> BoxFuture<'_, Result, FeatureRequesterError>>; } -pub struct HyperFeatureRequester { - http: Arc>, - url: hyper::Uri, +pub struct HttpFeatureRequester { + transport: T, + url: http::Uri, sdk_key: String, cache: Option, default_headers: HashMap<&'static str, String>, } -impl HyperFeatureRequester { +impl HttpFeatureRequester { pub fn new( - http: hyper::Client, - url: hyper::Uri, + transport: T, + url: http::Uri, sdk_key: String, default_headers: HashMap<&'static str, String>, ) -> Self { Self { - http: Arc::new(http), + transport, url, sdk_key, cache: None, @@ -45,19 +46,16 @@ impl HyperFeatureRequester { } } -impl FeatureRequester for HyperFeatureRequester -where - C: hyper::client::connect::Connect + Clone + Send + Sync + 'static, -{ +impl FeatureRequester for HttpFeatureRequester { fn get_all(&mut self) -> BoxFuture<'_, Result, FeatureRequesterError>> { Box::pin(async { let uri = self.url.clone(); let key = self.sdk_key.clone(); - let http = self.http.clone(); + let transport = self.transport.clone(); let cache = self.cache.clone(); - let mut request_builder = hyper::http::Request::builder() + let mut request_builder = http::Request::builder() .uri(uri) .method("GET") .header("Content-Type", "application/json") @@ -73,9 +71,10 @@ where request_builder = request_builder.header("If-None-Match", cache.1.clone()); } - let result = http - .request(request_builder.body(Body::empty()).unwrap()) - .await; + // Create empty body for GET request + let request = request_builder.body(Some(Bytes::new())).unwrap(); + + let result = transport.request(request).await; let response = match result { Ok(response) => response, @@ -87,7 +86,8 @@ where } }; - if response.status() == hyper::StatusCode::NOT_MODIFIED && cache.is_some() { + // 304 NOT MODIFIED + if response.status() == 304 && cache.is_some() { if let Some(entry) = cache { return Ok(entry.0); } @@ -101,13 +101,17 @@ where .map_or_else(|_| "".into(), |s| s.into()); if response.status().is_success() { - let bytes = hyper::body::to_bytes(response.into_body()) - .await - .map_err(|e| { + // Collect streaming body + let mut body_bytes = Vec::new(); + let mut stream = response.into_body(); + while let Some(chunk) = stream.next().await { + let chunk = chunk.map_err(|e| { error!("An error occurred while reading the polling response body: {e}"); FeatureRequesterError::Temporary })?; - let json = serde_json::from_slice::>(bytes.as_ref()); + body_bytes.extend_from_slice(&chunk); + } + let json = serde_json::from_slice::>(&body_bytes); return match json { Ok(all_data) => { @@ -243,12 +247,15 @@ mod tests { } } - fn build_feature_requester(url: String) -> HyperFeatureRequester { - let http = hyper::Client::builder().build(hyper::client::HttpConnector::new()); - let url = hyper::Uri::from_str(&url).expect("Failed parsing the mock server url"); + fn build_feature_requester( + url: String, + ) -> HttpFeatureRequester { + let url = http::Uri::from_str(&url).expect("Failed parsing the mock server url"); + let transport = launchdarkly_sdk_transport::HyperTransport::new() + .expect("Failed to create HyperTransport"); - HyperFeatureRequester::new( - http, + HttpFeatureRequester::new( + transport, url, "sdk-key".to_string(), HashMap::<&str, String>::new(), diff --git a/launchdarkly-server-sdk/src/feature_requester_builders.rs b/launchdarkly-server-sdk/src/feature_requester_builders.rs index 1d7f1ee..3c29f40 100644 --- a/launchdarkly-server-sdk/src/feature_requester_builders.rs +++ b/launchdarkly-server-sdk/src/feature_requester_builders.rs @@ -1,12 +1,10 @@ -use crate::feature_requester::{FeatureRequester, HyperFeatureRequester}; +use crate::feature_requester::{FeatureRequester, HttpFeatureRequester}; use crate::LAUNCHDARKLY_TAGS_HEADER; -use hyper::client::connect::Connection; -use hyper::service::Service; -use hyper::Uri; +use http::Uri; +use launchdarkly_sdk_transport::HttpTransport; use std::collections::HashMap; use std::str::FromStr; use thiserror::Error; -use tokio::io::{AsyncRead, AsyncWrite}; /// Error type used to represent failures when building a [FeatureRequesterFactory] instance. #[non_exhaustive] @@ -26,35 +24,23 @@ pub trait FeatureRequesterFactory: Send { fn build(&self, tags: Option) -> Result, BuildError>; } -pub struct HyperFeatureRequesterBuilder { +pub struct HttpFeatureRequesterBuilder { url: String, sdk_key: String, - http: hyper::Client, + transport: T, } -impl HyperFeatureRequesterBuilder -where - C: Service + Clone + Send + Sync + 'static, - C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin, - C::Future: Send + Unpin + 'static, - C::Error: Into>, -{ - pub fn new(url: &str, sdk_key: &str, connector: C) -> Self { +impl HttpFeatureRequesterBuilder { + pub fn new(url: &str, sdk_key: &str, transport: T) -> Self { Self { - http: hyper::Client::builder().build(connector), + transport, url: url.into(), sdk_key: sdk_key.into(), } } } -impl FeatureRequesterFactory for HyperFeatureRequesterBuilder -where - C: Service + Clone + Send + Sync + 'static, - C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin, - C::Future: Send + Unpin + 'static, - C::Error: Into>, -{ +impl FeatureRequesterFactory for HttpFeatureRequesterBuilder { fn build(&self, tags: Option) -> Result, BuildError> { let url = format!("{}/sdk/latest-all", self.url); @@ -64,11 +50,11 @@ where default_headers.insert(LAUNCHDARKLY_TAGS_HEADER, tags); } - let url = hyper::Uri::from_str(url.as_str()) + let url = Uri::from_str(url.as_str()) .map_err(|_| BuildError::InvalidConfig("Invalid base url provided".into()))?; - Ok(Box::new(HyperFeatureRequester::new( - self.http.clone(), + Ok(Box::new(HttpFeatureRequester::new( + self.transport.clone(), url, self.sdk_key.clone(), default_headers, @@ -78,16 +64,16 @@ where #[cfg(test)] mod tests { - use hyper::client::HttpConnector; - use super::*; #[test] fn factory_handles_url_parsing_failure() { - let builder = HyperFeatureRequesterBuilder::new( + let transport = + launchdarkly_sdk_transport::HyperTransport::new().expect("Failed to create transport"); + let builder = HttpFeatureRequesterBuilder::new( "This is clearly not a valid URL", "sdk-key", - HttpConnector::new(), + transport, ); let result = builder.build(None); diff --git a/launchdarkly-server-sdk/src/lib.rs b/launchdarkly-server-sdk/src/lib.rs index cf47a93..7dd7b1d 100644 --- a/launchdarkly-server-sdk/src/lib.rs +++ b/launchdarkly-server-sdk/src/lib.rs @@ -16,6 +16,7 @@ extern crate log; #[macro_use] extern crate serde_json; +use http::HeaderValue; pub use launchdarkly_server_sdk_evaluation::Error as EvalError; pub use launchdarkly_server_sdk_evaluation::{ AttributeValue, Context, ContextBuilder, Detail, FlagValue, Kind, MultiContextBuilder, Reason, @@ -31,7 +32,7 @@ pub use config::{ApplicationInfo, BuildError as ConfigBuildError, Config, Config pub use data_source_builders::{ BuildError as DataSourceBuildError, PollingDataSourceBuilder, StreamingDataSourceBuilder, }; -pub use evaluation::{FlagDetail, FlagDetailConfig}; +pub use evaluation::{FlagDetail, FlagDetailConfig, FlagFilter}; pub use events::event::MigrationOpEvent; pub use events::processor::EventProcessor; pub use events::processor_builders::{ @@ -76,9 +77,7 @@ static CURRENT_EVENT_SCHEMA: &str = "4"; static USER_AGENT: LazyLock = LazyLock::new(|| format!("RustServerClient/{}", version_string())); -// For cases where a statically empty header value are needed. -static EMPTY_HEADER: LazyLock = - LazyLock::new(|| hyper::header::HeaderValue::from_static("")); +static EMPTY_HEADER: LazyLock = LazyLock::new(|| HeaderValue::from_static("")); #[cfg(test)] mod tests { diff --git a/launchdarkly-server-sdk/src/reqwest.rs b/launchdarkly-server-sdk/src/reqwest.rs index 9fcc2b9..8c0d69e 100644 --- a/launchdarkly-server-sdk/src/reqwest.rs +++ b/launchdarkly-server-sdk/src/reqwest.rs @@ -1,25 +1,17 @@ -use hyper::StatusCode; - pub fn is_http_error_recoverable(status: u16) -> bool { - if let Ok(status) = StatusCode::from_u16(status) { - if !status.is_client_error() { - return true; - } - - return matches!( - status, - StatusCode::BAD_REQUEST | StatusCode::REQUEST_TIMEOUT | StatusCode::TOO_MANY_REQUESTS - ); + if !(400..500).contains(&status) { + return true; } - warn!("Unable to determine if status code is recoverable"); - false + matches!( + status, + 400 | 408 | 429 // BAD_REQUEST | REQUEST_TIMEOUT | TOO_MANY_REQUESTS + ) } #[cfg(test)] mod tests { use super::*; - use hyper::StatusCode; use test_case::test_case; #[test_case("130.65331632653061", 130.65331632653061)] @@ -30,17 +22,17 @@ mod tests { assert_eq!(expected, parsed); } - #[test_case(StatusCode::CONTINUE, true)] - #[test_case(StatusCode::OK, true)] - #[test_case(StatusCode::MULTIPLE_CHOICES, true)] - #[test_case(StatusCode::BAD_REQUEST, true)] - #[test_case(StatusCode::UNAUTHORIZED, false)] - #[test_case(StatusCode::REQUEST_TIMEOUT, true)] - #[test_case(StatusCode::CONFLICT, false)] - #[test_case(StatusCode::TOO_MANY_REQUESTS, true)] - #[test_case(StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE, false)] - #[test_case(StatusCode::INTERNAL_SERVER_ERROR, true)] - fn can_determine_recoverable_errors(status: StatusCode, is_recoverable: bool) { - assert_eq!(is_recoverable, is_http_error_recoverable(status.as_u16())); + #[test_case(100, true; "CONTINUE_STATUS")] + #[test_case(200, true; "OK")] + #[test_case(300, true; "MULTIPLE_CHOICES")] + #[test_case(400, true; "BAD_REQUEST")] + #[test_case(401, false; "UNAUTHORIZED")] + #[test_case(408, true; "REQUEST_TIMEOUT")] + #[test_case(409, false; "CONFLICT")] + #[test_case(429, true; "TOO_MANY_REQUESTS")] + #[test_case(431, false; "REQUEST_HEADER_FIELDS_TOO_LARGE")] + #[test_case(500, true; "INTERNAL_SERVER_ERROR")] + fn can_determine_recoverable_errors(status: u16, is_recoverable: bool) { + assert_eq!(is_recoverable, is_http_error_recoverable(status)); } } diff --git a/launchdarkly-server-sdk/src/stores/persistent_store_builders.rs b/launchdarkly-server-sdk/src/stores/persistent_store_builders.rs index 4158288..1afcfd3 100644 --- a/launchdarkly-server-sdk/src/stores/persistent_store_builders.rs +++ b/launchdarkly-server-sdk/src/stores/persistent_store_builders.rs @@ -102,7 +102,7 @@ mod tests { impl PersistentDataStoreFactory for InMemoryPersistentDataStoreFactory { fn create_persistent_data_store( &self, - ) -> Result, std::io::Error> { + ) -> Result, std::io::Error> { Ok(Box::new(InMemoryPersistentDataStore { data: AllData { flags: HashMap::new(), diff --git a/launchdarkly-server-sdk/src/test_common.rs b/launchdarkly-server-sdk/src/test_common.rs index c5f167c..b2bbe52 100644 --- a/launchdarkly-server-sdk/src/test_common.rs +++ b/launchdarkly-server-sdk/src/test_common.rs @@ -7,10 +7,14 @@ use crate::Stage; pub const FLOAT_TO_INT_MAX: i64 = 9007199254740991; pub fn basic_flag(key: &str) -> Flag { - basic_flag_with_visibility(key, false) + basic_flag_with_visibility(key, false, false) } -pub fn basic_flag_with_visibility(key: &str, visible_to_environment_id: bool) -> Flag { +pub fn basic_flag_with_visibility( + key: &str, + visible_to_environment_id: bool, + visible_to_mobile_key: bool, +) -> Flag { serde_json::from_str(&format!( r#"{{ "key": {}, @@ -23,12 +27,13 @@ pub fn basic_flag_with_visibility(key: &str, visible_to_environment_id: bool) -> "offVariation": 0, "variations": [false, true], "clientSideAvailability": {{ - "usingMobileKey": false, + "usingMobileKey": {}, "usingEnvironmentId": {} }}, "salt": "kosher" }}"#, serde_json::Value::String(key.to_string()), + visible_to_mobile_key, visible_to_environment_id )) .unwrap() @@ -57,13 +62,14 @@ pub fn basic_off_flag(key: &str) -> Flag { } pub fn basic_flag_with_prereq(key: &str, prereq_key: &str) -> Flag { - basic_flag_with_prereqs_and_visibility(key, &[prereq_key], false) + basic_flag_with_prereqs_and_visibility(key, &[prereq_key], false, false) } pub fn basic_flag_with_prereqs_and_visibility( key: &str, prereq_keys: &[&str], visible_to_environment_id: bool, + visible_to_mobile_key: bool, ) -> Flag { let prereqs_json: String = prereq_keys .iter() @@ -88,13 +94,14 @@ pub fn basic_flag_with_prereqs_and_visibility( "offVariation": 0, "variations": [false, true], "clientSideAvailability": {{ - "usingMobileKey": false, + "usingMobileKey": {}, "usingEnvironmentId": {} }}, "salt": "kosher" }}"#, serde_json::Value::String(key.to_string()), prereqs_json, + visible_to_mobile_key, visible_to_environment_id )) .unwrap()