Previously --skip-conjugations returned None, causing build_all_variants() to produce near-empty conjugation decks (0.3MB font-only files). Now loads from conjugations.json cache so all 6 release variants build correctly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
525 lines
18 KiB
Python
525 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Pealim Anki Deck Builder — full pipeline orchestrator.
|
|
|
|
Usage:
|
|
python run.py [options]
|
|
|
|
Options:
|
|
--only {vocab,conjugations} Run only one deck (skips all unrelated steps)
|
|
--skip-scrape Use existing data/hebrew_dict.csv, or legacy data/pealim_dict.csv (no pealim.com dict scraping)
|
|
--skip-audio Skip audio .mp3 downloads
|
|
--skip-examples Skip Ben Yehuda example fetching
|
|
--skip-conjugations Skip verb conjugation extraction; load data/conjugations.json from cache if present
|
|
--skip-images Skip image fetching for concrete nouns
|
|
--refresh-examples Force rebuild of Ben Yehuda index (delete old, download nikkud corpus)
|
|
--test N Process only the first N dictionary words (for quick testing)
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import re
|
|
import sys
|
|
import time
|
|
import unicodedata
|
|
from pathlib import Path
|
|
|
|
# Directory containing this script; it is put first on sys.path so the
# function-level imports used by the pipeline steps resolve from here.
_SCRIPT_DIR = Path(__file__).parent
sys.path.insert(0, str(_SCRIPT_DIR))

logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Filesystem layout shared by every pipeline step.
DATA_DIR = _SCRIPT_DIR / "data"
OUTPUT_DIR = _SCRIPT_DIR / "output"
AUDIO_DIR = DATA_DIR.joinpath("audio")
AUDIO_CONJ_DIR = DATA_DIR.joinpath("audio_conj")
FONTS_DIR = DATA_DIR.joinpath("fonts")
|
|
|
|
|
|
def parse_args():
    """Parse the command-line options of the deck-building pipeline.

    Returns:
        The ``argparse.Namespace`` produced from ``sys.argv``. It carries
        ``only`` (``"vocab"``, ``"conjugations"`` or ``None``), the boolean
        switches ``skip_scrape``, ``skip_audio``, ``skip_examples``,
        ``skip_conjugations``, ``skip_images`` and ``refresh_examples``,
        and ``test`` (an ``int`` or ``None``).
    """
    parser = argparse.ArgumentParser(description="Pealim Anki deck builder")
    parser.add_argument(
        "--only",
        choices=["vocab", "conjugations"],
        help="Run only one deck (skips all unrelated steps)",
    )

    # All on/off switches share the same shape; register them in display order.
    switches = (
        ("--skip-scrape", "Skip dict scraping; use cached CSV"),
        ("--skip-audio", "Skip audio downloads"),
        ("--skip-examples", "Skip Ben Yehuda example lookup"),
        ("--skip-conjugations", "Skip verb conjugation extraction (deprecated: use --only vocab)"),
        ("--skip-images", "Skip image fetching"),
        ("--refresh-examples", "Force rebuild of Ben Yehuda index"),
    )
    for flag, description in switches:
        parser.add_argument(flag, action="store_true", help=description)

    parser.add_argument("--test", type=int, metavar="N", help="Limit to first N words")
    return parser.parse_args()
