Skip to content

Commit f6af2bb

Browse files
committed
Update changelog for version 0.1.2, enhance WebsocketManager to prevent use-after-free issues, and improve test coverage for unsubscribe behavior
1 parent 53228e8 commit f6af2bb

5 files changed

Lines changed: 232 additions & 70 deletions

File tree

CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
---
99

10+
## [0.1.2] - 2026-06-09
11+
12+
### Fixed
13+
- **`WebsocketManager`: use-after-free / same-dispatch unsubscribe race** — callbacks now dispatch from an atomically loaded, copy-on-write snapshot of shared handler state (`std::atomic<std::shared_ptr<const HandlerMap>>`) instead of copying `std::function`s under a mutex. Each callback entry carries an `active` flag and an `in_flight` counter: `unsubscribe()` removes the callback from future snapshots, marks that specific callback inactive, and then blocks until its per-callback in-flight count reaches zero, using C++20 atomic wait/notify. This prevents a later callback from firing after it was removed earlier in the same dispatch, while still allowing a callback to unsubscribe itself without deadlocking.
14+
15+
### Changed
16+
- **`WebsocketManager`: removed `pending_subs_` pre-connection queue** — the underlying `slick::net::Websocket` already buffers outbound frames until the connection is established, making the manual pending-subscription queue redundant. `subscribe()` and `unsubscribe()` now call `ws_->send()` regardless of connection state (`unsubscribe()` still sends only when the last handler for a subscription is removed); `flush_pending()`, `mutex_`, and the `pending_subs_` vector are all removed.
17+
18+
### Tests
19+
- **`UnsubscribeBlocksUntilCallbackCompletes`** (integration) — regression test for the above fix; subscribes with a callback that blocks until explicitly released, calls `unsubscribe()` concurrently from a second thread, and asserts that `unsubscribe()` does not return before the in-flight callback has finished.
20+
- **`CallbackCanRemoveLaterCallbackFromSameDispatch`** (integration) — regression test for the snapshot hazard where callback A unsubscribes callback B on the same channel before the dispatcher reaches B in the already-loaded snapshot.
21+
1022
## [0.1.1] - 2026-06-08
1123

1224
### Added

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
cmake_minimum_required(VERSION 3.21)
2-
project(hyperliquid-cpp VERSION 0.1.1 LANGUAGES CXX)
2+
project(hyperliquid-cpp VERSION 0.1.2 LANGUAGES CXX)
33

44
set(CMAKE_CXX_STANDARD 20)
55
set(CMAKE_CXX_STANDARD_REQUIRED ON)

include/hyperliquid/websocket_manager.hpp

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
#include <atomic>
44
#include <functional>
55
#include <memory>
6-
#include <mutex>
76
#include <string>
87
#include <string_view>
98
#include <thread>
@@ -51,25 +50,32 @@ class WebsocketManager {
5150
void on_error(std::string err);
5251

5352
void ping_loop(); // runs in ping_thread_
54-
void flush_pending(); // send queued subs once connected
5553

5654
std::string ws_url_;
5755
std::unique_ptr<slick::net::Websocket> ws_;
5856

59-
std::mutex mutex_;
57+
// Handler snapshots are published with copy-on-write. Each snapshot holds
58+
// shared per-callback state so unsubscribe() can deactivate exactly one
59+
// callback and wait for only that callback's in-flight invocation to drain.
60+
struct Handler {
61+
int subscription_id;
62+
std::function<void(const nlohmann::json&)> callback;
63+
std::atomic<bool> active{true};
64+
std::atomic<unsigned int> in_flight{0};
65+
66+
Handler(int id, std::function<void(const nlohmann::json&)> cb)
67+
: subscription_id(id), callback(std::move(cb)) {}
68+
};
69+
70+
using HandlerPtr = std::shared_ptr<Handler>;
71+
using HandlerMap = std::unordered_map<std::string, std::vector<HandlerPtr>>;
72+
73+
std::atomic<std::shared_ptr<const HandlerMap>> handlers_;
74+
6075
std::atomic<bool> connected_{false};
6176
std::atomic<bool> running_{true};
6277
std::atomic<int> next_id_{1};
6378

64-
// identifier → list of {subscription_id, callback}
65-
std::unordered_map<
66-
std::string,
67-
std::vector<std::pair<int, std::function<void(const nlohmann::json&)>>>>
68-
handlers_;
69-
70-
// Full subscription JSONs queued before connection ready
71-
std::vector<nlohmann::json> pending_subs_;
72-
7379
std::thread ping_thread_;
7480
};
7581

src/websocket_manager.cpp

Lines changed: 112 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,22 @@
77

88
#include <chrono>
99
#include <iostream>
10-
#include <stdexcept>
1110
#include <thread>
1211

