WhatsApp bridge files (e.g. PDFs) may arrive with an empty body field, causing the previous .pdf extension check to silently skip them. Accept all RoomMessageFile events and fall back to "document.pdf" as filename. File content is still validated via magic bytes before upload.
570 lines
21 KiB
Python
570 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
"""Matrix to Paperless-ngx ingest bot.
|
|
|
|
Monitors a Matrix room for PDF and JPEG files and uploads them to Paperless-ngx.
|
|
Processes historical messages on startup and listens for new ones indefinitely.
|
|
Failed uploads are retried with exponential backoff up to a maximum number of attempts.
|
|
|
|
Supports both plain and E2E-encrypted file attachments. Encrypted files carry
|
|
their AES key inline in the event, so no Megolm session is needed to decrypt them.
|
|
"""
|
|
|
|
import asyncio
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import logging.handlers
|
|
import os
|
|
import tempfile
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import aiosqlite
|
|
import httpx
|
|
from Crypto.Cipher import AES
|
|
from Crypto.Util import Counter
|
|
from dotenv import load_dotenv
|
|
from nio import AsyncClient, MessageDirection, RoomMessagesError
|
|
from nio.events.room_events import BadEvent, RoomMessageFile, RoomMessageImage
|
|
|
|
load_dotenv()
|
|
|
|
# Root logger: INFO to stderr. nio's event-validation warnings are silenced
# separately because encrypted attachments intentionally fail its schema
# (see _bad_event_encrypted_file_info) and would otherwise spam the log.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
log = logging.getLogger(__name__)
logging.getLogger("nio.events.misc").setLevel(logging.ERROR)
|
|
|
|
class _SSLSMTPHandler(logging.handlers.SMTPHandler):
    """SMTPHandler that uses SMTP_SSL (port 465) instead of STARTTLS.

    The stock SMTPHandler speaks plain SMTP with STARTTLS; providers that
    only accept implicit TLS on port 465 need SMTP_SSL instead.
    """

    def emit(self, record: logging.LogRecord) -> None:
        # Imported lazily (as the stdlib handler does) so the module import
        # never pays for smtplib/email unless an alert actually fires.
        import smtplib
        from email.message import EmailMessage

        try:
            host, port = self.mailhost
            # Build a proper MIME message instead of hand-concatenating an
            # RFC 2822 string: sendmail() ascii-encodes str payloads, so any
            # non-ASCII text in the log record previously raised
            # UnicodeEncodeError and the alert was silently dropped via
            # handleError(). send_message() handles header and body encoding.
            msg = EmailMessage()
            msg["From"] = self.fromaddr
            msg["To"] = ", ".join(self.toaddrs)
            msg["Subject"] = self.getSubject(record)
            msg.set_content(self.format(record))
            with smtplib.SMTP_SSL(host, port) as smtp:
                smtp.login(self.username, self.password)
                smtp.send_message(msg)
        except Exception:
            self.handleError(record)
|
|
|
|
|
|
def _setup_email_alerts() -> None:
    """Attach an ERROR-level email handler to the root logger, if configured.

    A no-op unless SMTP_HOST is set; the remaining SMTP_*/ALERT_* variables
    are then required and a KeyError at startup signals a misconfiguration.
    """
    host = os.environ.get("SMTP_HOST")
    if not host:
        return
    port = int(os.environ.get("SMTP_PORT", 465))
    mailer = _SSLSMTPHandler(
        mailhost=(host, port),
        fromaddr=os.environ["ALERT_FROM"],
        toaddrs=[os.environ["ALERT_TO"]],
        subject="matrix-paperless-ingest error",
        credentials=(os.environ["SMTP_USER"], os.environ["SMTP_PASSWORD"]),
    )
    mailer.setLevel(logging.ERROR)
    logging.getLogger().addHandler(mailer)
    log.info("Email alerts enabled → %s", os.environ["ALERT_TO"])
|
|
|
|
_setup_email_alerts()

# --- Required configuration (KeyError at import time if missing) -----------
MATRIX_HOMESERVER = os.environ["MATRIX_HOMESERVER"]
MATRIX_USER = os.environ["MATRIX_USER"]
MATRIX_ACCESS_TOKEN = os.environ["MATRIX_ACCESS_TOKEN"]
MATRIX_DEVICE_ID = os.environ["MATRIX_DEVICE_ID"]
MATRIX_ROOM_ID = os.environ["MATRIX_ROOM_ID"]

