hebrew_flash_cards/scripts/clean_frequency_corpus.py
Sochen 3b0f9defa9 feat: YAP-cleaned frequency corpus + two-tier assignment pipeline
- Add clean_frequency_corpus.py: YAP morphological analyzer removes
  prefix+word combos (e.g. בבית=ב+בית) from he_50k frequency data.
  Headwords always protected. 30,430 clean entries from 49,999 raw.
- Add assign_frequency.py: two-tier assignment with PoS-aware homograph
  handling. Tier 1 matches headwords; Tier 2 matches inflections (any rank)
  and conjugations (rank>5000 only, to avoid false positives).
  Function words claim frequency over content words in homograph groups,
  with manual overrides for 12 common dual-use words.
- frequency_lookup.py auto-prefers frequency_clean.json when available
- 6,691 entries now have frequency (was 5,974), 717 newly assigned

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 06:22:55 +00:00

400 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Clean the Hebrew frequency corpus by removing prefix+word combinations.
Two modes:
--mode yap (default) Use YAP morphological analyzer for accurate prefix detection.
Requires YAP API running at localhost:8000.
--mode heuristic Use rule-based prefix stripping (no external dependencies).
Both modes preserve words that exist as known dictionary forms in words.json.
Usage:
python3 scripts/clean_frequency_corpus.py # YAP mode
python3 scripts/clean_frequency_corpus.py --mode heuristic # heuristic fallback
python3 scripts/clean_frequency_corpus.py --dry-run # preview only
python3 scripts/clean_frequency_corpus.py --resume # resume YAP from checkpoint
python3 scripts/clean_frequency_corpus.py --limit 1000 # process first N entries
Input: data/frequency_cache.json (raw he_50k.txt, 49999 entries)
Output: data/frequency_clean.json (filtered, prefix combos removed)
data/frequency_discarded.json (discarded entries with reason)
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import sys
import time
from pathlib import Path
import requests
# Module-level logger; main() installs the handler/format via basicConfig.
logger = logging.getLogger(__name__)

# --- Filesystem layout ---
# PROJECT_ROOT is the parent of the directory that contains this script.
PROJECT_ROOT = Path(__file__).parent.parent
RAW_CACHE = PROJECT_ROOT.joinpath("data", "frequency_cache.json")
CLEAN_CACHE = PROJECT_ROOT.joinpath("data", "frequency_clean.json")
DISCARDED = PROJECT_ROOT.joinpath("data", "frequency_discarded.json")
WORDS_JSON = PROJECT_ROOT.joinpath("data", "words.json")
CHECKPOINT = PROJECT_ROOT.joinpath("data", "_yap_checkpoint.json")

# --- YAP service settings ---
# Endpoint can be overridden through the YAP_URL environment variable.
YAP_URL = os.getenv("YAP_URL", "http://localhost:8000/yap/heb/joint")
# Passed as the `timeout` argument of each YAP HTTP request.
YAP_TIMEOUT = 10
# Number of processed words between checkpoint writes / progress lines.
BATCH_SAVE_INTERVAL = 500

# --- YAP mode constants ---
# POS tags that mark a lattice arc as a prefix.
PREFIX_POS = frozenset(("PREPOSITION", "CONJ", "DEF", "REL"))
# POS tags of the host word; prefix arc + one of these means the corpus
# entry is a prefix+word combination rather than a word of its own.
HOST_POS = frozenset(("NN", "NNP", "NNT", "PRP", "CD", "DT", "EX"))

