- Full pealim.com rescrape: 9,120 words (15 new), all with audio URLs - Plurals deck: 2:1 regular:irregular ratio (649 notes), RTL arrows, 1.6x hint text - Conjugation deck: blue infinitive on front, plain meaning on back, nikkud labels - Confusables deck: larger prompt text (32px), audio only when all words have it - Validator: non-audio variants no longer false-fail on audio check - 14 new audio files downloaded Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
315 lines
12 KiB
Python
315 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
validate_apkg.py — Anki deck integrity validation.
|
||
|
||
Checks the generated Hebrew decks (vocabulary, conjugations, confusables,
plurals, and the combined "complete" deck under output/) for
structural correctness, media consistency, and card data integrity.
|
||
|
||
Usage:
|
||
    python3 validate_apkg.py [--vocab | --conjugations | --confusables | --plurals | --complete] [path/to/deck.apkg]
|
||
"""
|
||
|
||
import argparse
|
||
import json
|
||
import os
|
||
import re
|
||
import sqlite3
|
||
import sys
|
||
import tempfile
|
||
import zipfile
|
||
from pathlib import Path
|
||
|
||
# Default deck locations, relative to the current working directory.
_OUTPUT_DIR = Path("output")
VOCAB_APKG = _OUTPUT_DIR / "hebrew_vocabulary.apkg"
CONJ_APKG = _OUTPUT_DIR / "hebrew_conjugations.apkg"
CONF_APKG = _OUTPUT_DIR / "hebrew_confusables.apkg"
PLURAL_APKG = _OUTPUT_DIR / "hebrew_plurals.apkg"
COMPLETE_APKG = _OUTPUT_DIR / "hebrew_complete.apkg"

# Status glyphs wrapped in ANSI colour codes (green / red / yellow),
# each followed by the reset sequence.
_ANSI_RESET = "\033[0m"
PASS = "\033[32m✓" + _ANSI_RESET
FAIL = "\033[31m✗" + _ANSI_RESET
WARN = "\033[33m⚠" + _ANSI_RESET
|
||
|
||
|
||
def check(label: str, ok: bool, detail: str = "") -> bool:
    """Print a single pass/fail result line and return ``ok`` unchanged.

    Args:
        label: Short description of what was checked.
        ok: Outcome of the check; selects the PASS or FAIL glyph.
        detail: Optional extra text appended after ``": "`` when non-empty.

    Returns:
        The ``ok`` value that was passed in, so callers can tally failures.
    """
    marker = PASS if ok else FAIL
    suffix = f": {detail}" if detail else ""
    print(f" {marker} {label}{suffix}")
    return ok
|
||
|
||
|
||
def warn(label: str, detail: str = "") -> None:
    """Print a warning line; warnings are informational and return nothing.

    Args:
        label: Short description of the condition being flagged.
        detail: Optional extra text appended after ``": "`` when non-empty.
    """
    suffix = f": {detail}" if detail else ""
    print(f" {WARN} {label}{suffix}")
|
||
|
||
|
||
def _detect_format(data: bytes) -> str:
|
||
if data[:3] == b"ID3":
|
||
return "MP3 (ID3)"
|
||
if data[:2] in (b"\xff\xfb", b"\xff\xf3", b"\xff\xf2", b"\xff\xfa"):
|
||
return "MP3 (raw)"
|
||
if data[:4] == b"OggS":
|
||
return "OGG"
|
||
if data[:4] == b"fLaC":
|
||
return "FLAC"
|
||
if data[:4] == b"RIFF":
|
||
return "WAV"
|
||
return f"unknown ({data[:4].hex()})"
|
||
|
||
|
||
def _check_media(zf: zipfile.ZipFile, namelist: list[str]) -> tuple[int, set[str], list[str]]:
    """Validate the media manifest and the media files stored in the archive.

    The caller must already have confirmed that a ``media`` member exists.

    Args:
        zf: The open .apkg archive.
        namelist: Member names of ``zf`` (as returned by ``zf.namelist()``).

    Returns:
        A tuple ``(failures, original_names, font_files)``: the number of
        failed checks, the set of original media file names listed in the
        manifest, and the sorted list of ``.ttf`` names among them.
    """
    failures = 0
    members = set(namelist)

    # Read the manifest straight from the archive and decode it explicitly as
    # UTF-8 so non-ASCII file names do not depend on the platform locale.
    try:
        media_map: dict[str, str] = json.loads(zf.read("media").decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as e:
        print(f" {FAIL} Invalid media JSON: {e}")
        failures += 1
        media_map = {}

    original_names = set(media_map.values())
    zip_numbered = members - {"collection.anki2", "media"}

    # Informational only: the result is deliberately not added to `failures`.
    check(
        "Manifest count matches ZIP entries",
        len(media_map) == len(zip_numbered),
        f"{len(media_map)} manifest vs {len(zip_numbered)} ZIP files",
    )

    # Zero-byte media files (manifest entries absent from the ZIP are ignored here).
    zero_byte = [
        orig
        for num, orig in media_map.items()
        if num in members and zf.getinfo(num).file_size == 0
    ]
    failures += (
        0
        if check("No zero-byte media files", not zero_byte, f"{len(zero_byte)} empty" if zero_byte else "")
        else 1
    )

    # Audio format sample (first 20 mp3s).
    mp3_names = [num for num, orig in media_map.items() if orig.endswith(".mp3")]
    bad_format = []
    for num in mp3_names[:20]:
        if num not in members:
            # Listed in the manifest but not stored in the ZIP: report it
            # instead of letting zf.read() raise KeyError.
            bad_format.append(f"{media_map[num]}: missing from archive")
            continue
        fmt = _detect_format(zf.read(num)[:8])
        if "MP3" not in fmt:
            bad_format.append(f"{media_map[num]}: {fmt}")
    failures += (
        0
        if check(
            f"Audio format (sampled {min(20, len(mp3_names))} files)",
            not bad_format,
            "; ".join(bad_format) if bad_format else "all MP3",
        )
        else 1
    )

    # Fonts present (informational; sorted so the output is stable between runs).
    font_files = sorted(v for v in original_names if v.endswith(".ttf"))
    check("Heebo font files bundled", len(font_files) >= 1, ", ".join(font_files) if font_files else "none found")

    return failures, original_names, font_files


def _check_database(conn: sqlite3.Connection, original_names: set[str], font_files: list[str]) -> int:
    """Run the collection-database, sound-reference and field-content checks.

    Args:
        conn: Open connection to the extracted ``collection.anki2`` database.
        original_names: Original media file names from the media manifest.
        font_files: Font file names, excluded from the orphaned-media warning.

    Returns:
        The number of failed checks.

    Raises:
        sqlite3.Error: If a query fails (for example a missing table).
    """
    failures = 0

    schema_ver = conn.execute("SELECT ver FROM col").fetchone()[0]
    failures += 0 if check("Schema version 11 (Anki 2.1)", schema_ver == 11, f"got {schema_ver}") else 1

    note_count = conn.execute("SELECT COUNT(*) FROM notes").fetchone()[0]
    card_count = conn.execute("SELECT COUNT(*) FROM cards").fetchone()[0]
    failures += 0 if check("Notes present", note_count > 0, f"{note_count:,} notes") else 1
    failures += 0 if check("Cards present", card_count > 0, f"{card_count:,} cards") else 1

    # Determine expected cards per note from model templates.
    # Some templates are optional (e.g. cloze only generates when field is non-empty),
    # so we check that cards fall between min and max expected range.
    models_raw = json.loads(conn.execute("SELECT models FROM col").fetchone()[0])
    tmpl_counts = [len(m["tmpls"]) for m in models_raw.values()]
    if len(tmpl_counts) == 1:  # the ratio is only well defined for a single model
        expected_ratio = tmpl_counts[0]
        min_cards = note_count  # at least 1 card per note
        max_cards = note_count * expected_ratio
        failures += (
            0
            if check(
                f"Cards per note (1–{expected_ratio} templates)",
                min_cards <= card_count <= max_cards,
                f"{card_count:,} cards from {note_count:,} notes",
            )
            else 1
        )

    # Duplicate GUIDs
    dup_guids = conn.execute("SELECT guid, COUNT(*) c FROM notes GROUP BY guid HAVING c > 1").fetchall()
    failures += 0 if check("No duplicate GUIDs", len(dup_guids) == 0, f"{len(dup_guids)} duplicates") else 1

    # Card queue states
    queues = conn.execute("SELECT type, queue, COUNT(*) FROM cards GROUP BY type, queue").fetchall()
    queue_map = {(t, q): cnt for t, q, cnt in queues}
    new_cards = queue_map.get((0, 0), 0)
    suspended = sum(queue_map.get((card_type, -1), 0) for card_type in (0, 1, 2))
    if new_cards > 0:
        check("Cards in new queue (type=0, queue=0)", True, f"{new_cards:,}")
    if suspended > 0:
        warn("Suspended cards", f"{suspended:,}")

    # dconf — new card order
    dconf = json.loads(conn.execute("SELECT dconf FROM col").fetchone()[0])
    new_settings = [dc.get("new", {}) for dc in dconf.values() if isinstance(dc, dict)]
    orders = {cfg.get("order") for cfg in new_settings}
    per_days = {cfg.get("perDay") for cfg in new_settings}
    check("new.order configured", bool(orders), f"{orders}")
    if per_days:
        check("new.perDay > 0", all(p and p > 0 for p in per_days if p is not None), f"perDay={per_days}")

    # Deck assignment (deck id "1" is the built-in Default deck)
    decks = json.loads(conn.execute("SELECT decks FROM col").fetchone()[0])
    real_decks = {did: d for did, d in decks.items() if did != "1"}
    if real_decks:
        check("Custom deck exists (not Default only)", True, ", ".join(d["name"] for d in real_decks.values()))
        # All cards in the custom deck?
        for did_str, deck in real_decks.items():
            assigned = conn.execute("SELECT COUNT(*) FROM cards WHERE did=?", [int(did_str)]).fetchone()[0]
            check(f"Cards in deck '{deck['name']}'", assigned > 0, f"{assigned:,}/{card_count:,}")

    # --- Sound references vs media manifest ---
    print("\n[Sound references]")
    notes_flds = conn.execute("SELECT flds FROM notes").fetchall()
    sound_refs: set[str] = {
        ref
        for (flds,) in notes_flds
        for ref in re.findall(r"\[sound:([^\]]+)\]", flds)
    }

    missing_audio = sound_refs - original_names
    orphaned_audio = original_names - sound_refs - set(font_files)
    failures += (
        0
        if check(
            "All sound refs in media manifest",
            len(missing_audio) == 0,
            f"{len(missing_audio)} missing" if missing_audio else "",
        )
        else 1
    )
    if orphaned_audio:
        warn("Media files not referenced by any card", f"{len(orphaned_audio)} orphaned")

    notes_with_audio = sum(1 for (flds,) in notes_flds if "[sound:" in flds)
    pct = notes_with_audio / note_count * 100 if note_count else 0
    if notes_with_audio > 0:
        check("Notes with audio", True, f"{notes_with_audio:,}/{note_count:,} ({pct:.0f}%)")
    else:
        # Non-audio variants intentionally have no audio — not a failure
        warn("No audio in this deck variant", f"0/{note_count:,}")

    # --- Empty fields check ---
    print("\n[Field content]")
    for mid_str, model in models_raw.items():
        field_names = [f["name"] for f in model["flds"]]
        rows = conn.execute("SELECT flds FROM notes WHERE mid=?", [int(mid_str)]).fetchall()
        # Note fields are stored joined by the 0x1f unit separator; split once per note.
        split_rows = [flds.split("\x1f") for (flds,) in rows]
        # Check required fields (first 3) are not empty
        for idx, fname in enumerate(field_names[:3]):
            empty = sum(1 for parts in split_rows if len(parts) <= idx or not parts[idx].strip())
            if empty > 0:
                warn(f"Model '{model['name']}' field '{fname}' empty in {empty} notes")
            else:
                check(f"Model '{model['name']}' field '{fname}' populated", True)

    return failures


def validate_apkg(apkg_path: Path) -> int:
    """Run all checks against one .apkg file and return the number of failures.

    A missing file or an unreadable ZIP is reported and counted as one
    failure. If ``collection.anki2`` or ``media`` is absent, the remaining
    checks are skipped rather than crashing. A database error is reported
    and counted as one failure.

    Args:
        apkg_path: Path of the deck package to validate.

    Returns:
        The number of failed checks (0 means the deck passed).
    """
    print(f"\n{'=' * 60}")
    print(f" Validating: {apkg_path}")
    print(f"{'=' * 60}")

    failures = 0

    if not apkg_path.exists():
        print(f" {FAIL} File not found: {apkg_path}")
        return 1

    file_size_mb = apkg_path.stat().st_size / 1_048_576
    print(f"\n File size: {file_size_mb:.1f} MB")

    # --- ZIP structure ---
    print("\n[ZIP structure]")
    try:
        zf = zipfile.ZipFile(apkg_path)
    except zipfile.BadZipFile as e:
        print(f" {FAIL} Invalid ZIP: {e}")
        return 1

    with zf, tempfile.TemporaryDirectory() as tmpdir:
        namelist = zf.namelist()
        has_db = "collection.anki2" in namelist
        has_media = "media" in namelist
        failures += 0 if check("collection.anki2 present", has_db) else 1
        failures += 0 if check("media manifest present", has_media) else 1
        if not (has_db and has_media):
            # Every later check needs both members; stop instead of raising.
            warn("Skipping remaining checks", "required archive members missing")
            print()
            return failures

        # --- Media manifest ---
        print("\n[Media manifest]")
        media_failures, original_names, font_files = _check_media(zf, namelist)
        failures += media_failures

        # --- Database ---
        print("\n[Database]")
        # sqlite3 needs a real file, so extract only the collection itself;
        # media entries are inspected in place inside the archive.
        db_path = zf.extract("collection.anki2", tmpdir)
        conn = sqlite3.connect(db_path)
        try:
            failures += _check_database(conn, original_names, font_files)
        except sqlite3.Error as e:
            print(f" {FAIL} Database error: {e}")
            failures += 1
        finally:
            # Always release the file handle so the temp directory can be removed.
            conn.close()

    print()
    return failures
|
||
|
||
|
||
def main() -> None:
    """Parse the command line, validate the selected decks, and exit.

    An explicit ``path`` argument takes precedence over the deck flags. With
    neither a path nor a flag, every known deck is validated. The process
    exits with status 0 when all checks pass and 1 otherwise.
    """
    parser = argparse.ArgumentParser(description="Validate Pealim .apkg files")
    parser.add_argument("path", nargs="?", help="Path to .apkg file (validates all decks if omitted)")
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--vocab", action="store_true", help="Validate vocabulary deck only")
    group.add_argument("--conjugations", action="store_true", help="Validate conjugation deck only")
    group.add_argument("--confusables", action="store_true", help="Validate confusables deck only")
    group.add_argument("--plurals", action="store_true", help="Validate plurals deck only")
    group.add_argument("--complete", action="store_true", help="Validate complete combined deck only")
    args = parser.parse_args()

    # Flag attribute name -> default deck path; insertion order is also the
    # order used when every deck is validated.
    deck_by_flag: dict[str, Path] = {
        "vocab": VOCAB_APKG,
        "conjugations": CONJ_APKG,
        "confusables": CONF_APKG,
        "plurals": PLURAL_APKG,
        "complete": COMPLETE_APKG,
    }

    targets: list[Path]
    if args.path:
        targets = [Path(args.path)]
    else:
        selected = [deck for flag, deck in deck_by_flag.items() if getattr(args, flag)]
        # The flags are mutually exclusive, so `selected` holds at most one deck.
        targets = selected or list(deck_by_flag.values())

    total_failures = sum(validate_apkg(path) for path in targets)

    print(f"\n{'=' * 60}")
    if total_failures == 0:
        print(f" {PASS} All checks passed")
    else:
        print(f" {FAIL} {total_failures} check(s) failed")
    print(f"{'=' * 60}\n")

    sys.exit(0 if total_failures == 0 else 1)
|
||
|
||
|
||
# Run the validator only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|