Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions conf/openmetadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,15 @@ elasticsearch:
payLoadSize: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760}
searchIndexMappingLanguage : ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
searchIndexFactoryClassName : org.openmetadata.service.search.SearchIndexFactory
# AWS IAM Authentication Configuration (for AWS OpenSearch or OpenSearch Serverless)
aws:
useIamAuth: ${OPENSEARCH_USE_IAM_AUTH:-false} # Enable AWS IAM authentication
region: ${AWS_REGION:-us-east-1} # AWS region (required when useIamAuth is true)
serviceName: ${AWS_SERVICE_NAME:-es} # Service name: "es" for OpenSearch, "aoss" for OpenSearch Serverless
# Optional: Explicit AWS credentials (if not provided, uses default credential chain)
accessKeyId: ${AWS_ACCESS_KEY_ID:-""}
secretAccessKey: ${AWS_SECRET_ACCESS_KEY:-""}
sessionToken: ${AWS_SESSION_TOKEN:-""} # For temporary credentials
naturalLanguageSearch:
enabled: false
embeddingProvider: ${EMBEDDING_PROVIDER:-bedrock}
Expand Down
4 changes: 4 additions & 0 deletions openmetadata-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,10 @@
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>regions</artifactId>
</dependency>
<dependency>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@
import os.org.opensearch.client.opensearch.core.bulk.BulkOperation;
import os.org.opensearch.client.opensearch.nodes.NodesStatsResponse;
import os.org.opensearch.client.transport.rest_client.RestClientTransport;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;

@Slf4j
// Not tagged with Repository annotation as it is programmatically initialized
Expand Down Expand Up @@ -599,19 +605,17 @@ private RestClientBuilder getLowLevelRestClient(ElasticSearchConfiguration esCon
httpAsyncClientBuilder.setMaxConnPerRoute(esConfig.getMaxConnPerRoute());
}

// Configure authentication if provided
// Configure authentication if provided: prioritize username/password over AWS IAM
if (StringUtils.isNotEmpty(esConfig.getUsername())
&& StringUtils.isNotEmpty(esConfig.getPassword())) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(
esConfig.getUsername(), esConfig.getPassword()));
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
configureBasicAuthentication(httpAsyncClientBuilder, esConfig);
} else if (esConfig.getAws() != null
&& Boolean.TRUE.equals(esConfig.getAws().getUseIamAuth())) {
configureAwsIamAuthentication(httpAsyncClientBuilder, esConfig);
}

// Configure SSL if needed
SSLContext sslContext = null;
SSLContext sslContext;
try {
sslContext = createElasticSearchSSLContext(esConfig);
} catch (KeyStoreException e) {
Expand Down Expand Up @@ -835,4 +839,85 @@ public SearchSchemaEntityRelationshipResult getSchemaEntityRelationship(
return entityManager.getSchemaEntityRelationship(
schemaFqn, queryFilter, includeSourceFields, offset, limit, from, size, deleted);
}

/**
* Configures AWS IAM authentication for OpenSearch using SigV4 request signing.
*
* @param httpAsyncClientBuilder The HTTP client builder to configure
* @param esConfig The ElasticSearch configuration containing AWS credentials
*/
private void configureAwsIamAuthentication(
org.apache.http.impl.nio.client.HttpAsyncClientBuilder httpAsyncClientBuilder,
ElasticSearchConfiguration esConfig) {
if (StringUtils.isEmpty(esConfig.getAws().getRegion())) {
LOG.error("AWS region is required for IAM authentication but was not provided");
return;
}

try {
Region region = Region.of(esConfig.getAws().getRegion());
String serviceName =
esConfig.getAws().getServiceName() != null
? esConfig.getAws().getServiceName().value()
: "es";

AwsCredentialsProvider credentialsProvider = createAwsCredentialsProvider(esConfig);

httpAsyncClientBuilder.addInterceptorLast(
new SigV4RequestSigningInterceptor(credentialsProvider, region, serviceName));

LOG.info(
"AWS IAM authentication configured for OpenSearch with region: {} and service: {}",
region,
serviceName);
} catch (Exception e) {
LOG.error("Failed to configure AWS IAM authentication", e);
}
}

/**
* Creates AWS credentials provider based on the configuration. Supports explicit credentials with
* optional session token, or falls back to default credential chain.
*
* @param esConfig The ElasticSearch configuration containing AWS credentials
* @return AwsCredentialsProvider instance
*/
private AwsCredentialsProvider createAwsCredentialsProvider(ElasticSearchConfiguration esConfig) {
if (StringUtils.isNotEmpty(esConfig.getAws().getAccessKeyId())
&& StringUtils.isNotEmpty(esConfig.getAws().getSecretAccessKey())) {
if (StringUtils.isNotEmpty(esConfig.getAws().getSessionToken())) {
// Use session credentials for temporary access
return StaticCredentialsProvider.create(
AwsSessionCredentials.create(
esConfig.getAws().getAccessKeyId(),
esConfig.getAws().getSecretAccessKey(),
esConfig.getAws().getSessionToken()));
} else {
// Use basic credentials for long-term access
return StaticCredentialsProvider.create(
AwsBasicCredentials.create(
esConfig.getAws().getAccessKeyId(), esConfig.getAws().getSecretAccessKey()));
}
} else {
// Use default credential chain (IAM role, environment variables, etc.)
return DefaultCredentialsProvider.create();
}
}

/**
* Configures basic authentication for OpenSearch using username and password.
*
* @param httpAsyncClientBuilder The HTTP client builder to configure
* @param esConfig The ElasticSearch configuration containing username and password
*/
private void configureBasicAuthentication(
org.apache.http.impl.nio.client.HttpAsyncClientBuilder httpAsyncClientBuilder,
ElasticSearchConfiguration esConfig) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(esConfig.getUsername(), esConfig.getPassword()));
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
LOG.info("Using basic authentication with username/password for OpenSearch");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package org.openmetadata.service.search.opensearch;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.Header;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.RequestLine;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.protocol.HttpContext;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.auth.signer.params.Aws4SignerParams;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.regions.Region;

