#!/usr/bin/env python3 """Clean the Hebrew frequency corpus by removing prefix+word combinations. Two modes: --mode yap (default) Use YAP morphological analyzer for accurate prefix detection. Requires YAP API running at localhost:8000. --mode heuristic Use rule-based prefix stripping (no external dependencies). Both modes preserve words that exist as known dictionary forms in words.json. Usage: python3 scripts/clean_frequency_corpus.py # YAP mode python3 scripts/clean_frequency_corpus.py --mode heuristic # heuristic fallback python3 scripts/clean_frequency_corpus.py --dry-run # preview only python3 scripts/clean_frequency_corpus.py --resume # resume YAP from checkpoint python3 scripts/clean_frequency_corpus.py --limit 1000 # process first N entries Input: data/frequency_cache.json (raw he_50k.txt, 49999 entries) Output: data/frequency_clean.json (filtered, prefix combos removed) data/frequency_discarded.json (discarded entries with reason) """ from __future__ import annotations import argparse import json import logging import os import sys import time from pathlib import Path import requests logger = logging.getLogger(__name__) PROJECT_ROOT = Path(__file__).parent.parent RAW_CACHE = PROJECT_ROOT / "data" / "frequency_cache.json" CLEAN_CACHE = PROJECT_ROOT / "data" / "frequency_clean.json" DISCARDED = PROJECT_ROOT / "data" / "frequency_discarded.json" WORDS_JSON = PROJECT_ROOT / "data" / "words.json" CHECKPOINT = PROJECT_ROOT / "data" / "_yap_checkpoint.json" YAP_URL = os.environ.get("YAP_URL", "http://localhost:8000/yap/heb/joint") YAP_TIMEOUT = 10 BATCH_SAVE_INTERVAL = 500 # --- YAP mode constants --- # POS tags that indicate a prefix PREFIX_POS = frozenset({"PREPOSITION", "CONJ", "DEF", "REL"}) # POS tags for the host word that make the combo a false positive HOST_POS = frozenset({"NN", "NNP", "NNT", "PRP", "CD", "DT", "EX"}) # --- Heuristic mode constants --- # Hebrew prefix combinations, longest first for greedy matching. 
PREFIXES = [
    # 4-char
    "וכשמ", "וכשב", "וכשל", "וכשה",
    # 3-char
    "וכש", "ומה", "ובה", "וכה", "ולה", "ומש", "ובש",
    "וכב", "ולב", "ומב", "וכל", "ולכ", "שבה", "שמה",
    # 2-char
    "כש", "מה", "בה", "כה", "לה", "מש", "בש", "וב",
    "וה", "וכ", "ול", "ומ", "וש", "כב", "לב", "מב",
    "כל", "לכ", "שב", "שה", "שכ", "של", "שמ",
    # 1-char
    "ב", "ה", "ו", "כ", "ל", "מ", "ש",
]

# Shortest remainder (in characters) that may be left after stripping a prefix.
MIN_REMAINDER_LEN = 2


def _load_known_forms(words_path: Path) -> set[str]:
    """Load all known ktiv_male forms from words.json.

    Args:
        words_path: Location of the dictionary JSON file.

    Returns:
        Every non-empty ``ktiv_male`` value found on the entry's ``word``
        object, in its ``active_forms`` / ``hufal_pual_forms`` lists and in its
        noun/preposition/adjective inflection tables. An empty set (with a
        warning) when the file does not exist.
    """
    if not words_path.exists():
        logger.warning("words.json not found at %s — no dictionary filter", words_path)
        return set()

    with open(words_path, encoding="utf-8") as f:
        words = json.load(f)

    known: set[str] = set()
    for entry in words.values():
        base = entry.get("word") or {}
        if km := base.get("ktiv_male"):
            known.add(km)

        # Conjugation tables stored as lists of form objects.
        for list_field in ("active_forms", "hufal_pual_forms"):
            for form in entry.get(list_field) or []:
                if isinstance(form, dict) and (km := form.get("ktiv_male")):
                    known.add(km)

        # Inflection tables stored as mappings whose values are form objects.
        for table_field in ("noun_inflection", "preposition_inflection", "adjective_inflection"):
            for inflected in (entry.get(table_field) or {}).values():
                if isinstance(inflected, dict) and (km := inflected.get("ktiv_male")):
                    known.add(km)

    logger.info("Loaded %d known dictionary forms from words.json", len(known))
    return known


