#!/usr/bin/env python3
"""
Consolidated list page scraper for pealim.com.

Scrapes /dict/?page=N with two cookie variants (hebstyle=mo for nikkud,
hebstyle=vl for ktiv male) and writes results directly to data/words.json.

Usage:
    python3 pealim_list_scrape.py [--test N] [--force-refresh]
"""

import argparse
import json
import logging
import os
import re
import time
from collections.abc import Container
from datetime import date
from pathlib import Path

import requests
from bs4 import BeautifulSoup

# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------

PROJECT_ROOT = Path(__file__).parent
DATA_DIR = PROJECT_ROOT / "data"
WORDS_JSON = DATA_DIR / "words.json"
PROGRESS_JSON = DATA_DIR / "list_scrape_progress.json"

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

PEALIM_DICT_URL = "https://www.pealim.com/dict/"
REQUEST_DELAY = 1.5  # seconds between requests
REQUEST_TIMEOUT = 15  # seconds
DEFAULT_TOTAL_PAGES = 608
SAVE_EVERY = 10  # pages between incremental saves
TODAY = date.today().isoformat()

# Prefer lxml if available; html.parser is the fallback
try:
    import lxml  # type: ignore[import-untyped]  # noqa: F401

    BS4_PARSER = "lxml"
except ImportError:
    BS4_PARSER = "html.parser"

# ---------------------------------------------------------------------------
# Part-of-speech mappings
# ---------------------------------------------------------------------------

POS_HEBREW: dict[str, str] = {
    "Noun": "שֵׁם עֶצֶם",
    "Verb": "פֹּעַל",
    "Adjective": "שֵׁם תֹּאַר",
    "Adverb": "תֹּאַר הַפֹּעַל",
    "Pronoun": "כִּנּוּי גּוּף",
    "Preposition": "מִילַּת יַחַס",
    "Conjunction": "מִילַּת חִבּוּר",
    "Interjection": "מִילַּת קְרִיאָה",
    "Numeral": "שֵׁם מִסְפָּר",
    "Cardinal numeral": "שֵׁם מִסְפָּר",
    "Particle": "מִילִּית",
    "Determiner": "מְגַדִּיר",
    "Existential": "מִילַּת קִיּוּם",
    "Interrogative": "מִילַּת שְׁאֵלָה",
}

# Prefix matching is used on the PoS string, so longer keys must be checked
# first (otherwise a short key could shadow a longer one sharing its prefix).
POS_HEBREW_ORDERED: list[tuple[str, str]] = sorted(POS_HEBREW.items(), key=lambda x: -len(x[0]))

BINYAN_HEBREW: dict[str, str] = {
    "Pa'al": "פָּעַל",
    "Nif'al": "נִפְעַל",
    "Pi'el": "פִּיעֵל",
    "Pu'al": "פֻּעַל",
    "Hif'il": "הִפְעִיל",
    "Huf'al": "הֻפְעַל",
    "Hitpa'el": "הִתְפַּעֵל",
}

# English PoS → Anki tag used by _build_tags(). Kept at module level so the
# mapping is built once rather than on every row.
POS_TAGS: dict[str, str] = {
    "Noun": "שם_עצם",
    "Verb": "פעלים",
    "Adjective": "שם_תואר",
    "Adverb": "תוארי_הפועל",
    "Pronoun": "כינויי_גוף",
    "Preposition": "מילות_יחס",
    "Conjunction": "מילות_חיבור",
    "Particle": "מילית",
    "Numeral": "שם_מספר",
    "Cardinal numeral": "שם_מספר",
    "Determiner": "מגדיר",
    "Existential": "מילת_קיום",
    "Interrogative": "מילת_שאלה",
    "Interjection": "מילת_קריאה",
}

# Regex for extracting emoji characters
EMOJI_RE = re.compile(
    r"[\U0001F300-\U0001FFFF\U00002600-\U000027BF\U0001F000-\U0001F9FF\u2600-\u26FF\u2700-\u27BF\uFE0E\uFE0F\u200D]+",
    re.UNICODE,
)

# Regex for extracting Hebrew prepositions wrapped in parentheses, e.g. "(על)" or "(ב-)"
HBPAREN_RE = re.compile(r"\(([\u05b0-\u05ea\u05f0-\u05f4\-]+)\)")

