email-forwarder/email_forwarder/email_forwarder.py
Jeena 69ec294a56 Fix duplicate detection by ensuring folder selection
Call ensure_folder_exists before check_duplicate to select the dest
folder, allowing IMAP SEARCH to work in SELECTED state.

Changes:
- Move dest.ensure_folder_exists(folder) before check_duplicate in process_email
- Add logging to check_duplicate for debugging
2026-01-04 20:03:49 +09:00

281 lines
9.2 KiB
Python

import imaplib
import logging
import os
import re
import sys
from datetime import datetime, timezone
from email import message_from_bytes

from dotenv import load_dotenv
# Module-wide logging: INFO and above, each line prefixed with timestamp and level.
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
class ImapClient:
    """Base class for IMAP operations.

    Holds the connection settings and wraps an ``imaplib.IMAP4_SSL``
    connection stored in ``self._imap`` (``None`` while disconnected).
    The private helpers raise a plain ``Exception`` when the server does
    not answer ``OK``.
    """

    # One untagged LIST response line: "(<flags>) <delimiter> <name>", where
    # the delimiter is a quoted (possibly backslash-escaped) character or NIL.
    _LIST_RE = re.compile(
        r'^\((?P<flags>[^)]*)\)\s+(?P<delim>"(?:\\.|[^"\\])*"|NIL)\s+(?P<name>.*)$',
        re.IGNORECASE,
    )
    # Mailboxes carrying these LIST attributes cannot be SELECTed.
    _UNSELECTABLE = frozenset({"\\noselect", "\\nonexistent"})

    def __init__(self, host: str, port: int, user: str, password: str):
        self._host = host
        self._port = port
        self._user = user
        self._password = password
        self._imap = None

    def _connect(self):
        """Private: Open an SSL connection and log in."""
        self._imap = imaplib.IMAP4_SSL(self._host, self._port)
        self._imap.login(self._user, self._password)

    def _disconnect(self):
        """Private: Log out and forget the connection (no-op if not connected)."""
        if self._imap:
            self._imap.logout()
            self._imap = None

    @staticmethod
    def _quote_mailbox(folder: str) -> str:
        """Return *folder* encoded as an IMAP quoted string.

        imaplib sends mailbox arguments verbatim, so names containing spaces
        or quotes must be quoted by the caller. A value that is already
        wrapped in double quotes is passed through unchanged so pre-quoted
        configuration values keep working.
        """
        if len(folder) >= 2 and folder.startswith('"') and folder.endswith('"'):
            return folder
        escaped = folder.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    @staticmethod
    def _unquote_mailbox(name: str) -> str:
        """Undo IMAP quoted-string encoding of a mailbox name from a LIST reply."""
        name = name.strip()
        if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
            return re.sub(r"\\(.)", r"\1", name[1:-1])
        return name

    def _list_folders(self):
        """Private: List all selectable folders.

        Works with any hierarchy delimiter, quoted names, and names the
        server sends as IMAP literals. Folders flagged \\Noselect or
        \\NonExistent are omitted because they cannot be selected.
        """
        typ, data = self._imap.list()
        if typ != "OK":
            raise Exception("Failed to list folders")
        folders = []
        for item in data:
            if not item:
                continue
            if isinstance(item, tuple):
                # Name sent as a literal: item[0] ends with "{N}", item[1] is the name.
                line, literal = item[0].decode(), item[1].decode()
            else:
                line, literal = item.decode(), None
            match = self._LIST_RE.match(line)
            if not match:
                continue
            flags = {flag.lower() for flag in match.group("flags").split()}
            if flags & self._UNSELECTABLE:
                continue
            if literal is not None:
                name = literal
            else:
                name = self._unquote_mailbox(match.group("name"))
            if name:
                folders.append(name)
        return folders

    def _select_folder(self, folder: str):
        """Private: Select *folder* (read-write)."""
        typ, _ = self._imap.select(self._quote_mailbox(folder))
        if typ != "OK":
            raise Exception(f"Failed to select folder {folder}")

    def _search(self, criteria: str):
        """Private: Search the selected folder; return message sequence numbers as bytes."""
        typ, data = self._imap.search(None, criteria)
        if typ != "OK":
            raise Exception(f"Search failed: {criteria}")
        return data[0].split()

    def _fetch(self, uid: str):
        """Private: Fetch the raw bytes of message *uid* without marking it read.

        *uid* is a message sequence number as returned by ``_search``.
        ``BODY.PEEK[]`` returns the full message like ``RFC822`` does, but
        does not set the ``\\Seen`` flag on the source message.
        """
        typ, data = self._imap.fetch(uid, "(BODY.PEEK[])")
        if typ != "OK":
            raise Exception(f"Failed to fetch UID {uid}")
        # The message arrives as a (response-prefix, literal-bytes) tuple;
        # bare bytes entries such as the closing b")" are skipped.
        for part in data or ():
            if isinstance(part, tuple) and len(part) >= 2:
                return part[1]
        raise Exception(f"Failed to fetch UID {uid}")
