diff --git a/include/ws-streaming/connection.hpp b/include/ws-streaming/connection.hpp index bf3c0ec..ee8bfe6 100644 --- a/include/ws-streaming/connection.hpp +++ b/include/ws-streaming/connection.hpp @@ -253,7 +253,7 @@ namespace wss void on_signal_subscribe_requested(const std::string& signal_id); void on_signal_unsubscribe_requested(const std::string& signal_id); - std::shared_ptr on_signal_sought(const std::string& signal_id); + std::shared_ptr on_table_sought(const std::string& table_id); void dispatch_metadata( unsigned signo, diff --git a/include/ws-streaming/detail/linear_table.hpp b/include/ws-streaming/detail/linear_table.hpp index 66ea70c..560c6b4 100644 --- a/include/ws-streaming/detail/linear_table.hpp +++ b/include/ws-streaming/detail/linear_table.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -27,8 +28,11 @@ namespace wss::detail std::int64_t driven_index() const noexcept; + const std::string& id() const noexcept; + private: + std::string _id; std::int64_t _index = 0; std::int64_t _value = 0; std::int64_t _delta = 0; diff --git a/include/ws-streaming/detail/remote_signal_container.hpp b/include/ws-streaming/detail/remote_signal_container.hpp index 173bc10..7533b8e 100644 --- a/include/ws-streaming/detail/remote_signal_container.hpp +++ b/include/ws-streaming/detail/remote_signal_container.hpp @@ -19,25 +19,31 @@ namespace wss::detail struct remote_signal_entry { - remote_signal_entry(const std::string& id) + remote_signal_entry(const std::string& id, bool hidden) : signal(std::make_shared(id)) + , hidden(hidden) { } + bool hidden = false; std::shared_ptr signal; boost::signals2::scoped_connection on_subscribe_requested; boost::signals2::scoped_connection on_unsubscribe_requested; - boost::signals2::scoped_connection on_signal_sought; + boost::signals2::scoped_connection on_table_sought; }; protected: - std::pair add_remote_signal(const std::string& id); + std::pair add_remote_signal( + const std::string& id, + bool hidden); remote_signal_entry *find_remote_signal(const std::string& id); + remote_signal_entry *find_table(const std::string& table_id); remote_signal_entry *find_remote_signal(unsigned signo); const remote_signal_entry *find_remote_signal(const std::string& id) const; + const remote_signal_entry *find_table(const std::string& table_id) const; const remote_signal_entry *find_remote_signal(unsigned signo) const; template @@ -51,7 +57,8 @@ namespace wss::detail signal.second.signal->detach(); for (const auto& signal : old_signals) - func(signal.second.signal); + if (!signal.second.hidden) + func(signal.second.signal); } auto remote_signals() diff --git a/include/ws-streaming/detail/remote_signal_impl.hpp b/include/ws-streaming/detail/remote_signal_impl.hpp index 7049bb5..75bc124 100644 --- a/include/ws-streaming/detail/remote_signal_impl.hpp +++ b/include/ws-streaming/detail/remote_signal_impl.hpp @@ -40,7 +40,9 @@ namespace wss::detail boost::signals2::signal on_subscribe_requested; boost::signals2::signal on_unsubscribe_requested; - boost::signals2::signal(const std::string& id)> on_signal_sought; + boost::signals2::signal(const std::string& table_id)> on_table_sought; + + const std::shared_ptr& table() const noexcept; private: diff --git a/src/connection.cpp b/src/connection.cpp index b0a1c52..e8c630a 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -334,10 +334,10 @@ void wss::connection::on_signal_unsubscribe_requested( } std::shared_ptr -wss::connection::on_signal_sought( - const std::string& signal_id) +wss::connection::on_table_sought( + const std::string& table_id) { - auto *entry = detail::remote_signal_container::find_remote_signal(signal_id); + auto *entry = detail::remote_signal_container::find_table(table_id); if (entry) return entry->signal; @@ -401,7 +401,7 @@ void wss::connection::handle_available( if (!id.is_string()) continue; - auto [added, signal] = add_remote_signal(id); + auto [added, signal] = add_remote_signal(id, false); if (!added) continue; @@ -411,8 +411,8 @@ void wss::connection::handle_available( signal.on_unsubscribe_requested = signal.signal->on_unsubscribe_requested.connect( std::bind(&connection::on_signal_unsubscribe_requested, this, id)); - signal.on_signal_sought = signal.signal->on_signal_sought.connect( - std::bind(&connection::on_signal_sought, this, _1)); + signal.on_table_sought = signal.signal->on_table_sought.connect( + std::bind(&connection::on_table_sought, this, _1)); on_available(signal.signal); } @@ -429,12 +429,22 @@ void wss::connection::handle_subscribe( else if (params.is_array() && params.size() > 0 && params[0].is_string()) signal_id = params[0]; - auto *entry = detail::remote_signal_container::find_remote_signal(static_cast(signal_id)); - if (!entry) - return; + auto [added, signal] = add_remote_signal(signal_id, true); + + if (added) + { + signal.on_subscribe_requested = signal.signal->on_subscribe_requested.connect( + std::bind(&connection::on_signal_subscribe_requested, this, signal_id)); + + signal.on_unsubscribe_requested = signal.signal->on_unsubscribe_requested.connect( + std::bind(&connection::on_signal_unsubscribe_requested, this, signal_id)); + + signal.on_table_sought = signal.signal->on_table_sought.connect( + std::bind(&connection::on_table_sought, this, _1)); + } - set_remote_signal_signo(entry, signo); - entry->signal->handle_metadata("subscribe", params); + set_remote_signal_signo(&signal, signo); + signal.signal->handle_metadata("subscribe", params); } void wss::connection::handle_unsubscribe( diff --git a/src/detail/linear_table.cpp b/src/detail/linear_table.cpp index e7fe430..d00bd7a 100644 --- a/src/detail/linear_table.cpp +++ b/src/detail/linear_table.cpp @@ -1,4 +1,5 @@ #include +#include #include #include @@ -15,6 +16,7 @@ wss::detail::linear_table::linear_table(const metadata& metadata) void wss::detail::linear_table::update( const metadata& metadata) { + _id = metadata.table_id(); auto [start, delta] = metadata.linear_start_delta(); _value = start.value_or(_value); @@ -60,3 +62,8 @@ std::int64_t wss::detail::linear_table::driven_index() const noexcept { return _driven_index; } + +const std::string& wss::detail::linear_table::id() const noexcept +{ + return _id; +} diff --git a/src/detail/peer.cpp b/src/detail/peer.cpp index 0a24fd2..a5e2680 100644 --- a/src/detail/peer.cpp +++ b/src/detail/peer.cpp @@ -210,17 +210,13 @@ void wss::detail::peer::process_buffer() void wss::detail::peer::process_buffer_tcp() { - // Process as many streaming protocol packets as possible. - while (true) - { - std::size_t bytes_consumed = process_packet( - _rx_buffer.data(), - _rx_buffer_bytes); - - // If there's not enough data to form a complete packet, we can't process any more. - if (!bytes_consumed) - break; + std::size_t bytes_consumed = process_packet( + _rx_buffer.data(), + _rx_buffer_bytes); + // If there's not enough data to form a complete packet, we can't process any more. + if (bytes_consumed) + { // Consume the handled frame by sliding the remaining data in the read buffer over // to the left. (Can't use std::memcpy() for this because the ranges overlap.) std::memmove( @@ -337,32 +333,40 @@ std::size_t wss::detail::peer::process_packet( const std::uint8_t *data, std::size_t size) { - // Try to decode the WebSocket Streaming Protocol packet. - auto header = detail::streaming_protocol::decode_header(data, size, _use_tcp_protocol); - if (!header.header_size) - return 0; + const auto *origin = data; - switch (header.type) + while (true) { - case detail::streaming_protocol::packet_type::DATA: - process_data_packet( - header.signo, - data + header.header_size, - header.payload_size); + // Try to decode the WebSocket Streaming Protocol packet. + auto header = detail::streaming_protocol::decode_header(data, size, _use_tcp_protocol); + if (!header.header_size) break; - case detail::streaming_protocol::packet_type::METADATA: - process_metadata_packet( - header.signo, - data + header.header_size, - header.payload_size); - break; + switch (header.type) + { + case detail::streaming_protocol::packet_type::DATA: + process_data_packet( + header.signo, + data + header.header_size, + header.payload_size); + break; - default: - break; + case detail::streaming_protocol::packet_type::METADATA: + process_metadata_packet( + header.signo, + data + header.header_size, + header.payload_size); + break; + + default: + break; + } + + data += header.header_size + header.payload_size; + size -= header.header_size + header.payload_size; } - return header.header_size + header.payload_size; + return data - origin; } void wss::detail::peer::process_data_packet( diff --git a/src/detail/remote_signal_container.cpp b/src/detail/remote_signal_container.cpp index 020a83d..a1e30b3 100644 --- a/src/detail/remote_signal_container.cpp +++ b/src/detail/remote_signal_container.cpp @@ -7,13 +7,18 @@ #include std::pair -wss::detail::remote_signal_container::add_remote_signal(const std::string& id) +wss::detail::remote_signal_container::add_remote_signal( + const std::string& id, + bool hidden) { auto it = _signals_by_id.find(id); if (it != _signals_by_id.end()) return std::make_pair(true, std::ref(it->second)); - auto [jt, emplaced] = _signals_by_id.emplace(id, id); + auto [jt, emplaced] = _signals_by_id.emplace( + std::piecewise_construct, + std::make_tuple(id), + std::make_tuple(id, hidden)); return std::make_pair(true, std::ref(jt->second)); } @@ -27,6 +32,24 @@ wss::detail::remote_signal_container::find_remote_signal(const std::string& id) return &it->second; } +wss::detail::remote_signal_container::remote_signal_entry * +wss::detail::remote_signal_container::find_table(const std::string& table_id) +{ + auto it = std::find_if( + _signals_by_id.begin(), + _signals_by_id.end(), + [table_id](const decltype(_signals_by_id)::value_type& entry) + { + auto table = entry.second.signal->table(); + return table && !table->id().empty() && table->id() == table_id; + }); + + if (it == _signals_by_id.end()) + return nullptr; + + return &it->second; +} + wss::detail::remote_signal_container::remote_signal_entry * wss::detail::remote_signal_container::find_remote_signal(unsigned signo) { @@ -47,6 +70,24 @@ wss::detail::remote_signal_container::find_remote_signal(const std::string& id) return &it->second; } +const wss::detail::remote_signal_container::remote_signal_entry * +wss::detail::remote_signal_container::find_table(const std::string& table_id) const +{ + auto it = std::find_if( + _signals_by_id.begin(), + _signals_by_id.end(), + [table_id](const decltype(_signals_by_id)::value_type& entry) + { + auto table = entry.second.signal->table(); + return table && !table->id().empty() && table->id() == table_id; + }); + + if (it == _signals_by_id.end()) + return nullptr; + + return &it->second; +} + const wss::detail::remote_signal_container::remote_signal_entry * wss::detail::remote_signal_container::find_remote_signal(unsigned signo) const { diff --git a/src/detail/remote_signal_impl.cpp b/src/detail/remote_signal_impl.cpp index 551e131..572f258 100644 --- a/src/detail/remote_signal_impl.cpp +++ b/src/detail/remote_signal_impl.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -145,6 +146,11 @@ void wss::detail::remote_signal_impl::signo(unsigned signo) _signo = signo; } +const std::shared_ptr& wss::detail::remote_signal_impl::table() const noexcept +{ + return _table; +} + void wss::detail::remote_signal_impl::handle_subscribe() { if (_is_subscribed) @@ -189,7 +195,7 @@ void wss::detail::remote_signal_impl::handle_signal( if (!table_id.empty() && table_id != id()) { - _domain_signal = on_signal_sought(table_id).value_or(nullptr); + _domain_signal = on_table_sought(table_id).value_or(nullptr); if (_domain_signal) _domain_table = _domain_signal->_table; else diff --git a/src/metadata.cpp b/src/metadata.cpp index bd0e76e..30618b8 100644 --- a/src/metadata.cpp +++ b/src/metadata.cpp @@ -67,32 +67,49 @@ std::pair< std::optional > wss::metadata::linear_start_delta() const { - if (rule() != rule_types::linear_rule) - return std::make_pair(std::nullopt, std::nullopt); + std::optional start; + std::optional delta; - if (_json.contains("interpretation") - && _json["interpretation"].is_object() - && _json["interpretation"].contains("rule") - && _json["interpretation"]["rule"].is_object() - && _json["interpretation"]["rule"].contains("parameters") - && _json["interpretation"]["rule"]["parameters"].is_object()) + if (rule() == rule_types::linear_rule) { - const auto& parameters = _json["interpretation"]["rule"]["parameters"]; + if (_json.contains("interpretation") + && _json["interpretation"].is_object() + && _json["interpretation"].contains("rule") + && _json["interpretation"]["rule"].is_object() + && _json["interpretation"]["rule"].contains("parameters") + && _json["interpretation"]["rule"]["parameters"].is_object()) + { + const auto& parameters = _json["interpretation"]["rule"]["parameters"]; - std::optional start; - if (auto element = parameters.value("start", nullptr); - element.is_number()) - start = element; + std::optional start; + if (auto element = parameters.value("start", nullptr); + element.is_number()) + start = element; - std::optional delta; - if (auto element = parameters.value("delta", nullptr); - element.is_number()) - delta = element; + std::optional delta; + if (auto element = parameters.value("delta", nullptr); + element.is_number()) + delta = element; + } - return std::make_pair(start, delta); + if (_json.contains("definition") + && _json["definition"].is_object() + && _json["definition"].contains("linear") + && _json["definition"]["linear"].is_object()) + { + const auto& linear = _json["definition"]["linear"]; + + if (auto element = linear.value("start", nullptr); + element.is_number()) + start = element; + + if (auto element = linear.value("delta", nullptr); + element.is_number()) + delta = element; + } } - return std::make_pair(std::nullopt, std::nullopt); + return std::make_pair(start, delta); } std::string wss::metadata::name() const @@ -310,6 +327,28 @@ std::optional wss::metadata::unit() const return wss::unit(id, name, quantity, symbol); } + else if (_json.contains("definition") + && _json["definition"].is_object() + && _json["definition"].contains("unit") + && _json["definition"]["unit"].is_object()) + { + const auto& unit = _json["definition"]["unit"]; + + int id = -1; + std::string name, quantity, symbol; + + if (unit.contains("unitId") && unit["unitId"].is_number_integer()) + id = unit["unitId"]; + + if (unit.contains("displayName") && unit["displayName"].is_string()) + name = symbol = unit["displayName"]; + + if (unit.contains("quantity") && unit["quantity"].is_string()) + quantity = unit["quantity"]; + + return wss::unit(id, name, quantity, symbol); + } + return std::nullopt; }