Posts: 2,953
Threads: 48
Joined: Sep 2016
Oct-02-2017, 01:28 AM
(This post was last modified: Oct-02-2017, 01:28 AM by wavic.)
This is a short script I wrote just now to search for duplicated files, since I have no room left on my disks.
I am open to suggestions. I don't like the function that shows the file size in human-readable format, but I was really tired, and I don't want to install additional libraries just for that.
#!/usr/bin/env python3

import argparse
from collections import defaultdict
from concurrent import futures
from hashlib import md5
import json
import os.path
import os

parser = argparse.ArgumentParser(description='Print duplicated files')
parser.add_argument('path',
                    type=str,
                    default='.',
                    nargs='?',
                    help='Path to a directory to scan')
parser.add_argument('-d', '--dump',
                    dest='res_file',
                    type=str,
                    default='./duplicated.json',
                    help='''Path/filename to store the results
                    Default: ./duplicated.json''')

args = parser.parse_args()
path = args.path
result_file = args.res_file


def human_fsize(size):
    """Return the file size in human-readable form."""
    KB, MB, GB = 1024, 1024**2, 1024**3
    if size < KB:
        return f'{size}B'
    elif KB <= size < MB:
        return f'{round(size/KB, 3)}KB'
    elif MB <= size < GB:
        return f'{round(size/MB, 2)}MB'
    else:
        return f'{round(size/GB, 2)}GB'


def file_hash(file_name):
    """Return a tuple of the md5 sum and the file name."""
    hasher = md5()
    with open(file_name, 'rb') as in_file:
        while True:
            chunk = in_file.read(65536)
            if chunk:
                hasher.update(chunk)
            else:
                break

    md5sum = hasher.hexdigest()
    return md5sum, file_name


# Walk the tree, hash every file in parallel and collect files sharing an md5 sum
duped = {}
hashed = defaultdict(list)
for root, dirs, files in os.walk(path):
    full_paths = [os.path.join(root, file_) for file_ in files]

    with futures.ProcessPoolExecutor(max_workers=8) as executor:
        for result in executor.map(file_hash, full_paths):
            hashed[result[0]].append(result[1])
            if len(hashed[result[0]]) > 1:
                duped[result[0]] = hashed[result[0]]

# Print the duplicates grouped by hash
yellow, purple, default = ('\033[33m', '\033[35m', '\033[0m')
for key, values in duped.items():
    print(f'md5: {yellow}{key}{default} size: {yellow}{human_fsize(os.stat(values[0]).st_size)}')
    for v in values:
        print(f'    * {purple}{v}{yellow}')
    print(f'{default}')

# Dump the results as JSON
with open(result_file, 'w', encoding='utf-8') as dump_file:
    json.dump(duped, dump_file, indent=4, ensure_ascii=False)
Posts: 2,953
Threads: 48
Joined: Sep 2016
A new human-readable file size function, which is more acceptable to me:
def human_fsize(size):
    pref = [('B', 1), ('KB', 1024),
            ('MB', 1024**2), ('GB', 1024**3), ('TB', 1024**4)]

    counter = 0
    res = size
    while True:
        res = res / 1024
        if res < 1:
            break
        else:
            counter += 1

    if size >= 1024:  # >= so that exactly 1024 bytes is reported as 1.0 KB
        h_size = round(size / pref[counter][1], 3)
    else:
        h_size = size

    prefix = pref[counter][0]

    return f'{h_size} {prefix}'
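A quick sanity check of what the function returns (the values below assume the corrected 'MB' label):

>>> human_fsize(532)
'532 B'
>>> human_fsize(1536)
'1.5 KB'
>>> human_fsize(5 * 1024**2)
'5.0 MB'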
Posts: 2,953
Threads: 48
Joined: Sep 2016
Oct-04-2017, 07:58 AM
(This post was last modified: Oct-04-2017, 07:58 AM by wavic.)
Well, hashing every single file is wasteful, so this one only hashes files that have equal sizes.
#!/usr/bin/env python3

# Find and print duplicated files based on their md5 sum
#

import argparse
from collections import defaultdict
from concurrent import futures
from hashlib import md5
import json
import os.path
import os

parser = argparse.ArgumentParser(description='Print duplicated files')
parser.add_argument('path',
                    type=str,
                    default='.',
                    nargs='?',
                    help='Path to a directory to scan')
parser.add_argument('-d', '--dump',
                    dest='res_file',
                    type=str,
                    default='./duplicated.json',
                    help='''Path/filename to store the results
                    Default: ./duplicated.json''')

args = parser.parse_args()
path = args.path
result_file = args.res_file


def human_fsize(size):
    """Return the file size in human-readable format.

    Argument: file's size
    Type: int
    """
    pref = [('B', 1), ('KB', 1024),
            ('MB', 1024**2), ('GB', 1024**3), ('TB', 1024**4)]

    counter = 0
    res = size
    while True:
        res = res / 1024
        if res < 1:
            break
        else:
            counter += 1

    if size >= 1024:
        h_size = round(size / pref[counter][1], 3)
    else:
        h_size = size

    prefix = pref[counter][0]

    return f'{h_size} {prefix}'


def file_hash(file_name):
    """Return a tuple of the md5 sum and the file name.

    Argument: file's name
    Type: str
    """
    hasher = md5()
    with open(file_name, 'rb') as in_file:
        while True:
            chunk = in_file.read(1048576)  # 1 MB - 1024**2
            if chunk:
                hasher.update(chunk)
            else:
                break

    md5sum = hasher.hexdigest()
    return md5sum, file_name


# Walk through the directories and group files by size
for_hashing = {}
eq_sized = defaultdict(list)
for root, dirs, files in os.walk(path):
    full_paths = [os.path.join(root, file_) for file_ in files]
    for full_name in full_paths:
        size = os.stat(full_name).st_size
        eq_sized[size].append(full_name)
        if len(eq_sized[size]) > 1:
            for_hashing[size] = eq_sized[size]

# Hash only the files which share a size with at least one other file
duped = {}
hashed = defaultdict(list)
for size, files in for_hashing.items():
    with futures.ProcessPoolExecutor(max_workers=8) as executor:
        for result in executor.map(file_hash, files):
            hashed[result[0]].append(result[1])
            if len(hashed[result[0]]) > 1:
                duped[result[0]] = hashed[result[0]]

# Print the results
lblue, purple, default = ('\033[94m', '\033[35m', '\033[0m')
sizes = 0
duplicates = 0
for key, values in duped.items():
    size = os.stat(values[0]).st_size
    sizes += (len(values) - 1) * size
    duplicates += len(values) - 1
    print(f'md5: {lblue}{key}{default} size: {lblue}{len(values)} {default}* {lblue}{human_fsize(size)}')
    for v in values:
        print(f'    * {purple}{v}{lblue}')
    print(f'{default}')

# Dump the results in a JSON file
with open(result_file, 'w', encoding='utf-8') as dump_file:
    json.dump(duped, dump_file, indent=4, ensure_ascii=False)

print(f'Dumped as JSON in: {lblue}{result_file}{default}\n')
print('Summary:\n')
print(f'    Files: {lblue}{len(duped)}{default}\n    Duplicates: {lblue}{duplicates}\n')
print(f'{default}Deleting the duplicates will free {human_fsize(sizes)}!')

In file_hash() the file is read 1 MB at a time to update the md5 sum. Which chunk size is optimal for better performance?
Posts: 2,128
Threads: 11
Joined: May 2017
I also had good results with a chunk size of 1 MiB. I think it's a good value for modern systems.
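If you want to check it on your own hardware rather than take a number on faith, a rough sketch along these lines works; 'bigfile.bin' is just a placeholder for any large file, and keep in mind that the OS page cache will skew every run after the first, so use a file that isn't already cached:

# Rough timing of md5 hashing with different chunk sizes.
import time
from hashlib import md5


def hash_with_chunk(path, chunk_size):
    hasher = md5()
    with open(path, 'rb') as in_file:
        while True:
            chunk = in_file.read(chunk_size)
            if not chunk:
                break
            hasher.update(chunk)
    return hasher.hexdigest()


for chunk_size in (64 * 1024, 256 * 1024, 1024**2, 4 * 1024**2):
    start = time.perf_counter()
    hash_with_chunk('bigfile.bin', chunk_size)  # placeholder path
    print(f'{chunk_size // 1024} KiB chunks: {time.perf_counter() - start:.2f}s')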
Posts: 2,953
Threads: 48
Joined: Sep 2016
Good to know. Programming is not my job, so I will try to find additional info later.
Posts: 2,128
Threads: 11
Joined: May 2017
I think a benchmark is a good source: bonnie++.
I can't run it because currently I'm running a VM with a Windows installation :-/
Posts: 2,953
Threads: 48
Joined: Sep 2016
If I use mmap to map the file into memory, does the system decide how big the file "chunk" is, or is the whole file read in?
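As far as I know, mmap only sets up the mapping; the kernel then pages the file in on demand (page-sized reads plus readahead), so you don't pick a chunk size yourself and the whole file isn't necessarily loaded into memory at once. A minimal sketch of hashing through a mapping, assuming a 64-bit system where address space isn't a concern:

# Hash a file through a memory mapping. The whole file is mapped,
# but the kernel pages the data in lazily while hashlib reads it.
# Note: an empty (zero-length) file cannot be mapped and raises ValueError.
import mmap
from hashlib import md5


def file_hash_mmap(file_name):
    hasher = md5()
    with open(file_name, 'rb') as in_file:
        with mmap.mmap(in_file.fileno(), 0, access=mmap.ACCESS_READ) as mapped:
            hasher.update(mapped)
    return hasher.hexdigest(), file_name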
Posts: 101
Threads: 7
Joined: Aug 2017
I had started writing a similar script but never finished it until now, thanks for the reminder.
import argparse
import os
import hashlib


def find_duplicates(args):
    """Driver function to find duplicate files"""
    all_files = recursive_search(args.path, recurse_flag=args.recursive)
    same_size = same_size_files(all_files)
    duplicates = {}
    for file_list in same_size.values():
        if len(file_list) > 1:
            duplicates.update(same_hash_dict(file_list))
    action(duplicates, oflag=args.output)


def same_size_files(file_list):
    """:param file_list:
    :return: duplicates in format {FileSize: [FilePath, ...]}
    """
    duplicates = {}
    for path in file_list:
        size = os.stat(path).st_size
        if size in duplicates:
            duplicates[size].append(path)
        else:
            duplicates[size] = [path]
    return duplicates


def recursive_search(directory, recurse_flag=False, all_files=None):
    """:param directory: path of the directory to be searched
    :param recurse_flag: if True the subdirectories are searched too
    :param all_files: accumulator list (a fresh one is created if omitted)
    :return: path strings of all files in the directory/subdirectories
    """
    if all_files is None:  # avoid a shared mutable default argument
        all_files = []
    try:
        for entry in os.scandir(directory):
            if entry.is_dir():
                if recurse_flag:
                    recursive_search(entry.path, recurse_flag=recurse_flag, all_files=all_files)
            elif entry.is_file():
                all_files.append(entry.path)
    except PermissionError as e:
        print(e)
    return all_files


def same_hash_dict(file_list):
    """:param file_list:
    :return: duplicates in format {FileHash: [FilePath, ...]}
    """
    duplicates = {}
    for path in file_list:
        file_hash = hashfile(path)
        if file_hash in duplicates:
            duplicates[file_hash].append(path)
        else:
            duplicates[file_hash] = [path]
    return duplicates


def hashfile(path, blocksize=1048576):
    hasher = hashlib.md5()
    with open(path, 'rb') as curr_file:
        buf = curr_file.read(blocksize)
        while len(buf) > 0:
            hasher.update(buf)
            buf = curr_file.read(blocksize)
    return hasher.hexdigest()


def action(dup_dict, oflag=False):
    """:param dup_dict: dictionary of all duplicate files
    :param oflag: if True writes the output to a CSV file
    """
    results = dup_dict.values()
    if len(results) > 0:
        print('Duplicates Found:')
        print('Files with same content:')
        print('\n' + '___' * 40)
        for result in results:
            for path in result:
                print('\t\t' + path)
            print('___' * 40)
    else:
        print('No duplicate files found.')

    if oflag:
        import csv
        with open('duplicatefiles.csv', 'w', newline='') as csvfile:
            dupwriter = csv.writer(csvfile, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
            dupwriter.writerow(['FileName', 'FilePath'])
            for result in results:
                for path in result:
                    dupwriter.writerow([os.path.basename(path), path])
                dupwriter.writerow([])


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('path', help='Path to the directory to be scanned', type=str)
    parser.add_argument('-o', '--output', help='write the result to a CSV file', action='store_true')
    parser.add_argument('-r', '--recursive', help='search the path recursively', action='store_true')
    args = parser.parse_args()

    import time
    start_time = time.time()
    find_duplicates(args)
    print('total time: ', time.time() - start_time)


if __name__ == '__main__':
    main()
Posts: 2,953
Threads: 48
Joined: Sep 2016
I could do it that way instead. Mine will dig all subfolders without a question.
Posts: 101
Threads: 7
Joined: Aug 2017
(Oct-12-2017, 03:06 PM)wavic Wrote: I could do it that way instead. Mine will dig all subfolders without a question. 
Still, I think these scripts aren't efficient enough. I tried to run mine on 117 GB of data: 45,739 files (music, pics and some vids) in 7,898 folders.
I lost patience and terminated the script partway through.
There has to be another, faster approach, even though I already used os.scandir() instead of os.walk().
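One common way to cut the hashing work further (neither script above does this) is a partial-hash pass: after grouping by size, hash only the first few KB of each candidate and run the full-file hash only on files whose size and partial hash both match. For large media collections most same-sized files already differ in their first few KB, so the expensive full read is skipped for them. A rough sketch of such a pre-filter; filter_by_partial_hash() is a hypothetical helper, not something from the scripts above:

# Pre-filter candidates by hashing only the first 4 KB of each file.
# Only files that agree on size *and* on this partial hash need a full hash.
from collections import defaultdict
from hashlib import md5


def partial_hash(file_name, chunk_size=4096):
    with open(file_name, 'rb') as in_file:
        return md5(in_file.read(chunk_size)).hexdigest()


def filter_by_partial_hash(same_size_group):
    """Take a list of same-sized paths, return the sub-lists that still need a full hash."""
    groups = defaultdict(list)
    for file_name in same_size_group:
        groups[partial_hash(file_name)].append(file_name)
    return [paths for paths in groups.values() if len(paths) > 1]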