PAPERLESS_URL = os.environ["PAPERLESS_URL"].rstrip("/")  # normalized: no trailing slash
PAPERLESS_TOKEN = os.environ["PAPERLESS_TOKEN"]
PAPERLESS_INBOX_TAG_ID = int(os.environ["PAPERLESS_INBOX_TAG_ID"])

# --- Optional configuration -------------------------------------------------
DB_PATH = os.environ.get("DB_PATH", "state.db")  # SQLite state database
UPTIME_KUMA_PUSH_URL = os.environ.get("UPTIME_KUMA_PUSH_URL")  # heartbeat target (None = disabled)

# Retry delays in seconds: 1m, 5m, 30m, 2h, 24h — after the last delay is
# exhausted the event is marked permanently failed and never retried again.
RETRY_DELAYS = [60, 300, 1800, 7200, 86400]

# How long to wait for Paperless to finish processing an uploaded document.
PAPERLESS_TASK_POLL_INTERVAL = 5  # seconds between polls
PAPERLESS_TASK_TIMEOUT = 300  # give up after this many seconds
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Database
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def init_db(db: aiosqlite.Connection) -> None:
    """Create the state table and apply lightweight column migrations.

    Safe to run on every startup: table creation uses IF NOT EXISTS and
    adding an already-present column is detected and ignored.
    """
    await db.execute("""
        CREATE TABLE IF NOT EXISTS processed_events (
            event_id TEXT PRIMARY KEY,
            filename TEXT NOT NULL,
            mxc_url TEXT NOT NULL,
            encryption_info TEXT,
            checksum TEXT,
            status TEXT NOT NULL,
            paperless_id INTEGER,
            retry_count INTEGER DEFAULT 0,
            next_retry REAL,
            created_at REAL DEFAULT (unixepoch())
        )
    """)
    # Migrations for databases created before these columns existed.
    for col in ("encryption_info TEXT", "checksum TEXT"):
        try:
            await db.execute(f"ALTER TABLE processed_events ADD COLUMN {col}")
        except aiosqlite.OperationalError:
            # "duplicate column name" — the column already exists. Catching
            # only OperationalError (instead of Exception) keeps genuine
            # failures (locked/corrupt database, closed connection) from
            # being silently swallowed here.
            pass
    await db.commit()
|
|
|
|
|
|
async def get_event_row(db: aiosqlite.Connection, event_id: str) -> Optional[dict]:
    """Fetch the stored state row for *event_id* as a column-name → value dict.

    Returns None when the event has never been recorded.
    """
    query = "SELECT * FROM processed_events WHERE event_id = ?"
    async with db.execute(query, (event_id,)) as cursor:
        record = await cursor.fetchone()
        if record is None:
            return None
        # cursor.description yields (name, ...) tuples per column.
        names = (column[0] for column in cursor.description)
        return dict(zip(names, record))
