123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491 |
- #! /usr/bin/env python3
- #
- # imapdedup.py
- #
- # Looks for duplicate messages in a set of IMAP mailboxes and removes all but the first.
- # Comparison is normally based on the Message-ID header.
- #
- # Default behaviour is purely to mark the duplicates as deleted. Some mail clients
- # will allow you to view these and undelete them if you change your mind.
- #
- # Copyright (c) 2013-2020 Quentin Stafford-Fraser.
- # All rights reserved, subject to the following:
- #
- #
- # This is free software; you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation; either version 2 of the License, or
- # (at your option) any later version.
- #
- # This software is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this software; if not, write to the Free Software
- # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
- # USA.
- #
- import getpass
- import hashlib
- import imaplib
- import os
- import optparse
- import re
- import socket
- import sys
- from typing import List, Dict, Tuple, Optional, Union, Type, Any
- from email.parser import BytesParser
- from email.message import Message
- from email.errors import HeaderParseError
- from email.header import decode_header
- # Increase the rather small limit on result line-length
- # imposed in certain imaplib versions.
- # imaplib._MAXLINE = max(2000000, imaplib._MAXLINE)
class ImapDedupException(Exception):
    """Raised when the IMAP server answers a command with a non-OK status."""
def check_response(resp: Tuple[str, List[bytes]]):
    """
    Unwrap a ``(status, data)`` response pair.

    IMAP responses should normally begin 'OK'; that status is stripped off
    here and only the data part is handed back.  Any other status raises
    ImapDedupException carrying the data the server sent.
    """
    outcome, payload = resp
    if outcome == "OK":
        return payload
    raise ImapDedupException("Got response: %s from server" % str(payload))
def get_arguments(args: List[str]) -> Tuple[optparse.Values, List[str]]:
    """
    Parse the command line.

    Returns the parsed option values together with the list of positional
    arguments (the mailbox names).  Exits with status 1 if neither a
    process nor a server/user pair is given, or if -m is used without -c.
    When no password is supplied (and no process is used) the password is
    taken from the IMAPDEDUP_PASSWORD environment variable, or prompted for.
    """
    parser = optparse.OptionParser(usage="%prog [options] <mailboxname> [<mailboxname> ...]")
    add = parser.add_option

    # How to reach the server
    add("-P", "--process", dest="process", help="IMAP process to access mailboxes")
    add("-s", "--server", dest="server", help="IMAP server")
    add("-p", "--port", dest="port", help="IMAP server port", type="int")
    add("-x", "--ssl", dest="ssl", action="store_true", help="Use SSL")
    add("-X", "--starttls", dest="starttls", action="store_true", help="Require STARTTLS")
    add("-u", "--user", dest="user", help="IMAP user name")
    add("-w", "--password", dest="password",
        help="IMAP password (Will prompt if not specified)")

    # What to do once connected
    add("-v", "--verbose", dest="verbose", action="store_true", help="Verbose mode")
    add("-n", "--dry-run", dest="dry_run", action="store_true",
        help="Don't actually do anything, just report what would be done")
    add("-c", "--checksum", dest="use_checksum", action="store_true",
        help="Use a checksum of several mail headers, instead of the Message-ID")
    add("-m", "--checksum-with-id", dest="use_id_in_checksum", action="store_true",
        help="Include the Message-ID (if any) in the -c checksum.")
    add("--no-close", dest="no_close", action="store_true",
        help='Do not "close" mailbox when done. Some servers will purge deleted messages on a close command.')
    add("-l", "--list", dest="just_list", action="store_true", help="Just list mailboxes")

    parser.set_defaults(
        verbose=False, ssl=False, dry_run=False, no_close=False, just_list=False
    )

    opts, mailbox_names = parser.parse_args(args)

    # Without a process we need both a server and a user to connect.
    if not opts.process and not (opts.server and opts.user):
        sys.stderr.write(
            "\nError: Must specify server, user, and at least one mailbox.\n\n"
        )
        parser.print_help()
        sys.exit(1)

    if not (opts.password or opts.process):
        # Read from IMAPDEDUP_PASSWORD env variable, or prompt for one.
        opts.password = os.getenv("IMAPDEDUP_PASSWORD") or getpass.getpass()

    if opts.use_id_in_checksum and not opts.use_checksum:
        sys.stderr.write("\nError: If you use -m you must also use -c.\n")
        sys.exit(1)

    return opts, mailbox_names
