hebrew_flash_cards/pealim_detail_scrape.py
Sochen b2fef5aa8a Sprint 11.1: strip_nikkud cleanup, dead code removal, test fixes
Remove strip_nikkud from all pipeline files — use ktiv_male directly.
Fix case-insensitive binyan matching in detail scraper (og:description
uses UPPERCASE). Fix integration test slugs and test limits. Delete
legacy CSVs, stale .apkg, and dead scripts from git. Add vulture to
pre-commit hook.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 04:03:47 +00:00

1136 lines
39 KiB
Python

#!/usr/bin/env python3
"""
Consolidated detail page scraper for pealim.com.
Visits /dict/<slug>/ detail pages for nouns and verbs in data/words.json.
Makes two requests per slug:
1. hebstyle=mo cookie → nikkud forms
2. hebstyle=vl cookie → ktiv male forms
Updates entries in data/words.json with scraped detail data.
Usage:
python3 pealim_detail_scrape.py [--test N] [--force-refresh-detail]
[--nouns-only | --verbs-only]
"""
import argparse
import json
import logging
import os
import re
import time
from pathlib import Path
import requests
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Site root; detail page URLs are built as f"{PEALIM_BASE}/dict/{slug}/".
PEALIM_BASE = "https://www.pealim.com"
REQUEST_DELAY = 1.5  # seconds between requests; also the initial retry backoff in _fetch
REQUEST_TIMEOUT = 15  # passed as the timeout of every session GET
SAVE_INTERVAL = 50  # write words.json every N processed entries
# Location of the data file this script reads and rewrites, relative to this module.
WORDS_JSON = Path(__file__).parent / "data" / "words.json"
# Canonical binyan spellings returned by _extract_binyan_from_page.
# Order matters there: the first name found in the page text wins.
BINYAN_NAMES: tuple[str, ...] = ("Pa'al", "Nif'al", "Pi'el", "Pu'al", "Hitpa'el", "Hif'il", "Huf'al")
# Lower-cased copy, index-aligned with BINYAN_NAMES, used for case-insensitive matching.
_BINYAN_NAMES_LOWER: tuple[str, ...] = tuple(b.lower() for b in BINYAN_NAMES)
# Canonical binyan name (as listed in BINYAN_NAMES) → its Hebrew name with nikkud.
# Looked up when building a verb record; unknown names fall back to "" there.
BINYAN_HEBREW: dict[str, str] = {
    "Pa'al": "פָּעַל",
    "Nif'al": "נִפְעַל",
    "Pi'el": "פִּיעֵל",
    "Pu'al": "פֻּעַל",
    "Hif'il": "הִפְעִיל",
    "Huf'al": "הֻפְעַל",
    "Hitpa'el": "הִתְפַּעֵל",
}
# Form key → Hebrew pronoun (with nikkud) stored as "pronoun_hebrew" on each form.
# Present-tense forms and the infinitive carry an empty label.
PRONOUN_LABELS: dict[str, str] = {
    "present_ms": "",
    "present_fs": "",
    "present_mp": "",
    "present_fp": "",
    "past_1s": "אֲנִי",
    "past_1p": "אֲנַחְנוּ",
    "past_2ms": "אַתָּה",
    "past_2fs": "אַתְּ",
    "past_2mp": "אַתֶּם",
    "past_2fp": "אַתֶּן",
    "past_3ms": "הוּא",
    "past_3fs": "הִיא",
    "past_3p": "הֵם / הֵן",
    "future_1s": "אֲנִי",
    "future_1p": "אֲנַחְנוּ",
    "future_2ms": "אַתָּה",
    "future_2fs": "אַתְּ",
    "future_2mp": "אַתֶּם",
    "future_2fp": "אַתֶּן",
    "future_3ms": "הוּא",
    "future_3fs": "הִיא",
    "future_3mp": "הֵם",
    "future_3fp": "הֵן",
    "imperative_ms": "אַתָּה",
    "imperative_fs": "אַתְּ",
    "imperative_mp": "אַתֶּם",
    "imperative_fp": "אַתֶּן",
    "infinitive": "",
}
# Form key → Hebrew tense name (with nikkud) stored as "tense" on each form.
# Together with the person code it forms the (person, tense) key that
# _forms_to_active_list uses to find a form's previously stored GUIDs.
TENSE_DESCRIPTION: dict[str, str] = {
    "present_ms": "הוֹוֶה",
    "present_fs": "הוֹוֶה",
    "present_mp": "הוֹוֶה",
    "present_fp": "הוֹוֶה",
    "past_1s": "עָבָר",
    "past_1p": "עָבָר",
    "past_2ms": "עָבָר",
    "past_2fs": "עָבָר",
    "past_2mp": "עָבָר",
    "past_2fp": "עָבָר",
    "past_3ms": "עָבָר",
    "past_3fs": "עָבָר",
    "past_3p": "עָבָר",
    "future_1s": "עָתִיד",
    "future_1p": "עָתִיד",
    "future_2ms": "עָתִיד",
    "future_2fs": "עָתִיד",
    "future_2mp": "עָתִיד",
    "future_2fp": "עָתִיד",
    "future_3ms": "עָתִיד",
    "future_3fs": "עָתִיד",
    "future_3mp": "עָתִיד",
    "future_3fp": "עָתִיד",
    "imperative_ms": "צִוּוּי",
    "imperative_fs": "צִוּוּי",
    "imperative_mp": "צִוּוּי",
    "imperative_fp": "צִוּוּי",
    "infinitive": "מְקוֹר",
}
# Form key → short person/number/gender code stored as "person" on each form.
# The code alone is not unique (e.g. "ms" is used for both present and
# imperative), which is why lookups pair it with the tense.
FORM_KEY_TO_PERSON: dict[str, str] = {
    "present_ms": "ms",
    "present_fs": "fs",
    "present_mp": "mp",
    "present_fp": "fp",
    "past_1s": "1s",
    "past_1p": "1p",
    "past_2ms": "2ms",
    "past_2fs": "2fs",
    "past_2mp": "2mp",
    "past_2fp": "2fp",
    "past_3ms": "3ms",
    "past_3fs": "3fs",
    "past_3p": "3p",
    "future_1s": "1s",
    "future_1p": "1p",
    "future_2ms": "2ms",
    "future_2fs": "2fs",
    "future_2mp": "2mp",
    "future_2fp": "2fp",
    "future_3ms": "3ms",
    "future_3fs": "3fs",
    "future_3mp": "3mp",
    "future_3fp": "3fp",
    "imperative_ms": "ms",
    "imperative_fs": "fs",
    "imperative_mp": "mp",
    "imperative_fp": "fp",
    "infinitive": "inf",
}
# Mishkal English name → Hebrew nikkud mapping (common patterns).
# Used to fill "mishkal_hebrew" on noun records; names not listed here yield None.
# NOTE(review): most values spell the template with the letters ק-ט-ל, but the
# entries for "CaCiC", "CaCuC", "maCCiC" and "CiCCon" end in ד / ר instead —
# these look like typos; confirm the intended spellings before relying on them.
# NOTE(review): the keys beginning with a lowercase prefix ("miCCaC", "hiCCiC", …)
# cannot be produced by a regex that requires a leading capital — verify that
# the extractor and this table agree on the naming scheme.
MISHKAL_HEBREW: dict[str, str] = {
    "CaCaC": "קָטָל",
    "CeCeC": "קֶטֶל",
    "CiCeC": "קִטֶל",
    "CaCeC": "קָטֶל",
    "CoCeC": "קוֹטֵל",
    "CaCiC": "קָטִיד",
    "CaCuC": "קָטוּר",
    "miCCaC": "מִקְטָל",
    "miCCeC": "מִקְטֶל",
    "maCCeC": "מַקְטֶל",
    "maCCiC": "מַקְטִיר",
    "hiCCiC": "הִקְטִיל",
    "CiCCuC": "קִטּוּל",
    "hitCaCCeC": "הִתְקַטֵּל",
    "CaCCan": "קַטְּלָן",
    "CaCCaC": "קַטָּל",
    "CiCCon": "קִטְּרוֹן",
    "CaCCeC": "קַטֶּלֶת",
}
# ---------------------------------------------------------------------------
# HTTP session
# ---------------------------------------------------------------------------
# One shared session so every request reuses the same connection pool and headers.
_session = requests.Session()
_session.headers["User-Agent"] = "Mozilla/5.0 (compatible; pealim-anki/2.0)"
def _fetch(url: str, hebstyle: str, backoff: float = REQUEST_DELAY) -> str | None:
    """
    GET *url* with the requested ``hebstyle`` cookie and return the body text.

    Args:
        url: Absolute URL to request.
        hebstyle: Value sent as the ``hebstyle`` cookie (this file uses "mo"
            for nikkud pages and "vl" for ktiv male pages).
        backoff: Seconds to sleep before the first retry. The pause doubles
            after every retry and is capped at 60 seconds.

    Returns:
        The response text, or None when the page is a 404, when the server
        returns any other non-retryable HTTP status, or when retries after
        request errors are exhausted.
    """
    ceiling = 60.0
    jar = {"translit": "none", "hebstyle": hebstyle}

    while True:
        try:
            response = _session.get(url, cookies=jar, timeout=REQUEST_TIMEOUT)
            response.raise_for_status()
            return response.text
        except requests.exceptions.HTTPError as http_err:
            failed = http_err.response
            code = 0 if failed is None else failed.status_code
            if code == 404:
                logger.warning(" 404 for %s — skipping", url)
                return None
            if code not in (429, 503):
                logger.error(" HTTP %s for %s", code, url)
                return None
            # Rate limited: sleep, lengthen the next pause, then try again.
            # NOTE(review): this path has no give-up condition of its own, so a
            # server that keeps answering 429/503 is retried indefinitely.
            pause = min(backoff, ceiling)
            logger.warning(" Rate limited (%s) — waiting %.0fs", code, pause)
            time.sleep(pause)
            backoff = min(backoff * 2, ceiling)
        except requests.RequestException as req_err:
            # Connection-level failure: retry with the same doubling pause,
            # but stop once the pause has grown to the ceiling.
            pause = min(backoff, ceiling)
            logger.warning(" Request error for %s: %s — retrying in %.0fs", url, req_err, pause)
            time.sleep(pause)
            backoff = min(backoff * 2, ceiling)
            if backoff >= ceiling:
                logger.error(" Giving up on %s", url)
                return None