# Fields that must never be overwritten when updating an existing entry
PROTECTED_FIELDS = frozenset(
    [
        "vocab_legacy_guid",
        "confusables_guid",
        "frequency",
        "pseudo_frequency",
        "emoji",
        "emoji_source",
        "emoji_visible",
        "image",
        "image_source",
        "hint",
        "examples",
        "noun_inflection",
        "conjugation",
        "adjective_inflection",
        "preposition_inflection",
    ]
)

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# HTTP session
# ---------------------------------------------------------------------------

session = requests.Session()
session.headers.update({"User-Agent": "Mozilla/5.0 (compatible; pealim-anki-scraper/1.0)"})


# ---------------------------------------------------------------------------
# Default entry template
# ---------------------------------------------------------------------------


def _default_entry() -> dict:
    """Return a fresh entry with all fields initialised to safe defaults.

    A new dict (with new nested containers) is built on every call so that
    entries never share mutable state. Key order here determines the key
    order of new entries in words.json.
    """
    return {
        "word": {"nikkud": "", "ktiv_male": ""},
        "slug": "",
        "root": [],
        "pos": "",
        "pos_hebrew": "",
        "meaning": "",
        "meaning_raw": "",
        "audio_url": "",
        "audio_file": "",
        "tags": "",
        "last_scrape_date": "",
        "vocab_legacy_guid": None,
        "frequency": None,
        "pseudo_frequency": None,
        "emoji": None,
        "emoji_source": None,
        "emoji_visible": False,
        "image": None,
        "image_source": None,
        "hint": "",
        "prep": None,
        "shared_roots": [],
        "confusable_group": None,
        "confusables_guid": None,
        "examples": None,
        "noun_inflection": None,
        "conjugation": None,
        "adjective_inflection": None,
        "preposition_inflection": None,
    }


# ---------------------------------------------------------------------------
# Parsing helpers
# ---------------------------------------------------------------------------


def _extract_emoji(text: str) -> str | None:
    """Return the first emoji run found in *text*, or None."""
    m = EMOJI_RE.search(text)
    return m.group(0) if m else None


def _clean_meaning(raw: str) -> str:
    """Strip emoji, Hebrew parenthesized prepositions, and extra whitespace from a raw meaning string."""
    cleaned = EMOJI_RE.sub("", raw)
    cleaned = HBPAREN_RE.sub("", cleaned)
    return " ".join(cleaned.split())


def _parse_pos(pos_raw: str) -> tuple[str, str]:
    """
    Parse raw PoS string into (pos_en, pos_hebrew).

    Examples:
        "Noun – masculine"   → ("Noun", "שֵׁם עֶצֶם")
        "Verb – pa'al"       → ("Verb", "פֹּעַל — פָּעַל")
        "Cardinal numeral"   → ("Cardinal numeral", "שֵׁם מִסְפָּר")

    Unknown parts of speech fall back to the text before the first dash,
    which is then also used verbatim as the Hebrew label.
    """
    pos_clean = pos_raw.strip()

    # Determine the base English PoS with longest-match strategy
    pos_en = next((key for key, _ in POS_HEBREW_ORDERED if pos_clean.startswith(key)), "")
    if not pos_en:
        # Fallback: take everything up to " – " or the full string
        pos_en = pos_clean.split(" – ")[0].split(" - ")[0].strip()

    pos_heb = POS_HEBREW.get(pos_en, pos_en)

    # For verbs, attempt to append binyan; pealim uses "Verb – pa'al"
    if pos_en == "Verb":
        dash_parts = re.split(r"\s*[–-]\s*", pos_clean)
        if len(dash_parts) >= 2:
            # Case-insensitive lookup: the page may show "pa'al" or "Pa'al".
            wanted = dash_parts[1].strip().lower()
            binyan_heb = next(
                (heb for name, heb in BINYAN_HEBREW.items() if name.lower() == wanted),
                None,
            )
            if binyan_heb:
                pos_heb = f"{pos_heb} — {binyan_heb}"

    return pos_en, pos_heb


def _parse_root(root_raw: str) -> list[str]:
    """
    Convert raw root text to a list of consonants.

    Pealim shows roots as "פ - ע - ל" or "פ.ע.ל" or "—" (no root).
    Returns an empty list when there is no root.
    """
    if not root_raw or root_raw in ("-", "—", "–"):
        return []
    # Split on " - " or "." separators
    parts = re.split(r"\s*[-–—.]\s*", root_raw.strip())
    return [p.strip() for p in parts if p.strip()]


