Skip to content

Commit 97af90d

Browse files
committed
根据watch信息拉取指定应用、服务名的实例信息
1 parent c8b77b3 commit 97af90d

File tree

5 files changed

+89
-7
lines changed

5 files changed

+89
-7
lines changed

clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@
2020
import java.util.List;
2121

2222
import org.apache.servicecomb.service.center.client.model.MicroserviceInstance;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import com.fasterxml.jackson.databind.JsonNode;
27+
import com.fasterxml.jackson.databind.ObjectMapper;
2328

2429
public abstract class DiscoveryEvents {
2530
public static class InstanceChangedEvent extends DiscoveryEvents {
@@ -52,6 +57,46 @@ public List<MicroserviceInstance> getInstances() {
5257
* internal events to ask for a immediate instance pull
5358
*/
5459
public static class PullInstanceEvent extends DiscoveryEvents {
60+
private static final Logger LOGGER = LoggerFactory.getLogger(PullInstanceEvent.class);
61+
62+
private final String appId;
63+
64+
private final String serviceName;
65+
66+
public PullInstanceEvent(String message) {
67+
JsonNode messageNode = parseJsonString(message);
68+
this.appId = getContextFromNode(messageNode, "appId");
69+
this.serviceName = getContextFromNode(messageNode, "serviceName");
70+
}
71+
72+
public String getAppId() {
73+
return appId;
74+
}
5575

76+
public String getServiceName() {
77+
return serviceName;
78+
}
79+
80+
private JsonNode parseJsonString(String message) {
81+
try {
82+
ObjectMapper objectMapper = new ObjectMapper();
83+
return objectMapper.readTree(message);
84+
} catch (Exception e) {
85+
LOGGER.error("parse message [{}] failed!", message, e);
86+
return null;
87+
}
88+
}
89+
90+
private String getContextFromNode(JsonNode messageNode, String itemKey) {
91+
if (messageNode == null) {
92+
return "";
93+
}
94+
try {
95+
return messageNode.get("key").get(itemKey).asText();
96+
} catch (Exception e) {
97+
LOGGER.error("get [{}] context from node [{}] failed!", itemKey, e);
98+
return "";
99+
}
100+
}
56101
}
57102
}

clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,7 @@ public RegisteredMicroserviceInstanceResponse registerMicroserviceInstance(Micro
237237

238238
@Override
239239
public FindMicroserviceInstancesResponse findMicroserviceInstance(String consumerId, String appId, String serviceName,
240-
String versionRule,
241-
String revision) {
240+
String versionRule, String revision) {
242241
try {
243242
Map<String, String> headers = new HashMap<>();
244243
headers.put("X-ConsumerId", consumerId);

clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Random;
2525
import java.util.concurrent.ConcurrentHashMap;
2626

27+
import org.apache.commons.lang3.StringUtils;
2728
import org.apache.servicecomb.http.client.task.AbstractTask;
2829
import org.apache.servicecomb.http.client.task.Task;
2930
import org.apache.servicecomb.service.center.client.DiscoveryEvents.InstanceChangedEvent;
@@ -151,7 +152,45 @@ public void onPullInstanceEvent(PullInstanceEvent event) {
151152
return;
152153
}
153154
pullInstanceTaskOnceInProgress = true;
154-
startTask(new PullInstanceOnceTask());
155+
if (StringUtils.isEmpty(event.getAppId()) || StringUtils.isEmpty(event.getServiceName())) {
156+
// If the application or service name cannot be resolved, pulled all services.
157+
startTask(new PullInstanceOnceTask());
158+
return;
159+
}
160+
try {
161+
pullTargetService(event);
162+
} finally {
163+
pullInstanceTaskOnceInProgress = false;
164+
}
165+
}
166+
167+
private void pullTargetService(PullInstanceEvent event) {
168+
SubscriptionKey currentKey = new SubscriptionKey(event.getAppId(), event.getServiceName());
169+
if (instancesCache.get(currentKey) == null) {
170+
// No pull during the service startup phase.
171+
return;
172+
}
173+
if (LOGGER.isDebugEnabled()) {
174+
LOGGER.debug("pull [{}#{}] instances from service center", event.getAppId(), event.getServiceName());
175+
}
176+
String originRev = instancesCache.get(currentKey).revision;
177+
for (int i = 1; i <= 3; i++) {
178+
try {
179+
pullInstance(currentKey, instancesCache.get(currentKey), true);
180+
String currentRev = instancesCache.get(currentKey).revision;
181+
if (LOGGER.isDebugEnabled()) {
182+
LOGGER.debug("current revision: [{}], origin revision: [{}]", currentRev, originRev);
183+
}
184+
if (!originRev.equals(currentRev)) {
185+
break;
186+
}
187+
int positive = random.nextInt(300);
188+
int sign = random.nextBoolean() ? 1 : -1;
189+
Thread.sleep(i * 1000 + sign * positive);
190+
} catch (Exception e) {
191+
LOGGER.error("pull [{}#{}] instances failed.", event.getAppId(), event.getServiceName(), e);
192+
}
193+
}
155194
}
156195

157196
private void pullInstance(SubscriptionKey k, SubscriptionValue v, boolean sendChangedEvent) {
@@ -250,7 +289,7 @@ private static String instanceToString(List<MicroserviceInstance> instances) {
250289
sb.append(endpoint.length() > 64 ? endpoint.substring(0, 64) : endpoint);
251290
sb.append("|");
252291
}
253-
sb.append(instance.getServiceName());
292+
sb.append(instance.getStatus());
254293
sb.append("|");
255294
}
256295
sb.append("#");

clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterOperation.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,7 @@ public interface ServiceCenterOperation {
114114
* @throws OperationException If some problems happened to contact service center or non http 200 returned.n
115115
*/
116116
FindMicroserviceInstancesResponse findMicroserviceInstance(String consumerId, String appId, String serviceName,
117-
String versionRule,
118-
String revision);
117+
String versionRule, String revision);
119118

120119
/**
121120
* Delete a microservice instance

clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterWatch.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ private void backOff() {
177177
@Override
178178
public void onMessage(String s) {
179179
LOGGER.info("web socket receive message [{}], start query instance", s);
180-
this.eventBus.post(new PullInstanceEvent());
180+
this.eventBus.post(new PullInstanceEvent(s));
181181
}
182182

183183
@Override

0 commit comments

Comments
 (0)