Compare commits
5 commits
025228b83c
...
9663232d84
| Author | SHA1 | Date | |
|---|---|---|---|
| 9663232d84 | |||
| fa4662b5f3 | |||
| f49ea1dbc5 | |||
| 0aa044eead | |||
| eec2d076e4 |
2 changed files with 80 additions and 28 deletions
|
|
@ -7,6 +7,8 @@ MATRIX_ROOM_ID=!roomid:jeena.net
|
||||||
PAPERLESS_URL=https://paperless.jeena.net
|
PAPERLESS_URL=https://paperless.jeena.net
|
||||||
PAPERLESS_TOKEN=your_paperless_api_token
|
PAPERLESS_TOKEN=your_paperless_api_token
|
||||||
PAPERLESS_INBOX_TAG_ID=1
|
PAPERLESS_INBOX_TAG_ID=1
|
||||||
|
# Optional: assign uploaded documents to this Paperless user ID
|
||||||
|
# PAPERLESS_OWNER_ID=7
|
||||||
|
|
||||||
# Optional: path to the SQLite state database (default: state.db next to the script)
|
# Optional: path to the SQLite state database (default: state.db next to the script)
|
||||||
DB_PATH=state.db
|
DB_PATH=state.db
|
||||||
|
|
|
||||||
106
ingest.py
106
ingest.py
|
|
@ -43,8 +43,7 @@ class _SSLSMTPHandler(logging.handlers.SMTPHandler):
|
||||||
def emit(self, record: logging.LogRecord) -> None:
|
def emit(self, record: logging.LogRecord) -> None:
|
||||||
import smtplib
|
import smtplib
|
||||||
try:
|
try:
|
||||||
host, port = self.mailhost
|
with smtplib.SMTP_SSL(self.mailhost, self.mailport) as smtp:
|
||||||
with smtplib.SMTP_SSL(host, port) as smtp:
|
|
||||||
smtp.login(self.username, self.password)
|
smtp.login(self.username, self.password)
|
||||||
msg = self.format(record)
|
msg = self.format(record)
|
||||||
smtp.sendmail(self.fromaddr, self.toaddrs,
|
smtp.sendmail(self.fromaddr, self.toaddrs,
|
||||||
|
|
@ -82,6 +81,9 @@ MATRIX_ROOM_ID = os.environ["MATRIX_ROOM_ID"]
|
||||||
PAPERLESS_URL = os.environ["PAPERLESS_URL"].rstrip("/")
|
PAPERLESS_URL = os.environ["PAPERLESS_URL"].rstrip("/")
|
||||||
PAPERLESS_TOKEN = os.environ["PAPERLESS_TOKEN"]
|
PAPERLESS_TOKEN = os.environ["PAPERLESS_TOKEN"]
|
||||||
PAPERLESS_INBOX_TAG_ID = int(os.environ["PAPERLESS_INBOX_TAG_ID"])
|
PAPERLESS_INBOX_TAG_ID = int(os.environ["PAPERLESS_INBOX_TAG_ID"])
|
||||||
|
PAPERLESS_OWNER_ID: Optional[int] = (
|
||||||
|
int(os.environ["PAPERLESS_OWNER_ID"]) if os.environ.get("PAPERLESS_OWNER_ID") else None
|
||||||
|
)
|
||||||
|
|
||||||
DB_PATH = os.environ.get("DB_PATH", "state.db")
|
DB_PATH = os.environ.get("DB_PATH", "state.db")
|
||||||
UPTIME_KUMA_PUSH_URL = os.environ.get("UPTIME_KUMA_PUSH_URL")
|
UPTIME_KUMA_PUSH_URL = os.environ.get("UPTIME_KUMA_PUSH_URL")
|
||||||
|
|
@ -204,36 +206,54 @@ def _bad_event_encrypted_file_info(event: BadEvent) -> Optional[dict]:
|
||||||
|
|
||||||
def is_supported_file(event) -> bool:
|
def is_supported_file(event) -> bool:
|
||||||
if isinstance(event, RoomMessageFile):
|
if isinstance(event, RoomMessageFile):
|
||||||
return (event.body or "").lower().endswith(".pdf")
|
return True # validate magic bytes later; body may be empty (e.g. WhatsApp bridge)
|
||||||
if isinstance(event, RoomMessageImage):
|
if isinstance(event, RoomMessageImage):
|
||||||
return (event.body or "").lower().endswith((".jpg", ".jpeg"))
|
return True # validate magic bytes later; body may be empty (e.g. WhatsApp bridge)
|
||||||
if isinstance(event, BadEvent):
|
if isinstance(event, BadEvent):
|
||||||
return _bad_event_encrypted_file_info(event) is not None
|
return _bad_event_encrypted_file_info(event) is not None
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def extract_event_fields(event) -> tuple[str, str, str, Optional[str]]:
|
def _whatsapp_filename(ts_ms: int, is_pdf: bool, body: str) -> str:
|
||||||
"""Returns (event_id, filename, mxc_url, encryption_info_json_or_None)."""
|
"""Generate a filename from the event timestamp, optionally prefixed with the body text."""
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
dt = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
|
||||||
|
stamp = dt.strftime("%Y-%m-%d_%H-%M-%S")
|
||||||
|
ext = ".pdf" if is_pdf else ".jpg"
|
||||||
|
base = f"whatsapp_{stamp}{ext}"
|
||||||
|
if body:
|
||||||
|
return f"{body} - {base}"
|
||||||
|
return base
|
||||||
|
|
||||||
|
|
||||||
|
def extract_event_fields(event) -> tuple[str, str, str, Optional[str], bool]:
|
||||||
|
"""Returns (event_id, filename, mxc_url, encryption_info_json_or_None, is_pdf)."""
|
||||||
if isinstance(event, BadEvent):
|
if isinstance(event, BadEvent):
|
||||||
content = event.source.get("content", {})
|
content = event.source.get("content", {})
|
||||||
filename = content.get("body", "unknown")
|
body = content.get("body", "")
|
||||||
file_info = content.get("file", {})
|
file_info = content.get("file", {})
|
||||||
return event.event_id, filename, file_info["url"], json.dumps(file_info)
|
is_pdf = content.get("msgtype") == "m.file"
|
||||||
return event.event_id, event.body, event.url, None
|
ext = ".pdf" if is_pdf else ".jpg"
|
||||||
|
filename = body if body.lower().endswith(ext) else _whatsapp_filename(event.server_timestamp, is_pdf, body)
|
||||||
|
return event.event_id, filename, file_info["url"], json.dumps(file_info), is_pdf
|
||||||
|
is_pdf = isinstance(event, RoomMessageFile)
|
||||||
|
ext = ".pdf" if is_pdf else ".jpg"
|
||||||
|
body = event.body or ""
|
||||||
|
filename = body if body.lower().endswith(ext) else _whatsapp_filename(event.server_timestamp, is_pdf, body)
|
||||||
|
return event.event_id, filename, event.url, None, is_pdf
|
||||||
|
|
||||||
|
|
||||||
def content_type_for(filename: str) -> str:
|
def content_type_for(is_pdf: bool) -> str:
|
||||||
return "application/pdf" if filename.lower().endswith(".pdf") else "image/jpeg"
|
return "application/pdf" if is_pdf else "image/jpeg"
|
||||||
|
|
||||||
|
|
||||||
def validate_file(path: Path, filename: str) -> None:
|
def validate_file(path: Path, filename: str, is_pdf: bool) -> None:
|
||||||
"""Raise ValueError if the file doesn't look like the format its name claims."""
|
"""Raise ValueError if the file content doesn't match the expected format."""
|
||||||
data = path.read_bytes()[:8]
|
data = path.read_bytes()[:8]
|
||||||
if filename.lower().endswith(".pdf"):
|
if is_pdf:
|
||||||
if not data.startswith(b"%PDF"):
|
if not data.startswith(b"%PDF"):
|
||||||
raise ValueError(f"File does not start with %PDF magic bytes: {filename}")
|
raise ValueError(f"File does not start with %PDF magic bytes: {filename}")
|
||||||
else:
|
else:
|
||||||
# JPEG: starts with FF D8 FF
|
|
||||||
if not data[:3] == b"\xff\xd8\xff":
|
if not data[:3] == b"\xff\xd8\xff":
|
||||||
raise ValueError(f"File does not start with JPEG magic bytes: {filename}")
|
raise ValueError(f"File does not start with JPEG magic bytes: {filename}")
|
||||||
|
|
||||||
|
|
@ -243,10 +263,11 @@ def validate_file(path: Path, filename: str) -> None:
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
class PaperlessClient:
|
class PaperlessClient:
|
||||||
def __init__(self, base_url: str, token: str, inbox_tag_id: int) -> None:
|
def __init__(self, base_url: str, token: str, inbox_tag_id: int, owner_id: Optional[int] = None) -> None:
|
||||||
self.base_url = base_url
|
self.base_url = base_url
|
||||||
self.headers = {"Authorization": f"Token {token}"}
|
self.headers = {"Authorization": f"Token {token}"}
|
||||||
self.inbox_tag_id = inbox_tag_id
|
self.inbox_tag_id = inbox_tag_id
|
||||||
|
self.owner_id = owner_id
|
||||||
|
|
||||||
async def find_by_checksum(self, checksum: str) -> Optional[int]:
|
async def find_by_checksum(self, checksum: str) -> Optional[int]:
|
||||||
"""Return the Paperless document ID if the checksum already exists."""
|
"""Return the Paperless document ID if the checksum already exists."""
|
||||||
|
|
@ -315,6 +336,17 @@ class PaperlessClient:
|
||||||
|
|
||||||
raise RuntimeError(f"Paperless task {task_id} timed out after {PAPERLESS_TASK_TIMEOUT}s")
|
raise RuntimeError(f"Paperless task {task_id} timed out after {PAPERLESS_TASK_TIMEOUT}s")
|
||||||
|
|
||||||
|
async def set_owner(self, doc_id: int, owner_id: int) -> None:
|
||||||
|
"""Set the owner of a document."""
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
r = await client.patch(
|
||||||
|
f"{self.base_url}/api/documents/{doc_id}/",
|
||||||
|
headers=self.headers,
|
||||||
|
json={"owner": owner_id},
|
||||||
|
timeout=30,
|
||||||
|
)
|
||||||
|
r.raise_for_status()
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Core processing
|
# Core processing
|
||||||
|
|
@ -325,6 +357,7 @@ async def process_event(
|
||||||
filename: str,
|
filename: str,
|
||||||
mxc_url: str,
|
mxc_url: str,
|
||||||
encryption_info: Optional[str],
|
encryption_info: Optional[str],
|
||||||
|
is_pdf: bool,
|
||||||
matrix_client: AsyncClient,
|
matrix_client: AsyncClient,
|
||||||
db: aiosqlite.Connection,
|
db: aiosqlite.Connection,
|
||||||
paperless: PaperlessClient,
|
paperless: PaperlessClient,
|
||||||
|
|
@ -357,8 +390,13 @@ async def process_event(
|
||||||
else:
|
else:
|
||||||
tmp.write(dl.body)
|
tmp.write(dl.body)
|
||||||
|
|
||||||
# Validate the file looks like what it claims to be
|
# Validate the file looks like what it claims to be; skip unsupported formats
|
||||||
validate_file(tmp_path, filename)
|
try:
|
||||||
|
validate_file(tmp_path, filename, is_pdf)
|
||||||
|
except ValueError as exc:
|
||||||
|
log.info("Skipping unsupported file %s: %s", filename, exc)
|
||||||
|
await upsert_event(db, event_id, filename, mxc_url, "skipped", encryption_info)
|
||||||
|
return
|
||||||
|
|
||||||
# Deduplicate against Paperless by content checksum
|
# Deduplicate against Paperless by content checksum
|
||||||
checksum = hashlib.md5(tmp_path.read_bytes()).hexdigest()
|
checksum = hashlib.md5(tmp_path.read_bytes()).hexdigest()
|
||||||
|
|
@ -370,7 +408,7 @@ async def process_event(
|
||||||
return
|
return
|
||||||
|
|
||||||
# Upload and wait for Paperless to confirm it landed
|
# Upload and wait for Paperless to confirm it landed
|
||||||
task_id = await paperless.upload(filename, tmp_path, content_type_for(filename))
|
task_id = await paperless.upload(filename, tmp_path, content_type_for(is_pdf))
|
||||||
log.info("Uploaded %s → waiting for Paperless task %s", filename, task_id)
|
log.info("Uploaded %s → waiting for Paperless task %s", filename, task_id)
|
||||||
doc_id = await paperless.wait_for_task(task_id)
|
doc_id = await paperless.wait_for_task(task_id)
|
||||||
|
|
||||||
|
|
@ -381,6 +419,9 @@ async def process_event(
|
||||||
encryption_info, checksum)
|
encryption_info, checksum)
|
||||||
else:
|
else:
|
||||||
log.info("Confirmed in Paperless as document id=%d: %s", doc_id, filename)
|
log.info("Confirmed in Paperless as document id=%d: %s", doc_id, filename)
|
||||||
|
if paperless.owner_id is not None:
|
||||||
|
await paperless.set_owner(doc_id, paperless.owner_id)
|
||||||
|
log.info("Set owner of document id=%d to user id=%d", doc_id, paperless.owner_id)
|
||||||
await upsert_event(db, event_id, filename, mxc_url, "uploaded",
|
await upsert_event(db, event_id, filename, mxc_url, "uploaded",
|
||||||
encryption_info, checksum, doc_id)
|
encryption_info, checksum, doc_id)
|
||||||
|
|
||||||
|
|
@ -417,6 +458,8 @@ async def catchup_history(
|
||||||
log.info("Starting historical catchup...")
|
log.info("Starting historical catchup...")
|
||||||
token = start_token
|
token = start_token
|
||||||
total = 0
|
total = 0
|
||||||
|
batches = 0
|
||||||
|
events_seen = 0
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
response = await matrix_client.room_messages(
|
response = await matrix_client.room_messages(
|
||||||
|
|
@ -430,12 +473,15 @@ async def catchup_history(
|
||||||
log.error("room_messages error: %s", response)
|
log.error("room_messages error: %s", response)
|
||||||
break
|
break
|
||||||
|
|
||||||
|
batches += 1
|
||||||
|
events_seen += len(response.chunk)
|
||||||
|
|
||||||
for event in response.chunk:
|
for event in response.chunk:
|
||||||
if is_supported_file(event):
|
if is_supported_file(event):
|
||||||
total += 1
|
total += 1
|
||||||
event_id, filename, mxc_url, enc_info = extract_event_fields(event)
|
event_id, filename, mxc_url, enc_info, is_pdf = extract_event_fields(event)
|
||||||
await process_event(
|
await process_event(
|
||||||
event_id, filename, mxc_url, enc_info,
|
event_id, filename, mxc_url, enc_info, is_pdf,
|
||||||
matrix_client, db, paperless,
|
matrix_client, db, paperless,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -443,7 +489,10 @@ async def catchup_history(
|
||||||
break
|
break
|
||||||
token = response.end
|
token = response.end
|
||||||
|
|
||||||
log.info("Historical catchup complete — processed %d file event(s).", total)
|
log.info(
|
||||||
|
"Historical catchup complete — processed %d file event(s) across %d batches (%d total events).",
|
||||||
|
total, batches, events_seen,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
@ -467,7 +516,8 @@ async def retry_loop(
|
||||||
|
|
||||||
for event_id, filename, mxc_url, enc_info in rows:
|
for event_id, filename, mxc_url, enc_info in rows:
|
||||||
log.info("Retrying %s (%s)", filename, event_id)
|
log.info("Retrying %s (%s)", filename, event_id)
|
||||||
await process_event(event_id, filename, mxc_url, enc_info,
|
is_pdf = filename.lower().endswith(".pdf")
|
||||||
|
await process_event(event_id, filename, mxc_url, enc_info, is_pdf,
|
||||||
matrix_client, db, paperless)
|
matrix_client, db, paperless)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -493,7 +543,7 @@ async def heartbeat_loop() -> None:
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
async def main() -> None:
|
async def main() -> None:
|
||||||
paperless = PaperlessClient(PAPERLESS_URL, PAPERLESS_TOKEN, PAPERLESS_INBOX_TAG_ID)
|
paperless = PaperlessClient(PAPERLESS_URL, PAPERLESS_TOKEN, PAPERLESS_INBOX_TAG_ID, PAPERLESS_OWNER_ID)
|
||||||
|
|
||||||
async with aiosqlite.connect(DB_PATH) as db:
|
async with aiosqlite.connect(DB_PATH) as db:
|
||||||
await init_db(db)
|
await init_db(db)
|
||||||
|
|
@ -523,8 +573,8 @@ async def main() -> None:
|
||||||
# Process events from the initial sync timeline first
|
# Process events from the initial sync timeline first
|
||||||
for event in room_timeline.timeline.events:
|
for event in room_timeline.timeline.events:
|
||||||
if is_supported_file(event):
|
if is_supported_file(event):
|
||||||
event_id, filename, mxc_url, enc_info = extract_event_fields(event)
|
event_id, filename, mxc_url, enc_info, is_pdf = extract_event_fields(event)
|
||||||
await process_event(event_id, filename, mxc_url, enc_info,
|
await process_event(event_id, filename, mxc_url, enc_info, is_pdf,
|
||||||
matrix_client, db, paperless)
|
matrix_client, db, paperless)
|
||||||
|
|
||||||
# Then paginate backwards for older history
|
# Then paginate backwards for older history
|
||||||
|
|
@ -533,9 +583,9 @@ async def main() -> None:
|
||||||
|
|
||||||
async def on_file(room, event):
|
async def on_file(room, event):
|
||||||
if room.room_id == MATRIX_ROOM_ID and is_supported_file(event):
|
if room.room_id == MATRIX_ROOM_ID and is_supported_file(event):
|
||||||
event_id, filename, mxc_url, enc_info = extract_event_fields(event)
|
event_id, filename, mxc_url, enc_info, is_pdf = extract_event_fields(event)
|
||||||
await process_event(
|
await process_event(
|
||||||
event_id, filename, mxc_url, enc_info,
|
event_id, filename, mxc_url, enc_info, is_pdf,
|
||||||
matrix_client, db, paperless,
|
matrix_client, db, paperless,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue