Coverage for /usr/lib/python3.10/site-packages/hyd/backend/security.py: 85%
47 statements
« prev ^ index » next coverage.py v7.0.3, created at 2023-01-05 15:47 +0000
« prev ^ index » next coverage.py v7.0.3, created at 2023-01-05 15:47 +0000
1from fastapi.security import OAuth2PasswordBearer
2from jose import JWTError
3from jose.jwt import decode, encode
4from passlib.context import CryptContext
5from pydantic import BaseModel, ValidationError
7from hyd.backend.exc import HydError, VerificationError
8from hyd.backend.util.const import ROOT_PATH, SECRET_KEY
9from hyd.backend.util.models import NameStr, PrimaryKey
# Refuse to import this module with a signing key that is too short, see
# https://docs.python.org/3/library/secrets.html#how-many-bytes-should-tokens-use
# NOTE(review): the check is on 64 characters while the message says 32 bytes;
# presumably SECRET_KEY is hex-encoded (2 characters per byte) -- confirm.
if not len(SECRET_KEY) >= 64:
    raise HydError("SECRET_KEY is too short, use 32 bytes or more!")

# Algorithm used when signing and verifying tokens with SECRET_KEY.
ALGORITHM = "HS256"

# Module-private passlib context shared by the password helpers.
_pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class Scopes:
    """Namespace holding the string names of the OAuth2 scopes."""

    # One constant per scope; the values are the strings stored in tokens.
    USER = "user"
    TOKEN = "token"
    PROJECT = "project"
    VERSION = "version"
    TAG = "tag"
# Every scope name defined on Scopes, in declaration order.
SCOPES = [Scopes.USER, Scopes.TOKEN, Scopes.PROJECT, Scopes.VERSION, Scopes.TAG]
# FastAPI OAuth2 password-bearer scheme: the token endpoint lives below
# ROOT_PATH, and each scope is mapped to its human-readable description.
OAUTH2_SCHEME = OAuth2PasswordBearer(
    scopes={
        Scopes.USER: "Basic user operations.",
        Scopes.TOKEN: "Manage tokens.",
        Scopes.PROJECT: "Create, delete and list projects.",
        Scopes.VERSION: "Manage versions for a project.",
        Scopes.TAG: "Manage tags for a project.",
    },
    tokenUrl=ROOT_PATH + "/api/v1/user/login",
)
# Validated view of the claims carried by a token (built by verify_jwt).
class JWT(BaseModel):
    # Identifier of the token itself (taken from the "jti" claim).
    id: PrimaryKey
    # The remaining key value pairs are carried just for debugging.
    user_id: PrimaryKey
    username: NameStr
    scopes: list[str]
def hash_password(*, password: str | bytes) -> bytes:
    """Hash *password* with the module's passlib context.

    Returns:
        The hash produced by the context, encoded to bytes.
    """
    hashed = _pwd_context.hash(password)
    return hashed.encode()
def verify_password(*, plain_password: str | bytes, hashed_password: str | bytes) -> bool:
    """Check *plain_password* against *hashed_password*.

    Returns:
        The result of the passlib context's ``verify`` call.
    """
    matches = _pwd_context.verify(plain_password, hashed_password)
    return matches
def create_jwt(
    *, token_id: PrimaryKey, user_id: PrimaryKey, username: NameStr, scopes: list[str]
) -> str:
    """Build a token signed with SECRET_KEY using ALGORITHM.

    The token and user ids are stored as strings under "jti" and "uid",
    the username under "sub", and the scopes list under "scopes".

    Returns:
        The encoded token.
    """
    claims = {
        "jti": str(token_id),
        "uid": str(user_id),
        "sub": username,
        "scopes": scopes,
    }
    return encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def verify_jwt(*, token: str) -> JWT:
    """Decode *token* and return its claims as a validated ``JWT`` model.

    The token is decoded with SECRET_KEY and ALGORITHM, the "jti", "uid",
    "sub" and "scopes" claims are extracted (ids converted to ``int``), and
    the result is validated by constructing a ``JWT``.

    Args:
        token: The encoded token string.

    Returns:
        The ``JWT`` model built from the token's claims.

    Raises:
        VerificationError: If decoding fails, a claim is missing or cannot
            be converted, or the model validation fails. The original
            exception is attached as ``__cause__``.
    """
    try:  # check signature
        payload = decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as err:
        raise VerificationError(err) from err

    try:  # check for missing or malformed fields
        token_id: PrimaryKey = int(payload["jti"])
        user_id: PrimaryKey = int(payload["uid"])
        username: NameStr = payload["sub"]
        scopes: list[str] = payload["scopes"]
    except (KeyError, TypeError, ValueError) as err:
        # int() raises ValueError for non-numeric strings and TypeError for
        # values such as None; both mean the token is not acceptable.
        raise VerificationError(err) from err

    try:  # check for data type validity
        return JWT(id=token_id, user_id=user_id, username=username, scopes=scopes)
    except ValidationError as err:
        raise VerificationError(err) from err