ingest: Determine PDF vs JPEG from event type, not filename
WhatsApp bridge files may have arbitrary body text (e.g. "Test") that does not end in .pdf, causing the filename-based magic-byte check to apply JPEG validation to PDF files and reject them. Pass is_pdf through extract_event_fields and process_event based on the Matrix event type (RoomMessageFile → PDF, RoomMessageImage → JPEG, BadEvent → inferred from msgtype), so validation and content type are correct regardless of the filename for newly received events. Note that retry_loop still derives is_pdf from the stored filename's .pdf suffix, so retried events keep the filename-based behaviour.
This commit is contained in:
parent
f49ea1dbc5
commit
fa4662b5f3
1 changed file with 23 additions and 23 deletions
46
ingest.py
46
ingest.py
|
|
@ -214,32 +214,30 @@ def is_supported_file(event) -> bool:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def extract_event_fields(event) -> tuple[str, str, str, Optional[str], bool]:
    """Return (event_id, filename, mxc_url, encryption_info_json_or_None, is_pdf).

    ``is_pdf`` is decided from the event type, not from the filename, because
    the message body used as the filename may be arbitrary text (e.g. "Test")
    with no ``.pdf`` suffix.

    For a ``BadEvent`` the fields are read from the raw ``event.source``
    payload: the ``file`` dict supplies the URL and is serialised whole as the
    encryption info, and ``is_pdf`` is true only when ``msgtype`` is
    ``"m.file"``. For any other event, ``is_pdf`` is true only for
    ``RoomMessageFile`` and no encryption info is returned.

    Raises:
        KeyError: If a ``BadEvent`` has no ``url`` in its ``file`` dict.
    """
    if isinstance(event, BadEvent):
        content = event.source.get("content", {})
        # `or` (not a .get default) so an empty or null body also falls back,
        # matching how the typed branch below treats event.body.
        filename = content.get("body") or "unknown"
        file_info = content.get("file", {})
        is_pdf = content.get("msgtype") == "m.file"
        return event.event_id, filename, file_info["url"], json.dumps(file_info), is_pdf

    is_pdf = isinstance(event, RoomMessageFile)
    filename = event.body or ("document.pdf" if is_pdf else "image.jpg")
    return event.event_id, filename, event.url, None, is_pdf
|
|
||||||
|
|
||||||
|
|
||||||
def content_type_for(is_pdf: bool) -> str:
    """Return the upload MIME type: PDF when *is_pdf* is true, otherwise JPEG."""
    return "application/pdf" if is_pdf else "image/jpeg"
|
||||||
|
|
||||||
|
|
||||||
def validate_file(path: Path, filename: str, is_pdf: bool) -> None:
    """Raise ValueError if the file content doesn't match the expected format.

    Args:
        path: File to inspect.
        filename: Name used only in the error message.
        is_pdf: True to require the ``%PDF`` signature, False to require the
            JPEG signature.

    Raises:
        ValueError: If the leading bytes do not match the expected signature.
    """
    # Only the signature is needed, so read a short header instead of
    # loading the entire file into memory.
    with path.open("rb") as fh:
        header = fh.read(8)

    if is_pdf:
        if not header.startswith(b"%PDF"):
            raise ValueError(f"File does not start with %PDF magic bytes: {filename}")
    # JPEG: starts with FF D8 FF
    elif not header.startswith(b"\xff\xd8\xff"):
        raise ValueError(f"File does not start with JPEG magic bytes: {filename}")
