Skip to content

Commit

Permalink
Use topic_name and article_name as columns in WikiArticle and `…
Browse files Browse the repository at this point in the history
…WikiArticlePicture` (#172)

* Use `topic_name` and `article_name` as columns in `WikiArticle` and `WikiArticlePicture`

* Add a check for files extension

* Nits
  • Loading branch information
vaamb authored Feb 2, 2025
1 parent ee81e11 commit 9351e18
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 155 deletions.
3 changes: 3 additions & 0 deletions src/ouranos/core/config/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ def __setitem__(self, key, value):
MAX_TEXT_FILE_SIZE = 10 * 1024 * 1024
MAX_PICTURE_FILE_SIZE = 10 * 1024 * 1024

# File extensions
SUPPORTED_TEXT_EXTENSIONS = {"md", "txt"}
SUPPORTED_IMAGE_EXTENSIONS = {"gif", "jpeg", "jpg", "png", "svg", "webp"}

# Login
SESSION_FRESHNESS = 15 * 60 * 60
Expand Down
189 changes: 67 additions & 122 deletions src/ouranos/core/database/models/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@

from ouranos import current_app
from ouranos.core.config.consts import (
REGISTRATION_TOKEN_VALIDITY, TOKEN_SUBS)
REGISTRATION_TOKEN_VALIDITY, SUPPORTED_IMAGE_EXTENSIONS, TOKEN_SUBS)
from ouranos.core.database.models.abc import Base, CRUDMixin, ToDictMixin
from ouranos.core.database.models.caches import cache_users
from ouranos.core.database.models.types import PathType, UtcDateTime
from ouranos.core.database.utils import ArchiveLink
from ouranos.core.utils import humanize_list, slugify, Tokenizer
from ouranos.core.utils import check_filename, slugify, Tokenizer


argon2_hasher = PasswordHasher()

