Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions quickwit/quickwit-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ release-feature-set = [
"quickwit-indexing/pulsar",
"quickwit-indexing/sqs",
"quickwit-indexing/vrl",
"quickwit-serve/lambda",
"quickwit-storage/azure",
"quickwit-storage/gcs",
"quickwit-metastore/postgres",
Expand All @@ -114,6 +115,7 @@ release-feature-vendored-set = [
"quickwit-indexing/sqs",
"quickwit-indexing/vrl",
"quickwit-indexing/vendored-kafka",
"quickwit-serve/lambda",
"quickwit-storage/azure",
"quickwit-storage/gcs",
"quickwit-metastore/postgres",
Expand All @@ -126,6 +128,7 @@ release-macos-feature-vendored-set = [
"quickwit-indexing/sqs",
"quickwit-indexing/vrl",
"quickwit-indexing/vendored-kafka-macos",
"quickwit-serve/lambda",
"quickwit-storage/azure",
"quickwit-storage/gcs",
"quickwit-metastore/postgres",
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-lambda-client/src/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
//! - Old versions are garbage collected (keep current + top 5 most recent)

use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use std::sync::OnceLock;

use anyhow::{Context, anyhow};
use aws_sdk_lambda::Client as LambdaClient;
Expand Down Expand Up @@ -108,7 +108,7 @@ fn version_description(deploy_config_opt: Option<&LambdaDeployConfig>) -> String
/// ensuring the deployed Lambda matches the embedded binary.
pub async fn try_get_or_deploy_invoker(
lambda_config: &LambdaConfig,
) -> anyhow::Result<Arc<dyn LambdaLeafSearchInvoker>> {
) -> anyhow::Result<impl LambdaLeafSearchInvoker> {
let aws_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
let client = LambdaClient::new(&aws_config);
let function_name = &lambda_config.function_name;
Expand Down
58 changes: 51 additions & 7 deletions quickwit/quickwit-lambda-client/src/invoker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,65 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use anyhow::Context as _;
use async_trait::async_trait;
use aws_sdk_lambda::Client as LambdaClient;
use aws_sdk_lambda::error::{DisplayErrorContext, SdkError};
use aws_sdk_lambda::operation::invoke::InvokeError;
use aws_sdk_lambda::primitives::Blob;
use aws_sdk_lambda::types::InvocationType;
use base64::prelude::*;
use prost::Message;
use quickwit_common::retry::{RetryParams, retry};
use quickwit_lambda_server::{LambdaSearchRequestPayload, LambdaSearchResponsePayload};
use quickwit_proto::search::{LambdaSearchResponses, LambdaSingleSplitResult, LeafSearchRequest};
use quickwit_search::{LambdaLeafSearchInvoker, SearchError};
use tracing::{debug, info, instrument};

use crate::metrics::LAMBDA_METRICS;

/// Maps an AWS Lambda SDK invocation error to a `SearchError`.
fn invoke_error_to_search_error(error: SdkError<InvokeError>) -> SearchError {
if let SdkError::ServiceError(ref service_error) = error
&& matches!(
service_error.err(),
InvokeError::TooManyRequestsException(_)
| InvokeError::EniLimitReachedException(_)
| InvokeError::SubnetIpAddressLimitReachedException(_)
| InvokeError::Ec2ThrottledException(_)
| InvokeError::ResourceConflictException(_)
)
{
return SearchError::TooManyRequests;
}

let is_timeout = match &error {
SdkError::TimeoutError(_) => true,
SdkError::DispatchFailure(failure) => failure.is_timeout(),
SdkError::ServiceError(service_error) => matches!(
service_error.err(),
InvokeError::EfsMountTimeoutException(_) | InvokeError::SnapStartTimeoutException(_)
),
_ => false,
};

let error_msg = format!("lambda invocation failed: {}", DisplayErrorContext(&error));

if is_timeout {
SearchError::Timeout(error_msg)
} else {
SearchError::Internal(error_msg)
}
}

/// Create a Lambda invoker for a specific version.
///
/// The version number is used as the qualifier when invoking, ensuring we call
/// the exact published version (not $LATEST).
pub(crate) async fn create_lambda_invoker_for_version(
function_name: String,
version: String,
) -> anyhow::Result<Arc<dyn LambdaLeafSearchInvoker>> {
) -> anyhow::Result<AwsLambdaInvoker> {
let aws_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
let client = LambdaClient::new(&aws_config);
let invoker = AwsLambdaInvoker {
Expand All @@ -44,11 +79,11 @@ pub(crate) async fn create_lambda_invoker_for_version(
version,
};
invoker.validate().await?;
Ok(Arc::new(invoker))
Ok(invoker)
}

/// AWS Lambda implementation of RemoteFunctionInvoker.
struct AwsLambdaInvoker {
pub(crate) struct AwsLambdaInvoker {
client: LambdaClient,
function_name: String,
/// The version number to invoke (e.g., "7", "12").
Expand Down Expand Up @@ -79,6 +114,12 @@ impl AwsLambdaInvoker {
}
}

const LAMBDA_RETRY_PARAMS: RetryParams = RetryParams {
base_delay: std::time::Duration::from_secs(1),
max_delay: std::time::Duration::from_secs(10),
max_attempts: 3,
};

#[async_trait]
impl LambdaLeafSearchInvoker for AwsLambdaInvoker {
#[instrument(skip(self, request), fields(function_name = %self.function_name, version = %self.version))]
Expand All @@ -88,7 +129,10 @@ impl LambdaLeafSearchInvoker for AwsLambdaInvoker {
) -> Result<Vec<LambdaSingleSplitResult>, SearchError> {
let start = std::time::Instant::now();

let result = self.invoke_leaf_search_inner(request).await;
let result = retry(&LAMBDA_RETRY_PARAMS, || {
self.invoke_leaf_search_inner(request.clone())
})
.await;

let elapsed = start.elapsed().as_secs_f64();
let status = if result.is_ok() { "success" } else { "error" };
Expand Down Expand Up @@ -141,7 +185,7 @@ impl AwsLambdaInvoker {
let response = invoke_builder
.send()
.await
.map_err(|e| SearchError::Internal(format!("Lambda invocation error: {}", e)))?;
.map_err(invoke_error_to_search_error)?;

// Check for function error
if let Some(error) = response.function_error() {
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-lambda-server/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ impl LambdaSearcherContext {
info!("initializing lambda searcher context");

let searcher_config = try_searcher_config_from_env()?;
let searcher_context = Arc::new(SearcherContext::new(searcher_config, None, None));
let searcher_context =
Arc::new(SearcherContext::new_without_invoker(searcher_config, None));
let storage_resolver = StorageResolver::configured(&Default::default());

Ok(Self {
Expand Down
7 changes: 7 additions & 0 deletions quickwit/quickwit-search/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use itertools::Itertools;
use quickwit_common::rate_limited_error;
use quickwit_common::retry::Retryable;
use quickwit_doc_mapper::QueryParserError;
use quickwit_proto::error::grpc_error_to_grpc_status;
use quickwit_proto::metastore::{EntityKind, MetastoreError};
Expand Down Expand Up @@ -175,6 +176,12 @@ impl From<MetastoreError> for SearchError {
}
}

impl Retryable for SearchError {
fn is_retryable(&self) -> bool {
matches!(self, SearchError::TooManyRequests | SearchError::Timeout(_))
}
}

impl From<JoinError> for SearchError {
fn from(join_error: JoinError) -> SearchError {
SearchError::Internal(format!("spawned task in root join failed: {join_error}"))
Expand Down
24 changes: 24 additions & 0 deletions quickwit/quickwit-search/src/invoker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,27 @@ pub trait LambdaLeafSearchInvoker: Send + Sync + 'static {
request: LeafSearchRequest,
) -> Result<Vec<LambdaSingleSplitResult>, SearchError>;
}

#[async_trait]
impl<T> LambdaLeafSearchInvoker for Box<T>
where T: LambdaLeafSearchInvoker + ?Sized
{
async fn invoke_leaf_search(
&self,
request: LeafSearchRequest,
) -> Result<Vec<LambdaSingleSplitResult>, SearchError> {
(**self).invoke_leaf_search(request).await
}
}

#[async_trait]
impl<T> LambdaLeafSearchInvoker for std::sync::Arc<T>
where T: LambdaLeafSearchInvoker + ?Sized
{
async fn invoke_leaf_search(
&self,
request: LeafSearchRequest,
) -> Result<Vec<LambdaSingleSplitResult>, SearchError> {
(**self).invoke_leaf_search(request).await
}
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ pub async fn single_node_search(
let search_job_placer = SearchJobPlacer::new(searcher_pool.clone());
let cluster_client = ClusterClient::new(search_job_placer);
let searcher_config = SearcherConfig::default();
let searcher_context = Arc::new(SearcherContext::new(searcher_config, None, None));
let searcher_context = Arc::new(SearcherContext::new_without_invoker(searcher_config, None));
let search_service = Arc::new(SearchServiceImpl::new(
metastore.clone(),
storage_resolver,
Expand Down
19 changes: 17 additions & 2 deletions quickwit/quickwit-search/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,14 +436,26 @@ impl SearcherContext {
#[cfg(test)]
pub fn for_test() -> SearcherContext {
let searcher_config = SearcherConfig::default();
SearcherContext::new(searcher_config, None, None)
SearcherContext::new_without_invoker(searcher_config, None)
}

/// Creates a new searcher context without a lambda invoker.
pub fn new_without_invoker(
searcher_config: SearcherConfig,
split_cache_opt: Option<Arc<SplitCache>>,
) -> Self {
Self::new(
searcher_config,
split_cache_opt,
None::<Box<dyn LambdaLeafSearchInvoker>>,
)
}

/// Creates a new searcher context, given a searcher config, and an optional `SplitCache`.
pub fn new(
searcher_config: SearcherConfig,
split_cache_opt: Option<Arc<SplitCache>>,
lambda_invoker: Option<Arc<dyn LambdaLeafSearchInvoker>>,
lambda_invoker: Option<impl LambdaLeafSearchInvoker + 'static>,
) -> Self {
let global_split_footer_cache = MemorySizedCache::from_config(
&searcher_config.split_footer_cache,
Expand All @@ -463,6 +475,9 @@ impl SearcherContext {
Some(searcher_config.aggregation_bucket_limit),
);

let lambda_invoker =
lambda_invoker.map(|invoker| Arc::new(invoker) as Arc<dyn LambdaLeafSearchInvoker>);

Self {
searcher_config,
fast_fields_cache: storage_long_term_cache,
Expand Down
11 changes: 8 additions & 3 deletions quickwit/quickwit-search/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1028,8 +1028,10 @@ async fn test_search_util(test_sandbox: &TestSandbox, query: &str) -> Vec<u32> {
max_hits: 100,
..Default::default()
});
let searcher_context: Arc<SearcherContext> =
Arc::new(SearcherContext::new(SearcherConfig::default(), None, None));
let searcher_context: Arc<SearcherContext> = Arc::new(SearcherContext::new_without_invoker(
SearcherConfig::default(),
None,
));

let search_response = single_doc_mapping_leaf_search(
searcher_context,
Expand Down Expand Up @@ -1666,7 +1668,10 @@ async fn test_single_node_list_terms() -> anyhow::Result<()> {
.into_iter()
.map(|split| extract_split_and_footer_offsets(&split.split_metadata))
.collect();
let searcher_context = Arc::new(SearcherContext::new(SearcherConfig::default(), None, None));
let searcher_context = Arc::new(SearcherContext::new_without_invoker(
SearcherConfig::default(),
None,
));

{
let request = ListTermsRequest {
Expand Down
5 changes: 4 additions & 1 deletion quickwit/quickwit-serve/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ quickwit-opentelemetry = { workspace = true }
quickwit-proto = { workspace = true }
quickwit-query = { workspace = true }
quickwit-search = { workspace = true }
quickwit-lambda-client = { workspace = true }
quickwit-lambda-client = { workspace = true, optional = true }
quickwit-storage = { workspace = true }
quickwit-telemetry = { workspace = true }

Expand Down Expand Up @@ -115,3 +115,6 @@ sqs-for-tests = [
"quickwit-indexing/sqs",
"quickwit-indexing/sqs-test-helpers"
]
lambda = [
"quickwit-lambda-client"
]
45 changes: 30 additions & 15 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,25 +631,38 @@ pub async fn serve_quickwit(
};

// Initialize Lambda invoker if enabled and searcher service is running
let lambda_invoker_opt = if node_config.is_service_enabled(QuickwitService::Searcher) {
let searcher_context = if node_config.is_service_enabled(QuickwitService::Searcher) {
if let Some(lambda_config) = &node_config.searcher_config.lambda {
info!("initializing AWS Lambda invoker for search");
warn!("offloading to lambda is EXPERIMENTAL. Use at your own risk");
let invoker = quickwit_lambda_client::try_get_or_deploy_invoker(lambda_config).await?;
Some(invoker)
#[cfg(feature = "lambda")]
{
info!("initializing AWS Lambda invoker for search");
warn!("offloading to lambda is EXPERIMENTAL. Use at your own risk");
let invoker =
quickwit_lambda_client::try_get_or_deploy_invoker(lambda_config).await?;
Arc::new(SearcherContext::new(
node_config.searcher_config.clone(),
split_cache_opt,
Some(invoker),
))
}
#[cfg(not(feature = "lambda"))]
{
let _ = lambda_config;
bail!("lambda support is statically disabled, but enabled in configuration");
}
} else {
None
Arc::new(SearcherContext::new_without_invoker(
node_config.searcher_config.clone(),
split_cache_opt,
))
}
} else {
None
Arc::new(SearcherContext::new_without_invoker(
node_config.searcher_config.clone(),
split_cache_opt,
))
};

let searcher_context = Arc::new(SearcherContext::new(
node_config.searcher_config.clone(),
split_cache_opt,
lambda_invoker_opt,
));

let (search_job_placer, search_service) = setup_searcher(
&node_config,
cluster.change_stream(),
Expand Down Expand Up @@ -1569,8 +1582,10 @@ mod tests {
#[tokio::test]
async fn test_setup_searcher() {
let node_config = NodeConfig::for_test();
let searcher_context =
Arc::new(SearcherContext::new(SearcherConfig::default(), None, None));
let searcher_context = Arc::new(SearcherContext::new_without_invoker(
SearcherConfig::default(),
None,
));
let metastore = metastore_for_test();
let (change_stream, change_stream_tx) = ClusterChangeStream::new_unbounded();
let storage_resolver = StorageResolver::unconfigured();
Expand Down