Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

@@ -215,6 +216,16 @@ protected BackendColumnIterator queryByIds(RocksDBSessions.Session session,
));
}

+protected BackendColumnIterator queryByIdsWithGet(RocksDBSessions.Session session,
+Collection<Id> ids) {
+E.checkState(!session.hasChanges(),
Contributor

Non-blocking: enabling queryByIdsWithGet() for vertex/edge changes pending-write behavior: any session.hasChanges() now throws before even the single-id path. Please confirm all callers reach this only after commit/rollback, or keep the old per-id fallback for pending sessions and add a test through a public query path.

+"Can't queryByIds() when RocksDB session has pending changes");
+if (ids.isEmpty()) {
+return BackendColumnIterator.empty();
+}
+return this.getByIds(session, ids);
+}
Comment on lines +219 to +227

protected BackendColumnIterator getById(RocksDBSessions.Session session, Id id) {
byte[] value = session.get(this.table(), id.asBytes());
if (value == null) {
@@ -224,13 +235,15 @@ protected BackendColumnIterator getById(RocksDBSessions.Session session, Id id)
return BackendColumnIterator.iterator(col);
}

-protected BackendColumnIterator getByIds(RocksDBSessions.Session session, Set<Id> ids) {
+protected BackendColumnIterator getByIds(RocksDBSessions.Session session,
+Collection<Id> ids) {
Member

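‼️ Note the semantic change from Set<Id> to Collection<Id>

The original method accepted a Set<Id>, which is deduplicated by nature. Now that it takes a Collection<Id>, passing a List that contains duplicate IDs makes RocksDB multiGet look up the same key repeatedly and return duplicate results.

The test testVertexQueryByIdsWithDuplicateIds verifies this behavior (id1 is returned twice), but that is inconsistent with the original Set semantics. We need to confirm whether the ids of the upper-level IdQuery can contain duplicates; if they can, this behavior change may cause the upper layer to process the same data more than once.

Suggestion: deduplicate at the getByIds entry point to preserve the original semantics, or document explicitly that a Collection containing duplicates now returns duplicate results.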

Contributor Author

Added LinkedHashSet deduplication at the getByIds() entry point, which preserves input order; the copy is skipped when a Set is already passed. The test is updated to verify the dedup behavior. Backend deduplication does not affect the final semantics, since GraphTransaction reassembles results in the original input order.
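For readers following the thread, here is a minimal, self-contained sketch of the order-preserving deduplication described in this reply. It uses plain strings instead of Id, and the class and method names (DedupSketch, uniqueInOrder) are hypothetical; only the `instanceof Set` check and the LinkedHashSet copy mirror the line in the diff.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-alone illustration; not part of the PR.
class DedupSketch {

    // Returns the input unchanged when it is already a Set. Otherwise copies it
    // into a LinkedHashSet, which drops duplicates and keeps first-seen order.
    static <T> Collection<T> uniqueInOrder(Collection<T> ids) {
        return ids instanceof Set ? ids : new LinkedHashSet<>(ids);
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("id1", "id2", "id1", "id3");
        System.out.println(uniqueInOrder(ids)); // prints [id1, id2, id3]
    }
}
```

Note that the second "id1" is dropped wherever it appears in the input, not only when it directly follows the first one.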

if (ids.size() == 1) {
return this.getById(session, ids.iterator().next());
}

-List<byte[]> keys = new ArrayList<>(ids.size());
-for (Id id : ids) {
+Collection<Id> uniqueIds = ids instanceof Set ? ids : new LinkedHashSet<>(ids);
Member

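‼️ The global deduplication here makes the public Edge query path swallow non-adjacent duplicate ids, which is a user-visible regression in the result set.

The full path is:

graph.edges(e1, e2, e1)
        |
        v
GraphTransaction.queryEdgesByIds()
        |
        |  IdQuery.query() only collapses adjacent duplicate ids
        |  [e1, e2, e1] => query.idsSize()==3, ids.size()==3
        v
edges.isEmpty() && query.idsSize() == ids.size()
        |
        |  hits the fast path and returns the backend iterator directly
        v
RocksDBTables.Edge.queryByIds()
        |
        v
RocksDBTable.getByIds()
        |
        |  new LinkedHashSet<>(ids) deduplicates globally
        v
[e1, e2, e1] => [e1, e2]

In other words, GraphTransaction only falls back to the slow path, which rebuilds results from the original ids, when it detects duplicate ids. The current fast-path check recognizes only adjacent duplicates, though, and cannot detect non-adjacent ones such as e1, e2, e1. The old scan/flat-map path expanded into 3 queries, one per input id; the new path compresses them into 2 keys here, so the final result is one record short.

Suggestion: either avoid table-level global deduplication on this Edge fast path, or change the transaction layer so that it can detect any duplicate id before deciding whether to return the backend iterator directly.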
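The gap between the two checks can be illustrated with a small stand-alone sketch. It assumes, as the comment above states, that the existing check only collapses adjacent duplicates. The helper names (foldAdjacent, hasAnyDuplicate) are hypothetical and do not correspond to actual IdQuery or GraphTransaction methods, which this diff does not show.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-alone illustration; not part of the PR.
class DuplicateIdCheckSketch {

    // Collapses only runs of equal neighbours, so [e1, e1, e2] becomes [e1, e2]
    // while [e1, e2, e1] is left untouched.
    static <T> List<T> foldAdjacent(List<T> ids) {
        List<T> folded = new ArrayList<>();
        for (T id : ids) {
            if (folded.isEmpty() || !Objects.equals(folded.get(folded.size() - 1), id)) {
                folded.add(id);
            }
        }
        return folded;
    }

    // True if any id occurs more than once, whether adjacent or not.
    static <T> boolean hasAnyDuplicate(List<T> ids) {
        return new HashSet<>(ids).size() != ids.size();
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("e1", "e2", "e1");
        // The adjacent-only check sees no duplicates: sizes still match.
        System.out.println(foldAdjacent(ids).size() == ids.size()); // prints true
        // A set-based check does detect the repeated e1.
        System.out.println(hasAnyDuplicate(ids)); // prints true
    }
}
```

A set-based check like hasAnyDuplicate costs one extra pass and a temporary set per query, which is the trade-off to weigh against keeping the fast path for duplicate-free inputs.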

+List<byte[]> keys = new ArrayList<>(uniqueIds.size());
+for (Id id : uniqueIds) {
keys.add(id.asBytes());
}
return session.get(this.table(), keys);
@@ -309,7 +322,7 @@ protected static BackendEntryIterator newEntryIterator(BackendColumnIterator col
}

protected static BackendEntryIterator newEntryIteratorOlap(
-BackendColumnIterator cols, Query query, boolean isOlap) {
+BackendColumnIterator cols, Query query, boolean isOlap) {
return new BinaryEntryIterator<>(cols, query, (entry, col) -> {
if (entry == null || !entry.belongToMe(col)) {
HugeType type = query.resultType();
Original file line number Diff line number Diff line change
@@ -182,8 +182,7 @@ protected BackendColumnIterator queryById(RocksDBSessions.Session session, Id id
@Override
protected BackendColumnIterator queryByIds(RocksDBSessions.Session session,
Collection<Id> ids) {
-// TODO: use getByIds() after batch version multi-get is ready
-return super.queryByIds(session, ids);
+return this.queryByIdsWithGet(session, ids);
}
Comment on lines 182 to 186
Contributor Author

Good catch! This PR is RocksDB-only. I've updated the description to change "closes 2674" to "related 2674".

}

@@ -208,6 +207,12 @@ public static Edge in(String database) {
protected BackendColumnIterator queryById(RocksDBSessions.Session session, Id id) {
return this.getById(session, id);
}

+@Override
+protected BackendColumnIterator queryByIds(RocksDBSessions.Session session,
+Collection<Id> ids) {
+return this.queryByIdsWithGet(session, ids);
+}
}

public static class IndexTable extends RocksDBTable {
Original file line number Diff line number Diff line change
@@ -53,6 +53,7 @@
import org.apache.hugegraph.unit.rocksdb.RocksDBCountersTest;
import org.apache.hugegraph.unit.rocksdb.RocksDBSessionTest;
import org.apache.hugegraph.unit.rocksdb.RocksDBSessionsTest;
+import org.apache.hugegraph.unit.rocksdb.RocksDBTableQueryByIdsTest;
import org.apache.hugegraph.unit.serializer.BinaryBackendEntryTest;
import org.apache.hugegraph.unit.serializer.BinaryScatterSerializerTest;
import org.apache.hugegraph.unit.serializer.BinarySerializerTest;
@@ -141,6 +142,7 @@
RocksDBSessionsTest.class,
RocksDBSessionTest.class,
RocksDBCountersTest.class,
+RocksDBTableQueryByIdsTest.class,

/* utils */
VersionTest.class,