Skip to content

perf(llc): Reduce the number of read message per channel from DB when paginating (part 2)#2681

Open
VelikovPetar wants to merge 18 commits into
masterfrom
feature/FLU-485_optimize_read_message_from_db_part2
Open

perf(llc): Reduce the number of read message per channel from DB when paginating (part 2)#2681
VelikovPetar wants to merge 18 commits into
masterfrom
feature/FLU-485_optimize_read_message_from_db_part2

Conversation

@VelikovPetar
Copy link
Copy Markdown
Contributor

@VelikovPetar VelikovPetar commented May 25, 2026

Submit a pull request

Linear: Part two of: FLU-485

Review after: #2679

CLA

  • I have signed the Stream CLA (required).
  • The code changes follow best practices
  • Code changes are tested (add some information if not applicable)

Description of the pull request

Replaces the per-message hydration (_messageFromJoinRow) with a batched hydration (_messagesFromJoinRows).

Testing:

Apply the following patch and run the new parity/benchmark tests to verify the performance improvements and no regression (except where some behaviour was intentionally changed):

Benchmark and parity tests
Subject: [PATCH] refactor(dao): Benchmark
---
Index: packages/stream_chat_persistence/test/src/dao/get_messages_by_cid_parity_test.dart
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/packages/stream_chat_persistence/test/src/dao/get_messages_by_cid_parity_test.dart b/packages/stream_chat_persistence/test/src/dao/get_messages_by_cid_parity_test.dart
new file mode 100644
--- /dev/null	(date 1779706335909)
+++ b/packages/stream_chat_persistence/test/src/dao/get_messages_by_cid_parity_test.dart	(date 1779706335909)
@@ -0,0 +1,481 @@
+import 'package:flutter_test/flutter_test.dart';
+import 'package:stream_chat/stream_chat.dart';
+import 'package:stream_chat_persistence/src/dao/dao.dart';
+import 'package:stream_chat_persistence/src/db/drift_chat_database.dart';
+
+import '../../stream_chat_persistence_client_test.dart';
+
+void main() {
+  late DriftChatDatabase database;
+  late MessageDao messageDao;
+
+  setUp(() {
+    database = testDatabaseProvider('testUserId');
+    messageDao = database.messageDao;
+  });
+
+  tearDown(() async {
+    await database.disconnect();
+  });
+
+  // Seeds a channel with `count` messages that exercise every hydration path
+  // `_messageFromJoinRow` touches: user attribution, latest/own reactions
+  // (including on quoted messages), quoted messages (including one pointing
+  // at a deleted/missing target), polls *with* mixed votes + an answer,
+  // and a thread draft attached to a real parent message. Monotonic
+  // 1-second offsets on `createdAt` because Drift stores DateTime as
+  // integer Unix seconds — sub-second offsets collapse onto the same tick.
+  Future<void> seedRichMessages(String cid, int count) async {
+    final channels = [ChannelModel(cid: cid)];
+    final dbUser = User(id: 'testUserId'); // matches the DB's _userId
+    final otherUsers = List.generate(count, (i) => User(id: 'otherUser$i'));
+    final allUsers = [dbUser, ...otherUsers];
+    final baseTime = DateTime.now();
+
+    const optionA = PollOption(id: 'opt0', text: 'A');
+    const optionB = PollOption(id: 'opt1', text: 'B');
+    final poll = Poll(
+      id: 'poll0',
+      name: 'Pick one',
+      options: const [optionA, optionB],
+      // `createdById` must reference an existing user — `PollDao._pollFromJoinRow`
+      // uses `readTable(users)` (not `readTableOrNull`) on a LEFT JOIN, which
+      // throws if there's no matching row. Pre-existing PollDao quirk; out of
+      // scope for the pagination-pushdown work.
+      createdById: dbUser.id,
+    );
+
+    final messages = List.generate(
+      count,
+      (i) => Message(
+        id: 'msg$i',
+        type: 'regular',
+        user: allUsers[i % allUsers.length],
+        text: 'Hello $i',
+        createdAt: baseTime.add(Duration(seconds: i)),
+        updatedAt: baseTime.add(Duration(seconds: i)),
+        // Every 3rd message (i ≥ 3) quotes the message 2 positions earlier.
+        // The very last quoting message instead points at a non-existent
+        // id, so we cover the "quote target deleted from cache" path.
+        quotedMessageId: (i >= 3 && i % 3 == 0)
+            ? (i == count - 1 ? 'msg-deleted' : 'msg${i - 2}')
+            : null,
+        // Every 5th message attaches to a poll.
+        pollId: (i % 5 == 0) ? 'poll0' : null,
+      ),
+    );
+
+    // Reactions populate both `latestReactions` (any user) and
+    // `ownReactions` (matching the DB user) on top-level messages AND on the
+    // messages that are themselves quote targets, so the parity check
+    // catches divergence in nested hydration too.
+    final reactions = <Reaction>[
+      for (var i = 0; i < count; i++) ...[
+        if (i.isEven)
+          Reaction(
+            type: 'like',
+            messageId: 'msg$i',
+            user: dbUser,
+            createdAt: baseTime.add(Duration(seconds: i)),
+          ),
+        Reaction(
+          type: 'love',
+          messageId: 'msg$i',
+          user: otherUsers[i % otherUsers.length],
+          createdAt: baseTime.add(Duration(seconds: i)),
+        ),
+      ],
+    ];
+
+    // Mixed-user poll votes: an own vote, two other-user votes (one per
+    // option), and an own free-text answer. Exercises every bucket the
+    // poll hydration splits into (latestVotesByOption, latestAnswers,
+    // ownVotesAndAnswers).
+    final pollVotes = [
+      PollVote(
+        id: 'vote-own-a',
+        pollId: poll.id,
+        userId: dbUser.id,
+        user: dbUser,
+        optionId: optionA.id,
+        createdAt: baseTime,
+      ),
+      PollVote(
+        id: 'vote-other-a',
+        pollId: poll.id,
+        userId: otherUsers[0].id,
+        user: otherUsers[0],
+        optionId: optionA.id,
+        createdAt: baseTime.add(const Duration(seconds: 1)),
+      ),
+      PollVote(
+        id: 'vote-other-b',
+        pollId: poll.id,
+        userId: otherUsers[1 % otherUsers.length].id,
+        user: otherUsers[1 % otherUsers.length],
+        optionId: optionB.id,
+        createdAt: baseTime.add(const Duration(seconds: 2)),
+      ),
+      PollVote(
+        id: 'answer-own',
+        pollId: poll.id,
+        userId: dbUser.id,
+        user: dbUser,
+        answerText: 'because reasons',
+        createdAt: baseTime.add(const Duration(seconds: 3)),
+      ),
+    ];
+
+    // Thread draft attached to `msg1` (a top-level message that lives in
+    // the main `messages` table — required because `DraftMessages.parentId`
+    // FK-references `Messages.id`). This is the only way `fetchDraft=true`
+    // actually attaches a draft to any row.
+    final threadDraft = Draft(
+      channelCid: cid,
+      parentId: 'msg1',
+      createdAt: baseTime,
+      message: DraftMessage(
+        id: 'thread-draft-msg1',
+        text: 'Unsent reply to msg1',
+        parentId: 'msg1',
+      ),
+    );
+
+    await database.userDao.updateUsers(allUsers);
+    await database.channelDao.updateChannels(channels);
+    await database.pollDao.updatePolls([poll]);
+    await messageDao.updateMessages(cid, messages);
+    await database.reactionDao.updateReactions(reactions);
+    await database.pollVoteDao.updatePollVotes(pollVotes);
+    await database.draftMessageDao.updateDraftMessages([threadDraft]);
+  }
+
+  // Builds a structural fingerprint of a Message that captures every field
+  // the two implementations should agree on after hydration. Used in place
+  // of `==` because `Reaction` (and friends) don't extend `Equatable`, so
+  // identity-based equality on nested lists fails for instances built by
+  // separate calls.
+  //
+  // Goes one level into the quoted message so divergence in nested
+  // hydration (reactions/poll on the quote) surfaces too. Also fingerprints
+  // poll vote *counts* per bucket — a regression that, say, leaked
+  // other-user votes into `ownVotesAndAnswers` would change the counts.
+  String fingerprintMessage(Message m) {
+    String reactionFp(Reaction r) => '${r.type}@${r.user?.id ?? "-"}';
+    String reactionListFp(List<Reaction>? rs) =>
+        '[${(rs ?? const []).map(reactionFp).join(",")}]';
+    String pollFp(Poll? p) {
+      if (p == null) return '-';
+      final voteCounts = {
+        for (final entry in p.latestVotesByOption.entries)
+          entry.key: entry.value.length,
+      };
+      return [
+        p.id,
+        'answers=${p.latestAnswers.length}',
+        'own=${p.ownVotesAndAnswers.length}',
+        'votes=$voteCounts',
+      ].join(';');
+    }
+
+    String quotedFp(Message? q) {
+      if (q == null) return '-';
+      return [
+        q.id,
+        'latest=${reactionListFp(q.latestReactions)}',
+        'own=${reactionListFp(q.ownReactions)}',
+        'poll=${pollFp(q.poll)}',
+      ].join(';');
+    }
+
+    String draftFp(Draft? d) {
+      if (d == null) return '-';
+      return '${d.message.id}@parent=${d.parentId ?? "-"}';
+    }
+
+    return [
+      'id=${m.id}',
+      'text=${m.text ?? ""}',
+      'user=${m.user?.id ?? "-"}',
+      'createdAt=${m.createdAt.toUtc().toIso8601String()}',
+      'latest=${reactionListFp(m.latestReactions)}',
+      'own=${reactionListFp(m.ownReactions)}',
+      'quoted=${quotedFp(m.quotedMessage)}',
+      'poll=${pollFp(m.poll)}',
+      'draft=${draftFp(m.draft)}',
+    ].join(' | ');
+  }
+
+  group('getMessagesByCid: full data parity (legacy vs SQL pushdown)', () {
+    const cid = 'test:Cid';
+    const n = 30;
+
+    Future<void> assertParity(String label, PaginationParams? p) async {
+      await seedRichMessages(cid, n);
+
+      final legacy = await messageDao.getMessagesByCidLegacy(
+        cid,
+        messagePagination: p,
+      );
+      final pushdown = await messageDao.getMessagesByCid(
+        cid,
+        messagePagination: p,
+      );
+
+      expect(
+        pushdown.length,
+        legacy.length,
+        reason: 'list lengths differ for "$label"',
+      );
+      expect(
+        pushdown.map(fingerprintMessage).toList(),
+        equals(legacy.map(fingerprintMessage).toList()),
+        reason: 'message data parity broken for "$label"',
+      );
+    }
+
+    test('no pagination', () => assertParity('no pagination', null));
+
+    test(
+      'limit only',
+      () => assertParity('limit: 10', const PaginationParams(limit: 10)),
+    );
+
+    test(
+      'lessThan + limit',
+      () => assertParity(
+        'lessThan: msg25, limit: 10',
+        const PaginationParams(limit: 10, lessThan: 'msg25'),
+      ),
+    );
+
+    // `greaterThan-only + limit` intentionally diverges from the legacy
+    // Dart-side filter: the SQL pushdown treats it as forward pagination
+    // (cursor exclusive, ASC, first N after the cursor) — see the
+    // `isForwardPagination` branch in `MessageDao.getMessagesByCid`. The
+    // legacy reference keeps the cursor and returns the tail of
+    // `[cursor..end]`. Assert each implementation's contract explicitly
+    // rather than comparing them.
+    test('greaterThan + limit (forward pagination, no legacy parity)',
+        () async {
+      await seedRichMessages(cid, n);
+
+      const params = PaginationParams(limit: 10, greaterThan: 'msg5');
+      final legacy =
+          await messageDao.getMessagesByCidLegacy(cid, messagePagination: params);
+      final pushdown =
+          await messageDao.getMessagesByCid(cid, messagePagination: params);
+
+      // Legacy: keeps the cursor, takes the last 10 of [msg5..msg29] → msg20..msg29.
+      expect(
+        legacy.map((m) => m.id).toList(),
+        equals([for (var i = 20; i < 30; i++) 'msg$i']),
+      );
+      // Pushdown: cursor exclusive, first 10 after the cursor → msg6..msg15.
+      expect(
+        pushdown.map((m) => m.id).toList(),
+        equals([for (var i = 6; i < 16; i++) 'msg$i']),
+      );
+    });
+
+    test(
+      'lessThan + greaterThan + limit',
+      () => assertParity(
+        'lessThan: msg25, greaterThan: msg5, limit: 10',
+        const PaginationParams(
+          limit: 10,
+          lessThan: 'msg25',
+          greaterThan: 'msg5',
+        ),
+      ),
+    );
+
+    test('empty channel', () async {
+      // No seed at all — the legacy path returns `[]` early; the batched path
+      // also short-circuits on empty rows. Locks that contract.
+      const emptyCid = 'test:Empty';
+      await database.channelDao.updateChannels([ChannelModel(cid: emptyCid)]);
+
+      final legacy = await messageDao.getMessagesByCidLegacy(emptyCid);
+      final batched = await messageDao.getMessagesByCid(emptyCid);
+
+      expect(legacy, isEmpty);
+      expect(batched, isEmpty);
+    });
+
+    test('single message', () async {
+      const singleCid = 'test:Single';
+      final dbUser = User(id: 'testUserId');
+      await database.userDao.updateUsers([dbUser]);
+      await database.channelDao.updateChannels([ChannelModel(cid: singleCid)]);
+      await messageDao.updateMessages(singleCid, [
+        Message(
+          id: 'only',
+          type: 'regular',
+          user: dbUser,
+          text: 'solo',
+          createdAt: DateTime.now(),
+        ),
+      ]);
+
+      final legacy = await messageDao.getMessagesByCidLegacy(singleCid);
+      final batched = await messageDao.getMessagesByCid(singleCid);
+
+      expect(batched.length, legacy.length);
+      expect(
+        batched.map(fingerprintMessage).toList(),
+        equals(legacy.map(fingerprintMessage).toList()),
+      );
+    });
+  });
+
+  // The standard `seedRichMessages` helper exercises every hydration path
+  // separately. These extra scenarios put related entities together on the
+  // *same* message and stack quoting depth — they catch interactions that a
+  // one-feature-per-message seed cannot.
+  group('getMessagesByCid: edge-case parity (legacy vs SQL pushdown)', () {
+    Future<void> assertParityForChannel(String cid) async {
+      final legacy = await messageDao.getMessagesByCidLegacy(cid);
+      final batched = await messageDao.getMessagesByCid(cid);
+
+      expect(batched.length, legacy.length);
+      expect(
+        batched.map(fingerprintMessage).toList(),
+        equals(legacy.map(fingerprintMessage).toList()),
+      );
+    }
+
+    test('message with poll AND quote AND thread draft attached', () async {
+      const cid = 'test:Mixed';
+      final dbUser = User(id: 'testUserId');
+      await database.userDao.updateUsers([dbUser]);
+      await database.channelDao.updateChannels([ChannelModel(cid: cid)]);
+      await database.pollDao.updatePolls([
+        Poll(
+          id: 'poll-mixed',
+          name: 'Pick',
+          options: const [
+            PollOption(id: 'a', text: 'A'),
+            PollOption(id: 'b', text: 'B'),
+          ],
+          createdById: dbUser.id,
+        ),
+      ]);
+
+      final baseTime = DateTime.now();
+      await messageDao.updateMessages(cid, [
+        Message(
+          id: 'quoted',
+          type: 'regular',
+          user: dbUser,
+          text: 'first',
+          createdAt: baseTime,
+        ),
+        Message(
+          id: 'mixed',
+          type: 'regular',
+          user: dbUser,
+          text: 'all three',
+          createdAt: baseTime.add(const Duration(seconds: 1)),
+          quotedMessageId: 'quoted',
+          pollId: 'poll-mixed',
+        ),
+      ]);
+      await database.reactionDao.updateReactions([
+        Reaction(
+          type: 'like',
+          messageId: 'quoted',
+          user: dbUser,
+          createdAt: baseTime,
+        ),
+      ]);
+      await database.draftMessageDao.updateDraftMessages([
+        Draft(
+          channelCid: cid,
+          parentId: 'mixed',
+          createdAt: baseTime,
+          message: DraftMessage(
+            id: 'thread-draft',
+            text: 'unsent reply',
+            parentId: 'mixed',
+          ),
+        ),
+      ]);
+
+      await assertParityForChannel(cid);
+    });
+
+    test('depth-2 quote chain (A → B → C)', () async {
+      const cid = 'test:Chain';
+      final dbUser = User(id: 'testUserId');
+      await database.userDao.updateUsers([dbUser]);
+      await database.channelDao.updateChannels([ChannelModel(cid: cid)]);
+      final baseTime = DateTime.now();
+      await messageDao.updateMessages(cid, [
+        Message(
+          id: 'C',
+          type: 'regular',
+          user: dbUser,
+          text: 'root',
+          createdAt: baseTime,
+        ),
+        Message(
+          id: 'B',
+          type: 'regular',
+          user: dbUser,
+          text: 'mid',
+          createdAt: baseTime.add(const Duration(seconds: 1)),
+          quotedMessageId: 'C',
+        ),
+        Message(
+          id: 'A',
+          type: 'regular',
+          user: dbUser,
+          text: 'top',
+          createdAt: baseTime.add(const Duration(seconds: 2)),
+          quotedMessageId: 'B',
+        ),
+      ]);
+      await assertParityForChannel(cid);
+    });
+
+    test('three messages quoting the same target', () async {
+      // Deduped quotes: the batched path collects unique quotedIds into one
+      // SELECT-IN. The fingerprint of the quoted message must be identical
+      // across all three quoting rows.
+      const cid = 'test:DedupedQuotes';
+      final dbUser = User(id: 'testUserId');
+      await database.userDao.updateUsers([dbUser]);
+      await database.channelDao.updateChannels([ChannelModel(cid: cid)]);
+      final baseTime = DateTime.now();
+      await messageDao.updateMessages(cid, [
+        Message(
+          id: 'target',
+          type: 'regular',
+          user: dbUser,
+          text: 'quoted everywhere',
+          createdAt: baseTime,
+        ),
+        for (var i = 0; i < 3; i++)
+          Message(
+            id: 'q$i',
+            type: 'regular',
+            user: dbUser,
+            text: 'quoting $i',
+            createdAt: baseTime.add(Duration(seconds: i + 1)),
+            quotedMessageId: 'target',
+          ),
+      ]);
+      await database.reactionDao.updateReactions([
+        Reaction(
+          type: 'like',
+          messageId: 'target',
+          user: dbUser,
+          createdAt: baseTime,
+        ),
+      ]);
+
+      await assertParityForChannel(cid);
+    });
+  });
+}
Index: packages/stream_chat_persistence/test/src/benchmark/get_messages_by_cid_hydration_bench_test.dart
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/packages/stream_chat_persistence/test/src/benchmark/get_messages_by_cid_hydration_bench_test.dart b/packages/stream_chat_persistence/test/src/benchmark/get_messages_by_cid_hydration_bench_test.dart
new file mode 100644
--- /dev/null	(date 1779706330758)
+++ b/packages/stream_chat_persistence/test/src/benchmark/get_messages_by_cid_hydration_bench_test.dart	(date 1779706330758)
@@ -0,0 +1,420 @@
+import 'package:drift/drift.dart';
+import 'package:drift/native.dart';
+import 'package:flutter_test/flutter_test.dart';
+import 'package:stream_chat/stream_chat.dart';
+import 'package:stream_chat_persistence/src/dao/dao.dart';
+import 'package:stream_chat_persistence/src/db/drift_chat_database.dart';
+
+/// Counts SELECT statements and rows returned through them. Used to
+/// instrument the legacy (pre-pushdown + N+1 hydration) and current
+/// (SQL-pushdown + batched hydration) implementations of `getMessagesByCid`
+/// for head-to-head comparison.
+class _CountingInterceptor extends QueryInterceptor {
+  int selectCount = 0;
+  int rowsReturned = 0;
+
+  void reset() {
+    selectCount = 0;
+    rowsReturned = 0;
+  }
+
+  @override
+  Future<List<Map<String, Object?>>> runSelect(
+    QueryExecutor executor,
+    String statement,
+    List<Object?> args,
+  ) async {
+    final result = await executor.runSelect(statement, args);
+    selectCount++;
+    rowsReturned += result.length;
+    return result;
+  }
+}
+
+typedef _BenchResult = ({
+  List<String> messageIds,
+  List<String> fingerprints,
+  int selectCount,
+  int rowsReturned,
+  int medianMicros,
+});
+
+// Goes one level into the quoted message and into the poll's vote buckets
+// so divergence in nested hydration (reactions/votes/answers on the quote
+// or poll) surfaces — bench parity is otherwise just "same ids in same
+// order" and would miss a regression that left, say, ownReactions empty.
+String _fingerprintMessage(Message m) {
+  String reactionFp(Reaction r) => '${r.type}@${r.user?.id ?? "-"}';
+  String reactionListFp(List<Reaction>? rs) =>
+      '[${(rs ?? const []).map(reactionFp).join(",")}]';
+  String pollFp(Poll? p) {
+    if (p == null) return '-';
+    final voteCounts = {
+      for (final entry in p.latestVotesByOption.entries)
+        entry.key: entry.value.length,
+    };
+    return [
+      p.id,
+      'answers=${p.latestAnswers.length}',
+      'own=${p.ownVotesAndAnswers.length}',
+      'votes=$voteCounts',
+    ].join(';');
+  }
+
+  String quotedFp(Message? q) {
+    if (q == null) return '-';
+    return [
+      q.id,
+      'latest=${reactionListFp(q.latestReactions)}',
+      'own=${reactionListFp(q.ownReactions)}',
+      'poll=${pollFp(q.poll)}',
+    ].join(';');
+  }
+
+  String draftFp(Draft? d) {
+    if (d == null) return '-';
+    return '${d.message.id}@parent=${d.parentId ?? "-"}';
+  }
+
+  return [
+    'id=${m.id}',
+    'user=${m.user?.id ?? "-"}',
+    'latest=${reactionListFp(m.latestReactions)}',
+    'own=${reactionListFp(m.ownReactions)}',
+    'quoted=${quotedFp(m.quotedMessage)}',
+    'poll=${pollFp(m.poll)}',
+    'draft=${draftFp(m.draft)}',
+  ].join(' | ');
+}
+
+void main() {
+  late DriftChatDatabase database;
+  late MessageDao messageDao;
+  late _CountingInterceptor interceptor;
+
+  setUp(() {
+    interceptor = _CountingInterceptor();
+    final executor = NativeDatabase.memory().interceptWith(interceptor);
+    database = DriftChatDatabase('testUserId', executor);
+    messageDao = database.messageDao;
+  });
+
+  tearDown(() async {
+    await database.disconnect();
+  });
+
+  // Seeds messages with reactions (own + other-user) on top-level AND on
+  // quoted-target messages, every-3rd quote chain (with the last quote
+  // pointing at a deleted target), every-5th poll with a mixed-user vote
+  // set + an own answer, and a thread draft attached to a real parent. The
+  // bench needs the same rich shape as the parity test or the SELECT-count
+  // improvement is invisible (and a hydration regression would be invisible
+  // too).
+  Future<void> seedRichMessages(String cid, int count) async {
+    final channels = [ChannelModel(cid: cid)];
+    final dbUser = User(id: 'testUserId');
+    final otherUsers = List.generate(count, (i) => User(id: 'otherUser$i'));
+    final allUsers = [dbUser, ...otherUsers];
+    final baseTime = DateTime.now();
+
+    const optionA = PollOption(id: 'opt0', text: 'A');
+    const optionB = PollOption(id: 'opt1', text: 'B');
+    final poll = Poll(
+      id: 'poll0',
+      name: 'Pick one',
+      options: const [optionA, optionB],
+      createdById: dbUser.id,
+    );
+
+    final messages = List.generate(
+      count,
+      (i) => Message(
+        id: 'msg$i',
+        type: 'regular',
+        user: allUsers[i % allUsers.length],
+        text: 'Hello $i',
+        createdAt: baseTime.add(Duration(seconds: i)),
+        updatedAt: baseTime.add(Duration(seconds: i)),
+        quotedMessageId: (i >= 3 && i % 3 == 0)
+            ? (i == count - 1 ? 'msg-deleted' : 'msg${i - 2}')
+            : null,
+        pollId: (i % 5 == 0) ? 'poll0' : null,
+      ),
+    );
+
+    final reactions = <Reaction>[
+      for (var i = 0; i < count; i++) ...[
+        if (i.isEven)
+          Reaction(
+            type: 'like',
+            messageId: 'msg$i',
+            user: dbUser,
+            createdAt: baseTime.add(Duration(seconds: i)),
+          ),
+        Reaction(
+          type: 'love',
+          messageId: 'msg$i',
+          user: otherUsers[i % otherUsers.length],
+          createdAt: baseTime.add(Duration(seconds: i)),
+        ),
+      ],
+    ];
+
+    final pollVotes = [
+      PollVote(
+        id: 'vote-own-a',
+        pollId: poll.id,
+        userId: dbUser.id,
+        user: dbUser,
+        optionId: optionA.id,
+        createdAt: baseTime,
+      ),
+      PollVote(
+        id: 'vote-other-a',
+        pollId: poll.id,
+        userId: otherUsers[0].id,
+        user: otherUsers[0],
+        optionId: optionA.id,
+        createdAt: baseTime.add(const Duration(seconds: 1)),
+      ),
+      PollVote(
+        id: 'vote-other-b',
+        pollId: poll.id,
+        userId: otherUsers[1 % otherUsers.length].id,
+        user: otherUsers[1 % otherUsers.length],
+        optionId: optionB.id,
+        createdAt: baseTime.add(const Duration(seconds: 2)),
+      ),
+      PollVote(
+        id: 'answer-own',
+        pollId: poll.id,
+        userId: dbUser.id,
+        user: dbUser,
+        answerText: 'because reasons',
+        createdAt: baseTime.add(const Duration(seconds: 3)),
+      ),
+    ];
+
+    final threadDraft = Draft(
+      channelCid: cid,
+      parentId: 'msg1',
+      createdAt: baseTime,
+      message: DraftMessage(
+        id: 'thread-draft-msg1',
+        text: 'Unsent reply to msg1',
+        parentId: 'msg1',
+      ),
+    );
+
+    await database.userDao.updateUsers(allUsers);
+    await database.channelDao.updateChannels(channels);
+    await database.pollDao.updatePolls([poll]);
+    await messageDao.updateMessages(cid, messages);
+    await database.reactionDao.updateReactions(reactions);
+    await database.pollVoteDao.updatePollVotes(pollVotes);
+    await database.draftMessageDao.updateDraftMessages([threadDraft]);
+  }
+
+  Future<_BenchResult> runBench(
+    Future<List<Message>> Function() fn, {
+    int warmups = 2,
+    int iterations = 10,
+  }) async {
+    for (var i = 0; i < warmups; i++) {
+      await fn();
+    }
+
+    final timings = <int>[];
+    List<Message>? lastResult;
+    for (var i = 0; i < iterations; i++) {
+      interceptor.reset();
+      final sw = Stopwatch()..start();
+      lastResult = await fn();
+      sw.stop();
+      timings.add(sw.elapsedMicroseconds);
+    }
+    timings.sort();
+    final median = timings[timings.length ~/ 2];
+
+    return (
+      messageIds: lastResult!.map((m) => m.id).toList(),
+      fingerprints: lastResult.map(_fingerprintMessage).toList(),
+      selectCount: interceptor.selectCount,
+      rowsReturned: interceptor.rowsReturned,
+      medianMicros: median,
+    );
+  }
+
+  void printTable(
+    String scenario,
+    int n,
+    _BenchResult legacy,
+    _BenchResult batched,
+  ) {
+    String improvement(int oldV, int newV) =>
+        newV == 0 ? '—' : '${(oldV / newV).toStringAsFixed(2)}×';
+
+    String pad(Object v, [int width = 10]) => v.toString().padRight(width);
+
+    // Full-data parity, not just id-order: the fingerprint catches a
+    // regression in any hydrated field (reactions, polls, votes, quoted
+    // sub-fields, draft).
+    final parity =
+        batched.fingerprints.toString() == legacy.fingerprints.toString()
+            ? 'OK'
+            : 'MISMATCH';
+
+    // ignore: avoid_print
+    print('''
+
+Scenario: $scenario  (N=$n, P=${batched.messageIds.length})
+                 ${pad('OLD')}${pad('NEW')}Improvement (old/new)
+SELECT calls     ${pad(legacy.selectCount)}${pad(batched.selectCount)}${improvement(legacy.selectCount, batched.selectCount)}
+Rows fetched     ${pad(legacy.rowsReturned)}${pad(batched.rowsReturned)}${improvement(legacy.rowsReturned, batched.rowsReturned)}
+Time (us, med)   ${pad(legacy.medianMicros)}${pad(batched.medianMicros)}${improvement(legacy.medianMicros, batched.medianMicros)}
+Result parity    $parity
+''');
+  }
+
+  group('getMessagesByCid hydration: legacy vs batched', () {
+    const cid = 'test:Cid';
+    const n = 100;
+
+    Future<void> runScenario(String label, PaginationParams? p) async {
+      await seedRichMessages(cid, n);
+
+      final legacy = await runBench(
+        () => messageDao.getMessagesByCidLegacy(cid, messagePagination: p),
+      );
+      final batched = await runBench(
+        () => messageDao.getMessagesByCid(cid, messagePagination: p),
+      );
+
+      expect(
+        batched.messageIds,
+        equals(legacy.messageIds),
+        reason: 'id-order parity broken for "$label"',
+      );
+      expect(
+        batched.fingerprints,
+        equals(legacy.fingerprints),
+        reason: 'full-data parity broken for "$label" — a hydrated field '
+            '(reactions / poll votes / quoted sub-fields / draft) differs',
+      );
+      // Strict less-than: with rich related data the batched path must
+      // collapse the per-row reaction/poll/draft fetches into a handful of
+      // batched ones. Anything ≥ legacy would mean the refactor regressed.
+      expect(
+        batched.selectCount,
+        lessThan(legacy.selectCount),
+        reason: 'batched hydration must issue fewer SELECTs than legacy '
+            'for "$label"',
+      );
+      expect(
+        batched.rowsReturned,
+        lessThanOrEqualTo(legacy.rowsReturned),
+        reason: 'batched hydration must not materialize more rows than '
+            'legacy for "$label"',
+      );
+
+      printTable(label, n, legacy, batched);
+    }
+
+    test('no pagination', () => runScenario('no pagination', null));
+
+    test(
+      'limit: 25',
+      () => runScenario('limit: 25', const PaginationParams(limit: 25)),
+    );
+
+    test(
+      'lessThan + limit (scroll up)',
+      () => runScenario(
+        'lessThan: msg80, limit: 25',
+        const PaginationParams(limit: 25, lessThan: 'msg80'),
+      ),
+    );
+
+    // `greaterThan-only + limit` intentionally diverges from the legacy
+    // Dart-side filter: the SQL pushdown treats it as forward pagination
+    // (cursor exclusive, ASC, first N after the cursor). The batched
+    // hydration SELECT-count win is orthogonal to the pagination-semantics
+    // change, so we still assert it; for ids we compare against the
+    // explicit expected window instead of against legacy.
+    test('greaterThan + limit (scroll down, forward pagination)', () async {
+      await seedRichMessages(cid, n);
+
+      const params = PaginationParams(limit: 25, greaterThan: 'msg10');
+      final legacy = await runBench(
+        () => messageDao.getMessagesByCidLegacy(cid, messagePagination: params),
+      );
+      final batched = await runBench(
+        () => messageDao.getMessagesByCid(cid, messagePagination: params),
+      );
+
+      // Legacy: keeps the cursor, takes the last 25 of [msg10..msg99]
+      // → msg75..msg99.
+      expect(
+        legacy.messageIds,
+        equals([for (var i = 75; i < 100; i++) 'msg$i']),
+        reason: 'legacy must keep the cursor and return the tail of '
+            '[cursor..end]',
+      );
+      // Pushdown: forward pagination, first 25 after the cursor → msg11..msg35.
+      expect(
+        batched.messageIds,
+        equals([for (var i = 11; i < 36; i++) 'msg$i']),
+        reason: 'forward pagination must return the first 25 messages after '
+            'the cursor in ASC order',
+      );
+      expect(batched.selectCount, lessThan(legacy.selectCount));
+      expect(batched.rowsReturned, lessThanOrEqualTo(legacy.rowsReturned));
+
+      printTable(
+          'greaterThan: msg10, limit: 25 (forward)', n, legacy, batched);
+    });
+
+    test(
+      'lessThan + greaterThan + limit',
+      () => runScenario(
+        'lessThan: msg80, greaterThan: msg10, limit: 25',
+        const PaginationParams(
+          limit: 25,
+          lessThan: 'msg80',
+          greaterThan: 'msg10',
+        ),
+      ),
+    );
+  });
+
+  group('getMessagesByCid hydration: stress', () {
+    const cid = 'test:Cid';
+
+    test(
+      '500 messages, limit: 25',
+      () async {
+        await seedRichMessages(cid, 500);
+
+        final legacy = await runBench(
+          () => messageDao.getMessagesByCidLegacy(
+            cid,
+            messagePagination: const PaginationParams(limit: 25),
+          ),
+        );
+        final batched = await runBench(
+          () => messageDao.getMessagesByCid(
+            cid,
+            messagePagination: const PaginationParams(limit: 25),
+          ),
+        );
+
+        expect(batched.messageIds, equals(legacy.messageIds));
+        expect(batched.fingerprints, equals(legacy.fingerprints));
+        expect(batched.selectCount, lessThan(legacy.selectCount));
+        expect(batched.rowsReturned, lessThanOrEqualTo(legacy.rowsReturned));
+
+        printTable('stress: 500 messages, limit 25', 500, legacy, batched);
+      },
+    );
+  });
+}
Index: packages/stream_chat_persistence/lib/src/dao/message_dao.dart
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart
--- a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart	(revision cc443dce0fecb9d52e3749172f4fbc6b02425531)
+++ b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart	(date 1779706338696)
@@ -1,4 +1,7 @@
+import 'dart:math';
+
 import 'package:drift/drift.dart';
