9-1 用户注册 对于很多应用来说,注册接口是必可不少的。想要实现注册接口,其实很简单,但是你会发现会有很多种选择:
注册时有需要哪些字段?
注册接口使用什么请求方式?
前端朝后端传数据时,放在查询参数中、请求体中?
如果放在请求体中,使用JSON格式还是表单格式?
解决方式
你会发现就一个简单的注册接口,其实还是有很多问题需要我们思考的。
首先,注册接口一般是将用户的个人信息提交给服务端,因此,我们选择POST请求
然后,注册信息中一般都包含密码,所以不能简单的在查询参数中提交给后端,需要把数据放在请求体中。
最后,请求体如何使用JSON格式,那注册时如果有上传文件的需求,将比较麻烦。因此我们使用Form表单的形式。
结论:对于常见的注册接口,使用POST,使用Form表单来上传数据。
补充:任何一种方式都可以实现,但最终如何选择还是要看业务需求。
前端页面代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <!DOCTYPE html > <html lang ="en" > <head > <meta charset ="UTF-8" > <title > Title</title > </head > <body > <h1 > 注册</h1 > <form action ="http://127.0.0.1:8000/register" method ="post" enctype ="multipart/form-data" > <p > 用户名: <input type ="text" name ="username" > </p > <p > 密码: <input type ="password" name ="password" > </p > <p > 确认密码: <input type ="password" name ="re_password" > </p > <p > 邮件: <input type ="email" name ="email" > </p > <p > <input type ="submit" > </p > </form > </body > </html >
后端接口
注册逻辑:判断用户名是否已经注册;判断二次密码是否一致。
缺陷:密码没有加密;数据没有持久化(存数据库)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 from fastapi import FastAPI, Form, Depends, HTTPException app = FastAPI(title="登录认证相关" ) USERS = {}def get_form_data (username: str = Form( ), password: str = Form( ), re_password: str = Form( ), email: str = Form( ) ): return { "username" : username, "password" : password, "re_password" : re_password, "email" : email, }@app.post("/register" ) def register (form_data: dict = Depends(get_form_data ) ): if form_data["username" ] in USERS: raise HTTPException(detail="用户名已经存在" , status_code=400 ) if form_data["password" ] != form_data["re_password" ]: raise HTTPException(detail="两次密码输入不一致" , status_code=400 ) USERS[form_data["username" ]] = form_data return {"username" : form_data["username" ], "email" : form_data["email" ]}
9-2 用户密码加密 存用户信息时,不能明文存储,一定要做加密处理。
示例1: 使用python内置库hasslib 所谓的md5加密
1 2 3 4 5 6 import hashlib m = hashlib.md5("盐" .encode("utf-8" )) m.update("hello" .encode("utf-8" ))print (m.digest()) print (m.hexdigest())
示例2:使用第三方库 passlib
pip install “passlib[bcrypt]”
1 2 3 4 5 from passlib.context import CryptContext crypt = CryptContext(schemes=["bcrypt" ], deprecated="auto" ) crypt.hash ("hello" )
9-3 集成MySQL的注册 第一步:数据持久化
1 2 3 4 5 6 7 CREATE TABLE `users` ( `id` int(11) NOT NULL AUTO_INCREMENT, `username` varchar(255) NOT NULL, `password` varchar(255) NOT NULL, `email` varchar(255) NOT NULL, PRIMARY KEY (`id`) # id 主键 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin AUTO_INCREMENT=1 ;
使用SQLAlchemy的ORM操作数据库(基于之前的代码结构,其他模块代码见源文件)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 from fastapi import FastAPI, Form, Depends, HTTPExceptionfrom sqlalchemy.orm import Sessionfrom passlib.context import CryptContextfrom database import get_dbfrom models import Userfrom schemas import UserOut app = FastAPI(title="登录认证相关" ) crypt = CryptContext(schemes=["bcrypt" ], deprecated="auto" ) class UserForm : def __init__ (self, username: str = Form( ), password: str = Form( ), re_password: str = Form( ), email: str = Form( ) ): self.username = username self.password = password self.re_password = re_password self.email = email@app.post("/register" , response_model=UserOut ) def register (user: UserForm = Depends( ), db: Session = Depends(get_db ) ): if db.query(User).filter (User.username == user.username).first(): raise HTTPException(detail="用户名已存在" , status_code=400 ) if user.password != user.re_password: raise HTTPException(detail="两次密码输入不一致" , status_code=400 ) new_user = User( username=user.username, password=crypt.hash (user.password), email=user.email ) db.add(new_user) db.commit() db.refresh(new_user) return new_user
9-4 用户登录 登录接口的作用:通过用户输入用户名和密码,找到该用户在本网站上的基本信息。
登录接口的核心逻辑:校验用户名和密码是否和数据库中保存的像匹配,匹配则登录成功,否则失败
1 2 3 4 5 6 7 8 9 10 @app.post("/login" , response_model=UserOut ) def login (username: str = Form( ), password: str = Form( ), db: Session = Depends(get_db ) ): db_user: User = db.query(User).filter_by(username=username).first() if not db_user: raise HTTPException(detail="用户名不存在" , status_code=400 ) if not crypt.verify(password, db_user.password): raise HTTPException(detail="用户名或密码错误" , status_code=400 ) return db_user
补充 :校验明文密码和数据库中密文密码匹配的原理
9-5 记录用户登录状态的方式 需求场景
对于电商购物网站,网站知道当前浏览网站的用户是谁,该用户购物车有哪些商品,该用户买了哪些商品等等。
这些需求的一个核心点就是,网站的服务端需要知道当前用户是谁。
遗憾的是,HTTP协议是无状态的,即服务端不会记录客户端的每一次请求,即每一个请求对服务端来说都是 “陌生人”。
解决方式
计算机本身无法判断坐在显示器前的使用者的身份。进一步说,也无法确认网络的那头究竟有谁。可见,为了弄清究竟是谁在访问服务 器,就得让对方的客户端自报家门。
解决方式:用户登录认证。
核心原理:用户登录之后就给用户一个身份标识,客户端再次访问服务端时带上这个身份标识,那服务端就知道该用户的身份。——打标签
常用的具体实现方式:
让用户在请求头中携带唯一标识 token就来了,所谓的令牌
使用cookie
使用jwt
9-6 使用请求头实现登录认证 在请求头中实现登录认证的方式,其实很简单,具体实现逻辑如下:
当用户访问后端服务时,后端判断请求头中是否有指定的请求头键值对,有且正确则该用户是登录过的。
如果请求头中没有指定的键值对,则是无效的请求。
示例:请求头中有合法的键值对才可以获取访问的图书信息。
当请求中没有 x-token时,或者它的值不是指定的合法token时,都校验会校验失败,得不到数据。
用户在登录之后才能获得一个x-token,然后请求其他接口时在请求头中携带者x-token才能获取数据。
即只有携带这个正确的x-token才是经过登录认证的用户,才允许获取数据。有令牌,畅通无阻
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 X_TOKEN = "SDNQOEFJQIEVNFWESCVMWE" @app.post("/login" , response_model=UserOut ) def login (response: Response, username: str = Form( ), password: str = Form( ), db: Session = Depends(get_db ) ): db_user: User = db.query(User).filter_by(username=username).first() if not db_user: raise HTTPException(detail="用户名不存在" , status_code=400 ) if not crypt.verify(password, db_user.password): raise HTTPException(detail="用户名或密码错误" , status_code=400 ) response.headers["x-token" ] = X_TOKEN return db_user@app.get("/books" ) def books (x_token: typing.Optional [str ] = Header( ) ): if x_token and x_token == X_TOKEN: return [{"id" : i, "name" : f"book{i} " } for i in range (1 , 11 )] raise HTTPException(detail="Invalid x_token" , status_code=404 )
9-7 使用Cookie实现登录认证【浏览器】 Cookie
储存在用户本地终端上 的数据 (在用户设备上生成的一个文件
)
Cookie是一段不超过4KB的小型文本数据 ,由一个名称(Name)、一个值(Value)和其它几个用于控制Cookie有效期、安全性、使用范围的可选属性组成
只要是有效的Cookie,浏览器下次访问服务器时,就会在请求中携带cookie文本中的数据。
示例1:获取和设置cookie
在谷歌浏览器中打开F12查看NetWork和Applications中的Cookies
set_cookie时可以value可以是一个单纯的字符串,也可以是一个被json序列化的的Python数据结构,比如字典。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from fastapi import FastAPI, Response, Cookie app = FastAPI()@app.get("/set" ) def set_cookie (response: Response ): response.set_cookie("x_token" , "you_auth_token" , 60 * 60 ) return "success" @app.get("/get" ) def get_cookie (x_token: str = Cookie( ) ): return { "x-token" : x_token }
示例2:使用cookie做登录认证
登录时设置cookie,使用用户名当cookie的值
访问图书资源时,携带cookie, 通过cookie中的用户名,可以知道当前用户的身份信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 from fastapi import Cookie, Response ...@app.post("/login" , response_model=UserOut ) def login (response: Response, username: str = Form( ), password: str = Form( ), db: Session = Depends(get_db ) ): db_user: User = db.query(User).filter_by(username=username).first() if not db_user: raise HTTPException(detail="用户名不存在" , status_code=400 ) if not crypt.verify(password, db_user.password): raise HTTPException(detail="用户名或密码错误" , status_code=400 ) response.set_cookie("x_token" , db_user.username, 60 * 60 ) return db_user@app.get("/books" ) def books (x_token: typing.Optional [str ] = Cookie( ), db: Session = Depends(get_db ) ): if not x_token: raise HTTPException(detail="Invalid x_token" , status_code=404 ) db_user: User = db.query(User).filter_by(username=x_token).first() if not db_user: raise HTTPException(detail="Invalid x_token" , status_code=404 ) return [{"id" : i, "name" : f"book{i} " } for i in range (1 , 11 )]
Cookie的优点和缺点: 是一个文件
优点:简单方便
缺点:不安全,cookie信息可能被篡改。
9-8 使用jwt登录认证-理论篇【主流使用】 使用Cookie的方式做登录认证,简单方便,但是存在安全隐患,一般还不推荐使用。
Cookie之所以不安全的根本原因是:用户信息直接存放在浏览器 ,在网络传输中既有可能被非法篡改 。
所以,后来出现了把用户信息存放在服务器端的技术Session ,不过session也就基于cookie实现的。
Session的处理逻辑如下:
用户登录成功后,服务端会将用户的身份信息(用户ID, 邮箱等唯一信息)加密成一个随机字符串sessionid,将sessionid和用户信息做一个映射关系,存在服务器的数据库中。
然后,把sessionid返回给浏览器,保存在cookie中,所以session技术是基于cookie的。
最后,当浏览再次请求服务器时,携带这个sessionid。服务端会校验这个session是否在数据库中有关联用户,以及是否过期等校验,校验通过后才是合法用户,才可以获取请求数据。
但是,session技术因为需要把信息存放在服务器,这样会造成维护成本高,且不容易做分布式服务。并且,一旦sessionid泄露,也是不安全的。
因为sessionid的限制,后来又出现了目前比较流行的解决方案,那就是 JWT(json web token)
Token: 是一个广义的词,是令牌的意思
第一种: 在请求头包含token,这样的做法就是暴露了
第二种:在cookie上保留token
第三种:在cookie上保留JWT ,JWT特殊加密,包含三部分,头部、负载、签名 负载就是用户的信息,只不过是加密过后的用户信息,
将用户的信息加密后保存在cookie上成一串JWT键值对,cookie是一个小文件保存在浏览器上,下次请求的时候,就会带上这个JWT去请求后段,就会通过验证,当然也可以将这个JWT token放在请求头上,都可以
JWT的生成token格式如下,由 .
连接的三段字符串组成。
JWT token是如何签发流程:
第一段HEADER部分,固定包含算法和token类型,对此做JSON序列化并进行base64url加密。
1 {"alg" : "HS256" , "typ" : "JWT" }
第二段PAYLOAD部分,包含一些数据,对此做JSON序列化并进行base64url加密。(payloads:有效载荷 拍喽得)
1 {"sub" : "1234567890" , "user_id" : 20202 , "name" : "liuxu" , "email" : "liuxu@as.com" ...}
第三段SIGNATURE部分,把前两段通过.
拼接起来,然后对其进行HS256
加密,再然后对hs256
密文进行base64url加密,最终得到token的第三段。对hs256
加密时一般会做加盐处理。(signatures:签名 sin呢缺)
最后将三段字符串通过 .
拼接起来就生成了jwt的token。
JWT token的校验和提取用户信息流程:
9-9 使用jwt登录认证-实现
使用第三方模块实现 jwttoken的生成和校验,使用前先下载安装:pip3 install pyjwt
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import jwt secret_key = "加盐的秘钥" data = {"user_id" : 20202 , "name" : "liuxu" , "email" : "liuxu@as.com" } jwt_token = jwt.encode(payload=data, key=secret_key)print (jwt_token) raw_data = jwt.decode(jwt_token, key=secret_key, algorithms="HS256" )print (raw_data)
代码解析:
1 2 3 4 5 6 7 8 9 def encode ( self, payload: Dict [str , Any ], key: str , algorithm: Optional [str ] = "HS256" , headers: Optional [Dict ] = None , json_encoder: Optional [Type [json.JSONEncoder]] = None , ) -> str : pass
1 2 3 4 5 6 7 8 9 def decode ( self, jwt: str , key: str = "" , algorithms: Optional [List [str ]] = None , options: Optional [Dict ] = None , **kwargs, ) -> Dict [str , Any ]: pass
给token设置过期时间,在payload中增加exp
字段,它的值是一个datetime对象或者时间戳 exp:expired
1 2 3 4 5 6 7 8 9 10 11 import jwtimport datetime secret_key = "加盐的秘钥" data = {"user_id" : 20202 , "name" : "liuxu" , "email" : "liuxu@as.com" , "exp" : datetime.datetime.now() + datetime.timedelta(days=1 )} jwt_token = jwt.encode(payload=data, key=secret_key) raw_data = jwt.decode(jwt_token, key=secret_key, algorithms="HS256" )print (raw_data)
校验token失败时的处理
jwt库中提供了非常多了校验失败的错误,我们可以直接使用。或者直接使用Exception也可以。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import jwtimport datetime secret_key = "加盐的秘钥" data = {"user_id" : 20202 , "name" : "liuxu" , "email" : "liuxu@as.com" , "exp" : datetime.datetime.now() + datetime.timedelta(days=-1 )} jwt_token = jwt.encode(payload=data, key=secret_key)try : raw_data = jwt.decode(jwt_token, key=secret_key, algorithms="HS256" ) print (raw_data)except jwt.ExpiredSignatureError as e: print (e)
9-10 fastapi集成jwt 需求:使用jwt做登录校验,在登录成功后给用户签发jwttoken,在图书接口中校验jwttoken, 校验失败则报错,校验成功返回数据
示例1:使用pyjwt
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import jwtimport datetime JWT_SECRET_KEY = "ASDN*^n23^$:_};pYz7I" @app.post("/login" , response_model=UserOut ) def login (response: Response, username: str = Form( ), password: str = Form( ), db: Session = Depends(get_db ) ): db_user: User = db.query(User).filter_by(username=username).first() if not db_user: raise HTTPException(detail="用户名不存在" , status_code=400 ) if not crypt.verify(password, db_user.password): raise HTTPException(detail="用户名或密码错误" , status_code=400 ) exp = datetime.datetime.now() + datetime.timedelta(days=1 ) jwt_token = jwt.encode(payload={"id" : db_user.id , "name" : db_user.username, "exp" : exp}, key=JWT_SECRET_KEY) response.headers["x-token" ] = jwt_token return db_user@app.get("/books" ) def books (x_token: typing.Optional [str ] = Header( ) ): try : data = jwt.decode(x_token, key=JWT_SECRET_KEY, algorithms="HS256" ) return { "msg" : f"welcome: {data['name' ]} " , "books" : [{"id" : i, "name" : f"book{i} " } for i in range (1 , 11 )] } except Exception as e: raise HTTPException(detail=e, status_code=404 )
示例2:使用第三方包 python-jose来签发和校验jwttoken,使用前先安装:pip3 install "python-jose[cryptography]"
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import datetimefrom jose import jwt, JWTError SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" JWT_EXP = datetime.datetime.now() + datetime.timedelta(days=1 ) data = {"name" : "liuxu" , "exp" : JWT_EXP} jwt_token = jwt.encode(data, SECRET_KEY, algorithm="HS256" )print (jwt_token)try : data = jwt.decode(jwt_token, SECRET_KEY, "HS256" ) print (data)except JWTError as e: raise e
9-11 fastapi的登录认证工具 场景
现在已经可以使用jwt做登录认证了,但是会发现在开发的时候,这个流程比较麻烦。
比如,你在api文档页面操作:
1 先调用登录接口,登录后在响应头中把x-token复制出来
2 然后调用图书接口时,把复制出来的jwttoken贴在请求上
3 一个图书接口操作一遍还行,但是如果有很多接口都依赖登录,就会很麻烦。那有比较优雅的解决方式吗?
FastAPI的解决方式
使用OAuth2PasswordBearer,实现自动登录认证
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 from datetime import datetime, timedeltafrom fastapi import FastAPI, Form, Depends, HTTPExceptionfrom fastapi.security import OAuth2PasswordBearerfrom jose import jwt, JWTError app = FastAPI(title="XXX项目文档" ) oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login" ) SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" @app.post("/login" ) def login (username: str = Form( ), password: str = Form( ) ): if not username or not password: raise HTTPException(detail="wrong username or password" , status_code=404 ) exp = datetime.utcnow() + timedelta(days=1 ) access_token = jwt.encode({"sub" : username, "exp" : exp}, SECRET_KEY, "HS256" ) return {"access_token" : access_token, "token_type" : "bearer" }@app.get("/books" ) def get_books_list (token: str = Depends(oauth2_scheme ) ): try : data = jwt.decode(token, key=SECRET_KEY, algorithms="HS256" ) return { "msg" : f"hello: {data['sub' ]} " , "books" : [{"id" : i, "title" : f"book{i} " } for i in range (5 )] } except JWTError as e: raise e
下图是openadpi文档的登录认证交互窗口,有了它我们就不必再手动调用登录接口,复制token了
补充:
OpenAPI文档不支持携带特殊的请求头字段,但是自定义的请求头是OK的
9-12 fastapi登录认证工具的内部原理 一些感想:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 from tools import hash_obj, jwt_obj app = FastAPI(title="登录认证相关" )@app.post("/register" , response_model=UserOut ) def register (user: UserForm = Depends( ), db: Session = Depends(get_db ) ): ... new_user = User( username=user.username, password=hash_obj.hash (user.password), email=user.email ) ...@app.post("/login" , response_model=UserOut ) def login (response: Response, username: str = Form( ), password: str = Form( ), db: Session = Depends(get_db ) ): db_user: User = db.query(User).filter_by(username=username).first() if not db_user: raise HTTPException(detail="用户名不存在" , status_code=400 ) if not hash_obj.verify(password, db_user.password): raise HTTPException(detail="用户名或密码错误" , status_code=400 ) jwt_token = jwt_obj.set_token({"id" : db_user.id , "name" : db_user.username}) response.headers["authorization" ] = jwt_token return db_user@app.get("/books" ) def books (authorization: typing.Optional [str ] = Header( ) ): data = jwt_obj.get_token(authorization) return { "msg" : f"welcome: {data['name' ]} " , "books" : [{"id" : i, "name" : f"book{i} " } for i in range (1 , 11 )] }