Skip to content

Comments

feat(go): added DefineCustomAgent and DefinePromptAgent#4462

Open
apascal07 wants to merge 33 commits intoap/go-bidifrom
ap/go-session-flow
Open

feat(go): added DefineCustomAgent and DefinePromptAgent#4462
apascal07 wants to merge 33 commits intoap/go-bidifrom
ap/go-session-flow

Conversation

@apascal07
Copy link
Collaborator

@apascal07 apascal07 commented Feb 6, 2026

Adds an experimental Agent Flow API (ai/x) for multi-turn conversations with automatic snapshot management, built on top of the bidirectional streaming primitives added upstream.

Examples

Custom Agent Flows

DefineCustomAgent provides multi-turn conversations with managed state, token-level streaming, and automatic snapshots. The AgentSession.Run loop handles turn boundaries while allowing flexibility before and after to set up expensive clients or clean up in-progress state before returning the final outcome.

chatFlow := genkit.DefineCustomAgent(g, "chat",
    func(ctx context.Context, resp aix.Responder[any], sess *aix.AgentSession[struct{}]) (*aix.AgentFlowResult, error) {
        if err := sess.Run(ctx, func(ctx context.Context, input *aix.AgentFlowInput) error {
            for chunk, err := range genkit.GenerateStream(ctx, g,
                ai.WithModelName("googleai/gemini-3-flash-preview"),
                ai.WithMessages(sess.Messages()...),
            ) {
                if err != nil {
                    return err
                }
                if chunk.Done {
                    sess.AddMessages(chunk.Response.Message)
                    break
                }
                resp.SendModelChunk(chunk.Chunk) // stream tokens to client
            }

            return nil
        }); err != nil {
            return nil, err
        }
        return sess.Result(), nil
    },
)

sess.Result() returns an AgentFlowResult with the last message from the conversation history and all artifacts. If you need to control what gets sent back to the client (e.g. returning only artifacts without a message, or omitting certain artifacts), you can construct the result directly:

return &aix.AgentFlowResult{Artifacts: sess.Artifacts()}, nil

The client drives the conversation by sending messages and iterating chunks until EndTurn:

conn, _ := chatFlow.StreamBidi(ctx)

conn.SendText("What is Go?")

for chunk, err := range conn.Receive() {
    if chunk.ModelChunk != nil {
        fmt.Print(chunk.ModelChunk.Text())
    }
    if chunk.EndTurn {
        break // turn complete, ready for next input
    }
}

conn.SendText("Tell me more about its concurrency model")
// ... iterate conn.Receive() again ...

conn.Close()
output, _ := conn.Output() // AgentFlowOutput with final state/snapshot

Prompt-Backed Agent Flows

DefinePromptAgent eliminates the manual generate loop entirely. Give it a prompt and it handles rendering, streaming, and history management automatically:

# prompts/chat.prompt
---
model: googleai/gemini-3-flash-preview
input:
  schema:
    personality: string
  default:
    personality: a helpful assistant
---
{{ role "system" }}
You are {{ personality }}. Keep responses concise.
type ChatInput struct {
    Personality string `json:"personality"`
}

chatFlow := genkit.DefinePromptAgent[any](
    g, "chat", ChatInput{Personality: "a sarcastic pirate"},
)

conn, _ := chatFlow.StreamBidi(ctx)
conn.SendText("What is Go?")
for chunk, _ := range conn.Receive() {
    // tokens stream automatically
    if chunk.EndTurn { break }
}
conn.Close()

Snapshots & Resumption

Configure automatic snapshot persistence with a store and optional callback:

store := aix.NewInMemorySessionStore[MyState]()

chatFlow := genkit.DefineCustomAgent(g, "chat", myFunc,
    aix.WithSessionStore(store),
    aix.WithSnapshotOn[MyState](aix.SnapshotEventTurnEnd),
)

Resume a conversation from a server-stored snapshot:

