Python Forum
Concurrent futures threading running at same speed as non-threading
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
Concurrent futures threading running at same speed as non-threading
#1
I''m doing I/O reads from network devices using netmiko. using a threading context runs the exact same speed as non threading, I'm not sure if it is because I am only testing on 5 devices. I'm running enough commands against the devices, with the time it takes for these I/O operations to execute i would've assumed the threading would've ran faster.

Here is the code without threading:

The sys.argv[1] is a list of classes i'm calling against the devices via the CLI. The Connector.connect_parser(2nd to last line) iterates over a list of ip addresses, after the inlist of outside classes are fed into it.

if __name__ == "__main__":
    from Connect_Handler import Hardware, OSPF
    from Connect_Handler import Connect
    inlist = sys.argv[1]
    outlist = inlist.split(",")
    result = [i.strip("[]") for i in outlist]
    inlist = []
    for i in result:
        inlist.append(functions[i])
    Connector = Connect()
    print(time.perf_counter())
    Connector.connect_parser(inlist)
    print(time.perf_counter())
Here is the main calling program, which applies classes to execute against my ip/device list. I realize there's a bit of redundant code in defining variables more than once , I kept it in there to remind myself I can use the json_loads to override other connection drivers which I will be adding.


class Connect:
    intake_file = open('ip_list.txt', 'r')
    json_data = [json.loads(line) for line in intake_file]
    def __init__(self,):
        # self.protocol = netmiko.ssh_autodetect
        pass



    def connect_parser(self, inlist=[]):
        for data in self.json_data:
            ip = data["ip"]
            # port = data["port"]
            username = data["username"] if data["username"] else ""
            password = data["password"] if data["password"] else ""
            secret = data["secret"] if "secret" in data else False
            device_type = data["device_type"] if data["device_type"] else ""

            if data["header"] == "Netmiko":
                print("The variables being passed:  " + ip, username, password, device_type)
                ConnectHandler = netmiko.ConnectHandler(
                    device_type=device_type,
                    host=ip,
                    username=username,
                    password=password,
                    port=22,
                    secret=data["secret"] if "secret" in data else False
                )
                if ConnectHandler:
                    try:
                        ConnectHandler.enable()
                    except Exception as e:
                        print("Could not connect to {}".format(ip))

               #Hardware(ConnectHandler,ip)
            for i in inlist:
                i(ConnectHandler, ip)
Notice the for i in inlist referes to the sys.argv[1] list



Now here is the if __name__ == __main__
with the threading context
I reloaded the json file under main, and at the bottom executer the main program with both threads and non threads.
Either i am not using threading right, or 5 devices at this point is not showing a difference.
Any thoughts? I'm sorry to keep running to this forum with what may seem like trivial questions. After I get this threading down thie program is getting sent out to employers, and it's been a drag thus far to fix everything starting at the screen for hours on end.

if __name__ == "__main__":
    from Connect_Handler import Hardware, OSPF
    from Connect_Handler import Connect
    intake_file = open('ip_list.txt', 'r')
    json_data = [json.loads(line) for line in intake_file]
    for data in json_data:
        ip = data["ip"]
        # port = data["port"]
        username = data["username"] if data["username"] else ""
        password = data["password"] if data["password"] else ""
        secret = data["secret"] if "secret" in data else False
        device_type = data["device_type"] if data["device_type"] else ""
    inlist = sys.argv[1]
    outlist = inlist.split(",")
    result = [i.strip("[]") for i in outlist]
    inlist = []
    for i in result:
        inlist.append(functions[i])
    Connector = Connect()
    print(time.perf_counter())
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.map(Connector.connect_parser(inlist), data["ip"])
    print(time.perf_counter())
    print(time.perf_counter())
    Connector.connect_parser(inlist)
    print(time.perf_counter())
Reply
#2
here is the latest iteration. No difference in time, but for 36 to 39 seconds in the wrong way seconds. I'm suspecting this has nothing to do with how I'm applying threading, maybe to the amount of commands that I'm sending to the device, even still I'm not sure if sending a lot of commands should keep the thread lock I would imagine it would be the other way around, as Python will be waiting on I/O calls..

