Skip to content

Commit 8bfff43

Browse files
committed
Support accepting queries and returning data from apollo
1 parent 4dbd471 commit 8bfff43

9 files changed

+254
-41
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
version = 4.7.1
1+
version = 5.0.0-SNAPSHOT
22
group = com.graphql-java

src/main/java/graphql/servlet/GraphQLObjectMapper.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import com.fasterxml.jackson.databind.SerializationFeature;
99
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
1010
import graphql.ExecutionResult;
11+
import graphql.ExecutionResultImpl;
12+
import graphql.GraphQLError;
1113
import graphql.servlet.internal.GraphQLRequest;
1214
import graphql.servlet.internal.VariablesDeserializer;
1315

@@ -51,18 +53,18 @@ private ObjectMapper createObjectMapper() {
5153
ObjectMapper mapper = new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS).registerModule(new Jdk8Module());
5254
objectMapperConfigurerSupplier.get().configure(mapper);
5355

56+
InjectableValues.Std injectableValues = new InjectableValues.Std();
57+
injectableValues.addValue(ObjectMapper.class, mapper);
58+
mapper.setInjectableValues(injectableValues);
59+
5460
return mapper;
5561
}
5662

5763
/**
58-
* Creates an {@link ObjectReader} for deserializing {@link GraphQLRequest}
64+
* @return an {@link ObjectReader} for deserializing {@link GraphQLRequest}
5965
*/
6066
public ObjectReader getGraphQLRequestMapper() {
61-
// Add object mapper to injection so VariablesDeserializer can access it...
62-
InjectableValues.Std injectableValues = new InjectableValues.Std();
63-
injectableValues.addValue(ObjectMapper.class, getJacksonMapper());
64-
65-
return getJacksonMapper().reader(injectableValues).forType(GraphQLRequest.class);
67+
return getJacksonMapper().reader().forType(GraphQLRequest.class);
6668
}
6769

6870
public GraphQLRequest readGraphQLRequest(InputStream inputStream) throws IOException {
@@ -103,15 +105,35 @@ public String serializeResultAsJson(ExecutionResult executionResult) {
103105
}
104106
}
105107

