Skip to content

Commit 6aabcee

Browse files
committed
Fix LibevConnection close() race causing EBADF errors (#614)
LibevConnection.close() closes the socket immediately while watchers are stopped asynchronously in the next event loop iteration via _loop_will_run(). This creates a race window where handle_read() or handle_write() can operate on a closed socket fd, producing EBADF errors that surface as ConnectionShutdown. - Add is_closed/is_defunct guards in handle_read() and handle_write() error paths to silently exit during shutdown instead of calling defunct() with EBADF - Set last_error in close() when connected_event is not yet set to prevent factory() from returning a dead connection - Set last_error on server-initiated close (EOF) in handle_read() before calling close()
1 parent f348637 commit 6aabcee

1 file changed

Lines changed: 12 additions & 1 deletion

File tree

cassandra/io/libevreactor.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,10 @@ def close(self):
300300
msg = "Connection to %s was closed" % self.endpoint
301301
if self.last_error:
302302
msg += ": %s" % (self.last_error,)
303-
self.error_all_requests(ConnectionShutdown(msg))
303+
shutdown_exc = ConnectionShutdown(msg)
304+
self.error_all_requests(shutdown_exc)
305+
if not self.connected_event.is_set():
306+
self.last_error = shutdown_exc
304307
self.connected_event.set()
305308

306309
def handle_write(self, watcher, revents, errno=None):
@@ -332,6 +335,8 @@ def handle_write(self, watcher, revents, errno=None):
332335
with self._deque_lock:
333336
self.deque.appendleft(next_msg)
334337
else:
338+
if self.is_closed or self.is_defunct:
339+
return
335340
self.defunct(err)
336341
return
337342
else:
@@ -365,19 +370,25 @@ def handle_read(self, watcher, revents, errno=None):
365370
if not self._iobuf.tell():
366371
return
367372
else:
373+
if self.is_closed or self.is_defunct:
374+
return
368375
self.defunct(err)
369376
return
370377
elif err.args[0] in NONBLOCKING:
371378
if not self._iobuf.tell():
372379
return
373380
else:
381+
if self.is_closed or self.is_defunct:
382+
return
374383
self.defunct(err)
375384
return
376385

377386
if self._iobuf.tell():
378387
self.process_io_buffer()
379388
else:
380389
log.debug("Connection %s closed by server", self)
390+
self.last_error = ConnectionShutdown(
391+
"Connection to %s was closed by server" % self.endpoint)
381392
self.close()
382393

383394
def push(self, data):

0 commit comments

Comments
 (0)