Bug Description
I want to build an LLMOps pipeline for Cortex Agents (I have just configured an agent that combines two Cortex Analyst semantic views). However, I run into an error when I try to add feedbacks that evaluate the results using the LLM-as-a-judge approach.
To Reproduce
from trulens.otel.semconv.trace import SpanAttributes
from trulens.core.otel.instrument import instrument
from trulens.core import TruSession
from trulens.connectors.snowflake import SnowflakeConnector
from trulens.apps.app import TruApp
from trulens.core.run import RunConfig
from snowflake.snowpark import Session
from snowflake.core import Root
from pydantic import BaseModel, PrivateAttr
from snowflake.core.cortex.lite_agent_service import AgentRunRequest
from typing import Any, Type
import json
import logging
import os

from trulens.providers.cortex.provider import Cortex
from trulens.core import Feedback, SnowflakeFeedback, Select
from trulens.core.feedback import feedback as core_feedback

logger = logging.getLogger(__name__)

# connection_params holds the Snowflake connection parameters (defined elsewhere).
session = Session.builder.configs(connection_params).create()
os.environ["TRULENS_OTEL_TRACING"] = "1"

sf_connector = SnowflakeConnector(snowpark_session=session)
tru_session = TruSession(sf_connector)

# ---- Agent Setup ----
class CortexAgentArgs(BaseModel):
    query: str


class CortexAgentTool:
    name: str = "CortexAgent"
    description: str = ""
    args_schema: Type[CortexAgentArgs] = CortexAgentArgs
    _session: Session = PrivateAttr()
    _root: Root = PrivateAttr()
    _agent_service: Any = PrivateAttr()

    def __init__(self, session: Session):
        self._session = session
        self._root = Root(session)
        self._agent_service = self._root.cortex_agent_service

    def _build_request(self, query: str) -> AgentRunRequest:
        return AgentRunRequest.from_dict({
            "model": "claude-sonnet-4-5",
            "type": "auto",
            "tools": [
                {"tool_spec": {"type": "cortex_analyst_text_to_sql", "name": "CM_ANALYST_AGENT"}},
                {"tool_spec": {"type": "cortex_analyst_text_to_sql", "name": "DBO_HISTORY_ANALYST_AGENT"}},
            ],
            "tool_resources": {
                "CM_ANALYST_AGENT": {
                    "semantic_view": "PDM_MM_DB.SEMANTICS.CM_SEMANTIC_VIEW",
                    "execution_environment": {
                        "type": "warehouse",
                        "warehouse": "PDM_MM_WH",
                        "query_timeout": 60
                    }
                },
                "DBO_HISTORY_ANALYST_AGENT": {
                    "semantic_view": "PDM_MM_DB.SEMANTICS.DBO_HISTORY_SEMANTIC_VIEW",
                    "execution_environment": {
                        "type": "warehouse",
                        "warehouse": "PDM_MM_WH",
                        "query_timeout": 60
                    }
                },
            },
            "messages": [
                {"role": "user", "content": [{"type": "text", "text": query}]}
            ]
        })

    def _consume_stream(self, stream):
        text, sql, citations = "", "", []
        for evt in stream.events():
            try:
                delta = (evt.data.get("delta") if isinstance(evt.data, dict)
                         else json.loads(evt.data).get("delta")
                         or json.loads(evt.data).get("data", {}).get("delta"))
            except Exception:
                continue
            if not isinstance(delta, dict):
                continue
            logger.debug(f"Delta received: {delta}")
            for item in delta.get("content", []):
                if item.get("type") == "text":
                    text += item.get("text", "")
                elif item.get("type") == "tool_results":
                    for result in item["tool_results"].get("content", []):
                        if result.get("type") != "json":
                            continue
                        j = result["json"]
                        text += j.get("text", "")
                        sql = j.get("sql", sql)
                        citations.extend({
                            "source_id": s.get("source_id"),
                            "doc_id": s.get("doc_id")
                        } for s in j.get("searchResults", []))
        return text, sql, str(citations)

    @instrument(
        span_type=SpanAttributes.SpanType.RECORD_ROOT,
        attributes={
            SpanAttributes.RECORD_ROOT.INPUT: "query",
            SpanAttributes.RECORD_ROOT.OUTPUT: "return",
        },
    )
    def run(self, query: str, **kwargs):
        """
        This agent will retrieve PDM data from Snowflake using Text2SQL.
        """
        req = self._build_request(query)
        stream = self._agent_service.run(req)
        text, sql, citations = self._consume_stream(stream)
        results_str = ""
        if sql:
            try:
                df = self._session.sql(sql.rstrip(";")).to_pandas()
                results_str = df.to_string(index=False)
            except Exception as e:
                results_str = f"SQL execution error: {e}"
        return text, citations, sql, results_str
cortex_agent_tool = CortexAgentTool(session=session)

provider = Cortex(session, "mistral-large2")

# SQL Relevance - How relevant is the generated SQL to the user's prompt?
sql_gen_criteria = """Provided is an interpretation of a user's query as input 1 and an LLM generated SQL query
designed to answer the user's query as input 2. Grade how relevant the SQL code
appears to be to the user's question."""

sql_relevance = (
    Feedback(provider.relevance_with_cot_reasons, name="SQL relevance")
    .on_input_output()
)

feedback_list = [sql_relevance]

tru_app = TruApp(
    app=cortex_agent_tool,
    app_name="cortex_analyst",
    app_version="analyst_only",
    main_method=cortex_agent_tool.run,
    feedbacks=feedback_list,
)
Expected Behavior
TruApp should be created successfully and register the feedback functions, picking up the SnowflakeConnector already attached to the TruSession instead of raising a ValueError.
Relevant Logs/Tracebacks
ValueError Traceback (most recent call last)
Cell In[26], line 1
----> 1 tru_app = TruApp(
2 app=cortex_agent_tool,
3 app_name="cortex_analyst",
4 app_version="analyst_only",
5 main_method=cortex_agent_tool.run,
6 feedbacks=feedback_list,
7 )
File ~/Projects/SFL/MM/cortex-agents-llmops/.venv/lib/python3.12/site-packages/trulens/apps/app.py:387, in TruApp.__init__(self, app, main_method, methods_to_instrument, **kwargs)
384 kwargs["instrument"] = instrument
386 # This does instrumentation:
--> 387 super().__init__(**kwargs)
389 methods_to_instrument = methods_to_instrument or dict()
391 # The rest of this code instruments methods explicitly passed to
392 # constructor as needing instrumentation and checks that methods
393 # decorated with @Instrument or passed explicitly belong to some
394 # component as per serialized version of this app. If they are not,
395 # placeholders are made in app_extra_json so that subsequent
396 # serialization looks like the components exist.
File ~/Projects/SFL/MM/cortex-agents-llmops/.venv/lib/python3.12/site-packages/trulens/core/app.py:615, in App.__init__(self, connector, feedbacks, **kwargs)
612 if otel_enabled and main_method is not None:
613 self._wrap_main_function(app, main_method.__name__)
--> 615 self._tru_post_init()
File ~/Projects/SFL/MM/cortex-agents-llmops/.venv/lib/python3.12/site-packages/trulens/core/app.py:887, in App._tru_post_init(self)
885 else:
886 if len(self.feedbacks) > 0:
--> 887 raise ValueError(
888 "Feedback logging requires App.connector to be specified."
889 )
891 for f in self.feedbacks:
892 if (
893 self.feedback_mode == feedback_schema.FeedbackMode.DEFERRED
894 or f.run_location
895 == feedback_schema.FeedbackRunLocation.SNOWFLAKE
896 ):
ValueError: Feedback logging requires App.connector to be specified.
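Passing the connector to TruApp explicitly looks like a possible workaround, judging from the App.__init__(self, connector, feedbacks, **kwargs) signature visible in the traceback, although I would have expected the connector already attached to the TruSession to be used automatically (untested sketch):
# Untested workaround sketch: set App.connector explicitly.
# Assumes TruApp forwards the `connector` kwarg to App.__init__,
# as the traceback's App.__init__(self, connector, feedbacks, **kwargs) suggests.
tru_app = TruApp(
    app=cortex_agent_tool,
    app_name="cortex_analyst",
    app_version="analyst_only",
    main_method=cortex_agent_tool.run,
    feedbacks=feedback_list,
    connector=sf_connector,
)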
Environment:
- OS: macOS (Apple M3)
- Python 3.12.11
- TruLens 2.5.1