chore: remove legacy scraping scripts replaced by unified pipeline
Removed 11 files that are no longer called by the active pipeline:
- hebrew_extract.py (replaced by pealim_list_scrape.py)
- conjugation_extract.py (replaced by pealim_detail_scrape.py)
- scripts/scrape_noun_plurals.py, scrape_verb_ktiv.py, scrape_ktiv_male.py (all replaced by pealim_detail_scrape.py)
- scripts/migrate_to_json.py, repair_slugs.py (one-time migration, complete)
- epub_examples.py, rebuild_sentence_matches.py (unused utilities)
- scripts/extract_pdf_sentences.py, add_slugs.py (unused one-off scripts)

Kept: check_guid_coverage.py, validate_data.py, extract_verb_list.py, validate_apkg.py, validate_verb_list.py, release.py (standalone utilities)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
08fb7009d8
commit
6c2a0f8eed
11 changed files with 0 additions and 4319 deletions
|
|
@ -1,690 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Extract Hebrew verb conjugations from pealim.com.
|
||||
Input: verbs_input.txt (one Hebrew infinitive per line;
|
||||
lines starting with '# 3ms:' search by 3ms past form for Pu'al/Huf'al)
|
||||
Output: data/conjugations.json
|
||||
|
||||
For each verb:
|
||||
1. Search pealim.com/search/?q=<verb> to find URL slug
|
||||
2. Fetch /dict/<slug>/ with hebstyle=mo cookie
|
||||
3. Parse conjugation table by row labels
|
||||
4. Capture audio URLs per form
|
||||
5. Parse passive (Pu'al/Huf'al) forms from the same page
|
||||
|
||||
Resume-safe: verbs already in conjugations.json are skipped.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
import urllib.parse
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from helpers import strip_nikkud as _strip_nikkud
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
PEALIM_BASE = "https://www.pealim.com"
|
||||
REQUEST_DELAY = 1.5
|
||||
REQUEST_TIMEOUT = 15
|
||||
VERBS_INPUT = Path(__file__).parent / "verbs_input.txt"
|
||||
CONJUGATIONS_PATH = Path(__file__).parent / "data" / "conjugations.json"
|
||||
DICT_CSV = next(
|
||||
(
|
||||
p
|
||||
for p in [
|
||||
Path(__file__).parent / "data" / "hebrew_dict_for_anki.csv",
|
||||
Path(__file__).parent / "data" / "pealim_dict_for_anki.csv",
|
||||
]
|
||||
if p.exists()
|
||||
),
|
||||
Path(__file__).parent / "data" / "hebrew_dict_for_anki.csv",
|
||||
)
|
||||
|
||||
# Pronoun labels (for card front display)
|
||||
PRONOUN_LABELS = {
|
||||
"present_ms": "",
|
||||
"present_fs": "",
|
||||
"present_mp": "",
|
||||
"present_fp": "",
|
||||
"past_1s": "אֲנִי",
|
||||
"past_1p": "אֲנַחְנוּ",
|
||||
"past_2ms": "אַתָּה",
|
||||
"past_2fs": "אַתְּ",
|
||||
"past_2mp": "אַתֶּם",
|
||||
"past_2fp": "אַתֶּן",
|
||||
"past_3ms": "הוּא",
|
||||
"past_3fs": "הִיא",
|
||||
"past_3p": "הֵם / הֵן",
|
||||
"future_1s": "אֲנִי",
|
||||
"future_1p": "אֲנַחְנוּ",
|
||||
"future_2ms": "אַתָּה",
|
||||
"future_2fs": "אַתְּ",
|
||||
"future_2mp": "אַתֶּם",
|
||||
"future_2fp": "אַתֶּן",
|
||||
"future_3ms": "הוּא",
|
||||
"future_3fs": "הִיא",
|
||||
"future_3mp": "הֵם",
|
||||
"future_3fp": "הֵן",
|
||||
"imperative_ms": "אַתָּה",
|
||||
"imperative_fs": "אַתְּ",
|
||||
"imperative_mp": "אַתֶּם",
|
||||
"imperative_fp": "אַתֶּן",
|
||||
"infinitive": "",
|
||||
}
|
||||
|
||||
# Human-readable tense description for card front
|
||||
TENSE_DESCRIPTION = {
|
||||
"present_ms": "הוֹוֶה",
|
||||
"present_fs": "הוֹוֶה",
|
||||
"present_mp": "הוֹוֶה",
|
||||
"present_fp": "הוֹוֶה",
|
||||
"past_1s": "עָבָר",
|
||||
"past_1p": "עָבָר",
|
||||
"past_2ms": "עָבָר",
|
||||
"past_2fs": "עָבָר",
|
||||
"past_2mp": "עָבָר",
|
||||
"past_2fp": "עָבָר",
|
||||
"past_3ms": "עָבָר",
|
||||
"past_3fs": "עָבָר",
|
||||
"past_3p": "עָבָר",
|
||||
"future_1s": "עָתִיד",
|
||||
"future_1p": "עָתִיד",
|
||||
"future_2ms": "עָתִיד",
|
||||
"future_2fs": "עָתִיד",
|
||||
"future_2mp": "עָתִיד",
|
||||
"future_2fp": "עָתִיד",
|
||||
"future_3ms": "עָתִיד",
|
||||
"future_3fs": "עָתִיד",
|
||||
"future_3mp": "עָתִיד",
|
||||
"future_3fp": "עָתִיד",
|
||||
"imperative_ms": "צִוּוּי",
|
||||
"imperative_fs": "צִוּוּי",
|
||||
"imperative_mp": "צִוּוּי",
|
||||
"imperative_fp": "צִוּוּי",
|
||||
"infinitive": "מְקוֹר",
|
||||
}
|
||||
|
||||
BINYAN_NAMES: tuple[str, ...] = ("Pa'al", "Nif'al", "Pi'el", "Pu'al", "Hitpa'el", "Hif'il", "Huf'al")
|
||||
|
||||
session = requests.Session()
|
||||
session.headers.update({"User-Agent": "Mozilla/5.0 (compatible; pealim-anki/2.0)"})
|
||||
|
||||
|
||||
|
||||
def _build_pos_lookup() -> dict[str, str]:
    """Map nikkud-stripped dictionary words to their part-of-speech text.

    Reads the CSV at ``DICT_CSV``: first as a semicolon-separated file, then
    with pandas' default separator if that parse fails or produces fewer
    than three columns.  Loading is best-effort: any failure is logged at
    debug level and whatever was collected up to that point is returned.

    Returns:
        ``{stripped_word: part_of_speech}``; empty when the CSV is missing.
    """
    pos_by_word: dict[str, str] = {}
    if DICT_CSV.exists():
        try:
            import pandas as pd

            try:
                frame = pd.read_csv(DICT_CSV, sep=";", index_col=0)
                if frame.shape[1] < 3:
                    raise ValueError("too few columns")
            except (ValueError, pd.errors.ParserError):
                frame = pd.read_csv(DICT_CSV, index_col=0)

            for _, record in frame.iterrows():
                word_text = str(record.get("Word", "")).strip()
                raw_pos = record.get("Part of speech", record.get("Part of Speech", ""))
                pos_text = str(raw_pos).strip()
                # Any PoS text containing "nan" is dropped — presumably to
                # filter missing cells, which stringify to "nan".
                if not word_text or not pos_text or "nan" in pos_text.lower():
                    continue
                pos_by_word[_strip_nikkud(word_text)] = pos_text
        except Exception as e:
            logger.debug(f"Could not load PoS lookup: {e}")
    return pos_by_word
|
||||
|
||||
|
||||
# Cache PoS lookup (built once)
|
||||
_pos_lookup: dict[str, str] | None = None
|
||||
|
||||
|
||||
def _get_pos_lookup() -> dict[str, str]:
    """Return the module-wide PoS lookup, building it on the first call."""
    global _pos_lookup
    if _pos_lookup is not None:
        return _pos_lookup
    _pos_lookup = _build_pos_lookup()
    return _pos_lookup
|
||||
|
||||
|
||||
def _binyan_from_pos(word: str) -> str:
    """Return the canonical binyan name found in *word*'s PoS text.

    The PoS text comes from the cached dictionary lookup (keyed by the
    nikkud-stripped word) and is matched case-insensitively against known
    spellings, with and without the apostrophe.

    Returns:
        One of the canonical names (e.g. ``"Pi'el"``), or ``""`` when the
        word is not in the lookup or no spelling matches.
    """
    pos_text = _get_pos_lookup().get(_strip_nikkud(word), "")
    if not pos_text:
        return ""

    normalized = pos_text.lower()
    # Checked in this order; the first canonical name with a matching
    # spelling wins.
    spellings = (
        ("Pa'al", ("pa'al", "paal")),
        ("Nif'al", ("nif'al", "nifal")),
        ("Pi'el", ("pi'el", "piel")),
        ("Pu'al", ("pu'al", "pual")),
        ("Hitpa'el", ("hitpa'el", "hitpael")),
        ("Hif'il", ("hif'il", "hifil")),
        ("Huf'al", ("huf'al", "hufal")),
    )
    for canonical, variants in spellings:
        for variant in variants:
            if variant in normalized:
                return canonical
    return ""
|
||||
|
||||
|
||||
def _find_slug(query: str) -> str | None:
    """Look *query* up on the pealim.com search page and return a dict slug.

    The slug is the ``<digits>-<name>`` path component of the first
    ``/dict/.../`` link found in the response body.

    Returns:
        The slug, or ``None`` when no link is found or the request fails
        (failures are logged, not raised).
    """
    search_url = f"{PEALIM_BASE}/search/?q={urllib.parse.quote(query)}"
    try:
        response = session.get(search_url, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        first_hit = re.search(r"/dict/(\d+[-][^/?\"<>\s]+)/", response.text)
        if first_hit is not None:
            slug = first_hit.group(1)
            logger.info(f" Slug: {slug}")
            return slug
    except Exception as e:
        logger.error(f" Error searching for '{query}': {e}")
    return None
|
||||
|
||||
|
||||
def _is_passive_binyan(binyan: str) -> bool:
    """Tell whether *binyan* names Pu'al or Huf'al (Hebrew or transliterated)."""
    for marker in ("פֻּעַל", "הֻפְעַל", "Pu'al", "Huf'al"):
        if marker in binyan:
            return True
    return False
|
||||
|
||||
|
||||
def _get_menukad(cell) -> tuple[str, str]:
    """Pull the Hebrew form and its audio URL out of one table cell.

    The audio URL is the ``data-audio`` attribute of the first span whose
    class contains ``audio-play``.  The form is the text of the cell's
    ``menukad`` span; without such a span, the cell's own text is used, but
    only if it contains a Hebrew letter (U+05D0–U+05EA).

    Returns:
        ``(form_text, audio_url)``; either element may be ``""``.
    """
    hebrew_letter = r"[\u05d0-\u05ea]"

    player = cell.find("span", class_=lambda c: c and "audio-play" in c)
    audio_url = player.get("data-audio", "") if player else ""

    vocalized = cell.find("span", class_="menukad")
    if vocalized:
        form_text = vocalized.get_text(strip=True)
    else:
        plain = cell.get_text(strip=True)
        form_text = plain if re.search(hebrew_letter, plain) else ""
    return form_text, audio_url
|
||||
|
||||
|
||||
def _parse_table(soup: BeautifulSoup, passive: bool = False, table_el=None) -> dict[str, dict]:
    """Read one pealim conjugation table into ``{form_key: {form, audio_url}}``.

    Table selection:
        * ``passive=True`` — the first ``conjugation-table`` that follows the
          first ``<h3>`` whose text contains "passive";
        * otherwise ``table_el`` when given;
        * otherwise the first ``conjugation-table`` on the page.

    Tense rows are located by their label text ("present", "past", ...).
    The rows directly below the past/future label rows are read as the 2nd-
    and 3rd-person rows.  Keys follow the ``<tense>_<person>`` convention
    (e.g. ``past_3ms``), plus ``infinitive``.

    Returns:
        The mapping, or ``{}`` when no table is found or it has fewer than
        nine rows.
    """
    hebrew_letter = r"[\u05d0-\u05ea]"

    # ── pick the table ────────────────────────────────────────────
    if passive:
        heading = next(
            (h for h in soup.find_all("h3") if "passive" in h.get_text(strip=True).lower()),
            None,
        )
        if not heading:
            return {}
        table = next(
            (
                node
                for node in heading.find_all_next()
                if node.name == "table" and "conjugation-table" in node.get("class", [])
            ),
            None,
        )
    elif table_el is not None:
        table = table_el
    else:
        table = soup.find("table", class_="conjugation-table")

    if not table:
        return {}

    rows = table.find_all("tr")
    if len(rows) < 9:
        return {}

    def hebrew_cells(row_idx: int) -> list[tuple[str, str]]:
        """Hebrew (form, audio) pairs of a row, repeated per ``colspan``."""
        found: list[tuple[str, str]] = []
        for cell in rows[row_idx].find_all(["th", "td"]):
            text, audio = _get_menukad(cell)
            width = int(cell.get("colspan", 1))
            if text and re.search(hebrew_letter, text):
                found.extend([(text, audio)] * width)
        return found

    def unique_by_form(pairs: list[tuple[str, str]]) -> list[tuple[str, str]]:
        """Keep the first pair for each distinct form text, in order."""
        first_seen: dict[str, tuple[str, str]] = {}
        for pair in pairs:
            first_seen.setdefault(pair[0], pair)
        return list(first_seen.values())

    # ── locate the label row of every tense ───────────────────────
    # A row is claimed by the first tense (in this order) that is still
    # unassigned and whose name appears in the row's text.
    row_of = {"present": -1, "past": -1, "future": -1, "imperative": -1, "infinitive": -1}
    for idx, row in enumerate(rows):
        label = row.get_text(" ", strip=True).lower()
        for tense, found_at in row_of.items():
            if found_at < 0 and tense in label:
                row_of[tense] = idx
                break

    forms: dict[str, dict] = {}

    def assign(keys: tuple[str, ...], pairs: list[tuple[str, str]]) -> None:
        """Store pairs under keys positionally; surplus on either side is ignored."""
        for key, (form, audio) in zip(keys, pairs, strict=False):
            if form:
                forms[key] = {"form": form, "audio_url": audio}

    # Present tense (4 forms: ms fs mp fp)
    present_at = row_of["present"]
    if present_at >= 0:
        assign(("present_ms", "present_fs", "present_mp", "present_fp"), hebrew_cells(present_at))

    # Past tense: 1st-person row, then 2nd- and 3rd-person rows below it.
    past_at = row_of["past"]
    if past_at >= 0:
        assign(("past_1s", "past_1p"), unique_by_form(hebrew_cells(past_at)))
        if past_at + 1 < len(rows):
            assign(("past_2ms", "past_2fs", "past_2mp", "past_2fp"), hebrew_cells(past_at + 1))
        if past_at + 2 < len(rows):
            assign(("past_3ms", "past_3fs", "past_3p"), unique_by_form(hebrew_cells(past_at + 2)))

    # Future tense: same layout; the 3rd-person row is not de-duplicated.
    future_at = row_of["future"]
    if future_at >= 0:
        assign(("future_1s", "future_1p"), unique_by_form(hebrew_cells(future_at)))
        if future_at + 1 < len(rows):
            assign(("future_2ms", "future_2fs", "future_2mp", "future_2fp"), hebrew_cells(future_at + 1))
        if future_at + 2 < len(rows):
            assign(("future_3ms", "future_3fs", "future_3mp", "future_3fp"), hebrew_cells(future_at + 2))

    # Imperative
    imperative_at = row_of["imperative"]
    if imperative_at >= 0:
        assign(
            ("imperative_ms", "imperative_fs", "imperative_mp", "imperative_fp"),
            hebrew_cells(imperative_at),
        )

    # Infinitive (first Hebrew cell of the row only)
    infinitive_at = row_of["infinitive"]
    if infinitive_at >= 0:
        assign(("infinitive",), hebrew_cells(infinitive_at))

    return forms
|
||||
|
||||
|
||||
def _extract_binyan_from_page(soup: BeautifulSoup) -> str:
    """Find the binyan name mentioned on a verb page.

    Looks in every ``<h3 class="page-header">`` first, then in the
    ``og:description`` meta tag.  Names are tried in ``BINYAN_NAMES`` order.

    Returns:
        The first matching name, or ``""`` when none is mentioned.
    """

    def first_binyan_in(text: str) -> str:
        return next((name for name in BINYAN_NAMES if name in text), "")

    for header in soup.find_all("h3", class_="page-header"):
        found = first_binyan_in(header.get_text(" ", strip=True))
        if found:
            return found

    # Fall back to the og:description meta tag.
    meta = soup.find("meta", {"property": "og:description"})
    if not meta:
        return ""
    return first_binyan_in(meta.get("content", ""))
|
||||
|
||||
|
||||
def _extract_passive_binyan_from_page(soup: BeautifulSoup) -> str:
    """Return "Pu'al" or "Huf'al" as named in a "passive" ``<h3>`` heading.

    For every heading whose text contains "passive", the heading text is
    checked first and then the text of its ``<span class="small">`` child.

    Returns:
        The passive binyan name, or ``""`` when no heading names one.
    """
    passive_names = ("Pu'al", "Huf'al")

    def pick(text: str) -> str:
        return next((name for name in passive_names if name in text), "")

    for heading in soup.find_all("h3"):
        heading_text = heading.get_text(" ", strip=True)
        if "passive" not in heading_text.lower():
            continue

        named = pick(heading_text)
        if named:
            return named

        # The name may also sit in the heading's small-print span.
        small = heading.find("span", class_="small")
        if small:
            named = pick(small.get_text(strip=True))
            if named:
                return named
    return ""
|
||||
|
||||
|
||||
def _extract_conjugations(
    slug: str, search_term: str, is_3ms_search: bool = False, binyan_hint: str = ""
) -> dict | None:
    """Download ``/dict/<slug>/`` and build the conjugation record for a verb.

    Args:
        slug: pealim.com dictionary slug of the verb page.
        search_term: The verb as given in the input file; stored as the
            record's ``infinitive`` and used for the PoS lookup.
        is_3ms_search: When true, the PoS-lookup step is skipped.
        binyan_hint: Binyan used when neither the PoS lookup nor the page
            yields one.

    Returns:
        A record with ``infinitive``, ``slug``, ``root``, ``binyan``,
        ``meaning``, ``is_passive``, ``reference_form`` and ``forms`` (plus
        ``alternate_forms`` when a second active table differs), or
        ``None`` when the fetch fails or no forms are parsed.
    """
    page_url = f"{PEALIM_BASE}/dict/{slug}/"
    try:
        # The hebstyle=mo cookie is sent with every dictionary request.
        response = session.get(page_url, cookies={"hebstyle": "mo"}, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
    except Exception as e:
        logger.error(f" Error fetching {page_url}: {e}")
        return None

    soup = BeautifulSoup(response.text, "lxml")

    # Meaning: text of <div class="lead">.
    lead = soup.find("div", class_="lead")
    meaning = lead.get_text(strip=True) if lead else ""

    # Root: first menukad span holding Hebrew letters and a hyphen.
    root = ""
    for candidate in soup.find_all("span", class_="menukad"):
        text = candidate.get_text(strip=True)
        if text and re.search(r"[\u05d0-\u05ea]", text) and "-" in text:
            root = text
            break

    # Binyan: PoS lookup, then the page itself, then the caller's hint.
    binyan = "" if is_3ms_search else _binyan_from_pos(search_term)
    binyan = binyan or _extract_binyan_from_page(soup) or binyan_hint

    forms_raw = _parse_table(soup, passive=False)
    if not forms_raw:
        logger.warning(f" No forms found for {slug}")
        return None

    is_passive = _is_passive_binyan(binyan)

    # Passive verbs are referenced by their 3ms past form, all others by
    # the infinitive; the search term is the fallback for both.
    past_3ms_form = forms_raw.get("past_3ms", {}).get("form", "")
    if is_passive:
        reference_form = past_3ms_form or search_term
    else:
        reference_form = forms_raw.get("infinitive", {}).get("form", "") or search_term

    result = {
        "infinitive": search_term,
        "slug": slug,
        "root": root,
        "binyan": binyan,
        "meaning": meaning,
        "is_passive": is_passive,
        "reference_form": reference_form,
        "forms": {
            key: {
                "form": form_data["form"],
                "audio_url": form_data.get("audio_url", ""),
                "pronoun": PRONOUN_LABELS[key],
                "tense": TENSE_DESCRIPTION.get(key, ""),
            }
            for key, form_data in forms_raw.items()
            if key in PRONOUN_LABELS
        },
    }

    # A second active table is an alternate paradigm.  Tables after the
    # "Passive" heading are excluded by object identity.
    passive_heading = next(
        (h for h in soup.find_all("h3") if "passive" in h.get_text(strip=True).lower()),
        None,
    )
    passive_tables = (
        passive_heading.find_all_next("table", class_="conjugation-table") if passive_heading else []
    )
    passive_ids = {id(t) for t in passive_tables}
    active_tables = [
        t for t in soup.find_all("table", class_="conjugation-table") if id(t) not in passive_ids
    ]

    if len(active_tables) >= 2:
        alt_raw = _parse_table(soup, passive=False, table_el=active_tables[1])
        alternate_forms = {}
        for key, form_data in alt_raw.items():
            if key not in PRONOUN_LABELS:
                continue
            alt_form = form_data["form"]
            # Only forms that differ from the primary paradigm are kept.
            if alt_form and alt_form != forms_raw.get(key, {}).get("form", ""):
                alternate_forms[key] = alt_form
        if alternate_forms:
            result["alternate_forms"] = alternate_forms
            logger.info(f" Found {len(alternate_forms)} alternate forms for {search_term}")

    logger.info(f" Extracted {len(result['forms'])} forms for {search_term}")
    return result
|
||||
|
||||
|
||||
def _load_conjugations() -> dict:
    """Return the cached conjugations from ``CONJUGATIONS_PATH`` (``{}`` if absent)."""
    if not CONJUGATIONS_PATH.exists():
        return {}
    with open(CONJUGATIONS_PATH, encoding="utf-8") as cache_file:
        return json.load(cache_file)
|
||||
|
||||
|
||||
def _save_conjugations(data: dict) -> None:
    """Write *data* to ``CONJUGATIONS_PATH`` as pretty-printed UTF-8 JSON.

    The JSON is written to a sibling ``<name>.tmp`` file, which is then
    renamed over the target.  The cache is re-read on every run to decide
    which verbs to skip, so an interrupted or failed dump must not leave a
    truncated file behind.

    Args:
        data: The full conjugations mapping (verb -> record or ``None``).
    """
    CONJUGATIONS_PATH.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = CONJUGATIONS_PATH.with_name(CONJUGATIONS_PATH.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    # Swap in the new file only after it has been written completely.
    tmp_path.replace(CONJUGATIONS_PATH)
|
||||
|
||||
|
||||
def _extract_passive_from_active_slug(active_slug: str, search_term: str, binyan_hint: str = "") -> dict | None:
    """Build a passive record from the passive section of an active verb's page.

    Used for Pu'al/Huf'al 3ms entries where the active verb's slug is known.

    Args:
        active_slug: Slug of the active verb's dictionary page.
        search_term: The 3ms form from the input file; stored as the
            record's ``infinitive``.
        binyan_hint: Binyan used when the page neither names the passive
            binyan nor allows inferring it from the active one.

    Returns:
        A record shaped like the one from ``_extract_conjugations`` with
        ``is_passive=True`` and the active infinitive as ``reference_form``,
        or ``None`` when the fetch fails or the page has no passive table.
    """
    page_url = f"{PEALIM_BASE}/dict/{active_slug}/"
    try:
        response = session.get(page_url, cookies={"hebstyle": "mo"}, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
    except Exception as e:
        logger.error(f" Error fetching {page_url}: {e}")
        return None

    soup = BeautifulSoup(response.text, "lxml")

    # The page's lead text is the ACTIVE verb's meaning; it is kept as
    # context for the passive record.
    lead = soup.find("div", class_="lead")
    meaning = lead.get_text(strip=True) if lead else ""

    # Root: first menukad span holding Hebrew letters and a hyphen.
    root = ""
    for candidate in soup.find_all("span", class_="menukad"):
        text = candidate.get_text(strip=True)
        if text and re.search(r"[\u05d0-\u05ea]", text) and "-" in text:
            root = text
            break

    active_binyan = _extract_binyan_from_page(soup)
    active_infinitive = _parse_table(soup, passive=False).get("infinitive", {}).get("form", "")

    passive_forms_raw = _parse_table(soup, passive=True)
    if not passive_forms_raw:
        logger.warning(f" No passive forms found on {active_slug} for {search_term}")
        return None

    # Passive binyan: named on the page, else inferred from the active
    # binyan (Pi'el -> Pu'al, Hif'il -> Huf'al), else the caller's hint.
    inferred = {"Pi'el": "Pu'al", "Hif'il": "Huf'al"}
    passive_binyan = (
        _extract_passive_binyan_from_page(soup) or inferred.get(active_binyan, "") or binyan_hint
    )

    result = {
        "infinitive": search_term,
        "slug": active_slug,
        "root": root,
        "binyan": passive_binyan,
        "meaning": meaning,
        "is_passive": True,
        "reference_form": active_infinitive or search_term,
        "forms": {
            key: {
                "form": form_data["form"],
                "audio_url": form_data.get("audio_url", ""),
                "pronoun": PRONOUN_LABELS[key],
                "tense": TENSE_DESCRIPTION.get(key, ""),
            }
            for key, form_data in passive_forms_raw.items()
            if key in PRONOUN_LABELS
        },
    }

    logger.info(f" Extracted {len(result['forms'])} passive forms for {search_term} from {active_slug}")
    return result
|
||||
|
||||
|
||||
def main(verbs_file: Path = VERBS_INPUT) -> dict:
    """Scrape conjugations for every verb in *verbs_file*.

    Input file syntax (one entry per line):
        * a plain line is a verb to search for;
        * ``# 3ms: FORM [ACTIVE_SLUG]`` — passive entry, read from the
          passive section of the active verb's page;
        * ``# slug: VERB SLUG`` — use SLUG for VERB instead of searching;
        * any other ``#`` line naming a binyan sets the binyan hint for the
          entries that follow.

    Verbs already in the cache are skipped, and the cache is saved after
    every processed verb.

    NOTE(review): failed lookups and extractions are stored as ``None``, so
    later runs treat them as cached and never retry — confirm this is
    intended.

    Returns:
        The full conjugations mapping (cached plus newly scraped).
    """
    if not verbs_file.exists():
        logger.warning(f"verbs_input.txt not found at {verbs_file} — skipping")
        return _load_conjugations()

    lines = [raw.strip() for raw in verbs_file.read_text(encoding="utf-8").splitlines()]

    # Slug overrides may appear anywhere in the file, so collect them first.
    slug_overrides: dict[str, str] = {}
    for text in lines:
        if text.startswith("# slug:"):
            fields = text.removeprefix("# slug:").split()
            if len(fields) >= 2:
                slug_overrides[fields[0]] = fields[1]

    # Section-header keyword -> canonical binyan; the first keyword found
    # in a header line wins.
    section_binyan = {
        name.lower(): name
        for name in ("Pa'al", "Nif'al", "Pi'el", "Pu'al", "Hitpa'el", "Hif'il", "Huf'al")
    }

    # Each entry: (search_term, is_3ms_search, active_slug, binyan_hint)
    entries: list[tuple[str, bool, str | None, str]] = []
    section_hint = ""
    for text in lines:
        if not text or text.startswith("# slug:"):
            continue
        if text.startswith("# 3ms:"):
            fields = text.removeprefix("# 3ms:").split()
            if fields:
                given_slug = fields[1] if len(fields) >= 2 else None
                entries.append((fields[0], True, given_slug, section_hint))
            continue
        if text.startswith("#"):
            lowered = text.lower()
            section_hint = next(
                (name for keyword, name in section_binyan.items() if keyword in lowered),
                section_hint,
            )
            continue
        entries.append((text, False, None, section_hint))

    passive_count = sum(1 for entry in entries if entry[1])
    logger.info(f"Loaded {len(entries)} verbs from {verbs_file} ({passive_count} passive 3ms)")
    if slug_overrides:
        logger.info(f" Slug overrides: {slug_overrides}")

    conjugations = _load_conjugations()
    new_count = 0

    for verb, is_3ms, active_slug, binyan_hint in entries:
        if verb in conjugations:
            logger.info(f"Skipping {verb} (cached)")
            continue

        logger.info(f"Processing: {verb} {'(3ms search)' if is_3ms else ''}")
        time.sleep(REQUEST_DELAY)

        # A slug supplied in the input file saves the search request.
        known_slug = active_slug if is_3ms else slug_overrides.get(verb)
        if known_slug:
            slug = known_slug
            if is_3ms:
                logger.info(f" Using active slug {slug} for passive extraction")
            else:
                logger.info(f" Slug override: {slug}")
        else:
            slug = _find_slug(verb)
            if not slug:
                logger.warning(f" No slug found for {verb}")
                conjugations[verb] = None
                _save_conjugations(conjugations)
                continue
            if is_3ms:
                logger.info(f" Found active slug {slug} for passive extraction")
            # Pause between the search request and the page fetch.
            time.sleep(REQUEST_DELAY)

        if is_3ms:
            data = _extract_passive_from_active_slug(slug, verb, binyan_hint=binyan_hint)
        else:
            data = _extract_conjugations(slug, verb, is_3ms_search=False, binyan_hint=binyan_hint)

        conjugations[verb] = data
        _save_conjugations(conjugations)
        new_count += 1

    logger.info(f"Done: {new_count} new verbs processed")
    return conjugations
|
||||
|
||||
|
||||
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    # Run the scrape, then print a one-entry-per-verb summary of the cache.
    for verb, data in main().items():
        if not data:
            print(f"{verb}: no data")
            continue
        forms = data.get("forms", {})
        print(f"{verb}: {len(forms)} forms, binyan={data.get('binyan')}")
        # Show the audio URL of the first stored form as a spot check.
        sample_form = next(iter(forms.values()), {}) if forms else {}
        print(f" sample audio_url: {sample_form.get('audio_url', 'MISSING')[:60]}")
|
||||
446
epub_examples.py
446
epub_examples.py
|
|
@ -1,446 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Extract example sentences from nikud'd Hebrew EPUBs (and PDFs where possible),
|
||||
match them against the vocab list, and produce examples_cache.json.
|
||||
|
||||
Usage:
|
||||
python3 epub_examples.py
|
||||
|
||||
Outputs:
|
||||
data/epub_sentence_index.json — full sentence corpus
|
||||
data/examples_cache.json — best sentence(s) per vocab word
|
||||
"""
|
||||
|
||||
import csv
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import zipfile
|
||||
from html.parser import HTMLParser
|
||||
from pathlib import Path
|
||||
|
||||
from helpers import strip_nikkud
|
||||
|
||||
DATA_DIR = Path(__file__).parent / "data"
|
||||
EPUB_DIR = DATA_DIR / "epubs"
|
||||
DICT_CSV = DATA_DIR / "hebrew_dict_for_anki.csv"
|
||||
|
||||
# Book metadata: filename -> display name
|
||||
EPUB_BOOKS = {
|
||||
"little_prince.epub": "הנסיך הקטן",
|
||||
"time_tunnel_82.epub": "מנהרת הזמן 82",
|
||||
}
|
||||
|
||||
# PDF books are excluded — pypdf produces garbled RTL text (reversed chars within
|
||||
# words). If/when a proper EPUB version becomes available on Calibre, add it to
|
||||
# EPUB_BOOKS above instead.
|
||||
PDF_BOOKS: dict[str, str] = {}
|
||||
|
||||
# Sentence length bounds (word count)
|
||||
MIN_WORDS = 4
|
||||
MAX_WORDS = 15
|
||||
|
||||
|
||||
|
||||
# ── HTML text extraction ─────────────────────────────────────────
|
||||
|
||||
|
||||
class _TextExtractor(HTMLParser):
    """HTML parser that collects text, ignoring script/style/head content.

    Text fragments accumulate in ``parts``; ``get_text()`` joins them.
    """

    SKIP_TAGS = {"script", "style", "head"}

    # Opening one of these tags emits a newline so that text from adjacent
    # blocks does not run together.
    _BREAKING_TAGS = frozenset(
        {
            "p",
            "div",
            "br",
            "li",
            "h1",
            "h2",
            "h3",
            "h4",
            "h5",
            "h6",
            "td",
            "th",
            "tr",
            "blockquote",
            "section",
        }
    )

    def __init__(self):
        super().__init__()
        self.parts: list[str] = []
        # Number of currently open SKIP_TAGS elements; data is kept only at 0.
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in self.SKIP_TAGS:
            self._skip_depth += 1
        if tag in self._BREAKING_TAGS:
            self.parts.append("\n")

    def handle_endtag(self, tag):
        # Stray closing tags must not push the depth below zero.
        if tag in self.SKIP_TAGS and self._skip_depth > 0:
            self._skip_depth -= 1

    def handle_data(self, data):
        if self._skip_depth:
            return
        self.parts.append(data)

    def get_text(self) -> str:
        """Return everything collected so far as one string."""
        return "".join(self.parts)
|
||||
|
||||
|
||||
def extract_text_from_html(html: str) -> str:
    """Parse *html* and return its plain text.

    Script, style and head content is skipped and block-level tags produce
    newlines (see ``_TextExtractor``).

    Args:
        html: Complete HTML/XHTML document or fragment.

    Returns:
        The extracted text.
    """
    parser = _TextExtractor()
    parser.feed(html)
    # feed() may hold back trailing text (e.g. after an unterminated "&")
    # in case more input follows; close() forces it through.
    parser.close()
    return parser.get_text()
|
||||
|
||||
|
||||
# ── EPUB processing ──────────────────────────────────────────────
|
||||
|
||||
|
||||
def _content_files_from_epub(zf: zipfile.ZipFile) -> list[str]:
    """List the archive's content (X)HTML files in reading order.

    The order comes from the spine of the first ``.opf`` file found.
    Manifest hrefs are resolved relative to the OPF's directory and
    normalised, so ``../text/ch1.xhtml`` style hrefs match archive names.

    When there is no OPF, or no spine entry resolves to an archive member,
    the result falls back to every ``.xhtml``/``.html`` member, sorted by
    name, excluding names containing "toc", "cover" or "nav".

    Args:
        zf: An open EPUB archive.

    Returns:
        Archive member names, in spine order or in fallback order.
    """
    import posixpath  # zip member names always use "/" separators

    names = zf.namelist()
    known_names = set(names)

    def fallback() -> list[str]:
        return sorted(
            n
            for n in names
            if n.endswith((".xhtml", ".html"))
            and not any(word in n.lower() for word in ("toc", "cover", "nav"))
        )

    opf_path = next((n for n in names if n.endswith(".opf")), None)
    if not opf_path:
        return fallback()

    opf_content = zf.read(opf_path).decode("utf-8")
    opf_dir = posixpath.dirname(opf_path)

    # Manifest items: id -> href.  The two patterns cover both attribute
    # orders (id before href, href before id).
    manifest: dict[str, str] = {}
    for m in re.finditer(r'<item\s+[^>]*id="([^"]+)"[^>]*href="([^"]+)"', opf_content):
        manifest[m.group(1)] = m.group(2)
    for m in re.finditer(r'<item\s+[^>]*href="([^"]+)"[^>]*id="([^"]+)"', opf_content):
        manifest[m.group(2)] = m.group(1)

    spine_ids = re.findall(r'<itemref\s+[^>]*idref="([^"]+)"', opf_content)

    result = []
    for sid in spine_ids:
        href = manifest.get(sid, "")
        if not href.endswith((".xhtml", ".html")):
            continue
        # normpath collapses "./" and "../" segments.
        full_path = posixpath.normpath(posixpath.join(opf_dir, href))
        if full_path in known_names:
            result.append(full_path)

    return result or fallback()
|
||||
|
||||
|
||||
def extract_sentences_from_epub(epub_path: Path, book_name: str) -> list[dict]:
    """Extract sentences from an EPUB file.

    Content files are read in the order given by
    ``_content_files_from_epub``.  Members that are missing or not valid
    UTF-8 are skipped.

    Args:
        epub_path: Path of the ``.epub`` archive.
        book_name: Display name recorded on every sentence.

    Returns:
        List of ``{"text": str, "book": str, "stripped": str}`` dicts.
    """
    chapter_texts = []
    # The context manager closes the archive handle, including on error.
    with zipfile.ZipFile(epub_path) as zf:
        for member in _content_files_from_epub(zf):
            try:
                html = zf.read(member).decode("utf-8")
            except (KeyError, UnicodeDecodeError):
                continue
            chapter_texts.append(extract_text_from_html(html))

    return _split_into_sentences("\n".join(chapter_texts), book_name)
|
||||
|
||||
|
||||
# ── PDF processing ───────────────────────────────────────────────
|
||||
|
||||
|
||||
def extract_sentences_from_pdf(pdf_path: Path, book_name: str) -> list[dict]:
    """Extract sentences from a PDF file (best-effort).

    Requires ``pypdf``; without it a notice is printed and ``[]`` returned.
    On lines that are more than 30% Hebrew characters and have at least two
    words, the word order is reversed before sentence splitting.

    Args:
        pdf_path: Path of the PDF file.
        book_name: Display name recorded on every sentence.

    Returns:
        List of sentence dicts as produced by ``_split_into_sentences``.
    """
    try:
        import pypdf
    except ImportError:
        print(f" [SKIP] pypdf not installed, cannot process {pdf_path.name}")
        return []

    def restore_word_order(line: str) -> str:
        """Reverse the words of a predominantly Hebrew line."""
        words = line.split()
        hebrew_count = sum(1 for ch in line if "\u0590" <= ch <= "\u05ff")
        if hebrew_count > len(line) * 0.3 and len(words) > 1:
            return " ".join(reversed(words))
        return line

    page_texts = []
    for page in pypdf.PdfReader(pdf_path).pages:
        raw = page.extract_text()
        if raw:
            page_texts.append("\n".join(restore_word_order(line) for line in raw.split("\n")))

    return _split_into_sentences("\n".join(page_texts), book_name)
|
||||
|
||||
|
||||
# ── Sentence splitting ───────────────────────────────────────────
|
||||
|
||||
# Hebrew sentence terminators: period, exclamation, question mark, sof pasuk
|
||||
_SENT_SPLIT = re.compile(r"[.!?\u05C3]+")
|
||||
|
||||
# Punctuation to strip from word boundaries when matching
|
||||
_PUNCT = re.compile(
|
||||
r'^[\u0022\u0027\u05F4\u05F3,;:\-–—…\u201C\u201D\u201E\u201F\u2018\u2019()\[\]{}«»"\']+|[\u0022\u0027\u05F4\u05F3,;:\-–—…\u201C\u201D\u201E\u201F\u2018\u2019()\[\]{}«»"\']+$'
|
||||
)
|
||||
|
||||
|
||||
def _split_into_sentences(text: str, book_name: str) -> list[dict]:
    """Cut text into sentences, keeping those with an acceptable Hebrew word count.

    Whitespace is collapsed to single spaces first, then the text is split on
    ``_SENT_SPLIT``. A sentence is kept when the number of tokens containing at
    least one character in U+0590..U+05FF lies between ``MIN_WORDS`` and
    ``MAX_WORDS`` inclusive, and its nikkud-stripped form has not been seen yet.

    Args:
        text: Raw text of a whole book.
        book_name: Stored in every result under ``"book"``.

    Returns:
        Dicts with ``"text"``, ``"book"`` and ``"stripped"`` keys, in order of
        first appearance.
    """
    collapsed = re.sub(r"\s+", " ", text).strip()

    kept: list[dict] = []
    already_emitted: set[str] = set()

    for chunk in _SENT_SPLIT.split(collapsed):
        sentence = chunk.strip()
        if not sentence:
            continue

        # Tokens without any Hebrew character (numbers, Latin words) are not counted.
        hebrew_total = sum(
            1 for token in sentence.split() if any("\u0590" <= ch <= "\u05ff" for ch in token)
        )
        if not (MIN_WORDS <= hebrew_total <= MAX_WORDS):
            continue

        bare = strip_nikkud(sentence)
        if bare in already_emitted:
            continue
        already_emitted.add(bare)

        kept.append({"text": sentence, "book": book_name, "stripped": bare})

    return kept
|
||||
|
||||
|
||||
# ── Vocab loading ────────────────────────────────────────────────
|
||||
|
||||
|
||||
def load_vocab(csv_path: Path) -> dict:
    """Load the vocab CSV into a lookup keyed by unvocalized word forms.

    The file is read as a ``;``-delimited CSV with a header row. Every row with
    a non-empty ``Word`` column registers that word under up to two keys:
    ``strip_nikkud(Word)`` and the value of the ``Word Without Nikkud`` column
    (when the two are equal the word is registered once).

    Args:
        csv_path: Path of the vocab CSV file.

    Returns:
        ``{form_without_nikkud: [vocalized_word, ...]}``. A form shared by
        several rows lists every matching word in file order.
    """
    words_by_stripped: dict[str, list[str]] = {}  # stripped -> [nikkud words]

    # newline="" is the mode the csv module asks for so that quoted fields
    # containing line breaks are parsed correctly.
    with open(csv_path, encoding="utf-8", newline="") as f:
        for row in csv.DictReader(f, delimiter=";"):
            # DictReader fills columns missing from a short row with None, so a
            # plain .get(key, "") can still hand back None; `or ""` covers that.
            nikkud_word = (row.get("Word") or "").strip()
            word_no_nik = (row.get("Word Without Nikkud") or "").strip()
            if not nikkud_word:
                continue

            # Index under both spellings; the set removes the duplicate when
            # stripping the nikkud yields the same string as the CSV column.
            for form in {strip_nikkud(nikkud_word), word_no_nik}:
                if form:
                    words_by_stripped.setdefault(form, []).append(nikkud_word)

    return words_by_stripped
|
||||
|
||||
|
||||
# ── Matching ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def match_sentences(sentences: list[dict], words_by_stripped: dict) -> dict:
    """Find example sentences for each vocab word.

    A sentence matches a vocab form when one of its tokens (taken from the
    sentence's ``"stripped"`` text, with edge punctuation removed by
    ``_PUNCT``) equals the form, or equals it after dropping one or two leading
    Hebrew prefix letters. Forms of one or two characters only count when they
    appear as a whole token, never through prefix stripping.

    Args:
        sentences: Dicts with at least ``"text"`` and ``"stripped"`` keys.
        words_by_stripped: ``{form: [vocalized_word, ...]}`` lookup.

    Returns:
        ``{vocalized_word: [sentence_text, ...]}`` with at most five distinct
        sentences per word, ordered by ascending whitespace-token count of the
        sentence text. Key order is deterministic for a given input.
    """
    all_forms = set(words_by_stripped)

    # Hebrew single-letter prefixes: ב, ה, ו, כ, ל, מ, ש, ד
    heb_prefixes = frozenset("בהוכלמשד")

    matches: dict[str, list[tuple[int, str]]] = {}  # nikkud_word -> [(word_count, sentence)]

    for sent_info in sentences:
        sent_text = sent_info["text"]
        word_count = len(sent_text.split())

        direct_forms: set[str] = set()  # tokens exactly as they occur
        stem_forms: set[str] = set()  # tokens with a 1-2 letter prefix removed
        for token in sent_info["stripped"].split():
            cleaned = _PUNCT.sub("", token)
            if not cleaned:
                continue
            direct_forms.add(cleaned)
            for prefix_len in (1, 2):
                # The length test guarantees the remaining stem has >= 2 chars.
                if len(cleaned) > prefix_len + 1 and all(
                    ch in heb_prefixes for ch in cleaned[:prefix_len]
                ):
                    stem_forms.add(cleaned[prefix_len:])

        # sorted() pins the iteration order; walking the raw set would make the
        # insertion order of `matches` depend on string hash randomisation.
        for form in sorted((direct_forms | stem_forms) & all_forms):
            # Very short forms are too ambiguous to accept via prefix stripping.
            if len(form) <= 2 and form not in direct_forms:
                continue
            for nikkud_word in words_by_stripped[form]:
                matches.setdefault(nikkud_word, []).append((word_count, sent_text))

    result = {}
    for nikkud_word, sent_list in matches.items():
        # Stable sort: shorter sentences first, ties stay in encounter order.
        sent_list.sort(key=lambda item: item[0])
        # dict.fromkeys de-duplicates while preserving that order; keep top 5.
        result[nikkud_word] = list(dict.fromkeys(sent for _, sent in sent_list))[:5]

    return result
|
||||
|
||||
|
||||
# ── Main ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def main():
    """Run the whole pipeline: extract sentences, index them, match vocab, report.

    Steps:
        1. Extract sentences from every file listed in ``EPUB_BOOKS`` and
           ``PDF_BOOKS`` (both looked up under ``EPUB_DIR``); missing files are
           reported and skipped.
        2. Write all sentences to ``epub_sentence_index.json`` in ``DATA_DIR``.
        3. Load the vocab from ``DICT_CSV`` and match it against the sentences.
        4. Write the matches to ``examples_cache.json`` in ``DATA_DIR``.
        5. Print per-book counts, coverage and up to five sample matches.

    Returns:
        The mapping produced by ``match_sentences``.
    """
    rule = "=" * 60
    print(rule)
    print("EPUB Example Sentence Extraction Pipeline")
    print(rule)

    # Step 1: Extract sentences from all books
    all_sentences = []
    book_counts = {}

    def _ingest(path, book_name, extractor):
        # Shared tail of both loops: extract, record the count, report it.
        found = extractor(path, book_name)
        book_counts[book_name] = len(found)
        all_sentences.extend(found)
        print(f" -> {len(found)} sentences")

    for filename, book_name in EPUB_BOOKS.items():
        path = EPUB_DIR / filename
        if path.exists():
            print(f"\n[EPUB] Extracting: {book_name} ({filename})")
            _ingest(path, book_name, extract_sentences_from_epub)
        else:
            print(f"\n[SKIP] {filename} not found")

    for filename, book_name in PDF_BOOKS.items():
        path = EPUB_DIR / filename
        if path.exists():
            print(f"\n[PDF] Extracting: {book_name} ({filename})")
            _ingest(path, book_name, extract_sentences_from_pdf)
        else:
            print(f"\n[SKIP] {filename} not found")

    print(f"\nTotal sentences: {len(all_sentences)}")

    # Step 2: Save sentence index
    index_path = DATA_DIR / "epub_sentence_index.json"
    with open(index_path, "w", encoding="utf-8") as f:
        json.dump({"sentences": all_sentences}, f, ensure_ascii=False, indent=2)
    print(f"\nSaved sentence index: {index_path}")

    # Step 3: Load vocab and match
    print(f"\nLoading vocab from {DICT_CSV} ...")
    words_by_stripped = load_vocab(DICT_CSV)
    total_vocab = len(set().union(*words_by_stripped.values()))
    print(f" {total_vocab} unique vocab words ({len(words_by_stripped)} lookup forms)")

    print("\nMatching sentences against vocab ...")
    examples_cache = match_sentences(all_sentences, words_by_stripped)

    # Step 4: Save examples_cache
    cache_path = DATA_DIR / "examples_cache.json"
    with open(cache_path, "w", encoding="utf-8") as f:
        json.dump(examples_cache, f, ensure_ascii=False, indent=2)
    print(f"Saved examples cache: {cache_path}")

    # Step 5: Summary stats
    print("\n" + rule)
    print("SUMMARY")
    print(rule)
    print("\nSentences per book:")
    for book_name, count in book_counts.items():
        print(f" {book_name}: {count}")
    print(f" Total: {len(all_sentences)}")

    print("\nVocab matching:")
    print(f" Total vocab words: {total_vocab}")
    print(f" Words with examples: {len(examples_cache)}")
    if total_vocab:
        coverage = 100 * len(examples_cache) / total_vocab
    else:
        coverage = 0
    print(f" Coverage: {coverage:.1f}%")

    # Show the first five matches as a sanity check
    print("\nSample matches:")
    for word, sents in list(examples_cache.items())[:5]:
        print(f" {word} -> {sents[0][:60]}...")

    return examples_cache


if __name__ == "__main__":
    main()
|
||||
|
|
@ -1,225 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Extract Hebrew vocabulary from pealim.com dictionary.
|
||||
Scrapes word entries, roots, parts of speech, and audio URLs for Anki flashcards.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
|
||||
import pandas as pd
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
# Log at INFO and above, formatted as "timestamp - level - message".
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# One shared HTTP session (connection pooling); the scraper User-Agent set here
# is sent with every request made through it.
session = requests.Session()
session.headers.update({"User-Agent": "Mozilla/5.0 (compatible; pealim-scraper/1.0)"})

# Dictionary listing URL; individual pages are requested as "?page=N".
PEALIM_DICT_URL = "https://www.pealim.com/dict/"
REQUEST_DELAY = 1.5  # seconds between requests (respectful scraping)
REQUEST_TIMEOUT = 10  # seconds
|
||||
|
||||
|
||||
def get_total_pages() -> int:
    """Return how many dictionary listing pages to scrape.

    The first listing page is requested, but the response body is never
    inspected: the result is the fixed value 608 whether the request succeeds
    or fails. A failed request is only logged.
    """
    page_total = 608  # Hardcoded — pealim.com has ~608 pages at ~15 words/page
    try:
        logger.info("Fetching total page count...")
        probe = session.get(
            PEALIM_DICT_URL,
            cookies={"translit": "none", "hebstyle": "mo"},
            timeout=REQUEST_TIMEOUT,
        )
        probe.raise_for_status()
    except Exception as e:
        logger.error(f"Error fetching page count: {e}. Using default (608).")
    return page_total
|
||||
|
||||
|
||||
def _parse_page_with_audio(html_bytes: bytes) -> list[dict]:
    """Turn one dictionary listing page into a list of word records.

    Only table rows with at least four ``<td>`` cells are considered, and rows
    whose word text comes out empty are dropped.

    Args:
        html_bytes: Raw HTML of a listing page.

    Returns:
        One dict per word with the keys ``Word``, ``Root`` (``"-"`` when the
        cell is empty), ``Part of Speech``, ``Meaning``, ``audio_url`` and
        ``slug`` (both ``""`` when not found in the first cell).
    """
    entries: list[dict] = []
    for table_row in BeautifulSoup(html_bytes, "html.parser").select("table tr"):
        cells = table_row.find_all("td")
        if len(cells) < 4:
            continue
        head_cell = cells[0]

        # The audio URL lives in a data-audio attribute somewhere in the first cell.
        audio_holder = head_cell.find(attrs={"data-audio": True})

        # The slug is the path segment after /dict/ (e.g. /dict/6009-av/ -> 6009-av).
        anchor = head_cell.find("a", href=True)
        slug_match = re.search(r"/dict/([^/]+)/", anchor["href"]) if anchor else None

        # Prefer the vocalized span; fall back to the whole cell's text.
        vocalized = head_cell.find("span", class_="menukad")
        word_source = vocalized if vocalized else head_cell
        word = word_source.get_text(strip=True)
        if not word:
            continue

        root = cells[1].get_text(strip=True)
        entries.append(
            {
                "Word": word,
                "Root": root or "-",
                "Part of Speech": cells[2].get_text(strip=True),
                "Meaning": cells[3].get_text(strip=True),
                "audio_url": audio_holder["data-audio"] if audio_holder else "",
                "slug": slug_match.group(1) if slug_match else "",
            }
        )
    return entries
|
||||
|
||||
|
||||
def _extract_plain_words(html_bytes: bytes) -> list[str]:
    """Return the first-column word of every data row (>= 4 cells) on a listing page."""
    words = []
    for tr in BeautifulSoup(html_bytes, "html.parser").select("table tr"):
        tds = tr.find_all("td")
        if len(tds) < 4:
            continue
        menukad = tds[0].find("span", class_="menukad")
        words.append(menukad.get_text(strip=True) if menukad else tds[0].get_text(strip=True))
    return words


def _fetch_page_rows(url: str) -> list[dict]:
    """Fetch one listing page with and without nikkud and merge the two views."""
    # First request: with nikkud — parsed for word data, audio URL and slug.
    response = session.get(
        url, cookies={"translit": "none", "hebstyle": "mo"}, timeout=REQUEST_TIMEOUT
    )
    response.raise_for_status()
    page_rows = _parse_page_with_audio(response.content)

    # Second request: without nikkud — only the word column is used.
    resp_vl = session.get(
        url,
        cookies={"translit": "none", "hebstyle": "vl", "showmeaning": "off"},
        timeout=REQUEST_TIMEOUT,
    )
    resp_vl.raise_for_status()
    no_nik_words = _extract_plain_words(resp_vl.content)

    # Rows are paired by position.
    # NOTE(review): _parse_page_with_audio drops rows with an empty word while
    # the plain list keeps them, so a dropped row would shift the pairing.
    for i, row in enumerate(page_rows):
        row["Word Without Nikkud"] = no_nik_words[i] if i < len(no_nik_words) else ""
    return page_rows


def extract_from_website(max_pages: int | None = None, max_retries: int = 2) -> pd.DataFrame:
    """
    Extract dictionary entries from pealim.com.

    Each listing page is fetched twice (with and without nikkud). A page whose
    request fails is retried up to ``max_retries`` times before it is skipped;
    previously the failure was logged as "Retrying..." but the page was simply
    dropped.

    Args:
        max_pages: Maximum pages to scrape (None or 0 = use get_total_pages()).
        max_retries: Extra attempts per page after a ``requests`` error.

    Returns:
        DataFrame with Word, Root, Part of Speech, Meaning, audio_url, slug and
        Word Without Nikkud columns (empty if no page could be parsed).
    """
    total_pages = max_pages or get_total_pages()
    logger.info(f"Starting extraction from {total_pages} pages...")

    all_rows: list[dict] = []

    for page_num in range(1, total_pages + 1):
        url = f"{PEALIM_DICT_URL}?page={page_num}"

        for attempt in range(max_retries + 1):
            try:
                all_rows.extend(_fetch_page_rows(url))
                break
            except requests.RequestException as e:
                if attempt < max_retries:
                    logger.error(f"Error fetching page {page_num}: {e}. Retrying...")
                    # Back off for longer than the normal inter-request delay.
                    time.sleep(REQUEST_DELAY * 2)
                else:
                    logger.error(
                        f"Error fetching page {page_num}: {e}. Giving up after {attempt + 1} attempts."
                    )
            except Exception as e:
                # Parsing problems will not improve on a retry; skip the page.
                logger.error(f"Unexpected error on page {page_num}: {e}")
                break

        if page_num % 50 == 0:
            logger.info(f"Processed {page_num}/{total_pages} pages ({len(all_rows)} words so far)...")

        time.sleep(REQUEST_DELAY)

    df = pd.DataFrame(all_rows)
    audio_count = (df["audio_url"] != "").sum() if "audio_url" in df.columns else 0
    logger.info(f"Extraction complete. Total words: {len(df)}, with audio URL: {audio_count}")
    return df
|
||||
|
||||
|
||||
def modify_for_anki(df: pd.DataFrame) -> pd.DataFrame:
    """
    Transform dictionary DataFrame for Anki import.

    Adds two columns to ``df`` in place and returns the same object:

    * ``shared roots`` — space-joined ``Word`` values of the other rows that
      have the same ``Root`` (empty when the root is ``"-"`` or missing).
    * ``tags`` — an optional ``שורש::<root>`` tag followed by the Hebrew tag of
      the first part-of-speech keyword found in ``Part of Speech``.

    All existing columns (including ``audio_url``) are left untouched.

    Args:
        df: Frame with at least ``Word``, ``Root`` and ``Part of Speech``
            columns; a completely empty frame is also accepted.

    Returns:
        ``df`` itself, with the two columns added.
    """
    logger.info("Preparing data for Anki...")

    if df.empty:
        # A frame built from zero rows has no columns to read from.
        df["shared roots"] = []
        df["tags"] = []
        logger.info("Anki preparation complete.")
        return df

    # Group the words by root once instead of re-filtering the whole frame for
    # every row (which was quadratic in the number of rows).
    has_root = (df["Root"] != "-") & df["Root"].notna()
    words_by_root = df.loc[has_root].groupby("Root", sort=False)["Word"].agg(list).to_dict()

    shared_root_words = []
    for root, word, rooted in zip(df["Root"], df["Word"], has_root):
        siblings = words_by_root.get(root, ()) if rooted else ()
        shared_root_words.append(" ".join(str(w) for w in siblings if w != word))
    df["shared roots"] = shared_root_words

    # English part-of-speech keyword -> Hebrew tag; the first keyword found wins.
    pos_tags = {
        "Adverb": "תוארי_הפועל",
        "Pronoun": "כינויי_גוף",
        "Noun": "שם_עצם",
        "Verb": "פעלים",
        "Adjective": "שם_תואר",
        "Preposition": "מילות_יחס",
        "Conjunction": "מילות_חיבור",
        "Particle": "מילית",
    }

    tags = []
    for root_value, pos_value in zip(df["Root"], df["Part of Speech"]):
        tag_parts = []

        # Drop spaces and dashes; a "-" root or a stringified NaN yields no tag.
        root = str(root_value).replace(" ", "").replace("-", "")
        if "nan" not in root and root:
            root_clean = root.replace(".", "")
            tag_parts.append(f"שורש::{root_clean}")

        pos = str(pos_value)
        pos_tag = next((tag for key, tag in pos_tags.items() if key in pos), None)
        if pos_tag is not None:
            tag_parts.append(pos_tag)

        tags.append(" ".join(tag_parts))

    df["tags"] = tags
    logger.info("Anki preparation complete.")
    return df
|
||||
|
||||
|
||||
def main():
    """Scrape the dictionary and write the raw and the Anki-ready CSV files.

    ``hebrew_dict.csv`` receives the scraped frame as-is and
    ``hebrew_dict_for_anki.csv`` (``;``-separated) the frame returned by
    ``modify_for_anki``. Any exception is logged and then re-raised.
    """
    try:
        scraped = extract_from_website()
        scraped.to_csv("hebrew_dict.csv", index=True)
        logger.info("Saved: hebrew_dict.csv")

        anki_ready = modify_for_anki(scraped)
        anki_ready.to_csv("hebrew_dict_for_anki.csv", sep=";", index=True)
        logger.info("Saved: hebrew_dict_for_anki.csv")

        logger.info("Complete!")
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        raise


if __name__ == "__main__":
    main()
|
||||
|
|
@ -1,183 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Rebuild vocab_sentence_matches.json using both direct word matching
|
||||
and ktiv male conjugated/declined form matching.
|
||||
|
||||
This dramatically improves sentence coverage by matching not just
|
||||
dictionary forms but all conjugated verbs and declined nouns.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from helpers import strip_nikkud as _strip_nikkud
|
||||
|
||||
# Timestamped INFO-level logging for the progress messages emitted by main().
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

# Every input and output file is resolved inside the "data" directory that
# sits next to this script.
DATA_DIR = Path(__file__).parent / "data"
|
||||
|
||||
|
||||
def _sentence_fit(sentence: dict) -> int:
    """Rank a sentence by length: 0 inside the ideal 6-12 word band, else distance from 9."""
    wc = sentence["word_count"]
    return 0 if 6 <= wc <= 12 else abs(wc - 9)


def main():
    """Rebuild ``vocab_sentence_matches.json`` from the sentence index and vocab CSV.

    Reads ``epub_sentence_index.json``, ``hebrew_dict_for_anki.csv`` and, when
    present, ``ktiv_male_forms.json`` from ``DATA_DIR``. A sentence of 4-15
    tokens matches a vocab word when one of its tokens equals the word's
    nikkud-stripped form or one of the word's ktiv male forms. Up to three
    sentences per word are written, preferring lengths of 6-12 tokens.

    The output maps the word's "Word Without Nikkud" value (falling back to the
    stripped word when that column is empty) to
    ``{"word_nikkud": ..., "sentences": [{"text": ..., "book": ...}, ...]}``.
    """
    # All JSON here carries Hebrew text, so the encoding is pinned to UTF-8
    # instead of relying on the platform default.
    with open(DATA_DIR / "epub_sentence_index.json", encoding="utf-8") as f:
        sentences = json.load(f).get("sentences", [])
    logger.info(f"Loaded {len(sentences)} sentences")

    # The CSV is normally ';'-separated; fewer than three columns after parsing
    # means the separator was wrong, so fall back to the default ','.
    csv_path = DATA_DIR / "hebrew_dict_for_anki.csv"
    try:
        df = pd.read_csv(csv_path, sep=";", index_col=0)
        if df.shape[1] < 3:
            raise ValueError
    except (ValueError, pd.errors.ParserError):
        df = pd.read_csv(csv_path, index_col=0)
    logger.info(f"Loaded {len(df)} vocab entries")

    # stripped_form -> [(word_nikkud, word_no_nikkud)]
    word_lookup: dict[str, list[tuple[str, str]]] = {}
    for _, row in df.iterrows():
        word = str(row.get("Word", "")).strip()
        if not word or word in ("nan", "None"):
            continue
        stripped = _strip_nikkud(word)
        if not stripped:
            continue
        wni = str(row.get("Word Without Nikkud", "")).strip()
        if not wni or wni in ("nan", "None"):
            # A missing cell stringifies to "nan"; never use that as a key.
            wni = stripped
        word_lookup.setdefault(stripped, []).append((word, wni))

    # ktiv_male_form -> [{word_nikkud, form_type, ...}]
    ktiv_path = DATA_DIR / "ktiv_male_forms.json"
    ktiv_forms: dict[str, list[dict]] = {}
    if ktiv_path.exists():
        with open(ktiv_path, encoding="utf-8") as f:
            ktiv_forms = json.load(f)
        logger.info(f"Loaded {len(ktiv_forms)} ktiv male forms")
    else:
        logger.warning("No ktiv_male_forms.json — only using direct matching")

    # Reverse lookup: matchable surface form -> set of dictionary words (nikkud).
    ktiv_to_word: dict[str, set[str]] = {}
    for ktiv, entries in ktiv_forms.items():
        for entry in entries:
            word_nikkud = entry.get("word_nikkud", "")
            if word_nikkud:
                ktiv_to_word.setdefault(ktiv, set()).add(word_nikkud)

    # Every vocab word is also matchable through its own stripped form.
    for stripped, entries in word_lookup.items():
        for word_nikkud, _ in entries:
            ktiv_to_word.setdefault(stripped, set()).add(word_nikkud)

    logger.info(f"Total matchable forms: {len(ktiv_to_word)}")

    token_punct = re.compile(r'[.,!?;:"\'\u05be]')
    matches: dict[str, list[dict]] = {}  # word_nikkud -> [sentence info]

    for sent in sentences:
        stripped = sent.get("stripped", _strip_nikkud(sent.get("text", "")))
        tokens = [tok for tok in (token_punct.sub("", raw) for raw in stripped.split()) if tok]
        word_len = len(tokens)

        # Skip sentences that are too short or too long
        if word_len < 4 or word_len > 15:
            continue

        text = sent.get("text", "")
        book = sent.get("book", "")
        for tok in tokens:
            # sorted() keeps the output order independent of set hashing.
            for word_nikkud in sorted(ktiv_to_word.get(tok, ())):
                matches.setdefault(word_nikkud, []).append(
                    {
                        "text": text,
                        "book": book,
                        "matched_form": tok,
                        "word_count": word_len,
                    }
                )

    logger.info(f"Words with at least 1 match: {len(matches)}")

    output: dict[str, dict] = {}
    # Surface forms that produced each output entry. Kept apart from `output`
    # because the saved sentences carry only text and book, and the match-type
    # breakdown below needs the forms.
    forms_by_key: dict[str, set[str]] = {}

    for word_nikkud, sents in matches.items():
        # Deduplicate by text, keeping the first occurrence.
        seen_texts = set()
        unique = []
        for s in sents:
            if s["text"] not in seen_texts:
                seen_texts.add(s["text"])
                unique.append(s)

        unique.sort(key=_sentence_fit)
        best = unique[:3]

        # Output key: the row's "Word Without Nikkud", else the stripped word.
        stripped = _strip_nikkud(word_nikkud)
        wni = next(
            (w_wni for wn, w_wni in word_lookup.get(stripped, ()) if wn == word_nikkud),
            stripped,
        )

        output[wni] = {
            "word_nikkud": word_nikkud,
            "sentences": [{"text": s["text"], "book": s["book"]} for s in best],
        }
        forms_by_key[wni] = {s["matched_form"] for s in sents}

    out_path = DATA_DIR / "vocab_sentence_matches.json"
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(output, f, ensure_ascii=False, indent=1)

    total_sents = sum(len(v["sentences"]) for v in output.values())
    logger.info(f"Saved {len(output)} words with {total_sents} sentences → {out_path}")

    total_vocab = len(df)
    pct = len(output) * 100 / total_vocab if total_vocab else 0.0
    logger.info(f"Coverage: {len(output)}/{total_vocab} ({pct:.1f}%)")

    # Breakdown by match type: "direct" means a token equalled the word's own
    # stripped form, "ktiv" means a token equalled some other listed form.
    direct_only = 0
    ktiv_only = 0
    both = 0
    for wni, info in output.items():
        stripped = _strip_nikkud(info["word_nikkud"])
        forms = forms_by_key[wni]
        has_direct = stripped in forms
        has_ktiv = any(form != stripped for form in forms)
        if has_direct and has_ktiv:
            both += 1
        elif has_ktiv:
            ktiv_only += 1
        else:
            direct_only += 1

    logger.info(f" Direct matches only: {direct_only}")
    logger.info(f" Ktiv male matches only: {ktiv_only}")
    logger.info(f" Both: {both}")


if __name__ == "__main__":
    main()
|
||||
|
|
@ -1,57 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""One-time script: scrape slugs from pealim.com dict pages and add to CSV."""
|
||||
|
||||
import logging
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
|
||||
import pandas as pd
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
# Progress goes to stderr through the root logger.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", stream=sys.stderr)
logger = logging.getLogger()

# The CSV is read here and overwritten in place at the end of the script.
dict_csv = "data/hebrew_dict_for_anki.csv"
df = pd.read_csv(dict_csv, sep=";", index_col=0)
logger.info(f"Loaded {len(df)} rows")

session = requests.Session()
session.headers.update({"User-Agent": "Mozilla/5.0 (compatible; pealim-scraper/1.0)"})

# Vocalized word -> slug. A word that appears on several rows keeps the slug
# of the last row seen.
word_slug_map: dict[str, str] = {}
total_pages = 608  # fixed number of listing pages to walk

for page_num in range(1, total_pages + 1):
    url = f"https://www.pealim.com/dict/?page={page_num}"
    cookies = {"translit": "none", "hebstyle": "mo"}
    try:
        resp = session.get(url, cookies=cookies, timeout=10)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.content, "html.parser")
        for tr in soup.select("table tr"):
            tds = tr.find_all("td")
            # Rows with fewer than four cells are not word entries.
            if len(tds) < 4:
                continue
            # Prefer the vocalized span; fall back to the whole cell's text.
            menukad = tds[0].find("span", class_="menukad")
            word = menukad.get_text(strip=True) if menukad else tds[0].get_text(strip=True)
            # The slug is the path segment after /dict/ in the first cell's link.
            link = tds[0].find("a", href=True)
            slug = ""
            if link:
                m = re.search(r"/dict/([^/]+)/", link["href"])
                if m:
                    slug = m.group(1)
            if word and slug:
                word_slug_map[word] = slug
    except Exception as e:
        # Best effort: a failed page is logged and skipped, not retried.
        logger.warning(f"Page {page_num} failed: {e}")

    if page_num % 50 == 0:
        logger.info(f"Scraped {page_num}/{total_pages} pages ({len(word_slug_map)} slugs)")
    # NOTE(review): the indentation of this sleep was lost in the diff view; it
    # is assumed to run once per page (throttle), not only every 50th page.
    time.sleep(0.8)

# Words without a scraped slug get an empty string rather than NaN.
df["slug"] = df["Word"].map(word_slug_map).fillna("")
df.to_csv(dict_csv, sep=";", index=True)
matched = (df["slug"] != "").sum()
logger.info(f"Done. {matched}/{len(df)} words have slugs. Saved → {dict_csv}")
|
||||
|
|
@ -1,405 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Extract sentences from PDF books and match vocab words to sentences.
|
||||
|
||||
1. Extract sentences from alice.pdf and lion_strawberry.pdf
|
||||
2. Merge into existing epub_sentence_index.json
|
||||
3. Match vocab words to sentences, produce vocab_sentence_matches.json
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
|
||||
# Use the venv with pymupdf
|
||||
sys.path.insert(0, "/home/node/projects/pealim/venv_pdf/lib/python3.11/site-packages")
|
||||
# Also need the main venv for pandas
|
||||
sys.path.insert(0, "/home/node/projects/pealim/lib/python3.11/site-packages")
|
||||
|
||||
import fitz
|
||||
import pandas as pd
|
||||
|
||||
# NOTE(review): BASE_DIR is an absolute path on one specific machine; every
# path below is derived from it, so the script only runs where it exists.
BASE_DIR = "/home/node/projects/pealim"
DATA_DIR = os.path.join(BASE_DIR, "data")
EPUBS_DIR = os.path.join(DATA_DIR, "epubs")
# JSON sentence index read by load_sentence_index() and rewritten by
# save_sentence_index().
SENTENCE_INDEX = os.path.join(DATA_DIR, "epub_sentence_index.json")
VOCAB_CSV = os.path.join(DATA_DIR, "hebrew_dict_for_anki.csv")
MATCHES_FILE = os.path.join(DATA_DIR, "vocab_sentence_matches.json")
|
||||
|
||||
# Hebrew points, cantillation marks and the punctuation sharing that range.
NIKKUD_RE = re.compile(r"[\u0591-\u05C7]")
# A base Hebrew letter, alef through tav (final forms included).
HEBREW_RE = re.compile(r"[\u05d0-\u05ea]")
# Base letters plus the U+FB20..U+FB4F presentation forms.
HEBREW_CHAR_RE = re.compile(r"[\u05d0-\u05ea\ufb20-\ufb4f]")

# A single space that sits between a NON-final Hebrew letter and any Hebrew
# letter. Such a space is treated as an artefact inside a word; a final-form
# letter (ך ם ן ף ץ, excluded from the look-behind) marks a real word end.
_INTRA_WORD_GAP_RE = re.compile(
    r"(?<=[\u05d0-\u05d9\u05db\u05dc\u05de\u05e0-\u05e2\u05e4\u05e6-\u05ea]) (?=[\u05d0-\u05ea])"
)


def _build_presentation_table() -> dict[int, str]:
    """Map each Hebrew presentation form (U+FB1D..U+FB4F) to its unpointed base text."""
    import unicodedata

    table = {}
    for code in range(0xFB1D, 0xFB50):
        original = chr(code)
        # NFKD splits e.g. U+FB2A into shin + shin dot; dropping the marks
        # leaves the plain letter (or letters, for the alef-lamed ligature).
        base = NIKKUD_RE.sub("", unicodedata.normalize("NFKD", original))
        if base and base != original:
            table[code] = base
    return table


_PRESENTATION_TABLE = _build_presentation_table()


def strip_nikkud(text):
    """Remove all Hebrew nikkud/cantillation marks (everything in U+0591..U+05C7)."""
    return NIKKUD_RE.sub("", text)


def collapse_hebrew_spaces(text):
    """Collapse spaces between Hebrew letter fragments (for badly-encoded PDFs).

    Steps:
        1. Strip nikkud and replace presentation forms with base letters. The
           mapping comes from Unicode decomposition, so final kaf/pe with
           dagesh stay FINAL letters and forms the old hand-written table
           lacked are covered too.
        2. Squeeze runs of spaces into one space.
        3. Delete every remaining space that has a non-final Hebrew letter
           before it and a Hebrew letter after it.

    A space therefore survives after a final-form letter (ם ן ף ך ץ), next to
    punctuation or any other non-Hebrew character, and nowhere else between
    Hebrew letters. Leading and trailing whitespace is removed.

    Args:
        text: One sentence or line as extracted from the PDF.

    Returns:
        The normalized text; ``""`` for empty input.
    """
    normalized = strip_nikkud(text).translate(_PRESENTATION_TABLE)
    normalized = re.sub(r" {2,}", " ", normalized)
    # The look-arounds are zero-width, so neighbouring gaps ("א ב ג") are all
    # removed in a single pass.
    return _INTRA_WORD_GAP_RE.sub("", normalized).strip()
|
||||
|
||||
|
||||
def extract_pdf_sentences(pdf_path, book_name):
    """Extract sentences from a PDF file.

    Each page's text is split into lines, and each line into sentences after
    ``.``, ``?`` or ``!`` followed by whitespace. A sentence is kept when it
    contains a Hebrew letter, does not look like copyright/ISBN boilerplate,
    and its space-collapsed form has 4-15 tokens containing a Hebrew letter.

    Args:
        pdf_path: Path of the PDF to open with ``fitz``.
        book_name: Stored in every result under ``"book"``.

    Returns:
        Dicts with ``"text"`` (the sentence as extracted), ``"book"`` and
        ``"stripped"`` (the ``collapse_hebrew_spaces`` form), in page order.
    """
    sentence_break = re.compile(r"(?<=[.?!])\s+")
    metadata_keywords = ("copyright", "©", "isbn", "printed in")
    sentences = []

    doc = fitz.open(pdf_path)
    try:
        for page_num in range(len(doc)):
            text = doc[page_num].get_text()
            if not text.strip():
                continue

            for line in text.split("\n"):
                line = line.strip()
                if not line:
                    continue

                for sent in sentence_break.split(line):
                    sent = sent.strip()
                    # Must contain Hebrew letters. This also rules out bare
                    # page numbers, so no separate digits-only test is needed.
                    if not sent or not HEBREW_RE.search(sent):
                        continue

                    # Drop copyright / imprint boilerplate.
                    lowered = sent.lower()
                    if any(kw in lowered for kw in metadata_keywords):
                        continue

                    # Stripped version: no nikkud, intra-word gaps collapsed.
                    stripped = collapse_hebrew_spaces(sent)
                    word_count = sum(1 for w in stripped.split() if HEBREW_RE.search(w))

                    # Filter: 4-15 Hebrew words
                    if word_count < 4 or word_count > 15:
                        continue

                    sentences.append(
                        {
                            "text": sent,
                            "book": book_name,
                            "stripped": stripped,
                        }
                    )
    finally:
        # Release the file even when text extraction raises part-way through.
        doc.close()

    return sentences
|
||||
|
||||
|
||||
def has_extractable_text(pdf_path):
    """Check if a PDF has extractable text.

    Only the first ten pages (or fewer, for a shorter document) are inspected.

    Args:
        pdf_path: Path of the PDF to open with ``fitz``.

    Returns:
        True as soon as one inspected page yields non-whitespace text, else
        False.
    """
    doc = fitz.open(pdf_path)
    try:
        return any(doc[i].get_text().strip() for i in range(min(len(doc), 10)))
    finally:
        # Close the document even if reading a page raises.
        doc.close()
|
||||
|
||||
|
||||
def load_sentence_index():
    """Read the sentence index JSON from ``SENTENCE_INDEX``.

    Returns:
        The parsed JSON content, or ``{"sentences": []}`` when the file does
        not exist.
    """
    if not os.path.exists(SENTENCE_INDEX):
        return {"sentences": []}
    with open(SENTENCE_INDEX, encoding="utf-8") as index_file:
        return json.load(index_file)
|
||||
|
||||
|
||||
def save_sentence_index(data):
    """Write ``data`` to ``SENTENCE_INDEX`` as UTF-8 JSON.

    The file is overwritten. Output is indented by two spaces and non-ASCII
    characters are written as-is rather than escaped.
    """
    with open(SENTENCE_INDEX, mode="w", encoding="utf-8") as index_file:
        json.dump(data, index_file, indent=2, ensure_ascii=False)
|
||||
|
||||
|
||||
def match_vocab_to_sentences(sentences, vocab_df):
    """Match vocabulary words to example sentences.

    For each vocabulary row the nikkud-free spelling is searched in every
    sentence's ``stripped`` text, first as a whitespace-delimited word and
    then (for words of three or more characters) as a substring. Up to three
    best-ranked sentences are kept per word.

    Args:
        sentences: Iterable of dicts with ``text`` and ``book`` keys and an
            optional ``stripped`` key (defaults to ``""``).
        vocab_df: DataFrame providing ``Word`` and ``Word Without Nikkud``
            columns; rows where either cell is missing or blank are ignored.

    Returns:
        Dict mapping the nikkud-free word to
        ``{"word_nikkud": ..., "sentences": [{"text": ..., "book": ...}, ...]}``.
    """
    # Books whose text came from PDFs: any substring hit is accepted for them.
    pdf_books = ("אליס בארץ הפלאות", "האריה שאהב תות")
    matches = {}

    # Build (word_no_nikkud, word_nikkud) pairs, skipping incomplete rows.
    vocab_words = []
    for _, row in vocab_df.iterrows():
        word_no_nik = _cell_text(row.get("Word Without Nikkud", ""))
        word_nik = _cell_text(row.get("Word", ""))
        if word_no_nik and word_nik:
            vocab_words.append((word_no_nik, word_nik))

    print(f"Matching {len(vocab_words)} vocab words against {len(sentences)} sentences...")

    # Precompute the stripped text and its word count once per sentence.
    sent_data = []
    for s in sentences:
        stripped = s.get("stripped", "")
        sent_data.append(
            {
                "text": s["text"],
                "book": s["book"],
                "stripped": stripped,
                "word_count": len(stripped.split()),
            }
        )

    matched_count = 0

    for word_no_nik, word_nik in vocab_words:
        if len(word_no_nik) < 2:
            continue

        pattern = re.compile(r"(?:^|\s)" + re.escape(word_no_nik) + r"(?:\s|$)")
        # Substring matching is limited to words >= 3 chars to avoid false positives.
        use_substring = len(word_no_nik) >= 3

        word_matches = []
        for sd in sent_data:
            stripped = sd["stripped"]
            if pattern.search(stripped):
                word_matches.append(sd)
            elif (
                use_substring
                and word_no_nik in stripped
                and (sd["book"] in pdf_books or _has_clear_boundary(stripped, word_no_nik))
            ):
                word_matches.append(sd)

        if not word_matches:
            continue

        matched_count += 1
        best = sorted(word_matches, key=_rank_sentence)[:3]
        matches[word_no_nik] = {
            "word_nikkud": word_nik,
            "sentences": [{"text": m["text"], "book": m["book"]} for m in best],
        }

    # Guard the percentage so an empty vocabulary does not divide by zero.
    percent = 100 * matched_count / len(vocab_words) if vocab_words else 0.0
    print(f"Words with at least 1 match: {matched_count}/{len(vocab_words)} ({percent:.1f}%)")
    return matches


def _cell_text(value):
    """Return a stripped string for a DataFrame cell, or "" for a missing one."""
    if pd.isna(value):
        return ""
    return str(value).strip()


def _rank_sentence(sd):
    """Sort key: 6-12 words first (shorter preferred), then <6, then >12."""
    wc = sd["word_count"]
    if 6 <= wc <= 12:
        return (0, wc)
    if wc < 6:
        return (1, -wc)
    return (2, wc)


def _has_clear_boundary(text, word):
    """Return True if some occurrence of *word* has a non-Hebrew edge on either side."""
    start = text.find(word)
    while start != -1:
        end = start + len(word)
        if start == 0 or not HEBREW_RE.match(text[start - 1]):
            return True
        if end >= len(text) or not HEBREW_RE.match(text[end]):
            return True
        start = text.find(word, start + 1)
    return False
|
||||
|
||||
|
||||
def main():
    """Extract PDF sentences, merge them into the index, and match vocabulary.

    Steps:
      1. Extract sentences from each configured PDF under ``EPUBS_DIR`` that
         exists and has extractable text.
      2. Merge them into the stored sentence index, de-duplicating on
         ``(stripped, book)``, and save the index.
      3. Match vocabulary from ``VOCAB_CSV`` against all indexed sentences and
         write the result to ``MATCHES_FILE``.
      4. Print summary statistics.
    """
    # ── Step 1: Extract from PDFs ──
    pdfs = [
        ("alice.pdf", "אליס בארץ הפלאות"),
        ("lion_strawberry.pdf", "האריה שאהב תות"),
    ]

    all_new_sentences = []

    for filename, book_name in pdfs:
        pdf_path = os.path.join(EPUBS_DIR, filename)
        if not os.path.exists(pdf_path):
            print(f"SKIP: {filename} not found")
            continue

        if not has_extractable_text(pdf_path):
            print(f"SKIP: {filename} has no extractable text (likely scanned images)")
            continue

        print(f"Extracting from {filename} ({book_name})...")
        sentences = extract_pdf_sentences(pdf_path, book_name)
        print(f" Extracted {len(sentences)} sentences")
        all_new_sentences.extend(sentences)

    # ── Step 2: Merge with existing index ──
    index = load_sentence_index()
    existing_count = len(index["sentences"])

    # Deduplicate by (stripped, book)
    existing_keys = {(s.get("stripped", ""), s.get("book", "")) for s in index["sentences"]}

    added = 0
    for s in all_new_sentences:
        key = (s["stripped"], s["book"])
        if key not in existing_keys:
            index["sentences"].append(s)
            existing_keys.add(key)
            added += 1

    save_sentence_index(index)
    total = len(index["sentences"])
    print(f"\nSentence index: {existing_count} existing + {added} new = {total} total")

    # ── Per-book stats ──
    book_counts = {}
    for s in index["sentences"]:
        book = s.get("book", "unknown")
        book_counts[book] = book_counts.get(book, 0) + 1

    print("\nSentences per book:")
    for book, count in sorted(book_counts.items(), key=lambda x: -x[1]):
        print(f" {book}: {count}")

    # ── Step 3: Match vocab words to sentences ──
    print(f"\nLoading vocab from {VOCAB_CSV}...")
    vocab_df = pd.read_csv(VOCAB_CSV, sep=";", index_col=0)
    print(f" {len(vocab_df)} vocab words loaded")

    matches = match_vocab_to_sentences(index["sentences"], vocab_df)

    with open(MATCHES_FILE, "w", encoding="utf-8") as f:
        json.dump(matches, f, ensure_ascii=False, indent=2)

    print(f"\nWrote {len(matches)} word matches to {MATCHES_FILE}")

    # ── Step 4: Summary stats ──
    total_words = len(vocab_df)
    matched_words = len(matches)
    # An empty vocabulary file must not crash the summary with a division by zero.
    matched_percent = 100 * matched_words / total_words if total_words else 0.0
    print(f"\n{'=' * 50}")
    print("SUMMARY")
    print(f"{'=' * 50}")
    print(f"Total sentences: {total}")
    for book, count in sorted(book_counts.items(), key=lambda x: -x[1]):
        print(f" {book}: {count}")
    print(f"Total vocab words: {total_words}")
    print(f"Words with sentences: {matched_words} ({matched_percent:.1f}%)")
    print(f"Words without sentences: {total_words - matched_words}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
File diff suppressed because it is too large
Load diff
|
|
@ -1,420 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Repair duplicate slugs in data/words.json.
|
||||
|
||||
Homographs (words with identical spelling but different meanings) were
|
||||
assigned the same slug by the scraper. This script fetches the pealim.com
|
||||
search page for each affected word, matches entries by meaning (and nikkud),
|
||||
and writes the corrected slugs back to words.json and the source CSV.
|
||||
|
||||
Usage:
|
||||
python3 scripts/repair_slugs.py [--dry-run]
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from difflib import SequenceMatcher
|
||||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Paths
|
||||
# ---------------------------------------------------------------------------
|
||||
PROJECT_ROOT = Path(__file__).resolve().parent.parent
|
||||
WORDS_JSON = PROJECT_ROOT / "data" / "words.json"
|
||||
CSV_PATH = PROJECT_ROOT / "data" / "hebrew_dict_for_anki.csv"
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# HTTP session
|
||||
# ---------------------------------------------------------------------------
|
||||
SESSION = requests.Session()
|
||||
SESSION.headers.update({"User-Agent": "Mozilla/5.0 (compatible; pealim-scraper/1.0)"})
|
||||
COOKIES: dict[str, str] = {"translit": "none", "hebstyle": "mo"}
|
||||
REQUEST_DELAY = 1.5 # seconds between requests
|
||||
REQUEST_TIMEOUT = 15 # seconds
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Logging
|
||||
# ---------------------------------------------------------------------------
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)s %(message)s",
|
||||
datefmt="%H:%M:%S",
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Similarity helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
FUZZY_THRESHOLD = 0.4
|
||||
|
||||
|
||||
def _similarity(a: str, b: str) -> float:
|
||||
"""Return SequenceMatcher ratio between two strings (both lowercased)."""
|
||||
return SequenceMatcher(None, a.lower(), b.lower()).ratio()
|
||||
|
||||
|
||||
def _best_match(
    our_meaning: str,
    candidates: list[dict],
    our_nikkud: str,
) -> tuple[dict | None, float]:
    """
    Pick the candidate whose ``meaning`` is most similar to *our_meaning*.

    A candidate whose ``word`` equals a non-empty *our_nikkud* gets a 0.05
    bonus (capped at 1.0). On equal scores the earliest candidate wins.
    Returns ``(None, -1.0)`` when *candidates* is empty.
    """

    def _score(cand: dict) -> float:
        base = _similarity(our_meaning, cand["meaning"])
        if our_nikkud and cand["word"] == our_nikkud:
            return min(1.0, base + 0.05)
        return base

    scored = [(_score(cand), cand) for cand in candidates]
    if not scored:
        return None, -1.0

    # max() keeps the first of several equal maxima, like a strict ">" scan.
    top_score, top_candidate = max(scored, key=lambda pair: pair[0])
    return top_candidate, top_score
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Search-page parser
|
||||
# ---------------------------------------------------------------------------
|
||||
def _parse_search_results(html: bytes) -> list[dict]:
    """
    Extract result entries from a pealim.com search page.

    Within every ``div.verb-search-result`` block the parser reads:
      - the first ``a[href]`` inside ``div.verb-search-data`` → slug
      - ``span.menukad`` inside ``div.verb-search-lemma`` → nikkud word
        (falling back to the whole lemma div's text)
      - ``div.verb-search-binyan`` → part of speech
      - ``div.verb-search-meaning`` → meaning text

    Blocks without a data div or without a ``/dict/<slug>/`` link are dropped.
    Returns a list of dicts with keys: slug, word, pos, meaning.
    """

    def _text_of(node) -> str:
        return node.get_text(strip=True) if node else ""

    parsed: list[dict] = []
    document = BeautifulSoup(html, "html.parser")

    for block in document.find_all("div", class_="verb-search-result"):
        data_div = block.find("div", class_="verb-search-data")
        if not data_div:
            continue

        # The slug comes from the detail-page link; no slug means no entry.
        anchor = data_div.find("a", href=True)
        slug_hit = re.search(r"/dict/([^/#]+)/", anchor["href"]) if anchor else None
        if not slug_hit or not slug_hit.group(1):
            continue

        lemma_div = block.find("div", class_="verb-search-lemma")
        menukad = lemma_div.find("span", class_="menukad") if lemma_div else None
        word = _text_of(menukad) if menukad else _text_of(lemma_div)

        pos_raw = _text_of(block.find("div", class_="verb-search-binyan"))
        pos = pos_raw.replace("Part of speech:", "").strip()

        meaning = _text_of(block.find("div", class_="verb-search-meaning"))

        parsed.append(
            {"slug": slug_hit.group(1), "word": word, "pos": pos, "meaning": meaning}
        )

    return parsed
|
||||
|
||||
|
||||
def _fetch_search_results(ktiv_male: str) -> list[dict]:
    """Fetch and parse the pealim.com search results for *ktiv_male*.

    The query is passed through ``params`` so that requests URL-encodes it,
    instead of splicing the raw string into the URL.

    Raises:
        requests.RequestException: on connection failure, timeout, or a
            non-success HTTP status (via ``raise_for_status``).
    """
    url = "https://www.pealim.com/search/"
    logger.debug("GET %s?q=%s", url, ktiv_male)
    resp = SESSION.get(
        url,
        params={"q": ktiv_male},
        cookies=COOKIES,
        timeout=REQUEST_TIMEOUT,
    )
    resp.raise_for_status()
    return _parse_search_results(resp.content)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Core logic
|
||||
# ---------------------------------------------------------------------------
|
||||
def find_duplicate_groups(data: dict) -> dict[str, list[str]]:
    """
    Group entry keys by slug and keep only slugs used by two or more entries.

    Entries with a missing or empty ``slug`` are ignored. Keys within a group
    keep the iteration order of *data*.
    """
    grouped: dict[str, list[str]] = {}
    for word_key, entry in data.items():
        slug = entry.get("slug", "")
        if not slug:
            continue
        grouped.setdefault(slug, []).append(word_key)

    return {slug: members for slug, members in grouped.items() if len(members) >= 2}
|
||||
|
||||
|
||||
def repair_group(
    slug: str,
    keys: list[str],
    data: dict,
    dry_run: bool,
) -> tuple[int, int]:
    """
    Try to give each entry in a shared-slug group its own correct slug.

    Search results are fetched once per distinct ``ktiv_male`` spelling in the
    group and merged (de-duplicated by slug). Candidates are restricted to
    those whose nikkud word occurs in the group, then each entry is matched
    by meaning via ``_best_match``. Entries scoring below ``FUZZY_THRESHOLD``
    are skipped; entries whose best slug equals the current one still count
    as fixed. With *dry_run* set, ``data`` is left untouched.

    Returns (fixed_count, skipped_count).
    """
    group_size = len(keys)

    # Distinct spellings in first-seen order (usually one, occasionally two).
    distinct_ktivs = list(dict.fromkeys(data[k]["word"]["ktiv_male"] for k in keys))
    several_spellings = len(distinct_ktivs) > 1

    logger.info(
        " Fetching search results for %s — %d entries share slug %s",
        data[keys[0]]["word"]["nikkud"],
        group_size,
        slug,
    )

    # Union of search results across every spelling, first hit per slug wins.
    merged: list[dict] = []
    known_slugs: set[str] = set()
    for ktiv in distinct_ktivs:
        try:
            page_results = _fetch_search_results(ktiv)
        except requests.RequestException as exc:
            logger.warning(" HTTP error for %s: %s", ktiv, exc)
            page_results = []
        for result in page_results:
            if result["slug"] in known_slugs:
                continue
            known_slugs.add(result["slug"])
            merged.append(result)
        if several_spellings:
            # Pause between sub-queries belonging to the same group.
            time.sleep(REQUEST_DELAY)

    if not merged:
        logger.warning(" No search results — skipping group")
        return 0, group_size

    # Drop candidates whose voweled form is not one of the group's own.
    group_nikkuds = {data[k]["word"]["nikkud"] for k in keys}
    in_group = [cand for cand in merged if cand["word"] in group_nikkuds]
    if not in_group:
        logger.warning(
            " Search results don't contain nikkud %s — candidates: %s — skipping",
            group_nikkuds,
            [cand["word"] for cand in merged],
        )
        return 0, group_size

    fixed = skipped = 0

    for key in keys:
        entry = data[key]
        our_meaning = entry.get("meaning", "")
        our_nikkud = entry["word"]["nikkud"]

        # Prefer candidates with this entry's exact nikkud; otherwise use all.
        exact = [cand for cand in in_group if cand["word"] == our_nikkud]
        best, score = _best_match(our_meaning, exact or in_group, our_nikkud)

        if best is None or score < FUZZY_THRESHOLD:
            logger.warning(
                " SKIP key=%s | meaning=%r | best_score=%.2f",
                key,
                our_meaning,
                score,
            )
            skipped += 1
            continue

        fixed += 1
        new_slug = best["slug"]
        old_slug = entry["slug"]

        if new_slug == old_slug:
            logger.info(" SAME key=%s | slug=%s (score=%.2f)", key, old_slug, score)
            continue

        logger.info(
            " FIX key=%s | %s → %s | matched=%r (score=%.2f)",
            key,
            old_slug,
            new_slug,
            best["meaning"],
            score,
        )
        if not dry_run:
            entry["slug"] = new_slug

    return fixed, skipped
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CSV update
|
||||
# ---------------------------------------------------------------------------
|
||||
def update_csv(data: dict, dry_run: bool) -> None:
    """
    Re-write the CSV so every row's slug column matches words.json.

    The CSV is semicolon-delimited; the slug column is named 'slug'.
    Rows are matched by 'Word Without Nikkud' (ktiv_male) AND 'Meaning'
    because homographs share the same ktiv_male.

    The file is read with every cell kept as a literal string and written
    back without the DataFrame's positional index, so that only slug cells
    change between the input and the output.

    Args:
        data: Parsed words.json mapping; each entry supplies
            ``word.ktiv_male``, ``meaning`` and ``slug``.
        dry_run: When true, log the pending changes but do not write.
    """
    # keep_default_na=False: cells such as "None"/"NA"/"null" stay text instead
    # of turning into NaN (which would be written back as an empty cell).
    df = pd.read_csv(CSV_PATH, sep=";", dtype=str, keep_default_na=False)

    if "slug" not in df.columns:
        logger.warning("CSV has no 'slug' column — skipping CSV update")
        return

    # Build a lookup: (ktiv_male, meaning) → new_slug from words.json
    lookup: dict[tuple[str, str], str] = {}
    for entry in data.values():
        ktiv = entry["word"].get("ktiv_male", "")
        meaning = entry.get("meaning", "")
        slug = entry.get("slug", "")
        if ktiv and slug:
            lookup[(ktiv, meaning)] = slug

    changes = 0
    for idx, row in df.iterrows():
        ktiv = str(row.get("Word Without Nikkud", "")).strip()
        meaning = str(row.get("Meaning", "")).strip()
        new_slug = lookup.get((ktiv, meaning))
        if new_slug is None:
            continue
        old_slug = str(row["slug"]).strip()
        if new_slug == old_slug:
            continue
        logger.info(
            " CSV row %d: %s → %s (%s)",
            idx,
            old_slug,
            new_slug,
            ktiv,
        )
        if not dry_run:
            df.at[idx, "slug"] = new_slug
        changes += 1

    logger.info("CSV: %d slug(s) to update", changes)
    if not dry_run and changes:
        # The frame was read without index_col, so its index is a synthetic
        # RangeIndex; writing it (index=True) would prepend a spurious column
        # on every run. Blank header cells, which pandas names "Unnamed: N"
        # on read, are written back blank.
        header = ["" if str(col).startswith("Unnamed: ") else col for col in df.columns]
        df.to_csv(CSV_PATH, sep=";", index=False, header=header)
        logger.info("CSV written to %s", CSV_PATH)
    elif dry_run:
        logger.info("DRY-RUN: CSV not written")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: repair duplicate slugs and sync the CSV.

    Loads words.json, repairs every duplicate-slug group (pausing between
    groups), writes words.json unless ``--dry-run`` is given, then updates
    the CSV. Returns 0 when nothing was skipped, otherwise 1.
    """
    cli = argparse.ArgumentParser(description="Repair duplicate slugs in data/words.json")
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Preview changes without writing any files",
    )
    cli.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Enable debug logging",
    )
    options = cli.parse_args(argv)
    dry_run = options.dry_run

    if options.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    if dry_run:
        logger.info("=== DRY-RUN mode — no files will be modified ===")

    # Load data
    logger.info("Loading %s", WORDS_JSON)
    with WORDS_JSON.open(encoding="utf-8") as fh:
        data: dict = json.load(fh)
    logger.info("Loaded %d entries", len(data))

    # Identify duplicate groups
    groups = find_duplicate_groups(data)
    total_groups = len(groups)
    total_entries = sum(len(members) for members in groups.values())
    logger.info(
        "Found %d duplicate-slug groups covering %d entries",
        total_groups,
        total_entries,
    )

    # Process each group in slug order
    total_fixed = 0
    total_skipped = 0
    for position, (slug, keys) in enumerate(sorted(groups.items()), 1):
        logger.info(
            "[%d/%d] slug=%s (%d entries)",
            position,
            total_groups,
            slug,
            len(keys),
        )
        fixed, skipped = repair_group(slug, keys, data, dry_run=dry_run)
        total_fixed += fixed
        total_skipped += skipped

        # Respectful delay between groups; none needed after the last one.
        is_last_group = position >= total_groups
        if not is_last_group:
            time.sleep(REQUEST_DELAY)

    logger.info(
        "Summary: %d fixed, %d skipped (out of %d entries in %d groups)",
        total_fixed,
        total_skipped,
        total_entries,
        total_groups,
    )

    # Write updated words.json
    if dry_run:
        logger.info("DRY-RUN: words.json not written")
    else:
        logger.info("Writing %s", WORDS_JSON)
        with WORDS_JSON.open("w", encoding="utf-8") as fh:
            json.dump(data, fh, ensure_ascii=False, indent=2)
        logger.info("words.json written")

    # Update CSV
    logger.info("Updating CSV %s", CSV_PATH)
    update_csv(data, dry_run=dry_run)

    return 1 if total_skipped else 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
|
|
@ -1,237 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Scrape ktiv male (plene/vowelless) forms from pealim.com.
|
||||
|
||||
Uses hebstyle=vl cookie to get vowelless writing with matres lectionis.
|
||||
Builds a lookup: ktiv_male_form → [{word_nikkud, form_type, pos, slug}]
|
||||
|
||||
This enables matching Hebrew text (which is normally in ktiv male)
|
||||
against our vocabulary, including conjugated verbs and noun plurals.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DATA_DIR = Path(__file__).resolve().parent.parent / "data"
|
||||
OUTPUT_PATH = DATA_DIR / "ktiv_male_forms.json"
|
||||
COOKIES = {"translit": "none", "hebstyle": "vl"}
|
||||
REQUEST_TIMEOUT = 15
|
||||
DELAY = 1.5 # seconds between requests
|
||||
|
||||
|
||||
def fetch_verb_ktiv_male(slug: str, infinitive_nikkud: str) -> list[dict]:
    """Fetch the forms listed on a verb's pealim.com page.

    The page is requested with the module's ``COOKIES``. Every non-empty
    ``span.menukad`` in the ``div.lead`` header is recorded as an
    ``infinitive`` form; every ``span.menukad`` in the conjugation table whose
    text has not been recorded yet is added as a ``conjugation`` form.

    Args:
        slug: The ``/dict/<slug>/`` identifier of the verb page.
        infinitive_nikkud: Value stored as ``word_nikkud`` on every form.

    Returns:
        List of dicts with keys ``ktiv_male``, ``word_nikkud``, ``form_type``,
        ``pos`` and ``slug``; empty when the page has no conjugation table.

    Raises:
        requests.RequestException: on network failure or non-success status.
    """
    url = f"https://www.pealim.com/dict/{slug}/"
    resp = requests.get(url, cookies=COOKIES, timeout=REQUEST_TIMEOUT)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")

    table = soup.find("table", class_="conjugation-table")
    if not table:
        return []

    def _form(ktiv: str, form_type: str) -> dict:
        return {
            "ktiv_male": ktiv,
            "word_nikkud": infinitive_nikkud,
            "form_type": form_type,
            "pos": "Verb",
            "slug": slug,
        }

    forms: list[dict] = []
    # Spellings recorded so far; kept as one running set instead of being
    # rebuilt from `forms` for every table cell.
    seen: set[str] = set()

    # Also get the infinitive from the page
    lead = soup.find("div", class_="lead")
    if lead:
        for span in lead.find_all("span", class_="menukad"):
            ktiv = span.text.strip()
            if ktiv:
                forms.append(_form(ktiv, "infinitive"))
                seen.add(ktiv)

    for row in table.find_all("tr"):
        for span in row.find_all("span", class_="menukad"):
            ktiv = span.text.strip()
            if ktiv and ktiv not in seen:
                seen.add(ktiv)
                forms.append(_form(ktiv, "conjugation"))

    return forms
|
||||
|
||||
|
||||
def fetch_noun_ktiv_male(slug: str, singular_nikkud: str, gender: str) -> list[dict]:
    """Collect the declension forms listed on a noun's pealim.com page.

    Non-empty ``span.menukad`` texts from the conjugation table are labelled
    by position: absolute singular, absolute plural, construct singular,
    construct plural, and ``other`` for anything after the fourth.
    Returns an empty list when the page has no conjugation table.
    """
    page = requests.get(
        f"https://www.pealim.com/dict/{slug}/",
        cookies=COOKIES,
        timeout=REQUEST_TIMEOUT,
    )
    page.raise_for_status()
    soup = BeautifulSoup(page.text, "html.parser")

    table = soup.find("table", class_="conjugation-table")
    if not table:
        return []

    form_labels = ["absolute_singular", "absolute_plural", "construct_singular", "construct_plural"]

    # Flatten the table into its non-empty form texts, in document order.
    cell_texts = (
        span.text.strip()
        for row in table.find_all("tr")
        for span in row.find_all("span", class_="menukad")
    )
    spellings = [text for text in cell_texts if text]

    collected = []
    for position, ktiv in enumerate(spellings):
        if position < len(form_labels):
            form_type = form_labels[position]
        else:
            form_type = "other"
        collected.append(
            {
                "ktiv_male": ktiv,
                "word_nikkud": singular_nikkud,
                "form_type": form_type,
                "pos": "Noun",
                "slug": slug,
                "gender": gender,
            }
        )

    return collected
|
||||
|
||||
|
||||
def scrape_verbs() -> list[dict]:
    """Scrape ktiv male forms for all verbs in conjugations.json.

    Each distinct slug is fetched once; entries that are empty or have no
    slug are skipped. A failure on one verb is logged and the loop continues
    (deliberate best-effort). ``DELAY`` seconds pass after every fetch.

    Returns:
        All forms gathered across verbs, or ``[]`` when conjugations.json
        does not exist.
    """
    conj_path = DATA_DIR / "conjugations.json"
    if not conj_path.exists():
        logger.warning("No conjugations.json found")
        return []

    # Explicit UTF-8: the file holds Hebrew text and the platform default
    # encoding is not guaranteed to be UTF-8.
    with open(conj_path, encoding="utf-8") as f:
        conjugations = json.load(f)

    all_forms = []
    slugs_done = set()

    for verb, data in conjugations.items():
        if not data or not data.get("slug"):
            continue
        slug = data["slug"]
        if slug in slugs_done:
            continue
        slugs_done.add(slug)

        try:
            forms = fetch_verb_ktiv_male(slug, verb)
            all_forms.extend(forms)
            logger.info(f" Verb {verb} ({slug}): {len(forms)} forms")
        except Exception as e:
            logger.warning(f" Verb {verb} ({slug}) failed: {e}")

        time.sleep(DELAY)

    return all_forms
|
||||
|
||||
|
||||
def scrape_nouns() -> list[dict]:
    """Scrape ktiv male forms for all nouns in noun_slug_map.json.

    The nikkud singular comes from noun_plurals.json when the word is listed
    there, otherwise from the slug map's ``word_nikkud`` (falling back to the
    map key). A ``.partial`` snapshot is saved whenever the processed count
    reaches a multiple of 50 after a successful fetch. A failure on one noun
    is logged and the loop continues (deliberate best-effort).

    Returns:
        All forms gathered across nouns, or ``[]`` when noun_slug_map.json
        does not exist.
    """
    slug_path = DATA_DIR / "noun_slug_map.json"
    if not slug_path.exists():
        logger.warning("No noun_slug_map.json found")
        return []

    # Explicit UTF-8: both files hold Hebrew text and the platform default
    # encoding is not guaranteed to be UTF-8.
    with open(slug_path, encoding="utf-8") as f:
        slug_map = json.load(f)

    # Also load existing plurals to get nikkud singular form
    plurals_path = DATA_DIR / "noun_plurals.json"
    plurals = {}
    if plurals_path.exists():
        with open(plurals_path, encoding="utf-8") as f:
            plurals = json.load(f)

    all_forms = []
    done = 0
    total = len(slug_map)

    for word, info in slug_map.items():
        slug = info.get("slug", "")
        if not slug:
            continue

        # Get nikkud form from plurals data or slug map
        nikkud = info.get("word_nikkud", word)
        if word in plurals:
            nikkud = plurals[word].get("singular", nikkud)
        gender = info.get("gender", "")

        try:
            forms = fetch_noun_ktiv_male(slug, nikkud, gender)
            all_forms.extend(forms)
            done += 1
            if done % 50 == 0:
                logger.info(f" Nouns: {done}/{total} ({len(all_forms)} forms)")
                # Save incrementally
                _save_forms(all_forms, partial=True)
        except Exception as e:
            logger.warning(f" Noun {word} ({slug}) failed: {e}")
            done += 1

        time.sleep(DELAY)

    return all_forms
|
||||
|
||||
|
||||
def _save_forms(all_forms: list[dict], partial: bool = False):
    """Build the ktiv male lookup dict and write it as JSON.

    Args:
        all_forms: Form dicts, each with a ``ktiv_male`` key; that key becomes
            the lookup key and is dropped from the stored entry.
        partial: When true, write to ``<OUTPUT_PATH name>.partial`` next to
            the final output instead of to ``OUTPUT_PATH`` itself.
    """
    lookup: dict[str, list[dict]] = {}
    for entry in all_forms:
        ktiv = entry["ktiv_male"]
        # Don't include ktiv_male in the stored entry (it's the key)
        stored = {k: v for k, v in entry.items() if k != "ktiv_male"}
        lookup.setdefault(ktiv, []).append(stored)

    suffix = ".partial" if partial else ""
    out = OUTPUT_PATH.parent / (OUTPUT_PATH.name + suffix)
    # ensure_ascii=False emits raw Hebrew, so the file must be opened as
    # UTF-8 explicitly rather than with the platform default encoding.
    with open(out, "w", encoding="utf-8") as f:
        json.dump(lookup, f, ensure_ascii=False, indent=1)

    logger.info(f" Saved {len(lookup)} unique ktiv male forms → {out}")
|
||||
|
||||
|
||||
def main():
    """Scrape ktiv male forms and save the lookup.

    The optional first command-line argument selects what to scrape:
    ``all`` (default), ``verbs`` or ``nouns``. Any other value exits with
    status 2 before anything is scraped or written.
    """
    mode = sys.argv[1] if len(sys.argv) > 1 else "all"

    # Reject unknown modes up front: otherwise neither branch below runs and
    # an empty lookup would be saved over the existing output file.
    if mode not in ("all", "verbs", "nouns"):
        logger.error("Unknown mode %r (expected 'all', 'verbs' or 'nouns')", mode)
        sys.exit(2)

    all_forms = []

    if mode in ("all", "verbs"):
        logger.info("=== Scraping verb ktiv male forms ===")
        verb_forms = scrape_verbs()
        all_forms.extend(verb_forms)
        logger.info(f"Verbs done: {len(verb_forms)} forms from {len({f['slug'] for f in verb_forms})} verbs")

    if mode in ("all", "nouns"):
        logger.info("=== Scraping noun ktiv male forms ===")
        noun_forms = scrape_nouns()
        all_forms.extend(noun_forms)
        logger.info(f"Nouns done: {len(noun_forms)} forms")

    _save_forms(all_forms)
    logger.info(f"Total: {len(all_forms)} forms → {OUTPUT_PATH}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -1,365 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Scrape pealim.com for noun plural and construct forms.
|
||||
|
||||
Step 1: Collect noun slugs from list pages (/dict/?pos=noun&page=N)
|
||||
Step 2: Fetch detail pages for plural + construct forms
|
||||
Step 3: Print summary statistics
|
||||
"""
|
||||
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
BASE_URL = "https://www.pealim.com"
|
||||
COOKIES = {"translit": "none", "hebstyle": "mo"}
|
||||
HEADERS = {"User-Agent": "Mozilla/5.0 (compatible; PealimScraper/1.0)"}
|
||||
DATA_DIR = Path(__file__).resolve().parent.parent / "data"
|
||||
SLUG_MAP_FILE = DATA_DIR / "noun_slug_map.json"
|
||||
PROGRESS_FILE = DATA_DIR / "noun_slug_map_progress.json"
|
||||
PLURALS_FILE = DATA_DIR / "noun_plurals.json"
|
||||
DELAY = 1.5 # seconds between requests
|
||||
|
||||
|
||||
def load_json(path, default=None):
    """Load JSON from *path*, or return a fallback when the file is absent.

    Args:
        path: ``pathlib.Path`` of the JSON file.
        default: Value returned when *path* does not exist; ``None`` means
            "return a new empty dict".

    Returns:
        The parsed JSON content, *default*, or ``{}``.
    """
    if path.exists():
        # Explicit UTF-8 so Hebrew content loads regardless of locale.
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    return default if default is not None else {}
|
||||
|
||||
|
||||
def save_json(path, data):
    """Write *data* to *path* as indented UTF-8 JSON with raw non-ASCII text."""
    # ensure_ascii=False emits raw Hebrew, so the encoding must be explicit
    # rather than left to the platform default.
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
|
||||
|
||||
|
||||
def fetch_with_retry(url, max_retries=5):
    """Fetch *url* with exponential backoff between failed attempts.

    The wait after a failed attempt is ``min(2**attempt * 2, 60)`` seconds.
    No wait follows the final attempt, since nothing is retried after it.

    Args:
        url: Address to request with the module's ``COOKIES`` and ``HEADERS``.
        max_retries: Maximum number of attempts.

    Returns:
        The successful ``requests.Response``, or ``None`` when every attempt
        failed.
    """
    for attempt in range(max_retries):
        try:
            r = requests.get(url, cookies=COOKIES, headers=HEADERS, timeout=30)
            r.raise_for_status()
            return r
        except (requests.RequestException, ConnectionError) as e:
            if attempt + 1 >= max_retries:
                # Last attempt: report it and give up without sleeping.
                print(f" Attempt {attempt + 1}/{max_retries} for {url} failed: {e}")
                break
            wait = min(2**attempt * 2, 60)
            print(f" Retry {attempt + 1}/{max_retries} for {url}: {e} (waiting {wait}s)")
            time.sleep(wait)
    print(f" FAILED after {max_retries} retries: {url}")
    return None
|
||||
|
||||
|
||||
def get_total_pages():
    """Return the highest page number linked from the first noun list page.

    Returns 0 when the page could not be fetched and 1 when no pagination
    link carries a ``page=<n>`` parameter.
    """
    response = fetch_with_retry(f"{BASE_URL}/dict/?pos=noun&page=1")
    if not response:
        return 0

    document = BeautifulSoup(response.text, "lxml")
    hrefs = (anchor.get("href", "") for anchor in document.select("ul.pagination li a"))
    hits = (re.search(r"page=(\d+)", href) for href in hrefs)
    page_numbers = {int(hit.group(1)) for hit in hits if hit}
    return max(page_numbers, default=1)
|
||||
|
||||
|
||||
def parse_list_page(html):
    """Extract one dict per noun row from a dictionary list page.

    Rows are read from ``table.dict-table`` (first row skipped as header).
    A row is ignored if it has fewer than three cells, no link in its first
    cell, or a link that does not look like ``/dict/<slug>/``.

    Returns:
        A list of dicts with keys ``word_plain``, ``slug``, ``word_nikkud``,
        ``pos``, ``gender`` and ``mishkal``; empty if the table is missing.
    """
    table = BeautifulSoup(html, "lxml").select_one("table.dict-table")
    if not table:
        return []

    nouns = []
    for tr in table.select("tr")[1:]:  # skip header
        cells = tr.select("td")
        if len(cells) < 3:
            continue

        # The first cell carries the word itself and the link to its page.
        word_cell = cells[0]
        link = word_cell.select_one("a")
        if not link:
            continue
        slug_hit = re.search(r"/dict/([^/]+)/", link.get("href", ""))
        if not slug_hit:
            continue

        vocalized = word_cell.select_one("span.menukad")
        if vocalized:
            word_nikkud = vocalized.get_text(strip=True)
        else:
            word_nikkud = ""

        # The third cell is the part-of-speech description; gender and the
        # mishkal pattern name are both read out of its lowercase text.
        pos_text = cells[2].get_text(strip=True)
        pos_lower = pos_text.lower()

        if "masculine" in pos_lower:
            gender = "masculine"
        elif "feminine" in pos_lower:
            gender = "feminine"
        else:
            gender = ""

        pattern_hit = re.search(r"(\w+)\s*pattern", pos_lower)

        nouns.append(
            {
                # Same word with the U+0591..U+05C7 marks removed.
                "word_plain": re.sub(r"[\u0591-\u05C7]", "", word_nikkud),
                "slug": slug_hit.group(1),
                "word_nikkud": word_nikkud,
                "pos": pos_text,
                "gender": gender,
                "mishkal": pattern_hit.group(1) if pattern_hit else "",
            }
        )

    return nouns
|
||||
|
||||
|
||||
def step1_collect_slugs():
    """Step 1: Collect noun slugs from list pages.

    Resumes from SLUG_MAP_FILE and PROGRESS_FILE, fetches every list page not
    yet recorded as completed, and stores one entry per noun keyed by its
    unvocalized spelling. Progress is written every 10 pages and at the end.

    Returns:
        The slug map: ``{word_plain: {"slug", "word_nikkud", "pos",
        "gender", "mishkal"}}``.
    """
    print("=" * 60)
    print("STEP 1: Collecting noun slugs from list pages")
    print("=" * 60)

    slug_map = load_json(SLUG_MAP_FILE, {})
    progress = load_json(PROGRESS_FILE, [])
    completed_pages = set(progress) if isinstance(progress, list) else set()

    # Get total pages
    total_pages = get_total_pages()
    if total_pages == 0:
        # get_total_pages() returns 0 only when the first list page could not
        # be fetched. Without this guard the empty "remaining" list below
        # would be reported as "All pages already scraped!".
        print("Could not fetch the noun list; keeping the existing slug map unchanged")
        return slug_map

    print(f"Total pages: {total_pages}")
    print(f"Already completed: {len(completed_pages)} pages, {len(slug_map)} nouns")

    remaining = [p for p in range(1, total_pages + 1) if p not in completed_pages]
    print(f"Remaining pages: {len(remaining)}")

    if not remaining:
        print("All pages already scraped!")
        return slug_map

    copied_fields = ("slug", "word_nikkud", "pos", "gender", "mishkal")
    for i, page_num in enumerate(remaining):
        r = fetch_with_retry(f"{BASE_URL}/dict/?pos=noun&page={page_num}")
        if not r:
            # Not marked completed, so a later run will try this page again.
            print(f" Skipping page {page_num}")
            continue

        entries = parse_list_page(r.text)
        for entry in entries:
            slug_map[entry["word_plain"]] = {key: entry[key] for key in copied_fields}

        completed_pages.add(page_num)
        done = len(completed_pages)
        print(f" Page {page_num} ({done}/{total_pages}): {len(entries)} nouns (total: {len(slug_map)})")

        # Save progress every 10 pages
        if (i + 1) % 10 == 0 or page_num == remaining[-1]:
            save_json(SLUG_MAP_FILE, slug_map)
            save_json(PROGRESS_FILE, sorted(completed_pages))
            print(f" [Saved progress: {len(slug_map)} nouns, {done} pages]")

        time.sleep(DELAY)

    # Final save (also covers the case where the last page was skipped and
    # the in-loop save did not run for it).
    save_json(SLUG_MAP_FILE, slug_map)
    save_json(PROGRESS_FILE, sorted(completed_pages))
    print(f"\nStep 1 complete: {len(slug_map)} total nouns from {len(completed_pages)} pages")
    return slug_map
|
||||
|
||||
|
||||
def parse_detail_page(html, slug, gender, mishkal):
    """Read singular/plural and construct forms from a noun detail page.

    Only the first ``table.conjugation-table`` is inspected. In each row with
    a header cell, the first data cell is treated as the singular and the
    second as the plural; rows whose header contains "absolute" also supply
    audio URLs.

    Args:
        html: Page markup.
        slug, gender, mishkal: Copied unchanged into the result.

    Returns:
        A dict of forms (empty strings where nothing was found), or ``None``
        when the page has no conjugation table.
    """
    tables = BeautifulSoup(html, "lxml").select("table.conjugation-table")
    if not tables:
        return None

    def form_text(cell):
        # Text of the cell's span.menukad, or "" if the cell has none.
        span = cell.select_one("span.menukad")
        return span.get_text(strip=True) if span else ""

    def audio_url(cell):
        # Prefer a descendant carrying data-audio; fall back to the cell's own.
        holder = cell.select_one("[data-audio]")
        if holder:
            return holder.get("data-audio", "")
        return cell.get("data-audio", "")

    result = {
        "slug": slug,
        "singular": "",
        "singular_audio": "",
        "plural": "",
        "plural_audio": "",
        "construct_singular": "",
        "construct_plural": "",
        "gender": gender,
        "mishkal": mishkal,
    }

    for tr in tables[0].select("tr"):
        header = tr.select_one("th")
        if not header:
            continue
        label = header.get_text(strip=True).lower()
        # zip() stops at the shorter side, so a row with a single cell only
        # fills in the singular key.
        cells = tr.select("td")

        if "absolute" in label:
            for key, cell in zip(("singular", "plural"), cells):
                result[key] = form_text(cell)
                result[key + "_audio"] = audio_url(cell)
        elif "construct" in label:
            for key, cell in zip(("construct_singular", "construct_plural"), cells):
                result[key] = form_text(cell)

    return result
|
||||
|
||||
|
||||
def step2_fetch_plurals(slug_map):
    """Step 2: Fetch detail pages for plural + construct forms.

    Nouns already present in PLURALS_FILE are not fetched again. Each
    remaining noun's detail page is parsed; pages without a declension table
    get a minimal entry flagged ``no_declension_table``. Results are saved
    every 50 nouns and once more at the end.

    Args:
        slug_map: Mapping produced by step 1 (word -> info with a "slug").

    Returns:
        The full plurals mapping, including previously stored entries.
    """
    print("\n" + "=" * 60)
    print("STEP 2: Fetching plural + construct forms from detail pages")
    print("=" * 60)

    plurals = load_json(PLURALS_FILE, {})
    # Snapshot of what was stored before this run; the count feeds the
    # progress figures below.
    known_words = set(plurals)
    known_count = len(known_words)

    # Build work list: nouns not yet in plurals
    work = [(word, info) for word, info in slug_map.items() if word not in known_words]
    pending_count = len(work)

    print(f"Already have plural data: {known_count}")
    print(f"Remaining to fetch: {pending_count}")

    if not work:
        print("All nouns already have plural data!")
        return plurals

    grand_total = known_count + pending_count
    failures = 0
    for position, (word, info) in enumerate(work, start=1):
        slug = info["slug"]
        response = fetch_with_retry(f"{BASE_URL}/dict/{slug}/")
        if not response:
            print(f" Skipping {word} ({slug})")
            failures += 1
            continue

        gender = info.get("gender", "")
        mishkal = info.get("mishkal", "")
        entry = parse_detail_page(response.text, slug, gender, mishkal)
        if entry:
            plurals[word] = entry
        else:
            # No declension table - store minimal entry
            plurals[word] = {
                "slug": slug,
                "singular": info.get("word_nikkud", ""),
                "singular_audio": "",
                "plural": "",
                "plural_audio": "",
                "construct_singular": "",
                "construct_plural": "",
                "gender": gender,
                "mishkal": mishkal,
                "no_declension_table": True,
            }

        # Report the first noun and then every 50th.
        if position % 50 == 0 or position == 1:
            done = known_count + position - failures
            shown_plural = entry["plural"] if entry else "N/A"
            print(
                f" [{position}/{pending_count}] {word} ({slug}): "
                f"plural={shown_plural} "
                f"(total: {done}/{grand_total})"
            )

        # Save every 50 entries
        if position % 50 == 0 or position == pending_count:
            save_json(PLURALS_FILE, plurals)
            print(f" [Saved: {len(plurals)} entries]")

        time.sleep(DELAY)

    save_json(PLURALS_FILE, plurals)
    print(f"\nStep 2 complete: {len(plurals)} total noun entries with plural data")
    return plurals
|
||||
|
||||
|
||||
def step3_summary(slug_map, plurals):
    """Step 3: Print summary statistics.

    Args:
        slug_map: Mapping of all collected nouns (only its size is used).
        plurals: Mapping of noun -> entry dict as built by step 2.
    """
    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)

    records = list(plurals.values())

    def tally(*keys):
        # Number of records in which at least one of *keys* holds a truthy value.
        return sum(1 for rec in records if any(rec.get(key) for key in keys))

    has_plural = tally("plural")
    has_construct = tally("construct_singular", "construct_plural")
    has_audio = tally("singular_audio", "plural_audio")
    no_table = tally("no_declension_table")

    # Irregular plurals: masculine with ות- ending, feminine with ים- ending
    irregular = 0
    for rec in records:
        plural = rec.get("plural", "")
        gender = rec.get("gender", "")
        if not plural or not gender:
            continue
        if gender == "masculine":
            unexpected_ending = "ות"
        elif gender == "feminine":
            unexpected_ending = "ים"
        else:
            continue
        # Compare on the spelling with the U+0591..U+05C7 marks removed.
        if re.sub(r"[\u0591-\u05C7]", "", plural).endswith(unexpected_ending):
            irregular += 1

    print(f"Total nouns in slug map: {len(slug_map)}")
    print(f"Total nouns with plural data: {len(plurals)}")
    print(f" - With plural form: {has_plural}")
    print(f" - With construct forms: {has_construct}")
    print(f" - With audio URLs: {has_audio}")
    print(f" - No declension table: {no_table}")
    print(f" - Irregular plurals: {irregular}")
