Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions concurrentqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,12 @@ struct ConcurrentQueueDefaultTraits

// How many full blocks can be expected for a single implicit producer? This should
// reflect that number's maximum for optimal performance. Must be a power of 2.
// Note: This impacts the maximum number of elements that can be enqueued by a
// single implicit producer when using try_enqueue/try_enqeue_bulk exclusively (which
// cannot allocate), since it limits the number of blocks that the producer can hold to
// store elements. When pre-allocating blocks for use with try-enqueueing, configure
// this initial size to the desired maximum number of blocks per implicit producer.
// Alternately, use the regular enqueue methods, which can grow the index as needed.
static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32;

// The initial size of the hash table mapping thread IDs to implicit producers.
Expand Down Expand Up @@ -1063,6 +1069,9 @@ class ConcurrentQueue
// Does not allocate memory. Fails if not enough room to enqueue (or implicit
// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
// is 0).
// Note: If using only try_enqueue/try_enqueue_bulk with pre-allocated blocks, configure
// Traits::IMPLICIT_INITIAL_INDEX_SIZE appropriately to ensure the index has sufficient
// capacity for the number of blocks each producer may need.
// Thread-safe.
inline bool try_enqueue(T const& item)
{
Expand All @@ -1074,6 +1083,9 @@ class ConcurrentQueue
// Does not allocate memory (except for one-time implicit producer).
// Fails if not enough room to enqueue (or implicit production is
// disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
// Note: If using only try_enqueue/try_enqueue_bulk with pre-allocated blocks, configure
// Traits::IMPLICIT_INITIAL_INDEX_SIZE appropriately to ensure the index has sufficient
// capacity for the number of blocks each producer may need.
// Thread-safe.
inline bool try_enqueue(T&& item)
{
Expand Down Expand Up @@ -1101,6 +1113,9 @@ class ConcurrentQueue
// Does not allocate memory (except for one-time implicit producer).
// Fails if not enough room to enqueue (or implicit production is
// disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
// Note: If using only try_enqueue/try_enqueue_bulk with pre-allocated blocks, configure
// Traits::IMPLICIT_INITIAL_INDEX_SIZE appropriately to ensure the index has sufficient
// capacity for the number of blocks each producer may need.
// Note: Use std::make_move_iterator if the elements should be moved
// instead of copied.
// Thread-safe.
Expand Down
75 changes: 75 additions & 0 deletions tests/unittests/unittests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,18 @@ struct LargeTraits : public MallocTrackingTraits
static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 128;
};

struct SmallImplicitIndexTraits : public MallocTrackingTraits
{
static const size_t BLOCK_SIZE = 4;
static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 4;
};

struct LargerImplicitIndexTraits : public MallocTrackingTraits
{
static const size_t BLOCK_SIZE = 4;
static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 16;
};

// Note: Not thread safe!
struct Foo
{
Expand Down Expand Up @@ -375,6 +387,7 @@ class ConcurrentQueueTests : public TestClass<ConcurrentQueueTests>

REGISTER_TEST(explicit_strings_threaded);
REGISTER_TEST(large_traits);
REGISTER_TEST(implicit_producer_index_limit);
}

bool postTest(bool testSucceeded) override
Expand Down Expand Up @@ -5031,6 +5044,68 @@ class ConcurrentQueueTests : public TestClass<ConcurrentQueueTests>
return true;
}

bool implicit_producer_index_limit()
{
// Issue #418: try_enqueue() fails around BLOCK_SIZE * IMPLICIT_INITIAL_INDEX_SIZE
// elements even when blocks have been pre-allocated, because the block index
// (not the blocks themselves) needs to grow, which requires allocation that
// try_enqueue refuses to do.

// SmallImplicitIndexTraits: BLOCK_SIZE=4, IMPLICIT_INITIAL_INDEX_SIZE=4, limit=16
// LargerImplicitIndexTraits: BLOCK_SIZE=4, IMPLICIT_INITIAL_INDEX_SIZE=16, limit=64

{
// Demonstrate the limit: try_enqueue fails at BLOCK_SIZE * IMPLICIT_INITIAL_INDEX_SIZE
const int limit = (int)(SmallImplicitIndexTraits::BLOCK_SIZE * SmallImplicitIndexTraits::IMPLICIT_INITIAL_INDEX_SIZE);
ConcurrentQueue<int, SmallImplicitIndexTraits> q(limit + 64); // Pre-allocate plenty of blocks

int successCount = 0;
for (int i = 0; i < limit + 64; ++i) {
if (!q.try_enqueue(i))
break;
++successCount;
}
// try_enqueue should stop succeeding at the index limit
ASSERT_OR_FAIL(successCount == limit);
}

{
// Workaround #1: enqueue() (which can allocate) can grow the index
const int limit = (int)(SmallImplicitIndexTraits::BLOCK_SIZE * SmallImplicitIndexTraits::IMPLICIT_INITIAL_INDEX_SIZE);
ConcurrentQueue<int, SmallImplicitIndexTraits> q(limit + 64);

for (int i = 0; i < limit + 64; ++i) {
ASSERT_OR_FAIL(q.enqueue(i));
}
// Verify all elements are dequeued correctly
int item;
for (int i = 0; i < limit + 64; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i);
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}

{
// Workaround #2: larger IMPLICIT_INITIAL_INDEX_SIZE allows more try_enqueue calls
const int small_limit = (int)(SmallImplicitIndexTraits::BLOCK_SIZE * SmallImplicitIndexTraits::IMPLICIT_INITIAL_INDEX_SIZE); // 16
const int large_limit = (int)(LargerImplicitIndexTraits::BLOCK_SIZE * LargerImplicitIndexTraits::IMPLICIT_INITIAL_INDEX_SIZE); // 64
ConcurrentQueue<int, LargerImplicitIndexTraits> q(large_limit + 64);

int successCount = 0;
for (int i = 0; i < large_limit + 64; ++i) {
if (!q.try_enqueue(i))
break;
++successCount;
}
// Should succeed well past the old small limit
ASSERT_OR_FAIL(successCount > small_limit);
ASSERT_OR_FAIL(successCount == large_limit);
}

return true;
}

bool large_traits()
{
union Elem { uint32_t x; char dummy[156]; };
Expand Down