Authentication Router¶
The auth router will generate a set of endpoints for authenticating users.
- POST
/register
- POST
/login
- POST
/logout
- POST
/token
- POST
/token/refresh
- GET
/confirm
- POST
/confirm
- POST
/confirm/{token}
- POST
/{id}/change_username
Setup The Authentication service¶
To Setup the authentication service, you will need to add all requirements to
the object AuthService
.
from authx.services.auth import AuthService
from authx.backend import UsersRepo
from authx.core.jwt import JWTBackend
AuthService.setup(
repo = UsersRepo,
auth_backend = JWTBackend,
debug = True,
base_url = 'http://localhost:8000',
site = 'authx',
recaptcha_secret = None,
smtp_username = None,
smtp_password = None,
smtp_host = None,
smtp_tls = False,
display_name = 'authx',
)
This one gonna help use to use the authentication service, that we provide.
from authx import Authentication
from fastapi import FastAPI
app = FastAPI()
auth = Authentication()
app.include_router(auth.auth_router, prefix="/api/users")
Register¶
As we know we will use the POST
method to register a new user, also a callback
method.
This Route is based on:
@router.post("/register", name="auth:register")
async def register(*, request: Request, response: Response):
data = await request.json()
service = AuthService()
tokens = await service.register(data)
set_tokens_in_response(response, tokens)
return None
The Service.register
method will return the Access and Refresh tokens, and if
we have a validation error it will return the error 400
.
Login¶
Same logical way as register, we use the same Authentication route.
Info
app.include_router(auth.auth_router, prefix="/api/users")
include all
authentication routers.
@router.post("/login", name="auth:login")
async def login(*, request: Request, response: Response):
data = await request.json()
service = AuthService()
ip = request.client.host
tokens = await service.login(data, ip)
set_tokens_in_response(response, tokens)
return None
For the login
Service we provide some params:
data
: The data that we will use to login. (login, password).ip
: The IP of the client.
if the data
is not valid, we will return the error 400
, if the data
is
valid, we will return the tokens (access and refresh).
Info
There is also an HTTPException
relate to 404
if the user is not
found, also the 429
relate to brute force attempts.
Info
The HTTP 429
Too Many Requests response status code indicates the
user has sent too many requests in a given amount of time ("rate limiting"). A
Retry-After header might be included to this response indicating how long to
wait before making a new request.
Logout¶
As we know logout
always mean that we will remove the access_cookies_name
and refresh_cookies_name
from the response.
@router.post("/logout", name="auth:logout")
async def logout(*, response: Response):
response.delete_cookie(access_cookie_name)
response.delete_cookie(refresh_cookie_name)
return None
The response.delete_cookie
is a function in the starlette
a lightweight ASGI
framework/toolkit, which is ideal for building high performance asyncio
services.
Token¶
After login and all the steps we have a function relate to get a new token based
on user
a class that initialize the user object and use data
(login,
password) as an argument.
@router.post("/token", name="auth:token")
async def token(*, user: User = Depends(get_authenticated_user)):
return user.data
- User Data:
def __init__(self, data=None):
self.data = data
if data is None:
self.is_authenticated = False
self.is_admin = False
self.id = None
self.username = None
else:
self.is_authenticated = True
self.is_admin = "admin" in self.data.get("permissions")
self.id = int(self.data.get("id"))
self.username = self.data.get("username")
Refresh Token¶
We have also a way to get a new refresh token, this is the same as the token
method, but we will use the refresh_token
instead of access_token
, it take
request
& response
as arguments, use also starlette
to set the
refresh_cookie_name
in the response.
@router.post("/token/refresh", name="auth:refresh_access_token")
async def refresh_access_token(
*, request: Request, response: Response,
):
service = AuthService()
refresh_token = request.cookies.get(refresh_cookie_name)
if refresh_token is None:
raise HTTPException(401)
access_token = await service.refresh_access_token(refresh_token)
set_access_token_in_response(response, access_token)
return {"access": access_token}
- The
service.refresh_access_token
will return the new access token, or it will raise; 401
: if the refresh token is not valid. (Refresh or Ban).500
: if the refresh token is expired.
Info
set_access_token_in_response
take the response
and the token
as
arguments, also set :
py response.set_cookie( key=refresh_cookie_name, value=token, secure=not debug, httponly=True, max_age=refresh_expiration, )
Confirm¶
We have 3 steps for the Email confirmation:
ConfirmationStatus¶
For the get_email_confirmation_status
we will use the GET
method, and we
will return the status of the email confirmation.
@router.get("/confirm", name="auth:get_email_confirmation_status")
async def get_email_confirmation_status(
*, user: User = Depends(get_authenticated_user)
):
service = AuthService(user)
return await service.get_email_confirmation_status()
The service.get_email_confirmation_status()
gonna return the status of the
email confirmation. {"email": sample@sample.com, "confirmed": True}
async def get_email_confirmation_status(self) -> dict:
item = await self._repo.get(self._user.id)
return {"email": item.get("email"), "confirmed": item.get("confirmed")}
RequestConfirmation¶
for the request_email_confirmation
we will use the POST
method, and we will
return the status of the email confirmation, its take the user
as an arguments
or by default its depend on get_authenticated_user
.
@router.post("/confirm", name="auth:request_email_confirmation")
async def request_email_confirmation(
*, user: User = Depends(get_authenticated_user)):
service = AuthService(user)
return await service.request_email_confirmation()
the service.request_email_confirmation()
check the email if is confirmed or
not, for example if is not confirmed its return a link to confirm this email, if
confirmed raise an HTTPException 400
with the message
Email already confirmed
, also can show the timeout exception.
async def request_email_confirmation(self) -> None:
item = await self._repo.get(self._user.id)
if item.get("confirmed"):
raise HTTPException(400)
if not await self._repo.is_email_confirmation_available(self._user.id):
raise HTTPException(429)
email = item.get("email")
await self._request_email_confirmation(email)
return None
Confirmation¶
For the confirm_email
function we will use the POST
method, and it will
return a response with a token to verify, its take the token
as an arguments.
@router.post("/confirm/{token}", name="auth:confirm_email")
async def confirm_email(*, token: str):
service = AuthService()
return await service.confirm_email(token)
The service.confirm_email
hash the token, or looks up hash in db to update the
confirmed row to row (Default is false
), this could raise also 403
an
HTTPException that show if there is no hash in database.
async def confirm_email(self, token: str) -> None:
token_hash = hash_string(token)
if not await self._repo.confirm_email(token_hash):
raise HTTPException(403)
return None
Change Username¶
At the last point of authentication we have a function to change the username,
it take id
, username
, user
as arguments, and return a response with the
new username.
@router.post("/{id}/change_username", name="auth:change_username")
async def change_username(
*,
id: int,
username: str = Body("", embed=True),
user: User = Depends(get_authenticated_user),):
service = AuthService(user)
if user.id == id or user.is_admin:
return await service.change_username(id, username)
else:
raise HTTPException(403)
return router
The service.change_username
will return the new username, or it will raise; -
400
: Username already exists. - 404
: user not found.
async def change_username(self, id: int, username: str) -> None:
new_username = self._validate_user_model(
UserInChangeUsername, {"username": username}
).username
item = await self._repo.get(id)
old_username = item.get("username")
if old_username == new_username:
raise HTTPException(400, detail=get_error_message("username change same"))
existing_user = await self._repo.get_by_username(new_username)
if existing_user is not None:
raise HTTPException(400, detail=get_error_message("existing username"))
logger.info(
f"change_username id={id} old_username={old_username} new_username={new_username}"
)
await self._repo.change_username(id, new_username)
logger.info(f"change_username id={id} success")
return None