class SourceImap(ImapClient):
    """Handles source IMAP operations: choosing folders and finding new messages."""

    # Folders skipped when FOLDERS=all (exact, case-sensitive names).
    SKIP_FOLDERS = ("Trash", "Junk", "Spam")

    def get_folders_to_sync(self, config_folders: str):
        """Return the folder names to sync for the FOLDERS setting.

        ``"all"`` (case-insensitive, surrounding whitespace ignored) lists
        every folder on the server except ``SKIP_FOLDERS``. Any other value
        is a comma-separated list of names; entries are stripped and empty
        entries (e.g. from a trailing comma) are dropped.
        """
        if config_folders.strip().lower() == "all":
            return [f for f in self._list_folders() if f not in self.SKIP_FOLDERS]
        names = (part.strip() for part in config_folders.split(","))
        return [name for name in names if name]

    def get_new_emails(self, folder: str, since_date: str):
        """Select *folder* and return messages whose date is on/after *since_date*.

        Args:
            folder: Source folder name.
            since_date: IMAP date string such as ``"04-Jan-2026"``.

        Returns:
            Message sequence numbers (bytes) from a plain SEARCH, not UIDs.
        """
        self._select_folder(folder)
        return self._search(f'SINCE "{since_date}"')
class DestImap(ImapClient):
    """Handles destination IMAP operations: folder creation, duplicate checks, appends."""

    def ensure_folder_exists(self, folder: str):
        """Select *folder*, creating it first if it cannot be selected.

        Leaves *folder* selected, which ``check_duplicate`` relies on.
        Raises if creating or re-selecting the folder fails.
        """
        try:
            self._select_folder(folder)
        except Exception:
            # Most likely the folder does not exist yet. Catch Exception (not a
            # bare except) so Ctrl-C / SystemExit are not turned into a CREATE.
            self._create_folder(folder)
            self._select_folder(folder)

    def append_email(self, folder: str, msg: bytes):
        """Append raw message bytes *msg* to *folder*, creating the folder if needed."""
        self.ensure_folder_exists(folder)
        self._append(folder, msg)

    def check_duplicate(self, msg_id: str):
        """Return True if a message with Message-ID *msg_id* is in the selected folder.

        The destination folder must already be selected (e.g. via
        ``ensure_folder_exists``): IMAP SEARCH only works in the selected
        state. A failed search is logged and reported as "not a duplicate".
        """
        try:
            # Escape for an IMAP quoted string so IDs containing '"' or '\'
            # cannot break the SEARCH command.
            needle = msg_id.strip().replace("\\", "\\\\").replace('"', '\\"')
            results = self._search(f'HEADER Message-ID "{needle}"')
            logging.debug(f"Check duplicate for {msg_id}: found {len(results)} matches")
            return len(results) > 0
        except Exception as e:
            logging.error(f"Error checking duplicate for {msg_id}: {e}")
            return False

    @staticmethod
    def _quote_mailbox(folder: str) -> str:
        """Return *folder* encoded as an IMAP quoted string.

        imaplib sends mailbox arguments verbatim, so names with spaces or
        quotes must be quoted here. Values already wrapped in double quotes
        are passed through unchanged.
        """
        if len(folder) >= 2 and folder.startswith('"') and folder.endswith('"'):
            return folder
        escaped = folder.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    def _create_folder(self, folder: str):
        """Private: Create a folder."""
        typ, _ = self._imap.create(self._quote_mailbox(folder))
        if typ != "OK":
            raise Exception(f"Failed to create folder {folder}")

    def _append(self, folder: str, msg: bytes):
        """Private: Append *msg* to *folder*; raise if the server rejects it.

        imaplib only raises on BAD, so a NO reply (quota, size limit, ...)
        must be checked explicitly; otherwise the caller would record the
        message as forwarded although it was never stored.
        """
        typ, data = self._imap.append(self._quote_mailbox(folder), "", None, msg)
        if typ != "OK":
            raise Exception(f"Failed to append message to folder {folder}: {data}")
