From 6c2a0f8eed4150edca1494c5909884ba129dae2c Mon Sep 17 00:00:00 2001 From: Sochen Date: Sun, 8 Mar 2026 11:08:33 +0000 Subject: [PATCH] chore: remove legacy scraping scripts replaced by unified pipeline Removed 11 files that are no longer called by the active pipeline: - hebrew_extract.py (replaced by pealim_list_scrape.py) - conjugation_extract.py (replaced by pealim_detail_scrape.py) - scripts/scrape_noun_plurals.py, scrape_verb_ktiv.py, scrape_ktiv_male.py (all replaced by pealim_detail_scrape.py) - scripts/migrate_to_json.py, repair_slugs.py (one-time migration, complete) - epub_examples.py, rebuild_sentence_matches.py (unused utilities) - scripts/extract_pdf_sentences.py, add_slugs.py (unused one-off scripts) Kept: check_guid_coverage.py, validate_data.py, extract_verb_list.py, validate_apkg.py, validate_verb_list.py, release.py (standalone utilities) Co-Authored-By: Claude Opus 4.6 --- conjugation_extract.py | 690 -------------------- epub_examples.py | 446 ------------- hebrew_extract.py | 225 ------- rebuild_sentence_matches.py | 183 ------ scripts/add_slugs.py | 57 -- scripts/extract_pdf_sentences.py | 405 ------------ scripts/migrate_to_json.py | 1041 ------------------------------ scripts/repair_slugs.py | 420 ------------ scripts/scrape_ktiv_male.py | 237 ------- scripts/scrape_noun_plurals.py | 365 ----------- scripts/scrape_verb_ktiv.py | 250 ------- 11 files changed, 4319 deletions(-) delete mode 100755 conjugation_extract.py delete mode 100644 epub_examples.py delete mode 100644 hebrew_extract.py delete mode 100644 rebuild_sentence_matches.py delete mode 100644 scripts/add_slugs.py delete mode 100644 scripts/extract_pdf_sentences.py delete mode 100644 scripts/migrate_to_json.py delete mode 100644 scripts/repair_slugs.py delete mode 100644 scripts/scrape_ktiv_male.py delete mode 100644 scripts/scrape_noun_plurals.py delete mode 100644 scripts/scrape_verb_ktiv.py diff --git a/conjugation_extract.py b/conjugation_extract.py deleted file mode 100755 index e90c174..0000000 --- a/conjugation_extract.py +++ /dev/null @@ -1,690 +0,0 @@ -#!/usr/bin/env python3 -""" -Extract Hebrew verb conjugations from pealim.com. -Input: verbs_input.txt (one Hebrew infinitive per line; - lines starting with '# 3ms:' search by 3ms past form for Pu'al/Huf'al) -Output: data/conjugations.json - -For each verb: - 1. Search pealim.com/search/?q= to find URL slug - 2. Fetch /dict// with hebstyle=mo cookie - 3. Parse conjugation table by row labels - 4. Capture audio URLs per form - 5. Parse passive (Pu'al/Huf'al) forms from the same page - -Resume-safe: verbs already in conjugations.json are skipped. -""" - -import json -import logging -import re -import time -import urllib.parse -from pathlib import Path - -import requests -from bs4 import BeautifulSoup - -from helpers import strip_nikkud as _strip_nikkud - -logger = logging.getLogger(__name__) - -PEALIM_BASE = "https://www.pealim.com" -REQUEST_DELAY = 1.5 -REQUEST_TIMEOUT = 15 -VERBS_INPUT = Path(__file__).parent / "verbs_input.txt" -CONJUGATIONS_PATH = Path(__file__).parent / "data" / "conjugations.json" -DICT_CSV = next( - ( - p - for p in [ - Path(__file__).parent / "data" / "hebrew_dict_for_anki.csv", - Path(__file__).parent / "data" / "pealim_dict_for_anki.csv", - ] - if p.exists() - ), - Path(__file__).parent / "data" / "hebrew_dict_for_anki.csv", -) - -# Pronoun labels (for card front display) -PRONOUN_LABELS = { - "present_ms": "", - "present_fs": "", - "present_mp": "", - "present_fp": "", - "past_1s": "אֲנִי", - "past_1p": "אֲנַחְנוּ", - "past_2ms": "אַתָּה", - "past_2fs": "אַתְּ", - "past_2mp": "אַתֶּם", - "past_2fp": "אַתֶּן", - "past_3ms": "הוּא", - "past_3fs": "הִיא", - "past_3p": "הֵם / הֵן", - "future_1s": "אֲנִי", - "future_1p": "אֲנַחְנוּ", - "future_2ms": "אַתָּה", - "future_2fs": "אַתְּ", - "future_2mp": "אַתֶּם", - "future_2fp": "אַתֶּן", - "future_3ms": "הוּא", - "future_3fs": "הִיא", - "future_3mp": "הֵם", - "future_3fp": "הֵן", - "imperative_ms": "אַתָּה", - "imperative_fs": "אַתְּ", - "imperative_mp": "אַתֶּם", - "imperative_fp": "אַתֶּן", - "infinitive": "", -} - -# Human-readable tense description for card front -TENSE_DESCRIPTION = { - "present_ms": "הוֹוֶה", - "present_fs": "הוֹוֶה", - "present_mp": "הוֹוֶה", - "present_fp": "הוֹוֶה", - "past_1s": "עָבָר", - "past_1p": "עָבָר", - "past_2ms": "עָבָר", - "past_2fs": "עָבָר", - "past_2mp": "עָבָר", - "past_2fp": "עָבָר", - "past_3ms": "עָבָר", - "past_3fs": "עָבָר", - "past_3p": "עָבָר", - "future_1s": "עָתִיד", - "future_1p": "עָתִיד", - "future_2ms": "עָתִיד", - "future_2fs": "עָתִיד", - "future_2mp": "עָתִיד", - "future_2fp": "עָתִיד", - "future_3ms": "עָתִיד", - "future_3fs": "עָתִיד", - "future_3mp": "עָתִיד", - "future_3fp": "עָתִיד", - "imperative_ms": "צִוּוּי", - "imperative_fs": "צִוּוּי", - "imperative_mp": "צִוּוּי", - "imperative_fp": "צִוּוּי", - "infinitive": "מְקוֹר", -} - -BINYAN_NAMES: tuple[str, ...] = ("Pa'al", "Nif'al", "Pi'el", "Pu'al", "Hitpa'el", "Hif'il", "Huf'al") - -session = requests.Session() -session.headers.update({"User-Agent": "Mozilla/5.0 (compatible; pealim-anki/2.0)"}) - - - -def _build_pos_lookup() -> dict[str, str]: - """Build word_stripped → binyan dict from pealim_dict_for_anki.csv.""" - lookup: dict[str, str] = {} - if not DICT_CSV.exists(): - return lookup - - try: - import pandas as pd - - try: - df = pd.read_csv(DICT_CSV, sep=";", index_col=0) - if df.shape[1] < 3: - raise ValueError("too few columns") - except (ValueError, pd.errors.ParserError): - df = pd.read_csv(DICT_CSV, index_col=0) - - for _, row in df.iterrows(): - word = str(row.get("Word", "")).strip() - pos = str(row.get("Part of speech", row.get("Part of Speech", ""))).strip() - if word and pos and "nan" not in pos.lower(): - lookup[_strip_nikkud(word)] = pos - except Exception as e: - logger.debug(f"Could not load PoS lookup: {e}") - - return lookup - - -# Cache PoS lookup (built once) -_pos_lookup: dict[str, str] | None = None - - -def _get_pos_lookup() -> dict[str, str]: - global _pos_lookup - if _pos_lookup is None: - _pos_lookup = _build_pos_lookup() - return _pos_lookup - - -def _binyan_from_pos(word: str) -> str: - """Look up binyan from PoS field: 'Verb – pa\'al' or 'Verb – Pi\'el' → canonical name.""" - lookup = _get_pos_lookup() - pos_str = lookup.get(_strip_nikkud(word), "") - if not pos_str: - return "" - - pos_lower = pos_str.lower() - # Map lowercase pealim.com PoS variants → canonical names - for bname, variants in [ - ("Pa'al", ["pa'al", "paal"]), - ("Nif'al", ["nif'al", "nifal"]), - ("Pi'el", ["pi'el", "piel"]), - ("Pu'al", ["pu'al", "pual"]), - ("Hitpa'el", ["hitpa'el", "hitpael"]), - ("Hif'il", ["hif'il", "hifil"]), - ("Huf'al", ["huf'al", "hufal"]), - ]: - if any(v in pos_lower for v in variants): - return bname - return "" - - -def _find_slug(query: str) -> str | None: - """Search pealim.com/search/?q= and return the URL slug.""" - url = f"{PEALIM_BASE}/search/?q={urllib.parse.quote(query)}" - try: - resp = session.get(url, timeout=REQUEST_TIMEOUT) - resp.raise_for_status() - slugs = re.findall(r"/dict/(\d+[-][^/?\"<>\s]+)/", resp.text) - if slugs: - slug = slugs[0] - logger.info(f" Slug: {slug}") - return slug - except Exception as e: - logger.error(f" Error searching for '{query}': {e}") - return None - - -def _is_passive_binyan(binyan: str) -> bool: - """Return True if the binyan is a passive (Pu'al or Huf'al).""" - return any(marker in binyan for marker in ("פֻּעַל", "הֻפְעַל", "Pu'al", "Huf'al")) - - -def _get_menukad(cell) -> tuple[str, str]: - """ - Extract nikkud Hebrew text and audio URL from a table cell. - Returns (form_text, audio_url). - """ - # Audio URL - audio_span = cell.find("span", class_=lambda c: c and "audio-play" in c) - audio_url = "" - if audio_span: - audio_url = audio_span.get("data-audio", "") - - span = cell.find("span", class_="menukad") - if span: - return span.get_text(strip=True), audio_url - - txt = cell.get_text(strip=True) - if re.search(r"[\u05d0-\u05ea]", txt): - return txt, audio_url - return "", audio_url - - -def _parse_table(soup: BeautifulSoup, passive: bool = False, table_el=None) -> dict[str, dict]: - """ - Parse the pealim conjugation table and return form_key -> {form, audio_url} mapping. - If passive=True, look for the passive table (after "Passive" heading). - If table_el is provided (and passive=False), parse that table directly. - """ - if passive: - # Find

