- Add clean_frequency_corpus.py: YAP morphological analyzer removes prefix+word combos (e.g. בבית=ב+בית) from he_50k frequency data. Headwords always protected. 30,430 clean entries from 49,999 raw. - Add assign_frequency.py: two-tier assignment with PoS-aware homograph handling. Tier 1 matches headwords; Tier 2 matches inflections (any rank) and conjugations (rank>5000 only, to avoid false positives). Function words claim frequency over content words in homograph groups, with manual overrides for 12 common dual-use words. - frequency_lookup.py auto-prefers frequency_clean.json when available - 6,691 entries now have frequency (was 5,974), 717 newly assigned Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
392 lines
15 KiB
Python
392 lines
15 KiB
Python
#!/usr/bin/env python3
|
||
"""Assign frequency ranks from the cleaned corpus to words.json entries.
|
||
|
||
Two-tier assignment with PoS priority:
|
||
Tier 1: Match headword ktiv_male directly against corpus
|
||
Tier 2: Match conjugated/inflected forms (only if no other entry already
|
||
claimed that corpus word via tier 1)
|
||
|
||
PoS priority (based on standalone-word likelihood in Hebrew text):
|
||
כינויי_גוף (Pronoun) > מילות_חיבור (Conjunction) > שם_תואר (Adjective) >
|
||
מילית (Particle) > שם_עצם (Noun) > תוארי_הפועל (Adverb) >
|
||
מילות_יחס (Preposition) > פעלים (Verb)
|
||
|
||
Usage:
|
||
python3 scripts/assign_frequency.py # assign and save
|
||
python3 scripts/assign_frequency.py --dry-run # preview only
|
||
python3 scripts/assign_frequency.py --stats # show statistics only
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import logging
|
||
from collections import defaultdict
|
||
from pathlib import Path
|
||
|
||
# Module-level logger; handlers/levels are configured in main().
logger = logging.getLogger(__name__)

# All data files are resolved relative to the directory two levels above this file.
PROJECT_ROOT = Path(__file__).parent.parent
WORDS_JSON = PROJECT_ROOT / "data" / "words.json"
CLEAN_CACHE = PROJECT_ROOT / "data" / "frequency_clean.json"
RAW_CACHE = PROJECT_ROOT / "data" / "frequency_cache.json"

# Function word PoS — these dominate content words in homograph groups
FUNCTION_POS = frozenset({"כינויי_גוף", "מילות_חיבור", "מילית", "מילות_יחס", "תוארי_הפועל"})

# Content PoS that loses frequency when a function word dominates
# Adjectives also lose (e.g. כן "honest" vs כן "yes") — they're rare collisions
CONTENT_POS = frozenset({"שם_עצם", "שם_תואר", "פעלים"})

# Manual overrides: at these ktiv_male forms, ALL homographs share frequency.
# These are cases where the content word is genuinely common enough to deserve it,
# e.g. עם "people" (NN) alongside עם "with" (PREP).
# The set is keyed by the spelled form (not by corpus rank).
SHARE_ALL_WORDS = frozenset(
    {
        "עם",  # "people" (NN) + "with" (PREP)
        "שם",  # "name" (NN) + "there" (ADV)
        "אל",  # "god" (NN) + "to" (PREP) + "don't" (PART)
        "עד",  # "witness"/"eternity" (NN) + "until" (PREP)
        "פה",  # "mouth" (NN) + "here" (ADV)
        "לאחר",  # "to be late" (VB) + "after" (PREP)
        "יופי",  # "beauty" (NN) + "great!" (ADV)
        "המון",  # "crowd" (NN) + "lots of" (ADV)
        "חבל",  # "rope" (NN) + "it's a pity" (ADV)
        "ראשית",  # "beginning" (NN) + "firstly" (ADV)
        "עקב",  # "heel"/"footprint" (NN) + "due to" (CONJ)
        "אולם",  # "hall" (NN) + "however" (ADV)
    }
)
|
||
|
||
|
||
def _get_pos_tag(entry: dict) -> str:
|
||
"""Extract primary PoS tag from entry's tags field."""
|
||
tags = (entry.get("tags") or "").split()
|
||
for t in tags:
|
||
if not t.startswith("שורש"):
|
||
return t
|
||
return "unknown"
|
||
|
||
|
||
def _build_form_index(words: dict) -> dict[str, list[tuple[str, str]]]:
|
||
"""Build reverse index: ktiv_male_form -> [(unique_key, match_type), ...]"""
|
||
index: dict[str, list[tuple[str, str]]] = defaultdict(list)
|
||
|
||
for key, entry in words.items():
|
||
w = entry.get("word") or {}
|
||
if km := w.get("ktiv_male"):
|
||
index[km].append((key, "headword"))
|
||
|
||
# Verb conjugations: indexed for new-assignment-only matching (no upgrades).
|
||
# Conjugated forms collide with unrelated headwords, so tier 2 only uses
|
||
# these for entries that have NO existing frequency.
|
||
conj = entry.get("conjugation") or {}
|
||
for form in conj.get("active_forms") or []:
|
||
if isinstance(form, dict):
|
||
form_data = form.get("form") or {}
|
||
if km2 := form_data.get("ktiv_male"):
|
||
km2 = km2.rstrip("!\u200f ")
|
||
index[km2].append((key, "conjugation"))
|
||
|
||
for hp in conj.get("hufal_pual_forms") or []:
|
||
if isinstance(hp, dict):
|
||
hp_data = hp.get("form") or {}
|
||
if km3 := hp_data.get("ktiv_male"):
|
||
km3 = km3.rstrip("!\u200f ")
|
||
index[km3].append((key, "conjugation"))
|
||
|
||
for field in ("noun_inflection", "preposition_inflection", "adjective_inflection"):
|
||
for inf_data in (entry.get(field) or {}).values():
|
||
if isinstance(inf_data, dict) and (km4 := inf_data.get("ktiv_male")):
|
||
index[km4].append((key, "inflection"))
|
||
|
||
return dict(index)
|
||
|
||
|
||
def _should_get_frequency(
|
||
entry: dict,
|
||
all_headword_entries: list[tuple[str, str]],
|
||
corpus_word: str,
|
||
words: dict,
|
||
) -> bool:
|
||
"""Decide if an entry should get frequency in a homograph group.
|
||
|
||
Rules:
|
||
- If only one entry matches, it always gets frequency.
|
||
- If SHARE_ALL_WORDS includes this corpus word, all entries share.
|
||
- If the group has function words AND content words, content words lose.
|
||
- Otherwise all entries share.
|
||
"""
|
||
if len(all_headword_entries) <= 1:
|
||
return True
|
||
if corpus_word in SHARE_ALL_WORDS:
|
||
return True
|
||
|
||
pos = _get_pos_tag(entry)
|
||
has_function = any(_get_pos_tag(words[k]) in FUNCTION_POS for k, _ in all_headword_entries)
|
||
|
||
return not (has_function and pos in CONTENT_POS)
|
||
|
||
|
||
def assign_frequencies(
    words: dict,
    freq_corpus: dict[str, int],
    raw_corpus: dict[str, int] | None = None,
    upgrade: bool = False,
) -> dict[str, dict]:
    """Assign frequency ranks to words.json entries and return the details.

    Args:
        words: mapping of unique key -> entry, as loaded from words.json.
        freq_corpus: corpus word -> rank. Decides which corpus words are
            considered and in which order (ascending rank).
        raw_corpus: optional corpus word -> rank used for the rank value that
            is actually stored. When ``None``, ranks come from ``freq_corpus``;
            a word absent from ``raw_corpus`` falls back to its
            ``freq_corpus`` rank.
        upgrade: when ``True``, tier 2 may replace an existing assignment if an
            *inflected* form has a strictly lower rank. Conjugations never
            upgrade.

    Returns:
        ``{unique_key: {"rank": int, "source": str, "corpus_word": str}}``
        where ``source`` is ``"headword"``, ``"inflection"``,
        ``"conjugation"`` or ``"upgrade:inflection"``.
    """
    form_index = _build_form_index(words)
    rank_source = freq_corpus if raw_corpus is None else raw_corpus

    assignments: dict[str, dict] = {}  # unique_key -> {rank, source, corpus_word}
    tier1_claimed: set[str] = set()  # corpus words consumed by a headword match
    corpus_by_rank = sorted(freq_corpus.items(), key=lambda item: item[1])

    # --- Tier 1: headword matches ---
    # Every entry whose headword equals the corpus word is a candidate; the
    # homograph rules in _should_get_frequency decide which of them win.
    for corpus_word, clean_rank in corpus_by_rank:
        headword_matches = [m for m in form_index.get(corpus_word, []) if m[1] == "headword"]
        if not headword_matches:
            continue

        rank = rank_source.get(corpus_word, clean_rank)
        claimed = False
        for entry_key, _ in headword_matches:
            if entry_key in assignments:
                continue
            if not _should_get_frequency(words[entry_key], headword_matches, corpus_word, words):
                continue
            assignments[entry_key] = {
                "rank": rank,
                "source": "headword",
                "corpus_word": corpus_word,
            }
            claimed = True

        if claimed:
            tier1_claimed.add(corpus_word)

    tier1_count = len(assignments)
    logger.info("Tier 1 (headword): %d entries assigned", tier1_count)

    # --- Tier 2: conjugation/inflection matches ---
    # Only corpus words left unclaimed by tier 1 are considered. If a corpus
    # word matches any inflection, conjugation matches for it are ignored.
    # Each corpus word is given to at most one entry (first eligible wins).
    for corpus_word, clean_rank in corpus_by_rank:
        if corpus_word in tier1_claimed:
            continue

        matches = form_index.get(corpus_word, [])
        candidates = [m for m in matches if m[1] == "inflection"] or [
            m for m in matches if m[1] == "conjugation"
        ]
        if not candidates:
            continue

        rank = rank_source.get(corpus_word, clean_rank)

        for entry_key, match_type in candidates:
            current = assignments.get(entry_key)

            if current is not None:
                # Already ranked: only an inflection with a strictly better
                # rank may replace it, and only when upgrades are enabled.
                if upgrade and match_type == "inflection" and rank < current["rank"]:
                    assignments[entry_key] = {
                        "rank": rank,
                        "source": f"upgrade:{match_type}",
                        "corpus_word": corpus_word,
                    }
                    break
                continue

            # New assignment — conjugations are only trusted beyond rank 5000
            # (too many false positives in the more important tiers).
            if match_type == "conjugation" and rank <= 5000:
                continue
            assignments[entry_key] = {
                "rank": rank,
                "source": match_type,
                "corpus_word": corpus_word,
            }
            break

    tier2_count = len(assignments) - tier1_count
    logger.info("Tier 2 (conjugation/inflection): %d entries assigned", tier2_count)

    return assignments
