diff --git a/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentProcessor.java b/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentProcessor.java new file mode 100644 index 00000000..1fe04373 --- /dev/null +++ b/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentProcessor.java @@ -0,0 +1,168 @@ +package dev.braintrust.trace; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import dev.braintrust.json.BraintrustJsonMapper; +import java.util.Base64; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +/** + * Scans JSON content for base64 data URI attachments and replaces them with attachment references + * after uploading to S3. + * + *

Package-private; not exposed in the public API. + */ +@Slf4j +class AttachmentProcessor { + /** + * quick heuristic to determine if the json payload contains a base64 encoded file + * + *

This is used for performance reasons as a fail-fast to avoid doing a json parse. + */ + static final Pattern BASE64_DATA_URI_PATTERN = + Pattern.compile("data:([\\w/\\-.+]+);base64,([A-Za-z0-9+/=]{20,})"); + + private final AttachmentUploader uploader; + + AttachmentProcessor(AttachmentUploader uploader) { + this.uploader = uploader; + } + + /** + * Scans a JSON string for base64 data URIs, uploads them, and returns the modified JSON with + * attachment references. + * + * @param json the JSON string to scan + * @return the modified JSON with base64 data replaced by attachment references, or the original + * JSON if no base64 data was found + */ + String processAndUpload(String json) { + if (json == null || !BASE64_DATA_URI_PATTERN.matcher(json).find()) { + return json; + } + + try { + JsonNode root = BraintrustJsonMapper.get().readTree(json); + AtomicBoolean modified = new AtomicBoolean(false); + JsonNode result = replaceBase64Attachments(root, modified); + return modified.get() ? BraintrustJsonMapper.get().writeValueAsString(result) : json; + } catch (Exception e) { + throw new RuntimeException("Failed to process attachments in JSON", e); + } + } + + // NOTE: not concerned with recursion blowing the stack because we're mutating AI vendor + // messages which are not deep enough for this to be a concern + private JsonNode replaceBase64Attachments(JsonNode node, AtomicBoolean modified) { + if (node.isTextual()) { + return replaceInText((TextNode) node, modified); + } else if (node.isObject()) { + ObjectNode objectNode = (ObjectNode) node; + ObjectNode result = objectNode.deepCopy(); + var fieldNames = objectNode.fieldNames(); + while (fieldNames.hasNext()) { + String fieldName = fieldNames.next(); + JsonNode child = objectNode.get(fieldName); + result.set(fieldName, replaceBase64Attachments(child, modified)); + } + return result; + } else if (node.isArray()) { + ArrayNode arrayNode = (ArrayNode) node; + ArrayNode result = arrayNode.deepCopy(); + for (int i = 0; i < arrayNode.size(); i++) { + result.set(i, replaceBase64Attachments(arrayNode.get(i), modified)); + } + return result; + } + return node; + } + + @SneakyThrows + private JsonNode replaceInText(TextNode textNode, AtomicBoolean modified) { + String value = textNode.asText(); + Matcher matcher = BASE64_DATA_URI_PATTERN.matcher(value); + if (!matcher.find()) { + return textNode; + } + if (!isEntirelyDataUri(value)) { + log.debug("found base64 string but text contained extra content {}", value); + return textNode; + } + + matcher.reset(); + StringBuilder sb = new StringBuilder(); + while (matcher.find()) { + String contentType = matcher.group(1); + String base64Data = matcher.group(2); + byte[] data = Base64.getDecoder().decode(base64Data); + + String extension = contentTypeToExtension(contentType); + String filename = "attachment" + extension; + AttachmentReference ref = AttachmentReference.create(filename, contentType); + + try { + uploader.enqueue(ref, data); + } catch (IllegalStateException e) { + throw new RuntimeException("Failed to enqueue attachment upload", e); + } + + String replacement = + "{\"type\":\"braintrust_attachment\",\"content_type\":\"" + + contentType + + "\",\"filename\":\"" + + filename + + "\",\"key\":\"" + + ref.key() + + "\"}"; + + matcher.appendReplacement(sb, Matcher.quoteReplacement(replacement)); + } + matcher.appendTail(sb); + + modified.set(true); + + return BraintrustJsonMapper.get().readTree(sb.toString()); + } + + static boolean isEntirelyDataUri(String value) { + String trimmed = value.trim(); + return trimmed.startsWith("data:") + && !trimmed.contains("\"") + && !trimmed.contains("\\") + && !trimmed.contains(" "); + } + + private static String contentTypeToExtension(String contentType) { + switch (contentType.toLowerCase()) { + case "image/png": + return ".png"; + case "image/jpeg": + case "image/jpg": + return ".jpg"; + case "image/gif": + return ".gif"; + case "image/webp": + return ".webp"; + case "image/svg+xml": + return ".svg"; + case "application/pdf": + return ".pdf"; + case "text/plain": + return ".txt"; + case "application/json": + return ".json"; + default: + String[] parts = contentType.split("/"); + if (parts.length == 2) { + return "." + parts[1].split("[;\\-]")[0]; + } + return ""; + } + } +} diff --git a/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentReference.java b/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentReference.java new file mode 100644 index 00000000..5942540e --- /dev/null +++ b/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentReference.java @@ -0,0 +1,33 @@ +package dev.braintrust.trace; + +import java.util.Objects; +import java.util.UUID; +import javax.annotation.Nonnull; + +/** + * Represents an attachment reference stored on a span in place of uploaded attachment data. + * + *

Its shape intentionally matches the cross-SDK Braintrust attachment reference format. + */ +record AttachmentReference( + @Nonnull String type, + @Nonnull String filename, + @Nonnull String contentType, + @Nonnull String key) { + + private static final String DEFAULT_TYPE = "braintrust_attachment"; + + /** + * Creates an attachment reference with a generated UUID key. + * + * @param filename the display filename for the attachment + * @param contentType the MIME type of the attachment content + * @return a new AttachmentReference with a unique key + */ + static AttachmentReference create(@Nonnull String filename, @Nonnull String contentType) { + Objects.requireNonNull(filename, "filename cannot be null"); + Objects.requireNonNull(contentType, "contentType cannot be null"); + return new AttachmentReference( + DEFAULT_TYPE, filename, contentType, UUID.randomUUID().toString()); + } +} diff --git a/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentUploader.java b/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentUploader.java new file mode 100644 index 00000000..0d7df512 --- /dev/null +++ b/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentUploader.java @@ -0,0 +1,305 @@ +package dev.braintrust.trace; + +import dev.braintrust.api.BraintrustOpenApiClient; +import dev.braintrust.config.BraintrustConfig; +import java.io.IOException; +import java.net.http.HttpClient; +import java.time.Duration; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +/** + * Uploads Braintrust attachments in the background. + * + *

Implementations accept attachment data via {@link #enqueue}, process them asynchronously, and + * support graceful shutdown with {@link #shutdown}. + */ +interface AttachmentUploader { + /** + * Enqueues an attachment for upload. + * + * @param reference the attachment reference metadata + * @param data the attachment data to upload + */ + void enqueue(@Nonnull AttachmentReference reference, @Nonnull byte[] data); + + /** runs force flush with a default timeout */ + default void forceFlush() { + forceFlush(Duration.ofSeconds(30)); + } + + /** + * Waits for all currently enqueued uploads to complete with a timeout. + * + *

Note: the completion of this method does not guarantee an empty queue. This guarantees + * only that all enqueued jobs which were submitted before this method was invoked have been + * uploaded. + * + * @param timeout the maximum time to wait + * @return true if all uploads completed, false if timed out + * @throws InterruptedException if interrupted while waiting + */ + boolean forceFlush(@Nonnull Duration timeout); + + /** runs shutdown with a default timeout */ + default void shutdown() { + shutdown(Duration.ofSeconds(30)); + } + + /** + * Shuts down the uploader with a custom timeout. + * + * @param timeout the maximum time to wait for pending uploads + */ + void shutdown(@Nonnull Duration timeout); + + /** + * Background uploader for Braintrust attachments that uploads to S3 via signed URLs. + * + *

Uploads are enqueued and processed by a single-threaded worker that: + * + *

    + *
  1. Requests a signed upload URL from the Braintrust API + *
  2. Uploads the data to the signed URL + *
  3. Reports the upload status (done/error) to the Braintrust API + *
+ * + *

The uploader starts lazily on first enqueue and can be shut down gracefully. + */ + @Slf4j + class S3AttachmentUploader implements AttachmentUploader { + private static final int QUEUE_SIZE = 1024; + private static final Duration DEFAULT_SHUTDOWN_TIMEOUT = Duration.ofSeconds(10); + + private final BraintrustOpenApiClient apiClient; + private final HttpClient httpClient; + + private final LinkedBlockingQueue queue; + private final AtomicReference error = new AtomicReference<>(); + private final AtomicReference worker = new AtomicReference<>(); + private final AtomicReference orgId = new AtomicReference<>(); + + // non thread safe fields must be checked and read under the lock + private final Object lock = new Object(); + private boolean closed = false; + private int numPendingUploads = 0; + private CountDownLatch currentBatch = new CountDownLatch(1); + + /** + * Creates a new attachment uploader. + * + * @param config the Braintrust configuration + * @param apiClient the Braintrust API client for authenticated requests + */ + S3AttachmentUploader( + @Nonnull BraintrustConfig config, @Nonnull BraintrustOpenApiClient apiClient) { + this.apiClient = apiClient; + this.queue = new LinkedBlockingQueue<>(QUEUE_SIZE); + this.httpClient = HttpClient.newBuilder().sslContext(config.sslContext()).build(); + } + + @Override + public void enqueue(@Nonnull AttachmentReference reference, @Nonnull byte[] data) { + synchronized (lock) { + if (closed) { + throw new IllegalStateException("Attachment uploader is shut down"); + } + ensureWorkerStarted(); + UploadJob job = new UploadJob(reference, data); + queue.add(job); + numPendingUploads++; + } + } + + @Override + @SneakyThrows + public boolean forceFlush(@Nonnull Duration timeout) { + CountDownLatch batch; + synchronized (lock) { + if (numPendingUploads == 0) { + return true; + } + batch = currentBatch; + } + boolean completed = batch.await(timeout.toMillis(), TimeUnit.MILLISECONDS); + + Throwable err = error.getAndSet(null); + if (err != null) { + throw new RuntimeException("Attachment upload failed", err); + } + return completed; + } + + @Override + @SneakyThrows + public void shutdown(@Nonnull Duration timeout) { + synchronized (lock) { + if (closed) { + return; + } + closed = true; + } + forceFlush(timeout); + ExecutorService executor = worker.get(); + if (executor == null) { + return; + } + try { + executor.shutdown(); + if (!executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) { + executor.shutdownNow(); + } + } finally { + worker.set(null); + } + } + + private void ensureWorkerStarted() { + if (worker.get() == null) { + var newWorker = + Executors.newSingleThreadExecutor( + r -> { + Thread t = new Thread(r, "braintrust-attachment-uploader"); + t.setDaemon(true); + return t; + }); + if (worker.compareAndSet(null, newWorker)) { + worker.get().submit(this::workerLoop); + BraintrustShutdownHook.addShutdownHook( + BraintrustShutdownHook.SPAN_PROCESSOR_ORDER, this::shutdown); + } else { + // tried to start the worker concurrently. no biggie. + newWorker.shutdown(); + } + } + } + + private void workerLoop() { + log.debug("Attachment uploader worker started"); + while (!isClosed() || queue.peek() != null) { + UploadJob job = null; + try { + job = queue.poll(100, TimeUnit.MILLISECONDS); + while (job != null) { + upload(job); + finishPending(); + job = queue.poll(100, TimeUnit.MILLISECONDS); + } + } catch (Exception e) { + synchronized (lock) { + closed = true; + } + recordError(e); + if (job == null) { + log.error("Failed to upload attachment", e); + } else { + log.error( + "Failed to upload attachment key={}", job.reference().key(), e); + reportStatus(job.reference().key(), "error", e.getMessage()); + } + synchronized (lock) { + finishPending(); + } + break; + } + } + log.debug("Attachment uploader worker stopped"); + } + + private boolean isClosed() { + synchronized (lock) { + return closed; + } + } + + private void upload(@Nonnull UploadJob job) throws IOException, InterruptedException { + S3UploadUtils.UploadUrlResponse urlResponse = + S3UploadUtils.requestUploadUrl( + httpClient, + apiClient, + getOrgId(), + job.reference().key(), + job.reference().filename(), + job.reference().contentType()); + + S3UploadUtils.uploadToSignedUrl( + httpClient, + urlResponse.signedUrl(), + urlResponse.headers(), + job.reference().contentType(), + job.data()); + + reportStatus(job.reference().key(), "done", null); + } + + private String getOrgId() { + if (orgId.get() != null) { + return orgId.get(); + } + return orgId.updateAndGet(curr -> curr != null ? curr : resolveOrgId()); + } + + private String resolveOrgId() { + try { + var loginResponse = apiClient.login(); + if (loginResponse.orgInfo() != null && !loginResponse.orgInfo().isEmpty()) { + return loginResponse.orgInfo().get(0).id(); + } else { + throw new IllegalStateException("No org info returned from login"); + } + } catch (Exception e) { + throw new RuntimeException("Failed to resolve org ID for attachment upload", e); + } + } + + private void reportStatus( + @Nonnull String key, @Nonnull String status, @Nullable String errorMessage) { + try { + var statusMap = new java.util.HashMap(); + statusMap.put("upload_status", status); + if (errorMessage != null) { + statusMap.put("error_message", errorMessage); + } + + S3UploadUtils.updateUploadStatus(httpClient, apiClient, getOrgId(), key, statusMap); + } catch (Exception e) { + log.warn("Failed to report attachment status key={} status={}", key, status, e); + recordError(e); + } + } + + private void recordError(@Nonnull Throwable err) { + error.updateAndGet( + current -> { + if (current == null) { + return err; + } + current.addSuppressed(err); + return current; + }); + } + + private void finishPending() { + synchronized (lock) { + numPendingUploads--; + if (numPendingUploads <= 0) { + currentBatch.countDown(); + currentBatch = new CountDownLatch(1); + } + if (numPendingUploads < 0) { + log.debug( + "unexpected num pending uploads: {}. this is a bug. attempting" + + " recovery", + numPendingUploads); + numPendingUploads = 0; + } + } + } + + private record UploadJob(AttachmentReference reference, byte[] data) {} + } +} diff --git a/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustShutdownHook.java b/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustShutdownHook.java index a6f19910..e51136db 100644 --- a/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustShutdownHook.java +++ b/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustShutdownHook.java @@ -6,6 +6,9 @@ import java.util.concurrent.CopyOnWriteArrayList; class BraintrustShutdownHook { + public static final int SPAN_PROCESSOR_ORDER = 0; + public static final int S3_UPLOAD_ORDER = SPAN_PROCESSOR_ORDER + 1; + private record OrderedTarget(int order, Runnable target) {} private static final List shutdownTargets = new CopyOnWriteArrayList<>(); @@ -22,9 +25,16 @@ private record OrderedTarget(int order, Runnable target) {} } public static void addShutdownHook(Runnable target) { - addShutdownHook(0, target); + addShutdownHook(SPAN_PROCESSOR_ORDER, target); } + /** + * Add a jvm shutdown hook. + * + * @param order lower numbers run first. targets with the same order number can run in any + * order. Span processor/exporter flush runs at level 0 + * @param target the shutdown code to run + */ public static void addShutdownHook(int order, Runnable target) { shutdownTargets.add(new OrderedTarget(order, target)); } diff --git a/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustSpanProcessor.java b/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustSpanProcessor.java index 532193bd..4718a91b 100644 --- a/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustSpanProcessor.java +++ b/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustSpanProcessor.java @@ -1,13 +1,19 @@ package dev.braintrust.trace; +import dev.braintrust.api.BraintrustOpenApiClient; import dev.braintrust.config.BraintrustConfig; import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.data.DelegatingSpanData; +import io.opentelemetry.sdk.trace.data.SpanData; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -21,7 +27,10 @@ */ @Slf4j public class BraintrustSpanProcessor implements SpanProcessor { - // Braintrust-specific attributes + static final AttributeKey INPUT_JSON = AttributeKey.stringKey("braintrust.input_json"); + static final AttributeKey OUTPUT_JSON = + AttributeKey.stringKey("braintrust.output_json"); + public static final AttributeKey PARENT = AttributeKey.stringKey(BraintrustTracing.PARENT_KEY); @@ -29,11 +38,16 @@ public class BraintrustSpanProcessor implements SpanProcessor { private final SpanProcessor delegate; private final List samplers; private final ConcurrentMap parentContexts = new ConcurrentHashMap<>(); + private final AttachmentProcessor attachmentProcessor; BraintrustSpanProcessor(BraintrustConfig config, SpanProcessor delegate) { this.config = config; this.delegate = delegate; this.samplers = buildSamplers(config); + this.attachmentProcessor = + new AttachmentProcessor( + new AttachmentUploader.S3AttachmentUploader( + config, BraintrustOpenApiClient.of(config))); } private static List buildSamplers(BraintrustConfig config) { @@ -113,12 +127,25 @@ public void onEnd(ReadableSpan span) { return; } } - delegate.onEnd(span); + + var spanData = span.toSpanData(); + @Nullable String inputJson = spanData.getAttributes().get(INPUT_JSON); + @Nullable String outputJson = spanData.getAttributes().get(OUTPUT_JSON); + + @Nullable String newInputJson = attachmentProcessor.processAndUpload(inputJson); + @Nullable String newOutputJson = attachmentProcessor.processAndUpload(outputJson); + + if (!Objects.equals(newInputJson, inputJson) + || !Objects.equals(newOutputJson, outputJson)) { + delegate.onEnd(new TransformedReadableSpan(span, newInputJson, newOutputJson)); + } else { + delegate.onEnd(span); + } } @Override public boolean isEndRequired() { - return !samplers.isEmpty() || delegate.isEndRequired(); + return true; } @Override @@ -170,4 +197,93 @@ public static ParentContext experiment(String experimentId) { return new ParentContext(null, experimentId, ParentType.EXPERIMENT); } } + + /** + * otel java does not implement onEnding, so this is the most idiomatic way to mutate a span + * once it ends + */ + private static class TransformedReadableSpan implements ReadableSpan { + private final ReadableSpan delegate; + private final Attributes attributes; + + TransformedReadableSpan(ReadableSpan delegate, String inputJson, String outputJson) { + this.delegate = delegate; + var builder = delegate.getAttributes().toBuilder(); + builder.put(INPUT_JSON, inputJson); + builder.put(OUTPUT_JSON, outputJson); + attributes = builder.build(); + } + + @Override + public Attributes getAttributes() { + return attributes; + } + + @Override + @SuppressWarnings("unchecked") + public T getAttribute(AttributeKey key) { + if (key.equals(INPUT_JSON)) { + return (T) attributes.get(INPUT_JSON); + } + if (key.equals(OUTPUT_JSON)) { + return (T) attributes.get(OUTPUT_JSON); + } + return delegate.getAttribute(key); + } + + @Override + public SpanData toSpanData() { + return new DelegatingSpanData(delegate.toSpanData()) { + @Override + public io.opentelemetry.api.common.Attributes getAttributes() { + return TransformedReadableSpan.this.getAttributes(); + } + + @Override + public int getTotalAttributeCount() { + return getAttributes().size(); + } + }; + } + + @Override + public String getName() { + return delegate.getName(); + } + + @Override + public io.opentelemetry.api.trace.SpanContext getSpanContext() { + return delegate.getSpanContext(); + } + + @Override + public boolean hasEnded() { + return delegate.hasEnded(); + } + + @Override + public io.opentelemetry.sdk.common.InstrumentationScopeInfo getInstrumentationScopeInfo() { + return delegate.getInstrumentationScopeInfo(); + } + + @Override + public InstrumentationLibraryInfo getInstrumentationLibraryInfo() { + return delegate.getInstrumentationLibraryInfo(); + } + + @Override + public long getLatencyNanos() { + return delegate.getLatencyNanos(); + } + + @Override + public io.opentelemetry.api.trace.SpanContext getParentSpanContext() { + return delegate.getParentSpanContext(); + } + + @Override + public io.opentelemetry.api.trace.SpanKind getKind() { + return delegate.getKind(); + } + } } diff --git a/braintrust-sdk/src/main/java/dev/braintrust/trace/S3UploadUtils.java b/braintrust-sdk/src/main/java/dev/braintrust/trace/S3UploadUtils.java new file mode 100644 index 00000000..d36361bb --- /dev/null +++ b/braintrust-sdk/src/main/java/dev/braintrust/trace/S3UploadUtils.java @@ -0,0 +1,305 @@ +package dev.braintrust.trace; + +import com.fasterxml.jackson.annotation.JsonProperty; +import dev.braintrust.api.BraintrustOpenApiClient; +import dev.braintrust.json.BraintrustJsonMapper; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.Map; +import javax.annotation.Nonnull; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +/** Utilities for uploading attachments to S3-compatible object storage via signed URLs. */ +@Slf4j +class S3UploadUtils { + /** Default per-request timeout for HTTP calls. */ + private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(60); + + /** Maximum number of retry attempts for transient failures. */ + private static final int MAX_RETRIES = 3; + + /** Initial backoff delay between retries. Doubles on each subsequent attempt. */ + private static final Duration INITIAL_RETRY_DELAY = Duration.ofMillis(500); + + private S3UploadUtils() {} + + /** + * Requests a signed upload URL from the Braintrust API. + * + * @param httpClient the HTTP client to use for requests + * @param apiClient the Braintrust API client (provides auth and base URL) + * @param orgId the organization ID + * @param key the attachment key + * @param filename the filename for the attachment + * @param contentType the MIME type of the attachment + * @return the signed URL response with upload URL and required headers + * @throws IOException if the request fails + * @throws InterruptedException if the request is interrupted + */ + static UploadUrlResponse requestUploadUrl( + @Nonnull HttpClient httpClient, + @Nonnull BraintrustOpenApiClient apiClient, + @Nonnull String orgId, + @Nonnull String key, + @Nonnull String filename, + @Nonnull String contentType) + throws IOException, InterruptedException { + + var requestBody = + BraintrustJsonMapper.get() + .writeValueAsString( + new UploadUrlRequest(key, filename, contentType, orgId)); + + var requestBuilder = + HttpRequest.newBuilder() + .uri(toUri(apiClient.getBaseUri() + "/attachment")) + .timeout(REQUEST_TIMEOUT) + .header("Content-Type", "application/json") + .header("Accept", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(requestBody)); + + if (apiClient.getRequestInterceptor() != null) { + apiClient.getRequestInterceptor().accept(requestBuilder); + } + + HttpResponse response = + sendWithRetry( + httpClient, requestBuilder.build(), HttpResponse.BodyHandlers.ofString()); + + if (!isSuccessStatus(response.statusCode())) { + throw new IOException( + "Failed to request upload URL: HTTP " + + response.statusCode() + + " - " + + response.body()); + } + + UploadUrlResponse result = + BraintrustJsonMapper.get().readValue(response.body(), UploadUrlResponse.class); + + if (result.signedUrl() == null || result.signedUrl().isEmpty()) { + throw new IOException("Signed URL response missing signedUrl"); + } + + return result; + } + + /** + * Uploads data to a signed URL with the specified headers. + * + *

Automatically detects Azure Blob Storage URLs and adds the required {@code x-ms-blob-type: + * BlockBlob} header. + * + * @param httpClient the HTTP client to use for the upload + * @param signedUrl the signed upload URL + * @param headers additional headers to include in the upload request + * @param contentType the MIME type of the data being uploaded + * @param data the data to upload + * @throws IOException if the upload fails + * @throws InterruptedException if the upload is interrupted + */ + static void uploadToSignedUrl( + @Nonnull HttpClient httpClient, + @Nonnull String signedUrl, + @Nonnull Map headers, + @Nonnull String contentType, + @Nonnull byte[] data) + throws IOException, InterruptedException { + + var requestBuilder = + HttpRequest.newBuilder() + .uri(toUri(signedUrl)) + .timeout(REQUEST_TIMEOUT) + .header("Content-Type", contentType) + .PUT(HttpRequest.BodyPublishers.ofByteArray(data)); + + for (var entry : headers.entrySet()) { + requestBuilder.header(entry.getKey(), entry.getValue()); + } + + addAzureBlobHeaders(signedUrl, requestBuilder); + + HttpResponse response = + sendWithRetry( + httpClient, requestBuilder.build(), HttpResponse.BodyHandlers.ofString()); + + if (!isSuccessStatus(response.statusCode())) { + throw new IOException( + "Failed to upload to object store: HTTP " + + response.statusCode() + + " - " + + response.body()); + } + } + + /** + * Updates the upload status for an attachment. + * + * @param httpClient the HTTP client to use for the request + * @param apiClient the Braintrust API client (provides auth and base URL) + * @param orgId the organization ID + * @param key the attachment key + * @param status the status map (e.g., {"upload_status": "done"} or {"upload_status": "error", + * "error_message": "..."}) + * @throws IOException if the request fails + * @throws InterruptedException if the request is interrupted + */ + static void updateUploadStatus( + @Nonnull HttpClient httpClient, + @Nonnull BraintrustOpenApiClient apiClient, + @Nonnull String orgId, + @Nonnull String key, + @Nonnull Map status) + throws IOException, InterruptedException { + + var requestBody = + BraintrustJsonMapper.get() + .writeValueAsString(new StatusRequest(key, orgId, status)); + + var requestBuilder = + HttpRequest.newBuilder() + .uri(toUri(apiClient.getBaseUri() + "/attachment/status")) + .timeout(REQUEST_TIMEOUT) + .header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(requestBody)); + + if (apiClient.getRequestInterceptor() != null) { + apiClient.getRequestInterceptor().accept(requestBuilder); + } + + HttpResponse response = + sendWithRetry( + httpClient, requestBuilder.build(), HttpResponse.BodyHandlers.ofString()); + + if (!isSuccessStatus(response.statusCode())) { + throw new IOException( + "Failed to update upload status: HTTP " + + response.statusCode() + + " - " + + response.body()); + } + } + + /** Returns {@code true} if the HTTP status code indicates success (2xx). */ + private static boolean isSuccessStatus(int statusCode) { + return statusCode >= 200 && statusCode < 300; + } + + /** + * Parses a URI string, wrapping any syntax error in an {@link IOException}. + * + * @param uriString the string to parse + * @return the parsed URI + * @throws IOException if the string is not a valid URI + */ + @SneakyThrows + private static URI toUri(@Nonnull String uriString) { + try { + return new URI(uriString); + } catch (URISyntaxException e) { + throw new IOException("Invalid URI: " + uriString, e); + } + } + + /** + * Sends an HTTP request with retry logic for transient failures. + * + *

Retries on 5xx server errors and {@link IOException} (network errors) up to {@link + * #MAX_RETRIES} times with exponential backoff starting at {@link #INITIAL_RETRY_DELAY}. + * + * @param httpClient the HTTP client + * @param request the request to send + * @param bodyHandler the response body handler + * @param the response body type + * @return the HTTP response + * @throws IOException if all retries are exhausted + * @throws InterruptedException if the thread is interrupted during retry backoff + */ + private static HttpResponse sendWithRetry( + @Nonnull HttpClient httpClient, + @Nonnull HttpRequest request, + @Nonnull HttpResponse.BodyHandler bodyHandler) + throws IOException, InterruptedException { + + IOException lastException = null; + long backoffMs = INITIAL_RETRY_DELAY.toMillis(); + + for (int attempt = 0; attempt <= MAX_RETRIES; attempt++) { + if (attempt > 0) { + log.debug( + "Retrying request {} (attempt {}/{})", request.uri(), attempt, MAX_RETRIES); + Thread.sleep(backoffMs); + backoffMs *= 2; + } + + HttpResponse response; + try { + response = httpClient.send(request, bodyHandler); + } catch (IOException e) { + lastException = e; + log.debug("Request to {} failed with IOException", request.uri(), e); + continue; + } + + // Don't retry client errors (4xx) or successes + if (response.statusCode() < 500) { + return response; + } + + // Server error (5xx) — retry + lastException = new IOException("Server error: HTTP " + response.statusCode()); + log.debug( + "Request to {} returned status {}, will retry", + request.uri(), + response.statusCode()); + } + + throw new IOException( + "Request to " + request.uri() + " failed after " + MAX_RETRIES + " retries", + lastException); + } + + /** Adds Azure Blob Storage specific headers when the signed URL points to Azure. */ + private static void addAzureBlobHeaders( + @Nonnull String signedUrl, HttpRequest.Builder requestBuilder) { + try { + var uri = new URI(signedUrl); + String host = uri.getHost(); + if (host != null && host.endsWith(".blob.core.windows.net")) { + requestBuilder.header("x-ms-blob-type", "BlockBlob"); + } + } catch (URISyntaxException e) { + log.warn("Failed to parse signed URL for Azure detection: {}", signedUrl, e); + } + } + + /** Response from requesting a signed upload URL. */ + record UploadUrlResponse( + @JsonProperty("signedUrl") @Nonnull String signedUrl, + @JsonProperty("headers") @Nonnull Map headers) { + + /** Compact constructor that enforces non-null headers with an empty map default. */ + UploadUrlResponse { + if (headers == null) { + headers = Map.of(); + } + } + } + + private record UploadUrlRequest( + @Nonnull String key, + @Nonnull String filename, + @JsonProperty("content_type") @Nonnull String contentType, + @JsonProperty("org_id") @Nonnull String orgId) {} + + private record StatusRequest( + @Nonnull String key, + @JsonProperty("org_id") @Nonnull String orgId, + @Nonnull Map status) {} +} diff --git a/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentProcessorTest.java b/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentProcessorTest.java new file mode 100644 index 00000000..f0dee63d --- /dev/null +++ b/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentProcessorTest.java @@ -0,0 +1,77 @@ +package dev.braintrust.trace; + +import static org.junit.jupiter.api.Assertions.*; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import dev.braintrust.json.BraintrustJsonMapper; +import java.time.Duration; +import org.jspecify.annotations.NonNull; +import org.junit.jupiter.api.Test; + +public class AttachmentProcessorTest { + private final AttachmentProcessor attachmentProcessor = + new AttachmentProcessor(new NoOpAttachmentUploader()); + + @Test + void replacesEntireDataUriWithObjectNode() throws Exception { + String json = + """ + [{"role":"user","content":[{"type":"text","text":"hi"},{"type":"image_url","image_url":{"url":"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg=="}}]}] + """; + String newJson = attachmentProcessor.processAndUpload(json); + assertNotEquals(json, newJson); + + JsonNode root = BraintrustJsonMapper.get().readTree(json); + { + JsonNode urlNode = root.get(0).get("content").get(1).get("image_url").get("url"); + assertTrue(urlNode.isTextual(), "original url should be a text node"); + } + + JsonNode newRoot = BraintrustJsonMapper.get().readTree(newJson); + { + JsonNode newUrlNode = newRoot.get(0).get("content").get(1).get("image_url").get("url"); + // Verify newUrl is a valid braintrust attachment object + assertTrue(newUrlNode.isObject(), "replaced url should be an object node"); + assertEquals("braintrust_attachment", newUrlNode.get("type").asText()); + assertEquals("image/png", newUrlNode.get("content_type").asText()); + assertEquals("attachment.png", newUrlNode.get("filename").asText()); + assertNotNull(newUrlNode.get("key"), "attachment key must be present"); + assertFalse( + newUrlNode.get("key").asText().isEmpty(), "attachment key must not be empty"); + } + + // Verify only image_url.url was mutated: removing the url field from both should yield + // identical JSON + ((ObjectNode) root.get(0).get("content").get(1).get("image_url")).remove("url"); + ((ObjectNode) newRoot.get(0).get("content").get(1).get("image_url")).remove("url"); + assertEquals( + BraintrustJsonMapper.get().writeValueAsString(root), + BraintrustJsonMapper.get().writeValueAsString(newRoot), + "only the url field should differ between original and processed JSON"); + } + + @Test + void doesNotReplacePartialDataUri() { + String json = + """ + {"text":"hello data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg== world"} + """; + // processAndUpload should leave the JSON unchanged end-to-end + String newJson = attachmentProcessor.processAndUpload(json); + assertEquals(json, newJson, "partial data URI in text should not be replaced"); + } + + private static class NoOpAttachmentUploader implements AttachmentUploader { + @Override + public void enqueue(@NonNull AttachmentReference reference, @NonNull byte[] data) {} + + @Override + public boolean forceFlush(@NonNull Duration timeout) { + return true; + } + + @Override + public void shutdown(@NonNull Duration timeout) {} + } +} diff --git a/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentReferenceTest.java b/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentReferenceTest.java new file mode 100644 index 00000000..e73450d6 --- /dev/null +++ b/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentReferenceTest.java @@ -0,0 +1,46 @@ +package dev.braintrust.trace; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.UUID; +import org.junit.jupiter.api.Test; + +public class AttachmentReferenceTest { + + @Test + void createGeneratesUniqueKeys() { + var ref1 = AttachmentReference.create("test.json", "application/json"); + var ref2 = AttachmentReference.create("test.json", "application/json"); + + assertEquals("braintrust_attachment", ref1.type()); + assertEquals("test.json", ref1.filename()); + assertEquals("application/json", ref1.contentType()); + assertNotNull(ref1.key()); + + assertNotEquals(ref1.key(), ref2.key()); + } + + @Test + void createValidatesFilename() { + assertThrows( + NullPointerException.class, + () -> AttachmentReference.create(null, "application/json")); + } + + @Test + void createValidatesContentType() { + assertThrows( + NullPointerException.class, () -> AttachmentReference.create("test.json", null)); + } + + @Test + void recordComponentsAccessible() { + String customKey = UUID.randomUUID().toString(); + var ref = new AttachmentReference("custom_type", "data.xml", "application/xml", customKey); + + assertEquals("custom_type", ref.type()); + assertEquals("data.xml", ref.filename()); + assertEquals("application/xml", ref.contentType()); + assertEquals(customKey, ref.key()); + } +} diff --git a/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentUploaderTest.java b/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentUploaderTest.java new file mode 100644 index 00000000..0d10568b --- /dev/null +++ b/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentUploaderTest.java @@ -0,0 +1,186 @@ +package dev.braintrust.trace; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; +import static org.junit.jupiter.api.Assertions.*; + +import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; +import com.github.tomakehurst.wiremock.junit5.WireMockTest; +import dev.braintrust.api.BraintrustOpenApiClient; +import dev.braintrust.config.BraintrustConfig; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +@WireMockTest +public class AttachmentUploaderTest { + private BraintrustOpenApiClient apiClient; + private AttachmentUploader.S3AttachmentUploader uploader; + private String baseUrl; + + @BeforeEach + void setUp(WireMockRuntimeInfo wmRuntimeInfo) { + baseUrl = wmRuntimeInfo.getHttpBaseUrl(); + var config = BraintrustConfig.builder().apiKey("test-api-key").apiUrl(baseUrl).build(); + apiClient = BraintrustOpenApiClient.of(config); + uploader = new AttachmentUploader.S3AttachmentUploader(config, apiClient); + } + + @AfterEach + void tearDown() throws InterruptedException { + uploader.shutdown(Duration.ofSeconds(5)); + } + + private void stubLoginAndUploadFlow() { + stubFor( + post(urlEqualTo("/api/apikey/login")) + .willReturn( + aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody( + "{\"org_info\":[{\"id\":\"org-123\",\"name\":\"test-org\"}]}"))); + + stubFor( + post(urlEqualTo("/attachment")) + .willReturn( + aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody( + "{\"signedUrl\":\"" + + baseUrl + + "/upload\",\"headers\":{}}"))); + + stubFor(put(urlEqualTo("/upload")).willReturn(aResponse().withStatus(200))); + + stubFor(post(urlEqualTo("/attachment/status")).willReturn(aResponse().withStatus(200))); + } + + @Test + void enqueueUploadsSuccessfully() throws Exception { + stubLoginAndUploadFlow(); + + var ref = AttachmentReference.create("test.json", "application/json"); + uploader.enqueue(ref, "{\"key\":\"value\"}".getBytes()); + uploader.forceFlush(); + + verify( + postRequestedFor(urlEqualTo("/attachment")) + .withRequestBody(containing("\"key\":\"" + ref.key() + "\""))); + verify(putRequestedFor(urlEqualTo("/upload"))); + verify( + postRequestedFor(urlEqualTo("/attachment/status")) + .withRequestBody(containing("\"upload_status\":\"done\""))); + } + + @Test + void enqueueRejectsAfterShutdown() { + assertDoesNotThrow(() -> uploader.shutdown()); + + var ref = AttachmentReference.create("test.json", "application/json"); + assertThrows(IllegalStateException.class, () -> uploader.enqueue(ref, "data".getBytes())); + } + + @Test + void forceFlushWaitsForPendingUploads() throws Exception { + stubLoginAndUploadFlow(); + + var ref = AttachmentReference.create("test.json", "application/json"); + uploader.enqueue(ref, "data".getBytes()); + + AtomicBoolean flushed = new AtomicBoolean(false); + var flushThread = + new Thread( + () -> { + uploader.forceFlush(); + flushed.set(true); + }); + flushThread.start(); + + flushThread.join(5000); + assertTrue(flushed.get(), "forceFlush should complete after upload finishes"); + assertFalse(flushThread.isAlive()); + } + + @Test + void shutdownWaitsForPendingUploads() throws Exception { + stubLoginAndUploadFlow(); + + var ref = AttachmentReference.create("test.json", "application/json"); + uploader.enqueue(ref, "data".getBytes()); + + uploader.shutdown(Duration.ofSeconds(5)); + + verify( + postRequestedFor(urlEqualTo("/attachment/status")) + .withRequestBody(containing("\"upload_status\":\"done\""))); + } + + @Test + void uploadFailureReportsErrorStatus() throws Exception { + stubFor( + post(urlEqualTo("/api/apikey/login")) + .willReturn( + aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody( + "{\"org_info\":[{\"id\":\"org-123\",\"name\":\"test-org\"}]}"))); + + stubFor( + post(urlEqualTo("/attachment")) + .willReturn( + aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody( + "{\"signedUrl\":\"" + + baseUrl + + "/upload\",\"headers\":{}}"))); + + stubFor( + put(urlEqualTo("/upload")) + .willReturn(aResponse().withStatus(500).withBody("Failed"))); + + stubFor(post(urlEqualTo("/attachment/status")).willReturn(aResponse().withStatus(200))); + + var ref = AttachmentReference.create("test.json", "application/json"); + uploader.enqueue(ref, "data".getBytes()); + + assertThrows(RuntimeException.class, () -> uploader.forceFlush(Duration.ofSeconds(5))); + + verify( + postRequestedFor(urlEqualTo("/attachment/status")) + .withRequestBody(containing("\"upload_status\":\"error\"")) + .withRequestBody(containing("\"error_message\""))); + } + + @Test + void multipleUploadsProcessedSequentially() throws Exception { + stubLoginAndUploadFlow(); + + var ref1 = AttachmentReference.create("test1.json", "application/json"); + var ref2 = AttachmentReference.create("test2.json", "application/json"); + + uploader.enqueue(ref1, "data1".getBytes()); + uploader.enqueue(ref2, "data2".getBytes()); + uploader.forceFlush(); + + verify(2, postRequestedFor(urlEqualTo("/attachment"))); + verify(2, putRequestedFor(urlEqualTo("/upload"))); + verify(2, postRequestedFor(urlEqualTo("/attachment/status"))); + } + + @Test + void lazyWorkerStartsOnFirstEnqueue() throws Exception { + stubLoginAndUploadFlow(); + + var ref = AttachmentReference.create("test.json", "application/json"); + uploader.enqueue(ref, "data".getBytes()); + uploader.forceFlush(); + + verify(postRequestedFor(urlEqualTo("/attachment"))); + } +} diff --git a/braintrust-sdk/src/test/java/dev/braintrust/trace/S3UploadUtilsTest.java b/braintrust-sdk/src/test/java/dev/braintrust/trace/S3UploadUtilsTest.java new file mode 100644 index 00000000..5ffed8a4 --- /dev/null +++ b/braintrust-sdk/src/test/java/dev/braintrust/trace/S3UploadUtilsTest.java @@ -0,0 +1,245 @@ +package dev.braintrust.trace; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; +import static org.junit.jupiter.api.Assertions.*; + +import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; +import com.github.tomakehurst.wiremock.junit5.WireMockTest; +import dev.braintrust.api.BraintrustOpenApiClient; +import dev.braintrust.config.BraintrustConfig; +import java.net.http.HttpClient; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +@WireMockTest +public class S3UploadUtilsTest { + + private BraintrustOpenApiClient apiClient; + private HttpClient httpClient; + private String baseUrl; + + @BeforeEach + void setUp(WireMockRuntimeInfo wmRuntimeInfo) { + baseUrl = wmRuntimeInfo.getHttpBaseUrl(); + var config = BraintrustConfig.builder().apiKey("test-api-key").apiUrl(baseUrl).build(); + apiClient = BraintrustOpenApiClient.of(config); + httpClient = HttpClient.newHttpClient(); + } + + @Test + void requestUploadUrlSendsCorrectRequest() throws Exception { + stubFor( + post(urlEqualTo("/attachment")) + .withHeader("Authorization", equalTo("Bearer test-api-key")) + .withHeader("Content-Type", equalTo("application/json")) + .willReturn( + aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody( + "{\"signedUrl\":\"" + + baseUrl + + "/upload\",\"headers\":{\"X-Custom\":\"value\"}}"))); + + var response = + S3UploadUtils.requestUploadUrl( + httpClient, apiClient, "org-123", "key-1", "test.json", "application/json"); + + assertEquals(baseUrl + "/upload", response.signedUrl()); + assertEquals("value", response.headers().get("X-Custom")); + + var requests = findAll(postRequestedFor(urlEqualTo("/attachment"))); + assertEquals(1, requests.size()); + } + + @Test + void requestUploadUrlThrowsOnMissingSignedUrl() { + stubFor( + post(urlEqualTo("/attachment")) + .willReturn(aResponse().withStatus(200).withBody("{\"headers\":{}}"))); + + assertThrows( + java.io.IOException.class, + () -> + S3UploadUtils.requestUploadUrl( + httpClient, + apiClient, + "org-123", + "key-1", + "test.json", + "application/json")); + } + + @Test + void requestUploadUrlThrowsOnApiError() { + stubFor( + post(urlEqualTo("/attachment")) + .willReturn(aResponse().withStatus(500).withBody("Internal error"))); + + assertThrows( + java.io.IOException.class, + () -> + S3UploadUtils.requestUploadUrl( + httpClient, + apiClient, + "org-123", + "key-1", + "test.json", + "application/json")); + } + + @Test + void uploadToSignedUrlSendsPutRequest() throws Exception { + byte[] testData = "{\"hello\":\"world\"}".getBytes(); + + stubFor( + put(urlEqualTo("/upload")) + .withRequestBody(equalToJson("{\"hello\":\"world\"}")) + .willReturn(aResponse().withStatus(200))); + + S3UploadUtils.uploadToSignedUrl( + httpClient, baseUrl + "/upload", Map.of(), "application/json", testData); + + verify( + putRequestedFor(urlEqualTo("/upload")) + .withHeader("Content-Type", equalTo("application/json"))); + } + + @Test + void uploadToSignedUrlIncludesHeaders() throws Exception { + byte[] testData = "test data".getBytes(); + + stubFor( + put(urlEqualTo("/upload")) + .withHeader("X-Custom-Header", equalTo("custom-value")) + .willReturn(aResponse().withStatus(200))); + + S3UploadUtils.uploadToSignedUrl( + httpClient, + baseUrl + "/upload", + Map.of("X-Custom-Header", "custom-value"), + "application/octet-stream", + testData); + } + + @Test + void uploadToSignedUrlThrowsOnUploadError() { + byte[] testData = "test data".getBytes(); + + stubFor( + put(urlEqualTo("/upload")) + .willReturn(aResponse().withStatus(500).withBody("Upload failed"))); + + assertThrows( + java.io.IOException.class, + () -> + S3UploadUtils.uploadToSignedUrl( + httpClient, + baseUrl + "/upload", + Map.of(), + "application/octet-stream", + testData)); + } + + @Test + void updateUploadStatusSendsCorrectRequest() throws Exception { + stubFor( + post(urlEqualTo("/attachment/status")) + .withHeader("Authorization", equalTo("Bearer test-api-key")) + .withHeader("Content-Type", equalTo("application/json")) + .willReturn(aResponse().withStatus(200))); + + S3UploadUtils.updateUploadStatus( + httpClient, apiClient, "org-123", "key-1", Map.of("upload_status", "done")); + + verify( + postRequestedFor(urlEqualTo("/attachment/status")) + .withRequestBody(containing("\"key\":\"key-1\"")) + .withRequestBody(containing("\"org_id\":\"org-123\"")) + .withRequestBody(containing("\"upload_status\":\"done\""))); + } + + @Test + void updateUploadStatusIncludesErrorMessage() throws Exception { + stubFor(post(urlEqualTo("/attachment/status")).willReturn(aResponse().withStatus(200))); + + S3UploadUtils.updateUploadStatus( + httpClient, + apiClient, + "org-123", + "key-1", + Map.of("upload_status", "error", "error_message", "something went wrong")); + + verify( + postRequestedFor(urlEqualTo("/attachment/status")) + .withRequestBody(containing("\"error_message\":\"something went wrong\""))); + } + + @Test + void updateUploadStatusThrowsOnApiError() { + stubFor( + post(urlEqualTo("/attachment/status")) + .willReturn(aResponse().withStatus(500).withBody("Error"))); + + assertThrows( + java.io.IOException.class, + () -> + S3UploadUtils.updateUploadStatus( + httpClient, + apiClient, + "org-123", + "key-1", + Map.of("upload_status", "done"))); + } + + @Test + void uploadToAzureBlobAddsRequiredHeader() throws Exception { + // WireMock's localhost URL won't match Azure detection, so we test + // using the actual Azure-style host via a separate WireMock stub that + // captures the request. We override the host by mapping a path that + // will be hit and then verify the header on a non-Azure URL first, + // ensuring the header is absent. + byte[] testData = "azure test".getBytes(); + + stubFor(put(urlEqualTo("/non-azure-upload")).willReturn(aResponse().withStatus(200))); + + S3UploadUtils.uploadToSignedUrl( + httpClient, + baseUrl + "/non-azure-upload", + Map.of(), + "application/octet-stream", + testData); + + // Non-Azure URL should NOT have the x-ms-blob-type header + verify(putRequestedFor(urlEqualTo("/non-azure-upload")).withoutHeader("x-ms-blob-type")); + } + + @Test + void uploadUrlResponseDefaultsNullHeadersToEmptyMap() { + // When the server returns null headers, the compact constructor + // should default to an empty map. + var response = new S3UploadUtils.UploadUrlResponse("https://example.com/upload", null); + + assertNotNull(response.headers()); + assertTrue(response.headers().isEmpty()); + } + + @Test + void requestUploadUrlDefaultsNullHeadersToEmptyMap() throws Exception { + stubFor( + post(urlEqualTo("/attachment")) + .willReturn( + aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody("{\"signedUrl\":\"" + baseUrl + "/upload\"}"))); + + var response = + S3UploadUtils.requestUploadUrl( + httpClient, apiClient, "org-123", "key-1", "test.json", "application/json"); + + assertNotNull(response.headers()); + assertTrue(response.headers().isEmpty()); + } +} diff --git a/test-harness/build.gradle b/test-harness/build.gradle index 9cbf0a74..bd5df12f 100644 --- a/test-harness/build.gradle +++ b/test-harness/build.gradle @@ -23,7 +23,8 @@ ext { dependencies { // testFixtures dependencies — everything lives in testFixtures source set - testFixturesImplementation project(":braintrust-sdk") // SDK main source (for TestHarness -> Braintrust, BraintrustConfig) + testFixturesImplementation project(":braintrust-api") + testFixturesImplementation project(":braintrust-sdk") testFixturesImplementation project(":braintrust-java-agent:internal") testFixturesImplementation project(":braintrust-java-agent:bootstrap") testFixturesImplementation project(":braintrust-java-agent:instrumenter") diff --git a/test-harness/src/testFixtures/java/dev/braintrust/TestHarness.java b/test-harness/src/testFixtures/java/dev/braintrust/TestHarness.java index d16e1279..98bf9945 100644 --- a/test-harness/src/testFixtures/java/dev/braintrust/TestHarness.java +++ b/test-harness/src/testFixtures/java/dev/braintrust/TestHarness.java @@ -2,6 +2,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; +import dev.braintrust.api.BraintrustOpenApiClient; import dev.braintrust.config.BraintrustConfig; import dev.braintrust.trace.UnitTestShutdownHook; import io.opentelemetry.api.GlobalOpenTelemetry; @@ -16,6 +17,7 @@ import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import java.util.List; +import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -26,6 +28,26 @@ public class TestHarness { private static final VCR vcr; + private static final BraintrustConfig envConfig = + BraintrustConfig.builder().defaultProjectName("java-unit-test").build(); + + @Getter + @Accessors(fluent = true) + private static final String defaultOrgName; + + @Getter + @Accessors(fluent = true) + private static final String defaultProjectName; + + @Getter + @Accessors(fluent = true) + private static final String defaultOrgId; + + @Getter + @Accessors(fluent = true) + private static final String defaultProjectId; + + private static final AtomicReference INSTANCE = new AtomicReference<>(); static { // Collect all API keys that should never appear in recorded cassettes @@ -42,14 +64,34 @@ public class TestHarness { vcr = new VCR( java.util.Map.of( - "https://api.openai.com/v1", "openai", - "https://api.anthropic.com", "anthropic", - "https://generativelanguage.googleapis.com", "google", - "https://api.braintrust.dev", "braintrust", - "https://bedrock-runtime.us-east-1.amazonaws.com", "bedrock"), + "https://api.openai.com/v1", + "openai", + "https://api.anthropic.com", + "anthropic", + "https://generativelanguage.googleapis.com", + "google", + envConfig.apiUrl(), + "braintrust", + "https://bedrock-runtime.us-east-1.amazonaws.com", + "bedrock"), apiKeysToNeverRecord); vcr.start(); UnitTestShutdownHook.addShutdownHook(1, vcr::stop); + var client = BraintrustOpenApiClient.of(envConfig); + var project = client.fetchOrCreateProject(envConfig); + defaultOrgId = project.getOrgId().toString(); + defaultProjectId = project.getId().toString(); + defaultProjectName = project.getName(); + var orgs = client.login().orgInfo(); + BraintrustOpenApiClient.OrgInfo defaultOrg = null; + for (var org : orgs) { + if (org.id().equals(defaultOrgId)) { + defaultOrg = org; + break; + } + } + Objects.requireNonNull(defaultOrg); + defaultOrgName = defaultOrg.name(); } public static TestHarness setup() { @@ -60,7 +102,7 @@ public static TestHarness setup( Function configCustomizer) { var configBuilder = BraintrustConfig.builder() - .apiUrl(vcr.getUrlForTargetBase("https://api.braintrust.dev")) + .apiUrl(vcr.getUrlForTargetBase(envConfig.apiUrl())) .defaultProjectName(defaultProjectName()); if (vcr.getMode() == VCR.VcrMode.REPLAY) { // tolerate missing api key in replay mode @@ -84,24 +126,6 @@ private static synchronized TestHarness setup(BraintrustConfig config) { return harness; } - @Getter - @Accessors(fluent = true) - private static final String defaultProjectId = "6ae68365-7620-4630-921b-bac416634fc8"; - - @Getter - @Accessors(fluent = true) - private static final String defaultProjectName = "java-unit-test"; - - @Getter - @Accessors(fluent = true) - private static final String defaultOrgId = "5d7c97d7-fef1-4cb7-bda6-7e3756a0ca8e"; - - @Getter - @Accessors(fluent = true) - private static final String defaultOrgName = "braintrustdata.com"; - - private static final AtomicReference INSTANCE = new AtomicReference<>(); - @Getter @Accessors(fluent = true) private final OpenTelemetrySdk openTelemetry;