diff --git a/src/memos/chunkers/charactertext_chunker.py b/src/memos/chunkers/charactertext_chunker.py
new file mode 100644
index 000000000..15c0958ba
--- /dev/null
+++ b/src/memos/chunkers/charactertext_chunker.py
@@ -0,0 +1,59 @@
+from memos.configs.chunker import MarkdownChunkerConfig
+from memos.dependency import require_python_package
+from memos.log import get_logger
+
+from .base import BaseChunker, Chunk
+
+
+logger = get_logger(__name__)
+
+
+class CharacterTextChunker(BaseChunker):
+    """Split plain text with langchain's ``RecursiveCharacterTextSplitter``."""
+
+    @require_python_package(
+        import_name="langchain_text_splitters",
+        install_command="pip install langchain_text_splitters==1.0.0",
+        install_link="https://github.com/langchain-ai/langchain-text-splitters",
+    )
+    def __init__(
+        self,
+        config: MarkdownChunkerConfig | None = None,
+        chunk_size: int = 1000,
+        chunk_overlap: int = 200,
+    ):
+        """
+        Build the underlying recursive character splitter.
+
+        Args:
+            config: Optional chunker config; when given, its ``chunk_size`` and
+                ``chunk_overlap`` take precedence over the keyword arguments.
+            chunk_size: Maximum chunk length in characters (used without config).
+            chunk_overlap: Characters shared by neighbouring chunks (used without config).
+        """
+        from langchain_text_splitters import (
+            RecursiveCharacterTextSplitter,
+        )
+
+        self.config = config
+        self.chunker = RecursiveCharacterTextSplitter(
+            chunk_size=config.chunk_size if config else chunk_size,
+            chunk_overlap=config.chunk_overlap if config else chunk_overlap,
+            length_function=len,
+            separators=["\n\n", "\n", "。", "!", "?", ". ", "! ", "? ", " ", ""],
+        )
+
+    def chunk(self, text: str, **kwargs) -> list[str] | list[Chunk]:
+        """
+        Split ``text`` on the configured separators.
+
+        Args:
+            text: Text to split.
+            **kwargs: Accepted for interface compatibility; not used here.
+
+        Returns:
+            The list of strings produced by the underlying splitter.
+        """
+        chunks = self.chunker.split_text(text)
+        logger.debug(f"Generated {len(chunks)} chunks from input text")
+        return chunks
diff --git a/src/memos/chunkers/markdown_chunker.py b/src/memos/chunkers/markdown_chunker.py
index 477e96b8d..de375a4dc 100644
--- a/src/memos/chunkers/markdown_chunker.py
+++ b/src/memos/chunkers/markdown_chunker.py
@@ -16,7 +16,13 @@ class MarkdownChunker(BaseChunker):
         install_command="pip install langchain_text_splitters==1.0.0",
         install_link="https://github.com/langchain-ai/langchain-text-splitters",
     )
