Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Bump `pylint` to `4.0.5`
([#4244](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4244))

### Fixed

- `opentelemetry-instrumentation-confluent-kafka`: Populate `server.address` and `server.port` span attributes from the producer/consumer `bootstrap.servers` config; previously `KafkaPropertiesExtractor.extract_bootstrap_servers` was defined but never called
([#4423](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4423))

### Breaking changes

- Drop Python 3.9 support
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,27 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None)
from .version import __version__


def _capture_config(args, kwargs):
"""Return the config dict that was passed to a Producer/Consumer
constructor, regardless of whether it was supplied positionally, as
``conf=`` kwarg, or (for Consumer) expanded as **kwargs."""
if args and isinstance(args[0], dict):
return args[0]
conf = kwargs.get("conf")
if isinstance(conf, dict):
return conf
# confluent_kafka.Consumer also supports Consumer(**conf) — in that case
# the kwargs themselves are the config.
if kwargs:
return dict(kwargs)
return None


class AutoInstrumentedProducer(Producer):
    # Producer subclass used by auto-instrumentation. It captures the
    # constructor config so that bootstrap.servers can later be surfaced as
    # span attributes by KafkaPropertiesExtractor.extract_bootstrap_servers.

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Config dict passed to the constructor (or None if none was found).
        self.config = _capture_config(args, kwargs)

    # This method is deliberately implemented in order to allow wrapt to wrap this function
    def produce(self, topic, value=None, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg,useless-super-delegation
        super().produce(topic, value, *args, **kwargs)
Expand All @@ -136,6 +156,7 @@ def produce(self, topic, value=None, *args, **kwargs): # pylint: disable=keywor
class AutoInstrumentedConsumer(Consumer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Config dict passed to the constructor (or None); read later by
        # KafkaPropertiesExtractor.extract_bootstrap_servers.
        self.config = _capture_config(args, kwargs)
        # Span for the message currently being handled; None when no
        # message is in flight.
        self._current_consume_span = None

# This method is deliberately implemented in order to allow wrapt to wrap this function
Expand All @@ -155,6 +176,10 @@ class ProxiedProducer(Producer):
def __init__(self, producer: Producer, tracer: Tracer):
self._producer = producer
self._tracer = tracer
# Surface the wrapped producer's config (if any) so that
# KafkaPropertiesExtractor.extract_bootstrap_servers can read it
# through this proxy.
self.config = getattr(producer, "config", None)

    def flush(self, timeout=-1):
        # Pure delegation to the wrapped (uninstrumented) producer.
        return self._producer.flush(timeout)
Expand Down Expand Up @@ -184,6 +209,8 @@ def __init__(self, consumer: Consumer, tracer: Tracer):
self._tracer = tracer
self._current_consume_span = None
self._current_context_token = None
# See ProxiedProducer.__init__ for rationale.
self.config = getattr(consumer, "config", None)

def close(self, *args, **kwargs):
return ConfluentKafkaInstrumentor.wrap_close(
Expand Down Expand Up @@ -367,11 +394,15 @@ def wrap_produce(func, instance, tracer, args, kwargs):
topic = KafkaPropertiesExtractor.extract_produce_topic(
args, kwargs
)
bootstrap_servers = (
KafkaPropertiesExtractor.extract_bootstrap_servers(instance)
)
_enrich_span(
span,
topic,
operation=MessagingOperationTypeValues.RECEIVE,
) # Replace
operation=MessagingOperationTypeValues.PUBLISH,
bootstrap_servers=bootstrap_servers,
) # Publish
propagate.inject(
headers,
setter=_kafka_setter,
Expand All @@ -385,6 +416,9 @@ def wrap_poll(func, instance, tracer, args, kwargs):

record = func(*args, **kwargs)
if record:
bootstrap_servers = (
KafkaPropertiesExtractor.extract_bootstrap_servers(instance)
)
with tracer.start_as_current_span(
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
):
Expand All @@ -395,6 +429,7 @@ def wrap_poll(func, instance, tracer, args, kwargs):
record.partition(),
record.offset(),
operation=MessagingOperationTypeValues.PROCESS,
bootstrap_servers=bootstrap_servers,
)
instance._current_context_token = context.attach(
trace.set_span_in_context(instance._current_consume_span)
Expand All @@ -409,6 +444,9 @@ def wrap_consume(func, instance, tracer, args, kwargs):

records = func(*args, **kwargs)
if len(records) > 0:
bootstrap_servers = (
KafkaPropertiesExtractor.extract_bootstrap_servers(instance)
)
with tracer.start_as_current_span(
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
):
Expand All @@ -417,6 +455,7 @@ def wrap_consume(func, instance, tracer, args, kwargs):
instance._current_consume_span,
records[0].topic(),
operation=MessagingOperationTypeValues.PROCESS,
bootstrap_servers=bootstrap_servers,
)
instance._current_context_token = context.attach(
trace.set_span_in_context(instance._current_consume_span)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
MESSAGING_SYSTEM,
MessagingOperationTypeValues,
)
from opentelemetry.semconv.attributes.server_attributes import (
SERVER_ADDRESS,
SERVER_PORT,
)
from opentelemetry.semconv.trace import (
MessagingDestinationKindValues,
SpanAttributes,
Expand All @@ -21,7 +25,17 @@
class KafkaPropertiesExtractor:
@staticmethod
def extract_bootstrap_servers(instance):
return instance.config.get("bootstrap_servers")
config = getattr(instance, "config", None)
if not isinstance(config, dict):
return None
# confluent-kafka uses the dotted key "bootstrap.servers"; also accept
# the python-style "bootstrap_servers" for robustness.
servers = config.get("bootstrap.servers") or config.get(
"bootstrap_servers"
)
if isinstance(servers, (list, tuple)):
servers = ",".join(str(s) for s in servers)
return servers

@staticmethod
def _extract_argument(key, position, default_value, args, kwargs):
Expand Down Expand Up @@ -115,12 +129,35 @@ def _get_links_from_records(records):
return links


def _set_bootstrap_servers_attributes(span, bootstrap_servers):
    """Set server.address / server.port on *span* from a bootstrap.servers
    string such as ``host1:9092,host2:9092`` (only the first broker is
    used)."""
    if not bootstrap_servers:
        return

    broker = bootstrap_servers.split(",")[0].strip()
    if not broker:
        return

    if ":" not in broker:
        # Bare hostname with no port component.
        span.set_attribute(SERVER_ADDRESS, broker)
        return

    host, _, port_text = broker.rpartition(":")
    span.set_attribute(SERVER_ADDRESS, host)
    try:
        port = int(port_text)
    except ValueError:
        # Port wasn't numeric; skip rather than emit a bad attribute.
        _LOG.debug("non-numeric port in bootstrap.servers: %r", port_text)
    else:
        span.set_attribute(SERVER_PORT, port)


def _enrich_span(
span,
topic,
partition: Optional[int] = None,
offset: Optional[int] = None,
operation: Optional[MessagingOperationTypeValues] = None,
bootstrap_servers: Optional[str] = None,
):
if not span.is_recording():
return
Expand All @@ -141,6 +178,8 @@ def _enrich_span(
else:
span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True)

_set_bootstrap_servers_attributes(span, bootstrap_servers)

# https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic
# A message within Kafka is uniquely defined by its topic name, topic partition and offset.
if partition is not None and offset is not None and topic:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
MESSAGING_OPERATION,
MESSAGING_SYSTEM,
)
from opentelemetry.semconv.attributes.server_attributes import (
SERVER_ADDRESS,
SERVER_PORT,
)
from opentelemetry.semconv.trace import (
MessagingDestinationKindValues,
SpanAttributes,
Expand Down Expand Up @@ -447,3 +451,45 @@ def test_producer_flush(self) -> None:
span_list = self.memory_exporter.get_finished_spans()
self._assert_span_count(span_list, 1)
self._assert_topic(span_list[0], "topic-1")

def test_producer_sets_bootstrap_servers_attributes(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
producer = MockedProducer(
[],
{
"bootstrap.servers": "broker-a:9092,broker-b:9093",
},
)

producer = instrumentation.instrument_producer(producer)
producer.produce(topic="topic-1", key="k", value="v")

span = self.memory_exporter.get_finished_spans()[0]
self.assertEqual(span.attributes[SERVER_ADDRESS], "broker-a")
self.assertEqual(span.attributes[SERVER_PORT], 9092)

def test_consumer_sets_bootstrap_servers_attributes(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
consumer = MockConsumer(
[MockedMessage("topic-1", 0, 0, [])],
{
"bootstrap.servers": "broker-1:9092",
"group.id": "g",
"auto.offset.reset": "earliest",
},
)

self.memory_exporter.clear()
consumer = instrumentation.instrument_consumer(consumer)
consumer.poll()
# Second (empty) poll ends the in-flight `<topic> process` span so it
# shows up in the exporter.
consumer.poll()

process_span = next(
s
for s in self.memory_exporter.get_finished_spans()
if s.name == "topic-1 process"
)
self.assertEqual(process_span.attributes[SERVER_ADDRESS], "broker-1")
self.assertEqual(process_span.attributes[SERVER_PORT], 9092)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
class MockConsumer(Consumer):
    def __init__(self, queue, config):
        # Pre-seeded messages handed back by the mock's consume/poll calls.
        self._queue = queue
        # Mirror the instrumented clients' ``config`` attribute so
        # KafkaPropertiesExtractor.extract_bootstrap_servers can read it
        # from this mock.
        self.config = config
        super().__init__(config)

def consume(self, num_messages=1, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg
Expand Down Expand Up @@ -58,6 +59,7 @@ def value(self):
class MockedProducer(Producer):
    def __init__(self, queue, config):
        # Backing store for this mock — presumably collects produced
        # messages via produce(); confirm against the produce() override.
        self._queue = queue
        # Mirror the instrumented clients' ``config`` attribute so
        # KafkaPropertiesExtractor.extract_bootstrap_servers can read it
        # from this mock.
        self.config = config
        super().__init__(config)

def produce(self, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg
Expand Down
Loading