# ---------------------------------------------------------------------------
# Shared HTML parsing utilities
# ---------------------------------------------------------------------------
def _get_menukad_and_audio(cell) -> tuple[str, str]:
    """
    Pull the vocalised text and the audio URL out of one table cell.

    The audio URL comes from the ``data-audio`` attribute of a nested
    ``audio-play`` span, falling back to the same attribute on the cell
    itself. The text comes from a nested ``menukad`` span when there is one;
    otherwise the cell's own text is used, but only if it contains at least
    one Hebrew letter.

    Returns:
        ``(text, audio_url)``; either element is "" when nothing was found.
    """
    player = cell.find("span", class_=lambda c: c and "audio-play" in c)
    audio_url = player.get("data-audio", "") if player else ""
    if not audio_url:
        # Some cells carry the attribute directly instead of on a child span.
        audio_url = cell.get("data-audio", "") or audio_url

    marked = cell.find("span", class_="menukad")
    if marked:
        text = marked.get_text(strip=True)
    else:
        raw = cell.get_text(strip=True)
        text = raw if re.search(r"[\u05d0-\u05ea]", raw) else ""
    return text, audio_url
def _get_plain_text(cell) -> str:
    """
    Return the Hebrew text of a table cell — the text-only counterpart of
    _get_menukad_and_audio, used when parsing vl (ktiv male) pages.

    A nested ``menukad`` span wins when present; otherwise the cell's own text
    is returned if it contains at least one Hebrew letter, else "".
    """
    marked = cell.find("span", class_="menukad")
    if marked:
        return marked.get_text(strip=True)
    raw = cell.get_text(strip=True)
    return raw if re.search(r"[\u05d0-\u05ea]", raw) else ""
# ---------------------------------------------------------------------------
# Noun detail parsing
# ---------------------------------------------------------------------------
# Raw gender word as it may appear in page text → canonical value.
# Consumers iterate this dict in insertion order and take the first hit.
# NOTE(review): "male" is a substring of "female", so any consumer that tests
# with a plain `in` will classify the text "female" as masculine — match on
# whole words (or check the longer word first) when using this table.
_GENDER_MAP = {
    "masculine": "masculine",
    "feminine": "feminine",
    "זכר": "masculine",
    "נקבה": "feminine",
    "male": "masculine",
    "female": "feminine",
}
# Canonical gender → its Hebrew name, vocalised and in ktiv male.
_GENDER_HEBREW = {
    "masculine": {"nikkud": "זָכָר", "ktiv_male": "זכר"},
    "feminine": {"nikkud": "נְקֵבָה", "ktiv_male": "נקבה"},
}
def _parse_noun_table(soup: BeautifulSoup) -> dict[str, dict | str]:
    """
    Read the noun declension table of a nikkud (mo) detail page.

    Rows are recognised by their label cell (the row's first ``th``, or its
    first ``td`` when there is no ``th``). The first two Hebrew-bearing ``td``
    cells of a recognised row are taken as the singular and plural forms.

    Returns:
        A dict that may contain ``singular_nikkud``, ``plural_nikkud``,
        ``construct_singular_nikkud``, ``construct_plural_nikkud`` (vocalised
        text) and ``singular_audio``, ``plural_audio`` (URLs, only when one
        was found). Empty when the page has no conjugation table.
    """
    table = soup.find("table", class_="conjugation-table")
    if not table:
        return {}

    # (text key, audio key) for the first and second data cell of each row kind.
    absolute_keys = (("singular_nikkud", "singular_audio"), ("plural_nikkud", "plural_audio"))
    construct_keys = (("construct_singular_nikkud", None), ("construct_plural_nikkud", None))

    parsed: dict[str, dict | str] = {}
    for tr in table.find_all("tr"):
        header = tr.find("th") or tr.find("td")
        if not header:
            continue
        label = header.get_text(strip=True).lower()
        # Only cells that actually hold Hebrew count as data; this also drops
        # a leading label <td>.
        hebrew_cells = [td for td in tr.find_all("td") if re.search(r"[\u05d0-\u05ea]", td.get_text())]

        if "absolute" in label or ("singular" in label and "construct" not in label):
            slots = absolute_keys
        elif "construct" in label or "סמיכות" in label:
            slots = construct_keys
        else:
            continue

        for (text_key, audio_key), td in zip(slots, hebrew_cells):
            nikkud, audio = _get_menukad_and_audio(td)
            parsed[text_key] = nikkud
            if audio_key is not None and audio:
                parsed[audio_key] = audio
    return parsed
def _parse_noun_table_vl(soup: BeautifulSoup) -> dict[str, str]:
    """
    Read the noun declension table of a ktiv male (vl) detail page.

    Uses the same row recognition as the nikkud parser but keeps only text.

    Returns:
        A dict that may contain ``singular_ktiv``, ``plural_ktiv``,
        ``construct_singular_ktiv`` and ``construct_plural_ktiv``. Empty when
        the page has no conjugation table.
    """
    table = soup.find("table", class_="conjugation-table")
    if not table:
        return {}

    parsed: dict[str, str] = {}
    for tr in table.find_all("tr"):
        # Label is the row's <th>, or failing that its first <td>.
        header = tr.find("th")
        cells = tr.find_all("td")
        if not header and cells:
            header = cells[0]
        if not header:
            continue
        label = header.get_text(strip=True).lower()
        hebrew_cells = [td for td in cells if re.search(r"[\u05d0-\u05ea]", td.get_text())]

        if "absolute" in label or ("singular" in label and "construct" not in label):
            keys = ("singular_ktiv", "plural_ktiv")
        elif "construct" in label or "סמיכות" in label:
            keys = ("construct_singular_ktiv", "construct_plural_ktiv")
        else:
            continue

        for key, td in zip(keys, hebrew_cells):
            parsed[key] = _get_plain_text(td)
    return parsed
