generated from MinBZK/python-project-template
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into 444-bug-assessments-in-systemcard-viewer-wit…
…hout-space
- Loading branch information
Showing
31 changed files
with
1,730 additions
and
268 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
from collections.abc import Callable | ||
from functools import wraps | ||
from typing import Any | ||
|
||
from fastapi import HTTPException, Request | ||
|
||
from amt.core.exceptions import AMTPermissionDenied | ||
|
||
|
||
def add_permissions(permissions: dict[str, list[str]]) -> Callable[[Callable[..., Any]], Callable[..., Any]]: | ||
def decorator(func: Callable[..., Any]) -> Callable[..., Any]: | ||
@wraps(func) | ||
async def wrapper(*args: Any, **kwargs: Any) -> Any: # noqa: ANN401 | ||
request = kwargs.get("request") | ||
organization_id = kwargs.get("organization_id") | ||
algoritme_id = kwargs.get("algoritme_id") | ||
if not isinstance(request, Request): # todo: change exception to custom exception | ||
raise HTTPException(status_code=400, detail="Request object is missing") | ||
|
||
for permission, verbs in permissions.items(): | ||
permission = permission.format(organization_id=organization_id) | ||
permission = permission.format(algoritme_id=algoritme_id) | ||
request_permissions: dict[str, list[str]] = ( | ||
request.state.permissions if hasattr(request.state, "permissions") else {} | ||
) | ||
if permission not in request_permissions: | ||
raise AMTPermissionDenied() | ||
for verb in verbs: | ||
if verb not in request.state.permissions[permission]: | ||
raise AMTPermissionDenied() | ||
|
||
return await func(*args, **kwargs) | ||
|
||
return wrapper | ||
|
||
return decorator |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
113 changes: 113 additions & 0 deletions
113
amt/migrations/versions/e16bb3d53cd6_authorization_system.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
"""authorization system | ||
Revision ID: e16bb3d53cd6 | ||
Revises: 5de977ad946f | ||
Create Date: 2024-12-23 08:32:15.194858 | ||
""" | ||
|
||
from collections.abc import Sequence | ||
|
||
import sqlalchemy as sa | ||
from alembic import op | ||
from amt.core.authorization import AuthorizationResource, AuthorizationVerb, AuthorizationType | ||
from sqlalchemy.orm.session import Session | ||
from amt.models import User, Organization | ||
|
||
# revision identifiers, used by Alembic. | ||
revision: str = "e16bb3d53cd6" | ||
down_revision: str | None = "5de977ad946f" | ||
branch_labels: str | Sequence[str] | None = None | ||
depends_on: str | Sequence[str] | None = None | ||
|
||
|
||
|
||
def upgrade() -> None: | ||
# ### commands auto generated by Alembic - please adjust! ### | ||
role_table = op.create_table( | ||
"role", | ||
sa.Column("id", sa.Integer(), nullable=False), | ||
sa.Column("name", sa.String(), nullable=False), | ||
sa.PrimaryKeyConstraint("id", name=op.f("pk_role")), | ||
) | ||
rule_table = op.create_table( | ||
"rule", | ||
sa.Column("id", sa.Integer(), nullable=False), | ||
sa.Column("resource", sa.String(), nullable=False), | ||
sa.Column("verbs", sa.JSON(), nullable=False), | ||
sa.Column("role_id", sa.Integer(), nullable=False), | ||
sa.ForeignKeyConstraint(["role_id"], ["role.id"], name=op.f("fk_rule_role_id_role")), | ||
sa.PrimaryKeyConstraint("id", name=op.f("pk_rule")), | ||
) | ||
|
||
authorization_table = op.create_table( | ||
"authorization", | ||
sa.Column("id", sa.Integer(), nullable=False), | ||
sa.Column("user_id", sa.UUID(), nullable=False), | ||
sa.Column("role_id", sa.Integer(), nullable=False), | ||
sa.Column("type", sa.String(), nullable=False), | ||
sa.Column("type_id", sa.Integer(), nullable=False), | ||
sa.ForeignKeyConstraint(["role_id"], ["role.id"], name=op.f("fk_authorization_role_id_role")), | ||
sa.ForeignKeyConstraint(["user_id"], ["user.id"], name=op.f("fk_authorization_user_id_user")), | ||
sa.PrimaryKeyConstraint("id", name=op.f("pk_authorization")), | ||
) | ||
|
||
op.bulk_insert( | ||
role_table, | ||
[ | ||
{'id': 1, 'name': 'Organization Maintainer'}, | ||
{'id': 2, 'name': 'Organization Member'}, | ||
{'id': 3, 'name': 'Organization Viewer'}, | ||
{'id': 4, 'name': 'Algorithm Maintainer'}, | ||
{'id': 5, 'name': 'Algorithm Member'}, | ||
{'id': 6, 'name': 'Algorithm Viewer'}, | ||
] | ||
) | ||
|
||
op.bulk_insert( | ||
rule_table, | ||
[ | ||
{'id': 1, 'resource': AuthorizationResource.ORGANIZATION_INFO, 'verbs': [AuthorizationVerb.CREATE, AuthorizationVerb.READ, AuthorizationVerb.UPDATE], 'role_id': 1}, | ||
{'id': 2, 'resource': AuthorizationResource.ORGANIZATION_ALGORITHM, 'verbs': [AuthorizationVerb.LIST, AuthorizationVerb.CREATE, AuthorizationVerb.UPDATE, AuthorizationVerb.DELETE], 'role_id': 1}, | ||
{'id': 3, 'resource': AuthorizationResource.ORGANIZATION_MEMBER, 'verbs': [AuthorizationVerb.LIST, AuthorizationVerb.CREATE, AuthorizationVerb.UPDATE, AuthorizationVerb.DELETE], 'role_id': 1}, | ||
{'id': 4, 'resource': AuthorizationResource.ORGANIZATION_INFO, 'verbs': [AuthorizationVerb.READ], 'role_id': 2}, | ||
{'id': 5, 'resource': AuthorizationResource.ORGANIZATION_ALGORITHM, 'verbs': [AuthorizationVerb.LIST, AuthorizationVerb.CREATE], 'role_id': 2}, | ||
{'id': 6, 'resource': AuthorizationResource.ORGANIZATION_MEMBER, 'verbs': [AuthorizationVerb.LIST], 'role_id': 2}, | ||
{'id': 7, 'resource': AuthorizationResource.ORGANIZATION_INFO, 'verbs': [AuthorizationVerb.READ], 'role_id': 3}, | ||
{'id': 8, 'resource': AuthorizationResource.ORGANIZATION_ALGORITHM, 'verbs': [AuthorizationVerb.LIST], 'role_id': 3}, | ||
{'id': 9, 'resource': AuthorizationResource.ORGANIZATION_MEMBER, 'verbs': [AuthorizationVerb.LIST], 'role_id': 3}, | ||
{'id': 10, 'resource': AuthorizationResource.ALGORITHM, 'verbs': [AuthorizationVerb.CREATE, AuthorizationVerb.READ, AuthorizationVerb.DELETE], 'role_id': 4}, | ||
{'id': 11, 'resource': AuthorizationResource.ALGORITHM_SYSTEMCARD, 'verbs': [AuthorizationVerb.READ, AuthorizationVerb.CREATE, AuthorizationVerb.UPDATE], 'role_id': 4}, | ||
{'id': 12, 'resource': AuthorizationResource.ALGORITHM_MEMBER, 'verbs': [AuthorizationVerb.CREATE, AuthorizationVerb.READ, AuthorizationVerb.UPDATE, AuthorizationVerb.DELETE], 'role_id': 4}, | ||
{'id': 13, 'resource': AuthorizationResource.ALGORITHM, 'verbs': [AuthorizationVerb.READ, AuthorizationVerb.CREATE], 'role_id': 5}, | ||
{'id': 14, 'resource': AuthorizationResource.ALGORITHM_SYSTEMCARD, 'verbs': [AuthorizationVerb.READ, AuthorizationVerb.CREATE, AuthorizationVerb.UPDATE], 'role_id': 5}, | ||
{'id': 15, 'resource': AuthorizationResource.ALGORITHM_MEMBER, 'verbs': [AuthorizationVerb.READ], 'role_id': 5}, | ||
{'id': 16, 'resource': AuthorizationResource.ALGORITHM, 'verbs': [AuthorizationVerb.READ], 'role_id': 6}, | ||
{'id': 17, 'resource': AuthorizationResource.ALGORITHM_SYSTEMCARD, 'verbs': [AuthorizationVerb.READ], 'role_id': 6}, | ||
{'id': 18, 'resource': AuthorizationResource.ALGORITHM_MEMBER, 'verbs': [AuthorizationVerb.READ], 'role_id': 6}, | ||
] | ||
) | ||
|
||
session = Session(bind=op.get_bind()) | ||
|
||
first_user = session.query(User).first() # first user is always present due to other migration | ||
organizations = session.query(Organization).all() | ||
|
||
authorizations = [] | ||
# lets add user 1 to all organizations bij default | ||
for organization in organizations: | ||
authorizations.append( | ||
{'user_id': first_user.id, 'role_id': 1, 'type': AuthorizationType.ORGANIZATION, 'type_id': organization.id}, | ||
) | ||
|
||
op.bulk_insert( | ||
authorization_table, | ||
authorizations | ||
) | ||
|
||
|
||
|
||
def downgrade() -> None: | ||
op.drop_table("rule") | ||
op.drop_table("authorization") | ||
op.drop_table("role") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,9 @@ | ||
from .algorithm import Algorithm | ||
from .authorization import Authorization | ||
from .organization import Organization | ||
from .role import Role | ||
from .rule import Rule | ||
from .task import Task | ||
from .user import User | ||
|
||
__all__ = ["Algorithm", "Organization", "Task", "User"] | ||
__all__ = ["Algorithm", "Authorization", "Organization", "Role", "Rule", "Task", "User"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
from sqlalchemy import ForeignKey | ||
from sqlalchemy.orm import Mapped, mapped_column, relationship | ||
|
||
from amt.models.base import Base | ||
|
||
|
||
class Authorization(Base): | ||
__tablename__ = "authorization" | ||
|
||
id: Mapped[int] = mapped_column(primary_key=True) | ||
user_id: Mapped[int] = mapped_column(ForeignKey("user.id")) | ||
user: Mapped["User"] = relationship(back_populates="authorizations") # pyright: ignore [reportUndefinedVariable, reportUnknownVariableType] #noqa | ||
role_id: Mapped[int] = mapped_column(ForeignKey("role.id")) | ||
type: Mapped[str] # type [Organization or Algorithm] | ||
type_id: Mapped[int] # ID of the organization or algorithm |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
from sqlalchemy import String | ||
from sqlalchemy.orm import Mapped, mapped_column, relationship | ||
|
||
from amt.models import Authorization | ||
from amt.models.base import Base | ||
from amt.models.rule import Rule | ||
|
||
|
||
class Role(Base): | ||
__tablename__ = "role" | ||
|
||
id: Mapped[int] = mapped_column(primary_key=True) | ||
name: Mapped[str] = mapped_column(String, nullable=False) | ||
rules: Mapped[list["Rule"]] = relationship() | ||
authorizations: Mapped[list["Authorization"]] = relationship() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
from sqlalchemy import ForeignKey, String | ||
from sqlalchemy.orm import Mapped, mapped_column | ||
from sqlalchemy.types import JSON | ||
|
||
from amt.models.base import Base | ||
|
||
|
||
class Rule(Base): | ||
__tablename__ = "rule" | ||
|
||
id: Mapped[int] = mapped_column(primary_key=True) | ||
resource: Mapped[str] = mapped_column(String, nullable=False) | ||
verbs: Mapped[list[str]] = mapped_column(JSON, default=list) | ||
role_id: Mapped[int] = mapped_column(ForeignKey("role.id")) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.