【python】FastAPI之数据库操作(五)

8-1 关系型数据库

由于之前学过数据库基础知识,这里不做介绍只做对比

场景的数据库一般表现为软件,比如

  • MySQL、Oracle、SQL Server、PostgreSQL、DB2、SQLite等等(关系型数据库)。
  • Redis、Memcached、MangoDb等等(键值型数据库)。

关系型数据库

  • 关系型数据库,是指采用了关系模型来组织数据的数据库,其以行和列的形式存储数据,可以类比EXCEL表格。
  • 一张表可以包含多列,每一列都有一个标题字段。然后可以有N行,一行数据有多个字段
  • 一个数据库中可以包含多张表。

8-2 使用pymysql

8-2使用pymysql查询

  • 在数据库 db 中新建下面这张 users
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
-- 新建一个数据库,名字是 db
create database db charset utf8;

-- 使用 db
use db;

-- 在db中新建一张 users表
CREATE TABLE `users` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) COLLATE utf8_bin NOT NULL,
`password` varchar(255) COLLATE utf8_bin NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin AUTO_INCREMENT=1 ;


-- 插入两条数据
insert into users(name, password) values("liuxu","12345");
insert into users(name, password) values("liuxu2","12345");
  • 通过Python代码连接数据库并查询数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import pymysql

# 获取连接
conn = pymysql.connect(
host='127.0.0.1',
port=3306,
user='root',
password='12345',
database='db',
charset='utf8'
)

# 获取游标
cursor = conn.cursor()


# 准备SQL语句
sql = 'select * from users;'


print(cursor.execute(sql)) # 返回受影响的函数

print(cursor.fetchone()) # 取出一行数据, (1, 'liuxu', '12345')
print(cursor.fetchmany(2)) # 取出N行数据
print(cursor.fetchall()) # 取出剩下所有数据
  • 按条件查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
...


sql = 'select * from users where id = %s;' # 使用 %s占位

cursor.execute(sql, 2) # execute()中第一个是sql语句,第二个是替换占位的条件

for d in cursor.fetchall():
print(d)
...


sql = 'select * from users where id >= %s and id <= %s;' # 使用 %s占位

cursor.execute(sql, [1, 2]) # 以为有两个参数,使用列表会元组的形式存放替换占位的条件

for d in cursor.fetchall():
print(d)

...
cursor = conn.cursor(pymysql.cursors.DictCursor)

sql = 'select * from users where id >= %(start)s and id <= %(end)s;'

cursor.execute(sql, {"start": 1, "end": 2})
for d in cursor.fetchall():
print(d)

8-3 使用pymysql之新增数据

示例1:增加数据,手动提交

1
2
3
4
5
6
7
...
cursor = conn.cursor(pymysql.cursors.DictCursor)


sql = 'insert into users(name, password) values("liuxu3", "123456");'
cursor.execute(sql)
conn.commit() # 需要确认一次, 否则数据库中不会保存记录

示例2:自动提交保存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import pymysql

# 获取连接
conn = pymysql.connect(
host='127.0.0.1',
port=3306,
user='root',
password='12345',
database='db',
charset='utf8',
autocommit=True # autocommit=True,表示execute后自动提交保存,无需再手动commit()
)

cursor = conn.cursor(pymysql.cursors.DictCursor)


sql = 'insert into users(name, password) values("liuxu4", "123456")'
cursor.execute(sql)

# 批量执行 executemany
示例3
cursor = conn.cursor(pymysql.cursors.DictCursor)


sql = 'insert into users(name, password) values(%s, %s)' # 占位, 注意和 "%s"的区别!!!
item_list = [("liuxu10", "12345"), ("liuixu11", "12345"), ("liuxu12", "12345")] # 多个替换数据
cursor.executemany(sql, args=item_list) # 返回受影响的函数

8-4 使用pymysql之修改和删除数据

8-3 ORM介绍

需求场景

上面我们使用pymysql可以直接操作MySQL,但是你会发现对数据的增删改查,都需要我们自己手写SQL语句。并且,存在大量重复的代码,不能使用面向对象。那是否是否一种更优雅的方式来操作数据库呢?

解决方式

  • 使用ORM来完成对数据的操作

  • ORM:对象关系映射(Object-Relational Mapping),它可以将Python中的类-对象-属性 和 数据库中的表-行数据-字段做映射。利用它,我们就不需要在直接操作数据库中的表、行和字段了,直接操作Python中面向对象的类、对象和属性即可实现数据的CRUD

  • 类<——->数据表

  • 类对象<——->数据行

  • 类属性<——–>字段

1

  • 优点:能够让一个不用sql语句的小白也能够通过python 面向对象的代码简单快捷的操作数据库

  • 缺点:封装程度太高,有时候sql语句的效率相对较低,可能需要你自己写SQL语句

Python中的ORM典型代表

  • Django-ORM,是Django框架自带的ORM,只能在django中使用
  • SQLAlchemy ORM,是一个开源的ORM,不依赖web框架
  • Peewee ,不依赖web框架的ORM
  • 等等。

8-4 使用SQLAlchemy查询数据【实现】

pip3 install SQLAlchemy

使用SQLAlchemy的基本流程分为三步:

  • 创建session对象
  • 创建类模型
  • 使用session对象模型类实现数据的CRUD

第一步:创建session对象—-与数据库进行连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import pymysql
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 使用pymysql作为MySQLdb
pymysql.install_as_MySQLdb()

# 指定连接的MySQL数据库
DATABASE_URL = "mysql://root:12345@localhost:3306/db" # 多种

# 创建引擎
engine = create_engine(DATABASE_URL)

# 基于引擎创建session
SessionLocal = sessionmaker(bind=engine)

# 实例化session对象,得到db对象
db = SessionLocal()

# db使用只有需要关闭,避免占用资源
db.close()

第二步:创建模型类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from sqlalchemy.orm import sessionmaker,declarative_base
from sqlalchemy import Column, String, Integer
# 创建对象的基类:
BaseModel = declarative_base() # declarative_base方法返回的是一个类
# 定义User对象:
class User(BaseModel):
__tablename__ = "users" # 指定数据库中表的名字

id = Column(Integer, primary_key=True, autoincrement=True) # 定义id作为主键且自增
name = Column(String(255)) # 定义name字段,是一个string类型
password = Column(String(255)) # password,是一个string类型

# 人性化定制对象的打印输出
def __str__(self):
return f"id: {self.id}, name: {self.name}, password: {self.password}"

第三步:使用session对象和模型类实现数据的查询操

  • result: User = db.query(User).filter(User.id == 1).first()
  • 必须使用 first(), 否则只是SQL语句,不会执行查询操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from sqlalchemy import create_engine, Column, String, Integer
from sqlalchemy.orm import sessionmaker, declarative_base

# 创建对象的基类:
BaseModel = declarative_base()


# 定义表对象: 只是在做映射接收表数据
class TestGroup(BaseModel):
__tablename__ = "test_group" # 定义它的表名

id = Column(Integer, primary_key=True, index=True, autoincrement=True)
name = Column(String(255))
num = Column(Integer)
teacher = Column(String(255))

def __str__(self):# 当对象被访问打印时触发执行,他必须有一个字符串类型的返回值
return f"id: {self.id}, name: {self.name}, nums: {self.num},teacher: {self.teacher}"


# 定义连接url
DATABASE_URL = "sqlite:///study.db"
# 创建引擎 和会话
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
# db是什么,db是一个会话,其实就是连接上了这个数据库,这个表的映射对象
db = SessionLocal()
# db.query(TestGroup)传入这个定义的表对象,通过引擎对象去运行,执行查询方法,然后filter是筛选
result: TestGroup = db.query(TestGroup).filter(TestGroup.id == 1).first() # 一定要执行first
print(result)
print(result.id, result.name)

# 关闭db连接(重要)
db.close()

8-8 使用SQLAlchemy查询补充【好用】

我们除了使用 db.query(User).filter(User.id == 1).first() 来过滤查询数据外,还有很多其他的查询方式可以使用。

示例1:返回符合条件的所有数据 all()

1
2
3
db = SessionLocal()
result: TestGroup = db.query(User).filter(User.id >=5).all()
print(result)

示例2:使用 filter_by()简化筛选参数

1
2
3
db = SessionLocal()
result: TestGroup = db.query(User).filter_by(id=2).first()
print(result)

示例3:主键筛选使用 get(),直接返回对象的示例

1
2
3
db = SessionLocal()
result: TestGroup = db.query(User).get(2) # get(2), 表示查询主键id=2的数据
print(result)

示例4:使用 order_by 排序

  • 默认是升序排列
  • db.query(User).order_by(User.id.desc()).all() 使用 字段.desc()指定降序排列
1
2
3
db = SessionLocal()
result: TestGroup = db.query(User).order_by(User.id).all()
print([r.id for r in result])

示例5:分页逻辑。

  • offset(n)指定过滤几行数据再开始查询
  • limit(n)指定每次查询几条数据
1
2
3
db = SessionLocal()
result: User = db.query(User).offset(0).limit(3).all()
print([r.id for r in result])

8-5 使用SQLAlchemy新增数据

示例1:新增数据

  • 使用 add(模型类对象)新增数据
  • 必须使用 db.commit()提交数据,否则新增的数据不会写到数据库中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 1、新增单条数据 用add方法
db = SessionLocal()

db.add(User(name="liuxu", password="123456"))
db.add(User(name="liuxu2", password="123456"))
db.commit() # 非常重要

# 2、新增批量 使用add_all()
user3 = User(name="liuxu3", password="123456")
user4 = User(name="liuxu4", password="123456")
db.add_all([user3, user4])
db.commit()

# 3、批量新增数据,使用 bulk_save_objects( ) 批量保存对象
user5 = User(name="liuxu5", password="123456")
user6 = User(name="liuxu6", password="123456")
db.bulk_save_objects([user5, user6])
db.commit()
db.close()

8-6 使用SQLAlchemy修改和删除数据

修改数据和查询数据的原则:先查到数据,再修改或删除

示例1:修改数据 直接对象.属性 = “新字段名”

1
2
3
4
5
6
db = SessionLocal()

user: User = db.query(User).filter(User.id == 1).first()
print(user)
user.name = "LIUXU" # 直接修改对象的属性值
db.commit() # 然后提交保存才能更新

示例2:修改数据,使用 update(),支持批量更新

1
2
3
db = SessionLocal()
db.query(User).filter(User.id == 1).update({"name": "LLL"}) # 更新指定字段的值
db.commit() # 提交保存修改

示例3:删除数据,使用 delete(),支持批量删除

1
2
3
db = SessionLocal()
db.query(User).filter(User.id >= 3).delete() # 支持批量删除
db.commit() # 提交才能删除

8-7 FastAPI集成SQLAlchemy之查询和删除【有用】

  • 使用ORM的查询和删除操作
  • 使用依赖注入的方式获取db,在请求来的时候获取db连接,在请求结束的时候关闭db连接。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import typing

from fastapi import FastAPI, HTTPException, Depends
import pymysql
from sqlalchemy import create_engine, Column, String, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session

pymysql.install_as_MySQLdb()


DATABASE_URL = "mysql://root:12345@localhost:3306/db"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)


def get_db():
db: Session = SessionLocal()
try:
yield db # yield 妙用
finally:
db.close()


BaseModel = declarative_base()


class User(BaseModel):
__tablename__ = "users"

id = Column(Integer, primary_key=True, index=True, autoincrement=True)
name = Column(String(255))
password = Column(String(255))

def __str__(self):
return f"id: {self.id}, name: {self.name}, password: {self.password}"


app = FastAPI(title="FastAPI + SqlAlchemy")


@app.get("/users")
def get_users(page: int = 1, size: int = 3, db: Session = Depends(get_db)):
users: typing.List[User] = db.query(User).all()[(page - 1) * size:page * size]
return [{"id": u.id, "name": u.name} for u in users]


@app.get("/user/{user_id}")
def get_user_by_id(user_id: int, db: Session = Depends(get_db)):
user: User = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(detail=f"Not found user with id: {user_id}", status_code=404)
return {"id": user.id, "name": user.name}


@app.delete("/user/{user_id}")
def delete_user_by_id(user_id: int, db: Session = Depends(get_db)):
db.query(User).filter(User.id == user_id).delete()
return {"code": 1, "msg": "success"}

8-8 FastAPI集成SQLAlchemy之新建和更新

  • 使用pydantic的BaseModel做Schema接收请求体数据并做校验。

  • 使用使用SQLAlchemy的ORM实现新建和更新数据操作。

  • db.refresh(db_user) # refresh之后,db_user才有数据

    • 在SQLAlchemy中,db.refresh()是会话对象(Session)的一个方法,用于刷新对象的状态。

      当您从数据库中查询一个对象并将其放入会话中后,该对象的状态将保持在会话中,即使数据库中的数据发生了变化。但有时候您可能希望更新会话中的对象状态,以反映最新的数据库数据。这就是db.refresh()方法发挥作用的地方。

      db.refresh(obj)方法会查询数据库,获取对象的最新数据,并将其更新到会话中的对象上。这样,您可以确保会话中的对象与数据库中的数据保持同步

  • orm_mode = True # 设置后 response_model=UserOut时,才能直接返回 db_user

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import pymysql
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel as SchemaBaseModel

from sqlalchemy import create_engine, Column, String, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session

pymysql.install_as_MySQLdb()


DATABASE_URL = "mysql://root:12345@localhost:3306/db"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)


def get_db():
db: Session = SessionLocal()
try:
yield db
finally:
db.close()


BaseModel = declarative_base()


class User(BaseModel):
__tablename__ = "users"

id = Column(Integer, primary_key=True, index=True, autoincrement=True)
name = Column(String(255))
password = Column(String(255))

def __str__(self):
return f"id: {self.id}, name: {self.name}, password: {self.password}"


app = FastAPI(title="FastAPI + SqlAlchemy")


class UserBase(SchemaBaseModel):
name: str


class UserIn(UserBase):
password: str


class UserOut(UserBase):
id: int

class Config:
orm_mode = True # 设置后 response_model=UserOut时,才能直接返回 db_user


@app.post("/user", response_model=UserOut)
def create_user(user: UserIn, db: Session = Depends(get_db)):
# db_user = User(name=user.name, password=user.password)
db_user = User(**user.dict()) # 和上面的用法是等价的
db.add(db_user)
db.commit()
db.refresh(db_user) # refresh之后,db_user才有数据
return db_user


@app.put("/user/{user_id}", response_model=UserOut)
def update_user_by_id(user_id: int, user: UserIn, db: Session = Depends(get_db)):
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
raise HTTPException(detail=f"Not found user with id: {user_id}", status_code=404)
db_user.name = user.name
db_user.password = user.password
db.commit()
db.refresh(db_user)
return db_user

8-9 项目代码结构调整【重点】

目前,我们已经实现了FastAPI和SQLAlchemy的集成,但是发现,项目的所有代码都在一个Python文件中。

这样会造成代码结构不清楚,层次不清晰,代码可读性差,维护成本高。

因此,需要做代码拆分,按照功能拆分成不同的Python文件。

  • 项目完整代码示例(一个文件版本)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import typing
import pymysql
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel as SchemaBaseModel

from sqlalchemy import create_engine, Column, String, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session

pymysql.install_as_MySQLdb()


DATABASE_URL = "mysql://root:12345@localhost:3306/db"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)


def get_db():
db: Session = SessionLocal()
try:
yield db
finally:
db.close()


BaseModel = declarative_base()


class User(BaseModel):
__tablename__ = "users"

id = Column(Integer, primary_key=True, index=True, autoincrement=True)
name = Column(String(255))
password = Column(String(255))

def __str__(self):
return f"id: {self.id}, name: {self.name}, password: {self.password}"


app = FastAPI(title="FastAPI + SqlAlchemy")


class UserBase(SchemaBaseModel):
name: str


class UserIn(UserBase):
password: str


class UserOut(UserBase):
id: int

class Config:
orm_mode = True # 设置后 response_model=UserOut时,才能直接返回 db_user


@app.get("/users")
def get_users(page: int = 1, size: int = 3, db: Session = Depends(get_db)):
users: typing.List[User] = db.query(User).all()[(page - 1) * size:page * size]
return [{"id": u.id, "name": u.name} for u in users]


@app.get("/user/{user_id}")
def get_user_by_id(user_id: int, db: Session = Depends(get_db)):
user: User = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(detail=f"Not found user with id: {user_id}", status_code=404)
return {"id": user.id, "name": user.name}


@app.delete("/user/{user_id}")
def delete_user_by_id(user_id: int, db: Session = Depends(get_db)):
db.query(User).filter(User.id == user_id).delete()
db.commit()
return {"code": 1, "msg": "success"}


@app.post("/user", response_model=UserOut)
def create_user(user: UserIn, db: Session = Depends(get_db)):
# db_user = User(name=user.name, password=user.password)
db_user = User(**user.dict()) # 和上面的用法是等价的
db.add(db_user)
db.commit()
db.refresh(db_user) # refresh之后,db_user才有数据
return db_user


@app.put("/user/{user_id}", response_model=UserOut)
def update_user_by_id(user_id: int, user: UserIn, db: Session = Depends(get_db)):
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
raise HTTPException(detail=f"Not found user with id: {user_id}", status_code=404)
db_user.name = user.name
db_user.password = user.password
db.commit()
db.refresh(db_user)
return db_user

拆分后的项目结构

1
2
3
4
5
6
── sql_app
├── main.py # 程序入口
├── crud.py # 增删查改的方法
├── database.py # 数据库相关
├── models.py # ORM模型类相关
└── schemas.py # Pydantic的BaseModel,校验相关
  • database.py # 数据库相关的配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import pymysql

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session

pymysql.install_as_MySQLdb()


DATABASE_URL = "mysql://root:12345@localhost:3306/db"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)


def get_db():
db: Session = SessionLocal() # db对象出来了
try:
yield db
finally:
db.close()
  • models.py # ORM模型相关代码–建立与表对应的模型类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from sqlalchemy import Column, String, Integer
from sqlalchemy.ext.declarative import declarative_base


BaseModel = declarative_base()


class User(BaseModel): # 创建一个模型类 定义表名字段
__tablename__ = "users"

id = Column(Integer, primary_key=True, index=True, autoincrement=True)
name = Column(String(255))
password = Column(String(255))

def __str__(self):
return f"id: {self.id}, name: {self.name}, password: {self.password}"
  • schemas.py # Pydantic的BaseModel,校验相关
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from pydantic import BaseModel


class UserBase(BaseModel):
name: str


class UserIn(UserBase): # 输入的 用的响应模型
password: str


class UserOut(UserBase): # 输出用的 响应模型,输出要屏蔽密码
id: int

class Config:
orm_mode = True
  • crud.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import typing

from fastapi import HTTPException
from sqlalchemy.orm import Session

from models import User
from schemas import UserIn


def get_users(page: int, size: int, db: Session) -> typing.List[User]:
users: typing.List[User] = db.query(User).all()[(page - 1) * size:page * size]
return users


def get_user_by_id(user_id: int, db: Session) -> User:
user: User = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(detail=f"Not found user with id: {user_id}", status_code=404)
return user


def delete_user_by_id(user_id: int, db: Session):
db.query(User).filter(User.id == user_id).delete()
db.commit()


def create_user(user: UserIn, db: Session) -> User:
# db_user = User(name=user.name, password=user.password)
db_user = User(**user.dict()) # 和上面的用法是等价的
db.add(db_user)
db.commit()
db.refresh(db_user) # refresh之后,db_user才有数据
return db_user


def update_user(user_id: int, user: UserIn, db: Session) -> User:
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
raise HTTPException(detail=f"Not found user with id: {user_id}", status_code=404)
db_user.name = user.name
db_user.password = user.password
db.commit()
db.refresh(db_user)
return db_user
  • main.py # 程序主入口,定义接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import typing

from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session

import crud
from models import User
from database import get_db
from schemas import UserIn, UserOut

app = FastAPI(title="FastAPI + SqlAlchemy")


@app.get("/users", response_model=typing.List[UserOut])
def get_users(page: int = 1, size: int = 3, db: Session = Depends(get_db)):
user = crud.get_users(page, size, db)
return user


@app.get("/user/{user_id}", response_model=UserOut)
def get_user_by_id(user_id: int, db: Session = Depends(get_db)):
user: User = crud.get_user_by_id(user_id, db)
return user


@app.delete("/user/{user_id}")
def delete_user_by_id(user_id: int, db: Session = Depends(get_db)):
crud.delete_user_by_id(user_id, db)
return {"code": 1, "msg": "success"}


@app.post("/user", response_model=UserOut)
def create_user(user: UserIn, db: Session = Depends(get_db)):
return crud.create_user(user, db)


@app.put("/user/{user_id}", response_model=UserOut)
def update_user_by_id(user_id: int, user: UserIn, db: Session = Depends(get_db)):
return crud.update_user(user_id, user, db)


【python】FastAPI之数据库操作(五)
http://example.com/2024/02/28/685FastAPI5数据库操作/
作者
Wangxiaowang
发布于
2024年2月28日
许可协议