# Thanks to http://www.doughellmann.com/PyMOTW/imaplib/
# A LIST response line looks like:  (<flags>) "<delimiter>" <name>
# The hierarchy delimiter may also be the unquoted atom NIL (flat namespace),
# in which case the 'delimiter' group is left unmatched.
list_response_pattern = re.compile(
    rb'\((?P<flags>.*?)\) (?:"(?P<delimiter>.*)"|(?i:NIL)) (?P<name>.*)'
)


def parse_list_response(line: bytes) -> Tuple[bytes, Optional[bytes], bytes]:
    """
    Split one line of an IMAP LIST response into its three parts.

    Returns a tuple ``(flags, delimiter, mailbox_name)`` of bytes, where
    ``flags`` is the text between the parentheses, ``delimiter`` is the
    hierarchy delimiter without its quotes (or None when the server sent
    NIL), and ``mailbox_name`` has any surrounding double quotes removed.

    If the line cannot be parsed, an error is written to stderr and the
    program exits with status 1.
    """
    m = list_response_pattern.match(line)
    if m is None:
        sys.stderr.write("\nError: parsing list response '{}'".format(str(line)))
        sys.exit(1)
    flags, delimiter, mailbox_name = m.groups()
    mailbox_name = mailbox_name.strip(b'"')
    return (flags, delimiter, mailbox_name)
def str_header(parsed_message: Message, name: str) -> str:
    """
    Return the value (of the first instance, if more than one) of
    the given header, as a unicode string.

    Encoded words in the header are decoded with the charset they declare,
    and all decoded parts are joined together.  Undecodable bytes are
    dropped.  A header that is not present yields the empty string.

    Errors raised by decode_header for a malformed header are not caught
    here.
    """
    pieces = []
    for chunk, charset in decode_header(parsed_message.get(name, "")):
        if isinstance(chunk, str):
            pieces.append(chunk)
            continue
        try:
            pieces.append(chunk.decode(charset or "utf-8", "ignore"))
        except LookupError:
            # The header names a charset Python does not know; fall back
            # to UTF-8 rather than failing on the whole message.
            pieces.append(chunk.decode("utf-8", "ignore"))
    return "".join(pieces)
def get_message_id(
    parsed_message: Message, options_use_checksum=False, options_use_id_in_checksum=False
) -> Optional[str]:
    """
    Work out the identifier used to recognise duplicates of a message.

    By default this is the Message-ID header; if the message has none, a
    warning is printed and None is returned.

    With options_use_checksum, the identifier is instead the md5 hex digest
    of the From, To, Subject, Date, Cc and Bcc headers.  With
    options_use_id_in_checksum as well, the Message-ID header is added to
    the digest; otherwise it is left out.

    md5 is not collision-free, so although problems are extremely unlikely,
    a dry run reviewed before deletion is the safer way to use checksums.

    If the headers cannot be parsed, a warning is printed and None is
    returned so that the message is skipped.
    """
    try:
        if options_use_checksum:
            hashed_headers = ["From", "To", "Subject", "Date", "Cc", "Bcc"]
            if options_use_id_in_checksum:
                hashed_headers.append("Message-ID")
            digest = hashlib.md5()
            for header_name in hashed_headers:
                labelled = header_name + ":" + str_header(parsed_message, header_name)
                digest.update(labelled.encode())
            return digest.hexdigest()

        found_id = str_header(parsed_message, "Message-ID")
        if found_id:
            return found_id

        subject = str_header(parsed_message, "Subject")
        date = str_header(parsed_message, "Date")
        print("Message '%s' dated '%s' has no Message-ID header." % (subject, date))
        print("You might want to use the -c option.")
        return None
    except (ValueError, HeaderParseError):
        print(
            "WARNING: There was an exception trying to parse the headers of this message."
        )
        print("It may be corrupt, and you might consider deleting it.")
        summary = "Subject: %s\nFrom: %s\nDate: %s\n" % (
            parsed_message["Subject"],
            parsed_message["From"],
            parsed_message["Date"],
        )
        print(summary)
        print("Message skipped.")
        return None
