#!/usr/bin/env python
"""
CLI import script for kaikki/wiktextract JSONL dictionary data.

Usage (from api/ directory):
    uv run ./scripts/import_dictionary.py --lang fr

    # or via Make from the repo root:
    make import-dictionary lang=fr

DATABASE_URL defaults to
postgresql+asyncpg://langlearn:changeme@localhost:5432/langlearn
(the fallback hard-coded in run_import). It is intended to match the
docker-compose dev credentials when the DB port is exposed on the host.
"""

import argparse
import asyncio
import json
import os
import sys
import uuid
from pathlib import Path

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import ARRAY, JSONB
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

# Paths are derived from this file's location: <repo>/api/scripts/<this file>.
_API_DIR = Path(__file__).parent.parent
_REPO_ROOT = _API_DIR.parent
_DICT_DIR = _REPO_ROOT / "dictionaries" / "kaikki"

# Language code -> JSONL file name inside _DICT_DIR.
_LANG_FILE_MAP: dict[str, str] = {
    "fr": "french.jsonl",
}

# Lower-cased raw part-of-speech label -> normalised upper-case tag.
# NOTE(review): the values look like Universal Dependencies UPOS tags — confirm.
_POS_MAP: dict[str, str] = {
    "noun": "NOUN",
    "verb": "VERB",
    "adj": "ADJ",
    "adv": "ADV",
    "det": "DET",
    "article": "DET",
    "pron": "PRON",
    "prep": "ADP",
    "adp": "ADP",
    "conj": "CCONJ",
    "cconj": "CCONJ",
    "sconj": "SCONJ",
    "intj": "INTJ",
    "num": "NUM",
    "numeral": "NUM",
    "part": "PART",
    "particle": "PART",
    "name": "PROPN",
    "propn": "PROPN",
    "proper noun": "PROPN",
    "punct": "PUNCT",
    "sym": "SYM",
}

# Lower-cased gender label or abbreviation -> canonical gender name.
_GENDER_MAP: dict[str, str] = {
    "f": "feminine",
    "m": "masculine",
    "masculine": "masculine",
    "masc": "masculine",
    "feminine": "feminine",
    "fem": "feminine",
    "neuter": "neuter",
    "common": "common",
}

# ---------------------------------------------------------------------------
# Standalone table definitions — no app imports, no Settings() call
# ---------------------------------------------------------------------------

_meta = sa.MetaData()

_lemma_table = sa.Table(
    "dictionary_lemma",
    _meta,
    sa.Column("id", PG_UUID(as_uuid=True), primary_key=True),
    sa.Column("headword", sa.Text(), nullable=False),
    sa.Column("language", sa.String(2), nullable=False),
    sa.Column("pos_raw", sa.Text(), nullable=False),
    sa.Column("pos_normalised", sa.Text(), nullable=True),
    sa.Column("gender", sa.Text(), nullable=True),
    sa.Column("tags", ARRAY(sa.Text()), nullable=False),
)

_sense_table = sa.Table(
    "dictionary_sense",
    _meta,
    sa.Column("id", PG_UUID(as_uuid=True), primary_key=True),
    sa.Column("lemma_id", PG_UUID(as_uuid=True), nullable=False),
    sa.Column("sense_index", sa.Integer(), nullable=False),
    sa.Column("gloss", sa.Text(), nullable=False),
    sa.Column("topics", ARRAY(sa.Text()), nullable=False),
    sa.Column("tags", ARRAY(sa.Text()), nullable=False),
)

_wordform_table = sa.Table(
    "dictionary_wordform",
    _meta,
    sa.Column("id", PG_UUID(as_uuid=True), primary_key=True),
    sa.Column("lemma_id", PG_UUID(as_uuid=True), nullable=False),
    sa.Column("form", sa.Text(), nullable=False),
    sa.Column("tags", ARRAY(sa.Text()), nullable=False),
)

_raw_table = sa.Table(
    "dictionary_lemma_raw",
    _meta,
    sa.Column("id", PG_UUID(as_uuid=True), primary_key=True),
    sa.Column("lemma_id", PG_UUID(as_uuid=True), nullable=False),
    sa.Column("language", sa.String(2), nullable=False),
    sa.Column("raw", JSONB(), nullable=False),
)

