#!/usr/bin/env python3 """Assign frequency ranks from the cleaned corpus to words.json entries. Two-tier assignment with PoS priority: Tier 1: Match headword ktiv_male directly against corpus Tier 2: Match conjugated/inflected forms (only if no other entry already claimed that corpus word via tier 1) PoS priority (based on standalone-word likelihood in Hebrew text): כינויי_גוף (Pronoun) > מילות_חיבור (Conjunction) > שם_תואר (Adjective) > מילית (Particle) > שם_עצם (Noun) > תוארי_הפועל (Adverb) > מילות_יחס (Preposition) > פעלים (Verb) Usage: python3 scripts/assign_frequency.py # assign and save python3 scripts/assign_frequency.py --dry-run # preview only python3 scripts/assign_frequency.py --stats # show statistics only """ from __future__ import annotations import argparse import json import logging from collections import defaultdict from pathlib import Path logger = logging.getLogger(__name__) PROJECT_ROOT = Path(__file__).parent.parent WORDS_JSON = PROJECT_ROOT / "data" / "words.json" CLEAN_CACHE = PROJECT_ROOT / "data" / "frequency_clean.json" RAW_CACHE = PROJECT_ROOT / "data" / "frequency_cache.json" # Function word PoS — these dominate content words in homograph groups FUNCTION_POS = frozenset({"כינויי_גוף", "מילות_חיבור", "מילית", "מילות_יחס", "תוארי_הפועל"}) # Content PoS that loses frequency when a function word dominates # Adjectives also lose (e.g. כן "honest" vs כן "yes") — they're rare collisions CONTENT_POS = frozenset({"שם_עצם", "שם_תואר", "פעלים"}) # Manual overrides: at these corpus ranks, ALL homographs share frequency. # These are cases where the content word is genuinely common enough to deserve it. # e.g. rank 15: עם "people" (NN) alongside עם "with" (PREP) # Manual overrides: at these ktiv_male forms, ALL homographs share frequency. # These are cases where the content word is genuinely common enough to deserve it. SHARE_ALL_WORDS = frozenset( { "עם", # "people" (NN) + "with" (PREP) "שם", # "name" (NN) + "there" (ADV) "אל", # "god" (NN) + "to" (PREP) + "don't" (PART) "עד", # "witness"/"eternity" (NN) + "until" (PREP) "פה", # "mouth" (NN) + "here" (ADV) "לאחר", # "to be late" (VB) + "after" (PREP) "יופי", # "beauty" (NN) + "great!" (ADV) "המון", # "crowd" (NN) + "lots of" (ADV) "חבל", # "rope" (NN) + "it's a pity" (ADV) "ראשית", # "beginning" (NN) + "firstly" (ADV) "עקב", # "heel"/"footprint" (NN) + "due to" (CONJ) "אולם", # "hall" (NN) + "however" (ADV) } ) def _get_pos_tag(entry: dict) -> str: """Extract primary PoS tag from entry's tags field.""" tags = (entry.get("tags") or "").split() for t in tags: if not t.startswith("שורש"): return t return "unknown" def _build_form_index(words: dict) -> dict[str, list[tuple[str, str]]]: """Build reverse index: ktiv_male_form -> [(unique_key, match_type), ...]""" index: dict[str, list[tuple[str, str]]] = defaultdict(list) for key, entry in words.items(): w = entry.get("word") or {} if km := w.get("ktiv_male"): index[km].append((key, "headword")) # Verb conjugations: indexed for new-assignment-only matching (no upgrades). # Conjugated forms collide with unrelated headwords, so tier 2 only uses # these for entries that have NO existing frequency. conj = entry.get("conjugation") or {} for form in conj.get("active_forms") or []: if isinstance(form, dict): form_data = form.get("form") or {} if km2 := form_data.get("ktiv_male"): km2 = km2.rstrip("!\u200f ") index[km2].append((key, "conjugation")) for hp in conj.get("hufal_pual_forms") or []: if isinstance(hp, dict): hp_data = hp.get("form") or {} if km3 := hp_data.get("ktiv_male"): km3 = km3.rstrip("!\u200f ") index[km3].append((key, "conjugation")) for field in ("noun_inflection", "preposition_inflection", "adjective_inflection"): for inf_data in (entry.get(field) or {}).values(): if isinstance(inf_data, dict) and (km4 := inf_data.get("ktiv_male")): index[km4].append((key, "inflection")) return dict(index) def _should_get_frequency( entry: dict, all_headword_entries: list[tuple[str, str]], corpus_word: str, words: dict, ) -> bool: """Decide if an entry should get frequency in a homograph group. Rules: - If only one entry matches, it always gets frequency. - If SHARE_ALL_WORDS includes this corpus word, all entries share. - If the group has function words AND content words, content words lose. - Otherwise all entries share. """ if len(all_headword_entries) <= 1: return True if corpus_word in SHARE_ALL_WORDS: return True pos = _get_pos_tag(entry) has_function = any(_get_pos_tag(words[k]) in FUNCTION_POS for k, _ in all_headword_entries) return not (has_function and pos in CONTENT_POS) def assign_frequencies( words: dict, freq_corpus: dict[str, int], raw_corpus: dict[str, int] | None = None, upgrade: bool = False, ) -> dict[str, dict]: """Assign frequency ranks to words.json entries. Returns assignment details. freq_corpus controls which words are valid (cleaned corpus). raw_corpus provides original rank numbers (with gaps). If not provided, uses freq_corpus ranks (re-ranked, no gaps). upgrade: if True, tier 2 can upgrade an entry's rank when a conjugated/inflected form has a better (lower) rank than the headword match. """ rank_source = raw_corpus if raw_corpus is not None else freq_corpus form_index = _build_form_index(words) # Track which corpus words have been claimed by tier 1 tier1_claimed: set[str] = set() # Results tracking assignments: dict[str, dict] = {} # unique_key -> {rank, source, corpus_word} # --- Tier 1: headword matches --- # For each corpus word, find all headword matches and assign to eligible entries. # Homograph groups: function words get frequency, content words don't (unless overridden). corpus_by_rank = sorted(freq_corpus.items(), key=lambda x: x[1]) for corpus_word, _clean_rank in corpus_by_rank: matches = form_index.get(corpus_word, []) headword_matches = [(k, t) for k, t in matches if t == "headword"] if not headword_matches: continue original_rank = rank_source.get(corpus_word, _clean_rank) assigned_any = False for entry_key, _ in headword_matches: if entry_key in assignments: continue if _should_get_frequency(words[entry_key], headword_matches, corpus_word, words): assignments[entry_key] = { "rank": original_rank, "source": "headword", "corpus_word": corpus_word, } assigned_any = True if assigned_any: tier1_claimed.add(corpus_word) tier1_count = len(assignments) logger.info("Tier 1 (headword): %d entries assigned", tier1_count) # --- Tier 2: conjugation/inflection matches --- # Only use corpus words NOT claimed in tier 1. # A corpus word that matches an inflection is "owned" by that headword — # it cannot also upgrade an unrelated verb via conjugation. # Upgrades (when enabled) only apply within the same match type priority. for corpus_word, _clean_rank in corpus_by_rank: if corpus_word in tier1_claimed: continue matches = form_index.get(corpus_word, []) secondary_matches = [(k, t) for k, t in matches if t in ("conjugation", "inflection")] if not secondary_matches: continue original_rank = rank_source.get(corpus_word, _clean_rank) # Split by type: inflections take priority over conjugations inflection_matches = [(k, t) for k, t in secondary_matches if t == "inflection"] conjugation_matches = [(k, t) for k, t in secondary_matches if t == "conjugation"] # If any inflection matches exist, this corpus word belongs to inflection. # Don't let conjugations claim it. active_matches = inflection_matches if inflection_matches else conjugation_matches for entry_key, match_type in active_matches: existing = assignments.get(entry_key) if existing is None: # New assignment — conjugations only allowed for rank > 5000 # (too many false positives in the important tiers) if match_type == "conjugation" and original_rank <= 5000: continue assignments[entry_key] = { "rank": original_rank, "source": match_type, "corpus_word": corpus_word, } break if upgrade and match_type == "inflection" and original_rank < existing["rank"]: # Upgrade — only allowed for inflections (conjugations collide too much) assignments[entry_key] = { "rank": original_rank, "source": f"upgrade:{match_type}", "corpus_word": corpus_word, } break tier2_count = len(assignments) - tier1_count logger.info("Tier 2 (conjugation/inflection): %d entries assigned", tier2_count) return assignments def print_stats(words: dict, assignments: dict, freq_corpus: dict) -> None: """Print detailed statistics about frequency assignment.""" total = len(words) assigned = len(assignments) previously_had = sum(1 for e in words.values() if e.get("frequency") is not None) print(f"\n{'=' * 60}") print("Frequency Assignment Statistics") print(f"{'=' * 60}") print(f"Words.json entries: {total}") print(f"Clean corpus size: {len(freq_corpus)}") print(f"Previously had freq: {previously_had}") print(f"Now assigned: {assigned}") print(f"Newly gained: {assigned - previously_had}") print(f"Still unlisted: {total - assigned}") # By tier tier1 = sum(1 for a in assignments.values() if a["source"] == "headword") tier2_conj = sum(1 for a in assignments.values() if a["source"] == "conjugation") tier2_inf = sum(1 for a in assignments.values() if a["source"] == "inflection") print("\nBy assignment tier:") print(f" Tier 1 (headword): {tier1}") print(f" Tier 2 (conjugation): {tier2_conj}") print(f" Tier 2 (inflection): {tier2_inf}") # By PoS print("\nBy PoS:") from collections import Counter pos_assigned = Counter() pos_total = Counter() for k, v in words.items(): pos = _get_pos_tag(v) pos_total[pos] += 1 if k in assignments: pos_assigned[pos] += 1 pos_order = [ "כינויי_גוף", "מילות_חיבור", "שם_תואר", "מילית", "שם_עצם", "תוארי_הפועל", "מילות_יחס", "פעלים", "unknown", ] for pos in sorted(pos_total, key=lambda p: pos_order.index(p) if p in pos_order else 99): a = pos_assigned[pos] t = pos_total[pos] pct = a / t * 100 if t else 0 print(f" {pos:20s}: {a:5d}/{t:5d} ({pct:.0f}%)") # By frequency tier (using apkg_builder tiers) print("\nBy frequency tier:") tiers = { "Core (1-500)": (1, 500), "Essential (501-1500)": (501, 1500), "Intermediate (1501-3000)": (1501, 3000), "Upper-intermediate (3001-5000)": (3001, 5000), "Advanced (5001-10000)": (5001, 10000), "Rare (10001+)": (10001, 999999), } for label, (lo, hi) in tiers.items(): count = sum(1 for a in assignments.values() if lo <= a["rank"] <= hi) print(f" {label:35s}: {count}") # Top 20 newly assigned (entries that didn't have frequency before) newly = [] for k, a in assignments.items(): if words[k].get("frequency") is None: w = words[k].get("word", {}) newly.append((a["rank"], k, w.get("ktiv_male", ""), a["source"], a["corpus_word"])) newly.sort() if newly: print("\nTop 20 newly assigned entries:") for rank, _key, ktiv, source, corpus_word in newly[:20]: print(f" rank {rank:5d}: {ktiv:15s} via {source:12s} (corpus: {corpus_word})") # Entries that LOST frequency (had it before, not assigned now) lost = [] for k, v in words.items(): old_freq = v.get("frequency") if old_freq is not None and k not in assignments: w = v.get("word", {}) lost.append((old_freq, k, w.get("ktiv_male", ""))) lost.sort() if lost: print(f"\nEntries that would LOSE frequency ({len(lost)} total):") for rank, _key, ktiv in lost[:20]: print(f" was rank {rank:5d}: {ktiv}") def main() -> None: parser = argparse.ArgumentParser(description="Assign frequency to words.json") parser.add_argument("--dry-run", action="store_true", help="Preview without saving") parser.add_argument("--stats", action="store_true", help="Show statistics only") parser.add_argument( "--upgrade", action="store_true", help="Allow tier 2 to upgrade headword rank from conjugated forms" ) args = parser.parse_args() logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") # Load data freq_path = CLEAN_CACHE if CLEAN_CACHE.exists() else RAW_CACHE logger.info("Loading frequency corpus: %s", freq_path) with open(freq_path, encoding="utf-8") as f: freq_corpus: dict[str, int] = json.load(f) # Load raw corpus for original rank numbers (with gaps) raw_corpus: dict[str, int] | None = None if RAW_CACHE.exists() and freq_path != RAW_CACHE: with open(RAW_CACHE, encoding="utf-8") as f: raw_corpus = json.load(f) logger.info("Using original ranks from %s", RAW_CACHE) with open(WORDS_JSON, encoding="utf-8") as f: words: dict = json.load(f) logger.info("Corpus: %d entries, Words.json: %d entries", len(freq_corpus), len(words)) # Run assignment assignments = assign_frequencies(words, freq_corpus, raw_corpus, upgrade=args.upgrade) # Stats print_stats(words, assignments, freq_corpus) if args.stats or args.dry_run: if args.dry_run: logger.info("Dry run — no changes saved") return # Apply to words.json changed = 0 for key, entry in words.items(): if key in assignments: new_rank = assignments[key]["rank"] if entry.get("frequency") != new_rank: entry["frequency"] = new_rank changed += 1 else: if entry.get("frequency") is not None: entry["frequency"] = None changed += 1 with open(WORDS_JSON, "w", encoding="utf-8") as f: json.dump(words, f, ensure_ascii=False, indent=2) logger.info("Updated %d entries in words.json", changed) if __name__ == "__main__": main()