Python code cleanup (python-pro review)

- Type annotations: dict|None defaults, return types, nested func annotations
- Dead code: removed unused row_forms_with_audio(), duplicate _strip_nikkud defs,
  redundant guards, duplicate 'ism' in ABSTRACT_SUFFIXES
- Exceptions: narrowed bare except to (ValueError, pd.errors.ParserError) and
  (json.JSONDecodeError, OSError) throughout; all raise ValueError given messages
- Deduplication: extracted deduplicate() helper in _parse_table; setdefault() for
  dict building in benyehuda and apkg_builder; list comprehension in benyehuda
- Correctness: limit=0 guard fixed (is not None); audio tag parsing uses
  removeprefix/removesuffix instead of magic offsets; vectorized pandas sum
- Constants: BINYAN_NAMES extracted; unicodedata imports moved to top level
- benyehuda load(): removed wasted cache read on force_rebuild; word-boundary
  regex simplified from double-negative to \w

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Sochen 2026-03-04 07:33:14 +00:00
parent 0686298610
commit bb79725a7f
6 changed files with 85 additions and 119 deletions

View file

@ -399,7 +399,7 @@ def build_vocab_deck(
df = pd.read_csv(dict_csv, sep=";", index_col=0)
if df.shape[1] < 3:
raise ValueError("too few columns")
except Exception:
except (ValueError, pd.errors.ParserError):
df = pd.read_csv(dict_csv, index_col=0)
if limit:
@ -417,8 +417,8 @@ def build_vocab_deck(
try:
with open(image_cache_path) as _f:
image_cache = json.load(_f)
except Exception:
pass
except (json.JSONDecodeError, OSError) as e:
logger.debug(f"Could not load image cache from disk: {e}")
images_dir = DATA_DIR / "images"
@ -451,7 +451,7 @@ def build_vocab_deck(
word_no_nik = str(row.get("Word Without Nikkud", "")).strip()
shared_roots = str(row.get("shared roots", row.get("SharedRoots", ""))).strip()
tags_str = str(row.get("tags", row.get("Tags", ""))).strip()
freq_rank_raw = int(row["_freq_rank"])
freq_rank_raw = row["_freq_rank"]
freq_display = str(freq_rank_raw) if freq_rank_raw < 999_999 else "50k+"
root = "" if root in ("nan", "None", "-") else root
@ -470,7 +470,7 @@ def build_vocab_deck(
# Audio
audio_tag = _audio_tag(word_no_nik)
if audio_tag:
mp3_name = audio_tag[7:-1] # strip [sound: and ]
mp3_name = audio_tag.removeprefix("[sound:").removesuffix("]")
mp3_path = AUDIO_DIR / mp3_name
if mp3_path not in media_files:
media_files.append(mp3_path)
@ -482,14 +482,12 @@ def build_vocab_deck(
# Related words grouped by PoS category
related_html = ""
if shared_roots and shared_roots not in ("nan", "None", ""):
if shared_roots:
related_words = shared_roots.split()
groups: dict[str, list[str]] = {}
for rw in related_words:
cat = word_to_pos_cat.get(_strip_nikkud(rw), "Other")
if cat not in groups:
groups[cat] = []
groups[cat].append(rw)
groups.setdefault(cat, []).append(rw)
parts = []
for cat, words in groups.items():
if cat == "Other":
@ -532,13 +530,15 @@ def build_vocab_deck(
deck.add_note(note)
# Diagnostic: count words without PoS coverage in shared_roots
other_count = sum(
1 for _, row in df.iterrows()
for rw in str(row.get("shared roots", row.get("SharedRoots", ""))).split()
if str(row.get("shared roots", row.get("SharedRoots", ""))) not in ("nan", "None", "")
and word_to_pos_cat.get(_strip_nikkud(rw)) is None
)
unlisted = sum(1 for v in df["_freq_rank"] if int(v) >= 999_999)
other_count = 0
for _, row in df.iterrows():
sr = str(row.get("shared roots", row.get("SharedRoots", ""))).strip()
if sr and sr not in ("nan", "None"):
other_count += sum(
1 for rw in sr.split()
if word_to_pos_cat.get(_strip_nikkud(rw)) is None
)
unlisted = int((df["_freq_rank"] >= 999_999).sum())
logger.info(f" Unlisted words (not in frequency corpus): {unlisted}/{len(df)}")
logger.info(f" Related-words without PoS coverage: {other_count} (shown unlabeled)")
logger.info(f"Vocabulary deck: {len(deck.notes)} notes")
@ -681,7 +681,7 @@ def write_vocab_apkg(
def write_conj_apkg(
deck: genanki.Deck,
media_files: list[Path] = None,
media_files: list[Path] | None = None,
out_path: Path = CONJ_APKG,
) -> None:
out_path.parent.mkdir(parents=True, exist_ok=True)

View file

@ -80,10 +80,9 @@ def _build_index(corpus_zip_bytes: bytes) -> None:
words = re.findall(r"[\u05d0-\u05ea\u05b0-\u05c7'\"]+", sentence)
for w in set(words):
if len(w) >= 2:
if w not in _index:
_index[w] = []
if len(_index[w]) < MAX_INDEX_ENTRIES:
_index[w].append(sentence)
bucket = _index.setdefault(w, [])
if len(bucket) < MAX_INDEX_ENTRIES:
bucket.append(sentence)
logger.info(f"Index built: {len(_index)} unique word forms")
@ -108,19 +107,19 @@ def load(force_rebuild: bool = False) -> None:
if _index and not force_rebuild:
return
# Load persisted examples cache
if EXAMPLES_CACHE_PATH.exists():
with open(EXAMPLES_CACHE_PATH, encoding="utf-8") as f:
_examples_cache = json.load(f)
if force_rebuild:
# Delete old index so it rebuilds
# Delete old index and discard examples cache
if INDEX_PATH.exists():
INDEX_PATH.unlink()
logger.info("Deleted old Ben Yehuda index (force rebuild)")
_examples_cache = {}
else:
# Load persisted examples cache (not needed on rebuild)
if EXAMPLES_CACHE_PATH.exists():
with open(EXAMPLES_CACHE_PATH, encoding="utf-8") as f:
_examples_cache = json.load(f)
if INDEX_PATH.exists() and not force_rebuild:
if INDEX_PATH.exists():
_load_index()
return
@ -175,12 +174,8 @@ def get_examples(word_nikkud: str) -> list[str]:
# Filter: word must appear as a whole token
# Match the stripped form (for robustness with nikkud variants in sentence)
if word_stripped:
pattern = r"(?<![^\s\W])" + re.escape(word_stripped) + r"(?![^\s\W])"
matched = []
for s in candidates:
s_stripped = _strip_nikkud(s)
if re.search(pattern, s_stripped):
matched.append(s)
pattern = r"(?<!\w)" + re.escape(word_stripped) + r"(?!\w)"
matched = [s for s in candidates if re.search(pattern, _strip_nikkud(s))]
else:
matched = candidates[:]

View file

@ -19,6 +19,7 @@ import json
import logging
import re
import time
import unicodedata
import urllib.parse
from pathlib import Path
@ -98,18 +99,24 @@ TENSE_DESCRIPTION = {
"infinitive": "מְקוֹר",
}
BINYAN_NAMES: tuple[str, ...] = (
"Pa'al", "Nif'al", "Pi'el", "Pu'al", "Hitpa'el", "Hif'il", "Huf'al"
)
session = requests.Session()
session.headers.update({"User-Agent": "Mozilla/5.0 (compatible; pealim-anki/2.0)"})
def _strip_nikkud(text: str) -> str:
"""Remove Hebrew nikkud (diacritics) from a string."""
return "".join(
ch for ch in unicodedata.normalize("NFD", text)
if unicodedata.category(ch) != "Mn"
)
def _build_pos_lookup() -> dict[str, str]:
"""Build word_stripped → binyan dict from pealim_dict_for_anki.csv."""
import unicodedata
def strip_nikkud(t: str) -> str:
return "".join(c for c in unicodedata.normalize("NFD", t)
if unicodedata.category(c) != "Mn")
lookup: dict[str, str] = {}
if not DICT_CSV.exists():
return lookup
@ -119,15 +126,15 @@ def _build_pos_lookup() -> dict[str, str]:
try:
df = pd.read_csv(DICT_CSV, sep=";", index_col=0)
if df.shape[1] < 3:
raise ValueError
except Exception:
raise ValueError("too few columns")
except (ValueError, pd.errors.ParserError):
df = pd.read_csv(DICT_CSV, index_col=0)
for _, row in df.iterrows():
word = str(row.get("Word", "")).strip()
pos = str(row.get("Part of speech", row.get("Part of Speech", ""))).strip()
if word and pos and "nan" not in pos.lower():
lookup[strip_nikkud(word)] = pos
lookup[_strip_nikkud(word)] = pos
except Exception as e:
logger.debug(f"Could not load PoS lookup: {e}")
@ -147,14 +154,8 @@ def _get_pos_lookup() -> dict[str, str]:
def _binyan_from_pos(word: str) -> str:
"""Look up binyan from PoS field: 'Verb pa\'al' or 'Verb Pi\'el' → canonical name."""
import unicodedata
def strip_nikkud(t: str) -> str:
return "".join(c for c in unicodedata.normalize("NFD", t)
if unicodedata.category(c) != "Mn")
lookup = _get_pos_lookup()
pos_str = lookup.get(strip_nikkud(word), "")
pos_str = lookup.get(_strip_nikkud(word), "")
if not pos_str:
return ""
@ -191,10 +192,8 @@ def _find_slug(query: str) -> str | None:
def _is_passive_binyan(binyan: str) -> bool:
for marker in ["פֻּעַל", "הֻפְעַל", "Pu'al", "Huf'al", "pual", "hufal"]:
if marker.lower() in binyan.lower():
return True
return False
"""Return True if the binyan is a passive (Pu'al or Huf'al)."""
return any(marker in binyan for marker in ("פֻּעַל", "הֻפְעַל", "Pu'al", "Huf'al"))
def _get_menukad(cell) -> tuple[str, str]:
@ -255,21 +254,6 @@ def _parse_table(soup: BeautifulSoup, passive: bool = False, table_el=None) -> d
forms: dict[str, dict] = {}
def row_forms_with_audio(row_idx: int) -> list[tuple[str, str]]:
"""Extract all (Hebrew form, audio_url) pairs from a row (expanding colspans)."""
cells = rows[row_idx].find_all(["th", "td"])
result = []
for cell in cells:
txt, audio_url = _get_menukad(cell)
colspan = int(cell.get("colspan", 1))
if txt:
for _ in range(colspan):
result.append((txt, audio_url))
else:
for _ in range(colspan):
result.append(("", ""))
return result
def first_heb_forms(row_idx: int) -> list[tuple[str, str]]:
"""Get only the Hebrew-text cells from a row (skip label cells)."""
cells = rows[row_idx].find_all(["th", "td"])
@ -282,6 +266,16 @@ def _parse_table(soup: BeautifulSoup, passive: bool = False, table_el=None) -> d
result.append((txt, audio_url))
return result
def deduplicate(pairs: list[tuple[str, str]]) -> list[tuple[str, str]]:
"""Return pairs with duplicate form-text entries removed (first occurrence kept)."""
seen: set[str] = set()
out: list[tuple[str, str]] = []
for pair in pairs:
if pair[0] not in seen:
seen.add(pair[0])
out.append(pair)
return out
# Find rows by tense label
present_row = past_row = future_row = imp_row = inf_row = -1
for i, row in enumerate(rows):
@ -310,13 +304,7 @@ def _parse_table(soup: BeautifulSoup, passive: bool = False, table_el=None) -> d
# Past tense
if past_row >= 0:
hf = first_heb_forms(past_row)
unique: list[tuple[str, str]] = []
seen_forms: set[str] = set()
for pair in hf:
if pair[0] not in seen_forms:
seen_forms.add(pair[0])
unique.append(pair)
unique = deduplicate(first_heb_forms(past_row))
if len(unique) >= 1:
store("past_1s", unique[0][0], unique[0][1])
if len(unique) >= 2:
@ -329,26 +317,14 @@ def _parse_table(soup: BeautifulSoup, passive: bool = False, table_el=None) -> d
store(k, v, au)
if past_row + 2 < len(rows):
hf3 = first_heb_forms(past_row + 2)
unique3: list[tuple[str, str]] = []
seen3: set[str] = set()
for pair in hf3:
if pair[0] not in seen3:
seen3.add(pair[0])
unique3.append(pair)
unique3 = deduplicate(first_heb_forms(past_row + 2))
keys3 = ["past_3ms", "past_3fs", "past_3p"]
for k, (v, au) in zip(keys3, unique3):
store(k, v, au)
# Future tense
if future_row >= 0:
hf = first_heb_forms(future_row)
unique_f: list[tuple[str, str]] = []
seen_f: set[str] = set()
for pair in hf:
if pair[0] not in seen_f:
seen_f.add(pair[0])
unique_f.append(pair)
unique_f = deduplicate(first_heb_forms(future_row))
if len(unique_f) >= 1:
store("future_1s", unique_f[0][0], unique_f[0][1])
if len(unique_f) >= 2:
@ -386,14 +362,14 @@ def _extract_binyan_from_page(soup: BeautifulSoup) -> str:
"""Extract binyan from page header span."""
for h3 in soup.find_all("h3", class_="page-header"):
text = h3.get_text(" ", strip=True)
for bname in ["Pa'al", "Nif'al", "Pi'el", "Pu'al", "Hitpa'el", "Hif'il", "Huf'al"]:
for bname in BINYAN_NAMES:
if bname in text:
return bname
# Also try og:description
meta = soup.find("meta", {"property": "og:description"})
if meta:
desc = meta.get("content", "")
for bname in ["Pa'al", "Nif'al", "Pi'el", "Pu'al", "Hitpa'el", "Hif'il", "Huf'al"]:
for bname in BINYAN_NAMES:
if bname in desc:
return bname
return ""
@ -403,15 +379,15 @@ def _extract_passive_binyan_from_page(soup: BeautifulSoup) -> str:
"""Extract passive binyan name from passive section heading."""
for h3 in soup.find_all("h3"):
text = h3.get_text(" ", strip=True)
if "passive" in text.lower() or "Passive" in text:
for bname in ["Pu'al", "Huf'al"]:
if "passive" in text.lower():
for bname in ("Pu'al", "Huf'al"):
if bname in text:
return bname
# Infer: Pa'al/Pi'el → Pu'al; Hif'il → Huf'al (stored as span text)
span = h3.find("span", class_="small")
if span:
span_text = span.get_text(strip=True)
for bname in ["Pu'al", "Huf'al"]:
for bname in ("Pu'al", "Huf'al"):
if bname in span_text:
return bname
return ""

View file

@ -52,12 +52,10 @@ def load(cache_path: Path = CACHE_PATH) -> None:
line = line.strip()
if not line:
continue
parts = line.split()
if len(parts) >= 1:
word = _strip_nikkud(parts[0])
if word and word not in _freq:
_freq[word] = rank
rank += 1
word = _strip_nikkud(line.split()[0])
if word and word not in _freq:
_freq[word] = rank
rank += 1
cache_path.parent.mkdir(parents=True, exist_ok=True)
with open(cache_path, "w", encoding="utf-8") as f:

View file

@ -41,7 +41,7 @@ REQUEST_TIMEOUT = 10
# Abstract noun suffixes — words whose English meaning ends in these are skipped
ABSTRACT_SUFFIXES = (
"tion", "ity", "ness", "ment", "ance", "ence", "ism",
"hood", "ship", "ure", "ism", "age",
"hood", "ship", "ure", "age",
)
session = requests.Session()
@ -67,10 +67,7 @@ def is_concrete(english_meaning: str) -> bool:
return False
# Check last word for abstract suffixes
last = words[-1] if words else ""
for suffix in ABSTRACT_SUFFIXES:
if last.endswith(suffix):
return False
return True
return not any(last.endswith(suffix) for suffix in ABSTRACT_SUFFIXES)
def _safe_name(word_no_nikkud: str) -> str:
@ -227,8 +224,8 @@ def run(limit: int | None = None, dry_run: bool = False, single_word: str | None
try:
df = pd.read_csv(dict_csv, sep=";", index_col=0)
if df.shape[1] < 3:
raise ValueError
except Exception:
raise ValueError("too few columns")
except (ValueError, pd.errors.ParserError):
df = pd.read_csv(dict_csv, index_col=0)
cache = load_cache()
@ -238,7 +235,7 @@ def run(limit: int | None = None, dry_run: bool = False, single_word: str | None
skipped_cached = 0
for _, row in df.iterrows():
if limit and processed >= limit:
if limit is not None and processed >= limit:
break
word = str(row.get("Word", "")).strip()

20
run.py
View file

@ -22,6 +22,7 @@ import logging
import re
import sys
import time
import unicodedata
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
@ -78,7 +79,7 @@ def step_scrape(args):
logger.info(f" Saved Anki CSV → {anki_csv}")
def step_frequency():
def step_frequency() -> dict[str, int]:
"""Step 2 — load/download word frequency data."""
logger.info("[2] Loading word frequency data …")
import frequency_lookup
@ -109,8 +110,8 @@ def step_examples(args, freq_cache: dict):
try:
df = pd.read_csv(dict_csv, sep=";", index_col=0)
if df.shape[1] < 3:
raise ValueError
except Exception:
raise ValueError("too few columns")
except (ValueError, pd.errors.ParserError):
df = pd.read_csv(dict_csv, index_col=0)
if args.test:
@ -152,13 +153,12 @@ def step_audio(args):
import pandas as pd
import requests
import unicodedata
try:
try:
df = pd.read_csv(dict_csv, sep=";", index_col=0)
if df.shape[1] < 3:
raise ValueError
except Exception:
raise ValueError("too few columns")
except (ValueError, pd.errors.ParserError):
df = pd.read_csv(dict_csv, index_col=0)
if args.test:
@ -168,7 +168,7 @@ def step_audio(args):
downloaded = 0
skipped = 0
def strip_nik(t):
def strip_nik(t: str) -> str:
return "".join(c for c in unicodedata.normalize("NFD", t)
if unicodedata.category(c) != "Mn")
@ -345,7 +345,7 @@ def step_images(args) -> dict:
return image_fetch.run(limit=limit)
def step_build_vocab(args, examples_cache: dict, freq_cache: dict, image_cache: dict = None):
def step_build_vocab(args, examples_cache: dict, freq_cache: dict, image_cache: dict | None = None):
"""Step 5 — build vocabulary .apkg."""
logger.info("[5] Building vocabulary deck …")
import apkg_builder
@ -406,8 +406,8 @@ def print_summary(args, examples_cache, freq_cache, conjugations):
try:
df = pd.read_csv(dict_csv, sep=";", index_col=0)
if df.shape[1] < 3:
raise ValueError
except Exception:
raise ValueError("too few columns")
except (ValueError, pd.errors.ParserError):
df = pd.read_csv(dict_csv, index_col=0)
logger.info(f" Dictionary words: {len(df)}")