Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,35 @@ class LangfuseBridge internal constructor(
)
}
}
is PipelineEvent.ApprovalRequested -> {
// #2489 — human approval pause. Field-only (no body / PII).
mostRecentTrace()?.let { state ->
enqueueEventObservation(
trace = state,
name = "agent.approval.requested",
input = mapOf(
"title" to event.title,
"has_body" to event.hasBody,
"timeout_ms" to event.timeoutMs,
),
metadata = metadata(event.runtimeContext),
)
}
}
is PipelineEvent.ApprovalDecided -> {
// #2489 — pairs with ApprovalRequested via timestamp ordering.
mostRecentTrace()?.let { state ->
enqueueEventObservation(
trace = state,
name = "agent.approval.decided",
input = mapOf(
"decision" to event.decision,
"has_payload" to event.hasPayload,
),
metadata = metadata(event.runtimeContext),
)
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,33 @@ class LangSmithBridge internal constructor(
)
}
}
is PipelineEvent.ApprovalRequested -> {
// #2489 — field-only (no body) per the audit-row PII discipline.
mostRecentAgentRun()?.let { state ->
enqueueEvent(
state,
"agent.approval.requested",
mapOf(
"title" to event.title,
"has_body" to event.hasBody,
"timeout_ms" to event.timeoutMs,
),
)
}
}
is PipelineEvent.ApprovalDecided -> {
// #2489 — pairs with ApprovalRequested.
mostRecentAgentRun()?.let { state ->
enqueueEvent(
state,
"agent.approval.decided",
mapOf(
"decision" to event.decision,
"has_payload" to event.hasPayload,
),
)
}
}
}
}

Expand Down
21 changes: 21 additions & 0 deletions agents-kt-otel/src/main/kotlin/agents_engine/otel/OtelBridge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,27 @@ class OtelBridge(
.build(),
)
}
is PipelineEvent.ApprovalRequested -> {
// #2489 — human approval requested; field-only (no body / PII).
mostRecentAgentSpan()?.addEvent(
"agent.approval.requested",
Attributes.builder()
.put("approval.title", event.title)
.put("approval.has_body", event.hasBody)
.also { b -> event.timeoutMs?.let { b.put("approval.timeout_ms", it) } }
.build(),
)
}
is PipelineEvent.ApprovalDecided -> {
// #2489 — the resumed HumanDecision; pairs with ApprovalRequested.
mostRecentAgentSpan()?.addEvent(
"agent.approval.decided",
Attributes.builder()
.put("approval.decision", event.decision)
.put("approval.has_payload", event.hasPayload)
.build(),
)
}
}
}

