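Now let’s implement real security with JWT tokens and proper password hashing. The patterns here are the ones used in production, although the hardcoded key and in-memory user store shown below must be replaced before you deploy.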
What We’ll Build
JWT Tokens: Cryptographically signed tokens that can’t be forged
Password Hashing: Secure password storage with industry-standard algorithms
Token Expiration: Tokens that automatically expire after a set time
Secure Validation: Proper token validation and user authentication
Installing Dependencies
We’ll use two additional packages:
pip install "pyjwt[crypto]" "pwdlib[argon2]"
PyJWT: For creating and validating JWT tokens
pwdlib: Modern password hashing library (supports Argon2 and bcrypt)
Argon2 won the Password Hashing Competition in 2015 and is recommended for new applications.
Configuration and Setup
First, let’s set up our security configuration:
from datetime import datetime, timedelta, timezone

import jwt
from jwt.exceptions import InvalidTokenError
from pwdlib import PasswordHash
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

# Secret key for JWT signing (generate with: openssl rand -hex 32)
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
password_hash = PasswordHash.recommended()
Never commit your SECRET_KEY to version control! In production:
Store it in environment variables
Use a secrets management service
Generate a unique key for each environment
Generate a secure key with: openssl rand -hex 32
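As a minimal sketch of the advice above, the key can be read from the environment at startup instead of being hardcoded. The variable name JWT_SECRET_KEY and both helper names are assumptions made for this example, not part of the tutorial code.

```python
import os
import secrets


def load_secret_key(env_var: str = "JWT_SECRET_KEY") -> str:
    """Read the signing key from the environment and fail fast if it is missing.

    The variable name JWT_SECRET_KEY is an assumption made for this sketch.
    """
    key = os.environ.get(env_var)
    if not key:
        raise RuntimeError(f"{env_var} is not set; refusing to start without a signing key")
    return key


def generate_secret_key() -> str:
    """Python equivalent of `openssl rand -hex 32`: 32 random bytes as 64 hex characters."""
    return secrets.token_hex(32)
```

With this in place, SECRET_KEY = load_secret_key() would replace the hardcoded string, and each environment would export its own value.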
Password Hashing
Let’s create functions to hash and verify passwords:
password_hash = PasswordHash.recommended()

# Create a dummy hash for timing attack protection
DUMMY_HASH = password_hash.hash("dummypassword")


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return password_hash.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    return password_hash.hash(password)
DUMMY_HASH prevents timing attacks when a user doesn’t exist. Verifying the submitted password against this dummy hash takes about as long as verifying it against a real one, so the response time doesn’t reveal whether the username exists.
Database Models
class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "[email protected] ",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$...",
        "disabled": False,
    }
}
In production, replace fake_users_db with a real database using SQLAlchemy, MongoDB, or your preferred database.
JWT Token Creation
Now let’s create JWT tokens:
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        # Fall back to a 15-minute lifetime when none is given
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
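To make the exp claim concrete, here is a small sketch of the arithmetic behind it. PyJWT serializes the datetime stored under exp as a numeric Unix timestamp; the helper name expiry_timestamp and the fixed issue time are assumptions chosen for illustration.

```python
from datetime import datetime, timedelta, timezone


def expiry_timestamp(now: datetime, lifetime: timedelta) -> int:
    """Return the Unix timestamp (whole seconds) at which a token issued at `now` expires."""
    return int((now + lifetime).timestamp())


# A token issued at 05:10 UTC on 1 March 2023 with a 30-minute lifetime
issued_at = datetime(2023, 3, 1, 5, 10, 0, tzinfo=timezone.utc)
exp = expiry_timestamp(issued_at, timedelta(minutes=30))
print(exp)  # 1677649200, i.e. 05:40 UTC on the same day
```

Using a timezone-aware datetime matters here: a naive datetime would be interpreted in the server's local time zone and could shift the expiry by hours.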
JWT Token Structure
A JWT token contains three parts separated by dots: a header, a payload, and a signature.
Example decoded JWT payload:
{
  "sub": "johndoe",
  "exp": 1677649200
}
sub (subject): The username
exp (expiration): Unix timestamp when the token expires
The signature ensures the token hasn’t been tampered with
JWT tokens are encoded, not encrypted. Anyone can decode them and read the payload. Never put sensitive data (like passwords) in JWT tokens!
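The structure described above can be reproduced with the standard library alone. The following is an illustrative sketch, not a replacement for PyJWT: it builds an HS256 token by hand and then reads the header and payload back without the key. The helper names and the demo key are assumptions.

```python
import base64
import hashlib
import hmac
import json


def b64url_encode(raw: bytes) -> str:
    """Base64url without padding, as used for each JWT segment."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    """Restore the stripped padding, then decode."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def sign_hs256(payload: dict, key: str) -> str:
    """Return header.payload.signature, signed with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(
        key.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()
    return f"{signing_input}.{b64url_encode(signature)}"


token = sign_hs256({"sub": "johndoe", "exp": 1677649200}, "demo-key-not-for-production")
header_b64, payload_b64, signature_b64 = token.split(".")

# No key is needed to read the first two parts; only the signature depends on it.
print(json.loads(b64url_decode(header_b64)))
print(json.loads(b64url_decode(payload_b64)))
```

The decoded payload is readable by anyone who holds the token, which is why it should carry identifiers such as sub and never secrets.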
User Authentication
Create functions to get and authenticate users:
def get_user(db, username: str):
    # Returns None implicitly when the username is unknown
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        # Verify against a dummy hash to prevent timing attacks
        verify_password(password, DUMMY_HASH)
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user
Timing Attack Protection: We always run a password verification (against DUMMY_HASH when the user doesn’t exist) so attackers can’t identify valid usernames by measuring response times.
Get Current User (with JWT)
Now let’s implement proper token validation:
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # Decode and verify the JWT token
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except InvalidTokenError:
        raise credentials_exception
    # Get the user from the database
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: User = Depends(get_current_user)
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user
Extract the token: oauth2_scheme extracts the JWT from the Authorization header
Decode and verify: jwt.decode() verifies the signature and expiration, then decodes the payload
Extract username: Get the username from the sub (subject) claim
Get user from database: Look up the user in the database
Return user: Return the validated user object
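To show what the "decode and verify" step involves, here is a standard-library sketch of HS256 verification: check the algorithm, compare signatures in constant time, then check exp. It is a simplified illustration under stated assumptions (the helper names, the TokenRejected exception, and the demo key are all hypothetical), and it is not how PyJWT is implemented internally. Use jwt.decode() in real code.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


class TokenRejected(Exception):
    """Raised when a token fails any validation step."""


def b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def sign(signing_input: str, key: str) -> bytes:
    return hmac.new(key.encode("utf-8"), signing_input.encode("utf-8"), hashlib.sha256).digest()


def make_token(payload: dict, key: str) -> str:
    """Build an HS256 token so the sketch has something to verify."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{b64url_encode(sign(signing_input, key))}"


def verify_token(token: str, key: str, now: Optional[float] = None) -> dict:
    """Return the payload if the token is well formed, correctly signed, and unexpired."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        header = json.loads(b64url_decode(header_b64))
        payload = json.loads(b64url_decode(payload_b64))
        signature = b64url_decode(signature_b64)
    except ValueError as exc:
        raise TokenRejected("malformed token") from exc
    if not isinstance(header, dict) or not isinstance(payload, dict):
        raise TokenRejected("malformed token")
    # Only accept the algorithm we expect (the role of algorithms=[...] in jwt.decode)
    if header.get("alg") != "HS256":
        raise TokenRejected("unexpected algorithm")
    # Recompute the signature and compare in constant time
    expected = sign(f"{header_b64}.{payload_b64}", key)
    if not hmac.compare_digest(expected, signature):
        raise TokenRejected("bad signature")
    # Reject tokens whose expiry time has been reached
    current = time.time() if now is None else now
    if "exp" in payload and payload["exp"] <= current:
        raise TokenRejected("token expired")
    return payload


def is_valid(token: str, key: str, now: Optional[float] = None) -> bool:
    try:
        verify_token(token, key, now)
    except TokenRejected:
        return False
    return True


token = make_token({"sub": "johndoe", "exp": 1677649200}, "demo-key")
print(verify_token(token, "demo-key", now=1677649000)["sub"])  # johndoe
print(is_valid(token, "demo-key", now=1677649200))             # False: expired
print(is_valid(token, "other-key", now=1677649000))            # False: wrong key
```

Every failure maps to the same exception, mirroring how get_current_user turns any InvalidTokenError into a single 401 response without telling the caller which check failed.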
Login Endpoint
Finally, create the login endpoint that issues JWT tokens:
@app.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
    user = authenticate_user(
        fake_users_db,
        form_data.username,
        form_data.password
    )
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")
Protected Endpoints
Now you can protect any endpoint:
@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: User = Depends(get_current_active_user)
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: User = Depends(get_current_active_user)
):
    return [{"item_id": "Foo", "owner": current_user.username}]
Complete Working Example
from datetime import datetime, timedelta, timezone

import jwt
from jwt.exceptions import InvalidTokenError
from pwdlib import PasswordHash
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "[email protected] ",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


password_hash = PasswordHash.recommended()
DUMMY_HASH = password_hash.hash("dummypassword")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI()


def verify_password(plain_password, hashed_password):
    return password_hash.verify(plain_password, hashed_password)


def get_password_hash(password):
    return password_hash.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        verify_password(password, DUMMY_HASH)
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except InvalidTokenError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: User = Depends(get_current_user)
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: User = Depends(get_current_active_user)
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: User = Depends(get_current_active_user)
):
    return [{"item_id": "Foo", "owner": current_user.username}]
Testing the Authentication
Get a token
curl -X POST "http://localhost:8000/token" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "username=johndoe&password=secret"
Response:
{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "bearer"
}
Use the token
curl -X GET "http://localhost:8000/users/me/" \
  -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
Wait for expiration
After 30 minutes, the token expires and requests that use it return a 401 error.
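The same two requests can be issued from Python. This sketch uses only the standard library and assumes the app is running at http://localhost:8000, as in the curl commands; the helper names are hypothetical. Nothing is sent until main() is called.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "http://localhost:8000"  # assumed local development server


def build_token_request(username: str, password: str, base_url: str = BASE_URL) -> urllib.request.Request:
    """Form-encoded POST to /token, equivalent to the first curl command."""
    body = urllib.parse.urlencode({"username": username, "password": password}).encode("ascii")
    return urllib.request.Request(
        f"{base_url}/token",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def build_me_request(access_token: str, base_url: str = BASE_URL) -> urllib.request.Request:
    """GET /users/me/ with the token in the Authorization header."""
    return urllib.request.Request(
        f"{base_url}/users/me/",
        headers={"Authorization": f"Bearer {access_token}"},
        method="GET",
    )


def fetch_json(request: urllib.request.Request):
    """Send the request and parse the JSON body; raises HTTPError on 4xx/5xx responses."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def main() -> None:
    token = fetch_json(build_token_request("johndoe", "secret"))["access_token"]
    print(fetch_json(build_me_request(token)))
```

Once the token has expired, fetch_json raises urllib.error.HTTPError with code 401, which is the signal for the client to log in again.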
Security Best Practices
Secret Key
Generate with openssl rand -hex 32
Store in environment variables
Use different keys per environment
Token Expiration
Short expiration (15-30 minutes)
Implement refresh tokens for longer sessions
Force re-authentication for sensitive actions
Password Hashing
Use Argon2id or bcrypt
Never store plain text passwords
Use timing-safe comparisons
HTTPS Only
Always use HTTPS in production
Tokens sent over HTTP can be intercepted
Use secure cookies when applicable
Generating Password Hashes
To create a new user, hash their password:
from pwdlib import PasswordHash

password_hash = PasswordHash.recommended()
hashed = password_hash.hash("secret")
print(hashed)
# $argon2id$v=19$m=65536,t=3,p=4$...
The test user’s password is “secret”. The hash changes each time because a new random salt is generated, but every one of those hashes still verifies the same password.
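The effect of the random salt can be demonstrated without pwdlib. The sketch below swaps in PBKDF2 from the standard library purely as a stand-in for Argon2, so it illustrates the salting behaviour only and is not the algorithm the tutorial uses. The function names and the iteration count are assumptions.

```python
import hashlib
import hmac
import os
from typing import Optional, Tuple

ITERATIONS = 200_000  # illustrative work factor, not a tuned recommendation


def hash_with_salt(password: str, salt: Optional[bytes] = None) -> Tuple[bytes, bytes]:
    """Return (salt, digest); a fresh random salt is generated unless one is supplied."""
    salt = os.urandom(16) if salt is None else salt
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return salt, digest


def check_password(password: str, salt: bytes, expected_digest: bytes) -> bool:
    """Re-derive the digest with the stored salt and compare in constant time."""
    _, digest = hash_with_salt(password, salt)
    return hmac.compare_digest(digest, expected_digest)


salt_a, digest_a = hash_with_salt("secret")
salt_b, digest_b = hash_with_salt("secret")

print(digest_a != digest_b)                         # different salts give different digests
print(check_password("secret", salt_a, digest_a))   # each one still verifies "secret"
print(check_password("secret", salt_b, digest_b))
print(check_password("wrong", salt_a, digest_a))    # and rejects anything else
```

An Argon2 hash string carries its salt and parameters inside the encoded value, which is why pwdlib can verify a password from the stored string alone; this sketch keeps the salt as a separate value to make it visible.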
Next Steps
Now let’s add fine-grained permissions with OAuth2 scopes:
OAuth2 Scopes: Learn how to implement OAuth2 scopes for role-based access control