Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions SilKit/IntegrationTests/ITest_Internals_DataPubSub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -805,4 +805,83 @@ TEST_F(ITest_Internals_DataPubSub, test_1pub_1sub_async_rejoin)
ShutdownSystem();
}


// Two publishers (optional label1), one subscriber (optional label1, label2); publishers start first; subscriber joins
TEST_F(ITest_Internals_DataPubSub, test_2pub_1sub_async_starting_order)
{
const uint32_t numMsgToPublish = 1;
const uint32_t numMsgToReceive = 1 * numMsgToPublish;

std::vector<PubSubParticipant> publishers;
publishers.push_back({"Pub1",
{{"PubCtrl1",
"TopicA",
{"A"},
{{"K1", "V1", SilKit::Services::MatchingLabel::Kind::Optional}},
1,
defaultMsgSize,
numMsgToPublish}},
{}});
publishers.push_back({"Pub2",
{{"PubCtrl1",
"TopicA",
{"A"},
{{"K1", "V1", SilKit::Services::MatchingLabel::Kind::Optional}},
1,
defaultMsgSize,
numMsgToPublish}},
{}});

std::vector<PubSubParticipant> subscribers;
std::vector<std::vector<uint8_t>> expectedDataUnordered;
expectedDataUnordered.reserve(numMsgToReceive);
for (uint32_t d = 0; d < numMsgToReceive; d++)
{
// Receive the same blob several times (once from every publisher)
expectedDataUnordered.emplace_back(std::vector<uint8_t>(defaultMsgSize, 0));
}
subscribers.push_back(
{"Sub1",
{},
{{
"SubCtrl1",
"TopicA",
{"A"},
{{"K1", "V1", SilKit::Services::MatchingLabel::Kind::Optional},
{"K2", "V2", SilKit::Services::MatchingLabel::Kind::Optional}
}, // BUGHUNT: Second label breaks communication
defaultMsgSize,
numMsgToReceive,
1,
expectedDataUnordered,
}}});

for (auto& sub : subscribers)
{
sub.communicationTimeout = std::chrono::milliseconds(1000);
}

_testSystem.SetupRegistryAndSystemMaster("silkit://localhost:0", false, {});


//BUGHUNT: Subscribers start first fails SOMETIMES
//RunParticipants(subscribers, _testSystem.GetRegistryUri(), false);

//BUGHUNT: Publishers start first fails ALWAYS

// Start publishers
RunParticipants(publishers, _testSystem.GetRegistryUri(), false);
for (auto& p : publishers)
{
p.WaitForAllSent();
}

// Start subscriber
RunParticipants(subscribers, _testSystem.GetRegistryUri(), false);


JoinPubSubThreads();
ShutdownSystem();
}

} // anonymous namespace
9 changes: 5 additions & 4 deletions SilKit/IntegrationTests/ITest_Internals_DataPubSub.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ class ITest_Internals_DataPubSub : public testing::Test
, name{newName}
, dataSubscribers{newDataSubscribers}
, dataPublishers{newDataPublishers}
, allReceived{std::make_unique<std::atomic<bool>>(false)}
{
}

Expand All @@ -186,6 +187,7 @@ class ITest_Internals_DataPubSub : public testing::Test
std::string name;
std::vector<DataSubscriberInfo> dataSubscribers;
std::vector<DataPublisherInfo> dataPublishers;
std::unique_ptr<std::atomic<bool>> allReceived;
std::unique_ptr<SilKit::IParticipant> participant;
SilKit::Core::IParticipantInternal* participantImpl = nullptr;

Expand All @@ -196,7 +198,6 @@ class ITest_Internals_DataPubSub : public testing::Test
std::promise<void> allDiscoveredPromise;
bool allDiscovered{false};
std::promise<void> allReceivedPromise;
bool allReceived{false};
// Pub
std::promise<void> allSentPromise;
bool allSent{false};
Expand All @@ -208,7 +209,7 @@ class ITest_Internals_DataPubSub : public testing::Test
if (std::all_of(dataSubscribers.begin(), dataSubscribers.end(),
[](const auto& dsInfo) { return dsInfo.numMsgToReceive == 0; }))
{
allReceived = true;
*allReceived = true;
allReceivedPromise.set_value();
}
}
Expand All @@ -224,11 +225,11 @@ class ITest_Internals_DataPubSub : public testing::Test