conn, _ := chatFlow.StreamBidi(ctx, aix.WithSnapshotID[MyState]("snapshot-abc-123"))

Or resume from client-kept state (no server store needed):

conn, _ := chatFlow.StreamBidi(ctx, aix.WithState(&aix.SessionState[MyState]{
    Messages: previousMessages,
    Custom:   MyState{Topic: "concurrency"},
}))

Custom Session State

The State type parameter lets you maintain typed state across turns:

type ChatState struct {
    TopicsDiscussed []string `json:"topicsDiscussed"`
}

chatFlow := genkit.DefineCustomAgent(g, "chat",
    func(ctx context.Context, resp aix.Responder[any], sess *aix.AgentSession[ChatState]) (*aix.AgentFlowResult, error) {
        if err := sess.Run(ctx, func(ctx context.Context, input *aix.AgentFlowInput) error {
            // ... generate response ...

            sess.UpdateCustom(func(s ChatState) ChatState {
                s.TopicsDiscussed = append(s.TopicsDiscussed, extractTopic(input))
                return s
            })
            return nil
        }); err != nil {
            return nil, err
        }
        return sess.Result(), nil
    },
    aix.WithSessionStore(aix.NewInMemorySessionStore[ChatState]()),
)

Custom state is included in snapshots and available when resuming.


API Reference

Agent Flow API (ai/x — experimental)

Define

// DefineCustomAgent creates an AgentFlow with automatic snapshot management
// and registers it. Full control: you write the turn loop.
func DefineCustomAgent[Stream, State any](
    r Registry, name string,
    fn AgentFlowFunc[Stream, State],
    opts ...AgentFlowOption[State],
) *AgentFlow[Stream, State]

// DefinePromptAgent creates a prompt-backed AgentFlow with an automatic
// conversation loop. Each turn renders the prompt, appends conversation history,
// calls the model with streaming, and updates session state.
// The defaultInput is used for prompt rendering unless overridden per
// invocation via WithInputVariables.
func DefinePromptAgent[State, PromptIn any](
    r Registry, name string,
    p PromptRenderer[PromptIn],
    defaultInput PromptIn,
    opts ...AgentFlowOption[State],
) *AgentFlow[any, State]
AgentFlowOption[State]
// WithSessionStore sets the store for persisting snapshots.
WithSessionStore[State](store SessionStore[State])

// WithSnapshotCallback configures when snapshots are created.
// If not provided and a store is configured, snapshots are always created.
WithSnapshotCallback[State](cb SnapshotCallback[State])

// WithSnapshotOn configures snapshots to be created only for the specified events.
// Convenience wrapper around WithSnapshotCallback.
WithSnapshotOn[State](events ...SnapshotEvent)

AgentFlowFunc

// AgentFlowFunc is the function signature for custom agent flows.
// It receives a responder for streaming output, a session for state management,
// and returns an optional AgentFlowResult with the final output.
type AgentFlowFunc[Stream, State any] = func(
    ctx context.Context,
    resp Responder[Stream],
    sess *AgentSession[State],
) (*AgentFlowResult, error)

AgentFlow[Stream, State]

// AgentFlow is a bidirectional streaming flow with automatic snapshot management.

// StreamBidi starts a new agent flow invocation.
func (*AgentFlow) StreamBidi(ctx context.Context, opts ...StreamBidiOption[State]) (*AgentFlowConnection[...], error)
StreamBidiOption[State]
// WithSnapshotID loads state from a persisted snapshot by ID.
// Use this for server-managed state where snapshots are stored.
WithSnapshotID[State](id string)

// WithState sets the initial state for the invocation.
// Use this for client-managed state where the client sends state directly.
WithState[State](state *SessionState[State])

// WithInputVariables overrides the default input variables for a
// prompt-backed agent flow. Used with DefinePromptAgent.
WithInputVariables[State](input any)

AgentFlowConnection[Stream, State]

Unlike BidiConnection, breaking from Receive() does not cancel the connection — enabling multi-turn patterns.

