Make all attributes private (_host, _imap, etc.) and remove redundant public connect/disconnect methods. Connection management is now handled internally by the orchestrator calling private _connect/_disconnect. Changes: - Privatize ImapClient attributes and methods - Update EmailForwarder to call private connection methods - Enhance encapsulation without changing functionality
273 lines
8.7 KiB
Python
273 lines
8.7 KiB
Python
import imaplib
|
|
import os
|
|
import logging
|
|
import sys
|
|
from dotenv import load_dotenv
|
|
from datetime import datetime, timezone
|
|
from email import message_from_bytes
|
|
|
|
# Configure the root logger once at import time: INFO and above, with a
# timestamp and level prefix on every line.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
|
|
|
|
|
|
class ImapClient:
    """Base class for IMAP operations.

    Holds the connection parameters and a single ``imaplib.IMAP4_SSL``
    handle in ``self._imap``. The handle is ``None`` until ``_connect()`` is
    called and is reset to ``None`` by ``_disconnect()``.
    """

    def __init__(self, host: str, port: int, user: str, password: str):
        """Store connection parameters; no network activity happens here."""
        self._host = host
        self._port = port
        self._user = user
        self._password = password
        self._imap = None  # live connection, set by _connect()

    def _connect(self):
        """Private: Open an SSL connection and log in."""
        self._imap = imaplib.IMAP4_SSL(self._host, self._port)
        self._imap.login(self._user, self._password)

    def _disconnect(self):
        """Private: Log out and drop the connection handle.

        Does nothing when no connection is open. Any error raised by
        ``logout()`` still propagates, but the handle is cleared first so a
        dead connection is never reused.
        """
        if self._imap:
            try:
                self._imap.logout()
            finally:
                self._imap = None

    def _list_folders(self):
        """Private: Return the names of all folders on the server.

        Each LIST reply line has the shape ``(<flags>) <delimiter> <name>``.
        The delimiter token is skipped whatever its value (``"/"``, ``"."``,
        ``NIL``), so servers that do not use ``/`` as the hierarchy
        separator are handled too. Surrounding double quotes are removed
        from the name.

        Raises:
            Exception: if the server does not answer ``OK``.
        """
        typ, data = self._imap.list()
        if typ != "OK":
            raise Exception("Failed to list folders")

        folders = []
        for item in data:
            if not item:
                continue
            # Split off "(<flags>) ", then the delimiter token; what is left
            # is the (possibly quoted) mailbox name.
            _, flags_end, remainder = item.decode().partition(") ")
            if not flags_end:
                continue
            _, delim_end, name = remainder.partition(" ")
            if not delim_end:
                continue
            folders.append(name.strip('"'))
        return folders

    def _select_folder(self, folder: str):
        """Private: Select ``folder`` as the current mailbox.

        NOTE(review): the name is passed to imaplib unchanged — confirm that
        folder names containing spaces are accepted without extra quoting.

        Raises:
            Exception: if the server does not answer ``OK``.
        """
        typ, _ = self._imap.select(folder)
        if typ != "OK":
            raise Exception(f"Failed to select folder {folder}")

    def _search(self, criteria: str):
        """Private: Search the currently selected folder.

        Returns:
            A list of message identifiers as ``bytes``; empty when nothing
            matches or the server sends no search data at all.

        Raises:
            Exception: if the server does not answer ``OK``.
        """
        typ, data = self._imap.search(None, criteria)
        if typ != "OK":
            raise Exception(f"Search failed: {criteria}")
        # Guard against an empty/None payload instead of crashing on .split().
        if not data or not data[0]:
            return []
        return data[0].split()

    def _fetch(self, uid: str):
        """Private: Fetch the raw RFC822 bytes of one message.

        NOTE(review): this uses ``fetch`` (and ``_search`` uses ``search``),
        not the UID variants, so ``uid`` looks like a message sequence
        number despite its name — confirm.

        Raises:
            Exception: if the server does not answer ``OK`` or returns no
                message data for ``uid``.
        """
        typ, data = self._imap.fetch(uid, "(RFC822)")
        if typ != "OK":
            raise Exception(f"Failed to fetch UID {uid}")
        # A missing message comes back as an empty/None payload; report it
        # clearly rather than failing on the subscript below.
        if not data or not data[0]:
            raise Exception(f"No data returned for UID {uid}")
        return data[0][1]
