Skip to content

Commit 6035df9

Browse files
committed
Merge remote-tracking branch 'origin/dev' into stable-sync
2 parents 4845f82 + 67889bb commit 6035df9

File tree

14 files changed

+1564
-53
lines changed

14 files changed

+1564
-53
lines changed

CCDB/src/CcdbApi.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1688,7 +1688,7 @@ void CcdbApi::vectoredLoadFileToMemory(std::vector<RequestContext>& requestConte
16881688
saveSnapshot(requestContext);
16891689
}
16901690
} else {
1691-
LOG(error) << "Did not receive content for " << requestContext.path << "\n";
1691+
LOG(warning) << "Did not receive content for " << requestContext.path << "\n"; // Temporarily demoted to warning, since it floods the infologger
16921692
}
16931693
}
16941694
}

Detectors/GlobalTracking/src/MatchTPCITS.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -633,9 +633,9 @@ bool MatchTPCITS::prepareITSData()
633633
for (int irof = 0; irof < nROFs; irof++) {
634634
const auto& rofRec = mITSTrackROFRec[irof];
635635
long nBC = rofRec.getBCData().differenceInBC(mStartIR);
636-
if (nBC > maxBCs) {
636+
if (nBC > maxBCs || nBC < 0) {
637637
if (++errCount < MaxErrors2Report) {
638-
LOGP(alarm, "ITS ROF#{} start is not compatible with TF 1st orbit {} and TF length of {} HBFs",
638+
LOGP(alarm, "ITS ROF#{} start {} is not compatible with TF 1st orbit {} or TF length of {} HBFs",
639639
irof, rofRec.getBCData().asString(), mStartIR.asString(), nHBF);
640640
}
641641
break;

Detectors/ITSMFT/common/reconstruction/src/Clusterer.cxx

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ void Clusterer::process(int nThreads, PixelReader& reader, CompClusCont* compClu
3636
}
3737
auto autoDecode = reader.getDecodeNextAuto();
3838
int rofcount{0};
39+
o2::InteractionRecord lastIR{};
3940
do {
4041
if (autoDecode) {
4142
reader.setDecodeNextAuto(false); // internally do not autodecode
@@ -46,6 +47,15 @@ void Clusterer::process(int nThreads, PixelReader& reader, CompClusCont* compClu
4647
if (reader.getInteractionRecord().isDummy()) {
4748
continue; // No IR info was found
4849
}
50+
if (!lastIR.isDummy() && lastIR >= reader.getInteractionRecord()) {
51+
const int MaxErrLog = 2;
52+
static int errLocCount = 0;
53+
if (errLocCount++ < MaxErrLog) {
54+
LOGP(warn, "Impossible ROF IR {}, does not exceed previous {}, discarding in clusterization", reader.getInteractionRecord().asString(), lastIR.asString());
55+
}
56+
continue;
57+
}
58+
lastIR = reader.getInteractionRecord();
4959
// pre-fetch all non-empty chips of current ROF
5060
ChipPixelData* curChipData = nullptr;
5161
mFiredChipsPtr.clear();

Detectors/ITSMFT/common/workflow/src/STFDecoderSpec.cxx

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,17 @@ void STFDecoder<Mapping>::run(ProcessingContext& pc)
155155
}
156156

157157
mDecoder->setDecodeNextAuto(false);
158+
o2::InteractionRecord lastIR{}, firstIR{0, pc.services().get<o2::framework::TimingInfo>().firstTForbit};
158159
while (mDecoder->decodeNextTrigger() >= 0) {
160+
if ((!lastIR.isDummy() && lastIR >= mDecoder->getInteractionRecord()) || firstIR > mDecoder->getInteractionRecord()) {
161+
const int MaxErrLog = 2;
162+
static int errLocCount = 0;
163+
if (errLocCount++ < MaxErrLog) {
164+
LOGP(warn, "Impossible ROF IR {}, previous was {}, TF 1st IR was {}, discarding in decoding", mDecoder->getInteractionRecord().asString(), lastIR.asString(), firstIR.asString());
165+
}
166+
continue;
167+
}
168+
lastIR = mDecoder->getInteractionRecord();
159169
if (mDoDigits || mClusterer->getMaxROFDepthToSquash()) { // call before clusterization, since the latter will hide the digits
160170
mDecoder->fillDecodedDigits(digVec, digROFVec); // lot of copying involved
161171
if (mDoCalibData) {

Framework/Core/include/Framework/DataProcessingDevice.h

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -107,19 +107,18 @@ class DataProcessingDevice : public fair::mq::Device
107107
std::unique_ptr<ConfigParamRegistry> mConfigRegistry;
108108
ServiceRegistry& mServiceRegistry;
109109

110-
uint64_t mLastSlowMetricSentTimestamp = 0; /// The timestamp of the last time we sent slow metrics
111-
uint64_t mLastMetricFlushedTimestamp = 0; /// The timestamp of the last time we actually flushed metrics
112-
uint64_t mBeginIterationTimestamp = 0; /// The timestamp of when the current ConditionalRun was started
110+
uint64_t mLastSlowMetricSentTimestamp = 0; /// The timestamp of the last time we sent slow metrics
111+
uint64_t mLastMetricFlushedTimestamp = 0; /// The timestamp of the last time we actually flushed metrics
112+
uint64_t mBeginIterationTimestamp = 0; /// The timestamp of when the current ConditionalRun was started
113113
std::vector<fair::mq::RegionInfo> mPendingRegionInfos; /// A list of the region infos not yet notified.
114114
std::mutex mRegionInfoMutex;
115-
ProcessingPolicies mProcessingPolicies; /// User policies related to data processing
116-
bool mWasActive = false; /// Whether or not the device was active at last iteration.
117-
std::vector<uv_work_t> mHandles; /// Handles to use to schedule work.
118-
std::vector<TaskStreamInfo> mStreams; /// Information about the task running in the associated mHandle.
115+
ProcessingPolicies mProcessingPolicies; /// User policies related to data processing
116+
bool mWasActive = false; /// Whether or not the device was active at last iteration.
117+
std::vector<uv_work_t> mHandles; /// Handles to use to schedule work.
118+
std::vector<TaskStreamInfo> mStreams; /// Information about the task running in the associated mHandle.
119119
/// Handle to wake up the main loop from other threads
120120
/// e.g. when FairMQ notifies some callback in an asynchronous way
121121
uv_async_t* mAwakeHandle = nullptr;
122-
int64_t mCleanupCount = -1;
123122
};
124123

125124
} // namespace o2::framework

Framework/Core/include/Framework/DeviceState.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <string>
2020
#include <map>
2121
#include <utility>
22+
#include <atomic>
2223

2324
typedef struct uv_loop_s uv_loop_t;
2425
typedef struct uv_timer_s uv_timer_t;
@@ -59,6 +60,7 @@ struct DeviceState {
5960
std::vector<InputChannelInfo> inputChannelInfos;
6061
StreamingState streaming = StreamingState::Streaming;
6162
bool quitRequested = false;
63+
std::atomic<int64_t> cleanupCount = -1;
6264

6365
/// ComputingQuotaOffers which have not yet been
6466
/// evaluated by the ComputingQuotaEvaluator

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,15 +136,16 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegi
136136
mServiceRegistry{registry},
137137
mProcessingPolicies{policies}
138138
{
139-
GetConfig()->Subscribe<std::string>("dpl", [&cleanupCount = mCleanupCount, &registry = mServiceRegistry](const std::string& key, std::string value) {
139+
GetConfig()->Subscribe<std::string>("dpl", [&registry = mServiceRegistry](const std::string& key, std::string value) {
140140
if (key == "cleanup") {
141+
auto ref = ServiceRegistryRef{registry, ServiceRegistry::globalDeviceSalt()};
142+
auto& deviceState = ref.get<DeviceState>();
143+
int64_t cleanupCount = deviceState.cleanupCount.load();
141144
int64_t newCleanupCount = std::stoll(value);
142145
if (newCleanupCount <= cleanupCount) {
143146
return;
144147
}
145-
cleanupCount = newCleanupCount;
146-
auto ref = ServiceRegistryRef{registry, ServiceRegistry::globalDeviceSalt()};
147-
auto& deviceState = ref.get<DeviceState>();
148+
deviceState.cleanupCount.store(newCleanupCount);
148149
for (auto& info : deviceState.inputChannelInfos) {
149150
fair::mq::Parts parts;
150151
while (info.channel->Receive(parts, 0)) {
@@ -982,6 +983,10 @@ void DataProcessingDevice::InitTask()
982983
uv_signal_init(state.loop, deviceContext.sigusr1Handle);
983984
uv_signal_start(deviceContext.sigusr1Handle, on_signal_callback, SIGUSR1);
984985
}
986+
// If there is any signal, we want to make sure they are active
987+
for (auto& handle : state.activeSignals) {
988+
handle->data = &state;
989+
}
985990
// When we start, we must make sure that we do listen to the signal
986991
deviceContext.sigusr1Handle->data = &mServiceRegistry;
987992

@@ -1691,6 +1696,11 @@ void DataProcessingDevice::ResetTask()
16911696
if (deviceContext.sigusr1Handle) {
16921697
deviceContext.sigusr1Handle->data = nullptr;
16931698
}
1699+
// Makes sure we do not have a working context on
1700+
// shutdown.
1701+
for (auto& handle : ref.get<DeviceState>().activeSignals) {
1702+
handle->data = nullptr;
1703+
}
16941704
}
16951705

16961706
struct WaitBackpressurePolicy {

Framework/Core/src/DeviceSpecHelpers.cxx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ void signal_callback(uv_signal_t* handle, int)
9090
{
9191
// We simply wake up the event loop. Nothing to be done here.
9292
auto* state = (DeviceState*)handle->data;
93+
if (!state) {
94+
return;
95+
}
9396
state->loopReason |= DeviceState::SIGNAL_ARRIVED;
9497
state->loopReason |= DeviceState::DATA_INCOMING;
9598
}

Framework/Core/src/ExternalFairMQDeviceProxy.cxx

Lines changed: 10 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -687,53 +687,25 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
687687
if (fair::mq::State{state} != fair::mq::State::Ready) {
688688
return;
689689
}
690-
// We keep track of whether or not all channels have seen a new state.
691-
std::vector<bool> lastNewStatePending(deviceState.inputChannelInfos.size(), false);
692690
uv_update_time(deviceState.loop);
693-
auto start = uv_now(deviceState.loop);
694-
695-
// Continue iterating until all channels have seen a new state.
696-
while (std::all_of(lastNewStatePending.begin(), lastNewStatePending.end(), [](bool b) { return b; }) != true) {
697-
if (uv_now(deviceState.loop) - start > 5000) {
698-
LOGP(info, "Timeout while draining messages, going to next state anyway.");
699-
break;
700-
}
691+
bool doDrain = true;
692+
// Cleanup count is set by the cleanup property of the device.
693+
// It is incremented every time the device is cleaned up.
694+
// We use it to detect when the device is cleaned up.
695+
int64_t cleanupCount = deviceState.cleanupCount.load();
696+
697+
// Continue iterating we saw the cleanup property being reset or
698+
// the device state changing.
699+
while (doDrain) {
700+
doDrain = device->NewStatePending() || deviceState.cleanupCount == cleanupCount;
701701
fair::mq::Parts parts;
702702
for (size_t ci = 0; ci < deviceState.inputChannelInfos.size(); ++ci) {
703703
auto& info = deviceState.inputChannelInfos[ci];
704704
// We only care about rawfmq channels.
705705
if (info.channelType != ChannelAccountingType::RAWFMQ) {
706-
lastNewStatePending[ci] = true;
707-
continue;
708-
}
709-
// This means we have not set things up yet. I.e. the first iteration from
710-
// ready to run has not happened yet.
711-
if (info.channel == nullptr) {
712-
lastNewStatePending[ci] = true;
713706
continue;
714707
}
715708
info.channel->Receive(parts, 10);
716-
// Handle both cases of state changes:
717-
//
718-
// - The state has been changed from the outside and FairMQ knows about it.
719-
// - The state has been changed from the GUI, and deviceState.nextFairMQState knows about it.
720-
//
721-
// This latter case is probably better handled from DPL itself, after all it's fair to
722-
// assume we need to switch state as soon as the GUI notifies us.
723-
// For now we keep it here to avoid side effects.
724-
lastNewStatePending[ci] = device->NewStatePending() || (deviceState.nextFairMQState.empty() == false);
725-
if (parts.Size() == 0) {
726-
continue;
727-
}
728-
if (!lastNewStatePending[ci]) {
729-
LOGP(warn, "Unexpected {} message on channel {} while in Ready state. Dropping.", parts.Size(), info.channel->GetName());
730-
} else if (lastNewStatePending[ci]) {
731-
LOGP(detail, "Some {} parts were received on channel {} while switching away from Ready. Keeping.", parts.Size(), info.channel->GetName());
732-
for (int pi = 0; pi < parts.Size(); ++pi) {
733-
info.parts.fParts.emplace_back(std::move(parts.At(pi)));
734-
}
735-
info.readPolled = true;
736-
}
737709
}
738710
// Keep state transitions going also when running with the standalone GUI.
739711
uv_run(deviceState.loop, UV_RUN_NOWAIT);

Generators/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ o2_add_library(Generators
5050
$<$<BOOL:${pythia_FOUND}>:src/DecayerPythia8Param.cxx>
5151
$<$<BOOL:${HepMC3_FOUND}>:src/GeneratorHepMC.cxx>
5252
$<$<BOOL:${HepMC3_FOUND}>:src/GeneratorHepMCParam.cxx>
53+
$<$<BOOL:${HepMC3_FOUND}>:src/AODToHepMC.cxx>
5354
PUBLIC_LINK_LIBRARIES FairRoot::Base O2::SimConfig O2::CommonUtils O2::DetectorsBase O2::ZDCBase
5455
O2::SimulationDataFormat ${pythia6Target} ${pythiaTarget} ${hepmcTarget}
5556
FairRoot::Gen
@@ -106,6 +107,7 @@ endif()
106107
if(HepMC3_FOUND)
107108
list(APPEND headers include/Generators/GeneratorHepMC.h)
108109
list(APPEND headers include/Generators/GeneratorHepMCParam.h)
110+
list(APPEND headers include/Generators/AODToHepMC.h)
109111
endif()
110112

111113
o2_target_root_dictionary(Generators HEADERS ${headers})

0 commit comments

Comments
 (0)