ingest: Initial implementation

Bot that monitors a Matrix room for PDF and JPEG files and uploads
them to Paperless-ngx. Supports E2E-encrypted attachments via inline
AES keys, historical catch-up on startup, exponential-backoff retries
with a permanent give-up after the maximum number of attempts, file-format
validation via magic bytes, Uptime Kuma heartbeat monitoring, and email
alerts on errors via SMTP over SSL.
This commit is contained in:
Jeena 2026-03-11 13:43:48 +00:00
commit d5a3528cde
7 changed files with 1844 additions and 0 deletions

558
ingest.py Normal file
View file

@ -0,0 +1,558 @@
#!/usr/bin/env python3
"""Matrix to Paperless-ngx ingest bot.
Monitors a Matrix room for PDF and JPEG files and uploads them to Paperless-ngx.
Processes historical messages on startup and listens for new ones indefinitely.
Failed uploads are retried with exponential backoff up to a maximum number of attempts.
Supports both plain and E2E-encrypted file attachments. Encrypted files carry
their AES key inline in the event, so no Megolm session is needed to decrypt them.
"""
import asyncio
import base64
import hashlib
import json
import logging
import logging.handlers
import os
import sqlite3
import tempfile
import time
from pathlib import Path
from typing import Optional

import aiosqlite
import httpx
from Crypto.Cipher import AES
from Crypto.Util import Counter
from dotenv import load_dotenv
from nio import AsyncClient, MessageDirection, RoomMessagesError
from nio.events.room_events import BadEvent, RoomMessageFile, RoomMessageImage
# Load configuration from a local .env file into the process environment
# before any of the os.environ reads below.
load_dotenv()
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
log = logging.getLogger(__name__)
# nio logs a warning for every event that fails schema validation; encrypted
# attachments always do (they carry `file` instead of `url` — see
# _bad_event_encrypted_file_info), so silence that logger.
logging.getLogger("nio.events.misc").setLevel(logging.ERROR)
class _SSLSMTPHandler(logging.handlers.SMTPHandler):
    """SMTPHandler variant that connects with implicit TLS (SMTP_SSL,
    typically port 465) instead of the STARTTLS upgrade the stock
    handler performs.

    Fix over the original: the message was assembled as a raw ASCII
    header string passed to ``sendmail``, which raises on any non-ASCII
    character in the log record. ``EmailMessage`` handles header/body
    encoding correctly. A socket timeout is also set so a hung SMTP
    server cannot block the logging call indefinitely.
    """

    def emit(self, record: logging.LogRecord) -> None:
        # Imported lazily, mirroring the stdlib SMTPHandler, so module
        # import stays cheap when alerts are disabled.
        import smtplib
        from email.message import EmailMessage

        try:
            host, port = self.mailhost
            msg = EmailMessage()
            msg["From"] = self.fromaddr
            msg["To"] = ", ".join(self.toaddrs)
            msg["Subject"] = self.getSubject(record)
            msg.set_content(self.format(record))
            with smtplib.SMTP_SSL(host, port, timeout=30) as smtp:
                smtp.login(self.username, self.password)
                smtp.send_message(msg)
        except Exception:
            # Standard logging-handler contract: never raise from emit().
            self.handleError(record)