// Send sends an AgentFlowInput to the agent flow.
func (*AgentFlowConnection) Send(input *AgentFlowInput) error

// SendMessages sends messages to the agent flow.
func (*AgentFlowConnection) SendMessages(messages ...*ai.Message) error

// SendText sends a single user text message to the agent flow.
func (*AgentFlowConnection) SendText(text string) error

// SendToolRestarts sends tool restart parts to resume interrupted tool calls.
// Parts should be created via ai.ToolDef.RestartWith.
func (*AgentFlowConnection) SendToolRestarts(parts ...*ai.Part) error

// Close signals that no more inputs will be sent.
func (*AgentFlowConnection) Close() error

// Receive returns an iterator for receiving stream chunks. Breaking out of this
// iterator does not cancel the connection, enabling multi-turn patterns where
// the caller breaks on EndTurn, sends the next input, then calls Receive again.
func (*AgentFlowConnection) Receive() iter.Seq2[*AgentFlowStreamChunk[Stream], error]

// Output returns the final response after the agent flow completes.
func (*AgentFlowConnection) Output() (*AgentFlowOutput[State], error)

// Done returns a channel closed when the connection completes.
func (*AgentFlowConnection) Done() <-chan struct{}

AgentSession[State]

Extends Session[State] with turn management. Passed as the sess parameter to AgentFlowFunc.

// AgentSession extends Session with agent-flow-specific functionality:
// turn management, snapshot persistence, and input channel handling.
type AgentSession[State any] struct {
    *Session[State]
    InputCh   <-chan *AgentFlowInput  // channel delivering per-turn inputs from the client
    TurnIndex int                     // zero-based index of the current conversation turn
}

// Run loops over the input channel, calling fn for each turn. Each turn is
// wrapped in a trace span for observability. Input messages are automatically
// added to the session before fn is called. After fn returns successfully, an
// EndTurn chunk is sent and a snapshot check is triggered.
func (*AgentSession) Run(ctx context.Context, fn func(ctx context.Context, input *AgentFlowInput) error) error

// Result returns an AgentFlowResult populated from the current session state:
// the last message in the conversation history and all artifacts.
// Convenience for custom agent flows that don't need to construct the result manually.
func (*AgentSession) Result() *AgentFlowResult

Session[State]

Thread-safe conversation state. Available via AgentSession embedding or SessionFromContext.

// Session holds conversation state and provides thread-safe read/write access
// to messages, input variables, custom state, and artifacts.

// State returns a deep copy of the current state.
func (*Session) State() *SessionState[State]

// Conversation history
func (*Session) Messages() []*ai.Message
func (*Session) AddMessages(messages ...*ai.Message)
func (*Session) SetMessages(messages []*ai.Message)
func (*Session) UpdateMessages(fn func([]*ai.Message) []*ai.Message)

// Custom state
func (*Session) Custom() State
func (*Session) UpdateCustom(fn func(State) State)

// Input variables (prompt-backed flows)
func (*Session) InputVariables() any

// Artifacts
func (*Session) Artifacts() []*Artifact
func (*Session) AddArtifacts(artifacts ...*Artifact)
func (*Session) UpdateArtifacts(fn func([]*Artifact) []*Artifact)

// Context helpers
func NewSessionContext[State](ctx, *Session[State]) context.Context
func SessionFromContext[State](ctx) *Session[State]

Responder[Stream]

Output channel with convenience methods. Artifacts sent here are auto-added to the session.

// Responder is the output channel for an agent flow. Artifacts sent through it
// are automatically added to the session before being forwarded to the client.
type Responder[Stream any] chan<- *AgentFlowStreamChunk[Stream]

// SendModelChunk sends a generation chunk (token-level streaming).
func (Responder) SendModelChunk(chunk *ai.ModelResponseChunk)

// SendStatus sends a user-defined status update.
func (Responder) SendStatus(status Stream)

// SendArtifact sends an artifact and adds it to the session.
// If an artifact with the same name already exists, it is replaced.
func (Responder) SendArtifact(artifact *Artifact)

