Sep-23-2018, 04:01 AM
I've used the 5 methods for concurrency or threading and none of them performed the task faster than serial. The only method that had moderate success was the one not listed here which was to just open two terminal windows and run one half of the program and then the other half. I've got a terabyte of data so it's very important that I figure out how to speed this stuff up. Right now, I'm just looping through some text files and putting the contents into a dictionary and saving it as a json file. All of the methods below worked, but they did not speed up the program. Why?
"""Five attempts at parallelizing a text-file -> JSON conversion job.

NOTE(review): the original post asks why none of these ran faster than
serial execution.  Root causes, fixed below where possible:
  * CPython's GIL: threads never run Python bytecode in parallel, so
    `use_simple_thread`, `use_actor`, and `use_coroutines` cannot speed up
    CPU-bound parsing; they only help when the work is I/O-bound.
  * `use_mult_process` called start() immediately followed by join()
    inside the loop, waiting for each process before launching the next —
    fully serial.  FIXED: start all processes first, then join them all.
  * `use_pool` used Pool.apply(), which blocks until each task completes —
    also serial.  FIXED: apply_async() submits all tasks, then waits.
  * The driver calls at the bottom ran unguarded at import time, which
    breaks multiprocessing under the spawn start method (Windows/macOS) —
    FIXED: moved under an `if __name__ == "__main__":` guard.
"""

import os
from collections import deque
from multiprocessing import Pool, Process
from queue import Queue
from threading import Event, Thread

p = print      # shorthand alias used throughout the original script
en = enumerate  # NOTE(review): original used `en` without ever defining it

# NOTE(review): `byu_dir` and `vgf` are not defined in this file; they are
# assumed to come from the surrounding project (a base directory string and
# a JSON-saving helper module, respectively) — confirm against full source.


def build_dict_json(dir1, fname, start):
    """Parse one tab-separated corpus file into a dict, then save JSON.

    Args:
        dir1:  sub-directory (relative to `byu_dir`) to scan.
        fname: unused in the body; kept for interface compatibility.
        start: index (in os.listdir order) of the single file to process.
    """
    # [start:start+1] selects exactly one file per call, so each worker
    # handles a disjoint slice of the directory listing.
    for e, file in en(os.listdir(byu_dir + dir1)[start:start + 1]):
        p(file)
        dict1 = {}
        tfile = byu_dir + dir1 + file
        sfile = file.split(".")[0]
        with open(tfile, 'r+') as f:
            # Narrowed from a bare `except:` (which would also swallow
            # KeyboardInterrupt/SystemExit) to the errors a malformed or
            # mis-encoded line can actually raise here.
            try:
                for e, line in en(f):
                    # Strip the trailing newline, split on tabs; columns
                    # 2-5 are location, word, lemma, part-of-speech.
                    lst = line[:-1].split("\t")
                    loc = lst[2]
                    word = lst[3]
                    lemma = lst[4]
                    pos = lst[5]
                    dict1[loc] = (word, lemma, pos)
            except (IndexError, UnicodeDecodeError):
                p(f'{file} error')
        vgf.save_json("dont_worry_about_this")


def use_simple_thread():
    """Fan the work out over 10 threads, then join them all.

    NOTE(review): threads share the GIL, so this cannot speed up
    CPU-bound parsing — it only overlaps blocking I/O waits.
    """
    threads = []
    for i in range(10):
        p(i)
        thread = Thread(target=build_dict_json, args=(str3, "01/", i))
        thread.start()
        threads.append(thread)
    for e, thread in en(threads):
        p(e)
        thread.join()


def use_mult_process():
    """Run 5 worker processes genuinely in parallel.

    BUG FIXED: the original called q.start() immediately followed by
    q.join() inside the loop, so each process finished before the next
    began — serial execution.  Start them all, then join them all.
    """
    procs = []
    for i in range(5):
        q = Process(target=build_dict_json, args=(str3, "010/", i))
        q.start()
        procs.append(q)
    for q in procs:
        q.join()


def use_pool():
    """Dispatch 5 tasks onto a multiprocessing pool.

    BUG FIXED: Pool.apply() blocks until each task completes, so the
    original ran tasks one at a time.  apply_async() submits all tasks
    up front; we then wait on the results (which also propagates any
    worker exception).  The pool is now closed via a context manager.
    """
    def some_thread(pool):
        results = [pool.apply_async(build_dict_json, (str3, "010/", i))
                   for i in range(5)]
        for r in results:
            r.get()  # blocks until done; re-raises worker exceptions

    with Pool() as pool:
        some_thread(pool)


def use_actor():
    """Run the 5 tasks through a single actor thread.

    NOTE(review): one actor == one worker thread draining a mailbox
    sequentially, so this design is serial by construction (and also
    GIL-bound) — it was never going to be faster than a plain loop.
    """
    class ActorExit(Exception):
        """Sentinel exception used to terminate the actor's run loop."""
        pass

    class Actor:
        def __init__(self):
            self._mailbox = Queue()

        def send(self, msg):
            self._mailbox.put(msg)

        def recv(self):
            msg = self._mailbox.get()
            # The ActorExit *class itself* is the shutdown sentinel.
            if msg is ActorExit:
                raise ActorExit()
            return msg

        def close(self):
            self.send(ActorExit)

        def start(self):
            self._terminated = Event()
            t = Thread(target=self._bootstrap)
            t.daemon = True
            t.start()

        def _bootstrap(self):
            try:
                self.run()
            except ActorExit:
                pass
            finally:
                self._terminated.set()

        def join(self):
            self._terminated.wait()

        def run(self):
            while True:
                msg = self.recv()

    class TaggedActor(Actor):
        def run(self):
            # Dispatch ('A', i) messages to do_A(i), etc.
            while True:
                tag, *payload = self.recv()
                getattr(self, "do_" + tag)(*payload)

        def do_A(self, i):
            build_dict_json(str3, "010/", i)

    a = TaggedActor()
    a.start()
    for i in range(5):
        p(i)
        a.send(('A', i))
    a.close()
    a.join()


def use_coroutines():
    """Round-robin two generators on a toy cooperative scheduler.

    NOTE(review): generators are cooperative multitasking on ONE thread;
    the work still runs serially, merely interleaved at each `yield`.
    """
    def countdown(n, o):
        for i in range(n, o):
            p(i)
            build_dict_json(str3, "010/", i)
            yield

    class TaskScheduler:
        def __init__(self):
            self._task_queue = deque()

        def new_task(self, task):
            self._task_queue.append(task)

        def run(self):
            # Pop a task, advance it one step, re-queue it until exhausted.
            while self._task_queue:
                task = self._task_queue.popleft()
                try:
                    next(task)
                    self._task_queue.append(task)
                except StopIteration:
                    pass

    sched = TaskScheduler()
    sched.new_task(countdown(0, 2))
    sched.new_task(countdown(2, 4))
    sched.run()


str3 = "some_path"

if __name__ == "__main__":
    # Guarded so that multiprocessing worker processes importing this
    # module (spawn start method) do not re-execute the whole benchmark.
    use_simple_thread()
    use_mult_process()
    use_pool()
    use_actor()
    use_coroutines()