def _build_tags(pos_en: str, root: list[str]) -> str:
    """
    Generate Anki tags string matching the existing project convention.

    Examples:
        pos=Noun, root=[]          → "שם_עצם"
        pos=Noun, root=["א","ב"]   → "שורש::אב שם_עצם"
        pos=Verb, root=["שמר"]     → "שורש::שמר פעלים"

    A PoS with no entry in POS_TAGS contributes no tag.
    """
    parts: list[str] = []
    if root:
        parts.append(f"שורש::{''.join(root)}")
    pos_tag = POS_TAGS.get(pos_en, "")
    if pos_tag:
        parts.append(pos_tag)
    return " ".join(parts)


def _compute_audio_file(slug: str, ktiv_male: str) -> str:
    """
    Return the local audio filename for an entry.

    The actual confusable detection happens later (after all pages are
    scraped); here we store a placeholder that _post_process() will correct.
    We default to the consonant-based name; confusables get slug-based names.
    """
    consonants = ktiv_male or ""
    return f"{consonants}.mp3" if consonants else f"{slug}.mp3"


# ---------------------------------------------------------------------------
# Page parsing
# ---------------------------------------------------------------------------


def _word_cell_text(cell) -> str:
    """Return the headword text of a row's first cell.

    Prefers the ``span.menukad`` element and falls back to the whole cell's
    text when that span is absent.
    """
    menukad = cell.find("span", class_="menukad")
    return menukad.get_text(strip=True) if menukad else cell.get_text(strip=True)


def _parse_mo_page(html: bytes) -> list[dict]:
    """
    Parse a hebstyle=mo (nikkud) list page.

    Rows with fewer than four cells (e.g. header rows) and rows whose word
    cell is empty are skipped.

    Returns a list of raw row dicts with keys:
        nikkud, slug, root_raw, pos_raw, meaning_raw, audio_url
    """
    soup = BeautifulSoup(html, BS4_PARSER)
    rows: list[dict] = []

    for tr in soup.select("table tr"):
        tds = tr.find_all("td")
        if len(tds) < 4:
            continue

        nikkud = _word_cell_text(tds[0])
        if not nikkud:
            continue

        # Audio URL
        audio_span = tds[0].find(attrs={"data-audio": True})
        audio_url: str = audio_span["data-audio"] if audio_span else ""

        # Slug: the path segment following /dict/ in the entry link
        slug = ""
        link = tds[0].find("a", href=True)
        if link:
            m = re.search(r"/dict/([^/]+)/", link["href"])
            if m:
                slug = m.group(1)

        rows.append(
            {
                "nikkud": nikkud,
                "slug": slug,
                "root_raw": tds[1].get_text(strip=True),
                "pos_raw": tds[2].get_text(strip=True),
                "meaning_raw": tds[3].get_text(strip=True),
                "audio_url": audio_url,
            }
        )

    return rows


def _parse_vl_words(html: bytes) -> list[str]:
    """
    Parse a hebstyle=vl (ktiv male) list page.

    Returns the ordered list of ktiv male strings, one per data row with a
    non-empty word cell. The row filter deliberately mirrors
    _parse_mo_page() because the caller pairs the two results by index; a
    row kept by one parser and dropped by the other would shift every
    following word onto the wrong entry.
    """
    soup = BeautifulSoup(html, BS4_PARSER)
    words: list[str] = []

    for tr in soup.select("table tr"):
        tds = tr.find_all("td")
        if len(tds) < 4:
            continue
        word = _word_cell_text(tds[0])
        if word:
            words.append(word)

    return words


# ---------------------------------------------------------------------------
# words.json I/O
# ---------------------------------------------------------------------------


def _load_words() -> dict:
    """Load words.json; return empty dict if missing.

    A malformed file raises (json.JSONDecodeError) on purpose: silently
    starting fresh would later overwrite the existing data.
    """
    if not WORDS_JSON.exists():
        logger.info("data/words.json not found — starting fresh.")
        return {}
    with WORDS_JSON.open(encoding="utf-8") as fh:
        return json.load(fh)