|
|
|
|
|
|
def step_scrape(args):
    """Step 1 — scrape the dictionary, or confirm a cached copy is available.

    Args:
        args: Parsed CLI namespace; only ``skip_scrape`` is read.

    With ``--skip-scrape`` nothing is downloaded: the function only checks
    that ``data/hebrew_dict.csv`` (or the legacy ``data/pealim_dict.csv``)
    exists and terminates the process with exit status 1 when neither does.

    Otherwise the dictionary is extracted via ``hebrew_extract`` and written
    twice: the raw frame to ``hebrew_dict.csv`` and the Anki-adapted frame to
    ``hebrew_dict_for_anki.csv`` (semicolon-separated).
    """
    dict_csv = DATA_DIR / "hebrew_dict.csv"
    anki_csv = DATA_DIR / "hebrew_dict_for_anki.csv"
    # Legacy fallback name
    legacy_dict = DATA_DIR / "pealim_dict.csv"

    if args.skip_scrape:
        if dict_csv.exists():
            logger.info(f"[1] Using existing {dict_csv}")
        elif legacy_dict.exists():
            logger.info(f"[1] Using legacy {legacy_dict} (consider renaming)")
        else:
            logger.error(f"[1] --skip-scrape set but {dict_csv} not found. Aborting.")
            sys.exit(1)
        return

    logger.info("[1] Scraping dictionary from pealim.com …")
    import hebrew_extract

    # Create the output directory before the scrape so a fresh checkout does
    # not fail at to_csv() after the extraction has already run.
    DATA_DIR.mkdir(parents=True, exist_ok=True)

    df = hebrew_extract.extract_from_website()
    df.to_csv(dict_csv, index=True)
    logger.info(f" Saved {len(df)} words → {dict_csv}")

    df = hebrew_extract.modify_for_anki(df)
    df.to_csv(anki_csv, sep=";", index=True)
    logger.info(f" Saved Anki CSV → {anki_csv}")
|
|
|
|
|
|
def step_frequency() -> dict[str, int]:
    """Step 2 — load the word-frequency data and hand back the loaded table.

    Returns:
        The ``_freq`` attribute of the ``frequency_lookup`` module, read
        after ``frequency_lookup.load()`` has run.
    """
    logger.info("[2] Loading word frequency data …")
    import frequency_lookup

    frequency_lookup.load()
    # Read the attribute only after load() has run.
    freq_table = frequency_lookup._freq
    return freq_table
|
|
|
|
|
|
def step_examples(args, freq_cache: dict):
    """Step 3 — load/build the Ben Yehuda example index and return the examples.

    Args:
        args: Parsed CLI namespace; reads ``skip_examples``,
            ``refresh_examples`` and ``test``.
        freq_cache: Accepted for call-site compatibility; not used here.

    Returns:
        With ``--skip-examples``: the parsed ``data/examples_cache.json`` when
        that file exists, otherwise ``{}``. Without it:
        ``benyehuda._examples_cache`` after the pre-fetch pass and
        ``benyehuda.save_examples_cache()``.

    The pre-fetch pass is best effort: any exception raised while reading the
    dictionary CSV or looking up a word is logged as a warning and the step
    still saves and returns whatever was collected.
    """
    if args.skip_examples:
        logger.info("[3] Skipping examples (--skip-examples)")
        examples_path = DATA_DIR / "examples_cache.json"
        if examples_path.exists():
            # The cache holds Hebrew text; do not depend on the locale's
            # default encoding when reading it.
            with open(examples_path, encoding="utf-8") as f:
                return json.load(f)
        return {}

    logger.info("[3] Loading Ben Yehuda example index …")
    import benyehuda
    benyehuda.load(force_rebuild=args.refresh_examples)

    # First existing dictionary file wins; the last legacy name is kept as
    # the fallback even when it does not exist (read_csv then fails below).
    candidates = [
        DATA_DIR / "hebrew_dict_for_anki.csv",
        DATA_DIR / "hebrew_dict.csv",
        DATA_DIR / "pealim_dict_for_anki.csv",
        DATA_DIR / "pealim_dict.csv",
    ]
    dict_csv = next((p for p in candidates if p.exists()), candidates[-1])

    try:
        import pandas as pd
        try:
            # The Anki export is semicolon-separated; a file that parses into
            # fewer than three columns is re-read with the default separator.
            df = pd.read_csv(dict_csv, sep=";", index_col=0)
            if df.shape[1] < 3:
                raise ValueError("too few columns")
        except (ValueError, pd.errors.ParserError):
            df = pd.read_csv(dict_csv, index_col=0)

        if args.test:
            df = df.head(args.test)

        logger.info(f" Pre-fetching examples for {len(df)} words …")
        for _, row in df.iterrows():
            # Use nikkud word form as primary key (nikkud corpus)
            word_nikkud = str(row.get("Word", "")).strip()
            # A missing cell stringifies to "nan"; that is not a word to look up.
            if word_nikkud and word_nikkud != "nan":
                benyehuda.get_examples(word_nikkud)

    except Exception as e:
        logger.warning(f" Could not pre-fetch all examples: {e}")

    benyehuda.save_examples_cache()
    return benyehuda._examples_cache
|
|
|
|
|
|
def step_audio(args):
    """Step 4 — download vocabulary audio .mp3 files from audio_url column in CSV.

    Args:
        args: Parsed CLI namespace; reads ``skip_audio`` and ``test``.

    Each row's audio is stored in ``AUDIO_DIR`` under a name made of the
    Hebrew letters (U+05D0–U+05EA) of the un-pointed word. Files that already
    exist are not downloaded again. The whole step is best effort: an
    individual download failure is counted and logged at debug level, and any
    other error is reported as a warning without aborting the pipeline.
    """
    if args.skip_audio:
        logger.info("[4] Skipping audio (--skip-audio)")
        return

    logger.info("[4] Downloading vocabulary audio files …")

    # First existing dictionary file wins; the last legacy name is the
    # fallback even when it does not exist (read_csv then fails below).
    candidates = [
        DATA_DIR / "hebrew_dict_for_anki.csv",
        DATA_DIR / "hebrew_dict.csv",
        DATA_DIR / "pealim_dict_for_anki.csv",
        DATA_DIR / "pealim_dict.csv",
    ]
    dict_csv = next((p for p in candidates if p.exists()), candidates[-1])

    import pandas as pd
    import requests

    # str() turns a missing cell (NaN / None) into one of these literals.
    missing = ("", "nan", "None")

    def strip_nik(t: str) -> str:
        # Decompose, then drop combining marks (category Mn).
        return "".join(c for c in unicodedata.normalize("NFD", t)
                       if unicodedata.category(c) != "Mn")

    try:
        try:
            df = pd.read_csv(dict_csv, sep=";", index_col=0)
            if df.shape[1] < 3:
                raise ValueError("too few columns")
        except (ValueError, pd.errors.ParserError):
            df = pd.read_csv(dict_csv, index_col=0)

        if 'audio_url' not in df.columns:
            logger.warning(" No audio_url column in CSV — re-scrape with hebrew_extract.py to capture audio URLs")
            return

        if args.test:
            df = df.head(args.test)

        AUDIO_DIR.mkdir(parents=True, exist_ok=True)
        downloaded = 0
        skipped = 0
        no_url = 0
        failed = 0

        for _, row in df.iterrows():
            word = str(row.get("Word", "")).strip()
            word_plain = str(row.get("Word Without Nikkud", "")).strip()
            audio_url = str(row.get("audio_url", "")).strip()

            if not word:
                continue

            # Without this, a missing un-pointed form ("nan") is truthy, wins
            # over `word`, and reduces to an empty file name — dropping the row.
            if word_plain in missing:
                word_plain = ""

            safe_name = re.sub(r"[^\u05d0-\u05ea]", "", strip_nik(word_plain or word))
            if not safe_name:
                continue
            mp3_path = AUDIO_DIR / f"{safe_name}.mp3"

            if mp3_path.exists():
                skipped += 1
                continue

            if audio_url in missing:
                no_url += 1
                continue

            try:
                resp = requests.get(audio_url, timeout=10)
                resp.raise_for_status()
                mp3_path.write_bytes(resp.content)
                downloaded += 1
                time.sleep(0.3)
            except Exception as e:
                logger.debug(f" Audio download failed for {word}: {e}")
                failed += 1

        logger.info(
            f" Audio: {downloaded} downloaded, {skipped} already cached, "
            f"{no_url} without URL, {failed} failed"
        )

    except Exception as e:
        logger.warning(f" Audio step failed: {e}")
|
|
|
|
|
|
def _download_conj_forms(forms: dict, file_prefix: str) -> tuple[int, int, int]:
    """Download the audio of every entry in *forms* into AUDIO_CONJ_DIR.

    Files are named ``{file_prefix}_{form_key}.mp3``. Entries without an
    ``audio_url`` are ignored and existing files are left untouched.

    Returns:
        ``(downloaded, skipped, failed)`` counts for this batch.
    """
    import requests

    downloaded = skipped = failed = 0
    for form_key, form_data in forms.items():
        audio_url = form_data.get("audio_url", "")
        if not audio_url:
            continue
        filename = f"{file_prefix}_{form_key}.mp3"
        mp3_path = AUDIO_CONJ_DIR / filename
        if mp3_path.exists():
            skipped += 1
            continue
        try:
            resp = requests.get(audio_url, timeout=10)
            resp.raise_for_status()
            mp3_path.write_bytes(resp.content)
            downloaded += 1
            # Pause between successful downloads.
            time.sleep(0.2)
        except Exception as e:
            # Best effort: one bad clip must not abort the whole batch.
            logger.debug(f" Conj audio failed {filename}: {e}")
            failed += 1
    return downloaded, skipped, failed


def step_conj_audio(args, conjugations: dict):
    """Step 4b — download conjugation audio .mp3 files.

    Args:
        args: Parsed CLI namespace; only ``skip_audio`` is read.
        conjugations: Mapping of infinitive to its conjugation data. Entries
            that are empty, have no ``forms`` or no ``slug`` are ignored.
            ``None`` is treated as an empty mapping.

    Active forms are saved as ``{slug}_{form_key}.mp3`` and the forms of an
    optional ``passive_partner`` as ``{slug}_passive_{form_key}.mp3``.
    """
    if args.skip_audio:
        logger.info("[4b] Skipping conjugation audio (--skip-audio)")
        return

    logger.info("[4b] Downloading conjugation audio files …")
    AUDIO_CONJ_DIR.mkdir(parents=True, exist_ok=True)

    downloaded = 0
    skipped = 0
    failed = 0

    for infinitive, data in (conjugations or {}).items():
        if not data or not data.get("forms"):
            continue

        slug = data.get("slug", "")
        if not slug:
            continue

        # Active forms first, then the passive partner's forms (if any).
        batches = [(data["forms"], slug)]
        passive = data.get("passive_partner")
        if passive and passive.get("forms"):
            batches.append((passive["forms"], f"{slug}_passive"))

        for forms, file_prefix in batches:
            got, cached, errors = _download_conj_forms(forms, file_prefix)
            downloaded += got
            skipped += cached
            failed += errors

    logger.info(
        f" Conjugation audio: {downloaded} downloaded, "
        f"{skipped} cached, {failed} failed"
    )
|
|
|
|
|
|
def _heebo_urls_by_weight(css_text: str) -> dict[str, str]:
    """Map each ``font-weight`` found in *css_text* to one font source URL.

    The stylesheet may contain several ``@font-face`` rules per weight (one
    per unicode subset, each introduced by a ``/* subset */`` comment). The
    first rule seen for a weight is used unless a rule labelled ``hebrew``
    exists for that weight, in which case the Hebrew subset is chosen.
    """
    chosen: dict[str, str] = {}
    rule_pattern = r"(?:/\*\s*([\w-]+)\s*\*/\s*)?@font-face\s*\{([^}]*)\}"
    for subset, rule_body in re.findall(rule_pattern, css_text):
        weight = re.search(r"font-weight:\s*(\d+)", rule_body)
        source = re.search(r"src:\s*url\(([^)]+)\)", rule_body)
        if not weight or not source:
            continue
        if weight.group(1) not in chosen or subset == "hebrew":
            chosen[weight.group(1)] = source.group(1).strip("'\"")
    return chosen


def step_fonts(args):
    """Step 4c — download Heebo font files (one-time, cached).

    Args:
        args: Parsed CLI namespace; not read by this step.

    Writes ``_Heebo-Regular.ttf`` (weight 400) and ``_Heebo-Bold.ttf``
    (weight 700) into ``FONTS_DIR``. Files already present are kept. Any
    failure is logged as a warning together with manual-install instructions;
    the pipeline continues without the fonts.
    """
    FONTS_DIR.mkdir(parents=True, exist_ok=True)
    regular = FONTS_DIR / "_Heebo-Regular.ttf"
    bold = FONTS_DIR / "_Heebo-Bold.ttf"

    if regular.exists() and bold.exists():
        logger.info("[4c] Heebo fonts already cached")
        return

    logger.info("[4c] Downloading Heebo fonts from Google Fonts …")

    # Fetch CSS to get the actual font source URLs
    import requests as _req
    headers = {
        # NOTE(review): the served format depends on this User-Agent; a modern
        # browser UA may be given woff2 rather than TTF — confirm the saved
        # files are usable by Anki.
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) Gecko/20100101 Firefox/120.0"
    }
    css_url = "https://fonts.googleapis.com/css2?family=Heebo:wght@400;700"
    try:
        css_resp = _req.get(css_url, headers=headers, timeout=15)
        css_resp.raise_for_status()
        css_text = css_resp.text

        # Find all src: url(...) references (may be woff2 for modern UA)
        font_urls = [
            u.strip("'\"") for u in re.findall(r"src:\s*url\(([^)]+)\)", css_text)
        ]
        logger.debug(f" Found {len(font_urls)} font URL(s) in CSS")
        if not font_urls:
            # Route to the failure path instead of reporting success.
            raise ValueError("no font URLs found in Google Fonts CSS")

        # Pick by declared weight: the first two URLs are not necessarily
        # regular and bold when the CSS lists several subsets per weight.
        # Fall back to positional order only if no weight could be parsed.
        by_weight = _heebo_urls_by_weight(css_text)
        second_url = font_urls[1] if len(font_urls) > 1 else None
        targets = (
            (regular, by_weight.get("400", font_urls[0])),
            (bold, by_weight.get("700", second_url)),
        )

        downloaded = []
        for dest, fu in targets:
            if dest.exists():
                continue
            if fu is None:
                logger.warning(f" No source URL found for {dest.name}")
                continue
            fr = _req.get(fu, timeout=15)
            fr.raise_for_status()
            dest.write_bytes(fr.content)
            downloaded.append(dest.name)
            logger.info(f" Downloaded → {dest.name}")

        if not downloaded and regular.exists() and bold.exists():
            logger.info(" All font files already present")

    except Exception as e:
        logger.warning(f" Heebo download failed: {e}")
        logger.warning(" Cards will fall back to Arial Hebrew / David.")
        logger.warning(
            " To install manually: download Heebo-Regular.ttf and Heebo-Bold.ttf "
            "from https://fonts.google.com/specimen/Heebo and rename with _ prefix "
            f"into {FONTS_DIR}"
        )
|
|
|
|
|
|
def step_images(args) -> dict:
    """Step 4d — fetch images for concrete nouns (resume-safe).

    Args:
        args: Parsed CLI namespace; reads ``skip_images`` and ``test``.

    Returns:
        With ``--skip-images``: the parsed ``data/image_cache.json`` when it
        exists, otherwise ``{}``. Without it: whatever
        ``image_fetch.run(limit=...)`` returns.
    """
    if args.skip_images:
        logger.info("[4d] Skipping images (--skip-images)")
        cache_path = DATA_DIR / "image_cache.json"
        if cache_path.exists():
            # Read as UTF-8 explicitly rather than with the locale's default
            # encoding, which differs between platforms.
            with open(cache_path, encoding="utf-8") as f:
                return json.load(f)
        return {}

    limit = args.test  # When in test mode, limit images too
    logger.info("[4d] Fetching images for concrete nouns …")
    import image_fetch
    return image_fetch.run(limit=limit)
|
|
|
|
|
|
def step_build_all(args, examples_cache: dict, freq_cache: dict, conjugations: dict | None, image_cache: dict | None = None):
    """Step 5 — build all 6 release variants (4 vocab + 2 conj).

    Resolves the dictionary CSV (newest naming first, legacy names last) and
    delegates to ``apkg_builder.build_all_variants``. A ``None`` value for
    ``conjugations`` or ``image_cache`` is forwarded as an empty dict, and
    ``args.test`` is forwarded as the word limit.
    """
    logger.info("[5] Building all deck variants …")
    import apkg_builder

    csv_names = (
        "hebrew_dict_for_anki.csv",
        "hebrew_dict.csv",
        "pealim_dict_for_anki.csv",
        "pealim_dict.csv",
    )
    # Default to the last legacy name; an earlier name wins if its file exists.
    dict_csv = DATA_DIR / csv_names[-1]
    for csv_name in csv_names[:-1]:
        candidate = DATA_DIR / csv_name
        if candidate.exists():
            dict_csv = candidate
            break

    build_options = {
        "conjugations": conjugations or {},
        "examples_cache": examples_cache,
        "freq_cache": freq_cache,
        "image_cache": image_cache or {},
        "limit": args.test,
    }
    apkg_builder.build_all_variants(dict_csv, **build_options)
|
|
|
|
|
|
def step_conjugations(args):
    """Step 6 — extract conjugations (returns data; building handled by step_build_all).

    --skip-conjugations skips re-extraction from pealim.com but still loads
    from cache so conj deck variants are built correctly.

    Args:
        args: Parsed CLI namespace; reads ``skip_conjugations`` here and is
            passed on to ``step_conj_audio``.

    Returns:
        The conjugation data, or ``None`` when no data is available: the
        skip flag is set without a cache file, or ``verbs_input.txt`` is
        missing. Conjugation audio is downloaded only on the non-skip path.
    """
    conj_cache = DATA_DIR / "conjugations.json"

    def load_cache():
        # Read as UTF-8 explicitly rather than with the locale's default
        # encoding, which differs between platforms.
        with open(conj_cache, encoding="utf-8") as f:
            return json.load(f)

    if args.skip_conjugations:
        if conj_cache.exists():
            logger.info("[6] --skip-conjugations: loading from cache …")
            return load_cache()
        logger.info("[6] --skip-conjugations: no cache found, skipping conj decks")
        return None

    verbs_file = Path(__file__).parent / "verbs_input.txt"
    if not verbs_file.exists():
        logger.info("[6] verbs_input.txt not found — skipping conjugation deck")
        return None

    if conj_cache.exists():
        logger.info("[6] Using cached conjugations.json …")
        conjugations = load_cache()
    else:
        logger.info("[6] Extracting verb conjugations …")
        import conjugation_extract
        conjugations = conjugation_extract.main(verbs_file)

    # Download conjugation audio. Guard against an extraction that produced
    # no result so the audio step is not handed None.
    if conjugations is not None:
        step_conj_audio(args, conjugations)

    return conjugations
|
|
|
|
|
|
def print_summary(args, examples_cache, freq_cache, conjugations):
    """Log a summary of the pipeline's inputs and outputs.

    Args:
        args: Parsed CLI namespace; not read here.
        examples_cache: Mapping whose truthy values count as covered words.
        freq_cache: Frequency mapping; only its size is reported.
        conjugations: Mapping of verbs; truthy values are counted. May be
            empty, in which case no verb count is logged.

    Reports the dictionary row count, cache sizes, audio file counts, image
    coverage from ``data/image_cache.json`` and the size of every ``.apkg``
    file that exists.
    """
    rule = "=" * 60
    logger.info("")
    logger.info(rule)
    logger.info("SUMMARY")
    logger.info(rule)

    # First existing dictionary file wins; the last legacy name is the
    # fallback and is only read if it exists.
    candidates = [
        DATA_DIR / "hebrew_dict_for_anki.csv",
        DATA_DIR / "hebrew_dict.csv",
        DATA_DIR / "pealim_dict_for_anki.csv",
        DATA_DIR / "pealim_dict.csv",
    ]
    dict_csv = next((p for p in candidates if p.exists()), candidates[-1])
    if dict_csv.exists():
        import pandas as pd
        try:
            df = pd.read_csv(dict_csv, sep=";", index_col=0)
            if df.shape[1] < 3:
                raise ValueError("too few columns")
        except (ValueError, pd.errors.ParserError):
            df = pd.read_csv(dict_csv, index_col=0)
        logger.info(f" Dictionary words: {len(df)}")

    logger.info(f" Frequency entries: {len(freq_cache)}")
    logger.info(f" Example cache entries: {len(examples_cache)}")
    if examples_cache:
        covered = sum(1 for v in examples_cache.values() if v)
        logger.info(f" Example coverage: {covered}/{len(examples_cache)} ({100*covered//len(examples_cache)}%)")

    if AUDIO_DIR.exists():
        mp3s = list(AUDIO_DIR.glob("*.mp3"))
        logger.info(f" Vocabulary audio files: {len(mp3s)}")

    if AUDIO_CONJ_DIR.exists():
        mp3s = list(AUDIO_CONJ_DIR.glob("*.mp3"))
        logger.info(f" Conjugation audio files: {len(mp3s)}")

    image_cache_path = DATA_DIR / "image_cache.json"
    if image_cache_path.exists():
        # Read as UTF-8 explicitly rather than with the locale's default
        # encoding, which differs between platforms.
        with open(image_cache_path, encoding="utf-8") as f:
            ic = json.load(f)
        found_imgs = sum(1 for v in ic.values() if v)
        logger.info(f" Images: {found_imgs}/{len(ic)} nouns with images")

    import apkg_builder as _ab
    all_apkgs = [
        _ab.VOCAB_APKG, _ab.VOCAB_APKG_AUDIO, _ab.VOCAB_APKG_IMAGES, _ab.VOCAB_APKG_AUDIO_IMAGES,
        _ab.CONJ_APKG, _ab.CONJ_APKG_AUDIO,
    ]
    for apkg in all_apkgs:
        if apkg.exists():
            size_mb = apkg.stat().st_size / 1e6
            logger.info(f" {apkg.name}: {size_mb:.1f} MB")
    if conjugations:
        verb_count = sum(1 for v in conjugations.values() if v)
        logger.info(f" Verbs in conjugation deck: {verb_count}")

    logger.info(rule)
    logger.info("DONE")
|
|
|
|
|
|
def main():
    """Run the pipeline selected by the command-line options.

    ``--only conjugations`` runs just the font, conjugation and build steps.
    Every other invocation runs the full step sequence; ``--only vocab`` does
    so with ``skip_conjugations`` forced on.
    """
    args = parse_args()
    rule = "=" * 60

    logger.info(rule)
    logger.info("PEALIM ANKI DECK BUILDER")
    if args.only:
        logger.info(f" MODE: --only {args.only}")
    if args.test:
        logger.info(f" TEST MODE: {args.test} words")
    if args.refresh_examples:
        logger.info(" REFRESH EXAMPLES: Ben Yehuda index will be rebuilt")
    logger.info(rule)

    # Short path: conjugation decks only.
    if args.only == "conjugations":
        step_fonts(args)
        conj_data = step_conjugations(args)
        if conj_data:
            import apkg_builder
            apkg_builder.build_all_variants(
                DATA_DIR / "hebrew_dict_for_anki.csv",
                conjugations=conj_data,
                limit=args.test,
            )
        print_summary(args, {}, {}, conj_data or {})
        return

    # --only vocab is implemented by switching on the skip flag.
    if args.only == "vocab":
        args.skip_conjugations = True

    step_scrape(args)
    frequencies = step_frequency()
    examples = step_examples(args, frequencies)
    step_audio(args)
    step_fonts(args)
    images = step_images(args)
    conj_data = step_conjugations(args)
    step_build_all(args, examples, frequencies, conj_data, images)

    print_summary(args, examples, frequencies, conj_data or {})
|
|
|
|
|
|
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|