# --- Heuristic mode constants ---
# Hebrew prefix combinations. Order matters: longest first, so the heuristic
# tries the greediest split before shorter ones.
PREFIXES = [
    # 4-char
    "וכשמ", "וכשב", "וכשל", "וכשה",
    # 3-char
    "וכש", "ומה", "ובה", "וכה", "ולה", "ומש", "ובש",
    "וכב", "ולב", "ומב", "וכל", "ולכ", "שבה", "שמה",
    # 2-char
    "כש", "מה", "בה", "כה", "לה", "מש", "בש", "וב",
    "וה", "וכ", "ול", "ומ", "וש", "כב", "לב", "מב",
    "כל", "לכ", "שב", "שה", "שכ", "של", "שמ",
    # 1-char
    "ב", "ה", "ו", "כ", "ל", "מ", "ש",
]
# Shortest remainder (in characters) accepted after stripping a prefix.
MIN_REMAINDER_LEN = 2
def _load_known_forms(words_path: Path) -> set[str]:
    """Collect every ``ktiv_male`` spelling recorded in words.json.

    For each entry the headword (``word``), the items of ``active_forms`` and
    ``hufal_pual_forms``, and the values of the noun/preposition/adjective
    inflection tables are inspected; every truthy ``ktiv_male`` value found is
    added to the result.

    Args:
        words_path: Location of words.json.

    Returns:
        The set of known spellings, or an empty set (with a warning) when the
        file does not exist.
    """
    if not words_path.exists():
        logger.warning("words.json not found at %s — no dictionary filter", words_path)
        return set()

    with open(words_path, encoding="utf-8") as handle:
        lexicon = json.load(handle)

    list_fields = ("active_forms", "hufal_pual_forms")
    table_fields = ("noun_inflection", "preposition_inflection", "adjective_inflection")

    collected: set[str] = set()
    for record in lexicon.values():
        # Headword spelling.
        headword = (record.get("word") or {}).get("ktiv_male")
        if headword:
            collected.add(headword)

        # Gather every nested form record, then scan them in one pass.
        nested: list = []
        for key in list_fields:
            nested.extend(record.get(key) or [])
        for key in table_fields:
            nested.extend((record.get(key) or {}).values())

        for item in nested:
            if not isinstance(item, dict):
                continue
            spelling = item.get("ktiv_male")
            if spelling:
                collected.add(spelling)

    logger.info("Loaded %d known dictionary forms from words.json", len(collected))
    return collected
# ── YAP mode ──────────────────────────────────────────────────────────────
def query_yap(word: str) -> dict | None:
    """Send a single word to YAP and return the decoded JSON object.

    Args:
        word: The surface form to analyze.

    Returns:
        The response body as a dict, or ``None`` when the request fails, the
        server answers with an error status, the body is not valid JSON, or
        the JSON is not an object. Every failure is logged as a warning; no
        exception escapes, so one bad word cannot abort a long run.
    """
    # NOTE(review): YAP's published API examples terminate the input text with
    # two spaces; keep both trailing spaces — confirm against the YAP README.
    payload = {"text": f"{word}  "}
    try:
        resp = requests.post(YAP_URL, json=payload, timeout=YAP_TIMEOUT)
        resp.raise_for_status()
        data = resp.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers JSON decode failures on requests versions whose
        # decode error is not a RequestException subclass.
        logger.warning("YAP request failed for '%s': %s", word, e)
        return None
    if not isinstance(data, dict):
        # Callers use dict access on the result; reject any other JSON shape.
        logger.warning("YAP request failed for '%s': unexpected %s payload", word, type(data).__name__)
        return None
    return data
