From eec2d076e429dfdbfe61dca37c993b70ebe47ee9 Mon Sep 17 00:00:00 2001 From: Jeena Date: Wed, 11 Mar 2026 23:32:15 +0000 Subject: [PATCH 1/5] ingest: Accept RoomMessageImage events regardless of body content WhatsApp bridge images arrive as RoomMessageImage events with an empty body field, so the previous .jpg/.jpeg extension check silently rejected all of them. Accept all RoomMessageImage events and fall back to "image.jpg" as filename when body is empty. File content is still validated via magic bytes before upload. --- ingest.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/ingest.py b/ingest.py index 29585ab..fd9346c 100644 --- a/ingest.py +++ b/ingest.py @@ -206,7 +206,7 @@ def is_supported_file(event) -> bool: if isinstance(event, RoomMessageFile): return (event.body or "").lower().endswith(".pdf") if isinstance(event, RoomMessageImage): - return (event.body or "").lower().endswith((".jpg", ".jpeg")) + return True # validate magic bytes later; body may be empty (e.g. WhatsApp bridge) if isinstance(event, BadEvent): return _bad_event_encrypted_file_info(event) is not None return False @@ -219,7 +219,8 @@ def extract_event_fields(event) -> tuple[str, str, str, Optional[str]]: filename = content.get("body", "unknown") file_info = content.get("file", {}) return event.event_id, filename, file_info["url"], json.dumps(file_info) - return event.event_id, event.body, event.url, None + filename = event.body or "image.jpg" + return event.event_id, filename, event.url, None def content_type_for(filename: str) -> str: @@ -417,6 +418,8 @@ async def catchup_history( log.info("Starting historical catchup...") token = start_token total = 0 + batches = 0 + events_seen = 0 while True: response = await matrix_client.room_messages( @@ -430,6 +433,9 @@ async def catchup_history( log.error("room_messages error: %s", response) break + batches += 1 + events_seen += len(response.chunk) + for event in response.chunk: if is_supported_file(event): total += 1 @@ -443,7 +449,10 @@ async def catchup_history( break token = response.end - log.info("Historical catchup complete — processed %d file event(s).", total) + log.info( + "Historical catchup complete — processed %d file event(s) across %d batches (%d total events).", + total, batches, events_seen, + ) # --------------------------------------------------------------------------- From 0aa044eead0078173148665a9bfd506afa5f3574 Mon Sep 17 00:00:00 2001 From: Jeena Date: Wed, 11 Mar 2026 23:35:23 +0000 Subject: [PATCH 2/5] ingest: Accept RoomMessageFile events regardless of body content WhatsApp bridge files (e.g. PDFs) may arrive with an empty body field, causing the previous .pdf extension check to silently skip them. Accept all RoomMessageFile events and fall back to "document.pdf" as filename. File content is still validated via magic bytes before upload. --- ingest.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/ingest.py b/ingest.py index fd9346c..8f5a6b7 100644 --- a/ingest.py +++ b/ingest.py @@ -204,7 +204,7 @@ def _bad_event_encrypted_file_info(event: BadEvent) -> Optional[dict]: def is_supported_file(event) -> bool: if isinstance(event, RoomMessageFile): - return (event.body or "").lower().endswith(".pdf") + return True # validate magic bytes later; body may be empty (e.g. WhatsApp bridge) if isinstance(event, RoomMessageImage): return True # validate magic bytes later; body may be empty (e.g. WhatsApp bridge) if isinstance(event, BadEvent): @@ -219,7 +219,10 @@ def extract_event_fields(event) -> tuple[str, str, str, Optional[str]]: filename = content.get("body", "unknown") file_info = content.get("file", {}) return event.event_id, filename, file_info["url"], json.dumps(file_info) - filename = event.body or "image.jpg" + if isinstance(event, RoomMessageFile): + filename = event.body or "document.pdf" + else: + filename = event.body or "image.jpg" return event.event_id, filename, event.url, None From f49ea1dbc506535ad60589e9ba0513781db631bd Mon Sep 17 00:00:00 2001 From: Jeena Date: Wed, 11 Mar 2026 23:55:00 +0000 Subject: [PATCH 3/5] ingest: Assign uploaded documents to a configurable Paperless owner The post_document endpoint does not support setting ownership on upload, so after a successful upload the document is PATCHed to set the owner. Add optional PAPERLESS_OWNER_ID env var. When set, every newly uploaded document is assigned to that Paperless user ID via PATCH /api/documents/{id}/. --- .env.example | 2 ++ ingest.py | 34 ++++++++++++++++++++++++++++------ 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/.env.example b/.env.example index 14a8157..6e0da38 100644 --- a/.env.example +++ b/.env.example @@ -7,6 +7,8 @@ MATRIX_ROOM_ID=!roomid:jeena.net PAPERLESS_URL=https://paperless.jeena.net PAPERLESS_TOKEN=your_paperless_api_token PAPERLESS_INBOX_TAG_ID=1 +# Optional: assign uploaded documents to this Paperless user ID +# PAPERLESS_OWNER_ID=7 # Optional: path to the SQLite state database (default: state.db next to the script) DB_PATH=state.db diff --git a/ingest.py b/ingest.py index 8f5a6b7..9b0d0ae 100644 --- a/ingest.py +++ b/ingest.py @@ -43,8 +43,7 @@ class _SSLSMTPHandler(logging.handlers.SMTPHandler): def emit(self, record: logging.LogRecord) -> None: import smtplib try: - host, port = self.mailhost - with smtplib.SMTP_SSL(host, port) as smtp: + with smtplib.SMTP_SSL(self.mailhost, self.mailport) as smtp: smtp.login(self.username, self.password) msg = self.format(record) smtp.sendmail(self.fromaddr, self.toaddrs, @@ -82,6 +81,9 @@ MATRIX_ROOM_ID = os.environ["MATRIX_ROOM_ID"] PAPERLESS_URL = os.environ["PAPERLESS_URL"].rstrip("/") PAPERLESS_TOKEN = os.environ["PAPERLESS_TOKEN"] PAPERLESS_INBOX_TAG_ID = int(os.environ["PAPERLESS_INBOX_TAG_ID"]) +PAPERLESS_OWNER_ID: Optional[int] = ( + int(os.environ["PAPERLESS_OWNER_ID"]) if os.environ.get("PAPERLESS_OWNER_ID") else None +) DB_PATH = os.environ.get("DB_PATH", "state.db") UPTIME_KUMA_PUSH_URL = os.environ.get("UPTIME_KUMA_PUSH_URL") @@ -247,10 +249,11 @@ def validate_file(path: Path, filename: str) -> None: # --------------------------------------------------------------------------- class PaperlessClient: - def __init__(self, base_url: str, token: str, inbox_tag_id: int) -> None: + def __init__(self, base_url: str, token: str, inbox_tag_id: int, owner_id: Optional[int] = None) -> None: self.base_url = base_url self.headers = {"Authorization": f"Token {token}"} self.inbox_tag_id = inbox_tag_id + self.owner_id = owner_id async def find_by_checksum(self, checksum: str) -> Optional[int]: """Return the Paperless document ID if the checksum already exists.""" @@ -319,6 +322,17 @@ class PaperlessClient: raise RuntimeError(f"Paperless task {task_id} timed out after {PAPERLESS_TASK_TIMEOUT}s") + async def set_owner(self, doc_id: int, owner_id: int) -> None: + """Set the owner of a document.""" + async with httpx.AsyncClient() as client: + r = await client.patch( + f"{self.base_url}/api/documents/{doc_id}/", + headers=self.headers, + json={"owner": owner_id}, + timeout=30, + ) + r.raise_for_status() + # --------------------------------------------------------------------------- # Core processing @@ -361,8 +375,13 @@ async def process_event( else: tmp.write(dl.body) - # Validate the file looks like what it claims to be - validate_file(tmp_path, filename) + # Validate the file looks like what it claims to be; skip unsupported formats + try: + validate_file(tmp_path, filename) + except ValueError as exc: + log.info("Skipping unsupported file %s: %s", filename, exc) + await upsert_event(db, event_id, filename, mxc_url, "skipped", encryption_info) + return # Deduplicate against Paperless by content checksum checksum = hashlib.md5(tmp_path.read_bytes()).hexdigest() @@ -385,6 +404,9 @@ async def process_event( encryption_info, checksum) else: log.info("Confirmed in Paperless as document id=%d: %s", doc_id, filename) + if paperless.owner_id is not None: + await paperless.set_owner(doc_id, paperless.owner_id) + log.info("Set owner of document id=%d to user id=%d", doc_id, paperless.owner_id) await upsert_event(db, event_id, filename, mxc_url, "uploaded", encryption_info, checksum, doc_id) @@ -505,7 +527,7 @@ async def heartbeat_loop() -> None: # --------------------------------------------------------------------------- async def main() -> None: - paperless = PaperlessClient(PAPERLESS_URL, PAPERLESS_TOKEN, PAPERLESS_INBOX_TAG_ID) + paperless = PaperlessClient(PAPERLESS_URL, PAPERLESS_TOKEN, PAPERLESS_INBOX_TAG_ID, PAPERLESS_OWNER_ID) async with aiosqlite.connect(DB_PATH) as db: await init_db(db) From fa4662b5f379b55b05db0dc5ac8f60e9f30b1f58 Mon Sep 17 00:00:00 2001 From: Jeena Date: Thu, 12 Mar 2026 00:03:18 +0000 Subject: [PATCH 4/5] ingest: Determine PDF vs JPEG from event type, not filename MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit WhatsApp bridge files may have arbitrary body text (e.g. "Test") that does not end in .pdf, causing the filename-based magic byte check to apply JPEG validation to PDF files and reject them. Pass is_pdf through extract_event_fields and process_event based on the Matrix event type (RoomMessageFile → PDF, RoomMessageImage → JPEG, BadEvent → inferred from msgtype), so validation and content-type are always correct regardless of the filename. --- ingest.py | 46 +++++++++++++++++++++++----------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/ingest.py b/ingest.py index 9b0d0ae..6659507 100644 --- a/ingest.py +++ b/ingest.py @@ -214,32 +214,30 @@ def is_supported_file(event) -> bool: return False -def extract_event_fields(event) -> tuple[str, str, str, Optional[str]]: - """Returns (event_id, filename, mxc_url, encryption_info_json_or_None).""" +def extract_event_fields(event) -> tuple[str, str, str, Optional[str], bool]: + """Returns (event_id, filename, mxc_url, encryption_info_json_or_None, is_pdf).""" if isinstance(event, BadEvent): content = event.source.get("content", {}) filename = content.get("body", "unknown") file_info = content.get("file", {}) - return event.event_id, filename, file_info["url"], json.dumps(file_info) - if isinstance(event, RoomMessageFile): - filename = event.body or "document.pdf" - else: - filename = event.body or "image.jpg" - return event.event_id, filename, event.url, None + is_pdf = content.get("msgtype") == "m.file" + return event.event_id, filename, file_info["url"], json.dumps(file_info), is_pdf + is_pdf = isinstance(event, RoomMessageFile) + filename = event.body or ("document.pdf" if is_pdf else "image.jpg") + return event.event_id, filename, event.url, None, is_pdf -def content_type_for(filename: str) -> str: - return "application/pdf" if filename.lower().endswith(".pdf") else "image/jpeg" +def content_type_for(is_pdf: bool) -> str: + return "application/pdf" if is_pdf else "image/jpeg" -def validate_file(path: Path, filename: str) -> None: - """Raise ValueError if the file doesn't look like the format its name claims.""" +def validate_file(path: Path, filename: str, is_pdf: bool) -> None: + """Raise ValueError if the file content doesn't match the expected format.""" data = path.read_bytes()[:8] - if filename.lower().endswith(".pdf"): + if is_pdf: if not data.startswith(b"%PDF"): raise ValueError(f"File does not start with %PDF magic bytes: {filename}") else: - # JPEG: starts with FF D8 FF if not data[:3] == b"\xff\xd8\xff": raise ValueError(f"File does not start with JPEG magic bytes: {filename}") @@ -343,6 +341,7 @@ async def process_event( filename: str, mxc_url: str, encryption_info: Optional[str], + is_pdf: bool, matrix_client: AsyncClient, db: aiosqlite.Connection, paperless: PaperlessClient, @@ -377,7 +376,7 @@ async def process_event( # Validate the file looks like what it claims to be; skip unsupported formats try: - validate_file(tmp_path, filename) + validate_file(tmp_path, filename, is_pdf) except ValueError as exc: log.info("Skipping unsupported file %s: %s", filename, exc) await upsert_event(db, event_id, filename, mxc_url, "skipped", encryption_info) @@ -393,7 +392,7 @@ async def process_event( return # Upload and wait for Paperless to confirm it landed - task_id = await paperless.upload(filename, tmp_path, content_type_for(filename)) + task_id = await paperless.upload(filename, tmp_path, content_type_for(is_pdf)) log.info("Uploaded %s → waiting for Paperless task %s", filename, task_id) doc_id = await paperless.wait_for_task(task_id) @@ -464,9 +463,9 @@ async def catchup_history( for event in response.chunk: if is_supported_file(event): total += 1 - event_id, filename, mxc_url, enc_info = extract_event_fields(event) + event_id, filename, mxc_url, enc_info, is_pdf = extract_event_fields(event) await process_event( - event_id, filename, mxc_url, enc_info, + event_id, filename, mxc_url, enc_info, is_pdf, matrix_client, db, paperless, ) @@ -501,7 +500,8 @@ async def retry_loop( for event_id, filename, mxc_url, enc_info in rows: log.info("Retrying %s (%s)", filename, event_id) - await process_event(event_id, filename, mxc_url, enc_info, + is_pdf = (filename or "").lower().endswith(".pdf") + await process_event(event_id, filename, mxc_url, enc_info, is_pdf, matrix_client, db, paperless) @@ -557,8 +557,8 @@ async def main() -> None: # Process events from the initial sync timeline first for event in room_timeline.timeline.events: if is_supported_file(event): - event_id, filename, mxc_url, enc_info = extract_event_fields(event) - await process_event(event_id, filename, mxc_url, enc_info, + event_id, filename, mxc_url, enc_info, is_pdf = extract_event_fields(event) + await process_event(event_id, filename, mxc_url, enc_info, is_pdf, matrix_client, db, paperless) # Then paginate backwards for older history @@ -567,9 +567,9 @@ async def main() -> None: async def on_file(room, event): if room.room_id == MATRIX_ROOM_ID and is_supported_file(event): - event_id, filename, mxc_url, enc_info = extract_event_fields(event) + event_id, filename, mxc_url, enc_info, is_pdf = extract_event_fields(event) await process_event( - event_id, filename, mxc_url, enc_info, + event_id, filename, mxc_url, enc_info, is_pdf, matrix_client, db, paperless, ) From 9663232d84a636c93f32191cb2089d7a59dbf4b7 Mon Sep 17 00:00:00 2001 From: Jeena Date: Thu, 12 Mar 2026 00:08:26 +0000 Subject: [PATCH 5/5] ingest: Use timestamp-based filenames for WhatsApp files WhatsApp files arrive with empty or non-descriptive body fields. Rather than falling back to generic names like "image.jpg" or "document.pdf", generate names from the event timestamp: whatsapp_YYYY-MM-DD_HH-MM-SS.jpg whatsapp_YYYY-MM-DD_HH-MM-SS.pdf If the body contains text (e.g. a caption), it is prepended: Test - whatsapp_2026-03-11_23-35-13.pdf Files whose body already ends in the correct extension are used as-is. --- ingest.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/ingest.py b/ingest.py index 6659507..21b7a59 100644 --- a/ingest.py +++ b/ingest.py @@ -214,16 +214,32 @@ def is_supported_file(event) -> bool: return False +def _whatsapp_filename(ts_ms: int, is_pdf: bool, body: str) -> str: + """Generate a filename from the event timestamp, optionally prefixed with the body text.""" + from datetime import datetime, timezone + dt = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc) + stamp = dt.strftime("%Y-%m-%d_%H-%M-%S") + ext = ".pdf" if is_pdf else ".jpg" + base = f"whatsapp_{stamp}{ext}" + if body: + return f"{body} - {base}" + return base + + def extract_event_fields(event) -> tuple[str, str, str, Optional[str], bool]: """Returns (event_id, filename, mxc_url, encryption_info_json_or_None, is_pdf).""" if isinstance(event, BadEvent): content = event.source.get("content", {}) - filename = content.get("body", "unknown") + body = content.get("body", "") file_info = content.get("file", {}) is_pdf = content.get("msgtype") == "m.file" + ext = ".pdf" if is_pdf else ".jpg" + filename = body if body.lower().endswith(ext) else _whatsapp_filename(event.server_timestamp, is_pdf, body) return event.event_id, filename, file_info["url"], json.dumps(file_info), is_pdf is_pdf = isinstance(event, RoomMessageFile) - filename = event.body or ("document.pdf" if is_pdf else "image.jpg") + ext = ".pdf" if is_pdf else ".jpg" + body = event.body or "" + filename = body if body.lower().endswith(ext) else _whatsapp_filename(event.server_timestamp, is_pdf, body) return event.event_id, filename, event.url, None, is_pdf @@ -500,7 +516,7 @@ async def retry_loop( for event_id, filename, mxc_url, enc_info in rows: log.info("Retrying %s (%s)", filename, event_id) - is_pdf = (filename or "").lower().endswith(".pdf") + is_pdf = filename.lower().endswith(".pdf") await process_event(event_id, filename, mxc_url, enc_info, is_pdf, matrix_client, db, paperless)