|
||
|
||
|
||
def print_stats(words: dict, assignments: dict, freq_corpus: dict) -> None:
    """Print detailed statistics about frequency assignment to stdout.

    Args:
        words: unique key -> entry, as loaded from words.json. The entry's
            current ``frequency`` value is treated as the "previous" state.
        assignments: result of ``assign_frequencies``
            (unique key -> ``{"rank", "source", "corpus_word"}``).
        freq_corpus: the corpus mapping; only its size is reported.

    Entries whose ``word`` field is present but null (or whose ``ktiv_male``
    is null) are reported with an empty spelling instead of raising.
    """
    from collections import Counter

    total = len(words)
    assigned = len(assignments)
    previously_had = sum(1 for e in words.values() if e.get("frequency") is not None)

    print(f"\n{'=' * 60}")
    print("Frequency Assignment Statistics")
    print(f"{'=' * 60}")
    print(f"Words.json entries: {total}")
    print(f"Clean corpus size: {len(freq_corpus)}")
    print(f"Previously had freq: {previously_had}")
    print(f"Now assigned: {assigned}")
    # Net change in the number of ranked entries (gains minus losses).
    print(f"Newly gained: {assigned - previously_had}")
    print(f"Still unlisted: {total - assigned}")

    # By tier
    tier1 = sum(1 for a in assignments.values() if a["source"] == "headword")
    tier2_conj = sum(1 for a in assignments.values() if a["source"] == "conjugation")
    tier2_inf = sum(1 for a in assignments.values() if a["source"] == "inflection")
    print("\nBy assignment tier:")
    print(f" Tier 1 (headword): {tier1}")
    print(f" Tier 2 (conjugation): {tier2_conj}")
    print(f" Tier 2 (inflection): {tier2_inf}")

    # By PoS
    print("\nBy PoS:")
    pos_assigned = Counter()
    pos_total = Counter()
    for k, v in words.items():
        pos = _get_pos_tag(v)
        pos_total[pos] += 1
        if k in assignments:
            pos_assigned[pos] += 1
    # Display order; any PoS not listed here sorts after all listed ones.
    pos_order = [
        "כינויי_גוף",
        "מילות_חיבור",
        "שם_תואר",
        "מילית",
        "שם_עצם",
        "תוארי_הפועל",
        "מילות_יחס",
        "פעלים",
        "unknown",
    ]
    for pos in sorted(pos_total, key=lambda p: pos_order.index(p) if p in pos_order else 99):
        a = pos_assigned[pos]
        t = pos_total[pos]
        pct = a / t * 100 if t else 0
        print(f" {pos:20s}: {a:5d}/{t:5d} ({pct:.0f}%)")

    # By frequency tier (using apkg_builder tiers)
    print("\nBy frequency tier:")
    tiers = {
        "Core (1-500)": (1, 500),
        "Essential (501-1500)": (501, 1500),
        "Intermediate (1501-3000)": (1501, 3000),
        "Upper-intermediate (3001-5000)": (3001, 5000),
        "Advanced (5001-10000)": (5001, 10000),
        "Rare (10001+)": (10001, 999999),
    }
    for label, (lo, hi) in tiers.items():
        count = sum(1 for a in assignments.values() if lo <= a["rank"] <= hi)
        print(f" {label:35s}: {count}")

    # Top 20 newly assigned (entries that didn't have frequency before)
    newly = []
    for k, a in assignments.items():
        if words[k].get("frequency") is None:
            # `.get("word", {})` would still yield None for an explicit null,
            # so fall back with `or` (same guard as _build_form_index uses).
            w = words[k].get("word") or {}
            newly.append((a["rank"], k, w.get("ktiv_male") or "", a["source"], a["corpus_word"]))
    newly.sort()
    if newly:
        print("\nTop 20 newly assigned entries:")
        for rank, _key, ktiv, source, corpus_word in newly[:20]:
            print(f" rank {rank:5d}: {ktiv:15s} via {source:12s} (corpus: {corpus_word})")

    # Entries that LOST frequency (had it before, not assigned now)
    lost = []
    for k, v in words.items():
        old_freq = v.get("frequency")
        if old_freq is not None and k not in assignments:
            w = v.get("word") or {}
            lost.append((old_freq, k, w.get("ktiv_male") or ""))
    lost.sort()
    if lost:
        print(f"\nEntries that would LOSE frequency ({len(lost)} total):")
        for rank, _key, ktiv in lost[:20]:
            print(f" was rank {rank:5d}: {ktiv}")
