diff --git a/docs/api/predicates.md b/docs/api/predicates.md index 45d449e..a801b21 100644 --- a/docs/api/predicates.md +++ b/docs/api/predicates.md @@ -428,6 +428,341 @@ loop while improving: --- +--- + +# Communication Clarity Predicates + +The Communication Clarity Framework extends EigenScript with predicates that check the *clarity of intent*, not just computational state. These predicates implement hypothesis-driven communication semantics where code debugs itself by refusing to execute on unverified assumptions. + +--- + +## clarified + +Check if all bindings have verified intent. + +**Syntax:** +```eigenscript +if clarified: + # All intents verified, safe to proceed +``` + +**Returns:** `true` if clarity score meets threshold, `false` otherwise + +**Example:** +```eigenscript +config might is "production" # Tentative: hypothesis +database might is "postgres" # Tentative: hypothesis + +# Won't proceed until intent is clarified +if not clarified: + print of "Please clarify intent before proceeding" + clarify of config + clarify of database + +if clarified: + # Safe to execute critical operations + initialize_system of config +``` + +**Semantics:** Measures the ratio of clarified bindings to total bindings. Returns true when clarity score >= threshold (default 0.95). + +--- + +## ambiguous + +Check if there are any unresolved bindings. + +**Syntax:** +```eigenscript +if ambiguous: + # Some bindings need clarification +``` + +**Returns:** `true` if any bindings are tentative/unresolved, `false` otherwise + +**Example:** +```eigenscript +user_input might is value + +if ambiguous: + print of "Warning: ambiguous state detected" + # Force disambiguation + clarify of user_input +``` + +**Semantics:** Returns true if any bindings have not been clarified. + +--- + +## explicit + +Check if all intents are fully explicit (opposite of ambiguous). + +**Syntax:** +```eigenscript +if explicit: + # All intents are explicit +``` + +**Returns:** `true` if clarity score is 100%, `false` otherwise + +**Example:** +```eigenscript +x is 10 # Clarified (normal assignment) +y is 20 # Clarified (normal assignment) + +if explicit: + print of "All intents are explicit" +``` + +**Semantics:** Returns true only when clarity score is exactly 1.0. + +--- + +## clarity / cs + +Get the current clarity score as a numeric value. + +**Syntax:** +```eigenscript +score is clarity +# or +score is cs +``` + +**Returns:** Float between 0.0 (fully ambiguous) and 1.0 (fully clarified) + +**Example:** +```eigenscript +a might is 1 # Tentative +b is 2 # Clarified +c might is 3 # Tentative + +score is clarity +print of score # Prints 0.333... (1/3 clarified) + +clarify of a +clarify of c +print of clarity # Prints 1.0 (all clarified) +``` + +**Semantics:** Computes ratio of clarified bindings to total bindings. + +--- + +## assumed + +Check if there are any unverified assumptions (hidden variables). + +**Syntax:** +```eigenscript +if assumed: + # Hidden variables detected +``` + +**Returns:** `true` if any assumptions exist, `false` otherwise + +**Example:** +```eigenscript +# The clarity tracker can detect implicit assumptions +if assumed: + print of "Warning: hidden variables detected" +``` + +**Semantics:** Returns true if any assumptions have been registered. + +--- + +# Clarity Operators + +## might is + +Create a tentative (hypothesis) binding. + +**Syntax:** +```eigenscript +identifier might is expression +``` + +**Example:** +```eigenscript +mode might is "strict" # Hypothesis: we want strict mode +count might is 0 # Hypothesis: count starts at zero + +# The bindings exist but are marked as tentative +print of mode # Prints "strict" +print of clarified # Prints 0.0 (not clarified) +``` + +**Semantics:** Creates a binding marked as TENTATIVE. The `clarified` predicate returns false until all tentative bindings are clarified. + +--- + +## clarify of + +Clarify a tentative binding, changing it from TENTATIVE to CLARIFIED. + +**Syntax:** +```eigenscript +clarify of identifier +``` + +**Returns:** `1.0` on success, `0.0` on failure + +**Example:** +```eigenscript +mode might is "strict" +print of clarified # Prints 0.0 + +clarify of mode +print of clarified # Prints 1.0 +``` + +**Semantics:** Marks the binding as clarified in the clarity tracker. + +--- + +## assumes is + +Query hidden variables/assumptions for a binding. + +**Syntax:** +```eigenscript +assumptions is assumes is identifier +``` + +**Returns:** String listing assumptions, or empty string if none + +**Example:** +```eigenscript +x is 10 +assumptions is assumes is x +print of assumptions # Prints "" (no assumptions) +``` + +**Semantics:** Returns any assumptions detected for the binding. + +--- + +# Complete Examples + +### Self-Debugging Configuration + +```eigenscript +# Configuration with hypothesis-driven clarity +db_host might is "localhost" +db_port might is 5432 +db_name might is "production" + +# The program refuses to proceed until intent is clear +if not clarified: + print of "Configuration requires clarification:" + print of clarity + +# Explicit verification of each hypothesis +clarify of db_host +clarify of db_port +clarify of db_name + +if clarified: + print of "Configuration verified. Proceeding..." + connect_database of db_host +``` + +### Clarity-Driven Loop + +```eigenscript +define safe_process as: + config might is input + + # Loop until all ambiguity resolved + loop while not clarified: + print of "Please confirm configuration" + clarify of config + + # Now safe to proceed + return execute of config + +result is safe_process of user_config +``` + +### Combining Clarity and Convergence + +```eigenscript +# Hybrid predicates: both semantic AND communicative clarity +define robust_algorithm as: + params might is initial + x is start + + # Verify intent before heavy computation + if not clarified: + clarify of params + + loop while not converged: + if diverging: + print of "Warning: diverging" + break + x is iterate of x + + return x + +result is robust_algorithm of problem +``` + +### Agency-Preserving Design + +```eigenscript +# The framework presents options without deciding for the user +mode might is unknown + +# Instead of guessing, the program asks for clarification +if ambiguous: + print of "Possible modes:" + print of "1. strict - Full validation" + print of "2. relaxed - Permissive parsing" + print of "3. auto - Detect from input" + print of "Which mode is intended?" + + # User must clarify before proceeding + mode is input of "Enter mode: " + clarify of mode + +if clarified: + process_with_mode of mode +``` + +--- + +## Geometric Interpretation + +Clarity predicates measure communication properties: + +| Predicate | Communication Property | +|-----------|----------------------| +| `clarified` | All intents verified | +| `ambiguous` | Hidden variables exist | +| `explicit` | 100% clarity score | +| `clarity`/`cs` | Ratio of clarified bindings | +| `assumed` | Unverified assumptions present | + +The `might is` operator creates a hypothesis binding (lightlike). +The `clarify of` operator transforms it to clarified (timelike). + +--- + +## The Framework Principles + +The clarity predicates implement these principles: + +1. **Questions and statements both assign objectives** - Creating a binding assigns intent-verification work +2. **Implication is not universal** - `might is` makes hidden variables visible +3. **Clarification is hypothesis testing** - Bindings start as hypotheses, become facts +4. **Withhold inference intentionally** - Don't collapse ambiguity silently +5. **Control remains with the speaker** - Present options, don't decide for user +6. **Speakers discover their own intent** - `might is` forces intent externalization + +--- + ## Summary Semantic predicates: @@ -439,9 +774,116 @@ Semantic predicates: - `oscillating` - Periodic behavior - `equilibrium` - At balance point -**Total: 6 predicates** +Communication clarity predicates: + +- `clarified` - All intents verified +- `ambiguous` - Unresolved bindings exist +- `explicit` - 100% clarity +- `clarity`/`cs` - Clarity score (0.0-1.0) +- `assumed` - Hidden variables detected + +Clarity operators: + +- `might is` - Tentative (hypothesis) binding +- `clarify of` - Verify intent +- `assumes is` - Query hidden variables + +**Total: 11 predicates + 3 operators** + +These enable self-debugging code that refuses to execute on unverified assumptions. + +--- + +## Active Listener Mode + +EigenScript can act as an **active listener** that detects ambiguity and seeks clarification before proceeding. This implements the communication framework principle: "You treat inferred intent as a hypothesis, not a fact." + +### Enabling Active Listening + +```python +from eigenscript.evaluator import Interpreter + +# Enable active listening mode +interp = Interpreter( + active_listening=True, # Detect ambiguity in operations + interactive_mode=True, # Prompt for clarification +) +``` + +### What Active Listening Detects + +| Operation | Detected Ambiguity | Options Presented | +|-----------|-------------------|-------------------| +| Division | Potential divide by zero | Proceed, Add check, Return default | +| Indexing | Potential out of bounds | Proceed, Add check, Clamp, Wrap | +| Type coercion | Type mismatch | Strict, Lenient, Explicit | +| Null access | Potential null value | Proceed, Add check, Return default, Optional chain | + +### Example: Interactive Division + +```eigenscript +x is 10 +y is get_input of "Enter divisor: " # User enters 0 + +result is x / y +# In interactive mode: +# > Clarification needed: +# > The divisor 'y' might be zero. +# > 1. Proceed (assuming y ≠ 0) +# > 2. Add explicit check +# > 3. Return default value if zero +# > Which approach? _ +``` + +### Example: Interactive Index Access + +```eigenscript +items is [1, 2, 3] +i is get_input of "Enter index: " # User enters 5 + +value is items[i] +# In interactive mode: +# > Clarification needed: +# > The index 'i' might be out of bounds for 'items'. +# > 1. Proceed (assuming valid index) +# > 2. Add bounds check +# > 3. Clamp to valid range +# > 4. Wrap around (modulo length) +# > Which approach? _ +``` + +### Non-Interactive Mode + +When `interactive_mode=False` but `active_listening=True`, ambiguities are detected and logged but the default option (proceed) is chosen. Use `summarize_pending()` to see what clarifications were needed: + +```python +interp = Interpreter(active_listening=True, interactive_mode=False) +result = interp.run(code) + +# Check what clarifications were pending +summary = interp.interactive_clarifier.summarize_pending() +print(summary) +# Pending clarifications (2): +# • The divisor 'x' might be zero. +# • The index 'i' might be out of bounds for 'items'. +``` + +--- + +## Agency-Preserving Errors + +When errors occur, EigenScript presents options rather than dictating what went wrong: + +``` +Variable 'foo' not found in scope. +Possible intentions: + • Did you mean 'food'? + • Did you mean 'foot'? + • Define it first with: foo is + • Or use 'might is' for tentative binding +``` -These enable automatic convergence detection and adaptive algorithms. +This preserves the programmer's agency by presenting possibilities rather than assuming intent. --- diff --git a/src/eigenscript/evaluator/interpreter.py b/src/eigenscript/evaluator/interpreter.py index 1c0d820..026d070 100644 --- a/src/eigenscript/evaluator/interpreter.py +++ b/src/eigenscript/evaluator/interpreter.py @@ -7,12 +7,13 @@ import numpy as np import os -from typing import Dict, Optional, List, Union, Callable +from typing import Dict, Optional, List, Union, Callable, Any from dataclasses import dataclass from eigenscript.parser.ast_builder import ( ASTNode, Program, Assignment, + TentativeAssignment, Relation, BinaryOp, UnaryOp, @@ -37,6 +38,16 @@ from eigenscript.semantic.metric import MetricTensor from eigenscript.runtime.framework_strength import FrameworkStrengthTracker from eigenscript.runtime.explain import PredicateExplainer +from eigenscript.runtime.clarity import ( + ClarityTracker, + ClarityExplainer, + ClarityType, + Assumption, + detect_assumptions, + ActiveListener, + DialogueManager, + InteractiveClarifier, +) from eigenscript.builtins import BuiltinFunction, get_builtins # Type alias for values that can flow through the interpreter @@ -251,6 +262,9 @@ def __init__( convergence_threshold: float = 0.95, enable_convergence_detection: bool = True, explain_mode: bool = False, + clarity_threshold: float = 0.95, + interactive_mode: bool = False, + active_listening: bool = False, ): """ Initialize the interpreter. @@ -262,6 +276,9 @@ def __init__( convergence_threshold: FS threshold for eigenstate detection (default: 0.95) enable_convergence_detection: Enable automatic convergence detection (default: True) explain_mode: Enable explain mode for predicate evaluations (default: False) + clarity_threshold: Clarity score threshold for the `clarified` predicate (default: 0.95) + interactive_mode: Enable interactive clarification prompts (default: False) + active_listening: Enable automatic ambiguity detection in operations (default: False) """ # Geometric components self.space = LRVMSpace(dimension=dimension) @@ -288,6 +305,31 @@ def __init__( # Explain mode for predicate evaluations self.explainer = PredicateExplainer(enabled=explain_mode) + # Communication Clarity Framework + self.clarity_tracker = ClarityTracker() + self.clarity_explainer = ClarityExplainer(enabled=explain_mode) + self.clarity_threshold = clarity_threshold + + # Interactive clarification mode (Active Listener/Speaker) + self.interactive_mode = interactive_mode + self.active_listening = active_listening + self.interactive_clarifier = ( + InteractiveClarifier( + interactive=interactive_mode, + verbose=explain_mode, + ) + if interactive_mode or active_listening + else None + ) + self.active_listener = ( + ActiveListener( + interactive=interactive_mode, + strict=False, + ) + if active_listening + else None + ) + # Special lightlike OF vector self._of_vector = self._create_of_vector() @@ -403,6 +445,8 @@ def evaluate(self, node: ASTNode) -> Union[LRVMVector, EigenList]: return self._eval_program(node) elif isinstance(node, Assignment): return self._eval_assignment(node) + elif isinstance(node, TentativeAssignment): + return self._eval_tentative_assignment(node) elif isinstance(node, Relation): return self._eval_relation(node) elif isinstance(node, BinaryOp): @@ -468,6 +512,39 @@ def _eval_assignment(self, node: Assignment) -> LRVMVector: # Bind in environment (handles both vectors and lists) self.environment.bind(node.identifier, value) + # Register as clarified binding in clarity tracker + self.clarity_tracker.register_binding(node.identifier, tentative=False) + + # Return the value as-is (may be EigenList or LRVMVector) + return value + + def _eval_tentative_assignment(self, node: TentativeAssignment) -> LRVMVector: + """ + Evaluate MIGHT IS operator: x might is y + + Semantic: Tentative binding (hypothesis) in LRVM space. + + From the Communication Clarity Framework: + - Treats the binding as a hypothesis, not a fact + - Forces intent to be externalized before execution + - Prevents reinforcing the false belief that implication was sufficient + + The binding is created but marked as TENTATIVE in the clarity tracker. + The `clarified` predicate will return False until clarify() is called. + """ + # Evaluate right-hand side + value = self.evaluate(node.expression) + + # Bind in environment (handles both vectors and lists) + self.environment.bind(node.identifier, value) + + # Register as tentative binding in clarity tracker + self.clarity_tracker.register_binding( + node.identifier, + tentative=True, + hypothesis=node.hypothesis, + ) + # Return the value as-is (may be EigenList or LRVMVector) return value @@ -477,6 +554,57 @@ def _eval_relation(self, node: Relation) -> LRVMVector: Semantic: Metric contraction x^T g y OR function application """ + # Special handling for clarity framework: clarify of x + # This clarifies a tentative binding, changing it from TENTATIVE to CLARIFIED + if isinstance(node.left, Identifier) and node.left.name.lower() == "clarify": + if isinstance(node.right, Identifier): + binding_name = node.right.name + + # Interactive clarification mode + if self.interactive_mode and self.interactive_clarifier: + # Get the current value for display + try: + current_value = self.environment.lookup(binding_name) + # Get clarity state if available + clarity_state = self.clarity_tracker.get_clarity(binding_name) + + if clarity_state and clarity_state.hypothesis: + # Show the hypothesis and ask for confirmation + confirmed = self.interactive_clarifier.dialogue.confirm_understanding( + statement=f"{binding_name} might is {current_value}", + understood_as=clarity_state.hypothesis, + ) + if not confirmed: + # User didn't confirm - ask what they meant + result = self.interactive_clarifier.clarify_binding( + name=binding_name, + value=current_value, + possible_meanings=[ + f"Keep as '{current_value}'", + "Change the value", + "Remove this binding", + ], + ) + if result.get("clarified"): + self.interactive_clarifier.dialogue.acknowledge_clarification( + binding_name, + result.get("meaning", str(current_value)), + ) + else: + # No hypothesis - just acknowledge + self.interactive_clarifier.dialogue.acknowledge_clarification( + binding_name, f"value = {current_value}" + ) + except NameError: + # Binding doesn't exist yet - that's ok + pass + + success = self.clarity_tracker.clarify(binding_name) + return self.space.embed_scalar(1.0 if success else 0.0) + else: + # Can only clarify identifiers + raise RuntimeError("clarify requires an identifier (clarify of x)") + # Special handling for function calls: # If left side is an identifier, check if it's a function before evaluating if isinstance(node.left, Identifier): @@ -604,6 +732,34 @@ def _eval_binary_op(self, node: "BinaryOp") -> LRVMVector: # Project through inverse scaling assert isinstance(left, LRVMVector) and isinstance(right, LRVMVector) scalar = right.coords[0] + + # Active listening: detect potential division by zero + if self.active_listener and abs(scalar) < 1e-10: + # Ambiguity detected - seek clarification + divisor_name = ( + node.right.name if isinstance(node.right, Identifier) else "divisor" + ) + result = self.interactive_clarifier.clarify_operation( + "division", + { + "divisor": scalar, + "divisor_name": divisor_name, + "proven_non_zero": False, + }, + ) + strategy = result.get("strategy", "proceed") + + if strategy == "default": + # Return a default value (zero) instead of erroring + return self.space.zero_vector() + elif strategy == "check": + # User wants explicit check - raise but with helpful message + raise RuntimeError( + f"Division by zero: '{divisor_name}' is zero.\n" + f"Consider adding: if {divisor_name} != 0: ..." + ) + # strategy == "proceed" falls through to normal error + if abs(scalar) < 1e-10: raise RuntimeError("Division by zero (equilibrium singularity)") return left.scale(1.0 / scalar) @@ -1071,6 +1227,46 @@ def _eval_index(self, node: Index) -> Value: if isinstance(indexed_value, EigenList): # Check bounds if index < 0 or index >= len(indexed_value.elements): + # Active listening: detect out-of-bounds access + if self.active_listener: + index_name = ( + node.index_expr.name + if isinstance(node.index_expr, Identifier) + else str(index) + ) + list_name = ( + node.list_expr.name + if isinstance(node.list_expr, Identifier) + else "list" + ) + result = self.interactive_clarifier.clarify_operation( + "index", + { + "index": index, + "index_name": index_name, + "list_name": list_name, + "list_length": len(indexed_value.elements), + "proven_in_bounds": False, + }, + ) + strategy = result.get("strategy", "proceed") + + if strategy == "clamp": + # Clamp to valid range + clamped = max(0, min(index, len(indexed_value.elements) - 1)) + return indexed_value.elements[clamped] + elif strategy == "wrap": + # Wrap around using modulo + wrapped = index % len(indexed_value.elements) + return indexed_value.elements[wrapped] + elif strategy == "check": + # User wants explicit check - raise with helpful message + raise IndexError( + f"List index {index} out of range (list has {len(indexed_value.elements)} elements).\n" + f"Consider adding: if {index_name} < len of {list_name}: ..." + ) + # strategy == "proceed" falls through to normal error + raise IndexError( f"List index {index} out of range (list has {len(indexed_value.elements)} elements)" ) @@ -1392,6 +1588,52 @@ def _eval_identifier( result = 0.0 return self.space.embed_scalar(result) + # Communication Clarity Framework predicates + elif name_lower == "clarified": + # CLARIFIED: Check if all bindings have verified intent + # From the framework: "clarity is required before execution" + clarity_score = self.clarity_tracker.compute_clarity_score() + result = 1.0 if clarity_score >= self.clarity_threshold else 0.0 + unresolved = self.clarity_tracker.get_unresolved_bindings() + self.clarity_explainer.explain_clarified( + result=result > 0.5, + clarity_score=clarity_score, + threshold=self.clarity_threshold, + unresolved=unresolved, + ) + return self.space.embed_scalar(result) + + elif name_lower == "ambiguous": + # AMBIGUOUS: Check if there are any ambiguous bindings + # From the framework: "implication is not universal" + unresolved = self.clarity_tracker.get_unresolved_bindings() + result = 1.0 if len(unresolved) > 0 else 0.0 + return self.space.embed_scalar(result) + + elif name_lower == "assumed": + # ASSUMED: Check if there are any unverified assumptions + # From the framework: "hidden variables cause failure" + assumptions = self.clarity_tracker.get_assumptions() + result = 1.0 if len(assumptions) > 0 else 0.0 + self.clarity_explainer.explain_assumed( + result=result > 0.5, + assumptions=assumptions, + ) + return self.space.embed_scalar(result) + + elif name_lower == "explicit": + # EXPLICIT: Opposite of ambiguous - all intents are clarified + # From the framework: "making hidden variables visible" + clarity_score = self.clarity_tracker.compute_clarity_score() + result = 1.0 if clarity_score >= 1.0 else 0.0 + return self.space.embed_scalar(result) + + elif name_lower == "clarity" or name_lower == "cs": + # CLARITY / CS: Return current clarity score as numeric value + # Analogous to framework_strength / fs + clarity_score = self.clarity_tracker.compute_clarity_score() + return self.space.embed_scalar(clarity_score) + # Regular variable lookup return self.environment.lookup(node.name) @@ -1579,6 +1821,37 @@ def _eval_interrogative(self, node: Interrogative) -> LRVMVector: # Not enough history return self.space.embed_string("unknown") + elif interrogative == "assumes": + # ASSUMES: Query hidden variables/assumptions for a binding + # From the Communication Clarity Framework: + # "Hidden variables cause failure, not misunderstanding" + # + # Returns a list of assumptions that have been detected for the binding, + # or an empty list if the binding is fully clarified. + if isinstance(node.expression, Identifier): + binding_name = node.expression.name + assumptions = self.clarity_tracker.get_assumptions(binding_name) + + if assumptions: + # Return a list of assumption descriptions + assumption_strs = [f"{a.name}: {a.context}" for a in assumptions] + # Join as a multi-line string for readable output + result_str = "\n".join(assumption_strs) + return self.space.embed_string(result_str) + else: + # No assumptions - binding is clear + return self.space.embed_string("") + else: + # Query global assumptions if not a specific binding + global_assumptions = self.clarity_tracker.get_assumptions() + if global_assumptions: + assumption_strs = [ + f"{a.name}: {a.context}" for a in global_assumptions + ] + return self.space.embed_string("\n".join(assumption_strs)) + else: + return self.space.embed_string("") + else: raise RuntimeError(f"Unknown interrogative: {interrogative}") @@ -1744,6 +2017,100 @@ def _is_of_vector(self, vector: LRVMVector) -> bool: """ return self.metric.is_lightlike(vector) + def _check_type_coercion( + self, + left: LRVMVector, + right: LRVMVector, + operation: str, + ) -> Optional[str]: + """ + Check if type coercion is needed and handle via active listening. + + Returns the coercion strategy or None if no coercion needed. + + Args: + left: Left operand + right: Right operand + operation: The operation being performed + + Returns: + Strategy string or None + """ + if not self.active_listener: + return None + + # Detect type mismatch by checking metadata + left_type = "string" if left.metadata.get("string_value") else "numeric" + right_type = "string" if right.metadata.get("string_value") else "numeric" + + if left_type != right_type: + result = self.interactive_clarifier.clarify_operation( + "coercion", + { + "from_type": right_type, + "to_type": left_type, + "operation": operation, + }, + ) + return result.get("strategy", "lenient") + + return None + + def _format_agency_error( + self, + error_type: str, + context: Dict[str, Any], + ) -> str: + """ + Format an error message that preserves user agency. + + Instead of dictating what went wrong, presents the situation + and possible interpretations. + + Args: + error_type: Type of error + context: Context about the error + + Returns: + Formatted error message + """ + if error_type == "undefined_variable": + name = context.get("name", "unknown") + suggestions = context.get("suggestions", []) + msg = f"Variable '{name}' not found in scope.\n" + if suggestions: + msg += "Possible intentions:\n" + for s in suggestions[:3]: + msg += f" • Did you mean '{s}'?\n" + msg += " • Define it first with: {name} is \n" + msg += " • Or use 'might is' for tentative binding" + return msg + + elif error_type == "type_mismatch": + expected = context.get("expected", "unknown") + got = context.get("got", "unknown") + operation = context.get("operation", "operation") + msg = f"Type question for {operation}:\n" + msg += f" Got: {got}\n" + msg += f" Expected: {expected}\n" + msg += "Possible approaches:\n" + msg += f" • Convert {got} to {expected}\n" + msg += f" • Use a different operation for {got}\n" + msg += " • Check if this is the intended value" + return msg + + elif error_type == "function_not_found": + name = context.get("name", "unknown") + msg = f"Function '{name}' not found.\n" + msg += "Possible intentions:\n" + msg += f" • Define it first with: define {name} as: ...\n" + msg += " • Import from a module: from import {name}\n" + msg += " • Check spelling" + return msg + + else: + return f"Error: {error_type} - {context}" + def get_framework_strength(self) -> float: """ Get current Framework Strength. diff --git a/src/eigenscript/lexer/tokenizer.py b/src/eigenscript/lexer/tokenizer.py index 95881e3..aff24bc 100644 --- a/src/eigenscript/lexer/tokenizer.py +++ b/src/eigenscript/lexer/tokenizer.py @@ -48,6 +48,9 @@ class TokenType(Enum): # Interrogative aliases STATUS = "STATUS" TREND = "TREND" + # Clarity operators (communication clarity framework) + MIGHT = "MIGHT" # Tentative binding: x might is 5 + ASSUMES = "ASSUMES" # Assumption query: assumes is x # Literals NUMBER = "NUMBER" @@ -159,6 +162,9 @@ class Tokenizer: # Interrogative aliases "status": TokenType.STATUS, "trend": TokenType.TREND, + # Clarity operators + "might": TokenType.MIGHT, + "assumes": TokenType.ASSUMES, } def __init__(self, source: str): diff --git a/src/eigenscript/parser/ast_builder.py b/src/eigenscript/parser/ast_builder.py index de6dd8e..8ce9bae 100644 --- a/src/eigenscript/parser/ast_builder.py +++ b/src/eigenscript/parser/ast_builder.py @@ -214,6 +214,35 @@ def __repr__(self) -> str: return f"Assignment({self.identifier!r}, {self.expression})" +@dataclass +class TentativeAssignment(ASTNode): + """ + Represents the MIGHT IS operator (tentative/hypothesis binding). + + Semantic: x might is y → v_x ← v_y with clarity_type=TENTATIVE + + This creates a binding that is explicitly marked as a hypothesis, + not yet verified. The program can check for tentative bindings and + refuse to execute certain operations until intent is clarified. + + From the Communication Clarity Framework: + - Treats the binding as a hypothesis, not a fact + - Forces intent to be externalized before execution + - Prevents reinforcing the false belief that implication was sufficient + + Example: + count might is 0 # Hypothesis: count starts at zero + mode might is "strict" # Hypothesis: we want strict mode + """ + + identifier: str + expression: ASTNode + hypothesis: Optional[str] = None # Optional description of the hypothesis + + def __repr__(self) -> str: + return f"TentativeAssignment({self.identifier!r}, {self.expression}, hypothesis={self.hypothesis!r})" + + @dataclass class IndexedAssignment(ASTNode): """ @@ -354,14 +383,16 @@ class Interrogative(ASTNode): - WHERE: Spatial position in semantic space - WHY: Causal direction (gradient) - HOW: Transformation/process + - ASSUMES: Hidden variables/assumptions (clarity framework) Example: what is x why is change how is convergence + assumes is x """ - interrogative: str # "who", "what", "when", "where", "why", "how" + interrogative: str # "who", "what", "when", "where", "why", "how", "assumes" expression: ASTNode def __repr__(self) -> str: @@ -676,12 +707,17 @@ def parse_statement(self) -> Optional[ASTNode]: return Continue() # Assignment (identifier IS expression) or IndexedAssignment (identifier[idx] IS expression) + # or TentativeAssignment (identifier MIGHT IS expression) if token.type == TokenType.IDENTIFIER: # Look ahead for IS token (simple assignment) next_token = self.peek_token() if next_token and next_token.type == TokenType.IS: return self.parse_assignment() + # Look ahead for MIGHT IS (tentative assignment) + if next_token and next_token.type == TokenType.MIGHT: + return self.parse_tentative_assignment() + # Look ahead for LBRACKET (potential indexed assignment) if next_token and next_token.type == TokenType.LBRACKET: return self.parse_potential_indexed_assignment() @@ -721,6 +757,37 @@ def parse_assignment(self) -> Assignment: return Assignment(identifier, expression) + def parse_tentative_assignment(self) -> "TentativeAssignment": + """ + Parse a tentative assignment statement. + + Grammar: identifier MIGHT IS expression + + This creates a hypothesis binding that must be clarified before + certain operations can proceed. From the Communication Clarity Framework: + - Treats the binding as a hypothesis, not a fact + - Forces intent to be externalized + - Prevents silent execution on unverified assumptions + """ + # Get identifier + id_token = self.expect(TokenType.IDENTIFIER) + identifier = id_token.value + + # Expect MIGHT token + self.expect(TokenType.MIGHT) + + # Expect IS token + self.expect(TokenType.IS) + + # Parse expression + expression = self.parse_expression() + + # Consume optional newline + if self.current_token() and self.current_token().type == TokenType.NEWLINE: + self.advance() + + return TentativeAssignment(identifier, expression) + def parse_potential_indexed_assignment(self) -> ASTNode: """ Parse potential indexed assignment (identifier[idx] IS expression). @@ -1116,7 +1183,7 @@ def parse_interrogative(self) -> Interrogative: """ Parse an interrogative operator. - Grammar: (WHO | WHAT | WHEN | WHERE | WHY | HOW | WAS | CHANGE | STATUS | TREND) (IS)? primary + Grammar: (WHO | WHAT | WHEN | WHERE | WHY | HOW | WAS | CHANGE | STATUS | TREND | ASSUMES) (IS)? primary Example: what x @@ -1126,6 +1193,7 @@ def parse_interrogative(self) -> Interrogative: change is x (delta/difference) status is x (alias for how) trend is x (trajectory analysis) + assumes is x (hidden variables - clarity framework) """ # Get interrogative type token = self.current_token() @@ -1142,6 +1210,8 @@ def parse_interrogative(self) -> Interrogative: # Interrogative aliases TokenType.STATUS: "status", TokenType.TREND: "trend", + # Clarity operators + TokenType.ASSUMES: "assumes", } interrogative = interrogative_map[token.type] self.advance() @@ -1512,6 +1582,8 @@ def parse_primary(self) -> ASTNode: # Interrogative aliases TokenType.STATUS, TokenType.TREND, + # Clarity operators + TokenType.ASSUMES, ): return self.parse_interrogative() diff --git a/src/eigenscript/runtime/__init__.py b/src/eigenscript/runtime/__init__.py index 1ddaca7..70ba449 100644 --- a/src/eigenscript/runtime/__init__.py +++ b/src/eigenscript/runtime/__init__.py @@ -4,9 +4,34 @@ This module handles runtime state management including: - Framework Strength measurement - Convergence detection +- Communication clarity tracking - Built-in functions """ from eigenscript.runtime.framework_strength import FrameworkStrengthTracker +from eigenscript.runtime.clarity import ( + ClarityType, + ClarityState, + ClarityTracker, + ClarityExplainer, + AmbiguityResolver, + Assumption, + detect_assumptions, + ActiveListener, + DialogueManager, + InteractiveClarifier, +) -__all__ = ["FrameworkStrengthTracker"] +__all__ = [ + "FrameworkStrengthTracker", + "ClarityType", + "ClarityState", + "ClarityTracker", + "ClarityExplainer", + "AmbiguityResolver", + "Assumption", + "detect_assumptions", + "ActiveListener", + "DialogueManager", + "InteractiveClarifier", +] diff --git a/src/eigenscript/runtime/clarity.py b/src/eigenscript/runtime/clarity.py new file mode 100644 index 0000000..f0ea013 --- /dev/null +++ b/src/eigenscript/runtime/clarity.py @@ -0,0 +1,1357 @@ +""" +Communication Clarity Framework for EigenScript. + +This module implements hypothesis-driven communication semantics where: +- Clarity is required before execution (not optional) +- Hidden variables are made explicit +- Intent is treated as falsifiable hypothesis +- Agency remains with the speaker (programmer) + +The framework applies the same rigor to code that EigenScript applies +to computation: programs must pass clarity checks before execution, +just as they must pass convergence checks. + +Key Principles: + 1. Questions and statements both assign objectives (create work) + 2. Implication is not universal (hidden variables cause failure) + 3. Clarification is hypothesis testing (not social correction) + 4. Withhold inference intentionally (don't collapse ambiguity silently) + 5. Control remains with the speaker (present options, don't decide) + 6. Speakers discover their own intent (force internal alignment) + +Clarity Types (parallel to geometric signatures): + - Ambiguous: Multiple valid interpretations exist (||c||² < 0, spacelike) + - Tentative: Hypothesis stated but unverified (||c||² = 0, lightlike) + - Clarified: Intent verified, ready for execution (||c||² > 0, timelike) +""" + +import sys +import numpy as np +from typing import List, Dict, Optional, Set, Tuple, Any +from dataclasses import dataclass, field +from enum import Enum + + +class ClarityType(Enum): + """ + Classification of intent clarity (parallel to geometric signatures). + + Maps to the communication framework: + - AMBIGUOUS: Hidden variables present, multiple interpretations possible + - TENTATIVE: Hypothesis stated but not yet confirmed + - CLARIFIED: Intent explicit and verified, safe to execute + """ + + AMBIGUOUS = "ambiguous" # Spacelike: exploring, unstable + TENTATIVE = "tentative" # Lightlike: at boundary + CLARIFIED = "clarified" # Timelike: converged, stable + + +@dataclass +class Assumption: + """ + Represents an implicit assumption in code. + + Assumptions are hidden variables that haven't been made explicit. + The clarity framework detects and surfaces these for verification. + """ + + name: str # What is being assumed + source: str # Where the assumption originates (variable, operation, etc.) + context: str # Additional context about why this is an assumption + line: Optional[int] = None # Source line if available + + def __repr__(self) -> str: + loc = f" (line {self.line})" if self.line else "" + return f"Assumption({self.name!r} from {self.source!r}{loc})" + + +@dataclass +class ClarityState: + """ + Tracks the clarity state of a value or binding. + + Each binding in EigenScript can have an associated clarity state + that tracks whether its intent has been verified. + """ + + clarity_type: ClarityType = ClarityType.AMBIGUOUS + assumptions: List[Assumption] = field(default_factory=list) + verified_at: Optional[int] = None # Iteration when clarified + hypothesis: Optional[str] = None # The stated hypothesis if tentative + + def is_clear(self) -> bool: + """Check if this state is clarified (safe to execute).""" + return self.clarity_type == ClarityType.CLARIFIED + + def is_tentative(self) -> bool: + """Check if this state is a hypothesis awaiting verification.""" + return self.clarity_type == ClarityType.TENTATIVE + + def is_ambiguous(self) -> bool: + """Check if this state has unresolved ambiguity.""" + return self.clarity_type == ClarityType.AMBIGUOUS + + def has_assumptions(self) -> bool: + """Check if there are unverified assumptions.""" + return len(self.assumptions) > 0 + + +class ClarityTracker: + """ + Tracks communication clarity throughout program execution. + + Analogous to FrameworkStrengthTracker, but measures semantic + clarity rather than computational convergence. + + The tracker monitors: + - Hidden variables (assumptions not made explicit) + - Ambiguous bindings (values with multiple interpretations) + - Tentative hypotheses (stated but unverified intent) + - Clarified state (verified, ready for execution) + + Example: + >>> tracker = ClarityTracker() + >>> tracker.register_binding("x", tentative=True, hypothesis="x is a counter") + >>> tracker.get_clarity("x") + ClarityState(clarity_type=TENTATIVE, ...) + >>> tracker.clarify("x") + >>> tracker.is_clarified("x") + True + """ + + def __init__(self): + """Initialize the clarity tracker.""" + self.bindings: Dict[str, ClarityState] = {} + self.global_assumptions: List[Assumption] = [] + self.clarification_history: List[Tuple[str, int]] = [] + self.iteration: int = 0 + + def register_binding( + self, + name: str, + tentative: bool = False, + hypothesis: Optional[str] = None, + assumptions: Optional[List[Assumption]] = None, + ) -> ClarityState: + """ + Register a new binding with its clarity state. + + Args: + name: Variable name + tentative: Whether this is a tentative (might is) binding + hypothesis: The stated hypothesis for tentative bindings + assumptions: Any detected assumptions + + Returns: + The created ClarityState + """ + if tentative: + clarity_type = ClarityType.TENTATIVE + elif assumptions: + clarity_type = ClarityType.AMBIGUOUS + else: + clarity_type = ClarityType.CLARIFIED + + state = ClarityState( + clarity_type=clarity_type, + assumptions=assumptions or [], + hypothesis=hypothesis, + verified_at=( + self.iteration if clarity_type == ClarityType.CLARIFIED else None + ), + ) + + self.bindings[name] = state + return state + + def register_assumption( + self, + name: str, + source: str, + context: str, + line: Optional[int] = None, + binding: Optional[str] = None, + ) -> Assumption: + """ + Register a detected assumption (hidden variable). + + Args: + name: What is being assumed + source: Where the assumption originates + context: Why this is considered an assumption + line: Source line number if available + binding: Associated binding name, if any + + Returns: + The created Assumption + """ + assumption = Assumption(name=name, source=source, context=context, line=line) + + if binding and binding in self.bindings: + self.bindings[binding].assumptions.append(assumption) + self.bindings[binding].clarity_type = ClarityType.AMBIGUOUS + else: + self.global_assumptions.append(assumption) + + return assumption + + def clarify(self, name: str) -> bool: + """ + Mark a binding as clarified (intent verified). + + Args: + name: Variable name to clarify + + Returns: + True if successfully clarified, False if not found + """ + if name not in self.bindings: + return False + + state = self.bindings[name] + state.clarity_type = ClarityType.CLARIFIED + state.verified_at = self.iteration + self.clarification_history.append((name, self.iteration)) + return True + + def get_clarity(self, name: str) -> Optional[ClarityState]: + """ + Get the clarity state of a binding. + + Args: + name: Variable name + + Returns: + ClarityState or None if not found + """ + return self.bindings.get(name) + + def is_clarified(self, name: str) -> bool: + """ + Check if a binding is clarified. + + Args: + name: Variable name + + Returns: + True if clarified, False otherwise + """ + state = self.bindings.get(name) + return state is not None and state.is_clear() + + def get_assumptions(self, name: Optional[str] = None) -> List[Assumption]: + """ + Get assumptions for a binding or all global assumptions. + + Args: + name: Variable name, or None for global assumptions + + Returns: + List of assumptions + """ + if name is None: + return self.global_assumptions.copy() + + state = self.bindings.get(name) + if state: + return state.assumptions.copy() + return [] + + def get_unresolved_bindings(self) -> List[str]: + """ + Get all bindings that are not yet clarified. + + Returns: + List of binding names that need clarification + """ + return [name for name, state in self.bindings.items() if not state.is_clear()] + + def compute_clarity_score(self) -> float: + """ + Compute overall clarity score (0.0 to 1.0). + + Similar to Framework Strength, measures the degree of + semantic clarity in the current program state. + + Returns: + Clarity score between 0.0 (fully ambiguous) and 1.0 (fully clarified) + """ + if not self.bindings: + return 1.0 # No bindings = trivially clear + + clarified = sum(1 for s in self.bindings.values() if s.is_clear()) + return clarified / len(self.bindings) + + def has_clarity(self, threshold: float = 0.95) -> bool: + """ + Check if clarity score meets threshold. + + Args: + threshold: Minimum clarity score (default: 0.95) + + Returns: + True if clarity score >= threshold + """ + return self.compute_clarity_score() >= threshold + + def advance_iteration(self) -> None: + """Advance the iteration counter.""" + self.iteration += 1 + + def reset(self) -> None: + """Reset the tracker to initial state.""" + self.bindings.clear() + self.global_assumptions.clear() + self.clarification_history.clear() + self.iteration = 0 + + +class ClarityExplainer: + """ + Provides human-readable explanations of clarity evaluations. + + Follows the agency-preserving principle: presents options rather + than dictating corrections. Shows WHY clarity is insufficient + without telling the programmer what they "should" do. + + Example output: + `clarified` → FALSE + └─ binding 'x' is TENTATIVE + └─ hypothesis: "x represents user count" + └─ possible interpretations: + 1. x as total users (all time) + 2. x as active users (current session) + 3. x as unique users (deduplicated) + └─ which interpretation is intended? + """ + + def __init__(self, enabled: bool = False, use_color: bool = True): + """ + Initialize the explainer. + + Args: + enabled: Whether explain mode is active + use_color: Whether to use ANSI color codes + """ + self.enabled = enabled + self.use_color = use_color and self._supports_color() + + def _supports_color(self) -> bool: + """Check if the terminal supports color output.""" + import os + + if not hasattr(sys.stderr, "isatty") or not sys.stderr.isatty(): + return False + if os.environ.get("NO_COLOR"): + return False + return True + + def _color(self, text: str, color: str) -> str: + """Apply ANSI color to text if colors are enabled.""" + if not self.use_color: + return text + + colors = { + "green": "\033[32m", + "red": "\033[31m", + "yellow": "\033[33m", + "cyan": "\033[36m", + "magenta": "\033[35m", + "bold": "\033[1m", + "reset": "\033[0m", + } + + return f"{colors.get(color, '')}{text}{colors['reset']}" + + def explain_clarified( + self, + result: bool, + clarity_score: float, + threshold: float, + unresolved: List[str], + ) -> None: + """ + Explain the evaluation of the `clarified` predicate. + + Args: + result: Whether the predicate evaluated to True + clarity_score: The current clarity score + threshold: The clarity threshold + unresolved: List of unresolved binding names + """ + if not self.enabled: + return + + result_str = ( + self._color("TRUE", "green") if result else self._color("FALSE", "red") + ) + + print(f"`clarified` → {result_str}", file=sys.stderr) + print( + f" └─ clarity_score = {clarity_score:.4f} (threshold: {threshold})", + file=sys.stderr, + ) + + if not result and unresolved: + print(f" └─ unresolved bindings:", file=sys.stderr) + for name in unresolved[:5]: # Show first 5 + print(f" • {self._color(name, 'yellow')}", file=sys.stderr) + if len(unresolved) > 5: + print(f" ... and {len(unresolved) - 5} more", file=sys.stderr) + + def explain_ambiguous( + self, + result: bool, + binding: str, + state: Optional[ClarityState], + interpretations: Optional[List[str]] = None, + ) -> None: + """ + Explain the evaluation of the `ambiguous` predicate. + + Presents possible interpretations without dictating which is "correct". + + Args: + result: Whether the predicate evaluated to True + binding: The binding being checked + state: The clarity state of the binding + interpretations: Possible interpretations (if known) + """ + if not self.enabled: + return + + result_str = ( + self._color("TRUE", "green") if result else self._color("FALSE", "red") + ) + + print(f"`ambiguous` → {result_str}", file=sys.stderr) + print(f" └─ binding: {self._color(binding, 'cyan')}", file=sys.stderr) + + if state: + print( + f" └─ clarity_type: {state.clarity_type.value}", + file=sys.stderr, + ) + + if state.assumptions: + print(f" └─ detected assumptions:", file=sys.stderr) + for assumption in state.assumptions[:3]: + print( + f" • {assumption.name}: {assumption.context}", + file=sys.stderr, + ) + + if interpretations: + print(f" └─ possible interpretations:", file=sys.stderr) + for i, interp in enumerate(interpretations, 1): + print(f" {i}. {interp}", file=sys.stderr) + print( + f" └─ {self._color('which interpretation is intended?', 'yellow')}", + file=sys.stderr, + ) + + def explain_assumed( + self, + result: bool, + assumptions: List[Assumption], + ) -> None: + """ + Explain the evaluation of the `assumed` predicate. + + Shows what hidden variables have been detected. + + Args: + result: Whether the predicate evaluated to True + assumptions: List of detected assumptions + """ + if not self.enabled: + return + + result_str = ( + self._color("TRUE", "green") if result else self._color("FALSE", "red") + ) + + print(f"`assumed` → {result_str}", file=sys.stderr) + print(f" └─ assumption_count = {len(assumptions)}", file=sys.stderr) + + if assumptions: + print(f" └─ hidden variables detected:", file=sys.stderr) + for assumption in assumptions[:5]: + loc = f" (line {assumption.line})" if assumption.line else "" + print( + f" • {self._color(assumption.name, 'magenta')}: " + f"{assumption.context}{loc}", + file=sys.stderr, + ) + if len(assumptions) > 5: + print(f" ... and {len(assumptions) - 5} more", file=sys.stderr) + + def explain_tentative( + self, + result: bool, + binding: str, + hypothesis: Optional[str], + ) -> None: + """ + Explain the evaluation of the `tentative` predicate. + + Args: + result: Whether the predicate evaluated to True + binding: The binding being checked + hypothesis: The stated hypothesis + """ + if not self.enabled: + return + + result_str = ( + self._color("TRUE", "green") if result else self._color("FALSE", "red") + ) + + print(f"`tentative` → {result_str}", file=sys.stderr) + print(f" └─ binding: {self._color(binding, 'cyan')}", file=sys.stderr) + + if hypothesis: + print(f' └─ hypothesis: "{hypothesis}"', file=sys.stderr) + print( + f" └─ {self._color('awaiting verification', 'yellow')}", + file=sys.stderr, + ) + + +class AmbiguityResolver: + """ + Provides agency-preserving ambiguity resolution. + + Instead of guessing what the programmer meant, presents options + and returns control to them. This follows the principle: + "True intellect is knowing you could be wrong—and refusing + to act as if you aren't." + + Example: + resolver = AmbiguityResolver() + options = resolver.generate_options("x", context) + # Returns options like: + # [ + # "x as numeric value (apply arithmetic)", + # "x as string identifier (apply concatenation)", + # "x as function reference (apply invocation)", + # ] + """ + + def __init__(self): + """Initialize the resolver.""" + self.type_interpretations = { + "numeric": "apply arithmetic operations", + "string": "apply string operations (concatenation, slicing)", + "function": "invoke as function", + "list": "apply sequence operations (indexing, iteration)", + "boolean": "use in conditional logic", + } + + def generate_options( + self, + name: str, + possible_types: Optional[List[str]] = None, + context: Optional[str] = None, + ) -> List[str]: + """ + Generate possible interpretations for an ambiguous binding. + + Args: + name: Variable name + possible_types: List of possible types + context: Additional context + + Returns: + List of interpretation strings + """ + if possible_types is None: + possible_types = list(self.type_interpretations.keys()) + + options = [] + for ptype in possible_types: + if ptype in self.type_interpretations: + options.append( + f"{name} as {ptype} ({self.type_interpretations[ptype]})" + ) + + if not options: + options.append(f"{name} with unspecified intent") + + return options + + def format_resolution_prompt( + self, + name: str, + options: List[str], + ) -> str: + """ + Format an agency-preserving resolution prompt. + + Returns a message that presents options without dictating. + + Args: + name: Variable name + options: List of possible interpretations + + Returns: + Formatted prompt string + """ + lines = [ + f"Ambiguity detected for '{name}'.", + "Possible interpretations:", + ] + + for i, option in enumerate(options, 1): + lines.append(f" {i}. {option}") + + lines.append("") + lines.append("Which interpretation is intended?") + + return "\n".join(lines) + + +def detect_assumptions( + expression_type: str, + context: Dict[str, Any], +) -> List[Assumption]: + """ + Detect implicit assumptions in an expression. + + This is a heuristic-based detector that identifies common + sources of hidden variables in code. + + Args: + expression_type: Type of expression being analyzed + context: Context dictionary with relevant information + + Returns: + List of detected assumptions + """ + assumptions = [] + + # Type coercion assumptions + if expression_type == "binary_op": + left_type = context.get("left_type") + right_type = context.get("right_type") + if left_type != right_type: + assumptions.append( + Assumption( + name="type_compatibility", + source=f"{left_type} + {right_type}", + context=f"Assuming {left_type} and {right_type} can be combined", + ) + ) + + # Division by zero assumptions + if expression_type == "division": + if not context.get("divisor_checked"): + assumptions.append( + Assumption( + name="non_zero_divisor", + source="division operation", + context="Assuming divisor is not zero", + ) + ) + + # Index bounds assumptions + if expression_type == "index": + if not context.get("bounds_checked"): + assumptions.append( + Assumption( + name="valid_index", + source="index operation", + context="Assuming index is within bounds", + ) + ) + + # Null/undefined assumptions + if expression_type == "member_access": + if not context.get("null_checked"): + assumptions.append( + Assumption( + name="non_null", + source="member access", + context="Assuming object is not null", + ) + ) + + # Function purity assumptions + if expression_type == "function_call": + if context.get("has_side_effects") is None: + assumptions.append( + Assumption( + name="function_behavior", + source=context.get("function_name", "function"), + context="Side effects and return behavior unspecified", + ) + ) + + return assumptions + + +class ActiveListener: + """ + Makes EigenScript an active listener that detects ambiguity and seeks clarification. + + The ActiveListener embodies the communication framework principle: + "You treat inferred intent as a hypothesis, not a fact." + + Instead of silently proceeding on assumptions, the ActiveListener: + 1. Detects when multiple interpretations are possible + 2. Pauses execution to seek clarification + 3. Presents options without deciding for the speaker + 4. Records the clarified intent for future reference + + Example: + listener = ActiveListener(interactive=True) + + # When division is attempted: + # > Clarification needed for division operation: + # > The divisor 'x' could be zero. + # > 1. Proceed (assuming x ≠ 0) + # > 2. Add explicit check + # > 3. Provide default value + # > Which approach? _ + """ + + def __init__( + self, + interactive: bool = False, + strict: bool = False, + use_color: bool = True, + ): + """ + Initialize the ActiveListener. + + Args: + interactive: If True, prompt user for clarification + strict: If True, refuse to proceed on any ambiguity + use_color: Whether to use ANSI color codes + """ + self.interactive = interactive + self.strict = strict + self.use_color = use_color and self._supports_color() + self.clarification_log: List[Dict[str, Any]] = [] + self.pending_clarifications: List[Dict[str, Any]] = [] + + def _supports_color(self) -> bool: + """Check if the terminal supports color output.""" + import os + + if not hasattr(sys.stdout, "isatty") or not sys.stdout.isatty(): + return False + if os.environ.get("NO_COLOR"): + return False + return True + + def _color(self, text: str, color: str) -> str: + """Apply ANSI color to text if colors are enabled.""" + if not self.use_color: + return text + + colors = { + "green": "\033[32m", + "red": "\033[31m", + "yellow": "\033[33m", + "cyan": "\033[36m", + "magenta": "\033[35m", + "bold": "\033[1m", + "dim": "\033[2m", + "reset": "\033[0m", + } + + return f"{colors.get(color, '')}{text}{colors['reset']}" + + def detect_ambiguity( + self, + operation: str, + context: Dict[str, Any], + ) -> Optional[Dict[str, Any]]: + """ + Detect if an operation has ambiguous intent. + + Args: + operation: Type of operation (e.g., "division", "index", "coercion") + context: Context about the operation + + Returns: + Ambiguity info dict if ambiguous, None otherwise + """ + ambiguity = None + + if operation == "division": + divisor = context.get("divisor") + divisor_name = context.get("divisor_name", "divisor") + # Can't statically prove non-zero + if not context.get("proven_non_zero"): + ambiguity = { + "type": "division_safety", + "message": f"The divisor '{divisor_name}' might be zero.", + "options": [ + ("proceed", f"Proceed (assuming {divisor_name} ≠ 0)"), + ("check", "Add explicit zero check"), + ("default", "Return default value if zero"), + ], + "context": context, + } + + elif operation == "index": + index_name = context.get("index_name", "index") + list_name = context.get("list_name", "list") + if not context.get("proven_in_bounds"): + ambiguity = { + "type": "index_safety", + "message": f"The index '{index_name}' might be out of bounds for '{list_name}'.", + "options": [ + ("proceed", "Proceed (assuming valid index)"), + ("check", "Add bounds check"), + ("clamp", "Clamp to valid range"), + ("wrap", "Wrap around (modulo length)"), + ], + "context": context, + } + + elif operation == "coercion": + from_type = context.get("from_type", "unknown") + to_type = context.get("to_type", "unknown") + ambiguity = { + "type": "type_coercion", + "message": f"Converting {from_type} to {to_type} - intent unclear.", + "options": [ + ("strict", f"Strict conversion (fail if not exactly {to_type})"), + ("lenient", "Lenient conversion (best effort)"), + ("explicit", "Require explicit conversion"), + ], + "context": context, + } + + elif operation == "null_access": + name = context.get("name", "value") + ambiguity = { + "type": "null_safety", + "message": f"'{name}' might be null/undefined.", + "options": [ + ("proceed", f"Proceed (assuming {name} is defined)"), + ("check", "Add null check"), + ("default", "Use default value if null"), + ("optional", "Return null (optional chaining)"), + ], + "context": context, + } + + elif operation == "intent": + name = context.get("name", "value") + possible_meanings = context.get("meanings", []) + if possible_meanings: + options = [(f"meaning_{i}", m) for i, m in enumerate(possible_meanings)] + options.append(("other", "Something else (please specify)")) + ambiguity = { + "type": "semantic_ambiguity", + "message": f"Multiple interpretations possible for '{name}'.", + "options": options, + "context": context, + } + + return ambiguity + + def request_clarification( + self, + ambiguity: Dict[str, Any], + ) -> str: + """ + Request clarification from the user. + + In interactive mode, prompts and waits for input. + In non-interactive mode, logs and returns default. + + Args: + ambiguity: Ambiguity info dict from detect_ambiguity + + Returns: + The chosen option key + """ + message = ambiguity["message"] + options = ambiguity["options"] + + if self.interactive: + # Print the clarification request + print( + f"\n{self._color('⚡ Clarification needed:', 'yellow')}", + file=sys.stderr, + ) + print(f" {message}", file=sys.stderr) + print(file=sys.stderr) + + for i, (key, desc) in enumerate(options, 1): + print(f" {self._color(str(i), 'cyan')}. {desc}", file=sys.stderr) + + print(file=sys.stderr) + + # Get user input + while True: + try: + choice = input( + f" {self._color('Which approach?', 'bold')} [1-{len(options)}]: " + ) + choice_idx = int(choice) - 1 + if 0 <= choice_idx < len(options): + chosen_key = options[choice_idx][0] + self._log_clarification(ambiguity, chosen_key) + return chosen_key + print( + f" Please enter a number between 1 and {len(options)}", + file=sys.stderr, + ) + except ValueError: + print( + f" Please enter a number between 1 and {len(options)}", + file=sys.stderr, + ) + except EOFError: + # Non-interactive fallback + return options[0][0] + else: + # Non-interactive: log and return first option (default) + self.pending_clarifications.append(ambiguity) + return options[0][0] + + def _log_clarification( + self, + ambiguity: Dict[str, Any], + choice: str, + ) -> None: + """Log a clarification for future reference.""" + self.clarification_log.append( + { + "ambiguity_type": ambiguity["type"], + "message": ambiguity["message"], + "choice": choice, + "context": ambiguity.get("context", {}), + } + ) + + def get_pending_clarifications(self) -> List[Dict[str, Any]]: + """Get list of pending clarifications that weren't resolved interactively.""" + return self.pending_clarifications.copy() + + def clear_pending(self) -> None: + """Clear pending clarifications.""" + self.pending_clarifications.clear() + + +class DialogueManager: + """ + Manages the dialogue between EigenScript and the user. + + Implements the "active speaker" pattern where EigenScript: + 1. Explains its reasoning when asked + 2. Presents options clearly + 3. Preserves user agency + 4. Never assumes understanding + + From the communication framework: + "Control remains with the speaker - you do not decide intent for them." + + Example: + dm = DialogueManager() + + # When explaining a decision: + dm.explain_reasoning( + decision="Using strict mode for parsing", + because="The 'mode' binding was clarified as 'strict'", + alternatives=["lenient", "auto-detect"] + ) + # Output: + # I'm using strict mode for parsing. + # Reason: The 'mode' binding was clarified as 'strict'. + # Alternatives considered: lenient, auto-detect + """ + + def __init__(self, use_color: bool = True, verbose: bool = False): + """ + Initialize the DialogueManager. + + Args: + use_color: Whether to use ANSI color codes + verbose: Whether to provide detailed explanations + """ + self.use_color = use_color and self._supports_color() + self.verbose = verbose + self.dialogue_history: List[Dict[str, Any]] = [] + + def _supports_color(self) -> bool: + """Check if the terminal supports color output.""" + import os + + if not hasattr(sys.stdout, "isatty") or not sys.stdout.isatty(): + return False + if os.environ.get("NO_COLOR"): + return False + return True + + def _color(self, text: str, color: str) -> str: + """Apply ANSI color to text if colors are enabled.""" + if not self.use_color: + return text + + colors = { + "green": "\033[32m", + "red": "\033[31m", + "yellow": "\033[33m", + "cyan": "\033[36m", + "magenta": "\033[35m", + "bold": "\033[1m", + "dim": "\033[2m", + "reset": "\033[0m", + } + + return f"{colors.get(color, '')}{text}{colors['reset']}" + + def ask_for_intent( + self, + what: str, + options: List[Tuple[str, str]], + context: Optional[str] = None, + ) -> str: + """ + Ask the user to clarify their intent. + + Agency-preserving: presents options without judgment. + + Args: + what: What needs clarification + options: List of (key, description) tuples + context: Optional context explaining why clarification is needed + + Returns: + The chosen option key + """ + print(f"\n{self._color('?', 'cyan')} {what}", file=sys.stderr) + + if context: + print(f" {self._color('Context:', 'dim')} {context}", file=sys.stderr) + + print(file=sys.stderr) + for i, (key, desc) in enumerate(options, 1): + print(f" {self._color(str(i), 'cyan')}. {desc}", file=sys.stderr) + + print(file=sys.stderr) + + while True: + try: + choice = input( + f" {self._color('Your choice', 'bold')} [1-{len(options)}]: " + ) + choice_idx = int(choice) - 1 + if 0 <= choice_idx < len(options): + chosen_key = options[choice_idx][0] + self._log_dialogue("intent_request", what, chosen_key) + return chosen_key + print( + f" Please enter a number between 1 and {len(options)}", + file=sys.stderr, + ) + except ValueError: + print( + f" Please enter a number between 1 and {len(options)}", + file=sys.stderr, + ) + except EOFError: + return options[0][0] + + def explain_reasoning( + self, + decision: str, + because: str, + alternatives: Optional[List[str]] = None, + ) -> None: + """ + Explain the reasoning behind a decision. + + Makes the interpreter's "thinking" visible. + + Args: + decision: What was decided + because: Why it was decided + alternatives: Other options that were considered + """ + if not self.verbose: + return + + print(f"\n{self._color('→', 'green')} {decision}", file=sys.stderr) + print(f" {self._color('Because:', 'dim')} {because}", file=sys.stderr) + + if alternatives: + alts = ", ".join(alternatives) + print(f" {self._color('Alternatives:', 'dim')} {alts}", file=sys.stderr) + + self._log_dialogue("explanation", decision, because) + + def confirm_understanding( + self, + statement: str, + understood_as: str, + ) -> bool: + """ + Confirm understanding of a statement. + + From the framework: "You test [inferred intent] by offering explicit options." + + Args: + statement: The original statement + understood_as: How it was interpreted + + Returns: + True if understanding confirmed, False otherwise + """ + print( + f"\n{self._color('?', 'yellow')} Confirming understanding:", file=sys.stderr + ) + print(f" You said: {self._color(statement, 'cyan')}", file=sys.stderr) + print(f" I understood: {self._color(understood_as, 'green')}", file=sys.stderr) + print(file=sys.stderr) + + try: + response = ( + input(f" {self._color('Is this correct?', 'bold')} [Y/n]: ") + .strip() + .lower() + ) + confirmed = response in ("", "y", "yes") + self._log_dialogue("confirmation", statement, confirmed) + return confirmed + except EOFError: + return True + + def present_options( + self, + situation: str, + options: List[Tuple[str, str, str]], # (key, short_desc, long_desc) + ) -> None: + """ + Present options without asking for a choice (informational). + + Preserves agency by showing possibilities without forcing a decision. + + Args: + situation: The current situation + options: List of (key, short_desc, long_desc) tuples + """ + print(f"\n{self._color('ℹ', 'cyan')} {situation}", file=sys.stderr) + print(f" Possible approaches:", file=sys.stderr) + print(file=sys.stderr) + + for key, short_desc, long_desc in options: + print(f" • {self._color(short_desc, 'bold')}", file=sys.stderr) + if self.verbose and long_desc: + print(f" {self._color(long_desc, 'dim')}", file=sys.stderr) + + self._log_dialogue("options_presented", situation, [o[0] for o in options]) + + def acknowledge_clarification( + self, + what: str, + clarified_as: str, + ) -> None: + """ + Acknowledge a clarification from the user. + + Shows the user their clarification was received. + + Args: + what: What was clarified + clarified_as: The clarified value/intent + """ + print( + f"{self._color('✓', 'green')} Understood: {what} → {self._color(clarified_as, 'cyan')}", + file=sys.stderr, + ) + self._log_dialogue("acknowledgment", what, clarified_as) + + def _log_dialogue( + self, + dialogue_type: str, + subject: str, + response: Any, + ) -> None: + """Log a dialogue exchange.""" + self.dialogue_history.append( + { + "type": dialogue_type, + "subject": subject, + "response": response, + } + ) + + def get_dialogue_history(self) -> List[Dict[str, Any]]: + """Get the full dialogue history.""" + return self.dialogue_history.copy() + + +class InteractiveClarifier: + """ + Combines ActiveListener and DialogueManager for interactive clarification. + + This is the main interface for making EigenScript an active participant + in the communication process. + + Example: + clarifier = InteractiveClarifier(interactive=True) + + # During execution, when ambiguity is detected: + result = clarifier.clarify_binding( + name="mode", + value="strict", + possible_meanings=[ + "Strict type checking", + "Strict parsing rules", + "Strict security settings", + ] + ) + # Prompts user and returns clarified meaning + """ + + def __init__( + self, + interactive: bool = False, + verbose: bool = False, + use_color: bool = True, + ): + """ + Initialize the InteractiveClarifier. + + Args: + interactive: If True, prompt user for clarification + verbose: If True, explain reasoning + use_color: Whether to use ANSI color codes + """ + self.interactive = interactive + self.listener = ActiveListener(interactive=interactive, use_color=use_color) + self.dialogue = DialogueManager(use_color=use_color, verbose=verbose) + + def clarify_binding( + self, + name: str, + value: Any, + possible_meanings: Optional[List[str]] = None, + ) -> Dict[str, Any]: + """ + Clarify the meaning/intent of a binding. + + Args: + name: Binding name + value: Current value + possible_meanings: List of possible interpretations + + Returns: + Dict with clarified meaning and confidence + """ + if not possible_meanings: + # No ambiguity if no alternatives + return {"meaning": str(value), "confidence": 1.0, "clarified": True} + + if self.interactive: + options = [(f"m{i}", m) for i, m in enumerate(possible_meanings)] + options.append(("other", "None of these / something else")) + + chosen = self.dialogue.ask_for_intent( + what=f"What does '{name}' = {value} mean?", + options=options, + context="Multiple interpretations are possible", + ) + + if chosen == "other": + # Ask for custom meaning + try: + custom = input(" Please specify: ") + self.dialogue.acknowledge_clarification(name, custom) + return {"meaning": custom, "confidence": 1.0, "clarified": True} + except EOFError: + return { + "meaning": str(value), + "confidence": 0.5, + "clarified": False, + } + + idx = int(chosen[1:]) + meaning = possible_meanings[idx] + self.dialogue.acknowledge_clarification(name, meaning) + return {"meaning": meaning, "confidence": 1.0, "clarified": True} + + else: + # Non-interactive: return first meaning with low confidence + return { + "meaning": possible_meanings[0] if possible_meanings else str(value), + "confidence": 0.5, + "clarified": False, + "pending_options": possible_meanings, + } + + def clarify_operation( + self, + operation: str, + context: Dict[str, Any], + ) -> Dict[str, Any]: + """ + Clarify how to handle an operation with potential issues. + + Args: + operation: Operation type (division, index, etc.) + context: Context about the operation + + Returns: + Dict with handling strategy + """ + ambiguity = self.listener.detect_ambiguity(operation, context) + + if not ambiguity: + return {"strategy": "proceed", "clarified": True} + + choice = self.listener.request_clarification(ambiguity) + + if self.interactive: + self.dialogue.acknowledge_clarification( + ambiguity["type"], + dict(ambiguity["options"]).get(choice, choice), + ) + + return { + "strategy": choice, + "clarified": self.interactive, + "ambiguity_type": ambiguity["type"], + } + + def should_pause(self, clarity_score: float, threshold: float = 0.95) -> bool: + """ + Determine if execution should pause for clarification. + + Args: + clarity_score: Current clarity score + threshold: Minimum acceptable clarity + + Returns: + True if should pause, False otherwise + """ + if not self.interactive: + return False + + return clarity_score < threshold + + def summarize_pending(self) -> str: + """ + Summarize pending clarifications. + + Returns: + Human-readable summary of what needs clarification + """ + pending = self.listener.get_pending_clarifications() + if not pending: + return "No pending clarifications." + + lines = [f"Pending clarifications ({len(pending)}):"] + for p in pending: + lines.append(f" • {p['message']}") + + return "\n".join(lines) diff --git a/tests/test_clarity.py b/tests/test_clarity.py new file mode 100644 index 0000000..37953cc --- /dev/null +++ b/tests/test_clarity.py @@ -0,0 +1,924 @@ +""" +Tests for EigenScript Communication Clarity Framework. + +The Clarity Framework implements hypothesis-driven communication semantics where: +- Clarity is required before execution (not optional) +- Hidden variables are made explicit +- Intent is treated as falsifiable hypothesis +- Agency remains with the speaker (programmer) + +Key Principles: + 1. Questions and statements both assign objectives + 2. Implication is not universal (hidden variables cause failure) + 3. Clarification is hypothesis testing + 4. Withhold inference intentionally + 5. Control remains with the speaker + 6. Speakers discover their own intent +""" + +import pytest +import textwrap +from eigenscript.lexer import Tokenizer +from eigenscript.parser import Parser +from eigenscript.evaluator import Interpreter +from eigenscript.builtins import decode_vector +from eigenscript.runtime.clarity import ( + ClarityType, + ClarityState, + ClarityTracker, + ClarityExplainer, + Assumption, + detect_assumptions, + ActiveListener, + DialogueManager, + InteractiveClarifier, +) + + +@pytest.fixture +def interpreter(): + """Create a fresh interpreter for each test.""" + return Interpreter() + + +@pytest.fixture +def clarity_tracker(): + """Create a fresh clarity tracker for each test.""" + return ClarityTracker() + + +def run_code(code: str, interpreter): + """Helper to run EigenScript code.""" + code = textwrap.dedent(code).strip() + tokenizer = Tokenizer(code) + tokens = tokenizer.tokenize() + parser = Parser(tokens) + ast = parser.parse() + return interpreter.evaluate(ast) + + +class TestClarityTracker: + """Tests for the ClarityTracker class.""" + + def test_initial_state(self, clarity_tracker): + """Test tracker starts with no bindings.""" + assert len(clarity_tracker.bindings) == 0 + assert ( + clarity_tracker.compute_clarity_score() == 1.0 + ) # No bindings = trivially clear + + def test_register_clarified_binding(self, clarity_tracker): + """Test registering a clarified binding.""" + state = clarity_tracker.register_binding("x", tentative=False) + assert state.clarity_type == ClarityType.CLARIFIED + assert clarity_tracker.is_clarified("x") + assert clarity_tracker.compute_clarity_score() == 1.0 + + def test_register_tentative_binding(self, clarity_tracker): + """Test registering a tentative (hypothesis) binding.""" + state = clarity_tracker.register_binding( + "x", tentative=True, hypothesis="x is a counter" + ) + assert state.clarity_type == ClarityType.TENTATIVE + assert not clarity_tracker.is_clarified("x") + assert state.hypothesis == "x is a counter" + assert clarity_tracker.compute_clarity_score() == 0.0 + + def test_clarify_tentative_binding(self, clarity_tracker): + """Test clarifying a tentative binding.""" + clarity_tracker.register_binding("x", tentative=True) + assert not clarity_tracker.is_clarified("x") + + success = clarity_tracker.clarify("x") + assert success + assert clarity_tracker.is_clarified("x") + assert clarity_tracker.compute_clarity_score() == 1.0 + + def test_clarify_nonexistent_binding(self, clarity_tracker): + """Test clarifying a binding that doesn't exist.""" + success = clarity_tracker.clarify("nonexistent") + assert not success + + def test_multiple_bindings_clarity_score(self, clarity_tracker): + """Test clarity score with multiple bindings.""" + clarity_tracker.register_binding("a", tentative=False) + clarity_tracker.register_binding("b", tentative=True) + clarity_tracker.register_binding("c", tentative=False) + + # 2 out of 3 clarified = 0.666... + score = clarity_tracker.compute_clarity_score() + assert abs(score - 2 / 3) < 0.01 + + def test_get_unresolved_bindings(self, clarity_tracker): + """Test getting list of unresolved bindings.""" + clarity_tracker.register_binding("a", tentative=False) + clarity_tracker.register_binding("b", tentative=True) + clarity_tracker.register_binding("c", tentative=True) + + unresolved = clarity_tracker.get_unresolved_bindings() + assert "b" in unresolved + assert "c" in unresolved + assert "a" not in unresolved + + def test_register_assumption(self, clarity_tracker): + """Test registering an assumption (hidden variable).""" + clarity_tracker.register_binding("x") + assumption = clarity_tracker.register_assumption( + name="non_zero", + source="division operation", + context="Assuming divisor is not zero", + binding="x", + ) + + assert assumption.name == "non_zero" + assert len(clarity_tracker.get_assumptions("x")) == 1 + state = clarity_tracker.get_clarity("x") + assert state.clarity_type == ClarityType.AMBIGUOUS + + def test_global_assumptions(self, clarity_tracker): + """Test global assumptions not tied to a binding.""" + assumption = clarity_tracker.register_assumption( + name="environment", + source="runtime", + context="Assuming production environment", + ) + + global_assumptions = clarity_tracker.get_assumptions() + assert len(global_assumptions) == 1 + assert global_assumptions[0].name == "environment" + + +class TestClarityPredicates: + """Tests for clarity predicates in the interpreter.""" + + def test_clarified_predicate_all_clear(self, interpreter): + """Test CLARIFIED returns 1.0 when all bindings are clarified.""" + code = """ + x is 10 + y is 20 + clarified + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1.0 + + def test_clarified_predicate_with_tentative(self, interpreter): + """Test CLARIFIED returns 0.0 when tentative bindings exist.""" + code = """ + x is 10 + y might is 20 + clarified + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 0.0 + + def test_clarified_after_clarify(self, interpreter): + """Test CLARIFIED returns 1.0 after clarifying tentative binding.""" + code = """ + x might is 10 + clarify of x + clarified + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1.0 + + def test_ambiguous_predicate_with_tentative(self, interpreter): + """Test AMBIGUOUS returns 1.0 when tentative bindings exist.""" + code = """ + x might is 10 + ambiguous + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1.0 + + def test_ambiguous_predicate_all_clear(self, interpreter): + """Test AMBIGUOUS returns 0.0 when all bindings are clarified.""" + code = """ + x is 10 + ambiguous + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 0.0 + + def test_explicit_predicate(self, interpreter): + """Test EXPLICIT predicate (opposite of ambiguous).""" + code = """ + x is 10 + y is 20 + explicit + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1.0 + + def test_clarity_score_numeric(self, interpreter): + """Test CLARITY returns numeric score.""" + code = """ + x is 10 + y might is 20 + z is 30 + clarity + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + # 2 out of 3 clarified = 0.666... + assert abs(value - 2 / 3) < 0.01 + + def test_cs_alias_for_clarity(self, interpreter): + """Test CS is alias for CLARITY.""" + code1 = """ + x is 10 + clarity + """ + code2 = """ + x is 10 + cs + """ + interpreter1 = Interpreter() + interpreter2 = Interpreter() + + result1 = run_code(code1, interpreter1) + result2 = run_code(code2, interpreter2) + + value1 = decode_vector(result1, interpreter1.space) + value2 = decode_vector(result2, interpreter2.space) + assert value1 == value2 + + +class TestTentativeAssignment: + """Tests for MIGHT IS (tentative assignment) syntax.""" + + def test_tentative_assignment_creates_binding(self, interpreter): + """Test MIGHT IS creates a binding with the value.""" + code = """ + x might is 42 + x + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 42 + + def test_tentative_assignment_marks_as_tentative(self, interpreter): + """Test MIGHT IS marks binding as tentative.""" + code = """ + x might is 42 + clarified + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 0.0 # Not clarified because x is tentative + + def test_clarify_tentative_assignment(self, interpreter): + """Test clarifying a tentative assignment.""" + code = """ + x might is 42 + clarify of x + clarified + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1.0 # Now clarified + + def test_multiple_tentative_assignments(self, interpreter): + """Test multiple tentative assignments.""" + code = """ + a might is 1 + b might is 2 + c is 3 + clarity + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + # 1 out of 3 clarified = 0.333... + assert abs(value - 1 / 3) < 0.01 + + +class TestClarifyOperator: + """Tests for CLARIFY OF operator.""" + + def test_clarify_returns_success(self, interpreter): + """Test CLARIFY OF returns 1.0 on success.""" + code = """ + x might is 10 + clarify of x + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1.0 + + def test_clarify_nonexistent_returns_failure(self, interpreter): + """Test CLARIFY OF returns 0.0 for nonexistent binding.""" + code = """ + clarify of nonexistent + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 0.0 + + def test_clarify_already_clarified(self, interpreter): + """Test CLARIFY OF on already clarified binding.""" + code = """ + x is 10 + clarify of x + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + # Should still succeed (idempotent) + assert value == 1.0 + + +class TestAssumesInterrogative: + """Tests for ASSUMES interrogative.""" + + def test_assumes_empty_for_clarified(self, interpreter): + """Test ASSUMES returns empty string for clarified binding.""" + code = """ + x is 10 + assumes is x + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == "" + + def test_assumes_on_tentative(self, interpreter): + """Test ASSUMES on a tentative binding (no explicit assumptions).""" + code = """ + x might is 10 + assumes is x + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + # Tentative but no explicit assumptions registered + assert value == "" + + +class TestClarityInConditionals: + """Tests for using clarity predicates in conditionals.""" + + def test_if_clarified(self, interpreter): + """Test using CLARIFIED in a conditional.""" + code = """ + x is 10 + result is 0 + if clarified: + result is 1 + result + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1 + + def test_if_not_clarified(self, interpreter): + """Test using NOT CLARIFIED in a conditional.""" + code = """ + x might is 10 + result is 0 + if not clarified: + result is 1 + result + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1 + + def test_clarify_before_critical_operation(self, interpreter): + """Test pattern: clarify before critical operation.""" + code = """ + config might is "strict" + result is 0 + + if not clarified: + clarify of config + + if clarified: + result is 1 + result + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1 + + +class TestClarityInLoops: + """Tests for using clarity predicates in loops.""" + + def test_loop_while_not_clarified(self, interpreter): + """Test looping until all bindings are clarified.""" + code = """ + a might is 1 + b might is 2 + + clarify of a + clarify of b + + clarified + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1.0 + + def test_clarity_changes_during_loop(self, interpreter): + """Test clarity score changes as bindings are clarified.""" + code = """ + a might is 1 + b might is 2 + c is 3 + + before is clarity + clarify of a + after is clarity + + after > before + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 1.0 # clarity increased + + +class TestClarityIntegration: + """Integration tests combining clarity with other features.""" + + def test_clarity_with_framework_strength(self, interpreter): + """Test using both clarity and framework strength.""" + code = """ + x is 1 + x is 2 + x is 3 + x is 4 + x is 5 + + cs_value is cs + fs_value is fs + + cs_value + fs_value + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + # Both should be between 0 and 1, so sum between 0 and 2 + assert 0 <= value <= 2 + + def test_clarity_with_interrogatives(self, interpreter): + """Test using clarity with other interrogatives.""" + code = """ + x is 10 + value is what is x + quality is how is x + clarity_score is cs + + value + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + assert value == 10 + + def test_mixed_clarity_and_convergence(self, interpreter): + """Test combining clarity predicates with convergence predicates.""" + code = """ + x is 10 + + clarity_ok is clarified + conv_check is converged + + clarity_ok + conv_check + """ + result = run_code(code, interpreter) + value = decode_vector(result, interpreter.space) + # Sum of two booleans (0 or 1 each) + assert 0 <= value <= 2 + + +class TestClarityExplainer: + """Tests for the ClarityExplainer class.""" + + def test_explainer_disabled_by_default(self): + """Test explainer is disabled by default.""" + explainer = ClarityExplainer(enabled=False) + # Should not raise or print anything + explainer.explain_clarified( + result=True, + clarity_score=1.0, + threshold=0.95, + unresolved=[], + ) + + def test_explainer_enabled(self, capsys): + """Test explainer outputs when enabled.""" + explainer = ClarityExplainer(enabled=True, use_color=False) + explainer.explain_clarified( + result=False, + clarity_score=0.5, + threshold=0.95, + unresolved=["x", "y"], + ) + + captured = capsys.readouterr() + assert "clarified" in captured.err.lower() + assert "0.5" in captured.err + + +class TestAssumptionDetection: + """Tests for automatic assumption detection.""" + + def test_detect_type_compatibility_assumption(self): + """Test detecting type compatibility assumption.""" + assumptions = detect_assumptions( + "binary_op", + {"left_type": "number", "right_type": "string"}, + ) + assert len(assumptions) == 1 + assert assumptions[0].name == "type_compatibility" + + def test_detect_division_by_zero_assumption(self): + """Test detecting division by zero assumption.""" + assumptions = detect_assumptions( + "division", + {"divisor_checked": False}, + ) + assert len(assumptions) == 1 + assert assumptions[0].name == "non_zero_divisor" + + def test_detect_index_bounds_assumption(self): + """Test detecting index bounds assumption.""" + assumptions = detect_assumptions( + "index", + {"bounds_checked": False}, + ) + assert len(assumptions) == 1 + assert assumptions[0].name == "valid_index" + + def test_no_assumption_when_checked(self): + """Test no assumption when precondition is checked.""" + assumptions = detect_assumptions( + "division", + {"divisor_checked": True}, + ) + assert len(assumptions) == 0 + + +class TestClarityState: + """Tests for the ClarityState dataclass.""" + + def test_default_state_is_ambiguous(self): + """Test default state is ambiguous.""" + state = ClarityState() + assert state.clarity_type == ClarityType.AMBIGUOUS + assert state.is_ambiguous() + assert not state.is_clear() + + def test_clarified_state(self): + """Test clarified state.""" + state = ClarityState(clarity_type=ClarityType.CLARIFIED) + assert state.is_clear() + assert not state.is_ambiguous() + assert not state.is_tentative() + + def test_tentative_state(self): + """Test tentative state.""" + state = ClarityState(clarity_type=ClarityType.TENTATIVE) + assert state.is_tentative() + assert not state.is_clear() + + def test_state_with_assumptions(self): + """Test state with assumptions.""" + assumption = Assumption( + name="test", + source="test_source", + context="test context", + ) + state = ClarityState(assumptions=[assumption]) + assert state.has_assumptions() + + +class TestActiveListener: + """Tests for the ActiveListener class.""" + + def test_active_listener_creation(self): + """Test creating an ActiveListener.""" + listener = ActiveListener(interactive=False) + assert not listener.interactive + assert len(listener.clarification_log) == 0 + assert len(listener.pending_clarifications) == 0 + + def test_detect_division_ambiguity(self): + """Test detecting division by zero ambiguity.""" + listener = ActiveListener(interactive=False) + ambiguity = listener.detect_ambiguity( + "division", + {"divisor": 0, "divisor_name": "x", "proven_non_zero": False}, + ) + assert ambiguity is not None + assert ambiguity["type"] == "division_safety" + assert "x" in ambiguity["message"] + + def test_detect_index_ambiguity(self): + """Test detecting index bounds ambiguity.""" + listener = ActiveListener(interactive=False) + ambiguity = listener.detect_ambiguity( + "index", + { + "index": 10, + "index_name": "i", + "list_name": "items", + "proven_in_bounds": False, + }, + ) + assert ambiguity is not None + assert ambiguity["type"] == "index_safety" + assert "i" in ambiguity["message"] + assert "items" in ambiguity["message"] + + def test_detect_coercion_ambiguity(self): + """Test detecting type coercion ambiguity.""" + listener = ActiveListener(interactive=False) + ambiguity = listener.detect_ambiguity( + "coercion", + {"from_type": "string", "to_type": "number"}, + ) + assert ambiguity is not None + assert ambiguity["type"] == "type_coercion" + + def test_detect_null_access_ambiguity(self): + """Test detecting null access ambiguity.""" + listener = ActiveListener(interactive=False) + ambiguity = listener.detect_ambiguity( + "null_access", + {"name": "user"}, + ) + assert ambiguity is not None + assert ambiguity["type"] == "null_safety" + + def test_detect_intent_ambiguity(self): + """Test detecting semantic ambiguity.""" + listener = ActiveListener(interactive=False) + ambiguity = listener.detect_ambiguity( + "intent", + { + "name": "mode", + "meanings": ["strict parsing", "strict security", "strict types"], + }, + ) + assert ambiguity is not None + assert ambiguity["type"] == "semantic_ambiguity" + + def test_no_ambiguity_when_proven(self): + """Test no ambiguity detected when precondition is proven.""" + listener = ActiveListener(interactive=False) + ambiguity = listener.detect_ambiguity( + "division", + {"divisor": 5, "divisor_name": "x", "proven_non_zero": True}, + ) + assert ambiguity is None + + def test_non_interactive_returns_default(self): + """Test non-interactive mode returns first option.""" + listener = ActiveListener(interactive=False) + ambiguity = listener.detect_ambiguity( + "division", + {"divisor": 0, "divisor_name": "x", "proven_non_zero": False}, + ) + choice = listener.request_clarification(ambiguity) + # In non-interactive mode, should return first option + assert choice == "proceed" + + def test_pending_clarifications(self): + """Test pending clarifications are recorded.""" + listener = ActiveListener(interactive=False) + ambiguity = listener.detect_ambiguity( + "division", + {"divisor": 0, "divisor_name": "x", "proven_non_zero": False}, + ) + listener.request_clarification(ambiguity) + pending = listener.get_pending_clarifications() + assert len(pending) == 1 + + +class TestDialogueManager: + """Tests for the DialogueManager class.""" + + def test_dialogue_manager_creation(self): + """Test creating a DialogueManager.""" + dm = DialogueManager(use_color=False, verbose=False) + assert not dm.verbose + assert len(dm.dialogue_history) == 0 + + def test_dialogue_history_logging(self): + """Test dialogue history is logged.""" + dm = DialogueManager(use_color=False, verbose=True) + dm.explain_reasoning( + decision="Using strict mode", + because="User specified strict", + alternatives=["lenient", "auto"], + ) + history = dm.get_dialogue_history() + assert len(history) == 1 + assert history[0]["type"] == "explanation" + + def test_verbose_mode_outputs(self, capsys): + """Test verbose mode produces output.""" + dm = DialogueManager(use_color=False, verbose=True) + dm.explain_reasoning( + decision="Testing output", + because="This is a test", + ) + captured = capsys.readouterr() + assert "Testing output" in captured.err + assert "This is a test" in captured.err + + def test_non_verbose_no_output(self, capsys): + """Test non-verbose mode produces no output for explanations.""" + dm = DialogueManager(use_color=False, verbose=False) + dm.explain_reasoning( + decision="Testing output", + because="This is a test", + ) + captured = capsys.readouterr() + assert "Testing output" not in captured.err + + +class TestInteractiveClarifier: + """Tests for the InteractiveClarifier class.""" + + def test_interactive_clarifier_creation(self): + """Test creating an InteractiveClarifier.""" + clarifier = InteractiveClarifier(interactive=False, verbose=False) + assert not clarifier.interactive + assert clarifier.listener is not None + assert clarifier.dialogue is not None + + def test_clarify_binding_no_options(self): + """Test clarifying binding with no ambiguity.""" + clarifier = InteractiveClarifier(interactive=False) + result = clarifier.clarify_binding( + name="x", + value=10, + possible_meanings=None, + ) + assert result["clarified"] is True + assert result["confidence"] == 1.0 + + def test_clarify_binding_with_options_non_interactive(self): + """Test clarifying binding with options in non-interactive mode.""" + clarifier = InteractiveClarifier(interactive=False) + result = clarifier.clarify_binding( + name="mode", + value="strict", + possible_meanings=["strict types", "strict parsing", "strict security"], + ) + # Non-interactive: returns first option with low confidence + assert result["clarified"] is False + assert result["confidence"] == 0.5 + assert "pending_options" in result + + def test_clarify_operation_no_ambiguity(self): + """Test clarifying operation with no ambiguity.""" + clarifier = InteractiveClarifier(interactive=False) + result = clarifier.clarify_operation( + "unknown_operation", + {}, + ) + assert result["strategy"] == "proceed" + assert result["clarified"] is True + + def test_clarify_operation_with_ambiguity(self): + """Test clarifying operation with ambiguity.""" + clarifier = InteractiveClarifier(interactive=False) + result = clarifier.clarify_operation( + "division", + {"divisor": 0, "divisor_name": "x", "proven_non_zero": False}, + ) + assert result["strategy"] == "proceed" # Default in non-interactive + assert result["clarified"] is False + assert result["ambiguity_type"] == "division_safety" + + def test_should_pause_non_interactive(self): + """Test should_pause returns False in non-interactive mode.""" + clarifier = InteractiveClarifier(interactive=False) + assert not clarifier.should_pause(0.5) + + def test_summarize_pending_empty(self): + """Test summarizing when no pending clarifications.""" + clarifier = InteractiveClarifier(interactive=False) + summary = clarifier.summarize_pending() + assert "No pending" in summary + + def test_summarize_pending_with_items(self): + """Test summarizing pending clarifications.""" + clarifier = InteractiveClarifier(interactive=False) + # Generate some pending clarifications + clarifier.clarify_operation( + "division", + {"divisor": 0, "divisor_name": "x", "proven_non_zero": False}, + ) + summary = clarifier.summarize_pending() + assert "Pending" in summary + assert "1" in summary + + +class TestInterpreterActiveListening: + """Tests for interpreter with active listening mode.""" + + def test_interpreter_with_active_listening(self): + """Test creating interpreter with active listening enabled.""" + interp = Interpreter(active_listening=True) + assert interp.active_listening is True + assert interp.active_listener is not None + assert interp.interactive_clarifier is not None + + def test_interpreter_without_active_listening(self): + """Test interpreter without active listening.""" + interp = Interpreter(active_listening=False) + assert interp.active_listening is False + assert interp.active_listener is None + + def test_interpreter_with_interactive_mode(self): + """Test creating interpreter with interactive mode.""" + interp = Interpreter(interactive_mode=True) + assert interp.interactive_mode is True + assert interp.interactive_clarifier is not None + + def test_division_with_active_listening_non_zero(self): + """Test division with active listening when divisor is non-zero.""" + interp = Interpreter(active_listening=True) + code = """ + x is 10 + y is 2 + x / y + """ + result = run_code(code, interp) + value = decode_vector(result, interp.space) + assert value == 5.0 # Normal division works + + def test_format_agency_error_undefined(self): + """Test agency-preserving error format for undefined variable.""" + interp = Interpreter(active_listening=True) + msg = interp._format_agency_error( + "undefined_variable", + {"name": "foo", "suggestions": ["food", "foot"]}, + ) + assert "foo" in msg + assert "Possible intentions" in msg + assert "food" in msg + + def test_format_agency_error_type_mismatch(self): + """Test agency-preserving error format for type mismatch.""" + interp = Interpreter(active_listening=True) + msg = interp._format_agency_error( + "type_mismatch", + {"expected": "number", "got": "string", "operation": "addition"}, + ) + assert "Type question" in msg + assert "number" in msg + assert "string" in msg + + def test_format_agency_error_function_not_found(self): + """Test agency-preserving error format for function not found.""" + interp = Interpreter(active_listening=True) + msg = interp._format_agency_error( + "function_not_found", + {"name": "calculate"}, + ) + assert "calculate" in msg + assert "Possible intentions" in msg + + +class TestClarityWithActiveListening: + """Tests combining clarity framework with active listening.""" + + def test_clarify_with_active_listening(self): + """Test clarify of works with active listening.""" + interp = Interpreter(active_listening=True) + code = """ + x might is 10 + clarify of x + clarified + """ + result = run_code(code, interp) + value = decode_vector(result, interp.space) + assert value == 1.0 + + def test_ambiguous_with_active_listening(self): + """Test ambiguous predicate with active listening.""" + interp = Interpreter(active_listening=True) + code = """ + x might is 10 + ambiguous + """ + result = run_code(code, interp) + value = decode_vector(result, interp.space) + assert value == 1.0 + + def test_clarity_score_with_active_listening(self): + """Test clarity score with active listening.""" + interp = Interpreter(active_listening=True) + code = """ + x is 10 + y might is 20 + clarity + """ + result = run_code(code, interp) + value = decode_vector(result, interp.space) + assert value == 0.5 # 1 out of 2 clarified