Exposes _try_strip_prefix under a public name so the upcoming sentence_difficulty module can reuse Hebrew prefix stripping logic without duplicating it. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
898 lines
31 KiB
Python
898 lines
31 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Extract example sentences from nikud'd Hebrew EPUB files, match them against
|
||
the vocabulary list in data/words.json, and write matched examples back into
|
||
words.json.
|
||
|
||
Usage (standalone):
|
||
python3 epub_examples.py
|
||
|
||
Called from run.py via:
|
||
run(words) — words dict is passed in and updated in place
|
||
"""
|
||
|
||
import logging
|
||
import os
|
||
import re
|
||
import zipfile
|
||
from html.parser import HTMLParser
|
||
from pathlib import Path
|
||
|
||
from helpers import strip_nikkud
|
||
|
||
logger = logging.getLogger(__name__)

# All inputs and outputs live in the "data" folder next to this module.
DATA_DIR = Path(__file__).parent.joinpath("data")
EPUB_DIR = DATA_DIR.joinpath("epubs")  # source books (.epub / .txt)
WORDS_JSON = DATA_DIR.joinpath("words.json")  # vocabulary file
|
||
|
||
|
||
# Book metadata: filename -> display name
|
||
def _discover_epubs() -> dict[str, str]:
    """Auto-discover all .epub and .txt files in EPUB_DIR, returning {filepath: display_name}.

    EPUB display names are derived from the file stem: recognised titles
    (Alice, Little Prince, Time Tunnel) map to fixed identifiers, anything
    else falls back to the first 40 characters of the nikkud-stripped,
    lower-cased stem.  Plain-text files use their stem unchanged.

    Returns:
        Mapping of file path (as a string) to display name; empty when
        EPUB_DIR does not exist.  EPUB entries come first, then .txt entries,
        each group in sorted path order.
    """

    def _epub_display_name(stem: str) -> str:
        # Derive a brief English display name from the filename.
        flat = strip_nikkud(stem).lower()
        # Only the part before the first " -- " separator is treated as the title.
        title = strip_nikkud(stem.split(" -- ")[0]).strip().lower()

        if "alice" in flat or "אליס" in title:
            return "alice_wonderland"
        if "little_prince" in flat or "נסיך" in title:
            return "little_prince"
        if "מנהרת" in title or "time_tunnel" in flat:
            # Prefer the first number in the stem as the volume id.
            digits = re.search(r"(\d+)", flat)
            suffix = digits.group(1) if digits else flat.replace("time_tunnel_", "")
            return f"time_tunnel_{suffix}"
        return flat[:40]

    if not EPUB_DIR.exists():
        return {}

    discovered: dict[str, str] = {
        str(epub): _epub_display_name(epub.stem) for epub in sorted(EPUB_DIR.glob("*.epub"))
    }
    # Also discover plain-text files (e.g. Ben Yehuda downloads)
    discovered.update((str(txt), txt.stem) for txt in sorted(EPUB_DIR.glob("*.txt")))
    return discovered
|
||
|
||
|
||
# Inclusive bounds on the number of Hebrew words a sentence may contain
# for it to be kept as an example candidate.
MIN_WORDS, MAX_WORDS = 3, 15
|
||
|
||
|
||
# ── HTML text extraction ─────────────────────────────────────────
|
||
|
||
|
||
class _TextExtractor(HTMLParser):
    """Extract text content from HTML, skipping script/style tags.

    Text inside any ``SKIP_TAGS`` element is dropped; every other text node
    is collected in document order.  A newline is emitted whenever a
    block-level element opens so that adjacent blocks do not run together.
    """

    SKIP_TAGS = {"script", "style", "head"}

    # Elements whose start tag inserts a newline into the output.
    BLOCK_TAGS = frozenset(
        {
            "p",
            "div",
            "br",
            "li",
            "h1",
            "h2",
            "h3",
            "h4",
            "h5",
            "h6",
            "td",
            "th",
            "tr",
            "blockquote",
            "section",
        }
    )

    def __init__(self):
        super().__init__()
        self.parts: list[str] = []
        self._skip_depth = 0  # > 0 while inside a SKIP_TAGS element

    def handle_starttag(self, tag, attrs):
        del attrs  # required by HTMLParser interface, unused here
        if tag in self.SKIP_TAGS:
            self._skip_depth += 1
        # Insert newline for block-level elements to avoid word concatenation
        if tag in self.BLOCK_TAGS:
            self.parts.append("\n")

    def handle_endtag(self, tag):
        if tag not in self.SKIP_TAGS:
            return
        # Clamp at zero so a stray closing tag cannot drive the depth negative.
        if self._skip_depth > 0:
            self._skip_depth -= 1

    def handle_data(self, data):
        if self._skip_depth:
            return  # inside a skipped element
        self.parts.append(data)

    def get_text(self) -> str:
        """Return everything collected so far as one string."""
        return "".join(self.parts)
|
||
|
||
|
||
def extract_text_from_html(html: str) -> str:
    """Parse HTML and return plain text.

    Args:
        html: Complete HTML/XHTML document or fragment.

    Returns:
        The concatenated text collected by ``_TextExtractor``.
    """
    parser = _TextExtractor()
    parser.feed(html)
    # feed() may hold back trailing input it considers incomplete (for
    # example a final "&amp" with no terminator); close() flushes that
    # buffered data so nothing at the end of the document is lost.
    parser.close()
    return parser.get_text()
|
||
|
||
|
||
# ── EPUB processing ──────────────────────────────────────────────
|
||
|
||
|
||
def _content_files_from_epub(zf: zipfile.ZipFile) -> list[str]:
    """Get ordered list of content XHTML files from the OPF manifest.

    The first ``.opf`` member is parsed (with regular expressions) for its
    manifest and spine; spine items whose href ends in ``.xhtml``/``.html``
    and resolve to a real archive member are returned in spine order.

    When the archive has no OPF, or no spine item can be resolved, the
    function falls back to every ``.xhtml``/``.html`` member whose name does
    not contain "toc", "cover" or "nav", sorted by name.

    Args:
        zf: An open EPUB archive.

    Returns:
        Archive member names of the content documents.
    """
    import posixpath  # noqa: PLC0415 — ZIP member names always use "/"
    from urllib.parse import unquote  # noqa: PLC0415

    names = zf.namelist()
    members = set(names)  # O(1) membership instead of rescanning the list

    def _fallback() -> list[str]:
        # Shared by both fallback paths so they apply the same exclusions.
        return sorted(
            n
            for n in names
            if n.endswith((".xhtml", ".html"))
            and not any(marker in n.lower() for marker in ("toc", "cover", "nav"))
        )

    opf_path = next((n for n in names if n.endswith(".opf")), None)
    if not opf_path:
        return _fallback()

    # Parse OPF to get spine order
    opf_content = zf.read(opf_path).decode("utf-8")
    opf_dir = posixpath.dirname(opf_path)

    # Extract manifest items: id -> href (both attribute orders)
    manifest: dict[str, str] = {}
    for m in re.finditer(r'<item\s+[^>]*id="([^"]+)"[^>]*href="([^"]+)"', opf_content):
        manifest[m.group(1)] = m.group(2)
    for m in re.finditer(r'<item\s+[^>]*href="([^"]+)"[^>]*id="([^"]+)"', opf_content):
        manifest[m.group(2)] = m.group(1)

    # Extract spine order
    spine_ids = re.findall(r'<itemref\s+[^>]*idref="([^"]+)"', opf_content)

    result: list[str] = []
    for sid in spine_ids:
        href = manifest.get(sid, "")
        if not href.endswith((".xhtml", ".html")):
            continue
        raw = posixpath.join(opf_dir, href) if opf_dir else href
        raw = raw.replace("\\", "/")  # normalize path separators
        # Try the literal path first, then with "./" and "../" collapsed,
        # then additionally percent-decoded (hrefs are URL references).
        candidates = (
            raw,
            posixpath.normpath(raw),
            posixpath.normpath(unquote(raw)),
        )
        resolved = next((c for c in candidates if c in members), None)
        if resolved is not None:
            result.append(resolved)

    return result or _fallback()
|
||
|
||
|
||
def extract_sentences_from_epub(epub_path: Path, book_name: str) -> list[dict]:
    """Extract sentences from an EPUB file.

    Args:
        epub_path: Path to the .epub file.
        book_name: Human-readable book name used as the ``source`` field.

    Returns:
        List of ``{"text": str, "source": str}`` dicts.
    """
    all_text: list[str] = []

    # The context manager closes the archive handle even if reading fails;
    # previously the ZipFile was opened and never closed.
    with zipfile.ZipFile(epub_path) as zf:
        for cf in _content_files_from_epub(zf):
            try:
                html = zf.read(cf).decode("utf-8")
            except (KeyError, UnicodeDecodeError):
                # Best effort: skip members that are missing or not valid UTF-8.
                continue
            all_text.append(extract_text_from_html(html))

    full_text = "\n".join(all_text)
    return _split_into_sentences(full_text, book_name)
|
||
|
||
|
||
def extract_sentences_from_text(text_path: Path, book_name: str) -> list[dict]:
    """Extract sentences from a plain-text file (e.g. Ben Yehuda downloads).

    The file is read as UTF-8 in one go and handed to the sentence splitter.

    Args:
        text_path: Path to the .txt file.
        book_name: Human-readable book name used as the ``source`` field.

    Returns:
        List of ``{"text": str, "source": str}`` dicts.
    """
    with text_path.open(encoding="utf-8") as handle:
        contents = handle.read()
    return _split_into_sentences(contents, book_name)
|
||
|
||
|
||
# ── Sentence splitting ───────────────────────────────────────────
|
||
|
||
# Sentence terminators: one or more of period, exclamation mark, question
# mark, or sof pasuq (U+05C3).
_SENT_SPLIT = re.compile(r"[.!?\u05C3]+")

# Run of quote marks, geresh/gershayim, commas, dashes, brackets and similar
# characters that may hug a word.
_EDGE_PUNCT_CLASS = (
    r'[\u0022\u0027\u05F4\u05F3,;:\-\u2013\u2014\u2026\u201C\u201D\u201E\u201F\u2018\u2019()\[\]{}«»"\']+'
)

# Punctuation to strip from word boundaries when matching: the run above,
# anchored at the start or at the end of the token.
_PUNCT = re.compile(f"^{_EDGE_PUNCT_CLASS}|{_EDGE_PUNCT_CLASS}$")
|
||
|
||
|
||
def _split_into_sentences(text: str, book_name: str) -> list[dict]:
    """Split text into Hebrew sentences and filter by word count.

    Whitespace is collapsed, the text is cut at sentence terminators (which
    are not kept), and only sentences containing between ``MIN_WORDS`` and
    ``MAX_WORDS`` Hebrew words (inclusive) survive.

    Args:
        text: Raw extracted text (EPUB chapters or a plain-text file).
        book_name: Source label for each sentence dict.

    Returns:
        List of ``{"text": str, "source": str}`` dicts, deduplicated by exact
        text, in order of first appearance.
    """

    def _hebrew_word_count(sentence: str) -> int:
        # A token counts as Hebrew when any character lies in U+0590–U+05FF;
        # pure numbers and other non-Hebrew tokens are ignored.
        return sum(
            1
            for token in sentence.split()
            if any("\u0590" <= ch <= "\u05ff" for ch in token)
        )

    # Normalize whitespace
    collapsed = re.sub(r"\s+", " ", text)
    collapsed = collapsed.strip()

    # Insertion-ordered dict doubles as the "seen" set (keyed by exact text).
    kept: dict[str, dict] = {}
    for fragment in _SENT_SPLIT.split(collapsed):
        candidate = fragment.strip()
        if not candidate or candidate in kept:
            continue
        if MIN_WORDS <= _hebrew_word_count(candidate) <= MAX_WORDS:
            kept[candidate] = {"text": candidate, "source": book_name}

    return list(kept.values())
|
||
|
||
|
||
# ── Nikkud index ─────────────────────────────────────────────────
|
||
|
||
# Unicode ranges for Hebrew combining marks
_NIKKUD_LOW = 0x05B0  # start of vowel points (shva)
_NIKKUD_HIGH = 0x05BD  # end of vowel range (meteg); 0x05BE is maqaf (punctuation)
_DAGESH = "\u05bc"
_SHIN_DOT = "\u05c1"
_SIN_DOT = "\u05c2"

# Valid prefix consonants
_PREFIX_CONSONANTS = set("בהוכלמש")

# Named vowel combining marks
_SHVA = "\u05b0"
_HIRIQ = "\u05b4"
_TSERE = "\u05b5"
_SEGOL = "\u05b6"
_PATACH = "\u05b7"
_QAMATZ = "\u05b8"

# Valid nikkud patterns on each prefix consonant.
# Key = consonant, Value = set of frozensets of combining marks valid for that prefix.
_VALID_PREFIX_MARKS: dict[str, set[frozenset]] = {
    "ב": {
        frozenset({_SHVA, _DAGESH}),  # בְּ standard
        frozenset({_HIRIQ, _DAGESH}),  # בִּ before shva
        frozenset({_PATACH, _DAGESH}),  # בַּ with definite article
        frozenset({_QAMATZ, _DAGESH}),  # בָּ before chataf qamatz
        frozenset({_SEGOL, _DAGESH}),  # בֶּ before chataf segol
    },
    "כ": {
        frozenset({_SHVA, _DAGESH}),  # כְּ
        frozenset({_HIRIQ, _DAGESH}),  # כִּ
        frozenset({_PATACH, _DAGESH}),  # כַּ
        frozenset({_QAMATZ, _DAGESH}),  # כָּ
        frozenset({_SEGOL, _DAGESH}),  # כֶּ
    },
    "ל": {
        frozenset({_SHVA}),  # לְ standard
        frozenset({_HIRIQ}),  # לִ before shva
        frozenset({_PATACH}),  # לַ with definite article
        frozenset({_QAMATZ}),  # לָ demonstratives
        frozenset({_SEGOL}),  # לֶ before chataf segol
    },
    "ו": {
        frozenset({_SHVA}),  # וְ standard
        frozenset({_DAGESH}),  # וּ (shureq) before shva or the labials ב/ו/מ/פ
        frozenset({_PATACH}),  # וַ before chataf patach
        frozenset({_QAMATZ}),  # וָ before chataf qamatz
        frozenset({_SEGOL}),  # וֶ before chataf segol
        frozenset({_HIRIQ}),  # וִ before yud-shva
    },
    "מ": {
        frozenset({_HIRIQ}),  # מִ standard
        frozenset({_TSERE}),  # מֵ before gutturals
    },
    "ש": {
        # The prefix שֶׁ is shin + segol (+ shin dot); the dagesh it triggers
        # sits on the FOLLOWING letter, not on the shin itself.  The table
        # previously listed only dagesh-bearing sets, so the ordinary
        # spelling was never recognised as a prefix.
        frozenset({_SEGOL}),  # segol only (shin dot omitted in the text)
        frozenset({_SEGOL, _SHIN_DOT}),  # שֶׁ standard
        # Dagesh-bearing variants retained for backward compatibility.
        frozenset({_SEGOL, _DAGESH}),
        frozenset({_SEGOL, _DAGESH, _SHIN_DOT}),
    },
    "ה": {
        frozenset({_PATACH}),  # הַ standard definite article
        frozenset({_QAMATZ}),  # הָ before gutturals
        frozenset({_SEGOL}),  # הֶ before qamatz-bearing gutturals
    },
}
|
||
|
||
|
||
def _is_combining_mark(ch: str) -> bool:
    """Return True if ch is a Hebrew combining mark (nikkud, dagesh, or dots).

    Args:
        ch: A single character.

    Returns:
        True when the code point lies in the ``_NIKKUD_LOW``–``_NIKKUD_HIGH``
        range or the character is the dagesh, shin dot or sin dot.
    """
    in_vowel_range = _NIKKUD_LOW <= ord(ch) <= _NIKKUD_HIGH
    return in_vowel_range or ch in {_DAGESH, _SHIN_DOT, _SIN_DOT}
|
||
|
||
|
||
def _decompose_first_char(token: str) -> tuple[str, frozenset, str]:
    """Split token into (first_consonant, its_combining_marks, remainder).

    Args:
        token: A nikkud Hebrew token string.

    Returns:
        A tuple of (consonant, marks, rest).  Returns ("", frozenset(), token)
        if the token is empty or does not start with a Hebrew consonant
        (alef–tav range).
    """
    # Guard: empty token, or first character outside alef (U+05D0)–tav (U+05EA).
    if not token or not ("\u05d0" <= token[0] <= "\u05ea"):
        return ("", frozenset(), token)

    # Advance past every combining mark attached to the leading consonant.
    split_at = 1
    for ch in token[1:]:
        if not _is_combining_mark(ch):
            break
        split_at += 1

    return (token[0], frozenset(token[1:split_at]), token[split_at:])
|
||
|
||
|
||
def _is_valid_prefix(consonant: str, marks: frozenset) -> bool:
    """Check if consonant + marks form a valid Hebrew prefix combination.

    Args:
        consonant: The prefix consonant character.
        marks: Frozenset of combining mark characters on that consonant.

    Returns:
        True if this is a recognised Hebrew prefix vocalization according to
        ``_VALID_PREFIX_MARKS``; False for consonants that have no entry.
    """
    allowed = _VALID_PREFIX_MARKS.get(consonant)
    if not allowed:
        return False
    if marks in allowed:
        return True
    # For ש, allow shin dot to be present or absent
    return consonant == "ש" and (marks - {_SHIN_DOT}) in allowed
|
||
|
||
|
||
def _rebuild_token(consonant: str, marks: frozenset, rest: str) -> str:
    """Reassemble a token from its decomposed parts, sorting marks by codepoint.

    Args:
        consonant: Leading consonant.
        marks: Combining marks that belong to the consonant.
        rest: Remainder of the token following those marks.

    Returns:
        ``consonant`` + marks in ascending code-point order + ``rest``.
    """
    ordered_marks = sorted(marks)
    return "".join([consonant, *ordered_marks, rest])
|
||
|
||
|
||
def _try_strip_prefix(token: str, nikkud_index: dict) -> list[tuple[str, str, str]]:
    """Try stripping 1 or 2 prefix letters from a nikkud token.

    After removing a valid one-letter prefix the remainder is looked up in
    the index, both as-is and with the dagesh removed from its first letter
    (absorbed definite article: לַמֶּלֶךְ → מֶּלֶךְ → מֶלֶךְ).  When the first
    prefix is ו or ש and the next letter is itself a valid prefix, the same
    two lookups are repeated on what follows that second prefix.

    Args:
        token: A cleaned nikkud word token.
        nikkud_index: Mapping from nikkud form to list of (unique_key, match_type).

    Returns:
        List of (unique_key, match_type, matched_remainder) for each hit found.
        The match_type will have ``"_prefix"`` appended to the base type.
    """
    hits: list[tuple[str, str, str]] = []

    def _collect(remainder: str) -> None:
        # Direct match on the remainder.
        if remainder in nikkud_index:
            hits.extend(
                (unique_key, match_type + "_prefix", remainder)
                for unique_key, match_type in nikkud_index[remainder]
            )

        # Same lookup with the dagesh dropped from the remainder's first letter.
        consonant, marks, tail = _decompose_first_char(remainder)
        if not consonant or _DAGESH not in marks:
            return
        softened = _rebuild_token(consonant, marks - {_DAGESH}, tail)
        if softened != remainder and softened in nikkud_index:
            hits.extend(
                (unique_key, match_type + "_prefix", softened)
                for unique_key, match_type in nikkud_index[softened]
            )

    # 1-letter prefix: must be a valid prefix and leave something behind.
    lead, lead_marks, after_one = _decompose_first_char(token)
    if not lead or not _is_valid_prefix(lead, lead_marks) or not after_one:
        return hits
    _collect(after_one)

    # 2-letter prefix (ו and ש commonly stack with another prefix)
    if lead in "וש":
        second, second_marks, after_two = _decompose_first_char(after_one)
        if (
            second
            and second in _PREFIX_CONSONANTS
            and _is_valid_prefix(second, second_marks)
            and after_two
        ):
            _collect(after_two)

    return hits


# Public alias for use by sentence_difficulty module
try_strip_prefix = _try_strip_prefix
|
||
|
||
|
||
def _build_nikkud_index(words: dict) -> dict[str, list[tuple[str, str]]]:
    """Build a mapping from nikkud form to list of (unique_key, match_type).

    Indexes the following sources per entry:

    - ``word.nikkud`` → "direct"
    - conjugation ``active_forms`` / ``hufal_pual_forms`` → "conjugated"
    - conjugation infinitive and reference_form → "conjugated"
    - noun inflection singular/plural/construct/pronominal → "inflected"

    Missing or empty fields are skipped.

    Args:
        words: The full words.json dict keyed by unique_key.

    Returns:
        Dict mapping each nikkud form to a list of (unique_key, match_type) tuples.
    """

    def _nikkud(node):
        # Tolerate absent / null sub-objects.
        return (node or {}).get("nikkud")

    def _entry_forms(entry):
        """Yield (form, match_type) pairs for one entry, in indexing order."""
        # Direct word form
        yield _nikkud(entry.get("word")), "direct"

        # Conjugation forms
        conj = entry.get("conjugation") or {}
        for list_name in ("active_forms", "hufal_pual_forms"):
            for item in conj.get(list_name) or []:
                yield _nikkud(item.get("form")), "conjugated"
        for single_name in ("infinitive", "reference_form"):
            yield _nikkud(conj.get(single_name)), "conjugated"

        # Noun inflection forms
        noun = entry.get("noun_inflection") or {}
        for field in ("singular", "plural", "construct_singular", "construct_plural"):
            form = _nikkud(noun.get(field))
            yield form, "inflected"
            # Index construct forms without maqaf too — modern text often
            # writes smichut as two space-separated words without maqaf
            if form and form.endswith("־"):
                yield form[:-1], "inflected"

        for sub in (noun.get("pronominal_suffixes") or {}).values():
            if isinstance(sub, dict):
                yield sub.get("nikkud"), "inflected"

    index: dict[str, list[tuple[str, str]]] = {}
    for unique_key, entry in words.items():
        for form, match_type in _entry_forms(entry):
            if form:
                index.setdefault(form, []).append((unique_key, match_type))
    return index
|
||
|
||
|
||
def _filter_collision_forms(nikkud_index: dict) -> dict:
    """Remove colliding forms for entries that have other unique forms.

    A "colliding form" maps to 2+ unique_keys.  A unique_key that also owns
    at least one non-colliding form is dropped from every colliding form's
    entry list; a unique_key whose indexed forms *all* collide keeps them
    (otherwise the entry would get zero matches).  A colliding form left
    with no entries is omitted from the result.

    Args:
        nikkud_index: Mapping from nikkud form to list of (unique_key, match_type).

    Returns:
        A new index dict with the same structure.
    """
    # Reverse map: unique_key -> every form it is indexed under.
    forms_by_key: dict[str, set[str]] = {}
    for form, entries in nikkud_index.items():
        for uk, _ in entries:
            forms_by_key.setdefault(uk, set()).add(form)

    # Forms shared by two or more distinct keys.
    collision_forms: set[str] = {
        form
        for form, entries in nikkud_index.items()
        if len({uk for uk, _ in entries}) >= 2
    }

    # Keys owning at least one form that is not a collision.
    keys_with_unique_forms: set[str] = {
        uk for uk, forms in forms_by_key.items() if not forms <= collision_forms
    }

    filtered: dict[str, list[tuple[str, str]]] = {}
    removed = 0
    for form, entries in nikkud_index.items():
        if form not in collision_forms:
            filtered[form] = entries
            continue
        survivors = [(uk, mt) for uk, mt in entries if uk not in keys_with_unique_forms]
        removed += len(entries) - len(survivors)
        if survivors:
            filtered[form] = survivors

    logger.info(f" Filtered {removed} collision mappings from entries with unique forms")
    return filtered
|
||
|
||
|
||
# ── Matching ─────────────────────────────────────────────────────
|
||
|
||
|
||
def match_sentences(
    sentences: list[dict],
    nikkud_index: dict,
    confusable_keys: set[str],
) -> dict:
    """Match sentences to vocab words using the nikkud index.

    Each whitespace-separated token is stripped of edge punctuation and
    looked up directly; prefix stripping is attempted only when the cleaned
    token has no direct index entry.

    Args:
        sentences: List of ``{"text": str, "source": str}`` dicts.
        nikkud_index: Output of ``_build_nikkud_index``.
        confusable_keys: Set of unique_keys that are in confusable groups
            (accepted for interface compatibility; not consulted here).

    Returns:
        Dict mapping unique_key → list of match dicts, each containing:
        ``text``, ``source``, ``match_method``, ``word_count``,
        ``matched_form``, ``char_offset``, ``char_end``.
    """
    matches: dict[str, list[dict]] = {}

    for sent_info in sentences:
        text, source = sent_info["text"], sent_info["source"]
        tokens = text.split()
        word_count = len(tokens)

        cursor = 0  # search position so repeated words resolve left to right
        for raw_word in tokens:
            cleaned = _PUNCT.sub("", raw_word)
            located = text.find(raw_word, cursor)

            if not cleaned:
                # Pure punctuation: just advance past it.
                if located >= 0:
                    cursor = located + len(raw_word)
                continue

            # Locate positions within the sentence
            word_start = located if located >= 0 else cursor
            clean_start = word_start + max(raw_word.find(cleaned), 0)
            clean_end = clean_start + len(cleaned)

            if cleaned in nikkud_index:
                # Direct nikkud match
                found = [(uk, mt) for uk, mt in nikkud_index[cleaned]]
            else:
                # Prefix stripping — only if no direct match exists
                found = [
                    (uk, mt)
                    for uk, mt, _remainder in _try_strip_prefix(cleaned, nikkud_index)
                ]

            for unique_key, match_method in found:
                matches.setdefault(unique_key, []).append(
                    {
                        "text": text,
                        "source": source,
                        "match_method": match_method,
                        "word_count": word_count,
                        "matched_form": cleaned,
                        "char_offset": clean_start,
                        "char_end": clean_end,
                    }
                )

            cursor = word_start + len(raw_word)

    return matches
|
||
|
||
|
||
# ── Writing results ──────────────────────────────────────────────
|
||
|
||
|
||
def update_words_json(words: dict, matches: dict, confusable_keys: set[str]) -> int:
    """Update words dict entries with matched example sentences.

    Selects up to 3 best sentences per word (scoring prefers 6–12 word
    sentences; prefix matches are used only when no non-prefix match
    exists).  Also generates a cloze entry for the top match, unless the
    word is in the confusable set, in which case any existing cloze is
    removed.

    Args:
        words: The full words.json dict, modified in place.
        matches: Output of ``match_sentences``.
        confusable_keys: Set of unique_keys in confusable groups.

    Returns:
        Count of words.json entries that were updated.
    """
    import genanki  # noqa: PLC0415 — import only where needed

    def _length_penalty(s: dict) -> int:
        # 0 inside the preferred 6–12 word window, otherwise distance from 9.
        wc = s["word_count"]
        return 0 if 6 <= wc <= 12 else abs(wc - 9)

    updated = 0

    for unique_key, sent_list in matches.items():
        if unique_key not in words:
            continue
        entry = words[unique_key]

        # Deduplicate by sentence text, keeping the first match per text.
        by_text: dict[str, dict] = {}
        for s in sent_list:
            by_text.setdefault(s["text"], s)
        unique = list(by_text.values())

        # Prefer direct matches; only fall back to prefix if none exist
        direct = [s for s in unique if "prefix" not in s["match_method"]]
        prefix_only = [s for s in unique if "prefix" in s["match_method"]]
        pool = direct if direct else prefix_only

        # Stable sort: equally scored sentences keep their encounter order.
        best = sorted(pool, key=_length_penalty)[:3]

        # Build vetted list
        if not entry.get("examples"):
            entry["examples"] = {}
        examples: dict = entry["examples"]
        examples["vetted"] = [
            {
                "text": s["text"],
                "source": s["source"],
                "match_method": s["match_method"],
            }
            for s in best
        ]

        # Build cloze from best sentence (skip confusables)
        if unique_key in confusable_keys:
            examples.pop("cloze", None)
        elif best:
            top = best[0]
            # Preserve existing cloze_guid if sentence text unchanged
            old_cloze = examples.get("cloze") or {}
            cloze_guid = None
            if old_cloze.get("text") == top["text"]:
                cloze_guid = old_cloze.get("cloze_guid")
            # Also covers an existing cloze that has no guid stored; that
            # case previously wrote None back and never repaired it.
            if not cloze_guid:
                cloze_guid = genanki.guid_for("cloze", unique_key)

            examples["cloze"] = {
                "text": top["text"],
                "cloze_word_start": top["char_offset"],
                "cloze_word_end": top["char_end"],
                "cloze_hint": None,
                "cloze_guid": cloze_guid,
            }

        examples["rejected_count"] = 0
        updated += 1

    # Deduplicate shared examples across confusable groups
    cleared = _deduplicate_confusable_examples(words)
    if cleared:
        logger.info(f" Cleared shared examples from {cleared} confusable entries")

    return updated
|
||
|
||
|
||
def _deduplicate_confusable_examples(words: dict) -> int:
    """Remove shared examples from less-common confusable group members.

    Within each confusable group, members whose vetted sentence sets are
    identical (and non-empty) are compared; only the member with the lowest
    ``frequency_rank`` keeps its examples.  The others get an empty
    ``vetted`` list and lose their ``cloze`` entry.

    Args:
        words: The full words.json dict, modified in place (examples already
            assigned).

    Returns:
        Count of entries whose examples were cleared.
    """
    from collections import defaultdict

    def _priority(k: str) -> tuple[int, str]:
        # Lower frequency_rank wins; a missing rank sorts last via a large
        # sentinel.  Ties are broken alphabetically by unique_key.
        rank = words[k].get("frequency_rank")
        return (999999 if rank is None else rank, k)

    # Confusable group map: sorted group contents -> member keys
    members_by_group: dict[tuple[str, ...], list[str]] = defaultdict(list)
    for key, entry in words.items():
        cg = entry.get("confusable_group")
        if cg:
            members_by_group[tuple(sorted(cg))].append(key)

    cleared = 0

    for members in members_by_group.values():
        if len(members) < 2:
            continue

        # Bucket members by their exact set of vetted sentence texts;
        # members without any examples are left out.
        sharing: dict[frozenset[str], list[str]] = defaultdict(list)
        for key in members:
            vetted = (words[key].get("examples") or {}).get("vetted") or []
            texts = frozenset(e.get("text", "") for e in vetted)
            if texts:
                sharing[texts].append(key)

        for sharing_keys in sharing.values():
            if len(sharing_keys) < 2:
                continue

            winner, *losers = sorted(sharing_keys, key=_priority)
            for loser_key in losers:
                entry = words[loser_key]
                examples = entry.get("examples") or {}
                examples["vetted"] = []
                examples.pop("cloze", None)
                entry["examples"] = examples
                cleared += 1
                logger.debug(f" Cleared examples from {loser_key} (kept on {winner})")

    return cleared
|
||
|
||
|
||
# ── Public API ───────────────────────────────────────────────────
|
||
|
||
|
||
def run(words: dict) -> dict:
    """Extract EPUB sentences, match against words, update words dict in place.

    Called from run.py with the already-loaded words.json dict.  Steps:
    discover book files, extract sentences from each, build and filter the
    nikkud index, match sentences to vocabulary, then write the selected
    examples into ``words``.

    Args:
        words: The full words.json dict keyed by unique_key. Modified in place.

    Returns:
        Summary stats dict with keys ``books``, ``matched``, ``total_vocab``.
        When no sentences could be extracted, ``books`` is empty and
        ``matched`` is 0.
    """
    logger.info(" Extracting sentences from EPUBs ...")
    all_sentences: list[dict] = []
    book_counts: dict[str, int] = {}

    for filepath, book_name in _discover_epubs().items():
        path = Path(filepath)
        # Plain-text books bypass the EPUB/HTML machinery.
        extractor = extract_sentences_from_text if path.suffix == ".txt" else extract_sentences_from_epub
        sentences = extractor(path, book_name)
        book_counts[book_name] = len(sentences)
        all_sentences.extend(sentences)
        logger.info(f" {book_name}: {len(sentences)} sentences")

    if not all_sentences:
        logger.warning(" No EPUB files found — skipping example extraction")
        return {"books": {}, "matched": 0, "total_vocab": len(words)}

    logger.info(f" Total sentences: {len(all_sentences)}")

    # Build nikkud index
    logger.info(" Building nikkud index from words.json ...")
    nikkud_index = _build_nikkud_index(words)
    logger.info(f" {len(nikkud_index)} unique nikkud forms indexed")

    # Filter out collision forms for entries that have unique forms
    nikkud_index = _filter_collision_forms(nikkud_index)

    # Build confusable key set
    confusable_keys: set[str] = {
        key for key, entry in words.items() if entry.get("confusable_group")
    }

    # Match sentences
    logger.info(" Matching sentences against vocab ...")
    matches = match_sentences(all_sentences, nikkud_index, confusable_keys)
    logger.info(f" {len(matches)} words matched")

    # Break down by match method
    method_counts: dict[str, int] = {}
    for sent_list in matches.values():
        for hit in sent_list:
            method_counts.setdefault(hit["match_method"], 0)
            method_counts[hit["match_method"]] += 1
    for method, count in sorted(method_counts.items()):
        logger.info(f" {method}: {count} sentence-word pairs")

    # Update words dict in place
    updated = update_words_json(words, matches, confusable_keys)
    logger.info(f" Updated {updated} entries in words.json")

    return {
        "books": book_counts,
        "matched": len(matches),
        "total_vocab": len(words),
    }
|
||
|
||
|
||
# ── Standalone entry point ───────────────────────────────────────
|
||
|
||
if __name__ == "__main__":
    import json

    logging.basicConfig(level=logging.INFO, format="%(message)s")

    # Load the vocabulary, enrich it in place, then write it back.
    words_path = DATA_DIR / "words.json"
    words = json.loads(words_path.read_text(encoding="utf-8"))

    stats = run(words)

    # Save updated words.json
    with words_path.open("w", encoding="utf-8") as out:
        json.dump(words, out, ensure_ascii=False, indent=2)

    # Share of vocabulary entries that received at least one match.
    if stats["total_vocab"]:
        coverage = stats["matched"] * 100 / stats["total_vocab"]
    else:
        coverage = 0
    logger.info(f" Coverage: {stats['matched']}/{stats['total_vocab']} ({coverage:.1f}%)")
|