Skip to content

Commit 8ca629a

Browse files
committed
tfbuilder: support cancellation of init process
1 parent 7f2db93 commit 8ca629a

File tree

1 file changed

+35
-33
lines changed

1 file changed

+35
-33
lines changed

src/TfBuilder/TfBuilderInputUCX.cxx

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ bool TfBuilderInputUCX::start()
276276
listen_conn_handle_cb, &dd_ucp_listen_context)) {
277277
return false;
278278
}
279+
DDDLOG("TfBuilderInputUCX::start(): ucp_listener created.");
279280

280281
// Start the Listener thread
281282
mListenerThread = create_thread_member("ucx_listener", &TfBuilderInputUCX::ListenerThread, this);
@@ -448,11 +449,6 @@ void TfBuilderInputUCX::stop()
448449

449450
mState = TERMINATED;
450451

451-
if (mListenerThread.joinable()) {
452-
mListenerThread.join();
453-
}
454-
DDDLOG("TfBuilderInputUCX::stop: Listener thread stopped.");
455-
456452
// Wait for input threads to stop
457453
DDDLOG("TfBuilderInputUCX::stop: Waiting for input threads to terminate.");
458454
mStfReqQueue.stop();
@@ -500,44 +496,50 @@ void TfBuilderInputUCX::stop()
500496
}
501497
}
502498

503-
// unmap the receive buffers
504-
if (ucp_data_region_set) {
505-
// remove the ucx mapping from the region
506-
void* lOrigAddress = mTimeFrameBuilder.mMemRes.mDataMemRes->address();
507-
mTimeFrameBuilder.mMemRes.mDataMemRes->set_ucx_address(lOrigAddress);
508-
// unmap
509-
ucp_mem_unmap(ucp_context, ucp_data_region);
510-
}
499+
if (mListenerThread.joinable()) {
500+
mListenerThread.join();
501+
DDDLOG("TfBuilderInputUCX::stop: Listener thread stopped.");
511502

503+
// unmap the receive buffers
504+
if (ucp_data_region_set) {
505+
// remove the ucx mapping from the region
506+
void* lOrigAddress = mTimeFrameBuilder.mMemRes.mDataMemRes->address();
507+
mTimeFrameBuilder.mMemRes.mDataMemRes->set_ucx_address(lOrigAddress);
508+
// unmap
509+
ucp_mem_unmap(ucp_context, ucp_data_region);
510+
}
512511

513-
{ // close ucx: destroy remote rma keys and disconnect
514-
std::unique_lock lLock(mConnectionMapLock);
515-
DDDLOG("TfBuilderInputUCX::stop: closing ep connections.");
516512

517-
for (auto & lConn : mConnMap) {
518-
std::unique_lock lIoLock(lConn.second->mRemoteKeysLock);
519-
for (auto & lRKeyIt : lConn.second->mRemoteKeys) {
520-
ucp_rkey_destroy(lRKeyIt.second);
513+
{ // close ucx: destroy remote rma keys and disconnect
514+
std::unique_lock lLock(mConnectionMapLock);
515+
DDDLOG("TfBuilderInputUCX::stop: closing ep connections.");
516+
517+
for (auto & lConn : mConnMap) {
518+
std::unique_lock lIoLock(lConn.second->mRemoteKeysLock);
519+
for (auto & lRKeyIt : lConn.second->mRemoteKeys) {
520+
ucp_rkey_destroy(lRKeyIt.second);
521+
}
522+
ucx::util::close_ep_connection(lConn.second->mWorker, lConn.second->ucp_ep);
521523
}
522-
ucx::util::close_ep_connection(lConn.second->mWorker, lConn.second->ucp_ep);
524+
mConnMap.clear();
523525
}
524-
mConnMap.clear();
525-
}
526526

527-
{// close the listener and all workers
528-
DDDLOG("TfBuilderInputUCX::stop: closing the listener.");
529-
ucp_listener_destroy(ucp_listener);
530-
ucp_worker_destroy(listener_worker.ucp_worker);
527+
{// close the listener and all workers
528+
DDDLOG("TfBuilderInputUCX::stop: closing the listener.");
529+
ucp_listener_destroy(ucp_listener);
530+
ucp_worker_destroy(listener_worker.ucp_worker);
531531

532-
DDDLOG("TfBuilderInputUCX::stop: destroying data workers.");
533-
for (auto &lWorker : mDataWorkers) {
534-
ucp_worker_destroy(lWorker.ucp_worker);
532+
DDDLOG("TfBuilderInputUCX::stop: destroying data workers.");
533+
for (auto &lWorker : mDataWorkers) {
534+
ucp_worker_destroy(lWorker.ucp_worker);
535+
}
536+
DDDLOG("TfBuilderInputUCX::stop: running the ucp cleanup.");
537+
ucp_cleanup(ucp_context);
535538
}
536-
DDDLOG("TfBuilderInputUCX::stop: running the ucp cleanup.");
537-
ucp_cleanup(ucp_context);
539+
DDDLOG("TfBuilderInputUCX::stop: All input channels are closed.");
538540
}
539541

540-
DDDLOG("TfBuilderInputUCX::stop: All input channels are closed.");
542+
DDDLOG("TfBuilderInputUCX::stop: Finished.");
541543
}
542544

543545
/// Receive buffer allocation thread

0 commit comments

Comments
 (0)