+import 'package:flutter/foundation.dart';
 import 'package:stream_chat/stream_chat.dart';
 import 'package:stream_chat_persistence/src/db/drift_chat_database.dart';
 import 'package:stream_chat_persistence/src/entity/messages.dart';
@@ -360,4 +363,106 @@
         .map((row) => row.read(messages.createdAt))
         .getSingleOrNull();
   }
+
+  Future<Message> _messageFromJoinRow(
+      TypedResult rows, {
+        bool fetchDraft = false,
+      }) async {
+    final userEntity = rows.readTableOrNull(_users);
+    final pinnedByEntity = rows.readTableOrNull(_pinnedByUsers);
+    final msgEntity = rows.readTable(messages);
+    final latestReactions = await _db.reactionDao.getReactions(msgEntity.id);
+    final ownReactions = await _db.reactionDao.getReactionsByUserId(
+      msgEntity.id,
+      _db.userId,
+    );
+
+    final quotedMessage = await switch (msgEntity.quotedMessageId) {
+      final id? => getMessageById(id),
+      _ => null,
+    };
+
+    final poll = await switch (msgEntity.pollId) {
+      final id? => _db.pollDao.getPollById(id),
+      _ => null,
+    };
+
+    final draft = await switch (fetchDraft) {
+      true => _db.draftMessageDao.getDraftMessageByCid(
+        msgEntity.channelCid,
+        parentId: msgEntity.id,
+      ),
+      _ => null,
+    };
+
+    return msgEntity.toMessage(
+      user: userEntity?.toUser(),
+      pinnedBy: pinnedByEntity?.toUser(),
+      latestReactions: latestReactions,
+      ownReactions: ownReactions,
+      quotedMessage: quotedMessage,
+      poll: poll,
+      draft: draft,
+    );
+  }
+
+  /// Pre-SQL-pushdown reference implementation of [getMessagesByCid]. Fetches
+  /// every cached message for the channel, hydrates each row, then trims the
+  /// result in Dart. Kept only as the head-to-head baseline for the
+  /// `get_messages_by_cid_bench_test.dart` benchmark — remove once we no
+  /// longer need behavioral parity proof.
+  @visibleForTesting
+  Future<List<Message>> getMessagesByCidLegacy(
+      String cid, {
+        bool fetchDraft = true,
+        PaginationParams? messagePagination,
+      }) async {
+    final query = select(messages).join([
+      leftOuterJoin(_users, messages.userId.equalsExp(_users.id)),
+      leftOuterJoin(
+        _pinnedByUsers,
+        messages.pinnedByUserId.equalsExp(_pinnedByUsers.id),
+      ),
+    ])
+      ..where(messages.channelCid.equals(cid))
+      ..where(messages.parentId.isNull() | messages.showInChannel.equals(true))
+      ..orderBy([OrderingTerm.asc(messages.createdAt)]);
+
+    final result = await query.get();
+    if (result.isEmpty) return [];
+
+    final msgList = await Future.wait(
+      result.map(
+            (row) => _messageFromJoinRow(
+          row,
+          fetchDraft: fetchDraft,
+        ),
+      ),
+    );
+
+    if (msgList.isNotEmpty) {
+      if (messagePagination?.lessThan != null) {
+        final lessThanIndex = msgList.indexWhere(
+              (m) => m.id == messagePagination!.lessThan,
+        );
+        if (lessThanIndex != -1) {
+          msgList.removeRange(lessThanIndex, msgList.length);
+        }
+      }
+      if (messagePagination?.greaterThan != null) {
+        final greaterThanIndex = msgList.indexWhere(
+              (m) => m.id == messagePagination!.greaterThan,
+        );
+        if (greaterThanIndex != -1) {
+          msgList.removeRange(0, greaterThanIndex);
+        }
+      }
+      if (messagePagination?.limit != null) {
+        return msgList
+            .skip(max(0, msgList.length - messagePagination!.limit))
+            .toList();
+      }
+    }
+    return msgList;
+  }
 }

