3939STDERR_CHANNEL = 2
4040ERROR_CHANNEL = 3
4141RESIZE_CHANNEL = 4
42+ CLOSE_CHANNEL = 255
43+
44+ V4_CHANNEL_PROTOCOL = "v4.channel.k8s.io"
45+ V5_CHANNEL_PROTOCOL = "v5.channel.k8s.io"
4246
4347class _IgnoredIO :
4448 def write (self , _x ):
@@ -59,26 +63,40 @@ def __init__(self, configuration, url, headers, capture_all, binary=False):
5963 """
6064 self ._connected = False
6165 self ._channels = {}
66+ self ._closed_channels = set ()
67+ self .subprotocol = None
6268 self .binary = binary
6369 self .newline = '\n ' if not self .binary else b'\n '
6470 if capture_all :
6571 self ._all = StringIO () if not self .binary else BytesIO ()
6672 else :
6773 self ._all = _IgnoredIO ()
6874 self .sock = create_websocket (configuration , url , headers )
75+ self .subprotocol = getattr (self .sock , 'subprotocol' , None )
76+ if not self .subprotocol and self .sock :
77+ headers_dict = self .sock .getheaders ()
78+ if headers_dict :
79+ for k , v in headers_dict .items ():
80+ if k .lower () == 'sec-websocket-protocol' :
81+ self .subprotocol = v
82+ break
6983 self ._connected = True
7084 self ._returncode = None
7185
7286 def peek_channel (self , channel , timeout = 0 ):
7387 """Peek a channel and return part of the input,
7488 empty string otherwise."""
89+ if channel in self ._closed_channels and channel not in self ._channels :
90+ return b"" if self .binary else ""
7591 self .update (timeout = timeout )
7692 if channel in self ._channels :
7793 return self ._channels [channel ]
78- return ""
94+ return b"" if self . binary else ""
7995
8096 def read_channel (self , channel , timeout = 0 ):
8197 """Read data from a channel."""
98+ if channel in self ._closed_channels and channel not in self ._channels :
99+ return b"" if self .binary else ""
82100 if channel not in self ._channels :
83101 ret = self .peek_channel (channel , timeout )
84102 else :
@@ -93,6 +111,7 @@ def readline_channel(self, channel, timeout=None):
93111 timeout = float ("inf" )
94112 start = time .time ()
95113 while self .is_open () and time .time () - start < timeout :
114+ # Always try to drain the channel first
96115 if channel in self ._channels :
97116 data = self ._channels [channel ]
98117 if self .newline in data :
@@ -104,6 +123,14 @@ def readline_channel(self, channel, timeout=None):
104123 else :
105124 del self ._channels [channel ]
106125 return ret
126+
127+ if channel in self ._closed_channels :
128+ if channel in self ._channels :
129+ ret = self ._channels [channel ]
130+ del self ._channels [channel ]
131+ return ret
132+ return b"" if self .binary else ""
133+
107134 self .update (timeout = (timeout - time .time () + start ))
108135
109136 def write_channel (self , channel , data ):
@@ -119,6 +146,14 @@ def write_channel(self, channel, data):
119146 payload = channel_prefix + data
120147 self .sock .send (payload , opcode = opcode )
121148
149+ def close_channel (self , channel ):
150+ """Close a channel (v5 protocol only)."""
151+ if self .subprotocol != V5_CHANNEL_PROTOCOL :
152+ return
153+ data = bytes ([CLOSE_CHANNEL , channel ])
154+ self .sock .send (data , opcode = ABNF .OPCODE_BINARY )
155+ self ._closed_channels .add (channel )
156+
122157 def peek_stdout (self , timeout = 0 ):
123158 """Same as peek_channel with channel=1."""
124159 return self .peek_channel (STDOUT_CHANNEL , timeout = timeout )
@@ -200,13 +235,24 @@ def update(self, timeout=0):
200235 return
201236 elif op_code == ABNF .OPCODE_BINARY or op_code == ABNF .OPCODE_TEXT :
202237 data = frame .data
203- if six .PY3 and not self .binary :
204- data = data .decode ("utf-8" , "replace" )
205- if len (data ) > 1 :
238+ if len (data ) > 0 :
239+ # Parse channel from raw bytes to support v5 CLOSE signal AND avoid charset issues
206240 channel = data [0 ]
207- if six .PY3 and not self .binary :
208- channel = ord (channel )
241+ # In Py3, iterating bytes gives int, but indexing bytes gives int.
242+ # websocket-client frame.data might be bytes.
243+
244+ if channel == CLOSE_CHANNEL and self .subprotocol == V5_CHANNEL_PROTOCOL : # v5 CLOSE
245+ if len (data ) > 1 :
246+ # data[1] is already int in Py3 bytes
247+ close_chan = data [1 ]
248+ self ._closed_channels .add (close_chan )
249+ return
250+
209251 data = data [1 :]
252+ # Decode data if expected text
253+ if not self .binary :
254+ data = data .decode ("utf-8" , "replace" )
255+
210256 if data :
211257 if channel in [STDOUT_CHANNEL , STDERR_CHANNEL ]:
212258 # keeping all messages in the order they received
@@ -476,7 +522,7 @@ def create_websocket(configuration, url, headers=None):
476522 header .append ("sec-websocket-protocol: %s" %
477523 headers ['sec-websocket-protocol' ])
478524 else :
479- header .append ("sec-websocket-protocol: v4.channel.k8s.io" )
525+ header .append ("sec-websocket-protocol: %s,%s" % ( V5_CHANNEL_PROTOCOL , V4_CHANNEL_PROTOCOL ) )
480526
481527 if url .startswith ('wss://' ) and configuration .verify_ssl :
482528 ssl_opts = {
0 commit comments