-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy pathITest_MessageAggregation.cpp
More file actions
147 lines (122 loc) · 6.12 KB
/
ITest_MessageAggregation.cpp
File metadata and controls
147 lines (122 loc) · 6.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// SPDX-FileCopyrightText: 2023 Vector Informatik GmbH
//
// SPDX-License-Identifier: MIT
#include <string>
#include <chrono>
#include <iostream>
#include "ITestFixture.hpp"
#include "gtest/gtest.h"
namespace {
using namespace SilKit::Tests;
using namespace SilKit::Config;
using namespace SilKit::Services;
using namespace SilKit::Services::PubSub;
using namespace std::chrono_literals;
struct ITest_MessageAggregation : ITest_SimTestHarness
{
using ITest_SimTestHarness::ITest_SimTestHarness;
};
TEST_F(ITest_MessageAggregation, receive_msg_after_lifecycle_has_been_stopped)
{
std::promise<void> recvMsgPromise;
auto recvMsgFuture = recvMsgPromise.get_future();
SetupFromParticipantList({"Publisher", "Subscriber"});
SilKit::Services::PubSub::PubSubSpec dataSpec{"someTopic", {}};
{
std::string participantName = "Publisher";
std::string participantConfig(
R"({"Experimental": {"TimeSynchronization": {"EnableMessageAggregation": "Auto"}}})");
auto&& simParticipant = _simTestHarness->GetParticipant(participantName, participantConfig);
auto&& participant = simParticipant->Participant();
auto&& lifecycleService = simParticipant->GetOrCreateLifecycleService();
auto&& timeSyncService = simParticipant->GetOrCreateTimeSyncService();
auto&& dataPublisher = participant->CreateDataPublisher("PubCtrl", dataSpec);
timeSyncService->SetSimulationStepHandler(
[dataPublisher, lifecycleService](std::chrono::nanoseconds /*now*/, std::chrono::nanoseconds /*duration*/) {
uint32_t messageSizeInBytes = 1;
std::vector<uint8_t> data(messageSizeInBytes, '*');
dataPublisher->Publish(std::move(data));
// stop lifecycle immediately (one message is sent and should be received by subscriber)
lifecycleService->Stop("Stop and check if message of current time step is transmitted.");
}, 1ms);
}
{
std::string participantName = "Subscriber";
auto&& simParticipant = _simTestHarness->GetParticipant(participantName);
auto&& participant = simParticipant->Participant();
/*auto&& lifecycleService =*/simParticipant->GetOrCreateLifecycleService();
auto&& timeSyncService = simParticipant->GetOrCreateTimeSyncService();
/*auto&& dataSubscriber =*/participant->CreateDataSubscriber(
"PubCtrl", dataSpec,
[&recvMsgPromise](IDataSubscriber* /*subscriber*/, const DataMessageEvent& /*dataMessageEvent*/) {
recvMsgPromise.set_value();
});
timeSyncService->SetSimulationStepHandler(
[](std::chrono::nanoseconds /*now*/, std::chrono::nanoseconds /*duration*/) {}, 1ms);
}
auto ok = _simTestHarness->Run(5s);
ASSERT_TRUE(ok) << "SimTestHarness should terminate without timeout";
bool msgReceived = recvMsgFuture.wait_for(5s) == std::future_status::ready;
EXPECT_TRUE(msgReceived)
<< "Message of current time step has not been received (flush of aggregated messages has not been performed).";
}
TEST_F(ITest_MessageAggregation, timeout_in_case_of_deadlock_when_using_async_sim_step_handler)
{
SetupFromParticipantList({"Publisher", "Subscriber"});
SilKit::Services::PubSub::PubSubSpec dataSpecPing{"ping", {}};
SilKit::Services::PubSub::PubSubSpec dataSpecPong{"pong", {}};
std::atomic_bool msgReceived{false};
// participant with async simulation step handler & enabled message aggregation
{
std::string participantName = "Publisher";
std::string participantConfig(
R"({"Experimental": {"TimeSynchronization": {"EnableMessageAggregation": "On"}}})");
auto&& simParticipant = _simTestHarness->GetParticipant(participantName, participantConfig);
auto&& participant = simParticipant->Participant();
auto&& lifecycleService = simParticipant->GetOrCreateLifecycleService();
auto&& timeSyncService = simParticipant->GetOrCreateTimeSyncService();
auto&& dataPublisher = participant->CreateDataPublisher("Ctrl", dataSpecPing);
/*auto&& dataSubscriber =*/participant->CreateDataSubscriber(
"Ctrl", dataSpecPong,
[timeSyncService, &msgReceived](IDataSubscriber* /*subscriber*/,
const DataMessageEvent& /*dataMessageEvent*/) {
// unblock, if message from other participant is received
msgReceived = true;
timeSyncService->CompleteSimulationStep();
});
timeSyncService->SetSimulationStepHandlerAsync(
[dataPublisher, lifecycleService, &msgReceived](std::chrono::nanoseconds,
std::chrono::nanoseconds) {
// send ping
std::vector<uint8_t> ping(1, '?');
dataPublisher->Publish(std::move(ping));
if (msgReceived)
{
lifecycleService->Stop("One time step has been performed successfully.");
}
},
1s);
}
{
std::string participantName = "Subscriber";
auto&& simParticipant = _simTestHarness->GetParticipant(participantName);
auto&& participant = simParticipant->Participant();
/*auto&& lifecycleService =*/simParticipant->GetOrCreateLifecycleService();
auto&& timeSyncService = simParticipant->GetOrCreateTimeSyncService();
auto&& dataPublisher = participant->CreateDataPublisher("Ctrl", dataSpecPong);
/*auto&& dataSubscriber =*/participant->CreateDataSubscriber(
"Ctrl", dataSpecPing,
[dataPublisher](IDataSubscriber* /*subscriber*/, const DataMessageEvent& /*dataMessageEvent*/) {
// send back pong
std::vector<uint8_t> pong(1, '!');
dataPublisher->Publish(std::move(pong));
});
timeSyncService->SetSimulationStepHandlerAsync(
[timeSyncService](std::chrono::nanoseconds, std::chrono::nanoseconds) {
timeSyncService->CompleteSimulationStep();
}, 1s);
}
auto ok = _simTestHarness->Run(5s);
ASSERT_TRUE(ok) << "SimTestHarness should terminate without timeout";
}
} // namespace