diff --git a/.gitignore b/.gitignore index a1f68081a..4c8dfbf22 100644 --- a/.gitignore +++ b/.gitignore @@ -35,6 +35,7 @@ sandbox/kafka-data sandbox/zookeeper-data sandbox/zookeeper-logs sandbox/rqd/shots/ +sandbox/pgadmin-data docs/_data/version.yml target/* diff --git a/cuebot/build.gradle b/cuebot/build.gradle index e944402a7..bbbef89f2 100644 --- a/cuebot/build.gradle +++ b/cuebot/build.gradle @@ -88,6 +88,7 @@ compileTestJava { options.compilerArgs << "-Xlint:all,-serial" << "-Werror" } + protobuf { protoc { // The protoc compiler diff --git a/cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java b/cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java index b73cd75c3..a006a7a79 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java +++ b/cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java @@ -43,6 +43,7 @@ public class DispatchFrame extends FrameEntity implements FrameInterface { public int minGpus; public int maxGpus; public long minGpuMemory; + public int slotsRequired; // A comma separated list of services public String services; diff --git a/cuebot/src/main/java/com/imageworks/spcue/HostEntity.java b/cuebot/src/main/java/com/imageworks/spcue/HostEntity.java index 548e99cd2..cb196dda1 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/HostEntity.java +++ b/cuebot/src/main/java/com/imageworks/spcue/HostEntity.java @@ -38,6 +38,7 @@ public class HostEntity extends Entity implements HostInterface { public int idleGpus; public long gpuMemory; public long idleGpuMemory; + public int concurrentSlotsLimit; public boolean unlockAtBoot; @@ -61,6 +62,7 @@ public HostEntity(Host grpcHost) { this.idleGpus = (int) grpcHost.getIdleGpus(); this.gpuMemory = grpcHost.getGpuMemory(); this.idleGpuMemory = grpcHost.getIdleGpuMemory(); + this.concurrentSlotsLimit = grpcHost.getConcurrentSlotsLimit(); } public String getHostId() { diff --git a/cuebot/src/main/java/com/imageworks/spcue/LayerDetail.java b/cuebot/src/main/java/com/imageworks/spcue/LayerDetail.java index 572139039..da0e830f6 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/LayerDetail.java +++ b/cuebot/src/main/java/com/imageworks/spcue/LayerDetail.java @@ -38,6 +38,7 @@ public class LayerDetail extends LayerEntity implements LayerInterface { public int timeout_llu; public int dispatchOrder; public int totalFrameCount; + public int slotsRequired; public Set tags = new LinkedHashSet(); public Set services = new LinkedHashSet(); diff --git a/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java b/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java index 8c92ad016..4f953ee5e 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java @@ -45,6 +45,8 @@ public class VirtualProc extends FrameEntity implements ProcInterface { public long gpuMemoryUsed; public long gpuMemoryMax; + public int slotsRequired; + public boolean unbooked; public boolean usageRecorded = false; public boolean isLocalDispatch = false; @@ -101,6 +103,7 @@ public static final VirtualProc build(DispatchHost host, DispatchFrame frame, proc.memoryReserved = frame.getMinMemory(); proc.gpusReserved = frame.minGpus; proc.gpuMemoryReserved = frame.minGpuMemory; + proc.slotsRequired = frame.slotsRequired; /* * Frames that are announcing cores less than 100 are not multi-threaded so there is no @@ -237,6 +240,7 @@ public static final VirtualProc build(DispatchHost host, DispatchFrame frame, proc.memoryReserved = frame.getMinMemory(); 
proc.gpusReserved = frame.minGpus; proc.gpuMemoryReserved = frame.minGpuMemory; + proc.slotsRequired = frame.slotsRequired; int wholeCores = (int) (Math.floor(host.idleCores / 100.0)); if (wholeCores == 0) { diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/HostDao.java b/cuebot/src/main/java/com/imageworks/spcue/dao/HostDao.java index dfd0397a2..8459d62cf 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/HostDao.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/HostDao.java @@ -243,6 +243,22 @@ public interface HostDao { */ void updateThreadMode(HostInterface host, ThreadMode mode); + /** + * Update the host's concurrent slots limit. + * + * @param host HostInterface + * @param limit int (0 for no limit) + */ + void updateConcurrentSlotsLimit(HostInterface host, int limit); + + /** + * Get the host's concurrent slots limit by hostname. + * + * @param hostname String + * @return int the concurrent slots limit + */ + int getHostConcurrentSlotsLimit(String hostname); + /** * Update the specified host's hardware information. * @@ -260,7 +276,7 @@ public interface HostDao { */ void updateHostStats(HostInterface host, long totalMemory, long freeMemory, long totalSwap, long freeSwap, long totalMcp, long freeMcp, long totalGpuMemory, long freeGpuMemory, - int load, Timestamp bootTime, String os); + int load, Timestamp bootTime, String os, int runningProcs); /** * Return true if the HardwareState is Up, false if it is anything else. diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java b/cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java index 5d5433ada..1b5435218 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java @@ -311,6 +311,14 @@ public interface LayerDao { */ void updateTimeoutLLU(LayerInterface layer, int timeout_llu); + /** + * Updates the slots required for a layer. + * + * @param layer the layer to update + * @param slots the number of slots required (<= 0 means the layer is not slot-based) + */ + void updateLayerSlotsRequired(LayerInterface layer, int slots); + /** * Lowers the minimum memory on a layer if the layer is using less memory and the current min * memory is the dispatcher default.
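The two DAO additions above (the host's concurrent slots limit and the layer's slots-required count) back the slot-based booking API that this change exposes through the pycue wrappers further down in the diff. A rough usage sketch (host and layer names are hypothetical; it assumes the setConcurrentSlotsLimit and setSlotsRequired wrappers added later in this change):

    import opencue

    # Hypothetical host/layer names, for illustration only.
    host = opencue.api.findHost("render-node-01")
    host.setConcurrentSlotsLimit(4)   # host only books slot-based layers, at most 4 slots at once

    layer = opencue.api.findLayer("my_job", "slot_based_layer")
    layer.setSlotsRequired(2)         # each frame of this layer consumes 2 slots when dispatched

Setting the limit to 0 removes the restriction and the host goes back to regular core/memory/gpu booking.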
diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java index 2daa00335..4f75d6708 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java @@ -546,6 +546,7 @@ private static final String replaceQueryForFifo(String query) { "int_gpus_min, " + "int_gpus_max, " + "int_gpu_mem_min, " + + "int_slots_required, " + "str_cmd, " + "str_range, " + "int_chunk_size, " + @@ -588,6 +589,7 @@ private static final String replaceQueryForFifo(String query) { "layer.int_gpus_min, " + "layer.int_gpus_max, " + "layer.int_gpu_mem_min, " + + "layer.int_slots_required, " + "layer.str_cmd, " + "layer.str_range, " + "layer.int_chunk_size, " + @@ -676,6 +678,7 @@ private static final String replaceQueryForFifo(String query) { "layer.b_threadable, " + "layer.int_mem_min, " + "layer.int_gpu_mem_min, " + + "layer.int_slots_required, " + "layer.str_cmd, " + "layer.str_range, " + "layer.int_chunk_size, " + @@ -765,6 +768,7 @@ private static final String replaceQueryForFifo(String query) { "layer.int_gpus_min, " + "layer.int_gpus_max, " + "layer.int_gpu_mem_min, " + + "layer.int_slots_required, " + "layer.str_cmd, " + "layer.str_range, " + "layer.int_chunk_size, " + @@ -847,6 +851,7 @@ private static final String replaceQueryForFifo(String query) { "layer.int_gpus_min, " + "layer.int_gpus_max, " + "layer.int_gpu_mem_min, " + + "layer.int_slots_required, " + "layer.str_cmd, " + "layer.str_range, " + "layer.int_chunk_size, " + @@ -932,6 +937,7 @@ private static final String replaceQueryForFifo(String query) { "layer.int_gpus_min, " + "layer.int_gpus_max, " + "layer.int_gpu_mem_min, " + + "layer.int_slots_required, " + "layer.str_cmd, " + "layer.str_range, " + "layer.int_chunk_size, " + @@ -1020,6 +1026,7 @@ private static final String replaceQueryForFifo(String query) { "layer.int_gpus_min, " + "layer.int_gpus_max, " + "layer.int_gpu_mem_min, " + + "layer.int_slots_required, " + "layer.str_cmd, " + "layer.str_range, " + "layer.int_chunk_size, " + @@ -1108,6 +1115,7 @@ private static final String replaceQueryForFifo(String query) { "layer.int_gpus_min, " + "layer.int_gpus_max, " + "layer.int_gpu_mem_min, " + + "layer.int_slots_required, " + "layer.int_cores_max, " + "layer.str_cmd, " + "layer.str_range, " + @@ -1191,6 +1199,7 @@ private static final String replaceQueryForFifo(String query) { "layer.int_gpus_min, " + "layer.int_gpus_max, " + "layer.int_gpu_mem_min, " + + "layer.int_slots_required, " + "layer.str_cmd, " + "layer.str_range, " + "layer.int_chunk_size, " + diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java index 2ccaef16c..15ec06a17 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java @@ -234,6 +234,7 @@ public DispatchFrame mapRow(ResultSet rs, int rowNum) throws SQLException { frame.minGpus = rs.getInt("int_gpus_min"); frame.maxGpus = rs.getInt("int_gpus_max"); frame.minGpuMemory = rs.getLong("int_gpu_mem_min"); + frame.slotsRequired = rs.getInt("int_slots_required"); frame.version = rs.getInt("int_version"); frame.services = rs.getString("str_services"); frame.os = rs.getString("str_os"); @@ -252,8 +253,8 @@ public DispatchFrame mapRow(ResultSet rs, int rowNum) throws 
SQLException { + "layer.str_type AS layer_type, " + "layer.str_cmd, " + "layer.int_cores_min," + "layer.int_cores_max," + "layer.b_threadable," + "layer.int_mem_min, " + "layer.int_gpus_min," + "layer.int_gpus_max," + "layer.int_gpu_mem_min, " - + "layer.str_range, " + "layer.int_chunk_size, " + "layer.str_services " + "FROM " - + "layer, " + "job, " + "show, " + + "layer.int_slots_required, " + "layer.str_range, " + "layer.int_chunk_size, " + + "layer.str_services " + "FROM " + "layer, " + "job, " + "show, " + "frame LEFT JOIN proc ON (proc.pk_frame = frame.pk_frame) " + "WHERE " + "job.pk_show = show.pk_show " + "AND " + "frame.pk_job = job.pk_job " + "AND " + "frame.pk_layer = layer.pk_layer " + "AND " + "frame.pk_frame = ?"; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java index 862e1c459..9365b9f2b 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java @@ -78,6 +78,7 @@ public HostEntity mapRow(ResultSet rs, int rowNum) throws SQLException { host.idleGpus = rs.getInt("int_gpus_idle"); host.gpuMemory = rs.getLong("int_gpu_mem"); host.idleGpuMemory = rs.getLong("int_gpu_mem_idle"); + host.concurrentSlotsLimit = rs.getInt("int_concurrent_slots_limit"); host.dateBooted = rs.getDate("ts_booted"); host.dateCreated = rs.getDate("ts_created"); host.datePinged = rs.getDate("ts_ping"); @@ -131,6 +132,7 @@ public String getFacilityId() { + " host.int_gpus_idle, " + " host.int_gpu_mem, " + " host.int_gpu_mem_idle, " + + " host.int_concurrent_slots_limit, " + " host.ts_created, " + " host.str_name, " + " host_stat.str_state, " @@ -395,14 +397,15 @@ public CallableStatement createCallableStatement(Connection con) throws SQLExcep + " int_load = ?, " + " ts_booted = ?, " + " ts_ping = current_timestamp, " - + " str_os = ? " + + " str_os = ?, " + + " int_running_procs = ? " + "WHERE " + " pk_host = ?"; @Override public void updateHostStats(HostInterface host, long totalMemory, long freeMemory, long totalSwap, long freeSwap, long totalMcp, long freeMcp, long totalGpuMemory, - long freeGpuMemory, int load, Timestamp bootTime, String os) { + long freeGpuMemory, int load, Timestamp bootTime, String os, int runningProcs) { if (os == null) { os = Dispatcher.OS_DEFAULT; @@ -410,7 +413,7 @@ public void updateHostStats(HostInterface host, long totalMemory, long freeMemor getJdbcTemplate().update(UPDATE_RENDER_HOST, totalMemory, freeMemory, totalSwap, freeSwap, totalMcp, freeMcp, totalGpuMemory, freeGpuMemory, load, bootTime, os, - host.getHostId()); + runningProcs, host.getHostId()); } @Override @@ -562,6 +565,23 @@ public void updateThreadMode(HostInterface host, ThreadMode mode) { mode.getNumber(), host.getHostId()); } + @Override + public void updateConcurrentSlotsLimit(HostInterface host, int limit) { + getJdbcTemplate().update("UPDATE host SET int_concurrent_slots_limit=? WHERE pk_host=?", + limit, host.getHostId()); + } + + @Override + public int getHostConcurrentSlotsLimit(String hostname) { + try { + return getJdbcTemplate().queryForObject( + "SELECT int_concurrent_slots_limit FROM host WHERE str_name = ?", + Integer.class, hostname); + } catch (EmptyResultDataAccessException e) { + return 0; + } + } + @Override public void updateHostOs(HostInterface host, String os) { getJdbcTemplate().update("UPDATE host_stat SET str_os=? 
WHERE pk_host=?", os, @@ -631,7 +651,7 @@ public boolean isNimbyHost(HostInterface h) { /** * Checks if the passed in name looks like a fully qualified domain name. If so, returns the * hostname without the domain. Otherwise returns the passed in name unchanged. - * + * * @param fqdn - String * @return String - hostname */ diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/LayerDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/LayerDaoJdbc.java index d9ef93e2b..83abf0a60 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/LayerDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/LayerDaoJdbc.java @@ -158,6 +158,7 @@ public LayerDetail mapRow(ResultSet rs, int rowNum) throws SQLException { layer.services.addAll(Lists.newArrayList(rs.getString("str_services").split(","))); layer.timeout = rs.getInt("int_timeout"); layer.timeout_llu = rs.getInt("int_timeout_llu"); + layer.slotsRequired = rs.getInt("int_slots_required"); return layer; } }; @@ -241,7 +242,8 @@ public LayerInterface getLayer(String id) { + "int_dispatch_order, " + "str_tags, " + "str_type," + "int_cores_min, " + "int_cores_max, " + "b_threadable, " + "int_mem_min, " + "int_gpus_min, " + "int_gpus_max, " + "int_gpu_mem_min, " + "str_services, " + "int_timeout," - + "int_timeout_llu " + ") " + "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; + + "int_timeout_llu, " + "int_slots_required " + ") " + + "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; @Override public void insertLayerDetail(LayerDetail l) { @@ -250,7 +252,7 @@ public void insertLayerDetail(LayerDetail l) { l.chunkSize, l.dispatchOrder, StringUtils.join(l.tags, " | "), l.type.toString(), l.minimumCores, l.maximumCores, l.isThreadable, l.minimumMemory, l.minimumGpus, l.maximumGpus, l.minimumGpuMemory, StringUtils.join(l.services, ","), l.timeout, - l.timeout_llu); + l.timeout_llu, l.slotsRequired); } @Override @@ -555,6 +557,12 @@ public void updateTimeoutLLU(LayerInterface layer, int timeout_llu) { layer.getLayerId()); } + @Override + public void updateLayerSlotsRequired(LayerInterface layer, int slots) { + getJdbcTemplate().update("UPDATE layer SET int_slots_required=? WHERE pk_layer=?", slots, + layer.getLayerId()); + } + @Override public void enableMemoryOptimizer(LayerInterface layer, boolean value) { getJdbcTemplate().update("UPDATE layer SET b_optimize=? 
WHERE pk_layer=?", value, diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/WhiteboardDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/WhiteboardDaoJdbc.java index 9e7b8ee95..1be896420 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/WhiteboardDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/WhiteboardDaoJdbc.java @@ -961,6 +961,7 @@ public static NestedHost.Builder mapNestedHostBuilder(ResultSet rs) throws SQLEx .setLockState(LockState.valueOf(SqlUtil.getString(rs, "str_lock_state"))) .setHasComment(rs.getBoolean("b_comment")) .setThreadMode(ThreadMode.values()[rs.getInt("int_thread_mode")]) + .setConcurrentSlotsLimit(rs.getInt("int_concurrent_slots_limit")) .setOs(SqlUtil.getString(rs, "str_os")); String tags = SqlUtil.getString(rs, "str_tags"); @@ -998,6 +999,7 @@ public static Host.Builder mapHostBuilder(ResultSet rs) throws SQLException { builder.setLockState(LockState.valueOf(SqlUtil.getString(rs, "str_lock_state"))); builder.setHasComment(rs.getBoolean("b_comment")); builder.setThreadMode(ThreadMode.values()[rs.getInt("int_thread_mode")]); + builder.setConcurrentSlotsLimit(rs.getInt("int_concurrent_slots_limit")); builder.setOs(SqlUtil.getString(rs, "str_os")); String tags = SqlUtil.getString(rs, "str_tags"); @@ -1183,7 +1185,8 @@ public Layer mapRow(ResultSet rs, int rowNum) throws SQLException { Arrays.asList(SqlUtil.getString(rs, "str_limit_names").split(","))) .setMemoryOptimizerEnabled(rs.getBoolean("b_optimize")) .setTimeout(rs.getInt("int_timeout")) - .setTimeoutLlu(rs.getInt("int_timeout_llu")); + .setTimeoutLlu(rs.getInt("int_timeout_llu")) + .setSlotsRequired(rs.getInt("int_slots_required")); LayerStats.Builder statsBuilder = LayerStats.newBuilder() .setReservedCores(Convert.coreUnitsToCores(rs.getInt("int_cores"))) @@ -1710,9 +1713,9 @@ public Show mapRow(ResultSet rs, int rowNum) throws SQLException { + "host.int_cores_idle," + "host.int_mem," + "host.int_mem_idle," + "host.int_gpus," + "host.int_gpus_idle," + "host.int_gpu_mem," + "host.int_gpu_mem_idle," + "host.str_tags," + "host.str_lock_state," + "host.b_comment," - + "host.int_thread_mode," + "host_stat.str_os," + "host_stat.int_mem_total," - + "host_stat.int_mem_free," + "host_stat.int_swap_total," + "host_stat.int_swap_free," - + "host_stat.int_mcp_total," + "host_stat.int_mcp_free," + + "host.int_thread_mode," + "host.int_concurrent_slots_limit," + "host_stat.str_os," + + "host_stat.int_mem_total," + "host_stat.int_mem_free," + "host_stat.int_swap_total," + + "host_stat.int_swap_free," + "host_stat.int_mcp_total," + "host_stat.int_mcp_free," + "host_stat.int_gpu_mem_total," + "host_stat.int_gpu_mem_free," + "host_stat.int_load, " + "alloc.str_name AS alloc_name " + "FROM " + "alloc," + "facility, " + "host_stat," + "host " + "WHERE " + "host.pk_alloc = alloc.pk_alloc " diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java index 3eb1ad3f2..a32b0e8fc 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java @@ -392,7 +392,7 @@ public RunFrame prepareRqdRunFrame(VirtualProc proc, DispatchFrame frame) { .setNumCores(proc.coresReserved).setNumGpus(proc.gpusReserved) .setStartTime(System.currentTimeMillis()).setIgnoreNimby(proc.isLocalDispatch) 
.setOs(proc.os).setSoftMemoryLimit(frame.softMemoryLimit).setLokiUrl(frame.lokiURL) - .setHardMemoryLimit(frame.hardMemoryLimit) + .setHardMemoryLimit(frame.hardMemoryLimit).setSlotsRequired(proc.slotsRequired) .putAllEnvironment(jobDao.getEnvironment(frame)) .putAllEnvironment(layerDao.getLayerEnvironment(frame)).putEnvironment("CUE3", "1") .putEnvironment("CUE_THREADS", String.valueOf(threads)) diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java index 7bd73ef3d..86951fbe1 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java @@ -191,7 +191,7 @@ public void handleHostReport(HostReport report, boolean isBoot) { rhost.getTotalSwap(), rhost.getFreeSwap(), rhost.getTotalMcp(), rhost.getFreeMcp(), rhost.getTotalGpuMem(), rhost.getFreeGpuMem(), rhost.getLoad(), new Timestamp(rhost.getBootTime() * 1000l), - rhost.getAttributesMap().get("SP_OS")); + rhost.getAttributesMap().get("SP_OS"), report.getFramesCount()); // Both logics are conflicting, only change hardware state if // there was no need for a tempDirStorage state change diff --git a/cuebot/src/main/java/com/imageworks/spcue/servant/ManageHost.java b/cuebot/src/main/java/com/imageworks/spcue/servant/ManageHost.java index 5732af62d..9ed1a25a2 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/servant/ManageHost.java +++ b/cuebot/src/main/java/com/imageworks/spcue/servant/ManageHost.java @@ -74,6 +74,8 @@ import com.imageworks.spcue.grpc.host.HostSetHardwareStateRequest; import com.imageworks.spcue.grpc.host.HostSetHardwareStateResponse; import com.imageworks.spcue.grpc.host.HostSetOsRequest; +import com.imageworks.spcue.grpc.host.HostSetConcurrentSlotsLimitRequest; +import com.imageworks.spcue.grpc.host.HostSetConcurrentSlotsLimitResponse; import com.imageworks.spcue.grpc.host.HostSetOsResponse; import com.imageworks.spcue.grpc.host.HostSetThreadModeRequest; import com.imageworks.spcue.grpc.host.HostSetThreadModeResponse; @@ -323,6 +325,15 @@ public void setOs(HostSetOsRequest request, responseObserver.onCompleted(); } + @Override + public void setConcurrentSlotsLimit(HostSetConcurrentSlotsLimitRequest request, + StreamObserver responseObserver) { + HostInterface host = getHostInterface(request.getHost()); + hostManager.setConcurrentSlotsLimit(host, request.getLimit()); + responseObserver.onNext(HostSetConcurrentSlotsLimitResponse.newBuilder().build()); + responseObserver.onCompleted(); + } + public HostManager getHostManager() { return hostManager; } diff --git a/cuebot/src/main/java/com/imageworks/spcue/servant/ManageLayer.java b/cuebot/src/main/java/com/imageworks/spcue/servant/ManageLayer.java index 0e19a2e43..21f64d9b3 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/servant/ManageLayer.java +++ b/cuebot/src/main/java/com/imageworks/spcue/servant/ManageLayer.java @@ -110,6 +110,8 @@ import com.imageworks.spcue.grpc.job.LayerSetTimeoutResponse; import com.imageworks.spcue.grpc.job.LayerSetTimeoutLLURequest; import com.imageworks.spcue.grpc.job.LayerSetTimeoutLLUResponse; +import com.imageworks.spcue.grpc.job.LayerSetSlotsRequiredRequest; +import com.imageworks.spcue.grpc.job.LayerSetSlotsRequiredResponse; import com.imageworks.spcue.grpc.job.LayerStaggerFramesRequest; import com.imageworks.spcue.grpc.job.LayerStaggerFramesResponse; import com.imageworks.spcue.grpc.limit.Limit; @@ -432,6 +434,15 @@ 
public void setTimeoutLLU(LayerSetTimeoutLLURequest request, } } + @Override + public void setSlotsRequired(LayerSetSlotsRequiredRequest request, + StreamObserver responseObserver) { + updateLayer(request.getLayer()); + jobManager.setLayerSlotsRequired(layer, request.getSlots()); + responseObserver.onNext(LayerSetSlotsRequiredResponse.newBuilder().build()); + responseObserver.onCompleted(); + } + @Override public void addLimit(LayerAddLimitRequest request, StreamObserver responseObserver) { diff --git a/cuebot/src/main/java/com/imageworks/spcue/servant/RqdReportStatic.java b/cuebot/src/main/java/com/imageworks/spcue/servant/RqdReportStatic.java index 2a30fb2cd..ef044834d 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/servant/RqdReportStatic.java +++ b/cuebot/src/main/java/com/imageworks/spcue/servant/RqdReportStatic.java @@ -12,11 +12,15 @@ import com.imageworks.spcue.grpc.report.RqdReportRunningFrameCompletionResponse; import com.imageworks.spcue.grpc.report.RqdReportStatusRequest; import com.imageworks.spcue.grpc.report.RqdReportStatusResponse; +import com.imageworks.spcue.grpc.report.RqdReportGetHostSlotsLimitRequest; +import com.imageworks.spcue.grpc.report.RqdReportGetHostSlotsLimitResponse; +import com.imageworks.spcue.service.HostManager; public class RqdReportStatic extends RqdReportInterfaceGrpc.RqdReportInterfaceImplBase { private FrameCompleteHandler frameCompleteHandler; private HostReportHandler hostReportHandler; + private HostManager hostManager; @SuppressWarnings("unused") @@ -44,6 +48,15 @@ public void reportStatus(RqdReportStatusRequest request, responseObserver.onCompleted(); } + @Override + public void getHostSlotsLimit(RqdReportGetHostSlotsLimitRequest request, + StreamObserver responseObserver) { + int slotsLimit = hostManager.getHostConcurrentSlotsLimit(request.getName()); + responseObserver.onNext( + RqdReportGetHostSlotsLimitResponse.newBuilder().setSlotsLimit(slotsLimit).build()); + responseObserver.onCompleted(); + } + public FrameCompleteHandler getFrameCompleteHandler() { return frameCompleteHandler; } @@ -59,4 +72,12 @@ public HostReportHandler getHostReportHandler() { public void setHostReportHandler(HostReportHandler hostReportHandler) { this.hostReportHandler = hostReportHandler; } + + public HostManager getHostManager() { + return hostManager; + } + + public void setHostManager(HostManager hostManager) { + this.hostManager = hostManager; + } } diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java b/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java index 76d5282b2..14cdd75a1 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java @@ -67,6 +67,22 @@ public interface HostManager { */ void setHostFreeTempDir(HostInterface host, Long freeTempDir); + /** + * Updates the concurrent procs limit of a host. + * + * @param host HostInterface + * @param limit int + */ + void setConcurrentSlotsLimit(HostInterface host, int limit); + + /** + * Gets the concurrent slots limit of a host by hostname. 
+ * + * @param hostname String + * @return int the concurrent slots limit + */ + int getHostConcurrentSlotsLimit(String hostname); + DispatchHost createHost(HostReport report); DispatchHost createHost(RenderHost host); @@ -120,7 +136,7 @@ public interface HostManager { */ void setHostStatistics(HostInterface host, long totalMemory, long freeMemory, long totalSwap, long freeSwap, long totalMcp, long freeMcp, long totalGpuMemory, long freeGpuMemory, - int load, Timestamp bootTime, String os); + int load, Timestamp bootTime, String os, int runningProcs); void deleteHost(HostInterface host); diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java b/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java index 1432f7169..3d0dad129 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java @@ -94,6 +94,17 @@ public void setHostFreeTempDir(HostInterface host, Long freeTempDir) { hostDao.updateHostFreeTempDir(host, freeTempDir); } + @Override + public void setConcurrentSlotsLimit(HostInterface host, int limit) { + hostDao.updateConcurrentSlotsLimit(host, limit); + } + + @Override + @Transactional(propagation = Propagation.REQUIRED, readOnly = true) + public int getHostConcurrentSlotsLimit(String hostname) { + return hostDao.getHostConcurrentSlotsLimit(hostname); + } + public void rebootWhenIdle(HostInterface host) { try { hostDao.updateHostState(host, HardwareState.REBOOT_WHEN_IDLE); @@ -116,10 +127,10 @@ public void rebootNow(HostInterface host) { @Override public void setHostStatistics(HostInterface host, long totalMemory, long freeMemory, long totalSwap, long freeSwap, long totalMcp, long freeMcp, long totalGpuMemory, - long freeGpuMemory, int load, Timestamp bootTime, String os) { + long freeGpuMemory, int load, Timestamp bootTime, String os, int runningProcs) { hostDao.updateHostStats(host, totalMemory, freeMemory, totalSwap, freeSwap, totalMcp, - freeMcp, totalGpuMemory, freeGpuMemory, load, bootTime, os); + freeMcp, totalGpuMemory, freeGpuMemory, load, bootTime, os, runningProcs); } @Transactional(propagation = Propagation.SUPPORTS, readOnly = true) diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/JobManager.java b/cuebot/src/main/java/com/imageworks/spcue/service/JobManager.java index 4641b8e82..51d6e9548 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/JobManager.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/JobManager.java @@ -455,6 +455,14 @@ public interface JobManager { */ void setLayerMinGpus(LayerInterface layer, int gpuUnits); + /** + * Sets the slots required for a layer. + * + * @param layer the layer to update + * @param slots the number of slots required + */ + void setLayerSlotsRequired(LayerInterface layer, int slots); + /** * Add a limit to the given layer. 
* diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/JobManagerService.java b/cuebot/src/main/java/com/imageworks/spcue/service/JobManagerService.java index 03bc765b4..0904689fb 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/JobManagerService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/JobManagerService.java @@ -456,6 +456,11 @@ public void setLayerMinGpus(LayerInterface layer, int gpu) { layerDao.updateLayerMinGpus(layer, gpu); } + @Override + public void setLayerSlotsRequired(LayerInterface layer, int slots) { + layerDao.updateLayerSlotsRequired(layer, slots); + } + @Override public void setLayerMaxGpus(LayerInterface layer, int gpu) { layerDao.updateLayerMaxGpus(layer, gpu); diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/JobSpec.java b/cuebot/src/main/java/com/imageworks/spcue/service/JobSpec.java index d4ff4e6b5..bed7fa903 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/JobSpec.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/JobSpec.java @@ -444,6 +444,10 @@ private void handleLayerTags(BuildableJob buildableJob, Element jobTag) { layer.timeout_llu = Integer.parseInt(layerTag.getChildTextTrim("timeout_llu")); } + if (layerTag.getChildTextTrim("slots_required") != null) { + layer.slotsRequired = Integer.parseInt(layerTag.getChildTextTrim("slots_required")); + } + /* * Handle the layer environment */ diff --git a/cuebot/src/main/resources/conf/ddl/postgres/migrations/V35__Add_host_frame_limit.sql b/cuebot/src/main/resources/conf/ddl/postgres/migrations/V35__Add_host_frame_limit.sql new file mode 100644 index 000000000..e42d873f8 --- /dev/null +++ b/cuebot/src/main/resources/conf/ddl/postgres/migrations/V35__Add_host_frame_limit.sql @@ -0,0 +1,7 @@ +-- Add a field to limit the maximum number of concurrent frames a host can run -- -1 means no limit +alter table host + add int_concurrent_slots_limit INT NOT NULL DEFAULT -1; + +alter table host_stat + add int_running_procs INT NOT NULL DEFAULT 0; diff --git a/cuebot/src/main/resources/conf/ddl/postgres/migrations/V36__Add_layer_slots_required.sql b/cuebot/src/main/resources/conf/ddl/postgres/migrations/V36__Add_layer_slots_required.sql new file mode 100644 index 000000000..2172118dc --- /dev/null +++ b/cuebot/src/main/resources/conf/ddl/postgres/migrations/V36__Add_layer_slots_required.sql @@ -0,0 +1,4 @@ +-- Add a field to mark a layer as requiring at least a specific number of slots -- <= 0 means slots are not required +alter table layer + add int_slots_required INT NOT NULL DEFAULT 0; diff --git a/cuebot/src/main/resources/conf/spring/applicationContext-grpc.xml b/cuebot/src/main/resources/conf/spring/applicationContext-grpc.xml index 612aeaff5..86293e8ac 100644 --- a/cuebot/src/main/resources/conf/spring/applicationContext-grpc.xml +++ b/cuebot/src/main/resources/conf/spring/applicationContext-grpc.xml [XML bean wiring changes; markup not preserved in this extract] diff --git a/cuebot/src/main/resources/public/dtd/cjsl-1.16.dtd b/cuebot/src/main/resources/public/dtd/cjsl-1.16.dtd new file mode 100644 index 000000000..20ebf19c2 --- /dev/null +++ b/cuebot/src/main/resources/public/dtd/cjsl-1.16.dtd @@ -0,0 +1,106 @@ [new 1.16 DTD; markup not preserved in this extract] diff --git
a/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/HostDaoTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/HostDaoTests.java index 77f0b2799..6c66e26a6 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/HostDaoTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/HostDaoTests.java @@ -384,7 +384,8 @@ public void updateHostStats() { DispatchHost dispatchHost = hostDao.findDispatchHost(TEST_HOST); hostDao.updateHostStats(dispatchHost, CueUtil.GB8, CueUtil.GB8, CueUtil.GB8, CueUtil.GB8, - CueUtil.GB8, CueUtil.GB8, 1, 1, 100, new Timestamp(1247526000 * 1000l), "spinux1"); + CueUtil.GB8, CueUtil.GB8, 1, 1, 100, new Timestamp(1247526000 * 1000l), "spinux1", + 2); Map result = jdbcTemplate .queryForMap("SELECT * FROM host_stat WHERE pk_host=?", dispatchHost.getHostId()); diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/service/JobSpecTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/service/JobSpecTests.java index 69057bb83..533feb1c5 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/service/JobSpecTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/service/JobSpecTests.java @@ -120,4 +120,26 @@ public void testParseMaxCoresAndMaxGpus() { assertEquals(job.maxGpusOverride, Integer.valueOf(42)); } + @Test + public void testParseSlotsRequired() { + String xml = readJobSpec("jobspec_1_16.xml"); + JobSpec spec = jobLauncher.parse(xml); + assertEquals(spec.getDoc().getDocType().getPublicID(), "SPI Cue Specification Language"); + assertEquals(spec.getDoc().getDocType().getSystemID(), + "http://localhost:8080/spcue/dtd/cjsl-1.16.dtd"); + assertEquals(spec.getJobs().size(), 1); + BuildableJob job = spec.getJobs().get(0); + assertEquals(job.getBuildableLayers().size(), 2); + + // First layer uses slot-based booking + LayerDetail slotBasedLayer = job.getBuildableLayers().get(0).layerDetail; + assertEquals(slotBasedLayer.name, "slot_based_layer"); + assertEquals(slotBasedLayer.slotsRequired, 4); + + // Second layer uses regular resource booking (default slots_required = 0) + LayerDetail regularLayer = job.getBuildableLayers().get(1).layerDetail; + assertEquals(regularLayer.name, "regular_layer"); + assertEquals(regularLayer.slotsRequired, 0); + } + } diff --git a/cuebot/src/test/resources/conf/dtd/cjsl-1.16.dtd b/cuebot/src/test/resources/conf/dtd/cjsl-1.16.dtd new file mode 100644 index 000000000..20ebf19c2 --- /dev/null +++ b/cuebot/src/test/resources/conf/dtd/cjsl-1.16.dtd @@ -0,0 +1,106 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/cuebot/src/test/resources/conf/jobspec/jobspec_1_16.xml b/cuebot/src/test/resources/conf/jobspec/jobspec_1_16.xml new file mode 100644 index 000000000..6e05da5bb --- /dev/null +++ b/cuebot/src/test/resources/conf/jobspec/jobspec_1_16.xml @@ -0,0 +1,58 @@ + + + + + local + testing + default + testuser + 9860 + + + False + 2 + 420 + 42 + False + + + + echo "Using slot-based booking" + 1-10 + 1 + 4 + + + shell + + + + echo "Using regular resource booking" + 1-5 + 1 + 100 + 2048 + + + shell + + + + + + diff --git a/cuegui/cuegui/HostMonitorTree.py b/cuegui/cuegui/HostMonitorTree.py index 207126214..c75790e2b 100644 --- a/cuegui/cuegui/HostMonitorTree.py +++ b/cuegui/cuegui/HostMonitorTree.py @@ -159,6 +159,15 @@ def __init__(self, parent): data=lambda host: ",".join(host.data.tags), tip="The tags applied to the 
host.\n\n" "On a frame it is the name of the job.") + self.addColumn("Concurrent Slots", 50, id=23, + data=lambda host: \ + host.data.concurrent_slots_limit \ + if host.data.concurrent_slots_limit >= 0 \ + else "-", + tip="When >0 the host is configured to be slot based.\n" + "The host can only run this amount of slots at the same time " + "(Usually: 1 frame = 1 slot)\n\n" + "This host will only run layers with a slots_required field configured.") self.hostSearch = opencue.search.HostSearch() @@ -290,6 +299,7 @@ def contextMenuEvent(self, e): self.__menuActions.hosts().addAction(menu, "removeTags") self.__menuActions.hosts().addAction(menu, "renameTag") self.__menuActions.hosts().addAction(menu, "changeAllocation") + self.__menuActions.hosts().addAction(menu, "setConcurrentSlotsLimit") self.__menuActions.hosts().addAction(menu, "delete") self.__menuActions.hosts().addAction(menu, "rebootWhenIdle") self.__menuActions.hosts().addAction(menu, "setRepair") diff --git a/cuegui/cuegui/LayerDialog.py b/cuegui/cuegui/LayerDialog.py index d95cb5367..10fc7545f 100644 --- a/cuegui/cuegui/LayerDialog.py +++ b/cuegui/cuegui/LayerDialog.py @@ -170,6 +170,12 @@ def __init__(self, layers, parent=None): self.__timeout_llu.setSuffix(" minutes") self.__timeout_llu.setSpecialValueText("No timeout") + # Slots Required + self.__slots_required = QtWidgets.QSpinBox(self) + self.__slots_required.setRange(0, int(self._cfg().get('max_cores', 16))) + self.__slots_required.setSingleStep(1) + self.__slots_required.setSpecialValueText("Not slot-based") + # Memory Optimizer self.__mem_opt = QtWidgets.QCheckBox() self.__mem_opt.setChecked(self.getMemoryOptSetting()) @@ -228,6 +234,7 @@ def __init__(self, layers, parent=None): self.__max_gpus.setValue(self.getMaxGpus()) self.__timeout.setValue(self.getTimeout()) self.__timeout_llu.setValue(self.getTimeoutLLU()) + self.__slots_required.setValue(self.getSlotsRequired()) QtWidgets.QVBoxLayout(self) @@ -272,6 +279,10 @@ def __init__(self, layers, parent=None): self.__timeout_llu, False), multiSelect)) + layout.addWidget(EnableableItem(LayerPropertiesItem("Slots Required:", + self.__slots_required, + False), + multiSelect)) layout.addStretch() self.__group.setLayout(layout) @@ -336,6 +347,8 @@ def apply(self): layer.setTimeout(self.__timeout.value()) if self.__timeout_llu.isEnabled(): layer.setTimeoutLLU(self.__timeout_llu.value()) + if self.__slots_required.isEnabled(): + layer.setSlotsRequired(self.__slots_required.value()) if self.__tags.isEnabled(): self.__tags.apply() if self.__limits.isEnabled(): @@ -421,6 +434,14 @@ def getMemoryOptSetting(self): break return result + def getSlotsRequired(self): + """Gets the layer slots required.""" + result = 0 + for layer in self.__layers: + if layer.data.slots_required > result: + result = layer.data.slots_required + return result + def __translateToMemSpinbox(self, value): self.__mem.spinner.setValue(float(value) / 1024.0) diff --git a/cuegui/cuegui/LayerMonitorTree.py b/cuegui/cuegui/LayerMonitorTree.py index 1d2ec0250..d5799506d 100644 --- a/cuegui/cuegui/LayerMonitorTree.py +++ b/cuegui/cuegui/LayerMonitorTree.py @@ -150,6 +150,11 @@ def __init__(self, parent): data=lambda layer: cuegui.Utils.secondsToHHHMM(layer.data.timeout_llu*60), sort=lambda layer: layer.data.timeout_llu, tip="Timeout for a frames\' LLU, Hours:Minutes") + self.addColumn("Slots Required", 65, id=23, + data=lambda layer: "-" if layer.data.slots_required <= 0 else str(layer.data.slots_required), + sort=lambda layer: layer.data.slots_required, + tip="Number of 
slots required per frame\n" + "(- means not slot-based)") cuegui.AbstractTreeWidget.AbstractTreeWidget.__init__(self, parent) # pylint: disable=no-member diff --git a/cuegui/cuegui/MenuActions.py b/cuegui/cuegui/MenuActions.py index fcdae5d41..e2c0e5fc9 100644 --- a/cuegui/cuegui/MenuActions.py +++ b/cuegui/cuegui/MenuActions.py @@ -1936,6 +1936,42 @@ def setThreadModeVariable(self, rpcObjects=None): host.setThreadMode("VARIABLE") self._update() + setConcurrentSlotsLimit_info = ["Update Slot Limit...", None, "configure"] + def setConcurrentSlotsLimit(self, rpcObjects=None): + """Set the concurrent slots limit for selected hosts.""" + hosts = self._getOnlyHostObjects(rpcObjects) + if not hosts: + return + + # Get current value from first selected host + current = hosts[0].concurrentSlotsLimit() if len(hosts) == 1 else 0 + + title = "Set Concurrent Slots Limit" + body = "Enter the maximum number of concurrent slots\n(usually a frame consumes 1 slot; " \ + "the value can be configured via the layer's slots_required field)\n" \ + "When a limit is defined, booking will only allocate layers with " \ + "slots_required > 0 to be executed on this host, \n" \ + "which means regular booking by cores/memory/gpu becomes disabled.\n\n" \ + "(0 for no limit, >0 for a specific limit):" + + (value, choice) = QtWidgets.QInputDialog.getInt( + self._caller, + title, + body, + current, # current value + 0, # minimum value + 10000, # maximum value + 1, # step + ) + + if choice: + for host in hosts: + self.cuebotCall( + host.setConcurrentSlotsLimit, + "Set Concurrent Slots Limit on %s Failed" % host.data.name, + int(value), + ) + self._update() class ProcActions(AbstractActions): """Actions for procs.""" diff --git a/proto/src/host.proto b/proto/src/host.proto index b321fdec1..296b34cfe 100644 --- a/proto/src/host.proto +++ b/proto/src/host.proto @@ -95,6 +95,9 @@ service HostInterface { // Changes the host's [ThreadMode] rpc SetThreadMode(HostSetThreadModeRequest) returns (HostSetThreadModeResponse); + + // Set the maximum concurrent slots limit for the host. + rpc SetConcurrentSlotsLimit(HostSetConcurrentSlotsLimitRequest) returns (HostSetConcurrentSlotsLimitResponse); // Unlocks the host for booking if the proc is in the Locked state. You cannot unlock a NimbyLocked proc. rpc Unlock(HostUnlockRequest) returns (HostUnlockResponse); @@ -273,7 +276,12 @@ message Host { LockState lock_state = 26; ThreadMode thread_mode = 27; float gpus = 28; - float idle_gpus = 29; + float idle_gpus = 29; + + // When a limit is defined, booking will only allocate layers with slots_required > 0 to be + // executed on this host, which means regular booking by cores/memory/gpu becomes disabled.
+ // (0 for no limit, >0 for specific limit) + int32 concurrent_slots_limit = 30; } message HostSearchCriteria { @@ -321,6 +329,7 @@ message NestedHost { NestedProcSeq procs = 28; float gpus = 29; float idle_gpus = 30; + int32 concurrent_slots_limit = 31; } message NestedHostSeq { @@ -636,6 +645,14 @@ message HostSetThreadModeRequest { message HostSetThreadModeResponse {} // Empty +// SetConcurrentSlotsLimit +message HostSetConcurrentSlotsLimitRequest { + Host host = 1; + int32 limit = 2; +} + +message HostSetConcurrentSlotsLimitResponse {} // Empty + // Unlock message HostUnlockRequest { Host host = 1; diff --git a/proto/src/job.proto b/proto/src/job.proto index 4c76308fa..9c5756e78 100644 --- a/proto/src/job.proto +++ b/proto/src/job.proto @@ -392,6 +392,9 @@ service LayerInterface { // Set whether the LLU timeout for frames in the layer rpc SetTimeoutLLU(LayerSetTimeoutLLURequest) returns (LayerSetTimeoutLLUResponse); + // Set the number of slots required per frame for this layer + rpc SetSlotsRequired(LayerSetSlotsRequiredRequest) returns (LayerSetSlotsRequiredResponse); + // Staggers the specified frame range. rpc StaggerFrames(LayerStaggerFramesRequest) returns (LayerStaggerFramesResponse); } @@ -714,6 +717,8 @@ message Layer { float min_gpus = 20; float max_gpus = 21; string command = 22; + // Number of slots required per frame (<= 0 means not slot-based) + int32 slots_required = 23; } message LayerSeq { @@ -1794,6 +1799,14 @@ message LayerSetTimeoutLLURequest { message LayerSetTimeoutLLUResponse {} // Empty +// SetSlotsRequired +// +message LayerSetSlotsRequiredRequest { + Layer layer = 1; + int32 slots = 2; +} + +message LayerSetSlotsRequiredResponse {} // Empty // StaggerFrames message LayerStaggerFramesRequest { diff --git a/proto/src/report.proto b/proto/src/report.proto index 6ace5708e..806ec9973 100644 --- a/proto/src/report.proto +++ b/proto/src/report.proto @@ -23,6 +23,9 @@ service RqdReportInterface { // An incremental status report sent by RQD rpc ReportStatus(RqdReportStatusRequest) returns (RqdReportStatusResponse); + + // Get the host's slot limit + rpc GetHostSlotsLimit(RqdReportGetHostSlotsLimitRequest) returns (RqdReportGetHostSlotsLimitResponse); } @@ -180,3 +183,12 @@ message RqdReportStatusRequest { HostReport host_report = 1; } message RqdReportStatusResponse {} // Empty + +// GetHostSlotsLimit +message RqdReportGetHostSlotsLimitRequest { + string name = 1; +} + +message RqdReportGetHostSlotsLimitResponse { + int64 slots_limit = 1; +} diff --git a/proto/src/rqd.proto b/proto/src/rqd.proto index 621a13212..2151ad9d6 100644 --- a/proto/src/rqd.proto +++ b/proto/src/rqd.proto @@ -126,6 +126,8 @@ message RunFrame { int64 hard_memory_limit = 27; int32 pid = 28; string loki_url = 29; + // Number of slots required per frame (<= 0 means not slot-based) + int32 slots_required = 30; } message RunFrameSeq { diff --git a/pycue/opencue/wrappers/host.py b/pycue/opencue/wrappers/host.py index c6fda1e87..de47308a5 100644 --- a/pycue/opencue/wrappers/host.py +++ b/pycue/opencue/wrappers/host.py @@ -128,6 +128,11 @@ def reboot(self): """Causes the host to kill all running frames and reboot the machine.""" self.stub.Reboot(host_pb2.HostRebootRequest(host=self.data), timeout=Cuebot.Timeout) + def setConcurrentSlotsLimit(self, limit): + """Set the concurrent slots limit for selected hosts.""" + self.stub.SetConcurrentSlotsLimit(host_pb2.HostSetConcurrentSlotsLimitRequest( + host=self.data, limit=limit), timeout=Cuebot.Timeout) + def addTags(self, tags): """Adds tags to a host. 
@@ -633,6 +638,15 @@ def os(self): """ return self.data.os + def concurrentSlotsLimit(self): + """Returns the limit of slots this host can run concurrently. + + :rtype: int + :return: the concurrent slots limit (0 = no limit) + """ + return self.data.concurrent_slots_limit + + class NestedHost(Host): """This class contains information and actions related to a nested host.""" diff --git a/pycue/opencue/wrappers/layer.py b/pycue/opencue/wrappers/layer.py index 8b3dfdf1b..ba6e61bb2 100644 --- a/pycue/opencue/wrappers/layer.py +++ b/pycue/opencue/wrappers/layer.py @@ -185,6 +185,17 @@ def setMinMemory(self, memory): job_pb2.LayerSetMinMemoryRequest(layer=self.data, memory=memory), timeout=Cuebot.Timeout) + def setSlotsRequired(self, slots): + """Sets the number of slots required per frame for this layer. + + :type slots: int + :param slots: Number of slots required (<=0 disables slot-based booking) + """ + return self.stub.SetSlotsRequired( + job_pb2.LayerSetSlotsRequiredRequest(layer=self.data, slots=slots), + timeout=Cuebot.Timeout, + ) + def setThreadable(self, threadable): """Sets the threadable field. @@ -457,6 +468,14 @@ def minMemory(self): """ return self.data.min_memory + def slotsRequired(self): + """Returns the number of slots required per frame. + + :rtype: int + :return: Number of slots required (<0 means not slot-based) + """ + return self.data.slots_required + def limits(self): """Returns the limit names for this layer. diff --git a/pycue/tests/wrappers/test_layer.py b/pycue/tests/wrappers/test_layer.py index 7adb7b11b..97c04d410 100644 --- a/pycue/tests/wrappers/test_layer.py +++ b/pycue/tests/wrappers/test_layer.py @@ -240,6 +240,20 @@ def testSetMinMemory(self, getStubMock): job_pb2.LayerSetMinMemoryRequest(layer=layer.data, memory=memory), timeout=mock.ANY) + def testSetSlotsRequired(self, getStubMock): + stubMock = mock.Mock() + stubMock.SetSlotsRequired.return_value = job_pb2.LayerSetSlotsRequiredResponse() + getStubMock.return_value = stubMock + + slots = 4 + layer = opencue.wrappers.layer.Layer(job_pb2.Layer(name=TEST_LAYER_NAME)) + layer.setSlotsRequired(slots) + + stubMock.SetSlotsRequired.assert_called_with( + job_pb2.LayerSetSlotsRequiredRequest(layer=layer.data, slots=slots), + timeout=mock.ANY, + ) + def testSetThreadable(self, getStubMock): stubMock = mock.Mock() stubMock.SetThreadable.return_value = job_pb2.LayerSetThreadableResponse() diff --git a/pyoutline/outline/backend/cue.py b/pyoutline/outline/backend/cue.py index ce88f3692..f6d6db991 100644 --- a/pyoutline/outline/backend/cue.py +++ b/pyoutline/outline/backend/cue.py @@ -374,6 +374,16 @@ def _serialize(launcher, use_pycuerun): else: _warning_spec_version(spec_version, "timeout_llu") + if layer.get_arg("slots_required"): + if spec_version >= Version("1.16"): + sub_element( + spec_layer, + "slots_required", + "%s" % (layer.get_arg("slots_required")), + ) + else: + _warning_spec_version(spec_version, "slots_required") + if os.environ.get("OL_TAG_OVERRIDE", False): sub_element(spec_layer, "tags", scrub_tags(os.environ["OL_TAG_OVERRIDE"])) diff --git a/pyoutline/outline/layer.py b/pyoutline/outline/layer.py index 856850ba7..d2eb23d31 100644 --- a/pyoutline/outline/layer.py +++ b/pyoutline/outline/layer.py @@ -15,25 +15,22 @@ """Base classes for all outline modules.""" -from __future__ import annotations -from __future__ import absolute_import -from __future__ import print_function -from __future__ import division +from __future__ import absolute_import, annotations, division, print_function +import logging 
import os import sys -import logging import tempfile from typing import ( - TypedDict, - List, - Optional, + Any, Callable, Dict, - Any, - Union, - Tuple, + List, + Optional, Set, + Tuple, + TypedDict, + Union, ) import FileSequence @@ -47,9 +44,9 @@ import outline.util if sys.version_info >= (3, 12): - from typing import override, Unpack + from typing import Unpack, override else: - from typing_extensions import override, Unpack + from typing_extensions import Unpack, override __all__ = [ "Layer", @@ -110,6 +107,8 @@ class _LayerArgs(TypedDict, total=False): # timeout_llu: Timeout for long last update in seconds # before considering a frame hung timeout_llu: int + # slots_required: Number of slots required per frame (<0 means not slot-based) + slots_required: int type: outline.constants.LayerType # The layer type (Render, Util, Post) @@ -1294,11 +1293,15 @@ class LayerPostProcess(Frame): the parent and the post process. """ - def __init__(self, creator: Layer, propigate: bool = True, **args: Unpack[_LayerArgs]) -> None: + def __init__( + self, creator: Layer, propigate: bool = True, **args: Unpack[_LayerArgs] + ) -> None: super().__init__(f"{creator.get_name()}_postprocess", **args) self.__creator = creator - self.depend_on(creator, outline.depend.DependType.LayerOnLayer, propigate=propigate) + self.depend_on( + creator, outline.depend.DependType.LayerOnLayer, propigate=propigate + ) self.set_type(outline.constants.LayerType.UTIL) diff --git a/rust/crates/dummy-cuebot/src/report_servant.rs b/rust/crates/dummy-cuebot/src/report_servant.rs index 1590dcb79..9eb290338 100644 --- a/rust/crates/dummy-cuebot/src/report_servant.rs +++ b/rust/crates/dummy-cuebot/src/report_servant.rs @@ -19,6 +19,9 @@ use opencue_proto::report::{ RqdReportRqdStartupResponse, RqdReportRunningFrameCompletionRequest, RqdReportRunningFrameCompletionResponse, RqdReportStatusRequest, RqdReportStatusResponse, }; +use opencue_proto::report::{ + RqdReportGetHostSlotsLimitRequest, RqdReportGetHostSlotsLimitResponse, +}; use tonic::transport::Server; use tonic::{async_trait, Request, Response, Status}; @@ -64,6 +67,23 @@ impl RqdReportInterface for ReportServant { Ok(Response::new(RqdReportStatusResponse {})) } + + /// Get the host's slot limit + async fn get_host_slots_limit( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> + { + let name = request.into_inner().name; + println!( + "RqdReport: Received a get_host_slots_limit request with: {:?}", + name + ); + + Ok(Response::new(RqdReportGetHostSlotsLimitResponse { + slots_limit: -1, + })) + } } pub struct DummyCuebotServer {} diff --git a/rust/crates/dummy-cuebot/src/rqd_client.rs b/rust/crates/dummy-cuebot/src/rqd_client.rs index ab3ce01b4..bd9acdfd4 100644 --- a/rust/crates/dummy-cuebot/src/rqd_client.rs +++ b/rust/crates/dummy-cuebot/src/rqd_client.rs @@ -82,6 +82,8 @@ impl DummyRqdClient { #[allow(deprecated)] start_time: 0, + + slots_required: 0, }; let mut client = self.client.lock().await; diff --git a/rust/crates/rqd/src/frame/manager.rs b/rust/crates/rqd/src/frame/manager.rs index 811efebd1..d4cca5101 100644 --- a/rust/crates/rqd/src/frame/manager.rs +++ b/rust/crates/rqd/src/frame/manager.rs @@ -80,6 +80,8 @@ impl FrameManager { self.validate_grpc_frame(&run_frame)?; self.validate_machine_state(run_frame.ignore_nimby).await?; + let resource_id = run_frame.resource_id(); + // Create user if required. 
uid and gid ranges have already been verified let uid = match run_frame.uid_optional.as_ref().map(|o| match o { run_frame::UidOptional::Uid(v) => *v as u32, @@ -97,39 +99,13 @@ impl FrameManager { None => CONFIG.runner.default_uid, }; - // **Attention**: If an error happens between here and spawning a frame, the resources - // reserved need to be released. - - let num_cores = (run_frame.num_cores as u32).div_ceil(CONFIG.machine.core_multiplier); - - // Reserving cores will always yield a list of reserved thread_ids. If hyperthreading is off, - // the list should be ignored - let thread_ids = self - .machine - .reserve_cores(Either::Left(num_cores as usize), run_frame.resource_id()) - .await - .map_err(|err| { - FrameManagerError::Aborted(format!( - "Not launching, failed to reserve cpu resources {:?}", - err - )) - })?; // Although num_gpus is not required on a frame, the field is not optional on the proto // layer. =0 means None, !=0 means Some let gpu_list = match run_frame.num_gpus { 0 => None, _ => { + // TODO: Release GPUs in case of error when GPU support gets implemented let reserved_res = self.machine.reserve_gpus(run_frame.num_gpus as u32).await; - if reserved_res.is_err() { - // Release cores reserved on the last step - if let Err(err) = self.machine.release_cores(&run_frame.resource_id()).await { - warn!( - "Failed to release cores reserved for {} during gpu reservation failure. {}", - &run_frame.resource_id(), - err - ) - }; - } Some(reserved_res.map_err(|err| { FrameManagerError::Aborted(format!( "Not launching, insufficient resources {:?}", @@ -145,27 +121,90 @@ impl FrameManager { .environment .get("CUE_THREADABLE") .is_some_and(|v| v == "1"); - // Ignore the list of allocated threads if hyperthreading is off - let thread_ids = hyperthreaded.then_some(thread_ids); - let resource_id = run_frame.resource_id(); - let running_frame = Arc::new(RunningFrame::init( - run_frame, - uid, - CONFIG.runner.clone(), - thread_ids, - gpu_list, - self.machine.get_host_name().await, - )); + let slot_based_booking = self.machine.is_slot_configured().await; + // Keep track of reserved slots, if any + let mut reserved_slots = 0; + + let running_frame = match slot_based_booking { + // Core based booking + false => { + // **Attention**: If an error happens between here and spawning a frame, the resources + // reserved need to be released. + let num_cores = + (run_frame.num_cores as u32).div_ceil(CONFIG.machine.core_multiplier); + + // Reserving cores will always yield a list of reserved thread_ids. If hyperthreading is off, + // the list should be ignored + let thread_ids = self + .machine + .reserve_cores(Either::Left(num_cores as usize), run_frame.resource_id()) + .await + .map_err(|err| { + FrameManagerError::Aborted(format!( + "Not launching, failed to reserve cpu resources {:?}", + err + )) + })?; + // Ignore the list of allocated threads if hyperthreading is off + let thread_ids = hyperthreaded.then_some(thread_ids); + + Arc::new(RunningFrame::init( + run_frame, + uid, + CONFIG.runner.clone(), + thread_ids, + gpu_list, + self.machine.get_host_name(), + )) + } + // Slot based booking + true => { + reserved_slots = if run_frame.slots_required > 0 { + run_frame.slots_required as u32 + } else { + Err(FrameManagerError::InvalidArgument( + "Core based frame cannot be launched on a slot configured host".to_string(), + ))? 
+ }; + self.machine + .reserve_slots(reserved_slots) + .await + .map_err(|err| { + FrameManagerError::Aborted(format!( + "Not launching, failed to reserve {:} slots {:?}", + run_frame.slots_required, err + )) + })?; + + Arc::new(RunningFrame::init( + run_frame, + uid, + CONFIG.runner.clone(), + // Disable taskset to avoid binding this frame to specific threads + None, + gpu_list, + self.machine.get_host_name(), + )) + } + }; if cfg!(feature = "containerized_frames") && CONFIG.runner.run_on_docker { #[cfg(feature = "containerized_frames")] self.spawn_docker_frame(running_frame, false); } else if self.spawn_running_frame(running_frame, false).is_err() { - // Release cores reserved if spawning the frame failed - if let Err(err) = self.machine.release_cores(&resource_id).await { + let release_res = if slot_based_booking { + // Release slots reserved if spawning the frame failed + self.machine.release_slots(reserved_slots).await + } else { + // Release cores reserved if spawning the frame failed + self.machine.release_cores(&resource_id).await + }; + + // Log failure to release + if let Err(err) = release_res { warn!( - "Failed to release cores reserved for {} during spawn failure. {}", + "Failed to release resources reserved for {} during spawn failure. {}", &resource_id, err ); } @@ -207,43 +246,75 @@ impl FrameManager { }) .collect(); let mut errors = Vec::new(); + let slot_based_booking = self.machine.is_slot_configured().await; + for path in snapshot_dir { let running_frame = RunningFrame::from_snapshot(&path, CONFIG.runner.clone()) .await .map(Arc::new); match running_frame { Ok(running_frame) => { - // Update reservations. If a thread_ids list exists, the frame was booked using affinity - if let Err(err) = match &running_frame.thread_ids { - Some(thread_ids) => { - self.machine - .reserve_cores( - Either::Right(thread_ids.clone()), - running_frame.request.resource_id(), - ) - .await + let resource_id = running_frame.request.resource_id(); + let mut reserved_slots = 0; + + // Update reservations based on booking mode + if let Err(err) = match slot_based_booking { + // Core-based booking: If a thread_ids list exists, the frame was booked using affinity + false => { + match &running_frame.thread_ids { + Some(thread_ids) => { + self.machine + .reserve_cores( + Either::Right(thread_ids.clone()), + running_frame.request.resource_id(), + ) + .await + } + None => { + let num_cores = (running_frame.request.num_cores as u32) + .div_ceil(CONFIG.machine.core_multiplier); + self.machine + .reserve_cores( + Either::Left(num_cores as usize), + running_frame.request.resource_id(), + ) + .await + } + } + // Ignore reserved threads as they are no longer necessary + .map(|_| ()) } - None => { - let num_cores = (running_frame.request.num_cores as u32) - .div_ceil(CONFIG.machine.core_multiplier); - self.machine - .reserve_cores( - Either::Left(num_cores as usize), - running_frame.request.resource_id(), - ) - .await + // Slot-based booking + true => { + reserved_slots = if running_frame.request.slots_required > 0 { + running_frame.request.slots_required as u32 + } else { + errors.push(format!( + "Core based frame {} cannot be recovered on a slot configured host", + resource_id + )); + continue; + }; + self.machine.reserve_slots(reserved_slots).await } } { errors.push(err.to_string()); } - let resource_id = running_frame.request.resource_id(); if CONFIG.runner.run_on_docker { todo!("Recovering frames when running on docker is not yet supported") } else if self.spawn_running_frame(running_frame, true).is_err() 
{ - if let Err(err) = self.machine.release_cores(&resource_id).await { + let release_res = if slot_based_booking { + // Release slots reserved if spawning the frame failed + self.machine.release_slots(reserved_slots).await + } else { + self.machine.release_cores(&resource_id).await + }; + + // Failed to release + if let Err(err) = release_res { warn!( - "Failed to release cores reserved for {} during recover spawn error. {}", + "Failed to release resources reserved for {} during recover spawn error. {}", &resource_id, err ); } diff --git a/rust/crates/rqd/src/frame/running_frame.rs b/rust/crates/rqd/src/frame/running_frame.rs index 396c589fd..79518014e 100644 --- a/rust/crates/rqd/src/frame/running_frame.rs +++ b/rust/crates/rqd/src/frame/running_frame.rs @@ -1359,6 +1359,7 @@ mod tests { #[allow(deprecated)] start_time: 0, + slots_required: 0, }, uid, config, diff --git a/rust/crates/rqd/src/report/report_client.rs b/rust/crates/rqd/src/report/report_client.rs index 31995e894..092cb9944 100644 --- a/rust/crates/rqd/src/report/report_client.rs +++ b/rust/crates/rqd/src/report/report_client.rs @@ -183,6 +183,7 @@ pub trait ReportInterface { run_time: u32, ) -> Result<()>; async fn send_host_report(&self, host_report: pb::HostReport) -> Result<()>; + async fn get_host_slots_limit(&self, name: String) -> Result>; } #[async_trait] @@ -242,4 +243,23 @@ impl ReportInterface for ReportClient { .into_diagnostic() .and(Ok(())) } + + async fn get_host_slots_limit(&self, name: String) -> Result> { + let request = pb::RqdReportGetHostSlotsLimitRequest { name }; + let slots_limit = self + .get_client() + .await? + .get_host_slots_limit(request) + .await + .into_diagnostic()? + .into_inner() + .slots_limit; + + // Host with limit <= 0 are running on core based booking mode, so they don't have a limit + if slots_limit > 0 { + Ok(Some(slots_limit as u32)) + } else { + Ok(None) + } + } } diff --git a/rust/crates/rqd/src/system/linux.rs b/rust/crates/rqd/src/system/linux.rs index 2098e2a94..b9aa57299 100644 --- a/rust/crates/rqd/src/system/linux.rs +++ b/rust/crates/rqd/src/system/linux.rs @@ -966,7 +966,7 @@ impl SystemManager for LinuxSystem { #[cfg(test)] mod tests { - use crate::config::{MachineConfig, MemoryMetric}; + use crate::config::MachineConfig; use std::fs; use std::{collections::HashMap, sync::Mutex}; diff --git a/rust/crates/rqd/src/system/machine.rs b/rust/crates/rqd/src/system/machine.rs index 14843c835..ab547b1a0 100644 --- a/rust/crates/rqd/src/system/machine.rs +++ b/rust/crates/rqd/src/system/machine.rs @@ -78,6 +78,11 @@ pub struct MachineMonitor { pub core_manager: Arc>, pub running_frames_cache: Arc, last_host_state: Arc>>, + // Host name is only written once at the beginning of start. After that it is only read. 
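As an aside for readers skimming the diff: the new `get_host_slots_limit` call above reduces the reported limit to an `Option`, treating any non-positive value as "core-based booking, no limit". Below is a minimal, self-contained restatement of that rule; the function name and the `main` driver are illustrative only and not part of the patch.

```rust
// Sketch only: mirrors the slots-limit normalization performed by the report
// client. A value of zero or below means the host still uses core-based
// booking, so no slot limit applies.
fn normalize_slot_limit(slots_limit: i64) -> Option<u32> {
    if slots_limit > 0 {
        // Strictly positive values are assumed to fit in a u32 here.
        Some(slots_limit as u32)
    } else {
        None
    }
}

fn main() {
    assert_eq!(normalize_slot_limit(8), Some(8)); // slot-based host with 8 concurrent slots
    assert_eq!(normalize_slot_limit(0), None);    // core-based host, no limit
    assert_eq!(normalize_slot_limit(-1), None);   // negative values are also treated as "no limit"
}
```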
+ // This makes it safe to have a sync lock to give the object mutability (it can't be + // initialized at init) but avoid unecessary awaits + host_name: std::sync::RwLock>, + slot_state: RwLock>, interrupt: Mutex>>, reboot_when_idle: Mutex, #[cfg(feature = "nimby")] @@ -86,6 +91,11 @@ pub struct MachineMonitor { nimby_state: RwLock, } +struct SlotState { + slot_limit: u32, + slots_consumed: u32, +} + static MACHINE_MONITOR: OnceCell> = OnceCell::const_new(); pub async fn instance() -> Result> { @@ -153,6 +163,7 @@ impl MachineMonitor { system_manager: Mutex::new(system_manager), running_frames_cache: RunningFrameCache::init(), last_host_state: Arc::new(RwLock::new(None)), + host_name: std::sync::RwLock::new(None), interrupt: Mutex::new(None), reboot_when_idle: Mutex::new(false), #[cfg(feature = "nimby")] @@ -160,6 +171,7 @@ impl MachineMonitor { #[cfg(feature = "nimby")] nimby_state: RwLock::new(LockState::Open), core_manager, + slot_state: RwLock::new(None), }) } @@ -177,6 +189,14 @@ impl MachineMonitor { core_manager.get_core_info_report(self.maching_config.core_multiplier) }; + // Write host_name to the object + { + self.host_name + .write() + .unwrap_or_else(|p| p.into_inner()) + .replace(host_state.name.clone()); + } + self.last_host_state .write() .await @@ -216,6 +236,7 @@ impl MachineMonitor { _ = interval.tick() => { self.collect_and_send_host_report().await?; self.check_reboot_flag().await; + self.check_host_state_on_server().await; #[cfg(feature = "nimby")] if let Some(nimby) = &*self.nimby { @@ -337,6 +358,29 @@ impl MachineMonitor { } } + async fn check_host_state_on_server(&self) { + let client = self.report_client.clone(); + + if let Some(slot_limit) = client + .get_host_slots_limit(self.get_host_name()) + .await + .ok() + .flatten() + { + let mut current_state = self.slot_state.write().await; + let slots_consumed = current_state + .as_ref() + .map(|s| s.slots_consumed) + .unwrap_or(0); + + // Replace limit but keep consumed count + current_state.replace(SlotState { + slot_limit, + slots_consumed, + }); + } + } + async fn monitor_running_frames(&self) -> Result<()> { let mut finished_frames: Vec> = Vec::new(); let mut running_frames: Vec<(Arc, RunningState)> = Vec::new(); @@ -499,7 +543,19 @@ impl MachineMonitor { let frame_report = frame.clone_into_running_frame_info(); info!("Sending frame complete report: {}", frame); - if let Err(err) = self.release_cores(&frame.request.resource_id()).await { + // Either release slots or cores, depending on whether it was configured with slots + if frame.request.slots_required > 0 { + if let Err(err) = self + .release_slots(frame.request.slots_required as u32) + .await + { + warn!( + "Failed to release cores reserved by {}: {}", + frame.request.resource_id(), + err + ); + }; + } else if let Err(err) = self.release_cores(&frame.request.resource_id()).await { warn!( "Failed to release cores reserved by {}: {}", frame.request.resource_id(), @@ -567,6 +623,7 @@ pub trait Machine { async fn hardware_state(&self) -> Option; async fn memory_usage(&self) -> Option<(u32, u64)>; async fn nimby_locked(&self) -> bool; + async fn is_slot_configured(&self) -> bool; /// Reserve CPU cores for a resource /// @@ -588,6 +645,40 @@ pub trait Machine { resource_id: Uuid, ) -> Result, ReservationError>; + /// Reserve slot units for a resource + /// + /// # Arguments + /// + /// * `requested_slots` - Number of slots to reserve + /// + /// # Returns + /// + /// Returns `Ok(())` if the slots were successfully reserved + /// + /// # Errors + /// + /// Returns 
`ReservationError` if: + /// * There are not enough available slots (`NotEnoughResourcesAvailable`) + /// * Slot reservation is not configured on this machine (`InvalidSlotReservationRequest`) + async fn reserve_slots(&self, requested_slots: u32) -> Result<(), ReservationError>; + + /// Release slot units previously reserved by a resource + /// + /// # Arguments + /// + /// * `requested_slots` - Number of slots to release + /// + /// # Returns + /// + /// Returns `Ok(())` if the slots were successfully released + /// + /// # Errors + /// + /// Returns `ReservationError` if: + /// * Attempting to release more slots than are currently consumed (`NotEnoughResourcesAvailable`) + /// * Slot reservation is not configured on this machine (`InvalidSlotReservationRequest`) + async fn release_slots(&self, requested_slots: u32) -> Result<(), ReservationError>; + /// Release CPU cores previously reserved by a resource /// /// # Arguments @@ -627,7 +718,17 @@ pub trait Machine { /// The user ID (uid) of the created or existing user async fn create_user_if_unexisting(&self, username: &str, uid: u32, gid: u32) -> Result; - async fn get_host_name(&self) -> String; + /// Returns the hostname of this machine + /// + /// The hostname is determined during the initial startup report and remains + /// constant throughout the machine's lifecycle. If the hostname hasn't been + /// initialized yet (which shouldn't happen in normal operation), returns + /// "noname" as a fallback. + /// + /// # Returns + /// + /// The machine's hostname as a String + fn get_host_name(&self) -> String; /// Send a signal to kill a process /// @@ -693,6 +794,10 @@ impl Machine for MachineMonitor { .unwrap_or(false) } + async fn is_slot_configured(&self) -> bool { + self.slot_state.read().await.as_ref().is_some() + } + async fn reserve_cores( &self, request: Either>, @@ -706,6 +811,38 @@ impl Machine for MachineMonitor { } } + async fn reserve_slots(&self, requested_slots: u32) -> Result<(), ReservationError> { + let mut slot_state = self.slot_state.write().await; + + match slot_state.as_mut() { + Some(slot_state) => { + if slot_state.slots_consumed + requested_slots < slot_state.slot_limit { + slot_state.slots_consumed += requested_slots; + Ok(()) + } else { + Err(ReservationError::NotEnoughResourcesAvailable) + } + } + None => Err(ReservationError::InvalidSlotReservationRequest), + } + } + + async fn release_slots(&self, released_slots: u32) -> Result<(), ReservationError> { + let mut slot_state = self.slot_state.write().await; + + match slot_state.as_mut() { + Some(slot_state) => { + if released_slots <= slot_state.slots_consumed { + slot_state.slots_consumed -= released_slots; + Ok(()) + } else { + Err(ReservationError::NotEnoughResourcesAvailable) + } + } + None => Err(ReservationError::InvalidSlotReservationRequest), + } + } + async fn release_cores(&self, resource_id: &Uuid) -> Result<(), ReservationError> { let mut core_manager = self.core_manager.write().await; core_manager.release_cores(resource_id).map(|_| ()) @@ -720,11 +857,11 @@ impl Machine for MachineMonitor { system.create_user_if_unexisting(username, uid, gid) } - async fn get_host_name(&self) -> String { - let lock = self.last_host_state.read().await; - - lock.as_ref() - .map(|h| h.name.clone()) + fn get_host_name(&self) -> String { + self.host_name + .read() + .unwrap_or_else(|poisoned| poisoned.into_inner()) + .clone() .unwrap_or("noname".to_string()) } diff --git a/rust/crates/rqd/src/system/manager.rs b/rust/crates/rqd/src/system/manager.rs index 
4dda6f06f..b9516fb2e 100644 --- a/rust/crates/rqd/src/system/manager.rs +++ b/rust/crates/rqd/src/system/manager.rs @@ -70,6 +70,9 @@ pub enum ReservationError { #[error("Could not find core owner of this thread id")] CoreNotFoundForThread(Vec), + + #[error("Slot reservation requested when the host is configured to core based booking")] + InvalidSlotReservationRequest, } /// Represents attributes on a machine that should never change without restarting the diff --git a/rust/crates/rqd/src/system/oom.rs b/rust/crates/rqd/src/system/oom.rs index 2ca82f69e..d5a91a820 100644 --- a/rust/crates/rqd/src/system/oom.rs +++ b/rust/crates/rqd/src/system/oom.rs @@ -239,6 +239,7 @@ mod tests { log_dir_file: "".to_string(), #[allow(deprecated)] start_time: 0, + slots_required: 0, }, 1000, config, diff --git a/rust/crates/scheduler/src/dao/host_dao.rs b/rust/crates/scheduler/src/dao/host_dao.rs index 4d6d37fef..6663cd307 100644 --- a/rust/crates/scheduler/src/dao/host_dao.rs +++ b/rust/crates/scheduler/src/dao/host_dao.rs @@ -71,6 +71,8 @@ pub struct HostModel { // Number of cores available at the subscription of the show this host has been queried on int_alloc_available_cores: i64, ts_ping: DateTime, + int_concurrent_slots_limit: i64, + int_running_procs: i64, } impl From for Host { @@ -105,6 +107,9 @@ impl From for Host { alloc_id: parse_uuid(&val.pk_alloc), alloc_name: val.str_alloc_name, last_updated: val.ts_ping, + concurrent_slots_limit: (val.int_concurrent_slots_limit > 0) + .then_some(val.int_concurrent_slots_limit as u32), + running_procs_count: val.int_running_procs as u32, } } } @@ -130,7 +135,9 @@ SELECT DISTINCT s.int_burst - s.int_cores as int_alloc_available_cores, a.pk_alloc, a.str_name as str_alloc_name, - hs.ts_ping + hs.ts_ping, + h.int_concurrent_slots_limit, + hs.int_running_procs FROM host h INNER JOIN host_stat hs ON h.pk_host = hs.pk_host INNER JOIN alloc a ON h.pk_alloc = a.pk_alloc @@ -152,12 +159,14 @@ WHERE pk_host = $5 RETURNING int_cores_idle, int_mem_idle, int_gpus_idle, int_gpu_mem_idle, NOW() "#; -// This update is meant for testing environments where rqd is not constantly reporting -// host reports to Cuebot to get host_stats properly updated. +// ATTENTION: This update is meant for testing environments where rqd is not constantly reporting +// host reports to Cuebot to get host_stats properly updated. 
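For reference, the slot bookkeeping that `MachineMonitor` gains in this patch reduces to a counter guarded by an optional state: `None` means the host is still core-based, so any slot call is rejected with `InvalidSlotReservationRequest`. The sketch below is illustrative only: the `SlotBook`/`SlotError` names are stand-ins and it uses a plain `std::sync::RwLock` instead of the async lock. Note that the sketch uses an inclusive bound (`<=`) so a host can be filled to exactly its limit, whereas `reserve_slots` in the patch uses `<`, which leaves the last slot unused and may be worth a second look.

```rust
use std::sync::RwLock;

// Hypothetical error type standing in for ReservationError in this sketch.
#[derive(Debug, PartialEq)]
enum SlotError {
    NotEnoughResourcesAvailable,
    InvalidSlotReservationRequest,
}

struct SlotState {
    slot_limit: u32,
    slots_consumed: u32,
}

// None => host is core-based, slot calls are rejected.
// Some => host is slot-based, reserve/release adjust the consumed counter.
struct SlotBook {
    state: RwLock<Option<SlotState>>,
}

impl SlotBook {
    fn reserve(&self, requested: u32) -> Result<(), SlotError> {
        let mut guard = self.state.write().unwrap();
        match guard.as_mut() {
            // Inclusive check: the host may be filled up to its exact limit.
            Some(s) if s.slots_consumed + requested <= s.slot_limit => {
                s.slots_consumed += requested;
                Ok(())
            }
            Some(_) => Err(SlotError::NotEnoughResourcesAvailable),
            None => Err(SlotError::InvalidSlotReservationRequest),
        }
    }

    fn release(&self, released: u32) -> Result<(), SlotError> {
        let mut guard = self.state.write().unwrap();
        match guard.as_mut() {
            // Never release more than is currently consumed.
            Some(s) if released <= s.slots_consumed => {
                s.slots_consumed -= released;
                Ok(())
            }
            Some(_) => Err(SlotError::NotEnoughResourcesAvailable),
            None => Err(SlotError::InvalidSlotReservationRequest),
        }
    }
}

fn main() {
    let book = SlotBook {
        state: RwLock::new(Some(SlotState { slot_limit: 2, slots_consumed: 0 })),
    };
    assert_eq!(book.reserve(2), Ok(()));
    assert_eq!(book.reserve(1), Err(SlotError::NotEnoughResourcesAvailable));
    assert_eq!(book.release(2), Ok(()));

    let core_based = SlotBook { state: RwLock::new(None) };
    assert_eq!(core_based.reserve(1), Err(SlotError::InvalidSlotReservationRequest));
}
```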
This is turned of by default and +// can be turned on by `host_cache.update_stat_on_book=true` static UPDATE_HOST_STAT: &str = r#" UPDATE host_stat SET int_mem_free = int_mem_free - $1, int_gpu_mem_free = int_gpu_mem_free - $2 + int_running_procs = int_running_procs + 1 WHERE pk_host = $3 "#; diff --git a/rust/crates/scheduler/src/dao/layer_dao.rs b/rust/crates/scheduler/src/dao/layer_dao.rs index 6d26fc8d1..ce1c1d6ba 100644 --- a/rust/crates/scheduler/src/dao/layer_dao.rs +++ b/rust/crates/scheduler/src/dao/layer_dao.rs @@ -56,6 +56,7 @@ pub struct DispatchLayerModel { pub b_threadable: bool, pub int_gpus_min: i64, pub int_gpu_mem_min: i64, + pub int_slots_required: i64, pub str_tags: String, } @@ -79,6 +80,7 @@ pub struct LayerWithFramesModel { pub b_threadable: bool, pub int_gpus_min: i64, pub int_gpu_mem_min: i64, + pub int_slots_required: i64, pub str_tags: String, // Frame fields (Optional - NULL when no frames match) @@ -132,13 +134,19 @@ impl DispatchLayer { ), mem_min: ByteSize::kb(layer.int_mem_min as u64), threadable: layer.b_threadable, - gpus_min: layer - .int_gpus_min - .try_into() - .expect("gpus_min should fit on a i32"), + gpus_min: CoreSize( + layer + .int_gpus_min + .try_into() + .expect("gpus_min should fit on a i32"), + ), gpu_mem_min: ByteSize::kb(layer.int_gpu_mem_min as u64), tags: layer.str_tags.split(" | ").map(|t| t.to_string()).collect(), frames: frames.into_iter().map(|f| f.into()).collect(), + slots_required: layer + .int_slots_required + .try_into() + .expect("int_slots_required should fit on a i32"), } } } @@ -218,6 +226,7 @@ SELECT DISTINCT l.b_threadable, l.int_gpus_min, l.int_gpu_mem_min, + l.int_slots_required, l.str_tags, l.int_dispatch_order, @@ -339,6 +348,7 @@ impl LayerDao { int_gpus_min: model.int_gpus_min, int_gpu_mem_min: model.int_gpu_mem_min, str_tags: model.str_tags.clone(), + int_slots_required: model.int_slots_required, }; // Extract frame data (if present) diff --git a/rust/crates/scheduler/src/host_cache/actor.rs b/rust/crates/scheduler/src/host_cache/actor.rs index 9feb8f288..434d62298 100644 --- a/rust/crates/scheduler/src/host_cache/actor.rs +++ b/rust/crates/scheduler/src/host_cache/actor.rs @@ -12,7 +12,6 @@ use actix::{Actor, ActorFutureExt, AsyncContext, Handler, ResponseActFuture, WrapFuture}; -use bytesize::ByteSize; use itertools::Itertools; use miette::IntoDiagnostic; use scc::{hash_map::OccupiedEntry, HashMap, HashSet}; @@ -34,9 +33,8 @@ use crate::{ cluster_key::{ClusterKey, Tag, TagType}, config::CONFIG, dao::HostDao, - host_cache::*, - host_cache::{messages::*, store}, - models::{CoreSize, Host}, + host_cache::{messages::*, store, *}, + models::{Host, ResourceRequest}, }; #[derive(Clone)] @@ -106,8 +104,7 @@ where facility_id, show_id, tags, - cores, - memory, + resource_request, validation, } = msg; @@ -116,7 +113,7 @@ where Box::pin( async move { let out = service - .check_out(facility_id, show_id, tags, cores, memory, validation) + .check_out(facility_id, show_id, tags, resource_request, validation) .await; if let Ok(host) = &out { debug!("Checked out {}", host.1); @@ -209,8 +206,7 @@ impl HostCacheService { facility_id: Uuid, show_id: Uuid, tags: Vec, - cores: CoreSize, - memory: ByteSize, + resource_request: ResourceRequest, validation: F, ) -> Result where @@ -236,9 +232,9 @@ impl HostCacheService { // fight for the same rows. 
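A short, hedged illustration of how the new integer columns fetched above land in the typed models; the row and field names below are simplified stand-ins for `HostModel` and `DispatchLayerModel`. A non-positive `int_concurrent_slots_limit` becomes `None`, and `int_slots_required` is converted with a hard failure rather than being silently clamped.

```rust
// Illustrative only: simplified stand-ins for the DAO row structs.
struct HostRow {
    int_concurrent_slots_limit: i64,
    int_running_procs: i64,
}

struct LayerRow {
    int_slots_required: i64,
}

struct HostFields {
    concurrent_slots_limit: Option<u32>,
    running_procs_count: u32,
}

fn host_fields(row: &HostRow) -> HostFields {
    HostFields {
        // Zero or negative means "no limit configured" -> None.
        concurrent_slots_limit: (row.int_concurrent_slots_limit > 0)
            .then_some(row.int_concurrent_slots_limit as u32),
        running_procs_count: row.int_running_procs as u32,
    }
}

fn layer_slots_required(row: &LayerRow) -> u32 {
    // Mirrors the try_into().expect(...) style used in the DAO: a negative or
    // oversized value is treated as a data error rather than silently clamped.
    row.int_slots_required
        .try_into()
        .expect("int_slots_required should fit in a u32")
}

fn main() {
    let host = host_fields(&HostRow { int_concurrent_slots_limit: 0, int_running_procs: 3 });
    assert_eq!(host.concurrent_slots_limit, None);
    assert_eq!(host.running_procs_count, 3);
    assert_eq!(layer_slots_required(&LayerRow { int_slots_required: 2 }), 2);
}
```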
.read_async(&cache_key, |_, cached_group| { if !cached_group.expired() { + // Checkout host from a group cached_group - // Checkout host from a group - .check_out(cores, memory, validation) + .check_out(resource_request, validation) .map(|host| (cache_key.clone(), host.clone())) .ok() } else { @@ -261,7 +257,7 @@ impl HostCacheService { .map_err(|err| HostCacheError::FailedToQueryHostCache(err.to_string()))?; let checked_out_host = group // Checkout host from a group - .check_out(cores, memory, validation) + .check_out(resource_request, validation) .map(|host| CheckedOutHost(cache_key.clone(), host.clone())); if let Ok(checked_out_host) = checked_out_host { diff --git a/rust/crates/scheduler/src/host_cache/cache.rs b/rust/crates/scheduler/src/host_cache/cache.rs index d41252d34..2942c704d 100644 --- a/rust/crates/scheduler/src/host_cache/cache.rs +++ b/rust/crates/scheduler/src/host_cache/cache.rs @@ -46,7 +46,7 @@ use uuid::Uuid; use crate::{ config::{HostBookingStrategy, CONFIG}, host_cache::{store::HOST_STORE, HostCacheError, HostId}, - models::{CoreSize, Host}, + models::{CoreSize, Host, ResourceRequest}, }; type CoreKey = u32; @@ -144,8 +144,7 @@ impl HostCache { /// /// # Arguments /// - /// * `cores` - Minimum number of cores required - /// * `memory` - Minimum memory required + /// * `resource_request` - The resource requirements (cores and memory, GPU, or unit) /// * `validation` - Function to validate additional host requirements /// /// # Returns @@ -154,8 +153,7 @@ impl HostCache { /// * `Err(HostCacheError)` - No suitable host available pub fn check_out( &self, - cores: CoreSize, - memory: ByteSize, + resource_request: ResourceRequest, validation: F, ) -> Result where @@ -163,9 +161,20 @@ impl HostCache { { self.ping_query(); - let host = self - .remove_host(cores, memory, validation) - .ok_or(HostCacheError::NoCandidateAvailable)?; + let host = match resource_request { + ResourceRequest::CoresAndMemory { cores, memory } => self + .remove_host(cores, memory, 1, validation) + .ok_or(HostCacheError::NoCandidateAvailable)?, + ResourceRequest::Gpu { cores, memory } => { + todo!("GPU host search is not yet implemented. 
Request: {cores}, {memory}") + } + ResourceRequest::Slots(slots) => self + // Request a host with minimum requirements as the remove logic already accounts for + // limiting slots + // TODO: Replace and consider hardcoded values + .remove_host(CoreSize(1), ByteSize::mib(256), slots, validation) + .ok_or(HostCacheError::NoCandidateAvailable)?, + }; Ok(host) } @@ -186,7 +195,13 @@ impl HostCache { /// /// * `Some(Host)` - Host that meets all requirements /// * `None` - No suitable host found - fn remove_host(&self, cores: CoreSize, memory: ByteSize, validation: F) -> Option + fn remove_host( + &self, + cores: CoreSize, + memory: ByteSize, + slots: u32, + validation: F, + ) -> Option where F: Fn(&Host) -> bool, { @@ -200,6 +215,8 @@ impl HostCache { // Check memory and core requirements just in case host.idle_memory >= memory && host.idle_cores >= cores && + host.running_procs_count + slots + <= host.concurrent_slots_limit.unwrap_or(u32::MAX) && // Ensure we're not retrying the same host as last attempts !failed_candidates.borrow().contains(&host.id) }; @@ -366,6 +383,8 @@ mod tests { alloc_id: Uuid::new_v4(), alloc_name: "test".to_string(), last_updated: Utc::now(), + concurrent_slots_limit: None, + running_procs_count: 0, } } @@ -460,8 +479,10 @@ mod tests { cache.check_in(host, false); let result = cache.check_out( - CoreSize(2), - ByteSize::gb(4), + ResourceRequest::CoresAndMemory { + cores: CoreSize(2), + memory: ByteSize::gb(4), + }, |_| true, // Always validate true ); @@ -485,7 +506,13 @@ mod tests { fn test_checkout_no_candidate_available() { let cache = HostCache::default(); - let result = cache.check_out(CoreSize(4), ByteSize::gb(8), |_| true); + let result = cache.check_out( + ResourceRequest::CoresAndMemory { + cores: CoreSize(4), + memory: ByteSize::gb(8), + }, + |_| true, + ); assert!(result.is_err()); assert!(matches!(result, Err(HostCacheError::NoCandidateAvailable))); @@ -500,8 +527,10 @@ mod tests { cache.check_in(host, false); let result = cache.check_out( - CoreSize(4), // Request more cores than available - ByteSize::gb(4), + ResourceRequest::CoresAndMemory { + cores: CoreSize(4), // Request more cores than available + memory: ByteSize::gb(4), + }, |_| true, ); @@ -517,8 +546,10 @@ mod tests { cache.check_in(host, false); let result = cache.check_out( - CoreSize(2), - ByteSize::gb(8), // Request more memory than available + ResourceRequest::CoresAndMemory { + cores: CoreSize(2), + memory: ByteSize::gb(8), // Request more memory than available + }, |_| true, ); @@ -534,8 +565,10 @@ mod tests { cache.check_in(host, false); let result = cache.check_out( - CoreSize(2), - ByteSize::gb(4), + ResourceRequest::CoresAndMemory { + cores: CoreSize(2), + memory: ByteSize::gb(4), + }, |_| false, // Always fail validation ); @@ -551,11 +584,23 @@ mod tests { cache.check_in(host, false); // First checkout should succeed - let result1 = cache.check_out(CoreSize(2), ByteSize::gb(4), |_| true); + let result1 = cache.check_out( + ResourceRequest::CoresAndMemory { + cores: CoreSize(2), + memory: ByteSize::gb(4), + }, + |_| true, + ); assert!(result1.is_ok()); // Second checkout should fail because host is already checked out - let result2 = cache.check_out(CoreSize(2), ByteSize::gb(4), |_| true); + let result2 = cache.check_out( + ResourceRequest::CoresAndMemory { + cores: CoreSize(2), + memory: ByteSize::gb(4), + }, + |_| true, + ); assert!(result2.is_err()); } @@ -568,7 +613,13 @@ mod tests { cache.check_in(host.clone(), false); // Checkout the host - let mut checked_host = 
assert_ok!(cache.check_out(CoreSize(2), ByteSize::gb(4), |_| true)); + let mut checked_host = assert_ok!(cache.check_out( + ResourceRequest::CoresAndMemory { + cores: CoreSize(2), + memory: ByteSize::gb(4), + }, + |_| true + )); assert_eq!(checked_host.idle_cores.value(), 4); // Reduce the number of cores and checkin to ensure cache is updated @@ -576,8 +627,20 @@ mod tests { // Check it back in cache.check_in(checked_host, false); - assert_err!(cache.check_out(CoreSize(2), ByteSize::gb(4), |_| true)); - assert_ok!(cache.check_out(CoreSize(1), ByteSize::gb(4), |_| true)); + assert_err!(cache.check_out( + ResourceRequest::CoresAndMemory { + cores: CoreSize(2), + memory: ByteSize::gb(4), + }, + |_| true + )); + assert_ok!(cache.check_out( + ResourceRequest::CoresAndMemory { + cores: CoreSize(1), + memory: ByteSize::gb(4), + }, + |_| true + )); } #[test] @@ -599,7 +662,13 @@ mod tests { cache.check_in(host3, false); // Request 3 cores, 6GB - should get host2 (4 cores, 8GB) or host3 (8 cores, 16GB) - let result = cache.check_out(CoreSize(3), ByteSize::gb(6), |_| true); + let result = cache.check_out( + ResourceRequest::CoresAndMemory { + cores: CoreSize(3), + memory: ByteSize::gb(6), + }, + |_| true, + ); assert!(result.is_ok()); let chosen_host = result.unwrap(); @@ -643,11 +712,23 @@ mod tests { cache.check_in(host2, false); // First checkout should succeed - let result1 = cache.check_out(CoreSize(2), ByteSize::gb(4), |_| true); + let result1 = cache.check_out( + ResourceRequest::CoresAndMemory { + cores: CoreSize(2), + memory: ByteSize::gb(4), + }, + |_| true, + ); assert!(result1.is_ok()); // Second checkout should also succeed (different host) - let result2 = cache.check_out(CoreSize(2), ByteSize::gb(4), |_| true); + let result2 = cache.check_out( + ResourceRequest::CoresAndMemory { + cores: CoreSize(2), + memory: ByteSize::gb(4), + }, + |_| true, + ); assert!(result2.is_ok()); // The hosts should be different diff --git a/rust/crates/scheduler/src/host_cache/messages.rs b/rust/crates/scheduler/src/host_cache/messages.rs index bb25ced14..94ba38af1 100644 --- a/rust/crates/scheduler/src/host_cache/messages.rs +++ b/rust/crates/scheduler/src/host_cache/messages.rs @@ -12,14 +12,13 @@ use actix::{Message, MessageResponse}; -use bytesize::ByteSize; use miette::Result; use uuid::Uuid; use crate::{ cluster_key::{ClusterKey, Tag}, host_cache::HostCacheError, - models::{CoreSize, Host}, + models::{Host, ResourceRequest}, }; /// Response containing a checked-out host and its associated cluster key. 
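To make the new checkout predicate easier to follow in isolation, here is a self-contained sketch of the eligibility test that `remove_host` applies. Names are simplified; the real check also skips hosts that already failed for this request, and slot requests currently use the hardcoded minimums of one core and 256 MiB flagged by the TODO above.

```rust
// Sketch only: simplified view of a cached host for the eligibility check.
struct HostView {
    idle_cores: u32,
    idle_memory_mib: u64,
    running_procs_count: u32,
    concurrent_slots_limit: Option<u32>,
}

fn fits(host: &HostView, cores: u32, memory_mib: u64, slots: u32) -> bool {
    host.idle_cores >= cores
        && host.idle_memory_mib >= memory_mib
        // A host with no configured limit behaves as if the limit were u32::MAX,
        // so core-based hosts are never filtered out by the slot check.
        && host.running_procs_count + slots <= host.concurrent_slots_limit.unwrap_or(u32::MAX)
}

fn main() {
    let host = HostView {
        idle_cores: 8,
        idle_memory_mib: 16 * 1024,
        running_procs_count: 3,
        concurrent_slots_limit: Some(4),
    };
    // One more slot still fits (3 + 1 <= 4)...
    assert!(fits(&host, 1, 256, 1));
    // ...but two would exceed the limit.
    assert!(!fits(&host, 1, 256, 2));
}
```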
@@ -66,8 +65,7 @@ where pub facility_id: Uuid, pub show_id: Uuid, pub tags: Vec, - pub cores: CoreSize, - pub memory: ByteSize, + pub resource_request: ResourceRequest, pub validation: F, } diff --git a/rust/crates/scheduler/src/host_cache/store.rs b/rust/crates/scheduler/src/host_cache/store.rs index 5509e23e1..4479a10b8 100644 --- a/rust/crates/scheduler/src/host_cache/store.rs +++ b/rust/crates/scheduler/src/host_cache/store.rs @@ -400,6 +400,8 @@ mod tests { alloc_id: Uuid::new_v4(), alloc_name: "test".to_string(), last_updated, + concurrent_slots_limit: None, + running_procs_count: 0, } } diff --git a/rust/crates/scheduler/src/models/host.rs b/rust/crates/scheduler/src/models/host.rs index 38af88726..c98c82ecb 100644 --- a/rust/crates/scheduler/src/models/host.rs +++ b/rust/crates/scheduler/src/models/host.rs @@ -36,6 +36,8 @@ pub struct Host { pub(crate) alloc_id: Uuid, pub(crate) alloc_name: String, pub(crate) last_updated: DateTime<Utc>, + pub(crate) concurrent_slots_limit: Option<u32>, + pub(crate) running_procs_count: u32, } impl Host { @@ -75,6 +77,7 @@ impl Host { alloc_available_cores: CoreSize, alloc_id: Uuid, alloc_name: String, + concurrent_frames_limit: Option<u32>, ) -> Self { Self { id, @@ -91,6 +94,8 @@ impl Host { alloc_id, alloc_name, last_updated: Local::now().with_timezone(&Utc), + concurrent_slots_limit: concurrent_frames_limit, + running_procs_count: 0, } } } diff --git a/rust/crates/scheduler/src/models/layer.rs b/rust/crates/scheduler/src/models/layer.rs index 10f795fa8..2cd27dadf 100644 --- a/rust/crates/scheduler/src/models/layer.rs +++ b/rust/crates/scheduler/src/models/layer.rs @@ -31,8 +31,9 @@ pub struct DispatchLayer { pub cores_min: CoreSize, pub mem_min: ByteSize, pub threadable: bool, - pub gpus_min: i32, + pub gpus_min: CoreSize, pub gpu_mem_min: ByteSize, + pub slots_required: u32, pub tags: HashSet<String>, pub frames: Vec, } @@ -49,6 +50,17 @@ impl fmt::Display for DispatchLayer { } } +/// Describes what resources are required to run a frame from this layer +#[derive(Clone, Copy)] +pub enum ResourceRequest { + /// Request a machine with at least this amount of cores and memory idle + CoresAndMemory { cores: CoreSize, memory: ByteSize }, + /// Request a machine with this amount of gpu cores idle + Gpu { cores: CoreSize, memory: ByteSize }, + /// Request a machine with this amount of frame slots available + Slots(u32), +} + impl DispatchLayer { /// Removes frames with matching IDs from this layer's frame list.
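Since a layer may specify slots, GPUs, and CPU cores at the same time, consumers branch on exactly one `ResourceRequest` variant; the `resource_request()` accessor shown just below picks slots first, then GPU, then cores and memory. Here is a compilable sketch of the consumer side, with simplified stand-in types for `CoreSize`/`ByteSize` (GPU booking itself is still a `todo!` in the host cache at this point in the patch).

```rust
// Simplified stand-ins so the sketch compiles on its own.
#[derive(Clone, Copy)]
struct CoreSize(u32);
#[derive(Clone, Copy)]
struct Mib(u64);

#[derive(Clone, Copy)]
enum ResourceRequest {
    CoresAndMemory { cores: CoreSize, memory: Mib },
    Gpu { cores: CoreSize, memory: Mib },
    Slots(u32),
}

// How a consumer (e.g. the host cache) is expected to branch on the request.
fn describe(req: ResourceRequest) -> String {
    match req {
        ResourceRequest::Slots(n) => format!("slot-based booking: {n} slot(s)"),
        ResourceRequest::Gpu { cores, memory } => {
            format!("gpu booking: {} gpu core(s), {} MiB gpu memory", cores.0, memory.0)
        }
        ResourceRequest::CoresAndMemory { cores, memory } => {
            format!("core booking: {} core(s), {} MiB", cores.0, memory.0)
        }
    }
}

fn main() {
    // Precedence mirrors DispatchLayer::resource_request():
    // slots_required > 0 wins over gpus_min > 0, which wins over cores/memory.
    println!("{}", describe(ResourceRequest::Slots(1)));
    println!("{}", describe(ResourceRequest::Gpu { cores: CoreSize(1), memory: Mib(8192) }));
    println!(
        "{}",
        describe(ResourceRequest::CoresAndMemory { cores: CoreSize(2), memory: Mib(4096) })
    );
}
```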
/// @@ -61,4 +73,20 @@ impl DispatchLayer { pub fn drain_frames(&mut self, frame_ids: Vec) { self.frames.retain(|f| !frame_ids.contains(&f.id)) } + + pub fn resource_request(&self) -> ResourceRequest { + if self.slots_required > 0 { + ResourceRequest::Slots(self.slots_required) + } else if self.gpus_min.value() > 0 { + ResourceRequest::Gpu { + cores: self.gpus_min, + memory: self.gpu_mem_min, + } + } else { + ResourceRequest::CoresAndMemory { + cores: self.cores_min, + memory: self.mem_min, + } + } + } } diff --git a/rust/crates/scheduler/src/models/mod.rs b/rust/crates/scheduler/src/models/mod.rs index 9f61fd86d..50767d267 100644 --- a/rust/crates/scheduler/src/models/mod.rs +++ b/rust/crates/scheduler/src/models/mod.rs @@ -22,7 +22,7 @@ pub use core_size::{CoreSize, CoreSizeWithMultiplier}; pub use frame::DispatchFrame; pub use host::Host; pub use job::DispatchJob; -pub use layer::DispatchLayer; +pub use layer::{DispatchLayer, ResourceRequest}; pub use subscription::{Allocation, Subscription}; pub use virtual_proc::VirtualProc; diff --git a/rust/crates/scheduler/src/pipeline/dispatcher/actor.rs b/rust/crates/scheduler/src/pipeline/dispatcher/actor.rs index ecec49436..46343e278 100644 --- a/rust/crates/scheduler/src/pipeline/dispatcher/actor.rs +++ b/rust/crates/scheduler/src/pipeline/dispatcher/actor.rs @@ -967,6 +967,7 @@ impl RqdDispatcherService { log_file: "deprecated".to_string(), #[allow(deprecated)] log_dir_file: "deprecated".to_string(), + slots_required: 0, }; Ok(run_frame) @@ -1040,6 +1041,7 @@ mod tests { CoreSize(4), Uuid::new_v4(), "test-alloc".to_string(), + None, ) } @@ -1172,6 +1174,7 @@ mod tests { CoreSize(4), Uuid::new_v4(), "test-alloc".to_string(), + None, ); let mut frame = create_test_dispatch_frame(); @@ -1208,6 +1211,7 @@ mod tests { CoreSize(4), Uuid::new_v4(), "test-alloc".to_string(), + None, ); let mut frame = create_test_dispatch_frame(); @@ -1244,6 +1248,7 @@ mod tests { CoreSize(8), Uuid::new_v4(), "test-alloc".to_string(), + None, ); let mut frame = create_test_dispatch_frame(); @@ -1279,6 +1284,7 @@ mod tests { CoreSize(8), Uuid::new_v4(), "test-alloc".to_string(), + None, ); let mut frame = create_test_dispatch_frame(); diff --git a/rust/crates/scheduler/src/pipeline/matcher.rs b/rust/crates/scheduler/src/pipeline/matcher.rs index b2a379afb..dd2eaa92a 100644 --- a/rust/crates/scheduler/src/pipeline/matcher.rs +++ b/rust/crates/scheduler/src/pipeline/matcher.rs @@ -291,27 +291,23 @@ impl MatchingService { layer.show_id ); - // Clone only the minimal data needed for the validation closure - // These are needed because the closure must have 'static lifetime for actor messaging - let layer_id = layer.id; - let show_id = layer.show_id; let cores_requested = layer.cores_min; let allocation_service = self.allocation_service.clone(); let os = layer.str_os.clone(); + // Get a matching candidate let host_candidate = self .host_service .send(CheckOut { facility_id: layer.facility_id, show_id: layer.show_id, tags, - cores: cores_requested, - memory: layer.mem_min, + resource_request: layer.resource_request(), validation: move |host| { Self::validate_match( host, - &layer_id, - &show_id, + &layer.id, + &layer.show_id, cores_requested, &allocation_service, os.as_deref(),