def _setup_email_alerts() -> None:
smtp_host = os.environ.get("SMTP_HOST")
if not smtp_host:
return
handler = _SSLSMTPHandler(
mailhost=(smtp_host, int(os.environ.get("SMTP_PORT", 465))),
fromaddr=os.environ["ALERT_FROM"],
toaddrs=[os.environ["ALERT_TO"]],
subject="matrix-paperless-ingest error",
credentials=(os.environ["SMTP_USER"], os.environ["SMTP_PASSWORD"]),
)
handler.setLevel(logging.ERROR)
logging.getLogger().addHandler(handler)
log.info("Email alerts enabled → %s", os.environ["ALERT_TO"])
_setup_email_alerts()
# Required settings — a missing variable raises KeyError at import time
# so misconfiguration is caught immediately at startup.
MATRIX_HOMESERVER = os.environ["MATRIX_HOMESERVER"]
MATRIX_USER = os.environ["MATRIX_USER"]
MATRIX_ACCESS_TOKEN = os.environ["MATRIX_ACCESS_TOKEN"]
MATRIX_DEVICE_ID = os.environ["MATRIX_DEVICE_ID"]
MATRIX_ROOM_ID = os.environ["MATRIX_ROOM_ID"]
PAPERLESS_URL = os.environ["PAPERLESS_URL"].rstrip("/")  # normalize trailing slash
PAPERLESS_TOKEN = os.environ["PAPERLESS_TOKEN"]
PAPERLESS_INBOX_TAG_ID = int(os.environ["PAPERLESS_INBOX_TAG_ID"])
# Optional settings with defaults.
DB_PATH = os.environ.get("DB_PATH", "state.db")
UPTIME_KUMA_PUSH_URL = os.environ.get("UPTIME_KUMA_PUSH_URL")
# Retry delays in seconds: 1m, 5m, 30m, 2h, 24h — after the last delay is
# exhausted the event is marked permanently failed and never retried again.
RETRY_DELAYS = [60, 300, 1800, 7200, 86400]
# How long to wait for Paperless to finish processing an uploaded document.
PAPERLESS_TASK_POLL_INTERVAL = 5  # seconds between polls
PAPERLESS_TASK_TIMEOUT = 300  # give up after this many seconds
# ---------------------------------------------------------------------------
# Database
# ---------------------------------------------------------------------------
async def init_db(db: aiosqlite.Connection) -> None:
    """Create the processed_events state table and apply column migrations.

    Idempotent: safe to run on every startup. processed_events tracks each
    file event seen, its upload status, and retry bookkeeping.

    Fix over the original: the migration loop caught bare ``Exception``,
    which would also swallow real failures (locked database, disk I/O
    errors). Only ``sqlite3.OperationalError`` — raised for the expected
    "duplicate column name" case — is ignored now.
    """
    await db.execute("""
        CREATE TABLE IF NOT EXISTS processed_events (
            event_id TEXT PRIMARY KEY,
            filename TEXT NOT NULL,
            mxc_url TEXT NOT NULL,
            encryption_info TEXT,
            checksum TEXT,
            status TEXT NOT NULL,
            paperless_id INTEGER,
            retry_count INTEGER DEFAULT 0,
            next_retry REAL,
            created_at REAL DEFAULT (unixepoch())
        )
    """)
    # Migrations for databases created before these columns existed.
    for col in ("encryption_info TEXT", "checksum TEXT"):
        try:
            await db.execute(f"ALTER TABLE processed_events ADD COLUMN {col}")
        except sqlite3.OperationalError:
            # "duplicate column name" — the migration already ran.
            pass
    await db.commit()
async def get_event_row(db: aiosqlite.Connection, event_id: str) -> Optional[dict]:
    """Fetch one processed_events row as a column-name → value dict.

    Returns None when the event has never been recorded.
    """
    sql = "SELECT * FROM processed_events WHERE event_id = ?"
    async with db.execute(sql, (event_id,)) as cursor:
        row = await cursor.fetchone()
        if row is not None:
            names = [column[0] for column in cursor.description]
            return dict(zip(names, row))
    return None
async def upsert_event(
    db: aiosqlite.Connection,
    event_id: str,
    filename: str,
    mxc_url: str,
    status: str,
    encryption_info: Optional[str] = None,
    checksum: Optional[str] = None,
    paperless_id: Optional[int] = None,
    retry_count: int = 0,
    next_retry: Optional[float] = None,
) -> None:
    """Insert or update the bookkeeping row for *event_id* and commit.

    Fix over the original: ``INSERT OR REPLACE`` deletes and re-inserts
    the row, which reset ``created_at`` to "now" on every status change
    and retry. The UPSERT form (``ON CONFLICT ... DO UPDATE``) updates
    the existing row in place, preserving the original ``created_at``.
    """
    await db.execute(
        """
        INSERT INTO processed_events
            (event_id, filename, mxc_url, encryption_info, checksum, status,
             paperless_id, retry_count, next_retry)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(event_id) DO UPDATE SET
            filename = excluded.filename,
            mxc_url = excluded.mxc_url,
            encryption_info = excluded.encryption_info,
            checksum = excluded.checksum,
            status = excluded.status,
            paperless_id = excluded.paperless_id,
            retry_count = excluded.retry_count,
            next_retry = excluded.next_retry
        """,
        (event_id, filename, mxc_url, encryption_info, checksum, status,
         paperless_id, retry_count, next_retry),
    )
    await db.commit()