class Connect:
    intake_file = open('ip_list.txt', 'r')
    json_data = [json.loads(line) for line in intake_file]
    def __init__(self,):
        # self.protocol = netmiko.ssh_autodetect
        pass



    def connect_parser(self, inlist=[]):

        for data in self.json_data:
            ip = data["ip"]
            # port = data["port"]
            username = data["username"] if data["username"] else ""
            password = data["password"] if data["password"] else ""
            secret = data["secret"] if "secret" in data else False
            device_type = data["device_type"] if data["device_type"] else ""

            if data["header"] == "Netmiko":
                print("The variables being passed:  " + ip, username, password, device_type)
                ConnectHandler = netmiko.ConnectHandler(
                    device_type=device_type,
                    host=ip,
                    username=username,
                    password=password,
                    port=22,
                    secret=data["secret"] if "secret" in data else False
                )
                if ConnectHandler:
                    try:
                        ConnectHandler.enable()
                    except Exception as e:
                        print("Could not connect to {}".format(ip))
                for i in inlist:
                    i(ConnectHandler, ip)
def threading(connect_parser):
    worker_pool = ThreadPoolExecutor(20)
    for i in Connect.json_data:
        worker_pool.submit(connect_parser, i)
        #worker_pool.shutdown(wait=True)
            #Hardware(ConnectHandler,ip)





if __name__ == "__main__":
    from Connect_Handler import Hardware, OSPF
    from Connect_Handler import Connect
    inlist = sys.argv[1]
    outlist = inlist.split(",")
    result = [i.strip("[]") for i in outlist]
    inlist = []
    for i in result:
        inlist.append(functions[i])
    Connector = Connect()
    print(time.perf_counter())
    threading(Connector.connect_parser(inlist))
    print(time.perf_counter())
    print(time.perf_counter())
    Connector.connect_parser(inlist)
    print(time.perf_counter())
Reply
#3
One more note:


here's the latest iteration, with no improvement.
The threading is placed right above __name__ == __main__:

class Connect:

    def __init__(self,):
        intake_file = open('ip_list.txt', 'r')
        self.json_data = [json.loads(line) for line in intake_file]
        pass
    def connect_parser(self, inlistx=[]):

        for data in self.json_data:
            ip = data["ip"]
            # port = data["port"]
            username = data["username"] if data["username"] else ""
            password = data["password"] if data["password"] else ""
            secret = data["secret"] if "secret" in data else False
            device_type = data["device_type"] if data["device_type"] else ""

            if data["header"] == "Netmiko":
                print("The variables being passed:  " + ip, username, password, device_type)
                ConnectHandler = netmiko.ConnectHandler(
                    device_type=device_type,
                    host=ip,
                    username=username,
                    password=password,
                    port=22,
                    secret=data["secret"] if "secret" in data else False
                )
                if ConnectHandler:
                    try:
                        ConnectHandler.enable()
                    except Exception as e:
                        print("Could not connect to {}".format(ip))
                for i in inlistx:
                    executor = ThreadPoolExecutor(max_workers=50)
                    executor.submit(i(ConnectHandler, ip))
                    print(executor)

if __name__ == "__main__":
    from Connect_Handler import Hardware, OSPF
    from Connect_Handler import Connect
    inlist = sys.argv[1]
    outlist = inlist.split(",")
    result = [i.strip("[]") for i in outlist]
    inlist = []
    for i in result:
        inlist.append(functions[i])
    Connector = Connect()
    print(time.perf_counter())

    print(time.perf_counter())
    # print(time.perf_counter())
    Connector.connect_parser(inlist)
    print(time.perf_counter())
Reply
#4
here is latest iteration, still at about 35 seconds.

class Connect:

    def __init__(self,):
        intake_file = open('ip_list.txt', 'r')
        self.json_data = [json.loads(line) for line in intake_file]
        pass
    def connect_parser(self, inlistx=[]):

        for data in self.json_data:
            ip = data["ip"]
            # port = data["port"]
            username = data["username"] if data["username"] else ""
            password = data["password"] if data["password"] else ""
            secret = data["secret"] if "secret" in data else False
            device_type = data["device_type"] if data["device_type"] else ""

            if data["header"] == "Netmiko":
                print("The variables being passed:  " + ip, username, password, device_type)
                ConnectHandler = netmiko.ConnectHandler(
                    device_type=device_type,
                    host=ip,
                    username=username,
                    password=password,
                    port=22,
                    secret=data["secret"] if "secret" in data else False
                )
                if ConnectHandler:
                    try:
                        ConnectHandler.enable()
                    except Exception as e:
                        print("Could not connect to {}".format(ip))
                for i in inlistx:
                    th = threading.Thread(target=i(ConnectHandler), args=ip)
                    thread_pool.append(th)
                    th.start()
                for i in thread_pool:
                    i.join()




