Skip to content

Commit 745dc18

Browse files
committed
re-work flow to allow short-circuiting
1 parent 0cb5a2d commit 745dc18

File tree

1 file changed

+47
-64
lines changed

1 file changed

+47
-64
lines changed

src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java

Lines changed: 47 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -127,45 +127,51 @@ private List<String> performFullResolution(List<String> names) {
127127
// Build node-to-datanode map for O(1) colocated lookups
128128
Map<String, String> nodeToDatanodeIp = buildNodeToDatanodeMap(dataNodes);
129129

130+
List<String> topologies = new ArrayList<>();
130131
// Resolve masqueraded IPs to nodes: do this before inspecting possible listener-
131132
// or other pod-IPs as we don't want to mistakenly treat a masqueraded IP as a
132-
// cache-miss
133-
List<String> resolvedNames = tryResolveNodes(names, nodeToDatanodeIp);
134-
135-
// Resolve dataNode listeners to datanode IPs
136-
resolvedNames = tryResolveListeners(resolvedNames);
133+
// cache-miss. Examine each name in a loop, so we have the chance to short-circuit.
134+
for (String name : names) {
135+
String datanodeIp = tryNodeOrListenerOrPod(name, nodeToDatanodeIp, podLabels);
136+
// Build topology strings and cache results
137+
String topology = buildAndCacheTopology(name, datanodeIp, podLabels, nodeLabels);
138+
topologies.add(topology);
139+
}
137140

138-
// Step : Resolve client pods to co-located dataNodes
139-
List<String> datanodeIps =
140-
tryResolveClientPodsToDataNodes(resolvedNames, podLabels, nodeToDatanodeIp);
141+
return topologies;
142+
}
141143

142-
// Step : Build topology strings and cache results
143-
// IP-masquerading can mean that the advertised IP is either a client Pod's node IP,
144-
// or an IP that is nto easily associated with a node (e.g. if the veth interface is
145-
// used).
146-
return buildAndCacheTopology(names, datanodeIps, podLabels, nodeLabels);
144+
private String tryNodeOrListenerOrPod(
145+
String name,
146+
Map<String, String> nodeToDatanodeIp,
147+
Map<String, Map<String, String>> podLabels) {
148+
String dataNodeIp = nodeToDatanodeIp.get(name);
149+
if (dataNodeIp != null) {
150+
return dataNodeIp;
151+
} else {
152+
String resolvedListener = tryResolveListener(name);
153+
if (resolvedListener != null) {
154+
return resolvedListener;
155+
} else {
156+
return tryResolveClientPodToDataNode(name, podLabels, nodeToDatanodeIp);
157+
}
158+
}
147159
}
148160

149-
private List<String> buildAndCacheTopology(
150-
List<String> originalNames,
151-
List<String> datanodeIps,
161+
private String buildAndCacheTopology(
162+
String originalName,
163+
String datanodeIp,
152164
Map<String, Map<String, String>> podLabels,
153165
Map<String, Map<String, String>> nodeLabels) {
154-
List<String> result = new ArrayList<>();
155-
for (int i = 0; i < datanodeIps.size(); i++) {
156-
String datanodeIp = datanodeIps.get(i);
157-
String originalName = originalNames.get(i);
158166

159-
String topology = buildTopologyString(datanodeIp, podLabels, nodeLabels);
160-
result.add(topology);
167+
String topology = buildTopologyString(datanodeIp, podLabels, nodeLabels);
161168

162-
// Cache both the resolved IP and original name
163-
cache.putTopology(datanodeIp, topology);
164-
cache.putTopology(originalName, topology);
165-
}
169+
// Cache both the resolved IP and original name
170+
cache.putTopology(datanodeIp, topology);
171+
cache.putTopology(originalName, topology);
166172

167-
LOG.info("Built topology: {}", result);
168-
return result;
173+
LOG.info("Built topology: {}", topology);
174+
return topology;
169175
}
170176

171177
// ============================================================================
@@ -189,25 +195,6 @@ private List<Pod> fetchDataNodes() {
189195
return dataNodes;
190196
}
191197

192-
// ============================================================================
193-
// NODE RESOLUTION
194-
// ============================================================================
195-
196-
private List<String> tryResolveNodes(List<String> names, Map<String, String> nodeToDatanodeIp) {
197-
List<String> result = new ArrayList<>();
198-
199-
for (String name : names) {
200-
String dataNodeIp = nodeToDatanodeIp.get(name);
201-
if (dataNodeIp == null) {
202-
result.add(name);
203-
} else {
204-
LOG.debug("Returning dataNode {} for {}", name, dataNodeIp);
205-
result.add(dataNodeIp);
206-
}
207-
}
208-
return result;
209-
}
210-
211198
// ============================================================================
212199
// LISTENER RESOLUTION
213200
// ============================================================================
@@ -246,20 +233,18 @@ private String getListenerVersion() {
246233
}
247234
}
248235

249-
private List<String> tryResolveListeners(List<String> names) {
250-
refreshListenerCacheIfNeeded(names);
236+
private String tryResolveListener(String name) {
237+
refreshListenerCacheIfNeeded(name);
251238

252-
return names.stream().map(this::tryResolveListenerToDatanode).collect(Collectors.toList());
239+
return tryResolveListenerToDatanode(name);
253240
}
254241

255-
private void refreshListenerCacheIfNeeded(List<String> names) {
256-
List<String> missingNames =
257-
names.stream().filter(name -> cache.getListener(name) == null).collect(Collectors.toList());
258-
259-
if (missingNames.isEmpty()) {
242+
private void refreshListenerCacheIfNeeded(String name) {
243+
if (cache.getListener(name) != null) {
260244
LOG.debug("Listener cache contains all required entries");
261245
return;
262246
}
247+
263248
// Listeners are typically few, so fetch all
264249
LOG.debug("Fetching all listeners to populate cache");
265250
if (listenerVersion == null) {
@@ -339,21 +324,19 @@ private GenericKubernetesResourceList fetchListeners(String listenerVersion) {
339324
// CLIENT POD RESOLUTION
340325
// ============================================================================
341326

342-
private List<String> tryResolveClientPodsToDataNodes(
343-
List<String> names,
327+
private String tryResolveClientPodToDataNode(
328+
String name,
344329
Map<String, Map<String, String>> podLabels,
345330
Map<String, String> nodeToDatanodeIp) {
346331

347-
refreshPodCacheIfNeeded(names);
332+
refreshPodCacheIfNeeded(name);
348333

349-
return names.stream()
350-
.map(name -> resolveToDatanodeOrKeep(name, podLabels, nodeToDatanodeIp))
351-
.collect(Collectors.toList());
334+
return resolveToDatanodeOrKeep(name, podLabels, nodeToDatanodeIp);
352335
}
353336

354-
private void refreshPodCacheIfNeeded(List<String> names) {
355-
if (cache.hasAllPods(names)) {
356-
LOG.debug("Pod cache contains all required entries");
337+
private void refreshPodCacheIfNeeded(String name) {
338+
if (cache.getPod(name) != null) {
339+
LOG.debug("Pod cache contains entry");
357340
return;
358341
}
359342

0 commit comments

Comments
 (0)