#!/usr/bin/env python3
# Origin: epub_examples.py (new file, mode 100644, blob c5da2a7; 446 lines in the patch).
"""
Extract example sentences from nikud'd Hebrew EPUBs (and PDFs where possible),
match them against the vocab list, and produce examples_cache.json.

Usage:
    python3 epub_examples.py

Outputs:
    data/epub_sentence_index.json — full sentence corpus
    data/examples_cache.json — best sentence(s) per vocab word
"""

import csv
import json
import os
import posixpath
import re
import zipfile
from html.parser import HTMLParser
from itertools import islice
from pathlib import Path
from urllib.parse import unquote

from helpers import strip_nikkud

DATA_DIR = Path(__file__).parent / "data"
EPUB_DIR = DATA_DIR / "epubs"
DICT_CSV = DATA_DIR / "hebrew_dict_for_anki.csv"

# Book metadata: filename -> display name
EPUB_BOOKS = {
    "little_prince.epub": "הנסיך הקטן",
    "time_tunnel_82.epub": "מנהרת הזמן 82",
}

# PDF books are excluded — pypdf produces garbled RTL text (reversed chars within
# words). If/when a proper EPUB version becomes available on Calibre, add it to
# EPUB_BOOKS above instead.
+PDF_BOOKS: dict[str, str] = {} + +# Sentence length bounds (word count) +MIN_WORDS = 4 +MAX_WORDS = 15 + + +# ── HTML text extraction ───────────────────────────────────────── + + +class _TextExtractor(HTMLParser): + """Extract text content from HTML, skipping script/style tags.""" + + SKIP_TAGS = {"script", "style", "head"} + + def __init__(self): + super().__init__() + self.parts: list[str] = [] + self._skip_depth = 0 + + def handle_starttag(self, tag, attrs): + _ = attrs # required by HTMLParser interface + if tag in self.SKIP_TAGS: + self._skip_depth += 1 + # Insert space for block-level elements to avoid word concatenation + if tag in ( + "p", + "div", + "br", + "li", + "h1", + "h2", + "h3", + "h4", + "h5", + "h6", + "td", + "th", + "tr", + "blockquote", + "section", + ): + self.parts.append("\n") + + def handle_endtag(self, tag): + if tag in self.SKIP_TAGS: + self._skip_depth = max(0, self._skip_depth - 1) + + def handle_data(self, data): + if self._skip_depth == 0: + self.parts.append(data) + + def get_text(self) -> str: + return "".join(self.parts) + + +def extract_text_from_html(html: str) -> str: + """Parse HTML and return plain text.""" + parser = _TextExtractor() + parser.feed(html) + return parser.get_text() + + +# ── EPUB processing ────────────────────────────────────────────── + + +def _content_files_from_epub(zf: zipfile.ZipFile) -> list[str]: + """Get ordered list of content XHTML files from the OPF manifest.""" + # Find the OPF file + opf_path = None + for name in zf.namelist(): + if name.endswith(".opf"): + opf_path = name + break + if not opf_path: + # Fallback: just use all xhtml files + return sorted( + n + for n in zf.namelist() + if n.endswith((".xhtml", ".html")) + and "toc" not in n.lower() + and "cover" not in n.lower() + and "nav" not in n.lower() + ) + + # Parse OPF to get spine order + opf_content = zf.read(opf_path).decode("utf-8") + opf_dir = os.path.dirname(opf_path) + + # Extract manifest items: id -> href + manifest = {} + for m 
in re.finditer(r']*id="([^"]+)"[^>]*href="([^"]+)"', opf_content): + manifest[m.group(1)] = m.group(2) + # Also try reversed attribute order + for m in re.finditer(r']*href="([^"]+)"[^>]*id="([^"]+)"', opf_content): + manifest[m.group(2)] = m.group(1) + + # Extract spine order + spine_ids = re.findall(r']*idref="([^"]+)"', opf_content) + + result = [] + for sid in spine_ids: + href = manifest.get(sid, "") + if href and href.endswith((".xhtml", ".html")): + full_path = os.path.join(opf_dir, href) if opf_dir else href + # Normalize path separators + full_path = full_path.replace("\\", "/") + if full_path in zf.namelist(): + result.append(full_path) + + if not result: + # Fallback + return sorted( + n + for n in zf.namelist() + if n.endswith((".xhtml", ".html")) and "toc" not in n.lower() and "cover" not in n.lower() + ) + return result + + +def extract_sentences_from_epub(epub_path: Path, book_name: str) -> list[dict]: + """Extract sentences from an EPUB file. + + Returns list of {"text": str, "book": str, "stripped": str} + """ + zf = zipfile.ZipFile(epub_path) + content_files = _content_files_from_epub(zf) + + all_text = [] + for cf in content_files: + try: + html = zf.read(cf).decode("utf-8") + except (KeyError, UnicodeDecodeError): + continue + text = extract_text_from_html(html) + all_text.append(text) + + full_text = "\n".join(all_text) + return _split_into_sentences(full_text, book_name) + + +# ── PDF processing ─────────────────────────────────────────────── + + +def extract_sentences_from_pdf(pdf_path: Path, book_name: str) -> list[dict]: + """Extract sentences from a PDF file (best-effort, handles RTL reversal).""" + try: + import pypdf + except ImportError: + print(f" [SKIP] pypdf not installed, cannot process {pdf_path.name}") + return [] + + reader = pypdf.PdfReader(pdf_path) + all_text_parts = [] + + for page in reader.pages: + raw = page.extract_text() + if not raw: + continue + # pypdf often reverses word order for RTL text; fix it + fixed_lines = [] 
+ for line in raw.split("\n"): + words = line.split() + # Check if this line is predominantly Hebrew + hebrew_chars = sum(1 for c in line if "\u0590" <= c <= "\u05ff") + if hebrew_chars > len(line) * 0.3 and len(words) > 1: + # Reverse word order + fixed_lines.append(" ".join(reversed(words))) + else: + fixed_lines.append(line) + all_text_parts.append("\n".join(fixed_lines)) + + full_text = "\n".join(all_text_parts) + return _split_into_sentences(full_text, book_name) + + +# ── Sentence splitting ─────────────────────────────────────────── + +# Hebrew sentence terminators: period, exclamation, question mark, sof pasuk +_SENT_SPLIT = re.compile(r"[.!?\u05C3]+") + +# Punctuation to strip from word boundaries when matching +_PUNCT = re.compile( + r'^[\u0022\u0027\u05F4\u05F3,;:\-–—…\u201C\u201D\u201E\u201F\u2018\u2019()\[\]{}«»"\']+|[\u0022\u0027\u05F4\u05F3,;:\-–—…\u201C\u201D\u201E\u201F\u2018\u2019()\[\]{}«»"\']+$' +) + + +def _split_into_sentences(text: str, book_name: str) -> list[dict]: + """Split text into sentences and filter by length.""" + # Normalize whitespace + text = re.sub(r"\s+", " ", text).strip() + + raw_sentences = _SENT_SPLIT.split(text) + results = [] + seen = set() + + for sent in raw_sentences: + sent = sent.strip() + if not sent: + continue + + # Count Hebrew words (skip non-Hebrew tokens like numbers) + words = sent.split() + hebrew_words = [w for w in words if any("\u0590" <= c <= "\u05ff" for c in w)] + + if len(hebrew_words) < MIN_WORDS or len(hebrew_words) > MAX_WORDS: + continue + + # Skip duplicates + stripped = strip_nikkud(sent) + if stripped in seen: + continue + seen.add(stripped) + + results.append( + { + "text": sent, + "book": book_name, + "stripped": stripped, + } + ) + + return results + + +# ── Vocab loading ──────────────────────────────────────────────── + + +def load_vocab(csv_path: Path) -> dict: + """Load vocab CSV and return {stripped_form: nikkud_word} mapping. + + Also returns reverse mapping for lookup. 
+ Returns (word_to_nikkud, nikkud_words_set) + """ + words_by_stripped: dict[str, list[str]] = {} # stripped -> [nikkud words] + + with open(csv_path, encoding="utf-8") as f: + reader = csv.DictReader(f, delimiter=";") + for row in reader: + nikkud_word = row.get("Word", "").strip() + word_no_nik = row.get("Word Without Nikkud", "").strip() + if not nikkud_word: + continue + + # Method 1: strip nikkud from the Word column + stripped_from_nikkud = strip_nikkud(nikkud_word) + + # Add both forms for matching + for form in {stripped_from_nikkud, word_no_nik}: + if form: + words_by_stripped.setdefault(form, []).append(nikkud_word) + + return words_by_stripped + + +# ── Matching ───────────────────────────────────────────────────── + + +def match_sentences(sentences: list[dict], words_by_stripped: dict) -> dict: + """Match sentences against vocab words. + + Returns {nikkud_word: [sentences]} with best (shortest) first. + """ + # Build a set of all stripped forms for fast lookup + all_forms = set(words_by_stripped.keys()) + + # Hebrew single-letter prefixes: ב, ה, ו, כ, ל, מ, ש, ד (של) + _HEB_PREFIXES = set("בהוכלמשד") + + # For each sentence, extract stripped words + matches: dict[str, list[tuple[int, str]]] = {} # nikkud_word -> [(word_count, sentence)] + + for sent_info in sentences: + sent_text = sent_info["text"] + sent_stripped = sent_info["stripped"] + word_count = len(sent_text.split()) + + # Get stripped words from the sentence + raw_words = sent_stripped.split() + # Map: candidate_form -> set of original cleaned words that produced it + # This lets us verify that prefix stripping is plausible + candidates: dict[str, str] = {} # form -> original_word + for w in raw_words: + cleaned = _PUNCT.sub("", w) + if not cleaned: + continue + # Direct match (always try) + candidates[cleaned] = cleaned + # Prefix stripping: only if remaining stem is >= 2 chars + # and the prefix char is a known Hebrew prefix letter + for prefix_len in (1, 2): + if len(cleaned) > prefix_len + 
1: + prefix = cleaned[:prefix_len] + stem = cleaned[prefix_len:] + if all(c in _HEB_PREFIXES for c in prefix) and len(stem) >= 2: + candidates[stem] = cleaned + + # Check which vocab words appear in this sentence + matched_forms = set(candidates.keys()) & all_forms + for form in matched_forms: + # Skip spurious matches: very short vocab forms (1-2 chars) + # should only match via direct word match, not prefix stripping + if len(form) <= 2 and form not in {_PUNCT.sub("", w) for w in raw_words}: + continue + for nikkud_word in words_by_stripped[form]: + matches.setdefault(nikkud_word, []).append((word_count, sent_text)) + + # Sort by word count (prefer shorter sentences) and deduplicate + result = {} + for nikkud_word, sent_list in matches.items(): + sent_list.sort(key=lambda x: x[0]) + seen = set() + unique = [] + for _, sent in sent_list: + if sent not in seen: + seen.add(sent) + unique.append(sent) + if len(unique) >= 5: # Keep top 5 per word + break + result[nikkud_word] = unique + + return result + + +# ── Main ───────────────────────────────────────────────────────── + + +def main(): + print("=" * 60) + print("EPUB Example Sentence Extraction Pipeline") + print("=" * 60) + + # Step 1: Extract sentences from all books + all_sentences = [] + book_counts = {} + + for filename, book_name in EPUB_BOOKS.items(): + path = EPUB_DIR / filename + if not path.exists(): + print(f"\n[SKIP] {filename} not found") + continue + print(f"\n[EPUB] Extracting: {book_name} ({filename})") + sentences = extract_sentences_from_epub(path, book_name) + book_counts[book_name] = len(sentences) + all_sentences.extend(sentences) + print(f" -> {len(sentences)} sentences") + + for filename, book_name in PDF_BOOKS.items(): + path = EPUB_DIR / filename + if not path.exists(): + print(f"\n[SKIP] {filename} not found") + continue + print(f"\n[PDF] Extracting: {book_name} ({filename})") + sentences = extract_sentences_from_pdf(path, book_name) + book_counts[book_name] = len(sentences) + 
all_sentences.extend(sentences) + print(f" -> {len(sentences)} sentences") + + print(f"\nTotal sentences: {len(all_sentences)}") + + # Step 2: Save sentence index + index_path = DATA_DIR / "epub_sentence_index.json" + with open(index_path, "w", encoding="utf-8") as f: + json.dump({"sentences": all_sentences}, f, ensure_ascii=False, indent=2) + print(f"\nSaved sentence index: {index_path}") + + # Step 3: Load vocab and match + print(f"\nLoading vocab from {DICT_CSV} ...") + words_by_stripped = load_vocab(DICT_CSV) + total_vocab = len({w for wlist in words_by_stripped.values() for w in wlist}) + print(f" {total_vocab} unique vocab words ({len(words_by_stripped)} lookup forms)") + + print("\nMatching sentences against vocab ...") + examples_cache = match_sentences(all_sentences, words_by_stripped) + + # Step 4: Save examples_cache + cache_path = DATA_DIR / "examples_cache.json" + with open(cache_path, "w", encoding="utf-8") as f: + json.dump(examples_cache, f, ensure_ascii=False, indent=2) + print(f"Saved examples cache: {cache_path}") + + # Step 5: Summary stats + print("\n" + "=" * 60) + print("SUMMARY") + print("=" * 60) + print("\nSentences per book:") + for book_name, count in book_counts.items(): + print(f" {book_name}: {count}") + print(f" Total: {len(all_sentences)}") + + print("\nVocab matching:") + print(f" Total vocab words: {total_vocab}") + print(f" Words with examples: {len(examples_cache)}") + coverage = 100 * len(examples_cache) / total_vocab if total_vocab else 0 + print(f" Coverage: {coverage:.1f}%") + + # Show some sample matches + print("\nSample matches:") + count = 0 + for word, sents in examples_cache.items(): + if count >= 5: + break + print(f" {word} -> {sents[0][:60]}...") + count += 1 + + return examples_cache + + +if __name__ == "__main__": + main() diff --git a/rebuild_sentence_matches.py b/rebuild_sentence_matches.py new file mode 100644 index 0000000..1d8b1cb --- /dev/null +++ b/rebuild_sentence_matches.py @@ -0,0 +1,183 @@ 
#!/usr/bin/env python3
"""
Rebuild vocab_sentence_matches.json using both direct word matching
and ktiv male conjugated/declined form matching.

This dramatically improves sentence coverage by matching not just
dictionary forms but all conjugated verbs and declined nouns.
"""

import json
import logging
import re
from pathlib import Path

import pandas as pd

from helpers import strip_nikkud as _strip_nikkud

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

DATA_DIR = Path(__file__).parent / "data"

# Sentence length bounds, in tokens
_MIN_TOKENS = 4
_MAX_TOKENS = 15
# Number of example sentences kept per vocab word
_MAX_SENTENCES_PER_WORD = 3
# Punctuation removed from tokens before lookup (includes maqaf, U+05BE)
_TOKEN_PUNCT = re.compile(r'[.,!?;:"\'\u05be]')


def _cell_text(value) -> str:
    """Return a CSV cell as stripped text; missing values become ``""``."""
    if value is None or pd.isna(value):
        return ""
    text = str(value).strip()
    return "" if text in ("nan", "None") else text


def _length_score(sentence: dict) -> int:
    """Rank a matched sentence: 0 for 6-12 words, else distance from 9."""
    wc = sentence["word_count"]
    if 6 <= wc <= 12:
        return 0  # ideal
    return abs(wc - 9)  # distance from ideal


def main():
    """Rebuild data/vocab_sentence_matches.json from the sentence index.

    Reads epub_sentence_index.json, hebrew_dict_for_anki.csv and (when
    present) ktiv_male_forms.json from DATA_DIR, then writes
    {word_without_nikkud: {"word_nikkud": ..., "sentences": [...]}} with up
    to three sentences per word, and logs coverage statistics.
    """
    # Load sentences.  The encoding is explicit because the corpus is Hebrew
    # and the platform default is not UTF-8 everywhere.
    with open(DATA_DIR / "epub_sentence_index.json", encoding="utf-8") as f:
        sentences = json.load(f).get("sentences", [])
    logger.info(f"Loaded {len(sentences)} sentences")

    # Load vocab CSV: try ";" first, fall back to the default "," delimiter
    # when that yields too few columns or fails to parse.
    csv_path = DATA_DIR / "hebrew_dict_for_anki.csv"
    try:
        df = pd.read_csv(csv_path, sep=";", index_col=0)
        if df.shape[1] < 3:
            raise ValueError
    except (ValueError, pd.errors.ParserError):
        df = pd.read_csv(csv_path, index_col=0)
    logger.info(f"Loaded {len(df)} vocab entries")

    # Build word lookup: stripped_form → (word_nikkud, word_no_nikkud)
    word_lookup: dict[str, list[tuple[str, str]]] = {}
    for _, row in df.iterrows():
        word = _cell_text(row.get("Word", ""))
        if not word:
            continue
        stripped = _strip_nikkud(word)
        if not stripped:
            continue
        # A missing "Word Without Nikkud" falls back to the stripped word
        # rather than becoming the literal output key "nan".
        wni = _cell_text(row.get("Word Without Nikkud", "")) or stripped
        word_lookup.setdefault(stripped, []).append((word, wni))

    # Load ktiv male forms: ktiv_male_form → [{word_nikkud, form_type, ...}]
    ktiv_path = DATA_DIR / "ktiv_male_forms.json"
    ktiv_forms: dict[str, list[dict]] = {}
    if ktiv_path.exists():
        with open(ktiv_path, encoding="utf-8") as f:
            ktiv_forms = json.load(f)
        logger.info(f"Loaded {len(ktiv_forms)} ktiv male forms")
    else:
        logger.warning("No ktiv_male_forms.json — only using direct matching")

    # Build reverse lookup: matchable form → set of dictionary words (nikkud)
    ktiv_to_word: dict[str, set[str]] = {}
    for ktiv, entries in ktiv_forms.items():
        for entry in entries:
            word_nikkud = entry.get("word_nikkud", "")
            if word_nikkud:
                ktiv_to_word.setdefault(ktiv, set()).add(word_nikkud)

    # Also add all vocab words' own stripped forms
    for stripped, entries in word_lookup.items():
        for word_nikkud, _ in entries:
            ktiv_to_word.setdefault(stripped, set()).add(word_nikkud)

    logger.info(f"Total matchable forms: {len(ktiv_to_word)}")

    # Match: word_nikkud → [sentence_info]
    matches: dict[str, list[dict]] = {}
    for sent in sentences:
        text = sent.get("text", "")
        stripped_text = sent.get("stripped")
        if stripped_text is None:
            stripped_text = _strip_nikkud(text)
        tokens = [_TOKEN_PUNCT.sub("", t) for t in stripped_text.split()]
        tokens = [t for t in tokens if t]  # remove empty

        # Skip sentences that are too short or too long
        if not _MIN_TOKENS <= len(tokens) <= _MAX_TOKENS:
            continue

        book = sent.get("book", "")
        for tok in tokens:
            # Sorted so the output key order is reproducible across runs.
            for word_nikkud in sorted(ktiv_to_word.get(tok, ())):
                matches.setdefault(word_nikkud, []).append(
                    {
                        "text": text,
                        "book": book,
                        "matched_form": tok,
                        "word_count": len(tokens),
                    }
                )

    logger.info(f"Words with at least 1 match: {len(matches)}")

    # Deduplicate and keep the best sentences per word (6-12 words ideal)
    output: dict[str, dict] = {}
    # Forms that produced the kept sentences, per output key.  Needed for the
    # breakdown below because the saved sentences omit "matched_form".
    kept_forms: dict[str, set[str]] = {}
    for word_nikkud, sents in matches.items():
        seen_texts = set()
        unique = []
        for s in sents:
            if s["text"] not in seen_texts:
                seen_texts.add(s["text"])
                unique.append(s)

        unique.sort(key=_length_score)
        best = unique[:_MAX_SENTENCES_PER_WORD]

        # Find the Word Without Nikkud for this word
        stripped = _strip_nikkud(word_nikkud)
        wni = stripped  # default
        for wn, w_wni in word_lookup.get(stripped, ()):
            if wn == word_nikkud:
                wni = w_wni
                break

        # NOTE(review): distinct words sharing the same nikkud-free spelling
        # overwrite each other here; left as-is to keep the output schema.
        output[wni] = {
            "word_nikkud": word_nikkud,
            "sentences": [{"text": s["text"], "book": s["book"]} for s in best],
        }
        kept_forms[wni] = {s["matched_form"] for s in best}

    # Save
    out_path = DATA_DIR / "vocab_sentence_matches.json"
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(output, f, ensure_ascii=False, indent=1)

    total_sents = sum(len(v["sentences"]) for v in output.values())
    logger.info(f"Saved {len(output)} words with {total_sents} sentences → {out_path}")

    # Stats
    total_vocab = len(df)
    pct = len(output) * 100 / total_vocab if total_vocab else 0.0
    logger.info(f"Coverage: {len(output)}/{total_vocab} ({pct:.1f}%)")

    # Breakdown by match type: "direct" means a kept sentence contains the
    # dictionary form itself, "ktiv" that it contains some other form.
    direct_only = 0
    ktiv_only = 0
    both = 0
    for wni, info in output.items():
        stripped = _strip_nikkud(info["word_nikkud"])
        forms = kept_forms[wni]
        has_direct = stripped in forms
        has_ktiv = any(form != stripped for form in forms)
        if has_direct and has_ktiv:
            both += 1
        elif has_ktiv:
            ktiv_only += 1
        else:
            direct_only += 1

    logger.info(f" Direct matches only: {direct_only}")
    logger.info(f" Ktiv male matches only: {ktiv_only}")
    logger.info(f" Both: {both}")


if __name__ == "__main__":
    main()