def get_mailbox_list(server: imaplib.IMAP4) -> List[str]:
    """
    Return a list of usable mailbox names

    Every mailbox reported by the server's LIST command is included,
    except those flagged \\Noselect, which cannot be selected.
    """
    names = []
    for line in check_response(server.list()):
        if line is None:
            # imaplib reports "no data" as a list holding a single None
            continue
        flags, _delimiter, mailbox_name = parse_list_response(line)
        # The flag arrives with ONE backslash; compare it case-insensitively.
        if b"\\noselect" in flags.lower():
            continue
        names.append(mailbox_name.decode())
    return names
def get_deleted_msgnums(server: imaplib.IMAP4) -> List[int]:
    """
    Search the currently selected folder for messages flagged as deleted
    and return their message numbers as a list of ints.
    """
    found = check_response(server.search(None, "DELETED"))
    if not found:
        # None or empty: nothing to report
        return []
    # Otherwise the first item is a whitespace-separated run of msg ids
    return list(map(int, found[0].split()))
def get_undeleted_msgnums(server: imaplib.IMAP4) -> List[int]:
    """
    Search the currently selected folder for messages NOT flagged as
    deleted and return their message numbers as a list of ints.
    """
    found = check_response(server.search(None, "UNDELETED"))
    if not found:
        # None or empty: nothing to report
        return []
    # Otherwise the first item is a whitespace-separated run of msg ids
    return list(map(int, found[0].split()))
def mark_messages_deleted(server: imaplib.IMAP4, msgs_to_delete: List[int]):
    """
    Add the \\Deleted flag to each of the given message numbers in the
    currently selected folder, using a single STORE command.

    Does nothing when the list is empty, since a STORE with an empty
    message set is not a valid command.  Raises ImapDedupException if the
    server does not answer OK.
    """
    if not msgs_to_delete:
        return
    message_ids = ",".join(map(str, msgs_to_delete))
    check_response(
        server.store(message_ids, "+FLAGS", r"(\Deleted)")
    )
def get_msg_headers(server: imaplib.IMAP4, msg_ids: List[int]) -> List[Tuple[int, bytes]]:
    """
    Fetch the raw header block of each message in the list of provided IDs.

    Return a list of tuples: [ (msgid, header_bytes), (msgid, header_bytes)... ]
    The returned header_bytes can be parsed by
    email.parser.BytesParser.parsebytes().

    Each header block is paired with the message number the server reports
    for it, not with its position in the response, so an extra, missing or
    reordered response item cannot attach headers to the wrong message.
    Items that do not carry one of the requested message numbers are
    ignored.  An empty msg_ids returns an empty list without contacting the
    server.
    """
    if not msg_ids:
        return []

    # Get the header info for each message
    message_ids_str = ",".join(map(str, msg_ids))
    ms = check_response(server.fetch(message_ids_str, "(RFC822.HEADER)"))

    wanted = {int(n) for n in msg_ids}
    resp: List[Tuple[int, bytes]] = []
    for item in ms:
        # Header data arrives as (envelope, literal) tuples; the plain
        # bytes items in between (such as the closing ')') carry no headers.
        if not isinstance(item, tuple):
            continue
        envelope, hinfo = item[0], item[1]
        try:
            # The envelope starts with the message number, e.g.
            # b'12 (RFC822.HEADER {342}'
            mnum = int(envelope.split(None, 1)[0])
        except (IndexError, ValueError):
            continue
        if mnum in wanted:
            resp.append((mnum, hinfo))
    return resp