# ---------------------------------------------------------------------------
# Encrypted attachment helpers
# ---------------------------------------------------------------------------
def decrypt_attachment(ciphertext: bytes, file_info: dict) -> bytes:
    """Decrypt a Matrix encrypted file attachment (AES-256-CTR).

    *file_info* is the EncryptedFile object from the event content:
    the JWK AES key in ``file_info["key"]["k"]``, the counter IV in
    ``file_info["iv"]``, and (per the Matrix spec) an unpadded-base64
    SHA-256 of the ciphertext in ``file_info["hashes"]["sha256"]``.

    Fix over the original: the ciphertext hash was never verified. CTR
    mode provides no authentication, so a tampered download would have
    been decrypted and uploaded silently. Raises ValueError on hash
    mismatch; the check is skipped only if the event carries no hash.
    """
    expected = file_info.get("hashes", {}).get("sha256")
    if expected is not None:
        # Matrix uses unpadded standard-alphabet base64 for hashes.
        actual = base64.b64encode(hashlib.sha256(ciphertext).digest()).decode().rstrip("=")
        if actual != expected.rstrip("="):
            raise ValueError("Encrypted attachment SHA-256 mismatch — refusing to decrypt")
    key = base64.urlsafe_b64decode(file_info["key"]["k"] + "==")
    iv_bytes = base64.urlsafe_b64decode(file_info["iv"] + "==")
    ctr = Counter.new(128, initial_value=int.from_bytes(iv_bytes, "big"))
    cipher = AES.new(key, AES.MODE_CTR, counter=ctr)
    return cipher.decrypt(ciphertext)
def _bad_event_encrypted_file_info(event: BadEvent) -> Optional[dict]:
"""
Return the encrypted file info dict if this BadEvent is an encrypted
image/file attachment we care about, else None.
Encrypted attachments use `file` instead of `url` in the content, which
causes matrix-nio's schema validator to reject them as BadEvent.
"""
content = event.source.get("content", {})
msgtype = content.get("msgtype", "")
if msgtype not in ("m.image", "m.file"):
return None
name = content.get("body", "").lower()
if msgtype == "m.image" and not name.endswith((".jpg", ".jpeg")):
return None
if msgtype == "m.file" and not name.endswith(".pdf"):
return None
file_info = content.get("file", {})
if not file_info or "url" not in file_info or "key" not in file_info:
return None
return file_info
# ---------------------------------------------------------------------------
# File event helpers
# ---------------------------------------------------------------------------
def is_supported_file(event) -> bool:
    """True when this event is a PDF or JPEG attachment the bot ingests."""
    if isinstance(event, BadEvent):
        # Encrypted attachments surface as BadEvent; inspect raw source.
        return _bad_event_encrypted_file_info(event) is not None
    name = (getattr(event, "body", None) or "").lower()
    if isinstance(event, RoomMessageFile):
        return name.endswith(".pdf")
    if isinstance(event, RoomMessageImage):
        return name.endswith((".jpg", ".jpeg"))
    return False
def extract_event_fields(event) -> tuple[str, str, str, Optional[str]]:
    """Return (event_id, filename, mxc_url, encryption_info_json_or_None)."""
    if not isinstance(event, BadEvent):
        # Plain (unencrypted) attachment: the mxc URL lives on the event.
        return event.event_id, event.body, event.url, None
    content = event.source.get("content", {})
    file_info = content.get("file", {})
    name = content.get("body", "unknown")
    return event.event_id, name, file_info["url"], json.dumps(file_info)
def content_type_for(filename: str) -> str:
    """Map a filename to the MIME type used for the Paperless upload."""
    lowered = filename.lower()
    if lowered.endswith(".pdf"):
        return "application/pdf"
    return "image/jpeg"
def validate_file(path: Path, filename: str) -> None:
    """Raise ValueError if the file doesn't look like the format its name claims.

    Checks magic bytes only: ``%PDF`` for PDFs, the SOI marker
    ``FF D8 FF`` for JPEGs (anything not named ``.pdf`` is treated as JPEG,
    matching the two formats the bot accepts).

    Fixes over the original: the error f-strings had no placeholder, so
    every failure literally reported "(unknown)" instead of the filename;
    and the whole file was read into memory just to inspect 8 header bytes.
    """
    with path.open("rb") as fh:
        header = fh.read(8)
    if filename.lower().endswith(".pdf"):
        if not header.startswith(b"%PDF"):
            raise ValueError(f"File does not start with %PDF magic bytes: {filename}")
    else:
        # JPEG streams always begin with FF D8 FF.
        if not header.startswith(b"\xff\xd8\xff"):
            raise ValueError(f"File does not start with JPEG magic bytes: {filename}")
