Skip to content

Commit e9ac801

Browse files
committed
[TRaft]Raft Optimization for Time series Workloads
1 parent 3b6aa45 commit e9ac801

27 files changed

Lines changed: 2251 additions & 4 deletions

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ public class RouteBalancer implements IClusterStatusSubscriber {
8787
&& ConsensusFactory.RATIS_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
8888
|| (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
8989
&& ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
90+
|| (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
91+
&& ConsensusFactory.TRAFT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
9092
|| (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
9193
&& ConsensusFactory.IOT_CONSENSUS_V2.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
9294
// The simple consensus protocol will always automatically designate itself as the leader
@@ -200,6 +202,7 @@ private void balanceRegionLeader(
200202
newLeaderId);
201203
switch (consensusProtocolClass) {
202204
case ConsensusFactory.IOT_CONSENSUS:
205+
case ConsensusFactory.TRAFT_CONSENSUS:
203206
case ConsensusFactory.SIMPLE_CONSENSUS:
204207
// For IoTConsensus or SimpleConsensus protocol, change
205208
// RegionRouteMap is enough

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
6969
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS_V2;
7070
import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
71+
import static org.apache.iotdb.consensus.ConsensusFactory.TRAFT_CONSENSUS;
7172

7273
public class RegionMaintainHandler {
7374

@@ -148,7 +149,8 @@ public TSStatus createNewRegionPeer(TConsensusGroupId regionId, TDataNodeLocatio
148149
List<TDataNodeLocation> currentPeerNodes;
149150
if (TConsensusGroupType.DataRegion.equals(regionId.getType())
150151
&& (IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())
151-
|| IOT_CONSENSUS_V2.equals(CONF.getDataRegionConsensusProtocolClass()))) {
152+
|| IOT_CONSENSUS_V2.equals(CONF.getDataRegionConsensusProtocolClass())
153+
|| TRAFT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass()))) {
152154
// parameter of createPeer for MultiLeader should be all peers
153155
currentPeerNodes = new ArrayList<>(regionReplicaNodes);
154156
currentPeerNodes.add(destDataNode);

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class ConsensusFactory {
3737
public static final String SIMPLE_CONSENSUS = "org.apache.iotdb.consensus.simple.SimpleConsensus";
3838
public static final String RATIS_CONSENSUS = "org.apache.iotdb.consensus.ratis.RatisConsensus";
3939
public static final String IOT_CONSENSUS = "org.apache.iotdb.consensus.iot.IoTConsensus";
40+
public static final String TRAFT_CONSENSUS = "org.apache.iotdb.consensus.traft.TRaftConsensus";
4041
public static final String REAL_PIPE_CONSENSUS = "org.apache.iotdb.consensus.pipe.PipeConsensus";
4142
public static final String IOT_CONSENSUS_V2 = "org.apache.iotdb.consensus.iot.IoTConsensusV2";
4243
public static final String IOT_CONSENSUS_V2_BATCH_MODE = "batch";

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,20 @@ public interface IConsensusRequest {
3636
*/
3737
ByteBuffer serializeToByteBuffer();
3838

39+
default boolean hasTime() {
40+
return false;
41+
}
42+
43+
/**
44+
* Return the primary timestamp carried by this request.
45+
*
46+
* <p>Callers should check {@link #hasTime()} before calling this method.
47+
*/
48+
default long getTime() {
49+
throw new UnsupportedOperationException(
50+
String.format("%s does not carry timestamp", getClass().getName()));
51+
}
52+
3953
default long getMemorySize() {
4054
// return 0 by default
4155
return 0;

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class ConsensusConfig {
3333
private final RatisConfig ratisConfig;
3434
private final IoTConsensusConfig iotConsensusConfig;
3535
private final PipeConsensusConfig pipeConsensusConfig;
36+
private final TRaftConfig tRaftConfig;
3637

3738
private ConsensusConfig(
3839
TEndPoint thisNode,
@@ -41,14 +42,16 @@ private ConsensusConfig(
4142
TConsensusGroupType consensusGroupType,
4243
RatisConfig ratisConfig,
4344
IoTConsensusConfig iotConsensusConfig,
44-
PipeConsensusConfig pipeConsensusConfig) {
45+
PipeConsensusConfig pipeConsensusConfig,
46+
TRaftConfig tRaftConfig) {
4547
this.thisNodeEndPoint = thisNode;
4648
this.thisNodeId = thisNodeId;
4749
this.storageDir = storageDir;
4850
this.consensusGroupType = consensusGroupType;
4951
this.ratisConfig = ratisConfig;
5052
this.iotConsensusConfig = iotConsensusConfig;
5153
this.pipeConsensusConfig = pipeConsensusConfig;
54+
this.tRaftConfig = tRaftConfig;
5255
}
5356

5457
public TEndPoint getThisNodeEndPoint() {
@@ -79,6 +82,10 @@ public PipeConsensusConfig getPipeConsensusConfig() {
7982
return pipeConsensusConfig;
8083
}
8184

85+
public TRaftConfig getTRaftConfig() {
86+
return tRaftConfig;
87+
}
88+
8289
public static ConsensusConfig.Builder newBuilder() {
8390
return new ConsensusConfig.Builder();
8491
}
@@ -92,6 +99,7 @@ public static class Builder {
9299
private RatisConfig ratisConfig;
93100
private IoTConsensusConfig iotConsensusConfig;
94101
private PipeConsensusConfig pipeConsensusConfig;
102+
private TRaftConfig tRaftConfig;
95103

96104
public ConsensusConfig build() {
97105
return new ConsensusConfig(
@@ -103,7 +111,8 @@ public ConsensusConfig build() {
103111
Optional.ofNullable(iotConsensusConfig)
104112
.orElseGet(() -> IoTConsensusConfig.newBuilder().build()),
105113
Optional.ofNullable(pipeConsensusConfig)
106-
.orElseGet(() -> PipeConsensusConfig.newBuilder().build()));
114+
.orElseGet(() -> PipeConsensusConfig.newBuilder().build()),
115+
Optional.ofNullable(tRaftConfig).orElseGet(() -> TRaftConfig.newBuilder().build()));
107116
}
108117

109118
public Builder setThisNode(TEndPoint thisNode) {
@@ -140,5 +149,10 @@ public Builder setPipeConsensusConfig(PipeConsensusConfig pipeConsensusConfig) {
140149
this.pipeConsensusConfig = pipeConsensusConfig;
141150
return this;
142151
}
152+
153+
public Builder setTRaftConfig(TRaftConfig tRaftConfig) {
154+
this.tRaftConfig = tRaftConfig;
155+
return this;
156+
}
143157
}
144158
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.consensus.config;
21+
22+
import java.util.Optional;
23+
24+
public class TRaftConfig {
25+
26+
private final Replication replication;
27+
private final Election election;
28+
29+
private TRaftConfig(Replication replication, Election election) {
30+
this.replication = replication;
31+
this.election = election;
32+
}
33+
34+
public Replication getReplication() {
35+
return replication;
36+
}
37+
38+
public Election getElection() {
39+
return election;
40+
}
41+
42+
public static Builder newBuilder() {
43+
return new Builder();
44+
}
45+
46+
public static class Builder {
47+
48+
private Replication replication;
49+
private Election election;
50+
51+
public Builder setReplication(Replication replication) {
52+
this.replication = replication;
53+
return this;
54+
}
55+
56+
public Builder setElection(Election election) {
57+
this.election = election;
58+
return this;
59+
}
60+
61+
public TRaftConfig build() {
62+
return new TRaftConfig(
63+
Optional.ofNullable(replication).orElseGet(() -> Replication.newBuilder().build()),
64+
Optional.ofNullable(election).orElseGet(() -> Election.newBuilder().build()));
65+
}
66+
}
67+
68+
public static class Replication {
69+
70+
private final int maxPendingRetryEntriesPerFollower;
71+
private final long waitingReplicationTimeMs;
72+
73+
private Replication(int maxPendingRetryEntriesPerFollower, long waitingReplicationTimeMs) {
74+
this.maxPendingRetryEntriesPerFollower = maxPendingRetryEntriesPerFollower;
75+
this.waitingReplicationTimeMs = waitingReplicationTimeMs;
76+
}
77+
78+
public int getMaxPendingRetryEntriesPerFollower() {
79+
return maxPendingRetryEntriesPerFollower;
80+
}
81+
82+
public long getWaitingReplicationTimeMs() {
83+
return waitingReplicationTimeMs;
84+
}
85+
86+
public static Builder newBuilder() {
87+
return new Builder();
88+
}
89+
90+
public static class Builder {
91+
92+
private int maxPendingRetryEntriesPerFollower = 100_000;
93+
private long waitingReplicationTimeMs = 1L;
94+
95+
public Builder setMaxPendingRetryEntriesPerFollower(int maxPendingRetryEntriesPerFollower) {
96+
this.maxPendingRetryEntriesPerFollower = maxPendingRetryEntriesPerFollower;
97+
return this;
98+
}
99+
100+
public Builder setWaitingReplicationTimeMs(long waitingReplicationTimeMs) {
101+
this.waitingReplicationTimeMs = waitingReplicationTimeMs;
102+
return this;
103+
}
104+
105+
public Replication build() {
106+
return new Replication(maxPendingRetryEntriesPerFollower, waitingReplicationTimeMs);
107+
}
108+
}
109+
}
110+
111+
public static class Election {
112+
113+
private final long randomSeed;
114+
115+
private Election(long randomSeed) {
116+
this.randomSeed = randomSeed;
117+
}
118+
119+
public long getRandomSeed() {
120+
return randomSeed;
121+
}
122+
123+
public static Builder newBuilder() {
124+
return new Builder();
125+
}
126+
127+
public static class Builder {
128+
129+
private long randomSeed = System.nanoTime();
130+
131+
public Builder setRandomSeed(long randomSeed) {
132+
this.randomSeed = randomSeed;
133+
return this;
134+
}
135+
136+
public Election build() {
137+
return new Election(randomSeed);
138+
}
139+
}
140+
}
141+
}

0 commit comments

Comments
 (0)