_sense_link_table = sa.Table(
    "dictionary_sense_link",
    _meta,
    sa.Column("id", PG_UUID(as_uuid=True), primary_key=True),
    sa.Column("sense_id", PG_UUID(as_uuid=True), nullable=False),
    sa.Column("link_text", sa.Text(), nullable=False),
    sa.Column("link_target", sa.Text(), nullable=False),
    sa.Column("target_lemma_id", PG_UUID(as_uuid=True), nullable=True),
)


# ---------------------------------------------------------------------------
# Normalisation helpers
# ---------------------------------------------------------------------------


def _normalise_pos(pos_raw: str) -> str | None:
    """Return the normalised tag for a raw part-of-speech label.

    The label is lower-cased and stripped before the ``_POS_MAP`` lookup.
    Returns None when the label has no mapping.
    """
    return _POS_MAP.get(pos_raw.lower().strip())


def _normalise_gender(value: str | None) -> str | None:
    """Return the canonical gender name for a raw gender label.

    The label is lower-cased and stripped before the ``_GENDER_MAP`` lookup,
    mirroring ``_normalise_pos``, so inputs such as ``"Masc"`` or ``" f "``
    resolve. Returns None for None or for a label that has no mapping.
    """
    if value is None:
        return None
    return _GENDER_MAP.get(value.lower().strip())


# ---------------------------------------------------------------------------
# Parsing
# ---------------------------------------------------------------------------
def _parse_entry(record: dict, lang_code: str) -> dict | None:
    """Parse one kaikki JSONL record into insertion-ready row dicts.

    Args:
        record: One decoded JSONL line. Anything that is not a JSON object
            (``json.loads`` may also yield lists, strings or numbers) is
            treated as an entry to skip instead of raising.
        lang_code: Only records whose ``"lang_code"`` equals this value are
            parsed.

    Returns:
        None if the entry should be skipped (not an object, wrong language,
        or empty ``"word"``). Otherwise a dict with the keys ``"lemma"`` and
        ``"raw"`` (one row dict each) and ``"senses"``, ``"sense_links"`` and
        ``"wordforms"`` (lists of row dicts). All rows share freshly
        generated UUIDs that tie them to the lemma.
    """
    if not isinstance(record, dict):
        return None
    if record.get("lang_code") != lang_code:
        return None

    word = (record.get("word") or "").strip()
    if not word:
        return None

    pos_raw = (record.get("pos") or "").strip()
    lemma_id = uuid.uuid4()

    gender_tags = frozenset({"masculine", "feminine", "neuter"})
    gender: str | None = None
    senses = []
    sense_links = []

    for i, sense_record in enumerate(record.get("senses") or []):
        if not isinstance(sense_record, dict):
            # Malformed sense entry: skip it. sense_index stays the position
            # in the source list, so later senses keep their original index.
            continue

        sense_id = uuid.uuid4()
        glosses = sense_record.get("glosses") or []
        gloss = glosses[0] if glosses else ""
        topics = sense_record.get("topics") or []
        sense_tags = sense_record.get("tags") or []

        # The lemma's gender is the first gender tag found on any sense.
        if gender is None:
            gender = next((tag for tag in sense_tags if tag in gender_tags), None)

        senses.append(
            {
                "id": sense_id,
                "lemma_id": lemma_id,
                "sense_index": i,
                "gloss": gloss,
                "topics": topics,
                "tags": sense_tags,
            }
        )

        # Links are [text, target] pairs; anything else is ignored.
        for link_pair in sense_record.get("links") or []:
            if isinstance(link_pair, list) and len(link_pair) == 2:
                sense_links.append(
                    {
                        "id": uuid.uuid4(),
                        "sense_id": sense_id,
                        "link_text": link_pair[0],
                        "link_target": link_pair[1],
                        # Filled in after the import, once all lemmas exist.
                        "target_lemma_id": None,
                    }
                )

    wordforms = []
    for form_record in record.get("forms") or []:
        if not isinstance(form_record, dict):
            continue
        form_text = (form_record.get("form") or "").strip()
        # Skip empty forms and forms identical to the headword itself.
        if not form_text or form_text == word:
            continue
        wordforms.append(
            {
                "id": uuid.uuid4(),
                "lemma_id": lemma_id,
                "form": form_text,
                "tags": form_record.get("tags") or [],
            }
        )

    return {
        "lemma": {
            "id": lemma_id,
            "headword": word,
            "language": lang_code,
            "pos_raw": pos_raw,
            "pos_normalised": _normalise_pos(pos_raw),
            "gender": gender,
            "tags": record.get("tags") or [],
        },
        "senses": senses,
        "sense_links": sense_links,
        "wordforms": wordforms,
        "raw": {
            "id": uuid.uuid4(),
            "lemma_id": lemma_id,
            "language": lang_code,
            # The untouched source record is kept alongside the parsed rows.
            "raw": record,
        },
    }


