
Indefinite waiting in the fine-grained thread-safe queue (Listings 6.7 to 6.10) #53

@lczech


Hi there,

it seems that the fine-grained thread-safe queue of Listings 6.7 to 6.10 can block indefinitely. I took the code from the book, with a minor fix (see here for my bug report of that fix).

Here is a complete reproduction of the class from those listings:

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <utility>

template <typename T>
class threadsafe_queue {
private:
    struct node {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };
    std::mutex head_mutex;
    std::unique_ptr<node> head;
    std::mutex tail_mutex;
    node* tail;
    std::condition_variable data_cond;

public:
    threadsafe_queue()
        : head(new node)
        , tail(head.get())
    {
    }
    threadsafe_queue(const threadsafe_queue& other) = delete;
    threadsafe_queue& operator=(const threadsafe_queue& other) = delete;

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_ptr<node> const old_head = wait_pop_head();
        return old_head->data;
    }
    void wait_and_pop(T& value)
    {
        std::unique_ptr<node> const old_head = wait_pop_head(value);
    }

    std::shared_ptr<T> try_pop()
    {
        std::unique_ptr<node> old_head = try_pop_head();
        return old_head ? old_head->data : std::shared_ptr<T>();
    }
    bool try_pop(T& value)
    {
        std::unique_ptr<node> const old_head = try_pop_head(value);
        // unique_ptr's operator bool is explicit, so a plain `return old_head;`
        // does not compile once this overload is instantiated.
        return old_head != nullptr;
    }
    bool empty()
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        return (head.get() == get_tail());
    }
    void push(T new_value)
    {
        std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
        std::unique_ptr<node> p(new node);
        {
            std::lock_guard<std::mutex> tail_lock(tail_mutex);
            tail->data = new_data;
            node* const new_tail = p.get();
            tail->next = std::move(p);
            tail = new_tail;
        }
        data_cond.notify_one();
    }

private:
    std::unique_ptr<node> try_pop_head()
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if (head.get() == get_tail()) {
            return std::unique_ptr<node>();
        }
        return pop_head();
    }
    std::unique_ptr<node> try_pop_head(T& value)
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if (head.get() == get_tail()) {
            return std::unique_ptr<node>();
        }
        value = std::move(*head->data);
        return pop_head();
    }
    node* get_tail()
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        return tail;
    }
    std::unique_ptr<node> pop_head()
    {
        std::unique_ptr<node> old_head = std::move(head);
        head = std::move(old_head->next);
        return old_head;
    }
    std::unique_lock<std::mutex> wait_for_data()
    {
        std::unique_lock<std::mutex> head_lock(head_mutex);

        // ================================================
        // The following line can block indefinitely:
        data_cond.wait(head_lock, [&] { return head.get() != get_tail(); });
        return head_lock;
    }
    std::unique_ptr<node> wait_pop_head()
    {
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        return pop_head();
    }
    std::unique_ptr<node> wait_pop_head(T& value)
    {
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        value = std::move(*head->data);
        return pop_head();
    }
};
```

For testing, I used the code from Listing 11.1:

```cpp
#include <cassert>
#include <future>

// Listing 11.1, adapted to the threadsafe_queue above.
void test_concurrent_push_and_pop_on_empty_queue()
{
    threadsafe_queue<int> q;
    std::promise<void> go, push_ready, pop_ready;
    std::shared_future<void> ready(go.get_future());
    std::future<void> push_done;
    std::future<int> pop_done;
    try {
        push_done = std::async(std::launch::async,
            [&q, ready, &push_ready]() {
                push_ready.set_value();
                ready.wait();
                q.push(42);
            });
        pop_done = std::async(std::launch::async,
            [&q, ready, &pop_ready]() {
                pop_ready.set_value();
                ready.wait();
                // return q.pop();
                return *q.wait_and_pop();
            });
        push_ready.get_future().wait();
        pop_ready.get_future().wait();
        go.set_value();
        push_done.get();
        assert(pop_done.get() == 42);
        assert(q.empty());
    } catch (...) {
        go.set_value();
        throw;
    }
}
```

The only change to the original test is replacing `q.pop();` with `return *q.wait_and_pop();` to match the interface of the queue. (NB: I find it noteworthy that the test uses `pop()` in the first place, after so many chapters talking about how to define concurrent interfaces correctly...)
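A watchdog along the following lines lets you repeat the test without the whole program hanging when one iteration blocks. This is a hypothetical helper (`find_hanging_iteration` is my name, not something from the book). It runs each iteration on a detached thread through `std::packaged_task`. A future obtained that way does not block in its destructor, so the caller can report the first iteration that misses the timeout and move on.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <thread>
#include <utility>

// Hypothetical harness: runs `test` up to `iterations` times, each on its own
// detached thread, and returns the index of the first iteration that does not
// finish within `timeout`, or -1 if every iteration completed.
// A stuck iteration's thread is deliberately left behind.
template <typename F>
int find_hanging_iteration(F test, int iterations, std::chrono::milliseconds timeout)
{
    for (int i = 0; i < iterations; ++i) {
        std::packaged_task<void()> task(test);
        std::future<void> done = task.get_future();
        std::thread(std::move(task)).detach();
        if (done.wait_for(timeout) == std::future_status::timeout) {
            return i; // this iteration is (probably) blocked
        }
        done.get(); // rethrows any exception thrown by the test
    }
    return -1;
}
```

For example, `find_hanging_iteration(test_concurrent_push_and_pop_on_empty_queue, 100, std::chrono::milliseconds(5000))` would return the index of the first stuck iteration, or -1. Because the stuck thread is left running, the process should simply exit after reporting.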

Running that test function in a loop of about 100 iterations is enough to reliably reach a blocking state on my setup. The `data_cond.wait(...)` call in `wait_for_data()` seems to block and is never woken up again. I played around with this a bit, and interestingly, in the cases where it blocks, the `data_cond.notify_one();` in `push()` seems to happen after the `wait()` call...
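One possible explanation is a lost wakeup. This is a hypothesis, not a confirmed diagnosis. `push()` updates `tail` while holding only `tail_mutex`. It then calls `notify_one()` without ever touching `head_mutex`, which is the mutex `data_cond` waits with. That allows the following sequence:

1. The popping thread evaluates the predicate and sees the old tail.
2. The pushing thread updates `tail` and notifies while nobody is waiting yet.
3. Only then does the popping thread release `head_mutex` and block.

This would also fit the observation above: the notification can come after the `wait()` call started, but before the thread was actually blocked.

The sketch below keeps the two-mutex design but briefly locks `head_mutex` in `push()` before notifying. It assumes that this lost wakeup is the cause. `fixed_queue` and `run_push_pop_rounds` are hypothetical names, not the book's code.

```cpp
#include <cassert>
#include <condition_variable>
#include <future>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical variant of the two-mutex queue with a lost-wakeup fix in push().
template <typename T>
class fixed_queue {
    struct node {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };
    std::mutex head_mutex;
    std::unique_ptr<node> head;
    std::mutex tail_mutex;
    node* tail;
    std::condition_variable data_cond;

    node* get_tail()
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        return tail;
    }

public:
    fixed_queue() : head(new node), tail(head.get()) {}

    void push(T new_value)
    {
        auto new_data = std::make_shared<T>(std::move(new_value));
        std::unique_ptr<node> p(new node);
        {
            std::lock_guard<std::mutex> tail_lock(tail_mutex);
            tail->data = new_data;
            node* const new_tail = p.get();
            tail->next = std::move(p);
            tail = new_tail;
        }
        {
            // A waiter holds head_mutex from its predicate check until wait()
            // releases it, so this lock cannot be taken inside that window and
            // the notification below cannot slip through it.
            std::lock_guard<std::mutex> head_lock(head_mutex);
        }
        data_cond.notify_one();
    }

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> head_lock(head_mutex);
        data_cond.wait(head_lock, [&] { return head.get() != get_tail(); });
        std::unique_ptr<node> old_head = std::move(head);
        head = std::move(old_head->next);
        return old_head->data;
    }

    bool empty()
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        return head.get() == get_tail();
    }
};

// Hypothetical helper: repeats the Listing 11.1 scenario (one pusher, one
// waiting popper on an empty queue) and reports whether every round succeeded.
inline bool run_push_pop_rounds(int rounds)
{
    for (int i = 0; i < rounds; ++i) {
        fixed_queue<int> q;
        std::promise<void> go;
        std::shared_future<void> ready(go.get_future());
        auto pop_done = std::async(std::launch::async, [&q, ready] {
            ready.wait();
            return *q.wait_and_pop();
        });
        auto push_done = std::async(std::launch::async, [&q, ready] {
            ready.wait();
            q.push(42);
        });
        go.set_value();
        push_done.get();
        if (pop_done.get() != 42 || !q.empty()) {
            return false;
        }
    }
    return true;
}
```

`push()` takes `head_mutex` only after releasing `tail_mutex`. Inside `push()` the two mutexes are therefore never held together, and it cannot deadlock against the popper, which takes them in head-then-tail order. The price is one extra lock acquisition per push.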

To test a bit more, I replaced the wait with a loop:

```cpp
// Poll instead of waiting indefinitely: re-check the condition every 1 ms.
while (head.get() == get_tail()) {
    data_cond.wait_for(head_lock, std::chrono::milliseconds(1));
}
```

This seems to work, but it is of course less than ideal, and in the end it is slower than the thread-safe queue from Listing 4.5, which uses a single mutex for the whole data structure...
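The polling loop probably avoids the hang because it does not rely on the notification at all. Every `wait_for()` timeout re-evaluates `head.get() == get_tail()`, so a missed `notify_one()` costs at most about one timeout period instead of blocking forever. The small standalone sketch below demonstrates this. It is unrelated to the queue, and `wait_without_notification` is a hypothetical name. It sets a flag without ever notifying, and the waiter still terminates.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical demo: the waiter never receives a notification, yet finishes,
// because each 1 ms timeout makes it re-check the shared flag under the lock.
inline bool wait_without_notification()
{
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;

    std::thread producer([&] {
        std::this_thread::sleep_for(std::chrono::milliseconds(5));
        std::lock_guard<std::mutex> lock(m);
        ready = true;
        // Deliberately no cv.notify_one(): simulates a notification that was lost.
    });

    std::unique_lock<std::mutex> lock(m);
    while (!ready) {
        // Same shape as the workaround above.
        cv.wait_for(lock, std::chrono::milliseconds(1));
    }
    const bool observed = ready;
    lock.unlock();
    producer.join();
    return observed;
}
```

`std::condition_variable` also has a `wait_for(lock, rel_time, pred)` overload that expresses the same loop in one call. In either form, the periodic wakeups are a plausible reason for the slowdown compared with Listing 4.5.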

Anyway, what is going on here, and can anyone confirm that this happens? It's weird, as the example comes straight from the book, and I had hoped that it was a correct implementation.

My system is Ubuntu 20.04.6 LTS, and the problem occurs both with Clang 11 and GCC 10.5.0.

Cheers and thanks
Lucas
