#!/usr/bin/env python3
"""
Fetch images for concrete Hebrew nouns from Wikipedia / Wikimedia Commons.

TODO: Rewrite to update words.json image/image_source fields directly instead
of writing to a separate image_cache.json. Currently the migration script
bridges the gap. See Phase 5 in SPRINT_LOG.md.

Scope: Noun PoS entries only.

Concreteness heuristic:
  - The last word of the English meaning does not end in an abstract suffix
    (-tion, -ity, -ness, -ment, -ance, -ence, -ism, -hood, -ship, -ure, -age;
    see ABSTRACT_SUFFIXES). -al and -ing are not currently checked.
  - Meaning is ≤ 4 words (after dropping a leading article)

Image sources (tried in order):
  1. Wikipedia page image via pageimages API
  2. Wikimedia Commons search (first image file result)

Cache:  data/image_cache.json  (word_no_nikkud → filename or null)
Output: data/images/<hebrew letters of word_no_nikkud>.jpg

Usage:
    python3 image_fetch.py [--limit N] [--word WORD] [--dry-run]
"""

import argparse
import json
import logging
import re
import time
from pathlib import Path

import requests

logger = logging.getLogger(__name__)

DATA_DIR = Path(__file__).parent / "data"
IMAGES_DIR = DATA_DIR / "images"
CACHE_PATH = DATA_DIR / "image_cache.json"

REQUEST_DELAY = 0.5
REQUEST_TIMEOUT = 10

# Abstract noun suffixes — words whose English meaning ends in these are skipped
ABSTRACT_SUFFIXES = (
    "tion",
    "ity",
    "ness",
    "ment",
    "ance",
    "ence",
    "ism",
    "hood",
    "ship",
    "ure",
    "age",
)

# Dictionary CSV file names, tried in this order inside DATA_DIR.
_DICT_CSV_CANDIDATES = (
    "hebrew_dict_for_anki.csv",
    "hebrew_dict.csv",
    "pealim_dict_for_anki.csv",
    "pealim_dict.csv",
)

# String forms an empty/NaN CSV cell takes after str(...).strip().
_MISSING_CELLS = frozenset({"", "nan", "None"})

_LEADING_ARTICLE_RE = re.compile(r"^(a|an|the)\s+")
_NON_HEBREW_RE = re.compile(r"[^\u05d0-\u05ea]")

session = requests.Session()
session.headers.update(
    {"User-Agent": "pealim-anki/3.0 (educational Hebrew Anki deck builder; contact: anki@pealim.invalid)"}
)


def _normalize_meaning(english_meaning: str) -> str:
    """Trim and lowercase the meaning, dropping one leading English article."""
    return _LEADING_ARTICLE_RE.sub("", english_meaning.strip().lower())


def is_concrete(english_meaning: str) -> bool:
    """Return True if the English meaning looks like a concrete noun.

    A meaning is rejected when it is empty, longer than four words (after
    dropping a leading article), or when its last word ends in one of
    ABSTRACT_SUFFIXES.
    """
    words = _normalize_meaning(english_meaning).split()
    # An empty meaning gives nothing to search for, so it is never "concrete".
    if not words or len(words) > 4:
        return False
    return not words[-1].endswith(ABSTRACT_SUFFIXES)


def _safe_name(word_no_nikkud: str) -> str:
    """Create a filename stem from a Hebrew word (strip to Hebrew letters only).

    Returns "unknown" when the input contains no Hebrew letters.
    """
    hebrew_only = _NON_HEBREW_RE.sub("", word_no_nikkud)
    return hebrew_only if hebrew_only else "unknown"


def _try_wikipedia(query: str) -> str | None:
    """Try Wikipedia pageimages API. Returns image URL or None."""
    url = "https://en.wikipedia.org/w/api.php"
    params = {
        "action": "query",
        "titles": query,
        "prop": "pageimages",
        "format": "json",
        "pithumbsize": 200,
        "redirects": 1,
    }
    try:
        resp = session.get(url, params=params, timeout=REQUEST_TIMEOUT)
        resp.raise_for_status()
        pages = resp.json().get("query", {}).get("pages", {})
        for page in pages.values():
            if "thumbnail" in page:
                return page["thumbnail"]["source"]
    except Exception as e:
        # Best-effort lookup: any failure just means "no image from this source".
        logger.debug("Wikipedia API error for %r: %s", query, e)
    return None


