Python Forum

Full Version: ThreadPoolExecutor read file to list
Hi,

I'm trying to create a multithreaded file reader that reads multiple files into separate lists, which are processed further on in the code. But I can't get this code running without a traceback.
I also tried calling separate functions, but that doesn't work either: each call keeps the thread blocked before continuing to the next.

(If this is a newbie question, I'm sorry; that's what I am. Blush)

Any hints on how to achieve this?

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(ArtLev = list(open(ArtLev_filename, encoding='ANSI')))
    e.submit(ArtPrijs = list(open(ArtPrijs_filename, encoding='ANSI')))
Error:
Traceback (most recent call last):
  File "C:/Users/danie/OneDrive_MLM/OneDrive/Daniel/PyScripts/MultiThread/MultiProcess_FileMatch_test1.py", line 127, in <module>
    e.submit(ArtLev = list(open(ArtLev_filename, encoding='ANSI')))
TypeError: submit() missing 1 required positional argument: 'fn'
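The TypeError happens because `ArtLev = ...` is parsed as a keyword argument to `submit()`, so `submit()` itself never receives a callable to schedule. A minimal sketch (using `len` as a hypothetical stand-in task) showing the failing and the correct call shape:

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as e:
    # wrong: 'ArtLev=...' is swallowed into submit()'s **kwargs,
    # so no callable fn is passed, reproducing the traceback above
    try:
        e.submit(ArtLev=[1, 2, 3])
        failed = False
    except TypeError:
        failed = True

    # right: pass the callable first, then its arguments
    future = e.submit(len, [1, 2, 3])

print(failed)           # True
print(future.result())  # 3
```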
https://docs.python.org/3/library/concur...tor.submit Wrote:
class concurrent.futures.Executor
An abstract class that provides methods to execute calls asynchronously. It should not be used directly, but through its concrete subclasses.

submit(fn, *args, **kwargs)
Schedules the callable, fn, to be executed as fn(*args, **kwargs) and returns a Future object representing the execution of the callable.

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

The first argument passed to the method submit should be a callable function.
The Python documentation shows a good example: threadpoolexecutor-example
If you use that example and replace the load_url function with read_file, it could work like this:

import time
from random import randint, choice
from concurrent.futures import (
    ThreadPoolExecutor,
    as_completed,
    )


def computation(desc):
    print('Slow task', desc)
    pause = randint(1,10)
    time.sleep(pause)
    return desc


def read_file(file):
    # simulation of big file
    computation('reading ' + file)
    if choice([True, False]):
        exc = choice([ZeroDivisionError('Not really'), ValueError('Wrong input'), TypeError('Wrong operation')])
        raise exc
    with open(file) as fd:
        return fd.read()

############
files = ('Chesterton.txt', 'Alice.txt', 'Chesterton.txt', 'Alice.txt', 'Chesterton.txt', 'Alice.txt')
############

# We can use a with statement to ensure threads are cleaned up promptly
with ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its file
    future_to_text = {executor.submit(read_file, file): file for file in files} # <- this is a dict
    # alternative
    # future_to_text = {}
    # for file in files:
    #     future = executor.submit(read_file, file)
    #     future_to_text[future] = file
    #
    # the keys are the futures, the values are the filenames
    for future in as_completed(future_to_text):
        # iterating over a dict directly returns its keys, in this case the futures
        # as_completed yields each future as it finishes, so they may come back out of order
        file = future_to_text[future]
        # file is the filename this future was mapped to
        try:
            data = future.result()
        except Exception as exc:
            print(f'{file!r} generated an exception: {exc!r}')
        else:
            print(f'{file!r} is {len(data)} chars long')
(Jun-10-2019, 08:35 PM)Yoriz Wrote: The first argument passed to the method submit should be a callable function.
Thanks for the replies.

The solution from Yoriz got the script working and returning results. However, it does not multithread this way; the lines are executed in sequence.
I guess the function is the key here. Should this be multiprocessing, or does the solution lie in a piece of the code from Dead_EyE?

with ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its file
    future_to_text = {executor.submit(read_file, file): file for file in files} # <- this is a dict

# Could the solution be to collect the files in a dict and then submit them like above? Would they be multithreaded then?
What am I missing here?

My current half-working code:
def _ListyReader1(xfile):
    ArtLev = list(open(xfile, encoding='ANSI'))
    print('len(reader): ', len(ArtLev))
    return ArtLev

with ThreadPoolExecutor(max_workers=4) as e:
    a = e.submit(_ListyReader1, ArtLev_filename)
    print(len(a.result()))
    ArtLev = a.result()

    b = e.submit(_ListyReader1, ArtPrijs_filename)
    print(len(b.result()))
    ArtPrijs = b.result()



end = time.time()
print('Inlezen klaar in: ', end - start)
print(len(ArtLev))
print(len(ArtPrijs))
Results are:
len(reader): 1281115
1281115
len(reader): 1281115
1281115
Inlezen klaar in: 17.544724225997925
1281115
1281115
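The missing piece in the half-working code above is that `a.result()` is called immediately after the first `submit()`; `result()` blocks until that job finishes, so the second read only starts afterwards. A minimal sketch (with small hypothetical demo files standing in for the OP's ANSI-encoded inputs) that submits both jobs before collecting either result, so the reads can overlap:

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical demo files standing in for ArtLev_filename / ArtPrijs_filename
ArtLev_filename = 'ArtLev_demo.txt'
ArtPrijs_filename = 'ArtPrijs_demo.txt'
for name in (ArtLev_filename, ArtPrijs_filename):
    with open(name, 'w') as f:
        f.write('line 1\nline 2\nline 3\n')

def read_lines(path):
    # read a whole file into a list of lines
    # (the OP passes encoding='ANSI', which only exists on Windows)
    with open(path) as f:
        return list(f)

with ThreadPoolExecutor(max_workers=4) as e:
    # submit BOTH jobs before asking for any result:
    # future.result() blocks, so calling it right after submit()
    # forces the reads to run one after the other
    a = e.submit(read_lines, ArtLev_filename)
    b = e.submit(read_lines, ArtPrijs_filename)
    ArtLev = a.result()
    ArtPrijs = b.result()

print(len(ArtLev), len(ArtPrijs))  # 3 3
```

Note that file I/O releases the GIL, so the disk reads can overlap in threads; building the large lists is CPU-bound, though, so for files this size a ProcessPoolExecutor may be worth trying, as the poster suspected.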