def print_message_info(parsed_message: Message):
    """
    Print the From, To, Cc, Bcc, Subject and Date headers of the message,
    one per line, followed by a blank line.
    """
    for header_name in ("From", "To", "Cc", "Bcc", "Subject", "Date"):
        print(header_name + ": " + str_header(parsed_message, header_name))
    print("")
def _open_connection(options):
    """Create the IMAP connection object chosen by the options; exit on socket errors."""
    serverclass: Type[Any]
    if options.process:
        serverclass = imaplib.IMAP4_stream
    elif options.ssl:
        serverclass = imaplib.IMAP4_SSL
    else:
        serverclass = imaplib.IMAP4

    try:
        if options.process:
            return serverclass(options.process)
        if options.port:
            return serverclass(options.server, options.port)
        # Use the default, which will be different depending on SSL choice
        return serverclass(options.server)
    except socket.error as e:
        sys.stderr.write(
            "\nFailed to connect to server. Might be host, port or SSL settings?\n"
        )
        sys.stderr.write("%s\n\n" % e)
        sys.exit(1)


def _dedup_mailbox(server, parser, mbox, options, msg_ids):
    """Scan one mailbox and mark as deleted every message whose ID is already in msg_ids."""
    msgs_to_delete = []  # duplicates found in this mailbox
    msg_map = {}  # parsed messages by number; only filled in verbose mode

    # Make sure mailbox name is surrounded by quotes if it contains a space
    if " " in mbox and (mbox[0] != '"' or mbox[-1] != '"'):
        mbox = '"' + mbox + '"'

    # Select the mailbox (read-only on a dry run)
    msgs = check_response(server.select(mailbox=mbox, readonly=options.dry_run))[0]
    print("There are %d messages in %s." % (int(msgs), mbox))

    # Check how many messages are already marked 'deleted'...
    numdeleted = len(get_deleted_msgnums(server))
    print(
        "%s message(s) currently marked as deleted in %s"
        % (numdeleted or "No", mbox)
    )

    # Now get a list of the ones that aren't deleted.
    # That's what we'll actually use.
    msgnums = get_undeleted_msgnums(server)
    print("%s others in %s" % (len(msgnums), mbox))

    fetch_batch_size = 100
    if options.verbose:
        print("Reading the others... (in batches of %d)" % fetch_batch_size)

    for i in range(0, len(msgnums), fetch_batch_size):
        if options.verbose:
            print("Batch starting at item %d" % i)

        for mnum, hinfo in get_msg_headers(server, msgnums[i: i + fetch_batch_size]):
            # Parse the header info into a Message object
            mp = parser.parsebytes(hinfo)

            if options.verbose:
                print("Checking %s message %s" % (mbox, mnum))
                # Store message only when verbose is enabled (to print it later on)
                msg_map[mnum] = mp

            # Record the message-ID header (or generate one from other headers)
            msg_id = get_message_id(
                mp, options.use_checksum, options.use_id_in_checksum
            )
            if not msg_id:
                continue

            if msg_id in msg_ids:
                # Seen before (in this or an earlier mailbox): record it as
                # one to be deleted in this mailbox.
                print(
                    "Message %s_%s is a duplicate of %s and %s be marked as deleted"
                    % (
                        mbox, mnum, msg_ids[msg_id],
                        "would" if options.dry_run else "will",
                    )
                )
                if options.verbose:
                    print(
                        "Subject: %s\nFrom: %s\nDate: %s\n"
                        % (mp["Subject"], mp["From"], mp["Date"])
                    )
                msgs_to_delete.append(mnum)
            else:
                # Otherwise just record the fact that we've seen it
                msg_ids[msg_id] = f"{mbox}_{mnum}"

        print(
            "%s message(s) in %s processed"
            % (min(len(msgnums), i + fetch_batch_size), mbox)
        )

    # OK - we've been through this mailbox, and msgs_to_delete holds
    # a list of the duplicates we've found.
    if not msgs_to_delete:
        print("No duplicates were found in %s" % mbox)
    else:
        if options.verbose:
            print("These are the duplicate messages: ")
            for mnum in msgs_to_delete:
                print_message_info(msg_map[mnum])

        if options.dry_run:
            print(
                "If you had NOT selected the 'dry-run' option,\n"
                "  %i messages would now be marked as 'deleted'."
                % (len(msgs_to_delete))
            )
        else:
            print("Marking %i messages as deleted..." % (len(msgs_to_delete)))
            # Deleting messages one at a time can be slow if there are many,
            # so we batch them up.
            delete_batch_size = 30
            if options.verbose:
                print("(in batches of %d)" % delete_batch_size)
            for i in range(0, len(msgs_to_delete), delete_batch_size):
                mark_messages_deleted(server, msgs_to_delete[i: i + delete_batch_size])
                if options.verbose:
                    print("Batch starting at item %d marked." % i)
            print("Confirming new numbers...")
            numdeleted = len(get_deleted_msgnums(server))
            numundel = len(get_undeleted_msgnums(server))
            print(
                "There are now %s messages marked as deleted and %s others in %s."
                % (numdeleted, numundel, mbox)
            )

    if not options.no_close:
        server.close()