if __name__ == "__main__":
    from Connect_Handler import Hardware, OSPF
    from Connect_Handler import Connect
    inlist = sys.argv[1]
    outlist = inlist.split(",")
    result = [i.strip("[]") for i in outlist]
    inlist = []
    for i in result:
        inlist.append(functions[i])
    Connector = Connect()
    print(time.perf_counter())
    # print(time.perf_counter())
    Connector.connect_parser(inlist)
    print(time.perf_counter())
    print(time.perf_counter())
Reply
#5
There are quite a lot of unclear and/or odd things in this code. For example:
1. You never actually execute anything. executor.submit will only schedule the execution and returns Future object, which you just throw away.
2. We don't know what the input file looks like, we don't know what functions is (and thus functions[i]).
3. Then, if we assume inlistx is list of functions, then i in for i in inlistx: is a function. Then when you do executor.submit(i(ConnectHandler, ip)) you actually call that function and pass the return value (whatever that is) to the executor.submit. I expect this would raise an error that the argument is not callable, but does not because you actually never try to execute the Future object and get the result.
4. Note you should not use mutable default arguments, unless you actually want the effect you will get. Don't think this affects you in this case, but worth to mention.
If you can't explain it to a six year old, you don't understand it yourself, Albert Einstein
How to Ask Questions The Smart Way: link and another link
Create MCV example
Debug small programs

Reply
#6
(May-03-2023, 06:57 AM)buran Wrote: There are quite a lot of unclear and/or odd things in this code. For example:
1. You never actually execute anything. executor.submit will only schedule the execution and returns Future object, which you just throw away.
2. We don't know what the input file looks like, we don't know what functions is (and thus functions[i]).
3. Then, if we assume inlistx is list of functions, then i in for i in inlistx: is a function. Then when you do executor.submit(i(ConnectHandler, ip)) you actually call that function and pass the return value (whatever that is) to the executor.submit. I expect this would raise an error that the argument is not callable, but does not because you actually never try to execute the Future object and get the result.
4. Note you should not use mutable default arguments, unless you actually want the effect you will get. Don't think this affects you in this case, but worth to mention.



1) wow didn't even realize that.

2) input file:
{"ip":"10.0.1.2", "username":"cisco", "password":"cisco123", "secret":"python", "device_type": "cisco_ios", "header":"Netmiko"}
{"ip":"10.0.4.5", "username":"cisco", "password":"cisco123", "secret":"python", "device_type": "cisco_ios", "header":"Netmiko"}

functions maps out to different classes. here's the dictionary at the top of the code, which I hadn't uploaded earlier, as it was underneath the import statements:
functions = {'Hardware': Hardware, "OSPF": OSPF}

3) yes listx is a list of functions/ functions basically mapped/call classes.
you actually call that function and pass the return value (whatever that is) to the executor.submit. I expect this would raise an error that the argument is not callable, but does not because you actually never try to execute the Future object and get the result.

the argument ConnectHandler is callable. To the class which the function maps to.
th = threading.Thread(target=i(ConnectHandler), args=ip)

i is the mapped function to the functions dictionary, each function is a class which calls ConnectHandler, and performs operations on the line in json_data, and they save ip as key value pairs to an output log file

So it seems I'm not trying to execute the future object ~ any tips how to do that?
Reply
#7
As the below code shows, I was not correct, regarding 1, that you don't execute anything

Actually, I was not correct, regarding 1, so strike that

from concurrent.futures import ThreadPoolExecutor

def add(a, b):
    print('inside add')
    return a + b

def sub(a, b):
    print('inside sub')
    return a - b


def pow(a, b):
    print('inside pow')
    return a ** b
   
functions = [add, sub, pow]
a=9
b=3

with ThreadPoolExecutor(max_workers=5) as executor:
    print('start submit')
    future = [executor.submit(func, a, b) for func in functions]
    print('end submit, get result')
    for result in concurrent.futures.as_completed(future):
        print(result.result())
Output:
start submit inside add inside sub end submit, get result inside pow 6 12 729
If you can't explain it to a six year old, you don't understand it yourself, Albert Einstein
How to Ask Questions The Smart Way: link and another link
Create MCV example
Debug small programs

Reply
#8
(May-03-2023, 07:31 AM)buran Wrote: As the below code shows, I was not correct, regarding 1, that you don't execute anything

Actually, I was not correct, regarding 1, so strike that

from concurrent.futures import ThreadPoolExecutor

def add(a, b):
    print('inside add')
    return a + b

def sub(a, b):
    print('inside sub')
    return a - b


def pow(a, b):
    print('inside pow')
    return a ** b
   
functions = [add, sub, pow]
a=9
b=3

