#!/usr/bin/env python3
"""
Server-side search and move for IMAP mailboxes.
"""
import re
import sys
import logging.config
import configparser
import argparse
from locale import setlocale, LC_ALL
from datetime import datetime, timedelta
from getpass import getpass
from imaplib import IMAP4, IMAP4_SSL
from dataclasses import dataclass, field
from typing import Tuple, List, Optional, ClassVar
class ImapSearch:
    """
    Parse matches as configured into individual search criteria.

    Criteria are collected with `add()` (or in bulk with `from_list()`) and retrieved with `build()`.
    Relative dates such as "7 days ago" are resolved against a reference time that is taken once
    when the class is defined, so all criteria of one run refer to the same point in time.
    """

    # Reference time (naive, UTC) for relative dates, evaluated once at class definition.
    _now: ClassVar[datetime] = datetime.utcnow()

    # English month abbreviations used in date arguments. A fixed table is used instead of
    # strftime("%b") so that the result does not depend on the process locale.
    _MONTHS: ClassVar[Tuple[str, ...]] = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
                                          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")

    @dataclass(frozen=True)
    class Criterion:
        """Single search key (`command`) together with its not yet quoted string `arguments`."""
        command: List[str]
        arguments: List[str] = field(default_factory=list)

    def __init__(self) -> None:
        self._criteria: List[ImapSearch.Criterion] = []

    def add(self, key: str, params: List[str]) -> None:
        """
        Append one criterion.

        :param key: search key, matched case-insensitively
        :param params: whitespace-separated words following the key in the configuration
        :raises ValueError: for an unknown key, a wrong number of parameters, or an unparsable date
        """
        key = key.upper()
        if key in {"CC", "FROM", "TO", "SUBJECT", "TEXT"} and params:
            # the words form a single search string again
            self._criteria.append(self.Criterion([key], [" ".join(params)]))
        elif key in {"HEADER"} and params:  # with name in IMAPv4 syntax
            # first word is the header field name, the (possibly empty) rest is the value
            self._criteria.append(self.Criterion([key], [params[0], " ".join(params[1:])]))
        elif key in {"ALL", "SEEN", "UNSEEN"} and not params:
            self._criteria.append(self.Criterion([key], []))
        elif key in {"BEFORE", "SINCE", "SENTBEFORE", "SENTSINCE"} and params:
            self._criteria.append(self.Criterion([key], [self._parse_time_ago(" ".join(params))]))
        else:
            raise ValueError(f"Unsupported search criterion '{key}' with {len(params)} arguments")

    def build(self) -> List[Criterion]:
        """Return the collected criteria, or a single 'ALL' criterion if none have been added."""
        return self._criteria if self._criteria else [self.Criterion(["ALL"])]

    @classmethod
    def _parse_time_ago(cls, s: str) -> str:
        """
        Convert a relative date of the form '<number> days ago' into 'DD-Mon-YYYY'.

        :raises ValueError: if the string has a different form or the resulting date is out of range
        """
        match: Optional[re.Match] = re.fullmatch("([0-9]+) (days) ago", s)
        if match is None:
            raise ValueError(f"Cannot parse time ago: '{s}'")
        try:
            # The subtraction belongs inside the try block: a delta that is valid on its own can
            # still move the date out of the supported range, which raises OverflowError.
            then: datetime = cls._now - timedelta(**{match.group(2): int(match.group(1))})
        except (OverflowError, ValueError) as e:
            raise ValueError(f"Cannot parse time ago: '{s}': {str(e)}") from None
        return f"{then.day:02d}-{cls._MONTHS[then.month - 1]}-{then.year:04d}"

    @classmethod
    def from_list(cls, criteria: List[List[str]]) -> 'ImapSearch':
        """
        Create a search from a list of criteria, each given as a key followed by its parameters.

        Empty entries are skipped.

        :raises ValueError: if any entry is rejected by `add()`
        """
        search: ImapSearch = cls()
        for criterion in criteria:
            if criterion:
                search.add(criterion[0], criterion[1:])
        return search
