- Remove legacy Telnyx status strings (delivered, sent, queued, sending) - Remove duplicate save_log/generate_report calls in main() - Remove hardcoded personal fax number from GUI placeholder - Replace "doula coverage claim" with generic "claim" in HTML reports - Fix README: "7 days" → "3 successful deliveries", remove unnecessary number purchase step Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
521 lines
19 KiB
Python
521 lines
19 KiB
Python
#!/usr/bin/env python3
|
|
"""AutoFax GUI - Simple tkinter interface for sending faxes on a schedule."""
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
import tkinter as tk
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from tkinter import filedialog, messagebox, scrolledtext
|
|
|
|
import requests
|
|
|
|
# Determine base directory (works for both script and frozen exe)
|
|
if getattr(sys, "frozen", False):
|
|
BASE_DIR = Path(sys.executable).parent
|
|
else:
|
|
BASE_DIR = Path(__file__).parent
|
|
|
|
CONFIG_FILE = BASE_DIR / "autofax_config.json"
|
|
CLAIMS_DIR = BASE_DIR / "claims"
|
|
REPORTS_DIR = BASE_DIR / "reports"
|
|
LOG_FILE = REPORTS_DIR / "fax_log.json"
|
|
|
|
|
|
class AutoFaxApp:
|
|
def __init__(self, root: tk.Tk):
|
|
self.root = root
|
|
self.root.title("AutoFax - Insurance Claim Faxer")
|
|
self.root.geometry("620x720")
|
|
self.root.resizable(False, False)
|
|
|
|
self.running = False
|
|
self.timer_thread = None
|
|
self.config = self.load_config()
|
|
|
|
self.build_ui()
|
|
self.populate_fields()
|
|
self.update_status_display()
|
|
|
|
# ── Config persistence ──────────────────────────────────────────
|
|
|
|
def load_config(self) -> dict:
|
|
if CONFIG_FILE.exists():
|
|
try:
|
|
return json.loads(CONFIG_FILE.read_text())
|
|
except Exception:
|
|
pass
|
|
return {}
|
|
|
|
def save_config(self):
|
|
self.config = {
|
|
"project_id": self.project_id_var.get().strip(),
|
|
"key_id": self.key_id_var.get().strip(),
|
|
"key_secret": self.key_secret_var.get().strip(),
|
|
"from_number": self.from_var.get().strip(),
|
|
"to_number": self.to_var.get().strip(),
|
|
"pdf_path": self.pdf_var.get().strip(),
|
|
"ntfy_url": self.ntfy_url_var.get().strip(),
|
|
"ntfy_token": self.ntfy_token_var.get().strip(),
|
|
}
|
|
CONFIG_FILE.write_text(json.dumps(self.config, indent=2))
|
|
|
|
# ── UI ──────────────────────────────────────────────────────────
|
|
|
|
def build_ui(self):
|
|
# Header
|
|
header = tk.Frame(self.root, bg="#1a1a2e", height=50)
|
|
header.pack(fill="x")
|
|
header.pack_propagate(False)
|
|
tk.Label(
|
|
header, text="AutoFax", font=("Arial", 18, "bold"),
|
|
bg="#1a1a2e", fg="white",
|
|
).pack(side="left", padx=15, pady=8)
|
|
self.status_label = tk.Label(
|
|
header, text="Stopped", font=("Arial", 11),
|
|
bg="#1a1a2e", fg="#e74c3c",
|
|
)
|
|
self.status_label.pack(side="right", padx=15)
|
|
|
|
# Main frame with padding
|
|
main = tk.Frame(self.root, padx=15, pady=10)
|
|
main.pack(fill="both", expand=True)
|
|
|
|
# ── Sinch Settings ──
|
|
lf_sinch = tk.LabelFrame(main, text="Sinch Settings", padx=10, pady=5)
|
|
lf_sinch.pack(fill="x", pady=(0, 8))
|
|
|
|
self.project_id_var = tk.StringVar()
|
|
self.key_id_var = tk.StringVar()
|
|
self.key_secret_var = tk.StringVar()
|
|
self.from_var = tk.StringVar()
|
|
|
|
self._add_field(lf_sinch, "Project ID:", self.project_id_var)
|
|
self._add_field(lf_sinch, "Key ID:", self.key_id_var)
|
|
self._add_field(lf_sinch, "Key Secret:", self.key_secret_var, show="*")
|
|
self._add_field(lf_sinch, "From (optional):", self.from_var, placeholder="+1XXXXXXXXXX")
|
|
|
|
# ── Fax Settings ──
|
|
lf_fax = tk.LabelFrame(main, text="Fax Settings", padx=10, pady=5)
|
|
lf_fax.pack(fill="x", pady=(0, 8))
|
|
|
|
self.to_var = tk.StringVar()
|
|
self.pdf_var = tk.StringVar()
|
|
|
|
self._add_field(lf_fax, "To Number:", self.to_var, placeholder="+1XXXXXXXXXX")
|
|
|
|
pdf_frame = tk.Frame(lf_fax)
|
|
pdf_frame.pack(fill="x", pady=2)
|
|
tk.Label(pdf_frame, text="Claim PDF:", width=14, anchor="w").pack(side="left")
|
|
tk.Entry(pdf_frame, textvariable=self.pdf_var, width=35).pack(side="left", fill="x", expand=True)
|
|
tk.Button(pdf_frame, text="Browse...", command=self.browse_pdf).pack(side="left", padx=(5, 0))
|
|
|
|
# ── ntfy (optional) ──
|
|
lf_ntfy = tk.LabelFrame(main, text="Notifications (optional)", padx=10, pady=5)
|
|
lf_ntfy.pack(fill="x", pady=(0, 8))
|
|
|
|
self.ntfy_url_var = tk.StringVar()
|
|
self.ntfy_token_var = tk.StringVar()
|
|
|
|
self._add_field(lf_ntfy, "ntfy URL:", self.ntfy_url_var, placeholder="https://ntfy.sh/your-topic")
|
|
self._add_field(lf_ntfy, "ntfy Token:", self.ntfy_token_var, show="*", placeholder="Bearer tk_...")
|
|
|
|
# ── Controls ──
|
|
btn_frame = tk.Frame(main)
|
|
btn_frame.pack(fill="x", pady=(0, 8))
|
|
|
|
self.start_btn = tk.Button(
|
|
btn_frame, text="Start Hourly Faxing", font=("Arial", 12, "bold"),
|
|
bg="#2d8a4e", fg="white", command=self.toggle_running,
|
|
width=20, height=2,
|
|
)
|
|
self.start_btn.pack(side="left")
|
|
|
|
tk.Button(
|
|
btn_frame, text="Send Once Now", command=self.send_once,
|
|
).pack(side="left", padx=(10, 0))
|
|
|
|
tk.Button(
|
|
btn_frame, text="Open Report", command=self.open_report,
|
|
).pack(side="right")
|
|
|
|
# ── Log ──
|
|
tk.Label(main, text="Activity Log:", anchor="w").pack(fill="x")
|
|
self.log_text = scrolledtext.ScrolledText(
|
|
main, height=12, font=("Consolas", 9), state="disabled",
|
|
)
|
|
self.log_text.pack(fill="both", expand=True)
|
|
|
|
def _add_field(self, parent, label, var, show=None, placeholder=None):
|
|
frame = tk.Frame(parent)
|
|
frame.pack(fill="x", pady=2)
|
|
tk.Label(frame, text=label, width=14, anchor="w").pack(side="left")
|
|
entry = tk.Entry(frame, textvariable=var, show=show or "")
|
|
entry.pack(side="left", fill="x", expand=True)
|
|
if placeholder:
|
|
entry.insert(0, placeholder)
|
|
entry.config(fg="grey")
|
|
entry.bind("<FocusIn>", lambda e, ent=entry, ph=placeholder: self._clear_placeholder(ent, ph))
|
|
entry.bind("<FocusOut>", lambda e, ent=entry, ph=placeholder, v=var: self._restore_placeholder(ent, ph, v))
|
|
|
|
def _clear_placeholder(self, entry, placeholder):
|
|
if entry.get() == placeholder:
|
|
entry.delete(0, "end")
|
|
entry.config(fg="black")
|
|
|
|
def _restore_placeholder(self, entry, placeholder, var):
|
|
if not var.get().strip() or var.get().strip() == placeholder:
|
|
entry.delete(0, "end")
|
|
entry.insert(0, placeholder)
|
|
entry.config(fg="grey")
|
|
|
|
def populate_fields(self):
|
|
c = self.config
|
|
for key, var in [
|
|
("project_id", self.project_id_var),
|
|
("key_id", self.key_id_var),
|
|
("key_secret", self.key_secret_var),
|
|
("from_number", self.from_var),
|
|
("to_number", self.to_var),
|
|
("pdf_path", self.pdf_var),
|
|
("ntfy_url", self.ntfy_url_var),
|
|
("ntfy_token", self.ntfy_token_var),
|
|
]:
|
|
if c.get(key):
|
|
var.set(c[key])
|
|
|
|
# ── Actions ─────────────────────────────────────────────────────
|
|
|
|
def browse_pdf(self):
|
|
path = filedialog.askopenfilename(
|
|
title="Select Claim PDF",
|
|
filetypes=[("PDF files", "*.pdf")],
|
|
initialdir=str(CLAIMS_DIR) if CLAIMS_DIR.exists() else str(BASE_DIR),
|
|
)
|
|
if path:
|
|
self.pdf_var.set(path)
|
|
|
|
def validate(self) -> bool:
|
|
problems = []
|
|
if not self.project_id_var.get().strip():
|
|
problems.append("Project ID is required")
|
|
if not self.key_id_var.get().strip():
|
|
problems.append("Key ID is required")
|
|
if not self.key_secret_var.get().strip():
|
|
problems.append("Key Secret is required")
|
|
to_num = self.to_var.get().strip()
|
|
if not to_num or to_num == "+1XXXXXXXXXX":
|
|
problems.append("To Number is required")
|
|
pdf = self.pdf_var.get().strip()
|
|
if not pdf or not Path(pdf).is_file():
|
|
problems.append("Select a valid claim PDF file")
|
|
if problems:
|
|
messagebox.showerror("Missing Configuration", "\n".join(problems))
|
|
return False
|
|
return True
|
|
|
|
def toggle_running(self):
|
|
if self.running:
|
|
self.stop()
|
|
else:
|
|
self.start()
|
|
|
|
def start(self):
|
|
if not self.validate():
|
|
return
|
|
self.save_config()
|
|
self.running = True
|
|
self.start_btn.config(text="Stop Faxing", bg="#c0392b")
|
|
self.status_label.config(text="Running (hourly)", fg="#2d8a4e")
|
|
self.log("Started hourly fax schedule.")
|
|
self.timer_thread = threading.Thread(target=self.run_loop, daemon=True)
|
|
self.timer_thread.start()
|
|
|
|
def stop(self):
|
|
self.running = False
|
|
self.start_btn.config(text="Start Hourly Faxing", bg="#2d8a4e")
|
|
self.status_label.config(text="Stopped", fg="#e74c3c")
|
|
self.log("Stopped.")
|
|
|
|
def send_once(self):
|
|
if not self.validate():
|
|
return
|
|
self.save_config()
|
|
self.log("Sending single fax...")
|
|
threading.Thread(target=self._do_send, daemon=True).start()
|
|
|
|
def run_loop(self):
|
|
self._do_send()
|
|
while self.running:
|
|
for _ in range(3600):
|
|
if not self.running:
|
|
return
|
|
time.sleep(1)
|
|
if self.running:
|
|
self._do_send()
|
|
|
|
# ── Fax logic ───────────────────────────────────────────────────
|
|
|
|
def _auth(self):
|
|
return (self.key_id_var.get().strip(), self.key_secret_var.get().strip())
|
|
|
|
def _api_url(self):
|
|
return f"https://fax.api.sinch.com/v3/projects/{self.project_id_var.get().strip()}"
|
|
|
|
def _do_send(self):
|
|
pdf_path = Path(self.pdf_var.get().strip())
|
|
to_number = self.to_var.get().strip()
|
|
from_number = self.from_var.get().strip()
|
|
if from_number == "+1XXXXXXXXXX":
|
|
from_number = ""
|
|
timestamp = datetime.now(timezone.utc).isoformat()
|
|
|
|
try:
|
|
self.log(f"Sending fax to {to_number}...")
|
|
form_data = {"to": to_number}
|
|
if from_number:
|
|
form_data["from"] = from_number
|
|
with open(pdf_path, "rb") as f:
|
|
resp = requests.post(
|
|
f"{self._api_url()}/faxes",
|
|
auth=self._auth(),
|
|
files={"file": (pdf_path.name, f, "application/pdf")},
|
|
data=form_data,
|
|
timeout=60,
|
|
)
|
|
resp.raise_for_status()
|
|
fax_data = resp.json()
|
|
fax_id = fax_data.get("id")
|
|
status = fax_data.get("status", "QUEUED")
|
|
|
|
entry = {
|
|
"timestamp": timestamp,
|
|
"status": status,
|
|
"fax_id": fax_id,
|
|
"to": to_number,
|
|
"from": from_number,
|
|
"file": pdf_path.name,
|
|
"error": None,
|
|
}
|
|
|
|
# Brief wait then check status
|
|
if fax_id and status == "QUEUED":
|
|
time.sleep(15)
|
|
try:
|
|
sr = requests.get(
|
|
f"{self._api_url()}/faxes/{fax_id}",
|
|
auth=self._auth(), timeout=15,
|
|
)
|
|
sr.raise_for_status()
|
|
sd = sr.json()
|
|
entry["status"] = sd.get("status", status)
|
|
if entry["status"] == "FAILURE":
|
|
entry["failure_reason"] = sd.get("failureReason", "Unknown")
|
|
except Exception:
|
|
pass
|
|
|
|
self.log(f"Fax {entry['status']} (ID: {fax_id})")
|
|
|
|
except requests.HTTPError as e:
|
|
error_detail = ""
|
|
try:
|
|
error_detail = e.response.text[:300]
|
|
except Exception:
|
|
pass
|
|
entry = {
|
|
"timestamp": timestamp,
|
|
"status": "send_failed",
|
|
"fax_id": None,
|
|
"to": to_number,
|
|
"from": from_number,
|
|
"file": pdf_path.name,
|
|
"error": f"{e} | {error_detail}",
|
|
}
|
|
self.log(f"FAILED: {e}")
|
|
except Exception as e:
|
|
entry = {
|
|
"timestamp": timestamp,
|
|
"status": "send_failed",
|
|
"fax_id": None,
|
|
"to": to_number,
|
|
"from": from_number,
|
|
"file": pdf_path.name,
|
|
"error": str(e),
|
|
}
|
|
self.log(f"FAILED: {e}")
|
|
|
|
self._save_entry(entry)
|
|
self._generate_report()
|
|
self._notify(entry)
|
|
self.update_status_display()
|
|
|
|
def _save_entry(self, entry: dict):
|
|
REPORTS_DIR.mkdir(parents=True, exist_ok=True)
|
|
entries = []
|
|
if LOG_FILE.exists():
|
|
try:
|
|
entries = json.loads(LOG_FILE.read_text())
|
|
except Exception:
|
|
pass
|
|
entries.append(entry)
|
|
LOG_FILE.write_text(json.dumps(entries, indent=2))
|
|
|
|
def _notify(self, entry: dict):
|
|
ntfy_url = self.ntfy_url_var.get().strip()
|
|
if not ntfy_url or ntfy_url.startswith("https://ntfy.sh/your"):
|
|
return
|
|
try:
|
|
status_labels = {
|
|
"COMPLETED": "Fax Delivered", "IN_PROGRESS": "Fax Sending",
|
|
"QUEUED": "Fax Queued",
|
|
}
|
|
subject = status_labels.get(entry["status"], "Fax Failed")
|
|
msg = f"To: {entry['to']}\nStatus: {entry['status']}\nFile: {entry['file']}"
|
|
if entry.get("error"):
|
|
msg += f"\nError: {entry['error']}"
|
|
hdrs = {"Title": subject}
|
|
ntfy_token = self.ntfy_token_var.get().strip()
|
|
if ntfy_token and not ntfy_token.startswith("Bearer tk_..."):
|
|
hdrs["Authorization"] = ntfy_token
|
|
if entry["status"] not in ("QUEUED", "IN_PROGRESS", "COMPLETED"):
|
|
hdrs["Priority"] = "high"
|
|
requests.post(ntfy_url, data=msg.encode(), headers=hdrs, timeout=10)
|
|
except Exception:
|
|
pass
|
|
|
|
def _generate_report(self):
|
|
REPORTS_DIR.mkdir(parents=True, exist_ok=True)
|
|
entries = []
|
|
if LOG_FILE.exists():
|
|
try:
|
|
entries = json.loads(LOG_FILE.read_text())
|
|
except Exception:
|
|
return
|
|
|
|
if not entries:
|
|
return
|
|
|
|
total = len(entries)
|
|
success_statuses = {"QUEUED", "IN_PROGRESS", "COMPLETED"}
|
|
failed = sum(1 for e in entries if e["status"] not in success_statuses)
|
|
succeeded = sum(1 for e in entries if e["status"] == "COMPLETED")
|
|
pending = total - failed - succeeded
|
|
|
|
rows = ""
|
|
for i, e in enumerate(entries, 1):
|
|
status = e["status"]
|
|
if status == "COMPLETED":
|
|
sc, sd = "success", "DELIVERED"
|
|
elif status in ("QUEUED", "IN_PROGRESS"):
|
|
sc, sd = "pending", status
|
|
else:
|
|
sc, sd = "failed", "FAILED"
|
|
|
|
error_col = e.get("error") or e.get("failure_reason") or "-"
|
|
try:
|
|
dt = datetime.fromisoformat(e["timestamp"])
|
|
ts = dt.strftime("%b %d, %Y %I:%M %p UTC")
|
|
except Exception:
|
|
ts = e["timestamp"]
|
|
|
|
rows += f'<tr><td>{i}</td><td>{ts}</td><td>{e.get("to", "-")}</td>'
|
|
rows += f'<td class="{sc}">{sd}</td><td>{e.get("fax_id") or "-"}</td>'
|
|
rows += f'<td class="error">{error_col}</td></tr>\n'
|
|
|
|
html = f"""<!DOCTYPE html>
|
|
<html><head><meta charset="utf-8"><title>Fax Transmission Report</title>
|
|
<style>
|
|
body{{font-family:Arial,sans-serif;margin:40px;color:#333}}
|
|
h1{{color:#1a1a2e;border-bottom:2px solid #1a1a2e;padding-bottom:10px}}
|
|
.summary{{background:#f5f5f5;padding:20px;border-radius:8px;margin:20px 0}}
|
|
.summary span{{font-weight:bold}}
|
|
table{{border-collapse:collapse;width:100%;margin:20px 0;font-size:14px}}
|
|
th{{background:#1a1a2e;color:#fff;padding:12px 8px;text-align:left}}
|
|
td{{padding:10px 8px;border-bottom:1px solid #ddd}}
|
|
tr:nth-child(even){{background:#f9f9f9}}
|
|
.success{{color:#2d8a4e;font-weight:bold}}
|
|
.failed{{color:#c0392b;font-weight:bold}}
|
|
.pending{{color:#e67e22;font-weight:bold}}
|
|
.error{{font-size:12px;color:#888;max-width:300px;word-wrap:break-word}}
|
|
.footer{{margin-top:30px;font-size:12px;color:#888;border-top:1px solid #ddd;padding-top:10px}}
|
|
@media print{{body{{margin:20px}}}}
|
|
</style></head><body>
|
|
<h1>Fax Transmission Report</h1>
|
|
<div class="summary">
|
|
<p>This report documents automated fax transmission attempts to the insurance
|
|
company for claim processing.</p>
|
|
<p>Total attempts: <span>{total}</span> |
|
|
Delivered: <span class="success">{succeeded}</span> |
|
|
Failed: <span class="failed">{failed}</span> |
|
|
Pending: <span>{pending}</span></p>
|
|
<p>Fax destination: <span>{entries[0]['to']}</span></p>
|
|
<p>Report generated: <span>{datetime.now(timezone.utc).strftime('%B %d, %Y at %I:%M %p UTC')}</span></p>
|
|
</div>
|
|
<table><thead><tr>
|
|
<th>#</th><th>Date & Time</th><th>Fax Number</th>
|
|
<th>Status</th><th>Confirmation ID</th><th>Error Details</th>
|
|
</tr></thead><tbody>
|
|
{rows}
|
|
</tbody></table>
|
|
<div class="footer"><p>Generated by AutoFax. Each row represents one
|
|
transmission attempt. Faxes are sent once per hour.
|
|
FAILED = the receiving fax machine did not accept.</p></div>
|
|
</body></html>"""
|
|
|
|
(REPORTS_DIR / "fax_report.html").write_text(html)
|
|
|
|
# ── Helpers ─────────────────────────────────────────────────────
|
|
|
|
def update_status_display(self):
|
|
if LOG_FILE.exists():
|
|
try:
|
|
entries = json.loads(LOG_FILE.read_text())
|
|
total = len(entries)
|
|
failed = sum(1 for e in entries if e["status"] not in ("QUEUED", "IN_PROGRESS", "COMPLETED"))
|
|
if total > 0:
|
|
extra = f" | {total} sent, {failed} failed"
|
|
current = self.status_label.cget("text").split(" |")[0]
|
|
self.status_label.config(text=f"{current}{extra}")
|
|
except Exception:
|
|
pass
|
|
|
|
def open_report(self):
|
|
report = REPORTS_DIR / "fax_report.html"
|
|
if not report.exists():
|
|
messagebox.showinfo("No Report", "No fax attempts recorded yet.")
|
|
return
|
|
if sys.platform == "win32":
|
|
os.startfile(str(report))
|
|
elif sys.platform == "darwin":
|
|
subprocess.run(["open", str(report)])
|
|
else:
|
|
subprocess.run(["xdg-open", str(report)])
|
|
|
|
def log(self, message: str):
|
|
timestamp = datetime.now().strftime("%H:%M:%S")
|
|
line = f"[{timestamp}] {message}\n"
|
|
|
|
def _append():
|
|
self.log_text.config(state="normal")
|
|
self.log_text.insert("end", line)
|
|
self.log_text.see("end")
|
|
self.log_text.config(state="disabled")
|
|
|
|
self.root.after(0, _append)
|
|
|
|
|
|
def main():
|
|
CLAIMS_DIR.mkdir(exist_ok=True)
|
|
REPORTS_DIR.mkdir(exist_ok=True)
|
|
|
|
root = tk.Tk()
|
|
AutoFaxApp(root)
|
|
root.mainloop()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|