diff --git a/SilKit/IntegrationTests/ITest_Internals_DataPubSub.cpp b/SilKit/IntegrationTests/ITest_Internals_DataPubSub.cpp index 641838918..16408053e 100644 --- a/SilKit/IntegrationTests/ITest_Internals_DataPubSub.cpp +++ b/SilKit/IntegrationTests/ITest_Internals_DataPubSub.cpp @@ -805,4 +805,83 @@ TEST_F(ITest_Internals_DataPubSub, test_1pub_1sub_async_rejoin) ShutdownSystem(); } + +// Two publishers (optional label1), one subscriber (optional label1, label2); publishers start first; subscriber joins +TEST_F(ITest_Internals_DataPubSub, test_2pub_1sub_async_starting_order) +{ + const uint32_t numMsgToPublish = 1; + const uint32_t numMsgToReceive = 1 * numMsgToPublish; + + std::vector publishers; + publishers.push_back({"Pub1", + {{"PubCtrl1", + "TopicA", + {"A"}, + {{"K1", "V1", SilKit::Services::MatchingLabel::Kind::Optional}}, + 1, + defaultMsgSize, + numMsgToPublish}}, + {}}); + publishers.push_back({"Pub2", + {{"PubCtrl1", + "TopicA", + {"A"}, + {{"K1", "V1", SilKit::Services::MatchingLabel::Kind::Optional}}, + 1, + defaultMsgSize, + numMsgToPublish}}, + {}}); + + std::vector subscribers; + std::vector> expectedDataUnordered; + expectedDataUnordered.reserve(numMsgToReceive); + for (uint32_t d = 0; d < numMsgToReceive; d++) + { + // Receive the same blob several times (once from every publisher) + expectedDataUnordered.emplace_back(std::vector(defaultMsgSize, 0)); + } + subscribers.push_back( + {"Sub1", + {}, + {{ + "SubCtrl1", + "TopicA", + {"A"}, + {{"K1", "V1", SilKit::Services::MatchingLabel::Kind::Optional}, + {"K2", "V2", SilKit::Services::MatchingLabel::Kind::Optional} + }, // BUGHUNT: Second label breaks communication + defaultMsgSize, + numMsgToReceive, + 1, + expectedDataUnordered, + }}}); + + for (auto& sub : subscribers) + { + sub.communicationTimeout = std::chrono::milliseconds(1000); + } + + _testSystem.SetupRegistryAndSystemMaster("silkit://localhost:0", false, {}); + + + //BUGHUNT: Subscribers start first fails SOMETIMES + //RunParticipants(subscribers, _testSystem.GetRegistryUri(), false); + + //BUGHUNT: Publishers start first fails ALWAYS + + // Start publishers + RunParticipants(publishers, _testSystem.GetRegistryUri(), false); + for (auto& p : publishers) + { + p.WaitForAllSent(); + } + + // Start subscriber + RunParticipants(subscribers, _testSystem.GetRegistryUri(), false); + + + JoinPubSubThreads(); + ShutdownSystem(); +} + } // anonymous namespace diff --git a/SilKit/IntegrationTests/ITest_Internals_DataPubSub.hpp b/SilKit/IntegrationTests/ITest_Internals_DataPubSub.hpp index 107cc9efe..1ff4f15ee 100644 --- a/SilKit/IntegrationTests/ITest_Internals_DataPubSub.hpp +++ b/SilKit/IntegrationTests/ITest_Internals_DataPubSub.hpp @@ -178,6 +178,7 @@ class ITest_Internals_DataPubSub : public testing::Test , name{newName} , dataSubscribers{newDataSubscribers} , dataPublishers{newDataPublishers} + , allReceived{std::make_unique>(false)} { } @@ -186,6 +187,7 @@ class ITest_Internals_DataPubSub : public testing::Test std::string name; std::vector dataSubscribers; std::vector dataPublishers; + std::unique_ptr> allReceived; std::unique_ptr participant; SilKit::Core::IParticipantInternal* participantImpl = nullptr; @@ -196,7 +198,6 @@ class ITest_Internals_DataPubSub : public testing::Test std::promise allDiscoveredPromise; bool allDiscovered{false}; std::promise allReceivedPromise; - bool allReceived{false}; // Pub std::promise allSentPromise; bool allSent{false}; @@ -208,7 +209,7 @@ class ITest_Internals_DataPubSub : public testing::Test if (std::all_of(dataSubscribers.begin(), dataSubscribers.end(), [](const auto& dsInfo) { return dsInfo.numMsgToReceive == 0; })) { - allReceived = true; + *allReceived = true; allReceivedPromise.set_value(); } } @@ -224,11 +225,11 @@ class ITest_Internals_DataPubSub : public testing::Test void CheckAllReceivedPromise() { - if (!allReceived && std::all_of(dataSubscribers.begin(), dataSubscribers.end(), [](const auto& dsInfo) { + if (!*allReceived && std::all_of(dataSubscribers.begin(), dataSubscribers.end(), [](const auto& dsInfo) { return dsInfo.allReceived; })) { - allReceived = true; + *allReceived = true; allReceivedPromise.set_value(); } } diff --git a/SilKit/IntegrationTests/IntegrationTestInfrastructure.hpp b/SilKit/IntegrationTests/IntegrationTestInfrastructure.hpp index d38f77ae2..1a2d12135 100644 --- a/SilKit/IntegrationTests/IntegrationTestInfrastructure.hpp +++ b/SilKit/IntegrationTests/IntegrationTestInfrastructure.hpp @@ -25,7 +25,10 @@ class TestInfrastructure { std::stringstream ss; ss << "Something went wrong: " << error.what() << std::endl; - _systemMaster.systemController->AbortSimulation(); + if (_systemMaster.systemController) + { + _systemMaster.systemController->AbortSimulation(); + } FAIL() << ss.str(); } @@ -127,7 +130,7 @@ class TestInfrastructure struct SystemMaster { std::unique_ptr participant; - SilKit::Experimental::Services::Orchestration::ISystemController* systemController; + SilKit::Experimental::Services::Orchestration::ISystemController* systemController{nullptr}; ISystemMonitor* systemMonitor; ILifecycleService* lifecycleService; diff --git a/SilKit/source/core/service/SpecificDiscoveryStore.cpp b/SilKit/source/core/service/SpecificDiscoveryStore.cpp index ed7a76bcd..6004fd258 100644 --- a/SilKit/source/core/service/SpecificDiscoveryStore.cpp +++ b/SilKit/source/core/service/SpecificDiscoveryStore.cpp @@ -251,7 +251,7 @@ auto SpecificDiscoveryStore::GetLabelWithMinimalNodeSet(DiscoveryKeyNode& keyNod auto& not_label_nodes = keyNode.notLabelMap[l.key].nodes; size_t relevantNodeCount = fit_nodes.size() + not_label_nodes.size(); - if (relevantNodeCount < matchCount) + if (relevantNodeCount > 0) { matchCount = relevantNodeCount; outGreedyLabel = &l; diff --git a/SilKit/source/core/service/Test_SpecificDiscoveryStore.cpp b/SilKit/source/core/service/Test_SpecificDiscoveryStore.cpp index c567d864d..060890aa0 100644 --- a/SilKit/source/core/service/Test_SpecificDiscoveryStore.cpp +++ b/SilKit/source/core/service/Test_SpecificDiscoveryStore.cpp @@ -336,8 +336,17 @@ TEST_F(Test_SpecificDiscoveryStore, lookup_service_discovery_then_handler_issues ServiceDescriptor labelTestDescriptor{baseDescriptor}; labelTestDescriptor.SetSupplementalDataItem(supplKeyDataPublisherPubLabels, - " - key: kA\n value: vA\n kind: 2\n - key: kB\n value: vB\n " - "kind: 2\n - key: kC\n value: vC\n kind: 2 "); + R"( + - key: kA + value: vA + kind: 2 + - key: kB + value: vB + kind: 2 + - key: kC + value: vC + kind: 2 + )"); labelTestDescriptor.SetServiceId(2); testStore.ServiceChange(ServiceDiscoveryEvent::Type::ServiceCreated, labelTestDescriptor); @@ -347,7 +356,7 @@ TEST_F(Test_SpecificDiscoveryStore, lookup_service_discovery_then_handler_issues .Times(2); EXPECT_CALL(callbacks, ServiceDiscoveryHandler(ServiceDiscoveryEvent::Type::ServiceCreated, labelTestDescriptor)) - .Times(1); + .Times(2); std::vector optionalSubscriberLabels{ {"kA", "vA", SilKit::Services::MatchingLabel::Kind::Optional},