Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 102 additions & 21 deletions cmd/lk/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ var (
Name: "image-tar",
Usage: "Pre-built image from an OCI tar file (e.g. ./image.tar). No Docker daemon required.",
}
deploymentFlag = &cli.StringSliceFlag{
Name: "deployment",
Usage: "Agent deployments. For create/deploy, specifies the target deployment (defaults to 'production'). For update-secrets, assigns deployment(s) to the secret. Can be specified multiple times (e.g. --deployment staging --deployment production).",
Required: false,
Aliases: []string{"d"},
}
Copy link
Copy Markdown

@shawnyang-coder shawnyang-coder Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why only create/deploy/update-secrets? What about other rpc like status/update?

Can an agent (identified by unique agent_id) be in both Staging and Prod ? or only one env at any time?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

status/update shows agent details including all environments. Yes, multiple environments (prod, staging) can exists within a single agent (agent_id).


skipSDKCheckFlag = &cli.BoolFlag{
Name: "skip-sdk-check",
Expand Down Expand Up @@ -182,6 +188,7 @@ var (
ignoreEmptySecretsFlag,
silentFlag,
regionFlag,
deploymentFlag,
skipSDKCheckFlag,
agentPrebuiltImageFlag,
agentPrebuiltImageTarFlag,
Expand Down Expand Up @@ -228,6 +235,7 @@ var (
secretsMountFlag,
silentFlag,
regionFlag,
deploymentFlag,
ignoreEmptySecretsFlag,
skipSDKCheckFlag,
agentPrebuiltImageFlag,
Expand Down Expand Up @@ -299,18 +307,20 @@ var (
Flags: []cli.Flag{
idFlag(false),
logTypeFlag,
deploymentFlag,
},
ArgsUsage: "[working-dir]",
},
{
Name: "delete",
Usage: "Delete an agent",
Usage: "Delete an agent or a specific deployment",
Before: createAgentClient,
Action: deleteAgent,
Aliases: []string{"destroy"},
Flags: []cli.Flag{
silentFlag,
idFlag(false),
deploymentFlag,
},
ArgsUsage: "[working-dir]",
},
Expand Down Expand Up @@ -353,6 +363,7 @@ var (
secretsFileFlag,
secretsMountFlag,
ignoreEmptySecretsFlag,
deploymentFlag,
idFlag(false),
&cli.BoolFlag{
Name: "overwrite",
Expand Down Expand Up @@ -390,6 +401,13 @@ func createAgentClient(ctx context.Context, cmd *cli.Command) (context.Context,
return createAgentClientWithOpts(ctx, cmd)
}

func formatTime(t time.Time) string {
if t.IsZero() {
return "---"
}
return t.Format(time.RFC3339)
}

func createAgentClientWithOpts(ctx context.Context, cmd *cli.Command, opts ...loadOption) (context.Context, error) {
var err error

Expand Down Expand Up @@ -672,7 +690,7 @@ func createAgent(ctx context.Context, cmd *cli.Command) error {
return err
} else if viewLogs {
fmt.Println("Tailing runtime logs...safe to exit at any time")
return agentsClient.StreamLogs(ctx, "deploy", lkConfig.Agent.ID, os.Stdout, resp.ServerRegions[0])
return agentsClient.StreamLogs(ctx, "deploy", lkConfig.Agent.ID, "", os.Stdout, resp.ServerRegions[0])
}
}
return nil
Expand Down Expand Up @@ -797,8 +815,14 @@ func deployAgent(ctx context.Context, cmd *cli.Command) error {
}
}

agentDeployment := "production"
if cmd.IsSet("deployment") {
agentDeployment = cmd.StringSlice("deployment")[0]
}
fmt.Printf("Using deployment [%s]\n", util.Accented(agentDeployment))

excludeFiles := []string{fmt.Sprintf("**/%s", config.LiveKitTOMLFile)}
if err := agentsClient.DeployAgent(buildContext, agentId, os.DirFS(workingDir), secrets, excludeFiles, os.Stderr); err != nil {
if err := agentsClient.DeployAgentV2(buildContext, agentId, os.DirFS(workingDir), secrets, agentDeployment, excludeFiles, os.Stderr); err != nil {
if twerr, ok := err.(twirp.Error); ok {
return fmt.Errorf("unable to deploy agent: %s", twerr.Msg())
}
Expand Down Expand Up @@ -886,21 +910,28 @@ func getAgentStatus(ctx context.Context, cmd *cli.Command) error {
logger.Errorw("error parsing mem req", err)
}

version := regionalAgent.Version
if version == "" {
version = agent.Version
}
rows = append(rows, []string{
agent.AgentId,
agent.Version,
regionalAgent.AgentName,
version,
regionalAgent.Region,
regionalAgent.Deployment,
regionalAgent.Status,
fmt.Sprintf("%s / %s", curCPU, regionalAgent.CpuLimit),
fmt.Sprintf("%s / %s", curMem, memLimit),
fmt.Sprintf("%d / %d / %d", regionalAgent.Replicas, regionalAgent.MinReplicas, regionalAgent.MaxReplicas),
agent.DeployedAt.AsTime().Format(time.RFC3339),
fmt.Sprintf("%t", regionalAgent.DeploymentEnabled),
formatTime(agent.DeployedAt.AsTime()),
})
}
}

t := util.CreateTable().
Headers("ID", "Version", "Region", "Status", "CPU", "Mem", "Replicas", "Deployed At").
Headers("ID", "Name", "Version", "Region", "Deployment", "Status", "CPU", "Mem", "Replicas", "Deployment Feature", "Deployed At").
Rows(rows...)

fmt.Println(t)
Expand Down Expand Up @@ -1022,7 +1053,11 @@ func getLogs(ctx context.Context, cmd *cli.Command) error {
return fmt.Errorf("no agent deployments found")
}

return agentsClient.StreamLogs(ctx, cmd.String("log-type"), agentID, os.Stdout, response.Agents[0].AgentDeployments[0].ServerRegion)
var agentEnvironment string
if deployments := cmd.StringSlice("deployment"); len(deployments) > 0 {
agentEnvironment = deployments[0]
}
return agentsClient.StreamLogs(ctx, cmd.String("log-type"), agentID, agentEnvironment, os.Stdout, response.Agents[0].AgentDeployments[0].ServerRegion)
}

func deleteAgent(ctx context.Context, cmd *cli.Command) error {
Expand All @@ -1032,12 +1067,26 @@ func deleteAgent(ctx context.Context, cmd *cli.Command) error {
return err
}

var deployment string
if deployments := cmd.StringSlice("deployment"); len(deployments) > 0 {
deployment = deployments[0]
}

confirmMsg := fmt.Sprintf("Are you sure you want to delete agent [%s]?", agentID)
deletingMsg := "Deleting agent [" + util.Accented(agentID) + "]"
deletedMsg := fmt.Sprintf("Deleted agent [%s]", util.Accented(agentID))
if deployment != "" {
confirmMsg = fmt.Sprintf("Are you sure you want to delete deployment [%s] from agent [%s]?", deployment, agentID)
deletingMsg = "Deleting deployment [" + util.Accented(deployment) + "] from agent [" + util.Accented(agentID) + "]"
deletedMsg = fmt.Sprintf("Deleted deployment [%s] from agent [%s]", util.Accented(deployment), util.Accented(agentID))
}

if !silent && !SkipPrompts(cmd) {
var confirmDelete bool
if err := huh.NewForm(
huh.NewGroup(
huh.NewConfirm().
Title(fmt.Sprintf("Are you sure you want to delete agent [%s]?", agentID)).
Title(confirmMsg).
Value(&confirmDelete).
Inline(false).
WithTheme(util.Theme),
Expand All @@ -1053,12 +1102,13 @@ func deleteAgent(ctx context.Context, cmd *cli.Command) error {

var res *lkproto.DeleteAgentResponse
err = util.Await(
"Deleting agent ["+util.Accented(agentID)+"]",
deletingMsg,
ctx,
func(ctx context.Context) error {
var clientErr error
res, clientErr = agentsClient.DeleteAgent(ctx, &lkproto.DeleteAgentRequest{
AgentId: agentID,
AgentId: agentID,
Deployment: deployment,
})
return clientErr
},
Expand All @@ -1075,7 +1125,7 @@ func deleteAgent(ctx context.Context, cmd *cli.Command) error {
return fmt.Errorf("failed to delete agent %s", res.Message)
}

fmt.Printf("Deleted agent [%s]\n", util.Accented(agentID))
fmt.Printf("%s\n", deletedMsg)
return nil
}

Expand Down Expand Up @@ -1110,7 +1160,14 @@ func listAgentVersions(ctx context.Context, cmd *cli.Command) error {
}
}

headers := []string{"Version", "Current", "Status", "Created At", "Deployed At"}
flag := func(b bool) string {
if b {
return "✓"
}
return "---"
}

headers := []string{"Version", "Prod", "Draining", "Active", "Status", "Created At", "Deployed At"}
if showDigest {
headers = append(headers, "Digest")
}
Expand All @@ -1119,10 +1176,12 @@ func listAgentVersions(ctx context.Context, cmd *cli.Command) error {
for _, version := range versions.Versions {
row := []string{
version.Version,
fmt.Sprintf("%t", version.Current),
flag(version.Current),
flag(version.Draining),
flag(version.Active),
version.Status,
version.CreatedAt.AsTime().Format(time.RFC3339),
version.DeployedAt.AsTime().Format(time.RFC3339),
formatTime(version.CreatedAt.AsTime()),
formatTime(version.DeployedAt.AsTime()),
}
if showDigest {
row = append(row, version.Attributes["image_digest"])
Expand Down Expand Up @@ -1174,21 +1233,32 @@ func listAgents(ctx context.Context, cmd *cli.Command) error {

var rows [][]string
for _, agent := range items {
var regions []string
regionSet := map[string]struct{}{}
deploymentSet := map[string]struct{}{}
for _, regionalAgent := range agent.AgentDeployments {
regions = append(regions, regionalAgent.Region)
regionSet[regionalAgent.Region] = struct{}{}
deploymentSet[regionalAgent.Deployment] = struct{}{}
}
regions := make([]string, 0, len(regionSet))
for region := range regionSet {
regions = append(regions, region)
}
deployments := make([]string, 0, len(deploymentSet))
for deployment := range deploymentSet {
deployments = append(deployments, deployment)
}
rows = append(rows, []string{
agent.AgentId,
agent.AgentName,
strings.Join(regions, ","),
strings.Join(deployments, ","),
agent.Version,
agent.DeployedAt.AsTime().Format(time.RFC3339),
formatTime(agent.DeployedAt.AsTime()),
})
}

t := util.CreateTable().
Headers("ID", "Dispatch Name", "Regions", "Version", "Deployed At").
Headers("ID", "Dispatch Name", "Regions", "Deployment", "Version", "Deployed At").
Rows(rows...)

fmt.Println(t)
Expand All @@ -1215,14 +1285,18 @@ func listAgentSecrets(ctx context.Context, cmd *cli.Command) error {

// TODO (steveyoon): show secret.Kind.String() once cloud-agents is released
table := util.CreateTable().
Headers("Name", "Created At", "Updated At")
Headers("Name", "Deployments", "Created At", "Updated At")

for _, secret := range secrets.Secrets {
// NOTE: Maybe these should be omitted on the server side?
if slices.Contains(ignoredSecrets, secret.Name) {
continue
}
table.Row(secret.Name, secret.CreatedAt.AsTime().Format(time.RFC3339), secret.UpdatedAt.AsTime().Format(time.RFC3339))
deployments := strings.Join(secret.Deployments, ", ")
if deployments == "" {
deployments = "(all)"
}
table.Row(secret.Name, deployments, secret.CreatedAt.AsTime().Format(time.RFC3339), secret.UpdatedAt.AsTime().Format(time.RFC3339))
}

fmt.Println(table)
Expand All @@ -1240,6 +1314,13 @@ func updateAgentSecrets(ctx context.Context, cmd *cli.Command) error {
return err
}

if cmd.IsSet("deployment") {
deployments := cmd.StringSlice("deployment")
for _, s := range secrets {
s.Deployments = deployments
}
}

var confirmOverwrite bool
if cmd.Bool("overwrite") {
confirmOverwrite = SkipPrompts(cmd)
Expand Down
24 changes: 20 additions & 4 deletions cmd/lk/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ var (
Name: "agent-name",
Usage: "agent to dispatch",
},
&cli.StringFlag{
Name: "deployment",
Usage: "deployment of the agent to dispatch to (defaults to production when empty)",
},
&cli.StringFlag{
Name: "metadata",
Usage: "metadata to send to agent",
Expand Down Expand Up @@ -143,7 +147,7 @@ func listDispatchAndPrint(cmd *cli.Command, req *livekit.ListAgentDispatchReques
util.PrintJSON(res)
} else {
table := util.CreateTable().
Headers("DispatchID", "Room", "AgentName", "Metadata")
Headers("DispatchID", "Room", "AgentName", "Deployment", "Metadata")
for _, item := range res.AgentDispatches {
if item == nil {
continue
Expand All @@ -153,6 +157,7 @@ func listDispatchAndPrint(cmd *cli.Command, req *livekit.ListAgentDispatchReques
item.Id,
item.Room,
item.AgentName,
item.Deployment,
item.Metadata,
)
}
Expand All @@ -161,11 +166,22 @@ func listDispatchAndPrint(cmd *cli.Command, req *livekit.ListAgentDispatchReques
return nil
}

// normalizeAgentDeployment maps the well-known "production" deployment to an empty string. Production agents
// run with an empty LIVEKIT_AGENT_DEPLOYMENT (legacy behavior) and register under an empty deployment, so a
// dispatch must also use an empty deployment to route to them.
func normalizeAgentDeployment(deployment string) string {
if deployment == "production" {
return ""
}
return deployment
}

func createAgentDispatch(ctx context.Context, cmd *cli.Command) error {
req := &livekit.CreateAgentDispatchRequest{
Room: cmd.String("room"),
AgentName: cmd.String("agent-name"),
Metadata: cmd.String("metadata"),
Room: cmd.String("room"),
AgentName: cmd.String("agent-name"),
Deployment: normalizeAgentDeployment(cmd.String("deployment")),
Metadata: cmd.String("metadata"),
}
if cmd.Bool("new-room") {
req.Room = utils.NewGuid("room-")
Expand Down
10 changes: 8 additions & 2 deletions cmd/lk/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ var (
Name: "agent",
Usage: "Agent to dispatch to the room (identified by agent_name)",
},
&cli.StringFlag{
Name: "deployment",
Usage: "Deployment of the agent to dispatch to (defaults to production when empty)",
},
&cli.StringFlag{
Name: "job-metadata",
Usage: "Metadata attached to job dispatched to the agent (ctx.job.metadata)",
Expand Down Expand Up @@ -415,14 +419,16 @@ func createToken(ctx context.Context, c *cli.Command) error {
}

agent := c.String("agent")
deployment := normalizeAgentDeployment(c.String("deployment"))
jobMetadata := c.String("job-metadata")
if grant.RoomJoin {
if agent != "" {
at.SetRoomConfig(&livekit.RoomConfiguration{
Agents: []*livekit.RoomAgentDispatch{
{
AgentName: agent,
Metadata: jobMetadata,
AgentName: agent,
Deployment: deployment,
Metadata: jobMetadata,
},
},
})
Expand Down
Loading
Loading