Skip to content

Commit 1fab58d

Browse files
committed
tfbuilder: on stop, push all TFs, then End-of-Stream
1 parent 61e40a9 commit 1fab58d

File tree

1 file changed

+10
-14
lines changed

1 file changed

+10
-14
lines changed

src/TfBuilder/TfBuilderDevice.cxx

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -374,14 +374,12 @@ void TfBuilderDevice::TfForwardThread()
374374
if (!mRunning) {
375375
DDDLOG("TfForwardThread: Not running... ");
376376
break;
377-
} else if (!is_running(eTfFwdIn)) {
377+
} else if (!is_running(eTfFwdIn)) { // terminating
378378
DDDLOG("TfForwardThread: Queue closed. Exiting... ");
379379
break;
380-
} else if (!mInRunningState) {
381-
if (lTfOpt) {
382-
WDDLOG_RL(1000, "Dropping a raw TimeFrame because stop of the run is requested.");
383-
}
384-
380+
} else if (!mInRunningState && lTfOpt) { // run stopped, draining existing TFs
381+
DDDLOG_RL(1000, "Forwarding TimeFrame while stop of the run is requested.");
382+
} else if (!mInRunningState && !lTfOpt) { // run stopped and we don't have any more TFs cached
385383
// send EOS if exiting the running state
386384
if (!mStandalone && mTfDplAdapter && mShouldSendEos) {
387385
mTfDplAdapter->sendEosToDpl();
@@ -391,28 +389,26 @@ void TfBuilderDevice::TfForwardThread()
391389
}
392390

393391
if (lTfOpt == std::nullopt) {
394-
DDMON("tfbuilder", "tf_output.sent_size", mTfFwdTotalDataSize);
395-
DDMON("tfbuilder", "tf_output.sent_count", mTfFwdTotalTfCount);
396392
continue;
397393
}
398394

399395
auto &lTf = lTfOpt.value();
400396
const auto lTfId = lTf->id();
397+
const auto lTfSize = lTf->getDataSize();
401398
{
402-
DDMON("tfbuilder", "tf_output.id", lTfId);
403-
404-
mTfFwdTotalDataSize += lTf->getDataSize();
399+
mTfFwdTotalDataSize += lTfSize;
405400
mTfFwdTotalTfCount += 1;
401+
402+
DDMON("tfbuilder", "tf_output.id", lTfId);
406403
DDMON("tfbuilder", "tf_output.sent_size", mTfFwdTotalDataSize);
407404
DDMON("tfbuilder", "tf_output.sent_count", mTfFwdTotalTfCount);
408-
409-
DDMON_RATE("tfbuilder", "tf_output", lTf->getDataSize());
405+
DDMON_RATE("tfbuilder", "tf_output", lTfSize);
410406
}
411407

412408
if (!mStandalone) {
413409
try {
414410
IDDLOG_RL(5000, "Forwarding a new TF to DPL. tf_id={} size={} unique_equipments={} total={}",
415-
lTfId, lTf->getDataSize(), lTf->getEquipmentIdentifiers().size(), mTfFwdTotalTfCount);
411+
lTfId, lTfSize, lTf->getEquipmentIdentifiers().size(), mTfFwdTotalTfCount);
416412

417413
// adapt headers to include DPL processing header on the stack
418414
assert(mTfBuilder);

0 commit comments

Comments
 (0)