We are going to use FastAPI security utilities to get the username and password.
OAuth2 specifies that when using the "password flow" (that we are using) the client/user must send a username and password fields as form data.
And the spec says that the fields have to be named like that. So user-name or email wouldn't work.
But don't worry, you can show it as you wish to your final users in the frontend.
And your database models can use any other names you want.
But for the login path operation, we need to use these names to be compatible with the spec (and be able to, for example, use the integrated API documentation system).
The spec also states that the username and password must be sent as form data (so, no JSON here).
First, import OAuth2PasswordRequestForm, and use it as a dependency with Depends in the path operation for /token:
fromtypingimportAnnotatedfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:Annotated[OAuth2PasswordRequestForm,Depends()]):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user
🤓 Other versions and variants
fromtypingimportAnnotated,UnionfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:Annotated[OAuth2PasswordRequestForm,Depends()]):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user
fromtypingimportUnionfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfromtyping_extensionsimportAnnotatedfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:Annotated[OAuth2PasswordRequestForm,Depends()]):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user
Tip
Prefer to use the Annotated version if possible.
fromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:str=Depends(oauth2_scheme)):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:OAuth2PasswordRequestForm=Depends()):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user
Tip
Prefer to use the Annotated version if possible.
fromtypingimportUnionfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:str=Depends(oauth2_scheme)):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:OAuth2PasswordRequestForm=Depends()):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user
OAuth2PasswordRequestForm is a class dependency that declares a form body with:
The username.
The password.
An optional scope field as a big string, composed of strings separated by spaces.
An optional grant_type.
Tip
The OAuth2 spec actually requires a field grant_type with a fixed value of password, but OAuth2PasswordRequestForm doesn't enforce it.
If you need to enforce it, use OAuth2PasswordRequestFormStrict instead of OAuth2PasswordRequestForm.
An optional client_id (we don't need it for our example).
An optional client_secret (we don't need it for our example).
Info
The OAuth2PasswordRequestForm is not a special class for FastAPI as is OAuth2PasswordBearer.
OAuth2PasswordBearer makes FastAPI know that it is a security scheme. So it is added that way to OpenAPI.
But OAuth2PasswordRequestForm is just a class dependency that you could have written yourself, or you could have declared Form parameters directly.
But as it's a common use case, it is provided by FastAPI directly, just to make it easier.
The instance of the dependency class OAuth2PasswordRequestForm won't have an attribute scope with the long string separated by spaces, instead, it will have a scopes attribute with the actual list of strings for each scope sent.
We are not using scopes in this example, but the functionality is there if you need it.
Now, get the user data from the (fake) database, using the username from the form field.
If there is no such user, we return an error saying "Incorrect username or password".
For the error, we use the exception HTTPException:
fromtypingimportAnnotatedfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:Annotated[OAuth2PasswordRequestForm,Depends()]):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user
🤓 Other versions and variants
fromtypingimportAnnotated,UnionfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:Annotated[OAuth2PasswordRequestForm,Depends()]):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user
fromtypingimportUnionfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfromtyping_extensionsimportAnnotatedfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:Annotated[OAuth2PasswordRequestForm,Depends()]):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user
Tip
Prefer to use the Annotated version if possible.
fromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:str=Depends(oauth2_scheme)):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:OAuth2PasswordRequestForm=Depends()):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user
Tip
Prefer to use the Annotated version if possible.
fromtypingimportUnionfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:str=Depends(oauth2_scheme)):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:OAuth2PasswordRequestForm=Depends()):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user
If your database is stolen, the thief won't have your users' plaintext passwords, only the hashes.
So, the thief won't be able to try to use those same passwords in another system (as many users use the same password everywhere, this would be dangerous).
fromtypingimportAnnotatedfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:Annotated[OAuth2PasswordRequestForm,Depends()]):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user
🤓 Other versions and variants
fromtypingimportAnnotated,UnionfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:Annotated[OAuth2PasswordRequestForm,Depends()]):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user
fromtypingimportUnionfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfromtyping_extensionsimportAnnotatedfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:Annotated[OAuth2PasswordRequestForm,Depends()]):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user
Tip
Prefer to use the Annotated version if possible.
fromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:str=Depends(oauth2_scheme)):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:OAuth2PasswordRequestForm=Depends()):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user
Tip
Prefer to use the Annotated version if possible.
fromtypingimportUnionfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:str=Depends(oauth2_scheme)):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:OAuth2PasswordRequestForm=Depends()):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user
The response of the token endpoint must be a JSON object.
It should have a token_type. In our case, as we are using "Bearer" tokens, the token type should be "bearer".
And it should have an access_token, with a string containing our access token.
For this simple example, we are going to just be completely insecure and return the same username as the token.
Tip
In the next chapter, you will see a real secure implementation, with password hashing and JWT tokens.
But for now, let's focus on the specific details we need.
fromtypingimportAnnotatedfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:Annotated[OAuth2PasswordRequestForm,Depends()]):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user
🤓 Other versions and variants
fromtypingimportAnnotated,UnionfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:Annotated[OAuth2PasswordRequestForm,Depends()]):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user
fromtypingimportUnionfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfromtyping_extensionsimportAnnotatedfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:Annotated[OAuth2PasswordRequestForm,Depends()]):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user
Tip
Prefer to use the Annotated version if possible.
fromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:str=Depends(oauth2_scheme)):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:OAuth2PasswordRequestForm=Depends()):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user
Tip
Prefer to use the Annotated version if possible.
fromtypingimportUnionfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:str=Depends(oauth2_scheme)):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:OAuth2PasswordRequestForm=Depends()):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user
Tip
By the spec, you should return a JSON with an access_token and a token_type, the same as in this example.
This is something that you have to do yourself in your code, and make sure you use those JSON keys.
It's almost the only thing that you have to remember to do correctly yourself, to be compliant with the specifications.
We want to get the current_useronly if this user is active.
So, we create an additional dependency get_current_active_user that in turn uses get_current_user as a dependency.
Both of these dependencies will just return an HTTP error if the user doesn't exist, or if is inactive.
So, in our endpoint, we will only get a user if the user exists, was correctly authenticated, and is active:
fromtypingimportAnnotatedfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:Annotated[OAuth2PasswordRequestForm,Depends()]):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user
🤓 Other versions and variants
fromtypingimportAnnotated,UnionfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:Annotated[OAuth2PasswordRequestForm,Depends()]):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user
fromtypingimportUnionfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfromtyping_extensionsimportAnnotatedfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:Annotated[OAuth2PasswordRequestForm,Depends()]):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user
Tip
Prefer to use the Annotated version if possible.
fromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:str=Depends(oauth2_scheme)):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:OAuth2PasswordRequestForm=Depends()):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user
Tip
Prefer to use the Annotated version if possible.
fromtypingimportUnionfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:str=Depends(oauth2_scheme)):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:OAuth2PasswordRequestForm=Depends()):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user
Info
The additional header WWW-Authenticate with value Bearer we are returning here is also part of the spec.
Any HTTP (error) status code 401 "UNAUTHORIZED" is supposed to also return a WWW-Authenticate header.
In the case of bearer tokens (our case), the value of that header should be Bearer.
You can actually skip that extra header and it would still work.
But it's provided here to be compliant with the specifications.
Also, there might be tools that expect and use it (now or in the future) and that might be useful for you or your users, now or in the future.