Skip to content

Commit b06bb94

Browse files
committed
DPL: refactor Control parsing helpers
1 parent c67c677 commit b06bb94

File tree

6 files changed

+100
-45
lines changed

6 files changed

+100
-45
lines changed

Framework/Core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ o2_add_library(Framework
3232
src/ComputingResourceHelpers.cxx
3333
src/ConfigContext.cxx
3434
src/ControlService.cxx
35+
src/ControlServiceHelpers.cxx
3536
src/DispatchPolicy.cxx
3637
src/ConfigParamStore.cxx
3738
src/ConfigParamsHelper.cxx

Framework/Core/include/Framework/ControlService.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
#define O2_FRAMEWORK_CONTROLSERVICE_H_
1212

1313
#include "Framework/ServiceHandle.h"
14-
#include <regex>
1514
#include <mutex>
1615

1716
namespace o2::framework
@@ -58,7 +57,5 @@ class ControlService
5857
std::mutex mMutex;
5958
};
6059

61-
bool parseControl(std::string const& s, std::smatch& match);
62-
6360
} // namespace o2::framework
6461
#endif // O2_FRAMEWORK_CONTROLSERVICE_H_

Framework/Core/src/ControlService.cxx

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,4 @@ void ControlService::notifyStreamingState(StreamingState state)
7575
}
7676
}
7777

78-
bool parseControl(std::string const& s, std::smatch& match)
79-
{
80-
char const* action = strstr(s.data(), "CONTROL_ACTION:");
81-
if (action == nullptr) {
82-
return false;
83-
}
84-
const static std::regex controlRE1(".*CONTROL_ACTION: READY_TO_(QUIT)_(ME|ALL)", std::regex::optimize);
85-
const static std::regex controlRE2(".*CONTROL_ACTION: (NOTIFY_STREAMING_STATE) (IDLE|STREAMING|EOS)", std::regex::optimize);
86-
return std::regex_search(s, match, controlRE1) || std::regex_search(s, match, controlRE2);
87-
}
88-
8978
} // namespace o2::framework
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
#include "ControlServiceHelpers.h"
11+
#include "Framework/RawDeviceService.h"
12+
#include "Framework/Logger.h"
13+
#include "Framework/DeviceInfo.h"
14+
#include <string>
15+
#include <string_view>
16+
#include <regex>
17+
#include <iostream>
18+
19+
namespace o2::framework
20+
{
21+
22+
bool ControlServiceHelpers::parseControl(std::string const& s, std::smatch& match)
23+
{
24+
char const* action = strstr(s.data(), "CONTROL_ACTION:");
25+
if (action == nullptr) {
26+
return false;
27+
}
28+
const static std::regex controlRE1(".*CONTROL_ACTION: READY_TO_(QUIT)_(ME|ALL)", std::regex::optimize);
29+
const static std::regex controlRE2(".*CONTROL_ACTION: (NOTIFY_STREAMING_STATE) (IDLE|STREAMING|EOS)", std::regex::optimize);
30+
return std::regex_search(s, match, controlRE1) || std::regex_search(s, match, controlRE2);
31+
}
32+
33+
void ControlServiceHelpers::processCommand(std::vector<DeviceInfo>& infos,
34+
pid_t pid,
35+
std::string const& command,
36+
std::string const& arg)
37+
{
38+
auto doToMatchingPid = [](std::vector<DeviceInfo>& infos, int pid, auto lambda) {
39+
for (auto& deviceInfo : infos) {
40+
if (deviceInfo.pid == pid) {
41+
lambda(deviceInfo);
42+
break;
43+
}
44+
}
45+
};
46+
LOGP(info, "Found control command {} from pid {} with argument {}.", command, pid, arg);
47+
if (command == "QUIT" && arg == "ALL") {
48+
for (auto& deviceInfo : infos) {
49+
deviceInfo.readyToQuit = true;
50+
}
51+
} else if (command == "QUIT" && arg == "ME") {
52+
doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.readyToQuit = true; });
53+
} else if (command == "NOTIFY_STREAMING_STATE" && arg == "IDLE") {
54+
// FIXME: this should really be a policy...
55+
doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.readyToQuit = true; info.streamingState = StreamingState::Idle; });
56+
} else if (command == "NOTIFY_STREAMING_STATE" && arg == "STREAMING") {
57+
// FIXME: this should really be a policy...
58+
doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.streamingState = StreamingState::Streaming; });
59+
} else if (command == "NOTIFY_STREAMING_STATE" && arg == "EOS") {
60+
// FIXME: this should really be a policy...
61+
doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.streamingState = StreamingState::EndOfStreaming; });
62+
}
63+
};
64+
65+
} // namespace o2::framework
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
#ifndef O2_FRAMEWORK_CONTROLSERVICEHELPERS_H_
11+
#define O2_FRAMEWORK_CONTROLSERVICEHELPERS_H_
12+
13+
#include "Framework/DeviceInfo.h"
14+
15+
#include <unistd.h>
16+
#include <vector>
17+
#include <string>
18+
#include <regex>
19+
20+
namespace o2::framework
21+
{
22+
struct ControlServiceHelpers {
23+
static bool parseControl(std::string const& s, std::smatch& match);
24+
static void processCommand(std::vector<DeviceInfo>& infos,
25+
pid_t pid,
26+
std::string const& command,
27+
std::string const& arg);
28+
};
29+
30+
} // namespace o2::framework
31+
#endif // O2_FRAMEWORK_CONTROLSERVICEHELPERS_H_