12+
namespace {
13+
// Points at the callback currently executing on the WebSocket IO thread.
14+
// unsubscribe() uses this to avoid deadlocking when a callback removes itself.
15+
thread_local const void* dispatching_handler = nullptr;
16+
17+
void wait_for_zero(std::atomic<unsigned int>& counter) {
18+
unsigned int in_flight = counter.load(std::memory_order_acquire);
19+
while (in_flight != 0) {
20+
counter.wait(in_flight, std::memory_order_relaxed);
21+
in_flight = counter.load(std::memory_order_acquire);
22+
}
23+
}
24+
}
25+
1326
namespace hyperliquid {
1427

1528
// ── URL helpers ───────────────────────────────────────────────────────────────
@@ -57,6 +70,7 @@ std::string WebsocketManager::to_identifier(const nlohmann::json& sub) {
5770

5871
WebsocketManager::WebsocketManager(std::string_view http_base_url)
5972
: ws_url_(http_to_ws_url(http_base_url))
73+
, handlers_(std::make_shared<HandlerMap>())
6074
{
6175
ws_ = std::make_unique<slick::net::Websocket>(
6276
ws_url_,
@@ -80,12 +94,11 @@ WebsocketManager::~WebsocketManager() {
8094
// ── Connection callbacks ──────────────────────────────────────────────────────
8195

8296
void WebsocketManager::on_connected() {
83-
connected_ = true;
84-
flush_pending();
97+
connected_.store(true, std::memory_order_release);
8598
}
8699

87100
void WebsocketManager::on_disconnected() {
88-
connected_ = false;
101+
connected_.store(false, std::memory_order_release);
89102
}
90103

91104
void WebsocketManager::on_message(const char* data, std::size_t len) {
@@ -113,21 +126,42 @@ void WebsocketManager::on_message(const char* data, std::size_t len) {
113126
identifier = channel + ":" + d["user"].get<std::string>();
114127
}
115128

116-
std::vector<std::function<void(const nlohmann::json&)>> cbs;
117-
{
118-
std::lock_guard lock(mutex_);
119-
auto it = handlers_.find(identifier);
120-
if (it == handlers_.end()) {
121-
// Try without suffix (e.g. allMids has no coin)
122-
it = handlers_.find(channel);
129+
// Lock-free snapshot: atomically borrow a reference to the current map.
130+
// Each callback entry carries its own active/in-flight state so a callback
131+
// removed mid-dispatch can still be skipped before invocation.
132+
auto h = handlers_.load(std::memory_order_acquire);
133+
auto it = h->find(identifier);
134+
if (it == h->end())
135+
it = h->find(channel); // fallback for channels without a suffix (allMids)
136+
if (it == h->end())
137+
return;
138+
139+
for (const auto& handler : it->second) {
140+
if (!handler->active.load(std::memory_order_acquire))
141+
continue;
142+
143+
handler->in_flight.fetch_add(1, std::memory_order_acq_rel);
144+
if (!handler->active.load(std::memory_order_acquire)) {
145+
if (handler->in_flight.fetch_sub(1, std::memory_order_acq_rel) == 1)
146+
handler->in_flight.notify_all();
147+
continue;
123148
}
124-
if (it != handlers_.end()) {
125-
for (const auto& [id, cb] : it->second)
126-
cbs.push_back(cb);
149+
150+
const void* previous = dispatching_handler;
151+
dispatching_handler = handler.get();
152+
try {
153+
handler->callback(msg);
154+
} catch (...) {
155+
dispatching_handler = previous;
156+
if (handler->in_flight.fetch_sub(1, std::memory_order_acq_rel) == 1)
157+
handler->in_flight.notify_all();
158+
throw;
127159
}
128-
}
160+
dispatching_handler = previous;
129161

130-
for (auto& cb : cbs) cb(msg);
162+
if (handler->in_flight.fetch_sub(1, std::memory_order_acq_rel) == 1)
163+
handler->in_flight.notify_all();
164+
}
131165
}
132166

133167
void WebsocketManager::on_error(std::string err) {
@@ -139,29 +173,13 @@ void WebsocketManager::on_error(std::string err) {
139173
void WebsocketManager::ping_loop() {
140174
while (running_) {
141175
std::this_thread::sleep_for(std::chrono::milliseconds(50));
142-
if (connected_ && ws_) {
176+
if (connected_.load(std::memory_order_acquire) && ws_) {
143177
static const std::string ping_msg = R"({"method":"ping"})";
144178
ws_->send(ping_msg.c_str(), ping_msg.size());
145179
}
146180
}
147181
}
148182

149-
// ── Pending subscription flush ────────────────────────────────────────────────
150-
151-
void WebsocketManager::flush_pending() {
152-
std::vector<nlohmann::json> subs;
153-
{
154-
std::lock_guard lock(mutex_);
155-
subs = std::move(pending_subs_);
156-
pending_subs_.clear();
157-
}
158-
for (const auto& sub : subs) {
159-
nlohmann::json msg{{"method", "subscribe"}, {"subscription", sub}};
160-
std::string s = msg.dump();
161-
ws_->send(s.c_str(), s.size());
162-
}
163-
}
164-
165183
// ── Subscribe / unsubscribe ───────────────────────────────────────────────────
166184

167185
int WebsocketManager::subscribe(
@@ -171,45 +189,82 @@ int WebsocketManager::subscribe(
171189
int id = next_id_.fetch_add(1);
172190
std::string ident = to_identifier(subscription);
173191

174-
{
175-
std::lock_guard lock(mutex_);
176-
handlers_[ident].emplace_back(id, std::move(callback));
177-
178-
if (!connected_) {
179-
pending_subs_.push_back(subscription);
180-
} else {
181-
nlohmann::json msg{{"method", "subscribe"}, {"subscription", subscription}};
182-
std::string s = msg.dump();
183-
ws_->send(s.c_str(), s.size());
184-
}
185-
}
192+
// CAS loop: build a new map, swap it in; retry if another writer raced us.
193+
auto old = handlers_.load(std::memory_order_acquire);
194+
auto handler = std::make_shared<Handler>(id, std::move(callback));
195+
std::shared_ptr<HandlerMap> new_map;
196+
do {
197+
new_map = std::make_shared<HandlerMap>(*old);
198+
(*new_map)[ident].push_back(handler);
199+
} while (!handlers_.compare_exchange_weak(
200+
old, new_map,
201+
std::memory_order_release,
202+
std::memory_order_acquire));
203+
204+
// The WS layer buffers sends made before the connection is established,
205+
// so no need to track pending subscriptions ourselves.
206+
nlohmann::json msg{{"method", "subscribe"}, {"subscription", subscription}};
207+
std::string s = msg.dump();
208+
ws_->send(s.c_str(), s.size());
186209
return id;
187210
}
188211

189212
void WebsocketManager::unsubscribe(
190213
const nlohmann::json& subscription, int subscription_id)
191214
{
192215
std::string ident = to_identifier(subscription);
193-
{
194-
std::lock_guard lock(mutex_);
195-
auto it = handlers_.find(ident);
196-
if (it != handlers_.end()) {
216+
HandlerPtr removed_handler;
217+
bool last_handler = false;
218+
219+
// CAS loop: remove the entry from a fresh copy; retry on contention.
220+
std::shared_ptr<const HandlerMap> old = handlers_.load(std::memory_order_acquire);
221+
for (;;) {
222+
auto new_map = std::make_shared<HandlerMap>(*old);
223+
removed_handler.reset();
224+
last_handler = false;
225+
226+
auto it = new_map->find(ident);
227+
if (it != new_map->end()) {
197228
auto& vec = it->second;
198229
vec.erase(
199230
std::remove_if(vec.begin(), vec.end(),
200-
[subscription_id](const auto& p) { return p.first == subscription_id; }),
231+
[&](const HandlerPtr& handler) {
232+
if (handler->subscription_id != subscription_id)
233+
return false;
234+
removed_handler = handler;
235+
return true;
236+
}),
201237
vec.end());
202238
if (vec.empty()) {
203-
handlers_.erase(it);
204-
// Send unsubscribe only when no more handlers remain
205-
if (connected_) {
206-
nlohmann::json msg{{"method", "unsubscribe"}, {"subscription", subscription}};
207-
std::string s = msg.dump();
208-
ws_->send(s.c_str(), s.size());
209-
}
239+
new_map->erase(it);
240+
last_handler = (removed_handler != nullptr);
210241
}
211242
}
243+
if (handlers_.compare_exchange_weak(
244+
old, new_map,
245+
std::memory_order_release,
246+
std::memory_order_acquire))
247+
break;
212248
}
249+
250+
if (!removed_handler)
251+
return;
252+
253+
removed_handler->active.store(false, std::memory_order_release);
254+
255+
if (last_handler) {
256+
nlohmann::json msg{{"method", "unsubscribe"}, {"subscription", subscription}};
257+
std::string s = msg.dump();
258+
ws_->send(s.c_str(), s.size());
259+
}
260+
261+
// Self-unsubscribe from inside the callback would deadlock waiting for our
262+
// own in-flight invocation to finish. Cross-callback removal remains safe:
263+
// later callbacks in the same snapshot re-check `active` before invoking.
264+
if (dispatching_handler == removed_handler.get())
265+
return;
266+
267+
wait_for_zero(removed_handler->in_flight);
213268
}
214269

215270
} // namespace hyperliquid

0 commit comments

Comments
 (0)