Skip to content

Commit 1bca544

Browse files
committed
Add entity registration overloads, auto-paginating queries, and includesState for .NET parity
1 parent 16f3505 commit 1bca544

9 files changed

Lines changed: 742 additions & 10 deletions

File tree

client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,47 @@ public EntityMetadata getEntityMetadata(EntityInstanceId entityId) {
396396
*/
397397
public abstract EntityQueryResult queryEntities(EntityQuery query);
398398

399+
/**
400+
* Returns an auto-paginating iterable over entity instances matching the specified filter criteria.
401+
* <p>
402+
* This method automatically handles pagination when iterating over results. It fetches pages
403+
* from the store on demand, making it convenient when you want to process all matching entities
404+
* without manually managing continuation tokens.
405+
* <p>
406+
* You can iterate over individual items:
407+
* <pre>{@code
408+
* for (EntityMetadata entity : client.getEntities(query)) {
409+
* System.out.println(entity.getEntityInstanceId());
410+
* }
411+
* }</pre>
412+
* <p>
413+
* Or iterate page by page for more control:
414+
* <pre>{@code
415+
* for (EntityQueryResult page : client.getEntities(query).byPage()) {
416+
* for (EntityMetadata entity : page.getEntities()) {
417+
* System.out.println(entity.getEntityInstanceId());
418+
* }
419+
* }
420+
* }</pre>
421+
*
422+
* @param query the query filter criteria
423+
* @return a pageable iterable over all matching entities
424+
*/
425+
public EntityQueryPageable getEntities(EntityQuery query) {
426+
return new EntityQueryPageable(query, this::queryEntities);
427+
}
428+
429+
/**
430+
* Returns an auto-paginating iterable over all entity instances.
431+
* <p>
432+
* This is a convenience overload equivalent to {@code getEntities(new EntityQuery())}.
433+
*
434+
* @return a pageable iterable over all entities
435+
*/
436+
public EntityQueryPageable getEntities() {
437+
return getEntities(new EntityQuery());
438+
}
439+
399440
/**
400441
* Cleans up entity storage by removing empty entities and/or releasing orphaned locks.
401442
* <p>

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,7 @@ private EntityMetadata toEntityMetadata(
526526
protoEntity.getBacklogQueueSize(),
527527
lockedBy,
528528
serializedState,
529+
protoEntity.hasSerializedState(),
529530
this.dataConverter);
530531
}
531532

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import io.grpc.Channel;
66

7+
import java.lang.reflect.InvocationTargetException;
78
import java.time.Duration;
89
import java.util.HashMap;
910
import java.util.Locale;
@@ -90,6 +91,90 @@ public DurableTaskGrpcWorkerBuilder addEntity(String name, TaskEntityFactory fac
9091
return this;
9192
}
9293

94+
/**
95+
* Registers an entity type for the constructed {@link DurableTaskGrpcWorker}.
96+
* <p>
97+
* The entity class must implement {@link ITaskEntity} and have a public no-argument constructor.
98+
* A new instance of the entity is created for each operation batch using reflection.
99+
* <p>
100+
* The entity name is derived from the simple class name of the provided type.
101+
*
102+
* @param entityClass the entity class to register; must implement {@link ITaskEntity}
103+
* @return this builder object
104+
* @throws IllegalArgumentException if the class does not implement {@link ITaskEntity}
105+
*/
106+
public DurableTaskGrpcWorkerBuilder addEntity(Class<? extends ITaskEntity> entityClass) {
107+
if (entityClass == null) {
108+
throw new IllegalArgumentException("entityClass must not be null.");
109+
}
110+
String name = entityClass.getSimpleName();
111+
return this.addEntity(name, entityClass);
112+
}
113+
114+
/**
115+
* Registers an entity type with a specific name for the constructed {@link DurableTaskGrpcWorker}.
116+
* <p>
117+
* The entity class must implement {@link ITaskEntity} and have a public no-argument constructor.
118+
* A new instance of the entity is created for each operation batch using reflection.
119+
*
120+
* @param name the name of the entity type
121+
* @param entityClass the entity class to register; must implement {@link ITaskEntity}
122+
* @return this builder object
123+
* @throws IllegalArgumentException if the class does not implement {@link ITaskEntity}
124+
*/
125+
public DurableTaskGrpcWorkerBuilder addEntity(String name, Class<? extends ITaskEntity> entityClass) {
126+
if (entityClass == null) {
127+
throw new IllegalArgumentException("entityClass must not be null.");
128+
}
129+
if (!ITaskEntity.class.isAssignableFrom(entityClass)) {
130+
throw new IllegalArgumentException(
131+
String.format("Type %s does not implement ITaskEntity.", entityClass.getName()));
132+
}
133+
return this.addEntity(name, () -> {
134+
try {
135+
return entityClass.getDeclaredConstructor().newInstance();
136+
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
137+
throw new RuntimeException(
138+
String.format("Failed to create instance of entity type %s. Ensure it has a public no-argument constructor.", entityClass.getName()), e);
139+
}
140+
});
141+
}
142+
143+
/**
144+
* Registers an entity singleton for the constructed {@link DurableTaskGrpcWorker}.
145+
* <p>
146+
* The same entity instance is reused for every operation batch. This is useful for stateless entities
147+
* or entities that manage their own lifecycle.
148+
* <p>
149+
* The entity name is derived from the simple class name of the provided entity instance.
150+
*
151+
* @param entity the entity instance to register
152+
* @return this builder object
153+
*/
154+
public DurableTaskGrpcWorkerBuilder addEntity(ITaskEntity entity) {
155+
if (entity == null) {
156+
throw new IllegalArgumentException("entity must not be null.");
157+
}
158+
String name = entity.getClass().getSimpleName();
159+
return this.addEntity(name, () -> entity);
160+
}
161+
162+
/**
163+
* Registers an entity singleton with a specific name for the constructed {@link DurableTaskGrpcWorker}.
164+
* <p>
165+
* The same entity instance is reused for every operation batch.
166+
*
167+
* @param name the name of the entity type
168+
* @param entity the entity instance to register
169+
* @return this builder object
170+
*/
171+
public DurableTaskGrpcWorkerBuilder addEntity(String name, ITaskEntity entity) {
172+
if (entity == null) {
173+
throw new IllegalArgumentException("entity must not be null.");
174+
}
175+
return this.addEntity(name, () -> entity);
176+
}
177+
93178
/**
94179
* Sets the gRPC channel to use for communicating with the sidecar process.
95180
* <p>

client/src/main/java/com/microsoft/durabletask/EntityMetadata.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public final class EntityMetadata {
1414
private final int backlogQueueSize;
1515
private final String lockedBy;
1616
private final String serializedState;
17+
private final boolean includesState;
1718
private final DataConverter dataConverter;
1819

1920
/**
@@ -24,6 +25,7 @@ public final class EntityMetadata {
2425
* @param backlogQueueSize the number of operations waiting in the entity's backlog queue
2526
* @param lockedBy the orchestration instance ID that currently holds a lock on this entity, or {@code null}
2627
* @param serializedState the serialized entity state, or {@code null} if state was not fetched
28+
* @param includesState {@code true} if the state was requested and is included in this metadata
2729
* @param dataConverter the data converter used to deserialize state
2830
*/
2931
EntityMetadata(
@@ -32,12 +34,14 @@ public final class EntityMetadata {
3234
int backlogQueueSize,
3335
@Nullable String lockedBy,
3436
@Nullable String serializedState,
37+
boolean includesState,
3538
DataConverter dataConverter) {
3639
this.instanceId = instanceId;
3740
this.lastModifiedTime = lastModifiedTime;
3841
this.backlogQueueSize = backlogQueueSize;
3942
this.lockedBy = lockedBy;
4043
this.serializedState = serializedState;
44+
this.includesState = includesState;
4145
this.dataConverter = dataConverter;
4246
}
4347

@@ -98,6 +102,19 @@ public String getSerializedState() {
98102
return this.serializedState;
99103
}
100104

105+
/**
106+
* Gets whether this metadata response includes the entity state.
107+
* <p>
108+
* Queries can exclude the state of the entity from the metadata that is retrieved.
109+
* When this returns {@code false}, {@link #getSerializedState()} and {@link #readStateAs(Class)}
110+
* will return {@code null}.
111+
*
112+
* @return {@code true} if state was requested and included in this metadata
113+
*/
114+
public boolean isIncludesState() {
115+
return this.includesState;
116+
}
117+
101118
/**
102119
* Deserializes the entity state into an object of the specified type.
103120
*
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.microsoft.durabletask;
4+
5+
import javax.annotation.Nullable;
6+
import java.util.Iterator;
7+
import java.util.List;
8+
import java.util.NoSuchElementException;
9+
import java.util.function.Function;
10+
11+
/**
12+
* An auto-paginating iterable over entity query results.
13+
* <p>
14+
* This class automatically handles pagination when iterating over entity metadata results.
15+
* It fetches pages from the store on demand and yields individual {@link EntityMetadata}
16+
* items to the caller.
17+
* <p>
18+
* Use {@link DurableTaskClient#getEntities(EntityQuery)} to obtain an instance of this class.
19+
*
20+
* <h3>Example: iterate over all entities</h3>
21+
* <pre>{@code
22+
* EntityQuery query = new EntityQuery()
23+
* .setInstanceIdStartsWith("counter")
24+
* .setIncludeState(true);
25+
*
26+
* for (EntityMetadata entity : client.getEntities(query)) {
27+
* System.out.println(entity.getEntityInstanceId());
28+
* }
29+
* }</pre>
30+
*
31+
* <h3>Example: iterate page by page</h3>
32+
* <pre>{@code
33+
* for (EntityQueryResult page : client.getEntities(query).byPage()) {
34+
* System.out.println("Got " + page.getEntities().size() + " entities");
35+
* for (EntityMetadata entity : page.getEntities()) {
36+
* System.out.println(entity.getEntityInstanceId());
37+
* }
38+
* }
39+
* }</pre>
40+
*/
41+
public final class EntityQueryPageable implements Iterable<EntityMetadata> {
42+
private final EntityQuery baseQuery;
43+
private final Function<EntityQuery, EntityQueryResult> queryExecutor;
44+
45+
/**
46+
* Creates a new {@code EntityQueryPageable}.
47+
*
48+
* @param baseQuery the base query parameters
49+
* @param queryExecutor the function that executes a single page query
50+
*/
51+
EntityQueryPageable(EntityQuery baseQuery, Function<EntityQuery, EntityQueryResult> queryExecutor) {
52+
this.baseQuery = baseQuery;
53+
this.queryExecutor = queryExecutor;
54+
}
55+
56+
/**
57+
* Returns an iterator over individual {@link EntityMetadata} items, automatically
58+
* fetching subsequent pages as needed.
59+
*
60+
* @return an iterator over all matching entities
61+
*/
62+
@Override
63+
public Iterator<EntityMetadata> iterator() {
64+
return new EntityItemIterator();
65+
}
66+
67+
/**
68+
* Returns an iterable over pages of results, where each page is an {@link EntityQueryResult}
69+
* containing a list of entities and an optional continuation token.
70+
*
71+
* @return an iterable over result pages
72+
*/
73+
public Iterable<EntityQueryResult> byPage() {
74+
return PageIterable::new;
75+
}
76+
77+
private class EntityItemIterator implements Iterator<EntityMetadata> {
78+
private String continuationToken = baseQuery.getContinuationToken();
79+
private Iterator<EntityMetadata> currentPageIterator;
80+
private boolean finished;
81+
82+
EntityItemIterator() {
83+
fetchNextPage();
84+
}
85+
86+
@Override
87+
public boolean hasNext() {
88+
while (true) {
89+
if (currentPageIterator != null && currentPageIterator.hasNext()) {
90+
return true;
91+
}
92+
if (finished) {
93+
return false;
94+
}
95+
fetchNextPage();
96+
}
97+
}
98+
99+
@Override
100+
public EntityMetadata next() {
101+
if (!hasNext()) {
102+
throw new NoSuchElementException();
103+
}
104+
return currentPageIterator.next();
105+
}
106+
107+
private void fetchNextPage() {
108+
if (finished) {
109+
return;
110+
}
111+
112+
EntityQuery pageQuery = cloneQuery(baseQuery);
113+
pageQuery.setContinuationToken(continuationToken);
114+
115+
EntityQueryResult result = queryExecutor.apply(pageQuery);
116+
List<EntityMetadata> entities = result.getEntities();
117+
118+
if (entities == null || entities.isEmpty()) {
119+
finished = true;
120+
currentPageIterator = null;
121+
return;
122+
}
123+
124+
currentPageIterator = entities.iterator();
125+
continuationToken = result.getContinuationToken();
126+
127+
if (continuationToken == null || continuationToken.isEmpty()) {
128+
finished = true;
129+
}
130+
}
131+
}
132+
133+
private class PageIterable implements Iterator<EntityQueryResult> {
134+
private String continuationToken = baseQuery.getContinuationToken();
135+
private boolean finished;
136+
private boolean firstPage = true;
137+
138+
@Override
139+
public boolean hasNext() {
140+
return !finished;
141+
}
142+
143+
@Override
144+
public EntityQueryResult next() {
145+
if (finished) {
146+
throw new NoSuchElementException();
147+
}
148+
149+
EntityQuery pageQuery = cloneQuery(baseQuery);
150+
if (!firstPage) {
151+
pageQuery.setContinuationToken(continuationToken);
152+
}
153+
firstPage = false;
154+
155+
EntityQueryResult result = queryExecutor.apply(pageQuery);
156+
continuationToken = result.getContinuationToken();
157+
158+
if (continuationToken == null || continuationToken.isEmpty()) {
159+
finished = true;
160+
}
161+
162+
return result;
163+
}
164+
}
165+
166+
private static EntityQuery cloneQuery(EntityQuery source) {
167+
EntityQuery clone = new EntityQuery();
168+
if (source.getInstanceIdStartsWith() != null) {
169+
// Use raw setter value since the source is already normalized
170+
clone.setInstanceIdStartsWith(source.getInstanceIdStartsWith());
171+
}
172+
clone.setLastModifiedFrom(source.getLastModifiedFrom());
173+
clone.setLastModifiedTo(source.getLastModifiedTo());
174+
clone.setIncludeState(source.isIncludeState());
175+
clone.setIncludeTransient(source.isIncludeTransient());
176+
clone.setPageSize(source.getPageSize());
177+
clone.setContinuationToken(source.getContinuationToken());
178+
return clone;
179+
}
180+
}

0 commit comments

Comments
 (0)