Skip to content

Commit b2f39e6

Browse files
authored
DPL: add metric for shm memory usage
DPL: add metric for shm memory usage
1 parent 465342e commit b2f39e6

File tree

4 files changed

+30
-14
lines changed

4 files changed

+30
-14
lines changed

Framework/Core/include/Framework/DataProcessingStats.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ struct DataProcessingStats {
3737
std::atomic<int> totalProcessedSize = 0;
3838
std::atomic<int> totalSigusr1 = 0;
3939
std::atomic<int> consumedTimeframes = 0;
40+
std::atomic<uint64_t> availableManagedShm = 0; /// Available shared memory in bytes.
4041

4142
std::atomic<uint64_t> lastSlowMetricSentTimestamp = 0; /// The timestamp of the last time we sent slow metrics
4243
std::atomic<uint64_t> lastMetricFlushedTimestamp = 0; /// The timestamp of the last time we actually flushed metrics

Framework/Core/include/Framework/RunningWorkflowInfo.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ struct RunningDeviceRef {
2323

2424
/// Information about the running workflow
2525
struct RunningWorkflowInfo {
26+
std::string uniqueWorkflowId = "";
27+
int16_t shmSegmentId;
2628
std::vector<DeviceSpec> devices;
2729
};
2830

Framework/Core/src/CommonServices.cxx

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "InputRouteHelpers.h"
2828
#include "Framework/EndOfStreamContext.h"
2929
#include "Framework/RawDeviceService.h"
30+
#include "Framework/RunningWorkflowInfo.h"
3031
#include "Framework/Tracing.h"
3132
#include "Framework/Monitoring.h"
3233
#include "TextDriverClient.h"
@@ -43,6 +44,7 @@
4344
#include <InfoLogger/InfoLogger.hxx>
4445

4546
#include <FairMQDevice.h>
47+
#include <fairmq/shmem/Monitor.h>
4648
#include <options/FairMQProgOptions.h>
4749

4850
#include <cstdlib>
@@ -438,6 +440,20 @@ auto sendRelayerMetrics(ServiceRegistry& registry, DataProcessingStats& stats) -
438440
if (timeSinceLastUpdate < 5000) {
439441
return;
440442
}
443+
// Derive the amount of shared memory used
444+
auto& runningWorkflow = registry.get<RunningWorkflowInfo const>();
445+
using namespace fair::mq::shmem;
446+
auto& spec = registry.get<DeviceSpec const>();
447+
448+
// FIXME: Ugly, but we do it only every 5 seconds...
449+
if (spec.name == "readout-proxy") {
450+
auto device = registry.get<RawDeviceService>().device();
451+
try {
452+
stats.availableManagedShm.store(Monitor::GetFreeMemory(SessionId{device->fConfig->GetProperty<std::string>("session")}, runningWorkflow.shmSegmentId));
453+
} catch (...) {
454+
}
455+
}
456+
441457
auto performedComputationsSinceLastUpdate = stats.performedComputations - stats.lastReportedPerformedComputations;
442458

443459
ZoneScopedN("send metrics");
@@ -473,6 +489,10 @@ auto sendRelayerMetrics(ServiceRegistry& registry, DataProcessingStats& stats) -
473489
.addTag(Key::Subsystem, Value::DPL));
474490
monitoring.send(Metric{((float)performedComputationsSinceLastUpdate / (float)timeSinceLastUpdate) * 1000, "processing_rate_hz"}.addTag(Key::Subsystem, Value::DPL));
475491

492+
if (stats.availableManagedShm) {
493+
monitoring.send(Metric{(uint64_t)stats.availableManagedShm, fmt::format("available_managed_shm_{}", runningWorkflow.shmSegmentId)}.addTag(Key::Subsystem, Value::DPL));
494+
}
495+
476496
if (stats.consumedTimeframes) {
477497
monitoring.send(Metric{stats.consumedTimeframes, "consumed-timeframes"}.addTag(Key::Subsystem, Value::DPL));
478498
}

Framework/Core/src/runDataProcessing.cxx

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,7 @@
7272

7373
#include "FairMQDevice.h"
7474
#include <fairmq/DeviceRunner.h>
75-
#if __has_include(<fairmq/shmem/Monitor.h>)
7675
#include <fairmq/shmem/Monitor.h>
77-
#endif
7876
#include "options/FairMQProgOptions.h"
7977

8078
#include <boost/program_options.hpp>
@@ -115,6 +113,9 @@
115113
#include <sys/wait.h>
116114
#include <unistd.h>
117115
#include <execinfo.h>
116+
// This is to allow C++20 aggregate initialisation
117+
#pragma GCC diagnostic push
118+
#pragma GCC diagnostic ignored "-Wpedantic"
118119
#if defined(__linux__) && __has_include(<sched.h>)
119120
#include <sched.h>
120121
#elif __has_include(<linux/getcpu.h>)
@@ -319,19 +320,8 @@ static void handle_sigint(int)
319320
/// Helper to invoke shared memory cleanup
320321
void cleanupSHM(std::string const& uniqueWorkflowId)
321322
{
322-
#if __has_include(<fairmq/shmem/Monitor.h>)
323323
using namespace fair::mq::shmem;
324324
Monitor::Cleanup(SessionId{"dpl_" + uniqueWorkflowId}, false);
325-
#else
326-
// Old code, invoking external fairmq-shmmonitor
327-
auto shmCleanup = fmt::format("fairmq-shmmonitor --cleanup -s dpl_{} 2>&1 >/dev/null", uniqueWorkflowId);
328-
LOG(debug)
329-
<< "Cleaning up shm memory session with " << shmCleanup;
330-
auto result = system(shmCleanup.c_str());
331-
if (result != 0) {
332-
LOG(error) << "Unable to cleanup shared memory, run " << shmCleanup << "by hand to fix";
333-
}
334-
#endif
335325
}
336326

337327
static void handle_sigchld(int) { sigchld_requested = true; }
@@ -1162,7 +1152,9 @@ int runStateMachine(DataProcessorSpecs const& workflow,
11621152
boost::program_options::variables_map& varmap,
11631153
std::string frameworkId)
11641154
{
1165-
RunningWorkflowInfo runningWorkflow;
1155+
RunningWorkflowInfo runningWorkflow{
1156+
.uniqueWorkflowId = driverInfo.uniqueWorkflowId,
1157+
.shmSegmentId = (int16_t)atoi(varmap["shm-segment-id"].as<std::string>().c_str())};
11661158
DeviceInfos infos;
11671159
DeviceControls controls;
11681160
DevicesManager* devicesManager = new DevicesManager{controls, infos, runningWorkflow.devices};
@@ -2573,3 +2565,4 @@ void doBoostException(boost::exception& e, char const* processName)
25732565
LOGP(ERROR, "error while setting up workflow in {}: {}",
25742566
processName, boost::current_exception_diagnostic_information(true));
25752567
}
2568+
#pragma GCC diagnostic push

0 commit comments

Comments
 (0)