Wire Types

// AgentFlowInit is the input for starting an agent flow invocation.
// Provide either SnapshotID (to load from store) or State (direct state).
type AgentFlowInit[State any] struct {
    SnapshotID string               `json:"snapshotId,omitempty"`  // load from persisted snapshot
    State      *SessionState[State] `json:"state,omitempty"`       // direct state
}

// AgentFlowInput is the input sent to an agent flow during a conversation turn.
type AgentFlowInput struct {
    Messages     []*ai.Message `json:"messages,omitempty"`     // user's input for this turn
    ToolRestarts []*ai.Part    `json:"toolRestarts,omitempty"` // tool request parts to re-execute interrupted tools
}

// AgentFlowStreamChunk represents a single item in the agent flow's output stream.
type AgentFlowStreamChunk[Stream any] struct {
    ModelChunk *ai.ModelResponseChunk `json:"modelChunk,omitempty"` // token-level streaming
    Status     Stream                 `json:"status,omitempty"`     // user-defined status update
    Artifact   *Artifact              `json:"artifact,omitempty"`   // newly produced artifact
    SnapshotID string                 `json:"snapshotId,omitempty"` // ID of just-persisted snapshot
    EndTurn    bool                   `json:"endTurn,omitempty"`    // signals turn complete
}

// AgentFlowOutput is the output when an agent flow invocation completes.
type AgentFlowOutput[State any] struct {
    Artifacts  []*Artifact          `json:"artifacts,omitempty"`  // artifacts produced during the session
    Message    *ai.Message          `json:"message,omitempty"`    // last model response message
    SnapshotID string               `json:"snapshotId,omitempty"` // final snapshot ID (empty if none)
    State      *SessionState[State] `json:"state,omitempty"`      // final state (client-managed only)
}

// AgentFlowResult is the return value from an AgentFlowFunc.
// Contains user-specified outputs of the agent invocation.
type AgentFlowResult struct {
    Artifacts []*Artifact `json:"artifacts,omitempty"` // artifacts produced during the session
    Message   *ai.Message `json:"message,omitempty"`   // last model response message
}

// SessionState is the portable conversation state that flows between client
// and server. Contains only the data needed for conversation continuity.
type SessionState[State any] struct {
    Messages       []*ai.Message `json:"messages,omitempty"`       // conversation history (excludes prompt-rendered messages)
    Custom         State         `json:"custom,omitempty"`         // user-defined state
    Artifacts      []*Artifact   `json:"artifacts,omitempty"`      // named artifact collections
    InputVariables any           `json:"inputVariables,omitempty"` // prompt input variables
}

// Artifact represents a named collection of parts produced during a session.
// Examples: generated files, images, code snippets, diagrams.
type Artifact struct {
    Name     string         `json:"name,omitempty"`     // identifies the artifact
    Parts    []*ai.Part     `json:"parts"`              // artifact content (text, media, etc.)
    Metadata map[string]any `json:"metadata,omitempty"` // additional artifact-specific data
}

Snapshot System

// SessionStore persists and retrieves snapshots.
type SessionStore[State any] interface {
    GetSnapshot(ctx context.Context, snapshotID string) (*SessionSnapshot[State], error)
    SaveSnapshot(ctx context.Context, snapshot *SessionSnapshot[State]) error
}

// NewInMemorySessionStore creates a thread-safe in-memory snapshot store.
func NewInMemorySessionStore[State]() *InMemorySessionStore[State]

// SnapshotCallback decides whether to create a snapshot.
// Return true to create, false to skip.
// If not provided and a store is configured, snapshots are always created.
type SnapshotCallback[State] = func(ctx context.Context, sc *SnapshotContext[State]) bool

// SnapshotContext provides context for snapshot decision callbacks.
type SnapshotContext[State any] struct {
    State     *SessionState[State]  // current state
    PrevState *SessionState[State]  // state at last snapshot (nil if none)
    TurnIndex int                   // turn number in current invocation
    Event     SnapshotEvent         // what triggered this check
}

