#!/usr/bin/env python3
"""
validate_apkg.py — Anki deck integrity validation.

Checks the generated Hebrew decks (vocabulary, conjugations, confusables,
plurals and the complete combined deck) for structural correctness, media
consistency, and card data integrity.

Usage:
    python3 validate_apkg.py
        [--vocab | --conjugations | --confusables | --plurals | --complete]
        [path/to/deck.apkg]
"""

import argparse
import json
import os
import re
import sqlite3
import sys
import tempfile
import zipfile
from contextlib import closing
from pathlib import Path

VOCAB_APKG = Path("output/hebrew_vocabulary.apkg")
CONJ_APKG = Path("output/hebrew_conjugations.apkg")
CONF_APKG = Path("output/hebrew_confusables.apkg")
PLURAL_APKG = Path("output/hebrew_plurals.apkg")
COMPLETE_APKG = Path("output/hebrew_complete.apkg")

PASS = "\033[32m✓\033[0m"
FAIL = "\033[31m✗\033[0m"
WARN = "\033[33m⚠\033[0m"

# Matches Anki sound tags such as "[sound:word.mp3]" and captures the filename.
_SOUND_REF_RE = re.compile(r"\[sound:([^\]]+)\]")
# How many .mp3 entries get their header bytes inspected.
_AUDIO_SAMPLE_SIZE = 20
# The first N fields of every model are treated as required (non-empty).
_REQUIRED_FIELD_COUNT = 3
# Separator between field values inside notes.flds.
_FIELD_SEP = "\x1f"


def check(label: str, ok: bool, detail: str = "") -> bool:
    """Print a pass/fail line for *label* and return *ok* unchanged."""
    icon = PASS if ok else FAIL
    line = f" {icon} {label}"
    if detail:
        line += f": {detail}"
    print(line)
    return ok


def warn(label: str, detail: str = "") -> None:
    """Print a warning line for *label*; warnings never count as failures."""
    line = f" {WARN} {label}"
    if detail:
        line += f": {detail}"
    print(line)


def _detect_format(data: bytes) -> str:
    """Name the audio container suggested by the leading bytes of *data*."""
    if data.startswith(b"ID3"):
        return "MP3 (ID3)"
    if data.startswith((b"\xff\xfb", b"\xff\xf3", b"\xff\xf2", b"\xff\xfa")):
        return "MP3 (raw)"
    if data.startswith(b"OggS"):
        return "OGG"
    if data.startswith(b"fLaC"):
        return "FLAC"
    if data.startswith(b"RIFF"):
        return "WAV"
    return f"unknown ({data[:4].hex()})"


def _tally(ok: bool) -> int:
    """Turn a check result into a failure increment: 0 on pass, 1 on fail."""
    return 0 if ok else 1