Summary by CodeRabbit

  • New Features

    • Added bulk retrieval methods for messages, reactions, polls, and draft messages for improved batch operations.
  • Bug Fixes

    • Fixed pagination cursor handling to correctly filter messages with inclusive/exclusive cursor support.
    • Corrected forward pagination to return accurate message segments.
  • Performance

    • Optimized database queries through batched operations and reduced database reads during message retrieval.

Review Change Stack

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 25, 2026

📝 Walkthrough

Walkthrough

This PR optimizes the stream_chat_persistence layer by batching related database fetches, moving filtering to SQL level, and adding bulk query APIs. A new chunked utility handles SQLite parameter limits, message hydration shifts to a batched _messagesFromJoinRows pattern, and getMessagesByCid pagination now uses SQL-level cursor filtering instead of post-fetch trimming. Comprehensive tests cover hydration isolation, pagination semantics, and bulk operation behavior.

Changes

Persistence Layer Query Optimization

Layer / File(s) Summary
Query Utility Foundation
packages/stream_chat_persistence/lib/src/db/query_utils.dart, packages/stream_chat_persistence/CHANGELOG.md
Introduces chunked<T> generator to split large lists and avoid SQLite parameter limits in IN (?, ...) queries with default 900-element chunks. CHANGELOG documents persistence optimizations and pagination cursor semantics fixes.
Bulk Query Methods Across DAOs
packages/stream_chat_persistence/lib/src/dao/draft_message_dao.dart, packages/stream_chat_persistence/lib/src/dao/poll_dao.dart, packages/stream_chat_persistence/lib/src/dao/poll_vote_dao.dart, packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart, packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart
Adds getDraftMessagesByParentIds for batch draft lookup by parent IDs per channel; getPollsByIds for bulk poll hydration with votes/answers; getPollVotesForPolls for grouped poll votes; getReactionsForMessages and getReactionsForMessagesByUserId in both ReactionDao and PinnedMessageReactionDao using shared _selectReactions helper that centralizes Drift query with chunked ID processing.
Batched Message Hydration Refactoring
packages/stream_chat_persistence/lib/src/dao/message_dao.dart, packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart
Replaces per-message hydration with new _messagesFromJoinRows pattern: collects message/quoted/poll/draft IDs from join rows, batch-fetches reactions/polls/drafts via Future.wait, builds Message objects via _buildMessage helper using prefetched maps. Updates getMessageById, getThreadMessages, getThreadMessagesByParentId to use batched hydration.
Message Pagination with SQL-Level Cursors
packages/stream_chat_persistence/lib/src/dao/message_dao.dart, packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart
Overhauled getMessagesByCid to compute cursor-based createdAt cutoff values via _lookupMessageCreatedAt, determine pagination direction (forward/backward), apply SQL createdAt comparisons (isSmallerThan/OrEqual, isBiggerThan/OrEqual) with deterministic orderBy (by createdAt then id), and reverse rows when needed before hydration. Removes dart:math import; pagination now applies limit at SQL level.
Hydration Test Coverage
packages/stream_chat_persistence/test/src/dao/message_dao_test.dart, packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart, packages/stream_chat_persistence/test/src/dao/draft_message_dao_test.dart
Adds comprehensive hydration tests validating: latest vs own reactions with per-row isolation, poll data with mixed user votes/answers, conditional thread draft attachment via fetchDraft flag, quoted message hydration including depth-2 quote chains, and hydration correctness across pagination boundaries. Uses shared baseTime for deterministic timestamps.
Pagination Test Coverage
packages/stream_chat_persistence/test/src/dao/message_dao_test.dart
Updates pagination tests with exclusive cursor semantics (greaterThan replaces greaterThanOrEqual), deterministic timestamps (baseTime + offset), and expanded scenarios covering limit-only, no-op cursors, default limits, inclusive/exclusive pivot variants, and expected first/last message ID validation.
Bulk Query API Test Coverage
packages/stream_chat_persistence/test/src/dao/draft_message_dao_test.dart, packages/stream_chat_persistence/test/src/dao/poll_dao_test.dart, packages/stream_chat_persistence/test/src/dao/poll_vote_dao_test.dart, packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart, packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart
Tests for new bulk methods including: getDraftMessagesByParentIds with channel scoping; getReactionsForMessages with >900 message IDs and dense result maps; getPollsByIds with vote/answer grouping and null-mapped unknown IDs; getPollVotesForPolls with mixed empty and unknown polls; and reaction isolation tests ensuring no cross-ID leakage.

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly Related PRs

  • GetStream/stream-chat-flutter#2320: Related through draft-message fetching and hydration refactoring in MessageDao/PinnedMessageDao; main PR adds batched draft lookup and reworked hydration while retrieved PR fixes draft association and persistence behaviors in the same DAOs.

