Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/ws-streaming/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ namespace wss

void on_signal_subscribe_requested(const std::string& signal_id);
void on_signal_unsubscribe_requested(const std::string& signal_id);
std::shared_ptr<detail::remote_signal_impl> on_signal_sought(const std::string& signal_id);
std::shared_ptr<detail::remote_signal_impl> on_table_sought(const std::string& table_id);

void dispatch_metadata(
unsigned signo,
Expand Down
4 changes: 4 additions & 0 deletions include/ws-streaming/detail/linear_table.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <cstdint>
#include <string>

#include <ws-streaming/metadata.hpp>
#include <ws-streaming/detail/streaming_protocol.hpp>
Expand All @@ -27,8 +28,11 @@ namespace wss::detail

std::int64_t driven_index() const noexcept;

const std::string& id() const noexcept;

private:

std::string _id;
std::int64_t _index = 0;
std::int64_t _value = 0;
std::int64_t _delta = 0;
Expand Down
15 changes: 11 additions & 4 deletions include/ws-streaming/detail/remote_signal_container.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,31 @@ namespace wss::detail

struct remote_signal_entry
{
remote_signal_entry(const std::string& id)
remote_signal_entry(const std::string& id, bool hidden)
: signal(std::make_shared<detail::remote_signal_impl>(id))
, hidden(hidden)
{
}

bool hidden = false;
std::shared_ptr<detail::remote_signal_impl> signal;
boost::signals2::scoped_connection on_subscribe_requested;
boost::signals2::scoped_connection on_unsubscribe_requested;
boost::signals2::scoped_connection on_signal_sought;
boost::signals2::scoped_connection on_table_sought;
};

protected:

std::pair<bool, remote_signal_entry&> add_remote_signal(const std::string& id);
std::pair<bool, remote_signal_entry&> add_remote_signal(
const std::string& id,
bool hidden);

remote_signal_entry *find_remote_signal(const std::string& id);
remote_signal_entry *find_table(const std::string& table_id);
remote_signal_entry *find_remote_signal(unsigned signo);

const remote_signal_entry *find_remote_signal(const std::string& id) const;
const remote_signal_entry *find_table(const std::string& table_id) const;
const remote_signal_entry *find_remote_signal(unsigned signo) const;

template <typename Func>
Expand All @@ -51,7 +57,8 @@ namespace wss::detail
signal.second.signal->detach();

for (const auto& signal : old_signals)
func(signal.second.signal);
if (!signal.second.hidden)
func(signal.second.signal);
}

auto remote_signals()
Expand Down
4 changes: 3 additions & 1 deletion include/ws-streaming/detail/remote_signal_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ namespace wss::detail

boost::signals2::signal<void()> on_subscribe_requested;
boost::signals2::signal<void()> on_unsubscribe_requested;
boost::signals2::signal<std::shared_ptr<remote_signal_impl>(const std::string& id)> on_signal_sought;
boost::signals2::signal<std::shared_ptr<remote_signal_impl>(const std::string& table_id)> on_table_sought;

const std::shared_ptr<linear_table>& table() const noexcept;

private:

Expand Down
32 changes: 21 additions & 11 deletions src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,10 +334,10 @@ void wss::connection::on_signal_unsubscribe_requested(
}

