Refactor main.py for readability and maintainability

Introduce class-based architecture with inheritance for IMAP clients:
- ImapClient base class with private low-level methods
- SourceImap and DestImap subclasses for specific operations
- EmailForwarder orchestrator class for coordination
- Global load_config and main functions

Improvements:
- Clear separation of concerns and encapsulation
- Private methods for internal IMAP calls
- Better error handling and logging
- Maintains all original functionality
This commit is contained in:
Jeena 2026-01-04 16:41:58 +09:00
parent 485a5db8b9
commit f70f1853f0

395
main.py
View file

@ -3,186 +3,277 @@ import os
import logging
import sys
from dotenv import load_dotenv
from datetime import datetime, timezone, timedelta
from datetime import datetime, timezone
from email import message_from_bytes
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
load_dotenv()
# Configuration
source_host = os.getenv("SOURCE_HOST")
if not source_host:
raise ValueError("SOURCE_HOST not set")
source_port = int(os.getenv("SOURCE_PORT", "993"))
source_user = os.getenv("SOURCE_USER")
if not source_user:
raise ValueError("SOURCE_USER not set")
source_pass = os.getenv("SOURCE_PASS")
if not source_pass:
raise ValueError("SOURCE_PASS not set")
dest_host = os.getenv("DEST_HOST")
if not dest_host:
raise ValueError("DEST_HOST not set")
dest_port = int(os.getenv("DEST_PORT", "993"))
dest_user = os.getenv("DEST_USER")
if not dest_user:
raise ValueError("DEST_USER not set")
dest_pass = os.getenv("DEST_PASS")
if not dest_pass:
raise ValueError("DEST_PASS not set")
folders_config = os.getenv("FOLDERS", "INBOX")
processed_msg_file = "processed_message_ids.txt"
last_run_file = "last_run.txt"
dry_run = os.getenv("DRY_RUN", "false").lower() == "true"
class ImapClient:
"""Base class for IMAP operations."""
def __init__(self, host: str, port: int, user: str, password: str):
self.host = host
self.port = port
self.user = user
self.password = password
self.imap = None
def connect(self):
"""Establish IMAP connection."""
self._connect()
def disconnect(self):
"""Close IMAP connection."""
self._disconnect()
def _connect(self):
"""Private: Connect and login to IMAP."""
self.imap = imaplib.IMAP4_SSL(self.host, self.port)
self.imap.login(self.user, self.password)
def _disconnect(self):
"""Private: Logout and close connection."""
if self.imap:
self.imap.logout()
self.imap = None
def _list_folders(self):
"""Private: List all folders."""
typ, data = self.imap.list()
if typ != "OK":
raise Exception("Failed to list folders")
folders = []
for item in data:
if item:
parts = item.decode().split(' "/" ')
if len(parts) > 1:
folder_name = parts[1].strip('"')
folders.append(folder_name)
return folders
def _select_folder(self, folder: str):
"""Private: Select a folder."""
typ, _ = self.imap.select(folder)
if typ != "OK":
raise Exception(f"Failed to select folder {folder}")
def _search(self, criteria: str):
"""Private: Search emails."""
typ, data = self.imap.search(None, criteria)
if typ != "OK":
raise Exception(f"Search failed: {criteria}")
return data[0].split()
def _fetch(self, uid: str):
"""Private: Fetch raw email."""
typ, data = self.imap.fetch(uid, "(RFC822)")
if typ != "OK":
raise Exception(f"Failed to fetch UID {uid}")
return data[0][1]
def main():
# Last run
if os.path.exists(last_run_file):
with open(last_run_file) as f:
last_run_str = f.read().strip()
last_run = datetime.fromisoformat(last_run_str)
else:
last_run = datetime.now(timezone.utc) - timedelta(hours=1)
since_date = last_run.strftime("%d-%b-%Y")
logging.info(f"Using SINCE {since_date}")
class SourceImap(ImapClient):
"""Handles source IMAP operations."""
# Load processed Message-IDs
processed_msg = set()
if os.path.exists(processed_msg_file):
with open(processed_msg_file, "r") as f:
processed_msg = set(line.strip() for line in f if line.strip())
try:
# Connect to source IMAP for listing
source_imap_list = imaplib.IMAP4_SSL(source_host, source_port)
source_imap_list.login(source_user, source_pass)
# Determine folders to sync
if folders_config == "all":
# List all folders
typ, folder_list = source_imap_list.list()
if typ != "OK":
raise Exception("Failed to list folders")
folders = []
for f in folder_list:
if f:
# Parse folder name, e.g., '(\\HasNoChildren) "/" "INBOX"'
parts = f.decode().split(' "/" ')
if len(parts) > 1:
folder_name = parts[1].strip('"')
# Skip system folders if desired (optional)
if folder_name not in ["Trash", "Junk", "Spam"]:
folders.append(folder_name)
def get_folders_to_sync(self, config_folders: str):
"""Get list of folders to sync based on config."""
if config_folders == "all":
folders = self._list_folders()
# Skip system folders
return [f for f in folders if f not in ["Trash", "Junk", "Spam"]]
else:
folders = [f.strip() for f in folders_config.split(",")]
return [f.strip() for f in config_folders.split(",")]
source_imap_list.logout()
def get_new_emails(self, folder: str, since_date: str):
"""Get UIDs of new emails since date."""
self._select_folder(folder)
uids = self._search(f'SINCE "{since_date}"')
return uids
class DestImap(ImapClient):
"""Handles destination IMAP operations."""
def ensure_folder_exists(self, folder: str):
"""Ensure folder exists, create if needed."""
try:
self._select_folder(folder)
except:
self._create_folder(folder)
self._select_folder(folder)
def append_email(self, folder: str, msg: bytes):
"""Append email to folder, ensuring it exists."""
self.ensure_folder_exists(folder)
self._append(folder, msg)
def check_duplicate(self, msg_id: str):
"""Check if Message-ID exists in current folder."""
try:
results = self._search(f'HEADER Message-ID "{msg_id}"')
return len(results) > 0
except:
return False
def _create_folder(self, folder: str):
"""Private: Create a folder."""
typ, _ = self.imap.create(folder)
if typ != "OK":
raise Exception(f"Failed to create folder {folder}")
def _append(self, folder: str, msg: bytes):
"""Private: Append message to folder."""
self.imap.append(folder, "", None, msg)
class EmailForwarder:
"""Orchestrates the email forwarding process."""
def __init__(self, config: dict):
self.config = config
self.source = SourceImap(
config["source_host"],
config["source_port"],
config["source_user"],
config["source_pass"],
)
self.dest = DestImap(
config["dest_host"],
config["dest_port"],
config["dest_user"],
config["dest_pass"],
)
self.processed_ids = set()
self.last_run = None
self.dry_run = config.get("dry_run", False)
def load_state(self):
"""Load last run and processed IDs."""
last_run_file = self.config["last_run_file"]
if os.path.exists(last_run_file):
with open(last_run_file) as f:
last_run_str = f.read().strip()
self.last_run = datetime.fromisoformat(last_run_str)
else:
self.last_run = datetime.now(timezone.utc)
processed_file = self.config["processed_file"]
if os.path.exists(processed_file):
with open(processed_file, "r") as f:
self.processed_ids = set(line.strip() for line in f if line.strip())
def save_state(self):
"""Save last run and processed IDs."""
if not self.dry_run:
with open(self.config["last_run_file"], "w") as f:
f.write(datetime.now(timezone.utc).isoformat())
def sync_all_folders(self):
"""Sync all configured folders."""
self.source.connect()
self.dest.connect()
folders_config = self.config["folders"]
folders = self.source.get_folders_to_sync(folders_config)
logging.info(f"Syncing folders: {folders}")
# Connect to destination IMAP
dest_imap = imaplib.IMAP4_SSL(dest_host, dest_port)
dest_imap.login(dest_user, dest_pass)
total_forwarded = 0
for folder in folders:
try:
# Connect to source IMAP per folder
source_imap = imaplib.IMAP4_SSL(source_host, source_port)
source_imap.login(source_user, source_pass)
source_imap.select(folder)
uids = self.source.get_new_emails(
folder, self.last_run.strftime("%d-%b-%Y")
)
logging.info(f"Found {len(uids)} emails in {folder}")
forwarded = self.sync_folder(folder, uids)
total_forwarded += forwarded
except Exception as e:
logging.error(f"Error syncing {folder}: {e}")
# Search for emails since last run
typ, data = source_imap.search(None, f'SINCE "{since_date}"')
if typ != "OK":
logging.error(f"Search failed for {folder}: {typ}")
source_imap.logout()
self.source.disconnect()
self.dest.disconnect()
logging.info(f"Total forwarded: {total_forwarded}")
def sync_folder(self, folder: str, uids: list):
"""Sync emails in a folder."""
forwarded = 0
for uid in uids:
uid_str = uid.decode()
try:
raw_msg = self.source._fetch(uid_str)
msg = message_from_bytes(raw_msg)
msg_id = msg.get("Message-ID")
if not msg_id:
continue
if msg_id in self.processed_ids:
continue
if self.dest.check_duplicate(msg_id):
continue
uids = data[0].split()
logging.info(f"Found {len(uids)} emails in {folder} since {since_date}")
# Ensure dest folder exists
try:
dest_imap.select(folder)
except:
logging.info(f"Creating folder {folder} in dest")
dest_imap.create(folder)
dest_imap.select(folder)
forwarded = 0
for uid in uids:
uid_str = uid.decode()
# Fetch raw message
typ, msg_data = source_imap.fetch(uid, "(RFC822)")
if typ != "OK":
logging.error(f"Failed to fetch UID {uid_str} in {folder}")
continue
raw_msg = msg_data[0][1]
msg = message_from_bytes(raw_msg)
msg_id = msg["Message-ID"]
if not msg_id:
logging.warning(
f"No Message-ID for UID {uid_str} in {folder}, skipping"
)
continue
if msg_id in processed_msg:
logging.info(f"Skipping already processed Message-ID {msg_id}")
continue
# Check dest for duplicate
typ, dest_data = dest_imap.search(
None, f'HEADER Message-ID "{msg_id}"'
)
if typ == "OK" and dest_data[0]:
logging.info(f"Skipping duplicate Message-ID {msg_id} in dest")
continue
if not dry_run:
# Append to destination
dest_imap.append(folder, "", None, raw_msg)
# Mark as processed
with open(processed_msg_file, "a") as f:
f.write(msg_id + "\n")
processed_msg.add(msg_id)
forwarded += 1
logging.info(
f"Forwarded email Message-ID {msg_id} from {folder}"
)
else:
logging.info(
f"Dry-run: Would forward Message-ID {msg_id} from {folder}"
)
source_imap.logout()
total_forwarded += forwarded
logging.info(
f"Completed syncing {folder}: forwarded {forwarded} emails"
)
if not self.dry_run:
self.dest.append_email(folder, raw_msg)
self.processed_ids.add(msg_id)
with open(self.config["processed_file"], "a") as f:
f.write(msg_id + "\n")
forwarded += 1
logging.info(f"Forwarded {msg_id} from {folder}")
else:
logging.info(f"Dry-run: Would forward {msg_id} from {folder}")
except Exception as e:
logging.error(f"Error syncing folder {folder}: {e}")
logging.error(f"Error processing UID {uid_str}: {e}")
return forwarded
dest_imap.logout()
def run(self):
"""Run the forwarding process."""
self.load_state()
self.sync_all_folders()
self.save_state()
if not dry_run:
# Update last run
with open(last_run_file, "w") as f:
f.write(datetime.now(timezone.utc).isoformat())
logging.info(
f"Email forwarding completed. Total forwarded {total_forwarded} new emails."
)
def load_config():
"""Load configuration from environment."""
load_dotenv()
config = {
"source_host": os.getenv("SOURCE_HOST"),
"source_port": int(os.getenv("SOURCE_PORT", "993")),
"source_user": os.getenv("SOURCE_USER"),
"source_pass": os.getenv("SOURCE_PASS"),
"dest_host": os.getenv("DEST_HOST"),
"dest_port": int(os.getenv("DEST_PORT", "993")),
"dest_user": os.getenv("DEST_USER"),
"dest_pass": os.getenv("DEST_PASS"),
"folders": os.getenv("FOLDERS", "INBOX"),
"processed_file": "processed_message_ids.txt",
"last_run_file": "last_run.txt",
"dry_run": os.getenv("DRY_RUN", "false").lower() == "true",
}
# Validate required
required = [
"source_host",
"source_user",
"source_pass",
"dest_host",
"dest_user",
"dest_pass",
]
for key in required:
if not config[key]:
raise ValueError(f"{key} not set")
return config
def main():
"""Main entry point."""
try:
config = load_config()
forwarder = EmailForwarder(config)
forwarder.run()
except Exception as e:
logging.error(f"Error during email forwarding: {e}")
logging.error(f"Error: {e}")
sys.exit(1)