# This actually does the work
def process(options, mboxes: List[str]):
    """
    Connect to the server, log in, and de-duplicate the named mailboxes.

    options is the parsed option object produced by get_arguments; mboxes
    is the list of mailbox names, processed in order.  The first message
    seen with a given ID (across all the mailboxes) is kept; later ones are
    marked as deleted, or only reported on a dry run.

    With options.just_list the usable mailbox names are printed and nothing
    else is done.

    Exits with status 1 if the connection or login fails, if STARTTLS was
    required but not offered, or if no mailbox was named.  A non-OK server
    response during processing is reported on stderr and ends processing.
    The connection is logged out once login has been attempted and passed.
    """
    server = _open_connection(options)

    if ("STARTTLS" in server.capabilities) and hasattr(server, "starttls"):
        server.starttls()
    elif options.starttls:
        sys.stderr.write("\nError: Server did not offer TLS\n")
        sys.exit(1)
    elif not options.ssl:
        sys.stderr.write("\nWarning: Unencrypted connection\n")

    if not options.process:
        try:
            server.login(options.user, options.password)
        except Exception as e:
            # Not a bare except: Ctrl-C and SystemExit must still propagate.
            sys.stderr.write("\nError: Login failed\n")
            sys.stderr.write("%s\n\n" % e)
            sys.exit(1)

    try:
        # List mailboxes option
        # Just do that and then exit
        if options.just_list:
            for mb in get_mailbox_list(server):
                print(mb)
            return

        if not mboxes:
            sys.stderr.write("\nError: Must specify mailbox\n")
            sys.exit(1)

        # OK - let's get started.
        # Iterate through a set of named mailboxes and delete the later messages discovered.
        parser = BytesParser()  # can be the same for all mailboxes
        # Previously seen message IDs, in any mailbox, mapped to where they were seen
        msg_ids: Dict[str, str] = {}
        for mbox in mboxes:
            _dedup_mailbox(server, parser, mbox, options, msg_ids)
    except ImapDedupException as e:
        print("Error:", e, file=sys.stderr)
    finally:
        server.logout()
def main(args: List[str]):
    """Parse the given command-line arguments and run the de-duplication."""
    process(*get_arguments(args))


if __name__ == "__main__":
    main(sys.argv[1:])
|