Skip to content

Commit 58ca8d8

Browse files
committed
DPL: enable WebSocket based DriverClient
1 parent e5557e4 commit 58ca8d8

15 files changed

+369
-74
lines changed

Framework/Core/include/Framework/DeviceState.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ struct DeviceState {
4141
StreamingState streaming = StreamingState::Streaming;
4242
bool quitRequested = false;
4343
// The libuv event loop which serves this device.
44-
uv_loop_t* loop;
44+
uv_loop_t* loop = nullptr;
4545
// The list of active timers which notify this device.
4646
std::vector<uv_timer_t*> activeTimers;
4747
// The list of pollers for active input channels

Framework/Core/include/Framework/DriverClient.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#define O2_FRAMEWORK_DRIVERCLIENT_H_
1212

1313
#include "Framework/ServiceHandle.h"
14+
#include <string>
1415
#include <functional>
1516

1617
namespace o2::framework
@@ -23,7 +24,15 @@ class DriverClient
2324
constexpr static ServiceKind service_kind = ServiceKind::Global;
2425

2526
/// Report some message to the Driver
26-
virtual void tell(const char* msg) = 0;
27+
/// @a msg the message to be sent.
28+
/// @a s size of the message to be sent.
29+
/// @a flush whether the message should be flushed immediately,
30+
/// if possible.
31+
virtual void tell(char const* msg, size_t s, bool flush = true) = 0;
32+
void tell(std::string_view const& msg, bool flush = true)
33+
{
34+
tell(msg.data(), msg.size(), flush);
35+
};
2736

2837
/// Act on some @a event notified by the driver
2938
virtual void observe(const char* event, std::function<void(char const*)> callback) = 0;

Framework/Core/include/Framework/DriverInfo.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ struct DriverInfo {
137137
std::string uniqueWorkflowId = "";
138138
/// Metrics gathering interval
139139
unsigned short resourcesMonitoringInterval;
140+
/// Port used by the websocket control. 0 means not initialised.
141+
unsigned short port = 0;
140142
/// Last port used for tracy
141143
short tracyPort = 8086;
142144
/// Aggregate metrics calculated in the driver itself

Framework/Core/src/CommonServices.cxx

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@
2525
#include "Framework/Tracing.h"
2626
#include "Framework/Monitoring.h"
2727
#include "TextDriverClient.h"
28+
#include "WSDriverClient.h"
29+
#include "HTTPParser.h"
2830
#include "../src/DataProcessingStatus.h"
31+
#include "DPLMonitoringBackend.h"
2932

3033
#include <Configuration/ConfigurationInterface.h>
3134
#include <Configuration/ConfigurationFactory.h>
@@ -35,6 +38,7 @@
3538
#include <options/FairMQProgOptions.h>
3639

3740
#include <cstdlib>
41+
#include <cstring>
3842

3943
using AliceO2::InfoLogger::InfoLogger;
4044
using AliceO2::InfoLogger::InfoLoggerContext;
@@ -58,8 +62,19 @@ struct ServiceKindExtractor<InfoLoggerContext> {
5862
o2::framework::ServiceSpec CommonServices::monitoringSpec()
5963
{
6064
return ServiceSpec{"monitoring",
61-
[](ServiceRegistry&, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
62-
void* service = MonitoringFactory::Get(options.GetPropertyAsString("monitoring-backend")).release();
65+
[](ServiceRegistry& registry, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
  // Creates the Monitoring service, selecting its backend from the
  // "driver-client-backend" and "monitoring-backend" options.
  // Each option is read once and the local copy reused.
  std::string const driverBackend = options.GetPropertyAsString("driver-client-backend");
  std::string const monitoringBackend = options.GetPropertyAsString("monitoring-backend");

  // Match the complete "ws://" scheme (5 characters). Comparing only the
  // first 4 characters would also accept a malformed "ws:/..." string.
  bool const isWebsocket = driverBackend.rfind("ws://", 0) == 0;
  bool const isDefault = monitoringBackend == "default";
  // The DPL backend is used when explicitly requested via "dpl://", or when
  // the backend is "default" and the driver client is a websocket.
  bool const useDPL = (isWebsocket && isDefault) || monitoringBackend == "dpl://";

  void* service = nullptr;
  if (useDPL) {
    auto monitoring = new Monitoring();
    monitoring->addBackend(std::make_unique<DPLMonitoringBackend>(registry));
    service = monitoring;
  } else {
    // "default" without a websocket driver client maps to infologger.
    std::string const backend = isDefault ? "infologger://" : monitoringBackend;
    service = MonitoringFactory::Get(backend).release();
  }
  return ServiceHandle{TypeIdHelpers::uniqueId<Monitoring>(), service};
},
6580
noConfiguration(),
@@ -223,8 +238,14 @@ o2::framework::ServiceSpec CommonServices::driverClientSpec()
223238
return ServiceSpec{
224239
"driverClient",
225240
[](ServiceRegistry& services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
241+
auto backend = options.GetPropertyAsString("driver-client-backend");
242+
if (backend == "stdout://") {
243+
return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
244+
new TextDriverClient(services, state)};
245+
}
246+
auto [ip, port] = o2::framework::parse_websocket_url(backend.c_str());
226247
return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
227-
new TextDriverClient(services, state)};
248+
new WSDriverClient(services, state, ip.c_str(), port)};
228249
},
229250
noConfiguration(),
230251
nullptr,

Framework/Core/src/DPLMonitoringBackend.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ void DPLMonitoringBackend::send(o2::monitoring::Metric const& metric)
7676
mStream << o2::monitoring::tags::TAG_KEY[key] << "=" << o2::monitoring::tags::GetValue(value);
7777
}
7878
mStream << '\n';
79-
mRegistry.get<framework::DriverClient>().tell(mStream.str().c_str());
79+
mRegistry.get<framework::DriverClient>().tell(mStream.str());
8080
}
8181

8282
} // namespace o2::framework

Framework/Core/src/DPLWebSocket.cxx

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ void websocket_server_callback(uv_stream_t* stream, ssize_t nread, const uv_buf_
6464
}
6565
try {
6666
parse_http_request(buf->base, nread, server);
67+
free(buf->base);
6768
} catch (RuntimeErrorRef& ref) {
6869
auto& err = o2::framework::error_from_ref(ref);
6970
LOG(ERROR) << "Error while parsing request: " << err.what;
@@ -270,6 +271,22 @@ void ws_client_write_callback(uv_write_t* h, int status)
270271
}
271272
}
272273