def _parse_noun_gender_mishkal(soup: BeautifulSoup) -> tuple[str, str]:
    """
    Extract (gender, mishkal) from a noun detail page.

    Gender is looked for, in order, in the part-of-speech section, the
    ``og:description`` meta tag, and any span whose class contains "small",
    "muted" or "pos". Mishkal is looked for in the part-of-speech section only.

    Gender words are matched as whole words. A plain substring test would find
    "male" inside "female" and report a feminine noun as masculine.

    A mishkal is accepted only if it has the consonant-template shape used by
    the MISHKAL_HEBREW keys: an optional lowercase prefix followed by at least
    two capital "C" placeholders (``CaCaC``, ``miCCaC``, ``hitCaCCeC``).
    Ordinary capitalised words such as "Noun" are not templates and are
    ignored.

    Returns:
        ``(gender, mishkal)`` where gender is "masculine", "feminine" or "",
        and mishkal is the template name or "".
    """

    def find_gender(text: str) -> str:
        """Return the canonical gender named as a whole word in *text*, or ""."""
        lowered = text.lower()
        for raw, canonical in _GENDER_MAP.items():
            if re.search(rf"(?<!\w){re.escape(raw)}(?!\w)", lowered):
                return canonical
        return ""

    gender = ""
    mishkal = ""

    # Try the selectors pealim may use for PoS info, then the page header area.
    pos_section = (
        soup.find("div", class_="pos")
        or soup.find("p", class_="pos")
        or soup.find("div", class_="page-header")
    )
    if pos_section:
        text = pos_section.get_text(" ", strip=True)
        gender = find_gender(text)
        template = re.search(r"(?<![A-Za-z'])[a-z]*C[a-z']*C[A-Za-z']*(?![A-Za-z'])", text)
        if template:
            mishkal = template.group(0)

    # Fall back to the og:description meta tag for gender.
    if not gender:
        meta = soup.find("meta", {"property": "og:description"})
        if meta:
            gender = find_gender(str(meta.get("content", "")))

    # Last resort: small/muted spans that often contain gender info.
    if not gender:
        for span in soup.find_all("span", class_=lambda c: c and ("small" in c or "muted" in c or "pos" in c)):
            gender = find_gender(span.get_text(strip=True))
            if gender:
                break

    return gender, mishkal
def _scrape_noun_detail(_slug: str, mo_html: str, vl_html: str) -> dict:
    """
    Build a noun_inflection record from the two renderings of a detail page.

    Args:
        _slug: Unused; kept so the noun and verb scrapers share a call shape.
        mo_html: HTML of the nikkud (hebstyle=mo) page.
        vl_html: HTML of the ktiv male (hebstyle=vl) page.

    Returns:
        A dict with ``singular``, ``plural``, ``construct_singular`` and
        ``construct_plural`` (each ``{"nikkud", "ktiv_male"}`` or None when no
        vocalised form was found), ``singular_audio``, ``plural_audio``,
        ``pronominal_suffixes`` (always None), plus ``gender``/``gender_hebrew``
        and ``mishkal``/``mishkal_hebrew`` when detected. ``plurals_guid`` is
        deliberately left for the merge step.
    """
    nikkud_soup = BeautifulSoup(mo_html, "lxml")
    ktiv_soup = BeautifulSoup(vl_html, "lxml")
    pointed = _parse_noun_table(nikkud_soup)
    plain = _parse_noun_table_vl(ktiv_soup)
    gender, mishkal = _parse_noun_gender_mishkal(nikkud_soup)

    detail: dict = {}
    for name in ("singular", "plural", "construct_singular", "construct_plural"):
        nikkud = str(pointed.get(f"{name}_nikkud", ""))
        ktiv = str(plain.get(f"{name}_ktiv", ""))
        if not nikkud:
            detail[name] = None
            continue
        if not ktiv:
            # Keep the form anyway; an empty ktiv_male is only worth a warning.
            logger.warning("No ktiv_male for noun form: %s", nikkud)
        detail[name] = {"nikkud": nikkud, "ktiv_male": ktiv}

    detail["singular_audio"] = pointed.get("singular_audio")
    detail["plural_audio"] = pointed.get("plural_audio")
    detail["pronominal_suffixes"] = None
    # plurals_guid is PRESERVED by the merge step — not set here

    if gender:
        detail["gender"] = gender
        detail["gender_hebrew"] = _GENDER_HEBREW.get(gender)
    if mishkal:
        detail["mishkal"] = mishkal
        detail["mishkal_hebrew"] = MISHKAL_HEBREW.get(mishkal)
    return detail
# ---------------------------------------------------------------------------
# Verb detail parsing (ported from conjugation_extract.py)
# ---------------------------------------------------------------------------
def _extract_binyan_from_page(soup: BeautifulSoup) -> str:
    """
    Find the verb's binyan in the page's ``h3.page-header`` elements or, after
    those, in the ``og:description`` meta tag.

    Matching is case-insensitive. The first text containing any known binyan
    name decides, and within a text the first name in BINYAN_NAMES order wins.

    Returns:
        The canonical spelling from BINYAN_NAMES, or "" if none is found.
    """
    haystacks = [h3.get_text(" ", strip=True).lower() for h3 in soup.find_all("h3", class_="page-header")]
    meta = soup.find("meta", {"property": "og:description"})
    if meta:
        haystacks.append(str(meta.get("content", "")).lower())

    for haystack in haystacks:
        for canonical, needle in zip(BINYAN_NAMES, _BINYAN_NAMES_LOWER):
            if needle in haystack:
                return canonical
    return ""
