It is not encrypted, so anyone could recover the information from its contents.
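To make the "not encrypted" point concrete, here is a minimal sketch using only the Python standard library. The token is built by hand for illustration (its third segment is a dummy signature), and `read_payload_without_key` is a hypothetical helper name, not part of any library.

```python
import base64
import json


def b64url_encode(raw: bytes) -> str:
    """Base64url-encode without padding, the way JWT segments are encoded."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def read_payload_without_key(token: str) -> dict:
    """Decode the middle segment of a JWT. No secret key is involved."""
    payload_segment = token.split(".")[1]
    # Restore the padding that JWT encoding strips off.
    padded = payload_segment + "=" * (-len(payload_segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


# Hand-built token for illustration; the signature segment is a dummy value.
header = b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
payload = b64url_encode(json.dumps({"sub": "johndoe"}).encode())
token = f"{header}.{payload}.dummy-signature"

print(read_payload_without_key(token))  # {'sub': 'johndoe'}
```

Because the payload can be read this way by anyone who holds the token, it should not carry secrets such as passwords.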
But it is signed. So, when you receive a token that you issued, you can verify that you actually issued it.
That way, you can create a token with an expiration of, say, 1 week. Then, when the user comes back the next day with the token, you know that the user is still logged in to your system.
After a week, the token will have expired, and the user will no longer be authorized and will have to sign in again to get a new token. And if the user (or a third party) tries to modify the token to change the expiration, you will be able to detect it, because the signatures will not match.
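The behavior described above (accepted the next day, rejected after a week, rejected when tampered with) can be sketched with the standard library alone. This is a hand-rolled illustration that mirrors the HMAC-SHA256 construction behind "HS256"; it is not how a real application should issue tokens, which is a job for a maintained JWT library. `SECRET`, `make_token`, `check_token`, `tamper_with_expiry` and `describe` are all hypothetical names chosen for this sketch.

```python
import base64
import hashlib
import hmac
import json

SECRET = b"demo-secret-for-illustration-only"  # hypothetical key
ONE_DAY = 24 * 3600
ONE_WEEK = 7 * ONE_DAY


def _b64url(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(segment: str) -> bytes:
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def _sign(signing_input: str) -> str:
    digest = hmac.new(SECRET, signing_input.encode("utf-8"), hashlib.sha256).digest()
    return _b64url(digest)


def make_token(claims: dict, lifetime_seconds: int, now: float) -> str:
    """Build header.payload.signature with an "exp" claim."""
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64url(json.dumps({**claims, "exp": int(now) + lifetime_seconds}).encode())
    signing_input = f"{header}.{body}"
    return f"{signing_input}.{_sign(signing_input)}"


def check_token(token: str, now: float) -> dict:
    """Verify the signature first, then the expiration."""
    signing_input, _, signature = token.rpartition(".")
    if not hmac.compare_digest(signature.encode(), _sign(signing_input).encode()):
        raise ValueError("signature mismatch")
    claims = json.loads(_b64url_decode(signing_input.split(".")[1]))
    if now >= claims["exp"]:
        raise ValueError("token expired")
    return claims


def tamper_with_expiry(token: str, extra_seconds: int) -> str:
    """Simulate someone editing "exp" without knowing the secret."""
    header, body, signature = token.split(".")
    claims = json.loads(_b64url_decode(body))
    claims["exp"] += extra_seconds
    forged_body = _b64url(json.dumps(claims).encode())
    return f"{header}.{forged_body}.{signature}"


def describe(token: str, now: float) -> str:
    try:
        return f"valid for {check_token(token, now)['sub']}"
    except ValueError as error:
        return f"rejected: {error}"


issued_at = 1_700_000_000.0  # fixed timestamp so the example is repeatable
token = make_token({"sub": "johndoe"}, ONE_WEEK, now=issued_at)

print(describe(token, issued_at + ONE_DAY))   # valid for johndoe
print(describe(token, issued_at + ONE_WEEK))  # rejected: token expired
forged = tamper_with_expiry(token, ONE_WEEK)
print(describe(forged, issued_at + ONE_WEEK))  # rejected: signature mismatch
```

The forged token fails because the signature was computed over the original payload; changing even one claim produces a different HMAC, and only the holder of the secret can recompute it.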
If you want to play with JWT tokens and see how they work, check https://jwt.io.
If your database is stolen, the thief won't have your users' plaintext passwords, only the hashes.
So, the thief won't be able to try those passwords in another system (as many users use the same password everywhere, this would be dangerous).
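As a rough illustration of what "only the hashes" means, the sketch below stores a salted, deliberately slow hash instead of the password. It uses `hashlib.pbkdf2_hmac` from the standard library as a stand-in for Bcrypt, so the stored format differs from the one shown elsewhere in this tutorial. The function names and the iteration count are assumptions made for this example.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # assumed work factor for this sketch


def hash_password(password: str) -> str:
    """Return "salt$digest" (both hex); the password itself is never stored."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def check_password(password: str, stored: str) -> bool:
    """Re-hash the candidate with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))


stored = hash_password("secret")
print(check_password("secret", stored))  # True
print(check_password("guess", stored))   # False
```

A stolen `stored` value does not reveal "secret"; an attacker has to guess candidate passwords and pay the hashing cost for each guess.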
With passlib, you could even configure it to be able to read passwords created by Django, a Flask security plug-in, or many others.
So, for example, you could share the same data from a Django application in a database with a FastAPI application. Or you could gradually migrate a Django application while using the same database.
And your users would be able to log in from your Django app or from your FastAPI app, at the same time.
Import the tools we need from passlib.
Create a PassLib "context". This is what will be used to hash and verify passwords.
Tip
The PassLib context can also work with several hashing algorithms at once, including deprecated old ones that are kept only so that existing hashes can still be verified.
For example, you could use it to read and verify passwords generated by another system (like Django) but hash any new passwords with a different algorithm like Bcrypt.
And be compatible with all of them at the same time.
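The idea in this tip, verifying old hash formats while writing new hashes with the current algorithm, can be sketched without PassLib. `TinyContext` below is a hypothetical miniature written for this example; it is not PassLib's API. The "legacy" scheme (unsalted SHA-256) is an assumed stand-in for hashes inherited from an older system, and PBKDF2 stands in for the current algorithm.

```python
import hashlib
import hmac
import os


def _legacy_sha256(password: str) -> str:
    # Stand-in for a format written by an older system (weak: unsalted).
    return "legacy$" + hashlib.sha256(password.encode("utf-8")).hexdigest()


def _pbkdf2(password: str, salt: bytes) -> str:
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 100_000)
    return f"pbkdf2${salt.hex()}${digest.hex()}"


class TinyContext:
    """Hypothetical miniature of a multi-scheme password context."""

    def hash(self, password: str) -> str:
        # New hashes always use the current (non-deprecated) scheme.
        return _pbkdf2(password, os.urandom(16))

    def verify(self, password: str, stored: str) -> bool:
        # The prefix of the stored value says which scheme produced it.
        scheme = stored.split("$", 1)[0]
        if scheme == "legacy":
            expected = _legacy_sha256(password)
        elif scheme == "pbkdf2":
            salt = bytes.fromhex(stored.split("$")[1])
            expected = _pbkdf2(password, salt)
        else:
            return False
        return hmac.compare_digest(expected.encode(), stored.encode())

    def needs_update(self, stored: str) -> bool:
        # Anything not produced by the current scheme should be re-hashed.
        return not stored.startswith("pbkdf2$")


context = TinyContext()
old_hash = _legacy_sha256("secret")  # as if imported from another system

# On a successful login with an old hash, upgrade it transparently.
new_hash = old_hash
if context.verify("secret", old_hash) and context.needs_update(old_hash):
    new_hash = context.hash("secret")

print(new_hash.split("$", 1)[0])  # pbkdf2
```

Upgrading at login time works because that is the only moment the application sees the plaintext password again.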
Create a utility function to hash a password coming from the user.
And another utility to verify whether a received password matches the stored hash.
And another one to authenticate and return a user.
from datetime import datetime, timedelta, timezone
from typing import Annotated

import jwt
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except InvalidTokenError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return [{"item_id": "Foo", "owner": current_user.username}]
Other versions and variants
The example above uses the str | None syntax, which requires Python 3.10 or later. On Python 3.9, import Union from typing and write Union[str, None], Union[bool, None] and Union[timedelta, None] instead. On Python 3.8, additionally import Annotated from typing_extensions instead of typing.
Tip
Prefer to use the Annotated version if possible.
Without Annotated, declare each dependency as a default value instead: token: str = Depends(oauth2_scheme), current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_active_user) and form_data: OAuth2PasswordRequestForm = Depends(). The rest of the code is unchanged.
Note
If you check the new (fake) database fake_users_db, you will see what the hashed password looks like now: "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW".
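The string in this note follows the usual Bcrypt layout: a variant marker, a cost factor, then 22 characters of salt followed by 31 characters of digest. The sketch below only splits the string into those parts; it does not verify any password. `BcryptParts` and `split_bcrypt_hash` are hypothetical names introduced for this illustration.

```python
from typing import NamedTuple


class BcryptParts(NamedTuple):
    variant: str  # algorithm variant, e.g. "2b"
    cost: int     # work factor: 2**cost key-expansion rounds
    salt: str     # 22 characters in Bcrypt's own base64 alphabet
    digest: str   # 31 characters


def split_bcrypt_hash(hashed: str) -> BcryptParts:
    """Split "$variant$cost$salt+digest" into its fields."""
    _, variant, cost, rest = hashed.split("$")
    return BcryptParts(variant, int(cost), rest[:22], rest[22:])


parts = split_bcrypt_hash(
    "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW"
)
print(parts.variant, parts.cost)           # 2b 12
print(len(parts.salt), len(parts.digest))  # 22 31
```

Because the salt and the cost travel inside the stored string, verifying a password needs nothing beyond that string and the candidate password.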
Generate a random secret key (the code comment suggests running openssl rand -hex 32) and copy the output to the variable SECRET_KEY (don't use the one in the example).
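If `openssl` (the command suggested in the code comment) is not at hand, a key of the same shape can be produced from Python's standard library. This is a small sketch; `generate_secret_key` is a hypothetical helper name.

```python
import secrets


def generate_secret_key(num_bytes: int = 32) -> str:
    """Return num_bytes of cryptographically secure randomness as hex text."""
    return secrets.token_hex(num_bytes)


key = generate_secret_key()
print(len(key))  # 64 hex characters for 32 random bytes
```

Run it once and store the result outside the source code where possible; generating a new key on every start would invalidate all previously issued tokens.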
Create a variable ALGORITHM with the algorithm used to sign the JWT token and set it to "HS256".
Create a variable for the expiration of the token.
Define a Pydantic model that will be used in the token endpoint for the response.
Create a utility function to generate a new access token.
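The expiration step can be looked at in isolation. In a JWT, the "exp" claim is a number of seconds since the Unix epoch, so a lifetime in minutes has to be turned into a timestamp. The sketch below does only that arithmetic, using the tutorial's 30-minute setting and its 15-minute fallback; `expiry_claim` and `DEFAULT_EXPIRE_MINUTES` are hypothetical names, and the fixed issue time is chosen just to make the example repeatable.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

ACCESS_TOKEN_EXPIRE_MINUTES = 30
DEFAULT_EXPIRE_MINUTES = 15  # fallback when no lifetime is passed


def expiry_claim(issued_at: datetime, expires_delta: Optional[timedelta] = None) -> int:
    """Return the "exp" value as whole seconds since the Unix epoch."""
    lifetime = expires_delta or timedelta(minutes=DEFAULT_EXPIRE_MINUTES)
    return int((issued_at + lifetime).timestamp())


issued_at = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
issued_seconds = int(issued_at.timestamp())

configured = expiry_claim(issued_at, timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
fallback = expiry_claim(issued_at)

print(configured - issued_seconds)  # 1800
print(fallback - issued_seconds)    # 900
```

Using a timezone-aware UTC datetime, as the tutorial's code does, avoids the timestamp shifting with the server's local timezone.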
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate 
credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]
🤓 Other versions and variants
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotated,UnionimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:Union[str,None]=NoneclassUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:Union[timedelta,None]=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUT
HORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportUnionimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModelfromtyping_extensionsimportAnnotated# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:Union[str,None]=NoneclassUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:Union[timedelta,None]=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_
code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]
Tip
Update get_current_user to receive the same token as before, but this time using JWT tokens.
Decode the received token, verify it, and return the current user.
If the token is invalid, return an HTTP error right away.
from datetime import datetime, timedelta, timezone
from typing import Annotated

import jwt
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except InvalidTokenError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return [{"item_id": "Foo", "owner": current_user.username}]
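To make the "decode and verify" step more concrete, here is a minimal sketch of what checking an HS256-signed token involves, written with only the Python standard library. It is an illustration under stated assumptions, not how PyJWT is implemented and not a replacement for it: the names DEMO_SECRET, sign_hs256 and verify_hs256 are hypothetical, and in the tutorial code jwt.decode() does this work and signals failures with InvalidTokenError rather than ValueError.

```python
import base64
import hashlib
import hmac
import json
import time

# Hypothetical demo secret; a real application would use its own SECRET_KEY.
DEMO_SECRET = b"demo-secret-for-illustration-only"


def b64url_encode(raw: bytes) -> str:
    """Base64url-encode without padding, as JWT segments require."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    """Restore the stripped padding, then base64url-decode."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def sign_hs256(payload: dict, secret: bytes) -> str:
    """Build header.payload.signature with an HMAC-SHA256 signature."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url_encode(signature)}"


def verify_hs256(token: str, secret: bytes, now=None) -> dict:
    """Return the payload if the signature matches and the token has not expired."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("malformed token")
    header_b64, payload_b64, signature_b64 = parts

    # Only accept the algorithm we expect, never whatever the token claims.
    if json.loads(b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")

    # Recompute the signature over header.payload and compare in constant time.
    signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
    expected = hmac.new(secret, signing_input, hashlib.sha256).digest()
    if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
        raise ValueError("signature mismatch")

    # Only after the signature checks out is the payload trusted enough to read.
    payload = json.loads(b64url_decode(payload_b64))
    current = time.time() if now is None else now
    if "exp" in payload and current >= payload["exp"]:
        raise ValueError("token expired")
    return payload
```

In this sketch, a token whose payload was edited after signing fails with "signature mismatch", and a token checked after its exp time fails with "token expired". Those are the two situations in which get_current_user answers with the 401 credentials error.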
Create a timedelta with the expiration time of the token.
Create a real JWT access token and return it.
# Excerpt from the same file shown above: only the token helper and the
# /token path operation are repeated here.
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")
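As a small illustration of how the timedelta turns into the token's expiration, the sketch below builds the claims dictionary without encoding it. The helper build_claims and its now parameter are hypothetical, added only so the arithmetic can be checked with a fixed clock. The tutorial code passes a datetime as exp and leaves the conversion to jwt.encode(), whereas this sketch converts to a Unix timestamp explicitly to show the value that ends up in the token.

```python
from datetime import datetime, timedelta, timezone

ACCESS_TOKEN_EXPIRE_MINUTES = 30  # same value as in the tutorial code


def build_claims(username, expires_delta=None, now=None):
    """Return the claims that would be handed to jwt.encode() (hypothetical helper)."""
    # Use an aware UTC datetime so the timestamp does not depend on the local zone.
    issued_at = now if now is not None else datetime.now(timezone.utc)
    # Fall back to 15 minutes when no lifetime is given, as create_access_token does.
    lifetime = expires_delta if expires_delta else timedelta(minutes=15)
    expire = issued_at + lifetime
    # "exp" is expressed as seconds since the Unix epoch.
    return {"sub": username, "exp": int(expire.timestamp())}


fixed_now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
claims = build_claims(
    "johndoe",
    expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    now=fixed_now,
)
# The token expires exactly 30 minutes (1800 seconds) after it was issued.
seconds_until_expiry = claims["exp"] - int(fixed_now.timestamp())
```

Keeping the lifetime in one constant, as the tutorial does with ACCESS_TOKEN_EXPIRE_MINUTES, means the expiration policy can be changed in a single place.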
Tip
Prefer to use the Annotated version if possible.
from datetime import datetime, timedelta, timezone

import jwt
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except InvalidTokenError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    return [{"item_id": "Foo", "owner": current_user.username}]
Tip
Prefer to use the Annotated version if possible.
from datetime import datetime, timedelta, timezone
from typing import Union

import jwt
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Union[str, None] = None


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except InvalidTokenError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    return [{"item_id": "Foo", "owner": current_user.username}]
The JWT specification says that there is a key sub, with the subject of the token.
Using it is optional, but that is where you would put the user's identification, so we are using it here.
JWT can be used for other things besides identifying a user and allowing them to perform operations directly on your API.
For example, you could identify a "car" or a "blog post".
Then you could add permissions about that entity, like "drive" (for the car) or "edit" (for the blog post).
And then you could give that JWT token to a user (or bot), and they could use it to perform those actions (drive the car, or edit the blog post) without even needing to have an account, just with the JWT token your API generated for that.
Using these ideas, JWT can be used for far more sophisticated scenarios.
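As a rough sketch of that idea, the claims of such a token could carry a list of allowed actions next to sub. The permissions claim name and the is_action_allowed helper below are hypothetical choices for this example, not part of the JWT specification or of FastAPI, and the function assumes the token's signature has already been verified.

```python
from typing import Any, Dict


def is_action_allowed(claims: Dict[str, Any], action: str) -> bool:
    """Check whether already-verified claims grant the given action.

    "permissions" is a custom claim invented for this sketch.
    """
    permissions = claims.get("permissions", [])
    return isinstance(permissions, list) and action in permissions


# Claims for a token issued for the car "foo", allowing only "drive".
car_claims = {"sub": "foo", "permissions": ["drive"]}

print(is_action_allowed(car_claims, "drive"))  # True
print(is_action_allowed(car_claims, "edit"))   # False
```

Whoever holds the token can perform the listed actions, so tokens like this would normally be given a short expiration.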
In those cases, several of those entities could have the same ID, let's say foo (a user foo, a car foo, and a blog post foo).
So, to avoid ID collisions, when creating the JWT token for the user, you could prefix the value of the sub key, for example with username:. In this example, the value of sub could then have been: username:johndoe.
The important thing to keep in mind is that the sub key should hold an identifier that is unique across the entire application, and it should be a string.
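The prefixing scheme could be sketched with two small helpers. The names make_subject and parse_subject are hypothetical, and the kind:identifier layout is just the convention described above, not something JWT itself defines.

```python
from typing import Tuple


def make_subject(kind: str, identifier: str) -> str:
    """Build a sub value such as "username:johndoe"."""
    if not kind or ":" in kind:
        raise ValueError("kind must be non-empty and must not contain ':'")
    if not identifier:
        raise ValueError("identifier must be non-empty")
    return f"{kind}:{identifier}"


def parse_subject(sub: str) -> Tuple[str, str]:
    """Split a prefixed sub value back into (kind, identifier)."""
    kind, separator, identifier = sub.partition(":")
    if not separator or not kind or not identifier:
        raise ValueError(f"subject {sub!r} is not in '<kind>:<id>' form")
    return kind, identifier


# Three entities share the ID "foo" but get distinct subjects.
subjects = {make_subject(kind, "foo") for kind in ("username", "car", "post")}
print(sorted(subjects))  # ['car:foo', 'post:foo', 'username:foo']
print(parse_subject("username:johndoe"))  # ('username', 'johndoe')
```

Splitting only on the first colon keeps identifiers that themselves contain a colon intact.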
If you open the developer tools, you can see that the data sent only includes the token. The password is only sent in the first request, to authenticate the user and get that access token, but not afterwards.
Note
Notice the Authorization header, with a value that starts with Bearer.
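To illustrate what that header value looks like on the server side, here is a minimal sketch of pulling the token out of it. The function name extract_bearer_token is hypothetical; in the application above this work is done for you by the OAuth2PasswordBearer dependency, so this is only meant to show the shape of the header, not how FastAPI implements it.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an "Authorization: Bearer <token>" value.

    Returns None when the header is missing, uses another scheme,
    or carries no token.
    """
    if not authorization:
        return None
    scheme, _, credentials = authorization.partition(" ")
    # HTTP authentication scheme names are case-insensitive.
    if scheme.lower() != "bearer":
        return None
    token = credentials.strip()
    return token or None


print(extract_bearer_token("Bearer abc.def.ghi"))  # abc.def.ghi
print(extract_bearer_token("Basic dXNlcjpwYXNz"))  # None
```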
With what you have seen up to now, you can set up a secure FastAPI application using standards like OAuth2 and JWT.
In almost any framework, handling security becomes a rather complex subject quite quickly.
Many packages that simplify it a lot have to make many compromises with the data model, the database, and the available features. And some of these packages that simplify things too much actually have security flaws underneath.
FastAPI doesn't make any compromise with any database, data model, or tool.
It gives you all the flexibility to choose the ones that best fit your project.
And you can directly use many well maintained and widely used packages like passlib and PyJWT, because FastAPI doesn't require any complex mechanisms to integrate external packages.
But it provides you with the tools to simplify the process as much as possible without compromising flexibility, robustness, or security.
And you can use and implement secure, standard protocols, like OAuth2, in a relatively simple way.
You can learn more in the Advanced User Guide about how to use OAuth2 "scopes" for a more fine-grained permission system, following these same standards. OAuth2 with scopes is the mechanism used by many big authentication providers, like Facebook, Google, GitHub, Microsoft, Twitter, etc. to authorize third party applications to interact with their APIs on behalf of their users.