# ---------------------------------------------------------------------------
# Paperless client
# ---------------------------------------------------------------------------
class PaperlessClient:
    """Thin async wrapper around the Paperless-ngx REST API."""

    def __init__(self, base_url: str, token: str, inbox_tag_id: int) -> None:
        self.base_url = base_url
        self.headers = {"Authorization": f"Token {token}"}
        self.inbox_tag_id = inbox_tag_id

    async def find_by_checksum(self, checksum: str) -> Optional[int]:
        """Return the Paperless document ID if the checksum already exists."""
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{self.base_url}/api/documents/",
                headers=self.headers,
                params={"checksum__iexact": checksum},
                timeout=30,
            )
        response.raise_for_status()
        matches = response.json()["results"]
        if matches:
            return matches[0]["id"]
        return None

    async def upload(self, filename: str, path: Path, content_type: str) -> str:
        """Upload a document from a file path and return the Paperless task ID."""
        async with httpx.AsyncClient() as client:
            with path.open("rb") as fh:
                response = await client.post(
                    f"{self.base_url}/api/documents/post_document/",
                    headers=self.headers,
                    data={"tags": str(self.inbox_tag_id)},
                    files={"document": (filename, fh, content_type)},
                    timeout=120,
                )
        response.raise_for_status()
        # Paperless returns the consume-task UUID as a quoted JSON string.
        return response.text.strip('"')

    async def wait_for_task(self, task_id: str) -> int:
        """Poll until the consume task completes; return the new document ID.

        Returns -1 when Paperless reported the document as a duplicate.
        Raises RuntimeError if the task fails, is revoked, or times out.
        """
        deadline = time.time() + PAPERLESS_TASK_TIMEOUT
        async with httpx.AsyncClient() as client:
            while time.time() < deadline:
                response = await client.get(
                    f"{self.base_url}/api/tasks/",
                    headers=self.headers,
                    params={"task_id": task_id},
                    timeout=30,
                )
                response.raise_for_status()
                tasks = response.json()
                if tasks:
                    task = tasks[0]
                    state = task.get("status")
                    if state == "SUCCESS":
                        doc_id = task.get("related_document")
                        if doc_id is not None:
                            return int(doc_id)
                        # Paperless rejected it as a duplicate — not an error for us
                        log.info("Paperless task %s: document already existed", task_id)
                        return -1
                    if state in ("FAILURE", "REVOKED"):
                        result = task.get("result", "unknown error")
                        raise RuntimeError(f"Paperless task {task_id} failed: {result}")
                # No task record yet, or PENDING/STARTED — keep waiting.
                await asyncio.sleep(PAPERLESS_TASK_POLL_INTERVAL)
        raise RuntimeError(f"Paperless task {task_id} timed out after {PAPERLESS_TASK_TIMEOUT}s")