class EmailForwarder:
    """Orchestrates the email forwarding process.

    Reads new messages from each source folder and appends them to the
    same-named destination folder. A message is skipped when its Message-ID
    is already recorded in the local state file or already present in the
    destination folder.
    """

    # English month abbreviations required by the IMAP date format; used
    # instead of strftime("%b"), whose output depends on the process locale.
    _IMAP_MONTHS = (
        "Jan", "Feb", "Mar", "Apr", "May", "Jun",
        "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
    )

    def __init__(self, config: dict):
        self.config = config
        self.state_dir = config["state_dir"]
        os.makedirs(self.state_dir, exist_ok=True)
        self.processed_file = os.path.join(self.state_dir, "processed_message_ids.txt")
        self.last_run_file = os.path.join(self.state_dir, "last_run.txt")
        self.source = SourceImap(
            config["source_host"],
            config["source_port"],
            config["source_user"],
            config["source_pass"],
        )
        self.dest = DestImap(
            config["dest_host"],
            config["dest_port"],
            config["dest_user"],
            config["dest_pass"],
        )
        self.processed_ids = set()
        self.last_run = None
        self.dry_run = config.get("dry_run", False)

    def load_state(self):
        """Load the last-run timestamp and the set of already-forwarded Message-IDs.

        Without a last-run file (first run) the last run defaults to now in
        UTC, so only messages dated today or later are considered.
        """
        if os.path.exists(self.last_run_file):
            with open(self.last_run_file) as f:
                last_run_str = f.read().strip()
                self.last_run = datetime.fromisoformat(last_run_str)
        else:
            self.last_run = datetime.now(timezone.utc)
        if os.path.exists(self.processed_file):
            with open(self.processed_file, "r") as f:
                self.processed_ids = set(filter(None, (line.strip() for line in f)))

    def save_state(self):
        """Persist the current UTC time as the last-run timestamp.

        Processed Message-IDs are appended to their file as each message is
        forwarded in ``sync_folder``, so only the timestamp is written here.
        Nothing is written in dry-run mode.
        """
        if not self.dry_run:
            with open(self.last_run_file, "w") as f:
                f.write(datetime.now(timezone.utc).isoformat())

    @classmethod
    def _imap_date(cls, moment: datetime) -> str:
        """Format *moment* as an IMAP date (``DD-Mon-YYYY``), independent of locale."""
        return f"{moment.day:02d}-{cls._IMAP_MONTHS[moment.month - 1]}-{moment.year:04d}"

    @staticmethod
    def _disconnect_quietly(client, label: str):
        """Disconnect *client*, logging instead of raising so cleanup never masks an error."""
        try:
            client._disconnect()
        except Exception as e:
            logging.warning(f"Error disconnecting from {label}: {e}")

    def sync_all_folders(self):
        """Connect to both servers, sync every configured folder, then disconnect.

        A failure in one folder is logged and does not stop the others. Both
        connections are closed even if connecting or listing folders fails.
        """
        try:
            self.source._connect()
            self.dest._connect()
            folders = self.source.get_folders_to_sync(self.config["folders"])
            logging.info(f"Syncing folders: {folders}")
            # NOTE(review): SEARCH SINCE compares only the date part of each
            # message's internal date (likely in the server's timezone) with
            # last_run's UTC date; a one-day margin may be worth considering.
            since_date = self._imap_date(self.last_run)
            total_forwarded = 0
            for folder in folders:
                try:
                    uids = self.source.get_new_emails(folder, since_date)
                    logging.info(f"Found {len(uids)} emails in {folder}")
                    total_forwarded += self.sync_folder(folder, uids)
                except Exception as e:
                    logging.error(f"Error syncing {folder}: {e}")
        finally:
            self._disconnect_quietly(self.source, "source")
            self._disconnect_quietly(self.dest, "destination")
        logging.info(f"Total forwarded: {total_forwarded}")

    def _prepare_dest_folder(self, folder: str) -> bool:
        """Select *folder* on the destination so duplicate searches run against it.

        Normally the folder is created when missing and True is returned.
        In dry-run mode the folder is only selected, never created; False
        means it could not be selected and is treated as absent (so it
        cannot contain duplicates).
        """
        if not self.dry_run:
            self.dest.ensure_folder_exists(folder)
            return True
        try:
            self.dest._select_folder(folder)
        except Exception:
            return False
        return True

    def sync_folder(self, folder: str, uids: list):
        """Forward the given source messages of *folder* to the destination.

        Args:
            folder: Folder name used on both sides; it must already be
                selected on the source.
            uids: Message numbers (bytes) from the source SEARCH.

        Returns:
            Number of messages appended (always 0 in dry-run mode).
        """
        forwarded = 0
        # Destination folder is prepared lazily on the first message that
        # needs a duplicate check, so folders without new mail are untouched.
        # None = not prepared yet; True/False = result of _prepare_dest_folder.
        dest_ready = None
        for uid in uids:
            uid_str = uid.decode()
            try:
                raw_msg = self.source._fetch(uid_str)
                msg = message_from_bytes(raw_msg)
                header = msg.get("Message-ID")
                if header is None:
                    continue
                # Strip folding whitespace so IDs match the stripped values
                # load_state reads back from the processed file.
                msg_id = str(header).strip()
                if not msg_id or msg_id in self.processed_ids:
                    continue
                if dest_ready is None:
                    dest_ready = self._prepare_dest_folder(folder)
                if dest_ready and self.dest.check_duplicate(msg_id):
                    continue
                if self.dry_run:
                    logging.info(f"Dry-run: Would forward {msg_id} from {folder}")
                    continue
                self.dest.append_email(folder, raw_msg)
                self.processed_ids.add(msg_id)
                with open(self.processed_file, "a") as f:
                    f.write(msg_id + "\n")
                forwarded += 1
                logging.info(f"Forwarded {msg_id} from {folder}")
            except Exception as e:
                logging.error(f"Error processing UID {uid_str}: {e}")
        return forwarded

    def run(self):
        """Run one forwarding pass: load state, sync all folders, save state."""
        self.load_state()
        self.sync_all_folders()
        self.save_state()