# ── YAP mode ──────────────────────────────────────────────────────────────


def query_yap(word: str) -> dict | None:
    """Send a single word to YAP and return the JSON response.

    Args:
        word: Surface form to analyse.

    Returns:
        The decoded JSON object, or ``None`` when the request fails, the
        server answers with an error status, or the body is not a JSON object.
        Failures are logged as warnings and never raised.
    """
    payload = {"text": f"{word} "}
    try:
        resp = requests.post(YAP_URL, json=payload, timeout=YAP_TIMEOUT)
        resp.raise_for_status()
        data = resp.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError: older requests releases raise a plain JSON decode error
        # (not a RequestException) when the body is not valid JSON.
        logger.warning("YAP request failed for '%s': %s", word, e)
        return None

    if not isinstance(data, dict):
        logger.warning("YAP request failed for '%s': %s", word, "response is not a JSON object")
        return None
    return data


def _parse_lattice(lattice: str) -> list[dict[str, str]]:
    """Turn a tab-separated lattice dump into a list of arc records."""
    arcs: list[dict[str, str]] = []
    for line in lattice.strip().split("\n"):
        if not line.strip():
            continue
        parts = line.split("\t")
        if len(parts) < 6:
            # Not an arc line (too few columns) — ignore it.
            continue
        arcs.append(
            {
                "from": parts[0],
                "to": parts[1],
                "form": parts[2],
                "lemma": parts[3],
                "cpos": parts[4],
                "pos": parts[5],
            }
        )
    return arcs


def is_prefix_combo_yap(yap_response: dict) -> tuple[bool, str]:
    """Check if any morphological analysis segments the word as prefix+host.

    Conservative: if ANY analysis in the lattice shows prefix+host → discard.

    Args:
        yap_response: Decoded YAP response; only its ``ma_lattice`` field is read.

    Returns:
        ``(True, reason)`` for the first prefix arc that is directly followed
        by a host arc, where ``reason`` looks like ``form(CPOS)+form(CPOS)``;
        ``(False, "")`` otherwise.
    """
    lattice = yap_response.get("ma_lattice", "")
    if not lattice:
        return False, ""

    arcs = _parse_lattice(lattice)
    if len(arcs) < 2:
        return False, ""

    # Index arcs by their source node so each prefix arc only inspects its
    # direct successors; per-node lists keep the original arc order, so the
    # first match is the same one a full pairwise scan would find.
    outgoing: dict[str, list[dict[str, str]]] = {}
    for arc in arcs:
        outgoing.setdefault(arc["from"], []).append(arc)

    for prefix_arc in arcs:
        if prefix_arc["cpos"] not in PREFIX_POS and prefix_arc["pos"] not in PREFIX_POS:
            continue
        for host_arc in outgoing.get(prefix_arc["to"], ()):
            if host_arc["cpos"] in HOST_POS or host_arc["pos"] in HOST_POS:
                reason = (
                    f"{prefix_arc['form']}({prefix_arc['cpos']})"
                    f"+{host_arc['form']}({host_arc['cpos']})"
                )
                return True, reason

    return False, ""


# ── Heuristic mode ────────────────────────────────────────────────────────


def find_prefix_decomposition(word: str, freq: dict[str, int]) -> tuple[str, str] | None:
    """Check if word is a prefix+higher-ranked-word combo (heuristic).

    Args:
        word: Surface form to test.
        freq: Mapping of word → rank (lower number = more frequent).

    Returns:
        ``(prefix, remainder)`` for the first prefix in ``PREFIXES`` (longest
        first) whose remainder is at least ``MIN_REMAINDER_LEN`` characters
        long, is present in ``freq`` and has a strictly better rank than
        ``word``; ``None`` when no such split exists.
    """
    if len(word) <= MIN_REMAINDER_LEN:
        return None

    # Words missing from the table are treated as ranked below everything.
    word_rank = freq.get(word, 999999)

    for prefix in PREFIXES:
        if not word.startswith(prefix):
            continue
        remainder = word[len(prefix) :]
        if len(remainder) < MIN_REMAINDER_LEN:
            continue
        if remainder in freq and freq[remainder] < word_rank:
            return prefix, remainder

    return None