-    def __init__(self, config: MarkdownChunkerConfig):
+    def __init__(
+        self,
+        config: MarkdownChunkerConfig | None = None,
+        chunk_size: int = 1000,
+        chunk_overlap: int = 200,
+        recursive: bool = False,
+    ):
         from langchain_text_splitters import (
             MarkdownHeaderTextSplitter,
             RecursiveCharacterTextSplitter,
@@ -24,18 +30,21 @@ def __init__(self, config: MarkdownChunkerConfig):
 
         self.config = config
         self.chunker = MarkdownHeaderTextSplitter(
-            headers_to_split_on=config.headers_to_split_on,
-            strip_headers=config.strip_headers,
+            headers_to_split_on=config.headers_to_split_on
+            if config
+            else [("#", "Header 1"), ("##", "Header 2"), ("###", "Header 3")],
+            strip_headers=config.strip_headers if config else False,
         )
         self.chunker_recursive = None
         logger.info(f"Initialized MarkdownHeaderTextSplitter with config: {config}")
-        if config.recursive:
+        if (config and config.recursive) or recursive:
             self.chunker_recursive = RecursiveCharacterTextSplitter(
-                chunk_size=config.chunk_size,
-                chunk_overlap=config.chunk_overlap,
+                chunk_size=config.chunk_size if config else chunk_size,
+                chunk_overlap=config.chunk_overlap if config else chunk_overlap,
+                length_function=len,
             )
 
-    def chunk(self, text: str) -> list[str] | list[Chunk]:
+    def chunk(self, text: str, **kwargs) -> list[str] | list[Chunk]:
         """Chunk the given text into smaller chunks based on sentences."""
         md_header_splits = self.chunker.split_text(text)
         chunks = []
diff --git a/src/memos/chunkers/simple_chunker.py b/src/memos/chunkers/simple_chunker.py
new file mode 100644
index 000000000..cc0dc40d0
--- /dev/null
+++ b/src/memos/chunkers/simple_chunker.py
@@ -0,0 +1,61 @@
+class SimpleTextSplitter:
+    """Dependency-free text splitter used as a fallback when langchain is unavailable."""
+
+    def __init__(self, chunk_size: int, chunk_overlap: int):
+        self.chunk_size = chunk_size
+        self.chunk_overlap = chunk_overlap
+
+    def chunk(self, text: str, **kwargs) -> list[str]:
+        """Split ``text`` using the size and overlap given at construction."""
+        return self._simple_split_text(text, self.chunk_size, self.chunk_overlap)
+
+    def _simple_split_text(self, text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
+        """
+        Simple text splitter as fallback when langchain is not available.
+
+        Args:
+            text: Text to split
+            chunk_size: Maximum size of chunks
+            chunk_overlap: Overlap between chunks
+
+        Returns:
+            List of text chunks
+        """
+        if not text or len(text) <= chunk_size:
+            return [text] if text.strip() else []
+
+        # An overlap as large as the window would stop the window from advancing.
+        overlap = max(0, min(chunk_overlap, chunk_size - 1))
+
+        chunks = []
+        start = 0
+        text_len = len(text)
+
+        while start < text_len:
+            # Calculate end position
+            end = min(start + chunk_size, text_len)
+
+            # If not the last chunk, try to break at a good position
+            if end < text_len:
+                # Try to break at newline, sentence end, or space. Separators inside
+                # the overlap carried over from the previous chunk are skipped:
+                # breaking there would re-emit text that was already returned.
+                for separator in ["\n\n", "\n", "。", "!", "?", ". ", "! ", "? ", " "]:
+                    last_sep = text.rfind(separator, start + overlap, end)
+                    if last_sep != -1:
+                        end = last_sep + len(separator)
+                        break
+
+            chunk = text[start:end].strip()
+            if chunk:
+                chunks.append(chunk)
+
+            # The window reached the end of the text: stop instead of creeping
+            # forward one character at a time and emitting duplicate tail chunks.
+            if end >= text_len:
+                break
+
+            # Move start position with overlap
+            start = max(start + 1, end - overlap)
+
+        return chunks
diff --git a/src/memos/mem_reader/read_multi_modal/base.py b/src/memos/mem_reader/read_multi_modal/base.py
index 123eb22bc..a3992a1f1 100644
--- a/src/memos/mem_reader/read_multi_modal/base.py
+++ b/src/memos/mem_reader/read_multi_modal/base.py
@@ -226,7 +226,7 @@ def parse(
         else:
             raise ValueError(f"Unknown mode: {mode}. Must be 'fast' or 'fine'")
 
-    def _split_text(self, text: str) -> list[str]:
+    def _split_text(self, text: str, is_markdown: bool = False) -> list[str]:
         """
         Split text into chunks using text splitter from utils.
 
@@ -245,7 +245,7 @@ def _split_text(self, text: str) -> list[str]:
             return [text] if text.strip() else []
 
         try:
-            chunks = splitter.split_text(text)
+            chunks = splitter.chunk(text)
             logger.debug(f"[FileContentParser] Split text into {len(chunks)} chunks")
             return chunks
         except Exception as e:
diff --git a/src/memos/mem_reader/read_multi_modal/file_content_parser.py b/src/memos/mem_reader/read_multi_modal/file_content_parser.py
index dfc5691f5..67de3020d 100644
--- a/src/memos/mem_reader/read_multi_modal/file_content_parser.py
+++ b/src/memos/mem_reader/read_multi_modal/file_content_parser.py
@@ -506,7 +506,7 @@ def parse_fine(
             memory_type = "LongTermMemory"
 
         # Split parsed text into chunks
-        content_chunks = self._split_text(parsed_text)
+        content_chunks = self._split_text(parsed_text, is_markdown)
 
         # Filter out empty chunks and create indexed list
         valid_chunks = [
diff --git a/src/memos/mem_reader/read_multi_modal/utils.py b/src/memos/mem_reader/read_multi_modal/utils.py
index 0c887a9f2..137312af4 100644
--- a/src/memos/mem_reader/read_multi_modal/utils.py
+++ b/src/memos/mem_reader/read_multi_modal/utils.py
@@ -111,48 +111,6 @@ def _cheap_close(t: str) -> str:
 DEFAULT_CHUNK_OVERLAP = int(os.getenv("FILE_PARSER_CHUNK_OVERLAP", "200"))
 
 
-def _simple_split_text(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
-    """
-    Simple text splitter as fallback when langchain is not available.
-
-    Args:
-        text: Text to split
-        chunk_size: Maximum size of chunks
-        chunk_overlap: Overlap between chunks
-
-    Returns:
-        List of text chunks
-    """
-    if not text or len(text) <= chunk_size:
-        return [text] if text.strip() else []
-
-    chunks = []
-    start = 0
-    text_len = len(text)
-
-    while start < text_len:
-        # Calculate end position
-        end = min(start + chunk_size, text_len)
-
-        # If not the last chunk, try to break at a good position
-        if end < text_len:
-            # Try to break at newline, sentence end, or space
-            for separator in ["\n\n", "\n", "。", "!", "?", ". ", "! ", "? ", " "]:
-                last_sep = text.rfind(separator, start, end)
-                if last_sep != -1:
-                    end = last_sep + len(separator)
-                    break
-
-        chunk = text[start:end].strip()
-        if chunk:
-            chunks.append(chunk)
-
-        # Move start position with overlap
-        start = max(start + 1, end - chunk_overlap)
-
-    return chunks
-
-
 # Initialize parser instance
 file_parser = None
 try:
@@ -163,51 +121,27 @@ def _simple_split_text(text: str, chunk_size: int, chunk_overlap: int) -> list[s
     logger.error(f"[FileContentParser] Failed to create parser: {e}")
     file_parser = None
 
-# Initialize text splitter instance
-text_splitter = None
-_use_simple_splitter = False
+markdown_text_splitter = None
 try:
-    try:
-        from langchain.text_splitter import RecursiveCharacterTextSplitter
-    except ImportError:
-        try:
-            from langchain_text_splitters import (
-                MarkdownHeaderTextSplitter,
-                RecursiveCharacterTextSplitter,
-            )
-        except ImportError:
-            logger.error(
-                "langchain not available. Install with: pip install langchain or pip install langchain-text-splitters"
-            )
-
-    text_splitter = RecursiveCharacterTextSplitter(
-        chunk_size=DEFAULT_CHUNK_SIZE,
-        chunk_overlap=DEFAULT_CHUNK_OVERLAP,
-        length_function=len,
-        separators=["\n\n", "\n", "。", "!", "?", ". ", "! ", "? ", " ", ""],
-    )
-    markdown_text_splitter = MarkdownHeaderTextSplitter(
-        headers_to_split_on=[("#", "Header 1"), ("##", "Header 2"), ("###", "Header 3")],
-        strip_headers=False,
-    )
-    logger.debug(
-        f"[FileContentParser] Initialized langchain text splitter with chunk_size={DEFAULT_CHUNK_SIZE}, "
-        f"chunk_overlap={DEFAULT_CHUNK_OVERLAP}"
+    from memos.chunkers.charactertext_chunker import CharacterTextChunker
+    from memos.chunkers.markdown_chunker import MarkdownChunker
+
+    markdown_text_splitter = MarkdownChunker(
+        chunk_size=DEFAULT_CHUNK_SIZE, chunk_overlap=DEFAULT_CHUNK_OVERLAP, recursive=True
     )
-except ImportError as e:
-    logger.warning(
-        f"[FileContentParser] langchain not available, using simple text splitter as fallback: {e}. "
-        "Install with: pip install langchain or pip install langchain-text-splitters"
+    text_splitter = CharacterTextChunker(
+        chunk_size=DEFAULT_CHUNK_SIZE, chunk_overlap=DEFAULT_CHUNK_OVERLAP
     )
-    text_splitter = None
-    _use_simple_splitter = True
+    logger.info("[FileContentParser] Initialized text splitter instances by langchain")
 except Exception as e:
-    logger.error(
-        f"[FileContentParser] Failed to initialize text splitter: {e}, using simple splitter as fallback"
+    logger.warning(
+        f"[FileContentParser] Failed to create text splitter: {e} will use simple splitter fallback"
     )
+    from memos.chunkers.simple_chunker import SimpleTextSplitter
+
+    markdown_text_splitter = None
     text_splitter = None
-    _use_simple_splitter = True
 
 
 def get_parser() -> Any:
@@ -220,7 +154,9 @@ def get_parser() -> Any:
     return file_parser
 
 
-def get_text_splitter(chunk_size: int | None = None, chunk_overlap: int | None = None) -> Any:
+def get_text_splitter(
+    chunk_size: int | None = None, chunk_overlap: int | None = None, is_markdown: bool = False
+) -> Any:
     """
     Get text splitter instance or a callable that uses simple splitter.
 
@@ -231,28 +167,15 @@ def get_text_splitter(chunk_size: int | None = None, chunk_overlap: int | None =
     Returns:
         Text splitter instance (RecursiveCharacterTextSplitter) or a callable wrapper for simple splitter
     """
-    if text_splitter is not None:
+    if is_markdown and markdown_text_splitter is not None:
+        return markdown_text_splitter
+    elif text_splitter is not None:
         return text_splitter
-
-    # Return a callable wrapper that uses simple splitter
-    if _use_simple_splitter:
+    else:
         actual_chunk_size = chunk_size or DEFAULT_CHUNK_SIZE
         actual_chunk_overlap = chunk_overlap or DEFAULT_CHUNK_OVERLAP
-
-        class SimpleTextSplitter:
-            """Simple text splitter wrapper."""
-
-            def __init__(self, chunk_size: int, chunk_overlap: int):
-                self.chunk_size = chunk_size
-                self.chunk_overlap = chunk_overlap
-
-            def split_text(self, text: str) -> list[str]:
-                return _simple_split_text(text, self.chunk_size, self.chunk_overlap)
-
         return SimpleTextSplitter(actual_chunk_size, actual_chunk_overlap)
-
-    return None
 
 
 def extract_role(message: dict[str, Any]) -> str:
     """Extract role from message."""