10 changes: 5 additions & 5 deletions crates/common/src/auction/formats.rs
@@ -108,7 +108,7 @@ pub fn convert_tsjs_to_auction_request(
}

// Extract bidder params from the bids array
- let mut bidders = std::collections::HashMap::new();
+ let mut bidders = HashMap::new();
if let Some(bids) = &unit.bids {
for bid in bids {
bidders.insert(bid.bidder.clone(), bid.params.clone());
@@ -119,20 +119,20 @@
id: unit.code.clone(),
formats,
floor_price: None,
- targeting: std::collections::HashMap::new(),
+ targeting: HashMap::new(),
bidders,
});
}
}
}

- // Get geo info if available
- let device = GeoInfo::from_request(req).map(|geo| DeviceInfo {
+ // Build device info with user-agent (always) and geo (if available)
+ let device = Some(DeviceInfo {
user_agent: req
.get_header_str("user-agent")
.map(std::string::ToString::to_string),
ip: req.get_client_ip_addr().map(|ip| ip.to_string()),
- geo: Some(geo),
+ geo: GeoInfo::from_request(req),
});

Ok(AuctionRequest {
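
Note on this change: under the old `map`-based build, a request with no geo result produced no `DeviceInfo` at all, silently dropping the user-agent and IP that were already in hand. A minimal sketch of the before/after shape, with stand-in types and a hypothetical `lookup_geo` (not the crate's real API):

```rust
struct Geo;
struct DeviceInfo {
    user_agent: Option<String>,
    geo: Option<Geo>,
}

// Stand-in for GeoInfo::from_request: the geo lookup can come up empty.
fn lookup_geo() -> Option<Geo> {
    None
}

// Before: mapping over the geo lookup means no geo, no device at all.
fn before(ua: Option<String>) -> Option<DeviceInfo> {
    lookup_geo().map(|geo| DeviceInfo { user_agent: ua, geo: Some(geo) })
}

// After: the device is always built; geo stays optional inside it.
fn after(ua: Option<String>) -> Option<DeviceInfo> {
    Some(DeviceInfo { user_agent: ua, geo: lookup_geo() })
}

fn main() {
    let ua = Some("Mozilla/5.0".to_string());
    assert!(before(ua.clone()).is_none()); // device (and its UA) lost entirely
    assert!(after(ua).is_some()); // device kept, geo simply absent
}
```
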
21 changes: 10 additions & 11 deletions crates/common/src/integrations/prebid.rs
@@ -302,7 +302,7 @@ fn transform_prebid_response(

fn rewrite_ad_markup(markup: &str, request_host: &str, request_scheme: &str) -> String {
let mut content = markup.to_string();
- let cdn_patterns = vec![
+ let cdn_patterns = [
("https://cdn.adsrvr.org", "adsrvr"),
("https://ib.adnxs.com", "adnxs"),
("https://rtb.openx.net", "openx"),
@@ -451,16 +451,15 @@ impl PrebidAuctionProvider {
}),
});

- // Build device object with geo if available
- let device = request.device.as_ref().and_then(|d| {
- d.geo.as_ref().map(|geo| Device {
- geo: Some(Geo {
- geo_type: 2, // IP address per OpenRTB spec
- country: Some(geo.country.clone()),
- city: Some(geo.city.clone()),
- region: geo.region.clone(),
- }),
- })
+ // Build device object with user-agent and geo if available
+ let device = request.device.as_ref().map(|d| Device {
+ ua: d.user_agent.clone(),
+ geo: d.geo.as_ref().map(|geo| Geo {
+ geo_type: 2, // IP address per OpenRTB spec
+ country: Some(geo.country.clone()),
+ city: Some(geo.city.clone()),
+ region: geo.region.clone(),
+ }),
});

// Build regs object if Sec-GPC header is present
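
Two notes on this file. The device rewrite mirrors the formats.rs fix above: `map` instead of `and_then` means a device with a user-agent but no geo is no longer discarded, and `ua` and `geo` are now mapped independently. Separately, `vec![...]` to `[...]` swaps a heap allocation for a stack array, which iterates identically. A tiny sketch of the latter (pattern list truncated to two entries from the diff):

```rust
fn main() {
    // A fixed lookup table needs no `vec!`; a plain array borrows and
    // iterates exactly the same way, without allocating.
    let cdn_patterns = [
        ("https://cdn.adsrvr.org", "adsrvr"),
        ("https://ib.adnxs.com", "adnxs"),
    ];
    for (origin, label) in &cdn_patterns {
        println!("{label}: {origin}");
    }
}
```
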
2 changes: 2 additions & 0 deletions crates/common/src/openrtb.rs
@@ -66,6 +66,8 @@ pub struct UserExt {

#[derive(Debug, Serialize, Default)]
pub struct Device {
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub ua: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub geo: Option<Geo>,
}
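
For context on the serde attribute: `skip_serializing_if = "Option::is_none"` means a device carrying only a user-agent serializes cleanly instead of emitting `"geo":null`. A sketch assuming the usual serde/serde_json derive setup, with the struct trimmed to the two fields in this diff and a plain `String` standing in for `Geo`:

```rust
use serde::Serialize;

#[derive(Serialize)]
struct Device {
    #[serde(skip_serializing_if = "Option::is_none")]
    ua: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    geo: Option<String>, // stand-in for the real Geo struct
}

fn main() {
    let d = Device {
        ua: Some("Mozilla/5.0".into()),
        geo: None,
    };
    // Prints {"ua":"Mozilla/5.0"}; the absent geo is omitted entirely.
    println!("{}", serde_json::to_string(&d).unwrap());
}
```
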
56 changes: 20 additions & 36 deletions crates/common/src/proxy.rs
@@ -361,6 +361,22 @@ fn finalize_proxied_response_streaming(
beresp
}

+ /// Finalize a proxied response, choosing between streaming passthrough and full
+ /// content processing based on the `stream_passthrough` flag.
+ fn finalize_response(
+ settings: &Settings,
+ req: &Request,
+ url: &str,
+ beresp: Response,
+ stream_passthrough: bool,
+ ) -> Result<Response, Report<TrustedServerError>> {
+ if stream_passthrough {
+ Ok(finalize_proxied_response_streaming(req, url, beresp))
+ } else {
+ finalize_proxied_response(settings, req, url, beresp)
+ }
+ }

/// Proxy a request to a clear target URL while reusing creative rewrite logic.
///
/// This forwards a curated header set, follows redirects when enabled, and can append
@@ -498,15 +514,7 @@ async fn proxy_with_redirects(
})?;

if !follow_redirects {
- return if stream_passthrough {
- Ok(finalize_proxied_response_streaming(
- req,
- &current_url,
- beresp,
- ))
- } else {
- finalize_proxied_response(settings, req, &current_url, beresp)
- };
+ return finalize_response(settings, req, &current_url, beresp, stream_passthrough);
}

let status = beresp.get_status();
@@ -520,31 +528,15 @@
);

if !is_redirect {
- return if stream_passthrough {
- Ok(finalize_proxied_response_streaming(
- req,
- &current_url,
- beresp,
- ))
- } else {
- finalize_proxied_response(settings, req, &current_url, beresp)
- };
+ return finalize_response(settings, req, &current_url, beresp, stream_passthrough);
}

let Some(location) = beresp
.get_header(header::LOCATION)
.and_then(|h| h.to_str().ok())
.filter(|value| !value.is_empty())
else {
- return if stream_passthrough {
- Ok(finalize_proxied_response_streaming(
- req,
- &current_url,
- beresp,
- ))
- } else {
- finalize_proxied_response(settings, req, &current_url, beresp)
- };
+ return finalize_response(settings, req, &current_url, beresp, stream_passthrough);
};

if redirect_attempt == MAX_REDIRECTS {
@@ -565,15 +557,7 @@

let next_scheme = next_url.scheme().to_ascii_lowercase();
if next_scheme != "http" && next_scheme != "https" {
- return if stream_passthrough {
- Ok(finalize_proxied_response_streaming(
- req,
- &current_url,
- beresp,
- ))
- } else {
- finalize_proxied_response(settings, req, &current_url, beresp)
- };
+ return finalize_response(settings, req, &current_url, beresp, stream_passthrough);
}

log::info!(
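
Design note: the two finalizers have different shapes (the streaming passthrough cannot fail, while full content processing returns a `Result`), so the new helper mostly normalizes both arms to one return type, letting each early return in the redirect loop collapse to a single line. A stripped-down sketch of the pattern with stand-in types; the real signatures also carry `Settings`, the `Request`, and the URL:

```rust
struct Response;
struct Error;

// Passthrough path: infallible, hands the body straight through.
fn finalize_streaming(resp: Response) -> Response {
    resp
}

// Full-processing path: body rewriting can fail.
fn finalize_full(resp: Response) -> Result<Response, Error> {
    Ok(resp)
}

// The helper lifts the infallible arm into Result, so call sites
// no longer need their own if/else around two return types.
fn finalize(resp: Response, stream_passthrough: bool) -> Result<Response, Error> {
    if stream_passthrough {
        Ok(finalize_streaming(resp))
    } else {
        finalize_full(resp)
    }
}
```
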
103 changes: 25 additions & 78 deletions crates/common/src/streaming_processor.rs
@@ -224,36 +224,34 @@ impl<P: StreamProcessor> StreamingPipeline<P> {
Ok(())
}

- /// Process gzip compressed input to uncompressed output (decompression only)
- fn process_gzip_to_none<R: Read, W: Write>(
+ /// Decompress input, process content, and write uncompressed output.
+ fn decompress_and_process<R: Read, W: Write>(
&mut self,
- input: R,
+ mut decoder: R,
mut output: W,
+ codec_name: &str,
) -> Result<(), Report<TrustedServerError>> {
- use flate2::read::GzDecoder;
-
- // Decompress input
- let mut decoder = GzDecoder::new(input);
let mut decompressed = Vec::new();
decoder
.read_to_end(&mut decompressed)
.change_context(TrustedServerError::Proxy {
- message: "Failed to decompress gzip".to_string(),
+ message: format!("Failed to decompress {codec_name}"),
})?;

- log::info!("Decompressed size: {} bytes", decompressed.len());
+ log::info!(
+ "{codec_name} decompressed size: {} bytes",
+ decompressed.len()
+ );

// Process the decompressed content
let processed = self
.processor
.process_chunk(&decompressed, true)
.change_context(TrustedServerError::Proxy {
message: "Failed to process content".to_string(),
})?;

- log::info!("Processed size: {} bytes", processed.len());
+ log::info!("{codec_name} processed size: {} bytes", processed.len());

// Write uncompressed output
output
.write_all(&processed)
.change_context(TrustedServerError::Proxy {
@@ -263,6 +261,17 @@ impl<P: StreamProcessor> StreamingPipeline<P> {
Ok(())
}

+ /// Process gzip compressed input to uncompressed output (decompression only)
+ fn process_gzip_to_none<R: Read, W: Write>(
+ &mut self,
+ input: R,
+ output: W,
+ ) -> Result<(), Report<TrustedServerError>> {
+ use flate2::read::GzDecoder;
+
+ self.decompress_and_process(GzDecoder::new(input), output, "gzip")
+ }

/// Process deflate compressed stream
fn process_deflate_to_deflate<R: Read, W: Write>(
&mut self,
@@ -283,42 +292,11 @@
fn process_deflate_to_none<R: Read, W: Write>(
&mut self,
input: R,
- mut output: W,
+ output: W,
) -> Result<(), Report<TrustedServerError>> {
use flate2::read::ZlibDecoder;

- // Decompress input
- let mut decoder = ZlibDecoder::new(input);
- let mut decompressed = Vec::new();
- decoder
- .read_to_end(&mut decompressed)
- .change_context(TrustedServerError::Proxy {
- message: "Failed to decompress deflate".to_string(),
- })?;
-
- log::info!(
- "Deflate->None decompressed size: {} bytes",
- decompressed.len()
- );
-
- // Process the decompressed content
- let processed = self
- .processor
- .process_chunk(&decompressed, true)
- .change_context(TrustedServerError::Proxy {
- message: "Failed to process content".to_string(),
- })?;
-
- log::info!("Deflate->None processed size: {} bytes", processed.len());
-
- // Write uncompressed output
- output
- .write_all(&processed)
- .change_context(TrustedServerError::Proxy {
- message: "Failed to write output".to_string(),
- })?;
-
- Ok(())
+ self.decompress_and_process(ZlibDecoder::new(input), output, "deflate")
}

/// Process brotli compressed stream
@@ -346,42 +324,11 @@
fn process_brotli_to_none<R: Read, W: Write>(
&mut self,
input: R,
- mut output: W,
+ output: W,
) -> Result<(), Report<TrustedServerError>> {
use brotli::Decompressor;

- // Decompress input
- let mut decoder = Decompressor::new(input, 4096);
- let mut decompressed = Vec::new();
- decoder
- .read_to_end(&mut decompressed)
- .change_context(TrustedServerError::Proxy {
- message: "Failed to decompress brotli".to_string(),
- })?;
-
- log::info!(
- "Brotli->None decompressed size: {} bytes",
- decompressed.len()
- );
-
- // Process the decompressed content
- let processed = self
- .processor
- .process_chunk(&decompressed, true)
- .change_context(TrustedServerError::Proxy {
- message: "Failed to process content".to_string(),
- })?;
-
- log::info!("Brotli->None processed size: {} bytes", processed.len());
-
- // Write uncompressed output
- output
- .write_all(&processed)
- .change_context(TrustedServerError::Proxy {
- message: "Failed to write output".to_string(),
- })?;
-
- Ok(())
+ self.decompress_and_process(Decompressor::new(input, 4096), output, "brotli")
}

/// Generic processing through compression layers
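
The refactor works because all three decoders (`GzDecoder`, `ZlibDecoder`, brotli's `Decompressor`) are plain `io::Read` adapters, so the shared helper never needs to know which codec it is draining. A self-contained sketch of that shape using std only; the real helper additionally runs the content through `process_chunk` and wraps errors in `TrustedServerError::Proxy`, both elided here:

```rust
use std::io::{Read, Result, Write};

// Generic over any decoder that implements Read; the codec is invisible here.
fn decompress_and_process<R: Read, W: Write>(
    mut decoder: R,
    mut output: W,
    codec_name: &str,
) -> Result<()> {
    let mut decompressed = Vec::new();
    decoder.read_to_end(&mut decompressed)?;
    // ...content processing would happen here...
    eprintln!("{codec_name} decompressed size: {} bytes", decompressed.len());
    output.write_all(&decompressed)
}

fn main() -> Result<()> {
    // Real call sites wrap the raw input first, e.g.
    //   decompress_and_process(GzDecoder::new(input), output, "gzip")
    //   decompress_and_process(Decompressor::new(input, 4096), output, "brotli")
    decompress_and_process(&b"already plain"[..], std::io::sink(), "identity")
}
```
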