Expand Down
35 changes: 35 additions & 0 deletions src/main/kotlin/agents_engine/core/Agent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,24 @@ class Agent<IN, OUT>(
*/
var toolHallucinatedListener: ((name: String, args: Map<String, Any?>, allowedTools: List<String>) -> Unit)? = null
private set
/**
* #2489 — fires when a tool inside the agentic loop calls `humanApproval
* { }` and the runtime is about to pause for human input. Pure
* observability: the runtime still throws [AgentInterruptException].
* Receives the rendered `title`, whether a `body` is attached, and the
* advisory `timeoutMs`. Body content is omitted by design — see
* [PipelineEvent.ApprovalRequested].
*/
var approvalRequestedListener: ((title: String, hasBody: Boolean, timeoutMs: Long?) -> Unit)? = null
private set
/**
* #2489 — fires on the resume path when `resumeWith` is a [HumanDecision].
* Receives the variant name and whether the variant carried a payload
* (Edited/Responded). Body content omitted by design — see
* [PipelineEvent.ApprovalDecided].
*/
var approvalDecidedListener: ((decision: String, hasPayload: Boolean) -> Unit)? = null
private set
private val tokenUsageListeners = mutableListOf<(TokenUsage) -> Unit>()
var knowledgeUsedListener: ((name: String, content: String) -> Unit)? = null
private set
Expand Down Expand Up @@ -330,6 +348,23 @@ class Agent<IN, OUT>(
toolHallucinatedListener = block
}

/**
* #2489 — Observe `humanApproval { }` requests on this agent's loop.
* Settable post-construction. See [PipelineEvent.ApprovalRequested].
*/
fun onApprovalRequested(block: (title: String, hasBody: Boolean, timeoutMs: Long?) -> Unit) {
approvalRequestedListener = block
}

/**
* #2489 — Observe the [HumanDecision] when the resume path synthesises
* a tool result from a `resumeWith` of that type. Settable
* post-construction. See [PipelineEvent.ApprovalDecided].
*/
fun onApprovalDecided(block: (decision: String, hasPayload: Boolean) -> Unit) {
approvalDecidedListener = block
}

/**
* Observe provider-reported token usage for each successful LLM round-trip.
*
Expand Down
143 changes: 143 additions & 0 deletions src/main/kotlin/agents_engine/core/HumanApproval.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package agents_engine.core

import kotlin.time.Duration

/**
* `agents_engine/core/HumanApproval.kt` — first-class human approval gate
* (#2489), built on the interrupt primitive ([interrupt] in
* [agents_engine.core.Interrupt]). Promotes the typed-approval pattern
* sketched by the #1918 demo to a runtime feature.
*
* The shape:
*
* ```kotlin
* tool("approve_deploy") { args ->
* humanApproval {
* title = "Deploy to production?"
* body = deploymentPlan // typed
* timeout = 30.minutes
* defaultOnTimeout = HumanDecision.Rejected
* }
* // ↑ never returns — throws AgentInterruptException carrying the
* // ApprovalRequest. The caller asks the human, then resumes via
* // invokeSuspendResuming(..., resumeWith = <HumanDecision>).
* }
* ```
*
* **Sealed [HumanDecision].** Not a boolean. The four variants —
* `Approved`, `Rejected`, `Edited(payload)`, `Responded(payload)` —
* capture the four real-world outcomes for a typed approval. Edited
* carries the modified plan; Responded carries a free-form reply
* (e.g. "ask the user this clarifying question first").
*
* **Audit events.** [Agent.observe] subscribers see two new
* [PipelineEvent] variants: [PipelineEvent.ApprovalRequested] (emitted
* by the agentic loop when the interrupt payload is an [ApprovalRequest])
* and [PipelineEvent.ApprovalDecided] (emitted when the resume path
* synthesises a tool result from a [HumanDecision] `resumeWith`).
* Field-only — no payload bodies in the audit row, since payloads can
* be high-volume or PII-sensitive. Bridges (OTel / LangSmith /
* Langfuse) and the JSONL audit exporter pick them up via the usual
* `observe { }` seam.
*
* **Timeout.** [ApprovalRequest.timeout] and
* [ApprovalRequest.defaultOnTimeout] are advisory — the runtime can't
* honor them inside the suspension because the human reply happens
* BETWEEN `catch (AgentInterruptException)` and the next call to
* `invokeSuspendResuming(...)`. They're carried on the request so the
* caller has a contract for how to behave on expiry: when the
* configured timeout elapses without a reply, the caller should resume
* with `resumeWith = request.defaultOnTimeout`.
*
* Pairs with #2487 (HITL epic) and #2488 (interrupt primitive).
*/

/**
* A typed request for human input. Surfaced as the payload of
* [AgentInterruptException] when `humanApproval { }` fires.
*
* @property title short prompt rendered to the human (e.g. "Deploy to
* production?").
* @property body optional context — typed (`@Generable` or anything
* `toLlmInput`-renderable) or null. Typically the plan or artefact
* the human is reviewing.
* @property timeout advisory wall-clock cap on how long the runtime
* would wait if it were managing the timer (which it isn't — see
* class header). Null = no advisory.
* @property defaultOnTimeout the decision the caller should synthesise
* if [timeout] expires without a human reply. Defaults to
* [HumanDecision.Rejected] — fail-closed for sensitive actions is
* the right default for a regulated runtime.
*/
data class ApprovalRequest(
val title: String,
val body: Any? = null,
val timeout: Duration? = null,
val defaultOnTimeout: HumanDecision = HumanDecision.Rejected,
)

/**
* The sealed result of a human approval request. Caller passes one of
* these as `resumeWith` to `invokeSuspendResuming(...)`:
*
* - [Approved] — proceed.
* - [Rejected] — refuse. Sensitive actions should fail-closed.
* - [Edited] — the human modified the plan; `payload` carries the new
* plan (typically the same type as the original `body`).
* - [Responded] — the human gave a free-form reply (e.g. "first ask
* the user for clarification on X"); `payload` is the reply.
*/
sealed interface HumanDecision {
object Approved : HumanDecision
object Rejected : HumanDecision
data class Edited(val payload: Any?) : HumanDecision
data class Responded(val payload: Any?) : HumanDecision
}

/**
* DSL builder for [humanApproval].
*/
class ApprovalBuilder {
var title: String = ""
var body: Any? = null
var timeout: Duration? = null
var defaultOnTimeout: HumanDecision = HumanDecision.Rejected

internal fun build(): ApprovalRequest {
require(title.isNotBlank()) { "humanApproval { } requires a non-blank title." }
return ApprovalRequest(
title = title,
body = body,
timeout = timeout,
defaultOnTimeout = defaultOnTimeout,
)
}
}

/**
* Pause the agentic loop for human approval. Throws — never returns.
*
* Equivalent to constructing an [ApprovalRequest] and calling
* [interrupt] with it. The agentic loop recognises the payload as an
* [ApprovalRequest] and emits [PipelineEvent.ApprovalRequested] for
* audit consumers before throwing.
*
* The caller catches [AgentInterruptException], inspects `payload as
* ApprovalRequest`, asks the human, then resumes via:
*
* ```kotlin
* agent.invokeSuspendResuming(
* input = originalInput,
* resumeFrom = exception.snapshot,
* resumeWith = HumanDecision.Approved, // or .Rejected / .Edited(...) / .Responded(...)
* )
* ```
*
* The model sees the [HumanDecision] rendered as JSON (via
* [agents_engine.generation.toLlmInput]) on the synthesised tool
* result message. From its perspective the round-trip is invisible.
*/
fun humanApproval(block: ApprovalBuilder.() -> Unit): Nothing {
val request = ApprovalBuilder().apply(block).build()
interrupt(payload = request)
}
66 changes: 66 additions & 0 deletions src/main/kotlin/agents_engine/core/PipelineEvent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,43 @@ sealed interface PipelineEvent {
val allowedTools: List<String>,
override val runtimeContext: AgentRuntimeContext = AgentRuntimeContext.currentOrNew(),
) : PipelineEvent

/**
* #2489 — a tool inside the agentic loop called `humanApproval { }` and
* the runtime is about to pause for human input. Emitted before the
* [AgentInterruptException] is thrown, so audit consumers see the request
* on the same wall-clock ordering as the snapshot capture. Field-only
* — `title` is the rendered prompt; `hasBody` indicates whether
* additional context (typed plan, artefact) accompanied the request,
* without copying the body into the audit row (which may be high-volume
* or PII-sensitive). `timeoutMs` is the advisory wall-clock cap the
* caller should honour.
*/
data class ApprovalRequested(
override val agentName: String,
override val timestamp: Instant,
val title: String,
val hasBody: Boolean,
val timeoutMs: Long?,
override val runtimeContext: AgentRuntimeContext = AgentRuntimeContext.currentOrNew(),
) : PipelineEvent

/**
* #2489 — the resume path observed a [HumanDecision] in `resumeWith`,
* synthesised the tool result, and is about to continue the loop.
* `decision` is the simple class name of the [HumanDecision] variant
* (Approved / Rejected / Edited / Responded) — `hasPayload` indicates
* whether the Edited/Responded variant carried a non-null payload.
* The payload itself stays off the audit row (same PII discipline as
* [ApprovalRequested.hasBody]).
*/
data class ApprovalDecided(
override val agentName: String,
override val timestamp: Instant,
val decision: String,
val hasPayload: Boolean,
override val runtimeContext: AgentRuntimeContext = AgentRuntimeContext.currentOrNew(),
) : PipelineEvent
}

