Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions lib/protocol/http2/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,45 @@

module Protocol
module HTTP2
# Represents an HTTP/2 client connection.
# Manages client-side protocol semantics including stream ID allocation,
# connection preface handling, and push promise processing.
class Client < Connection
# Initialize a new HTTP/2 client connection.
# @parameter framer [Framer] The frame handler for reading/writing HTTP/2 frames.
def initialize(framer)
super(framer, 1)
end

# Check if the given stream ID represents a locally-initiated stream.
# Client streams have odd numbered IDs.
# @parameter id [Integer] The stream ID to check.
# @returns [bool] True if the stream ID is locally-initiated.
def local_stream_id?(id)
id.odd?
end

# Check if the given stream ID represents a remotely-initiated stream.
# Server streams have even numbered IDs.
# @parameter id [Integer] The stream ID to check.
# @returns [bool] True if the stream ID is remotely-initiated.
def remote_stream_id?(id)
id.even?
end

# Check if the given stream ID is valid for remote initiation.
# Server-initiated streams must have even numbered IDs.
# @parameter stream_id [Integer] The stream ID to validate.
# @returns [bool] True if the stream ID is valid for remote initiation.
def valid_remote_stream_id?(stream_id)
stream_id.even?
end

# Send the HTTP/2 connection preface and initial settings.
# This must be called once when the connection is first established.
# @parameter settings [Array] Optional settings to send with the connection preface.
# @raises [ProtocolError] If called when not in the new state.
# @yields Allows custom processing during preface exchange.
def send_connection_preface(settings = [])
if @state == :new
@framer.write_connection_preface
Expand All @@ -42,10 +64,15 @@ def send_connection_preface(settings = [])
end
end

# Clients cannot create push promise streams.
# @raises [ProtocolError] Always, as clients cannot initiate push promises.
def create_push_promise_stream
raise ProtocolError, "Cannot create push promises from client!"
end

# Process a push promise frame received from the server.
# @parameter frame [PushPromiseFrame] The push promise frame to process.
# @returns [Array(Stream, Hash) | Nil] The promised stream and request headers, or nil if no associated stream.
def receive_push_promise(frame)
if frame.stream_id == 0
raise ProtocolError, "Cannot receive headers for stream 0!"
Expand Down
86 changes: 86 additions & 0 deletions lib/protocol/http2/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@

module Protocol
module HTTP2
# This is the core connection class that handles HTTP/2 protocol semantics including
# stream management, settings negotiation, and frame processing.
class Connection
include FlowControlled

# Initialize a new HTTP/2 connection.
# @parameter framer [Framer] The frame handler for reading/writing HTTP/2 frames.
# @parameter local_stream_id [Integer] The starting stream ID for locally-initiated streams.
def initialize(framer, local_stream_id)
super()

Expand All @@ -41,10 +46,15 @@ def initialize(framer, local_stream_id)
@remote_window = Window.new
end

# The connection stream ID (always 0 for connection-level operations).
# @returns [Integer] Always returns 0 for the connection itself.
def id
0
end

# Access streams by ID, with 0 returning the connection itself.
# @parameter id [Integer] The stream ID to look up.
# @returns [Connection | Stream | Nil] The connection (if id=0), stream, or nil.
def [] id
if id.zero?
self
Expand Down Expand Up @@ -89,6 +99,9 @@ def closed?
@state == :closed || @framer.nil?
end

# Remove a stream from the active streams collection.
# @parameter id [Integer] The stream ID to remove.
# @returns [Stream | Nil] The removed stream, or nil if not found.
def delete(id)
@streams.delete(id)
end
Expand All @@ -106,10 +119,17 @@ def close(error = nil)
end
end

# Encode headers using HPACK compression.
# @parameter headers [Array] The headers to encode.
# @parameter buffer [String] Optional buffer for encoding output.
# @returns [String] The encoded header block.
def encode_headers(headers, buffer = String.new.b)
HPACK::Compressor.new(buffer, @encoder, table_size_limit: @remote_settings.header_table_size).encode(headers)
end

# Decode headers using HPACK decompression.
# @parameter data [String] The encoded header block data.
# @returns [Array] The decoded headers.
def decode_headers(data)
HPACK::Decompressor.new(data, @decoder, table_size_limit: @local_settings.header_table_size).decode
end
Expand Down Expand Up @@ -141,6 +161,9 @@ def ignore_frame?(frame)
end
end

# Execute a block within a synchronized context.
# This method provides a synchronization primitive for thread safety.
# @yields The block to execute within the synchronized context.
def synchronize
yield
end
Expand Down Expand Up @@ -171,6 +194,8 @@ def read_frame
raise
end

# Send updated settings to the remote peer.
# @parameter changes [Hash] The settings changes to send.
def send_settings(changes)
@local_settings.append(changes)

Expand All @@ -197,6 +222,9 @@ def send_goaway(error_code = 0, message = "")
self.close!
end

# Process a GOAWAY frame from the remote peer.
# @parameter frame [GoawayFrame] The GOAWAY frame to process.
# @raises [GoawayError] If the frame indicates a connection error.
def receive_goaway(frame)
# We capture the last stream that was processed.
@remote_stream_id, error_code, message = frame.unpack
Expand All @@ -209,6 +237,8 @@ def receive_goaway(frame)
end
end

# Write a single frame to the connection.
# @parameter frame [Frame] The frame to write.
def write_frame(frame)
synchronize do
@framer.write_frame(frame)
Expand All @@ -217,6 +247,10 @@ def write_frame(frame)
@framer.flush
end

