// Patch section: scripts/request-trace-tools/export-request-trace-usage-csv.mjs
// (new file, mode 100644, index 000000000..1c35df71e, hunk @@ -0,0 +1,186 @@)
//
// NOTE: the on-disk script begins with `#!/usr/bin/env node`. A shebang is only
// valid as the very first line of a file, so it is recorded here as a comment.

/**
 * Export per-request token usage from a directory of request-trace JSON files
 * into a single CSV (default name: usage-summary.csv inside the trace directory).
 *
 * Each `*.json` file in the directory becomes one CSV row; rows are ordered by
 * the trace's `sequence` field, falling back to a numeric-aware file name order.
 */

import fs from 'node:fs/promises';
import path from 'node:path';

/** Default output file name, written inside the trace directory. */
const USAGE_OUTPUT_FILE = 'usage-summary.csv';

/** CSV column order. Token columns come first so they are easy to scan. */
const CSV_HEADERS = Object.freeze([
  'file_name',
  'sequence',
  'prompt_tokens',
  'completion_tokens',
  'reasoning_tokens',
  'total_tokens',
  'cache_hit_tokens',
  'cache_miss_tokens',
  'cache_write_tokens',
  'recorded_at',
  'trace_id',
  'session_id',
  'turn_id',
  'operation_kind',
  'operation_id',
  'operation_trigger',
  'capture_mode',
  'provider',
  'api_format',
  'model_id',
  'request_url',
  'attempt_number',
  'response_kind',
  'error',
  'partial_recovery_reason',
]);

/**
 * Print command-line help to stdout.
 *
 * The first positional argument (the trace directory) is required by `main`,
 * so it is spelled out explicitly in the synopsis.
 */
function printUsage() {
  console.log(
    [
      'Usage:',
      '  node scripts/request-trace-tools/export-request-trace-usage-csv.mjs <request-trace-dir> [output.csv]',
      '',
      'Examples:',
      '  node scripts/request-trace-tools/export-request-trace-usage-csv.mjs ~/.bitfun/projects/demo/request-traces/abc123',
      '  node scripts/request-trace-tools/export-request-trace-usage-csv.mjs ./request-traces ./trace-usage.csv',
    ].join('\n'),
  );
}

/**
 * @param {string} value - A raw command-line argument.
 * @returns {boolean} True when the argument asks for help.
 */
function isHelpFlag(value) {
  return value === '-h' || value === '--help';
}

/**
 * Render one value as a CSV field.
 *
 * `null`/`undefined` become an empty field. Values containing a quote, comma,
 * CR or LF are wrapped in double quotes with embedded quotes doubled.
 *
 * @param {unknown} value
 * @returns {string}
 */