/**
Expand All @@ -133,6 +170,8 @@ sealed interface PipelineEvent {
* - [PipelineEvent.ErrorOccurred] — when an exception is about to propagate out (see [Agent.onError])
* - [PipelineEvent.BudgetThreshold] — when a budget crosses [Agent.onBudgetThreshold]'s threshold
* - [PipelineEvent.ToolHallucinated] — when the model emits a tool name not in the skill's allowlist (#2757)
* - [PipelineEvent.ApprovalRequested] — when a tool calls `humanApproval { }` (#2489)
* - [PipelineEvent.ApprovalDecided] — when resume synthesises a result from a [HumanDecision] (#2489)
*/
fun Agent<*, *>.observe(handler: (PipelineEvent) -> Unit) {
val agentName = this.name
Expand Down Expand Up @@ -208,4 +247,31 @@ fun Agent<*, *>.observe(handler: (PipelineEvent) -> Unit) {
),
)
}

val priorApprovalRequested = this.approvalRequestedListener
onApprovalRequested { title, hasBody, timeoutMs ->
priorApprovalRequested?.invoke(title, hasBody, timeoutMs)
handler(
PipelineEvent.ApprovalRequested(
agentName = agentName,
timestamp = Instant.now(),
title = title,
hasBody = hasBody,
timeoutMs = timeoutMs,
),
)
}

val priorApprovalDecided = this.approvalDecidedListener
onApprovalDecided { decision, hasPayload ->
priorApprovalDecided?.invoke(decision, hasPayload)
handler(
PipelineEvent.ApprovalDecided(
agentName = agentName,
timestamp = Instant.now(),
decision = decision,
hasPayload = hasPayload,
),
)
}
}
Loading
Loading