def _parse_conjugation_table(
    soup: BeautifulSoup,
    passive: bool = False,
    table_el=None,
) -> dict[str, dict]:
    """
    Parse a verb conjugation table from a nikkud (mo) page.

    Args:
        soup: Parsed detail page.
        passive: When True, use the first conjugation table that follows an
            ``h3`` whose text contains "passive"; takes precedence over
            *table_el*.
        table_el: An already located table element to parse instead of the
            first conjugation table on the page.

    Returns:
        ``form_key -> {"form_nikkud", "audio_url"}`` in table order (present,
        past, future, imperative, infinitive). Empty when the table is
        missing or has fewer than three rows.
    """
    if passive:
        heading = next(
            (h3 for h3 in soup.find_all("h3") if "passive" in h3.get_text(strip=True).lower()),
            None,
        )
        if not heading:
            return {}
        table = heading.find_next("table", class_="conjugation-table")
    elif table_el is not None:
        table = table_el
    else:
        table = soup.find("table", class_="conjugation-table")
    if not table:
        return {}

    rows = table.find_all("tr")
    if len(rows) < 3:
        return {}

    def hebrew_pairs(row_idx: int) -> list[tuple[str, str]]:
        """Return (nikkud_text, audio_url) per Hebrew cell, repeated once per spanned column."""
        pairs: list[tuple[str, str]] = []
        for cell in rows[row_idx].find_all(["th", "td"]):
            text, audio = _get_menukad_and_audio(cell)
            span = int(cell.get("colspan", 1))
            if text and re.search(r"[\u05d0-\u05ea]", text):
                pairs.extend([(text, audio)] * span)
        return pairs

    def first_per_text(pairs: list[tuple[str, str]]) -> list[tuple[str, str]]:
        """Drop later pairs whose text was already seen, keeping order."""
        kept: dict[str, tuple[str, str]] = {}
        for pair in pairs:
            kept.setdefault(pair[0], pair)
        return list(kept.values())

    # Locate tense rows by label text: each row claims the first keyword it
    # contains that has not been claimed by an earlier row.
    anchor = {"present": -1, "past": -1, "future": -1, "imperative": -1, "infinitive": -1}
    for idx, row in enumerate(rows):
        label = row.get_text(" ", strip=True).lower()
        for keyword, claimed in anchor.items():
            if keyword in label and claimed < 0:
                anchor[keyword] = idx
                break

    # Each step: (row index, form keys left-to-right, collapse repeated texts?).
    # Past and future occupy three consecutive rows: 1st, 2nd and 3rd person.
    steps: list[tuple[int, tuple[str, ...], bool]] = []
    if anchor["present"] >= 0:
        steps.append((anchor["present"], ("present_ms", "present_fs", "present_mp", "present_fp"), False))
    past = anchor["past"]
    if past >= 0:
        steps.append((past, ("past_1s", "past_1p"), True))
        if past + 1 < len(rows):
            steps.append((past + 1, ("past_2ms", "past_2fs", "past_2mp", "past_2fp"), False))
        if past + 2 < len(rows):
            steps.append((past + 2, ("past_3ms", "past_3fs", "past_3p"), True))
    future = anchor["future"]
    if future >= 0:
        steps.append((future, ("future_1s", "future_1p"), True))
        if future + 1 < len(rows):
            steps.append((future + 1, ("future_2ms", "future_2fs", "future_2mp", "future_2fp"), False))
        if future + 2 < len(rows):
            steps.append((future + 2, ("future_3ms", "future_3fs", "future_3mp", "future_3fp"), False))
    if anchor["imperative"] >= 0:
        steps.append(
            (anchor["imperative"], ("imperative_ms", "imperative_fs", "imperative_mp", "imperative_fp"), False)
        )
    if anchor["infinitive"] >= 0:
        steps.append((anchor["infinitive"], ("infinitive",), False))

    forms: dict[str, dict] = {}
    for row_idx, keys, collapse in steps:
        found = hebrew_pairs(row_idx)
        if collapse:
            found = first_per_text(found)
        # zip stops at the shorter side, so missing cells simply leave keys unset.
        for key, (nikkud, audio_url) in zip(keys, found):
            if nikkud:
                forms[key] = {"form_nikkud": nikkud, "audio_url": audio_url}
    return forms
