diff --git a/TESTING.md b/TESTING.md new file mode 100644 index 0000000..27f3f0b --- /dev/null +++ b/TESTING.md @@ -0,0 +1,201 @@ +# Testing the Auction Orchestration System + +## Quick Test Summary + +The auction orchestration system has been integrated into the existing Prebid endpoints. You can test it right away using the Fastly local server! + +## How to Test + +### 1. Start the Local Server + +```bash +fastly compute serve +``` + +### 2. Test with Existing Endpoint + +The `/third-party/ad` endpoint now uses the orchestrator when `auction.enabled = true` in config. + +**Test Request:** +```bash +curl -X POST http://localhost:7676/third-party/ad \ + -H "Content-Type: application/json" \ + -d '{ + "adUnits": [ + { + "code": "header-banner", + "mediaTypes": { + "banner": { + "sizes": [[728, 90], [970, 250]] + } + } + }, + { + "code": "sidebar", + "mediaTypes": { + "banner": { + "sizes": [[300, 250], [300, 600]] + } + } + } + ] + }' +``` + +### 3. What You'll See + +**With Orchestrator Enabled** (`auction.enabled = true`): +- Logs showing: `"Using auction orchestrator"` +- Parallel execution of APS (mocked) and Prebid (real) +- GAM mediation (mocked) selecting winning bids +- Final response with winning creatives + +**With Orchestrator Disabled** (`auction.enabled = false`): +- Logs showing: `"Using legacy Prebid flow"` +- Direct Prebid Server call (backward compatible) + +##Configuration + +Edit `trusted-server.toml` to customize the auction: + +```toml +# Enable/disable orchestrator +[auction] +enabled = true +strategy = "parallel_mediation" # or "parallel_only" or "waterfall" +bidders = ["prebid", "aps"] +mediator = "gam" +timeout_ms = 2000 + +# Mock provider configs +[integrations.aps] +enabled = true +mock = true +mock_price = 2.50 + +[integrations.gam] +enabled = true +mock = true +inject_house_bids = true +gam_win_rate = 30 # GAM wins 30% of the time +``` + +## Test Scenarios + +### Scenario 1: Parallel + Mediation (Default) +**Config:** +```toml +[auction] +enabled = true +strategy = "parallel_mediation" +bidders = ["prebid", "aps"] +mediator = "gam" +``` + +**Expected Flow:** +1. Prebid queries real SSPs +2. APS returns mock bids ($2.50 CPM) +3. GAM mediates between all bids +4. Winning creative returned + +### Scenario 2: Parallel Only (No Mediation) +**Config:** +```toml +[auction] +enabled = true +strategy = "parallel_only" +bidders = ["prebid", "aps"] +# No mediator +``` + +**Expected Flow:** +1. Prebid and APS run in parallel +2. Highest bid wins automatically +3. No GAM mediation + +### Scenario 3: Waterfall +**Config:** +```toml +[auction] +enabled = true +strategy = "waterfall" +bidders = ["prebid", "aps"] +``` + +**Expected Flow:** +1. Try Prebid first +2. If Prebid returns no bids, try APS +3. Return first successful bid + +### Scenario 4: Legacy Mode (Backward Compatible) +**Config:** +```toml +[auction] +enabled = false +``` + +**Expected Flow:** +- Original Prebid-only behavior +- No orchestration overhead + +## Debugging + +### Check Logs +The orchestrator logs extensively: +``` +INFO: Using auction orchestrator +INFO: Running auction with strategy: parallel_mediation +INFO: Running 2 bidders in parallel +INFO: Requesting bids from: prebid +INFO: Prebid returned 2 bids (time: 120ms) +INFO: Requesting bids from: aps +INFO: APS (MOCK): returning 2 bids in 80ms +INFO: GAM mediation: slot 'header-banner' won by 'amazon-aps' at $2.50 CPM +``` + +### Verify Provider Registration +Look for these log messages on startup: +``` +INFO: Registering auction provider: prebid +INFO: Registering auction provider: aps +INFO: Registering auction provider: gam +``` + +### Common Issues + +**Issue:** `"Provider 'aps' not registered"` +**Fix:** Make sure `[integrations.aps]` is configured in `trusted-server.toml` + +**Issue:** `"No bidders configured"` +**Fix:** Make sure `bidders = ["prebid", "aps"]` is set in `[auction]` + +**Issue:** Tests fail with WASM errors +**Explanation:** Async tests don't work in WASM test environment. Integration tests via HTTP work fine! + +## Next Steps + +1. **Test with real Prebid Server** - Verify Prebid bids work correctly +2. **Implement real APS** - Replace mock with actual Amazon TAM API calls +3. **Implement real GAM** - Add Google Ad Manager API integration +4. **Add metrics** - Track bid rates, win rates, latency per provider + +## Mock Provider Behavior + +### APS (Amazon) +- Returns bids for all slots +- Default mock price: $2.50 CPM +- Always returns 2 bids +- Response time: ~80ms (simulated) + +### GAM (Google Ad Manager) +- Acts as mediator +- Can inject house ads at $1.75 CPM +- Wins 30% of auctions (configurable) +- Response time: ~40ms (simulated) +- Uses hash-based "randomness" for consistent testing + +### Prebid +- **Real implementation** - makes actual HTTP calls +- Queries configured SSPs +- Returns real bids from real bidders +- Response time: varies (network dependent) diff --git a/crates/common/build.rs b/crates/common/build.rs index d7b020b..a97734b 100644 --- a/crates/common/build.rs +++ b/crates/common/build.rs @@ -1,6 +1,9 @@ #[path = "src/error.rs"] mod error; +#[path = "src/auction_config_types.rs"] +mod auction_config_types; + #[path = "src/settings.rs"] mod settings; diff --git a/crates/common/src/auction/README.md b/crates/common/src/auction/README.md new file mode 100644 index 0000000..d5bfc0b --- /dev/null +++ b/crates/common/src/auction/README.md @@ -0,0 +1,233 @@ +# Auction Orchestration System + +A flexible, extensible framework for managing multi-provider header bidding auctions with support for parallel execution and mediation. + +## Overview + +The auction orchestration system allows you to: +- Run multiple auction providers (Prebid, Amazon APS, Google GAM, etc.) in parallel or sequentially +- Implement mediation strategies where a primary ad server (like GAM) makes the final decision +- Configure different auction flows for different scenarios +- Easily add new auction providers + +## Architecture + +``` +┌─────────────────────────────────────────────────────────┐ +│ Auction Orchestrator │ +│ - Manages auction workflow & sequencing │ +│ - Combines bids from multiple sources │ +│ - Applies business logic │ +└─────────────────────────────────────────────────────────┘ + │ + │ uses + ▼ +┌─────────────────────────────────────────────────────────┐ +│ AuctionProvider Trait │ +│ - request_bids() │ +│ - provider_name() │ +│ - timeout_ms() │ +│ - is_enabled() │ +└─────────────────────────────────────────────────────────┘ + │ + ┌─────────────────┼─────────────────┐ + │ │ │ + ▼ ▼ ▼ + ┌──────────┐ ┌──────────┐ ┌──────────┐ + │ Prebid │ │ Amazon │ │ Google │ + │ Provider │ │ APS │ │ GAM │ + └──────────┘ └──────────┘ └──────────┘ +``` + +## Key Concepts + +### Auction Provider +Implements the `AuctionProvider` trait to integrate with a specific SSP/ad exchange. + +### Auction Flow +A named configuration that defines: +- Which providers participate +- Execution strategy (parallel, waterfall, etc.) +- Timeout settings +- Optional mediator + +### Orchestrator +Manages the execution of an auction flow, coordinates providers, and collects results. + +## Auction Strategies + +### 1. Parallel + Mediation (Recommended) +**Use case:** Header bidding with GAM as primary ad server + +```toml +[auction] +enabled = true +strategy = "parallel_mediation" +bidders = ["prebid", "aps"] +mediator = "gam" +timeout_ms = 2000 +``` + +**Flow:** +1. Prebid and APS run in parallel +2. Both return their bids simultaneously +3. Bids are sent to GAM for final mediation +4. GAM competes its own inventory and returns winning creative + +### 2. Parallel Only +**Use case:** Client-side auction, no mediation + +```toml +[auction] +enabled = true +strategy = "parallel_only" +bidders = ["prebid", "aps"] +timeout_ms = 2000 +``` + +**Flow:** +1. All bidders run in parallel +2. Highest bid wins +3. No mediation server involved + +### 3. Waterfall +**Use case:** Sequential fallback when parallel isn't needed + +```toml +[auction] +enabled = true +strategy = "waterfall" +bidders = ["prebid", "aps"] +timeout_ms = 2000 +``` + +**Flow:** +1. Try Prebid first +2. If Prebid returns no bids, try APS +3. Return first successful bid + +## Configuration + +### Configuration + +All auction settings are configured directly under `[auction]`: + +```toml +[auction] +enabled = true # Enable/disable auction orchestration +strategy = "parallel_mediation" # Auction strategy +bidders = ["prebid", "aps"] # List of bidder providers +mediator = "gam" # Optional mediator (only for parallel_mediation) +timeout_ms = 2000 # Overall auction timeout +``` + +### Provider Configuration + +Each provider has its own configuration section: + +```toml +[integrations.prebid] +enabled = true +server_url = "https://prebid-server.example.com" +timeout_ms = 1000 + +[integrations.aps] +enabled = true +mock = true # Set to false for real integration +timeout_ms = 800 + +[integrations.gam] +enabled = true +mock = true +timeout_ms = 500 +``` + +## Adding a New Provider + +1. Create a new file in `src/auction/providers/your_provider.rs` + +```rust +use async_trait::async_trait; +use crate::auction::provider::AuctionProvider; +use crate::auction::types::{AuctionContext, AuctionRequest, AuctionResponse}; + +pub struct YourAuctionProvider { + config: YourConfig, +} + +#[async_trait(?Send)] +impl AuctionProvider for YourAuctionProvider { + fn provider_name(&self) -> &'static str { + "your_provider" + } + + async fn request_bids( + &self, + request: &AuctionRequest, + _context: &AuctionContext<'_>, + ) -> Result> { + // 1. Transform AuctionRequest to your provider's format + // 2. Make HTTP request to your provider + // 3. Parse response + // 4. Return AuctionResponse with bids + todo!() + } + + fn timeout_ms(&self) -> u32 { + self.config.timeout_ms + } + + fn is_enabled(&self) -> bool { + self.config.enabled + } +} +``` + +2. Register the provider in `src/auction/providers/mod.rs` + +3. Configure it in `trusted-server.toml` + +## Testing + +### Mock Providers + +APS and GAM providers currently run in mock mode for testing the orchestration pattern: + +- **APS Mock**: Returns synthetic bids with Amazon branding +- **GAM Mock**: Acts as mediator, optionally injects house ads, simulates mediation logic + +Set `mock = false` when real implementations are ready. + +### Example Test Flow + +```rust +let orchestrator = AuctionOrchestrator::new(config); +orchestrator.register_provider(Arc::new(PrebidAuctionProvider::new(prebid_config))); +orchestrator.register_provider(Arc::new(ApsAuctionProvider::new(aps_config))); +orchestrator.register_provider(Arc::new(GamAuctionProvider::new(gam_config))); + +let result = orchestrator.run_auction(&request, &context).await?; + +// Check results +assert_eq!(result.winning_bids.len(), 2); +assert!(result.total_time_ms < 2000); +``` + +## Performance Considerations + +- **Parallel Execution**: Currently runs sequentially in Fastly Compute (no tokio runtime), but structured for easy parallelization +- **Timeouts**: Each provider has independent timeout; global timeout enforced at flow level +- **Error Handling**: Provider failures don't fail entire auction; partial results returned + +## Related Files + +- `src/auction/mod.rs` - Module exports +- `src/auction/types.rs` - Core auction types +- `src/auction/provider.rs` - Provider trait definition +- `src/auction/orchestrator.rs` - Orchestration logic +- `src/auction/config.rs` - Configuration types +- `src/auction/providers/` - Provider implementations + +## Questions? + +See the main project [README](../../../../README.md) or [integration guide](../../../../docs/integration_guide.md). diff --git a/crates/common/src/auction/config.rs b/crates/common/src/auction/config.rs new file mode 100644 index 0000000..8ffe5a5 --- /dev/null +++ b/crates/common/src/auction/config.rs @@ -0,0 +1,6 @@ +//! Configuration structures for auction orchestration. +//! +//! The base types are defined in auction_config_types.rs to avoid circular dependencies +//! with build.rs. This module re-exports them. + +pub use crate::auction_config_types::AuctionConfig; diff --git a/crates/common/src/auction/mod.rs b/crates/common/src/auction/mod.rs new file mode 100644 index 0000000..0ef3eed --- /dev/null +++ b/crates/common/src/auction/mod.rs @@ -0,0 +1,322 @@ +//! Auction orchestration module for managing multi-provider bidding. +//! +//! This module provides an extensible framework for running auctions across +//! multiple providers (Prebid, Amazon APS, Google GAM, etc.) with support for +//! parallel execution and mediation strategies. +//! +//! Note: Individual auction providers are located in the `integrations` module +//! (e.g., `crate::integrations::aps`, `crate::integrations::gam`, `crate::integrations::prebid`). + +use crate::settings::Settings; +use std::sync::{Arc, OnceLock}; + +pub mod config; +pub mod orchestrator; +pub mod provider; +pub mod types; + +pub use config::AuctionConfig; +pub use orchestrator::AuctionOrchestrator; +pub use provider::AuctionProvider; +pub use types::{ + AdFormat, AuctionContext, AuctionRequest, AuctionResponse, Bid, BidStatus, MediaType, +}; + +/// Global auction orchestrator singleton. +/// +/// Initialized once on first access with the provided settings. +/// All providers are registered during initialization. +static GLOBAL_ORCHESTRATOR: OnceLock = OnceLock::new(); + +/// Type alias for provider builder functions. +type ProviderBuilder = fn(&Settings) -> Vec>; + +/// Returns the list of all available provider builder functions. +/// +/// This list is used to auto-discover and register auction providers from settings. +/// Each builder function checks the settings for its specific provider configuration +/// and returns any enabled providers. +fn provider_builders() -> &'static [ProviderBuilder] { + &[ + crate::integrations::prebid::register_auction_provider, + crate::integrations::aps::register_providers, + crate::integrations::gam::register_providers, + ] +} + +/// Initialize the global auction orchestrator. +/// +/// This function should be called once at application startup to initialize the orchestrator +/// with the application settings. All auction providers are automatically discovered and +/// registered during initialization. +/// +/// # Arguments +/// * `settings` - Application settings used to configure the orchestrator and providers +/// +/// # Returns +/// Reference to the global orchestrator instance +/// +/// # Panics +/// Panics if called more than once (orchestrator already initialized) +pub fn init_orchestrator(settings: &Settings) -> &'static AuctionOrchestrator { + GLOBAL_ORCHESTRATOR.get_or_init(|| { + log::info!("Initializing global auction orchestrator"); + + let mut orchestrator = AuctionOrchestrator::new(settings.auction.clone()); + + // Auto-discover and register all auction providers from settings + for builder in provider_builders() { + for provider in builder(settings) { + orchestrator.register_provider(provider); + } + } + + log::info!( + "Auction orchestrator initialized with {} providers", + orchestrator.provider_count() + ); + + orchestrator + }) +} + +/// Get the global auction orchestrator. +/// +/// Returns a reference to the orchestrator if it has been initialized via `init_orchestrator()`. +/// +/// # Returns +/// * `Some(&'static AuctionOrchestrator)` if the orchestrator has been initialized +/// * `None` if `init_orchestrator()` has not been called yet +pub fn get_orchestrator() -> Option<&'static AuctionOrchestrator> { + GLOBAL_ORCHESTRATOR.get() +} + +// ============================================================================ +// Top-Level Auction Handler +// ============================================================================ + +use error_stack::{Report, ResultExt}; +use fastly::http::{header, StatusCode}; +use fastly::{Request, Response}; +use serde::Deserialize; +use serde_json::{json, Value as JsonValue}; +use std::collections::HashMap; +use uuid::Uuid; + +use crate::creative; +use crate::error::TrustedServerError; +use crate::geo::GeoInfo; +use crate::synthetic::{generate_synthetic_id, get_or_generate_synthetic_id}; + +/// Request body format for auction endpoints (tsjs/Prebid.js format). +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct AdRequest { + ad_units: Vec, + #[allow(dead_code)] + config: Option, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct AdUnit { + code: String, + media_types: Option, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct MediaTypes { + banner: Option, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct BannerUnit { + sizes: Vec>, +} + +/// Handle auction request from /third-party/ad or /auction/run endpoints. +/// +/// This is the main entry point for running header bidding auctions. +/// It orchestrates bids from multiple providers (Prebid, APS, GAM, etc.) and returns +/// the winning bids in OpenRTB format. +pub async fn handle_auction( + settings: &Settings, + mut req: Request, +) -> Result> { + // Parse request body + let body: AdRequest = serde_json::from_slice(&req.take_body_bytes()).change_context( + TrustedServerError::Auction { + message: "Failed to parse auction request body".to_string(), + }, + )?; + + log::info!( + "Auction request received for {} ad units", + body.ad_units.len() + ); + + // Get the global orchestrator (should be initialized at startup) + let orchestrator = get_orchestrator().ok_or_else(|| { + Report::new(TrustedServerError::Auction { + message: "Auction orchestrator not initialized. Call init_orchestrator() at startup." + .to_string(), + }) + })?; + + // Convert tsjs request format to auction request + let auction_request = convert_tsjs_to_auction_request(&body, settings, &req)?; + + // Create auction context + let context = AuctionContext { + settings, + request: &req, + timeout_ms: settings.auction.timeout_ms, + }; + + // Run the auction + let result = orchestrator + .run_auction(&auction_request, &context) + .await + .change_context(TrustedServerError::Auction { + message: "Auction orchestration failed".to_string(), + })?; + + log::info!( + "Auction completed: {} bidders, {} winning bids, {}ms total", + result.bidder_responses.len(), + result.winning_bids.len(), + result.total_time_ms + ); + + // Convert to OpenRTB response format + convert_to_openrtb_response(&result, settings) +} + +/// Convert tsjs/Prebid.js request format to internal AuctionRequest. +fn convert_tsjs_to_auction_request( + body: &AdRequest, + settings: &Settings, + req: &Request, +) -> Result> { + use types::{AdSlot, DeviceInfo, PublisherInfo, SiteInfo, UserInfo}; + + // Generate synthetic ID + let synthetic_id = get_or_generate_synthetic_id(settings, req).change_context( + TrustedServerError::Auction { + message: "Failed to generate synthetic ID".to_string(), + }, + )?; + let fresh_id = + generate_synthetic_id(settings, req).change_context(TrustedServerError::Auction { + message: "Failed to generate fresh ID".to_string(), + })?; + + // Convert ad units to slots + let mut slots = Vec::new(); + for unit in &body.ad_units { + if let Some(media_types) = &unit.media_types { + if let Some(banner) = &media_types.banner { + let formats: Vec = banner + .sizes + .iter() + .map(|size| AdFormat { + width: size[0], + height: size[1], + media_type: MediaType::Banner, + }) + .collect(); + + slots.push(AdSlot { + id: unit.code.clone(), + formats, + floor_price: None, + targeting: std::collections::HashMap::new(), + }); + } + } + } + + // Get geo info if available + let device = GeoInfo::from_request(req).map(|geo| DeviceInfo { + user_agent: req.get_header_str("user-agent").map(|s| s.to_string()), + ip: req.get_client_ip_addr().map(|ip| ip.to_string()), + geo: Some(types::GeoInfo { + country: Some(geo.country), + region: geo.region, + city: Some(geo.city), + }), + }); + + Ok(AuctionRequest { + id: Uuid::new_v4().to_string(), + slots, + publisher: PublisherInfo { + domain: settings.publisher.domain.clone(), + page_url: Some(format!("https://{}", settings.publisher.domain)), + }, + user: UserInfo { + id: synthetic_id, + fresh_id, + consent: None, + }, + device, + site: Some(SiteInfo { + domain: settings.publisher.domain.clone(), + page: format!("https://{}", settings.publisher.domain), + }), + context: HashMap::new(), + }) +} + +/// Convert OrchestrationResult to OpenRTB response format. +fn convert_to_openrtb_response( + result: &orchestrator::OrchestrationResult, + settings: &Settings, +) -> Result> { + // Build OpenRTB-style seatbid array + let mut seatbids = Vec::new(); + + for (slot_id, bid) in &result.winning_bids { + let rewritten_creative = creative::rewrite_creative_html(&bid.creative, settings); + + let bid_obj = json!({ + "id": format!("{}-{}", bid.bidder, slot_id), + "impid": slot_id, + "price": bid.price, + "adm": rewritten_creative, + "crid": format!("{}-creative", bid.bidder), + "w": bid.width, + "h": bid.height, + "adomain": bid.adomain.clone().unwrap_or_default(), + }); + + seatbids.push(json!({ + "seat": bid.bidder, + "bid": [bid_obj] + })); + } + + let response_body = json!({ + "id": "auction-response", + "seatbid": seatbids, + "ext": { + "orchestrator": { + "strategy": settings.auction.strategy, + "bidders": result.bidder_responses.len(), + "total_bids": result.total_bids(), + "time_ms": result.total_time_ms + } + } + }); + + let body_bytes = + serde_json::to_vec(&response_body).change_context(TrustedServerError::Auction { + message: "Failed to serialize auction response".to_string(), + })?; + + Ok(Response::from_status(StatusCode::OK) + .with_header(header::CONTENT_TYPE, "application/json") + .with_body(body_bytes)) +} diff --git a/crates/common/src/auction/orchestrator.rs b/crates/common/src/auction/orchestrator.rs new file mode 100644 index 0000000..425538b --- /dev/null +++ b/crates/common/src/auction/orchestrator.rs @@ -0,0 +1,805 @@ +//! Auction orchestrator for managing multi-provider auctions. + +use error_stack::{Report, ResultExt}; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Instant; + +use crate::error::TrustedServerError; + +use super::config::AuctionConfig; +use super::provider::AuctionProvider; +use super::types::{AuctionContext, AuctionRequest, AuctionResponse, Bid, BidStatus}; + +/// Manages auction execution across multiple providers. +pub struct AuctionOrchestrator { + config: AuctionConfig, + providers: HashMap>, +} + +impl AuctionOrchestrator { + /// Create a new orchestrator with the given configuration. + pub fn new(config: AuctionConfig) -> Self { + Self { + config, + providers: HashMap::new(), + } + } + + /// Register an auction provider. + pub fn register_provider(&mut self, provider: Arc) { + let name = provider.provider_name().to_string(); + log::info!("Registering auction provider: {}", name); + self.providers.insert(name, provider); + } + + /// Get the number of registered providers. + pub fn provider_count(&self) -> usize { + self.providers.len() + } + + /// Execute an auction using the configured strategy. + pub async fn run_auction( + &self, + request: &AuctionRequest, + context: &AuctionContext<'_>, + ) -> Result> { + let start_time = Instant::now(); + + log::info!("Running auction with strategy: {}", self.config.strategy); + + let result = match self.config.strategy.as_str() { + "parallel_mediation" => self.run_parallel_mediation(request, context).await, + "parallel_only" => self.run_parallel_only(request, context).await, + "waterfall" => self.run_waterfall(request, context).await, + strategy => Err(Report::new(TrustedServerError::Auction { + message: format!( + "Unknown auction strategy '{}'. Valid strategies: parallel_mediation, parallel_only, waterfall", + strategy + ), + })), + }?; + + Ok(OrchestrationResult { + total_time_ms: start_time.elapsed().as_millis() as u64, + ..result + }) + } + + /// Run auction with parallel bidding + mediation. + /// + /// Flow: + /// 1. Run all bidders in parallel + /// 2. Collect bids from all bidders + /// 3. Send combined bids to mediator for final decision + async fn run_parallel_mediation( + &self, + request: &AuctionRequest, + context: &AuctionContext<'_>, + ) -> Result> { + // Phase 1: Run bidders in parallel + let bidder_responses = self.run_bidders_parallel(request, context).await?; + + // Phase 2: Send to mediator if configured + let (mediator_response, winning_bids) = if self.config.has_mediator() { + let mediator_name = self.config.mediator.as_ref().unwrap(); + let mediator = self.get_provider(mediator_name)?; + + log::info!( + "Sending {} bidder responses to mediator: {}", + bidder_responses.len(), + mediator.provider_name() + ); + + // Create a modified request with all bids attached + let mut mediation_request = request.clone(); + mediation_request.context.insert( + "bidder_responses".to_string(), + serde_json::json!(&bidder_responses), + ); + + let start_time = Instant::now(); + let pending = mediator + .request_bids(&mediation_request, context) + .change_context(TrustedServerError::Auction { + message: format!("Mediator {} failed to launch", mediator.provider_name()), + })?; + + let backend_response = pending.wait().change_context(TrustedServerError::Auction { + message: format!("Mediator {} request failed", mediator.provider_name()), + })?; + + let response_time_ms = start_time.elapsed().as_millis() as u64; + let mediator_resp = mediator + .parse_response(backend_response, response_time_ms) + .change_context(TrustedServerError::Auction { + message: format!("Mediator {} parse failed", mediator.provider_name()), + })?; + + // Extract winning bids from mediator response + let winning = mediator_resp + .bids + .iter() + .map(|bid| (bid.slot_id.clone(), bid.clone())) + .collect(); + + (Some(mediator_resp), winning) + } else { + // No mediator - select best bid per slot from bidder responses + let winning = self.select_winning_bids(&bidder_responses); + (None, winning) + }; + + Ok(OrchestrationResult { + bidder_responses, + mediator_response, + winning_bids, + total_time_ms: 0, // Will be set by caller + metadata: HashMap::new(), + }) + } + + /// Run auction with only parallel bidding (no mediation). + async fn run_parallel_only( + &self, + request: &AuctionRequest, + context: &AuctionContext<'_>, + ) -> Result> { + let bidder_responses = self.run_bidders_parallel(request, context).await?; + let winning_bids = self.select_winning_bids(&bidder_responses); + + Ok(OrchestrationResult { + bidder_responses, + mediator_response: None, + winning_bids, + total_time_ms: 0, + metadata: HashMap::new(), + }) + } + + /// Run auction with waterfall strategy (sequential). + async fn run_waterfall( + &self, + request: &AuctionRequest, + context: &AuctionContext<'_>, + ) -> Result> { + let mut bidder_responses = Vec::new(); + let mut winning_bids = HashMap::new(); + + // Try each bidder sequentially until we get bids + for bidder_name in self.config.bidder_names() { + let provider = match self.providers.get(bidder_name) { + Some(p) => p, + None => { + log::warn!("Provider '{}' not registered, skipping", bidder_name); + continue; + } + }; + + if !provider.is_enabled() { + log::debug!( + "Provider '{}' is disabled, skipping", + provider.provider_name() + ); + continue; + } + + log::info!("Waterfall: trying provider {}", provider.provider_name()); + + let start_time = Instant::now(); + match provider.request_bids(request, context) { + Ok(pending) => { + match pending.wait() { + Ok(backend_response) => { + let response_time_ms = start_time.elapsed().as_millis() as u64; + + match provider.parse_response(backend_response, response_time_ms) { + Ok(response) => { + let has_bids = !response.bids.is_empty() + && response.status == BidStatus::Success; + bidder_responses.push(response.clone()); + + if has_bids { + // Got bids, stop waterfall + winning_bids = response + .bids + .into_iter() + .map(|bid| (bid.slot_id.clone(), bid)) + .collect(); + break; + } + } + Err(e) => { + log::warn!( + "Provider '{}' failed to parse response in waterfall: {:?}", + provider.provider_name(), + e + ); + } + } + } + Err(e) => { + log::warn!( + "Provider '{}' request failed in waterfall: {:?}", + provider.provider_name(), + e + ); + } + } + } + Err(e) => { + log::warn!( + "Provider '{}' failed to launch request in waterfall: {:?}", + provider.provider_name(), + e + ); + // Continue to next provider + } + } + } + + Ok(OrchestrationResult { + bidder_responses, + mediator_response: None, + winning_bids, + total_time_ms: 0, + metadata: HashMap::new(), + }) + } + + /// Run all bidders in parallel and collect responses. + async fn run_bidders_parallel( + &self, + request: &AuctionRequest, + context: &AuctionContext<'_>, + ) -> Result, Report> { + use std::time::Instant; + + let bidder_names = self.config.bidder_names(); + + if bidder_names.is_empty() { + return Err(Report::new(TrustedServerError::Auction { + message: "No bidders configured".to_string(), + })); + } + + log::info!( + "Running {} bidders in parallel using send_async", + bidder_names.len() + ); + + // Phase 1: Launch all requests concurrently + let mut pending_requests = Vec::new(); + + for bidder_name in bidder_names { + let provider = match self.providers.get(bidder_name) { + Some(p) => p, + None => { + log::warn!("Provider '{}' not registered, skipping", bidder_name); + continue; + } + }; + + if !provider.is_enabled() { + log::debug!( + "Provider '{}' is disabled, skipping", + provider.provider_name() + ); + continue; + } + + log::info!("Launching bid request to: {}", provider.provider_name()); + + let start_time = Instant::now(); + match provider.request_bids(request, context) { + Ok(pending) => { + pending_requests.push(( + provider.provider_name(), + pending, + start_time, + provider.as_ref(), + )); + log::debug!( + "Request to '{}' launched successfully", + provider.provider_name() + ); + } + Err(e) => { + log::warn!( + "Provider '{}' failed to launch request: {:?}", + provider.provider_name(), + e + ); + } + } + } + + log::info!( + "Launched {} concurrent requests, waiting for responses...", + pending_requests.len() + ); + + // Phase 2: Wait for all responses + let mut responses = Vec::new(); + + for (provider_name, pending, start_time, provider) in pending_requests { + match pending.wait() { + Ok(response) => { + let response_time_ms = start_time.elapsed().as_millis() as u64; + + match provider.parse_response(response, response_time_ms) { + Ok(auction_response) => { + log::info!( + "Provider '{}' returned {} bids (status: {:?}, time: {}ms)", + auction_response.provider, + auction_response.bids.len(), + auction_response.status, + auction_response.response_time_ms + ); + responses.push(auction_response); + } + Err(e) => { + log::warn!( + "Provider '{}' failed to parse response: {:?}", + provider_name, + e + ); + responses.push(AuctionResponse::error(provider_name, response_time_ms)); + } + } + } + Err(e) => { + let response_time_ms = start_time.elapsed().as_millis() as u64; + log::warn!("Provider '{}' request failed: {:?}", provider_name, e); + responses.push(AuctionResponse::error(provider_name, response_time_ms)); + } + } + } + + Ok(responses) + } + + /// Select the best bid for each slot from all responses. + fn select_winning_bids(&self, responses: &[AuctionResponse]) -> HashMap { + let mut winning_bids: HashMap = HashMap::new(); + + for response in responses { + if response.status != BidStatus::Success { + continue; + } + + for bid in &response.bids { + let should_replace = match winning_bids.get(&bid.slot_id) { + Some(current_winner) => bid.price > current_winner.price, + None => true, + }; + + if should_replace { + winning_bids.insert(bid.slot_id.clone(), bid.clone()); + } + } + } + + log::info!("Selected {} winning bids", winning_bids.len()); + winning_bids + } + + /// Get a provider by name. + fn get_provider( + &self, + name: &str, + ) -> Result<&Arc, Report> { + self.providers.get(name).ok_or_else(|| { + log::warn!( + "Provider '{}' configured but not registered. Available providers: {:?}", + name, + self.providers.keys().collect::>() + ); + Report::new(TrustedServerError::Auction { + message: format!("Provider '{}' not registered", name), + }) + }) + } + + /// Check if orchestrator is enabled. + pub fn is_enabled(&self) -> bool { + self.config.enabled + } +} + +/// Result of an orchestrated auction. +#[derive(Debug, Clone)] +pub struct OrchestrationResult { + /// All responses from bidders + pub bidder_responses: Vec, + /// Final response from mediator (if used) + pub mediator_response: Option, + /// Winning bids per slot + pub winning_bids: HashMap, + /// Total orchestration time in milliseconds + pub total_time_ms: u64, + /// Metadata about the auction + pub metadata: HashMap, +} + +impl OrchestrationResult { + /// Get the winning bid for a specific slot. + pub fn get_winning_bid(&self, slot_id: &str) -> Option<&Bid> { + self.winning_bids.get(slot_id) + } + + /// Get all bids from all providers for a specific slot. + pub fn get_all_bids_for_slot(&self, slot_id: &str) -> Vec<&Bid> { + self.bidder_responses + .iter() + .flat_map(|response| &response.bids) + .filter(|bid| bid.slot_id == slot_id) + .collect() + } + + /// Get the total number of bids received. + pub fn total_bids(&self) -> usize { + self.bidder_responses.iter().map(|r| r.bids.len()).sum() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::auction::types::*; + use crate::integrations::aps::{MockApsConfig, MockApsProvider}; + use crate::integrations::gam::{MockGamConfig, MockGamProvider}; + use crate::test_support::tests::crate_test_settings_str; + use fastly::Request; + use std::collections::HashMap; + + fn create_test_auction_request() -> AuctionRequest { + AuctionRequest { + id: "test-auction-123".to_string(), + slots: vec![ + AdSlot { + id: "header-banner".to_string(), + formats: vec![AdFormat { + media_type: MediaType::Banner, + width: 728, + height: 90, + }], + floor_price: Some(1.50), + targeting: HashMap::new(), + }, + AdSlot { + id: "sidebar".to_string(), + formats: vec![AdFormat { + media_type: MediaType::Banner, + width: 300, + height: 250, + }], + floor_price: Some(1.00), + targeting: HashMap::new(), + }, + ], + publisher: PublisherInfo { + domain: "test.com".to_string(), + page_url: Some("https://test.com/article".to_string()), + }, + user: UserInfo { + id: "user-123".to_string(), + fresh_id: "fresh-456".to_string(), + consent: None, + }, + device: None, + site: None, + context: HashMap::new(), + } + } + + fn create_test_settings() -> crate::settings::Settings { + let settings_str = crate_test_settings_str(); + crate::settings::Settings::from_toml(&settings_str).expect("should parse test settings") + } + + fn create_test_context<'a>( + settings: &'a crate::settings::Settings, + req: &'a Request, + ) -> AuctionContext<'a> { + AuctionContext { + settings, + request: req, + timeout_ms: 2000, + } + } + + // TODO: Re-enable these tests after implementing mock provider support for send_async() + // Mock providers currently don't work with concurrent requests because they can't + // create PendingRequest without real backends configured in Fastly. + // + // Options to fix: + // 1. Configure dummy backends in fastly.toml for testing + // 2. Refactor mock providers to use a different pattern + // 3. Create a test-only mock backend server + + #[tokio::test] + #[ignore = "Mock providers not yet supported with send_async"] + async fn test_parallel_mediation_strategy() { + let config = AuctionConfig { + enabled: true, + strategy: "parallel_mediation".to_string(), + bidders: vec!["aps_mock".to_string()], + mediator: Some("gam_mock".to_string()), + timeout_ms: 2000, + }; + + let mut orchestrator = AuctionOrchestrator::new(config); + + // Register mock providers + let aps_config = MockApsConfig { + enabled: true, + bid_price: 2.50, + ..Default::default() + }; + let gam_config = MockGamConfig { + enabled: true, + inject_house_bids: true, + house_bid_price: 1.75, + win_rate: 50, + ..Default::default() + }; + + orchestrator.register_provider(Arc::new(MockApsProvider::new(aps_config))); + orchestrator.register_provider(Arc::new(MockGamProvider::new(gam_config))); + + let request = create_test_auction_request(); + let settings = create_test_settings(); + let req = Request::get("https://test.com/test"); + let context = create_test_context(&settings, &req); + + let result = orchestrator + .run_auction(&request, &context) + .await + .expect("auction should succeed"); + + // Verify bidder ran + assert_eq!(result.bidder_responses.len(), 1); + assert_eq!(result.bidder_responses[0].provider, "aps_mock"); + + // Verify mediator ran + assert!(result.mediator_response.is_some()); + let mediator_resp = result.mediator_response.unwrap(); + assert_eq!(mediator_resp.provider, "gam_mock"); + + // Verify we got winning bids (GAM mediated) + assert!(!result.winning_bids.is_empty()); + + // Timing is available (may be 0 in WASM test env, but field exists) + let _ = result.total_time_ms; + } + + #[tokio::test] + #[ignore = "Mock providers not yet supported with send_async"] + async fn test_parallel_only_strategy() { + let config = AuctionConfig { + enabled: true, + strategy: "parallel_only".to_string(), + bidders: vec!["aps_mock".to_string()], + mediator: None, + timeout_ms: 2000, + }; + + let mut orchestrator = AuctionOrchestrator::new(config); + + let aps_config = MockApsConfig { + enabled: true, + bid_price: 2.50, + ..Default::default() + }; + + orchestrator.register_provider(Arc::new(MockApsProvider::new(aps_config))); + + let request = create_test_auction_request(); + let settings = create_test_settings(); + let req = Request::get("https://test.com/test"); + let context = create_test_context(&settings, &req); + + let result = orchestrator + .run_auction(&request, &context) + .await + .expect("auction should succeed"); + + // No mediator in parallel_only + assert!(result.mediator_response.is_none()); + + // Should have bids from APS + assert_eq!(result.bidder_responses.len(), 1); + assert!(result.bidder_responses[0].bids.len() > 0); + + // Winning bids selected directly from bidders + assert!(!result.winning_bids.is_empty()); + } + + #[tokio::test] + #[ignore = "Mock providers not yet supported with send_async"] + async fn test_waterfall_strategy() { + let config = AuctionConfig { + enabled: true, + strategy: "waterfall".to_string(), + bidders: vec!["aps_mock".to_string()], + mediator: None, + timeout_ms: 2000, + }; + + let mut orchestrator = AuctionOrchestrator::new(config); + + let aps_config = MockApsConfig { + enabled: true, + bid_price: 2.50, + ..Default::default() + }; + + orchestrator.register_provider(Arc::new(MockApsProvider::new(aps_config))); + + let request = create_test_auction_request(); + let settings = create_test_settings(); + let req = Request::get("https://test.com/test"); + let context = create_test_context(&settings, &req); + + let result = orchestrator + .run_auction(&request, &context) + .await + .expect("auction should succeed"); + + // Should have tried APS (first in waterfall) + assert_eq!(result.bidder_responses.len(), 1); + assert_eq!(result.bidder_responses[0].provider, "aps_mock"); + + // No mediator + assert!(result.mediator_response.is_none()); + } + + #[tokio::test] + #[ignore = "Mock providers not yet supported with send_async"] + async fn test_multiple_bidders() { + let config = AuctionConfig { + enabled: true, + strategy: "parallel_only".to_string(), + bidders: vec!["aps_mock".to_string()], + mediator: None, + timeout_ms: 2000, + }; + + let mut orchestrator = AuctionOrchestrator::new(config); + + // Register provider with different mock prices + let aps_config = MockApsConfig { + enabled: true, + bid_price: 2.50, + ..Default::default() + }; + + orchestrator.register_provider(Arc::new(MockApsProvider::new(aps_config))); + + let request = create_test_auction_request(); + let settings = create_test_settings(); + let req = Request::get("https://test.com/test"); + let context = create_test_context(&settings, &req); + + let result = orchestrator + .run_auction(&request, &context) + .await + .expect("auction should succeed"); + + // Should have bids for both slots + assert_eq!(result.winning_bids.len(), 2); + assert!(result.winning_bids.contains_key("header-banner")); + assert!(result.winning_bids.contains_key("sidebar")); + } + + #[tokio::test] + #[ignore = "Mock providers not yet supported with send_async"] + async fn test_orchestration_result_helpers() { + let config = AuctionConfig { + enabled: true, + strategy: "parallel_only".to_string(), + bidders: vec!["aps_mock".to_string()], + mediator: None, + timeout_ms: 2000, + }; + + let mut orchestrator = AuctionOrchestrator::new(config); + + let aps_config = MockApsConfig { + enabled: true, + ..Default::default() + }; + + orchestrator.register_provider(Arc::new(MockApsProvider::new(aps_config))); + + let request = create_test_auction_request(); + let settings = create_test_settings(); + let req = Request::get("https://test.com/test"); + let context = create_test_context(&settings, &req); + + let result = orchestrator + .run_auction(&request, &context) + .await + .expect("auction should succeed"); + + // Test helper methods + let header_bid = result.get_winning_bid("header-banner"); + assert!(header_bid.is_some()); + + let all_header_bids = result.get_all_bids_for_slot("header-banner"); + assert!(!all_header_bids.is_empty()); + + let total_bids = result.total_bids(); + assert!(total_bids > 0); + } + + #[tokio::test] + #[ignore = "Mock providers not yet supported with send_async"] + async fn test_unknown_strategy_error() { + let config = AuctionConfig { + enabled: true, + strategy: "invalid_strategy".to_string(), + bidders: vec!["aps_mock".to_string()], + mediator: None, + timeout_ms: 2000, + }; + + let orchestrator = AuctionOrchestrator::new(config); + + let request = create_test_auction_request(); + let settings = create_test_settings(); + let req = Request::get("https://test.com/test"); + let context = create_test_context(&settings, &req); + + let result = orchestrator.run_auction(&request, &context).await; + + assert!(result.is_err()); + let err = result.unwrap_err(); + let err_msg = format!("{}", err); + assert!(err_msg.contains("Unknown auction strategy")); + assert!(err_msg.contains("parallel_mediation, parallel_only, waterfall")); + } + + #[tokio::test] + async fn test_no_bidders_configured() { + let config = AuctionConfig { + enabled: true, + strategy: "parallel_only".to_string(), + bidders: vec![], + mediator: None, + timeout_ms: 2000, + }; + + let orchestrator = AuctionOrchestrator::new(config); + + let request = create_test_auction_request(); + let settings = create_test_settings(); + let req = Request::get("https://test.com/test"); + let context = create_test_context(&settings, &req); + + let result = orchestrator.run_auction(&request, &context).await; + + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!(format!("{}", err).contains("No bidders configured")); + } + + #[test] + fn test_orchestrator_is_enabled() { + let config = AuctionConfig { + enabled: true, + ..Default::default() + }; + let orchestrator = AuctionOrchestrator::new(config); + assert!(orchestrator.is_enabled()); + + let config = AuctionConfig { + enabled: false, + ..Default::default() + }; + let orchestrator = AuctionOrchestrator::new(config); + assert!(!orchestrator.is_enabled()); + } +} + diff --git a/crates/common/src/auction/provider.rs b/crates/common/src/auction/provider.rs new file mode 100644 index 0000000..c0324d0 --- /dev/null +++ b/crates/common/src/auction/provider.rs @@ -0,0 +1,51 @@ +//! Trait definition for auction providers. + +use error_stack::Report; +use fastly::http::request::PendingRequest; + +use crate::error::TrustedServerError; + +use super::types::{AuctionContext, AuctionRequest, AuctionResponse}; + +/// Trait implemented by all auction providers (Prebid, APS, GAM, etc.). +pub trait AuctionProvider: Send + Sync { + /// Unique identifier for this provider (e.g., "prebid", "aps", "gam"). + fn provider_name(&self) -> &'static str; + + /// Submit a bid request to this provider and return a pending request. + /// + /// Implementations should: + /// - Transform AuctionRequest to provider-specific format + /// - Make HTTP call to provider endpoint using send_async() + /// - Return PendingRequest for orchestrator to await + /// + /// The orchestrator will handle waiting for responses and parsing them. + fn request_bids( + &self, + request: &AuctionRequest, + context: &AuctionContext<'_>, + ) -> Result>; + + /// Parse the response from the provider into an AuctionResponse. + /// + /// Called by the orchestrator after the PendingRequest completes. + fn parse_response( + &self, + response: fastly::Response, + response_time_ms: u64, + ) -> Result>; + + /// Check if this provider supports a specific media type. + fn supports_media_type(&self, media_type: &super::types::MediaType) -> bool { + // By default, support banner ads + matches!(media_type, super::types::MediaType::Banner) + } + + /// Get the configured timeout for this provider in milliseconds. + fn timeout_ms(&self) -> u32; + + /// Check if this provider is enabled. + fn is_enabled(&self) -> bool { + true + } +} diff --git a/crates/common/src/auction/types.rs b/crates/common/src/auction/types.rs new file mode 100644 index 0000000..b5c34d7 --- /dev/null +++ b/crates/common/src/auction/types.rs @@ -0,0 +1,201 @@ +//! Core types for auction requests and responses. + +use fastly::Request; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +use crate::settings::Settings; + +/// Represents a unified auction request across all providers. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AuctionRequest { + /// Unique auction ID + pub id: String, + /// Ad slots/impressions being auctioned + pub slots: Vec, + /// Publisher information + pub publisher: PublisherInfo, + /// User information (privacy-preserving) + pub user: UserInfo, + /// Device information + pub device: Option, + /// Site information + pub site: Option, + /// Additional context + pub context: HashMap, +} + +/// Represents a single ad slot/impression. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AdSlot { + /// Slot identifier (e.g., "header-banner") + pub id: String, + /// Media types and formats supported + pub formats: Vec, + /// Floor price if any + pub floor_price: Option, + /// Slot-specific targeting + pub targeting: HashMap, +} + +/// Ad format specification. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct AdFormat { + pub media_type: MediaType, + pub width: u32, + pub height: u32, +} + +/// Media type enumeration. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum MediaType { + Banner, + Video, + Native, +} + +/// Publisher information. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PublisherInfo { + pub domain: String, + pub page_url: Option, +} + +/// Privacy-preserving user information. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UserInfo { + /// Synthetic/hashed user ID + pub id: String, + /// Fresh ID for this session + pub fresh_id: String, + /// GDPR consent string if applicable + pub consent: Option, +} + +/// Device information from request. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeviceInfo { + pub user_agent: Option, + pub ip: Option, + pub geo: Option, +} + +/// Geographic information. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GeoInfo { + pub country: Option, + pub region: Option, + pub city: Option, +} + +/// Site information. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SiteInfo { + pub domain: String, + pub page: String, +} + +/// Context passed to auction providers. +pub struct AuctionContext<'a> { + pub settings: &'a Settings, + pub request: &'a Request, + pub timeout_ms: u32, +} + +/// Response from a single auction provider. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AuctionResponse { + /// Provider that generated this response + pub provider: String, + /// Bids returned + pub bids: Vec, + /// Status of the auction + pub status: BidStatus, + /// Response time in milliseconds + pub response_time_ms: u64, + /// Provider-specific metadata + pub metadata: HashMap, +} + +/// Individual bid from a provider. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Bid { + /// Slot this bid is for + pub slot_id: String, + /// Bid price in CPM + pub price: f64, + /// Currency code (e.g., "USD") + pub currency: String, + /// Creative markup (HTML/VAST) + pub creative: String, + /// Advertiser domain + pub adomain: Option>, + /// Bidder/seat identifier + pub bidder: String, + /// Width of creative + pub width: u32, + /// Height of creative + pub height: u32, + /// Win notification URL + pub nurl: Option, + /// Billing notification URL + pub burl: Option, + /// Provider-specific bid metadata + pub metadata: HashMap, +} + +/// Status of bid response. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum BidStatus { + /// Auction completed successfully + Success, + /// No bids returned + NoBid, + /// Auction failed/timed out + Error, + /// Auction still in progress + Pending, +} + +impl AuctionResponse { + /// Create a new successful auction response. + pub fn success(provider: impl Into, bids: Vec, response_time_ms: u64) -> Self { + Self { + provider: provider.into(), + bids, + status: BidStatus::Success, + response_time_ms, + metadata: HashMap::new(), + } + } + + /// Create a no-bid response. + pub fn no_bid(provider: impl Into, response_time_ms: u64) -> Self { + Self { + provider: provider.into(), + bids: Vec::new(), + status: BidStatus::NoBid, + response_time_ms, + metadata: HashMap::new(), + } + } + + /// Create an error response. + pub fn error(provider: impl Into, response_time_ms: u64) -> Self { + Self { + provider: provider.into(), + bids: Vec::new(), + status: BidStatus::Error, + response_time_ms, + metadata: HashMap::new(), + } + } + + /// Add metadata to the response. + pub fn with_metadata(mut self, key: impl Into, value: serde_json::Value) -> Self { + self.metadata.insert(key.into(), value); + self + } +} diff --git a/crates/common/src/auction_config_types.rs b/crates/common/src/auction_config_types.rs new file mode 100644 index 0000000..89db859 --- /dev/null +++ b/crates/common/src/auction_config_types.rs @@ -0,0 +1,48 @@ +//! Auction configuration types (separated to avoid circular deps in build.rs). + +use serde::{Deserialize, Serialize}; + +/// Auction orchestration configuration. +#[derive(Debug, Clone, Deserialize, Serialize, Default)] +pub struct AuctionConfig { + /// Enable the auction orchestrator + #[serde(default)] + pub enabled: bool, + + /// Auction strategy: "parallel_mediation", "waterfall", "parallel_only" + #[serde(default = "default_strategy")] + pub strategy: String, + + /// Provider names that participate in bidding + /// Simply list the provider names (e.g., ["prebid", "aps"]) + #[serde(default)] + pub bidders: Vec, + + /// Optional mediator provider name (e.g., "gam") + pub mediator: Option, + + /// Timeout in milliseconds + #[serde(default = "default_timeout")] + pub timeout_ms: u32, +} + +fn default_strategy() -> String { + "parallel_mediation".to_string() +} + +fn default_timeout() -> u32 { + 2000 +} + +#[allow(dead_code)] // Methods used in runtime but not in build script +impl AuctionConfig { + /// Get all bidder names. + pub fn bidder_names(&self) -> &[String] { + &self.bidders + } + + /// Check if this config has a mediator configured. + pub fn has_mediator(&self) -> bool { + self.mediator.is_some() + } +} diff --git a/crates/common/src/error.rs b/crates/common/src/error.rs index b57bc5d..c5964cf 100644 --- a/crates/common/src/error.rs +++ b/crates/common/src/error.rs @@ -22,6 +22,10 @@ pub enum TrustedServerError { #[display("Configuration error: {message}")] Configuration { message: String }, + /// Auction orchestration error. + #[display("Auction error: {message}")] + Auction { message: String }, + /// GAM (Google Ad Manager) integration error. #[display("GAM error: {message}")] Gam { message: String }, @@ -89,6 +93,7 @@ pub trait IntoHttpResponse { impl IntoHttpResponse for TrustedServerError { fn status_code(&self) -> StatusCode { match self { + Self::Auction { .. } => StatusCode::BAD_GATEWAY, Self::BadRequest { .. } => StatusCode::BAD_REQUEST, Self::Configuration { .. } | Self::Settings { .. } => StatusCode::INTERNAL_SERVER_ERROR, Self::Gam { .. } => StatusCode::BAD_GATEWAY, diff --git a/crates/common/src/integrations/aps.rs b/crates/common/src/integrations/aps.rs new file mode 100644 index 0000000..354e012 --- /dev/null +++ b/crates/common/src/integrations/aps.rs @@ -0,0 +1,347 @@ +//! Amazon Publisher Services (APS/TAM) integration. +//! +//! This module provides both real and mock implementations of the APS auction provider. + +use error_stack::{Report, ResultExt}; +use fastly::{http::StatusCode, Request, Response}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use std::time::Instant; +use validator::Validate; + +use crate::auction::provider::AuctionProvider; +use crate::auction::types::{AuctionContext, AuctionRequest, AuctionResponse, Bid, MediaType}; +use crate::error::TrustedServerError; +use crate::settings::IntegrationConfig as IntegrationConfigTrait; + +// ============================================================================ +// Real APS Provider +// ============================================================================ + +/// Configuration for APS integration. +#[derive(Debug, Clone, Deserialize, Serialize, Validate)] +pub struct ApsConfig { + /// Whether APS integration is enabled + #[serde(default = "default_enabled")] + pub enabled: bool, + + /// APS publisher ID + pub pub_id: String, + + /// APS API endpoint + #[serde(default = "default_endpoint")] + pub endpoint: String, + + /// Timeout in milliseconds + #[serde(default = "default_timeout_ms")] + pub timeout_ms: u32, +} + +fn default_enabled() -> bool { + false +} + +fn default_endpoint() -> String { + "https://aax.amazon-adsystem.com/e/dtb/bid".to_string() +} + +fn default_timeout_ms() -> u32 { + 800 +} + +impl Default for ApsConfig { + fn default() -> Self { + Self { + enabled: default_enabled(), + pub_id: String::new(), + endpoint: default_endpoint(), + timeout_ms: default_timeout_ms(), + } + } +} + +impl IntegrationConfigTrait for ApsConfig { + fn is_enabled(&self) -> bool { + self.enabled + } +} + +/// Amazon APS auction provider. +pub struct ApsAuctionProvider { + config: ApsConfig, +} + +impl ApsAuctionProvider { + /// Create a new APS auction provider. + pub fn new(config: ApsConfig) -> Self { + Self { config } + } +} + +impl AuctionProvider for ApsAuctionProvider { + fn provider_name(&self) -> &'static str { + "aps" + } + + fn request_bids( + &self, + request: &AuctionRequest, + _context: &AuctionContext<'_>, + ) -> Result> { + log::info!( + "APS: requesting bids for {} slots (pub_id: {})", + request.slots.len(), + self.config.pub_id + ); + + // TODO: Implement real APS TAM API integration + // + // Implementation steps: + // 1. Transform AuctionRequest to APS TAM bid request format + // 2. Make HTTP POST to self.config.endpoint with send_async(): + // - Publisher ID (pub_id) + // - Slot information (sizes, ad unit codes) + // - User agent, page URL from context + // 3. Return PendingRequest + // + // Reference: https://aps.amazon.com/aps/transparent-ad-marketplace-api/ + + log::warn!("APS: Real implementation not yet available"); + + Err(Report::new(TrustedServerError::Auction { + message: "APS integration not yet implemented. Use 'aps_mock' provider for testing." + .to_string(), + })) + } + + fn parse_response( + &self, + _response: fastly::Response, + response_time_ms: u64, + ) -> Result> { + // TODO: Parse APS TAM response format + Ok(AuctionResponse::error("aps", response_time_ms)) + } + + fn supports_media_type(&self, media_type: &MediaType) -> bool { + // APS supports banner and video formats + matches!(media_type, MediaType::Banner | MediaType::Video) + } + + fn timeout_ms(&self) -> u32 { + self.config.timeout_ms + } + + fn is_enabled(&self) -> bool { + self.config.enabled + } +} + +// ============================================================================ +// Mock APS Provider +// ============================================================================ + +/// Configuration for mock APS integration. +#[derive(Debug, Clone, Deserialize, Serialize, Validate)] +pub struct MockApsConfig { + /// Whether this mock provider is enabled + #[serde(default = "mock_default_enabled")] + pub enabled: bool, + + /// Timeout in milliseconds + #[serde(default = "default_timeout_ms")] + pub timeout_ms: u32, + + /// Mock bid price (CPM) - default bid amount + #[serde(default = "mock_default_bid_price")] + pub bid_price: f64, + + /// Simulated network latency in milliseconds + #[serde(default = "mock_default_latency_ms")] + pub latency_ms: u64, + + /// Fill rate (0.0 to 1.0) - probability of returning a bid + #[serde(default = "mock_default_fill_rate")] + #[validate(range(min = 0.0, max = 1.0))] + pub fill_rate: f64, +} + +fn mock_default_enabled() -> bool { + false +} + +fn mock_default_bid_price() -> f64 { + 2.50 +} + +fn mock_default_latency_ms() -> u64 { + 80 +} + +fn mock_default_fill_rate() -> f64 { + 1.0 // Always return bids by default +} + +impl Default for MockApsConfig { + fn default() -> Self { + Self { + enabled: mock_default_enabled(), + timeout_ms: default_timeout_ms(), + bid_price: mock_default_bid_price(), + latency_ms: mock_default_latency_ms(), + fill_rate: mock_default_fill_rate(), + } + } +} + +impl IntegrationConfigTrait for MockApsConfig { + fn is_enabled(&self) -> bool { + self.enabled + } +} + +/// Mock Amazon APS auction provider. +pub struct MockApsProvider { + config: MockApsConfig, +} + +impl MockApsProvider { + /// Create a new mock APS auction provider. + pub fn new(config: MockApsConfig) -> Self { + Self { config } + } + + /// Generate mock bids for testing. + fn generate_mock_bids(&self, request: &AuctionRequest) -> Vec { + request + .slots + .iter() + .filter_map(|slot| { + // Check fill rate using hash-based pseudo-randomness + if self.config.fill_rate < 1.0 { + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + + let mut hasher = DefaultHasher::new(); + slot.id.hash(&mut hasher); + let hash_val = hasher.finish(); + let rand_val = (hash_val % 100) as f64 / 100.0; + + if rand_val >= self.config.fill_rate { + log::debug!("APS Mock: No fill for slot '{}' (fill_rate={})", slot.id, self.config.fill_rate); + return None; + } + } + + // Only bid on banner ads + let banner_format = slot + .formats + .iter() + .find(|f| f.media_type == MediaType::Banner)?; + + // Mock APS typically bids slightly higher than floor + let price = slot + .floor_price + .map(|floor| floor * 1.15) + .unwrap_or(self.config.bid_price); + + Some(Bid { + slot_id: slot.id.clone(), + price, + currency: "USD".to_string(), + creative: format!( + r#"
+
+
Amazon APS
+
Mock Bid: ${:.2} CPM
+
+
"#, + banner_format.width, banner_format.height, price + ), + adomain: Some(vec!["amazon.com".to_string()]), + bidder: "amazon-aps-mock".to_string(), + width: banner_format.width, + height: banner_format.height, + nurl: Some(format!( + "https://mock-aps.amazon.com/win?slot={}&price={}", + slot.id, price + )), + burl: Some(format!( + "https://mock-aps.amazon.com/bill?slot={}&price={}", + slot.id, price + )), + metadata: std::collections::HashMap::new(), + }) + }) + .collect() + } +} + +impl AuctionProvider for MockApsProvider { + fn provider_name(&self) -> &'static str { + "aps_mock" + } + + fn request_bids( + &self, + _request: &AuctionRequest, + _context: &AuctionContext<'_>, + ) -> Result> { + // TODO: Implement mock provider support for send_async + // For now, mock providers are disabled when using concurrent requests + log::warn!("APS Mock: Mock providers not yet supported with concurrent requests"); + + Err(Report::new(TrustedServerError::Auction { + message: "Mock providers not yet supported with send_async. Disable auction.enabled or remove mock providers.".to_string(), + })) + } + + fn parse_response( + &self, + _response: fastly::Response, + response_time_ms: u64, + ) -> Result> { + Ok(AuctionResponse::error("aps_mock", response_time_ms)) + } + + fn supports_media_type(&self, media_type: &MediaType) -> bool { + matches!(media_type, MediaType::Banner | MediaType::Video) + } + + fn timeout_ms(&self) -> u32 { + self.config.timeout_ms + } + + fn is_enabled(&self) -> bool { + self.config.enabled + } +} + +// ============================================================================ +// Provider Auto-Registration +// ============================================================================ + +use crate::settings::Settings; +use std::sync::Arc; + +/// Auto-register APS providers based on settings configuration. +/// +/// This function checks the settings for both real and mock APS configurations +/// and returns any enabled providers ready for registration with the orchestrator. +pub fn register_providers(settings: &Settings) -> Vec> { + let mut providers: Vec> = Vec::new(); + + // Check for real APS provider configuration + if let Ok(Some(config)) = settings.integration_config::("aps") { + log::info!("Registering real APS provider"); + providers.push(Arc::new(ApsAuctionProvider::new(config))); + } + + // Check for mock APS provider configuration + if let Ok(Some(config)) = settings.integration_config::("aps_mock") { + log::info!("Registering mock APS provider"); + providers.push(Arc::new(MockApsProvider::new(config))); + } + + providers +} diff --git a/crates/common/src/integrations/gam.rs b/crates/common/src/integrations/gam.rs new file mode 100644 index 0000000..25d16ac --- /dev/null +++ b/crates/common/src/integrations/gam.rs @@ -0,0 +1,434 @@ +//! Google Ad Manager (GAM) integration. +//! +//! This module provides both real and mock implementations of the GAM auction provider. +//! GAM acts as a mediation server that receives bids from other providers and makes +//! the final ad selection decision. + +use error_stack::Report; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::time::Instant; +use validator::Validate; + +use crate::auction::provider::AuctionProvider; +use crate::auction::types::{ + AuctionContext, AuctionRequest, AuctionResponse, Bid, BidStatus, MediaType, +}; +use crate::error::TrustedServerError; +use crate::settings::IntegrationConfig as IntegrationConfigTrait; + +// ============================================================================ +// Real GAM Provider +// ============================================================================ + +/// Configuration for GAM integration. +#[derive(Debug, Clone, Deserialize, Serialize, Validate)] +pub struct GamConfig { + /// Whether GAM integration is enabled + #[serde(default = "default_enabled")] + pub enabled: bool, + + /// GAM network ID + pub network_id: String, + + /// GAM API endpoint + #[serde(default = "default_endpoint")] + pub endpoint: String, + + /// Timeout in milliseconds + #[serde(default = "default_timeout_ms")] + pub timeout_ms: u32, +} + +impl IntegrationConfigTrait for GamConfig { + fn is_enabled(&self) -> bool { + self.enabled + } +} + +fn default_enabled() -> bool { + false +} + +fn default_endpoint() -> String { + "https://securepubads.g.doubleclick.net/gampad/ads".to_string() +} + +fn default_timeout_ms() -> u32 { + 500 +} + +impl Default for GamConfig { + fn default() -> Self { + Self { + enabled: default_enabled(), + network_id: String::new(), + endpoint: default_endpoint(), + timeout_ms: default_timeout_ms(), + } + } +} + +/// Google Ad Manager provider (acts as mediator). +pub struct GamAuctionProvider { + config: GamConfig, +} + +impl GamAuctionProvider { + /// Create a new GAM auction provider. + pub fn new(config: GamConfig) -> Self { + Self { config } + } +} + +impl AuctionProvider for GamAuctionProvider { + fn provider_name(&self) -> &'static str { + "gam" + } + + fn request_bids( + &self, + request: &AuctionRequest, + _context: &AuctionContext<'_>, + ) -> Result> { + log::info!( + "GAM: mediating auction for {} slots (network_id: {})", + request.slots.len(), + self.config.network_id + ); + + // TODO: Implement real GAM API integration with send_async + // + // Implementation steps: + // 1. Extract bidder responses from request context (see gam_mock implementation for example) + // 2. Transform bids to GAM key-value targeting format + // 3. Make HTTP request to GAM ad server using send_async() + // 4. Return PendingRequest + // + // Reference: https://developers.google.com/ad-manager/api/start + + log::warn!("GAM: Real implementation not yet available"); + + Err(Report::new(TrustedServerError::Auction { + message: "GAM integration not yet implemented. Use 'gam_mock' provider for testing." + .to_string(), + })) + } + + fn parse_response( + &self, + _response: fastly::Response, + response_time_ms: u64, + ) -> Result> { + // TODO: Parse GAM response + Ok(AuctionResponse::error("gam", response_time_ms)) + } + + fn supports_media_type(&self, media_type: &MediaType) -> bool { + // GAM supports all media types + matches!( + media_type, + MediaType::Banner | MediaType::Video | MediaType::Native + ) + } + + fn timeout_ms(&self) -> u32 { + self.config.timeout_ms + } + + fn is_enabled(&self) -> bool { + self.config.enabled + } +} + +// ============================================================================ +// Mock GAM Provider +// ============================================================================ + +/// Configuration for mock GAM integration. +#[derive(Debug, Clone, Deserialize, Serialize, Validate)] +pub struct MockGamConfig { + /// Whether this mock provider is enabled + #[serde(default = "mock_default_enabled")] + pub enabled: bool, + + /// Timeout in milliseconds + #[serde(default = "default_timeout_ms")] + pub timeout_ms: u32, + + /// Whether GAM should inject its own house ad bids + #[serde(default = "mock_default_inject_house_bids")] + pub inject_house_bids: bool, + + /// House ad bid price (CPM) + #[serde(default = "mock_default_house_bid_price")] + pub house_bid_price: f64, + + /// Percentage chance GAM house ads win (0-100) + /// Used when inject_house_bids is true + #[serde(default = "mock_default_win_rate")] + #[validate(range(min = 0, max = 100))] + pub win_rate: u8, + + /// Simulated network latency in milliseconds + #[serde(default = "mock_default_latency_ms")] + pub latency_ms: u64, +} + +fn mock_default_enabled() -> bool { + false +} + +fn mock_default_inject_house_bids() -> bool { + true +} + +fn mock_default_house_bid_price() -> f64 { + 1.75 +} + +fn mock_default_win_rate() -> u8 { + 30 // GAM wins 30% of the time by default +} + +fn mock_default_latency_ms() -> u64 { + 40 +} + +impl Default for MockGamConfig { + fn default() -> Self { + Self { + enabled: mock_default_enabled(), + timeout_ms: default_timeout_ms(), + inject_house_bids: mock_default_inject_house_bids(), + house_bid_price: mock_default_house_bid_price(), + win_rate: mock_default_win_rate(), + latency_ms: mock_default_latency_ms(), + } + } +} + +impl IntegrationConfigTrait for MockGamConfig { + fn is_enabled(&self) -> bool { + self.enabled + } +} + +/// Mock Google Ad Manager provider (acts as mediator). +pub struct MockGamProvider { + config: MockGamConfig, +} + +impl MockGamProvider { + /// Create a new mock GAM auction provider. + pub fn new(config: MockGamConfig) -> Self { + Self { config } + } + + /// Extract bidder responses from the auction request context. + fn extract_bidder_responses(&self, request: &AuctionRequest) -> Vec { + request + .context + .get("bidder_responses") + .and_then(|v| serde_json::from_value(v.clone()).ok()) + .unwrap_or_default() + } + + /// Simulate GAM's mediation logic. + /// + /// In mock mode: + /// 1. Optionally inject GAM's own house bids + /// 2. Select winning bid per slot based on price + win rate + /// 3. Return selected bids as GAM's response + fn mediate_bids( + &self, + request: &AuctionRequest, + bidder_responses: Vec, + ) -> Vec { + let mut all_bids: HashMap> = HashMap::new(); + + // Collect all bids by slot + for response in bidder_responses { + if response.status != BidStatus::Success { + continue; + } + + for bid in response.bids { + all_bids + .entry(bid.slot_id.clone()) + .or_insert_with(Vec::new) + .push(bid); + } + } + + // Optionally inject GAM house ads + if self.config.inject_house_bids { + for slot in &request.slots { + let banner_format = slot + .formats + .iter() + .find(|f| f.media_type == MediaType::Banner); + + if let Some(format) = banner_format { + let house_bid = Bid { + slot_id: slot.id.clone(), + price: self.config.house_bid_price, + currency: "USD".to_string(), + creative: format!( + r#"
+
+
Google Ad Manager
+
House Ad: ${:.2} CPM
+
+
"#, + format.width, format.height, self.config.house_bid_price + ), + adomain: Some(vec!["google.com".to_string()]), + bidder: "gam-house-mock".to_string(), + width: format.width, + height: format.height, + nurl: Some(format!( + "https://mock-gam.google.com/win?slot={}&price={}", + slot.id, self.config.house_bid_price + )), + burl: None, + metadata: { + let mut meta = HashMap::new(); + meta.insert("house_ad".to_string(), serde_json::json!(true)); + meta + }, + }; + + all_bids + .entry(slot.id.clone()) + .or_insert_with(Vec::new) + .push(house_bid); + } + } + } + + // Select winner for each slot + let mut winning_bids = Vec::new(); + + for (slot_id, mut bids) in all_bids { + if bids.is_empty() { + continue; + } + + // Sort by price descending + bids.sort_by(|a, b| b.price.partial_cmp(&a.price).unwrap()); + + // In mock mode, sometimes prefer GAM house ads even if not highest bid + let winner = if self.config.inject_house_bids { + let has_gam_bid = bids.iter().any(|b| b.bidder == "gam-house-mock"); + + // Use hash-based pseudo-randomness for consistent but realistic win rate simulation + let should_gam_win = { + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + + let mut hasher = DefaultHasher::new(); + slot_id.hash(&mut hasher); + let hash_val = hasher.finish(); + (hash_val % 100) < self.config.win_rate as u64 + }; + + if has_gam_bid && should_gam_win { + bids.iter() + .find(|b| b.bidder == "gam-house-mock") + .cloned() + .unwrap() + } else { + bids[0].clone() + } + } else { + bids[0].clone() + }; + + log::info!( + "GAM Mock mediation: slot '{}' won by '{}' at ${:.2} CPM (from {} bids)", + slot_id, + winner.bidder, + winner.price, + bids.len() + ); + + winning_bids.push(winner); + } + + winning_bids + } +} + +impl AuctionProvider for MockGamProvider { + fn provider_name(&self) -> &'static str { + "gam_mock" + } + + fn request_bids( + &self, + _request: &AuctionRequest, + _context: &AuctionContext<'_>, + ) -> Result> { + // TODO: Implement mock provider support for send_async + // For now, mock providers are disabled when using concurrent requests + log::warn!("GAM Mock: Mock providers not yet supported with concurrent requests"); + + Err(Report::new(TrustedServerError::Auction { + message: "Mock providers not yet supported with send_async. Disable auction.enabled or remove mock providers.".to_string(), + })) + } + + fn parse_response( + &self, + _response: fastly::Response, + response_time_ms: u64, + ) -> Result> { + Ok(AuctionResponse::error("gam_mock", response_time_ms)) + } + + fn supports_media_type(&self, media_type: &MediaType) -> bool { + // GAM supports all media types + matches!( + media_type, + MediaType::Banner | MediaType::Video | MediaType::Native + ) + } + + fn timeout_ms(&self) -> u32 { + self.config.timeout_ms + } + + fn is_enabled(&self) -> bool { + self.config.enabled + } +} + +// ============================================================================ +// Provider Auto-Registration +// ============================================================================ + +use crate::settings::Settings; +use std::sync::Arc; + +/// Auto-register GAM providers based on settings configuration. +/// +/// This function checks the settings for both real and mock GAM configurations +/// and returns any enabled providers ready for registration with the orchestrator. +pub fn register_providers(settings: &Settings) -> Vec> { + let mut providers: Vec> = Vec::new(); + + // Check for real GAM provider configuration + if let Ok(Some(config)) = settings.integration_config::("gam") { + log::info!("Registering real GAM provider"); + providers.push(Arc::new(GamAuctionProvider::new(config))); + } + + // Check for mock GAM provider configuration + if let Ok(Some(config)) = settings.integration_config::("gam_mock") { + log::info!("Registering mock GAM provider"); + providers.push(Arc::new(MockGamProvider::new(config))); + } + + providers +} diff --git a/crates/common/src/integrations/mod.rs b/crates/common/src/integrations/mod.rs index 888fa5a..4a16478 100644 --- a/crates/common/src/integrations/mod.rs +++ b/crates/common/src/integrations/mod.rs @@ -2,6 +2,8 @@ use crate::settings::Settings; +pub mod aps; +pub mod gam; pub mod nextjs; pub mod permutive; pub mod prebid; diff --git a/crates/common/src/integrations/prebid.rs b/crates/common/src/integrations/prebid.rs index 51b1b9f..3715280 100644 --- a/crates/common/src/integrations/prebid.rs +++ b/crates/common/src/integrations/prebid.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; +use std::time::Instant; use async_trait::async_trait; use base64::{engine::general_purpose::STANDARD as BASE64, Engine}; @@ -11,6 +12,10 @@ use serde_json::{json, Value as Json, Value as JsonValue}; use url::Url; use validator::Validate; +use crate::auction::provider::AuctionProvider; +use crate::auction::types::{ + AuctionContext, AuctionRequest, AuctionResponse, Bid as AuctionBid, MediaType, +}; use crate::backend::ensure_backend_from_url; use crate::constants::{HEADER_SYNTHETIC_FRESH, HEADER_SYNTHETIC_TRUSTED_SERVER}; use crate::creative; @@ -27,7 +32,6 @@ use crate::synthetic::{generate_synthetic_id, get_or_generate_synthetic_id}; const PREBID_INTEGRATION_ID: &str = "prebid"; const ROUTE_FIRST_PARTY_AD: &str = "/first-party/ad"; -const ROUTE_THIRD_PARTY_AD: &str = "/third-party/ad"; #[derive(Debug, Clone, Deserialize, Serialize, Validate)] pub struct PrebidIntegrationConfig { @@ -125,42 +129,6 @@ impl PrebidIntegration { } } - async fn handle_third_party_ad( - &self, - settings: &Settings, - mut req: Request, - ) -> Result> { - let body: AdRequest = serde_json::from_slice(&req.take_body_bytes()).change_context( - TrustedServerError::Prebid { - message: "Failed to parse tsjs auction request".to_string(), - }, - )?; - - log::info!("/third-party/ad: received {} adUnits", body.ad_units.len()); - for unit in &body.ad_units { - if let Some(mt) = &unit.media_types { - if let Some(banner) = &mt.banner { - log::debug!("unit={} sizes={:?}", unit.code, banner.sizes); - } - } - } - - let openrtb = build_openrtb_from_ts(&body, settings, &self.config); - if let Ok(preview) = serde_json::to_string(&openrtb) { - log::debug!( - "OpenRTB payload (truncated): {}", - &preview.chars().take(512).collect::() - ); - } - - req.set_body_json(&openrtb) - .change_context(TrustedServerError::Prebid { - message: "Failed to set OpenRTB body".to_string(), - })?; - - handle_prebid_auction(settings, req, &self.config).await - } - fn handle_script_handler(&self) -> Result> { let body = "// Script overridden by Trusted Server\n"; @@ -269,16 +237,10 @@ impl IntegrationProxy for PrebidIntegration { } fn routes(&self) -> Vec { - let mut routes = vec![ - IntegrationEndpoint::get(ROUTE_FIRST_PARTY_AD), - IntegrationEndpoint::post(ROUTE_THIRD_PARTY_AD), - ]; + let mut routes = vec![IntegrationEndpoint::get(ROUTE_FIRST_PARTY_AD)]; if let Some(script_path) = &self.config.script_handler { - // We need to leak the string to get a 'static str for IntegrationEndpoint - // This is safe because the config lives for the lifetime of the application - let static_path: &'static str = Box::leak(script_path.clone().into_boxed_str()); - routes.push(IntegrationEndpoint::get(static_path)); + routes.push(IntegrationEndpoint::get(script_path.clone())); } routes @@ -299,9 +261,6 @@ impl IntegrationProxy for PrebidIntegration { Method::GET if path == ROUTE_FIRST_PARTY_AD => { self.handle_first_party_ad(settings, req).await } - Method::POST if path == ROUTE_THIRD_PARTY_AD => { - self.handle_third_party_ad(settings, req).await - } _ => Err(Report::new(Self::error(format!( "Unsupported Prebid route: {path}" )))), @@ -1099,8 +1058,8 @@ server_url = "https://prebid.example" let routes = integration.routes(); - // Should have 3 routes: first-party ad, third-party ad, and script handler - assert_eq!(routes.len(), 3); + // Should have 2 routes: first-party ad and script handler + assert_eq!(routes.len(), 2); let has_script_route = routes .iter() @@ -1115,7 +1074,278 @@ server_url = "https://prebid.example" let routes = integration.routes(); - // Should only have 2 routes: first-party ad and third-party ad - assert_eq!(routes.len(), 2); + // Should only have 1 route: first-party ad + assert_eq!(routes.len(), 1); } } + +// ============================================================================ +// Prebid Auction Provider +// ============================================================================ + +/// Prebid Server auction provider. +pub struct PrebidAuctionProvider { + config: PrebidIntegrationConfig, +} + +impl PrebidAuctionProvider { + /// Create a new Prebid auction provider. + pub fn new(config: PrebidIntegrationConfig) -> Self { + Self { config } + } + + /// Convert auction request to OpenRTB format. + fn to_openrtb(&self, request: &AuctionRequest) -> OpenRtbRequest { + let imps: Vec = request + .slots + .iter() + .map(|slot| { + let formats: Vec = slot + .formats + .iter() + .filter(|f| f.media_type == MediaType::Banner) + .map(|f| Format { + w: f.width, + h: f.height, + }) + .collect(); + + let mut bidder = std::collections::HashMap::new(); + for bidder_name in &self.config.bidders { + bidder.insert(bidder_name.clone(), Json::Object(serde_json::Map::new())); + } + + Imp { + id: slot.id.clone(), + banner: Some(Banner { format: formats }), + ext: Some(ImpExt { + prebid: PrebidImpExt { bidder }, + }), + } + }) + .collect(); + + OpenRtbRequest { + id: request.id.clone(), + imp: imps, + site: Some(Site { + domain: Some(request.publisher.domain.clone()), + page: request.publisher.page_url.clone(), + }), + } + } + + /// Parse OpenRTB response into auction response. + fn parse_openrtb_response(&self, json: &Json, response_time_ms: u64) -> AuctionResponse { + let mut bids = Vec::new(); + + if let Some(seatbids) = json.get("seatbid").and_then(|v| v.as_array()) { + for seatbid in seatbids { + let seat = seatbid + .get("seat") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + + if let Some(bid_array) = seatbid.get("bid").and_then(|v| v.as_array()) { + for bid_obj in bid_array { + if let Ok(bid) = self.parse_bid(bid_obj, seat) { + bids.push(bid); + } + } + } + } + } + + if bids.is_empty() { + AuctionResponse::no_bid("prebid", response_time_ms) + } else { + AuctionResponse::success("prebid", bids, response_time_ms) + } + } + + /// Parse a single bid from OpenRTB response. + fn parse_bid(&self, bid_obj: &Json, seat: &str) -> Result { + let slot_id = bid_obj + .get("impid") + .and_then(|v| v.as_str()) + .ok_or(())? + .to_string(); + + let price = bid_obj.get("price").and_then(|v| v.as_f64()).ok_or(())?; + + let creative = bid_obj + .get("adm") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + + let width = bid_obj.get("w").and_then(|v| v.as_u64()).unwrap_or(300) as u32; + let height = bid_obj.get("h").and_then(|v| v.as_u64()).unwrap_or(250) as u32; + + let nurl = bid_obj + .get("nurl") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + + let burl = bid_obj + .get("burl") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + + let adomain = bid_obj + .get("adomain") + .and_then(|v| v.as_array()) + .map(|arr| { + arr.iter() + .filter_map(|v| v.as_str().map(|s| s.to_string())) + .collect() + }); + + Ok(AuctionBid { + slot_id, + price, + currency: "USD".to_string(), + creative, + adomain, + bidder: seat.to_string(), + width, + height, + nurl, + burl, + metadata: std::collections::HashMap::new(), + }) + } +} + +impl AuctionProvider for PrebidAuctionProvider { + fn provider_name(&self) -> &'static str { + "prebid" + } + + fn request_bids( + &self, + request: &AuctionRequest, + _context: &AuctionContext<'_>, + ) -> Result> { + log::info!("Prebid: requesting bids for {} slots", request.slots.len()); + + // Convert to OpenRTB + let openrtb = self.to_openrtb(request); + let mut openrtb_json = + serde_json::to_value(&openrtb).change_context(TrustedServerError::Prebid { + message: "Failed to serialize OpenRTB request".to_string(), + })?; + + // Enhance with user info + if !openrtb_json["user"].is_object() { + openrtb_json["user"] = json!({}); + } + openrtb_json["user"]["id"] = json!(&request.user.id); + if !openrtb_json["user"]["ext"].is_object() { + openrtb_json["user"]["ext"] = json!({}); + } + openrtb_json["user"]["ext"]["synthetic_fresh"] = json!(&request.user.fresh_id); + + // Add device info if available + if let Some(device) = &request.device { + if let Some(geo) = &device.geo { + let geo_obj = json!({ + "type": 2, + "country": geo.country, + "city": geo.city, + "region": geo.region, + }); + if !openrtb_json["device"].is_object() { + openrtb_json["device"] = json!({}); + } + openrtb_json["device"]["geo"] = geo_obj; + } + } + + // Create HTTP request + let mut pbs_req = Request::new( + Method::POST, + format!("{}/openrtb2/auction", self.config.server_url), + ); + + pbs_req + .set_body_json(&openrtb_json) + .change_context(TrustedServerError::Prebid { + message: "Failed to set request body".to_string(), + })?; + + // Send request asynchronously + let backend_name = ensure_backend_from_url(&self.config.server_url)?; + let pending = + pbs_req + .send_async(backend_name) + .change_context(TrustedServerError::Prebid { + message: "Failed to send async request to Prebid Server".to_string(), + })?; + + Ok(pending) + } + + fn parse_response( + &self, + mut response: fastly::Response, + response_time_ms: u64, + ) -> Result> { + // Parse response + if !response.get_status().is_success() { + log::warn!( + "Prebid returned non-success status: {}", + response.get_status() + ); + return Ok(AuctionResponse::error("prebid", response_time_ms)); + } + + let body_bytes = response.take_body_bytes(); + let response_json: Json = + serde_json::from_slice(&body_bytes).change_context(TrustedServerError::Prebid { + message: "Failed to parse Prebid response".to_string(), + })?; + + let auction_response = self.parse_openrtb_response(&response_json, response_time_ms); + + log::info!( + "Prebid returned {} bids in {}ms", + auction_response.bids.len(), + response_time_ms + ); + + Ok(auction_response) + } + + fn supports_media_type(&self, media_type: &MediaType) -> bool { + matches!(media_type, MediaType::Banner) + } + + fn timeout_ms(&self) -> u32 { + self.config.timeout_ms + } + + fn is_enabled(&self) -> bool { + self.config.enabled + } +} + +// ============================================================================ +// Provider Auto-Registration +// ============================================================================ + +/// Auto-register Prebid provider based on settings configuration. +/// +/// This function checks the settings for Prebid configuration and returns +/// the provider if enabled. +pub fn register_auction_provider(settings: &Settings) -> Vec> { + let mut providers: Vec> = Vec::new(); + + // Prebid provider is always registered if integration is enabled + if let Ok(Some(config)) = settings.integration_config::("prebid") { + log::info!("Registering Prebid auction provider"); + providers.push(Arc::new(PrebidAuctionProvider::new(config))); + } + + providers +} diff --git a/crates/common/src/integrations/registry.rs b/crates/common/src/integrations/registry.rs index 9db8033..573c874 100644 --- a/crates/common/src/integrations/registry.rs +++ b/crates/common/src/integrations/registry.rs @@ -93,52 +93,55 @@ pub struct IntegrationScriptContext<'a> { #[derive(Clone, Debug)] pub struct IntegrationEndpoint { pub method: Method, - pub path: &'static str, + pub path: String, } impl IntegrationEndpoint { #[must_use] - pub fn new(method: Method, path: &'static str) -> Self { - Self { method, path } + pub fn new(method: Method, path: impl Into) -> Self { + Self { + method, + path: path.into(), + } } #[must_use] - pub fn get(path: &'static str) -> Self { + pub fn get(path: impl Into) -> Self { Self { method: Method::GET, - path, + path: path.into(), } } #[must_use] - pub fn post(path: &'static str) -> Self { + pub fn post(path: impl Into) -> Self { Self { method: Method::POST, - path, + path: path.into(), } } #[must_use] - pub fn put(path: &'static str) -> Self { + pub fn put(path: impl Into) -> Self { Self { method: Method::PUT, - path, + path: path.into(), } } #[must_use] - pub fn delete(path: &'static str) -> Self { + pub fn delete(path: impl Into) -> Self { Self { method: Method::DELETE, - path, + path: path.into(), } } #[must_use] - pub fn patch(path: &'static str) -> Self { + pub fn patch(path: impl Into) -> Self { Self { method: Method::PATCH, - path, + path: path.into(), } } } @@ -171,7 +174,7 @@ pub trait IntegrationProxy: Send + Sync { /// ``` fn get(&self, path: &str) -> IntegrationEndpoint { let full_path = format!("/integrations/{}{}", self.integration_name(), path); - IntegrationEndpoint::get(Box::leak(full_path.into_boxed_str())) + IntegrationEndpoint::get(full_path) } /// Helper to create a namespaced POST endpoint. @@ -183,7 +186,7 @@ pub trait IntegrationProxy: Send + Sync { /// ``` fn post(&self, path: &str) -> IntegrationEndpoint { let full_path = format!("/integrations/{}{}", self.integration_name(), path); - IntegrationEndpoint::post(Box::leak(full_path.into_boxed_str())) + IntegrationEndpoint::post(full_path) } /// Helper to create a namespaced PUT endpoint. @@ -195,7 +198,7 @@ pub trait IntegrationProxy: Send + Sync { /// ``` fn put(&self, path: &str) -> IntegrationEndpoint { let full_path = format!("/integrations/{}{}", self.integration_name(), path); - IntegrationEndpoint::put(Box::leak(full_path.into_boxed_str())) + IntegrationEndpoint::put(full_path) } /// Helper to create a namespaced DELETE endpoint. @@ -203,11 +206,11 @@ pub trait IntegrationProxy: Send + Sync { /// /// # Example /// ```ignore - /// self.delete("/users") // becomes /integrations/my_integration/users + /// self.delete("/users/123") // becomes /integrations/my_integration/users/123 /// ``` fn delete(&self, path: &str) -> IntegrationEndpoint { let full_path = format!("/integrations/{}{}", self.integration_name(), path); - IntegrationEndpoint::delete(Box::leak(full_path.into_boxed_str())) + IntegrationEndpoint::delete(full_path) } /// Helper to create a namespaced PATCH endpoint. @@ -215,11 +218,11 @@ pub trait IntegrationProxy: Send + Sync { /// /// # Example /// ```ignore - /// self.patch("/users") // becomes /integrations/my_integration/users + /// self.patch("/settings") // becomes /integrations/my_integration/settings /// ``` fn patch(&self, path: &str) -> IntegrationEndpoint { let full_path = format!("/integrations/{}{}", self.integration_name(), path); - IntegrationEndpoint::patch(Box::leak(full_path.into_boxed_str())) + IntegrationEndpoint::patch(full_path) } } @@ -379,7 +382,7 @@ impl IntegrationRegistry { let matchit_path = if route.path.ends_with("/*") { format!("{}/{{*rest}}", route.path.strip_suffix("/*").unwrap()) } else { - route.path.to_string() + route.path.clone() }; // Select appropriate router and insert @@ -501,9 +504,10 @@ impl IntegrationRegistry { let entry = map .entry(*integration_id) .or_insert_with(|| IntegrationMetadata::new(integration_id)); - entry - .routes - .push(IntegrationEndpoint::new(route.method.clone(), route.path)); + entry.routes.push(IntegrationEndpoint::new( + route.method.clone(), + route.path.clone(), + )); } for rewriter in &self.inner.html_rewriters { diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index fc0b888..9ed1362 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -21,6 +21,8 @@ //! - [`test_support`]: Testing utilities and mocks //! - [`why`]: Debugging and introspection utilities +pub mod auction; +pub mod auction_config_types; pub mod auth; pub mod backend; pub mod constants; diff --git a/crates/common/src/settings.rs b/crates/common/src/settings.rs index 25149d9..7f13f80 100644 --- a/crates/common/src/settings.rs +++ b/crates/common/src/settings.rs @@ -11,12 +11,13 @@ use std::sync::OnceLock; use url::Url; use validator::{Validate, ValidationError}; +use crate::auction_config_types::AuctionConfig; use crate::error::TrustedServerError; pub const ENVIRONMENT_VARIABLE_PREFIX: &str = "TRUSTED_SERVER"; pub const ENVIRONMENT_VARIABLE_SEPARATOR: &str = "__"; -#[derive(Debug, Default, Deserialize, Serialize, Validate)] +#[derive(Debug, Default, Clone, Deserialize, Serialize, Validate)] pub struct Publisher { pub domain: String, pub cookie_domain: String, @@ -55,7 +56,7 @@ impl Publisher { } } -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Clone, Deserialize, Serialize)] pub struct IntegrationSettings { #[serde(flatten)] entries: HashMap, @@ -157,7 +158,7 @@ impl DerefMut for IntegrationSettings { } #[allow(unused)] -#[derive(Debug, Default, Deserialize, Serialize, Validate)] +#[derive(Debug, Default, Clone, Deserialize, Serialize, Validate)] pub struct Synthetic { pub counter_store: String, pub opid_store: String, @@ -176,7 +177,7 @@ impl Synthetic { } } -#[derive(Debug, Default, Deserialize, Serialize, Validate)] +#[derive(Debug, Default, Clone, Deserialize, Serialize, Validate)] pub struct Rewrite { /// List of domains to exclude from rewriting. Supports wildcards (e.g., "*.example.com"). /// URLs from these domains will not be proxied through first-party endpoints. @@ -211,7 +212,7 @@ impl Rewrite { } } -#[derive(Debug, Default, Deserialize, Serialize, Validate)] +#[derive(Debug, Default, Clone, Deserialize, Serialize, Validate)] pub struct Handler { #[validate(length(min = 1), custom(function = validate_path))] pub path: String, @@ -236,7 +237,7 @@ impl Handler { } } -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Clone, Deserialize, Serialize)] pub struct RequestSigning { #[serde(default = "default_request_signing_enabled")] pub enabled: bool, @@ -248,7 +249,7 @@ fn default_request_signing_enabled() -> bool { false } -#[derive(Debug, Default, Deserialize, Serialize, Validate)] +#[derive(Debug, Default, Clone, Deserialize, Serialize, Validate)] pub struct Settings { #[validate(nested)] pub publisher: Publisher, @@ -266,6 +267,8 @@ pub struct Settings { #[serde(default)] #[validate(nested)] pub rewrite: Rewrite, + #[serde(default)] + pub auction: AuctionConfig, } #[allow(unused)] diff --git a/crates/common/src/settings_data.rs b/crates/common/src/settings_data.rs index 7061ca2..f02eaca 100644 --- a/crates/common/src/settings_data.rs +++ b/crates/common/src/settings_data.rs @@ -5,6 +5,8 @@ use validator::Validate; use crate::error::TrustedServerError; use crate::settings::Settings; +pub use crate::auction_config_types::AuctionConfig; + const SETTINGS_DATA: &[u8] = include_bytes!("../../../target/trusted-server-out.toml"); /// Creates a new [`Settings`] instance from the embedded configuration file. diff --git a/crates/fastly/src/main.rs b/crates/fastly/src/main.rs index 69c0b2b..105d45b 100644 --- a/crates/fastly/src/main.rs +++ b/crates/fastly/src/main.rs @@ -3,6 +3,7 @@ use fastly::http::Method; use fastly::{Error, Request, Response}; use log_fastly::Logger; +use trusted_server_common::auction::{handle_auction, init_orchestrator}; use trusted_server_common::auth::enforce_basic_auth; use trusted_server_common::error::TrustedServerError; use trusted_server_common::integrations::IntegrationRegistry; @@ -32,6 +33,10 @@ fn main(req: Request) -> Result { } }; log::info!("Settings {settings:?}"); + + // Initialize the auction orchestrator once at startup + init_orchestrator(&settings); + let integration_registry = IntegrationRegistry::new(&settings); futures::executor::block_on(route_request(settings, integration_registry, req)) @@ -72,6 +77,9 @@ async fn route_request( (Method::POST, "/admin/keys/rotate") => handle_rotate_key(&settings, req), (Method::POST, "/admin/keys/deactivate") => handle_deactivate_key(&settings, req), + // Auction endpoints (top-level, not an integration) + (Method::POST, "/third-party/ad") => handle_auction(&settings, req).await, + // tsjs endpoints (Method::GET, "/first-party/proxy") => handle_first_party_proxy(&settings, req).await, (Method::GET, "/first-party/click") => handle_first_party_click(&settings, req).await, diff --git a/trusted-server.toml b/trusted-server.toml index 6da5e63..d1442c0 100644 --- a/trusted-server.toml +++ b/trusted-server.toml @@ -68,3 +68,42 @@ rewrite_sdk = true # exclude_domains = [ # "*.edgecompute.app", # ] + +# Auction orchestration configuration +[auction] +enabled = true +strategy = "parallel_mediation" # Options: "parallel_mediation", "parallel_only", "waterfall" +bidders = ["prebid", "aps_mock"] # Use mock providers for testing +mediator = "gam_mock" +timeout_ms = 2000 + +# Mock APS Configuration (for testing) +[integrations.aps_mock] +enabled = true +timeout_ms = 800 +bid_price = 2.50 +latency_ms = 80 +fill_rate = 1.0 # 1.0 = always bid, 0.5 = bid 50% of the time + +# Real APS Configuration (disabled by default - enable when real integration is ready) +[integrations.aps] +enabled = false +pub_id = "your-aps-publisher-id" +endpoint = "https://aax.amazon-adsystem.com/e/dtb/bid" +timeout_ms = 800 + +# Mock GAM Configuration (for testing) +[integrations.gam_mock] +enabled = true +timeout_ms = 500 +inject_house_bids = true +house_bid_price = 1.75 +win_rate = 30 # Percentage (0-100) - GAM house ads win this % of time +latency_ms = 40 + +# Real GAM Configuration (disabled by default - enable when real integration is ready) +[integrations.gam] +enabled = false +network_id = "your-gam-network-id" +endpoint = "https://securepubads.g.doubleclick.net/gampad/ads" +timeout_ms = 500