Suggested Reviewers

  • xsahil03x

🐰 Hop, hop! A persistence tale unfolds,
Where batches bloom and cursors fold,
One query now, where once were ten,
The database smiles at speed again! ✨📊

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main change: performance optimization reducing database message reads during pagination through batched hydration instead of per-message lookups, clearly marked as part 2 of the effort.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/FLU-485_optimize_read_message_from_db_part2

Warning

Review ran into problems

🔥 Problems

Git: Failed to clone repository. Please run the @coderabbitai full review command to re-trigger a full review. If the issue persists, set path_filters to include or exclude specific files.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@VelikovPetar VelikovPetar changed the title Feature/flu 485 optimize read message from db part2 perf(llc): Reduce the number of read message per channel from DB when paginating (part 2) May 25, 2026
@VelikovPetar VelikovPetar marked this pull request as ready for review May 25, 2026 11:00
@codecov
Copy link
Copy Markdown

codecov Bot commented May 25, 2026

Codecov Report

❌ Patch coverage is 99.37888% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 65.62%. Comparing base (508c019) to head (cc443dc).

Files with missing lines Patch % Lines
...at_persistence/lib/src/dao/pinned_message_dao.dart 97.64% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2681      +/-   ##
==========================================
+ Coverage   65.33%   65.62%   +0.29%     
==========================================
  Files         423      424       +1     
  Lines       26646    26844     +198     
