Coverage for /usr/lib/python3.10/site-packages/hyd/backend/user/authentication.py: 76%
45 statements
« prev ^ index » next coverage.py v7.0.3, created at 2023-02-06 00:31 +0000
« prev ^ index » next coverage.py v7.0.3, created at 2023-02-06 00:31 +0000
1import datetime as dt
3from fastapi import Depends, HTTPException, status
4from fastapi.security import SecurityScopes
5from sqlalchemy.orm import Session
7import hyd.backend.token.service as token_service
8from hyd.backend.db import get_db
9from hyd.backend.exc import (
10 HTTPException_NO_PERMISSION,
11 HTTPException_USER_DISABLED,
12 VerificationError,
13)
14from hyd.backend.security import JWT, OAUTH2_SCHEME, verify_jwt
15from hyd.backend.token.models import TokenEntry
16from hyd.backend.user.models import UserEntry
17from hyd.backend.util.const import HEADERS
18from hyd.backend.util.logger import HydLogger
# Logger shared by every authentication check in this module.
LOGGER = HydLogger("Authentication")
# Timezone applied when stamping a token's last-request time.
UTC = dt.timezone.utc
23####################################################################################################
24#### HTTP Exceptions
25####################################################################################################
# Single 401 response reused for every rejected token (unknown, expired, revoked
# or corrupted), so the response does not reveal which check failed.
HTTPException_VALIDATION = HTTPException(
    detail="Token unknown, expired, revoked or corrupted!",
    headers=HEADERS,
    status_code=status.HTTP_401_UNAUTHORIZED,
)
33####################################################################################################
34#### Authentication logic
35####################################################################################################
async def authenticate_user(
    security_scopes: SecurityScopes,
    token: str = Depends(OAUTH2_SCHEME),
    db: Session = Depends(get_db),
) -> UserEntry:
    """FastAPI dependency that returns the user owning the presented token.

    Delegates every check to ``_authenticate`` and discards the token entry
    it returns alongside the user.

    Args:
        security_scopes: Scopes required by the route being accessed.
        token: Raw token string supplied through ``OAUTH2_SCHEME``.
        db: Database session supplied through ``get_db``.

    Returns:
        The authenticated ``UserEntry``.

    Raises:
        Whatever ``_authenticate`` raises when a check fails.
    """
    authenticated = _authenticate(security_scopes, token, db)
    # First element is the user entry, second is the token entry.
    return authenticated[0]
def _authenticate(
    security_scopes: SecurityScopes, token: str, db: Session
) -> tuple[UserEntry, TokenEntry]:
    """Validate a raw token and return the user and token entries behind it.

    The checks run in this order, and the first failing one aborts the call:

    1. The token must pass ``verify_jwt``.
    2. A token entry with the JWT's id must exist in the database.
    3. The owning user must not be disabled.
    4. The token must be neither revoked nor expired.
    5. Every scope in ``security_scopes.scopes`` must be granted to the token.

    On success the user entry is annotated with the token entry and its
    permitted scopes, and the token entry's last-request time is set to now
    (UTC).

    Args:
        security_scopes: Scopes required by the route being accessed.
        token: Raw token string taken from the request.
        db: Database session used to look up the token entry.

    Returns:
        A ``(user_entry, token_entry)`` tuple for the authenticated request.

    Raises:
        HTTPException_VALIDATION: The token failed verification, is not in
            the database, or is revoked or expired.
        HTTPException_USER_DISABLED: The token's user is disabled.
        HTTPException_NO_PERMISSION: The token lacks a required scope.
    """
    try:
        jwt: JWT = verify_jwt(token=token)
    except VerificationError as err:
        LOGGER.warning(
            "Token verification failed! {token: %s, error: %s}",
            token,
            err,
        )
        raise HTTPException_VALIDATION from err

    token_entry: TokenEntry | None = token_service.read_token(token_id=jwt.id, db=db)
    if token_entry is None:
        # No exception is in flight here, so ``err`` from the handler above is
        # unbound on this path; log the verified JWT id instead.
        LOGGER.warning(
            "Verified token not found in database! {token: %s, token_id: %s}",
            token,
            jwt.id,
        )
        raise HTTPException_VALIDATION

    permitted_scopes: list[str] = [entry.scope for entry in token_entry.scope_entries]
    user_entry: UserEntry = token_entry.user_entry

    # Reject disabled users explicitly. NOTE(review): the original comment
    # suggests only login tokens are expired when a user gets disabled, so
    # other tokens could otherwise still pass the checks below -- confirm.
    if user_entry.is_disabled:
        LOGGER.warning(
            "Token belongs to deactivated user! {token_id: %d, user_id: %d, username: %s}",
            token_entry.id,
            user_entry.id,
            user_entry.username,
        )
        raise HTTPException_USER_DISABLED

    # Applies to both login and API tokens: revoked or expired tokens are refused.
    if token_entry.revoked_at or token_entry.check_expiration(db=db):
        LOGGER.warning(
            "Revoked or expired token used! {token_id: %d, user_id: %d, username: %s}",
            token_entry.id,
            user_entry.id,
            user_entry.username,
        )
        raise HTTPException_VALIDATION

    # Permission handling: every scope the route requires must be granted.
    if any(scope not in permitted_scopes for scope in security_scopes.scopes):
        LOGGER.warning(
            "Token lacks scopes! {token_id: %d, user_id: %d, username: %s}",
            token_entry.id,
            user_entry.id,
            user_entry.username,
        )
        raise HTTPException_NO_PERMISSION

    # Attach per-request session information to the entries for later use.
    user_entry._session_token_entry = token_entry
    user_entry._session_permitted_scopes = permitted_scopes
    token_entry._last_request = dt.datetime.now(tz=UTC)

    LOGGER.info(
        "{token_id: %d, user_id: %d, username: %s, scopes: %s}",
        jwt.id,
        user_entry.id,
        user_entry.username,
        permitted_scopes,
    )
    return user_entry, token_entry