def _save_words(words: dict) -> None:
    """Atomically write words to words.json via a .tmp file."""
    # A fresh checkout may not have the data directory yet.
    WORDS_JSON.parent.mkdir(parents=True, exist_ok=True)
    tmp = WORDS_JSON.with_suffix(".json.tmp")
    with tmp.open("w", encoding="utf-8") as fh:
        json.dump(words, fh, ensure_ascii=False, indent=2)
    os.replace(tmp, WORDS_JSON)
    logger.info("Saved data/words.json (%d entries)", len(words))


# ---------------------------------------------------------------------------
# Progress tracking
# ---------------------------------------------------------------------------


def _load_progress() -> set[int]:
    """Return set of already-completed page numbers.

    Returns an empty set when the progress file is missing or is not valid
    JSON (the latter is logged); the only cost is re-scraping pages.
    """
    if not PROGRESS_JSON.exists():
        return set()
    try:
        with PROGRESS_JSON.open(encoding="utf-8") as fh:
            data = json.load(fh)
    except json.JSONDecodeError as exc:
        logger.warning("Ignoring unreadable progress file %s: %s", PROGRESS_JSON, exc)
        return set()
    return set(data.get("completed_pages", []))


def _save_progress(completed: set[int]) -> None:
    """Atomically write progress file."""
    PROGRESS_JSON.parent.mkdir(parents=True, exist_ok=True)
    tmp = PROGRESS_JSON.with_suffix(".json.tmp")
    with tmp.open("w", encoding="utf-8") as fh:
        json.dump({"completed_pages": sorted(completed)}, fh)
    os.replace(tmp, PROGRESS_JSON)


# ---------------------------------------------------------------------------
# Unique key generation
# ---------------------------------------------------------------------------


def _make_unique_key(nikkud: str, pos_en: str, meaning: str, existing_keys: Container[str]) -> str:
    """
    Generate a collision-free unique key for a new entry.

    *existing_keys* only needs to support ``in``; a set or the words dict
    itself both work.

    Escalation:
        1. nikkud
        2. nikkud|pos_en
        3. nikkud|pos_en|meaning
        4. nikkud|pos_en|meaning|N  (N = 2, 3, …)
    """
    base = f"{nikkud}|{pos_en}|{meaning}"
    for candidate in (nikkud, f"{nikkud}|{pos_en}", base):
        if candidate not in existing_keys:
            return candidate

    n = 2
    while f"{base}|{n}" in existing_keys:
        n += 1
    return f"{base}|{n}"


# ---------------------------------------------------------------------------
# Core: merge one scraped row into words dict
# ---------------------------------------------------------------------------


def _merge_row(
    words: dict,
    slug_index: dict[str, str],
    nikkud: str,
    ktiv_male: str,
    slug: str,
    root_raw: str,
    pos_raw: str,
    meaning_raw_raw: str,
    audio_url: str,
) -> None:
    """
    Upsert a single scraped row into *words* in-place.

    An existing entry is located through *slug_index* (slug → unique_key);
    it has only its list-level fields refreshed, and anything named in
    PROTECTED_FIELDS is left alone. Otherwise a new entry is created from
    _default_entry(), its emoji is taken from the raw meaning, and
    *slug_index* is updated. A row without a slug can never match an
    existing entry and therefore always creates a new one.
    """
    # Derived fields
    pos_en, pos_heb = _parse_pos(pos_raw)
    root = _parse_root(root_raw)
    meaning_raw = meaning_raw_raw
    meaning = _clean_meaning(meaning_raw)

    # Extract Hebrew preposition(s) from the raw meaning (e.g. "(על)" → "על")
    prep_matches = HBPAREN_RE.findall(meaning_raw)
    prep: str | None = " ".join(prep_matches) if prep_matches else None

    # List-level fields, in the order they are written to the entry.
    scraped = {
        "slug": slug,
        "root": root,
        "pos": pos_en,
        "pos_hebrew": pos_heb,
        "meaning": meaning,
        "meaning_raw": meaning_raw,
        "prep": prep,
        "audio_url": audio_url,
        "audio_file": _compute_audio_file(slug, ktiv_male),
        "tags": _build_tags(pos_en, root),
        "last_scrape_date": TODAY,
    }

    # ---- locate existing entry, or create a new one ----
    unique_key: str | None = slug_index.get(slug) if slug else None
    if unique_key and unique_key in words:
        entry = words[unique_key]
    else:
        # The dict itself answers membership in O(1); copying its keys into
        # a set for every new row would make the whole scrape quadratic.
        unique_key = _make_unique_key(nikkud, pos_en, meaning, words)
        entry = _default_entry()
        # Emoji is a protected field: it is only ever seeded on creation.
        emoji = _extract_emoji(meaning_raw)
        entry["emoji"] = emoji
        entry["emoji_source"] = "from_pealim" if emoji else None
        words[unique_key] = entry
        if slug:
            slug_index[slug] = unique_key

    word = entry.setdefault("word", {})
    word["nikkud"] = nikkud
    word["ktiv_male"] = ktiv_male
    for field, value in scraped.items():
        if field not in PROTECTED_FIELDS:
            entry[field] = value


