
Explore How to Effectively Use JWT With FastAPI

by Ankit Anchlia, August 29th, 2023

Too Long; Didn't Read

This article shows how to implement JWT authentication with FastAPI, with code snippets along the way.

FastAPI is a modern Python web framework for building APIs. It is fast, easy to use, and enables the creation of robust and scalable APIs.


Some of its features are:


  • Uses Starlette and Pydantic under the hood for blazing-fast performance.

  • Minimizes boilerplate with a simple declaration of routes and inputs.

  • Great editor support and easy-to-remember syntax.

  • It automatically generates OpenAPI schemas and docs.


If you want to learn more, check out the FastAPI docs. In this article, we will talk about how FastAPI endpoints can be secured using JWT.



JWT (JSON Web Token) is a compact, signed token format that is widely used to authenticate API requests. Some of its features are:


  • JSON-encoded access tokens that are cryptographically signed.

  • Contains claims like issuer, expiry, subject, etc.

  • Verifiable as they are signed using a secret key.

  • Useful for securely transmitting information between parties.


If you want to learn more about JWTs and see how they work, check out jwt.io.
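To make the claims concrete, here is a minimal standard-library sketch that splits a token into its three dot-separated parts and decodes the header and payload. The token is built inline from made-up values with a placeholder signature, and `b64url_encode` and `b64url_decode` are hypothetical helper names. It only reads the token; it does not verify it.

```python
import base64
import json


def b64url_encode(raw: bytes) -> str:
    """Base64url-encode without padding, the way JWT segments are written."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    """Restore the stripped padding, then base64url-decode."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


# Build an illustrative token from made-up values (the signature is a placeholder).
header = {"alg": "HS256", "typ": "JWT"}
payload = {"sub": "testuser", "exp": 1700000000}
token = ".".join([
    b64url_encode(json.dumps(header).encode()),
    b64url_encode(json.dumps(payload).encode()),
    "placeholder-signature",
])

# Anyone holding the token can read the header and the claims.
header_part, payload_part, _signature = token.split(".")
decoded_header = json.loads(b64url_decode(header_part))
decoded_payload = json.loads(b64url_decode(payload_part))
print(decoded_header)
print(decoded_payload)
```

Because the payload is only encoded and not encrypted, it should not carry secrets such as passwords.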


After covering these basics, let’s get started with the coding part.


Install fastapi, uvicorn, python-jose, passlib, and python-multipart (FastAPI needs the last one to parse the login form).


$ pip install fastapi uvicorn "python-jose[cryptography]" "passlib[bcrypt]" python-multipart


Uvicorn is an ASGI web server for Python. It is the recommended server for FastAPI.


python-jose generates and verifies the JWTs. You can also use PyJWT.


Passlib handles password hashes.
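To show the idea behind storing password hashes instead of passwords, here is a rough standard-library sketch. It uses PBKDF2 from `hashlib` as a stand-in for the bcrypt scheme that passlib provides in this article; the function names, the storage format, and the iteration count are assumptions made for illustration only.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # illustrative work factor, not a recommendation


def hash_password_demo(password: str) -> str:
    """Derive a salted hash and store the salt next to it."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password_demo(password: str, stored: str) -> bool:
    """Re-derive the hash with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))


stored = hash_password_demo("password")
print(verify_password_demo("password", stored))
print(verify_password_demo("wrong", stored))
```

The random salt means that hashing the same password twice gives two different stored values, which is also why the check has to go through a verify function rather than a string comparison.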


Import necessary packages:


from datetime import datetime, timedelta
from typing import Union
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from typing_extensions import Annotated


Define a test database and the secret:


test_db = {
    "testuser": {
        "username": "testuser",
        "full_name": "Test User",
        "email": "[email protected]",
        "hashed_password": "$2b$12$PQO42dMponRoPGWyt5co5OiZCQJcpKDFxet8n5MnkUHNNHAg/mioS",
    }
}

# openssl rand -hex 32
SECRET_KEY = "380e91edcddfe3c0733585523c7bf4b80c9e6d720787142ed9da5e6c37a29185"
ALGORITHM = "HS256"
TOKEN_EXPIRE_MINUTES = 30  


This SECRET_KEY is used to sign the JWT. Don't use the key shown here; generate your own by running this in the terminal:


$ openssl rand -hex 32
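If openssl is not at hand, a comparable key can be produced from Python's standard library. The sketch below also reads the key from an environment variable so it does not have to live in the source file; the variable name `JWT_SECRET_KEY` is an assumption for this example, not something FastAPI or python-jose looks for.

```python
import os
import secrets

# 32 random bytes rendered as 64 hexadecimal characters.
generated_key = secrets.token_hex(32)

# JWT_SECRET_KEY is a hypothetical variable name chosen for this sketch.
# Falling back to a freshly generated key is only suitable for local
# experiments: tokens signed with it stop validating after a restart.
SECRET_KEY = os.environ.get("JWT_SECRET_KEY", generated_key)

print(len(generated_key))
```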


The signing algorithm is HS256 (HMAC with SHA-256).
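For readers curious what HS256 signing involves, here is a minimal standard-library sketch. It is not how python-jose is implemented; `sign_hs256` and `signature_is_valid` are hypothetical names, and the secret is treated as UTF-8 bytes, which is an assumption of this sketch.

```python
import base64
import hashlib
import hmac
import json


def b64url(raw: bytes) -> str:
    """Base64url-encode without padding."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def sign_hs256(payload: dict, secret: str) -> str:
    """Build header.payload and append an HMAC-SHA256 signature over it."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join([
        b64url(json.dumps(header, separators=(",", ":")).encode()),
        b64url(json.dumps(payload, separators=(",", ":")).encode()),
    ])
    mac = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256)
    return signing_input + "." + b64url(mac.digest())


def signature_is_valid(token: str, secret: str) -> bool:
    """Recompute the signature and compare it in constant time."""
    signing_input, _, signature = token.rpartition(".")
    mac = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256)
    return hmac.compare_digest(b64url(mac.digest()).encode(), signature.encode())


token = sign_hs256({"sub": "testuser"}, "demo-secret")
print(signature_is_valid(token, "demo-secret"))
print(signature_is_valid(token, "another-secret"))
```

HS256 is symmetric: the same secret both signs and verifies, so every service that checks tokens must hold the key.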


Create models:


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Union[str, None] = None


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None


class UserInDB(User):
    hashed_password: str


Before we move ahead, let’s walk through the password workflow in layman’s terms.


  • A user will enter a username and password and click login.


  • The client will make an API call with that username and password, and the backend will verify that the username exists and that the password matches the one stored in the database.


  • The backend will generate a token (a JWT in this article) with an expiry time and return it to the client.


  • After login, the user will use that token in the Authorization header to make subsequent API calls.
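From the client's point of view, the workflow above comes down to two kinds of HTTP request. The sketch below only builds them with the standard library and does not send anything; the base URL is an assumed local development address, the paths are the ones defined later in this article, and the token is a placeholder string.

```python
from urllib.parse import urlencode
from urllib.request import Request

BASE_URL = "http://127.0.0.1:8000"  # assumed local development address

# Step 1: the login call sends the credentials as a form-encoded body.
login_request = Request(
    f"{BASE_URL}/login",
    data=urlencode({"username": "testuser", "password": "password"}).encode(),
    method="POST",
)

# Step 2: later calls carry only the token, in the Authorization header.
access_token = "<token returned by the login call>"  # placeholder value
profile_request = Request(
    f"{BASE_URL}/user/current/",
    headers={"Authorization": f"Bearer {access_token}"},
)

print(login_request.get_method(), login_request.full_url)
print(profile_request.get_header("Authorization"))
```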


Now, create passlib’s CryptContext instance and the OAuth2 scheme.


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")


When we create an instance of the OAuth2PasswordBearer class we pass in the tokenUrl parameter. This parameter contains the URL that the client (the frontend running in the user's browser) will use to send the username and password in order to get a token.


Source: FastAPI Security [1]


Here, tokenUrl is a relative URL, so "login" refers to the /login endpoint of this application.
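When the scheme is used as a dependency, it reads the Authorization header of the incoming request and passes the token string on. The function below is a rough, library-free illustration of that step; `extract_bearer_token` is a hypothetical name and this is not FastAPI's own code.

```python
from typing import Optional


def extract_bearer_token(authorization_header: Optional[str]) -> Optional[str]:
    """Return the token from a 'Bearer <token>' header, or None if absent."""
    if not authorization_header:
        return None
    scheme, _, token = authorization_header.partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        return None
    return token.strip()


print(extract_bearer_token("Bearer abc.def.ghi"))
print(extract_bearer_token("Basic dXNlcjpwYXNz"))
```

In FastAPI, a request without a usable bearer token is answered with a 401 response before your endpoint code runs; this sketch returns None at that point instead.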


Now, create the app instance and the login endpoint. The client will call this endpoint to authenticate.


app = FastAPI()


@app.post("/login", response_model=Token)
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
    """Login method to authenticate"""
    
    user = authenticate_user(test_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )

    return {"access_token": access_token, "token_type": "bearer"}


def authenticate_user(db, username: str, password: str):
    user = get_user(db, username)
    if not user or not verify_password(password, user.hashed_password):
        return False
    return user


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def create_access_token(data: dict, expires_delta: timedelta):
    to_encode = data.copy()
    expire = datetime.utcnow() + expires_delta
    to_encode.update({"exp": expire})

    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


Here, we are using OAuth2PasswordRequestForm, which is a class dependency provided by FastAPI. It has a form body with a username, password, and some other fields that we don’t need for this article.
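To picture the form body, here is a small standard-library sketch that parses a form-encoded login request the way a server would receive it. The raw body is a made-up example, and `grant_type` and `scope` stand for the extra fields this article does not use.

```python
from urllib.parse import parse_qs

# Made-up example of an application/x-www-form-urlencoded login body.
raw_body = "grant_type=password&username=testuser&password=password&scope="

# keep_blank_values keeps the empty scope field instead of dropping it.
form = parse_qs(raw_body, keep_blank_values=True)

username = form["username"][0]
password = form["password"][0]
scopes = form["scope"][0].split()  # space-separated list, empty here

print(username, scopes)
```

Note that the credentials travel as form fields, not as JSON, which is why a JSON request body will not work against this endpoint.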


Let’s go through the rest of the code.

  • The client calls the login method, which looks up the username in the database and then uses passlib to check the submitted password against the stored hash.


  • After verifying the credentials, it creates a JWT containing the username and the expiry, which it returns to the client.


The create_access_token method copies the input data (the username, stored under the sub claim), adds the expiry as the exp claim, and signs the result with SECRET_KEY using ALGORITHM to produce the encoded JWT.
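The exp claim ends up in the token as a number of seconds since the Unix epoch. The sketch below shows that arithmetic on its own with the standard library; `build_claims` and `is_expired` are hypothetical helpers, and the fixed issue time is chosen only to keep the example reproducible.

```python
from datetime import datetime, timedelta, timezone

TOKEN_EXPIRE_MINUTES = 30


def build_claims(username: str, now: datetime) -> dict:
    """Return the claims that would be signed: subject and expiry."""
    expire = now + timedelta(minutes=TOKEN_EXPIRE_MINUTES)
    return {"sub": username, "exp": int(expire.timestamp())}


def is_expired(claims: dict, now: datetime) -> bool:
    """A token is expired once the current time reaches its exp value."""
    return int(now.timestamp()) >= claims["exp"]


issued_at = datetime(2023, 8, 29, 12, 0, tzinfo=timezone.utc)
claims = build_claims("testuser", issued_at)

print(claims)
print(is_expired(claims, issued_at + timedelta(minutes=29)))
print(is_expired(claims, issued_at + timedelta(minutes=31)))
```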


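Let’s try to fetch the current user after logging in. The endpoint depends on get_current_user (included in the full code at the end of this article), which decodes the token and looks up the matching user.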


@app.get("/user/current/", response_model=User)
async def current_user_profile(current_user: Annotated[User, Depends(get_current_user)]):
    """Fetch current user"""

    return current_user


This endpoint returns the current user's data.
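Resolving the current user from a token takes three checks: the signature must match, the token must not be expired, and the subject must be a known user. The sketch below walks through those checks with the standard library only; it is an illustration under assumed names (`issue_token`, `resolve_current_user`, `SECRET`, `USERS`), not the python-jose code used in this article.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional

SECRET = "demo-secret"  # placeholder key for this sketch
USERS = {"testuser": {"username": "testuser", "full_name": "Test User"}}


def _b64url(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(segment: str) -> bytes:
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def _sign(signing_input: str) -> str:
    mac = hmac.new(SECRET.encode(), signing_input.encode(), hashlib.sha256)
    return _b64url(mac.digest())


def issue_token(username: str, lifetime_seconds: int) -> str:
    """Create a signed token with sub and exp claims."""
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    claims = {"sub": username, "exp": int(time.time()) + lifetime_seconds}
    signing_input = f"{header}.{_b64url(json.dumps(claims).encode())}"
    return f"{signing_input}.{_sign(signing_input)}"


def resolve_current_user(token: str) -> Optional[dict]:
    """Return the user for a valid token, or None if any check fails."""
    signing_input, _, signature = token.rpartition(".")
    # 1. Reject tokens whose signature does not match.
    if not hmac.compare_digest(_sign(signing_input).encode(), signature.encode()):
        return None
    try:
        claims = json.loads(_b64url_decode(signing_input.split(".")[1]))
    except (IndexError, ValueError):
        return None
    # 2. Reject expired tokens.
    if claims.get("exp", 0) <= time.time():
        return None
    # 3. Look up the user named in the sub claim.
    return USERS.get(claims.get("sub"))


print(resolve_current_user(issue_token("testuser", 60)))
```

In the FastAPI version, each failed check should end in a 401 response rather than a None return value.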


It all looks good so far, but we need a way to test it.


Run this in the terminal:

$ uvicorn main:app --reload


And head to http://127.0.0.1:8000/docs


You should see something like this:


Swagger UI


Click on Authorize, and a pop-up will open.


Authorization pop up


Enter testuser as the username and password as the password, then click Authorize to log the user in. After this, try the /user/current/ endpoint to fetch the current user's data: click Try it out and run it. If you open the Chrome dev tools, you can check that this request sent only the bearer token.


If you have made it this far, congratulations!


Here’s the full code:


from datetime import datetime, timedelta
from typing import Union
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from typing_extensions import Annotated

# openssl rand -hex 32
SECRET_KEY = "380e91edcddfe3c0733585523c7bf4b80c9e6d720787142ed9da5e6c37a29185"
ALGORITHM = "HS256"
TOKEN_EXPIRE_MINUTES = 30

test_db = {
    "testuser": {
        "username": "testuser",
        "full_name": "Test User",
        "email": "[email protected]",
        "hashed_password": "$2b$12$PQO42dMponRoPGWyt5co5OiZCQJcpKDFxet8n5MnkUHNNHAg/mioS",
    }
}

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: Union[str, None] = None

class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None

class UserInDB(User):
    hashed_password: str

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")

app = FastAPI()

def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def authenticate_user(db, username: str, password: str):
    user = get_user(db, username)
    if not user or not verify_password(password, user.hashed_password):
        return False
    return user

def create_access_token(data: dict, expires_delta: timedelta):
    to_encode = data.copy()
    expire = datetime.utcnow() + expires_delta
    to_encode.update({"exp": expire})

    # use jwt to create a token   
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

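async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    # Raised whenever the token is invalid, expired, or names no known user
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(test_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user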

@app.post("/login", response_model=Token)
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
    """Login method to authenticate"""

    user = authenticate_user(test_db, form_data.username, form_data.password)

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )

    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/user/current/", response_model=User)
async def current_user_profile(current_user: Annotated[User, Depends(get_current_user)]):
    """Fetch current user"""
    
    return current_user


Feel free to reach out in the comments with any questions or suggestions.


Thank you!



[1] FastAPI documentation — FastAPI Security

https://fastapi.tiangolo.com/tutorial/security/