# Write multiple frames within a synchronized block.
# @yields {|framer| ...} The framer for writing multiple frames.
# @parameter framer [Framer] The framer instance.
# @raises [EOFError] If the connection is closed.
def write_frames
if @framer
synchronize do
Expand All @@ -229,6 +263,8 @@ def write_frames
end
end

# Update local settings and adjust stream window capacities.
# @parameter changes [Hash] The settings changes to apply locally.
def update_local_settings(changes)
capacity = @local_settings.initial_window_size

Expand All @@ -239,6 +275,8 @@ def update_local_settings(changes)
@local_window.desired = capacity
end

# Update remote settings and adjust stream window capacities.
# @parameter changes [Hash] The settings changes to apply to remote peer.
def update_remote_settings(changes)
capacity = @remote_settings.initial_window_size

Expand Down Expand Up @@ -273,12 +311,17 @@ def process_settings(frame)
end
end

# Transition the connection to the open state.
# @returns [Connection] Self for method chaining.
def open!
@state = :open

return self
end

# Receive and process a SETTINGS frame from the remote peer.
# @parameter frame [SettingsFrame] The settings frame to process.
# @raises [ProtocolError] If the connection is in an invalid state.
def receive_settings(frame)
if @state == :new
# We transition to :open when we receive acknowledgement of first settings frame:
Expand All @@ -290,6 +333,8 @@ def receive_settings(frame)
end
end

# Send a PING frame to the remote peer.
# @parameter data [String] The 8-byte ping payload data.
def send_ping(data)
if @state != :closed
frame = PingFrame.new
Expand All @@ -301,6 +346,9 @@ def send_ping(data)
end
end

# Process a PING frame from the remote peer.
# @parameter frame [PingFrame] The ping frame to process.
# @raises [ProtocolError] If ping is received in invalid state.
def receive_ping(frame)
if @state != :closed
# This is handled in `read_payload`:
Expand All @@ -318,6 +366,9 @@ def receive_ping(frame)
end
end

# Process a DATA frame from the remote peer.
# @parameter frame [DataFrame] The data frame to process.
# @raises [ProtocolError] If data is received for invalid stream.
def receive_data(frame)
update_local_window(frame)

Expand All @@ -330,6 +381,10 @@ def receive_data(frame)
end
end

# Check if the given stream ID is valid for remote initiation.
# This method should be overridden by client/server implementations.
# @parameter stream_id [Integer] The stream ID to validate.
# @returns [Boolean] True if the stream ID is valid for remote initiation.
def valid_remote_stream_id?(stream_id)
false
end
Expand Down Expand Up @@ -366,6 +421,10 @@ def create_stream(id = next_stream_id, &block)
end
end

# Create a push promise stream.
# This method should be overridden by client/server implementations.
# @yields {|stream| ...} Optional block to configure the created stream.
# @returns [Stream] The created push promise stream.
def create_push_promise_stream(&block)
create_stream(&block)
end
Expand Down Expand Up @@ -397,10 +456,16 @@ def receive_headers(frame)
end
end

# Receive and process a PUSH_PROMISE frame.
# @parameter frame [PushPromiseFrame] The push promise frame.
# @raises [ProtocolError] Always raises as push promises are not supported.
def receive_push_promise(frame)
raise ProtocolError, "Unable to receive push promise!"
end

# Receive and process a PRIORITY_UPDATE frame.
# @parameter frame [PriorityUpdateFrame] The priority update frame.
# @raises [ProtocolError] If the stream ID is invalid.
def receive_priority_update(frame)
if frame.stream_id != 0
raise ProtocolError, "Invalid stream id: #{frame.stream_id}"
Expand All @@ -414,14 +479,25 @@ def receive_priority_update(frame)
end
end

# Check if the given stream ID represents a client-initiated stream.
# Client streams always have odd numbered IDs.
# @parameter id [Integer] The stream ID to check.
# @returns [Boolean] True if the stream ID is client-initiated.
def client_stream_id?(id)
id.odd?
end

# Check if the given stream ID represents a server-initiated stream.
# Server streams always have even numbered IDs.
# @parameter id [Integer] The stream ID to check.
# @returns [Boolean] True if the stream ID is server-initiated.
def server_stream_id?(id)
id.even?
end

# Check if the given stream ID represents an idle stream.
# @parameter id [Integer] The stream ID to check.
# @returns [Boolean] True if the stream ID is idle (not yet used).
def idle_stream_id?(id)
if id.even?
# Server-initiated streams are even.
Expand Down Expand Up @@ -450,6 +526,9 @@ def closed_stream_id?(id)
end
end

# Receive and process a RST_STREAM frame.
# @parameter frame [ResetStreamFrame] The reset stream frame.
# @raises [ProtocolError] If the frame is invalid for connection context.
def receive_reset_stream(frame)
if frame.connection?
raise ProtocolError, "Cannot reset connection!"
Expand All @@ -475,6 +554,8 @@ def consume_window(size = self.available_size)
end
end

# Receive and process a WINDOW_UPDATE frame.
# @parameter frame [WindowUpdateFrame] The window update frame.
def receive_window_update(frame)
if frame.connection?
super
Expand All @@ -494,10 +575,15 @@ def receive_window_update(frame)
end
end

# Receive and process a CONTINUATION frame.
# @parameter frame [ContinuationFrame] The continuation frame.
# @raises [ProtocolError] Always raises as unexpected continuation frames are not supported.
def receive_continuation(frame)
raise ProtocolError, "Received unexpected continuation: #{frame.class}"
end

# Receive and process a generic frame (default handler).
# @parameter frame [Frame] The frame to receive.
def receive_frame(frame)
# ignore.
end
Expand Down
Loading
Loading