==========================================
+ Hits        17408    17616     +208     
+ Misses       9238     9228      -10     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/stream_chat_persistence/lib/src/dao/message_dao.dart`:
- Around line 306-317: The cursor predicates only filter by messages.createdAt,
which can cause duplicates/misses when createdAt ties exist; update each branch
(lessThanCutoff, lessThanOrEqualCutoff, greaterThanCutoff,
greaterThanOrEqualCutoff) to add a secondary predicate on messages.id matching
the ordering key so the pair (createdAt, id) is used (e.g., for lessThan use
createdAt < t OR (createdAt == t AND id < cursorId)); modify the query.where
calls in the blocks referencing messages.createdAt and messages.id to apply the
combined comparisons for all four operators, and apply the same change to the
analogous blocks around the second occurrence (lines referenced in the comment:
the other block at 350-362).
- Around line 40-43: The recursive quoted-message hydration in
_messagesFromJoinRows lacks a visited-set, so protect against cycles by adding a
visited ID set parameter (e.g., Set<String> visited or Set<String>
visitedMessageIds) defaulting to empty, check the current message's id before
hydrating its quote(s) and skip recursion if already visited, and pass the
updated set when calling the same hydration logic recursively; apply the same
visited-set guard to the other recursive hydration block referenced around lines
88-106 so both recursion entry points use the visited set to prevent infinite
recursion.

In `@packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart`:
- Around line 39-42: The recursive quoted-message hydration in
_messagesFromJoinRows (and the similar logic around lines 88-106) lacks cycle
protection; add a visited-id guard: introduce a Set<String> visited (or Set<int>
depending on message id type) parameter with a default empty set, add the
current message id to visited before recursing, and before resolving a quoted
message check if its id is already in visited; if it is, stop recursion (e.g.,
set quotedMessage to null or a shallow reference) to break the cycle. Pass the
visited set through any recursive calls so cycles are detected across the entire
resolution chain and avoid infinite recursion.

In `@packages/stream_chat_persistence/lib/src/db/query_utils.dart`:
- Around line 14-17: The public function chunked<T>(List<T> input, [int size =
900]) can hang or misbehave if size <= 0; add an upfront argument validation in
chunked to guard against non-positive sizes (e.g. if (size <= 0) throw
ArgumentError.value(size, 'size', 'must be > 0')) so the for-loop using i +=
size cannot loop infinitely or produce invalid sublists; keep the check at the
top of chunked before the for-loop and reference the existing parameters input
and size.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 3db24b32-5504-4c67-995b-722956c7187b

📥 Commits

Reviewing files that changed from the base of the PR and between 508c019 and cc443dc.

📒 Files selected for processing (16)
  • packages/stream_chat_persistence/CHANGELOG.md
  • packages/stream_chat_persistence/lib/src/dao/draft_message_dao.dart
  • packages/stream_chat_persistence/lib/src/dao/message_dao.dart
  • packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart
  • packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart
  • packages/stream_chat_persistence/lib/src/dao/poll_dao.dart
  • packages/stream_chat_persistence/lib/src/dao/poll_vote_dao.dart
  • packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart
  • packages/stream_chat_persistence/lib/src/db/query_utils.dart
  • packages/stream_chat_persistence/test/src/dao/draft_message_dao_test.dart
  • packages/stream_chat_persistence/test/src/dao/message_dao_test.dart
  • packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart
  • packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart
  • packages/stream_chat_persistence/test/src/dao/poll_dao_test.dart
  • packages/stream_chat_persistence/test/src/dao/poll_vote_dao_test.dart
  • packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart

Comment on lines +40 to 43
Future<List<Message>> _messagesFromJoinRows(
List<TypedResult> rows, {
bool fetchDraft = false,
}) async {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Guard recursive quote hydration against cyclic quote graphs.

_messagesFromJoinRows recursively hydrates quoted messages without a visited-set guard. A cycle like A -> B -> A will recurse until stack overflow.

💡 Suggested fix
 Future<List<Message>> _messagesFromJoinRows(
   List<TypedResult> rows, {
   bool fetchDraft = false,
+  Set<String>? _visitedQuoteIds,
 }) async {
   if (rows.isEmpty) return const [];
+  final visited = _visitedQuoteIds ?? <String>{};

   final messageIds = <String>[];
   final quotedIds = <String>[];
@@
   for (final row in rows) {
     final msg = row.readTable(messages);
     messageIds.add(msg.id);
@@
   }
+  visited.addAll(messageIds);
@@
-  if (quotedIds.isNotEmpty) {
+  final nextQuotedIds = quotedIds.where((id) => !visited.contains(id)).toList();
+  if (nextQuotedIds.isNotEmpty) {
     final quoteRows = await (select(messages).join([
@@
-          ..where(messages.id.isIn(quotedIds)))
+          ..where(messages.id.isIn(nextQuotedIds)))
         .get();
     final quotedMessages = await _messagesFromJoinRows(
       quoteRows,
       fetchDraft: true,
+      _visitedQuoteIds: visited,
     );

Also applies to: 88-106

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/stream_chat_persistence/lib/src/dao/message_dao.dart` around lines
40 - 43, The recursive quoted-message hydration in _messagesFromJoinRows lacks a
visited-set, so protect against cycles by adding a visited ID set parameter
(e.g., Set<String> visited or Set<String> visitedMessageIds) defaulting to
empty, check the current message's id before hydrating its quote(s) and skip
recursion if already visited, and pass the updated set when calling the same
hydration logic recursively; apply the same visited-set guard to the other
recursive hydration block referenced around lines 88-106 so both recursion entry
points use the visited set to prevent infinite recursion.