# ---------------------------------------------------------------------------
# DB operations
# ---------------------------------------------------------------------------
async def _flush_batch(
    conn: sa.ext.asyncio.AsyncConnection, batch: list[dict]
) -> None:
    """Insert one batch of parsed entries and commit.

    Args:
        conn: Open async connection used for the inserts and the commit.
        batch: Entries as returned by ``_parse_entry``.
    """
    # Lemmas go first, then senses, then sense links.
    # NOTE(review): this order presumably satisfies foreign keys in the real
    # schema; the table definitions in this script declare none — confirm.
    inserts = (
        (_lemma_table, [e["lemma"] for e in batch]),
        (_sense_table, [s for e in batch for s in e["senses"]]),
        (_sense_link_table, [lnk for e in batch for lnk in e["sense_links"]]),
        (_wordform_table, [w for e in batch for w in e["wordforms"]]),
        (_raw_table, [e["raw"] for e in batch]),
    )
    for table, rows in inserts:
        if rows:
            await conn.execute(table.insert(), rows)

    await conn.commit()


# Language code -> section name used after '#' in kaikki link targets.
_LANG_SECTION_MAP: dict[str, str] = {
    "fr": "French",
    "de": "German",
    "es": "Spanish",
    "it": "Italian",
    "pt": "Portuguese",
}

# Upper bound on values per IN (...) lookup and on rows per executemany call.
# Each IN value becomes one bind parameter, and asyncpg rejects statements
# with more than 32767 query arguments, so the lookup must be chunked.
_RESOLVE_CHUNK_SIZE = 10_000


async def _resolve_links(
    conn: sa.ext.asyncio.AsyncConnection, lang_code: str
) -> int:
    """Resolve target_lemma_id for sense links whose target matches lang_code.

    Links in kaikki data look like ``["maboul", "maboul#French"]``. After all
    lemmas have been imported we can attempt to match the target word to a
    row in dictionary_lemma and store the foreign key.

    Args:
        conn: Open async connection; the updates are committed on it.
        lang_code: Language whose ``#Section`` suffix selects the links.

    Returns:
        Number of sense links that were given a target_lemma_id. 0 when the
        language has no entry in ``_LANG_SECTION_MAP`` or nothing matches.
    """
    section = _LANG_SECTION_MAP.get(lang_code)
    if not section:
        return 0

    suffix = f"#{section}"

    # Filter on the suffix in SQL so unresolved links that point at other
    # languages are never loaded into memory.
    result = await conn.execute(
        sa.select(
            _sense_link_table.c.id,
            _sense_link_table.c.link_target,
        )
        .where(_sense_link_table.c.target_lemma_id.is_(None))
        .where(_sense_link_table.c.link_target.endswith(suffix, autoescape=True))
    )
    candidates: list[tuple[uuid.UUID, str]] = [
        (row.id, row.link_target.removesuffix(suffix)) for row in result
    ]
    if not candidates:
        return 0

    # Look the headwords up in bounded chunks (see _RESOLVE_CHUNK_SIZE).
    target_words = sorted({word for _, word in candidates})
    lemma_map: dict[str, uuid.UUID] = {}
    for start in range(0, len(target_words), _RESOLVE_CHUNK_SIZE):
        chunk = target_words[start : start + _RESOLVE_CHUNK_SIZE]
        lemma_result = await conn.execute(
            sa.select(_lemma_table.c.id, _lemma_table.c.headword)
            .where(_lemma_table.c.language == lang_code)
            .where(_lemma_table.c.headword.in_(chunk))
        )
        # NOTE(review): when several lemmas share a headword (e.g. different
        # parts of speech) the last row returned wins; there is no ORDER BY,
        # so which lemma is linked is not deterministic.
        lemma_map.update((r.headword, r.id) for r in lemma_result)

    updates = [
        {"b_link_id": link_id, "b_target_lemma_id": lemma_map[word]}
        for link_id, word in candidates
        if word in lemma_map
    ]
    if updates:
        # One parameterised UPDATE run as executemany, instead of building
        # and awaiting a separate statement per link.
        stmt = (
            _sense_link_table.update()
            .where(_sense_link_table.c.id == sa.bindparam("b_link_id"))
            .values(target_lemma_id=sa.bindparam("b_target_lemma_id"))
        )
        for start in range(0, len(updates), _RESOLVE_CHUNK_SIZE):
            await conn.execute(stmt, updates[start : start + _RESOLVE_CHUNK_SIZE])

    await conn.commit()
    return len(updates)