std::shared_ptr<wss::detail::remote_signal_impl>
wss::connection::on_signal_sought(
const std::string& signal_id)
wss::connection::on_table_sought(
const std::string& table_id)
{
auto *entry = detail::remote_signal_container::find_remote_signal(signal_id);
auto *entry = detail::remote_signal_container::find_table(table_id);
if (entry)
return entry->signal;

Expand Down Expand Up @@ -401,7 +401,7 @@ void wss::connection::handle_available(
if (!id.is_string())
continue;

auto [added, signal] = add_remote_signal(id);
auto [added, signal] = add_remote_signal(id, false);
if (!added)
continue;

Expand All @@ -411,8 +411,8 @@ void wss::connection::handle_available(
signal.on_unsubscribe_requested = signal.signal->on_unsubscribe_requested.connect(
std::bind(&connection::on_signal_unsubscribe_requested, this, id));

signal.on_signal_sought = signal.signal->on_signal_sought.connect(
std::bind(&connection::on_signal_sought, this, _1));
signal.on_table_sought = signal.signal->on_table_sought.connect(
std::bind(&connection::on_table_sought, this, _1));

on_available(signal.signal);
}
Expand All @@ -429,12 +429,22 @@ void wss::connection::handle_subscribe(
else if (params.is_array() && params.size() > 0 && params[0].is_string())
signal_id = params[0];

auto *entry = detail::remote_signal_container::find_remote_signal(static_cast<std::string>(signal_id));
if (!entry)
return;
auto [added, signal] = add_remote_signal(signal_id, true);

if (added)
{
signal.on_subscribe_requested = signal.signal->on_subscribe_requested.connect(
std::bind(&connection::on_signal_subscribe_requested, this, signal_id));

signal.on_unsubscribe_requested = signal.signal->on_unsubscribe_requested.connect(
std::bind(&connection::on_signal_unsubscribe_requested, this, signal_id));

signal.on_table_sought = signal.signal->on_table_sought.connect(
std::bind(&connection::on_table_sought, this, _1));
}

set_remote_signal_signo(entry, signo);
entry->signal->handle_metadata("subscribe", params);
set_remote_signal_signo(&signal, signo);
signal.signal->handle_metadata("subscribe", params);
}

void wss::connection::handle_unsubscribe(
Expand Down
7 changes: 7 additions & 0 deletions src/detail/linear_table.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <cstdint>
#include <string>

#include <ws-streaming/metadata.hpp>
#include <ws-streaming/rule_types.hpp>
Expand All @@ -15,6 +16,7 @@ wss::detail::linear_table::linear_table(const metadata& metadata)
void wss::detail::linear_table::update(
const metadata& metadata)
{
_id = metadata.table_id();
auto [start, delta] = metadata.linear_start_delta();

_value = start.value_or(_value);
Expand Down Expand Up @@ -60,3 +62,8 @@ std::int64_t wss::detail::linear_table::driven_index() const noexcept
{
return _driven_index;
}

const std::string& wss::detail::linear_table::id() const noexcept
{
return _id;
}
62 changes: 33 additions & 29 deletions src/detail/peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,17 +210,13 @@ void wss::detail::peer::process_buffer()

void wss::detail::peer::process_buffer_tcp()
{
// Process as many streaming protocol packets as possible.
while (true)
{
std::size_t bytes_consumed = process_packet(
_rx_buffer.data(),
_rx_buffer_bytes);

// If there's not enough data to form a complete packet, we can't process any more.
if (!bytes_consumed)
break;
std::size_t bytes_consumed = process_packet(
_rx_buffer.data(),
_rx_buffer_bytes);

// If there's not enough data to form a complete packet, we can't process any more.
if (bytes_consumed)
{
// Consume the handled frame by sliding the remaining data in the read buffer over
// to the left. (Can't use std::memcpy() for this because the ranges overlap.)
std::memmove(
Expand Down Expand Up @@ -337,32 +333,40 @@ std::size_t wss::detail::peer::process_packet(
const std::uint8_t *data,
std::size_t size)
{
// Try to decode the WebSocket Streaming Protocol packet.
auto header = detail::streaming_protocol::decode_header(data, size, _use_tcp_protocol);
if (!header.header_size)
return 0;
const auto *origin = data;

switch (header.type)
while (true)
{
case detail::streaming_protocol::packet_type::DATA:
process_data_packet(
header.signo,
data + header.header_size,
header.payload_size);
// Try to decode the WebSocket Streaming Protocol packet.
auto header = detail::streaming_protocol::decode_header(data, size, _use_tcp_protocol);
if (!header.header_size)
break;

case detail::streaming_protocol::packet_type::METADATA:
process_metadata_packet(
header.signo,
data + header.header_size,
header.payload_size);
break;
switch (header.type)
{
case detail::streaming_protocol::packet_type::DATA:
process_data_packet(
header.signo,
data + header.header_size,
header.payload_size);
break;

default:
break;
case detail::streaming_protocol::packet_type::METADATA:
process_metadata_packet(
header.signo,
data + header.header_size,
header.payload_size);
break;

default:
break;
}

data += header.header_size + header.payload_size;
size -= header.header_size + header.payload_size;
}

return header.header_size + header.payload_size;
return data - origin;
}

void wss::detail::peer::process_data_packet(
Expand Down
45 changes: 43 additions & 2 deletions src/detail/remote_signal_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@
#include <ws-streaming/detail/remote_signal_container.hpp>

std::pair<bool, wss::detail::remote_signal_container::remote_signal_entry&>
wss::detail::remote_signal_container::add_remote_signal(const std::string& id)
wss::detail::remote_signal_container::add_remote_signal(
const std::string& id,
bool hidden)
{
auto it = _signals_by_id.find(id);
if (it != _signals_by_id.end())
return std::make_pair(true, std::ref(it->second));

auto [jt, emplaced] = _signals_by_id.emplace(id, id);
auto [jt, emplaced] = _signals_by_id.emplace(
std::piecewise_construct,
std::make_tuple(id),
std::make_tuple(id, hidden));
return std::make_pair(true, std::ref(jt->second));
}

Expand All @@ -27,6 +32,24 @@ wss::detail::remote_signal_container::find_remote_signal(const std::string& id)
return &it->second;
}

wss::detail::remote_signal_container::remote_signal_entry *
wss::detail::remote_signal_container::find_table(const std::string& table_id)
{
auto it = std::find_if(
_signals_by_id.begin(),
_signals_by_id.end(),
[table_id](const decltype(_signals_by_id)::value_type& entry)
{
auto table = entry.second.signal->table();
return table && !table->id().empty() && table->id() == table_id;
});

if (it == _signals_by_id.end())
return nullptr;

return &it->second;
}

wss::detail::remote_signal_container::remote_signal_entry *
wss::detail::remote_signal_container::find_remote_signal(unsigned signo)
{
Expand All @@ -47,6 +70,24 @@ wss::detail::remote_signal_container::find_remote_signal(const std::string& id)
return &it->second;
}

const wss::detail::remote_signal_container::remote_signal_entry *
wss::detail::remote_signal_container::find_table(const std::string& table_id) const
{
auto it = std::find_if(
_signals_by_id.begin(),
_signals_by_id.end(),
[table_id](const decltype(_signals_by_id)::value_type& entry)
{
auto table = entry.second.signal->table();
return table && !table->id().empty() && table->id() == table_id;
});

if (it == _signals_by_id.end())
return nullptr;

return &it->second;
}

const wss::detail::remote_signal_container::remote_signal_entry *
wss::detail::remote_signal_container::find_remote_signal(unsigned signo) const
{
Expand Down
8 changes: 7 additions & 1 deletion src/detail/remote_signal_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <ws-streaming/remote_signal.hpp>
#include <ws-streaming/rule_types.hpp>
#include <ws-streaming/detail/json.hpp>
#include <ws-streaming/detail/linear_table.hpp>
#include <ws-streaming/detail/remote_signal_impl.hpp>
#include <ws-streaming/detail/streaming_protocol.hpp>

Expand Down Expand Up @@ -145,6 +146,11 @@ void wss::detail::remote_signal_impl::signo(unsigned signo)
_signo = signo;
}

const std::shared_ptr<wss::detail::linear_table>& wss::detail::remote_signal_impl::table() const noexcept
{
return _table;
}

void wss::detail::remote_signal_impl::handle_subscribe()
{
if (_is_subscribed)
Expand Down Expand Up @@ -189,7 +195,7 @@ void wss::detail::remote_signal_impl::handle_signal(

if (!table_id.empty() && table_id != id())
{
_domain_signal = on_signal_sought(table_id).value_or(nullptr);
_domain_signal = on_table_sought(table_id).value_or(nullptr);
if (_domain_signal)
_domain_table = _domain_signal->_table;
else
Expand Down
Loading