|
||||
|
||||
|
||||
def main():
    """Run the scraper end to end: collect slugs, fetch plurals, summarize."""
    print("Pealim Noun Plural Scraper")
    print(f"Data directory: {DATA_DIR}")
    print()

    nouns = step1_collect_slugs()
    step3_summary(nouns, step2_fetch_plurals(nouns))


if __name__ == "__main__":
    main()
|
||||
|
|
@ -1,250 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Scrape ktiv male (vowelless plene) conjugation forms for top 500 verbs from pealim.com."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
|
||||
sys.stdout.reconfigure(line_buffering=True)
|
||||
import requests # noqa: E402
|
||||
from bs4 import BeautifulSoup # noqa: E402
|
||||
|
||||
# Data directory: <parent of this script's directory>/data.
# abspath() matters here: when the script is started from its own directory
# on Python < 3.9, __file__ is a bare relative filename, both dirname() calls
# collapse to "", and DATA_DIR would wrongly become "./data".
DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data")
INPUT_FILE = os.path.join(DATA_DIR, "top_verbs_to_scrape.json")
OUTPUT_FILE = os.path.join(DATA_DIR, "ktiv_male_forms.json")
# Intermediate merged result, written while scraping is in progress.
PARTIAL_FILE = os.path.join(DATA_DIR, "ktiv_male_forms_partial.json")
# Resume state: finished verbs plus the verb -> slug cache.
PROGRESS_FILE = os.path.join(DATA_DIR, "ktiv_scrape_progress.json")

