[BUG] Feedback failing to be added in TruApp (LLM as a judge) #2301

@jorisrombouts

Description

Bug Description
I want to build an LLMOps pipeline for Cortex Agents (I have just configured an agent that combines two Cortex Analyst semantic views). However, I'm hitting an error when I try to add feedback functions that evaluate the results using the LLM-as-a-judge approach.

To Reproduce
from trulens.otel.semconv.trace import SpanAttributes
from trulens.core.otel.instrument import instrument
from trulens.core import TruSession
from trulens.connectors.snowflake import SnowflakeConnector
from trulens.apps.app import TruApp
from trulens.core.run import RunConfig

from snowflake.snowpark import Session
from snowflake.core import Root
from pydantic import BaseModel, PrivateAttr
from snowflake.core.cortex.lite_agent_service import AgentRunRequest
from typing import Any, Type
import json
import logging
import os

from trulens.providers.cortex.provider import Cortex
from trulens.core import Feedback, SnowflakeFeedback, Select
from trulens.core.feedback import feedback as core_feedback

logger = logging.getLogger(__name__)

session = Session.builder.configs(connection_params).create()
os.environ["TRULENS_OTEL_TRACING"] = "1"
sf_connector = SnowflakeConnector(snowpark_session=session)
tru_session = TruSession(sf_connector)

# ---- Agent Setup ----

class CortexAgentArgs(BaseModel):
    query: str


class CortexAgentTool:
    name: str = "CortexAgent"
    description: str = ""
    args_schema: Type[CortexAgentArgs] = CortexAgentArgs

    _session: Session = PrivateAttr()
    _root: Root = PrivateAttr()
    _agent_service: Any = PrivateAttr()

    def __init__(self, session: Session):
        self._session = session
        self._root = Root(session)
        self._agent_service = self._root.cortex_agent_service

    def _build_request(self, query: str) -> AgentRunRequest:
        return AgentRunRequest.from_dict({
            "model": "claude-sonnet-4-5",
            "type": "auto",
            "tools": [
                {"tool_spec": {"type": "cortex_analyst_text_to_sql", "name": "CM_ANALYST_AGENT"}},
                {"tool_spec": {"type": "cortex_analyst_text_to_sql", "name": "DBO_HISTORY_ANALYST_AGENT"}},
            ],
            "tool_resources": {
                "CM_ANALYST_AGENT": {
                    "semantic_view": "PDM_MM_DB.SEMANTICS.CM_SEMANTIC_VIEW",
                    "execution_environment": {
                        "type": "warehouse",
                        "warehouse": "PDM_MM_WH",
                        "query_timeout": 60,
                    },
                },
                "DBO_HISTORY_ANALYST_AGENT": {
                    "semantic_view": "PDM_MM_DB.SEMANTICS.DBO_HISTORY_SEMANTIC_VIEW",
                    "execution_environment": {
                        "type": "warehouse",
                        "warehouse": "PDM_MM_WH",
                        "query_timeout": 60,
                    },
                },
            },
            "messages": [
                {"role": "user", "content": [{"type": "text", "text": query}]}
            ],
        })

    def _consume_stream(self, stream):
        text, sql, citations = "", "", []
        for evt in stream.events():
            try:
                delta = (evt.data.get("delta") if isinstance(evt.data, dict)
                         else json.loads(evt.data).get("delta")
                         or json.loads(evt.data).get("data", {}).get("delta"))
            except Exception:
                continue

            if not isinstance(delta, dict):
                continue

            logger.debug(f"Delta received: {delta}")

            for item in delta.get("content", []):
                if item.get("type") == "text":
                    text += item.get("text", "")
                elif item.get("type") == "tool_results":
                    for result in item["tool_results"].get("content", []):
                        if result.get("type") != "json":
                            continue
                        j = result["json"]
                        text += j.get("text", "")
                        sql = j.get("sql", sql)
                        citations.extend({
                            "source_id": s.get("source_id"),
                            "doc_id": s.get("doc_id")
                        } for s in j.get("searchResults", []))
        return text, sql, str(citations)

    @instrument(
        span_type=SpanAttributes.SpanType.RECORD_ROOT,
        attributes={
            SpanAttributes.RECORD_ROOT.INPUT: "query",
            SpanAttributes.RECORD_ROOT.OUTPUT: "return",
        },
    )
    def run(self, query: str, **kwargs):
        """
        This agent will retrieve PDM data from Snowflake using Text2SQL.
        """
        req = self._build_request(query)
        stream = self._agent_service.run(req)
        text, sql, citations = self._consume_stream(stream)

        results_str = ""
        if sql:
            try:
                df = self._session.sql(sql.rstrip(";")).to_pandas()
                results_str = df.to_string(index=False)
            except Exception as e:
                results_str = f"SQL execution error: {e}"

        return text, citations, sql, results_str

cortex_agent_tool = CortexAgentTool(session=session)
provider = Cortex(session, "mistral-large2")

# SQL relevance - how relevant is the generated SQL to the user's prompt?

sql_gen_criteria = """Provided is an interpretation of a user's query as input 1 and an LLM-generated SQL query
designed to answer the user's query as input 2. Grade how relevant the SQL code
appears to be to the user's question."""

sql_relevance = (
    Feedback(provider.relevance_with_cot_reasons, name="SQL relevance")
    .on_input_output()
)
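
As a side note, sql_gen_criteria above is not actually passed into the feedback yet, and .on_input_output() hands run()'s full return tuple to the judge. What I had in mind looks roughly like the sketch below (untested; sql_relevance_on_sql is just an illustrative name, and with TRULENS_OTEL_TRACING=1 the selectors may need to target span attributes instead of the legacy record lenses):

sql_relevance_on_sql = (
    Feedback(provider.relevance_with_cot_reasons, name="SQL relevance (SQL only)")
    .on(Select.RecordInput)        # the user's question
    .on(Select.RecordOutput[2])    # the generated SQL, third element of run()'s return tuple
)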

feedback_list = [sql_relevance]

tru_app = TruApp(
    app=cortex_agent_tool,
    app_name="cortex_analyst",
    app_version="analyst_only",
    main_method=cortex_agent_tool.run,
    feedbacks=feedback_list,
)

Expected Behavior
I expected TruApp to be constructed without errors, with the feedback function registered against the Snowflake connector already attached to the TruSession.

Relevant Logs/Tracebacks

ValueError                                Traceback (most recent call last)
Cell In[26], line 1
----> 1 tru_app = TruApp(
      2     app=cortex_agent_tool,
      3     app_name="cortex_analyst",
      4     app_version="analyst_only",
      5     main_method=cortex_agent_tool.run,
      6     feedbacks=feedback_list,
      7 )

File ~/Projects/SFL/MM/cortex-agents-llmops/.venv/lib/python3.12/site-packages/trulens/apps/app.py:387, in TruApp.__init__(self, app, main_method, methods_to_instrument, **kwargs)
    384 kwargs["instrument"] = instrument
    386 # This does instrumentation:
--> 387 super().__init__(**kwargs)
    389 methods_to_instrument = methods_to_instrument or dict()
    391 # The rest of this code instruments methods explicitly passed to
    392 # constructor as needing instrumentation and checks that methods
    393 # decorated with @Instrument or passed explicitly belong to some
    394 # component as per serialized version of this app. If they are not,
    395 # placeholders are made in app_extra_json so that subsequent
    396 # serialization looks like the components exist.

File ~/Projects/SFL/MM/cortex-agents-llmops/.venv/lib/python3.12/site-packages/trulens/core/app.py:615, in App.__init__(self, connector, feedbacks, **kwargs)
    612 if otel_enabled and main_method is not None:
    613     self._wrap_main_function(app, main_method.__name__)
--> 615 self._tru_post_init()

File ~/Projects/SFL/MM/cortex-agents-llmops/.venv/lib/python3.12/site-packages/trulens/core/app.py:887, in App._tru_post_init(self)
    885 else:
    886     if len(self.feedbacks) > 0:
--> 887         raise ValueError(
    888             "Feedback logging requires App.connector to be specified."
    889         )
    891 for f in self.feedbacks:
    892     if (
    893         self.feedback_mode == feedback_schema.FeedbackMode.DEFERRED
    894         or f.run_location
    895         == feedback_schema.FeedbackRunLocation.SNOWFLAKE
    896     ):

ValueError: Feedback logging requires App.connector to be specified.

Environment:

  • OS: macOS (Apple M3)
  • Python 3.12.11
  • TruLens 2.5.1

Additional Context
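The obvious workaround I still need to try is passing the connector to TruApp explicitly, since the App.__init__ signature in the traceback takes a connector argument. Untested sketch (I had assumed the connector from the TruSession would be picked up automatically):

tru_app = TruApp(
    app=cortex_agent_tool,
    app_name="cortex_analyst",
    app_version="analyst_only",
    main_method=cortex_agent_tool.run,
    feedbacks=feedback_list,
    connector=sf_connector,  # explicit connector; assumed to be forwarded through **kwargs to App.__init__
)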
