Compare commits

..

No commits in common. "9663232d84a636c93f32191cb2089d7a59dbf4b7" and "025228b83cea2921f375126ef1c53bee573965de" have entirely different histories.

2 changed files with 28 additions and 80 deletions

View file

@ -7,8 +7,6 @@ MATRIX_ROOM_ID=!roomid:jeena.net
PAPERLESS_URL=https://paperless.jeena.net PAPERLESS_URL=https://paperless.jeena.net
PAPERLESS_TOKEN=your_paperless_api_token PAPERLESS_TOKEN=your_paperless_api_token
PAPERLESS_INBOX_TAG_ID=1 PAPERLESS_INBOX_TAG_ID=1
# Optional: assign uploaded documents to this Paperless user ID
# PAPERLESS_OWNER_ID=7
# Optional: path to the SQLite state database (default: state.db next to the script) # Optional: path to the SQLite state database (default: state.db next to the script)
DB_PATH=state.db DB_PATH=state.db

106
ingest.py
View file

@ -43,7 +43,8 @@ class _SSLSMTPHandler(logging.handlers.SMTPHandler):
def emit(self, record: logging.LogRecord) -> None: def emit(self, record: logging.LogRecord) -> None:
import smtplib import smtplib
try: try:
with smtplib.SMTP_SSL(self.mailhost, self.mailport) as smtp: host, port = self.mailhost
with smtplib.SMTP_SSL(host, port) as smtp:
smtp.login(self.username, self.password) smtp.login(self.username, self.password)
msg = self.format(record) msg = self.format(record)
smtp.sendmail(self.fromaddr, self.toaddrs, smtp.sendmail(self.fromaddr, self.toaddrs,
@ -81,9 +82,6 @@ MATRIX_ROOM_ID = os.environ["MATRIX_ROOM_ID"]
PAPERLESS_URL = os.environ["PAPERLESS_URL"].rstrip("/") PAPERLESS_URL = os.environ["PAPERLESS_URL"].rstrip("/")
PAPERLESS_TOKEN = os.environ["PAPERLESS_TOKEN"] PAPERLESS_TOKEN = os.environ["PAPERLESS_TOKEN"]
PAPERLESS_INBOX_TAG_ID = int(os.environ["PAPERLESS_INBOX_TAG_ID"]) PAPERLESS_INBOX_TAG_ID = int(os.environ["PAPERLESS_INBOX_TAG_ID"])
PAPERLESS_OWNER_ID: Optional[int] = (
int(os.environ["PAPERLESS_OWNER_ID"]) if os.environ.get("PAPERLESS_OWNER_ID") else None
)
DB_PATH = os.environ.get("DB_PATH", "state.db") DB_PATH = os.environ.get("DB_PATH", "state.db")
UPTIME_KUMA_PUSH_URL = os.environ.get("UPTIME_KUMA_PUSH_URL") UPTIME_KUMA_PUSH_URL = os.environ.get("UPTIME_KUMA_PUSH_URL")
@ -206,54 +204,36 @@ def _bad_event_encrypted_file_info(event: BadEvent) -> Optional[dict]:
def is_supported_file(event) -> bool: def is_supported_file(event) -> bool:
if isinstance(event, RoomMessageFile): if isinstance(event, RoomMessageFile):
return True # validate magic bytes later; body may be empty (e.g. WhatsApp bridge) return (event.body or "").lower().endswith(".pdf")
if isinstance(event, RoomMessageImage): if isinstance(event, RoomMessageImage):
return True # validate magic bytes later; body may be empty (e.g. WhatsApp bridge) return (event.body or "").lower().endswith((".jpg", ".jpeg"))
if isinstance(event, BadEvent): if isinstance(event, BadEvent):
return _bad_event_encrypted_file_info(event) is not None return _bad_event_encrypted_file_info(event) is not None
return False return False
def _whatsapp_filename(ts_ms: int, is_pdf: bool, body: str) -> str: def extract_event_fields(event) -> tuple[str, str, str, Optional[str]]:
"""Generate a filename from the event timestamp, optionally prefixed with the body text.""" """Returns (event_id, filename, mxc_url, encryption_info_json_or_None)."""
from datetime import datetime, timezone
dt = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
stamp = dt.strftime("%Y-%m-%d_%H-%M-%S")
ext = ".pdf" if is_pdf else ".jpg"
base = f"whatsapp_{stamp}{ext}"
if body:
return f"{body} - {base}"
return base
def extract_event_fields(event) -> tuple[str, str, str, Optional[str], bool]:
"""Returns (event_id, filename, mxc_url, encryption_info_json_or_None, is_pdf)."""
if isinstance(event, BadEvent): if isinstance(event, BadEvent):
content = event.source.get("content", {}) content = event.source.get("content", {})
body = content.get("body", "") filename = content.get("body", "unknown")
file_info = content.get("file", {}) file_info = content.get("file", {})
is_pdf = content.get("msgtype") == "m.file" return event.event_id, filename, file_info["url"], json.dumps(file_info)
ext = ".pdf" if is_pdf else ".jpg" return event.event_id, event.body, event.url, None
filename = body if body.lower().endswith(ext) else _whatsapp_filename(event.server_timestamp, is_pdf, body)
return event.event_id, filename, file_info["url"], json.dumps(file_info), is_pdf
is_pdf = isinstance(event, RoomMessageFile)
ext = ".pdf" if is_pdf else ".jpg"
body = event.body or ""
filename = body if body.lower().endswith(ext) else _whatsapp_filename(event.server_timestamp, is_pdf, body)
return event.event_id, filename, event.url, None, is_pdf
def content_type_for(is_pdf: bool) -> str: def content_type_for(filename: str) -> str:
return "application/pdf" if is_pdf else "image/jpeg" return "application/pdf" if filename.lower().endswith(".pdf") else "image/jpeg"
def validate_file(path: Path, filename: str, is_pdf: bool) -> None: def validate_file(path: Path, filename: str) -> None:
"""Raise ValueError if the file content doesn't match the expected format.""" """Raise ValueError if the file doesn't look like the format its name claims."""
data = path.read_bytes()[:8] data = path.read_bytes()[:8]
if is_pdf: if filename.lower().endswith(".pdf"):
if not data.startswith(b"%PDF"): if not data.startswith(b"%PDF"):
raise ValueError(f"File does not start with %PDF magic bytes: {filename}") raise ValueError(f"File does not start with %PDF magic bytes: {filename}")
else: else:
# JPEG: starts with FF D8 FF
if not data[:3] == b"\xff\xd8\xff": if not data[:3] == b"\xff\xd8\xff":
raise ValueError(f"File does not start with JPEG magic bytes: {filename}") raise ValueError(f"File does not start with JPEG magic bytes: {filename}")
@ -263,11 +243,10 @@ def validate_file(path: Path, filename: str, is_pdf: bool) -> None:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
class PaperlessClient: class PaperlessClient:
def __init__(self, base_url: str, token: str, inbox_tag_id: int, owner_id: Optional[int] = None) -> None: def __init__(self, base_url: str, token: str, inbox_tag_id: int) -> None:
self.base_url = base_url self.base_url = base_url
self.headers = {"Authorization": f"Token {token}"} self.headers = {"Authorization": f"Token {token}"}
self.inbox_tag_id = inbox_tag_id self.inbox_tag_id = inbox_tag_id
self.owner_id = owner_id
async def find_by_checksum(self, checksum: str) -> Optional[int]: async def find_by_checksum(self, checksum: str) -> Optional[int]:
"""Return the Paperless document ID if the checksum already exists.""" """Return the Paperless document ID if the checksum already exists."""
@ -336,17 +315,6 @@ class PaperlessClient:
raise RuntimeError(f"Paperless task {task_id} timed out after {PAPERLESS_TASK_TIMEOUT}s") raise RuntimeError(f"Paperless task {task_id} timed out after {PAPERLESS_TASK_TIMEOUT}s")
async def set_owner(self, doc_id: int, owner_id: int) -> None:
"""Set the owner of a document."""
async with httpx.AsyncClient() as client:
r = await client.patch(
f"{self.base_url}/api/documents/{doc_id}/",
headers=self.headers,
json={"owner": owner_id},
timeout=30,
)
r.raise_for_status()
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Core processing # Core processing
@ -357,7 +325,6 @@ async def process_event(
filename: str, filename: str,
mxc_url: str, mxc_url: str,
encryption_info: Optional[str], encryption_info: Optional[str],
is_pdf: bool,
matrix_client: AsyncClient, matrix_client: AsyncClient,
db: aiosqlite.Connection, db: aiosqlite.Connection,
paperless: PaperlessClient, paperless: PaperlessClient,
@ -390,13 +357,8 @@ async def process_event(
else: else:
tmp.write(dl.body) tmp.write(dl.body)
# Validate the file looks like what it claims to be; skip unsupported formats # Validate the file looks like what it claims to be
try: validate_file(tmp_path, filename)
validate_file(tmp_path, filename, is_pdf)
except ValueError as exc:
log.info("Skipping unsupported file %s: %s", filename, exc)
await upsert_event(db, event_id, filename, mxc_url, "skipped", encryption_info)
return
# Deduplicate against Paperless by content checksum # Deduplicate against Paperless by content checksum
checksum = hashlib.md5(tmp_path.read_bytes()).hexdigest() checksum = hashlib.md5(tmp_path.read_bytes()).hexdigest()
@ -408,7 +370,7 @@ async def process_event(
return return
# Upload and wait for Paperless to confirm it landed # Upload and wait for Paperless to confirm it landed
task_id = await paperless.upload(filename, tmp_path, content_type_for(is_pdf)) task_id = await paperless.upload(filename, tmp_path, content_type_for(filename))
log.info("Uploaded %s → waiting for Paperless task %s", filename, task_id) log.info("Uploaded %s → waiting for Paperless task %s", filename, task_id)
doc_id = await paperless.wait_for_task(task_id) doc_id = await paperless.wait_for_task(task_id)
@ -419,9 +381,6 @@ async def process_event(
encryption_info, checksum) encryption_info, checksum)
else: else:
log.info("Confirmed in Paperless as document id=%d: %s", doc_id, filename) log.info("Confirmed in Paperless as document id=%d: %s", doc_id, filename)
if paperless.owner_id is not None:
await paperless.set_owner(doc_id, paperless.owner_id)
log.info("Set owner of document id=%d to user id=%d", doc_id, paperless.owner_id)
await upsert_event(db, event_id, filename, mxc_url, "uploaded", await upsert_event(db, event_id, filename, mxc_url, "uploaded",
encryption_info, checksum, doc_id) encryption_info, checksum, doc_id)
@ -458,8 +417,6 @@ async def catchup_history(
log.info("Starting historical catchup...") log.info("Starting historical catchup...")
token = start_token token = start_token
total = 0 total = 0
batches = 0
events_seen = 0
while True: while True:
response = await matrix_client.room_messages( response = await matrix_client.room_messages(
@ -473,15 +430,12 @@ async def catchup_history(
log.error("room_messages error: %s", response) log.error("room_messages error: %s", response)
break break
batches += 1
events_seen += len(response.chunk)
for event in response.chunk: for event in response.chunk:
if is_supported_file(event): if is_supported_file(event):
total += 1 total += 1
event_id, filename, mxc_url, enc_info, is_pdf = extract_event_fields(event) event_id, filename, mxc_url, enc_info = extract_event_fields(event)
await process_event( await process_event(
event_id, filename, mxc_url, enc_info, is_pdf, event_id, filename, mxc_url, enc_info,
matrix_client, db, paperless, matrix_client, db, paperless,
) )
@ -489,10 +443,7 @@ async def catchup_history(
break break
token = response.end token = response.end
log.info( log.info("Historical catchup complete — processed %d file event(s).", total)
"Historical catchup complete — processed %d file event(s) across %d batches (%d total events).",
total, batches, events_seen,
)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -516,8 +467,7 @@ async def retry_loop(
for event_id, filename, mxc_url, enc_info in rows: for event_id, filename, mxc_url, enc_info in rows:
log.info("Retrying %s (%s)", filename, event_id) log.info("Retrying %s (%s)", filename, event_id)
is_pdf = filename.lower().endswith(".pdf") await process_event(event_id, filename, mxc_url, enc_info,
await process_event(event_id, filename, mxc_url, enc_info, is_pdf,
matrix_client, db, paperless) matrix_client, db, paperless)
@ -543,7 +493,7 @@ async def heartbeat_loop() -> None:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def main() -> None: async def main() -> None:
paperless = PaperlessClient(PAPERLESS_URL, PAPERLESS_TOKEN, PAPERLESS_INBOX_TAG_ID, PAPERLESS_OWNER_ID) paperless = PaperlessClient(PAPERLESS_URL, PAPERLESS_TOKEN, PAPERLESS_INBOX_TAG_ID)
async with aiosqlite.connect(DB_PATH) as db: async with aiosqlite.connect(DB_PATH) as db:
await init_db(db) await init_db(db)
@ -573,8 +523,8 @@ async def main() -> None:
# Process events from the initial sync timeline first # Process events from the initial sync timeline first
for event in room_timeline.timeline.events: for event in room_timeline.timeline.events:
if is_supported_file(event): if is_supported_file(event):
event_id, filename, mxc_url, enc_info, is_pdf = extract_event_fields(event) event_id, filename, mxc_url, enc_info = extract_event_fields(event)
await process_event(event_id, filename, mxc_url, enc_info, is_pdf, await process_event(event_id, filename, mxc_url, enc_info,
matrix_client, db, paperless) matrix_client, db, paperless)
# Then paginate backwards for older history # Then paginate backwards for older history
@ -583,9 +533,9 @@ async def main() -> None:
async def on_file(room, event): async def on_file(room, event):
if room.room_id == MATRIX_ROOM_ID and is_supported_file(event): if room.room_id == MATRIX_ROOM_ID and is_supported_file(event):
event_id, filename, mxc_url, enc_info, is_pdf = extract_event_fields(event) event_id, filename, mxc_url, enc_info = extract_event_fields(event)
await process_event( await process_event(
event_id, filename, mxc_url, enc_info, is_pdf, event_id, filename, mxc_url, enc_info,
matrix_client, db, paperless, matrix_client, db, paperless,
) )