Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class ObTableClientQueryAsyncStreamResult extends AbstractQueryStreamResu
private ObTableQueryAsyncRequest asyncRequest = new ObTableQueryAsyncRequest();
private ObTableConnection prevConnection = null;
private boolean allowDistributeScan = true; // false when partition scan
private boolean hasDoneRpc = false; // this flag is for obkv-hbase scan

@Override
public void init() throws Exception {
Expand Down Expand Up @@ -74,6 +75,7 @@ public void init() throws Exception {
while (it.hasNext()) {
Map.Entry<Long, ObPair<Long, ObTableParam>> firstEntry = it.next();
try {
hasDoneRpc = true;
// try access new partition, async will not remove useless expectant
referToNewPartition(firstEntry.getValue());
break;
Expand Down Expand Up @@ -183,6 +185,7 @@ public boolean queryLastStreamResultInNext() throws Exception {
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet().iterator();
Map.Entry<Long, ObPair<Long, ObTableParam>> lastEntry = it.next();
try {
hasDoneRpc = true;
// try access new partition, async will not remove useless expectant
referToLastStreamResult(lastEntry.getValue());
} catch (Exception e) {
Expand Down Expand Up @@ -223,6 +226,7 @@ public boolean queryNewStreamResultInNext() throws Exception {
int retryTimes = 0;
long startExecute = System.currentTimeMillis();
while (it.hasNext()) {
hasDoneRpc = true;
Map.Entry<Long, ObPair<Long, ObTableParam>> entry = it.next();
try {
// try access new partition, async will not remove useless expectant
Expand Down Expand Up @@ -316,6 +320,7 @@ public boolean next() throws Exception {
checkStatus();
lock.lock();
try {
hasDoneRpc = false;
// firstly, refer to the cache
if (!cacheRows.isEmpty()) {
nextRow();
Expand Down Expand Up @@ -453,4 +458,8 @@ private boolean needTabletId(ObTableQueryRequest queryRequest) {
public void setAllowDistributeScan(boolean allowDistributeScan) {
this.allowDistributeScan = allowDistributeScan;
}

public boolean hasDoneRpc() {
return hasDoneRpc;
}
}