|
|
|
|
|
|
class SourceImap(ImapClient):
    """Handles source IMAP operations."""

    def get_folders_to_sync(self, config_folders: str):
        """Get list of folders to sync based on config.

        Args:
            config_folders: either the literal string ``"all"`` or a
                comma-separated list of folder names.

        Returns:
            For ``"all"``: every folder reported by the server except
            ``Trash``, ``Junk`` and ``Spam``. Otherwise: the configured
            names with surrounding whitespace removed; empty entries (for
            example from a trailing comma) are dropped.
        """
        if config_folders == "all":
            folders = self._list_folders()
            # Skip system folders
            return [f for f in folders if f not in ["Trash", "Junk", "Spam"]]

        names = (part.strip() for part in config_folders.split(","))
        # An empty name can never be selected, so discard it up front.
        return [name for name in names if name]

    def get_new_emails(self, folder: str, since_date: str):
        """Get identifiers of emails in ``folder`` on or after a date.

        Args:
            folder: folder to select before searching.
            since_date: date string placed verbatim into an IMAP ``SINCE``
                search criterion.

        Returns:
            The list of identifiers returned by the search.
        """
        self._select_folder(folder)
        uids = self._search(f'SINCE "{since_date}"')
        return uids
|
|
|
|
|
|
class DestImap(ImapClient):
    """Handles destination IMAP operations."""

    def ensure_folder_exists(self, folder: str):
        """Ensure folder exists, create if needed.

        Tries to select ``folder``; if that fails, creates it and selects it
        again. On success ``folder`` is the currently selected mailbox.

        Raises:
            Exception: if the folder can be neither selected nor created.
        """
        try:
            self._select_folder(folder)
        except Exception:
            # Not a bare ``except`` so KeyboardInterrupt/SystemExit still
            # propagate.
            self._create_folder(folder)
            self._select_folder(folder)

    def append_email(self, folder: str, msg: bytes):
        """Append email to folder, ensuring it exists.

        Raises:
            Exception: if the folder cannot be prepared or the server
                rejects the message.
        """
        self.ensure_folder_exists(folder)
        self._append(folder, msg)

    def check_duplicate(self, msg_id: str):
        """Check if Message-ID exists in current folder.

        Best effort: any search failure is treated as "not a duplicate" and
        is logged at debug level instead of being raised.

        Returns:
            ``True`` if the search finds at least one message, else
            ``False``.
        """
        try:
            results = self._search(f'HEADER Message-ID "{msg_id}"')
        except Exception as e:
            logging.debug(f"Duplicate check failed for {msg_id}: {e}")
            return False
        return len(results) > 0

    def _create_folder(self, folder: str):
        """Private: Create a folder.

        Raises:
            Exception: if the server does not answer ``OK``.
        """
        typ, _ = self._imap.create(folder)
        if typ != "OK":
            raise Exception(f"Failed to create folder {folder}")

    def _append(self, folder: str, msg: bytes):
        """Private: Append message to folder.

        The message is stored with no flags and no explicit date.

        Raises:
            Exception: if the server does not answer ``OK``. Without this
                check a rejected message would be reported as forwarded.
        """
        typ, _ = self._imap.append(folder, "", None, msg)
        if typ != "OK":
            raise Exception(f"Failed to append message to folder {folder}")