106-
public Map<String, Object> createResultFromExecutionResult(ExecutionResult executionResult) {
108+
public boolean areErrorsPresent(ExecutionResult executionResult) {
109+
return graphQLErrorHandlerSupplier.get().errorsPresent(executionResult.getErrors());
110+
}
111+
112+
public ExecutionResult sanitizeErrors(ExecutionResult executionResult) {
113+
Object data = executionResult.getData();
114+
Map<Object, Object> extensions = executionResult.getExtensions();
115+
List<GraphQLError> errors = executionResult.getErrors();
107116

108117
GraphQLErrorHandler errorHandler = graphQLErrorHandlerSupplier.get();
118+
if(errorHandler.errorsPresent(errors)) {
119+
errors = errorHandler.processErrors(errors);
120+
} else {
121+
errors = null;
122+
}
123+
124+
return new ExecutionResultImpl(data, errors, extensions);
125+
}
126+
127+
public Map<String, Object> createResultFromExecutionResult(ExecutionResult executionResult) {
128+
return convertSanitizedExecutionResult(sanitizeErrors(executionResult));
129+
}
109130

131+
public Map<String, Object> convertSanitizedExecutionResult(ExecutionResult executionResult) {
110132
final Map<String, Object> result = new LinkedHashMap<>();
111133
result.put("data", executionResult.getData());
112134

113-
if (errorHandler.errorsPresent(executionResult.getErrors())) {
114-
result.put("errors", errorHandler.processErrors(executionResult.getErrors()));
135+
if (areErrorsPresent(executionResult)) {
136+
result.put("errors", executionResult.getErrors());
115137
}
116138

117139
if(executionResult.getExtensions() != null){

src/main/java/graphql/servlet/GraphQLWebsocketServlet.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import graphql.servlet.internal.ApolloSubscriptionProtocolFactory;
44
import graphql.servlet.internal.FallbackSubscriptionProtocolFactory;
5+
import graphql.servlet.internal.SubscriptionHandlerInput;
56
import graphql.servlet.internal.SubscriptionProtocolFactory;
67
import graphql.servlet.internal.SubscriptionProtocolHandler;
78
import graphql.servlet.internal.WsSessionSubscriptions;
@@ -49,11 +50,13 @@ public class GraphQLWebsocketServlet extends Endpoint {
4950
private final GraphQLQueryInvoker queryInvoker;
5051
private final GraphQLInvocationInputFactory invocationInputFactory;
5152
private final GraphQLObjectMapper graphQLObjectMapper;
53+
private final SubscriptionHandlerInput subscriptionHandlerInput;
5254

5355
public GraphQLWebsocketServlet(GraphQLQueryInvoker queryInvoker, GraphQLInvocationInputFactory invocationInputFactory, GraphQLObjectMapper graphQLObjectMapper) {
5456
this.queryInvoker = queryInvoker;
5557
this.invocationInputFactory = invocationInputFactory;
5658
this.graphQLObjectMapper = graphQLObjectMapper;
59+
this.subscriptionHandlerInput = new SubscriptionHandlerInput(invocationInputFactory, queryInvoker, graphQLObjectMapper);
5760
}
5861

5962
@Override
@@ -105,13 +108,13 @@ private void closeUnexpectedly(Session session, Throwable t) {
105108
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
106109
sec.getUserProperties().put(HANDSHAKE_REQUEST_KEY, request);
107110

108-
List<String> accept = request.getHeaders().get(HandshakeResponse.SEC_WEBSOCKET_ACCEPT);
109-
if(accept == null) {
110-
accept = Collections.emptyList();
111+
List<String> protocol = request.getHeaders().get(HandshakeRequest.SEC_WEBSOCKET_PROTOCOL);
112+
if(protocol == null) {
113+
protocol = Collections.emptyList();
111114
}
112115

113-
SubscriptionProtocolFactory subscriptionProtocolFactory = getSubscriptionProtocolFactory(accept);
114-
sec.getUserProperties().put(PROTOCOL_HANDLER_REQUEST_KEY, subscriptionProtocolFactory.createHandler(invocationInputFactory, queryInvoker, graphQLObjectMapper));
116+
SubscriptionProtocolFactory subscriptionProtocolFactory = getSubscriptionProtocolFactory(protocol);
117+
sec.getUserProperties().put(PROTOCOL_HANDLER_REQUEST_KEY, subscriptionProtocolFactory.createHandler(subscriptionHandlerInput));
115118

116119
if(request.getHeaders().get(HandshakeResponse.SEC_WEBSOCKET_ACCEPT) != null) {
117120
response.getHeaders().put(HandshakeResponse.SEC_WEBSOCKET_ACCEPT, allSubscriptionProtocols);
Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
package graphql.servlet.internal;
22

3-
import graphql.servlet.GraphQLInvocationInputFactory;
4-
import graphql.servlet.GraphQLObjectMapper;
5-
import graphql.servlet.GraphQLQueryInvoker;
6-
73
/**
84
* @author Andrew Potter
95
*/
@@ -13,7 +9,7 @@ public ApolloSubscriptionProtocolFactory() {
139
}
1410

1511
@Override
16-
public SubscriptionProtocolHandler createHandler(GraphQLInvocationInputFactory invocationInputFactory, GraphQLQueryInvoker queryInvoker, GraphQLObjectMapper graphQLObjectMapper) {
17-
return new ApolloSubscriptionProtocolHandler();
12+
public SubscriptionProtocolHandler createHandler(SubscriptionHandlerInput subscriptionHandlerInput) {
13+
return new ApolloSubscriptionProtocolHandler(subscriptionHandlerInput);
1814
}
1915
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,188 @@
11
package graphql.servlet.internal;
22

3+
import com.fasterxml.jackson.annotation.JsonCreator;
4+
import com.fasterxml.jackson.annotation.JsonInclude;
5+
import com.fasterxml.jackson.annotation.JsonValue;
6+
import graphql.ExecutionResult;
7+
import graphql.servlet.GraphQLObjectMapper;
8+
import org.reactivestreams.Publisher;
9+
import org.reactivestreams.Subscriber;
10+
import org.reactivestreams.Subscription;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
314
import javax.websocket.Session;
415
import javax.websocket.server.HandshakeRequest;
16+
import java.io.IOException;
17+
import java.util.HashMap;
18+
import java.util.Map;
19+
import java.util.concurrent.atomic.AtomicReference;
520

621
/**
722
* @author Andrew Potter
823
*/
924
public class ApolloSubscriptionProtocolHandler implements SubscriptionProtocolHandler {
25+
26+
private static final Logger log = LoggerFactory.getLogger(ApolloSubscriptionProtocolHandler.class);
27+
28+
private final SubscriptionHandlerInput input;
29+
30+
public ApolloSubscriptionProtocolHandler(SubscriptionHandlerInput subscriptionHandlerInput) {
31+
this.input = subscriptionHandlerInput;
32+
}
33+
1034
@Override
1135
public void onMessage(HandshakeRequest request, Session session, String text) {
36+
OperationMessage message;
37+
try {
38+
message = input.getGraphQLObjectMapper().getJacksonMapper().readValue(text, OperationMessage.class);
39+
} catch(Throwable t) {
40+
log.warn("Error parsing message", t);
41+
sendMessage(session, OperationMessage.Type.GQL_CONNECTION_ERROR, null);
42+
return;
43+
}
44+
45+
switch(message.getType()) {
46+
case GQL_CONNECTION_INIT:
47+
sendMessage(session, OperationMessage.Type.GQL_CONNECTION_ACK, message.getId());
48+
// sendMessage(session, OperationMessage.Type.GQL_CONNECTION_KEEP_ALIVE, message.getId());
49+
break;
50+
51+
case GQL_START:
52+
handleSubscriptionStart(
53+
session,
54+
message.id,
55+
input.getQueryInvoker().query(input.getInvocationInputFactory().create(
56+
input.getGraphQLObjectMapper().getJacksonMapper().convertValue(message.payload, GraphQLRequest.class)
57+
))
58+
);
59+
break;
60+
}
61+
}
62+
63+
@SuppressWarnings("unchecked")
64+
private void handleSubscriptionStart(Session session, String id, ExecutionResult executionResult) {
65+
executionResult = input.getGraphQLObjectMapper().sanitizeErrors(executionResult);
66+
OperationMessage.Type type = input.getGraphQLObjectMapper().areErrorsPresent(executionResult) ? OperationMessage.Type.GQL_ERROR : OperationMessage.Type.GQL_DATA;
67+
68+
Object data = executionResult.getData();
69+
if(data instanceof Publisher) {
70+
if(type == OperationMessage.Type.GQL_DATA) {
71+
AtomicReference<Subscription> subscriptionReference = new AtomicReference<>();
72+
73+
((Publisher<ExecutionResult>) data).subscribe(new Subscriber<ExecutionResult>() {
74+
@Override
75+
public void onSubscribe(Subscription subscription) {
76+
subscriptionReference.set(subscription);
77+
subscriptionReference.get().request(1);
78+
}
79+
80+
@Override
81+
public void onNext(ExecutionResult executionResult) {
82+
subscriptionReference.get().request(1);
83+
Map<String, Object> result = new HashMap<>();
84+
result.put("data", executionResult.getData());
85+
sendMessage(session, OperationMessage.Type.GQL_DATA, id, result);
86+
}
87+
88+
@Override
89+
public void onError(Throwable throwable) {
90+
log.error("Subscription error", throwable);
91+
sendMessage(session, OperationMessage.Type.GQL_ERROR, id);
92+
}
93+
94+
@Override
95+
public void onComplete() {
96+
sendMessage(session, OperationMessage.Type.GQL_COMPLETE, id);
97+
}
98+
});
99+
}
100+
}
101+
102+
sendMessage(session, type, id, input.getGraphQLObjectMapper().convertSanitizedExecutionResult(executionResult));
103+
}
104+
105+
private void sendMessage(Session session, OperationMessage.Type type, String id) {
106+
sendMessage(session, type, id, null);
107+
}
108+
109+
private void sendMessage(Session session, OperationMessage.Type type, String id, Object payload) {
110+
try {
111+
session.getBasicRemote().sendText(input.getGraphQLObjectMapper().getJacksonMapper().writeValueAsString(
112+
new OperationMessage(type, id, payload)
113+
));
114+
} catch (IOException e) {
115+
throw new RuntimeException("Error sending subscription response", e);
116+
}
117+
}
118+
119+
@JsonInclude(JsonInclude.Include.NON_NULL)
120+
public static class OperationMessage {
121+
private Type type;
122+
private String id;
123+
private Object payload;
124+
125+
public OperationMessage() {
126+
}
127+
128+
public OperationMessage(Type type, String id, Object payload) {
129+
this.type = type;
130+
this.id = id;
131+
this.payload = payload;
132+
}
133+
134+
public Type getType() {
135+
return type;
136+
}
137+
138+
public String getId() {
139+
return id;
140+
}
141+
142+
public Object getPayload() {
143+
return payload;
144+
}
12145

146+
public enum Type {
147+
148+
// Server Messages
149+
GQL_CONNECTION_ACK("connection_ack"),
150+
GQL_CONNECTION_ERROR("connection_error"),
151+
GQL_CONNECTION_KEEP_ALIVE("ka"),
152+
GQL_DATA("data"),
153+
GQL_ERROR("error"),
154+
GQL_COMPLETE("complete"),
155+
156+
// Client Messages
157+
GQL_CONNECTION_INIT("connection_init"),
158+
GQL_CONNECTION_TERMINATE("connection_terminate"),
159+
GQL_START("start"),
160+
GQL_STOP("stop");
161+
162+
private static final Map<String, Type> reverseLookup = new HashMap<>();
163+
164+
static {
165+
for(Type type: Type.values()) {
166+
reverseLookup.put(type.getType(), type);
167+
}
168+
}
169+
170+
private final String type;
171+
172+
Type(String type) {
173+
this.type = type;
174+
}
175+
176+
@JsonCreator
177+
public static Type findType(String type) {
178+
return reverseLookup.get(type);
179+
}
180+
181+
@JsonValue
182+
public String getType() {
183+
return type;
184+
}
185+
}
13186
}
187+
14188
}

src/main/java/graphql/servlet/internal/FallbackSubscriptionProtocolFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public FallbackSubscriptionProtocolFactory() {
1313
}
1414

1515
@Override
16-
public SubscriptionProtocolHandler createHandler(GraphQLInvocationInputFactory invocationInputFactory, GraphQLQueryInvoker queryInvoker, GraphQLObjectMapper graphQLObjectMapper) {
17-
return new FallbackSubscriptionProtocolHandler(queryInvoker, invocationInputFactory, graphQLObjectMapper);
16+
public SubscriptionProtocolHandler createHandler(SubscriptionHandlerInput subscriptionHandlerInput) {
17+
return new FallbackSubscriptionProtocolHandler(subscriptionHandlerInput);
1818
}
1919
}
Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
package graphql.servlet.internal;
22

3-
import graphql.servlet.GraphQLInvocationInputFactory;
4-
import graphql.servlet.GraphQLObjectMapper;
5-
import graphql.servlet.GraphQLQueryInvoker;
6-
73
import javax.websocket.Session;
84
import javax.websocket.server.HandshakeRequest;
95

@@ -12,20 +8,16 @@
128
*/
139
public class FallbackSubscriptionProtocolHandler implements SubscriptionProtocolHandler {
1410

15-
private final GraphQLQueryInvoker queryInvoker;
16-
private final GraphQLInvocationInputFactory invocationInputFactory;
17-
private final GraphQLObjectMapper graphQLObjectMapper;
11+
private final SubscriptionHandlerInput input;
1812

19-
public FallbackSubscriptionProtocolHandler(GraphQLQueryInvoker queryInvoker, GraphQLInvocationInputFactory invocationInputFactory, GraphQLObjectMapper graphQLObjectMapper) {
20-
this.queryInvoker = queryInvoker;
21-
this.invocationInputFactory = invocationInputFactory;
22-
this.graphQLObjectMapper = graphQLObjectMapper;
13+
public FallbackSubscriptionProtocolHandler(SubscriptionHandlerInput subscriptionHandlerInput) {
14+
this.input = subscriptionHandlerInput;
2315
}
2416

2517
@Override
2618
public void onMessage(HandshakeRequest request, Session session, String text) throws Exception {
27-
session.getBasicRemote().sendText(graphQLObjectMapper.serializeResultAsJson(
28-
queryInvoker.query(invocationInputFactory.create(graphQLObjectMapper.readGraphQLRequest(text), request))
19+
session.getBasicRemote().sendText(input.getGraphQLObjectMapper().serializeResultAsJson(
20+
input.getQueryInvoker().query(input.getInvocationInputFactory().create(input.getGraphQLObjectMapper().readGraphQLRequest(text), request))
2921
));
3022
}
3123
}

0 commit comments

Comments
 (0)