-
-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathadmin_auth.py
More file actions
166 lines (142 loc) · 5.29 KB
/
admin_auth.py
File metadata and controls
166 lines (142 loc) · 5.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
"""
Admin authentication middleware and dependencies for GitHub OAuth protection.
"""
import secrets
import logging
from datetime import datetime, UTC, timedelta
from typing import Optional
from fastapi import Depends, HTTPException, status, Request, Cookie
from sqlalchemy import select, and_
from sqlalchemy.ext.asyncio import AsyncSession
from .database import get_database
from .models import AdminSession
from .oauth import GitHubUser
logger = logging.getLogger(__name__)
async def create_admin_session(
    db: AsyncSession, github_user: GitHubUser, duration_hours: int = 24
) -> str:
    """Persist a new admin session for ``github_user`` and return its token.

    Args:
        db: Async database session used to store the new row.
        github_user: GitHub profile whose id, login, name, email and avatar
            URL are copied onto the session record.
        duration_hours: Lifetime of the session in hours, counted from now.

    Returns:
        The newly generated URL-safe session token.
    """
    token = secrets.token_urlsafe(48)

    # Expiry is stored as a naive datetime holding the UTC wall-clock time.
    issued_at = datetime.now(UTC).replace(tzinfo=None)
    lifetime = timedelta(hours=duration_hours)

    record = AdminSession(
        session_token=token,
        github_user_id=github_user.id,
        github_username=github_user.login,
        github_name=github_user.name,
        github_email=github_user.email,
        github_avatar_url=github_user.avatar_url,
        expires_at=issued_at + lifetime,
    )

    db.add(record)
    await db.commit()
    await db.refresh(record)
    return token
async def get_admin_session(
    db: AsyncSession, session_token: str
) -> Optional[AdminSession]:
    """Look up the live admin session that owns ``session_token``.

    A session counts as live when its token matches, its ``is_active`` flag
    is true and its ``expires_at`` is still in the future (compared against
    the current UTC time as a naive datetime).

    Returns:
        The first matching ``AdminSession``, or ``None`` when there is none.
    """
    now_naive_utc = datetime.now(UTC).replace(tzinfo=None)
    criteria = (
        AdminSession.session_token == session_token,
        AdminSession.is_active.is_(True),
        AdminSession.expires_at > now_naive_utc,
    )
    query = select(AdminSession).where(and_(*criteria))
    rows = await db.execute(query)
    return rows.scalars().first()
async def invalidate_admin_session(db: AsyncSession, session_token: str) -> None:
    """Mark the session identified by ``session_token`` as inactive.

    Does nothing (and does not commit) when no session has that token.
    """
    lookup = select(AdminSession).where(AdminSession.session_token == session_token)
    record = (await db.execute(lookup)).scalars().first()
    if not record:
        return

    record.is_active = False
    await db.commit()
async def cleanup_expired_sessions(db: AsyncSession) -> None:
    """Deactivate every still-active admin session whose expiry has passed.

    Rows are flagged inactive rather than deleted; a commit is issued only
    when at least one session was changed.
    """
    cutoff = datetime.now(UTC).replace(tzinfo=None)
    stale_query = select(AdminSession).where(
        and_(
            AdminSession.expires_at <= cutoff,
            AdminSession.is_active.is_(True),
        )
    )
    stale = (await db.execute(stale_query)).scalars().all()
    if not stale:
        return

    for record in stale:
        record.is_active = False
    await db.commit()
async def require_admin_auth(
    request: Request,
    admin_session_token: Optional[str] = Cookie(None, alias="admin_session"),
    db: AsyncSession = Depends(get_database),
) -> AdminSession:
    """Dependency that requires a valid admin session.

    Reads the ``admin_session`` cookie and resolves it to an active,
    unexpired ``AdminSession``.

    Args:
        request: Incoming request (not used by the check itself; kept so the
            dependency signature stays stable for callers).
        admin_session_token: Value of the ``admin_session`` cookie, if any.
        db: Async database session used to look the token up.

    Returns:
        The matching active ``AdminSession``.

    Raises:
        HTTPException: 401 when the cookie is missing or does not map to an
            active, unexpired session; 503 when the session lookup itself
            fails (e.g. the database is unreachable).
    """
    if not admin_session_token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Admin authentication required",
            headers={"WWW-Authenticate": "Bearer"},
        )

    try:
        session = await get_admin_session(db, admin_session_token)
    except Exception as exc:
        # A lookup failure says nothing about the caller's credentials, so
        # report it as a server-side outage (503) rather than a 401 that
        # would tell clients to discard a possibly valid session.
        # Internal details go to the log only, never to the response.
        logger.exception("Database error in admin auth: %s", exc)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Authentication service unavailable",
        ) from exc

    if not session:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired admin session",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # NOTE: Admin/team membership is verified during the initial login only.
    # It is not re-checked for existing sessions because the GitHub access
    # token is not stored, so revoked privileges take effect once the
    # session expires or is invalidated.
    return session
async def optional_admin_auth(
    request: Request,
    admin_session_token: Optional[str] = Cookie(None, alias="admin_session"),
    db: AsyncSession = Depends(get_database),
) -> Optional[AdminSession]:
    """Dependency that resolves the admin session when one is present.

    Delegates to ``require_admin_auth`` and treats any ``HTTPException`` it
    raises as "not authenticated".

    Returns:
        The ``AdminSession`` for an authenticated caller, otherwise ``None``.
    """
    resolved: Optional[AdminSession] = None
    if admin_session_token:
        try:
            resolved = await require_admin_auth(request, admin_session_token, db)
        except HTTPException:
            # Authentication is optional here, so a rejection is not an error.
            resolved = None
    return resolved