From 6061caf4831947b20e5e72cc945c9c7c2717234f Mon Sep 17 00:00:00 2001 From: Michal Sedlak Date: Wed, 2 Jul 2025 10:33:58 +0200 Subject: [PATCH 1/4] Update copyright years in LICENSE / ipfixcol2 -V --- LICENSE | 2 +- src/core/main.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/LICENSE b/LICENSE index 6ece728a..33e14b89 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (C) 2015-2017 CESNET, z.s.p.o. +Copyright (C) 2015-2025 CESNET, z.s.p.o. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions diff --git a/src/core/main.cpp b/src/core/main.cpp index 1c133b9e..c79daec4 100644 --- a/src/core/main.cpp +++ b/src/core/main.cpp @@ -73,7 +73,7 @@ print_version() << "Build type: " << IPX_BUILD_TYPE << "\n" << "Architecture: " << IPX_BUILD_ARCH << " (" << IPX_BUILD_BYTE_ORDER << ")" << "\n" << "Compiler: " << IPX_BUILD_COMPILER << "\n" - << "Copyright (C) 2018 CESNET z.s.p.o.\n"; + << "Copyright (C) 2025 CESNET z.s.p.o.\n"; } /** From 5cc285760f56221932b2e07f87c9692ab685cdc3 Mon Sep 17 00:00:00 2001 From: Michal Sedlak Date: Wed, 2 Jul 2025 10:32:47 +0200 Subject: [PATCH 2/4] Forwarder: fix templates being repeatedly sent when using TCP When using UDP, templates are periodically resent unlike TCP, where this periodic resending should not happen. Because of a missing check this periodic resending was happening even over TCP. --- src/plugins/output/forwarder/src/Connection.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/plugins/output/forwarder/src/Connection.cpp b/src/plugins/output/forwarder/src/Connection.cpp index 9caaf11a..9ebd0e65 100644 --- a/src/plugins/output/forwarder/src/Connection.cpp +++ b/src/plugins/output/forwarder/src/Connection.cpp @@ -236,8 +236,8 @@ Connection::get_or_create_sender(ipx_msg_ipfix_t *msg) send_message(msg); }, m_con_params.protocol == Protocol::TCP, - m_tmplts_resend_pkts, - m_tmplts_resend_secs))); + m_con_params.protocol != Protocol::TCP ? m_tmplts_resend_pkts : 0, + m_con_params.protocol != Protocol::TCP ? m_tmplts_resend_secs : 0))); } Sender &sender = *m_senders[odid].get(); From 30bb2c86d9616bd03cb0f619b1e5b7f1aa3cf1c8 Mon Sep 17 00:00:00 2001 From: Michal Sedlak Date: Wed, 2 Jul 2025 13:27:16 +0200 Subject: [PATCH 3/4] Forwarder: handle periodic message Adds support for newly introduced "periodic message". Without this change, unfinished data transfers might never finish if no input data is coming. --- src/plugins/output/forwarder/src/Forwarder.cpp | 8 ++++++++ src/plugins/output/forwarder/src/Forwarder.h | 10 +++++++++- src/plugins/output/forwarder/src/Host.cpp | 11 +++++++++++ src/plugins/output/forwarder/src/Host.h | 8 +++++++- src/plugins/output/forwarder/src/main.cpp | 6 +++++- 5 files changed, 40 insertions(+), 3 deletions(-) diff --git a/src/plugins/output/forwarder/src/Forwarder.cpp b/src/plugins/output/forwarder/src/Forwarder.cpp index da0e89f1..318caf11 100644 --- a/src/plugins/output/forwarder/src/Forwarder.cpp +++ b/src/plugins/output/forwarder/src/Forwarder.cpp @@ -105,6 +105,14 @@ void Forwarder::handle_ipfix_message(ipx_msg_ipfix_t *msg) } } +void +Forwarder::handle_periodic_message(ipx_msg_periodic_t *msg) +{ + for (auto &host : m_hosts) { + host->advance_transfers(); + } +} + void Forwarder::forward_to_all(ipx_msg_ipfix_t *msg) { diff --git a/src/plugins/output/forwarder/src/Forwarder.h b/src/plugins/output/forwarder/src/Forwarder.h index 2fc11ddd..2aba1677 100644 --- a/src/plugins/output/forwarder/src/Forwarder.h +++ b/src/plugins/output/forwarder/src/Forwarder.h @@ -80,6 +80,13 @@ class Forwarder { void handle_ipfix_message(ipx_msg_ipfix_t *msg); + /** + * \brief Handle a periodic message + * \param msg The periodic message + */ + void + handle_periodic_message(ipx_msg_periodic_t *msg); + /** * \brief The destructor - finalize the forwarder */ @@ -101,4 +108,5 @@ class Forwarder { void forward_round_robin(ipx_msg_ipfix_t *msg); -}; \ No newline at end of file + +}; diff --git a/src/plugins/output/forwarder/src/Host.cpp b/src/plugins/output/forwarder/src/Host.cpp index 26851e7e..a44a5fa4 100644 --- a/src/plugins/output/forwarder/src/Host.cpp +++ b/src/plugins/output/forwarder/src/Host.cpp @@ -142,6 +142,17 @@ Host::forward_message(ipx_msg_ipfix_t *msg) return true; } +void +Host::advance_transfers() +{ + for (auto &p : m_session_to_connection) { + Connection &connection = *p.second.get(); + if (connection.check_connected()) { + connection.advance_transfers(); + } + } +} + Host::~Host() { for (auto &p : m_session_to_connection) { diff --git a/src/plugins/output/forwarder/src/Host.h b/src/plugins/output/forwarder/src/Host.h index 4189f566..8868b6ec 100644 --- a/src/plugins/output/forwarder/src/Host.h +++ b/src/plugins/output/forwarder/src/Host.h @@ -100,6 +100,12 @@ class Host { bool forward_message(ipx_msg_ipfix_t *msg); + /** + * \brief Advance the unfinished transfers + */ + void + advance_transfers(); + private: const std::string &m_ident; @@ -116,4 +122,4 @@ class Host { Connector &m_connector; std::unordered_map> m_session_to_connection; -}; \ No newline at end of file +}; diff --git a/src/plugins/output/forwarder/src/main.cpp b/src/plugins/output/forwarder/src/main.cpp index a05d61fd..59dc35e8 100644 --- a/src/plugins/output/forwarder/src/main.cpp +++ b/src/plugins/output/forwarder/src/main.cpp @@ -88,7 +88,7 @@ ipx_plugin_init(ipx_ctx_t *ctx, const char *xml_config) return IPX_ERR_DENIED; } - ipx_msg_mask_t mask = IPX_MSG_IPFIX | IPX_MSG_SESSION; + ipx_msg_mask_t mask = IPX_MSG_IPFIX | IPX_MSG_SESSION | IPX_MSG_PERIODIC; ipx_ctx_subscribe(ctx, &mask, NULL); ipx_ctx_private_set(ctx, forwarder.release()); @@ -121,6 +121,10 @@ ipx_plugin_process(ipx_ctx_t *ctx, void *priv, ipx_msg_t *msg) forwarder.handle_session_message(ipx_msg_base2session(msg)); break; + case IPX_MSG_PERIODIC: + forwarder.handle_periodic_message(ipx_msg_base2periodic(msg)); + break; + default: assert(0); } From 68ec5a82fd6d61cf857640419334f0648fab1848 Mon Sep 17 00:00:00 2001 From: Michal Sedlak Date: Wed, 2 Jul 2025 13:31:34 +0200 Subject: [PATCH 4/4] Forwarder: clear buffers on connection lost Unfinished transfers would remain in buffers when we lost connection. In case of TCP, this is a problem. A message might be half-way send when the connection drops, resulting in the other half being sent after the TCP connection reestabilishes. The collector would interpret this as an entirely new message, resulting in bogus data being decoded. --- .../output/forwarder/src/Connection.cpp | 53 ++++++++++++------- src/plugins/output/forwarder/src/Connection.h | 5 +- 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/src/plugins/output/forwarder/src/Connection.cpp b/src/plugins/output/forwarder/src/Connection.cpp index 9ebd0e65..36718982 100644 --- a/src/plugins/output/forwarder/src/Connection.cpp +++ b/src/plugins/output/forwarder/src/Connection.cpp @@ -87,8 +87,7 @@ Connection::forward_message(ipx_msg_ipfix_t *msg) sender.process_message(msg); } catch (const ConnectionError &err) { - // In case connection was lost, we have to resend templates when it reconnects - sender.clear_templates(); + on_connection_lost(); throw err; } } @@ -107,31 +106,36 @@ Connection::advance_transfers() IPX_CTX_DEBUG(m_log_ctx, "Waiting transfers on connection %s: %zu", m_ident.c_str(), m_transfers.size()); - for (auto it = m_transfers.begin(); it != m_transfers.end(); ) { + try { + for (auto it = m_transfers.begin(); it != m_transfers.end(); ) { - Transfer &transfer = *it; + Transfer &transfer = *it; - assert(transfer.data.size() <= UINT16_MAX); // The transfer consists of one IPFIX message which cannot be larger + assert(transfer.data.size() <= UINT16_MAX); // The transfer consists of one IPFIX message which cannot be larger - ssize_t ret = send(m_sockfd.get(), &transfer.data[transfer.offset], - transfer.data.size() - transfer.offset, MSG_DONTWAIT | MSG_NOSIGNAL); + ssize_t ret = send(m_sockfd.get(), &transfer.data[transfer.offset], + transfer.data.size() - transfer.offset, MSG_DONTWAIT | MSG_NOSIGNAL); - check_socket_error(ret); + check_socket_error(ret); - size_t sent = std::max(0, ret); - IPX_CTX_DEBUG(m_log_ctx, "Sent %zu/%zu B to %s", sent, transfer.data.size(), m_ident.c_str()); + size_t sent = std::max(0, ret); + IPX_CTX_DEBUG(m_log_ctx, "Sent %zu/%zu B to %s", sent, transfer.data.size(), m_ident.c_str()); - // Is the transfer done? - if (transfer.offset + sent == transfer.data.size()) { - it = m_transfers.erase(it); - // Remove the transfer and continue with the next one + // Is the transfer done? + if (transfer.offset + sent == transfer.data.size()) { + it = m_transfers.erase(it); + // Remove the transfer and continue with the next one - } else { - transfer.offset += sent; + } else { + transfer.offset += sent; - // Finish, cannot advance next transfer before the one before it is fully sent - break; + // Finish, cannot advance next transfer before the one before it is fully sent + break; + } } + } catch (ConnectionError& err) { + on_connection_lost(); + throw err; } } @@ -261,3 +265,16 @@ Connection::check_socket_error(ssize_t sock_ret) throw ConnectionError(errbuf); } } + +void +Connection::on_connection_lost() +{ + for (auto& p : m_senders) { + // In case connection was lost, we have to resend templates when it reconnects + Sender& sender = *p.second.get(); + sender.clear_templates(); + } + + // Do not continue any of the transfers that haven't been finished so we don't end up in the middle of a message + m_transfers.clear(); +} diff --git a/src/plugins/output/forwarder/src/Connection.h b/src/plugins/output/forwarder/src/Connection.h index 02c76f1d..d137b5c4 100644 --- a/src/plugins/output/forwarder/src/Connection.h +++ b/src/plugins/output/forwarder/src/Connection.h @@ -181,4 +181,7 @@ class Connection { void check_socket_error(ssize_t sock_ret); -}; \ No newline at end of file + + void + on_connection_lost(); +};