# ---------------------------------------------------------------------------
# Post-processing: recompute confusable_group, shared_roots, audio_file
# ---------------------------------------------------------------------------


def _post_process(words: dict) -> None:
    """
    After all pages are scraped, recompute derived cross-entry fields:
      - confusable_group: entries sharing the same ktiv_male (2+)
      - shared_roots: entries sharing the same root (excluding self)
      - audio_file: slug-based for confusables, consonant-based otherwise

    Mutates the entries of *words* in place.
    """
    logger.info("Post-processing: recomputing confusable groups and shared roots...")

    # --- confusable groups ---
    ktiv_to_keys: dict[str, list[str]] = {}
    for key, entry in words.items():
        ktiv = entry.get("word", {}).get("ktiv_male", "")
        if ktiv:
            ktiv_to_keys.setdefault(ktiv, []).append(key)

    for entry in words.values():
        ktiv = entry.get("word", {}).get("ktiv_male", "")
        slug = entry.get("slug", "")
        group = ktiv_to_keys.get(ktiv, [])
        if len(group) >= 2:
            entry["confusable_group"] = sorted(group)
            # Confusable → slug-based audio filename (kept as-is when the
            # entry has no slug to build one from)
            if slug:
                entry["audio_file"] = f"{slug}.mp3"
        else:
            # Only clear confusable_group if it wasn't set by enrichment (i.e. no confusables_guid)
            if not entry.get("confusables_guid"):
                entry["confusable_group"] = None
            # Non-confusable → consonant-based audio filename
            entry["audio_file"] = _compute_audio_file(slug, ktiv)

    # --- shared roots ---
    root_to_keys: dict[str, list[str]] = {}
    for key, entry in words.items():
        root = entry.get("root")
        if root:
            # "|"-joined consonants are the canonical form for grouping
            root_to_keys.setdefault("|".join(root), []).append(key)

    for key, entry in words.items():
        root = entry.get("root")
        if root:
            siblings = root_to_keys.get("|".join(root), [])
            entry["shared_roots"] = sorted(k for k in siblings if k != key)
        else:
            entry["shared_roots"] = []

    logger.info("Post-processing complete.")


# ---------------------------------------------------------------------------
# Scraping loop
# ---------------------------------------------------------------------------


def _build_slug_index(words: dict) -> dict[str, str]:
    """Build slug → unique_key lookup from the current words dict.

    When several entries carry the same slug, the first one in dict order
    wins.
    """
    index: dict[str, str] = {}
    for key, entry in words.items():
        slug = entry.get("slug", "")
        if slug and slug not in index:
            index[slug] = key
    return index


def _fetch_page(url: str, cookies: dict) -> bytes | None:
    """Fetch a single page; return raw bytes or None on failure.

    Any requests.RequestException (including HTTP error statuses) is logged
    and converted into a None return.
    """
    try:
        resp = session.get(url, cookies=cookies, timeout=REQUEST_TIMEOUT)
        resp.raise_for_status()
        return resp.content
    except requests.RequestException as exc:
        logger.error("Request failed for %s: %s", url, exc)
        return None