class Imap:
    """
    Wrap IMAP4 class with capability checks, response error handling, and convenience operations.

    Protocol and connection failures are reported as RuntimeError. Strings that cannot be sent as
    quoted strings (mailbox names, search arguments) are reported as ValueError.
    """

    def __init__(self, host: str, port: int, user: str, password: str, utf: bool, ro: bool) -> None:
        """
        Open a TLS connection to the server; no login is performed yet.

        :param utf: use UTF-8 mode, i.e. allow non-ASCII characters in quoted strings
        :param ro: read-only mode, mailboxes are selected read-only and moves are skipped
        :raises RuntimeError: if the connection cannot be established
        """
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self._host: str = host
        self._port: int = port
        self._user: str = user
        self._pass: str = password
        self._utf: bool = utf
        self._ro: bool = ro
        try:
            self._imap: IMAP4 = IMAP4_SSL(host=self._host, port=self._port)
        except (IMAP4.error, OSError) as e:
            raise RuntimeError(f"Cannot connect to {self._host}:{self._port}: {str(e)}") from None

    def __str__(self) -> str:
        return f"<{self.__class__.__name__} {self._user}@{self._host}:{self._port}>"

    def _quote(self, s: str) -> str:
        """
        Return the string as IMAP quoted string with backslashes and double quotes escaped.

        :raises ValueError: if the string cannot be represented as quoted string
        """
        # fullmatch instead of "^...$", as "$" would also accept a trailing newline
        if not self._utf and re.fullmatch("[ -~]*", s) is None:
            raise ValueError(f"String literals not supported, cannot escape '{s}'")
        # quoted strings can never contain line breaks or NUL, not even in UTF-8 mode
        if re.search("[\\x00\\r\\n]", s) is not None:
            raise ValueError(f"String literals not supported, cannot escape {s!r}")
        return "".join(['"', s.replace('\\', '\\\\').replace('"', '\\"'), '"'])

    @classmethod
    def _check_response(cls, response: Tuple[str, List[Optional[bytes]]], expected: str = "OK") -> str:
        """
        Validate a (type, [data]) response and return its single data item decoded as text.

        :param expected: required response type
        :return: the decoded data, or an empty string if the data item is None
        :raises RuntimeError: if the response has an unexpected structure or type
        """
        if not isinstance(response, tuple) or len(response) != 2 or not isinstance(response[0], str):
            raise RuntimeError("Invalid response")
        if response[0] != expected:
            raise RuntimeError(f"Response '{response[0]}', expected '{expected}'")
        if not isinstance(response[1], list) or len(response[1]) != 1:
            raise RuntimeError("Invalid response value")
        value = response[1][0]
        if value is None:
            return ''
        if isinstance(value, bytes):
            return value.decode("utf-8", errors="replace")
        raise RuntimeError("Invalid response value value")

    def handshake(self, check_capabilities: bool) -> None:
        """
        Optionally verify the needed server capabilities, log in, and enable UTF-8 if configured.

        :raises RuntimeError: if a capability is missing, or login or enabling UTF-8 fails
        """
        self._logger.debug(f"CAPABILITIES: {', '.join(self._imap.capabilities)}")
        if check_capabilities:  # preliminary check for needed capabilities
            if "AUTH=PLAIN" not in self._imap.capabilities:
                raise RuntimeError("No plaintext login capability")
            if "MOVE" not in self._imap.capabilities:
                raise RuntimeError("No 'MOVE' capability")
            if self._utf and "ENABLE" not in self._imap.capabilities:
                raise RuntimeError("No 'ENABLE' capability for UTF8")
        if self._utf and hasattr(self._imap, '_mode_utf8'):
            # ENABLE only valid in authenticated state, so already fake it to allow i.e. special chars in passwords
            self._imap._mode_utf8()  # noqa
        self.login()
        if self._utf:
            self._enable("UTF8=ACCEPT")

    def login(self) -> None:
        """
        Log in with the configured user and password.

        :raises RuntimeError: if the login fails or the user name cannot be quoted
        """
        try:
            self._logger.debug("LOGIN")
            self._check_response(self._imap.login(user=self._quote(self._user), password=self._pass))  # type: ignore
        except (IMAP4.error, RuntimeError, ValueError) as e:
            # ValueError comes from quoting the user name; callers of the handshake only expect RuntimeError
            raise RuntimeError(f"Cannot login: {str(e)}") from None

    def logout(self) -> None:
        """
        Log out from the server.

        :raises RuntimeError: if the server does not answer with 'BYE'
        """
        try:
            self._logger.debug("LOGOUT")
            self._check_response(self._imap.logout(), "BYE")  # type: ignore
        except (IMAP4.error, RuntimeError) as e:
            raise RuntimeError(f"Cannot logout: {str(e)}") from None

    def _enable(self, capability: str) -> None:
        try:
            self._logger.debug(f"ENABLE '{capability}'")
            self._check_response(self._imap.enable(capability))
        except (IMAP4.error, RuntimeError) as e:
            raise RuntimeError(f"Cannot enable '{capability}': {str(e)}") from None

    def _select(self, mailbox: str) -> None:
        """
        Select the given mailbox, read-only if configured so.

        :raises ValueError: if the mailbox name cannot be quoted
        :raises RuntimeError: if the server rejects the selection
        """
        try:
            self._logger.debug(f"SELECT '{mailbox}'")
            self._check_response(self._imap.select(mailbox=self._quote(mailbox), readonly=self._ro))
        except (IMAP4.error, RuntimeError) as e:
            raise RuntimeError(f"Cannot select mailbox '{mailbox}': {str(e)}") from None

    def _search(self, criteria: List['ImapSearch.Criterion']) -> List[str]:
        """
        Search the selected mailbox for mails matching all criteria and return their UIDs.

        :raises ValueError: if an argument cannot be quoted
        :raises RuntimeError: if the server rejects the search
        """
        conditions: List[str] = [" ".join(criterion.command + [self._quote(_) for _ in criterion.arguments])
                                 for criterion in criteria]
        try:
            self._logger.debug(f"SEARCH {' '.join(conditions)}")
            response: str = self._check_response(self._imap.uid("SEARCH", " ".join(conditions)))
            return response.split(" ") if response else []
        except (IMAP4.error, RuntimeError) as e:
            raise RuntimeError(f"Cannot search: {str(e)}") from None

    def _move(self, mailbox: str, mail_ids: List[str]) -> None:
        """
        Move the mails with the given UIDs from the selected mailbox to the given one.

        Does nothing in read-only mode.

        :raises ValueError: if the mailbox name cannot be quoted
        :raises RuntimeError: if the server rejects the move
        """
        if self._ro:
            return
        try:
            self._logger.debug(f"MOVE #{len(mail_ids)}")
            # quote like for SELECT, so quotes and backslashes in the mailbox name are escaped
            self._check_response(self._imap.uid("MOVE", '{} {}'.format(",".join(mail_ids), self._quote(mailbox))))
        except (IMAP4.error, RuntimeError) as e:
            raise RuntimeError(f"Cannot move: {str(e)}") from None

    def search_and_move(self, from_mailbox: str, to_mailbox: str, criteria: List['ImapSearch']) -> None:
        """
        Select the source mailbox and move the mails found by each search to the target mailbox.

        Each search is run and moved on its own, so a mail is moved as soon as any search matches.

        :raises ValueError: if a mailbox name or search argument cannot be quoted
        :raises RuntimeError: if the server rejects a command
        """
        self._select(from_mailbox)
        for criterion in criteria:  # TODO: support OR (IMAP4) and merge moves for same mailboxes
            mail_ids: List[str] = self._search(criterion.build())
            self._logger.info(f"Found {len(mail_ids)} mails in '{from_mailbox}' for '{to_mailbox}'")
            if mail_ids:
                self._move(to_mailbox, mail_ids)