def is_prefix_combo_yap(yap_response: dict) -> tuple[bool, str]:
    """Report whether the YAP lattice contains a prefix arc followed by a host arc.

    The ``ma_lattice`` value is read as newline-separated rows of tab-separated
    columns; the first six columns are taken as from-node, to-node, form,
    lemma, cpos and pos. Rows with fewer than six columns are ignored.

    Conservative by design: a single analysis anywhere in the lattice showing
    prefix+host is enough to flag the word.

    Returns:
        ``(True, "<prefix>(<cpos>)+<host>(<cpos>)")`` for the first match in
        lattice order, otherwise ``(False, "")``.
    """
    raw_lattice = yap_response.get("ma_lattice", "")
    if not raw_lattice:
        return False, ""

    # One 6-tuple per usable row: (src, dst, form, lemma, cpos, pos).
    edges = []
    for row in raw_lattice.strip().split("\n"):
        if not row.strip():
            continue
        columns = row.split("\t")
        if len(columns) >= 6:
            edges.append(tuple(columns[:6]))

    # A prefix+host split needs at least two arcs.
    if len(edges) < 2:
        return False, ""

    for _src, dst, pre_form, _pre_lemma, pre_cpos, pre_pos in edges:
        is_prefix = pre_cpos in PREFIX_POS or pre_pos in PREFIX_POS
        if not is_prefix:
            continue
        # Look for an arc that starts exactly where the prefix arc ends.
        for nxt_src, _nxt_dst, host_form, _host_lemma, host_cpos, host_pos in edges:
            if nxt_src == dst and (host_cpos in HOST_POS or host_pos in HOST_POS):
                return True, f"{pre_form}({pre_cpos})+{host_form}({host_cpos})"
    return False, ""
# ── Heuristic mode ────────────────────────────────────────────────────────
def find_prefix_decomposition(
    word: str,
    freq: dict[str, int],
    prefixes: list[str] | tuple[str, ...] | None = None,
    min_remainder_len: int | None = None,
) -> tuple[str, str] | None:
    """Check if word is a prefix+higher-ranked-word combo (heuristic).

    Prefixes are tried in the order given; the first one whose remainder is
    long enough, present in ``freq`` and ranked better (numerically lower)
    than ``word`` itself wins.

    Args:
        word: Corpus entry to test.
        freq: Mapping of word to rank (lower number = more frequent).
        prefixes: Candidate prefixes, expected longest-first for greedy
            matching. Defaults to the module-level ``PREFIXES``.
        min_remainder_len: Minimum length of the remainder after stripping a
            prefix. Defaults to the module-level ``MIN_REMAINDER_LEN``.

    Returns:
        ``(prefix, remainder)`` for the first qualifying split, else ``None``.
    """
    if prefixes is None:
        prefixes = PREFIXES
    if min_remainder_len is None:
        min_remainder_len = MIN_REMAINDER_LEN

    # Words this short cannot hold a prefix plus a long-enough remainder.
    if len(word) <= min_remainder_len:
        return None

    # A word missing from the corpus ranks worse than any listed word,
    # regardless of corpus size.
    word_rank = freq.get(word, float("inf"))
    for prefix in prefixes:
        if not word.startswith(prefix):
            continue
        remainder = word[len(prefix):]
        if len(remainder) < min_remainder_len:
            continue
        remainder_rank = freq.get(remainder)
        if remainder_rank is not None and remainder_rank < word_rank:
            return prefix, remainder
    return None
