Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions components/camel-ai/camel-docling/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility-version}</version>
<scope>test</scope>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
package org.apache.camel.component.docling;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import ai.docling.serve.api.convert.response.ConvertDocumentResponse;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.spi.Metadata;
Expand All @@ -33,6 +37,11 @@ public class DoclingComponent extends DefaultComponent {
@Metadata
DoclingConfiguration configuration;

// Shared across all producers so that SUBMIT_ASYNC_CONVERSION and CHECK_CONVERSION_STATUS
// (which may resolve to different endpoints/producers) can see each other's tasks.
// Every future still registered here is cancelled, and the map emptied, in doStop().
private final Map<String, CompletableFuture<ConvertDocumentResponse>> pendingAsyncTasks = new ConcurrentHashMap<>();
// Held on the component for the same reason as the map above and handed out via getTaskIdCounter().
// Presumably the source of the task IDs used as keys in pendingAsyncTasks -- TODO confirm in DoclingProducer.
private final AtomicLong taskIdCounter = new AtomicLong();

/**
 * No-argument constructor; delegates to the one-argument constructor, passing {@code null}.
 */
public DoclingComponent() {
this(null);
}
Expand Down Expand Up @@ -61,4 +70,19 @@ public void setConfiguration(DoclingConfiguration configuration) {
this.configuration = configuration;
}

/**
 * Returns the component-wide map of pending async conversion futures.
 * <p>
 * The live map is returned, not a copy, so callers that keep the reference all observe and
 * modify the same entries. Package-private: DoclingProducer reads it in its constructor.
 *
 * @return the shared, mutable map of pending async tasks
 */
Map<String, CompletableFuture<ConvertDocumentResponse>> getPendingAsyncTasks() {
return pendingAsyncTasks;
}

/**
 * Returns the component-wide counter instance.
 * <p>
 * The same {@link AtomicLong} is handed to every caller, so increments made through one
 * reference are visible through all others. Package-private: DoclingProducer reads it in
 * its constructor.
 *
 * @return the shared counter
 */
AtomicLong getTaskIdCounter() {
return taskIdCounter;
}

/**
 * Stops the component: cancels every async conversion still registered, empties the
 * registry, and then delegates to the superclass.
 *
 * @throws Exception if the superclass stop logic fails
 */
@Override
protected void doStop() throws Exception {
    // Note: CompletableFuture.cancel ignores its mayInterruptIfRunning argument, so this
    // completes each future with a CancellationException but does not interrupt any
    // thread that is still working on the conversion.
    for (CompletableFuture<ConvertDocumentResponse> pending : pendingAsyncTasks.values()) {
        pending.cancel(true);
    }
    pendingAsyncTasks.clear();
    super.doStop();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -70,6 +69,7 @@
import ai.docling.serve.api.convert.request.source.HttpSource;
import ai.docling.serve.api.convert.response.ConvertDocumentResponse;
import ai.docling.serve.api.convert.response.DocumentResponse;
import ai.docling.serve.api.convert.response.InBodyConvertDocumentResponse;
import ai.docling.serve.api.task.request.TaskStatusPollRequest;
import ai.docling.serve.api.task.response.TaskStatus;
import ai.docling.serve.api.task.response.TaskStatusPollResponse;
Expand Down Expand Up @@ -158,13 +158,16 @@ public class DoclingProducer extends DefaultProducer {
private DoclingConfiguration configuration;
private DoclingServeApi doclingServeApi;
private ObjectMapper objectMapper;
private final Map<String, CompletableFuture<ConvertDocumentResponse>> pendingAsyncTasks = new ConcurrentHashMap<>();
private final AtomicLong taskIdCounter = new AtomicLong();
private Map<String, CompletableFuture<ConvertDocumentResponse>> pendingAsyncTasks;
private AtomicLong taskIdCounter;

/**
 * Creates a producer for the given endpoint.
 * <p>
 * The configuration is taken from the endpoint, while the pending-task map and the task id
 * counter are taken from the owning {@link DoclingComponent} rather than created here, so
 * that all producers of that component work on the same instances.
 *
 * @param endpoint the endpoint this producer belongs to; its component must be a
 *                 {@link DoclingComponent}
 */
public DoclingProducer(DoclingEndpoint endpoint) {
    super(endpoint);
    this.configuration = endpoint.getConfiguration();
    // Async bookkeeping lives on the component, not on the individual producer.
    final DoclingComponent owningComponent = (DoclingComponent) endpoint.getComponent();
    this.taskIdCounter = owningComponent.getTaskIdCounter();
    this.pendingAsyncTasks = owningComponent.getPendingAsyncTasks();
    this.objectMapper = new ObjectMapper();
}

@Override
Expand Down Expand Up @@ -203,9 +206,6 @@ protected void doStart() throws Exception {
@Override
protected void doStop() throws Exception {
super.doStop();
// Cancel any pending async tasks
pendingAsyncTasks.forEach((id, future) -> future.cancel(true));
pendingAsyncTasks.clear();
if (doclingServeApi != null) {
doclingServeApi = null;
LOG.info("DoclingServeApi reference cleared");
Expand Down Expand Up @@ -1124,15 +1124,19 @@ private DoclingDocument convertToDoclingDocument(
}

private DoclingDocument extractDoclingDocument(ConvertDocumentResponse response) throws IOException {
DocumentResponse document = response.getDocument();
if (document == null) {
throw new IOException("No document in response");
}
DoclingDocument result = document.getJsonContent();
if (result == null) {
throw new IOException("No JSON content in document response");
if (response instanceof InBodyConvertDocumentResponse inBodyResponse) {
DocumentResponse document = inBodyResponse.getDocument();
if (document == null) {
throw new IOException("No document in response");
}
DoclingDocument result = document.getJsonContent();
if (result == null) {
throw new IOException("No JSON content in document response");
}
return result;
} else {
throw new IOException("Unsupported response type: cannot extract DoclingDocument");
}
return result;
}

private void processBatchStructuredData(Exchange exchange) throws Exception {
Expand Down Expand Up @@ -1555,35 +1559,39 @@ private void applyConfigurationToOptions(ConvertDocumentOptions.Builder optionsB

private String extractConvertedContent(ConvertDocumentResponse response, String outputFormat) throws IOException {
try {
DocumentResponse document = response.getDocument();
if (response instanceof InBodyConvertDocumentResponse inBodyResponse) {
DocumentResponse document = inBodyResponse.getDocument();

if (document == null) {
throw new IOException("No document in response");
}
if (document == null) {
throw new IOException("No document in response");
}

String format = mapOutputFormat(outputFormat);

switch (format) {
case "md":
String markdown = document.getMarkdownContent();
return markdown != null ? markdown : "";
case "html":
String html = document.getHtmlContent();
return html != null ? html : "";
case "text":
String text = document.getTextContent();
return text != null ? text : "";
case "json":
// Return the document JSON content
var jsonDoc = document.getJsonContent();
if (jsonDoc != null) {
return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(jsonDoc);
}
return "{}";
default:
// Default to markdown
String defaultMarkdown = document.getMarkdownContent();
return defaultMarkdown != null ? defaultMarkdown : "";
String format = mapOutputFormat(outputFormat);

switch (format) {
case "md":
String markdown = document.getMarkdownContent();
return markdown != null ? markdown : "";
case "html":
String html = document.getHtmlContent();
return html != null ? html : "";
case "text":
String text = document.getTextContent();
return text != null ? text : "";
case "json":
// Return the document JSON content
var jsonDoc = document.getJsonContent();
if (jsonDoc != null) {
return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(jsonDoc);
}
return "{}";
default:
// Default to markdown
String defaultMarkdown = document.getMarkdownContent();
return defaultMarkdown != null ? defaultMarkdown : "";
}
} else {
throw new IOException("Unsupported response type: cannot extract converted content");
}
} catch (Exception e) {
LOG.warn("Failed to extract content from response: {}", e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import ai.docling.serve.api.convert.response.ConvertDocumentResponse;
import ai.docling.serve.api.convert.response.DocumentResponse;
import ai.docling.serve.api.convert.response.InBodyConvertDocumentResponse;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
Expand Down Expand Up @@ -94,7 +95,7 @@ void checkStatusReturnsCompletedForFinishedLocalTask() throws Exception {
Map<String, CompletableFuture<ConvertDocumentResponse>> pendingTasks = getPendingAsyncTasks(producer);

// Create a completed future with a mock response
ConvertDocumentResponse mockResponse = ConvertDocumentResponse.builder()
ConvertDocumentResponse mockResponse = InBodyConvertDocumentResponse.builder()
.document(DocumentResponse.builder()
.markdownContent("# Converted Document")
.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,23 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;

import ai.docling.core.DoclingDocument;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.docling.ConversionStatus;
import org.apache.camel.component.docling.ConversionStatus.Status;
import org.apache.camel.component.docling.DoclingHeaders;
import org.apache.camel.component.docling.DoclingOperations;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.io.TempDir;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

/**
* Integration test for Docling-Serve producer operations using test-infra for container management.
Expand Down Expand Up @@ -245,17 +247,20 @@ void testCheckConversionStatus() throws Exception {

assertNotNull(taskId, "Task ID should not be null");

// Wait a bit for processing
Thread.sleep(1000);

// Check status
ConversionStatus status = template.requestBody("direct:check-status", taskId, ConversionStatus.class);

assertNotNull(status, "Status should not be null");
assertNotNull(status.getTaskId(), "Status task ID should not be null");
assertThat(status.getStatus()).isEqualTo(Status.COMPLETED);

LOG.info("Successfully checked status for task {}: {}", taskId, status.getStatus());
// Poll until the async future completes
AtomicReference<ConversionStatus> statusRef = new AtomicReference<>();
Awaitility.await()
.atMost(Duration.ofSeconds(120))
.pollInterval(Duration.ofSeconds(1))
.untilAsserted(() -> {
ConversionStatus s = template.requestBody("direct:check-status", taskId, ConversionStatus.class);
assertNotNull(s, "Status should not be null");
assertNotNull(s.getTaskId(), "Status task ID should not be null");
statusRef.set(s);
assertThat(s.getStatus()).isEqualTo(Status.COMPLETED);
});

LOG.info("Successfully checked status for task {}: {}", taskId, statusRef.get().getStatus());
}

@Test
Expand All @@ -270,34 +275,21 @@ void testCustomAsyncWorkflow() throws Exception {
assertNotNull(taskId, "Task ID should not be null");
LOG.info("Submitted conversion with task ID: {}", taskId);

// Poll for completion
ConversionStatus status = null;
int maxAttempts = 120; // 120 seconds max (increased from 60)
int attempts = 0;

while (attempts < maxAttempts) {
status = template.requestBody("direct:check-status", taskId, ConversionStatus.class);
LOG.info("Attempt {}: Task {} status is {}", attempts + 1, taskId, status.getStatus());

if (status.isCompleted()) {
LOG.info("Task completed successfully after {} attempts", attempts + 1);
break;
} else if (status.isFailed()) {
throw new RuntimeException("Task failed: " + status.getErrorMessage());
}

Thread.sleep(1000);
attempts++;
}

assertNotNull(status, "Final status should not be null");
if (!status.isCompleted()) {
fail(String.format("Task did not complete within %d seconds. Last status: %s",
maxAttempts, status.getStatus()));
}

// Poll for completion using Awaitility
AtomicReference<ConversionStatus> statusRef = new AtomicReference<>();
Awaitility.await()
.atMost(Duration.ofSeconds(120))
.pollInterval(Duration.ofSeconds(1))
.untilAsserted(() -> {
ConversionStatus s = template.requestBody("direct:check-status", taskId, ConversionStatus.class);
LOG.info("Task {} status is {}", taskId, s.getStatus());
assertThat(s.isFailed()).as("Task should not fail: " + s.getErrorMessage()).isFalse();
statusRef.set(s);
assertThat(s.isCompleted()).as("Task should complete").isTrue();
});

ConversionStatus status = statusRef.get();
if (status.getResult() != null) {
// TODO: this assertion is currently never reached: status.getResult() is actually null, so the enclosing if is skipped. Is a null result expected here, or is it a bug?
assertTrue(status.getResult().length() > 0, "Result should not be empty");
LOG.info("Successfully retrieved result: {} characters", status.getResult().length());
}
Expand Down
2 changes: 1 addition & 1 deletion parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@
<dnsjava-version>3.6.4</dnsjava-version>
<djl-version>0.36.0</djl-version>
<djl-python-version>0.34.0</djl-python-version>
<docling-java-version>0.4.7</docling-java-version>
<docling-java-version>0.5.0</docling-java-version>
<docker-java-version>3.7.1</docker-java-version>
<dropbox-version>7.0.0</dropbox-version>
<eddsa-version>0.3.0</eddsa-version>
Expand Down
Loading