@Slf4j
public class SigV4RequestSigningInterceptor implements HttpRequestInterceptor {
private final AwsCredentialsProvider credentialsProvider;
private final Aws4Signer signer;
private final Region region;
private final String serviceName;

public SigV4RequestSigningInterceptor(
AwsCredentialsProvider credentialsProvider, Region region, String serviceName) {
this.credentialsProvider = credentialsProvider;
this.region = region;
this.serviceName = serviceName;
this.signer = Aws4Signer.create();
}

@Override
public void process(HttpRequest request, HttpContext context) throws HttpException, IOException {
RequestLine requestLine = request.getRequestLine();
String method = requestLine.getMethod();
URI uri = URI.create(requestLine.getUri());

Map<String, List<String>> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
for (Header header : request.getAllHeaders()) {
headers.computeIfAbsent(header.getName(), k -> new ArrayList<>()).add(header.getValue());
}

final InputStream contentStream;
if (request instanceof HttpEntityEnclosingRequest entityRequest) {
if (entityRequest.getEntity() != null) {
contentStream = entityRequest.getEntity().getContent();
} else {
contentStream = null;
}
} else {
contentStream = null;
}

SdkHttpFullRequest.Builder requestBuilder =
SdkHttpFullRequest.builder()
.method(SdkHttpMethod.fromValue(method))
.uri(uri)
.headers(headers);

if (contentStream != null) {
requestBuilder.contentStreamProvider(() -> contentStream);
}

SdkHttpFullRequest sdkRequest = requestBuilder.build();

Aws4SignerParams signerParams =
Aws4SignerParams.builder()
.awsCredentials(credentialsProvider.resolveCredentials())
.signingName(serviceName)
.signingRegion(region)
.build();

SdkHttpFullRequest signedRequest = signer.sign(sdkRequest, signerParams);

request.removeHeaders("Authorization");
request.removeHeaders("X-Amz-Date");
request.removeHeaders("X-Amz-Security-Token");

signedRequest
.headers()
.forEach(
(headerName, headerValues) -> {
for (String headerValue : headerValues) {
request.addHeader(headerName, headerValue);
}
});

if (request instanceof HttpEntityEnclosingRequest entityRequest
&& entityRequest.getEntity() != null) {
if (signedRequest.contentStreamProvider().isPresent()) {
try {
InputStream signedStream = signedRequest.contentStreamProvider().get().newStream();
BasicHttpEntity entity = new BasicHttpEntity();
entity.setContent(signedStream);
entityRequest.setEntity(entity);
} catch (Exception e) {
LOG.error("Error setting signed entity", e);
}
}
}
}
}
Loading
Loading