Compare commits

...

5 commits

Author SHA1 Message Date
9663232d84 ingest: Use timestamp-based filenames for WhatsApp files
WhatsApp files arrive with empty or non-descriptive body fields. Rather
than falling back to generic names like "image.jpg" or "document.pdf",
generate names from the event timestamp:

  whatsapp_YYYY-MM-DD_HH-MM-SS.jpg
  whatsapp_YYYY-MM-DD_HH-MM-SS.pdf

If the body contains text (e.g. a caption), it is prepended:

  Test - whatsapp_2026-03-11_23-35-13.pdf

Files whose body already ends in the correct extension are used as-is.
2026-03-12 00:08:26 +00:00
fa4662b5f3 ingest: Determine PDF vs JPEG from event type, not filename
WhatsApp bridge files may have arbitrary body text (e.g. "Test") that
does not end in .pdf, causing the filename-based magic byte check to
apply JPEG validation to PDF files and reject them.

Pass is_pdf through extract_event_fields and process_event based on
the Matrix event type (RoomMessageFile → PDF, RoomMessageImage → JPEG,
BadEvent → inferred from msgtype), so validation and content-type are
always correct regardless of the filename.
2026-03-12 00:03:18 +00:00
f49ea1dbc5 ingest: Assign uploaded documents to a configurable Paperless owner
The post_document endpoint does not support setting ownership on upload,
so after a successful upload the document is PATCHed to set the owner.

Add optional PAPERLESS_OWNER_ID env var. When set, every newly uploaded
document is assigned to that Paperless user ID via PATCH /api/documents/{id}/.
2026-03-11 23:55:00 +00:00
0aa044eead ingest: Accept RoomMessageFile events regardless of body content
WhatsApp bridge files (e.g. PDFs) may arrive with an empty body field,
causing the previous .pdf extension check to silently skip them. Accept
all RoomMessageFile events and fall back to "document.pdf" as filename.
File content is still validated via magic bytes before upload.
2026-03-11 23:35:23 +00:00
eec2d076e4 ingest: Accept RoomMessageImage events regardless of body content
WhatsApp bridge images arrive as RoomMessageImage events with an empty
body field, so the previous .jpg/.jpeg extension check silently rejected
all of them. Accept all RoomMessageImage events and fall back to
"image.jpg" as filename when body is empty. File content is still
validated via magic bytes before upload.
2026-03-11 23:32:15 +00:00
2 changed files with 80 additions and 28 deletions

View file

@ -7,6 +7,8 @@ MATRIX_ROOM_ID=!roomid:jeena.net
PAPERLESS_URL=https://paperless.jeena.net
PAPERLESS_TOKEN=your_paperless_api_token
PAPERLESS_INBOX_TAG_ID=1
# Optional: assign uploaded documents to this Paperless user ID
# PAPERLESS_OWNER_ID=7
# Optional: path to the SQLite state database (default: state.db next to the script)
DB_PATH=state.db

106
ingest.py
View file

