@@ -30,6 +30,20 @@ def createTransformer[T](_type_hint: type[T], chunk_size: int = DEFAULT_CHUNK_SI
3030 return Transformer [T , T ](chunk_size = chunk_size ) # type: ignore
3131
3232
33+ def build_chunk_generator [T ](chunk_size : int ) -> Callable [[Iterable [T ]], Iterator [list [T ]]]:
34+ """
35+ Returns a function that breaks an iterable into chunks of a specified size.
36+ This is useful for creating transformers that process data in manageable chunks.
37+ """
38+
39+ def chunk_generator (data : Iterable [T ]) -> Iterator [list [T ]]:
40+ data_iter = iter (data )
41+ while chunk := list (itertools .islice (data_iter , chunk_size )):
42+ yield chunk
43+
44+ return chunk_generator
45+
46+
3347class Transformer [In , Out ]:
3448 """
3549 Defines and composes data transformations by passing context explicitly.
@@ -45,6 +59,7 @@ def __init__(
4559 # The default transformer now accepts and ignores a context argument.
4660 self .transformer : InternalTransformer [In , Out ] = transformer or (lambda chunk , ctx : chunk ) # type: ignore
4761 self .error_handler = ErrorHandler ()
62+ self ._chunk_generator = build_chunk_generator (chunk_size ) if chunk_size else lambda x : iter ([list (x )])
4863
4964 @classmethod
5065 def from_transformer [T , U ](
@@ -58,6 +73,11 @@ def from_transformer[T, U](
5873 transformer = copy .deepcopy (transformer .transformer ), # type: ignore
5974 )
6075
76+ def set_chunker (self , chunker : Callable [[Iterable [In ]], Iterator [list [In ]]]) -> "Transformer[In, Out]" :
77+ """Sets a custom chunking function for the transformer."""
78+ self ._chunk_generator = chunker
79+ return self
80+
6181 def on_error (self , handler : ChunkErrorHandler [In , Out ] | ErrorHandler ) -> "Transformer[In, Out]" :
6282 """Registers an error handler for the transformer."""
6383 # This method is a placeholder for future error handling logic.
@@ -69,12 +89,6 @@ def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "Trans
6989 self .error_handler .on_error (handler ) # type: ignore
7090 return self
7191
72- def _chunk_generator (self , data : Iterable [In ]) -> Iterator [list [In ]]:
73- """Breaks an iterable into chunks of a specified size."""
74- data_iter = iter (data )
75- while chunk := list (itertools .islice (data_iter , self .chunk_size )):
76- yield chunk
77-
7892 def _pipe [U ](self , operation : Callable [[list [Out ], PipelineContext ], list [U ]]) -> "Transformer[In, U]" :
7993 """Composes the current transformer with a new context-aware operation."""
8094 prev_transformer = self .transformer
0 commit comments