Skip to content

Commit 8bb3cef

Browse files
equietcursoragent
andauthored
feat: add federated auth helpers for Streamlit data apps (#97)
* feat: add federated auth helpers for Streamlit data apps Introduces deepnote_toolkit.streamlit_data_apps with: - get_federated_auth_token: forwards the viewer's streamlit-token cookie to userpod-api to obtain a per-viewer OAuth access token plus non-secret connectionParams. - FederatedAuthRequired: raised when the viewer has not yet authenticated the integration; carries the OAuth start URL so callers can render an authentication prompt. - prompt_federated_auth: renders an st.error + st.link_button to start the OAuth flow when the viewer is unauthenticated. - get_bigquery_client / get_snowflake_connection: convenience wrappers that build clients using the viewer's access token. Co-authored-by: Cursor <cursoragent@cursor.com> * fix: silence mypy import-not-found for optional streamlit import The Typecheck CI job only installs main deps (no `server` extras), so mypy can't find the `streamlit` module. Both imports are inside functions and guarded for the case where Streamlit isn't installed, so we just need to tell mypy to ignore the missing import. Co-authored-by: Cursor <cursoragent@cursor.com> --------- Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 0625708 commit 8bb3cef

2 files changed

Lines changed: 643 additions & 0 deletions

File tree

Lines changed: 385 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,385 @@
"""Helpers for federated authentication inside Streamlit data apps.

Streamlit data apps run as a long-lived process inside the project pod, separate from the
notebook kernel. The viewer is identified by the ``streamlit-token`` cookie set when the
data app SSR page is rendered. To obtain database credentials scoped to the viewer (rather
than to the project owner), apps call this helper, which forwards the cookie to the webapp's
userpod-api as a ``StreamlitToken`` header.

Usage inside a Streamlit data app::

    import deepnote_toolkit.streamlit_data_apps as dn

    creds = dn.get_federated_auth_token("<integration-id>")
    # creds = {
    #     "integrationType": "big-query" | "snowflake" | "trino",
    #     "accessToken": "<oauth-access-token>",
    #     "connectionParams": {"type": "big-query" | "snowflake" | "trino", ...},
    # }

Convenience wrappers for the most common clients are also provided::

    client = dn.get_bigquery_client("<integration-id>")
    conn = dn.get_snowflake_connection("<integration-id>")
"""
25+
26+
from __future__ import annotations
27+
28+
import json
29+
import urllib.error
30+
import urllib.request
31+
from typing import Any, Dict, Optional
32+
33+
from .get_webapp_url import (
34+
get_absolute_userpod_api_url,
35+
get_project_auth_headers,
36+
)
37+
38+
# Name of the cookie that identifies the data-app viewer. Its value is read from
# the Streamlit context and forwarded to userpod-api as the ``StreamlitToken`` header.
STREAMLIT_TOKEN_COOKIE_NAME = "streamlit-token"
39+
40+
41+
class StreamlitFederatedAuthError(Exception):
    """Base error for any failure to obtain a Streamlit viewer's federated auth token."""
43+
44+
45+
class FederatedAuthRequired(StreamlitFederatedAuthError):
    """Signal that the viewer still has to sign in to the federated integration.

    The exception exposes two attributes, both taken from the keyword-only
    constructor arguments, so callers can render an authentication prompt:

    * ``auth_url`` -- the Deepnote OAuth start URL for this integration.
    * ``integration_name`` -- the integration's display name, or ``None``.
    """

    def __init__(
        self,
        message: str,
        *,
        auth_url: str,
        integration_name: Optional[str] = None,
    ) -> None:
        # Record the prompt metadata, then let the base exception keep the message.
        self.auth_url, self.integration_name = auth_url, integration_name
        super().__init__(message)
62+
63+
64+
def _read_streamlit_token_from_context() -> Optional[str]:
65+
"""Read the ``streamlit-token`` cookie from the active Streamlit context.
66+
67+
Returns ``None`` if Streamlit is not installed, no script run is active, or the cookie
68+
is missing.
69+
"""
70+
71+
try:
72+
import streamlit as st # type: ignore[import-not-found]
73+
except ImportError:
74+
return None
75+
76+
try:
77+
cookies = st.context.cookies
78+
except Exception:
79+
return None
80+
81+
if not cookies:
82+
return None
83+
84+
token = cookies.get(STREAMLIT_TOKEN_COOKIE_NAME)
85+
if not isinstance(token, str) or not token:
86+
return None
87+
return token
88+
89+
90+
def get_federated_auth_token(
    integration_id: str,
    *,
    streamlit_token: Optional[str] = None,
    timeout: float = 10.0,
) -> Dict[str, Any]:
    """Fetch a federated-auth access token for the current Streamlit viewer.

    Sends an empty ``POST`` to the userpod-api endpoint
    ``integrations/federated-auth-token-streamlit/<integration_id>`` with the viewer's
    token in the ``StreamlitToken`` header, then validates the JSON response.

    Parameters
    ----------
    integration_id:
        The Deepnote integration UUID.
    streamlit_token:
        Optional override for the ``streamlit-token`` cookie value. If not provided, the
        token is read from ``st.context.cookies``.
    timeout:
        Timeout in seconds for the HTTP request.

    Returns
    -------
    dict
        A dict with ``integrationType``, ``accessToken``, and ``connectionParams`` keys.
        ``connectionParams`` carries non-secret integration metadata useful for building
        a database client (e.g. ``project`` for BigQuery, ``accountName`` for Snowflake).
        It is always a dict; a missing or non-object value is replaced by ``{}``.

    Raises
    ------
    FederatedAuthRequired
        If the webapp's error response advertises an OAuth start URL, i.e. the viewer
        has not authenticated the integration yet. Subclass of the error below.
    StreamlitFederatedAuthError
        If the token cannot be resolved, the request fails, the webapp returns a
        non-2xx response, or the response is not a JSON object with the required fields.
    """
    if not integration_id:
        raise StreamlitFederatedAuthError("integration_id is required.")

    token = streamlit_token or _read_streamlit_token_from_context()
    if not token:
        raise StreamlitFederatedAuthError(
            "Could not read the `streamlit-token` cookie from the Streamlit context. "
            "This helper is intended to run inside a Streamlit data app served via "
            "Deepnote, where the cookie is forwarded by the proxy."
        )

    # ``get_absolute_userpod_api_url`` resolves the project ID from the runtime config
    # / DEEPNOTE_PROJECT_ID env var. Inside the project pod the userpod-proxy sidecar
    # already prepends the project ID before forwarding to the webapp, so the relative
    # URL passed in here must NOT include it.
    url = get_absolute_userpod_api_url(
        f"integrations/federated-auth-token-streamlit/{integration_id}"
    )

    headers: Dict[str, str] = {
        "Content-Type": "application/json",
        "StreamlitToken": token,
        **get_project_auth_headers(),
    }

    request = urllib.request.Request(
        url,
        data=b"",
        method="POST",
        headers=headers,
    )

    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            raw_body = response.read()
    except urllib.error.HTTPError as error:
        error_body = error.read().decode("utf-8", errors="replace")
        auth_required = _parse_auth_required(error_body)
        if auth_required is not None:
            raise auth_required from error
        raise StreamlitFederatedAuthError(
            f"Federated auth request failed with HTTP {error.code}: {error_body}"
        ) from error
    except OSError as error:
        # ``URLError`` is an ``OSError``; catching the base class also wraps read
        # timeouts and connection resets raised while the body is being read.
        raise StreamlitFederatedAuthError(
            f"Federated auth request failed: {error}"
        ) from error

    try:
        payload = json.loads(raw_body.decode("utf-8"))
    except ValueError as error:
        # ``JSONDecodeError`` and ``UnicodeDecodeError`` are both ``ValueError``s.
        body = raw_body.decode("utf-8", errors="replace")
        raise StreamlitFederatedAuthError(
            f"Federated auth response was not valid JSON: {body!r}"
        ) from error

    if not isinstance(payload, dict):
        raise StreamlitFederatedAuthError(
            "Federated auth response must be a JSON object, "
            f"got {type(payload).__name__}."
        )

    # Report only the names of the missing fields: the payload may contain the
    # viewer's access token, which must not leak into an exception message.
    missing = [key for key in ("accessToken", "integrationType") if key not in payload]
    if missing:
        raise StreamlitFederatedAuthError(
            "Federated auth response is missing required fields: " + ", ".join(missing)
        )

    # ``setdefault`` would keep an explicit JSON ``null``; callers rely on a dict.
    if not isinstance(payload.get("connectionParams"), dict):
        payload["connectionParams"] = {}
    return payload
183+
184+
185+
def _parse_auth_required(error_body: str) -> Optional["FederatedAuthRequired"]:
186+
"""Return a ``FederatedAuthRequired`` if the error body advertises an auth URL."""
187+
188+
try:
189+
body = json.loads(error_body)
190+
except (json.JSONDecodeError, TypeError):
191+
return None
192+
193+
if not isinstance(body, dict):
194+
return None
195+
196+
auth_required = body.get("authRequired")
197+
if not isinstance(auth_required, dict):
198+
return None
199+
200+
auth_url = auth_required.get("authUrl")
201+
integration_name = auth_required.get("integrationName")
202+
if not isinstance(auth_url, str) or not auth_url:
203+
return None
204+
205+
message = body.get("error")
206+
if not isinstance(message, str) or not message:
207+
message = (
208+
f"Sign in to {integration_name} to use this integration."
209+
if integration_name
210+
else "Sign in to this integration before using it."
211+
)
212+
213+
return FederatedAuthRequired(
214+
message,
215+
auth_url=auth_url,
216+
integration_name=(
217+
integration_name if isinstance(integration_name, str) else None
218+
),
219+
)
220+
221+
222+
def prompt_federated_auth(
    integration_id: str,
    *,
    streamlit_token: Optional[str] = None,
    stop: bool = True,
) -> None:
    """Ask the viewer to authenticate the integration if they have not done so yet.

    The function probes :func:`get_federated_auth_token`. When that succeeds nothing
    is rendered. When it raises :class:`FederatedAuthRequired`, an ``st.error`` with
    the exception message is shown together with a link button pointing at the
    exception's ``auth_url``. Unless ``stop`` is false, :func:`streamlit.stop` is then
    called so the rest of the data app does not run with missing credentials. Any
    other error from the probe propagates unchanged.
    """
    import streamlit as st  # type: ignore[import-not-found]

    try:
        get_federated_auth_token(integration_id, streamlit_token=streamlit_token)
    except FederatedAuthRequired as pending:
        name = pending.integration_name
        if name:
            button_text = f"Authenticate {name}"
        else:
            button_text = "Authenticate integration"

        st.error(str(pending))
        try:
            st.link_button(button_text, pending.auth_url, type="primary")
        except TypeError:
            # Older Streamlit versions don't accept ``type``.
            st.link_button(button_text, pending.auth_url)

        if stop:
            st.stop()
260+
261+
262+
def get_bigquery_client(
    integration_id: str,
    *,
    project: Optional[str] = None,
    streamlit_token: Optional[str] = None,
    **client_kwargs: Any,
) -> Any:
    """Create a ``google.cloud.bigquery.Client`` acting as the current Streamlit viewer.

    The viewer's OAuth access token is fetched via :func:`get_federated_auth_token`
    and wrapped in ``google.oauth2.credentials.Credentials``. ``project`` falls back
    to the ``project`` entry of the integration's ``connectionParams``. Remaining
    keyword arguments are passed to ``bigquery.Client``; a caller-supplied
    ``client_info`` replaces the default Deepnote user agent.

    If the viewer has not authenticated yet, the authentication prompt is rendered
    via :func:`prompt_federated_auth` and ``FederatedAuthRequired`` is re-raised.
    ``StreamlitFederatedAuthError`` is raised when the integration is not a BigQuery
    one or no project can be determined.
    """
    from google.api_core.client_info import ClientInfo
    from google.cloud import bigquery
    from google.oauth2.credentials import Credentials

    try:
        token_payload = get_federated_auth_token(
            integration_id, streamlit_token=streamlit_token
        )
    except FederatedAuthRequired:
        prompt_federated_auth(integration_id, streamlit_token=streamlit_token)
        raise

    connection_params = token_payload.get("connectionParams", {})
    integration_type = connection_params.get("type")
    if integration_type != "big-query":
        raise StreamlitFederatedAuthError(
            f"Integration {integration_id} is not a BigQuery integration "
            f"(got {integration_type!r})."
        )

    target_project = project if project else connection_params.get("project")
    if not target_project:
        raise StreamlitFederatedAuthError(
            "BigQuery integration metadata did not include a project. "
            "Pass `project=` explicitly."
        )

    viewer_credentials = Credentials(token_payload["accessToken"])

    # Match the User-Agent used by the notebook flow so Google's partnership
    # dashboard correctly attributes traffic to Deepnote (MAR-237).
    default_client_info = ClientInfo(
        user_agent="Deepnote/1.0.0 (GPN:Deepnote;production)"
    )
    client_info = client_kwargs.pop("client_info", default_client_info)

    return bigquery.Client(
        project=target_project,
        credentials=viewer_credentials,
        client_info=client_info,
        **client_kwargs,
    )
318+
319+
320+
def get_snowflake_connection(
    integration_id: str,
    *,
    account: Optional[str] = None,
    warehouse: Optional[str] = None,
    database: Optional[str] = None,
    role: Optional[str] = None,
    user: Optional[str] = None,
    streamlit_token: Optional[str] = None,
    **connect_kwargs: Any,
) -> Any:
    """Open a ``snowflake.connector`` connection acting as the current Streamlit viewer.

    The viewer's OAuth access token, fetched via :func:`get_federated_auth_token`,
    is passed as ``token`` with ``authenticator="oauth"``. ``account`` falls back to
    ``accountName`` in the integration's ``connectionParams``; ``warehouse``,
    ``database``, ``role`` and ``user`` fall back to the same-named entries and are
    omitted when neither source provides a value. Entries in ``connect_kwargs`` take
    precedence over the defaults filled in here.

    If the viewer has not authenticated yet, the authentication prompt is rendered
    via :func:`prompt_federated_auth` and ``FederatedAuthRequired`` is re-raised.
    ``StreamlitFederatedAuthError`` is raised when the integration is not a Snowflake
    one or no account name can be determined.
    """
    import snowflake.connector  # type: ignore[import-not-found]

    try:
        token_payload = get_federated_auth_token(
            integration_id, streamlit_token=streamlit_token
        )
    except FederatedAuthRequired:
        prompt_federated_auth(integration_id, streamlit_token=streamlit_token)
        raise

    connection_params = token_payload.get("connectionParams", {})
    integration_type = connection_params.get("type")
    if integration_type != "snowflake":
        raise StreamlitFederatedAuthError(
            f"Integration {integration_id} is not a Snowflake integration "
            f"(got {integration_type!r})."
        )

    account_name = account or connection_params.get("accountName")
    if not account_name:
        raise StreamlitFederatedAuthError(
            "Snowflake integration metadata did not include an account name. "
            "Pass `account=` explicitly."
        )

    # Start from the caller's kwargs so that ``setdefault`` never overrides them.
    connect_args: Dict[str, Any] = {**connect_kwargs}
    connect_args.setdefault("account", account_name)
    connect_args.setdefault("authenticator", "oauth")
    connect_args.setdefault("token", token_payload["accessToken"])

    # Optional settings: explicit argument first, then integration metadata.
    explicit_settings = (
        ("warehouse", warehouse),
        ("database", database),
        ("role", role),
        ("user", user),
    )
    for key, explicit_value in explicit_settings:
        resolved = explicit_value or connection_params.get(key)
        if resolved:
            connect_args.setdefault(key, resolved)

    return snowflake.connector.connect(**connect_args)

0 commit comments

Comments
 (0)