// Events
const SnapshotEventTurnEnd       SnapshotEvent = "turnEnd"
const SnapshotEventInvocationEnd SnapshotEvent = "invocationEnd"

// SessionSnapshot is a persisted point-in-time capture of session state.
type SessionSnapshot[State any] struct {
    SnapshotID string              `json:"snapshotId"`
    ParentID   string              `json:"parentId,omitempty"`
    CreatedAt  time.Time           `json:"createdAt"`
    Event      SnapshotEvent       `json:"event"`
    State      SessionState[State] `json:"state"`
}

PromptRenderer interface

Satisfied by ai.Prompt and *ai.DataPrompt[In, Out]. Used by DefinePromptAgent.

// PromptRenderer renders a prompt with typed input into GenerateActionOptions.
type PromptRenderer[In any] interface {
    Render(ctx context.Context, input In) (*ai.GenerateActionOptions, error)
}

Known Issues

  • Empty trace on zero-turn connections: StreamBidi creates a trace span immediately when the connection is established. If the connection is closed without sending any messages (zero turns), an empty single-span trace is still emitted. This is cosmetic — the trace contains no useful data (just a snapshot ID in the output). A future change could defer span creation until the first input arrives.
  • No single-turn convenience methods: Currently, even one-off requests require the full StreamBidiSendCloseOutput dance. A future Run(ctx, text) method (and variants like RunMessages) would simplify the common case of sending a single input and receiving the final output without managing a connection.

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @apascal07, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the Genkit Go SDK by introducing a new SessionFlow API, specifically designed for building and managing complex, stateful, multi-turn conversational AI applications. This new API is built upon a fundamental refactoring of the core Action type to support bidirectional streaming, enabling more dynamic and interactive AI experiences. The changes provide a structured approach to handling conversational state, including message history, custom data, and generated artifacts, with built-in mechanisms for persistence and lifecycle management.