void CheckAllReceivedPromise()
{
if (!allReceived && std::all_of(dataSubscribers.begin(), dataSubscribers.end(), [](const auto& dsInfo) {
if (!*allReceived && std::all_of(dataSubscribers.begin(), dataSubscribers.end(), [](const auto& dsInfo) {
return dsInfo.allReceived;
}))
{
allReceived = true;
*allReceived = true;
allReceivedPromise.set_value();
}
}
Expand Down
7 changes: 5 additions & 2 deletions SilKit/IntegrationTests/IntegrationTestInfrastructure.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ class TestInfrastructure
{
std::stringstream ss;
ss << "Something went wrong: " << error.what() << std::endl;
_systemMaster.systemController->AbortSimulation();
if (_systemMaster.systemController)
{
_systemMaster.systemController->AbortSimulation();
}
FAIL() << ss.str();
}

Expand Down Expand Up @@ -127,7 +130,7 @@ class TestInfrastructure
struct SystemMaster
{
std::unique_ptr<IParticipant> participant;
SilKit::Experimental::Services::Orchestration::ISystemController* systemController;
SilKit::Experimental::Services::Orchestration::ISystemController* systemController{nullptr};
ISystemMonitor* systemMonitor;
ILifecycleService* lifecycleService;

Expand Down
2 changes: 1 addition & 1 deletion SilKit/source/core/service/SpecificDiscoveryStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ auto SpecificDiscoveryStore::GetLabelWithMinimalNodeSet(DiscoveryKeyNode& keyNod
auto& not_label_nodes = keyNode.notLabelMap[l.key].nodes;

size_t relevantNodeCount = fit_nodes.size() + not_label_nodes.size();
if (relevantNodeCount < matchCount)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this just matches the last label to have any nodes in the cluster instead of the one with the least nodes?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need the matchCount condition and > 0

Copy link

@KonradBreitsprecher KonradBreitsprecher Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need the matchCount condition and > 0. The bug was that the outgreedylabel that was found here in the specific label setup (reproduced by the test) did not contain any handers (of the subscriber) and thus the follow up logic to finish the pubsub connection never happened.

Maybe the dict here should never have been populated to get into this situation, at least the >0 prevents the bug.

Also, for "symmetry reasons" there might be the same situation for mandatory Labels a few lines above.

To stir it up a little more, we might need at least one person who proudly says "I understand what's happening here" otherwise we have a black box algorithm in a central unit that was "introduced for performance reasons by a former colleague". If there is no xkcd we should make one...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need the matchCount condition and > 0. The bug was that the outgreedylabel that was found here in the specific label setup (reproduced by the test) did not contain any handers (of the subscriber) and thus the follow up logic to finish the pubsub connection never happened.

Maybe the dict here should never have been populated to get into this situation, at least the >0 prevents the bug.

Also, for "symmetry reasons" there might be the same situation for mandatory Labels a few lines above.

To stir it up a little more, we might need at least one person who proudly says "I understand what's happening here" otherwise we have a black box algorithm in a central unit that was "introduced for performance reasons by a former colleague". If there is no xkcd we should make one...

I fully agree - I think i'll try to make a clean room implementation of the label matching code next week.
This PR is no show stopper for the upcoming 5.0.3 release, though.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds great. You can use https://github.com/vectorgrp/sil-kit/blob/main/SilKit%2FIntegrationTests%2FFTest_PubSubPerf.cpp to check performance degradation. I think there's a commented test set for production that runs a while but pushes the pubsub code to it's limits..

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds great. You can use https://github.com/vectorgrp/sil-kit/blob/main/SilKit%2FIntegrationTests%2FFTest_PubSubPerf.cpp to check performance degradation. I think there's a commented test set for production that runs a while but pushes the pubsub code to it's limits..

if (relevantNodeCount > 0)
{
matchCount = relevantNodeCount;
outGreedyLabel = &l;
Expand Down
15 changes: 12 additions & 3 deletions SilKit/source/core/service/Test_SpecificDiscoveryStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,17 @@ TEST_F(Test_SpecificDiscoveryStore, lookup_service_discovery_then_handler_issues

ServiceDescriptor labelTestDescriptor{baseDescriptor};
labelTestDescriptor.SetSupplementalDataItem(supplKeyDataPublisherPubLabels,
" - key: kA\n value: vA\n kind: 2\n - key: kB\n value: vB\n "
"kind: 2\n - key: kC\n value: vC\n kind: 2 ");
R"(
- key: kA
value: vA
kind: 2
- key: kB
value: vB
kind: 2
- key: kC
value: vC
kind: 2
)");
labelTestDescriptor.SetServiceId(2);

testStore.ServiceChange(ServiceDiscoveryEvent::Type::ServiceCreated, labelTestDescriptor);
Expand All @@ -347,7 +356,7 @@ TEST_F(Test_SpecificDiscoveryStore, lookup_service_discovery_then_handler_issues
.Times(2);

EXPECT_CALL(callbacks, ServiceDiscoveryHandler(ServiceDiscoveryEvent::Type::ServiceCreated, labelTestDescriptor))
.Times(1);
.Times(2);

std::vector<SilKit::Services::MatchingLabel> optionalSubscriberLabels{
{"kA", "vA", SilKit::Services::MatchingLabel::Kind::Optional},
Expand Down
Loading