# NOTE(review): "hebstyle": "vl" presumably selects the vowelless (ktiv male)
# spelling on the site -- confirm.
COOKIES = {"translit": "none", "hebstyle": "vl"}
HEADERS = {"User-Agent": "Mozilla/5.0 (compatible; PealimScraper/1.0)"}
DELAY = 1.5  # seconds to sleep after each request
|
||||
|
||||
# One shared HTTP session so every request carries the same headers and
# cookies configured above.
session = requests.Session()
session.headers.update(HEADERS)
session.cookies.update(COOKIES)
|
||||
|
||||
|
||||
def load_json(path):
    """Return the JSON content of *path* (read as UTF-8), or ``{}`` if it does not exist."""
    if not os.path.exists(path):
        return {}
    with open(path, encoding="utf-8") as source:
        return json.load(source)
|
||||
|
||||
|
||||
def save_json(data, path):
    """Write *data* to *path* as UTF-8 JSON with 1-space indent, keeping non-ASCII text unescaped."""
    with open(path, mode="w", encoding="utf-8") as sink:
        json.dump(data, sink, indent=1, ensure_ascii=False)
|
||||
|
||||
|
||||
def search_slug(wni):
    """Search pealim for *wni* and return the slug of the first dictionary link.

    The result page's links are scanned in document order; the first href
    starting with ``/dict/<digits>-<name>/`` wins.

    Returns:
        The slug string, or ``None`` when no link matches.

    Raises:
        requests.HTTPError: If the search request returns an error status.
    """
    resp = session.get("https://www.pealim.com/search/", params={"q": wni}, timeout=15)
    resp.raise_for_status()

    # Look for result links like /dict/SLUG/
    links = BeautifulSoup(resp.text, "html.parser").select("a[href]")
    hits = (re.match(r"/dict/(\d+-[^/]+)/", link["href"]) for link in links)
    return next((hit.group(1) for hit in hits if hit), None)
|
||||
|
||||
|
||||
def scrape_verb_forms(slug):
    """Fetch a verb's detail page and collect every form shown on it.

    Args:
        slug: The verb's pealim dictionary slug (the ``/dict/<slug>/`` part).

    Returns:
        A ``(forms, word_nikkud)`` tuple. ``forms`` is the set of distinct
        non-empty texts of all ``span.menukad`` elements on the page.
        ``word_nikkud`` is the text of the ``span.menukad`` inside the page's
        ``<h1>``, or ``None`` if there is none.

    Raises:
        requests.HTTPError: If the page request returns an error status.
    """
    resp = session.get(f"https://www.pealim.com/dict/{slug}/", timeout=15)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")

    # Headline form, if the page title carries one.
    # NOTE(review): the name suggests a vocalized form, but the text comes
    # back in whatever spelling the session's hebstyle cookie selects -- confirm.
    word_nikkud = None
    title = soup.select_one("h1")
    if title:
        menukad_in_title = title.select_one("span.menukad")
        if menukad_in_title:
            word_nikkud = menukad_in_title.get_text(strip=True)

    # One pass over the whole document is enough. The spans under div.lead
    # are part of this selection too, so a separate div.lead pass would only
    # add the same texts to the set again.
    texts = (span.get_text(strip=True) for span in soup.select("span.menukad"))
    forms = {text for text in texts if text}

    return forms, word_nikkud