Highlights

  • Introduction of SessionFlow API: A new SessionFlow API has been introduced in go/ai/x to manage stateful, multi-turn conversational AI interactions, including automatic snapshot management and artifact handling.
  • Core Action Refactoring for Bidirectional Streaming: The core ActionDef type has been refactored to Action to natively support bidirectional streaming, which is a foundational change enabling the SessionFlow API. This includes new BidiFunc and BidiConnection types.
  • Update of Existing AI Components: All existing AI components (embedder, evaluator, model, prompt, resource, retriever) have been updated to utilize the new core.Action type and its extended capabilities.
  • Snapshot Management Features: The SessionFlow now includes robust snapshot management, allowing state to be persisted, loaded, and controlled via callbacks, with an in-memory store implementation provided.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • go/ai/embedder.go
    • Updated embedder struct to use core.Action instead of core.ActionDef.
    • Modified NewEmbedder and LookupEmbedder to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/evaluator.go
    • Updated evaluator struct to use core.Action instead of core.ActionDef.
    • Modified NewEvaluator, NewBatchEvaluator, and LookupEvaluator to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/generate.go
    • Updated model and generateAction structs to use core.Action instead of core.ActionDef.
    • Modified LookupModel, model.Generate, and model.supportsConstrained to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/prompt.go
    • Updated prompt struct to use core.Action instead of core.ActionDef.
    • Modified DefinePrompt, LookupPrompt, and prompt.Desc to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/resource.go
    • Updated resource struct to use core.Action instead of core.ActionDef.
    • Modified DefineResource, NewResource, FindMatchingResource, and LookupResource to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/retriever.go
    • Updated retriever struct to use core.Action instead of core.ActionDef.
    • Modified NewRetriever and LookupRetriever to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/x/option.go
    • Added SessionFlowOption and StreamBidiOption interfaces for configuring session flows and bidirectional streams.
    • Introduced WithSnapshotStore, WithSnapshotCallback, WithState, and WithSnapshotID functions for flexible session flow initialization.
  • go/ai/x/session_flow.go
    • Introduced SessionFlowArtifact, SessionFlowInput, SessionFlowInit, SessionFlowOutput, and SessionFlowStreamChunk data structures.
    • Defined the Session type for managing conversational state (messages, custom state, artifacts) with methods for manipulation and snapshot handling.
    • Implemented Responder for sending various types of stream chunks (generation, status, artifacts).
    • Introduced the SessionFlow type, DefineSessionFlow for registration, and StreamBidi for initiating sessions.
    • Added logic for snapshot creation, loading, and integration with tracing, including SessionFlowConnection for buffered chunk reception.
  • go/ai/x/session_flow_test.go
    • Added comprehensive unit tests for SessionFlow covering multi-turn interactions, snapshot persistence, resuming from snapshots, client-managed state, artifact handling, snapshot callbacks, and error handling.
  • go/ai/x/snapshot.go
    • Defined SessionState for portable conversation state, SnapshotEvent for trigger types, and SessionSnapshot for persisted state.
    • Introduced SnapshotContext and SnapshotCallback for custom snapshot logic.
    • Defined SnapshotStore interface and provided an InMemorySnapshotStore implementation.
    • Added SnapshotOn utility function for selective snapshotting.
  • go/core/action.go
    • Refactored ActionDef to Action and added a new Init type parameter for bidirectional actions.
    • Introduced BidiFunc for bidirectional streaming function signatures and ActionOptions for configuration.
    • Implemented NewBidiAction and DefineBidiAction for creating and registering bidirectional actions.
    • Added StreamBidi method to Action for initiating bidirectional connections.
    • Introduced BidiConnection type for managing bidirectional streaming, including Send, Close, Receive, Output, and Done methods.
    • Updated ResolveActionFor and LookupActionFor to use the new Action type and Init parameter.
    • Added wrapBidiAsStreaming to adapt BidiFunc to StreamingFunc.
  • go/core/action_test.go
    • Updated existing tests to use DefineStreamingAction and the new Init type parameter in ResolveActionFor and LookupActionFor.
    • Added new tests for BidiAction functionality, covering echo, initialization, send after close, context cancellation, and Done channel.
  • go/core/api/action.go
    • Added new ActionType constants: ActionTypeSessionFlow and ActionTypeSnapshotStore.
    • Extended ActionDesc with StreamSchema and InitSchema fields for describing bidirectional actions.
  • go/core/background_action.go
    • Updated BackgroundActionDef to use core.Action instead of core.ActionDef.
    • Modified Register, NewBackgroundAction, and LookupBackgroundAction to align with the new core.Action type and updated ResolveActionFor signature.
  • go/core/flow.go
    • Refactored Flow to be a struct embedding *Action with the new Init type parameter.
    • Updated DefineFlow and DefineStreamingFlow to use the new Flow struct and Action type.
    • Introduced NewBidiFlow and DefineBidiFlow for creating and registering bidirectional flows.
    • Updated Run and Stream methods to use the embedded Action's Run method.
  • go/core/flow_test.go
    • Updated existing tests to reflect changes in Flow type and method calls.
    • Added new tests for BidiFlow functionality, including registration, echo, and integration with core.Run.
  • go/genkit/genkit.go
    • Updated DefineFlow and DefineStreamingFlow signatures to include the new Init type parameter.
    • Added DefineBidiFlow function to expose the new bidirectional flow definition.
  • go/genkit/session_flow.go
    • Introduced DefineSessionFlow as a top-level Genkit function to define and register session flows, wrapping aix.DefineSessionFlow.
  • go/samples/basic-session-flow/main.go
    • Added a sample CLI REPL application demonstrating the usage of SessionFlow for multi-turn conversations with token-level streaming and snapshot management.