# ── Main ──────────────────────────────────────────────────────────────────
def main() -> None:
    """Parse CLI arguments, run the selected detection mode, and write results.

    Reads the word→rank mapping from ``RAW_CACHE``, asks the chosen mode which
    entries to discard, and — unless ``--dry-run`` is given — writes the
    re-ranked corpus to ``CLEAN_CACHE`` and the discarded entries (with their
    reasons) to ``DISCARDED``.

    In both modes, words that appear as known dictionary forms in words.json
    are never discarded.

    Exits with status 1 when the raw cache is missing; argparse exits with
    status 2 on invalid arguments (including a negative ``--limit``).
    """
    parser = argparse.ArgumentParser(description="Clean frequency corpus")
    parser.add_argument("--mode", choices=["yap", "heuristic"], default="yap", help="Detection mode")
    parser.add_argument("--dry-run", action="store_true", help="Show removals without saving")
    parser.add_argument("--resume", action="store_true", help="Resume YAP mode from checkpoint")
    parser.add_argument("--limit", type=int, default=0, help="Process only first N words (0=all)")
    args = parser.parse_args()
    if args.limit < 0:
        # A negative value would silently slice from the END of the ranked list.
        parser.error("--limit must be >= 0")

    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

    if not RAW_CACHE.exists():
        logger.error("Raw frequency cache not found: %s", RAW_CACHE)
        sys.exit(1)
    with open(RAW_CACHE, encoding="utf-8") as f:
        raw_freq: dict[str, int] = json.load(f)
    logger.info("Raw frequency corpus: %d entries", len(raw_freq))

    # Sort by rank
    words_by_rank = sorted(raw_freq.items(), key=lambda x: x[1])
    if args.limit:
        words_by_rank = words_by_rank[: args.limit]

    # Dictionary forms are protected in both modes.
    known_forms = _load_known_forms(WORDS_JSON)
    if args.mode == "yap":
        discarded_list = _run_yap_mode(words_by_rank, args)
        # The YAP pass has no dictionary filter of its own, so drop any
        # known dictionary form it flagged before counting/writing.
        flagged = len(discarded_list)
        discarded_list = [d for d in discarded_list if d["word"] not in known_forms]
        if len(discarded_list) != flagged:
            logger.info(
                "Protected %d known dictionary forms from YAP discard",
                flagged - len(discarded_list),
            )
    else:
        discarded_list = _run_heuristic_mode(words_by_rank, raw_freq, known_forms)

    kept_count = len(words_by_rank) - len(discarded_list)
    logger.info("Done. Kept: %d, Discarded: %d", kept_count, len(discarded_list))
    if args.dry_run:
        logger.info("Dry run — no files written")
        return

    # Build clean frequency dict (re-ranked without gaps): surviving words
    # keep their relative order and get consecutive ranks starting at 1.
    discarded_words = {d["word"] for d in discarded_list}
    survivors = (word for word, _rank in words_by_rank if word not in discarded_words)
    clean_freq: dict[str, int] = {word: new_rank for new_rank, word in enumerate(survivors, start=1)}

    with open(CLEAN_CACHE, "w", encoding="utf-8") as f:
        json.dump(clean_freq, f, ensure_ascii=False)
    logger.info("Clean frequency saved: %d entries → %s", len(clean_freq), CLEAN_CACHE)

    with open(DISCARDED, "w", encoding="utf-8") as f:
        json.dump(discarded_list, f, ensure_ascii=False, indent=2)
    logger.info("Discarded entries saved: %d → %s", len(discarded_list), DISCARDED)