# ── Main ──────────────────────────────────────────────────────────────────


def main() -> None:
    """Parse CLI arguments, run the selected detection mode and write results.

    Exits with status 1 when the raw frequency cache is missing. With
    ``--dry-run`` nothing is written to disk.
    """
    parser = argparse.ArgumentParser(description="Clean frequency corpus")
    parser.add_argument("--mode", choices=["yap", "heuristic"], default="yap", help="Detection mode")
    parser.add_argument("--dry-run", action="store_true", help="Show removals without saving")
    parser.add_argument("--resume", action="store_true", help="Resume YAP mode from checkpoint")
    parser.add_argument("--limit", type=int, default=0, help="Process only first N words (0=all)")
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

    if not RAW_CACHE.exists():
        logger.error("Raw frequency cache not found: %s", RAW_CACHE)
        sys.exit(1)

    with open(RAW_CACHE, encoding="utf-8") as f:
        raw_freq: dict[str, int] = json.load(f)
    logger.info("Raw frequency corpus: %d entries", len(raw_freq))

    # Sort by rank
    words_by_rank = sorted(raw_freq.items(), key=lambda x: x[1])
    # Only a positive limit truncates; a negative value would otherwise slice
    # entries off the *end* of the list.
    if args.limit > 0:
        words_by_rank = words_by_rank[: args.limit]

    # The dictionary keep-list applies to both modes, as the module docstring
    # promises.
    known_forms = _load_known_forms(WORDS_JSON)
    if args.mode == "yap":
        discarded_list = _run_yap_mode(words_by_rank, args, known_forms)
    else:
        discarded_list = _run_heuristic_mode(words_by_rank, raw_freq, known_forms)

    kept_count = len(words_by_rank) - len(discarded_list)
    logger.info("Done. Kept: %d, Discarded: %d", kept_count, len(discarded_list))

    if args.dry_run:
        logger.info("Dry run — no files written")
        return

    # Build clean frequency dict (re-ranked without gaps)
    discarded_words = {d["word"] for d in discarded_list}
    survivors = (word for word, _rank in words_by_rank if word not in discarded_words)
    clean_freq: dict[str, int] = {word: new_rank for new_rank, word in enumerate(survivors, start=1)}

    with open(CLEAN_CACHE, "w", encoding="utf-8") as f:
        json.dump(clean_freq, f, ensure_ascii=False)
    logger.info("Clean frequency saved: %d entries → %s", len(clean_freq), CLEAN_CACHE)

    with open(DISCARDED, "w", encoding="utf-8") as f:
        json.dump(discarded_list, f, ensure_ascii=False, indent=2)
    logger.info("Discarded entries saved: %d → %s", len(discarded_list), DISCARDED)


def _load_checkpoint(path: Path) -> dict[str, dict]:
    """Read a YAP checkpoint, dropping entries that recorded a failed request."""
    with open(path, encoding="utf-8") as f:
        data = json.load(f)
    # Older checkpoints stored failures as "kept"; forget them so that
    # resuming retries those words instead of silently keeping them.
    return {word: entry for word, entry in data.items() if entry.get("reason") != "yap_error"}


