Python Forum
5 threading methods, none sped up the program
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
5 threading methods, none sped up the program
#1
I've used the 5 methods for concurrency or threading and none of them performed the task faster than serial. The only method that had moderate success was the one not listed here which was to just open two terminal windows and run one half of the program and then the other half. I've got a terrabyte of data so it's very important that I figure out how to speed this stuff up. Right now, I'm just looping through some text files and putting the contents into a dictionary and saving it as a json file. All of the methods below worked, but they did not speed up the program. Why?

p = print
def build_dict_json(dir1, fname, start):
    for e, file in en(os.listdir(byu_dir + dir1)[start:start + 1]):
        p (file)
        dict1 = {}
        tfile = byu_dir + dir1 + file
        sfile = file.split(".")[0]
        with open(tfile, 'r+') as f:
            try:
                for e, line in en(f):
                    lst = line[:-1].split("\t")
                    loc = lst[2]
                    word = lst[3]
                    lemma = lst[4]
                    pos = lst[5]
                    dict1[loc] = (word, lemma, pos)
            except:
                p (f'{file} error')

            vgf.save_json("dont_worry_about_this")


def use_simple_thread():
    threads = []
    for i in range(10):
        p (i)
        thread = Thread(target=build_dict_json, args = (str3, "01/", i))
        thread.start()
        threads.append(thread)

    for e, thread in en(threads):
        p (e)
        thread.join()


def use_mult_process():
    from multiprocessing import Process
    for i in range(5):
        q = Process(target=build_dict_json, args=(str3, "010/", i))
        q.start()
        q.join()

def use_pool():
    import multiprocessing
    def some_thread(pool):
        for i in range(5):
            r = pool.apply(build_dict_json, (str3, "010/", i))

    pool = multiprocessing.Pool()
    some_thread(pool)


def use_actor():
    from queue import Queue
    class ActorExit(Exception):
        pass

    class Actor:
        def __init__(self):
            self._mailbox = Queue()

        def send(self, msg):
            self._mailbox.put(msg)

        def recv(self):
            msg = self._mailbox.get()
            if msg is ActorExit:
                raise ActorExit()
            return msg

        def close(self):
            self.send(ActorExit)

        def start(self):
            self._terminated = Event()
            t = Thread(target=self._bootstrap)
            t.daemon = True
            t.start()

        def _bootstrap(self):
            try:
                self.run()
            except ActorExit:
                pass
            finally:
                self._terminated.set()

        def join(self):
            self._terminated.wait()

        def run(self):
            while True:
                msg = self.recv()


    class TaggedActor(Actor):
        def run(self):
            while True:
                tag, *payload = self.recv()
                getattr(self, "do_" + tag)(*payload)

        def do_A(self, i):
            build_dict_json(str3, "010/", i)

    a = TaggedActor()
    a.start()
    for i in range(5):
        p (i)
        a.send(('A', i))
    a.close()
    a.join()


def use_coroutines():
    def countdown(n, o):
        for i in range(n, o):
            p (i)
            build_dict_json(str3, "010/", i)
            yield


    from collections import deque

    class TaskScheduler:
        def __init__(self):
            self._task_queue = deque()

        def new_task(self, task):
            self._task_queue.append(task)

        def run(self):
            while self._task_queue:
                task = self._task_queue.popleft()
                try:
                    next(task)
                    self._task_queue.append(task)
                except StopIteration:
                    pass

    sched = TaskScheduler()
    sched.new_task(countdown(0, 2))
    sched.new_task(countdown(2, 4))
    sched.run()




str3 = "some_path"

use_simple_thread()
use_mult_process()
use_pool()
use_actor()
use_coroutines()
Reply
#2
Did you profile the code to find the bottlenecks?
Reply
#3
It appears that you are reading some of the same files in different threads. You start in different places, but starting with 1, will still read 2, which possibly will conflict with the process starting with 2. Read the directory and split the files up into 5 pieces and try passing each one of the 5 pieces to a different Process to see if this is the problem.
Reply
#4
(Sep-23-2018, 04:01 AM)bobsmith76 Wrote:
def use_mult_process():
    from multiprocessing import Process
    for i in range(5):
        q = Process(target=build_dict_json, args=(str3, "010/", i))
        q.start()
        q.join()