|
||
|
||
|
||
def main() -> None:
    """Command-line entry point.

    Loads the frequency corpus (the cleaned file if it exists, otherwise the
    raw cache) and words.json, runs ``assign_frequencies``, prints statistics,
    and — unless ``--stats`` or ``--dry-run`` is given — rewrites words.json
    in place. Entries without an assignment have ``frequency`` reset to
    ``None``.
    """
    parser = argparse.ArgumentParser(description="Assign frequency to words.json")
    parser.add_argument("--dry-run", action="store_true", help="Preview without saving")
    parser.add_argument("--stats", action="store_true", help="Show statistics only")
    # Upgrades are applied for inflection matches only; the help text says so.
    parser.add_argument(
        "--upgrade",
        action="store_true",
        help="Allow tier 2 to upgrade an entry's rank when an inflected form ranks better",
    )
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

    # Load data
    freq_path = CLEAN_CACHE if CLEAN_CACHE.exists() else RAW_CACHE
    logger.info("Loading frequency corpus: %s", freq_path)
    with open(freq_path, encoding="utf-8") as f:
        freq_corpus: dict[str, int] = json.load(f)

    # Load raw corpus for original rank numbers (with gaps). Skipped when the
    # raw cache is already the active corpus.
    raw_corpus: dict[str, int] | None = None
    if RAW_CACHE.exists() and freq_path != RAW_CACHE:
        with open(RAW_CACHE, encoding="utf-8") as f:
            raw_corpus = json.load(f)
        logger.info("Using original ranks from %s", RAW_CACHE)

    with open(WORDS_JSON, encoding="utf-8") as f:
        words: dict = json.load(f)

    logger.info("Corpus: %d entries, Words.json: %d entries", len(freq_corpus), len(words))

    # Run assignment
    assignments = assign_frequencies(words, freq_corpus, raw_corpus, upgrade=args.upgrade)

    # Stats
    print_stats(words, assignments, freq_corpus)

    if args.stats or args.dry_run:
        if args.dry_run:
            logger.info("Dry run — no changes saved")
        return

    # Apply to words.json
    changed = 0
    for key, entry in words.items():
        if key in assignments:
            new_rank = assignments[key]["rank"]
            if entry.get("frequency") != new_rank:
                entry["frequency"] = new_rank
                changed += 1
        else:
            # No assignment this run: clear any stale rank.
            if entry.get("frequency") is not None:
                entry["frequency"] = None
                changed += 1

    with open(WORDS_JSON, "w", encoding="utf-8") as f:
        json.dump(words, f, ensure_ascii=False, indent=2)

    logger.info("Updated %d entries in words.json", changed)
|
||
|
||
|
||
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|