Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions rqd/rqd/rqcore.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,17 +423,24 @@ def launchFrame(self, runFrame):

# See if all requested cores are available
with self.__threadLock:
# pylint: disable=no-member
if self.cores.idle_cores < runFrame.num_cores:
err = "Not launching, insufficient idle cores"
log.critical(err)
raise rqd.rqexceptions.CoreReservationFailureException(err)
# pylint: enable=no-member

# For hyperthreading workloads, check HT core availability first
# as it uses a different counting mechanism than idle_cores
if runFrame.environment.get('CUE_THREADABLE') == '1':
if not self.machine.canReserveHT(runFrame.num_cores):
err = "Not launching, insufficient hyperthreading cores available"
log.critical(err)
raise rqd.rqexceptions.CoreReservationFailureException(err)
reserveHT = self.machine.reserveHT(runFrame.num_cores)
if reserveHT:
runFrame.attributes['CPU_LIST'] = reserveHT
else:
# Only check idle cores for non-hyperthreading workloads
# pylint: disable=no-member
if self.cores.idle_cores < runFrame.num_cores:
err = "Not launching, insufficient idle cores"
log.critical(err)
raise rqd.rqexceptions.CoreReservationFailureException(err)
# pylint: enable=no-member

if runFrame.num_gpus:
reserveGpus = self.machine.reserveGpus(runFrame.num_gpus)
Expand Down
60 changes: 41 additions & 19 deletions rqd/rqd/rqmachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,41 @@ def setupGpu(self):
""" Setup rqd for Gpus """
self.__gpusets = set(range(self.getGpuCount()))

def reserveHT(self, frameCores):
def _getAvailableHTCores(self):
    """Collect every core that is not currently listed as reserved.

    Walks the physid -> coreid table and skips each core whose integer
    ids appear in the reserved-core bookkeeping. Nothing is reserved or
    modified by this call.

    @rtype: tuple
    @return: (free_by_physid, free_total) where free_by_physid maps
             physid -> set(coreid), using the keys exactly as they appear
             in the proc table, and free_total is the number of free cores
    """
    reserved = self.__coreInfo.reserved_cores
    free_by_physid = {}
    free_total = 0

    for physid, cores in self.__procs_by_physid_and_coreid.items():
        for coreid in cores:
            phys_key = int(physid)
            # Membership is tested before indexing so that a physid with
            # no reservation entry is never looked up by subscript.
            is_reserved = (
                phys_key in reserved
                and int(coreid) in reserved[phys_key].coreid
            )
            if not is_reserved:
                free_by_physid.setdefault(physid, set()).add(coreid)
                free_total += 1

    return free_by_physid, free_total

def canReserveHT(self, frameCores: int):
    """Report whether a hyperthreading reservation would fit, without making it.

    @type  frameCores: int
    @param frameCores: The core request for the frame. It is divided by 100
                       before being compared with the free-core count, so
                       it is presumably expressed in hundredths of a core.
    @rtype:  bool
    @return: True if enough unreserved cores exist for the request, False
             if there are too few or the request is not a multiple of 100
    """
    # A request that is not a whole multiple of 100 is rejected outright.
    if frameCores % 100 != 0:
        return False

    # Only the count is needed here; the per-physid mapping is ignored.
    free_total = self._getAvailableHTCores()[1]
    return frameCores / 100 <= free_total

def reserveHT(self, frameCores: int):
""" Reserve cores for use by taskset
taskset -c 0,1,8,9 COMMAND
Not thread save, use with locking.
Expand All @@ -951,21 +985,8 @@ def reserveHT(self, frameCores):
return None
log.info('Taskset: Requesting reserve of %d', (frameCores // 100))

# Look for the most idle physical cpu.
# Prefer to assign cores from the same physical cpu.
# Spread different frames around on different physical cpus.
avail_cores = {}
avail_cores_count = 0
reserved_cores = self.__coreInfo.reserved_cores

for physid, cores in self.__procs_by_physid_and_coreid.items():
for coreid in cores.keys():
if int(physid) in reserved_cores and \
int(coreid) in reserved_cores[int(physid)].coreid:
continue
avail_cores.setdefault(physid, set()).add(coreid)
avail_cores_count += 1

# Get available cores using factored method
avail_cores, avail_cores_count = self._getAvailableHTCores()
remaining_cores = frameCores / 100

if avail_cores_count < remaining_cores:
Expand All @@ -976,6 +997,7 @@ def reserveHT(self, frameCores):
raise rqd.rqexceptions.CoreReservationFailureException(err)

tasksets = []
reserved_cores = self.__coreInfo.reserved_cores

for physid, cores in sorted(
avail_cores.items(),
Expand All @@ -997,9 +1019,9 @@ def reserveHT(self, frameCores):
if remaining_cores == 0:
break

log.warning('Taskset: Reserving procs - %s', ','.join(tasksets))

return ','.join(tasksets)
joined_tasksets = ','.join(sorted(tasksets, key=int))
log.warning('Taskset: Reserving procs - %s', joined_tasksets)
return joined_tasksets

# pylint: disable=inconsistent-return-statements
def releaseHT(self, reservedHT):
Expand Down
Loading