From 70a55ef1b09676c0ad16aed06490067fb8d2c7a7 Mon Sep 17 00:00:00 2001 From: gydeng Date: Sat, 5 Jul 2025 06:20:34 +0000 Subject: [PATCH 01/14] [improve] modify the negativeACK structure to reduce memory overhead --- CMakeLists.txt | 4 ++++ lib/NegativeAcksTracker.cc | 27 +++++++++++++++++++++------ lib/NegativeAcksTracker.h | 5 ++++- vcpkg.json | 3 +++ 4 files changed, 32 insertions(+), 7 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index d9af806f..3c043557 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -124,6 +124,7 @@ if (INTEGRATE_VCPKG) find_package(protobuf CONFIG REQUIRED) find_package(zstd CONFIG REQUIRED) find_package(Snappy CONFIG REQUIRED) + find_package(roaring CONFIG REQUIRED) set(COMMON_LIBS CURL::libcurl ZLIB::ZLIB OpenSSL::SSL @@ -131,6 +132,9 @@ if (INTEGRATE_VCPKG) protobuf::libprotobuf $,zstd::libzstd_shared,zstd::libzstd_static> Snappy::snappy + roaring::roaring + roaring::roaring-headers + roaring::roaring-headers-cpp ) if (USE_ASIO) find_package(asio CONFIG REQUIRED) diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc index 7116b1c5..21acf4c2 100644 --- a/lib/NegativeAcksTracker.cc +++ b/lib/NegativeAcksTracker.cc @@ -19,14 +19,20 @@ #include "NegativeAcksTracker.h" +#include + #include #include +#include #include "ClientImpl.h" #include "ConsumerImpl.h" #include "ExecutorService.h" #include "LogUtils.h" #include "MessageIdUtil.h" +#include "pulsar/MessageBuilder.h" +#include "pulsar/MessageId.h" +#include "pulsar/MessageIdBuilder.h" DECLARE_LOG_OBJECT() namespace pulsar { @@ -75,13 +81,22 @@ void NegativeAcksTracker::handleTimer(const ASIO_ERROR &ec) { auto now = Clock::now(); + // The map is sorted by time, so we can exit immediately when we traverse to a time that does not match for (auto it = nackedMessages_.begin(); it != nackedMessages_.end();) { - if (it->second < now) { - messagesToRedeliver.insert(it->first); - it = nackedMessages_.erase(it); - } else { - ++it; + if (it->first > now) { + // We are done with all the messages that need to be redelivered + break; + } + + auto ledgerMap = it->second; + for (auto ledgerIt = ledgerMap.begin(); ledgerIt != ledgerMap.end(); ++ledgerIt) { + auto entrySet = ledgerIt->second; + for (auto setIt = entrySet.begin(); setIt != entrySet.end(); ++setIt) { + messagesToRedeliver.insert( + MessageIdBuilder().ledgerId(ledgerIt->first).entryId(*setIt).build()); + } } + it = nackedMessages_.erase(it); } lock.unlock(); @@ -99,7 +114,7 @@ void NegativeAcksTracker::add(const MessageId &m) { { std::lock_guard lock{mutex_}; // Erase batch id to group all nacks from same batch - nackedMessages_[msgId] = now + nackDelay_; + nackedMessages_[now][msgId.ledgerId()].add((uint64_t)msgId.entryId()); } scheduleTimer(); diff --git a/lib/NegativeAcksTracker.h b/lib/NegativeAcksTracker.h index bf1d9318..1e0c8388 100644 --- a/lib/NegativeAcksTracker.h +++ b/lib/NegativeAcksTracker.h @@ -27,6 +27,8 @@ #include #include #include +#include +#include #include "AsioDefines.h" #include "AsioTimer.h" @@ -39,6 +41,7 @@ class ClientImpl; using ClientImplPtr = std::shared_ptr; class ExecutorService; using ExecutorServicePtr = std::shared_ptr; +using LedgerId = int64_t; class NegativeAcksTracker : public std::enable_shared_from_this { public: @@ -65,7 +68,7 @@ class NegativeAcksTracker : public std::enable_shared_from_this nackedMessages_; + std::map> nackedMessages_; const DeadlineTimerPtr timer_; std::atomic_bool closed_{false}; diff --git a/vcpkg.json b/vcpkg.json index ec9aabd7..5fbd1e1c 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -54,6 +54,9 @@ { "name": "zstd", "version>=": "1.5.5" + }, + { + "name" : "roaring" } ], "features": { From 00a5f1b50ae08c1beccfde4a5a722b188dc4e6db Mon Sep 17 00:00:00 2001 From: gydeng Date: Sat, 5 Jul 2025 08:15:29 +0000 Subject: [PATCH 02/14] [chore] format vcpkg.json --- vcpkg.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vcpkg.json b/vcpkg.json index 5fbd1e1c..0f88f0b8 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -56,7 +56,7 @@ "version>=": "1.5.5" }, { - "name" : "roaring" + "name": "roaring" } ], "features": { From 23b78f26827c09977a1d74e55505e3af6bfa75ec Mon Sep 17 00:00:00 2001 From: gydeng Date: Sun, 6 Jul 2025 05:00:15 +0000 Subject: [PATCH 03/14] chore: add compatibility with c++ 20 --- .github/workflows/ci-pr-validation.yaml | 2 +- LegacyFindPackages.cmake | 3 +++ lib/NegativeAcksTracker.h | 7 ++++++- vcpkg.json | 7 ++++--- 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml index c5140094..4b81f343 100644 --- a/.github/workflows/ci-pr-validation.yaml +++ b/.github/workflows/ci-pr-validation.yaml @@ -167,7 +167,7 @@ jobs: sudo apt-get install -y libcurl4-openssl-dev libssl-dev \ protobuf-compiler libprotobuf-dev libboost-dev \ libboost-dev libboost-program-options-dev \ - libzstd-dev libsnappy-dev libgmock-dev libgtest-dev + libzstd-dev libsnappy-dev libgmock-dev libgtest-dev libroaring-dev - name: CMake run: cmake -B build -DBUILD_PERF_TOOLS=ON -DCMAKE_CXX_STANDARD=20 - name: Build diff --git a/LegacyFindPackages.cmake b/LegacyFindPackages.cmake index 5004545b..01aaeecc 100644 --- a/LegacyFindPackages.cmake +++ b/LegacyFindPackages.cmake @@ -93,6 +93,8 @@ endif () message("Protobuf_INCLUDE_DIRS: " ${Protobuf_INCLUDE_DIRS}) message("Protobuf_LIBRARIES: " ${Protobuf_LIBRARIES}) +find_package(roaring REQUIRED) + # NOTE: CMake might not find curl and zlib on some platforms like Ubuntu, in this case, find them manually set(CURL_NO_CURL_CMAKE ON) find_package(curl QUIET) @@ -129,6 +131,7 @@ if (LINK_STATIC AND NOT VCPKG_TRIPLET) elseif (LINK_STATIC AND VCPKG_TRIPLET) find_package(Protobuf REQUIRED) message(STATUS "Found protobuf static library: " ${Protobuf_LIBRARIES}) + find_package(roaring REQUIRED) if (MSVC AND (${CMAKE_BUILD_TYPE} STREQUAL Debug)) find_library(ZLIB_LIBRARIES NAMES zlibd) else () diff --git a/lib/NegativeAcksTracker.h b/lib/NegativeAcksTracker.h index 1e0c8388..23b6cb11 100644 --- a/lib/NegativeAcksTracker.h +++ b/lib/NegativeAcksTracker.h @@ -42,6 +42,11 @@ using ClientImplPtr = std::shared_ptr; class ExecutorService; using ExecutorServicePtr = std::shared_ptr; using LedgerId = int64_t; +#if __cplusplus >= 202002L +using ConditionalRoaringMap = Roaring64Map; +#else +using ConditionalRoaringMap = roaring::Roaring64Map; +#endif class NegativeAcksTracker : public std::enable_shared_from_this { public: @@ -68,7 +73,7 @@ class NegativeAcksTracker : public std::enable_shared_from_this> nackedMessages_; + std::map> nackedMessages_; const DeadlineTimerPtr timer_; std::atomic_bool closed_{false}; diff --git a/vcpkg.json b/vcpkg.json index 0f88f0b8..bbb75027 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -43,6 +43,10 @@ "name": "protobuf", "version>=": "3.21.12" }, + { + "name": "roaring", + "version>=": "2.0.4" + }, { "name": "snappy", "version>=": "1.1.10" @@ -54,9 +58,6 @@ { "name": "zstd", "version>=": "1.5.5" - }, - { - "name": "roaring" } ], "features": { From 3a2e311c0b43fddfc7367477639e3ae0fb16e624 Mon Sep 17 00:00:00 2001 From: gydeng Date: Fri, 18 Jul 2025 08:46:27 +0000 Subject: [PATCH 04/14] fix compatibility issues --- LegacyFindPackages.cmake | 15 +++++++++++++-- dependencies.yaml | 1 + lib/NegativeAcksTracker.cc | 2 +- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/LegacyFindPackages.cmake b/LegacyFindPackages.cmake index 01aaeecc..53871bc5 100644 --- a/LegacyFindPackages.cmake +++ b/LegacyFindPackages.cmake @@ -93,8 +93,6 @@ endif () message("Protobuf_INCLUDE_DIRS: " ${Protobuf_INCLUDE_DIRS}) message("Protobuf_LIBRARIES: " ${Protobuf_LIBRARIES}) -find_package(roaring REQUIRED) - # NOTE: CMake might not find curl and zlib on some platforms like Ubuntu, in this case, find them manually set(CURL_NO_CURL_CMAKE ON) find_package(curl QUIET) @@ -119,6 +117,17 @@ if (NOT ZLIB_INCLUDE_DIRS OR NOT ZLIB_LIBRARIES) message(FATAL_ERROR "Could not find zlib") endif () +find_package(roaring QUIET) +if (NOT ROARING_FOUND) + find_path(ROARING_INCLUDE_DIRS NAMES roaring/roaring.hh) + find_library(ROARING_LIBRARIES NAMES roaring libroaring) +endif () +message("ROARING_INCLUDE_DIRS: " ${ROARING_INCLUDE_DIRS}) +message("ROARING_LIBRARIES: " ${ROARING_LIBRARIES}) +if (NOT ROARING_INCLUDE_DIRS OR NOT ROARING_LIBRARIES) + message(FATAL_ERROR "Could not find libroaring") +endif () + if (LINK_STATIC AND NOT VCPKG_TRIPLET) find_library(LIB_ZSTD NAMES libzstd.a) message(STATUS "ZStd: ${LIB_ZSTD}") @@ -234,6 +243,7 @@ include_directories( ${Boost_INCLUDE_DIRS} ${OPENSSL_INCLUDE_DIR} ${ZLIB_INCLUDE_DIRS} + ${ROARING_INCLUDE_DIRS} ${CURL_INCLUDE_DIRS} ${Protobuf_INCLUDE_DIRS} ${GTEST_INCLUDE_PATH} @@ -249,6 +259,7 @@ set(COMMON_LIBS ${CURL_LIBRARIES} ${OPENSSL_LIBRARIES} ${ZLIB_LIBRARIES} + ${ROARING_LIBRARIES} ${ADDITIONAL_LIBRARIES} ${CMAKE_DL_LIBS} ) diff --git a/dependencies.yaml b/dependencies.yaml index 8d338e4d..a245f558 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -26,3 +26,4 @@ zstd: 1.5.5 snappy: 1.1.10 openssl: 1.1.1w curl: 8.6.0 +roaring: 0.2.66 diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc index 21acf4c2..2b1fc604 100644 --- a/lib/NegativeAcksTracker.cc +++ b/lib/NegativeAcksTracker.cc @@ -19,7 +19,7 @@ #include "NegativeAcksTracker.h" -#include +#include #include #include From 7e857c550ff125c355b8209757562039c7d5ca2f Mon Sep 17 00:00:00 2001 From: gydeng Date: Fri, 18 Jul 2025 09:24:08 +0000 Subject: [PATCH 05/14] fix format --- lib/NegativeAcksTracker.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc index 2b1fc604..5ea9d2d4 100644 --- a/lib/NegativeAcksTracker.cc +++ b/lib/NegativeAcksTracker.cc @@ -20,7 +20,6 @@ #include "NegativeAcksTracker.h" #include - #include #include #include From db0411eb7098f0e08a0169ee69e4d60bbc459c29 Mon Sep 17 00:00:00 2001 From: gydeng Date: Fri, 18 Jul 2025 12:08:13 +0000 Subject: [PATCH 06/14] update the version of libroaring --- vcpkg.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vcpkg.json b/vcpkg.json index bbb75027..5ec0e0ea 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -45,7 +45,7 @@ }, { "name": "roaring", - "version>=": "2.0.4" + "version>=": "4.3.1" }, { "name": "snappy", From c34b20751a4d2d565790575ed0cde2aa29a58588 Mon Sep 17 00:00:00 2001 From: gydeng Date: Tue, 29 Jul 2025 03:09:03 +0000 Subject: [PATCH 07/14] chore: update the Windows version, because Windows 2019 has been retired --- .github/workflows/ci-pr-validation.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml index 4b81f343..157cf25e 100644 --- a/.github/workflows/ci-pr-validation.yaml +++ b/.github/workflows/ci-pr-validation.yaml @@ -188,16 +188,16 @@ jobs: matrix: include: - name: 'Windows x64' - os: windows-2019 + os: windows-2022 triplet: x64-windows-static suffix: 'windows-win64' - generator: 'Visual Studio 16 2019' + generator: 'Visual Studio 17 2022' arch: '-A x64' - name: 'Windows x86' - os: windows-2019 + os: windows-2022 triplet: x86-windows-static suffix: 'windows-win32' - generator: 'Visual Studio 16 2019' + generator: 'Visual Studio 17 2022' arch: '-A Win32' steps: From 07577c86ad11170666754e4be10742f5472d8a79 Mon Sep 17 00:00:00 2001 From: gydeng Date: Tue, 29 Jul 2025 03:09:13 +0000 Subject: [PATCH 08/14] fix compilation issue on macos --- pkg/mac/build-static-library.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/mac/build-static-library.sh b/pkg/mac/build-static-library.sh index 449222b1..32c60422 100755 --- a/pkg/mac/build-static-library.sh +++ b/pkg/mac/build-static-library.sh @@ -72,5 +72,5 @@ cp ./build-osx/libpulsarwithdeps.a $INSTALL_DIR/lib/ # Test the libraries clang++ win-examples/example.cc -o dynamic.out -std=c++11 -arch $ARCH -I $INSTALL_DIR/include -L $INSTALL_DIR/lib -Wl,-rpath $INSTALL_DIR/lib -lpulsar ./dynamic.out -clang++ win-examples/example.cc -o static.out -std=c++11 -arch $ARCH -I $INSTALL_DIR/include $INSTALL_DIR/lib/libpulsarwithdeps.a +clang++ win-examples/example.cc -o static.out -std=c++11 -arch $ARCH -I $INSTALL_DIR/include $INSTALL_DIR/lib/libpulsarwithdeps.a $PWD/build-osx/vcpkg_installed/$VCPKG_TRIPLET/lib/libroaring.a ./static.out From e690e6b08916a29f651044f02f6ce5a3e13e29e4 Mon Sep 17 00:00:00 2001 From: gydeng Date: Tue, 29 Jul 2025 06:28:07 +0000 Subject: [PATCH 09/14] chore: update Debian 10 (buster) to Debian 11, as it has reached end-of-life. --- pkg/deb/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/deb/Dockerfile b/pkg/deb/Dockerfile index 502b0934..01a35175 100644 --- a/pkg/deb/Dockerfile +++ b/pkg/deb/Dockerfile @@ -19,7 +19,7 @@ # Build pulsar client library in Centos with tools to -FROM debian:10 +FROM debian:11 ARG PLATFORM From 2e4d2f4cf808a5b1e31eebebe1baf882902cbc18 Mon Sep 17 00:00:00 2001 From: gydeng Date: Tue, 29 Jul 2025 12:41:29 +0000 Subject: [PATCH 10/14] update Dockerfile to add compilation and installation of CRoaring --- CMakeLists.txt | 2 -- pkg/apk/Dockerfile | 10 ++++++++++ pkg/deb/Dockerfile | 12 +++++++++++- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 3c043557..de9a2457 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -133,8 +133,6 @@ if (INTEGRATE_VCPKG) $,zstd::libzstd_shared,zstd::libzstd_static> Snappy::snappy roaring::roaring - roaring::roaring-headers - roaring::roaring-headers-cpp ) if (USE_ASIO) find_package(asio CONFIG REQUIRED) diff --git a/pkg/apk/Dockerfile b/pkg/apk/Dockerfile index d7d87181..db501f06 100644 --- a/pkg/apk/Dockerfile +++ b/pkg/apk/Dockerfile @@ -34,6 +34,7 @@ RUN apk add \ python3 \ py3-pip \ perl \ + git \ sudo RUN pip3 install pyyaml @@ -86,6 +87,15 @@ RUN SNAPPY_VERSION=$(dep-version.py snappy) && \ make -j8 && make install && \ rm -rf /snappy-${SNAPPY_VERSION} /${SNAPPY_VERSION}.tar.gz +# Roaring +RUN ROARING_VERSION=$(dep-version.py roaring) && \ + curl -O -L https://github.com/RoaringBitmap/CRoaring/archive/refs/tags/v${ROARING_VERSION}.tar.gz && \ + tar xfz v${ROARING_VERSION}.tar.gz && \ + cd CRoaring-${ROARING_VERSION} && \ + mkdir build && cd build && CXXFLAGS="-fPIC -O3" cmake .. && \ + make -j8 && make install && \ + rm -rf /v${ROARING_VERSION}.tar.gz /CRoaring-${ROARING_VERSION} + RUN OPENSSL_VERSION=$(dep-version.py openssl) && \ OPENSSL_VERSION_UNDERSCORE=$(echo $OPENSSL_VERSION | sed 's/\./_/g') && \ curl -O -L https://github.com/openssl/openssl/archive/OpenSSL_${OPENSSL_VERSION_UNDERSCORE}.tar.gz && \ diff --git a/pkg/deb/Dockerfile b/pkg/deb/Dockerfile index 01a35175..bbafa8b0 100644 --- a/pkg/deb/Dockerfile +++ b/pkg/deb/Dockerfile @@ -32,7 +32,8 @@ RUN apt-get update -y && \ perl \ dpkg-dev \ python3 \ - python3-pip + python3-pip \ + git RUN pip3 install pyyaml @@ -91,6 +92,15 @@ RUN SNAPPY_VERSION=$(dep-version.py snappy) && \ make -j8 && make install && \ rm -rf /snappy-${SNAPPY_VERSION} /${SNAPPY_VERSION}.tar.gz +# Roaring +RUN ROARING_VERSION=$(dep-version.py roaring) && \ + curl -O -L https://github.com/RoaringBitmap/CRoaring/archive/refs/tags/v${ROARING_VERSION}.tar.gz && \ + tar xfz v${ROARING_VERSION}.tar.gz && \ + cd CRoaring-${ROARING_VERSION} && \ + mkdir build && cd build && CXXFLAGS="-fPIC -O3" cmake .. && \ + make -j8 && make install && \ + rm -rf /v${ROARING_VERSION}.tar.gz /CRoaring-${ROARING_VERSION} + RUN OPENSSL_VERSION=$(dep-version.py openssl) && \ OPENSSL_VERSION_UNDERSCORE=$(echo $OPENSSL_VERSION | sed 's/\./_/g') && \ curl -O -L https://github.com/openssl/openssl/archive/OpenSSL_${OPENSSL_VERSION_UNDERSCORE}.tar.gz && \ From 8cede913c8776fd72d9beca4d77329183c6c1d3d Mon Sep 17 00:00:00 2001 From: gydeng Date: Tue, 29 Jul 2025 13:23:07 +0000 Subject: [PATCH 11/14] chore: update roaring version in Dockerfile --- dependencies.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dependencies.yaml b/dependencies.yaml index a245f558..55087d29 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -26,4 +26,4 @@ zstd: 1.5.5 snappy: 1.1.10 openssl: 1.1.1w curl: 8.6.0 -roaring: 0.2.66 +roaring: 4.3.1 From 4014fca82623bbd500aa42a817e9956c9d0a71c2 Mon Sep 17 00:00:00 2001 From: gydeng Date: Thu, 31 Jul 2025 08:37:34 +0000 Subject: [PATCH 12/14] feat: add negativeack precision bit configuration and related tests --- include/pulsar/ConsumerConfiguration.h | 15 +++++++ lib/ConsumerConfiguration.cc | 10 +++++ lib/ConsumerConfigurationImpl.h | 1 + lib/NegativeAcksTracker.cc | 16 +++++++- lib/NegativeAcksTracker.h | 1 + lib/c/c_ConsumerConfiguration.cc | 10 +++++ tests/BasicEndToEndTest.cc | 54 ++++++++++++++++++++++++++ tests/ConsumerConfigurationTest.cc | 4 ++ 8 files changed, 110 insertions(+), 1 deletion(-) diff --git a/include/pulsar/ConsumerConfiguration.h b/include/pulsar/ConsumerConfiguration.h index 3254a95e..ee0550c6 100644 --- a/include/pulsar/ConsumerConfiguration.h +++ b/include/pulsar/ConsumerConfiguration.h @@ -277,6 +277,21 @@ class PULSAR_PUBLIC ConsumerConfiguration { */ long getNegativeAckRedeliveryDelayMs() const; + /** + * Set the precision bit count for negative ack redelivery delay. + * The lower bits of the redelivery time will be trimmed to reduce the memory occupation. + * @param negativeAckPrecisionBitCnt + * negative ack precision bit count + */ + void setNegativeAckPrecisionBitCnt(int negativeAckPrecisionBitCnt); + + /** + * Get the configured precision bit count for negative ack redelivery delay. + * + * @return redelivery time precision bit count + */ + int getNegativeAckPrecisionBitCnt() const; + /** * Set time window in milliseconds for grouping message ACK requests. An ACK request is not sent * to broker until the time window reaches its end, or the number of grouped messages reaches diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc index 60f8fea6..3c39cadf 100644 --- a/lib/ConsumerConfiguration.cc +++ b/lib/ConsumerConfiguration.cc @@ -134,6 +134,16 @@ void ConsumerConfiguration::setAckGroupingTimeMs(long ackGroupingMillis) { impl_->ackGroupingTimeMs = ackGroupingMillis; } +int ConsumerConfiguration::getNegativeAckPrecisionBitCnt() const { return impl_->negativeAckPrecisionBitCnt; } + +void ConsumerConfiguration::setNegativeAckPrecisionBitCnt(int negativeAckPrecisionBitCnt) { + if (negativeAckPrecisionBitCnt < 0) { + throw std::invalid_argument( + "Consumer Config Exception: NegativeAckPrecisionBitCnt should be nonnegative number."); + } + impl_->negativeAckPrecisionBitCnt = negativeAckPrecisionBitCnt; +} + long ConsumerConfiguration::getAckGroupingTimeMs() const { return impl_->ackGroupingTimeMs; } void ConsumerConfiguration::setAckGroupingMaxSize(long maxGroupingSize) { diff --git a/lib/ConsumerConfigurationImpl.h b/lib/ConsumerConfigurationImpl.h index 711b6f96..2acbfb45 100644 --- a/lib/ConsumerConfigurationImpl.h +++ b/lib/ConsumerConfigurationImpl.h @@ -26,6 +26,7 @@ struct ConsumerConfigurationImpl { long unAckedMessagesTimeoutMs{0}; long tickDurationInMs{1000}; long negativeAckRedeliveryDelayMs{60000}; + int negativeAckPrecisionBitCnt{8}; long ackGroupingTimeMs{100}; long ackGroupingMaxSize{1000}; long brokerConsumerStatsCacheTimeInMs{30 * 1000L}; // 30 seconds diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc index 5ea9d2d4..9dc64cdf 100644 --- a/lib/NegativeAcksTracker.cc +++ b/lib/NegativeAcksTracker.cc @@ -46,6 +46,7 @@ NegativeAcksTracker::NegativeAcksTracker(const ClientImplPtr &client, ConsumerIm nackDelay_ = std::chrono::milliseconds(std::max(conf.getNegativeAckRedeliveryDelayMs(), MIN_NACK_DELAY_MILLIS)); timerInterval_ = std::chrono::milliseconds((long)(nackDelay_.count() / 3)); + nackPrecisionBit_ = conf.getNegativeAckPrecisionBitCnt(); LOG_DEBUG("Created negative ack tracker with delay: " << nackDelay_.count() << " ms - Timer interval: " << timerInterval_.count()); } @@ -106,14 +107,27 @@ void NegativeAcksTracker::handleTimer(const ASIO_ERROR &ec) { scheduleTimer(); } +std::chrono::steady_clock::time_point trimLowerBit(const std::chrono::steady_clock::time_point &tp, + int bits) { + // get origin timestamp in nanoseconds + auto timestamp = std::chrono::duration_cast(tp.time_since_epoch()).count(); + + // trim lower bits + auto trimmedTimestamp = timestamp & (~((1LL << bits) - 1)); + + return std::chrono::steady_clock::time_point(std::chrono::nanoseconds(trimmedTimestamp)); +} + void NegativeAcksTracker::add(const MessageId &m) { auto msgId = discardBatch(m); auto now = Clock::now(); { std::lock_guard lock{mutex_}; + auto trimmedTimestamp = trimLowerBit(now + nackDelay_, nackPrecisionBit_); + // If the timestamp is already in the map, we can just add the message to the existing entry // Erase batch id to group all nacks from same batch - nackedMessages_[now][msgId.ledgerId()].add((uint64_t)msgId.entryId()); + nackedMessages_[trimmedTimestamp][msgId.ledgerId()].add((uint64_t)msgId.entryId()); } scheduleTimer(); diff --git a/lib/NegativeAcksTracker.h b/lib/NegativeAcksTracker.h index 23b6cb11..51fc2150 100644 --- a/lib/NegativeAcksTracker.h +++ b/lib/NegativeAcksTracker.h @@ -72,6 +72,7 @@ class NegativeAcksTracker : public std::enable_shared_from_this> nackedMessages_; diff --git a/lib/c/c_ConsumerConfiguration.cc b/lib/c/c_ConsumerConfiguration.cc index 248957fd..273e7f39 100644 --- a/lib/c/c_ConsumerConfiguration.cc +++ b/lib/c/c_ConsumerConfiguration.cc @@ -121,6 +121,16 @@ long pulsar_configure_get_negative_ack_redelivery_delay_ms( return consumer_configuration->consumerConfiguration.getNegativeAckRedeliveryDelayMs(); } +void pulsar_configure_set_negative_ack_precision_bit_cnt( + pulsar_consumer_configuration_t *consumer_configuration, int negativeAckPrecisionBitCnt) { + consumer_configuration->consumerConfiguration.setNegativeAckPrecisionBitCnt(negativeAckPrecisionBitCnt); +} + +int pulsar_configure_get_negative_ack_precision_bit_cnt( + pulsar_consumer_configuration_t *consumer_configuration) { + return consumer_configuration->consumerConfiguration.getNegativeAckPrecisionBitCnt(); +} + void pulsar_configure_set_ack_grouping_time_ms(pulsar_consumer_configuration_t *consumer_configuration, long ackGroupingMillis) { consumer_configuration->consumerConfiguration.setAckGroupingTimeMs(ackGroupingMillis); diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc index e3c70396..229ebb76 100644 --- a/tests/BasicEndToEndTest.cc +++ b/tests/BasicEndToEndTest.cc @@ -3249,6 +3249,60 @@ TEST(BasicEndToEndTest, testNegativeAcksWithPartitions) { testNegativeAcks(topicName, true); } +int64_t getCurrentTimeMs() { + return std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); +} + +void testNegativeAckPrecisionBitCnt(const std::string &topic, int precisionBitCnt) { + constexpr int delayMs = 2000; + const int64_t timeDeviation = 1L << precisionBitCnt; + + Client client(lookupUrl); + + Consumer consumer; + ConsumerConfiguration conf; + conf.setNegativeAckRedeliveryDelayMs(delayMs); + conf.setNegativeAckPrecisionBitCnt(precisionBitCnt); + + Result result = client.subscribe(topic, "sub1", conf, consumer); + ASSERT_EQ(ResultOk, result); + + Producer producer; + ProducerConfiguration producerConf; + result = client.createProducer(topic, producerConf, producer); + ASSERT_EQ(ResultOk, result); + + Message msg = MessageBuilder().setContent("test-0").build(); + producer.sendAsync(msg, nullptr); + producer.flush(); + + // receive and trigger negative ack + Message received; + consumer.receive(received); + consumer.negativeAcknowledge(received); + + int64_t expectedRedeliveryTime = getCurrentTimeMs() + delayMs; + + Message redelivered; + consumer.receive(redelivered); + int64_t now = getCurrentTimeMs(); + ASSERT_GE(now, expectedRedeliveryTime - timeDeviation); + ASSERT_EQ(redelivered.getDataAsString(), "test-0"); + + consumer.acknowledge(redelivered); + client.shutdown(); +} + +TEST(BasicEndToEndTest, testNegativeAckPrecisionBitCnt) { + for (int precisionBitCnt = 1; precisionBitCnt <= 12; precisionBitCnt++) { + std::string topic = "testNegativeAckPrecisionBitCnt-" + std::to_string(precisionBitCnt) + "-" + + std::to_string(time(nullptr)); + testNegativeAckPrecisionBitCnt(topic, precisionBitCnt); + } +} + static long regexTestMessagesReceived = 0; static void regexMessageListenerFunction(const Consumer &consumer, const Message &msg) { diff --git a/tests/ConsumerConfigurationTest.cc b/tests/ConsumerConfigurationTest.cc index e44543d1..e847f4f6 100644 --- a/tests/ConsumerConfigurationTest.cc +++ b/tests/ConsumerConfigurationTest.cc @@ -52,6 +52,7 @@ TEST(ConsumerConfigurationTest, testDefaultConfig) { ASSERT_EQ(conf.getUnAckedMessagesTimeoutMs(), 0); ASSERT_EQ(conf.getTickDurationInMs(), 1000); ASSERT_EQ(conf.getNegativeAckRedeliveryDelayMs(), 60000); + ASSERT_EQ(conf.getNegativeAckPrecisionBitCnt(), 8); ASSERT_EQ(conf.getAckGroupingTimeMs(), 100); ASSERT_EQ(conf.getAckGroupingMaxSize(), 1000); ASSERT_EQ(conf.getBrokerConsumerStatsCacheTimeInMs(), 30000); @@ -114,6 +115,9 @@ TEST(ConsumerConfigurationTest, testCustomConfig) { conf.setNegativeAckRedeliveryDelayMs(10000); ASSERT_EQ(conf.getNegativeAckRedeliveryDelayMs(), 10000); + conf.setNegativeAckPrecisionBitCnt(4); + ASSERT_EQ(conf.getNegativeAckPrecisionBitCnt(), 4); + conf.setAckGroupingTimeMs(200); ASSERT_EQ(conf.getAckGroupingTimeMs(), 200); From 426678263c6bd644cbace415932c44535398f86e Mon Sep 17 00:00:00 2001 From: gydeng Date: Fri, 1 Aug 2025 02:34:31 +0000 Subject: [PATCH 13/14] chore: delete redundant tool functions --- .github/workflows/codeql-analysis.yml | 2 +- tests/BasicEndToEndTest.cc | 10 ++-------- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index c877c642..70f833c3 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -60,7 +60,7 @@ jobs: sudo apt-get install -y libcurl4-openssl-dev libssl-dev \ protobuf-compiler libprotobuf-dev libboost-dev \ libboost-dev libboost-program-options-dev \ - libzstd-dev libsnappy-dev + libzstd-dev libsnappy-dev libroaring-dev - name: Build run: | diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc index 229ebb76..c269538b 100644 --- a/tests/BasicEndToEndTest.cc +++ b/tests/BasicEndToEndTest.cc @@ -3249,12 +3249,6 @@ TEST(BasicEndToEndTest, testNegativeAcksWithPartitions) { testNegativeAcks(topicName, true); } -int64_t getCurrentTimeMs() { - return std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); -} - void testNegativeAckPrecisionBitCnt(const std::string &topic, int precisionBitCnt) { constexpr int delayMs = 2000; const int64_t timeDeviation = 1L << precisionBitCnt; @@ -3283,11 +3277,11 @@ void testNegativeAckPrecisionBitCnt(const std::string &topic, int precisionBitCn consumer.receive(received); consumer.negativeAcknowledge(received); - int64_t expectedRedeliveryTime = getCurrentTimeMs() + delayMs; + int64_t expectedRedeliveryTime = TimeUtils::currentTimeMillis() + delayMs; Message redelivered; consumer.receive(redelivered); - int64_t now = getCurrentTimeMs(); + int64_t now = TimeUtils::currentTimeMillis(); ASSERT_GE(now, expectedRedeliveryTime - timeDeviation); ASSERT_EQ(redelivered.getDataAsString(), "test-0"); From fe435c918874b0c1447c59ab8a5511b5ea36c604 Mon Sep 17 00:00:00 2001 From: gydeng Date: Fri, 1 Aug 2025 07:26:07 +0000 Subject: [PATCH 14/14] chore: add namespace compatibility macro for Roaring64Map (libroaring vcpkg/apt) --- LegacyFindPackages.cmake | 8 ++++++++ lib/NegativeAcksTracker.h | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/LegacyFindPackages.cmake b/LegacyFindPackages.cmake index 53871bc5..2c70ec27 100644 --- a/LegacyFindPackages.cmake +++ b/LegacyFindPackages.cmake @@ -127,6 +127,14 @@ message("ROARING_LIBRARIES: " ${ROARING_LIBRARIES}) if (NOT ROARING_INCLUDE_DIRS OR NOT ROARING_LIBRARIES) message(FATAL_ERROR "Could not find libroaring") endif () +file(READ "${ROARING_INCLUDE_DIRS}/roaring/roaring.hh" ROARING_HEADER_CONTENTS) +string(REGEX MATCH "namespace roaring" ROARING_HAS_NAMESPACE "${ROARING_HEADER_CONTENTS}") +if (ROARING_HAS_NAMESPACE) + message(STATUS "Roaring64Map is in namespace roaring") +else () + message(STATUS "Roaring64Map is in global namespace") + add_definitions(-DROARING_NAMESPACE_GLOBAL) +endif () if (LINK_STATIC AND NOT VCPKG_TRIPLET) find_library(LIB_ZSTD NAMES libzstd.a) diff --git a/lib/NegativeAcksTracker.h b/lib/NegativeAcksTracker.h index 51fc2150..8199966c 100644 --- a/lib/NegativeAcksTracker.h +++ b/lib/NegativeAcksTracker.h @@ -42,7 +42,7 @@ using ClientImplPtr = std::shared_ptr; class ExecutorService; using ExecutorServicePtr = std::shared_ptr; using LedgerId = int64_t; -#if __cplusplus >= 202002L +#ifdef ROARING_NAMESPACE_GLOBAL using ConditionalRoaringMap = Roaring64Map; #else using ConditionalRoaringMap = roaring::Roaring64Map;