274+
/// Completion callback for the vectored write issued by
/// WSDPLClient::write(std::vector<uv_buf_t>&).
///
/// @a h the write request. It was allocated with malloc and its data member
///    points to the heap allocated std::vector<uv_buf_t> holding the buffers
///    which were passed to uv_write (may be null).
/// @a status 0 on success, a libuv error code otherwise.
///
/// Both the request and the buffers are owned by this callback once the write
/// completes, so they are released regardless of @a status: bailing out early
/// on error would leak the buffers, while skipping the free of @a h on
/// success would leak the request.
void ws_client_bulk_write_callback(uv_write_t* h, int status)
{
  if (status) {
    LOG(ERROR) << "uv_write error: " << uv_err_name(status);
  }
  auto* buffers = static_cast<std::vector<uv_buf_t>*>(h->data);
  if (buffers) {
    // Each buffer payload is released with free(), then the vector itself.
    for (auto& b : *buffers) {
      free(b.base);
    }
    delete buffers;
  }
  free(h);
}
289+
273290
/// Helper to return an error
274291
void WSDPLClient::write(char const* message, size_t s)
275292
{
@@ -281,11 +298,14 @@ void WSDPLClient::write(char const* message, size_t s)
281298

282299
void WSDPLClient::write(std::vector<uv_buf_t>& outputs)
283300
{
284-
for (auto& msg : outputs) {
285-
uv_write_t* write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
286-
write_req->data = msg.base;
287-
uv_write(write_req, (uv_stream_t*)mStream, &msg, 1, ws_client_write_callback);
301+
if (outputs.empty()) {
302+
return;
288303
}
304+
uv_write_t* write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
305+
std::vector<uv_buf_t>* buffers = new std::vector<uv_buf_t>;
306+
buffers->swap(outputs);
307+
write_req->data = buffers;
308+
uv_write(write_req, (uv_stream_t*)mStream, &buffers->at(0), buffers->size(), ws_client_bulk_write_callback);
289309
}
290310

291311
} // namespace o2::framework

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "Framework/SourceInfoHeader.h"
3434
#include "Framework/Logger.h"
3535
#include "Framework/Monitoring.h"
36+
#include "Framework/DriverClient.h"
3637
#include "PropertyTreeHelpers.h"
3738
#include "DataProcessingStatus.h"
3839
#include "DataProcessingHelpers.h"
@@ -237,7 +238,8 @@ void DataProcessingDevice::Init()
237238
} else {
238239
str = entry.second.get_value<std::string>();
239240
}
240-
LOG(INFO) << "[CONFIG] " << entry.first << "=" << str << " 1 " << configStore->provenance(entry.first.c_str());
241+
std::string configString = fmt::format("[CONFIG] {}={} 1 {}", entry.first, str, configStore->provenance(entry.first.c_str())).c_str();
242+
mServiceRegistry.get<DriverClient>().tell(configString.c_str());
241243
}
242244

243245
mConfigRegistry = std::make_unique<ConfigParamRegistry>(std::move(configStore));

Framework/Core/src/DeviceSpecHelpers.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1216,6 +1216,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic
12161216
("post-fork-command", bpo::value<std::string>(), "post fork command to execute (e.g. numactl {pid}") //
12171217
("session", bpo::value<std::string>(), "unique label for the shared memory session") //
12181218
("configuration,cfg", bpo::value<std::string>(), "configuration connection string") //
1219+
("driver-client-backend", bpo::value<std::string>(), "driver connection string") //
12191220
("monitoring-backend", bpo::value<std::string>(), "monitoring connection string") //
12201221
("infologger-mode", bpo::value<std::string>(), "INFOLOGGER_MODE override") //
12211222
("infologger-severity", bpo::value<std::string>(), "minimun FairLogger severity which goes to info logger") //

Framework/Core/src/TextDriverClient.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ TextDriverClient::TextDriverClient(ServiceRegistry& registry, DeviceState& devic
1717
{
1818
}
1919

20-
void TextDriverClient::tell(const char* msg)
20+
void TextDriverClient::tell(const char* msg, size_t s, bool flush)
2121
{
22-
LOG(INFO) << msg;
22+
LOG(INFO) << std::string_view{msg, s};
2323
}
2424

2525
void TextDriverClient::flushPending()

Framework/Core/src/TextDriverClient.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ class TextDriverClient : public DriverClient
2828

2929
/// The text based client simply sends a message on stdout which is
3030
/// (potentially) captured by the driver.
31-
void tell(const char* msg) override;
31+
void tell(char const* msg, size_t s, bool flush = true) final;
3232
/// Half duplex communication
33-
void observe(const char* event, std::function<void(char const*)> callback) override{};
34-
void flushPending() override;
33+
void observe(const char* event, std::function<void(char const*)> callback) final{};
34+
void flushPending() final;
3535
};
3636

3737
} // namespace o2::framework

0 commit comments

Comments
 (0)