# ---------------------------------------------------------------------------
# Core processing
# ---------------------------------------------------------------------------
async def process_event(
    event_id: str,
    filename: str,
    mxc_url: str,
    encryption_info: Optional[str],
    matrix_client: AsyncClient,
    db: aiosqlite.Connection,
    paperless: PaperlessClient,
) -> None:
    """Download one Matrix file event and ship it to Paperless-ngx.

    Persisted state machine (processed_events.status):
      uploaded / skipped / give_up — terminal, never touched again;
      failed — retried later by retry_loop with exponential backoff.
    Idempotent for terminal states, so it is safe to re-feed the same
    event from catchup, the live callback, or the retry loop.
    """
    row = await get_event_row(db, event_id)
    if row and row["status"] in ("uploaded", "skipped", "give_up"):
        log.debug("Already handled %s, skipping", event_id)
        return
    # Check if we've exhausted all retries
    if row and row["status"] == "failed" and row["retry_count"] > len(RETRY_DELAYS):
        log.warning("Max retries exceeded for %s, giving up", filename)
        await upsert_event(db, event_id, filename, mxc_url, "give_up", encryption_info)
        return
    log.info("Processing %s (%s)", filename, event_id)
    tmp_path = None
    try:
        dl = await matrix_client.download(mxc=mxc_url)
        # nio signals failure by returning an error object without .body
        # rather than raising; convert that into an exception here.
        if not hasattr(dl, "body"):
            raise RuntimeError(f"Download failed: {dl}")
        # Write to a temp file to avoid loading large files into RAM
        suffix = Path(filename).suffix or ".bin"
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp_path = Path(tmp.name)
            if encryption_info:
                # Encrypted attachment: the AES key travels inline in the
                # event content, stored here as a JSON string.
                tmp.write(decrypt_attachment(dl.body, json.loads(encryption_info)))
            else:
                tmp.write(dl.body)
        # Validate the file looks like what it claims to be
        validate_file(tmp_path, filename)
        # Deduplicate against Paperless by content checksum — MD5 to match
        # the checksum field queried in PaperlessClient.find_by_checksum.
        checksum = hashlib.md5(tmp_path.read_bytes()).hexdigest()
        existing_id = await paperless.find_by_checksum(checksum)
        if existing_id is not None:
            log.info("Already in Paperless (id=%d), skipping: %s", existing_id, filename)
            await upsert_event(db, event_id, filename, mxc_url, "skipped",
                              encryption_info, checksum, existing_id)
            return
        # Upload and wait for Paperless to confirm it landed
        task_id = await paperless.upload(filename, tmp_path, content_type_for(filename))
        log.info("Uploaded %s → waiting for Paperless task %s", filename, task_id)
        doc_id = await paperless.wait_for_task(task_id)
        if doc_id == -1:
            # Paperless said it was a duplicate — treat as skipped
            log.info("Paperless reported duplicate for %s, marking skipped", filename)
            await upsert_event(db, event_id, filename, mxc_url, "skipped",
                              encryption_info, checksum)
        else:
            log.info("Confirmed in Paperless as document id=%d: %s", doc_id, filename)
            await upsert_event(db, event_id, filename, mxc_url, "uploaded",
                              encryption_info, checksum, doc_id)
    except Exception as exc:
        # Any failure above (download, decrypt, validation, Paperless API)
        # lands here: schedule a backoff retry or give up permanently.
        log.error("Failed to process %s (%s): %s", filename, event_id, exc)
        retry_count = (row["retry_count"] + 1) if row else 1
        if retry_count > len(RETRY_DELAYS):
            log.warning("Max retries exceeded for %s, giving up permanently", filename)
            await upsert_event(db, event_id, filename, mxc_url, "give_up", encryption_info)
        else:
            delay = RETRY_DELAYS[retry_count - 1]
            await upsert_event(
                db, event_id, filename, mxc_url, "failed",
                encryption_info,
                retry_count=retry_count,
                next_retry=time.time() + delay,
            )
    finally:
        # Always clean up the temp file, success or failure.
        if tmp_path and tmp_path.exists():
            tmp_path.unlink()
# ---------------------------------------------------------------------------
# Historical catchup
# ---------------------------------------------------------------------------
async def catchup_history(
    matrix_client: AsyncClient,
    db: aiosqlite.Connection,
    paperless: PaperlessClient,
    room_id: str,
    start_token: str,
) -> None:
    """Paginate backwards through room history, processing every supported file.

    Stops when the server returns no continuation token or an error.
    Already-handled events are skipped inside process_event, so re-walking
    old history on every startup is cheap.
    """
    log.info("Starting historical catchup...")
    token = start_token
    handled = 0
    while True:
        resp = await matrix_client.room_messages(
            room_id,
            start=token,
            limit=100,
            direction=MessageDirection.back,
        )
        if isinstance(resp, RoomMessagesError):
            log.error("room_messages error: %s", resp)
            break
        for ev in resp.chunk:
            if not is_supported_file(ev):
                continue
            handled += 1
            ev_id, name, mxc, enc = extract_event_fields(ev)
            await process_event(ev_id, name, mxc, enc,
                               matrix_client, db, paperless)
        if not resp.end:
            break
        token = resp.end
    log.info("Historical catchup complete — processed %d file event(s).", handled)
# ---------------------------------------------------------------------------
# Retry loop
# ---------------------------------------------------------------------------
async def retry_loop(
    matrix_client: AsyncClient,
    db: aiosqlite.Connection,
    paperless: PaperlessClient,
) -> None:
    """Every minute, re-attempt uploads whose backoff delay has elapsed."""
    query = (
        "SELECT event_id, filename, mxc_url, encryption_info FROM processed_events "
        "WHERE status = 'failed' AND next_retry <= ?"
    )
    while True:
        await asyncio.sleep(60)
        async with db.execute(query, (time.time(),)) as cursor:
            due = await cursor.fetchall()
        for event_id, filename, mxc_url, enc_info in due:
            log.info("Retrying %s (%s)", filename, event_id)
            await process_event(event_id, filename, mxc_url, enc_info,
                                matrix_client, db, paperless)
# ---------------------------------------------------------------------------
# Uptime Kuma heartbeat
# ---------------------------------------------------------------------------
async def heartbeat_loop() -> None:
    """Push a heartbeat to Uptime Kuma every 60 seconds.

    No-op when UPTIME_KUMA_PUSH_URL is unset. Ping failures are logged
    and swallowed so a monitoring outage never takes down the bot.
    """
    url = UPTIME_KUMA_PUSH_URL
    if not url:
        return
    log.info("Heartbeat enabled, pinging Uptime Kuma every 60s")
    async with httpx.AsyncClient() as client:
        while True:
            try:
                await client.get(url, timeout=10)
            except Exception as err:
                log.warning("Heartbeat ping failed: %s", err)
            await asyncio.sleep(60)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
async def main() -> None:
    """Entry point: sync with Matrix, catch up on history, then run forever.

    Startup order matters: initial sync (accepting a pending invite if
    needed) → process the initial timeline → paginate older history →
    only then start the live sync loop, retry loop, and heartbeat.
    """
    paperless = PaperlessClient(PAPERLESS_URL, PAPERLESS_TOKEN, PAPERLESS_INBOX_TAG_ID)
    async with aiosqlite.connect(DB_PATH) as db:
        await init_db(db)
        matrix_client = AsyncClient(MATRIX_HOMESERVER, MATRIX_USER)
        # Reuse a pre-provisioned access token/device instead of logging in.
        matrix_client.access_token = MATRIX_ACCESS_TOKEN
        matrix_client.device_id = MATRIX_DEVICE_ID
        try:
            log.info("Connecting to Matrix...")
            sync_resp = await matrix_client.sync(timeout=30_000, full_state=True)
            # Accept invite if the bot hasn't joined yet
            if MATRIX_ROOM_ID in sync_resp.rooms.invite:
                log.info("Accepting invite to %s...", MATRIX_ROOM_ID)
                await matrix_client.join(MATRIX_ROOM_ID)
                # Re-sync so the newly joined room appears in rooms.join below.
                sync_resp = await matrix_client.sync(timeout=30_000, full_state=True)
            room_timeline = sync_resp.rooms.join.get(MATRIX_ROOM_ID)
            if room_timeline is None:
                log.error(
                    "Room %s not found in joined rooms. Is the bot a member?",
                    MATRIX_ROOM_ID,
                )
                return
            # Process events from the initial sync timeline first
            for event in room_timeline.timeline.events:
                if is_supported_file(event):
                    event_id, filename, mxc_url, enc_info = extract_event_fields(event)
                    await process_event(event_id, filename, mxc_url, enc_info,
                                        matrix_client, db, paperless)
            # Then paginate backwards for older history
            prev_batch = room_timeline.timeline.prev_batch
            await catchup_history(matrix_client, db, paperless, MATRIX_ROOM_ID, prev_batch)

            async def on_file(room, event):
                # Live callback for each new message event in our room.
                if room.room_id == MATRIX_ROOM_ID and is_supported_file(event):
                    event_id, filename, mxc_url, enc_info = extract_event_fields(event)
                    await process_event(
                        event_id, filename, mxc_url, enc_info,
                        matrix_client, db, paperless,
                    )

            # BadEvent is registered too so encrypted attachments (which
            # fail nio's schema validation) still reach the callback.
            matrix_client.add_event_callback(
                on_file, (RoomMessageFile, RoomMessageImage, BadEvent)
            )
            log.info("Listening for new messages...")
            await asyncio.gather(
                matrix_client.sync_forever(timeout=30_000),
                retry_loop(matrix_client, db, paperless),
                heartbeat_loop(),
            )
        finally:
            await matrix_client.close()
# Script entry point: run the async main loop until interrupted.
if __name__ == "__main__":
    asyncio.run(main())