def _try_commons(query: str) -> str | None:
    """Try Wikimedia Commons file search. Returns thumbnail URL or None."""
    url = "https://commons.wikimedia.org/w/api.php"
    params = {
        "action": "query",
        "list": "search",
        "srnamespace": 6,
        "srsearch": query,
        "format": "json",
        "srlimit": 1,
    }
    try:
        resp = session.get(url, params=params, timeout=REQUEST_TIMEOUT)
        resp.raise_for_status()
        hits = resp.json().get("query", {}).get("search", [])
        if not hits:
            return None
        file_title = hits[0]["title"]  # e.g. "File:Cat_portrait.jpg"

        # Fetch imageinfo to get thumbnail URL
        info_params = {
            "action": "query",
            "titles": file_title,
            "prop": "imageinfo",
            "iiprop": "url",
            "iiurlwidth": 200,
            "format": "json",
        }
        resp2 = session.get(url, params=info_params, timeout=REQUEST_TIMEOUT)
        resp2.raise_for_status()
        pages2 = resp2.json().get("query", {}).get("pages", {})
        for page in pages2.values():
            info = page.get("imageinfo", [])
            if info and "thumburl" in info[0]:
                return info[0]["thumburl"]
    except Exception as e:
        # Best-effort lookup: any failure just means "no image from this source".
        logger.debug("Commons API error for %r: %s", query, e)
    return None


def _download_image(image_url: str, dest_path: Path) -> bool:
    """Download image_url to dest_path. Returns True on success.

    Returns False (and writes nothing to dest_path) when the request fails or
    the response's content-type does not mention "image".
    """
    try:
        # The context manager releases the streamed connection even when we
        # bail out early without reading the body.
        with session.get(image_url, timeout=REQUEST_TIMEOUT, stream=True) as resp:
            resp.raise_for_status()
            content_type = resp.headers.get("content-type", "")
            if "image" not in content_type:
                return False
            payload = resp.content

        # Write to a sibling temp file first so a failed write never leaves a
        # truncated file at dest_path (callers treat an existing file as valid).
        tmp_path = dest_path.with_name(dest_path.name + ".part")
        tmp_path.write_bytes(payload)
        tmp_path.replace(dest_path)
        return True
    except Exception as e:
        logger.debug("Download failed %s: %s", image_url, e)
        return False


def get_image(english_meaning: str, word_no_nikkud: str) -> str | None:
    """
    Fetch a thumbnail image for the word.

    Returns filename (in IMAGES_DIR) or None when the meaning is not concrete,
    no source yields an image, or the download fails.
    Downloads to IMAGES_DIR/<safe name>.jpg; an already existing file is
    reused without any network access.
    """
    if not is_concrete(english_meaning):
        return None

    dest = IMAGES_DIR / f"{_safe_name(word_no_nikkud)}.jpg"
    if dest.exists():
        return dest.name

    # Try Wikipedia first, then Commons; pause after each API call.
    query = _normalize_meaning(english_meaning)
    image_url = _try_wikipedia(query)
    time.sleep(REQUEST_DELAY)

    if not image_url:
        image_url = _try_commons(query)
        time.sleep(REQUEST_DELAY)

    if not image_url:
        return None

    IMAGES_DIR.mkdir(parents=True, exist_ok=True)
    if _download_image(image_url, dest):
        logger.info(" %r (%r) → %s", word_no_nikkud, english_meaning, dest.name)
        return dest.name
    return None


def load_cache() -> dict:
    """Load the image cache from CACHE_PATH.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or does not contain a JSON object; the latter cases are logged.
    """
    if not CACHE_PATH.exists():
        return {}
    try:
        with open(CACHE_PATH, encoding="utf-8") as f:
            cache = json.load(f)
    except (OSError, ValueError) as e:  # ValueError covers JSON/Unicode decode errors
        logger.warning("Ignoring unreadable image cache %s: %s", CACHE_PATH, e)
        return {}
    if not isinstance(cache, dict):
        logger.warning("Ignoring image cache %s: expected a JSON object", CACHE_PATH)
        return {}
    return cache


