
Commit ab594e4

Revert "revert branch UNPICK"
This reverts commit 62c49d4.
Parent: 62c49d4

File tree: 16 files changed, +959 −98 lines


Cargo.lock

Lines changed: 40 additions & 0 deletions
(Generated file; diff not rendered.)
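The 40 added lines are presumably the lock entries for rayon and its transitive dependencies, introduced by the Cargo.toml change below.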

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -48,6 +48,7 @@ uuid = { version = "1.18", features = ["v4"] }
 mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
 async-trait = "0.1.89"
 futures = "0.3"
+rayon = "1.10"
 object_store = { version = "0.12.3", features = ["aws", "gcp", "azure", "http"] }
 url = "2"
 log = "0.4.27"

benchmarks/collect_gil_bench.py

Lines changed: 324 additions & 0 deletions
@@ -0,0 +1,324 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import math
import time
from typing import Any, Callable

import pyarrow as pa
from datafusion import SessionContext, col, DataFrame
from datafusion import functions as f


def create_partitions(
    batches: list[pa.RecordBatch], n_partitions: int | None = None
) -> list[list[pa.RecordBatch]]:
    """Split a flat list of batches into groups, one group per partition."""
    if n_partitions is None:
        n_partitions = len(batches)
    n_partitions = max(1, min(n_partitions, len(batches)))
    partition_size = math.ceil(len(batches) / n_partitions)
    return [
        batches[i : i + partition_size] for i in range(0, len(batches), partition_size)
    ]


def create_dataframe_from_batches(
    batches: list[pa.RecordBatch],
    n_partitions: int | None = None,
) -> DataFrame:
    """Create a DataFrame from batches with the requested partitioning."""
    ctx = SessionContext()
    partitions = create_partitions(batches, n_partitions)
    return ctx.create_dataframe(partitions)


def time_execution(func: Callable[[], Any], description: str) -> None:
    """Time the execution of a function and print the result."""
    start = time.perf_counter()
    result = func()
    duration = time.perf_counter() - start

    if hasattr(result, "__len__"):
        print(f"{description} in {duration:.3f}s, {len(result)} result rows")
    else:
        print(f"{description} in {duration:.3f}s")


def create_numeric_batches(n_batches: int, batch_size: int) -> list[pa.RecordBatch]:
    """Create batches with numeric data for simple aggregation."""
    batches = []
    for i in range(n_batches):
        start = i * batch_size
        arr = pa.array(range(start, start + batch_size))
        batches.append(pa.record_batch([arr], names=["a"]))
    return batches


def create_multi_column_batches(n_batches: int, batch_size: int) -> list[pa.RecordBatch]:
    """Create batches with multiple columns for complex computations."""
    batches = []
    for i in range(n_batches):
        start = i * batch_size
        arr_a = pa.array(range(start, start + batch_size))
        arr_b = pa.array([x * 2.5 + 1.0 for x in range(start, start + batch_size)])
        arr_c = pa.array([x % 1000 for x in range(start, start + batch_size)])
        batches.append(pa.record_batch([arr_a, arr_b, arr_c], names=["a", "b", "c"]))
    return batches


def create_string_batches(n_batches: int, batch_size: int) -> list[pa.RecordBatch]:
    """Create batches with string data for string processing."""
    batches = []
    for i in range(n_batches):
        start = i * batch_size
        arr_id = pa.array([f"user_{x:08d}" for x in range(start, start + batch_size)])
        arr_email = pa.array(
            [f"user{x}@example{x % 10}.com" for x in range(start, start + batch_size)]
        )
        arr_category = pa.array(
            [f"category_{x % 100:03d}" for x in range(start, start + batch_size)]
        )
        arr_value = pa.array([x * 1.5 for x in range(start, start + batch_size)])

        batches.append(
            pa.record_batch(
                [arr_id, arr_email, arr_category, arr_value],
                names=["id", "email", "category", "value"],
            )
        )
    return batches


def create_groupby_batches(n_batches: int, batch_size: int) -> list[pa.RecordBatch]:
    """Create batches for group-by operations."""
    batches = []
    for i in range(n_batches):
        start = i * batch_size
        arr_a = pa.array(range(start, start + batch_size))
        arr_group = pa.array([x % 1000 for x in range(start, start + batch_size)])
        arr_value = pa.array([x * 2.5 + (x % 100) for x in range(start, start + batch_size)])

        batches.append(
            pa.record_batch(
                [arr_a, arr_group, arr_value],
                names=["a", "group_id", "value"],
            )
        )
    return batches


def run_simple_aggregation(
    n_batches: int = 8,
    batch_size: int = 1_000_000,
    n_partitions: int | None = None,
) -> None:
    """Simple aggregation benchmark (original)."""
    batches = create_numeric_batches(n_batches, batch_size)
    df = create_dataframe_from_batches(batches, n_partitions)

    def execute():
        return df.aggregate([], [f.sum(col("a"))]).collect()

    time_execution(execute, f"Simple aggregation: {n_batches} batches")


def run_complex_computations(
    n_batches: int = 8,
    batch_size: int = 1_000_000,
    n_partitions: int | None = None,
) -> None:
    """CPU-intensive computations with multiple columns."""
    batches = create_multi_column_batches(n_batches, batch_size)
    df = create_dataframe_from_batches(batches, n_partitions)

    # CPU-intensive transformations
    df = df.select(
        col("a"),
        col("b"),
        col("c"),
        # Complex mathematical operations
        (col("a") * col("b") + col("c") * col("c")).alias("poly1"),
        (col("a") * col("a") * col("a") + col("b") * col("b")).alias("poly2"),
        (col("a") / (col("b") + 1.0) * col("c")).alias("ratio"),
        # More expensive operations
        f.sqrt(col("a") + col("b")).alias("sqrt_sum"),
        (col("a") * col("a")).alias("power2"),
        (col("b") * col("b") * col("b")).alias("power3"),
    )

    # Multiple filtering operations
    df = df.filter(col("a") % 100 < 50)
    df = df.filter(col("poly1") > 1000)
    df = df.filter(col("ratio") < 10000)

    # Group by with multiple aggregations
    df = df.aggregate(
        [col("c") % 10],
        [
            f.sum(col("poly1")).alias("sum_poly1"),
            f.avg(col("poly2")).alias("avg_poly2"),
            f.max(col("ratio")).alias("max_ratio"),
            f.min(col("sqrt_sum")).alias("min_sqrt"),
            f.count(col("a")).alias("count_rows"),
        ],
    )

    def execute():
        return df.collect()

    time_execution(execute, f"Complex computations: {n_batches} batches")


def run_string_processing(
    n_batches: int = 8,
    batch_size: int = 500_000,  # Smaller batches for string operations
    n_partitions: int | None = None,
) -> None:
    """CPU-intensive string processing operations."""
    batches = create_string_batches(n_batches, batch_size)
    df = create_dataframe_from_batches(batches, n_partitions)

    # String processing operations
    df = df.select(
        col("id"),
        col("email"),
        col("category"),
        col("value"),
        # String manipulations (CPU intensive)
        f.length(col("email")).alias("email_length"),
        f.upper(col("category")).alias("category_upper"),
        f.lower(col("email")).alias("email_lower"),
        f.length(col("id")).alias("id_length"),
    )

    # String-based filtering
    df = df.filter(f.length(col("email")) > 15)
    df = df.filter(f.length(col("category_upper")) > 10)
    df = df.filter(col("email_length") < 50)

    # Group by operations with string processing
    df = df.aggregate(
        [col("category")],  # Group by full category
        [
            f.sum(col("value")).alias("total_value"),
            f.avg(col("email_length")).alias("avg_email_len"),
            f.max(col("id_length")).alias("max_id_len"),
            f.count(col("id")).alias("count_users"),
        ],
    )

    def execute():
        return df.collect()

    time_execution(execute, f"String processing: {n_batches} batches")


def run_window_functions(
    n_batches: int = 8,
    batch_size: int = 1_000_000,
    n_partitions: int | None = None,
) -> None:
    """CPU-intensive window-function-style operations."""
    batches = create_groupby_batches(n_batches, batch_size)
    df = create_dataframe_from_batches(batches, n_partitions)

    # Note: Window functions in DataFusion Python may have limited support,
    # so this uses group-by operations that require sorting and complex aggregations.
    df = df.filter(col("value") > 100)
    df = df.select(
        col("group_id"),
        col("value"),
        (col("value") * col("value")).alias("value_squared"),
        f.sqrt(col("value")).alias("value_sqrt"),
    )

    # Multiple aggregations per group (CPU intensive)
    df = df.aggregate(
        [col("group_id")],
        [
            f.sum(col("value")).alias("sum_value"),
            f.avg(col("value_squared")).alias("avg_squared"),
            f.max(col("value_sqrt")).alias("max_sqrt"),
            f.min(col("value")).alias("min_value"),
            f.count(col("value")).alias("count_rows"),
        ],
    )

    def execute():
        return df.collect()

    time_execution(execute, f"Window/groupby operations: {n_batches} batches")


def run(
    n_batches: int = 8,
    batch_size: int = 1_000_000,
    n_partitions: int | None = None,
    workload: str = "all",
) -> None:
    """Run the specified workload(s)."""
    if workload in ("simple", "all"):
        run_simple_aggregation(n_batches, batch_size, n_partitions)

    if workload in ("complex", "all"):
        run_complex_computations(n_batches, batch_size, n_partitions)

    if workload in ("strings", "all"):
        # Use smaller batches for string workloads
        run_string_processing(n_batches, batch_size // 2, n_partitions)

    if workload in ("groupby", "all"):
        run_window_functions(n_batches, batch_size, n_partitions)


if __name__ == "__main__":
    import argparse
    import os

    parser = argparse.ArgumentParser(
        description="CPU-intensive benchmarks to demonstrate multi-threading benefits"
    )
    parser.add_argument(
        "--batches",
        type=int,
        default=8,
        help="number of input batches to generate",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=1_000_000,
        help="number of rows per batch",
    )
    parser.add_argument(
        "--partitions",
        type=int,
        default=None,
        help="number of partitions to create (defaults to one per batch)",
    )
    parser.add_argument(
        "--workload",
        type=str,
        default="all",
        choices=["simple", "complex", "strings", "groupby", "all"],
        help=(
            "type of workload to run: simple (basic aggregation), "
            "complex (mathematical operations), strings (string processing), "
            "groupby (group-by operations), or all"
        ),
    )
    args = parser.parse_args()

    rayon_threads = os.environ.get("RAYON_NUM_THREADS", "default")
    print(
        f"\n\nRunning benchmark with {args.batches} batches, "
        f"{args.batch_size} rows per batch"
    )
    print(
        f"Partitions: {args.partitions or args.batches}, "
        f"Workload: {args.workload}, RAYON_NUM_THREADS: {rayon_threads}"
    )
    print("-" * 60)

    run(
        n_batches=args.batches,
        batch_size=args.batch_size,
        n_partitions=args.partitions,
        workload=args.workload,
    )
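To see the effect of the rayon pool size, the script can be run under different thread settings, for example RAYON_NUM_THREADS=1 python benchmarks/collect_gil_bench.py --workload complex and again with RAYON_NUM_THREADS=8. The header it prints echoes the batch count, batch size, partition count, workload, and RAYON_NUM_THREADS, so runs can be compared directly.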
