Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.alipay.oceanbase.rpc.bolt.protocol;

import com.alipay.oceanbase.rpc.bolt.transport.ObTableTimeTrace;
import com.alipay.oceanbase.rpc.protocol.packet.ObRpcPacketHeader;
import com.alipay.remoting.CommandCode;
import com.alipay.remoting.InvokeContext;
Expand All @@ -43,6 +44,9 @@ public class ObTablePacket implements RemotingCommand {
private int transportCode;
private String message;
private Throwable cause;

// 时间追踪
private ObTableTimeTrace timeTrace;

/**
* Decode packet header.
Expand Down Expand Up @@ -205,6 +209,20 @@ public void setCause(Throwable cause) {
this.cause = cause;
}

/*
* Get time trace.
*/
public ObTableTimeTrace getTimeTrace() {
return timeTrace;
}

/*
* Set time trace.
*/
public void setTimeTrace(ObTableTimeTrace timeTrace) {
this.timeTrace = timeTrace;
}

// TODO useless for now

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.alipay.oceanbase.rpc.bolt.protocol;

import com.alipay.oceanbase.rpc.bolt.transport.ObTableTimeTrace;
import com.alipay.oceanbase.rpc.util.Serialization;
import com.alipay.oceanbase.rpc.util.TableClientLoggerFactory;
import com.alipay.remoting.CommandEncoder;
Expand Down Expand Up @@ -48,6 +49,12 @@ public void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) thr
* -----------------------------------
*/
ObTablePacket cmd = (ObTablePacket) msg;

// Record when EventLoop starts encoding (reflects queue wait time)
ObTableTimeTrace timeTrace = cmd.getTimeTrace();
if (timeTrace != null) {
timeTrace.markNettyEncodeStart();
}

// 1. header
out.writeBytes(ObTableProtocol.MAGIC_HEADER_FLAG);
Expand All @@ -57,6 +64,11 @@ public void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) thr

// 2. payload
out.writeBytes(cmd.getPacketContent());

// Record when encoding is complete
if (timeTrace != null) {
timeTrace.markNettyEncodeEnd();
}

} else {
String warnMsg = "msg type [" + msg.getClass() + "] is not subclass of ObCommand";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,27 @@ public class ObClientFuture implements InvokeFuture {
private static int BY_BACKGROUND = 2;

private AtomicInteger releaseFlag = new AtomicInteger(INIT);

// 响应接收时间
private long responseReceivedTime;

// 时间追踪
private ObTableTimeTrace timeTrace;

/*
* Ob client future.
*/
public ObClientFuture(int channelId) {
this.channelId = channelId;
}

/*
* Ob client future with time trace.
*/
public ObClientFuture(int channelId, ObTableTimeTrace timeTrace) {
this.channelId = channelId;
this.timeTrace = timeTrace;
}

/*
* Wait response.
Expand Down Expand Up @@ -87,6 +101,12 @@ public RemotingCommand waitResponse() throws InterruptedException {
*/
@Override
public void putResponse(RemotingCommand response) {
// 记录响应接收时间
this.responseReceivedTime = System.currentTimeMillis();
// 更新时间追踪
if (timeTrace != null) {
timeTrace.markResponseReceived();
}
this.response = response;
waiter.countDown();
if (!releaseFlag.compareAndSet(INIT, BY_WORKER)) {
Expand All @@ -96,6 +116,27 @@ public void putResponse(RemotingCommand response) {
}
}

/*
* Get response received time.
*/
public long getResponseReceivedTime() {
return responseReceivedTime;
}

/*
* Get time trace.
*/
public ObTableTimeTrace getTimeTrace() {
return timeTrace;
}

/*
* Set time trace.
*/
public void setTimeTrace(ObTableTimeTrace timeTrace) {
this.timeTrace = timeTrace;
}

/*
* Invoke id.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ public void init() throws Exception {
}

private boolean connect() throws Exception {
return connect(obTable.getObTableConnectTimeout());
}

private boolean connect(int connectTimeoutMs) throws Exception {
if (checkAvailable()) { // double check status available
return false;
}
Expand All @@ -106,7 +110,7 @@ private boolean connect() throws Exception {
for (; tries < maxTryTimes; tries++) {
try {
connection = obTable.getConnectionFactory().createConnection(obTable.getIp(),
obTable.getPort(), obTable.getObTableConnectTimeout());
obTable.getPort(), connectTimeoutMs);
break;
} catch (Exception e) {
cause = e;
Expand Down Expand Up @@ -243,29 +247,29 @@ public void checkStatus() throws Exception {
if (connection.getChannel() == null || !connection.getChannel().isActive()) {
reconnect("Check connection failed for address: " + connection.getUrl());
}
if (!connection.getChannel().isWritable()) {
LOGGER.warn("The connection might be write overflow : " + connection.getUrl());
// Wait some interval for the case when a big package is blocking the buffer but server is ok.
// Don't bother to call flush() here as we invoke writeAndFlush() when send request.
Thread.sleep(obTable.getNettyBlockingWaitInterval());
if (!connection.getChannel().isWritable()) {
throw new ObTableConnectionUnWritableException(
"Check connection failed for address: " + connection.getUrl()
+ ", maybe write overflow!");
}
}
}

public void reConnectAndLogin(String msg) throws ObTableException {
reConnectAndLogin(msg, obTable.getObTableConnectTimeout());
}

/**
* Reconnect and login with a specified connect timeout.
* This is useful for background tasks that need a longer timeout to ensure connection success.
*
* @param msg the reconnect reason
* @param connectTimeoutMs the connection timeout in milliseconds
*/
public void reConnectAndLogin(String msg, int connectTimeoutMs) throws ObTableException {
try {
// 1. check the connection is available, force to close it
if (checkAvailable()) {
LOGGER.info("The connection would be closed and reconnected is: "
+ connection.getUrl());
close();
}
// 2. reconnect
reconnect(msg);
// 2. reconnect with specified timeout
reconnect(msg, connectTimeoutMs);
} catch (ConnectException ex) {
// cannot connect to ob server, need refresh table location
throw new ObTableServerConnectException(ex);
Expand All @@ -286,9 +290,22 @@ public void reConnectAndLogin(String msg) throws ObTableException {
*
*/
private void reconnect(String msg) throws Exception {
reconnect(msg, obTable.getObTableConnectTimeout());
}

/**
* Reconnect current connection and login with specified timeout
*
* @param msg the reconnect reason
* @param connectTimeoutMs the connection timeout in milliseconds
* @exception Exception if connect successfully or connection already reconnected by others
* throw exception if connect failed
*
*/
private void reconnect(String msg, int connectTimeoutMs) throws Exception {
if (isReConnecting.compareAndSet(false, true)) {
try {
if (connect()) {
if (connect(connectTimeoutMs)) {
LOGGER.info("reconnect success. reconnect reason: [{}]", msg);
} else {
LOGGER.info(
Expand Down
Loading