class Config:
    """
    Parse ini-style config file into dataclasses.

    Sections are named '<type>.<name>', with the type being either 'account' or 'match'.
    """

    @dataclass(frozen=True)
    class Account:
        """Connection settings of one account, from an 'account.<name>' section."""
        name: str
        host: str
        port: int
        user: str
        password: Optional[str]  # None if not configured
        utf: bool  # defaults to False

    @dataclass(frozen=True)
    class Match:
        """One search and move operation, from a 'match.<name>' section."""
        name: str
        account: str  # name of the account this operation belongs to
        match_any: bool  # defaults to False
        match: List[List[str]]  # one entry per line, split into words; defaults to empty
        from_mailbox: List[str]  # one entry per line
        to_mailbox: str

    def __init__(self, accounts: List[Account], matches: List[Match]) -> None:
        self.accounts: List[Config.Account] = accounts
        self.matches: List[Config.Match] = matches

    @classmethod
    def _getlist(cls, val: str) -> List[str]:
        """Split a multi-line value into its stripped, non-empty lines."""
        return [_ for _ in [line.strip() for line in val.split("\n")] if _]

    @classmethod
    def _getlistlist(cls, val: str) -> List[List[str]]:
        """Split a multi-line value into its non-empty lines, each split into words at whitespace."""
        return [line.split(None) for line in cls._getlist(val)]

    @classmethod
    def _parse(cls, config_parser: configparser.ConfigParser) -> 'Config':
        """
        Convert all sections of the given parser into accounts and matches.

        :raises ValueError: for invalid or unknown sections, and for missing or invalid options
        """
        accounts: List[Config.Account] = []
        matches: List[Config.Match] = []
        for section in config_parser.sections():
            if '.' not in section:
                raise ValueError(f"Invalid section '{section}'")
            section_type, name = section.split(".", maxsplit=1)
            try:
                if section_type == "account":
                    accounts.append(cls.Account(
                        name=name,
                        host=config_parser.get(section, "host"),
                        port=config_parser.getint(section, "port"),
                        user=config_parser.get(section, "user"),
                        password=config_parser.get(section, "password", fallback=None),
                        utf=config_parser.getboolean(section, "utf", fallback=False)
                    ))
                elif section_type == "match":
                    matches.append(cls.Match(
                        name=name,
                        account=config_parser.get(section, "account"),
                        match_any=config_parser.getboolean(section, "match_any", fallback=False),
                        match=config_parser.getlistlist(section, "match", fallback=[]),  # type: ignore
                        from_mailbox=config_parser.getlist(section, "from_mailbox"),  # type: ignore
                        to_mailbox=config_parser.get(section, "to_mailbox")
                    ))
                else:
                    raise ValueError("Unknown section")
            except (ValueError, configparser.Error) as e:
                # Missing required options and interpolation problems are reported by the parser as
                # configparser.Error, which is not a ValueError and must be converted here.
                raise ValueError(f"Cannot parse {section_type} '{name}': {str(e)}") from None
        return cls(accounts, matches)

    @classmethod
    def from_ini(cls, filename: str) -> 'Config':
        """
        Read and parse the given configuration file.

        :raises RuntimeError: if the file cannot be read or its contents are invalid
        """
        try:
            config_parser: configparser.ConfigParser = configparser.ConfigParser(
                converters={'list': cls._getlist, 'listlist': cls._getlistlist}
            )
            with open(filename, "r") as fp:
                config_parser.read_file(fp)
        except (OSError, configparser.Error, UnicodeDecodeError) as e:
            raise RuntimeError(f"Cannot read config '{filename}': {str(e)}") from None
        try:
            return cls._parse(config_parser)
        except ValueError as e:
            raise RuntimeError(f"Cannot parse config '{filename}': {str(e)}") from None
