Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ classifiers = [
"Operating System :: OS Independent",
]
dependencies = [
'trace-attributes==7.2.0',
'trace-attributes==7.2.1',
'opentelemetry-api>=1.25.0',
'opentelemetry-sdk>=1.25.0',
'opentelemetry-instrumentation>=0.47b0',
Expand Down
8 changes: 8 additions & 0 deletions src/examples/neo4j_example/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from .basic import execute_query
from langtrace_python_sdk import with_langtrace_root_span


class Neo4jRunner:
@with_langtrace_root_span("Neo4jRunner")
def run(self):
execute_query()
26 changes: 26 additions & 0 deletions src/examples/neo4j_example/basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import os
from langtrace_python_sdk import langtrace
from neo4j import GraphDatabase

langtrace.init()

def execute_query():
driver = GraphDatabase.driver(
os.getenv("NEO4J_URI"),
auth=(os.getenv("NEO4J_USERNAME"), os.getenv("NEO4J_PASSWORD"))
)

records, summary, keys = driver.execute_query(
"MATCH (p:Person {age: $age}) RETURN p.name AS name",
age=42,
database_=os.getenv("NEO4J_DATABASE"),
)

# Loop through results and do something with them
for person in records:
print(person)
# Summary information
print("The query `{query}` returned {records_count} records in {time} ms.".format(
query=summary.query, records_count=len(records),
time=summary.result_available_after,
))
9 changes: 9 additions & 0 deletions src/examples/neo4j_graphrag_example/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import asyncio
from .basic import search
from langtrace_python_sdk import with_langtrace_root_span


class Neo4jGraphRagRunner:
@with_langtrace_root_span("Neo4jGraphRagRunner")
def run(self):
asyncio.run(search())
52 changes: 52 additions & 0 deletions src/examples/neo4j_graphrag_example/basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import os
from langtrace_python_sdk import langtrace
from langtrace_python_sdk.utils.with_root_span import with_langtrace_root_span
from neo4j import GraphDatabase
from neo4j_graphrag.generation import GraphRAG
from neo4j_graphrag.indexes import create_vector_index
from neo4j_graphrag.llm import OpenAILLM as LLM
from neo4j_graphrag.embeddings import OpenAIEmbeddings as Embeddings
from neo4j_graphrag.retrievers import VectorRetriever
from neo4j_graphrag.experimental.pipeline.kg_builder import SimpleKGPipeline

langtrace.init()

neo4j_driver = GraphDatabase.driver(os.getenv("NEO4J_URI"), auth=(os.getenv("NEO4J_USERNAME"), os.getenv("NEO4J_PASSWORD")))

ex_llm=LLM(
model_name="gpt-4o-mini",
model_params={
"response_format": {"type": "json_object"},
"temperature": 0
})

embedder = Embeddings()

@with_langtrace_root_span("run_neo_rag")
async def search():
# 1. Build KG and Store in Neo4j Database
kg_builder_pdf = SimpleKGPipeline(
llm=ex_llm,
driver=neo4j_driver,
embedder=embedder,
from_pdf=True
)
await kg_builder_pdf.run_async(file_path='src/examples/neo4j_graphrag_example/data/abramov.pdf')

create_vector_index(neo4j_driver, name="text_embeddings", label="Chunk",
embedding_property="embedding", dimensions=1536, similarity_fn="cosine")

# 2. KG Retriever
vector_retriever = VectorRetriever(
neo4j_driver,
index_name="text_embeddings",
embedder=embedder
)

# 3. GraphRAG Class
llm = LLM(model_name="gpt-4o")
rag = GraphRAG(llm=llm, retriever=vector_retriever)

# 4. Run
response = rag.search("What did the author do in college?")
print(response.answer)
Binary file not shown.
36 changes: 36 additions & 0 deletions src/langtrace_python_sdk/constants/instrumentation/neo4j.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from langtrace.trace_attributes import Neo4jMethods

APIS = {
"RUN": {
"METHOD": Neo4jMethods.RUN.value,
"OPERATION": "run",
},
"BEGIN_TRANSACTION": {
"METHOD": Neo4jMethods.BEGIN_TRANSACTION.value,
"OPERATION": "begin_transaction",
},
"READ_TRANSACTION": {
"METHOD": Neo4jMethods.READ_TRANSACTION.value,
"OPERATION": "read_transaction",
},
"WRITE_TRANSACTION": {
"METHOD": Neo4jMethods.WRITE_TRANSACTION.value,
"OPERATION": "write_transaction",
},
"EXECUTE_READ": {
"METHOD": Neo4jMethods.EXECUTE_READ.value,
"OPERATION": "execute_read",
},
"EXECUTE_WRITE": {
"METHOD": Neo4jMethods.EXECUTE_WRITE.value,
"OPERATION": "execute_write",
},
"EXECUTE_QUERY": {
"METHOD": Neo4jMethods.EXECUTE_QUERY.value,
"OPERATION": "execute_query",
},
"TX_RUN": {
"METHOD": Neo4jMethods.TX_RUN.value,
"OPERATION": "tx_run",
},
}
1 change: 1 addition & 0 deletions src/langtrace_python_sdk/instrumentation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from .llamaindex import LlamaindexInstrumentation
from .milvus import MilvusInstrumentation
from .mistral import MistralInstrumentation
from .neo4j import Neo4jInstrumentation
from .neo4j_graphrag import Neo4jGraphRAGInstrumentation
from .ollama import OllamaInstrumentor
from .openai import OpenAIInstrumentation
Expand Down
3 changes: 3 additions & 0 deletions src/langtrace_python_sdk/instrumentation/neo4j/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .instrumentation import Neo4jInstrumentation

__all__ = ["Neo4jInstrumentation"]
51 changes: 51 additions & 0 deletions src/langtrace_python_sdk/instrumentation/neo4j/instrumentation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
Copyright (c) 2025 Scale3 Labs

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import importlib.metadata
import logging
from typing import Collection

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.trace import get_tracer
from wrapt import wrap_function_wrapper

from langtrace_python_sdk.constants.instrumentation.neo4j import APIS
from langtrace_python_sdk.instrumentation.neo4j.patch import driver_patch

logging.basicConfig(level=logging.FATAL)


class Neo4jInstrumentation(BaseInstrumentor):
"""
The Neo4jInstrumentation class represents the Neo4j graph database instrumentation
"""

def instrumentation_dependencies(self) -> Collection[str]:
return ["neo4j >= 5.25.0"]

def _instrument(self, **kwargs):
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, "", tracer_provider)
version = importlib.metadata.version("neo4j")