def _parse_conjugation_table_vl(
    soup: BeautifulSoup,
    passive: bool = False,
    table_el=None,
) -> dict[str, str]:
    """
    Parse a verb conjugation table from a ktiv male (vl) page.

    Follows the same table selection and row layout as the nikkud parser but
    records only each form's text.

    Args:
        soup: Parsed detail page.
        passive: When True, use the first conjugation table that follows an
            ``h3`` whose text contains "passive"; takes precedence over
            *table_el*.
        table_el: An already located table element to parse instead of the
            first conjugation table on the page.

    Returns:
        ``form_key -> ktiv_male_text``. Empty when the table is missing or has
        fewer than three rows.
    """
    if passive:
        heading = next(
            (h3 for h3 in soup.find_all("h3") if "passive" in h3.get_text(strip=True).lower()),
            None,
        )
        if not heading:
            return {}
        table = heading.find_next("table", class_="conjugation-table")
    elif table_el is not None:
        table = table_el
    else:
        table = soup.find("table", class_="conjugation-table")
    if not table:
        return {}

    rows = table.find_all("tr")
    if len(rows) < 3:
        return {}

    def hebrew_texts(row_idx: int) -> list[str]:
        """Return the text of each Hebrew cell, repeated once per spanned column."""
        texts: list[str] = []
        for cell in rows[row_idx].find_all(["th", "td"]):
            text = _get_plain_text(cell)
            span = int(cell.get("colspan", 1))
            if text and re.search(r"[\u05d0-\u05ea]", text):
                texts.extend([text] * span)
        return texts

    # Each row claims the first tense keyword it contains that no earlier row claimed.
    anchor = {"present": -1, "past": -1, "future": -1, "imperative": -1, "infinitive": -1}
    for idx, row in enumerate(rows):
        label = row.get_text(" ", strip=True).lower()
        for keyword, claimed in anchor.items():
            if keyword in label and claimed < 0:
                anchor[keyword] = idx
                break

    # Each step: (row index, form keys left-to-right, collapse repeated texts?).
    steps: list[tuple[int, tuple[str, ...], bool]] = []
    if anchor["present"] >= 0:
        steps.append((anchor["present"], ("present_ms", "present_fs", "present_mp", "present_fp"), False))
    past = anchor["past"]
    if past >= 0:
        steps.append((past, ("past_1s", "past_1p"), True))
        if past + 1 < len(rows):
            steps.append((past + 1, ("past_2ms", "past_2fs", "past_2mp", "past_2fp"), False))
        if past + 2 < len(rows):
            steps.append((past + 2, ("past_3ms", "past_3fs", "past_3p"), True))
    future = anchor["future"]
    if future >= 0:
        steps.append((future, ("future_1s", "future_1p"), True))
        if future + 1 < len(rows):
            steps.append((future + 1, ("future_2ms", "future_2fs", "future_2mp", "future_2fp"), False))
        if future + 2 < len(rows):
            steps.append((future + 2, ("future_3ms", "future_3fs", "future_3mp", "future_3fp"), False))
    if anchor["imperative"] >= 0:
        steps.append(
            (anchor["imperative"], ("imperative_ms", "imperative_fs", "imperative_mp", "imperative_fp"), False)
        )
    if anchor["infinitive"] >= 0:
        steps.append((anchor["infinitive"], ("infinitive",), False))

    ktiv_forms: dict[str, str] = {}
    for row_idx, keys, collapse in steps:
        found = hebrew_texts(row_idx)
        if collapse:
            # dict.fromkeys keeps the first occurrence of each text, in order.
            found = list(dict.fromkeys(found))
        for key, text in zip(keys, found):
            if text:
                ktiv_forms[key] = text
    return ktiv_forms
def _forms_to_active_list(
    mo_forms: dict[str, dict],
    vl_forms: dict[str, str],
    existing_forms: list[dict] | None,
) -> list[dict]:
    """
    Turn parsed form dicts into the list-of-records layout stored in words.json.

    Args:
        mo_forms: ``form_key -> {"form_nikkud", "audio_url"}`` from the nikkud page.
        vl_forms: ``form_key -> ktiv male text`` from the vl page.
        existing_forms: Records already stored for this verb, if any.

    Returns:
        One record per entry of *mo_forms*, in the same order. ``guid``,
        ``guid_candidates`` and ``audio_file`` are carried over from the
        existing record with the same (person, tense), or None when there is
        no such record.
    """
    # Index what is already stored so identifiers survive a re-scrape.
    previous: dict[tuple[str, str], dict] = {
        (old.get("person", ""), old.get("tense", "")): old for old in existing_forms or []
    }

    records: list[dict] = []
    for form_key, parsed in mo_forms.items():
        person = FORM_KEY_TO_PERSON.get(form_key, form_key)
        tense = TENSE_DESCRIPTION.get(form_key, "")
        nikkud = parsed["form_nikkud"]
        ktiv = vl_forms.get(form_key, "")
        if not ktiv:
            logger.warning("No ktiv_male for verb form %s: %s", form_key, nikkud)

        carried = previous.get((person, tense), {})
        records.append(
            {
                "person": person,
                "tense": tense,
                "pronoun_hebrew": PRONOUN_LABELS.get(form_key, ""),
                "form": {"nikkud": nikkud, "ktiv_male": ktiv},
                "audio_url": parsed.get("audio_url", ""),
                "audio_file": carried.get("audio_file"),
                "guid": carried.get("guid"),
                "guid_candidates": carried.get("guid_candidates"),
            }
        )
    return records
