commit aef5e5283a741ec9f5626cade21fdb31e0247329 Author: Sochen Date: Fri Mar 6 11:36:47 2026 +0000 Initial commit: automated hourly fax sender for insurance claims Sends doula coverage claims via Telnyx fax API every hour, logs every attempt, and generates a printable HTML report for HR. Includes both a Linux CLI with cron scheduling and a Windows GUI (tkinter) that can be packaged as a portable exe via PyInstaller. Co-Authored-By: Claude Opus 4.6 diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..5b36051 --- /dev/null +++ b/.env.example @@ -0,0 +1,18 @@ +# Telnyx API credentials +# Sign up at https://portal.telnyx.com/ and get your API key +TELNYX_API_KEY=KEY_your_api_key_here + +# Your Telnyx Fax Application connection ID +# Create a Fax Application in Mission Control Portal under Messaging > Fax +TELNYX_CONNECTION_ID=your_connection_id_here + +# Your Telnyx fax-enabled phone number (E.164 format) +# Purchase one in the Mission Control Portal under Numbers, assign to Fax App +TELNYX_FROM_NUMBER=+1XXXXXXXXXX + +# Destination fax number (E.164 format) +FAX_TO_NUMBER=+1XXXXXXXXXX + +# ntfy notification URL (optional, comment out to disable) +NTFY_URL=https://nt.nevo.engineer/your-channel +NTFY_TOKEN=Bearer tk_your_token_here diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0b4b9c6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,29 @@ +# Credentials and config +.env + +# Claim documents (personal data) +claims/*.pdf +claims/*.tiff +claims/*.png + +# Generated reports +reports/ + +# Python +__pycache__/ +*.pyc +*.pyo +venv/ +.venv/ + +# PyInstaller +build/ +dist/ +*.spec + +# GUI config (contains credentials) +autofax_config.json + +# OS +.DS_Store +Thumbs.db diff --git a/PROJECT.md b/PROJECT.md new file mode 100644 index 0000000..78657aa --- /dev/null +++ b/PROJECT.md @@ -0,0 +1,5 @@ +Basically, my health insurance claims to cover doulas, however requires a tedious process where one must either fax them or snail mail them the claim. I did both. The fax returned as failure and I haven't seen the claim added even after mailing it a month ago. My coworker has the same issue. + +Build a script (can integrate with cron) to sent an electronic fax to them every hour on the hour, and safely collect the returned failed responses in a directory. I will show this to them or to my companies HR. Also find an electronic fax API to use with a service that is the cheapest for a single month. Assume the doula claim paperwork is about 5 black and white pages. + +You've got a figure out directive, you can ask some clarifying questions in the beginning, but then I want you to go and build the thing. Preferred language is python. diff --git a/README.md b/README.md new file mode 100644 index 0000000..76da386 --- /dev/null +++ b/README.md @@ -0,0 +1,94 @@ +# AutoFax + +Automated hourly fax sender for insurance claim submissions. Sends your claim via Telnyx's fax API every hour and generates a printable HTML report documenting every attempt and its status -- useful for showing HR or the insurance company that their fax system is rejecting claims. + +## Quick Start (Windows GUI) + +If someone gave you `AutoFax.exe` on a flash drive: + +1. Double-click `AutoFax.exe` +2. Fill in the Telnyx credentials (API Key, Connection ID, From Number) +3. Set the destination fax number +4. Click **Browse** and select your claim PDF +5. Click **Start Hourly Faxing** + +Config is saved next to the exe, so it remembers your settings. Reports are generated in a `reports/` folder next to the exe -- open `fax_report.html` in a browser and print it for HR. + +## Building the Windows Executable + +On a Windows machine with Python 3.10+ installed: + +``` +build_windows.bat +``` + +This produces `dist/AutoFax.exe` -- a single self-contained file you can copy to a flash drive. + +## Linux/Server Setup (CLI + cron) + +### 1. Telnyx Account + +1. Sign up at [portal.telnyx.com](https://portal.telnyx.com/) +2. Add a payment method +3. Go to **Messaging > Fax** and create a Fax Application +4. Go to **Numbers** and purchase a fax-enabled number (~$1/month) +5. Assign the number to your Fax Application +6. Go to **API Keys** and copy your API key + +### 2. Configure + +```bash +cp .env.example .env +# Edit .env with your Telnyx credentials and the destination fax number +``` + +### 3. Add Your Claim + +Place your claim PDF in the `claims/` directory: + +```bash +cp /path/to/your/claim.pdf claims/ +``` + +### 4. Install & Start + +```bash +chmod +x install.sh +./install.sh +``` + +This will: +- Create a Python virtual environment +- Install dependencies +- Set up an hourly cron job +- Schedule auto-cleanup after 7 days + +### 5. Manual Test + +```bash +venv/bin/python autofax.py +``` + +## Reports + +After each fax attempt, a printable HTML report is generated at `reports/fax_report.html`. Open it in a browser and print to PDF for HR. It includes: + +- Timestamp of every attempt +- Delivery status (DELIVERED, FAILED, QUEUED) +- Confirmation IDs +- Error details for failures + +## Notifications + +Optional push notifications via [ntfy](https://ntfy.sh). Configure in `.env` (CLI) or in the GUI settings. + +## Uninstall (CLI) + +```bash +./uninstall.sh +``` + +## Cost + +Telnyx pricing: ~$0.007/page + ~$1/month for the phone number. +One week of hourly faxes (168 faxes x 5 pages = 840 pages) costs roughly $8-15. diff --git a/autofax.py b/autofax.py new file mode 100755 index 0000000..d9a8996 --- /dev/null +++ b/autofax.py @@ -0,0 +1,296 @@ +#!/usr/bin/env python3 +"""AutoFax - Send a fax every hour via Telnyx and log results.""" + +import json +import sys +import time +from datetime import datetime, timezone +from pathlib import Path + +import requests + +import config + +API = "https://api.telnyx.com/v2" + + +def headers(): + return {"Authorization": f"Bearer {config.TELNYX_API_KEY}"} + + +def find_claim_pdf() -> Path: + pdfs = sorted(config.CLAIMS_DIR.glob("*.pdf")) + if not pdfs: + print("ERROR: No PDF found in claims/ directory.", file=sys.stderr) + sys.exit(1) + return pdfs[0] + + +def upload_media(pdf_path: Path) -> str: + """Upload PDF to Telnyx media storage. Returns the media_name.""" + media_name = f"autofax-claim-{pdf_path.stem}" + with open(pdf_path, "rb") as f: + resp = requests.post( + f"{API}/media", + headers={**headers(), "Content-Type": "application/pdf"}, + params={"media_name": media_name}, + data=f.read(), + ) + resp.raise_for_status() + return media_name + + +def send_fax(pdf_path: Path) -> dict: + """Upload PDF and send a fax. Returns result dict.""" + timestamp = datetime.now(timezone.utc).isoformat() + try: + media_name = upload_media(pdf_path) + + resp = requests.post( + f"{API}/faxes", + headers={**headers(), "Content-Type": "application/json"}, + json={ + "connection_id": config.TELNYX_CONNECTION_ID, + "media_name": media_name, + "to": config.FAX_TO_NUMBER, + "from": config.TELNYX_FROM_NUMBER, + }, + ) + resp.raise_for_status() + fax_data = resp.json().get("data", {}) + + return { + "timestamp": timestamp, + "status": fax_data.get("status", "queued"), + "fax_id": fax_data.get("id"), + "to": config.FAX_TO_NUMBER, + "from": config.TELNYX_FROM_NUMBER, + "file": pdf_path.name, + "error": None, + } + except requests.HTTPError as e: + error_body = "" + try: + error_body = e.response.json() + except Exception: + error_body = e.response.text[:500] + return { + "timestamp": timestamp, + "status": "send_failed", + "fax_id": None, + "to": config.FAX_TO_NUMBER, + "from": config.TELNYX_FROM_NUMBER, + "file": pdf_path.name, + "error": f"{e} | {error_body}", + } + except Exception as e: + return { + "timestamp": timestamp, + "status": "send_failed", + "fax_id": None, + "to": config.FAX_TO_NUMBER, + "from": config.TELNYX_FROM_NUMBER, + "file": pdf_path.name, + "error": str(e), + } + + +def check_fax_status(fax_id: str) -> dict | None: + if not fax_id: + return None + try: + resp = requests.get(f"{API}/faxes/{fax_id}", headers=headers()) + resp.raise_for_status() + return resp.json().get("data", {}) + except Exception: + return None + + +def load_log() -> list[dict]: + if config.LOG_FILE.exists(): + return json.loads(config.LOG_FILE.read_text()) + return [] + + +def save_log(entries: list[dict]): + config.REPORTS_DIR.mkdir(parents=True, exist_ok=True) + config.LOG_FILE.write_text(json.dumps(entries, indent=2)) + + +def notify(subject: str, message: str, priority: str = "default"): + if not config.NTFY_URL: + return + try: + hdrs = {"Title": subject, "Priority": priority} + if config.NTFY_TOKEN: + hdrs["Authorization"] = config.NTFY_TOKEN + requests.post(config.NTFY_URL, data=message.encode(), headers=hdrs) + except Exception: + pass + + +def generate_report(entries: list[dict]): + config.REPORTS_DIR.mkdir(parents=True, exist_ok=True) + + total = len(entries) + failed = sum(1 for e in entries if e["status"] not in ("queued", "sending", "sent", "delivered")) + succeeded = sum(1 for e in entries if e["status"] == "delivered") + pending = total - failed - succeeded + + rows = "" + for i, e in enumerate(entries, 1): + status = e["status"] + if status in ("delivered", "sent"): + status_class = "success" + status_display = status.upper() + elif status in ("queued", "sending"): + status_class = "pending" + status_display = status.upper() + else: + status_class = "failed" + status_display = "FAILED" + + error_col = e.get("error") or e.get("failure_reason") or "-" + ts = e["timestamp"] + try: + dt = datetime.fromisoformat(ts) + ts_display = dt.strftime("%b %d, %Y %I:%M %p UTC") + except Exception: + ts_display = ts + + rows += f""" + + {i} + {ts_display} + {e.get('to', '-')} + {status_display} + {e.get('fax_id') or '-'} + {error_col} + """ + + html = f""" + + + +Fax Transmission Report + + + +

Fax Transmission Report

+ +
+

This report documents automated fax transmission attempts to the insurance + company for doula coverage claim processing.

+

+ Total attempts: {total} | + Delivered: {succeeded} | + Failed: {failed} | + Pending: {pending} +

+

Fax destination: {entries[0]['to'] if entries else 'N/A'}

+

Report generated: {datetime.now(timezone.utc).strftime('%B %d, %Y at %I:%M %p UTC')}

+
+ + + + + + + + + + + + + + {rows} + +
#Date & TimeFax NumberStatusConfirmation IDError Details
+ + + +""" + + report_path = config.REPORTS_DIR / "fax_report.html" + report_path.write_text(html) + return report_path + + +def update_previous_statuses(entries: list[dict]) -> list[dict]: + for entry in entries: + if entry["status"] in ("queued", "sending"): + status_data = check_fax_status(entry.get("fax_id")) + if status_data: + new_status = status_data.get("status", entry["status"]) + if new_status != entry["status"]: + entry["status"] = new_status + if new_status == "failed": + entry["failure_reason"] = status_data.get("failure_reason", "Unknown") + return entries + + +def main(): + pdf_path = find_claim_pdf() + + entries = load_log() + entries = update_previous_statuses(entries) + + print(f"Sending fax: {pdf_path.name} -> {config.FAX_TO_NUMBER}") + result = send_fax(pdf_path) + entries.append(result) + + # Wait briefly and check initial status + if result["fax_id"] and result["status"] == "queued": + time.sleep(15) + status_data = check_fax_status(result["fax_id"]) + if status_data: + result["status"] = status_data.get("status", result["status"]) + if result["status"] == "failed": + result["failure_reason"] = status_data.get("failure_reason", "Unknown") + + save_log(entries) + report_path = generate_report(entries) + + # Notify via ntfy + status_labels = { + "delivered": "Fax Delivered", + "sent": "Fax Sent", + "queued": "Fax Queued", + "sending": "Fax Sending", + } + subject = status_labels.get(result["status"], "Fax Failed") + msg = f"To: {result['to']}\nStatus: {result['status']}\nFile: {result['file']}" + if result.get("error"): + msg += f"\nError: {result['error']}" + priority = "high" if result["status"] not in ("queued", "sending", "sent", "delivered") else "default" + notify(subject, msg, priority) + + print(f"Status: {result['status']}") + print(f"Report: {report_path}") + + if result["status"] == "send_failed": + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..2f7acc5 --- /dev/null +++ b/build.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash +set -euo pipefail + +echo "=== Building AutoFax Executable ===" + +# Create/use venv +if [ ! -d "venv" ]; then + python3 -m venv venv +fi + +source venv/bin/activate +pip install -q pyinstaller requests python-dotenv + +echo "Building..." +pyinstaller --onefile --windowed --name AutoFax gui.py + +echo "" +echo "=== Build successful! ===" +echo "Executable: dist/AutoFax" diff --git a/build_windows.bat b/build_windows.bat new file mode 100644 index 0000000..f572049 --- /dev/null +++ b/build_windows.bat @@ -0,0 +1,30 @@ +@echo off +echo === Building AutoFax Windows Executable === +echo. + +where python >nul 2>&1 +if %errorlevel% neq 0 ( + echo ERROR: Python not found. Install Python 3.10+ from python.org + echo Make sure to check "Add Python to PATH" during install. + pause + exit /b 1 +) + +echo Installing build dependencies... +pip install pyinstaller requests python-dotenv + +echo. +echo Building executable... +pyinstaller --onefile --windowed --name AutoFax --icon=NONE gui.py + +echo. +if exist dist\AutoFax.exe ( + echo === Build successful! === + echo Executable: dist\AutoFax.exe + echo. + echo Copy AutoFax.exe to a flash drive and run it anywhere. + echo Config and reports will be saved next to the exe. +) else ( + echo Build failed. Check the output above for errors. +) +pause diff --git a/claims/.gitkeep b/claims/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/config.py b/config.py new file mode 100644 index 0000000..ac5976d --- /dev/null +++ b/config.py @@ -0,0 +1,21 @@ +import os +from pathlib import Path +from dotenv import load_dotenv + +load_dotenv() + +# Paths +BASE_DIR = Path(__file__).parent +CLAIMS_DIR = BASE_DIR / "claims" +REPORTS_DIR = BASE_DIR / "reports" +LOG_FILE = REPORTS_DIR / "fax_log.json" + +# Telnyx +TELNYX_API_KEY = os.environ["TELNYX_API_KEY"] +TELNYX_CONNECTION_ID = os.environ["TELNYX_CONNECTION_ID"] +TELNYX_FROM_NUMBER = os.environ["TELNYX_FROM_NUMBER"] +FAX_TO_NUMBER = os.environ["FAX_TO_NUMBER"] + +# ntfy (optional) +NTFY_URL = os.environ.get("NTFY_URL") +NTFY_TOKEN = os.environ.get("NTFY_TOKEN") diff --git a/gui.py b/gui.py new file mode 100644 index 0000000..f504b4b --- /dev/null +++ b/gui.py @@ -0,0 +1,534 @@ +#!/usr/bin/env python3 +"""AutoFax GUI - Simple tkinter interface for sending faxes on a schedule.""" + +import json +import os +import subprocess +import sys +import threading +import time +import tkinter as tk +from datetime import datetime, timezone +from pathlib import Path +from tkinter import filedialog, messagebox, scrolledtext + +import requests + +# Determine base directory (works for both script and frozen exe) +if getattr(sys, "frozen", False): + BASE_DIR = Path(sys.executable).parent +else: + BASE_DIR = Path(__file__).parent + +CONFIG_FILE = BASE_DIR / "autofax_config.json" +CLAIMS_DIR = BASE_DIR / "claims" +REPORTS_DIR = BASE_DIR / "reports" +LOG_FILE = REPORTS_DIR / "fax_log.json" +API = "https://api.telnyx.com/v2" + + +class AutoFaxApp: + def __init__(self, root: tk.Tk): + self.root = root + self.root.title("AutoFax - Insurance Claim Faxer") + self.root.geometry("620x700") + self.root.resizable(False, False) + + self.running = False + self.timer_thread = None + self.config = self.load_config() + + self.build_ui() + self.populate_fields() + self.update_status_display() + + # ── Config persistence ────────────────────────────────────────── + + def load_config(self) -> dict: + if CONFIG_FILE.exists(): + try: + return json.loads(CONFIG_FILE.read_text()) + except Exception: + pass + return {} + + def save_config(self): + self.config = { + "api_key": self.api_key_var.get().strip(), + "connection_id": self.conn_id_var.get().strip(), + "from_number": self.from_var.get().strip(), + "to_number": self.to_var.get().strip(), + "pdf_path": self.pdf_var.get().strip(), + "ntfy_url": self.ntfy_url_var.get().strip(), + "ntfy_token": self.ntfy_token_var.get().strip(), + } + CONFIG_FILE.write_text(json.dumps(self.config, indent=2)) + + # ── UI ────────────────────────────────────────────────────────── + + def build_ui(self): + # Header + header = tk.Frame(self.root, bg="#1a1a2e", height=50) + header.pack(fill="x") + header.pack_propagate(False) + tk.Label( + header, text="AutoFax", font=("Arial", 18, "bold"), + bg="#1a1a2e", fg="white", + ).pack(side="left", padx=15, pady=8) + self.status_label = tk.Label( + header, text="Stopped", font=("Arial", 11), + bg="#1a1a2e", fg="#e74c3c", + ) + self.status_label.pack(side="right", padx=15) + + # Main frame with padding + main = tk.Frame(self.root, padx=15, pady=10) + main.pack(fill="both", expand=True) + + # ── Telnyx Settings ── + lf_telnyx = tk.LabelFrame(main, text="Telnyx Settings", padx=10, pady=5) + lf_telnyx.pack(fill="x", pady=(0, 8)) + + self.api_key_var = tk.StringVar() + self.conn_id_var = tk.StringVar() + self.from_var = tk.StringVar() + + self._add_field(lf_telnyx, "API Key:", self.api_key_var, show="*") + self._add_field(lf_telnyx, "Connection ID:", self.conn_id_var) + self._add_field(lf_telnyx, "From Number:", self.from_var, placeholder="+1XXXXXXXXXX") + + # ── Fax Settings ── + lf_fax = tk.LabelFrame(main, text="Fax Settings", padx=10, pady=5) + lf_fax.pack(fill="x", pady=(0, 8)) + + self.to_var = tk.StringVar() + self.pdf_var = tk.StringVar() + + self._add_field(lf_fax, "To Number:", self.to_var, placeholder="+18019382102") + + pdf_frame = tk.Frame(lf_fax) + pdf_frame.pack(fill="x", pady=2) + tk.Label(pdf_frame, text="Claim PDF:", width=14, anchor="w").pack(side="left") + tk.Entry(pdf_frame, textvariable=self.pdf_var, width=35).pack(side="left", fill="x", expand=True) + tk.Button(pdf_frame, text="Browse...", command=self.browse_pdf).pack(side="left", padx=(5, 0)) + + # ── ntfy (optional) ── + lf_ntfy = tk.LabelFrame(main, text="Notifications (optional)", padx=10, pady=5) + lf_ntfy.pack(fill="x", pady=(0, 8)) + + self.ntfy_url_var = tk.StringVar() + self.ntfy_token_var = tk.StringVar() + + self._add_field(lf_ntfy, "ntfy URL:", self.ntfy_url_var, placeholder="https://ntfy.sh/your-topic") + self._add_field(lf_ntfy, "ntfy Token:", self.ntfy_token_var, show="*", placeholder="Bearer tk_...") + + # ── Controls ── + btn_frame = tk.Frame(main) + btn_frame.pack(fill="x", pady=(0, 8)) + + self.start_btn = tk.Button( + btn_frame, text="Start Hourly Faxing", font=("Arial", 12, "bold"), + bg="#2d8a4e", fg="white", command=self.toggle_running, + width=20, height=2, + ) + self.start_btn.pack(side="left") + + tk.Button( + btn_frame, text="Send Once Now", command=self.send_once, + ).pack(side="left", padx=(10, 0)) + + tk.Button( + btn_frame, text="Open Report", command=self.open_report, + ).pack(side="right") + + # ── Log ── + tk.Label(main, text="Activity Log:", anchor="w").pack(fill="x") + self.log_text = scrolledtext.ScrolledText( + main, height=12, font=("Consolas", 9), state="disabled", + ) + self.log_text.pack(fill="both", expand=True) + + def _add_field(self, parent, label, var, show=None, placeholder=None): + frame = tk.Frame(parent) + frame.pack(fill="x", pady=2) + tk.Label(frame, text=label, width=14, anchor="w").pack(side="left") + entry = tk.Entry(frame, textvariable=var, show=show or "") + entry.pack(side="left", fill="x", expand=True) + if placeholder: + entry.insert(0, placeholder) + entry.config(fg="grey") + entry.bind("", lambda e, ent=entry, ph=placeholder: self._clear_placeholder(ent, ph)) + entry.bind("", lambda e, ent=entry, ph=placeholder, v=var: self._restore_placeholder(ent, ph, v)) + + def _clear_placeholder(self, entry, placeholder): + if entry.get() == placeholder: + entry.delete(0, "end") + entry.config(fg="black") + + def _restore_placeholder(self, entry, placeholder, var): + if not var.get().strip() or var.get().strip() == placeholder: + entry.delete(0, "end") + entry.insert(0, placeholder) + entry.config(fg="grey") + + def populate_fields(self): + c = self.config + if c.get("api_key"): + self.api_key_var.set(c["api_key"]) + if c.get("connection_id"): + self.conn_id_var.set(c["connection_id"]) + if c.get("from_number"): + self.from_var.set(c["from_number"]) + if c.get("to_number"): + self.to_var.set(c["to_number"]) + if c.get("pdf_path"): + self.pdf_var.set(c["pdf_path"]) + if c.get("ntfy_url"): + self.ntfy_url_var.set(c["ntfy_url"]) + if c.get("ntfy_token"): + self.ntfy_token_var.set(c["ntfy_token"]) + + # ── Actions ───────────────────────────────────────────────────── + + def browse_pdf(self): + path = filedialog.askopenfilename( + title="Select Claim PDF", + filetypes=[("PDF files", "*.pdf")], + initialdir=str(CLAIMS_DIR) if CLAIMS_DIR.exists() else str(BASE_DIR), + ) + if path: + self.pdf_var.set(path) + + def validate(self) -> bool: + problems = [] + if not self.api_key_var.get().strip(): + problems.append("API Key is required") + if not self.conn_id_var.get().strip(): + problems.append("Connection ID is required") + from_num = self.from_var.get().strip() + if not from_num or from_num == "+1XXXXXXXXXX": + problems.append("From Number is required") + to_num = self.to_var.get().strip() + if not to_num or to_num == "+18019382102": + problems.append("To Number is required") + pdf = self.pdf_var.get().strip() + if not pdf or not Path(pdf).is_file(): + problems.append("Select a valid claim PDF file") + if problems: + messagebox.showerror("Missing Configuration", "\n".join(problems)) + return False + return True + + def toggle_running(self): + if self.running: + self.stop() + else: + self.start() + + def start(self): + if not self.validate(): + return + self.save_config() + self.running = True + self.start_btn.config(text="Stop Faxing", bg="#c0392b") + self.status_label.config(text="Running (hourly)", fg="#2d8a4e") + self.log("Started hourly fax schedule.") + self.timer_thread = threading.Thread(target=self.run_loop, daemon=True) + self.timer_thread.start() + + def stop(self): + self.running = False + self.start_btn.config(text="Start Hourly Faxing", bg="#2d8a4e") + self.status_label.config(text="Stopped", fg="#e74c3c") + self.log("Stopped.") + + def send_once(self): + if not self.validate(): + return + self.save_config() + self.log("Sending single fax...") + threading.Thread(target=self._do_send, daemon=True).start() + + def run_loop(self): + self._do_send() + while self.running: + # Sleep in small increments so we can stop quickly + for _ in range(3600): + if not self.running: + return + time.sleep(1) + if self.running: + self._do_send() + + # ── Fax logic ─────────────────────────────────────────────────── + + def _headers(self): + return {"Authorization": f"Bearer {self.api_key_var.get().strip()}"} + + def _do_send(self): + pdf_path = Path(self.pdf_var.get().strip()) + to_number = self.to_var.get().strip() + from_number = self.from_var.get().strip() + conn_id = self.conn_id_var.get().strip() + timestamp = datetime.now(timezone.utc).isoformat() + + try: + # Upload media + self.log(f"Uploading {pdf_path.name}...") + media_name = f"autofax-claim-{pdf_path.stem}" + with open(pdf_path, "rb") as f: + resp = requests.post( + f"{API}/media", + headers={**self._headers(), "Content-Type": "application/pdf"}, + params={"media_name": media_name}, + data=f.read(), + timeout=60, + ) + resp.raise_for_status() + + # Send fax + self.log(f"Sending fax to {to_number}...") + resp = requests.post( + f"{API}/faxes", + headers={**self._headers(), "Content-Type": "application/json"}, + json={ + "connection_id": conn_id, + "media_name": media_name, + "to": to_number, + "from": from_number, + }, + timeout=30, + ) + resp.raise_for_status() + fax_data = resp.json().get("data", {}) + fax_id = fax_data.get("id") + status = fax_data.get("status", "queued") + + entry = { + "timestamp": timestamp, + "status": status, + "fax_id": fax_id, + "to": to_number, + "from": from_number, + "file": pdf_path.name, + "error": None, + } + + # Brief wait then check status + if fax_id and status == "queued": + time.sleep(15) + try: + sr = requests.get( + f"{API}/faxes/{fax_id}", headers=self._headers(), timeout=15, + ) + sr.raise_for_status() + sd = sr.json().get("data", {}) + entry["status"] = sd.get("status", status) + if entry["status"] == "failed": + entry["failure_reason"] = sd.get("failure_reason", "Unknown") + except Exception: + pass + + self.log(f"Fax {entry['status'].upper()} (ID: {fax_id})") + + except requests.HTTPError as e: + error_detail = "" + try: + error_detail = e.response.text[:300] + except Exception: + pass + entry = { + "timestamp": timestamp, + "status": "send_failed", + "fax_id": None, + "to": to_number, + "from": from_number, + "file": pdf_path.name, + "error": f"{e} | {error_detail}", + } + self.log(f"FAILED: {e}") + except Exception as e: + entry = { + "timestamp": timestamp, + "status": "send_failed", + "fax_id": None, + "to": to_number, + "from": from_number, + "file": pdf_path.name, + "error": str(e), + } + self.log(f"FAILED: {e}") + + # Save to log and regenerate report + self._save_entry(entry) + self._generate_report() + self._notify(entry) + self.update_status_display() + + def _save_entry(self, entry: dict): + REPORTS_DIR.mkdir(parents=True, exist_ok=True) + entries = [] + if LOG_FILE.exists(): + try: + entries = json.loads(LOG_FILE.read_text()) + except Exception: + pass + entries.append(entry) + LOG_FILE.write_text(json.dumps(entries, indent=2)) + + def _notify(self, entry: dict): + ntfy_url = self.ntfy_url_var.get().strip() + if not ntfy_url or ntfy_url.startswith("https://ntfy.sh/your"): + return + try: + status_labels = { + "delivered": "Fax Delivered", "sent": "Fax Sent", + "queued": "Fax Queued", "sending": "Fax Sending", + } + subject = status_labels.get(entry["status"], "Fax Failed") + msg = f"To: {entry['to']}\nStatus: {entry['status']}\nFile: {entry['file']}" + if entry.get("error"): + msg += f"\nError: {entry['error']}" + hdrs = {"Title": subject} + ntfy_token = self.ntfy_token_var.get().strip() + if ntfy_token and not ntfy_token.startswith("Bearer tk_..."): + hdrs["Authorization"] = ntfy_token + if entry["status"] not in ("queued", "sending", "sent", "delivered"): + hdrs["Priority"] = "high" + requests.post(ntfy_url, data=msg.encode(), headers=hdrs, timeout=10) + except Exception: + pass + + def _generate_report(self): + REPORTS_DIR.mkdir(parents=True, exist_ok=True) + entries = [] + if LOG_FILE.exists(): + try: + entries = json.loads(LOG_FILE.read_text()) + except Exception: + return + + if not entries: + return + + total = len(entries) + failed = sum(1 for e in entries if e["status"] not in ("queued", "sending", "sent", "delivered")) + succeeded = sum(1 for e in entries if e["status"] == "delivered") + pending = total - failed - succeeded + + rows = "" + for i, e in enumerate(entries, 1): + status = e["status"] + if status in ("delivered", "sent"): + sc, sd = "success", status.upper() + elif status in ("queued", "sending"): + sc, sd = "pending", status.upper() + else: + sc, sd = "failed", "FAILED" + + error_col = e.get("error") or e.get("failure_reason") or "-" + try: + dt = datetime.fromisoformat(e["timestamp"]) + ts = dt.strftime("%b %d, %Y %I:%M %p UTC") + except Exception: + ts = e["timestamp"] + + rows += f'{i}{ts}{e.get("to", "-")}' + rows += f'{sd}{e.get("fax_id") or "-"}' + rows += f'{error_col}\n' + + html = f""" +Fax Transmission Report + +

Fax Transmission Report

+
+

This report documents automated fax transmission attempts to the insurance +company for doula coverage claim processing.

+

Total attempts: {total} | +Delivered: {succeeded} | +Failed: {failed} | +Pending: {pending}

+

Fax destination: {entries[0]['to']}

+

Report generated: {datetime.now(timezone.utc).strftime('%B %d, %Y at %I:%M %p UTC')}

+
+ + + + +{rows} +
#Date & TimeFax NumberStatusConfirmation IDError Details
+ +""" + + (REPORTS_DIR / "fax_report.html").write_text(html) + + # ── Helpers ───────────────────────────────────────────────────── + + def update_status_display(self): + """Update the status label with attempt count.""" + if LOG_FILE.exists(): + try: + entries = json.loads(LOG_FILE.read_text()) + total = len(entries) + failed = sum(1 for e in entries if e["status"] not in ("queued", "sending", "sent", "delivered")) + if total > 0: + extra = f" | {total} sent, {failed} failed" + current = self.status_label.cget("text").split(" |")[0] + self.status_label.config(text=f"{current}{extra}") + except Exception: + pass + + def open_report(self): + report = REPORTS_DIR / "fax_report.html" + if not report.exists(): + messagebox.showinfo("No Report", "No fax attempts recorded yet.") + return + # Cross-platform open + if sys.platform == "win32": + os.startfile(str(report)) + elif sys.platform == "darwin": + subprocess.run(["open", str(report)]) + else: + subprocess.run(["xdg-open", str(report)]) + + def log(self, message: str): + timestamp = datetime.now().strftime("%H:%M:%S") + line = f"[{timestamp}] {message}\n" + + def _append(): + self.log_text.config(state="normal") + self.log_text.insert("end", line) + self.log_text.see("end") + self.log_text.config(state="disabled") + + self.root.after(0, _append) + + +def main(): + CLAIMS_DIR.mkdir(exist_ok=True) + REPORTS_DIR.mkdir(exist_ok=True) + + root = tk.Tk() + AutoFaxApp(root) + root.mainloop() + + +if __name__ == "__main__": + main() diff --git a/install.sh b/install.sh new file mode 100755 index 0000000..f529edf --- /dev/null +++ b/install.sh @@ -0,0 +1,82 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +VENV_DIR="$SCRIPT_DIR/venv" +PYTHON="$VENV_DIR/bin/python" +CRON_TAG="# autofax" + +echo "=== AutoFax Setup ===" + +# Create virtual environment +if [ ! -d "$VENV_DIR" ]; then + echo "Creating virtual environment..." + python3 -m venv "$VENV_DIR" +fi + +echo "Installing dependencies..." +"$VENV_DIR/bin/pip" install -q -r "$SCRIPT_DIR/requirements.txt" + +# Create directories +mkdir -p "$SCRIPT_DIR/claims" "$SCRIPT_DIR/reports" + +# Check for .env +if [ ! -f "$SCRIPT_DIR/.env" ]; then + echo "" + echo "WARNING: No .env file found!" + echo "Copy .env.example to .env and fill in your credentials:" + echo " cp $SCRIPT_DIR/.env.example $SCRIPT_DIR/.env" + echo "" +fi + +# Check for claim PDF +PDF_COUNT=$(find "$SCRIPT_DIR/claims" -name "*.pdf" 2>/dev/null | wc -l) +if [ "$PDF_COUNT" -eq 0 ]; then + echo "WARNING: No PDF found in claims/ directory." + echo "Place your claim PDF in: $SCRIPT_DIR/claims/" + echo "" +fi + +# Calculate expiry date (7 days from now) +if date --version >/dev/null 2>&1; then + # GNU date + EXPIRY=$(date -d "+7 days" "+%Y-%m-%d %H:%M") +else + # BSD/macOS date + EXPIRY=$(date -v+7d "+%Y-%m-%d %H:%M") +fi + +# Install cron jobs +echo "Installing cron jobs..." +# Remove any existing autofax entries +crontab -l 2>/dev/null | grep -v "$CRON_TAG" > /tmp/autofax_cron || true + +# Hourly fax job +echo "0 * * * * $PYTHON $SCRIPT_DIR/autofax.py >> $SCRIPT_DIR/reports/cron.log 2>&1 $CRON_TAG" >> /tmp/autofax_cron + +# Self-destruct: remove autofax cron entries after 7 days +# Runs once at the expiry time, removes all autofax lines, then removes itself +echo "0 0 * * * $PYTHON -c \" +import subprocess, datetime +if datetime.datetime.now() >= datetime.datetime.fromisoformat('$(date -d '+7 days' '+%Y-%m-%dT%H:%M' 2>/dev/null || date -v+7d '+%Y-%m-%dT%H:%M')'): + result = subprocess.run(['crontab', '-l'], capture_output=True, text=True) + lines = [l for l in result.stdout.splitlines() if 'autofax' not in l] + subprocess.run(['crontab', '-'], input=chr(10).join(lines), text=True) +\" 2>/dev/null $CRON_TAG" >> /tmp/autofax_cron + +crontab /tmp/autofax_cron +rm /tmp/autofax_cron + +echo "" +echo "=== Setup Complete ===" +echo "Fax will be sent every hour on the hour." +echo "Auto-expires: $EXPIRY (7 days from now)" +echo "" +echo "Checklist:" +echo " [ ] .env file configured with Telnyx credentials" +echo " [ ] Claim PDF placed in claims/ directory" +echo "" +echo "Manual test: $PYTHON $SCRIPT_DIR/autofax.py" +echo "View report: open $SCRIPT_DIR/reports/fax_report.html" +echo "View log: cat $SCRIPT_DIR/reports/fax_log.json" +echo "Remove cron: crontab -l | grep -v autofax | crontab -" diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f49b6b8 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +requests>=2.28.0 +python-dotenv>=1.0.0 diff --git a/uninstall.sh b/uninstall.sh new file mode 100755 index 0000000..79cb5f5 --- /dev/null +++ b/uninstall.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +set -euo pipefail + +echo "Removing AutoFax cron jobs..." +crontab -l 2>/dev/null | grep -v "autofax" | crontab - 2>/dev/null || true +echo "Done. Cron jobs removed." +echo "" +echo "Reports are preserved in: $(cd "$(dirname "$0")" && pwd)/reports/" +echo "To fully remove: rm -rf $(cd "$(dirname "$0")" && pwd)/venv"