diff --git a/LICENSE b/LICENSE index 6ece728a..33e14b89 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (C) 2015-2017 CESNET, z.s.p.o. +Copyright (C) 2015-2025 CESNET, z.s.p.o. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions diff --git a/src/core/main.cpp b/src/core/main.cpp index 1c133b9e..c79daec4 100644 --- a/src/core/main.cpp +++ b/src/core/main.cpp @@ -73,7 +73,7 @@ print_version() << "Build type: " << IPX_BUILD_TYPE << "\n" << "Architecture: " << IPX_BUILD_ARCH << " (" << IPX_BUILD_BYTE_ORDER << ")" << "\n" << "Compiler: " << IPX_BUILD_COMPILER << "\n" - << "Copyright (C) 2018 CESNET z.s.p.o.\n"; + << "Copyright (C) 2025 CESNET z.s.p.o.\n"; } /** diff --git a/src/plugins/output/forwarder/src/Connection.cpp b/src/plugins/output/forwarder/src/Connection.cpp index 9caaf11a..36718982 100644 --- a/src/plugins/output/forwarder/src/Connection.cpp +++ b/src/plugins/output/forwarder/src/Connection.cpp @@ -87,8 +87,7 @@ Connection::forward_message(ipx_msg_ipfix_t *msg) sender.process_message(msg); } catch (const ConnectionError &err) { - // In case connection was lost, we have to resend templates when it reconnects - sender.clear_templates(); + on_connection_lost(); throw err; } } @@ -107,31 +106,36 @@ Connection::advance_transfers() IPX_CTX_DEBUG(m_log_ctx, "Waiting transfers on connection %s: %zu", m_ident.c_str(), m_transfers.size()); - for (auto it = m_transfers.begin(); it != m_transfers.end(); ) { + try { + for (auto it = m_transfers.begin(); it != m_transfers.end(); ) { - Transfer &transfer = *it; + Transfer &transfer = *it; - assert(transfer.data.size() <= UINT16_MAX); // The transfer consists of one IPFIX message which cannot be larger + assert(transfer.data.size() <= UINT16_MAX); // The transfer consists of one IPFIX message which cannot be larger - ssize_t ret = send(m_sockfd.get(), &transfer.data[transfer.offset], - transfer.data.size() - transfer.offset, MSG_DONTWAIT | MSG_NOSIGNAL); + ssize_t ret = send(m_sockfd.get(), &transfer.data[transfer.offset], + transfer.data.size() - transfer.offset, MSG_DONTWAIT | MSG_NOSIGNAL); - check_socket_error(ret); + check_socket_error(ret); - size_t sent = std::max(0, ret); - IPX_CTX_DEBUG(m_log_ctx, "Sent %zu/%zu B to %s", sent, transfer.data.size(), m_ident.c_str()); + size_t sent = std::max(0, ret); + IPX_CTX_DEBUG(m_log_ctx, "Sent %zu/%zu B to %s", sent, transfer.data.size(), m_ident.c_str()); - // Is the transfer done? - if (transfer.offset + sent == transfer.data.size()) { - it = m_transfers.erase(it); - // Remove the transfer and continue with the next one + // Is the transfer done? + if (transfer.offset + sent == transfer.data.size()) { + it = m_transfers.erase(it); + // Remove the transfer and continue with the next one - } else { - transfer.offset += sent; + } else { + transfer.offset += sent; - // Finish, cannot advance next transfer before the one before it is fully sent - break; + // Finish, cannot advance next transfer before the one before it is fully sent + break; + } } + } catch (ConnectionError& err) { + on_connection_lost(); + throw err; } } @@ -236,8 +240,8 @@ Connection::get_or_create_sender(ipx_msg_ipfix_t *msg) send_message(msg); }, m_con_params.protocol == Protocol::TCP, - m_tmplts_resend_pkts, - m_tmplts_resend_secs))); + m_con_params.protocol != Protocol::TCP ? m_tmplts_resend_pkts : 0, + m_con_params.protocol != Protocol::TCP ? m_tmplts_resend_secs : 0))); } Sender &sender = *m_senders[odid].get(); @@ -261,3 +265,16 @@ Connection::check_socket_error(ssize_t sock_ret) throw ConnectionError(errbuf); } } + +void +Connection::on_connection_lost() +{ + for (auto& p : m_senders) { + // In case connection was lost, we have to resend templates when it reconnects + Sender& sender = *p.second.get(); + sender.clear_templates(); + } + + // Do not continue any of the transfers that haven't been finished so we don't end up in the middle of a message + m_transfers.clear(); +} diff --git a/src/plugins/output/forwarder/src/Connection.h b/src/plugins/output/forwarder/src/Connection.h index 02c76f1d..d137b5c4 100644 --- a/src/plugins/output/forwarder/src/Connection.h +++ b/src/plugins/output/forwarder/src/Connection.h @@ -181,4 +181,7 @@ class Connection { void check_socket_error(ssize_t sock_ret); -}; \ No newline at end of file + + void + on_connection_lost(); +}; diff --git a/src/plugins/output/forwarder/src/Forwarder.cpp b/src/plugins/output/forwarder/src/Forwarder.cpp index da0e89f1..318caf11 100644 --- a/src/plugins/output/forwarder/src/Forwarder.cpp +++ b/src/plugins/output/forwarder/src/Forwarder.cpp @@ -105,6 +105,14 @@ void Forwarder::handle_ipfix_message(ipx_msg_ipfix_t *msg) } } +void +Forwarder::handle_periodic_message(ipx_msg_periodic_t *msg) +{ + for (auto &host : m_hosts) { + host->advance_transfers(); + } +} + void Forwarder::forward_to_all(ipx_msg_ipfix_t *msg) { diff --git a/src/plugins/output/forwarder/src/Forwarder.h b/src/plugins/output/forwarder/src/Forwarder.h index 2fc11ddd..2aba1677 100644 --- a/src/plugins/output/forwarder/src/Forwarder.h +++ b/src/plugins/output/forwarder/src/Forwarder.h @@ -80,6 +80,13 @@ class Forwarder { void handle_ipfix_message(ipx_msg_ipfix_t *msg); + /** + * \brief Handle a periodic message + * \param msg The periodic message + */ + void + handle_periodic_message(ipx_msg_periodic_t *msg); + /** * \brief The destructor - finalize the forwarder */ @@ -101,4 +108,5 @@ class Forwarder { void forward_round_robin(ipx_msg_ipfix_t *msg); -}; \ No newline at end of file + +}; diff --git a/src/plugins/output/forwarder/src/Host.cpp b/src/plugins/output/forwarder/src/Host.cpp index 26851e7e..a44a5fa4 100644 --- a/src/plugins/output/forwarder/src/Host.cpp +++ b/src/plugins/output/forwarder/src/Host.cpp @@ -142,6 +142,17 @@ Host::forward_message(ipx_msg_ipfix_t *msg) return true; } +void +Host::advance_transfers() +{ + for (auto &p : m_session_to_connection) { + Connection &connection = *p.second.get(); + if (connection.check_connected()) { + connection.advance_transfers(); + } + } +} + Host::~Host() { for (auto &p : m_session_to_connection) { diff --git a/src/plugins/output/forwarder/src/Host.h b/src/plugins/output/forwarder/src/Host.h index 4189f566..8868b6ec 100644 --- a/src/plugins/output/forwarder/src/Host.h +++ b/src/plugins/output/forwarder/src/Host.h @@ -100,6 +100,12 @@ class Host { bool forward_message(ipx_msg_ipfix_t *msg); + /** + * \brief Advance the unfinished transfers + */ + void + advance_transfers(); + private: const std::string &m_ident; @@ -116,4 +122,4 @@ class Host { Connector &m_connector; std::unordered_map> m_session_to_connection; -}; \ No newline at end of file +}; diff --git a/src/plugins/output/forwarder/src/main.cpp b/src/plugins/output/forwarder/src/main.cpp index a05d61fd..59dc35e8 100644 --- a/src/plugins/output/forwarder/src/main.cpp +++ b/src/plugins/output/forwarder/src/main.cpp @@ -88,7 +88,7 @@ ipx_plugin_init(ipx_ctx_t *ctx, const char *xml_config) return IPX_ERR_DENIED; } - ipx_msg_mask_t mask = IPX_MSG_IPFIX | IPX_MSG_SESSION; + ipx_msg_mask_t mask = IPX_MSG_IPFIX | IPX_MSG_SESSION | IPX_MSG_PERIODIC; ipx_ctx_subscribe(ctx, &mask, NULL); ipx_ctx_private_set(ctx, forwarder.release()); @@ -121,6 +121,10 @@ ipx_plugin_process(ipx_ctx_t *ctx, void *priv, ipx_msg_t *msg) forwarder.handle_session_message(ipx_msg_base2session(msg)); break; + case IPX_MSG_PERIODIC: + forwarder.handle_periodic_message(ipx_msg_base2periodic(msg)); + break; + default: assert(0); }