Skip to content

Commit b3e2c83

Browse files
Apply PR #22504: fix(effect): add effect bridge for callback contexts
2 parents 7fcff4c + 10069a4 commit b3e2c83

File tree

13 files changed

+140
-41
lines changed

13 files changed

+140
-41
lines changed

bun.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/opencode/src/bus/index.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import z from "zod"
22
import { Effect, Exit, Layer, PubSub, Scope, Context, Stream } from "effect"
3-
import { EffectLogger } from "@/effect/logger"
3+
import { EffectBridge } from "@/effect/bridge"
44
import { Log } from "../util/log"
55
import { BusEvent } from "./bus-event"
66
import { GlobalBus } from "./global"
@@ -128,6 +128,7 @@ export namespace Bus {
128128
function on<T>(pubsub: PubSub.PubSub<T>, type: string, callback: (event: T) => unknown) {
129129
return Effect.gen(function* () {
130130
log.info("subscribing", { type })
131+
const bridge = yield* EffectBridge.make()
131132
const scope = yield* Scope.make()
132133
const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub))
133134

@@ -147,7 +148,7 @@ export namespace Bus {
147148

148149
return () => {
149150
log.info("unsubscribing", { type })
150-
Effect.runFork(Scope.close(scope, Exit.void).pipe(Effect.provide(EffectLogger.layer)))
151+
bridge.fork(Scope.close(scope, Exit.void))
151152
}
152153
})
153154
}

