email-forwarder/email_forwarder/email_forwarder.py
Jeena 05f2e6c870 Fail script on all server errors
Remove local exception handling for IMAP operations to ensure script
exits on any server failure (e.g., auth, connection, search errors).
This allows systemd OnFailure to notify on fixable issues.

Changes:
- Remove try-except in check_duplicate, sync_folder, sync_all_folders
- Let IMAP exceptions propagate to main() for exit(1)
2026-01-04 20:10:33 +09:00

267 lines
8.5 KiB
Python

import imaplib
import os
import logging
import sys
from dotenv import load_dotenv
from datetime import datetime, timezone
from email import message_from_bytes
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
class ImapClient:
"""Base class for IMAP operations."""
def __init__(self, host: str, port: int, user: str, password: str):
self._host = host
self._port = port
self._user = user
self._password = password
self._imap = None
def _connect(self):
"""Private: Connect and login to IMAP."""
self._imap = imaplib.IMAP4_SSL(self._host, self._port)
self._imap.login(self._user, self._password)
def _disconnect(self):
"""Private: Logout and close connection."""
if self._imap:
self._imap.logout()
self._imap = None
def _list_folders(self):
"""Private: List all folders."""
typ, data = self._imap.list()
if typ != "OK":
raise Exception("Failed to list folders")
folders = []
for item in data:
if item:
parts = item.decode().split(' "/" ')
if len(parts) > 1:
folder_name = parts[1].strip('"')
folders.append(folder_name)
return folders
def _select_folder(self, folder: str):
"""Private: Select a folder."""
typ, _ = self._imap.select(folder)
if typ != "OK":
raise Exception(f"Failed to select folder {folder}")
def _search(self, criteria: str):
"""Private: Search emails."""
typ, data = self._imap.search(None, criteria)
if typ != "OK":
raise Exception(f"Search failed: {criteria}")
return data[0].split()
def _fetch(self, uid: str):
"""Private: Fetch raw email."""
typ, data = self._imap.fetch(uid, "(RFC822)")
if typ != "OK":
raise Exception(f"Failed to fetch UID {uid}")
return data[0][1]
class SourceImap(ImapClient):
"""Handles source IMAP operations."""
def get_folders_to_sync(self, config_folders: str):
"""Get list of folders to sync based on config."""
if config_folders == "all":
folders = self._list_folders()
# Skip system folders
return [f for f in folders if f not in ["Trash", "Junk", "Spam"]]
else:
return [f.strip() for f in config_folders.split(",")]
def get_new_emails(self, folder: str, since_date: str):
"""Get UIDs of new emails since date."""
self._select_folder(folder)
uids = self._search(f'SINCE "{since_date}"')
return uids
class DestImap(ImapClient):
"""Handles destination IMAP operations."""
def ensure_folder_exists(self, folder: str):
"""Ensure folder exists, create if needed."""
try:
self._select_folder(folder)
except:
self._create_folder(folder)
self._select_folder(folder)
def append_email(self, folder: str, msg: bytes):
"""Append email to folder, ensuring it exists."""
self.ensure_folder_exists(folder)
self._append(folder, msg)
def check_duplicate(self, msg_id: str):
"""Check if Message-ID exists in current folder."""
results = self._search(f'HEADER Message-ID "{msg_id}"')
logging.debug(f"Check duplicate for {msg_id}: found {len(results)} matches")
return len(results) > 0
def _create_folder(self, folder: str):
"""Private: Create a folder."""
typ, _ = self._imap.create(folder)
if typ != "OK":
raise Exception(f"Failed to create folder {folder}")
def _append(self, folder: str, msg: bytes):
"""Private: Append message to folder."""
self._imap.append(folder, "", None, msg)
class EmailForwarder:
"""Orchestrates the email forwarding process."""
def __init__(self, config: dict):
self.config = config
self.state_dir = config["state_dir"]
os.makedirs(self.state_dir, exist_ok=True)
self.processed_file = os.path.join(self.state_dir, "processed_message_ids.txt")
self.last_run_file = os.path.join(self.state_dir, "last_run.txt")
self.source = SourceImap(
config["source_host"],
config["source_port"],
config["source_user"],
config["source_pass"],
)
self.dest = DestImap(
config["dest_host"],
config["dest_port"],
config["dest_user"],
config["dest_pass"],
)
self.processed_ids = set()
self.last_run = None
self.dry_run = config.get("dry_run", False)
def load_state(self):
"""Load last run and processed IDs."""
if os.path.exists(self.last_run_file):
with open(self.last_run_file) as f:
last_run_str = f.read().strip()
self.last_run = datetime.fromisoformat(last_run_str)
else:
self.last_run = datetime.now(timezone.utc)
if os.path.exists(self.processed_file):
with open(self.processed_file, "r") as f:
self.processed_ids = set(filter(None, (line.strip() for line in f)))
def save_state(self):
"""Save last run and processed IDs."""
if not self.dry_run:
with open(self.last_run_file, "w") as f:
f.write(datetime.now(timezone.utc).isoformat())
def sync_all_folders(self):
"""Sync all configured folders."""
self.source._connect()
self.dest._connect()
folders_config = self.config["folders"]
folders = self.source.get_folders_to_sync(folders_config)
logging.info(f"Syncing folders: {folders}")
total_forwarded = 0
for folder in folders:
uids = self.source.get_new_emails(
folder, self.last_run.strftime("%d-%b-%Y")
)
logging.info(f"Found {len(uids)} emails in {folder}")
forwarded = self.sync_folder(folder, uids)
total_forwarded += forwarded
self.source._disconnect()
self.dest._disconnect()
logging.info(f"Total forwarded: {total_forwarded}")
def sync_folder(self, folder: str, uids: list):
"""Sync emails in a folder."""
forwarded = 0
for uid in uids:
uid_str = uid.decode()
raw_msg = self.source._fetch(uid_str)
msg = message_from_bytes(raw_msg)
msg_id = msg.get("Message-ID")
if not msg_id:
continue
if msg_id in self.processed_ids:
continue
self.dest.ensure_folder_exists(folder)
if self.dest.check_duplicate(msg_id):
continue
if not self.dry_run:
self.dest.append_email(folder, raw_msg)
self.processed_ids.add(msg_id)
with open(self.processed_file, "a") as f:
f.write(msg_id + "\n")
forwarded += 1
logging.info(f"Forwarded {msg_id} from {folder}")
else:
logging.info(f"Dry-run: Would forward {msg_id} from {folder}")
return forwarded
def run(self):
"""Run the forwarding process."""
self.load_state()
self.sync_all_folders()
self.save_state()
def load_config():
"""Load configuration from environment."""
load_dotenv()
config = {
"source_host": os.getenv("SOURCE_HOST"),
"source_port": int(os.getenv("SOURCE_PORT", "993")),
"source_user": os.getenv("SOURCE_USER"),
"source_pass": os.getenv("SOURCE_PASS"),
"dest_host": os.getenv("DEST_HOST"),
"dest_port": int(os.getenv("DEST_PORT", "993")),
"dest_user": os.getenv("DEST_USER"),
"dest_pass": os.getenv("DEST_PASS"),
"folders": os.getenv("FOLDERS", "INBOX"),
"state_dir": os.getenv("STATE_DIR", "./state"),
"dry_run": os.getenv("DRY_RUN", "false").lower() == "true",
}
# Validate required
required = [
"source_host",
"source_user",
"source_pass",
"dest_host",
"dest_user",
"dest_pass",
]
for key in required:
if not config[key]:
raise ValueError(f"{key} not set")
return config
def main():
"""Main entry point."""
try:
config = load_config()
forwarder = EmailForwarder(config)
forwarder.run()
except Exception as e:
logging.error(f"Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()