#!/usr/bin/env python3
"""Matrix to Paperless-ngx ingest bot.

Monitors a Matrix room for PDF and JPEG files and uploads them to
Paperless-ngx. Processes historical messages on startup and listens for new
ones indefinitely. Failed uploads are retried with exponential backoff up to
a maximum number of attempts.

Supports both plain and E2E-encrypted file attachments. Encrypted files carry
their AES key inline in the event, so no Megolm session is needed to decrypt
them.
"""

import asyncio
import base64
import hashlib
import json
import logging
import logging.handlers
import os
import tempfile
import time
from pathlib import Path
from typing import Optional

import aiosqlite
import httpx
from Crypto.Cipher import AES
from Crypto.Util import Counter
from dotenv import load_dotenv
from nio import AsyncClient, MessageDirection, RoomMessagesError
from nio.events.room_events import BadEvent, RoomMessageFile, RoomMessageImage

load_dotenv()

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
log = logging.getLogger(__name__)
# Encrypted attachments fail nio's schema validation (they use `file` instead
# of `url`) and would otherwise spam warnings for every such event.
logging.getLogger("nio.events.misc").setLevel(logging.ERROR)


class _SSLSMTPHandler(logging.handlers.SMTPHandler):
    """SMTPHandler that uses SMTP_SSL (port 465) instead of STARTTLS."""

    def emit(self, record: logging.LogRecord) -> None:
        """Send *record* as an email over an implicit-TLS SMTP connection.

        Builds a proper RFC 5322 message via EmailMessage rather than
        hand-concatenating header strings: this encodes non-ASCII subjects
        and bodies correctly, which the raw sendmail payload did not.
        Errors are routed to handleError so logging never raises.
        """
        import smtplib
        from email.message import EmailMessage

        try:
            host, port = self.mailhost
            msg = EmailMessage()
            msg["From"] = self.fromaddr
            msg["To"] = ", ".join(self.toaddrs)
            msg["Subject"] = self.getSubject(record)
            msg.set_content(self.format(record))
            with smtplib.SMTP_SSL(host, port) as smtp:
                smtp.login(self.username, self.password)
                smtp.send_message(msg)
        except Exception:
            self.handleError(record)


def _setup_email_alerts() -> None:
    """Attach an ERROR-level SMTP handler to the root logger.

    No-op unless SMTP_HOST is set; when it is, ALERT_FROM, ALERT_TO,
    SMTP_USER and SMTP_PASSWORD are required (KeyError otherwise).
    """
    smtp_host = os.environ.get("SMTP_HOST")
    if not smtp_host:
        return
    handler = _SSLSMTPHandler(
        mailhost=(smtp_host, int(os.environ.get("SMTP_PORT", 465))),
        fromaddr=os.environ["ALERT_FROM"],
        toaddrs=[os.environ["ALERT_TO"]],
        subject="matrix-paperless-ingest error",
        credentials=(os.environ["SMTP_USER"], os.environ["SMTP_PASSWORD"]),
    )
    handler.setLevel(logging.ERROR)
    logging.getLogger().addHandler(handler)
    log.info("Email alerts enabled → %s", os.environ["ALERT_TO"])


_setup_email_alerts()

# Required configuration — fail fast at import time if any is missing.
MATRIX_HOMESERVER = os.environ["MATRIX_HOMESERVER"]
MATRIX_USER = os.environ["MATRIX_USER"]
MATRIX_ACCESS_TOKEN = os.environ["MATRIX_ACCESS_TOKEN"]
MATRIX_DEVICE_ID = os.environ["MATRIX_DEVICE_ID"]
MATRIX_ROOM_ID = os.environ["MATRIX_ROOM_ID"]

PAPERLESS_URL = os.environ["PAPERLESS_URL"].rstrip("/")
PAPERLESS_TOKEN = os.environ["PAPERLESS_TOKEN"]
PAPERLESS_INBOX_TAG_ID = int(os.environ["PAPERLESS_INBOX_TAG_ID"])

