Skip to content

Commit 81be49a

Browse files
committed
feat(parser): Add semantic parser and pipeline implementation
- SemanticTextParser using semantic types throughout - SemanticParserPipeline for composing semantic parsers - Protocol definition for semantic parsers - Integration with command registry for validation
1 parent fa6fbbd commit 81be49a

File tree

2 files changed

+398
-0
lines changed

2 files changed

+398
-0
lines changed
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
"""Semantic text parser using semantic types for enhanced type safety.
2+
3+
This module provides SemanticTextParser, which is like TextParser but works
4+
with semantic types and provides semantic-aware parsing capabilities.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
import shlex
10+
from typing import Optional
11+
12+
from cli_patterns.core.parser_types import (
13+
CommandId,
14+
make_argument_value,
15+
make_command_id,
16+
make_flag_name,
17+
make_option_key,
18+
)
19+
from cli_patterns.ui.parser.semantic_context import SemanticContext
20+
from cli_patterns.ui.parser.semantic_errors import SemanticParseError
21+
from cli_patterns.ui.parser.semantic_registry import SemanticCommandRegistry
22+
from cli_patterns.ui.parser.semantic_result import SemanticParseResult
23+
24+
25+
class SemanticTextParser:
26+
"""Parser for standard text-based commands with semantic type support.
27+
28+
Handles parsing of commands with arguments, short/long flags, and key-value options,
29+
returning semantic types for enhanced type safety and better intellisense support.
30+
"""
31+
32+
def __init__(self) -> None:
33+
"""Initialize semantic text parser."""
34+
self._registry: Optional[SemanticCommandRegistry] = None
35+
36+
def set_registry(self, registry: SemanticCommandRegistry) -> None:
37+
"""Set the command registry for validation and suggestions.
38+
39+
Args:
40+
registry: Semantic command registry to use
41+
"""
42+
self._registry = registry
43+
44+
def can_parse(self, input_str: str, context: SemanticContext) -> bool:
45+
"""Check if input can be parsed by this semantic text parser.
46+
47+
Args:
48+
input_str: Input string to check
49+
context: Semantic parsing context
50+
51+
Returns:
52+
True if input is non-empty text that doesn't start with shell prefix
53+
"""
54+
if not input_str or not input_str.strip():
55+
return False
56+
57+
# Don't handle shell commands (those start with !)
58+
if input_str.lstrip().startswith("!"):
59+
return False
60+
61+
return True
62+
63+
def parse(self, input_str: str, context: SemanticContext) -> SemanticParseResult:
64+
"""Parse text input into structured semantic command result.
65+
66+
Args:
67+
input_str: Input string to parse
68+
context: Semantic parsing context
69+
70+
Returns:
71+
SemanticParseResult with parsed command, args, flags, and options
72+
73+
Raises:
74+
SemanticParseError: If parsing fails or command is unknown
75+
"""
76+
if not self.can_parse(input_str, context):
77+
if not input_str.strip():
78+
raise SemanticParseError(
79+
error_type="EMPTY_INPUT",
80+
message="Empty input cannot be parsed",
81+
suggestions=[make_command_id("help")],
82+
)
83+
else:
84+
raise SemanticParseError(
85+
error_type="INVALID_INPUT",
86+
message="Input cannot be parsed by text parser",
87+
suggestions=[make_command_id("help")],
88+
)
89+
90+
try:
91+
# Use shlex for proper quote handling
92+
tokens = shlex.split(input_str.strip())
93+
except ValueError as e:
94+
# Handle shlex errors (e.g., unmatched quotes)
95+
error_msg = str(e).replace("quotation", "quote")
96+
raise SemanticParseError(
97+
error_type="QUOTE_MISMATCH",
98+
message=f"Syntax error in command: {error_msg}",
99+
suggestions=[make_command_id("help")],
100+
) from e
101+
102+
if not tokens:
103+
raise SemanticParseError(
104+
error_type="EMPTY_INPUT",
105+
message="No command found after parsing",
106+
suggestions=[make_command_id("help")],
107+
)
108+
109+
# First token is the command
110+
command_str = tokens[0]
111+
command = make_command_id(command_str)
112+
113+
# Check if command is registered (if we have a registry)
114+
if self._registry and not self._registry.is_registered(command):
115+
suggestions = self._registry.get_suggestions(command_str, max_suggestions=3)
116+
if not suggestions:
117+
suggestions = [make_command_id("help")]
118+
119+
raise SemanticParseError(
120+
error_type="UNKNOWN_COMMAND",
121+
message=f"Unknown command: {command_str}",
122+
command=command,
123+
suggestions=suggestions,
124+
)
125+
126+
# Parse remaining tokens into args, flags, and options
127+
args = []
128+
flags = set()
129+
options = {}
130+
131+
i = 1
132+
while i < len(tokens):
133+
token = tokens[i]
134+
135+
if token.startswith("--"):
136+
# Long option handling
137+
if "=" in token:
138+
# Format: --key=value
139+
key_value = token[2:] # Remove --
140+
if "=" in key_value:
141+
key, value = key_value.split("=", 1)
142+
options[make_option_key(key)] = make_argument_value(value)
143+
else:
144+
# Format: --key value (next token is value)
145+
key = token[2:] # Remove --
146+
if i + 1 < len(tokens) and not tokens[i + 1].startswith("-"):
147+
options[make_option_key(key)] = make_argument_value(
148+
tokens[i + 1]
149+
)
150+
i += 1 # Skip the value token
151+
else:
152+
# Treat as flag if no value follows
153+
flags.add(make_flag_name(key))
154+
155+
elif token.startswith("-") and len(token) > 1:
156+
# Short flag(s) handling
157+
flag_chars = token[1:] # Remove -
158+
for char in flag_chars:
159+
flags.add(make_flag_name(char))
160+
161+
else:
162+
# Regular argument
163+
args.append(make_argument_value(token))
164+
165+
i += 1
166+
167+
return SemanticParseResult(
168+
command=command,
169+
args=args,
170+
flags=flags,
171+
options=options,
172+
raw_input=input_str,
173+
)
174+
175+
def get_suggestions(self, partial: str) -> list[CommandId]:
176+
"""Get completion suggestions for partial input.
177+
178+
Args:
179+
partial: Partial input to complete
180+
181+
Returns:
182+
List of semantic command suggestions
183+
"""
184+
if not self._registry:
185+
# Return some default suggestions if no registry
186+
defaults = ["help", "status", "version"]
187+
return [make_command_id(cmd) for cmd in defaults if cmd.startswith(partial)]
188+
189+
return self._registry.get_suggestions(partial)
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
"""Semantic parser pipeline for routing input to semantic parsers.
2+
3+
This module provides SemanticParserPipeline, which routes input to semantic parsers
4+
that work with semantic types and contexts for enhanced type safety.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
from dataclasses import dataclass
10+
from typing import Callable, Optional, Protocol, runtime_checkable
11+
12+
from cli_patterns.core.parser_types import CommandId
13+
from cli_patterns.ui.parser.semantic_context import SemanticContext
14+
from cli_patterns.ui.parser.semantic_errors import SemanticParseError
15+
from cli_patterns.ui.parser.semantic_result import SemanticParseResult
16+
17+
18+
@runtime_checkable
19+
class SemanticParser(Protocol):
20+
"""Protocol defining the interface for semantic command parsers.
21+
22+
Semantic parsers work with semantic types and contexts to provide
23+
enhanced type safety for command parsing operations.
24+
"""
25+
26+
def can_parse(self, input_str: str, context: SemanticContext) -> bool:
27+
"""Determine if this parser can handle the given input.
28+
29+
Args:
30+
input_str: Raw input string to evaluate
31+
context: Current semantic parsing context
32+
33+
Returns:
34+
True if this parser can handle the input, False otherwise
35+
"""
36+
...
37+
38+
def parse(self, input_str: str, context: SemanticContext) -> SemanticParseResult:
39+
"""Parse the input string into a structured SemanticParseResult.
40+
41+
Args:
42+
input_str: Raw input string to parse
43+
context: Current semantic parsing context
44+
45+
Returns:
46+
SemanticParseResult containing parsed command, args, flags, and options
47+
48+
Raises:
49+
SemanticParseError: If parsing fails or input is invalid
50+
"""
51+
...
52+
53+
def get_suggestions(self, partial: str) -> list[CommandId]:
54+
"""Get completion suggestions for partial input.
55+
56+
Args:
57+
partial: Partial input string to complete
58+
59+
Returns:
60+
List of suggested semantic command completions
61+
"""
62+
...
63+
64+
65+
@dataclass
66+
class _SemanticParserEntry:
67+
"""Internal entry for storing semantic parser with metadata."""
68+
69+
parser: SemanticParser
70+
condition: Optional[Callable[[str, SemanticContext], bool]]
71+
priority: int
72+
73+
74+
class SemanticParserPipeline:
75+
"""Pipeline for routing input to appropriate semantic parsers.
76+
77+
The pipeline maintains a list of semantic parsers with optional conditions and priorities.
78+
When parsing input, it tries each parser in order until one succeeds, maintaining
79+
semantic type safety throughout the process.
80+
"""
81+
82+
def __init__(self) -> None:
83+
"""Initialize empty semantic parser pipeline."""
84+
self._parsers: list[_SemanticParserEntry] = []
85+
86+
def add_parser(
87+
self,
88+
parser: SemanticParser,
89+
condition: Optional[Callable[[str, SemanticContext], bool]] = None,
90+
priority: int = 0,
91+
) -> None:
92+
"""Add a semantic parser to the pipeline.
93+
94+
Args:
95+
parser: Semantic parser instance to add
96+
condition: Optional condition function that returns True if parser should handle input
97+
priority: Priority for ordering (higher numbers = higher priority, default 0)
98+
"""
99+
entry = _SemanticParserEntry(
100+
parser=parser, condition=condition, priority=priority
101+
)
102+
self._parsers.append(entry)
103+
104+
# Sort by priority (higher numbers first), maintaining insertion order for same priority
105+
self._parsers.sort(
106+
key=lambda x: (
107+
-x.priority,
108+
(
109+
self._parsers.index(x)
110+
if x in self._parsers[:-1]
111+
else len(self._parsers)
112+
),
113+
)
114+
)
115+
116+
def remove_parser(self, parser: SemanticParser) -> bool:
117+
"""Remove a semantic parser from the pipeline.
118+
119+
Args:
120+
parser: Semantic parser instance to remove
121+
122+
Returns:
123+
True if parser was found and removed, False otherwise
124+
"""
125+
for i, entry in enumerate(self._parsers):
126+
if entry.parser is parser:
127+
self._parsers.pop(i)
128+
return True
129+
return False
130+
131+
def parse(self, input_str: str, context: SemanticContext) -> SemanticParseResult:
132+
"""Parse input using the first matching semantic parser in the pipeline.
133+
134+
Args:
135+
input_str: Input string to parse
136+
context: Semantic parsing context
137+
138+
Returns:
139+
SemanticParseResult from the first parser that can handle the input
140+
141+
Raises:
142+
SemanticParseError: If no parser can handle the input or parsing fails
143+
"""
144+
if not self._parsers:
145+
raise SemanticParseError(
146+
error_type="NO_PARSERS",
147+
message="No parsers available in pipeline",
148+
suggestions=[],
149+
)
150+
151+
matching_parsers = []
152+
condition_errors = []
153+
154+
# Find all parsers that can handle the input
155+
for entry in self._parsers:
156+
try:
157+
# Check condition if provided
158+
if entry.condition is not None:
159+
if not entry.condition(input_str, context):
160+
continue
161+
162+
# Check if parser can handle the input
163+
if hasattr(entry.parser, "can_parse"):
164+
if entry.parser.can_parse(input_str, context):
165+
matching_parsers.append(entry)
166+
else:
167+
# If no can_parse method, assume it can handle it
168+
matching_parsers.append(entry)
169+
170+
except Exception as e:
171+
# Condition function failed, skip this parser
172+
condition_errors.append(f"Condition failed for parser: {e}")
173+
continue
174+
175+
if not matching_parsers:
176+
error_msg = "No parser can handle the input"
177+
if condition_errors:
178+
error_msg += f". Condition errors: {'; '.join(condition_errors)}"
179+
180+
raise SemanticParseError(
181+
error_type="NO_MATCHING_PARSER",
182+
message=error_msg,
183+
suggestions=[],
184+
)
185+
186+
# Try the first matching parser (highest priority)
187+
parser_entry = matching_parsers[0]
188+
189+
try:
190+
return parser_entry.parser.parse(input_str, context)
191+
except SemanticParseError:
192+
# Re-raise semantic parse errors from the parser
193+
raise
194+
except Exception as e:
195+
# Convert other exceptions to SemanticParseError
196+
raise SemanticParseError(
197+
error_type="PARSER_ERROR",
198+
message=f"Parser failed: {str(e)}",
199+
suggestions=[],
200+
) from e
201+
202+
def clear(self) -> None:
203+
"""Clear all parsers from the pipeline."""
204+
self._parsers.clear()
205+
206+
@property
207+
def parser_count(self) -> int:
208+
"""Get the number of parsers in the pipeline."""
209+
return len(self._parsers)

0 commit comments

Comments
 (0)