-
-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathoauth.py
More file actions
113 lines (94 loc) · 3.94 KB
/
oauth.py
File metadata and controls
113 lines (94 loc) · 3.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
"""
GitHub OAuth authentication handler for admin panel access.
"""
import secrets
import logging
from typing import Optional
from authlib.integrations.httpx_client import AsyncOAuth2Client
from fastapi import HTTPException
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from .config import get_settings
from . import crud
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class GitHubUser(BaseModel):
    """Subset of a GitHub user profile kept after authentication.

    Instances are built from the JSON body returned by the GitHub user
    endpoint; only the fields declared here are retained.
    """

    # Numeric GitHub account identifier.
    id: int
    # GitHub username (the account's login handle).
    login: str
    # Display name; may be None when the profile does not provide one.
    name: Optional[str]
    # Email address; may be None when the profile does not provide one.
    email: Optional[str]
    # URL of the account's avatar image.
    avatar_url: str
class GitHubOAuth:
    """GitHub OAuth helper used to gate access to the admin panel.

    Bundles the OAuth client credentials (read from application settings)
    with the GitHub endpoints needed for the authorization-code flow:
    building the authorization URL, exchanging the returned code for an
    access token, fetching the user's profile, and checking admin rights.
    """

    def __init__(self):
        """Read client credentials and the redirect URI from settings."""
        self.settings = get_settings()
        self.client_id = self.settings.github_client_id
        self.client_secret = self.settings.github_client_secret
        self.redirect_uri = self.settings.oauth_redirect_uri
        self.authorization_url = "https://github.com/login/oauth/authorize"
        self.token_url = "https://github.com/login/oauth/access_token"
        self.user_info_url = "https://api.github.com/user"

    def generate_authorization_url(self) -> tuple[str, str]:
        """Generate the GitHub OAuth authorization URL and a fresh state.

        Returns:
            A ``(auth_url, state)`` tuple. ``state`` is a random URL-safe
            token that is embedded in ``auth_url``; the caller is
            responsible for storing it and comparing it with the value
            GitHub sends back to the redirect URI.
        """
        # Local import: keeps this block self-contained without touching
        # the file-level import list.
        from urllib.parse import urlencode

        state = secrets.token_urlsafe(32)
        params = {
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "scope": "read:user user:email",  # basic user info only
            "state": state,
        }
        # urlencode percent-escapes every value. The scope contains a space
        # and the redirect URI contains "://", "/" and possibly "?"/"&",
        # which would otherwise corrupt the query string.
        auth_url = f"{self.authorization_url}?{urlencode(params)}"
        return auth_url, state

    async def exchange_code_for_token(self, code: str, state: str) -> str:
        """Exchange an authorization code for an access token.

        Args:
            code: Authorization code received on the redirect URI.
            state: State value received with the code. It is accepted for
                interface compatibility but not inspected here, so the
                caller must validate it before calling this method.

        Returns:
            The ``access_token`` field of the token response.

        Raises:
            HTTPException: With status 400 if the token request fails or
                the response carries no ``access_token``.
        """
        async with AsyncOAuth2Client(
            client_id=self.client_id,
            client_secret=self.client_secret,
        ) as client:
            try:
                token_response = await client.fetch_token(
                    self.token_url,
                    code=code,
                    redirect_uri=self.redirect_uri,
                )
                return token_response["access_token"]
            except Exception as e:
                # Surface any failure as a client error, keeping the
                # original exception attached as the cause.
                raise HTTPException(
                    status_code=400,
                    detail=f"Failed to exchange code for token: {str(e)}",
                ) from e

    async def get_user_info(self, access_token: str) -> "GitHubUser":
        """Fetch the authenticated user's profile from the GitHub API.

        Args:
            access_token: OAuth access token used to authorize the request.

        Returns:
            A ``GitHubUser`` built from the response JSON. ``name`` and
            ``email`` are read with ``.get`` and may therefore be ``None``.

        Raises:
            HTTPException: With status 400 if the request fails, returns
                an error status, or the payload lacks a required field.
        """
        async with AsyncOAuth2Client(token={"access_token": access_token}) as client:
            try:
                response = await client.get(self.user_info_url)
                response.raise_for_status()
                user_data = response.json()
                return GitHubUser(
                    id=user_data["id"],
                    login=user_data["login"],
                    name=user_data.get("name"),
                    email=user_data.get("email"),
                    avatar_url=user_data["avatar_url"],
                )
            except Exception as e:
                raise HTTPException(
                    status_code=400, detail=f"Failed to get user info: {str(e)}"
                ) from e

    async def is_admin_user(
        self, username: str, db: "Optional[AsyncSession]" = None
    ) -> bool:
        """Check whether ``username`` is allowed to act as an admin.

        The check first delegates to ``crud.is_admin_user`` when a database
        session is supplied, then falls back to comparing the username with
        the ``admin_initial_username`` setting.

        Args:
            username: GitHub login to check.
            db: Optional database session; when ``None`` only the
                settings-based fallback is consulted.

        Returns:
            ``True`` if either check succeeds, otherwise ``False``.
        """
        logger.info("🔐 Checking admin access for %s", username)

        # Check database for admin users
        if db is not None:
            is_admin = await crud.is_admin_user(db, username)
            if is_admin:
                logger.info("✅ User %s found in admin users database", username)
                return True

        # Fallback: Check initial admin from environment variable
        initial_admin = self.settings.admin_initial_username
        if initial_admin and username == initial_admin:
            logger.info(
                "✅ User %s matches initial admin from environment", username
            )
            return True

        logger.info("❌ User %s is not an admin", username)
        return False
# Global OAuth instance, created once when this module is imported.
# Constructing it calls get_settings(), so settings must be loadable at
# import time.
github_oauth = GitHubOAuth()