def _scrape_verb_detail(slug: str, mo_html: str, vl_html: str, existing_conj: dict | None) -> dict:
    """
    Build a conjugation record from the two renderings of a verb detail page.

    Args:
        slug: Page slug, used only in log messages.
        mo_html: HTML of the nikkud (hebstyle=mo) page.
        vl_html: HTML of the ktiv male (hebstyle=vl) page.
        existing_conj: The verb's stored conjugation record, if any. Its
            ``in_conjugation_deck``, ``prep`` and per-form identifiers are
            carried into the result.

    Returns:
        The new conjugation record, or an empty dict when the nikkud page
        yields no active forms.
    """
    nikkud_soup = BeautifulSoup(mo_html, "lxml")
    ktiv_soup = BeautifulSoup(vl_html, "lxml")
    previous = existing_conj or {}

    def reference(pointed: dict[str, dict], plain: dict[str, str], key: str, missing_msg: str) -> dict | None:
        """Pair the two spellings of one form; None when the vocalised form is absent."""
        nikkud = pointed.get(key, {}).get("form_nikkud", "")
        ktiv = plain.get(key, "")
        if not nikkud:
            return None
        if not ktiv:
            logger.warning(missing_msg, nikkud, slug)
        return {"nikkud": nikkud, "ktiv_male": ktiv}

    # Page-level metadata comes from the nikkud page.
    binyan = _extract_binyan_from_page(nikkud_soup)
    lead = nikkud_soup.find("div", class_="lead")
    meaning = lead.get_text(strip=True) if lead else ""

    pointed_active = _parse_conjugation_table(nikkud_soup, passive=False)
    plain_active = _parse_conjugation_table_vl(ktiv_soup, passive=False)
    if not pointed_active:
        logger.warning(" No active forms found for slug=%s", slug)
        return {}

    infinitive = reference(
        pointed_active, plain_active, "infinitive", "No ktiv_male for infinitive: %s (slug=%s)"
    )
    reference_form = reference(
        pointed_active, plain_active, "past_3ms", "No ktiv_male for past_3ms: %s (slug=%s)"
    )
    active_forms = _forms_to_active_list(pointed_active, plain_active, previous.get("active_forms"))

    # A heading mentioning "passive" signals a second table (Hif'il / Pi'el verbs).
    hufal_pual_forms = None
    reference_form_passive = None
    if any("passive" in h3.get_text(strip=True).lower() for h3 in nikkud_soup.find_all("h3")):
        pointed_passive = _parse_conjugation_table(nikkud_soup, passive=True)
        plain_passive = _parse_conjugation_table_vl(ktiv_soup, passive=True)
        if pointed_passive:
            hufal_pual_forms = _forms_to_active_list(
                pointed_passive, plain_passive, previous.get("hufal_pual_forms")
            )
            reference_form_passive = reference(
                pointed_passive, plain_passive, "past_3ms", "No ktiv_male for passive past_3ms: %s (slug=%s)"
            )

    return {
        "in_conjugation_deck": previous.get("in_conjugation_deck", False),
        "infinitive": infinitive,
        "reference_form": reference_form,
        "binyan": binyan,
        "binyan_hebrew": BINYAN_HEBREW.get(binyan, ""),
        "meaning": meaning,
        "prep": previous.get("prep"),
        "active_forms": active_forms,
        "hufal_pual_forms": hufal_pual_forms,
        "reference_form_passive": reference_form_passive,
    }
# ---------------------------------------------------------------------------
# Merging strategy
# ---------------------------------------------------------------------------
def _merge_noun_inflection(existing_ni: dict | None, scraped: dict) -> dict:
"""
Merge scraped noun data into existing noun_inflection, preserving plurals_guid.
"""
result = dict(scraped)
if existing_ni:
# PRESERVE existing plurals_guid — never overwrite
if existing_ni.get("plurals_guid"):
result["plurals_guid"] = existing_ni["plurals_guid"]
# Preserve existing singular_audio if we didn't scrape one
if not result.get("singular_audio") and existing_ni.get("singular_audio"):
result["singular_audio"] = existing_ni["singular_audio"]
# Preserve existing plural_audio if we didn't scrape one
if not result.get("plural_audio") and existing_ni.get("plural_audio"):
result["plural_audio"] = existing_ni["plural_audio"]
# Preserve existing singular/plural if we failed to scrape them
for field in ("singular", "plural", "construct_singular", "construct_plural"):
if not result.get(field) and existing_ni.get(field):
result[field] = existing_ni[field]
else:
result.setdefault("plurals_guid", None)
return result
def _merge_conjugation(_existing_conj: dict | None, scraped: dict) -> dict:
"""
Merge scraped verb data into existing conjugation, preserving in_conjugation_deck
and all guid/guid_candidates fields (already handled in _forms_to_active_list).
"""
# The scraped dict already preserves in_conjugation_deck and GUIDs via _forms_to_active_list
return scraped
# ---------------------------------------------------------------------------
# I/O helpers
# ---------------------------------------------------------------------------
def _load_words() -> dict:
    """Read and return the contents of words.json, or an empty dict if the file does not exist."""
    if not WORDS_JSON.exists():
        return {}
    with WORDS_JSON.open(encoding="utf-8") as handle:
        return json.load(handle)
def _save_words(data: dict) -> None:
    """
    Write *data* to words.json as indented UTF-8 JSON.

    The content goes to a sibling ``.tmp`` file first and is then moved over
    the real file with ``os.replace``, so an interrupted write cannot leave a
    half-written words.json behind. The parent directory is created if needed.
    """
    WORDS_JSON.parent.mkdir(parents=True, exist_ok=True)
    scratch = f"{WORDS_JSON}.tmp"
    with open(scratch, "w", encoding="utf-8") as out:
        json.dump(data, out, ensure_ascii=False, indent=2)
    os.replace(scratch, WORDS_JSON)
# ---------------------------------------------------------------------------
# Main scrape loop
# ---------------------------------------------------------------------------
def _should_process(entry: dict, pos: str, force: bool, nouns_only: bool, verbs_only: bool) -> bool:
    """
    Decide whether an entry is due for a detail scrape.

    Only parts of speech starting with "Noun" or "Verb" qualify, further
    narrowed by *nouns_only* / *verbs_only*. A qualifying entry is scraped
    when *force* is set or it has no truthy ``detail_scraped`` flag.
    """
    is_noun = pos.startswith("Noun")
    is_verb = pos.startswith("Verb")
    if not (is_noun or is_verb):
        return False
    if (nouns_only and not is_noun) or (verbs_only and not is_verb):
        return False
    return force or not entry.get("detail_scraped")
