From 71fa446c91d86166140c2582defe0a3d8fd02f73 Mon Sep 17 00:00:00 2001
From: songyuqin0686 <songyuqin0686@users.noreply.github.com>
Date: Fri, 3 Apr 2026 16:32:47 +0800
Subject: [PATCH] Implement XSCHED_AUTO_XQUEUE mechanism with corrected
 preemption level logic and automatic XQueue creation for CUDA streams

---
NOTE(review): this patch was reconstructed from a whitespace-mangled copy in
which line breaks were lost and every <...> token had been stripped by the
extraction tool. The following spans are best-effort guesses and MUST be
verified against the original commit before applying:
  - the author email address above;
  - the three system #include names in the first common.cuh hunk;
  - <mutex>, <ggml_cuda_graph>, <std::mutex>, <std::thread> template/header
    arguments restored below;
  - blank-line placement inside hunks (hunk line counts are approximate).

 ggml/src/ggml-cuda/common.cuh   | 115 ++++++++++++++++++++++++++++++--
 ggml/src/ggml-cuda/ggml-cuda.cu |  30 ++++++---
 tools/server/server.cpp         |   6 +-
 3 files changed, 134 insertions(+), 17 deletions(-)

diff --git a/ggml/src/ggml-cuda/common.cuh b/ggml/src/ggml-cuda/common.cuh
index ee75d10546e..954b5d9070d 100644
--- a/ggml/src/ggml-cuda/common.cuh
+++ b/ggml/src/ggml-cuda/common.cuh
@@ -25,6 +25,7 @@
 #include <cstdint>
 #include <cstdio>
 #include <memory>
+#include <mutex>
 
 #if defined(GGML_USE_HIP)
 #include "vendors/hip.h"
@@ -37,6 +38,17 @@
 #include "xsched/xsched.h"
 #include "xsched/cuda/hal.h"
 
+// XSched automatic XQueue configuration
+// When XSCHED_AUTO_XQUEUE is enabled, XSched will automatically create XQueue for CUDA streams
+// Use XSCHED_AUTO_XQUEUE_LEVEL to set the preemption level (default: auto-detect based on GPU architecture)
+#ifndef XSCHED_AUTO_XQUEUE
+#define XSCHED_AUTO_XQUEUE 0 // Default: manual XQueue creation
+#endif
+
+#ifndef XSCHED_AUTO_XQUEUE_LEVEL
+#define XSCHED_AUTO_XQUEUE_LEVEL -1 // Default: auto-detect
+#endif
+
 #define STRINGIZE_IMPL(...) #__VA_ARGS__
 #define STRINGIZE(...) STRINGIZE_IMPL(__VA_ARGS__)
 