Comment on lines +306 to +317
if (lessThanCutoff case final t?) {
query.where(messages.createdAt.isSmallerThanValue(t));
}
if (lessThanOrEqualCutoff case final t?) {
query.where(messages.createdAt.isSmallerOrEqualValue(t));
}
if (greaterThanCutoff case final t?) {
query.where(messages.createdAt.isBiggerThanValue(t));
}
if (greaterThanOrEqualCutoff case final t?) {
query.where(messages.createdAt.isBiggerOrEqualValue(t));
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Use (createdAt, id) cursor predicates to avoid duplicate/missing rows on timestamp ties.

Current cursor filtering only checks createdAt, but ordering also uses id. When multiple messages share the same createdAt, pagination can skip or repeat records across pages.

💡 Suggested fix
- Future<DateTime?> _lookupMessageCreatedAt(String id) {
+ Future<(DateTime, String)?> _lookupMessageCursor(String id) {
   return (selectOnly(messages)
-        ..addColumns([messages.createdAt])
+        ..addColumns([messages.createdAt, messages.id])
         ..where(messages.id.equals(id))
         ..where(
           messages.parentId.isNull() | messages.showInChannel.equals(true),
         ))
-      .map((row) => row.read(messages.createdAt))
+      .map((row) => (
+            row.read(messages.createdAt)!,
+            row.read(messages.id)!,
+          ))
       .getSingleOrNull();
 }
- if (lessThanCutoff case final t?) {
-   query.where(messages.createdAt.isSmallerThanValue(t));
+ if (lessThanCursor case final (t, id)?) {
+   query.where(
+     messages.createdAt.isSmallerThanValue(t) |
+         (messages.createdAt.equals(t) & messages.id.isSmallerThanValue(id)),
+   );
 }

Apply analogous predicates for <=, >, >=.

Also applies to: 350-362

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/stream_chat_persistence/lib/src/dao/message_dao.dart` around lines
306 - 317, The cursor predicates only filter by messages.createdAt, which can
cause duplicates/misses when createdAt ties exist; update each branch
(lessThanCutoff, lessThanOrEqualCutoff, greaterThanCutoff,
greaterThanOrEqualCutoff) to add a secondary predicate on messages.id matching
the ordering key so the pair (createdAt, id) is used (e.g., for lessThan use
createdAt < t OR (createdAt == t AND id < cursorId)); modify the query.where
calls in the blocks referencing messages.createdAt and messages.id to apply the
combined comparisons for all four operators, and apply the same change to the
analogous blocks around the second occurrence (lines referenced in the comment:
the other block at 350-362).

Comment on lines +39 to 42
Future<List<Message>> _messagesFromJoinRows(
List<TypedResult> rows, {
bool fetchDraft = false,
}) async {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Add cycle protection to recursive quoted-message hydration.

_messagesFromJoinRows recursively resolves quotes with no visited guard. Cyclic quote references can lead to infinite recursion and crash.

💡 Suggested fix
 Future<List<Message>> _messagesFromJoinRows(
   List<TypedResult> rows, {
   bool fetchDraft = false,
+  Set<String>? _visitedQuoteIds,
 }) async {
   if (rows.isEmpty) return const [];
+  final visited = _visitedQuoteIds ?? <String>{};
@@
   for (final row in rows) {
     final msg = row.readTable(pinnedMessages);
     messageIds.add(msg.id);
@@
   }
+  visited.addAll(messageIds);
@@
-  if (quotedIds.isNotEmpty) {
+  final nextQuotedIds = quotedIds.where((id) => !visited.contains(id)).toList();
+  if (nextQuotedIds.isNotEmpty) {
@@
-          ..where(pinnedMessages.id.isIn(quotedIds)))
+          ..where(pinnedMessages.id.isIn(nextQuotedIds)))
         .get();
     final quotedMessages = await _messagesFromJoinRows(
       quoteRows,
       fetchDraft: true,
+      _visitedQuoteIds: visited,
     );

Also applies to: 88-106

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart` around
lines 39 - 42, The recursive quoted-message hydration in _messagesFromJoinRows
(and the similar logic around lines 88-106) lacks cycle protection; add a
visited-id guard: introduce a Set<String> visited (or Set<int> depending on
message id type) parameter with a default empty set, add the current message id
to visited before recursing, and before resolving a quoted message check if its
id is already in visited; if it is, stop recursion (e.g., set quotedMessage to
null or a shallow reference) to break the cycle. Pass the visited set through
any recursive calls so cycles are detected across the entire resolution chain
and avoid infinite recursion.

Comment on lines +14 to +17
Iterable<List<T>> chunked<T>(List<T> input, [int size = 900]) sync* {
for (var i = 0; i < input.length; i += size) {
yield input.sublist(i, math.min(i + size, input.length));
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Guard size to prevent infinite loops on invalid input.

chunked is public, and size <= 0 makes the loop at Line 15 non-terminating or invalid. Add an argument check up front.

Suggested fix
 Iterable<List<T>> chunked<T>(List<T> input, [int size = 900]) sync* {
+  if (size <= 0) {
+    throw ArgumentError.value(size, 'size', 'must be greater than 0');
+  }
   for (var i = 0; i < input.length; i += size) {
     yield input.sublist(i, math.min(i + size, input.length));
   }
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/stream_chat_persistence/lib/src/db/query_utils.dart` around lines 14
- 17, The public function chunked<T>(List<T> input, [int size = 900]) can hang
or misbehave if size <= 0; add an upfront argument validation in chunked to
guard against non-positive sizes (e.g. if (size <= 0) throw
ArgumentError.value(size, 'size', 'must be > 0')) so the for-loop using i +=
size cannot loop infinitely or produce invalid sublists; keep the check at the
top of chunked before the for-loop and reference the existing parameters input
and size.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant