- Cloze card pipeline: 924 cards from 2,296 AI-vetted Hebrew book sentences - Plurals deck: 375 notes (144 irregular + 231 regular from 86 mishkal patterns) - Ktiv male forms expanded to 20,711 entries for sentence matching - Project reorg: helpers.py (deduped strip_nikkud from 10 files), scripts/ for one-off tools, tests/ with smoke tests, deleted 3 dead files - Lint tooling: pyproject.toml with ruff/vulture/bandit/pytest config, .editorconfig, fixed all 129 ruff errors (B023 closure fix, SIM103, unused vars) - validate_apkg.py: card count range check for optional cloze template - Data caches committed: vetted_sentences, ktiv_male_forms, noun_plurals, noun_slug_map, vocab_sentence_matches, epub_sentence_index Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
446 lines
15 KiB
Python
446 lines
15 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Extract example sentences from nikud'd Hebrew EPUBs (and PDFs where possible),
|
||
match them against the vocab list, and produce examples_cache.json.
|
||
|
||
Usage:
|
||
python3 epub_examples.py
|
||
|
||
Outputs:
|
||
data/epub_sentence_index.json — full sentence corpus
|
||
data/examples_cache.json — best sentence(s) per vocab word
|
||
"""
|
||
|
||
import csv
|
||
import json
|
||
import os
|
||
import re
|
||
import zipfile
|
||
from html.parser import HTMLParser
|
||
from pathlib import Path
|
||
|
||
from helpers import strip_nikkud
|
||
|
||
# All inputs and outputs live in a "data" directory next to this script.
DATA_DIR = Path(__file__).parent / "data"
# Directory holding the source books; main() looks here for both the EPUB and
# the PDF entries.
EPUB_DIR = DATA_DIR / "epubs"
# Vocab list consumed by load_vocab() (read as a semicolon-delimited CSV).
DICT_CSV = DATA_DIR / "hebrew_dict_for_anki.csv"

# Book metadata: filename -> display name
EPUB_BOOKS = {
    "little_prince.epub": "הנסיך הקטן",
    "time_tunnel_82.epub": "מנהרת הזמן 82",
}

# PDF books are excluded — pypdf produces garbled RTL text (reversed chars within
# words). If/when a proper EPUB version becomes available on Calibre, add it to
# EPUB_BOOKS above instead.
PDF_BOOKS: dict[str, str] = {}

# Sentence length bounds (inclusive), measured in Hebrew words: tokens that
# contain at least one character in U+0590..U+05FF.
MIN_WORDS = 4
MAX_WORDS = 15
|
||
|
||
|
||
|
||
# ── HTML text extraction ─────────────────────────────────────────
|
||
|
||
|
||
class _TextExtractor(HTMLParser):
    """Collect the text content of an HTML document.

    Text inside ``<script>``, ``<style>`` and ``<head>`` is dropped.  A newline
    is emitted at both the opening and the closing tag of block-level elements
    so that words belonging to different blocks (or text that directly follows
    a closed block) are never glued together.
    """

    SKIP_TAGS = {"script", "style", "head"}

    # Elements whose boundaries act as a line break in the extracted text.
    BLOCK_TAGS = frozenset(
        {
            "p",
            "div",
            "br",
            "li",
            "h1",
            "h2",
            "h3",
            "h4",
            "h5",
            "h6",
            "td",
            "th",
            "tr",
            "blockquote",
            "section",
        }
    )

    def __init__(self):
        super().__init__()
        # Text fragments in document order; joined by get_text().
        self.parts: list[str] = []
        # Number of currently open SKIP_TAGS elements; text is kept only at 0.
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        """Track entry into skipped elements and break before block elements."""
        if tag in self.SKIP_TAGS:
            self._skip_depth += 1
        if tag in self.BLOCK_TAGS:
            self.parts.append("\n")

    def handle_endtag(self, tag):
        """Track exit from skipped elements and break after block elements."""
        if tag in self.SKIP_TAGS:
            # Clamp at zero so a stray closing tag cannot make the depth negative.
            self._skip_depth = max(0, self._skip_depth - 1)
        if tag in self.BLOCK_TAGS:
            # Without this, "<p>a</p>b" would come out as "ab".
            self.parts.append("\n")

    def handle_data(self, data):
        """Keep text unless it sits inside a skipped element."""
        if self._skip_depth == 0:
            self.parts.append(data)

    def get_text(self) -> str:
        """Return everything collected so far as a single string."""
        return "".join(self.parts)
|
||
|
||
|
||
def extract_text_from_html(html: str) -> str:
    """Parse HTML and return its plain text.

    The markup is run through ``_TextExtractor``: text inside script/style/head
    is dropped and a newline is inserted where block-level elements start.

    Args:
        html: Complete (X)HTML document or fragment as a string.

    Returns:
        The concatenated text content.
    """
    parser = _TextExtractor()
    parser.feed(html)
    # feed() may hold back trailing input it considers incomplete (for example
    # text ending in an unterminated "&..." reference); close() flushes it so
    # the end of the document is not silently lost.
    parser.close()
    return parser.get_text()
|
||
|
||
|
||
# ── EPUB processing ──────────────────────────────────────────────
|
||
|
||
|
||
def _content_files_from_epub(zf: zipfile.ZipFile) -> list[str]:
    """Return the archive paths of the content documents in reading order.

    The first member ending in ``.opf`` is parsed with regular expressions:
    manifest ``<item>`` elements give an id -> href table and the spine
    ``<itemref>`` elements give the order.  Only ``.xhtml``/``.html`` hrefs
    that resolve to an existing archive member are returned.

    If there is no OPF file, or the spine yields nothing usable, every
    ``.xhtml``/``.html`` member whose name does not contain "toc", "cover" or
    "nav" is returned in alphabetical order instead.

    Args:
        zf: An open EPUB archive.

    Returns:
        Member names suitable for ``zf.read()``.
    """
    # Local imports keep this block self-contained; zip member names always
    # use "/" so POSIX path semantics are the right ones on every platform.
    import posixpath
    from urllib.parse import unquote

    names = zf.namelist()
    name_set = set(names)  # O(1) membership instead of rescanning the list

    def _fallback() -> list[str]:
        # Same filter for both fallback situations.
        excluded = ("toc", "cover", "nav")
        return sorted(
            n
            for n in names
            if n.endswith((".xhtml", ".html")) and not any(word in n.lower() for word in excluded)
        )

    # Find the OPF file
    opf_path = next((n for n in names if n.endswith(".opf")), None)
    if not opf_path:
        return _fallback()

    # Parse OPF to get spine order
    opf_content = zf.read(opf_path).decode("utf-8")
    opf_dir = posixpath.dirname(opf_path)

    # Extract manifest items: id -> href (attributes may come in either order)
    manifest = {}
    for m in re.finditer(r'<item\s+[^>]*id="([^"]+)"[^>]*href="([^"]+)"', opf_content):
        manifest[m.group(1)] = m.group(2)
    for m in re.finditer(r'<item\s+[^>]*href="([^"]+)"[^>]*id="([^"]+)"', opf_content):
        manifest[m.group(2)] = m.group(1)

    # Extract spine order
    spine_ids = re.findall(r'<itemref\s+[^>]*idref="([^"]+)"', opf_content)

    result = []
    for sid in spine_ids:
        href = manifest.get(sid, "")
        if not href.endswith((".xhtml", ".html")):
            continue
        joined = posixpath.join(opf_dir, href).replace("\\", "/")
        # hrefs are relative references: they may contain "./" or "../"
        # segments and percent-escapes, while archive names are plain paths.
        # Try the literal join first, then progressively normalised forms.
        candidates = (
            joined,
            posixpath.normpath(joined),
            posixpath.normpath(posixpath.join(opf_dir, unquote(href))),
        )
        for candidate in candidates:
            if candidate in name_set:
                result.append(candidate)
                break

    return result or _fallback()
|
||
|
||
|
||
def extract_sentences_from_epub(epub_path: Path, book_name: str) -> list[dict]:
    """Extract sentences from an EPUB file.

    Content documents are read in the order given by
    ``_content_files_from_epub``, converted to plain text, joined with
    newlines and handed to ``_split_into_sentences``.

    Args:
        epub_path: Location of the ``.epub`` archive.
        book_name: Display name stored in each result's ``"book"`` field.

    Returns:
        List of ``{"text": str, "book": str, "stripped": str}``.
    """
    all_text = []
    # The context manager releases the archive's file handle even when reading
    # or parsing raises part-way through.
    with zipfile.ZipFile(epub_path) as zf:
        for cf in _content_files_from_epub(zf):
            try:
                html = zf.read(cf).decode("utf-8")
            except (KeyError, UnicodeDecodeError):
                # Best effort: a missing or non-UTF-8 member is skipped rather
                # than aborting the whole book.
                continue
            all_text.append(extract_text_from_html(html))

    return _split_into_sentences("\n".join(all_text), book_name)
|
||
|
||
|
||
# ── PDF processing ───────────────────────────────────────────────
|
||
|
||
|
||
def extract_sentences_from_pdf(pdf_path: Path, book_name: str) -> list[dict]:
    """Extract sentences from a PDF file (best effort).

    Requires the optional ``pypdf`` package; when it cannot be imported a
    notice is printed and an empty list is returned.  Each page's text is
    processed line by line: a line with more than one token in which over 30%
    of the characters fall in U+0590..U+05FF has its token order reversed,
    compensating for pypdf emitting RTL lines backwards.  The repaired text is
    then passed to ``_split_into_sentences``.
    """
    try:
        import pypdf
    except ImportError:
        print(f" [SKIP] pypdf not installed, cannot process {pdf_path.name}")
        return []

    def _repair_line(line: str) -> str:
        # Flip token order only on predominantly Hebrew, multi-token lines.
        tokens = line.split()
        hebrew_total = sum(1 for ch in line if "\u0590" <= ch <= "\u05ff")
        mostly_hebrew = hebrew_total > len(line) * 0.3
        if mostly_hebrew and len(tokens) > 1:
            return " ".join(tokens[::-1])
        return line

    page_texts = []
    for page in pypdf.PdfReader(pdf_path).pages:
        raw = page.extract_text()
        if raw:
            page_texts.append("\n".join(_repair_line(ln) for ln in raw.split("\n")))

    return _split_into_sentences("\n".join(page_texts), book_name)
|
||
|
||
|
||
# ── Sentence splitting ───────────────────────────────────────────
|
||
|
||
# Hebrew sentence terminators: period, exclamation, question mark, sof pasuk
# (U+05C3). A run of consecutive terminators ("?!", "...") counts as a single
# split point.
_SENT_SPLIT = re.compile(r"[.!?\u05C3]+")

# Punctuation to strip from word boundaries when matching: a run of quotes
# (ASCII, typographic, gershayim U+05F4, geresh U+05F3), commas, semicolons,
# colons, dashes, ellipsis, brackets or guillemets anchored at the very start
# or the very end of a token. Characters in the middle of a token are untouched.
_PUNCT = re.compile(
    r'^[\u0022\u0027\u05F4\u05F3,;:\-–—…\u201C\u201D\u201E\u201F\u2018\u2019()\[\]{}«»"\']+|[\u0022\u0027\u05F4\u05F3,;:\-–—…\u201C\u201D\u201E\u201F\u2018\u2019()\[\]{}«»"\']+$'
)
|
||
|
||
|
||
def _split_into_sentences(text: str, book_name: str) -> list[dict]:
    """Cut *text* into sentences and keep those of acceptable length.

    Whitespace runs are collapsed to single spaces, the text is split on
    ``_SENT_SPLIT`` and each piece is kept only if its number of Hebrew tokens
    (tokens containing a character in U+0590..U+05FF) lies between
    ``MIN_WORDS`` and ``MAX_WORDS`` inclusive.  Pieces whose nikkud-stripped
    form was already seen are dropped.

    Returns:
        List of ``{"text": sentence, "book": book_name, "stripped": sentence
        without nikkud}`` in order of first appearance.
    """

    def _has_hebrew(token: str) -> bool:
        return any("\u0590" <= ch <= "\u05ff" for ch in token)

    collapsed = re.sub(r"\s+", " ", text).strip()

    kept = []
    already_seen = set()
    for piece in _SENT_SPLIT.split(collapsed):
        candidate = piece.strip()
        if not candidate:
            continue

        # Non-Hebrew tokens such as numbers do not count towards the length.
        hebrew_total = sum(1 for token in candidate.split() if _has_hebrew(token))
        if not (MIN_WORDS <= hebrew_total <= MAX_WORDS):
            continue

        bare = strip_nikkud(candidate)
        if bare in already_seen:
            continue
        already_seen.add(bare)

        kept.append({"text": candidate, "book": book_name, "stripped": bare})

    return kept
|
||
|
||
|
||
# ── Vocab loading ────────────────────────────────────────────────
|
||
|
||
|
||
def load_vocab(csv_path: Path) -> dict[str, list[str]]:
    """Load the vocab CSV into a lookup from unpointed form to pointed words.

    The file is read as UTF-8 with ``;`` as the delimiter.  Every row with a
    non-empty ``Word`` column is indexed under up to two keys: the result of
    ``strip_nikkud(Word)`` and the ``Word Without Nikkud`` column (skipped when
    empty or identical to the first key).

    Args:
        csv_path: Path to the vocab CSV.

    Returns:
        ``{stripped_form: [nikkud_word, ...]}``.  Each list holds the distinct
        ``Word`` values that share that form, in file order.
    """
    words_by_stripped: dict[str, list[str]] = {}  # stripped -> [nikkud words]

    # newline="" lets the csv module do its own line-ending handling.
    with open(csv_path, encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f, delimiter=";")
        for row in reader:
            # DictReader fills columns missing from a short row with None, so
            # a plain .get(..., "") default is not enough.
            nikkud_word = (row.get("Word") or "").strip()
            word_no_nik = (row.get("Word Without Nikkud") or "").strip()
            if not nikkud_word:
                continue

            # Method 1: strip nikkud from the Word column
            stripped_from_nikkud = strip_nikkud(nikkud_word)

            # Add both forms for matching (a tuple keeps the order stable)
            forms = (stripped_from_nikkud,)
            if word_no_nik != stripped_from_nikkud:
                forms += (word_no_nik,)
            for form in forms:
                if not form:
                    continue
                bucket = words_by_stripped.setdefault(form, [])
                # Repeated CSV rows must not produce repeated entries.
                if nikkud_word not in bucket:
                    bucket.append(nikkud_word)

    return words_by_stripped
|
||
|
||
|
||
# ── Matching ─────────────────────────────────────────────────────
|
||
|
||
|
||
def match_sentences(sentences: list[dict], words_by_stripped: dict) -> dict:
    """Match sentences against vocab words.

    Each whitespace-separated token of a sentence's ``"stripped"`` text is
    cleaned of edge punctuation and looked up in *words_by_stripped* as is.
    The token is also looked up with one or two leading prefix letters
    removed, provided at least two characters remain.  Vocab forms of one or
    two characters are accepted only when they match a whole token, never via
    prefix removal.

    Args:
        sentences: Dicts with at least ``"text"`` and ``"stripped"`` keys.
        words_by_stripped: ``{stripped_form: [nikkud_word, ...]}``.

    Returns:
        ``{nikkud_word: [sentence_text, ...]}`` with at most five distinct
        sentences per word, fewest-words first.  Key order is deterministic
        for a given input.
    """
    # Hebrew single-letter prefixes: ב, ה, ו, כ, ל, מ, ש, ד (של)
    heb_prefixes = frozenset("בהוכלמשד")

    # nikkud_word -> [(word_count, sentence)]
    matches: dict[str, list[tuple[int, str]]] = {}

    for sent_info in sentences:
        sent_text = sent_info["text"]
        word_count = len(sent_text.split())

        direct_words = set()  # cleaned tokens exactly as they occur
        stems = set()  # tokens with a plausible 1-2 letter prefix removed
        for w in sent_info["stripped"].split():
            cleaned = _PUNCT.sub("", w)
            if not cleaned:
                continue
            direct_words.add(cleaned)
            for prefix_len in (1, 2):
                stem = cleaned[prefix_len:]
                if len(stem) >= 2 and all(c in heb_prefixes for c in cleaned[:prefix_len]):
                    stems.add(stem)

        # Very short forms (1-2 chars) are too ambiguous to accept via prefix
        # stripping; they only count when they are a whole token.
        candidate_forms = direct_words | {s for s in stems if len(s) > 2}

        # Sorted so that insertion order into `matches` (and therefore the key
        # order of the returned dict / saved JSON) does not depend on string
        # hash randomisation.
        for form in sorted(candidate_forms):
            for nikkud_word in words_by_stripped.get(form, ()):
                matches.setdefault(nikkud_word, []).append((word_count, sent_text))

    # Sort by word count (prefer shorter sentences), deduplicate, keep top 5
    result = {}
    for nikkud_word, sent_list in matches.items():
        sent_list.sort(key=lambda pair: pair[0])
        distinct = dict.fromkeys(sent for _, sent in sent_list)
        result[nikkud_word] = list(distinct)[:5]

    return result
|
||
|
||
|
||
# ── Main ─────────────────────────────────────────────────────────
|
||
|
||
|
||
def main():
    """Run the whole pipeline and return the examples cache.

    Steps: extract sentences from every configured book that exists under
    ``EPUB_DIR``, write the full corpus to ``data/epub_sentence_index.json``,
    load the vocab list, match sentences to vocab words, write
    ``data/examples_cache.json`` and print summary statistics.

    Returns:
        The ``{nikkud_word: [sentences]}`` mapping that was written to disk.
    """
    rule = "=" * 60

    def _write_json(target, payload):
        with open(target, "w", encoding="utf-8") as handle:
            json.dump(payload, handle, ensure_ascii=False, indent=2)

    print(rule)
    print("EPUB Example Sentence Extraction Pipeline")
    print(rule)

    # Step 1: Extract sentences from all books (EPUBs first, then PDFs)
    all_sentences = []
    book_counts = {}
    sources = (
        ("EPUB", EPUB_BOOKS, extract_sentences_from_epub),
        ("PDF", PDF_BOOKS, extract_sentences_from_pdf),
    )
    for label, books, extractor in sources:
        for filename, book_name in books.items():
            book_path = EPUB_DIR / filename
            if not book_path.exists():
                print(f"\n[SKIP] {filename} not found")
                continue
            print(f"\n[{label}] Extracting: {book_name} ({filename})")
            extracted = extractor(book_path, book_name)
            book_counts[book_name] = len(extracted)
            all_sentences.extend(extracted)
            print(f" -> {len(extracted)} sentences")

    print(f"\nTotal sentences: {len(all_sentences)}")

    # Step 2: Save sentence index
    index_path = DATA_DIR / "epub_sentence_index.json"
    _write_json(index_path, {"sentences": all_sentences})
    print(f"\nSaved sentence index: {index_path}")

    # Step 3: Load vocab and match
    print(f"\nLoading vocab from {DICT_CSV} ...")
    words_by_stripped = load_vocab(DICT_CSV)
    distinct_words = set()
    for word_list in words_by_stripped.values():
        distinct_words.update(word_list)
    total_vocab = len(distinct_words)
    print(f" {total_vocab} unique vocab words ({len(words_by_stripped)} lookup forms)")

    print("\nMatching sentences against vocab ...")
    examples_cache = match_sentences(all_sentences, words_by_stripped)

    # Step 4: Save examples_cache
    cache_path = DATA_DIR / "examples_cache.json"
    _write_json(cache_path, examples_cache)
    print(f"Saved examples cache: {cache_path}")

    # Step 5: Summary stats
    print("\n" + rule)
    print("SUMMARY")
    print(rule)
    print("\nSentences per book:")
    for book_name, sentence_total in book_counts.items():
        print(f" {book_name}: {sentence_total}")
    print(f" Total: {len(all_sentences)}")

    print("\nVocab matching:")
    print(f" Total vocab words: {total_vocab}")
    print(f" Words with examples: {len(examples_cache)}")
    if total_vocab:
        coverage = 100 * len(examples_cache) / total_vocab
    else:
        coverage = 0
    print(f" Coverage: {coverage:.1f}%")

    # Show some sample matches (first five entries only)
    print("\nSample matches:")
    for shown, (word, sents) in enumerate(examples_cache.items()):
        if shown >= 5:
            break
        print(f" {word} -> {sents[0][:60]}...")

    return examples_cache


if __name__ == "__main__":
    main()
|