
Commit eabb6bf

Add test_large_query.ipynb
committed
1 parent 937d7d2 commit eabb6bf

File tree

3 files changed, +746 -0 lines changed

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
{
 "cells": [],
 "metadata": {},
 "nbformat": 4,
 "nbformat_minor": 5
}
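
(The file above is the minimal nbformat 4 skeleton: an empty "cells" list plus the format-version fields. For reference, the nbformat package produces the same structure; a sketch, assuming nbformat is installed and with an illustrative output filename:

import nbformat

nb = nbformat.v4.new_notebook()             # empty "cells", format 4 metadata
nbformat.write(nb, "empty_notebook.ipynb")  # serializes to JSON like the hunk above
)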
Lines changed: 370 additions & 0 deletions
@@ -0,0 +1,370 @@
{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "b8171e80-141a-4f5a-9427-9459b93f9103",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "/Users/kosiew/GitHub/datafusion-python/.venv/bin/python3\n"
     ]
    }
   ],
   "source": [
    "import sys\n",
    "print(sys.executable)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "6f6810fe-6cc5-4277-b314-ea277e61455d",
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "import threading\n",
    "import pyarrow as pa\n",
    "from datafusion import SessionContext"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "22d46be4-49ff-4a12-93e3-4dfac34e293e",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Dataset created successfully!\n"
     ]
    }
   ],
   "source": [
    "def create_large_dataset():\n",
    "    \"\"\"Create a large dataset for testing interruption.\"\"\"\n",
    "    ctx = SessionContext()\n",
    "\n",
    "    # Create large record batches similar to the test\n",
    "    batches = []\n",
    "    for i in range(10):\n",
    "        batch = pa.RecordBatch.from_arrays(\n",
    "            [\n",
    "                pa.array(list(range(i * 1000, (i + 1) * 1000))),\n",
    "                pa.array([f\"value_{j}\" for j in range(i * 1000, (i + 1) * 1000)]),\n",
    "            ],\n",
    "            names=[\"a\", \"b\"],\n",
    "        )\n",
    "        batches.append(batch)\n",
    "\n",
    "    # Register tables\n",
    "    ctx.register_record_batches(\"t1\", [batches])\n",
    "    ctx.register_record_batches(\"t2\", [batches])\n",
    "\n",
    "    return ctx\n",
    "\n",
    "# Setup the test environment\n",
    "ctx = create_large_dataset()\n",
    "print(\"Dataset created successfully!\")"
   ]
  },
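  {
   "cell_type": "markdown",
   "id": "note-register-partitions",
   "metadata": {},
   "source": [
    "Why the extra list above: `register_record_batches` takes `partitions: list[list[pa.RecordBatch]]` (outer list: one entry per partition; inner lists: the batches of that partition), so wrapping `batches` in a list registers a single partition. A minimal sketch; the table name `t_small` is illustrative:\n",
    "\n",
    "```python\n",
    "small = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], names=[\"a\"])\n",
    "ctx.register_record_batches(\"t_small\", [[small]])  # one partition holding one batch\n",
    "```"
   ]
  },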
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "8f31a017-41c5-4222-b63e-c942ddd4d002",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting long-running query...\n",
      "Press Ctrl+C to interrupt!\n",
      "Query completed successfully! Got 2996 batches\n"
     ]
    }
   ],
   "source": [
    "# Create a complex, long-running query\n",
    "df = ctx.sql(\"\"\"\n",
    "    WITH t1_expanded AS (\n",
    "        SELECT\n",
    "            a,\n",
    "            b,\n",
    "            CAST(a AS DOUBLE) / 1.5 AS c,\n",
    "            CAST(a AS DOUBLE) * CAST(a AS DOUBLE) AS d\n",
    "        FROM t1\n",
    "        CROSS JOIN (SELECT 1 AS dummy FROM t1 LIMIT 5)\n",
    "    ),\n",
    "    t2_expanded AS (\n",
    "        SELECT\n",
    "            a,\n",
    "            b,\n",
    "            CAST(a AS DOUBLE) * 2.5 AS e,\n",
    "            CAST(a AS DOUBLE) * CAST(a AS DOUBLE) * CAST(a AS DOUBLE) AS f\n",
    "        FROM t2\n",
    "        CROSS JOIN (SELECT 1 AS dummy FROM t2 LIMIT 5)\n",
    "    )\n",
    "    SELECT\n",
    "        t1.a, t1.b, t1.c, t1.d,\n",
    "        t2.a AS a2, t2.b AS b2, t2.e, t2.f\n",
    "    FROM t1_expanded t1\n",
    "    JOIN t2_expanded t2 ON t1.a % 100 = t2.a % 100\n",
    "    WHERE t1.a > 100 AND t2.a > 100\n",
    "\"\"\")\n",
    "\n",
    "print(\"Starting long-running query...\")\n",
    "print(\"Press Ctrl+C to interrupt!\")\n",
    "\n",
    "try:\n",
    "    result = df.collect()\n",
    "    print(f\"Query completed successfully! Got {len(result)} batches\")\n",
    "except KeyboardInterrupt:\n",
    "    print(\"✅ Query was successfully interrupted by Ctrl+C!\")\n",
    "except Exception as e:\n",
    "    print(f\"❌ Unexpected error: {e}\")"
   ]
  },
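  {
   "cell_type": "markdown",
   "id": "note-simulated-interrupt",
   "metadata": {},
   "source": [
    "The `threading` import earlier can be used to exercise the `KeyboardInterrupt` path without a human at the keyboard: the stdlib `_thread.interrupt_main()` raises `KeyboardInterrupt` in the main thread, so a timer can stand in for Ctrl+C. A sketch; the 0.5 s delay is arbitrary and a fast query may still finish first:\n",
    "\n",
    "```python\n",
    "import _thread\n",
    "import threading\n",
    "\n",
    "threading.Timer(0.5, _thread.interrupt_main).start()  # simulated Ctrl+C\n",
    "try:\n",
    "    df.collect()\n",
    "    print(\"query finished before the interrupt fired\")\n",
    "except KeyboardInterrupt:\n",
    "    print(\"interrupted as expected\")\n",
    "```"
   ]
  },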
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "6ad8d10c-afc2-411b-ad2c-057e1f38c5ed",
   "metadata": {},
   "outputs": [],
   "source": [
    "df = ctx.sql(\"\"\"\n",
    "    WITH t1_expanded AS (\n",
    "        SELECT\n",
    "            a,\n",
    "            b,\n",
    "            CAST(a AS DOUBLE) / 1.5 AS c,\n",
    "            CAST(a AS DOUBLE) * CAST(a AS DOUBLE) AS d\n",
    "        FROM t1\n",
    "        CROSS JOIN (SELECT 1 AS dummy FROM t1 LIMIT 5)\n",
    "    ),\n",
    "    t2_expanded AS (\n",
    "        SELECT\n",
    "            a,\n",
    "            b,\n",
    "            CAST(a AS DOUBLE) * 2.5 AS e,\n",
    "            CAST(a AS DOUBLE) * CAST(a AS DOUBLE) * CAST(a AS DOUBLE) AS f\n",
    "        FROM t2\n",
    "        CROSS JOIN (SELECT 1 AS dummy FROM t2 LIMIT 5)\n",
    "    )\n",
    "    SELECT\n",
    "        t1.a, t1.b, t1.c, t1.d,\n",
    "        t2.a AS a2, t2.b AS b2, t2.e, t2.f\n",
    "    FROM t1_expanded t1\n",
    "    JOIN t2_expanded t2 ON t1.a % 100 = t2.a % 100\n",
    "    WHERE t1.a > 100 AND t2.a > 100\n",
    "\"\"\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "de500c47-3c2a-4732-8763-82cd3ff69701",
   "metadata": {},
   "outputs": [
    {
     "ename": "NameError",
     "evalue": "name 'create_very_large_dataset' is not defined",
     "output_type": "error",
     "traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mNameError\u001b[0m Traceback (most recent call last)",
"Cell \u001b[0;32mIn[7], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m ctx \u001b[38;5;241m=\u001b[39m \u001b[43mcreate_very_large_dataset\u001b[49m()\n",
"\u001b[0;31mNameError\u001b[0m: name 'create_very_large_dataset' is not defined"
     ]
    }
   ],
   "source": [
    "ctx = create_very_large_dataset()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "2bc83e68-c836-4a1e-9e6e-8e0e8212d8ee",
   "metadata": {},
   "outputs": [
    {
     "ename": "TypeError",
     "evalue": "argument 'partitions': 'RecordBatch' object cannot be converted to 'PyList'",
     "output_type": "error",
     "traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mTypeError\u001b[0m Traceback (most recent call last)",
"Cell \u001b[0;32mIn[8], line 28\u001b[0m\n\u001b[1;32m 25\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m ctx\n\u001b[1;32m 27\u001b[0m \u001b[38;5;66;03m# Setup the test environment\u001b[39;00m\n\u001b[0;32m---> 28\u001b[0m ctx \u001b[38;5;241m=\u001b[39m \u001b[43mcreate_very_large_dataset\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n",
"Cell \u001b[0;32mIn[8], line 20\u001b[0m, in \u001b[0;36mcreate_very_large_dataset\u001b[0;34m()\u001b[0m\n\u001b[1;32m 17\u001b[0m batches\u001b[38;5;241m.\u001b[39mappend(batch)\n\u001b[1;32m 19\u001b[0m \u001b[38;5;66;03m# Register multiple large tables\u001b[39;00m\n\u001b[0;32m---> 20\u001b[0m \u001b[43mctx\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mregister_record_batches\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mlarge_table1\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mbatches\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 21\u001b[0m ctx\u001b[38;5;241m.\u001b[39mregister_record_batches(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mlarge_table2\u001b[39m\u001b[38;5;124m\"\u001b[39m, batches)\n\u001b[1;32m 22\u001b[0m ctx\u001b[38;5;241m.\u001b[39mregister_record_batches(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mlarge_table3\u001b[39m\u001b[38;5;124m\"\u001b[39m, batches)\n",
"File \u001b[0;32m~/GitHub/datafusion-python/python/datafusion/context.py:771\u001b[0m, in \u001b[0;36mSessionContext.register_record_batches\u001b[0;34m(self, name, partitions)\u001b[0m\n\u001b[1;32m 759\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;21mregister_record_batches\u001b[39m(\n\u001b[1;32m 760\u001b[0m \u001b[38;5;28mself\u001b[39m, name: \u001b[38;5;28mstr\u001b[39m, partitions: \u001b[38;5;28mlist\u001b[39m[\u001b[38;5;28mlist\u001b[39m[pa\u001b[38;5;241m.\u001b[39mRecordBatch]]\n\u001b[1;32m 761\u001b[0m ) \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m>\u001b[39m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[1;32m 762\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m\"\"\"Register record batches as a table.\u001b[39;00m\n\u001b[1;32m 763\u001b[0m \n\u001b[1;32m 764\u001b[0m \u001b[38;5;124;03m This function will convert the provided partitions into a table and\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 769\u001b[0m \u001b[38;5;124;03m partitions: Record batches to register as a table.\u001b[39;00m\n\u001b[1;32m 770\u001b[0m \u001b[38;5;124;03m \"\"\"\u001b[39;00m\n\u001b[0;32m--> 771\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mctx\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mregister_record_batches\u001b[49m\u001b[43m(\u001b[49m\u001b[43mname\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mpartitions\u001b[49m\u001b[43m)\u001b[49m\n",
"\u001b[0;31mTypeError\u001b[0m: argument 'partitions': 'RecordBatch' object cannot be converted to 'PyList'"
     ]
    }
   ],
   "source": [
    "def create_very_large_dataset():\n",
    "    \"\"\"Create a much larger dataset that will take time to process.\"\"\"\n",
    "    ctx = SessionContext()\n",
    "\n",
    "    # Create much larger record batches\n",
    "    batches = []\n",
    "    for i in range(100):  # Increased from 10 to 100\n",
    "        batch = pa.RecordBatch.from_arrays(\n",
    "            [\n",
    "                pa.array(list(range(i * 10000, (i + 1) * 10000))),  # 10k rows per batch\n",
    "                pa.array([f\"value_{j}\" for j in range(i * 10000, (i + 1) * 10000)]),\n",
    "                pa.array([j * 1.5 for j in range(i * 10000, (i + 1) * 10000)]),  # Float column\n",
    "                pa.array([f\"category_{j % 1000}\" for j in range(i * 10000, (i + 1) * 10000)]),  # Categories\n",
    "            ],\n",
    "            names=[\"id\", \"text_col\", \"float_col\", \"category\"],\n",
    "        )\n",
    "        batches.append(batch)\n",
    "\n",
    "    # Register multiple large tables\n",
    "    ctx.register_record_batches(\"large_table1\", batches)\n",
    "    ctx.register_record_batches(\"large_table2\", batches)\n",
    "    ctx.register_record_batches(\"large_table3\", batches)\n",
    "\n",
    "    print(f\"Created dataset with {len(batches)} batches, ~{len(batches) * 10000:,} rows each\")\n",
    "    return ctx\n",
    "\n",
    "# Setup the test environment\n",
    "ctx = create_very_large_dataset()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "76686311-ea6f-4f24-9870-543837a387bf",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Created dataset with 100 batches, ~1,000,000 rows each\n"
     ]
    }
   ],
   "source": [
    "import time\n",
    "import pyarrow as pa\n",
    "from datafusion import SessionContext\n",
    "\n",
    "def create_very_large_dataset():\n",
    "    \"\"\"Create a much larger dataset that will take time to process.\"\"\"\n",
    "    ctx = SessionContext()\n",
    "\n",
    "    # Create much larger record batches\n",
    "    batches = []\n",
    "    for i in range(100):  # Increased from 10 to 100\n",
    "        batch = pa.RecordBatch.from_arrays(\n",
    "            [\n",
    "                pa.array(list(range(i * 10000, (i + 1) * 10000))),  # 10k rows per batch\n",
    "                pa.array([f\"value_{j}\" for j in range(i * 10000, (i + 1) * 10000)]),\n",
    "                pa.array([j * 1.5 for j in range(i * 10000, (i + 1) * 10000)]),  # Float column\n",
    "                pa.array([f\"category_{j % 1000}\" for j in range(i * 10000, (i + 1) * 10000)]),  # Categories\n",
    "            ],\n",
    "            names=[\"id\", \"text_col\", \"float_col\", \"category\"],\n",
    "        )\n",
    "        batches.append(batch)\n",
    "\n",
    "    # Fix: Register multiple large tables - wrap batches in a list for partitions\n",
    "    ctx.register_record_batches(\"large_table1\", [batches])  # List of partitions\n",
    "    ctx.register_record_batches(\"large_table2\", [batches])  # List of partitions\n",
    "    ctx.register_record_batches(\"large_table3\", [batches])  # List of partitions\n",
    "\n",
    "    print(f\"Created dataset with {len(batches)} batches, ~{len(batches) * 10000:,} rows each\")\n",
    "    return ctx\n",
    "\n",
    "# Setup the test environment\n",
    "ctx = create_very_large_dataset()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "a163d524-73e7-4905-b3b1-1c83c3001572",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting cartesian product query...\n",
      "Press Ctrl+C to interrupt!\n",
      "Query completed! Got 110 batches\n"
     ]
    }
   ],
   "source": [
    "# This will definitely be slow enough to interrupt\n",
    "df = ctx.sql(\"\"\"\n",
    "    SELECT\n",
    "        t1.id,\n",
    "        t2.id AS id2,\n",
    "        t1.float_col * t2.float_col AS product,\n",
    "        CONCAT(t1.text_col, '_', t2.text_col) AS combined_text,\n",
    "        SIN(t1.float_col) + COS(t2.float_col) AS trig_calc,\n",
    "        CASE\n",
    "            WHEN t1.id % 2 = 0 THEN 'even'\n",
    "            ELSE 'odd'\n",
    "        END AS parity\n",
    "    FROM large_table1 t1\n",
    "    CROSS JOIN large_table2 t2\n",
    "    WHERE t1.id BETWEEN 1000 AND 5000\n",
    "      AND t2.id BETWEEN 1500 AND 5500\n",
    "    ORDER BY product DESC\n",
    "    LIMIT 900000\n",
    "\"\"\")\n",
    "\n",
    "print(\"Starting cartesian product query...\")\n",
    "print(\"Press Ctrl+C to interrupt!\")\n",
    "\n",
    "try:\n",
    "    result = df.collect()\n",
    "    print(f\"Query completed! Got {len(result)} batches\")\n",
    "except KeyboardInterrupt:\n",
    "    print(\"✅ Query was successfully interrupted by Ctrl+C!\")\n",
    "except Exception as e:\n",
    "    print(f\"Error: {e}\")"
   ]
  },
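  {
   "cell_type": "markdown",
   "id": "note-join-size",
   "metadata": {},
   "source": [
    "A rough size check for the query above: `BETWEEN` is inclusive, so each side keeps 4001 distinct ids, and the `CROSS JOIN` must materialize about 16 million rows that all feed the `ORDER BY` before `LIMIT` can apply. A sketch of the arithmetic:\n",
    "\n",
    "```python\n",
    "t1_rows = 5000 - 1000 + 1  # 4001 ids from large_table1\n",
    "t2_rows = 5500 - 1500 + 1  # 4001 ids from large_table2\n",
    "print(f\"{t1_rows * t2_rows:,} joined rows feed the sort\")  # 16,008,001\n",
    "```"
   ]
  },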
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ec71bab4-b79e-4ca0-b08e-bec540522784",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
