diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2a5b246..18e5cd7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -34,7 +34,7 @@ jobs: SCCACHE_CACHE_SIZE: '2G' steps: - name: Checkout - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 with: fetch-depth: 0 diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index 8e1dbb1..51ca1f0 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -41,12 +41,12 @@ jobs: steps: - name: Checkout repository - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 with: fetch-depth: 0 - name: Initialize CodeQL - uses: github/codeql-action/init@5d4e8d1aca955e8d8589aabd499c5cae939e33c7 # v3 + uses: github/codeql-action/init@b20883b0cd1f46c72ae0ba6d1090936928f9fa30 # v3 with: languages: ${{ matrix.language }} build-mode: ${{ matrix.build-mode }} @@ -55,10 +55,10 @@ jobs: - name: Autobuild if: matrix.build-mode == 'autobuild' - uses: github/codeql-action/autobuild@5d4e8d1aca955e8d8589aabd499c5cae939e33c7 # v3 + uses: github/codeql-action/autobuild@b20883b0cd1f46c72ae0ba6d1090936928f9fa30 # v3 - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@5d4e8d1aca955e8d8589aabd499c5cae939e33c7 # v3 + uses: github/codeql-action/analyze@b20883b0cd1f46c72ae0ba6d1090936928f9fa30 # v3 with: category: "/language:${{ matrix.language }}" diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index 540f900..71a625f 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -31,7 +31,7 @@ jobs: ahead_count: ${{ steps.diff.outputs.ahead_count }} steps: - name: Checkout target ref - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 with: ref: ${{ env.TARGET_REF }} fetch-depth: 0 @@ -80,7 +80,7 @@ jobs: SCCACHE_CACHE_SIZE: '2G' steps: - name: Checkout target ref - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 with: ref: ${{ env.TARGET_REF }} fetch-depth: 0 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 55bbba1..aa71f7c 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -30,7 +30,7 @@ jobs: SCCACHE_CACHE_SIZE: '2G' steps: - name: Checkout - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 with: fetch-depth: 0 diff --git a/Cargo.lock b/Cargo.lock index cd0b865..3d803a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -36,13 +36,13 @@ checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" [[package]] name = "memchr" -version = "2.7.6" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" [[package]] name = "openvcs-core" -version = "0.1.5" +version = "0.1.6" dependencies = [ "linkme", "log", @@ -53,18 +53,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.105" +version = "1.0.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "535d180e0ecab6268a3e718bb9fd44db66bbbc256257165fc699dadf70d16fe7" +checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.43" +version = "1.0.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc74d9a594b72ae6656596548f56f667211f8a97b3d4c3d467150794690dc40a" +checksum = "21b2ebcf727b7760c461f091f9f0f539b77b8e87f2fd88131e7f1b433b3cece4" dependencies = [ "proc-macro2", ] @@ -125,18 +125,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.17" +version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" +checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "2.0.17" +version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" +checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" dependencies = [ "proc-macro2", "quote", @@ -151,6 +151,6 @@ checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" [[package]] name = "zmij" -version = "1.0.12" +version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fc5a66a20078bf1251bde995aa2fdcc4b800c70b5d92dd2c62abc5c60f679f8" +checksum = "3ff05f8caa9038894637571ae6b9e29466c1f4f829d26c9b28f869a29cbe3445" diff --git a/Cargo.toml b/Cargo.toml index 6416ee2..23548f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "openvcs-core" -version = "0.1.5" +version = "0.1.6" edition = "2024" description = "Core types and traits for OpenVCS." license = "GPL-3.0-or-later" @@ -31,10 +31,10 @@ default = ["plugin-protocol"] plugin-protocol = ["dep:serde_json"] # The VCS trait and VCS-related error type. -vcs = ["dep:thiserror", "backend-registry"] +vcs = ["dep:thiserror"] # Backend discovery via the `BACKENDS` registry (link-time registration). -backend-registry = ["dep:linkme"] +backend-registry = ["dep:linkme", "vcs"] [dependencies] serde = { version = "1", features = ["derive"] } diff --git a/README.md b/README.md index c2f77ee..736a79f 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ # OpenVCS Core (`openvcs-core`) -[![Dev CI (fast)](https://github.com/Open-VCS/OpenVCS-Core/actions/workflows/ci.yml/badge.svg)](https://github.com/Open-VCS/OpenVCS-Core/actions/workflows/ci.yml) +[![Nightly](https://github.com/Open-VCS/OpenVCS-Core/actions/workflows/nightly.yml/badge.svg?branch=Dev)](https://github.com/Open-VCS/OpenVCS-Core/actions/workflows/nightly.yml) +[![Dev](https://github.com/Open-VCS/OpenVCS-Core/actions/workflows/ci.yml/badge.svg?branch=Dev)](https://github.com/Open-VCS/OpenVCS-Core/actions/workflows/ci.yml) +[![Stable](https://github.com/Open-VCS/OpenVCS-Core/actions/workflows/release.yml/badge.svg?branch=Dev)](https://github.com/Open-VCS/OpenVCS-Core/actions/workflows/release.yml) Shared Rust crate for: - OpenVCS plugins (JSON-RPC over stdio) diff --git a/src/host.rs b/src/host.rs index 077f19f..17d2a15 100644 --- a/src/host.rs +++ b/src/host.rs @@ -1,7 +1,7 @@ use crate::plugin_protocol::RpcRequest; -use crate::plugin_stdio::{PluginError, RequestIdState, call_host}; +use crate::plugin_protocol::RpcResponse; +use crate::plugin_stdio::{PluginError, RequestIdState, SharedQueue, call_host}; use serde_json::Value; -use std::collections::VecDeque; use std::io::{BufReader, LineWriter}; use std::sync::{Arc, Mutex, OnceLock}; use std::time::Duration; @@ -13,7 +13,8 @@ pub type HostStdin = BufReader; struct HostContext { out: Arc>, stdin: Arc>, - queue: Arc>>, + queue: Arc>, + responses: Arc>, ids: Arc>, timeout: Duration, } @@ -23,15 +24,22 @@ static HOST: OnceLock = OnceLock::new(); pub fn init_stdio_default(next_id: u64, timeout: Duration) { let out = Arc::new(Mutex::new(LineWriter::new(std::io::stdout()))); let stdin = Arc::new(Mutex::new(BufReader::new(std::io::stdin()))); - let queue = Arc::new(Mutex::new(VecDeque::new())); + let queue = Arc::new(SharedQueue::new()); + let responses = Arc::new(SharedQueue::new()); let ids = Arc::new(Mutex::new(RequestIdState { next_id })); - init_default_stdio_host(out, stdin, queue, ids, timeout); + crate::plugin_stdio::start_reader_thread( + Arc::clone(&stdin), + Arc::clone(&queue), + Arc::clone(&responses), + ); + init_default_stdio_host(out, stdin, queue, responses, ids, timeout); } pub fn init_default_stdio_host( out: Arc>, stdin: Arc>, - queue: Arc>>, + queue: Arc>, + responses: Arc>, ids: Arc>, timeout: Duration, ) { @@ -39,6 +47,7 @@ pub fn init_default_stdio_host( out, stdin, queue, + responses, ids, timeout, }); @@ -58,7 +67,7 @@ pub fn stdin() -> Result<&'static Arc>, PluginError> { .stdin) } -pub fn queue() -> Result<&'static Arc>>, PluginError> { +pub fn queue() -> Result<&'static Arc>, PluginError> { Ok(&HOST .get() .ok_or_else(|| PluginError::code("host.uninitialized", "host not initialized"))? @@ -82,7 +91,7 @@ pub fn call(method: &str, params: Value) -> Result { call_host( &ctx.out, &ctx.stdin, - &ctx.queue, + &ctx.responses, &ctx.ids, method, params, diff --git a/src/lib.rs b/src/lib.rs index 2a52338..281e0ed 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,7 @@ pub use crate::backend_id::BackendId; #[doc(hidden)] pub use log as __log; -#[cfg(feature = "backend-registry")] +#[cfg(all(feature = "backend-registry", feature = "vcs"))] pub mod backend_descriptor; #[cfg(feature = "plugin-protocol")] @@ -273,26 +273,6 @@ pub trait Vcs: Send + Sync { Err(VcsError::Unsupported(self.id())) } - // lfs - fn lfs_fetch(&self) -> Result<()> { - Err(VcsError::Unsupported(self.id())) - } - fn lfs_pull(&self) -> Result<()> { - Err(VcsError::Unsupported(self.id())) - } - fn lfs_prune(&self) -> Result<()> { - Err(VcsError::Unsupported(self.id())) - } - fn lfs_track(&self, _paths: &[PathBuf]) -> Result<()> { - Err(VcsError::Unsupported(self.id())) - } - fn lfs_untrack(&self, _paths: &[PathBuf]) -> Result<()> { - Err(VcsError::Unsupported(self.id())) - } - fn lfs_is_tracked(&self, _path: &Path) -> Result { - Err(VcsError::Unsupported(self.id())) - } - // history operations fn cherry_pick(&self, _rev: &str) -> Result<()> { Err(VcsError::Unsupported(self.id())) diff --git a/src/plugin_runtime.rs b/src/plugin_runtime.rs index 4943526..c9a9255 100644 --- a/src/plugin_runtime.rs +++ b/src/plugin_runtime.rs @@ -1,9 +1,8 @@ use crate::models::VcsEvent; use crate::plugin_protocol::{PluginMessage, RpcRequest}; use crate::plugin_stdio::ok_null; -use crate::plugin_stdio::{PluginError, receive_message, respond_shared, send_message_shared}; +use crate::plugin_stdio::{PluginError, SharedQueue, respond_shared, send_message_shared}; use std::collections::HashMap; -use std::collections::VecDeque; use std::io::{self, BufReader, LineWriter}; use std::sync::{Arc, Mutex, OnceLock}; use std::time::Duration; @@ -37,32 +36,17 @@ impl PluginCtx { } } -fn next_request( - queue: &Arc>>, - stdin: &Arc>>, -) -> Option { - if let Ok(mut q) = queue.lock() - && let Some(req) = q.pop_front() - { +fn next_request(queue: &Arc>) -> Option { + if let Some(req) = queue.pop_now() { return Some(req); } - loop { - let msg = { - let mut lock = stdin.lock().ok()?; - receive_message(&mut *lock)? - }; - match msg { - PluginMessage::Request(req) => return Some(req), - PluginMessage::Response(_) | PluginMessage::Event { .. } => continue, - } - } + queue.pop_wait() } pub struct PluginRuntime { ctx: PluginCtx, - stdin: Arc>>, - queue: Arc>>, + queue: Arc>, } impl PluginRuntime { @@ -77,20 +61,26 @@ impl PluginRuntime { let stdout = Arc::new(Mutex::new(LineWriter::new(io::stdout()))); let stdin = Arc::new(Mutex::new(BufReader::new(io::stdin()))); - let queue: Arc>> = Arc::new(Mutex::new(VecDeque::new())); + let queue: Arc> = Arc::new(SharedQueue::new()); + let responses = Arc::new(SharedQueue::new()); let ids = Arc::new(Mutex::new(crate::plugin_stdio::RequestIdState { next_id })); + crate::plugin_stdio::start_reader_thread( + Arc::clone(&stdin), + Arc::clone(&queue), + Arc::clone(&responses), + ); crate::host::init_default_stdio_host( Arc::clone(&stdout), Arc::clone(&stdin), Arc::clone(&queue), + Arc::clone(&responses), Arc::clone(&ids), timeout, ); Self { ctx: PluginCtx { stdout }, - stdin, queue, } } @@ -103,7 +93,7 @@ impl PluginRuntime { &mut self, mut handle: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult, ) -> io::Result { - let Some(req) = next_request(&self.queue, &self.stdin) else { + let Some(req) = next_request(&self.queue) else { return Ok(false); }; let id = req.id; diff --git a/src/plugin_stdio.rs b/src/plugin_stdio.rs index ffcca73..1d2b67a 100644 --- a/src/plugin_stdio.rs +++ b/src/plugin_stdio.rs @@ -4,7 +4,8 @@ use serde::de::DeserializeOwned; use serde_json::Value; use std::collections::{HashMap, VecDeque}; use std::io::{self, BufRead, Write}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Condvar, Mutex}; +use std::thread; use std::time::{Duration, Instant}; #[derive(Debug, Clone)] @@ -129,10 +130,137 @@ pub struct RequestIdState { pub next_id: u64, } +#[derive(Debug)] +struct SharedQueueState { + items: VecDeque, + closed: bool, +} + +#[derive(Debug)] +pub struct SharedQueue { + state: Mutex>, + cv: Condvar, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SharedQueueError { + Timeout, + Poisoned, +} + +impl Default for SharedQueue { + fn default() -> Self { + Self::new() + } +} + +impl SharedQueue { + pub fn new() -> Self { + Self { + state: Mutex::new(SharedQueueState { + items: VecDeque::new(), + closed: false, + }), + cv: Condvar::new(), + } + } + + pub fn push(&self, item: T) { + if let Ok(mut state) = self.state.lock() { + state.items.push_back(item); + self.cv.notify_all(); + } + } + + pub fn close(&self) { + if let Ok(mut state) = self.state.lock() { + state.closed = true; + self.cv.notify_all(); + } + } + + pub fn pop_now(&self) -> Option { + let mut state = self.state.lock().ok()?; + state.items.pop_front() + } + + pub fn pop_wait(&self) -> Option { + let mut state = self.state.lock().ok()?; + loop { + if let Some(item) = state.items.pop_front() { + return Some(item); + } + if state.closed { + return None; + } + state = self.cv.wait(state).ok()?; + } + } + + pub fn pop_wait_until(&self, deadline: Instant) -> Result, SharedQueueError> { + let mut state = self.state.lock().map_err(|_| SharedQueueError::Poisoned)?; + loop { + if let Some(item) = state.items.pop_front() { + return Ok(Some(item)); + } + if state.closed { + return Ok(None); + } + + let now = Instant::now(); + if now >= deadline { + return Err(SharedQueueError::Timeout); + } + let timeout = deadline.saturating_duration_since(now); + let (next_state, wait_result) = self + .cv + .wait_timeout(state, timeout) + .map_err(|_| SharedQueueError::Poisoned)?; + state = next_state; + if wait_result.timed_out() { + return Err(SharedQueueError::Timeout); + } + } + } +} + +pub fn start_reader_thread( + stdin: Arc>, + requests: Arc>, + responses: Arc>, +) { + thread::spawn(move || { + loop { + let msg = { + let mut lock = match stdin.lock() { + Ok(lock) => lock, + Err(_) => { + requests.close(); + responses.close(); + return; + } + }; + receive_message(&mut *lock) + }; + + match msg { + Some(PluginMessage::Request(req)) => requests.push(req), + Some(PluginMessage::Response(resp)) => responses.push(resp), + Some(PluginMessage::Event { .. }) => {} + None => { + requests.close(); + responses.close(); + return; + } + } + } + }); +} + pub fn call_host( out: &Arc>, - stdin: &Arc>, - queue: &Arc>>, + _stdin: &Arc>, + responses: &Arc>, ids: &Arc>, method: &str, params: Value, @@ -176,35 +304,28 @@ pub fn call_host( }; } - let msg = { - let mut lock = stdin - .lock() - .map_err(|_| PluginError::message("stdin lock poisoned"))?; - receive_message(&mut *lock).ok_or_else(|| PluginError::message("host closed stdin"))? - }; - - match msg { - PluginMessage::Response(resp) => { - if resp.id == id { - return if resp.ok { - Ok(resp.result) - } else { - Err(PluginError { - code: resp.error_code.or(Some("host.error".into())), - message: resp.error.unwrap_or_else(|| "error".into()), - data: resp.error_data, - }) - }; - } - stash.insert(resp.id, resp); - } - PluginMessage::Request(req) => { - if let Ok(mut q) = queue.lock() { - q.push_back(req); + let resp = responses + .pop_wait_until(deadline) + .map_err(|e| match e { + SharedQueueError::Timeout => { + PluginError::code("host.timeout", "host call timed out") } - } - PluginMessage::Event { .. } => {} + SharedQueueError::Poisoned => PluginError::message("response queue lock poisoned"), + })? + .ok_or_else(|| PluginError::message("host closed stdin"))?; + + if resp.id == id { + return if resp.ok { + Ok(resp.result) + } else { + Err(PluginError { + code: resp.error_code.or(Some("host.error".into())), + message: resp.error.unwrap_or_else(|| "error".into()), + data: resp.error_data, + }) + }; } + stash.insert(resp.id, resp); } } @@ -291,13 +412,19 @@ mod tests { {\"id\":77,\"method\":\"noop\",\"params\":null}\n\ {\"id\":5,\"ok\":true,\"result\":{\"answer\":42}}\n" as &[u8], ))); - let queue = Arc::new(Mutex::new(VecDeque::::new())); + let queue = Arc::new(SharedQueue::::new()); + let responses = Arc::new(SharedQueue::::new()); let ids = Arc::new(Mutex::new(RequestIdState { next_id: 5 })); + start_reader_thread( + Arc::clone(&stdin), + Arc::clone(&queue), + Arc::clone(&responses), + ); let result = call_host( &out, &stdin, - &queue, + &responses, &ids, "math.answer", serde_json::json!({}), @@ -306,10 +433,9 @@ mod tests { .expect("host call ok"); assert_eq!(result, serde_json::json!({"answer": 42})); - let queue = queue.lock().expect("queue lock"); - assert_eq!(queue.len(), 1); - assert_eq!(queue[0].id, 77); - assert_eq!(queue[0].method, "noop"); + let queued = queue.pop_now().expect("queued request"); + assert_eq!(queued.id, 77); + assert_eq!(queued.method, "noop"); let out = out.lock().expect("out lock"); let line = std::str::from_utf8(&out).expect("utf-8");