wrap_function_wrapper(
"neo4j._sync.driver",
"Driver.execute_query",
driver_patch("EXECUTE_QUERY", version, tracer),
)

def _uninstrument(self, **kwargs):
pass
180 changes: 180 additions & 0 deletions src/langtrace_python_sdk/instrumentation/neo4j/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
"""
Copyright (c) 2025 Scale3 Labs

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import json

from langtrace_python_sdk.utils.llm import get_span_name
from langtrace_python_sdk.utils.silently_fail import silently_fail
from langtrace.trace_attributes import DatabaseSpanAttributes
from langtrace_python_sdk.utils import set_span_attribute
from opentelemetry import baggage, trace
from opentelemetry.trace import SpanKind
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.trace.propagation import set_span_in_context

from langtrace_python_sdk.constants.instrumentation.common import (
LANGTRACE_ADDITIONAL_SPAN_ATTRIBUTES_KEY,
SERVICE_PROVIDERS,
)
from langtrace_python_sdk.constants.instrumentation.neo4j import APIS
from importlib.metadata import version as v

from langtrace_python_sdk.constants import LANGTRACE_SDK_NAME


def driver_patch(operation_name, version, tracer):
def traced_method(wrapped, instance, args, kwargs):

api = APIS[operation_name]
service_provider = SERVICE_PROVIDERS.get("NEO4J", "neo4j")
extra_attributes = baggage.get_baggage(LANGTRACE_ADDITIONAL_SPAN_ATTRIBUTES_KEY)
span_attributes = {
"langtrace.sdk.name": "langtrace-python-sdk",
"langtrace.service.name": service_provider,
"langtrace.service.type": "vectordb",
"langtrace.service.version": version,
"langtrace.version": v(LANGTRACE_SDK_NAME),
"db.system": "neo4j",
"db.operation": api["OPERATION"],
"db.query": json.dumps(args[0]) if args and len(args) > 0 else "",
**(extra_attributes if extra_attributes is not None else {}),
}

attributes = DatabaseSpanAttributes(**span_attributes)

with tracer.start_as_current_span(
name=get_span_name(api["METHOD"]),
kind=SpanKind.CLIENT,
context=set_span_in_context(trace.get_current_span()),
) as span:
for field, value in attributes.model_dump(by_alias=True).items():
if value is not None:
span.set_attribute(field, value)

if operation_name == "EXECUTE_QUERY":
_set_execute_query_attributes(span, args, kwargs)

try:
result = wrapped(*args, **kwargs)

if isinstance(result, tuple) and len(result) == 3:
records, result_summary, keys = result
_set_result_attributes(span, records, result_summary, keys)
else:
res = json.dumps(result)
set_span_attribute(span, "neo4j.result.query_response", res)

span.set_status(StatusCode.OK)
return result
except Exception as err:
span.record_exception(err)
span.set_status(Status(StatusCode.ERROR, str(err)))
raise

return traced_method


@silently_fail
def _set_execute_query_attributes(span, args, kwargs):
query = args[0] if args else kwargs.get("query_", None)
if query:
if hasattr(query, "text"):
set_span_attribute(span, "db.query", query.text)
if hasattr(query, "metadata") and query.metadata:
set_span_attribute(span, "db.query.metadata", json.dumps(query.metadata))
if hasattr(query, "timeout") and query.timeout:
set_span_attribute(span, "db.query.timeout", query.timeout)
else:
set_span_attribute(span, "db.query", query)

parameters = kwargs.get("parameters_", None)
if parameters:
try:
set_span_attribute(span, "db.statement.parameters", json.dumps(parameters))
except (TypeError, ValueError):
pass

database = kwargs.get("database_", None)
if database:
set_span_attribute(span, "neo4j.db.name", database)

routing = kwargs.get("routing_", None)
if routing:
set_span_attribute(span, "neo4j.db.routing", str(routing))


@silently_fail
def _set_result_attributes(span, records, result_summary, keys):
"""
Set attributes related to the query result and summary
"""
if records is not None:
record_count = len(records)
set_span_attribute(span, "neo4j.result.record_count", record_count)
if record_count > 0:
set_span_attribute(span, "neo4j.result.records", json.dumps(records))

if keys is not None:
set_span_attribute(span, "neo4j.result.keys", json.dumps(keys))

if result_summary:
if hasattr(result_summary, "database") and result_summary.database:
set_span_attribute(span, "neo4j.db.name", result_summary.database)

if hasattr(result_summary, "query_type") and result_summary.query_type:
set_span_attribute(span, "neo4j.result.query_type", result_summary.query_type)

if hasattr(result_summary, "parameters") and result_summary.parameters:
try:
set_span_attribute(span, "neo4j.result.parameters", json.dumps(result_summary.parameters))
except (TypeError, ValueError):
pass

if hasattr(result_summary, "result_available_after") and result_summary.result_available_after is not None:
set_span_attribute(span, "neo4j.result.available_after_ms", result_summary.result_available_after)

if hasattr(result_summary, "result_consumed_after") and result_summary.result_consumed_after is not None:
set_span_attribute(span, "neo4j.result.consumed_after_ms", result_summary.result_consumed_after)

if hasattr(result_summary, "counters") and result_summary.counters:
counters = result_summary.counters
if hasattr(counters, "nodes_created") and counters.nodes_created:
set_span_attribute(span, "neo4j.result.nodes_created", counters.nodes_created)

if hasattr(counters, "nodes_deleted") and counters.nodes_deleted:
set_span_attribute(span, "neo4j.result.nodes_deleted", counters.nodes_deleted)

if hasattr(counters, "relationships_created") and counters.relationships_created:
set_span_attribute(span, "neo4j.result.relationships_created", counters.relationships_created)

if hasattr(counters, "relationships_deleted") and counters.relationships_deleted:
set_span_attribute(span, "neo4j.result.relationships_deleted", counters.relationships_deleted)

if hasattr(counters, "properties_set") and counters.properties_set:
set_span_attribute(span, "neo4j.result.properties_set", counters.properties_set)

if hasattr(result_summary, "plan") and result_summary.plan:
try:
set_span_attribute(span, "neo4j.result.plan", json.dumps(result_summary.plan))
except (TypeError, ValueError):
pass

if hasattr(result_summary, "notifications") and result_summary.notifications:
try:
set_span_attribute(span, "neo4j.result.notification_count", len(result_summary.notifications))
set_span_attribute(span, "neo4j.result.notifications", json.dumps(result_summary.notifications))
except (AttributeError, TypeError):
pass
Loading
Loading