From 55617580eeaf02d04d42083a29896f1935509faf Mon Sep 17 00:00:00 2001 From: Christian Date: Mon, 9 Feb 2026 13:29:59 -0600 Subject: [PATCH] fix: forward browser user-agent in OpenRTB `device.ua` field --- crates/common/src/auction/formats.rs | 10 +-- crates/common/src/integrations/prebid.rs | 21 +++-- crates/common/src/openrtb.rs | 2 + crates/common/src/proxy.rs | 56 +++++------- crates/common/src/streaming_processor.rs | 103 ++++++----------------- 5 files changed, 62 insertions(+), 130 deletions(-) diff --git a/crates/common/src/auction/formats.rs b/crates/common/src/auction/formats.rs index 6b446f0..71804e2 100644 --- a/crates/common/src/auction/formats.rs +++ b/crates/common/src/auction/formats.rs @@ -108,7 +108,7 @@ pub fn convert_tsjs_to_auction_request( } // Extract bidder params from the bids array - let mut bidders = std::collections::HashMap::new(); + let mut bidders = HashMap::new(); if let Some(bids) = &unit.bids { for bid in bids { bidders.insert(bid.bidder.clone(), bid.params.clone()); @@ -119,20 +119,20 @@ pub fn convert_tsjs_to_auction_request( id: unit.code.clone(), formats, floor_price: None, - targeting: std::collections::HashMap::new(), + targeting: HashMap::new(), bidders, }); } } } - // Get geo info if available - let device = GeoInfo::from_request(req).map(|geo| DeviceInfo { + // Build device info with user-agent (always) and geo (if available) + let device = Some(DeviceInfo { user_agent: req .get_header_str("user-agent") .map(std::string::ToString::to_string), ip: req.get_client_ip_addr().map(|ip| ip.to_string()), - geo: Some(geo), + geo: GeoInfo::from_request(req), }); Ok(AuctionRequest { diff --git a/crates/common/src/integrations/prebid.rs b/crates/common/src/integrations/prebid.rs index 2a5ddba..f28d467 100644 --- a/crates/common/src/integrations/prebid.rs +++ b/crates/common/src/integrations/prebid.rs @@ -302,7 +302,7 @@ fn transform_prebid_response( fn rewrite_ad_markup(markup: &str, request_host: &str, request_scheme: &str) -> String { let mut content = markup.to_string(); - let cdn_patterns = vec![ + let cdn_patterns = [ ("https://cdn.adsrvr.org", "adsrvr"), ("https://ib.adnxs.com", "adnxs"), ("https://rtb.openx.net", "openx"), @@ -451,16 +451,15 @@ impl PrebidAuctionProvider { }), }); - // Build device object with geo if available - let device = request.device.as_ref().and_then(|d| { - d.geo.as_ref().map(|geo| Device { - geo: Some(Geo { - geo_type: 2, // IP address per OpenRTB spec - country: Some(geo.country.clone()), - city: Some(geo.city.clone()), - region: geo.region.clone(), - }), - }) + // Build device object with user-agent and geo if available + let device = request.device.as_ref().map(|d| Device { + ua: d.user_agent.clone(), + geo: d.geo.as_ref().map(|geo| Geo { + geo_type: 2, // IP address per OpenRTB spec + country: Some(geo.country.clone()), + city: Some(geo.city.clone()), + region: geo.region.clone(), + }), }); // Build regs object if Sec-GPC header is present diff --git a/crates/common/src/openrtb.rs b/crates/common/src/openrtb.rs index b87afc2..3b40520 100644 --- a/crates/common/src/openrtb.rs +++ b/crates/common/src/openrtb.rs @@ -66,6 +66,8 @@ pub struct UserExt { #[derive(Debug, Serialize, Default)] pub struct Device { + #[serde(skip_serializing_if = "Option::is_none")] + pub ua: Option, #[serde(skip_serializing_if = "Option::is_none")] pub geo: Option, } diff --git a/crates/common/src/proxy.rs b/crates/common/src/proxy.rs index baa9169..fdebd68 100644 --- a/crates/common/src/proxy.rs +++ b/crates/common/src/proxy.rs @@ -361,6 +361,22 @@ fn finalize_proxied_response_streaming( beresp } +/// Finalize a proxied response, choosing between streaming passthrough and full +/// content processing based on the `stream_passthrough` flag. +fn finalize_response( + settings: &Settings, + req: &Request, + url: &str, + beresp: Response, + stream_passthrough: bool, +) -> Result> { + if stream_passthrough { + Ok(finalize_proxied_response_streaming(req, url, beresp)) + } else { + finalize_proxied_response(settings, req, url, beresp) + } +} + /// Proxy a request to a clear target URL while reusing creative rewrite logic. /// /// This forwards a curated header set, follows redirects when enabled, and can append @@ -498,15 +514,7 @@ async fn proxy_with_redirects( })?; if !follow_redirects { - return if stream_passthrough { - Ok(finalize_proxied_response_streaming( - req, - ¤t_url, - beresp, - )) - } else { - finalize_proxied_response(settings, req, ¤t_url, beresp) - }; + return finalize_response(settings, req, ¤t_url, beresp, stream_passthrough); } let status = beresp.get_status(); @@ -520,15 +528,7 @@ async fn proxy_with_redirects( ); if !is_redirect { - return if stream_passthrough { - Ok(finalize_proxied_response_streaming( - req, - ¤t_url, - beresp, - )) - } else { - finalize_proxied_response(settings, req, ¤t_url, beresp) - }; + return finalize_response(settings, req, ¤t_url, beresp, stream_passthrough); } let Some(location) = beresp @@ -536,15 +536,7 @@ async fn proxy_with_redirects( .and_then(|h| h.to_str().ok()) .filter(|value| !value.is_empty()) else { - return if stream_passthrough { - Ok(finalize_proxied_response_streaming( - req, - ¤t_url, - beresp, - )) - } else { - finalize_proxied_response(settings, req, ¤t_url, beresp) - }; + return finalize_response(settings, req, ¤t_url, beresp, stream_passthrough); }; if redirect_attempt == MAX_REDIRECTS { @@ -565,15 +557,7 @@ async fn proxy_with_redirects( let next_scheme = next_url.scheme().to_ascii_lowercase(); if next_scheme != "http" && next_scheme != "https" { - return if stream_passthrough { - Ok(finalize_proxied_response_streaming( - req, - ¤t_url, - beresp, - )) - } else { - finalize_proxied_response(settings, req, ¤t_url, beresp) - }; + return finalize_response(settings, req, ¤t_url, beresp, stream_passthrough); } log::info!( diff --git a/crates/common/src/streaming_processor.rs b/crates/common/src/streaming_processor.rs index 8f9a809..cda62e6 100644 --- a/crates/common/src/streaming_processor.rs +++ b/crates/common/src/streaming_processor.rs @@ -224,26 +224,25 @@ impl StreamingPipeline