Framework/Core/src/runDataProcessing.cxx

Lines changed: 3 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#include "Framework/DataProcessorInfo.h"
4040
#include "Framework/DriverInfo.h"
4141
#include "Framework/DriverControl.h"
42+
#include "ControlServiceHelpers.h"
4243

4344
#include "ComputingResourceHelpers.h"
4445
#include "DataProcessingStatus.h"
@@ -578,35 +579,6 @@ struct LogProcessingState {
578579
bool hasNewMetric = false;
579580
};
580581

581-
void processCommand(DeviceInfos& infos, pid_t pid, std::string const& command, std::string const& arg)
582-
{
583-
auto doToMatchingPid = [](std::vector<DeviceInfo>& infos, pid_t pid, auto lambda) {
584-
for (auto& deviceInfo : infos) {
585-
if (deviceInfo.pid == pid) {
586-
lambda(deviceInfo);
587-
break;
588-
}
589-
}
590-
};
591-
LOGP(debug, "Found control command {} from pid {} with argument {}.", command, pid, arg);
592-
if (command == "QUIT" && arg == "ALL") {
593-
for (auto& deviceInfo : infos) {
594-
deviceInfo.readyToQuit = true;
595-
}
596-
} else if (command == "QUIT" && arg == "ME") {
597-
doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.readyToQuit = true; });
598-
} else if (command == "NOTIFY_STREAMING_STATE" && arg == "IDLE") {
599-
// FIXME: this should really be a policy...
600-
doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.readyToQuit = true; info.streamingState = StreamingState::Idle; });
601-
} else if (command == "NOTIFY_STREAMING_STATE" && arg == "STREAMING") {
602-
// FIXME: this should really be a policy...
603-
doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.streamingState = StreamingState::Streaming; });
604-
} else if (command == "NOTIFY_STREAMING_STATE" && arg == "EOS") {
605-
// FIXME: this should really be a policy...
606-
doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.streamingState = StreamingState::EndOfStreaming; });
607-
}
608-
};
609-
610582
LogProcessingState processChildrenOutput(DriverInfo& driverInfo,
611583
DeviceInfos& infos,
612584
DeviceSpecs const& specs,
@@ -671,8 +643,8 @@ LogProcessingState processChildrenOutput(DriverInfo& driverInfo,
671643
// the DataRelayer view.
672644
DeviceMetricsHelper::processMetric(metricMatch, metrics, newMetricCallback);
673645
result.didProcessMetric = true;
674-
} else if (logLevel == LogParsingHelpers::LogLevel::Info && parseControl(token, match)) {
675-
processCommand(infos, info.pid, match[1].str(), match[2].str());
646+
} else if (logLevel == LogParsingHelpers::LogLevel::Info && ControlServiceHelpers::parseControl(token, match)) {
647+
ControlServiceHelpers::processCommand(infos, info.pid, match[1].str(), match[2].str());
676648
result.didProcessControl = true;
677649
} else if (logLevel == LogParsingHelpers::LogLevel::Info && DeviceConfigHelper::parseConfig(token, configMatch)) {
678650
DeviceConfigHelper::processConfig(configMatch, info);

0 commit comments

Comments
 (0)