Skip to content

Commit d51f5a6

Browse files
committed
Add support to gRPC
Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com>
1 parent a93c52b commit d51f5a6

34 files changed

Lines changed: 1601 additions & 2 deletions

impl/grpc/pom.xml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
2+
<modelVersion>4.0.0</modelVersion>
3+
<parent>
4+
<groupId>io.serverlessworkflow</groupId>
5+
<artifactId>serverlessworkflow-impl</artifactId>
6+
<version>8.0.0-SNAPSHOT</version>
7+
</parent>
8+
<artifactId>serverlessworkflow-impl-grpc</artifactId>
9+
<name>Serverless Workflow :: Impl :: gRPC</name>
10+
11+
<dependencies>
12+
<dependency>
13+
<groupId>io.serverlessworkflow</groupId>
14+
<artifactId>serverlessworkflow-impl-core</artifactId>
15+
</dependency>
16+
<dependency>
17+
<groupId>io.serverlessworkflow</groupId>
18+
<artifactId>serverlessworkflow-api</artifactId>
19+
</dependency>
20+
<dependency>
21+
<groupId>io.serverlessworkflow</groupId>
22+
<artifactId>serverlessworkflow-impl-jackson</artifactId>
23+
</dependency>
24+
<dependency>
25+
<groupId>io.grpc</groupId>
26+
<artifactId>grpc-stub</artifactId>
27+
</dependency>
28+
<dependency>
29+
<groupId>com.google.protobuf</groupId>
30+
<artifactId>protobuf-java-util</artifactId>
31+
</dependency>
32+
<dependency>
33+
<groupId>com.github.os72</groupId>
34+
<artifactId>protoc-jar</artifactId>
35+
</dependency>
36+
<dependency>
37+
<groupId>io.grpc</groupId>
38+
<artifactId>grpc-protobuf</artifactId>
39+
</dependency>
40+
</dependencies>
41+
</project>
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.grpc;
17+
18+
import com.google.protobuf.DescriptorProtos;
19+
20+
public record FileDescriptorContext(
21+
DescriptorProtos.FileDescriptorSet fileDescriptorSet, String inputProto) {}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.grpc;
17+
18+
import com.github.os72.protocjar.Protoc;
19+
import com.google.protobuf.DescriptorProtos;
20+
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
21+
import java.io.IOException;
22+
import java.io.InputStream;
23+
import java.io.UncheckedIOException;
24+
import java.nio.file.Files;
25+
import java.nio.file.Path;
26+
import java.nio.file.StandardCopyOption;
27+
import java.util.Optional;
28+
29+
public class FileDescriptorReader {
30+
31+
public FileDescriptorContext readDescriptor(ExternalResourceHandler externalResourceHandler) {
32+
Path grpcDir =
33+
tryCreateTempGrpcDir()
34+
.orElseThrow(
35+
() -> new IllegalStateException("Could not create temporary gRPC directory"));
36+
37+
try (InputStream inputStream = externalResourceHandler.open()) {
38+
39+
Path protoFile = grpcDir.resolve(externalResourceHandler.name());
40+
if (!Files.exists(protoFile)) {
41+
Files.createDirectories(protoFile);
42+
}
43+
44+
Files.copy(inputStream, protoFile, StandardCopyOption.REPLACE_EXISTING);
45+
46+
Path descriptorOutput = grpcDir.resolve("descriptor.protobin");
47+
48+
try {
49+
50+
generateFileDescriptor(grpcDir, protoFile, descriptorOutput);
51+
52+
DescriptorProtos.FileDescriptorSet fileDescriptorSet =
53+
DescriptorProtos.FileDescriptorSet.newBuilder()
54+
.mergeFrom(Files.readAllBytes(descriptorOutput))
55+
.build();
56+
57+
return new FileDescriptorContext(fileDescriptorSet, externalResourceHandler.name());
58+
59+
} catch (IOException e) {
60+
throw new UncheckedIOException(
61+
"Unable to read external resource handler: " + externalResourceHandler.name(), e);
62+
}
63+
} catch (IOException e) {
64+
throw new UncheckedIOException("Unable to read descriptor file", e);
65+
}
66+
}
67+
68+
private Optional<Path> tryCreateTempGrpcDir() {
69+
try {
70+
return Optional.of(Files.createTempDirectory("serverless-workflow-"));
71+
} catch (IOException e) {
72+
throw new UncheckedIOException("Error while creating temporary gRPC directory", e);
73+
}
74+
}
75+
76+
/**
77+
* Calls protoc binary with <code>--descriptor_set_out=</code> option set.
78+
*
79+
* @param grpcDir a temporary directory
80+
* @param protoFile the .proto file used by <code>protoc</code> to generate the file descriptor
81+
* @param descriptorOutput the output directory where the descriptor file will be generated
82+
*/
83+
private static void generateFileDescriptor(Path grpcDir, Path protoFile, Path descriptorOutput) {
84+
String[] protocArgs =
85+
new String[] {
86+
"--include_imports",
87+
"--descriptor_set_out=" + descriptorOutput.toAbsolutePath(),
88+
"-I",
89+
grpcDir.toAbsolutePath().toString(),
90+
protoFile.toAbsolutePath().toString()
91+
};
92+
93+
try {
94+
95+
int status = Protoc.runProtoc(protocArgs);
96+
97+
if (status != 0) {
98+
throw new RuntimeException(
99+
"Unable to generate file descriptor, 'protoc' execution failed with status " + status);
100+
}
101+
} catch (InterruptedException e) {
102+
Thread.currentThread().interrupt();
103+
throw new RuntimeException("Unable to generate file descriptor", e);
104+
} catch (IOException e) {
105+
throw new UncheckedIOException("Unable to generate file descriptor", e);
106+
}
107+
}
108+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.grpc;
17+
18+
import io.grpc.Channel;
19+
import io.grpc.ManagedChannelBuilder;
20+
import io.serverlessworkflow.impl.TaskContext;
21+
import io.serverlessworkflow.impl.WorkflowApplication;
22+
import io.serverlessworkflow.impl.WorkflowContext;
23+
24+
public class GrpcChannelResolver {
25+
26+
public static final String GRPC_CHANNEL_PROVIDER = "grpcChannelProvider";
27+
28+
public static Channel channel(
29+
WorkflowContext workflowContext,
30+
TaskContext taskContext,
31+
GrpcRequestContext grpcRequestContext) {
32+
WorkflowApplication appl = workflowContext.definition().application();
33+
return appl.<Channel>additionalObject(GRPC_CHANNEL_PROVIDER, workflowContext, taskContext)
34+
.orElseGet(
35+
() ->
36+
ManagedChannelBuilder.forAddress(
37+
grpcRequestContext.address(), grpcRequestContext.port())
38+
.usePlaintext()
39+
.build());
40+
}
41+
}

0 commit comments

Comments
 (0)