hebrew_flash_cards/run.py
Sochen 3fc3a21a33 fix: load conjugations from cache when --skip-conjugations passed
Previously --skip-conjugations returned None, causing build_all_variants()
to produce near-empty conjugation decks (0.3MB font-only files). Now loads
from conjugations.json cache so all 6 release variants build correctly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-05 21:01:47 +00:00

525 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Pealim Anki Deck Builder — full pipeline orchestrator.
Usage:
python run.py [options]
Options:
--only {vocab,conjugations} Run only one deck (skips all unrelated steps)
--skip-scrape Use existing data/pealim_dict.csv (no pealim.com dict scraping)
--skip-audio Skip audio .mp3 downloads
--skip-examples Skip Ben Yehuda example fetching
--skip-conjugations Skip verb conjugation extraction
--skip-images Skip image fetching for concrete nouns
--refresh-examples Force rebuild of Ben Yehuda index (delete old, download nikkud corpus)
--test N Process only the first N dictionary words (for quick testing)
"""
import argparse
import json
import logging
import re
import sys
import time
import unicodedata
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger(__name__)
DATA_DIR = Path(__file__).parent / "data"
OUTPUT_DIR = Path(__file__).parent / "output"
AUDIO_DIR = DATA_DIR / "audio"
AUDIO_CONJ_DIR = DATA_DIR / "audio_conj"
FONTS_DIR = DATA_DIR / "fonts"
def parse_args():
p = argparse.ArgumentParser(description="Pealim Anki deck builder")
p.add_argument("--only", choices=["vocab", "conjugations"], help="Run only one deck (skips all unrelated steps)")
p.add_argument("--skip-scrape", action="store_true", help="Skip dict scraping; use cached CSV")
p.add_argument("--skip-audio", action="store_true", help="Skip audio downloads")
p.add_argument("--skip-examples", action="store_true", help="Skip Ben Yehuda example lookup")
p.add_argument("--skip-conjugations", action="store_true", help="Skip verb conjugation extraction (deprecated: use --only vocab)")
p.add_argument("--skip-images", action="store_true", help="Skip image fetching")
p.add_argument("--refresh-examples", action="store_true", help="Force rebuild of Ben Yehuda index")
p.add_argument("--test", type=int, metavar="N", help="Limit to first N words")
return p.parse_args()
def step_scrape(args):
"""Step 1 — scrape or load dictionary."""
dict_csv = DATA_DIR / "hebrew_dict.csv"
anki_csv = DATA_DIR / "hebrew_dict_for_anki.csv"
# Legacy fallback names
legacy_dict = DATA_DIR / "pealim_dict.csv"
legacy_anki = DATA_DIR / "pealim_dict_for_anki.csv"
if args.skip_scrape:
if dict_csv.exists():
logger.info(f"[1] Using existing {dict_csv}")
elif legacy_dict.exists():
logger.info(f"[1] Using legacy {legacy_dict} (consider renaming)")
else:
logger.error(f"[1] --skip-scrape set but {dict_csv} not found. Aborting.")
sys.exit(1)
return
logger.info("[1] Scraping dictionary from pealim.com …")
import hebrew_extract
import pandas as pd
df = hebrew_extract.extract_from_website()
df.to_csv(dict_csv, index=True)
logger.info(f" Saved {len(df)} words → {dict_csv}")
df = hebrew_extract.modify_for_anki(df)
df.to_csv(anki_csv, sep=";", index=True)
logger.info(f" Saved Anki CSV → {anki_csv}")
def step_frequency() -> dict[str, int]:
"""Step 2 — load/download word frequency data."""
logger.info("[2] Loading word frequency data …")
import frequency_lookup
frequency_lookup.load()
return frequency_lookup._freq
def step_examples(args, freq_cache: dict):
"""Step 3 — load/build Ben Yehuda example index."""
if args.skip_examples:
logger.info("[3] Skipping examples (--skip-examples)")
examples_path = DATA_DIR / "examples_cache.json"
if examples_path.exists():
with open(examples_path) as f:
return json.load(f)
return {}
logger.info("[3] Loading Ben Yehuda example index …")
import benyehuda
benyehuda.load(force_rebuild=args.refresh_examples)
dict_csv = DATA_DIR / "hebrew_dict_for_anki.csv"
if not dict_csv.exists():
dict_csv = DATA_DIR / "hebrew_dict.csv"
if not dict_csv.exists():
dict_csv = DATA_DIR / "pealim_dict_for_anki.csv"
if not dict_csv.exists():
dict_csv = DATA_DIR / "pealim_dict.csv"
try:
import pandas as pd
try:
df = pd.read_csv(dict_csv, sep=";", index_col=0)
if df.shape[1] < 3:
raise ValueError("too few columns")
except (ValueError, pd.errors.ParserError):
df = pd.read_csv(dict_csv, index_col=0)
if args.test:
df = df.head(args.test)
logger.info(f" Pre-fetching examples for {len(df)} words …")
for _, row in df.iterrows():
# Use nikkud word form as primary key (nikkud corpus)
word_nikkud = str(row.get("Word", "")).strip()
if word_nikkud:
benyehuda.get_examples(word_nikkud)
except Exception as e:
logger.warning(f" Could not pre-fetch all examples: {e}")
benyehuda.save_examples_cache()
return benyehuda._examples_cache
def step_audio(args):
"""Step 4 — download vocabulary audio .mp3 files from audio_url column in CSV."""
if args.skip_audio:
logger.info("[4] Skipping audio (--skip-audio)")
return
logger.info("[4] Downloading vocabulary audio files …")
dict_csv = DATA_DIR / "hebrew_dict_for_anki.csv"
if not dict_csv.exists():
dict_csv = DATA_DIR / "hebrew_dict.csv"
if not dict_csv.exists():
dict_csv = DATA_DIR / "pealim_dict_for_anki.csv"
if not dict_csv.exists():
dict_csv = DATA_DIR / "pealim_dict.csv"
import pandas as pd
import requests
try:
try:
df = pd.read_csv(dict_csv, sep=";", index_col=0)
if df.shape[1] < 3:
raise ValueError("too few columns")
except (ValueError, pd.errors.ParserError):
df = pd.read_csv(dict_csv, index_col=0)
if 'audio_url' not in df.columns:
logger.warning(" No audio_url column in CSV — re-scrape with hebrew_extract.py to capture audio URLs")
return
if args.test:
df = df.head(args.test)
AUDIO_DIR.mkdir(parents=True, exist_ok=True)
downloaded = 0
skipped = 0
no_url = 0
def strip_nik(t: str) -> str:
return "".join(c for c in unicodedata.normalize("NFD", t)
if unicodedata.category(c) != "Mn")
for _, row in df.iterrows():
word = str(row.get("Word", "")).strip()
word_plain = str(row.get("Word Without Nikkud", "")).strip()
audio_url = str(row.get("audio_url", "")).strip()
if not word:
continue
safe_name = re.sub(r"[^\u05d0-\u05ea]", "", strip_nik(word_plain or word))
if not safe_name:
continue
mp3_path = AUDIO_DIR / f"{safe_name}.mp3"
if mp3_path.exists():
skipped += 1
continue
if not audio_url or audio_url in ("nan", "None", ""):
no_url += 1
continue
try:
resp = requests.get(audio_url, timeout=10)
resp.raise_for_status()
mp3_path.write_bytes(resp.content)
downloaded += 1
time.sleep(0.3)
except Exception as e:
logger.debug(f" Audio download failed for {word}: {e}")
logger.info(f" Audio: {downloaded} downloaded, {skipped} already cached, {no_url} without URL")
except Exception as e:
logger.warning(f" Audio step failed: {e}")
def step_conj_audio(args, conjugations: dict):
"""Step 4b — download conjugation audio .mp3 files."""
if args.skip_audio:
logger.info("[4b] Skipping conjugation audio (--skip-audio)")
return
logger.info("[4b] Downloading conjugation audio files …")
AUDIO_CONJ_DIR.mkdir(parents=True, exist_ok=True)
import requests
downloaded = 0
skipped = 0
failed = 0
for infinitive, data in conjugations.items():
if not data or not data.get("forms"):
continue
slug = data.get("slug", "")
if not slug:
continue
# Active forms
for form_key, form_data in data["forms"].items():
audio_url = form_data.get("audio_url", "")
if not audio_url:
continue
filename = f"{slug}_{form_key}.mp3"
mp3_path = AUDIO_CONJ_DIR / filename
if mp3_path.exists():
skipped += 1
continue
try:
resp = requests.get(audio_url, timeout=10)
resp.raise_for_status()
mp3_path.write_bytes(resp.content)
downloaded += 1
time.sleep(0.2)
except Exception as e:
logger.debug(f" Conj audio failed {filename}: {e}")
failed += 1
# Passive partner forms
passive = data.get("passive_partner")
if passive and passive.get("forms"):
for form_key, form_data in passive["forms"].items():
audio_url = form_data.get("audio_url", "")
if not audio_url:
continue
filename = f"{slug}_passive_{form_key}.mp3"
mp3_path = AUDIO_CONJ_DIR / filename
if mp3_path.exists():
skipped += 1
continue
try:
resp = requests.get(audio_url, timeout=10)
resp.raise_for_status()
mp3_path.write_bytes(resp.content)
downloaded += 1
time.sleep(0.2)
except Exception as e:
logger.debug(f" Conj audio failed {filename}: {e}")
failed += 1
logger.info(
f" Conjugation audio: {downloaded} downloaded, "
f"{skipped} cached, {failed} failed"
)
def step_fonts(args):
"""Step 4c — download Heebo font files (one-time, cached)."""
FONTS_DIR.mkdir(parents=True, exist_ok=True)
regular = FONTS_DIR / "_Heebo-Regular.ttf"
bold = FONTS_DIR / "_Heebo-Bold.ttf"
if regular.exists() and bold.exists():
logger.info("[4c] Heebo fonts already cached")
return
logger.info("[4c] Downloading Heebo fonts from Google Fonts …")
# Fetch CSS to get actual TTF source URLs (static subset for Hebrew + Latin)
import requests as _req
headers = {
# Request TTF (not woff2) so Anki can embed them
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) Gecko/20100101 Firefox/120.0"
}
css_url = "https://fonts.googleapis.com/css2?family=Heebo:wght@400;700"
try:
css_resp = _req.get(css_url, headers=headers, timeout=15)
css_resp.raise_for_status()
css_text = css_resp.text
# Find all src: url(...) references (may be woff2 for modern UA)
font_urls = re.findall(r"src:\s*url\(([^)]+)\)", css_text)
logger.debug(f" Found {len(font_urls)} font URL(s) in CSS")
# Prefer TTF; if only woff2 available, download first two and note
downloaded = []
for i, fu in enumerate(font_urls[:2]):
fu = fu.strip("'\"")
dest = regular if i == 0 else bold
if dest.exists():
continue
fr = _req.get(fu, timeout=15)
fr.raise_for_status()
dest.write_bytes(fr.content)
downloaded.append(dest.name)
logger.info(f" Downloaded → {dest.name}")
if not downloaded:
logger.info(" All font files already present")
except Exception as e:
logger.warning(f" Heebo download failed: {e}")
logger.warning(" Cards will fall back to Arial Hebrew / David.")
logger.warning(
" To install manually: download Heebo-Regular.ttf and Heebo-Bold.ttf "
"from https://fonts.google.com/specimen/Heebo and rename with _ prefix "
f"into {FONTS_DIR}"
)
def step_images(args) -> dict:
"""Step 4d — fetch images for concrete nouns (resume-safe)."""
if args.skip_images:
logger.info("[4d] Skipping images (--skip-images)")
cache_path = DATA_DIR / "image_cache.json"
if cache_path.exists():
with open(cache_path) as f:
return json.load(f)
return {}
limit = args.test # When in test mode, limit images too
logger.info("[4d] Fetching images for concrete nouns …")
import image_fetch
return image_fetch.run(limit=limit)
def step_build_all(args, examples_cache: dict, freq_cache: dict, conjugations: dict | None, image_cache: dict | None = None):
"""Step 5 — build all 6 release variants (4 vocab + 2 conj)."""
logger.info("[5] Building all deck variants …")
import apkg_builder
dict_csv = DATA_DIR / "hebrew_dict_for_anki.csv"
if not dict_csv.exists():
dict_csv = DATA_DIR / "hebrew_dict.csv"
if not dict_csv.exists():
dict_csv = DATA_DIR / "pealim_dict_for_anki.csv"
if not dict_csv.exists():
dict_csv = DATA_DIR / "pealim_dict.csv"
apkg_builder.build_all_variants(
dict_csv,
conjugations=conjugations or {},
examples_cache=examples_cache,
freq_cache=freq_cache,
image_cache=image_cache or {},
limit=args.test,
)
def step_conjugations(args):
"""Step 6 — extract conjugations (returns data; building handled by step_build_all).
--skip-conjugations skips re-extraction from pealim.com but still loads
from cache so conj deck variants are built correctly.
"""
conj_cache = DATA_DIR / "conjugations.json"
if args.skip_conjugations:
if conj_cache.exists():
logger.info("[6] --skip-conjugations: loading from cache …")
with open(conj_cache) as f:
import json as _json
return _json.load(f)
logger.info("[6] --skip-conjugations: no cache found, skipping conj decks")
return None
verbs_file = Path(__file__).parent / "verbs_input.txt"
if not verbs_file.exists():
logger.info("[6] verbs_input.txt not found — skipping conjugation deck")
return None
if conj_cache.exists():
logger.info("[6] Using cached conjugations.json …")
with open(conj_cache) as f:
import json as _json
conjugations = _json.load(f)
else:
logger.info("[6] Extracting verb conjugations …")
import conjugation_extract
conjugations = conjugation_extract.main(verbs_file)
# Download conjugation audio
step_conj_audio(args, conjugations)
return conjugations
def print_summary(args, examples_cache, freq_cache, conjugations):
logger.info("")
logger.info("=" * 60)
logger.info("SUMMARY")
logger.info("=" * 60)
dict_csv = DATA_DIR / "hebrew_dict_for_anki.csv"
if not dict_csv.exists():
dict_csv = DATA_DIR / "hebrew_dict.csv"
if not dict_csv.exists():
dict_csv = DATA_DIR / "pealim_dict_for_anki.csv"
if not dict_csv.exists():
dict_csv = DATA_DIR / "pealim_dict.csv"
if dict_csv.exists():
import pandas as pd
try:
df = pd.read_csv(dict_csv, sep=";", index_col=0)
if df.shape[1] < 3:
raise ValueError("too few columns")
except (ValueError, pd.errors.ParserError):
df = pd.read_csv(dict_csv, index_col=0)
logger.info(f" Dictionary words: {len(df)}")
logger.info(f" Frequency entries: {len(freq_cache)}")
logger.info(f" Example cache entries: {len(examples_cache)}")
covered = sum(1 for v in examples_cache.values() if v)
if examples_cache:
logger.info(f" Example coverage: {covered}/{len(examples_cache)} ({100*covered//len(examples_cache)}%)")
if AUDIO_DIR.exists():
mp3s = list(AUDIO_DIR.glob("*.mp3"))
logger.info(f" Vocabulary audio files: {len(mp3s)}")
if AUDIO_CONJ_DIR.exists():
mp3s = list(AUDIO_CONJ_DIR.glob("*.mp3"))
logger.info(f" Conjugation audio files: {len(mp3s)}")
image_cache_path = DATA_DIR / "image_cache.json"
if image_cache_path.exists():
with open(image_cache_path) as f:
ic = json.load(f)
found_imgs = sum(1 for v in ic.values() if v)
logger.info(f" Images: {found_imgs}/{len(ic)} nouns with images")
import apkg_builder as _ab
all_apkgs = [
_ab.VOCAB_APKG, _ab.VOCAB_APKG_AUDIO, _ab.VOCAB_APKG_IMAGES, _ab.VOCAB_APKG_AUDIO_IMAGES,
_ab.CONJ_APKG, _ab.CONJ_APKG_AUDIO,
]
for apkg in all_apkgs:
if apkg.exists():
size_mb = apkg.stat().st_size / 1e6
logger.info(f" {apkg.name}: {size_mb:.1f} MB")
if conjugations:
verb_count = sum(1 for v in conjugations.values() if v)
logger.info(f" Verbs in conjugation deck: {verb_count}")
logger.info("=" * 60)
logger.info("DONE")
def main():
args = parse_args()
logger.info("=" * 60)
logger.info("PEALIM ANKI DECK BUILDER")
if args.only:
logger.info(f" MODE: --only {args.only}")
if args.test:
logger.info(f" TEST MODE: {args.test} words")
if args.refresh_examples:
logger.info(" REFRESH EXAMPLES: Ben Yehuda index will be rebuilt")
logger.info("=" * 60)
if args.only == "conjugations":
step_fonts(args)
conjugations = step_conjugations(args)
if conjugations:
import apkg_builder
apkg_builder.build_all_variants(
DATA_DIR / "hebrew_dict_for_anki.csv",
conjugations=conjugations,
limit=args.test,
)
print_summary(args, {}, {}, conjugations or {})
return
if args.only == "vocab":
args.skip_conjugations = True
step_scrape(args)
freq_cache = step_frequency()
examples_cache = step_examples(args, freq_cache)
step_audio(args)
step_fonts(args)
image_cache = step_images(args)
conjugations = step_conjugations(args)
step_build_all(args, examples_cache, freq_cache, conjugations, image_cache)
print_summary(args, examples_cache, freq_cache, conjugations or {})
if __name__ == "__main__":
main()