def load_config():
    """Load configuration from the environment (after reading a ``.env`` file).

    Returns:
        dict with source/destination connection settings, the folder
        selection (``FOLDERS``, default ``INBOX``), the state directory
        (``STATE_DIR``, default ``./state``) and the ``dry_run`` flag.
        ``DRY_RUN`` accepts true/1/yes/on (case-insensitive).

    Raises:
        ValueError: if a required setting is missing or a port is not an integer.
    """
    load_dotenv()

    def _port(name):
        """Read port variable *name* (default 993) with a readable error on bad input."""
        raw = os.getenv(name, "993")
        try:
            return int(raw)
        except ValueError:
            raise ValueError(f"{name} must be an integer, got {raw!r}") from None

    # Any common truthy spelling enables dry-run, so e.g. DRY_RUN=1 does not
    # silently perform a real run.
    truthy = {"true", "1", "yes", "on"}
    config = {
        "source_host": os.getenv("SOURCE_HOST"),
        "source_port": _port("SOURCE_PORT"),
        "source_user": os.getenv("SOURCE_USER"),
        "source_pass": os.getenv("SOURCE_PASS"),
        "dest_host": os.getenv("DEST_HOST"),
        "dest_port": _port("DEST_PORT"),
        "dest_user": os.getenv("DEST_USER"),
        "dest_pass": os.getenv("DEST_PASS"),
        "folders": os.getenv("FOLDERS", "INBOX"),
        "state_dir": os.getenv("STATE_DIR", "./state"),
        "dry_run": os.getenv("DRY_RUN", "false").strip().lower() in truthy,
    }
    # Validate required settings
    required = [
        "source_host",
        "source_user",
        "source_pass",
        "dest_host",
        "dest_user",
        "dest_pass",
    ]
    for key in required:
        if not config[key]:
            raise ValueError(f"{key} not set")
    return config
def main():
    """Main entry point: load config, run one forwarding pass, exit 1 on any error."""
    try:
        config = load_config()
        forwarder = EmailForwarder(config)
        forwarder.run()
    except Exception as e:
        # logging.exception also records the traceback, needed to debug failures.
        logging.exception(f"Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()