SQL (Relational) Databases with Peewee

Warning

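If you are just starting, the tutorial SQL (Relational) Databases that uses SQLAlchemy should be enough.

Feel free to skip this.

If you are starting a project from scratch, you are probably better off with the SQLAlchemy ORM (SQL (Relational) Databases), or any other async ORM.

If you already have a code base that uses the Peewee ORM, you can check here how to use it with FastAPI.

Python 3.7+ required

You will need Python 3.7 or above to safely use Peewee with FastAPI.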

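Peewee for async

Peewee was not designed for async frameworks, or with them in mind.

Peewee makes some heavy assumptions about its defaults and about how it should be used.

If you are developing an application with an older non-async framework, and can work with all its defaults, it can be a great tool.

But if you need to change some of the defaults, support more than one predefined database, work with an async framework (like FastAPI), etc., you will need to add quite a bit of complex extra code to override those defaults.

Nevertheless, it's possible to do it, and here you'll see exactly what code you have to add to be able to use Peewee with FastAPI.

Technical Details

You can read more about Peewee's stance on async in Python in the docs, an issue, and a PR.

The same app

We are going to create the same application as in the SQLAlchemy tutorial (SQL (Relational) Databases).

Most of the code is actually the same.

So, we are going to focus only on the differences.

File structure

Let's say you have a directory named my_super_project that contains a sub-directory called sql_app with a structure like this: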

.
└── sql_app
    ├── __init__.py
    ├── crud.py
    ├── database.py
    ├── main.py
    ├── models.py
    └── schemas.py

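This is almost the same structure as we had for the SQLAlchemy tutorial.

Now let's see what each file/module does.

Create the Peewee parts

Let's refer to the file sql_app/database.py.

The standard Peewee code

Let's first check all the normal Peewee code and create a Peewee database: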

from contextvars import ContextVar

import peewee

DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())


class PeeweeConnectionState(peewee._ConnectionState):
    def __init__(self, **kwargs):
        super().__setattr__("_state", db_state)
        super().__init__(**kwargs)

    def __setattr__(self, name, value):
        self._state.get()[name] = value

    def __getattr__(self, name):
        return self._state.get()[name]


db = peewee.SqliteDatabase(DATABASE_NAME, check_same_thread=False)

db._state = PeeweeConnectionState()

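Tip

Keep in mind that if you wanted to use a different database, like PostgreSQL, you couldn't just change the string. You would need to use a different Peewee database class.

Note

The argument:

check_same_thread=False

is equivalent to the one in the SQLAlchemy tutorial:

connect_args={"check_same_thread": False}

...it is needed only for SQLite.

Technical Details

Exactly the same technical details as in SQL (Relational) Databases apply.

Make Peewee async-compatible: PeeweeConnectionState

The main issue with Peewee and FastAPI is that Peewee relies heavily on Python's threading.local, and it doesn't have a direct way to override it or let you handle connections/sessions directly (as is done in the SQLAlchemy tutorial).

And threading.local is not compatible with the new async features of modern Python.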

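Technical Details

threading.local is used to have a "magic" variable that has a different value for each thread.

This was useful in older frameworks designed to have one single thread per request, no more, no less.

Using this, each request would have its own database connection/session, which is the actual final goal.

But FastAPI, using the new async features, could handle more than one request on the same thread. And at the same time, for a single request, it could run multiple things in different threads (in a threadpool), depending on whether you use async def or normal def. This is what gives FastAPI all its performance improvements.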
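To make the "magic" variable concrete, here is a minimal standard-library sketch (no Peewee involved). The names local_state, seen, and worker are made up for this illustration. It shows two threads storing a value under the same attribute name without seeing each other's value:

```python
import threading

# A thread-local object: each thread sees its own independent attributes.
local_state = threading.local()
seen = {}  # values read back by each worker thread (illustrative helper)


def worker(name: str) -> None:
    # Each thread stores a value under the same attribute name...
    local_state.conn = f"connection-for-{name}"
    # ...and reads back only the value it stored itself.
    seen[name] = local_state.conn


threads = [threading.Thread(target=worker, args=(n,)) for n in ("a", "b")]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The main thread never set the attribute, so it does not exist here.
main_has_conn = hasattr(local_state, "conn")
```

With one thread per request this isolation is exactly what you want. It stops helping as soon as several requests share a thread.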

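But Python 3.7 and above provide a more advanced alternative to threading.local that can be used in the same places where threading.local would be used, but is compatible with the new async features.

We are going to use that. It's called contextvars.

We are going to override the internal parts of Peewee that use threading.local and replace them with contextvars, with the corresponding updates.

This might seem a bit complex (and it actually is), but you don't really need to completely understand how it works to use it.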
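As a rough illustration of why contextvars fits async code, the sketch below runs two coroutines concurrently on the same thread. The variable request_id and the function handle are hypothetical names used only here. Each asyncio task gets its own copy of the context, so a value set in one task is not visible in the other:

```python
import asyncio
from contextvars import ContextVar

# Hypothetical variable for illustration; not part of the tutorial code.
request_id = ContextVar("request_id", default="unset")


async def handle(name: str) -> str:
    # Each task runs in its own copy of the context, so this set()
    # is invisible to the other task even though both share one thread.
    request_id.set(name)
    await asyncio.sleep(0)  # yield control so the two tasks interleave
    return request_id.get()


async def main() -> list:
    return await asyncio.gather(handle("first"), handle("second"))


results = asyncio.run(main())
```

Each task reads back its own value, and the module-level context still sees the default.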

We will create a PeeweeConnectionState:

from contextvars import ContextVar

import peewee

DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())


class PeeweeConnectionState(peewee._ConnectionState):
    def __init__(self, **kwargs):
        super().__setattr__("_state", db_state)
        super().__init__(**kwargs)

    def __setattr__(self, name, value):
        self._state.get()[name] = value

    def __getattr__(self, name):
        return self._state.get()[name]


db = peewee.SqliteDatabase(DATABASE_NAME, check_same_thread=False)

db._state = PeeweeConnectionState()

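This class inherits from a special internal class used by Peewee.

It has all the logic to make Peewee use contextvars instead of threading.local.

contextvars works a bit differently than threading.local. But the rest of Peewee's internal code assumes that this class works with threading.local.

So, we need to do some extra tricks to make it work as if it was just using threading.local. The __init__, __setattr__, and __getattr__ methods implement all the required tricks so that Peewee can use this class without knowing that it is now compatible with FastAPI.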
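The attribute tricks can be tried in isolation. The following sketch reproduces the same pattern on a plain class, without Peewee. ContextState, state_var, and state_default are hypothetical names that only mirror the shape of the tutorial code:

```python
from contextvars import ContextVar, copy_context

# Hypothetical names; they mirror the shape of the tutorial code without Peewee.
state_default = {"closed": None, "conn": None}
state_var = ContextVar("state_var", default=state_default.copy())


class ContextState:
    def __init__(self):
        # Bypass our own __setattr__ so "_state" becomes a real instance attribute.
        super().__setattr__("_state", state_var)

    def __setattr__(self, name, value):
        # Every other attribute write lands in the dict held by the ContextVar.
        self._state.get()[name] = value

    def __getattr__(self, name):
        # Called only when normal lookup fails, so "_state" itself is unaffected.
        return self._state.get()[name]


state = ContextState()


def in_fresh_context():
    # Give this context its own dict, then write through the attribute API.
    state_var.set(state_default.copy())
    state.conn = "connection A"
    return state.conn


inner_value = copy_context().run(in_fresh_context)
outer_value = state.conn  # the outer context still sees its own dict
```

Code that uses state.conn looks like ordinary attribute access, while the value actually stored depends on the current context.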

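Tip

This will just make Peewee behave correctly when used with FastAPI. It will not randomly open or close connections that are being used, create errors, etc.

But it doesn't give Peewee async super-powers. You should still use normal def functions and not async def.

Use the custom PeeweeConnectionState class

Now, overwrite the ._state internal attribute in the Peewee database db object using the new PeeweeConnectionState: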

from contextvars import ContextVar

import peewee

DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())


class PeeweeConnectionState(peewee._ConnectionState):
    def __init__(self, **kwargs):
        super().__setattr__("_state", db_state)
        super().__init__(**kwargs)

    def __setattr__(self, name, value):
        self._state.get()[name] = value

    def __getattr__(self, name):
        return self._state.get()[name]


db = peewee.SqliteDatabase(DATABASE_NAME, check_same_thread=False)

db._state = PeeweeConnectionState()

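Tip

Make sure you overwrite db._state after creating db.

Tip

You would do the same for any other Peewee database, including PostgresqlDatabase, MySQLDatabase, etc.

Create the database models

Let's now see the file sql_app/models.py.

Create Peewee models for our data

Now create the Peewee models (classes) for User and Item.

This is the same as you would do if you followed the Peewee tutorial and updated the models to have the same data as in the SQLAlchemy tutorial.

Tip

Peewee also uses the term "model" to refer to these classes and instances that interact with the database.

But Pydantic also uses the term "model" to refer to something different: the data validation, conversion, and documentation classes and instances.

Import db from database (the file database.py from above) and use it here.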

import peewee

from .database import db


class User(peewee.Model):
    email = peewee.CharField(unique=True, index=True)
    hashed_password = peewee.CharField()
    is_active = peewee.BooleanField(default=True)

    class Meta:
        database = db


class Item(peewee.Model):
    title = peewee.CharField(index=True)
    description = peewee.CharField(index=True)
    owner = peewee.ForeignKeyField(User, backref="items")

    class Meta:
        database = db

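Tip

Peewee creates several magic attributes.

It will automatically add an id attribute as an integer to be the primary key.

It will choose the names of the tables based on the class names.

For the Item, it will create an attribute owner_id with the integer ID of the User. But we don't declare it anywhere.

Create the Pydantic models

Now let's check the file sql_app/schemas.py.

Tip

To avoid confusion between the Peewee models and the Pydantic models, we will have the file models.py with the Peewee models, and the file schemas.py with the Pydantic models.

These Pydantic models define more or less a "schema" (a valid data shape).

So this will help us avoid confusion while using both.

Create the Pydantic models / schemas

Create all the same Pydantic models as in the SQLAlchemy tutorial: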

from typing import Any, List, Union

import peewee
from pydantic import BaseModel
from pydantic.utils import GetterDict


class PeeweeGetterDict(GetterDict):
    def get(self, key: Any, default: Any = None):
        res = getattr(self._obj, key, default)
        if isinstance(res, peewee.ModelSelect):
            return list(res)
        return res


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True
        getter_dict = PeeweeGetterDict


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True
        getter_dict = PeeweeGetterDict

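Tip

Here we are creating the models with an id.

We didn't explicitly specify an id attribute in the Peewee models, but Peewee adds one automatically.

We are also adding the magic owner_id attribute to Item.

Create a PeeweeGetterDict for the Pydantic models / schemas

When you access a relationship in a Peewee object, like in some_user.items, Peewee doesn't provide a list of Item.

It provides a special custom object of class ModelSelect.

It's possible to create a list of its items with list(some_user.items).

But the object itself is not a list. And it's also not an actual Python generator. Because of this, Pydantic doesn't know by default how to convert it to a list of Pydantic models / schemas.

But Pydantic 1.x allows providing a custom class that inherits from pydantic.utils.GetterDict, to provide the functionality used with orm_mode = True to retrieve the values for ORM model attributes.

We are going to create a custom PeeweeGetterDict class and use it in all the same Pydantic models / schemas that use orm_mode: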

from typing import Any, List, Union

import peewee
from pydantic import BaseModel
from pydantic.utils import GetterDict


class PeeweeGetterDict(GetterDict):
    def get(self, key: Any, default: Any = None):
        res = getattr(self._obj, key, default)
        if isinstance(res, peewee.ModelSelect):
            return list(res)
        return res


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True
        getter_dict = PeeweeGetterDict


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True
        getter_dict = PeeweeGetterDict

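Here we are checking if the attribute that is being accessed (e.g. .items in some_user.items) is an instance of peewee.ModelSelect.

And if that's the case, we just return a list built from it.

And then we use it in the Pydantic models / schemas that use orm_mode = True, with the configuration variable getter_dict = PeeweeGetterDict.

Tip

We only need to create one PeeweeGetterDict class, and we can use it in all the Pydantic models / schemas.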
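The conversion logic can be seen without Peewee or Pydantic installed. In this sketch FakeModelSelect and FakeUser are hypothetical stand-ins for a lazy query object and an ORM instance, and get_value plays the role of PeeweeGetterDict.get():

```python
from typing import Any


class FakeModelSelect:
    """Hypothetical stand-in for a lazy query object such as peewee.ModelSelect."""

    def __init__(self, rows):
        self._rows = rows

    def __iter__(self):
        # In a real ORM, iterating is what would actually run the query.
        return iter(self._rows)


class FakeUser:
    """Hypothetical ORM-like object with a lazy relationship attribute."""

    email = "user@example.com"
    items = FakeModelSelect(["item 1", "item 2"])


def get_value(obj: Any, key: str, default: Any = None) -> Any:
    # Same idea as PeeweeGetterDict.get(): read the attribute and
    # turn lazy query objects into a plain list.
    res = getattr(obj, key, default)
    if isinstance(res, FakeModelSelect):
        return list(res)
    return res


user = FakeUser()
raw_is_list = isinstance(user.items, list)
converted = get_value(user, "items")
email = get_value(user, "email")
missing = get_value(user, "nickname", "n/a")
```

Ordinary attributes pass through unchanged. Only the lazy relationship is turned into a list.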

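CRUD utils

Now let's see the file sql_app/crud.py.

Create all the CRUD utils

Create all the same CRUD utils as in the SQLAlchemy tutorial. All the code is very similar: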

from . import models, schemas


def get_user(user_id: int):
    return models.User.filter(models.User.id == user_id).first()


def get_user_by_email(email: str):
    return models.User.filter(models.User.email == email).first()


def get_users(skip: int = 0, limit: int = 100):
    return list(models.User.select().offset(skip).limit(limit))


def create_user(user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db_user.save()
    return db_user


def get_items(skip: int = 0, limit: int = 100):
    return list(models.Item.select().offset(skip).limit(limit))


def create_user_item(item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db_item.save()
    return db_item

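There are some differences from the code for the SQLAlchemy tutorial.

We don't pass a db attribute around. Instead we use the models directly. This is because the db object is a global object that includes all the connection logic. That's why we had to do all the contextvars updates above.

Also, when returning several objects, like in get_users, we directly call list, like in:

list(models.User.select())

This is for the same reason that we had to create a custom PeeweeGetterDict. But by returning something that is already a list instead of the peewee.ModelSelect, the response_model in the path operation with List[schemas.User] (that we'll see later) will work correctly.

Main FastAPI app

And now in the file sql_app/main.py let's integrate and use all the other parts we created before.

Create the database tables

In a very simplistic way, create the database tables: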

import time
from typing import List

from fastapi import Depends, FastAPI, HTTPException

from . import crud, database, models, schemas
from .database import db_state_default

database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()

app = FastAPI()

sleep_time = 10


async def reset_db_state():
    database.db._state._state.set(db_state_default.copy())
    database.db._state.reset()


def get_db(db_state=Depends(reset_db_state)):
    try:
        database.db.connect()
        yield
    finally:
        if not database.db.is_closed():
            database.db.close()


@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
    db_user = crud.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(user=user)


@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
    users = crud.get_users(skip=skip, limit=limit)
    return users


@app.get(
    "/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
    db_user = crud.get_user(user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post(
    "/users/{user_id}/items/",
    response_model=schemas.Item,
    dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
    return crud.create_user_item(item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
    items = crud.get_items(skip=skip, limit=limit)
    return items


@app.get(
    "/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
    global sleep_time
    sleep_time = max(0, sleep_time - 1)
    time.sleep(sleep_time)  # Fake long processing request
    users = crud.get_users(skip=skip, limit=limit)
    return users

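Create a dependency

Create a dependency that will connect the database right at the beginning of a request and disconnect it at the end: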

import time
from typing import List

from fastapi import Depends, FastAPI, HTTPException

from . import crud, database, models, schemas
from .database import db_state_default

database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()

app = FastAPI()

sleep_time = 10


async def reset_db_state():
    database.db._state._state.set(db_state_default.copy())
    database.db._state.reset()


def get_db(db_state=Depends(reset_db_state)):
    try:
        database.db.connect()
        yield
    finally:
        if not database.db.is_closed():
            database.db.close()


@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
    db_user = crud.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(user=user)


@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
    users = crud.get_users(skip=skip, limit=limit)
    return users


@app.get(
    "/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
    db_user = crud.get_user(user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post(
    "/users/{user_id}/items/",
    response_model=schemas.Item,
    dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
    return crud.create_user_item(item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
    items = crud.get_items(skip=skip, limit=limit)
    return items


@app.get(
    "/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
    global sleep_time
    sleep_time = max(0, sleep_time - 1)
    time.sleep(sleep_time)  # Fake long processing request
    users = crud.get_users(skip=skip, limit=limit)
    return users

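Here we have an empty yield because we are actually not using the database object directly.

It connects to the database and stores the connection data in an internal variable that is independent for each request (using the contextvars tricks from above).

Because the database connection is potentially I/O blocking, this dependency is created with a normal def function.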
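The connect / yield / close shape can be exercised on its own. The sketch below uses a hypothetical FakeDB class instead of Peewee and drives the generator with contextlib.contextmanager, which is roughly how a framework could run a dependency with yield. The events list is only there to record the order of operations:

```python
from contextlib import contextmanager

events = []  # records the order of operations (illustrative only)


class FakeDB:
    """Hypothetical stand-in for the Peewee database object."""

    def __init__(self):
        self._closed = True

    def connect(self):
        self._closed = False
        events.append("connect")

    def is_closed(self):
        return self._closed

    def close(self):
        self._closed = True
        events.append("close")


fake_db = FakeDB()


def get_db():
    # Same shape as the dependency in main.py: set up, yield nothing, clean up.
    try:
        fake_db.connect()
        yield
    finally:
        if not fake_db.is_closed():
            fake_db.close()


# contextmanager drives the generator: run to the yield on entry, resume on exit.
with contextmanager(get_db)():
    events.append("handle request")

# The finally block also runs when the code using the dependency fails.
try:
    with contextmanager(get_db)():
        raise RuntimeError("handler failed")
except RuntimeError:
    events.append("error propagated")
```

The try/finally around the yield is what guarantees that the connection is closed even when the request handler raises.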

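And then, in each path operation function that needs to access the database, we add it as a dependency.

But we are not using the value given by this dependency (it actually doesn't give any value, as it has an empty yield). So, we don't add it to the path operation function but to the path operation decorator in the dependencies parameter: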

import time
from typing import List

from fastapi import Depends, FastAPI, HTTPException

from . import crud, database, models, schemas
from .database import db_state_default

database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()

app = FastAPI()

sleep_time = 10


async def reset_db_state():
    database.db._state._state.set(db_state_default.copy())
    database.db._state.reset()


def get_db(db_state=Depends(reset_db_state)):
    try:
        database.db.connect()
        yield
    finally:
        if not database.db.is_closed():
            database.db.close()


@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
    db_user = crud.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(user=user)


@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
    users = crud.get_users(skip=skip, limit=limit)
    return users


@app.get(
    "/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
    db_user = crud.get_user(user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post(
    "/users/{user_id}/items/",
    response_model=schemas.Item,
    dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
    return crud.create_user_item(item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
    items = crud.get_items(skip=skip, limit=limit)
    return items


@app.get(
    "/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
    global sleep_time
    sleep_time = max(0, sleep_time - 1)
    time.sleep(sleep_time)  # Fake long processing request
    users = crud.get_users(skip=skip, limit=limit)
    return users

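Context variable sub-dependency

For all the contextvars parts to work, we need to make sure we have an independent value in the ContextVar for each request that uses the database, and that value will be used as the database state (connection, transactions, etc.) for the whole request.

For that, we need to create another async dependency reset_db_state() that is used as a sub-dependency in get_db(). It will set the value for the context variable (with just a default dict) that will be used as the database state for the whole request. And then the dependency get_db() will store the database state (connection, transactions, etc.) in it.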

import time
from typing import List

from fastapi import Depends, FastAPI, HTTPException

from . import crud, database, models, schemas
from .database import db_state_default

database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()

app = FastAPI()

sleep_time = 10


async def reset_db_state():
    database.db._state._state.set(db_state_default.copy())
    database.db._state.reset()


def get_db(db_state=Depends(reset_db_state)):
    try:
        database.db.connect()
        yield
    finally:
        if not database.db.is_closed():
            database.db.close()


@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
    db_user = crud.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(user=user)


@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
    users = crud.get_users(skip=skip, limit=limit)
    return users


@app.get(
    "/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
    db_user = crud.get_user(user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post(
    "/users/{user_id}/items/",
    response_model=schemas.Item,
    dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
    return crud.create_user_item(item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
    items = crud.get_items(skip=skip, limit=limit)
    return items


@app.get(
    "/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
    global sleep_time
    sleep_time = max(0, sleep_time - 1)
    time.sleep(sleep_time)  # Fake long processing request
    users = crud.get_users(skip=skip, limit=limit)
    return users

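For the next request, as we will reset that context variable again in the async dependency reset_db_state() and then create a new connection in the get_db() dependency, that new request will have its own database state (connection, transactions, etc.).

Tip

As FastAPI is an async framework, one request could start being processed, and before finishing, another request could be received and start processing as well, and it all could be processed in the same thread.

But context variables are aware of these async features, so a Peewee database state set in the async dependency reset_db_state() will keep its own data throughout the entire request.

And at the same time, the other concurrent request will have its own database state that will be independent for the whole request.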
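To see why the reset step matters, consider this standard-library sketch. The names request and run_pair are hypothetical, and db_state only imitates the variable from database.py. The default of a ContextVar is a single dict object, so two concurrent "requests" that never call set() end up writing into the same dict:

```python
import asyncio
from contextvars import ContextVar

state_default = {"conn": None}
# The default is one dict object, shared by every context that never calls set().
db_state = ContextVar("db_state", default=state_default.copy())


async def request(name: str, reset: bool) -> str:
    if reset:
        # Same idea as reset_db_state(): a fresh dict for this request.
        db_state.set(state_default.copy())
    db_state.get()["conn"] = f"conn-{name}"
    await asyncio.sleep(0)  # let the other request run in between
    return db_state.get()["conn"]


async def run_pair(reset: bool) -> list:
    return await asyncio.gather(request("a", reset), request("b", reset))


without_reset = asyncio.run(run_pair(reset=False))
with_reset = asyncio.run(run_pair(reset=True))
```

Without the reset, both requests read the value written last. With it, each request keeps its own state.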

Peewee Proxy

If you are using a Peewee Proxy, the actual database is at db.obj.

So, you would reset it with:

async def reset_db_state():
    database.db.obj._state._state.set(db_state_default.copy())
    database.db.obj._state.reset()

Create your FastAPI path operations

Now, finally, here's the standard FastAPI path operations code.

import time
from typing import List

from fastapi import Depends, FastAPI, HTTPException

from . import crud, database, models, schemas
from .database import db_state_default

database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()

app = FastAPI()

sleep_time = 10


async def reset_db_state():
    database.db._state._state.set(db_state_default.copy())
    database.db._state.reset()


def get_db(db_state=Depends(reset_db_state)):
    try:
        database.db.connect()
        yield
    finally:
        if not database.db.is_closed():
            database.db.close()


@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
    db_user = crud.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(user=user)


@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
    users = crud.get_users(skip=skip, limit=limit)
    return users


@app.get(
    "/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
    db_user = crud.get_user(user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post(
    "/users/{user_id}/items/",
    response_model=schemas.Item,
    dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
    return crud.create_user_item(item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
    items = crud.get_items(skip=skip, limit=limit)
    return items


@app.get(
    "/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
    global sleep_time
    sleep_time = max(0, sleep_time - 1)
    time.sleep(sleep_time)  # Fake long processing request
    users = crud.get_users(skip=skip, limit=limit)
    return users

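About def vs async def

The same as with SQLAlchemy, we are not doing something like:

user = await models.User.select().first()

...but instead we are using:

user = models.User.select().first()

So, again, we should declare the path operation functions and the dependency without async def, just with a normal def, as: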

# The path operation decorator goes here
def read_users(skip: int = 0, limit: int = 100):
    ...  # The function body goes here

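Testing Peewee with async

This example includes an extra path operation that simulates a long processing request with time.sleep(sleep_time).

It will have the database connection open at the beginning and will just wait some seconds before replying back. And each new request will wait one second less.

This will easily let you test that your app with Peewee and FastAPI is behaving correctly with all the stuff about threads.

If you want to check how Peewee would break your app if used without modification, go to the sql_app/database.py file and comment out the line:

# db._state = PeeweeConnectionState()

And in the file sql_app/main.py, comment out the body of the async dependency reset_db_state() and replace it with a pass: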

async def reset_db_state():
#     database.db._state._state.set(db_state_default.copy())
#     database.db._state.reset()
    pass

Then run your app with Uvicorn:

$ uvicorn sql_app.main:app --reload

INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)

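Open your browser at http://127.0.0.1:8000/docs and create a couple of users.

Then open 10 tabs at http://127.0.0.1:8000/docs#/default/read_slow_users_slowusers__get at the same time.

Go to the path operation "Get /slowusers/" in all of the tabs. Use the "Try it out" button and execute the request in each tab, one right after the other.

The tabs will wait for a bit and then some of them will show Internal Server Error.

What happens

The first tab will make your app create a connection to the database and wait for some seconds before replying back and closing the database connection.

Then, for the request in the next tab, your app will wait for one second less, and so on.

This means that it will end up finishing some of the last tabs' requests earlier than some of the previous ones.

Then one of the last requests that waits fewer seconds will try to open a database connection, but as one of those previous requests for the other tabs will probably be handled in the same thread as the first one, it will have the same database connection that is already open, and Peewee will throw an error. You will see it in the terminal, and the response will have an Internal Server Error.

This will probably happen for more than one of those tabs.

If you had multiple clients talking to your app at exactly the same time, this is what could happen.

And as your app starts to handle more and more clients at the same time, the waiting time in a single request needs to be shorter and shorter to trigger the error.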
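The failure can be imitated without a real database. In this sketch ThreadLocalDB is a hypothetical stand-in that keeps its connection in threading.local and refuses to open a second one. The error message is illustrative, not copied from Peewee. A pool with a single worker makes the thread reuse deterministic:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ThreadLocalDB:
    """Hypothetical stand-in that keeps connection state in threading.local."""

    def __init__(self):
        self._local = threading.local()

    def connect(self):
        if getattr(self._local, "conn", None) is not None:
            raise RuntimeError("Connection already opened.")
        self._local.conn = object()

    def close(self):
        self._local.conn = None


db = ThreadLocalDB()

# One worker thread makes the thread reuse deterministic for this sketch.
with ThreadPoolExecutor(max_workers=1) as pool:
    # Request 1 opens its connection and is still "sleeping" (not closed yet).
    pool.submit(db.connect).result()
    # Request 2 is handled by the same thread and finds request 1's connection.
    try:
        pool.submit(db.connect).result()
        outcome = "ok"
    except RuntimeError as exc:
        outcome = str(exc)
```

The second request fails only because it was handled by a thread that still holds another request's state.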

Fix Peewee with FastAPI

Now go back to the file sql_app/database.py, and uncomment the line:

db._state = PeeweeConnectionState()

And in the file sql_app/main.py, uncomment the body of the async dependency reset_db_state():

async def reset_db_state():
    database.db._state._state.set(db_state_default.copy())
    database.db._state.reset()

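Terminate your running app and start it again.

Repeat the same process with the 10 tabs. This time all of them will wait and you will get all the results without errors.

...You fixed it!

Review all the files

Remember you should have a directory named my_super_project (or however you want) that contains a sub-directory called sql_app.

sql_app should have the following files:

  • sql_app/__init__.py: an empty file.

  • sql_app/database.py: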

from contextvars import ContextVar

import peewee

DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())


class PeeweeConnectionState(peewee._ConnectionState):
    def __init__(self, **kwargs):
        super().__setattr__("_state", db_state)
        super().__init__(**kwargs)

    def __setattr__(self, name, value):
        self._state.get()[name] = value

    def __getattr__(self, name):
        return self._state.get()[name]


db = peewee.SqliteDatabase(DATABASE_NAME, check_same_thread=False)

db._state = PeeweeConnectionState()

  • sql_app/models.py:

import peewee

from .database import db


class User(peewee.Model):
    email = peewee.CharField(unique=True, index=True)
    hashed_password = peewee.CharField()
    is_active = peewee.BooleanField(default=True)

    class Meta:
        database = db


class Item(peewee.Model):
    title = peewee.CharField(index=True)
    description = peewee.CharField(index=True)
    owner = peewee.ForeignKeyField(User, backref="items")

    class Meta:
        database = db

  • sql_app/schemas.py:

from typing import Any, List, Union

import peewee
from pydantic import BaseModel
from pydantic.utils import GetterDict


class PeeweeGetterDict(GetterDict):
    def get(self, key: Any, default: Any = None):
        res = getattr(self._obj, key, default)
        if isinstance(res, peewee.ModelSelect):
            return list(res)
        return res


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True
        getter_dict = PeeweeGetterDict


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True
        getter_dict = PeeweeGetterDict

  • sql_app/crud.py:

from . import models, schemas


def get_user(user_id: int):
    return models.User.filter(models.User.id == user_id).first()


def get_user_by_email(email: str):
    return models.User.filter(models.User.email == email).first()


def get_users(skip: int = 0, limit: int = 100):
    return list(models.User.select().offset(skip).limit(limit))


def create_user(user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db_user.save()
    return db_user


def get_items(skip: int = 0, limit: int = 100):
    return list(models.Item.select().offset(skip).limit(limit))


def create_user_item(item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db_item.save()
    return db_item

  • sql_app/main.py:

import time
from typing import List

from fastapi import Depends, FastAPI, HTTPException

from . import crud, database, models, schemas
from .database import db_state_default

database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()

app = FastAPI()

sleep_time = 10


async def reset_db_state():
    database.db._state._state.set(db_state_default.copy())
    database.db._state.reset()


def get_db(db_state=Depends(reset_db_state)):
    try:
        database.db.connect()
        yield
    finally:
        if not database.db.is_closed():
            database.db.close()


@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
    db_user = crud.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(user=user)


@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
    users = crud.get_users(skip=skip, limit=limit)
    return users


@app.get(
    "/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
    db_user = crud.get_user(user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post(
    "/users/{user_id}/items/",
    response_model=schemas.Item,
    dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
    return crud.create_user_item(item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
    items = crud.get_items(skip=skip, limit=limit)
    return items


@app.get(
    "/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
    global sleep_time
    sleep_time = max(0, sleep_time - 1)
    time.sleep(sleep_time)  # Fake long processing request
    users = crud.get_users(skip=skip, limit=limit)
    return users

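Technical Details

Warning

These are very technical details that you probably don't need.

The problem

Peewee uses threading.local by default to store its database "state" data (connection, transactions, etc.).

threading.local creates a value exclusive to the current thread, but an async framework would run all the code (e.g. for each request) in the same thread, and possibly not in order.

On top of that, an async framework could run some sync code in a threadpool (using asyncio.run_in_executor), but belonging to the same request.

This means that, with Peewee's current implementation, multiple tasks could be using the same threading.local variable and end up sharing the same connection and data (which they shouldn't). At the same time, if they execute sync I/O-blocking code in a threadpool (as with normal def functions in FastAPI, in path operations and dependencies), that code won't have access to the database state variables, even though it's part of the same request and should be able to access the same database state.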
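Both halves of the problem show up in a few lines of standard-library code. The names handle and read_in_worker_thread are made up for this sketch. Two coroutines on the event loop thread overwrite each other's thread-local value, while code sent to a worker thread sees no value at all:

```python
import asyncio
import threading

local_state = threading.local()


async def handle(name: str) -> str:
    # Both coroutines run on the event loop's single thread,
    # so they read and write the very same thread-local attribute.
    local_state.conn = f"conn-{name}"
    await asyncio.sleep(0)  # the other "request" runs here
    return local_state.conn


def read_in_worker_thread():
    # Runs in a pool thread, which has its own (empty) thread-local storage.
    return getattr(local_state, "conn", None)


async def main():
    shared = await asyncio.gather(handle("a"), handle("b"))
    loop = asyncio.get_running_loop()
    from_thread = await loop.run_in_executor(None, read_in_worker_thread)
    return shared, from_thread


shared, from_thread = asyncio.run(main())
```

The first result shows state leaking between concurrent requests. The second shows state missing inside the threadpool.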

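Context variables

Python 3.7 has contextvars, which can create a local variable very similar to threading.local, but that also supports these async features.

There are several things to keep in mind.

The ContextVar has to be created at the top of the module, like: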

some_var = ContextVar("some_var", default="default value")

To set a value used in the current "context" (e.g. for the current request) use:

some_var.set("new value")

To get a value anywhere inside of the context (e.g. in any part handling the current request) use:

some_var.get()
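Put together, the three calls behave as in the following sketch. The function handle_request is a made-up name, and copy_context().run() stands in for whatever creates a separate context per request:

```python
from contextvars import ContextVar, copy_context

some_var = ContextVar("some_var", default="default value")

before = some_var.get()  # no set() yet, so the default is returned


def handle_request():
    some_var.set("new value")  # visible only inside this context
    return some_var.get()


ctx = copy_context()
inside = ctx.run(handle_request)  # run the function in a separate context
after = some_var.get()  # the outer context is untouched
stored = ctx[some_var]  # the copied context kept its own value
```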

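Set context variables in the async dependency reset_db_state()

If some part of the async code sets the value with some_var.set("updated in function") (e.g. in the async dependency), the rest of the code in it and the code that goes after (including code inside of async functions called with await) will see that new value.

So, in our case, if we set the Peewee state variable (with a default dict) in the async dependency, all the rest of the internal code in our app will see this value and will be able to reuse it for the whole request.

And the context variable would be set again for the next request, even if they are concurrent.

Set database state in the dependency get_db()

As get_db() is a normal def function, FastAPI will make it run in a threadpool, with a copy of the "context", holding the same value for the context variable (the dict with the reset database state). Then it can add database state to that dict, like the connection, etc.

But if the value of the context variable (the default dict) was set in that normal def function, it would create a new value that would stay only in that thread of the threadpool, and the rest of the code (like the path operation functions) wouldn't have access to it. In get_db() we can only set values in the dict, but not the entire dict itself.

So, we need to have the async dependency reset_db_state() set the dict in the context variable. That way, all the code has access to the same dict for the database state for a single request.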
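The difference between changing the dict and replacing it can be checked directly. In this sketch the helper run_in_thread_with_context_copy is a hypothetical name that only approximates what a framework does for a normal def function, namely running it in another thread inside a copy of the current context:

```python
import threading
from contextvars import ContextVar, copy_context

db_state = ContextVar("db_state", default=None)


def run_in_thread_with_context_copy(func):
    # Approximation of running a normal def function in a threadpool:
    # another thread, inside a *copy* of the current context.
    ctx = copy_context()
    thread = threading.Thread(target=ctx.run, args=(func,))
    thread.start()
    thread.join()


def mutate_dict():
    # The copy holds a reference to the same dict, so this change is shared.
    db_state.get()["conn"] = "opened in thread"


def replace_dict():
    # set() only affects the copied context; the caller never sees this dict.
    db_state.set({"conn": "lost"})


db_state.set({"conn": None})  # the role played by reset_db_state()
run_in_thread_with_context_copy(mutate_dict)
after_mutation = db_state.get()["conn"]

run_in_thread_with_context_copy(replace_dict)
after_replace = db_state.get()["conn"]
```

The value written into the existing dict survives, while the dict installed with set() inside the thread is never seen by the caller.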

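Connect and disconnect in the dependency get_db()

Then the next question would be: why not just connect and disconnect the database in the async dependency itself, instead of in get_db()?

The async dependency has to be async for the context variable to be preserved for the rest of the request, but creating and closing the database connection is potentially blocking, so it could degrade performance if it was there.

So we also need the normal def dependency get_db().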