{ Ok(()) } - /// Process gzip compressed input to uncompressed output (decompression only) - fn process_gzip_to_none( + /// Decompress input, process content, and write uncompressed output. + fn decompress_and_process( &mut self, - input: R, + mut decoder: R, mut output: W, + codec_name: &str, ) -> Result<(), Report> { - use flate2::read::GzDecoder; - - // Decompress input - let mut decoder = GzDecoder::new(input); let mut decompressed = Vec::new(); decoder .read_to_end(&mut decompressed) .change_context(TrustedServerError::Proxy { - message: "Failed to decompress gzip".to_string(), + message: format!("Failed to decompress {codec_name}"), })?; - log::info!("Decompressed size: {} bytes", decompressed.len()); + log::info!( + "{codec_name} decompressed size: {} bytes", + decompressed.len() + ); - // Process the decompressed content let processed = self .processor .process_chunk(&decompressed, true) @@ -251,9 +250,8 @@ impl StreamingPipeline

{ message: "Failed to process content".to_string(), })?; - log::info!("Processed size: {} bytes", processed.len()); + log::info!("{codec_name} processed size: {} bytes", processed.len()); - // Write uncompressed output output .write_all(&processed) .change_context(TrustedServerError::Proxy { @@ -263,6 +261,17 @@ impl StreamingPipeline

{ Ok(()) } + /// Process gzip compressed input to uncompressed output (decompression only) + fn process_gzip_to_none( + &mut self, + input: R, + output: W, + ) -> Result<(), Report> { + use flate2::read::GzDecoder; + + self.decompress_and_process(GzDecoder::new(input), output, "gzip") + } + /// Process deflate compressed stream fn process_deflate_to_deflate( &mut self, @@ -283,42 +292,11 @@ impl StreamingPipeline

{ fn process_deflate_to_none( &mut self, input: R, - mut output: W, + output: W, ) -> Result<(), Report> { use flate2::read::ZlibDecoder; - // Decompress input - let mut decoder = ZlibDecoder::new(input); - let mut decompressed = Vec::new(); - decoder - .read_to_end(&mut decompressed) - .change_context(TrustedServerError::Proxy { - message: "Failed to decompress deflate".to_string(), - })?; - - log::info!( - "Deflate->None decompressed size: {} bytes", - decompressed.len() - ); - - // Process the decompressed content - let processed = self - .processor - .process_chunk(&decompressed, true) - .change_context(TrustedServerError::Proxy { - message: "Failed to process content".to_string(), - })?; - - log::info!("Deflate->None processed size: {} bytes", processed.len()); - - // Write uncompressed output - output - .write_all(&processed) - .change_context(TrustedServerError::Proxy { - message: "Failed to write output".to_string(), - })?; - - Ok(()) + self.decompress_and_process(ZlibDecoder::new(input), output, "deflate") } /// Process brotli compressed stream @@ -346,42 +324,11 @@ impl StreamingPipeline

{ fn process_brotli_to_none( &mut self, input: R, - mut output: W, + output: W, ) -> Result<(), Report> { use brotli::Decompressor; - // Decompress input - let mut decoder = Decompressor::new(input, 4096); - let mut decompressed = Vec::new(); - decoder - .read_to_end(&mut decompressed) - .change_context(TrustedServerError::Proxy { - message: "Failed to decompress brotli".to_string(), - })?; - - log::info!( - "Brotli->None decompressed size: {} bytes", - decompressed.len() - ); - - // Process the decompressed content - let processed = self - .processor - .process_chunk(&decompressed, true) - .change_context(TrustedServerError::Proxy { - message: "Failed to process content".to_string(), - })?; - - log::info!("Brotli->None processed size: {} bytes", processed.len()); - - // Write uncompressed output - output - .write_all(&processed) - .change_context(TrustedServerError::Proxy { - message: "Failed to write output".to_string(), - })?; - - Ok(()) + self.decompress_and_process(Decompressor::new(input, 4096), output, "brotli") } /// Generic processing through compression layers