Coverage for /usr/lib/python3.10/site-packages/hyd/backend/security.py: 85%

47 statements  

« prev     ^ index     » next       coverage.py v7.0.3, created at 2023-01-05 15:47 +0000

1from fastapi.security import OAuth2PasswordBearer 

2from jose import JWTError 

3from jose.jwt import decode, encode 

4from passlib.context import CryptContext 

5from pydantic import BaseModel, ValidationError 

6 

7from hyd.backend.exc import HydError, VerificationError 

8from hyd.backend.util.const import ROOT_PATH, SECRET_KEY 

9from hyd.backend.util.models import NameStr, PrimaryKey 

10 

# Refuse to start with a weak signing key. Background on token sizes:
# https://docs.python.org/3/library/secrets.html#how-many-bytes-should-tokens-use
# NOTE(review): the check is on len() >= 64 while the message speaks of
# 32 bytes; the two agree only if SECRET_KEY is hex-encoded - confirm.
if not len(SECRET_KEY) >= 64:
    raise HydError("SECRET_KEY is too short, use 32 bytes or more!")

# Signing algorithm used for every JWT created and verified in this module.
ALGORITHM = "HS256"

# Shared passlib context used for password hashing and verification.
_pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)

18 

19 

class Scopes:
    """Namespace holding the names of the OAuth2 scopes used by this module."""

    # Account and token related scopes.
    USER = "user"
    TOKEN = "token"

    # Project content related scopes.
    PROJECT = "project"
    VERSION = "version"
    TAG = "tag"

26 

27 

# Every scope name defined on ``Scopes``, in declaration order.
SCOPES = [Scopes.USER, Scopes.TOKEN, Scopes.PROJECT, Scopes.VERSION, Scopes.TAG]

35 

# OAuth2 password-bearer scheme; the token URL is the login endpoint below
# ROOT_PATH and each scope is paired with a human-readable description.
OAUTH2_SCHEME = OAuth2PasswordBearer(
    scopes={
        Scopes.USER: "Basic user operations.",
        Scopes.TOKEN: "Manage tokens.",
        Scopes.PROJECT: "Create, delete and list projects.",
        Scopes.VERSION: "Manage versions for a project.",
        Scopes.TAG: "Manage tags for a project.",
    },
    tokenUrl=f"{ROOT_PATH}/api/v1/user/login",
)

46 

47 

class JWT(BaseModel):
    """Validated claims extracted from a JSON Web Token.

    Only ``id`` is not marked as debugging information; the remaining
    fields are carried along just for debugging.
    """

    id: PrimaryKey

    # The key/value pairs below exist just for debugging.
    user_id: PrimaryKey
    username: NameStr
    scopes: list[str]

53 

54 

def hash_password(*, password: str | bytes) -> bytes:
    """Hash *password* with the module's password context.

    The hash string produced by the context is returned encoded as bytes.
    """
    digest = _pwd_context.hash(password)
    return digest.encode()

57 

58 

def verify_password(
    *,
    plain_password: str | bytes,
    hashed_password: str | bytes,
) -> bool:
    """Return the password context's verdict on *plain_password* vs. *hashed_password*."""
    is_match = _pwd_context.verify(plain_password, hashed_password)
    return is_match

61 

62 

def create_jwt(
    *,
    token_id: PrimaryKey,
    user_id: PrimaryKey,
    username: NameStr,
    scopes: list[str],
) -> str:
    """Build and sign a JWT carrying the given token and user information.

    The ids are stored as strings under ``jti`` and ``uid``, the username
    under ``sub`` and the scope list under ``scopes``. The token is signed
    with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    claims = {
        "jti": str(token_id),
        "uid": str(user_id),
        "sub": username,
        "scopes": scopes,
    }
    return encode(claims, SECRET_KEY, algorithm=ALGORITHM)

71 

72 

def verify_jwt(*, token: str) -> JWT:
    """Decode *token*, check its signature and return its claims as a ``JWT``.

    Args:
        token: The encoded JWT string to verify.

    Returns:
        A ``JWT`` model built from the ``jti``, ``uid``, ``sub`` and
        ``scopes`` claims of the token.

    Raises:
        VerificationError: If decoding or the signature check fails, a
            required claim is missing, ``jti`` or ``uid`` cannot be converted
            to an integer, or the claims fail ``JWT`` model validation.
    """
    try:  # check signature
        payload = decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as err:
        raise VerificationError(err) from err

    try:  # check for missing or non-numeric fields
        token_id: PrimaryKey = int(payload["jti"])
        user_id: PrimaryKey = int(payload["uid"])
        username: NameStr = payload["sub"]
        scopes: list[str] = payload["scopes"]
    except (KeyError, TypeError, ValueError) as err:
        # int() raises ValueError/TypeError for claims that are not numeric;
        # report them like any other invalid token instead of letting them
        # escape as unrelated exceptions.
        raise VerificationError(err) from err

    try:  # check for data type validity
        return JWT(id=token_id, user_id=user_id, username=username, scopes=scopes)
    except ValidationError as err:
        raise VerificationError(err) from err