Skip to content

Commit 01211f7

Browse files
ironManncuveland
authored andcommitted
tfbuilder: implement timeouts and retries for StfDataRequest grpcs
1 parent 1fab58d commit 01211f7

File tree

13 files changed

+223
-117
lines changed

13 files changed

+223
-117
lines changed

src/StfSender/StfSenderOutput.cxx

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -406,10 +406,20 @@ void StfSenderOutput::sendStfToTfBuilder(const std::uint64_t pStfId, const std::
406406
assert(!pTfBuilderId.empty());
407407
std::scoped_lock lLock(mScheduledStfMapLock);
408408

409+
{ // Check if we handled this grpc call before, and we are handling a retry now
410+
const auto lResIter = mSchedulingResult.find(pTfBuilderId);
411+
if (lResIter != mSchedulingResult.end()) {
412+
413+
if (lResIter->second.first == pStfId) {
414+
pRes.set_status(lResIter->second.second);
415+
return;
416+
}
417+
}
418+
}
419+
409420
const auto lStfIter = mScheduledStfMap.find(pStfId);
410421
// verify we have the STF.
411422
if (lStfIter == mScheduledStfMap.end()) {
412-
413423
if (pTfBuilderId != "-1") {
414424
// request for Stf we don't have is an error
415425
pRes.set_status(StfDataResponse::DATA_DROPPED_UNKNOWN);
@@ -442,21 +452,23 @@ void StfSenderOutput::sendStfToTfBuilder(const std::uint64_t pStfId, const std::
442452
} else if (mOutputFairMQ) {
443453
lOk = mOutputFairMQ->sendStfToTfBuilder(pTfBuilderId, std::move(lStf));
444454
}
445-
if (!lOk) {
446-
pRes.set_status(StfDataResponse::TF_BUILDER_UNKNOWN);
447-
mDropQueue.push(std::move(lStf));
448-
EDDLOG_GRL(1000, "sendStfToTfBuilder: TfBuilder not known to StfSender. tfb_id={}", pTfBuilderId);
449-
return;
450-
}
451-
452-
// update status and counters
453-
pRes.set_status(StfDataResponse::OK);
454-
{
455+
if (lOk) {
456+
// update status and counters
457+
pRes.set_status(StfDataResponse::OK);
455458
std::scoped_lock lCntLock(mCounters.mCountersLock);
456459
mCounters.mValues.mInSending.mSize += lStfSize;
457460
mCounters.mValues.mInSending.mCnt += 1;
461+
} else {
462+
pRes.set_status(StfDataResponse::TF_BUILDER_UNKNOWN);
463+
mDropQueue.push(std::move(lStf));
464+
EDDLOG_GRL(1000, "sendStfToTfBuilder: TfBuilder not known to StfSender. tfb_id={}", pTfBuilderId);
458465
}
459466
}
467+
468+
// remember the result for the stf
469+
auto &lResultInfo = mSchedulingResult[pTfBuilderId];
470+
lResultInfo.first = pStfId;
471+
lResultInfo.second = pRes.status();
460472
}
461473

462474
/// Drop thread

src/StfSender/StfSenderOutput.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ class StfSenderOutput
8686
mCounters.mValues = StdSenderOutputCounters::Values();
8787
mLastStfId = 0;
8888
}
89+
90+
{ // clear the stf maps
91+
std::scoped_lock lLock(mScheduledStfMapLock);
92+
mScheduledStfMap.clear();
93+
mSchedulingResult.clear();
94+
}
8995
}
9096

9197
private:
@@ -119,6 +125,7 @@ class StfSenderOutput
119125
std::uint64_t mLastStfId = 0;
120126
std::mutex mScheduledStfMapLock;
121127
std::map<std::uint64_t, ScheduledStfInfo> mScheduledStfMap;
128+
std::map<std::string, std::pair<std::uint64_t, StfDataResponse::StfDataStatus> > mSchedulingResult; // use for answering retried StfRequest() grpc
122129

123130
/// Buffer utilization counters
124131
StdSenderOutputCounters mCounters;

