I have a main class which creates an instance from a list of plugins. In the example below, the 'dhcpv6' plugin is launched and it listens for subscriptions on the 'subject_test_subscribe" subject. As soon as the handler intercepts a new message on this subject, it creates an new asynchronous task by instantiating the classe "Dhcpv6_child().run()" which represents a child of the class Dhcpv6.py. Each child have a timeout to kill the process child. For information, the class plugin definied in init.py is an abstract class which allows to load plugins.
I'm not able to dynamically manage the addition of new tasks with asyncio.gather (until the current task is finished, the new task is not executed). The problem is solved with asyncio.create_subprocess_exec but I'd prefer to use the possibilities of asyncio
main.py
I'm not able to dynamically manage the addition of new tasks with asyncio.gather (until the current task is finished, the new task is not executed). The problem is solved with asyncio.create_subprocess_exec but I'd prefer to use the possibilities of asyncio
main.py
import argparse import asyncio import importlib parser = argparse.ArgumentParser() parser.add_argument('--nats', '-n', nargs="?", type=str, required=True, help="adresse IP du serveur NATS") parser.add_argument('--plugins', '-p', nargs="+", help="Liste de plugins a utiliser") args = parser.parse_args() print("************** main() *****************") print(f"NATS server IP: {args.nats}") print(f"List of plugins to load: {args.plugins}") async def main(): tasks = [] for type_plugin in args.plugins: try: module = importlib.import_module(f'Plugins.{type_plugin}') my_class = getattr(module, type_plugin) my_instance = my_class(nats=args.nats) tasks.append(asyncio.create_task(my_instance.run())) except Exception as e: print("Erreur chargement du plugin : ", type_plugin, ":", e) try: await asyncio.gather(*tasks) except asyncio.TimeoutError: print("timeout main") if __name__ == '__main__': loop = asyncio.get_event_loop() try: loop.create_task(main()) loop.run_forever() except KeyboardInterrupt: pass except Exception as e: print(e) finally: loop.close()Dhcpv6.py
import asyncio import json import logging import sys from nats.aio.client import Client as NATS from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers from Plugins import Plugin from Plugins.Dhcpv6_child import Dhcpv6_child logger = logging.getLogger("dhcpv6") logging.basicConfig(level=logging.DEBUG) class Dhcpv6(Plugin): name = "Dhcpv6 plugin" subject = "subject_test_subscribe" def __init__(self, **kwargs): super().__init__(**kwargs) self.nats = kwargs.get('nats') self.list_task = [] self.timeout_task = 5 print("Class dhcpv6 ==> constructor / nats_ip : {}".format(self.nats)) async def run(self): print("******* run DHCPV6 ***********") nc = NATS() try: await nc.connect("127.0.0.1", verbose=True, pedantic=True) except ErrConnectionClosed: print(f"La connexion a ete fermee inopinement") return except ErrTimeout: print(f"Le delai imparti a la connexion est depasse") return except ErrNoServers: print(f"Aucun serveur n'a repondu a temps") return except Exception as e: print(f"Exception inattendue: {e}") return async def plugin_handler(msg): print(msg) try: # Creating and running a new task on demand self.list_task.append(asyncio.wait_for( asyncio.create_task(Dhcpv6_child().run()), timeout=self.timeout_task)) print("append a new task : ", self.list_task) except: print("Error append new task") try: print("Running a new task : ", self.list_task) await asyncio.wait_for(asyncio.gather(*self.list_task), timeout=3600.0) except asyncio.TimeoutError: print("Fin du main : timeout atteint") print(f"Subscribing test on : {self.subject}") await nc.subscribe(f"{self.subject}", cb=plugin_handler) while nc.is_connected: await asyncio.sleep(0.5) await nc.drain()Output screen example :
************** main() ***************** NATS server IP: 127.0.0.1 List of plugins to load: : ['Dhcpv6'] ------------ Load a list of plugins : {Dhcpv6} ------- Class plugin ==> constructor Class dhcpv6 ==> constructor / nats_ip : 127.0.0.1 ******* run DHCPV6 *********** INFO:Plugin:Plugin Dhcpv6 plugin loaded Subscribing test on : subject_test_subscribe <Msg: subject='subject_test_subscribe' reply='' data='{"query": ...'> Class CHILD ==> constructor append a new task : [<coroutine object wait_for at 0x0000026D0310F040>] Runnig a new task : [<coroutine object wait_for at 0x0000026D0310F040>]