8-1 关系型数据库 由于之前学过数据库基础知识,这里不做介绍只做对比
场景的数据库一般表现为软件,比如
MySQL、Oracle、SQL Server、PostgreSQL、DB2、SQLite等等(关系型数据库)。
Redis、Memcached、MangoDb等等(键值型数据库)。
关系型数据库
关系型数据库,是指采用了关系模型 来组织数据的数据库,其以行和列的形式存储数据,可以类比EXCEL表格。
一张表可以包含多列,每一列都有一个标题字段。然后可以有N行,一行数据有多个字段
一个数据库中可以包含多张表。
8-2 使用pymysql 8-2使用pymysql查询
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 -- 新建一个数据库,名字是 db create database db charset utf8; -- 使用 db use db; -- 在db中新建一张 users表 CREATE TABLE `users` ( `id ` int (11 ) NOT NULL AUTO_INCREMENT, `name` varchar(255 ) COLLATE utf8_bin NOT NULL, `password` varchar(255 ) COLLATE utf8_bin NOT NULL, PRIMARY KEY (`id `) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin AUTO_INCREMENT=1 ; -- 插入两条数据 insert into users(name, password) values("liuxu" ,"12345" ); insert into users(name, password) values("liuxu2" ,"12345" );
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import pymysql conn = pymysql.connect( host='127.0.0.1' , port=3306 , user='root' , password='12345' , database='db' , charset='utf8' ) cursor = conn.cursor() sql = 'select * from users;' print (cursor.execute(sql)) print (cursor.fetchone()) print (cursor.fetchmany(2 )) print (cursor.fetchall())
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 ... sql = 'select * from users where id = %s;' cursor.execute(sql, 2 ) for d in cursor.fetchall(): print (d) ... sql = 'select * from users where id >= %s and id <= %s;' cursor.execute(sql, [1 , 2 ]) for d in cursor.fetchall(): print (d) ... cursor = conn.cursor(pymysql.cursors.DictCursor) sql = 'select * from users where id >= %(start)s and id <= %(end)s;' cursor.execute(sql, {"start" : 1 , "end" : 2 })for d in cursor.fetchall(): print (d)
8-3 使用pymysql之新增数据 示例1:增加数据,手动提交
1 2 3 4 5 6 7 ... cursor = conn.cursor(pymysql.cursors.DictCursor) sql = 'insert into users(name, password) values("liuxu3", "123456");' cursor.execute(sql) conn.commit()
示例2:自动提交保存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import pymysql conn = pymysql.connect( host='127.0.0.1' , port=3306 , user='root' , password='12345' , database='db' , charset='utf8' , autocommit=True ) cursor = conn.cursor(pymysql.cursors.DictCursor) sql = 'insert into users(name, password) values("liuxu4", "123456")' cursor.execute(sql) 示例3 : cursor = conn.cursor(pymysql.cursors.DictCursor) sql = 'insert into users(name, password) values(%s, %s)' item_list = [("liuxu10" , "12345" ), ("liuixu11" , "12345" ), ("liuxu12" , "12345" )] cursor.executemany(sql, args=item_list)
8-4 使用pymysql之修改和删除数据 8-3 ORM介绍 需求场景
上面我们使用pymysql可以直接操作MySQL,但是你会发现对数据的增删改查,都需要我们自己手写SQL语句。并且,存在大量重复的代码,不能使用面向对象 。那是否是否一种更优雅的方式来操作数据库呢?
解决方式
Python中的ORM典型代表
Django-ORM,是Django框架自带的ORM,只能在django中使用
SQLAlchemy ORM,是一个开源的ORM,不依赖web框架
Peewee ,不依赖web框架的ORM
等等。
8-4 使用SQLAlchemy查询数据【实现】 pip3 install SQLAlchemy
使用SQLAlchemy的基本流程分为三步:
创建session对象
创建类模型
使用session对象 和模型类 实现数据的CRUD
第一步:创建session对象—-与数据库进行连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import pymysqlfrom sqlalchemy import create_enginefrom sqlalchemy.orm import sessionmaker pymysql.install_as_MySQLdb() DATABASE_URL = "mysql://root:12345@localhost:3306/db" engine = create_engine(DATABASE_URL) SessionLocal = sessionmaker(bind=engine) db = SessionLocal() db.close()
第二步:创建模型类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from sqlalchemy.orm import sessionmaker,declarative_basefrom sqlalchemy import Column, String, Integer BaseModel = declarative_base() class User (BaseModel ): __tablename__ = "users" id = Column(Integer, primary_key=True , autoincrement=True ) name = Column(String(255 )) password = Column(String(255 )) def __str__ (self ): return f"id: {self.id } , name: {self.name} , password: {self.password} "
第三步:使用session对象和模型类实现数据的查询操
result: User = db.query(User).filter(User.id == 1).first()
必须使用 first()
, 否则只是SQL语句 ,不会执行查询操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 from sqlalchemy import create_engine, Column, String, Integerfrom sqlalchemy.orm import sessionmaker, declarative_base BaseModel = declarative_base()class TestGroup (BaseModel ): __tablename__ = "test_group" id = Column(Integer, primary_key=True , index=True , autoincrement=True ) name = Column(String(255 )) num = Column(Integer) teacher = Column(String(255 )) def __str__ (self ): return f"id: {self.id } , name: {self.name} , nums: {self.num} ,teacher: {self.teacher} " DATABASE_URL = "sqlite:///study.db" engine = create_engine(DATABASE_URL) SessionLocal = sessionmaker(bind=engine) db = SessionLocal() result: TestGroup = db.query(TestGroup).filter (TestGroup.id == 1 ).first() print (result)print (result.id , result.name) db.close()
8-8 使用SQLAlchemy查询补充【好用】 我们除了使用 db.query(User).filter(User.id == 1).first()
来过滤查询数据外,还有很多其他的查询方式可以使用。
示例1:返回符合条件的所有数据 all()
1 2 3 db = SessionLocal() result: TestGroup = db.query(User).filter (User.id >=5 ).all ()print (result)
示例2:使用 filter_by()
简化筛选参数
1 2 3 db = SessionLocal() result: TestGroup = db.query(User).filter_by(id =2 ).first()print (result)
示例3:主键筛选使用 get()
,直接返回对象的示例
1 2 3 db = SessionLocal() result: TestGroup = db.query(User).get(2 ) print (result)
示例4:使用 order_by
排序
默认是升序排列
db.query(User).order_by(User.id.desc()).all()
使用 字段.desc()
指定降序排列
1 2 3 db = SessionLocal() result: TestGroup = db.query(User).order_by(User.id ).all ()print ([r.id for r in result])
示例5:分页逻辑。
offset(n)
指定过滤几行数据再开始查询
limit(n)
指定每次查询几条数据
1 2 3 db = SessionLocal() result: User = db.query(User).offset(0 ).limit(3 ).all ()print ([r.id for r in result])
8-5 使用SQLAlchemy新增数据 示例1:新增数据
使用 add(模型类对象)
新增数据
必须使用 db.commit()
提交数据, 否则新增的数据不会写到数据库中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 db = SessionLocal() db.add(User(name="liuxu" , password="123456" )) db.add(User(name="liuxu2" , password="123456" )) db.commit() user3 = User(name="liuxu3" , password="123456" ) user4 = User(name="liuxu4" , password="123456" ) db.add_all([user3, user4]) db.commit() user5 = User(name="liuxu5" , password="123456" ) user6 = User(name="liuxu6" , password="123456" ) db.bulk_save_objects([user5, user6]) db.commit() db.close()
8-6 使用SQLAlchemy修改和删除数据 修改数据和查询数据的原则:先查到数据,再修改或删除
示例1:修改数据 直接对象.属性 = “新字段名”
1 2 3 4 5 6 db = SessionLocal() user: User = db.query(User).filter (User.id == 1 ).first()print (user) user.name = "LIUXU" db.commit()
示例2:修改数据,使用 update()
,支持批量更新
1 2 3 db = SessionLocal() db.query(User).filter (User.id == 1 ).update({"name" : "LLL" }) db.commit()
示例3:删除数据,使用 delete()
,支持批量删除
1 2 3 db = SessionLocal() db.query(User).filter (User.id >= 3 ).delete() db.commit()
8-7 FastAPI集成SQLAlchemy之查询和删除【有用】
使用ORM的查询和删除操作
使用依赖注入的方式获取db,在请求来的时候获取db连接,在请求结束的时候关闭db连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 import typingfrom fastapi import FastAPI, HTTPException, Dependsimport pymysqlfrom sqlalchemy import create_engine, Column, String, Integerfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmaker, Session pymysql.install_as_MySQLdb() DATABASE_URL = "mysql://root:12345@localhost:3306/db" engine = create_engine(DATABASE_URL) SessionLocal = sessionmaker(bind=engine)def get_db (): db: Session = SessionLocal() try : yield db finally : db.close() BaseModel = declarative_base()class User (BaseModel ): __tablename__ = "users" id = Column(Integer, primary_key=True , index=True , autoincrement=True ) name = Column(String(255 )) password = Column(String(255 )) def __str__ (self ): return f"id: {self.id } , name: {self.name} , password: {self.password} " app = FastAPI(title="FastAPI + SqlAlchemy" )@app.get("/users" ) def get_users (page: int = 1 , size: int = 3 , db: Session = Depends(get_db ) ): users: typing.List [User] = db.query(User).all ()[(page - 1 ) * size:page * size] return [{"id" : u.id , "name" : u.name} for u in users]@app.get("/user/{user_id}" ) def get_user_by_id (user_id: int , db: Session = Depends(get_db ) ): user: User = db.query(User).filter (User.id == user_id).first() if not user: raise HTTPException(detail=f"Not found user with id: {user_id} " , status_code=404 ) return {"id" : user.id , "name" : user.name}@app.delete("/user/{user_id}" ) def delete_user_by_id (user_id: int , db: Session = Depends(get_db ) ): db.query(User).filter (User.id == user_id).delete() return {"code" : 1 , "msg" : "success" }
8-8 FastAPI集成SQLAlchemy之新建和更新
使用pydantic的BaseModel做Schema接收请求体数据并做校验。
使用使用SQLAlchemy的ORM实现新建和更新数据操作。
db.refresh(db_user) # refresh之后,db_user才有数据
在SQLAlchemy中,db.refresh()
是会话对象(Session)的一个方法,用于刷新对象的状态。
当您从数据库中查询一个对象并将其放入会话中后,该对象的状态将保持在会话中,即使数据库中的数据发生了变化。但有时候您可能希望更新会话中的对象状态,以反映最新的数据库数据。这就是db.refresh()
方法发挥作用的地方。
db.refresh(obj)
方法会查询数据库,获取对象的最新数据,并将其更新到会话中的对象上。这样,您可以确保会话中的对象与数据库中的数据保持同步
orm_mode = True # 设置后 response_model=UserOut时,才能直接返回 db_user
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 import pymysqlfrom fastapi import FastAPI, HTTPException, Dependsfrom pydantic import BaseModel as SchemaBaseModelfrom sqlalchemy import create_engine, Column, String, Integerfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmaker, Session pymysql.install_as_MySQLdb() DATABASE_URL = "mysql://root:12345@localhost:3306/db" engine = create_engine(DATABASE_URL) SessionLocal = sessionmaker(bind=engine)def get_db (): db: Session = SessionLocal() try : yield db finally : db.close() BaseModel = declarative_base()class User (BaseModel ): __tablename__ = "users" id = Column(Integer, primary_key=True , index=True , autoincrement=True ) name = Column(String(255 )) password = Column(String(255 )) def __str__ (self ): return f"id: {self.id } , name: {self.name} , password: {self.password} " app = FastAPI(title="FastAPI + SqlAlchemy" )class UserBase (SchemaBaseModel ): name: str class UserIn (UserBase ): password: str class UserOut (UserBase ): id : int class Config : orm_mode = True @app.post("/user" , response_model=UserOut ) def create_user (user: UserIn, db: Session = Depends(get_db ) ): db_user = User(**user.dict ()) db.add(db_user) db.commit() db.refresh(db_user) return db_user@app.put("/user/{user_id}" , response_model=UserOut ) def update_user_by_id (user_id: int , user: UserIn, db: Session = Depends(get_db ) ): db_user = db.query(User).filter (User.id == user_id).first() if not db_user: raise HTTPException(detail=f"Not found user with id: {user_id} " , status_code=404 ) db_user.name = user.name db_user.password = user.password db.commit() db.refresh(db_user) return db_user
8-9 项目代码结构调整【重点】 目前,我们已经实现了FastAPI和SQLAlchemy的集成,但是发现,项目的所有代码都在一个Python文件中。
这样会造成代码结构不清楚,层次不清晰,代码可读性差,维护成本高。
因此,需要做代码拆分,按照功能拆分成不同的Python文件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 import typingimport pymysqlfrom fastapi import FastAPI, HTTPException, Dependsfrom pydantic import BaseModel as SchemaBaseModelfrom sqlalchemy import create_engine, Column, String, Integerfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmaker, Session pymysql.install_as_MySQLdb() DATABASE_URL = "mysql://root:12345@localhost:3306/db" engine = create_engine(DATABASE_URL) SessionLocal = sessionmaker(bind=engine)def get_db (): db: Session = SessionLocal() try : yield db finally : db.close() BaseModel = declarative_base()class User (BaseModel ): __tablename__ = "users" id = Column(Integer, primary_key=True , index=True , autoincrement=True ) name = Column(String(255 )) password = Column(String(255 )) def __str__ (self ): return f"id: {self.id } , name: {self.name} , password: {self.password} " app = FastAPI(title="FastAPI + SqlAlchemy" )class UserBase (SchemaBaseModel ): name: str class UserIn (UserBase ): password: str class UserOut (UserBase ): id : int class Config : orm_mode = True @app.get("/users" ) def get_users (page: int = 1 , size: int = 3 , db: Session = Depends(get_db ) ): users: typing.List [User] = db.query(User).all ()[(page - 1 ) * size:page * size] return [{"id" : u.id , "name" : u.name} for u in users]@app.get("/user/{user_id}" ) def get_user_by_id (user_id: int , db: Session = Depends(get_db ) ): user: User = db.query(User).filter (User.id == user_id).first() if not user: raise HTTPException(detail=f"Not found user with id: {user_id} " , status_code=404 ) return {"id" : user.id , "name" : user.name}@app.delete("/user/{user_id}" ) def delete_user_by_id (user_id: int , db: Session = Depends(get_db ) ): db.query(User).filter (User.id == user_id).delete() db.commit() return {"code" : 1 , "msg" : "success" }@app.post("/user" , response_model=UserOut ) def create_user (user: UserIn, db: Session = Depends(get_db ) ): db_user = User(**user.dict ()) db.add(db_user) db.commit() db.refresh(db_user) return db_user@app.put("/user/{user_id}" , response_model=UserOut ) def update_user_by_id (user_id: int , user: UserIn, db: Session = Depends(get_db ) ): db_user = db.query(User).filter (User.id == user_id).first() if not db_user: raise HTTPException(detail=f"Not found user with id: {user_id} " , status_code=404 ) db_user.name = user.name db_user.password = user.password db.commit() db.refresh(db_user) return db_user
拆分后的项目结构
1 2 3 4 5 6 ── sql_app ├── main.py ├── crud.py ├── database.py ├── models.py └── schemas.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import pymysqlfrom sqlalchemy import create_enginefrom sqlalchemy.orm import sessionmaker, Session pymysql.install_as_MySQLdb() DATABASE_URL = "mysql://root:12345@localhost:3306/db" engine = create_engine(DATABASE_URL) SessionLocal = sessionmaker(bind=engine)def get_db (): db: Session = SessionLocal() try : yield db finally : db.close()
models.py # ORM模型相关代码–建立与表对应的模型类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from sqlalchemy import Column, String, Integerfrom sqlalchemy.ext.declarative import declarative_base BaseModel = declarative_base()class User (BaseModel ): __tablename__ = "users" id = Column(Integer, primary_key=True , index=True , autoincrement=True ) name = Column(String(255 )) password = Column(String(255 )) def __str__ (self ): return f"id: {self.id } , name: {self.name} , password: {self.password} "
schemas.py # Pydantic的BaseModel,校验相关
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from pydantic import BaseModelclass UserBase (BaseModel ): name: str class UserIn (UserBase ): password: str class UserOut (UserBase ): id : int class Config : orm_mode = True
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 import typingfrom fastapi import HTTPExceptionfrom sqlalchemy.orm import Sessionfrom models import Userfrom schemas import UserIndef get_users (page: int , size: int , db: Session ) -> typing.List [User]: users: typing.List [User] = db.query(User).all ()[(page - 1 ) * size:page * size] return usersdef get_user_by_id (user_id: int , db: Session ) -> User: user: User = db.query(User).filter (User.id == user_id).first() if not user: raise HTTPException(detail=f"Not found user with id: {user_id} " , status_code=404 ) return userdef delete_user_by_id (user_id: int , db: Session ): db.query(User).filter (User.id == user_id).delete() db.commit()def create_user (user: UserIn, db: Session ) -> User: db_user = User(**user.dict ()) db.add(db_user) db.commit() db.refresh(db_user) return db_userdef update_user (user_id: int , user: UserIn, db: Session ) -> User: db_user = db.query(User).filter (User.id == user_id).first() if not db_user: raise HTTPException(detail=f"Not found user with id: {user_id} " , status_code=404 ) db_user.name = user.name db_user.password = user.password db.commit() db.refresh(db_user) return db_user
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import typingfrom fastapi import FastAPI, Dependsfrom sqlalchemy.orm import Sessionimport crudfrom models import Userfrom database import get_dbfrom schemas import UserIn, UserOut app = FastAPI(title="FastAPI + SqlAlchemy" )@app.get("/users" , response_model=typing.List [UserOut] ) def get_users (page: int = 1 , size: int = 3 , db: Session = Depends(get_db ) ): user = crud.get_users(page, size, db) return user@app.get("/user/{user_id}" , response_model=UserOut ) def get_user_by_id (user_id: int , db: Session = Depends(get_db ) ): user: User = crud.get_user_by_id(user_id, db) return user@app.delete("/user/{user_id}" ) def delete_user_by_id (user_id: int , db: Session = Depends(get_db ) ): crud.delete_user_by_id(user_id, db) return {"code" : 1 , "msg" : "success" }@app.post("/user" , response_model=UserOut ) def create_user (user: UserIn, db: Session = Depends(get_db ) ): return crud.create_user(user, db)@app.put("/user/{user_id}" , response_model=UserOut ) def update_user_by_id (user_id: int , user: UserIn, db: Session = Depends(get_db ) ): return crud.update_user(user_id, user, db)