Skip to content

Commit 0de32e5

Browse files
committed
Add test for curve encryption
1 parent 5acbca5 commit 0de32e5

1 file changed

Lines changed: 179 additions & 0 deletions

File tree

tests/test_curve.py

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
# Copyright (c) IPython Development Team.
2+
# Distributed under the terms of the Modified BSD License.
3+
4+
import json
5+
import os
6+
import time
7+
8+
import pytest
9+
import zmq
10+
11+
from ipykernel.kernelapp import IPKernelApp
12+
13+
14+
@pytest.fixture
15+
def temp_folder_path(tmp_path):
16+
return str(tmp_path)
17+
18+
19+
@pytest.fixture
20+
def curve_disabled_kernel_app(temp_folder_path):
21+
app, connection_file_path = _make_app(temp_folder_path, enable_curve=False)
22+
try:
23+
yield app, connection_file_path
24+
finally:
25+
app.close()
26+
27+
28+
@pytest.fixture
29+
def curve_enabled_kernel_app(temp_folder_path):
30+
app, connection_file_path = _make_app(temp_folder_path, enable_curve=True)
31+
try:
32+
yield app, connection_file_path
33+
finally:
34+
app.close()
35+
36+
37+
def test_curve_disabled_by_default():
38+
"""CurveZMQ must be off by default so existing kernels are unaffected."""
39+
app = IPKernelApp()
40+
assert app.enable_curve is False
41+
42+
43+
def test_connection_file_no_curve_keys_by_default(curve_disabled_kernel_app):
44+
"""Connection file must not contain curve keys when Curve is disabled."""
45+
app, connection_file_path = curve_disabled_kernel_app
46+
app.init_sockets()
47+
app.init_heartbeat()
48+
app.write_connection_file()
49+
with open(connection_file_path) as f:
50+
info = json.load(f)
51+
assert "curve_publickey" not in info
52+
assert "curve_secretkey" not in info
53+
54+
55+
def test_curve_connection_file_has_keys(curve_enabled_kernel_app):
56+
"""When Curve is enabled the connection file must carry both keys."""
57+
app, connection_file_path = curve_enabled_kernel_app
58+
app.init_sockets()
59+
app.init_heartbeat()
60+
app.write_connection_file()
61+
with open(connection_file_path) as f:
62+
info = json.load(f)
63+
assert "curve_publickey" in info, "curve_publickey missing from connection file"
64+
assert "curve_secretkey" in info, "curve_secretkey missing from connection file"
65+
# Keys are Z85-encoded ASCII strings - always exactly 40 characters.
66+
assert len(info["curve_publickey"]) == 40
67+
assert len(info["curve_secretkey"]) == 40
68+
# Existing fields must still be present (backward-compat check).
69+
assert "key" in info
70+
assert "shell_port" in info
71+
72+
73+
def test_curve_keys_are_stable_per_startup(curve_enabled_kernel_app):
74+
"""Keys generated at startup stay the same throughout the process lifetime."""
75+
app, connection_file_path = curve_enabled_kernel_app
76+
app.init_sockets()
77+
pub1 = app._curve_publickey
78+
# Writing the file twice should not regenerate keys.
79+
app.init_heartbeat()
80+
app.write_connection_file()
81+
assert app._curve_publickey == pub1
82+
83+
84+
def test_curve_socket_server_options(curve_enabled_kernel_app):
85+
"""Bound sockets must have CURVE_SERVER=True when Curve is enabled."""
86+
app, connection_file_path = curve_enabled_kernel_app
87+
app.init_sockets()
88+
# shell and stdin are ROUTER sockets configured directly.
89+
assert app.shell_socket.curve_server, "shell_socket missing curve_server"
90+
assert app.stdin_socket.curve_server, "stdin_socket missing curve_server"
91+
assert app.control_socket.curve_server, "control_socket missing curve_server"
92+
# Key material is write-only in pyzmq; we verify it was applied
93+
# through the curve_server flag and the reject test below.
94+
95+
96+
def test_no_curve_socket_options_when_disabled(curve_disabled_kernel_app):
97+
"""No CURVE options are set when Curve is disabled (default)."""
98+
app, connection_file_path = curve_disabled_kernel_app
99+
app.init_sockets()
100+
# curve_server defaults to 0/False; key options are write-only.
101+
assert not app.shell_socket.curve_server
102+
103+
104+
def test_curve_unauthenticated_socket_messages_dropped(curve_enabled_kernel_app):
105+
"""With CurveZMQ, frames from a socket without the server key are dropped.
106+
107+
This is the core security property: a raw DEALER socket that connects to
108+
a CURVE_SERVER-enabled ROUTER cannot deliver messages to it. Compare
109+
with test_transport_security.py in jupyter-client which shows the *absence*
110+
of this property today.
111+
"""
112+
app, connection_file_path = curve_enabled_kernel_app
113+
app.init_sockets()
114+
115+
# Build the endpoint URL from the bound port.
116+
endpoint = f"tcp://{app.ip}:{app.shell_port}"
117+
118+
ctx = zmq.Context()
119+
unauth = ctx.socket(zmq.DEALER)
120+
try:
121+
unauth.connect(endpoint)
122+
# ZMQ delivers the connect synchronously, but the curve
123+
# handshake silently drops the message.
124+
unauth.send(b"probe", flags=zmq.NOBLOCK)
125+
126+
poller = zmq.Poller()
127+
poller.register(app.shell_socket, zmq.POLLIN)
128+
events = dict(poller.poll(timeout=300))
129+
assert app.shell_socket not in events, (
130+
"Unauthenticated message reached the kernel socket - "
131+
"CurveZMQ should have dropped it"
132+
)
133+
finally:
134+
unauth.close(linger=0)
135+
ctx.term()
136+
137+
138+
def test_curve_authenticated_socket_can_communicate(curve_enabled_kernel_app):
139+
"""With CurveZMQ, a correctly-keyed client socket can reach the kernel."""
140+
app, connection_file_path = curve_enabled_kernel_app
141+
app.init_sockets()
142+
143+
endpoint = f"tcp://{app.ip}:{app.shell_port}"
144+
server_public = app._curve_publickey
145+
146+
ctx = zmq.Context()
147+
auth_client = ctx.socket(zmq.DEALER)
148+
# Client uses the server's public key as CURVE_SERVERKEY; its own
149+
# keypair is used only for encryption, not for access control.
150+
client_pub, client_sec = zmq.curve_keypair()
151+
auth_client.curve_secretkey = client_sec
152+
auth_client.curve_publickey = client_pub
153+
auth_client.curve_serverkey = server_public
154+
try:
155+
auth_client.connect(endpoint)
156+
# Allow the handshake to complete.
157+
time.sleep(0.05)
158+
auth_client.send(b"probe", flags=zmq.NOBLOCK)
159+
160+
poller = zmq.Poller()
161+
poller.register(app.shell_socket, zmq.POLLIN)
162+
events = dict(poller.poll(timeout=1000))
163+
assert app.shell_socket in events, (
164+
"Authenticated client message was not received by kernel socket"
165+
)
166+
finally:
167+
auth_client.close(linger=0)
168+
ctx.term()
169+
170+
171+
def _make_app(temp_folder_path, **kwargs):
172+
"""Return a minimal IPKernelApp rooted in temporary directory *temp_folder_path*."""
173+
connection_file_path = os.path.join(temp_folder_path, "kernel.json")
174+
app = IPKernelApp(connection_file=connection_file_path, **kwargs)
175+
# Replicate the subset of initialize() that sets up connection info
176+
# without importing IPython shell machinery.
177+
super(IPKernelApp, app).initialize(argv=[""])
178+
app.init_connection_file()
179+
return app, connection_file_path

0 commit comments

Comments
 (0)