autofax/gui.py
Sochen aef5e5283a Initial commit: automated hourly fax sender for insurance claims
Sends doula coverage claims via Telnyx fax API every hour, logs every
attempt, and generates a printable HTML report for HR. Includes both
a Linux CLI with cron scheduling and a Windows GUI (tkinter) that can
be packaged as a portable exe via PyInstaller.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 11:36:47 +00:00

534 lines
20 KiB
Python

#!/usr/bin/env python3
"""AutoFax GUI - Simple tkinter interface for sending faxes on a schedule."""
import html
import json
import os
import subprocess
import sys
import threading
import time
import tkinter as tk
from datetime import datetime, timezone
from pathlib import Path
from tkinter import filedialog, messagebox, scrolledtext

import requests
# Determine base directory (works for both script and frozen exe).
# The path is made absolute up front: every other location below is derived
# from it, and a relative BASE_DIR would silently point somewhere else if the
# process working directory changes after start-up.
if getattr(sys, "frozen", False):
    BASE_DIR = Path(sys.executable).absolute().parent
else:
    BASE_DIR = Path(__file__).absolute().parent

# Saved GUI settings (includes the Telnyx API key in plain text).
CONFIG_FILE = BASE_DIR / "autofax_config.json"
# Default folder offered by the "Browse..." dialog.
CLAIMS_DIR = BASE_DIR / "claims"
# Holds the JSON attempt log and the generated HTML report.
REPORTS_DIR = BASE_DIR / "reports"
LOG_FILE = REPORTS_DIR / "fax_log.json"
# Base URL of the Telnyx v2 REST API.
API = "https://api.telnyx.com/v2"
class AutoFaxApp:
    """Tkinter window that faxes one claim PDF through the Telnyx API.

    The window collects Telnyx credentials, fax numbers, the PDF path and
    optional ntfy notification settings.  "Send Once Now" sends a single fax;
    "Start Hourly Faxing" sends immediately and then again every
    ``SEND_INTERVAL_SECONDS`` until stopped.  Every attempt is appended to
    ``LOG_FILE`` and the HTML report in ``REPORTS_DIR`` is regenerated.

    Threading model: network work runs on daemon worker threads.  Workers do
    not read Tk variables or touch widgets; they use the settings snapshot
    that :meth:`save_config` stores in ``self.config`` on the Tk thread, and
    they hand UI updates back through ``root.after``.
    """

    # Pause between two scheduled faxes.
    SEND_INTERVAL_SECONDS = 3600
    # How long to wait before asking Telnyx for the status of a queued fax.
    STATUS_POLL_DELAY_SECONDS = 15
    # Statuses that do not count as a failed attempt.
    OK_STATUSES = ("queued", "sending", "sent", "delivered")
    # Serialises read-modify-write of the attempt log; "Send Once Now" can
    # overlap with a scheduled send.
    _log_lock = threading.Lock()

    def __init__(self, root: "tk.Tk"):
        """Build the window on *root* and load any saved settings."""
        self.root = root
        self.root.title("AutoFax - Insurance Claim Faxer")
        self.root.geometry("620x700")
        self.root.resizable(False, False)
        self.running = False
        self.timer_thread = None
        # Stop signal of the current hourly run; a fresh Event per run keeps
        # a not-yet-finished old loop from being revived by a quick restart.
        self._stop_event = None
        # Tcl variable name -> (entry widget, placeholder text, mask char).
        self._fields = {}
        # Tcl variable names whose entry currently displays its placeholder.
        self._placeholder_shown = set()
        self.config = self.load_config()
        self.build_ui()
        self.populate_fields()
        self.update_status_display()

    # ── Config persistence ──────────────────────────────────────────
    def load_config(self) -> dict:
        """Return the saved settings, or ``{}`` if missing or unreadable."""
        try:
            data = json.loads(CONFIG_FILE.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # Missing, unreadable or malformed file: start with blank fields.
            return {}
        # Anything but a JSON object would break the .get() lookups later.
        return data if isinstance(data, dict) else {}

    def save_config(self):
        """Snapshot the form into ``self.config`` and write it to disk.

        Must be called on the Tk thread.  The snapshot is what worker threads
        use afterwards.  A write failure is logged but does not abort the
        send.  Note that the API key is stored in plain text.
        """
        self.config = {
            "api_key": self._field_value(self.api_key_var),
            "connection_id": self._field_value(self.conn_id_var),
            "from_number": self._field_value(self.from_var),
            "to_number": self._field_value(self.to_var),
            "pdf_path": self._field_value(self.pdf_var),
            "ntfy_url": self._field_value(self.ntfy_url_var),
            "ntfy_token": self._field_value(self.ntfy_token_var),
        }
        try:
            CONFIG_FILE.write_text(
                json.dumps(self.config, indent=2), encoding="utf-8"
            )
        except OSError as exc:
            self.log(f"Warning: could not save settings: {exc}")

    # ── UI ──────────────────────────────────────────────────────────
    def build_ui(self):
        """Create all widgets: header, settings groups, buttons, log pane."""
        # Header
        header = tk.Frame(self.root, bg="#1a1a2e", height=50)
        header.pack(fill="x")
        header.pack_propagate(False)
        tk.Label(
            header, text="AutoFax", font=("Arial", 18, "bold"),
            bg="#1a1a2e", fg="white",
        ).pack(side="left", padx=15, pady=8)
        self.status_label = tk.Label(
            header, text="Stopped", font=("Arial", 11),
            bg="#1a1a2e", fg="#e74c3c",
        )
        self.status_label.pack(side="right", padx=15)

        # Main frame with padding
        main = tk.Frame(self.root, padx=15, pady=10)
        main.pack(fill="both", expand=True)

        # ── Telnyx Settings ──
        lf_telnyx = tk.LabelFrame(main, text="Telnyx Settings", padx=10, pady=5)
        lf_telnyx.pack(fill="x", pady=(0, 8))
        self.api_key_var = tk.StringVar()
        self.conn_id_var = tk.StringVar()
        self.from_var = tk.StringVar()
        self._add_field(lf_telnyx, "API Key:", self.api_key_var, show="*")
        self._add_field(lf_telnyx, "Connection ID:", self.conn_id_var)
        self._add_field(
            lf_telnyx, "From Number:", self.from_var,
            placeholder="+1XXXXXXXXXX",
        )

        # ── Fax Settings ──
        lf_fax = tk.LabelFrame(main, text="Fax Settings", padx=10, pady=5)
        lf_fax.pack(fill="x", pady=(0, 8))
        self.to_var = tk.StringVar()
        self.pdf_var = tk.StringVar()
        self._add_field(
            lf_fax, "To Number:", self.to_var, placeholder="+18019382102",
        )
        pdf_frame = tk.Frame(lf_fax)
        pdf_frame.pack(fill="x", pady=2)
        tk.Label(pdf_frame, text="Claim PDF:", width=14, anchor="w").pack(side="left")
        tk.Entry(pdf_frame, textvariable=self.pdf_var, width=35).pack(
            side="left", fill="x", expand=True,
        )
        tk.Button(pdf_frame, text="Browse...", command=self.browse_pdf).pack(
            side="left", padx=(5, 0),
        )

        # ── ntfy (optional) ──
        lf_ntfy = tk.LabelFrame(main, text="Notifications (optional)", padx=10, pady=5)
        lf_ntfy.pack(fill="x", pady=(0, 8))
        self.ntfy_url_var = tk.StringVar()
        self.ntfy_token_var = tk.StringVar()
        self._add_field(
            lf_ntfy, "ntfy URL:", self.ntfy_url_var,
            placeholder="https://ntfy.sh/your-topic",
        )
        self._add_field(
            lf_ntfy, "ntfy Token:", self.ntfy_token_var, show="*",
            placeholder="Bearer tk_...",
        )

        # ── Controls ──
        btn_frame = tk.Frame(main)
        btn_frame.pack(fill="x", pady=(0, 8))
        self.start_btn = tk.Button(
            btn_frame, text="Start Hourly Faxing", font=("Arial", 12, "bold"),
            bg="#2d8a4e", fg="white", command=self.toggle_running,
            width=20, height=2,
        )
        self.start_btn.pack(side="left")
        tk.Button(
            btn_frame, text="Send Once Now", command=self.send_once,
        ).pack(side="left", padx=(10, 0))
        tk.Button(
            btn_frame, text="Open Report", command=self.open_report,
        ).pack(side="right")

        # ── Log ──
        tk.Label(main, text="Activity Log:", anchor="w").pack(fill="x")
        self.log_text = scrolledtext.ScrolledText(
            main, height=12, font=("Consolas", 9), state="disabled",
        )
        self.log_text.pack(fill="both", expand=True)

    def _add_field(self, parent, label, var, show=None, placeholder=None):
        """Add a labelled entry bound to *var* inside *parent*.

        *show* is the mask character for secret fields.  *placeholder* is grey
        hint text displayed while the field is empty; it is tracked separately
        from real input so a typed value equal to the hint is still accepted.
        """
        frame = tk.Frame(parent)
        frame.pack(fill="x", pady=2)
        tk.Label(frame, text=label, width=14, anchor="w").pack(side="left")
        entry = tk.Entry(frame, textvariable=var, show=show or "")
        entry.pack(side="left", fill="x", expand=True)
        if placeholder:
            self._fields[str(var)] = (entry, placeholder, show or "")
            entry.bind(
                "<FocusIn>",
                lambda e, ent=entry, ph=placeholder, v=var:
                    self._clear_placeholder(ent, ph, v),
            )
            entry.bind(
                "<FocusOut>",
                lambda e, ent=entry, ph=placeholder, v=var:
                    self._restore_placeholder(ent, ph, v),
            )
            self._restore_placeholder(entry, placeholder, var)

    def _clear_placeholder(self, entry, placeholder, var=None):
        """Remove the hint text from *entry* when it gains focus."""
        key = str(var) if var is not None else str(entry.cget("textvariable"))
        if key not in self._placeholder_shown:
            return  # the field holds real input; leave it alone
        self._placeholder_shown.discard(key)
        entry.delete(0, "end")
        self._style_as_value(entry, key)

    def _restore_placeholder(self, entry, placeholder, var):
        """Show the hint text in *entry* if the field was left blank."""
        if var.get().strip():
            return
        # Unmask first so the hint of a secret field is readable.
        entry.config(show="")
        entry.delete(0, "end")
        entry.insert(0, placeholder)
        entry.config(fg="grey")
        self._placeholder_shown.add(str(var))

    def _style_as_value(self, entry, key):
        """Give *entry* the normal text colour and its configured mask."""
        mask = self._fields.get(key, (entry, "", ""))[2]
        entry.config(fg="black", show=mask)

    def _field_value(self, var) -> str:
        """Return the stripped text of *var*, or ``""`` while it shows a hint."""
        if str(var) in self._placeholder_shown:
            return ""
        return var.get().strip()

    def _set_field(self, var, value):
        """Put real text into *var* and drop any placeholder styling."""
        key = str(var)
        var.set(value)
        self._placeholder_shown.discard(key)
        if key in self._fields:
            self._style_as_value(self._fields[key][0], key)

    def populate_fields(self):
        """Copy the loaded settings into the form."""
        bindings = (
            ("api_key", self.api_key_var),
            ("connection_id", self.conn_id_var),
            ("from_number", self.from_var),
            ("to_number", self.to_var),
            ("pdf_path", self.pdf_var),
            ("ntfy_url", self.ntfy_url_var),
            ("ntfy_token", self.ntfy_token_var),
        )
        for key, var in bindings:
            value = self.config.get(key)
            if not value:
                continue
            value = str(value)
            # Config files saved earlier may contain the ntfy hint text itself
            # (it used to live in the variable); treat that as "not set".
            hint = self._fields.get(str(var), (None, None, ""))[1]
            if key in ("ntfy_url", "ntfy_token") and value == hint:
                continue
            self._set_field(var, value)

    # ── Actions ─────────────────────────────────────────────────────
    def browse_pdf(self):
        """Let the user pick the claim PDF and store the chosen path."""
        start_dir = CLAIMS_DIR if CLAIMS_DIR.exists() else BASE_DIR
        path = filedialog.askopenfilename(
            title="Select Claim PDF",
            filetypes=[("PDF files", "*.pdf")],
            initialdir=str(start_dir),
        )
        if path:
            self.pdf_var.set(path)

    def validate(self) -> bool:
        """Check the required fields; show an error box and return False if any is missing."""
        problems = []
        if not self._field_value(self.api_key_var):
            problems.append("API Key is required")
        if not self._field_value(self.conn_id_var):
            problems.append("Connection ID is required")
        if not self._field_value(self.from_var):
            problems.append("From Number is required")
        if not self._field_value(self.to_var):
            problems.append("To Number is required")
        pdf = self._field_value(self.pdf_var)
        if not pdf or not Path(pdf).is_file():
            problems.append("Select a valid claim PDF file")
        if problems:
            messagebox.showerror("Missing Configuration", "\n".join(problems))
            return False
        return True

    def toggle_running(self):
        """Start the hourly schedule if stopped, stop it if running."""
        if self.running:
            self.stop()
        else:
            self.start()

    def start(self):
        """Validate and save the form, then start the hourly worker thread."""
        if not self.validate():
            return
        self.save_config()
        self.running = True
        self._stop_event = threading.Event()
        self.start_btn.config(text="Stop Faxing", bg="#c0392b")
        self.status_label.config(text="Running (hourly)", fg="#2d8a4e")
        self.update_status_display()
        self.log("Started hourly fax schedule.")
        self.timer_thread = threading.Thread(
            target=self.run_loop, args=(self._stop_event,), daemon=True,
        )
        self.timer_thread.start()

    def stop(self):
        """Signal the hourly worker to finish and reset the controls."""
        self.running = False
        if self._stop_event is not None:
            self._stop_event.set()
        self.start_btn.config(text="Start Hourly Faxing", bg="#2d8a4e")
        self.status_label.config(text="Stopped", fg="#e74c3c")
        self.update_status_display()
        self.log("Stopped.")

    def send_once(self):
        """Validate and save the form, then send a single fax in the background."""
        if not self.validate():
            return
        self.save_config()
        self.log("Sending single fax...")
        threading.Thread(target=self._do_send, daemon=True).start()

    def run_loop(self, stop_event=None):
        """Worker body: send now, then once per interval until stopped.

        *stop_event* is the ``threading.Event`` that ends this particular run.
        When omitted, the instance's current event is used (one is created if
        none exists yet).
        """
        if stop_event is None:
            stop_event = getattr(self, "_stop_event", None)
        if stop_event is None:
            stop_event = self._stop_event = threading.Event()
        while not stop_event.is_set():
            self._do_send()
            # wait() returns True as soon as stop() sets the event, so a stop
            # request is honoured without polling.
            if stop_event.wait(self.SEND_INTERVAL_SECONDS):
                break

    # ── Fax logic ───────────────────────────────────────────────────
    def _headers(self):
        """Return the Telnyx authorization header built from the saved API key."""
        return {"Authorization": f"Bearer {self.config.get('api_key', '')}"}

    def _do_send(self):
        """Send one fax and record the outcome (runs on a worker thread).

        The attempt is always written to the log, the report regenerated and
        the notification posted, whether the send succeeded or failed.
        """
        settings = dict(self.config)
        pdf_path = Path(settings.get("pdf_path", ""))
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "status": "send_failed",
            "fax_id": None,
            "to": settings.get("to_number", ""),
            "from": settings.get("from_number", ""),
            "file": pdf_path.name,
            "error": None,
        }
        try:
            fax = self._submit_fax(pdf_path, settings)
        except requests.HTTPError as e:
            # e.response may be None; getattr avoids a second failure here.
            detail = (getattr(e.response, "text", "") or "")[:300]
            entry["error"] = f"{e} | {detail}"
            self.log(f"FAILED: {e}")
        except Exception as e:
            # Worker-thread boundary: any failure (unreadable PDF, network,
            # bad JSON) must end up in the log, not kill the thread.
            entry["error"] = str(e)
            self.log(f"FAILED: {e}")
        else:
            entry["fax_id"] = fax.get("id")
            entry["status"] = fax.get("status") or "queued"
            if entry["fax_id"] and entry["status"] == "queued":
                self._refresh_status(entry)
            self.log(f"Fax {str(entry['status']).upper()} (ID: {entry['fax_id']})")

        # Save to log and regenerate report
        try:
            self._save_entry(entry)
            self._generate_report()
        except Exception as exc:
            # Keep the hourly loop alive even if the disk write fails.
            self.log(f"Could not update fax log/report: {exc}")
        self._notify(entry)
        # Widgets may only be touched on the Tk thread.
        self.root.after(0, self.update_status_display)

    def _submit_fax(self, pdf_path, settings) -> dict:
        """Upload *pdf_path* as Telnyx media and request the fax.

        Returns the ``data`` object of the fax response (``{}`` if absent).
        Raises whatever ``open`` or ``requests`` raise, including
        ``requests.HTTPError`` for non-2xx responses.
        """
        self.log(f"Uploading {pdf_path.name}...")
        media_name = f"autofax-claim-{pdf_path.stem}"
        with open(pdf_path, "rb") as f:
            resp = requests.post(
                f"{API}/media",
                headers={**self._headers(), "Content-Type": "application/pdf"},
                params={"media_name": media_name},
                data=f.read(),
                timeout=60,
            )
        resp.raise_for_status()

        to_number = settings.get("to_number", "")
        self.log(f"Sending fax to {to_number}...")
        resp = requests.post(
            f"{API}/faxes",
            headers={**self._headers(), "Content-Type": "application/json"},
            json={
                "connection_id": settings.get("connection_id", ""),
                "media_name": media_name,
                "to": to_number,
                "from": settings.get("from_number", ""),
            },
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json().get("data", {})
        return data if isinstance(data, dict) else {}

    def _refresh_status(self, entry):
        """Wait briefly, then update *entry* with the fax's current status.

        Best effort: if the lookup fails the entry keeps its earlier status
        and the problem is written to the activity log.
        """
        time.sleep(self.STATUS_POLL_DELAY_SECONDS)
        try:
            sr = requests.get(
                f"{API}/faxes/{entry['fax_id']}",
                headers=self._headers(), timeout=15,
            )
            sr.raise_for_status()
            sd = sr.json().get("data", {})
            entry["status"] = sd.get("status", entry["status"])
            if entry["status"] == "failed":
                entry["failure_reason"] = sd.get("failure_reason", "Unknown")
        except Exception as exc:
            self.log(f"Status check failed: {exc}")

    def _read_log(self, quarantine_corrupt=False) -> list:
        """Return the attempt log as a list (``[]`` if missing or invalid).

        With *quarantine_corrupt*, an unparseable log file is renamed aside
        instead of being left in place to be overwritten.
        """
        try:
            data = json.loads(LOG_FILE.read_text(encoding="utf-8"))
        except FileNotFoundError:
            return []
        except ValueError:
            data = None
        if isinstance(data, list):
            return data
        if quarantine_corrupt:
            stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
            backup = LOG_FILE.with_name(
                f"{LOG_FILE.stem}.corrupt-{stamp}{LOG_FILE.suffix}"
            )
            os.replace(LOG_FILE, backup)
            self.log(f"Fax log was unreadable; kept a copy as {backup.name}")
        return []

    def _save_entry(self, entry: dict):
        """Append *entry* to the JSON attempt log."""
        REPORTS_DIR.mkdir(parents=True, exist_ok=True)
        with self._log_lock:
            entries = self._read_log(quarantine_corrupt=True)
            entries.append(entry)
            # Write to a sibling file and swap it in, so an interrupted write
            # cannot leave a truncated log behind.
            tmp = LOG_FILE.with_name(LOG_FILE.name + ".tmp")
            tmp.write_text(json.dumps(entries, indent=2), encoding="utf-8")
            os.replace(tmp, LOG_FILE)

    def _notify(self, entry: dict):
        """Post the outcome of *entry* to the configured ntfy topic, if any.

        Best effort: a failure is written to the activity log and otherwise
        ignored.
        """
        ntfy_url = str(self.config.get("ntfy_url", "")).strip()
        if not ntfy_url or ntfy_url.startswith("https://ntfy.sh/your"):
            return
        status_labels = {
            "delivered": "Fax Delivered", "sent": "Fax Sent",
            "queued": "Fax Queued", "sending": "Fax Sending",
        }
        try:
            subject = status_labels.get(entry["status"], "Fax Failed")
            msg = f"To: {entry['to']}\nStatus: {entry['status']}\nFile: {entry['file']}"
            if entry.get("error"):
                msg += f"\nError: {entry['error']}"
            hdrs = {"Title": subject}
            ntfy_token = str(self.config.get("ntfy_token", "")).strip()
            if ntfy_token and not ntfy_token.startswith("Bearer tk_..."):
                hdrs["Authorization"] = ntfy_token
            if self._is_failure(entry["status"]):
                hdrs["Priority"] = "high"
            resp = requests.post(
                ntfy_url, data=msg.encode(), headers=hdrs, timeout=10,
            )
            resp.raise_for_status()
        except Exception as exc:
            self.log(f"Notification failed: {exc}")

    @classmethod
    def _is_failure(cls, status) -> bool:
        """Return True if *status* is not one of ``OK_STATUSES``."""
        return status not in cls.OK_STATUSES

    def _generate_report(self):
        """Rewrite ``fax_report.html`` from the attempt log (no-op if empty)."""
        REPORTS_DIR.mkdir(parents=True, exist_ok=True)
        entries = [e for e in self._read_log() if isinstance(e, dict)]
        if not entries:
            return
        # The document declares charset=utf-8, so it must be written as UTF-8.
        (REPORTS_DIR / "fax_report.html").write_text(
            self._build_report_html(entries), encoding="utf-8",
        )

    @classmethod
    def _build_report_html(cls, entries, generated_at=None) -> str:
        """Render *entries* (a non-empty list of log dicts) as an HTML page.

        *generated_at* is the datetime shown as "Report generated"; it
        defaults to the current UTC time.  All values taken from the log are
        HTML-escaped, since error text comes straight from API responses.
        """
        if generated_at is None:
            generated_at = datetime.now(timezone.utc)
        total = len(entries)
        failed = sum(1 for e in entries if cls._is_failure(e.get("status")))
        succeeded = sum(1 for e in entries if e.get("status") == "delivered")
        pending = total - failed - succeeded

        row_parts = []
        for i, e in enumerate(entries, 1):
            status = e.get("status")
            if status in ("delivered", "sent"):
                sc, sd = "success", status.upper()
            elif status in ("queued", "sending"):
                sc, sd = "pending", status.upper()
            else:
                sc, sd = "failed", "FAILED"
            error_col = e.get("error") or e.get("failure_reason") or "-"
            raw_ts = e.get("timestamp", "")
            try:
                ts = datetime.fromisoformat(raw_ts).strftime("%b %d, %Y %I:%M %p UTC")
            except (TypeError, ValueError):
                ts = str(raw_ts)
            to_col = html.escape(str(e.get("to", "-")))
            id_col = html.escape(str(e.get("fax_id") or "-"))
            err_col = html.escape(str(error_col))
            row_parts.append(
                f'<tr><td>{i}</td><td>{html.escape(ts)}</td><td>{to_col}</td>'
                f'<td class="{sc}">{sd}</td><td>{id_col}</td>'
                f'<td class="error">{err_col}</td></tr>\n'
            )
        rows = "".join(row_parts)
        destination = html.escape(str(entries[0].get("to", "-")))
        generated = generated_at.strftime("%B %d, %Y at %I:%M %p UTC")

        return f"""<!DOCTYPE html>
<html><head><meta charset="utf-8"><title>Fax Transmission Report</title>
<style>
body{{font-family:Arial,sans-serif;margin:40px;color:#333}}
h1{{color:#1a1a2e;border-bottom:2px solid #1a1a2e;padding-bottom:10px}}
.summary{{background:#f5f5f5;padding:20px;border-radius:8px;margin:20px 0}}
.summary span{{font-weight:bold}}
table{{border-collapse:collapse;width:100%;margin:20px 0;font-size:14px}}
th{{background:#1a1a2e;color:#fff;padding:12px 8px;text-align:left}}
td{{padding:10px 8px;border-bottom:1px solid #ddd}}
tr:nth-child(even){{background:#f9f9f9}}
.success{{color:#2d8a4e;font-weight:bold}}
.failed{{color:#c0392b;font-weight:bold}}
.pending{{color:#e67e22;font-weight:bold}}
.error{{font-size:12px;color:#888;max-width:300px;word-wrap:break-word}}
.footer{{margin-top:30px;font-size:12px;color:#888;border-top:1px solid #ddd;padding-top:10px}}
@media print{{body{{margin:20px}}}}
</style></head><body>
<h1>Fax Transmission Report</h1>
<div class="summary">
<p>This report documents automated fax transmission attempts to the insurance
company for doula coverage claim processing.</p>
<p>Total attempts: <span>{total}</span> |
Delivered: <span class="success">{succeeded}</span> |
Failed: <span class="failed">{failed}</span> |
Pending: <span>{pending}</span></p>
<p>Fax destination: <span>{destination}</span></p>
<p>Report generated: <span>{generated}</span></p>
</div>
<table><thead><tr>
<th>#</th><th>Date &amp; Time</th><th>Fax Number</th>
<th>Status</th><th>Confirmation ID</th><th>Error Details</th>
</tr></thead><tbody>
{rows}
</tbody></table>
<div class="footer"><p>Generated by AutoFax. Each row represents one
transmission attempt of the doula coverage claim (5 pages). Faxes are sent
once per hour. FAILED = the receiving fax machine did not accept.</p></div>
</body></html>"""

    # ── Helpers ─────────────────────────────────────────────────────
    def update_status_display(self):
        """Update the status label with attempt count (Tk thread only)."""
        try:
            entries = [e for e in self._read_log() if isinstance(e, dict)]
        except OSError:
            return
        total = len(entries)
        if total == 0:
            return
        failed = sum(1 for e in entries if self._is_failure(e.get("status")))
        extra = f" | {total} sent, {failed} failed"
        # Drop a previously appended counter before adding the new one.
        current = self.status_label.cget("text").split(" |")[0]
        self.status_label.config(text=f"{current}{extra}")

    def open_report(self):
        """Open the HTML report with the platform's default application."""
        report = REPORTS_DIR / "fax_report.html"
        if not report.exists():
            messagebox.showinfo("No Report", "No fax attempts recorded yet.")
            return
        # Cross-platform open
        try:
            if sys.platform == "win32":
                os.startfile(str(report))
            elif sys.platform == "darwin":
                subprocess.run(["open", str(report)])
            else:
                subprocess.run(["xdg-open", str(report)])
        except OSError as exc:
            # E.g. the opener program is not installed.
            messagebox.showerror(
                "Cannot Open Report",
                f"{exc}\n\nOpen this file manually:\n{report}",
            )

    def log(self, message: str):
        """Append a time-stamped line to the activity log pane.

        Safe to call from worker threads: the widget update is scheduled with
        ``root.after`` rather than performed directly.
        """
        timestamp = datetime.now().strftime("%H:%M:%S")
        line = f"[{timestamp}] {message}\n"

        def _append():
            self.log_text.config(state="normal")
            self.log_text.insert("end", line)
            self.log_text.see("end")
            self.log_text.config(state="disabled")

        self.root.after(0, _append)
def main():
    """Create the claims and reports folders, then run the GUI until closed."""
    for folder in (CLAIMS_DIR, REPORTS_DIR):
        folder.mkdir(exist_ok=True)
    window = tk.Tk()
    AutoFaxApp(window)
    window.mainloop()


if __name__ == "__main__":
    main()