-
Notifications
You must be signed in to change notification settings - Fork 26
Expand file tree
/
Copy pathzai_token.py
More file actions
389 lines (320 loc) · 15.1 KB
/
zai_token.py
File metadata and controls
389 lines (320 loc) · 15.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
zAI Token 获取工具
纯后端 Discord OAuth 登录
命令行用法示例:python zai_token.py backend-login --discord-token "你的discord token"
"""
import base64
import json
import argparse
import requests
import re
from typing import Optional, Dict, Any
from urllib.parse import urlparse, parse_qs
import webbrowser
import time
import threading
class DiscordOAuthHandler:
"""Discord OAuth 登录处理器"""
# Discord API 端点
DISCORD_API_BASE = "https://discord.com/api/v9"
def __init__(self, base_url: str = "https://zai.is"):
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.9',
'Referer': f'{base_url}/auth',
'Origin': base_url,
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'same-origin',
'Sec-Fetch-User': '?1',
})
def get_oauth_login_url(self) -> str:
"""获取 Discord OAuth 登录 URL"""
return f"{self.base_url}/oauth/discord/login"
def backend_login(self, discord_token: str) -> Dict[str, Any]:
"""
纯后端 Discord OAuth 登录
Args:
discord_token: Discord 账号的 token
Returns:
包含 zai.is JWT token 的字典
"""
if not discord_token or len(discord_token) < 20:
return {'error': '无效的 Discord Token'}
print("\n[*] 开始后端 OAuth 登录流程...")
print(f"[*] Discord Token: {discord_token[:20]}...{discord_token[-10:]}")
try:
# Step 1: 访问 OAuth 登录入口,获取 Discord 授权 URL
print("[1/5] 获取 Discord 授权 URL...")
oauth_info = self._get_discord_authorize_url()
if 'error' in oauth_info:
return oauth_info
authorize_url = oauth_info['authorize_url']
client_id = oauth_info['client_id']
redirect_uri = oauth_info['redirect_uri']
state = oauth_info.get('state', '')
scope = oauth_info.get('scope', 'identify email')
print(f" Client ID: {client_id}")
print(f" Redirect URI: {redirect_uri}")
print(f" Scope: {scope}")
# Step 2: 使用 Discord token 授权应用
print("[2/5] 授权应用...")
auth_result = self._authorize_discord_app(
discord_token, client_id, redirect_uri, scope, state
)
if 'error' in auth_result:
return auth_result
callback_url = auth_result['callback_url']
print(f" 获取到回调 URL")
# Step 3: 访问回调 URL 获取 token
print("[3/5] 处理 OAuth 回调...")
token_result = self._handle_oauth_callback(callback_url)
if 'error' in token_result:
return token_result
print(f"[4/5] 成功获取 JWT Token!")
return token_result
except Exception as e:
return {'error': f'登录过程出错: {str(e)}'}
def _get_discord_authorize_url(self) -> Dict[str, Any]:
"""获取 Discord 授权 URL 和参数"""
try:
response = self.session.get(
self.get_oauth_login_url(),
allow_redirects=False
)
if response.status_code in [301, 302, 303, 307, 308]:
location = response.headers.get('Location', '')
if 'discord.com' in location:
parsed = urlparse(location)
params = parse_qs(parsed.query)
return {
'authorize_url': location,
'client_id': params.get('client_id', [''])[0],
'redirect_uri': params.get('redirect_uri', [''])[0],
'scope': params.get('scope', ['identify email'])[0],
'state': params.get('state', [''])[0]
}
return {'error': f'无法获取授权 URL,状态码: {response.status_code}'}
except Exception as e:
return {'error': f'获取授权 URL 失败: {str(e)}'}
def _authorize_discord_app(self, discord_token, client_id, redirect_uri, scope, state) -> Dict[str, Any]:
"""使用 Discord token 授权应用"""
try:
authorize_url = f"{self.DISCORD_API_BASE}/oauth2/authorize"
# 构建 super properties
super_properties = base64.b64encode(json.dumps({
"os": "Windows",
"browser": "Chrome",
"device": "",
"browser_user_agent": self.session.headers['User-Agent'],
}).encode()).decode()
headers = {
'Authorization': discord_token,
'Content-Type': 'application/json',
'X-Super-Properties': super_properties,
}
params = {
'client_id': client_id,
'response_type': 'code',
'redirect_uri': redirect_uri,
'scope': scope,
}
if state:
params['state'] = state
payload = {
'permissions': '0',
'authorize': True,
'integration_type': 0
}
response = self.session.post(
authorize_url,
headers=headers,
params=params,
json=payload
)
if response.status_code == 200:
try:
data = response.json()
location = data.get('location', '')
if location:
if location.startswith('/'):
location = f"{self.base_url}{location}"
return {'callback_url': location}
except:
pass
return {'error': f'授权失败 (状态码: {response.status_code})'}
except Exception as e:
return {'error': f'授权过程出错: {str(e)}'}
def _handle_oauth_callback(self, callback_url: str) -> Dict[str, Any]:
"""处理 OAuth 回调,获取 JWT token"""
try:
print(f" 回调 URL: {callback_url[:80]}...")
response = self.session.get(callback_url, allow_redirects=False)
max_redirects = 10
for i in range(max_redirects):
print(f" 重定向 {i+1}: 状态码 {response.status_code}")
if response.status_code not in [301, 302, 303, 307, 308]:
break
location = response.headers.get('Location', '')
print(f" Location: {location[:100]}...")
# Check for token in URL
token = self._extract_token(location)
if token: return {'token': token}
if location.startswith('/'):
location = f"{self.base_url}{location}"
response = self.session.get(location, allow_redirects=False)
# Final check in URL
final_url = response.url if hasattr(response, 'url') else ''
print(f" 最终 URL: {final_url}")
print(f" 最终状态码: {response.status_code}")
token = self._extract_token(final_url)
if token: return {'token': token}
# Check Cookies
print(f" 检查 Cookies...")
has_session = False
for cookie in self.session.cookies:
print(f" {cookie.name}: {str(cookie.value)[:50]}...")
if cookie.name == 'token':
return {'token': cookie.value}
if any(x in cookie.name.lower() for x in ['session', 'auth', 'id', 'user']):
has_session = True
# Session Fallback
if has_session:
print(f" [!] 尝试 Session 验证...")
user_info = self._verify_session()
print(f" [*] Session 验证结果: {user_info}")
if user_info and not user_info.get('error'):
print(f" [+] Session 验证成功!用户: {user_info.get('name', 'Unknown')}")
return {'token': 'SESSION_AUTH', 'user_info': user_info}
else:
print(f" [-] Session 验证失败或没有找到有效的session")
return {'error': '未能从回调中获取 token'}
except Exception as e:
return {'error': f'处理回调失败: {str(e)}'}
def _extract_token(self, input_str: str) -> Optional[str]:
if '#token=' in input_str:
match = re.search(r'#token=([^&\s]+)', input_str)
if match: return match.group(1)
if '?token=' in input_str:
match = re.search(r'[?&]token=([^&\s]+)', input_str)
if match: return match.group(1)
return None
def oauth_login_with_browser(self) -> Dict[str, Any]:
"""
通过浏览器进行 OAuth 登录
Returns:
包含 zai.is JWT token 的字典
"""
print("\n[*] 开始 OAuth 浏览器登录流程...")
try:
# Step 1: 获取 OAuth 登录 URL
print("[1/3] 获取 Discord 授权 URL...")
oauth_info = self._get_discord_authorize_url()
if 'error' in oauth_info:
return oauth_info
authorize_url = oauth_info['authorize_url']
print(f" 授权 URL: {authorize_url[:80]}...")
# Step 2: 在浏览器中打开授权 URL
print("[2/3] 在浏览器中打开授权页面...")
print(" 请在浏览器中完成 Discord 登录授权")
webbrowser.open(authorize_url)
# Step 3: 等待用户完成授权并检查结果
print("[3/3] 等待授权完成...")
print(" 请在浏览器中完成授权,系统将自动检测...")
# 创建一个标志来停止检查
stop_checking = threading.Event()
result = {'error': '授权超时'}
def check_auth_status():
nonlocal result
start_time = time.time()
check_interval = 2 # 每2秒检查一次
max_wait = 120 # 最多等待120秒
while not stop_checking.is_set() and (time.time() - start_time) < max_wait:
# 检查 session 状态
user_info = self._verify_session()
if user_info and not user_info.get('error'):
print(f"\n[+] 授权成功!用户: {user_info.get('name', 'Unknown')}")
result = {
'token': 'SESSION_AUTH',
'user_info': user_info,
'source': 'oauth_browser'
}
stop_checking.set()
break
# 检查是否有 token cookie
for cookie in self.session.cookies:
if cookie.name == 'token':
print(f"\n[+] 获取到 JWT Token!")
result = {
'token': cookie.value,
'source': 'oauth_browser'
}
stop_checking.set()
break
time.sleep(check_interval)
if not stop_checking.is_set():
print("\n[-] 授权超时,请重试")
# 在后台线程中检查授权状态
check_thread = threading.Thread(target=check_auth_status)
check_thread.start()
# 等待检查完成
check_thread.join()
stop_checking.set()
return result
except Exception as e:
return {'error': f'OAuth 浏览器登录出错: {str(e)}'}
def _verify_session(self) -> Optional[Dict]:
try:
print(f" [*] 调用 API: {self.base_url}/api/v1/auths/")
resp = self.session.get(
f"{self.base_url}/api/v1/auths/",
headers={'Accept': 'application/json'},
timeout=10 # 添加超时设置
)
print(f" [*] API 响应状态码: {resp.status_code}")
if resp.status_code == 200:
data = resp.json()
print(f" [*] API 响应数据: {data}")
return data
else:
print(f" [-] API 响应内容: {resp.text[:500]}...")
return None
except requests.exceptions.Timeout as e:
print(f" [!] API 请求超时: {str(e)}")
return None
except Exception as e:
print(f" [!] Session 验证出错: {str(e)}")
return None
def main():
parser = argparse.ArgumentParser(description='zAI Token 获取工具')
subparsers = parser.add_subparsers(dest='command')
# Only keep backend-login
backend_parser = subparsers.add_parser('backend-login', help='后端登录')
backend_parser.add_argument('--discord-token', required=True, help='Discord Token')
backend_parser.add_argument('--url', default='https://zai.is', help='Base URL')
args = parser.parse_args()
if args.command == 'backend-login':
handler = DiscordOAuthHandler(args.url)
result = handler.backend_login(args.discord_token)
if 'error' in result:
print(f"\n[!] 登录失败: {result['error']}")
else:
print(f"\n[+] 登录成功!\n")
token = result.get('token')
if token == 'SESSION_AUTH':
# Try to extract a real token from user_info if present, else just show a message
user_info = result.get('user_info', {})
print(f"\n[Session Cookie Authentication Active]")
print(f"User: {user_info.get('name')} ({user_info.get('email')})")
print(f"ID: {user_info.get('id')}")
else:
print(f"\n{token}\n")
if __name__ == '__main__':
main()