Coverage for /usr/lib/python3.10/site-packages/hyd/backend/user/authentication.py: 76%

45 statements  

« prev     ^ index     » next       coverage.py v7.0.3, created at 2023-02-05 02:26 +0000

1import datetime as dt 

2 

3from fastapi import Depends, HTTPException, status 

4from fastapi.security import SecurityScopes 

5from sqlalchemy.orm import Session 

6 

7import hyd.backend.token.service as token_service 

8from hyd.backend.db import get_db 

9from hyd.backend.exc import ( 

10 HTTPException_NO_PERMISSION, 

11 HTTPException_USER_DISABLED, 

12 VerificationError, 

13) 

14from hyd.backend.security import JWT, OAUTH2_SCHEME, verify_jwt 

15from hyd.backend.token.models import TokenEntry 

16from hyd.backend.user.models import UserEntry 

17from hyd.backend.util.const import HEADERS 

18from hyd.backend.util.logger import HydLogger 

19 

20UTC = dt.timezone.utc 

21LOGGER = HydLogger("Authentication") 

22 

23#################################################################################################### 

24#### HTTP Exceptions 

25#################################################################################################### 

26 

27HTTPException_VALIDATION = HTTPException( 

28 status_code=status.HTTP_401_UNAUTHORIZED, 

29 detail="Token unknown, expired, revoked or corrupted!", 

30 headers=HEADERS, 

31) 

32 

33#################################################################################################### 

34#### Authentication logic 

35#################################################################################################### 

36 

37 

38async def authenticate_user( 

39 security_scopes: SecurityScopes, 

40 token: str = Depends(OAUTH2_SCHEME), 

41 db: Session = Depends(get_db), 

42) -> UserEntry: 

43 user_entry, _token_entry = _authenticate(security_scopes, token, db) 

44 return user_entry 

45 

46 

47def _authenticate( 

48 security_scopes: SecurityScopes, token: str, db: Session 

49) -> tuple[UserEntry, TokenEntry]: 

50 try: 

51 jwt: JWT = verify_jwt(token=token) 

52 except VerificationError as err: 

53 LOGGER.warning( 

54 "Token verification failed! {token: %s, error: %s}", 

55 token, 

56 err, 

57 ) 

58 raise HTTPException_VALIDATION 

59 

60 token_entry: TokenEntry = token_service.read_token(token_id=jwt.id, db=db) # TODO raise if None 

61 if token_entry is None: 

62 LOGGER.warning( 

63 "Verified token not found in database! {token: %s, error: %s}", 

64 token, 

65 err, 

66 ) 

67 raise HTTPException_VALIDATION 

68 

69 permitted_scopes: list[str] = [entry.scope for entry in token_entry.scope_entries] 

70 user_entry: UserEntry = token_entry.user_entry 

71 

72 # check if an user is disabled, because one login tokens expire while disabling an user 

73 if user_entry.is_disabled: 

74 LOGGER.warning( 

75 "Token belongs to deactivated user! {token_id: %d, user_id: %d, username: %s}", 

76 token_entry.id, 

77 user_entry.id, 

78 user_entry.username, 

79 ) 

80 raise HTTPException_USER_DISABLED 

81 

82 # check if a token is expired, login and api tokens 

83 if token_entry.revoked_at or token_entry.check_expiration(db=db): 

84 LOGGER.warning( 

85 "Revoked or expired token used! {token_id: %d, user_id: %d, username: %s}", 

86 token_entry.id, 

87 user_entry.id, 

88 user_entry.username, 

89 ) 

90 raise HTTPException_VALIDATION 

91 

92 # check scopes for permission handling 

93 for scope in security_scopes.scopes: 

94 if scope not in permitted_scopes: 

95 LOGGER.warning( 

96 "Token lacks scopes! {token_id: %d, user_id: %d, username: %s}", 

97 token_entry.id, 

98 user_entry.id, 

99 user_entry.username, 

100 ) 

101 raise HTTPException_NO_PERMISSION 

102 

103 user_entry._session_token_entry = token_entry 

104 user_entry._session_permitted_scopes = permitted_scopes 

105 token_entry._last_request = dt.datetime.now(tz=UTC) 

106 

107 LOGGER.info( 

108 "{token_id: %d, user_id: %d, username: %s, scopes: %s}", 

109 jwt.id, 

110 user_entry.id, 

111 user_entry.username, 

112 permitted_scopes, 

113 ) 

114 return user_entry, token_entry