From 1c78b5e5e2a9ef1c7b83a60d963672d17b5589d9 Mon Sep 17 00:00:00 2001 From: Konrad Breitsprecher Date: Thu, 29 Jan 2026 09:48:06 +0100 Subject: [PATCH 1/7] Modified Demos to trigger bug; Add integrationtest to trigger bug Signed-off-by: Konrad Breitsprecher --- Demos/communication/PubSub/PublisherDemo.cpp | 20 +---- Demos/communication/PubSub/SubscriberDemo.cpp | 17 ++-- .../ITest_Internals_DataPubSub.cpp | 79 +++++++++++++++++++ .../ITest_Internals_DataPubSub.hpp | 2 +- .../IntegrationTestInfrastructure.hpp | 7 +- 5 files changed, 93 insertions(+), 32 deletions(-) diff --git a/Demos/communication/PubSub/PublisherDemo.cpp b/Demos/communication/PubSub/PublisherDemo.cpp index 5994bdf33..c404f7923 100644 --- a/Demos/communication/PubSub/PublisherDemo.cpp +++ b/Demos/communication/PubSub/PublisherDemo.cpp @@ -21,9 +21,9 @@ class Publisher : public ApplicationBase void CreateControllers() override { - _gpsPublisher = GetParticipant()->CreateDataPublisher("GpsPublisher", PubSubDemoCommon::dataSpecGps, 0); - _temperaturePublisher = - GetParticipant()->CreateDataPublisher("TemperaturePublisher", PubSubDemoCommon::dataSpecTemperature, 0); + auto pubSpec = PubSubDemoCommon::dataSpecGps; + pubSpec.AddLabel("Key1", "Value1", SilKit::Services::MatchingLabel::Kind::Optional); + _gpsPublisher = GetParticipant()->CreateDataPublisher("GpsPublisher", pubSpec, 0); } void InitControllers() override {} @@ -44,28 +44,14 @@ class Publisher : public ApplicationBase _gpsPublisher->Publish(gpsSerialized); } - void PublishTemperatureData() - { - double temperature = 25.0 + static_cast(rand() % 10) / 10.0; - auto temperatureSerialized = PubSubDemoCommon::SerializeTemperature(temperature); - - std::stringstream ss; - ss << "Publishing temperature data: temperature=" << temperature; - GetLogger()->Info(ss.str()); - - _temperaturePublisher->Publish(temperatureSerialized); - } - void DoWorkSync(std::chrono::nanoseconds /*now*/) override { PublishGPSData(); - PublishTemperatureData(); } void DoWorkAsync() override { PublishGPSData(); - PublishTemperatureData(); } }; diff --git a/Demos/communication/PubSub/SubscriberDemo.cpp b/Demos/communication/PubSub/SubscriberDemo.cpp index af3f4f5e2..e9dffc747 100644 --- a/Demos/communication/PubSub/SubscriberDemo.cpp +++ b/Demos/communication/PubSub/SubscriberDemo.cpp @@ -21,8 +21,12 @@ class Subscriber : public ApplicationBase void CreateControllers() override { + auto subSpec = PubSubDemoCommon::dataSpecGps; + subSpec.AddLabel("Key1", "Value1", SilKit::Services::MatchingLabel::Kind::Optional); + subSpec.AddLabel("Key2", "Value2", SilKit::Services::MatchingLabel::Kind::Optional); + _gpsSubscriber = GetParticipant()->CreateDataSubscriber( - "GpsSubscriber", PubSubDemoCommon::dataSpecGps, + "GpsSubscriber", subSpec, [this](IDataSubscriber* /*subscriber*/, const DataMessageEvent& dataMessageEvent) { auto gpsData = PubSubDemoCommon::DeserializeGPSData(SilKit::Util::ToStdVector(dataMessageEvent.data)); @@ -31,17 +35,6 @@ class Subscriber : public ApplicationBase << ", signalQuality=" << gpsData.signalQuality; GetLogger()->Info(ss.str()); }); - - _temperatureSubscriber = GetParticipant()->CreateDataSubscriber( - "TemperatureSubscriber", PubSubDemoCommon::dataSpecTemperature, - [this](IDataSubscriber* /*subscriber*/, const DataMessageEvent& dataMessageEvent) { - double temperature = - PubSubDemoCommon::DeserializeTemperature(SilKit::Util::ToStdVector(dataMessageEvent.data)); - - std::stringstream ss; - ss << "Received temperature data: temperature=" << temperature; - GetLogger()->Info(ss.str()); - }); } void InitControllers() override {} diff --git a/SilKit/IntegrationTests/ITest_Internals_DataPubSub.cpp b/SilKit/IntegrationTests/ITest_Internals_DataPubSub.cpp index 641838918..16408053e 100644 --- a/SilKit/IntegrationTests/ITest_Internals_DataPubSub.cpp +++ b/SilKit/IntegrationTests/ITest_Internals_DataPubSub.cpp @@ -805,4 +805,83 @@ TEST_F(ITest_Internals_DataPubSub, test_1pub_1sub_async_rejoin) ShutdownSystem(); } + +// Two publishers (optional label1), one subscriber (optional label1, label2); publishers start first; subscriber joins +TEST_F(ITest_Internals_DataPubSub, test_2pub_1sub_async_starting_order) +{ + const uint32_t numMsgToPublish = 1; + const uint32_t numMsgToReceive = 1 * numMsgToPublish; + + std::vector publishers; + publishers.push_back({"Pub1", + {{"PubCtrl1", + "TopicA", + {"A"}, + {{"K1", "V1", SilKit::Services::MatchingLabel::Kind::Optional}}, + 1, + defaultMsgSize, + numMsgToPublish}}, + {}}); + publishers.push_back({"Pub2", + {{"PubCtrl1", + "TopicA", + {"A"}, + {{"K1", "V1", SilKit::Services::MatchingLabel::Kind::Optional}}, + 1, + defaultMsgSize, + numMsgToPublish}}, + {}}); + + std::vector subscribers; + std::vector> expectedDataUnordered; + expectedDataUnordered.reserve(numMsgToReceive); + for (uint32_t d = 0; d < numMsgToReceive; d++) + { + // Receive the same blob several times (once from every publisher) + expectedDataUnordered.emplace_back(std::vector(defaultMsgSize, 0)); + } + subscribers.push_back( + {"Sub1", + {}, + {{ + "SubCtrl1", + "TopicA", + {"A"}, + {{"K1", "V1", SilKit::Services::MatchingLabel::Kind::Optional}, + {"K2", "V2", SilKit::Services::MatchingLabel::Kind::Optional} + }, // BUGHUNT: Second label breaks communication + defaultMsgSize, + numMsgToReceive, + 1, + expectedDataUnordered, + }}}); + + for (auto& sub : subscribers) + { + sub.communicationTimeout = std::chrono::milliseconds(1000); + } + + _testSystem.SetupRegistryAndSystemMaster("silkit://localhost:0", false, {}); + + + //BUGHUNT: Subscribers start first fails SOMETIMES + //RunParticipants(subscribers, _testSystem.GetRegistryUri(), false); + + //BUGHUNT: Publishers start first fails ALWAYS + + // Start publishers + RunParticipants(publishers, _testSystem.GetRegistryUri(), false); + for (auto& p : publishers) + { + p.WaitForAllSent(); + } + + // Start subscriber + RunParticipants(subscribers, _testSystem.GetRegistryUri(), false); + + + JoinPubSubThreads(); + ShutdownSystem(); +} + } // anonymous namespace diff --git a/SilKit/IntegrationTests/ITest_Internals_DataPubSub.hpp b/SilKit/IntegrationTests/ITest_Internals_DataPubSub.hpp index 107cc9efe..e6778dbb0 100644 --- a/SilKit/IntegrationTests/ITest_Internals_DataPubSub.hpp +++ b/SilKit/IntegrationTests/ITest_Internals_DataPubSub.hpp @@ -380,7 +380,7 @@ class ITest_Internals_DataPubSub : public testing::Test participant.allSentPromise.set_value(); } - if (!participant.dataSubscribers.empty()) + if (!participant.dataSubscribers.empty() && !participant.allReceived) { participant.WaitForAllReceived(); } diff --git a/SilKit/IntegrationTests/IntegrationTestInfrastructure.hpp b/SilKit/IntegrationTests/IntegrationTestInfrastructure.hpp index d38f77ae2..1a2d12135 100644 --- a/SilKit/IntegrationTests/IntegrationTestInfrastructure.hpp +++ b/SilKit/IntegrationTests/IntegrationTestInfrastructure.hpp @@ -25,7 +25,10 @@ class TestInfrastructure { std::stringstream ss; ss << "Something went wrong: " << error.what() << std::endl; - _systemMaster.systemController->AbortSimulation(); + if (_systemMaster.systemController) + { + _systemMaster.systemController->AbortSimulation(); + } FAIL() << ss.str(); } @@ -127,7 +130,7 @@ class TestInfrastructure struct SystemMaster { std::unique_ptr participant; - SilKit::Experimental::Services::Orchestration::ISystemController* systemController; + SilKit::Experimental::Services::Orchestration::ISystemController* systemController{nullptr}; ISystemMonitor* systemMonitor; ILifecycleService* lifecycleService; From f63fbd0e18c8b3807873a913e89ceeef7dcfdd96 Mon Sep 17 00:00:00 2001 From: Konrad Breitsprecher Date: Thu, 29 Jan 2026 09:50:21 +0100 Subject: [PATCH 2/7] Add comments Signed-off-by: Konrad Breitsprecher --- Demos/communication/PubSub/PublisherDemo.cpp | 4 +++- Demos/communication/PubSub/SubscriberDemo.cpp | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Demos/communication/PubSub/PublisherDemo.cpp b/Demos/communication/PubSub/PublisherDemo.cpp index c404f7923..b52c11611 100644 --- a/Demos/communication/PubSub/PublisherDemo.cpp +++ b/Demos/communication/PubSub/PublisherDemo.cpp @@ -22,7 +22,9 @@ class Publisher : public ApplicationBase void CreateControllers() override { auto pubSpec = PubSubDemoCommon::dataSpecGps; - pubSpec.AddLabel("Key1", "Value1", SilKit::Services::MatchingLabel::Kind::Optional); + pubSpec.AddLabel( + "Key1", "Value1", + SilKit::Services::MatchingLabel::Kind::Optional); // BUGHUNT: Missing second label breaks communication _gpsPublisher = GetParticipant()->CreateDataPublisher("GpsPublisher", pubSpec, 0); } diff --git a/Demos/communication/PubSub/SubscriberDemo.cpp b/Demos/communication/PubSub/SubscriberDemo.cpp index e9dffc747..b9a1c69c7 100644 --- a/Demos/communication/PubSub/SubscriberDemo.cpp +++ b/Demos/communication/PubSub/SubscriberDemo.cpp @@ -23,7 +23,7 @@ class Subscriber : public ApplicationBase { auto subSpec = PubSubDemoCommon::dataSpecGps; subSpec.AddLabel("Key1", "Value1", SilKit::Services::MatchingLabel::Kind::Optional); - subSpec.AddLabel("Key2", "Value2", SilKit::Services::MatchingLabel::Kind::Optional); + subSpec.AddLabel("Key2", "Value2", SilKit::Services::MatchingLabel::Kind::Optional); // BUGHUNT: Missing second on publisher label breaks communication _gpsSubscriber = GetParticipant()->CreateDataSubscriber( "GpsSubscriber", subSpec, From b10a59803fe3dda3213597f33180f0e97791d43a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marius=20B=C3=B6rschig?= Date: Mon, 9 Feb 2026 14:07:57 +0100 Subject: [PATCH 3/7] apply fix from konrad MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Marius Börschig --- SilKit/source/core/service/SpecificDiscoveryStore.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/SilKit/source/core/service/SpecificDiscoveryStore.cpp b/SilKit/source/core/service/SpecificDiscoveryStore.cpp index ed7a76bcd..6004fd258 100644 --- a/SilKit/source/core/service/SpecificDiscoveryStore.cpp +++ b/SilKit/source/core/service/SpecificDiscoveryStore.cpp @@ -251,7 +251,7 @@ auto SpecificDiscoveryStore::GetLabelWithMinimalNodeSet(DiscoveryKeyNode& keyNod auto& not_label_nodes = keyNode.notLabelMap[l.key].nodes; size_t relevantNodeCount = fit_nodes.size() + not_label_nodes.size(); - if (relevantNodeCount < matchCount) + if (relevantNodeCount > 0) { matchCount = relevantNodeCount; outGreedyLabel = &l; From 86770779dcd32315767bf79ba3701fd244607ecd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marius=20B=C3=B6rschig?= Date: Mon, 9 Feb 2026 14:10:43 +0100 Subject: [PATCH 4/7] revert demos MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Marius Börschig --- Demos/communication/PubSub/PublisherDemo.cpp | 22 ++++++++++++++----- Demos/communication/PubSub/SubscriberDemo.cpp | 17 +++++++++----- 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/Demos/communication/PubSub/PublisherDemo.cpp b/Demos/communication/PubSub/PublisherDemo.cpp index b52c11611..5994bdf33 100644 --- a/Demos/communication/PubSub/PublisherDemo.cpp +++ b/Demos/communication/PubSub/PublisherDemo.cpp @@ -21,11 +21,9 @@ class Publisher : public ApplicationBase void CreateControllers() override { - auto pubSpec = PubSubDemoCommon::dataSpecGps; - pubSpec.AddLabel( - "Key1", "Value1", - SilKit::Services::MatchingLabel::Kind::Optional); // BUGHUNT: Missing second label breaks communication - _gpsPublisher = GetParticipant()->CreateDataPublisher("GpsPublisher", pubSpec, 0); + _gpsPublisher = GetParticipant()->CreateDataPublisher("GpsPublisher", PubSubDemoCommon::dataSpecGps, 0); + _temperaturePublisher = + GetParticipant()->CreateDataPublisher("TemperaturePublisher", PubSubDemoCommon::dataSpecTemperature, 0); } void InitControllers() override {} @@ -46,14 +44,28 @@ class Publisher : public ApplicationBase _gpsPublisher->Publish(gpsSerialized); } + void PublishTemperatureData() + { + double temperature = 25.0 + static_cast(rand() % 10) / 10.0; + auto temperatureSerialized = PubSubDemoCommon::SerializeTemperature(temperature); + + std::stringstream ss; + ss << "Publishing temperature data: temperature=" << temperature; + GetLogger()->Info(ss.str()); + + _temperaturePublisher->Publish(temperatureSerialized); + } + void DoWorkSync(std::chrono::nanoseconds /*now*/) override { PublishGPSData(); + PublishTemperatureData(); } void DoWorkAsync() override { PublishGPSData(); + PublishTemperatureData(); } }; diff --git a/Demos/communication/PubSub/SubscriberDemo.cpp b/Demos/communication/PubSub/SubscriberDemo.cpp index b9a1c69c7..af3f4f5e2 100644 --- a/Demos/communication/PubSub/SubscriberDemo.cpp +++ b/Demos/communication/PubSub/SubscriberDemo.cpp @@ -21,12 +21,8 @@ class Subscriber : public ApplicationBase void CreateControllers() override { - auto subSpec = PubSubDemoCommon::dataSpecGps; - subSpec.AddLabel("Key1", "Value1", SilKit::Services::MatchingLabel::Kind::Optional); - subSpec.AddLabel("Key2", "Value2", SilKit::Services::MatchingLabel::Kind::Optional); // BUGHUNT: Missing second on publisher label breaks communication - _gpsSubscriber = GetParticipant()->CreateDataSubscriber( - "GpsSubscriber", subSpec, + "GpsSubscriber", PubSubDemoCommon::dataSpecGps, [this](IDataSubscriber* /*subscriber*/, const DataMessageEvent& dataMessageEvent) { auto gpsData = PubSubDemoCommon::DeserializeGPSData(SilKit::Util::ToStdVector(dataMessageEvent.data)); @@ -35,6 +31,17 @@ class Subscriber : public ApplicationBase << ", signalQuality=" << gpsData.signalQuality; GetLogger()->Info(ss.str()); }); + + _temperatureSubscriber = GetParticipant()->CreateDataSubscriber( + "TemperatureSubscriber", PubSubDemoCommon::dataSpecTemperature, + [this](IDataSubscriber* /*subscriber*/, const DataMessageEvent& dataMessageEvent) { + double temperature = + PubSubDemoCommon::DeserializeTemperature(SilKit::Util::ToStdVector(dataMessageEvent.data)); + + std::stringstream ss; + ss << "Received temperature data: temperature=" << temperature; + GetLogger()->Info(ss.str()); + }); } void InitControllers() override {} From e4edc7f886b70f92899114a342954be23fafea36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marius=20B=C3=B6rschig?= Date: Mon, 9 Feb 2026 17:23:14 +0100 Subject: [PATCH 5/7] fix unit test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Marius Börschig --- .../core/service/Test_SpecificDiscoveryStore.cpp | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/SilKit/source/core/service/Test_SpecificDiscoveryStore.cpp b/SilKit/source/core/service/Test_SpecificDiscoveryStore.cpp index c567d864d..060890aa0 100644 --- a/SilKit/source/core/service/Test_SpecificDiscoveryStore.cpp +++ b/SilKit/source/core/service/Test_SpecificDiscoveryStore.cpp @@ -336,8 +336,17 @@ TEST_F(Test_SpecificDiscoveryStore, lookup_service_discovery_then_handler_issues ServiceDescriptor labelTestDescriptor{baseDescriptor}; labelTestDescriptor.SetSupplementalDataItem(supplKeyDataPublisherPubLabels, - " - key: kA\n value: vA\n kind: 2\n - key: kB\n value: vB\n " - "kind: 2\n - key: kC\n value: vC\n kind: 2 "); + R"( + - key: kA + value: vA + kind: 2 + - key: kB + value: vB + kind: 2 + - key: kC + value: vC + kind: 2 + )"); labelTestDescriptor.SetServiceId(2); testStore.ServiceChange(ServiceDiscoveryEvent::Type::ServiceCreated, labelTestDescriptor); @@ -347,7 +356,7 @@ TEST_F(Test_SpecificDiscoveryStore, lookup_service_discovery_then_handler_issues .Times(2); EXPECT_CALL(callbacks, ServiceDiscoveryHandler(ServiceDiscoveryEvent::Type::ServiceCreated, labelTestDescriptor)) - .Times(1); + .Times(2); std::vector optionalSubscriberLabels{ {"kA", "vA", SilKit::Services::MatchingLabel::Kind::Optional}, From e11910604bae0de3649ac1c8b225bec6f1cf1cd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marius=20B=C3=B6rschig?= Date: Wed, 11 Feb 2026 12:22:27 +0100 Subject: [PATCH 6/7] fixup! fix unit test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fix data races Signed-off-by: Marius Börschig --- .../IntegrationTests/ITest_Internals_DataPubSub.hpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/SilKit/IntegrationTests/ITest_Internals_DataPubSub.hpp b/SilKit/IntegrationTests/ITest_Internals_DataPubSub.hpp index e6778dbb0..9a4148801 100644 --- a/SilKit/IntegrationTests/ITest_Internals_DataPubSub.hpp +++ b/SilKit/IntegrationTests/ITest_Internals_DataPubSub.hpp @@ -178,6 +178,7 @@ class ITest_Internals_DataPubSub : public testing::Test , name{newName} , dataSubscribers{newDataSubscribers} , dataPublishers{newDataPublishers} + , allReceived{std::make_unique>(false)} { } @@ -196,7 +197,7 @@ class ITest_Internals_DataPubSub : public testing::Test std::promise allDiscoveredPromise; bool allDiscovered{false}; std::promise allReceivedPromise; - bool allReceived{false}; + std::unique_ptr> allReceived; // Pub std::promise allSentPromise; bool allSent{false}; @@ -208,7 +209,7 @@ class ITest_Internals_DataPubSub : public testing::Test if (std::all_of(dataSubscribers.begin(), dataSubscribers.end(), [](const auto& dsInfo) { return dsInfo.numMsgToReceive == 0; })) { - allReceived = true; + *allReceived = true; allReceivedPromise.set_value(); } } @@ -224,11 +225,11 @@ class ITest_Internals_DataPubSub : public testing::Test void CheckAllReceivedPromise() { - if (!allReceived && std::all_of(dataSubscribers.begin(), dataSubscribers.end(), [](const auto& dsInfo) { + if (!*allReceived && std::all_of(dataSubscribers.begin(), dataSubscribers.end(), [](const auto& dsInfo) { return dsInfo.allReceived; })) { - allReceived = true; + *allReceived = true; allReceivedPromise.set_value(); } } @@ -380,7 +381,7 @@ class ITest_Internals_DataPubSub : public testing::Test participant.allSentPromise.set_value(); } - if (!participant.dataSubscribers.empty() && !participant.allReceived) + if (!participant.dataSubscribers.empty()) { participant.WaitForAllReceived(); } From 652ffa3d84a470e490737991ad36684802dd4028 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marius=20B=C3=B6rschig?= Date: Wed, 11 Feb 2026 12:52:20 +0100 Subject: [PATCH 7/7] fixup! fixup! fix unit test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit reorder allReceived atomic bool Signed-off-by: Marius Börschig --- SilKit/IntegrationTests/ITest_Internals_DataPubSub.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/SilKit/IntegrationTests/ITest_Internals_DataPubSub.hpp b/SilKit/IntegrationTests/ITest_Internals_DataPubSub.hpp index 9a4148801..1ff4f15ee 100644 --- a/SilKit/IntegrationTests/ITest_Internals_DataPubSub.hpp +++ b/SilKit/IntegrationTests/ITest_Internals_DataPubSub.hpp @@ -187,6 +187,7 @@ class ITest_Internals_DataPubSub : public testing::Test std::string name; std::vector dataSubscribers; std::vector dataPublishers; + std::unique_ptr> allReceived; std::unique_ptr participant; SilKit::Core::IParticipantInternal* participantImpl = nullptr; @@ -197,7 +198,6 @@ class ITest_Internals_DataPubSub : public testing::Test std::promise allDiscoveredPromise; bool allDiscovered{false}; std::promise allReceivedPromise; - std::unique_ptr> allReceived; // Pub std::promise allSentPromise; bool allSent{false};