From b2679847464ae30f88ac36d8717d1a8e05328653 Mon Sep 17 00:00:00 2001 From: abrand Date: Fri, 19 Dec 2025 16:10:17 -0500 Subject: [PATCH] fix hyperthreading cores reservation --- rqd/rqd/rqcore.py | 21 ++++++++++------ rqd/rqd/rqmachine.py | 60 ++++++++++++++++++++++++++++++-------------- 2 files changed, 55 insertions(+), 26 deletions(-) diff --git a/rqd/rqd/rqcore.py b/rqd/rqd/rqcore.py index e60d49365..89e138296 100644 --- a/rqd/rqd/rqcore.py +++ b/rqd/rqd/rqcore.py @@ -423,17 +423,24 @@ def launchFrame(self, runFrame): # See if all requested cores are available with self.__threadLock: - # pylint: disable=no-member - if self.cores.idle_cores < runFrame.num_cores: - err = "Not launching, insufficient idle cores" - log.critical(err) - raise rqd.rqexceptions.CoreReservationFailureException(err) - # pylint: enable=no-member - + # For hyperthreading workloads, check HT core availability first + # as it uses a different counting mechanism than idle_cores if runFrame.environment.get('CUE_THREADABLE') == '1': + if not self.machine.canReserveHT(runFrame.num_cores): + err = "Not launching, insufficient hyperthreading cores available" + log.critical(err) + raise rqd.rqexceptions.CoreReservationFailureException(err) reserveHT = self.machine.reserveHT(runFrame.num_cores) if reserveHT: runFrame.attributes['CPU_LIST'] = reserveHT + else: + # Only check idle cores for non-hyperthreading workloads + # pylint: disable=no-member + if self.cores.idle_cores < runFrame.num_cores: + err = "Not launching, insufficient idle cores" + log.critical(err) + raise rqd.rqexceptions.CoreReservationFailureException(err) + # pylint: enable=no-member if runFrame.num_gpus: reserveGpus = self.machine.reserveGpus(runFrame.num_gpus) diff --git a/rqd/rqd/rqmachine.py b/rqd/rqd/rqmachine.py index 610df78ee..2968d45a0 100644 --- a/rqd/rqd/rqmachine.py +++ b/rqd/rqd/rqmachine.py @@ -936,7 +936,41 @@ def setupGpu(self): """ Setup rqd for Gpus """ self.__gpusets = set(range(self.getGpuCount())) - def reserveHT(self, frameCores): + def _getAvailableHTCores(self): + """Get available hyperthreading cores information + @rtype: tuple + @return: (avail_cores_dict, avail_cores_count) where avail_cores_dict + maps physid -> set(coreid) and avail_cores_count is the total count + """ + avail_cores = {} + avail_cores_count = 0 + reserved_cores = self.__coreInfo.reserved_cores + + for physid, cores in self.__procs_by_physid_and_coreid.items(): + for coreid in cores.keys(): + if int(physid) in reserved_cores and \ + int(coreid) in reserved_cores[int(physid)].coreid: + continue + avail_cores.setdefault(physid, set()).add(coreid) + avail_cores_count += 1 + + return avail_cores, avail_cores_count + + def canReserveHT(self, frameCores: int): + """Check if hyperthreading cores can be reserved without actually reserving them + @type frameCores: int + @param frameCores: The total physical cores required by the frame. + @rtype: bool + @return: True if cores can be reserved, False otherwise + """ + if frameCores % 100: + return False + + _, avail_cores_count = self._getAvailableHTCores() + remaining_cores = frameCores / 100 + return avail_cores_count >= remaining_cores + + def reserveHT(self, frameCores: int): """ Reserve cores for use by taskset taskset -c 0,1,8,9 COMMAND Not thread save, use with locking. @@ -951,21 +985,8 @@ def reserveHT(self, frameCores): return None log.info('Taskset: Requesting reserve of %d', (frameCores // 100)) - # Look for the most idle physical cpu. - # Prefer to assign cores from the same physical cpu. - # Spread different frames around on different physical cpus. - avail_cores = {} - avail_cores_count = 0 - reserved_cores = self.__coreInfo.reserved_cores - - for physid, cores in self.__procs_by_physid_and_coreid.items(): - for coreid in cores.keys(): - if int(physid) in reserved_cores and \ - int(coreid) in reserved_cores[int(physid)].coreid: - continue - avail_cores.setdefault(physid, set()).add(coreid) - avail_cores_count += 1 - + # Get available cores using factored method + avail_cores, avail_cores_count = self._getAvailableHTCores() remaining_cores = frameCores / 100 if avail_cores_count < remaining_cores: @@ -976,6 +997,7 @@ def reserveHT(self, frameCores): raise rqd.rqexceptions.CoreReservationFailureException(err) tasksets = [] + reserved_cores = self.__coreInfo.reserved_cores for physid, cores in sorted( avail_cores.items(), @@ -997,9 +1019,9 @@ def reserveHT(self, frameCores): if remaining_cores == 0: break - log.warning('Taskset: Reserving procs - %s', ','.join(tasksets)) - - return ','.join(tasksets) + joined_tasksets = ','.join(sorted(tasksets, key=int)) + log.warning('Taskset: Reserving procs - %s', joined_tasksets) + return joined_tasksets # pylint: disable=inconsistent-return-statements def releaseHT(self, reservedHT):