From 99c0f033a5750dcef8ab1b76cf5fca36dadb972c Mon Sep 17 00:00:00 2001 From: maochongxin Date: Fri, 26 Dec 2025 17:48:25 +0800 Subject: [PATCH] feat: add flag field serialization support for ObTableSingleOpQuery --- .../payload/impl/execute/ObTableSingleOpQuery.java | 13 +++++++++++++ .../rpc/table/ObTableClientLSBatchOpsImpl.java | 14 +++++++++----- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpQuery.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpQuery.java index 9520278a..0f7fb17b 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpQuery.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpQuery.java @@ -106,6 +106,12 @@ public byte[] encode() { idx += len; } } + + // 5. encode flag + len = Serialization.getNeedBytes(flag.getValue()); + System.arraycopy(Serialization.encodeVi64(flag.getValue()), 0, bytes, idx, len); + idx += len; + return bytes; } @@ -151,6 +157,9 @@ public void encode(ObByteBuf buf) { buf.writeBytes(HTABLE_DUMMY_BYTES); } } + + // encode flag + Serialization.encodeVi64(buf, flag.getValue()); } /* @@ -233,6 +242,10 @@ public long getPayloadContentSize() { payloadContentSize += HTABLE_DUMMY_BYTES.length; } } + + // calculate flag size + payloadContentSize += Serialization.getNeedBytes(flag.getValue()); + this.payLoadContentSize = payloadContentSize; } diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java index 2c5ab666..2bb90e28 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java @@ -223,6 +223,7 @@ public void addOperation(TableQuery query) throws Exception { obTableQuery.getSelectColumns(), obTableQuery.getScanOrder(), obTableQuery.isHbaseQuery(), obTableQuery.gethTableFilter(), obTableQuery.getObKVParams(), obTableQuery.getFilterString()); + singleOpQuery.setFlag(obTableQuery.getFlag()); singleOp.setQuery(singleOpQuery); singleOp.setSingleOpType(ObTableOperationType.SCAN); } else { @@ -243,6 +244,7 @@ public void addOperation(QueryAndMutate queryAndMutate) { obTableQuery.getSelectColumns(), obTableQuery.getScanOrder(), obTableQuery.isHbaseQuery(), obTableQuery.gethTableFilter(), obTableQuery.getObKVParams(), obTableQuery.getFilterString()); + singleOpQuery.setFlag(obTableQuery.getFlag()); singleOp.setQuery(singleOpQuery); singleOp.setQuery(singleOpQuery); singleOp.setSingleOpType(ObTableOperationType.QUERY_AND_MUTATE); @@ -859,14 +861,15 @@ private void executeWithRetries(ObTableSingleOpResult[] results, long costMillis = System.currentTimeMillis() - startExecute; if (costMillis > runTimeMaxWait) { errMsg = tableName + " failed to execute operation after retrying " + retryCount - + " times and it has waited " + costMillis + " ms" - + " which exceeds runtime max wait timeout " + runTimeMaxWait - + " ms. Last error Msg:" + "[errCode=" + errCode + "] " + errMsg; + + " times and it has waited " + costMillis + " ms" + + " which exceeds runtime max wait timeout " + runTimeMaxWait + + " ms. Last error Msg:" + "[errCode=" + errCode + "] " + errMsg; throw new ObTableUnexpectedException(errMsg); } boolean allPartitionsSuccess = true; - Iterator> iterator = currentPartitions.entrySet().iterator(); + Iterator> iterator = currentPartitions.entrySet() + .iterator(); while (iterator.hasNext()) { Map.Entry currentEntry = iterator.next(); try { @@ -876,7 +879,8 @@ private void executeWithRetries(ObTableSingleOpResult[] results, retryCount++; errCode = ((ObTableNeedFetchMetaException) e).getErrorCode(); errMsg = e.getMessage(); - BatchIdxOperationPairList failedOperations = extractOperations(currentEntry, iterator); // reschedule failed and sequent operations + BatchIdxOperationPairList failedOperations = extractOperations( + currentEntry, iterator); // reschedule failed and sequent operations currentPartitions = prepareOperations(failedOperations); allPartitionsSuccess = false; break;