src/StfSender/StfSenderOutputUCX.cxx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,10 @@ void StfSenderOutputUCX::DataHandlerThread(unsigned pThreadIdx)
420420
{
421421
DDDLOG("StfSenderOutputUCX: Starting meta thread {}", pThreadIdx);
422422

423+
#if defined(__linux__)
424+
if (nice(-10)) { }
425+
#endif
426+
423427
// local worker we advance here
424428
assert (pThreadIdx < mDataWorkers.size());
425429
ucx::dd_ucp_worker &lWorker = mDataWorkers[pThreadIdx];

src/StfSender/StfSenderRpc.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ ::grpc::Status StfSenderRpcImpl::TerminatePartition(::grpc::ServerContext* /*con
166166
return Status::CANCELLED;
167167
}
168168

169-
IDDLOG("TerminatePartition request received. partition_id={}", request->partition_id());
169+
IDDLOG_GRL(5000, "TerminatePartition request received. partition_id={}", request->partition_id());
170170

171171
mTerminateRequested = true;
172172
return Status::OK;

src/TfBuilder/TfBuilderInput.cxx

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -174,13 +174,13 @@ void TfBuilderInput::StfPacingThread()
174174
}
175175

176176
assert (lStfInfo.mType == ReceivedStfMeta::MetaType::INFO);
177-
assert (lStfInfo.mRecvStfdata);
177+
assert (lStfInfo.mRecvStfData);
178178

179179
// Rename STF id if this is a Topological TF
180180
if (lStfInfo.mStfOrigin == SubTimeFrame::Header::Origin::eReadoutTopology) {
181181
// deserialize here to be able to rename the stf
182-
lStfInfo.mStf = std::move(lStfReceiver.deserialize(*lStfInfo.mRecvStfHeaderMeta.get(), *lStfInfo.mRecvStfdata));
183-
lStfInfo.mRecvStfdata = nullptr;
182+
lStfInfo.mStf = std::move(lStfReceiver.deserialize(*lStfInfo.mRecvStfHeaderMeta.get(), *lStfInfo.mRecvStfData));
183+
lStfInfo.mRecvStfData = nullptr;
184184

185185
const std::uint64_t lNewTfId = mRpc->getIdForTopoTf(lStfInfo.mStfSenderId, lStfInfo.mStfId);
186186

@@ -229,8 +229,8 @@ void TfBuilderInput::deserialize_headers(std::vector<ReceivedStfMeta> &pStfs)
229229
}
230230

231231
// deserialize the data
232-
lStfInfo.mStf = std::move(lStfReceiver.deserialize(*lStfInfo.mRecvStfHeaderMeta.get(), *lStfInfo.mRecvStfdata));
233-
lStfInfo.mRecvStfdata = nullptr;
232+
lStfInfo.mStf = std::move(lStfReceiver.deserialize(*lStfInfo.mRecvStfHeaderMeta.get(), *lStfInfo.mRecvStfData));
233+
lStfInfo.mRecvStfData = nullptr;
234234
}
235235
}
236236

@@ -259,6 +259,7 @@ void TfBuilderInput::StfDeserializingThread()
259259
// check if the TF should be dropped
260260
if (mStfIdsToDrop.count(lStfId) > 0) {
261261
mStfMergeMap.erase(mStfMergeMap.begin());
262+
continue;
262263
}
263264

264265
// deserialize headers of any new STFs
@@ -273,7 +274,7 @@ void TfBuilderInput::StfDeserializingThread()
273274
}
274275

275276
// Check if the TF is completed
276-
if (!lStfVector.empty() && (lStfVector.size() == lNumStfsOpt)) {
277+
if (!lStfVector.empty() && (lStfVector.size() == lNumStfsOpt.value())) {
277278
mStfsForMerging.push(std::move(lStfVector));
278279

279280
mStfMergeMap.erase(lStfId);

src/TfBuilder/TfBuilderInputDefs.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ struct ReceivedStfMeta {
3434
std::chrono::time_point<std::chrono::steady_clock> mTimeReceived;
3535

3636
std::unique_ptr<IovStfHdrMeta> mRecvStfHeaderMeta;
37-
std::unique_ptr<std::vector<FairMQMessagePtr>> mRecvStfdata;
37+
std::unique_ptr<std::vector<FairMQMessagePtr>> mRecvStfData;
3838
std::unique_ptr<SubTimeFrame> mStf;
3939
std::string mStfSenderId;
4040

@@ -47,13 +47,13 @@ struct ReceivedStfMeta {
4747
const SubTimeFrame::Header::Origin pStfOrigin,
4848
const std::string &pStfSenderId,
4949
std::unique_ptr<IovStfHdrMeta> &&pRcvHdrMeta,
50-
std::unique_ptr<std::vector<FairMQMessagePtr>> &&pRecvStfdata)
50+
std::unique_ptr<std::vector<FairMQMessagePtr>> &&pRecvStfData)
5151
: mType(INFO),
5252
mStfId(pStfId),
5353
mStfOrigin(pStfOrigin),
5454
mTimeReceived(std::chrono::steady_clock::now()),
5555
mRecvStfHeaderMeta(std::move(pRcvHdrMeta)),
56-
mRecvStfdata(std::move(pRecvStfdata)),
56+
mRecvStfData(std::move(pRecvStfData)),
5757
mStf(nullptr),
5858
mStfSenderId(pStfSenderId)
5959
{ }

src/TfBuilder/TfBuilderInputUCX.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,8 @@ bool TfBuilderInputUCX::start()
187187
{
188188
// setting configuration options
189189
mThreadPoolSize = std::clamp(mConfig->getUInt64Param(UcxTfBuilderThreadPoolSizeKey, UcxTfBuilderThreadPoolSizeDefault), std::size_t(1), std::size_t(256));
190-
mRdmaPollingWait =mConfig->getBoolParam(UcxPollForRDMACompletionKey, UcxPollForRDMACompletionDefault);
191-
mRdmaConcurrentStfSizeMax =mConfig->getUInt64Param(UcxMaxStfSizeForConcurrentFetchBKey, UcxMaxStfSizeForConcurrentFetchBDefault);
190+
mRdmaPollingWait = mConfig->getBoolParam(UcxPollForRDMACompletionKey, UcxPollForRDMACompletionDefault);
191+
mRdmaConcurrentStfSizeMax = mConfig->getUInt64Param(UcxMaxStfSizeForConcurrentFetchBKey, UcxMaxStfSizeForConcurrentFetchBDefault);
192192
IDDLOG("TfBuilderInputUCX: Configuration loaded. thread_pool={} polling={} concurrent_size={}", mThreadPoolSize, mRdmaPollingWait, mRdmaConcurrentStfSizeMax);
193193

194194
auto &lConfStatus = mConfig->status();

0 commit comments

Comments
 (0)