feat(go): added DefineCustomAgent and DefinePromptAgent#4462
feat(go): added DefineCustomAgent and DefinePromptAgent#4462apascal07 wants to merge 33 commits intoap/go-bidifrom
DefineCustomAgent and DefinePromptAgent#4462Conversation
Summary of ChangesHello @apascal07, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the Genkit Go SDK by introducing a new Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This is a great pull request that introduces the SessionFlow feature and refactors the core Action type to support bidirectional streaming. The new functionality is well-structured, comes with comprehensive tests, and includes a helpful sample application. I've identified a critical race condition in the new BidiConnection implementation and a minor issue in the sample code that would prevent it from running. My comments provide suggestions to address these points.
I am having trouble creating individual review comments. Click here to see my feedback.
go/core/action.go (534-550)
This Send implementation has a race condition that can cause a panic. The mutex is unlocked on line 540 before the channel send on line 543. If another goroutine calls Close() in between, c.inputCh will be closed, and the send will panic.
A robust way to fix this is to use recover to handle the "send on closed channel" panic, which is a common pattern in Go for this scenario. This avoids holding a lock over a potentially blocking operation.
Here's a suggested safer implementation for Send that removes the racy mutex usage. The Close method's use of the mutex remains important to make it safe for concurrent calls.
func (c *BidiConnection[In, Out, Stream]) Send(input In) (err error) {
defer func() {
if r := recover(); r != nil {
// This recovers from a panic that occurs when sending on a closed channel.
err = NewError(FAILED_PRECONDITION, "connection is closed")
}
}()
select {
case c.inputCh <- input:
return nil
case <-c.ctx.Done():
return c.ctx.Err()
case <-c.doneCh:
// The recover will handle a panic if doneCh and inputCh close concurrently.
return NewError(FAILED_PRECONDITION, "action has completed")
}
}
go/samples/basic-session-flow/main.go (49-53)
The model name googleai/gemini-3-flash-preview appears to be incorrect and will likely cause the sample to fail at runtime. Please use a valid model name, for example googleai/gemini-1.5-flash-latest.
ai.WithModel(googlegenai.ModelRef("googleai/gemini-1.5-flash-latest", &genai.GenerateContentConfig{
ThinkingConfig: &genai.ThinkingConfig{
ThinkingBudget: genai.Ptr[int32](0),
},
})),
SessionFlow and relatedDefineSessionFlow
DefineSessionFlowDefineCustomAgent and DefinePromptAgent
| defer wg.Done() | ||
| for chunk := range respCh { | ||
| if chunk.Artifact != nil { | ||
| session.AddArtifacts(chunk.Artifact) |
There was a problem hiding this comment.
I am asking this from a high-level analysis of the proposal, so please correct me if I'm mistaken.
I see that we have AddArtifacts and UpdateArtifacts methods on the Session object which are used to update the artifacts within DefineCustomAgent.
If the use-case is a complex one where the agent updates existing Artifacts, how would the developer manage these artifacts? Manually updating the session.artifacts field?
I suppose we can enforce that each artifact is immutable and any "updates" should be treated as a new artifact.
There was a problem hiding this comment.
UpdateArtifacts takes a function that passes in the existing artifacts and returns artifacts which will be used to atomically replace the existing artifacts.
// UpdateArtifacts atomically reads the current artifacts, applies the given
// function, and writes the result back.
func (s *Session[State]) UpdateArtifacts(fn func([]*Artifact) []*Artifact) {
s.mu.Lock()
defer s.mu.Unlock()
s.state.Artifacts = fn(s.state.Artifacts)
}
Adds an experimental Agent Flow API (
ai/x) for multi-turn conversations with automatic snapshot management, built on top of the bidirectional streaming primitives added upstream.Examples
Custom Agent Flows
DefineCustomAgentprovides multi-turn conversations with managed state, token-level streaming, and automatic snapshots. TheAgentSession.Runloop handles turn boundaries while allowing flexibility before and after to set up expensive clients or clean up in-progress state before returning the final outcome.sess.Result()returns anAgentFlowResultwith the last message from the conversation history and all artifacts. If you need to control what gets sent back to the client (e.g. returning only artifacts without a message, or omitting certain artifacts), you can construct the result directly:The client drives the conversation by sending messages and iterating chunks until
EndTurn:Prompt-Backed Agent Flows
DefinePromptAgenteliminates the manual generate loop entirely. Give it a prompt and it handles rendering, streaming, and history management automatically:Snapshots & Resumption
Configure automatic snapshot persistence with a store and optional callback:
Resume a conversation from a server-stored snapshot:
Or resume from client-kept state (no server store needed):
Custom Session State
The
Statetype parameter lets you maintain typed state across turns:Custom state is included in snapshots and available when resuming.
API Reference
Agent Flow API (
ai/x— experimental)Define
AgentFlowOption[State]
AgentFlowFunc
AgentFlow[Stream, State]
StreamBidiOption[State]
AgentFlowConnection[Stream, State]
Unlike
BidiConnection, breaking fromReceive()does not cancel the connection — enabling multi-turn patterns.AgentSession[State]
Extends
Session[State]with turn management. Passed as thesessparameter toAgentFlowFunc.Session[State]
Thread-safe conversation state. Available via
AgentSessionembedding orSessionFromContext.Responder[Stream]
Output channel with convenience methods. Artifacts sent here are auto-added to the session.
Wire Types
Snapshot System
PromptRenderer interface
Satisfied by
ai.Promptand*ai.DataPrompt[In, Out]. Used byDefinePromptAgent.Known Issues
StreamBidicreates a trace span immediately when the connection is established. If the connection is closed without sending any messages (zero turns), an empty single-span trace is still emitted. This is cosmetic — the trace contains no useful data (just a snapshot ID in the output). A future change could defer span creation until the first input arrives.StreamBidi→Send→Close→Outputdance. A futureRun(ctx, text)method (and variants likeRunMessages) would simplify the common case of sending a single input and receiving the final output without managing a connection.