Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright (C) 2015-2017 CESNET, z.s.p.o.
Copyright (C) 2015-2025 CESNET, z.s.p.o.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
Expand Down
2 changes: 1 addition & 1 deletion src/core/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ print_version()
<< "Build type: " << IPX_BUILD_TYPE << "\n"
<< "Architecture: " << IPX_BUILD_ARCH << " (" << IPX_BUILD_BYTE_ORDER << ")" << "\n"
<< "Compiler: " << IPX_BUILD_COMPILER << "\n"
<< "Copyright (C) 2018 CESNET z.s.p.o.\n";
<< "Copyright (C) 2025 CESNET z.s.p.o.\n";
}

/**
Expand Down
57 changes: 37 additions & 20 deletions src/plugins/output/forwarder/src/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ Connection::forward_message(ipx_msg_ipfix_t *msg)
sender.process_message(msg);

} catch (const ConnectionError &err) {
// In case connection was lost, we have to resend templates when it reconnects
sender.clear_templates();
on_connection_lost();
throw err;
}
}
Expand All @@ -107,31 +106,36 @@ Connection::advance_transfers()

IPX_CTX_DEBUG(m_log_ctx, "Waiting transfers on connection %s: %zu", m_ident.c_str(), m_transfers.size());

for (auto it = m_transfers.begin(); it != m_transfers.end(); ) {
try {
for (auto it = m_transfers.begin(); it != m_transfers.end(); ) {

Transfer &transfer = *it;
Transfer &transfer = *it;

assert(transfer.data.size() <= UINT16_MAX); // The transfer consists of one IPFIX message which cannot be larger
assert(transfer.data.size() <= UINT16_MAX); // The transfer consists of one IPFIX message which cannot be larger

ssize_t ret = send(m_sockfd.get(), &transfer.data[transfer.offset],
transfer.data.size() - transfer.offset, MSG_DONTWAIT | MSG_NOSIGNAL);
ssize_t ret = send(m_sockfd.get(), &transfer.data[transfer.offset],
transfer.data.size() - transfer.offset, MSG_DONTWAIT | MSG_NOSIGNAL);

check_socket_error(ret);
check_socket_error(ret);

size_t sent = std::max<ssize_t>(0, ret);
IPX_CTX_DEBUG(m_log_ctx, "Sent %zu/%zu B to %s", sent, transfer.data.size(), m_ident.c_str());
size_t sent = std::max<ssize_t>(0, ret);
IPX_CTX_DEBUG(m_log_ctx, "Sent %zu/%zu B to %s", sent, transfer.data.size(), m_ident.c_str());

// Is the transfer done?
if (transfer.offset + sent == transfer.data.size()) {
it = m_transfers.erase(it);
// Remove the transfer and continue with the next one
// Is the transfer done?
if (transfer.offset + sent == transfer.data.size()) {
it = m_transfers.erase(it);
// Remove the transfer and continue with the next one

} else {
transfer.offset += sent;
} else {
transfer.offset += sent;

// Finish, cannot advance next transfer before the one before it is fully sent
break;
// Finish, cannot advance next transfer before the one before it is fully sent
break;
}
}
} catch (ConnectionError& err) {
on_connection_lost();
throw err;
}
}

Expand Down Expand Up @@ -236,8 +240,8 @@ Connection::get_or_create_sender(ipx_msg_ipfix_t *msg)
send_message(msg);
},
m_con_params.protocol == Protocol::TCP,
m_tmplts_resend_pkts,
m_tmplts_resend_secs)));
m_con_params.protocol != Protocol::TCP ? m_tmplts_resend_pkts : 0,
m_con_params.protocol != Protocol::TCP ? m_tmplts_resend_secs : 0)));
}

Sender &sender = *m_senders[odid].get();
Expand All @@ -261,3 +265,16 @@ Connection::check_socket_error(ssize_t sock_ret)
throw ConnectionError(errbuf);
}
}

void
Connection::on_connection_lost()
{
for (auto& p : m_senders) {
// In case connection was lost, we have to resend templates when it reconnects
Sender& sender = *p.second.get();
sender.clear_templates();
}

// Do not continue any of the transfers that haven't been finished so we don't end up in the middle of a message
m_transfers.clear();
}
5 changes: 4 additions & 1 deletion src/plugins/output/forwarder/src/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,7 @@ class Connection {

void
check_socket_error(ssize_t sock_ret);
};

void
on_connection_lost();
};
8 changes: 8 additions & 0 deletions src/plugins/output/forwarder/src/Forwarder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ void Forwarder::handle_ipfix_message(ipx_msg_ipfix_t *msg)
}
}

void
Forwarder::handle_periodic_message(ipx_msg_periodic_t *msg)
{
for (auto &host : m_hosts) {
host->advance_transfers();
}
}

void
Forwarder::forward_to_all(ipx_msg_ipfix_t *msg)
{
Expand Down
10 changes: 9 additions & 1 deletion src/plugins/output/forwarder/src/Forwarder.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ class Forwarder {
void
handle_ipfix_message(ipx_msg_ipfix_t *msg);

/**
* \brief Handle a periodic message
* \param msg The periodic message
*/
void
handle_periodic_message(ipx_msg_periodic_t *msg);

/**
* \brief The destructor - finalize the forwarder
*/
Expand All @@ -101,4 +108,5 @@ class Forwarder {

void
forward_round_robin(ipx_msg_ipfix_t *msg);
};

};
11 changes: 11 additions & 0 deletions src/plugins/output/forwarder/src/Host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,17 @@ Host::forward_message(ipx_msg_ipfix_t *msg)
return true;
}

void
Host::advance_transfers()
{
for (auto &p : m_session_to_connection) {
Connection &connection = *p.second.get();
if (connection.check_connected()) {
connection.advance_transfers();
}
}
}

Host::~Host()
{
for (auto &p : m_session_to_connection) {
Expand Down
8 changes: 7 additions & 1 deletion src/plugins/output/forwarder/src/Host.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ class Host {
bool
forward_message(ipx_msg_ipfix_t *msg);

/**
* \brief Advance the unfinished transfers
*/
void
advance_transfers();

private:
const std::string &m_ident;

Expand All @@ -116,4 +122,4 @@ class Host {
Connector &m_connector;

std::unordered_map<const ipx_session *, std::unique_ptr<Connection>> m_session_to_connection;
};
};
6 changes: 5 additions & 1 deletion src/plugins/output/forwarder/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ ipx_plugin_init(ipx_ctx_t *ctx, const char *xml_config)
return IPX_ERR_DENIED;
}

ipx_msg_mask_t mask = IPX_MSG_IPFIX | IPX_MSG_SESSION;
ipx_msg_mask_t mask = IPX_MSG_IPFIX | IPX_MSG_SESSION | IPX_MSG_PERIODIC;
ipx_ctx_subscribe(ctx, &mask, NULL);
ipx_ctx_private_set(ctx, forwarder.release());

Expand Down Expand Up @@ -121,6 +121,10 @@ ipx_plugin_process(ipx_ctx_t *ctx, void *priv, ipx_msg_t *msg)
forwarder.handle_session_message(ipx_msg_base2session(msg));
break;

case IPX_MSG_PERIODIC:
forwarder.handle_periodic_message(ipx_msg_base2periodic(msg));
break;

default: assert(0);
}

Expand Down
Loading