containing "Passive" - passive_h3 = None - for h3 in soup.find_all("h3"): - if "passive" in h3.get_text(strip=True).lower(): - passive_h3 = h3 - break - if not passive_h3: - return {} - # Find next conjugation table after this heading - table = None - for sib in passive_h3.find_all_next(): - if sib.name == "table" and "conjugation-table" in sib.get("class", []): - table = sib - break - if not table: - return {} - elif table_el is not None: - table = table_el - else: - table = soup.find("table", class_="conjugation-table") - - if not table: - return {} - - rows = table.find_all("tr") - if len(rows) < 9: - return {} - - forms: dict[str, dict] = {} - - def first_heb_forms(row_idx: int) -> list[tuple[str, str]]: - """Get only the Hebrew-text cells from a row (skip label cells).""" - cells = rows[row_idx].find_all(["th", "td"]) - result = [] - for cell in cells: - txt, audio_url = _get_menukad(cell) - colspan = int(cell.get("colspan", 1)) - if txt and re.search(r"[\u05d0-\u05ea]", txt): - for _ in range(colspan): - result.append((txt, audio_url)) - return result - - def deduplicate(pairs: list[tuple[str, str]]) -> list[tuple[str, str]]: - """Return pairs with duplicate form-text entries removed (first occurrence kept).""" - seen: set[str] = set() - out: list[tuple[str, str]] = [] - for pair in pairs: - if pair[0] not in seen: - seen.add(pair[0]) - out.append(pair) - return out - - # Find rows by tense label - present_row = past_row = future_row = imp_row = inf_row = -1 - for i, row in enumerate(rows): - label = row.get_text(" ", strip=True).lower() - if "present" in label and present_row < 0: - present_row = i - elif "past" in label and past_row < 0: - past_row = i - elif "future" in label and future_row < 0: - future_row = i - elif "imperative" in label and imp_row < 0: - imp_row = i - elif "infinitive" in label and inf_row < 0: - inf_row = i - - def store(key: str, form: str, audio_url: str) -> None: - if form: - forms[key] = {"form": form, "audio_url": audio_url} - - # Present tense (4 forms: ms fs mp fp) - if present_row >= 0: - hf = first_heb_forms(present_row) - keys = ["present_ms", "present_fs", "present_mp", "present_fp"] - for k, (v, au) in zip(keys, hf, strict=False): - store(k, v, au) - - # Past tense - if past_row >= 0: - unique = deduplicate(first_heb_forms(past_row)) - if len(unique) >= 1: - store("past_1s", unique[0][0], unique[0][1]) - if len(unique) >= 2: - store("past_1p", unique[1][0], unique[1][1]) - - if past_row + 1 < len(rows): - hf2 = first_heb_forms(past_row + 1) - keys2 = ["past_2ms", "past_2fs", "past_2mp", "past_2fp"] - for k, (v, au) in zip(keys2, hf2, strict=False): - store(k, v, au) - - if past_row + 2 < len(rows): - unique3 = deduplicate(first_heb_forms(past_row + 2)) - keys3 = ["past_3ms", "past_3fs", "past_3p"] - for k, (v, au) in zip(keys3, unique3, strict=False): - store(k, v, au) - - # Future tense - if future_row >= 0: - unique_f = deduplicate(first_heb_forms(future_row)) - if len(unique_f) >= 1: - store("future_1s", unique_f[0][0], unique_f[0][1]) - if len(unique_f) >= 2: - store("future_1p", unique_f[1][0], unique_f[1][1]) - - if future_row + 1 < len(rows): - hf2 = first_heb_forms(future_row + 1) - keys2 = ["future_2ms", "future_2fs", "future_2mp", "future_2fp"] - for k, (v, au) in zip(keys2, hf2, strict=False): - store(k, v, au) - - if future_row + 2 < len(rows): - hf3 = first_heb_forms(future_row + 2) - keys3 = ["future_3ms", "future_3fs", "future_3mp", "future_3fp"] - for k, (v, au) in zip(keys3, hf3, strict=False): - store(k, v, au) - - # Imperative - if imp_row >= 0: - hf = first_heb_forms(imp_row) - keys = ["imperative_ms", "imperative_fs", "imperative_mp", "imperative_fp"] - for k, (v, au) in zip(keys, hf, strict=False): - store(k, v, au) - - # Infinitive - if inf_row >= 0: - hf = first_heb_forms(inf_row) - if hf: - store("infinitive", hf[0][0], hf[0][1]) - - return forms - - -def _extract_binyan_from_page(soup: BeautifulSoup) -> str: - """Extract binyan from page header span.""" - for h3 in soup.find_all("h3", class_="page-header"): - text = h3.get_text(" ", strip=True) - for bname in BINYAN_NAMES: - if bname in text: - return bname - # Also try og:description - meta = soup.find("meta", {"property": "og:description"}) - if meta: - desc = meta.get("content", "") - for bname in BINYAN_NAMES: - if bname in desc: - return bname - return "" - - -def _extract_passive_binyan_from_page(soup: BeautifulSoup) -> str: - """Extract passive binyan name from passive section heading.""" - for h3 in soup.find_all("h3"): - text = h3.get_text(" ", strip=True) - if "passive" in text.lower(): - for bname in ("Pu'al", "Huf'al"): - if bname in text: - return bname - # Infer: Pa'al/Pi'el → Pu'al; Hif'il → Huf'al (stored as span text) - span = h3.find("span", class_="small") - if span: - span_text = span.get_text(strip=True) - for bname in ("Pu'al", "Huf'al"): - if bname in span_text: - return bname - return "" - - -def _extract_conjugations( - slug: str, search_term: str, is_3ms_search: bool = False, binyan_hint: str = "" -) -> dict | None: - """Fetch /dict// and parse conjugation table (active + passive).""" - url = f"{PEALIM_BASE}/dict/{slug}/" - try: - resp = session.get(url, cookies={"hebstyle": "mo"}, timeout=REQUEST_TIMEOUT) - resp.raise_for_status() - except Exception as e: - logger.error(f" Error fetching {url}: {e}") - return None - - soup = BeautifulSoup(resp.text, "lxml") - - # Extract meaning from
(English translation) - meaning = "" - lead_div = soup.find("div", class_="lead") - if lead_div: - meaning = lead_div.get_text(strip=True) - - # Extract root - root = "" - for span in soup.find_all("span", class_="menukad"): - txt = span.get_text(strip=True) - if txt and re.search(r"[\u05d0-\u05ea]", txt) and "-" in txt: - root = txt - break - - # Extract binyan: try PoS lookup first, then page header, then section hint - binyan = _binyan_from_pos(search_term) if not is_3ms_search else "" - if not binyan: - binyan = _extract_binyan_from_page(soup) - if not binyan: - binyan = binyan_hint - - # Parse active forms table - forms_raw = _parse_table(soup, passive=False) - - if not forms_raw: - logger.warning(f" No forms found for {slug}") - return None - - is_passive = _is_passive_binyan(binyan) - - # For passive binyan search (3ms search), the "active" table is actually the passive one - # Determine reference form - infinitive_form = forms_raw.get("infinitive", {}).get("form", "") if not is_passive else "" - past_3ms_form = forms_raw.get("past_3ms", {}).get("form", "") - - reference_form = (past_3ms_form or search_term) if is_passive else (infinitive_form or search_term) - - # Build active result - result = { - "infinitive": search_term, - "slug": slug, - "root": root, - "binyan": binyan, - "meaning": meaning, - "is_passive": is_passive, - "reference_form": reference_form, - "forms": {}, - } - - for key, form_data in forms_raw.items(): - if key in PRONOUN_LABELS: - result["forms"][key] = { - "form": form_data["form"], - "audio_url": form_data.get("audio_url", ""), - "pronoun": PRONOUN_LABELS[key], - "tense": TENSE_DESCRIPTION.get(key, ""), - } - - # Check for a second conjugation table (alternate paradigm, e.g. להתגלות) - # Collect all active tables (exclude passive tables which follow the "Passive" h3) - passive_h3 = next( - (h for h in soup.find_all("h3") if "passive" in h.get_text(strip=True).lower()), - None, - ) - passive_table_ids = { - id(t) for t in (passive_h3.find_all_next("table", class_="conjugation-table") if passive_h3 else []) - } - active_tables = [t for t in soup.find_all("table", class_="conjugation-table") if id(t) not in passive_table_ids] - if len(active_tables) >= 2: - alt_raw = _parse_table(soup, passive=False, table_el=active_tables[1]) - alternate_forms = {} - for key, form_data in alt_raw.items(): - if key in PRONOUN_LABELS: - alt_form = form_data["form"] - primary_form = forms_raw.get(key, {}).get("form", "") - if alt_form and alt_form != primary_form: - alternate_forms[key] = alt_form - if alternate_forms: - result["alternate_forms"] = alternate_forms - logger.info(f" Found {len(alternate_forms)} alternate forms for {search_term}") - - logger.info(f" Extracted {len(result['forms'])} forms for {search_term}") - return result - - -def _load_conjugations() -> dict: - if CONJUGATIONS_PATH.exists(): - with open(CONJUGATIONS_PATH, encoding="utf-8") as f: - return json.load(f) - return {} - - -def _save_conjugations(data: dict) -> None: - CONJUGATIONS_PATH.parent.mkdir(parents=True, exist_ok=True) - with open(CONJUGATIONS_PATH, "w", encoding="utf-8") as f: - json.dump(data, f, ensure_ascii=False, indent=2) - - -def _extract_passive_from_active_slug(active_slug: str, search_term: str, binyan_hint: str = "") -> dict | None: - """Fetch active verb page and extract only the passive section forms. - Used for Pu'al/Huf'al 3ms entries where we know the active verb's slug.""" - url = f"{PEALIM_BASE}/dict/{active_slug}/" - try: - resp = session.get(url, cookies={"hebstyle": "mo"}, timeout=REQUEST_TIMEOUT) - resp.raise_for_status() - except Exception as e: - logger.error(f" Error fetching {url}: {e}") - return None - - soup = BeautifulSoup(resp.text, "lxml") - - # Extract meaning (this is the active verb's meaning — useful context for passive) - meaning = "" - lead_div = soup.find("div", class_="lead") - if lead_div: - meaning = lead_div.get_text(strip=True) - - root = "" - for span in soup.find_all("span", class_="menukad"): - txt = span.get_text(strip=True) - if txt and re.search(r"[\u05d0-\u05ea]", txt) and "-" in txt: - root = txt - break - - active_binyan = _extract_binyan_from_page(soup) - active_forms_raw = _parse_table(soup, passive=False) - active_infinitive = active_forms_raw.get("infinitive", {}).get("form", "") - - passive_forms_raw = _parse_table(soup, passive=True) - if not passive_forms_raw: - logger.warning(f" No passive forms found on {active_slug} for {search_term}") - return None - - passive_binyan = _extract_passive_binyan_from_page(soup) - if not passive_binyan: - passive_binyan = "Pu'al" if active_binyan == "Pi'el" else "Huf'al" if active_binyan == "Hif'il" else "" - if not passive_binyan: - passive_binyan = binyan_hint - - result = { - "infinitive": search_term, - "slug": active_slug, - "root": root, - "binyan": passive_binyan, - "meaning": meaning, - "is_passive": True, - "reference_form": active_infinitive or search_term, - "forms": {}, - } - for key, form_data in passive_forms_raw.items(): - if key in PRONOUN_LABELS: - result["forms"][key] = { - "form": form_data["form"], - "audio_url": form_data.get("audio_url", ""), - "pronoun": PRONOUN_LABELS[key], - "tense": TENSE_DESCRIPTION.get(key, ""), - } - - logger.info(f" Extracted {len(result['forms'])} passive forms for {search_term} from {active_slug}") - return result - - -def main(verbs_file: Path = VERBS_INPUT) -> dict: - """Read verbs from file and extract conjugations. Returns full conjugations dict.""" - if not verbs_file.exists(): - logger.warning(f"verbs_input.txt not found at {verbs_file} — skipping") - return _load_conjugations() - - raw_lines = verbs_file.read_text(encoding="utf-8").splitlines() - - # Parse slug overrides: "# slug: VERB SLUG" anywhere in the file - slug_overrides: dict[str, str] = {} - for line in raw_lines: - stripped = line.strip() - if stripped.startswith("# slug:"): - parts = stripped[len("# slug:") :].strip().split() - if len(parts) >= 2: - slug_overrides[parts[0]] = parts[1] - - # Map section header keywords → binyan name (for binyan_hint fallback) - SECTION_BINYAN = { - "pa'al": "Pa'al", - "nif'al": "Nif'al", - "pi'el": "Pi'el", - "pu'al": "Pu'al", - "hitpa'el": "Hitpa'el", - "hif'il": "Hif'il", - "huf'al": "Huf'al", - } - - # Parse: regular verbs and # 3ms: lines (optional active slug on 3ms lines) - # Track current section binyan from comment headers for use as a hint - verbs: list[tuple[str, bool, str | None, str]] = [] # (search_term, is_3ms_search, active_slug, binyan_hint) - current_binyan_hint = "" - for line in raw_lines: - stripped = line.strip() - if not stripped or stripped.startswith("# slug:"): - continue - if stripped.startswith("# 3ms:"): - parts = stripped[len("# 3ms:") :].strip().split() - if parts: - form = parts[0] - active_slug = parts[1] if len(parts) >= 2 else None - verbs.append((form, True, active_slug, current_binyan_hint)) - elif stripped.startswith("#"): - # Check if this is a section header setting the binyan context - low = stripped.lower() - for key, bname in SECTION_BINYAN.items(): - if key in low: - current_binyan_hint = bname - break - else: - verbs.append((stripped, False, None, current_binyan_hint)) - - logger.info(f"Loaded {len(verbs)} verbs from {verbs_file} ({sum(1 for _, p, _, _ in verbs if p)} passive 3ms)") - if slug_overrides: - logger.info(f" Slug overrides: {slug_overrides}") - - conjugations = _load_conjugations() - new_count = 0 - - for verb, is_3ms, active_slug, binyan_hint in verbs: - if verb in conjugations: - logger.info(f"Skipping {verb} (cached)") - continue - - logger.info(f"Processing: {verb} {'(3ms search)' if is_3ms else ''}") - time.sleep(REQUEST_DELAY) - - if is_3ms: - # Passive-only extraction: use provided active slug or search to find it - if active_slug: - slug = active_slug - logger.info(f" Using active slug {slug} for passive extraction") - else: - slug = _find_slug(verb) - if not slug: - logger.warning(f" No slug found for {verb}") - conjugations[verb] = None - _save_conjugations(conjugations) - continue - logger.info(f" Found active slug {slug} for passive extraction") - time.sleep(REQUEST_DELAY) - data = _extract_passive_from_active_slug(slug, verb, binyan_hint=binyan_hint) - else: - override = slug_overrides.get(verb) - if override: - logger.info(f" Slug override: {override}") - slug = override - else: - slug = _find_slug(verb) - if not slug: - logger.warning(f" No slug found for {verb}") - conjugations[verb] = None - _save_conjugations(conjugations) - continue - time.sleep(REQUEST_DELAY) - data = _extract_conjugations(slug, verb, is_3ms_search=False, binyan_hint=binyan_hint) - - conjugations[verb] = data - _save_conjugations(conjugations) - new_count += 1 - - logger.info(f"Done: {new_count} new verbs processed") - return conjugations - - -if __name__ == "__main__": - logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") - result = main() - for verb, data in result.items(): - if data: - forms = data.get("forms", {}) - print(f"{verb}: {len(forms)} forms, binyan={data.get('binyan')}") - sample_form = next(iter(forms.values()), {}) if forms else {} - print(f" sample audio_url: {sample_form.get('audio_url', 'MISSING')[:60]}") - else: - print(f"{verb}: no data") diff --git a/epub_examples.py b/epub_examples.py deleted file mode 100644 index 891db60..0000000 --- a/epub_examples.py +++ /dev/null @@ -1,446 +0,0 @@ -#!/usr/bin/env python3 -""" -Extract example sentences from nikud'd Hebrew EPUBs (and PDFs where possible), -match them against the vocab list, and produce examples_cache.json. - -Usage: - python3 epub_examples.py - -Outputs: - data/epub_sentence_index.json — full sentence corpus - data/examples_cache.json — best sentence(s) per vocab word -""" - -import csv -import json -import os -import re -import zipfile -from html.parser import HTMLParser -from pathlib import Path - -from helpers import strip_nikkud - -DATA_DIR = Path(__file__).parent / "data" -EPUB_DIR = DATA_DIR / "epubs" -DICT_CSV = DATA_DIR / "hebrew_dict_for_anki.csv" - -# Book metadata: filename -> display name -EPUB_BOOKS = { - "little_prince.epub": "הנסיך הקטן", - "time_tunnel_82.epub": "מנהרת הזמן 82", -} - -# PDF books are excluded — pypdf produces garbled RTL text (reversed chars within -# words). If/when a proper EPUB version becomes available on Calibre, add it to -# EPUB_BOOKS above instead. -PDF_BOOKS: dict[str, str] = {} - -# Sentence length bounds (word count) -MIN_WORDS = 4 -MAX_WORDS = 15 - - - -# ── HTML text extraction ───────────────────────────────────────── - - -class _TextExtractor(HTMLParser): - """Extract text content from HTML, skipping script/style tags.""" - - SKIP_TAGS = {"script", "style", "head"} - - def __init__(self): - super().__init__() - self.parts: list[str] = [] - self._skip_depth = 0 - - def handle_starttag(self, tag, attrs): - if tag in self.SKIP_TAGS: - self._skip_depth += 1 - # Insert space for block-level elements to avoid word concatenation - if tag in ( - "p", - "div", - "br", - "li", - "h1", - "h2", - "h3", - "h4", - "h5", - "h6", - "td", - "th", - "tr", - "blockquote", - "section", - ): - self.parts.append("\n") - - def handle_endtag(self, tag): - if tag in self.SKIP_TAGS: - self._skip_depth = max(0, self._skip_depth - 1) - - def handle_data(self, data): - if self._skip_depth == 0: - self.parts.append(data) - - def get_text(self) -> str: - return "".join(self.parts) - - -def extract_text_from_html(html: str) -> str: - """Parse HTML and return plain text.""" - parser = _TextExtractor() - parser.feed(html) - return parser.get_text() - - -# ── EPUB processing ────────────────────────────────────────────── - - -def _content_files_from_epub(zf: zipfile.ZipFile) -> list[str]: - """Get ordered list of content XHTML files from the OPF manifest.""" - # Find the OPF file - opf_path = None - for name in zf.namelist(): - if name.endswith(".opf"): - opf_path = name - break - if not opf_path: - # Fallback: just use all xhtml files - return sorted( - n - for n in zf.namelist() - if n.endswith((".xhtml", ".html")) - and "toc" not in n.lower() - and "cover" not in n.lower() - and "nav" not in n.lower() - ) - - # Parse OPF to get spine order - opf_content = zf.read(opf_path).decode("utf-8") - opf_dir = os.path.dirname(opf_path) - - # Extract manifest items: id -> href - manifest = {} - for m in re.finditer(r']*id="([^"]+)"[^>]*href="([^"]+)"', opf_content): - manifest[m.group(1)] = m.group(2) - # Also try reversed attribute order - for m in re.finditer(r']*href="([^"]+)"[^>]*id="([^"]+)"', opf_content): - manifest[m.group(2)] = m.group(1) - - # Extract spine order - spine_ids = re.findall(r']*idref="([^"]+)"', opf_content) - - result = [] - for sid in spine_ids: - href = manifest.get(sid, "") - if href and href.endswith((".xhtml", ".html")): - full_path = os.path.join(opf_dir, href) if opf_dir else href - # Normalize path separators - full_path = full_path.replace("\\", "/") - if full_path in zf.namelist(): - result.append(full_path) - - if not result: - # Fallback - return sorted( - n - for n in zf.namelist() - if n.endswith((".xhtml", ".html")) and "toc" not in n.lower() and "cover" not in n.lower() - ) - return result - - -def extract_sentences_from_epub(epub_path: Path, book_name: str) -> list[dict]: - """Extract sentences from an EPUB file. - - Returns list of {"text": str, "book": str, "stripped": str} - """ - zf = zipfile.ZipFile(epub_path) - content_files = _content_files_from_epub(zf) - - all_text = [] - for cf in content_files: - try: - html = zf.read(cf).decode("utf-8") - except (KeyError, UnicodeDecodeError): - continue - text = extract_text_from_html(html) - all_text.append(text) - - full_text = "\n".join(all_text) - return _split_into_sentences(full_text, book_name) - - -# ── PDF processing ─────────────────────────────────────────────── - - -def extract_sentences_from_pdf(pdf_path: Path, book_name: str) -> list[dict]: - """Extract sentences from a PDF file (best-effort, handles RTL reversal).""" - try: - import pypdf - except ImportError: - print(f" [SKIP] pypdf not installed, cannot process {pdf_path.name}") - return [] - - reader = pypdf.PdfReader(pdf_path) - all_text_parts = [] - - for page in reader.pages: - raw = page.extract_text() - if not raw: - continue - # pypdf often reverses word order for RTL text; fix it - fixed_lines = [] - for line in raw.split("\n"): - words = line.split() - # Check if this line is predominantly Hebrew - hebrew_chars = sum(1 for c in line if "\u0590" <= c <= "\u05ff") - if hebrew_chars > len(line) * 0.3 and len(words) > 1: - # Reverse word order - fixed_lines.append(" ".join(reversed(words))) - else: - fixed_lines.append(line) - all_text_parts.append("\n".join(fixed_lines)) - - full_text = "\n".join(all_text_parts) - return _split_into_sentences(full_text, book_name) - - -# ── Sentence splitting ─────────────────────────────────────────── - -# Hebrew sentence terminators: period, exclamation, question mark, sof pasuk -_SENT_SPLIT = re.compile(r"[.!?\u05C3]+") - -# Punctuation to strip from word boundaries when matching -_PUNCT = re.compile( - r'^[\u0022\u0027\u05F4\u05F3,;:\-–—…\u201C\u201D\u201E\u201F\u2018\u2019()\[\]{}«»"\']+|[\u0022\u0027\u05F4\u05F3,;:\-–—…\u201C\u201D\u201E\u201F\u2018\u2019()\[\]{}«»"\']+$' -) - - -def _split_into_sentences(text: str, book_name: str) -> list[dict]: - """Split text into sentences and filter by length.""" - # Normalize whitespace - text = re.sub(r"\s+", " ", text).strip() - - raw_sentences = _SENT_SPLIT.split(text) - results = [] - seen = set() - - for sent in raw_sentences: - sent = sent.strip() - if not sent: - continue - - # Count Hebrew words (skip non-Hebrew tokens like numbers) - words = sent.split() - hebrew_words = [w for w in words if any("\u0590" <= c <= "\u05ff" for c in w)] - - if len(hebrew_words) < MIN_WORDS or len(hebrew_words) > MAX_WORDS: - continue - - # Skip duplicates - stripped = strip_nikkud(sent) - if stripped in seen: - continue - seen.add(stripped) - - results.append( - { - "text": sent, - "book": book_name, - "stripped": stripped, - } - ) - - return results - - -# ── Vocab loading ──────────────────────────────────────────────── - - -def load_vocab(csv_path: Path) -> dict: - """Load vocab CSV and return {stripped_form: nikkud_word} mapping. - - Also returns reverse mapping for lookup. - Returns (word_to_nikkud, nikkud_words_set) - """ - words_by_stripped: dict[str, list[str]] = {} # stripped -> [nikkud words] - - with open(csv_path, encoding="utf-8") as f: - reader = csv.DictReader(f, delimiter=";") - for row in reader: - nikkud_word = row.get("Word", "").strip() - word_no_nik = row.get("Word Without Nikkud", "").strip() - if not nikkud_word: - continue - - # Method 1: strip nikkud from the Word column - stripped_from_nikkud = strip_nikkud(nikkud_word) - - # Add both forms for matching - for form in {stripped_from_nikkud, word_no_nik}: - if form: - words_by_stripped.setdefault(form, []).append(nikkud_word) - - return words_by_stripped - - -# ── Matching ───────────────────────────────────────────────────── - - -def match_sentences(sentences: list[dict], words_by_stripped: dict) -> dict: - """Match sentences against vocab words. - - Returns {nikkud_word: [sentences]} with best (shortest) first. - """ - # Build a set of all stripped forms for fast lookup - all_forms = set(words_by_stripped.keys()) - - # Hebrew single-letter prefixes: ב, ה, ו, כ, ל, מ, ש, ד (של) - _HEB_PREFIXES = set("בהוכלמשד") - - # For each sentence, extract stripped words - matches: dict[str, list[tuple[int, str]]] = {} # nikkud_word -> [(word_count, sentence)] - - for sent_info in sentences: - sent_text = sent_info["text"] - sent_stripped = sent_info["stripped"] - word_count = len(sent_text.split()) - - # Get stripped words from the sentence - raw_words = sent_stripped.split() - # Map: candidate_form -> set of original cleaned words that produced it - # This lets us verify that prefix stripping is plausible - candidates: dict[str, str] = {} # form -> original_word - for w in raw_words: - cleaned = _PUNCT.sub("", w) - if not cleaned: - continue - # Direct match (always try) - candidates[cleaned] = cleaned - # Prefix stripping: only if remaining stem is >= 2 chars - # and the prefix char is a known Hebrew prefix letter - for prefix_len in (1, 2): - if len(cleaned) > prefix_len + 1: - prefix = cleaned[:prefix_len] - stem = cleaned[prefix_len:] - if all(c in _HEB_PREFIXES for c in prefix) and len(stem) >= 2: - candidates[stem] = cleaned - - # Check which vocab words appear in this sentence - matched_forms = set(candidates.keys()) & all_forms - for form in matched_forms: - # Skip spurious matches: very short vocab forms (1-2 chars) - # should only match via direct word match, not prefix stripping - if len(form) <= 2 and form not in {_PUNCT.sub("", w) for w in raw_words}: - continue - for nikkud_word in words_by_stripped[form]: - matches.setdefault(nikkud_word, []).append((word_count, sent_text)) - - # Sort by word count (prefer shorter sentences) and deduplicate - result = {} - for nikkud_word, sent_list in matches.items(): - sent_list.sort(key=lambda x: x[0]) - seen = set() - unique = [] - for _, sent in sent_list: - if sent not in seen: - seen.add(sent) - unique.append(sent) - if len(unique) >= 5: # Keep top 5 per word - break - result[nikkud_word] = unique - - return result - - -# ── Main ───────────────────────────────────────────────────────── - - -def main(): - print("=" * 60) - print("EPUB Example Sentence Extraction Pipeline") - print("=" * 60) - - # Step 1: Extract sentences from all books - all_sentences = [] - book_counts = {} - - for filename, book_name in EPUB_BOOKS.items(): - path = EPUB_DIR / filename - if not path.exists(): - print(f"\n[SKIP] {filename} not found") - continue - print(f"\n[EPUB] Extracting: {book_name} ({filename})") - sentences = extract_sentences_from_epub(path, book_name) - book_counts[book_name] = len(sentences) - all_sentences.extend(sentences) - print(f" -> {len(sentences)} sentences") - - for filename, book_name in PDF_BOOKS.items(): - path = EPUB_DIR / filename - if not path.exists(): - print(f"\n[SKIP] {filename} not found") - continue - print(f"\n[PDF] Extracting: {book_name} ({filename})") - sentences = extract_sentences_from_pdf(path, book_name) - book_counts[book_name] = len(sentences) - all_sentences.extend(sentences) - print(f" -> {len(sentences)} sentences") - - print(f"\nTotal sentences: {len(all_sentences)}") - - # Step 2: Save sentence index - index_path = DATA_DIR / "epub_sentence_index.json" - with open(index_path, "w", encoding="utf-8") as f: - json.dump({"sentences": all_sentences}, f, ensure_ascii=False, indent=2) - print(f"\nSaved sentence index: {index_path}") - - # Step 3: Load vocab and match - print(f"\nLoading vocab from {DICT_CSV} ...") - words_by_stripped = load_vocab(DICT_CSV) - total_vocab = len({w for wlist in words_by_stripped.values() for w in wlist}) - print(f" {total_vocab} unique vocab words ({len(words_by_stripped)} lookup forms)") - - print("\nMatching sentences against vocab ...") - examples_cache = match_sentences(all_sentences, words_by_stripped) - - # Step 4: Save examples_cache - cache_path = DATA_DIR / "examples_cache.json" - with open(cache_path, "w", encoding="utf-8") as f: - json.dump(examples_cache, f, ensure_ascii=False, indent=2) - print(f"Saved examples cache: {cache_path}") - - # Step 5: Summary stats - print("\n" + "=" * 60) - print("SUMMARY") - print("=" * 60) - print("\nSentences per book:") - for book_name, count in book_counts.items(): - print(f" {book_name}: {count}") - print(f" Total: {len(all_sentences)}") - - print("\nVocab matching:") - print(f" Total vocab words: {total_vocab}") - print(f" Words with examples: {len(examples_cache)}") - coverage = 100 * len(examples_cache) / total_vocab if total_vocab else 0 - print(f" Coverage: {coverage:.1f}%") - - # Show some sample matches - print("\nSample matches:") - count = 0 - for word, sents in examples_cache.items(): - if count >= 5: - break - print(f" {word} -> {sents[0][:60]}...") - count += 1 - - return examples_cache - - -if __name__ == "__main__": - main() diff --git a/hebrew_extract.py b/hebrew_extract.py deleted file mode 100644 index 3a02495..0000000 --- a/hebrew_extract.py +++ /dev/null @@ -1,225 +0,0 @@ -#!/usr/bin/env python3 -""" -Extract Hebrew vocabulary from pealim.com dictionary. -Scrapes word entries, roots, parts of speech, and audio URLs for Anki flashcards. -""" - -import logging -import re -import time - -import pandas as pd -import requests -from bs4 import BeautifulSoup - -# Configure logging -logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") -logger = logging.getLogger(__name__) - -# Session for connection pooling -session = requests.Session() -session.headers.update({"User-Agent": "Mozilla/5.0 (compatible; pealim-scraper/1.0)"}) - -PEALIM_DICT_URL = "https://www.pealim.com/dict/" -REQUEST_DELAY = 1.5 # seconds between requests (respectful scraping) -REQUEST_TIMEOUT = 10 # seconds - - -def get_total_pages() -> int: - """Dynamically determine total pages from first request.""" - try: - logger.info("Fetching total page count...") - cookies = {"translit": "none", "hebstyle": "mo"} - response = session.get(PEALIM_DICT_URL, cookies=cookies, timeout=REQUEST_TIMEOUT) - response.raise_for_status() - # Hardcoded — pealim.com has ~608 pages at ~15 words/page - return 608 - except Exception as e: - logger.error(f"Error fetching page count: {e}. Using default (608).") - return 608 - - -def _parse_page_with_audio(html_bytes: bytes) -> list[dict]: - """ - Parse a dict page with BeautifulSoup to extract word data + audio URL. - Returns list of dicts with keys: Word, Root, Part of Speech, Meaning, audio_url, slug. - """ - soup = BeautifulSoup(html_bytes, "html.parser") - rows = [] - for tr in soup.select("table tr"): - tds = tr.find_all("td") - if len(tds) < 4: - continue - # Audio URL from span[data-audio] in first td - audio_span = tds[0].find(attrs={"data-audio": True}) - audio_url = audio_span["data-audio"] if audio_span else "" - # Slug from the detail page link (e.g., /dict/6009-av/ → 6009-av) - slug = "" - link = tds[0].find("a", href=True) - if link: - m = re.search(r"/dict/([^/]+)/", link["href"]) - if m: - slug = m.group(1) - # Word with nikkud - menukad = tds[0].find("span", class_="menukad") - word = menukad.get_text(strip=True) if menukad else tds[0].get_text(strip=True) - # Root (may be link or plain text) - root = tds[1].get_text(strip=True) - # Part of speech - pos = tds[2].get_text(strip=True) - # Meaning - meaning = tds[3].get_text(strip=True) - if word: - rows.append( - { - "Word": word, - "Root": root if root else "-", - "Part of Speech": pos, - "Meaning": meaning, - "audio_url": audio_url, - "slug": slug, - } - ) - return rows - - -def extract_from_website(max_pages: int | None = None) -> pd.DataFrame: - """ - Extract dictionary entries from pealim.com. - Captures audio URLs from each word entry's data-audio attribute. - - Args: - max_pages: Maximum pages to scrape (None = all) - - Returns: - DataFrame with Word, Root, Part of Speech, Meaning, Word Without Nikkud, audio_url columns - """ - total_pages = max_pages or get_total_pages() - logger.info(f"Starting extraction from {total_pages} pages...") - - all_rows: list[dict] = [] - - for page_num in range(1, total_pages + 1): - try: - url = f"{PEALIM_DICT_URL}?page={page_num}" - - # First request: with nikkud — parse with BeautifulSoup for audio URL - cookies = {"translit": "none", "hebstyle": "mo"} - response = session.get(url, cookies=cookies, timeout=REQUEST_TIMEOUT) - response.raise_for_status() - page_rows = _parse_page_with_audio(response.content) - - # Second request: without nikkud — just get the word column - cookies_vl = {"translit": "none", "hebstyle": "vl", "showmeaning": "off"} - resp_vl = session.get(url, cookies=cookies_vl, timeout=REQUEST_TIMEOUT) - resp_vl.raise_for_status() - soup_vl = BeautifulSoup(resp_vl.content, "html.parser") - no_nik_words = [] - for tr in soup_vl.select("table tr"): - tds = tr.find_all("td") - if len(tds) < 4: - continue - menukad = tds[0].find("span", class_="menukad") - w = menukad.get_text(strip=True) if menukad else tds[0].get_text(strip=True) - no_nik_words.append(w) - - # Merge no-nikkud words into rows - for i, row in enumerate(page_rows): - row["Word Without Nikkud"] = no_nik_words[i] if i < len(no_nik_words) else "" - - all_rows.extend(page_rows) - - if page_num % 50 == 0: - logger.info(f"Processed {page_num}/{total_pages} pages ({len(all_rows)} words so far)...") - - time.sleep(REQUEST_DELAY) - - except requests.RequestException as e: - logger.error(f"Error fetching page {page_num}: {e}. Retrying...") - time.sleep(REQUEST_DELAY * 2) - except Exception as e: - logger.error(f"Unexpected error on page {page_num}: {e}") - continue - - df = pd.DataFrame(all_rows) - audio_count = (df["audio_url"] != "").sum() if "audio_url" in df.columns else 0 - logger.info(f"Extraction complete. Total words: {len(df)}, with audio URL: {audio_count}") - return df - - -def modify_for_anki(df: pd.DataFrame) -> pd.DataFrame: - """ - Transform dictionary DataFrame for Anki import. - Adds shared root words and Hebrew tags. Preserves audio_url column. - """ - logger.info("Preparing data for Anki...") - - # Find shared root words - shared_root_words = [] - for _idx, row in df.iterrows(): - root = row["Root"] - word = row["Word"] - - if root != "-" and pd.notna(root): - same_root = df[(df["Root"] == root) & (df["Word"] != word)]["Word"].values - shared = " ".join(str(w) for w in same_root) - shared_root_words.append(shared) - else: - shared_root_words.append("") - - df["shared roots"] = shared_root_words - - # Generate Hebrew tags - tags = [] - for _idx, row in df.iterrows(): - tag_parts = [] - - root = str(row["Root"]).replace(" ", "").replace("-", "") - if "nan" not in root and root: - root_clean = root.replace(".", "") - tag_parts.append(f"שורש::{root_clean}") - - pos = str(row["Part of Speech"]) - pos_tags = { - "Adverb": "תוארי_הפועל", - "Pronoun": "כינויי_גוף", - "Noun": "שם_עצם", - "Verb": "פעלים", - "Adjective": "שם_תואר", - "Preposition": "מילות_יחס", - "Conjunction": "מילות_חיבור", - "Particle": "מילית", - } - - for key, value in pos_tags.items(): - if key in pos: - tag_parts.append(value) - break - - tags.append(" ".join(tag_parts)) - - df["tags"] = tags - logger.info("Anki preparation complete.") - return df - - -def main(): - """Main entry point.""" - try: - df = extract_from_website() - df.to_csv("hebrew_dict.csv", index=True) - logger.info("Saved: hebrew_dict.csv") - - df = modify_for_anki(df) - df.to_csv("hebrew_dict_for_anki.csv", sep=";", index=True) - logger.info("Saved: hebrew_dict_for_anki.csv") - - logger.info("Complete!") - - except Exception as e: - logger.error(f"Fatal error: {e}") - raise - - -if __name__ == "__main__": - main() diff --git a/rebuild_sentence_matches.py b/rebuild_sentence_matches.py deleted file mode 100644 index 1d8b1cb..0000000 --- a/rebuild_sentence_matches.py +++ /dev/null @@ -1,183 +0,0 @@ -#!/usr/bin/env python3 -""" -Rebuild vocab_sentence_matches.json using both direct word matching -and ktiv male conjugated/declined form matching. - -This dramatically improves sentence coverage by matching not just -dictionary forms but all conjugated verbs and declined nouns. -""" - -import json -import logging -import re -from pathlib import Path - -import pandas as pd - -from helpers import strip_nikkud as _strip_nikkud - -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") -logger = logging.getLogger(__name__) - -DATA_DIR = Path(__file__).parent / "data" - - -def main(): - # Load sentences - with open(DATA_DIR / "epub_sentence_index.json") as f: - sentences = json.load(f).get("sentences", []) - logger.info(f"Loaded {len(sentences)} sentences") - - # Load vocab CSV - csv_path = DATA_DIR / "hebrew_dict_for_anki.csv" - try: - df = pd.read_csv(csv_path, sep=";", index_col=0) - if df.shape[1] < 3: - raise ValueError - except (ValueError, pd.errors.ParserError): - df = pd.read_csv(csv_path, index_col=0) - logger.info(f"Loaded {len(df)} vocab entries") - - # Build word lookup: stripped_form → (word_nikkud, word_no_nikkud) - word_lookup: dict[str, list[tuple[str, str]]] = {} - for _, row in df.iterrows(): - word = str(row.get("Word", "")).strip() - wni = str(row.get("Word Without Nikkud", "")).strip() - if not word or word in ("nan", "None"): - continue - stripped = _strip_nikkud(word) - if stripped: - word_lookup.setdefault(stripped, []).append((word, wni)) - - # Load ktiv male forms: ktiv_male_form → [{word_nikkud, form_type, ...}] - ktiv_path = DATA_DIR / "ktiv_male_forms.json" - ktiv_forms: dict[str, list[dict]] = {} - if ktiv_path.exists(): - with open(ktiv_path) as f: - ktiv_forms = json.load(f) - logger.info(f"Loaded {len(ktiv_forms)} ktiv male forms") - else: - logger.warning("No ktiv_male_forms.json — only using direct matching") - - # Build reverse lookup: ktiv_male → set of dictionary words (nikkud) - ktiv_to_word: dict[str, set[str]] = {} - for ktiv, entries in ktiv_forms.items(): - for entry in entries: - word_nikkud = entry.get("word_nikkud", "") - if word_nikkud: - ktiv_to_word.setdefault(ktiv, set()).add(word_nikkud) - - # Also add all vocab words' own stripped forms to ktiv_to_word - for stripped, entries in word_lookup.items(): - for word_nikkud, _ in entries: - ktiv_to_word.setdefault(stripped, set()).add(word_nikkud) - - logger.info(f"Total matchable forms: {len(ktiv_to_word)}") - - # Tokenize all sentences once - sentence_tokens: list[tuple[dict, list[str]]] = [] - for s in sentences: - stripped = s.get("stripped", _strip_nikkud(s.get("text", ""))) - tokens = [re.sub(r'[.,!?;:"\'\u05be]', "", t) for t in stripped.split()] - tokens = [t for t in tokens if t] # remove empty - sentence_tokens.append((s, tokens)) - - # Match: for each sentence token, check ktiv_to_word lookup - # Build word_nikkud → [sentence_info] - matches: dict[str, list[dict]] = {} # word_nikkud → [sentences] - - for sent, tokens in sentence_tokens: - text = sent.get("text", "") - book = sent.get("book", "") - word_len = len(tokens) - - # Skip sentences that are too short or too long - if word_len < 4 or word_len > 15: - continue - - for tok in tokens: - if tok in ktiv_to_word: - for word_nikkud in ktiv_to_word[tok]: - matches.setdefault(word_nikkud, []).append( - { - "text": text, - "book": book, - "matched_form": tok, - "word_count": word_len, - } - ) - - logger.info(f"Words with at least 1 match: {len(matches)}") - - # Deduplicate and limit to 3 best sentences per word - # Prefer shorter sentences (6-12 words ideal) - output: dict[str, dict] = {} - for word_nikkud, sents in matches.items(): - # Deduplicate by text - seen_texts = set() - unique = [] - for s in sents: - if s["text"] not in seen_texts: - seen_texts.add(s["text"]) - unique.append(s) - - # Score: prefer 6-12 word sentences - def score(s): - wc = s["word_count"] - if 6 <= wc <= 12: - return 0 # ideal - return abs(wc - 9) # distance from ideal - - unique.sort(key=score) - best = unique[:3] - - # Find the Word Without Nikkud for this word - stripped = _strip_nikkud(word_nikkud) - wni = stripped # default - if stripped in word_lookup: - for wn, w_wni in word_lookup[stripped]: - if wn == word_nikkud: - wni = w_wni - break - - output[wni] = { - "word_nikkud": word_nikkud, - "sentences": [{"text": s["text"], "book": s["book"]} for s in best], - } - - # Save - out_path = DATA_DIR / "vocab_sentence_matches.json" - with open(out_path, "w") as f: - json.dump(output, f, ensure_ascii=False, indent=1) - - total_sents = sum(len(v["sentences"]) for v in output.values()) - logger.info(f"Saved {len(output)} words with {total_sents} sentences → {out_path}") - - # Stats - total_vocab = len(df) - pct = len(output) * 100 / total_vocab - logger.info(f"Coverage: {len(output)}/{total_vocab} ({pct:.1f}%)") - - # Breakdown by match type - direct_only = 0 - ktiv_only = 0 - both = 0 - for _wni, info in output.items(): - word = info["word_nikkud"] - stripped = _strip_nikkud(word) - has_direct = stripped in word_lookup - has_ktiv = any(s.get("matched_form", "") != stripped for s in info["sentences"]) - if has_direct and has_ktiv: - both += 1 - elif has_ktiv: - ktiv_only += 1 - else: - direct_only += 1 - - logger.info(f" Direct matches only: {direct_only}") - logger.info(f" Ktiv male matches only: {ktiv_only}") - logger.info(f" Both: {both}") - - -if __name__ == "__main__": - main() diff --git a/scripts/add_slugs.py b/scripts/add_slugs.py deleted file mode 100644 index 0242e47..0000000 --- a/scripts/add_slugs.py +++ /dev/null @@ -1,57 +0,0 @@ -#!/usr/bin/env python3 -"""One-time script: scrape slugs from pealim.com dict pages and add to CSV.""" - -import logging -import re -import sys -import time - -import pandas as pd -import requests -from bs4 import BeautifulSoup - -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", stream=sys.stderr) -logger = logging.getLogger() - -dict_csv = "data/hebrew_dict_for_anki.csv" -df = pd.read_csv(dict_csv, sep=";", index_col=0) -logger.info(f"Loaded {len(df)} rows") - -session = requests.Session() -session.headers.update({"User-Agent": "Mozilla/5.0 (compatible; pealim-scraper/1.0)"}) - -word_slug_map: dict[str, str] = {} -total_pages = 608 - -for page_num in range(1, total_pages + 1): - url = f"https://www.pealim.com/dict/?page={page_num}" - cookies = {"translit": "none", "hebstyle": "mo"} - try: - resp = session.get(url, cookies=cookies, timeout=10) - resp.raise_for_status() - soup = BeautifulSoup(resp.content, "html.parser") - for tr in soup.select("table tr"): - tds = tr.find_all("td") - if len(tds) < 4: - continue - menukad = tds[0].find("span", class_="menukad") - word = menukad.get_text(strip=True) if menukad else tds[0].get_text(strip=True) - link = tds[0].find("a", href=True) - slug = "" - if link: - m = re.search(r"/dict/([^/]+)/", link["href"]) - if m: - slug = m.group(1) - if word and slug: - word_slug_map[word] = slug - except Exception as e: - logger.warning(f"Page {page_num} failed: {e}") - - if page_num % 50 == 0: - logger.info(f"Scraped {page_num}/{total_pages} pages ({len(word_slug_map)} slugs)") - time.sleep(0.8) - -df["slug"] = df["Word"].map(word_slug_map).fillna("") -df.to_csv(dict_csv, sep=";", index=True) -matched = (df["slug"] != "").sum() -logger.info(f"Done. {matched}/{len(df)} words have slugs. Saved → {dict_csv}") diff --git a/scripts/extract_pdf_sentences.py b/scripts/extract_pdf_sentences.py deleted file mode 100644 index e67ad5a..0000000 --- a/scripts/extract_pdf_sentences.py +++ /dev/null @@ -1,405 +0,0 @@ -#!/usr/bin/env python3 -""" -Extract sentences from PDF books and match vocab words to sentences. - -1. Extract sentences from alice.pdf and lion_strawberry.pdf -2. Merge into existing epub_sentence_index.json -3. Match vocab words to sentences, produce vocab_sentence_matches.json -""" - -import json -import os -import re -import sys - -# Use the venv with pymupdf -sys.path.insert(0, "/home/node/projects/pealim/venv_pdf/lib/python3.11/site-packages") -# Also need the main venv for pandas -sys.path.insert(0, "/home/node/projects/pealim/lib/python3.11/site-packages") - -import fitz -import pandas as pd - -BASE_DIR = "/home/node/projects/pealim" -DATA_DIR = os.path.join(BASE_DIR, "data") -EPUBS_DIR = os.path.join(DATA_DIR, "epubs") -SENTENCE_INDEX = os.path.join(DATA_DIR, "epub_sentence_index.json") -VOCAB_CSV = os.path.join(DATA_DIR, "hebrew_dict_for_anki.csv") -MATCHES_FILE = os.path.join(DATA_DIR, "vocab_sentence_matches.json") - -NIKKUD_RE = re.compile(r"[\u0591-\u05C7]") -HEBREW_RE = re.compile(r"[\u05d0-\u05ea]") -HEBREW_CHAR_RE = re.compile(r"[\u05d0-\u05ea\ufb20-\ufb4f]") - - -def strip_nikkud(text): - """Remove all Hebrew nikkud/cantillation marks.""" - return NIKKUD_RE.sub("", text) - - -def collapse_hebrew_spaces(text): - """Collapse spaces between Hebrew letter fragments (for badly-encoded PDFs). - - Strategy: strip nikkud first, then iteratively remove spaces between - Hebrew characters. Real word boundaries are detected by: - - Final-form letters (ם ן ף ך ץ) followed by space - - Punctuation (.,;:!?"') - - Non-Hebrew characters - """ - stripped = strip_nikkud(text) - # Normalize presentation forms to standard Hebrew - # FB20-FB4F contains presentation forms - for code in range(0xFB2A, 0xFB50): - ch = chr(code) - if ch in stripped: - # Map shin/sin dots, dagesh forms back to base - # FB2A = שׁ (shin+dot), FB2B = שׂ (sin+dot) - base_map = { - "\ufb2a": "ש", - "\ufb2b": "ש", - "\ufb35": "ו", - "\ufb4b": "ו", - "\ufb30": "א", - "\ufb31": "ב", - "\ufb32": "ג", - "\ufb33": "ד", - "\ufb34": "ה", - "\ufb36": "ז", - "\ufb38": "ט", - "\ufb39": "י", - "\ufb3a": "כ", - "\ufb3b": "כ", - "\ufb3c": "ל", - "\ufb3e": "מ", - "\ufb40": "נ", - "\ufb41": "ס", - "\ufb43": "פ", - "\ufb44": "פ", - "\ufb46": "צ", - "\ufb47": "ק", - "\ufb48": "ר", - "\ufb49": "ש", - "\ufb4a": "ת", - } - if ch in base_map: - stripped = stripped.replace(ch, base_map[ch]) - - # Replace multiple spaces with single - stripped = re.sub(r" {2,}", " ", stripped) - - # Now rebuild text, keeping spaces only at word boundaries - # Word boundary markers: final-form letters, punctuation, non-Hebrew - final_forms = set("םןףךץ") - result = [] - i = 0 - chars = list(stripped) - - while i < len(chars): - if chars[i] != " ": - result.append(chars[i]) - i += 1 - continue - - # It's a space. Decide if it's a word boundary. - # Look back for the last non-space character - prev_ch = None - for j in range(len(result) - 1, -1, -1): - if result[j] != " ": - prev_ch = result[j] - break - - # Look forward for next non-space character - next_ch = None - for j in range(i + 1, len(chars)): - if chars[j] != " ": - next_ch = chars[j] - break - - is_boundary = False - - # After final-form letter = word boundary - if prev_ch and prev_ch in final_forms: - is_boundary = True - - # Before/after punctuation or non-Hebrew = word boundary - if prev_ch and not HEBREW_RE.match(prev_ch): - is_boundary = True - if next_ch and not HEBREW_RE.match(next_ch): - is_boundary = True - - # If either side is not Hebrew at all, boundary - if prev_ch is None or next_ch is None: - is_boundary = True - - if is_boundary: - result.append(" ") - # else: skip the space (collapse intra-word gap) - i += 1 - - return "".join(result).strip() - - -def extract_pdf_sentences(pdf_path, book_name): - """Extract sentences from a PDF file.""" - doc = fitz.open(pdf_path) - sentences = [] - - for page_num in range(len(doc)): - page = doc[page_num] - text = page.get_text() - - if not text.strip(): - continue - - # Split into lines first, then split on sentence-ending punctuation - lines = text.split("\n") - - raw_sentences = [] - for line in lines: - line = line.strip() - if not line: - continue - # Split on sentence-ending punctuation followed by space or at end - parts = re.split(r"(?<=[.?!])\s+", line) - raw_sentences.extend(parts) - - for sent in raw_sentences: - sent = sent.strip() - if not sent: - continue - - # Must contain Hebrew characters - if not HEBREW_RE.search(sent): - continue - - # Create stripped version (no nikkud, collapsed spaces for PDF) - stripped = collapse_hebrew_spaces(sent) - - # Count Hebrew words in stripped version - words = [w for w in stripped.split() if HEBREW_RE.search(w)] - word_count = len(words) - - # Filter: 4-15 Hebrew words - if word_count < 4 or word_count > 15: - continue - - # Drop metadata-like lines - # Page numbers (just digits) - if re.match(r"^\d+$", sent.strip()): - continue - # Copyright text - if any(kw in sent.lower() for kw in ["copyright", "©", "isbn", "printed in"]): - continue - - sentences.append( - { - "text": sent, - "book": book_name, - "stripped": stripped, - } - ) - - doc.close() - return sentences - - -def has_extractable_text(pdf_path): - """Check if a PDF has extractable text.""" - doc = fitz.open(pdf_path) - text_found = False - for i in range(min(len(doc), 10)): - if doc[i].get_text().strip(): - text_found = True - break - doc.close() - return text_found - - -def load_sentence_index(): - """Load existing sentence index.""" - if os.path.exists(SENTENCE_INDEX): - with open(SENTENCE_INDEX, encoding="utf-8") as f: - return json.load(f) - return {"sentences": []} - - -def save_sentence_index(data): - """Save sentence index.""" - with open(SENTENCE_INDEX, "w", encoding="utf-8") as f: - json.dump(data, f, ensure_ascii=False, indent=2) - - -def match_vocab_to_sentences(sentences, vocab_df): - """Match vocab words to sentences.""" - matches = {} - - # Build lookup: word_no_nikkud -> word_nikkud - vocab_words = [] - for _, row in vocab_df.iterrows(): - word_no_nik = str(row.get("Word Without Nikkud", "")).strip() - word_nik = str(row.get("Word", "")).strip() - if word_no_nik and word_nik: - vocab_words.append((word_no_nik, word_nik)) - - print(f"Matching {len(vocab_words)} vocab words against {len(sentences)} sentences...") - - # Precompute: for each sentence, get the stripped text - sent_data = [] - for s in sentences: - stripped = s.get("stripped", "") - # For PDF sentences, stripped already has collapsed spaces but words may be joined - # For EPUB sentences, stripped has proper word spacing - sent_data.append( - { - "text": s["text"], - "book": s["book"], - "stripped": stripped, - "word_count": len(stripped.split()), - } - ) - - matched_count = 0 - - for word_no_nik, word_nik in vocab_words: - if len(word_no_nik) < 2: - continue - - # Build regex for word boundary matching - # Use both approaches: proper word boundary and substring for PDF text - pattern = re.compile(r"(?:^|\s)" + re.escape(word_no_nik) + r"(?:\s|$)") - # For PDF texts with collapsed spaces, also try substring match - # but only for words >= 3 chars to avoid false positives - use_substring = len(word_no_nik) >= 3 - - word_matches = [] - - for sd in sent_data: - stripped = sd["stripped"] - - # Try word-boundary match first - if pattern.search(stripped): - word_matches.append(sd) - elif use_substring and word_no_nik in stripped: - # Substring match for PDF texts with collapsed spaces - # Verify it's not part of a longer word by checking the character - # before and after in the collapsed text - idx = stripped.find(word_no_nik) - before_ok = idx == 0 or not HEBREW_RE.match(stripped[idx - 1]) - after_idx = idx + len(word_no_nik) - after_ok = after_idx >= len(stripped) or not HEBREW_RE.match(stripped[after_idx]) - # Only count if at least one boundary is clear - # (for PDF collapsed text, boundaries are often missing) - # For PDF books, we accept substring matches - if sd["book"] in ("אליס בארץ הפלאות", "האריה שאהב תות") or before_ok or after_ok: - word_matches.append(sd) - - if word_matches: - matched_count += 1 - - # Sort by preference: 6-12 words ideal, then shorter is better - def score(sd): - wc = sd["word_count"] - if 6 <= wc <= 12: - return (0, wc) # ideal range, prefer shorter - if wc < 6: - return (1, -wc) # too short - return (2, wc) # too long - - word_matches.sort(key=score) - best = word_matches[:3] - - matches[word_no_nik] = { - "word_nikkud": word_nik, - "sentences": [{"text": m["text"], "book": m["book"]} for m in best], - } - - print( - f"Words with at least 1 match: {matched_count}/{len(vocab_words)} ({100 * matched_count / len(vocab_words):.1f}%)" - ) - return matches - - -def main(): - # ── Step 1: Extract from PDFs ── - pdfs = [ - ("alice.pdf", "אליס בארץ הפלאות"), - ("lion_strawberry.pdf", "האריה שאהב תות"), - ] - - all_new_sentences = [] - - for filename, book_name in pdfs: - pdf_path = os.path.join(EPUBS_DIR, filename) - if not os.path.exists(pdf_path): - print(f"SKIP: {filename} not found") - continue - - if not has_extractable_text(pdf_path): - print(f"SKIP: {filename} has no extractable text (likely scanned images)") - continue - - print(f"Extracting from {filename} ({book_name})...") - sentences = extract_pdf_sentences(pdf_path, book_name) - print(f" Extracted {len(sentences)} sentences") - all_new_sentences.extend(sentences) - - # ── Step 2: Merge with existing index ── - index = load_sentence_index() - existing_count = len(index["sentences"]) - - # Deduplicate by (stripped, book) - existing_keys = set() - for s in index["sentences"]: - key = (s.get("stripped", ""), s.get("book", "")) - existing_keys.add(key) - - added = 0 - for s in all_new_sentences: - key = (s["stripped"], s["book"]) - if key not in existing_keys: - index["sentences"].append(s) - existing_keys.add(key) - added += 1 - - save_sentence_index(index) - total = len(index["sentences"]) - print(f"\nSentence index: {existing_count} existing + {added} new = {total} total") - - # ── Per-book stats ── - book_counts = {} - for s in index["sentences"]: - book = s.get("book", "unknown") - book_counts[book] = book_counts.get(book, 0) + 1 - - print("\nSentences per book:") - for book, count in sorted(book_counts.items(), key=lambda x: -x[1]): - print(f" {book}: {count}") - - # ── Step 3: Match vocab words to sentences ── - print(f"\nLoading vocab from {VOCAB_CSV}...") - vocab_df = pd.read_csv(VOCAB_CSV, sep=";", index_col=0) - print(f" {len(vocab_df)} vocab words loaded") - - matches = match_vocab_to_sentences(index["sentences"], vocab_df) - - with open(MATCHES_FILE, "w", encoding="utf-8") as f: - json.dump(matches, f, ensure_ascii=False, indent=2) - - print(f"\nWrote {len(matches)} word matches to {MATCHES_FILE}") - - # ── Step 4: Summary stats ── - total_words = len(vocab_df) - matched_words = len(matches) - print(f"\n{'=' * 50}") - print("SUMMARY") - print(f"{'=' * 50}") - print(f"Total sentences: {total}") - for book, count in sorted(book_counts.items(), key=lambda x: -x[1]): - print(f" {book}: {count}") - print(f"Total vocab words: {total_words}") - print(f"Words with sentences: {matched_words} ({100 * matched_words / total_words:.1f}%)") - print(f"Words without sentences: {total_words - matched_words}") - - -if __name__ == "__main__": - main() diff --git a/scripts/migrate_to_json.py b/scripts/migrate_to_json.py deleted file mode 100644 index 840ba1d..0000000 --- a/scripts/migrate_to_json.py +++ /dev/null @@ -1,1041 +0,0 @@ -"""Migration script: builds data/words.json from all existing data sources. - -Run: - python3 scripts/migrate_to_json.py - python3 scripts/migrate_to_json.py --dry-run -""" - -from __future__ import annotations - -import argparse -import csv -import json -import logging -import re -import sys -import unicodedata -from collections import defaultdict -from pathlib import Path -from typing import Any - -import genanki - -# --------------------------------------------------------------------------- -# Bootstrap: parent package helpers -# --------------------------------------------------------------------------- -sys.path.insert(0, str(Path(__file__).parent.parent)) -from helpers import strip_nikkud # noqa: E402 - -# --------------------------------------------------------------------------- -# Logging -# --------------------------------------------------------------------------- -logging.basicConfig( - format="%(levelname)s %(message)s", - level=logging.INFO, -) -log = logging.getLogger(__name__) - -# --------------------------------------------------------------------------- -# Constants -# --------------------------------------------------------------------------- -DATA_DIR = Path(__file__).parent.parent / "data" -OUTPUT_FILE = DATA_DIR / "words.json" -MIGRATION_DATE = "2026-03-08" - -EMOJI_RE = re.compile( - r"[\U0001F300-\U0001FFFF" - r"\U00002600-\U000027BF" - r"\U0001F000-\U0001F9FF" - r"\u2600-\u26FF" - r"\u2700-\u27BF]+", - re.UNICODE, -) - - -# NFC-normalise once; used throughout for consistent Unicode comparisons. -def _nfc(s: str) -> str: - return unicodedata.normalize("NFC", s) - - -# --------------------------------------------------------------------------- -# PoS → Hebrew mapping -# --------------------------------------------------------------------------- -POS_HEBREW: dict[str, str] = { - "Noun": "שֵׁם עֶצֶם", - "Verb": "פֹּעַל", - "Adjective": "שֵׁם תֹּאַר", - "Adverb": "תֹּאַר הַפֹּעַל", - "Pronoun": "כִּנּוּי גּוּף", - "Preposition": "מִילַּת יַחַס", - "Conjunction": "מִילַּת חִבּוּר", - "Interjection": "מִילַּת קְרִיאָה", - "Numeral": "שֵׁם מִסְפָּר", - "Cardinal numeral": "שֵׁם מִסְפָּר", - "Particle": "מִילִּית", - "Determiner": "מְגַדִּיר", - "Existential": "מִילַּת קִיּוּם", - "Interrogative": "מִילַּת שְׁאֵלָה", -} - -# Binyan suffix appended to pos_hebrew for verbs -BINYAN_HEBREW: dict[str, str] = { - "Pa'al": "פָּעַל", - "Nif'al": "נִפְעַל", - "Pi'el": "פִּיעֵל", - "Pu'al": "פֻּעַל", - "Hif'il": "הִפְעִיל", - "Huf'al": "הֻפְעַל", - "Hitpa'el": "הִתְפַּעֵל", -} - -# Conjugation form-key → person code -FORM_KEY_TO_PERSON: dict[str, str] = { - "present_ms": "ms", - "present_fs": "fs", - "present_mp": "mp", - "present_fp": "fp", - "past_1s": "1s", - "past_1p": "1p", - "past_2ms": "2ms", - "past_2fs": "2fs", - "past_2mp": "2mp", - "past_2fp": "2fp", - "past_3ms": "3ms", - "past_3fs": "3fs", - "past_3p": "3p", - "future_1s": "1s", - "future_1p": "1p", - "future_2ms": "2ms", - "future_2fs": "2fs", - "future_2mp": "2mp", - "future_2fp": "2fp", - "future_3ms": "3ms", - "future_3fs": "3fs", - "future_3mp": "3mp", - "future_3fp": "3fp", - "imperative_ms": "ms", - "imperative_fs": "fs", - "imperative_mp": "mp", - "imperative_fp": "fp", - "infinitive": "inf", -} - -# Mirrors apkg_builder.PRESENT_EXPANSION — all pronoun/tense choices per present form key. -# The builder uses a per-verb seeded RNG to pick one; we store all possible GUIDs. -PRESENT_EXPANSION: dict[str, list[tuple[str, str]]] = { - "present_ms": [ - ("אֲנִי (זָכָר)", "הוֹוֶה"), - ("אַתָּה", "הוֹוֶה"), - ("הוּא", "הוֹוֶה"), - ], - "present_fs": [ - ("אֲנִי (נְקֵבָה)", "הוֹוֶה"), - ("אַתְּ", "הוֹוֶה"), - ("הִיא", "הוֹוֶה"), - ], - "present_mp": [ - ("אֲנַחְנוּ (זָכָר)", "הוֹוֶה"), - ("אַתֶּם", "הוֹוֶה"), - ("הֵם", "הוֹוֶה"), - ], - "present_fp": [ - ("אֲנַחְנוּ (נְקֵבָה)", "הוֹוֶה"), - ("אַתֶּן", "הוֹוֶה"), - ("הֵן", "הוֹוֶה"), - ], -} - -# Mirrors apkg_builder.PAST_3P_EXPANSION -PAST_3P_EXPANSION: list[tuple[str, str]] = [ - ("הֵם", "עָבָר"), - ("הֵן", "עָבָר"), -] - -# Mirrors apkg_builder.FP_MODERN_FALLBACK -FP_MODERN_FALLBACK: dict[str, str] = { - "future_2fp": "future_2mp", - "future_3fp": "future_3mp", - "imperative_fp": "imperative_mp", -} - -# 1st-person forms that get a randomly assigned gender label in the builder -_FIRST_PERSON_GENDERED: set[str] = {"past_1s", "past_1p", "future_1s", "future_1p"} - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - - -def _strip(text: str) -> str: - """Strip nikkud using the shared helper.""" - return strip_nikkud(text) - - -def _hebrew_word(nikkud: str) -> dict[str, str]: - """Build a {nikkud, ktiv_male} sub-object.""" - return {"nikkud": nikkud, "ktiv_male": _strip(nikkud)} - - -def _parse_root(raw: str) -> list[str]: - """Parse root string like 'שׁ - מ - ר' into list of consonants. - - Returns empty list for '-' or empty input. - """ - raw = raw.strip() - if not raw or raw == "-": - return [] - parts = [p.strip() for p in raw.split(" - ")] - return [p for p in parts if p] - - -def _extract_emoji(meaning: str) -> tuple[str, str | None]: - """Split emoji from meaning string. - - Returns (clean_meaning, emoji_char_or_None). - """ - emoji_match = EMOJI_RE.search(meaning) - if not emoji_match: - return meaning.strip(), None - emoji = emoji_match.group(0) - clean = EMOJI_RE.sub("", meaning).strip() - # Collapse multiple spaces - clean = re.sub(r"\s{2,}", " ", clean).strip() - return clean, emoji - - -def _parse_pos(raw_pos: str) -> tuple[str, str]: - """Return (pos_english, pos_hebrew) from raw PoS string. - - Handles patterns like: - - "Noun – masculine" → ("Noun", "שֵׁם עֶצֶם") - - "Verb –pa'al" → ("Verb", "פֹּעַל — פָּעַל") - - "Noun –ketelpattern, feminine" → ("Noun", "שֵׁם עֶצֶם") - - "–" → ("Existential", "מִילַּת קִיּוּם") - - "Cardinal numeral – masculine" → ("Cardinal numeral", "שֵׁם מִסְפָּר") - """ - raw_pos = raw_pos.strip() - - # Special case for bare "–" (יש, אין) - if raw_pos == "–": - return "Existential", POS_HEBREW["Existential"] - - # Split on " – " (em-dash with spaces) or " –" (em-dash no space) - first_part = re.split(r"\s*–", raw_pos)[0].strip() - - # Map the first word to canonical English PoS key - # "Cardinal numeral" needs two words - if first_part.lower().startswith("cardinal"): - pos_en = "Cardinal numeral" - else: - pos_en = first_part.split()[0].capitalize() if first_part else raw_pos - - # Detect binyan for verbs: "Verb –pa'al" → part after – is binyan slug - binyan_hebrew: str | None = None - if pos_en == "Verb": - # extract the binyan part: everything after the dash, strip "pattern" etc. - after = re.split(r"–\s*", raw_pos, maxsplit=1) - if len(after) > 1: - binyan_slug_raw = after[1].split(",")[0].strip() - # Normalise: "pa'al" → "Pa'al", "hif'il" → "Hif'il" etc. - for k in BINYAN_HEBREW: - if k.lower() == binyan_slug_raw.lower(): - binyan_hebrew = BINYAN_HEBREW[k] - break - - base_hebrew = POS_HEBREW.get(pos_en, "") - if binyan_hebrew: - pos_hebrew = f"{base_hebrew} — {binyan_hebrew}" if base_hebrew else binyan_hebrew - else: - pos_hebrew = base_hebrew - - return pos_en, pos_hebrew - - -def _strip_construct_hyphen(form: str) -> str: - """Remove trailing maqqef hyphen from construct form (e.g. 'אֲבִי־' → 'אֲבִי').""" - return form.rstrip("־").rstrip("-").strip() - - -# --------------------------------------------------------------------------- -# Data loaders -# --------------------------------------------------------------------------- - - -def load_csv(path: Path) -> list[dict[str, str]]: - rows: list[dict[str, str]] = [] - with path.open(encoding="utf-8") as f: - reader = csv.DictReader(f, delimiter=";") - for row in reader: - rows.append(dict(row)) - log.info("Loaded %d rows from %s", len(rows), path.name) - return rows - - -def load_json(path: Path) -> Any: - with path.open(encoding="utf-8") as f: - data = json.load(f) - log.info("Loaded %s (%d entries)", path.name, len(data)) - return data - - -# --------------------------------------------------------------------------- -# Build legacy GUID lookup -# --------------------------------------------------------------------------- - - -def build_guid_lookup( - guid_map: dict[str, str], -) -> tuple[dict[str, str], dict[tuple[str, str], str]]: - """Split guid_map into plain-word and (word, meaning) keyed dicts. - - All keys NFC-normalised for consistent comparison. - """ - base: dict[str, str] = {} - disambig: dict[tuple[str, str], str] = {} - for raw_k, guid in guid_map.items(): - k = _nfc(raw_k) - if "||" in k: - word, meaning = k.split("||", 1) - disambig[(word, meaning)] = guid - else: - base[k] = guid - return base, disambig - - -def resolve_guid( - word_nikkud: str, - meaning: str, - base: dict[str, str], - disambig: dict[tuple[str, str], str], -) -> str | None: - w = _nfc(word_nikkud) - m = _nfc(meaning) - # Prefer explicit disambiguation - if (w, m) in disambig: - return disambig[(w, m)] - # Check any disambiguation key that starts with same prefix (truncated meanings) - for (dw, dm), g in disambig.items(): - if dw == w and (m.startswith(dm) or dm.startswith(m[:20])): - return g - return base.get(w) - - -# --------------------------------------------------------------------------- -# Unique key generation -# --------------------------------------------------------------------------- - - -def build_unique_keys( - rows: list[dict[str, str]], -) -> tuple[dict[int, str], list[str]]: - """Assign unique_key to each CSV row (by index). - - Escalation: - 1. nikkud word - 2. "word|pos" (if nikkud collides) - 3. "word|pos|meaning" (if nikkud+pos collides) - 4. "word|pos|meaning|N" (N=2,3,… for true CSV exact-duplicates) - - Returns: - idx_to_key — map from CSV row index to unique_key - collisions — list of collision descriptions logged - """ - collisions: list[str] = [] - idx_to_key: dict[int, str] = {} - - def _pos_short(pos: str) -> str: - """Canonical short PoS label for key construction.""" - if pos == "–": - return "Existential" - return re.split(r"\s*[–-]", pos)[0].strip() - - # Pass 1: try plain nikkud key - key_to_indices: dict[str, list[int]] = defaultdict(list) - for i, row in enumerate(rows): - k = row["Word"] - key_to_indices[k].append(i) - - for k, indices in key_to_indices.items(): - if len(indices) == 1: - idx_to_key[indices[0]] = k - else: - collisions.append(f"Nikkud collision '{k}' ({len(indices)} rows) — escalating to word|pos") - # Pass 2: try word|pos - pos_key_to_indices: dict[str, list[int]] = defaultdict(list) - for i in indices: - short_pos = _pos_short(rows[i]["Part of Speech"]) - pos_key = f"{k}|{short_pos}" - pos_key_to_indices[pos_key].append(i) - for pk, pk_indices in pos_key_to_indices.items(): - if len(pk_indices) == 1: - idx_to_key[pk_indices[0]] = pk - else: - collisions.append( - f" Nikkud+PoS collision '{pk}' ({len(pk_indices)} rows) — escalating to word|pos|meaning" - ) - # Pass 3: try word|pos|meaning - meaning_key_to_indices: dict[str, list[int]] = defaultdict(list) - for j in pk_indices: - meaning = rows[j]["Meaning"] - full_key = f"{pk}|{meaning}" - meaning_key_to_indices[full_key].append(j) - for mk, mk_indices in meaning_key_to_indices.items(): - if len(mk_indices) == 1: - idx_to_key[mk_indices[0]] = mk - else: - # True exact duplicates: append numeric suffix |2, |3, … - collisions.append( - f" Exact duplicate '{mk}' ({len(mk_indices)} rows, same slug) " - f"— appending numeric suffix" - ) - idx_to_key[mk_indices[0]] = mk - for n, j in enumerate(mk_indices[1:], start=2): - idx_to_key[j] = f"{mk}|{n}" - - # Verify completeness - unkeyed = [i for i in range(len(rows)) if i not in idx_to_key] - if unkeyed: - log.error("BUG: %d rows have no unique_key assigned!", len(unkeyed)) - - return idx_to_key, collisions - - -# --------------------------------------------------------------------------- -# Conjugation builder -# --------------------------------------------------------------------------- - - -def _conj_guids( - infinitive_nikkud: str, - form_key: str, - form_data: dict, -) -> list[str]: - """Return the list of possible GUIDs for a conjugation form. - - Mirrors apkg_builder's add_note call logic: - - Present tense: one GUID per PRESENT_EXPANSION choice (all stored). - - past_3p: two GUIDs (הֵם / הֵן). - - FP_MODERN_FALLBACK keys: GUID from form_data pronoun/tense directly. - - 1st-person gendered: two GUIDs (זָכָר / נְקֵבָה suffix). - - Standard: single GUID from form_data pronoun + tense. - - The builder uses a seeded RNG to *pick one* for present/past_3p; we store - all candidates so a future reader can identify which GUID is live. - """ - if form_key in PRESENT_EXPANSION: - return [genanki.guid_for(infinitive_nikkud, pronoun, tense) for pronoun, tense in PRESENT_EXPANSION[form_key]] - - if form_key == "past_3p": - return [genanki.guid_for(infinitive_nikkud, pronoun, tense) for pronoun, tense in PAST_3P_EXPANSION] - - if form_key in FP_MODERN_FALLBACK: - # Builder uses form_data pronoun/tense directly for these - pronoun = form_data.get("pronoun", "") - tense = form_data.get("tense", "") - return [genanki.guid_for(infinitive_nikkud, pronoun, tense)] - - pronoun = form_data.get("pronoun", "") - tense = form_data.get("tense", "") - - if form_key in _FIRST_PERSON_GENDERED: - # Builder appends " (זָכָר)" or " (נְקֵבָה)" — store both - return [ - genanki.guid_for(infinitive_nikkud, f"{pronoun} (זָכָר)", tense), - genanki.guid_for(infinitive_nikkud, f"{pronoun} (נְקֵבָה)", tense), - ] - - return [genanki.guid_for(infinitive_nikkud, pronoun, tense)] - - -def build_conjugation_forms(forms_dict: dict, infinitive_nikkud: str = "") -> list[dict]: - """Convert raw forms dict to list of ConjugationForm objects. - - Args: - forms_dict: Raw forms dict from conjugations.json. - infinitive_nikkud: Nikkud infinitive string used for GUID generation. - """ - result: list[dict] = [] - # We store all candidate GUIDs rather than selecting one - for form_key, form_data in forms_dict.items(): - if form_key == "infinitive": - continue # stored separately at conjugation.infinitive - person = FORM_KEY_TO_PERSON.get(form_key) - if person is None: - log.warning("Unknown form key: %s", form_key) - continue - nikkud_form = form_data.get("form", "") - if not nikkud_form: - continue - guids = _conj_guids(infinitive_nikkud, form_key, form_data) if infinitive_nikkud else [] - result.append( - { - "person": person, - "tense": form_data.get("tense", ""), - "pronoun_hebrew": form_data.get("pronoun", ""), - "form": _hebrew_word(nikkud_form), - "audio_url": form_data.get("audio_url") or None, - "audio_file": None, - "guid": guids[0] if len(guids) == 1 else None, - "guid_candidates": guids if len(guids) > 1 else None, - } - ) - return result - - -# --------------------------------------------------------------------------- -# Main migration -# --------------------------------------------------------------------------- - - -def migrate(dry_run: bool = False) -> None: # noqa: C901 (complex but linear) - # ------------------------------------------------------------------ - # 1. Load all sources - # ------------------------------------------------------------------ - csv_rows = load_csv(DATA_DIR / "hebrew_dict_for_anki.csv") - conjugations: dict = load_json(DATA_DIR / "conjugations.json") - noun_plurals: dict = load_json(DATA_DIR / "noun_plurals.json") - vetted_sentences: dict = load_json(DATA_DIR / "vetted_sentences.json") - guid_map_raw: dict = load_json(DATA_DIR / "legacy_guid_map.json") - refined_meanings: dict = load_json(DATA_DIR / "refined_meanings.json") - image_cache: dict = load_json(DATA_DIR / "image_cache.json") - frequency_cache: dict = load_json(DATA_DIR / "frequency_cache.json") - # ------------------------------------------------------------------ - # 2. Pre-process lookups - # ------------------------------------------------------------------ - guid_base, guid_disambig = build_guid_lookup(guid_map_raw) - - # noun_plurals: two lookup maps — by slug (primary), by nikkud singular (fallback) - plurals_by_slug: dict[str, dict] = {} - plurals_by_nikkud: dict[str, dict] = {} - for pdata in noun_plurals.values(): - slug = pdata.get("slug", "") - if slug: - plurals_by_slug[slug] = pdata - sing = _nfc(pdata.get("singular", "")) - if sing: - plurals_by_nikkud[sing] = pdata - - # vetted_sentences: keyed by stripped word; build NFC lookup of word_nikkud too - sentences_by_stripped: dict[str, dict] = {} - for sdata in vetted_sentences.values(): - wn = sdata.get("word_nikkud", "") - if wn: - sentences_by_stripped[_strip(wn)] = sdata - - # conjugations: indexed by slug (100% coverage) and by stripped infinitive - # Some active/passive pairs share the same slug (e.g. הופל/להפיל → 1231-lehapil). - # When slug collides, always prefer the ACTIVE verb in conj_by_slug so the - # entry is correctly associated with its active conjugation data. - conj_by_slug: dict[str, dict] = {} - conj_by_stripped_inf: dict[str, dict] = {} - for cdata in conjugations.values(): - slug = cdata.get("slug", "") - if slug: - existing = conj_by_slug.get(slug) - if existing is None: - conj_by_slug[slug] = cdata - elif cdata.get("is_passive") and not existing.get("is_passive"): - # Keep the active verb; skip overwriting with passive - pass - elif existing.get("is_passive") and not cdata.get("is_passive"): - # Replace passive with active - conj_by_slug[slug] = cdata - else: - conj_by_slug[slug] = cdata - inf = cdata.get("infinitive", "") - if inf: - conj_by_stripped_inf[_strip(inf)] = cdata - - # Build passive→active link: - # passive verbs store reference_form = nikkud infinitive of the ACTIVE verb - # We need: active_slug → passive_conj_data - passive_by_active_slug: dict[str, dict] = {} - for cdata in conjugations.values(): - if not cdata.get("is_passive"): - continue - ref_nikkud = cdata.get("reference_form", "") - ref_stripped = _strip(ref_nikkud) - # find the active verb's slug - active_cdata = conj_by_stripped_inf.get(ref_stripped) - if active_cdata: - active_slug = active_cdata.get("slug", "") - if active_slug: - passive_by_active_slug[active_slug] = cdata - else: - log.warning( - "Passive verb '%s' references active '%s' (stripped='%s') — no match in conjugations", - cdata.get("infinitive"), - ref_nikkud, - ref_stripped, - ) - - # refined_meanings: NFC-keyed - refined_nfc: dict[str, str] = {_nfc(k): v for k, v in refined_meanings.items()} - - # image_cache: stripped-word keyed - image_stripped: dict[str, str | None] = dict(image_cache) - - # frequency_cache: stripped-word keyed - freq_stripped: dict[str, int] = {k: int(v) for k, v in frequency_cache.items() if v is not None} - - # ------------------------------------------------------------------ - # 3. Assign unique keys - # ------------------------------------------------------------------ - idx_to_key, collisions = build_unique_keys(csv_rows) - for msg in collisions: - log.info("KEY COLLISION: %s", msg) - log.info("Collision summary: %d collision events", len(collisions)) - - # ------------------------------------------------------------------ - # 3b. Identify exact-duplicate |N suffix rows to skip - # ------------------------------------------------------------------ - # |N suffix rows (N=2,3,…) are true CSV exact-duplicates that share the - # same slug as the base entry. We drop them entirely so the unique_key - # space stays clean and no GUID collisions are emitted. - import re as _re - - _dup_indices: set[int] = set() - for _i, _k in idx_to_key.items(): - if _re.search(r"\|\d+$", _k): - _base_k = _re.sub(r"\|\d+$", "", _k) - _base_i = next((j for j, kk in idx_to_key.items() if kk == _base_k), None) - if _base_i is not None and csv_rows[_i]["slug"] == csv_rows[_base_i]["slug"]: - _dup_indices.add(_i) - if _dup_indices: - log.info( - "Skipping %d exact-duplicate |N suffix rows (same slug as base entry)", - len(_dup_indices), - ) - - # ------------------------------------------------------------------ - # 4. Confusable groups: group by ktiv_male (from ktiv_male_forms) - # ------------------------------------------------------------------ - # Build: stripped_word → set of slugs sharing that ktiv_male form - # We care about the *base* form (absolute_singular or absolute form of the word). - # Strategy: use "Word Without Nikkud" from CSV as ktiv_male, then group slugs. - # A confusable group = multiple *different* slugs sharing the same ktiv_male. - slug_to_ktiv_male: dict[str, str] = {} - for row in csv_rows: - slug_to_ktiv_male[row["slug"]] = row["Word Without Nikkud"] - - ktiv_male_to_slugs: dict[str, set[str]] = defaultdict(set) - for slug, km in slug_to_ktiv_male.items(): - ktiv_male_to_slugs[km].add(slug) - - # Only keep those with >1 distinct slug - confusable_slug_groups: dict[str, set[str]] = { - km: slugs for km, slugs in ktiv_male_to_slugs.items() if len(slugs) > 1 - } - log.info("Confusable ktiv_male groups: %d", len(confusable_slug_groups)) - - # Build reverse: slug → list of co-confusable slugs - slug_to_confusable_slugs: dict[str, set[str]] = {} - for _km, slugs in confusable_slug_groups.items(): - for slug in slugs: - slug_to_confusable_slugs[slug] = slugs - {slug} - - # We need to map slug → unique_key(s) for the confusable_group field - # But unique_key is per-row; one slug may map to multiple keys (duplicate entries with same slug). - # Exclude exact-duplicate rows so dropped entries don't pollute confusable groups. - slug_to_unique_keys: dict[str, list[str]] = defaultdict(list) - for i, row in enumerate(csv_rows): - if i not in _dup_indices: - slug_to_unique_keys[row["slug"]].append(idx_to_key[i]) - - # ------------------------------------------------------------------ - # 5. Build entries - # ------------------------------------------------------------------ - words: dict[str, dict] = {} - stats = { - "total": 0, - "has_conjugation": 0, - "has_noun_inflection": 0, - "has_examples": 0, - "has_guid": 0, - "has_image": 0, - "has_frequency": 0, - "has_hint": 0, - "has_emoji": 0, - "key_collisions": len(collisions), - } - - for i, row in enumerate(csv_rows): - if i in _dup_indices: - continue - unique_key = idx_to_key[i] - word_nikkud = row["Word"] - word_ktiv = row["Word Without Nikkud"] - slug = row["slug"] - raw_pos = row["Part of Speech"] - meaning_raw = row["Meaning"] - audio_url = row["audio_url"] or None - tags = row["tags"] or "" - - # -- PoS - pos_en, pos_hebrew = _parse_pos(raw_pos) - - # -- Root - root = _parse_root(row["Root"]) - - # -- Meaning + emoji - meaning_clean, emoji_char = _extract_emoji(meaning_raw) - - # -- GUID - guid = resolve_guid(word_nikkud, meaning_raw, guid_base, guid_disambig) - if guid: - stats["has_guid"] += 1 - - # -- Frequency (keyed by ktiv_male / stripped) - frequency = freq_stripped.get(word_ktiv) - if frequency: - stats["has_frequency"] += 1 - - # -- Image - image_filename = image_stripped.get(word_ktiv) - if image_filename: - stats["has_image"] += 1 - - # -- Hint (refined_meanings, NFC-keyed by nikkud) - hint = refined_nfc.get(_nfc(word_nikkud), "") - if hint: - stats["has_hint"] += 1 - - # -- Examples (vetted_sentences keyed by stripped word) - examples_block: dict | None = None - s_data = sentences_by_stripped.get(word_ktiv) - if s_data: - good = s_data.get("good_sentences", []) - if good: - vetted_list = [ - { - "text": s["text"], - "source": s.get("book", "unknown"), - "vetted": True, - } - for s in good - ] - # Pick best cloze sentence (first good one) - cloze_sent = good[0] - # cloze_guid: deterministic ID for the cloze card on this vocab note. - # Pattern: guid_for(word_nikkud, "cloze") — unique per word. - _cloze_guid = genanki.guid_for(word_nikkud, "cloze") - _cloze_text = cloze_sent["text"] - - # Compute cloze_word_start / cloze_word_end from the text. - # Strategy (in order): - # 1. Use stored offsets if already present in source data. - # 2. Exact nikkud form search. - # 3. Exact ktiv_male (plain consonants) search in the raw text. - # 4. Scan each Hebrew word token in the text; match by stripped consonants. - # This handles inflected/construct/plural forms with different nikkud. - _cw_start: int | None = cloze_sent.get("cloze_word_start") - _cw_end: int | None = cloze_sent.get("cloze_word_end") - if _cw_start is None or _cw_end is None: - _idx = _cloze_text.find(word_nikkud) - if _idx >= 0: - _cw_start = _idx - _cw_end = _idx + len(word_nikkud) - else: - # Try exact ktiv_male substring - _idx2 = _cloze_text.find(word_ktiv) - if _idx2 >= 0: - _cw_start = _idx2 - _cw_end = _idx2 + len(word_ktiv) - else: - # Scan Hebrew word tokens; find one whose stripped form - # matches word_ktiv (handles inflected/construct/plural). - _HEBREW_TOK = re.compile( - r"[\u05D0-\u05FA\u05B0-\u05BD\u05BF\u05C1\u05C2\u05C7" - r"\uFB1D-\uFB4E]+" - ) - for _m in _HEBREW_TOK.finditer(_cloze_text): - if _strip(_m.group(0)) == word_ktiv: - _cw_start = _m.start() - _cw_end = _m.end() - break - # else leave both as None - - cloze_block = { - "text": _cloze_text, - "cloze_word_start": _cw_start, - "cloze_word_end": _cw_end, - "cloze_hint": cloze_sent.get("cloze_hint"), - "cloze_guid": _cloze_guid, - } - examples_block = { - "vetted": vetted_list, - "cloze": cloze_block, - "rejected_count": s_data.get("rejected_count", 0), - } - stats["has_examples"] += 1 - - # -- Noun inflection - noun_inflection: dict | None = None - pdata = plurals_by_slug.get(slug) or plurals_by_nikkud.get(_nfc(word_nikkud)) - if pdata and pos_en.startswith("Noun"): - - def _hw_or_null(nk: str) -> dict | None: - nk = _strip_construct_hyphen(nk) - return _hebrew_word(nk) if nk else None - - gender = pdata.get("gender") or None - gender_hebrew_map = { - "masculine": {"nikkud": "זָכָר", "ktiv_male": "זכר"}, - "feminine": {"nikkud": "נְקֵבָה", "ktiv_male": "נקבה"}, - } - # Plural GUID mirrors apkg_builder line 1609: guid_for("plural", singular_nikkud) - _plural_singular_nikkud = pdata.get("singular", "") - _plurals_guid = genanki.guid_for("plural", _plural_singular_nikkud) if _plural_singular_nikkud else None - noun_inflection = { - "plurals_guid": _plurals_guid, - "singular": _hw_or_null(pdata.get("singular", "")), - "plural": _hw_or_null(pdata.get("plural", "")), - "singular_audio": pdata.get("singular_audio") or None, - "plural_audio": pdata.get("plural_audio") or None, - "construct_singular": _hw_or_null(pdata.get("construct_singular", "")), - "construct_plural": _hw_or_null(pdata.get("construct_plural", "")), - "pronominal_suffixes": None, - "gender": gender, - "gender_hebrew": gender_hebrew_map.get(gender) if gender else None, - "mishkal": pdata.get("mishkal") or None, - "mishkal_hebrew": None, - } - stats["has_noun_inflection"] += 1 - - # -- Verb conjugation - conjugation_block: dict | None = None - cdata = conj_by_slug.get(slug) - if cdata and not cdata.get("is_passive"): - # This entry is an active verb with conjugation data - forms_dict = cdata.get("forms", {}) - # Resolve infinitive nikkud for GUID generation (prefer forms dict, fall back to cdata key) - _inf_data = forms_dict.get("infinitive", {}) - _inf_nikkud_for_guid = _inf_data.get("form", "") or cdata.get("infinitive", "") - active_forms = build_conjugation_forms(forms_dict, _inf_nikkud_for_guid) - - # Passive counterpart, if any - passive_cdata = passive_by_active_slug.get(slug) - hufal_pual_forms: list | None = None - reference_form_passive: dict | None = None - if passive_cdata: - passive_forms_dict = passive_cdata.get("forms", {}) - _passive_inf_data = passive_forms_dict.get("infinitive", {}) - _passive_inf_nikkud = _passive_inf_data.get("form", "") or passive_cdata.get("infinitive", "") - hufal_pual_forms = build_conjugation_forms(passive_forms_dict, _passive_inf_nikkud) - # reference_form of passive = active infinitive; 3ms past is in its forms - rf_passive_nikkud = passive_cdata.get("forms", {}).get("past_3ms", {}).get("form", "") - if rf_passive_nikkud: - reference_form_passive = _hebrew_word(rf_passive_nikkud) - - # Infinitive form (from forms dict) - inf_form_data = forms_dict.get("infinitive", {}) - inf_nikkud = inf_form_data.get("form", "") or cdata.get("infinitive", "") - infinitive_hw = _hebrew_word(inf_nikkud) if inf_nikkud else None - - # Reference form - ref_nikkud = cdata.get("reference_form", "") - reference_form_hw = _hebrew_word(ref_nikkud) if ref_nikkud else None - - binyan = cdata.get("binyan", "") - binyan_hebrew = BINYAN_HEBREW.get(binyan, "") - - conjugation_block = { - "in_conjugation_deck": True, - "infinitive": infinitive_hw, - "reference_form": reference_form_hw, - "binyan": binyan, - "binyan_hebrew": binyan_hebrew, - "prep": None, - "active_forms": active_forms, - "hufal_pual_forms": hufal_pual_forms, - "reference_form_passive": reference_form_passive, - } - stats["has_conjugation"] += 1 - - elif cdata and cdata.get("is_passive"): - # Passive-only entry: store a minimal conjugation block referencing the active verb - binyan = cdata.get("binyan", "") - binyan_hebrew = BINYAN_HEBREW.get(binyan, "") - forms_dict = cdata.get("forms", {}) - _passive_only_inf_data = forms_dict.get("infinitive", {}) - _passive_only_inf_nikkud = _passive_only_inf_data.get("form", "") or cdata.get("infinitive", "") - passive_forms = build_conjugation_forms(forms_dict, _passive_only_inf_nikkud) - - inf_form_data = forms_dict.get("infinitive", {}) - inf_nikkud = inf_form_data.get("form", "") or cdata.get("infinitive", "") - infinitive_hw = _hebrew_word(inf_nikkud) if inf_nikkud else None - - ref_nikkud = cdata.get("reference_form", "") - reference_form_hw = _hebrew_word(ref_nikkud) if ref_nikkud else None - - conjugation_block = { - "in_conjugation_deck": True, - "infinitive": infinitive_hw, - "reference_form": reference_form_hw, - "binyan": binyan, - "binyan_hebrew": binyan_hebrew, - "prep": None, - "active_forms": passive_forms, - "hufal_pual_forms": None, - "reference_form_passive": None, - } - stats["has_conjugation"] += 1 - - # -- Confusable group (filled in pass 2 below) - # -- Shared roots (filled in pass 2 below) - - # -- Audio filename: slug-based for confusables, word-based otherwise - audio_file = f"{word_ktiv}.mp3" - - entry: dict = { - "word": {"nikkud": word_nikkud, "ktiv_male": word_ktiv}, - "slug": slug, - "root": root, - "pos": pos_en, - "pos_hebrew": pos_hebrew, - "meaning": meaning_clean, - "meaning_raw": meaning_raw, - "audio_url": audio_url, - "audio_file": audio_file, - "tags": tags, - "last_scrape_date": MIGRATION_DATE, - # Identity - "vocab_legacy_guid": guid, - # Frequency - "frequency": frequency, - "pseudo_frequency": None, - # Display - "emoji": emoji_char, - "emoji_source": "from_pealim" if emoji_char else None, - "emoji_visible": False, - "image": image_filename, - "image_source": "wikipedia" if image_filename else None, - "hint": hint, - # Populated in pass 2 - "shared_roots": [], - "confusable_group": None, - "confusables_guid": None, - # Sub-sections - "examples": examples_block, - "noun_inflection": noun_inflection, - "conjugation": conjugation_block, - "adjective_inflection": None, - "preposition_inflection": None, - } - - if emoji_char: - stats["has_emoji"] += 1 - - if unique_key in words: - log.warning( - "DUPLICATE unique_key '%s' — row %d would overwrite row %d", - unique_key, - i, - list(words.keys()).index(unique_key), - ) - words[unique_key] = entry - stats["total"] += 1 - - # ------------------------------------------------------------------ - # 6. Pass 2 — shared_roots and confusable_group - # ------------------------------------------------------------------ - - # shared_roots: group unique_keys by root tuple - root_to_keys: dict[tuple, list[str]] = defaultdict(list) - for uk, entry in words.items(): - r = entry["root"] - if r: - root_to_keys[tuple(r)].append(uk) - - for uks in root_to_keys.values(): - if len(uks) > 1: - for uk in uks: - words[uk]["shared_roots"] = [k for k in uks if k != uk] - - # confusable_group: update audio_file to slug-based for confusable words - # Also set confusables_guid: genanki.guid_for("confusable", ktiv_male) - # where ktiv_male is the shared stripped form (key in confusable_slug_groups). - # Build reverse: slug → ktiv_male (for GUID generation) - slug_to_confusable_ktiv_male: dict[str, str] = {} - for km, slugs in confusable_slug_groups.items(): - for slug_in_group in slugs: - slug_to_confusable_ktiv_male[slug_in_group] = km - - for i, row in enumerate(csv_rows): - if i in _dup_indices: - continue - slug = row["slug"] - uk = idx_to_key[i] - co_slugs = slug_to_confusable_slugs.get(slug, set()) - if co_slugs: - # Gather all unique_keys for co-confusable slugs - group_keys: list[str] = [] - for co_slug in co_slugs: - group_keys.extend(slug_to_unique_keys.get(co_slug, [])) - group_keys.append(uk) - group_keys = sorted(set(group_keys)) - words[uk]["confusable_group"] = group_keys - # confusables_guid: mirrors apkg_builder line 1401 - ktiv_male_key = slug_to_confusable_ktiv_male.get(slug, "") - if ktiv_male_key: - words[uk]["confusables_guid"] = genanki.guid_for("confusable", ktiv_male_key) - # Use slug-based audio file for confusables to disambiguate - words[uk]["audio_file"] = f"{slug}.mp3" - - # ------------------------------------------------------------------ - # 7. Stats report - # ------------------------------------------------------------------ - log.info("=" * 60) - log.info("MIGRATION COMPLETE — summary stats:") - log.info(" Total entries: %d", stats["total"]) - log.info(" Key collision events: %d", stats["key_collisions"]) - log.info(" Has conjugation: %d", stats["has_conjugation"]) - log.info(" Has noun_inflection: %d", stats["has_noun_inflection"]) - log.info(" Has examples: %d", stats["has_examples"]) - log.info(" Has legacy GUID: %d", stats["has_guid"]) - log.info(" Has image: %d", stats["has_image"]) - log.info(" Has frequency: %d", stats["has_frequency"]) - log.info(" Has hint: %d", stats["has_hint"]) - log.info(" Has emoji: %d", stats["has_emoji"]) - # Confusable entries - confusable_entries = sum(1 for e in words.values() if e["confusable_group"]) - log.info(" In confusable group: %d", confusable_entries) - # Entries with shared roots - with_shared_roots = sum(1 for e in words.values() if e["shared_roots"]) - log.info(" Has shared roots: %d", with_shared_roots) - - if dry_run: - log.info("DRY RUN — output file NOT written.") - return - - # ------------------------------------------------------------------ - # 8. Write output - # ------------------------------------------------------------------ - with OUTPUT_FILE.open("w", encoding="utf-8") as f: - json.dump(words, f, ensure_ascii=False, indent=2) - f.write("\n") - - log.info("Wrote %d entries to %s", len(words), OUTPUT_FILE) - - -# --------------------------------------------------------------------------- -# Entry point -# --------------------------------------------------------------------------- - - -def main() -> None: - parser = argparse.ArgumentParser( - description="Migrate all pealim data sources into data/words.json", - ) - parser.add_argument( - "--dry-run", - action="store_true", - help="Print stats without writing the output file.", - ) - args = parser.parse_args() - migrate(dry_run=args.dry_run) - - -if __name__ == "__main__": - main() diff --git a/scripts/repair_slugs.py b/scripts/repair_slugs.py deleted file mode 100644 index f0937c9..0000000 --- a/scripts/repair_slugs.py +++ /dev/null @@ -1,420 +0,0 @@ -#!/usr/bin/env python3 -""" -Repair duplicate slugs in data/words.json. - -Homographs (words with identical spelling but different meanings) were -assigned the same slug by the scraper. This script fetches the pealim.com -search page for each affected word, matches entries by meaning (and nikkud), -and writes the corrected slugs back to words.json and the source CSV. - -Usage: - python3 scripts/repair_slugs.py [--dry-run] -""" - -from __future__ import annotations - -import argparse -import json -import logging -import re -import sys -import time -from collections import defaultdict -from difflib import SequenceMatcher -from pathlib import Path - -import pandas as pd -import requests -from bs4 import BeautifulSoup - -# --------------------------------------------------------------------------- -# Paths -# --------------------------------------------------------------------------- -PROJECT_ROOT = Path(__file__).resolve().parent.parent -WORDS_JSON = PROJECT_ROOT / "data" / "words.json" -CSV_PATH = PROJECT_ROOT / "data" / "hebrew_dict_for_anki.csv" - -# --------------------------------------------------------------------------- -# HTTP session -# --------------------------------------------------------------------------- -SESSION = requests.Session() -SESSION.headers.update({"User-Agent": "Mozilla/5.0 (compatible; pealim-scraper/1.0)"}) -COOKIES: dict[str, str] = {"translit": "none", "hebstyle": "mo"} -REQUEST_DELAY = 1.5 # seconds between requests -REQUEST_TIMEOUT = 15 # seconds - -# --------------------------------------------------------------------------- -# Logging -# --------------------------------------------------------------------------- -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s %(levelname)s %(message)s", - datefmt="%H:%M:%S", -) -logger = logging.getLogger(__name__) - -# --------------------------------------------------------------------------- -# Similarity helpers -# --------------------------------------------------------------------------- -FUZZY_THRESHOLD = 0.4 - - -def _similarity(a: str, b: str) -> float: - """Return SequenceMatcher ratio between two strings (both lowercased).""" - return SequenceMatcher(None, a.lower(), b.lower()).ratio() - - -def _best_match( - our_meaning: str, - candidates: list[dict], - our_nikkud: str, -) -> tuple[dict | None, float]: - """ - Return (best_candidate, ratio) by comparing our_meaning against each - candidate's meaning field. Nikkud exact-match gives a bonus to break ties. - """ - best: dict | None = None - best_score = -1.0 - - for cand in candidates: - ratio = _similarity(our_meaning, cand["meaning"]) - # Nikkud exact match adds a small bonus so the right homograph wins - # even when meanings are very similar - if our_nikkud and cand["word"] == our_nikkud: - ratio = min(1.0, ratio + 0.05) - if ratio > best_score: - best_score = ratio - best = cand - - return best, best_score - - -# --------------------------------------------------------------------------- -# Search-page parser -# --------------------------------------------------------------------------- -def _parse_search_results(html: bytes) -> list[dict]: - """ - Parse pealim.com search results page. - - Each ``div.verb-search-result`` block contains: - - div.verb-search-data > a[href] → slug - - div.verb-search-lemma > span.menukad → nikkud word - - div.verb-search-binyan → part of speech - - div.verb-search-meaning → meaning text - - Returns a list of dicts with keys: slug, word, pos, meaning. - """ - soup = BeautifulSoup(html, "html.parser") - results: list[dict] = [] - - for block in soup.find_all("div", class_="verb-search-result"): - data_div = block.find("div", class_="verb-search-data") - if not data_div: - continue - - # Slug from the detail-page link - slug = "" - link = data_div.find("a", href=True) - if link: - m = re.search(r"/dict/([^/#]+)/", link["href"]) - if m: - slug = m.group(1) - - # Nikkud word - lemma_div = block.find("div", class_="verb-search-lemma") - menukad = lemma_div.find("span", class_="menukad") if lemma_div else None - word = menukad.get_text(strip=True) if menukad else (lemma_div.get_text(strip=True) if lemma_div else "") - - # Part of speech - pos_div = block.find("div", class_="verb-search-binyan") - pos = pos_div.get_text(strip=True).replace("Part of speech:", "").strip() if pos_div else "" - - # Meaning - meaning_div = block.find("div", class_="verb-search-meaning") - meaning = meaning_div.get_text(strip=True) if meaning_div else "" - - if slug: - results.append({"slug": slug, "word": word, "pos": pos, "meaning": meaning}) - - return results - - -def _fetch_search_results(ktiv_male: str) -> list[dict]: - """Fetch and parse search results for a given consonant-only spelling.""" - url = f"https://www.pealim.com/search/?q={ktiv_male}" - logger.debug("GET %s", url) - resp = SESSION.get(url, cookies=COOKIES, timeout=REQUEST_TIMEOUT) - resp.raise_for_status() - return _parse_search_results(resp.content) - - -# --------------------------------------------------------------------------- -# Core logic -# --------------------------------------------------------------------------- -def find_duplicate_groups(data: dict) -> dict[str, list[str]]: - """ - Return mapping slug → [word_key, ...] for all slugs shared by 2+ entries. - The word_key is the top-level key in words.json (nikkud + PoS + meaning). - """ - slug_to_keys: dict[str, list[str]] = defaultdict(list) - for key, entry in data.items(): - slug = entry.get("slug", "") - if slug: - slug_to_keys[slug].append(key) - return {slug: keys for slug, keys in slug_to_keys.items() if len(keys) > 1} - - -def repair_group( - slug: str, - keys: list[str], - data: dict, - dry_run: bool, -) -> tuple[int, int]: - """ - Attempt to repair one group of entries sharing *slug*. - - Homographs can have different ktiv_male spellings (e.g. אבידה vs אבדה for - the two spellings of אֲבֵדָה). We therefore build a union of all search - results obtained by querying each distinct ktiv_male in the group. - - Returns (fixed_count, skipped_count). - """ - # Collect distinct ktiv_male values across the group (usually one, but - # sometimes two when homographs have different consonant spellings). - ktiv_to_keys: dict[str, list[str]] = defaultdict(list) - for k in keys: - ktiv = data[k]["word"]["ktiv_male"] - ktiv_to_keys[ktiv].append(k) - - nikkud_word = data[keys[0]]["word"]["nikkud"] - logger.info( - " Fetching search results for %s — %d entries share slug %s", - nikkud_word, - len(keys), - slug, - ) - - # Fetch search results for every distinct ktiv_male and merge - all_candidates: list[dict] = [] - seen_slugs: set[str] = set() - for ktiv in ktiv_to_keys: - try: - results = _fetch_search_results(ktiv) - except requests.RequestException as exc: - logger.warning(" HTTP error for %s: %s", ktiv, exc) - results = [] - for r in results: - if r["slug"] not in seen_slugs: - seen_slugs.add(r["slug"]) - all_candidates.append(r) - if len(ktiv_to_keys) > 1: - # Small delay between sub-queries within the same group - time.sleep(REQUEST_DELAY) - - if not all_candidates: - logger.warning(" No search results — skipping group") - return 0, len(keys) - - # Filter candidates to those whose nikkud word matches the entry's nikkud. - # This avoids accidentally matching a completely different word that shares - # the same consonant spelling (e.g. different voweling entirely). - group_nikkuds = {data[k]["word"]["nikkud"] for k in keys} - filtered = [c for c in all_candidates if c["word"] in group_nikkuds] - - if not filtered: - logger.warning( - " Search results don't contain nikkud %s — candidates: %s — skipping", - group_nikkuds, - [c["word"] for c in all_candidates], - ) - return 0, len(keys) - - fixed = 0 - skipped = 0 - - for key in keys: - entry = data[key] - our_meaning = entry.get("meaning", "") - our_nikkud = entry["word"]["nikkud"] - - # Only consider candidates that match this entry's nikkud - nikkud_filtered = [c for c in filtered if c["word"] == our_nikkud] - pool = nikkud_filtered if nikkud_filtered else filtered - - best, score = _best_match(our_meaning, pool, our_nikkud) - - if best is None or score < FUZZY_THRESHOLD: - logger.warning( - " SKIP key=%s | meaning=%r | best_score=%.2f", - key, - our_meaning, - score, - ) - skipped += 1 - continue - - new_slug = best["slug"] - old_slug = entry["slug"] - - if new_slug == old_slug: - logger.info(" SAME key=%s | slug=%s (score=%.2f)", key, old_slug, score) - fixed += 1 - continue - - logger.info( - " FIX key=%s | %s → %s | matched=%r (score=%.2f)", - key, - old_slug, - new_slug, - best["meaning"], - score, - ) - - if not dry_run: - data[key]["slug"] = new_slug - - fixed += 1 - - return fixed, skipped - - -# --------------------------------------------------------------------------- -# CSV update -# --------------------------------------------------------------------------- -def update_csv(data: dict, dry_run: bool) -> None: - """ - Re-write the CSV so every row's slug column matches words.json. - - The CSV is semicolon-delimited; the slug column is named 'slug'. - We match rows by 'Word Without Nikkud' (ktiv_male) AND 'Meaning' because - homographs share the same ktiv_male. - """ - df = pd.read_csv(CSV_PATH, sep=";", dtype=str) - - if "slug" not in df.columns: - logger.warning("CSV has no 'slug' column — skipping CSV update") - return - - # Build a lookup: (ktiv_male, meaning) → new_slug from words.json - lookup: dict[tuple[str, str], str] = {} - for entry in data.values(): - ktiv = entry["word"].get("ktiv_male", "") - meaning = entry.get("meaning", "") - slug = entry.get("slug", "") - if ktiv and slug: - lookup[(ktiv, meaning)] = slug - - changes = 0 - for idx, row in df.iterrows(): - ktiv = str(row.get("Word Without Nikkud", "")).strip() - meaning = str(row.get("Meaning", "")).strip() - key = (ktiv, meaning) - if key in lookup: - new_slug = lookup[key] - old_slug = str(row["slug"]).strip() - if new_slug != old_slug: - logger.info( - " CSV row %d: %s → %s (%s)", - idx, - old_slug, - new_slug, - ktiv, - ) - if not dry_run: - df.at[idx, "slug"] = new_slug - changes += 1 - - logger.info("CSV: %d slug(s) to update", changes) - if not dry_run and changes: - df.to_csv(CSV_PATH, sep=";", index=True) - logger.info("CSV written to %s", CSV_PATH) - elif dry_run: - logger.info("DRY-RUN: CSV not written") - - -# --------------------------------------------------------------------------- -# Main -# --------------------------------------------------------------------------- -def main(argv: list[str] | None = None) -> int: - parser = argparse.ArgumentParser(description="Repair duplicate slugs in data/words.json") - parser.add_argument( - "--dry-run", - action="store_true", - help="Preview changes without writing any files", - ) - parser.add_argument( - "--verbose", - "-v", - action="store_true", - help="Enable debug logging", - ) - args = parser.parse_args(argv) - - if args.verbose: - logging.getLogger().setLevel(logging.DEBUG) - - if args.dry_run: - logger.info("=== DRY-RUN mode — no files will be modified ===") - - # Load data - logger.info("Loading %s", WORDS_JSON) - with WORDS_JSON.open(encoding="utf-8") as fh: - data: dict = json.load(fh) - logger.info("Loaded %d entries", len(data)) - - # Identify duplicate groups - groups = find_duplicate_groups(data) - total_groups = len(groups) - total_entries = sum(len(v) for v in groups.values()) - logger.info( - "Found %d duplicate-slug groups covering %d entries", - total_groups, - total_entries, - ) - - # Process each group - total_fixed = 0 - total_skipped = 0 - - for group_idx, (slug, keys) in enumerate(sorted(groups.items()), 1): - logger.info( - "[%d/%d] slug=%s (%d entries)", - group_idx, - total_groups, - slug, - len(keys), - ) - fixed, skipped = repair_group(slug, keys, data, dry_run=args.dry_run) - total_fixed += fixed - total_skipped += skipped - - # Respectful delay between HTTP requests - if group_idx < total_groups: - time.sleep(REQUEST_DELAY) - - logger.info( - "Summary: %d fixed, %d skipped (out of %d entries in %d groups)", - total_fixed, - total_skipped, - total_entries, - total_groups, - ) - - # Write updated words.json - if not args.dry_run: - logger.info("Writing %s", WORDS_JSON) - with WORDS_JSON.open("w", encoding="utf-8") as fh: - json.dump(data, fh, ensure_ascii=False, indent=2) - logger.info("words.json written") - else: - logger.info("DRY-RUN: words.json not written") - - # Update CSV - logger.info("Updating CSV %s", CSV_PATH) - update_csv(data, dry_run=args.dry_run) - - return 0 if total_skipped == 0 else 1 - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/scripts/scrape_ktiv_male.py b/scripts/scrape_ktiv_male.py deleted file mode 100644 index d164594..0000000 --- a/scripts/scrape_ktiv_male.py +++ /dev/null @@ -1,237 +0,0 @@ -#!/usr/bin/env python3 -""" -Scrape ktiv male (plene/vowelless) forms from pealim.com. - -Uses hebstyle=vl cookie to get vowelless writing with matres lectionis. -Builds a lookup: ktiv_male_form → [{word_nikkud, form_type, pos, slug}] - -This enables matching Hebrew text (which is normally in ktiv male) -against our vocabulary, including conjugated verbs and noun plurals. -""" - -import json -import logging -import sys -import time -from pathlib import Path - -import requests -from bs4 import BeautifulSoup - -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") -logger = logging.getLogger(__name__) - -DATA_DIR = Path(__file__).resolve().parent.parent / "data" -OUTPUT_PATH = DATA_DIR / "ktiv_male_forms.json" -COOKIES = {"translit": "none", "hebstyle": "vl"} -REQUEST_TIMEOUT = 15 -DELAY = 1.5 # seconds between requests - - -def fetch_verb_ktiv_male(slug: str, infinitive_nikkud: str) -> list[dict]: - """Fetch all conjugated forms in ktiv male for a verb.""" - url = f"https://www.pealim.com/dict/{slug}/" - resp = requests.get(url, cookies=COOKIES, timeout=REQUEST_TIMEOUT) - resp.raise_for_status() - soup = BeautifulSoup(resp.text, "html.parser") - - forms = [] - table = soup.find("table", class_="conjugation-table") - if not table: - return forms - - # Also get the infinitive from the page - lead = soup.find("div", class_="lead") - if lead: - inf_spans = lead.find_all("span", class_="menukad") - for s in inf_spans: - ktiv = s.text.strip() - if ktiv: - forms.append( - { - "ktiv_male": ktiv, - "word_nikkud": infinitive_nikkud, - "form_type": "infinitive", - "pos": "Verb", - "slug": slug, - } - ) - - rows = table.find_all("tr") - for row in rows: - menukad_spans = row.find_all("span", class_="menukad") - for span in menukad_spans: - ktiv = span.text.strip() - if ktiv and ktiv not in {f["ktiv_male"] for f in forms}: - forms.append( - { - "ktiv_male": ktiv, - "word_nikkud": infinitive_nikkud, - "form_type": "conjugation", - "pos": "Verb", - "slug": slug, - } - ) - - return forms - - -def fetch_noun_ktiv_male(slug: str, singular_nikkud: str, gender: str) -> list[dict]: - """Fetch noun declension forms in ktiv male.""" - url = f"https://www.pealim.com/dict/{slug}/" - resp = requests.get(url, cookies=COOKIES, timeout=REQUEST_TIMEOUT) - resp.raise_for_status() - soup = BeautifulSoup(resp.text, "html.parser") - - forms = [] - table = soup.find("table", class_="conjugation-table") - if not table: - return forms - - rows = table.find_all("tr") - form_labels = ["absolute_singular", "absolute_plural", "construct_singular", "construct_plural"] - label_idx = 0 - - for row in rows: - menukad_spans = row.find_all("span", class_="menukad") - for span in menukad_spans: - ktiv = span.text.strip() - if ktiv: - ft = form_labels[label_idx] if label_idx < len(form_labels) else "other" - forms.append( - { - "ktiv_male": ktiv, - "word_nikkud": singular_nikkud, - "form_type": ft, - "pos": "Noun", - "slug": slug, - "gender": gender, - } - ) - label_idx += 1 - - return forms - - -def scrape_verbs() -> list[dict]: - """Scrape ktiv male forms for all verbs in conjugations.json.""" - conj_path = DATA_DIR / "conjugations.json" - if not conj_path.exists(): - logger.warning("No conjugations.json found") - return [] - - with open(conj_path) as f: - conjugations = json.load(f) - - all_forms = [] - slugs_done = set() - - for verb, data in conjugations.items(): - if not data or not data.get("slug"): - continue - slug = data["slug"] - if slug in slugs_done: - continue - slugs_done.add(slug) - - try: - forms = fetch_verb_ktiv_male(slug, verb) - all_forms.extend(forms) - logger.info(f" Verb {verb} ({slug}): {len(forms)} forms") - except Exception as e: - logger.warning(f" Verb {verb} ({slug}) failed: {e}") - - time.sleep(DELAY) - - return all_forms - - -def scrape_nouns() -> list[dict]: - """Scrape ktiv male forms for all nouns in noun_slug_map.json.""" - slug_path = DATA_DIR / "noun_slug_map.json" - if not slug_path.exists(): - logger.warning("No noun_slug_map.json found") - return [] - - with open(slug_path) as f: - slug_map = json.load(f) - - # Also load existing plurals to get nikkud singular form - plurals_path = DATA_DIR / "noun_plurals.json" - plurals = {} - if plurals_path.exists(): - with open(plurals_path) as f: - plurals = json.load(f) - - all_forms = [] - done = 0 - total = len(slug_map) - - for word, info in slug_map.items(): - slug = info.get("slug", "") - if not slug: - continue - - # Get nikkud form from plurals data or slug map - nikkud = info.get("word_nikkud", word) - if word in plurals: - nikkud = plurals[word].get("singular", nikkud) - gender = info.get("gender", "") - - try: - forms = fetch_noun_ktiv_male(slug, nikkud, gender) - all_forms.extend(forms) - done += 1 - if done % 50 == 0: - logger.info(f" Nouns: {done}/{total} ({len(all_forms)} forms)") - # Save incrementally - _save_forms(all_forms, partial=True) - except Exception as e: - logger.warning(f" Noun {word} ({slug}) failed: {e}") - done += 1 - - time.sleep(DELAY) - - return all_forms - - -def _save_forms(all_forms: list[dict], partial: bool = False): - """Build and save the ktiv male lookup dict.""" - lookup: dict[str, list[dict]] = {} - for entry in all_forms: - ktiv = entry["ktiv_male"] - # Don't include ktiv_male in the stored entry (it's the key) - stored = {k: v for k, v in entry.items() if k != "ktiv_male"} - lookup.setdefault(ktiv, []).append(stored) - - suffix = ".partial" if partial else "" - out = OUTPUT_PATH.parent / (OUTPUT_PATH.name + suffix) - with open(out, "w") as f: - json.dump(lookup, f, ensure_ascii=False, indent=1) - - logger.info(f" Saved {len(lookup)} unique ktiv male forms → {out}") - - -def main(): - mode = sys.argv[1] if len(sys.argv) > 1 else "all" - - all_forms = [] - - if mode in ("all", "verbs"): - logger.info("=== Scraping verb ktiv male forms ===") - verb_forms = scrape_verbs() - all_forms.extend(verb_forms) - logger.info(f"Verbs done: {len(verb_forms)} forms from {len({f['slug'] for f in verb_forms})} verbs") - - if mode in ("all", "nouns"): - logger.info("=== Scraping noun ktiv male forms ===") - noun_forms = scrape_nouns() - all_forms.extend(noun_forms) - logger.info(f"Nouns done: {len(noun_forms)} forms") - - _save_forms(all_forms) - logger.info(f"Total: {len(all_forms)} forms → {OUTPUT_PATH}") - - -if __name__ == "__main__": - main() diff --git a/scripts/scrape_noun_plurals.py b/scripts/scrape_noun_plurals.py deleted file mode 100644 index 8b18b04..0000000 --- a/scripts/scrape_noun_plurals.py +++ /dev/null @@ -1,365 +0,0 @@ -#!/usr/bin/env python3 -""" -Scrape pealim.com for noun plural and construct forms. - -Step 1: Collect noun slugs from list pages (/dict/?pos=noun&page=N) -Step 2: Fetch detail pages for plural + construct forms -Step 3: Print summary statistics -""" - -import json -import re -import time -from pathlib import Path - -import requests -from bs4 import BeautifulSoup - -BASE_URL = "https://www.pealim.com" -COOKIES = {"translit": "none", "hebstyle": "mo"} -HEADERS = {"User-Agent": "Mozilla/5.0 (compatible; PealimScraper/1.0)"} -DATA_DIR = Path(__file__).resolve().parent.parent / "data" -SLUG_MAP_FILE = DATA_DIR / "noun_slug_map.json" -PROGRESS_FILE = DATA_DIR / "noun_slug_map_progress.json" -PLURALS_FILE = DATA_DIR / "noun_plurals.json" -DELAY = 1.5 # seconds between requests - - -def load_json(path, default=None): - if path.exists(): - with open(path) as f: - return json.load(f) - return default if default is not None else {} - - -def save_json(path, data): - with open(path, "w") as f: - json.dump(data, f, ensure_ascii=False, indent=2) - - -def fetch_with_retry(url, max_retries=5): - """Fetch URL with exponential backoff.""" - for attempt in range(max_retries): - try: - r = requests.get(url, cookies=COOKIES, headers=HEADERS, timeout=30) - r.raise_for_status() - return r - except (requests.RequestException, ConnectionError) as e: - wait = min(2**attempt * 2, 60) - print(f" Retry {attempt + 1}/{max_retries} for {url}: {e} (waiting {wait}s)") - time.sleep(wait) - print(f" FAILED after {max_retries} retries: {url}") - return None - - -def get_total_pages(): - """Get total number of noun list pages.""" - r = fetch_with_retry(f"{BASE_URL}/dict/?pos=noun&page=1") - if not r: - return 0 - soup = BeautifulSoup(r.text, "lxml") - pages = set() - for a in soup.select("ul.pagination li a"): - href = a.get("href", "") - m = re.search(r"page=(\d+)", href) - if m: - pages.add(int(m.group(1))) - return max(pages) if pages else 1 - - -def parse_list_page(html): - """Parse a noun list page and return list of noun entries.""" - soup = BeautifulSoup(html, "lxml") - table = soup.select_one("table.dict-table") - if not table: - return [] - - entries = [] - for row in table.select("tr")[1:]: # skip header - tds = row.select("td") - if len(tds) < 3: - continue - - # First td: word + link - first_td = tds[0] - a = first_td.select_one("a") - if not a: - continue - href = a.get("href", "") - slug_match = re.search(r"/dict/([^/]+)/", href) - if not slug_match: - continue - slug = slug_match.group(1) - - menukad = first_td.select_one("span.menukad") - word_nikkud = menukad.get_text(strip=True) if menukad else "" - - # Word without nikkud (strip combining marks) - word_plain = re.sub(r"[\u0591-\u05C7]", "", word_nikkud) - - # Third td: part of speech - pos_text = tds[2].get_text(strip=True) - - # Gender - gender = "" - if "masculine" in pos_text.lower(): - gender = "masculine" - elif "feminine" in pos_text.lower(): - gender = "feminine" - - # Mishkal pattern - mishkal = "" - m = re.search(r"(\w+)\s*pattern", pos_text.lower()) - if m: - mishkal = m.group(1) - - entries.append( - { - "word_plain": word_plain, - "slug": slug, - "word_nikkud": word_nikkud, - "pos": pos_text, - "gender": gender, - "mishkal": mishkal, - } - ) - - return entries - - -def step1_collect_slugs(): - """Step 1: Collect noun slugs from list pages.""" - print("=" * 60) - print("STEP 1: Collecting noun slugs from list pages") - print("=" * 60) - - slug_map = load_json(SLUG_MAP_FILE, {}) - progress = load_json(PROGRESS_FILE, []) - completed_pages = set(progress) if isinstance(progress, list) else set() - - # Get total pages - total_pages = get_total_pages() - print(f"Total pages: {total_pages}") - print(f"Already completed: {len(completed_pages)} pages, {len(slug_map)} nouns") - - remaining = [p for p in range(1, total_pages + 1) if p not in completed_pages] - print(f"Remaining pages: {len(remaining)}") - - if not remaining: - print("All pages already scraped!") - return slug_map - - for i, page_num in enumerate(remaining): - url = f"{BASE_URL}/dict/?pos=noun&page={page_num}" - r = fetch_with_retry(url) - if not r: - print(f" Skipping page {page_num}") - continue - - entries = parse_list_page(r.text) - for entry in entries: - word = entry["word_plain"] - slug_map[word] = { - "slug": entry["slug"], - "word_nikkud": entry["word_nikkud"], - "pos": entry["pos"], - "gender": entry["gender"], - "mishkal": entry["mishkal"], - } - - completed_pages.add(page_num) - done = len(completed_pages) - print(f" Page {page_num} ({done}/{total_pages}): {len(entries)} nouns (total: {len(slug_map)})") - - # Save progress every 10 pages - if (i + 1) % 10 == 0 or page_num == remaining[-1]: - save_json(SLUG_MAP_FILE, slug_map) - save_json(PROGRESS_FILE, sorted(completed_pages)) - print(f" [Saved progress: {len(slug_map)} nouns, {done} pages]") - - time.sleep(DELAY) - - # Final save - save_json(SLUG_MAP_FILE, slug_map) - save_json(PROGRESS_FILE, sorted(completed_pages)) - print(f"\nStep 1 complete: {len(slug_map)} total nouns from {len(completed_pages)} pages") - return slug_map - - -def parse_detail_page(html, slug, gender, mishkal): - """Parse a noun detail page for plural/construct forms.""" - soup = BeautifulSoup(html, "lxml") - tables = soup.select("table.conjugation-table") - if not tables: - return None - - table = tables[0] - rows = table.select("tr") - - result = { - "slug": slug, - "singular": "", - "singular_audio": "", - "plural": "", - "plural_audio": "", - "construct_singular": "", - "construct_plural": "", - "gender": gender, - "mishkal": mishkal, - } - - for row in rows: - th = row.select_one("th") - if not th: - continue - label = th.get_text(strip=True).lower() - tds = row.select("td") - - if "absolute" in label: - if len(tds) >= 1: - td = tds[0] - m = td.select_one("span.menukad") - result["singular"] = m.get_text(strip=True) if m else "" - audio_el = td.select_one("[data-audio]") - result["singular_audio"] = audio_el.get("data-audio", "") if audio_el else td.get("data-audio", "") - if len(tds) >= 2: - td = tds[1] - m = td.select_one("span.menukad") - result["plural"] = m.get_text(strip=True) if m else "" - audio_el = td.select_one("[data-audio]") - result["plural_audio"] = audio_el.get("data-audio", "") if audio_el else td.get("data-audio", "") - - elif "construct" in label: - if len(tds) >= 1: - td = tds[0] - m = td.select_one("span.menukad") - result["construct_singular"] = m.get_text(strip=True) if m else "" - if len(tds) >= 2: - td = tds[1] - m = td.select_one("span.menukad") - result["construct_plural"] = m.get_text(strip=True) if m else "" - - return result - - -def step2_fetch_plurals(slug_map): - """Step 2: Fetch detail pages for plural + construct forms.""" - print("\n" + "=" * 60) - print("STEP 2: Fetching plural + construct forms from detail pages") - print("=" * 60) - - plurals = load_json(PLURALS_FILE, {}) - already_done = set(plurals.keys()) - - # Build work list: nouns not yet in plurals - work = [] - for word, info in slug_map.items(): - if word not in already_done: - work.append((word, info)) - - print(f"Already have plural data: {len(already_done)}") - print(f"Remaining to fetch: {len(work)}") - - if not work: - print("All nouns already have plural data!") - return plurals - - skipped = 0 - for i, (word, info) in enumerate(work): - slug = info["slug"] - url = f"{BASE_URL}/dict/{slug}/" - r = fetch_with_retry(url) - if not r: - print(f" Skipping {word} ({slug})") - skipped += 1 - continue - - entry = parse_detail_page(r.text, slug, info.get("gender", ""), info.get("mishkal", "")) - if entry: - plurals[word] = entry - else: - # No declension table - store minimal entry - plurals[word] = { - "slug": slug, - "singular": info.get("word_nikkud", ""), - "singular_audio": "", - "plural": "", - "plural_audio": "", - "construct_singular": "", - "construct_plural": "", - "gender": info.get("gender", ""), - "mishkal": info.get("mishkal", ""), - "no_declension_table": True, - } - - done = len(already_done) + i + 1 - skipped - total = len(already_done) + len(work) - if (i + 1) % 50 == 0 or i == 0: - print( - f" [{i + 1}/{len(work)}] {word} ({slug}): " - f"plural={entry['plural'] if entry else 'N/A'} " - f"(total: {done}/{total})" - ) - - # Save every 50 entries - if (i + 1) % 50 == 0 or i == len(work) - 1: - save_json(PLURALS_FILE, plurals) - print(f" [Saved: {len(plurals)} entries]") - - time.sleep(DELAY) - - save_json(PLURALS_FILE, plurals) - print(f"\nStep 2 complete: {len(plurals)} total noun entries with plural data") - return plurals - - -def step3_summary(slug_map, plurals): - """Step 3: Print summary statistics.""" - print("\n" + "=" * 60) - print("SUMMARY") - print("=" * 60) - - total_slugs = len(slug_map) - total_plurals = len(plurals) - has_plural = sum(1 for v in plurals.values() if v.get("plural")) - has_construct = sum(1 for v in plurals.values() if v.get("construct_singular") or v.get("construct_plural")) - has_audio = sum(1 for v in plurals.values() if v.get("singular_audio") or v.get("plural_audio")) - no_table = sum(1 for v in plurals.values() if v.get("no_declension_table")) - - # Irregular plurals: masculine with ות- ending, feminine with ים- ending - irregular = 0 - for _word, v in plurals.items(): - plural = v.get("plural", "") - gender = v.get("gender", "") - if not plural or not gender: - continue - plain_plural = re.sub(r"[\u0591-\u05C7]", "", plural) - if ( - gender == "masculine" - and plain_plural.endswith("ות") - or gender == "feminine" - and plain_plural.endswith("ים") - ): - irregular += 1 - - print(f"Total nouns in slug map: {total_slugs}") - print(f"Total nouns with plural data: {total_plurals}") - print(f" - With plural form: {has_plural}") - print(f" - With construct forms: {has_construct}") - print(f" - With audio URLs: {has_audio}") - print(f" - No declension table: {no_table}") - print(f" - Irregular plurals: {irregular}") - - -def main(): - print("Pealim Noun Plural Scraper") - print(f"Data directory: {DATA_DIR}") - print() - - slug_map = step1_collect_slugs() - plurals = step2_fetch_plurals(slug_map) - step3_summary(slug_map, plurals) - - -if __name__ == "__main__": - main() diff --git a/scripts/scrape_verb_ktiv.py b/scripts/scrape_verb_ktiv.py deleted file mode 100644 index a982f0c..0000000 --- a/scripts/scrape_verb_ktiv.py +++ /dev/null @@ -1,250 +0,0 @@ -#!/usr/bin/env python3 -"""Scrape ktiv male (vowelless plene) conjugation forms for top 500 verbs from pealim.com.""" - -import json -import os -import re -import sys -import time - -sys.stdout.reconfigure(line_buffering=True) -import requests # noqa: E402 -from bs4 import BeautifulSoup # noqa: E402 - -DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data") -INPUT_FILE = os.path.join(DATA_DIR, "top_verbs_to_scrape.json") -OUTPUT_FILE = os.path.join(DATA_DIR, "ktiv_male_forms.json") -PARTIAL_FILE = os.path.join(DATA_DIR, "ktiv_male_forms_partial.json") -PROGRESS_FILE = os.path.join(DATA_DIR, "ktiv_scrape_progress.json") - -COOKIES = {"translit": "none", "hebstyle": "vl"} -HEADERS = {"User-Agent": "Mozilla/5.0 (compatible; PealimScraper/1.0)"} -DELAY = 1.5 - -session = requests.Session() -session.cookies.update(COOKIES) -session.headers.update(HEADERS) - - -def load_json(path): - if os.path.exists(path): - with open(path, encoding="utf-8") as f: - return json.load(f) - return {} - - -def save_json(data, path): - with open(path, "w", encoding="utf-8") as f: - json.dump(data, f, ensure_ascii=False, indent=1) - - -def search_slug(wni): - """Search pealim for a verb and return the first result's slug.""" - url = "https://www.pealim.com/search/" - resp = session.get(url, params={"q": wni}, timeout=15) - resp.raise_for_status() - soup = BeautifulSoup(resp.text, "html.parser") - - # Look for result links like /dict/SLUG/ - for a in soup.select("a[href]"): - href = a["href"] - m = re.match(r"/dict/(\d+-[^/]+)/", href) - if m: - return m.group(1) - return None - - -def scrape_verb_forms(slug): - """Fetch a verb's detail page and extract all ktiv male conjugation forms.""" - url = f"https://www.pealim.com/dict/{slug}/" - resp = session.get(url, timeout=15) - resp.raise_for_status() - soup = BeautifulSoup(resp.text, "html.parser") - - forms = set() - - # Get infinitive from div.lead or page title - lead = soup.select_one("div.lead") - if lead: - menukad_spans = lead.select("span.menukad") - for span in menukad_spans: - text = span.get_text(strip=True) - if text: - forms.add(text) - - # Get word_nikkud (the nikkud form of the infinitive) from the page - # We need to fetch with mo cookie for that, but we already have it from input data - # Instead, get the page title which usually has the nikkud form - word_nikkud = None - title = soup.select_one("h1") - if title: - menukad_in_title = title.select_one("span.menukad") - if menukad_in_title: - word_nikkud = menukad_in_title.get_text(strip=True) - - # Get ALL span.menukad elements from conjugation tables - for span in soup.select("span.menukad"): - text = span.get_text(strip=True) - if text: - forms.add(text) - - return forms, word_nikkud - - -def main(): - verbs = load_json(INPUT_FILE) - if not verbs: - print("ERROR: No verbs found in input file") - sys.exit(1) - - # Load existing forms - existing_forms = load_json(OUTPUT_FILE) - new_forms = {} # Will be merged into existing at the end - - # Load progress to resume - progress = load_json(PROGRESS_FILE) - done_wnis = set(progress.get("done_wnis", [])) - slug_cache = progress.get("slug_cache", {}) - - # Pre-populate slug cache from conjugations.json - conj_file = os.path.join(DATA_DIR, "conjugations.json") - if os.path.exists(conj_file): - conj_data = load_json(conj_file) - for wni_key, cdata in conj_data.items(): - if isinstance(cdata, dict) and "slug" in cdata and wni_key not in slug_cache: - slug_cache[wni_key] = cdata["slug"] - print(f"Pre-populated {len(slug_cache)} slugs from conjugations.json") - - # Deduplicate verbs by wni - seen_wni = set() - unique_verbs = [] - for v in verbs: - if v["wni"] not in seen_wni: - seen_wni.add(v["wni"]) - unique_verbs.append(v) - - total = len(unique_verbs) - to_scrape = [v for v in unique_verbs if v["wni"] not in done_wnis] - print(f"Total unique verbs: {total}, already done: {total - len(to_scrape)}, to scrape: {len(to_scrape)}") - - scraped_count = 0 - skipped_count = 0 - total_new_forms = 0 - sample_verbs = {} # For summary: wni -> list of forms - - for i, verb in enumerate(to_scrape): - wni = verb["wni"] - word_nikkud_input = verb["word"] - - try: - # Step 1: Find slug - if wni in slug_cache: - slug = slug_cache[wni] - else: - slug = search_slug(wni) - time.sleep(DELAY) - - if not slug: - print(f" [{i + 1}/{len(to_scrape)}] SKIP {wni} - not found on pealim") - skipped_count += 1 - done_wnis.add(wni) - continue - - slug_cache[wni] = slug - - # Step 2: Scrape forms - forms, page_nikkud = scrape_verb_forms(slug) - time.sleep(DELAY) - - # Use the nikkud form from our input data (more reliable) - nikkud_to_use = word_nikkud_input - - # Build entries for each form - for form in forms: - entry = { - "word_nikkud": nikkud_to_use, - "form_type": "conjugation", - "pos": "Verb", - "slug": slug, - } - if form not in new_forms: - new_forms[form] = [] - # Check for duplicate entry - if not any(e["slug"] == slug for e in new_forms[form]): - new_forms[form].append(entry) - total_new_forms += 1 - - scraped_count += 1 - # Collect samples (first 3 completed) - if len(sample_verbs) < 3: - sample_verbs[wni] = sorted(forms) - - print(f" [{i + 1}/{len(to_scrape)}] {wni} -> {slug} ({len(forms)} forms)") - done_wnis.add(wni) - - except Exception as e: - print(f" [{i + 1}/{len(to_scrape)}] ERROR {wni}: {e}") - skipped_count += 1 - done_wnis.add(wni) - - # Save progress every 50 verbs - if (i + 1) % 50 == 0: - progress = {"done_wnis": list(done_wnis), "slug_cache": slug_cache} - save_json(progress, PROGRESS_FILE) - # Save partial merged result - merged = dict(existing_forms) - for form, entries in new_forms.items(): - if form in merged: - existing_slugs = {e["slug"] for e in merged[form]} - for entry in entries: - if entry["slug"] not in existing_slugs: - merged[form].append(entry) - else: - merged[form] = entries - save_json(merged, PARTIAL_FILE) - print(f" -- Progress saved at {i + 1}/{len(to_scrape)} --") - - # Final merge - merged = dict(existing_forms) - for form, entries in new_forms.items(): - if form in merged: - existing_slugs = {e["slug"] for e in merged[form]} - for entry in entries: - if entry["slug"] not in existing_slugs: - merged[form].append(entry) - else: - merged[form] = entries - - save_json(merged, OUTPUT_FILE) - - # Save final progress - progress = {"done_wnis": list(done_wnis), "slug_cache": slug_cache} - save_json(progress, PROGRESS_FILE) - - # Clean up partial file - if os.path.exists(PARTIAL_FILE): - os.remove(PARTIAL_FILE) - - # Summary - print(f"\n{'=' * 50}") - print("SUMMARY") - print(f"{'=' * 50}") - print(f"Verbs scraped: {scraped_count}") - print(f"Verbs skipped: {skipped_count}") - print(f"New forms added: {total_new_forms}") - print(f"Total unique ktiv male forms: {len(merged)}") - print(f"Previous forms count: {len(existing_forms)}") - print(f"Net new form keys: {len(merged) - len(existing_forms)}") - - if sample_verbs: - print("\nSample verbs:") - for wni, forms in list(sample_verbs.items())[:3]: - print(f"\n {wni} ({len(forms)} forms):") - for f in forms[:8]: - print(f" {f}") - if len(forms) > 8: - print(f" ... and {len(forms) - 8} more") - - -if __name__ == "__main__": - main()