def save_cache(cache: dict) -> None:
    """Write the cache to CACHE_PATH as sorted, indented UTF-8 JSON.

    The data is written to a temp file and then moved into place, so an
    interrupted save does not truncate the existing cache.
    """
    CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = CACHE_PATH.with_name(CACHE_PATH.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(cache, f, ensure_ascii=False, indent=2, sort_keys=True)
    tmp_path.replace(CACHE_PATH)


def _find_dict_csv() -> Path | None:
    """Return the first existing dictionary CSV in DATA_DIR, or None."""
    for name in _DICT_CSV_CANDIDATES:
        candidate = DATA_DIR / name
        if candidate.exists():
            return candidate
    return None


def _read_dictionary(dict_csv: Path):
    """Read the dictionary CSV, trying ';' as separator before the default ','."""
    import pandas as pd  # imported lazily, as in the original entry point

    try:
        df = pd.read_csv(dict_csv, sep=";", index_col=0)
        # Too few columns means ';' was not the real separator.
        if df.shape[1] < 3:
            raise ValueError("too few columns")
    except (ValueError, pd.errors.ParserError):
        df = pd.read_csv(dict_csv, index_col=0)
    return df


def run(limit: int | None = None, dry_run: bool = False, single_word: str | None = None) -> dict:
    """
    Fetch images for all Noun-PoS words in the dictionary CSV.

    The CSV is the first existing file among _DICT_CSV_CANDIDATES in DATA_DIR.

    Args:
        limit: Stop after this many concrete, uncached nouns were processed.
        dry_run: Only report which words would be fetched. No image is
            downloaded, no placeholder is cached for those words, and the
            cache file is not written.
        single_word: If given, only the row whose no-nikkud form equals this
            value is considered.

    Returns the updated image_cache dict ({} when no dictionary CSV exists).
    """
    dict_csv = _find_dict_csv()
    if dict_csv is None:
        logger.error("Dictionary CSV not found")
        return {}

    df = _read_dictionary(dict_csv)
    cache = load_cache()

    processed = 0
    hits = 0
    skipped_abstract = 0
    skipped_cached = 0

    for _, row in df.iterrows():
        if limit is not None and processed >= limit:
            break

        word = str(row.get("Word", "")).strip()
        meaning = str(row.get("Meaning", "")).strip()
        word_plain = str(row.get("Word Without Nikkud", "")).strip()
        pos_raw = str(row.get("Part of speech", row.get("Part of Speech", ""))).strip()

        if not word or meaning in _MISSING_CELLS:
            continue
        # Without a no-nikkud form there is no usable cache key or filename;
        # such rows would all collide on the same entry.
        if word_plain in _MISSING_CELLS:
            continue
        if "nan" in pos_raw.lower() or "Noun" not in pos_raw:
            continue
        if single_word and word_plain != single_word:
            continue

        cache_key = word_plain
        if cache_key in cache:
            skipped_cached += 1
            continue

        if not is_concrete(meaning):
            skipped_abstract += 1
            cache[cache_key] = None
            continue

        processed += 1
        logger.info("[%d] %r (%r) …", processed, word_plain, meaning)

        if dry_run:
            # Do not cache a placeholder: a later real run must still fetch
            # this word instead of treating it as already looked up.
            logger.info(" [dry-run] would fetch image")
            continue

        # NOTE: None is cached both for "no image found" and for failed
        # requests; remove the cache entry to retry a word.
        filename = get_image(meaning, cache_key)
        cache[cache_key] = filename
        if filename:
            hits += 1

        # Save cache periodically
        if processed % 10 == 0:
            save_cache(cache)

    if not dry_run:
        save_cache(cache)

    logger.info(
        "Image fetch complete: %d found, %d not found, %d abstract (skipped), %d cached",
        hits,
        processed - hits,
        skipped_abstract,
        skipped_cached,
    )
    return cache


def main() -> None:
    """Command-line entry point: parse arguments, run the fetch, print a summary."""
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    p = argparse.ArgumentParser(description="Fetch images for concrete Hebrew nouns")
    p.add_argument("--limit", type=int, metavar="N", help="Process at most N nouns (for testing)")
    p.add_argument("--dry-run", action="store_true", help="Don't download, just check concreteness")
    p.add_argument("--word", metavar="WORD", help="Fetch image for a specific word (no-nikkud form)")
    args = p.parse_args()

    cache = run(limit=args.limit, dry_run=args.dry_run, single_word=args.word)
    found = [(k, v) for k, v in cache.items() if v]
    print(f"\n{len(found)} words with images (of {len(cache)} in cache)")
    if found:
        print("Sample:", found[:5])


if __name__ == "__main__":
    main()