Python Forum
Search for duplicated files
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
Search for duplicated files
#1
This is a short script I wrote just now to search for duplicated files, since I have already run out of room on my disks.
I am open to suggestions. I don't like the function that shows the file size in human-readable format, but I was really tired, and I don't want to install an additional library just for this.


 #!/usr/bin/env python3


import argparse
from collections import defaultdict
from concurrent import futures
from hashlib import md5
import json
import os.path
import os

# Command-line interface: an optional directory to scan (defaults to the
# current directory) and an optional path for the JSON dump of the results.
parser = argparse.ArgumentParser(description='Print duplicated files')
parser.add_argument('path',
                    type=str,
                    default='.',
                    nargs='?',
                    help='Path to a directory to scan')
parser.add_argument('-d', '--dump',
                    dest='res_file',
                    type=str,
                    default='./duplicated.json',
                    help='''Path/filename to store the results
                    Default: ./duplicated.json''')

args = parser.parse_args()
# argparse stores the positional under the name it was declared with
# ('path'); attribute lookup is case-sensitive, so args.PATH would raise
# AttributeError.
path = args.path
result_file = args.res_file

def human_fsize(size):
    """Return a file size as a short human readable string.

       Argument: file's size in bytes
       Type: int

       Sizes below 1024 are shown as plain bytes; larger sizes are scaled
       to KB (3 decimals), MB or GB (2 decimals each).
    """
    KB, MB, GB = 1024, 1024**2, 1024**3

    # Each test only needs an upper bound because the smaller ranges were
    # already handled above it. This also covers the exact boundaries
    # (1024, 1024**2, 1024**3), which must not fall through and return None.
    if size < KB:
        return f'{size}B'
    if size < MB:
        return f'{round(size / KB, 3)}KB'
    if size < GB:
        return f'{round(size / MB, 2)}MB'
    return f'{round(size / GB, 2)}GB'

def file_hash(file_name):
    """Return a tuple of (md5 hex digest, file name) for the given file.

       Argument: file's name
       Type: str
    """
    digest = md5()

    with open(file_name, 'rb') as stream:
        # iter() with a sentinel keeps calling read() until it returns the
        # empty bytes object, i.e. until end of file.
        for block in iter(lambda: stream.read(65536), b''):
            digest.update(block)

    return digest.hexdigest(), file_name

# duped:  md5 hex digest -> list of files, only for digests seen more than once
# hashed: md5 hex digest -> list of every file seen with that digest
duped = {}
hashed = defaultdict(list)

# A single worker pool serves the whole walk. Creating the pool inside the
# os.walk loop would start and shut down up to eight worker processes for
# every directory visited.
with futures.ProcessPoolExecutor(max_workers=8) as executor:

    for root, dirs, files in os.walk(path):
        full_paths = [os.path.join(root, file_) for file_ in files]

        for md5sum, file_name in executor.map(file_hash, full_paths):
            hashed[md5sum].append(file_name)

            # duped[md5sum] is the very same list object as hashed[md5sum],
            # so files found later with this digest show up in duped too.
            if len(hashed[md5sum]) > 1:
                duped[md5sum] = hashed[md5sum]

# Print each group of duplicates, coloured with ANSI escape sequences
yellow, purple, default = ('\033[33m', '\033[35m', '\033[0m')
for key, values in duped.items():
    # All files of a group have identical content, so the first one's size
    # stands for the whole group.
    print(f'md5: {yellow}{key}{default} size: {yellow}{human_fsize(os.stat(values[0]).st_size)}')
    for v in values:
        print(f'    * {purple}{v}{yellow}')
    print(f'{default}')

# Dump the results in a json file
with open(result_file, 'w', encoding='utf-8') as dump_file:
    json.dump(duped, dump_file, indent=4, ensure_ascii=False)
"As they say in Mexico 'dosvidaniya'. That makes two vidaniyas."
https://freedns.afraid.org
Reply
#2
New human-readable file size function which is more acceptable for me:

