diff --git a/faktory/_proto.py b/faktory/_proto.py index 85d5050..786d32f 100644 --- a/faktory/_proto.py +++ b/faktory/_proto.py @@ -243,7 +243,7 @@ def is_supported_server_version(self, v: int) -> bool: def get_message(self) -> Iterator[str]: socket = self.socket - buffer = self.select_data(self.buffer_size) + buffer = self.select_data() while self.is_connected or self.is_connecting: buffering = True while buffering: @@ -271,32 +271,31 @@ def get_message(self) -> Iterator[str]: self.log.debug("> {}".format("nil")) yield "" else: - if len(buffer) >= number_of_bytes: - # we've already got enough bytes in the buffer - data = buffer[:number_of_bytes] - buffer = buffer[number_of_bytes:] - else: - data = buffer - while len(data) != number_of_bytes: - bytes_required = number_of_bytes - len(data) - data += self.select_data(bytes_required) - buffer = [] + while len(buffer) < number_of_bytes: + buffer += self.select_data() + data = buffer[:number_of_bytes] + buffer = buffer[number_of_bytes:] resp = data.decode().strip("\r\n ") if self.debug: self.log.debug("> {}".format(resp)) yield resp else: - more = self.select_data(self.buffer_size) + more = self.select_data() if not more: buffering = False else: buffer += more - def select_data(self, buffer_size: int): + def select_data(self): s = self.socket ready = select.select([s], [], [], self.timeout) if ready[0]: - buffer = s.recv(buffer_size) + buffer = s.recv(self.buffer_size) + if self.use_tls: + unread = s.pending() + while unread: + buffer += s.recv(unread) + unread = s.pending() if len(buffer) > 0: return buffer self.disconnect()