From 647b693e2b60ca78780dc00d176a4ba1e9529614 Mon Sep 17 00:00:00 2001
From: "Tobias.Mikula"
Date: Tue, 4 Nov 2025 08:13:33 +0100
Subject: [PATCH 1/3] Flush error handling for Kafka writer.

---
 DEVELOPER.md               |  2 +-
 requirements.txt           |  2 +-
 src/writer_kafka.py        | 74 +++++++++++++++++++++++-----------
 tests/test_writer_kafka.py | 81 +++++++++++++++++++++++++++++++++++++-
 4 files changed, 132 insertions(+), 27 deletions(-)

diff --git a/DEVELOPER.md b/DEVELOPER.md
index aa55859..22350a5 100644
--- a/DEVELOPER.md
+++ b/DEVELOPER.md
@@ -28,7 +28,7 @@ cd EventGate
 ```shell
 python3 -m venv .venv
 source .venv/bin/activate
-pip install -r requirements.txt
+pip3 install -r requirements.txt
 ```
 
 ## Run Pylint Tool Locally

diff --git a/requirements.txt b/requirements.txt
index ac326f0..01e824e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,6 +11,6 @@ jsonschema==4.25.1
 PyJWT==2.10.1
 requests==2.32.5
 boto3==1.40.25
-confluent-kafka==2.11.1
+confluent-kafka==2.12.1
 # psycopg2-binary==2.9.10  # Ideal for local development, but not for long-term production use
 psycopg2==2.9.10

diff --git a/src/writer_kafka.py b/src/writer_kafka.py
index 692aeaa..cfa2857 100644
--- a/src/writer_kafka.py
+++ b/src/writer_kafka.py
@@ -22,8 +22,8 @@
 import json
 import logging
 import os
+import time
 from typing import Any, Dict, Optional, Tuple
-
 from confluent_kafka import Producer
 
 try:  # KafkaException may not exist in stubbed test module
@@ -35,8 +35,10 @@ class KafkaException(Exception):  # type: ignore
 
 STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "producer": None}
 
-# Configurable flush timeout (seconds) to avoid hanging indefinitely
-_KAFKA_FLUSH_TIMEOUT_SEC = float(os.environ.get("KAFKA_FLUSH_TIMEOUT", "5"))
+# Configurable flush timeouts and retries via env variables to avoid hanging indefinitely
+_KAFKA_FLUSH_TIMEOUT_SEC = float(os.environ.get("KAFKA_FLUSH_TIMEOUT", "7"))
+_MAX_RETRIES = int(os.environ.get("KAFKA_FLUSH_RETRIES", "3"))
+_RETRY_BACKOFF_SEC = float(os.environ.get("KAFKA_RETRY_BACKOFF", "0.5"))
 
 
 def init(logger: logging.Logger, config: Dict[str, Any]) -> None:
@@ -86,7 +88,6 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]
     """
     logger = STATE["logger"]
     producer: Optional[Producer] = STATE.get("producer")  # type: ignore[assignment]
-
     if producer is None:
         logger.debug("Kafka producer not initialized - skipping")
         return True, None
@@ -100,23 +101,48 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]
             value=json.dumps(message).encode("utf-8"),
             callback=lambda err, msg: (errors.append(str(err)) if err is not None else None),
         )
-        try:
-            remaining = producer.flush(_KAFKA_FLUSH_TIMEOUT_SEC)  # type: ignore[arg-type]
-        except TypeError:  # Fallback for stub producers without timeout parameter
-            remaining = producer.flush()  # type: ignore[call-arg]
-        # remaining can be number of undelivered messages (confluent_kafka returns int)
-        if not errors and isinstance(remaining, int) and remaining > 0:
-            timeout_msg = f"Kafka flush timeout after {_KAFKA_FLUSH_TIMEOUT_SEC}s: {remaining} message(s) still pending"
-            logger.error(timeout_msg)
-            return False, timeout_msg
-    except KafkaException as e:  # narrow exception capture
-        err_msg = f"The Kafka writer failed with unknown error: {str(e)}"
-        logger.exception(err_msg)
-        return False, err_msg
-
-    if errors:
-        msg = "; ".join(errors)
-        logger.error(msg)
-        return False, msg
-
-    return True, None
+
+        remaining: Optional[int] = None
+        for attempt in range(1, _MAX_RETRIES + 1):
+            remaining = flush_with_timeout(producer, _KAFKA_FLUSH_TIMEOUT_SEC)
+            # Treat None (flush returns None in some stubs) as success equivalent to 0 pending
+            if (remaining is None or remaining == 0) and not errors:
+                break
+            if attempt < _MAX_RETRIES:
+                logger.warning(
+                    "Kafka flush pending (%s message(s) remain) attempt %d/%d", remaining, attempt, _MAX_RETRIES
+                )
+                time.sleep(_RETRY_BACKOFF_SEC)
+
+        if errors:
+            err_msg_summary = "; ".join(errors)
+            logger.error(err_msg_summary)
+            return False, err_msg_summary
+
+        # Log a warning if there are still pending messages after retries
+        if isinstance(remaining, int) and remaining > 0:
+            logger.warning(
+                "Kafka flush timeout after %ss: %d message(s) still pending", _KAFKA_FLUSH_TIMEOUT_SEC, remaining
+            )
+
+        return True, None
+
+    except KafkaException as e:
+        err_text = f"The Kafka writer failed with a Kafka exception error: {e}"
+        logger.exception(err_text)
+        return False, err_text
+
+
+def flush_with_timeout(producer, timeout: float) -> int:
+    """Flush the Kafka producer with a timeout, handling TypeError for stubs.
+
+    Args:
+        producer: Kafka Producer instance.
+        timeout: Timeout in seconds.
+    Returns:
+        Number of messages still pending after flush.
+    """
+    try:
+        return producer.flush(timeout)
+    except TypeError:  # Fallback for stub producers without timeout parameter
+        return producer.flush()

diff --git a/tests/test_writer_kafka.py b/tests/test_writer_kafka.py
index 6017b3e..bb880bb 100644
--- a/tests/test_writer_kafka.py
+++ b/tests/test_writer_kafka.py
@@ -1,4 +1,3 @@
-import json
 import logging
 from types import SimpleNamespace
 import src.writer_kafka as wk
@@ -23,6 +22,44 @@ def produce(self, topic, key, value, callback):  # noqa: D401
         callback("ERR", None)
 
 
+class FakeProducerFlushSequence(FakeProducerSuccess):
+    def __init__(self, sequence):  # sequence of remaining counts per flush call
+        super().__init__()
+        self.sequence = sequence
+        self.flush_calls = 0
+
+    def flush(self, *a, **kw):
+        # Simulate decreasing remaining messages
+        if self.flush_calls < len(self.sequence):
+            val = self.sequence[self.flush_calls]
+        else:
+            val = self.sequence[-1]
+        self.flush_calls += 1
+        return val
+
+
+class FakeProducerTimeout(FakeProducerSuccess):
+    def __init__(self, remaining_value):
+        super().__init__()
+        self.remaining_value = remaining_value
+        self.flush_calls = 0
+
+    def flush(self, *a, **kw):  # always returns the same remaining > 0 to force the timeout warning
+        self.flush_calls += 1
+        return self.remaining_value
+
+
+class FakeProducerTypeError(FakeProducerSuccess):
+    def __init__(self):
+        super().__init__()
+        self.flush_calls = 0
+
+    # Intentionally omits the timeout parameter so the first call inside flush_with_timeout raises TypeError
+    def flush(self):  # noqa: D401
+        self.flush_calls += 1
+        return 0
+
+
 def test_write_skips_when_producer_none(monkeypatch):
     wk.STATE["logger"] = logging.getLogger("test")
     wk.STATE["producer"] = None
@@ -60,3 +97,45 @@ def produce(self, *a, **kw):  # noqa: D401
     wk.STATE["producer"] = RaisingProducer()
     ok, err = wk.write("topic", {"d": 4})
     assert not ok and "boom" in err
+
+
+def test_write_flush_retries_until_success(monkeypatch, caplog):
+    wk.STATE["logger"] = logging.getLogger("test")
+    caplog.set_level(logging.WARNING)
+    # Raise max retries so the five-step flush sequence can complete deterministically
+    monkeypatch.setattr(wk, "_MAX_RETRIES", 5, raising=False)
+    producer = FakeProducerFlushSequence([5, 4, 3, 1, 0])
+    wk.STATE["producer"] = producer
+    ok, err = wk.write("topic", {"e": 5})
+    assert ok and err is None
+    # The loop should break as soon as a flush call returns remaining == 0
+    assert producer.flush_calls == 5  # sequence consumed until 0
+    # Warnings are logged only for the attempts before the successful one; the final attempt does not warn
+    warn_messages = [r.message for r in caplog.records if r.levelno == logging.WARNING]
+    assert any("attempt 1" in m or "attempt 2" in m for m in warn_messages)
+
+
+def test_write_timeout_warning_when_remaining_after_retries(monkeypatch, caplog):
+    wk.STATE["logger"] = logging.getLogger("test")
+    caplog.set_level(logging.WARNING)
+    monkeypatch.setattr(wk, "_MAX_RETRIES", 3, raising=False)
+    producer = FakeProducerTimeout(2)
+    wk.STATE["producer"] = producer
+    ok, err = wk.write("topic", {"f": 6})
+    timeout_warnings = [
+        r.message for r in caplog.records if "timeout" in r.message
+    ]  # the final warning should mention the timeout
+    assert ok and err is None  # write() still returns success even when the timeout warning fires
+    assert timeout_warnings, "Expected timeout warning logged"
+    assert producer.flush_calls == 3  # all 3 flush attempts were used
+
+
+def test_flush_with_timeout_typeerror_fallback(monkeypatch):
+    wk.STATE["logger"] = logging.getLogger("test")
+    monkeypatch.setattr(wk, "_MAX_RETRIES", 4, raising=False)
+    producer = FakeProducerTypeError()
+    wk.STATE["producer"] = producer
+    ok, err = wk.write("topic", {"g": 7})
+    assert ok and err is None
+    # Since flush returns 0 immediately, only one flush call should be needed
+    assert producer.flush_calls == 1

From 9e54970fbec0172a86e55b27a79f961040e40530 Mon Sep 17 00:00:00 2001
From: "Tobias.Mikula"
Date: Tue, 4 Nov 2025 08:42:10 +0100
Subject: [PATCH 2/3] Method documentation improvement.

---
 src/writer_kafka.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/writer_kafka.py b/src/writer_kafka.py
index cfa2857..8a0bf4a 100644
--- a/src/writer_kafka.py
+++ b/src/writer_kafka.py
@@ -133,14 +133,15 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]
         return False, err_text
 
 
-def flush_with_timeout(producer, timeout: float) -> int:
+def flush_with_timeout(producer, timeout: float) -> Optional[int]:
     """Flush the Kafka producer with a timeout, handling TypeError for stubs.
 
     Args:
         producer: Kafka Producer instance.
        timeout: Timeout in seconds.
     Returns:
-        Number of messages still pending after flush.
+        Number of messages still pending after the flush call (0 means all messages were delivered).
+        None is returned only if the underlying (stub/mock) producer.flush() does not provide a count.
     """
     try:
         return producer.flush(timeout)

From fd3b1ba7b3a1f138b6d50ec48b1d272c4c504076 Mon Sep 17 00:00:00 2001
From: "Tobias.Mikula"
Date: Tue, 4 Nov 2025 16:08:53 +0100
Subject: [PATCH 3/3] Address review comments.

---
 src/writer_kafka.py | 59 ++++++++++++++++++++++++---------------
 1 file changed, 32 insertions(+), 27 deletions(-)

diff --git a/src/writer_kafka.py b/src/writer_kafka.py
index 8a0bf4a..6151776 100644
--- a/src/writer_kafka.py
+++ b/src/writer_kafka.py
@@ -92,7 +92,10 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]
         logger.debug("Kafka producer not initialized - skipping")
         return True, None
 
-    errors: list[Any] = []
+    errors: list[str] = []
+    last_exc: Optional[KafkaException] = None
+
+    # Produce step
     try:
         logger.debug("Sending to kafka %s", topic_name)
         producer.produce(
@@ -101,36 +104,38 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]
             value=json.dumps(message).encode("utf-8"),
             callback=lambda err, msg: (errors.append(str(err)) if err is not None else None),
         )
+    except KafkaException as e:
+        errors.append(f"Produce exception: {e}")
+        last_exc = e
 
-        remaining: Optional[int] = None
-        for attempt in range(1, _MAX_RETRIES + 1):
+    # Flush step (always attempted)
+    remaining: Optional[int] = None
+    for attempt in range(1, _MAX_RETRIES + 1):
+        try:
             remaining = flush_with_timeout(producer, _KAFKA_FLUSH_TIMEOUT_SEC)
-            # Treat None (flush returns None in some stubs) as success equivalent to 0 pending
-            if (remaining is None or remaining == 0) and not errors:
-                break
-            if attempt < _MAX_RETRIES:
-                logger.warning(
-                    "Kafka flush pending (%s message(s) remain) attempt %d/%d", remaining, attempt, _MAX_RETRIES
-                )
-                time.sleep(_RETRY_BACKOFF_SEC)
-
-        if errors:
-            err_msg_summary = "; ".join(errors)
-            logger.error(err_msg_summary)
-            return False, err_msg_summary
-
-        # Log a warning if there are still pending messages after retries
-        if isinstance(remaining, int) and remaining > 0:
-            logger.warning(
-                "Kafka flush timeout after %ss: %d message(s) still pending", _KAFKA_FLUSH_TIMEOUT_SEC, remaining
-            )
+        except KafkaException as e:
+            errors.append(f"Flush exception: {e}")
+            last_exc = e
+
+        # Treat None (flush returns None in some stubs) as success equivalent to 0 pending
+        if (remaining is None or remaining == 0) and not errors:
+            break
+        if attempt < _MAX_RETRIES:
+            logger.warning("Kafka flush pending (%s message(s) remain) attempt %d/%d", remaining, attempt, _MAX_RETRIES)
+            time.sleep(_RETRY_BACKOFF_SEC)
+
+    # Warn if messages still pending after retries
+    if isinstance(remaining, int) and remaining > 0:
+        logger.warning(
+            "Kafka flush timeout after %ss: %d message(s) still pending", _KAFKA_FLUSH_TIMEOUT_SEC, remaining
+        )
 
-        return True, None
+    if errors:
+        failure_text = "Kafka writer failed: " + "; ".join(errors)
+        logger.error(failure_text, exc_info=last_exc)  # attach the traceback only when an exception was captured
+        return False, failure_text
 
-    except KafkaException as e:
-        err_text = f"The Kafka writer failed with a Kafka exception error: {e}"
-        logger.exception(err_text)
-        return False, err_text
+    return True, None
 
 
 def flush_with_timeout(producer, timeout: float) -> Optional[int]:
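
For reference, a minimal usage sketch of the writer as configured by these patches. The flush timeout, retry count, and backoff are read from the environment once at import time, so they must be set before src.writer_kafka is imported. The config keys init() expects are not shown in this series, so the bootstrap.servers entry below is an assumption in the usual confluent-kafka style, and the logger/topic names are illustrative.

    import logging
    import os

    # KAFKA_FLUSH_TIMEOUT, KAFKA_FLUSH_RETRIES and KAFKA_RETRY_BACKOFF are read
    # once at import time (into _KAFKA_FLUSH_TIMEOUT_SEC, _MAX_RETRIES and
    # _RETRY_BACKOFF_SEC), so export them before importing the module.
    os.environ["KAFKA_FLUSH_TIMEOUT"] = "10"   # seconds per flush() call
    os.environ["KAFKA_FLUSH_RETRIES"] = "5"    # flush attempts before giving up
    os.environ["KAFKA_RETRY_BACKOFF"] = "1.0"  # seconds to sleep between attempts

    import src.writer_kafka as wk

    logger = logging.getLogger("eventgate")  # logger name is illustrative
    wk.init(logger, {"bootstrap.servers": "localhost:9092"})  # config key assumed

    ok, err = wk.write("events", {"id": 1, "payload": "hello"})
    if not ok:
        logger.error("Kafka write failed: %s", err)  # err aggregates produce/flush errors

With these values, a fully stuck flush costs at most 5 attempts of 10 s each plus 4 backoff sleeps of 1.0 s (about 54 s) before write() returns, logging the timeout warning on the way out.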