def human_fsize(size):
    """Return file size in human readable format, e.g. '1.5 KB'.

       Argument: file's size in bytes
       Type: int

       Sizes below 1024 are returned unscaled in bytes; anything larger is
       divided by the biggest fitting unit and rounded to 3 decimals.
    """
    pref = [('B', 1), ('KB', 1024),
            ('MB', 1024**2), ('GB', 1024**3), ('TB', 1024**4)]

    # Climb to the largest unit that does not exceed the size. Stopping at
    # the last entry keeps huge sizes in TB instead of indexing past the
    # end of the table.
    counter = 0
    while counter < len(pref) - 1 and size >= pref[counter + 1][1]:
        counter += 1

    prefix, factor = pref[counter]

    # Plain bytes are left as the exact integer; every scaled unit is
    # rounded (this includes a size of exactly 1024, which is 1.0 KB).
    if counter:
        h_size = round(size / factor, 3)
    else:
        h_size = size

    return f'{h_size} {prefix}'
"As they say in Mexico 'dosvidaniya'. That makes two vidaniyas."
https://freedns.afraid.org
Reply
#3
Well hashing each file is quite stupid Dodgy  so this one is doing it only on files with equal sizes.


#!/usr/bin/env python3
# Find and print duplicated files based on their md5 sum
#
import argparse
from collections import defaultdict
from concurrent import futures
from hashlib import md5
import json
import os.path
import os

# Command-line interface: optional directory to scan plus an optional
# location for the JSON dump.
parser = argparse.ArgumentParser(description='Print duplicated files')
parser.add_argument(
    'path',
    nargs='?',
    default='.',
    type=str,
    help='Path to a directory to scan',
)
parser.add_argument(
    '-d', '--dump',
    dest='res_file',
    default='./duplicated.json',
    type=str,
    # The embedded newline and indentation are part of the help text.
    help=('Path/filename to store the results\n'
          '                    Default: ./duplicated.json'),
)

args = parser.parse_args()
path, result_file = args.path, args.res_file


def human_fsize(size):
    """Return file size in human readable format.
       Argument: file's size
       Type: int

       Sizes below 1024 are returned unscaled in bytes; anything larger is
       divided by the biggest fitting unit and rounded to 3 decimals.
    """

    pref = [('B', 1), ('KB', 1024),
            ('MB', 1024**2), ('GB', 1024**3), ('TB', 1024**4)]

    # Climb to the largest unit that does not exceed the size. Stopping at
    # the last entry keeps huge sizes in TB instead of indexing past the
    # end of the table.
    counter = 0
    while counter < len(pref) - 1 and size >= pref[counter + 1][1]:
        counter += 1

    prefix, factor = pref[counter]

    # Plain bytes are left as the exact integer; every scaled unit is
    # rounded (this includes a size of exactly 1024, which is 1.0 KB and
    # not "1024 KB").
    if counter:
        h_size = round(size / factor, 3)
    else:
        h_size = size

    return f'{h_size} {prefix}'


def file_hash(file_name):
    """Compute the md5 sum of a file, reading it 1 MB at a time.
       Argument: file's name
       Type: str
       Returns: tuple of (md5 hex digest, file name)
    """
    digest = md5()

    with open(file_name, 'rb') as stream:
        # 1048576 == 1024**2; iter() with a sentinel calls read() again and
        # again until it returns the empty bytes object at end of file.
        for block in iter(lambda: stream.read(1048576), b''):
            digest.update(block)

    return digest.hexdigest(), file_name

# Walk through the directories and group the files by size. Only sizes
# shared by at least two files can contain duplicates, so only those groups
# are handed over for hashing.
for_hashing = {}                 # size -> files, only for sizes seen twice or more
eq_sized = defaultdict(list)     # size -> every file found with that size
for root, dirs, files in os.walk(path):
    for file_ in files:
        full_name = os.path.join(root, file_)

        try:
            size = os.stat(full_name).st_size
        except OSError:
            # e.g. a dangling symlink or a file removed during the scan;
            # it cannot be hashed anyway, so leave it out.
            continue

        # The grouping has to be keyed by size: a defaultdict has no
        # append() of its own.
        eq_sized[size].append(full_name)

        # for_hashing[size] is the same list object as eq_sized[size], so
        # files of this size found later are included automatically.
        if len(eq_sized[size]) > 1:
            for_hashing[size] = eq_sized[size]

# Hashing the files
# duped:  md5 hex digest -> list of files, only for digests seen more than once
# hashed: md5 hex digest -> list of every hashed file with that digest
duped = {}
hashed = defaultdict(list)