def _save_checkpoint(path: Path, analyzed: dict[str, dict]) -> None:
    """Write the checkpoint via a temp file so a crash cannot truncate it."""
    tmp_path = path.with_name(path.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(analyzed, f, ensure_ascii=False)
    tmp_path.replace(path)


def _run_yap_mode(
    words_by_rank: list[tuple[str, int]],
    args: argparse.Namespace,
    known_forms: set[str] | None = None,
) -> list[dict]:
    """Run YAP-based prefix detection.

    Args:
        words_by_rank: ``(word, rank)`` pairs in processing order.
        args: Parsed CLI arguments; ``resume`` and ``dry_run`` are read.
        known_forms: Optional dictionary keep-list. Words in it are always
            kept and never sent to YAP. ``None`` disables the filter.

    Returns:
        One ``{"word", "original_rank", "reason"}`` record per discarded word.

    Exits with status 1 when the YAP API cannot be reached. Words whose YAP
    request fails are kept in the output but are not recorded in the
    checkpoint, so a later ``--resume`` retries them.
    """
    # Check YAP connectivity
    test = query_yap("בדיקה")
    if test is None:
        logger.error("Cannot connect to YAP API at %s", YAP_URL)
        sys.exit(1)
    logger.info("YAP API connected")

    # Load checkpoint if resuming
    analyzed: dict[str, dict] = {}
    if args.resume and CHECKPOINT.exists():
        analyzed = _load_checkpoint(CHECKPOINT)
        logger.info("Resumed from checkpoint: %d words already analyzed", len(analyzed))

    discarded_list: list[dict] = []
    discarded_count = 0
    kept_count = 0
    error_count = 0
    dirty = False  # True when `analyzed` holds results not yet checkpointed
    total = len(words_by_rank)

    for i, (word, rank) in enumerate(words_by_rank):
        if known_forms and word in known_forms:
            # Known dictionary form → keep. Checked first so the dictionary
            # overrides any earlier checkpointed verdict; not checkpointed
            # because re-checking is free and words.json may change.
            kept_count += 1
        elif word in analyzed:
            # Already analyzed (from checkpoint)
            cached = analyzed[word]
            if cached["discard"]:
                discarded_count += 1
                discarded_list.append({"word": word, "original_rank": rank, "reason": cached["reason"]})
            else:
                kept_count += 1
        elif len(word) <= 1 or word.isascii():
            # Trivial: single char, ASCII, or too short
            analyzed[word] = {"discard": False, "reason": ""}
            dirty = True
            kept_count += 1
        else:
            result = query_yap(word)
            if result is None:
                # Keep the word for this run but leave it out of `analyzed`.
                error_count += 1
                kept_count += 1
                time.sleep(0.5)  # back off briefly after a failure
            else:
                is_combo, reason = is_prefix_combo_yap(result)
                analyzed[word] = {"discard": is_combo, "reason": reason}
                dirty = True

                if is_combo:
                    discarded_count += 1
                    discarded_list.append({"word": word, "original_rank": rank, "reason": reason})
                    # Log only top-ranked words and the first few discards.
                    if rank <= 500 or discarded_count <= 50:
                        logger.info(" DISCARD rank %5d: %s (%s)", rank, word, reason)
                else:
                    kept_count += 1

                # Rate limit
                if i % 10 == 0:
                    time.sleep(0.01)

        # Checkpoint + progress. Runs at the end of every iteration so an
        # interval boundary is never skipped by the branch taken above.
        if (i + 1) % BATCH_SAVE_INTERVAL == 0:
            if dirty and not args.dry_run:
                _save_checkpoint(CHECKPOINT, analyzed)
                dirty = False
            logger.info(
                " [%d/%d] kept=%d discarded=%d errors=%d",
                i + 1,
                total,
                kept_count,
                discarded_count,
                error_count,
            )

    # Run completed: the checkpoint is no longer needed, remove it.
    if not args.dry_run and CHECKPOINT.exists():
        CHECKPOINT.unlink()

    if error_count:
        logger.warning("%d YAP errors encountered", error_count)

    return discarded_list


def _run_heuristic_mode(
    words_by_rank: list[tuple[str, int]],
    raw_freq: dict[str, int],
    known_forms: set[str],
) -> list[dict]:
    """Run heuristic prefix detection (no external dependencies).

    Args:
        words_by_rank: ``(word, rank)`` pairs in processing order.
        raw_freq: Full word → rank table used to look up remainders.
        known_forms: Dictionary keep-list; words in it are never discarded.

    Returns:
        One ``{"word", "original_rank", "reason"}`` record per discarded word.
    """
    discarded_list: list[dict] = []
    discarded_count = 0

    for word, rank in words_by_rank:
        # Single characters and pure-ASCII tokens are never prefix combos.
        if len(word) <= 1 or word.isascii():
            continue

        # Known dictionary form → keep
        if word in known_forms:
            continue

        result = find_prefix_decomposition(word, raw_freq)
        if result is None:
            continue

        prefix, remainder = result
        discarded_count += 1
        reason = f"{prefix}+{remainder} (rank {raw_freq[remainder]})"
        discarded_list.append({"word": word, "original_rank": rank, "reason": reason})
        # Log only top-ranked words and the first few discards.
        if rank <= 500 or discarded_count <= 50:
            logger.info(" DISCARD rank %5d: %s = %s", rank, word, reason)

    return discarded_list


if __name__ == "__main__":
    main()