def run(
    test: int | None = None,
    force_refresh: bool = False,
    nouns_only: bool = False,
    verbs_only: bool = False,
) -> None:
    """
    Scrape detail pages for every eligible entry and write the results back.

    For each entry the nikkud (mo) and ktiv male (vl) pages are fetched, with
    a pause of REQUEST_DELAY seconds before each request. Successfully parsed
    entries are flagged ``detail_scraped``. Entries whose fetch or parse fails
    are counted as errors and left unflagged. words.json is saved every
    SAVE_INTERVAL processed entries and once more at the end.

    Args:
        test: If set, scrape at most this many entries (for smoke-testing).
        force_refresh: Re-scrape entries where detail_scraped=True.
        nouns_only: Only scrape noun entries.
        verbs_only: Only scrape verb entries.
    """
    words = _load_words()

    candidates = []
    for unique_key, entry in words.items():
        eligible = _should_process(entry, entry.get("pos", ""), force_refresh, nouns_only, verbs_only)
        if eligible and entry.get("slug"):
            candidates.append((unique_key, entry))

    total = len(candidates)
    if test is None:
        logger.info("Processing %d eligible entries (nouns+verbs) from words.json", total)
    else:
        candidates = candidates[:test]
        logger.info("Test mode: processing %d of %d eligible entries", len(candidates), total)

    processed = 0
    errors = 0
    for position, (unique_key, entry) in enumerate(candidates, start=1):
        slug = entry["slug"]
        pos = entry.get("pos", "")
        is_noun = pos.startswith("Noun")
        word_nikkud = entry.get("word", {}).get("nikkud", unique_key)
        url = f"{PEALIM_BASE}/dict/{slug}/"
        logger.info("[%d/%d] %s: %s (%s)", position, len(candidates), "Noun" if is_noun else "Verb", word_nikkud, slug)

        # Both renderings are required; give up on the entry if either is missing.
        time.sleep(REQUEST_DELAY)
        mo_html = _fetch(url, hebstyle="mo")
        if not mo_html:
            logger.warning(" Skipping %s — failed to fetch mo page", slug)
            errors += 1
            continue

        time.sleep(REQUEST_DELAY)
        vl_html = _fetch(url, hebstyle="vl")
        if not vl_html:
            logger.warning(" Skipping %s — failed to fetch vl page", slug)
            errors += 1
            continue

        # A parse failure on one page must not abort the whole run.
        try:
            if is_noun:
                scraped = _scrape_noun_detail(slug, mo_html, vl_html)
                if not scraped:
                    logger.warning(" No noun data scraped for %s", slug)
                    errors += 1
                    continue
                merged = _merge_noun_inflection(entry.get("noun_inflection") or {}, scraped)
                words[unique_key]["noun_inflection"] = merged
                singular = merged.get("singular", {}) or {}
                plural = merged.get("plural", {}) or {}
                logger.info(
                    " singular=%s plural=%s",
                    singular.get("nikkud", ""),
                    plural.get("nikkud", ""),
                )
            else:  # Verb
                existing_conj = entry.get("conjugation")
                scraped = _scrape_verb_detail(slug, mo_html, vl_html, existing_conj)
                if not scraped:
                    logger.warning(" No verb data scraped for %s", slug)
                    errors += 1
                    continue
                merged = _merge_conjugation(existing_conj, scraped)
                words[unique_key]["conjugation"] = merged
                logger.info(
                    " %s, %d forms",
                    merged.get("binyan", "?"),
                    len(merged.get("active_forms", [])),
                )
        except Exception as exc:  # noqa: BLE001
            logger.error(" Parse error for %s (%s): %s", slug, word_nikkud, exc, exc_info=True)
            errors += 1
            continue

        words[unique_key]["detail_scraped"] = True
        processed += 1

        # Periodic save so a long run loses little if it is interrupted.
        if processed % SAVE_INTERVAL == 0:
            logger.info(" Auto-saving after %d entries...", processed)
            _save_words(words)

    _save_words(words)
    logger.info(
        "Done. Processed=%d, Errors=%d, Total eligible=%d",
        processed,
        errors,
        len(candidates),
    )
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _build_parser() -> argparse.ArgumentParser:
    """
    Create the command-line parser.

    Options: ``--test N`` (int, default None), ``--force-refresh-detail``
    (flag), and the mutually exclusive flags ``--nouns-only`` /
    ``--verbs-only``.
    """
    parser = argparse.ArgumentParser(
        description="Scrape pealim.com detail pages for nouns and verbs in data/words.json."
    )
    parser.add_argument("--test", metavar="N", type=int, help="Scrape only N entries (smoke-test mode).")
    parser.add_argument(
        "--force-refresh-detail",
        action="store_true",
        help="Re-scrape entries where detail_scraped=True.",
    )

    # The two part-of-speech filters cannot be combined.
    pos_filter = parser.add_mutually_exclusive_group()
    pos_filter.add_argument("--nouns-only", action="store_true", help="Only scrape Noun entries.")
    pos_filter.add_argument("--verbs-only", action="store_true", help="Only scrape Verb entries.")
    return parser
if __name__ == "__main__":
    # Script entry point: timestamped INFO logging, then one scrape run
    # configured from the command line.
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%H:%M:%S",
        level=logging.INFO,
    )
    cli = _build_parser().parse_args()
    run(cli.test, cli.force_refresh_detail, cli.nouns_only, cli.verbs_only)