def _check_media(
    zf: zipfile.ZipFile, namelist: list[str], has_media: bool
) -> tuple[int, set[str], list[str]]:
    """Validate the media manifest and the media members of the archive.

    Returns ``(failures, original_names, font_files)`` where *original_names*
    is the set of filenames listed in the manifest and *font_files* the
    subset ending in ``.ttf``.
    """
    failures = 0
    media_map: dict[str, str] = {}

    if has_media:
        # Read straight from the archive and decode explicitly as UTF-8 so the
        # result does not depend on the platform's locale encoding.
        try:
            loaded = json.loads(zf.read("media").decode("utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            print(f" {FAIL} Invalid media JSON: {e}")
            failures += 1
        else:
            if isinstance(loaded, dict):
                media_map = loaded
            else:
                print(
                    f" {FAIL} Invalid media JSON: "
                    f"expected an object, got {type(loaded).__name__}"
                )
                failures += 1

    original_names = set(media_map.values())
    zip_numbered = set(namelist) - {"collection.anki2", "media"}
    # Informational only: a mismatch is printed but not counted as a failure.
    check(
        "Manifest count matches ZIP entries",
        len(media_map) == len(zip_numbered),
        f"{len(media_map)} manifest vs {len(zip_numbered)} ZIP files",
    )

    # Uncompressed size of every archive member, keyed by member name.
    sizes = {info.filename: info.file_size for info in zf.infolist()}

    # Manifest entries without a ZIP member are not "zero-byte"; they are
    # covered by the count check above and by the audio sample below.
    zero_byte = [orig for num, orig in media_map.items() if sizes.get(num) == 0]
    failures += _tally(
        check(
            "No zero-byte media files",
            not zero_byte,
            f"{len(zero_byte)} empty" if zero_byte else "",
        )
    )

    # Inspect the header bytes of a sample of .mp3 entries.
    mp3_names = [num for num, orig in media_map.items() if orig.endswith(".mp3")]
    sample = mp3_names[:_AUDIO_SAMPLE_SIZE]
    bad_format = []
    for num in sample:
        if num not in sizes:
            # Report a dangling manifest entry instead of raising KeyError.
            bad_format.append(f"{media_map[num]}: missing from ZIP")
            continue
        # Only the first bytes are needed; avoid decompressing the whole file.
        with zf.open(num) as member:
            header = member.read(8)
        fmt = _detect_format(header)
        if "MP3" not in fmt:
            bad_format.append(f"{media_map[num]}: {fmt}")
    failures += _tally(
        check(
            f"Audio format (sampled {len(sample)} files)",
            not bad_format,
            "; ".join(bad_format) if bad_format else "all MP3",
        )
    )

    # Informational only: fonts are reported but not counted as a failure.
    font_files = [name for name in original_names if name.endswith(".ttf")]
    check(
        "Heebo font files bundled",
        len(font_files) >= 1,
        ", ".join(font_files) if font_files else "none found",
    )

    return failures, original_names, font_files


def _check_collection(conn: sqlite3.Connection) -> tuple[int, int, dict]:
    """Run the schema, count, GUID, queue, dconf and deck checks.

    Returns ``(failures, note_count, models)`` where *models* is the decoded
    ``col.models`` JSON (empty when the ``col`` table has no row).
    """
    failures = 0

    col_row = conn.execute("SELECT ver, models, dconf, decks FROM col").fetchone()
    if col_row is None:
        failures += _tally(check("Collection row present", False, "col table is empty"))
        return failures, 0, {}
    schema_ver, models_json_raw, dconf_json, decks_json = col_row

    failures += _tally(
        check("Schema version 11 (Anki 2.1)", schema_ver == 11, f"got {schema_ver}")
    )

    note_count = conn.execute("SELECT COUNT(*) FROM notes").fetchone()[0]
    card_count = conn.execute("SELECT COUNT(*) FROM cards").fetchone()[0]
    failures += _tally(check("Notes present", note_count > 0, f"{note_count:,} notes"))
    failures += _tally(check("Cards present", card_count > 0, f"{card_count:,} cards"))

    # Determine expected cards per note from model templates.
    # Some templates are optional (e.g. cloze only generates when the field is
    # non-empty), so cards must fall between a min and a max expected count.
    # The ratio is only well defined when the collection has a single model.
    models_raw = json.loads(models_json_raw)
    tmpl_counts = [len(m["tmpls"]) for m in models_raw.values()]
    if len(tmpl_counts) == 1:
        expected_ratio = tmpl_counts[0]
        min_cards = note_count  # at least 1 card per note
        max_cards = note_count * expected_ratio
        failures += _tally(
            check(
                f"Cards per note (1–{expected_ratio} templates)",
                min_cards <= card_count <= max_cards,
                f"{card_count:,} cards from {note_count:,} notes",
            )
        )

    # Duplicate GUIDs
    dup_guids = conn.execute(
        "SELECT guid, COUNT(*) c FROM notes GROUP BY guid HAVING c > 1"
    ).fetchall()
    failures += _tally(
        check("No duplicate GUIDs", not dup_guids, f"{len(dup_guids)} duplicates")
    )

    # Card queue states
    queues = conn.execute(
        "SELECT type, queue, COUNT(*) FROM cards GROUP BY type, queue"
    ).fetchall()
    queue_map = {(t, q): cnt for t, q, cnt in queues}
    new_cards = queue_map.get((0, 0), 0)
    suspended = sum(queue_map.get((card_type, -1), 0) for card_type in (0, 1, 2))
    if new_cards > 0:
        check("Cards in new queue (type=0, queue=0)", True, f"{new_cards:,}")
    if suspended > 0:
        warn("Suspended cards", f"{suspended:,}")

    # dconf — new card order. Missing values are dropped so that a config
    # without new.order is not reported as "configured".
    dconf = json.loads(dconf_json)
    new_confs = [dc.get("new", {}) for dc in dconf.values() if isinstance(dc, dict)]
    orders = {nc.get("order") for nc in new_confs} - {None}
    per_days = {nc.get("perDay") for nc in new_confs} - {None}
    check("new.order configured", bool(orders), f"{orders}")
    if per_days:
        check("new.perDay > 0", all(p > 0 for p in per_days), f"perDay={per_days}")

    # Deck assignment: every deck other than id "1" counts as custom.
    decks = json.loads(decks_json)
    real_decks = {did: d for did, d in decks.items() if did != "1"}
    if real_decks:
        check(
            "Custom deck exists (not Default only)",
            True,
            ", ".join(d["name"] for d in real_decks.values()),
        )
        for did_str, deck in real_decks.items():
            assigned = conn.execute(
                "SELECT COUNT(*) FROM cards WHERE did=?", [int(did_str)]
            ).fetchone()[0]
            check(
                f"Cards in deck '{deck['name']}'",
                assigned > 0,
                f"{assigned:,}/{card_count:,}",
            )

    return failures, note_count, models_raw


def _check_sound_refs(
    conn: sqlite3.Connection,
    note_count: int,
    original_names: set[str],
    font_files: list[str],
) -> int:
    """Compare ``[sound:...]`` tags in notes with the media manifest names."""
    notes_flds = conn.execute("SELECT flds FROM notes").fetchall()
    sound_refs: set[str] = set()
    for (flds,) in notes_flds:
        sound_refs.update(_SOUND_REF_RE.findall(flds))

    missing_audio = sound_refs - original_names
    orphaned_audio = original_names - sound_refs - set(font_files)
    failures = _tally(
        check(
            "All sound refs in media manifest",
            not missing_audio,
            f"{len(missing_audio)} missing" if missing_audio else "",
        )
    )
    if orphaned_audio:
        warn("Media files not referenced by any card", f"{len(orphaned_audio)} orphaned")

    notes_with_audio = sum(1 for (flds,) in notes_flds if "[sound:" in flds)
    pct = notes_with_audio / note_count * 100 if note_count else 0
    if notes_with_audio > 0:
        check("Notes with audio", True, f"{notes_with_audio:,}/{note_count:,} ({pct:.0f}%)")
    else:
        # Non-audio variants intentionally have no audio — not a failure
        warn("No audio in this deck variant", f"0/{note_count:,}")
    return failures


def _check_fields(conn: sqlite3.Connection, models: dict) -> None:
    """Warn about notes whose leading (required) fields are empty."""
    for mid_str, model in models.items():
        field_names = [f["name"] for f in model["flds"]]
        rows = conn.execute(
            "SELECT flds FROM notes WHERE mid=?", [int(mid_str)]
        ).fetchall()
        # Split each note once instead of once per inspected field.
        split_rows = [flds.split(_FIELD_SEP) for (flds,) in rows]
        for idx, fname in enumerate(field_names[:_REQUIRED_FIELD_COUNT]):
            empty = sum(
                1 for parts in split_rows if len(parts) <= idx or not parts[idx].strip()
            )
            if empty > 0:
                warn(f"Model '{model['name']}' field '{fname}' empty in {empty} notes")
            else:
                check(f"Model '{model['name']}' field '{fname}' populated", True)


def _check_database(
    conn: sqlite3.Connection, original_names: set[str], font_files: list[str]
) -> int:
    """Run every database-backed section and return the failure count."""
    print("\n[Database]")
    failures, note_count, models = _check_collection(conn)

    print("\n[Sound references]")
    failures += _check_sound_refs(conn, note_count, original_names, font_files)

    print("\n[Field content]")
    _check_fields(conn, models)
    return failures


def validate_apkg(apkg_path: Path) -> int:
    """Run all checks against one .apkg file.

    Prints a report to stdout and returns the number of failed checks.
    A missing file, an unreadable ZIP, a missing manifest or database, and an
    unreadable database are reported as failures rather than raised.
    """
    print(f"\n{'=' * 60}")
    print(f" Validating: {apkg_path}")
    print(f"{'=' * 60}")

    failures = 0
    if not apkg_path.exists():
        print(f" {FAIL} File not found: {apkg_path}")
        return 1

    file_size_mb = apkg_path.stat().st_size / 1_048_576
    print(f"\n File size: {file_size_mb:.1f} MB")

    # --- ZIP structure ---
    print("\n[ZIP structure]")
    try:
        zf = zipfile.ZipFile(apkg_path)
    except (zipfile.BadZipFile, OSError) as e:
        print(f" {FAIL} Invalid ZIP: {e}")
        return 1

    with zf, tempfile.TemporaryDirectory() as tmpdir:
        namelist = zf.namelist()
        has_db = "collection.anki2" in namelist
        has_media = "media" in namelist
        failures += _tally(check("collection.anki2 present", has_db))
        failures += _tally(check("media manifest present", has_media))

        # --- Media manifest ---
        print("\n[Media manifest]")
        media_failures, original_names, font_files = _check_media(zf, namelist, has_media)
        failures += media_failures

        # --- Database ---
        # Skipped when the member is absent: connecting to a non-existent path
        # would silently create an empty database and fail on the first query.
        if has_db:
            # Only the database has to exist on disk; media is read in place.
            zf.extract("collection.anki2", tmpdir)
            db_path = os.path.join(tmpdir, "collection.anki2")
            try:
                # closing() guarantees the handle is released before the
                # temporary directory is removed, even if a check raises.
                with closing(sqlite3.connect(db_path)) as conn:
                    failures += _check_database(conn, original_names, font_files)
            except sqlite3.DatabaseError as e:
                # Corrupt file or missing tables: one failure for the database
                # as a whole replaces whatever checks could not complete.
                failures += _tally(check("Database readable", False, str(e)))

    print()
    return failures


def main() -> None:
    """Parse the command line, validate the selected decks, and exit.

    Exits with status 0 when every check passed and 1 otherwise.
    """
    parser = argparse.ArgumentParser(description="Validate Pealim .apkg files")
    parser.add_argument(
        "path",
        nargs="?",
        help="Path to .apkg file (validates all default decks if omitted)",
    )
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--vocab", action="store_true", help="Validate vocabulary deck only")
    group.add_argument("--conjugations", action="store_true", help="Validate conjugation deck only")
    group.add_argument("--confusables", action="store_true", help="Validate confusables deck only")
    group.add_argument("--plurals", action="store_true", help="Validate plurals deck only")
    group.add_argument("--complete", action="store_true", help="Validate complete combined deck only")
    args = parser.parse_args()

    # Insertion order doubles as the order used when no flag is given.
    flag_targets = {
        "vocab": VOCAB_APKG,
        "conjugations": CONJ_APKG,
        "confusables": CONF_APKG,
        "plurals": PLURAL_APKG,
        "complete": COMPLETE_APKG,
    }

    targets: list[Path]
    if args.path:
        # An explicit path takes precedence over any deck flag.
        targets = [Path(args.path)]
    else:
        selected = [path for flag, path in flag_targets.items() if getattr(args, flag)]
        targets = selected or list(flag_targets.values())

    total_failures = sum(validate_apkg(path) for path in targets)

    print(f"\n{'=' * 60}")
    if total_failures == 0:
        print(f" {PASS} All checks passed")
    else:
        print(f" {FAIL} {total_failures} check(s) failed")
    print(f"{'=' * 60}\n")
    sys.exit(0 if total_failures == 0 else 1)


if __name__ == "__main__":
    main()