|
||||||
|
|
||||||
|
|
@ -343,6 +341,7 @@ async def process_event(
|
||||||
filename: str,
|
filename: str,
|
||||||
mxc_url: str,
|
mxc_url: str,
|
||||||
encryption_info: Optional[str],
|
encryption_info: Optional[str],
|
||||||
|
is_pdf: bool,
|
||||||
matrix_client: AsyncClient,
|
matrix_client: AsyncClient,
|
||||||
db: aiosqlite.Connection,
|
db: aiosqlite.Connection,
|
||||||
paperless: PaperlessClient,
|
paperless: PaperlessClient,
|
||||||
|
|
@ -377,7 +376,7 @@ async def process_event(
|
||||||
|
|
||||||
# Validate the file looks like what it claims to be; skip unsupported formats
|
# Validate the file looks like what it claims to be; skip unsupported formats
|
||||||
try:
|
try:
|
||||||
validate_file(tmp_path, filename)
|
validate_file(tmp_path, filename, is_pdf)
|
||||||
except ValueError as exc:
|
except ValueError as exc:
|
||||||
log.info("Skipping unsupported file %s: %s", filename, exc)
|
log.info("Skipping unsupported file %s: %s", filename, exc)
|
||||||
await upsert_event(db, event_id, filename, mxc_url, "skipped", encryption_info)
|
await upsert_event(db, event_id, filename, mxc_url, "skipped", encryption_info)
|
||||||
|
|
@ -393,7 +392,7 @@ async def process_event(
|
||||||
return
|
return
|
||||||
|
|
||||||
# Upload and wait for Paperless to confirm it landed
|
# Upload and wait for Paperless to confirm it landed
|
||||||
task_id = await paperless.upload(filename, tmp_path, content_type_for(filename))
|
task_id = await paperless.upload(filename, tmp_path, content_type_for(is_pdf))
|
||||||
log.info("Uploaded %s → waiting for Paperless task %s", filename, task_id)
|
log.info("Uploaded %s → waiting for Paperless task %s", filename, task_id)
|
||||||
doc_id = await paperless.wait_for_task(task_id)
|
doc_id = await paperless.wait_for_task(task_id)
|
||||||
|
|
||||||
|
|
@ -464,9 +463,9 @@ async def catchup_history(
|
||||||
for event in response.chunk:
|
for event in response.chunk:
|
||||||
if is_supported_file(event):
|
if is_supported_file(event):
|
||||||
total += 1
|
total += 1
|
||||||
event_id, filename, mxc_url, enc_info = extract_event_fields(event)
|
event_id, filename, mxc_url, enc_info, is_pdf = extract_event_fields(event)
|
||||||
await process_event(
|
await process_event(
|
||||||
event_id, filename, mxc_url, enc_info,
|
event_id, filename, mxc_url, enc_info, is_pdf,
|
||||||
matrix_client, db, paperless,
|
matrix_client, db, paperless,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -501,7 +500,8 @@ async def retry_loop(
|
||||||
|
|
||||||
for event_id, filename, mxc_url, enc_info in rows:
|
for event_id, filename, mxc_url, enc_info in rows:
|
||||||
log.info("Retrying %s (%s)", filename, event_id)
|
log.info("Retrying %s (%s)", filename, event_id)
|
||||||
await process_event(event_id, filename, mxc_url, enc_info,
|
is_pdf = (filename or "").lower().endswith(".pdf")
|
||||||
|
await process_event(event_id, filename, mxc_url, enc_info, is_pdf,
|
||||||
matrix_client, db, paperless)
|
matrix_client, db, paperless)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -557,8 +557,8 @@ async def main() -> None:
|
||||||
# Process events from the initial sync timeline first
|
# Process events from the initial sync timeline first
|
||||||
for event in room_timeline.timeline.events:
|
for event in room_timeline.timeline.events:
|
||||||
if is_supported_file(event):
|
if is_supported_file(event):
|
||||||
event_id, filename, mxc_url, enc_info = extract_event_fields(event)
|
event_id, filename, mxc_url, enc_info, is_pdf = extract_event_fields(event)
|
||||||
await process_event(event_id, filename, mxc_url, enc_info,
|
await process_event(event_id, filename, mxc_url, enc_info, is_pdf,
|
||||||
matrix_client, db, paperless)
|
matrix_client, db, paperless)
|
||||||
|
|
||||||
# Then paginate backwards for older history
|
# Then paginate backwards for older history
|
||||||
|
|
@ -567,9 +567,9 @@ async def main() -> None:
|
||||||
|
|
||||||
async def on_file(room, event):
|
async def on_file(room, event):
|
||||||
if room.room_id == MATRIX_ROOM_ID and is_supported_file(event):
|
if room.room_id == MATRIX_ROOM_ID and is_supported_file(event):
|
||||||
event_id, filename, mxc_url, enc_info = extract_event_fields(event)
|
event_id, filename, mxc_url, enc_info, is_pdf = extract_event_fields(event)
|
||||||
await process_event(
|
await process_event(
|
||||||
event_id, filename, mxc_url, enc_info,
|
event_id, filename, mxc_url, enc_info, is_pdf,
|
||||||
matrix_client, db, paperless,
|
matrix_client, db, paperless,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue