From 77aeffec7326d71c3e89ec3d1594b7c69626cbc1 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 18 Dec 2025 22:48:19 +0000 Subject: [PATCH 1/3] Add Communication Clarity Framework for self-debugging code This implements hypothesis-driven communication semantics where code refuses to execute on unverified assumptions. The framework extends EigenScript's geometric predicates with clarity predicates that check intent verification, not just computational state. New features: - MIGHT IS operator for tentative (hypothesis) bindings - CLARIFY OF operator to verify intent - ASSUMES interrogative to query hidden variables - Clarity predicates: clarified, ambiguous, explicit, assumed - CLARITY/CS numeric score (0.0-1.0) Core principles implemented: 1. Questions and statements both assign objectives 2. Implication is not universal (hidden variables cause failure) 3. Clarification is hypothesis testing 4. Withhold inference intentionally 5. Control remains with the speaker 6. Speakers discover their own intent Includes 44 passing tests and updated documentation. --- docs/api/predicates.md | 353 +++++++++++- src/eigenscript/evaluator/interpreter.py | 140 +++++ src/eigenscript/lexer/tokenizer.py | 6 + src/eigenscript/parser/ast_builder.py | 76 ++- src/eigenscript/runtime/__init__.py | 21 +- src/eigenscript/runtime/clarity.py | 690 +++++++++++++++++++++++ tests/test_clarity.py | 588 +++++++++++++++++++ 7 files changed, 1869 insertions(+), 5 deletions(-) create mode 100644 src/eigenscript/runtime/clarity.py create mode 100644 tests/test_clarity.py diff --git a/docs/api/predicates.md b/docs/api/predicates.md index 45d449e..44db824 100644 --- a/docs/api/predicates.md +++ b/docs/api/predicates.md @@ -428,6 +428,341 @@ loop while improving: --- +--- + +# Communication Clarity Predicates + +The Communication Clarity Framework extends EigenScript with predicates that check the *clarity of intent*, not just computational state. These predicates implement hypothesis-driven communication semantics where code debugs itself by refusing to execute on unverified assumptions. + +--- + +## clarified + +Check if all bindings have verified intent. + +**Syntax:** +```eigenscript +if clarified: + # All intents verified, safe to proceed +``` + +**Returns:** `true` if clarity score meets threshold, `false` otherwise + +**Example:** +```eigenscript +config might is "production" # Tentative: hypothesis +database might is "postgres" # Tentative: hypothesis + +# Won't proceed until intent is clarified +if not clarified: + print of "Please clarify intent before proceeding" + clarify of config + clarify of database + +if clarified: + # Safe to execute critical operations + initialize_system of config +``` + +**Semantics:** Measures the ratio of clarified bindings to total bindings. Returns true when clarity score >= threshold (default 0.95). + +--- + +## ambiguous + +Check if there are any unresolved bindings. + +**Syntax:** +```eigenscript +if ambiguous: + # Some bindings need clarification +``` + +**Returns:** `true` if any bindings are tentative/unresolved, `false` otherwise + +**Example:** +```eigenscript +user_input might is value + +if ambiguous: + print of "Warning: ambiguous state detected" + # Force disambiguation + clarify of user_input +``` + +**Semantics:** Returns true if any bindings have not been clarified. + +--- + +## explicit + +Check if all intents are fully explicit (opposite of ambiguous). + +**Syntax:** +```eigenscript +if explicit: + # All intents are explicit +``` + +**Returns:** `true` if clarity score is 100%, `false` otherwise + +**Example:** +```eigenscript +x is 10 # Clarified (normal assignment) +y is 20 # Clarified (normal assignment) + +if explicit: + print of "All intents are explicit" +``` + +**Semantics:** Returns true only when clarity score is exactly 1.0. + +--- + +## clarity / cs + +Get the current clarity score as a numeric value. + +**Syntax:** +```eigenscript +score is clarity +# or +score is cs +``` + +**Returns:** Float between 0.0 (fully ambiguous) and 1.0 (fully clarified) + +**Example:** +```eigenscript +a might is 1 # Tentative +b is 2 # Clarified +c might is 3 # Tentative + +score is clarity +print of score # Prints 0.333... (1/3 clarified) + +clarify of a +clarify of c +print of clarity # Prints 1.0 (all clarified) +``` + +**Semantics:** Computes ratio of clarified bindings to total bindings. + +--- + +## assumed + +Check if there are any unverified assumptions (hidden variables). + +**Syntax:** +```eigenscript +if assumed: + # Hidden variables detected +``` + +**Returns:** `true` if any assumptions exist, `false` otherwise + +**Example:** +```eigenscript +# The clarity tracker can detect implicit assumptions +if assumed: + print of "Warning: hidden variables detected" +``` + +**Semantics:** Returns true if any assumptions have been registered. + +--- + +# Clarity Operators + +## might is + +Create a tentative (hypothesis) binding. + +**Syntax:** +```eigenscript +identifier might is expression +``` + +**Example:** +```eigenscript +mode might is "strict" # Hypothesis: we want strict mode +count might is 0 # Hypothesis: count starts at zero + +# The bindings exist but are marked as tentative +print of mode # Prints "strict" +print of clarified # Prints 0.0 (not clarified) +``` + +**Semantics:** Creates a binding marked as TENTATIVE. The `clarified` predicate returns false until all tentative bindings are clarified. + +--- + +## clarify of + +Clarify a tentative binding, changing it from TENTATIVE to CLARIFIED. + +**Syntax:** +```eigenscript +clarify of identifier +``` + +**Returns:** `1.0` on success, `0.0` on failure + +**Example:** +```eigenscript +mode might is "strict" +print of clarified # Prints 0.0 + +clarify of mode +print of clarified # Prints 1.0 +``` + +**Semantics:** Marks the binding as clarified in the clarity tracker. + +--- + +## assumes is + +Query hidden variables/assumptions for a binding. + +**Syntax:** +```eigenscript +assumptions is assumes is identifier +``` + +**Returns:** String listing assumptions, or empty string if none + +**Example:** +```eigenscript +x is 10 +assumptions is assumes is x +print of assumptions # Prints "" (no assumptions) +``` + +**Semantics:** Returns any assumptions detected for the binding. + +--- + +# Complete Examples + +### Self-Debugging Configuration + +```eigenscript +# Configuration with hypothesis-driven clarity +db_host might is "localhost" +db_port might is 5432 +db_name might is "production" + +# The program refuses to proceed until intent is clear +if not clarified: + print of "Configuration requires clarification:" + print of clarity + +# Explicit verification of each hypothesis +clarify of db_host +clarify of db_port +clarify of db_name + +if clarified: + print of "Configuration verified. Proceeding..." + connect_database of db_host +``` + +### Clarity-Driven Loop + +```eigenscript +define safe_process as: + config might is input + + # Loop until all ambiguity resolved + loop while not clarified: + print of "Please confirm configuration" + clarify of config + + # Now safe to proceed + return execute of config + +result is safe_process of user_config +``` + +### Combining Clarity and Convergence + +```eigenscript +# Hybrid predicates: both semantic AND communicative clarity +define robust_algorithm as: + params might is initial + x is start + + # Verify intent before heavy computation + if not clarified: + clarify of params + + loop while not converged: + if diverging: + print of "Warning: diverging" + break + x is iterate of x + + return x + +result is robust_algorithm of problem +``` + +### Agency-Preserving Design + +```eigenscript +# The framework presents options without deciding for the user +mode might is unknown + +# Instead of guessing, the program asks for clarification +if ambiguous: + print of "Possible modes:" + print of "1. strict - Full validation" + print of "2. relaxed - Permissive parsing" + print of "3. auto - Detect from input" + print of "Which mode is intended?" + + # User must clarify before proceeding + mode is input of "Enter mode: " + clarify of mode + +if clarified: + process_with_mode of mode +``` + +--- + +## Geometric Interpretation + +Clarity predicates measure communication properties: + +| Predicate | Communication Property | +|-----------|----------------------| +| `clarified` | All intents verified | +| `ambiguous` | Hidden variables exist | +| `explicit` | 100% clarity score | +| `clarity`/`cs` | Ratio of clarified bindings | +| `assumed` | Unverified assumptions present | + +The `might is` operator creates a hypothesis binding (lightlike). +The `clarify of` operator transforms it to clarified (timelike). + +--- + +## The Framework Principles + +The clarity predicates implement these principles: + +1. **Questions and statements both assign objectives** - Creating a binding assigns intent-verification work +2. **Implication is not universal** - `might is` makes hidden variables visible +3. **Clarification is hypothesis testing** - Bindings start as hypotheses, become facts +4. **Withhold inference intentionally** - Don't collapse ambiguity silently +5. **Control remains with the speaker** - Present options, don't decide for user +6. **Speakers discover their own intent** - `might is` forces intent externalization + +--- + ## Summary Semantic predicates: @@ -439,9 +774,23 @@ Semantic predicates: - `oscillating` - Periodic behavior - `equilibrium` - At balance point -**Total: 6 predicates** +Communication clarity predicates: + +- `clarified` - All intents verified +- `ambiguous` - Unresolved bindings exist +- `explicit` - 100% clarity +- `clarity`/`cs` - Clarity score (0.0-1.0) +- `assumed` - Hidden variables detected + +Clarity operators: + +- `might is` - Tentative (hypothesis) binding +- `clarify of` - Verify intent +- `assumes is` - Query hidden variables + +**Total: 11 predicates + 3 operators** -These enable automatic convergence detection and adaptive algorithms. +These enable self-debugging code that refuses to execute on unverified assumptions. --- diff --git a/src/eigenscript/evaluator/interpreter.py b/src/eigenscript/evaluator/interpreter.py index 1c0d820..68fea08 100644 --- a/src/eigenscript/evaluator/interpreter.py +++ b/src/eigenscript/evaluator/interpreter.py @@ -13,6 +13,7 @@ ASTNode, Program, Assignment, + TentativeAssignment, Relation, BinaryOp, UnaryOp, @@ -37,6 +38,13 @@ from eigenscript.semantic.metric import MetricTensor from eigenscript.runtime.framework_strength import FrameworkStrengthTracker from eigenscript.runtime.explain import PredicateExplainer +from eigenscript.runtime.clarity import ( + ClarityTracker, + ClarityExplainer, + ClarityType, + Assumption, + detect_assumptions, +) from eigenscript.builtins import BuiltinFunction, get_builtins # Type alias for values that can flow through the interpreter @@ -251,6 +259,7 @@ def __init__( convergence_threshold: float = 0.95, enable_convergence_detection: bool = True, explain_mode: bool = False, + clarity_threshold: float = 0.95, ): """ Initialize the interpreter. @@ -262,6 +271,7 @@ def __init__( convergence_threshold: FS threshold for eigenstate detection (default: 0.95) enable_convergence_detection: Enable automatic convergence detection (default: True) explain_mode: Enable explain mode for predicate evaluations (default: False) + clarity_threshold: Clarity score threshold for the `clarified` predicate (default: 0.95) """ # Geometric components self.space = LRVMSpace(dimension=dimension) @@ -288,6 +298,11 @@ def __init__( # Explain mode for predicate evaluations self.explainer = PredicateExplainer(enabled=explain_mode) + # Communication Clarity Framework + self.clarity_tracker = ClarityTracker() + self.clarity_explainer = ClarityExplainer(enabled=explain_mode) + self.clarity_threshold = clarity_threshold + # Special lightlike OF vector self._of_vector = self._create_of_vector() @@ -403,6 +418,8 @@ def evaluate(self, node: ASTNode) -> Union[LRVMVector, EigenList]: return self._eval_program(node) elif isinstance(node, Assignment): return self._eval_assignment(node) + elif isinstance(node, TentativeAssignment): + return self._eval_tentative_assignment(node) elif isinstance(node, Relation): return self._eval_relation(node) elif isinstance(node, BinaryOp): @@ -468,6 +485,39 @@ def _eval_assignment(self, node: Assignment) -> LRVMVector: # Bind in environment (handles both vectors and lists) self.environment.bind(node.identifier, value) + # Register as clarified binding in clarity tracker + self.clarity_tracker.register_binding(node.identifier, tentative=False) + + # Return the value as-is (may be EigenList or LRVMVector) + return value + + def _eval_tentative_assignment(self, node: TentativeAssignment) -> LRVMVector: + """ + Evaluate MIGHT IS operator: x might is y + + Semantic: Tentative binding (hypothesis) in LRVM space. + + From the Communication Clarity Framework: + - Treats the binding as a hypothesis, not a fact + - Forces intent to be externalized before execution + - Prevents reinforcing the false belief that implication was sufficient + + The binding is created but marked as TENTATIVE in the clarity tracker. + The `clarified` predicate will return False until clarify() is called. + """ + # Evaluate right-hand side + value = self.evaluate(node.expression) + + # Bind in environment (handles both vectors and lists) + self.environment.bind(node.identifier, value) + + # Register as tentative binding in clarity tracker + self.clarity_tracker.register_binding( + node.identifier, + tentative=True, + hypothesis=node.hypothesis, + ) + # Return the value as-is (may be EigenList or LRVMVector) return value @@ -477,6 +527,17 @@ def _eval_relation(self, node: Relation) -> LRVMVector: Semantic: Metric contraction x^T g y OR function application """ + # Special handling for clarity framework: clarify of x + # This clarifies a tentative binding, changing it from TENTATIVE to CLARIFIED + if isinstance(node.left, Identifier) and node.left.name.lower() == "clarify": + if isinstance(node.right, Identifier): + binding_name = node.right.name + success = self.clarity_tracker.clarify(binding_name) + return self.space.embed_scalar(1.0 if success else 0.0) + else: + # Can only clarify identifiers + raise RuntimeError("clarify requires an identifier (clarify of x)") + # Special handling for function calls: # If left side is an identifier, check if it's a function before evaluating if isinstance(node.left, Identifier): @@ -1392,6 +1453,52 @@ def _eval_identifier( result = 0.0 return self.space.embed_scalar(result) + # Communication Clarity Framework predicates + elif name_lower == "clarified": + # CLARIFIED: Check if all bindings have verified intent + # From the framework: "clarity is required before execution" + clarity_score = self.clarity_tracker.compute_clarity_score() + result = 1.0 if clarity_score >= self.clarity_threshold else 0.0 + unresolved = self.clarity_tracker.get_unresolved_bindings() + self.clarity_explainer.explain_clarified( + result=result > 0.5, + clarity_score=clarity_score, + threshold=self.clarity_threshold, + unresolved=unresolved, + ) + return self.space.embed_scalar(result) + + elif name_lower == "ambiguous": + # AMBIGUOUS: Check if there are any ambiguous bindings + # From the framework: "implication is not universal" + unresolved = self.clarity_tracker.get_unresolved_bindings() + result = 1.0 if len(unresolved) > 0 else 0.0 + return self.space.embed_scalar(result) + + elif name_lower == "assumed": + # ASSUMED: Check if there are any unverified assumptions + # From the framework: "hidden variables cause failure" + assumptions = self.clarity_tracker.get_assumptions() + result = 1.0 if len(assumptions) > 0 else 0.0 + self.clarity_explainer.explain_assumed( + result=result > 0.5, + assumptions=assumptions, + ) + return self.space.embed_scalar(result) + + elif name_lower == "explicit": + # EXPLICIT: Opposite of ambiguous - all intents are clarified + # From the framework: "making hidden variables visible" + clarity_score = self.clarity_tracker.compute_clarity_score() + result = 1.0 if clarity_score >= 1.0 else 0.0 + return self.space.embed_scalar(result) + + elif name_lower == "clarity" or name_lower == "cs": + # CLARITY / CS: Return current clarity score as numeric value + # Analogous to framework_strength / fs + clarity_score = self.clarity_tracker.compute_clarity_score() + return self.space.embed_scalar(clarity_score) + # Regular variable lookup return self.environment.lookup(node.name) @@ -1579,6 +1686,39 @@ def _eval_interrogative(self, node: Interrogative) -> LRVMVector: # Not enough history return self.space.embed_string("unknown") + elif interrogative == "assumes": + # ASSUMES: Query hidden variables/assumptions for a binding + # From the Communication Clarity Framework: + # "Hidden variables cause failure, not misunderstanding" + # + # Returns a list of assumptions that have been detected for the binding, + # or an empty list if the binding is fully clarified. + if isinstance(node.expression, Identifier): + binding_name = node.expression.name + assumptions = self.clarity_tracker.get_assumptions(binding_name) + + if assumptions: + # Return a list of assumption descriptions + assumption_strs = [ + f"{a.name}: {a.context}" for a in assumptions + ] + # Join as a multi-line string for readable output + result_str = "\n".join(assumption_strs) + return self.space.embed_string(result_str) + else: + # No assumptions - binding is clear + return self.space.embed_string("") + else: + # Query global assumptions if not a specific binding + global_assumptions = self.clarity_tracker.get_assumptions() + if global_assumptions: + assumption_strs = [ + f"{a.name}: {a.context}" for a in global_assumptions + ] + return self.space.embed_string("\n".join(assumption_strs)) + else: + return self.space.embed_string("") + else: raise RuntimeError(f"Unknown interrogative: {interrogative}") diff --git a/src/eigenscript/lexer/tokenizer.py b/src/eigenscript/lexer/tokenizer.py index 95881e3..aff24bc 100644 --- a/src/eigenscript/lexer/tokenizer.py +++ b/src/eigenscript/lexer/tokenizer.py @@ -48,6 +48,9 @@ class TokenType(Enum): # Interrogative aliases STATUS = "STATUS" TREND = "TREND" + # Clarity operators (communication clarity framework) + MIGHT = "MIGHT" # Tentative binding: x might is 5 + ASSUMES = "ASSUMES" # Assumption query: assumes is x # Literals NUMBER = "NUMBER" @@ -159,6 +162,9 @@ class Tokenizer: # Interrogative aliases "status": TokenType.STATUS, "trend": TokenType.TREND, + # Clarity operators + "might": TokenType.MIGHT, + "assumes": TokenType.ASSUMES, } def __init__(self, source: str): diff --git a/src/eigenscript/parser/ast_builder.py b/src/eigenscript/parser/ast_builder.py index de6dd8e..8ce9bae 100644 --- a/src/eigenscript/parser/ast_builder.py +++ b/src/eigenscript/parser/ast_builder.py @@ -214,6 +214,35 @@ def __repr__(self) -> str: return f"Assignment({self.identifier!r}, {self.expression})" +@dataclass +class TentativeAssignment(ASTNode): + """ + Represents the MIGHT IS operator (tentative/hypothesis binding). + + Semantic: x might is y → v_x ← v_y with clarity_type=TENTATIVE + + This creates a binding that is explicitly marked as a hypothesis, + not yet verified. The program can check for tentative bindings and + refuse to execute certain operations until intent is clarified. + + From the Communication Clarity Framework: + - Treats the binding as a hypothesis, not a fact + - Forces intent to be externalized before execution + - Prevents reinforcing the false belief that implication was sufficient + + Example: + count might is 0 # Hypothesis: count starts at zero + mode might is "strict" # Hypothesis: we want strict mode + """ + + identifier: str + expression: ASTNode + hypothesis: Optional[str] = None # Optional description of the hypothesis + + def __repr__(self) -> str: + return f"TentativeAssignment({self.identifier!r}, {self.expression}, hypothesis={self.hypothesis!r})" + + @dataclass class IndexedAssignment(ASTNode): """ @@ -354,14 +383,16 @@ class Interrogative(ASTNode): - WHERE: Spatial position in semantic space - WHY: Causal direction (gradient) - HOW: Transformation/process + - ASSUMES: Hidden variables/assumptions (clarity framework) Example: what is x why is change how is convergence + assumes is x """ - interrogative: str # "who", "what", "when", "where", "why", "how" + interrogative: str # "who", "what", "when", "where", "why", "how", "assumes" expression: ASTNode def __repr__(self) -> str: @@ -676,12 +707,17 @@ def parse_statement(self) -> Optional[ASTNode]: return Continue() # Assignment (identifier IS expression) or IndexedAssignment (identifier[idx] IS expression) + # or TentativeAssignment (identifier MIGHT IS expression) if token.type == TokenType.IDENTIFIER: # Look ahead for IS token (simple assignment) next_token = self.peek_token() if next_token and next_token.type == TokenType.IS: return self.parse_assignment() + # Look ahead for MIGHT IS (tentative assignment) + if next_token and next_token.type == TokenType.MIGHT: + return self.parse_tentative_assignment() + # Look ahead for LBRACKET (potential indexed assignment) if next_token and next_token.type == TokenType.LBRACKET: return self.parse_potential_indexed_assignment() @@ -721,6 +757,37 @@ def parse_assignment(self) -> Assignment: return Assignment(identifier, expression) + def parse_tentative_assignment(self) -> "TentativeAssignment": + """ + Parse a tentative assignment statement. + + Grammar: identifier MIGHT IS expression + + This creates a hypothesis binding that must be clarified before + certain operations can proceed. From the Communication Clarity Framework: + - Treats the binding as a hypothesis, not a fact + - Forces intent to be externalized + - Prevents silent execution on unverified assumptions + """ + # Get identifier + id_token = self.expect(TokenType.IDENTIFIER) + identifier = id_token.value + + # Expect MIGHT token + self.expect(TokenType.MIGHT) + + # Expect IS token + self.expect(TokenType.IS) + + # Parse expression + expression = self.parse_expression() + + # Consume optional newline + if self.current_token() and self.current_token().type == TokenType.NEWLINE: + self.advance() + + return TentativeAssignment(identifier, expression) + def parse_potential_indexed_assignment(self) -> ASTNode: """ Parse potential indexed assignment (identifier[idx] IS expression). @@ -1116,7 +1183,7 @@ def parse_interrogative(self) -> Interrogative: """ Parse an interrogative operator. - Grammar: (WHO | WHAT | WHEN | WHERE | WHY | HOW | WAS | CHANGE | STATUS | TREND) (IS)? primary + Grammar: (WHO | WHAT | WHEN | WHERE | WHY | HOW | WAS | CHANGE | STATUS | TREND | ASSUMES) (IS)? primary Example: what x @@ -1126,6 +1193,7 @@ def parse_interrogative(self) -> Interrogative: change is x (delta/difference) status is x (alias for how) trend is x (trajectory analysis) + assumes is x (hidden variables - clarity framework) """ # Get interrogative type token = self.current_token() @@ -1142,6 +1210,8 @@ def parse_interrogative(self) -> Interrogative: # Interrogative aliases TokenType.STATUS: "status", TokenType.TREND: "trend", + # Clarity operators + TokenType.ASSUMES: "assumes", } interrogative = interrogative_map[token.type] self.advance() @@ -1512,6 +1582,8 @@ def parse_primary(self) -> ASTNode: # Interrogative aliases TokenType.STATUS, TokenType.TREND, + # Clarity operators + TokenType.ASSUMES, ): return self.parse_interrogative() diff --git a/src/eigenscript/runtime/__init__.py b/src/eigenscript/runtime/__init__.py index 1ddaca7..47c2a85 100644 --- a/src/eigenscript/runtime/__init__.py +++ b/src/eigenscript/runtime/__init__.py @@ -4,9 +4,28 @@ This module handles runtime state management including: - Framework Strength measurement - Convergence detection +- Communication clarity tracking - Built-in functions """ from eigenscript.runtime.framework_strength import FrameworkStrengthTracker +from eigenscript.runtime.clarity import ( + ClarityType, + ClarityState, + ClarityTracker, + ClarityExplainer, + AmbiguityResolver, + Assumption, + detect_assumptions, +) -__all__ = ["FrameworkStrengthTracker"] +__all__ = [ + "FrameworkStrengthTracker", + "ClarityType", + "ClarityState", + "ClarityTracker", + "ClarityExplainer", + "AmbiguityResolver", + "Assumption", + "detect_assumptions", +] diff --git a/src/eigenscript/runtime/clarity.py b/src/eigenscript/runtime/clarity.py new file mode 100644 index 0000000..92132e9 --- /dev/null +++ b/src/eigenscript/runtime/clarity.py @@ -0,0 +1,690 @@ +""" +Communication Clarity Framework for EigenScript. + +This module implements hypothesis-driven communication semantics where: +- Clarity is required before execution (not optional) +- Hidden variables are made explicit +- Intent is treated as falsifiable hypothesis +- Agency remains with the speaker (programmer) + +The framework applies the same rigor to code that EigenScript applies +to computation: programs must pass clarity checks before execution, +just as they must pass convergence checks. + +Key Principles: + 1. Questions and statements both assign objectives (create work) + 2. Implication is not universal (hidden variables cause failure) + 3. Clarification is hypothesis testing (not social correction) + 4. Withhold inference intentionally (don't collapse ambiguity silently) + 5. Control remains with the speaker (present options, don't decide) + 6. Speakers discover their own intent (force internal alignment) + +Clarity Types (parallel to geometric signatures): + - Ambiguous: Multiple valid interpretations exist (||c||² < 0, spacelike) + - Tentative: Hypothesis stated but unverified (||c||² = 0, lightlike) + - Clarified: Intent verified, ready for execution (||c||² > 0, timelike) +""" + +import sys +import numpy as np +from typing import List, Dict, Optional, Set, Tuple, Any +from dataclasses import dataclass, field +from enum import Enum + + +class ClarityType(Enum): + """ + Classification of intent clarity (parallel to geometric signatures). + + Maps to the communication framework: + - AMBIGUOUS: Hidden variables present, multiple interpretations possible + - TENTATIVE: Hypothesis stated but not yet confirmed + - CLARIFIED: Intent explicit and verified, safe to execute + """ + + AMBIGUOUS = "ambiguous" # Spacelike: exploring, unstable + TENTATIVE = "tentative" # Lightlike: at boundary + CLARIFIED = "clarified" # Timelike: converged, stable + + +@dataclass +class Assumption: + """ + Represents an implicit assumption in code. + + Assumptions are hidden variables that haven't been made explicit. + The clarity framework detects and surfaces these for verification. + """ + + name: str # What is being assumed + source: str # Where the assumption originates (variable, operation, etc.) + context: str # Additional context about why this is an assumption + line: Optional[int] = None # Source line if available + + def __repr__(self) -> str: + loc = f" (line {self.line})" if self.line else "" + return f"Assumption({self.name!r} from {self.source!r}{loc})" + + +@dataclass +class ClarityState: + """ + Tracks the clarity state of a value or binding. + + Each binding in EigenScript can have an associated clarity state + that tracks whether its intent has been verified. + """ + + clarity_type: ClarityType = ClarityType.AMBIGUOUS + assumptions: List[Assumption] = field(default_factory=list) + verified_at: Optional[int] = None # Iteration when clarified + hypothesis: Optional[str] = None # The stated hypothesis if tentative + + def is_clear(self) -> bool: + """Check if this state is clarified (safe to execute).""" + return self.clarity_type == ClarityType.CLARIFIED + + def is_tentative(self) -> bool: + """Check if this state is a hypothesis awaiting verification.""" + return self.clarity_type == ClarityType.TENTATIVE + + def is_ambiguous(self) -> bool: + """Check if this state has unresolved ambiguity.""" + return self.clarity_type == ClarityType.AMBIGUOUS + + def has_assumptions(self) -> bool: + """Check if there are unverified assumptions.""" + return len(self.assumptions) > 0 + + +class ClarityTracker: + """ + Tracks communication clarity throughout program execution. + + Analogous to FrameworkStrengthTracker, but measures semantic + clarity rather than computational convergence. + + The tracker monitors: + - Hidden variables (assumptions not made explicit) + - Ambiguous bindings (values with multiple interpretations) + - Tentative hypotheses (stated but unverified intent) + - Clarified state (verified, ready for execution) + + Example: + >>> tracker = ClarityTracker() + >>> tracker.register_binding("x", tentative=True, hypothesis="x is a counter") + >>> tracker.get_clarity("x") + ClarityState(clarity_type=TENTATIVE, ...) + >>> tracker.clarify("x") + >>> tracker.is_clarified("x") + True + """ + + def __init__(self): + """Initialize the clarity tracker.""" + self.bindings: Dict[str, ClarityState] = {} + self.global_assumptions: List[Assumption] = [] + self.clarification_history: List[Tuple[str, int]] = [] + self.iteration: int = 0 + + def register_binding( + self, + name: str, + tentative: bool = False, + hypothesis: Optional[str] = None, + assumptions: Optional[List[Assumption]] = None, + ) -> ClarityState: + """ + Register a new binding with its clarity state. + + Args: + name: Variable name + tentative: Whether this is a tentative (might is) binding + hypothesis: The stated hypothesis for tentative bindings + assumptions: Any detected assumptions + + Returns: + The created ClarityState + """ + if tentative: + clarity_type = ClarityType.TENTATIVE + elif assumptions: + clarity_type = ClarityType.AMBIGUOUS + else: + clarity_type = ClarityType.CLARIFIED + + state = ClarityState( + clarity_type=clarity_type, + assumptions=assumptions or [], + hypothesis=hypothesis, + verified_at=self.iteration if clarity_type == ClarityType.CLARIFIED else None, + ) + + self.bindings[name] = state + return state + + def register_assumption( + self, + name: str, + source: str, + context: str, + line: Optional[int] = None, + binding: Optional[str] = None, + ) -> Assumption: + """ + Register a detected assumption (hidden variable). + + Args: + name: What is being assumed + source: Where the assumption originates + context: Why this is considered an assumption + line: Source line number if available + binding: Associated binding name, if any + + Returns: + The created Assumption + """ + assumption = Assumption(name=name, source=source, context=context, line=line) + + if binding and binding in self.bindings: + self.bindings[binding].assumptions.append(assumption) + self.bindings[binding].clarity_type = ClarityType.AMBIGUOUS + else: + self.global_assumptions.append(assumption) + + return assumption + + def clarify(self, name: str) -> bool: + """ + Mark a binding as clarified (intent verified). + + Args: + name: Variable name to clarify + + Returns: + True if successfully clarified, False if not found + """ + if name not in self.bindings: + return False + + state = self.bindings[name] + state.clarity_type = ClarityType.CLARIFIED + state.verified_at = self.iteration + self.clarification_history.append((name, self.iteration)) + return True + + def get_clarity(self, name: str) -> Optional[ClarityState]: + """ + Get the clarity state of a binding. + + Args: + name: Variable name + + Returns: + ClarityState or None if not found + """ + return self.bindings.get(name) + + def is_clarified(self, name: str) -> bool: + """ + Check if a binding is clarified. + + Args: + name: Variable name + + Returns: + True if clarified, False otherwise + """ + state = self.bindings.get(name) + return state is not None and state.is_clear() + + def get_assumptions(self, name: Optional[str] = None) -> List[Assumption]: + """ + Get assumptions for a binding or all global assumptions. + + Args: + name: Variable name, or None for global assumptions + + Returns: + List of assumptions + """ + if name is None: + return self.global_assumptions.copy() + + state = self.bindings.get(name) + if state: + return state.assumptions.copy() + return [] + + def get_unresolved_bindings(self) -> List[str]: + """ + Get all bindings that are not yet clarified. + + Returns: + List of binding names that need clarification + """ + return [ + name + for name, state in self.bindings.items() + if not state.is_clear() + ] + + def compute_clarity_score(self) -> float: + """ + Compute overall clarity score (0.0 to 1.0). + + Similar to Framework Strength, measures the degree of + semantic clarity in the current program state. + + Returns: + Clarity score between 0.0 (fully ambiguous) and 1.0 (fully clarified) + """ + if not self.bindings: + return 1.0 # No bindings = trivially clear + + clarified = sum(1 for s in self.bindings.values() if s.is_clear()) + return clarified / len(self.bindings) + + def has_clarity(self, threshold: float = 0.95) -> bool: + """ + Check if clarity score meets threshold. + + Args: + threshold: Minimum clarity score (default: 0.95) + + Returns: + True if clarity score >= threshold + """ + return self.compute_clarity_score() >= threshold + + def advance_iteration(self) -> None: + """Advance the iteration counter.""" + self.iteration += 1 + + def reset(self) -> None: + """Reset the tracker to initial state.""" + self.bindings.clear() + self.global_assumptions.clear() + self.clarification_history.clear() + self.iteration = 0 + + +class ClarityExplainer: + """ + Provides human-readable explanations of clarity evaluations. + + Follows the agency-preserving principle: presents options rather + than dictating corrections. Shows WHY clarity is insufficient + without telling the programmer what they "should" do. + + Example output: + `clarified` → FALSE + └─ binding 'x' is TENTATIVE + └─ hypothesis: "x represents user count" + └─ possible interpretations: + 1. x as total users (all time) + 2. x as active users (current session) + 3. x as unique users (deduplicated) + └─ which interpretation is intended? + """ + + def __init__(self, enabled: bool = False, use_color: bool = True): + """ + Initialize the explainer. + + Args: + enabled: Whether explain mode is active + use_color: Whether to use ANSI color codes + """ + self.enabled = enabled + self.use_color = use_color and self._supports_color() + + def _supports_color(self) -> bool: + """Check if the terminal supports color output.""" + import os + + if not hasattr(sys.stderr, "isatty") or not sys.stderr.isatty(): + return False + if os.environ.get("NO_COLOR"): + return False + return True + + def _color(self, text: str, color: str) -> str: + """Apply ANSI color to text if colors are enabled.""" + if not self.use_color: + return text + + colors = { + "green": "\033[32m", + "red": "\033[31m", + "yellow": "\033[33m", + "cyan": "\033[36m", + "magenta": "\033[35m", + "bold": "\033[1m", + "reset": "\033[0m", + } + + return f"{colors.get(color, '')}{text}{colors['reset']}" + + def explain_clarified( + self, + result: bool, + clarity_score: float, + threshold: float, + unresolved: List[str], + ) -> None: + """ + Explain the evaluation of the `clarified` predicate. + + Args: + result: Whether the predicate evaluated to True + clarity_score: The current clarity score + threshold: The clarity threshold + unresolved: List of unresolved binding names + """ + if not self.enabled: + return + + result_str = ( + self._color("TRUE", "green") if result else self._color("FALSE", "red") + ) + + print(f"`clarified` → {result_str}", file=sys.stderr) + print( + f" └─ clarity_score = {clarity_score:.4f} (threshold: {threshold})", + file=sys.stderr, + ) + + if not result and unresolved: + print(f" └─ unresolved bindings:", file=sys.stderr) + for name in unresolved[:5]: # Show first 5 + print(f" • {self._color(name, 'yellow')}", file=sys.stderr) + if len(unresolved) > 5: + print(f" ... and {len(unresolved) - 5} more", file=sys.stderr) + + def explain_ambiguous( + self, + result: bool, + binding: str, + state: Optional[ClarityState], + interpretations: Optional[List[str]] = None, + ) -> None: + """ + Explain the evaluation of the `ambiguous` predicate. + + Presents possible interpretations without dictating which is "correct". + + Args: + result: Whether the predicate evaluated to True + binding: The binding being checked + state: The clarity state of the binding + interpretations: Possible interpretations (if known) + """ + if not self.enabled: + return + + result_str = ( + self._color("TRUE", "green") if result else self._color("FALSE", "red") + ) + + print(f"`ambiguous` → {result_str}", file=sys.stderr) + print(f" └─ binding: {self._color(binding, 'cyan')}", file=sys.stderr) + + if state: + print( + f" └─ clarity_type: {state.clarity_type.value}", + file=sys.stderr, + ) + + if state.assumptions: + print(f" └─ detected assumptions:", file=sys.stderr) + for assumption in state.assumptions[:3]: + print(f" • {assumption.name}: {assumption.context}", file=sys.stderr) + + if interpretations: + print(f" └─ possible interpretations:", file=sys.stderr) + for i, interp in enumerate(interpretations, 1): + print(f" {i}. {interp}", file=sys.stderr) + print( + f" └─ {self._color('which interpretation is intended?', 'yellow')}", + file=sys.stderr, + ) + + def explain_assumed( + self, + result: bool, + assumptions: List[Assumption], + ) -> None: + """ + Explain the evaluation of the `assumed` predicate. + + Shows what hidden variables have been detected. + + Args: + result: Whether the predicate evaluated to True + assumptions: List of detected assumptions + """ + if not self.enabled: + return + + result_str = ( + self._color("TRUE", "green") if result else self._color("FALSE", "red") + ) + + print(f"`assumed` → {result_str}", file=sys.stderr) + print(f" └─ assumption_count = {len(assumptions)}", file=sys.stderr) + + if assumptions: + print(f" └─ hidden variables detected:", file=sys.stderr) + for assumption in assumptions[:5]: + loc = f" (line {assumption.line})" if assumption.line else "" + print( + f" • {self._color(assumption.name, 'magenta')}: " + f"{assumption.context}{loc}", + file=sys.stderr, + ) + if len(assumptions) > 5: + print(f" ... and {len(assumptions) - 5} more", file=sys.stderr) + + def explain_tentative( + self, + result: bool, + binding: str, + hypothesis: Optional[str], + ) -> None: + """ + Explain the evaluation of the `tentative` predicate. + + Args: + result: Whether the predicate evaluated to True + binding: The binding being checked + hypothesis: The stated hypothesis + """ + if not self.enabled: + return + + result_str = ( + self._color("TRUE", "green") if result else self._color("FALSE", "red") + ) + + print(f"`tentative` → {result_str}", file=sys.stderr) + print(f" └─ binding: {self._color(binding, 'cyan')}", file=sys.stderr) + + if hypothesis: + print(f" └─ hypothesis: \"{hypothesis}\"", file=sys.stderr) + print( + f" └─ {self._color('awaiting verification', 'yellow')}", + file=sys.stderr, + ) + + +class AmbiguityResolver: + """ + Provides agency-preserving ambiguity resolution. + + Instead of guessing what the programmer meant, presents options + and returns control to them. This follows the principle: + "True intellect is knowing you could be wrong—and refusing + to act as if you aren't." + + Example: + resolver = AmbiguityResolver() + options = resolver.generate_options("x", context) + # Returns options like: + # [ + # "x as numeric value (apply arithmetic)", + # "x as string identifier (apply concatenation)", + # "x as function reference (apply invocation)", + # ] + """ + + def __init__(self): + """Initialize the resolver.""" + self.type_interpretations = { + "numeric": "apply arithmetic operations", + "string": "apply string operations (concatenation, slicing)", + "function": "invoke as function", + "list": "apply sequence operations (indexing, iteration)", + "boolean": "use in conditional logic", + } + + def generate_options( + self, + name: str, + possible_types: Optional[List[str]] = None, + context: Optional[str] = None, + ) -> List[str]: + """ + Generate possible interpretations for an ambiguous binding. + + Args: + name: Variable name + possible_types: List of possible types + context: Additional context + + Returns: + List of interpretation strings + """ + if possible_types is None: + possible_types = list(self.type_interpretations.keys()) + + options = [] + for ptype in possible_types: + if ptype in self.type_interpretations: + options.append( + f"{name} as {ptype} ({self.type_interpretations[ptype]})" + ) + + if not options: + options.append(f"{name} with unspecified intent") + + return options + + def format_resolution_prompt( + self, + name: str, + options: List[str], + ) -> str: + """ + Format an agency-preserving resolution prompt. + + Returns a message that presents options without dictating. + + Args: + name: Variable name + options: List of possible interpretations + + Returns: + Formatted prompt string + """ + lines = [ + f"Ambiguity detected for '{name}'.", + "Possible interpretations:", + ] + + for i, option in enumerate(options, 1): + lines.append(f" {i}. {option}") + + lines.append("") + lines.append("Which interpretation is intended?") + + return "\n".join(lines) + + +def detect_assumptions( + expression_type: str, + context: Dict[str, Any], +) -> List[Assumption]: + """ + Detect implicit assumptions in an expression. + + This is a heuristic-based detector that identifies common + sources of hidden variables in code. + + Args: + expression_type: Type of expression being analyzed + context: Context dictionary with relevant information + + Returns: + List of detected assumptions + """ + assumptions = [] + + # Type coercion assumptions + if expression_type == "binary_op": + left_type = context.get("left_type") + right_type = context.get("right_type") + if left_type != right_type: + assumptions.append( + Assumption( + name="type_compatibility", + source=f"{left_type} + {right_type}", + context=f"Assuming {left_type} and {right_type} can be combined", + ) + ) + + # Division by zero assumptions + if expression_type == "division": + if not context.get("divisor_checked"): + assumptions.append( + Assumption( + name="non_zero_divisor", + source="division operation", + context="Assuming divisor is not zero", + ) + ) + + # Index bounds assumptions + if expression_type == "index": + if not context.get("bounds_checked"): + assumptions.append( + Assumption( + name="valid_index", + source="index operation", + context="Assuming index is within bounds", + ) + ) + + # Null/undefined assumptions + if expression_type == "member_access": + if not context.get("null_checked"): + assumptions.append( + Assumption( + name="non_null", + source="member access", + context="Assuming object is not null", + ) + ) + + # Function purity assumptions + if expression_type == "function_call": + if context.get("has_side_effects") is None: + assumptions.append( + Assumption( + name="function_behavior", + source=context.get("function_name", "function"), + context="Side effects and return behavior unspecified", + ) + ) + + return assumptions diff --git a/tests/test_clarity.py b/tests/test_clarity.py new file mode 100644 index 0000000..2ca6f87 --- /dev/null +++ b/tests/test_clarity.py @@ -0,0 +1,588 @@ +""" +Tests for EigenScript Communication Clarity Framework. + +The Clarity Framework implements hypothesis-driven communication semantics where: +- Clarity is required before execution (not optional) +- Hidden variables are made explicit +- Intent is treated as falsifiable hypothesis +- Agency remains with the speaker (programmer) + +Key Principles: + 1. Questions and statements both assign objectives + 2. Implication is not universal (hidden variables cause failure) + 3. Clarification is hypothesis testing + 4. Withhold inference intentionally + 5. Control remains with the speaker + 6. Speakers discover their own intent +""" + +import pytest +import textwrap +from eigenscript.lexer import Tokenizer +from eigenscript.parser import Parser +from eigenscript.evaluator import Interpreter +from eigenscript.builtins import decode_vector +from eigenscript.runtime.clarity import ( + ClarityType, + ClarityState, + ClarityTracker, + ClarityExplainer, + Assumption, + detect_assumptions, +) + + +@pytest.fixture +def interpreter(): + """Create a fresh interpreter for each test.""" + return Interpreter() + + +@pytest.fixture +def clarity_tracker(): + """Create a fresh clarity tracker for each test.""" + return ClarityTracker() + + +def run_code(code: str, interpreter): + """Helper to run EigenScript code.""" + code = textwrap.dedent(code).strip() + tokenizer = Tokenizer(code) + tokens = tokenizer.tokenize() + parser = Parser(tokens) + ast = parser.parse() + return interpreter.evaluate(ast) + + +class TestClarityTracker: + """Tests for the ClarityTracker class.""" + + def test_initial_state(self, clarity_tracker): + """Test tracker starts with no bindings.""" + assert len(clarity_tracker.bindings) == 0 + assert clarity_tracker.compute_clarity_score() == 1.0 # No bindings = trivially clear + + def test_register_clarified_binding(self, clarity_tracker): + """Test registering a clarified binding.""" + state = clarity_tracker.register_binding("x", tentative=False) + assert state.clarity_type == ClarityType.CLARIFIED + assert clarity_tracker.is_clarified("x") + assert clarity_tracker.compute_clarity_score() == 1.0 + + def test_register_tentative_binding(self, clarity_tracker): + """Test registering a tentative (hypothesis) binding.""" + state = clarity_tracker.register_binding( + "x", tentative=True, hypothesis="x is a counter" + ) + assert state.clarity_type == ClarityType.TENTATIVE + assert not clarity_tracker.is_clarified("x") + assert state.hypothesis == "x is a counter" + assert clarity_tracker.compute_clarity_score() == 0.0 + + def test_clarify_tentative_binding(self, clarity_tracker): + """Test clarifying a tentative binding.""" + clarity_tracker.register_binding("x", tentative=True) + assert not clarity_tracker.is_clarified("x") + + success = clarity_tracker.clarify("x") + assert success + assert clarity_tracker.is_clarified("x") + assert clarity_tracker.compute_clarity_score() == 1.0 + + def test_clarify_nonexistent_binding(self, clarity_tracker): + """Test clarifying a binding that doesn't exist.""" + success = clarity_tracker.clarify("nonexistent") + assert not success + + def test_multiple_bindings_clarity_score(self, clarity_tracker): + """Test clarity score with multiple bindings.""" + clarity_tracker.register_binding("a", tentative=False) + clarity_tracker.register_binding("b", tentative=True) + clarity_tracker.register_binding("c", tentative=False) + + # 2 out of 3 clarified = 0.666... + score = clarity_tracker.compute_clarity_score() + assert abs(score - 2 / 3) < 0.01 + + def test_get_unresolved_bindings(self, clarity_tracker): + """Test getting list of unresolved bindings.""" + clarity_tracker.register_binding("a", tentative=False) + clarity_tracker.register_binding("b", tentative=True) + clarity_tracker.register_binding("c", tentative=True) + + unresolved = clarity_tracker.get_unresolved_bindings() + assert "b" in unresolved + assert "c" in unresolved + assert "a" not in unresolved + + def test_register_assumption(self, clarity_tracker): + """Test registering an assumption (hidden variable).""" + clarity_tracker.register_binding("x") + assumption = clarity_tracker.register_assumption( + name="non_zero", + source="division operation", + context="Assuming divisor is not zero", + binding="x", + ) + + assert assumption.name == "non_zero" + assert len(clarity_tracker.get_assumptions("x")) == 1 + state = clarity_tracker.get_clarity("x") + assert state.clarity_type == ClarityType.AMBIGUOUS + + def test_global_assumptions(self, clarity_tracker): + """Test global assumptions not tied to a binding.""" + assumption = clarity_tracker.register_assumption( + name="environment", + source="runtime", + context="Assuming production environment", + ) + + global_assumptions = clarity_tracker.get_assumptions() + assert len(global_assumptions) == 1 + assert global_assumptions[0].name == "environment" + + +class TestClarityPredicates: + """Tests for clarity predicates in the interpreter.""" + + def test_clarified_predicate_all_clear(self, interpreter): + """Test CLARIFIED returns 1.0 when all bindings are clarified.""" + code = """ + x is 10 + y is 20 + clarified + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1.0 + + def test_clarified_predicate_with_tentative(self, interpreter): + """Test CLARIFIED returns 0.0 when tentative bindings exist.""" + code = """ + x is 10 + y might is 20 + clarified + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 0.0 + + def test_clarified_after_clarify(self, interpreter): + """Test CLARIFIED returns 1.0 after clarifying tentative binding.""" + code = """ + x might is 10 + clarify of x + clarified + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1.0 + + def test_ambiguous_predicate_with_tentative(self, interpreter): + """Test AMBIGUOUS returns 1.0 when tentative bindings exist.""" + code = """ + x might is 10 + ambiguous + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1.0 + + def test_ambiguous_predicate_all_clear(self, interpreter): + """Test AMBIGUOUS returns 0.0 when all bindings are clarified.""" + code = """ + x is 10 + ambiguous + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 0.0 + + def test_explicit_predicate(self, interpreter): + """Test EXPLICIT predicate (opposite of ambiguous).""" + code = """ + x is 10 + y is 20 + explicit + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1.0 + + def test_clarity_score_numeric(self, interpreter): + """Test CLARITY returns numeric score.""" + code = """ + x is 10 + y might is 20 + z is 30 + clarity + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + # 2 out of 3 clarified = 0.666... + assert abs(value - 2 / 3) < 0.01 + + def test_cs_alias_for_clarity(self, interpreter): + """Test CS is alias for CLARITY.""" + code1 = """ + x is 10 + clarity + """ + code2 = """ + x is 10 + cs + """ + interpreter1 = Interpreter() + interpreter2 = Interpreter() + + result1 = run_code(code1, interpreter1) + result2 = run_code(code2, interpreter2) + + value1 = decode_vector(result1, interpreter1.space) + value2 = decode_vector(result2, interpreter2.space) + assert value1 == value2 + + +class TestTentativeAssignment: + """Tests for MIGHT IS (tentative assignment) syntax.""" + + def test_tentative_assignment_creates_binding(self, interpreter): + """Test MIGHT IS creates a binding with the value.""" + code = """ + x might is 42 + x + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 42 + + def test_tentative_assignment_marks_as_tentative(self, interpreter): + """Test MIGHT IS marks binding as tentative.""" + code = """ + x might is 42 + clarified + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 0.0 # Not clarified because x is tentative + + def test_clarify_tentative_assignment(self, interpreter): + """Test clarifying a tentative assignment.""" + code = """ + x might is 42 + clarify of x + clarified + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1.0 # Now clarified + + def test_multiple_tentative_assignments(self, interpreter): + """Test multiple tentative assignments.""" + code = """ + a might is 1 + b might is 2 + c is 3 + clarity + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + # 1 out of 3 clarified = 0.333... + assert abs(value - 1 / 3) < 0.01 + + +class TestClarifyOperator: + """Tests for CLARIFY OF operator.""" + + def test_clarify_returns_success(self, interpreter): + """Test CLARIFY OF returns 1.0 on success.""" + code = """ + x might is 10 + clarify of x + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1.0 + + def test_clarify_nonexistent_returns_failure(self, interpreter): + """Test CLARIFY OF returns 0.0 for nonexistent binding.""" + code = """ + clarify of nonexistent + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 0.0 + + def test_clarify_already_clarified(self, interpreter): + """Test CLARIFY OF on already clarified binding.""" + code = """ + x is 10 + clarify of x + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + # Should still succeed (idempotent) + assert value == 1.0 + + +class TestAssumesInterrogative: + """Tests for ASSUMES interrogative.""" + + def test_assumes_empty_for_clarified(self, interpreter): + """Test ASSUMES returns empty string for clarified binding.""" + code = """ + x is 10 + assumes is x + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == "" + + def test_assumes_on_tentative(self, interpreter): + """Test ASSUMES on a tentative binding (no explicit assumptions).""" + code = """ + x might is 10 + assumes is x + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + # Tentative but no explicit assumptions registered + assert value == "" + + +class TestClarityInConditionals: + """Tests for using clarity predicates in conditionals.""" + + def test_if_clarified(self, interpreter): + """Test using CLARIFIED in a conditional.""" + code = """ + x is 10 + result is 0 + if clarified: + result is 1 + result + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1 + + def test_if_not_clarified(self, interpreter): + """Test using NOT CLARIFIED in a conditional.""" + code = """ + x might is 10 + result is 0 + if not clarified: + result is 1 + result + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1 + + def test_clarify_before_critical_operation(self, interpreter): + """Test pattern: clarify before critical operation.""" + code = """ + config might is "strict" + result is 0 + + if not clarified: + clarify of config + + if clarified: + result is 1 + result + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1 + + +class TestClarityInLoops: + """Tests for using clarity predicates in loops.""" + + def test_loop_while_not_clarified(self, interpreter): + """Test looping until all bindings are clarified.""" + code = """ + a might is 1 + b might is 2 + + clarify of a + clarify of b + + clarified + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1.0 + + def test_clarity_changes_during_loop(self, interpreter): + """Test clarity score changes as bindings are clarified.""" + code = """ + a might is 1 + b might is 2 + c is 3 + + before is clarity + clarify of a + after is clarity + + after > before + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1.0 # clarity increased + + +class TestClarityIntegration: + """Integration tests combining clarity with other features.""" + + def test_clarity_with_framework_strength(self, interpreter): + """Test using both clarity and framework strength.""" + code = """ + x is 1 + x is 2 + x is 3 + x is 4 + x is 5 + + cs_value is cs + fs_value is fs + + cs_value + fs_value + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + # Both should be between 0 and 1, so sum between 0 and 2 + assert 0 <= value <= 2 + + def test_clarity_with_interrogatives(self, interpreter): + """Test using clarity with other interrogatives.""" + code = """ + x is 10 + value is what is x + quality is how is x + clarity_score is cs + + value + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 10 + + def test_mixed_clarity_and_convergence(self, interpreter): + """Test combining clarity predicates with convergence predicates.""" + code = """ + x is 10 + + clarity_ok is clarified + conv_check is converged + + clarity_ok + conv_check + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + # Sum of two booleans (0 or 1 each) + assert 0 <= value <= 2 + + +class TestClarityExplainer: + """Tests for the ClarityExplainer class.""" + + def test_explainer_disabled_by_default(self): + """Test explainer is disabled by default.""" + explainer = ClarityExplainer(enabled=False) + # Should not raise or print anything + explainer.explain_clarified( + result=True, + clarity_score=1.0, + threshold=0.95, + unresolved=[], + ) + + def test_explainer_enabled(self, capsys): + """Test explainer outputs when enabled.""" + explainer = ClarityExplainer(enabled=True, use_color=False) + explainer.explain_clarified( + result=False, + clarity_score=0.5, + threshold=0.95, + unresolved=["x", "y"], + ) + + captured = capsys.readouterr() + assert "clarified" in captured.err.lower() + assert "0.5" in captured.err + + +class TestAssumptionDetection: + """Tests for automatic assumption detection.""" + + def test_detect_type_compatibility_assumption(self): + """Test detecting type compatibility assumption.""" + assumptions = detect_assumptions( + "binary_op", + {"left_type": "number", "right_type": "string"}, + ) + assert len(assumptions) == 1 + assert assumptions[0].name == "type_compatibility" + + def test_detect_division_by_zero_assumption(self): + """Test detecting division by zero assumption.""" + assumptions = detect_assumptions( + "division", + {"divisor_checked": False}, + ) + assert len(assumptions) == 1 + assert assumptions[0].name == "non_zero_divisor" + + def test_detect_index_bounds_assumption(self): + """Test detecting index bounds assumption.""" + assumptions = detect_assumptions( + "index", + {"bounds_checked": False}, + ) + assert len(assumptions) == 1 + assert assumptions[0].name == "valid_index" + + def test_no_assumption_when_checked(self): + """Test no assumption when precondition is checked.""" + assumptions = detect_assumptions( + "division", + {"divisor_checked": True}, + ) + assert len(assumptions) == 0 + + +class TestClarityState: + """Tests for the ClarityState dataclass.""" + + def test_default_state_is_ambiguous(self): + """Test default state is ambiguous.""" + state = ClarityState() + assert state.clarity_type == ClarityType.AMBIGUOUS + assert state.is_ambiguous() + assert not state.is_clear() + + def test_clarified_state(self): + """Test clarified state.""" + state = ClarityState(clarity_type=ClarityType.CLARIFIED) + assert state.is_clear() + assert not state.is_ambiguous() + assert not state.is_tentative() + + def test_tentative_state(self): + """Test tentative state.""" + state = ClarityState(clarity_type=ClarityType.TENTATIVE) + assert state.is_tentative() + assert not state.is_clear() + + def test_state_with_assumptions(self): + """Test state with assumptions.""" + assumption = Assumption( + name="test", + source="test_source", + context="test context", + ) + state = ClarityState(assumptions=[assumption]) + assert state.has_assumptions() From c10398cb04e4489bef076d9c30508ee30dd9e0ad Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 18 Dec 2025 23:04:55 +0000 Subject: [PATCH 2/3] Add Active Listener/Speaker mode for interactive clarification Extends the Communication Clarity Framework with active participation in the clarification process. EigenScript can now detect ambiguity and seek clarification before proceeding with potentially risky operations. New features: - ActiveListener: Detects ambiguity in division, indexing, type coercion, and null access operations - DialogueManager: Agency-preserving dialogue for explanations and confirmations - InteractiveClarifier: Combines listener and speaker for interactive clarification workflows - interactive_mode and active_listening interpreter options - Agency-preserving error messages that present options, not dictates Key design principle: "Control remains with the speaker" - EigenScript presents possibilities rather than assuming intent. Includes 31 new tests for active listener features. --- docs/api/predicates.md | 93 ++++ src/eigenscript/evaluator/interpreter.py | 222 +++++++- src/eigenscript/runtime/__init__.py | 6 + src/eigenscript/runtime/clarity.py | 633 +++++++++++++++++++++++ tests/test_clarity.py | 334 ++++++++++++ 5 files changed, 1287 insertions(+), 1 deletion(-) diff --git a/docs/api/predicates.md b/docs/api/predicates.md index 44db824..a801b21 100644 --- a/docs/api/predicates.md +++ b/docs/api/predicates.md @@ -794,6 +794,99 @@ These enable self-debugging code that refuses to execute on unverified assumptio --- +## Active Listener Mode + +EigenScript can act as an **active listener** that detects ambiguity and seeks clarification before proceeding. This implements the communication framework principle: "You treat inferred intent as a hypothesis, not a fact." + +### Enabling Active Listening + +```python +from eigenscript.evaluator import Interpreter + +# Enable active listening mode +interp = Interpreter( + active_listening=True, # Detect ambiguity in operations + interactive_mode=True, # Prompt for clarification +) +``` + +### What Active Listening Detects + +| Operation | Detected Ambiguity | Options Presented | +|-----------|-------------------|-------------------| +| Division | Potential divide by zero | Proceed, Add check, Return default | +| Indexing | Potential out of bounds | Proceed, Add check, Clamp, Wrap | +| Type coercion | Type mismatch | Strict, Lenient, Explicit | +| Null access | Potential null value | Proceed, Add check, Return default, Optional chain | + +### Example: Interactive Division + +```eigenscript +x is 10 +y is get_input of "Enter divisor: " # User enters 0 + +result is x / y +# In interactive mode: +# > Clarification needed: +# > The divisor 'y' might be zero. +# > 1. Proceed (assuming y ≠ 0) +# > 2. Add explicit check +# > 3. Return default value if zero +# > Which approach? _ +``` + +### Example: Interactive Index Access + +```eigenscript +items is [1, 2, 3] +i is get_input of "Enter index: " # User enters 5 + +value is items[i] +# In interactive mode: +# > Clarification needed: +# > The index 'i' might be out of bounds for 'items'. +# > 1. Proceed (assuming valid index) +# > 2. Add bounds check +# > 3. Clamp to valid range +# > 4. Wrap around (modulo length) +# > Which approach? _ +``` + +### Non-Interactive Mode + +When `interactive_mode=False` but `active_listening=True`, ambiguities are detected and logged but the default option (proceed) is chosen. Use `summarize_pending()` to see what clarifications were needed: + +```python +interp = Interpreter(active_listening=True, interactive_mode=False) +result = interp.run(code) + +# Check what clarifications were pending +summary = interp.interactive_clarifier.summarize_pending() +print(summary) +# Pending clarifications (2): +# • The divisor 'x' might be zero. +# • The index 'i' might be out of bounds for 'items'. +``` + +--- + +## Agency-Preserving Errors + +When errors occur, EigenScript presents options rather than dictating what went wrong: + +``` +Variable 'foo' not found in scope. +Possible intentions: + • Did you mean 'food'? + • Did you mean 'foot'? + • Define it first with: foo is + • Or use 'might is' for tentative binding +``` + +This preserves the programmer's agency by presenting possibilities rather than assuming intent. + +--- + **Congratulations!** You've explored the complete EigenScript API. **Return to:** [API Index](index.md) diff --git a/src/eigenscript/evaluator/interpreter.py b/src/eigenscript/evaluator/interpreter.py index 68fea08..0fa2d41 100644 --- a/src/eigenscript/evaluator/interpreter.py +++ b/src/eigenscript/evaluator/interpreter.py @@ -7,7 +7,7 @@ import numpy as np import os -from typing import Dict, Optional, List, Union, Callable +from typing import Dict, Optional, List, Union, Callable, Any from dataclasses import dataclass from eigenscript.parser.ast_builder import ( ASTNode, @@ -44,6 +44,9 @@ ClarityType, Assumption, detect_assumptions, + ActiveListener, + DialogueManager, + InteractiveClarifier, ) from eigenscript.builtins import BuiltinFunction, get_builtins @@ -260,6 +263,8 @@ def __init__( enable_convergence_detection: bool = True, explain_mode: bool = False, clarity_threshold: float = 0.95, + interactive_mode: bool = False, + active_listening: bool = False, ): """ Initialize the interpreter. @@ -272,6 +277,8 @@ def __init__( enable_convergence_detection: Enable automatic convergence detection (default: True) explain_mode: Enable explain mode for predicate evaluations (default: False) clarity_threshold: Clarity score threshold for the `clarified` predicate (default: 0.95) + interactive_mode: Enable interactive clarification prompts (default: False) + active_listening: Enable automatic ambiguity detection in operations (default: False) """ # Geometric components self.space = LRVMSpace(dimension=dimension) @@ -303,6 +310,18 @@ def __init__( self.clarity_explainer = ClarityExplainer(enabled=explain_mode) self.clarity_threshold = clarity_threshold + # Interactive clarification mode (Active Listener/Speaker) + self.interactive_mode = interactive_mode + self.active_listening = active_listening + self.interactive_clarifier = InteractiveClarifier( + interactive=interactive_mode, + verbose=explain_mode, + ) if interactive_mode or active_listening else None + self.active_listener = ActiveListener( + interactive=interactive_mode, + strict=False, + ) if active_listening else None + # Special lightlike OF vector self._of_vector = self._create_of_vector() @@ -532,6 +551,45 @@ def _eval_relation(self, node: Relation) -> LRVMVector: if isinstance(node.left, Identifier) and node.left.name.lower() == "clarify": if isinstance(node.right, Identifier): binding_name = node.right.name + + # Interactive clarification mode + if self.interactive_mode and self.interactive_clarifier: + # Get the current value for display + try: + current_value = self.environment.lookup(binding_name) + # Get clarity state if available + clarity_state = self.clarity_tracker.get_clarity(binding_name) + + if clarity_state and clarity_state.hypothesis: + # Show the hypothesis and ask for confirmation + confirmed = self.interactive_clarifier.dialogue.confirm_understanding( + statement=f"{binding_name} might is {current_value}", + understood_as=clarity_state.hypothesis, + ) + if not confirmed: + # User didn't confirm - ask what they meant + result = self.interactive_clarifier.clarify_binding( + name=binding_name, + value=current_value, + possible_meanings=[ + f"Keep as '{current_value}'", + "Change the value", + "Remove this binding", + ], + ) + if result.get("clarified"): + self.interactive_clarifier.dialogue.acknowledge_clarification( + binding_name, result.get("meaning", str(current_value)) + ) + else: + # No hypothesis - just acknowledge + self.interactive_clarifier.dialogue.acknowledge_clarification( + binding_name, f"value = {current_value}" + ) + except NameError: + # Binding doesn't exist yet - that's ok + pass + success = self.clarity_tracker.clarify(binding_name) return self.space.embed_scalar(1.0 if success else 0.0) else: @@ -665,6 +723,34 @@ def _eval_binary_op(self, node: "BinaryOp") -> LRVMVector: # Project through inverse scaling assert isinstance(left, LRVMVector) and isinstance(right, LRVMVector) scalar = right.coords[0] + + # Active listening: detect potential division by zero + if self.active_listener and abs(scalar) < 1e-10: + # Ambiguity detected - seek clarification + divisor_name = ( + node.right.name if isinstance(node.right, Identifier) else "divisor" + ) + result = self.interactive_clarifier.clarify_operation( + "division", + { + "divisor": scalar, + "divisor_name": divisor_name, + "proven_non_zero": False, + }, + ) + strategy = result.get("strategy", "proceed") + + if strategy == "default": + # Return a default value (zero) instead of erroring + return self.space.zero_vector() + elif strategy == "check": + # User wants explicit check - raise but with helpful message + raise RuntimeError( + f"Division by zero: '{divisor_name}' is zero.\n" + f"Consider adding: if {divisor_name} != 0: ..." + ) + # strategy == "proceed" falls through to normal error + if abs(scalar) < 1e-10: raise RuntimeError("Division by zero (equilibrium singularity)") return left.scale(1.0 / scalar) @@ -1132,6 +1218,46 @@ def _eval_index(self, node: Index) -> Value: if isinstance(indexed_value, EigenList): # Check bounds if index < 0 or index >= len(indexed_value.elements): + # Active listening: detect out-of-bounds access + if self.active_listener: + index_name = ( + node.index_expr.name + if isinstance(node.index_expr, Identifier) + else str(index) + ) + list_name = ( + node.list_expr.name + if isinstance(node.list_expr, Identifier) + else "list" + ) + result = self.interactive_clarifier.clarify_operation( + "index", + { + "index": index, + "index_name": index_name, + "list_name": list_name, + "list_length": len(indexed_value.elements), + "proven_in_bounds": False, + }, + ) + strategy = result.get("strategy", "proceed") + + if strategy == "clamp": + # Clamp to valid range + clamped = max(0, min(index, len(indexed_value.elements) - 1)) + return indexed_value.elements[clamped] + elif strategy == "wrap": + # Wrap around using modulo + wrapped = index % len(indexed_value.elements) + return indexed_value.elements[wrapped] + elif strategy == "check": + # User wants explicit check - raise with helpful message + raise IndexError( + f"List index {index} out of range (list has {len(indexed_value.elements)} elements).\n" + f"Consider adding: if {index_name} < len of {list_name}: ..." + ) + # strategy == "proceed" falls through to normal error + raise IndexError( f"List index {index} out of range (list has {len(indexed_value.elements)} elements)" ) @@ -1884,6 +2010,100 @@ def _is_of_vector(self, vector: LRVMVector) -> bool: """ return self.metric.is_lightlike(vector) + def _check_type_coercion( + self, + left: LRVMVector, + right: LRVMVector, + operation: str, + ) -> Optional[str]: + """ + Check if type coercion is needed and handle via active listening. + + Returns the coercion strategy or None if no coercion needed. + + Args: + left: Left operand + right: Right operand + operation: The operation being performed + + Returns: + Strategy string or None + """ + if not self.active_listener: + return None + + # Detect type mismatch by checking metadata + left_type = "string" if left.metadata.get("string_value") else "numeric" + right_type = "string" if right.metadata.get("string_value") else "numeric" + + if left_type != right_type: + result = self.interactive_clarifier.clarify_operation( + "coercion", + { + "from_type": right_type, + "to_type": left_type, + "operation": operation, + }, + ) + return result.get("strategy", "lenient") + + return None + + def _format_agency_error( + self, + error_type: str, + context: Dict[str, Any], + ) -> str: + """ + Format an error message that preserves user agency. + + Instead of dictating what went wrong, presents the situation + and possible interpretations. + + Args: + error_type: Type of error + context: Context about the error + + Returns: + Formatted error message + """ + if error_type == "undefined_variable": + name = context.get("name", "unknown") + suggestions = context.get("suggestions", []) + msg = f"Variable '{name}' not found in scope.\n" + if suggestions: + msg += "Possible intentions:\n" + for s in suggestions[:3]: + msg += f" • Did you mean '{s}'?\n" + msg += " • Define it first with: {name} is \n" + msg += " • Or use 'might is' for tentative binding" + return msg + + elif error_type == "type_mismatch": + expected = context.get("expected", "unknown") + got = context.get("got", "unknown") + operation = context.get("operation", "operation") + msg = f"Type question for {operation}:\n" + msg += f" Got: {got}\n" + msg += f" Expected: {expected}\n" + msg += "Possible approaches:\n" + msg += f" • Convert {got} to {expected}\n" + msg += f" • Use a different operation for {got}\n" + msg += " • Check if this is the intended value" + return msg + + elif error_type == "function_not_found": + name = context.get("name", "unknown") + msg = f"Function '{name}' not found.\n" + msg += "Possible intentions:\n" + msg += f" • Define it first with: define {name} as: ...\n" + msg += " • Import from a module: from import {name}\n" + msg += " • Check spelling" + return msg + + else: + return f"Error: {error_type} - {context}" + def get_framework_strength(self) -> float: """ Get current Framework Strength. diff --git a/src/eigenscript/runtime/__init__.py b/src/eigenscript/runtime/__init__.py index 47c2a85..70ba449 100644 --- a/src/eigenscript/runtime/__init__.py +++ b/src/eigenscript/runtime/__init__.py @@ -17,6 +17,9 @@ AmbiguityResolver, Assumption, detect_assumptions, + ActiveListener, + DialogueManager, + InteractiveClarifier, ) __all__ = [ @@ -28,4 +31,7 @@ "AmbiguityResolver", "Assumption", "detect_assumptions", + "ActiveListener", + "DialogueManager", + "InteractiveClarifier", ] diff --git a/src/eigenscript/runtime/clarity.py b/src/eigenscript/runtime/clarity.py index 92132e9..9fb1fd7 100644 --- a/src/eigenscript/runtime/clarity.py +++ b/src/eigenscript/runtime/clarity.py @@ -688,3 +688,636 @@ def detect_assumptions( ) return assumptions + + +class ActiveListener: + """ + Makes EigenScript an active listener that detects ambiguity and seeks clarification. + + The ActiveListener embodies the communication framework principle: + "You treat inferred intent as a hypothesis, not a fact." + + Instead of silently proceeding on assumptions, the ActiveListener: + 1. Detects when multiple interpretations are possible + 2. Pauses execution to seek clarification + 3. Presents options without deciding for the speaker + 4. Records the clarified intent for future reference + + Example: + listener = ActiveListener(interactive=True) + + # When division is attempted: + # > Clarification needed for division operation: + # > The divisor 'x' could be zero. + # > 1. Proceed (assuming x ≠ 0) + # > 2. Add explicit check + # > 3. Provide default value + # > Which approach? _ + """ + + def __init__( + self, + interactive: bool = False, + strict: bool = False, + use_color: bool = True, + ): + """ + Initialize the ActiveListener. + + Args: + interactive: If True, prompt user for clarification + strict: If True, refuse to proceed on any ambiguity + use_color: Whether to use ANSI color codes + """ + self.interactive = interactive + self.strict = strict + self.use_color = use_color and self._supports_color() + self.clarification_log: List[Dict[str, Any]] = [] + self.pending_clarifications: List[Dict[str, Any]] = [] + + def _supports_color(self) -> bool: + """Check if the terminal supports color output.""" + import os + + if not hasattr(sys.stdout, "isatty") or not sys.stdout.isatty(): + return False + if os.environ.get("NO_COLOR"): + return False + return True + + def _color(self, text: str, color: str) -> str: + """Apply ANSI color to text if colors are enabled.""" + if not self.use_color: + return text + + colors = { + "green": "\033[32m", + "red": "\033[31m", + "yellow": "\033[33m", + "cyan": "\033[36m", + "magenta": "\033[35m", + "bold": "\033[1m", + "dim": "\033[2m", + "reset": "\033[0m", + } + + return f"{colors.get(color, '')}{text}{colors['reset']}" + + def detect_ambiguity( + self, + operation: str, + context: Dict[str, Any], + ) -> Optional[Dict[str, Any]]: + """ + Detect if an operation has ambiguous intent. + + Args: + operation: Type of operation (e.g., "division", "index", "coercion") + context: Context about the operation + + Returns: + Ambiguity info dict if ambiguous, None otherwise + """ + ambiguity = None + + if operation == "division": + divisor = context.get("divisor") + divisor_name = context.get("divisor_name", "divisor") + # Can't statically prove non-zero + if not context.get("proven_non_zero"): + ambiguity = { + "type": "division_safety", + "message": f"The divisor '{divisor_name}' might be zero.", + "options": [ + ("proceed", f"Proceed (assuming {divisor_name} ≠ 0)"), + ("check", "Add explicit zero check"), + ("default", "Return default value if zero"), + ], + "context": context, + } + + elif operation == "index": + index_name = context.get("index_name", "index") + list_name = context.get("list_name", "list") + if not context.get("proven_in_bounds"): + ambiguity = { + "type": "index_safety", + "message": f"The index '{index_name}' might be out of bounds for '{list_name}'.", + "options": [ + ("proceed", "Proceed (assuming valid index)"), + ("check", "Add bounds check"), + ("clamp", "Clamp to valid range"), + ("wrap", "Wrap around (modulo length)"), + ], + "context": context, + } + + elif operation == "coercion": + from_type = context.get("from_type", "unknown") + to_type = context.get("to_type", "unknown") + ambiguity = { + "type": "type_coercion", + "message": f"Converting {from_type} to {to_type} - intent unclear.", + "options": [ + ("strict", f"Strict conversion (fail if not exactly {to_type})"), + ("lenient", "Lenient conversion (best effort)"), + ("explicit", "Require explicit conversion"), + ], + "context": context, + } + + elif operation == "null_access": + name = context.get("name", "value") + ambiguity = { + "type": "null_safety", + "message": f"'{name}' might be null/undefined.", + "options": [ + ("proceed", f"Proceed (assuming {name} is defined)"), + ("check", "Add null check"), + ("default", "Use default value if null"), + ("optional", "Return null (optional chaining)"), + ], + "context": context, + } + + elif operation == "intent": + name = context.get("name", "value") + possible_meanings = context.get("meanings", []) + if possible_meanings: + options = [(f"meaning_{i}", m) for i, m in enumerate(possible_meanings)] + options.append(("other", "Something else (please specify)")) + ambiguity = { + "type": "semantic_ambiguity", + "message": f"Multiple interpretations possible for '{name}'.", + "options": options, + "context": context, + } + + return ambiguity + + def request_clarification( + self, + ambiguity: Dict[str, Any], + ) -> str: + """ + Request clarification from the user. + + In interactive mode, prompts and waits for input. + In non-interactive mode, logs and returns default. + + Args: + ambiguity: Ambiguity info dict from detect_ambiguity + + Returns: + The chosen option key + """ + message = ambiguity["message"] + options = ambiguity["options"] + + if self.interactive: + # Print the clarification request + print(f"\n{self._color('⚡ Clarification needed:', 'yellow')}", file=sys.stderr) + print(f" {message}", file=sys.stderr) + print(file=sys.stderr) + + for i, (key, desc) in enumerate(options, 1): + print(f" {self._color(str(i), 'cyan')}. {desc}", file=sys.stderr) + + print(file=sys.stderr) + + # Get user input + while True: + try: + choice = input(f" {self._color('Which approach?', 'bold')} [1-{len(options)}]: ") + choice_idx = int(choice) - 1 + if 0 <= choice_idx < len(options): + chosen_key = options[choice_idx][0] + self._log_clarification(ambiguity, chosen_key) + return chosen_key + print(f" Please enter a number between 1 and {len(options)}", file=sys.stderr) + except ValueError: + print(f" Please enter a number between 1 and {len(options)}", file=sys.stderr) + except EOFError: + # Non-interactive fallback + return options[0][0] + else: + # Non-interactive: log and return first option (default) + self.pending_clarifications.append(ambiguity) + return options[0][0] + + def _log_clarification( + self, + ambiguity: Dict[str, Any], + choice: str, + ) -> None: + """Log a clarification for future reference.""" + self.clarification_log.append({ + "ambiguity_type": ambiguity["type"], + "message": ambiguity["message"], + "choice": choice, + "context": ambiguity.get("context", {}), + }) + + def get_pending_clarifications(self) -> List[Dict[str, Any]]: + """Get list of pending clarifications that weren't resolved interactively.""" + return self.pending_clarifications.copy() + + def clear_pending(self) -> None: + """Clear pending clarifications.""" + self.pending_clarifications.clear() + + +class DialogueManager: + """ + Manages the dialogue between EigenScript and the user. + + Implements the "active speaker" pattern where EigenScript: + 1. Explains its reasoning when asked + 2. Presents options clearly + 3. Preserves user agency + 4. Never assumes understanding + + From the communication framework: + "Control remains with the speaker - you do not decide intent for them." + + Example: + dm = DialogueManager() + + # When explaining a decision: + dm.explain_reasoning( + decision="Using strict mode for parsing", + because="The 'mode' binding was clarified as 'strict'", + alternatives=["lenient", "auto-detect"] + ) + # Output: + # I'm using strict mode for parsing. + # Reason: The 'mode' binding was clarified as 'strict'. + # Alternatives considered: lenient, auto-detect + """ + + def __init__(self, use_color: bool = True, verbose: bool = False): + """ + Initialize the DialogueManager. + + Args: + use_color: Whether to use ANSI color codes + verbose: Whether to provide detailed explanations + """ + self.use_color = use_color and self._supports_color() + self.verbose = verbose + self.dialogue_history: List[Dict[str, Any]] = [] + + def _supports_color(self) -> bool: + """Check if the terminal supports color output.""" + import os + + if not hasattr(sys.stdout, "isatty") or not sys.stdout.isatty(): + return False + if os.environ.get("NO_COLOR"): + return False + return True + + def _color(self, text: str, color: str) -> str: + """Apply ANSI color to text if colors are enabled.""" + if not self.use_color: + return text + + colors = { + "green": "\033[32m", + "red": "\033[31m", + "yellow": "\033[33m", + "cyan": "\033[36m", + "magenta": "\033[35m", + "bold": "\033[1m", + "dim": "\033[2m", + "reset": "\033[0m", + } + + return f"{colors.get(color, '')}{text}{colors['reset']}" + + def ask_for_intent( + self, + what: str, + options: List[Tuple[str, str]], + context: Optional[str] = None, + ) -> str: + """ + Ask the user to clarify their intent. + + Agency-preserving: presents options without judgment. + + Args: + what: What needs clarification + options: List of (key, description) tuples + context: Optional context explaining why clarification is needed + + Returns: + The chosen option key + """ + print(f"\n{self._color('?', 'cyan')} {what}", file=sys.stderr) + + if context: + print(f" {self._color('Context:', 'dim')} {context}", file=sys.stderr) + + print(file=sys.stderr) + for i, (key, desc) in enumerate(options, 1): + print(f" {self._color(str(i), 'cyan')}. {desc}", file=sys.stderr) + + print(file=sys.stderr) + + while True: + try: + choice = input(f" {self._color('Your choice', 'bold')} [1-{len(options)}]: ") + choice_idx = int(choice) - 1 + if 0 <= choice_idx < len(options): + chosen_key = options[choice_idx][0] + self._log_dialogue("intent_request", what, chosen_key) + return chosen_key + print(f" Please enter a number between 1 and {len(options)}", file=sys.stderr) + except ValueError: + print(f" Please enter a number between 1 and {len(options)}", file=sys.stderr) + except EOFError: + return options[0][0] + + def explain_reasoning( + self, + decision: str, + because: str, + alternatives: Optional[List[str]] = None, + ) -> None: + """ + Explain the reasoning behind a decision. + + Makes the interpreter's "thinking" visible. + + Args: + decision: What was decided + because: Why it was decided + alternatives: Other options that were considered + """ + if not self.verbose: + return + + print(f"\n{self._color('→', 'green')} {decision}", file=sys.stderr) + print(f" {self._color('Because:', 'dim')} {because}", file=sys.stderr) + + if alternatives: + alts = ", ".join(alternatives) + print(f" {self._color('Alternatives:', 'dim')} {alts}", file=sys.stderr) + + self._log_dialogue("explanation", decision, because) + + def confirm_understanding( + self, + statement: str, + understood_as: str, + ) -> bool: + """ + Confirm understanding of a statement. + + From the framework: "You test [inferred intent] by offering explicit options." + + Args: + statement: The original statement + understood_as: How it was interpreted + + Returns: + True if understanding confirmed, False otherwise + """ + print(f"\n{self._color('?', 'yellow')} Confirming understanding:", file=sys.stderr) + print(f" You said: {self._color(statement, 'cyan')}", file=sys.stderr) + print(f" I understood: {self._color(understood_as, 'green')}", file=sys.stderr) + print(file=sys.stderr) + + try: + response = input(f" {self._color('Is this correct?', 'bold')} [Y/n]: ").strip().lower() + confirmed = response in ("", "y", "yes") + self._log_dialogue("confirmation", statement, confirmed) + return confirmed + except EOFError: + return True + + def present_options( + self, + situation: str, + options: List[Tuple[str, str, str]], # (key, short_desc, long_desc) + ) -> None: + """ + Present options without asking for a choice (informational). + + Preserves agency by showing possibilities without forcing a decision. + + Args: + situation: The current situation + options: List of (key, short_desc, long_desc) tuples + """ + print(f"\n{self._color('ℹ', 'cyan')} {situation}", file=sys.stderr) + print(f" Possible approaches:", file=sys.stderr) + print(file=sys.stderr) + + for key, short_desc, long_desc in options: + print(f" • {self._color(short_desc, 'bold')}", file=sys.stderr) + if self.verbose and long_desc: + print(f" {self._color(long_desc, 'dim')}", file=sys.stderr) + + self._log_dialogue("options_presented", situation, [o[0] for o in options]) + + def acknowledge_clarification( + self, + what: str, + clarified_as: str, + ) -> None: + """ + Acknowledge a clarification from the user. + + Shows the user their clarification was received. + + Args: + what: What was clarified + clarified_as: The clarified value/intent + """ + print( + f"{self._color('✓', 'green')} Understood: {what} → {self._color(clarified_as, 'cyan')}", + file=sys.stderr, + ) + self._log_dialogue("acknowledgment", what, clarified_as) + + def _log_dialogue( + self, + dialogue_type: str, + subject: str, + response: Any, + ) -> None: + """Log a dialogue exchange.""" + self.dialogue_history.append({ + "type": dialogue_type, + "subject": subject, + "response": response, + }) + + def get_dialogue_history(self) -> List[Dict[str, Any]]: + """Get the full dialogue history.""" + return self.dialogue_history.copy() + + +class InteractiveClarifier: + """ + Combines ActiveListener and DialogueManager for interactive clarification. + + This is the main interface for making EigenScript an active participant + in the communication process. + + Example: + clarifier = InteractiveClarifier(interactive=True) + + # During execution, when ambiguity is detected: + result = clarifier.clarify_binding( + name="mode", + value="strict", + possible_meanings=[ + "Strict type checking", + "Strict parsing rules", + "Strict security settings", + ] + ) + # Prompts user and returns clarified meaning + """ + + def __init__( + self, + interactive: bool = False, + verbose: bool = False, + use_color: bool = True, + ): + """ + Initialize the InteractiveClarifier. + + Args: + interactive: If True, prompt user for clarification + verbose: If True, explain reasoning + use_color: Whether to use ANSI color codes + """ + self.interactive = interactive + self.listener = ActiveListener(interactive=interactive, use_color=use_color) + self.dialogue = DialogueManager(use_color=use_color, verbose=verbose) + + def clarify_binding( + self, + name: str, + value: Any, + possible_meanings: Optional[List[str]] = None, + ) -> Dict[str, Any]: + """ + Clarify the meaning/intent of a binding. + + Args: + name: Binding name + value: Current value + possible_meanings: List of possible interpretations + + Returns: + Dict with clarified meaning and confidence + """ + if not possible_meanings: + # No ambiguity if no alternatives + return {"meaning": str(value), "confidence": 1.0, "clarified": True} + + if self.interactive: + options = [(f"m{i}", m) for i, m in enumerate(possible_meanings)] + options.append(("other", "None of these / something else")) + + chosen = self.dialogue.ask_for_intent( + what=f"What does '{name}' = {value} mean?", + options=options, + context="Multiple interpretations are possible", + ) + + if chosen == "other": + # Ask for custom meaning + try: + custom = input(" Please specify: ") + self.dialogue.acknowledge_clarification(name, custom) + return {"meaning": custom, "confidence": 1.0, "clarified": True} + except EOFError: + return {"meaning": str(value), "confidence": 0.5, "clarified": False} + + idx = int(chosen[1:]) + meaning = possible_meanings[idx] + self.dialogue.acknowledge_clarification(name, meaning) + return {"meaning": meaning, "confidence": 1.0, "clarified": True} + + else: + # Non-interactive: return first meaning with low confidence + return { + "meaning": possible_meanings[0] if possible_meanings else str(value), + "confidence": 0.5, + "clarified": False, + "pending_options": possible_meanings, + } + + def clarify_operation( + self, + operation: str, + context: Dict[str, Any], + ) -> Dict[str, Any]: + """ + Clarify how to handle an operation with potential issues. + + Args: + operation: Operation type (division, index, etc.) + context: Context about the operation + + Returns: + Dict with handling strategy + """ + ambiguity = self.listener.detect_ambiguity(operation, context) + + if not ambiguity: + return {"strategy": "proceed", "clarified": True} + + choice = self.listener.request_clarification(ambiguity) + + if self.interactive: + self.dialogue.acknowledge_clarification( + ambiguity["type"], + dict(ambiguity["options"]).get(choice, choice), + ) + + return { + "strategy": choice, + "clarified": self.interactive, + "ambiguity_type": ambiguity["type"], + } + + def should_pause(self, clarity_score: float, threshold: float = 0.95) -> bool: + """ + Determine if execution should pause for clarification. + + Args: + clarity_score: Current clarity score + threshold: Minimum acceptable clarity + + Returns: + True if should pause, False otherwise + """ + if not self.interactive: + return False + + return clarity_score < threshold + + def summarize_pending(self) -> str: + """ + Summarize pending clarifications. + + Returns: + Human-readable summary of what needs clarification + """ + pending = self.listener.get_pending_clarifications() + if not pending: + return "No pending clarifications." + + lines = [f"Pending clarifications ({len(pending)}):"] + for p in pending: + lines.append(f" • {p['message']}") + + return "\n".join(lines) diff --git a/tests/test_clarity.py b/tests/test_clarity.py index 2ca6f87..0fef522 100644 --- a/tests/test_clarity.py +++ b/tests/test_clarity.py @@ -29,6 +29,9 @@ ClarityExplainer, Assumption, detect_assumptions, + ActiveListener, + DialogueManager, + InteractiveClarifier, ) @@ -586,3 +589,334 @@ def test_state_with_assumptions(self): ) state = ClarityState(assumptions=[assumption]) assert state.has_assumptions() + + +class TestActiveListener: + """Tests for the ActiveListener class.""" + + def test_active_listener_creation(self): + """Test creating an ActiveListener.""" + listener = ActiveListener(interactive=False) + assert not listener.interactive + assert len(listener.clarification_log) == 0 + assert len(listener.pending_clarifications) == 0 + + def test_detect_division_ambiguity(self): + """Test detecting division by zero ambiguity.""" + listener = ActiveListener(interactive=False) + ambiguity = listener.detect_ambiguity( + "division", + {"divisor": 0, "divisor_name": "x", "proven_non_zero": False}, + ) + assert ambiguity is not None + assert ambiguity["type"] == "division_safety" + assert "x" in ambiguity["message"] + + def test_detect_index_ambiguity(self): + """Test detecting index bounds ambiguity.""" + listener = ActiveListener(interactive=False) + ambiguity = listener.detect_ambiguity( + "index", + { + "index": 10, + "index_name": "i", + "list_name": "items", + "proven_in_bounds": False, + }, + ) + assert ambiguity is not None + assert ambiguity["type"] == "index_safety" + assert "i" in ambiguity["message"] + assert "items" in ambiguity["message"] + + def test_detect_coercion_ambiguity(self): + """Test detecting type coercion ambiguity.""" + listener = ActiveListener(interactive=False) + ambiguity = listener.detect_ambiguity( + "coercion", + {"from_type": "string", "to_type": "number"}, + ) + assert ambiguity is not None + assert ambiguity["type"] == "type_coercion" + + def test_detect_null_access_ambiguity(self): + """Test detecting null access ambiguity.""" + listener = ActiveListener(interactive=False) + ambiguity = listener.detect_ambiguity( + "null_access", + {"name": "user"}, + ) + assert ambiguity is not None + assert ambiguity["type"] == "null_safety" + + def test_detect_intent_ambiguity(self): + """Test detecting semantic ambiguity.""" + listener = ActiveListener(interactive=False) + ambiguity = listener.detect_ambiguity( + "intent", + { + "name": "mode", + "meanings": ["strict parsing", "strict security", "strict types"], + }, + ) + assert ambiguity is not None + assert ambiguity["type"] == "semantic_ambiguity" + + def test_no_ambiguity_when_proven(self): + """Test no ambiguity detected when precondition is proven.""" + listener = ActiveListener(interactive=False) + ambiguity = listener.detect_ambiguity( + "division", + {"divisor": 5, "divisor_name": "x", "proven_non_zero": True}, + ) + assert ambiguity is None + + def test_non_interactive_returns_default(self): + """Test non-interactive mode returns first option.""" + listener = ActiveListener(interactive=False) + ambiguity = listener.detect_ambiguity( + "division", + {"divisor": 0, "divisor_name": "x", "proven_non_zero": False}, + ) + choice = listener.request_clarification(ambiguity) + # In non-interactive mode, should return first option + assert choice == "proceed" + + def test_pending_clarifications(self): + """Test pending clarifications are recorded.""" + listener = ActiveListener(interactive=False) + ambiguity = listener.detect_ambiguity( + "division", + {"divisor": 0, "divisor_name": "x", "proven_non_zero": False}, + ) + listener.request_clarification(ambiguity) + pending = listener.get_pending_clarifications() + assert len(pending) == 1 + + +class TestDialogueManager: + """Tests for the DialogueManager class.""" + + def test_dialogue_manager_creation(self): + """Test creating a DialogueManager.""" + dm = DialogueManager(use_color=False, verbose=False) + assert not dm.verbose + assert len(dm.dialogue_history) == 0 + + def test_dialogue_history_logging(self): + """Test dialogue history is logged.""" + dm = DialogueManager(use_color=False, verbose=True) + dm.explain_reasoning( + decision="Using strict mode", + because="User specified strict", + alternatives=["lenient", "auto"], + ) + history = dm.get_dialogue_history() + assert len(history) == 1 + assert history[0]["type"] == "explanation" + + def test_verbose_mode_outputs(self, capsys): + """Test verbose mode produces output.""" + dm = DialogueManager(use_color=False, verbose=True) + dm.explain_reasoning( + decision="Testing output", + because="This is a test", + ) + captured = capsys.readouterr() + assert "Testing output" in captured.err + assert "This is a test" in captured.err + + def test_non_verbose_no_output(self, capsys): + """Test non-verbose mode produces no output for explanations.""" + dm = DialogueManager(use_color=False, verbose=False) + dm.explain_reasoning( + decision="Testing output", + because="This is a test", + ) + captured = capsys.readouterr() + assert "Testing output" not in captured.err + + +class TestInteractiveClarifier: + """Tests for the InteractiveClarifier class.""" + + def test_interactive_clarifier_creation(self): + """Test creating an InteractiveClarifier.""" + clarifier = InteractiveClarifier(interactive=False, verbose=False) + assert not clarifier.interactive + assert clarifier.listener is not None + assert clarifier.dialogue is not None + + def test_clarify_binding_no_options(self): + """Test clarifying binding with no ambiguity.""" + clarifier = InteractiveClarifier(interactive=False) + result = clarifier.clarify_binding( + name="x", + value=10, + possible_meanings=None, + ) + assert result["clarified"] is True + assert result["confidence"] == 1.0 + + def test_clarify_binding_with_options_non_interactive(self): + """Test clarifying binding with options in non-interactive mode.""" + clarifier = InteractiveClarifier(interactive=False) + result = clarifier.clarify_binding( + name="mode", + value="strict", + possible_meanings=["strict types", "strict parsing", "strict security"], + ) + # Non-interactive: returns first option with low confidence + assert result["clarified"] is False + assert result["confidence"] == 0.5 + assert "pending_options" in result + + def test_clarify_operation_no_ambiguity(self): + """Test clarifying operation with no ambiguity.""" + clarifier = InteractiveClarifier(interactive=False) + result = clarifier.clarify_operation( + "unknown_operation", + {}, + ) + assert result["strategy"] == "proceed" + assert result["clarified"] is True + + def test_clarify_operation_with_ambiguity(self): + """Test clarifying operation with ambiguity.""" + clarifier = InteractiveClarifier(interactive=False) + result = clarifier.clarify_operation( + "division", + {"divisor": 0, "divisor_name": "x", "proven_non_zero": False}, + ) + assert result["strategy"] == "proceed" # Default in non-interactive + assert result["clarified"] is False + assert result["ambiguity_type"] == "division_safety" + + def test_should_pause_non_interactive(self): + """Test should_pause returns False in non-interactive mode.""" + clarifier = InteractiveClarifier(interactive=False) + assert not clarifier.should_pause(0.5) + + def test_summarize_pending_empty(self): + """Test summarizing when no pending clarifications.""" + clarifier = InteractiveClarifier(interactive=False) + summary = clarifier.summarize_pending() + assert "No pending" in summary + + def test_summarize_pending_with_items(self): + """Test summarizing pending clarifications.""" + clarifier = InteractiveClarifier(interactive=False) + # Generate some pending clarifications + clarifier.clarify_operation( + "division", + {"divisor": 0, "divisor_name": "x", "proven_non_zero": False}, + ) + summary = clarifier.summarize_pending() + assert "Pending" in summary + assert "1" in summary + + +class TestInterpreterActiveListening: + """Tests for interpreter with active listening mode.""" + + def test_interpreter_with_active_listening(self): + """Test creating interpreter with active listening enabled.""" + interp = Interpreter(active_listening=True) + assert interp.active_listening is True + assert interp.active_listener is not None + assert interp.interactive_clarifier is not None + + def test_interpreter_without_active_listening(self): + """Test interpreter without active listening.""" + interp = Interpreter(active_listening=False) + assert interp.active_listening is False + assert interp.active_listener is None + + def test_interpreter_with_interactive_mode(self): + """Test creating interpreter with interactive mode.""" + interp = Interpreter(interactive_mode=True) + assert interp.interactive_mode is True + assert interp.interactive_clarifier is not None + + def test_division_with_active_listening_non_zero(self): + """Test division with active listening when divisor is non-zero.""" + interp = Interpreter(active_listening=True) + code = """ + x is 10 + y is 2 + x / y + """ + result = run_code(code, interp) + value = decode_vector(result, interp.space) + assert value == 5.0 # Normal division works + + def test_format_agency_error_undefined(self): + """Test agency-preserving error format for undefined variable.""" + interp = Interpreter(active_listening=True) + msg = interp._format_agency_error( + "undefined_variable", + {"name": "foo", "suggestions": ["food", "foot"]}, + ) + assert "foo" in msg + assert "Possible intentions" in msg + assert "food" in msg + + def test_format_agency_error_type_mismatch(self): + """Test agency-preserving error format for type mismatch.""" + interp = Interpreter(active_listening=True) + msg = interp._format_agency_error( + "type_mismatch", + {"expected": "number", "got": "string", "operation": "addition"}, + ) + assert "Type question" in msg + assert "number" in msg + assert "string" in msg + + def test_format_agency_error_function_not_found(self): + """Test agency-preserving error format for function not found.""" + interp = Interpreter(active_listening=True) + msg = interp._format_agency_error( + "function_not_found", + {"name": "calculate"}, + ) + assert "calculate" in msg + assert "Possible intentions" in msg + + +class TestClarityWithActiveListening: + """Tests combining clarity framework with active listening.""" + + def test_clarify_with_active_listening(self): + """Test clarify of works with active listening.""" + interp = Interpreter(active_listening=True) + code = """ + x might is 10 + clarify of x + clarified + """ + result = run_code(code, interp) + value = decode_vector(result, interp.space) + assert value == 1.0 + + def test_ambiguous_with_active_listening(self): + """Test ambiguous predicate with active listening.""" + interp = Interpreter(active_listening=True) + code = """ + x might is 10 + ambiguous + """ + result = run_code(code, interp) + value = decode_vector(result, interp.space) + assert value == 1.0 + + def test_clarity_score_with_active_listening(self): + """Test clarity score with active listening.""" + interp = Interpreter(active_listening=True) + code = """ + x is 10 + y might is 20 + clarity + """ + result = run_code(code, interp) + value = decode_vector(result, interp.space) + assert value == 0.5 # 1 out of 2 clarified From ff7a3138db4996f99df04454654fca9b17c9d1fe Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 18 Dec 2025 23:10:21 +0000 Subject: [PATCH 3/3] Format code with black --- src/eigenscript/evaluator/interpreter.py | 31 ++++---- src/eigenscript/runtime/clarity.py | 92 ++++++++++++++++-------- tests/test_clarity.py | 4 +- 3 files changed, 85 insertions(+), 42 deletions(-) diff --git a/src/eigenscript/evaluator/interpreter.py b/src/eigenscript/evaluator/interpreter.py index 0fa2d41..026d070 100644 --- a/src/eigenscript/evaluator/interpreter.py +++ b/src/eigenscript/evaluator/interpreter.py @@ -313,14 +313,22 @@ def __init__( # Interactive clarification mode (Active Listener/Speaker) self.interactive_mode = interactive_mode self.active_listening = active_listening - self.interactive_clarifier = InteractiveClarifier( - interactive=interactive_mode, - verbose=explain_mode, - ) if interactive_mode or active_listening else None - self.active_listener = ActiveListener( - interactive=interactive_mode, - strict=False, - ) if active_listening else None + self.interactive_clarifier = ( + InteractiveClarifier( + interactive=interactive_mode, + verbose=explain_mode, + ) + if interactive_mode or active_listening + else None + ) + self.active_listener = ( + ActiveListener( + interactive=interactive_mode, + strict=False, + ) + if active_listening + else None + ) # Special lightlike OF vector self._of_vector = self._create_of_vector() @@ -579,7 +587,8 @@ def _eval_relation(self, node: Relation) -> LRVMVector: ) if result.get("clarified"): self.interactive_clarifier.dialogue.acknowledge_clarification( - binding_name, result.get("meaning", str(current_value)) + binding_name, + result.get("meaning", str(current_value)), ) else: # No hypothesis - just acknowledge @@ -1825,9 +1834,7 @@ def _eval_interrogative(self, node: Interrogative) -> LRVMVector: if assumptions: # Return a list of assumption descriptions - assumption_strs = [ - f"{a.name}: {a.context}" for a in assumptions - ] + assumption_strs = [f"{a.name}: {a.context}" for a in assumptions] # Join as a multi-line string for readable output result_str = "\n".join(assumption_strs) return self.space.embed_string(result_str) diff --git a/src/eigenscript/runtime/clarity.py b/src/eigenscript/runtime/clarity.py index 9fb1fd7..f0ea013 100644 --- a/src/eigenscript/runtime/clarity.py +++ b/src/eigenscript/runtime/clarity.py @@ -157,7 +157,9 @@ def register_binding( clarity_type=clarity_type, assumptions=assumptions or [], hypothesis=hypothesis, - verified_at=self.iteration if clarity_type == ClarityType.CLARIFIED else None, + verified_at=( + self.iteration if clarity_type == ClarityType.CLARIFIED else None + ), ) self.bindings[name] = state @@ -263,11 +265,7 @@ def get_unresolved_bindings(self) -> List[str]: Returns: List of binding names that need clarification """ - return [ - name - for name, state in self.bindings.items() - if not state.is_clear() - ] + return [name for name, state in self.bindings.items() if not state.is_clear()] def compute_clarity_score(self) -> float: """ @@ -439,7 +437,10 @@ def explain_ambiguous( if state.assumptions: print(f" └─ detected assumptions:", file=sys.stderr) for assumption in state.assumptions[:3]: - print(f" • {assumption.name}: {assumption.context}", file=sys.stderr) + print( + f" • {assumption.name}: {assumption.context}", + file=sys.stderr, + ) if interpretations: print(f" └─ possible interpretations:", file=sys.stderr) @@ -511,7 +512,7 @@ def explain_tentative( print(f" └─ binding: {self._color(binding, 'cyan')}", file=sys.stderr) if hypothesis: - print(f" └─ hypothesis: \"{hypothesis}\"", file=sys.stderr) + print(f' └─ hypothesis: "{hypothesis}"', file=sys.stderr) print( f" └─ {self._color('awaiting verification', 'yellow')}", file=sys.stderr, @@ -876,7 +877,10 @@ def request_clarification( if self.interactive: # Print the clarification request - print(f"\n{self._color('⚡ Clarification needed:', 'yellow')}", file=sys.stderr) + print( + f"\n{self._color('⚡ Clarification needed:', 'yellow')}", + file=sys.stderr, + ) print(f" {message}", file=sys.stderr) print(file=sys.stderr) @@ -888,15 +892,23 @@ def request_clarification( # Get user input while True: try: - choice = input(f" {self._color('Which approach?', 'bold')} [1-{len(options)}]: ") + choice = input( + f" {self._color('Which approach?', 'bold')} [1-{len(options)}]: " + ) choice_idx = int(choice) - 1 if 0 <= choice_idx < len(options): chosen_key = options[choice_idx][0] self._log_clarification(ambiguity, chosen_key) return chosen_key - print(f" Please enter a number between 1 and {len(options)}", file=sys.stderr) + print( + f" Please enter a number between 1 and {len(options)}", + file=sys.stderr, + ) except ValueError: - print(f" Please enter a number between 1 and {len(options)}", file=sys.stderr) + print( + f" Please enter a number between 1 and {len(options)}", + file=sys.stderr, + ) except EOFError: # Non-interactive fallback return options[0][0] @@ -911,12 +923,14 @@ def _log_clarification( choice: str, ) -> None: """Log a clarification for future reference.""" - self.clarification_log.append({ - "ambiguity_type": ambiguity["type"], - "message": ambiguity["message"], - "choice": choice, - "context": ambiguity.get("context", {}), - }) + self.clarification_log.append( + { + "ambiguity_type": ambiguity["type"], + "message": ambiguity["message"], + "choice": choice, + "context": ambiguity.get("context", {}), + } + ) def get_pending_clarifications(self) -> List[Dict[str, Any]]: """Get list of pending clarifications that weren't resolved interactively.""" @@ -1027,15 +1041,23 @@ def ask_for_intent( while True: try: - choice = input(f" {self._color('Your choice', 'bold')} [1-{len(options)}]: ") + choice = input( + f" {self._color('Your choice', 'bold')} [1-{len(options)}]: " + ) choice_idx = int(choice) - 1 if 0 <= choice_idx < len(options): chosen_key = options[choice_idx][0] self._log_dialogue("intent_request", what, chosen_key) return chosen_key - print(f" Please enter a number between 1 and {len(options)}", file=sys.stderr) + print( + f" Please enter a number between 1 and {len(options)}", + file=sys.stderr, + ) except ValueError: - print(f" Please enter a number between 1 and {len(options)}", file=sys.stderr) + print( + f" Please enter a number between 1 and {len(options)}", + file=sys.stderr, + ) except EOFError: return options[0][0] @@ -1084,13 +1106,19 @@ def confirm_understanding( Returns: True if understanding confirmed, False otherwise """ - print(f"\n{self._color('?', 'yellow')} Confirming understanding:", file=sys.stderr) + print( + f"\n{self._color('?', 'yellow')} Confirming understanding:", file=sys.stderr + ) print(f" You said: {self._color(statement, 'cyan')}", file=sys.stderr) print(f" I understood: {self._color(understood_as, 'green')}", file=sys.stderr) print(file=sys.stderr) try: - response = input(f" {self._color('Is this correct?', 'bold')} [Y/n]: ").strip().lower() + response = ( + input(f" {self._color('Is this correct?', 'bold')} [Y/n]: ") + .strip() + .lower() + ) confirmed = response in ("", "y", "yes") self._log_dialogue("confirmation", statement, confirmed) return confirmed @@ -1149,11 +1177,13 @@ def _log_dialogue( response: Any, ) -> None: """Log a dialogue exchange.""" - self.dialogue_history.append({ - "type": dialogue_type, - "subject": subject, - "response": response, - }) + self.dialogue_history.append( + { + "type": dialogue_type, + "subject": subject, + "response": response, + } + ) def get_dialogue_history(self) -> List[Dict[str, Any]]: """Get the full dialogue history.""" @@ -1239,7 +1269,11 @@ def clarify_binding( self.dialogue.acknowledge_clarification(name, custom) return {"meaning": custom, "confidence": 1.0, "clarified": True} except EOFError: - return {"meaning": str(value), "confidence": 0.5, "clarified": False} + return { + "meaning": str(value), + "confidence": 0.5, + "clarified": False, + } idx = int(chosen[1:]) meaning = possible_meanings[idx] diff --git a/tests/test_clarity.py b/tests/test_clarity.py index 0fef522..37953cc 100644 --- a/tests/test_clarity.py +++ b/tests/test_clarity.py @@ -63,7 +63,9 @@ class TestClarityTracker: def test_initial_state(self, clarity_tracker): """Test tracker starts with no bindings.""" assert len(clarity_tracker.bindings) == 0 - assert clarity_tracker.compute_clarity_score() == 1.0 # No bindings = trivially clear + assert ( + clarity_tracker.compute_clarity_score() == 1.0 + ) # No bindings = trivially clear def test_register_clarified_binding(self, clarity_tracker): """Test registering a clarified binding."""