@@ -61,6 +61,7 @@ def initialize(transport:, clock: Arcp::SystemClock.new)
6161 @job_streams = { }
6262 @job_results = { }
6363 @result_waiters = { }
64+ @submitted_jobs = { }
6465 @reader_task = nil
6566 @heartbeat_task = nil
6667 @next_outbound_seq = 0
@@ -153,6 +154,7 @@ def submit_job(agent:, input: nil, lease_request: nil, lease_constraints: nil,
153154 payload : submit . to_h
154155 )
155156 accepted = Arcp ::Job ::Accepted . from_h ( accepted_env . payload )
157+ @mutex . synchronize { @submitted_jobs [ accepted . job_id ] = true }
156158 Arcp ::Job ::Handle . new (
157159 job_id : accepted . job_id , agent : accepted . agent ,
158160 submitted_at : accepted . accepted_at ,
@@ -161,11 +163,17 @@ def submit_job(agent:, input: nil, lease_request: nil, lease_constraints: nil,
161163 )
162164 end
163165
164- # Subscribes to a job's event stream.
166+ # Subscribes to a job's event stream. Sends `job.subscribe` for any job
167+ # this client did not submit (so observer sessions attach to the runtime
168+ # fanout); submitter sessions reuse the stream the runtime opened for
169+ # them at submit time. The `subscribe` feature is required for explicit
170+ # subscriptions regardless of whether `from_event_seq` is supplied.
165171 def subscribe_job ( job_id :, from_event_seq : nil , history : false )
172+ already_owned = @mutex . synchronize { @submitted_jobs [ job_id ] }
166173 queue = @mutex . synchronize { @job_streams [ job_id ] ||= Async ::Queue . new }
167174
168- if @session . supports? ( Arcp ::Session ::Feature ::SUBSCRIBE ) && from_event_seq
175+ unless already_owned
176+ require_feature! ( Arcp ::Session ::Feature ::SUBSCRIBE )
169177 send_envelope ( type : Arcp ::MessageTypes ::JOB_SUBSCRIBE ,
170178 job_id : job_id ,
171179 payload : Arcp ::Job ::Subscribe . new ( job_id : job_id , from_event_seq : from_event_seq ,
@@ -197,6 +205,8 @@ def get_result(job_id:)
197205 @mutex . synchronize { @result_waiters [ job_id ] = queue }
198206 env = queue . dequeue
199207 end
208+ raise Arcp ::Errors ::ProtocolViolation , 'transport closed before job result' if env . nil?
209+
200210 case env . type
201211 when Arcp ::MessageTypes ::JOB_RESULT
202212 Arcp ::Job ::Result . from_h ( env . payload )
@@ -214,16 +224,32 @@ def ack(seq)
214224 payload : Arcp ::Session ::Ack . new ( last_processed_seq : seq ) . to_h )
215225 end
216226
217- # Sends an envelope on the current session.
218- def send_envelope ( type :, payload :, job_id : nil )
227+ # Builds an envelope for the current session without sending it.
228+ # Lets callers register pending waiters keyed on the envelope id
229+ # before the peer can reply.
230+ def build_envelope ( type :, payload :, job_id : nil )
219231 raise Arcp ::Errors ::Internal , 'session not open' unless @session
220- raise IOError , 'client closed' if @closed
221232
222- env = Arcp ::Envelope . build (
233+ Arcp ::Envelope . build (
223234 type : type , session_id : @session . id ,
224235 trace_id : Arcp ::Trace . current . trace_id ,
225236 job_id : job_id , payload : payload
226237 )
238+ end
239+
240+ # Sends an envelope on the current session.
241+ def send_envelope ( type :, payload :, job_id : nil )
242+ raise IOError , 'client closed' if @closed
243+
244+ env = build_envelope ( type : type , payload : payload , job_id : job_id )
245+ @transport . send ( env )
246+ env
247+ end
248+
249+ # Sends a pre-built envelope, e.g. after registering a pending waiter.
250+ def send_built_envelope ( env )
251+ raise IOError , 'client closed' if @closed
252+
227253 @transport . send ( env )
228254 env
229255 end
@@ -232,13 +258,13 @@ def send_envelope(type:, payload:, job_id: nil)
232258 def close ( reason : nil )
233259 return if @closed
234260
235- @closed = true
236261 begin
237262 send_envelope ( type : Arcp ::MessageTypes ::SESSION_BYE ,
238263 payload : Arcp ::Session ::Bye . new ( reason : reason ) . to_h )
239264 rescue StandardError
240265 nil
241266 end
267+ @closed = true
242268 @heartbeat_task &.stop
243269 @reader_task &.stop
244270 @transport . close ( reason : reason )
@@ -255,9 +281,15 @@ def require_feature!(feature)
255281 end
256282
257283 def request ( type :, expect :, payload :)
258- env = send_envelope ( type : type , payload : payload )
284+ env = build_envelope ( type : type , payload : payload )
259285 queue = Async ::Queue . new
260286 @mutex . synchronize { @pending [ env . id ] = [ expect , queue ] }
287+ begin
288+ send_built_envelope ( env )
289+ rescue StandardError
290+ @mutex . synchronize { @pending . delete ( env . id ) }
291+ raise
292+ end
261293 response = queue . dequeue
262294 raise Arcp ::Errors ::ProtocolViolation , 'transport closed' if response . nil?
263295
@@ -348,14 +380,9 @@ def feed_result(env)
348380
349381 def feed_pending ( env )
350382 reply_to = env . payload . is_a? ( Hash ) ? env . payload [ 'reply_to' ] : nil
351- key = reply_to || @mutex . synchronize do
352- @pending . keys . find do |k |
353- @pending [ k ] . is_a? ( Array ) && @pending [ k ] [ 0 ] == env . type
354- end
355- end
356- return unless key
383+ return unless reply_to
357384
358- pair = @mutex . synchronize { @pending . delete ( key ) }
385+ pair = @mutex . synchronize { @pending . delete ( reply_to ) }
359386 pair &.last &.enqueue ( env )
360387 end
361388
0 commit comments