Expand Down Expand Up @@ -867,6 +868,27 @@ async def create_multiple(
await super().create_multiple(session, values=values)


class WikiTagged:
@classmethod
def _generate_get_query(
cls,
offset: int | None = None,
limit: int | None = None,
order_by: str | None = None,
**lookup_keys: list[lookup_keys_type] | lookup_keys_type | None,
) -> Select:
lookup_keys["status"] = True
tags_name: list[str] | None = lookup_keys.pop("tags_name", None)
stmt = super()._generate_get_query(offset, limit, order_by, **lookup_keys)
if tags_name:
stmt = stmt.join(WikiTag.articles)
if isinstance(tags_name, list):
stmt = stmt.where(WikiTag.name.in_(tags_name))
else:
stmt = stmt.where(WikiTag.name == tags_name)
return stmt


class WikiObject:
@staticmethod
def root_dir() -> ioPath:
Expand All @@ -881,7 +903,7 @@ def get_rel_path(cls, abs_path: ioPath) -> ioPath:
return abs_path.relative_to(cls.root_dir())


class WikiTopic(Base, CRUDMixin, WikiObject):
class WikiTopic(Base, WikiTagged, CRUDMixin, WikiObject):
__tablename__ = "wiki_topics"
__bind_key__ = "app"
__table_args__ = (
Expand All @@ -904,7 +926,7 @@ class WikiTopic(Base, CRUDMixin, WikiObject):
back_populates="topics", secondary=AssociationWikiTagTopic, lazy="selectin")

def __repr__(self) -> str:
return f"<WikiTopic({self.name})>"
return f"<WikiTopic({self.name}, path={self.path})>"

@property
def absolute_path(self) -> ioPath:
Expand Down Expand Up @@ -951,36 +973,6 @@ async def create(
topic = await cls.get(session, **lookup_keys)
await topic.attach_tags(session, tags_name)

@classmethod
def _generate_get_query(
cls,
offset: int | None = None,
limit: int | None = None,
order_by: str | None = None,
**lookup_keys: list[lookup_keys_type] | lookup_keys_type | None,
) -> Select:
lookup_keys["status"] = True
tags_name: list[str] | None = lookup_keys.pop("tags_name", None)
stmt = super()._generate_get_query(offset, limit, order_by, **lookup_keys)
if tags_name:
stmt = stmt.join(WikiTag.articles)
if isinstance(tags_name, list):
stmt = stmt.where(WikiTag.name.in_(tags_name))
else:
stmt = stmt.where(WikiTag.name == tags_name)
return stmt

@classmethod
async def get(
cls,
session: AsyncSession,
/,
**lookup_keys: lookup_keys_type,
) -> Self | None:
stmt = cls._generate_get_query(**lookup_keys)
result = await session.execute(stmt)
return result.scalars().one_or_none()

@classmethod
async def get_multiple(
cls,
Expand All @@ -991,11 +983,8 @@ async def get_multiple(
order_by: str | None = None,
**lookup_keys: list[lookup_keys_type] | lookup_keys_type | None,
) -> Sequence[Self]:
if limit is None:
limit = 50
stmt = cls._generate_get_query(offset, limit, order_by, **lookup_keys)
result = await session.execute(stmt)
return result.scalars().all()
return await super().get_multiple(
session, offset=offset, limit=limit, order_by=order_by, **lookup_keys)

@classmethod
async def update(
Expand Down Expand Up @@ -1034,19 +1023,19 @@ async def get_template(self) -> str:
return content


class WikiArticle(Base, CRUDMixin, WikiObject):
class WikiArticle(Base, WikiTagged, CRUDMixin, WikiObject):
__tablename__ = "wiki_articles"
__bind_key__ = "app"
__table_args__ = (
UniqueConstraint(
"name", "topic_id",
"topic_name", "name",
name="uq_wiki_articles_name"
),
)
_lookup_keys = ["topic_id", "name"] # "topic_name" is used
_lookup_keys = ["topic_name", "name"]

id: Mapped[int] = mapped_column(primary_key=True)
topic_id: Mapped[int] = mapped_column(sa.ForeignKey("wiki_topics.id"))
topic_name: Mapped[str] = mapped_column(sa.ForeignKey("wiki_topics.name"))
name: Mapped[str] = mapped_column(sa.String(length=64))
description: Mapped[Optional[str]] = mapped_column(sa.String(length=512))
path: Mapped[ioPath] = mapped_column(PathType(length=512))
Expand All @@ -1061,13 +1050,9 @@ class WikiArticle(Base, CRUDMixin, WikiObject):

def __repr__(self) -> str:
return (
f"<WikiArticle({self.topic}-{self.name}, path={self.path})>"
f"<WikiArticle({self.topic_name}-{self.name}, path={self.path})>"
)

@property
def topic_name(self) -> str: # Needed for response formatting
return self.topic.name

@property
def abs_path(self) -> ioPath:
return self.root_dir() / self.path
Expand Down Expand Up @@ -1144,14 +1129,13 @@ async def create(
**lookup_keys: lookup_keys_type, # Must contain "topic_name": str and "name": str
) -> None:
values = values or {}
topic_name = lookup_keys.pop("topic_name")
topic_name = lookup_keys["topic_name"]
name = lookup_keys["name"]
# Create the article dir
topic_obj = await WikiTopic.get(session, name=topic_name)
if topic_obj is None:
topic = await WikiTopic.get(session, name=topic_name)
if topic is None:
raise WikiArticleNotFound
lookup_keys["topic_id"] = topic_obj.id
article_dir = topic_obj.absolute_path / slugify(name)
article_dir = topic.absolute_path / slugify(name)
await article_dir.mkdir(parents=True, exist_ok=True)
rel_path = article_dir.relative_to(current_app.static_dir)
values["path"] = str(rel_path)
Expand Down Expand Up @@ -1180,32 +1164,6 @@ async def create(
# Save the article content
await article.set_content(content)

@classmethod
def _generate_get_query(
cls,
offset: int | None = None,
limit: int | None = None,
order_by: str | None = None,
**lookup_keys: list[lookup_keys_type] | lookup_keys_type | None,
) -> Select:
lookup_keys["status"] = True
topic_name: str | None = lookup_keys.pop("topic_name", None)
tags_name: list[str] | None = lookup_keys.pop("tags_name", None)
stmt = super()._generate_get_query(offset, limit, order_by, **lookup_keys)
if topic_name:
stmt = stmt.join(WikiTopic, cls.topic_id == WikiTopic.id)
if isinstance(topic_name, list):
stmt = stmt.where(WikiTopic.name.in_(topic_name))
else:
stmt = stmt.where(WikiTopic.name == topic_name)
if tags_name:
stmt = stmt.join(WikiTag.articles)
if isinstance(tags_name, list):
stmt = stmt.where(WikiTag.name.in_(tags_name))
else:
stmt = stmt.where(WikiTag.name == tags_name)
return stmt

@classmethod
async def get(
cls,
Expand Down Expand Up @@ -1261,12 +1219,11 @@ async def update(
values: dict, # Must contain "author_id": int
**lookup_keys: lookup_keys_type, # Must contain "topic_name": str and "name": str
) -> None:
topic_name = lookup_keys.pop("topic_name")
topic_name = lookup_keys["topic_name"]
name = lookup_keys["name"]
article = await cls.get(session, topic_name=topic_name, name=name)
if not article:
raise WikiArticleNotFound
lookup_keys["topic_id"] = article.topic_id
# Update the article info
content = values.pop("content")
author_id = values.pop("author_id")
Expand Down Expand Up @@ -1301,18 +1258,14 @@ async def delete(
author_id: int,
**lookup_keys: lookup_keys_type,
) -> None:
topic_name = lookup_keys.pop("topic_name")
topic_name = lookup_keys["topic_name"]
name = lookup_keys["name"]
article = await cls.get(session, topic_name=topic_name, name=name)
if not article:
raise WikiArticleNotFound
# Update the article info
stmt = (
update(cls)
.where(cls.id == article.id)
.values(status=False)
)
await session.execute(stmt)
# Mark the article as inactive
await super().update(
session, topic_name=topic_name, name=name, values={"status": False})
# Create the article modification
await WikiArticleModification.create(
session,
Expand Down Expand Up @@ -1393,43 +1346,37 @@ async def get_latest_version(
order_by=WikiArticleModification.version.desc())


class WikiArticlePicture(Base, CRUDMixin, WikiObject):
__tablename__ = "wiki_articles_pictures"
class WikiArticlePicture(Base, WikiTagged, CRUDMixin, WikiObject):
__tablename__ = "wiki_picture"
__bind_key__ = "app"
__table_args__ = (
UniqueConstraint(
"article_id", "name",
name="uq_wiki_article_id"
"topic_name", "article_name", "name",
name="uq_wiki_pictures_name"
),
)
_lookup_keys = ["article_id", "name"]
_lookup_keys = ["topic_name", "article_name", "name"]

id: Mapped[int] = mapped_column(primary_key=True)
article_id: Mapped[int] = mapped_column(sa.ForeignKey("wiki_articles.id"))
topic_name: Mapped[str] = mapped_column(sa.ForeignKey("wiki_topics.name"))
article_name: Mapped[str] = mapped_column(sa.ForeignKey("wiki_articles.name"))
name: Mapped[str] = mapped_column(sa.String(length=64))
path: Mapped[ioPath] = mapped_column(PathType(length=512))
status: Mapped[bool] = mapped_column(default=True)

# relationship
article: Mapped[WikiArticle] = relationship(back_populates="images", lazy="selectin")
article: Mapped[WikiArticle] = relationship(back_populates="images")

def __repr__(self) -> str:
return (
f"<WikiArticlePicture({self.article_id}, timestamp={self.timestamp})>"
f"<WikiArticle({self.topic_name}-{self.article_name}-{self.name}, "
f"path={self.path})>"
)

@property
def absolute_path(self) -> ioPath:
return self.root_dir() / self.path

@property
def topic_name(self) -> str:
return self.article.topic.name

@property
def article_name(self) -> str:
return self.article.name

async def set_image(self, image: bytes) -> None:
async with await self.absolute_path.open("wb") as f:
await f.write(image)
Expand All @@ -1447,32 +1394,27 @@ async def create(
values: dict | None = None, # author_id, content, extension
**lookup_keys: lookup_keys_type, # article_id, name
) -> None:
article_id: int = lookup_keys["article_id"]
article = await WikiArticle.get_by_id(session, article_id=article_id)
topic_name = lookup_keys["topic_name"]
article_name = lookup_keys["article_name"]
article = await WikiArticle.get(
session, topic_name=topic_name, name=article_name)
if article is None:
raise WikiArticleNotFound
# Get extension info
name: str = lookup_keys["name"]
extension: str = values.pop("extension", None) or name.split(".")[-1]
extension: str = values.pop("extension")
if not extension.startswith("."):
extension = f".{extension}"
name: str = name.split(extension)[0]
lookup_keys["name"] = name
supported = {'.gif', '.jpeg', '.jpg', '.png', '.svg', '.webp'}
if extension.lower() not in supported:
raise ValueError(
f"This image format is not supported. Image format supported: "
f"{humanize_list([*supported])}."
)
check_filename(f"{name}{extension}", SUPPORTED_IMAGE_EXTENSIONS)
# Get path info
path = article.path / f"{slugify(name)}{extension}"
values["path"] = str(path)
# Create the picture info
content = values.pop("content")
await super().create(session, values=values, **lookup_keys)
# Save the picture content
x = 1
picture = await cls.get(session, article_id=article_id, name=name)
picture = await cls.get(
session, topic_name=topic_name, article_name=article_name, name=name)
await picture.set_image(content)

@classmethod
Expand All @@ -1483,14 +1425,17 @@ async def delete(
values: dict | None = None, # author_id, content
**lookup_keys: lookup_keys_type, # article_id, name
) -> None:
article_id: int = lookup_keys["article_id"]
topic_name = lookup_keys["topic_name"]
article_name = lookup_keys["article_name"]
name: str = lookup_keys["name"]
article = await WikiArticle.get_by_id(session, article_id=article_id)
if article is None:
picture = await cls.get(
session, topic_name=topic_name, article_name=article_name, name=name)
if picture is None:
raise WikiArticleNotFound
# Mark the picture as inactive
await super().update(
session, article_id=article_id, name=name, values={"status": False})
session, topic_name=topic_name, article_name=article_name, name=name,
values={"status": False})
# Rename the picture content
#picture = await cls.get(
# session, topic_name=topic_name, article_name=article_name, name=name)
Expand Down
14 changes: 14 additions & 0 deletions src/ouranos/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,17 @@ def check_secret_key(config: dict) -> str | None:

def slugify(s: str) -> str:
return _slugify(s, separator="_", lowercase=True)


def check_filename(full_filename: str, extensions: set[str]) -> None:
split = full_filename.split(".")
if len(split) != 2:
if len(split) < 2:
raise ValueError("The full filename with extension should be provided")
raise ValueError("Files cannot contain '.' in their name")
name, extension = split
if extension.lower() not in extensions:
raise ValueError(
f"This file extension is not supported. Extensions supported: "
f"{humanize_list([*extensions])}"
)
Loading

0 comments on commit 9351e18

Please sign in to comment.