def _save_checkpoint(analyzed: dict[str, dict]) -> None:
    """Write the YAP checkpoint via a temp file so an interrupted write cannot truncate it."""
    tmp_path = CHECKPOINT.with_name(CHECKPOINT.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(analyzed, f, ensure_ascii=False)
    tmp_path.replace(CHECKPOINT)


def _run_yap_mode(
    words_by_rank: list[tuple[str, int]],
    args: argparse.Namespace,
) -> list[dict]:
    """Run YAP-based prefix detection.

    Each non-trivial word is sent to YAP; a word is discarded when its lattice
    shows a prefix+host analysis. Words whose request fails are kept and
    counted as errors.

    Args:
        words_by_rank: ``(word, rank)`` pairs in processing order.
        args: Parsed CLI namespace; ``resume`` and ``dry_run`` are read.

    Returns:
        One dict per discarded word with keys ``word``, ``original_rank`` and
        ``reason``.

    Side effects:
        Exits with status 1 if the initial YAP connectivity probe fails.
        Unless ``dry_run`` is set, a checkpoint is written every
        ``BATCH_SAVE_INTERVAL`` positions. At the end the checkpoint is
        removed after an error-free run, or saved (so ``--resume`` can retry
        the failed words) when YAP errors occurred.
    """
    # Check YAP connectivity
    if query_yap("בדיקה") is None:
        logger.error("Cannot connect to YAP API at %s", YAP_URL)
        sys.exit(1)
    logger.info("YAP API connected")

    # Load checkpoint if resuming
    analyzed: dict[str, dict] = {}
    if args.resume and CHECKPOINT.exists():
        try:
            with open(CHECKPOINT, encoding="utf-8") as f:
                loaded = json.load(f)
        except json.JSONDecodeError as e:
            logger.warning("Checkpoint %s is unreadable (%s) — starting fresh", CHECKPOINT, e)
            loaded = {}
        if not isinstance(loaded, dict):
            loaded = {}
        # Entries recorded after a failed request carry no real analysis;
        # drop them so the resumed run queries those words again.
        analyzed = {
            w: rec
            for w, rec in loaded.items()
            if isinstance(rec, dict) and "discard" in rec and rec.get("reason") != "yap_error"
        }
        logger.info("Resumed from checkpoint: %d words already analyzed", len(analyzed))

    discarded_list: list[dict] = []
    discarded_count = 0
    kept_count = 0
    error_count = 0

    for i, (word, rank) in enumerate(words_by_rank):
        # Already analyzed (from checkpoint)
        if word in analyzed:
            cached = analyzed[word]
            if cached["discard"]:
                discarded_count += 1
                discarded_list.append(
                    {"word": word, "original_rank": rank, "reason": cached.get("reason", "")}
                )
            else:
                kept_count += 1
            continue

        # Trivial: single char or ASCII — kept without asking YAP
        if len(word) <= 1 or word.isascii():
            analyzed[word] = {"discard": False, "reason": ""}
            kept_count += 1
            continue

        result = query_yap(word)
        if result is None:
            # Keep the word, but tag it so a later --resume retries it.
            analyzed[word] = {"discard": False, "reason": "yap_error"}
            error_count += 1
            kept_count += 1
            time.sleep(0.5)  # back off briefly after a failure
            continue

        is_combo, reason = is_prefix_combo_yap(result)
        analyzed[word] = {"discard": is_combo, "reason": reason}
        if is_combo:
            discarded_count += 1
            discarded_list.append({"word": word, "original_rank": rank, "reason": reason})
            # Only log high-ranked words or the first few discards to limit noise.
            if rank <= 500 or discarded_count <= 50:
                logger.info(" DISCARD rank %5d: %s (%s)", rank, word, reason)
        else:
            kept_count += 1

        # Rate limit
        if i % 10 == 0:
            time.sleep(0.01)

        # Checkpoint
        if (i + 1) % BATCH_SAVE_INTERVAL == 0:
            if not args.dry_run:
                _save_checkpoint(analyzed)
            logger.info(
                " [%d/%d] kept=%d discarded=%d errors=%d",
                i + 1,
                len(words_by_rank),
                kept_count,
                discarded_count,
                error_count,
            )

    # End of run: a clean run no longer needs its checkpoint; a run with
    # failures keeps one so the failed words can be retried with --resume.
    if not args.dry_run:
        if error_count:
            _save_checkpoint(analyzed)
            logger.warning("Checkpoint kept at %s; rerun with --resume to retry failed words", CHECKPOINT)
        elif CHECKPOINT.exists():
            CHECKPOINT.unlink()
    if error_count:
        logger.warning("%d YAP errors encountered", error_count)
    return discarded_list
def _run_heuristic_mode(
words_by_rank: list[tuple[str, int]],
raw_freq: dict[str, int],
known_forms: set[str],
) -> list[dict]:
"""Run heuristic prefix detection (no external dependencies)."""
discarded_list: list[dict] = []
discarded_count = 0
for word, rank in words_by_rank:
if len(word) <= 1 or word.isascii():
continue
# Known dictionary form → keep
if word in known_forms:
continue
result = find_prefix_decomposition(word, raw_freq)
if result is not None:
prefix, remainder = result
discarded_count += 1
reason = f"{prefix}+{remainder} (rank {raw_freq[remainder]})"
discarded_list.append({"word": word, "original_rank": rank, "reason": reason})
if rank <= 500 or discarded_count <= 50:
logger.info(" DISCARD rank %5d: %s = %s", rank, word, reason)
return discarded_list
# Script entry point: run the CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()