|
|
|
|
|
|
async def upsert_event(
    db: aiosqlite.Connection,
    event_id: str,
    filename: str,
    mxc_url: str,
    status: str,
    encryption_info: Optional[str] = None,
    checksum: Optional[str] = None,
    paperless_id: Optional[int] = None,
    retry_count: int = 0,
    next_retry: Optional[float] = None,
) -> None:
    """Insert or overwrite the state row for *event_id*, then commit.

    INSERT OR REPLACE means callers must pass every field they want kept —
    omitted optionals reset to their defaults on re-upsert.
    """
    values = (event_id, filename, mxc_url, encryption_info, checksum, status,
              paperless_id, retry_count, next_retry)
    await db.execute(
        """
        INSERT OR REPLACE INTO processed_events
        (event_id, filename, mxc_url, encryption_info, checksum, status,
        paperless_id, retry_count, next_retry)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        values,
    )
    await db.commit()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Encrypted attachment helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def decrypt_attachment(ciphertext: bytes, file_info: dict) -> bytes:
    """Decrypt a Matrix encrypted file attachment (AES-256-CTR).

    *file_info* is the event's `file` dict; key and IV arrive as unpadded
    urlsafe base64, so padding is restored before decoding.
    """
    raw_key = base64.urlsafe_b64decode(file_info["key"]["k"] + "==")
    raw_iv = base64.urlsafe_b64decode(file_info["iv"] + "==")
    counter = Counter.new(128, initial_value=int.from_bytes(raw_iv, "big"))
    return AES.new(raw_key, AES.MODE_CTR, counter=counter).decrypt(ciphertext)
|
|
|
|
|
|
def _bad_event_encrypted_file_info(event: BadEvent) -> Optional[dict]:
|
|
"""
|
|
Return the encrypted file info dict if this BadEvent is an encrypted
|
|
image/file attachment we care about, else None.
|
|
|
|
Encrypted attachments use `file` instead of `url` in the content, which
|
|
causes matrix-nio's schema validator to reject them as BadEvent.
|
|
"""
|
|
content = event.source.get("content", {})
|
|
msgtype = content.get("msgtype", "")
|
|
if msgtype not in ("m.image", "m.file"):
|
|
return None
|
|
|
|
name = content.get("body", "").lower()
|
|
if msgtype == "m.image" and not name.endswith((".jpg", ".jpeg")):
|
|
return None
|
|
if msgtype == "m.file" and not name.endswith(".pdf"):
|
|
return None
|
|
|
|
file_info = content.get("file", {})
|
|
if not file_info or "url" not in file_info or "key" not in file_info:
|
|
return None
|
|
|
|
return file_info
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# File event helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def is_supported_file(event) -> bool:
    """True if this event is a file/image attachment worth ingesting."""
    # Plain uploads are accepted unconditionally: the body may be empty on
    # bridged messages (e.g. WhatsApp), so content is checked against magic
    # bytes later instead of filtering by filename here.
    if isinstance(event, (RoomMessageFile, RoomMessageImage)):
        return True
    # Encrypted attachments surface as BadEvent (nio's schema rejects the
    # `file` key); accept them when they carry usable encryption info.
    if isinstance(event, BadEvent):
        return _bad_event_encrypted_file_info(event) is not None
    return False
|
|
|
|
|
|
def extract_event_fields(event) -> tuple[str, str, str, Optional[str]]:
    """Returns (event_id, filename, mxc_url, encryption_info_json_or_None).

    Bridged events (e.g. WhatsApp) may carry an empty body, so a fallback
    filename matching the message type is substituted. The extension matters:
    content_type_for() and validate_file() key off it downstream.
    """
    if isinstance(event, BadEvent):
        # Encrypted attachment: mxc URL and AES key live in the `file` dict.
        content = event.source.get("content", {})
        # Previously this fell back to "unknown" (and an empty body yielded
        # ""), which validate_file() then treated as JPEG — so an encrypted
        # PDF with an empty body could never pass validation.
        fallback = "document.pdf" if content.get("msgtype") == "m.file" else "image.jpg"
        filename = content.get("body") or fallback
        file_info = content.get("file", {})
        return event.event_id, filename, file_info["url"], json.dumps(file_info)
    if isinstance(event, RoomMessageFile):
        filename = event.body or "document.pdf"
    else:
        filename = event.body or "image.jpg"
    return event.event_id, filename, event.url, None
|
|
|
|
|
|
def content_type_for(filename: str) -> str:
    """Map a filename to the MIME type used for the Paperless upload."""
    if filename.lower().endswith(".pdf"):
        return "application/pdf"
    # Everything else is treated as a JPEG image.
    return "image/jpeg"
|
|
|
|
|
|
def validate_file(path: Path, filename: str) -> None:
    """Raise ValueError if the file doesn't look like the format its name claims.

    Names ending in .pdf must start with "%PDF"; everything else is expected
    to be a JPEG (FF D8 FF). Only the first 8 bytes are read, so large
    documents are never pulled into memory just to be sniffed (the previous
    version read the whole file and sliced it).
    """
    with path.open("rb") as fh:
        header = fh.read(8)
    if filename.lower().endswith(".pdf"):
        if not header.startswith(b"%PDF"):
            # Include the filename: the old f-string had no placeholder and
            # always printed the literal "(unknown)".
            raise ValueError(f"File does not start with %PDF magic bytes: {filename}")
    else:
        # JPEG: starts with FF D8 FF
        if not header.startswith(b"\xff\xd8\xff"):
            raise ValueError(f"File does not start with JPEG magic bytes: {filename}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Paperless client
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class PaperlessClient:
    """Thin async wrapper around the Paperless-ngx REST API."""

    def __init__(self, base_url: str, token: str, inbox_tag_id: int) -> None:
        self.base_url = base_url
        self.headers = {"Authorization": f"Token {token}"}
        self.inbox_tag_id = inbox_tag_id

    async def find_by_checksum(self, checksum: str) -> Optional[int]:
        """Return the Paperless document ID if the checksum already exists."""
        async with httpx.AsyncClient() as http:
            resp = await http.get(
                f"{self.base_url}/api/documents/",
                headers=self.headers,
                params={"checksum__iexact": checksum},
                timeout=30,
            )
            resp.raise_for_status()
            matches = resp.json()["results"]
            if not matches:
                return None
            return matches[0]["id"]

    async def upload(self, filename: str, path: Path, content_type: str) -> str:
        """Upload a document from a file path and return the Paperless task ID."""
        async with httpx.AsyncClient() as http:
            with path.open("rb") as fh:
                resp = await http.post(
                    f"{self.base_url}/api/documents/post_document/",
                    headers=self.headers,
                    data={"tags": str(self.inbox_tag_id)},
                    files={"document": (filename, fh, content_type)},
                    timeout=120,
                )
            resp.raise_for_status()
            # The API returns the task UUID as a quoted JSON string.
            return resp.text.strip('"')

    async def wait_for_task(self, task_id: str) -> int:
        """Poll until the task completes and return the created document ID.

        Returns -1 when Paperless consumed the upload as a duplicate.
        Raises RuntimeError if the task fails or times out.
        """
        deadline = time.time() + PAPERLESS_TASK_TIMEOUT
        async with httpx.AsyncClient() as http:
            while time.time() < deadline:
                resp = await http.get(
                    f"{self.base_url}/api/tasks/",
                    headers=self.headers,
                    params={"task_id": task_id},
                    timeout=30,
                )
                resp.raise_for_status()
                tasks = resp.json()

                if tasks:
                    state = tasks[0].get("status")
                    if state == "SUCCESS":
                        doc_id = tasks[0].get("related_document")
                        if doc_id is None:
                            # Paperless rejected it as a duplicate — not an error for us
                            log.info("Paperless task %s: document already existed", task_id)
                            return -1
                        return int(doc_id)
                    if state in ("FAILURE", "REVOKED"):
                        result = tasks[0].get("result", "unknown error")
                        raise RuntimeError(f"Paperless task {task_id} failed: {result}")

                # Not yet visible, PENDING, or STARTED — poll again shortly.
                await asyncio.sleep(PAPERLESS_TASK_POLL_INTERVAL)

        raise RuntimeError(f"Paperless task {task_id} timed out after {PAPERLESS_TASK_TIMEOUT}s")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Core processing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def process_event(
    event_id: str,
    filename: str,
    mxc_url: str,
    encryption_info: Optional[str],
    matrix_client: AsyncClient,
    db: aiosqlite.Connection,
    paperless: PaperlessClient,
) -> None:
    """Download one Matrix file event, deduplicate it, and upload to Paperless.

    Idempotent per event_id: terminal states ("uploaded", "skipped",
    "give_up") short-circuit immediately. On failure the row is marked
    "failed" with an exponential-backoff next_retry timestamp that
    retry_loop() later acts on.

    encryption_info is the JSON-serialized `file` dict for encrypted
    attachments (carries the inline AES key), or None for plaintext ones.
    """
    row = await get_event_row(db, event_id)
    if row and row["status"] in ("uploaded", "skipped", "give_up"):
        log.debug("Already handled %s, skipping", event_id)
        return

    # Check if we've exhausted all retries
    if row and row["status"] == "failed" and row["retry_count"] > len(RETRY_DELAYS):
        log.warning("Max retries exceeded for %s, giving up", filename)
        await upsert_event(db, event_id, filename, mxc_url, "give_up", encryption_info)
        return

    log.info("Processing %s (%s)", filename, event_id)

    tmp_path = None
    try:
        dl = await matrix_client.download(mxc=mxc_url)
        # nio signals errors via a response object without .body, not by raising.
        if not hasattr(dl, "body"):
            raise RuntimeError(f"Download failed: {dl}")

        # Write to a temp file to avoid loading large files into RAM
        suffix = Path(filename).suffix or ".bin"
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp_path = Path(tmp.name)
            if encryption_info:
                # Encrypted attachments carry their AES key inline in the event.
                tmp.write(decrypt_attachment(dl.body, json.loads(encryption_info)))
            else:
                tmp.write(dl.body)

        # Validate the file looks like what it claims to be
        validate_file(tmp_path, filename)

        # Deduplicate against Paperless by content checksum
        # (MD5 hex digest — matched against Paperless's checksum field via
        # the checksum__iexact filter in find_by_checksum).
        checksum = hashlib.md5(tmp_path.read_bytes()).hexdigest()
        existing_id = await paperless.find_by_checksum(checksum)
        if existing_id is not None:
            log.info("Already in Paperless (id=%d), skipping: %s", existing_id, filename)
            await upsert_event(db, event_id, filename, mxc_url, "skipped",
                               encryption_info, checksum, existing_id)
            return

        # Upload and wait for Paperless to confirm it landed
        task_id = await paperless.upload(filename, tmp_path, content_type_for(filename))
        log.info("Uploaded %s → waiting for Paperless task %s", filename, task_id)
        doc_id = await paperless.wait_for_task(task_id)

        if doc_id == -1:
            # Paperless said it was a duplicate — treat as skipped
            log.info("Paperless reported duplicate for %s, marking skipped", filename)
            await upsert_event(db, event_id, filename, mxc_url, "skipped",
                               encryption_info, checksum)
        else:
            log.info("Confirmed in Paperless as document id=%d: %s", doc_id, filename)
            await upsert_event(db, event_id, filename, mxc_url, "uploaded",
                               encryption_info, checksum, doc_id)

    except Exception as exc:
        log.error("Failed to process %s (%s): %s", filename, event_id, exc)
        retry_count = (row["retry_count"] + 1) if row else 1
        if retry_count > len(RETRY_DELAYS):
            log.warning("Max retries exceeded for %s, giving up permanently", filename)
            await upsert_event(db, event_id, filename, mxc_url, "give_up", encryption_info)
        else:
            # Schedule the next attempt per RETRY_DELAYS (1m … 24h);
            # retry_loop() re-runs this event once next_retry has passed.
            delay = RETRY_DELAYS[retry_count - 1]
            await upsert_event(
                db, event_id, filename, mxc_url, "failed",
                encryption_info,
                retry_count=retry_count,
                next_retry=time.time() + delay,
            )
    finally:
        # The temp file is removed on every path, success or failure.
        if tmp_path and tmp_path.exists():
            tmp_path.unlink()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Historical catchup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def catchup_history(
    matrix_client: AsyncClient,
    db: aiosqlite.Connection,
    paperless: PaperlessClient,
    room_id: str,
    start_token: str,
) -> None:
    """Walk the room history backwards from *start_token*, ingesting files.

    Pagination stops when /messages errors out or when the server returns
    no continuation token. Per-file failures are handled (and scheduled for
    retry) inside process_event(), so one bad file never halts the walk.
    """
    log.info("Starting historical catchup...")
    token = start_token
    file_count = 0
    batch_count = 0
    event_count = 0

    while True:
        batch = await matrix_client.room_messages(
            room_id,
            start=token,
            limit=100,
            direction=MessageDirection.back,
        )

        if isinstance(batch, RoomMessagesError):
            log.error("room_messages error: %s", batch)
            break

        batch_count += 1
        event_count += len(batch.chunk)

        for event in batch.chunk:
            if not is_supported_file(event):
                continue
            file_count += 1
            event_id, filename, mxc_url, enc_info = extract_event_fields(event)
            await process_event(
                event_id, filename, mxc_url, enc_info,
                matrix_client, db, paperless,
            )

        if not batch.end:
            break
        token = batch.end

    log.info(
        "Historical catchup complete — processed %d file event(s) across %d batches (%d total events).",
        file_count, batch_count, event_count,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Retry loop
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def retry_loop(
    matrix_client: AsyncClient,
    db: aiosqlite.Connection,
    paperless: PaperlessClient,
) -> None:
    """Once a minute, re-run every failed event whose backoff has elapsed."""
    query = (
        "SELECT event_id, filename, mxc_url, encryption_info FROM processed_events "
        "WHERE status = 'failed' AND next_retry <= ?"
    )
    while True:
        await asyncio.sleep(60)
        async with db.execute(query, (time.time(),)) as cursor:
            due = await cursor.fetchall()

        for event_id, filename, mxc_url, enc_info in due:
            log.info("Retrying %s (%s)", filename, event_id)
            await process_event(event_id, filename, mxc_url, enc_info,
                                matrix_client, db, paperless)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Uptime Kuma heartbeat
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def heartbeat_loop() -> None:
    """Ping the Uptime Kuma push URL every 60s, if one is configured."""
    if not UPTIME_KUMA_PUSH_URL:
        return  # monitoring not configured — nothing to do
    log.info("Heartbeat enabled, pinging Uptime Kuma every 60s")
    async with httpx.AsyncClient() as http:
        while True:
            try:
                await http.get(UPTIME_KUMA_PUSH_URL, timeout=10)
            except Exception as exc:
                # Best-effort: a missed ping shows up in Kuma, so just log it.
                log.warning("Heartbeat ping failed: %s", exc)
            await asyncio.sleep(60)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def main() -> None:
    """Entry point: sync, catch up on history, then listen forever.

    Order matters: the initial sync timeline is processed first, then older
    history is paginated backwards, and only then does the live callback
    loop (sync_forever) plus retry/heartbeat tasks take over.
    """
    paperless = PaperlessClient(PAPERLESS_URL, PAPERLESS_TOKEN, PAPERLESS_INBOX_TAG_ID)

    async with aiosqlite.connect(DB_PATH) as db:
        await init_db(db)

        # Token-based auth: the access token is set directly, no login() call.
        matrix_client = AsyncClient(MATRIX_HOMESERVER, MATRIX_USER)
        matrix_client.access_token = MATRIX_ACCESS_TOKEN
        matrix_client.device_id = MATRIX_DEVICE_ID

        try:
            log.info("Connecting to Matrix...")
            sync_resp = await matrix_client.sync(timeout=30_000, full_state=True)

            # Accept invite if the bot hasn't joined yet
            if MATRIX_ROOM_ID in sync_resp.rooms.invite:
                log.info("Accepting invite to %s...", MATRIX_ROOM_ID)
                await matrix_client.join(MATRIX_ROOM_ID)
                # Re-sync so the room appears under rooms.join below.
                sync_resp = await matrix_client.sync(timeout=30_000, full_state=True)

            room_timeline = sync_resp.rooms.join.get(MATRIX_ROOM_ID)
            if room_timeline is None:
                log.error(
                    "Room %s not found in joined rooms. Is the bot a member?",
                    MATRIX_ROOM_ID,
                )
                return

            # Process events from the initial sync timeline first
            for event in room_timeline.timeline.events:
                if is_supported_file(event):
                    event_id, filename, mxc_url, enc_info = extract_event_fields(event)
                    await process_event(event_id, filename, mxc_url, enc_info,
                                        matrix_client, db, paperless)

            # Then paginate backwards for older history
            prev_batch = room_timeline.timeline.prev_batch
            await catchup_history(matrix_client, db, paperless, MATRIX_ROOM_ID, prev_batch)

            async def on_file(room, event):
                # Live-event callback: only react to the configured room.
                if room.room_id == MATRIX_ROOM_ID and is_supported_file(event):
                    event_id, filename, mxc_url, enc_info = extract_event_fields(event)
                    await process_event(
                        event_id, filename, mxc_url, enc_info,
                        matrix_client, db, paperless,
                    )

            # BadEvent is registered too so encrypted attachments (which fail
            # nio's schema validation) still reach the callback.
            matrix_client.add_event_callback(
                on_file, (RoomMessageFile, RoomMessageImage, BadEvent)
            )

            log.info("Listening for new messages...")
            # Three long-running tasks: live sync, backoff retries, heartbeat.
            await asyncio.gather(
                matrix_client.sync_forever(timeout=30_000),
                retry_loop(matrix_client, db, paperless),
                heartbeat_loop(),
            )

        finally:
            await matrix_client.close()
|
|
|
|
|
|
# Script entry point: run the bot until interrupted.
if __name__ == "__main__":
    asyncio.run(main())
|