You can use OAuth2 scopes directly with FastAPI; they are integrated to work seamlessly.
This lets you build a more fine-grained permission system that follows the OAuth2 standard and is integrated into your OpenAPI application (and the API docs).
OAuth2 with scopes is the mechanism used by many large authentication providers, such as Facebook, Google, GitHub, Microsoft, Twitter, etc. They use it to grant specific permissions to users and applications.
Every time you "log in with" Facebook, Google, GitHub, Microsoft, or Twitter, that application is using OAuth2 with scopes.
In this section you will see how to manage authentication and authorization with the same OAuth2 with scopes in your FastAPI application.
Warning
This is a more or less advanced section. If you are just starting, you can skip it.
You don't necessarily need OAuth2 scopes, and you can handle authentication and authorization however you want.
But OAuth2 with scopes can be nicely integrated into your API (with OpenAPI) and your API docs.
Nevertheless, you still enforce those scopes, or any other security/authorization requirement, however you need, in your code.
In many cases, OAuth2 with scopes can be overkill.
But if you know you need it, or you are curious, keep reading.
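As a rough illustration of what enforcing scopes in your own code means, the sketch below checks that every scope an endpoint requires is present among the scopes granted to a token. The helper name `missing_scopes` and the sample scope lists are hypothetical, chosen only for this example; they are not part of FastAPI.

```python
def missing_scopes(required: list[str], granted: list[str]) -> list[str]:
    """Return the required scopes that the token does not carry, in order."""
    granted_set = set(granted)
    return [scope for scope in required if scope not in granted_set]


# Hypothetical token that was granted only the "me" scope.
token_scopes = ["me"]

# An endpoint that needs only "me" has nothing missing ...
print(missing_scopes(["me"], token_scopes))  # []
# ... while one that also needs "items" would have to be rejected.
print(missing_scopes(["me", "items"], token_scopes))  # ['items']
```

An empty result means the request may proceed; any non-empty result is the point where your code would refuse the request.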
from datetime import datetime, timedelta, timezone
from typing import Annotated

import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: Annotated[User, Depends(get_current_user)]):
    return {"status": "ok"}
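The way `get_current_user` builds the `WWW-Authenticate` header value in the listing above can be isolated into a small, framework-free sketch. The function name `build_authenticate_value` is hypothetical. The sketch assumes that the scope string is the required scopes joined by single spaces, which is how FastAPI's `SecurityScopes.scope_str` is derived from `SecurityScopes.scopes`.

```python
def build_authenticate_value(required_scopes: list[str]) -> str:
    """Mirror the header value computed at the top of get_current_user."""
    if required_scopes:
        # Assumption: same joining rule as SecurityScopes.scope_str.
        scope_str = " ".join(required_scopes)
        return f'Bearer scope="{scope_str}"'
    return "Bearer"


# No scopes required: a plain Bearer challenge.
print(build_authenticate_value([]))  # Bearer
# Scopes required: the challenge lists them for the client.
print(build_authenticate_value(["me", "items"]))  # Bearer scope="me items"
```

Including the required scopes in the header tells a client which permissions it would need to request in order to call the endpoint.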
The first change is that we are now declaring the OAuth2 security scheme with two available scopes, me and items.
The scopes parameter receives a dict with each scope as a key and its description as the value:
from fastapi.security import OAuth2PasswordBearer

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)
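Because the declared scopes are a plain dict, they can also serve as an allow-list. The sketch below is a hypothetical extra check that the listing above does not perform: it splits a space-separated scope string, the form in which OAuth2 clients send scopes, and separates the declared scopes from the unknown ones. The names `DECLARED_SCOPES` and `split_known_unknown`, and the sample scope `admin`, are assumptions made for illustration.

```python
# Same keys and descriptions as the scopes dict passed to OAuth2PasswordBearer.
DECLARED_SCOPES = {
    "me": "Read information about the current user.",
    "items": "Read items.",
}


def split_known_unknown(
    scope_param: str, declared: dict[str, str]
) -> tuple[list[str], list[str]]:
    """Split a space-separated scope string into declared and undeclared scopes."""
    requested = scope_param.split()
    known = [scope for scope in requested if scope in declared]
    unknown = [scope for scope in requested if scope not in declared]
    return known, unknown


# Hypothetical request asking for two declared scopes and one undeclared scope.
known, unknown = split_known_unknown("me items admin", DECLARED_SCOPES)
print(known)    # ['me', 'items']
print(unknown)  # ['admin']
```

Whether to reject a request that contains unknown scopes or to silently drop them is a design decision for your application.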
Because we are now declaring those scopes, they will show up in the API docs when you log in/authorize.
And you will be able to select which scopes you want to give access to: me and items.
This is the same mechanism used when you give permissions while logging in with Facebook, Google, GitHub, etc.
Now, modify the token path operation to return the requested scopes.
We are still using the same OAuth2PasswordRequestForm. It includes a property scopes with a list of str, containing each scope it received in the request.
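To make the scopes property concrete: in the OAuth2 password flow the client sends a single form field named scope whose value is a space-separated string, and the form class exposes it split into a list. The sketch below imitates that step with only the standard library. The helper parse_token_form is hypothetical and is not part of FastAPI.

```python
from urllib.parse import parse_qs


def parse_token_form(body: str) -> dict:
    """Parse a form-encoded token request body (hypothetical helper)."""
    fields = parse_qs(body)
    # OAuth2 sends all scopes in ONE field, separated by spaces.
    # parse_qs drops blank fields, so a missing scope falls back to "".
    scope = fields.get("scope", [""])[0]
    return {
        "username": fields["username"][0],
        "password": fields["password"][0],
        "scopes": scope.split(),
    }


# "+" is the form encoding of a space, so this requests "me" and "items".
form = parse_token_form("username=johndoe&password=secret&scope=me+items")
print(form["scopes"])
```

A request that sends no scope field simply ends up with an empty list.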
And we return the scopes as part of the JWT token.
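To see what "part of the JWT token" means, here is a minimal sketch that builds and checks an HS256 token by hand with the standard library. The tutorial code relies on PyJWT's jwt.encode and jwt.decode for this. The functions encode_hs256 and decode_hs256 and the demo key are illustrative assumptions, and the sketch skips expiry validation. The payload is signed, not encrypted, so anyone holding the token can read the scopes.

```python
import base64
import hashlib
import hmac
import json


def _b64url(raw: bytes) -> str:
    # JWTs use URL-safe base64 without "=" padding.
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that was stripped when encoding.
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, key: str) -> str:
    digest = hmac.new(key.encode(), signing_input.encode(), hashlib.sha256).digest()
    return _b64url(digest)


def encode_hs256(payload: dict, key: str) -> str:
    """Build header.payload.signature (illustrative, not PyJWT)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{_sign(signing_input, key)}"


def decode_hs256(token: str, key: str) -> dict:
    """Verify the signature, then return the payload claims."""
    signing_input, _, signature = token.rpartition(".")
    expected = _sign(signing_input, key)
    if not hmac.compare_digest(expected.encode(), signature.encode()):
        raise ValueError("invalid signature")
    return json.loads(_b64url_decode(signing_input.split(".")[1]))


DEMO_KEY = "demo-key-not-for-production"  # assumption: a throwaway key
token = encode_hs256({"sub": "johndoe", "scopes": ["me", "items"]}, DEMO_KEY)
claims = decode_hs256(token, DEMO_KEY)
print(claims["scopes"])
```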
Danger
For simplicity, here we are just adding the received scopes directly to the token.
But in your application, for security, you should make sure you only add the scopes that the user is actually able to have, or the ones you have predefined.
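One way to follow that advice is to intersect the requested scopes with the scopes each user may hold before creating the token. This is only a sketch: USER_ALLOWED_SCOPES and grant_scopes are hypothetical names, and a real application would load the permissions from its own user store.

```python
# Hypothetical per-user permissions; not part of the tutorial's fake_users_db.
USER_ALLOWED_SCOPES = {
    "johndoe": {"me", "items"},
    "alice": {"me"},
}


def grant_scopes(username: str, requested: list[str]) -> list[str]:
    """Return only the requested scopes that this user is allowed to have."""
    allowed = USER_ALLOWED_SCOPES.get(username, set())
    # Keep the request order; drop duplicates and anything not allowed.
    granted: list[str] = []
    for scope in requested:
        if scope in allowed and scope not in granted:
            granted.append(scope)
    return granted


print(grant_scopes("alice", ["me", "items", "admin"]))
```

In the token path operation, the result would take the place of form_data.scopes in the data passed to create_access_token. Silently dropping scopes is one possible policy; rejecting the request with an error is another.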
from datetime import datetime, timedelta, timezone
from typing import Annotated

import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: Annotated[User, Depends(get_current_user)]):
    return {"status": "ok"}
🤓 Other versions and variants
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotated,UnionimportjwtfromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:Union[str,None]=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read 
items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:Union[timedelta,None]=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:Annotated[str,Depends(oauth2_scheme)]):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_scopes=payload.get("scopes",[])token_data=TokenData(scopes=token_scopes,username=username)except(InvalidTokenError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Security(get_current_user,scopes=["me"])],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive 
user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scopes":form_data.scopes},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Security(get_current_active_user,scopes=["items"])],):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:Annotated[User,Depends(get_current_user)]):return{"status":"ok"}
Declare scopes in path operations and dependencies
Now we declare that the path operation for /users/me/items/ requires the scope items.
For this, we import and use Security from fastapi.
You can use Security to declare dependencies (just like Depends), but Security also receives a scopes parameter with a list of scopes (strings).
In this case, we pass the dependency function get_current_active_user to Security (the same way we would with Depends).
But we also pass a list of scopes, in this case with just one scope: items (it could have more).
The dependency function get_current_active_user can also declare sub-dependencies, not only with Depends but also with Security. It declares its own sub-dependency function (get_current_user) and adds further scope requirements.
In this case, it requires the scope me (it could require more than one scope).
Note
You don't necessarily need to add different scopes in different places.
We are doing it here to demonstrate how FastAPI handles scopes declared at different levels.
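How scopes declared at different levels add up can be sketched in plain Python, without FastAPI. This is a simplified model and not FastAPI's actual implementation: the Node class, the chain helper and scopes_seen_by_leaf are hypothetical names introduced only for illustration. It assumes that the innermost dependency (get_current_user) ends up seeing every scope declared by the dependants above it.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Node:
    """Hypothetical stand-in for one level of a dependency chain."""

    name: str
    # Scopes this level asks of its sub-dependency, like Security(..., scopes=[...]).
    scopes_for_child: list = field(default_factory=list)
    child: Optional["Node"] = None


def scopes_seen_by_leaf(root: Node) -> list:
    """Walk down the chain and accumulate every scope declared above the leaf."""
    accumulated = []
    node = root
    while node.child is not None:
        accumulated.extend(node.scopes_for_child)
        node = node.child
    return accumulated


def chain(route_name: str, route_scopes: list, via_active_user: bool) -> Node:
    """Build the chain route -> [get_current_active_user] -> get_current_user."""
    leaf = Node("get_current_user")
    if via_active_user:
        # get_current_active_user always asks for "me" in the tutorial example.
        middle = Node("get_current_active_user", ["me"], leaf)
        return Node(route_name, route_scopes, middle)
    return Node(route_name, route_scopes, leaf)


routes = {
    "/users/me/": chain("read_users_me", [], via_active_user=True),
    "/users/me/items/": chain("read_own_items", ["items"], via_active_user=True),
    "/status/": chain("read_system_status", [], via_active_user=False),
}

required = {path: scopes_seen_by_leaf(root) for path, root in routes.items()}
for path, scopes in required.items():
    print(path, scopes)
```

In this model, /users/me/items/ accumulates both items and me, /users/me/ only me, and /status/ none. The order of the accumulated list is an artifact of the sketch and should not be relied on.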
from datetime import datetime, timedelta, timezone
from typing import Annotated

import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: Annotated[User, Depends(get_current_user)]):
    return {"status": "ok"}
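To see what the scope check inside get_current_user amounts to, here is a stripped-down sketch of that loop as plain functions. The names ROUTE_SCOPES, missing_scopes and can_access are hypothetical, and the required scopes per route are written out by hand from the Security declarations in the example rather than computed by FastAPI.

```python
# Scopes each route ends up requiring, written out by hand from the
# Security(...) declarations in the example (an assumption of this sketch).
ROUTE_SCOPES = {
    "/users/me/": ["me"],
    "/users/me/items/": ["items", "me"],
    "/status/": [],
}


def missing_scopes(required: list, granted: list) -> list:
    """Return the required scopes that the token does not carry."""
    return [scope for scope in required if scope not in granted]


def can_access(route: str, token_scopes: list) -> bool:
    """Mirror the loop in get_current_user: any missing scope means rejection."""
    return not missing_scopes(ROUTE_SCOPES[route], token_scopes)


# Stand-in for the "scopes" claim decoded from a JWT.
token_scopes = ["me"]
for route in ROUTE_SCOPES:
    print(route, can_access(route, token_scopes))
```

With only the me scope, the sketch allows /users/me/ and /status/ but rejects /users/me/items/. In the example application, the equivalent rejection is the HTTPException with status 401 and the detail "Not enough permissions".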
🤓 Other versions and variants
Technical Details
Security is actually a subclass of Depends, and it has just one extra parameter that we'll see later.
But by using Security instead of Depends, FastAPI will know that it can declare security scopes, use them internally, and document the API with OpenAPI.
Note that when you import Query, Path, Depends, Security and others from fastapi, those are actually functions that return instances of special classes.
Here's where we use the same OAuth2 scheme we created before, declaring it as a dependency: oauth2_scheme.
Because this dependency function doesn't have any scope requirements itself, we can use Depends with oauth2_scheme; we don't have to use Security when we don't need to specify security scopes.
We also declare a special parameter of type SecurityScopes, imported from fastapi.security.
This SecurityScopes class is similar to Request (Request was used to get the request object directly).
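To make the role of the SecurityScopes parameter more concrete, here is a minimal sketch in plain Python with no FastAPI import. ScopesStandIn is a hypothetical stand-in that mimics only the two attributes the example code reads (scopes and scope_str), and build_authenticate_value is a hypothetical helper that mirrors how get_current_user in the example builds the WWW-Authenticate header value.

```python
from dataclasses import dataclass, field


@dataclass
class ScopesStandIn:
    """Hypothetical stand-in for SecurityScopes (not the FastAPI class)."""

    scopes: list[str] = field(default_factory=list)

    @property
    def scope_str(self) -> str:
        # All required scopes joined by single spaces.
        return " ".join(self.scopes)


def build_authenticate_value(security_scopes: ScopesStandIn) -> str:
    """Mirror the header logic used by get_current_user in the example."""
    if security_scopes.scopes:
        return f'Bearer scope="{security_scopes.scope_str}"'
    return "Bearer"


with_scopes = build_authenticate_value(ScopesStandIn(["me", "items"]))
without_scopes = build_authenticate_value(ScopesStandIn())
print(with_scopes)
print(without_scopes)
```

In the real application you would not build this object yourself: FastAPI creates the SecurityScopes instance and passes it in, much as it does with Request.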
from datetime import datetime, timedelta, timezone
from typing import Annotated

import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(
    current_user: Annotated[User, Depends(get_current_user)],
):
    return {"status": "ok"}
The security_scopes parameter will be of type SecurityScopes.
It will have a scopes property with a list containing all the scopes required by itself and by all the dependencies that use it as a sub-dependency. That means all the "dependants". This might sound confusing; it is explained again further below.
The security_scopes object (of class SecurityScopes) also provides a scope_str attribute with a single string containing those scopes separated by spaces (we are going to use it).
We create an HTTPException that we can reuse (raise) later at several points.
In this exception, we include the required scopes (if any) as a space-separated string (using scope_str). We put that string containing the scopes in the WWW-Authenticate header (this is part of the spec).
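As a minimal sketch of how that header value is put together, the snippet below uses only the standard library. `build_authenticate_value` is a hypothetical helper name, and the plain list stands in for `security_scopes.scopes`; in the application itself the joined string comes from `security_scopes.scope_str`.

```python
def build_authenticate_value(required_scopes: list[str]) -> str:
    """Return the WWW-Authenticate value for the given required scopes."""
    if required_scopes:
        # Same shape as scope_str: the scopes joined by single spaces.
        scope_str = " ".join(required_scopes)
        return f'Bearer scope="{scope_str}"'
    return "Bearer"


# Headers that would be attached to the reusable 401 response.
headers_with_scopes = {"WWW-Authenticate": build_authenticate_value(["me", "items"])}
headers_without_scopes = {"WWW-Authenticate": build_authenticate_value([])}

print(headers_with_scopes)
print(headers_without_scopes)
```

Building the value once and reusing it keeps every 401 response raised by the dependency consistent about which scopes the endpoint requires.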
```python
from datetime import datetime, timedelta, timezone
from typing import Annotated

import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: Annotated[User, Depends(get_current_user)]):
    return {"status": "ok"}
```
Verificamos que obtenemos un username, y extraemos los scopes.
Y luego validamos esos datos con el modelo de Pydantic (capturando la excepción ValidationError), y si obtenemos un error leyendo el token JWT o validando los datos con Pydantic, lanzamos la HTTPException que creamos antes.
Para eso, actualizamos el modelo de Pydantic TokenData con una nueva propiedad scopes.
Al validar los datos con Pydantic podemos asegurarnos de que tenemos, por ejemplo, exactamente una list de str con los scopes y un str con el username.
En lugar de, por ejemplo, un dict, o algo más, ya que podría romper la aplicación en algún punto posterior, haciéndolo un riesgo de seguridad.
We also verify that a user with that username exists, and if not, we raise the same exception we created earlier.
```python
from datetime import datetime, timedelta, timezone
from typing import Annotated

import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: Annotated[User, Depends(get_current_user)]):
    return {"status": "ok"}
```
We now verify that all the scopes required by this dependency and by all its dependants (including path operations) are included in the scopes provided in the received token; otherwise, we raise an HTTPException.
For this, we use security_scopes.scopes, which contains a list with all of these scopes as str.
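The check itself can be sketched in plain Python, without FastAPI. The function names missing_scopes and www_authenticate_value are hypothetical and exist only for this illustration. The required list stands in for security_scopes.scopes and the granted list for the scopes read from the token. The header format mirrors the one built in get_current_user.

```python
def missing_scopes(required: list[str], granted: list[str]) -> list[str]:
    """Return the required scopes that the token does not carry, in order."""
    granted_set = set(granted)
    return [scope for scope in required if scope not in granted_set]


def www_authenticate_value(required: list[str]) -> str:
    """Build a WWW-Authenticate value that lists the required scopes, if any."""
    if required:
        scope_str = " ".join(required)  # space-separated, like scope_str
        return f'Bearer scope="{scope_str}"'
    return "Bearer"


# The token was issued with only the "me" scope.
granted = ["me"]

# An endpoint that requires both "me" and "items" would be refused.
lacking = missing_scopes(["me", "items"], granted)

# An endpoint that requires no scopes is always satisfied.
nothing_lacking = missing_scopes([], granted)

header = www_authenticate_value(["me", "items"])
```

With these inputs, lacking is ["items"], nothing_lacking is an empty list, and header is 'Bearer scope="me items"'. A non-empty result from missing_scopes corresponds to the case where the dependency raises the "Not enough permissions" HTTPException.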
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read 
items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:Annotated[str,Depends(oauth2_scheme)]):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_scopes=payload.get("scopes",[])token_data=TokenData(scopes=token_scopes,username=username)except(InvalidTokenError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Security(get_current_user,scopes=["me"])],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive 
user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scopes":form_data.scopes},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Security(get_current_active_user,scopes=["items"])],):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:Annotated[User,Depends(get_current_user)]):return{"status":"ok"}
Let's review this dependency tree and the scopes again.
Because the get_current_active_user dependency has get_current_user as a sub-dependency, the scope "me" declared at get_current_active_user is included in the list of required scopes in the security_scopes.scopes passed to get_current_user.
The path operation itself also declares a scope, "items", so this scope is also in the list of security_scopes.scopes passed to get_current_user.
Here is how the hierarchy of dependencies and scopes looks:
The path operation read_own_items has:
  Required scopes ["items"] with the dependency:
  get_current_active_user:
    The dependency function get_current_active_user has:
      Required scopes ["me"] with the dependency:
      get_current_user:
        The dependency function get_current_user has:
          No scopes required by itself.
          A dependency using oauth2_scheme.
          A security_scopes parameter of type SecurityScopes:
            This security_scopes parameter has a scopes property with a list containing all the scopes declared above, so:
              security_scopes.scopes will contain ["me", "items"] for the path operation read_own_items.
              security_scopes.scopes will contain ["me"] for the path operation read_users_me, because it is declared in the dependency get_current_active_user.
              security_scopes.scopes will contain [] (nothing) for the path operation read_system_status, because it did not declare any Security with scopes, and its dependency, get_current_user, does not declare any scopes either.
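The hierarchy above can be mimicked without FastAPI to see how the scopes add up. The following is only a toy model, not FastAPI's actual implementation: the Node class and the collect_scopes helper are hypothetical names made up for this illustration, and the result is a set because only membership matters for the check, not the order.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set


@dataclass
class Node:
    """A path operation or dependency in a toy dependency tree (hypothetical)."""

    name: str
    # Scopes this node declares when it requires its sub-dependency.
    scopes: List[str] = field(default_factory=list)
    # The single sub-dependency this node relies on, if any.
    uses: Optional["Node"] = None


def collect_scopes(node: Node) -> Set[str]:
    """Gather every scope declared from `node` down to the last dependency."""
    collected: Set[str] = set()
    current: Optional[Node] = node
    while current is not None:
        collected.update(current.scopes)
        current = current.uses
    return collected


# Mirror the tree described above.
get_current_user = Node("get_current_user")
get_current_active_user = Node(
    "get_current_active_user", scopes=["me"], uses=get_current_user
)
read_own_items = Node("read_own_items", scopes=["items"], uses=get_current_active_user)
read_users_me = Node("read_users_me", uses=get_current_active_user)
read_system_status = Node("read_system_status", uses=get_current_user)

for operation in (read_own_items, read_users_me, read_system_status):
    print(operation.name, sorted(collect_scopes(operation)))
# read_own_items ['items', 'me']
# read_users_me ['me']
# read_system_status []
```

In the real application FastAPI does this bookkeeping for you and hands the result to get_current_user through the security_scopes parameter.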
Tip
The important and "magic" part here is that get_current_user receives a different list of scopes to check for each path operation.
It all depends on the scopes declared in each path operation and in each dependency in the dependency tree for that specific path operation.
You can use SecurityScopes at any point, and in multiple places; it does not have to be in the "root" dependency.
It will always contain the security scopes declared in the current Security dependencies and in all the dependants for that specific path operation and that specific dependency tree.
Because SecurityScopes contains all the scopes declared by dependants, you can use it to verify that a token has the required scopes in a central dependency function, and then declare different scope requirements in different path operations.
They are checked independently for each path operation.
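As a rough illustration of that central check, here is a dependency-free sketch of the loop that get_current_user runs over the required scopes. The names verify_scopes and NotEnoughPermissions are assumptions introduced for this example; in the application the error is an HTTPException with status 401, and the required scopes come from security_scopes.scopes.

```python
from typing import List


class NotEnoughPermissions(Exception):
    """Hypothetical stand-in for the HTTP 401 error raised in the real app."""

    def __init__(self, authenticate_value: str) -> None:
        super().__init__("Not enough permissions")
        # Value the real app would send in the WWW-Authenticate header.
        self.authenticate_value = authenticate_value


def verify_scopes(required: List[str], token_scopes: List[str]) -> None:
    """Raise NotEnoughPermissions if any required scope is missing from the token."""
    if required:
        # Space-separated scopes, like security_scopes.scope_str in the app.
        scope_str = " ".join(required)
        authenticate_value = f'Bearer scope="{scope_str}"'
    else:
        authenticate_value = "Bearer"
    for scope in required:
        if scope not in token_scopes:
            raise NotEnoughPermissions(authenticate_value)


# A token with both scopes satisfies a path operation that only needs "me".
verify_scopes(["me"], ["me", "items"])

# A token with only "me" is rejected where "me" and "items" are required.
try:
    verify_scopes(["me", "items"], ["me"])
except NotEnoughPermissions as exc:
    print(exc, "|", exc.authenticate_value)
# Not enough permissions | Bearer scope="me items"
```

The same function serves every path operation; only the list of required scopes changes from one call to the next.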
If you open the API docs, you can authenticate and specify which scopes you want to authorize.
If you don't select any scope, you will be "authenticated", but when you try to access /users/me/ or /users/me/items/ you will get an error saying that you don't have enough permissions. You will still be able to access /status/.
And if you select the scope me but not the scope items, you will be able to access /users/me/ but not /users/me/items/.
That is what would happen to a third-party application that tried to access one of these path operations with a token provided by a user, depending on how many permissions the user granted the application.
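The outcomes described above can be tabulated with a small standalone sketch. It assumes the scope requirements of this example app (["me"] for /users/me/, ["me", "items"] for /users/me/items/, none for /status/); REQUIRED_SCOPES and accessible_paths are hypothetical names used only here.

```python
from typing import Dict, List

# Scopes each path operation ends up requiring in this example app.
REQUIRED_SCOPES: Dict[str, List[str]] = {
    "/users/me/": ["me"],
    "/users/me/items/": ["me", "items"],
    "/status/": [],
}


def accessible_paths(granted: List[str]) -> List[str]:
    """Return the paths whose required scopes are all present in `granted`."""
    return sorted(
        path
        for path, required in REQUIRED_SCOPES.items()
        if set(required) <= set(granted)
    )


for granted in ([], ["me"], ["items"], ["me", "items"]):
    print(granted, "->", accessible_paths(granted))
# [] -> ['/status/']
# ['me'] -> ['/status/', '/users/me/']
# ['items'] -> ['/status/']
# ['me', 'items'] -> ['/status/', '/users/me/', '/users/me/items/']
```

Note the third case: selecting only items is not enough for /users/me/items/, because the me scope declared in get_current_active_user is also required there.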
In this example we are using the OAuth2 "password" flow.
This is appropriate when we are logging in to our own application, probably with our own frontend, because we can trust it to receive the username and password, as we control it.
But if you are building an OAuth2 application that others would connect to (that is, if you are building an authentication provider equivalent to Facebook, Google, GitHub, etc.) you should use one of the other flows.
The most common is the implicit flow.
The most secure is the code flow, but it is more complex to implement as it requires more steps. Because it is more complex, many providers end up suggesting the implicit flow.
Note
It is common for each authentication provider to name its flows in a different way, to make them part of its brand.
But in the end, they are implementing the same OAuth2 standard.
FastAPI includes utilities for all these OAuth2 authentication flows in fastapi.security.oauth2.
In the same way that you can define a list of Depends in the decorator's dependencies parameter (as explained in Dependencies in path operation decorators), you could also use Security with scopes there.