From 6f10b3ed6337ca165e2879a4028d7b28f71caea4 Mon Sep 17 00:00:00 2001 From: WeiXinChan Date: Mon, 29 Dec 2025 13:17:47 +0800 Subject: [PATCH 1/2] fix some bugs and add time trace --- .../rpc/bolt/protocol/ObTablePacket.java | 18 ++ .../bolt/protocol/ObTablePacketEncoder.java | 12 + .../rpc/bolt/transport/ObClientFuture.java | 41 +++ .../rpc/bolt/transport/ObTableConnection.java | 47 ++-- .../rpc/bolt/transport/ObTableRemoting.java | 137 +++++++++- .../rpc/bolt/transport/ObTableTimeTrace.java | 255 ++++++++++++++++++ .../alipay/oceanbase/rpc/table/ObTable.java | 57 +++- .../alipay/oceanbase/rpc/util/TraceUtil.java | 7 + 8 files changed, 537 insertions(+), 37 deletions(-) create mode 100644 src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableTimeTrace.java diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacket.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacket.java index 1dcf7d28..cca8f8a2 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacket.java +++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacket.java @@ -17,6 +17,7 @@ package com.alipay.oceanbase.rpc.bolt.protocol; +import com.alipay.oceanbase.rpc.bolt.transport.ObTableTimeTrace; import com.alipay.oceanbase.rpc.protocol.packet.ObRpcPacketHeader; import com.alipay.remoting.CommandCode; import com.alipay.remoting.InvokeContext; @@ -43,6 +44,9 @@ public class ObTablePacket implements RemotingCommand { private int transportCode; private String message; private Throwable cause; + + // 时间追踪 + private ObTableTimeTrace timeTrace; /** * Decode packet header. @@ -205,6 +209,20 @@ public void setCause(Throwable cause) { this.cause = cause; } + /* + * Get time trace. + */ + public ObTableTimeTrace getTimeTrace() { + return timeTrace; + } + + /* + * Set time trace. 
+ */ + public void setTimeTrace(ObTableTimeTrace timeTrace) { + this.timeTrace = timeTrace; + } + // TODO useless for now /* diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketEncoder.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketEncoder.java index f20e984c..483f10c8 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketEncoder.java +++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketEncoder.java @@ -17,6 +17,7 @@ package com.alipay.oceanbase.rpc.bolt.protocol; +import com.alipay.oceanbase.rpc.bolt.transport.ObTableTimeTrace; import com.alipay.oceanbase.rpc.util.Serialization; import com.alipay.oceanbase.rpc.util.TableClientLoggerFactory; import com.alipay.remoting.CommandEncoder; @@ -48,6 +49,12 @@ public void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) thr * ----------------------------------- */ ObTablePacket cmd = (ObTablePacket) msg; + + // Record when EventLoop starts encoding (reflects queue wait time) + ObTableTimeTrace timeTrace = cmd.getTimeTrace(); + if (timeTrace != null) { + timeTrace.markNettyEncodeStart(); + } // 1. header out.writeBytes(ObTableProtocol.MAGIC_HEADER_FLAG); @@ -57,6 +64,11 @@ public void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) thr // 2. 
payload out.writeBytes(cmd.getPacketContent()); + + // Record when encoding is complete + if (timeTrace != null) { + timeTrace.markNettyEncodeEnd(); + } } else { String warnMsg = "msg type [" + msg.getClass() + "] is not subclass of ObCommand"; diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObClientFuture.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObClientFuture.java index bf23b05f..966afc2a 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObClientFuture.java +++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObClientFuture.java @@ -43,6 +43,12 @@ public class ObClientFuture implements InvokeFuture { private static int BY_BACKGROUND = 2; private AtomicInteger releaseFlag = new AtomicInteger(INIT); + + // 响应接收时间 + private long responseReceivedTime; + + // 时间追踪 + private ObTableTimeTrace timeTrace; /* * Ob client future. @@ -50,6 +56,14 @@ public class ObClientFuture implements InvokeFuture { public ObClientFuture(int channelId) { this.channelId = channelId; } + + /* + * Ob client future with time trace. + */ + public ObClientFuture(int channelId, ObTableTimeTrace timeTrace) { + this.channelId = channelId; + this.timeTrace = timeTrace; + } /* * Wait response. @@ -87,6 +101,12 @@ public RemotingCommand waitResponse() throws InterruptedException { */ @Override public void putResponse(RemotingCommand response) { + // 记录响应接收时间 + this.responseReceivedTime = System.currentTimeMillis(); + // 更新时间追踪 + if (timeTrace != null) { + timeTrace.markResponseReceived(); + } this.response = response; waiter.countDown(); if (!releaseFlag.compareAndSet(INIT, BY_WORKER)) { @@ -96,6 +116,27 @@ public void putResponse(RemotingCommand response) { } } + /* + * Get response received time. + */ + public long getResponseReceivedTime() { + return responseReceivedTime; + } + + /* + * Get time trace. + */ + public ObTableTimeTrace getTimeTrace() { + return timeTrace; + } + + /* + * Set time trace. 
+ */ + public void setTimeTrace(ObTableTimeTrace timeTrace) { + this.timeTrace = timeTrace; + } + /* * Invoke id. */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java index 218dea5c..9bc001ad 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java +++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java @@ -96,6 +96,10 @@ public void init() throws Exception { } private boolean connect() throws Exception { + return connect(obTable.getObTableConnectTimeout()); + } + + private boolean connect(int connectTimeoutMs) throws Exception { if (checkAvailable()) { // double check status available return false; } @@ -106,7 +110,7 @@ private boolean connect() throws Exception { for (; tries < maxTryTimes; tries++) { try { connection = obTable.getConnectionFactory().createConnection(obTable.getIp(), - obTable.getPort(), obTable.getObTableConnectTimeout()); + obTable.getPort(), connectTimeoutMs); break; } catch (Exception e) { cause = e; @@ -243,20 +247,20 @@ public void checkStatus() throws Exception { if (connection.getChannel() == null || !connection.getChannel().isActive()) { reconnect("Check connection failed for address: " + connection.getUrl()); } - if (!connection.getChannel().isWritable()) { - LOGGER.warn("The connection might be write overflow : " + connection.getUrl()); - // Wait some interval for the case when a big package is blocking the buffer but server is ok. - // Don't bother to call flush() here as we invoke writeAndFlush() when send request. 
- Thread.sleep(obTable.getNettyBlockingWaitInterval()); - if (!connection.getChannel().isWritable()) { - throw new ObTableConnectionUnWritableException( - "Check connection failed for address: " + connection.getUrl() - + ", maybe write overflow!"); - } - } } public void reConnectAndLogin(String msg) throws ObTableException { + reConnectAndLogin(msg, obTable.getObTableConnectTimeout()); + } + + /** + * Reconnect and login with a specified connect timeout. + * This is useful for background tasks that need a longer timeout to ensure connection success. + * + * @param msg the reconnect reason + * @param connectTimeoutMs the connection timeout in milliseconds + */ + public void reConnectAndLogin(String msg, int connectTimeoutMs) throws ObTableException { try { // 1. check the connection is available, force to close it if (checkAvailable()) { @@ -264,8 +268,8 @@ public void reConnectAndLogin(String msg) throws ObTableException { + connection.getUrl()); close(); } - // 2. reconnect - reconnect(msg); + // 2. reconnect with specified timeout + reconnect(msg, connectTimeoutMs); } catch (ConnectException ex) { // cannot connect to ob server, need refresh table location throw new ObTableServerConnectException(ex); @@ -286,9 +290,22 @@ public void reConnectAndLogin(String msg) throws ObTableException { * */ private void reconnect(String msg) throws Exception { + reconnect(msg, obTable.getObTableConnectTimeout()); + } + + /** + * Reconnect current connection and login with specified timeout + * + * @param msg the reconnect reason + * @param connectTimeoutMs the connection timeout in milliseconds + * @exception Exception if connect successfully or connection already reconnected by others + * throw exception if connect failed + * + */ + private void reconnect(String msg, int connectTimeoutMs) throws Exception { if (isReConnecting.compareAndSet(false, true)) { try { - if (connect()) { + if (connect(connectTimeoutMs)) { LOGGER.info("reconnect success. 
reconnect reason: [{}]", msg); } else { LOGGER.info( diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java index c4d138c7..04b03157 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java +++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java @@ -37,6 +37,9 @@ public class ObTableRemoting extends BaseRemoting { private static final Logger logger = TableClientLoggerFactory.getLogger(ObTableRemoting.class); + + // 使用 ThreadLocal 传递 timeTrace 给 createInvokeFuture + private static final ThreadLocal currentTimeTrace = new ThreadLocal<>(); /* * Ob table remoting. @@ -51,7 +54,9 @@ public ObTableRemoting(CommandFactory commandFactory) { public ObPayload invokeSync(final ObTableConnection conn, final ObPayload request, final int timeoutMillis) throws RemotingException, InterruptedException { - + // Create time trace for debugging timeout issues + ObTableTimeTrace timeTrace = new ObTableTimeTrace(); + request.setSequence(conn.getNextSequence()); request.setUniqueId(conn.getUniqueId()); @@ -71,21 +76,48 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques ((AbstractPayload) request).setTenantId(conn.getTenantId()); } + // Serialize request (done in createRequestCommand) ObTablePacket obRequest = this.getCommandFactory().createRequestCommand(request); + timeTrace.markSerializeEnd(); + + // Set time trace info (use lazy formatting to avoid String.format in hot path) + timeTrace.setChannelId(obRequest.getId()); + timeTrace.setTraceIdComponents(request.getUniqueId(), request.getSequence()); + if (obRequest.getPacketContent() != null) { + timeTrace.setPayloadSize(obRequest.getPacketContent().length); + } + obRequest.setTimeTrace(timeTrace); - ObTablePacket response = (ObTablePacket) super.invokeSync(conn.getConnection(), obRequest, - timeoutMillis); + // Wait for channel to become 
writable before sending (with backoff) + waitForChannelWritable(conn, timeoutMillis, timeTrace); + + // Mark time when submitting to Netty + timeTrace.markWriteToNetty(); + + // Set ThreadLocal so that createInvokeFuture can access timeTrace + currentTimeTrace.set(timeTrace); + ObTablePacket response; + try { + response = (ObTablePacket) super.invokeSync(conn.getConnection(), obRequest, + timeoutMillis); + } finally { + currentTimeTrace.remove(); + } if (response == null) { - String errMessage = TraceUtil.formatTraceMessage(conn, request, "get null response"); - logger.warn(errMessage); + // Timeout - generate time trace report for debugging + timeTrace.markEnd(); + String errMessage = TraceUtil.formatTraceMessage(conn, request, "get null response") + + timeTrace.generateReport(); ExceptionUtil.throwObTableTransportException(errMessage, TransportCodes.BOLT_RESPONSE_NULL); return null; } else if (!response.isSuccess()) { + // Transport error - generate time trace report for debugging + timeTrace.markEnd(); String errMessage = TraceUtil.formatTraceMessage(conn, request, - "get an error response: " + response.getMessage() + ", transportCode: " + response.getTransportCode()); - logger.warn(errMessage); + "get an error response: " + response.getMessage() + ", transportCode: " + response.getTransportCode()) + + timeTrace.generateReport(); response.releaseByteBuf(); ExceptionUtil.throwObTableTransportException(errMessage, response.getTransportCode()); return null; @@ -178,16 +210,103 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques } } + /** + * Wait for channel to become writable with exponential backoff. + * First wait uses NETTY_BLOCKING_WAIT_INTERVAL, then exponential backoff. + * If channel is not writable after timeout, throw -20002 timeout exception. 
+ * + * @param conn the connection + * @param timeoutMillis the timeout in milliseconds + * @param timeTrace the time trace for logging + */ + private void waitForChannelWritable(ObTableConnection conn, int timeoutMillis, + ObTableTimeTrace timeTrace) throws RemotingException { + if (conn.getConnection() == null || conn.getConnection().getChannel() == null) { + return; // Will be handled later + } + + if (conn.getConnection().getChannel().isWritable()) { + return; // Channel is writable, proceed + } + + // Channel is not writable, start backoff + timeTrace.markWaitWritableStart(); + long startTime = System.currentTimeMillis(); + long deadline = startTime + timeoutMillis; + + // First wait uses NETTY_BLOCKING_WAIT_INTERVAL (in ms), then exponential backoff + // e.g., if interval=1ms: 1ms -> 2ms -> 4ms -> 8ms -> ... -> maxSleepMs + int nettyBlockingWaitInterval = conn.getObTable().getNettyBlockingWaitInterval(); + long sleepMs = Math.max(nettyBlockingWaitInterval, 1); // At least 1ms + final long maxSleepMs = 2000; // Max backoff: 2000ms + + logger.info("Channel {} is not writable, waiting with backoff. " + + "initialInterval={}ms, timeout={}ms", + conn.getConnection().getUrl(), sleepMs, timeoutMillis); + + while (!conn.getConnection().getChannel().isWritable()) { + long now = System.currentTimeMillis(); + if (now >= deadline) { + // Timeout - throw -20002 + timeTrace.markEnd(); + String errMessage = String.format( + "Channel %s is not writable after waiting %dms, timeout. 
" + + "This may be caused by slow network or large packets blocking the send buffer.%s", + conn.getConnection().getUrl(), + now - startTime, + timeTrace.generateReport()); + logger.warn(errMessage); + ExceptionUtil.throwObTableTransportException(errMessage, TransportCodes.BOLT_TIMEOUT); + } + + // Sleep with backoff + long remainingTime = deadline - now; + long actualSleep = Math.min(sleepMs, remainingTime); + if (actualSleep > 0) { + try { + Thread.sleep(actualSleep); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RemotingException("Interrupted while waiting for channel to become writable"); + } + } + + // Exponential backoff based on initial interval + // e.g., 1 -> 2 -> 4 -> 8 -> 16 -> ... -> 2000 (max) + sleepMs = Math.min(sleepMs * 2, maxSleepMs); + } + + // Channel became writable + timeTrace.markWaitWritableEnd(); + long waitedMs = System.currentTimeMillis() - startTime; + if (waitedMs > 100) { + logger.info("Channel {} became writable after waiting {}ms", + conn.getConnection().getUrl(), waitedMs); + } + } + @Override protected InvokeFuture createInvokeFuture(RemotingCommand request, InvokeContext invokeContext) { - return new ObClientFuture(request.getId()); + ObClientFuture future = new ObClientFuture(request.getId()); + // Get timeTrace from ThreadLocal and set to future for response time tracking + ObTableTimeTrace timeTrace = currentTimeTrace.get(); + if (timeTrace != null) { + future.setTimeTrace(timeTrace); + } + return future; } @Override protected InvokeFuture createInvokeFuture(Connection conn, RemotingCommand request, InvokeContext invokeContext, InvokeCallback invokeCallback) { - return new ObClientFuture(request.getId()); + ObClientFuture future = new ObClientFuture(request.getId()); + // Get timeTrace from ThreadLocal and set to future for response time tracking + ObTableTimeTrace timeTrace = currentTimeTrace.get(); + if (timeTrace != null) { + future.setTimeTrace(timeTrace); + } + return future; } // 
schema changed diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableTimeTrace.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableTimeTrace.java new file mode 100644 index 00000000..fddae0cb --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableTimeTrace.java @@ -0,0 +1,255 @@ +/*- + * #%L + * OBKV Table Client Framework + * %% + * Copyright (C) 2021 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.bolt.transport; + +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * Request time trace for analyzing timeout issues. + *

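+ * This class tracks various timestamps during the lifecycle of an RPC request:
+ * <ul>
+ *   <li>request start and end of serialization;</li>
+ *   <li>start and end of waiting for the channel to become writable;</li>
+ *   <li>submission to Netty, and EventLoop encode start and end;</li>
+ *   <li>response received and request end (success or failure).</li>
+ * </ul>
+ * <p>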

+ * The time trace report is generated only when an error/timeout occurs, + * so there is minimal performance overhead during normal operations. + */ +public class ObTableTimeTrace { + + // Timestamps at various stages (milliseconds) + private long startTime; // Request start time + private long serializeEndTime; // Serialization complete time + private long waitWritableStartTime; // Time when started waiting for channel writable + private long waitWritableEndTime; // Time when channel became writable (or timeout) + private long writeToNettyTime; // Time when submitted to Netty (writeAndFlush called) + private long nettyEncodeStartTime; // Time when EventLoop starts encoding (reflects queue wait) + private long nettyEncodeEndTime; // Time when EventLoop finishes encoding + private long responseReceivedTime; // Time when response is received + private long endTime; // Request end time (success or failure) + + // Additional info + private int channelId; + private String traceId; // Lazy: only formatted when report is generated + private long uniqueId; // For lazy traceId formatting + private long sequence; // For lazy traceId formatting + private int payloadSize; // Request payload size in bytes + + public ObTableTimeTrace() { + this.startTime = System.currentTimeMillis(); + } + + public void markSerializeEnd() { + this.serializeEndTime = System.currentTimeMillis(); + } + + public void markWaitWritableStart() { + this.waitWritableStartTime = System.currentTimeMillis(); + } + + public void markWaitWritableEnd() { + this.waitWritableEndTime = System.currentTimeMillis(); + } + + public void markWriteToNetty() { + this.writeToNettyTime = System.currentTimeMillis(); + } + + public void markNettyEncodeStart() { + this.nettyEncodeStartTime = System.currentTimeMillis(); + } + + public void markNettyEncodeEnd() { + this.nettyEncodeEndTime = System.currentTimeMillis(); + } + + public void markResponseReceived() { + this.responseReceivedTime = System.currentTimeMillis(); + } + 
+ public void markEnd() { + this.endTime = System.currentTimeMillis(); + } + + public void setChannelId(int channelId) { + this.channelId = channelId; + } + + /** + * Set trace id directly (for cases where it's already formatted). + */ + public void setTraceId(String traceId) { + this.traceId = traceId; + } + + /** + * Set trace id components for lazy formatting. + * TraceId will only be formatted when generateReport() is called, + * avoiding String.format overhead in the hot path. + */ + public void setTraceIdComponents(long uniqueId, long sequence) { + this.uniqueId = uniqueId; + this.sequence = sequence; + } + + /** + * Get formatted traceId (lazy formatting). + */ + private String getFormattedTraceId() { + if (traceId != null) { + return traceId; + } + if (uniqueId != 0 || sequence != 0) { + return String.format("Y%X-%016X", uniqueId, sequence); + } + return "N/A"; + } + + public void setPayloadSize(int payloadSize) { + this.payloadSize = payloadSize; + } + + public long getStartTime() { + return startTime; + } + + public long getTotalCost() { + return endTime > 0 ? 
endTime - startTime : System.currentTimeMillis() - startTime; + } + + /** + * 生成时间追踪报告 + */ + public String generateReport() { + SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS"); + StringBuilder sb = new StringBuilder(); + + sb.append("\n========== 时间追踪报告 ==========\n"); + sb.append("TraceId: ").append(getFormattedTraceId()).append("\n"); + sb.append("ChannelId: ").append(channelId).append("\n"); + sb.append("PayloadSize: ").append(payloadSize / 1024).append(" KB\n"); + sb.append("\n时间线:\n"); + + sb.append(String.format(" [%s] 请求开始\n", formatTime(sdf, startTime))); + + if (serializeEndTime > 0) { + sb.append(String.format(" [%s] 序列化完成 (+%d ms)\n", + formatTime(sdf, serializeEndTime), serializeEndTime - startTime)); + } + + if (waitWritableStartTime > 0) { + sb.append(String.format(" [%s] 开始等待Channel可写 (+%d ms)\n", + formatTime(sdf, waitWritableStartTime), waitWritableStartTime - startTime)); + if (waitWritableEndTime > 0) { + sb.append(String.format(" [%s] Channel变为可写 (+%d ms, 等待了 %d ms)\n", + formatTime(sdf, waitWritableEndTime), waitWritableEndTime - startTime, + waitWritableEndTime - waitWritableStartTime)); + } else { + sb.append(" [--:--:--.---] Channel变为可写 (超时! Channel一直不可写)\n"); + } + } + + if (writeToNettyTime > 0) { + sb.append(String.format(" [%s] 提交到Netty (+%d ms)\n", + formatTime(sdf, writeToNettyTime), writeToNettyTime - startTime)); + } + + if (nettyEncodeStartTime > 0) { + sb.append(String.format(" [%s] EventLoop开始编码 (+%d ms)\n", + formatTime(sdf, nettyEncodeStartTime), nettyEncodeStartTime - startTime)); + } else if (writeToNettyTime > 0) { + sb.append(" [--:--:--.---] EventLoop开始编码 (未执行! 
可能队列积压)\n"); + } + + if (nettyEncodeEndTime > 0) { + sb.append(String.format(" [%s] EventLoop编码完成 (+%d ms)\n", + formatTime(sdf, nettyEncodeEndTime), nettyEncodeEndTime - startTime)); + } + + if (responseReceivedTime > 0) { + sb.append(String.format(" [%s] 收到响应 (+%d ms)\n", + formatTime(sdf, responseReceivedTime), responseReceivedTime - startTime)); + } else { + sb.append(" [--:--:--.---] 收到响应 (未收到! 超时)\n"); + } + + if (endTime > 0) { + sb.append(String.format(" [%s] 请求结束 (总耗时: %d ms)\n", + formatTime(sdf, endTime), endTime - startTime)); + } + + // 分析各阶段耗时 + sb.append("\n耗时分析:\n"); + if (serializeEndTime > 0) { + sb.append(String.format(" 用户线程序列化: %d ms\n", serializeEndTime - startTime)); + } + if (waitWritableStartTime > 0 && serializeEndTime > 0) { + sb.append(String.format(" 准备发送: %d ms\n", waitWritableStartTime - serializeEndTime)); + } + if (waitWritableStartTime > 0) { + if (waitWritableEndTime > 0) { + long waitTime = waitWritableEndTime - waitWritableStartTime; + sb.append(String.format(" ★ 等待Channel可写: %d ms%s\n", + waitTime, waitTime > 100 ? " [!!!发送缓冲区满!!!]" : "")); + } else { + long waitTime = endTime > 0 ? endTime - waitWritableStartTime : System.currentTimeMillis() - waitWritableStartTime; + sb.append(String.format(" ★ 等待Channel可写: %d ms [!!!超时! 发送缓冲区一直满!!!]\n", waitTime)); + } + } + if (writeToNettyTime > 0 && waitWritableEndTime > 0) { + sb.append(String.format(" 提交到Netty: %d ms\n", writeToNettyTime - waitWritableEndTime)); + } else if (writeToNettyTime > 0 && serializeEndTime > 0) { + sb.append(String.format(" 提交到Netty: %d ms\n", writeToNettyTime - serializeEndTime)); + } + if (nettyEncodeStartTime > 0 && writeToNettyTime > 0) { + long queueWait = nettyEncodeStartTime - writeToNettyTime; + sb.append(String.format(" ★ Netty队列等待: %d ms%s\n", + queueWait, queueWait > 100 ? 
" [!!!可能阻塞!!!]" : "")); + } + if (nettyEncodeEndTime > 0 && nettyEncodeStartTime > 0) { + sb.append(String.format(" EventLoop编码耗时: %d ms\n", nettyEncodeEndTime - nettyEncodeStartTime)); + } + if (responseReceivedTime > 0 && nettyEncodeEndTime > 0) { + sb.append(String.format(" 网络+服务端处理: %d ms\n", responseReceivedTime - nettyEncodeEndTime)); + } else if (nettyEncodeEndTime > 0) { + sb.append(String.format(" 网络+服务端处理: 超时 (编码完成后等待 %d ms 未收到响应)\n", + System.currentTimeMillis() - nettyEncodeEndTime)); + } + + sb.append("==================================\n"); + + return sb.toString(); + } + + private String formatTime(SimpleDateFormat sdf, long time) { + return sdf.format(new Date(time)); + } + + @Override + public String toString() { + return generateReport(); + } +} + diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java index efde7f2c..784f92df 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java @@ -711,26 +711,55 @@ public int getConnectionNum() { return connectionPool.getConnectionNum(); } - /* - * Get connection. + /** + * Get a connection from the pool. + *

+ * This method will: + * 1. Skip connections with null credential + * 2. Skip connections that are not writable (try to find a writable one) + * 3. If all connections are not writable, return the last one anyway + * (backoff will be handled in invokeSync before sending) + * + * @return an available connection + * @throws Exception if no connection is available */ public ObTableConnection getConnection() throws Exception { ObTableConnection conn = connectionPool.getConnection(); - int count = 0; + int credentialRetryCount = 0; + int writableRetryCount = 0; + + // Step 1: Skip connections with null credential while (conn != null - && (conn.getConnection() != null - && (conn.getCredential() == null || conn.getCredential().length() == 0) - && count < obTableConnectionPoolSize)) { + && conn.getConnection() != null + && (conn.getCredential() == null || conn.getCredential().length() == 0) + && credentialRetryCount < obTableConnectionPoolSize) { conn = connectionPool.getConnection(); - count++; + credentialRetryCount++; } - if (count == obTableConnectionPoolSize) { + + if (credentialRetryCount >= obTableConnectionPoolSize) { throw new ObTableException("all connection's credential is null"); } - // conn is null, maybe all connection has expired and reconnect fail + + // conn is null, maybe all connections have expired and reconnect failed if (conn == null) { throw new ObTableServerConnectException("connection is null"); } + + // Step 2: Try to find a writable connection + // If current connection is not writable, try to get another one + while (conn != null + && conn.getConnection() != null + && conn.getConnection().getChannel() != null + && !conn.getConnection().getChannel().isWritable() + && writableRetryCount < obTableConnectionPoolSize) { + conn = connectionPool.getConnection(); + writableRetryCount++; + } + + // Note: Even if all connections are not writable (writableRetryCount == poolSize), + // we still return the connection. 
The invokeSync will handle backoff before sending. + return conn; } @@ -972,21 +1001,23 @@ private void checkAndReconnect() { connections[idx].setExpired(true); } - // Sleep for a predefined timeout period before attempting reconnection + // Sleep for 1.5x the user-configured RPC timeout before attempting reconnection try { - Thread.sleep(RPC_EXECUTE_TIMEOUT.getDefaultInt()); + Thread.sleep((long) (obTable.getObTableExecuteTimeout() * 1.5)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - // Attempt to reconnect the marked connections + // Attempt to reconnect the marked connections with a dedicated timeout (10 seconds) + // to ensure background reconnection has enough time to complete + final int BACKGROUND_RECONNECT_TIMEOUT_MS = 10000; for (int i = 0; i < needReconnectCount; i++) { int idx = expiredConnIds.get(i); try { if (i == 0) { connections[idx].enableLoginWithConfigs(); } - connections[idx].reConnectAndLogin("expired"); + connections[idx].reConnectAndLogin("expired", BACKGROUND_RECONNECT_TIMEOUT_MS); } catch (Exception e) { log.warn("ObTableConnectionPool::checkAndReconnect reconnect fail {}:{}. {}", obTable.getIp(), obTable.getPort(), e.getMessage()); } finally { diff --git a/src/main/java/com/alipay/oceanbase/rpc/util/TraceUtil.java b/src/main/java/com/alipay/oceanbase/rpc/util/TraceUtil.java index e4ecb532..1a0088a7 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/util/TraceUtil.java +++ b/src/main/java/com/alipay/oceanbase/rpc/util/TraceUtil.java @@ -67,4 +67,11 @@ public static String formatIpPort(final ObTable obTable) { public static String formatTraceMessage(final ObPayload payload) { return String.format("[Y%X-%016X]", payload.getUniqueId(), payload.getSequence()); } + + /* + * Format trace id from payload. 
+ */ + public static String formatTraceId(final ObPayload payload) { + return String.format("Y%X-%016X", payload.getUniqueId(), payload.getSequence()); + } } From c49ddfc3f7558abde0a3c0b17a70e8dd4791b614 Mon Sep 17 00:00:00 2001 From: WeiXinChan Date: Mon, 29 Dec 2025 15:25:27 +0800 Subject: [PATCH 2/2] fix review --- .../rpc/bolt/transport/ObTableRemoting.java | 68 ++++++++++++------- .../oceanbase/rpc/property/Property.java | 3 + .../oceanbase/rpc/table/AbstractObTable.java | 16 +++++ .../alipay/oceanbase/rpc/table/ObTable.java | 1 + .../table/ObTableClientLSBatchOpsImpl.java | 3 + 5 files changed, 66 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java index 04b03157..95a9ed14 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java +++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java @@ -54,8 +54,9 @@ public ObTableRemoting(CommandFactory commandFactory) { public ObPayload invokeSync(final ObTableConnection conn, final ObPayload request, final int timeoutMillis) throws RemotingException, InterruptedException { - // Create time trace for debugging timeout issues - ObTableTimeTrace timeTrace = new ObTableTimeTrace(); + // Create time trace for debugging timeout issues (if enabled) + final boolean traceEnabled = conn.getObTable().isTimeTraceEnabled(); + ObTableTimeTrace timeTrace = traceEnabled ? 
new ObTableTimeTrace() : null; request.setSequence(conn.getNextSequence()); request.setUniqueId(conn.getUniqueId()); @@ -78,13 +79,14 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques // Serialize request (done in createRequestCommand) ObTablePacket obRequest = this.getCommandFactory().createRequestCommand(request); - timeTrace.markSerializeEnd(); - - // Set time trace info (use lazy formatting to avoid String.format in hot path) - timeTrace.setChannelId(obRequest.getId()); - timeTrace.setTraceIdComponents(request.getUniqueId(), request.getSequence()); - if (obRequest.getPacketContent() != null) { - timeTrace.setPayloadSize(obRequest.getPacketContent().length); + if (timeTrace != null) { + timeTrace.markSerializeEnd(); + // Set time trace info (use lazy formatting to avoid String.format in hot path) + timeTrace.setChannelId(obRequest.getId()); + timeTrace.setTraceIdComponents(request.getUniqueId(), request.getSequence()); + if (obRequest.getPacketContent() != null) { + timeTrace.setPayloadSize(obRequest.getPacketContent().length); + } } obRequest.setTimeTrace(timeTrace); @@ -92,32 +94,42 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques waitForChannelWritable(conn, timeoutMillis, timeTrace); // Mark time when submitting to Netty - timeTrace.markWriteToNetty(); + if (timeTrace != null) { + timeTrace.markWriteToNetty(); + } - // Set ThreadLocal so that createInvokeFuture can access timeTrace - currentTimeTrace.set(timeTrace); + // Set ThreadLocal so that createInvokeFuture can access timeTrace (only if enabled) + if (timeTrace != null) { + currentTimeTrace.set(timeTrace); + } ObTablePacket response; try { response = (ObTablePacket) super.invokeSync(conn.getConnection(), obRequest, timeoutMillis); } finally { - currentTimeTrace.remove(); + if (timeTrace != null) { + currentTimeTrace.remove(); + } } if (response == null) { // Timeout - generate time trace report for debugging - timeTrace.markEnd(); - 
String errMessage = TraceUtil.formatTraceMessage(conn, request, "get null response") - + timeTrace.generateReport(); + String errMessage = TraceUtil.formatTraceMessage(conn, request, "get null response"); + if (timeTrace != null) { + timeTrace.markEnd(); + errMessage += timeTrace.generateReport(); + } ExceptionUtil.throwObTableTransportException(errMessage, TransportCodes.BOLT_RESPONSE_NULL); return null; } else if (!response.isSuccess()) { // Transport error - generate time trace report for debugging - timeTrace.markEnd(); String errMessage = TraceUtil.formatTraceMessage(conn, request, - "get an error response: " + response.getMessage() + ", transportCode: " + response.getTransportCode()) - + timeTrace.generateReport(); + "get an error response: " + response.getMessage() + ", transportCode: " + response.getTransportCode()); + if (timeTrace != null) { + timeTrace.markEnd(); + errMessage += timeTrace.generateReport(); + } response.releaseByteBuf(); ExceptionUtil.throwObTableTransportException(errMessage, response.getTransportCode()); return null; @@ -230,7 +242,9 @@ private void waitForChannelWritable(ObTableConnection conn, int timeoutMillis, } // Channel is not writable, start backoff - timeTrace.markWaitWritableStart(); + if (timeTrace != null) { + timeTrace.markWaitWritableStart(); + } long startTime = System.currentTimeMillis(); long deadline = startTime + timeoutMillis; @@ -248,13 +262,15 @@ private void waitForChannelWritable(ObTableConnection conn, int timeoutMillis, long now = System.currentTimeMillis(); if (now >= deadline) { // Timeout - throw -20002 - timeTrace.markEnd(); String errMessage = String.format( "Channel %s is not writable after waiting %dms, timeout. 
" + - "This may be caused by slow network or large packets blocking the send buffer.%s", + "This may be caused by slow network or large packets blocking the send buffer.", conn.getConnection().getUrl(), - now - startTime, - timeTrace.generateReport()); + now - startTime); + if (timeTrace != null) { + timeTrace.markEnd(); + errMessage += timeTrace.generateReport(); + } logger.warn(errMessage); ExceptionUtil.throwObTableTransportException(errMessage, TransportCodes.BOLT_TIMEOUT); } @@ -277,7 +293,9 @@ private void waitForChannelWritable(ObTableConnection conn, int timeoutMillis, } // Channel became writable - timeTrace.markWaitWritableEnd(); + if (timeTrace != null) { + timeTrace.markWaitWritableEnd(); + } long waitedMs = System.currentTimeMillis() - startTime; if (waitedMs > 100) { logger.info("Channel {} became writable after waiting {}ms", diff --git a/src/main/java/com/alipay/oceanbase/rpc/property/Property.java b/src/main/java/com/alipay/oceanbase/rpc/property/Property.java index 589558a7..da82db8c 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/property/Property.java +++ b/src/main/java/com/alipay/oceanbase/rpc/property/Property.java @@ -139,6 +139,9 @@ public enum Property { // [ObTable][OTHERS] SERVER_ENABLE_REROUTING("server.enable.rerouting", false, "开启server端的重定向回复功能"), + // [ObTable][DEBUG] + RPC_TIME_TRACE_ENABLED("rpc.time.trace.enabled", true, "启用RPC时间追踪,用于分析超时问题"), + /* * other config */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTable.java b/src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTable.java index 4b009c9e..d9039c5d 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTable.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTable.java @@ -43,6 +43,8 @@ public abstract class AbstractObTable extends AbstractTable { protected long maxConnExpiredTime = MAX_CONN_EXPIRED_TIME.getDefaultLong(); + protected boolean timeTraceEnabled = RPC_TIME_TRACE_ENABLED.getDefaultBoolean(); + /* * 
Get ob table connect try times. */ @@ -168,4 +170,18 @@ public int getNettyBlockingWaitInterval() { public long getConnMaxExpiredTime() { return maxConnExpiredTime; } + + /* + * Check if time trace is enabled. + */ + public boolean isTimeTraceEnabled() { + return timeTraceEnabled; + } + + /* + * Set time trace enabled. + */ + public void setTimeTraceEnabled(boolean timeTraceEnabled) { + this.timeTraceEnabled = timeTraceEnabled; + } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java index 784f92df..6153adf3 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java @@ -180,6 +180,7 @@ private void initProperties() { nettyBlockingWaitInterval); enableRerouting = parseToBoolean(SERVER_ENABLE_REROUTING.getKey(), enableRerouting); maxConnExpiredTime = parseToLong(MAX_CONN_EXPIRED_TIME.getKey(), maxConnExpiredTime); + timeTraceEnabled = parseToBoolean(RPC_TIME_TRACE_ENABLED.getKey(), timeTraceEnabled); Object value = this.configs.get("runtime"); if (value instanceof Map) { diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java index b5840cce..95831819 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java @@ -375,6 +375,9 @@ public List execute() throws Exception { public List executeWithResult() throws Exception { List results = new ArrayList(batchOperation.size()); ObTableSingleOpResult[] singleResults = executeInternal(); + if (singleResults.length == 1 && singleResults[0] == null) { // get empty result + return results; + } for (int i = 0; i < singleResults.length; i++) { ObTableSingleOpResult result = singleResults[i]; // Sometimes the server does not set the operation type,so we 
use request operation type