# A single worker pool serves all size groups. Creating the pool inside the
# loop would start and shut down up to eight worker processes per group.
with futures.ProcessPoolExecutor(max_workers=8) as executor:

    for size, files in for_hashing.items():

        for md5sum, file_name in executor.map(file_hash, files):
            hashed[md5sum].append(file_name)

            # duped[md5sum] is the very same list object as hashed[md5sum],
            # so later matches for this digest appear in duped too.
            if len(hashed[md5sum]) > 1:
                duped[md5sum] = hashed[md5sum]



# Report every group of duplicates and tally what could be reclaimed.
# The colour names are ANSI escape sequences.
lblue, purple, default = '\033[94m', '\033[35m', '\033[0m'
sizes = 0        # bytes that deleting the redundant copies would free
duplicates = 0   # number of redundant copies (one file per group is kept)
for key, values in duped.items():
    # Files of one group have identical content, so the first one's size
    # applies to all of them.
    size = os.path.getsize(values[0])
    extra_copies = len(values) - 1
    duplicates += extra_copies
    sizes += extra_copies * size
    print(f'md5: {lblue}{key}{default} size: {lblue}{len(values)} {default}* {lblue}{human_fsize(size)}')

    for v in values:
        print(f'    * {purple}{v}{lblue}')

    print(f'{default}')

# Store the digest -> files mapping as JSON
with open(result_file, 'w', encoding='utf-8') as dump_file:
    json.dump(duped, dump_file, indent=4, ensure_ascii=False)

print(f'Dumped as JSON in: {lblue}{result_file}{default}\n')

# Closing summary
print('Summarize:\n')
print(f'    Files: {lblue}{len(duped)}{default}\n    Duplicates: {lblue}{duplicates}\n')
print(f'{default}Deleting the duplicates will free {human_fsize(sizes)}!')
In line 65 it reads 1 MB at a time from the file to update the md5sum. Which chunk size is optimal for better performance?
"As they say in Mexico 'dosvidaniya'. That makes two vidaniyas."
https://freedns.afraid.org
Reply
#4
I had also good results with a chunk size of 1MiB. I think it's a good value for modern systems.
Almost dead, but too lazy to die: https://sourceserver.info
All humans together. We don't need politicians!
Reply
#5
Good to know. Programming is not my job  Smile I will try to find additional info later.
"As they say in Mexico 'dosvidaniya'. That makes two vidaniyas."
https://freedns.afraid.org
Reply
#6
I think a benchmark is a good source: bonnie++
I can't run it, because currently I'm running a VM with a Windows installation :-/
Almost dead, but too lazy to die: https://sourceserver.info
All humans together. We don't need politicians!
Reply
#7
If I use mmap to map the file into memory, does the system decide how big the file "chunk" is, if it is not the whole file?
"As they say in Mexico 'dosvidaniya'. That makes two vidaniyas."
https://freedns.afraid.org
Reply
#8
I had started writing a similar script but hadn't completed it until now; thanks for the reminder.

import argparse
import os
import hashlib

def find_duplicates(args):
    """Driver function to find duplicate files

    :param args: parsed command line options; reads args.path,
                 args.recursive and args.output
    """
    all_files = recursive_search(args.path, recurse_flag=args.recursive)
    same_size = same_size_files(all_files)
    duplicates = {}
    for file_list in same_size.values():
        # A size that occurs only once cannot have a duplicate; skip the
        # hashing entirely.
        if len(file_list) < 2:
            continue
        for file_hash, paths in same_hash_dict(file_list).items():
            # Files of equal size but different content each end up alone
            # under their own hash; those are not duplicates and must not
            # be reported.
            if len(paths) > 1:
                duplicates[file_hash] = paths

    action(duplicates, oflag=args.output)

def same_size_files(file_list):
    """Group file paths by their size in bytes.

       :param file_list: iterable of file paths
       :return: dictionary in format {FileSize: [FilePath, ...]}
    """
    by_size = {}
    for candidate in file_list:
        # setdefault creates the list the first time a size is seen
        by_size.setdefault(os.stat(candidate).st_size, []).append(candidate)
    return by_size