Activity
  • The pull request author, apascal07, has indicated that the PR title adheres to conventional commits.
  • The author has confirmed that the changes have been manually and unit tested.
  • Documentation updates are noted as pending in the PR description.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@apascal07 apascal07 changed the base branch from main to ap/go-bidi February 6, 2026 05:48
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This is a great pull request that introduces the SessionFlow feature and refactors the core Action type to support bidirectional streaming. The new functionality is well-structured, comes with comprehensive tests, and includes a helpful sample application. I've identified a critical race condition in the new BidiConnection implementation and a minor issue in the sample code that would prevent it from running. My comments provide suggestions to address these points.

I am having trouble creating individual review comments. Click here to see my feedback.

go/core/action.go (534-550)

high

This Send implementation has a race condition that can cause a panic. The mutex is unlocked on line 540 before the channel send on line 543. If another goroutine calls Close() in between, c.inputCh will be closed, and the send will panic.

A robust way to fix this is to use recover to handle the "send on closed channel" panic, which is a common pattern in Go for this scenario. This avoids holding a lock over a potentially blocking operation.

Here's a suggested safer implementation for Send that removes the racy mutex usage. The Close method's use of the mutex remains important to make it safe for concurrent calls.

func (c *BidiConnection[In, Out, Stream]) Send(input In) (err error) {
	defer func() {
		if r := recover(); r != nil {
			// This recovers from a panic that occurs when sending on a closed channel.
			err = NewError(FAILED_PRECONDITION, "connection is closed")
		}
	}()

	select {
	case c.inputCh <- input:
		return nil
	case <-c.ctx.Done():
		return c.ctx.Err()
	case <-c.doneCh:
		// The recover will handle a panic if doneCh and inputCh close concurrently.
		return NewError(FAILED_PRECONDITION, "action has completed")
	}
}

go/samples/basic-session-flow/main.go (49-53)

medium

The model name googleai/gemini-3-flash-preview appears to be incorrect and will likely cause the sample to fail at runtime. Please use a valid model name, for example googleai/gemini-1.5-flash-latest.

					ai.WithModel(googlegenai.ModelRef("googleai/gemini-1.5-flash-latest", &genai.GenerateContentConfig{
						ThinkingConfig: &genai.ThinkingConfig{
							ThinkingBudget: genai.Ptr[int32](0),
						},
					})),

@apascal07 apascal07 changed the title feat(go): added SessionFlow and related feat(go): added DefineSessionFlow Feb 6, 2026
@apascal07 apascal07 mentioned this pull request Feb 6, 2026
@apascal07 apascal07 linked an issue Feb 6, 2026 that may be closed by this pull request
@apascal07 apascal07 changed the title feat(go): added DefineSessionFlow feat(go): added DefineCustomAgent and DefinePromptAgent Feb 18, 2026
@github-actions github-actions bot added the python Python label Feb 18, 2026
defer wg.Done()
for chunk := range respCh {
if chunk.Artifact != nil {
session.AddArtifacts(chunk.Artifact)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am asking this from a high-level analysis of the proposal, so please correct me if I'm mistaken.

I see that we have AddArtifacts and UpdateArtifacts methods on the Session object which are used to update the artifacts within DefineCustomAgent.

If the use-case is a complex one where the agent updates existing Artifacts, how would the developer manage these artifacts? Manually updating the session.artifacts field?

I suppose we can enforce that each artifact is immutable and any "updates" should be treated as a new artifact.

Copy link
Collaborator Author

@apascal07 apascal07 Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UpdateArtifacts takes a function that passes in the existing artifacts and returns artifacts which will be used to atomically replace the existing artifacts.

// UpdateArtifacts atomically reads the current artifacts, applies the given
// function, and writes the result back.
func (s *Session[State]) UpdateArtifacts(fn func([]*Artifact) []*Artifact) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.state.Artifacts = fn(s.state.Artifacts)
}

@apascal07 apascal07 requested a review from pavelgj February 25, 2026 02:32
@apascal07 apascal07 marked this pull request as ready for review February 25, 2026 15:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

RFC: Session flows

2 participants