drpcmanager: replace streamBuffer with streamRegistry#47
drpcmanager: replace streamBuffer with streamRegistry#47shubhamdhama wants to merge 3 commits intoshubham/use-atomic-counterfrom
Conversation
f5869a5 to
f204e04
Compare
86bb186 to
e16d021
Compare
Replace the single-stream streamBuffer with a stream registry that maps stream IDs to stream objects. The registry currently holds at most one active stream (two briefly during handoff), but provides the foundation for stream multiplexing where callers will look up streams by ID directly.
f204e04 to
6abe605
Compare
e16d021 to
0129d8f
Compare
| r.mu.Lock() | ||
| defer r.mu.Unlock() | ||
|
|
||
| if stream == nil { |
There was a problem hiding this comment.
This is input validation and should be done before taking the lock.
| } | ||
|
|
||
| if r.closed { | ||
| return managerClosed.New("register") |
| r.mu.RLock() | ||
| defer r.mu.RUnlock() | ||
|
|
||
| s, ok := r.streams[id] |
There was a problem hiding this comment.
Don't you have to check if registry is closed?
| func (r *streamRegistry) Unregister(id uint64) { | ||
| r.mu.Lock() | ||
| defer r.mu.Unlock() | ||
|
|
There was a problem hiding this comment.
Don't you have to check if registry is closed?
|
|
||
| // ForEach calls fn for each registered stream. The registry is read-locked | ||
| // during iteration. | ||
| func (r *streamRegistry) ForEach(fn func(*drpcstream.Stream)) { |
| // reg tracks active streams. Currently holds at most one active stream; | ||
| // a second may briefly coexist during stream handoff (old stream's | ||
| // Unregister races with new stream's Register). | ||
| reg *streamRegistry |
There was a problem hiding this comment.
nit: why not just call it activeStreams?
| // empty. It iterates the map because the registry may briefly hold two streams | ||
| // during stream handoff. This method should be removed once multiplexing is | ||
| // supported and callers look up streams by ID directly. | ||
| func (r *streamRegistry) GetLatest() *drpcstream.Stream { |
There was a problem hiding this comment.
Keep track of the latest streamID and avoid iterating over the map holding the lock.
drpcmanager/manager.go
Outdated
| m.log("TERM", func() string { return fmt.Sprint(err) }) | ||
| m.sigs.tport.Set(m.tr.Close()) | ||
| m.sbuf.Close() | ||
| m.reg.ForEach(func(s *drpcstream.Stream) { s.Close() }) |
There was a problem hiding this comment.
Should this be s.Cancel() instead of s.Close()? s.Close() writes to the transport, and the current code already holds the lock and writes to the transport for every stream. I'm assuming this path isn't intended for mux streams?
Replace the single-stream streamBuffer with a stream registry that maps
stream IDs to stream objects. The registry currently holds at most one
active stream (two briefly during handoff), but provides the foundation
for stream multiplexing where callers will look up streams by ID directly.