def run_scrape(total_pages: int, force_refresh: bool) -> None:
    """
    Main scrape loop.

    Each page is fetched twice (nikkud and ktiv male variants), merged into
    the in-memory dictionary, and saved every SAVE_EVERY pages. words.json
    is always written before the progress file so the progress file never
    claims a page whose rows have not been persisted.

    Args:
        total_pages:    Number of list pages to scrape.
        force_refresh:  If True, ignore progress file and re-scrape all pages.
    """
    words = _load_words()
    slug_index = _build_slug_index(words)

    completed = _load_progress()
    if force_refresh:
        # Report what is being discarded *before* discarding it.
        if completed:
            logger.info("--force-refresh: ignoring %d completed pages.", len(completed))
        completed = set()

    pages_to_do = [p for p in range(1, total_pages + 1) if p not in completed]
    logger.info(
        "Pages to scrape: %d / %d  (already done: %d)",
        len(pages_to_do),
        total_pages,
        len(completed),
    )

    pages_since_save = 0

    for page_num in pages_to_do:
        url = f"{PEALIM_DICT_URL}?page={page_num}"
        logger.info("Scraping page %d / %d …", page_num, total_pages)

        # --- hebstyle=mo (nikkud + audio + slug) ---
        mo_html = _fetch_page(url, {"translit": "none", "hebstyle": "mo"})
        if mo_html is None:
            logger.warning("Skipping page %d (mo fetch failed).", page_num)
            time.sleep(REQUEST_DELAY * 2)
            continue

        time.sleep(REQUEST_DELAY)

        # --- hebstyle=vl (ktiv male) ---
        vl_html = _fetch_page(url, {"translit": "none", "hebstyle": "vl"})
        if vl_html is None:
            logger.warning("Skipping page %d (vl fetch failed).", page_num)
            time.sleep(REQUEST_DELAY * 2)
            continue

        # Parse
        mo_rows = _parse_mo_page(mo_html)
        vl_words = _parse_vl_words(vl_html)

        if not mo_rows:
            logger.warning("Page %d returned no rows — might be past end.", page_num)
            completed.add(page_num)
            # Persist words first: *completed* may already hold pages whose
            # rows exist only in memory since the last incremental save.
            _save_words(words)
            _save_progress(completed)
            pages_since_save = 0
            time.sleep(REQUEST_DELAY)
            continue

        if len(mo_rows) != len(vl_words):
            # Rows are paired by index, so a count mismatch means some
            # ktiv male values will be blank or attached to the wrong row.
            logger.warning(
                "Page %d: row count mismatch (mo=%d, vl=%d); ktiv male may be missing or misaligned.",
                page_num,
                len(mo_rows),
                len(vl_words),
            )

        # Merge each row
        for i, row in enumerate(mo_rows):
            ktiv_male = vl_words[i] if i < len(vl_words) else ""
            _merge_row(
                words=words,
                slug_index=slug_index,
                nikkud=row["nikkud"],
                ktiv_male=ktiv_male,
                slug=row["slug"],
                root_raw=row["root_raw"],
                pos_raw=row["pos_raw"],
                meaning_raw_raw=row["meaning_raw"],
                audio_url=row["audio_url"],
            )

        completed.add(page_num)
        pages_since_save += 1

        # Incremental save every SAVE_EVERY pages
        if pages_since_save >= SAVE_EVERY:
            _save_words(words)
            _save_progress(completed)
            pages_since_save = 0

        time.sleep(REQUEST_DELAY)

    # Final save + post-processing
    logger.info("All pages scraped. Running post-processing…")
    _post_process(words)
    _save_words(words)
    _save_progress(completed)
    logger.info("Done. Total entries in words.json: %d", len(words))


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------


def main() -> None:
    """Entry point: parse command-line options and run the scrape."""
    parser = argparse.ArgumentParser(description="Scrape pealim.com list pages into data/words.json.")
    parser.add_argument(
        "--test",
        metavar="N",
        type=int,
        default=None,
        help="Scrape only the first N pages (for testing).",
    )
    parser.add_argument(
        "--force-refresh",
        action="store_true",
        default=False,
        help="Re-scrape all pages, ignoring existing progress.",
    )
    args = parser.parse_args()

    total_pages = args.test if args.test is not None else DEFAULT_TOTAL_PAGES

    logger.info(
        "Starting pealim list scraper | pages=%d | force=%s | parser=%s",
        total_pages,
        args.force_refresh,
        BS4_PARSER,
    )

    run_scrape(total_pages=total_pages, force_refresh=args.force_refresh)


if __name__ == "__main__":
    main()