async def run_import(lang_code: str, batch_size: int = 1000) -> None:
    """Replace the dictionary data for one language from its JSONL file.

    Deletes the existing lemma rows for ``lang_code``, streams the JSONL file
    line by line, inserts parsed entries in batches of ``batch_size`` (one
    commit per batch), and finally resolves sense links.

    Args:
        lang_code: Language code; must be a key of ``_LANG_FILE_MAP``.
        batch_size: Number of parsed entries per insert batch and commit.

    Exits the process with status 1 (via ``sys.exit``) when the language has
    no file mapping or the JSONL file does not exist.
    """
    lang_file = _LANG_FILE_MAP.get(lang_code)
    if not lang_file:
        print(
            f"No file mapping for lang_code={lang_code!r}. Known: {list(_LANG_FILE_MAP)}",
            file=sys.stderr,
        )
        sys.exit(1)

    jsonl_path = _DICT_DIR / lang_file
    if not jsonl_path.exists():
        print(f"JSONL file not found: {jsonl_path}", file=sys.stderr)
        sys.exit(1)

    # Fallback connection URL used when DATABASE_URL is not set.
    database_url = os.environ.get(
        "DATABASE_URL",
        "postgresql+asyncpg://langlearn:changeme@localhost:5432/langlearn",
    )

    engine = create_async_engine(database_url, echo=False)

    try:
        async with engine.connect() as conn:
            print(f"Deleting existing entries for language={lang_code!r}...")
            # NOTE(review): only dictionary_lemma rows are deleted here; child
            # rows are presumably removed by ON DELETE CASCADE in the real
            # schema — confirm.
            await conn.execute(
                _lemma_table.delete().where(_lemma_table.c.language == lang_code)
            )
            await conn.commit()

            print(f"Importing {jsonl_path} ...")
            batch: list[dict] = []
            total_lemmas = 0
            skipped = 0

            with open(jsonl_path, encoding="utf-8") as f:
                for line_num, line in enumerate(f, 1):
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        record = json.loads(line)
                    except json.JSONDecodeError as exc:
                        # A bad line is reported and counted, not fatal.
                        print(
                            f" Line {line_num}: JSON parse error: {exc}",
                            file=sys.stderr,
                        )
                        skipped += 1
                        continue

                    parsed = _parse_entry(record, lang_code)
                    if parsed is None:
                        skipped += 1
                        continue

                    batch.append(parsed)
                    if len(batch) >= batch_size:
                        await _flush_batch(conn, batch)
                        total_lemmas += len(batch)
                        print(f" Committed {total_lemmas} lemmas...")
                        batch = []

            # Flush whatever is left over from the last partial batch.
            if batch:
                await _flush_batch(conn, batch)
                total_lemmas += len(batch)

            print(f"Done. Imported {total_lemmas} lemmas, skipped {skipped} lines.")

        # Link resolution runs on a fresh connection, after every lemma has
        # been committed.
        async with engine.connect() as conn:
            print("Resolving sense links...")
            resolved = await _resolve_links(conn, lang_code)
            print(f"Resolved {resolved} sense links.")
    finally:
        await engine.dispose()


def main() -> None:
    """Parse the command-line arguments and run the import."""
    parser = argparse.ArgumentParser(
        description="Import kaikki dictionary JSONL into Postgres."
    )
    parser.add_argument(
        "--lang", required=True, help="Language code to import (e.g. fr)"
    )
    parser.add_argument(
        "--batch-size", type=int, default=1000, help="Rows per commit (default: 1000)"
    )
    args = parser.parse_args()
    asyncio.run(run_import(args.lang, args.batch_size))


if __name__ == "__main__":
    main()