@@ -751,7 +763,7 @@ struct ggml_tensor_extra_gpu {
 
 #if (defined(GGML_CUDA_USE_GRAPHS) || defined(GGML_HIP_GRAPHS))
-#define USE_CUDA_GRAPH
+// #define USE_CUDA_GRAPH
 #endif
 
 struct ggml_graph_node_properties {
@@ -799,8 +811,11 @@ struct ggml_backend_cuda_context {
     cudaEvent_t copy_event = nullptr;
     cudaStream_t streams[GGML_CUDA_MAX_DEVICES][GGML_CUDA_MAX_STREAMS] = { { nullptr } };
+    HwQueueHandle hwqueues[GGML_CUDA_MAX_DEVICES][GGML_CUDA_MAX_STREAMS] = { { 0 } };
+    XQueueHandle xqueues[GGML_CUDA_MAX_DEVICES][GGML_CUDA_MAX_STREAMS] = { { 0 } };
     cublasHandle_t cublas_handles[GGML_CUDA_MAX_DEVICES] = {nullptr};
+    mutable std::mutex streams_mutex;
 
     std::unique_ptr<ggml_cuda_graph> cuda_graph;
     int priority = 0;
@@ -812,16 +827,104 @@ struct ggml_backend_cuda_context {
 
     ~ggml_backend_cuda_context();
 
+    // Disable copying and moving to prevent resource management issues
+    ggml_backend_cuda_context(const ggml_backend_cuda_context&) = delete;
+    ggml_backend_cuda_context& operator=(const ggml_backend_cuda_context&) = delete;
+    ggml_backend_cuda_context(ggml_backend_cuda_context&&) = delete;
+    ggml_backend_cuda_context& operator=(ggml_backend_cuda_context&&) = delete;
+
+    int get_max_supported_preempt_level(int device_id) {
+        // If XSCHED_AUTO_XQUEUE_LEVEL is explicitly set, use it
+        if (XSCHED_AUTO_XQUEUE_LEVEL >= 0) {
+            return XSCHED_AUTO_XQUEUE_LEVEL;
+        }
+
+        // Auto-detect based on GPU architecture
+        cudaDeviceProp prop;
+        CUDA_CHECK(cudaGetDeviceProperties(&prop, device_id));
+        int arch = prop.major * 10 + prop.minor;
+
+        // Corrected preemption level logic based on official requirements
+        if (arch >= 80) { // Ampere (A100, RTX 30 series) and newer
+            return 3; // kPreemptLevelInterrupt
+        } else if (arch >= 70) { // Volta (V100) and Turing
+            return 3; // kPreemptLevelInterrupt
+        } else if (arch >= 60) { // Pascal (GTX 10 series)
+            return 2; // kPreemptLevelDeactivate
+        } else if (arch >= 50) { // Maxwell (GTX 9 series)
+            return 2; // kPreemptLevelDeactivate
+        } else if (arch >= 30) { // Kepler (K20, K40, GTX TITAN)
+            return 2; // kPreemptLevelDeactivate
+        } else {
+            // For older architectures or unknown
+            return 1; // kPreemptLevelBlock
+        }
+    }
+
     cudaStream_t stream(int device, int stream) {
+        std::lock_guard<std::mutex> lock(streams_mutex);
+
+        // If stream does not exist, create stream
         if (streams[device][stream] == nullptr) {
             ggml_cuda_set_device(device);
             CUDA_CHECK(cudaStreamCreateWithFlags(&streams[device][stream], cudaStreamNonBlocking));
-            HwQueueHandle hwqueue;
-            CudaQueueCreate(&hwqueue,streams[device][stream]);
-            XQueueHandle xqueue;
-            XQueueCreate(&xqueue, hwqueue, kPreemptLevelDeactivate, kQueueCreateFlagNone);
-            XHintPriority(xqueue, priority); // In XSched, lower number means lower priority
+
+#if XSCHED_AUTO_XQUEUE
+            // In auto-XQueue mode, XSched automatically creates XQueue for CUDA streams
+            // Note: XQueueGetFromCudaStream is planned future API, currently using two-step creation
+            HwQueueHandle hwqueue = 0;
+            XResult res = CudaQueueCreate(&hwqueue, streams[device][stream]);
+            if (res != kXSchedSuccess) {
+                CUDA_CHECK(cudaStreamDestroy(streams[device][stream]));
+                streams[device][stream] = nullptr;
+                GGML_ABORT("CudaQueueCreate failed: %d", res);
+            }
+
+            XQueueHandle xqueue = 0;
+            res = XQueueCreate(&xqueue, hwqueue, get_max_supported_preempt_level(device), kQueueCreateFlagNone);
+            if (res != kXSchedSuccess) {
+                HwQueueDestroy(hwqueue);
+                CUDA_CHECK(cudaStreamDestroy(streams[device][stream]));
+                streams[device][stream] = nullptr;
+                GGML_ABORT("XQueueCreate failed: %d", res);
+            }
+
+            hwqueues[device][stream] = hwqueue;
+            xqueues[device][stream] = xqueue;
+
+            // Set initial priority (always set, including priority 0)
+            XHintPriority(xqueue, priority);
+#else
+            // Manual XQueue creation mode (legacy)
+            HwQueueHandle hwqueue = 0;
+            XResult res = CudaQueueCreate(&hwqueue, streams[device][stream]);
+            if (res != kXSchedSuccess) {
+                CUDA_CHECK(cudaStreamDestroy(streams[device][stream]));
+                streams[device][stream] = nullptr;
+                GGML_ABORT("CudaQueueCreate failed: %d", res);
+            }
+
+            XQueueHandle xqueue = 0;
+            res = XQueueCreate(&xqueue, hwqueue, get_max_supported_preempt_level(device), kQueueCreateFlagNone);
+            if (res != kXSchedSuccess) {
+                HwQueueDestroy(hwqueue);
+                CUDA_CHECK(cudaStreamDestroy(streams[device][stream]));
+                streams[device][stream] = nullptr;
+                GGML_ABORT("XQueueCreate failed: %d", res);
+            }
+
+            hwqueues[device][stream] = hwqueue;
+            xqueues[device][stream] = xqueue;
+
+            // Set initial priority (always set, including priority 0)
+            XHintPriority(xqueue, priority);
+#endif
+        }
+        // If stream exists but XQueue is not bound (should not happen, but ensure robustness)
+        else if (xqueues[device][stream] == 0 && !XSCHED_AUTO_XQUEUE) {
+            GGML_ABORT("Stream exists but XQueue not bound - internal error");
         }
+
         return streams[device][stream];
     }
 
diff --git a/ggml/src/ggml-cuda/ggml-cuda.cu b/ggml/src/ggml-cuda/ggml-cuda.cu
index 0b939ca4854..e193387eaed 100644
--- a/ggml/src/ggml-cuda/ggml-cuda.cu
+++ b/ggml/src/ggml-cuda/ggml-cuda.cu
@@ -531,14 +545,28 @@ ggml_backend_cuda_context::~ggml_backend_cuda_context() {
     if (copy_event != nullptr) {
         CUDA_CHECK(cudaEventDestroy(copy_event));
     }
+
+    // Destroy all XSched queues and hardware queues
     for (int i = 0; i < GGML_CUDA_MAX_DEVICES; ++i) {
         for (int j = 0; j < GGML_CUDA_MAX_STREAMS; ++j) {
+            if (xqueues[i][j] != 0) {
+                XQueueDestroy(xqueues[i][j]);
+                xqueues[i][j] = 0;
+            }
+
+            if (hwqueues[i][j] != 0) {
+                HwQueueDestroy(hwqueues[i][j]);
+                hwqueues[i][j] = 0;
+            }
+
             if (streams[i][j] != nullptr) {
                 CUDA_CHECK(cudaStreamDestroy(streams[i][j]));
+                streams[i][j] = nullptr;
             }
         }
         if (cublas_handles[i] != nullptr) {
             CUBLAS_CHECK(cublasDestroy(cublas_handles[i]));
+            cublas_handles[i] = nullptr;
         }
     }
 }
@@ -2855,19 +2869,19 @@ static void ggml_backend_cuda_event_wait(ggml_backend_t backend, ggml_backend_ev
 static void ggml_backend_cuda_set_priority(ggml_backend_t backend, int prio) {
     ggml_backend_cuda_context *cuda_ctx = (ggml_backend_cuda_context *)backend->context;
+
+    std::lock_guard<std::mutex> lock(cuda_ctx->streams_mutex);
+
+    // Update priority for all existing XQueues
     for (int device = 0; device < GGML_CUDA_MAX_DEVICES; device++) {
         for (int idx = 0; idx < GGML_CUDA_MAX_STREAMS; idx++) {
-            auto stream = cuda_ctx->streams[device][idx];
-            if(stream == nullptr) {
-                continue;
+            if (cuda_ctx->xqueues[device][idx] != 0) {
+                XHintPriority(cuda_ctx->xqueues[device][idx], prio);
             }
-            HwQueueHandle hwqueue;
-            CudaQueueCreate(&hwqueue,stream);
-            XQueueHandle xqueue;
-            XQueueCreate(&xqueue, hwqueue, kPreemptLevelDeactivate, kQueueCreateFlagNone);
-            XHintPriority(xqueue, prio); // In XSched, lower number means lower priority
         }
     }
+
+    // Store new priority, subsequent XQueues will use this priority
     cuda_ctx->priority = prio;
 }
 
diff --git a/tools/server/server.cpp b/tools/server/server.cpp
index 049282d7821..a6c329071de 100644
--- a/tools/server/server.cpp
+++ b/tools/server/server.cpp
@@ -5010,9 +5010,9 @@ int main(int argc, char ** argv) {
     std::vector<std::thread> threads;
     // this call blocks the main thread until queue_tasks.terminate() is called
     for(int i = 0; i < SERVER_TASK_PRIO_COUNT; i++) {
-        threads.emplace_back([&ctx_server, &i]() {
-            ctx_server[i].queue_tasks.start_loop();
-        });
+        threads.emplace_back([&ctx_server](int ind) {
+            ctx_server[ind].queue_tasks.start_loop();
+        },i);
     }
 
     for(auto &thread: threads) {