function csvEscape(value) {
  if (value === null || value === undefined) {
    return '';
  }

  const text = String(value);
  if (!/[",\n\r]/.test(text)) {
    return text;
  }

  return `"${text.replaceAll('"', '""')}"`;
}

/**
 * @param {unknown} value
 * @returns {number | null} The value when it is an integer, otherwise null.
 */
function normalizeInteger(value) {
  return Number.isInteger(value) ? value : null;
}

/**
 * Derive cache-miss tokens as `prompt - cacheHit`, clamped at zero.
 *
 * @param {unknown} promptTokens
 * @param {unknown} cacheHitTokens
 * @returns {number | null} Null unless both inputs are integers.
 */
function deriveCacheMissTokens(promptTokens, cacheHitTokens) {
  if (!Number.isInteger(promptTokens) || !Number.isInteger(cacheHitTokens)) {
    return null;
  }

  return Math.max(0, promptTokens - cacheHitTokens);
}

/**
 * Flatten one parsed trace document into a CSV row object.
 *
 * Missing sections are tolerated: string columns fall back to `''`, numeric
 * columns to `null` (rendered as an empty field by `csvEscape`).
 *
 * @param {any} trace - Parsed JSON content of a trace file (may be null).
 * @param {string} fileName - Name of the file the trace was read from.
 * @returns {Record<string, unknown>} Row keyed by the names in CSV_HEADERS.
 */
function buildRow(trace, fileName) {
  const usage = trace?.response?.usage ?? null;
  const promptTokens = normalizeInteger(usage?.promptTokenCount);
  const completionTokens = normalizeInteger(usage?.candidatesTokenCount);
  const reasoningTokens = normalizeInteger(usage?.reasoningTokenCount);
  const totalTokens = normalizeInteger(usage?.totalTokenCount);
  const cacheHitTokens = normalizeInteger(usage?.cachedContentTokenCount);
  const cacheMissTokens = deriveCacheMissTokens(promptTokens, cacheHitTokens);
  const cacheWriteTokens = normalizeInteger(usage?.cacheCreationTokenCount);

  return {
    file_name: fileName,
    sequence: trace?.sequence ?? null,
    recorded_at: trace?.recorded_at ?? '',
    trace_id: trace?.trace_id ?? '',
    session_id: trace?.session_id ?? '',
    turn_id: trace?.turn_id ?? '',
    operation_kind: trace?.operation_kind ?? '',
    operation_id: trace?.operation_id ?? '',
    operation_trigger: trace?.operation_trigger ?? '',
    capture_mode: trace?.capture_mode ?? '',
    provider: trace?.request?.provider ?? '',
    api_format: trace?.request?.api_format ?? '',
    model_id: trace?.request?.model_id ?? '',
    request_url: trace?.request?.request_url ?? '',
    attempt_number: trace?.request?.attempt_number ?? null,
    response_kind: trace?.response?.kind ?? '',
    prompt_tokens: promptTokens,
    completion_tokens: completionTokens,
    reasoning_tokens: reasoningTokens,
    total_tokens: totalTokens,
    cache_hit_tokens: cacheHitTokens,
    cache_miss_tokens: cacheMissTokens,
    cache_write_tokens: cacheWriteTokens,
    error: trace?.response?.error ?? '',
    partial_recovery_reason: trace?.response?.partial_recovery_reason ?? '',
  };
}

/**
 * Order rows by integer `sequence` (rows without one sort last), then by file
 * name using numeric-aware comparison so `2.json` precedes `10.json`.
 */
function compareRows(left, right) {
  const leftSequence = Number.isInteger(left.sequence) ? left.sequence : Number.MAX_SAFE_INTEGER;
  const rightSequence = Number.isInteger(right.sequence) ? right.sequence : Number.MAX_SAFE_INTEGER;
  if (leftSequence !== rightSequence) {
    return leftSequence - rightSequence;
  }

  return left.file_name.localeCompare(right.file_name, undefined, { numeric: true });
}

/**
 * Read every `*.json` regular file directly inside `traceDir` and convert it
 * to a row.
 *
 * @param {string} traceDir - Directory containing trace files.
 * @returns {Promise<Array<Record<string, unknown>>>} Rows sorted by `compareRows`.
 * @throws {Error} When a file cannot be read, or when its content is not valid
 *   JSON (the message names the offending file; the parser error is `cause`).
 */
async function readTraceRows(traceDir) {
  const entries = await fs.readdir(traceDir, { withFileTypes: true });
  const traceFiles = entries
    .filter((entry) => entry.isFile() && entry.name.endsWith('.json'))
    .map((entry) => entry.name);

  const rows = [];
  // Files are read one at a time to keep the number of open descriptors bounded.
  for (const fileName of traceFiles) {
    const contents = await fs.readFile(path.join(traceDir, fileName), 'utf8');

    let trace;
    try {
      trace = JSON.parse(contents);
    } catch (error) {
      // Name the file: a bare SyntaxError is useless across many trace files.
      throw new Error(`Failed to parse request trace ${fileName}: ${error.message}`, {
        cause: error,
      });
    }

    rows.push(buildRow(trace, fileName));
  }

  // No pre-sort of file names is needed: compareRows already breaks ties on
  // file_name, and file names within one directory are unique.
  rows.sort(compareRows);

  return rows;
}

/**
 * Serialize rows to CSV text: a header line, one line per row, and a trailing
 * newline. Lines are separated by `\n`.
 *
 * @param {Array<Record<string, unknown>>} rows
 * @returns {string}
 */
function buildCsv(rows) {
  const lines = [CSV_HEADERS.join(',')];
  for (const row of rows) {
    lines.push(CSV_HEADERS.map((header) => csvEscape(row[header])).join(','));
  }

  return `${lines.join('\n')}\n`;
}

/**
 * CLI entry point.
 *
 * Arguments: `<request-trace-dir> [output.csv]`. With no arguments the usage
 * text is printed and the exit code is 1; with `-h`/`--help` it is 0.
 *
 * @throws {Error} When the trace directory does not exist or is not a directory.
 */
async function main() {
  const args = process.argv.slice(2);

  if (args.length === 0 || args.some(isHelpFlag)) {
    printUsage();
    // Set exitCode instead of calling process.exit() so buffered stdout is
    // flushed before the process ends (matters when output is piped).
    process.exitCode = args.length === 0 ? 1 : 0;
    return;
  }

  const [traceDirArg, outputPathArg] = args;
  const traceDir = path.resolve(traceDirArg);
  const outputPath = outputPathArg
    ? path.resolve(outputPathArg)
    : path.join(traceDir, USAGE_OUTPUT_FILE);

  const traceDirStats = await fs.stat(traceDir).catch(() => null);
  if (!traceDirStats?.isDirectory()) {
    throw new Error(`Request trace directory not found: ${traceDir}`);
  }

  const rows = await readTraceRows(traceDir);
  const csv = buildCsv(rows);

  await fs.writeFile(outputPath, csv, 'utf8');

  console.log(
    `Wrote ${rows.length} trace row(s) to ${outputPath}`,
  );
}

main().catch((error) => {
  console.error(error instanceof Error ? error.message : String(error));
  process.exitCode = 1;
});

// Patch section: scripts/request-trace-tools/usage-dashboard.html
// (new file, mode 100644, index 000000000..3d750b269, hunk @@ -0,0 +1,1811 @@)
// Page title: Request Trace Usage Dashboard
+
+

Request Trace Explorer

+

把每次请求的 token 走势摊开看

+

+ 这个页面会优先读取当前目录的 usage-summary.csv,也支持手动导入 CSV。 + 它围绕 request trace 的导出字段,重点观察 + prompt_tokens、completion_tokens、cache_hit_tokens、 + cache_miss_tokens、cache_write_tokens 的趋势、峰值和覆盖情况。 +

+
+ + +
+ 等待数据 + 优先尝试 usage-summary.csv,失败后可手动导入 +
+
+
+ +
+ + +
+
+
+ 请求数 + - +
+
+ usage 覆盖 + - +
+
+ Prompt 总量 + - +
+
+ Completion 总量 + - +
+
+ Cache Hit 总量 + - +
+
+ Cache Miss 总量 + - +
+
+ +
+
+
+

Request Token Curves

+

等待加载数据…

+
+
+ Prompt + Completion + Cache Hit + Cache Miss + Cache Write + +
+
+ +
+
+ 还没有可视化数据 +

+ 如果你是直接双击打开页面,浏览器大概率不能直接 fetch 本地文件。 + 这时点“选择 CSV 文件”导入同目录的 usage-summary.csv 就可以。 +

+
+
+ + +

+ 鼠标滚轮缩放 X 轴,按住左键拖动平移,双击图表重置当前视窗。 + cache_miss_tokens 只有在 CSV 明确给出或能由 prompt/cache_hit 推导时才会显示。 +

+
+ +
+
+
+
+

请求列表

+

+ 默认按 cache_miss_tokens 排序,方便快速定位没有吃到缓存的请求段。 +

+
+
+ +
+
+ + + + + + + + + + + + + + + +
Seq时间Cache MissPromptCompletionCache HitModel
等待数据…
+ +
+ +
+

最近 8 条记录

+

用于对照导入文件尾部请求,顺便看 usage 是否有缺口。

+ + + + + + + + + + + + + +
Seq时间KindPromptCoverage
等待数据…
+
+
+
+
+
+ + + + diff --git a/src/apps/desktop/src/api/agentic_api.rs b/src/apps/desktop/src/api/agentic_api.rs index acce1a9f3..88674ed67 100644 --- a/src/apps/desktop/src/api/agentic_api.rs +++ b/src/apps/desktop/src/api/agentic_api.rs @@ -1515,7 +1515,6 @@ pub async fn control_background_command( origin: ExecCommandControlOrigin::OutOfBand, remote, yield_time_ms: Some(250), - max_output_chars: Some(4_096), }) .await .map(|response| { diff --git a/src/crates/adapters/ai-adapters/src/client.rs b/src/crates/adapters/ai-adapters/src/client.rs index e431aad87..e36537d77 100644 --- a/src/crates/adapters/ai-adapters/src/client.rs +++ b/src/crates/adapters/ai-adapters/src/client.rs @@ -13,7 +13,9 @@ pub(crate) mod sse; pub(crate) mod utils; use crate::providers::{anthropic, gemini, openai}; -use crate::trace::{ModelExchangeRequestTraceHandle, ModelExchangeTraceConfig}; +use crate::trace::{ + ModelExchangeRequestTraceHandle, ModelExchangeResponseTrace, ModelExchangeTraceConfig, +}; use crate::types::ProxyConfig; use crate::types::*; use anyhow::Result; @@ -173,6 +175,28 @@ impl AIClient { messages: Vec, tools: Option>, extra_body: Option, + ) -> Result { + self.send_message_with_extra_body_and_trace(messages, tools, extra_body, None) + .await + } + + pub async fn send_message_with_trace( + &self, + messages: Vec, + tools: Option>, + trace: Option, + ) -> Result { + let custom_body = self.config.custom_request_body.clone(); + self.send_message_with_extra_body_and_trace(messages, tools, custom_body, trace) + .await + } + + pub async fn send_message_with_extra_body_and_trace( + &self, + messages: Vec, + tools: Option>, + extra_body: Option, + trace: Option, ) -> Result { for attempt in 0..SEND_MESSAGE_STREAM_ATTEMPTS { let stream_response = self @@ -180,16 +204,27 @@ impl AIClient { messages.clone(), tools.clone(), extra_body.clone(), - None, + trace.clone(), ) .await?; + let trace_handle = stream_response.trace_handle.clone(); match 
response_aggregator::aggregate_stream_response(stream_response).await { - Ok(response) => return Ok(response), + Ok(response) => { + complete_aggregated_trace(trace.as_ref(), trace_handle.as_ref(), &response) + .await; + return Ok(response); + } Err(error) if attempt < SEND_MESSAGE_STREAM_ATTEMPTS - 1 && is_transient_stream_error(&error.to_string()) => { + fail_aggregated_trace( + trace.as_ref(), + trace_handle.as_ref(), + &error.to_string(), + ) + .await; let delay_ms = send_message_retry_delay_ms(attempt); warn!( "Retrying aggregated AI stream after transient error: attempt={}/{}, delay_ms={}, error={}", @@ -200,7 +235,15 @@ impl AIClient { ); tokio::time::sleep(Duration::from_millis(delay_ms)).await; } - Err(error) => return Err(error), + Err(error) => { + fail_aggregated_trace( + trace.as_ref(), + trace_handle.as_ref(), + &error.to_string(), + ) + .await; + return Err(error); + } } } @@ -317,6 +360,55 @@ fn is_transient_stream_error(error_message: &str) -> bool { .any(|k| msg.contains(k)) } +async fn complete_aggregated_trace( + trace_config: Option<&ModelExchangeTraceConfig>, + trace_handle: Option<&ModelExchangeRequestTraceHandle>, + response: &GeminiResponse, +) { + let (Some(trace_config), Some(trace_handle)) = (trace_config, trace_handle) else { + return; + }; + + trace_config + .sink + .request_attempt_completed(trace_handle, &gemini_response_to_trace(response)) + .await; +} + +async fn fail_aggregated_trace( + trace_config: Option<&ModelExchangeTraceConfig>, + trace_handle: Option<&ModelExchangeRequestTraceHandle>, + error: &str, +) { + let Some(trace_config) = trace_config else { + return; + }; + + trace_config + .sink + .request_attempt_failed(trace_handle, error) + .await; +} + +fn gemini_response_to_trace(response: &GeminiResponse) -> ModelExchangeResponseTrace { + ModelExchangeResponseTrace { + kind: "completed".to_string(), + assistant_text: Some(response.text.clone()), + thinking: response.reasoning_content.clone(), + tool_calls: response + 
.tool_calls + .as_ref() + .and_then(|tool_calls| serde_json::to_value(tool_calls).ok()), + usage: response + .usage + .as_ref() + .and_then(|usage| serde_json::to_value(usage).ok()), + provider_metadata: response.provider_metadata.clone(), + partial_recovery_reason: None, + error: None, + } +} + #[cfg(test)] mod tests { use super::{is_transient_stream_error, AIClient}; diff --git a/src/crates/adapters/ai-adapters/src/client/sse.rs b/src/crates/adapters/ai-adapters/src/client/sse.rs index d5094e00d..02d041311 100644 --- a/src/crates/adapters/ai-adapters/src/client/sse.rs +++ b/src/crates/adapters/ai-adapters/src/client/sse.rs @@ -117,7 +117,7 @@ where .sink .request_attempt_started(&ModelExchangeRequestAttempt { request_url: url.to_string(), - request_body: request_body.clone(), + request_body: trace.capture_request_body.then(|| request_body.clone()), attempt_number: attempt + 1, }) .await diff --git a/src/crates/adapters/ai-adapters/src/trace.rs b/src/crates/adapters/ai-adapters/src/trace.rs index 035ba9ec8..0f858de72 100644 --- a/src/crates/adapters/ai-adapters/src/trace.rs +++ b/src/crates/adapters/ai-adapters/src/trace.rs @@ -6,7 +6,8 @@ use std::sync::Arc; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ModelExchangeRequestAttempt { pub request_url: String, - pub request_body: Value, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub request_body: Option, pub attempt_number: usize, } @@ -18,7 +19,8 @@ pub struct ModelExchangeRequestTraceHandle { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ModelExchangeResponseTrace { pub kind: String, - pub assistant_text: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub assistant_text: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub thinking: Option, #[serde(default, skip_serializing_if = "Option::is_none")] @@ -56,4 +58,5 @@ pub trait ModelExchangeTraceSink: Send + Sync { #[derive(Clone)] pub struct ModelExchangeTraceConfig { pub sink: 
Arc, + pub capture_request_body: bool, } diff --git a/src/crates/assembly/core/src/agentic/agents/prompt_builder/prompt_builder_impl.rs b/src/crates/assembly/core/src/agentic/agents/prompt_builder/prompt_builder_impl.rs index 3e4ae4774..2c3b17dba 100644 --- a/src/crates/assembly/core/src/agentic/agents/prompt_builder/prompt_builder_impl.rs +++ b/src/crates/assembly/core/src/agentic/agents/prompt_builder/prompt_builder_impl.rs @@ -45,6 +45,7 @@ pub struct RemoteExecutionHints { pub struct RuntimeContextNeeds { pub workspace_tools: bool, pub exec_command: bool, + pub exec_control: bool, pub computer_use: bool, } @@ -64,6 +65,9 @@ impl RuntimeContextNeeds { if tool_name == "ExecCommand" { needs.exec_command = true; } + if tool_name == "ExecControl" { + needs.exec_control = true; + } } "ComputerUse" | "ControlHub" => { needs.computer_use = true; @@ -75,7 +79,7 @@ impl RuntimeContextNeeds { } fn is_empty(self) -> bool { - !self.workspace_tools && !self.exec_command && !self.computer_use + !self.workspace_tools && !self.exec_command && !self.exec_control && !self.computer_use } } @@ -302,6 +306,20 @@ impl PromptBuilder { lines.extend(section_lines); } + fn exec_control_runtime_guidance( + host_os: &str, + remote_execution: bool, + exec_control_available: bool, + ) -> Vec { + if !exec_control_available || remote_execution || host_os != "windows" { + return Vec::new(); + } + + vec![ + "- On local Windows ExecCommand sessions, `ExecControl` `interrupt` is effectively the same as `kill` for non-TTY processes.".to_string(), + ] + } + /// Build runtime facts that may change independently from the agent's system prompt. 
pub async fn build_runtime_context_reminder(&self) -> Option { let needs = self.context.runtime_context_needs; @@ -362,6 +380,13 @@ impl PromptBuilder { Self::push_runtime_context_section(&mut lines, "ExecCommand Shell", exec_command_lines); } + let exec_control_lines = Self::exec_control_runtime_guidance( + host_os, + self.context.remote_execution.is_some(), + needs.exec_control, + ); + Self::push_runtime_context_section(&mut lines, "ExecControl", exec_control_lines); + if needs.computer_use { let mut local_client_lines = Vec::new(); if self.context.remote_execution.is_some() && needs.workspace_tools { @@ -880,6 +905,31 @@ mod tests { assert!(PromptBuilder::local_exec_shell_runtime_guidance("bash").is_empty()); } + #[test] + fn exec_control_runtime_guidance_is_added_for_local_windows() { + let guidance = PromptBuilder::exec_control_runtime_guidance("windows", false, true); + + assert_eq!( + guidance, + vec![ + "- On local Windows ExecCommand sessions, `ExecControl` `interrupt` is effectively the same as `kill` for non-TTY processes.".to_string() + ] + ); + } + + #[test] + fn exec_control_runtime_guidance_is_empty_when_exec_control_is_unavailable() { + assert!( + PromptBuilder::exec_control_runtime_guidance("windows", false, false).is_empty() + ); + } + + #[test] + fn exec_control_runtime_guidance_is_empty_for_remote_or_non_windows_hosts() { + assert!(PromptBuilder::exec_control_runtime_guidance("linux", false, true).is_empty()); + assert!(PromptBuilder::exec_control_runtime_guidance("windows", true, true).is_empty()); + } + #[tokio::test] async fn runtime_context_includes_computer_use_info_only_when_needed() { let context = PromptBuilderContext::new(r"workspace\root", None, None) diff --git a/src/crates/assembly/core/src/agentic/coordination/coordinator.rs b/src/crates/assembly/core/src/agentic/coordination/coordinator.rs index 828d7ba0b..3efca2050 100644 --- a/src/crates/assembly/core/src/agentic/coordination/coordinator.rs +++ 
b/src/crates/assembly/core/src/agentic/coordination/coordinator.rs @@ -5577,33 +5577,6 @@ Update the persona files and delete BOOTSTRAP.md as soon as bootstrap is complet &self.session_manager } - /// Persist a completed `/btw` side-question turn into an existing child session. - #[allow(clippy::too_many_arguments)] - pub async fn persist_btw_turn( - &self, - workspace_path: &Path, - child_session_id: &str, - request_id: &str, - question: &str, - full_text: &str, - parent_session_id: &str, - parent_dialog_turn_id: Option<&str>, - parent_turn_index: Option, - ) -> BitFunResult<()> { - self.session_manager - .persist_btw_turn( - workspace_path, - child_session_id, - request_id, - question, - full_text, - parent_session_id, - parent_dialog_turn_id, - parent_turn_index, - ) - .await - } - /// Set global coordinator (called during initialization) /// /// Skips if global coordinator already exists diff --git a/src/crates/assembly/core/src/agentic/execution/execution_engine.rs b/src/crates/assembly/core/src/agentic/execution/execution_engine.rs index fdee0f251..fe868b820 100644 --- a/src/crates/assembly/core/src/agentic/execution/execution_engine.rs +++ b/src/crates/assembly/core/src/agentic/execution/execution_engine.rs @@ -2,6 +2,9 @@ //! //! 
Executes complete dialog turns, managing loops of multiple model rounds +use super::model_exchange_trace::{ + prepare_model_exchange_trace_for_workspace, ModelExchangeTraceOperation, +}; use super::round_executor::RoundExecutor; use super::types::{ExecutionContext, ExecutionResult, RoundContext}; use crate::agentic::agents::{ @@ -23,7 +26,6 @@ use crate::agentic::remote_file_delivery::TOOL_CONTEXT_REMOTE_FILE_DELIVERY_KEY; use crate::agentic::round_preempt::RoundInjectionKind; use crate::agentic::session::{CompressionMode, ContextCompressor, SessionManager}; use crate::agentic::skill_agent_snapshot::build_skill_agent_tool_listing_sections_from_snapshot; -use crate::agentic::tools::framework::ToolExposure; use crate::agentic::tools::implementations::{SkillTool, TaskTool}; use crate::agentic::tools::product_runtime::{ collect_product_unlocked_collapsed_tools, GetToolSpecTool, @@ -38,6 +40,7 @@ use crate::util::token_counter::TokenCounter; use crate::util::types::Message as AIMessage; use crate::util::types::ToolDefinition; use crate::util::{elapsed_ms_u64, truncate_at_char_boundary}; +use bitfun_ai_adapters::ModelExchangeTraceConfig; use log::{debug, error, info, trace, warn}; use sha2::{Digest, Sha256}; use std::collections::{HashMap, HashSet}; @@ -276,6 +279,8 @@ impl ExecutionEngine { /// /// System prompt and tool definitions are fixed per dialog turn and should not /// count against the auto-compression threshold the same way tool results do. + /// Keeping them fixed also preserves provider-side prefix/KV cache reuse; + /// changing tool definitions mid-turn turns every later round into a cache miss. fn estimate_auto_compression_pressure( messages: &[Message], tools: Option<&[ToolDefinition]>, @@ -518,30 +523,6 @@ impl ExecutionEngine { .unwrap_or_else(|| "auto".to_string()) } - /// Replace collapsed stub definitions with their full (expanded) variants - /// while preserving the original tool order and tool set. 
- /// - /// Tools missing from `expanded_definitions` (e.g. GetToolSpec when every - /// collapsed tool got unlocked) keep their original definition; extra tools - /// introduced by the re-resolve are ignored. - fn merge_unlocked_tool_definitions( - base_definitions: &[ToolDefinition], - expanded_definitions: Vec, - ) -> Vec { - let mut expanded_by_name: HashMap = expanded_definitions - .into_iter() - .map(|definition| (definition.name.clone(), definition)) - .collect(); - base_definitions - .iter() - .map(|definition| { - expanded_by_name - .remove(&definition.name) - .unwrap_or_else(|| definition.clone()) - }) - .collect() - } - async fn build_tool_listing_sections( manifest: &ResolvedToolManifest, tool_context: &crate::agentic::tools::framework::ToolUseContext, @@ -1028,7 +1009,7 @@ impl ExecutionEngine { &self, runtime_messages: &[Message], dialog_turn_id: &str, - workspace_path: Option<&Path>, + workspace: Option<&WorkspaceBinding>, provider: &str, attach_images: bool, prepended_prompt_reminders: &PrependedPromptReminders, @@ -1038,7 +1019,7 @@ impl ExecutionEngine { let mut compression_messages = Self::build_ai_messages_for_send( runtime_messages, provider, - workspace_path, + workspace.map(|workspace| workspace.root_path()), dialog_turn_id, attach_images, &prepended_reminders, @@ -1055,6 +1036,7 @@ impl ExecutionEngine { ai_client: Arc, request_messages: Vec, tool_definitions: Option>, + trace_config: Option, max_tries: usize, ) -> BitFunResult { let mut last_error = None; @@ -1062,7 +1044,11 @@ impl ExecutionEngine { for attempt in 0..max_tries { let result = ai_client - .send_message(request_messages.clone(), tool_definitions.clone()) + .send_message_with_trace( + request_messages.clone(), + tool_definitions.clone(), + trace_config.clone(), + ) .await; match result { @@ -1118,17 +1104,18 @@ impl ExecutionEngine { ai_client: Arc, runtime_messages: &[Message], dialog_turn_id: &str, - workspace_path: Option<&Path>, + workspace: Option<&WorkspaceBinding>, 
tool_definitions: &Option>, prepended_prompt_reminders: &PrependedPromptReminders, primary_supports_image_understanding: bool, contract: Option<&crate::agentic::core::CompressionContract>, + trace_config: Option, ) -> BitFunResult> { let request_messages = self .build_compression_request_messages( runtime_messages, dialog_turn_id, - workspace_path, + workspace, &ai_client.config.format, primary_supports_image_understanding, prepended_prompt_reminders, @@ -1141,6 +1128,7 @@ impl ExecutionEngine { ai_client, request_messages, tool_definitions.clone(), + trace_config, 2, ) .await?; @@ -1307,6 +1295,11 @@ impl ExecutionEngine { RuntimeContextNeeds::from_tool_names(manifest.allowed_tool_names.iter()) }) .unwrap_or_default(); + // Snapshot prompt-visible tool definitions once for this turn. Do not + // re-resolve or rewrite them after GetToolSpec unlocks a collapsed tool: + // the unlocked detail travels in tool results, while mutating the tool + // definitions would change the request prefix and trigger provider + // prefix/KV cache misses on subsequent rounds. 
let tool_definitions = tool_manifest.map(|manifest| manifest.tool_definitions); let prompt_context = Self::build_prompt_context( @@ -1358,7 +1351,7 @@ impl ExecutionEngine { prepended_prompt_reminders: &PrependedPromptReminders, primary_supports_image_understanding: bool, compression_contract_limit: usize, - workspace_path: Option<&Path>, + workspace: Option<&WorkspaceBinding>, ) -> BitFunResult)>> { let mut session = self .session_manager @@ -1398,16 +1391,29 @@ impl ExecutionEngine { let compression_contract = self .session_manager .compression_contract_for_session(session_id, compression_contract_limit); + let trace_config = prepare_model_exchange_trace_for_workspace( + session_id, + dialog_turn_id, + workspace, + ModelExchangeTraceOperation { + kind: "context_compression", + id: &compression_id, + trigger: Some("auto"), + }, + ai_client.as_ref(), + ) + .await; let model_summary = match self .generate_compression_model_summary( ai_client, &runtime_messages, dialog_turn_id, - workspace_path, + workspace, tool_definitions, prepended_prompt_reminders, primary_supports_image_understanding, compression_contract.as_ref(), + trace_config, ) .await { @@ -1611,19 +1617,29 @@ impl ExecutionEngine { let compression_contract = self .session_manager .compression_contract_for_session(&session_id, scaffold.compression_contract_limit); + let trace_config = prepare_model_exchange_trace_for_workspace( + &session_id, + &dialog_turn_id, + context.workspace.as_ref(), + ModelExchangeTraceOperation { + kind: "context_compression", + id: &compression_id, + trigger: Some(trigger), + }, + scaffold.ai_client.as_ref(), + ) + .await; let model_summary = match self .generate_compression_model_summary( scaffold.ai_client.clone(), &runtime_messages, &dialog_turn_id, - context - .workspace - .as_ref() - .map(|workspace| workspace.root_path()), + context.workspace.as_ref(), &scaffold.tool_definitions, &scaffold.prepended_prompt_reminders, scaffold.primary_supports_image_understanding, 
compression_contract.as_ref(), + trace_config, ) .await { @@ -1994,6 +2010,19 @@ impl ExecutionEngine { RuntimeContextNeeds::from_tool_names(manifest.allowed_tool_names.iter()) }) .unwrap_or_default(); + // We do not currently keep a session-level cache of resolved tool + // definitions; each turn re-resolves them from the current manifest. + // Expected changes therefore come from user-driven configuration or + // product-version changes, such as: + // - agent_type / mode changes + // - the user editing the enabled tool set for the current agent + // - MCP tool enablement / settings changes + // - a newer product build changing built-in tool definitions + // + // Outside those cases, tool definitions should remain byte-stable + // across the session. Avoid introducing extra turn-to-turn variation: + // it changes the request prefix and causes provider prefix/KV cache + // misses. let (available_tools, tool_definitions) = if let Some(manifest) = tool_manifest { (manifest.allowed_tool_names, Some(manifest.tool_definitions)) } else { @@ -2150,10 +2179,6 @@ impl ExecutionEngine { } } - // Per-turn cache of tool definitions re-resolved after GetToolSpec - // unlocks, keyed by the unlocked tool set of the round it was built for. - let mut unlocked_tool_definitions_cache: Option<(Vec, Vec)> = None; - // Loop to execute model rounds loop { if completed_rounds >= self.config.max_rounds { @@ -2236,10 +2261,7 @@ impl ExecutionEngine { &prepended_prompt_reminders, primary_supports_image_understanding, context_profile_policy.compression_contract_limit, - context - .workspace - .as_ref() - .map(|workspace| workspace.root_path()), + context.workspace.as_ref(), ) .await { @@ -2321,48 +2343,6 @@ impl ExecutionEngine { let unlocked_collapsed_tools = collect_product_unlocked_collapsed_tools(&messages, &collapsed_tools); - // Once GetToolSpec has unlocked a collapsed tool, later rounds must - // declare its real input schema. 
The collapsed stub declares - // `properties: {}` with `additionalProperties: false`, so - // schema-faithful models/providers can only ever emit empty - // arguments for it, which makes the unlocked tool uncallable. - let round_tool_definitions = match ( - &tool_definitions, - unlocked_collapsed_tools.is_empty(), - ) { - (Some(base_definitions), false) => match &unlocked_tool_definitions_cache { - Some((cached_unlocked, cached_definitions)) - if *cached_unlocked == unlocked_collapsed_tools => - { - Some(cached_definitions.clone()) - } - _ => { - let mut exposure_overrides = tool_policy.exposure_overrides.clone(); - for tool_name in &unlocked_collapsed_tools { - exposure_overrides.insert(tool_name.clone(), ToolExposure::Expanded); - } - let unlocked_manifest = resolve_tool_manifest( - &allowed_tools, - &exposure_overrides, - &tool_description_context, - ) - .await; - let merged = Self::merge_unlocked_tool_definitions( - base_definitions, - unlocked_manifest.tool_definitions, - ); - debug!( - "Expanded unlocked collapsed tool definitions for round: round_index={}, unlocked_tools={:?}", - round_index, unlocked_collapsed_tools - ); - unlocked_tool_definitions_cache = - Some((unlocked_collapsed_tools.clone(), merged.clone())); - Some(merged) - } - }, - _ => tool_definitions.clone(), - }; - let round_context = RoundContext { session_id: context.session_id.clone(), subagent_parent_info: context.subagent_parent_info.clone(), @@ -2417,7 +2397,7 @@ impl ExecutionEngine { ai_client.clone(), round_context, ai_messages, - round_tool_definitions, + tool_definitions.clone(), Some(context_window), ) .await?; @@ -2998,66 +2978,6 @@ mod tests { } } - #[test] - fn merge_unlocked_tool_definitions_swaps_stub_and_preserves_order_and_set() { - let stub_schema = json!({ - "type": "object", - "additionalProperties": false, - "properties": {} - }); - let full_schema = json!({ - "type": "object", - "properties": { "query": { "type": "string" } }, - "required": ["query"] - }); - let base = 
vec![ - ToolDefinition { - name: "Read".to_string(), - description: "Read a file".to_string(), - parameters: json!({ "type": "object" }), - }, - ToolDefinition { - name: "WebSearch".to_string(), - description: "THIS TOOL IS COLLAPSED...".to_string(), - parameters: stub_schema, - }, - ToolDefinition { - name: "GetToolSpec".to_string(), - description: "Unlock collapsed tools".to_string(), - parameters: json!({ "type": "object" }), - }, - ]; - // Re-resolved manifest: WebSearch expanded, GetToolSpec dropped - // (no collapsed tools remain), plus an unrelated extra tool. - let expanded = vec![ - ToolDefinition { - name: "WebSearch".to_string(), - description: "Search the web".to_string(), - parameters: full_schema.clone(), - }, - ToolDefinition { - name: "ExtraTool".to_string(), - description: "Should be ignored".to_string(), - parameters: json!({ "type": "object" }), - }, - ]; - - let merged = ExecutionEngine::merge_unlocked_tool_definitions(&base, expanded); - - assert_eq!( - merged - .iter() - .map(|definition| definition.name.as_str()) - .collect::>(), - vec!["Read", "WebSearch", "GetToolSpec"] - ); - assert_eq!(merged[1].description, "Search the web"); - assert_eq!(merged[1].parameters, full_schema); - // GetToolSpec keeps its original definition even though the - // re-resolved manifest no longer contains it. 
- assert_eq!(merged[2].description, "Unlock collapsed tools"); - } - #[test] fn resolve_configured_fast_model_falls_back_to_primary_when_fast_is_stale() { let mut ai_config = AIConfig::default(); diff --git a/src/crates/assembly/core/src/agentic/execution/model_exchange_trace.rs b/src/crates/assembly/core/src/agentic/execution/model_exchange_trace.rs index 64e7d0384..efd266c70 100644 --- a/src/crates/assembly/core/src/agentic/execution/model_exchange_trace.rs +++ b/src/crates/assembly/core/src/agentic/execution/model_exchange_trace.rs @@ -1,6 +1,9 @@ use super::types::RoundContext; +use crate::agentic::WorkspaceBinding; use crate::infrastructure::ai::AIClient; -use crate::service::config::GlobalConfigManager; +use crate::service::config::{ + GlobalConfigManager, ModelExchangeTracingConfig, ModelExchangeTracingMode, +}; use crate::service::workspace_runtime::{ get_workspace_runtime_service_arc, WorkspaceRuntimeContext, }; @@ -29,28 +32,80 @@ struct ModelExchangeTraceRecord { recorded_at: DateTime, session_id: String, turn_id: String, - round_id: String, + operation_kind: String, + operation_id: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + operation_trigger: Option, + capture_mode: ModelExchangeTracingMode, #[serde(default, skip_serializing_if = "Option::is_none")] response: Option, request: ModelExchangeTraceRequestRecord, } +#[derive(Debug, Clone, Copy)] +pub struct ModelExchangeTraceOperation<'a> { + pub kind: &'a str, + pub id: &'a str, + pub trigger: Option<&'a str>, +} + #[derive(Debug, Clone, Serialize, Deserialize)] struct ModelExchangeTraceRequestRecord { provider: String, api_format: String, model_id: String, request_url: String, - body: Value, + #[serde(default, skip_serializing_if = "Option::is_none")] + body: Option, attempt_number: usize, } +#[derive(Debug, Clone, Copy)] +struct ModelExchangeTracePolicy { + mode: ModelExchangeTracingMode, + capture_request_body: bool, + capture_response_text: bool, + capture_reasoning: bool, + 
capture_tool_calls: bool, + capture_usage: bool, + capture_provider_metadata: bool, +} + +impl ModelExchangeTracePolicy { + fn from_config(config: ModelExchangeTracingConfig) -> Option { + match config.mode { + ModelExchangeTracingMode::Off => None, + ModelExchangeTracingMode::Full => Some(Self { + mode: ModelExchangeTracingMode::Full, + capture_request_body: true, + capture_response_text: true, + capture_reasoning: true, + capture_tool_calls: true, + capture_usage: true, + capture_provider_metadata: true, + }), + ModelExchangeTracingMode::UsageOnly => Some(Self { + mode: ModelExchangeTracingMode::UsageOnly, + capture_request_body: false, + capture_response_text: false, + capture_reasoning: false, + capture_tool_calls: false, + capture_usage: true, + capture_provider_metadata: false, + }), + } + } +} + #[derive(Debug)] struct WorkspaceModelExchangeTraceSink { runtime_context: WorkspaceRuntimeContext, + policy: ModelExchangeTracePolicy, session_id: String, turn_id: String, - round_id: String, + operation_kind: String, + operation_id: String, + operation_trigger: Option, provider: String, api_format: String, model_id: String, @@ -60,18 +115,24 @@ struct WorkspaceModelExchangeTraceSink { impl WorkspaceModelExchangeTraceSink { fn new( runtime_context: WorkspaceRuntimeContext, + policy: ModelExchangeTracePolicy, session_id: String, turn_id: String, - round_id: String, + operation_kind: String, + operation_id: String, + operation_trigger: Option, provider: String, api_format: String, model_id: String, ) -> Self { Self { runtime_context, + policy, session_id, turn_id, - round_id, + operation_kind, + operation_id, + operation_trigger, provider, api_format, model_id, @@ -142,11 +203,47 @@ impl WorkspaceModelExchangeTraceSink { }; let mut record = self.read_record(&path).await?; - record.response = Some(response.clone()); + record.response = Some(self.sanitize_response(response)); self.write_record(&path, &record).await?; self.trace_paths.remove(trace_id); Ok(()) } + + fn 
sanitize_response( + &self, + response: &ModelExchangeResponseTrace, + ) -> ModelExchangeResponseTrace { + ModelExchangeResponseTrace { + kind: response.kind.clone(), + assistant_text: self + .policy + .capture_response_text + .then(|| response.assistant_text.clone()) + .flatten(), + thinking: self + .policy + .capture_reasoning + .then(|| response.thinking.clone()) + .flatten(), + tool_calls: self + .policy + .capture_tool_calls + .then(|| response.tool_calls.clone()) + .flatten(), + usage: self + .policy + .capture_usage + .then(|| response.usage.clone()) + .flatten(), + provider_metadata: self + .policy + .capture_provider_metadata + .then(|| response.provider_metadata.clone()) + .flatten(), + partial_recovery_reason: response.partial_recovery_reason.clone(), + error: response.error.clone(), + } + } } #[async_trait] @@ -177,7 +274,11 @@ impl ModelExchangeTraceSink for WorkspaceModelExchangeTraceSink { recorded_at: Utc::now(), session_id: self.session_id.clone(), turn_id: self.turn_id.clone(), - round_id: self.round_id.clone(), + operation_kind: self.operation_kind.clone(), + operation_id: self.operation_id.clone(), + operation_trigger: self.operation_trigger.clone(), + capture_mode: self.policy.mode, + response: None, request: ModelExchangeTraceRequestRecord { provider: self.provider.clone(), api_format: self.api_format.clone(), @@ -186,7 +287,6 @@ impl ModelExchangeTraceSink for WorkspaceModelExchangeTraceSink { body: attempt.request_body.clone(), attempt_number: attempt.attempt_number, }, - response: None, }; if let Err(error) = self.write_record(&path, &record).await { @@ -215,7 +315,7 @@ impl ModelExchangeTraceSink for WorkspaceModelExchangeTraceSink { &handle.trace_id, &ModelExchangeResponseTrace { kind: "error".to_string(), - assistant_text: String::new(), + assistant_text: None, thinking: None, tool_calls: None, usage: None, @@ -252,14 +352,35 @@ pub async fn prepare_model_exchange_trace( round_id: &str, ai_client: &AIClient, ) -> Option { - if 
!model_exchange_trace_enabled().await { + prepare_model_exchange_trace_for_workspace( + &context.session_id, + &context.dialog_turn_id, + context.workspace.as_ref(), + ModelExchangeTraceOperation { + kind: "model_round", + id: round_id, + trigger: None, + }, + ai_client, + ) + .await +} + +pub async fn prepare_model_exchange_trace_for_workspace( + session_id: &str, + turn_id: &str, + workspace: Option<&WorkspaceBinding>, + operation: ModelExchangeTraceOperation<'_>, + ai_client: &AIClient, +) -> Option { + let Some(policy) = current_model_exchange_trace_policy().await else { return None; - } + }; - let Some(workspace) = context.workspace.as_ref() else { + let Some(workspace) = workspace else { debug!( - "Model exchange trace skipped because round has no workspace: session_id={}, turn_id={}", - context.session_id, context.dialog_turn_id + "Model exchange trace skipped because operation has no workspace: session_id={}, turn_id={}, operation_kind={}, operation_id={}", + session_id, turn_id, operation.kind, operation.id ); return None; }; @@ -271,8 +392,8 @@ pub async fn prepare_model_exchange_trace( Ok(result) => result.context, Err(error) => { warn!( - "Model exchange trace skipped because runtime init failed: session_id={}, error={}", - context.session_id, error + "Model exchange trace skipped because runtime init failed: session_id={}, operation_kind={}, operation_id={}, error={}", + session_id, operation.kind, operation.id, error ); return None; } @@ -281,25 +402,30 @@ pub async fn prepare_model_exchange_trace( Some(ModelExchangeTraceConfig { sink: Arc::new(WorkspaceModelExchangeTraceSink::new( runtime_context, - context.session_id.clone(), - context.dialog_turn_id.clone(), - round_id.to_string(), + policy, + session_id.to_string(), + turn_id.to_string(), + operation.kind.to_string(), + operation.id.to_string(), + operation.trigger.map(str::to_string), ai_client.config.format.clone(), ai_client.config.format.clone(), ai_client.config.model.clone(), )), + 
capture_request_body: policy.capture_request_body, }) } -async fn model_exchange_trace_enabled() -> bool { +async fn current_model_exchange_trace_policy() -> Option { let Ok(config_service) = GlobalConfigManager::get_service().await else { - return false; + return None; }; - config_service - .get_config(Some("app.logging.model_exchange_trace")) + let tracing_config: ModelExchangeTracingConfig = config_service + .get_config(Some("app.logging.model_exchange_tracing")) .await - .unwrap_or(false) + .unwrap_or_default(); + ModelExchangeTracePolicy::from_config(tracing_config) } async fn detect_last_sequence(session_dir: &Path) -> Result { diff --git a/src/crates/assembly/core/src/agentic/execution/round_executor.rs b/src/crates/assembly/core/src/agentic/execution/round_executor.rs index 9035b6b9c..24f452368 100644 --- a/src/crates/assembly/core/src/agentic/execution/round_executor.rs +++ b/src/crates/assembly/core/src/agentic/execution/round_executor.rs @@ -45,6 +45,16 @@ impl RoundExecutor { !text.trim().is_empty() } + async fn sleep_with_cancellation( + delay_ms: u64, + cancel_token: &CancellationToken, + ) -> BitFunResult<()> { + tokio::select! { + _ = cancel_token.cancelled() => Err(BitFunError::Cancelled("Execution cancelled".to_string())), + _ = tokio::time::sleep(Duration::from_millis(delay_ms)) => Ok(()), + } + } + pub fn new( stream_processor: Arc, event_queue: Arc, @@ -130,16 +140,19 @@ impl RoundExecutor { attempt_index + 1, max_attempts ); - // Use dynamically obtained client for call - let (stream_response, send_to_stream_ms) = match ai_client - .send_message_stream( - ai_messages.clone(), - tool_definitions.clone(), - trace_config.clone(), - ) - .await - { + let send_future = ai_client.send_message_stream( + ai_messages.clone(), + tool_definitions.clone(), + trace_config.clone(), + ); + let send_result = tokio::select! 
{ + _ = cancel_token.cancelled() => { + return Err(BitFunError::Cancelled("Execution cancelled".to_string())); + } + result = send_future => result, + }; + let (stream_response, send_to_stream_ms) = match send_result { Ok(response) => { let send_to_stream_ms = elapsed_ms_u64(request_started_at); debug!( @@ -168,7 +181,11 @@ impl RoundExecutor { delay_ms, err_msg ); - tokio::time::sleep(Duration::from_millis(delay_ms)).await; + if let Err(cancel_err) = + Self::sleep_with_cancellation(delay_ms, &cancel_token).await + { + return Err(cancel_err); + } attempt_index += 1; continue; } @@ -260,7 +277,11 @@ impl RoundExecutor { .count(), err_msg ); - tokio::time::sleep(Duration::from_millis(delay_ms)).await; + if let Err(cancel_err) = + Self::sleep_with_cancellation(delay_ms, &cancel_token).await + { + return Err(cancel_err); + } attempt_index += 1; continue; } @@ -347,7 +368,11 @@ impl RoundExecutor { result.tool_calls.len(), partial_recovery_reason ); - tokio::time::sleep(Duration::from_millis(delay_ms)).await; + if let Err(cancel_err) = + Self::sleep_with_cancellation(delay_ms, &cancel_token).await + { + return Err(cancel_err); + } attempt_index += 1; continue; } @@ -375,7 +400,11 @@ impl RoundExecutor { delay_ms, result.tool_calls.len() ); - tokio::time::sleep(Duration::from_millis(delay_ms)).await; + if let Err(cancel_err) = + Self::sleep_with_cancellation(delay_ms, &cancel_token).await + { + return Err(cancel_err); + } attempt_index += 1; continue; } @@ -422,7 +451,11 @@ impl RoundExecutor { max_attempts, delay_ms ); - tokio::time::sleep(Duration::from_millis(delay_ms)).await; + if let Err(cancel_err) = + Self::sleep_with_cancellation(delay_ms, &cancel_token).await + { + return Err(cancel_err); + } attempt_index += 1; continue; } @@ -470,7 +503,11 @@ impl RoundExecutor { delay_ms, err_msg ); - tokio::time::sleep(Duration::from_millis(delay_ms)).await; + if let Err(cancel_err) = + Self::sleep_with_cancellation(delay_ms, &cancel_token).await + { + return 
Err(cancel_err); + } attempt_index += 1; continue; } @@ -1038,7 +1075,7 @@ impl RoundExecutor { partial_recovery_reason, ) = if let Some(result) = result { ( - result.full_text.clone(), + Some(result.full_text.clone()), Self::stream_result_reasoning(result), serde_json::to_value(&result.tool_calls).ok(), result @@ -1049,7 +1086,7 @@ impl RoundExecutor { result.partial_recovery_reason.clone(), ) } else { - (String::new(), None, None, None, None, None) + (None, None, None, None, None, None) }; ModelExchangeResponseTrace { @@ -1217,12 +1254,14 @@ mod tests { use crate::agentic::execution::stream_processor::StreamResult; use crate::agentic::execution::types::RoundContext; use crate::agentic::tools::ToolRuntimeRestrictions; + use crate::util::errors::BitFunError; use crate::util::types::ai::GeminiUsage; use bitfun_runtime_ports::DelegationPolicy; use dashmap::DashMap; use serde_json::json; use std::collections::HashMap; use std::sync::Arc; + use std::time::Duration; use tokio_util::sync::CancellationToken; fn test_round_executor() -> RoundExecutor { @@ -1324,6 +1363,31 @@ mod tests { ))); } + #[tokio::test] + async fn cancellable_sleep_returns_cancelled_when_token_fires() { + let token = CancellationToken::new(); + let token_for_task = token.clone(); + + let waiter = tokio::spawn(async move { + RoundExecutor::sleep_with_cancellation(5_000, &token_for_task).await + }); + + tokio::time::sleep(Duration::from_millis(20)).await; + token.cancel(); + + let result = waiter.await.expect("sleep task should join"); + assert!(matches!(result, Err(BitFunError::Cancelled(_)))); + } + + #[tokio::test] + async fn cancellable_sleep_completes_normally_without_cancel() { + let token = CancellationToken::new(); + + let result = RoundExecutor::sleep_with_cancellation(10, &token).await; + + assert!(result.is_ok()); + } + #[test] fn token_details_emits_both_cache_keys_when_present() { use crate::util::types::ai::GeminiUsage; @@ -1426,7 +1490,7 @@ mod tests { trace.error.as_deref(), 
Some("Provider returned only invalid tool arguments") ); - assert_eq!(trace.assistant_text, ""); + assert_eq!(trace.assistant_text.as_deref(), Some("")); assert_eq!(trace.thinking.as_deref(), Some("reasoning")); assert_eq!( trace.partial_recovery_reason.as_deref(), @@ -1463,7 +1527,7 @@ mod tests { let trace = RoundExecutor::error_trace_response("error", "request failed".to_string()); assert_eq!(trace.kind, "error"); - assert_eq!(trace.assistant_text, ""); + assert!(trace.assistant_text.is_none()); assert!(trace.thinking.is_none()); assert!(trace.tool_calls.is_none()); assert!(trace.usage.is_none()); diff --git a/src/crates/assembly/core/src/agentic/session/session_manager.rs b/src/crates/assembly/core/src/agentic/session/session_manager.rs index d3d1659b3..e288ae300 100644 --- a/src/crates/assembly/core/src/agentic/session/session_manager.rs +++ b/src/crates/assembly/core/src/agentic/session/session_manager.rs @@ -3945,151 +3945,6 @@ impl SessionManager { Ok(()) } - /// Persist a completed `/btw` side-question turn into an existing child session. 
- #[allow(clippy::too_many_arguments)] - pub async fn persist_btw_turn( - &self, - workspace_path: &Path, - child_session_id: &str, - request_id: &str, - question: &str, - full_text: &str, - parent_session_id: &str, - parent_dialog_turn_id: Option<&str>, - parent_turn_index: Option, - ) -> BitFunResult<()> { - let session = self.sessions.get(child_session_id).ok_or_else(|| { - BitFunError::NotFound(format!("Session not found: {}", child_session_id)) - })?; - let turn_id = format!("btw-turn-{}", request_id); - let turn_index = session - .dialog_turn_ids - .iter() - .position(|existing| existing == &turn_id) - .unwrap_or(session.dialog_turn_ids.len()); - - let user_message_id = format!("btw-user-{}", request_id); - let round_id = format!("btw-round-{}", request_id); - let text_id = format!("btw-text-{}", request_id); - let now = SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_millis() as u64; - - let mut turn = DialogTurnData::new( - turn_id.clone(), - turn_index, - child_session_id.to_string(), - UserMessageData { - id: user_message_id, - content: question.to_string(), - timestamp: now, - metadata: Some(json!({ - "kind": "btw", - "parentSessionId": parent_session_id, - "parentRequestId": request_id, - "parentDialogTurnId": parent_dialog_turn_id, - "parentTurnIndex": parent_turn_index, - })), - }, - ); - turn.timestamp = now; - turn.start_time = now; - turn.end_time = Some(now); - turn.duration_ms = Some(0); - turn.status = TurnStatus::Completed; - turn.model_rounds = vec![ModelRoundData { - id: round_id, - turn_id: turn_id.clone(), - round_index: 0, - timestamp: now, - text_items: vec![TextItemData { - id: text_id, - content: full_text.to_string(), - is_streaming: false, - timestamp: now, - is_markdown: true, - order_index: None, - is_subagent_item: None, - parent_task_tool_id: None, - subagent_session_id: None, - status: Some("completed".to_string()), - }], - tool_items: vec![], - thinking_items: vec![], - start_time: now, - 
end_time: Some(now), - duration_ms: Some(0), - provider_id: None, - model_id: None, - model_alias: None, - first_chunk_ms: None, - first_visible_output_ms: None, - stream_duration_ms: None, - attempt_count: None, - failure_category: None, - token_details: None, - status: "completed".to_string(), - }]; - - drop(session); - - // Persist the turn to disk - self.persistence_manager - .save_dialog_turn(workspace_path, &turn) - .await?; - - // Sync messages to the in-memory caches so subsequent turns can access context. - let user_message = Message::user(question.to_string()) - .with_turn_id(turn_id.clone()) - .with_semantic_kind(MessageSemanticKind::ActualUserInput); - let assistant_message = - Message::assistant(full_text.to_string()).with_turn_id(turn_id.clone()); - - // Add to the in-memory runtime context cache. - self.context_store - .add_message(child_session_id, user_message); - self.context_store - .add_message(child_session_id, assistant_message); - - // IMPORTANT: keep the DashMap guard scope short -- do NOT hold it across .await. - let session_snapshot = if let Some(mut session) = self.sessions.get_mut(child_session_id) { - if !session - .dialog_turn_ids - .iter() - .any(|existing| existing == &turn_id) - { - session.dialog_turn_ids.push(turn_id); - } - session.updated_at = SystemTime::now(); - session.last_activity_at = SystemTime::now(); - - if self.config.enable_persistence && Self::should_persist_session(&session) { - Some(session.clone()) - } else { - None - } - } else { - None - }; - // RefMut guard released here -- DashMap shard lock is free. - - if let Some(session) = session_snapshot { - self.persistence_manager - .save_session(workspace_path, &session) - .await?; - } - - self.persist_context_snapshot_for_turn_best_effort( - child_session_id, - turn_index, - "btw_turn_persisted", - ) - .await; - - Ok(()) - } - // ============ Helper Methods ============ /// Get a best-effort message view for the session. 
diff --git a/src/crates/assembly/core/src/agentic/tools/implementations/ask_user_question_tool.rs b/src/crates/assembly/core/src/agentic/tools/implementations/ask_user_question_tool.rs index 4753278f5..0874c87e8 100644 --- a/src/crates/assembly/core/src/agentic/tools/implementations/ask_user_question_tool.rs +++ b/src/crates/assembly/core/src/agentic/tools/implementations/ask_user_question_tool.rs @@ -135,14 +135,14 @@ Usage notes: }, "multiSelect": { "type": "boolean", - "description": "Set to true to allow the user to select multiple options instead of just one. Use when choices are not mutually exclusive." + "default": false, + "description": "Optional. Defaults to false. Set to true to allow the user to select multiple options instead of just one. Use when choices are not mutually exclusive." } }, "required": [ "question", "header", - "options", - "multiSelect" + "options" ], "additionalProperties": false }, @@ -294,4 +294,20 @@ mod tests { assert!(tool.is_available_in_context(Some(&context)).await); } + + #[test] + fn ask_user_question_schema_defaults_multi_select_to_false() { + let schema = AskUserQuestionTool::new().input_schema(); + let question_schema = &schema["properties"]["questions"]["items"]; + + assert_eq!( + question_schema["properties"]["multiSelect"]["default"], + false + ); + assert!(!question_schema["required"] + .as_array() + .expect("required array") + .iter() + .any(|value| value == "multiSelect")); + } } diff --git a/src/crates/assembly/core/src/agentic/tools/implementations/exec_command/command.rs b/src/crates/assembly/core/src/agentic/tools/implementations/exec_command/command.rs index 2b5ea700d..a4f747e10 100644 --- a/src/crates/assembly/core/src/agentic/tools/implementations/exec_command/command.rs +++ b/src/crates/assembly/core/src/agentic/tools/implementations/exec_command/command.rs @@ -29,9 +29,9 @@ use terminal_core::{ }; use tokio::sync::mpsc; -const DEFAULT_MAX_OUTPUT_CHARS: u64 = 10_000; const REMOTE_SHELL_PROBE_TIMEOUT_MS: u64 = 
3_000; const REMOTE_NON_TTY_INTERRUPT_GRACE_SECONDS: u64 = 2; +const DEFAULT_TOOL_YIELD_TIME_MS: u64 = 30_000; const POWERSHELL_UTF8_OUTPUT_PREFIX: &str = "[Console]::OutputEncoding=[System.Text.Encoding]::UTF8;\n"; @@ -615,13 +615,10 @@ exit "$__bitfun_status""# "remote SSH manager is not initialized for ExecCommand".to_string(), ) })?; - let yield_time_ms = input.get("yield_time_ms").and_then(Value::as_u64); - let max_output_chars = input - .get("max_output_chars") + let yield_time_ms = input + .get("yield_time_ms") .and_then(Value::as_u64) - .unwrap_or(DEFAULT_MAX_OUTPUT_CHARS) - .try_into() - .unwrap_or(usize::MAX); + .unwrap_or(DEFAULT_TOOL_YIELD_TIME_MS); let shell = Self::resolve_remote_shell(&ssh_manager, &connection_id).await; let env_snapshot = remote_env_snapshot_for( ssh_manager.clone(), @@ -663,8 +660,8 @@ exit "$__bitfun_status""# connection_id, command, tty, - yield_time_ms, - max_output_chars: Some(max_output_chars), + yield_time_ms: Some(yield_time_ms), + max_output_chars: None, lifecycle_tx: Self::start_remote_lifecycle_bridge(context, self.name()), output_capture_tx, }; @@ -820,11 +817,7 @@ Output: }, "yield_time_ms": { "type": "number", - "description": "How long to wait for output before yielding." - }, - "max_output_chars": { - "type": "number", - "description": "Maximum output characters to return. Defaults to 10000; excess output keeps head and tail." + "description": "How long to wait for output before yielding. Defaults to 30000 ms." 
} }, "required": ["cmd"], @@ -886,13 +879,10 @@ Output: let workdir = Self::resolve_workdir(input, context)?; let tty = input.get("tty").and_then(Value::as_bool).unwrap_or(false); let shell = resolve_local_exec_shell().await; - let yield_time_ms = input.get("yield_time_ms").and_then(Value::as_u64); - let max_output_chars = input - .get("max_output_chars") + let yield_time_ms = input + .get("yield_time_ms") .and_then(Value::as_u64) - .unwrap_or(DEFAULT_MAX_OUTPUT_CHARS) - .try_into() - .unwrap_or(usize::MAX); + .unwrap_or(DEFAULT_TOOL_YIELD_TIME_MS); let output_capture_tx = if let Some(capture_id) = context.tool_call_id.as_ref() { Some( background_command_output_capture() @@ -915,8 +905,8 @@ Output: cwd: workdir.clone(), env: Self::command_env(), tty, - yield_time_ms, - max_output_chars: Some(max_output_chars), + yield_time_ms: Some(yield_time_ms), + max_output_chars: None, lifecycle_tx: Self::start_local_lifecycle_bridge(context, self.name()), output_capture_tx, }; diff --git a/src/crates/assembly/core/src/agentic/tools/implementations/exec_command/control.rs b/src/crates/assembly/core/src/agentic/tools/implementations/exec_command/control.rs index f35297b4e..53f04ee08 100644 --- a/src/crates/assembly/core/src/agentic/tools/implementations/exec_command/control.rs +++ b/src/crates/assembly/core/src/agentic/tools/implementations/exec_command/control.rs @@ -14,8 +14,6 @@ use terminal_core::{ LocalExecSessionCompletionStatus, TerminalError, }; -const DEFAULT_MAX_OUTPUT_CHARS: u64 = 10_000; - // ExecControl termination semantics by execution surface: // // Local workspace: @@ -77,7 +75,6 @@ pub struct ExecCommandControlRequest { pub origin: ExecCommandControlOrigin, pub remote: bool, pub yield_time_ms: Option, - pub max_output_chars: Option, } #[derive(Debug, Clone)] @@ -112,7 +109,7 @@ pub async fn control_exec_command_session( action: ExecControlTool::remote_action(request.action), origin: ExecControlTool::remote_origin(request.origin), yield_time_ms: 
request.yield_time_ms, - max_output_chars: request.max_output_chars, + max_output_chars: None, }) .await .map_err(|error| match error { @@ -143,7 +140,7 @@ pub async fn control_exec_command_session( action: ExecControlTool::local_action(request.action), origin: ExecControlTool::local_origin(request.origin), yield_time_ms: request.yield_time_ms, - max_output_chars: request.max_output_chars, + max_output_chars: None, }) .await .map_err(|error| match error { @@ -323,20 +320,12 @@ impl ExecControlTool { BitFunError::tool("action must be either 'interrupt' or 'kill'".to_string()) })?; let yield_time_ms = input.get("yield_time_ms").and_then(Value::as_u64); - let max_output_chars = input - .get("max_output_chars") - .and_then(Value::as_u64) - .unwrap_or(DEFAULT_MAX_OUTPUT_CHARS) - .try_into() - .unwrap_or(usize::MAX); - let response = match control_exec_command_session(ExecCommandControlRequest { session_id, action, origin: ExecCommandControlOrigin::ModelTool, remote: true, yield_time_ms, - max_output_chars: Some(max_output_chars), }) .await { @@ -406,11 +395,7 @@ Output is only what was produced during this tool call's wait window."# }, "yield_time_ms": { "type": "number", - "description": "How long to wait for output after the control action before yielding." - }, - "max_output_chars": { - "type": "number", - "description": "Maximum output characters to return. Defaults to 10000; excess output keeps head and tail." + "description": "How long to wait for output after the control action before yielding. Defaults to 10000 ms." 
} }, "required": ["session_id", "action"], @@ -479,20 +464,12 @@ Output is only what was produced during this tool call's wait window."# BitFunError::tool("action must be either 'interrupt' or 'kill'".to_string()) })?; let yield_time_ms = input.get("yield_time_ms").and_then(Value::as_u64); - let max_output_chars = input - .get("max_output_chars") - .and_then(Value::as_u64) - .unwrap_or(DEFAULT_MAX_OUTPUT_CHARS) - .try_into() - .unwrap_or(usize::MAX); - let response = match control_exec_command_session(ExecCommandControlRequest { session_id, action, origin: ExecCommandControlOrigin::ModelTool, remote: false, yield_time_ms, - max_output_chars: Some(max_output_chars), }) .await { @@ -574,7 +551,6 @@ mod tests { origin: ExecCommandControlOrigin::ModelTool, remote: false, yield_time_ms: Some(0), - max_output_chars: Some(1), }) .await .expect_err("missing session should be structured"); diff --git a/src/crates/assembly/core/src/agentic/tools/implementations/exec_command/stdin.rs b/src/crates/assembly/core/src/agentic/tools/implementations/exec_command/stdin.rs index d632a6f2f..e91c41158 100644 --- a/src/crates/assembly/core/src/agentic/tools/implementations/exec_command/stdin.rs +++ b/src/crates/assembly/core/src/agentic/tools/implementations/exec_command/stdin.rs @@ -13,7 +13,7 @@ use terminal_core::{ LocalExecSessionCompletionStatus, LocalWriteStdinRequest, TerminalError, }; -const DEFAULT_MAX_OUTPUT_CHARS: u64 = 10_000; +const DEFAULT_TOOL_YIELD_TIME_MS: u64 = 30_000; pub struct WriteStdinTool; @@ -143,20 +143,16 @@ impl WriteStdinTool { .get("append_enter") .and_then(Value::as_bool) .unwrap_or(false); - let yield_time_ms = input.get("yield_time_ms").and_then(Value::as_u64); - let max_output_chars = input - .get("max_output_chars") + let yield_time_ms = input + .get("yield_time_ms") .and_then(Value::as_u64) - .unwrap_or(DEFAULT_MAX_OUTPUT_CHARS) - .try_into() - .unwrap_or(usize::MAX); - + .unwrap_or(DEFAULT_TOOL_YIELD_TIME_MS); let request = RemoteWriteStdinRequest { 
session_id, chars, append_enter, - yield_time_ms, - max_output_chars: Some(max_output_chars), + yield_time_ms: Some(yield_time_ms), + max_output_chars: None, }; let progress_bridge = ExecOutputProgressBridge::start(context, self.name()); let response_result = if let Some(bridge) = progress_bridge.as_ref() { @@ -237,11 +233,7 @@ Output is only what was produced during this tool call's wait window."# }, "yield_time_ms": { "type": "number", - "description": "How long to wait for output before yielding." - }, - "max_output_chars": { - "type": "number", - "description": "Maximum output characters to return. Defaults to 10000; excess output keeps head and tail." + "description": "How long to wait for output before yielding. Defaults to 30000 ms." } }, "required": ["session_id"], @@ -303,20 +295,16 @@ Output is only what was produced during this tool call's wait window."# .get("append_enter") .and_then(Value::as_bool) .unwrap_or(false); - let yield_time_ms = input.get("yield_time_ms").and_then(Value::as_u64); - let max_output_chars = input - .get("max_output_chars") + let yield_time_ms = input + .get("yield_time_ms") .and_then(Value::as_u64) - .unwrap_or(DEFAULT_MAX_OUTPUT_CHARS) - .try_into() - .unwrap_or(usize::MAX); - + .unwrap_or(DEFAULT_TOOL_YIELD_TIME_MS); let request = LocalWriteStdinRequest { session_id, chars, append_enter, - yield_time_ms, - max_output_chars: Some(max_output_chars), + yield_time_ms: Some(yield_time_ms), + max_output_chars: None, }; let progress_bridge = ExecOutputProgressBridge::start(context, self.name()); let response_result = if let Some(bridge) = progress_bridge.as_ref() { diff --git a/src/crates/assembly/core/src/agentic/tools/implementations/task_tool.rs b/src/crates/assembly/core/src/agentic/tools/implementations/task_tool.rs index d064ec65a..f520d305c 100644 --- a/src/crates/assembly/core/src/agentic/tools/implementations/task_tool.rs +++ b/src/crates/assembly/core/src/agentic/tools/implementations/task_tool.rs @@ -467,8 +467,13 @@ impl 
Tool for TaskTool { Ok(self.render_description()) } - async fn is_available_in_context(&self, context: Option<&ToolUseContext>) -> bool { - !Self::get_enabled_agents(context).await.is_empty() + async fn is_available_in_context(&self, _context: Option<&ToolUseContext>) -> bool { + // Keep Task prompt-visible even when no fresh subagents are currently + // available. Hiding it based on transient subagent availability makes + // the tool manifest drift across turns and causes provider prefix/KV + // cache misses. Task also still supports `fork_context=true` in that + // state, so removing it from the manifest would be behaviorally wrong. + true } fn short_description(&self) -> String { @@ -1690,6 +1695,14 @@ mod tests { .is_some_and(|message| message.contains("subagent_type is required"))); } + #[tokio::test] + async fn task_tool_stays_available_without_enabled_subagents() { + assert!( + TaskTool::new().is_available_in_context(None).await, + "Task should remain prompt-visible even when no fresh subagents are currently available" + ); + } + #[tokio::test] async fn validate_input_rejects_fork_context_conflicting_fields() { let validation = TaskTool::new() diff --git a/src/crates/assembly/core/src/agentic/tools/product_runtime/catalog.rs b/src/crates/assembly/core/src/agentic/tools/product_runtime/catalog.rs index e7ef729a6..d01ac01f3 100644 --- a/src/crates/assembly/core/src/agentic/tools/product_runtime/catalog.rs +++ b/src/crates/assembly/core/src/agentic/tools/product_runtime/catalog.rs @@ -580,12 +580,12 @@ mod tests { .unwrap_or_else(|| panic!("{tool_name} stub should exist")); assert!( stub.description.contains(&format!( - "THIS TOOL IS COLLAPSED. You MUST call GetToolSpec({{\"tool_name\":\"{tool_name}\"}}) before first calling {tool_name}." + "THIS IS A COLLAPSED TOOL. Before first use, call GetToolSpec({{\"tool_name\":\"{tool_name}\"}}) to load its schema." 
)), "collapsed stub must point to the explicit GetToolSpec unlock flow" ); assert_eq!(stub.parameters["type"], json!("object")); - assert_eq!(stub.parameters["additionalProperties"], json!(false)); + assert_eq!(stub.parameters["additionalProperties"], json!(true)); assert_eq!(stub.parameters["properties"], json!({})); } } diff --git a/src/crates/assembly/core/src/service/config/types.rs b/src/crates/assembly/core/src/service/config/types.rs index db90a1411..612f8bf1a 100644 --- a/src/crates/assembly/core/src/service/config/types.rs +++ b/src/crates/assembly/core/src/service/config/types.rs @@ -134,9 +134,24 @@ pub struct AppLoggingConfig { /// Whether diagnostic logs may include sensitive troubleshooting payloads. #[serde(default = "default_true")] pub include_sensitive_diagnostics: bool, - /// Whether to persist per-request AI model exchange traces for developer diagnostics. + /// Per-request AI model exchange tracing configuration for developer diagnostics. #[serde(default)] - pub model_exchange_trace: bool, + pub model_exchange_tracing: ModelExchangeTracingConfig, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] +#[serde(rename_all = "snake_case")] +pub enum ModelExchangeTracingMode { + #[default] + Off, + Full, + UsageOnly, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct ModelExchangeTracingConfig { + pub mode: ModelExchangeTracingMode, } /// Session-related UI preferences. 
@@ -1344,7 +1359,15 @@ impl Default for AppLoggingConfig { // Set to Debug in early development for easier diagnostics level: "debug".to_string(), include_sensitive_diagnostics: true, - model_exchange_trace: false, + model_exchange_tracing: ModelExchangeTracingConfig::default(), + } + } +} + +impl Default for ModelExchangeTracingConfig { + fn default() -> Self { + Self { + mode: ModelExchangeTracingMode::Off, } } } @@ -1782,7 +1805,8 @@ impl AIModelConfig { #[cfg(test)] mod tests { use super::{ - AIConfig, AIExperienceConfig, AIModelConfig, AppLoggingConfig, GlobalConfig, ReasoningMode, + AIConfig, AIExperienceConfig, AIModelConfig, AppLoggingConfig, GlobalConfig, + ModelExchangeTracingMode, ReasoningMode, }; #[test] @@ -2072,7 +2096,10 @@ mod tests { .expect("logging config without sensitive preference should deserialize"); assert!(config.include_sensitive_diagnostics); - assert!(!config.model_exchange_trace); + assert_eq!( + config.model_exchange_tracing.mode, + ModelExchangeTracingMode::Off + ); } #[test] diff --git a/src/crates/execution/agent-runtime/src/user_questions.rs b/src/crates/execution/agent-runtime/src/user_questions.rs index 09744da7e..be036ab6f 100644 --- a/src/crates/execution/agent-runtime/src/user_questions.rs +++ b/src/crates/execution/agent-runtime/src/user_questions.rs @@ -14,7 +14,7 @@ pub struct Question { pub question: String, pub header: String, pub options: Vec, - #[serde(rename = "multiSelect")] + #[serde(rename = "multiSelect", default)] pub multi_select: bool, } diff --git a/src/crates/execution/agent-runtime/tests/user_question_tool_contracts.rs b/src/crates/execution/agent-runtime/tests/user_question_tool_contracts.rs index 6cb199437..36bf4ded6 100644 --- a/src/crates/execution/agent-runtime/tests/user_question_tool_contracts.rs +++ b/src/crates/execution/agent-runtime/tests/user_question_tool_contracts.rs @@ -98,3 +98,20 @@ fn ask_user_question_answered_and_cancelled_results_keep_wire_shape() { "User input request was cancelled." 
); } + +#[test] +fn ask_user_question_input_defaults_multi_select_to_false_when_omitted() { + let input: AskUserQuestionInput = serde_json::from_value(serde_json::json!({ + "questions": [{ + "question": "Which path should be used?", + "header": "Path", + "options": [ + { "label": "A", "description": "Use A" }, + { "label": "B", "description": "Use B" } + ] + }] + })) + .expect("input without multiSelect should deserialize"); + + assert!(!input.questions[0].multi_select); +} diff --git a/src/crates/execution/tool-contracts/src/execution_gate.rs b/src/crates/execution/tool-contracts/src/execution_gate.rs index dae7b4c28..a57a9767a 100644 --- a/src/crates/execution/tool-contracts/src/execution_gate.rs +++ b/src/crates/execution/tool-contracts/src/execution_gate.rs @@ -12,6 +12,11 @@ pub const TOOL_CALL_LOOP_THRESHOLD: usize = 3; /// Bounded per-session history window for loop detection. pub const TOOL_CALL_HISTORY_WINDOW: usize = 10; +// WriteStdin is commonly used as a pure polling primitive for long-running +// ExecCommand sessions, so repeated identical calls are a legitimate way to +// wait for more output rather than evidence of a stuck tool loop. 
+const TOOL_CALL_LOOP_EXEMPT_TOOL_NAMES: &[&str] = &["WriteStdin"]; + #[derive(Debug, Clone)] struct RecentToolCall { tool_name: String, @@ -52,6 +57,7 @@ impl ToolCallLoopDecision { impl ToolCallLoopHistory { pub fn check_and_record(&mut self, tool_name: &str, arguments: &Value) -> ToolCallLoopDecision { + let is_exempt = TOOL_CALL_LOOP_EXEMPT_TOOL_NAMES.contains(&tool_name); let identical_priors = self .entries .iter() @@ -59,7 +65,7 @@ impl ToolCallLoopHistory { .take(TOOL_CALL_LOOP_THRESHOLD) .take_while(|past| past.tool_name == tool_name && &past.arguments == arguments) .count(); - let is_loop = identical_priors >= TOOL_CALL_LOOP_THRESHOLD; + let is_loop = !is_exempt && identical_priors >= TOOL_CALL_LOOP_THRESHOLD; self.entries.push_back(RecentToolCall { tool_name: tool_name.to_string(), diff --git a/src/crates/execution/tool-contracts/src/framework.rs b/src/crates/execution/tool-contracts/src/framework.rs index d7e3371d7..03ae6de4b 100644 --- a/src/crates/execution/tool-contracts/src/framework.rs +++ b/src/crates/execution/tool-contracts/src/framework.rs @@ -306,17 +306,25 @@ pub fn build_collapsed_tool_stub_definition( tool_name: &str, short_description: &str, ) -> ToolManifestDefinition { + // Keep the prompt-visible stub stable for the life of the conversation. + // GetToolSpec returns the full schema out-of-band; replacing this stub with + // a different tool definition mid-session changes the request prefix and + // causes provider-side prefix/KV cache misses on later rounds. + // We still need a stub definition in the request because some providers + // constrain model tool calls to the exact tool list attached to that + // request. Without a prompt-visible stub entry, the model may be unable to + // call the collapsed tool at all, even after GetToolSpec has described it. ToolManifestDefinition::new( tool_name, format!( - "THIS TOOL IS COLLAPSED. You MUST call GetToolSpec({{\"tool_name\":\"{}\"}}) before first calling {}. 
Any direct call will fail validation. Summary: {}", + "THIS IS A COLLAPSED TOOL. Before first use, call GetToolSpec({{\"tool_name\":\"{}\"}}) to load its schema. After that, you can call {} directly. Any direct call before loading will fail validation.\nSummary: {}", tool_name, tool_name, short_description, ), serde_json::json!({ "type": "object", - "additionalProperties": false, + "additionalProperties": true, "properties": {} }), ) @@ -1091,6 +1099,10 @@ where }); } + // This prompt-visible tool-definition list is part of the request prefix. + // Once a turn starts, enrich collapsed tools through GetToolSpec results + // instead of mutating this list, or later rounds will lose prefix-cache + // reuse even if the actual tool set is unchanged. let tool_definitions = build_prompt_visible_tool_manifest_definitions(&manifest_items); ContextualToolManifest { diff --git a/src/crates/execution/tool-contracts/tests/tool_contracts.rs b/src/crates/execution/tool-contracts/tests/tool_contracts.rs index c519eedf8..bff7864d5 100644 --- a/src/crates/execution/tool-contracts/tests/tool_contracts.rs +++ b/src/crates/execution/tool-contracts/tests/tool_contracts.rs @@ -927,6 +927,19 @@ fn tool_call_loop_history_blocks_fourth_identical_call_and_keeps_recovery_messag ); } +#[test] +fn tool_call_loop_history_exempts_write_stdin() { + let mut history = ToolCallLoopHistory::default(); + let args = json!({ "session_id": 123, "chars": "", "append_enter": false }); + + for _ in 0..8 { + assert!( + history.check_and_record("WriteStdin", &args).is_allowed(), + "WriteStdin should stay exempt from identical-call loop blocking" + ); + } +} + #[test] fn tool_execution_admission_gate_preserves_pipeline_rejection_order() { let mut restrictions = ToolRuntimeRestrictions::default(); @@ -1256,7 +1269,7 @@ fn collapsed_tool_stub_definition_preserves_prompt_visible_guardrail() { stub.parameters, json!({ "type": "object", - "additionalProperties": false, + "additionalProperties": true, "properties": {} 
}) ); @@ -2157,7 +2170,7 @@ async fn contextual_manifest_resolver_preserves_runtime_visible_manifest_contrac assert!(web_fetch .description .contains("THIS TOOL IS COLLAPSED. You MUST call GetToolSpec({\"tool_name\":\"WebFetch\"}) before first calling WebFetch.")); - assert_eq!(web_fetch.parameters["additionalProperties"], false); + assert_eq!(web_fetch.parameters["additionalProperties"], true); assert_eq!(web_fetch.parameters["properties"], json!({})); } diff --git a/src/crates/services/services-integrations/src/remote_ssh/remote_exec.rs b/src/crates/services/services-integrations/src/remote_ssh/remote_exec.rs index 678cca744..f0dc284bb 100644 --- a/src/crates/services/services-integrations/src/remote_ssh/remote_exec.rs +++ b/src/crates/services/services-integrations/src/remote_ssh/remote_exec.rs @@ -16,7 +16,6 @@ use tokio::time::{Duration, Instant}; use uuid::Uuid; const DEFAULT_YIELD_TIME_MS: u64 = 10_000; -const DEFAULT_MAX_OUTPUT_CHARS: usize = 10_000; const MAX_RETAINED_OUTPUT_BYTES: usize = 1024 * 1024; const MAX_REMOTE_EXEC_SESSIONS: usize = 64; const MAX_COMPLETED_REMOTE_EXEC_SESSIONS: usize = 64; @@ -267,7 +266,7 @@ impl RemoteExecProcessManager { .collect_until( cursor, deadline_from_now(request.yield_time_ms), - request.max_output_chars.unwrap_or(DEFAULT_MAX_OUTPUT_CHARS), + request.max_output_chars.unwrap_or(usize::MAX), output_tx.as_ref(), ) .await; @@ -385,7 +384,7 @@ impl RemoteExecProcessManager { .collect_until( cursor, deadline_from_now(request.yield_time_ms), - request.max_output_chars.unwrap_or(DEFAULT_MAX_OUTPUT_CHARS), + request.max_output_chars.unwrap_or(usize::MAX), output_tx.as_ref(), ) .await; @@ -447,7 +446,7 @@ impl RemoteExecProcessManager { .collect_until( cursor.clone(), deadline_from_now(request.yield_time_ms), - request.max_output_chars.unwrap_or(DEFAULT_MAX_OUTPUT_CHARS), + request.max_output_chars.unwrap_or(usize::MAX), None, ) .await; @@ -1133,7 +1132,7 @@ fn new_chunk_id() -> String { #[cfg(test)] mod tests { - use 
super::new_session_id; + use super::{new_session_id, HeadTailText}; use std::collections::HashMap; #[test] @@ -1142,4 +1141,13 @@ mod tests { assert_eq!(new_session_id(&sessions), 1000); } + + #[test] + fn head_tail_text_keeps_full_output_when_unbounded() { + let mut buffer = HeadTailText::new(usize::MAX); + buffer.push_str("abcdefghijklmnop"); + + assert_eq!(buffer.total_chars, 16); + assert_eq!(buffer.render(), "abcdefghijklmnop"); + } } diff --git a/src/crates/services/terminal/src/exec.rs b/src/crates/services/terminal/src/exec.rs index 03ed85ea4..a0a19a7a9 100644 --- a/src/crates/services/terminal/src/exec.rs +++ b/src/crates/services/terminal/src/exec.rs @@ -23,7 +23,6 @@ use tokio::task::JoinHandle; use uuid::Uuid; const DEFAULT_YIELD_TIME_MS: u64 = 10_000; -const DEFAULT_MAX_OUTPUT_CHARS: usize = 10_000; const MAX_RETAINED_OUTPUT_BYTES: usize = 1024 * 1024; const MAX_EXEC_SESSIONS: usize = 64; const MAX_COMPLETED_EXEC_SESSIONS: usize = 64; @@ -283,7 +282,7 @@ impl ExecProcessManager { .collect_until( cursor, deadline_from_now(request.yield_time_ms), - request.max_output_chars.unwrap_or(DEFAULT_MAX_OUTPUT_CHARS), + request.max_output_chars.unwrap_or(usize::MAX), output_tx.as_ref(), ) .await; @@ -406,7 +405,7 @@ impl ExecProcessManager { .collect_until( cursor, deadline_from_now(request.yield_time_ms), - request.max_output_chars.unwrap_or(DEFAULT_MAX_OUTPUT_CHARS), + request.max_output_chars.unwrap_or(usize::MAX), output_tx.as_ref(), ) .await; @@ -488,7 +487,7 @@ impl ExecProcessManager { .collect_until( cursor, deadline_from_now(request.yield_time_ms), - request.max_output_chars.unwrap_or(DEFAULT_MAX_OUTPUT_CHARS), + request.max_output_chars.unwrap_or(usize::MAX), None, ) .await; @@ -2415,6 +2414,15 @@ print("parent_exit", flush=True)"#; assert!(rendered.contains("truncated")); } + #[test] + fn head_tail_text_keeps_full_output_when_unbounded() { + let mut buffer = HeadTailText::new(usize::MAX); + buffer.push_str("abcdefghijklmnop"); + + 
assert_eq!(buffer.total_chars, 16); + assert_eq!(buffer.render(), "abcdefghijklmnop"); + } + #[test] fn smart_decode_preserves_utf8() { assert_eq!(bytes_to_string_smart("小游戏平台".as_bytes()), "小游戏平台"); diff --git a/src/web-ui/src/flow_chat/services/flow-chat-manager/ToolEventModule.ts b/src/web-ui/src/flow_chat/services/flow-chat-manager/ToolEventModule.ts index e2466e601..85fb370e5 100644 --- a/src/web-ui/src/flow_chat/services/flow-chat-manager/ToolEventModule.ts +++ b/src/web-ui/src/flow_chat/services/flow-chat-manager/ToolEventModule.ts @@ -637,9 +637,11 @@ export function handleToolExecutionProgress( tool_name === 'Bash' || tool_name === 'ExecCommand' || tool_name === 'WriteStdin' || + tool_name === 'ExecControl' || (toolItem as any).toolName === 'Bash' || (toolItem as any).toolName === 'ExecCommand' || - (toolItem as any).toolName === 'WriteStdin'; + (toolItem as any).toolName === 'WriteStdin' || + (toolItem as any).toolName === 'ExecControl'; const shouldAppend = typeof progress_message === 'string' && ( isTerminalLikeProgress ? 
progress_message.length > 0 diff --git a/src/web-ui/src/flow_chat/tool-cards/ExecControlToolCard.tsx b/src/web-ui/src/flow_chat/tool-cards/ExecControlToolCard.tsx new file mode 100644 index 000000000..b8157347e --- /dev/null +++ b/src/web-ui/src/flow_chat/tool-cards/ExecControlToolCard.tsx @@ -0,0 +1,26 @@ +import React, { useMemo } from 'react'; +import { useTranslation } from 'react-i18next'; +import type { ToolCardProps } from '../types/flow-chat'; +import { ExecProcessToolCardView } from './ExecProcessToolCardView'; +import { buildExecControlCardModel } from './execProcessToolCardModel'; + +export const ExecControlToolCard: React.FC = ({ + toolItem, + onExpand, +}) => { + const { t } = useTranslation('flow-chat'); + const model = useMemo( + () => buildExecControlCardModel(toolItem, t), + [t, toolItem], + ); + + return ( + + ); +}; + +export default ExecControlToolCard; diff --git a/src/web-ui/src/flow_chat/tool-cards/ExecProcessToolCardView.tsx b/src/web-ui/src/flow_chat/tool-cards/ExecProcessToolCardView.tsx index 734d5b3a8..b8ad0f798 100644 --- a/src/web-ui/src/flow_chat/tool-cards/ExecProcessToolCardView.tsx +++ b/src/web-ui/src/flow_chat/tool-cards/ExecProcessToolCardView.tsx @@ -22,7 +22,7 @@ const EXEC_OUTPUT_STREAMING_MAX_ROWS = 4; const EXEC_OUTPUT_EXPANDED_MAX_ROWS = 15; export interface ExecProcessCardModel { - kind: 'command' | 'stdin'; + kind: 'command' | 'stdin' | 'control'; actionLabel: string; primaryText: string; emptyText: string; diff --git a/src/web-ui/src/flow_chat/tool-cards/execProcessToolCardModel.test.ts b/src/web-ui/src/flow_chat/tool-cards/execProcessToolCardModel.test.ts index 7542a4872..ba5497be8 100644 --- a/src/web-ui/src/flow_chat/tool-cards/execProcessToolCardModel.test.ts +++ b/src/web-ui/src/flow_chat/tool-cards/execProcessToolCardModel.test.ts @@ -1,14 +1,22 @@ import { describe, expect, it } from 'vitest'; import type { FlowToolItem } from '../types/flow-chat'; -import { buildWriteStdinCardModel } from 
'./execProcessToolCardModel'; +import { buildExecControlCardModel, buildWriteStdinCardModel } from './execProcessToolCardModel'; const messages: Record = { 'toolCards.execProcess.pollSession': 'Poll session #{{id}}', 'toolCards.execProcess.pollProcess': 'Poll process:', 'toolCards.execProcess.writeStdin': 'Write stdin:', + 'toolCards.execProcess.interruptProcess': 'Interrupt process:', + 'toolCards.execProcess.killProcess': 'Kill process:', 'toolCards.execProcess.pollingOutput': 'Polling output...', 'toolCards.execProcess.waitingForOutput': 'Waiting for output...', + 'toolCards.execProcess.interruptingSession': 'Interrupting process...', + 'toolCards.execProcess.killingSession': 'Killing process...', 'toolCards.execProcess.noOutput': 'No output', + 'toolCards.execProcess.interruptSentNoOutput': 'Interrupt sent; no output', + 'toolCards.execProcess.killSentNoOutput': 'Kill sent; no output', + 'toolCards.execProcess.interruptSession': 'Interrupt session #{{id}}', + 'toolCards.execProcess.killSession': 'Kill session #{{id}}', 'toolCards.execProcess.sessionNotFound': 'Session #{{id}} was not found.', }; @@ -38,6 +46,24 @@ function writeStdinItem(result: unknown): FlowToolItem { }; } +function execControlItem(input: Record, result: unknown): FlowToolItem { + return { + id: 'tool-execcontrol-1', + type: 'tool', + toolName: 'ExecControl', + status: 'completed', + timestamp: Date.now(), + toolCall: { + id: 'call-execcontrol-1', + input, + }, + toolResult: { + success: true, + result, + }, + }; +} + describe('buildWriteStdinCardModel', () => { it('surfaces session-not-found results as a completed notice', () => { const model = buildWriteStdinCardModel(writeStdinItem({ @@ -54,3 +80,44 @@ describe('buildWriteStdinCardModel', () => { expect(model.sessionId).toBe(42); }); }); + +describe('buildExecControlCardModel', () => { + it('renders interrupt controls with the requested session id', () => { + const model = buildExecControlCardModel(execControlItem({ + session_id: 7, + 
action: 'interrupt', + }, { + output: 'stopped', + session_id: null, + exit_code: 130, + wall_time_seconds: 0.125, + action: 'interrupt', + }), t); + + expect(model.kind).toBe('control'); + expect(model.actionLabel).toBe('Interrupt process:'); + expect(model.primaryText).toBe('Interrupt session #7'); + expect(model.resultOutput).toBe('stopped'); + expect(model.sessionId).toBe(7); + expect(model.exitCode).toBe(130); + }); + + it('surfaces session-not-found control results as a completed notice', () => { + const model = buildExecControlCardModel(execControlItem({ + session_id: 99, + action: 'kill', + }, { + status: 'session_not_found', + requested_session_id: 99, + session_id: null, + output: '', + action: 'kill', + }), t); + + expect(model.actionLabel).toBe('Kill process:'); + expect(model.primaryText).toBe('Kill session #99'); + expect(model.resultNoticeText).toBe('Session #99 was not found.'); + expect(model.resultOutput).toBe(''); + expect(model.noOutputText).toBe('Kill sent; no output'); + }); +}); diff --git a/src/web-ui/src/flow_chat/tool-cards/execProcessToolCardModel.ts b/src/web-ui/src/flow_chat/tool-cards/execProcessToolCardModel.ts index 503408ae0..d81821750 100644 --- a/src/web-ui/src/flow_chat/tool-cards/execProcessToolCardModel.ts +++ b/src/web-ui/src/flow_chat/tool-cards/execProcessToolCardModel.ts @@ -4,6 +4,7 @@ import type { ExecProcessCardModel } from './ExecProcessToolCardView'; interface ParsedExecResult { output: string; status?: string; + action?: string; workdir?: string; sessionId?: number; requestedSessionId?: number; @@ -54,6 +55,7 @@ export function parseExecProcessResult(raw: unknown): ParsedExecResult { return { output: stringField(record, 'output') ?? 
'', status: stringField(record, 'status'), + action: stringField(record, 'action'), workdir: stringField(record, 'workdir'), sessionId: numberField(record, 'session_id'), requestedSessionId: numberField(record, 'requested_session_id'), @@ -136,3 +138,62 @@ export function buildWriteStdinCardModel( remote: result.remote, }; } + +function inputSessionId(input: Record): number | undefined { + const value = input.session_id; + if (typeof value === 'number' && Number.isFinite(value)) { + return value; + } + + if (typeof value === 'string' && value.trim()) { + const parsed = Number(value); + return Number.isFinite(parsed) ? parsed : undefined; + } + + return undefined; +} + +export function buildExecControlCardModel( + toolItem: FlowToolItem, + t: (key: string, options?: Record) => string, +): ExecProcessCardModel { + const input = toolItem.toolCall?.input ?? {}; + const result = parseExecProcessResult(toolItem.toolResult?.result); + const sessionId = inputSessionId(input); + const displaySessionId = sessionId ?? result.sessionId ?? result.requestedSessionId; + const action = typeof input.action === 'string' + ? input.action + : result.action; + const isInterrupt = action === 'interrupt'; + const primaryText = isInterrupt + ? t('toolCards.execProcess.interruptSession', { id: displaySessionId ?? '?' }) + : t('toolCards.execProcess.killSession', { id: displaySessionId ?? '?' }); + const resultNoticeText = result.status === 'session_not_found' + ? t('toolCards.execProcess.sessionNotFound', { + id: displaySessionId ?? '?', + }) + : undefined; + + return { + kind: 'control', + actionLabel: isInterrupt + ? t('toolCards.execProcess.interruptProcess') + : t('toolCards.execProcess.killProcess'), + primaryText, + emptyText: primaryText, + copyText: primaryText, + copyDisabled: displaySessionId == null, + waitingText: isInterrupt + ? t('toolCards.execProcess.interruptingSession') + : t('toolCards.execProcess.killingSession'), + noOutputText: isInterrupt + ? 
t('toolCards.execProcess.interruptSentNoOutput') + : t('toolCards.execProcess.killSentNoOutput'), + resultNoticeText, + resultOutput: result.output, + sessionId: displaySessionId, + exitCode: result.exitCode, + wallTimeSeconds: result.wallTimeSeconds, + remote: result.remote, + }; +} diff --git a/src/web-ui/src/flow_chat/tool-cards/index.ts b/src/web-ui/src/flow_chat/tool-cards/index.ts index 4cb5281bc..20173bf05 100644 --- a/src/web-ui/src/flow_chat/tool-cards/index.ts +++ b/src/web-ui/src/flow_chat/tool-cards/index.ts @@ -43,6 +43,7 @@ import { CreatePlanDisplay } from './CreatePlanDisplay'; import { TerminalToolCard } from './TerminalToolCard'; import { ExecCommandToolCard } from './ExecCommandToolCard'; import { WriteStdinToolCard } from './WriteStdinToolCard'; +import { ExecControlToolCard } from './ExecControlToolCard'; import { TerminalControlDisplay } from './TerminalControlDisplay'; import { InitMiniAppDisplay } from './MiniAppToolDisplay'; import { GenerativeWidgetToolCard } from './GenerativeWidgetToolCard'; @@ -107,6 +108,7 @@ export const TOOL_CARD_COMPONENTS = { // Exec process tools 'ExecCommand': ExecCommandToolCard, 'WriteStdin': WriteStdinToolCard, + 'ExecControl': ExecControlToolCard, // MiniApp tool 'InitMiniApp': InitMiniAppDisplay, diff --git a/src/web-ui/src/flow_chat/tool-cards/toolCardMetadata.ts b/src/web-ui/src/flow_chat/tool-cards/toolCardMetadata.ts index 35c4e078d..0d5fb767a 100644 --- a/src/web-ui/src/flow_chat/tool-cards/toolCardMetadata.ts +++ b/src/web-ui/src/flow_chat/tool-cards/toolCardMetadata.ts @@ -297,6 +297,17 @@ export const TOOL_CARD_CONFIGS: Record = { primaryColor: '#10b981' }, + 'ExecControl': { + toolName: 'ExecControl', + displayName: 'Control Process', + icon: 'TERM', + requiresConfirmation: false, + resultDisplayType: 'detailed', + description: 'Interrupt or kill a running command process', + displayMode: 'standard', + primaryColor: '#ef4444' + }, + // MiniApp tool 'InitMiniApp': { toolName: 'InitMiniApp', diff 
--git a/src/web-ui/src/infrastructure/config/types/index.ts b/src/web-ui/src/infrastructure/config/types/index.ts index 226634d4c..dc38f2f16 100644 --- a/src/web-ui/src/infrastructure/config/types/index.ts +++ b/src/web-ui/src/infrastructure/config/types/index.ts @@ -28,11 +28,16 @@ export interface AppConfig { } export type BackendLogLevel = 'trace' | 'debug' | 'info' | 'warn' | 'error' | 'off'; +export type ModelExchangeTracingMode = 'off' | 'full' | 'usage_only'; + +export interface ModelExchangeTracingConfig { + mode: ModelExchangeTracingMode; +} export interface AppLoggingConfig { level: BackendLogLevel; include_sensitive_diagnostics: boolean; - model_exchange_trace: boolean; + model_exchange_tracing: ModelExchangeTracingConfig; } // Reserved; legacy `default_mode` in saved JSON is ignored by the app. diff --git a/src/web-ui/src/locales/en-US/flow-chat.json b/src/web-ui/src/locales/en-US/flow-chat.json index 23e30b7d6..9de4aacad 100644 --- a/src/web-ui/src/locales/en-US/flow-chat.json +++ b/src/web-ui/src/locales/en-US/flow-chat.json @@ -1340,15 +1340,23 @@ "executeCommand": "Run command:", "writeStdin": "Write stdin:", "pollProcess": "Poll process:", + "interruptProcess": "Interrupt process:", + "killProcess": "Kill process:", "executingCommand": "Running command...", "waitingForOutput": "Waiting for output...", "pollingOutput": "Polling output...", + "interruptingSession": "Interrupting process...", + "killingSession": "Killing process...", "pollSession": "Poll session #{{id}}", + "interruptSession": "Interrupt session #{{id}}", + "killSession": "Kill session #{{id}}", "session": "Session:", "wallTime": "{{seconds}}s", "remote": "Remote", "tty": "TTY", "noOutput": "No output", + "interruptSentNoOutput": "Interrupt sent; no output", + "killSentNoOutput": "Kill sent; no output", "sessionNotFound": "Session #{{id}} was not found", "copyPrimary": "Copy", "primaryCopied": "Copied", diff --git a/src/web-ui/src/locales/zh-CN/flow-chat.json 
b/src/web-ui/src/locales/zh-CN/flow-chat.json index 2edb8ad5e..35971aab0 100644 --- a/src/web-ui/src/locales/zh-CN/flow-chat.json +++ b/src/web-ui/src/locales/zh-CN/flow-chat.json @@ -1340,15 +1340,23 @@ "executeCommand": "运行命令:", "writeStdin": "写入标准输入:", "pollProcess": "轮询进程:", + "interruptProcess": "中断进程:", + "killProcess": "终止进程:", "executingCommand": "正在运行命令...", "waitingForOutput": "等待输出...", "pollingOutput": "正在轮询输出...", + "interruptingSession": "正在中断进程...", + "killingSession": "正在终止进程...", "pollSession": "轮询会话 #{{id}}", + "interruptSession": "中断会话 #{{id}}", + "killSession": "终止会话 #{{id}}", "session": "会话:", "wallTime": "{{seconds}}秒", "remote": "远程", "tty": "TTY", "noOutput": "无输出", + "interruptSentNoOutput": "已发送中断,无输出", + "killSentNoOutput": "已发送终止,无输出", "sessionNotFound": "未找到会话 #{{id}}", "copyPrimary": "复制", "primaryCopied": "已复制", diff --git a/src/web-ui/src/locales/zh-TW/flow-chat.json b/src/web-ui/src/locales/zh-TW/flow-chat.json index d037c17c4..bb524c754 100644 --- a/src/web-ui/src/locales/zh-TW/flow-chat.json +++ b/src/web-ui/src/locales/zh-TW/flow-chat.json @@ -1340,15 +1340,23 @@ "executeCommand": "執行命令:", "writeStdin": "寫入標準輸入:", "pollProcess": "輪詢進程:", + "interruptProcess": "中斷進程:", + "killProcess": "終止進程:", "executingCommand": "正在執行命令...", "waitingForOutput": "等待輸出...", "pollingOutput": "正在輪詢輸出...", + "interruptingSession": "正在中斷進程...", + "killingSession": "正在終止進程...", "pollSession": "輪詢會話 #{{id}}", + "interruptSession": "中斷會話 #{{id}}", + "killSession": "終止會話 #{{id}}", "session": "會話:", "wallTime": "{{seconds}}秒", "remote": "遠端", "tty": "TTY", "noOutput": "無輸出", + "interruptSentNoOutput": "已傳送中斷,無輸出", + "killSentNoOutput": "已傳送終止,無輸出", "sessionNotFound": "未找到會話 #{{id}}", "copyPrimary": "複製", "primaryCopied": "已複製",