packages/opencode/src/command/index.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { BusEvent } from "@/bus/bus-event"
22
import { InstanceState } from "@/effect/instance-state"
3+
import { EffectBridge } from "@/effect/bridge"
34
import type { InstanceContext } from "@/project/instance"
45
import { SessionID, MessageID } from "@/session/schema"
56
import { Effect, Layer, Context } from "effect"
6-
import { EffectLogger } from "@/effect/logger"
77
import z from "zod"
88
import { Config } from "../config/config"
99
import { MCP } from "../mcp"
@@ -82,6 +82,7 @@ export namespace Command {
8282

8383
const init = Effect.fn("Command.state")(function* (ctx: InstanceContext) {
8484
const cfg = yield* config.get()
85+
const bridge = yield* EffectBridge.make()
8586
const commands: Record<string, Info> = {}
8687

8788
commands[Default.INIT] = {
@@ -125,7 +126,7 @@ export namespace Command {
125126
source: "mcp",
126127
description: prompt.description,
127128
get template() {
128-
return Effect.runPromise(
129+
return bridge.promise(
129130
mcp
130131
.getPrompt(
131132
prompt.client,
@@ -141,7 +142,6 @@ export namespace Command {
141142
.map((message) => (message.content.type === "text" ? message.content.text : ""))
142143
.join("\n") || "",
143144
),
144-
Effect.provide(EffectLogger.layer),
145145
),
146146
)
147147
},

packages/opencode/src/control-plane/workspace-context.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ export const WorkspaceContext = {
1212
return context.provide({ workspaceID: input.workspaceID as string }, () => input.fn())
1313
},
1414

15+
restore<R>(workspaceID: string, fn: () => R): R {
16+
return context.provide({ workspaceID }, fn)
17+
},
18+
1519
get workspaceID() {
1620
try {
1721
return context.use().workspaceID
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import { Effect, Fiber } from "effect"
2+
import { WorkspaceContext } from "@/control-plane/workspace-context"
3+
import { Instance, type InstanceContext } from "@/project/instance"
4+
import { LocalContext } from "@/util/local-context"
5+
import { InstanceRef, WorkspaceRef } from "./instance-ref"
6+
import { attachWith } from "./run-service"
7+
8+
export namespace EffectBridge {
9+
export interface Shape {
10+
readonly promise: <A, E, R>(effect: Effect.Effect<A, E, R>) => Promise<A>
11+
readonly fork: <A, E, R>(effect: Effect.Effect<A, E, R>) => Fiber.Fiber<A, E>
12+
}
13+
14+
function restore<R>(instance: InstanceContext | undefined, workspace: string | undefined, fn: () => R): R {
15+
if (instance && workspace !== undefined) {
16+
return WorkspaceContext.restore(workspace, () => Instance.restore(instance, fn))
17+
}
18+
if (instance) return Instance.restore(instance, fn)
19+
if (workspace !== undefined) return WorkspaceContext.restore(workspace, fn)
20+
return fn()
21+
}
22+
23+
export function make(): Effect.Effect<Shape> {
24+
return Effect.gen(function* () {
25+
const ctx = yield* Effect.context()
26+
const value = yield* InstanceRef
27+
const instance =
28+
value ??
29+
(() => {
30+
try {
31+
return Instance.current
32+
} catch (err) {
33+
if (!(err instanceof LocalContext.NotFound)) throw err
34+
}
35+
})()
36+
const workspace = (yield* WorkspaceRef) ?? WorkspaceContext.workspaceID
37+
const attach = <A, E, R>(effect: Effect.Effect<A, E, R>) => attachWith(effect, { instance, workspace })
38+
const wrap = <A, E, R>(effect: Effect.Effect<A, E, R>) =>
39+
attach(effect).pipe(Effect.provide(ctx)) as Effect.Effect<A, E, never>
40+
41+
return {
42+
promise: <A, E, R>(effect: Effect.Effect<A, E, R>) =>
43+
restore(instance, workspace, () => Effect.runPromise(wrap(effect))),
44+
fork: <A, E, R>(effect: Effect.Effect<A, E, R>) =>
45+
restore(instance, workspace, () => Effect.runFork(wrap(effect))),
46+
} satisfies Shape
47+
})
48+
}
49+
}

packages/opencode/src/effect/run-service.ts

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,31 @@ import { LocalContext } from "@/util/local-context"
55
import { InstanceRef, WorkspaceRef } from "./instance-ref"
66
import { Observability } from "./observability"
77
import { WorkspaceContext } from "@/control-plane/workspace-context"
8+
import type { InstanceContext } from "@/project/instance"
89

910
export const memoMap = Layer.makeMemoMapUnsafe()
1011

12+
type Refs = {
13+
instance?: InstanceContext
14+
workspace?: string
15+
}
16+
17+
export function attachWith<A, E, R>(effect: Effect.Effect<A, E, R>, refs: Refs): Effect.Effect<A, E, R> {
18+
if (!refs.instance && !refs.workspace) return effect
19+
if (!refs.instance) return effect.pipe(Effect.provideService(WorkspaceRef, refs.workspace))
20+
if (!refs.workspace) return effect.pipe(Effect.provideService(InstanceRef, refs.instance))
21+
return effect.pipe(
22+
Effect.provideService(InstanceRef, refs.instance),
23+
Effect.provideService(WorkspaceRef, refs.workspace),
24+
)
25+
}
26+
1127
export function attach<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> {
1228
try {
13-
const ctx = Instance.current
14-
const workspaceID = WorkspaceContext.workspaceID
15-
return effect.pipe(Effect.provideService(InstanceRef, ctx), Effect.provideService(WorkspaceRef, workspaceID))
29+
return attachWith(effect, {
30+
instance: Instance.current,
31+
workspace: WorkspaceContext.workspaceID,
32+
})
1633
} catch (err) {
1734
if (!(err instanceof LocalContext.NotFound)) throw err
1835
}

packages/opencode/src/mcp/index.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import { Bus } from "@/bus"
2525
import { TuiEvent } from "@/cli/cmd/tui/event"
2626
import open from "open"
2727
import { Effect, Exit, Layer, Option, Context, Stream } from "effect"
28-
import { EffectLogger } from "@/effect/logger"
28+
import { EffectBridge } from "@/effect/bridge"
2929
import { InstanceState } from "@/effect/instance-state"
3030
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
3131
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
@@ -471,25 +471,24 @@ export namespace MCP {
471471
Effect.catch(() => Effect.succeed([] as number[])),
472472
)
473473

474-
function watch(s: State, name: string, client: MCPClient, timeout?: number) {
474+
function watch(s: State, name: string, client: MCPClient, bridge: EffectBridge.Shape, timeout?: number) {
475475
client.setNotificationHandler(ToolListChangedNotificationSchema, async () => {
476476
log.info("tools list changed notification received", { server: name })
477477
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
478478

479-
const listed = await Effect.runPromise(defs(name, client, timeout).pipe(Effect.provide(EffectLogger.layer)))
479+
const listed = await bridge.promise(defs(name, client, timeout))
480480
if (!listed) return
481481
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
482482

483483
s.defs[name] = listed
484-
await Effect.runPromise(
485-
bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore, Effect.provide(EffectLogger.layer)),
486-
)
484+
await bridge.promise(bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore))
487485
})
488486
}
489487

490488
const state = yield* InstanceState.make<State>(
491489
Effect.fn("MCP.state")(function* () {
492490
const cfg = yield* cfgSvc.get()
491+
const bridge = yield* EffectBridge.make()
493492
const config = cfg.mcp ?? {}
494493
const s: State = {
495494
status: {},
@@ -518,7 +517,7 @@ export namespace MCP {
518517
if (result.mcpClient) {
519518
s.clients[key] = result.mcpClient
520519
s.defs[key] = result.defs!
521-
watch(s, key, result.mcpClient, mcp.timeout)
520+
watch(s, key, result.mcpClient, bridge, mcp.timeout)
522521
}
523522
}),
524523
{ concurrency: "unbounded" },
@@ -565,11 +564,12 @@ export namespace MCP {
565564
listed: MCPToolDef[],
566565
timeout?: number,
567566
) {
567+
const bridge = yield* EffectBridge.make()
568568
yield* closeClient(s, name)
569569
s.status[name] = { status: "connected" }
570570
s.clients[name] = client
571571
s.defs[name] = listed
572-
watch(s, name, client, timeout)
572+
watch(s, name, client, bridge, timeout)
573573
return s.status[name]
574574
})
575575

packages/opencode/src/plugin/index.ts

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import { gitlabAuthPlugin as GitlabAuthPlugin } from "opencode-gitlab-auth"
1818
import { PoeAuthPlugin } from "opencode-poe-auth"
1919
import { CloudflareAIGatewayAuthPlugin, CloudflareWorkersAuthPlugin } from "./cloudflare"
2020
import { Effect, Layer, Context, Stream } from "effect"
21-
import { EffectLogger } from "@/effect/logger"
21+
import { EffectBridge } from "@/effect/bridge"
2222
import { InstanceState } from "@/effect/instance-state"
2323
import { errorMessage } from "@/util/error"
2424
import { PluginLoader } from "./loader"
@@ -90,14 +90,6 @@ export namespace Plugin {
9090
return result
9191
}
9292

93-
function publishPluginError(bus: Bus.Interface, message: string) {
94-
Effect.runFork(
95-
bus
96-
.publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() })
97-
.pipe(Effect.provide(EffectLogger.layer)),
98-
)
99-
}
100-
10193
async function applyPlugin(load: PluginLoader.Loaded, input: PluginInput, hooks: Hooks[]) {
10294
const plugin = readV1Plugin(load.mod, load.spec, "server", "detect")
10395
if (plugin) {
@@ -120,6 +112,11 @@ export namespace Plugin {
120112
const state = yield* InstanceState.make<State>(
121113
Effect.fn("Plugin.state")(function* (ctx) {
122114
const hooks: Hooks[] = []
115+
const bridge = yield* EffectBridge.make()
116+
117+
function publishPluginError(message: string) {
118+
bridge.fork(bus.publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() }))
119+
}
123120

124121
const { Server } = yield* Effect.promise(() => import("../server/server"))
125122

@@ -187,24 +184,24 @@ export namespace Plugin {
187184
if (stage === "install") {
188185
const parsed = parsePluginSpecifier(spec)
189186
log.error("failed to install plugin", { pkg: parsed.pkg, version: parsed.version, error: message })
190-
publishPluginError(bus, `Failed to install plugin ${parsed.pkg}@${parsed.version}: ${message}`)
187+
publishPluginError(`Failed to install plugin ${parsed.pkg}@${parsed.version}: ${message}`)
191188
return
192189
}
193190

194191
if (stage === "compatibility") {
195192
log.warn("plugin incompatible", { path: spec, error: message })
196-
publishPluginError(bus, `Plugin ${spec} skipped: ${message}`)
193+
publishPluginError(`Plugin ${spec} skipped: ${message}`)
197194
return
198195
}
199196

200197
if (stage === "entry") {
201198
log.error("failed to resolve plugin server entry", { path: spec, error: message })
202-
publishPluginError(bus, `Failed to load plugin ${spec}: ${message}`)
199+
publishPluginError(`Failed to load plugin ${spec}: ${message}`)
203200
return
204201
}
205202

206203
log.error("failed to load plugin", { path: spec, target: resolved?.entry, error: message })
207-
publishPluginError(bus, `Failed to load plugin ${spec}: ${message}`)
204+
publishPluginError(`Failed to load plugin ${spec}: ${message}`)
208205
},
209206
},
210207
}),

packages/opencode/src/provider/provider.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import { iife } from "@/util/iife"
1919
import { Global } from "../global"
2020
import path from "path"
2121
import { Effect, Layer, Context } from "effect"
22-
import { EffectLogger } from "@/effect/logger"
22+
import { EffectBridge } from "@/effect/bridge"
2323
import { InstanceState } from "@/effect/instance-state"
2424
import { AppFileSystem } from "@/filesystem"
2525
import { isRecord } from "@/util/record"
@@ -1043,6 +1043,7 @@ export namespace Provider {
10431043
const state = yield* InstanceState.make<State>(() =>
10441044
Effect.gen(function* () {
10451045
using _ = log.time("state")
1046+
const bridge = yield* EffectBridge.make()
10461047
const cfg = yield* config.get()
10471048
const modelsDev = yield* Effect.promise(() => ModelsDev.get())
10481049
const database = mapValues(modelsDev, fromModelsDevProvider)
@@ -1223,8 +1224,7 @@ export namespace Provider {
12231224

12241225
const options = yield* Effect.promise(() =>
12251226
plugin.auth!.loader!(
1226-
() =>
1227-
Effect.runPromise(auth.get(providerID).pipe(Effect.orDie, Effect.provide(EffectLogger.layer))) as any,
1227+
() => bridge.promise(auth.get(providerID).pipe(Effect.orDie)) as any,
12281228
database[plugin.auth!.provider],
12291229
),
12301230
)

packages/opencode/src/pty/index.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { Shell } from "@/shell/shell"
1010
import { Plugin } from "@/plugin"
1111
import { PtyID } from "./schema"
1212
import { Effect, Layer, Context } from "effect"
13-
import { EffectLogger } from "@/effect/logger"
13+
import { EffectBridge } from "@/effect/bridge"
1414

1515
export namespace Pty {
1616
const log = Log.create({ service: "pty" })
@@ -173,6 +173,7 @@ export namespace Pty {
173173

174174
const create = Effect.fn("Pty.create")(function* (input: CreateInput) {
175175
const s = yield* InstanceState.get(state)
176+
const bridge = yield* EffectBridge.make()
176177
const id = PtyID.ascending()
177178
const command = input.command || Shell.preferred()
178179
const args = input.args || []
@@ -256,8 +257,8 @@ export namespace Pty {
256257
if (session.info.status === "exited") return
257258
log.info("session exited", { id, exitCode })
258259
session.info.status = "exited"
259-
Effect.runFork(bus.publish(Event.Exited, { id, exitCode }).pipe(Effect.provide(EffectLogger.layer)))
260-
Effect.runFork(remove(id).pipe(Effect.provide(EffectLogger.layer)))
260+
bridge.fork(bus.publish(Event.Exited, { id, exitCode }))
261+
bridge.fork(remove(id))
261262
}),
262263
)
263264
yield* bus.publish(Event.Created, { info })

0 commit comments

Comments
 (0)