From 058cf4d44763389b7df84ce6db9ae1bc3036f9b6 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Fri, 26 Dec 2025 14:59:58 +0800 Subject: [PATCH] add hasDoneRpc flag for obkv hbase scan --- .../rpc/stream/ObTableClientQueryAsyncStreamResult.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java index bda74674..da48edf5 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java @@ -46,6 +46,7 @@ public class ObTableClientQueryAsyncStreamResult extends AbstractQueryStreamResu private ObTableQueryAsyncRequest asyncRequest = new ObTableQueryAsyncRequest(); private ObTableConnection prevConnection = null; private boolean allowDistributeScan = true; // false when partition scan + private boolean hasDoneRpc = false; // this flag is for obkv-hbase scan @Override public void init() throws Exception { @@ -74,6 +75,7 @@ public void init() throws Exception { while (it.hasNext()) { Map.Entry> firstEntry = it.next(); try { + hasDoneRpc = true; // try access new partition, async will not remove useless expectant referToNewPartition(firstEntry.getValue()); break; @@ -183,6 +185,7 @@ public boolean queryLastStreamResultInNext() throws Exception { Iterator>> it = expectant.entrySet().iterator(); Map.Entry> lastEntry = it.next(); try { + hasDoneRpc = true; // try access new partition, async will not remove useless expectant referToLastStreamResult(lastEntry.getValue()); } catch (Exception e) { @@ -223,6 +226,7 @@ public boolean queryNewStreamResultInNext() throws Exception { int retryTimes = 0; long startExecute = System.currentTimeMillis(); while (it.hasNext()) { + hasDoneRpc = true; Map.Entry> entry = it.next(); try { // try access new partition, async will not remove useless expectant @@ -316,6 +320,7 @@ public boolean next() throws Exception { checkStatus(); lock.lock(); try { + hasDoneRpc = false; // firstly, refer to the cache if (!cacheRows.isEmpty()) { nextRow(); @@ -453,4 +458,8 @@ private boolean needTabletId(ObTableQueryRequest queryRequest) { public void setAllowDistributeScan(boolean allowDistributeScan) { this.allowDistributeScan = allowDistributeScan; } + + public boolean hasDoneRpc() { + return hasDoneRpc; + } }