Skip to content

drpcmanager: replace streamBuffer with streamRegistry#47

Open
shubhamdhama wants to merge 3 commits intoshubham/use-atomic-counterfrom
shubham/add-stream-registry
Open

drpcmanager: replace streamBuffer with streamRegistry#47
shubhamdhama wants to merge 3 commits intoshubham/use-atomic-counterfrom
shubham/add-stream-registry

Conversation

@shubhamdhama
Copy link
Copy Markdown

Replace the single-stream streamBuffer with a stream registry that maps
stream IDs to stream objects. The registry currently holds at most one
active stream (two briefly during handoff), but provides the foundation
for stream multiplexing where callers will look up streams by ID directly.

@shubhamdhama shubhamdhama force-pushed the shubham/use-atomic-counter branch from f5869a5 to f204e04 Compare March 30, 2026 14:07
@shubhamdhama shubhamdhama force-pushed the shubham/add-stream-registry branch from 86bb186 to e16d021 Compare March 30, 2026 14:07
Replace the single-stream streamBuffer with a stream registry that maps
stream IDs to stream objects. The registry currently holds at most one
active stream (two briefly during handoff), but provides the foundation
for stream multiplexing where callers will look up streams by ID directly.
r.mu.Lock()
defer r.mu.Unlock()

if stream == nil {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is input validation and should be done before taking the lock.

}

if r.closed {
return managerClosed.New("register")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix error message.

r.mu.RLock()
defer r.mu.RUnlock()

s, ok := r.streams[id]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you have to check if registry is closed?

func (r *streamRegistry) Unregister(id uint64) {
r.mu.Lock()
defer r.mu.Unlock()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you have to check if registry is closed?


// ForEach calls fn for each registered stream. The registry is read-locked
// during iteration.
func (r *streamRegistry) ForEach(fn func(*drpcstream.Stream)) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would you need this?

// reg tracks active streams. Currently holds at most one active stream;
// a second may briefly coexist during stream handoff (old stream's
// Unregister races with new stream's Register).
reg *streamRegistry
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why not just call it activeStreams?

// empty. It iterates the map because the registry may briefly hold two streams
// during stream handoff. This method should be removed once multiplexing is
// supported and callers look up streams by ID directly.
func (r *streamRegistry) GetLatest() *drpcstream.Stream {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep track of the latest streamID and avoid iterating over the map holding the lock.

m.log("TERM", func() string { return fmt.Sprint(err) })
m.sigs.tport.Set(m.tr.Close())
m.sbuf.Close()
m.reg.ForEach(func(s *drpcstream.Stream) { s.Close() })
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be s.Cancel() instead of s.Close()? s.Close() writes to the transport, and the current code already holds the lock and writes to the transport for every stream. I'm assuming this path isn't intended for mux streams?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants