跳转至

SQL (关系型) 数据库

Info

这些文档即将被更新。🎉

当前版本假设Pydantic v1和SQLAlchemy版本小于2。

新的文档将包括Pydantic v2以及 SQLModel(也是基于SQLAlchemy),一旦SQLModel更新为为使用Pydantic v2。

**FastAPI**不需要你使用SQL(关系型)数据库。

但是您可以使用任何您想要的关系型数据库。

在这里,让我们看一个使用着SQLAlchemy的示例。

您可以很容易地将其调整为任何SQLAlchemy支持的数据库,如:

  • PostgreSQL
  • MySQL
  • SQLite
  • Oracle
  • Microsoft SQL Server,等等其它数据库

在此示例中,我们将使用**SQLite**,因为它使用单个文件并且 在Python中具有集成支持。因此,您可以复制此示例并按原样来运行它。

稍后,对于您的产品级别的应用程序,您可能会要使用像**PostgreSQL**这样的数据库服务器。

Tip

这儿有一个**FastAPI**和**PostgreSQL**的官方项目生成器,全部基于**Docker**,包括前端和更多工具:https://github.com/tiangolo/full-stack-fastapi-postgresql

Note

请注意,大部分代码是SQLAlchemy的标准代码,您可以用于任何框架。FastAPI特定的代码和往常一样少。

ORMs(对象关系映射)

**FastAPI**可与任何数据库在任何样式的库中一起与 数据库进行通信。

一种常见的模式是使用“ORM”:对象关系映射。

ORM 具有在代码和数据库表(“关系型”)中的**对象**之间转换(“*映射”)的工具。

使用 ORM,您通常会在 SQL 数据库中创建一个代表映射的类,该类的每个属性代表一个列,具有名称和类型。

例如,一个类Pet可以表示一个 SQL 表pets

该类的每个*实例对象都代表数据库中的一行数据。*

又例如,一个对象orion_catPet的一个实例)可以有一个属性orion_cat.type, 对标数据库中的type列。并且该属性的值可以是其它,例如"cat"

这些 ORM 还具有在表或实体之间建立关系的工具(比如创建多表关系)。

这样,您还可以拥有一个属性orion_cat.owner,它包含该宠物所有者的数据,这些数据取自另外一个表。

因此,orion_cat.owner.name可能是该宠物主人的姓名(来自表owners中的列name)。

它可能有一个像"Arquilian"(一种业务逻辑)。

当您尝试从您的宠物对象访问它时,ORM 将完成所有工作以从相应的表*所有者那里再获取信息。*

常见的 ORM 例如:Django-ORM(Django 框架的一部分)、SQLAlchemy ORM(SQLAlchemy 的一部分,独立于框架)和 Peewee(独立于框架)等。

在这里,我们将看到如何使用**SQLAlchemy ORM**。

以类似的方式,您也可以使用任何其他 ORM。

Tip

在文档中也有一篇使用 Peewee 的等效的文章。

文件结构

对于这些示例,假设您有一个名为的目录my_super_project,其中包含一个名为的子目录sql_app,其结构如下:

.
└── sql_app
    ├── __init__.py
    ├── crud.py
    ├── database.py
    ├── main.py
    ├── models.py
    └── schemas.py

该文件__init__.py只是一个空文件,但它告诉 Python sql_app 是一个包。

现在让我们看看每个文件/模块的作用。

安装 SQLAlchemy

首先你需要安装SQLAlchemy:

$ pip install sqlalchemy

---> 100%

创建 SQLAlchemy 部件

让我们转到文件sql_app/database.py

导入 SQLAlchemy 部件

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

为 SQLAlchemy 定义数据库 URL地址

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

在这个例子中,我们正在“连接”到一个 SQLite 数据库(用 SQLite 数据库打开一个文件)。

该文件将位于文件中的同一目录中sql_app.db

这就是为什么最后一部分是./sql_app.db.

如果您使用的是**PostgreSQL**数据库,则只需取消注释该行:

SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

...并根据您的数据库数据和相关凭据(也适用于 MySQL、MariaDB 或任何其他)对其进行调整。

Tip

如果您想使用不同的数据库,这是就是您必须修改的地方。

创建 SQLAlchemy 引擎

第一步,创建一个 SQLAlchemy的“引擎”。

我们稍后会将这个engine在其他地方使用。

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

注意

参数:

connect_args={"check_same_thread": False}

...仅用于SQLite,在其他数据库不需要它。

"技术细节"

默认情况下,SQLite 只允许一个线程与其通信,假设有多个线程的话,也只将处理一个独立的请求。

这是为了防止意外地为不同的事物(不同的请求)共享相同的连接。

但是在 FastAPI 中,使用普通函数(def)时,多个线程可以为同一个请求与数据库交互,所以我们需要使用connect_args={"check_same_thread": False}来让SQLite允许这样。

此外,我们将确保每个请求都在依赖项中获得自己的数据库连接会话,因此不需要该默认机制。

创建一个SessionLocal

每个SessionLocal类的实例都会是一个数据库会话。当然该类本身还不是数据库会话。

但是一旦我们创建了一个SessionLocal类的实例,这个实例将是实际的数据库会话。

我们将它命名为SessionLocal是为了将它与我们从 SQLAlchemy 导入的Session区别开来。

稍后我们将使用Session(从 SQLAlchemy 导入的那个)。

要创建SessionLocal类,请使用函数sessionmaker

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

创建一个Base

现在我们将使用declarative_base()返回一个类。

稍后我们将继承这个类,来创建每个数据库模型或类(ORM 模型):

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

创建数据库模型

现在让我们看看文件sql_app/models.py

Base类来创建 SQLAlchemy 模型

我们将使用我们之前创建的Base类来创建 SQLAlchemy 模型。

Tip

SQLAlchemy 使用的“模型”这个术语 来指代与数据库交互的这些类和实例。

而 Pydantic 也使用“模型”这个术语 来指代不同的东西,即数据验证、转换以及文档类和实例。

database(来自上面的database.py文件)导入Base

创建从它继承的类。

这些类就是 SQLAlchemy 模型。

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from .database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")

这个__tablename__属性是用来告诉 SQLAlchemy 要在数据库中为每个模型使用的数据库表的名称。

创建模型属性/列

现在创建所有模型(类)的属性。

这些属性中的每一个都代表其相应数据库表中的一列。

我们使用Column来表示 SQLAlchemy 中的默认值。

我们传递一个 SQLAlchemy “类型”,如IntegerStringBoolean,它定义了数据库中的类型,作为参数。

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from .database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")

创建关系

现在创建关系。

为此,我们使用SQLAlchemy ORM提供的relationship

这将或多或少会成为一种“神奇”属性,其中表示该表与其他相关的表中的值。

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from .database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")

当访问 user 中的属性items时,如 中my_user.items,它将有一个ItemSQLAlchemy 模型列表(来自items表),这些模型具有指向users表中此记录的外键。

当您访问my_user.items时,SQLAlchemy 实际上会从items表中的获取一批记录并在此处填充进去。

同样,当访问 Item中的属性owner时,它将包含表中的UserSQLAlchemy 模型users。使用owner_id属性/列及其外键来了解要从users表中获取哪条记录。

创建 Pydantic 模型

现在让我们查看一下文件sql_app/schemas.py

Tip

为了避免 SQLAlchemy*模型*和 Pydantic*模型*之间的混淆,我们将有models.py(SQLAlchemy 模型的文件)和schemas.py( Pydantic 模型的文件)。

这些 Pydantic 模型或多或少地定义了一个“schema”(一个有效的数据形状)。

因此,这将帮助我们在使用两者时避免混淆。

创建初始 Pydantic*模型*/模式

创建一个ItemBaseUserBasePydantic*模型*(或者我们说“schema”),他们拥有创建或读取数据时具有的共同属性。

然后创建一个继承自他们的ItemCreateUserCreate,并添加创建时所需的其他数据(或属性)。

因此在创建时也应当有一个password属性。

但是为了安全起见,password不会出现在其他同类 Pydantic*模型*中,例如通过API读取一个用户数据时,它不应当包含在内。

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: str | None = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import List, Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True

SQLAlchemy 风格和 Pydantic 风格

请注意,SQLAlchemy*模型*使用 =来定义属性,并将类型作为参数传递给Column,例如:

name = Column(String)

虽然 Pydantic*模型*使用: 声明类型,但新的类型注释语法/类型提示是:

name: str

请牢记这一点,这样您在使用:还是=时就不会感到困惑。

创建用于读取/返回的Pydantic*模型/模式*

现在创建当从 API 返回数据时、将在读取数据时使用的Pydantic*模型(schemas)。*

例如,在创建一个项目之前,我们不知道分配给它的 ID 是什么,但是在读取它时(从 API 返回时)我们已经知道它的 ID。

同样,当读取用户时,我们现在可以声明items,将包含属于该用户的项目。

不仅是这些项目的 ID,还有我们在 Pydantic*模型*中定义的用于读取项目的所有数据:Item.

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: str | None = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import List, Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True

Tip

请注意,读取用户(从 API 返回)时将使用不包括passwordUser Pydantic*模型*。

使用 Pydantic 的orm_mode

现在,在用于查询的 Pydantic*模型*ItemUser,添加一个内部Config类。

此类Config用于为 Pydantic 提供配置。

Config类中,设置属性orm_mode = True

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: str | None = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import List, Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True

Tip

请注意,它使用=分配一个值,例如:

orm_mode = True

它不使用之前的:来类型声明。

这是设置配置值,而不是声明类型。

Pydanticorm_mode将告诉 Pydantic*模型*读取数据,即它不是一个dict,而是一个 ORM 模型(或任何其他具有属性的任意对象)。

这样,而不是仅仅试图从dictid 中获取值,如下所示:

id = data["id"]

它还会尝试从属性中获取它,如:

id = data.id

有了这个,Pydantic*模型*与 ORM 兼容,您只需在*路径操作*response_model的参数中声明它即可。

您将能够返回一个数据库模型,它将从中读取数据。

ORM 模式的技术细节

SQLAlchemy 和许多其他默认情况下是“延迟加载”。

这意味着,例如,除非您尝试访问包含该数据的属性,否则它们不会从数据库中获取关系数据。

例如,访问属性items

current_user.items

将使 SQLAlchemy 转到items表并获取该用户的项目,在调用.items之前不会去查询数据库。

没有orm_mode,如果您从*路径操作*返回一个 SQLAlchemy 模型,它不会包含关系数据。

即使您在 Pydantic 模型中声明了这些关系,也没有用处。

但是在 ORM 模式下,由于 Pydantic 本身会尝试从属性访问它需要的数据(而不是假设为 dict),你可以声明你想要返回的特定数据,它甚至可以从 ORM 中获取它。

CRUD工具

现在让我们看看文件sql_app/crud.py

在这个文件中,我们将编写可重用的函数用来与数据库中的数据进行交互。

**CRUD**分别为:增加(**C**reate)、查询(**R**ead)、更改(**U**pdate)、删除(**D**elete),即增删改查。

...虽然在这个例子中我们只是新增和查询。

读取数据

sqlalchemy.orm中导入Session,这将允许您声明db参数的类型,并在您的函数中进行更好的类型检查和完成。

导入之前的models(SQLAlchemy 模型)和schemas(Pydantic*模型*/模式)。

创建一些工具函数来完成:

  • 通过 ID 和电子邮件查询单个用户。
  • 查询多个用户。
  • 查询多个项目。
from sqlalchemy.orm import Session

from . import models, schemas


def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.User).offset(skip).limit(limit).all()


def create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


def get_items(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.Item).offset(skip).limit(limit).all()


def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item

Tip

通过创建仅专用于与数据库交互(获取用户或项目)的函数,独立于*路径操作函数*,您可以更轻松地在多个部分中重用它们,并为它们添加单元测试。

创建数据

现在创建工具函数来创建数据。

它的步骤是:

  • 使用您的数据创建一个 SQLAlchemy 模型*实例。*
  • 使用add来将该实例对象添加到数据库会话。
  • 使用commit来将更改提交到数据库(以便保存它们)。
  • 使用refresh来刷新您的实例对象(以便它包含来自数据库的任何新数据,例如生成的 ID)。
from sqlalchemy.orm import Session

from . import models, schemas


def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.User).offset(skip).limit(limit).all()


def create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


def get_items(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.Item).offset(skip).limit(limit).all()


def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item

Tip

SQLAlchemy 模型User包含一个hashed_password,它应该是一个包含散列的安全密码。

但由于 API 客户端提供的是原始密码,因此您需要将其提取并在应用程序中生成散列密码。

然后将hashed_password参数与要保存的值一起传递。

Warning

此示例不安全,密码未经过哈希处理。

在现实生活中的应用程序中,您需要对密码进行哈希处理,并且永远不要以明文形式保存它们。

有关更多详细信息,请返回教程中的安全部分。

在这里,我们只关注数据库的工具和机制。

Tip

这里不是将每个关键字参数传递给Item并从Pydantic模型中读取每个参数,而是先生成一个字典,其中包含Pydantic模型的数据:

item.dict()

然后我们将dict的键值对 作为关键字参数传递给 SQLAlchemy Item

Item(**item.dict())

然后我们传递 Pydantic模型未提供的额外关键字参数owner_id

Item(**item.dict(), owner_id=user_id)

主**FastAPI**应用程序

现在在sql_app/main.py文件中 让我们集成和使用我们之前创建的所有其他部分。

创建数据库表

以非常简单的方式创建数据库表:

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

Alembic 注意

通常你可能会使用 Alembic,来进行格式化数据库(创建表等)。

而且您还可以将 Alembic 用于“迁移”(这是它的主要工作)。

“迁移”是每当您更改 SQLAlchemy 模型的结构、添加新属性等以在数据库中复制这些更改、添加新列、新表等时所需的一组步骤。

您可以在Full Stack FastAPI Template的模板中找到一个 FastAPI 项目中的 Alembic 示例。具体在alembic代码目录中

创建依赖项

现在使用我们在sql_app/database.py文件中创建的SessionLocal来创建依赖项。

我们需要每个请求有一个独立的数据库会话/连接(SessionLocal),在整个请求中使用相同的会话,然后在请求完成后关闭它。

然后将为下一个请求创建一个新会话。

为此,我们将创建一个包含yield的依赖项,正如前面关于Dependencies withyield的部分中所解释的那样。

我们的依赖项将创建一个新的 SQLAlchemy SessionLocal,它将在单个请求中使用,然后在请求完成后关闭它。

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

Info

我们将SessionLocal()请求的创建和处理放在一个try块中。

然后我们在finally块中关闭它。

通过这种方式,我们确保数据库会话在请求后始终关闭。即使在处理请求时出现异常。

但是您不能从退出代码中引发另一个异常(在yield之后)。可以查阅 Dependencies with yield and HTTPException

*然后,当在路径操作函数*中使用依赖项时,我们使用Session,直接从 SQLAlchemy 导入的类型声明它。

*这将为我们在路径操作函数*中提供更好的编辑器支持,因为编辑器将知道db参数的类型Session

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

"技术细节"

参数db实际上是 type SessionLocal,但是这个类(用 创建sessionmaker())是 SQLAlchemy 的“代理” Session,所以,编辑器并不真正知道提供了哪些方法。

但是通过将类型声明为Session,编辑器现在可以知道可用的方法(.add()、.query()、.commit()等)并且可以提供更好的支持(比如完成)。类型声明不影响实际对象。

创建您的**FastAPI** 路径操作

现在,到了最后,编写标准的**FastAPI** *路径操作*代码。

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

我们在依赖项中的每个请求之前利用yield创建数据库会话,然后关闭它。

所以我们就可以在*路径操作函数*中创建需要的依赖,就能直接获取会话。

这样,我们就可以直接从*路径操作函数*内部调用crud.get_user并使用该会话,来进行对数据库操作。

Tip

请注意,您返回的值是 SQLAlchemy 模型或 SQLAlchemy 模型列表。

但是由于所有路径操作的response_model都使用 Pydantic模型/使用orm_mode模式,因此您的 Pydantic 模型中声明的数据将从它们中提取并返回给客户端,并进行所有正常的过滤和验证。

Tip

另请注意,response_models应当是标准 Python 类型,例如List[schemas.Item].

但是由于它的内容/参数List是一个 使用orm_mode模式的Pydantic模型,所以数据将被正常检索并返回给客户端,所以没有问题。

关于 def 对比 async def

*在这里,我们在路径操作函数*和依赖项中都使用着 SQLAlchemy 模型,它将与外部数据库进行通信。

这会需要一些“等待时间”。

但是由于 SQLAlchemy 不具有await直接使用的兼容性,因此类似于:

user = await db.query(User).first()

...相反,我们可以使用:

user = db.query(User).first()

然后我们应该声明*路径操作函数*和不带 的依赖关系async def,只需使用普通的def,如下:

@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    ...

Info

如果您需要异步连接到关系数据库,请参阅Async SQL (Relational) Databases

"Very Technical Details"

如果您很好奇并且拥有深厚的技术知识,您可以在Async文档中查看有关如何处理 async defdef差别的技术细节。

迁移

因为我们直接使用 SQLAlchemy,并且我们不需要任何类型的插件来使用**FastAPI**,所以我们可以直接将数据库迁移至Alembic进行集成。

由于与 SQLAlchemy 和 SQLAlchemy 模型相关的代码位于单独的独立文件中,您甚至可以使用 Alembic 执行迁移,而无需安装 FastAPI、Pydantic 或其他任何东西。

同样,您将能够在与**FastAPI**无关的代码的其他部分中使用相同的 SQLAlchemy 模型和实用程序。

例如,在具有CeleryRQARQ的后台任务工作者中。

审查所有文件

最后回顾整个案例,您应该有一个名为的目录my_super_project,其中包含一个名为sql_app

sql_app中应该有以下文件:

  • sql_app/__init__.py:这是一个空文件。

  • sql_app/database.py

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()
  • sql_app/models.py:
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from .database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")
  • sql_app/schemas.py:
from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: str | None = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import List, Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True
  • sql_app/crud.py:
from sqlalchemy.orm import Session

from . import models, schemas


def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.User).offset(skip).limit(limit).all()


def create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


def get_items(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.Item).offset(skip).limit(limit).all()


def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item
  • sql_app/main.py:
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

执行项目

您可以复制这些代码并按原样使用它。

Info

事实上,这里的代码只是大多数测试代码的一部分。

你可以用 Uvicorn 运行它:

$ uvicorn sql_app.main:app --reload

<span style="color: green;">INFO</span>:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)

打开浏览器进入 http://127.0.0.1:8000/docs。

您将能够与您的**FastAPI**应用程序交互,从真实数据库中读取数据:

直接与数据库交互

如果您想独立于 FastAPI 直接浏览 SQLite 数据库(文件)以调试其内容、添加表、列、记录、修改数据等,您可以使用SQLite 的 DB Browser

它看起来像这样:

您还可以使用SQLite ViewerExtendsClass等在线 SQLite 浏览器。

中间件替代数据库会话

如果你不能使用带有yield的依赖项——例如,如果你没有使用**Python 3.7**并且不能安装上面提到的**Python 3.6**的“backports” ——你可以使用类似的方法在“中间件”中设置会话。

“中间件”基本上是一个对每个请求都执行的函数,其中一些代码在端点函数之前执行,另一些代码在端点函数之后执行。

创建中间件

我们要添加的中间件(只是一个函数)将为每个请求创建一个新的 SQLAlchemySessionLocal,将其添加到请求中,然后在请求完成后关闭它。

from fastapi import Depends, FastAPI, HTTPException, Request, Response
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
    response = Response("Internal server error", status_code=500)
    try:
        request.state.db = SessionLocal()
        response = await call_next(request)
    finally:
        request.state.db.close()
    return response


# Dependency
def get_db(request: Request):
    return request.state.db


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from typing import List

from fastapi import Depends, FastAPI, HTTPException, Request, Response
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
    response = Response("Internal server error", status_code=500)
    try:
        request.state.db = SessionLocal()
        response = await call_next(request)
    finally:
        request.state.db.close()
    return response


# Dependency
def get_db(request: Request):
    return request.state.db


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

Info

我们将SessionLocal()请求的创建和处理放在一个try块中。

然后我们在finally块中关闭它。

通过这种方式,我们确保数据库会话在请求后始终关闭,即使在处理请求时出现异常也会关闭。

关于request.state

request.state是每个Request对象的属性。它用于存储附加到请求本身的任意对象,例如本例中的数据库会话。您可以在Starlette 的关于Requeststate的文档中了解更多信息。

对于这种情况下,它帮助我们确保在整个请求中使用单个数据库会话,然后关闭(在中间件中)。

使用yield依赖项与使用中间件的区别

在此处添加**中间件**与yield的依赖项的作用效果类似,但也有一些区别:

  • 中间件需要更多的代码并且更复杂一些。
  • 中间件必须是一个async函数。
    • 如果其中有代码必须“等待”网络,它可能会在那里“阻止”您的应用程序并稍微降低性能。
    • 尽管这里的SQLAlchemy工作方式可能不是很成问题。
    • 但是,如果您向等待大量I/O的中间件添加更多代码,则可能会出现问题。
  • *每个*请求都会运行一个中间件。
    • 将为每个请求创建一个连接。
    • 即使处理该请求的*路径操作*不需要数据库。

Tip

最好使用带有yield的依赖项,如果这足够满足用例需求

Info

带有yield的依赖项是最近刚加入**FastAPI**中的。

所以本教程的先前版本只有带有中间件的示例,并且可能有多个应用程序使用中间件进行数据库会话管理。