FastAPI is a modern Python web framework for building APIs. It is fast, easy to use, and enables the creation of robust and scalable APIs.
Some of its features are:
Uses Starlette and Pydantic under the hood for blazing-fast performance.
Minimizes boilerplate with a simple declaration of routes and inputs.
Great editor support and easy-to-remember syntax.
It automatically generates OpenAPI schemas and docs.
If you want to learn more, check out the FastAPI docs. In this article, we will talk about how FastAPI endpoints can be secured using JWT.
JWT (JSON Web Token) provides a compact, signed way to authenticate requests to endpoints. Some of its features are:
JSON-encoded access tokens that are cryptographically signed.
Contains claims like issuer, expiry, subject, etc.
Verifiable as they are signed using a secret key.
Useful for securely transmitting information between parties.
If you want to learn more about JWTs and see how they work, check out jwt.io.
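To make the "cryptographically signed" part concrete, here is a minimal sketch of how an HS256 token can be built and checked using only the standard library. The helper names (b64url, sign_hs256, verify_hs256) are made up for this illustration, and the sketch deliberately skips expiry checks; in the rest of the article python-jose does this work for us. Note that the payload is only encoded, not encrypted, so anyone holding the token can read the claims.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWT segments require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the stripped padding."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def sign_hs256(claims: dict, secret: str) -> str:
    """Build header.payload.signature with an HMAC-SHA256 signature."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    signature = hmac.new(
        secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_hs256(token: str, secret: str) -> dict:
    """Return the claims if the signature matches, else raise ValueError."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("token must have exactly three segments")
    signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
    expected = hmac.new(
        secret.encode("utf-8"), signing_input, hashlib.sha256
    ).digest()
    # compare_digest avoids leaking information through timing differences
    if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
        raise ValueError("signature mismatch")
    return json.loads(b64url_decode(payload_b64))
```

Calling verify_hs256 with a different secret than the one used for signing raises ValueError, which is the property the server relies on to trust the claims.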
After covering these basics, let’s get started with the coding part.
Install fastapi, uvicorn, python-jose, passlib, and python-multipart.
$ pip install fastapi uvicorn "python-jose[cryptography]" "passlib[bcrypt]" python-multipart
Uvicorn is an ASGI web server for Python. It is the recommended server for FastAPI.
Python-jose generates and verifies the JWTs. You can also use PyJWT.
Passlib handles password hashes.
Python-multipart parses the form data that the login endpoint receives.
Import necessary packages:
from datetime import datetime, timedelta
from typing import Union
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from typing_extensions import Annotated
Define test db and secret.
test_db = {
    "testuser": {
        "username": "testuser",
        "full_name": "Test User",
        "email": "[email protected]",
        "hashed_password": "$2b$12$PQO42dMponRoPGWyt5co5OiZCQJcpKDFxet8n5MnkUHNNHAg/mioS",
    }
}
# openssl rand -hex 32
SECRET_KEY = "380e91edcddfe3c0733585523c7bf4b80c9e6d720787142ed9da5e6c37a29185"
ALGORITHM = "HS256"
TOKEN_EXPIRE_MINUTES = 30
This SECRET_KEY will be used to sign the JWT. Don’t use this key; generate a new one. To generate a new key, run this in the terminal:
$ openssl rand -hex 32
The algorithm will be HS256 (HMAC with SHA-256).
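If openssl is not available, a key of the same shape can be produced from Python. This is a small sketch, not part of the original setup: generate_secret_key and load_secret_key are illustrative names, and JWT_SECRET_KEY is a hypothetical environment variable chosen for this example. Reading the key from the environment is one common way to keep it out of the source file.

```python
import os
import secrets


def generate_secret_key(num_bytes: int = 32) -> str:
    """Return num_bytes of random data as a hex string (64 characters for 32 bytes)."""
    return secrets.token_hex(num_bytes)


def load_secret_key(env_var: str = "JWT_SECRET_KEY") -> str:
    """Read the signing key from the environment (variable name is an assumption)."""
    key = os.environ.get(env_var)
    if not key:
        raise RuntimeError(f"{env_var} is not set")
    return key
```

With something like this in place, SECRET_KEY could be assigned from load_secret_key() instead of a hard-coded string.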
Create models:
class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: Union[str, None] = None

class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None

class UserInDB(User):
    hashed_password: str
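Before we move ahead, let’s first talk about the password workflow in layman’s terms. The server never stores the plain password. It stores only a hash of it, and at login it hashes the submitted password and compares the result with the stored hash.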
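The hash-and-compare idea can be sketched with the standard library alone. This is only an illustration: it swaps in PBKDF2 (hashlib.pbkdf2_hmac) for the bcrypt scheme the article uses through passlib, and demo_hash_password, demo_verify_password, and the ITERATIONS value are names and numbers invented for the sketch.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 100_000  # illustrative work factor, not a tuned recommendation


def demo_hash_password(plain_password: str) -> str:
    """Hash a password with a fresh random salt; store salt and digest together."""
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256", plain_password.encode("utf-8"), salt, ITERATIONS
    )
    return f"{salt.hex()}${digest.hex()}"


def demo_verify_password(plain_password: str, stored: str) -> bool:
    """Re-hash the candidate with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", plain_password.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))
```

Because the salt is random, hashing the same password twice gives two different stored strings, yet both verify correctly. Passlib's CryptContext plays the same two roles in this article through its hash and verify methods.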
Now, create passlib’s CryptContext instance and the OAuth2 scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
When we create an instance of the OAuth2PasswordBearer class, we pass in the tokenUrl parameter. This parameter contains the URL that the client (the frontend running in the user's browser) will use to send the username and password in order to get a token.
Here, tokenUrl is a relative URL.
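Once the client has a token, it sends it back in the Authorization header, and the scheme pulls it out for us. The function below is a rough stand-alone sketch of that extraction step, written for illustration; extract_bearer_token is not a FastAPI function, and the real class also takes care of the error response (by default a 401) when the header is missing or malformed.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' value, else None."""
    if not authorization:
        return None
    # Split "Bearer abc.def.ghi" into the scheme and the credentials
    scheme, _, param = authorization.partition(" ")
    if scheme.lower() != "bearer" or not param.strip():
        return None
    return param.strip()
```

For example, a header value of "Bearer abc.def.ghi" yields "abc.def.ghi", while a "Basic ..." value yields None.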
Now, create the login endpoint. The client will call this endpoint to authenticate.
app = FastAPI()  # the application instance the route decorators attach to

@app.post("/login", response_model=Token)
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
    """Login method to authenticate"""
    user = authenticate_user(test_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

def authenticate_user(db, username: str, password: str):
    # Returns the user on success, False on an unknown user or wrong password
    user = get_user(db, username)
    if not user or not verify_password(password, user.hashed_password):
        return False
    return user

def get_user(db, username: str):
    # Returns None implicitly when the username is not in the db
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def create_access_token(data: dict, expires_delta: timedelta):
    to_encode = data.copy()
    expire = datetime.utcnow() + expires_delta
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
Here, we are using OAuth2PasswordRequestForm, which is a class dependency provided by FastAPI. It has a form body with a username, password, and some other fields that we don’t need for this article.
Let’s go through the rest of the code.
After verifying the credentials, the login endpoint creates a JWT containing the username and expiry, and returns it to the user.
The create_access_token function creates an encoded JWT from SECRET_KEY, ALGORITHM, and the to_encode data, which holds the username and expiry.
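Inside the token, the expiry ends up as the exp claim, a number of seconds since the Unix epoch. The sketch below shows that arithmetic on its own, with the current time passed in explicitly so the result is easy to check. build_claims and is_expired are illustrative helpers, not part of python-jose, which performs the conversion and the expiry check itself during encode and decode.

```python
from datetime import datetime, timedelta, timezone


def build_claims(username: str, expires_delta: timedelta, now: datetime) -> dict:
    """Return the claims for a token: the subject and an integer expiry timestamp."""
    expire = now + expires_delta
    return {"sub": username, "exp": int(expire.timestamp())}


def is_expired(claims: dict, now: datetime) -> bool:
    """A token is expired once the current time reaches its exp claim."""
    return now.timestamp() >= claims["exp"]


issued_at = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
claims = build_claims("testuser", timedelta(minutes=30), issued_at)
```

With TOKEN_EXPIRE_MINUTES set to 30, the exp value sits 1800 seconds after the issue time, so the token is still valid 29 minutes in and rejected after 31.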
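Let’s try to fetch the current user after logging in. The endpoint depends on get_current_user, which decodes the bearer token and looks up the user; its definition is included in the full code below.
@app.get("/user/current/", response_model=User)
async def current_user_profile(current_user: Annotated[User, Depends(get_current_user)]):
    """Fetch current user"""
    return current_user
This endpoint returns the current user’s data.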
It all looks good so far, but we need a way to test it.
Run this in the terminal (assuming the code is saved as main.py):
$ uvicorn main:app --reload
And head to http://127.0.0.1:8000/docs
You should see the interactive API docs that FastAPI generates.
Click on Authorize, and a pop-up will open.
Enter testuser as the username and password as the password, then click Authorize. This authorizes the user. After this, try the /user/current/ endpoint to fetch the current user data. Click on Try it Out to run it. You can open the Chrome dev tools and check that it only sent the bearer token this time.
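The same flow can also be exercised from a script instead of the docs page. Here is a minimal sketch using only the standard library; the function names are made up for this example, and it assumes the server is running locally on port 8000 as started above. The two build_* helpers only construct the requests, while fetch_current_user actually sends them.

```python
import json
from urllib import parse, request

BASE_URL = "http://127.0.0.1:8000"  # assumes the local uvicorn server from above


def build_login_request(username: str, password: str, base_url: str = BASE_URL) -> request.Request:
    """POST the credentials as form data, which is what the login endpoint expects."""
    form = parse.urlencode({"username": username, "password": password}).encode("ascii")
    return request.Request(
        f"{base_url}/login",
        data=form,
        method="POST",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )


def build_profile_request(token: str, base_url: str = BASE_URL) -> request.Request:
    """GET the current user, sending only the bearer token."""
    return request.Request(
        f"{base_url}/user/current/",
        headers={"Authorization": f"Bearer {token}"},
    )


def fetch_current_user(username: str, password: str, base_url: str = BASE_URL) -> dict:
    """Log in, then call the protected endpoint with the returned token."""
    with request.urlopen(build_login_request(username, password, base_url)) as resp:
        token = json.load(resp)["access_token"]
    with request.urlopen(build_profile_request(token, base_url)) as resp:
        return json.load(resp)
```

With the server running, fetch_current_user("testuser", "password") should return the same user data that the docs page shows.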
If you have made it this far, congratulations!
Here’s the full code below:
from datetime import datetime, timedelta
from typing import Union
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from typing_extensions import Annotated
# openssl rand -hex 32
SECRET_KEY = "380e91edcddfe3c0733585523c7bf4b80c9e6d720787142ed9da5e6c37a29185"
ALGORITHM = "HS256"
TOKEN_EXPIRE_MINUTES = 30
test_db = {
    "testuser": {
        "username": "testuser",
        "full_name": "Test User",
        "email": "[email protected]",
        "hashed_password": "$2b$12$PQO42dMponRoPGWyt5co5OiZCQJcpKDFxet8n5MnkUHNNHAg/mioS",
    }
}
class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: Union[str, None] = None

class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None

class UserInDB(User):
    hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
app = FastAPI()
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def authenticate_user(db, username: str, password: str):
    user = get_user(db, username)
    if not user or not verify_password(password, user.hashed_password):
        return False
    return user

def create_access_token(data: dict, expires_delta: timedelta):
    to_encode = data.copy()
    expire = datetime.utcnow() + expires_delta
    to_encode.update({"exp": expire})
    # use jwt to create a token
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
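async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    # Raise (not return) a 401 so invalid tokens never reach the endpoint
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(test_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user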
@app.post("/login", response_model=Token)
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
    """Login method to authenticate"""
    user = authenticate_user(test_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/user/current/", response_model=User)
async def current_user_profile(current_user: Annotated[User, Depends(get_current_user)]):
    """Fetch current user"""
    return current_user
Feel free to reach out in comments for any questions or suggestions.
Thank you!
[1] FastAPI documentation — FastAPI Security
https://fastapi.tiangolo.com/tutorial/security/