Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 42 additions & 30 deletions src/pages/docs/ai-transport/sessions-identity/resuming-sessions.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -106,26 +106,28 @@ await channel.subscribe('prompt', (message) => {
saveLastProcessedTimestamp(message.timestamp);
});

// Fetch history up until the point of attachment, starting from last checkpoint
// Fetch history from the attachment point back to the last checkpoint
let page = await channel.history({
untilAttach: true,
start: lastProcessedTimestamp,
direction: 'forwards'
});

// Paginate through all missed messages
// Paginate through all missed messages, storing in oldest-to-newest order
const missedMessages = [];
while (page) {
for (const message of page.items) {
// Process the historical message
await processMessage(message);

// Persist the timestamp after successful processing
await saveLastProcessedTimestamp(message.timestamp);
}
missedMessages.unshift(...page.items.reverse());

// Move to next page if available
page = page.hasNext() ? await page.next() : null;
}

// Process messages oldest-to-newest
for (const message of missedMessages) {
await processMessage(message);

// Persist the timestamp after successful processing
await saveLastProcessedTimestamp(message.timestamp);
}
```
```python
# Agent code
Expand Down Expand Up @@ -155,24 +157,26 @@ def on_prompt(message):

await channel.subscribe("prompt", on_prompt)

# Fetch history up until the point of attachment, starting from last checkpoint
# Fetch history from the attachment point back to the last checkpoint
page = await channel.history(
until_attach=True,
start=last_processed_timestamp,
direction="forwards"
)

# Paginate through all missed messages
# Paginate through all missed messages, storing in oldest-to-newest order
missed_messages = []
while page:
for message in page.items:
# Process the historical message
await process_message(message)

# Persist the timestamp after successful processing
await save_last_processed_timestamp(message.timestamp)
missed_messages = list(reversed(page.items)) + missed_messages

# Move to next page if available
page = await page.next() if page.has_next() else None

# Process messages oldest-to-newest
for message in missed_messages:
await process_message(message)

# Persist the timestamp after successful processing
await save_last_processed_timestamp(message.timestamp)
```
```java
// Agent code
Expand All @@ -182,6 +186,10 @@ import io.ably.lib.types.ClientOptions;
import io.ably.lib.types.Message;
import io.ably.lib.types.PaginatedResult;
import io.ably.lib.types.Param;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

ClientOptions options = new ClientOptions();
options.key = System.getenv("ABLY_API_KEY");
Expand All @@ -206,27 +214,31 @@ channel.subscribe("prompt", message -> {
saveLastProcessedTimestamp(message.timestamp);
});

// Fetch history up until the point of attachment, starting from last checkpoint
// Fetch history from the attachment point back to the last checkpoint
Param[] params = new Param[] {
new Param("untilAttach", "true"),
new Param("start", String.valueOf(lastProcessedTimestamp)),
new Param("direction", "forwards")
new Param("start", String.valueOf(lastProcessedTimestamp))
};
PaginatedResult<Message> page = channel.history(params);

// Paginate through all missed messages
// Paginate through all missed messages, storing in oldest-to-newest order
List<Message> missedMessages = new ArrayList<>();
while (page != null) {
for (Message message : page.items()) {
// Process the historical message
processMessage(message);

// Persist the timestamp after successful processing
saveLastProcessedTimestamp(message.timestamp);
}
List<Message> items = Arrays.asList(page.items());
Collections.reverse(items);
missedMessages.addAll(0, items);

// Move to next page if available
page = page.hasNext() ? page.next() : null;
}

// Process messages oldest-to-newest
for (Message message : missedMessages) {
processMessage(message);

// Persist the timestamp after successful processing
saveLastProcessedTimestamp(message.timestamp);
}
```
</Code>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,9 +444,8 @@ await channel.subscribe((message) => {
// Fetch history up until the point of attachment
let page = await channel.history({ untilAttach: true });

// Paginate backwards through history
// Paginate through all historical messages
while (page) {
// Messages are newest-first
for (const message of page.items) {
// message.data contains the full concatenated text
responses.set(message.serial, message.data);
Expand Down Expand Up @@ -482,9 +481,8 @@ await channel.subscribe(on_message)
# Fetch history up until the point of attachment
page = await channel.history(until_attach=True)

# Paginate backwards through history
# Paginate through all historical messages
while page:
# Messages are newest-first
for message in page.items:
# message.data contains the full concatenated text
responses[message.serial] = message.data
Expand Down Expand Up @@ -520,9 +518,8 @@ channel.subscribe(message -> {
// Fetch history up until the point of attachment
PaginatedResult<Message> page = channel.history(new Param("untilAttach", "true"));

// Paginate backwards through history
// Paginate through all historical messages
while (page != null) {
// Messages are newest-first
for (Message message : page.items()) {
// message.data contains the full concatenated text
responses.put(message.serial, (String) message.data);
Expand Down Expand Up @@ -773,7 +770,7 @@ Alternatively, instead of including `responseId` in message [`extras`](/docs/mes

#### Hydrate using history

Load completed responses from your database, then use [channel history](/docs/storage-history/history) with the [`untilAttach` option](/docs/storage-history/history#continuous-history) to catch up on any in-progress responses. Use the timestamp of the last completed response to start pagination from that point forward, ensuring continuity with live message delivery.
Load completed responses from your database, then use [channel history](/docs/storage-history/history) with the [`untilAttach` option](/docs/storage-history/history#continuous-history) to catch up on any in-progress responses. Use the timestamp of the last completed response as a lower bound, so that only messages after that timestamp are retrieved, ensuring continuity with live message delivery.

<Code>
```javascript
Expand Down Expand Up @@ -823,7 +820,6 @@ await channel.subscribe((message) => {
let page = await channel.history({
untilAttach: true,
start: latestTimestamp,
direction: 'forwards'
});

// Paginate through all missed messages
Expand Down Expand Up @@ -887,7 +883,6 @@ await channel.subscribe(on_message)
page = await channel.history(
until_attach=True,
start=latest_timestamp,
direction='forwards'
)

# Paginate through all missed messages
Expand Down Expand Up @@ -952,8 +947,7 @@ channel.subscribe(message -> {
// Fetch history from the last completed response until attachment
PaginatedResult<Message> page = channel.history(
new Param("untilAttach", "true"),
new Param("start", String.valueOf(latestTimestamp)),
new Param("direction", "forwards")
new Param("start", String.valueOf(latestTimestamp))
);

// Paginate through all missed messages
Expand Down