Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class ObPartitionLocation {
private ReplicaLocation leader;
Expand Down Expand Up @@ -90,10 +91,9 @@ public ReplicaLocation getReplica(ObReadConsistency consistencyLevel,
}

private ReplicaLocation getReadReplicaNoLdc(ObRoutePolicy routePolicy) {
for (ReplicaLocation r : replicas) {
if (r.isValid() && !r.isLeader()) {
return r;
}
ReplicaLocation follower = getRandomFollowerFromList(replicas);
if (follower != null) {
return follower;
}
if (routePolicy == ObRoutePolicy.FOLLOWER_ONLY) {
throw new IllegalArgumentException("No follower replica found for route policy: "
Expand All @@ -107,25 +107,22 @@ private ReplicaLocation getReadReplicaByRoutePolicy(ObRoutePolicy routePolicy)
// 路由策略优先:FOLLOWER_FIRST 优先选择 follower,FOLLOWER_ONLY 只能选择 follower
// 在满足路由策略的前提下,按就近原则选择(同机房 -> 同 region -> 其他 region)

// 优先在同机房找 follower
for (ReplicaLocation r : sameIdc) {
if (r.isValid() && !r.isLeader()) {
return r;
}
// 优先在同机房找 follower(随机选择以避免热点)
ReplicaLocation follower = getRandomFollowerFromList(sameIdc);
if (follower != null) {
return follower;
}

// 如果同机房没有 follower,在同 region 找 follower
for (ReplicaLocation r : sameRegion) {
if (r.isValid() && !r.isLeader()) {
return r;
}
// 如果同机房没有 follower,在同 region 找 follower(随机选择以避免热点)
follower = getRandomFollowerFromList(sameRegion);
if (follower != null) {
return follower;
}

// 如果同 region 没有 follower,在其他 region 找 follower
for (ReplicaLocation r : otherRegion) {
if (r.isValid() && !r.isLeader()) {
return r;
}
// 如果同 region 没有 follower,在其他 region 找 follower(随机选择以避免热点)
follower = getRandomFollowerFromList(otherRegion);
if (follower != null) {
return follower;
}

// 如果都没有找到 follower
Expand All @@ -139,6 +136,33 @@ private ReplicaLocation getReadReplicaByRoutePolicy(ObRoutePolicy routePolicy)
return leader;
}

/**
* 从列表中随机选择一个有效的 follower,避免热点问题
*
* @param locations replica 列表
* @return 随机选择的 follower,如果没有有效的 follower 则返回 null
*/
private ReplicaLocation getRandomFollowerFromList(List<ReplicaLocation> locations) {
if (locations == null || locations.isEmpty()) {
return null;
}

// 收集所有有效的 follower
List<ReplicaLocation> validFollowers = new ArrayList<>();
for (ReplicaLocation r : locations) {
if (r.isValid() && !r.isLeader()) {
validFollowers.add(r);
}
}

if (validFollowers.isEmpty()) {
return null;
}

// 随机选择一个 follower
return validFollowers.get(ThreadLocalRandom.current().nextInt(validFollowers.size()));
}

/*
* Classify Replica for weak read, according to Server LDC location.
* Synchronized to avoid duplicate initialization.
Expand Down