Skip to content

Commit ff5f43f

Browse files
javier-aliagasalaboysiri-varma
authored
feat: Add failure policy to actor reminders (dapr#1643)
* feat: Add failure policy to actor reminders Signed-off-by: Javier Aliaga <javier@diagrid.io> * chore: Use jackson to serialize/deserialize FailurePolicies Signed-off-by: Javier Aliaga <javier@diagrid.io> --------- Signed-off-by: Javier Aliaga <javier@diagrid.io> Co-authored-by: salaboy <Salaboy@gmail.com> Co-authored-by: Siri Varma Vegiraju <siri.varma@outlook.com>
1 parent 60f01d9 commit ff5f43f

10 files changed

Lines changed: 212 additions & 34 deletions

File tree

sdk-actors/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626
<artifactId>dapr-sdk</artifactId>
2727
<version>${project.version}</version>
2828
</dependency>
29+
<dependency>
30+
<groupId>com.fasterxml.jackson.datatype</groupId>
31+
<artifactId>jackson-datatype-jsr310</artifactId>
32+
</dependency>
2933
<dependency>
3034
<groupId>org.mockito</groupId>
3135
<artifactId>mockito-core</artifactId>

sdk-actors/src/main/java/io/dapr/actors/runtime/ActorObjectSerializer.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
import com.fasterxml.jackson.core.JsonFactory;
1717
import com.fasterxml.jackson.core.JsonGenerator;
1818
import com.fasterxml.jackson.databind.JsonNode;
19+
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
1920
import io.dapr.client.ObjectSerializer;
21+
import io.dapr.client.domain.FailurePolicy;
2022
import io.dapr.utils.DurationUtils;
2123

2224
import java.io.ByteArrayOutputStream;
@@ -33,6 +35,11 @@ public class ActorObjectSerializer extends ObjectSerializer {
3335
*/
3436
private static final JsonFactory JSON_FACTORY = new JsonFactory();
3537

38+
static {
39+
// Configure OBJECT_MAPPER to handle Java 8 time types
40+
OBJECT_MAPPER.registerModule(new JavaTimeModule());
41+
}
42+
3643
/**
3744
* {@inheritDoc}
3845
*/
@@ -99,12 +106,20 @@ private byte[] serialize(ActorTimerParams timer) throws IOException {
99106
private byte[] serialize(ActorReminderParams reminder) throws IOException {
100107
try (ByteArrayOutputStream writer = new ByteArrayOutputStream()) {
101108
JsonGenerator generator = JSON_FACTORY.createGenerator(writer);
109+
generator.setCodec(OBJECT_MAPPER);
102110
generator.writeStartObject();
103111
generator.writeStringField("dueTime", DurationUtils.convertDurationToDaprFormat(reminder.getDueTime()));
104112
generator.writeStringField("period", DurationUtils.convertDurationToDaprFormat(reminder.getPeriod()));
105113
if (reminder.getData() != null) {
106114
generator.writeBinaryField("data", reminder.getData());
107115
}
116+
117+
// serialize failure policy
118+
if (reminder.getFailurePolicy() != null) {
119+
generator.writeObjectField("failurePolicy", reminder.getFailurePolicy());
120+
}
121+
122+
108123
generator.writeEndObject();
109124
generator.close();
110125
writer.flush();
@@ -243,7 +258,15 @@ private ActorReminderParams deserializeActorReminder(byte[] value) throws IOExce
243258
Duration period = extractDurationOrNull(node, "period");
244259
byte[] data = node.get("data") != null ? node.get("data").binaryValue() : null;
245260

246-
return new ActorReminderParams(data, dueTime, period);
261+
// Handle failure policy if present
262+
JsonNode failurePolicyNode = node.get("failurePolicy");
263+
FailurePolicy failurePolicy = null;
264+
if (failurePolicyNode != null) {
265+
failurePolicy = OBJECT_MAPPER.treeToValue(failurePolicyNode, FailurePolicy.class);
266+
}
267+
268+
269+
return new ActorReminderParams(data, dueTime, period, failurePolicy);
247270
}
248271

249272
/**

sdk-actors/src/main/java/io/dapr/actors/runtime/ActorReminderParams.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
package io.dapr.actors.runtime;
1515

16+
import io.dapr.client.domain.FailurePolicy;
17+
1618
import java.time.Duration;
1719

1820
/**
@@ -40,6 +42,11 @@ final class ActorReminderParams {
4042
*/
4143
private final Duration period;
4244

45+
/**
46+
* Failure Policy.
47+
*/
48+
private FailurePolicy failurePolicy;
49+
4350
/**
4451
* Instantiates a new instance for the params of a reminder.
4552
*
@@ -55,6 +62,18 @@ final class ActorReminderParams {
5562
this.period = period;
5663
}
5764

65+
/**
66+
* Instantiates a new instance for the params of a reminder.
67+
*
68+
* @param data Data to be passed in as part of the reminder trigger.
69+
* @param dueTime Time the reminder is due for the 1st time.
70+
* @param period Interval between triggers.
71+
*/
72+
ActorReminderParams(byte[] data, Duration dueTime, Duration period, FailurePolicy failurePolicy) {
73+
this(data, dueTime, period);
74+
this.failurePolicy = failurePolicy;
75+
}
76+
5877
/**
5978
* Gets the time the reminder is due for the 1st time.
6079
*
@@ -109,4 +128,13 @@ private static void validatePeriod(String argName, Duration value) throws Illega
109128
throw new IllegalArgumentException(message);
110129
}
111130
}
131+
132+
/**
133+
* Gets the failure policy.
134+
*
135+
* @return the failure policy
136+
*/
137+
public FailurePolicy getFailurePolicy() {
138+
return failurePolicy;
139+
}
112140
}

sdk-actors/src/main/java/io/dapr/actors/runtime/DaprClientImpl.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import java.util.List;
3434
import java.util.concurrent.ExecutionException;
3535

36+
import static io.dapr.utils.FailurePolicyUtils.getJobFailurePolicy;
37+
3638
/**
3739
* A DaprClient over HTTP for Actor's runtime.
3840
*/
@@ -145,15 +147,21 @@ public Mono<Void> registerReminder(
145147
String actorId,
146148
String reminderName,
147149
ActorReminderParams reminderParams) {
148-
DaprActorsProtos.RegisterActorReminderRequest req =
150+
151+
var builder =
149152
DaprActorsProtos.RegisterActorReminderRequest.newBuilder()
150153
.setActorType(actorType)
151154
.setActorId(actorId)
152155
.setName(reminderName)
153156
.setData(ByteString.copyFrom(reminderParams.getData()))
154157
.setDueTime(DurationUtils.convertDurationToDaprFormat(reminderParams.getDueTime()))
155-
.setPeriod(DurationUtils.convertDurationToDaprFormat(reminderParams.getPeriod()))
156-
.build();
158+
.setPeriod(DurationUtils.convertDurationToDaprFormat(reminderParams.getPeriod()));
159+
160+
if (reminderParams.getFailurePolicy() != null) {
161+
builder.setFailurePolicy(getJobFailurePolicy(reminderParams.getFailurePolicy()));
162+
}
163+
164+
DaprActorsProtos.RegisterActorReminderRequest req = builder.build();
157165
return Mono.<Empty>create(it -> client.registerActorReminder(req, createStreamObserver(it))).then().then();
158166
}
159167

sdk-actors/src/test/java/io/dapr/actors/runtime/ActorReminderParamsTest.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
package io.dapr.actors.runtime;
1515

16+
import io.dapr.client.domain.ConstantFailurePolicy;
17+
import io.dapr.client.domain.DropFailurePolicy;
1618
import org.junit.jupiter.api.Assertions;
1719
import org.junit.jupiter.api.Test;
1820

@@ -75,4 +77,69 @@ public void withState() {
7577
Assertions.assertEquals(original.getDueTime(), recreated.getDueTime());
7678
Assertions.assertEquals(original.getPeriod(), recreated.getPeriod());
7779
}
80+
81+
@Test
82+
public void withDropFailurePolicy() {
83+
ActorReminderParams original = new ActorReminderParams("maru".getBytes(),
84+
Duration.ZERO.plusMinutes(2),
85+
Duration.ZERO.plusMinutes(5),
86+
new DropFailurePolicy());
87+
ActorReminderParams recreated = null;
88+
try {
89+
byte[] serialized = SERIALIZER.serialize(original);
90+
recreated = SERIALIZER.deserialize(serialized, ActorReminderParams.class);
91+
} catch (Exception e) {
92+
System.out.println("The error is: " + e);
93+
Assertions.fail();
94+
}
95+
96+
Assertions.assertArrayEquals(original.getData(), recreated.getData());
97+
Assertions.assertEquals(original.getDueTime(), recreated.getDueTime());
98+
Assertions.assertEquals(original.getPeriod(), recreated.getPeriod());
99+
Assertions.assertEquals(original.getFailurePolicy().getFailurePolicyType(), recreated.getFailurePolicy().getFailurePolicyType());
100+
}
101+
102+
@Test
103+
public void withConstantRetryFailurePolicy() {
104+
ActorReminderParams original = new ActorReminderParams("maru".getBytes(),
105+
Duration.ZERO.plusMinutes(2),
106+
Duration.ZERO.plusMinutes(5),
107+
new ConstantFailurePolicy(4));
108+
ActorReminderParams recreated = null;
109+
try {
110+
byte[] serialized = SERIALIZER.serialize(original);
111+
recreated = SERIALIZER.deserialize(serialized, ActorReminderParams.class);
112+
} catch (Exception e) {
113+
System.out.println("The error is: " + e);
114+
Assertions.fail();
115+
}
116+
117+
Assertions.assertArrayEquals(original.getData(), recreated.getData());
118+
Assertions.assertEquals(original.getDueTime(), recreated.getDueTime());
119+
Assertions.assertEquals(original.getPeriod(), recreated.getPeriod());
120+
Assertions.assertEquals(original.getFailurePolicy().getFailurePolicyType(), recreated.getFailurePolicy().getFailurePolicyType());
121+
Assertions.assertEquals(((ConstantFailurePolicy) original.getFailurePolicy()).getMaxRetries(), ((ConstantFailurePolicy) recreated.getFailurePolicy()).getMaxRetries());
122+
}
123+
124+
@Test
125+
public void withConstantIntervalFailurePolicy() {
126+
ActorReminderParams original = new ActorReminderParams("maru".getBytes(),
127+
Duration.ZERO.plusMinutes(2),
128+
Duration.ZERO.plusMinutes(5),
129+
new ConstantFailurePolicy(Duration.ofSeconds(4)));
130+
ActorReminderParams recreated = null;
131+
try {
132+
byte[] serialized = SERIALIZER.serialize(original);
133+
recreated = SERIALIZER.deserialize(serialized, ActorReminderParams.class);
134+
} catch (Exception e) {
135+
System.out.println("The error is: " + e);
136+
Assertions.fail();
137+
}
138+
139+
Assertions.assertArrayEquals(original.getData(), recreated.getData());
140+
Assertions.assertEquals(original.getDueTime(), recreated.getDueTime());
141+
Assertions.assertEquals(original.getPeriod(), recreated.getPeriod());
142+
Assertions.assertEquals(original.getFailurePolicy().getFailurePolicyType(), recreated.getFailurePolicy().getFailurePolicyType());
143+
Assertions.assertEquals(((ConstantFailurePolicy) original.getFailurePolicy()).getDurationBetweenRetries(), ((ConstantFailurePolicy) recreated.getFailurePolicy()).getDurationBetweenRetries());
144+
}
78145
}

sdk/src/main/java/io/dapr/client/DaprClientImpl.java

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@
5858
import io.dapr.client.domain.EncryptRequestAlpha1;
5959
import io.dapr.client.domain.ExecuteStateTransactionRequest;
6060
import io.dapr.client.domain.FailurePolicy;
61-
import io.dapr.client.domain.FailurePolicyType;
6261
import io.dapr.client.domain.GetBulkSecretRequest;
6362
import io.dapr.client.domain.GetBulkStateRequest;
6463
import io.dapr.client.domain.GetConfigurationRequest;
@@ -100,6 +99,7 @@
10099
import io.dapr.serializer.DaprObjectSerializer;
101100
import io.dapr.serializer.DefaultObjectSerializer;
102101
import io.dapr.utils.DefaultContentTypeConverter;
102+
import io.dapr.utils.FailurePolicyUtils;
103103
import io.dapr.utils.TypeRef;
104104
import io.dapr.v1.CommonProtos;
105105
import io.dapr.v1.DaprAiProtos;
@@ -1422,10 +1422,9 @@ public Mono<Void> scheduleJob(ScheduleJobRequest scheduleJobRequest) {
14221422
}
14231423

14241424
if (scheduleJobRequest.getFailurePolicy() != null) {
1425-
jobBuilder.setFailurePolicy(getJobFailurePolicy(scheduleJobRequest.getFailurePolicy()));
1425+
jobBuilder.setFailurePolicy(FailurePolicyUtils.getJobFailurePolicy(scheduleJobRequest.getFailurePolicy()));
14261426
}
14271427

1428-
14291428
Mono<DaprJobsProtos.ScheduleJobResponse> scheduleJobResponseMono =
14301429
Mono.deferContextual(context -> this.createMono(
14311430
it -> intercept(context, asyncStub)
@@ -1504,32 +1503,6 @@ private FailurePolicy getJobFailurePolicy(CommonProtos.JobFailurePolicy jobFailu
15041503
ChronoUnit.NANOS));
15051504
}
15061505

1507-
private CommonProtos.JobFailurePolicy getJobFailurePolicy(FailurePolicy failurePolicy) {
1508-
CommonProtos.JobFailurePolicy.Builder jobFailurePolicyBuilder = CommonProtos.JobFailurePolicy.newBuilder();
1509-
1510-
if (failurePolicy.getFailurePolicyType() == FailurePolicyType.DROP) {
1511-
jobFailurePolicyBuilder.setDrop(CommonProtos.JobFailurePolicyDrop.newBuilder().build());
1512-
return jobFailurePolicyBuilder.build();
1513-
}
1514-
1515-
CommonProtos.JobFailurePolicyConstant.Builder constantPolicyBuilder =
1516-
CommonProtos.JobFailurePolicyConstant.newBuilder();
1517-
ConstantFailurePolicy jobConstantFailurePolicy = (ConstantFailurePolicy)failurePolicy;
1518-
1519-
if (jobConstantFailurePolicy.getMaxRetries() != null) {
1520-
constantPolicyBuilder.setMaxRetries(jobConstantFailurePolicy.getMaxRetries());
1521-
}
1522-
1523-
if (jobConstantFailurePolicy.getDurationBetweenRetries() != null) {
1524-
constantPolicyBuilder.setInterval(com.google.protobuf.Duration.newBuilder()
1525-
.setNanos(jobConstantFailurePolicy.getDurationBetweenRetries().getNano()).build());
1526-
}
1527-
1528-
jobFailurePolicyBuilder.setConstant(constantPolicyBuilder.build());
1529-
1530-
return jobFailurePolicyBuilder.build();
1531-
}
1532-
15331506
/**
15341507
* {@inheritDoc}
15351508
*/

sdk/src/main/java/io/dapr/client/domain/ConstantFailurePolicy.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,28 @@
1313

1414
package io.dapr.client.domain;
1515

16+
import com.fasterxml.jackson.annotation.JsonTypeName;
17+
1618
import java.time.Duration;
1719

1820
/**
1921
* A failure policy that applies a constant retry interval for job retries.
2022
* This implementation of {@link FailurePolicy} retries a job a fixed number of times
2123
* with a constant delay between each retry attempt.
2224
*/
25+
@JsonTypeName("CONSTANT")
2326
public class ConstantFailurePolicy implements FailurePolicy {
2427

2528
private Integer maxRetries;
2629
private Duration durationBetweenRetries;
2730

31+
/**
32+
* Default constructor.
33+
*/
34+
public ConstantFailurePolicy() {
35+
36+
}
37+
2838
/**
2939
* Constructs a {@code JobConstantFailurePolicy} with the specified maximum number of retries.
3040
*

sdk/src/main/java/io/dapr/client/domain/DropFailurePolicy.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,14 @@
1313

1414
package io.dapr.client.domain;
1515

16+
import com.fasterxml.jackson.annotation.JsonTypeName;
17+
1618
/**
1719
* A failure policy that drops the job upon failure without retrying.
1820
* This implementation of {@link FailurePolicy} immediately discards failed jobs
1921
* instead of retrying them.
2022
*/
23+
@JsonTypeName("DROP")
2124
public class DropFailurePolicy implements FailurePolicy {
2225

2326
/**

sdk/src/main/java/io/dapr/client/domain/FailurePolicy.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,17 @@
1313

1414
package io.dapr.client.domain;
1515

16+
import com.fasterxml.jackson.annotation.JsonSubTypes;
17+
import com.fasterxml.jackson.annotation.JsonTypeInfo;
18+
1619
/**
17-
* Set a failure policy for the job.
20+
* Set a failure policy for the job or reminder.
1821
*/
22+
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
23+
@JsonSubTypes({
24+
@JsonSubTypes.Type(value = DropFailurePolicy.class, name = "DROP"),
25+
@JsonSubTypes.Type(value = ConstantFailurePolicy.class, name = "CONSTANT")
26+
})
1927
public interface FailurePolicy {
2028
FailurePolicyType getFailurePolicyType();
2129
}

0 commit comments

Comments
 (0)