If you join each process immediately after starting (and before starting any others), how would that be different from not using any processes at all? None of them are running at the same time as any others.

Also, I don't understand your use_coroutines() function, since it... doesn't use coroutines lol.

Do you know which parts are slow? The file reading, the writing, the parsing/processing, the conversion to json, etc?
Reply
#5
(Sep-23-2018, 04:23 AM)Gribouillis Wrote: Did you profile the code to find the bottlenecks?

I doubt there would be bottlenecks because all the code does is open a text file and then loop through each line in that text file.

(Sep-24-2018, 07:21 AM)nilamo Wrote: Also, I don't understand your use_coroutines() function, since it... doesn't use coroutines lol.
I just adapted the following code from Beazley's Python Cookbook for my own purposes:

# A very simple example of a coroutine/generator scheduler

# Two simple generator functions
def countdown(n):
    while n > 0:
        print("T-minus", n)
        yield
        n -= 1
    print("Blastoff!")

def countup(n):
    x = 0
    while x < n:
        print("Counting up", x)
        yield
        x += 1

from collections import deque

class TaskScheduler:
    def __init__(self):
        self._task_queue = deque()

    def new_task(self, task):
        '''
        Admit a newly started task to the scheduler
        '''
        self._task_queue.append(task)

    def run(self):
        '''
        Run until there are no more tasks
        '''
        while self._task_queue:
            task = self._task_queue.popleft()
            try:
                # Run until the next yield statement
                next(task)
                self._task_queue.append(task)
            except StopIteration:
                # Generator is no longer executing
                pass

# Example use
sched = TaskScheduler()
sched.new_task(countdown(10))
sched.new_task(countdown(5))
sched.new_task(countup(15))
sched.run()

(Sep-24-2018, 01:30 AM)woooee Wrote: It appears that you are reading some of the same files in different threads. You start in different places, but starting with 1, will still read 2, which possibly will conflict with the process starting with 2. Read the directory and split the files up into 5 pieces and try passing each one of the 5 pieces to a different Process to see if this is the problem.

Could you show a simplified example because I don't understand exactly what you're referring to?
Reply
#6
Something like this. It is very late and I am very tired so you will have to correct any typos or other errors

def use_mult_process():
        from multiprocessing import Process

        ## byu_dir has not been declares
        files_list=os.listdir(os.path.join(str3, "010/"))
        process_list=[]
        for f in file_list:
            full_path=os.path.join(str3, "010/", f)
            if os.path.isfile(full_path):  ## not a dir
                ## send one file only to the function, and
                ## it is a file and not a dir
                q = Process(target=build_dict_json, args=(full_path,))
                q.start()
                process_list.append(q)

        for q in process_list:
            q.join()
Reply
#7
(Sep-25-2018, 04:46 AM)woooee Wrote: Something like this. It is very late and I am very tired so you will have to correct any typos or other errors

def use_mult_process():
        from multiprocessing import Process

        ## byu_dir has not been declares
        files_list=os.listdir(os.path.join(str3, "010/"))
        process_list=[]
        for f in file_list:
            full_path=os.path.join(str3, "010/", f)
            if os.path.isfile(full_path):  ## not a dir
                ## send one file only to the function, and
                ## it is a file and not a dir
                q = Process(target=build_dict_json, args=(full_path,))
                q.start()
                process_list.append(q)

        for q in process_list:
            q.join()

Thanks but that only sped things up from 15 seconds to 12.
Reply
#8
So it is the file read that is most likely causing the hang-up. How big are these files? Take Gribouillis' advice and profile it to see https://docs.python.org/3/library/profile.html
Reply


Possibly Related Threads…
Thread Author Replies Views Last Post
  Concurrent futures threading running at same speed as non-threading billykid999 13 1,716 May-03-2023, 08:22 AM
Last Post: billykid999
  Tutorials on sockets, threading and multi-threading? muzikman 2 2,076 Oct-01-2021, 08:32 PM
Last Post: muzikman

Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020