diff --git a/src/pages/docs/ai-transport/sessions-identity/resuming-sessions.mdx b/src/pages/docs/ai-transport/sessions-identity/resuming-sessions.mdx index 434b0a0b4d..88d63c12fb 100644 --- a/src/pages/docs/ai-transport/sessions-identity/resuming-sessions.mdx +++ b/src/pages/docs/ai-transport/sessions-identity/resuming-sessions.mdx @@ -106,26 +106,28 @@ await channel.subscribe('prompt', (message) => { saveLastProcessedTimestamp(message.timestamp); }); -// Fetch history up until the point of attachment, starting from last checkpoint +// Fetch history from the attachment point back to the last checkpoint let page = await channel.history({ untilAttach: true, start: lastProcessedTimestamp, - direction: 'forwards' }); -// Paginate through all missed messages +// Paginate through all missed messages, storing in oldest-to-newest order +const missedMessages = []; while (page) { - for (const message of page.items) { - // Process the historical message - await processMessage(message); - - // Persist the timestamp after successful processing - await saveLastProcessedTimestamp(message.timestamp); - } + missedMessages.unshift(...page.items.reverse()); // Move to next page if available page = page.hasNext() ? await page.next() : null; } + +// Process messages oldest-to-newest +for (const message of missedMessages) { + await processMessage(message); + + // Persist the timestamp after successful processing + await saveLastProcessedTimestamp(message.timestamp); +} ``` ```python # Agent code @@ -155,24 +157,26 @@ def on_prompt(message): await channel.subscribe("prompt", on_prompt) -# Fetch history up until the point of attachment, starting from last checkpoint +# Fetch history from the attachment point back to the last checkpoint page = await channel.history( until_attach=True, start=last_processed_timestamp, - direction="forwards" ) -# Paginate through all missed messages +# Paginate through all missed messages, storing in oldest-to-newest order +missed_messages = [] while page: - for message in page.items: - # Process the historical message - await process_message(message) - - # Persist the timestamp after successful processing - await save_last_processed_timestamp(message.timestamp) + missed_messages = list(reversed(page.items)) + missed_messages # Move to next page if available page = await page.next() if page.has_next() else None + +# Process messages oldest-to-newest +for message in missed_messages: + await process_message(message) + + # Persist the timestamp after successful processing + await save_last_processed_timestamp(message.timestamp) ``` ```java // Agent code @@ -182,6 +186,10 @@ import io.ably.lib.types.ClientOptions; import io.ably.lib.types.Message; import io.ably.lib.types.PaginatedResult; import io.ably.lib.types.Param; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; ClientOptions options = new ClientOptions(); options.key = System.getenv("ABLY_API_KEY"); @@ -206,27 +214,31 @@ channel.subscribe("prompt", message -> { saveLastProcessedTimestamp(message.timestamp); }); -// Fetch history up until the point of attachment, starting from last checkpoint +// Fetch history from the attachment point back to the last checkpoint Param[] params = new Param[] { new Param("untilAttach", "true"), - new Param("start", String.valueOf(lastProcessedTimestamp)), - new Param("direction", "forwards") + new Param("start", String.valueOf(lastProcessedTimestamp)) }; PaginatedResult page = channel.history(params); -// Paginate through all missed messages +// Paginate through all missed messages, storing in oldest-to-newest order +List missedMessages = new ArrayList<>(); while (page != null) { - for (Message message : page.items()) { - // Process the historical message - processMessage(message); - - // Persist the timestamp after successful processing - saveLastProcessedTimestamp(message.timestamp); - } + List items = Arrays.asList(page.items()); + Collections.reverse(items); + missedMessages.addAll(0, items); // Move to next page if available page = page.hasNext() ? page.next() : null; } + +// Process messages oldest-to-newest +for (Message message : missedMessages) { + processMessage(message); + + // Persist the timestamp after successful processing + saveLastProcessedTimestamp(message.timestamp); +} ``` diff --git a/src/pages/docs/ai-transport/token-streaming/message-per-response.mdx b/src/pages/docs/ai-transport/token-streaming/message-per-response.mdx index c258c8455d..441c3c9337 100644 --- a/src/pages/docs/ai-transport/token-streaming/message-per-response.mdx +++ b/src/pages/docs/ai-transport/token-streaming/message-per-response.mdx @@ -444,9 +444,8 @@ await channel.subscribe((message) => { // Fetch history up until the point of attachment let page = await channel.history({ untilAttach: true }); -// Paginate backwards through history +// Paginate through all historical messages while (page) { - // Messages are newest-first for (const message of page.items) { // message.data contains the full concatenated text responses.set(message.serial, message.data); @@ -482,9 +481,8 @@ await channel.subscribe(on_message) # Fetch history up until the point of attachment page = await channel.history(until_attach=True) -# Paginate backwards through history +# Paginate through all historical messages while page: - # Messages are newest-first for message in page.items: # message.data contains the full concatenated text responses[message.serial] = message.data @@ -520,9 +518,8 @@ channel.subscribe(message -> { // Fetch history up until the point of attachment PaginatedResult page = channel.history(new Param("untilAttach", "true")); -// Paginate backwards through history +// Paginate through all historical messages while (page != null) { - // Messages are newest-first for (Message message : page.items()) { // message.data contains the full concatenated text responses.put(message.serial, (String) message.data); @@ -773,7 +770,7 @@ Alternatively, instead of including `responseId` in message [`extras`](/docs/mes #### Hydrate using history -Load completed responses from your database, then use [channel history](/docs/storage-history/history) with the [`untilAttach` option](/docs/storage-history/history#continuous-history) to catch up on any in-progress responses. Use the timestamp of the last completed response to start pagination from that point forward, ensuring continuity with live message delivery. +Load completed responses from your database, then use [channel history](/docs/storage-history/history) with the [`untilAttach` option](/docs/storage-history/history#continuous-history) to catch up on any in-progress responses. Use the timestamp of the last completed response as a lower bound, so that only messages after that timestamp are retrieved, ensuring continuity with live message delivery. ```javascript @@ -823,7 +820,6 @@ await channel.subscribe((message) => { let page = await channel.history({ untilAttach: true, start: latestTimestamp, - direction: 'forwards' }); // Paginate through all missed messages @@ -887,7 +883,6 @@ await channel.subscribe(on_message) page = await channel.history( until_attach=True, start=latest_timestamp, - direction='forwards' ) # Paginate through all missed messages @@ -952,8 +947,7 @@ channel.subscribe(message -> { // Fetch history from the last completed response until attachment PaginatedResult page = channel.history( new Param("untilAttach", "true"), - new Param("start", String.valueOf(latestTimestamp)), - new Param("direction", "forwards") + new Param("start", String.valueOf(latestTimestamp)) ); // Paginate through all missed messages