hebrew_flash_cards/pealim_list_scrape.py
Sochen 08fb7009d8 Sprint 11: unified JSON architecture + consolidated scraping pipeline
Migrate from fragmented CSV + 10 JSON files to a single data/words.json
(9,104 entries) as the unified data store. All GUIDs preserved for Anki
study progress continuity.

New files:
- SCHEMA.yaml: authoritative schema for words.json
- pealim_list_scrape.py: consolidated list page scraper → words.json
- pealim_detail_scrape.py: noun/verb detail scraper → words.json
- pealim_audio_download.py: audio downloader reading from words.json
- scripts/migrate_to_json.py: one-time CSV→JSON migration
- scripts/validate_data.py: 17 data integrity tests
- scripts/check_guid_coverage.py: GUID preservation checker
- scripts/repair_slugs.py: slug deduplication repair tool
- tests/test_scraper_integration.py: live scraper integration tests

Updated:
- apkg_builder.py: reads from words.json (no more pandas)
- run.py: 8-step pipeline (list scrape → frequency → examples →
  detail scrape → audio download → fonts → images → build)
- benyehuda.py, frequency_lookup.py, image_fetch.py: TODO markers
  for future words.json integration

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 10:54:58 +00:00

706 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Consolidated list page scraper for pealim.com.
Scrapes /dict/?page=N with two cookie variants (hebstyle=mo for nikkud,
hebstyle=vl for ktiv male) and writes results directly to data/words.json.
Usage:
python3 pealim_list_scrape.py [--test N] [--force-refresh]
"""
import argparse
import json
import logging
import os
import re
import time
from datetime import date
from pathlib import Path
import requests
from bs4 import BeautifulSoup
from helpers import strip_nikkud
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
# Directory containing this script; every data path below is resolved from it.
PROJECT_ROOT = Path(__file__).parent
DATA_DIR = PROJECT_ROOT / "data"
# Unified word store that this scraper loads, updates and rewrites.
WORDS_JSON = DATA_DIR / "words.json"
# Resume file holding the list-page numbers that were already scraped.
PROGRESS_JSON = DATA_DIR / "list_scrape_progress.json"
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Base URL of the paginated dictionary list; pages are requested as "?page=N".
PEALIM_DICT_URL = "https://www.pealim.com/dict/"
REQUEST_DELAY = 1.5 # seconds between requests
REQUEST_TIMEOUT = 15 # seconds
# Number of list pages scraped when --test is not given on the command line.
DEFAULT_TOTAL_PAGES = 608
SAVE_EVERY = 10 # pages between incremental saves
# ISO date evaluated once at import time; written into "last_scrape_date".
TODAY = date.today().isoformat()
# Prefer lxml if available; html.parser is the fallback
try:
    # Imported only to find out whether the faster parser is installed.
    import lxml  # type: ignore[import-untyped]  # noqa: F401
except ImportError:
    BS4_PARSER = "html.parser"
else:
    BS4_PARSER = "lxml"
# ---------------------------------------------------------------------------
# Part-of-speech mappings
# ---------------------------------------------------------------------------
# English part-of-speech label → Hebrew label (with nikkud).
POS_HEBREW: dict[str, str] = {
    "Noun": "שֵׁם עֶצֶם",
    "Verb": "פֹּעַל",
    "Adjective": "שֵׁם תֹּאַר",
    "Adverb": "תֹּאַר הַפֹּעַל",
    "Pronoun": "כִּנּוּי גּוּף",
    "Preposition": "מִילַּת יַחַס",
    "Conjunction": "מִילַּת חִבּוּר",
    "Interjection": "מִילַּת קְרִיאָה",
    "Numeral": "שֵׁם מִסְפָּר",
    "Cardinal numeral": "שֵׁם מִסְפָּר",
    "Particle": "מִילִּית",
    "Determiner": "מְגַדִּיר",
    "Existential": "מִילַּת קִיּוּם",
    "Interrogative": "מִילַּת שְׁאֵלָה",
}
# Same pairs ordered longest label first, so a prefix scan always tries a
# longer label before any shorter label that could be its prefix. Labels of
# equal length keep their order of appearance in POS_HEBREW.
POS_HEBREW_ORDERED: list[tuple[str, str]] = sorted(
    POS_HEBREW.items(),
    key=lambda pair: len(pair[0]),
    reverse=True,
)
# Transliterated verb-pattern (binyan) name → Hebrew name with nikkud.
# _parse_pos matches against these keys case-insensitively.
BINYAN_HEBREW: dict[str, str] = {
"Pa'al": "פָּעַל",
"Nif'al": "נִפְעַל",
"Pi'el": "פִּיעֵל",
"Pu'al": "פֻּעַל",
"Hif'il": "הִפְעִיל",
"Huf'al": "הֻפְעַל",
"Hitpa'el": "הִתְפַּעֵל",
}
# Matches a run of one or more consecutive characters drawn from the listed
# emoji / symbol code-point ranges. Used both to extract an emoji from a raw
# meaning and to strip emoji out of it.
# NOTE(review): several ranges overlap (U+2600-U+27BF is spelled twice, and
# U+1F300-U+1FFFF covers most of U+1F000-U+1F9FF); harmless but redundant.
# Variation selectors (U+FE0F) and zero-width joiners (U+200D) fall outside
# every range, so they are neither extracted nor stripped.
EMOJI_RE = re.compile(
r"[\U0001F300-\U0001FFFF\U00002600-\U000027BF\U0001F000-\U0001F9FF\u2600-\u26FF\u2700-\u27BF]+",
re.UNICODE,
)
# Entry fields that a list re-scrape must leave untouched on existing entries.
# NOTE(review): nothing in the code visible in this module reads this constant;
# _merge_row honours it implicitly by assigning list-level fields only.
PROTECTED_FIELDS = frozenset(
    {
        "vocab_legacy_guid",
        "confusables_guid",
        "frequency",
        "pseudo_frequency",
        "emoji",
        "emoji_source",
        "emoji_visible",
        "image",
        "image_source",
        "hint",
        "examples",
        "noun_inflection",
        "conjugation",
        "adjective_inflection",
        "preposition_inflection",
    }
)
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# Configure root logging at import time: INFO level, timestamp + level + text.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# HTTP session
# ---------------------------------------------------------------------------
# One shared Session, created at import time and used by _fetch_page for every
# request; all requests carry the custom User-Agent set below.
session = requests.Session()
session.headers.update({"User-Agent": "Mozilla/5.0 (compatible; pealim-anki-scraper/1.0)"})
# ---------------------------------------------------------------------------
# Default entry template
# ---------------------------------------------------------------------------
def _default_entry() -> dict:
"""Return a fresh entry with all fields initialised to safe defaults."""
return {
"word": {"nikkud": "", "ktiv_male": ""},
"slug": "",
"root": [],
"pos": "",
"pos_hebrew": "",
"meaning": "",
"meaning_raw": "",
"audio_url": "",
"audio_file": "",
"tags": "",
"last_scrape_date": "",
"vocab_legacy_guid": None,
"frequency": None,
"pseudo_frequency": None,
"emoji": None,
"emoji_source": None,
"emoji_visible": False,
"image": None,
"image_source": None,
"hint": "",
"shared_roots": [],
"confusable_group": None,
"confusables_guid": None,
"examples": None,
"noun_inflection": None,
"conjugation": None,
"adjective_inflection": None,
"preposition_inflection": None,
}
# ---------------------------------------------------------------------------
# Parsing helpers
# ---------------------------------------------------------------------------
def _extract_emoji(text: str) -> str | None:
    """Return the first run of emoji characters in *text*, or None if there is none."""
    found = EMOJI_RE.search(text)
    if found is None:
        return None
    return found.group(0)
def _clean_meaning(raw: str) -> str:
    """Remove every emoji run from *raw* and collapse whitespace to single spaces."""
    without_emoji = EMOJI_RE.sub("", raw)
    tokens = without_emoji.split()
    return " ".join(tokens)
def _parse_pos(pos_raw: str) -> tuple[str, str]:
    """
    Parse a raw part-of-speech string into ``(pos_en, pos_hebrew)``.

    The English label is the longest POS_HEBREW key that *pos_raw* starts
    with; if none matches, the first space-separated word is used and doubles
    as the Hebrew label. For verbs, the text following "Verb" is searched for
    a binyan name, which may be separated by whitespace, a hyphen, an en dash
    or an em dash, in any letter case. A recognised binyan is appended to the
    Hebrew label after " — ".

    Examples:
        "Noun masculine"   → ("Noun", "שֵׁם עֶצֶם")
        "Verb pa'al"       → ("Verb", "פֹּעַל — פָּעַל")
        "Verb – pa'al"     → ("Verb", "פֹּעַל — פָּעַל")
        "Verb - Nif'al"    → ("Verb", "פֹּעַל — נִפְעַל")
        "Cardinal numeral" → ("Cardinal numeral", "שֵׁם מִסְפָּר")
    """
    pos_clean = pos_raw.strip()

    # Longest-prefix match against the known labels.
    pos_en = next(
        (label for label, _ in POS_HEBREW_ORDERED if pos_clean.startswith(label)),
        "",
    )
    if not pos_en:
        # Unknown label: fall back to the first word (or the whole string).
        pos_en = pos_clean.split(" ", 1)[0].strip()
    pos_heb = POS_HEBREW.get(pos_en, pos_en)

    if pos_en == "Verb":
        # Whatever follows "Verb" may be introduced by spaces and/or any dash
        # variant; the binyan is the first non-empty dash-delimited segment.
        remainder = pos_clean[len(pos_en):]
        segments = (segment.strip() for segment in re.split(r"[-–—]", remainder))
        binyan_raw = next((segment for segment in segments if segment), "")
        wanted = binyan_raw.lower()
        binyan_heb = next(
            (heb for name, heb in BINYAN_HEBREW.items() if name.lower() == wanted),
            None,
        )
        if binyan_heb:
            pos_heb = f"{pos_heb} — {binyan_heb}"
    return pos_en, pos_heb
def _parse_root(root_raw: str) -> list[str]:
"""
Convert raw root text to a list of consonants.
Pealim shows roots as "פ - ע - ל" or "פ.ע.ל" or "" (no root).
"""
if not root_raw or root_raw in ("-", "", ""):
return []
# Split on " - " or "." separators
parts = re.split(r"\s*[-–—.]\s*", root_raw.strip())
return [p.strip() for p in parts if p.strip()]
def _build_tags(pos_en: str, root: list[str]) -> str:
"""
Generate Anki tags string matching the existing project convention.
Examples:
pos=Noun, root=[] → "שם_עצם"
pos=Noun, root=["א","ב"] → "שורש::אב שם_עצם"
pos=Verb, root=["שמר"] → "שורש::שמר פעלים"
"""
pos_tag_map = {
"Noun": "שם_עצם",
"Verb": "פעלים",
"Adjective": "שם_תואר",
"Adverb": "תוארי_הפועל",
"Pronoun": "כינוייוף",
"Preposition": "מילות_יחס",
"Conjunction": "מילות_חיבור",
"Particle": "מילית",
"Numeral": "שם_מספר",
"Cardinal numeral": "שם_מספר",
"Determiner": "מגדיר",
"Existential": "מילת_קיום",
"Interrogative": "מילת_שאלה",
"Interjection": "מילת_קריאה",
}
parts: list[str] = []
if root:
root_str = "".join(strip_nikkud(c) for c in root)
parts.append(f"שורש::{root_str}")
pos_heb_tag = pos_tag_map.get(pos_en, "")
if pos_heb_tag:
parts.append(pos_heb_tag)
return " ".join(parts)
def _compute_audio_file(slug: str, ktiv_male: str) -> str:
"""
Return the local audio filename for an entry.
The actual confusable detection happens later (after all pages are scraped);
here we store a placeholder that post_process() will correct.
We default to the consonant-based name; confusables get slug-based names.
"""
consonants = strip_nikkud(ktiv_male) if ktiv_male else ""
return f"{consonants}.mp3" if consonants else f"{slug}.mp3"
# ---------------------------------------------------------------------------
# Page parsing
# ---------------------------------------------------------------------------
def _parse_mo_page(html: bytes) -> list[dict]:
    """
    Parse a hebstyle=mo (nikkud) list page.

    Only table rows with at least four <td> cells are considered, and rows
    whose word text is empty are skipped. Each returned dict has the keys
    nikkud, slug, root_raw, pos_raw, meaning_raw and audio_url; slug and
    audio_url are "" when the page does not provide them.
    """
    document = BeautifulSoup(html, BS4_PARSER)
    parsed_rows: list[dict] = []
    for table_row in document.select("table tr"):
        cells = table_row.find_all("td")
        if len(cells) < 4:
            continue
        word_cell, root_cell, pos_cell, meaning_cell = cells[:4]

        # Audio URL lives in a data-audio attribute somewhere in the word cell.
        audio_holder = word_cell.find(attrs={"data-audio": True})
        audio_url: str = audio_holder["data-audio"] if audio_holder else ""

        # Slug is the path segment after /dict/ in the word's link.
        anchor = word_cell.find("a", href=True)
        slug_match = re.search(r"/dict/([^/]+)/", anchor["href"]) if anchor else None
        slug = slug_match.group(1) if slug_match else ""

        # Prefer the dedicated "menukad" span; otherwise use the whole cell.
        vocalised = word_cell.find("span", class_="menukad")
        text_source = vocalised if vocalised else word_cell
        nikkud = text_source.get_text(strip=True)
        if not nikkud:
            continue

        parsed_rows.append(
            {
                "nikkud": nikkud,
                "slug": slug,
                "root_raw": root_cell.get_text(strip=True),
                "pos_raw": pos_cell.get_text(strip=True),
                "meaning_raw": meaning_cell.get_text(strip=True),
                "audio_url": audio_url,
            }
        )
    return parsed_rows
def _parse_vl_words(html: bytes) -> list[str]:
    """
    Parse a hebstyle=vl (ktiv male) list page.

    Returns the word text of every table row that has at least four <td>
    cells, in page order.

    NOTE(review): unlike _parse_mo_page, rows with empty word text are kept
    here; callers that pair the two results by position rely on both pages
    yielding the same rows — confirm this holds for pealim's markup.
    """
    document = BeautifulSoup(html, BS4_PARSER)
    ktiv_male_words: list[str] = []
    for table_row in document.select("table tr"):
        cells = table_row.find_all("td")
        if len(cells) >= 4:
            word_cell = cells[0]
            # Prefer the dedicated "menukad" span; otherwise use the whole cell.
            vocalised = word_cell.find("span", class_="menukad")
            ktiv_male_words.append((vocalised or word_cell).get_text(strip=True))
    return ktiv_male_words
# ---------------------------------------------------------------------------
# words.json I/O
# ---------------------------------------------------------------------------
def _load_words() -> dict:
    """Read and return the contents of words.json, or {} when the file does not exist."""
    if WORDS_JSON.exists():
        return json.loads(WORDS_JSON.read_text(encoding="utf-8"))
    logger.info("data/words.json not found — starting fresh.")
    return {}
def _save_words(words: dict) -> None:
    """
    Write *words* to words.json via a sibling .tmp file and os.replace.

    The data directory is created first if it is missing, so a fresh checkout
    (where _load_words starts from an empty dict) does not fail on its first
    save. Readers never observe a half-written words.json because the final
    file is only swapped in once the temporary file is complete.
    """
    WORDS_JSON.parent.mkdir(parents=True, exist_ok=True)
    tmp = WORDS_JSON.with_suffix(".json.tmp")
    with tmp.open("w", encoding="utf-8") as fh:
        # ensure_ascii=False keeps the Hebrew text readable in the file.
        json.dump(words, fh, ensure_ascii=False, indent=2)
    os.replace(tmp, WORDS_JSON)
    logger.info("Saved data/words.json (%d entries)", len(words))
# ---------------------------------------------------------------------------
# Progress tracking
# ---------------------------------------------------------------------------
def _load_progress() -> set[int]:
    """Return the page numbers recorded as completed; empty when no progress file exists."""
    if PROGRESS_JSON.exists():
        payload = json.loads(PROGRESS_JSON.read_text(encoding="utf-8"))
        return set(payload.get("completed_pages", []))
    return set()
def _save_progress(completed: set[int]) -> None:
    """
    Write the completed page numbers (sorted) to the progress file.

    The data directory is created first if it is missing. The file is written
    to a sibling .tmp path and then swapped in with os.replace, so the
    progress file is never left half-written.
    """
    PROGRESS_JSON.parent.mkdir(parents=True, exist_ok=True)
    tmp = PROGRESS_JSON.with_suffix(".json.tmp")
    with tmp.open("w", encoding="utf-8") as fh:
        json.dump({"completed_pages": sorted(completed)}, fh)
    os.replace(tmp, PROGRESS_JSON)
# ---------------------------------------------------------------------------
# Unique key generation
# ---------------------------------------------------------------------------
def _make_unique_key(nikkud: str, pos_en: str, meaning: str, existing_keys: set[str]) -> str:
"""
Generate a collision-free unique key for a new entry.
Escalation:
1. nikkud
2. nikkud|pos_en
3. nikkud|pos_en|meaning
4. nikkud|pos_en|meaning|N (N = 2, 3, …)
"""
candidate = nikkud
if candidate not in existing_keys:
return candidate
candidate = f"{nikkud}|{pos_en}"
if candidate not in existing_keys:
return candidate
candidate = f"{nikkud}|{pos_en}|{meaning}"
if candidate not in existing_keys:
return candidate
n = 2
while True:
candidate = f"{nikkud}|{pos_en}|{meaning}|{n}"
if candidate not in existing_keys:
return candidate
n += 1
# ---------------------------------------------------------------------------
# Core: merge one scraped row into words dict
# ---------------------------------------------------------------------------
def _merge_row(
    words: dict,
    slug_index: dict[str, str],
    nikkud: str,
    ktiv_male: str,
    slug: str,
    root_raw: str,
    pos_raw: str,
    meaning_raw_raw: str,
    audio_url: str,
) -> None:
    """
    Upsert a single scraped row into *words* in place.

    An existing entry is located through *slug_index* (slug → unique key).
    When one is found, only its list-level fields are refreshed; every other
    field is left as it is. Otherwise a new entry is created from the default
    template under a collision-free key, its emoji is taken from the raw
    meaning, and *slug_index* is updated.

    An empty *ktiv_male* means the ktiv-male page supplied no word for this
    row; in that case a spelling already stored on the entry is kept rather
    than being blanked out.

    Args:
        words: The full words mapping, mutated in place.
        slug_index: slug → unique key lookup, mutated when an entry is created.
        nikkud: Vocalised spelling from the hebstyle=mo page.
        ktiv_male: Spelling from the hebstyle=vl page, or "" if unavailable.
        slug: Pealim slug of the word ("" if the row had no link).
        root_raw: Raw text of the root cell.
        pos_raw: Raw text of the part-of-speech cell.
        meaning_raw_raw: Raw text of the meaning cell (may contain emoji).
        audio_url: Value of the row's data-audio attribute, or "".
    """
    # Derived fields
    pos_en, pos_heb = _parse_pos(pos_raw)
    root = _parse_root(root_raw)
    meaning = _clean_meaning(meaning_raw_raw)

    # ---- locate or create the entry ----
    unique_key: str | None = slug_index.get(slug) if slug else None
    if unique_key and unique_key in words:
        entry = words[unique_key]
    else:
        unique_key = _make_unique_key(nikkud, pos_en, meaning, set(words))
        entry = _default_entry()
        # Emoji is only seeded on creation; existing values are never touched.
        emoji = _extract_emoji(meaning_raw_raw)
        entry["emoji"] = emoji
        entry["emoji_source"] = "from_pealim" if emoji else None
        words[unique_key] = entry
        if slug:
            slug_index[slug] = unique_key

    # ---- refresh list-level fields (same for new and existing entries) ----
    # Entries loaded from disk may lack "word"; _post_process tolerates that
    # too, so create it on demand instead of raising KeyError.
    word = entry.setdefault("word", {})
    word["nikkud"] = nikkud
    # Never replace a known spelling with the "missing" marker.
    effective_ktiv_male = ktiv_male or word.get("ktiv_male", "")
    word["ktiv_male"] = effective_ktiv_male

    entry["slug"] = slug
    entry["root"] = root
    entry["pos"] = pos_en
    entry["pos_hebrew"] = pos_heb
    entry["meaning"] = meaning
    entry["meaning_raw"] = meaning_raw_raw
    entry["audio_url"] = audio_url
    entry["audio_file"] = _compute_audio_file(slug, effective_ktiv_male)
    entry["tags"] = _build_tags(pos_en, root)
    entry["last_scrape_date"] = TODAY
# ---------------------------------------------------------------------------
# Post-processing: recompute confusable_group, shared_roots, audio_file
# ---------------------------------------------------------------------------
def _post_process(words: dict) -> None:
    """
    Recompute the cross-entry fields once all pages have been merged.

    - confusable_group: sorted keys of all entries sharing the same non-empty
      ktiv_male, set when two or more entries share it. Otherwise it is reset
      to None unless the entry carries a confusables_guid.
    - audio_file: "<slug>.mp3" for confusable entries that have a slug;
      for non-confusable entries the nikkud-stripped ktiv_male (falling back
      to the slug) plus ".mp3".
    - shared_roots: sorted keys of the other entries with the same root;
      [] for entries without a root.

    *words* is modified in place.
    """
    logger.info("Post-processing: recomputing confusable groups and shared roots...")

    def spelling_of(record: dict) -> str:
        return record.get("word", {}).get("ktiv_male", "")

    # --- confusable groups ---
    keys_by_spelling: dict[str, list[str]] = {}
    for key, record in words.items():
        spelling = spelling_of(record)
        if spelling:
            keys_by_spelling.setdefault(spelling, []).append(key)

    for record in words.values():
        spelling = spelling_of(record)
        homographs = keys_by_spelling.get(spelling, [])
        slug = record.get("slug", "")

        if len(homographs) >= 2:
            record["confusable_group"] = sorted(homographs)
            # Confusable → slug-based audio filename (left as-is without a slug)
            if slug:
                record["audio_file"] = f"{slug}.mp3"
            continue

        # Keep a group that enrichment attached (signalled by confusables_guid).
        if not record.get("confusables_guid"):
            record["confusable_group"] = None
        # Non-confusable → consonant-based audio filename
        consonants = strip_nikkud(spelling) if spelling else ""
        record["audio_file"] = f"{consonants}.mp3" if consonants else f"{slug}.mp3"

    # --- shared roots ---
    keys_by_root: dict[str, list[str]] = {}
    for key, record in words.items():
        radicals = record.get("root")
        if radicals:
            # "|"-joined pieces serve as the canonical grouping key
            keys_by_root.setdefault("|".join(radicals), []).append(key)

    for key, record in words.items():
        radicals = record.get("root")
        if not radicals:
            record["shared_roots"] = []
            continue
        same_root = keys_by_root.get("|".join(radicals), [])
        record["shared_roots"] = sorted(other for other in same_root if other != key)

    logger.info("Post-processing complete.")
# ---------------------------------------------------------------------------
# Scraping loop
# ---------------------------------------------------------------------------
def _build_slug_index(words: dict) -> dict[str, str]:
"""Build slug → unique_key lookup from the current words dict."""
index: dict[str, str] = {}
for key, entry in words.items():
slug = entry.get("slug", "")
if slug and slug not in index:
index[slug] = key
return index
def _fetch_page(url: str, cookies: dict) -> bytes | None:
    """
    GET *url* with *cookies* through the shared session.

    Returns the raw response body, or None (after logging an error) when the
    request raises a requests.RequestException, which includes non-2xx
    statuses via raise_for_status().
    """
    try:
        response = session.get(url, cookies=cookies, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        body = response.content
    except requests.RequestException as error:
        logger.error("Request failed for %s: %s", url, error)
        return None
    return body
def run_scrape(total_pages: int, force_refresh: bool) -> None:
    """
    Main scrape loop.

    For every pending page, fetch the nikkud (hebstyle=mo) and ktiv male
    (hebstyle=vl) variants, pair their rows by position and merge them into
    the words mapping. Words and progress are saved together every SAVE_EVERY
    pages and once more at the end, after post-processing. Pages whose fetch
    fails are left pending so the next run retries them.

    Args:
        total_pages: Number of list pages to scrape.
        force_refresh: If True, ignore progress file and re-scrape all pages.
    """
    words = _load_words()
    slug_index = _build_slug_index(words)

    if force_refresh:
        # The old progress is read only to report how much is being discarded;
        # a missing or unreadable file must not block a forced re-scrape.
        try:
            discarded = len(_load_progress())
        except (OSError, ValueError):
            discarded = 0
        if discarded:
            logger.info("--force-refresh: ignoring %d completed pages.", discarded)
        completed: set[int] = set()
    else:
        completed = _load_progress()

    pages_to_do = [p for p in range(1, total_pages + 1) if p not in completed]
    logger.info(
        "Pages to scrape: %d / %d (already done: %d)",
        len(pages_to_do),
        total_pages,
        len(completed),
    )

    pages_since_save = 0
    failed_pages: list[int] = []
    for page_num in pages_to_do:
        url = f"{PEALIM_DICT_URL}?page={page_num}"
        logger.info("Scraping page %d / %d", page_num, total_pages)

        # --- hebstyle=mo (nikkud + audio + slug) ---
        mo_html = _fetch_page(url, {"translit": "none", "hebstyle": "mo"})
        if mo_html is None:
            logger.warning("Skipping page %d (mo fetch failed).", page_num)
            failed_pages.append(page_num)
            time.sleep(REQUEST_DELAY * 2)  # back off a little after a failure
            continue
        time.sleep(REQUEST_DELAY)

        # --- hebstyle=vl (ktiv male) ---
        vl_html = _fetch_page(url, {"translit": "none", "hebstyle": "vl"})
        if vl_html is None:
            logger.warning("Skipping page %d (vl fetch failed).", page_num)
            failed_pages.append(page_num)
            time.sleep(REQUEST_DELAY * 2)
            continue

        # Parse
        mo_rows = _parse_mo_page(mo_html)
        vl_words = _parse_vl_words(vl_html)

        if not mo_rows:
            logger.warning("Page %d returned no rows — might be past end.", page_num)
            completed.add(page_num)
            # Flush merged-but-unsaved rows first: the progress file must never
            # list a page whose words exist only in memory.
            if pages_since_save:
                _save_words(words)
                pages_since_save = 0
            _save_progress(completed)
            time.sleep(REQUEST_DELAY)
            continue

        if len(vl_words) != len(mo_rows):
            # Rows are paired by position, so a count mismatch means some
            # ktiv male spellings may be missing or attached to the wrong word.
            logger.warning(
                "Page %d: %d nikkud rows but %d ktiv male rows; positional pairing may be off.",
                page_num,
                len(mo_rows),
                len(vl_words),
            )

        # Merge each row
        for i, row in enumerate(mo_rows):
            ktiv_male = vl_words[i] if i < len(vl_words) else ""
            _merge_row(
                words=words,
                slug_index=slug_index,
                nikkud=row["nikkud"],
                ktiv_male=ktiv_male,
                slug=row["slug"],
                root_raw=row["root_raw"],
                pos_raw=row["pos_raw"],
                meaning_raw_raw=row["meaning_raw"],
                audio_url=row["audio_url"],
            )

        completed.add(page_num)
        pages_since_save += 1

        # Incremental save every SAVE_EVERY pages (words first, then progress)
        if pages_since_save >= SAVE_EVERY:
            _save_words(words)
            _save_progress(completed)
            pages_since_save = 0

        time.sleep(REQUEST_DELAY)

    if failed_pages:
        logger.warning(
            "%d page(s) could not be fetched and remain pending for the next run: %s",
            len(failed_pages),
            failed_pages,
        )

    # Final save + post-processing
    logger.info("All pages scraped. Running post-processing…")
    _post_process(words)
    _save_words(words)
    _save_progress(completed)
    logger.info("Done. Total entries in words.json: %d", len(words))
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
    """Parse the command line and run the list scrape."""
    cli = argparse.ArgumentParser(description="Scrape pealim.com list pages into data/words.json.")
    cli.add_argument(
        "--test",
        metavar="N",
        type=int,
        default=None,
        help="Scrape only the first N pages (for testing).",
    )
    cli.add_argument(
        "--force-refresh",
        action="store_true",
        default=False,
        help="Re-scrape all pages, ignoring existing progress.",
    )
    options = cli.parse_args()

    # --test N limits the run to the first N pages; otherwise scrape them all.
    page_count = DEFAULT_TOTAL_PAGES if options.test is None else options.test

    logger.info(
        "Starting pealim list scraper | pages=%d | force=%s | parser=%s",
        page_count,
        options.force_refresh,
        BS4_PARSER,
    )
    run_scrape(total_pages=page_count, force_refresh=options.force_refresh)


if __name__ == "__main__":
    main()