DB_PATH = os.environ.get("DB_PATH", "state.db")
UPTIME_KUMA_PUSH_URL = os.environ.get("UPTIME_KUMA_PUSH_URL")

# Retry delays in seconds: 1m, 5m, 30m, 2h, 24h — after the last delay is
# exhausted the event is marked permanently failed and never retried again.
RETRY_DELAYS = [60, 300, 1800, 7200, 86400]

# How long to wait for Paperless to finish processing an uploaded document.
PAPERLESS_TASK_POLL_INTERVAL = 5  # seconds between polls
PAPERLESS_TASK_TIMEOUT = 300  # give up after this many seconds


# ---------------------------------------------------------------------------
# Database
# ---------------------------------------------------------------------------

async def init_db(db: "aiosqlite.Connection") -> None:
    """Create the processed_events table and apply in-place migrations.

    One row per Matrix file event: upload status, Paperless document id,
    content checksum, and the retry schedule for failed uploads.
    """
    await db.execute("""
        CREATE TABLE IF NOT EXISTS processed_events (
            event_id TEXT PRIMARY KEY,
            filename TEXT NOT NULL,
            mxc_url TEXT NOT NULL,
            encryption_info TEXT,
            checksum TEXT,
            status TEXT NOT NULL,
            paperless_id INTEGER,
            retry_count INTEGER DEFAULT 0,
            next_retry REAL,
            created_at REAL DEFAULT (unixepoch())
        )
    """)
    # Migrations for databases created before these columns existed.
    # ALTER TABLE fails when the column is already there — best-effort
    # by design, so the failure is deliberately swallowed.
    for col in ("encryption_info TEXT", "checksum TEXT"):
        try:
            await db.execute(f"ALTER TABLE processed_events ADD COLUMN {col}")
        except Exception:
            pass
    await db.commit()


async def get_event_row(db: "aiosqlite.Connection", event_id: str) -> Optional[dict]:
    """Return the processed_events row for *event_id* as a dict, or None."""
    async with db.execute(
        "SELECT * FROM processed_events WHERE event_id = ?", (event_id,)
    ) as cursor:
        row = await cursor.fetchone()
        if row is None:
            return None
        # Map column names onto the tuple so callers can index by name.
        cols = [d[0] for d in cursor.description]
        return dict(zip(cols, row))