|
||||
|
||||
|
||||
def _merge_forms(existing_forms, new_forms):
    """Return a new mapping that combines *existing_forms* with *new_forms*.

    For each form key, entries from *new_forms* are appended unless an entry
    with the same slug was already stored for that key in *existing_forms*.
    The entry lists are copied, so neither input mapping is modified.
    """
    merged = {form: list(entries) for form, entries in existing_forms.items()}
    for form, entries in new_forms.items():
        bucket = merged.setdefault(form, [])
        known_slugs = {e["slug"] for e in bucket}
        bucket.extend(entry for entry in entries if entry["slug"] not in known_slugs)
    return merged


def main():
    """Scrape forms for every verb in INPUT_FILE and merge them into OUTPUT_FILE.

    The run is resumable: PROGRESS_FILE records the verbs already handled and
    a verb -> slug cache (also seeded from ``conjugations.json`` when that
    file exists). Every 50 verbs the progress and a merged snapshot
    (PARTIAL_FILE) are written; at the end the merged result replaces
    OUTPUT_FILE, the snapshot is removed and a summary is printed.

    Exits with status 1 when the input file is missing or empty.
    """
    verbs = load_json(INPUT_FILE)
    if not verbs:
        print("ERROR: No verbs found in input file")
        sys.exit(1)

    # Load existing forms
    existing_forms = load_json(OUTPUT_FILE)
    new_forms = {}  # Will be merged into existing at the end

    # Load progress to resume
    progress = load_json(PROGRESS_FILE)
    done_wnis = set(progress.get("done_wnis", []))
    slug_cache = progress.get("slug_cache", {})

    # Pre-populate slug cache from conjugations.json
    conj_file = os.path.join(DATA_DIR, "conjugations.json")
    if os.path.exists(conj_file):
        conj_data = load_json(conj_file)
        for wni_key, cdata in conj_data.items():
            if isinstance(cdata, dict) and "slug" in cdata and wni_key not in slug_cache:
                slug_cache[wni_key] = cdata["slug"]
        print(f"Pre-populated {len(slug_cache)} slugs from conjugations.json")

    # Deduplicate verbs by wni
    seen_wni = set()
    unique_verbs = []
    for v in verbs:
        if v["wni"] not in seen_wni:
            seen_wni.add(v["wni"])
            unique_verbs.append(v)

    total = len(unique_verbs)
    to_scrape = [v for v in unique_verbs if v["wni"] not in done_wnis]
    print(f"Total unique verbs: {total}, already done: {total - len(to_scrape)}, to scrape: {len(to_scrape)}")

    scraped_count = 0
    skipped_count = 0
    total_new_forms = 0
    sample_verbs = {}  # For summary: wni -> list of forms

    for i, verb in enumerate(to_scrape):
        wni = verb["wni"]
        # The nikkud form comes from the input data rather than from the page.
        word_nikkud_input = verb["word"]

        try:
            # Step 1: Find slug
            if wni in slug_cache:
                slug = slug_cache[wni]
            else:
                slug = search_slug(wni)
                time.sleep(DELAY)

            if not slug:
                print(f" [{i + 1}/{len(to_scrape)}] SKIP {wni} - not found on pealim")
                skipped_count += 1
                done_wnis.add(wni)
                continue

            slug_cache[wni] = slug

            # Step 2: Scrape forms (the page's own headline form is not used)
            forms, _ = scrape_verb_forms(slug)
            time.sleep(DELAY)

            # Build entries for each form; at most one entry per slug and form.
            for form in forms:
                bucket = new_forms.setdefault(form, [])
                if not any(e["slug"] == slug for e in bucket):
                    bucket.append(
                        {
                            "word_nikkud": word_nikkud_input,
                            "form_type": "conjugation",
                            "pos": "Verb",
                            "slug": slug,
                        }
                    )
                    total_new_forms += 1

            scraped_count += 1
            # Collect samples (first 3 completed)
            if len(sample_verbs) < 3:
                sample_verbs[wni] = sorted(forms)

            print(f" [{i + 1}/{len(to_scrape)}] {wni} -> {slug} ({len(forms)} forms)")
            done_wnis.add(wni)

        except Exception as e:
            # Best effort: report the failure and move on to the next verb.
            # NOTE(review): failed verbs are also recorded as done, so a
            # resumed run will not retry them -- confirm this is intended.
            print(f" [{i + 1}/{len(to_scrape)}] ERROR {wni}: {e}")
            skipped_count += 1
            done_wnis.add(wni)

        # Save progress every 50 verbs
        if (i + 1) % 50 == 0:
            progress = {"done_wnis": list(done_wnis), "slug_cache": slug_cache}
            save_json(progress, PROGRESS_FILE)
            # Save partial merged result
            save_json(_merge_forms(existing_forms, new_forms), PARTIAL_FILE)
            print(f" -- Progress saved at {i + 1}/{len(to_scrape)} --")

    # Final merge
    merged = _merge_forms(existing_forms, new_forms)
    save_json(merged, OUTPUT_FILE)

    # Save final progress
    progress = {"done_wnis": list(done_wnis), "slug_cache": slug_cache}
    save_json(progress, PROGRESS_FILE)

    # Clean up partial file
    if os.path.exists(PARTIAL_FILE):
        os.remove(PARTIAL_FILE)

    # Summary
    print(f"\n{'=' * 50}")
    print("SUMMARY")
    print(f"{'=' * 50}")
    print(f"Verbs scraped: {scraped_count}")
    print(f"Verbs skipped: {skipped_count}")
    print(f"New forms added: {total_new_forms}")
    print(f"Total unique ktiv male forms: {len(merged)}")
    print(f"Previous forms count: {len(existing_forms)}")
    print(f"Net new form keys: {len(merged) - len(existing_forms)}")

    if sample_verbs:
        print("\nSample verbs:")
        # sample_verbs never holds more than three verbs (see the loop above).
        for wni, forms in sample_verbs.items():
            print(f"\n {wni} ({len(forms)} forms):")
            for f in forms[:8]:
                print(f" {f}")
            if len(forms) > 8:
                print(f" ... and {len(forms) - 8} more")


if __name__ == "__main__":
    main()
|
||||
Loading…
Reference in a new issue