Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ store=hugegraph
pd.peers=$PD_PEERS_LIST$

# task config
task.scheduler_type=local
task.schedule_period=10
task.retry=0
task.wait_timeout=10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,9 +556,9 @@ public class ServerOptions extends OptionHolder {
public static final ConfigOption<String> SERVER_ID =
new ConfigOption<>(
"server.id",
"The id of hugegraph-server.",
disallowEmpty(),
"server-1"
"The id of hugegraph-server, auto-generated if not specified.",
null,
""
);
public static final ConfigOption<String> SERVER_ROLE =
new ConfigOption<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.cache.Cache;
import org.apache.hugegraph.backend.cache.CacheManager;
import org.apache.hugegraph.backend.id.IdGenerator;
import org.apache.hugegraph.backend.store.AbstractBackendStoreProvider;
import org.apache.hugegraph.backend.store.BackendStoreInfo;
import org.apache.hugegraph.config.ConfigOption;
Expand All @@ -68,6 +67,7 @@
import org.apache.hugegraph.config.TypedOption;
import org.apache.hugegraph.event.EventHub;
import org.apache.hugegraph.exception.ExistedException;
import org.apache.hugegraph.exception.NotFoundException;
import org.apache.hugegraph.exception.NotSupportException;
import org.apache.hugegraph.io.HugeGraphSONModule;
import org.apache.hugegraph.k8s.K8sDriver;
Expand Down Expand Up @@ -195,8 +195,6 @@ public final class GraphManager {
public GraphManager(HugeConfig conf, EventHub hub) {
LOG.info("Init graph manager");
E.checkArgumentNotNull(conf, "The config can't be null");
String server = conf.get(ServerOptions.SERVER_ID);
String role = conf.get(ServerOptions.SERVER_ROLE);

this.config = conf;
this.url = conf.get(ServerOptions.REST_SERVER_URL);
Expand All @@ -206,10 +204,6 @@ public GraphManager(HugeConfig conf, EventHub hub) {
conf.get(ServerOptions.SERVER_DEPLOY_IN_K8S);
this.startIgnoreSingleGraphError = conf.get(
ServerOptions.SERVER_START_IGNORE_SINGLE_GRAPH_ERROR);
E.checkArgument(server != null && !server.isEmpty(),
"The server name can't be null or empty");
E.checkArgument(role != null && !role.isEmpty(),
"The server role can't be null or empty");
this.graphsDir = conf.get(ServerOptions.GRAPHS);
this.cluster = conf.get(ServerOptions.CLUSTER);
this.graphSpaces = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -1557,6 +1551,9 @@ private void loadGraph(String name, String graphConfPath) {
String raftGroupPeers = this.conf.get(ServerOptions.RAFT_GROUP_PEERS);
config.addProperty(ServerOptions.RAFT_GROUP_PEERS.name(),
raftGroupPeers);

this.transferPdPeersConfig(config);

this.transferRoleWorkerConfig(config);

Graph graph = GraphFactory.open(config);
Expand All @@ -1575,6 +1572,19 @@ private void loadGraph(String name, String graphConfPath) {
}
}

private void transferPdPeersConfig(HugeConfig config) {
if (config.containsKey(CoreOptions.PD_PEERS.name())) {
return;
}

String backend = config.get(CoreOptions.BACKEND);
boolean needPdPeers = this.conf.get(ServerOptions.USE_PD) ||
"hstore".equals(backend);
if (needPdPeers) {
config.addProperty(CoreOptions.PD_PEERS.name(), this.pdPeers);
}
}

private void transferRoleWorkerConfig(HugeConfig config) {
config.setProperty(RoleElectionOptions.NODE_EXTERNAL_URL.name(),
this.conf.get(ServerOptions.REST_SERVER_URL));
Expand Down Expand Up @@ -1635,23 +1645,14 @@ private void checkBackendVersionOrExit(HugeConfig config) {
}

private void initNodeRole() {
String id = config.get(ServerOptions.SERVER_ID);
boolean enableRoleElection = config.get(
ServerOptions.ENABLE_SERVER_ROLE_ELECTION);
E.checkArgument(!enableRoleElection,
"The server.role_election is no longer supported");

String role = config.get(ServerOptions.SERVER_ROLE);
E.checkArgument(StringUtils.isNotEmpty(id),
"The server name can't be null or empty");
E.checkArgument(StringUtils.isNotEmpty(role),
"The server role can't be null or empty");

NodeRole nodeRole = NodeRole.valueOf(role.toUpperCase());
boolean supportRoleElection = !nodeRole.computer() &&
this.supportRoleElection() &&
config.get(ServerOptions.ENABLE_SERVER_ROLE_ELECTION);
if (supportRoleElection) {
// Init any server as Worker role, then do role election
nodeRole = NodeRole.WORKER;
}

this.globalNodeRoleInfo.initNodeId(IdGenerator.of(id));
this.globalNodeRoleInfo.initNodeRole(nodeRole);
}

Expand Down Expand Up @@ -1937,26 +1938,29 @@ public Set<String> getServiceUrls(String graphSpace, String service,
public HugeGraph graph(String graphSpace, String name) {
String key = String.join(DELIMITER, graphSpace, name);
Graph graph = this.graphs.get(key);
if (graph == null && isPDEnabled()) {
Map<String, Map<String, Object>> configs =
this.metaManager.graphConfigs(graphSpace);
// If current server registered graph space is not DEFAULT, only load graph creation
// under registered graph space
if (!configs.containsKey(key) ||
(!"DEFAULT".equals(this.serviceGraphSpace) &&
!graphSpace.equals(this.serviceGraphSpace))) {
return null;
if (graph == null) {
if (isPDEnabled()) {
Map<String, Map<String, Object>> configs =
this.metaManager.graphConfigs(graphSpace);
// If current server registered graph space is not DEFAULT, only load graph creation
// under registered graph space
if (!configs.containsKey(key) ||
(!"DEFAULT".equals(this.serviceGraphSpace) &&
!graphSpace.equals(this.serviceGraphSpace))) {
return null;
}
Map<String, Object> config = configs.get(key);
String creator = String.valueOf(config.get("creator"));
Date createTime = parseDate(config.get("create_time"));
Date updateTime = parseDate(config.get("update_time"));
HugeGraph graph1 = this.createGraph(graphSpace, name,
creator, config, false);
graph1.createTime(createTime);
graph1.updateTime(updateTime);
this.graphs.put(key, graph1);
return graph1;
}
Map<String, Object> config = configs.get(key);
String creator = String.valueOf(config.get("creator"));
Date createTime = parseDate(config.get("create_time"));
Date updateTime = parseDate(config.get("update_time"));
HugeGraph graph1 = this.createGraph(graphSpace, name,
creator, config, false);
graph1.createTime(createTime);
graph1.updateTime(updateTime);
this.graphs.put(key, graph1);
return graph1;
throw new NotFoundException(String.format("Graph '%s' does not exist", name));
} else if (graph instanceof HugeGraph) {
return (HugeGraph) graph;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ public class StandardHugeGraph implements HugeGraph {
private final BackendStoreProvider storeProvider;
private final TinkerPopTransaction tx;
private final RamTable ramtable;
private final String schedulerType;
private volatile boolean started;
private volatile boolean closed;
private volatile GraphMode mode;
Expand Down Expand Up @@ -229,7 +228,6 @@ public StandardHugeGraph(HugeConfig config) {
this.closed = false;
this.mode = GraphMode.NONE;
this.readMode = GraphReadMode.OLTP_ONLY;
this.schedulerType = config.get(CoreOptions.SCHEDULER_TYPE);

LockUtil.init(this.spaceGraphName());

Expand Down Expand Up @@ -315,6 +313,7 @@ public String backend() {
return this.storeProvider.type();
}

@Override
public BackendStoreInfo backendStoreInfo() {
// Just for trigger Tx.getOrNewTransaction, then load 3 stores
// TODO: pass storeProvider.metaStore()
Expand All @@ -332,11 +331,10 @@ public void serverStarted(GlobalMasterInfo nodeInfo) {
LOG.info("Init system info for graph '{}'", this.spaceGraphName());
this.initSystemInfo();

LOG.info("Init server info [{}-{}] for graph '{}'...",
nodeInfo.nodeId(), nodeInfo.nodeRole(), this.spaceGraphName());
this.serverInfoManager().initServerInfo(nodeInfo);

this.initRoleStateMachine(nodeInfo.nodeId());
if (nodeInfo != null && nodeInfo.nodeId() != null) {
this.serverInfoManager().initServerInfo(nodeInfo);
this.initRoleStateMachine(nodeInfo.nodeId());
}

// TODO: check necessary?
LOG.info("Check olap property-key tables for graph '{}'", this.spaceGraphName());
Expand Down Expand Up @@ -465,6 +463,7 @@ public void updateTime(Date updateTime) {
this.updateTime = updateTime;
}

@Override
public void waitStarted() {
// Just for trigger Tx.getOrNewTransaction, then load 3 stores
this.schemaTransaction();
Expand All @@ -481,9 +480,7 @@ public void initBackend() {
try {
this.storeProvider.init();
/*
* NOTE: The main goal is to write the serverInfo to the central
* node, such as etcd, and also create the system schema in memory,
* which has no side effects
* NOTE: Create system schema in memory, which has no side effects.
*/
this.initSystemInfo();
} finally {
Expand Down Expand Up @@ -524,8 +521,7 @@ public void truncateBackend() {
LockUtil.lock(this.spaceGraphName(), LockUtil.GRAPH_LOCK);
try {
this.storeProvider.truncate();
// TODO: remove this after serverinfo saved in etcd
this.serverStarted(this.serverInfoManager().globalNodeRoleInfo());
this.serverStarted(null);
} finally {
LockUtil.unlock(this.spaceGraphName(), LockUtil.GRAPH_LOCK);
}
Expand All @@ -547,7 +543,6 @@ public KvStore kvStore() {
public void initSystemInfo() {
try {
this.taskScheduler().init();
this.serverInfoManager().init();
this.authManager().init();
} finally {
this.closeTx();
Expand Down Expand Up @@ -1632,7 +1627,9 @@ public <T> void submitEphemeralJob(EphemeralJob<T> job) {

@Override
public String schedulerType() {
return StandardHugeGraph.this.schedulerType;
// Use distributed scheduler for hstore backend, otherwise use local
// After the merger of rocksdb and hstore, consider whether to change this logic
return StandardHugeGraph.this.isHstore() ? "distributed" : "local";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

‼️ This changes scheduler selection from an explicit config to a backend-only rule, so an existing non-HStore deployment that set task.scheduler_type=distributed will silently fall back to local after upgrade. HugeConfig only warns for an unregistered option, and TaskManager.addScheduler() still trusts only graph.schedulerType() here, so there is no migration guard or fail-fast path. Please either keep the old option as an override during migration, or fail fast when the removed option is still configured, and add a compatibility regression test for that upgrade path.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,13 +303,7 @@ public class CoreOptions extends OptionHolder {
rangeInt(1, 500),
1
);
public static final ConfigOption<String> SCHEDULER_TYPE =
new ConfigOption<>(
"task.scheduler_type",
"The type of scheduler used in distribution system.",
allowValues("local", "distributed"),
"local"
);

public static final ConfigOption<Boolean> TASK_SYNC_DELETION =
new ConfigOption<>(
"task.sync_deletion",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.hugegraph.type.define.NodeRole;
import org.apache.hugegraph.util.E;

// TODO: rename to GlobalNodeRoleInfo
// TODO: We need to completely delete the startup of master-worker
public final class GlobalMasterInfo {

private static final NodeInfo NO_MASTER = new NodeInfo(false, "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.hugegraph.masterelection;

import java.util.Objects;

import org.apache.hugegraph.task.TaskManager;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

import java.util.Objects;

Comment on lines 20 to +25
public class StandardRoleListener implements RoleListener {

private static final Logger LOG = Log.logger(StandardRoleListener.class);
Expand All @@ -36,7 +36,6 @@ public class StandardRoleListener implements RoleListener {
public StandardRoleListener(TaskManager taskManager,
GlobalMasterInfo roleInfo) {
this.taskManager = taskManager;
this.taskManager.enableRoleElection();
this.roleInfo = roleInfo;
this.selfIsMaster = false;
}
Expand Down
Loading
Loading