with ThreadPoolExecutor(max_workers=5) as executor:
    print('start submit')
    future = [executor.submit(func, a, b) for func in functions]
    print('end submit, get result')
    for result in concurrent.futures.as_completed(future):
        print(result.result())
Output:
start submit inside add inside sub end submit, get result inside pow 6 12 729




One thing I noticed is it looks like I'm trying to execute different threads underneath a loop which iterates over the infile list 1 by one, I could just parse the entire file, and then run the threads, but I want to add additional connection drivers in the future outside of Netmiko, and the program will choose the best driver depending on conditions. I'm not sure if this would have an effect. I tried using a context manager underneath __name__ == __main__ and no improvement. This may all be due to this is the fastest it is going to run. I'm running about 15-20 commands on a device for 5 devices and each device takes up about 5-7 seconds so maybe this is what should be expected, and threading may be better for a production environment with many devices?

Corrction, there are 9 commands per device in a given class, which equates to about a second a command. Maybe this is the best I will get (it take about 32 seconds for 6 devices).


This iteration wheich I was getting the result for the executor.map() which I assume means I was running it properly, doesn't show an improvement in time.

        for data in self.json_data:
            ip = data["ip"]
            # port = data["port"]
            username = data["username"] if data["username"] else ""
            password = data["password"] if data["password"] else ""
            secret = data["secret"] if "secret" in data else False
            device_type = data["device_type"] if data["device_type"] else ""

            if data["header"] == "Netmiko":
                print("The variables being passed:  " + ip, username, password, device_type)
                ConnectHandler = netmiko.ConnectHandler(
                    device_type=device_type,
                    host=ip,
                    username=username,
                    password=password,
                    port=22,
                    secret=data["secret"] if "secret" in data else False
                )
                if ConnectHandler:
                    try:
                        ConnectHandler.enable()
                    except Exception as e:
                        print("Could not connect to {}".format(ip))
                for i in inlistx:
                    executor = ThreadPoolExecutor(max_workers=50)
                    res = executor.map(i(ConnectHandler, ip))
                    res.result()
                    executor.shutdown()




if __name__ == "__main__":
    from Connect_Handler import Hardware, OSPF
    from Connect_Handler import Connect
    inlist = sys.argv[1]
    outlist = inlist.split(",")
    result = [i.strip("[]") for i in outlist]
    inlist = []
    for i in result:
        inlist.append(functions[i])
    Connector = Connect()
    print(time.perf_counter())
    # print(time.perf_counter())
    Connector.connect_parser(inlist)
    print(time.perf_counter())
Reply
#9
Still I think you want executor.submit(i, ConnectHandler, ip), not executor.submit(i(ConnectHandler, ip))

using executor.map would expect that you pass iterable as second argument
If you can't explain it to a six year old, you don't understand it yourself, Albert Einstein
How to Ask Questions The Smart Way: link and another link
Create MCV example
Debug small programs

Reply
#10
(May-03-2023, 07:40 AM)buran Wrote: Still I think you want executor.submit(i, ConnectHandler, ip), not executor.submit(i(ConnectHandler, ip))

using executor.map would expect that you pass iterable as second argument

Okay i will try this
Reply


Possibly Related Threads…
Thread Author Replies Views Last Post
  threading native_id returning same value for all threads billykid999 2 1,020 May-04-2023, 06:40 AM
Last Post: billykid999
  Trouble with threading and reading variable from a different script Lembas 14 3,073 Apr-26-2023, 11:21 PM
Last Post: Lembas
  using threading.Timer for function korenron 1 1,208 Dec-20-2022, 01:09 PM
Last Post: ndc85430
  [Solved]Help with Threading Extra 7 1,861 Sep-05-2022, 05:29 PM
Last Post: Extra
Question Opencv and threading ethernel 4 145,576 Feb-25-2022, 06:06 PM
Last Post: ethernel
  Inconsistent counting / timing with threading rantwhy 1 1,774 Nov-24-2021, 04:04 AM
Last Post: deanhystad
  Class variables and Multiprocessing(or concurrent.futures.ProcessPoolExecutor) Tomli 5 3,903 Nov-12-2021, 09:55 PM
Last Post: snippsat
  Mult-threading and locking file mr_byte31 4 2,606 Oct-16-2021, 01:54 AM
Last Post: Larz60+
  Matplotlib Animation with Threading peterjv26 4 7,235 Oct-08-2021, 05:51 PM
Last Post: peterjv26
  Tutorials on sockets, threading and multi-threading? muzikman 2 2,131 Oct-01-2021, 08:32 PM
Last Post: muzikman

Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020