async def upsert_event(
    db: "aiosqlite.Connection",
    event_id: str,
    filename: str,
    mxc_url: str,
    status: str,
    encryption_info: Optional[str] = None,
    checksum: Optional[str] = None,
    paperless_id: Optional[int] = None,
    retry_count: int = 0,
    next_retry: Optional[float] = None,
) -> None:
    """Insert or fully replace the tracking row for *event_id* and commit."""
    await db.execute(
        """
        INSERT OR REPLACE INTO processed_events
            (event_id, filename, mxc_url, encryption_info, checksum,
             status, paperless_id, retry_count, next_retry)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (event_id, filename, mxc_url, encryption_info, checksum,
         status, paperless_id, retry_count, next_retry),
    )
    await db.commit()


# ---------------------------------------------------------------------------
# Encrypted attachment helpers
# ---------------------------------------------------------------------------

def decrypt_attachment(ciphertext: bytes, file_info: dict) -> bytes:
    """Decrypt a Matrix encrypted file attachment (AES-256-CTR).

    *file_info* is the ``file`` dict from the event content: ``key.k`` is the
    unpadded urlsafe-base64 AES key and ``iv`` the unpadded base64 counter
    block. When a ``hashes.sha256`` entry is present, the ciphertext is
    verified against it before decrypting, as the Matrix spec requires.

    Raises ValueError if the sha256 integrity check fails.
    """
    expected = file_info.get("hashes", {}).get("sha256")
    if expected:
        # Matrix hashes use unpadded standard base64; compare pad-stripped.
        actual = base64.b64encode(hashlib.sha256(ciphertext).digest()).decode()
        if actual.rstrip("=") != expected.rstrip("="):
            raise ValueError("Encrypted attachment failed sha256 integrity check")
    # "==" padding slack: the keys are transmitted as unpadded base64.
    key = base64.urlsafe_b64decode(file_info["key"]["k"] + "==")
    iv_bytes = base64.urlsafe_b64decode(file_info["iv"] + "==")
    ctr = Counter.new(128, initial_value=int.from_bytes(iv_bytes, "big"))
    cipher = AES.new(key, AES.MODE_CTR, counter=ctr)
    return cipher.decrypt(ciphertext)


def _bad_event_encrypted_file_info(event: "BadEvent") -> Optional[dict]:
    """
    Return the encrypted file info dict if this BadEvent is an encrypted
    image/file attachment we care about, else None.

    Encrypted attachments use `file` instead of `url` in the content, which
    causes matrix-nio's schema validator to reject them as BadEvent.
    """
    content = event.source.get("content", {})
    msgtype = content.get("msgtype", "")
    if msgtype not in ("m.image", "m.file"):
        return None
    name = content.get("body", "").lower()
    if msgtype == "m.image" and not name.endswith((".jpg", ".jpeg")):
        return None
    if msgtype == "m.file" and not name.endswith(".pdf"):
        return None
    file_info = content.get("file", {})
    if not file_info or "url" not in file_info or "key" not in file_info:
        return None
    return file_info


# ---------------------------------------------------------------------------
# File event helpers
# ---------------------------------------------------------------------------

def is_supported_file(event) -> bool:
    """True if *event* is a PDF file, a JPEG image, or an encrypted either."""
    if isinstance(event, RoomMessageFile):
        return (event.body or "").lower().endswith(".pdf")
    if isinstance(event, RoomMessageImage):
        return (event.body or "").lower().endswith((".jpg", ".jpeg"))
    if isinstance(event, BadEvent):
        return _bad_event_encrypted_file_info(event) is not None
    return False


def extract_event_fields(event) -> tuple[str, str, str, Optional[str]]:
    """Returns (event_id, filename, mxc_url, encryption_info_json_or_None)."""
    if isinstance(event, BadEvent):
        # Encrypted attachment: the mxc URL lives inside the `file` dict,
        # which we serialize whole so decryption keys travel with the row.
        content = event.source.get("content", {})
        filename = content.get("body", "unknown")
        file_info = content.get("file", {})
        return event.event_id, filename, file_info["url"], json.dumps(file_info)
    return event.event_id, event.body, event.url, None


def content_type_for(filename: str) -> str:
    """MIME type for upload: PDF by extension, JPEG for everything else."""
    return "application/pdf" if filename.lower().endswith(".pdf") else "image/jpeg"


def validate_file(path: Path, filename: str) -> None:
    """Raise ValueError if the file doesn't look like the format its name claims."""
    data = path.read_bytes()[:8]
    if filename.lower().endswith(".pdf"):
        if not data.startswith(b"%PDF"):
            raise ValueError(f"File does not start with %PDF magic bytes: {filename}")
    else:
        # JPEG: starts with FF D8 FF
        if data[:3] != b"\xff\xd8\xff":
            raise ValueError(f"File does not start with JPEG magic bytes: {filename}")
# ---------------------------------------------------------------------------
# Paperless client
# ---------------------------------------------------------------------------

class PaperlessClient:
    """Thin async wrapper around the Paperless-ngx REST API."""

    def __init__(self, base_url: str, token: str, inbox_tag_id: int) -> None:
        self.base_url = base_url
        self.headers = {"Authorization": f"Token {token}"}
        self.inbox_tag_id = inbox_tag_id

    async def find_by_checksum(self, checksum: str) -> Optional[int]:
        """Return the Paperless document ID if the checksum already exists."""
        async with httpx.AsyncClient() as client:
            r = await client.get(
                f"{self.base_url}/api/documents/",
                headers=self.headers,
                params={"checksum__iexact": checksum},
                timeout=30,
            )
            r.raise_for_status()
            results = r.json()["results"]
            return results[0]["id"] if results else None

    async def upload(self, filename: str, path: Path, content_type: str) -> str:
        """Upload a document from a file path and return the Paperless task ID.

        The file is streamed from disk (open handle, not bytes in memory) and
        tagged with the configured inbox tag.
        """
        async with httpx.AsyncClient() as client:
            with path.open("rb") as fh:
                r = await client.post(
                    f"{self.base_url}/api/documents/post_document/",
                    headers=self.headers,
                    data={"tags": str(self.inbox_tag_id)},
                    files={"document": (filename, fh, content_type)},
                    timeout=120,
                )
        r.raise_for_status()
        # Paperless answers with the task UUID as a JSON-quoted string.
        return r.text.strip('"')

    async def wait_for_task(self, task_id: str) -> int:
        """Poll until the task completes and return the created document ID.

        Returns -1 when Paperless reports success without a document, which
        happens when it rejected the upload as a duplicate.
        Raises RuntimeError if the task fails or times out.
        """
        deadline = time.time() + PAPERLESS_TASK_TIMEOUT
        async with httpx.AsyncClient() as client:
            while time.time() < deadline:
                r = await client.get(
                    f"{self.base_url}/api/tasks/",
                    headers=self.headers,
                    params={"task_id": task_id},
                    timeout=30,
                )
                r.raise_for_status()
                results = r.json()
                if not results:
                    # Task not registered yet — give Paperless a moment.
                    await asyncio.sleep(PAPERLESS_TASK_POLL_INTERVAL)
                    continue
                task = results[0]
                status = task.get("status")
                if status == "SUCCESS":
                    doc_id = task.get("related_document")
                    if doc_id is None:
                        # Paperless rejected it as a duplicate — not an error for us
                        log.info("Paperless task %s: document already existed", task_id)
                        return -1
                    return int(doc_id)
                if status in ("FAILURE", "REVOKED"):
                    result = task.get("result", "unknown error")
                    raise RuntimeError(f"Paperless task {task_id} failed: {result}")
                # PENDING or STARTED — keep waiting
                await asyncio.sleep(PAPERLESS_TASK_POLL_INTERVAL)
        raise RuntimeError(f"Paperless task {task_id} timed out after {PAPERLESS_TASK_TIMEOUT}s")


# ---------------------------------------------------------------------------
# Core processing
# ---------------------------------------------------------------------------

async def process_event(
    event_id: str,
    filename: str,
    mxc_url: str,
    encryption_info: Optional[str],
    matrix_client: "AsyncClient",
    db: "aiosqlite.Connection",
    paperless: PaperlessClient,
) -> None:
    """Download one Matrix file event and upload it to Paperless.

    Idempotent per event: already-handled events are skipped, duplicates
    (by MD5 checksum, which is what Paperless stores) are marked 'skipped',
    and failures are scheduled for retry with backoff until RETRY_DELAYS is
    exhausted, after which the event is marked 'give_up' permanently.
    """
    row = await get_event_row(db, event_id)
    if row and row["status"] in ("uploaded", "skipped", "give_up"):
        log.debug("Already handled %s, skipping", event_id)
        return

    # Check if we've exhausted all retries
    if row and row["status"] == "failed" and row["retry_count"] > len(RETRY_DELAYS):
        log.warning("Max retries exceeded for %s, giving up", filename)
        await upsert_event(db, event_id, filename, mxc_url, "give_up", encryption_info)
        return

    log.info("Processing %s (%s)", filename, event_id)
    tmp_path = None
    try:
        dl = await matrix_client.download(mxc=mxc_url)
        if not hasattr(dl, "body"):
            raise RuntimeError(f"Download failed: {dl}")

        # Write to a temp file to avoid loading large files into RAM
        suffix = Path(filename).suffix or ".bin"
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp_path = Path(tmp.name)
            if encryption_info:
                tmp.write(decrypt_attachment(dl.body, json.loads(encryption_info)))
            else:
                tmp.write(dl.body)

        # Validate the file looks like what it claims to be
        validate_file(tmp_path, filename)

        # Deduplicate against Paperless by content checksum
        checksum = hashlib.md5(tmp_path.read_bytes()).hexdigest()
        existing_id = await paperless.find_by_checksum(checksum)
        if existing_id is not None:
            log.info("Already in Paperless (id=%d), skipping: %s", existing_id, filename)
            await upsert_event(db, event_id, filename, mxc_url, "skipped",
                               encryption_info, checksum, existing_id)
            return

        # Upload and wait for Paperless to confirm it landed
        task_id = await paperless.upload(filename, tmp_path, content_type_for(filename))
        log.info("Uploaded %s → waiting for Paperless task %s", filename, task_id)
        doc_id = await paperless.wait_for_task(task_id)
        if doc_id == -1:
            # Paperless said it was a duplicate — treat as skipped
            log.info("Paperless reported duplicate for %s, marking skipped", filename)
            await upsert_event(db, event_id, filename, mxc_url, "skipped",
                               encryption_info, checksum)
        else:
            log.info("Confirmed in Paperless as document id=%d: %s", doc_id, filename)
            await upsert_event(db, event_id, filename, mxc_url, "uploaded",
                               encryption_info, checksum, doc_id)
    except Exception as exc:
        # log.exception keeps the traceback, which the ERROR-level email
        # alert handler forwards — log.error would drop it.
        log.exception("Failed to process %s (%s): %s", filename, event_id, exc)
        retry_count = (row["retry_count"] + 1) if row else 1
        if retry_count > len(RETRY_DELAYS):
            log.warning("Max retries exceeded for %s, giving up permanently", filename)
            await upsert_event(db, event_id, filename, mxc_url, "give_up", encryption_info)
        else:
            delay = RETRY_DELAYS[retry_count - 1]
            await upsert_event(
                db, event_id, filename, mxc_url, "failed", encryption_info,
                retry_count=retry_count, next_retry=time.time() + delay,
            )
    finally:
        if tmp_path and tmp_path.exists():
            tmp_path.unlink()
# ---------------------------------------------------------------------------
# Historical catchup
# ---------------------------------------------------------------------------

async def catchup_history(
    matrix_client: "AsyncClient",
    db: "aiosqlite.Connection",
    paperless: "PaperlessClient",
    room_id: str,
    start_token: str,
) -> None:
    """Paginate backwards through room history from *start_token*,
    processing every supported file event until the start of the room.
    """
    log.info("Starting historical catchup...")
    token = start_token
    total = 0
    while True:
        response = await matrix_client.room_messages(
            room_id,
            start=token,
            limit=100,
            direction=MessageDirection.back,
        )
        if isinstance(response, RoomMessagesError):
            log.error("room_messages error: %s", response)
            break
        for event in response.chunk:
            if is_supported_file(event):
                total += 1
                event_id, filename, mxc_url, enc_info = extract_event_fields(event)
                await process_event(
                    event_id, filename, mxc_url, enc_info,
                    matrix_client, db, paperless,
                )
        # The spec omits `end` once the start of the room is reached; also
        # bail out if the server returns the same token (no progress), so a
        # misbehaving homeserver cannot trap us in an infinite loop.
        if not response.end or response.end == token:
            break
        token = response.end
    log.info("Historical catchup complete — processed %d file event(s).", total)


# ---------------------------------------------------------------------------
# Retry loop
# ---------------------------------------------------------------------------

async def retry_loop(
    matrix_client: "AsyncClient",
    db: "aiosqlite.Connection",
    paperless: "PaperlessClient",
) -> None:
    """Once a minute, re-process every failed event whose backoff elapsed."""
    while True:
        await asyncio.sleep(60)
        now = time.time()
        async with db.execute(
            "SELECT event_id, filename, mxc_url, encryption_info FROM processed_events "
            "WHERE status = 'failed' AND next_retry <= ?",
            (now,),
        ) as cursor:
            rows = await cursor.fetchall()
        for event_id, filename, mxc_url, enc_info in rows:
            log.info("Retrying %s (%s)", filename, event_id)
            await process_event(event_id, filename, mxc_url, enc_info,
                                matrix_client, db, paperless)


# ---------------------------------------------------------------------------
# Uptime Kuma heartbeat
# ---------------------------------------------------------------------------

async def heartbeat_loop() -> None:
    """Ping the Uptime Kuma push URL every 60s; disabled when unconfigured.

    Ping failures are logged and ignored — monitoring must never take the
    ingest loop down.
    """
    if not UPTIME_KUMA_PUSH_URL:
        return
    log.info("Heartbeat enabled, pinging Uptime Kuma every 60s")
    async with httpx.AsyncClient() as client:
        while True:
            try:
                await client.get(UPTIME_KUMA_PUSH_URL, timeout=10)
            except Exception as exc:
                log.warning("Heartbeat ping failed: %s", exc)
            await asyncio.sleep(60)


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

async def main() -> None:
    """Connect to Matrix, catch up on history, then run forever.

    Runs three concurrent tasks: the Matrix sync loop for new messages,
    the failed-upload retry loop, and the optional heartbeat.
    """
    paperless = PaperlessClient(PAPERLESS_URL, PAPERLESS_TOKEN, PAPERLESS_INBOX_TAG_ID)

    async with aiosqlite.connect(DB_PATH) as db:
        await init_db(db)

        matrix_client = AsyncClient(MATRIX_HOMESERVER, MATRIX_USER)
        matrix_client.access_token = MATRIX_ACCESS_TOKEN
        matrix_client.device_id = MATRIX_DEVICE_ID
        try:
            log.info("Connecting to Matrix...")
            sync_resp = await matrix_client.sync(timeout=30_000, full_state=True)

            # Accept invite if the bot hasn't joined yet
            if MATRIX_ROOM_ID in sync_resp.rooms.invite:
                log.info("Accepting invite to %s...", MATRIX_ROOM_ID)
                await matrix_client.join(MATRIX_ROOM_ID)
                sync_resp = await matrix_client.sync(timeout=30_000, full_state=True)

            room_timeline = sync_resp.rooms.join.get(MATRIX_ROOM_ID)
            if room_timeline is None:
                log.error(
                    "Room %s not found in joined rooms. Is the bot a member?",
                    MATRIX_ROOM_ID,
                )
                return

            # Process events from the initial sync timeline first
            for event in room_timeline.timeline.events:
                if is_supported_file(event):
                    event_id, filename, mxc_url, enc_info = extract_event_fields(event)
                    await process_event(event_id, filename, mxc_url, enc_info,
                                        matrix_client, db, paperless)

            # Then paginate backwards for older history
            prev_batch = room_timeline.timeline.prev_batch
            await catchup_history(matrix_client, db, paperless, MATRIX_ROOM_ID, prev_batch)

            async def on_file(room, event):
                # Callback for live events; filter to the configured room.
                if room.room_id == MATRIX_ROOM_ID and is_supported_file(event):
                    event_id, filename, mxc_url, enc_info = extract_event_fields(event)
                    await process_event(
                        event_id, filename, mxc_url, enc_info,
                        matrix_client, db, paperless,
                    )

            # BadEvent is registered too so encrypted attachments (which fail
            # nio's schema validation) still reach the callback.
            matrix_client.add_event_callback(
                on_file, (RoomMessageFile, RoomMessageImage, BadEvent)
            )
            log.info("Listening for new messages...")
            await asyncio.gather(
                matrix_client.sync_forever(timeout=30_000),
                retry_loop(matrix_client, db, paperless),
                heartbeat_loop(),
            )
        finally:
            await matrix_client.close()


if __name__ == "__main__":
    asyncio.run(main())