diff --git a/.env.example b/.env.example index 6e0da38..14a8157 100644 --- a/.env.example +++ b/.env.example @@ -7,8 +7,6 @@ MATRIX_ROOM_ID=!roomid:jeena.net PAPERLESS_URL=https://paperless.jeena.net PAPERLESS_TOKEN=your_paperless_api_token PAPERLESS_INBOX_TAG_ID=1 -# Optional: assign uploaded documents to this Paperless user ID -# PAPERLESS_OWNER_ID=7 # Optional: path to the SQLite state database (default: state.db next to the script) DB_PATH=state.db diff --git a/ingest.py b/ingest.py index 21b7a59..29585ab 100644 --- a/ingest.py +++ b/ingest.py @@ -43,7 +43,8 @@ class _SSLSMTPHandler(logging.handlers.SMTPHandler): def emit(self, record: logging.LogRecord) -> None: import smtplib try: - with smtplib.SMTP_SSL(self.mailhost, self.mailport) as smtp: + host, port = self.mailhost + with smtplib.SMTP_SSL(host, port) as smtp: smtp.login(self.username, self.password) msg = self.format(record) smtp.sendmail(self.fromaddr, self.toaddrs, @@ -81,9 +82,6 @@ MATRIX_ROOM_ID = os.environ["MATRIX_ROOM_ID"] PAPERLESS_URL = os.environ["PAPERLESS_URL"].rstrip("/") PAPERLESS_TOKEN = os.environ["PAPERLESS_TOKEN"] PAPERLESS_INBOX_TAG_ID = int(os.environ["PAPERLESS_INBOX_TAG_ID"]) -PAPERLESS_OWNER_ID: Optional[int] = ( - int(os.environ["PAPERLESS_OWNER_ID"]) if os.environ.get("PAPERLESS_OWNER_ID") else None -) DB_PATH = os.environ.get("DB_PATH", "state.db") UPTIME_KUMA_PUSH_URL = os.environ.get("UPTIME_KUMA_PUSH_URL") @@ -206,54 +204,36 @@ def _bad_event_encrypted_file_info(event: BadEvent) -> Optional[dict]: def is_supported_file(event) -> bool: if isinstance(event, RoomMessageFile): - return True # validate magic bytes later; body may be empty (e.g. WhatsApp bridge) + return (event.body or "").lower().endswith(".pdf") if isinstance(event, RoomMessageImage): - return True # validate magic bytes later; body may be empty (e.g. WhatsApp bridge) + return (event.body or "").lower().endswith((".jpg", ".jpeg")) if isinstance(event, BadEvent): return _bad_event_encrypted_file_info(event) is not None return False -def _whatsapp_filename(ts_ms: int, is_pdf: bool, body: str) -> str: - """Generate a filename from the event timestamp, optionally prefixed with the body text.""" - from datetime import datetime, timezone - dt = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc) - stamp = dt.strftime("%Y-%m-%d_%H-%M-%S") - ext = ".pdf" if is_pdf else ".jpg" - base = f"whatsapp_{stamp}{ext}" - if body: - return f"{body} - {base}" - return base - - -def extract_event_fields(event) -> tuple[str, str, str, Optional[str], bool]: - """Returns (event_id, filename, mxc_url, encryption_info_json_or_None, is_pdf).""" +def extract_event_fields(event) -> tuple[str, str, str, Optional[str]]: + """Returns (event_id, filename, mxc_url, encryption_info_json_or_None).""" if isinstance(event, BadEvent): content = event.source.get("content", {}) - body = content.get("body", "") + filename = content.get("body", "unknown") file_info = content.get("file", {}) - is_pdf = content.get("msgtype") == "m.file" - ext = ".pdf" if is_pdf else ".jpg" - filename = body if body.lower().endswith(ext) else _whatsapp_filename(event.server_timestamp, is_pdf, body) - return event.event_id, filename, file_info["url"], json.dumps(file_info), is_pdf - is_pdf = isinstance(event, RoomMessageFile) - ext = ".pdf" if is_pdf else ".jpg" - body = event.body or "" - filename = body if body.lower().endswith(ext) else _whatsapp_filename(event.server_timestamp, is_pdf, body) - return event.event_id, filename, event.url, None, is_pdf + return event.event_id, filename, file_info["url"], json.dumps(file_info) + return event.event_id, event.body, event.url, None -def content_type_for(is_pdf: bool) -> str: - return "application/pdf" if is_pdf else "image/jpeg" +def content_type_for(filename: str) -> str: + return "application/pdf" if filename.lower().endswith(".pdf") else "image/jpeg" -def validate_file(path: Path, filename: str, is_pdf: bool) -> None: - """Raise ValueError if the file content doesn't match the expected format.""" +def validate_file(path: Path, filename: str) -> None: + """Raise ValueError if the file doesn't look like the format its name claims.""" data = path.read_bytes()[:8] - if is_pdf: + if filename.lower().endswith(".pdf"): if not data.startswith(b"%PDF"): raise ValueError(f"File does not start with %PDF magic bytes: {filename}") else: + # JPEG: starts with FF D8 FF if not data[:3] == b"\xff\xd8\xff": raise ValueError(f"File does not start with JPEG magic bytes: {filename}") @@ -263,11 +243,10 @@ def validate_file(path: Path, filename: str, is_pdf: bool) -> None: # --------------------------------------------------------------------------- class PaperlessClient: - def __init__(self, base_url: str, token: str, inbox_tag_id: int, owner_id: Optional[int] = None) -> None: + def __init__(self, base_url: str, token: str, inbox_tag_id: int) -> None: self.base_url = base_url self.headers = {"Authorization": f"Token {token}"} self.inbox_tag_id = inbox_tag_id - self.owner_id = owner_id async def find_by_checksum(self, checksum: str) -> Optional[int]: """Return the Paperless document ID if the checksum already exists.""" @@ -336,17 +315,6 @@ class PaperlessClient: raise RuntimeError(f"Paperless task {task_id} timed out after {PAPERLESS_TASK_TIMEOUT}s") - async def set_owner(self, doc_id: int, owner_id: int) -> None: - """Set the owner of a document.""" - async with httpx.AsyncClient() as client: - r = await client.patch( - f"{self.base_url}/api/documents/{doc_id}/", - headers=self.headers, - json={"owner": owner_id}, - timeout=30, - ) - r.raise_for_status() - # --------------------------------------------------------------------------- # Core processing @@ -357,7 +325,6 @@ async def process_event( filename: str, mxc_url: str, encryption_info: Optional[str], - is_pdf: bool, matrix_client: AsyncClient, db: aiosqlite.Connection, paperless: PaperlessClient, @@ -390,13 +357,8 @@ async def process_event( else: tmp.write(dl.body) - # Validate the file looks like what it claims to be; skip unsupported formats - try: - validate_file(tmp_path, filename, is_pdf) - except ValueError as exc: - log.info("Skipping unsupported file %s: %s", filename, exc) - await upsert_event(db, event_id, filename, mxc_url, "skipped", encryption_info) - return + # Validate the file looks like what it claims to be + validate_file(tmp_path, filename) # Deduplicate against Paperless by content checksum checksum = hashlib.md5(tmp_path.read_bytes()).hexdigest() @@ -408,7 +370,7 @@ async def process_event( return # Upload and wait for Paperless to confirm it landed - task_id = await paperless.upload(filename, tmp_path, content_type_for(is_pdf)) + task_id = await paperless.upload(filename, tmp_path, content_type_for(filename)) log.info("Uploaded %s → waiting for Paperless task %s", filename, task_id) doc_id = await paperless.wait_for_task(task_id) @@ -419,9 +381,6 @@ async def process_event( encryption_info, checksum) else: log.info("Confirmed in Paperless as document id=%d: %s", doc_id, filename) - if paperless.owner_id is not None: - await paperless.set_owner(doc_id, paperless.owner_id) - log.info("Set owner of document id=%d to user id=%d", doc_id, paperless.owner_id) await upsert_event(db, event_id, filename, mxc_url, "uploaded", encryption_info, checksum, doc_id) @@ -458,8 +417,6 @@ async def catchup_history( log.info("Starting historical catchup...") token = start_token total = 0 - batches = 0 - events_seen = 0 while True: response = await matrix_client.room_messages( @@ -473,15 +430,12 @@ async def catchup_history( log.error("room_messages error: %s", response) break - batches += 1 - events_seen += len(response.chunk) - for event in response.chunk: if is_supported_file(event): total += 1 - event_id, filename, mxc_url, enc_info, is_pdf = extract_event_fields(event) + event_id, filename, mxc_url, enc_info = extract_event_fields(event) await process_event( - event_id, filename, mxc_url, enc_info, is_pdf, + event_id, filename, mxc_url, enc_info, matrix_client, db, paperless, ) @@ -489,10 +443,7 @@ async def catchup_history( break token = response.end - log.info( - "Historical catchup complete — processed %d file event(s) across %d batches (%d total events).", - total, batches, events_seen, - ) + log.info("Historical catchup complete — processed %d file event(s).", total) # --------------------------------------------------------------------------- @@ -516,8 +467,7 @@ async def retry_loop( for event_id, filename, mxc_url, enc_info in rows: log.info("Retrying %s (%s)", filename, event_id) - is_pdf = filename.lower().endswith(".pdf") - await process_event(event_id, filename, mxc_url, enc_info, is_pdf, + await process_event(event_id, filename, mxc_url, enc_info, matrix_client, db, paperless) @@ -543,7 +493,7 @@ async def heartbeat_loop() -> None: # --------------------------------------------------------------------------- async def main() -> None: - paperless = PaperlessClient(PAPERLESS_URL, PAPERLESS_TOKEN, PAPERLESS_INBOX_TAG_ID, PAPERLESS_OWNER_ID) + paperless = PaperlessClient(PAPERLESS_URL, PAPERLESS_TOKEN, PAPERLESS_INBOX_TAG_ID) async with aiosqlite.connect(DB_PATH) as db: await init_db(db) @@ -573,8 +523,8 @@ async def main() -> None: # Process events from the initial sync timeline first for event in room_timeline.timeline.events: if is_supported_file(event): - event_id, filename, mxc_url, enc_info, is_pdf = extract_event_fields(event) - await process_event(event_id, filename, mxc_url, enc_info, is_pdf, + event_id, filename, mxc_url, enc_info = extract_event_fields(event) + await process_event(event_id, filename, mxc_url, enc_info, matrix_client, db, paperless) # Then paginate backwards for older history @@ -583,9 +533,9 @@ async def main() -> None: async def on_file(room, event): if room.room_id == MATRIX_ROOM_ID and is_supported_file(event): - event_id, filename, mxc_url, enc_info, is_pdf = extract_event_fields(event) + event_id, filename, mxc_url, enc_info = extract_event_fields(event) await process_event( - event_id, filename, mxc_url, enc_info, is_pdf, + event_id, filename, mxc_url, enc_info, matrix_client, db, paperless, )