feat: [api] Add the WordPacks functionality and their endpoints

This commit is contained in:
wilson 2026-04-14 10:17:33 +01:00
parent 74c173b6ae
commit 628b61f4e1
24 changed files with 1937 additions and 3 deletions

View file

@ -0,0 +1,139 @@
"""add word bank packs
Revision ID: 0013
Revises: 0012
Create Date: 2026-04-12
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
revision: str = "0013"
down_revision: Union[str, None] = "0012"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"word_bank_pack",
sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
sa.Column("name", sa.Text(), nullable=False),
sa.Column("name_target", sa.Text(), nullable=False),
sa.Column("description", sa.Text(), nullable=False),
sa.Column("description_target", sa.Text(), nullable=False),
sa.Column("source_lang", sa.String(2), nullable=False),
sa.Column("target_lang", sa.String(2), nullable=False),
sa.Column("proficiencies", postgresql.JSONB(), nullable=False, server_default="[]"),
sa.Column("is_published", sa.Boolean(), nullable=False, server_default="false"),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
)
op.create_index("ix_word_bank_pack_source_target_lang", "word_bank_pack", ["source_lang", "target_lang"])
op.create_index("ix_word_bank_pack_is_published", "word_bank_pack", ["is_published"])
op.create_table(
"word_bank_pack_entry",
sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
sa.Column(
"pack_id",
postgresql.UUID(as_uuid=True),
sa.ForeignKey("word_bank_pack.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column(
"sense_id",
postgresql.UUID(as_uuid=True),
sa.ForeignKey("dictionary_sense.id", ondelete="SET NULL"),
nullable=True,
),
sa.Column("surface_text", sa.Text(), nullable=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
)
op.create_index("ix_word_bank_pack_entry_pack_id", "word_bank_pack_entry", ["pack_id"])
op.create_index("ix_word_bank_pack_entry_sense_id", "word_bank_pack_entry", ["sense_id"])
op.create_table(
"word_bank_pack_flashcard_template",
sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
sa.Column(
"pack_entry_id",
postgresql.UUID(as_uuid=True),
sa.ForeignKey("word_bank_pack_entry.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("card_direction", sa.Text(), nullable=False),
sa.Column("prompt_text", sa.Text(), nullable=False),
sa.Column("answer_text", sa.Text(), nullable=False),
sa.Column("prompt_context_text", sa.Text(), nullable=True),
sa.Column("answer_context_text", sa.Text(), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
)
op.create_index(
"ix_word_bank_pack_flashcard_template_pack_entry_id",
"word_bank_pack_flashcard_template",
["pack_entry_id"],
)
# Additive FK columns on existing tables
op.add_column(
"learnable_word_bank_entry",
sa.Column(
"pack_entry_id",
postgresql.UUID(as_uuid=True),
sa.ForeignKey("word_bank_pack_entry.id", ondelete="SET NULL"),
nullable=True,
),
)
op.create_index(
"ix_learnable_word_bank_entry_pack_entry_id",
"learnable_word_bank_entry",
["pack_entry_id"],
)
op.add_column(
"flashcard",
sa.Column(
"source_pack_flashcard_template_id",
postgresql.UUID(as_uuid=True),
sa.ForeignKey("word_bank_pack_flashcard_template.id", ondelete="SET NULL"),
nullable=True,
),
)
def downgrade() -> None:
op.drop_column("flashcard", "source_pack_flashcard_template_id")
op.drop_index(
"ix_learnable_word_bank_entry_pack_entry_id",
table_name="learnable_word_bank_entry",
)
op.drop_column("learnable_word_bank_entry", "pack_entry_id")
op.drop_index(
"ix_word_bank_pack_flashcard_template_pack_entry_id",
table_name="word_bank_pack_flashcard_template",
)
op.drop_table("word_bank_pack_flashcard_template")
op.drop_index("ix_word_bank_pack_entry_sense_id", table_name="word_bank_pack_entry")
op.drop_index("ix_word_bank_pack_entry_pack_id", table_name="word_bank_pack_entry")
op.drop_table("word_bank_pack_entry")
op.drop_index("ix_word_bank_pack_is_published", table_name="word_bank_pack")
op.drop_index("ix_word_bank_pack_source_target_lang", table_name="word_bank_pack")
op.drop_table("word_bank_pack")

View file

@ -24,10 +24,11 @@ def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed) return pwd_context.verify(plain, hashed)
def create_access_token(user_id: str, email: str) -> str: def create_access_token(user_id: str, email: str, is_admin: bool = False) -> str:
payload = { payload = {
"sub": user_id, "sub": user_id,
"email": email, "email": email,
"is_admin": is_admin,
"exp": datetime.now(timezone.utc) + timedelta(hours=TOKEN_EXPIRY_HOURS), "exp": datetime.now(timezone.utc) + timedelta(hours=TOKEN_EXPIRY_HOURS),
} }
return jwt.encode(payload, settings.jwt_secret, algorithm="HS256") return jwt.encode(payload, settings.jwt_secret, algorithm="HS256")
@ -57,7 +58,7 @@ def _admin_emails() -> frozenset[str]:
def require_admin(token_data: dict = Depends(verify_token)) -> dict: def require_admin(token_data: dict = Depends(verify_token)) -> dict:
if token_data.get("email") not in _admin_emails(): if not token_data.get("is_admin"):
raise HTTPException( raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required", detail="Admin access required",

View file

@ -11,5 +11,6 @@ class Account:
is_active: bool is_active: bool
is_email_verified: bool is_email_verified: bool
created_at: datetime created_at: datetime
is_admin: bool = False
human_name: str | None = None human_name: str | None = None
learnable_languages: list[LearnableLanguage] = field(default_factory=list) learnable_languages: list[LearnableLanguage] = field(default_factory=list)

View file

@ -15,6 +15,7 @@ class Flashcard:
answer_context_text: str | None answer_context_text: str | None
card_direction: str card_direction: str
prompt_modality: str prompt_modality: str
source_pack_flashcard_template_id: str | None
created_at: datetime created_at: datetime

View file

@ -0,0 +1,37 @@
from dataclasses import dataclass
from datetime import datetime
@dataclass
class WordBankPack:
id: str
name: str
name_target: str
description: str
description_target: str
source_lang: str
target_lang: str
proficiencies: list[str]
is_published: bool
created_at: datetime
@dataclass
class WordBankPackEntry:
id: str
pack_id: str
sense_id: str | None
surface_text: str
created_at: datetime
@dataclass
class WordBankPackFlashcardTemplate:
id: str
pack_entry_id: str
card_direction: str
prompt_text: str
answer_text: str
prompt_context_text: str | None
answer_context_text: str | None
created_at: datetime

View file

@ -22,4 +22,5 @@ class LearnableWordBankEntry:
entry_pathway: str entry_pathway: str
source_article_id: str | None source_article_id: str | None
disambiguation_status: str disambiguation_status: str
pack_entry_id: str | None
created_at: datetime created_at: datetime

View file

@ -43,6 +43,13 @@ class AccountService:
await service.remove_learnable_language(user_id, lang.id) await service.remove_learnable_language(user_id, lang.id)
""" """
@staticmethod
def _is_admin_email(email: str) -> bool:
admin_emails = frozenset(
e.strip() for e in settings.admin_user_emails.split(",") if e.strip()
)
return email in admin_emails
def __init__( def __init__(
self, self,
db: AsyncSession, db: AsyncSession,
@ -142,6 +149,7 @@ class AccountService:
is_active=user.is_active, is_active=user.is_active,
is_email_verified=user.is_email_verified, is_email_verified=user.is_email_verified,
created_at=user.created_at, created_at=user.created_at,
is_admin=self._is_admin_email(user.email),
) )
async def verify_email_address(self, token: str) -> None: async def verify_email_address(self, token: str) -> None:
@ -201,6 +209,7 @@ class AccountService:
is_active=user.is_active, is_active=user.is_active,
is_email_verified=user.is_email_verified, is_email_verified=user.is_email_verified,
created_at=user.created_at, created_at=user.created_at,
is_admin=self._is_admin_email(user.email),
learnable_languages=languages, learnable_languages=languages,
) )

View file

@ -0,0 +1,233 @@
import uuid
from dataclasses import dataclass
from ..models.pack import WordBankPack, WordBankPackEntry, WordBankPackFlashcardTemplate
from ...outbound.postgres.repositories.pack_repository import PackRepository
from ...outbound.postgres.repositories.vocab_repository import VocabRepository
from ...outbound.postgres.repositories.flashcard_repository import FlashcardRepository
from ...outbound.postgres.repositories.dictionary_repository import DictionaryRepository
class DuplicateEntryError(Exception):
"""Raised when a pack would add plain cards that are identical to ones already in the bank."""
def __init__(self, duplicate_surface_texts: list[str]) -> None:
self.duplicate_surface_texts = duplicate_surface_texts
joined = ", ".join(f'"{t}"' for t in duplicate_surface_texts)
super().__init__(
f"You already have the following word(s) in your bank: {joined}. "
"Remove them first, or add the pack once they have been cleared."
)
class PackNotFoundError(Exception):
pass
@dataclass
class PackApplicationResult:
added_surface_texts: list[str]
class PackService:
def __init__(
self,
pack_repo: PackRepository,
vocab_repo: VocabRepository,
flashcard_repo: FlashcardRepository,
dict_repo: DictionaryRepository,
) -> None:
self.pack_repo = pack_repo
self.vocab_repo = vocab_repo
self.flashcard_repo = flashcard_repo
self.dict_repo = dict_repo
async def create_pack(
self,
name: str,
name_target: str,
description: str,
description_target: str,
source_lang: str,
target_lang: str,
proficiencies: list[str],
) -> WordBankPack:
return await self.pack_repo.create_pack(
name=name,
name_target=name_target,
description=description,
description_target=description_target,
source_lang=source_lang,
target_lang=target_lang,
proficiencies=proficiencies,
)
async def update_pack(
self,
pack_id: uuid.UUID,
name: str | None = None,
name_target: str | None = None,
description: str | None = None,
description_target: str | None = None,
proficiencies: list[str] | None = None,
) -> WordBankPack:
pack = await self.pack_repo.get_pack(pack_id)
if pack is None:
raise PackNotFoundError(f"Pack {pack_id} not found")
return await self.pack_repo.update_pack(
pack_id=pack_id,
name=name,
name_target=name_target,
description=description,
description_target=description_target,
proficiencies=proficiencies,
)
async def publish_pack(self, pack_id: uuid.UUID) -> WordBankPack:
pack = await self.pack_repo.get_pack(pack_id)
if pack is None:
raise PackNotFoundError(f"Pack {pack_id} not found")
return await self.pack_repo.publish_pack(pack_id)
async def add_entry_to_pack(
self,
pack_id: uuid.UUID,
sense_id: uuid.UUID | None,
surface_text: str,
) -> WordBankPackEntry:
pack = await self.pack_repo.get_pack(pack_id)
if pack is None:
raise PackNotFoundError(f"Pack {pack_id} not found")
return await self.pack_repo.add_entry(
pack_id=pack_id,
sense_id=sense_id,
surface_text=surface_text,
)
async def add_flashcard_template_to_entry(
self,
pack_entry_id: uuid.UUID,
card_direction: str,
prompt_text: str,
answer_text: str,
prompt_context_text: str | None = None,
answer_context_text: str | None = None,
) -> WordBankPackFlashcardTemplate:
return await self.pack_repo.add_flashcard_template(
pack_entry_id=pack_entry_id,
card_direction=card_direction,
prompt_text=prompt_text,
answer_text=answer_text,
prompt_context_text=prompt_context_text,
answer_context_text=answer_context_text,
)
async def add_pack_to_user_bank(
self,
pack_id: uuid.UUID,
user_id: uuid.UUID,
source_lang: str,
target_lang: str,
) -> PackApplicationResult:
pack = await self.pack_repo.get_pack(pack_id)
if pack is None or not pack.is_published:
raise PackNotFoundError(f"Pack {pack_id} not found")
entries = await self.pack_repo.get_entries_for_pack(pack_id)
if not entries:
return PackApplicationResult(added_surface_texts=[])
pair = await self.vocab_repo.get_or_create_language_pair(user_id, source_lang, target_lang)
language_pair_id = uuid.UUID(pair.id)
entry_ids = [uuid.UUID(e.id) for e in entries]
templates_by_entry = await self.pack_repo.get_templates_for_entries(entry_ids)
existing_sense_ids = await self.vocab_repo.get_sense_ids_for_user_in_pair(
user_id, language_pair_id
)
# Detect plain-card duplicates: entries whose sense is already in the user's bank
# and whose templates carry no context text (would produce identical plain cards).
duplicates = []
for entry in entries:
if entry.sense_id is None or entry.sense_id not in existing_sense_ids:
continue
entry_templates = templates_by_entry.get(entry.id, [])
has_context = any(
t.prompt_context_text or t.answer_context_text for t in entry_templates
)
if not has_context:
duplicates.append(entry.surface_text)
if duplicates:
raise DuplicateEntryError(duplicates)
added: list[str] = []
for entry in entries:
bank_entry = await self.vocab_repo.add_entry(
user_id=user_id,
language_pair_id=language_pair_id,
surface_text=entry.surface_text,
entry_pathway="pack",
sense_id=uuid.UUID(entry.sense_id) if entry.sense_id else None,
disambiguation_status="auto_resolved" if entry.sense_id else "pending",
pack_entry_id=uuid.UUID(entry.id),
)
entry_templates = templates_by_entry.get(entry.id, [])
if entry_templates:
for template in entry_templates:
await self.flashcard_repo.create_flashcard(
user_id=user_id,
bank_entry_id=uuid.UUID(bank_entry.id),
source_lang=pair.source_lang,
target_lang=pair.target_lang,
prompt_text=template.prompt_text,
answer_text=template.answer_text,
card_direction=template.card_direction,
prompt_context_text=template.prompt_context_text,
answer_context_text=template.answer_context_text,
source_pack_flashcard_template_id=uuid.UUID(template.id),
)
elif entry.sense_id:
# Fallback: no templates — generate plain direction cards from the dictionary
await self._generate_plain_cards(
bank_entry_id=uuid.UUID(bank_entry.id),
user_id=user_id,
sense_id=uuid.UUID(entry.sense_id),
source_lang=pair.source_lang,
target_lang=pair.target_lang,
)
added.append(entry.surface_text)
return PackApplicationResult(added_surface_texts=added)
async def _generate_plain_cards(
self,
bank_entry_id: uuid.UUID,
user_id: uuid.UUID,
sense_id: uuid.UUID,
source_lang: str,
target_lang: str,
) -> None:
sense = await self.dict_repo.get_sense(sense_id)
if sense is None:
return
lemma = await self.dict_repo.get_lemma(uuid.UUID(sense.lemma_id))
if lemma is None:
return
for direction in ("target_to_source", "source_to_target"):
if direction == "target_to_source":
prompt, answer = lemma.headword, sense.gloss
else:
prompt, answer = sense.gloss, lemma.headword
await self.flashcard_repo.create_flashcard(
user_id=user_id,
bank_entry_id=bank_entry_id,
source_lang=source_lang,
target_lang=target_lang,
prompt_text=prompt,
answer_text=answer,
card_direction=direction,
)

View file

@ -32,6 +32,11 @@ class FlashcardEntity(Base):
answer_context_text: Mapped[str | None] = mapped_column(Text, nullable=True) answer_context_text: Mapped[str | None] = mapped_column(Text, nullable=True)
card_direction: Mapped[str] = mapped_column(Text, nullable=False) card_direction: Mapped[str] = mapped_column(Text, nullable=False)
prompt_modality: Mapped[str] = mapped_column(Text, nullable=False, default="text") prompt_modality: Mapped[str] = mapped_column(Text, nullable=False, default="text")
source_pack_flashcard_template_id: Mapped[uuid.UUID | None] = mapped_column(
UUID(as_uuid=True),
ForeignKey("word_bank_pack_flashcard_template.id", ondelete="SET NULL"),
nullable=True,
)
created_at: Mapped[datetime] = mapped_column( created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), DateTime(timezone=True),
nullable=False, nullable=False,

View file

@ -0,0 +1,73 @@
import uuid
from datetime import datetime, timezone
from sqlalchemy import Boolean, ForeignKey, String, Text, DateTime
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.dialects.postgresql import UUID, JSONB
from ..database import Base
class WordBankPackEntity(Base):
__tablename__ = "word_bank_pack"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name: Mapped[str] = mapped_column(Text, nullable=False)
name_target: Mapped[str] = mapped_column(Text, nullable=False)
description: Mapped[str] = mapped_column(Text, nullable=False)
description_target: Mapped[str] = mapped_column(Text, nullable=False)
source_lang: Mapped[str] = mapped_column(String(2), nullable=False)
target_lang: Mapped[str] = mapped_column(String(2), nullable=False)
proficiencies: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
is_published: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
default=lambda: datetime.now(timezone.utc),
)
class WordBankPackEntryEntity(Base):
__tablename__ = "word_bank_pack_entry"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
pack_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("word_bank_pack.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
sense_id: Mapped[uuid.UUID | None] = mapped_column(
UUID(as_uuid=True),
ForeignKey("dictionary_sense.id", ondelete="SET NULL"),
nullable=True,
index=True,
)
surface_text: Mapped[str] = mapped_column(Text, nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
default=lambda: datetime.now(timezone.utc),
)
class WordBankPackFlashcardTemplateEntity(Base):
__tablename__ = "word_bank_pack_flashcard_template"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
pack_entry_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("word_bank_pack_entry.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
card_direction: Mapped[str] = mapped_column(Text, nullable=False)
prompt_text: Mapped[str] = mapped_column(Text, nullable=False)
answer_text: Mapped[str] = mapped_column(Text, nullable=False)
prompt_context_text: Mapped[str | None] = mapped_column(Text, nullable=True)
answer_context_text: Mapped[str | None] = mapped_column(Text, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
default=lambda: datetime.now(timezone.utc),
)

View file

@ -57,6 +57,11 @@ class LearnableWordBankEntryEntity(Base):
UUID(as_uuid=True), nullable=True UUID(as_uuid=True), nullable=True
) )
disambiguation_status: Mapped[str] = mapped_column(Text, nullable=False, default="pending") disambiguation_status: Mapped[str] = mapped_column(Text, nullable=False, default="pending")
pack_entry_id: Mapped[uuid.UUID | None] = mapped_column(
UUID(as_uuid=True),
ForeignKey("word_bank_pack_entry.id", ondelete="SET NULL"),
nullable=True,
)
created_at: Mapped[datetime] = mapped_column( created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), DateTime(timezone=True),
nullable=False, nullable=False,

View file

@ -22,6 +22,7 @@ class FlashcardRepository(Protocol):
prompt_modality: str = "text", prompt_modality: str = "text",
prompt_context_text: str | None = None, prompt_context_text: str | None = None,
answer_context_text: str | None = None, answer_context_text: str | None = None,
source_pack_flashcard_template_id: uuid.UUID | None = None,
) -> Flashcard: ... ) -> Flashcard: ...
async def get_flashcards_for_user(self, user_id: uuid.UUID) -> list[Flashcard]: ... async def get_flashcards_for_user(self, user_id: uuid.UUID) -> list[Flashcard]: ...
@ -50,6 +51,11 @@ def _flashcard_to_model(entity: FlashcardEntity) -> Flashcard:
answer_context_text=entity.answer_context_text, answer_context_text=entity.answer_context_text,
card_direction=entity.card_direction, card_direction=entity.card_direction,
prompt_modality=entity.prompt_modality, prompt_modality=entity.prompt_modality,
source_pack_flashcard_template_id=(
str(entity.source_pack_flashcard_template_id)
if entity.source_pack_flashcard_template_id
else None
),
created_at=entity.created_at, created_at=entity.created_at,
) )
@ -81,6 +87,7 @@ class PostgresFlashcardRepository:
prompt_modality: str = "text", prompt_modality: str = "text",
prompt_context_text: str | None = None, prompt_context_text: str | None = None,
answer_context_text: str | None = None, answer_context_text: str | None = None,
source_pack_flashcard_template_id: uuid.UUID | None = None,
) -> Flashcard: ) -> Flashcard:
entity = FlashcardEntity( entity = FlashcardEntity(
user_id=user_id, user_id=user_id,
@ -93,6 +100,7 @@ class PostgresFlashcardRepository:
answer_context_text=answer_context_text, answer_context_text=answer_context_text,
card_direction=card_direction, card_direction=card_direction,
prompt_modality=prompt_modality, prompt_modality=prompt_modality,
source_pack_flashcard_template_id=source_pack_flashcard_template_id,
created_at=datetime.now(timezone.utc), created_at=datetime.now(timezone.utc),
) )
self.db.add(entity) self.db.add(entity)

View file

@ -0,0 +1,331 @@
import uuid
from datetime import datetime, timezone
from typing import Protocol
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from ..entities.pack_entities import (
WordBankPackEntity,
WordBankPackEntryEntity,
WordBankPackFlashcardTemplateEntity,
)
from ..entities.vocab_entities import LearnableWordBankEntryEntity
from ....domain.models.pack import WordBankPack, WordBankPackEntry, WordBankPackFlashcardTemplate
class PackRepository(Protocol):
async def create_pack(
self,
name: str,
name_target: str,
description: str,
description_target: str,
source_lang: str,
target_lang: str,
proficiencies: list[str],
) -> WordBankPack: ...
async def update_pack(
self,
pack_id: uuid.UUID,
name: str | None = None,
name_target: str | None = None,
description: str | None = None,
description_target: str | None = None,
proficiencies: list[str] | None = None,
) -> WordBankPack: ...
async def publish_pack(self, pack_id: uuid.UUID) -> WordBankPack: ...
async def get_pack(self, pack_id: uuid.UUID) -> WordBankPack | None: ...
async def list_packs(
self,
source_lang: str | None = None,
target_lang: str | None = None,
published_only: bool = False,
) -> list[WordBankPack]: ...
async def add_entry(
self,
pack_id: uuid.UUID,
sense_id: uuid.UUID | None,
surface_text: str,
) -> WordBankPackEntry: ...
async def remove_entry(self, entry_id: uuid.UUID) -> None: ...
async def get_entries_for_pack(self, pack_id: uuid.UUID) -> list[WordBankPackEntry]: ...
async def add_flashcard_template(
self,
pack_entry_id: uuid.UUID,
card_direction: str,
prompt_text: str,
answer_text: str,
prompt_context_text: str | None = None,
answer_context_text: str | None = None,
) -> WordBankPackFlashcardTemplate: ...
async def remove_flashcard_template(self, template_id: uuid.UUID) -> None: ...
async def get_templates_for_entry(
self, pack_entry_id: uuid.UUID
) -> list[WordBankPackFlashcardTemplate]: ...
async def get_templates_for_entries(
self, pack_entry_ids: list[uuid.UUID]
) -> dict[str, list[WordBankPackFlashcardTemplate]]: ...
async def count_entries_for_pack(self, pack_id: uuid.UUID) -> int: ...
async def get_pack_ids_added_by_user(
self, user_id: uuid.UUID, language_pair_id: uuid.UUID
) -> set[str]: ...
def _pack_to_model(entity: WordBankPackEntity) -> WordBankPack:
return WordBankPack(
id=str(entity.id),
name=entity.name,
name_target=entity.name_target,
description=entity.description,
description_target=entity.description_target,
source_lang=entity.source_lang,
target_lang=entity.target_lang,
proficiencies=entity.proficiencies,
is_published=entity.is_published,
created_at=entity.created_at,
)
def _entry_to_model(entity: WordBankPackEntryEntity) -> WordBankPackEntry:
return WordBankPackEntry(
id=str(entity.id),
pack_id=str(entity.pack_id),
sense_id=str(entity.sense_id) if entity.sense_id else None,
surface_text=entity.surface_text,
created_at=entity.created_at,
)
def _template_to_model(entity: WordBankPackFlashcardTemplateEntity) -> WordBankPackFlashcardTemplate:
return WordBankPackFlashcardTemplate(
id=str(entity.id),
pack_entry_id=str(entity.pack_entry_id),
card_direction=entity.card_direction,
prompt_text=entity.prompt_text,
answer_text=entity.answer_text,
prompt_context_text=entity.prompt_context_text,
answer_context_text=entity.answer_context_text,
created_at=entity.created_at,
)
class PostgresPackRepository:
def __init__(self, db: AsyncSession) -> None:
self.db = db
async def create_pack(
self,
name: str,
name_target: str,
description: str,
description_target: str,
source_lang: str,
target_lang: str,
proficiencies: list[str],
) -> WordBankPack:
entity = WordBankPackEntity(
name=name,
name_target=name_target,
description=description,
description_target=description_target,
source_lang=source_lang,
target_lang=target_lang,
proficiencies=proficiencies,
created_at=datetime.now(timezone.utc),
)
self.db.add(entity)
await self.db.commit()
await self.db.refresh(entity)
return _pack_to_model(entity)
async def update_pack(
self,
pack_id: uuid.UUID,
name: str | None = None,
name_target: str | None = None,
description: str | None = None,
description_target: str | None = None,
proficiencies: list[str] | None = None,
) -> WordBankPack:
result = await self.db.execute(
select(WordBankPackEntity).where(WordBankPackEntity.id == pack_id)
)
entity = result.scalar_one()
if name is not None:
entity.name = name
if name_target is not None:
entity.name_target = name_target
if description is not None:
entity.description = description
if description_target is not None:
entity.description_target = description_target
if proficiencies is not None:
entity.proficiencies = proficiencies
await self.db.commit()
await self.db.refresh(entity)
return _pack_to_model(entity)
async def publish_pack(self, pack_id: uuid.UUID) -> WordBankPack:
result = await self.db.execute(
select(WordBankPackEntity).where(WordBankPackEntity.id == pack_id)
)
entity = result.scalar_one()
entity.is_published = True
await self.db.commit()
await self.db.refresh(entity)
return _pack_to_model(entity)
async def get_pack(self, pack_id: uuid.UUID) -> WordBankPack | None:
result = await self.db.execute(
select(WordBankPackEntity).where(WordBankPackEntity.id == pack_id)
)
entity = result.scalar_one_or_none()
return _pack_to_model(entity) if entity else None
async def list_packs(
self,
source_lang: str | None = None,
target_lang: str | None = None,
published_only: bool = False,
) -> list[WordBankPack]:
query = select(WordBankPackEntity)
if source_lang:
query = query.where(WordBankPackEntity.source_lang == source_lang)
if target_lang:
query = query.where(WordBankPackEntity.target_lang == target_lang)
if published_only:
query = query.where(WordBankPackEntity.is_published.is_(True))
query = query.order_by(WordBankPackEntity.created_at.desc())
result = await self.db.execute(query)
return [_pack_to_model(e) for e in result.scalars().all()]
async def add_entry(
self,
pack_id: uuid.UUID,
sense_id: uuid.UUID | None,
surface_text: str,
) -> WordBankPackEntry:
entity = WordBankPackEntryEntity(
pack_id=pack_id,
sense_id=sense_id,
surface_text=surface_text,
created_at=datetime.now(timezone.utc),
)
self.db.add(entity)
await self.db.commit()
await self.db.refresh(entity)
return _entry_to_model(entity)
async def remove_entry(self, entry_id: uuid.UUID) -> None:
result = await self.db.execute(
select(WordBankPackEntryEntity).where(WordBankPackEntryEntity.id == entry_id)
)
entity = result.scalar_one_or_none()
if entity:
await self.db.delete(entity)
await self.db.commit()
async def get_entries_for_pack(self, pack_id: uuid.UUID) -> list[WordBankPackEntry]:
result = await self.db.execute(
select(WordBankPackEntryEntity)
.where(WordBankPackEntryEntity.pack_id == pack_id)
.order_by(WordBankPackEntryEntity.created_at.asc())
)
return [_entry_to_model(e) for e in result.scalars().all()]
async def add_flashcard_template(
self,
pack_entry_id: uuid.UUID,
card_direction: str,
prompt_text: str,
answer_text: str,
prompt_context_text: str | None = None,
answer_context_text: str | None = None,
) -> WordBankPackFlashcardTemplate:
entity = WordBankPackFlashcardTemplateEntity(
pack_entry_id=pack_entry_id,
card_direction=card_direction,
prompt_text=prompt_text,
answer_text=answer_text,
prompt_context_text=prompt_context_text,
answer_context_text=answer_context_text,
created_at=datetime.now(timezone.utc),
)
self.db.add(entity)
await self.db.commit()
await self.db.refresh(entity)
return _template_to_model(entity)
async def remove_flashcard_template(self, template_id: uuid.UUID) -> None:
result = await self.db.execute(
select(WordBankPackFlashcardTemplateEntity).where(
WordBankPackFlashcardTemplateEntity.id == template_id
)
)
entity = result.scalar_one_or_none()
if entity:
await self.db.delete(entity)
await self.db.commit()
async def get_templates_for_entry(
self, pack_entry_id: uuid.UUID
) -> list[WordBankPackFlashcardTemplate]:
result = await self.db.execute(
select(WordBankPackFlashcardTemplateEntity)
.where(WordBankPackFlashcardTemplateEntity.pack_entry_id == pack_entry_id)
.order_by(WordBankPackFlashcardTemplateEntity.created_at.asc())
)
return [_template_to_model(e) for e in result.scalars().all()]
async def get_templates_for_entries(
self, pack_entry_ids: list[uuid.UUID]
) -> dict[str, list[WordBankPackFlashcardTemplate]]:
if not pack_entry_ids:
return {}
result = await self.db.execute(
select(WordBankPackFlashcardTemplateEntity)
.where(WordBankPackFlashcardTemplateEntity.pack_entry_id.in_(pack_entry_ids))
.order_by(WordBankPackFlashcardTemplateEntity.created_at.asc())
)
grouped: dict[str, list[WordBankPackFlashcardTemplate]] = {}
for entity in result.scalars().all():
key = str(entity.pack_entry_id)
grouped.setdefault(key, []).append(_template_to_model(entity))
return grouped
async def count_entries_for_pack(self, pack_id: uuid.UUID) -> int:
result = await self.db.execute(
select(func.count()).where(WordBankPackEntryEntity.pack_id == pack_id)
)
return result.scalar_one()
async def get_pack_ids_added_by_user(
self, user_id: uuid.UUID, language_pair_id: uuid.UUID
) -> set[str]:
result = await self.db.execute(
select(WordBankPackEntryEntity.pack_id)
.join(
LearnableWordBankEntryEntity,
LearnableWordBankEntryEntity.pack_entry_id == WordBankPackEntryEntity.id,
)
.where(
LearnableWordBankEntryEntity.user_id == user_id,
LearnableWordBankEntryEntity.language_pair_id == language_pair_id,
)
.distinct()
)
return {str(row) for row in result.scalars().all()}

View file

@ -27,8 +27,13 @@ class VocabRepository(Protocol):
wordform_id: uuid.UUID | None = None, wordform_id: uuid.UUID | None = None,
source_article_id: uuid.UUID | None = None, source_article_id: uuid.UUID | None = None,
disambiguation_status: str = "pending", disambiguation_status: str = "pending",
pack_entry_id: uuid.UUID | None = None,
) -> LearnableWordBankEntry: ... ) -> LearnableWordBankEntry: ...
async def get_sense_ids_for_user_in_pair(
self, user_id: uuid.UUID, language_pair_id: uuid.UUID
) -> set[str]: ...
async def get_entries_for_user( async def get_entries_for_user(
self, user_id: uuid.UUID, language_pair_id: uuid.UUID self, user_id: uuid.UUID, language_pair_id: uuid.UUID
) -> list[LearnableWordBankEntry]: ... ) -> list[LearnableWordBankEntry]: ...
@ -63,6 +68,7 @@ def _entry_to_model(entity: LearnableWordBankEntryEntity) -> LearnableWordBankEn
entry_pathway=entity.entry_pathway, entry_pathway=entity.entry_pathway,
source_article_id=str(entity.source_article_id) if entity.source_article_id else None, source_article_id=str(entity.source_article_id) if entity.source_article_id else None,
disambiguation_status=entity.disambiguation_status, disambiguation_status=entity.disambiguation_status,
pack_entry_id=str(entity.pack_entry_id) if entity.pack_entry_id else None,
created_at=entity.created_at, created_at=entity.created_at,
) )
@ -110,6 +116,7 @@ class PostgresVocabRepository:
wordform_id: uuid.UUID | None = None, wordform_id: uuid.UUID | None = None,
source_article_id: uuid.UUID | None = None, source_article_id: uuid.UUID | None = None,
disambiguation_status: str = "pending", disambiguation_status: str = "pending",
pack_entry_id: uuid.UUID | None = None,
) -> LearnableWordBankEntry: ) -> LearnableWordBankEntry:
entity = LearnableWordBankEntryEntity( entity = LearnableWordBankEntryEntity(
user_id=user_id, user_id=user_id,
@ -121,6 +128,7 @@ class PostgresVocabRepository:
wordform_id=wordform_id, wordform_id=wordform_id,
source_article_id=source_article_id, source_article_id=source_article_id,
disambiguation_status=disambiguation_status, disambiguation_status=disambiguation_status,
pack_entry_id=pack_entry_id,
created_at=datetime.now(timezone.utc), created_at=datetime.now(timezone.utc),
) )
self.db.add(entity) self.db.add(entity)
@ -128,6 +136,19 @@ class PostgresVocabRepository:
await self.db.refresh(entity) await self.db.refresh(entity)
return _entry_to_model(entity) return _entry_to_model(entity)
async def get_sense_ids_for_user_in_pair(
self, user_id: uuid.UUID, language_pair_id: uuid.UUID
) -> set[str]:
from sqlalchemy import select
result = await self.db.execute(
select(LearnableWordBankEntryEntity.sense_id).where(
LearnableWordBankEntryEntity.user_id == user_id,
LearnableWordBankEntryEntity.language_pair_id == language_pair_id,
LearnableWordBankEntryEntity.sense_id.is_not(None),
)
)
return {str(row) for row in result.scalars().all()}
async def get_entries_for_user( async def get_entries_for_user(
self, user_id: uuid.UUID, language_pair_id: uuid.UUID self, user_id: uuid.UUID, language_pair_id: uuid.UUID
) -> list[LearnableWordBankEntry]: ) -> list[LearnableWordBankEntry]:

View file

View file

@ -0,0 +1,303 @@
import uuid
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from ....auth import require_admin
from ....domain.services.pack_service import PackService, PackNotFoundError
from ....outbound.postgres.database import get_db
from ....outbound.postgres.repositories.pack_repository import PostgresPackRepository
from ....outbound.postgres.repositories.vocab_repository import PostgresVocabRepository
from ....outbound.postgres.repositories.flashcard_repository import PostgresFlashcardRepository
from ....outbound.postgres.repositories.dictionary_repository import PostgresDictionaryRepository
router = APIRouter(prefix="/admin/packs", tags=["admin-packs"])
# ── Request / Response models ─────────────────────────────────────────────────
class CreatePackRequest(BaseModel):
name: str
name_target: str
description: str
description_target: str
source_lang: str
target_lang: str
proficiencies: list[str] = []
class UpdatePackRequest(BaseModel):
name: str | None = None
name_target: str | None = None
description: str | None = None
description_target: str | None = None
proficiencies: list[str] | None = None
class AddEntryRequest(BaseModel):
sense_id: str | None = None
surface_text: str
class AddFlashcardTemplateRequest(BaseModel):
card_direction: str
prompt_text: str
answer_text: str
prompt_context_text: str | None = None
answer_context_text: str | None = None
class FlashcardTemplateResponse(BaseModel):
id: str
pack_entry_id: str
card_direction: str
prompt_text: str
answer_text: str
prompt_context_text: str | None
answer_context_text: str | None
created_at: str
class PackEntryResponse(BaseModel):
id: str
pack_id: str
sense_id: str | None
surface_text: str
created_at: str
flashcard_templates: list[FlashcardTemplateResponse] = []
class PackResponse(BaseModel):
id: str
name: str
name_target: str
description: str
description_target: str
source_lang: str
target_lang: str
proficiencies: list[str]
is_published: bool
created_at: str
class PackDetailResponse(PackResponse):
entries: list[PackEntryResponse] = []
# ── Dependency ────────────────────────────────────────────────────────────────
def _service(db: AsyncSession) -> PackService:
return PackService(
pack_repo=PostgresPackRepository(db),
vocab_repo=PostgresVocabRepository(db),
flashcard_repo=PostgresFlashcardRepository(db),
dict_repo=PostgresDictionaryRepository(db),
)
def _pack_repo(db: AsyncSession) -> PostgresPackRepository:
return PostgresPackRepository(db)
# ── Endpoints ─────────────────────────────────────────────────────────────────
@router.post("", response_model=PackResponse, status_code=201)
async def create_pack(
request: CreatePackRequest,
db: AsyncSession = Depends(get_db),
_: dict = Depends(require_admin),
) -> PackResponse:
pack = await _service(db).create_pack(
name=request.name,
name_target=request.name_target,
description=request.description,
description_target=request.description_target,
source_lang=request.source_lang,
target_lang=request.target_lang,
proficiencies=request.proficiencies,
)
return _to_pack_response(pack)
@router.get("", response_model=list[PackResponse])
async def list_packs(
source_lang: str | None = None,
target_lang: str | None = None,
db: AsyncSession = Depends(get_db),
_: dict = Depends(require_admin),
) -> list[PackResponse]:
packs = await _pack_repo(db).list_packs(source_lang=source_lang, target_lang=target_lang)
return [_to_pack_response(p) for p in packs]
@router.get("/{pack_id}", response_model=PackDetailResponse)
async def get_pack(
pack_id: str,
db: AsyncSession = Depends(get_db),
_: dict = Depends(require_admin),
) -> PackDetailResponse:
repo = _pack_repo(db)
pack = await repo.get_pack(_parse_uuid(pack_id))
if pack is None:
raise HTTPException(status_code=404, detail="Pack not found")
entries = await repo.get_entries_for_pack(uuid.UUID(pack.id))
entry_ids = [uuid.UUID(e.id) for e in entries]
templates_by_entry = await repo.get_templates_for_entries(entry_ids)
entry_responses = [
PackEntryResponse(
id=e.id,
pack_id=e.pack_id,
sense_id=e.sense_id,
surface_text=e.surface_text,
created_at=e.created_at.isoformat(),
flashcard_templates=[
_to_template_response(t) for t in templates_by_entry.get(e.id, [])
],
)
for e in entries
]
return PackDetailResponse(**_to_pack_response(pack).model_dump(), entries=entry_responses)
@router.patch("/{pack_id}", response_model=PackResponse)
async def update_pack(
pack_id: str,
request: UpdatePackRequest,
db: AsyncSession = Depends(get_db),
_: dict = Depends(require_admin),
) -> PackResponse:
try:
pack = await _service(db).update_pack(
pack_id=_parse_uuid(pack_id),
name=request.name,
name_target=request.name_target,
description=request.description,
description_target=request.description_target,
proficiencies=request.proficiencies,
)
except PackNotFoundError:
raise HTTPException(status_code=404, detail="Pack not found")
return _to_pack_response(pack)
@router.post("/{pack_id}/publish", response_model=PackResponse)
async def publish_pack(
pack_id: str,
db: AsyncSession = Depends(get_db),
_: dict = Depends(require_admin),
) -> PackResponse:
try:
pack = await _service(db).publish_pack(_parse_uuid(pack_id))
except PackNotFoundError:
raise HTTPException(status_code=404, detail="Pack not found")
return _to_pack_response(pack)
@router.post("/{pack_id}/entries", response_model=PackEntryResponse, status_code=201)
async def add_entry(
pack_id: str,
request: AddEntryRequest,
db: AsyncSession = Depends(get_db),
_: dict = Depends(require_admin),
) -> PackEntryResponse:
sense_id = _parse_uuid(request.sense_id) if request.sense_id else None
try:
entry = await _service(db).add_entry_to_pack(
pack_id=_parse_uuid(pack_id),
sense_id=sense_id,
surface_text=request.surface_text,
)
except PackNotFoundError:
raise HTTPException(status_code=404, detail="Pack not found")
return PackEntryResponse(
id=entry.id,
pack_id=entry.pack_id,
sense_id=entry.sense_id,
surface_text=entry.surface_text,
created_at=entry.created_at.isoformat(),
flashcard_templates=[],
)
@router.delete("/{pack_id}/entries/{entry_id}", status_code=204)
async def remove_entry(
pack_id: str,
entry_id: str,
db: AsyncSession = Depends(get_db),
_: dict = Depends(require_admin),
) -> None:
await _pack_repo(db).remove_entry(_parse_uuid(entry_id))
@router.post(
"/{pack_id}/entries/{entry_id}/flashcards",
response_model=FlashcardTemplateResponse,
status_code=201,
)
async def add_flashcard_template(
pack_id: str,
entry_id: str,
request: AddFlashcardTemplateRequest,
db: AsyncSession = Depends(get_db),
_: dict = Depends(require_admin),
) -> FlashcardTemplateResponse:
template = await _service(db).add_flashcard_template_to_entry(
pack_entry_id=_parse_uuid(entry_id),
card_direction=request.card_direction,
prompt_text=request.prompt_text,
answer_text=request.answer_text,
prompt_context_text=request.prompt_context_text,
answer_context_text=request.answer_context_text,
)
return _to_template_response(template)
@router.delete("/{pack_id}/entries/{entry_id}/flashcards/{template_id}", status_code=204)
async def remove_flashcard_template(
pack_id: str,
entry_id: str,
template_id: str,
db: AsyncSession = Depends(get_db),
_: dict = Depends(require_admin),
) -> None:
await _pack_repo(db).remove_flashcard_template(_parse_uuid(template_id))
# ── Helpers ───────────────────────────────────────────────────────────────────
def _parse_uuid(value: str) -> uuid.UUID:
try:
return uuid.UUID(value)
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid UUID: {value!r}")
def _to_pack_response(pack) -> PackResponse:
return PackResponse(
id=pack.id,
name=pack.name,
name_target=pack.name_target,
description=pack.description,
description_target=pack.description_target,
source_lang=pack.source_lang,
target_lang=pack.target_lang,
proficiencies=pack.proficiencies,
is_published=pack.is_published,
created_at=pack.created_at.isoformat(),
)
def _to_template_response(template) -> FlashcardTemplateResponse:
return FlashcardTemplateResponse(
id=template.id,
pack_entry_id=template.pack_entry_id,
card_direction=template.card_direction,
prompt_text=template.prompt_text,
answer_text=template.answer_text,
prompt_context_text=template.prompt_context_text,
answer_context_text=template.answer_context_text,
created_at=template.created_at.isoformat(),
)

View file

@ -72,7 +72,7 @@ async def login(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or password" status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or password"
) )
return TokenResponse(access_token=create_access_token(account.id, account.email)) return TokenResponse(access_token=create_access_token(account.id, account.email, account.is_admin))
@router.get("/verify-email") @router.get("/verify-email")

View file

@ -0,0 +1,99 @@
import uuid
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from ...auth import verify_token
from ...outbound.postgres.database import get_db
from ...outbound.postgres.repositories.dictionary_repository import PostgresDictionaryRepository
router = APIRouter(prefix="/dictionary", tags=["dictionary"])
# ── Response models ───────────────────────────────────────────────────────────
class SenseResponse(BaseModel):
id: str
sense_index: int
gloss: str
topics: list[str]
tags: list[str]
class LemmaResponse(BaseModel):
id: str
headword: str
language: str
pos_raw: str
pos_normalised: str | None
gender: str | None
tags: list[str]
class WordformMatch(BaseModel):
lemma: LemmaResponse
senses: list[SenseResponse]
# ── Endpoint ──────────────────────────────────────────────────────────────────
@router.get("/wordforms", response_model=list[WordformMatch])
async def search_wordforms(
lang_code: str,
text: str,
db: AsyncSession = Depends(get_db),
_: dict = Depends(verify_token),
) -> list[WordformMatch]:
"""
Search for a wordform by surface text within a language.
Returns one entry per matching lemma, each with the lemma's senses. A single
form (e.g. "allons") may resolve to more than one lemma when homographs exist.
"""
repo = PostgresDictionaryRepository(db)
wordforms = await repo.get_wordforms_by_form(text, lang_code)
if not wordforms:
return []
# Deduplicate lemma IDs — multiple wordform rows may point to the same lemma
seen_lemma_ids: set[str] = set()
results: list[WordformMatch] = []
for wf in wordforms:
if wf.lemma_id in seen_lemma_ids:
continue
seen_lemma_ids.add(wf.lemma_id)
lemma = await repo.get_lemma(uuid.UUID(wf.lemma_id))
if lemma is None:
continue
senses = await repo.get_senses_for_lemma(uuid.UUID(wf.lemma_id))
results.append(
WordformMatch(
lemma=LemmaResponse(
id=lemma.id,
headword=lemma.headword,
language=lemma.language,
pos_raw=lemma.pos_raw,
pos_normalised=lemma.pos_normalised,
gender=lemma.gender,
tags=lemma.tags,
),
senses=[
SenseResponse(
id=s.id,
sense_index=s.sense_index,
gloss=s.gloss,
topics=s.topics,
tags=s.tags,
)
for s in senses
],
)
)
return results

View file

@ -1,5 +1,6 @@
from .account import router as account_router from .account import router as account_router
from .auth import router as auth_router from .auth import router as auth_router
from .dictionary import router as dictionary_router
from .flashcards import router as flashcards_router from .flashcards import router as flashcards_router
from .pos import router as pos_router from .pos import router as pos_router
from .translate import router as translate_router from .translate import router as translate_router
@ -7,6 +8,8 @@ from .generation import router as generation_router
from .jobs import router as jobs_router from .jobs import router as jobs_router
from .learnable_languages import router as learnable_languages_router from .learnable_languages import router as learnable_languages_router
from .vocab import router as vocab_router from .vocab import router as vocab_router
from .packs import router as packs_router
from .admin.packs import router as admin_packs_router
from fastapi import APIRouter from fastapi import APIRouter
@ -14,6 +17,7 @@ api_router = APIRouter(prefix="/api", tags=["api"])
api_router.include_router(auth_router) api_router.include_router(auth_router)
api_router.include_router(account_router) api_router.include_router(account_router)
api_router.include_router(dictionary_router)
api_router.include_router(flashcards_router) api_router.include_router(flashcards_router)
api_router.include_router(pos_router) api_router.include_router(pos_router)
api_router.include_router(translate_router) api_router.include_router(translate_router)
@ -21,3 +25,5 @@ api_router.include_router(generation_router)
api_router.include_router(jobs_router) api_router.include_router(jobs_router)
api_router.include_router(learnable_languages_router) api_router.include_router(learnable_languages_router)
api_router.include_router(vocab_router) api_router.include_router(vocab_router)
api_router.include_router(packs_router)
api_router.include_router(admin_packs_router)

View file

@ -0,0 +1,151 @@
import uuid
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from ...auth import verify_token
from ...domain.services.pack_service import PackService, PackNotFoundError, DuplicateEntryError
from ...outbound.postgres.database import get_db
from ...outbound.postgres.repositories.pack_repository import PostgresPackRepository
from ...outbound.postgres.repositories.vocab_repository import PostgresVocabRepository
from ...outbound.postgres.repositories.flashcard_repository import PostgresFlashcardRepository
from ...outbound.postgres.repositories.dictionary_repository import PostgresDictionaryRepository
router = APIRouter(prefix="/packs", tags=["packs"])
# ── Response models ───────────────────────────────────────────────────────────
class PackSummaryResponse(BaseModel):
id: str
name: str
name_target: str
description: str
description_target: str
source_lang: str
target_lang: str
proficiencies: list[str]
entry_count: int
class PackDetailResponse(PackSummaryResponse):
surface_texts: list[str]
class AddTobankRequest(BaseModel):
source_lang: str
target_lang: str
class AddTobankResponse(BaseModel):
added: list[str]
# ── Dependency ────────────────────────────────────────────────────────────────
def _service(db: AsyncSession) -> PackService:
return PackService(
pack_repo=PostgresPackRepository(db),
vocab_repo=PostgresVocabRepository(db),
flashcard_repo=PostgresFlashcardRepository(db),
dict_repo=PostgresDictionaryRepository(db),
)
def _pack_repo(db: AsyncSession) -> PostgresPackRepository:
return PostgresPackRepository(db)
# ── Endpoints ─────────────────────────────────────────────────────────────────
@router.get("", response_model=list[PackSummaryResponse])
async def list_packs(
source_lang: str | None = None,
target_lang: str | None = None,
db: AsyncSession = Depends(get_db),
_: dict = Depends(verify_token),
) -> list[PackSummaryResponse]:
repo = _pack_repo(db)
packs = await repo.list_packs(
source_lang=source_lang, target_lang=target_lang, published_only=True
)
responses = []
for pack in packs:
count = await repo.count_entries_for_pack(uuid.UUID(pack.id))
responses.append(
PackSummaryResponse(
id=pack.id,
name=pack.name,
name_target=pack.name_target,
description=pack.description,
description_target=pack.description_target,
source_lang=pack.source_lang,
target_lang=pack.target_lang,
proficiencies=pack.proficiencies,
entry_count=count,
)
)
return responses
@router.get("/{pack_id}", response_model=PackDetailResponse)
async def get_pack(
pack_id: str,
db: AsyncSession = Depends(get_db),
_: dict = Depends(verify_token),
) -> PackDetailResponse:
repo = _pack_repo(db)
pack = await repo.get_pack(_parse_uuid(pack_id))
if pack is None or not pack.is_published:
raise HTTPException(status_code=404, detail="Pack not found")
entries = await repo.get_entries_for_pack(uuid.UUID(pack.id))
count = len(entries)
surface_texts = [e.surface_text for e in entries]
return PackDetailResponse(
id=pack.id,
name=pack.name,
name_target=pack.name_target,
description=pack.description,
description_target=pack.description_target,
source_lang=pack.source_lang,
target_lang=pack.target_lang,
proficiencies=pack.proficiencies,
entry_count=count,
surface_texts=surface_texts,
)
@router.post("/{pack_id}/add-to-bank", response_model=AddTobankResponse, status_code=201)
async def add_pack_to_bank(
pack_id: str,
request: AddTobankRequest,
db: AsyncSession = Depends(get_db),
token_data: dict = Depends(verify_token),
) -> AddTobankResponse:
user_id = uuid.UUID(token_data["sub"])
try:
result = await _service(db).add_pack_to_user_bank(
pack_id=_parse_uuid(pack_id),
user_id=user_id,
source_lang=request.source_lang,
target_lang=request.target_lang,
)
except PackNotFoundError:
raise HTTPException(status_code=404, detail="Pack not found")
except DuplicateEntryError as exc:
raise HTTPException(status_code=409, detail=str(exc))
return AddTobankResponse(added=result.added_surface_texts)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _parse_uuid(value: str) -> uuid.UUID:
try:
return uuid.UUID(value)
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid UUID: {value!r}")

View file

@ -1,6 +1,7 @@
from .account import router as account_router from .account import router as account_router
from .articles import router as article_router from .articles import router as article_router
from .user_profile import router as user_profile_router from .user_profile import router as user_profile_router
from .packs import router as packs_router
from fastapi import APIRouter from fastapi import APIRouter
@ -9,3 +10,4 @@ bff_router = APIRouter(prefix="/bff", tags=["bff"])
bff_router.include_router(account_router) bff_router.include_router(account_router)
bff_router.include_router(article_router) bff_router.include_router(article_router)
bff_router.include_router(user_profile_router) bff_router.include_router(user_profile_router)
bff_router.include_router(packs_router)

View file

@ -0,0 +1,78 @@
import uuid
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from ...auth import verify_token
from ...outbound.postgres.database import get_db
from ...outbound.postgres.entities.vocab_entities import UserLanguagePairEntity
from ...outbound.postgres.repositories.pack_repository import PostgresPackRepository
router = APIRouter(prefix="/packs", tags=["bff-packs"])
class PackSelectionItem(BaseModel):
id: str
name: str
name_target: str
description: str
description_target: str
source_lang: str
target_lang: str
proficiencies: list[str]
entry_count: int
already_added: bool
@router.get("", response_model=list[PackSelectionItem])
async def list_packs_for_selection(
source_lang: str,
target_lang: str,
db: AsyncSession = Depends(get_db),
token_data: dict = Depends(verify_token),
) -> list[PackSelectionItem]:
user_id = uuid.UUID(token_data["sub"])
pack_repo = PostgresPackRepository(db)
packs = await pack_repo.list_packs(
source_lang=source_lang,
target_lang=target_lang,
published_only=True,
)
# The UserLanguagePair may not exist yet for brand-new users
result = await db.execute(
select(UserLanguagePairEntity).where(
UserLanguagePairEntity.user_id == user_id,
UserLanguagePairEntity.source_lang == source_lang,
UserLanguagePairEntity.target_lang == target_lang,
)
)
pair_entity = result.scalar_one_or_none()
already_added_ids: set[str] = set()
if pair_entity is not None:
already_added_ids = await pack_repo.get_pack_ids_added_by_user(
user_id, pair_entity.id
)
items = []
for pack in packs:
count = await pack_repo.count_entries_for_pack(uuid.UUID(pack.id))
items.append(
PackSelectionItem(
id=pack.id,
name=pack.name,
name_target=pack.name_target,
description=pack.description,
description_target=pack.description_target,
source_lang=pack.source_lang,
target_lang=pack.target_lang,
proficiencies=pack.proficiencies,
entry_count=count,
already_added=pack.id in already_added_ids,
)
)
return items

View file

@ -26,3 +26,10 @@ build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel] [tool.hatch.build.targets.wheel]
packages = ["app"] packages = ["app"]
[dependency-groups]
dev = [
"httpx>=0.28.1",
"pytest>=9.0.3",
"pytest-asyncio>=1.3.0",
]

423
tests/test_packs.py Normal file
View file

@ -0,0 +1,423 @@
"""
End-to-end tests for the Word Bank Pack feature.
Admin endpoints require a user whose email matches ADMIN_USER_EMAILS (admin@test.com).
User endpoints require any authenticated user.
"""
import uuid
import httpx
import pytest
ADMIN_EMAIL = "admin@test.com"
ADMIN_PASSWORD = "adminpassword123"
USER_EMAIL = "packuser@example.com"
USER_PASSWORD = "userpassword123"
# ── Auth helpers ──────────────────────────────────────────────────────────────
def _register_and_login(client: httpx.Client, email: str, password: str) -> str:
"""Return a Bearer token for the given credentials, registering first if needed."""
client.post("/auth/register", json={"email": email, "password": password})
resp = client.post("/auth/login", json={"email": email, "password": password})
return resp.json()["access_token"]
@pytest.fixture
def admin_client(client: httpx.Client) -> httpx.Client:
token = _register_and_login(client, ADMIN_EMAIL, ADMIN_PASSWORD)
client.headers["Authorization"] = f"Bearer {token}"
return client
@pytest.fixture
def user_client(client: httpx.Client) -> httpx.Client:
token = _register_and_login(client, USER_EMAIL, USER_PASSWORD)
client.headers["Authorization"] = f"Bearer {token}"
return client
@pytest.fixture
def unauthed_client(client: httpx.Client) -> httpx.Client:
return client
# ── Admin: create / list / update / publish ───────────────────────────────────
def test_admin_creates_pack(admin_client: httpx.Client):
resp = admin_client.post(
"/api/admin/packs",
json={
"name": "Food & Drink",
"name_target": "La Nourriture et les Boissons",
"description": "Common food and drink vocabulary.",
"description_target": "Vocabulaire courant de nourriture et de boissons.",
"source_lang": "en",
"target_lang": "fr",
"proficiencies": ["A1", "A2"],
},
)
assert resp.status_code == 201
body = resp.json()
assert body["name"] == "Food & Drink"
assert body["is_published"] is False
assert body["proficiencies"] == ["A1", "A2"]
assert "id" in body
def test_non_admin_cannot_create_pack(user_client: httpx.Client):
resp = user_client.post(
"/api/admin/packs",
json={
"name": "Sneaky Pack",
"name_target": "Pack Sournois",
"description": "d",
"description_target": "d",
"source_lang": "en",
"target_lang": "fr",
"proficiencies": [],
},
)
assert resp.status_code == 403
def test_admin_lists_packs_including_unpublished(admin_client: httpx.Client):
admin_client.post(
"/api/admin/packs",
json={
"name": f"Draft Pack {uuid.uuid4()}",
"name_target": "Paquet Brouillon",
"description": "d",
"description_target": "d",
"source_lang": "en",
"target_lang": "fr",
"proficiencies": [],
},
)
resp = admin_client.get("/api/admin/packs")
assert resp.status_code == 200
packs = resp.json()
assert isinstance(packs, list)
assert len(packs) >= 1
def test_admin_updates_pack(admin_client: httpx.Client):
create_resp = admin_client.post(
"/api/admin/packs",
json={
"name": "Original Name",
"name_target": "Nom Original",
"description": "d",
"description_target": "d",
"source_lang": "en",
"target_lang": "fr",
"proficiencies": ["A1"],
},
)
pack_id = create_resp.json()["id"]
resp = admin_client.patch(
f"/api/admin/packs/{pack_id}",
json={"name": "Updated Name", "proficiencies": ["A1", "A2"]},
)
assert resp.status_code == 200
assert resp.json()["name"] == "Updated Name"
assert resp.json()["proficiencies"] == ["A1", "A2"]
def test_admin_publishes_pack(admin_client: httpx.Client):
create_resp = admin_client.post(
"/api/admin/packs",
json={
"name": "Soon Published",
"name_target": "Bientôt Publié",
"description": "d",
"description_target": "d",
"source_lang": "en",
"target_lang": "fr",
"proficiencies": [],
},
)
pack_id = create_resp.json()["id"]
resp = admin_client.post(f"/api/admin/packs/{pack_id}/publish")
assert resp.status_code == 200
assert resp.json()["is_published"] is True
# ── Admin: entries and flashcard templates ────────────────────────────────────
def _create_published_pack(admin_client: httpx.Client) -> str:
resp = admin_client.post(
"/api/admin/packs",
json={
"name": f"Test Pack {uuid.uuid4()}",
"name_target": "Paquet Test",
"description": "d",
"description_target": "d",
"source_lang": "en",
"target_lang": "fr",
"proficiencies": ["A1"],
},
)
pack_id = resp.json()["id"]
admin_client.post(f"/api/admin/packs/{pack_id}/publish")
return pack_id
def test_admin_adds_entry_to_pack(admin_client: httpx.Client):
pack_id = _create_published_pack(admin_client)
resp = admin_client.post(
f"/api/admin/packs/{pack_id}/entries",
json={"surface_text": "bonjour"},
)
assert resp.status_code == 201
assert resp.json()["surface_text"] == "bonjour"
assert resp.json()["pack_id"] == pack_id
def test_admin_adds_flashcard_template_to_entry(admin_client: httpx.Client):
pack_id = _create_published_pack(admin_client)
entry_resp = admin_client.post(
f"/api/admin/packs/{pack_id}/entries",
json={"surface_text": "aller"},
)
entry_id = entry_resp.json()["id"]
resp = admin_client.post(
f"/api/admin/packs/{pack_id}/entries/{entry_id}/flashcards",
json={
"card_direction": "target_to_source",
"prompt_text": "aller",
"answer_text": "to go",
"prompt_context_text": "il veut [aller] au cinéma",
"answer_context_text": "he wants [to go] to the cinema",
},
)
assert resp.status_code == 201
body = resp.json()
assert body["card_direction"] == "target_to_source"
assert body["prompt_context_text"] == "il veut [aller] au cinéma"
def test_admin_get_pack_detail_includes_entries_and_templates(admin_client: httpx.Client):
pack_id = _create_published_pack(admin_client)
entry_resp = admin_client.post(
f"/api/admin/packs/{pack_id}/entries",
json={"surface_text": "maison"},
)
entry_id = entry_resp.json()["id"]
admin_client.post(
f"/api/admin/packs/{pack_id}/entries/{entry_id}/flashcards",
json={
"card_direction": "source_to_target",
"prompt_text": "house",
"answer_text": "maison",
},
)
resp = admin_client.get(f"/api/admin/packs/{pack_id}")
assert resp.status_code == 200
body = resp.json()
assert len(body["entries"]) == 1
assert body["entries"][0]["surface_text"] == "maison"
assert len(body["entries"][0]["flashcard_templates"]) == 1
def test_admin_removes_entry_from_pack(admin_client: httpx.Client):
pack_id = _create_published_pack(admin_client)
entry_resp = admin_client.post(
f"/api/admin/packs/{pack_id}/entries",
json={"surface_text": "chat"},
)
entry_id = entry_resp.json()["id"]
del_resp = admin_client.delete(f"/api/admin/packs/{pack_id}/entries/{entry_id}")
assert del_resp.status_code == 204
detail = admin_client.get(f"/api/admin/packs/{pack_id}")
assert all(e["id"] != entry_id for e in detail.json()["entries"])
# ── User: browse published packs ──────────────────────────────────────────────
def test_user_only_sees_published_packs(admin_client: httpx.Client, user_client: httpx.Client):
# Create and leave unpublished
admin_client.post(
"/api/admin/packs",
json={
"name": f"Hidden {uuid.uuid4()}",
"name_target": "Caché",
"description": "d",
"description_target": "d",
"source_lang": "en",
"target_lang": "fr",
"proficiencies": [],
},
)
# Create and publish
create_resp = admin_client.post(
"/api/admin/packs",
json={
"name": f"Visible {uuid.uuid4()}",
"name_target": "Visible",
"description": "d",
"description_target": "d",
"source_lang": "en",
"target_lang": "fr",
"proficiencies": [],
},
)
visible_id = create_resp.json()["id"]
admin_client.post(f"/api/admin/packs/{visible_id}/publish")
resp = user_client.get("/api/packs", params={"source_lang": "en", "target_lang": "fr"})
assert resp.status_code == 200
ids = [p["id"] for p in resp.json()]
assert visible_id in ids
def test_user_cannot_see_unpublished_pack_by_id(
admin_client: httpx.Client, user_client: httpx.Client
):
create_resp = admin_client.post(
"/api/admin/packs",
json={
"name": "Secret Draft",
"name_target": "Brouillon Secret",
"description": "d",
"description_target": "d",
"source_lang": "en",
"target_lang": "fr",
"proficiencies": [],
},
)
pack_id = create_resp.json()["id"]
resp = user_client.get(f"/api/packs/{pack_id}")
assert resp.status_code == 404
def test_user_sees_surface_texts_in_pack_detail(
admin_client: httpx.Client, user_client: httpx.Client
):
pack_id = _create_published_pack(admin_client)
admin_client.post(
f"/api/admin/packs/{pack_id}/entries", json={"surface_text": "chat"}
)
admin_client.post(
f"/api/admin/packs/{pack_id}/entries", json={"surface_text": "chien"}
)
resp = user_client.get(f"/api/packs/{pack_id}")
assert resp.status_code == 200
body = resp.json()
assert body["entry_count"] == 2
assert set(body["surface_texts"]) == {"chat", "chien"}
# ── User: add pack to bank ────────────────────────────────────────────────────
def _setup_fresh_user(client: httpx.Client) -> None:
"""Register and log in as a fresh user (sets Authorization header on client)."""
email = f"packtest-{uuid.uuid4()}@example.com"
client.post("/auth/register", json={"email": email, "password": "password123"})
token_resp = client.post("/auth/login", json={"email": email, "password": "password123"})
client.headers["Authorization"] = f"Bearer {token_resp.json()['access_token']}"
def test_add_pack_to_bank_creates_bank_entries(
admin_client: httpx.Client, client: httpx.Client
):
pack_id = _create_published_pack(admin_client)
admin_client.post(f"/api/admin/packs/{pack_id}/entries", json={"surface_text": "bonjour"})
admin_client.post(f"/api/admin/packs/{pack_id}/entries", json={"surface_text": "merci"})
_setup_fresh_user(client)
resp = client.post(
f"/api/packs/{pack_id}/add-to-bank",
json={"source_lang": "en", "target_lang": "fr"},
)
assert resp.status_code == 201
body = resp.json()
assert set(body["added"]) == {"bonjour", "merci"}
def test_add_unpublished_pack_to_bank_returns_404(
admin_client: httpx.Client, client: httpx.Client
):
create_resp = admin_client.post(
"/api/admin/packs",
json={
"name": "Draft Only",
"name_target": "Brouillon Seulement",
"description": "d",
"description_target": "d",
"source_lang": "en",
"target_lang": "fr",
"proficiencies": [],
},
)
pack_id = create_resp.json()["id"]
_setup_fresh_user(client)
resp = client.post(
f"/api/packs/{pack_id}/add-to-bank",
json={"source_lang": "en", "target_lang": "fr"},
)
assert resp.status_code == 404
def test_add_pack_duplicate_plain_card_returns_409(
admin_client: httpx.Client, client: httpx.Client
):
"""Adding a pack whose plain-card entry the user already has returns 409."""
pack_id = _create_published_pack(admin_client)
admin_client.post(f"/api/admin/packs/{pack_id}/entries", json={"surface_text": "maison"})
_setup_fresh_user(client)
# Add the pack once — succeeds
client.post(
f"/api/packs/{pack_id}/add-to-bank",
json={"source_lang": "en", "target_lang": "fr"},
)
# Add it again — same plain card, should 409
resp = client.post(
f"/api/packs/{pack_id}/add-to-bank",
json={"source_lang": "en", "target_lang": "fr"},
)
assert resp.status_code == 409
assert "maison" in resp.json()["detail"]
# ── BFF: pack selection screen ────────────────────────────────────────────────
def test_bff_packs_shows_already_added_flag(
admin_client: httpx.Client, client: httpx.Client
):
pack_id = _create_published_pack(admin_client)
admin_client.post(f"/api/admin/packs/{pack_id}/entries", json={"surface_text": "eau"})
_setup_fresh_user(client)
# Before adding
resp = client.get("/bff/packs", params={"source_lang": "en", "target_lang": "fr"})
assert resp.status_code == 200
pack_item = next((p for p in resp.json() if p["id"] == pack_id), None)
assert pack_item is not None
assert pack_item["already_added"] is False
# Add the pack
client.post(
f"/api/packs/{pack_id}/add-to-bank",
json={"source_lang": "en", "target_lang": "fr"},
)
# After adding
resp = client.get("/bff/packs", params={"source_lang": "en", "target_lang": "fr"})
pack_item = next((p for p in resp.json() if p["id"] == pack_id), None)
assert pack_item["already_added"] is True