@ -43,8 +43,7 @@ class _SSLSMTPHandler(logging.handlers.SMTPHandler):
def emit(self, record: logging.LogRecord) -> None:
import smtplib
try:
host, port = self.mailhost
with smtplib.SMTP_SSL(host, port) as smtp:
with smtplib.SMTP_SSL(self.mailhost, self.mailport) as smtp:
smtp.login(self.username, self.password)
msg = self.format(record)
smtp.sendmail(self.fromaddr, self.toaddrs,
@ -82,6 +81,9 @@ MATRIX_ROOM_ID = os.environ["MATRIX_ROOM_ID"]
PAPERLESS_URL = os.environ["PAPERLESS_URL"].rstrip("/")
PAPERLESS_TOKEN = os.environ["PAPERLESS_TOKEN"]
PAPERLESS_INBOX_TAG_ID = int(os.environ["PAPERLESS_INBOX_TAG_ID"])
PAPERLESS_OWNER_ID: Optional[int] = (
int(os.environ["PAPERLESS_OWNER_ID"]) if os.environ.get("PAPERLESS_OWNER_ID") else None
)
DB_PATH = os.environ.get("DB_PATH", "state.db")
UPTIME_KUMA_PUSH_URL = os.environ.get("UPTIME_KUMA_PUSH_URL")
@ -204,36 +206,54 @@ def _bad_event_encrypted_file_info(event: BadEvent) -> Optional[dict]:
def is_supported_file(event) -> bool:
if isinstance(event, RoomMessageFile):
return (event.body or "").lower().endswith(".pdf")
return True # validate magic bytes later; body may be empty (e.g. WhatsApp bridge)
if isinstance(event, RoomMessageImage):
return (event.body or "").lower().endswith((".jpg", ".jpeg"))
return True # validate magic bytes later; body may be empty (e.g. WhatsApp bridge)
if isinstance(event, BadEvent):
return _bad_event_encrypted_file_info(event) is not None
return False
def extract_event_fields(event) -> tuple[str, str, str, Optional[str]]:
"""Returns (event_id, filename, mxc_url, encryption_info_json_or_None)."""
def _whatsapp_filename(ts_ms: int, is_pdf: bool, body: str) -> str:
    """Generate a filename from the event timestamp, optionally prefixed with the body text.

    Args:
        ts_ms: Event timestamp in milliseconds since the Unix epoch.
        is_pdf: Selects the ``.pdf`` extension when true, ``.jpg`` otherwise.
        body: Free-form message body (e.g. a caption); may be empty.

    Returns:
        ``whatsapp_YYYY-MM-DD_HH-MM-SS<ext>`` (timestamp rendered in UTC),
        prefixed with ``"<body> - "`` when the cleaned body is non-empty.
    """
    from datetime import datetime, timezone

    moment = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    stamp = moment.strftime("%Y-%m-%d_%H-%M-%S")
    ext = ".pdf" if is_pdf else ".jpg"
    base = f"whatsapp_{stamp}{ext}"

    # The body is free text: collapse newlines and runs of whitespace, and
    # replace path separators, so the result is always one flat filename and
    # a whitespace-only body does not produce a dangling " - " prefix.
    caption = " ".join((body or "").split())
    caption = caption.replace("/", "_").replace("\\", "_")
    if caption:
        return f"{caption} - {base}"
    return base
def extract_event_fields(event) -> tuple[str, str, str, Optional[str], bool]:
"""Returns (event_id, filename, mxc_url, encryption_info_json_or_None, is_pdf)."""
if isinstance(event, BadEvent):
content = event.source.get("content", {})
filename = content.get("body", "unknown")
body = content.get("body", "")
file_info = content.get("file", {})
return event.event_id, filename, file_info["url"], json.dumps(file_info)
return event.event_id, event.body, event.url, None
is_pdf = content.get("msgtype") == "m.file"
ext = ".pdf" if is_pdf else ".jpg"
filename = body if body.lower().endswith(ext) else _whatsapp_filename(event.server_timestamp, is_pdf, body)
return event.event_id, filename, file_info["url"], json.dumps(file_info), is_pdf
is_pdf = isinstance(event, RoomMessageFile)
ext = ".pdf" if is_pdf else ".jpg"
body = event.body or ""
filename = body if body.lower().endswith(ext) else _whatsapp_filename(event.server_timestamp, is_pdf, body)
return event.event_id, filename, event.url, None, is_pdf
def content_type_for(filename: str) -> str:
return "application/pdf" if filename.lower().endswith(".pdf") else "image/jpeg"
def content_type_for(is_pdf: bool) -> str:
    """Return the MIME type for an upload: PDF when *is_pdf* is truthy, JPEG otherwise."""
    if is_pdf:
        return "application/pdf"
    return "image/jpeg"
def validate_file(path: Path, filename: str) -> None:
"""Raise ValueError if the file doesn't look like the format its name claims."""
def validate_file(path: Path, filename: str, is_pdf: bool) -> None:
"""Raise ValueError if the file content doesn't match the expected format."""
data = path.read_bytes()[:8]
if filename.lower().endswith(".pdf"):
if is_pdf:
if not data.startswith(b"%PDF"):
raise ValueError(f"File does not start with %PDF magic bytes: {filename}")
else:
# JPEG: starts with FF D8 FF
if not data[:3] == b"\xff\xd8\xff":
raise ValueError(f"File does not start with JPEG magic bytes: {filename}")
@ -243,10 +263,11 @@ def validate_file(path: Path, filename: str) -> None:
# ---------------------------------------------------------------------------
class PaperlessClient:
def __init__(self, base_url: str, token: str, inbox_tag_id: int) -> None:
def __init__(self, base_url: str, token: str, inbox_tag_id: int, owner_id: Optional[int] = None) -> None:
self.base_url = base_url
self.headers = {"Authorization": f"Token {token}"}
self.inbox_tag_id = inbox_tag_id
self.owner_id = owner_id
async def find_by_checksum(self, checksum: str) -> Optional[int]:
"""Return the Paperless document ID if the checksum already exists."""
@ -315,6 +336,17 @@ class PaperlessClient:
raise RuntimeError(f"Paperless task {task_id} timed out after {PAPERLESS_TASK_TIMEOUT}s")
async def set_owner(self, doc_id: int, owner_id: int) -> None:
    """Assign *owner_id* as the owner of document *doc_id*.

    Sends a PATCH to ``<base_url>/api/documents/<doc_id>/`` with the stored
    auth headers and a JSON body of ``{"owner": owner_id}``. An error
    response surfaces through ``raise_for_status()``.
    """
    endpoint = f"{self.base_url}/api/documents/{doc_id}/"
    payload = {"owner": owner_id}
    async with httpx.AsyncClient() as http:
        response = await http.patch(
            endpoint,
            headers=self.headers,
            json=payload,
            timeout=30,
        )
        response.raise_for_status()
# ---------------------------------------------------------------------------
# Core processing
@ -325,6 +357,7 @@ async def process_event(
filename: str,
mxc_url: str,
encryption_info: Optional[str],
is_pdf: bool,
matrix_client: AsyncClient,
db: aiosqlite.Connection,
paperless: PaperlessClient,
@ -357,8 +390,13 @@ async def process_event(
else:
tmp.write(dl.body)
# Validate the file looks like what it claims to be
validate_file(tmp_path, filename)
# Validate the file looks like what it claims to be; skip unsupported formats
try:
validate_file(tmp_path, filename, is_pdf)
except ValueError as exc:
log.info("Skipping unsupported file %s: %s", filename, exc)
await upsert_event(db, event_id, filename, mxc_url, "skipped", encryption_info)
return
# Deduplicate against Paperless by content checksum
checksum = hashlib.md5(tmp_path.read_bytes()).hexdigest()
@ -370,7 +408,7 @@ async def process_event(
return
# Upload and wait for Paperless to confirm it landed
task_id = await paperless.upload(filename, tmp_path, content_type_for(filename))
task_id = await paperless.upload(filename, tmp_path, content_type_for(is_pdf))
log.info("Uploaded %s → waiting for Paperless task %s", filename, task_id)
doc_id = await paperless.wait_for_task(task_id)
@ -381,6 +419,9 @@ async def process_event(
encryption_info, checksum)
else:
log.info("Confirmed in Paperless as document id=%d: %s", doc_id, filename)
if paperless.owner_id is not None:
await paperless.set_owner(doc_id, paperless.owner_id)
log.info("Set owner of document id=%d to user id=%d", doc_id, paperless.owner_id)
await upsert_event(db, event_id, filename, mxc_url, "uploaded",
encryption_info, checksum, doc_id)
@ -417,6 +458,8 @@ async def catchup_history(
log.info("Starting historical catchup...")
token = start_token
total = 0
batches = 0
events_seen = 0
while True:
response = await matrix_client.room_messages(
@ -430,12 +473,15 @@ async def catchup_history(
log.error("room_messages error: %s", response)
break
batches += 1
events_seen += len(response.chunk)
for event in response.chunk:
if is_supported_file(event):
total += 1
event_id, filename, mxc_url, enc_info = extract_event_fields(event)
event_id, filename, mxc_url, enc_info, is_pdf = extract_event_fields(event)
await process_event(
event_id, filename, mxc_url, enc_info,
event_id, filename, mxc_url, enc_info, is_pdf,
matrix_client, db, paperless,
)
@ -443,7 +489,10 @@ async def catchup_history(
break
token = response.end
log.info("Historical catchup complete — processed %d file event(s).", total)
log.info(
"Historical catchup complete — processed %d file event(s) across %d batches (%d total events).",
total, batches, events_seen,
)
# ---------------------------------------------------------------------------
@ -467,7 +516,8 @@ async def retry_loop(
for event_id, filename, mxc_url, enc_info in rows:
log.info("Retrying %s (%s)", filename, event_id)
await process_event(event_id, filename, mxc_url, enc_info,
is_pdf = filename.lower().endswith(".pdf")
await process_event(event_id, filename, mxc_url, enc_info, is_pdf,
matrix_client, db, paperless)
@ -493,7 +543,7 @@ async def heartbeat_loop() -> None:
# ---------------------------------------------------------------------------
async def main() -> None:
paperless = PaperlessClient(PAPERLESS_URL, PAPERLESS_TOKEN, PAPERLESS_INBOX_TAG_ID)
paperless = PaperlessClient(PAPERLESS_URL, PAPERLESS_TOKEN, PAPERLESS_INBOX_TAG_ID, PAPERLESS_OWNER_ID)
async with aiosqlite.connect(DB_PATH) as db:
await init_db(db)
@ -523,8 +573,8 @@ async def main() -> None:
# Process events from the initial sync timeline first
for event in room_timeline.timeline.events:
if is_supported_file(event):
event_id, filename, mxc_url, enc_info = extract_event_fields(event)
await process_event(event_id, filename, mxc_url, enc_info,
event_id, filename, mxc_url, enc_info, is_pdf = extract_event_fields(event)
await process_event(event_id, filename, mxc_url, enc_info, is_pdf,
matrix_client, db, paperless)
# Then paginate backwards for older history
@ -533,9 +583,9 @@ async def main() -> None:
async def on_file(room, event):
if room.room_id == MATRIX_ROOM_ID and is_supported_file(event):
event_id, filename, mxc_url, enc_info = extract_event_fields(event)
event_id, filename, mxc_url, enc_info, is_pdf = extract_event_fields(event)
await process_event(
event_id, filename, mxc_url, enc_info,
event_id, filename, mxc_url, enc_info, is_pdf,
matrix_client, db, paperless,
)