|
|
|
|
|
|
class EmailForwarder:
    """Orchestrates the email forwarding process.

    Owns one source and one destination IMAP client, the set of already
    processed Message-IDs, and the timestamp of the previous run.
    """

    def __init__(self, config: dict):
        """Build both IMAP clients from ``config``; nothing is connected yet.

        Args:
            config: mapping with the ``source_*`` / ``dest_*`` connection
                keys, plus ``folders``, ``processed_file``,
                ``last_run_file`` and optionally ``dry_run``.
        """
        self.config = config
        self.source = SourceImap(
            config["source_host"],
            config["source_port"],
            config["source_user"],
            config["source_pass"],
        )
        self.dest = DestImap(
            config["dest_host"],
            config["dest_port"],
            config["dest_user"],
            config["dest_pass"],
        )
        self.processed_ids = set()
        self.last_run = None
        self.dry_run = config.get("dry_run", False)

    def load_state(self):
        """Load last run and processed IDs.

        ``last_run`` is read as an ISO-8601 timestamp from the last-run
        file; when that file does not exist the current UTC time is used.
        ``processed_ids`` is filled from the processed file (one ID per
        line, blank lines ignored) when that file exists.
        """
        last_run_file = self.config["last_run_file"]
        if os.path.exists(last_run_file):
            with open(last_run_file) as f:
                last_run_str = f.read().strip()
            self.last_run = datetime.fromisoformat(last_run_str)
        else:
            self.last_run = datetime.now(timezone.utc)

        processed_file = self.config["processed_file"]
        if os.path.exists(processed_file):
            with open(processed_file, "r") as f:
                self.processed_ids = set(line.strip() for line in f if line.strip())

    def save_state(self):
        """Save last run timestamp (current UTC time); skipped in dry-run."""
        if not self.dry_run:
            with open(self.config["last_run_file"], "w") as f:
                f.write(datetime.now(timezone.utc).isoformat())

    def sync_all_folders(self):
        """Sync all configured folders.

        Connects both clients, syncs each folder in turn and always
        disconnects afterwards, even when connecting or listing folders
        raises. A failure inside one folder is logged and does not stop the
        remaining folders.
        """
        total_forwarded = 0
        try:
            # ImapClient only exposes the underscore-prefixed connection
            # methods; there is no public connect()/disconnect().
            self.source._connect()
            self.dest._connect()

            folders_config = self.config["folders"]
            folders = self.source.get_folders_to_sync(folders_config)
            logging.info(f"Syncing folders: {folders}")

            for folder in folders:
                try:
                    uids = self.source.get_new_emails(
                        folder, self.last_run.strftime("%d-%b-%Y")
                    )
                    logging.info(f"Found {len(uids)} emails in {folder}")
                    forwarded = self.sync_folder(folder, uids)
                    total_forwarded += forwarded
                except Exception as e:
                    logging.error(f"Error syncing {folder}: {e}")
        finally:
            self._disconnect_all()

        logging.info(f"Total forwarded: {total_forwarded}")

    def _disconnect_all(self):
        """Private: Disconnect both clients, logging any cleanup failure."""
        for client in (self.source, self.dest):
            try:
                client._disconnect()
            except Exception as e:
                # A failed logout must not prevent closing the other client.
                logging.warning(f"Error during disconnect: {e}")

    def sync_folder(self, folder: str, uids: list):
        """Sync emails in a folder.

        For each identifier the raw message is fetched from the source.
        Messages without a Message-ID, already recorded in
        ``processed_ids``, or already present in the destination folder are
        skipped. Otherwise the message is appended to the destination and
        its ID is recorded in memory and in the processed file. In dry-run
        mode nothing is written; the action is only logged.

        Args:
            folder: folder name, used on both source and destination.
            uids: identifiers as ``bytes``, as returned by the search.

        Returns:
            Number of messages actually forwarded (always 0 in dry-run).
        """
        forwarded = 0
        dest_ready = False  # destination folder selected for this sync?
        for uid in uids:
            uid_str = uid.decode()
            try:
                raw_msg = self.source._fetch(uid_str)

                msg = message_from_bytes(raw_msg)
                msg_id = msg.get("Message-ID")
                if not msg_id:
                    continue
                # IDs loaded from disk are stripped, so compare like for like.
                msg_id = msg_id.strip()
                if not msg_id or msg_id in self.processed_ids:
                    continue

                if not self.dry_run and not dest_ready:
                    # check_duplicate searches the destination's *current*
                    # folder, so the right folder must be selected first.
                    self.dest.ensure_folder_exists(folder)
                    dest_ready = True

                if self.dest.check_duplicate(msg_id):
                    continue

                if not self.dry_run:
                    self.dest.append_email(folder, raw_msg)
                    self.processed_ids.add(msg_id)
                    with open(self.config["processed_file"], "a") as f:
                        f.write(msg_id + "\n")
                    forwarded += 1
                    logging.info(f"Forwarded {msg_id} from {folder}")
                else:
                    logging.info(f"Dry-run: Would forward {msg_id} from {folder}")
            except Exception as e:
                logging.error(f"Error processing UID {uid_str}: {e}")
        return forwarded

    def run(self):
        """Run the forwarding process: load state, sync, save state."""
        self.load_state()
        self.sync_all_folders()
        self.save_state()
|
|
|
|
|
|
def load_config():
    """Build the configuration dict from environment variables.

    ``load_dotenv()`` is called first. Ports default to ``993``,
    ``FOLDERS`` defaults to ``INBOX``, and ``DRY_RUN`` is true only when its
    lower-cased value equals ``"true"``. The state file names are fixed.

    Returns:
        dict: the assembled configuration.

    Raises:
        ValueError: naming the first of the source/destination host, user
            or password settings that is unset or empty.
    """
    load_dotenv()

    env = os.getenv
    config = {
        "source_host": env("SOURCE_HOST"),
        "source_port": int(env("SOURCE_PORT", "993")),
        "source_user": env("SOURCE_USER"),
        "source_pass": env("SOURCE_PASS"),
        "dest_host": env("DEST_HOST"),
        "dest_port": int(env("DEST_PORT", "993")),
        "dest_user": env("DEST_USER"),
        "dest_pass": env("DEST_PASS"),
        "folders": env("FOLDERS", "INBOX"),
        "processed_file": "processed_message_ids.txt",
        "last_run_file": "last_run.txt",
        "dry_run": env("DRY_RUN", "false").lower() == "true",
    }

    # Connection settings that have no usable default.
    mandatory = (
        "source_host",
        "source_user",
        "source_pass",
        "dest_host",
        "dest_user",
        "dest_pass",
    )
    for key in mandatory:
        value = config[key]
        if value:
            continue
        raise ValueError(f"{key} not set")

    return config
|
|
|
|
|
|
def main():
    """Script entry point.

    Loads the configuration, builds an ``EmailForwarder`` and runs it. Any
    exception is logged as an error and the process exits with status 1.
    """
    try:
        settings = load_config()
        EmailForwarder(settings).run()
    except Exception as e:
        logging.error(f"Error: {e}")
        sys.exit(1)
|
|
|
|
|
|
# Run the forwarder only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|