-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp_auto_user_loading.py
More file actions
67 lines (52 loc) · 2.17 KB
/
app_auto_user_loading.py
File metadata and controls
67 lines (52 loc) · 2.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from crypt import methods
from flask import Flask, request, jsonify
from flask_jwt_extended import JWTManager, create_access_token, current_user, jwt_required
from flask_sqlalchemy import SQLAlchemy
from hmac import compare_digest
app = Flask(__name__)
app.config["JWT_SECRET_KEY"] = "super-secret" # Change this!
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite://"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
jwt = JWTManager(app)
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.Text, nullable=False, unique=True)
full_name = db.Column(db.Text, nullable=False)
def check_password(self, password):
return compare_digest(password, "password")
# This callback will be called during the login,
# i.e when creating JWT token using create_access_token
@jwt.user_identity_loader
def _user_identity_loader(user):
return user.id
# This callback will load the user when even a protected end point is accessed.
@jwt.user_lookup_loader
def _user_lookup_loader(_jwt_header, jwt_data):
identity = jwt_data["sub"]
return User.query.filter_by(id=identity).one_or_none()
@app.route("/login", methods=["POST"])
def login():
username = request.json.get("username", None)
password = request.json.get("password", None)
user = User.query.filter_by(username=username).one_or_none()
if not user or not user.check_password(password):
return jsonify("Wrong username or password"), 401
additional_claims = {"aud": "some_audience", "foo": "bar"}
access_token = create_access_token(identity=user, additional_claims=additional_claims)
return jsonify(access_token=access_token)
@app.route("/whoami")
@jwt_required()
def protected():
return jsonify(
id=current_user.id,
full_name=current_user.full_name,
username=current_user.username,
)
if __name__ == "__main__":
db.create_all()
db.session.add(User(full_name="Bruce Wayne", username="batman"))
db.session.add(User(full_name="Ann Takamaki", username="panther"))
db.session.add(User(full_name="Jester Lavore", username="little_sapphire"))
db.session.commit()
app.run(debug=True)