Skip to content

Commit 2210645

Browse files
authored
Merge pull request #4 from ringoldsdev/feat/20250718/more-functions
chore: implement buffering
2 parents 8e3b322 + 97a3e35 commit 2210645

File tree

2 files changed

+45
-1
lines changed

2 files changed

+45
-1
lines changed

laygo/pipeline.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
from laygo.helpers import PipelineContext
1313
from laygo.helpers import is_context_aware
14+
from laygo.transformers.threaded import ThreadedTransformer
1415
from laygo.transformers.transformer import Transformer
1516

1617
T = TypeVar("T")
@@ -120,7 +121,10 @@ def apply[U](
120121

121122
return self # type: ignore
122123

123-
# ... The rest of the Pipeline class (transform, __iter__, to_list, etc.) remains unchanged ...
124+
def buffer(self, size: int) -> "Pipeline[T]":
125+
self.apply(ThreadedTransformer(max_workers=size))
126+
return self
127+
124128
def __iter__(self) -> Iterator[T]:
125129
"""Allows the pipeline to be iterated over."""
126130
yield from self.processed_data

tests/test_pipeline.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,3 +171,43 @@ def test_chunked_processing_consistency(self):
171171

172172
expected = list(range(1, 101)) # [1, 2, 3, ..., 100]
173173
assert result == expected
174+
175+
def test_buffer_with_two_maps(self):
176+
"""Test that buffer function works correctly with two sequential map operations."""
177+
# Create a pipeline with two map operations and buffering
178+
data = list(range(10))
179+
180+
# Track execution order to verify buffering behavior
181+
execution_order = []
182+
183+
def first_map(x):
184+
execution_order.append(f"first_map({x})")
185+
return x * 2
186+
187+
def second_map(x):
188+
execution_order.append(f"second_map({x})")
189+
return x + 1
190+
191+
# Apply buffering with 2 workers between two map operations
192+
result = (
193+
Pipeline(data)
194+
.transform(lambda t: t.map(first_map))
195+
.buffer(2) # Buffer with 2 workers
196+
.transform(lambda t: t.map(second_map))
197+
.to_list()
198+
)
199+
200+
# Verify the final result is correct
201+
expected = [(x * 2) + 1 for x in range(10)] # [1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
202+
assert result == expected
203+
204+
# Verify both map operations were called for each element
205+
assert len([call for call in execution_order if "first_map" in call]) == 10
206+
assert len([call for call in execution_order if "second_map" in call]) == 10
207+
208+
# Verify all expected values were processed
209+
first_map_values = [int(call.split("(")[1].split(")")[0]) for call in execution_order if "first_map" in call]
210+
second_map_values = [int(call.split("(")[1].split(")")[0]) for call in execution_order if "second_map" in call]
211+
212+
assert sorted(first_map_values) == list(range(10))
213+
assert sorted(second_map_values) == [x * 2 for x in range(10)]

0 commit comments

Comments
 (0)