diff --git a/concurrentqueue.h b/concurrentqueue.h index d65f73c4..5f46eb9a 100644 --- a/concurrentqueue.h +++ b/concurrentqueue.h @@ -355,6 +355,12 @@ struct ConcurrentQueueDefaultTraits // How many full blocks can be expected for a single implicit producer? This should // reflect that number's maximum for optimal performance. Must be a power of 2. + // Note: This impacts the maximum number of elements that can be enqueued by a + // single implicit producer when using try_enqueue/try_enqeue_bulk exclusively (which + // cannot allocate), since it limits the number of blocks that the producer can hold to + // store elements. When pre-allocating blocks for use with try-enqueueing, configure + // this initial size to the desired maximum number of blocks per implicit producer. + // Alternately, use the regular enqueue methods, which can grow the index as needed. static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32; // The initial size of the hash table mapping thread IDs to implicit producers. @@ -1063,6 +1069,9 @@ class ConcurrentQueue // Does not allocate memory. Fails if not enough room to enqueue (or implicit // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE // is 0). + // Note: If using only try_enqueue/try_enqueue_bulk with pre-allocated blocks, configure + // Traits::IMPLICIT_INITIAL_INDEX_SIZE appropriately to ensure the index has sufficient + // capacity for the number of blocks each producer may need. // Thread-safe. inline bool try_enqueue(T const& item) { @@ -1074,6 +1083,9 @@ class ConcurrentQueue // Does not allocate memory (except for one-time implicit producer). // Fails if not enough room to enqueue (or implicit production is // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0). + // Note: If using only try_enqueue/try_enqueue_bulk with pre-allocated blocks, configure + // Traits::IMPLICIT_INITIAL_INDEX_SIZE appropriately to ensure the index has sufficient + // capacity for the number of blocks each producer may need. // Thread-safe. inline bool try_enqueue(T&& item) { @@ -1101,6 +1113,9 @@ class ConcurrentQueue // Does not allocate memory (except for one-time implicit producer). // Fails if not enough room to enqueue (or implicit production is // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0). + // Note: If using only try_enqueue/try_enqueue_bulk with pre-allocated blocks, configure + // Traits::IMPLICIT_INITIAL_INDEX_SIZE appropriately to ensure the index has sufficient + // capacity for the number of blocks each producer may need. // Note: Use std::make_move_iterator if the elements should be moved // instead of copied. // Thread-safe. diff --git a/tests/unittests/unittests.cpp b/tests/unittests/unittests.cpp index b4c4d9cc..7371f03e 100644 --- a/tests/unittests/unittests.cpp +++ b/tests/unittests/unittests.cpp @@ -136,6 +136,18 @@ struct LargeTraits : public MallocTrackingTraits static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 128; }; +struct SmallImplicitIndexTraits : public MallocTrackingTraits +{ + static const size_t BLOCK_SIZE = 4; + static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 4; +}; + +struct LargerImplicitIndexTraits : public MallocTrackingTraits +{ + static const size_t BLOCK_SIZE = 4; + static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 16; +}; + // Note: Not thread safe! struct Foo { @@ -375,6 +387,7 @@ class ConcurrentQueueTests : public TestClass REGISTER_TEST(explicit_strings_threaded); REGISTER_TEST(large_traits); + REGISTER_TEST(implicit_producer_index_limit); } bool postTest(bool testSucceeded) override @@ -5031,6 +5044,68 @@ class ConcurrentQueueTests : public TestClass return true; } + bool implicit_producer_index_limit() + { + // Issue #418: try_enqueue() fails around BLOCK_SIZE * IMPLICIT_INITIAL_INDEX_SIZE + // elements even when blocks have been pre-allocated, because the block index + // (not the blocks themselves) needs to grow, which requires allocation that + // try_enqueue refuses to do. + + // SmallImplicitIndexTraits: BLOCK_SIZE=4, IMPLICIT_INITIAL_INDEX_SIZE=4, limit=16 + // LargerImplicitIndexTraits: BLOCK_SIZE=4, IMPLICIT_INITIAL_INDEX_SIZE=16, limit=64 + + { + // Demonstrate the limit: try_enqueue fails at BLOCK_SIZE * IMPLICIT_INITIAL_INDEX_SIZE + const int limit = (int)(SmallImplicitIndexTraits::BLOCK_SIZE * SmallImplicitIndexTraits::IMPLICIT_INITIAL_INDEX_SIZE); + ConcurrentQueue q(limit + 64); // Pre-allocate plenty of blocks + + int successCount = 0; + for (int i = 0; i < limit + 64; ++i) { + if (!q.try_enqueue(i)) + break; + ++successCount; + } + // try_enqueue should stop succeeding at the index limit + ASSERT_OR_FAIL(successCount == limit); + } + + { + // Workaround #1: enqueue() (which can allocate) can grow the index + const int limit = (int)(SmallImplicitIndexTraits::BLOCK_SIZE * SmallImplicitIndexTraits::IMPLICIT_INITIAL_INDEX_SIZE); + ConcurrentQueue q(limit + 64); + + for (int i = 0; i < limit + 64; ++i) { + ASSERT_OR_FAIL(q.enqueue(i)); + } + // Verify all elements are dequeued correctly + int item; + for (int i = 0; i < limit + 64; ++i) { + ASSERT_OR_FAIL(q.try_dequeue(item)); + ASSERT_OR_FAIL(item == i); + } + ASSERT_OR_FAIL(!q.try_dequeue(item)); + } + + { + // Workaround #2: larger IMPLICIT_INITIAL_INDEX_SIZE allows more try_enqueue calls + const int small_limit = (int)(SmallImplicitIndexTraits::BLOCK_SIZE * SmallImplicitIndexTraits::IMPLICIT_INITIAL_INDEX_SIZE); // 16 + const int large_limit = (int)(LargerImplicitIndexTraits::BLOCK_SIZE * LargerImplicitIndexTraits::IMPLICIT_INITIAL_INDEX_SIZE); // 64 + ConcurrentQueue q(large_limit + 64); + + int successCount = 0; + for (int i = 0; i < large_limit + 64; ++i) { + if (!q.try_enqueue(i)) + break; + ++successCount; + } + // Should succeed well past the old small limit + ASSERT_OR_FAIL(successCount > small_limit); + ASSERT_OR_FAIL(successCount == large_limit); + } + + return true; + } + bool large_traits() { union Elem { uint32_t x; char dummy[156]; };