def main(config_file: str, dry_run: bool, force: bool, account_name: Optional[str], match_name: Optional[str]) -> bool:
    """
    Run all configured search and move operations.

    :param config_file: path of the ini-style configuration file
    :param dry_run: only search, connect in read-only mode and do not move anything
    :param force: skip the check for needed server capabilities
    :param account_name: if given, only process the account with this name
    :param match_name: if given, only process matches with this name
    :return: False if the configuration could not be loaded or any operation failed, True otherwise
    """
    logger: logging.Logger = logging.getLogger(main.__name__)

    # parse config
    try:
        config: Config = Config.from_ini(config_file)
    except RuntimeError as e:
        logger.error(str(e))
        return False
    else:
        logger.debug(f"Loaded {len(config.matches)} operations on {len(config.accounts)} accounts")

    # process defined accounts
    success: bool = True
    for account in config.accounts:
        if account_name is not None and account.name != account_name:
            continue

        # ask for password if not specified in config
        password: Optional[str] = account.password
        if password is None:
            try:
                if account.utf:
                    password = input(f"Password for '{account.user}' on {account.name}: ")
                else:
                    password = getpass(f"Password for '{account.user}' on {account.name}: ")
            except (KeyboardInterrupt, EOFError, Warning):
                # aborted prompt or closed stdin, the account is skipped below
                pass
            except (OSError, UnicodeDecodeError) as e:
                logger.error(str(e))
                success = False
        if password is None:
            logger.warning(f"Skipping '{account.name}', no password given")
            continue

        imap: Optional[Imap] = None
        logout_attempted: bool = False
        try:
            # connect, login, and set capabilities
            logger.debug(f"Connecting to {account.host}:{account.port} as '{account.user}'")
            imap = Imap(host=account.host, port=account.port,
                        user=account.user, password=password,
                        utf=account.utf, ro=dry_run)
            imap.handshake(check_capabilities=not force)

            for match in config.matches:
                if match.account != account.name:
                    continue
                if match_name is not None and match.name != match_name:
                    continue
                logger.info(f"Checking {match.name} on {account.name}")

                # parse search matches
                try:
                    search: List[ImapSearch]
                    if match.match_any:  # OR: one search per criterion
                        search = [ImapSearch.from_list([criterion]) for criterion in match.match]
                    else:  # AND: one search with all criteria
                        search = [ImapSearch.from_list(match.match)]
                except ValueError as e:
                    logger.warning(f"Cannot parse match for '{match.name}': {str(e)}")
                    success = False
                    continue

                # do the search and move
                try:
                    for mailbox in match.from_mailbox:
                        imap.search_and_move(mailbox, match.to_mailbox, search)
                except ValueError as e:
                    logger.warning(f"{str(imap)}: {str(e)}")
                    success = False
                except RuntimeError as e:
                    logger.error(f"{str(imap)}: {str(e)}")
                    success = False

            logger.debug(f"Logout at {account.host}:{account.port} as '{account.user}'")
            logout_attempted = True  # set first, so a failing logout is not tried again below
            imap.logout()
        except RuntimeError as e:
            logger.error(str(e))
            success = False
        finally:
            # Do not leave the connection open if the handshake failed after connecting. The
            # actual error has already been reported, so a failure here is only of debug interest.
            if imap is not None and not logout_attempted:
                try:
                    imap.logout()
                except RuntimeError as e:
                    logger.debug(f"Logout after failure: {str(e)}")

    return success
def setup_logging(debug: bool) -> None:
    """
    Configure the logging module to write all messages to stderr.

    :param debug: log from level DEBUG instead of INFO
    """
    logging.raiseExceptions = True
    # module-level switches of the logging package that are turned off
    for switch in ("logThreads", "logMultiprocessing", "logProcesses"):
        setattr(logging, switch, False)

    formatter = {
        'format': '[%(levelname)s] %(name)s: %(message)s',
    }
    handler = {
        'formatter': 'standard',
        'class': 'logging.StreamHandler',
        'stream': 'ext://sys.stderr',
    }
    # registered under the empty name, which refers to the root logger
    root = {
        'handlers': ['default'],
        'level': 'DEBUG' if debug else 'INFO',
        'propagate': False,
    }
    logging.config.dictConfig({
        'version': 1,
        'formatters': {'standard': formatter},
        'handlers': {'default': handler},
        'loggers': {'': root},
    })
if __name__ == "__main__":
    # command line interface; all switches default to off
    parser = argparse.ArgumentParser(
        description=__doc__.strip(),
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument('--verbose', action='store_true',
                        help='enable debug logging')
    parser.add_argument('--dry-run', action='store_true',
                        help='only search, do not actually move (read-only mode)')
    parser.add_argument('--force', action='store_true',
                        help='skip feature detection checks for needed capabilities')
    parser.add_argument('--config', metavar="CONFIG.INI", type=str, default='./imapsrt.ini',
                        help='path to configuration file')
    parser.add_argument('--limit-account', metavar="NAME", type=str, default=None,
                        help='only process matches for the given account name')
    parser.add_argument('--limit-match', metavar="NAME", type=str, default=None,
                        help='only process matches with the given name')
    arguments = parser.parse_args()

    setlocale(LC_ALL, "C")  # for strptime/strftime %b
    setup_logging(arguments.verbose)

    # exit status 0 if everything succeeded, 1 otherwise
    sys.exit(int(not main(config_file=arguments.config,
                          dry_run=arguments.dry_run,
                          force=arguments.force,
                          account_name=arguments.limit_account,
                          match_name=arguments.limit_match)))