def recursive_search(directory, recurse_flag=False, all_files=None):
    """:param directory: Path of Directory to be searched
       :param recurse_flag: if True the subdirectories are searched too
       :param all_files: optional list the found paths are appended to;
                         a fresh list is created when it is omitted
       :return: list with the path strings of all files in the directory
                (and its subdirectories when recurse_flag is True)

       A directory that cannot be read (PermissionError) is reported with
       print() and skipped.
    """
    # A list literal as default would be created once and shared by every
    # call, so results of earlier searches would leak into later ones.
    if all_files is None:
        all_files = []
    try:
        for entry in os.scandir(directory):
            if entry.is_dir():
                if recurse_flag:
                    # Hand the accumulator down explicitly; the recursive
                    # call appends to it in place.
                    recursive_search(entry.path,
                                     recurse_flag=recurse_flag,
                                     all_files=all_files)
            elif entry.is_file():
                all_files.append(entry.path)
    except PermissionError as e:
        print(e)
    return all_files


def same_hash_dict(file_list):
    """Group file paths by the hash of their content.

       :param file_list: iterable of file paths
       :return: dictionary in format {FileHash: [FilePath, ...]}
    """
    by_hash = {}
    for candidate in file_list:
        # setdefault creates the list the first time a hash is seen
        by_hash.setdefault(hashfile(candidate), []).append(candidate)
    return by_hash


def hashfile(path, blocksize=1048576):
    """:param path: Path of the file to hash
       :param blocksize: number of bytes read per step (default 1 MiB)
       :return: md5 hex digest of the file's content
    """
    hasher = hashlib.md5()
    # The with-block closes the file even when a read fails part-way;
    # a bare open()/close() pair would leak the handle in that case.
    with open(path, 'rb') as curr_file:
        # iter() with a sentinel calls read() until it returns b'' (EOF)
        for buf in iter(lambda: curr_file.read(blocksize), b''):
            hasher.update(buf)
    return hasher.hexdigest()


def action(dup_dict, oflag=False):
    """Print the groups of duplicate files and optionally export them.

       :param dup_dict: Dictionary of all duplicate files; each value is a
                        list of paths
       :param oflag: if True writes output to the csv file
                     'duplicatefiles.csv' in the current directory
    """
    groups = dup_dict.values()
    rule = '___' * 40

    if groups:
        print('Duplicates Found:')
        print("files with same content:")
        print('\n' + rule)
        for group in groups:
            for path in group:
                print('\t\t' + path)
            print(rule)
    else:
        print('No duplicate files found.')

    if not oflag:
        return

    import csv
    with open('duplicatefiles.csv', 'w', newline='') as csvfile:
        dupwriter = csv.writer(csvfile, delimiter=',', quotechar='|',
                               quoting=csv.QUOTE_MINIMAL)
        dupwriter.writerow(['FileName', 'FilePath'])
        for group in groups:
            dupwriter.writerows([os.path.basename(path), path]
                                for path in group)
            # an empty row separates one group from the next
            dupwriter.writerow([])


def main():
    """Parse the command line, run the duplicate search and print how long
    it took.
    """
    import time

    cli = argparse.ArgumentParser()
    cli.add_argument('path', type=str,
                     help='Path to the directory to be scanned')
    cli.add_argument('-o', '--output', action='store_true',
                     help='get result in a CSV file')
    cli.add_argument('-r', '--recursive', action='store_true',
                     help='to search path recursively')
    options = cli.parse_args()

    # Only the search itself is timed, not the argument parsing
    began = time.time()
    find_duplicates(options)
    print("total time: ", time.time() - began)


if __name__ == '__main__':
    main()
Reply
#9
I could do it that way instead. Mine will dig all subfolders without a question. Big Grin
"As they say in Mexico 'dosvidaniya'. That makes two vidaniyas."
https://freedns.afraid.org
Reply
#10
(Oct-12-2017, 03:06 PM)wavic Wrote: I could do it that way instead. Mine will dig all subfolders without a question. Big Grin

Still, I think these scripts aren't efficient enough... I tried to run mine on 117 GB of data: 45,739 files (music, pics and some vids) in 7,898 folders...

I lost patience and terminated the script partway through.

There has to be another, faster approach, even though I already tried using os.scandir() instead of os.walk().
Reply


Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020